Coverage for pyquickhelper/filehelper/files_status.py: 76%

134 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-03 02:21 +0200

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief keep the status of a folder, assuming this folder is not moved 

5""" 

6import os 

7import datetime 

8from ..loghelper.flog import noLOG 

9from .file_info import convert_st_date_to_datetime, checksum_md5, FileInfo 

10 

11 

class FilesStatus:
    """
    This class maintains a list of files
    and does some verifications in order to check if a file
    was modified or not (if yes, then it will be updated to the website).

    The status is persisted in a tab-separated text file, one line per
    monitored file: *filename, size, date, mdate, md5* (the last three
    columns may be empty).
    """

    def __init__(self, file, fLOG=noLOG):
        """
        Loads the status stored in *file* if this file already exists.

        @param      file        name of the file which contains
                                (or will contain) the status
        @param      fLOG        logging function
        """
        self._file = file
        self.copyFiles = {}
        self.fileKeep = file
        self.LOG = fLOG

        if os.path.exists(self.fileKeep):
            with open(self.fileKeep, "r", encoding="utf8") as f:
                for ni, line in enumerate(f):
                    # a BOM can only appear at the very beginning of the file
                    if ni == 0 and line.startswith("\ufeff"):
                        line = line[len("\ufeff"):]  # pragma: no cover
                    spl = line.strip("\r\n ").split("\t")
                    try:
                        if len(spl) >= 2:
                            a, b = spl[:2]
                            obj = FileInfo(a, int(b), None, None, None)
                            # the remaining columns are optional
                            if len(spl) > 2 and len(spl[2]) > 0:
                                obj.set_date(
                                    convert_st_date_to_datetime(spl[2]))
                            if len(spl) > 3 and len(spl[3]) > 0:
                                obj.set_mdate(
                                    convert_st_date_to_datetime(spl[3]))
                            if len(spl) > 4 and len(spl[4]) > 0:
                                obj.set_md5(spl[4])
                            self.copyFiles[a] = obj
                        else:
                            raise ValueError(  # pragma: no cover
                                "expecting a filename and a date on this line: " + line)
                    except Exception as e:
                        # any parsing issue is reported with the faulty line
                        raise RuntimeError(  # pragma: no cover
                            f"issue with line:\n {line} -- {spl}") from e

        # contains all file to update
        self.modifiedFile = {}

    def __iter__(self):
        """
        Iterates on all files stored in the current file,
        yield a couple *(filename, FileInfo)*.
        """
        yield from self.copyFiles.items()

    def iter_modified(self):
        """
        Iterates on all modified files yield a
        couple *(filename, reason)*.
        """
        # modifiedFile is a dictionary {filename: reason}, iterating on
        # the dictionary itself would only give the keys
        yield from self.modifiedFile.items()

    def save_dates(self, checkfile=None):
        """
        Saves the status of the copy.

        @param      checkfile   list of filenames for which the date, the
                                modification date and the checksum
                                must be known, an exception is raised
                                otherwise
        """
        if checkfile is None:
            checkfile = []
        rows = []
        for k in sorted(self.copyFiles):
            obj = self.copyFiles[k]
            da = "" if obj.date is None else str(obj.date)
            mda = "" if obj.mdate is None else str(obj.mdate)
            sum5 = "" if obj.checksum is None else str(obj.checksum)

            if k in checkfile:
                if len(da) == 0:
                    raise ValueError(  # pragma: no cover
                        "There should be a date for file " + k + "\n" + str(obj))
                if len(mda) == 0:
                    raise ValueError(  # pragma: no cover
                        "There should be a mdate for file " + k + "\n" + str(obj))
                if len(sum5) <= 10:
                    raise ValueError(  # pragma: no cover
                        "There should be a checksum for file " + k + "\n" + str(obj))

            values = [k, str(obj.size), da, mda, sum5]
            sval = "\t".join(values) + "\n"
            # None values are written as empty strings, the only column
            # which could still produce "None" is the size
            if "\tNone" in sval:
                raise AssertionError(  # pragma: no cover
                    "This case should not happen " + sval + "\n" + str(obj))

            rows.append(sval)

        with open(self.fileKeep, "w", encoding="utf8") as f:
            f.writelines(rows)

    def has_been_modified_and_reason(self, file):
        """
        Returns *(True, reason)* if a file was modified or *(False, None)* if not.

        A file is considered as modified if it is not monitored yet
        (reason is ``"new"``), if its size changed, or if its modification
        date changed and its md5 checksum is different from the stored one.
        If the date changed but no checksum was stored, the file is
        considered as not modified.

        @param      file        filename
        @return                 *(True, reason)* or *(False, None)*
        """
        if file not in self.copyFiles:
            return True, "new"

        obj = self.copyFiles[file]
        st = os.stat(file)
        if st.st_size != obj.size:
            return True, f"size {st.st_size} != old size {obj.size}"

        old_date = obj.mdate
        new_date = convert_st_date_to_datetime(st.st_mtime)
        if new_date == old_date:
            # no expected modification (dates did not change)
            return False, None

        # dates are different but files might be the same
        if obj.checksum is None:
            # it cannot know, it does nothing
            return False, None

        ch = checksum_md5(file)
        if ch != obj.checksum:
            reason = (f"date/md5 {new_date} != old date {old_date} "
                      f"md5 {obj.checksum} != {ch}")
            return True, reason
        return False, None

    def add_modified_file(self, file, reason):
        """
        Adds a file the modified list of files.

        @param      file        file to add
        @param      reason      reason for modification
        """
        if file in self.modifiedFile:
            raise KeyError(f"file {file} is already present")
        self.modifiedFile[file] = reason

    def add_if_modified(self, file):
        """
        Adds a file to self.modifiedFile if it was modified.

        @param      file    filename
        @return             True or False
        """
        res, reason = self.has_been_modified_and_reason(file)
        if res:
            # the key is the filename, not the boolean result
            self.add_modified_file(file, reason)
        return res

    def difference(self, files, u4=False, nlog=None):
        """
        Goes through the list of files and tells which one has changed.

        If *u4* is True, the method yields a tuple
        *(sign, file._file, file, None)* for every file whose ``_file``
        is not None, *sign* is ``">+"`` (new), ``">"`` (modified) or
        ``"=="`` (unchanged). Otherwise, it yields the modified files
        themselves. In both cases, every monitored file which was not found
        in *files* is finally returned as *("<+", filename, None, None)*.

        @param      files   @see cl FileTreeNode
        @param      u4      @see cl FileTreeNode (changes the output)
        @param      nlog    if not None, print something every ``nlog`` processed files
        @return             iterator on files which changed
        """
        # names of all files seen while going through *files*
        memo = set()
        nb = 0
        if u4:
            for file in files:
                full = file.fullname
                memo.add(full)
                if file._file is None:
                    continue
                nb += 1
                if nlog is not None and nb % nlog == 0:
                    self.LOG(  # pragma: no cover
                        "[FileTreeStatus], processed", nb, "files")

                r, reason = self.has_been_modified_and_reason(full)
                if not r:
                    sign = "=="
                elif reason == "new":
                    sign = ">+"
                else:
                    sign = ">"
                yield (sign, file._file, file, None)
        else:
            for file in files:
                # copyFiles is indexed by fullname, memo must use the
                # same key to detect removed files below
                full = file.fullname
                memo.add(full)
                nb += 1
                if nlog is not None and nb % nlog == 0:
                    self.LOG("[FileTreeStatus], processed", nb, "files")
                # the method returns a tuple which is always true,
                # only the first element tells if the file was modified
                modified, _ = self.has_been_modified_and_reason(full)
                if modified:
                    yield file

        # monitored files which were not part of *files*
        for file in self.copyFiles.values():
            if file.filename not in memo:
                yield ("<+", file.filename, None, None)

    def update_copied_file(self, file, delete=False):
        """
        Updates the file in copyFiles (before saving), update all fields.

        @param      file        filename
        @param      delete      to remove this file from the monitored files
        @return                 file object, None if *delete* is True
        """
        if delete:
            if file not in self.copyFiles:
                raise FileNotFoundError(  # pragma: no cover
                    f"Unable to find a file in the list of monitored files: '{file}'.")
            del self.copyFiles[file]
            return None

        st = os.stat(file)
        obj = FileInfo(
            file,
            st.st_size,
            datetime.datetime.now(),
            convert_st_date_to_datetime(st.st_mtime),
            checksum_md5(file))
        self.copyFiles[file] = obj
        return obj