Coverage for /home/ubuntu/hidebound/python/hidebound/core/database.py: 100%
261 statements
coverage.py v7.5.4, created at 2024-07-05 23:50 +0000
from typing import Any, Dict, Generator, List, Union  # noqa F401

from copy import deepcopy
from importlib import import_module
from pathlib import Path
import json
import os
import shutil
import sys

from pandas import DataFrame
from requests.exceptions import ConnectionError
from requests.models import Response
import dask.dataframe as dd
import pyjson5 as jsonc
import numpy as np
import pandasql
import requests
import yaml

from hidebound.core.config import Config
from hidebound.core.connection import DaskConnection, DaskConnectionConfig
from hidebound.core.logging import ProgressLogger
from hidebound.core.specification_base import SpecificationBase
from hidebound.exporters.disk_exporter import DiskExporter
from hidebound.exporters.girder_exporter import GirderExporter
from hidebound.exporters.s3_exporter import S3Exporter
import hidebound.core.database_tools as db_tools
import hidebound.core.tools as hbt
# ------------------------------------------------------------------------------


class Database:
    '''
    Generates a DataFrame using the files within a given directory as rows.
    '''
    @staticmethod
    def from_config(config):
        # type: (Dict[str, Any]) -> "Database"
        '''
        Constructs a Database instance given a valid config.

        Args:
            config (dict): Dictionary that meets Config class standards.

        Raises:
            DataError: If config is invalid.

        Returns:
            Database: Database instance.
        '''
        # validate config and populate with default values
        config = deepcopy(config)
        testing = config.get('testing', False)
        config.pop('testing', None)
        config = Config(config)
        config.validate()
        config = config.to_primitive()

        specs = []
        for path in config['specification_files']:
            sys.path.append(path)

            filepath = Path(path)
            filename = filepath.name
            filename, _ = os.path.splitext(filename)
            module = import_module(filename, filepath)  # type: ignore

            specs.extend(module.SPECIFICATIONS)

        specs = list(set(specs))
        config['specifications'] = specs

        return Database(
            config['ingress_directory'],
            config['staging_directory'],
            specifications=specs,
            include_regex=config['include_regex'],
            exclude_regex=config['exclude_regex'],
            write_mode=config['write_mode'],
            exporters=config['exporters'],
            webhooks=config['webhooks'],
            dask=config['dask'],
            testing=testing,
        )
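
    # Usage sketch for from_config (hypothetical paths and values, not part of
    # this module). The keys shown are the ones consumed above; each
    # specification file must define a SPECIFICATIONS list:
    #
    #     config = dict(
    #         ingress_directory='/mnt/projects/ingress',
    #         staging_directory='/mnt/projects/hidebound',
    #         specification_files=['/mnt/projects/specs/image_specs.py'],
    #         include_regex='',
    #         exclude_regex=r'\.DS_Store',
    #         write_mode='copy',
    #         exporters=[],
    #         webhooks=[],
    #         dask={},
    #     )
    #     db = Database.from_config(config)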

    @staticmethod
    def from_json(filepath):
        # type: (Union[str, Path]) -> "Database"
        '''
        Constructs a Database instance from a given json file.

        Args:
            filepath (str or Path): Filepath of json config file.

        Returns:
            Database: Database instance.
        '''
        with open(filepath) as f:
            config = jsonc.load(f)
        return Database.from_config(config)

    @staticmethod
    def from_yaml(filepath):
        # type: (Union[str, Path]) -> "Database"
        '''
        Constructs a Database instance from a given yaml file.

        Args:
            filepath (str or Path): Filepath of yaml config file.

        Returns:
            Database: Database instance.
        '''
        with open(filepath) as f:
            config = yaml.safe_load(f)
        return Database.from_config(config)
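
    # Usage sketch for the file-based constructors (hypothetical filepaths).
    # Both parse the file (JSON5 via pyjson5, YAML via yaml.safe_load) and
    # delegate to Database.from_config:
    #
    #     db = Database.from_json('/mnt/projects/hidebound_config.json')
    #     db = Database.from_yaml('/mnt/projects/hidebound_config.yaml')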

    def __init__(
        self,
        ingress_dir,  # type: Union[str, Path]
        staging_dir,  # type: Union[str, Path]
        specifications=[],  # type: List[SpecificationBase]
        include_regex='',  # type: str
        exclude_regex=r'\.DS_Store',  # type: str
        write_mode='copy',  # type: str
        exporters=[],  # type: List[Dict[str, Any]]
        webhooks=[],  # type: List[Dict[str, Any]]
        dask={},  # type: Dict[str, Any]
        testing=False,  # type: bool
    ):
        # type: (...) -> None
        r'''
        Creates an instance of Database but does not populate it with data.

        Args:
            ingress_dir (str or Path): Root directory to recurse.
            staging_dir (str or Path): Directory where hidebound data will be
                staged.
            specifications (list[SpecificationBase], optional): List of asset
                specifications. Default: [].
            include_regex (str, optional): Include filenames that match this
                regex. Default: ''.
            exclude_regex (str, optional): Exclude filenames that match this
                regex. Default: '\.DS_Store'.
            write_mode (str, optional): How assets will be extracted to
                hidebound/content directory. Default: copy.
            exporters (list[dict], optional): List of exporter configs.
                Default: [].
            webhooks (list[dict], optional): List of webhooks to call.
                Default: [].
            dask (dict, optional): Dask configuration. Default: {}.
            testing (bool, optional): Used for testing. Default: False.

        Raises:
            TypeError: If specifications contains a non-SpecificationBase
                object.
            ValueError: If write_mode is not "copy" or "move".
            FileNotFoundError: If ingress_dir is not a directory or does not
                exist.
            FileNotFoundError: If staging_dir is not a directory or does not
                exist.
            NameError: If staging_dir is not named "hidebound".

        Returns:
            Database: Database instance.
        '''
        # validate hidebound dir
        staging = Path(staging_dir)
        if not staging.is_dir():
            msg = f'{staging} is not a directory or does not exist.'
            raise FileNotFoundError(msg)

        if Path(staging).name != 'hidebound':
            msg = f'{staging} directory is not named hidebound.'
            raise NameError(msg)

        # setup logger
        self._logger = ProgressLogger(__name__)

        # validate spec classes
        bad_specs = list(filter(
            lambda x: not issubclass(x, SpecificationBase), specifications
        ))
        if len(bad_specs) > 0:
            msg = 'Specifications may only contain subclasses of '
            msg += f'SpecificationBase. Found: {bad_specs}.'
            self._logger.error(msg)
            raise TypeError(msg)

        # validate root dir
        root = Path(ingress_dir)
        if not root.is_dir():
            msg = f'{root} is not a directory or does not exist.'
            self._logger.error(msg)
            raise FileNotFoundError(msg)

        # validate write mode
        modes = ['copy', 'move']
        if write_mode not in modes:
            msg = f'Invalid write mode: {write_mode} not in {modes}.'
            self._logger.error(msg)
            raise ValueError(msg)

        self._root = root
        self._staging = staging
        self._include_regex = include_regex
        self._exclude_regex = exclude_regex
        self._write_mode = write_mode
        self._specifications = {x.__name__.lower(): x for x in specifications} \
            # type: Dict[str, SpecificationBase]

        # webhooks
        self._webhooks = webhooks

        # dask
        config = DaskConnectionConfig(dask)
        config.validate()
        self._dask_config = config.to_primitive()

        # setup exporters
        for exp in exporters:
            exp['dask'] = exp.get('dask', self._dask_config)
        self._exporters = exporters

        self._testing = testing
        self.data = None

        # needed for testing
        self.__exporter_lut = None

        self._logger.info('Database initialized', step=1, total=1)

    def create(self):
        # type: () -> "Database"
        '''
        Extract valid assets as data and metadata within the hidebound
        directory.

        Writes:

            * file content to hb_parent/hidebound/content - under same directory
              structure
            * asset metadata as json to hb_parent/hidebound/metadata/asset
            * file metadata as json to hb_parent/hidebound/metadata/file
            * asset metadata as single json to hb_parent/hidebound/metadata/asset-chunk
            * file metadata as single json to hb_parent/hidebound/metadata/file-chunk

        Raises:
            RuntimeError: If data has not been initialized.

        Returns:
            Database: self.
        '''
        total = 7
        if self.data is None:
            msg = 'Data not initialized. Please call update.'
            raise RuntimeError(msg)

        temp = db_tools.get_data_for_write(self.data, self._root, self._staging)
        self._logger.info('create: get data', step=1, total=total)
        if temp is None:
            return self

        # convert to dask dataframes
        file_data, asset_meta, file_meta, asset_chunk, file_chunk = temp
        with DaskConnection(self._dask_config) as conn:
            nparts = conn.num_partitions
            file_data = dd.from_pandas(file_data, npartitions=nparts)
            asset_meta = dd.from_pandas(asset_meta, npartitions=nparts)
            file_meta = dd.from_pandas(file_meta, npartitions=nparts)
            asset_chunk = dd.from_pandas(asset_chunk, npartitions=nparts)
            file_chunk = dd.from_pandas(file_chunk, npartitions=nparts)
            temp = [file_data, asset_meta, file_meta, asset_chunk, file_chunk]

            # make directories
            for item in temp:
                item.target.apply(
                    lambda x: os.makedirs(Path(x).parent, exist_ok=True),
                    meta=('target', 'object')
                ).compute()
            self._logger.info('create: make directories', step=2, total=total)

            # write file data
            if self._write_mode == 'move':
                file_data.apply(
                    lambda x: shutil.move(x.source, x.target),
                    axis=1, meta=(None, 'object')
                ).compute()
                hbt.delete_empty_directories(self._root)
            else:
                file_data.apply(
                    lambda x: shutil.copy2(x.source, x.target),
                    axis=1, meta=(None, 'object')
                ).compute()
            self._logger.info('create: write file data', step=3, total=total)

            # write asset metadata
            asset_meta.apply(
                lambda x: hbt.write_json(x.metadata, x.target),
                axis=1, meta=(None, 'object')
            ).compute()
            self._logger.info('create: write asset metadata', step=4, total=total)

            # write file metadata
            file_meta.apply(
                lambda x: hbt.write_json(x.metadata, x.target),
                axis=1, meta=(None, 'object')
            ).compute()
            self._logger.info('create: write file metadata', step=5, total=total)

            # write asset chunk
            asset_chunk.apply(
                lambda x: hbt.write_json(x.metadata, x.target),
                axis=1, meta=(None, 'object')
            ).compute()
            self._logger.info('create: write asset chunk', step=6, total=total)

            # write file chunk
            file_chunk.apply(
                lambda x: hbt.write_json(x.metadata, x.target),
                axis=1, meta=(None, 'object')
            ).compute()
            self._logger.info('create: write file chunk', step=7, total=total)

        self._logger.info('create: complete', step=7, total=total)
        return self
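
    # Workflow sketch (hypothetical instance): update() must populate self.data
    # before create() is called, otherwise create() raises RuntimeError:
    #
    #     db = Database.from_yaml('/mnt/projects/hidebound_config.yaml')
    #     db.update()   # parse the ingress directory into self.data
    #     db.create()   # stage content and metadata under staging_dir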

    def read(self, group_by_asset=False):
        # type: (bool) -> "DataFrame"
        '''
        Return a DataFrame which can easily be queried and has only cells
        with scalar values.

        Args:
            group_by_asset (bool, optional): Whether to group the data by asset.
                Default: False.

        Raises:
            RuntimeError: If data has not been initialized.

        Returns:
            DataFrame: Formatted data.
        '''
        total = 5
        if self.data is None:
            msg = 'Data not initialized. Please call update.'
            raise RuntimeError(msg)

        def coordinate_to_dict(item):
            if 'coordinate' in item.keys():
                keys = ['coordinate_x', 'coordinate_y', 'coordinate_z']
                coords = dict(zip(keys, item['coordinate']))
                del item['coordinate']
                item.update(coords)
            return item

        data = self.data.copy()
        self._logger.info('read: copy data', step=1, total=total)

        col = 'file_traits'
        if group_by_asset:
            col = 'asset_traits'
            data = data.groupby('asset_path', as_index=False).first()
        self._logger.info('read: group by asset', step=2, total=total)

        data[col] = data[col].apply(coordinate_to_dict)
        traits = DataFrame(data[col].tolist())

        for col in traits.columns:
            if col not in data.columns:
                data[col] = np.nan
            data[col] = data[col].astype(np.object_)

            mask = traits[col].notnull().tolist()
            data.loc[mask, col] = traits.loc[mask, col]
        self._logger.info('read: filter traits', step=3, total=total)

        # find columns by legal type
        cols = self.data.columns.tolist()
        if len(self.data) > 0:
            cols = data \
                .map(type) \
                .apply(lambda x: x.unique().tolist())
            legal_cols = set([int, float, str, bool, None])
            cols = cols.apply(lambda x: set(x).difference(legal_cols) == set())
            cols = cols[cols].index.tolist()
        self._logger.info('read: filter legal types', step=4, total=total)

        # nicely order columns
        head_cols = [
            'project',
            'specification',
            'descriptor',
            'version',
            'coordinate_x',
            'coordinate_y',
            'coordinate_z',
            'frame',
            'extension',
            'filename',
            'filepath',
            'file_error',
            'asset_name',
            'asset_path',
            'asset_type',
            'asset_error',
            'asset_valid',
        ]
        head_cols = list(filter(lambda x: x in cols, head_cols))
        tail_cols = sorted(list(set(cols).difference(head_cols)))
        cols = head_cols + tail_cols
        data = data[cols]
        self._logger.info('read: order columns', step=5, total=total)

        self._logger.info('read: complete', step=5, total=total)
        return data
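
    # Read sketch (hypothetical instance): read() returns a flattened pandas
    # DataFrame with one row per file, or one row per asset when
    # group_by_asset=True:
    #
    #     files = db.read()
    #     assets = db.read(group_by_asset=True)
    #     assets[['asset_name', 'asset_valid']]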

    def update(self):
        # type: () -> "Database"
        '''
        Recurse root directory, populate self.data with its files, locate and
        validate assets.

        Returns:
            Database: self.
        '''
        total = 3
        self._logger.info('update', step=0, total=total)

        exclude_re = '|'.join([self._exclude_regex, 'hidebound/logs'])
        data = hbt.directory_to_dataframe(
            self._root,
            include_regex=self._include_regex,
            exclude_regex=exclude_re
        )
        self._logger.info(f'update: parsed {self._root}', step=1, total=total)

        # TODO: find a means of performing async execution with progress updates
        if len(data) > 0:
            with DaskConnection(self._dask_config) as conn:
                data = dd.from_pandas(data, npartitions=conn.num_partitions)
                data = db_tools.add_specification(data, self._specifications)
                data = db_tools.validate_filepath(data)
                data = db_tools.add_file_traits(data)
                data = db_tools.add_relative_path(data, 'filepath', self._root)
                data = db_tools.add_asset_name(data)
                data = db_tools.add_asset_path(data)
                data = db_tools.add_relative_path(data, 'asset_path', self._root)
                data = db_tools.add_asset_type(data)
                data = db_tools.add_asset_traits(data)
                data = db_tools.validate_assets(data)
                data = data.compute()
        self._logger.info('update: generate', step=2, total=total)

        data = db_tools.cleanup(data)
        self._logger.info('update: cleanup', step=3, total=total)
        self.data = data

        self._logger.info('update: complete', step=3, total=total)
        return self

    def delete(self):
        # type: () -> "Database"
        '''
        Deletes hidebound/content and hidebound/metadata directories and all
        their contents.

        Returns:
            Database: self.
        '''
        total = 2
        data_dir = Path(self._staging, 'content')
        if data_dir.exists():
            shutil.rmtree(data_dir)
        self._logger.info('delete: data directory', step=1, total=total)

        meta_dir = Path(self._staging, 'metadata')
        if meta_dir.exists():
            shutil.rmtree(meta_dir)
        self._logger.info('delete: metadata directory', step=2, total=total)

        self._logger.info('delete: complete', step=2, total=total)
        return self

    def call_webhooks(self):
        # type: () -> Generator[Response, None, None]
        '''
        Calls webhooks defined in config.

        Yields:
            requests.Response: Webhook response.
        '''
        total = len(self._webhooks)
        for i, hook in enumerate(self._webhooks):
            url = hook['url']
            headers = hook.get('headers', None)
            method = hook['method']

            kwargs = {}
            if 'data' in hook and method in ['post', 'put', 'patch']:
                kwargs['data'] = json.dumps(hook['data']).encode()

            if 'json' in hook and method == 'post':
                kwargs['json'] = hook['json']

            if 'params' in hook and method == 'get':
                kwargs['params'] = hook['params']

            if 'timeout' in hook:
                kwargs['timeout'] = hook['timeout']

            # test response
            response = Response()
            response.status_code = 200
            response._content = b'Webhook called.'

            if not self._testing:
                method = getattr(requests, method)
                try:
                    response = method(url, headers=headers, **kwargs)
                except ConnectionError as e:  # pragma: no cover
                    response = Response()  # pragma: no cover
                    response.status_code = 403  # pragma: no cover
                    response._content = str(e).encode('utf-8')  # pragma: no cover

            self._logger.info(
                f'call_webhooks: {url} {response.text}',
                step=i + 1,
                total=total
            )
            yield response
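
    # Webhook config sketch (hypothetical values): each entry requires 'url' and
    # 'method'; 'headers', 'data', 'json', 'params' and 'timeout' are optional
    # and only forwarded for the methods handled above:
    #
    #     webhooks=[
    #         dict(
    #             url='https://example.com/api/notify',
    #             method='post',
    #             json={'event': 'hidebound-export'},
    #             timeout=10,
    #         )
    #     ]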

    def export(self):
        # type: () -> "Database"
        '''
        Exports all the files found in the hidebound staging directory.
        Calls webhooks afterwards.

        Returns:
            Database: Self.
        '''
        # TODO: Find a nicer pattern for injecting exporters.
        lut = dict(
            girder=GirderExporter,
            disk=DiskExporter,
            s3=S3Exporter,
        )  # type: Dict[str, Any]

        # reassign lut for testing
        if self.__exporter_lut is not None:
            lut = self.__exporter_lut

        total = len(self._exporters)
        i = 0
        for i, config in enumerate(self._exporters):
            key = config['name']
            exporter = lut[key].from_config(config)
            exporter.export(self._staging, logger=self._logger)
            self._logger.info(f'export: {key}', step=i + 1, total=total)

            # assign instance to exporter_lut for testing
            if self.__exporter_lut is not None:
                self.__exporter_lut[key] = exporter

        list(self.call_webhooks())
        self._logger.info('export: complete', step=i + 1, total=total)
        return self
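
    # Exporter config sketch (hypothetical values): 'name' selects the exporter
    # class from the lut above ('disk', 's3' or 'girder'); the remaining keys
    # are whatever that exporter's from_config expects:
    #
    #     exporters=[
    #         dict(name='disk'),  # plus DiskExporter-specific keys
    #     ]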

    def search(self, query, group_by_asset=False):
        # type: (str, bool) -> "DataFrame"
        '''
        Search data according to given SQL query.

        Args:
            query (str): SQL query. Make sure to use "FROM data" in query.
            group_by_asset (bool, optional): Whether to group the data by asset.
                Default: False.

        Returns:
            DataFrame: Formatted data.
        '''
        env = {'data': self.read(group_by_asset=group_by_asset)}
        result = pandasql.PandaSQL()(query, env)
        self._logger.info(f'search: {query}', step=1, total=1)
        self._logger.info('search: complete', step=1, total=1)
        return result
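

if __name__ == '__main__':
    # End-to-end sketch with hypothetical paths: parse the ingress directory,
    # stage validated assets, then query them. search() runs through pandasql,
    # so queries must reference the table as "data".
    db = Database.from_yaml('/mnt/projects/hidebound_config.yaml')
    db.update()
    db.create()
    print(db.search('SELECT asset_name, asset_valid FROM data'))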