Coverage for /home/ubuntu/hidebound/python/hidebound/core/database_tools.py: 100%
139 statements
from typing import Any, Dict, Optional, Tuple, Union  # noqa F401
from hidebound.core.specification_base import SpecificationBase  # noqa F401

from collections import defaultdict
from pathlib import Path
import re
import uuid

from schematics.exceptions import DataError, ValidationError
import dask.dataframe as dd
import lunchbox.tools as lbt
import numpy as np
import pandas as pd

from hidebound.core.parser import AssetNameParser
import hidebound.core.tools as hbt

DF = Union[pd.DataFrame, dd.DataFrame]
# ------------------------------------------------------------------------------


'''
A library of tools for Database to use in construction of its central DataFrame.
'''


def add_specification(data, specifications):
    # type: (DF, Dict[str, SpecificationBase]) -> DF
    '''
    Adds specification data to given DataFrame.

    Columns added:

        * specification
        * specification_class
        * file_error

    Args:
        data (DataFrame): DataFrame.
        specifications (dict): Dictionary of specifications.

    Returns:
        DataFrame: DataFrame with specification, specification_class and
            file_error columns.
    '''
    def get_spec(filename):
        output = lbt.try_(
            AssetNameParser.parse_specification, filename, 'error'
        )
        if isinstance(output, dict):
            return output['specification'], np.nan
        return np.nan, str(output)

    # parse filenames
    parse = data.filename \
        .apply(get_spec, **hbt.get_meta_kwargs(data, ('filename', 'object')))

    # set specifications
    data['specification'] = parse.apply(
        lambda x: x[0],
        **hbt.get_meta_kwargs(data, ('specification', str))
    )

    # set file errors
    data['file_error'] = parse \
        .apply(lambda x: x[1], **hbt.get_meta_kwargs(data, ('file_error', str)))

    # add specification classes
    data['specification_class'] = data.specification.apply(
        lambda x: specifications.get(x, np.nan),
        **hbt.get_meta_kwargs(data, ('specification_class', 'object'))
    )

    # add spec not found errors to rows with no file errors
    error = hbt.error_to_string(KeyError('Specification not found.'))
    data.file_error = data.file_error.mask(
        data.file_error.isnull() & data.specification_class.isnull(),
        error
    )
    return data
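
# Illustrative usage sketch (not part of the original module). Given a
# DataFrame with a filename column and a dict mapping specification names to
# SpecificationBase subclasses, rows whose filenames parse to a known
# specification receive that class; other rows receive a file_error. The
# filename format and the Spec001 class below are hypothetical:
#
#     >>> data = pd.DataFrame(dict(filename=[
#     ...     'p-proj001_s-spec001_d-desc_v001.png', 'bad-name.png'
#     ... ]))
#     >>> data = add_specification(data, {'spec001': Spec001})
#     >>> data[['specification', 'file_error']]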


def validate_filepath(data):
    # type: (DF) -> DF
    '''
    Validates filepath column of given DataFrame.
    Adds error to file_error column if invalid.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with updated file_error column.
    '''
    def validate(row):
        try:
            row.specification_class().validate_filepath(row.filepath)
            return np.nan
        except ValidationError as e:
            return hbt.error_to_string(e)

    data.file_error = hbt.pred_combinator(
        data,
        lambda x: pd.isnull(x.file_error),
        validate,
        lambda x: x.file_error,
        **hbt.get_meta_kwargs(data, ('file_error', 'object'))
    )
    return data
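
# Note on the hbt.pred_combinator pattern used throughout this module: as
# applied here its arguments are (data or column, predicate, function applied
# where the predicate is true, function applied where it is false, **meta
# kwargs for dask). A rough pure-pandas sketch of that behavior, for
# illustration only (when a DataFrame is passed, the functions receive rows):
#
#     def pred_combinator_sketch(series, predicate, true_func, false_func):
#         return series.apply(
#             lambda x: true_func(x) if predicate(x) else false_func(x)
#         )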


def add_file_traits(data):
    # type: (DF) -> DF
    '''
    Adds traits derived from file in filepath.
    Adds file_traits column and one column per trait key.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with file_traits column.
    '''
    data['file_traits'] = hbt.pred_combinator(
        data,
        lambda x: pd.notnull(x.specification_class),
        lambda x: x.specification_class().get_traits(x.filepath),
        lambda x: {},
        **hbt.get_meta_kwargs(data, ('file_traits', 'object'))
    )
    return data


def add_relative_path(data, column, root_dir):
    # type: (DF, str, Union[str, Path]) -> DF
    '''
    Adds relative path column derived from given column.

    Args:
        data (DataFrame): DataFrame.
        column (str): Column to be made relative.
        root_dir (Path or str): Root path to be removed.

    Returns:
        DataFrame: DataFrame with updated [column]_relative column.
    '''
    root_dir_ = Path(root_dir).as_posix()  # type: str
    if not root_dir_.endswith('/'):
        root_dir_ += '/'
    col = column + '_relative'
    data[col] = hbt.pred_combinator(
        data[column],
        lambda x: isinstance(x, str),
        lambda x: re.sub(root_dir_, '', Path(x).as_posix()),
        lambda x: x,
        **hbt.get_meta_kwargs(data, (col, str))
    )
    return data
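
# Illustrative usage sketch (not part of the original module): values in the
# given column are made relative to root_dir and stored in a new
# '<column>_relative' column. The path below is hypothetical:
#
#     >>> data = pd.DataFrame(dict(filepath=['/mnt/projects/proj001/file.png']))
#     >>> data = add_relative_path(data, 'filepath', '/mnt/projects')
#     >>> data.filepath_relative.tolist()
#     ['proj001/file.png']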


def add_asset_name(data):
    # type: (DF) -> DF
    '''
    Adds asset_name column derived from filepath.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with updated asset_name column.
    '''
    data['asset_name'] = hbt.pred_combinator(
        data,
        lambda x: pd.isnull(x.file_error),
        lambda x: x.specification_class().get_asset_name(x.filepath),
        lambda x: np.nan,
        **hbt.get_meta_kwargs(data, ('asset_name', str))
    )
    return data


def add_asset_path(data):
    # type: (DF) -> DF
    '''
    Adds asset_path column derived from filepath.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with asset_path column.
    '''
    data['asset_path'] = hbt.pred_combinator(
        data,
        lambda x: pd.notnull(x.specification_class),
        lambda x: x.specification_class().get_asset_path(x.filepath).as_posix(),
        lambda x: np.nan,
        **hbt.get_meta_kwargs(data, ('asset_path', str))
    )
    return data


def add_asset_type(data):
    # type: (DF) -> DF
    '''
    Adds asset_type column derived from specification.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with asset_type column.
    '''
    data['asset_type'] = hbt.pred_combinator(
        data.specification_class,
        lambda x: pd.notnull(x),
        lambda x: x.asset_type,
        lambda x: np.nan,
        **hbt.get_meta_kwargs(data, ('asset_type', str))
    )
    return data


def add_asset_traits(data):
    # type: (DF) -> DF
    '''
    Adds traits derived from aggregation of file traits.
    Adds asset_traits column and one column per trait key.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with asset_traits column.
    '''
    data = hbt.lut_combinator(
        data,
        'asset_path',
        'asset_traits',
        lambda x: x.file_traits.tolist(),
        **hbt.get_meta_kwargs(data, 'object')
    )
    data.asset_traits = hbt.pred_combinator(
        data.asset_traits,
        lambda x: isinstance(x, list),
        hbt.to_prototype,
        lambda x: np.nan,
        **hbt.get_meta_kwargs(data, ('asset_traits', 'object'))
    )
    return data
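
# Illustrative sketch (not part of the original module): file traits of all
# rows that share an asset_path are first collected into a list per asset,
# then merged by hbt.to_prototype, which is assumed here to combine a list of
# per-file trait dicts into a single dict of lists, roughly:
#
#     [{'frame': 1, 'width': 1024}, {'frame': 2, 'width': 1024}]
#         --> {'frame': [1, 2], 'width': [1024, 1024]}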


def validate_assets(data):
    # type: (DF) -> DF
    '''
    Validates assets according to their specification.
    Adds asset_error and asset_valid columns.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with asset_error and asset_valid columns.
    '''
    def error_func(row):
        try:
            row.specification_class(row.asset_traits).validate()
        except DataError as e:
            return hbt.error_to_string(e)
        return np.nan

    # add asset error
    data['asset_error'] = hbt.pred_combinator(
        data,
        lambda x: isinstance(x.asset_traits, dict) and pd.notnull(x.specification_class),
        error_func,
        lambda x: np.nan,
        **hbt.get_meta_kwargs(data, ('asset_error', 'object'))
    )

    # assign asset_valid column
    data['asset_valid'] = hbt.pred_combinator(
        data,
        lambda x: pd.isnull(x.asset_error) and pd.isnull(x.file_error) and pd.notnull(x.specification_class),  # noqa E501
        lambda x: True,
        lambda x: False,
        **hbt.get_meta_kwargs(data, ('asset_valid', bool))
    )
    return data
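
# Illustrative usage sketch (not part of the original module): each row with
# aggregated asset_traits has its specification class instantiated with those
# traits and validated via schematics; failures are captured in asset_error,
# and asset_valid is only True when no file or asset errors exist:
#
#     >>> data = validate_assets(data)
#     >>> data.loc[~data.asset_valid, ['asset_name', 'asset_error']]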


def cleanup(data):
    # type: (DF) -> DF
    '''
    Ensures only specific columns are present and in correct order and Paths
    are converted to strings.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: Cleaned up DataFrame.
    '''
    columns = [
        'specification',
        'extension',
        'filename',
        'filepath',
        'file_error',
        'file_traits',
        'asset_name',
        'asset_path',
        'asset_type',
        'asset_traits',
        'asset_error',
        'asset_valid',
    ]
    # if no files are found return empty DataFrame
    for col in columns:
        if col not in data.columns:
            data[col] = np.nan
    # use copy to avoid SettingWithCopyWarning
    # TODO: figure out a way to prevent warning without copy.
    cols = data.columns
    cols = set(cols).difference(columns)
    cols = sorted(cols)
    cols = columns + cols
    cols = list(filter(lambda x: x != 'specification_class', cols))
    data = data[cols].copy()

    # convert Paths to str
    for col in data.columns:
        mask = data[col].apply(lambda x: isinstance(x, Path))
        data.loc[mask, col] = data.loc[mask, col]\
            .apply(lambda x: x.absolute().as_posix())
    return data
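
# Illustrative usage sketch (not part of the original module): cleanup
# guarantees the canonical columns exist (filling missing ones with NaN),
# places them first, drops specification_class and stringifies Path objects:
#
#     >>> df = cleanup(data)
#     >>> df.columns.tolist()[:4]
#     ['specification', 'extension', 'filename', 'filepath']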


def add_asset_id(data):
    # type: (pd.DataFrame) -> pd.DataFrame
    '''
    Adds asset_id column derived from UUID hash of asset filepath.

    Args:
        data (pd.DataFrame): DataFrame.

    Returns:
        pd.DataFrame: DataFrame with asset_id column.
    '''
    mask = data.file_error.isnull()
    data['asset_id'] = np.nan
    data.asset_id = data.asset_id.astype(np.object_)
    if len(data[mask]) > 0:
        data.loc[mask, 'asset_id'] = data.loc[mask].apply(
            lambda x: x.specification_class().get_asset_id(x.filepath),
            axis=1,
        ).astype(str)
    return data


def get_data_for_write(
    data,  # type: pd.DataFrame
    source_dir,  # type: Union[str, Path]
    target_dir,  # type: Union[str, Path]
):  # type: (...) -> Optional[Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]]  # noqa: E501
    '''
    Splits given data into five DataFrames used for writing files and metadata.

    Args:
        data (DataFrame): DataFrame to be transformed.
        source_dir (str or Path): Source directory of asset files.
        target_dir (str or Path): Target directory where data will be written.

    DataFrames:

        * File data - For writing asset file data to a target filepath.
        * Asset metadata - For writing asset metadata to a target json file.
        * File metadata - For writing file metadata to a target json file.
        * Asset chunk - For writing asset metadata chunk to a target json file.
        * File chunk - For writing file metadata chunk to a target json file.

    Returns:
        tuple[DataFrame]: file_data, asset_metadata, file_metadata, asset_chunk,
            file_chunk.
    '''
    # TODO: flatten file_traits and flatten asset_traits
    # get valid asset data
    data = data.copy()
    data = data[data.asset_valid]

    # return if there is no valid asset data
    if len(data) == 0:
        return None

    source_dir = Path(source_dir).absolute().as_posix()
    data_dir = Path(target_dir, 'content').absolute().as_posix()
    meta_dir = Path(target_dir, 'metadata').absolute().as_posix()

    # add asset id
    keys = data.asset_path.unique().tolist()
    vals = [str(uuid.uuid4()) for x in keys]
    lut = dict(zip(keys, vals))  # type: Any
    lut = defaultdict(lambda: np.nan, lut)
    data['asset_id'] = data.asset_path.apply(lambda x: lut[x])

    # create file id and metadata
    data['file_id'] = data.asset_name.apply(lambda x: str(uuid.uuid4()))
    data['metadata'] = data.apply(
        lambda x: dict(
            asset_id=x.asset_id,
            asset_path=Path(data_dir, x.asset_path_relative).as_posix(),
            asset_path_relative=x.asset_path_relative,
            asset_name=x.asset_name,
            asset_type=x.asset_type,
            file_id=x.file_id,
            file_traits=x.file_traits,
            filename=x.filename,
            filepath=Path(data_dir, x.filepath_relative).as_posix(),
            filepath_relative=x.filepath_relative,
        ),
        axis=1
    )

    # create asset metadata
    asset_meta = data\
        .groupby('asset_id', as_index=False) \
        .agg(lambda x: x.tolist())

    meta = []
    lut = dict(
        asset_id='asset_id',
        asset_path='asset_path',
        asset_path_relative='asset_path_relative',
        asset_name='asset_name',
        asset_traits='asset_traits',
        asset_type='asset_type',
        file_id='file_ids',
        file_traits='file_traits',
        filename='filenames',
        filepath='filepaths',
        filepath_relative='filepaths_relative',
    )
    keys = asset_meta.columns.tolist()
    for _, row in asset_meta.iterrows():
        vals = row.tolist()
        item = dict(zip(keys, vals))
        item = {lut[k]: item[k] for k in lut.keys()}

        # grab the first occurrence of these columns
        cols = [
            'asset_name',
            'asset_path',
            'asset_path_relative',
            'asset_type',
            'asset_traits'
        ]
        for col in cols:
            item[col] = item[col][0]
        del item['file_traits']

        # replace asset root
        item['asset_path'] = Path(data_dir, item['asset_path_relative']) \
            .as_posix()

        meta.append(item)
    asset_meta['metadata'] = meta

    asset_meta['target'] = asset_meta.asset_id\
        .apply(lambda x: Path(meta_dir, 'asset', x + '.json').as_posix())
    asset_meta = asset_meta[['metadata', 'target']]

    # create file data
    file_data = data.copy()
    file_data['source'] = file_data.filepath
    file_data['target'] = file_data.source\
        .apply(lambda x: re.sub(source_dir, data_dir, x))
    file_data = file_data[['source', 'target']]

    # create file metadata
    file_meta = data.copy()
    file_meta['target'] = file_meta.file_id\
        .apply(lambda x: Path(meta_dir, 'file', x + '.json').as_posix())
    file_meta = file_meta[['metadata', 'target']]

    # get time
    now = hbt.time_string()

    # create asset chunk
    asset_chunk = pd.DataFrame()
    asset_chunk['metadata'] = [asset_meta.metadata.tolist()]
    asset_chunk['target'] = [Path(
        meta_dir, 'asset-chunk', f'hidebound-asset-chunk_{now}.json'
    ).as_posix()]

    # create file chunk
    file_chunk = pd.DataFrame()
    file_chunk['metadata'] = [file_meta.metadata.tolist()]
    file_chunk['target'] = [Path(
        meta_dir, 'file-chunk', f'hidebound-file-chunk_{now}.json'
    ).as_posix()]

    return file_data, asset_meta, file_meta, asset_chunk, file_chunk
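
# Illustrative end-to-end sketch (not part of the original module). A possible
# ordering of these tools when building the central DataFrame and preparing a
# write, loosely following the column dependencies above. The specs dict and
# directory paths are hypothetical:
#
#     >>> data = add_specification(data, specs)
#     >>> data = validate_filepath(data)
#     >>> data = add_file_traits(data)
#     >>> data = add_relative_path(data, 'filepath', root)
#     >>> data = add_asset_name(data)
#     >>> data = add_asset_path(data)
#     >>> data = add_relative_path(data, 'asset_path', root)
#     >>> data = add_asset_type(data)
#     >>> data = add_asset_traits(data)
#     >>> data = validate_assets(data)
#     >>> data = cleanup(data)
#     >>> result = get_data_for_write(data, root, '/tmp/hidebound')
#     >>> if result is not None:
#     ...     file_data, asset_meta, file_meta, asset_chunk, file_chunk = result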