from typing import Any, Callable, Dict, Iterator, List, Optional, Union # noqa: F401
from IPython.display import HTML, Image # noqa: F401
import pydot # noqa: F401
from collections import Counter
import json
import os
import re
from copy import deepcopy
from pathlib import Path
import lunchbox.tools as lbt
from pandas import DataFrame
import networkx
import rolling_pin.tools as rpt
# ------------------------------------------------------------------------------
'''
Contains the BlobETL class, which is used for coverting JSON blobs, and their
python equivalents, into flat dictionaries that can easily be modified and
converted to directed graphs.
'''
[docs]
class BlobETL():
'''
Converts blob data internally into a flat dictionary that is universally
searchable, editable and convertable back to the data's original structure,
new blob structures or directed graphs.
'''
[docs]
def __init__(self, blob, separator='/'):
# type: (Any, str) -> None
'''
Contructs BlobETL instance.
Args:
blob (object): Iterable object.
separator (str, optional): String to be used as a field separator in
each key. Default: '/'.
'''
self._data = rpt \
.flatten(blob, separator=separator, embed_types=True) # type: Dict[str, Any]
self._separator = separator # type: str
# EDIT_METHODS--------------------------------------------------------------
[docs]
def query(self, regex, ignore_case=True, invert=False):
# type: (str, bool, bool) -> BlobETL
'''
Filter data items by key according to given regular expression.
Args:
regex (str): Regular expression.
ignore_case (bool, optional): Whether to consider case in the
regular expression search. Default: False.
invert (bool, optional): Whether to invert the predicate.
Default: False.
Returns:
BlobETL: New BlobETL instance.
'''
r = re.compile(regex, re.IGNORECASE) if ignore_case else re.compile(regex)
return self.filter(lambda x: bool(r.search(x)), by='key', invert=invert)
[docs]
def filter(self, predicate, by='key', invert=False):
# type: (Callable[[Any], bool], str, bool) -> BlobETL
'''
Filter data items by key, value or key + value, according to a given
predicate.
Args:
predicate: Function that returns a boolean value.
by (str, optional): Value handed to predicate.
Options include: key, value, key+value. Default: key.
invert (bool, optional): Whether to invert the predicate.
Default: False.
Raises:
ValueError: If by keyword is not key, value, or key+value.
Returns:
BlobETL: New BlobETL instance.
'''
pred = lambda items: predicate(*items)
if invert:
pred = lambda items: not predicate(*items)
data = {}
if by not in ['key', 'value', 'key+value']:
msg = f'Invalid by argument: {by}. Needs to be one of: '
msg += 'key, value, key+value.'
raise ValueError(msg)
for key, val in self._data.items():
item = None
if by == 'key':
item = [key]
elif by == 'value':
item = [val]
else:
item = [key, val]
if pred(item):
data[key] = val
return BlobETL(data, separator=self._separator)
[docs]
def delete(self, predicate, by='key'):
# type: (Callable[[Any], bool], str) -> BlobETL
'''
Delete data items by key, value or key + value, according to a given
predicate.
Args:
predicate: Function that returns a boolean value.
by (str, optional): Value handed to predicate.
Options include: key, value, key+value. Default: key.
Raises:
ValueError: If by keyword is not key, value, or key+value.
Returns:
BlobETL: New BlobETL instance.
'''
data = deepcopy(self._data)
if by not in ['key', 'value', 'key+value']:
msg = f'Invalid by argument: {by}. Needs to be one of: '
msg += 'key, value, key+value.'
raise ValueError(msg)
for key, val in self._data.items():
item = None
if by == 'key':
item = [key]
elif by == 'value':
item = [val]
else:
item = [key, val]
if predicate(*item):
del data[key]
return BlobETL(data, separator=self._separator)
[docs]
def set(
self,
predicate=None, # type: Optional[Callable[[Any, Any], bool]]
key_setter=None, # type: Optional[Callable[[Any, Any], str]]
value_setter=None, # type: Optional[Callable[[Any, Any], Any]]
):
# type: (...) -> BlobETL
'''
Filter data items by key, value or key + value, according to a given
predicate. Then set that items key by a given function and value by a
given function.
Args:
predicate (function, optional): Function of the form:
lambda k, v: bool. Default: None --> lambda k, v: True.
key_setter (function, optional): Function of the form:
lambda k, v: str. Default: None --> lambda k, v: k.
value_setter (function, optional): Function of the form:
lambda k, v: object. Default: None --> lambda k, v: v.
Returns:
BlobETL: New BlobETL instance.
'''
# assign default predicate
if predicate is None:
predicate = lambda k, v: True
# assign default key_setter
if key_setter is None:
key_setter = lambda k, v: k
# assign default value_setter
if value_setter is None:
value_setter = lambda k, v: v
data = deepcopy(self._data)
for item in self._data.items():
if predicate(*item):
k = key_setter(*item)
v = value_setter(*item)
del data[item[0]]
data[k] = v
return BlobETL(data, separator=self._separator)
[docs]
def update(self, item):
# type: (Union[Dict, BlobETL]) -> BlobETL
'''
Updates internal dictionary with given dictionary or BlobETL instance.
Given dictionary is first flattened with embeded types.
Args:
item (dict or BlobETL): Dictionary to be used for update.
Returns:
BlobETL: New BlobETL instance.
'''
if isinstance(item, BlobETL):
item = item._data
temp = rpt.flatten(item, separator=self._separator, embed_types=True)
data = deepcopy(self._data)
data.update(temp)
return BlobETL(data, separator=self._separator)
[docs]
def set_field(self, index, field_setter):
# type: (int, Callable[[str], str]) -> BlobETL
'''
Set's a field at a given index according to a given function.
Args:
index (int): Field index.
field_setter (function): Function of form lambda str: str.
Returns:
BlobETL: New BlobETL instance.
'''
output = {}
for key, val in self._data.items():
fields = key.split(self._separator)
fields[index] = field_setter(fields[index])
key = self._separator.join(fields)
output[key] = val
return BlobETL(output, separator=self._separator)
# EXPORT-METHODS------------------------------------------------------------
[docs]
def to_dict(self):
# type: () -> Dict[str, Any]
'''
Returns:
dict: Nested representation of internal data.
'''
return rpt.unembed(
rpt.nest(deepcopy(self._data), separator=self._separator)
)
[docs]
def to_flat_dict(self):
# type: () -> Dict[str, Any]
'''
Returns:
dict: Flat dictionary with embedded types.
'''
return deepcopy(self._data)
[docs]
def to_records(self):
# type: () -> List[Dict]
'''
Returns:
list[dict]: Data in records format.
'''
data = []
for key, val in self._data.items():
fields = key.split(self._separator)
row = {i: v for i, v in enumerate(fields)} # type: Dict[Any, Any]
row['value'] = val
data.append(row)
return data
[docs]
def to_dataframe(self, group_by=None):
# type: (Optional[int]) -> DataFrame
'''
Convert data to pandas DataFrame.
Args:
group_by (int, optional): Field index to group rows of data by.
Default: None.
Returns:
DataFrame: DataFrame.
'''
data = self.to_records() # type: Any
data = DataFrame(data)
if group_by is not None:
group = list(range(0, group_by))
data = DataFrame(data)\
.groupby(group, as_index=False)\
.agg(lambda x: x.tolist())\
.apply(lambda x: x.to_dict(), axis=1)\
.tolist()
data = DataFrame(data)
# clean up column order
cols = data.columns.tolist() # type: List[str]
cols = list(sorted(filter(lambda x: x != 'value', cols)))
cols += ['value']
data = data[cols]
return data
[docs]
def to_prototype(self):
# type: () -> BlobETL
'''
Convert data to prototypical representation.
Example:
========
>>> data = {
'users': [
{
'name': {
'first': 'tom',
'last': 'smith',
}
},{
'name': {
'first': 'dick',
'last': 'smith',
}
},{
'name': {
'first': 'jane',
'last': 'doe',
}
},
]
}
>>> BlobETL(data).to_prototype().to_dict()
{
'^users': {
'<list_[0-9]+>': {
'name': {
'first$': Counter({'dick': 1, 'jane': 1, 'tom': 1}),
'last$': Counter({'doe': 1, 'smith': 2})
}
}
}
}
Returns:
BlobETL: New BlobETL instance.
'''
def regex_in_list(regex, items):
# type: (str, List[str]) -> bool
for item in items:
if re.search(regex, item):
return True
return False # pragma: no cover
def field_combinations(a, b):
# type: (List[str], List[str]) -> List[str]
output = []
for fa in a:
for fb in b:
output.append(fa + self._separator + fb)
return output
keys = list(self._data.keys())
fields = list(map(lambda x: x.split(self._separator), keys))
fields = DataFrame(fields)\
.apply(lambda x: x.unique().tolist())\
.apply(lambda x: filter(lambda y: y is not None, x)) \
.apply(lambda x: map(
lambda y: re.sub(r'<([a-z]+)_\d+>', '<\\1_[0-9]+>', y),
x)) \
.apply(lambda x: list(set(x))) \
.tolist()
prev = fields[0]
regexes = list()
for i, level in enumerate(fields[1:]):
temp = field_combinations(prev, level) # type: Union[List, Iterator]
temp = filter(lambda x: regex_in_list('^' + x, keys), temp)
prev = list(temp)
regexes.extend(prev)
regexes = lbt.get_ordered_unique(regexes)
p_keys = set()
for regex in regexes:
other = deepcopy(regexes)
other.remove(regex)
not_in_other = True
for item in other:
if regex in item:
not_in_other = False
if not_in_other:
p_keys.add(f'^{regex}$')
output = {}
for key in p_keys:
values = self.query(key).to_flat_dict().values()
output[key] = Counter(values)
return BlobETL(output, separator=self._separator)
[docs]
def to_networkx_graph(self):
# type: () -> networkx.DiGraph
'''
Converts internal dictionary into a networkx directed graph.
Returns:
networkx.DiGraph: Graph representation of dictionary.
'''
graph = networkx.DiGraph()
graph.add_node('root')
embed_re = re.compile(r'<[a-z]+_(\d+)>')
def recurse(item, parent):
# type: (Dict, str) -> None
for key, val in item.items():
k = f'{parent}{self._separator}{key}'
short_name = embed_re.sub('\\1', key)
graph.add_node(k, short_name=short_name, node_type='key')
graph.add_edge(parent, k)
if isinstance(val, dict):
recurse(val, k)
else:
graph.nodes[k]['value'] = [val]
name = f'"{str(val)}"'
v = f'"{k}{self._separator}{str(val)}"'
graph.add_node(
v, short_name=name, node_type='value', value=[val]
)
graph.add_edge(k, v)
recurse(rpt.nest(self._data, self._separator), 'root')
graph.remove_node('root')
return graph
[docs]
def to_dot_graph(
self, orthogonal_edges=False, orient='tb', color_scheme=None
):
# type: (bool, str, Optional[Dict[str, str]]) -> pydot.Dot
'''
Converts internal dictionary into pydot graph.
Key and value nodes and edges are colored differently.
Args:
orthogonal_edges (bool, optional): Whether graph edges should have
non-right angles. Default: False.
orient (str, optional): Graph layout orientation. Default: tb.
Options include:
* tb - top to bottom
* bt - bottom to top
* lr - left to right
* rl - right to left
color_scheme: (dict, optional): Color scheme to be applied to graph.
Default: rolling_pin.tools.COLOR_SCHEME
Raises:
ValueError: If orient is invalid.
Returns:
pydot.Dot: Dot graph representation of dictionary.
'''
orient = orient.lower()
orientations = ['tb', 'bt', 'lr', 'rl']
if orient not in orientations:
msg = f'Invalid orient value. {orient} not in {orientations}.'
raise ValueError(msg)
# set default colort scheme
if color_scheme is None:
color_scheme = rpt.COLOR_SCHEME
# create pydot graph
graph = self.to_networkx_graph()
dot = networkx.drawing.nx_pydot.to_pydot(graph)
# set layout orientation
dot.set_rankdir(orient.upper())
# set graph background color
dot.set_bgcolor(color_scheme['background'])
# set edge draw type
if orthogonal_edges:
dot.set_splines('ortho')
# set draw parameters for each node of graph
for node in dot.get_nodes():
node.set_shape('rect')
node.set_style('filled')
node.set_color(color_scheme['node'])
node.set_fillcolor(color_scheme['node'])
node.set_fontcolor(color_scheme['node_font'])
node.set_fontname('Courier')
# if node has short name, set its displayed name to that
attrs = node.get_attributes()
if 'short_name' in attrs:
node.set_label(attrs['short_name'])
# if node type is value change its colors
if 'node_type' in attrs and attrs['node_type'] == 'value':
node.set_color(color_scheme['node_value'])
node.set_fillcolor(color_scheme['node_value'])
node.set_fontcolor(color_scheme['node_value_font'])
# set draw parameters for each edge in graph
for edge in dot.get_edges():
edge.set_color(color_scheme['edge'])
# if edge destination node type is value change its color
node = dot.get_node(edge.get_destination())[0]
attrs = node.get_attributes()
if 'node_type' in attrs and attrs['node_type'] == 'value':
edge.set_color(color_scheme['edge_value'])
return dot
[docs]
def to_html(
self,
layout='dot',
orthogonal_edges=False,
orient='tb',
color_scheme=None,
as_png=False,
):
# type: (str, bool, str, Optional[Dict[str, str]], bool) -> Union[Image, HTML]
'''
For use in inline rendering of graph data in Jupyter Lab.
Args:
layout (str, optional): Graph layout style.
Options include: circo, dot, fdp, neato, sfdp, twopi.
Default: dot.
orthogonal_edges (bool, optional): Whether graph edges should have
non-right angles. Default: False.
orient (str, optional): Graph layout orientation. Default: tb.
Options include:
* tb - top to bottom
* bt - bottom to top
* lr - left to right
* rl - right to left
color_scheme: (dict, optional): Color scheme to be applied to graph.
Default: rolling_pin.tools.COLOR_SCHEME
as_png (bool, optional): Display graph as a PNG image instead of
SVG. Useful for display on Github. Default: False.
Returns:
IPython.display.HTML: HTML object for inline display.
'''
if color_scheme is None:
color_scheme = rpt.COLOR_SCHEME
dot = self.to_dot_graph(
orthogonal_edges=orthogonal_edges,
orient=orient,
color_scheme=color_scheme,
)
return rpt.dot_to_html(dot, layout=layout, as_png=as_png)
[docs]
def write(
self,
fullpath,
layout='dot',
orthogonal_edges=False,
orient='tb',
color_scheme=None
):
# type: (Union[str, Path], str, bool, str, Optional[Dict[str, str]]) -> BlobETL
'''
Writes internal dictionary to a given filepath.
Formats supported: svg, dot, png, json.
Args:
fulllpath (str or Path): File tobe written to.
layout (str, optional): Graph layout style.
Options include: circo, dot, fdp, neato, sfdp, twopi.
Default: dot.
orthogonal_edges (bool, optional): Whether graph edges should have
non-right angles. Default: False.
orient (str, optional): Graph layout orientation. Default: tb.
Options include:
* tb - top to bottom
* bt - bottom to top
* lr - left to right
* rl - right to left
color_scheme: (dict, optional): Color scheme to be applied to graph.
Default: rolling_pin.tools.COLOR_SCHEME
Raises:
ValueError: If invalid file extension given.
Returns:
BlobETL: self.
'''
if isinstance(fullpath, Path):
fullpath = fullpath.absolute().as_posix()
_, ext = os.path.splitext(fullpath)
ext = re.sub(r'^\.', '', ext)
if re.search('^json$', ext, re.I):
with open(fullpath, 'w') as f:
json.dump(self.to_dict(), f)
return self
if color_scheme is None:
color_scheme = rpt.COLOR_SCHEME
graph = self.to_dot_graph(
orthogonal_edges=orthogonal_edges,
orient=orient,
color_scheme=color_scheme,
)
try:
rpt.write_dot_graph(graph, fullpath, layout=layout,)
except ValueError:
msg = f'Invalid extension found: {ext}. '
msg += 'Valid extensions include: svg, dot, png, json.'
raise ValueError(msg)
return self