Coverage for /home/ubuntu/hidebound/python/hidebound/core/tools.py: 100%

139 statements  

coverage.py v7.5.4, created at 2024-07-05 23:50 +0000

from typing import Any, Callable, Dict, Generator, List, Union  # noqa F401

from collections import defaultdict
from datetime import datetime
from pathlib import Path
from pprint import pformat
import json
import os
import re
import shutil

from lunchbox.enforce import Enforce
from schematics.exceptions import DataError, ValidationError
import dask.dataframe as dd
import pandas as pd
import pyjson5 as jsonc

FilePath = Union[str, Path]
DF = Union[pd.DataFrame, dd.DataFrame]
DFS = Union[pd.DataFrame, pd.Series, dd.DataFrame, dd.Series]
# ------------------------------------------------------------------------------


'''
The tools module contains general functions useful to other hidebound modules.
'''

# DIRECTORY-FUNCS---------------------------------------------------------------
def traverse_directory(
    directory, include_regex='', exclude_regex='', entry_type='file'
):
    # type: (FilePath, str, str, str) -> Generator[Path, None, None]
    '''
    Recursively list all files or directories within a given directory.

    Args:
        directory (str or Path): Directory to walk.
        include_regex (str, optional): Include filenames that match this regex.
            Default: ''.
        exclude_regex (str, optional): Exclude filenames that match this regex.
            Default: ''.
        entry_type (str, optional): Kind of directory entry to return. Options
            include: file, directory. Default: file.

    Raises:
        FileNotFoundError: If argument is not a directory or does not exist.
        EnforceError: If entry_type is not file or directory.

    Yields:
        Path: File or directory path.
    '''
    etypes = ['file', 'directory']
    msg = 'Illegal entry type: {a}. Legal entry types: {b}.'
    Enforce(entry_type, 'in', etypes, message=msg)

    directory = Path(directory)
    if not directory.is_dir():
        msg = f'{directory} is not a directory or does not exist.'
        raise FileNotFoundError(msg)
    # --------------------------------------------------------------------------

    include_re = re.compile(include_regex)
    exclude_re = re.compile(exclude_regex)

    for root, dirs, items in os.walk(directory):
        if entry_type == 'directory':
            items = dirs
        for item in items:
            filepath = Path(root, item)

            output = True
            temp = filepath.absolute().as_posix()
            if include_regex != '' and not include_re.search(temp):
                output = False
            if exclude_regex != '' and exclude_re.search(temp):
                output = False

            if output:
                yield filepath
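
# Example usage -- a minimal sketch, not part of the module; '/tmp/project' is
# a hypothetical directory:
# >>> for path in traverse_directory('/tmp/project', include_regex=r'\.json$'):
# ...     print(path)  # yields one Path per matching file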

def delete_empty_directories(directory):
    # type: (FilePath) -> None
    '''
    Recurses given directory tree and deletes directories that do not contain
    files or directory trees with files. .DS_Store files do not count as
    files. Does not delete given directory.

    Args:
        directory (str or Path): Directory to recurse.

    Raises:
        FileNotFoundError: If argument is not a directory or does not exist.
    '''
    dir_ = Path(directory).as_posix()
    if not Path(dir_).is_dir():
        msg = f'{dir_} is not a directory or does not exist.'
        raise FileNotFoundError(msg)

    empty = [[], ['.DS_Store']]
    paths = []
    for root, _, files in os.walk(directory):
        if files in empty:
            paths.append(root)

    if dir_ in paths:
        paths.remove(dir_)

    for path in reversed(paths):
        if os.listdir(path) in empty:
            shutil.rmtree(path)
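
# Example usage -- a hedged sketch; '/tmp/project' is hypothetical. Deletes
# nested directories that contain no files (ignoring .DS_Store) but keeps
# '/tmp/project' itself:
# >>> delete_empty_directories('/tmp/project')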

def directory_to_dataframe(directory, include_regex='', exclude_regex=r'\.DS_Store'):
    # type: (FilePath, str, str) -> pd.DataFrame
    r'''
    Recursively list files within a given directory as rows in a pd.DataFrame.

    Args:
        directory (str or Path): Directory to walk.
        include_regex (str, optional): Include filenames that match this regex.
            Default: ''.
        exclude_regex (str, optional): Exclude filenames that match this regex.
            Default: '\.DS_Store'.

    Returns:
        pd.DataFrame: pd.DataFrame with one file per row.
    '''
    files = traverse_directory(
        directory,
        include_regex=include_regex,
        exclude_regex=exclude_regex
    )  # type: Any
    files = sorted(list(files))

    data = pd.DataFrame()
    data['filepath'] = files
    data['filename'] = data.filepath.apply(lambda x: x.name)
    data['extension'] = data.filepath \
        .apply(lambda x: Path(x).suffix.lstrip('.'))
    data.filepath = data.filepath.apply(lambda x: x.absolute().as_posix())
    return data
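
# Example usage -- a hedged sketch; '/tmp/project' is hypothetical. Each row
# is one file, with absolute POSIX filepath, filename and extension columns:
# >>> directory_to_dataframe('/tmp/project').columns.tolist()
# ['filepath', 'filename', 'extension']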

# STRING-FUNCS------------------------------------------------------------------
def str_to_bool(string):
    # type: (str) -> bool
    '''
    Converts a string to a boolean value.

    Args:
        string (str): String to be converted.

    Returns:
        bool: Boolean value.
    '''
    if string.lower() == 'true':
        return True
    return False
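
# Example usage: only the case-insensitive string 'true' maps to True; every
# other string maps to False.
# >>> str_to_bool('True')
# True
# >>> str_to_bool('yes')
# False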

def error_to_string(error):
    # type: (Exception) -> str
    '''
    Formats error as string.

    Args:
        error (Exception): Error.

    Returns:
        str: Error message.
    '''
    output = error.args[0]
    if isinstance(error, DataError):
        output = '\n' + pformat(dict(output)) + '\n'
    elif isinstance(error, ValidationError):
        output = [x.summary for x in output]
        if len(output) == 1:
            output = f' {output[0]} '
        else:
            output = '\n' + '\n'.join(output) + '\n'
    else:
        output = f' {output} '
    output = f'{error.__class__.__name__}({output})'
    return output
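
# Example usage with a plain exception (DataError and ValidationError get
# multi-line formatting instead):
# >>> error_to_string(KeyError('foo'))
# 'KeyError( foo )'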

def time_string():
    # type: () -> str
    '''
    Returns:
        str: String representing current time.
    '''
    return datetime.now().strftime('%Y-%m-%dT-%H-%M-%S')
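
# Example output (time-dependent, so not a doctest):
# >>> time_string()
# '2024-07-05T-23-50-00'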

# MISC-FUNCS--------------------------------------------------------------------
def to_prototype(dicts):
    # type: (List[Dict]) -> Dict
    '''
    Converts a list of dicts into a dict of lists.
    .. example::
        :nowrap:

        >>> dicts = [dict(a=1, b=2, c=3), dict(a=10, b=20)]
        >>> to_prototype(dicts)
        {'a': [1, 10], 'b': [2, 20], 'c': [3]}

    Args:
        dicts (list[dict]): List of dicts.

    Returns:
        dict: Prototype dictionary.
    '''
    output = defaultdict(lambda: [])  # type: Any
    for dict_ in dicts:
        for key, val in dict_.items():
            output[key].append(val)
    output = dict(output)
    return output

# IO-FUNCS----------------------------------------------------------------------
def write_json(data, filepath):
    # type: (object, FilePath) -> None
    '''
    Convenience function for writing objects to JSON files.
    Writes lists with 1 item per line.

    Args:
        data (object): Object to be written.
        filepath (Path or str): Filepath.
    '''
    if isinstance(data, list):
        with open(filepath, 'w') as f:
            f.write('[\n')
            f.write(',\n'.join(map(json.dumps, data)))
            f.write('\n]')
    else:
        with open(filepath, 'w') as f:
            json.dump(data, f)

def read_json(filepath):
    # type: (FilePath) -> object
    '''
    Convenience function for reading JSON files.
    Files may include comments.

    Args:
        filepath (Path or str): Filepath.

    Raises:
        JSONDecodeError: If no JSON data could be decoded.

    Returns:
        object: JSON object.
    '''
    with open(filepath) as f:
        try:
            output = jsonc.load(f)
        except Exception as e:
            msg = f'No JSON data could be decoded from {filepath}. {str(e)}'
            raise json.JSONDecodeError(msg, '', 0)
    return output
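
# Example round trip -- a hedged sketch; '/tmp/data.json' is hypothetical.
# write_json emits lists one item per line; read_json tolerates comments
# because it parses with pyjson5:
# >>> write_json([dict(a=1), dict(b=2)], '/tmp/data.json')
# >>> read_json('/tmp/data.json')
# [{'a': 1}, {'b': 2}]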

# DASK-FUNCS--------------------------------------------------------------------
def get_meta_kwargs(data, meta):
    # type: (DFS, Any) -> dict
    '''
    Convenience utility for coercing the meta keyword between pandas and dask.

    Args:
        data (DataFrame or Series): Pandas or dask object.
        meta (object): Meta keyword argument.

    Returns:
        dict: Appropriate keyword args.
    '''
    kwargs = {}
    if meta != '__no_default__' and data.__class__ in [dd.DataFrame, dd.Series]:
        kwargs = dict(meta=meta)
    return kwargs
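
# Example usage: meta is only passed through for dask objects, since pandas
# apply does not accept a meta keyword.
# >>> get_meta_kwargs(pd.DataFrame(), 'object')
# {}
# >>> get_meta_kwargs(dd.from_pandas(pd.DataFrame(), npartitions=1), 'object')
# {'meta': 'object'}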

def pred_combinator(
    data, # type: DFS
    predicate, # type: Callable[[Any], bool]
    true_func, # type: Callable[[Any], Any]
    false_func, # type: Callable[[Any], Any]
    meta='object', # type: Any
):
    # type: (...) -> DFS
    '''
    Apply true_func to rows where the predicate is true and false_func to rows
    where it is false.

    Args:
        data (DataFrame): DataFrame or Series.
        predicate (function): Function that expects a row and returns a bool.
        true_func (function): Function that expects a row. Called when predicate
            is true.
        false_func (function): Function that expects a row. Called when predicate
            is false.
        meta (object, optional): Metadata inference. Default: 'object'.

    Returns:
        DataFrame or Series: Apply results.
    '''
    kwargs = get_meta_kwargs(data, meta)
    if data.__class__ in [pd.DataFrame, dd.DataFrame]:
        return data.apply(
            lambda x: true_func(x) if predicate(x) else false_func(x),
            axis=1,
            **kwargs,
        )
    return data.apply(
        lambda x: true_func(x) if predicate(x) else false_func(x),
        **kwargs,
    )
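
# Example usage with a pandas Series: double even values, negate odd ones.
# >>> data = pd.Series([1, 2, 3, 4])
# >>> pred_combinator(data, lambda x: x % 2 == 0, lambda x: x * 2, lambda x: -x)
# 0   -1
# 1    4
# 2   -3
# 3    8
# dtype: int64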

def get_lut(data, column, aggregator, meta='__no_default__'):
    # type: (DF, str, Callable[[DF], Any], Any) -> DF
    '''
    Constructs a lookup table with the given column as its keys and the
    aggregator results as its values.
    Data is grouped by given column and the given aggregator is applied to each
    group of values.

    Args:
        data (DataFrame): DataFrame.
        column (str): Column to be used as the key.
        aggregator (function): Function that expects a group DataFrame and
            returns a scalar.
        meta (object, optional): Metadata inference. Default: '__no_default__'.

    Returns:
        DataFrame: DataFrame with key and value columns.
    '''
    kwargs = get_meta_kwargs(data, ('value', meta))
    merge = pd.merge
    empty = pd.DataFrame(columns=['key', 'value'])
    if isinstance(data, dd.DataFrame):
        merge = dd.merge
        empty = dd.from_pandas(empty, npartitions=1)

    grp = data.groupby(column)
    keys = grp[column].first().to_frame(name='key')
    if len(keys) == 0:
        return empty
    vals = grp.apply(aggregator, include_groups=False, **kwargs).to_frame(name='value')
    lut = merge(keys, vals, left_index=True, right_index=True) \
        .reset_index(drop=True)
    return lut
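
# Example usage: build a lookup table of the maximum weight per name.
# >>> data = pd.DataFrame(dict(name=['a', 'a', 'b'], weight=[1, 2, 3]))
# >>> get_lut(data, 'name', lambda grp: grp.weight.max())
#   key  value
# 0   a      2
# 1   b      3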

def lut_combinator(
    data, key_column, value_column, aggregator, meta='__no_default__'
):
    # type: (DF, str, str, Callable[[DF], Any], Any) -> DF
    '''
    Constructs a lookup table from given key_column, then applies it to given
    data as value column.

    Args:
        data (DataFrame): DataFrame.
        key_column (str): Column to be used as the lut keys.
        value_column (str): Column to be used as the values.
        aggregator (function): Function that expects a pd.DataFrame.
        meta (object, optional): Metadata inference. Default: '__no_default__'.

    Returns:
        DataFrame: DataFrame with value column.
    '''
    kwargs = get_meta_kwargs(data, meta)
    merge = pd.merge
    if isinstance(data, dd.DataFrame):
        merge = dd.merge

    lut = get_lut(data, key_column, aggregator, **kwargs)
    lut.columns = [key_column, value_column]
    data = merge(data, lut, on=key_column, how='left')
    return data
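
# Example usage: attach a per-name maximum as a new column.
# >>> data = pd.DataFrame(dict(name=['a', 'a', 'b'], weight=[1, 2, 3]))
# >>> lut_combinator(data, 'name', 'max_weight', lambda grp: grp.weight.max())
#   name  weight  max_weight
# 0    a       1           2
# 1    a       2           2
# 2    b       3           3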