Coverage for /home/ubuntu/rolling-pin/python/rolling_pin/radon_etl.py: 100%

208 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-13 19:35 +0000

1from typing import Any, Dict, List, Union # noqa: F401 

2 

3import json 

4import os 

5import re 

6from pathlib import Path 

7 

8from pandas import DataFrame 

9from radon.cli import CCHarvester, HCHarvester, MIHarvester, RawHarvester 

10from radon.cli import Config 

11import numpy as np 

12import pandas as pd 

13import plotly.express as px 

14import radon.complexity 

15 

16from rolling_pin.blob_etl import BlobETL 

17import rolling_pin.tools as rpt 

18# ------------------------------------------------------------------------------ 

19 

20''' 

21Contain the RadonETL class, which is used for generating a radon report on the 

22code within a given directory. 

23''' 

24 

25 

class RadonETL():
    '''
    Conforms all four radon reports (raw metrics, Halstead, maintainability and
    cyclomatic complexity) into a single DataFrame that can then be plotted.
    '''
    # Radon letter ranks mapped to integers (A -> 0 ... F -> 5).
    _RANK_LUT = {k: i for i, k in enumerate('ABCDEF')}  # type: Dict[str, int]

    def __init__(self, fullpath):
        # type: (Union[str, Path]) -> None
        '''
        Constructs a RadonETL instance.

        The radon report is generated once, here, and stored on the instance.
        Every DataFrame property is derived from that stored report.

        Args:
            fullpath (str or Path): Python file or directory of python files.
        '''
        self._report = RadonETL._get_radon_report(fullpath)
    # --------------------------------------------------------------------------

    @property
    def report(self):
        # type: () -> Dict
        '''
        dict: Dictionary of all radon metrics.
        '''
        return self._report

    @property
    def data(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of all radon metrics.
        '''
        return self._get_radon_data()

    @property
    def raw_metrics(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon raw metrics.
        '''
        return self._get_raw_metrics_dataframe(self._report)

    @property
    def maintainability_index(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon maintainability index metrics.
        '''
        return self._get_maintainability_index_dataframe(self._report)

    @property
    def cyclomatic_complexity_metrics(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon cyclomatic complexity metrics.
        '''
        return self._get_cyclomatic_complexity_dataframe(self._report)

    @property
    def halstead_metrics(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon Halstead metrics.
        '''
        return self._get_halstead_dataframe(self._report)
    # --------------------------------------------------------------------------

    def _get_radon_data(self):
        # type: () -> DataFrame
        '''
        Constructs a DataFrame representing all the radon reports generated for
        a given python file or directory containing python files.

        Halstead and cyclomatic complexity rows are outer-merged on fullpath and
        name. Raw and maintainability metrics are per-file, so they are only
        written to rows whose object_type is "module"; all other rows hold NaN
        in those columns. A module row whose fullpath has no raw/maintainability
        entry is also left as NaN.

        Returns:
            DataFrame: Radon report DataFrame.
        '''
        hal = self.halstead_metrics
        cc = self.cyclomatic_complexity_metrics
        raw = self.raw_metrics
        mi = self.maintainability_index

        # both frames carry object_type, so the merge suffixes them with _x/_y
        data = hal.merge(cc, how='outer', on=['fullpath', 'name'])

        # prefer the Halstead object_type, fall back on the cyclomatic one
        missing = data.object_type_x.isnull()
        data['object_type'] = data.object_type_x
        data.loc[missing, 'object_type'] = data.loc[missing, 'object_type_y']
        del data['object_type_x']
        del data['object_type_y']

        # per-file metrics, looked up by fullpath
        module = raw.merge(mi, on='fullpath')
        module_cols = set(module.columns.tolist()) \
            .difference(data.columns.tolist())
        lookup = module.set_index('fullpath')

        is_module = data.object_type == 'module'
        for col in module_cols:
            data[col] = np.nan
            data.loc[is_module, col] = data \
                .loc[is_module, 'fullpath'] \
                .map(lookup[col])

        cols = [
            'fullpath', 'name', 'class_name', 'object_type', 'blank', 'bugs',
            'calculated_length', 'code', 'column_offset', 'comment',
            'cyclomatic_complexity', 'cyclomatic_rank', 'difficulty', 'effort',
            'h1', 'h2', 'length', 'logical_code', 'maintainability_index',
            'maintainability_rank', 'multiline_comment', 'n1', 'n2',
            'single_comment', 'source_code', 'start_line', 'stop_line', 'time',
            'vocabulary', 'volume',
        ]
        return data[cols]
    # --------------------------------------------------------------------------

    @staticmethod
    def _get_radon_report(fullpath):
        # type: (Union[str, Path]) -> Dict[str, Any]
        '''
        Gets all four reports from radon and aggregates them into a single blob
        object.

        Args:
            fullpath (str or Path): Python file or directory of python files.

        Returns:
            dict: Radon report blob, keyed by cyclomatic_complexity,
                raw_metrics, maintainability_index and halstead_metrics.
        '''
        fullpath_ = [Path(fullpath).absolute().as_posix()]  # type: List[str]

        # (report key, harvester class, harvester config), run in this order
        harvesters = [
            (
                'cyclomatic_complexity',
                CCHarvester,
                Config(
                    min='A',
                    max='F',
                    exclude=None,
                    ignore=None,
                    show_complexity=False,
                    average=False,
                    total_average=False,
                    order=radon.complexity.SCORE,
                    no_assert=False,
                    show_closures=False,
                ),
            ),
            (
                'raw_metrics',
                RawHarvester,
                Config(
                    exclude=None,
                    ignore=None,
                    summary=False,
                ),
            ),
            (
                'maintainability_index',
                MIHarvester,
                Config(
                    min='A',
                    max='C',
                    exclude=None,
                    ignore=None,
                    multi=True,
                    show=False,
                    sort=False,
                ),
            ),
            (
                'halstead_metrics',
                HCHarvester,
                Config(
                    exclude=None,
                    ignore=None,
                    by_function=False,
                ),
            ),
        ]  # type: Any

        output = {}  # type: Dict[str, Any]
        for key, harvester, config in harvesters:
            output[key] = json.loads(harvester(fullpath_, config).as_json())
        return output

    @staticmethod
    def _get_module_metrics_dataframe(blob, name_lut, columns):
        # type: (Dict, Dict[str, str], List[str]) -> DataFrame
        '''
        Converts a per-file radon report into a DataFrame sorted by fullpath.

        Shared by the raw metrics and maintainability index conversions.

        Args:
            blob (dict): Radon report section keyed by fullpath.
            name_lut (dict): Lookup table of radon field name to column name.
            columns (list[str]): Columns of the returned DataFrame, in order.

        Returns:
            DataFrame: Report DataFrame restricted to given columns.
        '''
        fullpaths = list(blob.keys())
        path_lut = {k: f'<list_{i}>' for i, k in enumerate(fullpaths)}
        fullpath_fields = {x: {'fullpath': x} for x in fullpaths}

        # fullpath is injected as a field of each file before the file keys
        # are replaced with list markers and the metric names are renamed
        data = BlobETL(blob, '#') \
            .update(fullpath_fields) \
            .set_field(0, lambda x: path_lut[x]) \
            .set_field(1, lambda x: name_lut[x]) \
            .to_dict()  # type: Any

        data = DataFrame(data)
        data = data.sort_values('fullpath').reset_index(drop=True)

        # copy so that callers can assign columns without pandas copy warnings
        return data[columns].copy()

    @staticmethod
    def _get_raw_metrics_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon raw metrics report into a pandas DataFrame.

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Raw metrics DataFrame.
        '''
        # loc = Lines of Code (total lines) - sloc + blanks + multi + single_comments
        # lloc = Logical Lines of Code
        # comments = Comments lines
        # multi = Multi-line strings (assumed to be docstrings)
        # blank = Blank lines (or whitespace-only lines)
        # single_comments = Single-line comments or docstrings
        name_lut = dict(
            blank='blank',
            comments='comment',
            lloc='logical_code',
            loc='code',
            multi='multiline_comment',
            single_comments='single_comment',
            sloc='source_code',
            fullpath='fullpath',
        )
        cols = [
            'fullpath', 'blank', 'code', 'comment', 'logical_code',
            'multiline_comment', 'single_comment', 'source_code',
        ]
        return RadonETL._get_module_metrics_dataframe(
            report['raw_metrics'], name_lut, cols
        )

    @staticmethod
    def _get_maintainability_index_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon maintainability index report into a pandas DataFrame.

        Args:
            report (dict): Radon report blob.

        Raises:
            KeyError: If a rank is not one of the letters A through F.

        Returns:
            DataFrame: Maintainability DataFrame.
        '''
        name_lut = dict(
            mi='maintainability_index',
            rank='maintainability_rank',
            fullpath='fullpath',
        )
        cols = ['fullpath', 'maintainability_index', 'maintainability_rank']
        data = RadonETL._get_module_metrics_dataframe(
            report['maintainability_index'], name_lut, cols
        )

        # convert rank to integer
        rank_lut = RadonETL._RANK_LUT
        data['maintainability_rank'] = data['maintainability_rank'] \
            .apply(lambda x: rank_lut[x])

        return data

    @staticmethod
    def _get_cyclomatic_complexity_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon cyclomatic complexity report into a pandas DataFrame.

        Args:
            report (dict): Radon report blob.

        Raises:
            ValueError: Raised by pandas.concat if none of the filters match
                anything in the report.
            KeyError: If a rank is not one of the letters A through F.

        Returns:
            DataFrame: Cyclomatic complexity DataFrame.
        '''
        # each filter is: [to_dataframe argument, column holding the field
        # names, object type override, regex selecting the flattened keys]
        filters = [
            [4, 6, 'method_closure',
             '^[^#]+#<list_[0-9]+>#methods#<list_[0-9]+>#closures#<list_[0-9]+>#[^#]+$'],
            [3, 4, 'closure', '^[^#]+#<list_[0-9]+>#closures#<list_[0-9]+>#[^#]+$'],
            [3, 4, 'method', '^[^#]+#<list_[0-9]+>#methods#<list_[0-9]+>#[^#]+$'],
            [2, 2, None, '^[^#]+#<list_[0-9]+>#[^#]+$'],
        ]  # type: Any

        cc = report['cyclomatic_complexity']
        rows = []
        for i, j, type_, regex in filters:
            blob = BlobETL(cc, '#').query(regex)
            if len(blob.to_flat_dict().keys()) > 0:
                temp = blob.to_dataframe(i)  # type: DataFrame

                # each row pairs a list of field names with a list of values
                item = temp \
                    .apply(lambda x: dict(zip(x[j], x['value'])), axis=1) \
                    .tolist()
                item = DataFrame(item)
                item['fullpath'] = temp[0]
                if type_ is not None:
                    # item assignment always writes the column, whereas
                    # attribute assignment only does so if it already exists
                    item['type'] = type_
                rows.append(item)
        data = pd.concat(rows, ignore_index=True, sort=False)

        lut = {
            'fullpath': 'fullpath',
            'name': 'name',
            'classname': 'class_name',
            'type': 'object_type',
            'complexity': 'cyclomatic_complexity',
            'rank': 'cyclomatic_rank',
            'lineno': 'start_line',
            'endline': 'stop_line',
            'col_offset': 'column_offset',
        }
        cols = [
            'fullpath', 'name', 'classname', 'type', 'complexity', 'rank',
            'lineno', 'endline', 'col_offset'
        ]

        # non-inplace chain yields a fresh frame that is safe to assign into
        data = data[cols] \
            .drop_duplicates() \
            .rename(mapper=lambda x: lut[x], axis=1) \
            .reset_index(drop=True)

        # convert rank to integer
        rank_lut = RadonETL._RANK_LUT
        data['cyclomatic_rank'] = data['cyclomatic_rank'] \
            .apply(lambda x: rank_lut[x])

        return data

    @staticmethod
    def _get_halstead_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon Halstead report into a pandas DataFrame.

        The result holds one row per function/closure entry plus one "module"
        row per file, built from that file's "total" entry.

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Halstead DataFrame.
        '''
        hal = report['halstead_metrics']
        keys = [
            'h1', 'h2', 'n1', 'n2', 'vocabulary', 'length', 'calculated_length',
            'volume', 'difficulty', 'effort', 'time', 'bugs',
        ]

        # function level rows
        data = BlobETL(hal, '#').query('function|closure').to_dataframe(3)
        data['fullpath'] = data[0]

        # strip trailing "s", ie: functions -> function
        data['object_type'] = data[1].apply(lambda x: re.sub('s$', '', x))

        # first element of value is the name, the rest are the scores
        data['name'] = data.value.apply(lambda x: x[0])
        score = data.value.apply(lambda x: dict(zip(keys, x[1:]))).tolist()
        score = DataFrame(score)
        data = data.join(score)

        # module level rows
        total = BlobETL(hal, '#').query('total').to_dataframe()
        total['fullpath'] = total[0]
        total = total.groupby('fullpath', as_index=False)\
            .agg(lambda x: dict(zip(keys, x)))
        score = total.value.tolist()
        score = DataFrame(score)
        total = total.join(score)
        total['object_type'] = 'module'

        # module name is the filename without its extension
        total['name'] = total.fullpath\
            .apply(lambda x: os.path.splitext((Path(x).name))[0])
        data = pd.concat([data, total], ignore_index=True, sort=False)

        cols = ['fullpath', 'name', 'object_type']
        cols.extend(keys)
        data = data[cols]

        return data

    # EXPORT--------------------------------------------------------------------
    def write_plots(self, fullpath):
        # type: (Union[str, Path]) -> RadonETL
        '''
        Writes metrics plots to given file.

        Rows whose fullpath ends in "_test.py" are excluded from every plot.
        The file is written as UTF-8.

        Args:
            fullpath (Path or str): Target file.

        Returns:
            RadonETL: self.
        '''
        def remove_test_modules(data):
            # type: (DataFrame) -> DataFrame
            mask = data.fullpath\
                .apply(lambda x: not re.search(r'_test\.py$', x)).astype(bool)

            # copy so that the result can be modified without pandas warning
            # about writing to a slice of the original frame
            return data[mask].copy()

        lut = dict(
            h1='h1 - the number of distinct operators',
            h2='h2 - the number of distinct operands',
            n1='n1 - the total number of operators',
            n2='n2 - the total number of operands',
            vocabulary='vocabulary (h) - h1 + h2',
            length='length (N) - n1 + n2',
            calculated_length='calculated_length - h1 * log2(h1) + h2 * log2(h2)',
            volume='volume (V) - N * log2(h)',
            difficulty='difficulty (D) - h1 / 2 * n2 / h2',
            effort='effort (E) - D * V',
            time='time (T) - E / 18 seconds',
            bugs='bugs (B) - V / 3000 - an estimate of the errors in the implementation',
        )

        raw = remove_test_modules(self.raw_metrics)
        mi = remove_test_modules(self.maintainability_index)
        cc = remove_test_modules(self.cyclomatic_complexity_metrics)
        hal = remove_test_modules(self.halstead_metrics)

        raw['docstring_ratio'] = raw.multiline_comment / raw.code
        raw = raw.sort_values('docstring_ratio')

        sections = ['<body style="background: #242424">\n']

        # line count
        fig = px.bar(
            raw,
            title='Line Count Metrics',
            x=raw.drop(columns='fullpath').columns.tolist(),
            y='fullpath',
            orientation='h',
            barmode='group',
            width=900,
            height=900,
            color_discrete_sequence=rpt.COLOR_SCALE,
        )
        fig.layout.update(rpt.PLOTLY_LAYOUT_THEME)
        sections.append(fig.to_html())

        # maintainability
        fig = px.bar(
            mi,
            title='Maintainability Metrics',
            x='maintainability_index',
            y='fullpath',
            orientation='h',
            barmode='group',
            width=900,
            height=900,
            color_discrete_sequence=rpt.COLOR_SCALE,
        )
        fig.layout.update(rpt.PLOTLY_LAYOUT_THEME)
        sections.append(fig.to_html())

        # cyclomatic
        fig = px.histogram(
            cc[['cyclomatic_complexity', 'cyclomatic_rank']],
            title='Cyclomatic Metric Distributions',
            nbins=10,
            width=900,
            height=500,
            color_discrete_sequence=rpt.COLOR_SCALE,
        )
        fig.layout.update(rpt.PLOTLY_LAYOUT_THEME)
        sections.append(fig.to_html())

        # halstead
        cols = [
            'h1', 'h2', 'n1', 'n2', 'vocabulary', 'length', 'calculated_length',
            'volume', 'difficulty', 'effort', 'time', 'bugs'
        ]
        fig = px.histogram(
            hal[cols].rename(mapper=lambda x: lut[x], axis=1),
            title='Halstead Metric Distributions',
            nbins=10,
            width=1400,
            height=500,
            color_discrete_sequence=rpt.COLOR_SCALE,
        )
        fig.layout.update(rpt.PLOTLY_LAYOUT_THEME)
        sections.append(fig.to_html())

        sections.append('\n</body>')

        # explicit encoding so output does not depend on the platform default
        with open(fullpath, 'w', encoding='utf-8') as f:
            f.write(''.join(sections))

        return self

    def write_tables(self, target_dir):
        # type: (Union[str, Path]) -> RadonETL
        '''
        Writes metrics tables as HTML files to given directory.

        Five files are written: all_metrics.html, raw_metrics.html,
        maintainability_metrics.html, cyclomatic_complexity_metrics.html and
        halstead_metrics.html. Files are written as UTF-8.

        Args:
            target_dir (Path or str): Target directory.

        Returns:
            RadonETL: self.
        '''
        # make table sortable
        script = (
            '<script '
            'src="http://www.kryogenix.org/code/browser/sorttable/sorttable.js" '
            'type="text/javascript"></script>\n'
        )

        def write_table(data, target):
            # type: (DataFrame, Path) -> None
            html = data.to_html()
            html = html.replace('class="dataframe"', 'class="sortable"')

            # explicit encoding so output does not depend on platform default
            with open(target, 'w', encoding='utf-8') as f:
                f.write(script + html)

        # every table is generated before any file is written
        tables = [
            (self.data, 'all_metrics.html'),
            (self.raw_metrics, 'raw_metrics.html'),
            (self.maintainability_index, 'maintainability_metrics.html'),
            (
                self.cyclomatic_complexity_metrics,
                'cyclomatic_complexity_metrics.html'
            ),
            (self.halstead_metrics, 'halstead_metrics.html'),
        ]
        for table, filename in tables:
            write_table(table, Path(target_dir, filename))

        return self

548 

549 return self