-
Notifications
You must be signed in to change notification settings - Fork 3
/
flask_webhook.py
62 lines (46 loc) · 1.85 KB
/
flask_webhook.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
"""Flask extension leveraging Blueprints for creating application webhooks.
https://github.com/Bogdan-Alexandrescu/Flask-Webhook
https://pypi.python.org/pypi/Flask-Webhook
"""
import sys
from flask import Blueprint, jsonify, make_response, request
__author__ = '@balex'
__license__ = 'MIT'
__version__ = '0.1.0'
class FlaskWebhookHandleException(BaseException): pass
class WebHook(object):
"""The JIRA hook blueprint and handler."""
def __init__(self, url_prefix, name=None, log=None, app=None):
"""Construct the JIRAHook Blueprint."""
self._app = app
self._log = log
self._name = name
self._blueprint = Blueprint(self._name, __name__, url_prefix=url_prefix)
self.handlers = dict()
if app is not None:
self.init_app(app)
def add_route(self, route, methods):
self._blueprint.add_url_rule(route, view_func=self._run_endpoint_handlers, methods=methods)
if self._app:
self.init_app(self._app)
@property
def hook(self):
return self._blueprint
def _run_endpoint_handlers(self):
"""Calls the event handlers handlers attached to the webhook object and returns a json response.
"""
for key, func in self.handlers.items():
try:
func(request)
except Exception:
if self._log:
self._log.error( "Caught exception while handling '%s': %s" % (key, sys.exc_info()) )
raise FlaskWebhookHandleException( "Caught exception while handling '%s': %s" % (key, sys.exc_info()) )
return_message = (dict(status='success', message="'%s' was triggered successfully" % self._name)
if self._name else dict(status='success'))
if self._log:
self._log.info(return_message)
return make_response(jsonify(return_message), 200)
def init_app(self, app):
"""Method used to register the webhooks on the flask app object"""
app.register_blueprint(self._blueprint)