Coverage for /home/ubuntu/rolling-pin/python/rolling_pin/conform_etl.py: 100%

92 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2023-11-15 00:43 +0000

1from typing import Any, Dict, List, Union # noqa: F401 

2from IPython.display import HTML, Image # noqa: F401 

3 

4from copy import deepcopy 

5from itertools import chain 

6from pathlib import Path 

7import re 

8 

9from lunchbox.enforce import Enforce 

10from pandas import DataFrame 

11import yaml 

12 

13from rolling_pin.blob_etl import BlobETL 

14from rolling_pin.conform_config import ConformConfig 

15import rolling_pin.tools as rpt 

16 

# A rule is a flat string-to-string mapping. Within this module, rules are read
# through the keys 'path', 'include', 'exclude', 'regex', 'replace', 'name' and
# 'group', depending on the kind of rule.
Rules = List[Dict[str, str]]
# ------------------------------------------------------------------------------


# Start from an independent copy of the shared rolling_pin color scheme so the
# overrides below never leak back into rpt.COLOR_SCHEME.
CONFORM_COLOR_SCHEME = deepcopy(rpt.COLOR_SCHEME)

# Two colors are used throughout: '#DE958E' and '#B6ECF3'. ConformETL.to_html
# describes the resulting graph as target in red and source in cyan.
CONFORM_COLOR_SCHEME.update(
    node_font='#DE958E',
    node_value_font='#B6ECF3',
    edge='#DE958E',
    edge_value='#B6ECF3',
    node_library_font='#B6ECF3',
    node_module_font='#DE958E',
    edge_library='#B6ECF3',
    edge_module='#DE958E',
)

32 

33 

class ConformETL:
    '''
    ConformETL creates a DataFrame from a given directory of source files.
    Then it generates target paths given a set of rules.
    Finally, the conform method is called and the source files are copied to
    their target filepaths.
    '''
    @staticmethod
    def _get_data(
        source_rules=[], rename_rules=[], group_rules=[], line_rules=[]
    ):
        # type: (Rules, Rules, Rules, Rules) -> DataFrame
        '''
        Generates DataFrame from given source_rules and then generates target
        paths for them given other rules.

        The list defaults are only ever read, never mutated, so sharing them
        between calls is harmless.

        Args:
            source_rules (Rules): A list of rules for parsing directories.
                Each rule is read through its 'path' key and its optional
                'include' and 'exclude' keys. Default: [].
            rename_rules (Rules): A list of rules for renaming source filepath
                to target filepaths. Each rule is read through its 'regex' and
                'replace' keys. Default: [].
            group_rules (Rules): A list of rules for grouping files. Each rule
                is read through its 'regex' and 'name' keys. Default: [].
            line_rules (Rules): A list of rules for performing line copies on
                files belonging to a given group. Only the 'group' key is read
                here. Default: [].

        Returns:
            DataFrame: Conform DataFrame with source, target, groups and
                line_rule columns.
        '''
        # source
        source = []  # type: List[Any]
        for rule in source_rules:
            files = rpt.list_all_files(
                rule['path'],
                include_regex=rule.get('include', None),
                exclude_regex=rule.get('exclude', None),
            )
            source.extend(files)
        source = sorted([x.as_posix() for x in source])
        data = DataFrame()
        data['source'] = source
        data['target'] = source

        # rename
        # Rules are applied in order, each one operating on the output of the
        # previous one.
        for rule in rename_rules:
            data['target'] = data['target'].apply(
                lambda x: rpt.replace_and_format(
                    rule['regex'], rule['replace'], x
                )
            )

        # group
        # Every row gets its own fresh list so that rows never share state.
        data['groups'] = data.source.apply(lambda x: [])
        for rule in group_rules:
            mask = data.source \
                .apply(lambda x: re.search(rule['regex'], x)) \
                .astype(bool)
            data.loc[mask, 'groups'] = data.groups \
                .apply(lambda x: x + [rule['name']])

        # Files that matched no group rule fall into the 'base' group.
        # astype(bool) keeps the mask a boolean indexer even when data is
        # empty and apply returns an object-dtype Series.
        mask = data.groups.apply(lambda x: x == []).astype(bool)
        data.loc[mask, 'groups'] = data.loc[mask, 'groups'] \
            .apply(lambda x: ['base'])

        # line
        # Flag each file that belongs to at least one group with a line rule.
        groups = {x['group'] for x in line_rules}
        data['line_rule'] = data.groups \
            .apply(lambda x: len(set(x).intersection(groups)) > 0)

        return data

    @classmethod
    def from_yaml(cls, filepath):
        # type: (Union[str, Path]) -> ConformETL
        '''
        Construct ConformETL instance from given yaml file.

        Args:
            filepath (str or Path): YAML file.

        Raises:
            EnforceError: If file does not end in yml or yaml.

        Returns:
            ConformETL: ConformETL instance.
        '''
        filepath = Path(filepath).as_posix()
        ext = Path(filepath).suffix[1:].lower()
        msg = f'{filepath} does not end in yml or yaml.'
        Enforce(ext, 'in', ['yml', 'yaml'], message=msg)
        # ----------------------------------------------------------------------

        with open(filepath) as f:
            config = yaml.safe_load(f)

        # An empty YAML document loads as None. Fall back to an empty mapping
        # so that construction goes through the normal config validation
        # instead of failing on ** unpacking of None.
        return cls(**(config or {}))

    def __init__(
        self, source_rules=[], rename_rules=[], group_rules=[], line_rules=[]
    ):
        # type: (Rules, Rules, Rules, Rules) -> None
        '''
        Generates DataFrame from given source_rules and then generates target
        paths for them given other rules.

        Args:
            source_rules (Rules): A list of rules for parsing directories.
                Default: [].
            rename_rules (Rules): A list of rules for renaming source filepath
                to target filepaths. Default: [].
            group_rules (Rules): A list of rules for grouping files.
                Default: [].
            line_rules (Rules): A list of rules for performing line copies on
                files belonging to a given group. Default: [].

        Raises:
            DataError: If configuration is invalid.
        '''
        config = dict(
            source_rules=source_rules,
            rename_rules=rename_rules,
            group_rules=group_rules,
            line_rules=line_rules,
        )
        cfg = ConformConfig(config)
        cfg.validate()
        # NOTE(review): the native config is computed but the rules passed in
        # by the caller are what is actually used below - confirm whether the
        # validated config was meant to be used instead.
        config = cfg.to_native()

        self._data = self._get_data(
            source_rules=source_rules,
            rename_rules=rename_rules,
            group_rules=group_rules,
            line_rules=line_rules,
        )  # type: DataFrame
        self._line_rules = line_rules  # type: Rules

    def __repr__(self):
        # type: () -> str
        '''
        String representation of conform DataFrame.

        Returns:
            str: Table optimized for output to shell.
        '''
        data = self._data.copy()
        # Show the boolean line_rule flag as an 'X' mark or a blank cell.
        data.line_rule = data.line_rule.apply(lambda x: 'X' if x else '')
        data.rename(lambda x: x.upper(), axis=1, inplace=True)
        output = data \
            .to_string(index=False, max_colwidth=150, col_space=[50, 50, 20, 10])
        return output

    @property
    def groups(self):
        # type: () -> List[str]
        '''
        list[str]: Sorted list of groups found with self._data. If the 'base'
        group is present it is listed first.
        '''
        output = self._data.groups.tolist()
        output = sorted(set(chain.from_iterable(output)))

        # 'base' only exists when at least one file matched no group rule, so
        # it may be legitimately absent. Only move it to the front when present
        # rather than raising ValueError from list.remove.
        if 'base' in output:
            output.remove('base')
            output.insert(0, 'base')
        return output

    def to_dataframe(self):
        # type: () -> DataFrame
        '''
        Returns:
            DataFrame: Copy of internal data.
        '''
        return self._data.copy()

    def to_blob(self):
        # type: () -> BlobETL
        '''
        Converts self into a BlobETL object with target column as keys and
        source columns as values.

        Returns:
            BlobETL: BlobETL of target and source filepaths.
        '''
        data = self._data
        keys = data.target.tolist()
        vals = data.source.tolist()
        # If several sources map to the same target, the last one wins.
        output = dict(zip(keys, vals))
        return BlobETL(output)

    def to_html(
        self, orient='lr', color_scheme=CONFORM_COLOR_SCHEME, as_png=False
    ):
        # type: (str, Dict[str, str], bool) -> Union[Image, HTML]
        '''
        For use in inline rendering of graph data in Jupyter Lab.
        Graph from target to source filepath. Target is in red, source is in
        cyan.

        Args:
            orient (str, optional): Graph layout orientation. Default: lr.
                Options include:

                * tb - top to bottom
                * bt - bottom to top
                * lr - left to right
                * rl - right to left
            color_scheme: (dict, optional): Color scheme to be applied to graph.
                Default: rolling_pin.conform_etl.CONFORM_COLOR_SCHEME
            as_png (bool, optional): Display graph as a PNG image instead of
                SVG. Useful for display on Github. Default: False.

        Returns:
            IPython.display.HTML or IPython.display.Image: Object for inline
                display.
        '''
        return self.to_blob() \
            .to_html(orient=orient, color_scheme=color_scheme, as_png=as_png)

    def conform(self, groups='all'):
        # type: (Union[str, List[str]]) -> None
        '''
        Copies source files to target filepaths.

        Files belonging to any of the given groups are copied first. Then the
        text of each copied file is read from its source, filtered by the line
        rules of the given groups, and written to its target.

        Args:
            groups (str or list[str]): Groups of files which are to be conformed.
                'all' means all groups. Default: 'all'.
        '''
        if isinstance(groups, str):
            groups = [groups]
        if groups == ['all']:
            groups = self.groups

        grps = set(groups)
        data = self.to_dataframe()

        # Keep only files that belong to at least one requested group.
        # astype(bool) keeps the mask a boolean indexer when data is empty.
        # The explicit copy makes the later column assignment operate on an
        # independent frame rather than on a slice of the original.
        mask = data.groups \
            .apply(lambda x: len(set(x).intersection(grps)) > 0) \
            .astype(bool)
        data = data[mask].copy()

        # copy files
        # Plain loops are used for side effects, so nothing is called when no
        # file is selected.
        for src, tgt in zip(data.source, data.target):
            rpt.copy_file(src, tgt)

        # copy lines
        data['text'] = data.source.apply(rpt.read_text)
        rules = [x for x in self._line_rules if x['group'] in grps]
        for rule in rules:
            mask = data.groups \
                .apply(lambda x: rule['group'] in x) \
                .astype(bool)
            data.loc[mask, 'text'] = data.loc[mask, 'text'].apply(
                lambda x: rpt.filter_text(
                    x,
                    include_regex=rule.get('include', None),
                    exclude_regex=rule.get('exclude', None),
                    replace_regex=rule.get('regex', None),
                    replace_value=rule.get('replace', None),
                )
            )

        # Every selected file is rewritten from its (possibly filtered) text.
        for text, tgt in zip(data.text, data.target):
            rpt.write_text(text, tgt)