Coverage for /home/ubuntu/hidebound/python/hidebound/core/database.py: 100%

261 statements  

from typing import Any, Dict, List, Union  # noqa F401

from copy import deepcopy
from importlib import import_module
from pathlib import Path
import json
import os
import shutil
import sys

from pandas import DataFrame
from requests.exceptions import ConnectionError
from requests.models import Response
import dask.dataframe as dd
import pyjson5 as jsonc
import numpy as np
import pandasql
import requests
import yaml

from hidebound.core.config import Config
from hidebound.core.connection import DaskConnection, DaskConnectionConfig
from hidebound.core.logging import ProgressLogger
from hidebound.core.specification_base import SpecificationBase
from hidebound.exporters.disk_exporter import DiskExporter
from hidebound.exporters.girder_exporter import GirderExporter
from hidebound.exporters.s3_exporter import S3Exporter
import hidebound.core.database_tools as db_tools
import hidebound.core.tools as hbt
# ------------------------------------------------------------------------------


class Database:
    '''
    Generates a DataFrame using the files within a given directory as rows.
    '''
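
    # Typical lifecycle, as a rough sketch (the config is hypothetical, not
    # part of this module): build an instance from a config, scan the ingress
    # directory, stage validated assets, then export them.
    #
    #     db = Database.from_config(config)  # config: dict meeting Config standards
    #     db.update()   # scan the ingress directory and populate db.data
    #     db.create()   # write content and metadata into the staging directory
    #     db.export()   # run configured exporters and webhooks
    #     db.delete()   # remove staged content and metadata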

    @staticmethod
    def from_config(config):
        # type: (Dict[str, Any]) -> "Database"
        '''
        Constructs a Database instance given a valid config.

        Args:
            config (dict): Dictionary that meets Config class standards.

        Raises:
            DataError: If config is invalid.

        Returns:
            Database: Database instance.
        '''
        # validate config and populate with default values
        config = deepcopy(config)
        testing = config.get('testing', False)
        config.pop('testing', None)
        config = Config(config)
        config.validate()
        config = config.to_primitive()

        specs = []
        for path in config['specification_files']:
            sys.path.append(path)

            filepath = Path(path)
            filename = filepath.name
            filename, _ = os.path.splitext(filename)
            module = import_module(filename, filepath)  # type: ignore

            specs.extend(module.SPECIFICATIONS)

        specs = list(set(specs))
        config['specifications'] = specs

        return Database(
            config['ingress_directory'],
            config['staging_directory'],
            specifications=specs,
            include_regex=config['include_regex'],
            exclude_regex=config['exclude_regex'],
            write_mode=config['write_mode'],
            exporters=config['exporters'],
            webhooks=config['webhooks'],
            dask=config['dask'],
            testing=testing,
        )
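
    # A minimal config sketch for from_config. The keys mirror those read
    # above; the paths and values are hypothetical examples, and Config
    # populates defaults for omitted keys.
    #
    #     config = {
    #         'ingress_directory': '/mnt/projects/ingress',
    #         'staging_directory': '/mnt/projects/hidebound',  # must be named "hidebound"
    #         'specification_files': ['/mnt/projects/specs.py'],  # modules exposing SPECIFICATIONS
    #         'include_regex': '',
    #         'exclude_regex': r'\.DS_Store',
    #         'write_mode': 'copy',  # or 'move'
    #         'exporters': [],
    #         'webhooks': [],
    #         'dask': {},
    #     }
    #     db = Database.from_config(config)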

    @staticmethod
    def from_json(filepath):
        # type: (Union[str, Path]) -> "Database"
        '''
        Constructs a Database instance from a given json file.

        Args:
            filepath (str or Path): Filepath of json config file.

        Returns:
            Database: Database instance.
        '''
        with open(filepath) as f:
            config = jsonc.load(f)
        return Database.from_config(config)

    @staticmethod
    def from_yaml(filepath):
        # type: (Union[str, Path]) -> "Database"
        '''
        Constructs a Database instance from a given yaml file.

        Args:
            filepath (str or Path): Filepath of yaml config file.

        Returns:
            Database: Database instance.
        '''
        with open(filepath) as f:
            config = yaml.safe_load(f)
        return Database.from_config(config)
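
    # Sketch of file-based construction (the filepaths are hypothetical).
    # JSON configs are parsed with pyjson5, so comments and trailing commas
    # are tolerated; YAML configs go through yaml.safe_load.
    #
    #     db = Database.from_yaml('/path/to/hidebound_config.yaml')
    #     db = Database.from_json('/path/to/hidebound_config.json')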

    def __init__(
        self,
        ingress_dir,  # type: Union[str, Path]
        staging_dir,  # type: Union[str, Path]
        specifications=[],  # type: List[SpecificationBase]
        include_regex='',  # type: str
        exclude_regex=r'\.DS_Store',  # type: str
        write_mode='copy',  # type: str
        exporters=[],  # type: List[Dict[str, Any]]
        webhooks=[],  # type: List[Dict[str, Any]]
        dask={},  # type: Dict[str, Any]
        testing=False,  # type: bool
    ):
        # type: (...) -> None
        r'''
        Creates an instance of Database but does not populate it with data.

        Args:
            ingress_dir (str or Path): Root directory to recurse.
            staging_dir (str or Path): Directory where hidebound data will be
                staged.
            specifications (list[SpecificationBase], optional): List of asset
                specifications. Default: [].
            include_regex (str, optional): Include filenames that match this
                regex. Default: ''.
            exclude_regex (str, optional): Exclude filenames that match this
                regex. Default: '\.DS_Store'.
            write_mode (str, optional): How assets will be extracted to
                hidebound/content directory. Default: copy.
            exporters (list[dict], optional): List of exporter configs.
                Default: [].
            webhooks (list[dict], optional): List of webhooks to call.
                Default: [].
            dask (dict, optional): Dask configuration. Default: {}.
            testing (bool, optional): Used for testing. Default: False.

        Raises:
            TypeError: If specifications contains a non-SpecificationBase
                object.
            ValueError: If write_mode is not "copy" or "move".
            FileNotFoundError: If root is not a directory or does not exist.
            FileNotFoundError: If staging_dir is not a directory or does not
                exist.
            NameError: If staging_dir is not named "hidebound".

        Returns:
            Database: Database instance.
        '''
        # validate hidebound dir
        staging = Path(staging_dir)
        if not staging.is_dir():
            msg = f'{staging} is not a directory or does not exist.'
            raise FileNotFoundError(msg)

        if Path(staging).name != 'hidebound':
            msg = f'{staging} directory is not named hidebound.'
            raise NameError(msg)

        # setup logger
        self._logger = ProgressLogger(__name__)

        # validate spec classes
        bad_specs = list(filter(
            lambda x: not issubclass(x, SpecificationBase), specifications
        ))
        if len(bad_specs) > 0:
            msg = 'Specifications may only contain subclasses of '
            msg += f'SpecificationBase. Found: {bad_specs}.'
            self._logger.error(msg)
            raise TypeError(msg)

        # validate root dir
        root = Path(ingress_dir)
        if not root.is_dir():
            msg = f'{root} is not a directory or does not exist.'
            self._logger.error(msg)
            raise FileNotFoundError(msg)

        # validate write mode
        modes = ['copy', 'move']
        if write_mode not in modes:
            msg = f'Invalid write mode: {write_mode} not in {modes}.'
            self._logger.error(msg)
            raise ValueError(msg)

        self._root = root
        self._staging = staging
        self._include_regex = include_regex
        self._exclude_regex = exclude_regex
        self._write_mode = write_mode
        self._specifications = {x.__name__.lower(): x for x in specifications} \
            # type: Dict[str, SpecificationBase]

        # webhooks
        self._webhooks = webhooks

        # dask
        config = DaskConnectionConfig(dask)
        config.validate()
        self._dask_config = config.to_primitive()

        # setup exporters
        for exp in exporters:
            exp['dask'] = exp.get('dask', self._dask_config)
        self._exporters = exporters

        self._testing = testing
        self.data = None

        # needed for testing
        self.__exporter_lut = None

        self._logger.info('Database initialized', step=1, total=1)
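
    # Direct construction, as a hedged sketch (directories are hypothetical).
    # Note that staging_dir must already exist and be named "hidebound", and
    # ingress_dir must be an existing directory.
    #
    #     db = Database(
    #         '/mnt/projects/ingress',
    #         '/mnt/projects/hidebound',
    #         specifications=[],   # subclasses of SpecificationBase
    #         write_mode='copy',   # or 'move'
    #     )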

    def create(self):
        # type: () -> "Database"
        '''
        Extract valid assets as data and metadata within the hidebound
        directory.

        Writes:

            * file content to hb_parent/hidebound/content - under same directory
              structure
            * asset metadata as json to hb_parent/hidebound/metadata/asset
            * file metadata as json to hb_parent/hidebound/metadata/file
            * asset metadata as single json to hb_parent/hidebound/metadata/asset-chunk
            * file metadata as single json to hb_parent/hidebound/metadata/file-chunk

        Raises:
            RuntimeError: If data has not been initialized.

        Returns:
            Database: self.
        '''
        total = 7
        if self.data is None:
            msg = 'Data not initialized. Please call update.'
            raise RuntimeError(msg)

        temp = db_tools.get_data_for_write(self.data, self._root, self._staging)
        self._logger.info('create: get data', step=1, total=total)
        if temp is None:
            return self

        # convert to dask dataframes
        file_data, asset_meta, file_meta, asset_chunk, file_chunk = temp

        with DaskConnection(self._dask_config) as conn:
            nparts = conn.num_partitions
            file_data = dd.from_pandas(file_data, npartitions=nparts)
            asset_meta = dd.from_pandas(asset_meta, npartitions=nparts)
            file_meta = dd.from_pandas(file_meta, npartitions=nparts)
            asset_chunk = dd.from_pandas(asset_chunk, npartitions=nparts)
            file_chunk = dd.from_pandas(file_chunk, npartitions=nparts)
            temp = [file_data, asset_meta, file_meta, asset_chunk, file_chunk]

            # make directories
            for item in temp:
                item.target.apply(
                    lambda x: os.makedirs(Path(x).parent, exist_ok=True),
                    meta=('target', 'object')
                ).compute()
            self._logger.info('create: make directories', step=2, total=total)

            # write file data
            if self._write_mode == 'move':
                file_data.apply(
                    lambda x: shutil.move(x.source, x.target),
                    axis=1, meta=(None, 'object')
                ).compute()
                hbt.delete_empty_directories(self._root)
            else:
                file_data.apply(
                    lambda x: shutil.copy2(x.source, x.target),
                    axis=1, meta=(None, 'object')
                ).compute()
            self._logger.info('create: write file data', step=3, total=total)

            # write asset metadata
            asset_meta.apply(
                lambda x: hbt.write_json(x.metadata, x.target),
                axis=1, meta=(None, 'object')
            ).compute()
            self._logger.info('create: write asset metadata', step=4, total=total)

            # write file metadata
            file_meta.apply(
                lambda x: hbt.write_json(x.metadata, x.target),
                axis=1, meta=(None, 'object')
            ).compute()
            self._logger.info('create: write file metadata', step=5, total=total)

            # write asset chunk
            asset_chunk.apply(
                lambda x: hbt.write_json(x.metadata, x.target),
                axis=1, meta=(None, 'object')
            ).compute()
            self._logger.info('create: write asset chunk', step=6, total=total)

            # write file chunk
            file_chunk.apply(
                lambda x: hbt.write_json(x.metadata, x.target),
                axis=1, meta=(None, 'object')
            ).compute()
            self._logger.info('create: write file chunk', step=7, total=total)

        self._logger.info('create: complete', step=7, total=total)
        return self
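
    # Staging layout produced by create, summarizing the docstring above
    # (hb_parent stands for the staging directory's parent):
    #
    #     hb_parent/hidebound/content                  copied or moved file content
    #     hb_parent/hidebound/metadata/asset           per-asset json metadata
    #     hb_parent/hidebound/metadata/file            per-file json metadata
    #     hb_parent/hidebound/metadata/asset-chunk     asset metadata as a single json
    #     hb_parent/hidebound/metadata/file-chunk      file metadata as a single json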

    def read(self, group_by_asset=False):
        # type: (bool) -> "DataFrame"
        '''
        Return a DataFrame which can be easily queried and has only cells
        with scalar values.

        Args:
            group_by_asset (bool, optional): Whether to group the data by asset.
                Default: False.

        Raises:
            RuntimeError: If data has not been initialized.

        Returns:
            DataFrame: Formatted data.
        '''
        total = 5
        if self.data is None:
            msg = 'Data not initialized. Please call update.'
            raise RuntimeError(msg)

        def coordinate_to_dict(item):
            if 'coordinate' in item.keys():
                keys = ['coordinate_x', 'coordinate_y', 'coordinate_z']
                coords = dict(zip(keys, item['coordinate']))
                del item['coordinate']
                item.update(coords)
            return item

        data = self.data.copy()
        self._logger.info('read: copy data', step=1, total=total)

        col = 'file_traits'
        if group_by_asset:
            col = 'asset_traits'
            data = data.groupby('asset_path', as_index=False).first()
        self._logger.info('read: group by asset', step=2, total=total)

        data[col] = data[col].apply(coordinate_to_dict)
        traits = DataFrame(data[col].tolist())

        for col in traits.columns:
            if col not in data.columns:
                data[col] = np.nan
            data[col] = data[col].astype(np.object_)

            mask = traits[col].notnull().tolist()
            data.loc[mask, col] = traits.loc[mask, col]
        self._logger.info('read: filter traits', step=3, total=total)

        # find columns by legal type
        cols = self.data.columns.tolist()
        if len(self.data) > 0:
            cols = data \
                .map(type) \
                .apply(lambda x: x.unique().tolist())
            legal_cols = set([int, float, str, bool, None])
            cols = cols.apply(lambda x: set(x).difference(legal_cols) == set())
            cols = cols[cols].index.tolist()
        self._logger.info('read: filter legal types', step=4, total=total)

        # nicely order columns
        head_cols = [
            'project',
            'specification',
            'descriptor',
            'version',
            'coordinate_x',
            'coordinate_y',
            'coordinate_z',
            'frame',
            'extension',
            'filename',
            'filepath',
            'file_error',
            'asset_name',
            'asset_path',
            'asset_type',
            'asset_error',
            'asset_valid',
        ]
        head_cols = list(filter(lambda x: x in cols, head_cols))
        tail_cols = sorted(list(set(cols).difference(head_cols)))
        cols = head_cols + tail_cols
        data = data[cols]
        self._logger.info('read: order columns', step=5, total=total)

        self._logger.info('read: complete', step=5, total=total)
        return data
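
    # Usage sketch: read returns a plain pandas DataFrame of scalar-valued
    # columns, optionally collapsed to one row per asset.
    #
    #     files = db.read()
    #     assets = db.read(group_by_asset=True)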

    def update(self):
        # type: () -> "Database"
        '''
        Recurse root directory, populate self.data with its files, locate and
        validate assets.

        Returns:
            Database: self.
        '''
        total = 3
        self._logger.info('update', step=0, total=total)

        exclude_re = '|'.join([self._exclude_regex, 'hidebound/logs'])
        data = hbt.directory_to_dataframe(
            self._root,
            include_regex=self._include_regex,
            exclude_regex=exclude_re
        )
        self._logger.info(f'update: parsed {self._root}', step=1, total=total)

        # TODO: find a means of performing async execution with progress updates
        if len(data) > 0:
            with DaskConnection(self._dask_config) as conn:
                data = dd.from_pandas(data, npartitions=conn.num_partitions)
                data = db_tools.add_specification(data, self._specifications)
                data = db_tools.validate_filepath(data)
                data = db_tools.add_file_traits(data)
                data = db_tools.add_relative_path(data, 'filepath', self._root)
                data = db_tools.add_asset_name(data)
                data = db_tools.add_asset_path(data)
                data = db_tools.add_relative_path(data, 'asset_path', self._root)
                data = db_tools.add_asset_type(data)
                data = db_tools.add_asset_traits(data)
                data = db_tools.validate_assets(data)
                data = data.compute()
        self._logger.info('update: generate', step=2, total=total)

        data = db_tools.cleanup(data)
        self._logger.info('update: cleanup', step=3, total=total)
        self.data = data

        self._logger.info('update: complete', step=3, total=total)
        return self

    def delete(self):
        # type: () -> "Database"
        '''
        Deletes hidebound/content and hidebound/metadata directories and all
        their contents.

        Returns:
            Database: self.
        '''
        total = 2
        data_dir = Path(self._staging, 'content')
        if data_dir.exists():
            shutil.rmtree(data_dir)
        self._logger.info('delete: data directory', step=1, total=total)

        meta_dir = Path(self._staging, 'metadata')
        if meta_dir.exists():
            shutil.rmtree(meta_dir)
        self._logger.info('delete: metadata directory', step=2, total=total)

        self._logger.info('delete: complete', step=2, total=total)
        return self

    def call_webhooks(self):
        # type: () -> requests.Response
        '''
        Calls webhooks defined in config.

        Yields:
            requests.Response: Webhook response.
        '''
        total = len(self._webhooks)
        for i, hook in enumerate(self._webhooks):
            url = hook['url']
            headers = hook.get('headers', None)
            method = hook['method']

            kwargs = {}
            if 'data' in hook and method in ['post', 'put', 'patch']:
                kwargs['data'] = json.dumps(hook['data']).encode()

            if 'json' in hook and method == 'post':
                kwargs['json'] = hook['json']

            if 'params' in hook and method == 'get':
                kwargs['params'] = hook['params']

            if 'timeout' in hook:
                kwargs['timeout'] = hook['timeout']

            # test response
            response = Response()
            response.status_code = 200
            response._content = b'Webhook called.'

            if not self._testing:
                method = getattr(requests, method)
                try:
                    response = method(url, headers=headers, **kwargs)
                except ConnectionError as e:  # pragma: no cover
                    response = Response()  # pragma: no cover
                    response.status_code = 403  # pragma: no cover
                    response._content = str(e).encode('utf-8')  # pragma: no cover

            self._logger.info(
                f'call_webhooks: {url} {response.text}',
                step=i + 1,
                total=total
            )
            yield response
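
    # Shape of a webhook entry, inferred from the keys read above (the URL and
    # payload are hypothetical). Note that call_webhooks is a generator, so
    # webhooks only fire when it is iterated; export drains it with list(...).
    #
    #     {
    #         'url': 'https://example.com/api/notify',
    #         'method': 'post',                         # any requests method name
    #         'headers': {'Content-Type': 'application/json'},
    #         'data': {'event': 'hidebound-export'},    # post/put/patch only
    #         'timeout': 10,
    #     }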

    def export(self):
        # type: () -> "Database"
        '''
        Exports all the files found in the hidebound root directory.
        Calls webhooks afterwards.

        Returns:
            Database: Self.
        '''
        # TODO: Find a nicer pattern for injecting exporters.
        lut = dict(
            girder=GirderExporter,
            disk=DiskExporter,
            s3=S3Exporter,
        )  # type: Dict[str, Any]

        # reassign lut for testing
        if self.__exporter_lut is not None:
            lut = self.__exporter_lut

        total = len(self._exporters)
        i = 0
        for i, config in enumerate(self._exporters):
            key = config['name']
            exporter = lut[key].from_config(config)
            exporter.export(self._staging, logger=self._logger)
            self._logger.info(f'export: {key}', step=i + 1, total=total)

            # assign instance to exporter_lut for testing
            if self.__exporter_lut is not None:
                self.__exporter_lut[key] = exporter

        list(self.call_webhooks())
        self._logger.info('export: complete', step=i + 1, total=total)
        return self
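
    # Exporter configs are dispatched on their "name" key ('girder', 'disk' or
    # 's3' unless the lut is overridden for testing); the remaining keys are
    # specific to each exporter's from_config and are not shown here.
    #
    #     exporters=[{'name': 'disk', ...}]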

    def search(self, query, group_by_asset=False):
        # type: (str, bool) -> "DataFrame"
        '''
        Search data according to given SQL query.

        Args:
            query (str): SQL query. Make sure to use "FROM data" in query.
            group_by_asset (bool, optional): Whether to group the data by asset.
                Default: False.

        Returns:
            DataFrame: Formatted data.
        '''
        env = {'data': self.read(group_by_asset=group_by_asset)}
        result = pandasql.PandaSQL()(query, env)
        self._logger.info(f'search: {query}', step=1, total=1)
        self._logger.info('search: complete', step=1, total=1)
        return result
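
    # Usage sketch: queries run against a table named "data" via pandasql, and
    # the columns follow those produced by read (see head_cols above). The
    # specification name in the WHERE clause is a hypothetical example.
    #
    #     result = db.search("SELECT filepath FROM data WHERE specification LIKE 'raw%'")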