Coverage for /home/ubuntu/rolling-pin/python/rolling_pin/conform_etl.py: 100%
95 statements
coverage.py v7.6.12, created at 2025-02-13 19:35 +0000

from typing import Any, Dict, List, Union  # noqa: F401
from IPython.display import HTML, Image  # noqa: F401

from copy import deepcopy
from itertools import chain
from pathlib import Path
import re

from lunchbox.enforce import Enforce
from pandas import DataFrame
import lunchbox.tools as lbt
import yaml

from rolling_pin.blob_etl import BlobETL
from rolling_pin.conform_config import ConformConfig
import rolling_pin.tools as rpt

Rules = List[Dict[str, str]]
# ------------------------------------------------------------------------------


CONFORM_COLOR_SCHEME = deepcopy(rpt.COLOR_SCHEME)
CONFORM_COLOR_SCHEME.update({
    'node_font': '#DE958E',
    'node_value_font': '#B6ECF3',
    'edge': '#DE958E',
    'edge_value': '#B6ECF3',
    'node_library_font': '#B6ECF3',
    'node_module_font': '#DE958E',
    'edge_library': '#B6ECF3',
    'edge_module': '#DE958E'
})


class ConformETL:
    '''
    ConformETL creates a DataFrame from a given directory of source files.
    Then it generates target paths for them given a set of rules.
    Finally, when the conform method is called, the source files are copied
    to their target filepaths.
    '''
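
    # A minimal usage sketch (paths and rule values here are illustrative
    # assumptions, not part of the source): rules are plain dicts whose keys
    # match the lookups performed in the methods below, and ConformConfig may
    # enforce additional constraints.
    #
    #     etl = ConformETL(
    #         source_rules=[dict(path='/tmp/source', include=r'\.py$')],
    #         rename_rules=[dict(regex='/tmp/source', replace='/tmp/target')],
    #         group_rules=[dict(name='python', regex=r'\.py$')],
    #         line_rules=[dict(group='python', exclude='DEBUG')],
    #     )
    #     etl.conform(groups='all')  # copy sources to their target filepaths
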
    @staticmethod
    def _get_data(
        source_rules=[], rename_rules=[], group_rules=[], line_rules=[]
    ):
        # type: (Rules, Rules, Rules, Rules) -> DataFrame
        '''
        Generates a DataFrame from the given source_rules and then generates
        target paths for them given the other rules.

        Args:
            source_rules (Rules): A list of rules for parsing directories.
                Default: [].
            rename_rules (Rules): A list of rules for renaming source
                filepaths to target filepaths. Default: [].
            group_rules (Rules): A list of rules for grouping files.
                Default: [].
            line_rules (Rules): A list of rules for performing line copies on
                files belonging to a given group. Default: [].

        Returns:
            DataFrame: Conform DataFrame.
        '''
        # source
        source = []  # type: List[Any]
        for rule in source_rules:
            files = rpt.list_all_files(
                rule['path'],
                include_regex=rule.get('include', None),
                exclude_regex=rule.get('exclude', None),
            )
            source.extend(files)
        source = sorted([x.as_posix() for x in source])
        data = DataFrame()
        data['source'] = source
        data['target'] = source

        # rename
        for rule in rename_rules:
            data.target = data.target.apply(
                lambda x: rpt.replace_and_format(
                    rule['regex'], rule['replace'], x
                )
            )

        # group
        data['groups'] = data.source.apply(lambda x: [])
        for rule in group_rules:
            mask = data.source \
                .apply(lambda x: re.search(rule['regex'], x)) \
                .astype(bool)
            data.loc[mask, 'groups'] = data.groups \
                .apply(lambda x: x + [rule['name']])
        mask = data.groups.apply(lambda x: x == [])
        data.loc[mask, 'groups'] = data.loc[mask, 'groups'] \
            .apply(lambda x: ['base'])

        # line
        groups = set([x['group'] for x in line_rules])
        data['line_rule'] = data.groups \
            .apply(lambda x: len(set(x).intersection(groups)) > 0)

        return data
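
    # Sketch of the resulting frame (illustrative, hypothetical values): one
    # row per source file, with a renamed target path, a list of matching
    # group names ('base' when no group rule matches), and a boolean
    # line_rule flag marking membership in any group named by line_rules.
    #
    #     data = ConformETL._get_data(
    #         source_rules=[dict(path='/tmp/source')],
    #         group_rules=[dict(name='python', regex=r'\.py$')],
    #     )
    #     list(data.columns)  # ['source', 'target', 'groups', 'line_rule']
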
    @classmethod
    def from_yaml(cls, filepath):
        # type: (Union[str, Path]) -> ConformETL
        '''
        Construct a ConformETL instance from a given YAML file.

        Args:
            filepath (str or Path): YAML file.

        Raises:
            EnforceError: If file does not end in yml or yaml.

        Returns:
            ConformETL: ConformETL instance.
        '''
        filepath = Path(filepath).as_posix()
        ext = Path(filepath).suffix[1:].lower()
        msg = f'{filepath} does not end in yml or yaml.'
        Enforce(ext, 'in', ['yml', 'yaml'], message=msg)
        # ----------------------------------------------------------------------

        with open(filepath) as f:
            config = yaml.safe_load(f)
        return cls(**config)
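
    # The YAML file is expected to deserialize into the constructor's keyword
    # arguments (it is unpacked directly into cls(**config)). A hypothetical
    # conform.yaml:
    #
    #     source_rules:
    #       - path: /tmp/source
    #         include: '\.py$'
    #     rename_rules:
    #       - regex: /tmp/source
    #         replace: /tmp/target
    #     group_rules:
    #       - name: python
    #         regex: '\.py$'
    #     line_rules:
    #       - group: python
    #         exclude: DEBUG
    #
    #     etl = ConformETL.from_yaml('conform.yaml')
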
    def __init__(
        self, source_rules=[], rename_rules=[], group_rules=[], line_rules=[]
    ):
        # type: (Rules, Rules, Rules, Rules) -> None
        '''
        Generates a DataFrame from the given source_rules and then generates
        target paths for them given the other rules.

        Args:
            source_rules (Rules): A list of rules for parsing directories.
                Default: [].
            rename_rules (Rules): A list of rules for renaming source
                filepaths to target filepaths. Default: [].
            group_rules (Rules): A list of rules for grouping files.
                Default: [].
            line_rules (Rules): A list of rules for performing line copies on
                files belonging to a given group. Default: [].

        Raises:
            DataError: If configuration is invalid.
        '''
        config = dict(
            source_rules=source_rules,
            rename_rules=rename_rules,
            group_rules=group_rules,
            line_rules=line_rules,
        )
        cfg = ConformConfig(config)
        cfg.validate()
        config = cfg.to_native()

        self._data = self._get_data(
            source_rules=source_rules,
            rename_rules=rename_rules,
            group_rules=group_rules,
            line_rules=line_rules,
        )  # type: DataFrame
        self._line_rules = line_rules  # type: Rules

    def __repr__(self):
        # type: () -> str
        '''
        String representation of conform DataFrame.

        Returns:
            str: Table optimized for output to shell.
        '''
        data = self._data.copy()
        data.line_rule = data.line_rule.apply(lambda x: 'X' if x else '')
        data.rename(lambda x: x.upper(), axis=1, inplace=True)
        output = data \
            .to_string(index=False, max_colwidth=150, col_space=[50, 50, 20, 10])
        return output

    @property
    def groups(self):
        # type: () -> List[str]
        '''
        list[str]: List of groups found within self._data.
        '''
        output = self._data.groups.tolist()
        output = sorted(list(set(chain(*output))))
        output.remove('base')
        output.insert(0, 'base')
        return output
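
    # Example (group names illustrative): 'base' is always moved to the front
    # of the returned list.
    #
    #     etl.groups  # ['base', 'python']
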
    def to_dataframe(self):
        # type: () -> DataFrame
        '''
        Returns:
            DataFrame: Copy of internal data.
        '''
        return self._data.copy()

    def to_blob(self):
        # type: () -> BlobETL
        '''
        Converts self into a BlobETL object with target column as keys and
        source column as values.

        Returns:
            BlobETL: BlobETL of target and source filepaths.
        '''
        data = self._data
        keys = data.target.tolist()
        vals = data.source.tolist()
        output = dict(zip(keys, vals))
        return BlobETL(output)
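
    # The BlobETL is constructed from a flat target -> source mapping
    # (paths hypothetical):
    #
    #     etl.to_blob()  # BlobETL({'/tmp/target/a.py': '/tmp/source/a.py', ...})
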
    def to_html(
        self, orient='lr', color_scheme=CONFORM_COLOR_SCHEME, as_png=False
    ):
        # type: (str, Dict[str, str], bool) -> Union[Image, HTML]
        '''
        For use in inline rendering of graph data in Jupyter Lab.
        Graph from target to source filepath. Target is in red, source is in
        cyan.

        Args:
            orient (str, optional): Graph layout orientation. Default: lr.
                Options include:

                * tb - top to bottom
                * bt - bottom to top
                * lr - left to right
                * rl - right to left
            color_scheme (dict, optional): Color scheme to be applied to graph.
                Default: rolling_pin.conform_etl.CONFORM_COLOR_SCHEME
            as_png (bool, optional): Display graph as a PNG image instead of
                SVG. Useful for display on Github. Default: False.

        Returns:
            Union[Image, HTML]: HTML object for inline display, or a PNG
                Image if as_png is True.
        '''
        return self.to_blob() \
            .to_html(orient=orient, color_scheme=color_scheme, as_png=as_png)
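
    # Intended as the last expression of a Jupyter Lab cell (hypothetical
    # usage):
    #
    #     etl = ConformETL.from_yaml('conform.yaml')
    #     etl.to_html(orient='tb', as_png=True)
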
    def conform(self, groups='all'):
        # type: (Union[str, List[str]]) -> None
        '''
        Copies source files to target filepaths.

        Args:
            groups (str or list[str]): Groups of files which are to be conformed.
                'all' means all groups. Default: 'all'.
        '''
        if isinstance(groups, str):
            groups = [groups]
        if groups == ['all']:
            groups = self.groups

        data = self.to_dataframe()

        # copy files
        grps = set(groups)
        mask = data.groups \
            .apply(lambda x: set(x).intersection(grps)) \
            .apply(lambda x: len(x) > 0)
        data = data[mask]
        data.apply(lambda x: rpt.copy_file(x.source, x.target), axis=1)

        # copy lines
        data['text'] = data.source.apply(lambda x: lbt.try_(rpt.read_text, x, 'error'))
        readable_mask = data.text.apply(lambda x: isinstance(x, str))
        data.loc[~readable_mask, 'text'] = ''
        rules = list(filter(lambda x: x['group'] in groups, self._line_rules))
        for rule in rules:
            mask = data.groups.apply(lambda x: rule['group'] in x)
            data.loc[mask, 'text'] = data.loc[mask, 'text'].apply(
                lambda x: rpt.filter_text(
                    x,
                    include_regex=rule.get('include', None),
                    exclude_regex=rule.get('exclude', None),
                    replace_regex=rule.get('regex', None),
                    replace_value=rule.get('replace', None),
                )
            )
        data[readable_mask].apply(lambda x: rpt.write_text(x.text, x.target), axis=1)
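
# Hypothetical end-to-end usage (filepaths and group names are assumptions):
#
#     etl = ConformETL.from_yaml('conform.yaml')
#     print(etl)                    # table of SOURCE, TARGET, GROUPS, LINE_RULE
#     etl.conform(groups='all')     # copy every group
#     etl.conform(groups=['base'])  # or copy only the 'base' group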