Coverage for /home/ubuntu/f8s/python/f8s/tools.py: 94%

36 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-02-28 23:26 +0000

1from typing import Any, Callable, Optional # noqa F401 

2 

3from pprint import pformat 

4import json 

5import traceback 

6 

7import flask 

8import flasgger 

9import flask_healthz 

10# ------------------------------------------------------------------------------ 

11 

12 

13''' 

14K8s ready Flask REST application tools. 

15''' 

16 

17 

# Module-level extension instances. They are created unbound here and are
# attached to a concrete Flask app inside get_app via their init_app methods.
swagger = flasgger.Swagger()
healthz = flask_healthz.Healthz()

20 

21 

def error_to_response(error):
    # type: (Exception) -> flask.Response
    '''
    Build a JSON Flask Response that describes the given exception.

    The JSON body carries these keys:

        * error - Class name of the exception.
        * args - Each element of error.args converted with str.
        * message - Multi-line text of the form "ClassName(\\n<args>\\n)".
          Arguments that expose an "items" attribute contribute one
          pformat-ed line per key/value pair; all others contribute str(arg).
        * code - Always 500.
        * traceback - Output of traceback.format_exc().

    Args:
        error (Exception): Error to be formatted.

    Returns:
        flask.Response: Response with status 500 and mimetype
            application/json.
    '''
    name = error.__class__.__name__

    lines = []  # type: list
    for arg in error.args:
        # anything without an items attribute is rendered as plain text
        if not hasattr(arg, 'items'):
            lines.append(str(arg))
            continue

        # mapping-like arguments are expanded into one line per pair
        lines.extend(
            pformat({key: pformat(val)}) for key, val in arg.items()
        )

    # NOTE(review): the indent prefix below is a single space as seen in this
    # view of the file; confirm it was not collapsed from a wider indent.
    body = '\n'.join(' ' + line for line in lines)
    message = f'{name}(\n{body}\n)'

    payload = dict(
        error=name,
        args=[str(arg) for arg in error.args],
        message=message,
        code=500,
        # format_exc reports the exception currently being handled, so this
        # is only informative when called from within an exception handler
        traceback=traceback.format_exc(),
    )
    return flask.Response(
        response=json.dumps(payload),
        mimetype='application/json',
        status=500,
    )

55 

56 

def get_app(extensions, live_probe=None, ready_probe=None, testing=False):
    # type: (list, Optional[Callable], Optional[Callable], bool) -> flask.Flask
    '''
    Build and configure a F8s Flask application.

    The app is named "F8s". The module-level swagger and healthz extensions
    are initialized against it first, followed by each given extension in
    order.

    Args:
        extensions (list): Objects exposing an init_app(app) method.
        live_probe (callable, optional): Stored under the "live" key of the
            app's HEALTHZ config. A function returning None is used when
            omitted. Default: None.
        ready_probe (callable, optional): Stored under the "ready" key of the
            app's HEALTHZ config. A function returning None is used when
            omitted. Default: None.
        testing (bool, optional): Value assigned to the app's TESTING config.
            Default: False.

    Returns:
        flask.Flask: Flask app.
    '''
    # a single do-nothing callable stands in for any probe not supplied
    fallback = lambda: None  # noqa: E731
    probes = dict(
        live=fallback if live_probe is None else live_probe,
        ready=fallback if ready_probe is None else ready_probe,
    )

    app = flask.Flask('F8s')
    app.config['TESTING'] = testing
    app.config['HEALTHZ'] = probes

    # built-in extensions are bound before the caller supplied ones
    swagger.init_app(app)
    healthz.init_app(app)
    for extension in extensions:
        extension.init_app(app)

    return app