Coverage for /home/ubuntu/hidebound/python/hidebound/core/parser.py: 100%
126 statements
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-05 23:50 +0000
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-05 23:50 +0000
1from typing import Any, Callable, Dict, List # noqa F401
3from copy import copy
4from pyparsing import Group, Optional, ParseException, Regex, Suppress
5# ------------------------------------------------------------------------------
class AssetNameParser:
    '''
    A class for converting asset names to metadata and metadata to asset names,
    according to a dynamically defined grammar.

    An asset name is a sequence of fields joined by FIELD_SEPARATOR. Each field
    is an indicator (for example "p-" or "v") immediately followed by a token.
    The extension field, when present, is always last and is joined with
    EXTENSION_INDICATOR instead of FIELD_SEPARATOR.
    '''
    FIELD_SEPARATOR = '_'  # type: str
    TOKEN_SEPARATOR = '-'  # type: str
    PROJECT_INDICATOR = 'p' + TOKEN_SEPARATOR  # type: str
    SPECIFICATION_INDICATOR = 's' + TOKEN_SEPARATOR  # type: str
    DESCRIPTOR_INDICATOR = 'd' + TOKEN_SEPARATOR  # type: str
    VERSION_INDICATOR = 'v'  # type: str
    COORDINATE_INDICATOR = 'c'  # type: str
    FRAME_INDICATOR = 'f'  # type: str
    EXTENSION_INDICATOR = '.'  # type: str
    LEGAL_FIELDS = [
        'project',
        'specification',
        'descriptor',
        'version',
        'coordinate',
        'frame',
        'extension'
    ]  # type: List[str]

    # number of zero padded digits used by each numeric token
    VERSION_PADDING = 3  # type: int
    COORDINATE_PADDING = 4  # type: int
    FRAME_PADDING = 4  # type: int

    def __init__(self, fields):
        # type: (List[str]) -> None
        '''
        Create an AssetNameParser instance with given fields.

        Args:
            fields (list[str]): An ordered list of asset fields.

        Raises:
            ValueError: If fields is empty.
            ValueError: If fields are duplicated.
            ValueError: If illegal fields are given.
            ValueError: If illegal field order given.

        Returns:
            AssetNameParser: instance.
        '''
        # ensure fields is not empty
        if len(fields) == 0:
            msg = 'Fields cannot be empty.'
            raise ValueError(msg)

        # ensure fields are not duplicated
        if len(fields) != len(set(fields)):
            msg = 'Fields cannot contain duplicates.'
            raise ValueError(msg)

        # ensure fields are legal
        illegal_fields = [x for x in fields if x not in self.LEGAL_FIELDS]
        if len(illegal_fields) > 0:
            msg = f'Illegal fields found: {illegal_fields}. '
            msg += f'Legal fields include: {self.LEGAL_FIELDS}.'
            raise ValueError(msg)

        # ensure extension is last field
        if 'extension' in fields and fields[-1] != 'extension':
            msg = 'Illegal field order: Extension field must be last if it is '
            msg += 'included in fields.'
            raise ValueError(msg)

        grammar = self._get_grammar()
        self._extension_parser = self._get_extension_parser(grammar)
        self._parser = self._get_parser(grammar, fields)

        # store a shallow copy so that later mutation of the caller's sequence
        # cannot put the field order out of sync with the parser built above
        self._fields = copy(fields)

    # GRAMMAR-------------------------------------------------------------------
    @staticmethod
    def _raise_field_error(field, part):
        # type: (str, str) -> Callable[[str, Any, Any, Any], None]
        '''
        A convenience function used for raising custom ParseExceptions.

        Args:
            field (str): Field.
            part (str): Part of field.

        Returns:
            function: Fail action that takes (text, location, expression,
                error) and raises a ParseException naming the field and part.
        '''
        def raise_error(text, instance):
            # type: (str, Any) -> None
            # wrapping elements expose the wrapped expression as "expr",
            # otherwise fall back to the element's "pattern" attribute
            if hasattr(instance, 'expr'):
                expr = instance.expr
            else:
                expr = instance.pattern

            msg = f'Illegal {field} field {part} in "{text}". '
            msg += f'Expecting: {expr}'
            raise ParseException(msg)

        return lambda text, loc, expr, err: raise_error(text, expr)

    @staticmethod
    def _get_grammar():
        # type: () -> Dict[str, Any]
        '''
        Create parser grammar dictionary.

        Each field key maps to its indicator followed by its token. The
        "specification_token" and "extension_token" keys map to the bare tokens
        and "field_separator" maps to the suppressed field separator.

        Returns:
            dict: Grammar.
        '''
        fail = AssetNameParser._raise_field_error

        def to_int(text, loc, tokens):
            # type: (str, int, Any) -> int
            # convert a zero padded token such as "003" to the integer 3
            return int(tokens[0])

        # tokens
        project = Regex(r'[a-z]{3,10}\d\d?\d?\d?') \
            .setResultsName('project') \
            .setFailAction(fail('project', 'token'))

        specification = Regex(r'[a-z]{3,4}\d\d\d') \
            .setResultsName('specification') \
            .setFailAction(fail('specification', 'token'))

        descriptor = Regex(r'[a-z0-9][a-z0-9-]*') \
            .setResultsName('descriptor') \
            .setFailAction(fail('descriptor', 'token'))

        version = Regex(r'\d{' + str(AssetNameParser.VERSION_PADDING) + '}') \
            .setParseAction(to_int) \
            .setResultsName('version') \
            .setFailAction(fail('version', 'token'))

        # a coordinate is one to three padded integers joined by TOKEN_SEPARATOR
        coord = Regex(r'\d{' + str(AssetNameParser.COORDINATE_PADDING) + '}') \
            .setParseAction(to_int)
        t_sep = Suppress(AssetNameParser.TOKEN_SEPARATOR)
        opt_coord = Optional(t_sep + coord)
        coordinate = Group(coord + opt_coord + opt_coord) \
            .setResultsName('coordinate') \
            .setFailAction(fail('coordinate', 'token'))

        frame = Regex(r'\d{' + str(AssetNameParser.FRAME_PADDING) + '}') \
            .setParseAction(to_int) \
            .setResultsName('frame') \
            .setFailAction(fail('frame', 'token'))

        extension = Regex(r'[a-zA-Z0-9]+$') \
            .setResultsName('extension') \
            .setFailAction(fail('extension', 'token'))
        # ----------------------------------------------------------------------

        # indicators
        project_indicator = Suppress(AssetNameParser.PROJECT_INDICATOR) \
            .setFailAction(fail('project', 'indicator'))

        specification_indicator = Suppress(AssetNameParser.SPECIFICATION_INDICATOR) \
            .setFailAction(fail('specification', 'indicator'))

        descriptor_indicator = Suppress(AssetNameParser.DESCRIPTOR_INDICATOR) \
            .setFailAction(fail('descriptor', 'indicator'))

        version_indicator = Suppress(AssetNameParser.VERSION_INDICATOR) \
            .setFailAction(fail('version', 'indicator'))

        coordinate_indicator = Suppress(AssetNameParser.COORDINATE_INDICATOR) \
            .setFailAction(fail('coordinate', 'indicator'))

        frame_indicator = Suppress(AssetNameParser.FRAME_INDICATOR) \
            .setFailAction(fail('frame', 'indicator'))

        extension_indicator = Suppress(AssetNameParser.EXTENSION_INDICATOR) \
            .setFailAction(fail('extension', 'indicator'))
        # ----------------------------------------------------------------------

        grammar = {
            'project': project_indicator + project,
            'specification': specification_indicator + specification,
            'specification_token': specification,
            'descriptor': descriptor_indicator + descriptor,
            'version': version_indicator + version,
            'coordinate': coordinate_indicator + coordinate,
            'frame': frame_indicator + frame,
            'extension': extension_indicator + extension,
            'extension_token': extension,
            'field_separator': Suppress(AssetNameParser.FIELD_SEPARATOR)
        }
        return grammar

    # PARSERS-------------------------------------------------------------------
    @staticmethod
    def _get_extension_parser(grammar):
        # type: (Dict[str, Any]) -> Group
        '''
        Creates a parser for file extensions.

        Args:
            grammar (dict): AssetNameParser grammar dictionary.

        Returns:
            Group: Parser.
        '''
        # optionally discard leading text, then match the extension token
        prefix = Optional(Suppress(Regex(r'.*\.|.*?')))
        return Group(prefix + grammar['extension_token'])

    @staticmethod
    def _get_parser(grammar, fields):
        # type: (Dict[str, Any], List[str]) -> Group
        '''
        Creates a parser for asset names.

        Args:
            grammar (dict): AssetNameParser grammar dictionary.
            fields (list[str]): List of fields.

        Returns:
            Group: Parser.
        '''
        parser = Suppress(Regex('^'))  # type: Any
        for i, field in enumerate(fields[:-1]):
            parser += grammar[field]
            # the extension carries its own "." indicator, so no field
            # separator is placed in front of it
            if fields[i + 1] != 'extension':
                parser += grammar['field_separator']
        parser += grammar[fields[-1]]
        parser += Suppress(Regex('$'))
        return Group(parser)

    @staticmethod
    def _get_specification_parser():
        # type: () -> Group
        '''
        Returns a parser for finding a specification within an arbitrary string.

        Returns:
            Group: Parser.
        '''
        grammar = AssetNameParser._get_grammar()
        # lazily skip everything up to and including the first indicator
        indicator = Suppress(Regex('.*?' + AssetNameParser.SPECIFICATION_INDICATOR))
        # the token must be followed by end of string or a non-digit
        terminator = Suppress(Regex(r'$|\D'))
        return Group(indicator + grammar['specification_token'] + terminator)

    # PUBLIC--------------------------------------------------------------------
    @staticmethod
    def parse_specification(text):
        # type: (str) -> Dict
        '''
        Parse a string for a specification.

        Args:
            text (str): String to be parsed.

        Raises:
            ParseException: If specification is not found.

        Returns:
            dict: Dictionary with "specification" key.
        '''
        parser = AssetNameParser._get_specification_parser()
        try:
            return parser.parseString(text)[0].asDict()
        except ParseException:
            # replace the field level error with a simpler message
            msg = f'Specification not found in "{text}".'
            raise ParseException(msg)

    def parse(self, text):
        # type: (str) -> dict
        '''
        Parse a given string.

        Args:
            text (str): String to be parsed.

        Raises:
            ParseException: If parse fails.

        Returns:
            dict: Metadata parsed from text, keyed by field name.
        '''
        # an extension only parser uses the dedicated extension grammar;
        # list() makes the comparison hold for tuples as well as lists
        if list(self._fields) == ['extension']:
            return self._extension_parser.parseString(text)[0].asDict()
        return self._parser.parseString(text)[0].asDict()

    def to_string(self, dict_):
        # type: (Dict) -> str
        '''
        Converts a given dictionary to a string.

        Fields missing from dict_ are skipped, except for extension, which must
        be present if this parser was created with an extension field.

        Args:
            dict_ (dict): Dictionary.

        Raises:
            KeyError: If extension is a parser field but is not in dict_.

        Returns:
            str: Asset name.
        '''
        # work on a list copy so the stored field order is never mutated
        fields = list(self._fields)
        has_extension = fields[-1] == 'extension'
        if has_extension:
            fields.pop()

        tokens = []  # type: List[str]
        for field in fields:
            if field not in dict_:
                continue

            indicator = getattr(self, field.upper() + '_INDICATOR')
            token = dict_[field]
            if field == 'version':
                token = str(token).zfill(self.VERSION_PADDING)

            elif field == 'coordinate':
                token = [str(x).zfill(self.COORDINATE_PADDING) for x in token]
                token = self.TOKEN_SEPARATOR.join(token)

            elif field == 'frame':
                token = str(token).zfill(self.FRAME_PADDING)

            tokens.append(indicator + token)
        output = self.FIELD_SEPARATOR.join(tokens)

        if has_extension:
            output += self.EXTENSION_INDICATOR + dict_['extension']
        return output