Coverage for src/pysqllike/generic/iter_rows.py: 82%
268 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-20 04:53 +0200
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-20 04:53 +0200
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief A class which iterates on any set.
5"""
7from .iter_exceptions import IterException, SchemaException
8from .column_type import ColumnType, ColumnTableType, ColumnGroupType
9from .others_types import NoSortClass, GroupByContainer, NA
12class IterRow:
14 """
15 Defines an iterator which mimics SQL behavior.
16 """
def __init__(self, schema=None, anyset=None, as_dict=True):
    """
    Builds the view: turns *schema* into column objects owned by this
    instance and keeps a reference to the rows.

    @param      schema      list of column definitions, each item is either
                            a @see cl ColumnType (copied with this instance as owner),
                            a column name (str), a tuple ``(name,)`` or a tuple
                            ``(name, type)``; the type is None when it is unknown
    @param      anyset      any set or iterator following the previous schema
                            (or None if there is not any)
    @param      as_dict     stored as is, it selects the kind of rows produced
                            when iterating on this instance

    *schema* can be None if *anyset* is a list of dictionaries
    ``[ {"col1":value1, ... } ]``. In that case, the schema is guessed from
    the first row ``anyset[0]``: one column per key, typed with the type of its value.

    Every column is also stored in the instance dictionary under its name,
    which is what allows the syntax ``tbl.age``. A column whose name is already
    an attribute of the instance is rejected.

    Raises ``ValueError`` if the schema must be guessed and *anyset* is empty or
    its first row is not a dictionary, and @see cl IterException if the schema
    is empty, badly defined, holds duplicated names or a name already used by
    an attribute.

    .. exref::
        :title: IterRow with a list of dictionaries

        ::

            l = [ {"nom": 10}, {"jean": 40} ]
            tbl = IterRow (None, l)

    .. exref::
        :title: IterRow with a schema

        ::

            l = [ ("nom", 10), ("jean", 40) ]
            schema = [ ("nom", str), ("age", int) ]
            tbl = IterRow (schema, l)
    """
    if schema is None:
        # no schema given: it is inferred from the first row
        if not len(anyset):
            raise ValueError("unable to guess a schema from an empty list")
        head = anyset[0]
        if not isinstance(head, dict):
            raise ValueError(
                "the first row must be a dictionary, otherwise, the schema cannot be guessed")
        schema = [(key, type(value)) for key, value in head.items()]

    if not len(schema):
        raise IterException("schema is empty")

    columns = []
    for item in schema:
        if isinstance(item, ColumnType):
            column = item.copy(new_owner=self)
        elif isinstance(item, str):
            column = ColumnTableType(item, None, owner=self)
        elif isinstance(item, tuple) and len(item) in (1, 2):
            # (name,) means the type is unknown
            coltype = item[1] if len(item) == 2 else None
            column = ColumnTableType(item[0], coltype, owner=self)
        else:
            raise IterException(
                "schema is not properly defined {0}".format(str(item)))
        columns.append(column)

    distinct = {column.Name for column in columns}
    if len(distinct) < len(columns):
        raise IterException(
            "some columns share the same name: " + str(columns))

    self._schema = columns
    self._thisset = anyset
    self._as_dict = as_dict

    # columns become attributes of the instance (tbl.<name>)
    attributes = vars(self)
    for column in self._schema:
        if column.Name in attributes:
            raise IterException(
                "a column has a wrong name: {0}".format(column))
        attributes[column.Name] = column
@property
def Schema(self):
    """
    Gives access to the list of columns stored in ``_schema``
    (the list itself, not a copy).
    """
    columns = self._schema
    return columns
102 def __str__(self):
103 """
104 usual
105 """
106 return ";".join([str(_) for _ in self._schema])
108 def __call__(self):
109 """
110 evaluate
111 """
112 return [_() for _ in self._schema]
114 def __iter__(self):
115 """
116 iterator, returns this row,
117 it always outputs a list of list
118 """
119 if self._thisset is None:
120 raise IterException("this class contains no iterator")
122 if self._as_dict:
123 for _ in self._thisset:
124 if isinstance(_, dict):
125 yield {k.Name: _[k.Name] for k in self._schema}
126 else:
127 yield {k.Name: v for k, v in zip(self._schema, _)}
128 else:
129 for _ in self._thisset:
130 if isinstance(_, dict):
131 yield tuple([_[k.Name] for k in self._schema])
132 else:
133 yield _
135 for _ in self._schema:
136 _.set_none()
def print_schema(self):
    """
    Describes the schema: a first line with the number of columns,
    followed by the result of @see me print_parent for each column,
    one per line.

    @return     string
    """
    header = "number of columns={0}".format(len(self._schema))
    described = [column.print_parent() for column in self._schema]
    return "\n".join([header] + described)
def select(self, *nochange, as_dict=True, **changed):
    """
    Builds a new view made of a selection of columns and of new columns
    computed from expressions. It takes an undefined number of arguments
    and can be used the following way:

    .. exref::
        :title: simple select

        ::

            tbl = IterRow( ... )
            it = tbl.select ( tbl.name, tbl.age * 2, old = tbl.age )

    .. exref::
        :title: chained select

        ::

            tbl = IterRow ( ... )
            iter = tbl.select(tbl.nom, age2=tbl.age, age3= tbl.age*0.5)
            iter2 = iter.select(iter.nom, age4=iter.age2*iter.age3)
            l = list ( iter2 )

    @param      nochange    list of fields to keep, they must be columns of this instance
    @param      changed     list of custom fields, ``name=expression``, the expression
                            is copied and the copy receives the name
    @param      as_dict     returns results as a list of dictionaries [ { "colname": value, ... } ]
    @return                 IterRow

    Raises @see cl IterException if an argument is not a @see cl ColumnType
    or if a positional column does not belong to this instance.

    @warning The function does not guarantee the order of the output columns.

    .. exref::
        :title: example with a function

        ::

            def myf(x,y) :
                return x*2.5 + y
            tbl = IterRow ( ... )
            iter = tbl.select(tbl.nom, age0= CFT(myf, tbl.age, tbl.age) )
            res = list(iter)
    """
    for kept in nochange:
        if not isinstance(kept, ColumnType):
            raise IterException(
                "expecting a ColumnType here not: {0}".format(
                    str(kept)))
        if kept._owner != self:
            raise IterException(
                "mismatch: all columns should belong to this view, check all columns come from this instance")

    # the owner of the copies is only known once the new view is created
    schema = [kept.copy(None) for kept in nochange]
    for name, expression in changed.items():
        if not isinstance(expression, ColumnType):
            raise IterException(
                "expecting a ColumnType here not: {0}-{1}".format(type(expression), str(expression)))
        renamed = expression.copy(None)
        renamed.set_name(name)
        schema.append(renamed)

    if not all(isinstance(column, ColumnType) for column in schema):
        raise TypeError("we expect a ColumnType for column")

    def computed_rows():
        for row in self._thisset:
            # loads the current row into the columns of this view...
            if isinstance(row, dict):
                updates = ((col, row[col.Name]) for col in self._schema)
            else:
                updates = zip(self._schema, row)
            for col, value in updates:
                col.set(value)

            # ...then evaluates the selected columns
            if as_dict:
                yield {column.Name: column() for column in schema}
            else:
                yield tuple(column() for column in schema)

    view = IterRow(schema, anyset=computed_rows(), as_dict=as_dict)
    for column in schema:
        column.set_owner(view)
    return view
def where(self, condition, as_dict=True, append_condition=False):
    """
    Filters the rows of an @see cl IterRow instance: only the rows
    for which *condition* evaluates to a true value are kept.

    @param      condition           a ColumnType or an expression of ColumnType
    @param      append_condition    append the condition to the schema (for debugging purpose),
                                    the condition itself is appended, not a copy
    @param      as_dict             returns results as a list of dictionaries [ { "colname": value, ... } ]
    @return                         IterRow

    Raises ``TypeError`` if *condition* is not a @see cl ColumnType.

    .. exref::
        :title: where

        ::

            tbl = IterRow ( ... )
            iter = tbl.where(tbl.age == 40)
            res = list(iter)

    @warning For operator ``or``, ``and``, ``not``, the syntax is different because they cannot be overriden in Python.

    .. exref::
        :title: where with or

        ::

            tbl = IterRow ( ... )
            iter = tbl.where( ( tbl.age == 2).Or( tbl.age == 40))
            iter2 = tbl.where((tbl.age == 10).Not())
    """
    if not isinstance(condition, ColumnType):
        raise TypeError(
            "condition should a ColumnType: {0}".format(
                str(condition)))

    # the owner of the copies is only known once the new view is created
    schema = [column.copy(None) for column in self._schema]
    if append_condition:
        schema.append(condition)

    def kept_rows():
        for row in self._thisset:
            # loads the current row into the columns of this view
            if isinstance(row, dict):
                updates = ((col, row[col.Name]) for col in self._schema)
            else:
                updates = zip(self._schema, row)
            for col, value in updates:
                col.set(value)

            if not condition():
                continue
            if as_dict:
                yield {column.Name: column() for column in schema}
            else:
                yield tuple(column() for column in schema)

    view = IterRow(schema, anyset=kept_rows(), as_dict=as_dict)
    for column in schema:
        column.set_owner(view)
    return view
def orderby(self, *nochange, as_dict=True, ascending=True):
    """
    This function sorts elements from an IterRow instance.
    All rows are read and kept in memory before the first one is returned.

    @param      nochange    list of columns used to sort
    @param      ascending   order
    @param      as_dict     returns results as a list of dictionaries [ { "colname": value, ... } ]
    @return                 IterRow

    Rows are compared on the sort key only (the values of the columns
    in *nochange*). Rows sharing the same key are never compared with each
    other, they keep the order they had in the original set.

    .. exref::
        :title: order by

        ::

            l = [   { "nom":"j", "age": 10, "gender":"M"} ,
                    {"nom":"jean", "age":40, "gender":"M"},
                    {"nom":"jeanne", "age":2, "gender":"F"} ]
            tbl = IterRow(None, l)

            iter = tbl.orderby(tbl.nom, tbl.age, ascending=False )
    """
    # we do not know the owner yet
    schema = [v.copy(None) for v in self._schema]

    def itervalues():
        # yields (sort key, output row) for every row of the underlying set
        colsi = None
        for row in self._thisset:
            if isinstance(row, dict):
                for col in self._schema:
                    col.set(row[col.Name])
                key = tuple(row[k.Name] for k in nochange)
            else:
                for col, r in zip(self._schema, row):
                    col.set(r)
                if colsi is None:
                    # positions of the sort columns, computed once
                    colsi = [
                        self._findschema(self._schema, k.Name)
                        for k in nochange]
                key = tuple(row[k] for k in colsi)

            if as_dict:
                yield key, {_.Name: _() for _ in schema}
            else:
                yield key, tuple([_() for _ in schema])

    def itervalues_sort():
        # Sorting the (key, row) pairs themselves would compare the rows
        # whenever two keys are equal, which raises TypeError for
        # dictionaries; the sort must only look at the key.
        ordered = sorted(itervalues(),
                         key=lambda pair: pair[0],
                         reverse=not ascending)
        for _, row in ordered:
            yield row

    tbl = IterRow(schema, anyset=itervalues_sort(), as_dict=as_dict)
    for c in schema:
        c.set_owner(tbl)
    return tbl
348 def _findschema(self, schema, name):
349 """
350 look for column index whose name is name
352 @param name column name to search
353 @param schema schama
354 @return position
355 """
356 for i, col in enumerate(schema):
357 if col.Name == name:
358 return i
359 raise IndexError()
def groupby(self, *nochange, as_dict=True, **changed):
    """
    This function applies a groupby (same behavior as SQL's version).
    All rows are read and sorted by key before the first group is returned.

    @param      nochange    list of fields to keep, they define the key of the groups
                            and must be columns of this instance
    @param      changed     list of custom fields, ``name=expression``, the expression
                            receives the name (it is renamed in place, it is not copied)
    @param      as_dict     returns results as a list of dictionaries [ { "colname": value, ... } ]
    @return                 IterRow

    Raises @see cl IterException if an argument is not a @see cl ColumnType
    or if a positional column does not belong to this instance.
    Aggregating rows which are not dictionaries is not implemented
    and raises ``NotImplementedError`` while iterating.

    @warning The function does not guarantee the order of the output columns.

    .. exref::
        :title: group by

        ::

            l = [   { "nom":"j", "age": 10, "gender":"M"} ,
                    {"nom":"jean", "age":40, "gender":"M"},
                    {"nom":"jeanne", "age":2, "gender":"F"} ]
            tbl = IterRow (None, l)

            iter = tbl.groupby(tbl.gender, len_nom=tbl.nom.len(), avg_age=tbl.age.avg())
    """
    for kept in nochange:
        if not isinstance(kept, ColumnType):
            raise IterException(
                "expecting a ColumnType here not: {0}".format(
                    str(kept)))
        if kept._owner != self:
            raise IterException(
                "mismatch: all columns should belong to this view, check all columns come from this instance")

    # the owner of the copies is only known once the new view is created
    schema = [kept.copy(None) for kept in nochange]
    for name, expression in changed.items():
        if not isinstance(expression, ColumnType):
            raise IterException(
                "expecting a ColumnType here not: {0}-{1}".format(type(expression), str(expression)))
        expression.set_name(name)
        schema.append(expression)

    if not all(isinstance(column, ColumnType) for column in schema):
        raise TypeError("we expect a ColumnType for column")

    def collapse(group_rows):
        # merges all the rows of one group into a single output row
        rows = list(group_rows)
        first = rows[0]
        if not isinstance(first, dict):
            raise NotImplementedError()

        merged = {}
        for name in first:
            column = schema[self._findschema(schema, name)]
            if isinstance(column, ColumnGroupType):
                # aggregated column: it receives every value of the group
                column.set(GroupByContainer(r[name] for r in rows))
                merged[name] = column()
            else:
                # other columns take the value found in the first row
                value = first[name]
                column.set(value)
                merged[name] = value
        return merged

    def keyed_rows():
        # yields (group key, wrapped output row) for every row of the set
        positions = None
        for row in self._thisset:
            if isinstance(row, dict):
                for col in self._schema:
                    col.set(row[col.Name])
                key = tuple(row[k.Name] for k in nochange)
            else:
                for col, value in zip(self._schema, row):
                    col.set(value)
                if positions is None:
                    # positions of the key columns, computed once
                    positions = [
                        self._findschema(self._schema, k.Name)
                        for k in nochange]
                key = tuple(row[p] for p in positions)

            if as_dict:
                yield key, NoSortClass({c.Name: c() for c in schema})
            else:
                yield key, NoSortClass(tuple(c() for c in schema))

    def grouped_rows():
        # rows are sorted by key, consecutive rows sharing a key form a group
        bucket = []
        keycur = None
        for key, wrapped in sorted(keyed_rows()):
            if key != keycur:
                if bucket:
                    yield collapse(bucket)
                bucket = [wrapped.value]
                keycur = key
            else:
                bucket.append(wrapped.value)
        if bucket:
            yield collapse(bucket)

    view = IterRow(schema, anyset=grouped_rows(), as_dict=as_dict)
    for column in schema:
        column.set_owner(view)
    return view
def unionall(self, iter, merge_schema=False, as_dict=True):
    """
    Concatenates this table with another one: the rows of this table
    come first, followed by the rows of *iter*.

    @param      iter            IterRow
    @param      merge_schema    if False, the function expects you find the same schema
                                (same number of columns, same column names),
                                otherwise, it merges them (same column name are not duplicated)
    @param      as_dict         returns results as a list of dictionaries [ { "colname": value, ... } ]
    @return                     IterRow

    Raises @see cl SchemaException if *merge_schema* is False and the two
    schemas do not have the same length or the same column names.

    When schemas are merged, a column which only exists in one table
    is set to ``NA()`` while the rows of the other table are processed.

    .. exref::
        :title: union all

        ::

            l = [   { "nom":"j", "age": 10, "gender":"M"} ,
                    {"nom":"jean", "age":40, "gender":"M"},
                    {"nom":"jeanne", "age":2, "gender":"F"} ]
            tbl = IterRow (None, l)

            iter = tbl.unionall(tbl)

    .. exref::
        :title: union all with different schema

        ::

            l = [   { "nom":"j", "age": 10, "gender":"M"} ,
                    {"nom":"jean", "age":40, "gender":"M"},
                    {"nom":"jeanne", "age":2, "gender":"F"} ]
            tbl = IterRow (None, l)

            l = [   { "nom":"j", "newage": 10, "gender":"M"} ,
                    {"nom":"jean", "newage":40, "gender":"M"},
                    {"nom":"jeanne", "newage":2, "gender":"F"} ]
            tbl2 = IterRow (None, l)

            iter = tbl.unionall(tbl2, merge_schema = True)
    """
    if merge_schema:
        names = set(a.Name for a in self._schema)
        name2 = set(a.Name for a in iter._schema)
        common = names & name2

        # common columns first (copies of this table's columns), then the
        # columns specific to this table, then the ones specific to *iter*;
        # we do not know the owner yet
        schema = []
        for c in common:
            i = self._findschema(self._schema, c)
            schema.append(self._schema[i].copy(None))
        schema.extend(col.copy(None)
                      for col in self._schema if col.Name not in common)
        schema.extend(col.copy(None)
                      for col in iter._schema if col.Name not in common)

        not_in_self = set(
            c.Name for c in iter._schema if c.Name not in common)
        not_in_iter = set(
            c.Name for c in self._schema if c.Name not in common)
    else:
        # both schemas must be compared with each other
        # (comparing a schema with itself never detects anything)
        if len(self._schema) != len(iter._schema):
            raise SchemaException(
                "cannot concatenate, different schema length")
        names = sorted(a.Name for a in self._schema)
        name2 = sorted(a.Name for a in iter._schema)
        for a, b in zip(names, name2):
            if a != b:
                raise SchemaException(
                    "cannot concatenate, different schema column: {0} != {1}".format(
                        a,
                        b))

        # we do not know the owner yet
        schema = [v.copy(None) for v in self._schema]

        not_in_self = set()
        not_in_iter = set()

    # names are converted into positions in the schema which holds them
    not_in_self = [iter._findschema(iter._schema, c) for c in not_in_self]
    not_in_iter = [self._findschema(self._schema, c) for c in not_in_iter]

    def load(columns, row):
        # stores the values of one row into the given columns
        if isinstance(row, dict):
            for col in columns:
                col.set(row[col.Name])
        else:
            for col, r in zip(columns, row):
                col.set(r)

    def evaluate():
        # evaluates the output columns for the row currently loaded
        if as_dict:
            return {_.Name: _() for _ in schema}
        return tuple([_() for _ in schema])

    def iter_union():
        # rows of this table: columns which only exist in *iter* are missing
        for i in not_in_self:
            iter._schema[i].set(NA())
        for row in self._thisset:
            load(self._schema, row)
            yield evaluate()

        # rows of *iter*: columns which only exist in this table are missing
        # NOTE(review): these rows are loaded into iter._schema whereas the
        # output columns shared by both tables are copies of self._schema
        # columns; confirm the copies see the values of *iter*.
        for i in not_in_iter:
            self._schema[i].set(NA())
        for row in iter._thisset:
            load(iter._schema, row)
            yield evaluate()

    tbl = IterRow(schema, anyset=iter_union(), as_dict=as_dict)
    for c in schema:
        c.set_owner(tbl)
    return tbl