Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from typing import Any, Dict, Iterable, List, Union
2import pydot
4import logging
5import os
6import re
7from collections import OrderedDict
8from pathlib import Path
10from IPython.display import HTML, Image
# Logging verbosity is read from the LOG_LEVEL environment variable
# (upper-cased before use) and falls back to WARNING when it is unset.
LOG_LEVEL = os.getenv('LOG_LEVEL', 'WARNING').upper()
logging.basicConfig(level=LOG_LEVEL)

# Module-level logger.
LOGGER = logging.getLogger(__name__)
15# ------------------------------------------------------------------------------
17'''
18Contains basic functions for more complex ETL functions and classes.
19'''
21# COLOR-SCHEME------------------------------------------------------------------
# Named colors, as hex strings, keyed by the graph element they apply to.
COLOR_SCHEME = {
    'background': '#242424',
    'node': '#343434',
    'node_font': '#B6ECF3',
    'node_value': '#343434',
    'node_value_font': '#DE958E',
    'edge': '#B6ECF3',
    'edge_value': '#DE958E',
    'node_library_font': '#DE958E',
    'node_subpackage_font': '#A0D17B',
    'node_module_font': '#B6ECF3',
    'edge_library': '#DE958E',
    'edge_subpackage': '#A0D17B',
    'edge_module': '#B6ECF3',
}  # type: Dict[str, str]
# Ordered palette of ten hex color strings.
COLOR_SCALE = [
    '#B6ECF3', '#DE958E', '#EBB483', '#A0D17B', '#93B6E6',
    '#AC92DE', '#E9EABE', '#7EC4CF', '#F77E70', '#EB9E58',
]  # type: List[str]
52# PREDICATE-FUNCTIONS-----------------------------------------------------------
def is_iterable(item):
    # type: (Any) -> bool
    '''
    Checks whether given item is a container this module treats as iterable,
    ie one that is either list-like or dict-like.

    Args:
        item (object): Object to be tested.

    Returns:
        bool: True if item is list-like or dict-like, False otherwise.
    '''
    return bool(is_listlike(item) or is_dictlike(item))
def is_dictlike(item):
    # type: (Any) -> bool
    '''
    Checks whether given item should be treated as a dictionary.

    Instances of dict and OrderedDict qualify, except for those whose class is
    named 'Counter', which are rejected.

    Args:
        item (object): Object to be tested.

    Returns:
        bool: True if item is dict-like, False otherwise.
    '''
    if not isinstance(item, (dict, OrderedDict)):
        return False

    # Counters are dict instances but are deliberately not considered dict-like.
    return item.__class__.__name__ != 'Counter'
def is_listlike(item):
    # type: (Any) -> bool
    '''
    Checks whether given item is an instance of list, tuple or set.

    Args:
        item (object): Object to be tested.

    Returns:
        bool: True if item is list-like, False otherwise.
    '''
    return isinstance(item, (list, tuple, set))
105# CORE-FUNCTIONS----------------------------------------------------------------
def flatten(item, separator='/', embed_types=True):
    # type: (Iterable, str, bool) -> Dict[str, Any]
    '''
    Flattens an iterable object into a flat dictionary.

    Keys of nested containers are joined with the given separator. Empty
    containers and non-iterable values are stored as leaf values. If the given
    item is neither list-like nor dict-like, an empty dictionary is returned.

    Args:
        item (object): Iterable object.
        separator (str, optional): Field separator in keys. Default: '/'.
        embed_types (bool, optional): If True, elements of list-like objects
            are keyed as '<type_index>' (eg: '<list_0>'), which records the
            container type in the key. If False, they are keyed by their bare
            index. Default: True.

    Returns:
        dict: Dictionary representation of given object.
    '''
    output = {}  # type: Dict[str, Any]

    # Every key is built with a leading separator that must be stripped from
    # the final key. The separator is escaped so that regex metacharacters
    # (eg: '|', '+', '(') are matched literally instead of silently failing to
    # strip or raising re.error.
    leading_separator = re.compile('^' + re.escape(separator))

    def recurse(item, cursor):
        # type: (Iterable, Any) -> None
        if is_listlike(item):
            # convert list-like objects to dicts keyed by element position
            if embed_types:
                name = item.__class__.__name__
                item = {f'<{name}_{i}>': val for i, val in enumerate(item)}
            else:
                item = dict(enumerate(item))

        if is_dictlike(item):
            for key, val in item.items():
                new_key = f'{cursor}{separator}{str(key)}'
                if is_iterable(val) and len(val) > 0:
                    recurse(val, new_key)
                else:
                    final_key = leading_separator.sub('', new_key)
                    output[final_key] = val

    recurse(item, '')
    return output
def nest(flat_dict, separator='/'):
    # type: (Dict[str, Any], str) -> Dict[str, Any]
    '''
    Builds a nested dictionary out of a flat one, by splitting each key on the
    given separator and using the resulting fields as a path of nested keys.
    Empty fields (eg: from leading, trailing or doubled separators) are ignored.

    Args:
        flat_dict (dict): Flat dictionary.
        separator (str, optional): Field separator within given dictionary's
            keys. Default: '/'.

    Raises:
        KeyError: If a parent field of one key is already assigned a
            non-dictionary value.

    Returns:
        dict: Nested dictionary.
    '''
    output = {}  # type: Dict[str, Any]
    for flat_key, val in flat_dict.items():
        fields = [field for field in flat_key.split(separator) if field != '']

        # the final field holds the value, all preceding fields are parents
        leaf = fields.pop()

        cursor = output
        for field in fields:
            child = cursor.setdefault(field, {})
            if not isinstance(child, dict):
                msg = f"Duplicate key conflict. Key: '{field}'."
                raise KeyError(msg)
            cursor = child

        cursor[leaf] = val
    return output
def unembed(item):
    # type: (Any) -> Any
    '''
    Convert embedded types in dictionary keys into python types.

    A non-empty dictionary is converted into a list, tuple or set when every
    one of its keys is a string of the form '<type_index>' (eg: '<list_0>') and
    the type named by the first key is list, tuple or set. Its values are
    ordered by index and recursively converted. Any other dictionary is
    returned as a new dictionary with recursively converted values.
    Non-dictionary items and empty dictionaries are returned unchanged.

    Args:
        item (object): Dictionary with embedded types.

    Returns:
        object: Converted object.
    '''
    lut = {'list': list, 'tuple': tuple, 'set': set}
    embed_re = re.compile(r'^<([a-z]+)_(\d+)>$')

    if not is_dictlike(item) or item == {}:
        return item

    keys = list(item.keys())

    # Validate every key, not just the first. Non-string keys, keys that do not
    # follow the embed pattern and unknown type names all mean that this is a
    # regular dictionary rather than an embedded list-like object.
    matches = [
        embed_re.match(key) if isinstance(key, str) else None for key in keys
    ]
    is_embedded = all(match is not None for match in matches) \
        and matches[0].group(1) in lut  # type: ignore

    if not is_embedded:
        return {key: unembed(val) for key, val in item.items()}

    # restore element order from the integer index embedded in each key
    indices = [int(match.group(2)) for match in matches]  # type: ignore
    elements = [unembed(item[key]) for _, key in sorted(zip(indices, keys))]

    # the container type is taken from the first key
    return lut[matches[0].group(1)](elements)  # type: ignore
214# FILE-FUNCTIONS----------------------------------------------------------------
def list_all_files(directory):
    # type: (Union[str, Path]) -> List[Path]
    '''
    Walks given directory and all of its subdirectories, collecting the path of
    every file found.

    Args:
        directory (str or Path): Directory to be recursed.

    Returns:
        list[Path]: List of filepaths.
    '''
    return [
        Path(root, filename)
        for root, _, filenames in os.walk(directory)
        for filename in filenames
    ]
def get_parent_fields(key, separator='/'):
    # type: (str, str) -> List[str]
    '''
    Splits given key by given separator and returns every proper prefix of it,
    from shortest to longest, each rejoined with the separator.

    Args:
        key (str): Key.
        separator (str, optional): String that splits key into fields.
            Default: '/'.

    Returns:
        list(str): List of absolute parent fields.
    '''
    fields = key.split(separator)

    # prefix lengths stop short of len(fields), so the key itself is excluded
    return [separator.join(fields[:stop]) for stop in range(1, len(fields))]
254# EXPORT-FUNCTIONS--------------------------------------------------------------
def dot_to_html(dot, layout='dot', as_png=False):
    # type: (pydot.Dot, str, bool) -> Union[HTML, Image]
    '''
    Converts a given pydot graph into a IPython.display.HTML object, or into
    a IPython.display.Image object if as_png is True.
    Used in jupyter lab inline display of graph data.

    Args:
        dot (pydot.Dot): Pydot Graph instance.
        layout (str, optional): Graph layout style.
            Options include: circo, dot, fdp, neato, sfdp, twopi.
            Default: dot.
        as_png (bool, optional): Display graph as a PNG image instead of SVG.
            Useful for display on Github. Default: False.

    Raises:
        ValueError: If invalid layout given.

    Returns:
        IPython.display.HTML or IPython.display.Image: HTML instance
            containing SVG markup, or Image instance containing PNG data if
            as_png is True.
    '''
    layouts = ['circo', 'dot', 'fdp', 'neato', 'sfdp', 'twopi']
    if layout not in layouts:
        msg = f'Invalid layout value. {layout} not in {layouts}.'
        raise ValueError(msg)

    if as_png:
        # render with the requested layout, same as the SVG path below
        return Image(data=dot.create_png(prog=layout))

    svg = dot.create_svg(prog=layout)
    html = f'<object type="image/svg+xml" data="data:image/svg+xml;{svg}"></object>'  # type: Any
    html = HTML(html)

    # Remove literal backslash-n sequences and any remaining backslashes.
    # NOTE(review): presumably these come from interpolating the repr of the
    # bytes returned by create_svg -- confirm.
    html.data = re.sub(r'\\n|\\', '', html.data)

    # drop everything that follows the closing svg tag
    html.data = re.sub('</svg>.*', '</svg>', html.data)
    return html
def write_dot_graph(
    dot,
    fullpath,
    layout='dot',
):
    # type: (pydot.Dot, Union[str, Path], str) -> None
    '''
    Saves given pydot.Dot object to given filepath. The output format is chosen
    by the file extension, which is matched case-insensitively.
    Formats supported: svg, dot, png.

    Args:
        dot (pydot.Dot): Pydot Dot instance.
        fullpath (str or Path): File to be written to.
        layout (str, optional): Graph layout style.
            Options include: circo, dot, fdp, neato, sfdp, twopi. Default: dot.

    Raises:
        ValueError: If invalid file extension given.
    '''
    # Path instances are converted to absolute posix strings
    if isinstance(fullpath, Path):
        fullpath = fullpath.absolute().as_posix()

    ext = os.path.splitext(fullpath)[1]
    ext = re.sub(r'^\.', '', ext)

    # extension pattern -> name of the pydot method that writes that format
    writers = (
        ('^svg$', 'write_svg'),
        ('^dot$', 'write_dot'),
        ('^png$', 'write_png'),
    )
    for pattern, method_name in writers:
        if re.search(pattern, ext, re.I):
            getattr(dot, method_name)(fullpath, prog=layout)
            return

    msg = f'Invalid extension found: {ext}. ' + \
        'Valid extensions include: svg, dot, png.'
    raise ValueError(msg)