Coverage for /home/ubuntu/rolling-pin/python/rolling_pin/radon_etl.py: 100%

204 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2023-11-15 00:43 +0000

1from typing import Any, Dict, List, Union # noqa: F401 

2 

3import json 

4import os 

5import re 

6from pathlib import Path 

7 

8import cufflinks as cf 

9import numpy as np 

10import pandas as pd 

11from pandas import DataFrame 

12import radon.complexity 

13from radon.cli import Config 

14from radon.cli import CCHarvester, HCHarvester, MIHarvester, RawHarvester 

15 

16from rolling_pin.blob_etl import BlobETL 

17import rolling_pin.tools as rpt 

18# ------------------------------------------------------------------------------ 

19 

20''' 

21Contain the RadonETL class, which is used for generating a radon report on the 

22code within a given directory.

23''' 

24 

25 

class RadonETL():
    '''
    Conforms all four radon reports (raw metrics, Halstead, maintainability and
    cyclomatic complexity) into a single DataFrame that can then be plotted.
    '''
    # Maps radon letter ranks to integers (A -> 0 ... F -> 5). Shared by the
    # maintainability and cyclomatic complexity conversions.
    _RANK_LUT = {k: i for i, k in enumerate('ABCDEF')}

    def __init__(self, fullpath):
        # type: (Union[str, Path]) -> None
        '''
        Constructs a RadonETL instance.

        The radon report is generated once, at construction time, and stored.
        Every DataFrame property is derived from that stored report.

        Args:
            fullpath (str or Path): Python file or directory of python files.
        '''
        self._report = RadonETL._get_radon_report(fullpath)
    # --------------------------------------------------------------------------

    @property
    def report(self):
        # type: () -> Dict
        '''
        dict: Dictionary of all radon metrics.
        '''
        return self._report

    @property
    def data(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of all radon metrics.
        '''
        return self._get_radon_data()

    @property
    def raw_metrics(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon raw metrics.
        '''
        return self._get_raw_metrics_dataframe(self._report)

    @property
    def maintainability_index(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon maintainability index metrics.
        '''
        return self._get_maintainability_index_dataframe(self._report)

    @property
    def cyclomatic_complexity_metrics(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon cyclomatic complexity metrics.
        '''
        return self._get_cyclomatic_complexity_dataframe(self._report)

    @property
    def halstead_metrics(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon Halstead metrics.
        '''
        return self._get_halstead_dataframe(self._report)
    # --------------------------------------------------------------------------

    def _get_radon_data(self):
        # type: () -> DataFrame
        '''
        Constructs a DataFrame representing all the radon reports generated for
        a given python file or directory containing python files.

        Halstead and cyclomatic complexity rows are outer-merged on
        (fullpath, name). Raw and maintainability metrics, which are per-file,
        are then copied onto the rows whose object_type is 'module'. All other
        rows hold NaN in those columns.

        Returns:
            DataFrame: Radon report DataFrame.

        Raises:
            ValueError: If a module row does not match exactly one row of the
                merged raw/maintainability table.
        '''
        hal = self.halstead_metrics
        cc = self.cyclomatic_complexity_metrics
        raw = self.raw_metrics
        mi = self.maintainability_index

        # both frames carry object_type, so the merge yields _x and _y variants;
        # prefer the Halstead value and fall back to the cyclomatic one
        data = hal.merge(cc, how='outer', on=['fullpath', 'name'])
        data['object_type'] = data['object_type_x']
        missing = data['object_type_x'].isnull()
        data.loc[missing, 'object_type'] = data.loc[missing, 'object_type_y']
        data = data.drop(columns=['object_type_x', 'object_type_y'])

        module = raw.merge(mi, on='fullpath')

        # per-file columns that the merged frame does not have yet
        extra_cols = [x for x in module.columns if x not in data.columns]
        for col in extra_cols:
            data[col] = np.nan

        is_module = data['object_type'] == 'module'
        for i, row in data[is_module].iterrows():
            # filter once per row rather than once per (row, column) pair
            match = module[module.fullpath == row.fullpath]
            for col in extra_cols:
                data.loc[i, col] = match[col].item()

        cols = [
            'fullpath', 'name', 'class_name', 'object_type', 'blank', 'bugs',
            'calculated_length', 'code', 'column_offset', 'comment',
            'cyclomatic_complexity', 'cyclomatic_rank', 'difficulty', 'effort',
            'h1', 'h2', 'length', 'logical_code', 'maintainability_index',
            'maintainability_rank', 'multiline_comment', 'n1', 'n2',
            'single_comment', 'source_code', 'start_line', 'stop_line', 'time',
            'vocabulary', 'volume',
        ]
        return data[cols]
    # --------------------------------------------------------------------------

    @staticmethod
    def _get_radon_report(fullpath):
        # type: (Union[str, Path]) -> Dict[str, Any]
        '''
        Gets all 4 reports from radon and aggregates them into a single blob
        object.

        Args:
            fullpath (str or Path): Python file or directory of python files.

        Returns:
            dict: Radon report blob, keyed by 'cyclomatic_complexity',
                'raw_metrics', 'maintainability_index' and 'halstead_metrics'.
        '''
        paths = [Path(fullpath).absolute().as_posix()]  # type: List[str]

        cc_config = Config(
            min='A',
            max='F',
            exclude=None,
            ignore=None,
            show_complexity=False,
            average=False,
            total_average=False,
            order=radon.complexity.SCORE,
            no_assert=False,
            show_closures=False,
        )
        raw_config = Config(
            exclude=None,
            ignore=None,
            summary=False,
        )
        mi_config = Config(
            min='A',
            max='C',
            exclude=None,
            ignore=None,
            multi=True,
            show=False,
            sort=False,
        )
        hal_config = Config(
            exclude=None,
            ignore=None,
            by_function=False,
        )

        # each harvester emits a JSON string; values are evaluated in the order
        # written here
        return {
            'cyclomatic_complexity': json.loads(
                CCHarvester(paths, cc_config).as_json()
            ),
            'raw_metrics': json.loads(
                RawHarvester(paths, raw_config).as_json()
            ),
            'maintainability_index': json.loads(
                MIHarvester(paths, mi_config).as_json()
            ),
            'halstead_metrics': json.loads(
                HCHarvester(paths, hal_config).as_json()
            ),
        }

    @staticmethod
    def _get_raw_metrics_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon raw metrics report into a pandas DataFrame.

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Raw metrics DataFrame, one row per file, sorted by
                fullpath.
        '''
        raw = report['raw_metrics']
        fullpaths = list(raw.keys())
        path_lut = {k: f'<list_{i}>' for i, k in enumerate(fullpaths)}
        fullpath_fields = {x: {'fullpath': x} for x in fullpaths}

        # loc = Lines of Code (total lines) - sloc + blanks + multi + single_comments
        # lloc = Logical Lines of Code
        # comments = Comments lines
        # multi = Multi-line strings (assumed to be docstrings)
        # blank = Blank lines (or whitespace-only lines)
        # single_comments = Single-line comments or docstrings
        name_lut = dict(
            blank='blank',
            comments='comment',
            lloc='logical_code',
            loc='code',
            multi='multiline_comment',
            single_comments='single_comment',
            sloc='source_code',
            fullpath='fullpath',
        )
        blob = BlobETL(raw, '#')\
            .update(fullpath_fields) \
            .set_field(0, lambda x: path_lut[x])\
            .set_field(1, lambda x: name_lut[x])\
            .to_dict()  # type: Dict

        cols = [
            'fullpath', 'blank', 'code', 'comment', 'logical_code',
            'multiline_comment', 'single_comment', 'source_code',
        ]
        data = DataFrame(blob)\
            .sort_values('fullpath')\
            .reset_index(drop=True)
        return data[cols]

    @staticmethod
    def _get_maintainability_index_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon maintainability index report into a pandas DataFrame.

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Maintainability DataFrame, one row per file, with the
                letter rank converted to an integer (A is 0).

        Raises:
            KeyError: If a rank is not one of the letters A through F.
        '''
        mi = report['maintainability_index']
        fullpaths = list(mi.keys())
        path_lut = {k: f'<list_{i}>' for i, k in enumerate(fullpaths)}
        fullpath_fields = {x: {'fullpath': x} for x in fullpaths}
        name_lut = dict(
            mi='maintainability_index',
            rank='maintainability_rank',
            fullpath='fullpath',
        )
        blob = BlobETL(mi, '#')\
            .update(fullpath_fields) \
            .set_field(0, lambda x: path_lut[x])\
            .set_field(1, lambda x: name_lut[x])\
            .to_dict()  # type: Any

        cols = ['fullpath', 'maintainability_index', 'maintainability_rank']
        data = DataFrame(blob)\
            .sort_values('fullpath')\
            .reset_index(drop=True)
        data = data[cols]

        # convert rank to integer
        rank_lut = RadonETL._RANK_LUT
        data['maintainability_rank'] = data['maintainability_rank']\
            .apply(lambda x: rank_lut[x])

        return data

    @staticmethod
    def _get_cyclomatic_complexity_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon cyclomatic complexity report into a pandas DataFrame.

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Cyclomatic complexity DataFrame, de-duplicated, with the
                letter rank converted to an integer (A is 0).

        Raises:
            ValueError: If no filter matches anything in the report (pandas
                cannot concatenate an empty list).
            KeyError: If a rank is not one of the letters A through F.
        '''
        # each filter: (to_dataframe argument, column holding the field names,
        # object type override or None, regex selecting flattened keys)
        filters = [
            [4, 6, 'method_closure',
             '^[^#]+#<list_[0-9]+>#methods#<list_[0-9]+>#closures#<list_[0-9]+>#[^#]+$'],
            [3, 4, 'closure', '^[^#]+#<list_[0-9]+>#closures#<list_[0-9]+>#[^#]+$'],
            [3, 4, 'method', '^[^#]+#<list_[0-9]+>#methods#<list_[0-9]+>#[^#]+$'],
            [2, 2, None, '^[^#]+#<list_[0-9]+>#[^#]+$'],
        ]  # type: Any

        cc = report['cyclomatic_complexity']
        rows = []
        for depth, name_col, type_, regex in filters:
            blob = BlobETL(cc, '#').query(regex)
            if len(blob.to_flat_dict()) == 0:
                continue

            temp = blob.to_dataframe(depth)  # type: DataFrame
            records = temp\
                .apply(lambda x: dict(zip(x[name_col], x['value'])), axis=1)\
                .tolist()
            item = DataFrame(records)
            item['fullpath'] = temp[0]
            if type_ is not None:
                # item-style assignment always writes the column; attribute
                # assignment only does so if the column already exists
                item['type'] = type_
            rows.append(item)
        data = pd.concat(rows, ignore_index=True, sort=False)

        cols = [
            'fullpath', 'name', 'classname', 'type', 'complexity', 'rank',
            'lineno', 'endline', 'col_offset'
        ]
        lut = {
            'fullpath': 'fullpath',
            'name': 'name',
            'classname': 'class_name',
            'type': 'object_type',
            'complexity': 'cyclomatic_complexity',
            'rank': 'cyclomatic_rank',
            'lineno': 'start_line',
            'endline': 'stop_line',
            'col_offset': 'column_offset',
        }
        data = data[cols]\
            .drop_duplicates()\
            .rename(columns=lut)\
            .reset_index(drop=True)

        # convert rank to integer
        rank_lut = RadonETL._RANK_LUT
        data['cyclomatic_rank'] = data['cyclomatic_rank']\
            .apply(lambda x: rank_lut[x])

        return data

    @staticmethod
    def _get_halstead_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon Halstead report into a pandas DataFrame.

        Produces one row per function/closure entry plus one 'module' row per
        file built from that file's totals.

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Halstead DataFrame.
        '''
        hal = report['halstead_metrics']
        keys = [
            'h1', 'h2', 'n1', 'n2', 'vocabulary', 'length', 'calculated_length',
            'volume', 'difficulty', 'effort', 'time', 'bugs',
        ]

        # function level rows: value is [name, metric, metric, ...]
        data = BlobETL(hal, '#').query('function|closure').to_dataframe(3)
        data['fullpath'] = data[0]
        # strip the plural, e.g. 'functions' -> 'function'
        data['object_type'] = data[1].apply(lambda x: re.sub('s$', '', x))
        data['name'] = data.value.apply(lambda x: x[0])

        score = data.value.apply(lambda x: dict(zip(keys, x[1:]))).tolist()
        data = data.join(DataFrame(score))

        # module level rows: totals are grouped per file and zipped with keys
        total = BlobETL(hal, '#').query('total').to_dataframe()
        total['fullpath'] = total[0]
        total = total.groupby('fullpath', as_index=False)\
            .agg(lambda x: dict(zip(keys, x)))
        total = total.join(DataFrame(total.value.tolist()))
        total['object_type'] = 'module'
        # module name is the file name without its extension
        total['name'] = total.fullpath\
            .apply(lambda x: os.path.splitext((Path(x).name))[0])
        data = pd.concat([data, total], ignore_index=True, sort=False)

        cols = ['fullpath', 'name', 'object_type']
        cols.extend(keys)
        return data[cols]

    # EXPORT--------------------------------------------------------------------
    def write_plots(self, fullpath):
        # type: (Union[str, Path]) -> RadonETL
        '''
        Writes metrics plots to given file.

        Rows whose fullpath ends in '_test.py' are excluded from every plot.
        The file is written as UTF-8.

        Args:
            fullpath (Path or str): Target file.

        Returns:
            RadonETL: self.
        '''
        cf.go_offline()

        def remove_test_modules(data):
            # type: (DataFrame) -> DataFrame
            mask = data.fullpath\
                .apply(lambda x: not re.search(r'_test\.py$', x)).astype(bool)
            return data[mask]

        lut = dict(
            h1='h1 - the number of distinct operators',
            h2='h2 - the number of distinct operands',
            n1='n1 - the total number of operators',
            n2='n2 - the total number of operands',
            vocabulary='vocabulary (h) - h1 + h2',
            length='length (N) - n1 + n2',
            calculated_length='calculated_length - h1 * log2(h1) + h2 * log2(h2)',
            volume='volume (V) - N * log2(h)',
            difficulty='difficulty (D) - h1 / 2 * n2 / h2',
            effort='effort (E) - D * V',
            time='time (T) - E / 18 seconds',
            bugs='bugs (B) - V / 3000 - an estimate of the errors in the implementation',
        )

        params = dict(
            theme='henanigans',
            colors=rpt.COLOR_SCALE,
            dimensions=(900, 900),
            asFigure=True,
        )

        raw = remove_test_modules(self.raw_metrics)
        mi = remove_test_modules(self.maintainability_index)
        cc = remove_test_modules(self.cyclomatic_complexity_metrics)
        hal = remove_test_modules(self.halstead_metrics)

        sections = ['<body style="background: #242424">\n']

        raw['docstring_ratio'] = raw.multiline_comment / raw.code
        raw = raw.sort_values('docstring_ratio')
        sections.append(
            raw.iplot(
                x='fullpath',
                kind='barh',
                title='Line Count Metrics',
                **params
            ).to_html()
        )

        sections.append(
            mi.iplot(
                x='fullpath',
                kind='barh',
                title='Maintainability Metrics',
                **params
            ).to_html()
        )

        # histograms use a shorter canvas than the bar charts
        params['dimensions'] = (900, 500)

        cols = ['cyclomatic_complexity', 'cyclomatic_rank']
        sections.append(
            cc[cols].iplot(
                kind='hist',
                bins=50,
                title='Cyclomatic Metric Distributions',
                **params
            ).to_html()
        )

        cols = [
            'h1', 'h2', 'n1', 'n2', 'vocabulary', 'length', 'calculated_length',
            'volume', 'difficulty', 'effort', 'time', 'bugs'
        ]
        sections.append(
            hal[cols]
            .rename(mapper=lambda x: lut[x], axis=1)
            .iplot(
                kind='hist',
                bins=50,
                title='Halstead Metric Distributions',
                **params
            )
            .to_html()
        )

        sections.append('\n</body>')

        # explicit encoding so output does not depend on the platform default
        with open(fullpath, 'w', encoding='utf-8') as f:
            f.write(''.join(sections))

        return self

    def write_tables(self, target_dir):
        # type: (Union[str, Path]) -> RadonETL
        '''
        Writes metrics tables as HTML files to given directory.

        Five files are written: all_metrics.html, raw_metrics.html,
        maintainability_metrics.html, cyclomatic_complexity_metrics.html and
        halstead_metrics.html. Each file is written as UTF-8.

        Args:
            target_dir (Path or str): Target directory.

        Returns:
            RadonETL: self.
        '''
        def write_table(data, target):
            # type: (DataFrame, Path) -> None
            html = data.to_html()

            # make table sortable
            # NOTE(review): script is fetched over plain http; consider an
            # https or vendored copy.
            script = '<script '
            script += 'src="http://www.kryogenix.org/code/browser/sorttable/sorttable.js" '
            script += 'type="text/javascript"></script>\n'
            html = html.replace('class="dataframe"', 'class="sortable"')
            html = script + html

            # explicit encoding so output does not depend on the platform
            # default
            with open(target, 'w', encoding='utf-8') as f:
                f.write(html)

        tables = [
            ('all_metrics.html', self.data),
            ('raw_metrics.html', self.raw_metrics),
            ('maintainability_metrics.html', self.maintainability_index),
            (
                'cyclomatic_complexity_metrics.html',
                self.cyclomatic_complexity_metrics,
            ),
            ('halstead_metrics.html', self.halstead_metrics),
        ]
        for filename, table in tables:
            write_table(table, Path(target_dir, filename))

        return self

536 return self