Coverage for pyquickhelper/filehelper/encryption.py: 92%
93 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
1"""
2@file
3@brief Encryption functionalities.
5Inspired from `AES encryption of files in Python with PyCrypto
6<http://eli.thegreenplace.net/2010/06/25/aes-encryption-of-files-in-python-with-pycrypto>`_
7"""
8import random
9import os
10import struct
11import base64
12from io import BytesIO as StreamIO
class EncryptionError(Exception):
    """
    Exception raised when an issue happens during encryption.
    """
def open_input_output(filename, out_filename=None):
    """
    Converts *filename* and *out_filename* into streams.

    @param      filename        bytes, name of an existing file or BytesIO
    @param      out_filename    BytesIO, filename or None
    @return                     in_size, in_close, in_stream,
                                out_close, out_return, out_stream

    The flags *in_close* and *out_close* are True only for the streams
    this function opened itself (from a filename). The flag *out_return*
    is True only when *out_filename* is None, in which case the output
    is an in-memory stream.

    @raises     FileNotFoundError   if *filename* is a string which does
                                    not point to an existing path
    """
    # input
    if isinstance(filename, str):
        if not os.path.exists(filename):
            raise FileNotFoundError(filename)
        # the size is read first so that a failure cannot leave
        # an opened file behind
        filesize = os.path.getsize(filename)
        st = open(filename, "rb")
        close = True
    elif isinstance(filename, StreamIO):
        st = filename
        close = False
        filesize = len(st.getvalue())
    else:
        # raw bytes are wrapped into an in-memory stream
        st = StreamIO(filename)
        close = False
        filesize = len(filename)

    # output
    if out_filename is None:
        sto = StreamIO()
        ret = True
        out_close = False
    elif isinstance(out_filename, StreamIO):
        sto = out_filename
        ret = False
        out_close = False
    else:
        try:
            sto = open(out_filename, "wb")
        except Exception:
            # do not leak the input file opened above
            if close:
                st.close()
            raise
        ret = False
        out_close = True

    return filesize, close, st, out_close, ret, sto
def close_input_output(in_size, in_close, in_stream, out_close, out_return, out_stream):
    """
    Closes the streams produced by @see fn open_input_output
    and returns the expected value.

    @param      in_size         size of input (not used here)
    @param      in_close        True to close the input stream
    @param      in_stream       input stream
    @param      out_close       True to close the output stream
    @param      out_return      True to return the content of the output stream
    @param      out_stream      output stream
    @return                     None or content of output stream

    @raises     EncryptionError if both *out_close* and *out_return* are set,
                                a closed stream cannot be read back
    """
    if in_close:
        in_stream.close()

    # guard: the two flags cannot be combined
    if out_close and out_return:
        raise EncryptionError("incompability")
    if out_close:
        out_stream.close()

    return out_stream.getvalue() if out_return else None
def get_encryptor(key, algo="AES", chunksize=2 ** 24, **params):
    """
    Returns an encryptor with method encrypt and decrypt.

    @param      key         key, for AES its length must be 16, 24 or 32
    @param      algo        AES or fernet
    @param      chunksize   size of the chunks to process, returned as is
                            for AES, replaced by None for fernet
                            (Fernet does not allow streaming)
    @param      params      additional parameters, for AES:
                            *out_stream* and *in_size* to encrypt
                            (a header with the size and the initialization
                            vector is written into *out_stream*),
                            *in_stream* to decrypt (the header is read
                            from *in_stream*)
    @return                 encryptor, origsize, chunksize

    *origsize* is the size of the original content (None for fernet).

    @raises     EncryptionError if the key length is not valid for AES
    @raises     ValueError      if *algo* is unknown
    """
    if algo == "fernet":
        from cryptography.fernet import Fernet
        # a string is converted into bytes, bytes are used as is
        bkey = key.encode() if hasattr(key, "encode") else key
        encryptor = Fernet(base64.b64encode(bkey))
        # no header and no chunk: the whole content is processed at once
        return encryptor, None, None

    if algo == "AES":
        try:
            from Cryptodome.Cipher import AES
        except ImportError:
            from Crypto.Cipher import AES
        # AES only supports keys of 128, 192 or 256 bits
        ksize = {16, 24, 32}
        if len(key) not in ksize:
            raise EncryptionError(
                f"len(key)=={len(key)} should be of length {str(ksize)}")
        if "out_stream" in params:
            # encryption: the initialization vector must be unpredictable,
            # it is drawn from the operating system generator
            iv = os.urandom(16)
            params["out_stream"].write(struct.pack('<Q', params["in_size"]))
            params["out_stream"].write(iv)
            encryptor = AES.new(key, AES.MODE_CBC, iv)
            origsize = params["in_size"]
        else:
            # decryption: the header holds the original size followed
            # by the initialization vector
            origsize = struct.unpack(
                '<Q', params["in_stream"].read(struct.calcsize('<Q')))[0]
            iv = params["in_stream"].read(16)
            encryptor = AES.new(key, AES.MODE_CBC, iv)  # decryptor
        return encryptor, origsize, chunksize

    raise ValueError("unknown algorithm: {0}, should be in {1}".format(
        algo, ["fernet", "AES"]))
def encrypt_stream(key, filename, out_filename=None, chunksize=2 ** 18, algo="AES"):
    """
    Encrypts a file using AES (CBC mode) with the given key.
    The function relies on module :epkg:`pycrypto`, :epkg:`cryptography`,
    algorithm `AES <https://fr.wikipedia.org/wiki/Advanced_Encryption_Standard>`_,
    `Fernet <https://cryptography.io/en/latest/fernet/>`_.

    @param      key             The encryption key - a string that must be
                                either 16, 24 or 32 bytes long. Longer keys
                                are more secure. If the data to encrypt is in bytes,
                                the key must be given in bytes too.

    @param      filename        bytes or name of the input file
    @param      out_filename    output file or stream,
                                if None, the function returns bytes

    @param      chunksize       Sets the size of the chunk which the function
                                uses to read and encrypt the file. Larger chunk
                                sizes can be faster for some files and machines.
                                chunksize must be divisible by 16.

    @param      algo            AES (PyCryptodomex) or fernet (cryptography)

    @return                     bytes if *out_filename* is None, None otherwise
    """
    in_size, in_close, in_stream, out_close, out_return, out_stream = open_input_output(
        filename, out_filename)

    try:
        encryptor, origsize, chunksize = get_encryptor(
            key, algo, out_stream=out_stream, in_size=in_size, chunksize=chunksize)

        while True:
            chunk = in_stream.read(chunksize)
            if not chunk:
                break
            # origsize is None for fernet which needs no padding,
            # otherwise the chunk is padded with spaces to a multiple of 16
            if origsize is not None and len(chunk) % 16 != 0:
                chunk += b' ' * (16 - len(chunk) % 16)

            out_stream.write(encryptor.encrypt(chunk))
    except BaseException:
        # releases the streams this function opened before
        # propagating the error
        if in_close:
            in_stream.close()
        if out_close:
            out_stream.close()
        raise

    return close_input_output(in_size, in_close, in_stream, out_close, out_return, out_stream)
def decrypt_stream(key, filename, out_filename=None, chunksize=3 * 2 ** 13, algo="AES"):
    """
    Decrypts a file using AES (CBC mode) with the given key.
    The function relies on module :epkg:`pycrypto`, :epkg:`cryptography`,
    algorithm `AES <https://fr.wikipedia.org/wiki/Advanced_Encryption_Standard>`_,
    `Fernet <https://cryptography.io/en/latest/fernet/>`_.

    @param      key             The encryption key - a string that must be
                                either 16, 24 or 32 bytes long. Longer keys
                                are more secure. If the data to decrypt is in bytes,
                                the key must be given in bytes too.

    @param      filename        bytes or name of the input file
    @param      out_filename    output file or stream,
                                if None, the function returns bytes

    @param      chunksize       Sets the size of the chunk which the function
                                uses to read and decrypt the file. Larger chunk
                                sizes can be faster for some files and machines.
                                chunksize must be divisible by 16.

    @param      algo            AES (:epkg:`pycryptodomex`) or fernet (cryptography)

    @return                     bytes if *out_filename* is None, None otherwise
    """
    in_size, in_close, in_stream, out_close, out_return, out_stream = open_input_output(
        filename, out_filename)

    try:
        decryptor, origsize, chunksize = get_encryptor(
            key, algo, in_stream=in_stream, chunksize=chunksize)

        while True:
            chunk = in_stream.read(chunksize)
            if not chunk:
                break
            out_stream.write(decryptor.decrypt(chunk))
        # drops what lies beyond the original size read by get_encryptor
        # (origsize is None for fernet)
        out_stream.truncate(origsize)
    except BaseException:
        # releases the streams this function opened before
        # propagating the error
        if in_close:
            in_stream.close()
        if out_close:
            out_stream.close()
        raise

    return close_input_output(in_size, in_close, in_stream, out_close, out_return, out_stream)