Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from typing import Any, Dict, List, Union
3import json
4import os
5import re
6from pathlib import Path
8import cufflinks as cf
9import numpy as np
10import pandas as pd
11from pandas import DataFrame
12import radon.complexity
13from radon.cli import Config
14from radon.cli import CCHarvester, HCHarvester, MIHarvester, RawHarvester
16from rolling_pin.blob_etl import BlobETL
17import rolling_pin.tools as tools
18# ------------------------------------------------------------------------------
'''
Contains the RadonETL class, which is used for generating a radon report on the
code within a given directory.
'''
class RadonETL():
    '''
    Conforms all four radon reports (raw metrics, Halstead, maintainability and
    cyclomatic complexity) into a single DataFrame that can then be plotted.

    The radon reports are harvested once, at construction time, and stored as
    a dictionary. Every DataFrame property is rebuilt from that dictionary on
    each access.
    '''
    # Halstead metric names, in the order they are zipped against the values
    # found in the Halstead report.
    _HALSTEAD_KEYS = [
        'h1', 'h2', 'n1', 'n2', 'vocabulary', 'length', 'calculated_length',
        'volume', 'difficulty', 'effort', 'time', 'bugs',
    ]

    # Converts letter ranks into integers (A -> 0, ..., F -> 5).
    _RANK_LUT = {k: i for i, k in enumerate('ABCDEF')}

    def __init__(self, fullpath: Union[str, Path]) -> None:
        '''
        Constructs a RadonETL instance.

        Args:
            fullpath (str or Path): Python file or directory of python files.
        '''
        self._report = RadonETL._get_radon_report(fullpath)
    # --------------------------------------------------------------------------

    @property
    def report(self) -> Dict:
        '''
        dict: Dictionary of all radon metrics.
        '''
        return self._report

    @property
    def data(self) -> DataFrame:
        '''
        DataFrame: DataFrame of all radon metrics.
        '''
        return self._get_radon_data()

    @property
    def raw_metrics(self) -> DataFrame:
        '''
        DataFrame: DataFrame of radon raw metrics.
        '''
        return self._get_raw_metrics_dataframe(self._report)

    @property
    def maintainability_index(self) -> DataFrame:
        '''
        DataFrame: DataFrame of radon maintainability index metrics.
        '''
        return self._get_maintainability_index_dataframe(self._report)

    @property
    def cyclomatic_complexity_metrics(self) -> DataFrame:
        '''
        DataFrame: DataFrame of radon cyclomatic complexity metrics.
        '''
        return self._get_cyclomatic_complexity_dataframe(self._report)

    @property
    def halstead_metrics(self) -> DataFrame:
        '''
        DataFrame: DataFrame of radon Halstead metrics.
        '''
        return self._get_halstead_dataframe(self._report)
    # --------------------------------------------------------------------------

    def _get_radon_data(self) -> DataFrame:
        '''
        Constructs a DataFrame representing all the radon reports generated for
        a given python file or directory containing python files.

        Halstead and cyclomatic complexity rows are outer-joined on fullpath
        and name. Raw and maintainability metrics are per-file, so they are
        only written to rows whose object_type is "module"; all other rows get
        NaN in those columns.

        Returns:
            DataFrame: Radon report DataFrame.
        '''
        hal = self.halstead_metrics
        cc = self.cyclomatic_complexity_metrics
        raw = self.raw_metrics
        mi = self.maintainability_index

        # both frames carry object_type; prefer the Halstead value and fall
        # back to the cyclomatic one where Halstead has no row
        data = hal.merge(cc, how='outer', on=['fullpath', 'name'])
        data['object_type'] = data['object_type_x']\
            .fillna(data['object_type_y'])
        data = data.drop(columns=['object_type_x', 'object_type_y'])

        # per-file metrics, looked up by fullpath
        module = raw.merge(mi, on='fullpath')
        module_cols = [x for x in module.columns if x not in data.columns]
        lookup = module.set_index('fullpath')
        is_module = data['object_type'] == 'module'
        for col in module_cols:
            data[col] = data['fullpath'].map(lookup[col]).where(is_module)

        cols = [
            'fullpath', 'name', 'class_name', 'object_type', 'blank', 'bugs',
            'calculated_length', 'code', 'column_offset', 'comment',
            'cyclomatic_complexity', 'cyclomatic_rank', 'difficulty', 'effort',
            'h1', 'h2', 'length', 'logical_code', 'maintainability_index',
            'maintainability_rank', 'multiline_comment', 'n1', 'n2',
            'single_comment', 'source_code', 'start_line', 'stop_line', 'time',
            'vocabulary', 'volume',
        ]
        return data[cols]
    # --------------------------------------------------------------------------

    @staticmethod
    def _get_radon_report(fullpath: Union[str, Path]) -> Dict[str, Any]:
        '''
        Gets all 4 reports from radon and aggregates them into a single blob
        object.

        Args:
            fullpath (str or Path): Python file or directory of python files.

        Returns:
            dict: Radon report blob, keyed by cyclomatic_complexity,
                raw_metrics, maintainability_index and halstead_metrics.
        '''
        paths: List[str] = [Path(fullpath).absolute().as_posix()]

        cc_config = Config(
            min='A',
            max='F',
            exclude=None,
            ignore=None,
            show_complexity=False,
            average=False,
            total_average=False,
            order=radon.complexity.SCORE,
            no_assert=False,
            show_closures=False,
        )
        raw_config = Config(
            exclude=None,
            ignore=None,
            summary=False,
        )
        mi_config = Config(
            min='A',
            max='C',
            exclude=None,
            ignore=None,
            multi=True,
            show=False,
            sort=False,
        )
        hal_config = Config(
            exclude=None,
            ignore=None,
            by_function=False,
        )

        # each harvester serializes to JSON, which is parsed back into a dict
        output = {}  # type: Dict[str, Any]
        output['cyclomatic_complexity'] = json.loads(
            CCHarvester(paths, cc_config).as_json()
        )
        output['raw_metrics'] = json.loads(
            RawHarvester(paths, raw_config).as_json()
        )
        output['maintainability_index'] = json.loads(
            MIHarvester(paths, mi_config).as_json()
        )
        output['halstead_metrics'] = json.loads(
            HCHarvester(paths, hal_config).as_json()
        )
        return output

    @staticmethod
    def _conform_module_report(blob, name_lut, columns):
        # type: (Dict, Dict[str, str], List[str]) -> DataFrame
        '''
        Converts a per-file radon report (one entry per fullpath) into a
        DataFrame with one row per file.

        Args:
            blob (dict): Radon report keyed by fullpath.
            name_lut (dict): Maps radon field names to output column names.
                Must include a "fullpath" entry.
            columns (list[str]): Output columns, in order.

        Returns:
            DataFrame: Report DataFrame sorted by fullpath.
        '''
        fullpaths = list(blob.keys())

        # the first key field becomes a list index and the second is renamed,
        # so that to_dict yields a list of per-file records
        path_lut = {k: f'<list_{i}>' for i, k in enumerate(fullpaths)}
        fullpath_fields = {x: {'fullpath': x} for x in fullpaths}
        records = BlobETL(blob, '#')\
            .update(fullpath_fields) \
            .set_field(0, lambda x: path_lut[x])\
            .set_field(1, lambda x: name_lut[x])\
            .to_dict()

        data = DataFrame(records)
        data = data.sort_values('fullpath').reset_index(drop=True)
        return data[columns].copy()

    @staticmethod
    def _get_raw_metrics_dataframe(report: Dict) -> DataFrame:
        '''
        Converts radon raw metrics report into a pandas DataFrame.

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Raw metrics DataFrame.
        '''
        # loc = Lines of Code (total lines) - sloc + blanks + multi + single_comments
        # lloc = Logical Lines of Code
        # comments = Comments lines
        # multi = Multi-line strings (assumed to be docstrings)
        # blank = Blank lines (or whitespace-only lines)
        # single_comments = Single-line comments or docstrings
        name_lut = dict(
            blank='blank',
            comments='comment',
            lloc='logical_code',
            loc='code',
            multi='multiline_comment',
            single_comments='single_comment',
            sloc='source_code',
            fullpath='fullpath',
        )
        cols = [
            'fullpath', 'blank', 'code', 'comment', 'logical_code',
            'multiline_comment', 'single_comment', 'source_code',
        ]
        return RadonETL._conform_module_report(
            report['raw_metrics'], name_lut, cols
        )

    @staticmethod
    def _get_maintainability_index_dataframe(report: Dict) -> DataFrame:
        '''
        Converts radon maintainability index report into a pandas DataFrame.

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Maintainability DataFrame.
        '''
        name_lut = dict(
            mi='maintainability_index',
            rank='maintainability_rank',
            fullpath='fullpath',
        )
        cols = ['fullpath', 'maintainability_index', 'maintainability_rank']
        data = RadonETL._conform_module_report(
            report['maintainability_index'], name_lut, cols
        )

        # convert rank to integer
        data['maintainability_rank'] = data['maintainability_rank']\
            .apply(lambda x: RadonETL._RANK_LUT[x])
        return data

    @staticmethod
    def _get_cyclomatic_complexity_dataframe(report: Dict) -> DataFrame:
        '''
        Converts radon cyclomatic complexity report into a pandas DataFrame.

        Columns absent from the report (for example classname when no methods
        were found) are added and filled with NaN.

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Cyclomatic complexity DataFrame.
        '''
        # (to_dataframe argument, key column, object type override, key regex)
        # an override of None keeps the type reported by radon
        filters = [
            [4, 6, 'method_closure',
             '^[^#]+#<list_[0-9]+>#methods#<list_[0-9]+>#closures#<list_[0-9]+>#[^#]+$'],
            [3, 4, 'closure', '^[^#]+#<list_[0-9]+>#closures#<list_[0-9]+>#[^#]+$'],
            [3, 4, 'method', '^[^#]+#<list_[0-9]+>#methods#<list_[0-9]+>#[^#]+$'],
            [2, 2, None, '^[^#]+#<list_[0-9]+>#[^#]+$'],
        ]  # type: Any

        cc = report['cyclomatic_complexity']
        frames = []
        for depth, key_col, type_, regex in filters:
            blob = BlobETL(cc, '#').query(regex)
            if len(blob.to_flat_dict()) == 0:
                continue

            temp = blob.to_dataframe(depth)
            records = temp\
                .apply(lambda x: dict(zip(x[key_col], x['value'])), axis=1)\
                .tolist()
            item = DataFrame(records)
            item['fullpath'] = temp[0]
            if type_ is not None:
                # item assignment always writes a column, unlike attribute
                # assignment, which only does so if the column already exists
                item['type'] = type_
            frames.append(item)

        if frames:
            data = pd.concat(frames, ignore_index=True, sort=False)
        else:
            data = DataFrame()

        lut = {
            'fullpath': 'fullpath',
            'name': 'name',
            'classname': 'class_name',
            'type': 'object_type',
            'complexity': 'cyclomatic_complexity',
            'rank': 'cyclomatic_rank',
            'lineno': 'start_line',
            'endline': 'stop_line',
            'col_offset': 'column_offset',
        }
        data = data\
            .reindex(columns=list(lut.keys()))\
            .drop_duplicates()\
            .rename(columns=lut)\
            .reset_index(drop=True)

        # convert rank to integer
        data['cyclomatic_rank'] = data['cyclomatic_rank']\
            .apply(lambda x: RadonETL._RANK_LUT[x])
        return data

    @staticmethod
    def _get_halstead_dataframe(report: Dict) -> DataFrame:
        '''
        Converts radon Halstead report into a pandas DataFrame.

        Per-file totals are appended as rows with object_type "module", named
        after the file's stem.

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Halstead DataFrame.
        '''
        hal = report['halstead_metrics']
        keys = list(RadonETL._HALSTEAD_KEYS)

        # function and closure rows: value is [name, metric, metric, ...]
        data = BlobETL(hal, '#').query('function|closure').to_dataframe(3)
        data['fullpath'] = data[0]
        data['object_type'] = data[1].apply(lambda x: re.sub('s$', '', x))
        data['name'] = data.value.apply(lambda x: x[0])

        score = data.value.apply(lambda x: dict(zip(keys, x[1:]))).tolist()
        data = data.join(DataFrame(score))

        # module rows: one per file, built from the "total" entries
        total = BlobETL(hal, '#').query('total').to_dataframe()
        total['fullpath'] = total[0]
        total = total.groupby('fullpath', as_index=False)\
            .agg(lambda x: dict(zip(keys, x)))
        total = total.join(DataFrame(total.value.tolist()))
        total['object_type'] = 'module'
        total['name'] = total.fullpath.apply(lambda x: Path(x).stem)

        # DataFrame.append was removed in pandas 2.0
        data = pd.concat([data, total], sort=False)

        cols = ['fullpath', 'name', 'object_type']
        cols.extend(keys)
        return data[cols]

    # EXPORT--------------------------------------------------------------------
    def write_plots(self, fullpath: Union[str, Path]) -> 'RadonETL':
        '''
        Writes metrics plots to given file.

        Rows whose fullpath ends in "_test.py" are excluded from every plot.

        Args:
            fullpath (Path or str): Target file.

        Returns:
            RadonETL: self.
        '''
        cf.go_offline()

        def remove_test_modules(data):
            # type: (DataFrame) -> DataFrame
            mask = data.fullpath\
                .apply(lambda x: not re.search(r'_test\.py$', x)).astype(bool)
            # copy so that columns can be added without touching a slice
            return data[mask].copy()

        lut = dict(
            h1='h1 - the number of distinct operators',
            h2='h2 - the number of distinct operands',
            n1='n1 - the total number of operators',
            n2='n2 - the total number of operands',
            vocabulary='vocabulary (h) - h1 + h2',
            length='length (N) - n1 + n2',
            calculated_length='calculated_length - h1 * log2(h1) + h2 * log2(h2)',
            volume='volume (V) - N * log2(h)',
            difficulty='difficulty (D) - h1 / 2 * n2 / h2',
            effort='effort (E) - D * V',
            time='time (T) - E / 18 seconds',
            bugs='bugs (B) - V / 3000 - an estimate of the errors in the implementation',
        )

        bar_params = dict(
            theme='henanigans',
            colors=tools.COLOR_SCALE,
            dimensions=(900, 900),
            asFigure=True,
        )
        # histograms use a shorter figure
        hist_params = dict(bar_params, dimensions=(900, 500))

        raw = remove_test_modules(self.raw_metrics)
        mi = remove_test_modules(self.maintainability_index)
        cc = remove_test_modules(self.cyclomatic_complexity_metrics)
        hal = remove_test_modules(self.halstead_metrics)

        parts = ['<body style="background: #242424">\n']

        raw['docstring_ratio'] = raw.multiline_comment / raw.code
        raw = raw.sort_values('docstring_ratio')
        parts.append(
            raw.iplot(
                x='fullpath',
                kind='barh',
                title='Line Count Metrics',
                **bar_params
            ).to_html()
        )

        parts.append(
            mi.iplot(
                x='fullpath',
                kind='barh',
                title='Maintainability Metrics',
                **bar_params
            ).to_html()
        )

        cols = ['cyclomatic_complexity', 'cyclomatic_rank']
        parts.append(
            cc[cols].iplot(
                kind='hist',
                bins=50,
                title='Cyclomatic Metric Distributions',
                **hist_params
            ).to_html()
        )

        parts.append(
            hal[list(RadonETL._HALSTEAD_KEYS)]
            .rename(columns=lut)
            .iplot(
                kind='hist',
                bins=50,
                title='Halstead Metric Distributions',
                **hist_params
            ).to_html()
        )

        parts.append('\n</body>')

        # explicit encoding so output does not depend on the platform default
        with open(fullpath, 'w', encoding='utf-8') as f:
            f.write(''.join(parts))

        return self

    def write_tables(self, target_dir: Union[str, Path]) -> 'RadonETL':
        '''
        Writes metrics tables as HTML files to given directory.

        Five files are written: all_metrics.html, raw_metrics.html,
        maintainability_metrics.html, cyclomatic_complexity_metrics.html and
        halstead_metrics.html. The directory must already exist.

        Args:
            target_dir (Path or str): Target directory.

        Returns:
            RadonETL: self.
        '''
        def write_table(data, target):
            # type: (DataFrame, Path) -> None
            html = data.to_html()

            # make table sortable
            script = '<script '
            script += 'src="http://www.kryogenix.org/code/browser/sorttable/sorttable.js" '
            script += 'type="text/javascript"></script>\n'
            html = re.sub('class="dataframe"', 'class="sortable"', html)
            html = script + html

            # explicit encoding so output does not depend on platform default
            with open(target, 'w', encoding='utf-8') as f:
                f.write(html)

        tables = [
            ('all_metrics.html', self.data),
            ('raw_metrics.html', self.raw_metrics),
            ('maintainability_metrics.html', self.maintainability_index),
            (
                'cyclomatic_complexity_metrics.html',
                self.cyclomatic_complexity_metrics
            ),
            ('halstead_metrics.html', self.halstead_metrics),
        ]
        for filename, table in tables:
            write_table(table, Path(target_dir, filename))

        return self