Coverage for /home/ubuntu/hidebound/python/hidebound/core/database_tools.py: 100%

139 statements  

coverage.py v7.5.4, created at 2024-07-05 23:50 +0000

from typing import Any, Dict, Optional, Tuple, Union  # noqa F401

from hidebound.core.specification_base import SpecificationBase  # noqa F401

from collections import defaultdict
from pathlib import Path
import re
import uuid

from schematics.exceptions import DataError, ValidationError
import dask.dataframe as dd
import lunchbox.tools as lbt
import numpy as np
import pandas as pd

from hidebound.core.parser import AssetNameParser
import hidebound.core.tools as hbt

DF = Union[pd.DataFrame, dd.DataFrame]
# ------------------------------------------------------------------------------


'''
A library of tools for Database to use in construction of its central DataFrame.
'''
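
# The functions below are designed to be chained, each returning the DataFrame
# it was given. A minimal sketch of a plausible pipeline (the ordering is an
# assumption for illustration; the actual call order lives in Database, not in
# this module):
#
#     >>> data = add_specification(data, specifications)
#     >>> data = validate_filepath(data)
#     >>> data = add_file_traits(data)
#     >>> data = add_asset_name(data)
#     >>> data = add_asset_path(data)
#     >>> data = add_asset_type(data)
#     >>> data = add_asset_traits(data)
#     >>> data = validate_assets(data)
#     >>> data = cleanup(data)
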

def add_specification(data, specifications):
    # type: (DF, Dict[str, SpecificationBase]) -> DF
    '''
    Adds specification data to given DataFrame.

    Columns added:

        * specification
        * specification_class
        * file_error

    Args:
        data (DataFrame): DataFrame.
        specifications (dict): Dictionary of specifications.

    Returns:
        DataFrame: DataFrame with specification, specification_class and
            file_error columns.
    '''
    def get_spec(filename):
        output = lbt.try_(
            AssetNameParser.parse_specification, filename, 'error'
        )
        if isinstance(output, dict):
            return output['specification'], np.nan
        return np.nan, str(output)

    # parse filenames
    parse = data.filename \
        .apply(get_spec, **hbt.get_meta_kwargs(data, ('filename', 'object')))

    # set specifications
    data['specification'] = parse.apply(
        lambda x: x[0],
        **hbt.get_meta_kwargs(data, ('specification', str))
    )

    # set file errors
    data['file_error'] = parse \
        .apply(lambda x: x[1], **hbt.get_meta_kwargs(data, ('file_error', str)))

    # add specification classes
    data['specification_class'] = data.specification.apply(
        lambda x: specifications.get(x, np.nan),
        **hbt.get_meta_kwargs(data, ('specification_class', 'object'))
    )

    # add spec not found errors to rows with no file errors
    error = hbt.error_to_string(KeyError('Specification not found.'))
    data.file_error = data.file_error.mask(
        data.file_error.isnull() & data.specification_class.isnull(),
        error
    )
    return data
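
# Example sketch, assuming a hypothetical Spec001 subclass of SpecificationBase
# and hidebound's filename convention (both illustrative, not verified here):
#
#     >>> specs = {'spec001': Spec001}
#     >>> data = pd.DataFrame()
#     >>> data['filename'] = ['p-proj001_s-spec001_d-desc_v001.png', 'bad.png']
#     >>> data = add_specification(data, specs)
#     >>> data.specification.tolist()   # 'bad.png' fails to parse
#     ['spec001', nan]
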

def validate_filepath(data):
    # type: (DF) -> DF
    '''
    Validates filepath column of given DataFrame.
    Adds error to file_error column if invalid.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with updated file_error column.
    '''
    def validate(row):
        try:
            row.specification_class().validate_filepath(row.filepath)
            return np.nan
        except ValidationError as e:
            return hbt.error_to_string(e)

    data.file_error = hbt.pred_combinator(
        data,
        lambda x: pd.isnull(x.file_error),
        validate,
        lambda x: x.file_error,
        **hbt.get_meta_kwargs(data, ('file_error', 'object'))
    )
    return data
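
# hbt.pred_combinator (as used throughout this module) applies one function to
# rows matching a predicate and another to the rest. A rough plain-pandas
# equivalent of the call above (a sketch, not the actual hbt implementation):
#
#     >>> mask = data.apply(lambda x: pd.isnull(x.file_error), axis=1)
#     >>> data.loc[mask, 'file_error'] = data.loc[mask].apply(validate, axis=1)
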

def add_file_traits(data):
    # type: (DF) -> DF
    '''
    Adds traits derived from file in filepath.
    Adds file_traits column and one column per traits key.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with file_traits column.
    '''
    data['file_traits'] = hbt.pred_combinator(
        data,
        lambda x: pd.notnull(x.specification_class),
        lambda x: x.specification_class().get_traits(x.filepath),
        lambda x: {},
        **hbt.get_meta_kwargs(data, ('file_traits', 'object'))
    )
    return data


def add_relative_path(data, column, root_dir):
    # type: (DF, str, Union[str, Path]) -> DF
    '''
    Adds relative path column derived from given column.

    Args:
        data (DataFrame): DataFrame.
        column (str): Column to be made relative.
        root_dir (Path or str): Root path to be removed.

    Returns:
        DataFrame: DataFrame with updated [column]_relative column.
    '''
    root_dir_ = Path(root_dir).as_posix()  # type: str
    if not root_dir_.endswith('/'):
        root_dir_ += '/'
    col = column + '_relative'
    data[col] = hbt.pred_combinator(
        data[column],
        lambda x: isinstance(x, str),
        lambda x: re.sub(root_dir_, '', Path(x).as_posix()),
        lambda x: x,
        **hbt.get_meta_kwargs(data, (col, str))
    )
    return data
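
# Example sketch (paths are hypothetical):
#
#     >>> data = pd.DataFrame()
#     >>> data['filepath'] = ['/tmp/projects/proj001/spec001/file.png']
#     >>> data = add_relative_path(data, 'filepath', '/tmp/projects')
#     >>> data.filepath_relative.tolist()
#     ['proj001/spec001/file.png']
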

def add_asset_name(data):
    # type: (DF) -> DF
    '''
    Adds asset_name column derived from filepath.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with updated asset_name column.
    '''
    data['asset_name'] = hbt.pred_combinator(
        data,
        lambda x: pd.isnull(x.file_error),
        lambda x: x.specification_class().get_asset_name(x.filepath),
        lambda x: np.nan,
        **hbt.get_meta_kwargs(data, ('asset_name', str))
    )
    return data


def add_asset_path(data):
    # type: (DF) -> DF
    '''
    Adds asset_path column derived from filepath.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with asset_path column.
    '''
    data['asset_path'] = hbt.pred_combinator(
        data,
        lambda x: pd.notnull(x.specification_class),
        lambda x: x.specification_class().get_asset_path(x.filepath).as_posix(),
        lambda x: np.nan,
        **hbt.get_meta_kwargs(data, ('asset_path', str))
    )
    return data


def add_asset_type(data):
    # type: (DF) -> DF
    '''
    Adds asset_type column derived from specification.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with asset_type column.
    '''
    data['asset_type'] = hbt.pred_combinator(
        data.specification_class,
        lambda x: pd.notnull(x),
        lambda x: x.asset_type,
        lambda x: np.nan,
        **hbt.get_meta_kwargs(data, ('asset_type', str))
    )
    return data


def add_asset_traits(data):
    # type: (DF) -> DF
    '''
    Adds traits derived from aggregation of file traits.
    Adds asset_traits column and one column per traits key.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with asset_traits column.
    '''
    data = hbt.lut_combinator(
        data,
        'asset_path',
        'asset_traits',
        lambda x: x.file_traits.tolist(),
        **hbt.get_meta_kwargs(data, 'object')
    )
    data.asset_traits = hbt.pred_combinator(
        data.asset_traits,
        lambda x: isinstance(x, list),
        hbt.to_prototype,
        lambda x: np.nan,
        **hbt.get_meta_kwargs(data, ('asset_traits', 'object'))
    )
    return data
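
# hbt.lut_combinator groups rows by asset_path and collects their file_traits
# into one list per asset; hbt.to_prototype then collapses that list of
# per-file trait dicts into a single dict. Assumed behavior, with illustrative
# values (not taken from the library's documentation):
#
#     >>> hbt.to_prototype([dict(width=1024, frame=1), dict(width=1024, frame=2)])
#     {'width': [1024, 1024], 'frame': [1, 2]}
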

def validate_assets(data):
    # type: (DF) -> DF
    '''
    Validates assets according to their specification.
    Adds asset_error and asset_valid columns.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: DataFrame with asset_error and asset_valid columns.
    '''
    def error_func(row):
        try:
            row.specification_class(row.asset_traits).validate()
        except DataError as e:
            return hbt.error_to_string(e)
        return np.nan

    # add asset error
    data['asset_error'] = hbt.pred_combinator(
        data,
        lambda x: isinstance(x.asset_traits, dict) and pd.notnull(x.specification_class),
        error_func,
        lambda x: np.nan,
        **hbt.get_meta_kwargs(data, ('asset_error', 'object'))
    )

    # assign asset_valid column
    data['asset_valid'] = hbt.pred_combinator(
        data,
        lambda x: pd.isnull(x.asset_error) and pd.isnull(x.file_error) and pd.notnull(x.specification_class),  # noqa E501
        lambda x: True,
        lambda x: False,
        **hbt.get_meta_kwargs(data, ('asset_valid', bool))
    )
    return data
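
# An asset is valid only when all three conditions hold: no asset error, no
# file error, and a resolved specification class. Hypothetical rows:
#
#     file_error  asset_error  specification_class      asset_valid
#     NaN         NaN          Spec001                  True
#     NaN         'DataError'  Spec001                  False
#     'KeyError'  NaN          NaN                      False
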

def cleanup(data):
    # type: (DF) -> DF
    '''
    Ensures only specific columns are present and in correct order and Paths
    are converted to strings.

    Args:
        data (DataFrame): DataFrame.

    Returns:
        DataFrame: Cleaned up DataFrame.
    '''
    columns = [
        'specification',
        'extension',
        'filename',
        'filepath',
        'file_error',
        'file_traits',
        'asset_name',
        'asset_path',
        'asset_type',
        'asset_traits',
        'asset_error',
        'asset_valid',
    ]
    # if no files are found return empty DataFrame
    for col in columns:
        if col not in data.columns:
            data[col] = np.nan

    # use copy to avoid SettingWithCopyWarning
    # TODO: figure out a way to prevent warning without copy.
    cols = data.columns
    cols = set(cols).difference(columns)
    cols = sorted(cols)
    cols = columns + cols
    cols = list(filter(lambda x: x != 'specification_class', cols))
    data = data[cols].copy()

    # convert Paths to str
    for col in data.columns:
        mask = data[col].apply(lambda x: isinstance(x, Path))
        data.loc[mask, col] = data.loc[mask, col] \
            .apply(lambda x: x.absolute().as_posix())
    return data


def add_asset_id(data):
    # type: (pd.DataFrame) -> pd.DataFrame
    '''
    Adds asset_id column derived from UUID hash of asset filepath.

    Args:
        data (pd.DataFrame): DataFrame.

    Returns:
        pd.DataFrame: DataFrame with asset_id column.
    '''
    mask = data.file_error.isnull()
    data['asset_id'] = np.nan
    data.asset_id = data.asset_id.astype(np.object_)
    if len(data[mask]) > 0:
        data.loc[mask, 'asset_id'] = data.loc[mask].apply(
            lambda x: x.specification_class().get_asset_id(x.filepath),
            axis=1,
        ).astype(str)
    return data
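
# Note: unlike get_data_for_write below, which assigns a fresh uuid4 per
# asset_path, the ids here come from specification_class().get_asset_id, which
# per the docstring hashes the asset filepath and is therefore presumably
# stable across runs for the same filepath.
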

def get_data_for_write(
    data,  # type: pd.DataFrame
    source_dir,  # type: Union[str, Path]
    target_dir,  # type: Union[str, Path]
):  # type: (...) -> Optional[Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]]  # noqa: E501
    '''
    Splits given data into five DataFrames used for writing files.

    Args:
        data (DataFrame): DataFrame to be transformed.
        source_dir (str or Path): Source directory of asset files.
        target_dir (str or Path): Target directory where data will be written.

    DataFrames:

        * File data - For writing asset file data to a target filepath.
        * Asset metadata - For writing asset metadata to a target json file.
        * File metadata - For writing file metadata to a target json file.
        * Asset chunk - For writing asset metadata chunk to a target json file.
        * File chunk - For writing file metadata chunk to a target json file.

    Returns:
        tuple[DataFrame]: file_data, asset_metadata, file_metadata,
            asset_chunk, file_chunk.
    '''
    # TODO: flatten file_traits and flatten asset_traits
    # get valid asset data
    data = data.copy()
    data = data[data.asset_valid]

    # return if there is no valid asset data
    if len(data) == 0:
        return None

    source_dir = Path(source_dir).absolute().as_posix()
    data_dir = Path(target_dir, 'content').absolute().as_posix()
    meta_dir = Path(target_dir, 'metadata').absolute().as_posix()

    # add asset id
    keys = data.asset_path.unique().tolist()
    vals = [str(uuid.uuid4()) for x in keys]
    lut = dict(zip(keys, vals))  # type: Any
    lut = defaultdict(lambda: np.nan, lut)
    data['asset_id'] = data.asset_path.apply(lambda x: lut[x])

    # create file id and metadata
    data['file_id'] = data.asset_name.apply(lambda x: str(uuid.uuid4()))
    data['metadata'] = data.apply(
        lambda x: dict(
            asset_id=x.asset_id,
            asset_path=Path(data_dir, x.asset_path_relative).as_posix(),
            asset_path_relative=x.asset_path_relative,
            asset_name=x.asset_name,
            asset_type=x.asset_type,
            file_id=x.file_id,
            file_traits=x.file_traits,
            filename=x.filename,
            filepath=Path(data_dir, x.filepath_relative).as_posix(),
            filepath_relative=x.filepath_relative,
        ),
        axis=1
    )

    # create asset metadata
    asset_meta = data \
        .groupby('asset_id', as_index=False) \
        .agg(lambda x: x.tolist())

    meta = []
    lut = dict(
        asset_id='asset_id',
        asset_path='asset_path',
        asset_path_relative='asset_path_relative',
        asset_name='asset_name',
        asset_traits='asset_traits',
        asset_type='asset_type',
        file_id='file_ids',
        file_traits='file_traits',
        filename='filenames',
        filepath='filepaths',
        filepath_relative='filepaths_relative',
    )
    keys = asset_meta.columns.tolist()
    for _, row in asset_meta.iterrows():
        vals = row.tolist()
        item = dict(zip(keys, vals))
        item = {lut[k]: item[k] for k in lut.keys()}

        # grab the first occurrence of these columns
        cols = [
            'asset_name',
            'asset_path',
            'asset_path_relative',
            'asset_type',
            'asset_traits',
        ]
        for col in cols:
            item[col] = item[col][0]
        del item['file_traits']

        # replace asset root
        item['asset_path'] = Path(data_dir, item['asset_path_relative']) \
            .as_posix()

        meta.append(item)
    asset_meta['metadata'] = meta

    asset_meta['target'] = asset_meta.asset_id \
        .apply(lambda x: Path(meta_dir, 'asset', x + '.json').as_posix())
    asset_meta = asset_meta[['metadata', 'target']]

    # create file data
    file_data = data.copy()
    file_data['source'] = file_data.filepath
    file_data['target'] = file_data.source \
        .apply(lambda x: re.sub(source_dir, data_dir, x))
    file_data = file_data[['source', 'target']]

    # create file metadata
    file_meta = data.copy()
    file_meta['target'] = file_meta.file_id \
        .apply(lambda x: Path(meta_dir, 'file', x + '.json').as_posix())
    file_meta = file_meta[['metadata', 'target']]

    # get time
    now = hbt.time_string()

    # create asset chunk
    asset_chunk = pd.DataFrame()
    asset_chunk['metadata'] = [asset_meta.metadata.tolist()]
    asset_chunk['target'] = [Path(
        meta_dir, 'asset-chunk', f'hidebound-asset-chunk_{now}.json'
    ).as_posix()]

    # create file chunk
    file_chunk = pd.DataFrame()
    file_chunk['metadata'] = [file_meta.metadata.tolist()]
    file_chunk['target'] = [Path(
        meta_dir, 'file-chunk', f'hidebound-file-chunk_{now}.json'
    ).as_posix()]

    return file_data, asset_meta, file_meta, asset_chunk, file_chunk
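

# Usage sketch (directory names hypothetical; in hidebound the Database class
# drives this call):
#
#     >>> result = get_data_for_write(data, '/projects', '/hidebound')
#     >>> if result is not None:
#     ...     file_data, asset_meta, file_meta, asset_chunk, file_chunk = result
#     ...     # each frame pairs content ('source' or 'metadata') with a
#     ...     # 'target' path under /hidebound/content or /hidebound/metadata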