Coverage for /home/ubuntu/hidebound/python/hidebound/exporters/exporter_base.py: 100%
69 statements
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-05 23:50 +0000
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-05 23:50 +0000
1from typing import Any, Dict, List, Optional, Union # noqa F401
3from pathlib import Path
4import re
6from schematics import Model
7from schematics.types import ListType, ModelType, StringType
8import dask.dataframe as dd
10from hidebound.core.connection import DaskConnection, DaskConnectionConfig
11from hidebound.core.logging import DummyLogger, ProgressLogger
12import hidebound.core.tools as hbt
13import hidebound.core.validators as vd
14# ------------------------------------------------------------------------------
class ExporterConfigBase(Model):
    '''
    Schematics model for validating exporter configurations, such as the one
    supplied to S3Exporter.

    Attributes:
        metadata_types (list, optional): List of metadata types for export.
            Default: [asset, file, asset-chunk, file-chunk].
        dask (dict, optional): Dask configuration. Default: {}.
    '''
    # Each entry is checked with vd.is_metadata_type.
    metadata_types = ListType(
        StringType(validators=[vd.is_metadata_type]),
        default=['asset', 'file', 'asset-chunk', 'file-chunk'],
        required=True,
    )
    # Nested configuration validated against DaskConnectionConfig.
    dask = ModelType(
        DaskConnectionConfig, required=True, default={}
    )  # type: ModelType
class ExporterBase:
    '''
    Abstract base class for hidebound exporters.

    Subclasses implement the ``_export_*`` hooks. The ``export`` method checks
    the layout of a hidebound staging directory, reads the JSON files found
    under its metadata subdirectories and hands each one to the matching hook.
    '''
    def __init__(
        self,
        metadata_types=['asset', 'file', 'asset-chunk', 'file-chunk'],
        dask={},
    ):
        # type: (List[str], Dict[str, Any]) -> None
        '''
        Constructs an ExporterBase instance.

        Args:
            metadata_types (list[str], optional): Metadata types to be exported.
                Default: [asset, file, asset-chunk, file-chunk].
            dask (dict, optional): Dask configuration. Default: {}.
        '''
        # Copy the arguments so an instance never aliases the default objects
        # baked into the signature (which are shared by every call) nor the
        # caller's own containers.
        self._metadata_types = list(metadata_types)
        self._time = hbt.time_string()
        self._dask_config = dict(dask)

    def _enforce_directory_structure(self, staging_dir):
        # type: (Union[str, Path]) -> None
        '''
        Ensure the following directories exist under given hidebound directory.
            * content
            * metadata
            * metadata/asset
            * metadata/file
            * metadata/asset-chunk
            * metadata/file-chunk

        Args:
            staging_dir (Path or str): Hidebound directory.

        Raises:
            FileNotFoundError: If any of the directories have not been found.
        '''
        data = Path(staging_dir, 'content')
        meta = Path(staging_dir, 'metadata')
        asset_dir = Path(meta, 'asset')
        file_dir = Path(meta, 'file')
        asset_chunk = Path(meta, 'asset-chunk')
        file_chunk = Path(meta, 'file-chunk')

        # Checked in this order; the first missing directory is reported.
        paths = [data, meta, asset_dir, file_dir, asset_chunk, file_chunk]
        for path in paths:
            if not path.is_dir():
                msg = f'{path.as_posix()} directory does not exist.'
                raise FileNotFoundError(msg)

    def _export_metadata_dir(self, data, staging_dir, mtype, func):
        # type: (Any, str, str, Any) -> None
        '''
        Applies given function to the JSON content of every file listed in
        data whose filepath contains <staging_dir>/metadata/<mtype>/.

        Args:
            data (DataFrame): Table with a filepath column.
            staging_dir (str): Hidebound directory as a posix path.
            mtype (str): Name of the metadata subdirectory to read from.
            func (callable): Function applied to each JSON payload.
        '''
        # A literal substring test is used instead of a regex built from
        # staging_dir, so regex metacharacters in the path ('.', '+', '(' ...)
        # cannot distort the match or raise re.error.
        fragment = f'{staging_dir}/metadata/{mtype}/'
        mask = data.filepath.apply(lambda x: fragment in x).astype(bool)
        payloads = data[mask].filepath.apply(hbt.read_json)

        # The connection is opened per call and closed once compute finishes.
        with DaskConnection(self._dask_config) as conn:
            payloads = dd.from_pandas(
                payloads, npartitions=conn.num_partitions
            )
            payloads.apply(func, meta=(None, 'object')).compute()

    def export(
        self,
        staging_dir,  # type: Union[str, Path]
        logger=None  # type: Optional[Union[DummyLogger, ProgressLogger]]
    ):
        # type: (...) -> None
        '''
        Exports data within given hidebound directory.

        Content is exported first, driven by the JSON files in metadata/file,
        followed by each of the configured metadata types in order.

        Args:
            staging_dir (Path or str): Hidebound directory.
            logger (object, optional): Progress logger. Default: None.

        Raises:
            FileNotFoundError: If the staging directory is missing any of the
                expected subdirectories.
        '''
        # set logger: anything that is not a ProgressLogger is replaced
        if not isinstance(logger, ProgressLogger):
            logger = DummyLogger()

        self._enforce_directory_structure(staging_dir)

        staging_dir = Path(staging_dir).as_posix()
        data = hbt.directory_to_dataframe(staging_dir)

        # one step for content plus one per metadata type
        total = 1 + len(self._metadata_types)

        # export content
        self._export_metadata_dir(
            data, staging_dir, 'file', self._export_content
        )
        logger.info('exporter: export content', step=1, total=total)

        # export metadata
        lut = {
            'asset': self._export_asset,
            'file': self._export_file,
            'asset-chunk': self._export_asset_chunk,
            'file-chunk': self._export_file_chunk,
        }
        # Content already used step 1, so metadata steps start at 2 and the
        # last one equals total.
        for step, mtype in enumerate(self._metadata_types, start=2):
            self._export_metadata_dir(data, staging_dir, mtype, lut[mtype])
            logger.info(f'exporter: export {mtype}', step=step, total=total)

    def _export_content(self, metadata):
        # type: (Dict) -> None
        '''
        Exports file from hidebound/content named in metadata.
        Metadata should have filepath, filepath_relative keys.

        Args:
            metadata (dict): File metadata.

        Raises:
            NotImplementedError: If method is not implemented in subclass.
        '''
        msg = '_export_content method must be implemented in subclass.'
        raise NotImplementedError(msg)

    def _export_asset(self, metadata):
        # type: (Dict) -> None
        '''
        Exports metadata from single JSON file in hidebound/metadata/asset.

        Args:
            metadata (dict): Asset metadata.

        Raises:
            NotImplementedError: If method is not implemented in subclass.
        '''
        msg = '_export_asset method must be implemented in subclass.'
        raise NotImplementedError(msg)

    def _export_file(self, metadata):
        # type: (Dict) -> None
        '''
        Exports metadata from single JSON file in hidebound/metadata/file.

        Args:
            metadata (dict): File metadata.

        Raises:
            NotImplementedError: If method is not implemented in subclass.
        '''
        msg = '_export_file method must be implemented in subclass.'
        raise NotImplementedError(msg)

    def _export_asset_chunk(self, metadata):
        # type: (List[dict]) -> None
        '''
        Exports list of asset metadata to a single asset in
        hidebound/metadata/asset-chunk.

        Args:
            metadata (list[dict]): Asset metadata.

        Raises:
            NotImplementedError: If method is not implemented in subclass.
        '''
        msg = '_export_asset_chunk method must be implemented in subclass.'
        raise NotImplementedError(msg)

    def _export_file_chunk(self, metadata):
        # type: (List[dict]) -> None
        '''
        Exports list of file metadata to a single file in
        hidebound/metadata/file-chunk.

        Args:
            metadata (list[dict]): File metadata.

        Raises:
            NotImplementedError: If method is not implemented in subclass.
        '''
        msg = '_export_file_chunk method must be implemented in subclass.'
        raise NotImplementedError(msg)