Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from typing import Any, Dict, List, Union 

2 

3import json 

4import os 

5import re 

6from pathlib import Path 

7 

8import cufflinks as cf 

9import numpy as np 

10import pandas as pd 

11from pandas import DataFrame 

12import radon.complexity 

13from radon.cli import Config 

14from radon.cli import CCHarvester, HCHarvester, MIHarvester, RawHarvester 

15 

16from rolling_pin.blob_etl import BlobETL 

17import rolling_pin.tools as tools 

18# ------------------------------------------------------------------------------ 

19 

'''
Contains the RadonETL class, which is used for generating a radon report on the
code within a given directory.
'''

24 

25 

class RadonETL():
    '''
    Conforms all four radon reports (raw metrics, Halstead, maintainability and
    cyclomatic complexity) into a single DataFrame that can then be plotted.
    '''
    def __init__(self, fullpath):
        # type: (Union[str, Path]) -> None
        '''
        Constructs a RadonETL instance.

        The radon report is generated once, here, and cached on the instance.

        Args:
            fullpath (str or Path): Python file or directory of python files.
        '''
        self._report = RadonETL._get_radon_report(fullpath)
    # --------------------------------------------------------------------------

    @property
    def report(self):
        # type: () -> Dict
        '''
        dict: Dictionary of all radon metrics.
        '''
        return self._report

    @property
    def data(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of all radon metrics.
        '''
        return self._get_radon_data()

    @property
    def raw_metrics(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon raw metrics.
        '''
        return self._get_raw_metrics_dataframe(self._report)

    @property
    def maintainability_index(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon maintainability index metrics.
        '''
        return self._get_maintainability_index_dataframe(self._report)

    @property
    def cyclomatic_complexity_metrics(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon cyclomatic complexity metrics.
        '''
        return self._get_cyclomatic_complexity_dataframe(self._report)

    @property
    def halstead_metrics(self):
        # type: () -> DataFrame
        '''
        DataFrame: DataFrame of radon Halstead metrics.
        '''
        return self._get_halstead_dataframe(self._report)
    # --------------------------------------------------------------------------

    def _get_radon_data(self):
        # type: () -> DataFrame
        '''
        Constructs a DataFrame representing all the radon reports generated for
        a given python file or directory containing python files.

        Halstead and cyclomatic complexity rows are outer-merged on fullpath
        and name. Raw and maintainability metrics are per-file, so they are
        only written onto rows whose object_type is 'module'; all other rows
        hold NaN in those columns.

        Raises:
            ValueError: If a module row does not match exactly one row of the
                merged raw/maintainability table.

        Returns:
            DataFrame: Radon report DataFrame.
        '''
        hal = self.halstead_metrics
        cc = self.cyclomatic_complexity_metrics
        raw = self.raw_metrics
        mi = self.maintainability_index

        # both frames carry object_type; prefer Halstead's, fall back to cc's
        data = hal.merge(cc, how='outer', on=['fullpath', 'name'])
        data['object_type'] = data.object_type_x
        missing = data.object_type_x.isnull()
        data.loc[missing, 'object_type'] = data.loc[missing, 'object_type_y']
        data = data.drop(columns=['object_type_x', 'object_type_y'])

        module = raw.merge(mi, on='fullpath')

        # per-file columns that the merged frame does not have yet
        module_cols = [x for x in module.columns if x not in data.columns]
        for col in module_cols:
            data[col] = np.nan

        is_module = data.object_type == 'module'
        for i, row in data[is_module].iterrows():
            if not module_cols:
                break
            match = module[module.fullpath == row.fullpath]
            for col in module_cols:
                # item() enforces exactly one matching per-file row
                data.loc[i, col] = match[col].item()

        cols = [
            'fullpath', 'name', 'class_name', 'object_type', 'blank', 'bugs',
            'calculated_length', 'code', 'column_offset', 'comment',
            'cyclomatic_complexity', 'cyclomatic_rank', 'difficulty', 'effort',
            'h1', 'h2', 'length', 'logical_code', 'maintainability_index',
            'maintainability_rank', 'multiline_comment', 'n1', 'n2',
            'single_comment', 'source_code', 'start_line', 'stop_line', 'time',
            'vocabulary', 'volume',
        ]
        return data[cols]
    # --------------------------------------------------------------------------

    @staticmethod
    def _get_radon_report(fullpath):
        # type: (Union[str, Path]) -> Dict[str, Any]
        '''
        Gets all 4 reports from radon and aggregates them into a single blob
        object.

        The harvesters are run in this order: cyclomatic complexity, raw
        metrics, maintainability index, Halstead. Each JSON result is parsed
        and stored under its own key.

        Args:
            fullpath (str or Path): Python file or directory of python files.

        Returns:
            dict: Radon report blob with keys cyclomatic_complexity,
                raw_metrics, maintainability_index and halstead_metrics.
        '''
        paths = [Path(fullpath).absolute().as_posix()]  # type: List[str]

        cc_config = Config(
            min='A',
            max='F',
            exclude=None,
            ignore=None,
            show_complexity=False,
            average=False,
            total_average=False,
            order=radon.complexity.SCORE,
            no_assert=False,
            show_closures=False,
        )
        raw_config = Config(
            exclude=None,
            ignore=None,
            summary=False,
        )
        mi_config = Config(
            min='A',
            max='C',
            exclude=None,
            ignore=None,
            multi=True,
            show=False,
            sort=False,
        )
        hal_config = Config(
            exclude=None,
            ignore=None,
            by_function=False,
        )

        # dict literals evaluate in order, so the harvest order is preserved
        report = {
            'cyclomatic_complexity': json.loads(
                CCHarvester(paths, cc_config).as_json()
            ),
            'raw_metrics': json.loads(
                RawHarvester(paths, raw_config).as_json()
            ),
            'maintainability_index': json.loads(
                MIHarvester(paths, mi_config).as_json()
            ),
            'halstead_metrics': json.loads(
                HCHarvester(paths, hal_config).as_json()
            ),
        }  # type: Dict[str, Any]
        return report

    @staticmethod
    def _get_raw_metrics_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon raw metrics report into a pandas DataFrame.

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Raw metrics DataFrame, one row per fullpath, sorted by
                fullpath.
        '''
        raw = report['raw_metrics']
        fullpaths = list(raw.keys())
        path_lut = {k: f'<list_{i}>' for i, k in enumerate(fullpaths)}
        fullpath_fields = {x: {'fullpath': x} for x in fullpaths}

        # loc = Lines of Code (total lines) - sloc + blanks + multi + single_comments
        # lloc = Logical Lines of Code
        # comments = Comments lines
        # multi = Multi-line strings (assumed to be docstrings)
        # blank = Blank lines (or whitespace-only lines)
        # single_comments = Single-line comments or docstrings
        name_lut = dict(
            blank='blank',
            comments='comment',
            lloc='logical_code',
            loc='code',
            multi='multiline_comment',
            single_comments='single_comment',
            sloc='source_code',
            fullpath='fullpath',
        )

        # NOTE(review): BlobETL is defined outside this file; presumably the
        # two set_field calls turn each file into a '<list_i>' record keyed by
        # the renamed metric, so DataFrame() yields one row per file - confirm.
        data = BlobETL(raw, '#')\
            .update(fullpath_fields) \
            .set_field(0, lambda x: path_lut[x])\
            .set_field(1, lambda x: name_lut[x])\
            .to_dict()  # type: Union[Dict, DataFrame]

        data = DataFrame(data)\
            .sort_values('fullpath')\
            .reset_index(drop=True)
        cols = [
            'fullpath', 'blank', 'code', 'comment', 'logical_code',
            'multiline_comment', 'single_comment', 'source_code',
        ]
        return data[cols]

    @staticmethod
    def _get_maintainability_index_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon maintainability index report into a pandas DataFrame.

        Args:
            report (dict): Radon report blob.

        Raises:
            KeyError: If a rank is not one of the letters A-F.

        Returns:
            DataFrame: Maintainability DataFrame, one row per fullpath, with
                maintainability_rank converted to an integer (A=0 ... F=5).
        '''
        mi = report['maintainability_index']
        fullpaths = list(mi.keys())
        path_lut = {k: f'<list_{i}>' for i, k in enumerate(fullpaths)}
        fullpath_fields = {x: {'fullpath': x} for x in fullpaths}
        name_lut = dict(
            mi='maintainability_index',
            rank='maintainability_rank',
            fullpath='fullpath',
        )
        data = BlobETL(mi, '#')\
            .update(fullpath_fields) \
            .set_field(0, lambda x: path_lut[x])\
            .set_field(1, lambda x: name_lut[x])\
            .to_dict()  # type: Any

        data = DataFrame(data)\
            .sort_values('fullpath')\
            .reset_index(drop=True)
        cols = ['fullpath', 'maintainability_index', 'maintainability_rank']
        # copy so the rank conversion below does not write into a slice
        data = data[cols].copy()

        # convert rank to integer
        rank_lut = {k: i for i, k in enumerate('ABCDEF')}
        data['maintainability_rank'] = data['maintainability_rank']\
            .apply(lambda x: rank_lut[x])

        return data

    @staticmethod
    def _get_cyclomatic_complexity_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon cyclomatic complexity report into a pandas DataFrame.

        Args:
            report (dict): Radon report blob.

        Raises:
            KeyError: If a rank is not one of the letters A-F.

        Returns:
            DataFrame: Cyclomatic complexity DataFrame with cyclomatic_rank
                converted to an integer (A=0 ... F=5).
        '''
        # each filter is: [to_dataframe argument, column holding the field
        # names, object type to force (None keeps radon's own type), regex
        # selecting the flattened keys of that nesting level]
        filters = [
            [4, 6, 'method_closure',
             '^[^#]+#<list_[0-9]+>#methods#<list_[0-9]+>#closures#<list_[0-9]+>#[^#]+$'],
            [3, 4, 'closure', '^[^#]+#<list_[0-9]+>#closures#<list_[0-9]+>#[^#]+$'],
            [3, 4, 'method', '^[^#]+#<list_[0-9]+>#methods#<list_[0-9]+>#[^#]+$'],
            [2, 2, None, '^[^#]+#<list_[0-9]+>#[^#]+$'],
        ]  # type: Any

        cc = report['cyclomatic_complexity']
        items = []  # type: List[DataFrame]
        for i, j, type_, regex in filters:
            temp = BlobETL(cc, '#').query(regex)  # type: Any
            if len(temp.to_flat_dict().keys()) > 0:
                temp = temp.to_dataframe(i)
                item = temp\
                    .apply(lambda x: dict(zip(x[j], x['value'])), axis=1)\
                    .tolist()
                item = DataFrame(item)
                item['fullpath'] = temp[0]
                if type_ is not None:
                    # column assignment; attribute assignment would silently
                    # skip creating the column if 'type' were absent
                    item['type'] = type_
                items.append(item)

        # DataFrame.append was removed in pandas 2.0; concat is the equivalent
        if items:
            data = pd.concat(items, ignore_index=True, sort=False)
        else:
            data = DataFrame()

        cols = [
            'fullpath', 'name', 'classname', 'type', 'complexity', 'rank',
            'lineno', 'endline', 'col_offset'
        ]
        # reindex fills absent columns with NaN instead of raising KeyError,
        # e.g. when nothing matched or no record carries a classname
        data = data.reindex(columns=cols)
        lut = {
            'fullpath': 'fullpath',
            'name': 'name',
            'classname': 'class_name',
            'type': 'object_type',
            'complexity': 'cyclomatic_complexity',
            'rank': 'cyclomatic_rank',
            'lineno': 'start_line',
            'endline': 'stop_line',
            'col_offset': 'column_offset',
        }
        data = data\
            .drop_duplicates()\
            .rename(columns=lut)\
            .reset_index(drop=True)

        # convert rank to integer
        rank_lut = {k: i for i, k in enumerate('ABCDEF')}
        data['cyclomatic_rank'] = data['cyclomatic_rank']\
            .apply(lambda x: rank_lut[x])

        return data

    @staticmethod
    def _get_halstead_dataframe(report):
        # type: (Dict) -> DataFrame
        '''
        Converts radon Halstead report into a pandas DataFrame.

        Rows come from two queries over the Halstead blob: entries matching
        'function|closure', and the per-file 'total' entries, which are
        labelled with object_type 'module' and named after the file stem.

        Args:
            report (dict): Radon report blob.

        Returns:
            DataFrame: Halstead DataFrame.
        '''
        hal = report['halstead_metrics']
        keys = [
            'h1', 'h2', 'n1', 'n2', 'vocabulary', 'length', 'calculated_length',
            'volume', 'difficulty', 'effort', 'time', 'bugs',
        ]
        data = BlobETL(hal, '#').query('function|closure').to_dataframe(3)
        data['fullpath'] = data[0]
        # strip the trailing 's' from the container key to get a singular type
        data['object_type'] = data[1].apply(lambda x: re.sub('s$', '', x))
        # value is indexed as [name, score, score, ...]
        data['name'] = data.value.apply(lambda x: x[0])

        score = data.value.apply(lambda x: dict(zip(keys, x[1:]))).tolist()
        score = DataFrame(score)
        data = data.join(score)

        total = BlobETL(hal, '#').query('total').to_dataframe()
        total['fullpath'] = total[0]
        # collapse each file's rows into one dict of key -> score
        total = total.groupby('fullpath', as_index=False)\
            .agg(lambda x: dict(zip(keys, x)))
        score = total.value.tolist()
        score = DataFrame(score)
        total = total.join(score)
        total['object_type'] = 'module'
        total['name'] = total.fullpath\
            .apply(lambda x: os.path.splitext((Path(x).name))[0])

        # DataFrame.append was removed in pandas 2.0; concat is the equivalent
        data = pd.concat([data, total], sort=False)

        cols = ['fullpath', 'name', 'object_type']
        cols.extend(keys)
        data = data[cols]

        return data

    # EXPORT--------------------------------------------------------------------
    def write_plots(self, fullpath):
        # type: (Union[str, Path]) -> RadonETL
        '''
        Writes metrics plots to given file.

        Four plots are concatenated into one HTML document: line counts,
        maintainability, cyclomatic distributions and Halstead distributions.
        Files whose path ends in '_test.py' are excluded from every plot.

        Args:
            fullpath (Path or str): Target file.

        Returns:
            RadonETL: self.
        '''
        cf.go_offline()

        def remove_test_modules(data):
            # type: (DataFrame) -> DataFrame
            mask = data.fullpath\
                .apply(lambda x: not re.search(r'_test\.py$', x)).astype(bool)
            # copy so callers can add columns without touching a slice
            return data[mask].copy()

        lut = dict(
            h1='h1 - the number of distinct operators',
            h2='h2 - the number of distinct operands',
            n1='n1 - the total number of operators',
            n2='n2 - the total number of operands',
            vocabulary='vocabulary (h) - h1 + h2',
            length='length (N) - n1 + n2',
            calculated_length='calculated_length - h1 * log2(h1) + h2 * log2(h2)',
            volume='volume (V) - N * log2(h)',
            difficulty='difficulty (D) - h1 / 2 * n2 / h2',
            effort='effort (E) - D * V',
            time='time (T) - E / 18 seconds',
            bugs='bugs (B) - V / 3000 - an estimate of the errors in the implementation',
        )

        params = dict(
            theme='henanigans',
            colors=tools.COLOR_SCALE,
            dimensions=(900, 900),
            asFigure=True,
        )

        html = '<body style="background: #242424">\n'

        raw = remove_test_modules(self.raw_metrics)
        mi = remove_test_modules(self.maintainability_index)
        cc = remove_test_modules(self.cyclomatic_complexity_metrics)
        hal = remove_test_modules(self.halstead_metrics)

        raw['docstring_ratio'] = raw.multiline_comment / raw.code
        raw = raw.sort_values('docstring_ratio')
        html += raw.iplot(
            x='fullpath',
            kind='barh',
            title='Line Count Metrics',
            **params
        ).to_html()

        html += mi.iplot(
            x='fullpath',
            kind='barh',
            title='Maintainability Metrics',
            **params
        ).to_html()

        # the histograms use a shorter canvas than the bar charts
        params['dimensions'] = (900, 500)

        cols = ['cyclomatic_complexity', 'cyclomatic_rank']
        html += cc[cols].iplot(
            kind='hist',
            bins=50,
            title='Cyclomatic Metric Distributions',
            **params
        ).to_html()

        cols = [
            'h1', 'h2', 'n1', 'n2', 'vocabulary', 'length', 'calculated_length',
            'volume', 'difficulty', 'effort', 'time', 'bugs'
        ]
        html += hal[cols]\
            .rename(mapper=lambda x: lut[x], axis=1)\
            .iplot(
                kind='hist',
                bins=50,
                title='Halstead Metric Distributions',
                **params)\
            .to_html()

        html += '\n</body>'

        # explicit encoding so output does not depend on the platform locale
        with open(fullpath, 'w', encoding='utf-8') as f:
            f.write(html)

        return self

    def write_tables(self, target_dir):
        # type: (Union[str, Path]) -> RadonETL
        '''
        Writes metrics tables as HTML files to given directory.

        Five files are written: all_metrics.html, raw_metrics.html,
        maintainability_metrics.html, cyclomatic_complexity_metrics.html and
        halstead_metrics.html. The directory itself is not created here.

        Args:
            target_dir (Path or str): Target directory.

        Returns:
            RadonETL: self.
        '''
        def write_table(data, target):
            # type: (DataFrame, Path) -> None
            html = data.to_html()

            # make table sortable
            # NOTE(review): the script is loaded over plain http; confirm
            # whether an https URL should be used instead.
            script = '<script '
            script += 'src="http://www.kryogenix.org/code/browser/sorttable/sorttable.js" '
            script += 'type="text/javascript"></script>\n'
            html = html.replace('class="dataframe"', 'class="sortable"')
            html = script + html

            # explicit encoding so output does not depend on the platform locale
            with open(target, 'w', encoding='utf-8') as f:
                f.write(html)

        data = self.data
        raw = self.raw_metrics
        mi = self.maintainability_index
        cc = self.cyclomatic_complexity_metrics
        hal = self.halstead_metrics

        write_table(data, Path(target_dir, 'all_metrics.html'))
        write_table(raw, Path(target_dir, 'raw_metrics.html'))
        write_table(mi, Path(target_dir, 'maintainability_metrics.html'))
        write_table(cc, Path(target_dir, 'cyclomatic_complexity_metrics.html'))
        write_table(hal, Path(target_dir, 'halstead_metrics.html'))

        return self