Coverage for /home/ubuntu/rolling-pin/python/rolling_pin/radon_etl.py: 100%
204 statements
« prev ^ index » next coverage.py v7.1.0, created at 2023-11-15 00:43 +0000
« prev ^ index » next coverage.py v7.1.0, created at 2023-11-15 00:43 +0000
1from typing import Any, Dict, List, Union # noqa: F401
3import json
4import os
5import re
6from pathlib import Path
8import cufflinks as cf
9import numpy as np
10import pandas as pd
11from pandas import DataFrame
12import radon.complexity
13from radon.cli import Config
14from radon.cli import CCHarvester, HCHarvester, MIHarvester, RawHarvester
16from rolling_pin.blob_etl import BlobETL
17import rolling_pin.tools as rpt
18# ------------------------------------------------------------------------------
20'''
21Contain the RadonETL class, which is used for generating a radon report on the
22code within a given directory.
23'''
class RadonETL():
    '''
    Conforms all four radon reports (raw metrics, Halstead, maintainability and
    cyclomatic complexity) into a single DataFrame that can then be plotted.

    The radon report is harvested once, at construction time, and stored as a
    dictionary. Every DataFrame property re-derives its table from that stored
    report each time it is accessed.
    '''
    def __init__(self, fullpath):
        # type: (Union[str, Path]) -> None
        '''
        Constructs a RadonETL instance.

        Args:
            fullpath (str or Path): Python file or directory of python files.
        '''
        self._report = RadonETL._get_radon_report(fullpath)
    # --------------------------------------------------------------------------

    @property
    def report(self):
        # type: () -> Dict
        '''
        dict: Dictionary of all radon metrics.
        '''
        return self._report

    @property
    def data(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of all radon metrics.
        '''
        return self._get_radon_data()

    @property
    def raw_metrics(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon raw metrics.
        '''
        return self._get_raw_metrics_dataframe(self._report)

    @property
    def maintainability_index(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon maintainability index metrics.
        '''
        return self._get_maintainability_index_dataframe(self._report)

    @property
    def cyclomatic_complexity_metrics(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon cyclomatic complexity metrics.
        '''
        return self._get_cyclomatic_complexity_dataframe(self._report)

    @property
    def halstead_metrics(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon Halstead metrics.
        '''
        return self._get_halstead_dataframe(self._report)
    # --------------------------------------------------------------------------

    def _get_radon_data(self):
        # type: () -> DataFrame
        '''
        Constructs a DataFrame representing all the radon reports generated for
        a given python file or directory containing python files.

        Halstead and cyclomatic complexity rows are outer-merged on fullpath
        and name. Raw and maintainability metrics are per-file, so they are
        only copied onto the rows whose object_type is 'module'; every other
        row holds NaN in those columns.

        Returns:
            DataFrame: Radon report DataFrame.
        '''
        hal = self.halstead_metrics
        cc = self.cyclomatic_complexity_metrics
        raw = self.raw_metrics
        mi = self.maintainability_index

        data = hal.merge(cc, how='outer', on=['fullpath', 'name'])

        # both inputs carry object_type: keep the Halstead value (_x) and fall
        # back to the cyclomatic complexity value (_y) where it is null
        data['object_type'] = data['object_type_x']
        missing = data['object_type_x'].isnull()
        data.loc[missing, 'object_type'] = data.loc[missing, 'object_type_y']
        del data['object_type_x']
        del data['object_type_y']

        module = raw.merge(mi, on='fullpath')

        # columns that exist only at module level start out as NaN everywhere
        existing = set(data.columns.tolist())
        cols = [c for c in module.columns.tolist() if c not in existing]  # type: Any
        for col in cols:
            data[col] = np.nan

        mask = data.object_type == 'module'
        for i, row in data[mask].iterrows():
            # one lookup per module row; .item() raises ValueError unless
            # exactly one row of module matches this fullpath
            match = module[module.fullpath == row.fullpath]
            for col in cols:
                data.loc[i, col] = match[col].item()

        cols = [
            'fullpath', 'name', 'class_name', 'object_type', 'blank', 'bugs',
            'calculated_length', 'code', 'column_offset', 'comment',
            'cyclomatic_complexity', 'cyclomatic_rank', 'difficulty', 'effort',
            'h1', 'h2', 'length', 'logical_code', 'maintainability_index',
            'maintainability_rank', 'multiline_comment', 'n1', 'n2',
            'single_comment', 'source_code', 'start_line', 'stop_line', 'time',
            'vocabulary', 'volume',
        ]
        data = data[cols]

        return data
    # --------------------------------------------------------------------------

    @staticmethod
    def _ranks_to_integers(ranks):
        # type: (Any) -> Any
        '''
        Converts a Series of radon rank letters into integers (A=0 ... F=5).

        Args:
            ranks (Series): Series of rank letters drawn from 'ABCDEF'.

        Raises:
            KeyError: If a value is not one of the letters A through F.

        Returns:
            Series: Series of integer ranks.
        '''
        rank_lut = {k: i for i, k in enumerate('ABCDEF')}
        return ranks.apply(lambda x: rank_lut[x])

    @staticmethod
    def _remove_test_modules(data):
        # type: (DataFrame) -> DataFrame
        '''
        Drops every row whose fullpath ends in "_test.py".

        Args:
            data (DataFrame): DataFrame with a fullpath column.

        Returns:
            DataFrame: Filtered copy, safe to mutate without touching data.
        '''
        mask = data.fullpath\
            .apply(lambda x: not re.search(r'_test\.py$', x)).astype(bool)
        # copy so that callers can add columns / sort without pandas treating
        # the result as a view of the original frame
        return data[mask].copy()

    @staticmethod
    def _get_radon_report(fullpath):
        # type: (Union[str, Path]) -> Dict[str, Any]
        '''
        Gets all 4 reports from radon and aggregates them into a single blob
        object.

        Args:
            fullpath (str or Path): Python file or directory of python files.

        Returns:
            dict: Radon report blob, keyed by cyclomatic_complexity,
                raw_metrics, maintainability_index and halstead_metrics.
        '''
        fullpath_ = [Path(fullpath).absolute().as_posix()]  # type: List[str]

        # (report key, harvester class, harvester config), in report order
        harvests = [
            (
                'cyclomatic_complexity',
                CCHarvester,
                Config(
                    min='A',
                    max='F',
                    exclude=None,
                    ignore=None,
                    show_complexity=False,
                    average=False,
                    total_average=False,
                    order=radon.complexity.SCORE,
                    no_assert=False,
                    show_closures=False,
                ),
            ),
            (
                'raw_metrics',
                RawHarvester,
                Config(
                    exclude=None,
                    ignore=None,
                    summary=False,
                ),
            ),
            (
                'maintainability_index',
                MIHarvester,
                Config(
                    min='A',
                    max='C',
                    exclude=None,
                    ignore=None,
                    multi=True,
                    show=False,
                    sort=False,
                ),
            ),
            (
                'halstead_metrics',
                HCHarvester,
                Config(
                    exclude=None,
                    ignore=None,
                    by_function=False,
                ),
            ),
        ]  # type: Any

        output = {}  # type: Dict[str, Any]
        for key, harvester, config in harvests:
            output[key] = json.loads(harvester(fullpath_, config).as_json())
        return output

    @staticmethod
    def _get_raw_metrics_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon raw metrics report into a pandas DataFrame.

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Raw metrics DataFrame.
        '''
        raw = report['raw_metrics']
        fullpaths = list(raw.keys())
        path_lut = {k: f'<list_{i}>' for i, k in enumerate(fullpaths)}
        fullpath_fields = {x: {'fullpath': x} for x in fullpaths}

        # loc = Lines of Code (total lines) - sloc + blanks + multi + single_comments
        # lloc = Logical Lines of Code
        # comments = Comments lines
        # multi = Multi-line strings (assumed to be docstrings)
        # blank = Blank lines (or whitespace-only lines)
        # single_comments = Single-line comments or docstrings
        name_lut = dict(
            blank='blank',
            comments='comment',
            lloc='logical_code',
            loc='code',
            multi='multiline_comment',
            single_comments='single_comment',
            sloc='source_code',
            fullpath='fullpath',
        )

        # field 0 (the file path) is swapped for a <list_i> token and field 1
        # (the radon metric name) for its column name
        # NOTE(review): presumably the <list_i> tokens make to_dict emit one
        # record per file - confirm against BlobETL
        data = BlobETL(raw, '#')\
            .update(fullpath_fields) \
            .set_field(0, lambda x: path_lut[x])\
            .set_field(1, lambda x: name_lut[x])\
            .to_dict()  # type: Union[Dict, DataFrame]

        data = DataFrame(data)
        data = data.sort_values('fullpath').reset_index(drop=True)
        cols = [
            'fullpath', 'blank', 'code', 'comment', 'logical_code',
            'multiline_comment', 'single_comment', 'source_code',
        ]
        data = data[cols]

        return data

    @staticmethod
    def _get_maintainability_index_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon maintainability index report into a pandas DataFrame.

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Maintainability DataFrame, with maintainability_rank
                converted from a letter to an integer (A=0 ... F=5).
        '''
        mi = report['maintainability_index']
        fullpaths = list(mi.keys())
        path_lut = {k: f'<list_{i}>' for i, k in enumerate(fullpaths)}
        fullpath_fields = {x: {'fullpath': x} for x in fullpaths}
        name_lut = dict(
            mi='maintainability_index',
            rank='maintainability_rank',
            fullpath='fullpath',
        )
        data = None  # type: Any
        data = BlobETL(mi, '#')\
            .update(fullpath_fields) \
            .set_field(0, lambda x: path_lut[x])\
            .set_field(1, lambda x: name_lut[x])\
            .to_dict()

        data = DataFrame(data)
        data = data.sort_values('fullpath').reset_index(drop=True)
        cols = ['fullpath', 'maintainability_index', 'maintainability_rank']
        data = data[cols]

        # convert rank to integer
        data['maintainability_rank'] = RadonETL._ranks_to_integers(
            data['maintainability_rank']
        )

        return data

    @staticmethod
    def _get_cyclomatic_complexity_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon cyclomatic complexity report into a pandas DataFrame.

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Cyclomatic complexity DataFrame, with cyclomatic_rank
                converted from a letter to an integer (A=0 ... F=5). Columns
                absent from the report are filled with NaN, and a report with
                no matching entries yields an empty DataFrame.
        '''
        # each filter: (to_dataframe argument, column holding the field names,
        # object type override or None, regex selecting the flattened keys)
        filters = [
            [4, 6, 'method_closure',
             '^[^#]+#<list_[0-9]+>#methods#<list_[0-9]+>#closures#<list_[0-9]+>#[^#]+$'],
            [3, 4, 'closure', '^[^#]+#<list_[0-9]+>#closures#<list_[0-9]+>#[^#]+$'],
            [3, 4, 'method', '^[^#]+#<list_[0-9]+>#methods#<list_[0-9]+>#[^#]+$'],
            [2, 2, None, '^[^#]+#<list_[0-9]+>#[^#]+$'],
        ]  # type: Any

        cc = report['cyclomatic_complexity']
        rows = []
        for i, j, type_, regex in filters:
            temp = BlobETL(cc, '#').query(regex)  # type: Any
            if len(temp.to_flat_dict().keys()) == 0:
                continue

            temp = temp.to_dataframe(i)
            # pair each row's field names (column j) with its values
            item = temp\
                .apply(lambda x, j=j: dict(zip(x[j], x['value'])), axis=1)\
                .tolist()
            item = DataFrame(item)
            item['fullpath'] = temp[0]
            if type_ is not None:
                # item assignment, not attribute assignment: `item.type = ...`
                # only sets a column when 'type' already exists and otherwise
                # silently creates a plain attribute
                item['type'] = type_
            rows.append(item)

        # pd.concat raises ValueError on an empty list
        if rows:
            data = pd.concat(rows, ignore_index=True, sort=False)
        else:
            data = DataFrame()

        cols = [
            'fullpath', 'name', 'classname', 'type', 'complexity', 'rank',
            'lineno', 'endline', 'col_offset'
        ]
        lut = {
            'fullpath': 'fullpath',
            'name': 'name',
            'classname': 'class_name',
            'type': 'object_type',
            'complexity': 'cyclomatic_complexity',
            'rank': 'cyclomatic_rank',
            'lineno': 'start_line',
            'endline': 'stop_line',
            'col_offset': 'column_offset',
        }
        # reindex instead of data[cols] so that a column missing from the
        # report becomes NaN rather than raising KeyError
        # NOTE(review): presumably 'classname' is absent when no methods were
        # found - confirm against radon's JSON output
        data = data\
            .reindex(columns=cols)\
            .drop_duplicates()\
            .rename(columns=lut)\
            .reset_index(drop=True)

        # convert rank to integer
        data['cyclomatic_rank'] = RadonETL._ranks_to_integers(
            data['cyclomatic_rank']
        )

        return data

    @staticmethod
    def _get_halstead_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon Halstead report into a pandas DataFrame.

        Rows come from two queries of the report: 'function|closure' entries,
        and per-file 'total' entries, which are labelled with object_type
        'module' and named after the file (extension removed).

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Halstead DataFrame.
        '''
        hal = report['halstead_metrics']
        keys = [
            'h1', 'h2', 'n1', 'n2', 'vocabulary', 'length', 'calculated_length',
            'volume', 'difficulty', 'effort', 'time', 'bugs',
        ]

        # function / closure rows: value[0] is the name, value[1:] the scores
        data = BlobETL(hal, '#').query('function|closure').to_dataframe(3)
        data['fullpath'] = data[0]
        # strip the trailing "s" from the field name stored in column 1
        data['object_type'] = data[1].apply(lambda x: re.sub('s$', '', x))
        data['name'] = data.value.apply(lambda x: x[0])

        score = data.value.apply(lambda x: dict(zip(keys, x[1:]))).tolist()
        score = DataFrame(score)
        data = data.join(score)

        # module rows: the values of each file are zipped against keys in order
        total = BlobETL(hal, '#').query('total').to_dataframe()
        total['fullpath'] = total[0]
        total = total.groupby('fullpath', as_index=False)\
            .agg(lambda x: dict(zip(keys, x)))
        score = total.value.tolist()
        score = DataFrame(score)
        total = total.join(score)
        total['object_type'] = 'module'
        total['name'] = total.fullpath\
            .apply(lambda x: os.path.splitext((Path(x).name))[0])
        data = pd.concat([data, total], ignore_index=True, sort=False)

        cols = ['fullpath', 'name', 'object_type']
        cols.extend(keys)
        data = data[cols]

        return data

    # EXPORT--------------------------------------------------------------------
    def write_plots(self, fullpath):
        # type: (Union[str, Path]) -> RadonETL
        '''
        Writes metrics plots to given file.

        Four plots are written, in order: line count metrics, maintainability
        metrics, cyclomatic metric distributions and Halstead metric
        distributions. Rows whose fullpath ends in "_test.py" are excluded.

        Args:
            fullpath (Path or str): Target file.

        Returns:
            RadonETL: self.
        '''
        cf.go_offline()

        lut = dict(
            h1='h1 - the number of distinct operators',
            h2='h2 - the number of distinct operands',
            n1='n1 - the total number of operators',
            n2='n2 - the total number of operands',
            vocabulary='vocabulary (h) - h1 + h2',
            length='length (N) - n1 + n2',
            calculated_length='calculated_length - h1 * log2(h1) + h2 * log2(h2)',
            volume='volume (V) - N * log2(h)',
            difficulty='difficulty (D) - h1 / 2 * n2 / h2',
            effort='effort (E) - D * V',
            time='time (T) - E / 18 seconds',
            bugs='bugs (B) - V / 3000 - an estimate of the errors in the implementation',
        )

        # bar charts are square; histograms reuse the settings but are shorter
        bar_params = dict(
            theme='henanigans',
            colors=rpt.COLOR_SCALE,
            dimensions=(900, 900),
            asFigure=True,
        )  # type: Any
        hist_params = dict(bar_params, dimensions=(900, 500))  # type: Any

        raw = self._remove_test_modules(self.raw_metrics)
        mi = self._remove_test_modules(self.maintainability_index)
        cc = self._remove_test_modules(self.cyclomatic_complexity_metrics)
        hal = self._remove_test_modules(self.halstead_metrics)

        raw['docstring_ratio'] = raw.multiline_comment / raw.code
        raw = raw.sort_values('docstring_ratio')

        html = ['<body style="background: #242424">\n']
        html.append(
            raw.iplot(
                x='fullpath',
                kind='barh',
                title='Line Count Metrics',
                **bar_params
            ).to_html()
        )
        html.append(
            mi.iplot(
                x='fullpath',
                kind='barh',
                title='Maintainability Metrics',
                **bar_params
            ).to_html()
        )

        cols = ['cyclomatic_complexity', 'cyclomatic_rank']
        html.append(
            cc[cols].iplot(
                kind='hist',
                bins=50,
                title='Cyclomatic Metric Distributions',
                **hist_params
            ).to_html()
        )

        cols = [
            'h1', 'h2', 'n1', 'n2', 'vocabulary', 'length', 'calculated_length',
            'volume', 'difficulty', 'effort', 'time', 'bugs'
        ]
        html.append(
            hal[cols]
            .rename(columns=lut)
            .iplot(
                kind='hist',
                bins=50,
                title='Halstead Metric Distributions',
                **hist_params
            ).to_html()
        )
        html.append('\n</body>')

        # explicit encoding so the output does not depend on the locale
        with open(fullpath, 'w', encoding='utf-8') as f:
            f.write(''.join(html))

        return self

    def write_tables(self, target_dir):
        # type: (Union[str, Path]) -> RadonETL
        '''
        Writes metrics tables as HTML files to given directory.

        Five files are written: all_metrics.html, raw_metrics.html,
        maintainability_metrics.html, cyclomatic_complexity_metrics.html and
        halstead_metrics.html. The directory is not created by this method.

        Args:
            target_dir (Path or str): Target directory.

        Returns:
            RadonETL: self.
        '''
        def write_table(data, target):
            # type: (DataFrame, Path) -> None
            html = data.to_html()

            # make table sortable
            script = '<script '
            script += 'src="http://www.kryogenix.org/code/browser/sorttable/sorttable.js" '
            script += 'type="text/javascript"></script>\n'
            html = html.replace('class="dataframe"', 'class="sortable"')
            html = script + html

            # explicit encoding so the output does not depend on the locale
            with open(target, 'w', encoding='utf-8') as f:
                f.write(html)

        # build every table before any file is written
        data = self.data
        raw = self.raw_metrics
        mi = self.maintainability_index
        cc = self.cyclomatic_complexity_metrics
        hal = self.halstead_metrics

        write_table(data, Path(target_dir, 'all_metrics.html'))
        write_table(raw, Path(target_dir, 'raw_metrics.html'))
        write_table(mi, Path(target_dir, 'maintainability_metrics.html'))
        write_table(cc, Path(target_dir, 'cyclomatic_complexity_metrics.html'))
        write_table(hal, Path(target_dir, 'halstead_metrics.html'))

        return self