# Source code for pyensae.sql.file_text_binary

"""
contains a class which opens a text file as a binary file.


:githublink:`%|py|6`
"""


import re
import os
import math
import time
import decimal
from pyquickhelper.loghelper import noLOG
from .type_helpers import guess_type_value


class TextFile:
    """
    This class opens a text file as if it were a binary file. It can deal
    with null characters which are missed by the open function.

    .. list-table::
        :widths: auto
        :header-rows: 1

        * - attribute
          - meaning
        * - filename
          - file name
        * - errors
          - decoding in utf8 can raise some errors, see
            `str <https://docs.python.org/3.4/library/stdtypes.html?highlight=str#str>`_
            to understand the meaning of this parameter
        * - LOG
          - logging function
        * - _buffer_size
          - read a text file _buffer_size bytes each time
        * - _filter
          - function filter, None or return True or False whether
            a line should considered or not
        * - _encoding
          - encoding

    Example:

    ::

        f = TextFile(filename)
        f.open()
        for line in f:
            print(line)
        f.close()

    :githublink:`%|py|41`
    """

    # regular expression used by ``_interpret`` to split a line into
    # columns: an optional carriage return followed by a tabulation
    _split_expr = re.compile("\\r?\\t", re.U)

    # candidate column separators tried by ``guess_columns``
    _sep_available = "\t;,| "
    def __init__(self, filename, errors=None, fLOG=noLOG, buffer_size=2 ** 20,
                 filter=None, separated=False, encoding="utf-8"):
        """
        :param filename: filename
        :param errors: see str (errors = ...)
        :param fLOG: LOG function, see `fLOG
            <http://www.xavierdupre.fr/app/pyquickhelper/helpsphinx/pyquickhelper/loghelper/flog.html#pyquickhelper.loghelper.flog.fLOG>`_
        :param buffer_size: buffer_size (mostly use to test the reading function)
        :param filter: None if there is no filter, otherwise it is a
            function which takes a list and returns a boolean
            which tells if the line must considered or not
        :param separated: if True, the line returned by the iterator are
            splitted by the most probable separator
        :param encoding: encoding used to decode the file content

        :githublink:`%|py|57`
        """
        self.filename = filename
        self._encoding = encoding
        # NOTE(review): ``errors`` is stored but never forwarded to the
        # built-in ``open`` call in method ``open`` — confirm intent.
        self.errors = errors
        self.LOG = fLOG
        self._buffer_size = buffer_size
        self._filter = filter
        self._separated = separated
    def open(self):
        """
        Opens the file in reading mode and resets the line/byte counters.

        :githublink:`%|py|69`
        """
        self.LOG(" TextFile: opening file ", self.filename)
        if self._separated:
            # guess the most probable separator first so that the iterator
            # can split every returned line with it
            res = self.guess_columns()
            self.LOG(" TextFile: guessed: ", res)
            sep = res[2]
            self._separated_value = sep
        # NOTE(review): ``self.errors`` is not passed here
        # (``errors=self.errors``) — confirm whether it should be.
        self._f = open(self.filename, "r", encoding=self._encoding)
        self._nbline = 0
        self._read = 0
[docs] def close(self): """ Closes the file. :githublink:`%|py|84` """ self._f.close() self.LOG(" TextFile: closing file ", self.filename) del self.__dict__["_f"]
[docs] def get_nb_readlines(self): """ Returns the number of read lines. :githublink:`%|py|92` """ return self._nbline
[docs] def get_nb_readbytes(self): """ Returns the number of read bytes. :githublink:`%|py|98` """ return self._nbline
[docs] def readlines(self): """ Extracts all the lines, the file must not be opened through method open ``\\n`` are removed. :githublink:`%|py|106` """ self.open() res = [] for line in self: li = line.strip("\r\n") res.append(li) self.close() return res
    def __iter__(self):
        """
        Iterates over the lines of the file, reading it by chunks of
        ``_buffer_size`` bytes.

        ::

            f = open('...', 'r')
            for line in f:
                ...
            f.close()

        :return: a str string (or a list of str if ``separated`` was True)

        :githublink:`%|py|127`
        """
        if "_f" not in self.__dict__:
            raise Exception("file %s is not opened." % self.filename)
        filesize = os.stat(self.filename).st_size
        size = self._buffer_size
        blin = self._f.read(size)
        self._read = len(blin)
        # remove a possible BOM at the beginning of the file
        if blin.startswith("\xef\xbb\xbf"):
            self.LOG(" removing the three first character u'\\xef\\xbb\\xbf'")
            blin = blin[3:]
        if blin.startswith("\ufeff"):
            self.LOG(" removing the three first character u'\\ufeff'")
            blin = blin[len("\ufeff"):]
        endline = "\n"
        endchar = "\r"
        begin = 0
        sel = 0
        tim = time.perf_counter()
        while len(blin) > 0:
            # look for the next end of line in the current buffer
            pos = blin.find(endline, begin)
            if pos == -1:
                pos = blin.find(endchar, begin)
                if begin == 0 and pos != -1:
                    self.LOG(" problem in file ", self.filename)
                    self.LOG(" the line separator is not \\n but \\r")
            # no end of line in the buffer: read more chunks until one is
            # found or the end of file is reached
            while pos == -1:
                if begin > 0:
                    # discard the part of the buffer already yielded
                    blin = blin[begin:]
                    begin = 0
                temp = self._f.read(size)
                self._read += len(temp)
                blin += temp
                pos = blin.find("\n")
                if pos == -1:
                    pos = blin.find("\r", begin)
                if len(temp) == 0 and pos == -1:
                    # end of file: the remaining buffer is the last line
                    pos = len(blin)
            temp = blin[begin:pos]
            line = str(temp)
            begin = pos + 1
            # progress report at most once per minute
            tim2 = time.perf_counter()
            if tim2 - tim > 60:
                tim = tim2
                ratio = float(self._read) / filesize * 100
                self.LOG(
                    " processing line ", self._nbline,
                    " read bytes ", self._read,
                    " sel ", sel,
                    " ratio %2.2f" % ratio, "%")
            r = line
            if self._encoding == "utf-8":
                # drop a trailing \r left over from \r\n line endings
                r = r.rstrip(endchar)
            if self._filter is None or self._filter(r):
                if self._separated:
                    # _separated_value was set by ``open``
                    yield r.split(self._separated_value)
                else:
                    yield r
            # counted after the yield, so the first line sees _nbline == 0
            self._nbline += 1
    def _load(self, filename, this_column, file_column, prefix, **param):
        """
        Loads a secondary file for :meth:`join`: reads it entirely and
        indexes every row by the value found in column *file_column*.

        :param filename: file to load
        :param this_column: column of the main file used as join key
            (returned untouched, consumed by :meth:`join`)
        :param file_column: column of *filename* used as join key
        :param prefix: prefix for the joined column names
            (returned untouched, consumed by :meth:`join`)
        :param param: additional parameters forwarded to :class:`TextFile`
        :return: tuple *(content, columns, this_column, file_column, prefix)*
            where *content* is ``{ key: row }`` and *columns* is
            ``{ name: position }``

        :githublink:`%|py|203`
        """
        f = TextFile(filename, fLOG=self.LOG, encoding=self._encoding, **param)
        f.open()
        cont = {}
        for line in f:
            if f.get_nb_readlines() == 0:
                # first line: header
                columns = self._interpret_columns(line)
            else:
                col = self._interpret(line)
                key = col[columns[file_column]]
                cont[key] = col
        f.close()
        # NOTE(review): ``columns`` is unbound if the file is empty — the
        # method assumes at least a header line; confirm against callers.
        return cont, columns, this_column, file_column, prefix
[docs] def _interpret_columns(self, line): """ Interprets the first line which contains the columns name. :param line: string :return: dictionary { name:position } :githublink:`%|py|222` """ col = self._interpret(line) res = {} for i in range(0, len(col)): res[col[i]] = i return res
[docs] def _interpret(self, line): """ Splits a line into a list, separator ``\\t``. :param line: string :return: list :githublink:`%|py|235` """ col = TextFile._split_expr.split(line.strip(" \r\n")) return col
    def join(self, definition, output, missing_value="", unique=None, **param):
        """
        Joins several files together.

        :param definition: list of triplets: filename, this_column,
            file_column, prefix
        :param output: if None, return the results as a list, otherwise
            save it into output
        :param param: parameter used to open files
        :param missing_value: specify a value for the missing values
        :param unique: if unique is a column name, do not process a line
            whose value has already been processed, None otherwise
        :return: columns, matrix or number of of missing values

        We assume that every file starts with header giving columns names.
        The function associates *this_column* value to *file_column* and
        appends all the columns from filename with a prefix.
        We also assumes values in file_column are unique.

        :githublink:`%|py|255`
        """
        if output is not None:
            output = open(output, "w", encoding=self._encoding)
        # load every secondary file; each entry is the tuple returned by
        # _load: (content, columns, this_column, file_column, prefix)
        files = []
        for i, tu in enumerate(definition):
            if len(tu) == 2:
                a, b = tu
                c = b
                d = "f%d_" % (i + 1)
            elif len(tu) == 3:
                a, b, c = tu
                d = "f%d_" % (i + 1)
            elif len(tu) == 4:
                a, b, c, d = tu
            else:
                raise ValueError(
                    "definition must contain tuple (size, 2, 3 ,4), not {0}".format(tu))
            files.append(self._load(a, b, c, d, **param))
        res = []
        miss = 0
        uniquekey = {}
        self.open()
        # NOTE(review): the stream opened here is never closed by
        # ``self.close()`` before returning — confirm intent.
        for line in self:
            if self.get_nb_readlines() == 0:
                # header line: build the merged column index
                columns = self._interpret_columns(line)
                oldnb = len(columns)
                last = max(columns.values()) + 1
                for file in files:
                    col = file[1]
                    pre = file[-1]
                    # append the secondary file columns, prefixed
                    for k, v in col.items():
                        columns[pre + k] = last + v
                    last += len(col)
                linecol = ["" for c in columns]
                for k, v in columns.items():
                    linecol[v] = k
                if output is None:
                    res.append(linecol)
                else:
                    output.write("\t".join(linecol) + "\n")
                # sanity check: merged header size must be the sum of parts
                s1 = len(linecol)
                s2 = oldnb
                for f in files:
                    s2 += len(f[1])
                if s1 != s2:
                    mes = "size problem %d != " % (s1)
                    mes += " + ".join([str(x) for x in [oldnb, ] +
                                       [len(f[1]) for f in files]])
                    raise Exception(mes)
            else:
                col = self._interpret(line)
                if unique is not None:
                    # skip a line whose key value was already processed
                    key = columns[unique]
                    val = col[key]
                    if val in uniquekey:
                        uniquekey[val] += 1
                        continue
                    uniquekey[val] = 1
                if len(col) != oldnb:
                    # pad a short line with empty fields
                    col.extend(["" for i in range(0, oldnb - len(col))])
                if len(col) != oldnb:
                    mes = "line %d: problem len(col) = %d and oldnb = %d\n%s" % (
                        self.get_nb_readlines(), len(col), oldnb, repr(line))
                    raise Exception(mes)
                for file in files:
                    cont = file[0]
                    c = file[1]
                    # value of the join key in the main file
                    this_key = col[columns[file[2]]]
                    if this_key in cont:
                        val = cont[this_key]
                        if len(val) == 0 or (len(val) == 1 and len(val[0]) == 0):
                            # empty line
                            continue
                        if len(val) != len(c):
                            ll = self.get_nb_readlines()
                            mes = "line %d: problem len(val) = %d and len (c) = %d\n\"%s\"" % (
                                ll, len(val), len(c), file)
                            raise Exception(mes)
                    else:
                        # no match: fill with the missing value
                        val = [missing_value for k in c]
                        miss += len(val)
                    col.extend(val)
                if len(col) != len(columns):
                    vals = list(set(col))
                    if vals == ['']:
                        continue
                    mes = "problem 1 with line %d\n" % self.get_nb_readlines()
                    mes += "len (col) = %d len (columns) = %d" % (
                        len(col), len(columns))
                    raise Exception(mes)
                # a field containing a tabulation would corrupt the output
                if len(("\t".join(col)).split("\t")) != len(col):
                    mes = "problem 2 with line %d\n" % self.get_nb_readlines()
                    mes += "len (col) = %d len (columns) = %d" % (
                        len(("\t".join(col)).split("\t")), len(columns))
                    raise Exception(mes)
                if output is None:
                    res.append(col)
                else:
                    output.write("\t".join(col) + "\n")
        if output is None:
            return res
        else:
            output.close()
            return miss
[docs] def _count_s(self, car): """ Returns the number of every character in car. :githublink:`%|py|376` """ res = {} for i, c in enumerate(car): if c in res: res[c] += 1 else: res[c] = 1 return res
    def _get_type(self, s):
        """
        Guesses the type of value *s* by delegating to
        :func:`guess_type_value`.

        :param s: string value
        :return: a type (as returned by :func:`guess_type_value`)

        :githublink:`%|py|388`
        """
        return guess_type_value(s)
    def guess_columns(self, nb=100, force_header=False, changes=None,
                      force_noheader=False, fields=None, regex=None,
                      force_sep=None, mistake=3):
        """
        Guesses the columns type.

        :param nb: number of lines to have a look to in order to find all
            the necessary elements
        :param force_header: impose a header whether it is detect or not
        :param changes: modify some column names,
            example ``{ "query":"query___" }``
        :param force_noheader: there is no header at all
        :param fields: name of the columns if there is no header (instead
            of c000, c001...)
        :param regex: if the default expression for a field is not the
            expected one, change by looking into regex
        :param force_sep: force the separator to be the one chosen by the
            user (None by default)
        :param mistake: not more than mistake conversion in numbers are allowed
        :return: 4-tuple, see below

        Returned result is a 4 t-uple:

        - True or False: presence of a header (it means there is at least
          one numerical column)
        - column definition ``{ position : (name, type) }`` or
          ``{ position : (name, (str, max_length*2)) }``
        - separator
        - regex which allow the user to extract information from the file

        The column separator is looked into ``, | ; \\t``

        .. warning::
            The file must not be opened, it will be several times.

        :githublink:`%|py|417`
        """
        if changes is None:
            changes = {}
        if regex is None:
            regex = {}
        self.LOG(" TextFile.guess_columns: processing file ", self.filename)
        # NOTE(review): this also strips trailing/leading spaces because of
        # the space in the string — confirm this is intended.
        endlinechar = "\n "
        # read at most nb non-empty lines
        temp = TextFile(self.filename, encoding=self._encoding, fLOG=self.LOG)
        lines = []
        temp.open()
        for line in temp:
            line = line.strip(endlinechar)
            if len(line) == 0:
                continue
            lines.append(line)
            if len(lines) > nb:
                break
        self.LOG(" TextFile.guess_columns: using ", len(lines), " lines")
        temp.close()
        # guess the separation
        sep = TextFile._sep_available
        # NOTE(review): ``force_sep not in (None, force_sep)`` is always
        # False, so force_sep is never appended here — likely a bug,
        # ``force_sep not in (None,) + tuple(sep)`` was probably meant.
        if force_sep not in (None, force_sep):
            sep += force_sep
        # h counts, for every candidate separator s, how many lines
        # contain exactly n occurrences of s: h[s, n] = number of lines
        h = {}
        mx = 0
        for line in lines:
            co = self._count_s(line)
            for s in sep:
                n = co.get(s, 0)
                if n == 0:
                    continue
                k = s, n
                if k not in h:
                    h[k] = 1
                else:
                    h[k] += 1
                mx = max(n, mx)
        mx += 1
        # keep the separator whose occurrence distribution has the highest
        # entropy-like score (most regular across lines)
        best = None
        iner = None
        for c in sep:
            m = {}
            z = 0
            for k in range(mx):
                if (c, k) in h:
                    m[k] = h[c, k]
                    z += k * m[k]
            if len(m) == 0:
                continue
            g = max(sum(m.values()), len(lines))
            # the separator must appear on (almost) every line
            if z < max(len(lines) * 9 / 10, 1):
                continue
            for k in m:
                m[k] = float(m[k]) / g
            s = 0.0
            for k in m:
                s += m[k] * math.log(m[k])
            if iner is None or s > iner:
                iner = s
                best = c
        bestsep = best
        if force_sep is not None and bestsep != force_sep:
            self.LOG(
                " TextFile.guess_columns: changes the separator",
                repr(force_sep))
            bestsep = force_sep
        # most frequent occurrence count of the chosen separator gives the
        # number of columns
        bestcol = 0
        bestnb = 0
        for k in range(mx):
            if (bestsep, k) in h:
                if bestnb < h[bestsep, k]:
                    bestnb = h[bestsep, k]
                    bestcol = k + 1
        self.LOG(" TextFile.guess_columns: sep ", repr(bestsep),
                 "nb cols", bestcol, " bestnb ", bestnb, " more ", h)
        # determine the type of every column: h[i, type] = votes
        h = {}
        for line in lines:
            cols = line.split(bestsep)
            for i in range(len(cols)):
                ty = self._get_type(cols[i])
                k = i, ty
                if k not in h:
                    h[k] = 1
                else:
                    h[k] += 1
        columns = {}
        for a in h:
            k, t = a
            if k >= bestcol:
                continue
            if k not in columns:
                columns[k] = (t, h[a])
            elif h[a] > columns[k][1]:
                columns[k] = (t, h[a])
        for pos in columns:
            # int and float corrections: a column with any float wins
            # over int; too many str values force the column to str
            if columns[pos][0] == int and h.get((pos, float), 0) > 0:
                self.LOG(
                    " changing column type ", pos, columns[pos],
                    " into ", float)
                columns[pos] = (float, h[pos, float] + h[pos, int])
            su = h.get((pos, str), 0)
            if (columns[pos][0] == int or columns[pos][0] == float or
                    columns[pos][0] == decimal.Decimal) and su > mistake:
                self.LOG(
                    " changing column type ", pos, columns[pos],
                    " into ", str, " mistakes ", su, " > ", mistake)
                columns[pos] = (str, columns[pos][1] + su)
        # header or not: the first line is a header when its cells do not
        # match the numerical types guessed for their columns
        mat = 0
        no = 0
        cols = lines[0].split(bestsep)
        for i, c in enumerate(cols):
            t = self._get_type(c)
            e = columns.get(i, (str, 0))[0]
            if e != str:
                if t == e:
                    mat += 1
                else:
                    no += 1
        header = not force_noheader and (force_header or (no > mat))
        # determine the column name
        if header:
            names = lines[0].split(bestsep)
            del lines[0]
            if len(names) != bestcol:
                raise Exception(
                    "unable to continue: the header does not contain the same number of columns %s != %s" % (
                        len(names), bestcol))
        elif fields is not None:
            if len(fields) != bestcol:
                raise Exception(
                    "the number of fields (%d) is different of the number of columns found in the file %d" % (
                        len(fields), bestcol))
            names = fields
        else:
            # NOTE(review): hhhh never changes (stays 0) and ``_ / 10`` is a
            # float division, so this loop only terminates through float
            # underflow and the generated names have no zero padding —
            # probably a Python 2->3 conversion leftover; confirm.
            hhhh, _ = 0, bestcol
            while _ > 0:
                hhhh, _ = hhhh, _ / 10  # pylint: disable=W0127
            format = "c%0" + str(hhhh) + "d"
            names = [format % i for i in range(bestcol)]
        for k in columns:
            if k >= len(names):
                raise Exception(
                    "incoherence in the file being read: %d >= %d: " % (
                        k, len(names)) + repr(names) + "\n" + repr(columns))
            # apply user renamings; columns becomes { position: (name, type) }
            columns[k] = (changes.get(names[k], names[k]), columns[k][0])
        self.LOG(
            " TextFile.guess_columns: header ", header,
            " columns ", columns)
        coy = columns.copy()
        # end
        exp = self._build_regex(bestsep, columns, regex=regex)
        self.LOG(" TextFile.guess_columns: regex ", exp)
        # determines the length of columns
        length = {}
        no = 0
        for line in lines:
            spl = line.split(bestsep)
            if len(spl) != len(columns):
                continue
            no += 1
            for i, c in enumerate(spl):
                vl = length.get(i, 0)
                if vl < len(c):
                    length[i] = len(c)
        # str columns carry (str, max_length * 2) instead of a bare type
        for c in columns:
            v = columns[c]
            if v[1] == str and c in length and length[c] > 0:
                v = (v[0], (v[1], length[c] * 2))
                columns[c] = v
        if coy != columns:
            self.LOG(
                " TextFile.guess_columns: header ", header,
                " columns ", columns)
        return header, columns, bestsep, exp
[docs] def count_rejected_lines(self, header, exp, output=None): """ Counts the number of rejected lines by regular expression exp. :param header: header or not in the first line :param exp: regular expression :param output: if != None, output is a stream which will receive the unrecognized line (see below) :return: nb_accepted, nb rejected Format for the file containing the unrecognized lines:: line number \t line :githublink:`%|py|650` """ if isinstance(exp, str): exp = re.compile(exp, re.U) acc, rej = 0., 0. temp = TextFile(self.filename, fLOG=self.LOG, encoding=self._encoding) temp.open() nb = 0 for line in temp: nb += 1 if header and acc + rej == 0: header = False continue if len(line) == 0: continue r = exp.search(line) if r: acc += 1 else: rej += 1 if output is not None: output.write("%d\t%s\n" % (nb - 1, line)) temp.close() return acc, rej
_build_regex_default_value_types = {int: "([-]?[1-9][0-9]*?)|(0?)", decimal.Decimal: "([-]?[1-9][0-9]*?L?)|(0?)", float: "[-]?[0-9]*?([.][0-9]*?)?([eE][-]?[0-9]{0,4})?", str: ".*"}
    def _build_regex(self, sep, columns,  # pylint: disable=W0102
                     exp=_build_regex_default_value_types,  # pylint: disable=W0102
                     nomore=False, regex=None):
        """
        Builds a regular expression.

        :param sep: separator
        :param columns: columns definition ``{ position: (name, type) }``
        :param exp: regular expression associated to each type, (see below
            for the default value)
        :param nomore: private argument, no more try, not possible to simplify
        :param regex: if the default expression for a field is not the
            expected one, look into regex if there is one
        :return: regex

        Default value for ``exp``::

            {   int: "([-]?[1-9][0-9]*?)|(0?)",
                decimal.Decimal: "([-]?[1-9][0-9]*?L?)|(0?)",
                float: "[-]?[0-9]*?([.][0-9]*?)?([eE][-]?[0-9]{0,4})?",
                str: ".*" }

        :githublink:`%|py|703`
        """
        if regex is None:
            regex = {}
        mx = max(columns.keys()) + 1
        res = [None for i in range(mx)]
        # one (name, pattern) per column; user-provided patterns win
        for k, v in columns.items():
            t = v[1]
            if t not in exp:
                raise Exception("unknown type %s" % str(t))
            nv0 = v[0].strip()
            if nv0 in regex:
                res[k] = (nv0, regex[nv0])
            else:
                res[k] = (nv0, exp[t])
        for c in res:
            # a space would make an invalid named-group name
            if " " in c[0]:
                raise ValueError(
                    "Accents are not allowed for column names: {0}".format(c))
        res = ["(?P<%s>%s)" % c for c in res]
        if sep == "\t":
            sep = "\\t"
        final = "^%s$" % sep.join(res)
        try:
            self.LOG(" compiling", final)
            exp = re.compile(final)
            return final
        except Exception as e:
            if "but this version only supports 100 named groups" in str(e):
                self.LOG(
                    " problem with expression (more than 100 groups) ", final)
            if nomore:
                if "bad character in group name" in str(e):
                    # NOTE(review): "?P<(.*?)>" is itself an invalid regex
                    # (nothing to repeat) — this branch would raise on
                    # re.compile before reaching findall; confirm,
                    # "\\?P<(.*?)>" was probably intended.
                    reg = re.compile("?P<(.*?)>")
                    all = reg.findall(final)
                    s = ",".join(all)
                    raise RuntimeError(  # pylint: disable=W0707
                        "this expression does not compile (%r), pattern %r, columns %r" % (
                            str(e), final, s))
                raise RuntimeError(  # pylint: disable=W0707
                    "This expression does not compile (%r), pattern %r" % (
                        str(e), final))
            # retry once with simpler (laxer) per-type patterns
            exp = {int: "[-]?[0-9]*?", float: "[0-9.eE]*?", str: ".*"}
            return self._build_regex(sep, columns, exp, True)