Coverage for /home/ubuntu/rolling-pin/python/rolling_pin/blob_etl.py: 100%

226 statements  

coverage.py v7.1.0, created at 2023-11-15 00:43 +0000

from typing import Any, Callable, Dict, Iterator, List, Optional, Union  # noqa: F401
from IPython.display import HTML, Image  # noqa: F401
import pydot  # noqa: F401

from collections import Counter
import json
import os
import re
from copy import deepcopy
from pathlib import Path

import lunchbox.tools as lbt
from pandas import DataFrame
import networkx

import rolling_pin.tools as rpt
# ------------------------------------------------------------------------------

'''
Contains the BlobETL class, which is used for converting JSON blobs, and their
Python equivalents, into flat dictionaries that can easily be modified and
converted to directed graphs.
'''


class BlobETL():
    '''
    Converts blob data internally into a flat dictionary that is universally
    searchable, editable and convertible back to the data's original structure,
    new blob structures or directed graphs.
    '''

    def __init__(self, blob, separator='/'):
        # type: (Any, str) -> None
        '''
        Constructs BlobETL instance.

        Args:
            blob (object): Iterable object.
            separator (str, optional): String to be used as a field separator in
                each key. Default: '/'.
        '''
        self._data = rpt \
            .flatten(blob, separator=separator, embed_types=True)  # type: Dict[str, Any]
        self._separator = separator  # type: str
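    # Usage sketch with a hypothetical blob; flat keys are separator-joined
    # fields (lists additionally receive embedded type markers such as
    # <list_0>, as in the to_prototype example), so keys shown are illustrative:
    #
    #     etl = BlobETL({'a': {'b': 1, 'c': 2}})
    #     etl.to_flat_dict()  # {'a/b': 1, 'a/c': 2}
    #     etl.to_dict()       # {'a': {'b': 1, 'c': 2}}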


    # EDIT_METHODS--------------------------------------------------------------
    def query(self, regex, ignore_case=True, invert=False):
        # type: (str, bool, bool) -> BlobETL
        '''
        Filter data items by key according to given regular expression.

        Args:
            regex (str): Regular expression.
            ignore_case (bool, optional): Whether to ignore case in the
                regular expression search. Default: True.
            invert (bool, optional): Whether to invert the predicate.
                Default: False.

        Returns:
            BlobETL: New BlobETL instance.
        '''
        r = re.compile(regex, re.IGNORECASE) if ignore_case else re.compile(regex)
        return self.filter(lambda x: bool(r.search(x)), by='key', invert=invert)
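    # Usage sketch, continuing the etl example above (keys illustrative):
    #
    #     etl.query('b$').to_flat_dict()               # {'a/b': 1}
    #     etl.query('b$', invert=True).to_flat_dict()  # {'a/c': 2}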


    def filter(self, predicate, by='key', invert=False):
        # type: (Callable[[Any], bool], str, bool) -> BlobETL
        '''
        Filter data items by key, value or key + value, according to a given
        predicate.

        Args:
            predicate: Function that returns a boolean value.
            by (str, optional): Value handed to predicate.
                Options include: key, value, key+value. Default: key.
            invert (bool, optional): Whether to invert the predicate.
                Default: False.

        Raises:
            ValueError: If by keyword is not key, value, or key+value.

        Returns:
            BlobETL: New BlobETL instance.
        '''
        pred = lambda items: predicate(*items)
        if invert:
            pred = lambda items: not predicate(*items)

        data = {}
        if by not in ['key', 'value', 'key+value']:
            msg = f'Invalid by argument: {by}. Needs to be one of: '
            msg += 'key, value, key+value.'
            raise ValueError(msg)

        for key, val in self._data.items():
            item = None
            if by == 'key':
                item = [key]
            elif by == 'value':
                item = [val]
            else:
                item = [key, val]

            if pred(item):
                data[key] = val

        return BlobETL(data, separator=self._separator)
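    # Usage sketch, continuing the etl example above:
    #
    #     etl.filter(lambda v: v > 1, by='value').to_flat_dict()              # {'a/c': 2}
    #     etl.filter(lambda k, v: 'b' in k, by='key+value', invert=True)      # drops 'a/b'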


    def delete(self, predicate, by='key'):
        # type: (Callable[[Any], bool], str) -> BlobETL
        '''
        Delete data items by key, value or key + value, according to a given
        predicate.

        Args:
            predicate: Function that returns a boolean value.
            by (str, optional): Value handed to predicate.
                Options include: key, value, key+value. Default: key.

        Raises:
            ValueError: If by keyword is not key, value, or key+value.

        Returns:
            BlobETL: New BlobETL instance.
        '''
        data = deepcopy(self._data)
        if by not in ['key', 'value', 'key+value']:
            msg = f'Invalid by argument: {by}. Needs to be one of: '
            msg += 'key, value, key+value.'
            raise ValueError(msg)

        for key, val in self._data.items():
            item = None
            if by == 'key':
                item = [key]
            elif by == 'value':
                item = [val]
            else:
                item = [key, val]

            if predicate(*item):
                del data[key]

        return BlobETL(data, separator=self._separator)
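    # Usage sketch, continuing the etl example above:
    #
    #     etl.delete(lambda k: k.endswith('b')).to_flat_dict()  # {'a/c': 2}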


    def set(
        self,
        predicate=None,  # type: Optional[Callable[[Any, Any], bool]]
        key_setter=None,  # type: Optional[Callable[[Any, Any], str]]
        value_setter=None,  # type: Optional[Callable[[Any, Any], Any]]
    ):
        # type: (...) -> BlobETL
        '''
        Filter data items according to a given predicate, then set each
        matching item's key with a given key_setter function and its value
        with a given value_setter function.

        Args:
            predicate (function, optional): Function of the form:
                lambda k, v: bool. Default: None --> lambda k, v: True.
            key_setter (function, optional): Function of the form:
                lambda k, v: str. Default: None --> lambda k, v: k.
            value_setter (function, optional): Function of the form:
                lambda k, v: object. Default: None --> lambda k, v: v.

        Returns:
            BlobETL: New BlobETL instance.
        '''
        # assign default predicate
        if predicate is None:
            predicate = lambda k, v: True

        # assign default key_setter
        if key_setter is None:
            key_setter = lambda k, v: k

        # assign default value_setter
        if value_setter is None:
            value_setter = lambda k, v: v

        data = deepcopy(self._data)
        for item in self._data.items():
            if predicate(*item):
                k = key_setter(*item)
                v = value_setter(*item)
                del data[item[0]]
                data[k] = v

        return BlobETL(data, separator=self._separator)
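    # Usage sketch, continuing the etl example above (keys illustrative):
    #
    #     etl.set(
    #         predicate=lambda k, v: k == 'a/b',
    #         key_setter=lambda k, v: 'a/z',
    #         value_setter=lambda k, v: v * 10,
    #     ).to_flat_dict()  # {'a/c': 2, 'a/z': 10}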


    def update(self, item):
        # type: (Union[Dict, BlobETL]) -> BlobETL
        '''
        Updates internal dictionary with given dictionary or BlobETL instance.
        Given dictionary is first flattened with embedded types.

        Args:
            item (dict or BlobETL): Dictionary to be used for update.

        Returns:
            BlobETL: New BlobETL instance.
        '''
        if isinstance(item, BlobETL):
            item = item._data
        temp = rpt.flatten(item, separator=self._separator, embed_types=True)
        data = deepcopy(self._data)
        data.update(temp)
        return BlobETL(data, separator=self._separator)
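    # Usage sketch, continuing the etl example above:
    #
    #     etl.update({'a': {'d': 3}}).to_flat_dict()  # {'a/b': 1, 'a/c': 2, 'a/d': 3}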


    def set_field(self, index, field_setter):
        # type: (int, Callable[[str], str]) -> BlobETL
        '''
        Sets a field at a given index according to a given function.

        Args:
            index (int): Field index.
            field_setter (function): Function of form lambda str: str.

        Returns:
            BlobETL: New BlobETL instance.
        '''
        output = {}
        for key, val in self._data.items():
            fields = key.split(self._separator)
            fields[index] = field_setter(fields[index])
            key = self._separator.join(fields)
            output[key] = val
        return BlobETL(output, separator=self._separator)
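    # Usage sketch, continuing the etl example above; index 0 is the first
    # separator-delimited field of each key:
    #
    #     etl.set_field(0, lambda field: field.upper()).to_flat_dict()
    #     # {'A/b': 1, 'A/c': 2}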


    # EXPORT-METHODS------------------------------------------------------------
    def to_dict(self):
        # type: () -> Dict[str, Any]
        '''
        Returns:
            dict: Nested representation of internal data.
        '''
        return rpt.unembed(
            rpt.nest(deepcopy(self._data), separator=self._separator)
        )

    def to_flat_dict(self):
        # type: () -> Dict[str, Any]
        '''
        Returns:
            dict: Flat dictionary with embedded types.
        '''
        return deepcopy(self._data)


    def to_records(self):
        # type: () -> List[Dict]
        '''
        Returns:
            list[dict]: Data in records format.
        '''
        data = []
        for key, val in self._data.items():
            fields = key.split(self._separator)
            row = {i: v for i, v in enumerate(fields)}  # type: Dict[Any, Any]
            row['value'] = val
            data.append(row)
        return data
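    # Usage sketch, continuing the etl example above; each key field becomes an
    # integer-keyed entry and the leaf value is stored under 'value':
    #
    #     etl.to_records()
    #     # [{0: 'a', 1: 'b', 'value': 1}, {0: 'a', 1: 'c', 'value': 2}]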


    def to_dataframe(self, group_by=None):
        # type: (Optional[int]) -> DataFrame
        '''
        Convert data to pandas DataFrame.

        Args:
            group_by (int, optional): Field index to group rows of data by.
                Default: None.

        Returns:
            DataFrame: DataFrame.
        '''
        data = self.to_records()  # type: Any
        data = DataFrame(data)

        if group_by is not None:
            group = list(range(0, group_by))
            data = DataFrame(data)\
                .groupby(group, as_index=False)\
                .agg(lambda x: x.tolist())\
                .apply(lambda x: x.to_dict(), axis=1)\
                .tolist()
            data = DataFrame(data)

        # clean up column order
        cols = data.columns.tolist()  # type: List[str]
        cols = list(sorted(filter(lambda x: x != 'value', cols)))
        cols += ['value']
        data = data[cols]

        return data
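    # Usage sketch, continuing the etl example above; columns are the integer
    # field indices followed by 'value' (rendering approximate):
    #
    #     etl.to_dataframe()
    #     #    0  1  value
    #     # 0  a  b      1
    #     # 1  a  c      2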


    def to_prototype(self):
        # type: () -> BlobETL
        '''
        Convert data to prototypical representation.

        Example:
        ========
            >>> data = {
                'users': [
                    {
                        'name': {
                            'first': 'tom',
                            'last': 'smith',
                        }
                    },{
                        'name': {
                            'first': 'dick',
                            'last': 'smith',
                        }
                    },{
                        'name': {
                            'first': 'jane',
                            'last': 'doe',
                        }
                    },
                ]
            }
            >>> BlobETL(data).to_prototype().to_dict()
            {
                '^users': {
                    '<list_[0-9]+>': {
                        'name': {
                            'first$': Counter({'dick': 1, 'jane': 1, 'tom': 1}),
                            'last$': Counter({'doe': 1, 'smith': 2})
                        }
                    }
                }
            }

        Returns:
            BlobETL: New BlobETL instance.
        '''
        def regex_in_list(regex, items):
            # type: (str, List[str]) -> bool
            for item in items:
                if re.search(regex, item):
                    return True
            return False  # pragma: no cover

        def field_combinations(a, b):
            # type: (List[str], List[str]) -> List[str]
            output = []
            for fa in a:
                for fb in b:
                    output.append(fa + self._separator + fb)
            return output

        keys = list(self._data.keys())
        fields = list(map(lambda x: x.split(self._separator), keys))

        fields = DataFrame(fields)\
            .apply(lambda x: x.unique().tolist())\
            .apply(lambda x: filter(lambda y: y is not None, x)) \
            .apply(lambda x: map(
                lambda y: re.sub(r'<([a-z]+)_\d+>', '<\\1_[0-9]+>', y),
                x)) \
            .apply(lambda x: list(set(x))) \
            .tolist()

        prev = fields[0]
        regexes = list()
        for i, level in enumerate(fields[1:]):
            temp = field_combinations(prev, level)  # type: Union[List, Iterator]
            temp = filter(lambda x: regex_in_list('^' + x, keys), temp)
            prev = list(temp)
            regexes.extend(prev)

        regexes = lbt.get_ordered_unique(regexes)

        p_keys = set()
        for regex in regexes:
            other = deepcopy(regexes)
            other.remove(regex)
            not_in_other = True
            for item in other:
                if regex in item:
                    not_in_other = False
            if not_in_other:
                p_keys.add(f'^{regex}$')

        output = {}
        for key in p_keys:
            values = self.query(key).to_flat_dict().values()
            output[key] = Counter(values)
        return BlobETL(output, separator=self._separator)


    def to_networkx_graph(self):
        # type: () -> networkx.DiGraph
        '''
        Converts internal dictionary into a networkx directed graph.

        Returns:
            networkx.DiGraph: Graph representation of dictionary.
        '''
        graph = networkx.DiGraph()
        graph.add_node('root')
        embed_re = re.compile(r'<[a-z]+_(\d+)>')

        def recurse(item, parent):
            # type: (Dict, str) -> None
            for key, val in item.items():
                k = f'{parent}{self._separator}{key}'
                short_name = embed_re.sub('\\1', key)
                graph.add_node(k, short_name=short_name, node_type='key')
                graph.add_edge(parent, k)

                if isinstance(val, dict):
                    recurse(val, k)
                else:
                    graph.nodes[k]['value'] = [val]
                    name = f'"{str(val)}"'
                    v = f'"{k}{self._separator}{str(val)}"'
                    graph.add_node(
                        v, short_name=name, node_type='value', value=[val]
                    )
                    graph.add_edge(k, v)

        recurse(rpt.nest(self._data, self._separator), 'root')
        graph.remove_node('root')
        return graph
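    # Usage sketch, continuing the etl example above; key and value nodes are
    # distinguished by the 'node_type' attribute set in recurse:
    #
    #     graph = etl.to_networkx_graph()
    #     [n for n, d in graph.nodes(data=True) if d.get('node_type') == 'value']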


    def to_dot_graph(
        self, orthogonal_edges=False, orient='tb', color_scheme=None
    ):
        # type: (bool, str, Optional[Dict[str, str]]) -> pydot.Dot
        '''
        Converts internal dictionary into pydot graph.
        Key and value nodes and edges are colored differently.

        Args:
            orthogonal_edges (bool, optional): Whether graph edges should be
                drawn with right angles only. Default: False.
            orient (str, optional): Graph layout orientation. Default: tb.
                Options include:

                * tb - top to bottom
                * bt - bottom to top
                * lr - left to right
                * rl - right to left
            color_scheme (dict, optional): Color scheme to be applied to graph.
                Default: rolling_pin.tools.COLOR_SCHEME

        Raises:
            ValueError: If orient is invalid.

        Returns:
            pydot.Dot: Dot graph representation of dictionary.
        '''
        orient = orient.lower()
        orientations = ['tb', 'bt', 'lr', 'rl']
        if orient not in orientations:
            msg = f'Invalid orient value. {orient} not in {orientations}.'
            raise ValueError(msg)

        # set default color scheme
        if color_scheme is None:
            color_scheme = rpt.COLOR_SCHEME

        # create pydot graph
        graph = self.to_networkx_graph()
        dot = networkx.drawing.nx_pydot.to_pydot(graph)

        # set layout orientation
        dot.set_rankdir(orient.upper())

        # set graph background color
        dot.set_bgcolor(color_scheme['background'])

        # set edge draw type
        if orthogonal_edges:
            dot.set_splines('ortho')

        # set draw parameters for each node of graph
        for node in dot.get_nodes():
            node.set_shape('rect')
            node.set_style('filled')
            node.set_color(color_scheme['node'])
            node.set_fillcolor(color_scheme['node'])
            node.set_fontcolor(color_scheme['node_font'])
            node.set_fontname('Courier')

            # if node has short name, set its displayed name to that
            attrs = node.get_attributes()
            if 'short_name' in attrs:
                node.set_label(attrs['short_name'])

            # if node type is value change its colors
            if 'node_type' in attrs and attrs['node_type'] == 'value':
                node.set_color(color_scheme['node_value'])
                node.set_fillcolor(color_scheme['node_value'])
                node.set_fontcolor(color_scheme['node_value_font'])

        # set draw parameters for each edge in graph
        for edge in dot.get_edges():
            edge.set_color(color_scheme['edge'])

            # if edge destination node type is value change its color
            node = dot.get_node(edge.get_destination())[0]
            attrs = node.get_attributes()
            if 'node_type' in attrs and attrs['node_type'] == 'value':
                edge.set_color(color_scheme['edge_value'])

        return dot
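    # Usage sketch, continuing the etl example above; pydot's to_string()
    # renders the generated DOT source:
    #
    #     dot = etl.to_dot_graph(orient='lr')
    #     print(dot.to_string())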


    def to_html(
        self,
        layout='dot',
        orthogonal_edges=False,
        orient='tb',
        color_scheme=None,
        as_png=False,
    ):
        # type: (str, bool, str, Optional[Dict[str, str]], bool) -> Union[Image, HTML]
        '''
        For use in inline rendering of graph data in Jupyter Lab.

        Args:
            layout (str, optional): Graph layout style.
                Options include: circo, dot, fdp, neato, sfdp, twopi.
                Default: dot.
            orthogonal_edges (bool, optional): Whether graph edges should be
                drawn with right angles only. Default: False.
            orient (str, optional): Graph layout orientation. Default: tb.
                Options include:

                * tb - top to bottom
                * bt - bottom to top
                * lr - left to right
                * rl - right to left
            color_scheme (dict, optional): Color scheme to be applied to graph.
                Default: rolling_pin.tools.COLOR_SCHEME
            as_png (bool, optional): Display graph as a PNG image instead of
                SVG. Useful for display on Github. Default: False.

        Returns:
            IPython.display.HTML: HTML object for inline display.
        '''
        if color_scheme is None:
            color_scheme = rpt.COLOR_SCHEME

        dot = self.to_dot_graph(
            orthogonal_edges=orthogonal_edges,
            orient=orient,
            color_scheme=color_scheme,
        )
        return rpt.dot_to_html(dot, layout=layout, as_png=as_png)
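    # Usage sketch for a Jupyter Lab cell, continuing the etl example above:
    #
    #     etl.to_html(orient='lr', as_png=True)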


    def write(
        self,
        fullpath,
        layout='dot',
        orthogonal_edges=False,
        orient='tb',
        color_scheme=None
    ):
        # type: (Union[str, Path], str, bool, str, Optional[Dict[str, str]]) -> BlobETL
        '''
        Writes internal dictionary to a given filepath.
        Formats supported: svg, dot, png, json.

        Args:
            fullpath (str or Path): File to be written to.
            layout (str, optional): Graph layout style.
                Options include: circo, dot, fdp, neato, sfdp, twopi.
                Default: dot.
            orthogonal_edges (bool, optional): Whether graph edges should be
                drawn with right angles only. Default: False.
            orient (str, optional): Graph layout orientation. Default: tb.
                Options include:

                * tb - top to bottom
                * bt - bottom to top
                * lr - left to right
                * rl - right to left
            color_scheme (dict, optional): Color scheme to be applied to graph.
                Default: rolling_pin.tools.COLOR_SCHEME

        Raises:
            ValueError: If invalid file extension given.

        Returns:
            BlobETL: self.
        '''
        if isinstance(fullpath, Path):
            fullpath = fullpath.absolute().as_posix()

        _, ext = os.path.splitext(fullpath)
        ext = re.sub(r'^\.', '', ext)
        if re.search('^json$', ext, re.I):
            with open(fullpath, 'w') as f:
                json.dump(self.to_dict(), f)
            return self

        if color_scheme is None:
            color_scheme = rpt.COLOR_SCHEME

        graph = self.to_dot_graph(
            orthogonal_edges=orthogonal_edges,
            orient=orient,
            color_scheme=color_scheme,
        )
        try:
            rpt.write_dot_graph(graph, fullpath, layout=layout)
        except ValueError:
            msg = f'Invalid extension found: {ext}. '
            msg += 'Valid extensions include: svg, dot, png, json.'
            raise ValueError(msg)
        return self
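    # Usage sketch, continuing the etl example above; paths are illustrative:
    #
    #     etl.write('/tmp/blob.json')               # nested dict as JSON
    #     etl.write('/tmp/blob.svg', orient='lr')   # rendered graph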