Coverage for /home/ubuntu/f8s/python/f8s/tools.py: 94%
36 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-28 23:26 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-28 23:26 +0000
1from typing import Any, Callable, Optional # noqa F401
3from pprint import pformat
4import json
5import traceback
7import flask
8import flasgger
9import flask_healthz
10# ------------------------------------------------------------------------------
13'''
14K8s ready Flask REST application tools.
15'''
18swagger = flasgger.Swagger()
19healthz = flask_healthz.Healthz()
22def error_to_response(error):
23 # type: (Exception) -> flask.Response
24 '''
25 Convenience function for formatting a given exception as a Flask Response.
27 Args:
28 error (Exception): Error to be formatted.
30 Returns:
31 flask.Response: Flask response.
32 '''
33 args = [] # type: Any
34 for arg in error.args:
35 if hasattr(arg, 'items'):
36 for key, val in arg.items():
37 args.append(pformat({key: pformat(val)}))
38 else:
39 args.append(str(arg))
40 args = [' ' + x for x in args]
41 args = '\n'.join(args)
42 klass = error.__class__.__name__
43 msg = f'{klass}(\n{args}\n)'
44 return flask.Response(
45 response=json.dumps(dict(
46 error=error.__class__.__name__,
47 args=list(map(str, error.args)),
48 message=msg,
49 code=500,
50 traceback=traceback.format_exc(),
51 )),
52 mimetype='application/json',
53 status=500,
54 )
57def get_app(extensions, live_probe=None, ready_probe=None, testing=False):
58 # type: (list, Optional[Callable], Optional[Callable], bool) -> flask.Flask
59 '''
60 Creates a F8S app.
62 Returns:
63 flask.Flask: Flask app.
64 '''
65 noop = lambda: None
66 if live_probe is None:
67 live_probe = noop
69 if ready_probe is None:
70 ready_probe = noop
72 app = flask.Flask('F8s')
73 app.config['TESTING'] = testing
74 app.config['HEALTHZ'] = dict(live=live_probe, ready=ready_probe)
76 swagger.init_app(app)
77 healthz.init_app(app)
78 for ext in extensions:
79 ext.init_app(app)
81 return app