Source code for pyensae.sql.file_text_binary_columns

# coding: latin-1
"""
Contains a class which iterates over the rows of a text file structured as a table.



:githublink:`%|py|8`
"""


import re
import os
import decimal

from pyquickhelper.loghelper import noLOG
from pyquickhelper.loghelper.flog import GetPath
from .file_text_binary import TextFile


class TextFileColumns(TextFile):
    """
    This class opens a text file as if it were a binary file.
    It can deal with null characters. The file is interpreted as
    a TSV file or a file containing columns. The separator is found
    automatically. The columns are assumed to be in the first line
    but it is not mandatory. It walks along a file through an iterator,
    every line is automatically converted into a dictionary
    ``{ column : value }``. If the class was able to guess the type of
    a column, the conversion automatically takes place.

    ::

        f = TextFileColumns(filename)
        # filename is a file
        # the separator is unknown --> the class automatically determines it
        # as well as the columns and their type
        f.open()
        for d in f:
            print(d)  # d is a dictionary
        f.close()

    .. list-table::
        :widths: auto
        :header-rows: 1

        * - attribute
          - meaning
        * - _force_header
          - there is a header even if not detected
        * - _force_noheader
          - there is no header even if detected
        * - _changes
          - replace the columns name
        * - _regexfix
          - impose a regular expression to interpret a line instead of
            the automatically built one
        * - _filter_dict
          - it is a function which takes a dictionary and returns a boolean
            which tells if the line must be considered or not
        * - _fields
          - name of the columns (if there is no header)

    Spaces and non-ascii characters cannot be used to name a column.
    This name must be a named group for a regular expression.
    """

    def __init__(self, filename, errors=None, fLOG=noLOG, force_header=False, changes=None,
                 force_noheader=False, regex=None, filter=None, fields=None,
                 keep_text_when_bad_type=False, break_at=-1, strip_space=True,
                 force_sep=None, nb_line_guess=100, mistake=3, encoding="utf-8",
                 strict_separator=False):
        """
        :param filename: filename
        :param errors: see str (errors = ...)
        :param fLOG: logging function
        :param force_header: defines the first line as columns header
            whether it is relevant or not
        :param changes: to change the column names, gives the correspondence,
            example: ``{ "query":"query___" }``, it can be a list if there is
            no header and you want to name every column
        :param force_noheader: there is no header at all
        :param regex: specify a different regular expression (only if changes
            is a list), if it is a dictionary, the class will replace the
            default by the one associated in regex for this field
        :param filter: None if there is no filter, otherwise it is a function
            which takes a dictionary and returns a boolean which tells if the
            line must be considered or not
        :param fields: when the header is not here, these fields will name
            the columns
        :param keep_text_when_bad_type: keep the value when the type
            conversion does not work
        :param break_at: if != -1, stop when this limit is reached
        :param strip_space: remove spaces around columns if True
        :param force_sep: if != None, impose a column separator
        :param nb_line_guess: number of lines used to guess types
        :param mistake: not more than *mistake* conversions in numbers
            are allowed
        :param encoding: encoding
        :param strict_separator: strict number of columns, it assumes there
            is no separator in the content of every column
        :raises Exception: if *changes* is a list and *regex* (without named
            groups) does not contain as many groups as *changes* has names
        """
        if changes is None:
            changes = {}
        TextFile.__init__(self, filename, errors, fLOG=fLOG, encoding=encoding)

        self._force_header = force_header
        self._force_noheader = force_noheader
        self._changes = changes
        self._regexfix = regex
        self._filter_dict = filter
        self._fields = fields
        self._keep_text_when_bad_type = keep_text_when_bad_type
        self._break_at = break_at
        self._strip_space = strip_space
        self._force_sep = force_sep
        self._nb_guess_line = nb_line_guess
        self._mistake = mistake
        self._strict_separator = strict_separator
        self._encoding = encoding

        if not isinstance(changes, list):
            self.LOG(" TextFileColumns (2): regex: ", self._regexfix)
            return

        # A list of names is turned into a mapping "c<index>" -> name.
        # The previous implementation ran a loop meant to compute a padding
        # width but the width was never incremented, the keys were always
        # built with a zero width: "c0", "c1", ..., "c10", ...
        # NOTE(review): these keys presumably have to match the default
        # column names built by guess_columns -- confirm before padding them.
        self._changes = {"c%d" % i: name for i, name in enumerate(changes)}

        if (self._regexfix is not None and
                not isinstance(self._regexfix, dict) and
                "(?P<" not in self._regexfix):
            # The regular expression has anonymous groups: every group
            # receives the name given by changes (same order).
            groups = re.findall("[(](.+?)[)]", self._regexfix)
            if len(groups) != len(changes):
                raise Exception(
                    "not the same number of fields in regular expression (%d,%d):\n%s\n%s" %
                    (len(groups), len(changes), str(groups), str(changes)))
            named = ["(?P<%s>%s)" % (name, pattern)
                     for pattern, name in zip(groups, changes)]
            # The character following the first group is used as the
            # separator between groups. A slice avoids an IndexError when
            # the first group ends the expression.
            after_first = self._regexfix.find(")") + 1
            separator = self._regexfix[after_first:after_first + 1]
            self._regexfix = separator.join(named)
            self.LOG("split: ", groups)
            self.LOG("new regex: ", self._regexfix)
        else:
            self.LOG(" TextFileColumns (1): regex: ", self._regexfix)

    def __str__(self):
        """
        Returns the string representation of all the attributes of the instance.
        """
        return str(self.__dict__)

    def get_columns(self):
        """
        Returns the columns found when the file was opened.

        :return: the columns
        :raises Exception: if the columns are not known (file not opened)
        """
        if "_columns" not in self.__dict__:
            raise Exception("there is no available columns")
        return self._columns

    def open(self):
        """
        Opens the file and finds out if there is a header, what are the
        columns, what are their types... The format is only guessed the
        first time, every call increments a counter decremented by
        :meth:`close`.

        :raises RuntimeError: if the regular expression used to interpret
            a line cannot be compiled
        """
        if "_header" not in self.__dict__:
            header, columns, sep, regex = self.guess_columns(
                force_header=self._force_header,
                changes=self._changes,
                force_noheader=self._force_noheader,
                fields=self._fields,
                regex=self._regexfix if isinstance(self._regexfix, dict) else {},
                force_sep=self._force_sep,
                nb=self._nb_guess_line,
                mistake=self._mistake)

            # A regular expression given as a string replaces the guessed one.
            if self._regexfix is not None and not isinstance(self._regexfix, dict):
                regex = self._regexfix

            self._header = header
            self._columns = columns
            self._sep = sep
            try:
                self._regex = re.compile(regex)
            except Exception as e:
                raise RuntimeError(
                    "algorithm problem: (type %r, %r)\nunable to understand a regular expression (file %r)\nexp: %r" %
                    (str(type(e)), str(e), self.filename, regex)) from e

            # _name: column name -> (position, type)
            # _conv: column name -> numerical type used to convert the value
            self._name = {}
            self._nb = 0
            self._conv = {}
            for position, (name, typ) in ((k, (v[0], v[1]))
                                          for k, v in self._columns.items()):
                self._name[name] = (position, typ)
                if typ in [int, float, decimal.Decimal]:
                    self._conv[name] = typ

        self._nb += 1
        TextFile.open(self)

    def close(self):
        """
        Closes the file and removes all information related to the format
        once every call to :meth:`open` was matched by a call to this method,
        next time the file is opened, the format will be checked again.
        """
        TextFile.close(self)
        self._nb -= 1
        if self._nb == 0:
            for name in ("_header", "_columns", "_regex", "_name", "_conv"):
                del self.__dict__[name]

    def __iter__(self):
        """
        Iterates over the rows of the file, the header is skipped.

        A line is split with the separator. If the number of pieces is not
        the number of columns and *strict_separator* is False, the line is
        interpreted with the regular expression. Values of numerical columns
        are converted, an empty value becomes zero.

        :return: a dictionary ``{ column_name: value }``
        :raises Exception: if the file is not opened or if too many lines
            cannot be split into columns
        :raises RuntimeError: if too many values cannot be converted and
            *keep_text_when_bad_type* is False
        """
        if "_header" not in self.__dict__:
            raise Exception("file not open %s" % self.filename)

        # Non greedy version of the pattern, tried first when a line
        # contains more separators than expected.
        regex_simple = re.compile(
            self._regex.pattern.replace(">.*)", ">.*?)"))
        nb_columns = len(self._columns)
        numeric = (int, float, decimal.Decimal)

        nb = 0      # number of processed lines (header included)
        nberr = 0   # number of lines impossible to split into columns
        nbert = 0   # number of values impossible to convert
        for line in TextFile.__iter__(self):
            if nb == 0 and self._header:
                nb += 1
                continue

            tempc = line.split(self._sep)
            if len(tempc) == nb_columns:
                res = {self._columns[i][0]: a for i, a in enumerate(tempc)}
            elif self._strict_separator or len(tempc) < nb_columns:
                # not enough columns or separators are not allowed
                # inside a value: the line cannot be interpreted
                res = None
            else:
                # too many separators: the regular expression decides
                r = regex_simple.match(line)
                if r is None:
                    r = self._regex.match(line)
                res = None if r is None else r.groupdict()

            if res is None:
                if nberr == 0:
                    self.LOG(self._regex.pattern)
                self.LOG("error regex", nberr,
                         "unable to interpret line ", nb, ": ", repr(line))
                nberr += 1
                if nberr * 10 > nb and nberr > 4:
                    message = "pattern: %s\n line: %s" % (
                        regex_simple.pattern, line)
                    raise Exception(
                        "(a) there are probably too many errors %d (%d)\n%s" %
                        (nberr, nb, message))
            else:
                if self._strip_space:
                    for k in res:
                        res[k] = res[k].strip()

                giveup = False
                for k in res:
                    if k not in self._conv:
                        continue
                    conv = self._conv[k]
                    try:
                        if len(res[k]) == 0 and conv in numeric:
                            res[k] = conv(0)
                        else:
                            res[k] = conv(res[k])
                    except (ValueError, decimal.InvalidOperation):
                        # decimal.Decimal raises InvalidOperation (not a
                        # ValueError) when the string is not a number.
                        nbert += 1
                        if self._keep_text_when_bad_type:
                            if nbert % 1000 == 1:
                                self.LOG("error type", nbert,
                                         "unable to interpret line (but keep it) ",
                                         nb, "value", repr(res[k]),
                                         " type ", repr(conv),
                                         " line ", repr(line))
                        else:
                            self.LOG("error type", nbert,
                                     "unable to interpret line ",
                                     nb, "value", repr(res[k]),
                                     " type ", repr(conv),
                                     " line ", repr(line))
                            if nbert * 10 > nb and nbert > 4:
                                message = "pattern: %s\n line: %s" % (
                                    regex_simple.pattern, line)
                                # reports the number of type errors, the
                                # counter which triggers this exception
                                raise RuntimeError(
                                    "(b) there are probably too many errors %r\n%r" %
                                    (nbert, message))
                            giveup = True
                            break

                if giveup:
                    # the line is skipped and not counted
                    continue

                if self._filter_dict is None or self._filter_dict(res):
                    yield res

            nb += 1
            if self._break_at != -1 and nb > self._break_at:
                break

    @staticmethod
    def _store(output, la, encoding="utf-8"):
        """
        Stores a list of dictionaries into a file (adds a header).
        Columns are sorted by name, they are taken from the first dictionary.

        :param output: filename
        :param la: list of dictionaries key:value
        :param encoding: encoding
        """
        sepline = "\n"
        keys = None
        # the context manager closes the file even if a row raises an error
        with open(output, "w", encoding=encoding) as f:
            for d in la:
                if keys is None:
                    keys = sorted(d.keys())
                    f.write("\t".join(keys) + sepline)
                f.write("\t".join(str(d[k]) for k in keys) + sepline)

    def sort(self, output, key, maxmemory=2 ** 28, folder=None, fLOG=noLOG):
        """
        Sorts a text file, even a big one, one or several columns give
        the order. The rows are split into sorted temporary files which are
        merged with :meth:`fusion`, temporary files are then removed.

        :param output: output file result
        :param key: lines sorted depending on these columns
        :param maxmemory: a file is split into smaller files, a temporary
            file is written as soon as more than *maxmemory* lines are
            kept in memory
        :param folder: the function needs to create temporary files,
            this folder will contain them before they get removed
        :param fLOG: logging function (unused, the logging function of
            the instance is used)
        :raises Exception: if *folder* does not exist or if a column
            of *key* is missing
        :raises RuntimeError: if *output* cannot be created

        .. warning::
            We assume this file is not opened.
        """
        if isinstance(key, str):
            key = (key,)

        if folder is None:
            folder = GetPath()
        if not os.path.exists(folder):
            raise Exception("unable to find folder %s" % folder)

        # checks the output can be created before doing any work
        try:
            with open(output, "w", encoding=self._encoding):
                pass
        except Exception as e:
            raise RuntimeError(
                "Unable to create file %r, reason: %r" % (output, str(e))) from e

        self.LOG("sorting file ", self.filename)
        files = []

        def dump(chunk):
            "Sorts a chunk of pairs (key, row) and writes it into a new temporary file."
            chunk.sort(key=lambda el: el[0])
            tempout = os.path.join(folder, "root_%05d.txt" % len(files))
            self.LOG("writing file %d lines in " % len(chunk), tempout)
            # NOTE(review): temporary files and the final output are written
            # with the default encoding of _store and fusion, not with
            # self._encoding -- confirm this is intended.
            TextFileColumns._store(tempout, [row for _, row in chunk])
            files.append(tempout)

        try:
            memo = []
            self.open()
            try:
                for line in self:
                    try:
                        k = tuple([line[k] for k in key])
                    except KeyError as e:
                        raise Exception("unable to find one column in\n{0}".format(
                            self.get_columns())) from e
                    memo.append((k, line))
                    if len(memo) > maxmemory:
                        dump(memo)
                        memo = []
                if len(memo) > 0:
                    dump(memo)
                    memo = []
            finally:
                # the file is closed even if a row raises an exception
                self.close()

            # NOTE(review): temporary files always start with a header
            # (see _store), force_header=True may be more appropriate.
            TextFileColumns.fusion(
                key, files, output, force_header=self._force_header, fLOG=self.LOG)
        finally:
            # temporary files are removed whatever happens
            for m in files:
                self.LOG("removing ", m)
                os.remove(m)

    @staticmethod
    def fusion(key, files, output, force_header=False, encoding="utf-8", fLOG=noLOG):
        """
        Does a fusion between several files with the same columns
        (different order is allowed). The output starts with a header,
        columns are sorted by name.

        :param key: columns to be compared
        :param files: list of files
        :param output: output file
        :param force_header: impose the first line as a header
        :param encoding: encoding
        :param fLOG: logging function
        :raises Exception: if the first row of a file does not contain
            one of the columns in *key*

        .. warning::
            We assume all files are sorted depending on columns in key
        """
        sepline = "\n"
        if isinstance(key, str):
            key = [key]

        readers = []
        try:
            for f in files:
                h = TextFileColumns(f, force_header=force_header,
                                    encoding=encoding, fLOG=fLOG)
                h.open()
                readers.append((h, iter(h)))

            with open(output, "w", encoding=encoding) as res:
                # start: first row of every file, as [key, row, reader, iterator]
                kline = []
                for h, it in readers:
                    try:
                        d = next(it)
                    except StopIteration:
                        continue
                    try:
                        k = tuple([d[k] for k in key])
                    except KeyError as e:
                        raise Exception("unable to find one column in\n{0}".format(
                            h.get_columns())) from e
                    kline.append([k, d, h, it])

                # loop: writes the smallest row, replaces it by the next
                # row of the same file
                keys = None
                while len(kline) > 0:
                    # min returns the first minimum in case of equality
                    pos = min(range(len(kline)), key=lambda i: kline[i][0])
                    _, d, h, it = kline.pop(pos)

                    if keys is None:
                        keys = sorted(d.keys())
                        res.write("\t".join(keys) + sepline)
                    res.write("\t".join(str(d[k_]) for k_ in keys) + sepline)

                    try:
                        d = next(it)
                    except StopIteration:
                        continue
                    k = tuple([d[k_] for k_ in key])
                    kline.append([k, d, h, it])
        finally:
            # every opened file is closed even if an exception was raised
            for h, _ in readers:
                h.close()