Coverage for pyquickhelper/filehelper/encryption.py: 92%

93 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-03 02:21 +0200

1""" 

2@file 

3@brief Encryption functionalities. 

4 

5Inspired from `AES encryption of files in Python with PyCrypto 

6<http://eli.thegreenplace.net/2010/06/25/aes-encryption-of-files-in-python-with-pycrypto>`_ 

7""" 

8import random 

9import os 

10import struct 

11import base64 

12from io import BytesIO as StreamIO 

13 

14 

15class EncryptionError(Exception): 

16 """ 

17 raised if an issue happen during encryption 

18 """ 

19 pass 

20 

21 

22def open_input_output(filename, out_filename=None): 

23 """ 

24 Converts *filename* and *out_filename* as streams. 

25 

26 @param filename bytes or filename or BytesIO 

27 @param out_filename BytesIO or filename or None 

28 @return in_size, in_close, in_stream, out_close, out_return, out_stream 

29 """ 

30 # input 

31 typstr = str # unicode # 

32 if isinstance(filename, typstr): 

33 if not os.path.exists(filename): 

34 raise FileNotFoundError(filename) 

35 st = open(filename, "rb") 

36 close = True 

37 filesize = os.path.getsize(filename) 

38 elif isinstance(filename, StreamIO): 

39 st = filename 

40 close = False 

41 filesize = len(st.getvalue()) 

42 else: 

43 st = StreamIO(filename) 

44 close = False 

45 filesize = len(filename) 

46 

47 # output 

48 if out_filename is None: 

49 sto = StreamIO() 

50 ret = True 

51 out_close = False 

52 elif isinstance(out_filename, StreamIO): 

53 sto = out_filename 

54 ret = False 

55 out_close = False 

56 else: 

57 sto = open(out_filename, "wb") 

58 ret = False 

59 out_close = True 

60 

61 return filesize, close, st, out_close, ret, sto 

62 

63 

64def close_input_output(in_size, in_close, in_stream, out_close, out_return, out_stream): 

65 """ 

66 Takes the output of @see fn open_input_output and closes streams 

67 and return expected values. 

68 

69 @param in_size size of input 

70 @param in_close should it close the input stream 

71 @param in_stream input stream 

72 @param out_close should it closes the output stream 

73 @param out_return should it returns something 

74 @param out_stream output stream 

75 @return None or content of output stream 

76 """ 

77 if in_close: 

78 in_stream.close() 

79 

80 if out_close: 

81 if out_return: 

82 raise EncryptionError("incompability") 

83 out_stream.close() 

84 

85 if out_return: 

86 return out_stream.getvalue() 

87 else: 

88 return None 

89 

90 

91def get_encryptor(key, algo="AES", chunksize=2 ** 24, **params): 

92 """ 

93 Returns an encryptor with method encrypt and decrypt. 

94 

95 @param key key 

96 @param algo AES or fernet 

97 @param chunksize Fernet does not allow streaming 

98 @param params additional parameters 

99 @return encryptor, origsize 

100 """ 

101 if algo == "fernet": 

102 from cryptography.fernet import Fernet 

103 if hasattr(key, "encode"): 

104 # it a string 

105 bkey = key.encode() 

106 else: 

107 bkey = key 

108 bkey = base64.b64encode(bkey) 

109 encryptor = Fernet(bkey) 

110 origsize = None 

111 chunksize = None 

112 elif algo == "AES": 

113 try: 

114 from Cryptodome.Cipher import AES 

115 except ImportError: 

116 from Crypto.Cipher import AES 

117 ksize = {16, 32, 64, 128, 256} 

118 chunksize = chunksize # pylint: disable=W0127 

119 if len(key) not in ksize: 

120 raise EncryptionError( 

121 f"len(key)=={len(key)} should be of length {str(ksize)}") 

122 if "out_stream" in params: 

123 iv = bytes([random.randint(0, 0xFF) for i in range(16)]) 

124 params["out_stream"].write(struct.pack('<Q', params["in_size"])) 

125 params["out_stream"].write(iv) 

126 encryptor = AES.new(key, AES.MODE_CBC, iv) 

127 origsize = params["in_size"] 

128 else: 

129 origsize = struct.unpack( 

130 '<Q', params["in_stream"].read(struct.calcsize('Q')))[0] 

131 iv = params["in_stream"].read(16) 

132 encryptor = AES.new(key, AES.MODE_CBC, iv) # decryptor 

133 else: 

134 raise ValueError("unknown algorithm: {0}, should be in {1}".format( 

135 algo, ["fernet", "AES"])) 

136 return encryptor, origsize, chunksize 

137 

138 

139def encrypt_stream(key, filename, out_filename=None, chunksize=2 ** 18, algo="AES"): 

140 """ 

141 Encrypts a file using AES (CBC mode) with the given key. 

142 The function relies on module :epkg:`pycrypto`, :epkg:`cryptography`, 

143 algoritm `AES <https://fr.wikipedia.org/wiki/Advanced_Encryption_Standard>`_, 

144 `Fernet <https://cryptography.io/en/latest/fernet/>`_. 

145 

146 @param key The encryption key - a string that must be 

147 either 16, 24 or 32 bytes long. Longer keys 

148 are more secure. If the data to encrypt is in bytes, 

149 the key must be given in bytes too. 

150 

151 @param filename bytes or Name of the input file 

152 @param out_filename if None, the returns bytes 

153 

154 @param chunksize Sets the size of the chunk which the function 

155 uses to read and encrypt the file. Larger chunk 

156 sizes can be faster for some files and machines. 

157 chunksize must be divisible by 16. 

158 

159 @param algo AES (PyCryptodomex) of or fernet (cryptography) 

160 

161 @return filename or bytes 

162 """ 

163 

164 in_size, in_close, in_stream, out_close, out_return, out_stream = open_input_output( 

165 filename, out_filename) 

166 

167 encryptor, origsize, chunksize = get_encryptor( 

168 key, algo, out_stream=out_stream, in_size=in_size, chunksize=chunksize) 

169 

170 while True: 

171 chunk = in_stream.read(chunksize) 

172 if len(chunk) == 0: 

173 break 

174 if len(chunk) % 16 != 0 and origsize is not None: 

175 chunk += b' ' * (16 - len(chunk) % 16) 

176 

177 out_stream.write(encryptor.encrypt(chunk)) 

178 

179 return close_input_output(in_size, in_close, in_stream, out_close, out_return, out_stream) 

180 

181 

182def decrypt_stream(key, filename, out_filename=None, chunksize=3 * 2 ** 13, algo="AES"): 

183 """ 

184 Decrypts a file using AES (CBC mode) with the given key. 

185 The function relies on module :epkg:`pycrypto`, :epkg:`cryptography`, 

186 algoritm `AES <https://fr.wikipedia.org/wiki/Advanced_Encryption_Standard>`_, 

187 `Fernet <https://cryptography.io/en/latest/fernet/>`_. 

188 

189 @param key The encryption key - a string that must be 

190 either 16, 24 or 32 bytes long. Longer keys 

191 are more secure. If the data to encrypt is in bytes, 

192 the key must be given in bytes too. 

193 

194 @param filename bytes or Name of the input file 

195 @param out_filename if None, the returns bytes 

196 

197 @param chunksize Sets the size of the chunk which the function 

198 uses to read and encrypt the file. Larger chunk 

199 sizes can be faster for some files and machines. 

200 chunksize must be divisible by 16. 

201 

202 @param algo AES (:epkg:`pycryptodomex`) of or fernet (cryptography) 

203 

204 @return filename or bytes 

205 """ 

206 in_size, in_close, in_stream, out_close, out_return, out_stream = open_input_output( 

207 filename, out_filename) 

208 

209 decryptor, origsize, chunksize = get_encryptor( 

210 key, algo, in_stream=in_stream, chunksize=chunksize) 

211 

212 while True: 

213 chunk = in_stream.read(chunksize) 

214 if len(chunk) == 0: 

215 break 

216 out_stream.write(decryptor.decrypt(chunk)) 

217 out_stream.truncate(origsize) 

218 

219 return close_input_output(in_size, in_close, in_stream, out_close, out_return, out_stream)