Coverage for /home/ubuntu/rolling-pin/python/rolling_pin/tools.py: 100%
193 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-13 19:35 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-13 19:35 +0000
1from typing import Any, Dict, Generator, Iterable, List, Optional, Union # noqa: F401
2import pydot # noqa: F401
4from collections import OrderedDict
5from pathlib import Path
6import logging
7import os
8import re
9import shutil
11from IPython.display import HTML, Image
12import pandas as pd
# Alias for values accepted as a filesystem path: plain string or Path.
Filepath = Union[str, Path]

# Log level name is taken from the LOG_LEVEL environment variable, falling
# back to WARNING, and is always upper-cased before being handed to logging.
LOG_LEVEL = os.getenv('LOG_LEVEL', 'WARNING').upper()
logging.basicConfig(level=LOG_LEVEL)
LOGGER = logging.getLogger(__name__)
18# ------------------------------------------------------------------------------
20'''
21Contains basic functions for more complex ETL functions and classes.
22'''
24# COLOR-SCHEME------------------------------------------------------------------
# Named hex colors for the background, nodes, edges and their fonts/values.
COLOR_SCHEME = {
    'background': '#242424',
    'node': '#343434',
    'node_font': '#B6ECF3',
    'node_value': '#343434',
    'node_value_font': '#DE958E',
    'edge': '#B6ECF3',
    'edge_value': '#DE958E',
    'node_library_font': '#DE958E',
    'node_subpackage_font': '#A0D17B',
    'node_module_font': '#B6ECF3',
    'edge_library': '#DE958E',
    'edge_subpackage': '#A0D17B',
    'edge_module': '#B6ECF3',
}  # type: Dict[str, str]
# Named hex colors of the plotly palette. Most hues come in two variants
# suffixed 1 and 2.
PLOTLY_COLOR_SCHEME = {
    'bg': '#242424',
    'blue1': '#5F95DE',
    'blue2': '#93B6E6',
    'cyan1': '#7EC4CF',
    'cyan2': '#B6ECF3',
    'dark1': '#040404',
    'dark2': '#141414',
    'dialog1': '#444459',
    'dialog2': '#5D5D7A',
    'green1': '#8BD155',
    'green2': '#A0D17B',
    'grey1': '#343434',
    'grey2': '#444444',
    'light1': '#A4A4A4',
    'light2': '#F4F4F4',
    'orange1': '#EB9E58',
    'orange2': '#EBB483',
    'purple1': '#C98FDE',
    'purple2': '#AC92DE',
    'red1': '#F77E70',
    'red2': '#DE958E',
    'yellow1': '#E8EA7E',
    'yellow2': '#E9EABE',
}
# Ordered list of hex colors, looked up by name in PLOTLY_COLOR_SCHEME.
COLOR_SCALE = [
    PLOTLY_COLOR_SCHEME[name] for name in (
        'cyan2', 'red2', 'green2', 'blue2', 'orange2', 'purple2', 'yellow2',
        'light2', 'cyan1', 'red1', 'green1', 'blue1',
    )
]  # type: List[str]
# Nested layout settings (legend, backgrounds, title and both axes) whose
# colors are all taken from PLOTLY_COLOR_SCHEME.
PLOTLY_LAYOUT_THEME = dict(
    legend=dict(
        bgcolor=PLOTLY_COLOR_SCHEME['bg'],
        title=dict(text=''),
        font=dict(color=PLOTLY_COLOR_SCHEME['light2']),
    ),
    paper_bgcolor=PLOTLY_COLOR_SCHEME['bg'],
    plot_bgcolor=PLOTLY_COLOR_SCHEME['bg'],
    title=dict(
        font=dict(color=PLOTLY_COLOR_SCHEME['light2']),
    ),
    xaxis=dict(
        gridcolor=PLOTLY_COLOR_SCHEME['grey1'],
        showgrid=True,
        tickfont=dict(color=PLOTLY_COLOR_SCHEME['light1']),
        title=dict(
            font=dict(color=PLOTLY_COLOR_SCHEME['light1']),
        ),
        zerolinecolor=PLOTLY_COLOR_SCHEME['grey2'],
    ),
    yaxis=dict(
        gridcolor=PLOTLY_COLOR_SCHEME['grey1'],
        showgrid=True,
        tickfont=dict(color=PLOTLY_COLOR_SCHEME['light1']),
        title=dict(
            font=dict(color=PLOTLY_COLOR_SCHEME['light1']),
        ),
        zerolinecolor=PLOTLY_COLOR_SCHEME['grey2'],
    ),
)
120# PREDICATE-FUNCTIONS-----------------------------------------------------------
def is_iterable(item):
    # type: (Any) -> bool
    '''
    Checks whether given item counts as iterable, which here means that it is
    either list-like or dict-like.

    Args:
        item (object): Object to be tested.

    Returns:
        bool: True if item is list-like or dict-like, False otherwise.
    '''
    return bool(is_listlike(item) or is_dictlike(item))
def is_dictlike(item):
    # type: (Any) -> bool
    '''
    Checks whether given item is dict-like, ie an instance of dict or
    OrderedDict whose class is not named 'Counter'.

    Args:
        item (object): Object to be tested.

    Returns:
        bool: True if item is dict-like, False otherwise.
    '''
    if not isinstance(item, (dict, OrderedDict)):
        return False
    # objects whose class is named Counter pass the isinstance check but are
    # explicitly rejected
    return item.__class__.__name__ != 'Counter'
def is_listlike(item):
    # type: (Any) -> bool
    '''
    Checks whether given item is list-like, ie an instance of list, tuple or
    set.

    Args:
        item (object): Object to be tested.

    Returns:
        bool: True if item is list-like, False otherwise.
    '''
    return isinstance(item, (list, tuple, set))
173# CORE-FUNCTIONS----------------------------------------------------------------
def flatten(item, separator='/', embed_types=True):
    # type: (Iterable, str, bool) -> Dict[str, Any]
    '''
    Flattens an iterable object into a flat dictionary.

    Dict-like and list-like values are walked recursively and the keys found
    along each path are joined with separator. Empty containers and
    non-iterable values are stored as leaf values. If item itself is neither
    list-like nor dict-like, an empty dictionary is returned.

    Example:
        >>> flatten({'a': {'b': [1, 2]}})
        {'a/b/<list_0>': 1, 'a/b/<list_1>': 2}

    Args:
        item (object): Iterable object.
        separator (str, optional): Field separator in keys. It is treated as
            a literal string, never as a regular expression. Default: '/'.
        embed_types (bool, optional): If True, elements of list-like objects
            are keyed as '<typename_index>', eg '<list_0>'. If False, they are
            keyed by their bare index. Default: True.

    Returns:
        dict: Dictionary representation of given object.
    '''
    output = {}  # type: Dict[str, Any]

    # Every full key is built as cursor + separator + key starting from an
    # empty cursor, so it always begins with exactly one separator. Slicing it
    # off (rather than using a regex built from separator) keeps separators
    # such as '|', '+' or '(' working.
    prefix_length = len(separator)

    def recurse(item, cursor):
        # type: (Iterable, Any) -> None
        if is_listlike(item):
            if embed_types:
                name = item.__class__.__name__
                item = {f'<{name}_{i}>': val for i, val in enumerate(item)}
            else:
                item = dict(enumerate(item))
        if is_dictlike(item):
            for key, val in item.items():
                new_key = f'{cursor}{separator}{str(key)}'
                if is_iterable(val) and len(val) > 0:
                    recurse(val, new_key)
                else:
                    output[new_key[prefix_length:]] = val

    recurse(item, '')
    return output
def nest(flat_dict, separator='/'):
    # type: (Dict[str, Any], str) -> Dict[str, Any]
    '''
    Builds a nested dictionary out of a flat one. Each key is split on the
    given separator and every field but the last becomes a level of nesting.

    Args:
        flat_dict (dict): Flat dictionary.
        separator (str, optional): Field separator within given dictionary's
            keys. Default: '/'.

    Raises:
        KeyError: If a parent field of one key already holds a non-dictionary
            value.

    Returns:
        dict: Nested dictionary.
    '''
    nested = {}  # type: Dict[str, Any]
    for flat_key, value in flat_dict.items():
        # empty fields, eg from leading or doubled separators, are dropped
        fields = [x for x in flat_key.split(separator) if x != '']
        parents, leaf = fields[:-1], fields[-1]

        branch = nested
        for field in parents:
            child = branch.setdefault(field, {})
            if not isinstance(child, dict):
                msg = f"Duplicate key conflict. Key: '{field}'."
                raise KeyError(msg)
            branch = child
        branch[leaf] = value
    return nested
def unembed(item):
    # type: (Any) -> Any
    '''
    Convert embedded types in dictionary keys into python types.

    A dictionary whose keys all have the form '<list_N>', '<tuple_N>' or
    '<set_N>' is converted into a list, tuple or set (the type named by its
    first key), with its values ordered by N. Any other non-empty dictionary
    is returned as a new dictionary with unembed applied to each value.
    Non-dictionary items and empty dictionaries are returned as they are.

    Dictionaries with non-string keys, with only some keys in embedded form,
    or with an embedded type name other than list, tuple or set are treated
    as ordinary dictionaries rather than raising an error.

    Args:
        item (object): Dictionary with embedded types.

    Returns:
        object: Converted object.
    '''
    lut = {'list': list, 'tuple': tuple, 'set': set}
    embed_re = re.compile(r'^<([a-z]+)_(\d+)>$')

    if not is_dictlike(item) or item == {}:
        return item

    keys = list(item.keys())
    # non-string keys cannot be embedded types and must not reach the regex
    matches = [
        embed_re.match(key) if isinstance(key, str) else None for key in keys
    ]
    first = matches[0]
    is_embedded = (
        first is not None and first.group(1) in lut and all(matches)
    )
    if not is_embedded:
        return {key: unembed(val) for key, val in item.items()}

    # order the values by the integer index embedded in each key
    indexed = sorted(
        (int(match.group(2)), key)  # type: ignore
        for match, key in zip(matches, keys)
    )
    values = [unembed(item[key]) for _, key in indexed]
    return lut[first.group(1)](values)  # type: ignore
282# FILE-FUNCTIONS----------------------------------------------------------------
def list_all_files(
    directory,           # type: Filepath
    include_regex=None,  # type: Optional[str]
    exclude_regex=None   # type: Optional[str]
):
    # type: (...) -> Generator[Path, None, None]
    '''
    Walks given directory recursively and yields every file found in it.

    Both regular expressions are searched against the absolute, posix-style
    path of each file. This is a generator, so nothing, including the
    directory check, happens before the first item is requested.

    Args:
        directory (str or Path): Directory to walk.
        include_regex (str, optional): Only yield files whose path matches
            this regex. Default: None.
        exclude_regex (str, optional): Skip files whose path matches this
            regex. Default: None.

    Raises:
        FileNotFoundError: If argument is not a directory or does not exist.

    Yields:
        Path: File.
    '''
    top = Path(directory)
    if not top.is_dir():
        raise FileNotFoundError(
            f'{top} is not a directory or does not exist.'
        )

    include_re = re.compile(include_regex or '')  # type: Any
    exclude_re = re.compile(exclude_regex or '')  # type: Any
    use_include = include_regex is not None
    use_exclude = exclude_regex is not None

    for root, _, filenames in os.walk(top):
        for filename in filenames:
            filepath = Path(root, filename)
            posix = filepath.absolute().as_posix()
            if use_include and not include_re.search(posix):
                continue
            if use_exclude and exclude_re.search(posix):
                continue
            yield filepath
def directory_to_dataframe(directory, include_regex='', exclude_regex=r'\.DS_Store'):
    # type: (Filepath, str, str) -> pd.DataFrame
    r'''
    Recursively lists the files within a given directory and returns them as
    a pd.DataFrame with the columns filepath, filename and extension.

    Args:
        directory (str or Path): Directory to walk.
        include_regex (str, optional): Include filenames that match this regex.
            Default: ''.
        exclude_regex (str, optional): Exclude filenames that match this regex.
            Default: '\.DS_Store'.

    Returns:
        pd.DataFrame: pd.DataFrame with one file per row, sorted by path.
    '''
    found = list_all_files(
        directory, include_regex=include_regex, exclude_regex=exclude_regex
    )  # type: Any
    paths = sorted(found)

    data = pd.DataFrame()
    data['filepath'] = paths
    data['filename'] = data['filepath'].apply(lambda x: x.name)
    # extension is stored without its leading dot
    data['extension'] = data['filepath'] \
        .apply(lambda x: Path(x).suffix.lstrip('.'))
    # filepath column is converted last, from Path to absolute posix string
    data['filepath'] = data['filepath'] \
        .apply(lambda x: x.absolute().as_posix())
    return data
def get_parent_fields(key, separator='/'):
    # type: (str, str) -> List[str]
    '''
    Splits given key by given separator and returns every proper prefix of
    its fields, shortest first, rejoined with the separator.

    Args:
        key (str): Key.
        separator (str, optional): String that splits key into fields.
            Default: '/'.

    Returns:
        list(str): List of absolute parent fields.
    '''
    fields = key.split(separator)
    return [
        separator.join(fields[:stop]) for stop in range(1, len(fields))
    ]
def filter_text(
    text,                # type: str
    include_regex=None,  # type: Optional[str]
    exclude_regex=None,  # type: Optional[str]
    replace_regex=None,  # type: Optional[str]
    replace_value=None,  # type: Optional[str]
):
    # type: (...) -> str
    '''
    Filters given text line by line with regular expressions. Lines are first
    kept by include_regex, then dropped by exclude_regex, then rewritten with
    replace_regex.

    Args:
        text (str): Newline separated lines.
        include_regex (str, optional): Keep lines that match given regex.
            Default: None.
        exclude_regex (str, optional): Remove lines that match given regex.
            Default: None.
        replace_regex (str, optional): Substitutes regex matches in lines with
            replace_value. Default: None.
        replace_value (str, optional): Regex substitution value. None is
            treated as ''. Default: None.

    Returns:
        str: Filtered text.
    '''
    lines = text.split('\n')

    if include_regex is not None:
        lines = [x for x in lines if re.search(include_regex, x)]

    if exclude_regex is not None:
        lines = [x for x in lines if not re.search(exclude_regex, x)]

    if replace_regex is not None:
        substitute = replace_value or ''
        lines = [re.sub(replace_regex, substitute, x) for x in lines]

    return '\n'.join(lines)
def read_text(filepath):
    # type: (Filepath) -> str
    '''
    Convenience function for reading text from given file.

    Args:
        filepath (str or Path): File to be read.

    Raises:
        AssertionError: If filepath is not a file.

    Returns:
        str: text.
    '''
    # raised explicitly because a bare assert statement is removed when python
    # runs with -O, which would silently skip this check
    if not Path(filepath).is_file():
        raise AssertionError(f'{filepath} is not a file.')
    with open(filepath) as f:
        return f.read()
def write_text(text, filepath):
    # type: (str, Filepath) -> None
    '''
    Writes given text to given file, overwriting it if it already exists.
    Missing parent directories are created first.

    Args:
        text (str): Text to be written.
        filepath (str or Path): File to be written.
    '''
    parent = Path(filepath).parent
    os.makedirs(parent, exist_ok=True)
    with open(filepath, 'w') as handle:
        handle.write(text)
def copy_file(source, target):
    # type: (Filepath, Filepath) -> None
    '''
    Copy a source file to a target file. Creating directories as needed.

    Args:
        source (str or Path): Source filepath.
        target (str or Path): Target filepath.

    Raises:
        AssertionError: If source is not a file.
    '''
    # raised explicitly because a bare assert statement is removed when python
    # runs with -O, which would silently skip this check
    if not Path(source).is_file():
        raise AssertionError(f'{source} is not a file.')
    os.makedirs(Path(target).parent, exist_ok=True)
    shutil.copy2(source, target)
def move_file(source, target):
    # type: (Filepath, Filepath) -> None
    '''
    Moves a source file to a target file. Creating directories as needed.

    Args:
        source (str or Path): Source filepath.
        target (str or Path): Target filepath.

    Raises:
        AssertionError: If source is not a file.
    '''
    src = Path(source).as_posix()
    # raised explicitly because a bare assert statement is removed when python
    # runs with -O, which would silently skip this check
    if not Path(src).is_file():
        raise AssertionError(f'{source} is not a file.')
    os.makedirs(Path(target).parent, exist_ok=True)
    shutil.move(src, target)
487# EXPORT-FUNCTIONS--------------------------------------------------------------
def dot_to_html(dot, layout='dot', as_png=False):
    # type: (pydot.Dot, str, bool) -> Union[HTML, Image]
    '''
    Converts a given pydot graph into a IPython.display.HTML object, or into
    a IPython.display.Image object if as_png is True.
    Used in jupyter lab inline display of graph data.

    Args:
        dot (pydot.Dot): Pydot Graph instance.
        layout (str, optional): Graph layout style. Applied to both SVG and
            PNG output.
            Options include: circo, dot, fdp, neato, sfdp, twopi.
            Default: dot.
        as_png (bool, optional): Display graph as a PNG image instead of SVG.
            Useful for display on Github. Default: False.

    Raises:
        ValueError: If invalid layout given.

    Returns:
        IPython.display.HTML or IPython.display.Image: HTML instance, or Image
            instance if as_png is True.
    '''
    layouts = ['circo', 'dot', 'fdp', 'neato', 'sfdp', 'twopi']
    if layout not in layouts:
        msg = f'Invalid layout value. {layout} not in {layouts}.'
        raise ValueError(msg)

    if as_png:
        # pass the layout along so PNG output honors it like SVG output does
        return Image(data=dot.create_png(prog=layout))

    svg = dot.create_svg(prog=layout)
    # NOTE(review): create_svg presumably returns bytes, so the f-string embeds
    # its repr; the substitutions below then strip escaped newlines, stray
    # backslashes and everything after the closing svg tag -- confirm
    html = f'<object type="image/svg+xml" data="data:image/svg+xml;{svg}"></object>'  # type: Any
    html = HTML(html)
    html.data = re.sub(r'\\n|\\', '', html.data)
    html.data = re.sub('</svg>.*', '</svg>', html.data)
    return html
def write_dot_graph(
    dot,
    fullpath,
    layout='dot',
):
    # type: (pydot.Dot, Union[str, Path], str) -> None
    '''
    Writes a pydot.Dot object to a given filepath. The output format is chosen
    by the file extension, matched case-insensitively.
    Formats supported: svg, dot, png.

    Args:
        dot (pydot.Dot): Pydot Dot instance.
        fullpath (str or Path): File to be written to.
        layout (str, optional): Graph layout style.
            Options include: circo, dot, fdp, neato, sfdp, twopi. Default: dot.

    Raises:
        ValueError: If invalid file extension given.
    '''
    if isinstance(fullpath, Path):
        fullpath = Path(fullpath).absolute().as_posix()

    extension = os.path.splitext(fullpath)[1]
    extension = re.sub(r'^\.', '', extension)

    # extension pattern -> name of the dot method that writes that format
    writers = [
        ('^svg$', 'write_svg'),
        ('^dot$', 'write_dot'),
        ('^png$', 'write_png'),
    ]
    for pattern, method in writers:
        if re.search(pattern, extension, re.I):
            getattr(dot, method)(fullpath, prog=layout)
            return

    msg = f'Invalid extension found: {extension}. '
    msg += 'Valid extensions include: svg, dot, png.'
    raise ValueError(msg)
560# MISC-FUNCTIONS----------------------------------------------------------------
def replace_and_format(regex, replace, string, flags=0):
    # type: (str, str, str, Any) -> str
    r'''
    Perform a regex substitution on a given string and format any named group
    found in the result with groupdict data from the pattern. Groups beginning
    with 'i' will be converted to integers. Groups beginning with 'f' will be
    converted to floats. Groups that did not take part in the match are left
    as None.

    The substituted string is evaluated as an f-string, so anything inside
    curly braces is executed as a python expression. Never pass regex, replace
    or string values that come from an untrusted source.

    ----------------------------------------------------------------------------

    Named group anatomy:
    ====================
    * (?P<NAME>PATTERN)
    * NAME becomes a key and whatever matches PATTERN becomes its value.
    >>> re.search('(?P<i>\d+)', 'foobar123').groupdict()
    {'i': '123'}

    ----------------------------------------------------------------------------

    Examples:
    =========
    Special groups:
        * (?P<i>\d) - string matched by '\d' will be converted to an integer
        * (?P<f>\d) - string matched by '\d' will be converted to an float
        * (?P<i_foo>\d) - string matched by '\d' will be converted to an integer
        * (?P<f_bar>\d) - string matched by '\d' will be converted to an float

    Named groups (long):
        >>> proj = '(?P<p>[a-z0-9]+)'
        >>> spec = '(?P<s>[a-z0-9]+)'
        >>> desc = '(?P<d>[a-z0-9\-]+)'
        >>> ver = '(?P<iv>\d+)'
        >>> frame = '(?P<i_f>\d+)'
        >>> regex = f'{proj}\.{spec}\.{desc}\.v{ver}\.{frame}.*'
        >>> replace = 'p-{p}_s-{s}_d-{d}_v{iv:03d}_f{i_f:04d}.jpeg'
        >>> string = 'proj.spec.desc.v1.25.png'
        >>> replace_and_format(regex, replace, string, flags=re.IGNORECASE)
        p-proj_s-spec_d-desc_v001_f0025.jpeg

    Named groups (short):
        >>> replace_and_format(
            '(?P<p>[a-z0-9]+)\.(?P<s>[a-z0-9]+)\.(?P<d>[a-z0-9\-]+)\.v(?P<iv>\d+)\.(?P<i_f>\d+).*',
            'p-{p}_s-{s}_d-{d}_v{iv:03d}_f{i_f:04d}.jpeg',
            'proj.spec.desc.v1.25.png',
        )
        p-proj_s-spec_d-desc_v001_f0025.jpeg

    No groups:
        >>> replace_and_format('foo', 'bar', 'foobar')
        barbar

    ----------------------------------------------------------------------------

    Args:
        regex (str): Regex pattern to search string with.
        replace (str): Replacement string which may contain format variables
            ie '{variable}'.
        string (str): String to be converted.
        flags (object, optional): re.sub flags. Default: 0.

    Returns:
        str: Converted string.
    '''
    match = re.search(regex, string, flags=flags)
    grp = {}  # type: Dict[str, Any]
    if match:
        for key, val in match.groupdict().items():
            if val is None:
                # optional group that did not take part in the match
                grp[key] = val
            elif key.startswith('f'):
                grp[key] = float(val)
            elif key.startswith('i'):
                grp[key] = int(val)
            else:
                grp[key] = val

    output = re.sub(regex, replace, string, flags=flags)
    # .format won't evaluate math expressions so do this
    # SECURITY: eval runs whatever expressions appear inside curly braces, so
    # all arguments must come from trusted sources.
    # repr() quotes and escapes the text safely, so quotes, newlines and
    # backslashes in it can neither break the f-string nor be reinterpreted.
    if grp != {}:
        output = eval('f' + repr(output), None, grp)
    return output