Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2@file
3@brief Buffer files writing
4"""
5import io
6import os
7from pyquickhelper.loghelper import noLOG
8from .helpers_exceptions import FileAlreadyExistingException, FileNotFlushedException
11class BufferFilesWriting:
12 """
13 this class aims at delaying writing files,
14 method *open* returns a buffer,
15 method *flush* actually writes the file
16 """
18 def __init__(self, flush_every=20, fLOG=noLOG):
19 """
20 constructor
22 @param flush_every flush every 20 created files
23 """
24 self._nb = 0
25 self._buffer = {}
26 self._done = set()
27 self.fLOG = fLOG
28 self._flush_every = flush_every
30 def exists(self, name, local=True):
31 """
32 tells if a file was already written
34 @param name name
35 @param local check local existence too
36 @return boolean
37 """
38 return (name in self._done or name in self._buffer or
39 (local and os.path.exists(name)))
41 def listfiles(self):
42 """
43 returns the list of flushed and opened files,
44 does not preserved order
46 @return list of files
47 """
48 return list(self._done) + list(self._buffer.keys())
50 def __len__(self):
51 """
52 return the number of buffered files
53 """
54 return len(self._buffer)
56 def open(self, name, text=True, encoding="utf8"):
57 """
58 open a file and returns a buffer
60 @param name filename
61 @param text text or binary file
62 @param encoding encoding
63 @return a buffer
64 """
65 if name is None or name == '':
66 raise ValueError("name cannot be empty.")
67 if len(self._buffer) > self._flush_every:
68 self.flush(None)
69 if name in self._buffer:
70 raise FileAlreadyExistingException(name)
71 if text:
72 buf = io.StringIO()
73 else:
74 buf = io.BytesIO()
76 self._buffer[name] = (buf, encoding, self._nb)
77 self._nb += 1
78 return buf
80 def read_binary_content(self, name, local=True):
81 """
82 return the content of file (binary format)
84 @param name name
85 @param local check local existence too and read the content from it
86 @return boolean
87 """
88 if name is None or name == '':
89 raise ValueError("name cannot be empty.")
90 if name in self._buffer:
91 content = self._buffer[name][0].getvalue()
92 if isinstance(content, str):
93 content = bytes(content, self._buffer[name][1])
94 else:
95 with open(name, "rb") as f:
96 content = f.read()
97 return content
99 def flush(self, name, upto=False):
100 """
101 flush a file (actually write it) and make it disappear from the list of buffered files,
102 if the folder does not exists, the method creates it
104 @param name file name (or None for all)
105 @param upto flush all files up to this one
106 @return number of written bytes
107 """
108 if name is None:
109 size = 0
110 keys = list(self._buffer.keys())
111 for name_ in keys:
112 size += self.flush(name_)
113 return size
114 else:
115 if name not in self._buffer:
116 raise FileNotFoundError(
117 name + "\nEXISTING\n" + "\n".join(self.listfiles()))
118 if upto:
119 n = self._buffer[name][2]
120 names = [name for name, v in self._buffer.items() if v[2] <= n]
121 size = 0
122 for name_ in names:
123 size += self.flush(name_)
124 return size
125 else:
126 fold = os.path.dirname(name)
127 if fold is None or fold == '':
128 raise RuntimeError(
129 "Folder cannot be empty for file '{0}'".format(name))
130 if not os.path.exists(fold):
131 os.makedirs(fold)
132 buf = self._buffer[name][0]
133 if isinstance(buf, io.StringIO):
134 b = buf.getvalue()
135 with open(name, "w", encoding=self._buffer[name][1]) as f:
136 f.write(b)
137 else:
138 b = buf.getvalue()
139 with open(name, "wb") as f:
140 f.write(b)
141 self.fLOG(
142 "[BufferFilesWriting.flush] write '{0}'".format(name))
143 del self._buffer[name]
144 self._done.add(name)
145 return len(b)
147 def __del__(self):
148 """
149 destructor, check everything was flushed
150 """
151 if len(self._buffer) > 0:
152 raise FileNotFlushedException(
153 "\n".join(sorted(self._buffer.keys())))