Coverage for /home/ubuntu/rolling-pin/python/rolling_pin/tools.py: 100%
190 statements
« prev ^ index » next coverage.py v7.1.0, created at 2023-11-15 00:43 +0000
« prev ^ index » next coverage.py v7.1.0, created at 2023-11-15 00:43 +0000
1from typing import Any, Dict, Generator, Iterable, List, Optional, Union # noqa: F401
2import pydot # noqa: F401
4from collections import OrderedDict
5from pathlib import Path
6import logging
7import os
8import re
9import shutil
11from IPython.display import HTML, Image
12import pandas as pd
# Anything this module accepts as a filesystem path.
Filepath = Union[str, Path]

# Log level is taken from the LOG_LEVEL environment variable (upper-cased),
# falling back to WARNING when the variable is not set.
LOG_LEVEL = os.getenv('LOG_LEVEL', 'WARNING').upper()
logging.basicConfig(level=LOG_LEVEL)
LOGGER = logging.getLogger(__name__)
18# ------------------------------------------------------------------------------
20'''
21Contains basic functions for more complex ETL functions and classes.
22'''
24# COLOR-SCHEME------------------------------------------------------------------
# Named hex color values, keyed by the role each color plays.
COLOR_SCHEME = {
    'background': '#242424',
    'node': '#343434',
    'node_font': '#B6ECF3',
    'node_value': '#343434',
    'node_value_font': '#DE958E',
    'edge': '#B6ECF3',
    'edge_value': '#DE958E',
    'node_library_font': '#DE958E',
    'node_subpackage_font': '#A0D17B',
    'node_module_font': '#B6ECF3',
    'edge_library': '#DE958E',
    'edge_subpackage': '#A0D17B',
    'edge_module': '#B6ECF3',
}  # type: Dict[str, str]

# Ordered sequence of hex color values.
COLOR_SCALE = [
    '#B6ECF3',
    '#DE958E',
    '#EBB483',
    '#A0D17B',
    '#93B6E6',
    '#AC92DE',
    '#E9EABE',
    '#7EC4CF',
    '#F77E70',
    '#EB9E58',
]  # type: List[str]
55# PREDICATE-FUNCTIONS-----------------------------------------------------------
def is_iterable(item):
    # type: (Any) -> bool
    '''
    Determines if given item counts as iterable, ie it is either list-like or
    dict-like.

    Args:
        item (object): Object to be tested.

    Returns:
        bool: Whether given item is iterable.
    '''
    return is_listlike(item) or is_dictlike(item)
def is_dictlike(item):
    # type: (Any) -> bool
    '''
    Determines if given item is dict-like.

    An item is dict-like when it is an instance of dict or OrderedDict and its
    class is not named 'Counter'.

    Args:
        item (object): Object to be tested.

    Returns:
        bool: Whether given item is dict-like.
    '''
    if not isinstance(item, (dict, OrderedDict)):
        return False
    # Counter instances pass the isinstance check but are explicitly excluded
    return item.__class__.__name__ != 'Counter'
def is_listlike(item):
    # type: (Any) -> bool
    '''
    Determines if given item is list-like, ie an instance of list, tuple or
    set.

    Args:
        item (object): Object to be tested.

    Returns:
        bool: Whether given item is list-like.
    '''
    return isinstance(item, (list, tuple, set))
108# CORE-FUNCTIONS----------------------------------------------------------------
def flatten(item, separator='/', embed_types=True):
    # type: (Iterable, str, bool) -> Dict[str, Any]
    '''
    Flattens a iterable object into a flat dictionary.

    Nested keys are joined with the given separator. Values that are empty
    list-like or dict-like objects are kept as leaf values.

    Args:
        item (object): Iterable object.
        separator (str, optional): Field separator in keys. Default: '/'.
        embed_types (bool, optional): If True, elements of list-like objects
            are keyed as '<type_index>', eg '<list_0>'. If False, they are
            keyed by their integer index. Default: True.

    Returns:
        dict: Dictionary representation of given object.
    '''
    output = {}  # type: Dict[str, Any]

    def recurse(item, cursor):
        # type: (Iterable, Any) -> None
        if is_listlike(item):
            if embed_types:
                name = item.__class__.__name__
                item = [(f'<{name}_{i}>', val) for i, val in enumerate(item)]
                item = dict(item)
            else:
                item = dict(enumerate(item))
        if is_dictlike(item):
            for key, val in item.items():
                new_key = f'{cursor}{separator}{str(key)}'
                if is_iterable(val) and len(val) > 0:
                    recurse(val, new_key)
                else:
                    # new_key always begins with the separator, because the
                    # root cursor is ''. Remove that prefix literally rather
                    # than through a regex, so separators containing regex
                    # metacharacters ('|', '+', '*', '\\', ...) work.
                    final_key = new_key
                    if final_key.startswith(separator):
                        final_key = final_key[len(separator):]
                    output[final_key] = val

    recurse(item, '')
    return output
def nest(flat_dict, separator='/'):
    # type: (Dict[str, Any], str) -> Dict[str, Any]
    '''
    Builds a nested dictionary out of a flat one by splitting each key on the
    given separator. Empty fields produced by the split are ignored.

    Args:
        flat_dict (dict): Flat dictionary.
        separator (str, optional): Field separator within given dictionary's
            keys. Default: '/'.

    Raises:
        KeyError: If a parent field of a key already holds a non-dict value.

    Returns:
        dict: Nested dictionary.
    '''
    nested = {}  # type: Dict[str, Any]
    for flat_key, value in flat_dict.items():
        fields = [field for field in flat_key.split(separator) if field != '']
        leaf = fields.pop()

        # walk down (creating as needed) the chain of parent dictionaries
        branch = nested
        for field in fields:
            child = branch.setdefault(field, {})
            if not isinstance(child, dict):
                msg = f"Duplicate key conflict. Key: '{field}'."
                raise KeyError(msg)
            branch = child

        branch[leaf] = value
    return nested
def unembed(item):
    # type: (Any) -> Any
    '''
    Convert embedded types in dictionary keys into python types.

    A non-empty dictionary whose first key has the form '<type_index>' (eg
    '<list_0>') is rebuilt as a list, tuple or set with its values ordered by
    index. Any other non-empty dictionary has each of its values converted
    recursively. Everything else is returned unchanged.

    Args:
        item (object): Dictionary with embedded types.

    Returns:
        object: Converted object.
    '''
    lut = {'list': list, 'tuple': tuple, 'set': set}
    embed_re = re.compile(r'^<([a-z]+)_(\d+)>$')

    if not (is_dictlike(item) and item != {}):
        return item

    keys = list(item.keys())
    match = embed_re.match(keys[0])
    if not match:
        # plain dictionary: convert each value
        return {key: unembed(val) for key, val in item.items()}

    # embedded sequence: order values by the index embedded in each key
    indices = [int(embed_re.match(key).group(2)) for key in keys]  # type: ignore
    elements = []
    for _, key in sorted(zip(indices, keys)):
        element = item[key]
        if is_dictlike(element):
            element = unembed(element)
        elements.append(element)

    return lut[match.group(1)](elements)
217# FILE-FUNCTIONS----------------------------------------------------------------
def list_all_files(
    directory,  # type: Filepath
    include_regex=None,  # type: Optional[str]
    exclude_regex=None  # type: Optional[str]
):
    # type: (...) -> Generator[Path, None, None]
    '''
    Recursively list all files within a given directory.

    Both regular expressions are searched for within each file's absolute,
    posix-style path. As this is a generator, the directory check runs when
    iteration starts.

    Args:
        directory (str or Path): Directory to walk.
        include_regex (str, optional): Include filenames that match this regex.
            Default: None.
        exclude_regex (str, optional): Exclude filenames that match this regex.
            Default: None.

    Raises:
        FileNotFoundError: If argument is not a directory or does not exist.

    Yields:
        Path: File.
    '''
    root_dir = Path(directory)
    if not root_dir.is_dir():
        msg = f'{root_dir} is not a directory or does not exist.'
        raise FileNotFoundError(msg)

    include_re = re.compile(include_regex or '')  # type: Any
    exclude_re = re.compile(exclude_regex or '')  # type: Any

    for root, _, filenames in os.walk(root_dir):
        for filename in filenames:
            filepath = Path(root, filename)
            posix = filepath.absolute().as_posix()

            # skip files rejected by either filter
            if include_regex is not None and not include_re.search(posix):
                continue
            if exclude_regex is not None and exclude_re.search(posix):
                continue

            yield filepath
def directory_to_dataframe(directory, include_regex='', exclude_regex=r'\.DS_Store'):
    # type: (Filepath, str, str) -> pd.DataFrame
    r'''
    Recursively list files within a given directory as rows in a pd.DataFrame.

    The resulting table has three columns: filepath (absolute, posix-style
    string), filename and extension (suffix without its leading dot).

    Args:
        directory (str or Path): Directory to walk.
        include_regex (str, optional): Include filenames that match this regex.
            Default: ''.
        exclude_regex (str, optional): Exclude filenames that match this regex.
            Default: '\.DS_Store'.

    Returns:
        pd.DataFrame: pd.DataFrame with one file per row.
    '''
    found = list_all_files(
        directory,
        include_regex=include_regex,
        exclude_regex=exclude_regex,
    )  # type: Any
    filepaths = sorted(found)

    table = pd.DataFrame()
    table['filepath'] = filepaths
    table['filename'] = table['filepath'].apply(lambda path: path.name)
    table['extension'] = table['filepath'] \
        .apply(lambda path: Path(path).suffix.lstrip('.'))
    # paths are converted to strings last, after name and suffix are extracted
    table['filepath'] = table['filepath'] \
        .apply(lambda path: path.absolute().as_posix())
    return table
def get_parent_fields(key, separator='/'):
    # type: (str, str) -> List[str]
    '''
    Lists every parent field of a given key, where fields are delimited by the
    given separator. Each parent is expressed as the full prefix of the key up
    to that field.

    Args:
        key (str): Key.
        separator (str, optional): String that splits key into fields.
            Default: '/'.

    Returns:
        list(str): List of absolute parent fields.
    '''
    fields = key.split(separator)
    # every proper prefix of the field list, shortest first
    return [separator.join(fields[:end]) for end in range(1, len(fields))]
def filter_text(
    text,  # type: str
    include_regex=None,  # type: Optional[str]
    exclude_regex=None,  # type: Optional[str]
    replace_regex=None,  # type: Optional[str]
    replace_value=None,  # type: Optional[str]
):
    # type: (...) -> str
    '''
    Filter given text by applying regular expressions to each line.

    Filters are applied in this order: include, exclude, replace.

    Args:
        text (str): Newline separated lines.
        include_regex (str, optional): Keep lines that match given regex.
            Default: None.
        exclude_regex (str, optional): Remove lines that match given regex.
            Default: None.
        replace_regex (str, optional): Substitutes regex matches in lines with
            replace_value. Default: None.
        replace_value (str, optional): Regex substitution value. None is
            treated as ''. Default: None.

    Returns:
        str: Filtered text.
    '''
    lines = text.split('\n')

    if include_regex is not None:
        lines = [line for line in lines if re.search(include_regex, line)]

    if exclude_regex is not None:
        lines = [line for line in lines if not re.search(exclude_regex, line)]

    if replace_regex is not None:
        substitute = replace_value or ''
        lines = [re.sub(replace_regex, substitute, line) for line in lines]

    return '\n'.join(lines)
def read_text(filepath):
    # type: (Filepath) -> str
    '''
    Convenience function for reading text from given file.

    Args:
        filepath (str or Path): File to be read.

    Raises:
        AssertionError: If source is not a file.

    Returns:
        str: text.
    '''
    # Raised explicitly (rather than via an assert statement) so that the
    # check is not stripped when Python runs with -O.
    if not Path(filepath).is_file():
        raise AssertionError(f'{filepath} is not a file.')
    with open(filepath) as f:
        return f.read()
def write_text(text, filepath):
    # type: (str, Filepath) -> None
    '''
    Convenience function for writing text to given file.
    Any missing parent directories are created first.

    Args:
        text (str): Text to be written.
        filepath (str or Path): File to be written.
    '''
    parent = Path(filepath).parent
    os.makedirs(parent, exist_ok=True)
    with open(filepath, 'w') as handle:
        handle.write(text)
def copy_file(source, target):
    # type: (Filepath, Filepath) -> None
    '''
    Copy a source file to a target file. Creating directories as needed.

    Args:
        source (str or Path): Source filepath.
        target (str or Path): Target filepath.

    Raises:
        AssertionError: If source is not a file.
    '''
    # Raised explicitly (rather than via an assert statement) so that the
    # check is not stripped when Python runs with -O.
    if not Path(source).is_file():
        raise AssertionError(f'{source} is not a file.')
    os.makedirs(Path(target).parent, exist_ok=True)
    shutil.copy2(source, target)
def move_file(source, target):
    # type: (Filepath, Filepath) -> None
    '''
    Moves a source file to a target file. Creating directories as needed.

    Args:
        source (str or Path): Source filepath.
        target (str or Path): Target filepath.

    Raises:
        AssertionError: If source is not a file.
    '''
    # shutil.move is handed a plain posix string instead of a Path
    # NOTE(review): presumably a workaround for Python versions whose
    # shutil.move mishandled Path sources - confirm before removing.
    src = Path(source).as_posix()

    # Raised explicitly (rather than via an assert statement) so that the
    # check is not stripped when Python runs with -O.
    if not Path(src).is_file():
        raise AssertionError(f'{source} is not a file.')
    os.makedirs(Path(target).parent, exist_ok=True)
    shutil.move(src, target)
422# EXPORT-FUNCTIONS--------------------------------------------------------------
def dot_to_html(dot, layout='dot', as_png=False):
    # type: (pydot.Dot, str, bool) -> Union[HTML, Image]
    '''
    Converts a given pydot graph into a IPython.display.HTML object, or into a
    IPython.display.Image object if as_png is True.
    Used in jupyter lab inline display of graph data.

    Args:
        dot (pydot.Dot): Pydot Graph instance.
        layout (str, optional): Graph layout style.
            Options include: circo, dot, fdp, neato, sfdp, twopi.
            Default: dot.
        as_png (bool, optional): Display graph as a PNG image instead of SVG.
            Useful for display on Github. Default: False.

    Raises:
        ValueError: If invalid layout given.

    Returns:
        IPython.display.HTML or IPython.display.Image: HTML instance, or Image
            instance if as_png is True.
    '''
    layouts = ['circo', 'dot', 'fdp', 'neato', 'sfdp', 'twopi']
    if layout not in layouts:
        msg = f'Invalid layout value. {layout} not in {layouts}.'
        raise ValueError(msg)

    if as_png:
        # pass the layout along so PNG output honours it the same way the SVG
        # output below does
        return Image(data=dot.create_png(prog=layout))

    svg = dot.create_svg(prog=layout)
    html = f'<object type="image/svg+xml" data="data:image/svg+xml;{svg}"></object>'  # type: Any
    html = HTML(html)
    # strip escaped newlines and backslashes left by formatting svg into a str
    html.data = re.sub(r'\\n|\\', '', html.data)
    # drop everything on the line after the closing svg tag
    html.data = re.sub('</svg>.*', '</svg>', html.data)
    return html
def write_dot_graph(
    dot,
    fullpath,
    layout='dot',
):
    # type: (pydot.Dot, Union[str, Path], str) -> None
    '''
    Writes a pydot.Dot object to a given filepath.
    The output format is chosen by the file extension, matched without regard
    to case. Formats supported: svg, dot, png.

    Args:
        dot (pydot.Dot): Pydot Dot instance.
        fullpath (str or Path): File to be written to.
        layout (str, optional): Graph layout style.
            Options include: circo, dot, fdp, neato, sfdp, twopi. Default: dot.

    Raises:
        ValueError: If invalid file extension given.
    '''
    if isinstance(fullpath, Path):
        fullpath = Path(fullpath).absolute().as_posix()

    ext = os.path.splitext(fullpath)[1]
    ext = re.sub(r'^\.', '', ext)

    # extension pattern -> name of the pydot method that writes that format
    writers = (
        ('^svg$', 'write_svg'),
        ('^dot$', 'write_dot'),
        ('^png$', 'write_png'),
    )
    for pattern, method in writers:
        if re.search(pattern, ext, re.I):
            getattr(dot, method)(fullpath, prog=layout)
            return

    msg = f'Invalid extension found: {ext}. '
    msg += 'Valid extensions include: svg, dot, png.'
    raise ValueError(msg)
495# MISC-FUNCTIONS----------------------------------------------------------------
def replace_and_format(regex, replace, string, flags=0):
    # type: (str, str, str, Any) -> str
    r'''
    Perform a regex substitution on a given string and format any named group
    found in the result with groupdict data from the pattern. Groups beginning
    with 'i' will be converted to integers. Groups beginning with 'f' will be
    converted to floats.

    The substituted string is evaluated as an f-string, so replacement fields
    may contain expressions, eg '{i + 1:03d}'. Do not pass untrusted regex,
    replace or string arguments.

    ----------------------------------------------------------------------------

    Named group anatomy:
    ====================
    * (?P<NAME>PATTERN)
    * NAME becomes a key and whatever matches PATTERN becomes its value.
    >>> re.search('(?P<i>\d+)', 'foobar123').groupdict()
    {'i': '123'}

    ----------------------------------------------------------------------------

    Examples:
    =========
    Special groups:
        * (?P<i>\d) - string matched by '\d' will be converted to an integer
        * (?P<f>\d) - string matched by '\d' will be converted to an float
        * (?P<i_foo>\d) - string matched by '\d' will be converted to an integer
        * (?P<f_bar>\d) - string matched by '\d' will be converted to an float

    Named groups (long):
        >>> proj = '(?P<p>[a-z0-9]+)'
        >>> spec = '(?P<s>[a-z0-9]+)'
        >>> desc = '(?P<d>[a-z0-9\-]+)'
        >>> ver = '(?P<iv>\d+)'
        >>> frame = '(?P<i_f>\d+)'
        >>> regex = f'{proj}\.{spec}\.{desc}\.v{ver}\.{frame}.*'
        >>> replace = 'p-{p}_s-{s}_d-{d}_v{iv:03d}_f{i_f:04d}.jpeg'
        >>> string = 'proj.spec.desc.v1.25.png'
        >>> replace_and_format(regex, replace, string, flags=re.IGNORECASE)
        p-proj_s-spec_d-desc_v001_f0025.jpeg

    Named groups (short):
        >>> replace_and_format(
            '(?P<p>[a-z0-9]+)\.(?P<s>[a-z0-9]+)\.(?P<d>[a-z0-9\-]+)\.v(?P<iv>\d+)\.(?P<i_f>\d+).*',
            'p-{p}_s-{s}_d-{d}_v{iv:03d}_f{i_f:04d}.jpeg',
            'proj.spec.desc.v1.25.png',
        )
        p-proj_s-spec_d-desc_v001_f0025.jpeg

    No groups:
        >>> replace_and_format('foo', 'bar', 'foobar')
        barbar

    ----------------------------------------------------------------------------

    Args:
        regex (str): Regex pattern to search string with.
        replace (str): Replacement string which may contain format variables
            ie '{variable}'.
        string (str): String to be converted.
        flags (object, optional): re.sub flags. Default: 0.

    Returns:
        str: Converted string.
    '''
    # named groups are taken from the first match only
    match = re.search(regex, string, flags=flags)
    grp = {}  # type: Dict[str, Any]
    if match:
        for key, val in match.groupdict().items():
            if key.startswith('f'):
                grp[key] = float(val)
            elif key.startswith('i'):
                grp[key] = int(val)
            else:
                grp[key] = val

    output = re.sub(regex, replace, string, flags=flags)
    # .format won't evaluate math expressions so do this
    if grp != {}:
        # NOTE(security): eval executes whatever expressions end up inside the
        # braces of output; callers must not feed untrusted input through here.
        # repr() builds a correctly quoted and escaped literal, so quotes,
        # backslashes and newlines in output no longer break the f-string.
        output = eval('f' + repr(output), None, grp)
    return output