Coverage for src/lightmlrestapi/mlapp/base_logging.py: 95%
74 statements
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-06 07:16 +0200
« prev ^ index » next coverage.py v6.4.1, created at 2022-06-06 07:16 +0200
1"""
2@file
3@brief Machine Learning Post request
4"""
5from datetime import datetime
6import os
7import json
8from time import perf_counter
9import logging
10from logging import Formatter
11from logging.handlers import TimedRotatingFileHandler
12from ..tools import json_loads, json_dumps
class BaseLogging:
    """
    Simplifies logging. Logging is encrypted
    with module :epkg:`pyjwt` if *secret* is specified.

    Every record is written with the format
    ``%(asctime)s,%(levelname)s,%(message)s,%(data)s`` into a file named
    ``<ClassName>-<YYYY-MM-DD>.log``. The logger is shared by all instances
    of the same class because it is retrieved by class name.
    """

    def __init__(self, secret, folder='.', level=logging.INFO, encoding='utf-8', when='d'):
        """
        @param      secret      secret for encryption (None to avoid encryption)
        @param      folder      folder where to write the logs (None to disable the logging)
        @param      level       logging level
        @param      when        when rotating the logs,
                                see `TimedRotatingFileHandler
                                <https://docs.python.org/3/library/logging.handlers.html#logging.handlers.TimedRotatingFileHandler>`_
        @param      encoding    encoding

        If the logger of this class already owns a handler writing to the
        same file, that handler is reused (with its original *encoding* and
        *when*) instead of attaching a second one, otherwise every record
        would be written several times.
        """
        if folder is None:
            self.logger = None
        else:
            current_date = datetime.now().strftime("%Y-%m-%d")
            filename = "{0}-{1}.log".format(
                self.__class__.__name__, current_date)
            file_location = os.path.join(folder, filename)
            # FileHandler stores the absolute path in baseFilename.
            target = os.path.abspath(file_location)

            logger = logging.getLogger(self.__class__.__name__)
            logger.setLevel(level)
            for existing in logger.handlers:
                if (isinstance(existing, TimedRotatingFileHandler) and
                        existing.baseFilename == target):
                    # Same class, same file: do not duplicate the output.
                    self.handler = existing
                    break
            else:
                form = Formatter(
                    '%(asctime)s,%(levelname)s,%(message)s,%(data)s')
                # delay=True: the file is only opened by the first record.
                self.handler = TimedRotatingFileHandler(
                    file_location, encoding=encoding, delay=True, when=when)
                self.handler.setFormatter(form)
                logger.addHandler(self.handler)
            self.logger = logger

        self.secret = secret
        if self.secret is not None:
            # Imported lazily: pyjwt is only needed when encryption is on.
            import jwt
            self.jwt = jwt

    def save_time(self):
        """
        Saves the times to get a duration later.
        """
        self.t1 = perf_counter()

    def duration(self):
        """
        Get the duration since @see me save_time was called.

        @return     elapsed time in seconds (difference of two ``perf_counter`` values)

        Raises ``AttributeError`` if @see me save_time was never called.
        """
        return perf_counter() - self.t1

    def info(self, msg, data):
        """
        Logs any kind of data into the logs with level *info*.

        @param      msg     message
        @param      data    data to log
        """
        self._logerrorinfo('info', msg, data)

    def error(self, msg, data):
        """
        Logs any kind of data into the logs with level *error*.

        @param      msg     message
        @param      data    data to log
        """
        self._logerrorinfo('error', msg, data)

    def _logerrorinfo(self, fct, msg, data):
        """
        Logs any kind of data into the logs.

        @param      fct     ``'info'`` to log with level *info*,
                            any other value logs with level *error*
        @param      msg     message
        @param      data    data to log

        Nothing happens if the logging was disabled (*folder* was None).
        Without a secret, *data* is serialized with ``json_dumps`` and
        falls back to ``str(data)`` if that fails. With a secret, *data*
        is passed to ``jwt.encode`` as is, any exception it raises is
        propagated.
        """
        if self.logger is None:
            return

        if self.secret is None:
            try:
                payload = json_dumps(data)
            except Exception:  # pylint: disable=W0703
                # Cannot serialize into json.
                payload = str(data)
        else:
            payload = self.jwt.encode(
                data, self.secret, algorithm='HS256')
            if isinstance(payload, bytes):
                # Some pyjwt versions return bytes; formatting them with %s
                # would write the repr b'...' into the log file.
                payload = payload.decode('ascii')

        extra = dict(data=payload)
        if fct == 'info':
            self.logger.info(msg, extra=extra)
        else:
            self.logger.error(msg, extra=extra)
def enumerate_parsed_logs(folder, secret, encoding='utf-8'):
    """
    Goes through a list of logged files,
    reads and decrypts the content.

    @param      folder      folder which contains the logs
    @param      secret      secret (None if the logs were not encrypted)
    @param      encoding    encoding
    @return                 iterator on decrypted data

    Every file found under *folder* (recursively) is read line by line.
    A line is split on commas into at least five fields and produces a
    dictionary with keys *dt* (datetime parsed from the first field),
    *code* (second field, presumably the milliseconds of the default
    ``asctime`` format, to be confirmed), *level*, *msg* and *data*.

    Without a secret, *data* is everything after the fourth comma parsed
    as JSON. With a secret, a line must contain exactly five fields and
    the last one is decoded with ``jwt.decode``.

    Raises ``ValueError`` with the file name and the line number if a line
    does not have the expected number of fields. A message containing a
    comma therefore makes the line unreadable.
    """
    if secret is not None:
        # Imported lazily: pyjwt is only needed to decrypt.
        import jwt

    for root, _, files in os.walk(folder):
        for name in files:
            full = os.path.join(root, name)
            with open(full, 'r', encoding=encoding) as f:
                for i, line in enumerate(f):
                    spl = line.rstrip('\n\r').split(',')

                    # Plain JSON data may contain commas, a token may not.
                    if secret is None:
                        malformed = len(spl) < 5
                    else:
                        malformed = len(spl) != 5
                    if malformed:
                        raise ValueError(
                            "Format issue in\n File \"{0}\", line {1}".format(full, i + 1))

                    if secret is None:
                        dt = datetime.strptime(spl[0], '%Y-%m-%d %H:%M:%S')
                        raw = ','.join(spl[4:])
                        try:
                            data = json_loads(raw)
                        except ValueError:
                            # json gives better error messages.
                            data = json.loads(raw)
                        # data should be a dictionary saved as a string
                    else:
                        token = spl[4]
                        if token.startswith("b'") and token.endswith("'"):
                            # The token was logged as the repr of bytes.
                            token = token[2:-1]
                        data = jwt.decode(token, secret, algorithms=['HS256'])
                        dt = datetime.strptime(spl[0], '%Y-%m-%d %H:%M:%S')

                    yield dict(dt=dt, code=spl[1], level=spl[2],
                               msg=spl[3], data=data)