forked from Azure-Samples/azure-sql-db-python-rest-api
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
129 lines (107 loc) · 4.11 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import sys
import os
import json
import pyodbc
import socket
from flask import Flask
from flask_restful import reqparse, abort, Api, Resource
from threading import Lock
from tenacity import *
from opencensus.ext.azure.trace_exporter import AzureExporter
from opencensus.ext.flask.flask_middleware import FlaskMiddleware
from opencensus.trace.samplers import ProbabilitySampler
import logging
# Initialize Flask
app = Flask(__name__)

# Setup Azure Monitor.
# Request tracing is wired in only when an Application Insights
# instrumentation key is present in the environment; otherwise the app
# runs without the middleware.
if 'APPINSIGHTS_KEY' in os.environ:
    middleware = FlaskMiddleware(
        app,
        exporter=AzureExporter(
            connection_string="InstrumentationKey={0}".format(os.environ['APPINSIGHTS_KEY'])
        ),
        # rate=1.0 is passed to the sampler unchanged from the original setup.
        sampler=ProbabilitySampler(rate=1.0),
    )

# Setup Flask Restful framework
api = Api(app)

# Shared request parser: the only declared argument is 'customer'.
parser = reqparse.RequestParser()
parser.add_argument('customer')
# Implement singleton to avoid global objects
class ConnectionManager(object):
    """Singleton that owns one lazily created pyodbc connection.

    Every call to ``ConnectionManager()`` returns the same instance. The
    connection is opened on first use from the ``SQLAZURECONNSTR_WWIF``
    environment variable, with ``;APP=<hostname>`` appended, and is reused
    by later calls until it is discarded after a communication link failure.
    """

    __instance = None
    __connection = None
    __lock = Lock()

    def __new__(cls):
        # Serialize first-time creation so concurrent callers share one instance.
        with ConnectionManager.__lock:
            if ConnectionManager.__instance is None:
                ConnectionManager.__instance = object.__new__(cls)
        return ConnectionManager.__instance

    def __getConnection(self):
        """Return the cached connection, opening it first if there is none."""
        # Serialize lazy creation so two callers cannot both open a connection.
        with ConnectionManager.__lock:
            if self.__connection is None:
                application_name = ";APP={0}".format(socket.gethostname())
                self.__connection = pyodbc.connect(os.environ['SQLAZURECONNSTR_WWIF'] + application_name)
            return self.__connection

    def __removeConnection(self):
        """Forget the cached connection so the next call opens a new one."""
        self.__connection = None

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(10), retry=retry_if_exception_type(pyodbc.OperationalError), after=after_log(app.logger, logging.DEBUG))
    def executeQueryJSON(self, procedure, payload=None):
        """Execute a stored procedure and return its JSON result.

        Args:
            procedure: Name of the stored procedure, interpolated into
                ``EXEC <procedure>``.
            payload: Optional object; when truthy it is serialized with
                ``json.dumps`` and passed as the single bound parameter.

        Returns:
            The first column of the first result row parsed with
            ``json.loads``, or ``{}`` when the procedure returns no row.

        Raises:
            pyodbc.OperationalError: re-raised on SQLSTATE ``08S01`` after the
                cached connection is dropped, which lets the ``retry``
                decorator (3 attempts, ``wait_fixed(10)``) run the call again.
        """
        result = {}
        # Pre-bind so the ``finally`` clause is safe when obtaining the
        # connection or the cursor fails before ``cursor`` is assigned.
        cursor = None
        try:
            conn = self.__getConnection()
            cursor = conn.cursor()

            if payload:
                cursor.execute(f"EXEC {procedure} ?", json.dumps(payload))
            else:
                cursor.execute(f"EXEC {procedure}")

            row = cursor.fetchone()
            result = json.loads(row[0]) if row else {}

            cursor.commit()
        except pyodbc.OperationalError as e:
            # Guard the indexing: an error carrying fewer than two args must
            # not raise IndexError from inside the handler.
            detail = e.args[1] if len(e.args) > 1 else e
            app.logger.error(f"{detail}")
            if e.args and e.args[0] == "08S01":
                # If there is a "Communication Link Failure" error,
                # then connection must be removed
                # as it will be in an invalid state
                self.__removeConnection()
                # NOTE(review): the original indentation was lost; the re-raise
                # is kept inside this branch, so other operational errors are
                # logged and fall through to ``return result`` -- confirm.
                raise
        finally:
            if cursor is not None:
                cursor.close()

        return result
class Queryable(Resource):
    """Base resource that forwards a verb to a ``web.<verb>_<entity>`` procedure."""

    def executeQueryJson(self, verb, payload=None):
        """Run the stored procedure for ``verb`` on this resource.

        The entity part of the procedure name is the lower-cased name of the
        concrete class, so a subclass named ``Customer`` called with verb
        ``"get"`` executes ``web.get_customer``.

        Args:
            verb: Prefix of the procedure name (e.g. ``"get"``).
            payload: Optional object handed through to the connection manager.

        Returns:
            Whatever ``ConnectionManager().executeQueryJSON`` returns.
        """
        entity_name = self.__class__.__name__.lower()
        return ConnectionManager().executeQueryJSON(f"web.{verb}_{entity_name}", payload)
# Customer Class
class Customer(Queryable):
    """REST resource for a single customer.

    Each handler builds a payload dict and passes it to the matching
    ``web.<verb>_customer`` procedure through ``executeQueryJson``.
    """

    @staticmethod
    def _load_customer_payload():
        """Parse the ``customer`` request argument into a dict.

        Aborts the request with HTTP 400 when the argument is missing, is not
        valid JSON, or does not decode to a JSON object, instead of letting
        ``json.loads`` or a later item assignment fail with an unhandled error.
        """
        args = parser.parse_args()
        raw = args['customer']
        if raw is None:
            abort(400, message="Missing 'customer' argument")
        try:
            customer = json.loads(raw)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError subclass.
            abort(400, message="'customer' must be valid JSON")
        if not isinstance(customer, dict):
            # patch() assigns customer["CustomerID"], which requires a dict.
            abort(400, message="'customer' must be a JSON object")
        return customer

    def get(self, customer_id):
        """Return the result of the ``get`` procedure for ``customer_id`` with 200."""
        customer = {"CustomerID": customer_id}
        result = self.executeQueryJson("get", customer)
        return result, 200

    def put(self):
        """Pass the ``customer`` payload to the ``put`` procedure; respond 201."""
        customer = self._load_customer_payload()
        result = self.executeQueryJson("put", customer)
        return result, 201

    def patch(self, customer_id):
        """Pass the payload, tagged with ``customer_id``, to ``patch``; respond 202."""
        customer = self._load_customer_payload()
        # The id from the URL overrides any CustomerID present in the payload.
        customer["CustomerID"] = customer_id
        result = self.executeQueryJson("patch", customer)
        return result, 202

    def delete(self, customer_id):
        """Pass ``customer_id`` to the ``delete`` procedure; respond 202."""
        customer = {"CustomerID": customer_id}
        result = self.executeQueryJson("delete", customer)
        return result, 202
# Customers Class
class Customers(Queryable):
    """REST resource for the customer collection."""

    def get(self):
        """Run the ``get`` procedure for this resource without a payload; respond 200."""
        return self.executeQueryJson("get"), 200
# Create API routes
# Customer is registered under both '/customer' and '/customer/<customer_id>';
# Customers is registered under '/customers'.
api.add_resource(Customer, '/customer', '/customer/<customer_id>')
api.add_resource(Customers, '/customers')