Coverage for /home/ubuntu/rolling-pin/python/rolling_pin/radon_etl.py: 100%
208 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-13 19:35 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-13 19:35 +0000
1from typing import Any, Dict, List, Union # noqa: F401
3import json
4import os
5import re
6from pathlib import Path
8from pandas import DataFrame
9from radon.cli import CCHarvester, HCHarvester, MIHarvester, RawHarvester
10from radon.cli import Config
11import numpy as np
12import pandas as pd
13import plotly.express as px
14import radon.complexity
16from rolling_pin.blob_etl import BlobETL
17import rolling_pin.tools as rpt
18# ------------------------------------------------------------------------------
'''
Contains the RadonETL class, which is used for generating a radon report on the
code within a given directory.
'''
class RadonETL():
    '''
    Conforms all four radon reports (raw metrics, Halstead, maintainability and
    cyclomatic complexity) into a single DataFrame that can then be plotted.

    The radon harvesters are run once, in the constructor, and their parsed
    JSON output is stored as a dictionary. Every DataFrame property is rebuilt
    from that dictionary each time it is accessed.
    '''
    # Maps radon letter ranks to integers (A -> 0 ... F -> 5).
    _RANK_LUT = {k: i for i, k in enumerate('ABCDEF')}

    def __init__(self, fullpath):
        # type: (Union[str, Path]) -> None
        '''
        Constructs a RadonETL instance.

        Args:
            fullpath (str or Path): Python file or directory of python files.
        '''
        self._report = RadonETL._get_radon_report(fullpath)
    # --------------------------------------------------------------------------

    @property
    def report(self):
        # type: () -> Dict
        '''
        dict: Dictionary of all radon metrics.
        '''
        return self._report

    @property
    def data(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of all radon metrics.
        '''
        return self._get_radon_data()

    @property
    def raw_metrics(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon raw metrics.
        '''
        return self._get_raw_metrics_dataframe(self._report)

    @property
    def maintainability_index(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon maintainability index metrics.
        '''
        return self._get_maintainability_index_dataframe(self._report)

    @property
    def cyclomatic_complexity_metrics(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon cyclomatic complexity metrics.
        '''
        return self._get_cyclomatic_complexity_dataframe(self._report)

    @property
    def halstead_metrics(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon Halstead metrics.
        '''
        return self._get_halstead_dataframe(self._report)
    # --------------------------------------------------------------------------

    def _get_radon_data(self):
        # type: () -> DataFrame
        '''
        Constructs a DataFrame representing all the radon reports generated for
        a given python file or directory containing python files.

        Returns:
            DataFrame: Radon report DataFrame.
        '''
        return self._merge_metrics(
            self.halstead_metrics,
            self.cyclomatic_complexity_metrics,
            self.raw_metrics,
            self.maintainability_index,
        )

    @staticmethod
    def _merge_metrics(hal, cc, raw, mi):
        # type: (DataFrame, DataFrame, DataFrame, DataFrame) -> DataFrame
        '''
        Merges the four per-report DataFrames into one. None of the given
        DataFrames is modified.

        Halstead and cyclomatic rows are outer joined on fullpath and name.
        Raw and maintainability metrics are joined on fullpath and copied only
        onto rows whose object_type is "module"; every other row holds NaN in
        those columns.

        Args:
            hal (DataFrame): Halstead metrics.
            cc (DataFrame): Cyclomatic complexity metrics.
            raw (DataFrame): Raw metrics.
            mi (DataFrame): Maintainability index metrics.

        Raises:
            KeyError: If an expected output column is absent after merging.

        Returns:
            DataFrame: Radon report DataFrame.
        '''
        data = hal.merge(cc, how='outer', on=['fullpath', 'name'])

        # both inputs carry object_type; prefer the Halstead value and fall
        # back to the cyclomatic one where the Halstead side has no row
        data['object_type'] = data['object_type_x']
        missing = data['object_type_x'].isnull()
        data.loc[missing, 'object_type'] = data.loc[missing, 'object_type_y']
        data = data.drop(columns=['object_type_x', 'object_type_y'])

        module = raw.merge(mi, on='fullpath')
        extra = [x for x in module.columns if x not in data.columns]
        for col in extra:
            data[col] = np.nan

        # index module metrics by fullpath once, rather than rescanning the
        # module table for every (row, column) pair
        lookup = module.set_index('fullpath')
        is_module = data['object_type'] == 'module'
        paths = data.loc[is_module, 'fullpath']
        for col in extra:
            data.loc[is_module, col] = paths.map(lookup[col])

        cols = [
            'fullpath', 'name', 'class_name', 'object_type', 'blank', 'bugs',
            'calculated_length', 'code', 'column_offset', 'comment',
            'cyclomatic_complexity', 'cyclomatic_rank', 'difficulty', 'effort',
            'h1', 'h2', 'length', 'logical_code', 'maintainability_index',
            'maintainability_rank', 'multiline_comment', 'n1', 'n2',
            'single_comment', 'source_code', 'start_line', 'stop_line', 'time',
            'vocabulary', 'volume',
        ]
        return data[cols]
    # --------------------------------------------------------------------------

    @staticmethod
    def _get_radon_report(fullpath):
        # type: (Union[str, Path]) -> Dict[str, Any]
        '''
        Gets all 4 reports from radon and aggregates them into a single blob
        object.

        Args:
            fullpath (str or Path): Python file or directory of python files.

        Returns:
            dict: Radon report blob, with keys cyclomatic_complexity,
                raw_metrics, maintainability_index and halstead_metrics.
        '''
        fullpath_ = [Path(fullpath).absolute().as_posix()]  # type: List[str]

        cc_config = Config(
            min='A',
            max='F',
            exclude=None,
            ignore=None,
            show_complexity=False,
            average=False,
            total_average=False,
            order=radon.complexity.SCORE,
            no_assert=False,
            show_closures=False,
        )
        raw_config = Config(
            exclude=None,
            ignore=None,
            summary=False,
        )
        mi_config = Config(
            min='A',
            max='C',
            exclude=None,
            ignore=None,
            multi=True,
            show=False,
            sort=False,
        )
        hal_config = Config(
            exclude=None,
            ignore=None,
            by_function=False,
        )

        # each harvester emits a JSON string, which is parsed below
        output = [
            CCHarvester(fullpath_, cc_config).as_json(),
            RawHarvester(fullpath_, raw_config).as_json(),
            MIHarvester(fullpath_, mi_config).as_json(),
            HCHarvester(fullpath_, hal_config).as_json(),
        ]
        keys = [
            'cyclomatic_complexity', 'raw_metrics', 'maintainability_index',
            'halstead_metrics',
        ]
        return dict(zip(keys, map(json.loads, output)))

    @staticmethod
    def _get_raw_metrics_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon raw metrics report into a pandas DataFrame.

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Raw metrics DataFrame.
        '''
        raw = report['raw_metrics']
        fullpaths = list(raw.keys())

        # each file path is swapped for a "<list_i>" token and its original
        # value is kept in an added "fullpath" field
        path_lut = {k: f'<list_{i}>' for i, k in enumerate(fullpaths)}
        fullpath_fields = {x: {'fullpath': x} for x in fullpaths}

        # loc = Lines of Code (total lines) - sloc + blanks + multi + single_comments
        # lloc = Logical Lines of Code
        # comments = Comments lines
        # multi = Multi-line strings (assumed to be docstrings)
        # blank = Blank lines (or whitespace-only lines)
        # single_comments = Single-line comments or docstrings
        name_lut = dict(
            blank='blank',
            comments='comment',
            lloc='logical_code',
            loc='code',
            multi='multiline_comment',
            single_comments='single_comment',
            sloc='source_code',
            fullpath='fullpath',
        )
        data = BlobETL(raw, '#')\
            .update(fullpath_fields) \
            .set_field(0, lambda x: path_lut[x])\
            .set_field(1, lambda x: name_lut[x])\
            .to_dict()  # type: Union[Dict, DataFrame]

        data = DataFrame(data)
        data.sort_values('fullpath', inplace=True)
        data.reset_index(drop=True, inplace=True)
        cols = [
            'fullpath', 'blank', 'code', 'comment', 'logical_code',
            'multiline_comment', 'single_comment', 'source_code',
        ]
        return data[cols]

    @staticmethod
    def _get_maintainability_index_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon maintainability index report into a pandas DataFrame.

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Maintainability DataFrame.
        '''
        mi = report['maintainability_index']
        fullpaths = list(mi.keys())

        # each file path is swapped for a "<list_i>" token and its original
        # value is kept in an added "fullpath" field
        path_lut = {k: f'<list_{i}>' for i, k in enumerate(fullpaths)}
        fullpath_fields = {x: {'fullpath': x} for x in fullpaths}
        name_lut = dict(
            mi='maintainability_index',
            rank='maintainability_rank',
            fullpath='fullpath',
        )
        data = BlobETL(mi, '#')\
            .update(fullpath_fields) \
            .set_field(0, lambda x: path_lut[x])\
            .set_field(1, lambda x: name_lut[x])\
            .to_dict()  # type: Any

        data = DataFrame(data)
        data.sort_values('fullpath', inplace=True)
        data.reset_index(drop=True, inplace=True)
        cols = ['fullpath', 'maintainability_index', 'maintainability_rank']
        data = data[cols]

        # convert rank to integer
        data['maintainability_rank'] = data['maintainability_rank']\
            .apply(lambda x: RadonETL._RANK_LUT[x])

        return data

    @staticmethod
    def _get_cyclomatic_complexity_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon cyclomatic complexity report into a pandas DataFrame.

        Report keys that never occur (for example classname when no record
        carries one) become all-NaN columns. A report that matches none of the
        filters yields an empty DataFrame with the expected columns.

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Cyclomatic complexity DataFrame.
        '''
        # each filter is [to_dataframe argument, column holding the field
        # names, object type override, regex selecting the flattened keys]
        filters = [
            [4, 6, 'method_closure',
             '^[^#]+#<list_[0-9]+>#methods#<list_[0-9]+>#closures#<list_[0-9]+>#[^#]+$'],
            [3, 4, 'closure', '^[^#]+#<list_[0-9]+>#closures#<list_[0-9]+>#[^#]+$'],
            [3, 4, 'method', '^[^#]+#<list_[0-9]+>#methods#<list_[0-9]+>#[^#]+$'],
            [2, 2, None, '^[^#]+#<list_[0-9]+>#[^#]+$'],
        ]  # type: Any

        cc = report['cyclomatic_complexity']
        rows = []
        for i, j, type_, regex in filters:
            temp = BlobETL(cc, '#').query(regex)  # type: Any
            if len(temp.to_flat_dict().keys()) > 0:
                temp = temp.to_dataframe(i)

                # pair the names in column j with the row's values to rebuild
                # one record per row
                item = temp\
                    .apply(lambda x: dict(zip(x[j], x['value'])), axis=1)\
                    .tolist()
                item = DataFrame(item)
                item['fullpath'] = temp[0]
                if type_ is not None:
                    # item assignment always writes the column; attribute
                    # assignment only does so when the column already exists
                    item['type'] = type_
                rows.append(item)

        # pd.concat raises on an empty list
        if rows:
            data = pd.concat(rows, ignore_index=True, sort=False)
        else:
            data = DataFrame()

        cols = [
            'fullpath', 'name', 'classname', 'type', 'complexity', 'rank',
            'lineno', 'endline', 'col_offset'
        ]
        # reindex, unlike data[cols], tolerates columns that are absent
        data = data.reindex(columns=cols)
        lut = {
            'fullpath': 'fullpath',
            'name': 'name',
            'classname': 'class_name',
            'type': 'object_type',
            'complexity': 'cyclomatic_complexity',
            'rank': 'cyclomatic_rank',
            'lineno': 'start_line',
            'endline': 'stop_line',
            'col_offset': 'column_offset',
        }
        data.drop_duplicates(inplace=True)
        data.rename(mapper=lambda x: lut[x], axis=1, inplace=True)
        data.reset_index(drop=True, inplace=True)

        # convert rank to integer
        data['cyclomatic_rank'] = data['cyclomatic_rank']\
            .apply(lambda x: RadonETL._RANK_LUT[x])

        return data

    @staticmethod
    def _get_halstead_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon Halstead report into a pandas DataFrame.

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Halstead DataFrame.
        '''
        hal = report['halstead_metrics']
        keys = [
            'h1', 'h2', 'n1', 'n2', 'vocabulary', 'length', 'calculated_length',
            'volume', 'difficulty', 'effort', 'time', 'bugs',
        ]

        # function and closure rows: the first item of each value is used as
        # the name and the remaining items as the scores, in keys order
        data = BlobETL(hal, '#').query('function|closure').to_dataframe(3)
        data['fullpath'] = data[0]
        data['object_type'] = data[1].apply(lambda x: re.sub('s$', '', x))
        data['name'] = data.value.apply(lambda x: x[0])

        score = data.value.apply(lambda x: dict(zip(keys, x[1:]))).tolist()
        data = data.join(DataFrame(score))

        # "total" rows are grouped per file and labelled as module rows, named
        # after the file without its extension
        total = BlobETL(hal, '#').query('total').to_dataframe()
        total['fullpath'] = total[0]
        total = total.groupby('fullpath', as_index=False)\
            .agg(lambda x: dict(zip(keys, x)))
        total = total.join(DataFrame(total.value.tolist()))
        total['object_type'] = 'module'
        total['name'] = total.fullpath\
            .apply(lambda x: os.path.splitext((Path(x).name))[0])
        data = pd.concat([data, total], ignore_index=True, sort=False)

        cols = ['fullpath', 'name', 'object_type']
        cols.extend(keys)
        return data[cols]

    # EXPORT--------------------------------------------------------------------
    def write_plots(self, fullpath):
        # type: (Union[str, Path]) -> RadonETL
        '''
        Writes metrics plots to given file.

        Rows whose fullpath ends in "_test.py" are left out of every plot. The
        file is written as UTF-8.

        Args:
            fullpath (Path or str): Target file.

        Returns:
            RadonETL: self.
        '''
        def remove_test_modules(data):
            # type: (DataFrame) -> DataFrame
            mask = data.fullpath\
                .apply(lambda x: not re.search(r'_test\.py$', x)).astype(bool)
            # copy so that callers can add columns and sort in place without
            # operating on a slice of the source frame
            return data[mask].copy()

        lut = dict(
            h1='h1 - the number of distinct operators',
            h2='h2 - the number of distinct operands',
            n1='n1 - the total number of operators',
            n2='n2 - the total number of operands',
            vocabulary='vocabulary (h) - h1 + h2',
            length='length (N) - n1 + n2',
            calculated_length='calculated_length - h1 * log2(h1) + h2 * log2(h2)',
            volume='volume (V) - N * log2(h)',
            difficulty='difficulty (D) - h1 / 2 * n2 / h2',
            effort='effort (E) - D * V',
            time='time (T) - E / 18 seconds',
            bugs='bugs (B) - V / 3000 - an estimate of the errors in the implementation',
        )

        raw = remove_test_modules(self.raw_metrics)
        mi = remove_test_modules(self.maintainability_index)
        cc = remove_test_modules(self.cyclomatic_complexity_metrics)
        hal = remove_test_modules(self.halstead_metrics)

        raw['docstring_ratio'] = raw.multiline_comment / raw.code
        raw.sort_values('docstring_ratio', inplace=True)

        parts = ['<body style="background: #242424">\n']

        # line count
        fig = px.bar(
            raw,
            title='Line Count Metrics',
            x=raw.drop(columns='fullpath').columns.tolist(),
            y='fullpath',
            orientation='h',
            barmode='group',
            width=900,
            height=900,
            color_discrete_sequence=rpt.COLOR_SCALE,
        )
        fig.layout.update(rpt.PLOTLY_LAYOUT_THEME)
        parts.append(fig.to_html())

        # maintainability
        fig = px.bar(
            mi,
            title='Maintainability Metrics',
            x='maintainability_index',
            y='fullpath',
            orientation='h',
            barmode='group',
            width=900,
            height=900,
            color_discrete_sequence=rpt.COLOR_SCALE,
        )
        fig.layout.update(rpt.PLOTLY_LAYOUT_THEME)
        parts.append(fig.to_html())

        # cyclomatic
        fig = px.histogram(
            cc[['cyclomatic_complexity', 'cyclomatic_rank']],
            title='Cyclomatic Metric Distributions',
            nbins=10,
            width=900,
            height=500,
            color_discrete_sequence=rpt.COLOR_SCALE,
        )
        fig.layout.update(rpt.PLOTLY_LAYOUT_THEME)
        parts.append(fig.to_html())

        # halstead
        cols = [
            'h1', 'h2', 'n1', 'n2', 'vocabulary', 'length', 'calculated_length',
            'volume', 'difficulty', 'effort', 'time', 'bugs'
        ]
        fig = px.histogram(
            hal[cols].rename(mapper=lambda x: lut[x], axis=1),
            title='Halstead Metric Distributions',
            nbins=10,
            width=1400,
            height=500,
            color_discrete_sequence=rpt.COLOR_SCALE,
        )
        fig.layout.update(rpt.PLOTLY_LAYOUT_THEME)
        parts.append(fig.to_html())

        parts.append('\n</body>')

        # explicit encoding so the output does not depend on the platform
        # default
        with open(fullpath, 'w', encoding='utf-8') as f:
            f.write(''.join(parts))

        return self

    def write_tables(self, target_dir):
        # type: (Union[str, Path]) -> RadonETL
        '''
        Writes metrics tables as HTML files to given directory.

        Five files are written: all_metrics.html, raw_metrics.html,
        maintainability_metrics.html, cyclomatic_complexity_metrics.html and
        halstead_metrics.html. Each is written as UTF-8.

        Args:
            target_dir (Path or str): Target directory.

        Returns:
            RadonETL: self.
        '''
        def write_table(data, target):
            # type: (DataFrame, Path) -> None
            html = data.to_html()

            # make table sortable
            script = '<script '
            script += 'src="http://www.kryogenix.org/code/browser/sorttable/sorttable.js" '
            script += 'type="text/javascript"></script>\n'
            html = re.sub('class="dataframe"', 'class="sortable"', html)
            html = script + html

            # explicit encoding so the output does not depend on the platform
            # default
            with open(target, 'w', encoding='utf-8') as f:
                f.write(html)

        # build each report frame once and reuse it for the combined table
        hal = self.halstead_metrics
        cc = self.cyclomatic_complexity_metrics
        raw = self.raw_metrics
        mi = self.maintainability_index
        data = self._merge_metrics(hal, cc, raw, mi)

        write_table(data, Path(target_dir, 'all_metrics.html'))
        write_table(raw, Path(target_dir, 'raw_metrics.html'))
        write_table(mi, Path(target_dir, 'maintainability_metrics.html'))
        write_table(cc, Path(target_dir, 'cyclomatic_complexity_metrics.html'))
        write_table(hal, Path(target_dir, 'halstead_metrics.html'))

        return self