Coverage for /home/ubuntu/hidebound/python/hidebound/core/tools.py: 100% (139 statements)
from typing import Any, Callable, Dict, Generator, List, Union  # noqa: F401

from collections import defaultdict
from datetime import datetime
from pathlib import Path
from pprint import pformat
import json
import os
import re
import shutil

from lunchbox.enforce import Enforce
from schematics.exceptions import DataError, ValidationError
import dask.dataframe as dd
import pandas as pd
import pyjson5 as jsonc

FilePath = Union[str, Path]
DF = Union[pd.DataFrame, dd.DataFrame]
DFS = Union[pd.DataFrame, pd.Series, dd.DataFrame, dd.Series]
# ------------------------------------------------------------------------------

'''
The tools module contains general functions useful to other hidebound modules.
'''


# DIRECTORY-FUNCS---------------------------------------------------------------
def traverse_directory(
    directory, include_regex='', exclude_regex='', entry_type='file'
):
    # type: (FilePath, str, str, str) -> Generator[Path, None, None]
    '''
    Recursively list all files or directories within a given directory.

    Args:
        directory (str or Path): Directory to walk.
        include_regex (str, optional): Include filenames that match this regex.
            Default: ''.
        exclude_regex (str, optional): Exclude filenames that match this regex.
            Default: ''.
        entry_type (str, optional): Kind of directory entry to return. Options
            include: file, directory. Default: file.

    Raises:
        EnforceError: If entry_type is not file or directory.
        FileNotFoundError: If argument is not a directory or does not exist.

    Yields:
        Path: File or directory path.
    '''
    etypes = ['file', 'directory']
    msg = 'Illegal entry type: {a}. Legal entry types: {b}.'
    Enforce(entry_type, 'in', etypes, message=msg)

    directory = Path(directory)
    if not directory.is_dir():
        msg = f'{directory} is not a directory or does not exist.'
        raise FileNotFoundError(msg)
    # --------------------------------------------------------------------------

    include_re = re.compile(include_regex)
    exclude_re = re.compile(exclude_regex)

    for root, dirs, items in os.walk(directory):
        if entry_type == 'directory':
            items = dirs
        for item in items:
            filepath = Path(root, item)

            output = True
            temp = filepath.absolute().as_posix()
            if include_regex != '' and not include_re.search(temp):
                output = False
            if exclude_regex != '' and exclude_re.search(temp):
                output = False

            if output:
                yield filepath
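
# Usage sketch (illustrative; the path and filenames are hypothetical):
#   >>> list(traverse_directory('/tmp/project', include_regex=r'\.py$'))
#   [PosixPath('/tmp/project/a.py'), PosixPath('/tmp/project/lib/b.py')]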


def delete_empty_directories(directory):
    # type: (FilePath) -> None
    '''
    Recurses given directory tree and deletes directories that do not contain
    files or directory trees with files. .DS_Store files do not count as
    files. Does not delete given directory.

    Args:
        directory (str or Path): Directory to recurse.

    Raises:
        FileNotFoundError: If argument is not a directory or does not exist.
    '''
    dir_ = Path(directory).as_posix()
    if not Path(dir_).is_dir():
        msg = f'{dir_} is not a directory or does not exist.'
        raise FileNotFoundError(msg)

    empty = [[], ['.DS_Store']]
    paths = []
    for root, _, files in os.walk(directory):
        if files in empty:
            paths.append(root)

    if dir_ in paths:
        paths.remove(dir_)

    for path in reversed(paths):
        if os.listdir(path) in empty:
            shutil.rmtree(path)
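
# Usage sketch (illustrative; assumes /tmp/project exists):
#   >>> delete_empty_directories('/tmp/project')
#   # /tmp/project itself is kept; any descendant directory containing no
#   # files (ignoring .DS_Store) is removed, deepest first.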


def directory_to_dataframe(directory, include_regex='', exclude_regex=r'\.DS_Store'):
    # type: (FilePath, str, str) -> pd.DataFrame
    r'''
    Recursively list files within a given directory as rows in a pd.DataFrame.

    Args:
        directory (str or Path): Directory to walk.
        include_regex (str, optional): Include filenames that match this regex.
            Default: ''.
        exclude_regex (str, optional): Exclude filenames that match this regex.
            Default: '\.DS_Store'.

    Returns:
        pd.DataFrame: pd.DataFrame with one file per row.
    '''
    files = traverse_directory(
        directory,
        include_regex=include_regex,
        exclude_regex=exclude_regex
    )  # type: Any
    files = sorted(list(files))

    data = pd.DataFrame()
    data['filepath'] = files
    data['filename'] = data.filepath.apply(lambda x: x.name)
    data['extension'] = data.filepath \
        .apply(lambda x: Path(x).suffix.lstrip('.'))
    data.filepath = data.filepath.apply(lambda x: x.absolute().as_posix())
    return data
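
# Usage sketch (illustrative; output columns are filepath, filename, extension):
#   >>> directory_to_dataframe('/tmp/project', include_regex=r'\.py$')
#               filepath filename extension
#   0  /tmp/project/a.py     a.py        py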


# STRING-FUNCS------------------------------------------------------------------
def str_to_bool(string):
    # type: (str) -> bool
    '''
    Converts a string to a boolean value.

    Args:
        string (str): String to be converted.

    Returns:
        bool: True if string is 'true' (case insensitive), False otherwise.
    '''
    if string.lower() == 'true':
        return True
    return False


def error_to_string(error):
    # type: (Exception) -> str
    '''
    Formats error as string.

    Args:
        error (Exception): Error.

    Returns:
        str: Error message.
    '''
    output = error.args[0]
    if isinstance(error, DataError):
        output = '\n' + pformat(dict(output)) + '\n'
    elif isinstance(error, ValidationError):
        output = [x.summary for x in output]
        if len(output) == 1:
            output = f' {output[0]} '
        else:
            output = '\n' + '\n'.join(output) + '\n'
    else:
        output = f' {output} '
    output = f'{error.__class__.__name__}({output})'
    return output
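
# Usage sketch (illustrative):
#   >>> error_to_string(ValueError('foo'))
#   'ValueError( foo )'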


def time_string():
    # type: () -> str
    '''
    Returns:
        str: String representing current time.
    '''
    return datetime.now().strftime('%Y-%m-%dT-%H-%M-%S')
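
# Usage sketch (output depends on the current time):
#   >>> time_string()
#   '2024-07-05T-23-50-00'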


# MISC-FUNCS--------------------------------------------------------------------
def to_prototype(dicts):
    # type: (List[Dict]) -> Dict
    '''
    Converts a list of dicts into a dict of lists.

    .. example::
        :nowrap:

        >>> dicts = [dict(a=1, b=2, c=3), dict(a=10, b=20)]
        >>> to_prototype(dicts)
        {'a': [1, 10], 'b': [2, 20], 'c': [3]}

    Args:
        dicts (list[dict]): List of dicts.

    Returns:
        dict: Prototype dictionary.
    '''
    output = defaultdict(lambda: [])  # type: Any
    for dict_ in dicts:
        for key, val in dict_.items():
            output[key].append(val)
    output = dict(output)
    return output


# IO-FUNCS----------------------------------------------------------------------
def write_json(data, filepath):
    # type: (object, FilePath) -> None
    '''
    Convenience function for writing objects to JSON files.
    Writes lists with 1 item per line.

    Args:
        data (object): Object to be written.
        filepath (Path or str): Filepath.
    '''
    if isinstance(data, list):
        with open(filepath, 'w') as f:
            f.write('[\n')
            f.write(',\n'.join(map(json.dumps, data)))
            f.write('\n]')
    else:
        with open(filepath, 'w') as f:
            json.dump(data, f)
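
# Usage sketch (illustrative; /tmp/data.json is a hypothetical path):
#   >>> write_json([dict(a=1), dict(b=2)], '/tmp/data.json')
#   # file contents:
#   # [
#   # {"a": 1},
#   # {"b": 2}
#   # ]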


def read_json(filepath):
    # type: (FilePath) -> object
    '''
    Convenience function for reading JSON files.
    Files may include comments.

    Args:
        filepath (Path or str): Filepath.

    Raises:
        JSONDecodeError: If no JSON data could be decoded.

    Returns:
        object: JSON object.
    '''
    with open(filepath) as f:
        try:
            output = jsonc.load(f)
        except Exception as e:
            msg = f'No JSON data could be decoded from {filepath}. {str(e)}'
            raise json.JSONDecodeError(msg, '', 0)
    return output
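
# Usage sketch (illustrative; pyjson5 tolerates JSON5 comments in the file):
#   >>> read_json('/tmp/data.json')
#   [{'a': 1}, {'b': 2}]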


# DASK-FUNCS--------------------------------------------------------------------
def get_meta_kwargs(data, meta):
    # type: (DFS, Any) -> dict
    '''
    Convenience utility for coercing the meta keyword between pandas and dask.

    Args:
        data (DataFrame or Series): Pandas or dask object.
        meta (object): Meta keyword argument.

    Returns:
        dict: Appropriate keyword args.
    '''
    kwargs = {}
    if meta != '__no_default__' and data.__class__ in [dd.DataFrame, dd.Series]:
        kwargs = dict(meta=meta)
    return kwargs
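
# Usage sketch (illustrative):
#   >>> df = dd.from_pandas(pd.DataFrame(dict(x=[1, 2])), npartitions=1)
#   >>> get_meta_kwargs(df, 'object')
#   {'meta': 'object'}
#   >>> get_meta_kwargs(df.compute(), 'object')  # pandas input yields no kwargs
#   {}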


def pred_combinator(
    data,  # type: DFS
    predicate,  # type: Callable[[Any], bool]
    true_func,  # type: Callable[[Any], Any]
    false_func,  # type: Callable[[Any], Any]
    meta='object',  # type: Any
):
    # type: (...) -> DFS
    '''
    Applies true_func to rows where predicate is true and false_func to rows
    where it is false.

    Args:
        data (DataFrame): DataFrame or Series.
        predicate (function): Function that expects a row and returns a bool.
        true_func (function): Function that expects a row. Called when
            predicate is true.
        false_func (function): Function that expects a row. Called when
            predicate is false.
        meta (object, optional): Metadata inference. Default: 'object'.

    Returns:
        DataFrame or Series: Apply results.
    '''
    kwargs = get_meta_kwargs(data, meta)
    if data.__class__ in [pd.DataFrame, dd.DataFrame]:
        return data.apply(
            lambda x: true_func(x) if predicate(x) else false_func(x),
            axis=1,
            **kwargs,
        )
    return data.apply(
        lambda x: true_func(x) if predicate(x) else false_func(x),
        **kwargs,
    )
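
# Usage sketch (illustrative; works for pandas and dask objects):
#   >>> s = pd.Series([1, 2, 3])
#   >>> pred_combinator(s, lambda x: x > 1, lambda x: 'big', lambda x: 'small')
#   0    small
#   1      big
#   2      big
#   dtype: object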


def get_lut(data, column, aggregator, meta='__no_default__'):
    # type: (DF, str, Callable[[DF], Any], Any) -> DF
    '''
    Constructs a lookup table with the given column as its keys and the
    aggregator results as its values.
    Data is grouped by given column and the given aggregator is applied to each
    group of values.

    Args:
        data (DataFrame): DataFrame.
        column (str): Column to be used as the key.
        aggregator (function): Function that expects a group DataFrame and
            returns a scalar.
        meta (object, optional): Metadata inference. Default: '__no_default__'.

    Returns:
        DataFrame: DataFrame with key and value columns.
    '''
    kwargs = get_meta_kwargs(data, ('value', meta))
    merge = pd.merge
    empty = pd.DataFrame(columns=['key', 'value'])
    if isinstance(data, dd.DataFrame):
        merge = dd.merge
        empty = dd.from_pandas(empty, npartitions=1)

    grp = data.groupby(column)
    keys = grp[column].first().to_frame(name='key')
    if len(keys) == 0:
        return empty
    vals = grp \
        .apply(aggregator, include_groups=False, **kwargs) \
        .to_frame(name='value')
    lut = merge(keys, vals, left_index=True, right_index=True) \
        .reset_index(drop=True)
    return lut
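
# Usage sketch (illustrative):
#   >>> df = pd.DataFrame(dict(kind=['a', 'a', 'b'], size=[1, 2, 3]))
#   >>> get_lut(df, 'kind', lambda grp: grp['size'].max())
#     key  value
#   0   a      2
#   1   b      3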


def lut_combinator(
    data, key_column, value_column, aggregator, meta='__no_default__'
):
    # type: (DF, str, str, Callable[[DF], Any], Any) -> DF
    '''
    Constructs a lookup table from given key_column, then applies it to given
    data as value_column.

    Args:
        data (DataFrame): DataFrame.
        key_column (str): Column to be used as the lut keys.
        value_column (str): Column to be used as the values.
        aggregator (function): Function that expects a pd.DataFrame.
        meta (object, optional): Metadata inference. Default: '__no_default__'.

    Returns:
        DataFrame: DataFrame with value column.
    '''
    kwargs = get_meta_kwargs(data, meta)
    merge = pd.merge
    if isinstance(data, dd.DataFrame):
        merge = dd.merge

    lut = get_lut(data, key_column, aggregator, **kwargs)
    lut.columns = [key_column, value_column]
    data = merge(data, lut, on=key_column, how='left')
    return data
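
# Usage sketch (illustrative):
#   >>> df = pd.DataFrame(dict(kind=['a', 'a', 'b'], size=[1, 2, 3]))
#   >>> lut_combinator(df, 'kind', 'max_size', lambda grp: grp['size'].max())
#     kind  size  max_size
#   0    a     1         2
#   1    a     2         2
#   2    b     3         3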