Coverage for pyquickhelper/server/filestore_sqlite.py: 96%

163 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-03 02:21 +0200

1# -*- coding:utf-8 -*- 

2""" 

3@file 

4@brief Simple class to store and retrieve files with a sqlite3 database. 

5""" 

6import os 

7import io 

8import sqlite3 

9import json 

10from datetime import datetime 

11import pandas 

12 

13 

class SqlLite3FileStore:
    """
    Simple file storage implemented with :epkg:`python:sqlite3`.

    Two tables are maintained: *files* (one row per submitted file,
    content included) and *data* (numerical values attached to a file
    through ``idfile``). Every method opens its own connection and
    closes it before returning, or when the returned generator ends.

    :param path: location of the database.
    """

    @staticmethod
    def v2s(value, s="'"):
        """
        Converts a value into a SQL literal by surrounding strings
        with *s*. No escaping is done, the class itself now relies
        on bound parameters and only keeps this helper for
        backward compatibility.

        :param value: value to convert
        :param s: quote character put around strings
        :return: string
        """
        if isinstance(value, str):
            return f"{s}{value}{s}"
        return str(value)

    @staticmethod
    def _sql_value(value):
        """
        Converts a python value into something :epkg:`python:sqlite3`
        can bind: dates become ISO strings, dictionaries become JSON,
        scalar wrappers exposing ``item()`` (numpy scalars for example)
        become plain python scalars.
        """
        if isinstance(value, datetime):
            return value.isoformat()
        if isinstance(value, dict):
            return json.dumps(value)
        if (not isinstance(value, (str, bytes, int, float)) and
                callable(getattr(value, "item", None))):
            return value.item()
        return value

    @staticmethod
    def _date2str(date):
        """
        Returns *date* as an ISO string, now if *date* is None.
        Strings are returned unchanged.
        """
        if date is None:
            date = datetime.now()
        if isinstance(date, str):
            return date
        return date.isoformat()

    @staticmethod
    def _conditions(record, prefix=None):
        """
        Builds the pieces of a WHERE clause from a dictionary
        ``{column: value}``, None values are skipped.

        :param record: dictionary column, expected value
        :param prefix: optional function returning the table prefix
            (including the dot) for a column
        :return: list of conditions with placeholders, list of values
        """
        cond, params = [], []
        for k, v in record.items():
            if v is None:
                continue
            col = k if prefix is None else prefix(k) + k
            cond.append(f"{col}=?")
            params.append(SqlLite3FileStore._sql_value(v))
        return cond, params

    def __init__(self, path="_file_store_.db3"):
        self.path_ = path
        self._create()

    def _get_column_table(self, table, con=None):
        """
        Returns the rows of ``PRAGMA table_info`` for a table.

        :param table: table name (trusted, it is inserted as is
            because a PRAGMA argument cannot be bound)
        :param con: existing connection, a temporary one is opened
            and closed if None
        :return: list of tuples, the column name is at index 1
        """
        close = con is None
        if close:
            con = self._get_connexion()
        try:
            cur = con.cursor()
            cur.execute(f"PRAGMA table_info({table});")
            return cur.fetchall()
        finally:
            if close:
                con.close()

    def _check_same_column(self, table, columns, con=None):
        """
        Tells if a table has exactly the expected columns,
        in the same order.
        """
        cols = self._get_column_table(table, con=con)
        return [c[1] for c in cols] == columns

    def _get_connexion(self):
        "Opens a new connection to the database."
        return sqlite3.connect(self.path_)

    def _create(self):
        """
        Creates the database if it does not exist.
        Table *data* is dropped and created again (its rows are lost)
        if its columns do not match the expected schema.
        """
        list_tables = "SELECT name FROM sqlite_master WHERE type='table';"
        con = self._get_connexion()
        try:
            cur = con.cursor()
            cur.execute(list_tables)
            tables = cur.fetchall()
            commit = False
            if ('files',) not in tables:
                cur.execute(
                    '''CREATE TABLE files
                       (id INTEGER PRIMARY KEY, date TEXT, name TEXT,
                        format TEXT, metadata TEXT, team TEXT,
                        project TEXT, version INT, content BLOB)''')
                commit = True

            expected = ["id", "idfile", "name", "value", "date", "comment"]
            if (('data',) in tables and
                    not self._check_same_column("data", expected, con=con)):
                # the schema is outdated, the table is rebuilt from scratch
                cur.execute("DROP TABLE data;")
                con.commit()
                cur.execute(list_tables)
                tables = cur.fetchall()

            if ('data',) not in tables:
                cur.execute(
                    '''CREATE TABLE data
                       (id INTEGER PRIMARY KEY, idfile INTEGER,
                        name TEXT, value REAL, date TEXT, comment TEXT)''')
                commit = True
            if commit:
                con.commit()
        finally:
            con.close()

    def submit(self, name, content, format=None, date=None, metadata=None,
               team=None, project=None, version=None):
        """
        Submits a file to the database.

        :param name: filename
        :param content: file content (it can be a dataframe,
            it is then stored as CSV without the index)
        :param format: format, ``'df'`` by default for a dataframe,
            the extension of *name* otherwise
        :param date: date (datetime or ISO string), by default now
        :param metadata: addition information, None or a dictionary
            stored as JSON
        :param team: another name
        :param project: another name
        :param version: version
        :return: added data as a dictionary (no content)
        :raises TypeError: if *metadata* is neither None nor a dictionary
        """
        date = self._date2str(date)
        if isinstance(metadata, dict):
            metadata = json.dumps(metadata)
        elif metadata is not None:
            raise TypeError(
                "metadata must be None or a dictionary.")
        if isinstance(content, pandas.DataFrame):
            st = io.StringIO()
            content.to_csv(st, index=False, encoding="utf-8")
            content = st.getvalue()
            if format is None:
                format = "df"
        if format is None:
            format = os.path.splitext(name)[-1]
        record = dict(name=name, content=content, format=format,
                      metadata=metadata, team=team, project=project,
                      version=version, date=date)
        # Column names come from the fixed keys above, values are bound:
        # quotes and backslashes are stored unchanged.
        fields = [k for k, v in record.items() if v is not None]
        values = [self._sql_value(record[k]) for k in fields]
        query = "INSERT INTO files (%s) VALUES (%s)" % (
            ",".join(fields), ",".join("?" * len(fields)))
        con = self._get_connexion()
        try:
            con.cursor().execute(query, values)
            con.commit()
        finally:
            con.close()
        output = dict(name=name, format=format,
                      metadata=metadata, team=team, project=project,
                      version=version, date=date)
        return {k: v for k, v in output.items() if v is not None}

    def submit_data(self, idfile, name, value, date=None, comment=None):
        """
        Submits data to the database.

        :param idfile: refers to database files
        :param name: name
        :param value: data value
        :param date: date (datetime or ISO string), by default now
        :param comment: additional comment, stored as an empty
            string if None

        The method does not return anything.
        """
        date = self._date2str(date)
        if comment is None:
            comment = ""

        values = [self._sql_value(v)
                  for v in (idfile, date, name, value, comment)]
        query = ("INSERT INTO data (idfile,date,name,value,comment) "
                 "VALUES (?,?,?,?,?)")
        con = self._get_connexion()
        try:
            con.cursor().execute(query, values)
            con.commit()
        finally:
            con.close()

    def _enumerate(self, condition, fields, params=()):
        """
        Enumerates the rows of table *files* as dictionaries.

        :param condition: list of SQL conditions joined with ``AND``
        :param fields: columns to retrieve
        :param params: values bound to the placeholders ``?``
            found in *condition*
        :return: iterator on dictionaries, content is converted into
            a dataframe if the format is ``'df'``, metadata is parsed
            from JSON when not empty
        """
        query = "SELECT %s FROM files" % ",".join(fields)
        if condition:
            query += " WHERE " + " AND ".join(condition)
        con = self._get_connexion()
        try:
            cur = con.cursor()
            for line in cur.execute(query, tuple(params)):
                row = dict(zip(fields, line))
                if ('format' in row and 'content' in row and
                        row['format'] == 'df'):
                    text = row['content']
                    if isinstance(text, bytes):
                        text = text.decode("utf-8")
                    st = io.StringIO(text)
                    row['content'] = pandas.read_csv(st, encoding="utf-8")
                if 'metadata' in row and row['metadata']:
                    row['metadata'] = json.loads(row['metadata'])
                yield row
        finally:
            # also reached when the caller stops iterating early
            con.close()

    def enumerate_content(self, name=None, format=None, date=None,
                          metadata=None, team=None, project=None,
                          version=None):
        """
        Queries the database, enumerates the results,
        returns the content as well.
        Every parameter left to None is ignored, the others
        must be equal to the stored value.

        :param name: filename
        :param format: format
        :param date: date
        :param metadata: addition information
        :param team: another name
        :param project: another name
        :param version: version
        :return: results
        """
        record = dict(name=name, format=format,
                      metadata=metadata, team=team, project=project,
                      version=version, date=date)
        cond, params = self._conditions(record)
        fields = ["id", "name", "format", "date", "metadata",
                  "team", "project", "version", "content"]
        yield from self._enumerate(cond, fields, params)

    def enumerate(self, name=None, format=None, date=None, metadata=None,
                  team=None, project=None, version=None):
        """
        Queries the database, enumerates the results
        (without the content).
        Every parameter left to None is ignored, the others
        must be equal to the stored value.

        :param name: filename
        :param format: format
        :param date: date
        :param metadata: addition information
        :param team: another name
        :param project: another name
        :param version: version
        :return: results
        """
        record = dict(name=name, format=format,
                      metadata=metadata, team=team, project=project,
                      version=version, date=date)
        cond, params = self._conditions(record)
        fields = ["id", "name", "format", "date", "metadata",
                  "team", "project", "version"]
        yield from self._enumerate(cond, fields, params)

    def enumerate_data(self, idfile=None, name=None, join=False,
                       project=None):
        """
        Queries the database, enumerates the results.

        :param idfile: file identifier
        :param name: value name, None if not specified,
            if *join* is True, the filter applies on the name
            stored in table *files*
        :param join: join with the table *files*
        :param project: filter by project, it requires *join*
        :return: results, columns equal to None are removed,
            with *join*, the file name is returned as ``name_f``
        :raises RuntimeError: if *project* is specified and
            *join* is False (raised when the iteration starts)
        """
        fields2 = ['name', 'project', 'team', 'version']

        if project is not None and not join:
            # column project only exists in table files
            raise RuntimeError(
                "join must be true if metrics are "
                "filtered by project.")

        record = dict(name=name, idfile=idfile, project=project)
        if join:
            cond, params = self._conditions(
                record, prefix=lambda k: "B." if k in fields2 else "data.")
            fields = ["data.id", "idfile", "data.name", "data.date",
                      "data.value", "data.comment"]
            query = '''
                    SELECT %s, %s
                    FROM data INNER JOIN files AS B on B.id = idfile
                    ''' % (
                ",".join(fields),
                ",".join("B.%s" % s for s in fields2))
            keys = ([s.replace('data.', '') for s in fields] +
                    ['name_f', 'project', 'team', 'version'])
        else:
            cond, params = self._conditions(record)
            keys = ["id", "idfile", "name", "date", "value", "comment"]
            query = "SELECT %s FROM data" % ",".join(keys)
        if cond:
            query += " WHERE " + " AND ".join(cond)

        con = self._get_connexion()
        try:
            cur = con.cursor()
            for line in cur.execute(query, params):
                yield {k: v for k, v in zip(keys, line) if v is not None}
        finally:
            # also reached when the caller stops iterating early
            con.close()