Coverage for /home/ubuntu/rolling-pin/python/rolling_pin/tools.py: 100%

193 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-13 19:35 +0000

1from typing import Any, Dict, Generator, Iterable, List, Optional, Union # noqa: F401 

2import pydot # noqa: F401 

3 

4from collections import OrderedDict 

5from pathlib import Path 

6import logging 

7import os 

8import re 

9import shutil 

10 

11from IPython.display import HTML, Image 

12import pandas as pd 

13 

# Anything accepted as a filesystem location by this module.
Filepath = Union[str, Path]

# Logging verbosity comes from the LOG_LEVEL environment variable (upper-cased)
# and falls back to WARNING when the variable is unset.
LOG_LEVEL = os.getenv('LOG_LEVEL', 'WARNING').upper()
logging.basicConfig(level=LOG_LEVEL)
LOGGER = logging.getLogger(__name__)

# ------------------------------------------------------------------------------

19 

'''
Contains basic functions for more complex ETL functions and classes.
'''

23 

# COLOR-SCHEME------------------------------------------------------------------

# Hex colors used for graph backgrounds, nodes, edges and their labels.
COLOR_SCHEME = {
    'background': '#242424',
    'node': '#343434',
    'node_font': '#B6ECF3',
    'node_value': '#343434',
    'node_value_font': '#DE958E',
    'edge': '#B6ECF3',
    'edge_value': '#DE958E',
    'node_library_font': '#DE958E',
    'node_subpackage_font': '#A0D17B',
    'node_module_font': '#B6ECF3',
    'edge_library': '#DE958E',
    'edge_subpackage': '#A0D17B',
    'edge_module': '#B6ECF3',
}  # type: Dict[str, str]

40 

41 

# Named hex colors shared by the plotly theme constants in this module.
PLOTLY_COLOR_SCHEME = {
    'bg': '#242424',
    'blue1': '#5F95DE',
    'blue2': '#93B6E6',
    'cyan1': '#7EC4CF',
    'cyan2': '#B6ECF3',
    'dark1': '#040404',
    'dark2': '#141414',
    'dialog1': '#444459',
    'dialog2': '#5D5D7A',
    'green1': '#8BD155',
    'green2': '#A0D17B',
    'grey1': '#343434',
    'grey2': '#444444',
    'light1': '#A4A4A4',
    'light2': '#F4F4F4',
    'orange1': '#EB9E58',
    'orange2': '#EBB483',
    'purple1': '#C98FDE',
    'purple2': '#AC92DE',
    'red1': '#F77E70',
    'red2': '#DE958E',
    'yellow1': '#E8EA7E',
    'yellow2': '#E9EABE',
}

67 

68 

# Ordered hex colors, looked up by name in PLOTLY_COLOR_SCHEME.
COLOR_SCALE = [
    PLOTLY_COLOR_SCHEME[name]
    for name in (
        'cyan2', 'red2', 'green2', 'blue2', 'orange2', 'purple2', 'yellow2',
        'light2', 'cyan1', 'red1', 'green1', 'blue1',
    )
]  # type: List[str]

74 

75 

# Nested layout settings (legend, backgrounds, title and both axes) whose
# colors are all taken from PLOTLY_COLOR_SCHEME.
PLOTLY_LAYOUT_THEME = dict(
    legend=dict(
        bgcolor=PLOTLY_COLOR_SCHEME['bg'],
        title=dict(text=''),
        font=dict(color=PLOTLY_COLOR_SCHEME['light2']),
    ),
    paper_bgcolor=PLOTLY_COLOR_SCHEME['bg'],
    plot_bgcolor=PLOTLY_COLOR_SCHEME['bg'],
    title=dict(
        font=dict(color=PLOTLY_COLOR_SCHEME['light2']),
    ),
    xaxis=dict(
        gridcolor=PLOTLY_COLOR_SCHEME['grey1'],
        showgrid=True,
        tickfont=dict(color=PLOTLY_COLOR_SCHEME['light1']),
        title=dict(
            font=dict(color=PLOTLY_COLOR_SCHEME['light1']),
        ),
        zerolinecolor=PLOTLY_COLOR_SCHEME['grey2'],
    ),
    yaxis=dict(
        gridcolor=PLOTLY_COLOR_SCHEME['grey1'],
        showgrid=True,
        tickfont=dict(color=PLOTLY_COLOR_SCHEME['light1']),
        title=dict(
            font=dict(color=PLOTLY_COLOR_SCHEME['light1']),
        ),
        zerolinecolor=PLOTLY_COLOR_SCHEME['grey2'],
    ),
)

118 

119 

# PREDICATE-FUNCTIONS-----------------------------------------------------------

def is_iterable(item):
    # type: (Any) -> bool
    '''
    Checks whether the given item counts as an iterable container, meaning it
    is either list-like or dict-like.

    Args:
        item (object): Object to be tested.

    Returns:
        bool: True if item is list-like or dict-like, otherwise False.
    '''
    return bool(is_listlike(item) or is_dictlike(item))

135 

136 

def is_dictlike(item):
    # type: (Any) -> bool
    '''
    Checks whether the given item is dict-like: an instance of dict or
    OrderedDict whose class is not named 'Counter'.

    Args:
        item (object): Object to be tested.

    Returns:
        bool: True if item is dict-like, otherwise False.
    '''
    if not isinstance(item, (dict, OrderedDict)):
        return False
    # Counter is a dict subclass but is deliberately not treated as dict-like.
    return item.__class__.__name__ != 'Counter'

154 

155 

def is_listlike(item):
    # type: (Any) -> bool
    '''
    Checks whether the given item is list-like: an instance of list, tuple or
    set.

    Args:
        item (object): Object to be tested.

    Returns:
        bool: True if item is list-like, otherwise False.
    '''
    return isinstance(item, (list, tuple, set))

171 

172 

# CORE-FUNCTIONS----------------------------------------------------------------

def flatten(item, separator='/', embed_types=True):
    # type: (Iterable, str, bool) -> Dict[str, Any]
    '''
    Flattens an iterable object into a flat dictionary.

    Nested dict-like and list-like values are walked recursively and their keys
    are joined with the separator. Empty containers and non-iterable values
    are stored as leaf values.

    Args:
        item (object): Iterable object.
        separator (str, optional): Field separator in keys. It is treated as a
            literal string, never as a regular expression. Default: '/'.
        embed_types (bool, optional): If True, list-like items are keyed as
            '<TYPENAME_INDEX>' (for example '<list_0>'). If False, they are
            keyed by their integer index. Default: True.

    Returns:
        dict: Dictionary representation of given object.
    '''
    output = {}  # type: Dict[str, Any]
    prefix_length = len(separator)

    def recurse(item, cursor):
        # type: (Iterable, Any) -> None
        if is_listlike(item):
            if embed_types:
                name = item.__class__.__name__
                item = {f'<{name}_{i}>': val for i, val in enumerate(item)}
            else:
                item = dict(enumerate(item))
        if is_dictlike(item):
            for key, val in item.items():
                new_key = f'{cursor}{separator}{str(key)}'
                if is_iterable(val) and len(val) > 0:
                    recurse(val, new_key)
                else:
                    # new_key always begins with one separator because the
                    # top-level cursor is ''. Slice it off rather than using
                    # a regex, so separators such as '|' or '(' work.
                    output[new_key[prefix_length:]] = val

    recurse(item, '')
    return output

208 

209 

def nest(flat_dict, separator='/'):
    # type: (Dict[str, Any], str) -> Dict[str, Any]
    '''
    Builds a nested dictionary from a flat one by splitting each key on the
    given separator. Empty fields produced by the split are ignored.

    Args:
        flat_dict (dict): Flat dictionary.
        separator (str, optional): Field separator within given dictionary's
            keys. Default: '/'.

    Raises:
        KeyError: If a parent field of a key already holds a non-dict value.

    Returns:
        dict: Nested dictionary.
    '''
    output = {}  # type: Dict[str, Any]
    for flat_key, value in flat_dict.items():
        fields = [field for field in flat_key.split(separator) if field != '']
        leaf = fields.pop()

        # Walk down the parent fields, creating missing levels on the way.
        branch = output
        for field in fields:
            child = branch.setdefault(field, {})
            if not isinstance(child, dict):
                msg = f"Duplicate key conflict. Key: '{field}'."
                raise KeyError(msg)
            branch = child

        branch[leaf] = value
    return output

242 

243 

def unembed(item):
    # type: (Any) -> Any
    '''
    Convert embedded types in dictionary keys into python types.

    A non-empty dict whose keys all look like '<TYPENAME_INDEX>', where
    TYPENAME is list, tuple or set, is converted into that container with its
    values ordered by INDEX. The type name of the first key decides the
    container. Any other dict is returned as a new dict with unembed applied to
    each value. Non-dict items and empty dicts are returned unchanged.

    Args:
        item (object): Dictionary with embedded types.

    Returns:
        object: Converted object.
    '''
    lut = {'list': list, 'tuple': tuple, 'set': set}
    embed_re = re.compile(r'^<([a-z]+)_(\d+)>$')

    if not is_dictlike(item) or item == {}:
        return item

    keys = list(item.keys())
    # Non-string keys can never be embedded keys, so they simply do not match.
    matches = [
        embed_re.match(key) if isinstance(key, str) else None
        for key in keys
    ]
    first = matches[0]

    # Only treat the dict as an embedded container when every key matches and
    # the type name is known; otherwise fall through to the plain dict branch
    # instead of failing part-way through.
    if first is not None and all(matches) and first.group(1) in lut:
        ordered = sorted(
            (int(match.group(2)), key) for match, key in zip(matches, keys)
        )
        # unembed returns non-dict values unchanged, so it is safe on all.
        values = [unembed(item[key]) for _, key in ordered]
        return lut[first.group(1)](values)

    return {key: unembed(val) for key, val in item.items()}

280 

281 

# FILE-FUNCTIONS----------------------------------------------------------------

def list_all_files(
    directory,           # type: Filepath
    include_regex=None,  # type: Optional[str]
    exclude_regex=None   # type: Optional[str]
):
    # type: (...) -> Generator[Path, None, None]
    '''
    Recursively list all files within a given directory.

    The directory is validated and the regexes are compiled when this function
    is called; the directory walk itself is lazy. Regexes are searched against
    each file's absolute posix path.

    Args:
        directory (str or Path): Directory to walk.
        include_regex (str, optional): Include filenames that match this regex.
            Default: None.
        exclude_regex (str, optional): Exclude filenames that match this regex.
            Default: None.

    Raises:
        FileNotFoundError: If argument is not a directory or does not exist.

    Yields:
        Path: File.
    '''
    directory = Path(directory)
    if not directory.is_dir():
        msg = f'{directory} is not a directory or does not exist.'
        raise FileNotFoundError(msg)

    include_re = None  # type: Any
    if include_regex is not None:
        include_re = re.compile(include_regex)
    exclude_re = None  # type: Any
    if exclude_regex is not None:
        exclude_re = re.compile(exclude_regex)

    def walk():
        # type: () -> Generator[Path, None, None]
        # Kept as an inner generator so the checks above run at call time
        # rather than on first iteration.
        for root, _, files in os.walk(directory):
            for file_ in files:
                filepath = Path(root, file_)
                temp = filepath.absolute().as_posix()
                if include_re is not None and not include_re.search(temp):
                    continue
                if exclude_re is not None and exclude_re.search(temp):
                    continue
                yield filepath

    return walk()

326 

327 

def directory_to_dataframe(directory, include_regex='', exclude_regex=r'\.DS_Store'):
    # type: (Filepath, str, str) -> pd.DataFrame
    r'''
    Walks a directory recursively and returns its files as rows of a
    pd.DataFrame with filepath, filename and extension columns.

    Args:
        directory (str or Path): Directory to walk.
        include_regex (str, optional): Include filenames that match this regex.
            Default: ''.
        exclude_regex (str, optional): Exclude filenames that match this regex.
            Default: '\.DS_Store'.

    Returns:
        pd.DataFrame: pd.DataFrame with one file per row.
    '''
    found = list_all_files(
        directory,
        include_regex=include_regex,
        exclude_regex=exclude_regex,
    )  # type: Any
    paths = sorted(found)

    table = pd.DataFrame()
    table['filepath'] = paths
    # filename and extension are derived while filepath still holds Path
    # objects; filepath is converted to an absolute posix string last.
    table['filename'] = table['filepath'].apply(lambda path: path.name)
    table['extension'] = table['filepath'].apply(
        lambda path: Path(path).suffix.lstrip('.')
    )
    table['filepath'] = table['filepath'].apply(
        lambda path: path.absolute().as_posix()
    )
    return table

357 

358 

def get_parent_fields(key, separator='/'):
    # type: (str, str) -> List[str]
    '''
    Lists every parent of a key, where the key is split into fields by the
    given separator. Each parent is the joined prefix of the fields before the
    last one, shortest first.

    Args:
        key (str): Key.
        separator (str, optional): String that splits key into fields.
            Default: '/'.

    Returns:
        list(str): List of absolute parent fields.
    '''
    fields = key.split(separator)
    return [separator.join(fields[:end]) for end in range(1, len(fields))]

377 

378 

def filter_text(
    text,                # type: str
    include_regex=None,  # type: Optional[str]
    exclude_regex=None,  # type: Optional[str]
    replace_regex=None,  # type: Optional[str]
    replace_value=None,  # type: Optional[str]
):
    # type: (...) -> str
    '''
    Filter given text by applying regular expressions to each line.

    Filters are applied in this order: include, exclude, replace.

    Args:
        text (str): Newline separated lines.
        include_regex (str, optional): Keep lines that match given regex.
            Default: None.
        exclude_regex (str, optional): Remove lines that match given regex.
            Default: None.
        replace_regex (str, optional): Substitutes regex matches in lines with
            replace_value. Default: None.
        replace_value (str, optional): Regex substitution value. None is
            treated as ''. Default: None.

    Returns:
        str: Filtered text.
    '''
    lines = text.split('\n')
    if include_regex is not None:
        include_re = re.compile(include_regex)
        lines = [line for line in lines if include_re.search(line)]
    if exclude_regex is not None:
        exclude_re = re.compile(exclude_regex)
        lines = [line for line in lines if not exclude_re.search(line)]
    if replace_regex is not None:
        replace_re = re.compile(replace_regex)
        rep_val = replace_value or ''
        lines = [replace_re.sub(rep_val, line) for line in lines]
    return '\n'.join(lines)

416 

417 

def read_text(filepath):
    # type: (Filepath) -> str
    '''
    Convenience function for reading text from given file.

    Args:
        filepath (str or Path): File to be read.

    Raises:
        AssertionError: If filepath is not a file.

    Returns:
        str: text.
    '''
    # Raised explicitly, not via an assert statement, so the check still runs
    # when Python is started with -O.
    if not Path(filepath).is_file():
        raise AssertionError(f'{filepath} is not a file.')
    with open(filepath) as f:
        return f.read()

435 

436 

def write_text(text, filepath):
    # type: (str, Filepath) -> None
    '''
    Writes the given text to the given file, creating any missing parent
    directories first.

    Args:
        text (str): Text to be written.
        filepath (str or Path): File to be written.
    '''
    parent_dir = Path(filepath).parent
    os.makedirs(parent_dir, exist_ok=True)
    with open(filepath, 'w') as handle:
        handle.write(text)

450 

451 

def copy_file(source, target):
    # type: (Filepath, Filepath) -> None
    '''
    Copy a source file to a target file. Creating directories as needed.

    Args:
        source (str or Path): Source filepath.
        target (str or Path): Target filepath.

    Raises:
        AssertionError: If source is not a file.
    '''
    # Raised explicitly, not via an assert statement, so the check still runs
    # when Python is started with -O.
    if not Path(source).is_file():
        raise AssertionError(f'{source} is not a file.')
    os.makedirs(Path(target).parent, exist_ok=True)
    shutil.copy2(source, target)

467 

468 

def move_file(source, target):
    # type: (Filepath, Filepath) -> None
    '''
    Moves a source file to a target file. Creating directories as needed.

    Args:
        source (str or Path): Source filepath.
        target (str or Path): Target filepath.

    Raises:
        AssertionError: If source is not a file.
    '''
    src = Path(source).as_posix()
    # Raised explicitly, not via an assert statement, so the check still runs
    # when Python is started with -O.
    if not Path(src).is_file():
        raise AssertionError(f'{source} is not a file.')
    os.makedirs(Path(target).parent, exist_ok=True)
    shutil.move(src, target)

485 

486 

# EXPORT-FUNCTIONS--------------------------------------------------------------

def dot_to_html(dot, layout='dot', as_png=False):
    # type: (pydot.Dot, str, bool) -> Union[HTML, Image]
    '''
    Converts a given pydot graph into a IPython.display.HTML object, or into a
    IPython.display.Image object if as_png is True.
    Used in jupyter lab inline display of graph data.

    Args:
        dot (pydot.Dot): Pydot Graph instance.
        layout (str, optional): Graph layout style.
            Options include: circo, dot, fdp, neato, sfdp, twopi.
            Default: dot.
        as_png (bool, optional): Display graph as a PNG image instead of SVG.
            Useful for display on Github. Default: False.

    Raises:
        ValueError: If invalid layout given.

    Returns:
        IPython.display.HTML or IPython.display.Image: HTML instance, or Image
        instance if as_png is True.
    '''
    layouts = ['circo', 'dot', 'fdp', 'neato', 'sfdp', 'twopi']
    if layout not in layouts:
        msg = f'Invalid layout value. {layout} not in {layouts}.'
        raise ValueError(msg)

    if as_png:
        # Pass the layout through so PNG output matches the SVG branch.
        return Image(data=dot.create_png(prog=layout))

    svg = dot.create_svg(prog=layout)
    # NOTE(review): svg is interpolated as-is; presumably create_svg returns
    # bytes, so the text holds its repr with escaped newlines - confirm.
    html = f'<object type="image/svg+xml" data="data:image/svg+xml;{svg}"></object>'  # type: Any
    html = HTML(html)
    # Strip literal backslash-n sequences and any remaining backslashes.
    html.data = re.sub(r'\\n|\\', '', html.data)
    # Drop whatever follows the first closing svg tag on that line.
    html.data = re.sub('</svg>.*', '</svg>', html.data)
    return html

522 

523 

def write_dot_graph(
    dot,
    fullpath,
    layout='dot',
):
    # type: (pydot.Dot, Union[str, Path], str) -> None
    '''
    Writes a pydot.Dot object to the given filepath, choosing the output
    format from the file extension (case-insensitive).
    Formats supported: svg, dot, png.

    Args:
        dot (pydot.Dot): Pydot Dot instance.
        fullpath (str or Path): File to be written to.
        layout (str, optional): Graph layout style.
            Options include: circo, dot, fdp, neato, sfdp, twopi. Default: dot.

    Raises:
        ValueError: If invalid file extension given.
    '''
    if isinstance(fullpath, Path):
        fullpath = Path(fullpath).absolute().as_posix()

    ext = os.path.splitext(fullpath)[1]
    if ext.startswith('.'):
        ext = ext[1:]

    # Each supported format maps to the dot.write_<format> method.
    for fmt in ('svg', 'dot', 'png'):
        if re.search(f'^{fmt}$', ext, re.I):
            getattr(dot, f'write_{fmt}')(fullpath, prog=layout)
            return

    msg = f'Invalid extension found: {ext}. '
    msg += 'Valid extensions include: svg, dot, png.'
    raise ValueError(msg)

558 

559 

# MISC-FUNCTIONS----------------------------------------------------------------

def replace_and_format(regex, replace, string, flags=0):
    # type: (str, str, str, Any) -> str
    r'''
    Perform a regex substitution on a given string and format any named group
    found in the result with groupdict data from the pattern. Groups beginning
    with 'i' will be converted to integers. Groups beginning with 'f' will be
    converted to floats. Named groups that did not take part in the match are
    left as None.

    The substituted string is evaluated as an f-string, so replacement fields
    may hold expressions such as '{i + 1:03d}'. Never pass untrusted input as
    regex, replace or string.

    ----------------------------------------------------------------------------

    Named group anatomy:
    ====================
    * (?P<NAME>PATTERN)
    * NAME becomes a key and whatever matches PATTERN becomes its value.
    >>> re.search('(?P<i>\d+)', 'foobar123').groupdict()
    {'i': '123'}

    ----------------------------------------------------------------------------

    Examples:
    =========
    Special groups:
        * (?P<i>\d) - string matched by '\d' will be converted to an integer
        * (?P<f>\d) - string matched by '\d' will be converted to a float
        * (?P<i_foo>\d) - string matched by '\d' will be converted to an integer
        * (?P<f_bar>\d) - string matched by '\d' will be converted to a float

    Named groups (long):
        >>> proj = '(?P<p>[a-z0-9]+)'
        >>> spec = '(?P<s>[a-z0-9]+)'
        >>> desc = '(?P<d>[a-z0-9\-]+)'
        >>> ver = '(?P<iv>\d+)'
        >>> frame = '(?P<i_f>\d+)'
        >>> regex = f'{proj}\.{spec}\.{desc}\.v{ver}\.{frame}.*'
        >>> replace = 'p-{p}_s-{s}_d-{d}_v{iv:03d}_f{i_f:04d}.jpeg'
        >>> string = 'proj.spec.desc.v1.25.png'
        >>> replace_and_format(regex, replace, string, flags=re.IGNORECASE)
        'p-proj_s-spec_d-desc_v001_f0025.jpeg'

    Named groups (short):
        >>> replace_and_format(
            '(?P<p>[a-z0-9]+)\.(?P<s>[a-z0-9]+)\.(?P<d>[a-z0-9\-]+)\.v(?P<iv>\d+)\.(?P<i_f>\d+).*',
            'p-{p}_s-{s}_d-{d}_v{iv:03d}_f{i_f:04d}.jpeg',
            'proj.spec.desc.v1.25.png',
        )
        'p-proj_s-spec_d-desc_v001_f0025.jpeg'

    No groups:
        >>> replace_and_format('foo', 'bar', 'foobar')
        'barbar'

    ----------------------------------------------------------------------------

    Args:
        regex (str): Regex pattern to search string with.
        replace (str): Replacement string which may contain format variables
            ie '{variable}'.
        string (str): String to be converted.
        flags (object, optional): re.sub flags. Default: 0.

    Returns:
        str: Converted string.
    '''
    grp = {}  # type: Dict[str, Any]
    match = re.search(regex, string, flags=flags)
    if match:
        for key, val in match.groupdict().items():
            if val is None:
                # Optional group that did not take part in the match.
                grp[key] = val
            elif key.startswith('f'):
                grp[key] = float(val)
            elif key.startswith('i'):
                grp[key] = int(val)
            else:
                grp[key] = val

    output = re.sub(regex, replace, string, flags=flags)
    # .format won't evaluate math expressions so evaluate as an f-string.
    # repr() builds the literal so quotes, newlines and backslashes in the
    # substituted text cannot break or alter it.
    # NOTE: eval executes arbitrary expressions found inside braces; callers
    # must not feed untrusted input to this function.
    if grp:
        output = eval('f' + repr(output), None, grp)
    return output