Coverage for /home/ubuntu/hidebound/python/hidebound/core/parser.py: 100%

126 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-05 23:50 +0000

1from typing import Any, Callable, Dict, List # noqa F401 

2 

3from copy import copy 

4from pyparsing import Group, Optional, ParseException, Regex, Suppress 

5# ------------------------------------------------------------------------------ 

6 

7 

class AssetNameParser:
    '''
    A class for converting asset names to metadata and metadata to asset names,
    according to a dynamically defined grammar.

    An asset name is a sequence of fields joined by FIELD_SEPARATOR. Each field
    is an indicator followed by a token, for example "p-proj001" or "v003". The
    extension field, when present, is always last and is attached with
    EXTENSION_INDICATOR instead of FIELD_SEPARATOR.
    '''
    # separators and per-field indicators used for both parsing and writing
    FIELD_SEPARATOR = '_'  # type: str
    TOKEN_SEPARATOR = '-'  # type: str
    PROJECT_INDICATOR = 'p' + TOKEN_SEPARATOR  # type: str
    SPECIFICATION_INDICATOR = 's' + TOKEN_SEPARATOR  # type: str
    DESCRIPTOR_INDICATOR = 'd' + TOKEN_SEPARATOR  # type: str
    VERSION_INDICATOR = 'v'  # type: str
    COORDINATE_INDICATOR = 'c'  # type: str
    FRAME_INDICATOR = 'f'  # type: str
    EXTENSION_INDICATOR = '.'  # type: str

    # the only field names accepted by __init__
    LEGAL_FIELDS = [
        'project',
        'specification',
        'descriptor',
        'version',
        'coordinate',
        'frame',
        'extension'
    ]  # type: List[str]

    # number of digits in the zero padded numeric tokens
    VERSION_PADDING = 3  # type: int
    COORDINATE_PADDING = 4  # type: int
    FRAME_PADDING = 4  # type: int

    def __init__(self, fields):
        # type: (List[str]) -> None
        '''
        Create a AssetNameParser instance with given fields.

        Args:
            fields (list[str]): An ordered list of asset fields.

        Raises:
            ValueError: If fields is empty.
            ValueError: If fields are duplicated.
            ValueError: If illegal fields are given.
            ValueError: If illegal field order given.

        Returns:
            AssetNameParser: instance.
        '''
        # Snapshot the given sequence as a list. This keeps the stored fields
        # in sync with the parser built below even if the caller mutates its
        # own list later, and guarantees that parse and to_string, which
        # compare against and pop from a list, also work for tuples.
        fields = list(fields)

        # ensure fields is not empty
        if not fields:
            msg = 'Fields cannot be empty.'
            raise ValueError(msg)

        # ensure fields are not duplicated
        if len(fields) != len(set(fields)):
            msg = 'Fields cannot contain duplicates.'
            raise ValueError(msg)

        # ensure fields are legal
        illegal_fields = [x for x in fields if x not in self.LEGAL_FIELDS]
        if illegal_fields:
            msg = f'Illegal fields found: {illegal_fields}. '
            msg += f'Legal fields include: {self.LEGAL_FIELDS}.'
            raise ValueError(msg)

        # ensure extension is last field
        if 'extension' in fields and fields[-1] != 'extension':
            msg = 'Illegal field order: Extension field must be last if it is '
            msg += 'included in fields.'
            raise ValueError(msg)

        grammar = self._get_grammar()
        self._extension_parser = self._get_extension_parser(grammar)
        self._parser = self._get_parser(grammar, fields)
        self._fields = fields

    # GRAMMAR-------------------------------------------------------------------
    @staticmethod
    def _raise_field_error(field, part):
        # type: (str, str) -> Callable[[str, Any, Any, Any], None]
        '''
        A convenience function used for raising custom ParseExceptions.

        The returned callable is meant to be handed to setFailAction. When it
        is called it raises a ParseException whose message names the field,
        the part of the field that failed, the text being parsed and the
        expression that was expected.

        Args:
            field (str): Field.
            part (str): Part of field.

        Returns:
            function: Callable of four positional arguments. Only the first
                (text) and third (failing element) are used.
        '''
        def raise_error(field, text, instance):
            # type: (str, str, Any) -> None
            # Elements without an expr attribute are expected to carry a
            # pattern attribute instead.
            if hasattr(instance, 'expr'):
                expr = instance.expr
            else:
                expr = instance.pattern

            msg = f'Illegal {field} field {part} in "{text}". '
            msg += f'Expecting: {expr}'
            raise ParseException(msg)
        return lambda s, l, i, e: raise_error(field, s, i)  # noqa E741

    @staticmethod
    def _get_grammar():
        # type: () -> Dict[str, Any]
        '''
        Create parser grammar dictionary.

        The dictionary holds one indicator + token expression per legal field,
        the bare specification and extension tokens, and a suppressed field
        separator.

        Returns:
            dict: Grammar.
        '''
        # TOKENS----------------------------------------------------------------
        # 3 to 10 lowercase letters followed by 1 to 4 digits
        project = Regex(r'[a-z]{3,10}\d\d?\d?\d?') \
            .setResultsName('project') \
            .setFailAction(AssetNameParser._raise_field_error('project', 'token'))

        # 3 to 4 lowercase letters followed by exactly 3 digits
        specification = Regex(r'[a-z]{3,4}\d\d\d') \
            .setResultsName('specification') \
            .setFailAction(AssetNameParser._raise_field_error('specification', 'token'))

        # lowercase letter or digit followed by lowercase letters, digits or hyphens
        descriptor = Regex(r'[a-z0-9][a-z0-9-]*') \
            .setResultsName('descriptor') \
            .setFailAction(AssetNameParser._raise_field_error('descriptor', 'token'))

        # exactly VERSION_PADDING digits, converted to int
        version = Regex(r'\d{' + str(AssetNameParser.VERSION_PADDING) + '}') \
            .setParseAction(lambda s, l, t: int(t[0])) \
            .setResultsName('version') \
            .setFailAction(AssetNameParser._raise_field_error('version', 'token'))  # noqa E741

        # one to three COORDINATE_PADDING digit integers joined by the token
        # separator, collected into a single group
        coord = Regex(r'\d{' + str(AssetNameParser.COORDINATE_PADDING) + '}') \
            .setParseAction(lambda s, l, t: int(t[0]))  # noqa E741
        t_sep = Suppress(AssetNameParser.TOKEN_SEPARATOR)
        opt_coord = Optional(t_sep + coord)
        coordinate = Group(coord + opt_coord + opt_coord) \
            .setResultsName('coordinate') \
            .setFailAction(AssetNameParser._raise_field_error('coordinate', 'token'))

        # exactly FRAME_PADDING digits, converted to int
        frame = Regex(r'\d{' + str(AssetNameParser.FRAME_PADDING) + '}') \
            .setParseAction(lambda s, l, t: int(t[0])) \
            .setResultsName('frame') \
            .setFailAction(AssetNameParser._raise_field_error('frame', 'token'))  # noqa E741

        # letters and digits running up to the end of the string
        extension = Regex(r'[a-zA-Z0-9]+$') \
            .setResultsName('extension') \
            .setFailAction(AssetNameParser._raise_field_error('extension', 'token'))

        # INDICATORS------------------------------------------------------------
        # indicators are matched but suppressed from the parse results
        project_indicator = Suppress(AssetNameParser.PROJECT_INDICATOR) \
            .setFailAction(AssetNameParser._raise_field_error('project', 'indicator'))

        specification_indicator = Suppress(AssetNameParser.SPECIFICATION_INDICATOR) \
            .setFailAction(AssetNameParser._raise_field_error('specification', 'indicator'))

        descriptor_indicator = Suppress(AssetNameParser.DESCRIPTOR_INDICATOR) \
            .setFailAction(AssetNameParser._raise_field_error('descriptor', 'indicator'))

        version_indicator = Suppress(AssetNameParser.VERSION_INDICATOR) \
            .setFailAction(AssetNameParser._raise_field_error('version', 'indicator'))

        coordinate_indicator = Suppress(AssetNameParser.COORDINATE_INDICATOR) \
            .setFailAction(AssetNameParser._raise_field_error('coordinate', 'indicator'))

        frame_indicator = Suppress(AssetNameParser.FRAME_INDICATOR) \
            .setFailAction(AssetNameParser._raise_field_error('frame', 'indicator'))

        extension_indicator = Suppress(AssetNameParser.EXTENSION_INDICATOR) \
            .setFailAction(AssetNameParser._raise_field_error('extension', 'indicator'))

        # GRAMMAR---------------------------------------------------------------
        grammar = {
            'project': project_indicator + project,
            'specification': specification_indicator + specification,
            'specification_token': specification,
            'descriptor': descriptor_indicator + descriptor,
            'version': version_indicator + version,
            'coordinate': coordinate_indicator + coordinate,
            'frame': frame_indicator + frame,
            'extension': extension_indicator + extension,
            'extension_token': extension,
            'field_separator': Suppress(AssetNameParser.FIELD_SEPARATOR)
        }
        return grammar

    # PARSERS-------------------------------------------------------------------
    @staticmethod
    def _get_extension_parser(grammar):
        # type: (Dict[str, Any]) -> Group
        '''
        Creates a parser for file extensions.

        Args:
            grammar (dict): AssetNameParser grammar dictionary.

        Returns:
            Group: Parser.
        '''
        # optionally discard leading text, then match the bare extension token
        parser = Optional(Suppress(Regex(r'.*\.|.*?'))) + grammar['extension_token']
        output = Group(parser)
        return output

    @staticmethod
    def _get_parser(grammar, fields):
        # type: (Dict[str, Any], List[str]) -> Group
        '''
        Creates a parser for asset names.

        Fields are chained in the given order between start and end of string
        anchors. A field separator is placed after every field except the last
        one and except a field directly followed by the extension field.

        Args:
            grammar (dict): AssetNameParser grammar dictionary.
            fields (list[str]): List of fields.

        Returns:
            Group: Parser.
        '''
        parser = Suppress(Regex('^'))  # type: Any
        for i, field in enumerate(fields[:-1]):
            parser += grammar[field]
            # the extension grammar carries its own indicator, so no field
            # separator is added in front of it
            if fields[i + 1] != 'extension':
                parser += grammar['field_separator']
        parser += grammar[fields[-1]]
        parser += Suppress(Regex('$'))
        output = Group(parser)
        return output

    @staticmethod
    def _get_specification_parser():
        # type: () -> Group
        '''
        Returns a parser for finding a specification within an arbitrary string.

        Returns:
            Group: Parser.
        '''
        grammar = AssetNameParser._get_grammar()
        # NOTE(review): the lazy prefix presumably stops at the first
        # occurrence of the indicator text, even one not followed by a valid
        # specification token -- confirm this is acceptable for all inputs.
        indicator = Suppress(Regex('.*?' + AssetNameParser.SPECIFICATION_INDICATOR))
        # the token has to be followed by the end of the string or a non-digit
        parser = indicator + grammar['specification_token'] + Suppress(Regex(r'$|\D'))
        parser = Group(parser)
        return parser

    # PUBLIC--------------------------------------------------------------------
    @staticmethod
    def parse_specification(text):
        # type: (str) -> Dict
        '''
        Parse a string for a specification.

        Args:
            text (str): String to be parsed.

        Raises:
            ParseException: If specification is not found.

        Returns:
            dict: Dictionary with "specification" key.
        '''
        try:
            return AssetNameParser\
                ._get_specification_parser()\
                .parseString(text)[0].asDict()
        except ParseException as error:
            # replace the field level message with one about the whole text,
            # keeping the original failure attached as the cause
            msg = f'Specification not found in "{text}".'
            raise ParseException(msg) from error

    def parse(self, text):
        # type: (str) -> dict
        '''
        Parse a given string.

        Args:
            text (str): String to be parsed.

        Raises:
            ParseException: If parse fails.

        Returns:
            dict: Parsed tokens keyed by field name.
        '''
        # a parser with extension as its only field uses the dedicated
        # extension parser rather than the full asset name parser
        if self._fields == ['extension']:
            return self._extension_parser.parseString(text)[0].asDict()
        return self._parser.parseString(text)[0].asDict()

    def to_string(self, dict_):
        # type: (Dict) -> str
        '''
        Converts a given dictionary to a string.

        Fields are written in the order this instance was created with. Any
        field other than extension that is missing from dict_ is skipped.
        Version, coordinate and frame tokens are zero padded to
        VERSION_PADDING, COORDINATE_PADDING and FRAME_PADDING digits.

        Args:
            dict_ (dict): Dictionary.

        Raises:
            KeyError: If this instance has an extension field and dict_ has no
                "extension" key.

        Returns:
            str: Asset name.
        '''
        # work on a list copy so that self._fields is never mutated and any
        # sequence type stored there can be popped
        fields = list(self._fields)
        has_extension = fields[-1] == 'extension'
        if has_extension:
            fields.pop()

        parts = []  # type: List[str]
        for field in fields:
            if field not in dict_:
                continue

            indicator = getattr(self, field.upper() + '_INDICATOR')
            token = dict_[field]
            if field == 'version':
                token = str(token).zfill(self.VERSION_PADDING)

            elif field == 'coordinate':
                token = [str(x).zfill(self.COORDINATE_PADDING) for x in token]
                token = self.TOKEN_SEPARATOR.join(token)

            elif field == 'frame':
                token = str(token).zfill(self.FRAME_PADDING)

            parts.append(indicator + token)
        output = self.FIELD_SEPARATOR.join(parts)

        # the extension is attached with its own indicator rather than the
        # field separator
        if has_extension:
            output += self.EXTENSION_INDICATOR + dict_['extension']
        return output

160 

161 version_indicator = Suppress(AssetNameParser.VERSION_INDICATOR) \ 

162 .setFailAction(AssetNameParser._raise_field_error('version', 'indicator')) 

163 

164 coordinate_indicator = Suppress(AssetNameParser.COORDINATE_INDICATOR) \ 

165 .setFailAction(AssetNameParser._raise_field_error('coordinate', 'indicator')) 

166 

167 frame_indicator = Suppress(AssetNameParser.FRAME_INDICATOR) \ 

168 .setFailAction(AssetNameParser._raise_field_error('frame', 'indicator')) 

169 

170 extension_indicator = Suppress(AssetNameParser.EXTENSION_INDICATOR) \ 

171 .setFailAction(AssetNameParser._raise_field_error('extension', 'indicator')) 

172 # ---------------------------------------------------------------------- 

173 

174 grammar = { 

175 'project': project_indicator + project, 

176 'specification': specification_indicator + specification, 

177 'specification_token': specification, 

178 'descriptor': descriptor_indicator + descriptor, 

179 'version': version_indicator + version, 

180 'coordinate': coordinate_indicator + coordinate, 

181 'frame': frame_indicator + frame, 

182 'extension': extension_indicator + extension, 

183 'extension_token': extension, 

184 'field_separator': Suppress(AssetNameParser.FIELD_SEPARATOR) 

185 } 

186 return grammar 

187 

188 # PARSERS------------------------------------------------------------------- 

189 @staticmethod 

190 def _get_extension_parser(grammar): 

191 # type: (Dict[str, Any]) -> Group 

192 ''' 

193 Creates a parser for file extensions. 

194 

195 Args: 

196 grammar (dict): AssetNameParser grammar dictionary. 

197 

198 Returns: 

199 Group: Parser. 

200 ''' 

201 parser = Optional(Suppress(Regex(r'.*\.|.*?'))) + grammar['extension_token'] 

202 output = Group(parser) 

203 return output 

204 

205 @staticmethod 

206 def _get_parser(grammar, fields): 

207 # type: (Dict[str, Any], List[str]) -> Group 

208 ''' 

209 Creates a parser for asset names. 

210 

211 Args: 

212 grammar (dict): AssetNameParser grammar dictionary. 

213 fields (list[str]): List of fields. 

214 

215 Returns: 

216 Group: Parser. 

217 ''' 

218 parser = Suppress(Regex('^')) # type: Any 

219 for i, field in enumerate(fields[:-1]): 

220 parser += grammar[field] 

221 if fields[i + 1] != 'extension': 

222 parser += grammar['field_separator'] 

223 parser += grammar[fields[-1]] 

224 parser += Suppress(Regex('$')) 

225 output = Group(parser) 

226 return output 

227 

228 @staticmethod 

229 def _get_specification_parser(): 

230 # type: () -> Group 

231 ''' 

232 Returns a parser for finding a specification within an arbitrary string. 

233 

234 Returns: 

235 Group: Parser. 

236 ''' 

237 grammar = AssetNameParser._get_grammar() 

238 indicator = Suppress(Regex('.*?' + AssetNameParser.SPECIFICATION_INDICATOR)) 

239 parser = indicator + grammar['specification_token'] + Suppress(Regex(r'$|\D')) 

240 parser = Group(parser) 

241 return parser 

242 

243 # PUBLIC-------------------------------------------------------------------- 

244 @staticmethod 

245 def parse_specification(text): 

246 # type: (str) -> Dict 

247 ''' 

248 Parse a string for a specification. 

249 

250 Args: 

251 text (str): String to be parsed. 

252 

253 Raises: 

254 ParseException: If specification is not found. 

255 

256 Returns: 

257 dict: Dictionary with "specification" key. 

258 ''' 

259 try: 

260 return AssetNameParser\ 

261 ._get_specification_parser()\ 

262 .parseString(text)[0].asDict() 

263 except ParseException: 

264 msg = f'Specification not found in "{text}".' 

265 raise ParseException(msg) 

266 

267 def parse(self, text): 

268 # type: (str) -> dict 

269 ''' 

270 Parse a given string. 

271 

272 Args: 

273 text (str): String to be parsed. 

274 

275 Raises: 

276 ParseException: If parse fails. 

277 

278 Returns: 

279 dict: parser. 

280 ''' 

281 if self._fields == ['extension']: 

282 return self._extension_parser.parseString(text)[0].asDict() 

283 return self._parser.parseString(text)[0].asDict() 

284 

285 def to_string(self, dict_): 

286 # type: (Dict) -> str 

287 ''' 

288 Converts a given dictionary to a string. 

289 

290 Args: 

291 dict_ (dict): Dictionary. 

292 

293 Returns: 

294 str: Asset name. 

295 ''' 

296 fields = copy(self._fields) 

297 has_extension = False 

298 if fields[-1] == 'extension': 

299 has_extension = True 

300 fields.pop() 

301 

302 output = [] # type: Any 

303 for field in fields: 

304 if field in dict_.keys(): 

305 indicator = getattr(self, field.upper() + '_INDICATOR') 

306 

307 token = dict_[field] 

308 if field == 'version': 

309 token = str(token).zfill(self.VERSION_PADDING) 

310 

311 elif field == 'coordinate': 

312 token = [str(x).zfill(self.COORDINATE_PADDING) for x in token] 

313 token = self.TOKEN_SEPARATOR.join(token) 

314 

315 elif field == 'frame': 

316 token = str(token).zfill(self.FRAME_PADDING) 

317 

318 output.append(indicator + token) 

319 output = self.FIELD_SEPARATOR.join(output) 

320 

321 if has_extension: 

322 output += self.EXTENSION_INDICATOR + dict_['extension'] 

323 return output