Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from typing import Any, Callable, Dict, Iterator, List, Optional, Union 

2from IPython.display import HTML, Image 

3import pydot 

4 

5from collections import Counter 

6import json 

7import os 

8import re 

9from copy import deepcopy 

10from pathlib import Path 

11 

12import lunchbox.tools as lbt 

13from pandas import DataFrame 

14import networkx 

15 

16import rolling_pin.tools as tools 

17# ------------------------------------------------------------------------------ 

18 

'''
Contains the BlobETL class, which is used for converting JSON blobs, and their
python equivalents, into flat dictionaries that can easily be modified and
converted to directed graphs.
'''

24 

25 

class BlobETL():
    '''
    Converts blob data internally into a flat dictionary that is universally
    searchable, editable and convertible back to the data's original structure,
    new blob structures or directed graphs.
    '''
    def __init__(self, blob, separator='/'):
        # type: (Any, str) -> None
        '''
        Constructs BlobETL instance.

        Args:
            blob (object): Iterable object.
            separator (str, optional): String to be used as a field separator in
                each key. Default: '/'.
        '''
        # flatten the nested blob into {key-path: value}; embed_types keeps
        # container-type markers in the keys so the structure can be rebuilt
        self._data = tools \
            .flatten(blob, separator=separator, embed_types=True)  # type: Dict[str, Any]
        self._separator = separator  # type: str

45 

46 # EDIT_METHODS-------------------------------------------------------------- 

47 def query(self, regex, ignore_case=True): 

48 # type: (str, bool) -> BlobETL 

49 ''' 

50 Filter data items by key according to given regular expression. 

51 

52 Args: 

53 regex (str): Regular expression. 

54 ignore_casd (bool, optional): Whether to consider case in the 

55 regular expression search. Default: False. 

56 

57 Returns: 

58 BlobETL: New BlobETL instance. 

59 ''' 

60 if ignore_case: 

61 return self.filter(lambda x: bool(re.search(regex, x, re.I)), by='key') 

62 return self.filter(lambda x: bool(re.search(regex, x)), by='key') 

63 

64 def filter(self, predicate, by='key'): 

65 # type: (Callable[[Any], bool], str) -> BlobETL 

66 ''' 

67 Filter data items by key, value or key + value, according to a given 

68 predicate. 

69 

70 Args: 

71 predicate: Function that returns a boolean value. 

72 by (str, optional): Value handed to predicate. 

73 Options include: key, value, key+value. Default: key. 

74 

75 Raises: 

76 ValueError: If by keyword is not key, value, or key+value. 

77 

78 Returns: 

79 BlobETL: New BlobETL instance. 

80 ''' 

81 data = {} 

82 if by not in ['key', 'value', 'key+value']: 

83 msg = f'Invalid by argument: {by}. Needs to be one of: ' 

84 msg += 'key, value, key+value.' 

85 raise ValueError(msg) 

86 

87 for key, val in self._data.items(): 

88 item = None 

89 if by == 'key': 

90 item = [key] 

91 elif by == 'value': 

92 item = [val] 

93 else: 

94 item = [key, val] 

95 

96 if predicate(*item): 

97 data[key] = val 

98 

99 return BlobETL(data, separator=self._separator) 

100 

101 def delete(self, predicate, by='key'): 

102 # type: (Callable[[Any], bool], str) -> BlobETL 

103 ''' 

104 Delete data items by key, value or key + value, according to a given 

105 predicate. 

106 

107 Args: 

108 predicate: Function that returns a boolean value. 

109 by (str, optional): Value handed to predicate. 

110 Options include: key, value, key+value. Default: key. 

111 

112 Raises: 

113 ValueError: If by keyword is not key, value, or key+value. 

114 

115 Returns: 

116 BlobETL: New BlobETL instance. 

117 ''' 

118 data = deepcopy(self._data) 

119 if by not in ['key', 'value', 'key+value']: 

120 msg = f'Invalid by argument: {by}. Needs to be one of: ' 

121 msg += 'key, value, key+value.' 

122 raise ValueError(msg) 

123 

124 for key, val in self._data.items(): 

125 item = None 

126 if by == 'key': 

127 item = [key] 

128 elif by == 'value': 

129 item = [val] 

130 else: 

131 item = [key, val] 

132 

133 if predicate(*item): 

134 del data[key] 

135 

136 return BlobETL(data, separator=self._separator) 

137 

138 def set( 

139 self, 

140 predicate=None, # type: Optional[Callable[[Any, Any], bool]] 

141 key_setter=None, # type: Optional[Callable[[Any, Any], str]] 

142 value_setter=None, # type: Optional[Callable[[Any, Any], Any]] 

143 ): 

144 # type: (...) -> BlobETL 

145 ''' 

146 Filter data items by key, value or key + value, according to a given 

147 predicate. Then set that items key by a given function and value by a 

148 given function. 

149 

150 Args: 

151 predicate (function, optional): Function of the form: 

152 lambda k, v: bool. Default: None --> lambda k, v: True. 

153 key_setter (function, optional): Function of the form: 

154 lambda k, v: str. Default: None --> lambda k, v: k. 

155 value_setter (function, optional): Function of the form: 

156 lambda k, v: object. Default: None --> lambda k, v: v. 

157 

158 Returns: 

159 BlobETL: New BlobETL instance. 

160 ''' 

161 # assign default predicate 

162 if predicate is None: 

163 predicate = lambda k, v: True 

164 

165 # assign default key_setter 

166 if key_setter is None: 

167 key_setter = lambda k, v: k 

168 

169 # assign default value_setter 

170 if value_setter is None: 

171 value_setter = lambda k, v: v 

172 

173 data = deepcopy(self._data) 

174 for item in self._data.items(): 

175 if predicate(*item): 

176 k = key_setter(*item) 

177 v = value_setter(*item) 

178 del data[item[0]] 

179 data[k] = v 

180 

181 return BlobETL(data, separator=self._separator) 

182 

183 def update(self, item): 

184 # type: (Union[Dict, BlobETL]) -> BlobETL 

185 ''' 

186 Updates internal dictionary with given dictionary or BlobETL instance. 

187 Given dictionary is first flattened with embeded types. 

188 

189 Args: 

190 item (dict or BlobETL): Dictionary to be used for update. 

191 

192 Returns: 

193 BlobETL: New BlobETL instance. 

194 ''' 

195 if isinstance(item, BlobETL): 

196 item = item._data 

197 temp = tools.flatten(item, separator=self._separator, embed_types=True) 

198 data = deepcopy(self._data) 

199 data.update(temp) 

200 return BlobETL(data, separator=self._separator) 

201 

202 def set_field(self, index, field_setter): 

203 # type: (int, Callable[[str], str]) -> BlobETL 

204 ''' 

205 Set's a field at a given index according to a given function. 

206 

207 Args: 

208 index (int): Field index. 

209 field_setter (function): Function of form lambda str: str. 

210 

211 Returns: 

212 BlobETL: New BlobETL instance. 

213 ''' 

214 output = {} 

215 for key, val in self._data.items(): 

216 fields = key.split(self._separator) 

217 fields[index] = field_setter(fields[index]) 

218 key = self._separator.join(fields) 

219 output[key] = val 

220 return BlobETL(output, separator=self._separator) 

221 

222 # EXPORT-METHODS------------------------------------------------------------ 

223 def to_dict(self): 

224 # type: () -> Dict[str, Any] 

225 ''' 

226 Returns: 

227 dict: Nested representation of internal data. 

228 ''' 

229 return tools.unembed( 

230 tools.nest(deepcopy(self._data), separator=self._separator) 

231 ) 

232 

233 def to_flat_dict(self): 

234 # type: () -> Dict[str, Any] 

235 ''' 

236 Returns: 

237 dict: Flat dictionary with embedded types. 

238 ''' 

239 return deepcopy(self._data) 

240 

241 def to_records(self): 

242 # type: () -> List[Dict] 

243 ''' 

244 Returns: 

245 list[dict]: Data in records format. 

246 ''' 

247 data = [] 

248 for key, val in self._data.items(): 

249 fields = key.split(self._separator) 

250 row = {i: v for i, v in enumerate(fields)} # type: Dict[Any, Any] 

251 row['value'] = val 

252 data.append(row) 

253 return data 

254 

255 def to_dataframe(self, group_by=None): 

256 # type: (Optional[int]) -> DataFrame 

257 ''' 

258 Convert data to pandas DataFrame. 

259 

260 Args: 

261 group_by (int, optional): Field index to group rows of data by. 

262 Default: None. 

263 

264 Returns: 

265 DataFrame: DataFrame. 

266 ''' 

267 data = self.to_records() # type: Any 

268 data = DataFrame(data) 

269 

270 if group_by is not None: 

271 group = list(range(0, group_by)) 

272 data = DataFrame(data)\ 

273 .groupby(group, as_index=False)\ 

274 .agg(lambda x: x.tolist())\ 

275 .apply(lambda x: x.to_dict(), axis=1)\ 

276 .tolist() 

277 data = DataFrame(data) 

278 

279 # clean up column order 

280 cols = data.columns.tolist() # type: List[str] 

281 cols = list(sorted(filter(lambda x: x != 'value', cols))) 

282 cols += ['value'] 

283 data = data[cols] 

284 

285 return data 

286 

    def to_prototype(self):
        # type: () -> BlobETL
        '''
        Convert data to prototypical representation.

        Example:

            >>> data = {
                'users': [
                    {
                        'name': {
                            'first': 'tom',
                            'last': 'smith',
                        }
                    },{
                        'name': {
                            'first': 'dick',
                            'last': 'smith',
                        }
                    },{
                        'name': {
                            'first': 'jane',
                            'last': 'doe',
                        }
                    },
                ]
            }
            >>> BlobETL(data).to_prototype().to_dict()
            {
                '^users': {
                    '<list_[0-9]+>': {
                        'name': {
                            'first$': Counter({'dick': 1, 'jane': 1, 'tom': 1}),
                            'last$': Counter({'doe': 1, 'smith': 2})
                        }
                    }
                }
            }

        Returns:
            BlobETL: New BlobETL instance.
        '''
        # True if any item in items matches the given regex
        def regex_in_list(regex, items):
            # type: (str, List[str]) -> bool
            for item in items:
                if re.search(regex, item):
                    return True
            return False  # pragma: no cover

        # cross product of field lists a and b, joined by the separator
        def field_combinations(a, b):
            # type: (List[str], List[str]) -> List[str]
            output = []
            for fa in a:
                for fb in b:
                    output.append(fa + self._separator + fb)
            return output

        keys = list(self._data.keys())
        fields = list(map(lambda x: x.split(self._separator), keys))

        # per key depth: unique field names, with embedded-type indices
        # (e.g. <list_0>) generalized to the regex <list_[0-9]+>
        fields = DataFrame(fields)\
            .apply(lambda x: x.unique().tolist())\
            .apply(lambda x: filter(lambda y: y is not None, x)) \
            .apply(lambda x: map(
                lambda y: re.sub(r'<([a-z]+)_\d+>', '<\\1_[0-9]+>', y),
                x)) \
            .apply(lambda x: list(set(x))) \
            .tolist()

        # build up key regexes level by level, keeping only combinations that
        # actually match the start of some real key
        prev = fields[0]
        regexes = list()
        for i, level in enumerate(fields[1:]):
            temp = field_combinations(prev, level)  # type: Union[List, Iterator]
            temp = filter(lambda x: regex_in_list('^' + x, keys), temp)
            prev = list(temp)
            regexes.extend(prev)

        regexes = lbt.get_ordered_unique(regexes)

        # keep only maximal regexes: those not contained within another regex
        p_keys = set()
        for regex in regexes:
            other = deepcopy(regexes)
            other.remove(regex)
            not_in_other = True
            for item in other:
                if regex in item:
                    not_in_other = False
            if not_in_other:
                p_keys.add(f'^{regex}$')

        # collapse all values matching each prototype key into a Counter
        output = {}
        for key in p_keys:
            values = self.query(key).to_flat_dict().values()
            output[key] = Counter(values)
        return BlobETL(output, separator=self._separator)

382 

    def to_networkx_graph(self):
        # type: () -> networkx.DiGraph
        '''
        Converts internal dictionary into a networkx directed graph.

        Returns:
            networkx.DiGraph: Graph representation of dictionary.
        '''
        graph = networkx.DiGraph()
        graph.add_node('root')
        # matches embedded type markers such as <list_0>; group 1 is the index
        embed_re = re.compile(r'<[a-z]+_(\d+)>')

        # walk the nested dict, adding one node per key and one per leaf value
        def recurse(item, parent):
            # type: (Dict, str) -> None
            for key, val in item.items():
                # node id is the full key path from root
                k = f'{parent}{self._separator}{key}'
                # display embedded-type keys by their index only
                short_name = embed_re.sub('\\1', key)
                graph.add_node(k, short_name=short_name, node_type='key')
                graph.add_edge(parent, k)

                if isinstance(val, dict):
                    recurse(val, k)
                else:
                    graph.nodes[k]['value'] = [val]
                    name = f'"{str(val)}"'
                    # value node id includes the full key path to keep it unique
                    v = f'"{k}{self._separator}{str(val)}"'
                    graph.add_node(
                        v, short_name=name, node_type='value', value=[val]
                    )
                    graph.add_edge(k, v)

        recurse(tools.nest(self._data, self._separator), 'root')
        # root node was only needed as a recursion anchor
        graph.remove_node('root')
        return graph

417 

    def to_dot_graph(
        self, orthogonal_edges=False, orient='tb', color_scheme=None
    ):
        # type: (bool, str, Optional[Dict[str, str]]) -> pydot.Dot
        '''
        Converts internal dictionary into pydot graph.
        Key and value nodes and edges are colored differently.

        Args:
            orthogonal_edges (bool, optional): Whether graph edges should have
                non-right angles. Default: False.
            orient (str, optional): Graph layout orientation. Default: tb.
                Options include:

                * tb - top to bottom
                * bt - bottom to top
                * lr - left to right
                * rl - right to left
            color_scheme: (dict, optional): Color scheme to be applied to graph.
                Default: rolling_pin.tools.COLOR_SCHEME

        Raises:
            ValueError: If orient is invalid.

        Returns:
            pydot.Dot: Dot graph representation of dictionary.
        '''
        orient = orient.lower()
        orientations = ['tb', 'bt', 'lr', 'rl']
        if orient not in orientations:
            msg = f'Invalid orient value. {orient} not in {orientations}.'
            raise ValueError(msg)

        # set default color scheme
        if color_scheme is None:
            color_scheme = tools.COLOR_SCHEME

        # create pydot graph from the networkx representation
        graph = self.to_networkx_graph()
        dot = networkx.drawing.nx_pydot.to_pydot(graph)

        # set layout orientation
        dot.set_rankdir(orient.upper())

        # set graph background color
        dot.set_bgcolor(color_scheme['background'])

        # set edge draw type
        if orthogonal_edges:
            dot.set_splines('ortho')

        # set draw parameters for each node of graph
        for node in dot.get_nodes():
            node.set_shape('rect')
            node.set_style('filled')
            node.set_color(color_scheme['node'])
            node.set_fillcolor(color_scheme['node'])
            node.set_fontcolor(color_scheme['node_font'])
            node.set_fontname('Courier')

            # if node has short name, set its displayed name to that
            attrs = node.get_attributes()
            if 'short_name' in attrs:
                node.set_label(attrs['short_name'])

            # if node type is value change its colors
            if 'node_type' in attrs and attrs['node_type'] == 'value':
                node.set_color(color_scheme['node_value'])
                node.set_fillcolor(color_scheme['node_value'])
                node.set_fontcolor(color_scheme['node_value_font'])

        # set draw parameters for each edge in graph
        for edge in dot.get_edges():
            edge.set_color(color_scheme['edge'])

            # if edge destination node type is value change its color
            node = dot.get_node(edge.get_destination())[0]
            attrs = node.get_attributes()
            if 'node_type' in attrs and attrs['node_type'] == 'value':
                edge.set_color(color_scheme['edge_value'])

        return dot

500 

501 def to_html( 

502 self, 

503 layout='dot', 

504 orthogonal_edges=False, 

505 orient='tb', 

506 color_scheme=None, 

507 as_png=False, 

508 ): 

509 # type: (str, bool, str, Optional[Dict[str, str]], bool) -> Union[Image, HTML] 

510 ''' 

511 For use in inline rendering of graph data in Jupyter Lab. 

512 

513 Args: 

514 layout (str, optional): Graph layout style. 

515 Options include: circo, dot, fdp, neato, sfdp, twopi. 

516 Default: dot. 

517 orthogonal_edges (bool, optional): Whether graph edges should have 

518 non-right angles. Default: False. 

519 orient (str, optional): Graph layout orientation. Default: tb. 

520 Options include: 

521 

522 * tb - top to bottom 

523 * bt - bottom to top 

524 * lr - left to right 

525 * rl - right to left 

526 color_scheme: (dict, optional): Color scheme to be applied to graph. 

527 Default: rolling_pin.tools.COLOR_SCHEME 

528 as_png (bool, optional): Display graph as a PNG image instead of 

529 SVG. Useful for display on Github. Default: False. 

530 

531 Returns: 

532 IPython.display.HTML: HTML object for inline display. 

533 ''' 

534 if color_scheme is None: 

535 color_scheme = tools.COLOR_SCHEME 

536 

537 dot = self.to_dot_graph( 

538 orthogonal_edges=orthogonal_edges, 

539 orient=orient, 

540 color_scheme=color_scheme, 

541 ) 

542 return tools.dot_to_html(dot, layout=layout, as_png=as_png) 

543 

544 def write( 

545 self, 

546 fullpath, 

547 layout='dot', 

548 orthogonal_edges=False, 

549 orient='tb', 

550 color_scheme=None 

551 ): 

552 # type: (Union[str, Path], str, bool, str, Dict[str, str]) -> BlobETL 

553 ''' 

554 Writes internal dictionary to a given filepath. 

555 Formats supported: svg, dot, png, json. 

556 

557 Args: 

558 fulllpath (str or Path): File tobe written to. 

559 layout (str, optional): Graph layout style. 

560 Options include: circo, dot, fdp, neato, sfdp, twopi. 

561 Default: dot. 

562 orthogonal_edges (bool, optional): Whether graph edges should have 

563 non-right angles. Default: False. 

564 orient (str, optional): Graph layout orientation. Default: tb. 

565 Options include: 

566 

567 * tb - top to bottom 

568 * bt - bottom to top 

569 * lr - left to right 

570 * rl - right to left 

571 color_scheme: (dict, optional): Color scheme to be applied to graph. 

572 Default: rolling_pin.tools.COLOR_SCHEME 

573 

574 Raises: 

575 ValueError: If invalid file extension given. 

576 

577 Returns: 

578 BlobETL: self. 

579 ''' 

580 if isinstance(fullpath, Path): 

581 fullpath = fullpath.absolute().as_posix() 

582 

583 _, ext = os.path.splitext(fullpath) 

584 ext = re.sub(r'^\.', '', ext) 

585 if re.search('^json$', ext, re.I): 

586 with open(fullpath, 'w') as f: 

587 json.dump(self.to_dict(), f) 

588 return self 

589 

590 if color_scheme is None: 

591 color_scheme = tools.COLOR_SCHEME 

592 

593 graph = self.to_dot_graph( 

594 orthogonal_edges=orthogonal_edges, 

595 orient=orient, 

596 color_scheme=color_scheme, 

597 ) 

598 try: 

599 tools.write_dot_graph(graph, fullpath, layout=layout,) 

600 except ValueError: 

601 msg = f'Invalid extension found: {ext}. ' 

602 msg += 'Valid extensions include: svg, dot, png, json.' 

603 raise ValueError(msg) 

604 return self