Coverage for /home/ubuntu/rolling-pin/python/rolling_pin/conform_etl.py: 100%

95 statements  

coverage.py v7.6.12, created at 2025-02-13 19:35 +0000

from typing import Any, Dict, List, Union  # noqa: F401
from IPython.display import HTML, Image  # noqa: F401

from copy import deepcopy
from itertools import chain
from pathlib import Path
import re

from lunchbox.enforce import Enforce
from pandas import DataFrame
import lunchbox.tools as lbt
import yaml

from rolling_pin.blob_etl import BlobETL
from rolling_pin.conform_config import ConformConfig
import rolling_pin.tools as rpt

Rules = List[Dict[str, str]]
# ------------------------------------------------------------------------------


CONFORM_COLOR_SCHEME = deepcopy(rpt.COLOR_SCHEME)
CONFORM_COLOR_SCHEME.update({
    'node_font': '#DE958E',
    'node_value_font': '#B6ECF3',
    'edge': '#DE958E',
    'edge_value': '#B6ECF3',
    'node_library_font': '#B6ECF3',
    'node_module_font': '#DE958E',
    'edge_library': '#B6ECF3',
    'edge_module': '#DE958E'
})


class ConformETL:
    '''
    ConformETL creates a DataFrame from a given directory of source files,
    then generates target filepaths for them according to a set of rules.
    When the conform method is called, the source files are copied to their
    target filepaths.
    '''
    @staticmethod
    def _get_data(
        source_rules=[], rename_rules=[], group_rules=[], line_rules=[]
    ):
        # type: (Rules, Rules, Rules, Rules) -> DataFrame
        '''
        Generates DataFrame from given source_rules and then generates target
        paths for them given other rules.

        Args:
            source_rules (Rules): A list of rules for parsing directories.
                Default: [].
            rename_rules (Rules): A list of rules for renaming source filepaths
                to target filepaths. Default: [].
            group_rules (Rules): A list of rules for grouping files.
                Default: [].
            line_rules (Rules): A list of rules for performing line copies on
                files belonging to a given group. Default: [].

        Returns:
            DataFrame: Conform DataFrame.
        '''

        # source
        source = []  # type: List[Any]
        for rule in source_rules:
            files = rpt.list_all_files(
                rule['path'],
                include_regex=rule.get('include', None),
                exclude_regex=rule.get('exclude', None),
            )
            source.extend(files)
        source = sorted([x.as_posix() for x in source])
        data = DataFrame()
        data['source'] = source
        data['target'] = source

        # rename
        for rule in rename_rules:
            data.target = data.target.apply(
                lambda x: rpt.replace_and_format(
                    rule['regex'], rule['replace'], x
                )
            )

        # group
        data['groups'] = data.source.apply(lambda x: [])
        for rule in group_rules:
            mask = data.source \
                .apply(lambda x: re.search(rule['regex'], x)) \
                .astype(bool)
            data.loc[mask, 'groups'] = data.groups \
                .apply(lambda x: x + [rule['name']])
        mask = data.groups.apply(lambda x: x == [])
        data.loc[mask, 'groups'] = data.loc[mask, 'groups'] \
            .apply(lambda x: ['base'])

        # line
        groups = set([x['group'] for x in line_rules])
        data['line_rule'] = data.groups \
            .apply(lambda x: len(set(x).intersection(groups)) > 0)

        return data
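
    # Example rule shapes, inferred from the keys read above. This is a sketch
    # only; the paths, regexes, and group names are hypothetical:
    #
    #     source_rules = [dict(path='/tmp/proj', include=r'\.py$', exclude=r'test_')]
    #     rename_rules = [dict(regex='/tmp/proj', replace='/tmp/build')]
    #     group_rules = [dict(name='python', regex=r'\.py$')]
    #     line_rules = [dict(group='python', exclude='import pytest')]
    #
    # Source rules require 'path', rename rules 'regex' and 'replace', group
    # rules 'name' and 'regex', and line rules 'group'; the remaining keys are
    # optional and fall back to None via rule.get().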

    @classmethod
    def from_yaml(cls, filepath):
        # type: (Union[str, Path]) -> ConformETL
        '''
        Construct ConformETL instance from given yaml file.

        Args:
            filepath (str or Path): YAML file.

        Raises:
            EnforceError: If file does not end in yml or yaml.

        Returns:
            ConformETL: ConformETL instance.
        '''
        filepath = Path(filepath).as_posix()
        ext = Path(filepath).suffix[1:].lower()
        msg = f'{filepath} does not end in yml or yaml.'
        Enforce(ext, 'in', ['yml', 'yaml'], message=msg)
        # ----------------------------------------------------------------------

        with open(filepath) as f:
            config = yaml.safe_load(f)
        return cls(**config)
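
    # A minimal config sketch for from_yaml. The filename and values are
    # hypothetical; the top-level keys mirror the constructor keyword arguments:
    #
    #     # conform.yaml
    #     source_rules:
    #       - path: /tmp/proj/python
    #         include: '\.py$'
    #     rename_rules:
    #       - regex: /tmp/proj
    #         replace: /tmp/build
    #     group_rules:
    #       - name: python
    #         regex: '\.py$'
    #     line_rules: []
    #
    #     etl = ConformETL.from_yaml('conform.yaml')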

    def __init__(
        self, source_rules=[], rename_rules=[], group_rules=[], line_rules=[]
    ):
        # type: (Rules, Rules, Rules, Rules) -> None
        '''
        Generates DataFrame from given source_rules and then generates target
        paths for them given other rules.

        Args:
            source_rules (Rules): A list of rules for parsing directories.
                Default: [].
            rename_rules (Rules): A list of rules for renaming source filepaths
                to target filepaths. Default: [].
            group_rules (Rules): A list of rules for grouping files.
                Default: [].
            line_rules (Rules): A list of rules for performing line copies on
                files belonging to a given group. Default: [].

        Raises:
            DataError: If configuration is invalid.
        '''

        config = dict(
            source_rules=source_rules,
            rename_rules=rename_rules,
            group_rules=group_rules,
            line_rules=line_rules,
        )
        cfg = ConformConfig(config)
        cfg.validate()
        config = cfg.to_native()

        self._data = self._get_data(
            source_rules=source_rules,
            rename_rules=rename_rules,
            group_rules=group_rules,
            line_rules=line_rules,
        )  # type: DataFrame
        self._line_rules = line_rules  # type: Rules

    def __repr__(self):
        # type: () -> str
        '''
        String representation of conform DataFrame.

        Returns:
            str: Table optimized for output to shell.
        '''
        data = self._data.copy()
        data.line_rule = data.line_rule.apply(lambda x: 'X' if x else '')
        data.rename(lambda x: x.upper(), axis=1, inplace=True)
        output = data \
            .to_string(index=False, max_colwidth=150, col_space=[50, 50, 20, 10])
        return output

    @property
    def groups(self):
        # type: () -> List[str]
        '''
        list[str]: List of groups found within self._data.
        '''
        output = self._data.groups.tolist()
        output = sorted(list(set(chain(*output))))
        output.remove('base')
        output.insert(0, 'base')
        return output

    def to_dataframe(self):
        # type: () -> DataFrame
        '''
        Returns:
            DataFrame: Copy of internal data.
        '''
        return self._data.copy()

    def to_blob(self):
        # type: () -> BlobETL
        '''
        Converts self into a BlobETL object with the target column as keys and
        the source column as values.

        Returns:
            BlobETL: BlobETL of target and source filepaths.
        '''
        data = self._data
        keys = data.target.tolist()
        vals = data.source.tolist()
        output = dict(zip(keys, vals))
        return BlobETL(output)
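
    # For instance (hypothetical filepaths), a one-row conform table maps to:
    #
    #     BlobETL({'/tmp/build/foo.py': '/tmp/proj/foo.py'})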

    def to_html(
        self, orient='lr', color_scheme=CONFORM_COLOR_SCHEME, as_png=False
    ):
        # type: (str, Dict[str, str], bool) -> Union[Image, HTML]
        '''
        For use in inline rendering of graph data in Jupyter Lab.
        Graphs from target to source filepaths. Target is in red, source is in
        cyan.

        Args:
            orient (str, optional): Graph layout orientation. Default: lr.
                Options include:

                * tb - top to bottom
                * bt - bottom to top
                * lr - left to right
                * rl - right to left
            color_scheme (dict, optional): Color scheme to be applied to graph.
                Default: rolling_pin.conform_etl.CONFORM_COLOR_SCHEME
            as_png (bool, optional): Display graph as a PNG image instead of
                SVG. Useful for display on GitHub. Default: False.

        Returns:
            IPython.display.HTML or Image: Object for inline display.
        '''
        return self.to_blob() \
            .to_html(orient=orient, color_scheme=color_scheme, as_png=as_png)
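
    # A usage sketch for Jupyter Lab, assuming an existing instance named etl:
    #
    #     etl.to_html(orient='tb')       # SVG graph, laid out top to bottom
    #     etl.to_html(as_png=True)       # PNG image, e.g. for GitHub rendering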

    def conform(self, groups='all'):
        # type: (Union[str, List[str]]) -> None
        '''
        Copies source files to target filepaths.

        Args:
            groups (str or list[str]): Groups of files which are to be
                conformed. 'all' means all groups. Default: 'all'.
        '''
        if isinstance(groups, str):
            groups = [groups]
        if groups == ['all']:
            groups = self.groups

        data = self.to_dataframe()

        # copy files
        grps = set(groups)
        mask = data.groups \
            .apply(lambda x: set(x).intersection(grps)) \
            .apply(lambda x: len(x) > 0)
        data = data[mask]
        data.apply(lambda x: rpt.copy_file(x.source, x.target), axis=1)

        # copy lines
        data['text'] = data.source.apply(lambda x: lbt.try_(rpt.read_text, x, 'error'))
        readable_mask = data.text.apply(lambda x: isinstance(x, str))
        data.loc[~readable_mask, 'text'] = ''
        rules = list(filter(lambda x: x['group'] in groups, self._line_rules))
        for rule in rules:
            mask = data.groups.apply(lambda x: rule['group'] in x)
            data.loc[mask, 'text'] = data.loc[mask, 'text'].apply(
                lambda x: rpt.filter_text(
                    x,
                    include_regex=rule.get('include', None),
                    exclude_regex=rule.get('exclude', None),
                    replace_regex=rule.get('regex', None),
                    replace_value=rule.get('replace', None),
                )
            )
        data[readable_mask].apply(lambda x: rpt.write_text(x.text, x.target), axis=1)
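

# A minimal end-to-end sketch (the YAML path is hypothetical):
#
#     etl = ConformETL.from_yaml('/tmp/conform.yaml')
#     print(etl)                 # table of SOURCE, TARGET, GROUPS, LINE_RULE
#     etl.conform(groups='all')  # copy files, then apply line rules per group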