Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from typing import Any, Callable, Dict, Iterator, List, Optional, Union
2from IPython.display import HTML, Image
3import pydot
5from collections import Counter
6import json
7import os
8import re
9from copy import deepcopy
10from pathlib import Path
12import lunchbox.tools as lbt
13from pandas import DataFrame
14import networkx
16import rolling_pin.tools as tools
17# ------------------------------------------------------------------------------
'''
Contains the BlobETL class, which is used for converting JSON blobs, and their
python equivalents, into flat dictionaries that can easily be modified and
converted to directed graphs.
'''
26class BlobETL():
    '''
    Converts blob data internally into a flat dictionary that is universally
    searchable, editable and convertible back to the data's original structure,
    new blob structures or directed graphs.
    '''
32 def __init__(self, blob, separator='/'):
33 # type: (Any, str) -> None
34 '''
35 Contructs BlobETL instance.
37 Args:
38 blob (object): Iterable object.
39 separator (str, optional): String to be used as a field separator in
40 each key. Default: '/'.
41 '''
42 self._data = tools \
43 .flatten(blob, separator=separator, embed_types=True) # type: Dict[str, Any]
44 self._separator = separator # type: str
46 # EDIT_METHODS--------------------------------------------------------------
47 def query(self, regex, ignore_case=True):
48 # type: (str, bool) -> BlobETL
49 '''
50 Filter data items by key according to given regular expression.
52 Args:
53 regex (str): Regular expression.
54 ignore_casd (bool, optional): Whether to consider case in the
55 regular expression search. Default: False.
57 Returns:
58 BlobETL: New BlobETL instance.
59 '''
60 if ignore_case:
61 return self.filter(lambda x: bool(re.search(regex, x, re.I)), by='key')
62 return self.filter(lambda x: bool(re.search(regex, x)), by='key')
64 def filter(self, predicate, by='key'):
65 # type: (Callable[[Any], bool], str) -> BlobETL
66 '''
67 Filter data items by key, value or key + value, according to a given
68 predicate.
70 Args:
71 predicate: Function that returns a boolean value.
72 by (str, optional): Value handed to predicate.
73 Options include: key, value, key+value. Default: key.
75 Raises:
76 ValueError: If by keyword is not key, value, or key+value.
78 Returns:
79 BlobETL: New BlobETL instance.
80 '''
81 data = {}
82 if by not in ['key', 'value', 'key+value']:
83 msg = f'Invalid by argument: {by}. Needs to be one of: '
84 msg += 'key, value, key+value.'
85 raise ValueError(msg)
87 for key, val in self._data.items():
88 item = None
89 if by == 'key':
90 item = [key]
91 elif by == 'value':
92 item = [val]
93 else:
94 item = [key, val]
96 if predicate(*item):
97 data[key] = val
99 return BlobETL(data, separator=self._separator)
101 def delete(self, predicate, by='key'):
102 # type: (Callable[[Any], bool], str) -> BlobETL
103 '''
104 Delete data items by key, value or key + value, according to a given
105 predicate.
107 Args:
108 predicate: Function that returns a boolean value.
109 by (str, optional): Value handed to predicate.
110 Options include: key, value, key+value. Default: key.
112 Raises:
113 ValueError: If by keyword is not key, value, or key+value.
115 Returns:
116 BlobETL: New BlobETL instance.
117 '''
118 data = deepcopy(self._data)
119 if by not in ['key', 'value', 'key+value']:
120 msg = f'Invalid by argument: {by}. Needs to be one of: '
121 msg += 'key, value, key+value.'
122 raise ValueError(msg)
124 for key, val in self._data.items():
125 item = None
126 if by == 'key':
127 item = [key]
128 elif by == 'value':
129 item = [val]
130 else:
131 item = [key, val]
133 if predicate(*item):
134 del data[key]
136 return BlobETL(data, separator=self._separator)
138 def set(
139 self,
140 predicate=None, # type: Optional[Callable[[Any, Any], bool]]
141 key_setter=None, # type: Optional[Callable[[Any, Any], str]]
142 value_setter=None, # type: Optional[Callable[[Any, Any], Any]]
143 ):
144 # type: (...) -> BlobETL
145 '''
146 Filter data items by key, value or key + value, according to a given
147 predicate. Then set that items key by a given function and value by a
148 given function.
150 Args:
151 predicate (function, optional): Function of the form:
152 lambda k, v: bool. Default: None --> lambda k, v: True.
153 key_setter (function, optional): Function of the form:
154 lambda k, v: str. Default: None --> lambda k, v: k.
155 value_setter (function, optional): Function of the form:
156 lambda k, v: object. Default: None --> lambda k, v: v.
158 Returns:
159 BlobETL: New BlobETL instance.
160 '''
161 # assign default predicate
162 if predicate is None:
163 predicate = lambda k, v: True
165 # assign default key_setter
166 if key_setter is None:
167 key_setter = lambda k, v: k
169 # assign default value_setter
170 if value_setter is None:
171 value_setter = lambda k, v: v
173 data = deepcopy(self._data)
174 for item in self._data.items():
175 if predicate(*item):
176 k = key_setter(*item)
177 v = value_setter(*item)
178 del data[item[0]]
179 data[k] = v
181 return BlobETL(data, separator=self._separator)
183 def update(self, item):
184 # type: (Union[Dict, BlobETL]) -> BlobETL
185 '''
186 Updates internal dictionary with given dictionary or BlobETL instance.
187 Given dictionary is first flattened with embeded types.
189 Args:
190 item (dict or BlobETL): Dictionary to be used for update.
192 Returns:
193 BlobETL: New BlobETL instance.
194 '''
195 if isinstance(item, BlobETL):
196 item = item._data
197 temp = tools.flatten(item, separator=self._separator, embed_types=True)
198 data = deepcopy(self._data)
199 data.update(temp)
200 return BlobETL(data, separator=self._separator)
202 def set_field(self, index, field_setter):
203 # type: (int, Callable[[str], str]) -> BlobETL
204 '''
205 Set's a field at a given index according to a given function.
207 Args:
208 index (int): Field index.
209 field_setter (function): Function of form lambda str: str.
211 Returns:
212 BlobETL: New BlobETL instance.
213 '''
214 output = {}
215 for key, val in self._data.items():
216 fields = key.split(self._separator)
217 fields[index] = field_setter(fields[index])
218 key = self._separator.join(fields)
219 output[key] = val
220 return BlobETL(output, separator=self._separator)
222 # EXPORT-METHODS------------------------------------------------------------
223 def to_dict(self):
224 # type: () -> Dict[str, Any]
225 '''
226 Returns:
227 dict: Nested representation of internal data.
228 '''
229 return tools.unembed(
230 tools.nest(deepcopy(self._data), separator=self._separator)
231 )
233 def to_flat_dict(self):
234 # type: () -> Dict[str, Any]
235 '''
236 Returns:
237 dict: Flat dictionary with embedded types.
238 '''
239 return deepcopy(self._data)
241 def to_records(self):
242 # type: () -> List[Dict]
243 '''
244 Returns:
245 list[dict]: Data in records format.
246 '''
247 data = []
248 for key, val in self._data.items():
249 fields = key.split(self._separator)
250 row = {i: v for i, v in enumerate(fields)} # type: Dict[Any, Any]
251 row['value'] = val
252 data.append(row)
253 return data
255 def to_dataframe(self, group_by=None):
256 # type: (Optional[int]) -> DataFrame
257 '''
258 Convert data to pandas DataFrame.
260 Args:
261 group_by (int, optional): Field index to group rows of data by.
262 Default: None.
264 Returns:
265 DataFrame: DataFrame.
266 '''
267 data = self.to_records() # type: Any
268 data = DataFrame(data)
270 if group_by is not None:
271 group = list(range(0, group_by))
272 data = DataFrame(data)\
273 .groupby(group, as_index=False)\
274 .agg(lambda x: x.tolist())\
275 .apply(lambda x: x.to_dict(), axis=1)\
276 .tolist()
277 data = DataFrame(data)
279 # clean up column order
280 cols = data.columns.tolist() # type: List[str]
281 cols = list(sorted(filter(lambda x: x != 'value', cols)))
282 cols += ['value']
283 data = data[cols]
285 return data
    def to_prototype(self):
        # type: () -> BlobETL
        '''
        Convert data to prototypical representation.

        Example:

            >>> data = {
                'users': [
                    {
                        'name': {
                            'first': 'tom',
                            'last': 'smith',
                        }
                    },{
                        'name': {
                            'first': 'dick',
                            'last': 'smith',
                        }
                    },{
                        'name': {
                            'first': 'jane',
                            'last': 'doe',
                        }
                    },
                ]
            }
            >>> BlobETL(data).to_prototype().to_dict()
            {
                '^users': {
                    '<list_[0-9]+>': {
                        'name': {
                            'first$': Counter({'dick': 1, 'jane': 1, 'tom': 1}),
                            'last$': Counter({'doe': 1, 'smith': 2})
                        }
                    }
                }
            }

        Returns:
            BlobETL: New BlobETL instance.
        '''
        # True if the regex matches at least one string in items
        def regex_in_list(regex, items):
            # type: (str, List[str]) -> bool
            for item in items:
                if re.search(regex, item):
                    return True
            return False  # pragma: no cover

        # cartesian product of fields from a and b, joined with the separator
        def field_combinations(a, b):
            # type: (List[str], List[str]) -> List[str]
            output = []
            for fa in a:
                for fb in b:
                    output.append(fa + self._separator + fb)
            return output

        keys = list(self._data.keys())
        fields = list(map(lambda x: x.split(self._separator), keys))

        # one list of unique fields per key depth, with embedded indices such
        # as <list_0> generalized to the regex <list_[0-9]+>
        fields = DataFrame(fields)\
            .apply(lambda x: x.unique().tolist())\
            .apply(lambda x: filter(lambda y: y is not None, x)) \
            .apply(lambda x: map(
                lambda y: re.sub(r'<([a-z]+)_\d+>', '<\\1_[0-9]+>', y),
                x)) \
            .apply(lambda x: list(set(x))) \
            .tolist()

        # grow key regexes level by level, keeping only combinations that
        # actually match the prefix of at least one real key
        prev = fields[0]
        regexes = list()
        for i, level in enumerate(fields[1:]):
            temp = field_combinations(prev, level)  # type: Union[List, Iterator]
            temp = filter(lambda x: regex_in_list('^' + x, keys), temp)
            prev = list(temp)
            regexes.extend(prev)

        regexes = lbt.get_ordered_unique(regexes)

        # keep only "leaf" regexes: ones that are not a substring of any
        # other (longer) regex in the set
        p_keys = set()
        for regex in regexes:
            other = deepcopy(regexes)
            other.remove(regex)
            not_in_other = True
            for item in other:
                if regex in item:
                    not_in_other = False
            if not_in_other:
                p_keys.add(f'^{regex}$')

        # count value occurrences per prototype key
        output = {}
        for key in p_keys:
            values = self.query(key).to_flat_dict().values()
            output[key] = Counter(values)
        return BlobETL(output, separator=self._separator)
383 def to_networkx_graph(self):
384 # type: () -> networkx.DiGraph
385 '''
386 Converts internal dictionary into a networkx directed graph.
388 Returns:
389 networkx.DiGraph: Graph representation of dictionary.
390 '''
391 graph = networkx.DiGraph()
392 graph.add_node('root')
393 embed_re = re.compile(r'<[a-z]+_(\d+)>')
395 def recurse(item, parent):
396 # type: (Dict, str) -> None
397 for key, val in item.items():
398 k = f'{parent}{self._separator}{key}'
399 short_name = embed_re.sub('\\1', key)
400 graph.add_node(k, short_name=short_name, node_type='key')
401 graph.add_edge(parent, k)
403 if isinstance(val, dict):
404 recurse(val, k)
405 else:
406 graph.nodes[k]['value'] = [val]
407 name = f'"{str(val)}"'
408 v = f'"{k}{self._separator}{str(val)}"'
409 graph.add_node(
410 v, short_name=name, node_type='value', value=[val]
411 )
412 graph.add_edge(k, v)
414 recurse(tools.nest(self._data, self._separator), 'root')
415 graph.remove_node('root')
416 return graph
418 def to_dot_graph(
419 self, orthogonal_edges=False, orient='tb', color_scheme=None
420 ):
421 # type: (bool, str, Optional[Dict[str, str]]) -> pydot.Dot
422 '''
423 Converts internal dictionary into pydot graph.
424 Key and value nodes and edges are colored differently.
426 Args:
427 orthogonal_edges (bool, optional): Whether graph edges should have
428 non-right angles. Default: False.
429 orient (str, optional): Graph layout orientation. Default: tb.
430 Options include:
432 * tb - top to bottom
433 * bt - bottom to top
434 * lr - left to right
435 * rl - right to left
436 color_scheme: (dict, optional): Color scheme to be applied to graph.
437 Default: rolling_pin.tools.COLOR_SCHEME
439 Raises:
440 ValueError: If orient is invalid.
442 Returns:
443 pydot.Dot: Dot graph representation of dictionary.
444 '''
445 orient = orient.lower()
446 orientations = ['tb', 'bt', 'lr', 'rl']
447 if orient not in orientations:
448 msg = f'Invalid orient value. {orient} not in {orientations}.'
449 raise ValueError(msg)
451 # set default colort scheme
452 if color_scheme is None:
453 color_scheme = tools.COLOR_SCHEME
455 # create pydot graph
456 graph = self.to_networkx_graph()
457 dot = networkx.drawing.nx_pydot.to_pydot(graph)
459 # set layout orientation
460 dot.set_rankdir(orient.upper())
462 # set graph background color
463 dot.set_bgcolor(color_scheme['background'])
465 # set edge draw type
466 if orthogonal_edges:
467 dot.set_splines('ortho')
469 # set draw parameters for each node of graph
470 for node in dot.get_nodes():
471 node.set_shape('rect')
472 node.set_style('filled')
473 node.set_color(color_scheme['node'])
474 node.set_fillcolor(color_scheme['node'])
475 node.set_fontcolor(color_scheme['node_font'])
476 node.set_fontname('Courier')
478 # if node has short name, set its displayed name to that
479 attrs = node.get_attributes()
480 if 'short_name' in attrs:
481 node.set_label(attrs['short_name'])
483 # if node type is value change its colors
484 if 'node_type' in attrs and attrs['node_type'] == 'value':
485 node.set_color(color_scheme['node_value'])
486 node.set_fillcolor(color_scheme['node_value'])
487 node.set_fontcolor(color_scheme['node_value_font'])
489 # set draw parameters for each edge in graph
490 for edge in dot.get_edges():
491 edge.set_color(color_scheme['edge'])
493 # if edge destination node type is value change its color
494 node = dot.get_node(edge.get_destination())[0]
495 attrs = node.get_attributes()
496 if 'node_type' in attrs and attrs['node_type'] == 'value':
497 edge.set_color(color_scheme['edge_value'])
499 return dot
501 def to_html(
502 self,
503 layout='dot',
504 orthogonal_edges=False,
505 orient='tb',
506 color_scheme=None,
507 as_png=False,
508 ):
509 # type: (str, bool, str, Optional[Dict[str, str]], bool) -> Union[Image, HTML]
510 '''
511 For use in inline rendering of graph data in Jupyter Lab.
513 Args:
514 layout (str, optional): Graph layout style.
515 Options include: circo, dot, fdp, neato, sfdp, twopi.
516 Default: dot.
517 orthogonal_edges (bool, optional): Whether graph edges should have
518 non-right angles. Default: False.
519 orient (str, optional): Graph layout orientation. Default: tb.
520 Options include:
522 * tb - top to bottom
523 * bt - bottom to top
524 * lr - left to right
525 * rl - right to left
526 color_scheme: (dict, optional): Color scheme to be applied to graph.
527 Default: rolling_pin.tools.COLOR_SCHEME
528 as_png (bool, optional): Display graph as a PNG image instead of
529 SVG. Useful for display on Github. Default: False.
531 Returns:
532 IPython.display.HTML: HTML object for inline display.
533 '''
534 if color_scheme is None:
535 color_scheme = tools.COLOR_SCHEME
537 dot = self.to_dot_graph(
538 orthogonal_edges=orthogonal_edges,
539 orient=orient,
540 color_scheme=color_scheme,
541 )
542 return tools.dot_to_html(dot, layout=layout, as_png=as_png)
544 def write(
545 self,
546 fullpath,
547 layout='dot',
548 orthogonal_edges=False,
549 orient='tb',
550 color_scheme=None
551 ):
552 # type: (Union[str, Path], str, bool, str, Dict[str, str]) -> BlobETL
553 '''
554 Writes internal dictionary to a given filepath.
555 Formats supported: svg, dot, png, json.
557 Args:
558 fulllpath (str or Path): File tobe written to.
559 layout (str, optional): Graph layout style.
560 Options include: circo, dot, fdp, neato, sfdp, twopi.
561 Default: dot.
562 orthogonal_edges (bool, optional): Whether graph edges should have
563 non-right angles. Default: False.
564 orient (str, optional): Graph layout orientation. Default: tb.
565 Options include:
567 * tb - top to bottom
568 * bt - bottom to top
569 * lr - left to right
570 * rl - right to left
571 color_scheme: (dict, optional): Color scheme to be applied to graph.
572 Default: rolling_pin.tools.COLOR_SCHEME
574 Raises:
575 ValueError: If invalid file extension given.
577 Returns:
578 BlobETL: self.
579 '''
580 if isinstance(fullpath, Path):
581 fullpath = fullpath.absolute().as_posix()
583 _, ext = os.path.splitext(fullpath)
584 ext = re.sub(r'^\.', '', ext)
585 if re.search('^json$', ext, re.I):
586 with open(fullpath, 'w') as f:
587 json.dump(self.to_dict(), f)
588 return self
590 if color_scheme is None:
591 color_scheme = tools.COLOR_SCHEME
593 graph = self.to_dot_graph(
594 orthogonal_edges=orthogonal_edges,
595 orient=orient,
596 color_scheme=color_scheme,
597 )
598 try:
599 tools.write_dot_graph(graph, fullpath, layout=layout,)
600 except ValueError:
601 msg = f'Invalid extension found: {ext}. '
602 msg += 'Valid extensions include: svg, dot, png, json.'
603 raise ValueError(msg)
604 return self