Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" 

2@file 

3@brief Buffer files writing 

4""" 

5import io 

6import os 

7from pyquickhelper.loghelper import noLOG 

8from .helpers_exceptions import FileAlreadyExistingException, FileNotFlushedException 

9 

10 

class BufferFilesWriting:
    """
    Delays the writing of files:
    method *open* returns an in-memory buffer,
    method *flush* actually writes the file on disk.
    """

    def __init__(self, flush_every=20, fLOG=noLOG):
        """
        constructor

        @param      flush_every     threshold on the number of buffered files,
                                    *open* flushes every pending file once more
                                    than *flush_every* files are buffered
        @param      fLOG            logging function, called once per written file
        """
        # rank given to the next opened file (used by flush(..., upto=True))
        self._nb = 0
        # name -> (buffer, encoding, rank)
        self._buffer = {}
        # names of the files already written
        self._done = set()
        self.fLOG = fLOG
        self._flush_every = flush_every

    def exists(self, name, local=True):
        """
        tells if a file was already written or is currently buffered

        @param      name        name
        @param      local       check local existence (on disk) too
        @return                 boolean
        """
        return (name in self._done or name in self._buffer or
                (local and os.path.exists(name)))

    def listfiles(self):
        """
        returns the list of flushed and opened files,
        does not preserve order

        @return                 list of files
        """
        return list(self._done) + list(self._buffer)

    def __len__(self):
        """
        returns the number of buffered (not yet flushed) files
        """
        return len(self._buffer)

    def open(self, name, text=True, encoding="utf8"):
        """
        opens a file and returns a buffer,
        nothing is written on disk until *flush* is called

        @param      name        filename
        @param      text        text (True) or binary (False) file
        @param      encoding    encoding used when a text buffer is written
        @return                 a buffer (``io.StringIO`` or ``io.BytesIO``)

        Raises ``ValueError`` if *name* is empty,
        ``FileAlreadyExistingException`` if *name* is already buffered.
        """
        if name is None or name == '':
            raise ValueError("name cannot be empty.")
        # too many pending files: write them all before adding a new one
        if len(self._buffer) > self._flush_every:
            self.flush(None)
        if name in self._buffer:
            raise FileAlreadyExistingException(name)

        buf = io.StringIO() if text else io.BytesIO()
        self._buffer[name] = (buf, encoding, self._nb)
        self._nb += 1
        return buf

    def read_binary_content(self, name, local=True):
        """
        returns the content of a file (binary format),
        a buffered text file is encoded with the encoding given to *open*

        @param      name        name
        @param      local       if the file is not buffered, read its content from disk
        @return                 content as bytes

        Raises ``ValueError`` if *name* is empty,
        ``FileNotFoundError`` if the file is not buffered and either
        *local* is False or the file does not exist on disk.
        """
        if name is None or name == '':
            raise ValueError("name cannot be empty.")
        if name in self._buffer:
            buf, encoding, _ = self._buffer[name]
            content = buf.getvalue()
            if isinstance(content, str):
                content = content.encode(encoding)
            return content
        if not local:
            # the caller explicitly asked not to look at the disk
            raise FileNotFoundError(
                name + "\nEXISTING\n" + "\n".join(self.listfiles()))
        with open(name, "rb") as f:
            return f.read()

    def flush(self, name, upto=False):
        """
        flushes a file (actually writes it) and makes it disappear from the list of buffered files,
        if the folder does not exist, the method creates it

        @param      name        file name (or None for all)
        @param      upto        flush all files opened up to this one (included)
        @return                 size of the written content: number of bytes
                                for binary buffers, number of characters
                                for text buffers

        Raises ``FileNotFoundError`` if *name* is not buffered,
        ``RuntimeError`` if *name* has no folder part.
        """
        if name is None:
            # snapshot of the keys: flushing removes entries from the dictionary
            return sum(self.flush(name_) for name_ in list(self._buffer))

        if name not in self._buffer:
            raise FileNotFoundError(
                name + "\nEXISTING\n" + "\n".join(self.listfiles()))

        if upto:
            rank = self._buffer[name][2]
            names = [key for key, v in self._buffer.items() if v[2] <= rank]
            return sum(self.flush(name_) for name_ in names)

        fold = os.path.dirname(name)
        if fold is None or fold == '':
            raise RuntimeError(
                "Folder cannot be empty for file '{0}'".format(name))
        # exist_ok avoids failing if the folder appears between a check and the creation
        os.makedirs(fold, exist_ok=True)

        buf, encoding, _ = self._buffer[name]
        b = buf.getvalue()
        if isinstance(buf, io.StringIO):
            with open(name, "w", encoding=encoding) as f:
                f.write(b)
        else:
            with open(name, "wb") as f:
                f.write(b)
        self.fLOG(
            "[BufferFilesWriting.flush] write '{0}'".format(name))
        # the file only moves to the list of written files once the writing succeeded
        del self._buffer[name]
        self._done.add(name)
        return len(b)

    def __del__(self):
        """
        destructor, checks everything was flushed

        Raises ``FileNotFlushedException`` listing the files still buffered.
        Python does not propagate exceptions raised by ``__del__``,
        it reports them on the standard error instead.
        """
        # _buffer is missing if the constructor was never run
        # (wrong arguments for example), there is nothing to check in that case
        pending = getattr(self, "_buffer", None)
        if pending:
            raise FileNotFlushedException(
                "\n".join(sorted(pending)))