Coverage for /home/ubuntu/rolling-pin/python/rolling_pin/blob_etl.py: 100%
226 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-13 19:35 +0000
1from typing import Any, Callable, Dict, Iterator, List, Optional, Union # noqa: F401
2from IPython.display import HTML, Image # noqa: F401
3import pydot # noqa: F401
5from collections import Counter
6import json
7import os
8import re
9from copy import deepcopy
10from pathlib import Path
12import lunchbox.tools as lbt
13from pandas import DataFrame
14import networkx
16import rolling_pin.tools as rpt
17# ------------------------------------------------------------------------------
'''
Contains the BlobETL class, which is used for converting JSON blobs, and their
python equivalents, into flat dictionaries that can easily be modified and
converted to directed graphs.
'''
class BlobETL():
    '''
    Converts blob data internally into a flat dictionary that is universally
    searchable, editable and convertable back to the data's original structure,
    new blob structures or directed graphs.
    '''
    def __init__(self, blob, separator='/'):
        # type: (Any, str) -> None
        '''
        Constructs BlobETL instance.

        Args:
            blob (object): Iterable object.
            separator (str, optional): String to be used as a field separator in
                each key. Default: '/'.
        '''
        self._separator = separator  # type: str
        # flatten nested blob into a single-level dict with type-embedded keys
        self._data = rpt.flatten(
            blob, separator=separator, embed_types=True
        )  # type: Dict[str, Any]
46 # EDIT_METHODS--------------------------------------------------------------
47 def query(self, regex, ignore_case=True, invert=False):
48 # type: (str, bool, bool) -> BlobETL
49 '''
50 Filter data items by key according to given regular expression.
52 Args:
53 regex (str): Regular expression.
54 ignore_case (bool, optional): Whether to consider case in the
55 regular expression search. Default: False.
56 invert (bool, optional): Whether to invert the predicate.
57 Default: False.
59 Returns:
60 BlobETL: New BlobETL instance.
61 '''
62 r = re.compile(regex, re.IGNORECASE) if ignore_case else re.compile(regex)
63 return self.filter(lambda x: bool(r.search(x)), by='key', invert=invert)
65 def filter(self, predicate, by='key', invert=False):
66 # type: (Callable[[Any], bool], str, bool) -> BlobETL
67 '''
68 Filter data items by key, value or key + value, according to a given
69 predicate.
71 Args:
72 predicate: Function that returns a boolean value.
73 by (str, optional): Value handed to predicate.
74 Options include: key, value, key+value. Default: key.
75 invert (bool, optional): Whether to invert the predicate.
76 Default: False.
78 Raises:
79 ValueError: If by keyword is not key, value, or key+value.
81 Returns:
82 BlobETL: New BlobETL instance.
83 '''
84 pred = lambda items: predicate(*items)
85 if invert:
86 pred = lambda items: not predicate(*items)
88 data = {}
89 if by not in ['key', 'value', 'key+value']:
90 msg = f'Invalid by argument: {by}. Needs to be one of: '
91 msg += 'key, value, key+value.'
92 raise ValueError(msg)
94 for key, val in self._data.items():
95 item = None
96 if by == 'key':
97 item = [key]
98 elif by == 'value':
99 item = [val]
100 else:
101 item = [key, val]
103 if pred(item):
104 data[key] = val
106 return BlobETL(data, separator=self._separator)
108 def delete(self, predicate, by='key'):
109 # type: (Callable[[Any], bool], str) -> BlobETL
110 '''
111 Delete data items by key, value or key + value, according to a given
112 predicate.
114 Args:
115 predicate: Function that returns a boolean value.
116 by (str, optional): Value handed to predicate.
117 Options include: key, value, key+value. Default: key.
119 Raises:
120 ValueError: If by keyword is not key, value, or key+value.
122 Returns:
123 BlobETL: New BlobETL instance.
124 '''
125 data = deepcopy(self._data)
126 if by not in ['key', 'value', 'key+value']:
127 msg = f'Invalid by argument: {by}. Needs to be one of: '
128 msg += 'key, value, key+value.'
129 raise ValueError(msg)
131 for key, val in self._data.items():
132 item = None
133 if by == 'key':
134 item = [key]
135 elif by == 'value':
136 item = [val]
137 else:
138 item = [key, val]
140 if predicate(*item):
141 del data[key]
143 return BlobETL(data, separator=self._separator)
145 def set(
146 self,
147 predicate=None, # type: Optional[Callable[[Any, Any], bool]]
148 key_setter=None, # type: Optional[Callable[[Any, Any], str]]
149 value_setter=None, # type: Optional[Callable[[Any, Any], Any]]
150 ):
151 # type: (...) -> BlobETL
152 '''
153 Filter data items by key, value or key + value, according to a given
154 predicate. Then set that items key by a given function and value by a
155 given function.
157 Args:
158 predicate (function, optional): Function of the form:
159 lambda k, v: bool. Default: None --> lambda k, v: True.
160 key_setter (function, optional): Function of the form:
161 lambda k, v: str. Default: None --> lambda k, v: k.
162 value_setter (function, optional): Function of the form:
163 lambda k, v: object. Default: None --> lambda k, v: v.
165 Returns:
166 BlobETL: New BlobETL instance.
167 '''
168 # assign default predicate
169 if predicate is None:
170 predicate = lambda k, v: True
172 # assign default key_setter
173 if key_setter is None:
174 key_setter = lambda k, v: k
176 # assign default value_setter
177 if value_setter is None:
178 value_setter = lambda k, v: v
180 data = deepcopy(self._data)
181 for item in self._data.items():
182 if predicate(*item):
183 k = key_setter(*item)
184 v = value_setter(*item)
185 del data[item[0]]
186 data[k] = v
188 return BlobETL(data, separator=self._separator)
190 def update(self, item):
191 # type: (Union[Dict, BlobETL]) -> BlobETL
192 '''
193 Updates internal dictionary with given dictionary or BlobETL instance.
194 Given dictionary is first flattened with embeded types.
196 Args:
197 item (dict or BlobETL): Dictionary to be used for update.
199 Returns:
200 BlobETL: New BlobETL instance.
201 '''
202 if isinstance(item, BlobETL):
203 item = item._data
204 temp = rpt.flatten(item, separator=self._separator, embed_types=True)
205 data = deepcopy(self._data)
206 data.update(temp)
207 return BlobETL(data, separator=self._separator)
209 def set_field(self, index, field_setter):
210 # type: (int, Callable[[str], str]) -> BlobETL
211 '''
212 Set's a field at a given index according to a given function.
214 Args:
215 index (int): Field index.
216 field_setter (function): Function of form lambda str: str.
218 Returns:
219 BlobETL: New BlobETL instance.
220 '''
221 output = {}
222 for key, val in self._data.items():
223 fields = key.split(self._separator)
224 fields[index] = field_setter(fields[index])
225 key = self._separator.join(fields)
226 output[key] = val
227 return BlobETL(output, separator=self._separator)
229 # EXPORT-METHODS------------------------------------------------------------
230 def to_dict(self):
231 # type: () -> Dict[str, Any]
232 '''
233 Returns:
234 dict: Nested representation of internal data.
235 '''
236 return rpt.unembed(
237 rpt.nest(deepcopy(self._data), separator=self._separator)
238 )
240 def to_flat_dict(self):
241 # type: () -> Dict[str, Any]
242 '''
243 Returns:
244 dict: Flat dictionary with embedded types.
245 '''
246 return deepcopy(self._data)
248 def to_records(self):
249 # type: () -> List[Dict]
250 '''
251 Returns:
252 list[dict]: Data in records format.
253 '''
254 data = []
255 for key, val in self._data.items():
256 fields = key.split(self._separator)
257 row = {i: v for i, v in enumerate(fields)} # type: Dict[Any, Any]
258 row['value'] = val
259 data.append(row)
260 return data
262 def to_dataframe(self, group_by=None):
263 # type: (Optional[int]) -> DataFrame
264 '''
265 Convert data to pandas DataFrame.
267 Args:
268 group_by (int, optional): Field index to group rows of data by.
269 Default: None.
271 Returns:
272 DataFrame: DataFrame.
273 '''
274 data = self.to_records() # type: Any
275 data = DataFrame(data)
277 if group_by is not None:
278 group = list(range(0, group_by))
279 data = DataFrame(data)\
280 .groupby(group, as_index=False)\
281 .agg(lambda x: x.tolist())\
282 .apply(lambda x: x.to_dict(), axis=1)\
283 .tolist()
284 data = DataFrame(data)
286 # clean up column order
287 cols = data.columns.tolist() # type: List[str]
288 cols = list(sorted(filter(lambda x: x != 'value', cols)))
289 cols += ['value']
290 data = data[cols]
292 return data
    def to_prototype(self):
        # type: () -> BlobETL
        '''
        Convert data to prototypical representation.

        Example:
            ========
            >>> data = {
                'users': [
                    {
                        'name': {
                            'first': 'tom',
                            'last': 'smith',
                        }
                    },{
                        'name': {
                            'first': 'dick',
                            'last': 'smith',
                        }
                    },{
                        'name': {
                            'first': 'jane',
                            'last': 'doe',
                        }
                    },
                ]
            }
            >>> BlobETL(data).to_prototype().to_dict()
            {
                '^users': {
                    '<list_[0-9]+>': {
                        'name': {
                            'first$': Counter({'dick': 1, 'jane': 1, 'tom': 1}),
                            'last$': Counter({'doe': 1, 'smith': 2})
                        }
                    }
                }
            }

        Returns:
            BlobETL: New BlobETL instance.
        '''
        # True if any item in the list matches the regex
        def regex_in_list(regex, items):
            # type: (str, List[str]) -> bool
            for item in items:
                if re.search(regex, item):
                    return True
            return False  # pragma: no cover

        # cartesian product of two field lists, joined by the separator
        def field_combinations(a, b):
            # type: (List[str], List[str]) -> List[str]
            output = []
            for fa in a:
                for fb in b:
                    output.append(fa + self._separator + fb)
            return output

        keys = list(self._data.keys())
        fields = list(map(lambda x: x.split(self._separator), keys))

        # per key depth: unique fields, with embedded-type indices (e.g.
        # <list_0>) generalized to the regex <list_[0-9]+>
        fields = DataFrame(fields)\
            .apply(lambda x: x.unique().tolist())\
            .apply(lambda x: filter(lambda y: y is not None, x)) \
            .apply(lambda x: map(
                lambda y: re.sub(r'<([a-z]+)_\d+>', '<\\1_[0-9]+>', y),
                x)) \
            .apply(lambda x: list(set(x))) \
            .tolist()

        # build key regexes level by level, keeping only combinations that
        # actually match at least one real key
        prev = fields[0]
        regexes = list()
        for i, level in enumerate(fields[1:]):
            temp = field_combinations(prev, level)  # type: Union[List, Iterator]
            temp = filter(lambda x: regex_in_list('^' + x, keys), temp)
            prev = list(temp)
            regexes.extend(prev)

        regexes = lbt.get_ordered_unique(regexes)

        # keep only maximal regexes — ones that are not a prefix/substring of
        # any other regex — and anchor them with ^ and $
        p_keys = set()
        for regex in regexes:
            other = deepcopy(regexes)
            other.remove(regex)
            not_in_other = True
            for item in other:
                if regex in item:
                    not_in_other = False
            if not_in_other:
                p_keys.add(f'^{regex}$')

        # each prototype key maps to a Counter of the values it matches
        output = {}
        for key in p_keys:
            values = self.query(key).to_flat_dict().values()
            output[key] = Counter(values)
        return BlobETL(output, separator=self._separator)
    def to_networkx_graph(self):
        # type: () -> networkx.DiGraph
        '''
        Converts internal dictionary into a networkx directed graph.

        Returns:
            networkx.DiGraph: Graph representation of dictionary.
        '''
        graph = networkx.DiGraph()
        # temporary root so top-level keys share a common parent during build
        graph.add_node('root')
        # matches embedded-type markers like <list_0>, capturing the index
        embed_re = re.compile(r'<[a-z]+_(\d+)>')

        def recurse(item, parent):
            # type: (Dict, str) -> None
            # one 'key' node per dict key; full path is used as the node id so
            # identical field names at different depths stay distinct
            for key, val in item.items():
                k = f'{parent}{self._separator}{key}'
                # display name: embedded-type marker reduced to its bare index
                short_name = embed_re.sub('\\1', key)
                graph.add_node(k, short_name=short_name, node_type='key')
                graph.add_edge(parent, k)

                if isinstance(val, dict):
                    recurse(val, k)
                else:
                    # leaf: attach value to the key node and add a 'value' node
                    graph.nodes[k]['value'] = [val]
                    name = f'"{str(val)}"'
                    v = f'"{k}{self._separator}{str(val)}"'
                    graph.add_node(
                        v, short_name=name, node_type='value', value=[val]
                    )
                    graph.add_edge(k, v)

        recurse(rpt.nest(self._data, self._separator), 'root')
        # drop the synthetic root, leaving the real top-level nodes
        graph.remove_node('root')
        return graph
425 def to_dot_graph(
426 self, orthogonal_edges=False, orient='tb', color_scheme=None
427 ):
428 # type: (bool, str, Optional[Dict[str, str]]) -> pydot.Dot
429 '''
430 Converts internal dictionary into pydot graph.
431 Key and value nodes and edges are colored differently.
433 Args:
434 orthogonal_edges (bool, optional): Whether graph edges should have
435 non-right angles. Default: False.
436 orient (str, optional): Graph layout orientation. Default: tb.
437 Options include:
439 * tb - top to bottom
440 * bt - bottom to top
441 * lr - left to right
442 * rl - right to left
443 color_scheme: (dict, optional): Color scheme to be applied to graph.
444 Default: rolling_pin.tools.COLOR_SCHEME
446 Raises:
447 ValueError: If orient is invalid.
449 Returns:
450 pydot.Dot: Dot graph representation of dictionary.
451 '''
452 orient = orient.lower()
453 orientations = ['tb', 'bt', 'lr', 'rl']
454 if orient not in orientations:
455 msg = f'Invalid orient value. {orient} not in {orientations}.'
456 raise ValueError(msg)
458 # set default colort scheme
459 if color_scheme is None:
460 color_scheme = rpt.COLOR_SCHEME
462 # create pydot graph
463 graph = self.to_networkx_graph()
464 dot = networkx.drawing.nx_pydot.to_pydot(graph)
466 # set layout orientation
467 dot.set_rankdir(orient.upper())
469 # set graph background color
470 dot.set_bgcolor(color_scheme['background'])
472 # set edge draw type
473 if orthogonal_edges:
474 dot.set_splines('ortho')
476 # set draw parameters for each node of graph
477 for node in dot.get_nodes():
478 node.set_shape('rect')
479 node.set_style('filled')
480 node.set_color(color_scheme['node'])
481 node.set_fillcolor(color_scheme['node'])
482 node.set_fontcolor(color_scheme['node_font'])
483 node.set_fontname('Courier')
485 # if node has short name, set its displayed name to that
486 attrs = node.get_attributes()
487 if 'short_name' in attrs:
488 node.set_label(attrs['short_name'])
490 # if node type is value change its colors
491 if 'node_type' in attrs and attrs['node_type'] == 'value':
492 node.set_color(color_scheme['node_value'])
493 node.set_fillcolor(color_scheme['node_value'])
494 node.set_fontcolor(color_scheme['node_value_font'])
496 # set draw parameters for each edge in graph
497 for edge in dot.get_edges():
498 edge.set_color(color_scheme['edge'])
500 # if edge destination node type is value change its color
501 node = dot.get_node(edge.get_destination())[0]
502 attrs = node.get_attributes()
503 if 'node_type' in attrs and attrs['node_type'] == 'value':
504 edge.set_color(color_scheme['edge_value'])
506 return dot
508 def to_html(
509 self,
510 layout='dot',
511 orthogonal_edges=False,
512 orient='tb',
513 color_scheme=None,
514 as_png=False,
515 ):
516 # type: (str, bool, str, Optional[Dict[str, str]], bool) -> Union[Image, HTML]
517 '''
518 For use in inline rendering of graph data in Jupyter Lab.
520 Args:
521 layout (str, optional): Graph layout style.
522 Options include: circo, dot, fdp, neato, sfdp, twopi.
523 Default: dot.
524 orthogonal_edges (bool, optional): Whether graph edges should have
525 non-right angles. Default: False.
526 orient (str, optional): Graph layout orientation. Default: tb.
527 Options include:
529 * tb - top to bottom
530 * bt - bottom to top
531 * lr - left to right
532 * rl - right to left
533 color_scheme: (dict, optional): Color scheme to be applied to graph.
534 Default: rolling_pin.tools.COLOR_SCHEME
535 as_png (bool, optional): Display graph as a PNG image instead of
536 SVG. Useful for display on Github. Default: False.
538 Returns:
539 IPython.display.HTML: HTML object for inline display.
540 '''
541 if color_scheme is None:
542 color_scheme = rpt.COLOR_SCHEME
544 dot = self.to_dot_graph(
545 orthogonal_edges=orthogonal_edges,
546 orient=orient,
547 color_scheme=color_scheme,
548 )
549 return rpt.dot_to_html(dot, layout=layout, as_png=as_png)
551 def write(
552 self,
553 fullpath,
554 layout='dot',
555 orthogonal_edges=False,
556 orient='tb',
557 color_scheme=None
558 ):
559 # type: (Union[str, Path], str, bool, str, Optional[Dict[str, str]]) -> BlobETL
560 '''
561 Writes internal dictionary to a given filepath.
562 Formats supported: svg, dot, png, json.
564 Args:
565 fulllpath (str or Path): File tobe written to.
566 layout (str, optional): Graph layout style.
567 Options include: circo, dot, fdp, neato, sfdp, twopi.
568 Default: dot.
569 orthogonal_edges (bool, optional): Whether graph edges should have
570 non-right angles. Default: False.
571 orient (str, optional): Graph layout orientation. Default: tb.
572 Options include:
574 * tb - top to bottom
575 * bt - bottom to top
576 * lr - left to right
577 * rl - right to left
578 color_scheme: (dict, optional): Color scheme to be applied to graph.
579 Default: rolling_pin.tools.COLOR_SCHEME
581 Raises:
582 ValueError: If invalid file extension given.
584 Returns:
585 BlobETL: self.
586 '''
587 if isinstance(fullpath, Path):
588 fullpath = fullpath.absolute().as_posix()
590 _, ext = os.path.splitext(fullpath)
591 ext = re.sub(r'^\.', '', ext)
592 if re.search('^json$', ext, re.I):
593 with open(fullpath, 'w') as f:
594 json.dump(self.to_dict(), f)
595 return self
597 if color_scheme is None:
598 color_scheme = rpt.COLOR_SCHEME
600 graph = self.to_dot_graph(
601 orthogonal_edges=orthogonal_edges,
602 orient=orient,
603 color_scheme=color_scheme,
604 )
605 try:
606 rpt.write_dot_graph(graph, fullpath, layout=layout,)
607 except ValueError:
608 msg = f'Invalid extension found: {ext}. '
609 msg += 'Valid extensions include: svg, dot, png, json.'
610 raise ValueError(msg)
611 return self