Coverage for /home/ubuntu/hidebound/python/hidebound/exporters/exporter_base.py: 100%

69 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-05 23:50 +0000

1from typing import Any, Dict, List, Optional, Union # noqa F401 

2 

3from pathlib import Path 

4import re 

5 

6from schematics import Model 

7from schematics.types import ListType, ModelType, StringType 

8import dask.dataframe as dd 

9 

10from hidebound.core.connection import DaskConnection, DaskConnectionConfig 

11from hidebound.core.logging import DummyLogger, ProgressLogger 

12import hidebound.core.tools as hbt 

13import hidebound.core.validators as vd 

14# ------------------------------------------------------------------------------ 

15 

16 

class ExporterConfigBase(Model):
    '''
    Base schematics model for validating exporter configurations.

    Attributes:
        metadata_types (list, optional): List of metadata types for export.
            Each item is validated with vd.is_metadata_type.
            Default: [asset, file, asset-chunk, file-chunk].
        dask (dict, optional): Dask configuration, validated as a
            DaskConnectionConfig. Default: {}.
    '''
    metadata_types = ListType(
        StringType(validators=[vd.is_metadata_type]),
        default=['asset', 'file', 'asset-chunk', 'file-chunk'],
        required=True,
    )
    dask = ModelType(
        DaskConnectionConfig, required=True, default={}
    )  # type: ModelType

34 

35 

class ExporterBase:
    '''
    Abstract base class for hidebound exporters.

    Subclasses implement the _export_* methods. The export method drives them
    over the JSON metadata files found under a hidebound staging directory.
    '''
    def __init__(
        self,
        metadata_types=None,
        dask=None,
    ):
        # type: (Optional[List[str]], Optional[Dict[str, Any]]) -> None
        '''
        Constructs a ExporterBase instance.

        Args:
            metadata_types (list[str], optional): Metadata types to be
                exported. Default: [asset, file, asset-chunk, file-chunk].
            dask (dict, optional): Dask configuration. Default: {}.
        '''
        # defaults are built per call so that instances never share one
        # mutable list or dict
        if metadata_types is None:
            metadata_types = ['asset', 'file', 'asset-chunk', 'file-chunk']
        if dask is None:
            dask = {}

        self._metadata_types = metadata_types
        self._time = hbt.time_string()
        self._dask_config = dask

    def _enforce_directory_structure(self, staging_dir):
        # type: (Union[str, Path]) -> None
        '''
        Ensure the following directories exist under given hidebound directory.
            * content
            * metadata
            * metadata/asset
            * metadata/file
            * metadata/asset-chunk
            * metadata/file-chunk

        Args:
            staging_dir (Path or str): Hidebound directory.

        Raises:
            FileNotFoundError: If any of the directories have not been found.
        '''
        meta = Path(staging_dir, 'metadata')
        paths = [
            Path(staging_dir, 'content'),
            meta,
            Path(meta, 'asset'),
            Path(meta, 'file'),
            Path(meta, 'asset-chunk'),
            Path(meta, 'file-chunk'),
        ]
        # checked in order, so the error names the first missing directory
        for path in paths:
            if not path.is_dir():
                msg = f'{path.as_posix()} directory does not exist.'
                raise FileNotFoundError(msg)

    @staticmethod
    def _get_metadata_regex(staging_dir, mtype):
        # type: (str, str) -> str
        '''
        Builds the regular expression that matches filepaths of a given
        metadata type within a staging directory.

        Args:
            staging_dir (str): Hidebound directory as a posix path.
            mtype (str): Metadata type, which is also the name of the
                directory under metadata.

        Returns:
            str: Regular expression.
        '''
        # escape both parts so that characters such as ".", "+" or "(" in a
        # directory name are matched literally instead of as regex syntax
        return f'{re.escape(staging_dir)}/metadata/{re.escape(mtype)}/'

    def _apply_to_metadata(self, data, staging_dir, mtype, func):
        # type: (Any, str, str, Any) -> None
        '''
        Applies given function to the content of each file listed in data
        whose filepath is under the metadata directory of given type.

        Args:
            data (DataFrame): Table with a filepath column.
            staging_dir (str): Hidebound directory as a posix path.
            mtype (str): Metadata type directory to select.
            func (function): Function called with the result of
                hbt.read_json for each selected filepath.
        '''
        pattern = re.compile(self._get_metadata_regex(staging_dir, mtype))

        # explicit bool cast keeps the mask a boolean indexer
        mask = data.filepath \
            .apply(lambda x: pattern.search(x) is not None) \
            .astype(bool)
        meta = data[mask].filepath.apply(hbt.read_json)
        with DaskConnection(self._dask_config) as conn:
            meta = dd.from_pandas(meta, npartitions=conn.num_partitions)
            meta.apply(func, meta=(None, 'object')).compute()

    def export(
        self,
        staging_dir,  # type: Union[str, Path]
        logger=None  # type: Optional[Union[DummyLogger, ProgressLogger]]
    ):
        # type: (...) -> None
        '''
        Exports data within given hidebound directory.

        Content is exported first, then each of the metadata types given at
        construction, in order. Progress is logged after each step.

        Args:
            staging_dir (Path or str): Hidebound directory.
            logger (object, optional): Progress logger. Default: None.

        Raises:
            FileNotFoundError: If staging directory lacks any of the
                expected content and metadata directories.
        '''
        # anything that is not a ProgressLogger is replaced by a DummyLogger
        if not isinstance(logger, ProgressLogger):
            logger = DummyLogger()

        self._enforce_directory_structure(staging_dir)

        staging_dir = Path(staging_dir).as_posix()
        data = hbt.directory_to_dataframe(staging_dir)

        # one step for content plus one step per metadata type
        total = 1 + len(self._metadata_types)

        # export content, which is driven by the file metadata
        self._apply_to_metadata(
            data, staging_dir, 'file', self._export_content
        )
        logger.info('exporter: export content', step=1, total=total)

        # export metadata
        lut = {
            'asset': self._export_asset,
            'file': self._export_file,
            'asset-chunk': self._export_asset_chunk,
            'file-chunk': self._export_file_chunk,
        }
        # content was step 1, so metadata steps run from 2 up to total
        for step, mtype in enumerate(self._metadata_types, start=2):
            self._apply_to_metadata(data, staging_dir, mtype, lut[mtype])
            logger.info(f'exporter: export {mtype}', step=step, total=total)

    def _export_content(self, metadata):
        # type: (Dict) -> None
        '''
        Exports file from hidebound/content named in metadata.
        Metadata should have filepath, filepath_relative keys.

        Args:
            metadata (dict): File metadata.

        Raises:
            NotImplementedError: If method is not implemented in subclass.
        '''
        msg = '_export_content method must be implemented in subclass.'
        raise NotImplementedError(msg)

    def _export_asset(self, metadata):
        # type: (Dict) -> None
        '''
        Exports metadata from single JSON file in hidebound/metadata/asset.

        Args:
            metadata (dict): Asset metadata.

        Raises:
            NotImplementedError: If method is not implemented in subclass.
        '''
        msg = '_export_asset method must be implemented in subclass.'
        raise NotImplementedError(msg)

    def _export_file(self, metadata):
        # type: (Dict) -> None
        '''
        Exports metadata from single JSON file in hidebound/metadata/file.

        Args:
            metadata (dict): File metadata.

        Raises:
            NotImplementedError: If method is not implemented in subclass.
        '''
        msg = '_export_file method must be implemented in subclass.'
        raise NotImplementedError(msg)

    def _export_asset_chunk(self, metadata):
        # type: (List[dict]) -> None
        '''
        Exports list of asset metadata to a single asset in
        hidebound/metadata/asset-chunk.

        Args:
            metadata (list[dict]): Asset metadata.

        Raises:
            NotImplementedError: If method is not implemented in subclass.
        '''
        msg = '_export_asset_chunk method must be implemented in subclass.'
        raise NotImplementedError(msg)

    def _export_file_chunk(self, metadata):
        # type: (List[dict]) -> None
        '''
        Exports list of file metadata to a single file in
        hidebound/metadata/file-chunk.

        Args:
            metadata (list[dict]): File metadata.

        Raises:
            NotImplementedError: If method is not implemented in subclass.
        '''
        msg = '_export_file_chunk method must be implemented in subclass.'
        raise NotImplementedError(msg)