Coverage for /home/ubuntu/rolling-pin/python/rolling_pin/tools.py: 100%

190 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2023-11-15 00:43 +0000

1from typing import Any, Dict, Generator, Iterable, List, Optional, Union # noqa: F401 

2import pydot # noqa: F401 

3 

4from collections import OrderedDict 

5from pathlib import Path 

6import logging 

7import os 

8import re 

9import shutil 

10 

11from IPython.display import HTML, Image 

12import pandas as pd 

13 

# Anything this module accepts as a filesystem path.
Filepath = Union[str, Path]

# Logging verbosity is read once, at import time, from the LOG_LEVEL
# environment variable and falls back to WARNING.
LOG_LEVEL = os.getenv('LOG_LEVEL', 'WARNING').upper()
logging.basicConfig(level=LOG_LEVEL)
LOGGER = logging.getLogger(__name__)

18# ------------------------------------------------------------------------------ 

19 

20''' 

21Contains basic functions for more complex ETL functions and classes. 

22''' 

23 

24# COLOR-SCHEME------------------------------------------------------------------ 

# Hex colors keyed by the graph element they are applied to.
COLOR_SCHEME = {
    'background': '#242424',
    'node': '#343434',
    'node_font': '#B6ECF3',
    'node_value': '#343434',
    'node_value_font': '#DE958E',
    'edge': '#B6ECF3',
    'edge_value': '#DE958E',
    'node_library_font': '#DE958E',
    'node_subpackage_font': '#A0D17B',
    'node_module_font': '#B6ECF3',
    'edge_library': '#DE958E',
    'edge_subpackage': '#A0D17B',
    'edge_module': '#B6ECF3',
}  # type: Dict[str, str]

# Ordered palette of hex colors.
COLOR_SCALE = [
    '#B6ECF3',
    '#DE958E',
    '#EBB483',
    '#A0D17B',
    '#93B6E6',
    '#AC92DE',
    '#E9EABE',
    '#7EC4CF',
    '#F77E70',
    '#EB9E58',
]  # type: List[str]

53 

54 

55# PREDICATE-FUNCTIONS----------------------------------------------------------- 

def is_iterable(item):
    # type: (Any) -> bool
    '''
    Checks whether given item counts as an iterable container.

    Only objects accepted by is_listlike or is_dictlike qualify.

    Args:
        item (object): Object to be tested.

    Returns:
        bool: True if item is list-like or dict-like, otherwise False.
    '''
    return is_listlike(item) or is_dictlike(item)

70 

71 

def is_dictlike(item):
    # type: (Any) -> bool
    '''
    Checks whether given item is a dictionary-style container.

    Args:
        item (object): Object to be tested.

    Returns:
        bool: True for dict and OrderedDict instances, except those whose
            class is named 'Counter'. False for everything else.
    '''
    if not isinstance(item, (dict, OrderedDict)):
        return False
    # Classes named 'Counter' pass the isinstance check when they subclass
    # dict, but are deliberately rejected by class name.
    return item.__class__.__name__ != 'Counter'

89 

90 

def is_listlike(item):
    # type: (Any) -> bool
    '''
    Checks whether given item is a list-style container.

    Args:
        item (object): Object to be tested.

    Returns:
        bool: True if item is an instance of list, tuple or set, otherwise
            False.
    '''
    return isinstance(item, (list, tuple, set))

106 

107 

108# CORE-FUNCTIONS---------------------------------------------------------------- 

def flatten(item, separator='/', embed_types=True):
    # type: (Iterable, str, bool) -> Dict[str, Any]
    '''
    Flattens an iterable object into a flat dictionary.

    Nested keys are joined with the separator. Empty containers and
    non-container values are stored as leaf values. An item that is neither
    list-like nor dict-like produces an empty dictionary.

    Args:
        item (object): Iterable object.
        separator (str, optional): Field separator in keys. Default: '/'.
        embed_types (bool, optional): If True, elements of list-like objects
            are keyed as '<type_index>', e.g. '<list_0>'. If False, the bare
            index is used as the key. Default: True.

    Returns:
        dict: Dictionary representation of given object.
    '''
    output = {}  # type: Dict[str, Any]

    # Every key built below starts with exactly one separator, which is
    # sliced off rather than removed with a regex. The separator is arbitrary
    # text and must not be interpreted as a regular expression.
    offset = len(separator)

    def recurse(item, cursor):
        # type: (Iterable, str) -> None
        if is_listlike(item):
            if embed_types:
                name = item.__class__.__name__
                item = {f'<{name}_{i}>': val for i, val in enumerate(item)}
            else:
                item = dict(enumerate(item))
        if is_dictlike(item):
            for key, val in item.items():
                new_key = f'{cursor}{separator}{str(key)}'
                if is_iterable(val) and len(val) > 0:
                    recurse(val, new_key)
                else:
                    output[new_key[offset:]] = val

    recurse(item, '')
    return output

143 

144 

def nest(flat_dict, separator='/'):
    # type: (Dict[str, Any], str) -> Dict[str, Any]
    '''
    Builds a nested dictionary from a flat one by splitting each key on a
    given separator. Empty fields produced by the split are ignored.

    Args:
        flat_dict (dict): Flat dictionary.
        separator (str, optional): Field separator within given dictionary's
            keys. Default: '/'.

    Raises:
        KeyError: If a parent field of one key is already bound to a
            non-dictionary value.

    Returns:
        dict: Nested dictionary.
    '''
    output = {}  # type: Dict[str, Any]
    for flat_key, val in flat_dict.items():
        fields = [field for field in flat_key.split(separator) if field != '']
        leaf = fields.pop()

        # Walk down the parent fields, creating dictionaries as needed.
        branch = output
        for field in fields:
            branch = branch.setdefault(field, {})
            if not isinstance(branch, dict):
                raise KeyError(f"Duplicate key conflict. Key: '{field}'.")

        branch[leaf] = val
    return output

177 

178 

def unembed(item):
    # type: (Any) -> Any
    '''
    Convert embedded types in dictionary keys into python types.

    A non-empty dictionary whose keys all look like '<type_index>', e.g.
    '<list_0>', with a type of list, tuple or set, is converted into that
    container. Its values are ordered by index. Any other dictionary is
    returned as a new dictionary with unembed applied to each value.
    Non-dictionaries and empty dictionaries are returned as is.

    Args:
        item (object): Dictionary with embedded types.

    Returns:
        object: Converted object.
    '''
    lut = {'list': list, 'tuple': tuple, 'set': set}
    embed_re = re.compile(r'^<([a-z]+)_(\d+)>$')

    if not is_dictlike(item) or item == {}:
        return item

    keys = list(item.keys())
    # Non-string keys cannot be embedded types and must not reach the regex.
    matches = [
        embed_re.match(key) if isinstance(key, str) else None for key in keys
    ]

    # The first key decides the container type. Treat the dictionary as an
    # embedded container only if every key is embedded and the type is known;
    # otherwise it is handled as an ordinary dictionary.
    first = matches[0]
    if first is None or first.group(1) not in lut or not all(matches):
        return {key: unembed(val) for key, val in item.items()}

    indices = [int(match.group(2)) for match in matches]  # type: ignore
    output = []  # type: Any
    for _, key in sorted(zip(indices, keys)):
        next_item = item[key]
        if is_dictlike(next_item):
            next_item = unembed(next_item)
        output.append(next_item)
    return lut[first.group(1)](output)

215 

216 

217# FILE-FUNCTIONS---------------------------------------------------------------- 

def list_all_files(
    directory,           # type: Filepath
    include_regex=None,  # type: Optional[str]
    exclude_regex=None   # type: Optional[str]
):
    # type: (...) -> Generator[Path, None, None]
    '''
    Recursively walks a given directory and yields every file found.

    Both regular expressions are searched against the absolute, posix-style
    path of each file. Because this is a generator, the directory check only
    runs once iteration begins.

    Args:
        directory (str or Path): Directory to walk.
        include_regex (str, optional): Only yield files whose path matches
            this regex. Default: None.
        exclude_regex (str, optional): Skip files whose path matches this
            regex. Default: None.

    Raises:
        FileNotFoundError: If argument is not a directory or does not exist.

    Yields:
        Path: File.
    '''
    directory = Path(directory)
    if not directory.is_dir():
        raise FileNotFoundError(
            f'{directory} is not a directory or does not exist.'
        )

    include_re = re.compile(include_regex or '')  # type: Any
    exclude_re = re.compile(exclude_regex or '')  # type: Any

    for root, _, filenames in os.walk(directory):
        for filename in filenames:
            candidate = Path(root, filename)
            fullpath = candidate.absolute().as_posix()

            if include_regex is not None and not include_re.search(fullpath):
                continue
            if exclude_regex is not None and exclude_re.search(fullpath):
                continue
            yield candidate

261 

262 

def directory_to_dataframe(directory, include_regex='', exclude_regex=r'\.DS_Store'):
    # type: (Filepath, str, str) -> pd.DataFrame
    r'''
    Recursively lists the files within a given directory as rows of a
    pd.DataFrame, sorted by path.

    Args:
        directory (str or Path): Directory to walk.
        include_regex (str, optional): Include filenames that match this regex.
            Default: ''.
        exclude_regex (str, optional): Exclude filenames that match this regex.
            Default: '\.DS_Store'.

    Returns:
        pd.DataFrame: pd.DataFrame with one file per row and the columns
            filepath (absolute posix path), filename and extension (suffix
            without its leading dot).
    '''
    found = list_all_files(
        directory,
        include_regex=include_regex,
        exclude_regex=exclude_regex,
    )  # type: Any
    paths = sorted(found)

    data = pd.DataFrame()
    data['filepath'] = paths
    data['filename'] = data['filepath'].apply(lambda x: x.name)
    data['extension'] = data['filepath'].apply(
        lambda x: Path(x).suffix.lstrip('.')
    )
    # Paths are converted to strings last, as the columns above need Path
    # objects.
    data['filepath'] = data['filepath'].apply(
        lambda x: x.absolute().as_posix()
    )
    return data

292 

293 

def get_parent_fields(key, separator='/'):
    # type: (str, str) -> List[str]
    '''
    Lists every ancestor of a given key, shortest first, where the key is
    split into fields by given separator.

    Args:
        key (str): Key.
        separator (str, optional): String that splits key into fields.
            Default: '/'.

    Returns:
        list(str): List of absolute parent fields. Empty if key has only one
            field.
    '''
    fields = key.split(separator)
    return [separator.join(fields[:end]) for end in range(1, len(fields))]

312 

313 

def filter_text(
    text,                # type: str
    include_regex=None,  # type: Optional[str]
    exclude_regex=None,  # type: Optional[str]
    replace_regex=None,  # type: Optional[str]
    replace_value=None,  # type: Optional[str]
):
    # type: (...) -> str
    '''
    Filters given text line by line with regular expressions.

    Inclusion is applied first, then exclusion, then substitution.

    Args:
        text (str): Newline separated lines.
        include_regex (str, optional): Keep lines that match given regex.
            Default: None.
        exclude_regex (str, optional): Remove lines that match given regex.
            Default: None.
        replace_regex (str, optional): Substitutes regex matches in lines with
            replace_value. Default: None.
        replace_value (str, optional): Regex substitution value. None is
            treated as an empty string. Default: None.

    Returns:
        str: Filtered text.
    '''
    lines = text.split('\n')

    if include_regex is not None:
        lines = [line for line in lines if re.search(include_regex, line)]

    if exclude_regex is not None:
        lines = [line for line in lines if not re.search(exclude_regex, line)]

    if replace_regex is not None:
        substitute = replace_value or ''
        lines = [re.sub(replace_regex, substitute, line) for line in lines]

    return '\n'.join(lines)

351 

352 

def read_text(filepath):
    # type: (Filepath) -> str
    '''
    Convenience function for reading text from given file.

    Args:
        filepath (str or Path): File to be read.

    Raises:
        AssertionError: If filepath is not a file.

    Returns:
        str: text.
    '''
    # Raised explicitly rather than with an assert statement, so the check
    # still runs when Python is started with -O.
    if not Path(filepath).is_file():
        raise AssertionError(f'{filepath} is not a file.')
    with open(filepath) as f:
        return f.read()

370 

371 

def write_text(text, filepath):
    # type: (str, Filepath) -> None
    '''
    Writes given text to given file, replacing any existing content.
    Missing parent directories are created first.

    Args:
        text (str): Text to be written.
        filepath (str or Path): File to be written.
    '''
    parent = Path(filepath).parent
    os.makedirs(parent, exist_ok=True)
    with open(filepath, 'w') as handle:
        handle.write(text)

385 

386 

def copy_file(source, target):
    # type: (Filepath, Filepath) -> None
    '''
    Copy a source file to a target file. Creating directories as needed.

    Args:
        source (str or Path): Source filepath.
        target (str or Path): Target filepath.

    Raises:
        AssertionError: If source is not a file.
    '''
    # Raised explicitly rather than with an assert statement, so the check
    # still runs when Python is started with -O.
    if not Path(source).is_file():
        raise AssertionError(f'{source} is not a file.')
    os.makedirs(Path(target).parent, exist_ok=True)
    shutil.copy2(source, target)

402 

403 

def move_file(source, target):
    # type: (Filepath, Filepath) -> None
    '''
    Moves a source file to a target file. Creating directories as needed.

    Args:
        source (str or Path): Source filepath.
        target (str or Path): Target filepath.

    Raises:
        AssertionError: If source is not a file.
    '''
    src = Path(source).as_posix()
    # Raised explicitly rather than with an assert statement, so the check
    # still runs when Python is started with -O.
    if not Path(src).is_file():
        raise AssertionError(f'{src} is not a file.')
    os.makedirs(Path(target).parent, exist_ok=True)
    shutil.move(src, target)

420 

421 

422# EXPORT-FUNCTIONS-------------------------------------------------------------- 

def dot_to_html(dot, layout='dot', as_png=False):
    # type: (pydot.Dot, str, bool) -> Union[HTML, Image]
    '''
    Converts a given pydot graph into a IPython.display.HTML object, or into
    a IPython.display.Image object if as_png is True.
    Used in jupyter lab inline display of graph data.

    Args:
        dot (pydot.Dot): Pydot Graph instance.
        layout (str, optional): Graph layout style.
            Options include: circo, dot, fdp, neato, sfdp, twopi.
            Default: dot.
        as_png (bool, optional): Display graph as a PNG image instead of SVG.
            Useful for display on Github. Default: False.

    Raises:
        ValueError: If invalid layout given.

    Returns:
        IPython.display.HTML or IPython.display.Image: HTML instance with
            the graph embedded as SVG, or Image instance if as_png is True.
    '''
    layouts = ['circo', 'dot', 'fdp', 'neato', 'sfdp', 'twopi']
    if layout not in layouts:
        msg = f'Invalid layout value. {layout} not in {layouts}.'
        raise ValueError(msg)

    if as_png:
        # The requested layout applies to PNG output as well as SVG.
        return Image(data=dot.create_png(prog=layout))

    svg = dot.create_svg(prog=layout)
    # NOTE(review): the rendered SVG is interpolated as is; if create_svg
    # returns bytes, its repr is what gets embedded - confirm.
    html = f'<object type="image/svg+xml" data="data:image/svg+xml;{svg}"></object>'  # type: Any
    html = HTML(html)
    # Remove literal '\n' sequences and stray backslashes, then drop
    # everything on the line after the closing svg tag.
    html.data = re.sub(r'\\n|\\', '', html.data)
    html.data = re.sub('</svg>.*', '</svg>', html.data)
    return html

457 

458 

def write_dot_graph(
    dot,
    fullpath,
    layout='dot',
):
    # type: (pydot.Dot, Union[str, Path], str) -> None
    '''
    Saves a pydot.Dot object to a given filepath. The output format is chosen
    by the file extension, ignoring case.
    Formats supported: svg, dot, png.

    Args:
        dot (pydot.Dot): Pydot Dot instance.
        fullpath (str or Path): File to be written to.
        layout (str, optional): Graph layout style.
            Options include: circo, dot, fdp, neato, sfdp, twopi. Default: dot.

    Raises:
        ValueError: If invalid file extension given.
    '''
    if isinstance(fullpath, Path):
        fullpath = fullpath.absolute().as_posix()

    ext = re.sub(r'^\.', '', os.path.splitext(fullpath)[1])

    # Extension pattern paired with the name of the pydot writer to call.
    writers = [
        ('^svg$', 'write_svg'),
        ('^dot$', 'write_dot'),
        ('^png$', 'write_png'),
    ]
    for pattern, method in writers:
        if re.search(pattern, ext, re.I):
            getattr(dot, method)(fullpath, prog=layout)
            return

    raise ValueError(
        f'Invalid extension found: {ext}. '
        'Valid extensions include: svg, dot, png.'
    )

493 

494 

495# MISC-FUNCTIONS---------------------------------------------------------------- 

def replace_and_format(regex, replace, string, flags=0):
    # type: (str, str, str, Any) -> str
    r'''
    Perform a regex substitution on a given string and format any named group
    found in the result with groupdict data from the pattern. Groups beginning
    with 'i' will be converted to integers. Groups beginning with 'f' will be
    converted to floats. Named groups that did not take part in the match are
    left as None.

    The group data comes from the first match of the pattern only. If the
    pattern has no named groups, or does not match, the plain re.sub result is
    returned.

    ----------------------------------------------------------------------------

    Named group anatomy:
    ====================
    * (?P<NAME>PATTERN)
    * NAME becomes a key and whatever matches PATTERN becomes its value.
    >>> re.search('(?P<i>\d+)', 'foobar123').groupdict()
    {'i': '123'}

    ----------------------------------------------------------------------------

    Examples:
    =========
    Special groups:
        * (?P<i>\d) - string matched by '\d' will be converted to an integer
        * (?P<f>\d) - string matched by '\d' will be converted to a float
        * (?P<i_foo>\d) - string matched by '\d' will be converted to an integer
        * (?P<f_bar>\d) - string matched by '\d' will be converted to a float

    Named groups (long):
        >>> proj = '(?P<p>[a-z0-9]+)'
        >>> spec = '(?P<s>[a-z0-9]+)'
        >>> desc = '(?P<d>[a-z0-9\-]+)'
        >>> ver = '(?P<iv>\d+)'
        >>> frame = '(?P<i_f>\d+)'
        >>> regex = f'{proj}\.{spec}\.{desc}\.v{ver}\.{frame}.*'
        >>> replace = 'p-{p}_s-{s}_d-{d}_v{iv:03d}_f{i_f:04d}.jpeg'
        >>> string = 'proj.spec.desc.v1.25.png'
        >>> replace_and_format(regex, replace, string, flags=re.IGNORECASE)
        'p-proj_s-spec_d-desc_v001_f0025.jpeg'

    Named groups (short):
        >>> replace_and_format(
            '(?P<p>[a-z0-9]+)\.(?P<s>[a-z0-9]+)\.(?P<d>[a-z0-9\-]+)\.v(?P<iv>\d+)\.(?P<i_f>\d+).*',
            'p-{p}_s-{s}_d-{d}_v{iv:03d}_f{i_f:04d}.jpeg',
            'proj.spec.desc.v1.25.png',
        )
        'p-proj_s-spec_d-desc_v001_f0025.jpeg'

    No groups:
        >>> replace_and_format('foo', 'bar', 'foobar')
        'barbar'

    ----------------------------------------------------------------------------

    Args:
        regex (str): Regex pattern to search string with.
        replace (str): Replacement string which may contain format variables
            ie '{variable}'.
        string (str): String to be converted.
        flags (object, optional): re.sub flags. Default: 0.

    Returns:
        str: Converted string.
    '''
    match = re.search(regex, string, flags=flags)
    grp = {}  # type: Dict[str, Any]
    if match:
        for key, val in match.groupdict().items():
            if val is None:
                # Optional group that did not participate in the match;
                # there is nothing to cast.
                grp[key] = val
            elif key.startswith('f'):
                grp[key] = float(val)
            elif key.startswith('i'):
                grp[key] = int(val)
            else:
                grp[key] = val

    output = re.sub(regex, replace, string, flags=flags)
    # .format won't evaluate math expressions, so the result is evaluated as
    # an f-string instead. The literal is built with repr so that quotes,
    # backslashes and newlines in the result cannot break it.
    # SECURITY: eval executes any expression found inside braces in the
    # result. Only call this with trusted replace and string values.
    if grp != {}:
        output = eval('f' + repr(output), None, grp)
    return output

574 return output