-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathModel.py
231 lines (180 loc) · 5.21 KB
/
Model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# -*- coding: utf8 -*-
"""
This module provides the needed tools to execute the basic CRUD operations
on a sqlite database (Insert, update, delete, load all elements, load
elements from a pk, load rows from a given condition...).
"""
import config
import sqlite3
class Model(object):
"""
This is the base class.
If a model class has to use it, it must extends it.
From this point, the table associated to the class will be the module name,
the table's primary key will be id_<module-name>
"""
_db = None
#public:
@staticmethod
def fetchAllRows(query, params={}):
"""
c.fetchAllRows(query, params) -> list()
Returns all the rows of the given query with the given parameters.
@param query string Sql query to execute
@param params dict the query's parameters
@return list the result of the query, will be a list of dict, and
an empty list if there's no result.
"""
Model._connect()
c = Model._db.cursor()
result = []
currentRow = {}
nbCols = 0
c.execute(query, list(params))
#~ Get the columns names
column_names = [d[0] for d in c.description]
result = c.fetchall()
resultList = list()
for r in result:
resultList.append(Model._createRow(r, column_names))
return resultList
@staticmethod
def fetchOneRow(query, params={}):
"""
c.fetchOneRow(query, params) -> dict()
Returns the first row of the given query with the given parameters.
@param query string Sql query to execute
@param params dict the query's parameters
@return dict the result of the query, will be a dict, and
an empty dict if there's no result.
"""
Model._connect()
c = Model._db.cursor()
result = dict()
nbCols = 0
c.execute(query, params)
r = c.fetchone()
#~ Get the colums names
column_names = [d[0] for d in c.description]
if r is not None:
result = Model._createRow(r, column_names)
return result
@classmethod
def save(cls, fields, where=list()):
"""
c.save(fields, where)
Save a row in the database.
If the where parameter is not null, an update will be executed, else
the row will be inserted.
@param fields dict of fields with as keys the table's fields and as
values, the row's values.
@param where tuple with as first element the SQL where and as second
element the where's params
"""
#~ insert
if len(where) == 0:
cls.insert(fields)
else: # update
cls.update(fields, where)
@classmethod
def insert(cls, fields):
"""
Insert a new row in the database
"""
Model._connect()
c = Model._db.cursor()
fields = cls.filterFields(fields)
fieldsNames = list(map(lambda x: '"' + x + '"', fields.keys()))
values = ['?'] * len(fieldsNames)
query = "INSERT INTO %s (%s) VALUES (%s)" % (
cls.__module__, ','.join(fieldsNames), ','.join(values)
)
c.execute(query, list(fields.values()))
Model._db.commit()
return c.lastrowid
@classmethod
def update(cls, fields, where):
Model._connect()
c = Model._db.cursor()
fields = cls.filterFields(fields)
fieldsNames = map(lambda x: '"' + x + '" = ?', fields.keys())
query = "UPDATE %(table)s SET %(values)s WHERE %(where)s" %\
{'table': cls.__module__, 'values': ','.join(fieldsNames), 'where': where[0]}
c.execute(query, list(fields.values()) + where[1])
Model._db.commit()
@classmethod
def delete(cls, where):
Model._connect()
c = Model._db.cursor()
query = "DELETE FROM %(table)s WHERE %(where)s" %\
{'table': cls.__module__, 'where': where[0]}
r = c.execute(query, where[1])
Model._db.commit()
return r
#protected:
@staticmethod
def _connect():
if Model._db is None:
Model._db = sqlite3.connect(config.db)
return True
@staticmethod
def _createRow(sqliteRow, columns):
row = {}
for i, v in enumerate(sqliteRow):
row[columns[i]] = v
return row
@classmethod
def loadAll(cls, fields=None):
fields = cls.prepareFieldsForSelect(fields)
query = "\
SELECT\
%(fields)s\
FROM\
%(table)s\
" % {'fields': fields, 'table': cls.__module__}
return Model.fetchAllRows(query, {})
@classmethod
def loadById(cls, id, fields=None):
fields = cls.prepareFieldsForSelect(fields)
table = cls.__module__
query = "\
SELECT\
%(fields)s\
FROM\
%(table)s\
WHERE\
%(where)s\
" % {'fields': fields, 'table': table, 'where': 'id_' + table + ' = ?'}
return Model.fetchOneRow(query, [id])
@classmethod
def loadBy(cls, filters, fields=None):
fields = cls.prepareFieldsForSelect(fields)
filters = cls.filterFields(filters)
filtersNames = map(lambda x: '"' + x + '" = ?', filters.keys())
query = "\
SELECT\
%(fields)s\
FROM\
%(table)s\
WHERE\
%(where)s\
" % {
'fields': fields,
'table': cls.__module__,
'where': ' AND '.join(filtersNames)
}
return Model.fetchAllRows(query, filters.values())
@classmethod
def prepareFieldsForSelect(cls, fields=None):
if not fields:
fields = cls.fields
if isinstance(fields, list):
fields = ', '.join(fields)
elif isinstance(fields, dict):
fields = ', '.join(map(lambda x: fields[x] + ' AS ' + x, fields))
elif not isinstance(fields, basestring):
raise TypeError('Unexpected type of fields (%s)' % type(fields))
return fields
@classmethod
def filterFields(cls, fields):
return dict((k, fields[k]) for k in cls.fields if k in fields)