Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" 

2@file 

3@brief Manages a sqlite3 database. 

4""" 

5import sqlite3 

6import datetime 

7import decimal 

8import numpy 

9 

10 

11class DBException(Exception): 

12 """ 

13 Exception raised by class @see cl Database. 

14 """ 

15 pass 

16 

17 

18class Database: 

19 """ 

20 Common functions about sqlite3. 

21 """ 

22 

23 _field_option = ["PRIMARYKEY", "AUTOINCREMENT", "AUTOFILL"] 

24 

25 def __init__(self, dbfile): 

26 """ 

27 @param dbfile filename or ``:memory:`` 

28 """ 

29 self._sql_file = dbfile 

30 self._connection = None 

31 

32 def get_file(self): 

33 """ 

34 Returns the file name. 

35 """ 

36 return self._sql_file 

37 

38 def _check_connection(self): 

39 """ 

40 Check the SQL connection. 

41 """ 

42 if self._connection is None: 

43 message = "Use connect method before doing operation on this database." 

44 raise Exception(message) 

45 

46 def _is_memory(self): 

47 """ 

48 Tells if the database takes place in memory (``:memory:``). 

49 """ 

50 return self._sql_file == ":memory:" 

51 

52 def connect(self): 

53 """ 

54 Opens a connection to the database. 

55 """ 

56 if self._is_memory(): 

57 if self._connection is None: 

58 self._connection = sqlite3.connect(self._sql_file) 

59 elif self._connection is not None: 

60 raise Exception("A previous connection was not closed.") 

61 else: 

62 self._connection = sqlite3.connect(self._sql_file) 

63 

64 def close(self): 

65 """ 

66 Close the database. 

67 """ 

68 self._check_connection() 

69 if self._is_memory(): 

70 # We should not close, otherwise, we lose the data. 

71 # self._connection = None 

72 pass 

73 else: 

74 self._connection.close() 

75 self._connection = None 

76 

77 def commit(self): 

78 """ 

79 Call this function after any insert request. 

80 """ 

81 self._check_connection() 

82 self._connection.commit() 

83 

84 def execute(self, request): 

85 """ 

86 Open a cursor with a query and return it to the user. 

87 

88 @param request SQL request 

89 @return cursor 

90 """ 

91 # classic ways 

92 self._check_connection() 

93 cur = self._connection.cursor() 

94 try: 

95 cur.execute(request) 

96 except Exception as e: 

97 raise DBException( 

98 "Unable to execute a SQL request (1) (file '%s')" % 

99 self.get_file(), e, request) from e 

100 return cur 

101 

102 def get_table_list(self): 

103 """ 

104 Returns the list of tables. 

105 

106 @return the table list 

107 """ 

108 self._check_connection() 

109 request = """ SELECT name 

110 FROM (SELECT * FROM sqlite_master UNION ALL SELECT * FROM sqlite_temp_master) AS temptbl 

111 WHERE type in('table','temp') AND name != 'sqlite_sequence' ORDER BY name;""" 

112 

113 select = self._connection.execute(request) 

114 res = [] 

115 for el in select: 

116 res.append(el[0]) 

117 return res 

118 

119 def create_table(self, table, columns, temporary=False): 

120 """ 

121 Creates a table. 

122 

123 @param table table name 

124 @param columns columns definition, dictionary ``{ key:(column_name,python_type) }`` 

125 if ``PRIMARYKEY`` is added, the key is considered as the primary key. 

126 Example:: 

127 

128 columns = { -1:("key", int, "PRIMARYKEY", "AUTOINCREMENT"), 

129 0:("name",str), 1:("number", float) } 

130 

131 @param temporary if True the table is temporary 

132 @return cursor 

133 """ 

134 if table == "sqlite_sequence": 

135 raise DBException("Unable to create a table named 'sql_sequence'.") 

136 

137 tables = self.get_table_list() 

138 if table in tables: 

139 raise DBException("Tables '{0}' is already present.".format(table)) 

140 

141 if isinstance(columns, list): 

142 columns_ = {} 

143 for i, v in enumerate(columns): 

144 columns_[i] = v 

145 columns = columns_ 

146 

147 if temporary: 

148 sql = "CREATE TEMPORARY TABLE " + table + "(" 

149 else: 

150 sql = "CREATE TABLE " + table + "(" 

151 col = [] 

152 for c, val in columns.items(): 

153 if isinstance(val[1], tuple): 

154 v = val[1][0] 

155 else: 

156 v = val[1] 

157 

158 if v is str: 

159 col.append(val[0] + " TEXT") 

160 elif v is int: 

161 col.append(val[0] + " INTEGER") 

162 elif v is float: 

163 col.append(val[0] + " FLOAT") 

164 elif v is numpy.int64: 

165 col.append(val[0] + " INTEGER") 

166 elif v is numpy.float64: 

167 col.append(val[0] + " FLOAT") 

168 elif v is decimal.Decimal: 

169 col.append(val[0] + " Decimal") 

170 elif v is datetime.datetime: 

171 col.append(val[0] + " DATETIME") 

172 else: 

173 raise DBException( 

174 "Unable to add column '{0}' ... {1} v={2}".format(c, val, v)) 

175 

176 if "PRIMARYKEY" in val: 

177 if val[1] != int: 

178 raise DBException( 

179 "unable to create a primary key on something differont from an integer (%s)" % 

180 str(val)) 

181 col[-1] += " PRIMARY KEY" 

182 if "AUTOINCREMENT" in val: 

183 if self.isMSSQL(): 

184 col[-1] += " IDENTITY(0,1)" 

185 else: 

186 col[-1] += " AUTOINCREMENT" 

187 

188 sql += ",\n ".join(col) 

189 sql += ");" 

190 return self.execute(sql) 

191 

192 def has_rows(self, table): 

193 """ 

194 Tells if a table has rows. 

195 

196 @param table table name 

197 @return boolean 

198 """ 

199 res = list(self.execute("SELECT * FROM {0} LIMIT 1".format(table))) 

200 return len(res) > 0