Source code for hidebound.core.database_tools

from typing import Any, Dict, Optional, Tuple, Union  # noqa F401
from hidebound.core.specification_base import SpecificationBase  # noqa F401

from collections import defaultdict
from pathlib import Path
import re
import uuid

from schematics.exceptions import DataError, ValidationError
import dask.dataframe as dd
import lunchbox.tools as lbt
import numpy as np
import pandas as pd

from hidebound.core.parser import AssetNameParser
import hidebound.core.tools as hbt

DF = Union[pd.DataFrame, dd.DataFrame]
# ------------------------------------------------------------------------------


'''
A library of tools for Database to use in construction of its central DataFrame.
'''


def add_specification(data, specifications):
    # type: (DF, Dict[str, SpecificationBase]) -> DF
    '''
    Adds specification data to given DataFrame.

    Columns added:

        * specification
        * specification_class
        * file_error

    Args:
        data (DataFrame): DataFrame.
        specifications (dict): Dictionary of specifications.

    Returns:
        DataFrame: DataFrame with specification, specification_class and
            file_error columns.
    '''
    def get_spec(filename):
        output = lbt.try_(
            AssetNameParser.parse_specification, filename, 'error'
        )
        if isinstance(output, dict):
            return output['specification'], np.nan
        return np.nan, str(output)

    # parse filenames
    parse = data.filename \
        .apply(get_spec, **hbt.get_meta_kwargs(data, ('filename', 'object')))

    # set specifications
    data['specification'] = parse.apply(
        lambda x: x[0],
        **hbt.get_meta_kwargs(data, ('specification', str))
    )

    # set file errors
    data['file_error'] = parse \
        .apply(lambda x: x[1], **hbt.get_meta_kwargs(data, ('file_error', str)))

    # add specification classes
    data['specification_class'] = data.specification.apply(
        lambda x: specifications.get(x, np.nan),
        **hbt.get_meta_kwargs(data, ('specification_class', 'object'))
    )

    # add spec not found errors to rows with no file errors
    error = hbt.error_to_string(KeyError('Specification not found.'))
    data.file_error = data.file_error.mask(
        data.file_error.isnull() & data.specification_class.isnull(),
        error
    )
    return data
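
# Usage sketch (illustrative, assuming a pandas DataFrame with a filename
# column). With an empty specifications dict, a filename that does not parse
# receives the parser error in file_error, while a parseable filename with an
# unknown specification would receive the 'Specification not found.' error.
# The filename below is a made-up placeholder, not a real hidebound asset name.
def _example_add_specification():
    data = pd.DataFrame(dict(filename=['not-a-valid-asset-name.txt']))
    data = add_specification(data, {})
    return data[['filename', 'specification', 'file_error']]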

def validate_filepath(data):
    # type: (DF) -> DF
    '''
    Validates filepath column of given DataFrame.
    Adds error to file_error column if invalid.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with updated file_error column.
    '''
    def validate(row):
        try:
            row.specification_class().validate_filepath(row.filepath)
            return np.nan
        except ValidationError as e:
            return hbt.error_to_string(e)

    data.file_error = hbt.pred_combinator(
        data,
        lambda x: pd.isnull(x.file_error),
        validate,
        lambda x: x.file_error,
        **hbt.get_meta_kwargs(data, ('file_error', 'object'))
    )
    return data

def add_file_traits(data):
    # type: (DF) -> DF
    '''
    Adds traits derived from file in filepath.
    Adds file_traits column containing a dict of traits per file.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with file_traits column.
    '''
    data['file_traits'] = hbt.pred_combinator(
        data,
        lambda x: pd.notnull(x.specification_class),
        lambda x: x.specification_class().get_traits(x.filepath),
        lambda x: {},
        **hbt.get_meta_kwargs(data, ('file_traits', 'object'))
    )
    return data

def add_relative_path(data, column, root_dir):
    # type: (DF, str, Union[str, Path]) -> DF
    '''
    Adds relative path column derived from given column.

    Args:
        data (DataFrame): DataFrame.
        column (str): Column to be made relative.
        root_dir (Path or str): Root path to be removed.

    Returns:
        DataFrame: DataFrame with updated [column]_relative column.
    '''
    root_dir_ = Path(root_dir).as_posix()  # type: str
    if not root_dir_.endswith('/'):
        root_dir_ += '/'
    col = column + '_relative'
    data[col] = hbt.pred_combinator(
        data[column],
        lambda x: isinstance(x, str),
        lambda x: re.sub(root_dir_, '', Path(x).as_posix()),
        lambda x: x,
        **hbt.get_meta_kwargs(data, (col, str))
    )
    return data
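
# Usage sketch (illustrative, assuming a pandas DataFrame). Strips a root
# directory from the filepath column and stores the result in
# filepath_relative; non-string values such as NaN are passed through
# unchanged. The paths are made-up placeholders.
def _example_add_relative_path():
    data = pd.DataFrame(dict(
        filepath=['/tmp/projects/proj001/file.txt', np.nan]
    ))
    data = add_relative_path(data, 'filepath', '/tmp/projects')
    # filepath_relative is now ['proj001/file.txt', NaN]
    return data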

def add_asset_name(data):
    # type: (DF) -> DF
    '''
    Adds asset_name column derived from filepath.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with updated asset_name column.
    '''
    data['asset_name'] = hbt.pred_combinator(
        data,
        lambda x: pd.isnull(x.file_error),
        lambda x: x.specification_class().get_asset_name(x.filepath),
        lambda x: np.nan,
        **hbt.get_meta_kwargs(data, ('asset_name', str))
    )
    return data

def add_asset_path(data):
    # type: (DF) -> DF
    '''
    Adds asset_path column derived from filepath.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with asset_path column.
    '''
    data['asset_path'] = hbt.pred_combinator(
        data,
        lambda x: pd.notnull(x.specification_class),
        lambda x: x.specification_class().get_asset_path(x.filepath).as_posix(),
        lambda x: np.nan,
        **hbt.get_meta_kwargs(data, ('asset_path', str))
    )
    return data

def add_asset_type(data):
    # type: (DF) -> DF
    '''
    Adds asset_type column derived from specification.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with asset_type column.
    '''
    data['asset_type'] = hbt.pred_combinator(
        data.specification_class,
        lambda x: pd.notnull(x),
        lambda x: x.asset_type,
        lambda x: np.nan,
        **hbt.get_meta_kwargs(data, ('asset_type', str))
    )
    return data

def add_asset_traits(data):
    # type: (DF) -> DF
    '''
    Adds traits derived from aggregation of file traits.
    Adds asset_traits column, aggregating file traits per asset.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with asset_traits column.
    '''
    data = hbt.lut_combinator(
        data,
        'asset_path',
        'asset_traits',
        lambda x: x.file_traits.tolist(),
        **hbt.get_meta_kwargs(data, 'object')
    )
    data.asset_traits = hbt.pred_combinator(
        data.asset_traits,
        lambda x: isinstance(x, list),
        hbt.to_prototype,
        lambda x: np.nan,
        **hbt.get_meta_kwargs(data, ('asset_traits', 'object'))
    )
    return data

def validate_assets(data):
    # type: (DF) -> DF
    '''
    Validates assets according to their specification.
    Adds asset_error and asset_valid columns.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with asset_error and asset_valid columns.
    '''
    def error_func(row):
        try:
            row.specification_class(row.asset_traits).validate()
        except DataError as e:
            return hbt.error_to_string(e)
        return np.nan

    # add asset error
    data['asset_error'] = hbt.pred_combinator(
        data,
        lambda x: isinstance(x.asset_traits, dict) and pd.notnull(x.specification_class),
        error_func,
        lambda x: np.nan,
        **hbt.get_meta_kwargs(data, ('asset_error', 'object'))
    )

    # assign asset_valid column
    data['asset_valid'] = hbt.pred_combinator(
        data,
        lambda x: pd.isnull(x.asset_error) and pd.isnull(x.file_error) and pd.notnull(x.specification_class),  # noqa E501
        lambda x: True,
        lambda x: False,
        **hbt.get_meta_kwargs(data, ('asset_valid', bool))
    )
    return data
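
# Note: asset_valid is the column get_data_for_write later filters on, so a row
# is only staged for writing when it has no file_error, no asset_error, and a
# resolved specification_class.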

def cleanup(data):
    # type: (DF) -> DF
    '''
    Ensures only specific columns are present and in correct order and Paths
    are converted to strings.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: Cleaned up DataFrame.
    '''
    columns = [
        'specification',
        'extension',
        'filename',
        'filepath',
        'file_error',
        'file_traits',
        'asset_name',
        'asset_path',
        'asset_type',
        'asset_traits',
        'asset_error',
        'asset_valid',
    ]
    # if no files are found return empty DataFrame
    for col in columns:
        if col not in data.columns:
            data[col] = np.nan

    # use copy to avoid SettingWithCopyWarning
    # TODO: figure out a way to prevent warning without copy.
    cols = data.columns
    cols = set(cols).difference(columns)
    cols = sorted(cols)
    cols = columns + cols
    cols = list(filter(lambda x: x != 'specification_class', cols))
    data = data[cols].copy()

    # convert Paths to str
    for col in data.columns:
        mask = data[col].apply(lambda x: isinstance(x, Path))
        data.loc[mask, col] = data.loc[mask, col] \
            .apply(lambda x: x.absolute().as_posix())
    return data
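
# Usage sketch (illustrative, assuming a pandas DataFrame). Shows cleanup
# adding any missing standard columns as NaN, dropping specification_class,
# and converting Path objects to POSIX strings. The path is a made-up
# placeholder.
def _example_cleanup():
    data = pd.DataFrame(dict(
        filepath=[Path('/tmp/projects/proj001/file.txt')],
        specification_class=[np.nan],
    ))
    data = cleanup(data)
    # filepath is now a string and columns such as asset_valid exist as NaN;
    # specification_class has been removed
    return data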

def add_asset_id(data):
    # type: (pd.DataFrame) -> pd.DataFrame
    '''
    Adds asset_id column derived from UUID hash of asset filepath.

    Args:
        data (pd.DataFrame): DataFrame.

    Returns:
        pd.DataFrame: DataFrame with asset_id column.
    '''
    mask = data.file_error.isnull()
    data['asset_id'] = np.nan
    data.asset_id = data.asset_id.astype(np.object_)
    if len(data[mask]) > 0:
        data.loc[mask, 'asset_id'] = data.loc[mask].apply(
            lambda x: x.specification_class().get_asset_id(x.filepath),
            axis=1,
        ).astype(str)
    return data

def get_data_for_write(
    data,  # type: pd.DataFrame
    source_dir,  # type: Union[str, Path]
    target_dir,  # type: Union[str, Path]
):
    # type: (...) -> Optional[Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]]  # noqa: E501
    '''
    Splits given data into five DataFrames used for writing files and metadata.

    Args:
        data (DataFrame): DataFrame to be transformed.
        source_dir (str or Path): Source directory of asset files.
        target_dir (str or Path): Target directory where data will be written.

    DataFrames:

        * File data - For writing asset file data to a target filepath.
        * Asset metadata - For writing asset metadata to a target json file.
        * File metadata - For writing file metadata to a target json file.
        * Asset chunk - For writing asset metadata chunk to a target json file.
        * File chunk - For writing file metadata chunk to a target json file.

    Returns:
        tuple[DataFrame]: file_data, asset_metadata, file_metadata,
            asset_chunk, file_chunk.
    '''
    # TODO: flatten file_traits and flatten asset_traits

    # get valid asset data
    data = data.copy()
    data = data[data.asset_valid]

    # return if there is no valid asset data
    if len(data) == 0:
        return None

    source_dir = Path(source_dir).absolute().as_posix()
    data_dir = Path(target_dir, 'content').absolute().as_posix()
    meta_dir = Path(target_dir, 'metadata').absolute().as_posix()

    # add asset id
    keys = data.asset_path.unique().tolist()
    vals = [str(uuid.uuid4()) for x in keys]
    lut = dict(zip(keys, vals))  # type: Any
    lut = defaultdict(lambda: np.nan, lut)
    data['asset_id'] = data.asset_path.apply(lambda x: lut[x])

    # create file id and metadata
    data['file_id'] = data.asset_name.apply(lambda x: str(uuid.uuid4()))
    data['metadata'] = data.apply(
        lambda x: dict(
            asset_id=x.asset_id,
            asset_path=Path(data_dir, x.asset_path_relative).as_posix(),
            asset_path_relative=x.asset_path_relative,
            asset_name=x.asset_name,
            asset_type=x.asset_type,
            file_id=x.file_id,
            file_traits=x.file_traits,
            filename=x.filename,
            filepath=Path(data_dir, x.filepath_relative).as_posix(),
            filepath_relative=x.filepath_relative,
        ),
        axis=1
    )

    # create asset metadata
    asset_meta = data \
        .groupby('asset_id', as_index=False) \
        .agg(lambda x: x.tolist())
    meta = []
    lut = dict(
        asset_id='asset_id',
        asset_path='asset_path',
        asset_path_relative='asset_path_relative',
        asset_name='asset_name',
        asset_traits='asset_traits',
        asset_type='asset_type',
        file_id='file_ids',
        file_traits='file_traits',
        filename='filenames',
        filepath='filepaths',
        filepath_relative='filepaths_relative',
    )
    keys = asset_meta.columns.tolist()
    for _, row in asset_meta.iterrows():
        vals = row.tolist()
        item = dict(zip(keys, vals))
        item = {lut[k]: item[k] for k in lut.keys()}

        # grab the first occurrence of these columns
        cols = [
            'asset_name', 'asset_path', 'asset_path_relative', 'asset_type',
            'asset_traits'
        ]
        for col in cols:
            item[col] = item[col][0]
        del item['file_traits']

        # replace asset root
        item['asset_path'] = Path(data_dir, item['asset_path_relative']) \
            .as_posix()
        meta.append(item)

    asset_meta['metadata'] = meta
    asset_meta['target'] = asset_meta.asset_id \
        .apply(lambda x: Path(meta_dir, 'asset', x + '.json').as_posix())
    asset_meta = asset_meta[['metadata', 'target']]

    # create file data
    file_data = data.copy()
    file_data['source'] = file_data.filepath
    file_data['target'] = file_data.source \
        .apply(lambda x: re.sub(source_dir, data_dir, x))
    file_data = file_data[['source', 'target']]

    # create file metadata
    file_meta = data.copy()
    file_meta['target'] = file_meta.file_id \
        .apply(lambda x: Path(meta_dir, 'file', x + '.json').as_posix())
    file_meta = file_meta[['metadata', 'target']]

    # get time
    now = hbt.time_string()

    # create asset chunk
    asset_chunk = pd.DataFrame()
    asset_chunk['metadata'] = [asset_meta.metadata.tolist()]
    asset_chunk['target'] = [Path(
        meta_dir, 'asset-chunk', f'hidebound-asset-chunk_{now}.json'
    ).as_posix()]

    # create file chunk
    file_chunk = pd.DataFrame()
    file_chunk['metadata'] = [file_meta.metadata.tolist()]
    file_chunk['target'] = [Path(
        meta_dir, 'file-chunk', f'hidebound-file-chunk_{now}.json'
    ).as_posix()]

    return file_data, asset_meta, file_meta, asset_chunk, file_chunk
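
# Usage sketch (illustrative). Assumes `data` has already been run through the
# functions above, including add_relative_path for both 'filepath' and
# 'asset_path' and validate_assets, so the *_relative and asset_valid columns
# exist. The directories are made-up placeholders.
def _example_get_data_for_write(data):
    result = get_data_for_write(data, '/tmp/projects', '/tmp/hidebound')
    if result is None:
        # no valid assets to write
        return None
    file_data, asset_meta, file_meta, asset_chunk, file_chunk = result
    # file_data pairs source and target filepaths; the remaining DataFrames
    # each pair a metadata dict (or a list of dicts for the chunks) with the
    # target JSON filepath it should be written to.
    return file_data, asset_meta, file_meta, asset_chunk, file_chunk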