Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from typing import Any, Dict, Iterable, List, Union 

2import pydot 

3 

4import logging 

5import os 

6import re 

7from collections import OrderedDict 

8from pathlib import Path 

9 

10from IPython.display import HTML, Image 

11 

# The logging level is read from the LOG_LEVEL environment variable, falling
# back to WARNING when it is unset. The value is upper-cased before it is
# handed to logging.basicConfig.
LOG_LEVEL = os.getenv('LOG_LEVEL', 'WARNING').upper()
logging.basicConfig(level=LOG_LEVEL)

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

15# ------------------------------------------------------------------------------ 

16 

17''' 

18Contains basic functions for more complex ETL functions and classes. 

19''' 

20 

21# COLOR-SCHEME------------------------------------------------------------------ 

# Maps the name of a graph element to the hex color used for it.
COLOR_SCHEME = {
    'background': '#242424',
    'node': '#343434',
    'node_font': '#B6ECF3',
    'node_value': '#343434',
    'node_value_font': '#DE958E',
    'edge': '#B6ECF3',
    'edge_value': '#DE958E',
    'node_library_font': '#DE958E',
    'node_subpackage_font': '#A0D17B',
    'node_module_font': '#B6ECF3',
    'edge_library': '#DE958E',
    'edge_subpackage': '#A0D17B',
    'edge_module': '#B6ECF3',
}  # type: Dict[str, str]

# Ordered list of hex colors.
COLOR_SCALE = [
    '#B6ECF3',
    '#DE958E',
    '#EBB483',
    '#A0D17B',
    '#93B6E6',
    '#AC92DE',
    '#E9EABE',
    '#7EC4CF',
    '#F77E70',
    '#EB9E58',
]  # type: List[str]

50 

51 

52# PREDICATE-FUNCTIONS----------------------------------------------------------- 

def is_iterable(item):
    # type: (Any) -> bool
    '''
    Checks whether the given item counts as iterable.

    An item counts as iterable when it is accepted by either is_listlike or
    is_dictlike.

    Args:
        item (object): Object to be tested.

    Returns:
        bool: True if the item is list-like or dict-like, False otherwise.
    '''
    return is_listlike(item) or is_dictlike(item)

67 

68 

def is_dictlike(item):
    # type: (Any) -> bool
    '''
    Checks whether the given item is dict-like.

    Instances of dict and OrderedDict (and their subclasses) are dict-like,
    except for objects whose class is named 'Counter'.

    Args:
        item (object): Object to be tested.

    Returns:
        bool: True if the item is dict-like, False otherwise.
    '''
    if not isinstance(item, (dict, OrderedDict)):
        return False
    # Counter passes the isinstance check above but is explicitly excluded.
    return item.__class__.__name__ != 'Counter'

86 

87 

def is_listlike(item):
    # type: (Any) -> bool
    '''
    Checks whether the given item is list-like.

    Only instances of list, tuple and set (and their subclasses) are
    list-like.

    Args:
        item (object): Object to be tested.

    Returns:
        bool: True if the item is list-like, False otherwise.
    '''
    return isinstance(item, (list, tuple, set))

103 

104 

105# CORE-FUNCTIONS---------------------------------------------------------------- 

def flatten(item, separator='/', embed_types=True):
    # type: (Iterable, str, bool) -> Dict[str, Any]
    '''
    Flattens an iterable object into a flat dictionary.

    Nested dict-like and list-like values are walked recursively. Each leaf
    value is stored under a key made of the path to it, joined by the given
    separator. Empty containers and values that are neither list-like nor
    dict-like are stored as leaves.

    Args:
        item (object): Iterable object.
        separator (str, optional): Field separator in keys. Default: '/'.
        embed_types (bool, optional): If True, each element of a list-like
            item gets a key of the form '<type_index>' (for example
            '<list_0>'), built from the item's class name and the element's
            position. If False, only the position is used as the key.
            Default: True.

    Returns:
        dict: Dictionary representation of given object.
    '''
    output = {}  # type: Dict[str, Any]

    # Every key built below starts with exactly one separator. It is sliced
    # off instead of being removed with a regex, so separators that contain
    # regex metacharacters (for example '+', '|' or '(') work as well.
    prefix_length = len(separator)

    def recurse(item, cursor):
        # type: (Iterable, str) -> None
        if is_listlike(item):
            # Turn the sequence into a dictionary so that the dict-like
            # branch below handles both cases.
            if embed_types:
                name = item.__class__.__name__
                item = {f'<{name}_{i}>': val for i, val in enumerate(item)}
            else:
                item = dict(enumerate(item))
        if is_dictlike(item):
            for key, val in item.items():
                new_key = f'{cursor}{separator}{str(key)}'
                if is_iterable(val) and len(val) > 0:
                    recurse(val, new_key)
                else:
                    output[new_key[prefix_length:]] = val

    recurse(item, '')
    return output

140 

141 

def nest(flat_dict, separator='/'):
    # type: (Dict[str, Any], str) -> Dict[str, Any]
    '''
    Builds a nested dictionary out of a flat one by splitting each key into
    fields on the given separator.

    Empty fields, such as those produced by leading, trailing or repeated
    separators, are ignored.

    Args:
        flat_dict (dict): Flat dictionary.
        separator (str, optional): Field separator within given dictionary's
            keys. Default: '/'.

    Raises:
        KeyError: If a parent field of a key already holds a value that is
            not a dictionary.

    Returns:
        dict: Nested dictionary.
    '''
    nested = {}  # type: Dict[str, Any]
    for flat_key, value in flat_dict.items():
        fields = [field for field in flat_key.split(separator) if field != '']
        leaf = fields.pop()

        # Walk down the parent fields, creating dictionaries where needed.
        node = nested
        for field in fields:
            child = node.setdefault(field, {})
            if not isinstance(child, dict):
                raise KeyError(f"Duplicate key conflict. Key: '{field}'.")
            node = child

        node[leaf] = value
    return nested

174 

175 

def unembed(item):
    # type: (Any) -> Any
    '''
    Converts embedded types in dictionary keys into python types.

    A non-empty dictionary whose keys all have the form '<type_index>', where
    type is list, tuple or set (for example '<list_0>', '<list_1>'), is
    converted into an instance of that type, with its values ordered by
    index. Any other non-empty dictionary is returned as a new dictionary
    with its values converted recursively. Items that are not dict-like, and
    empty dictionaries, are returned unchanged.

    Args:
        item (object): Dictionary with embedded types.

    Returns:
        object: Converted object.
    '''
    lut = {'list': list, 'tuple': tuple, 'set': set}
    embed_re = re.compile(r'^<([a-z]+)_(\d+)>$')

    if not is_dictlike(item) or item == {}:
        return item

    keys = list(item.keys())
    # Only string keys can carry an embedded type; matching a non-string key
    # against the regex would raise TypeError.
    matches = [
        embed_re.match(key) if isinstance(key, str) else None for key in keys
    ]

    # The container type is taken from the first key. The dictionary is only
    # treated as an embedded sequence when that type is known and every key
    # carries an index; otherwise it is handled as a plain dictionary.
    first = matches[0]
    is_embedded = (
        first is not None
        and first.group(1) in lut
        and all(match is not None for match in matches)
    )
    if not is_embedded:
        return {key: unembed(val) for key, val in item.items()}

    indices = [int(match.group(2)) for match in matches]  # type: ignore
    values = [unembed(item[key]) for _, key in sorted(zip(indices, keys))]
    return lut[first.group(1)](values)  # type: ignore

212 

213 

214# FILE-FUNCTIONS---------------------------------------------------------------- 

def list_all_files(directory):
    # type: (Union[str, Path]) -> List[Path]
    '''
    Collects every file found beneath a given directory, descending into all
    of its subdirectories.

    Args:
        directory (str or Path): Directory to be recursed.

    Returns:
        list[Path]: List of filepaths.
    '''
    return [
        Path(root, filename)
        for root, _, filenames in os.walk(directory)
        for filename in filenames
    ]

232 

233 

def get_parent_fields(key, separator='/'):
    # type: (str, str) -> List[str]
    '''
    Lists every ancestor of a given key, where the key is split into fields
    by the given separator.

    For example, 'a/b/c' yields ['a', 'a/b'].

    Args:
        key (str): Key.
        separator (str, optional): String that splits key into fields.
            Default: '/'.

    Returns:
        list(str): List of absolute parent fields.
    '''
    fields = key.split(separator)
    # Each parent is a proper prefix of the fields, from shortest to longest.
    return [separator.join(fields[:end]) for end in range(1, len(fields))]

252 

253 

254# EXPORT-FUNCTIONS-------------------------------------------------------------- 

def dot_to_html(dot, layout='dot', as_png=False):
    # type: (pydot.Dot, str, bool) -> Union[HTML, Image]
    '''
    Converts a given pydot graph into an IPython.display.HTML object, or into
    an IPython.display.Image object if as_png is True.
    Used in jupyter lab inline display of graph data.

    Args:
        dot (pydot.Dot): Pydot Graph instance.
        layout (str, optional): Graph layout style.
            Options include: circo, dot, fdp, neato, sfdp, twopi.
            Default: dot.
        as_png (bool, optional): Display graph as a PNG image instead of SVG.
            Useful for display on Github. Default: False.

    Raises:
        ValueError: If invalid layout given.

    Returns:
        IPython.display.HTML or IPython.display.Image: HTML instance holding
        the SVG markup, or Image instance holding the PNG data if as_png is
        True.
    '''
    layouts = ['circo', 'dot', 'fdp', 'neato', 'sfdp', 'twopi']
    if layout not in layouts:
        msg = f'Invalid layout value. {layout} not in {layouts}.'
        raise ValueError(msg)

    if as_png:
        # Render with the requested layout, the same as the SVG branch below.
        return Image(data=dot.create_png(prog=layout))

    svg = dot.create_svg(prog=layout)
    # NOTE(review): create_svg presumably returns bytes, in which case the
    # f-string below embeds their repr and the substitutions that follow
    # clean it up -- confirm before changing any of these lines.
    html = f'<object type="image/svg+xml" data="data:image/svg+xml;{svg}"></object>'  # type: Any
    html = HTML(html)
    # Remove literal backslash-n sequences and any remaining backslashes.
    html.data = re.sub(r'\\n|\\', '', html.data)
    # Drop whatever follows the first closing svg tag on its line.
    html.data = re.sub('</svg>.*', '</svg>', html.data)
    return html

289 

290 

def write_dot_graph(
    dot,
    fullpath,
    layout='dot',
):
    # type: (pydot.Dot, Union[str, Path], str) -> None
    '''
    Saves a pydot.Dot object to a given filepath. The output format is chosen
    by the file extension, matched case-insensitively.
    Formats supported: svg, dot, png.

    Args:
        dot (pydot.Dot): Pydot Dot instance.
        fullpath (str or Path): File to be written to.
        layout (str, optional): Graph layout style.
            Options include: circo, dot, fdp, neato, sfdp, twopi. Default: dot.

    Raises:
        ValueError: If invalid file extension given.
    '''
    # Path inputs are converted to absolute, POSIX-style strings.
    if isinstance(fullpath, Path):
        fullpath = Path(fullpath).absolute().as_posix()

    ext = re.sub(r'^\.', '', os.path.splitext(fullpath)[1])

    # Hand off to the writer method on dot that matches the extension.
    for fmt in ('svg', 'dot', 'png'):
        if re.search(f'^{fmt}$', ext, re.I):
            getattr(dot, f'write_{fmt}')(fullpath, prog=layout)
            return

    raise ValueError(
        f'Invalid extension found: {ext}. '
        'Valid extensions include: svg, dot, png.'
    )