Coverage for src/lib/mail.py: 61%
322 statements
« prev ^ index » next coverage.py v7.2.7, created at 2025-03-09 17:37 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2025-03-09 17:37 +0000
2import datetime as dt
3from base64 import b64encode
4from uuid import uuid4
5from logging import getLogger
6import lib.overlay as overlay
7from lib.helper import read_json_file, write_json_file, binary_encode, binary_decode
10class Mail():
11 uuid: str
12 sender: str
13 receiver: str
14 origin: overlay.Node
15 target: overlay.Node
16 subject: str
17 body: str
18 created_at: dt.datetime
19 received_at: dt.datetime
20 valid_until: dt.datetime
21 forwarded_to: list
22 is_encrypted: bool
23 is_delivered: bool
24 is_new: bool
25 verified: str
26 sign_hash: str
27 sign: str
29 def __init__(self, uuid_s: str = None):
30 if uuid_s is None:
31 self.uuid = str(uuid4())
32 else:
33 self.uuid = uuid_s
34 self.sender = None
35 self.receiver = None
36 self.origin = None
37 self.target = None
38 self.subject = None
39 self.body = None
40 self.created_at = dt.datetime.now(dt.UTC)
41 self.received_at = None
42 self.valid_until = None
43 self.forwarded_to = []
44 self.is_encrypted = False
45 self.is_delivered = None
46 self.is_new = None
47 self.verified = None
48 self.sign_hash = None
49 self.sign = None
51 self._logger = getLogger('app.mail')
53 def __str__(self): # pragma: no cover
54 return 'Mail({})'.format(self.uuid)
56 def __repr__(self): # pragma: no cover
57 return self.__str__()
59 def as_dict(self) -> dict:
60 # self._logger.debug('as_dict()')
62 data = dict()
63 if self.sender is not None:
64 data['sender'] = self.sender
65 if self.receiver is not None:
66 data['receiver'] = self.receiver
67 if self.subject is not None:
68 data['subject'] = self.subject
69 if self.body is not None:
70 data['body'] = self.body
71 if self.created_at is not None:
72 data['created_at'] = self.created_at.isoformat()
73 if self.received_at is not None:
74 data['received_at'] = self.received_at.isoformat()
75 if self.valid_until is not None:
76 data['valid_until'] = self.valid_until.isoformat()
77 if self.forwarded_to is not None and len(self.forwarded_to) > 0:
78 data['forwarded_to'] = self.forwarded_to
79 if self.is_encrypted is not None:
80 data['is_encrypted'] = self.is_encrypted
81 if self.is_delivered is not None:
82 data['is_delivered'] = self.is_delivered
83 if self.is_new is not None:
84 data['is_new'] = self.is_new
85 if self.verified is not None:
86 data['verified'] = self.verified
87 if self.sign_hash is not None:
88 data['sign_hash'] = self.sign_hash
89 if self.sign is not None:
90 data['sign'] = self.sign
91 return data
93 def from_dict(self, data: dict):
94 self._logger.debug('from_dict() -> %s', data)
96 if 'sender' in data:
97 self.set_sender(data['sender'])
98 if 'receiver' in data:
99 self.set_receiver(data['receiver'])
101 if 'subject' in data:
102 self.subject = data['subject']
103 if 'body' in data:
104 self.body = data['body']
105 if 'created_at' in data:
106 self.created_at = dt.datetime.fromisoformat(data['created_at'])
107 if 'received_at' in data:
108 self.received_at = dt.datetime.fromisoformat(data['received_at'])
109 if 'valid_until' in data:
110 self.valid_until = dt.datetime.fromisoformat(data['valid_until'])
111 if 'forwarded_to' in data:
112 self.forwarded_to = data['forwarded_to']
113 if 'is_encrypted' in data:
114 self.is_encrypted = data['is_encrypted']
115 if 'is_delivered' in data:
116 self.is_delivered = data['is_delivered']
117 if 'is_new' in data:
118 self.is_new = bool(data['is_new'])
119 if 'verified' in data:
120 self.verified = data['verified']
121 if 'sign_hash' in data:
122 self.sign_hash = data['sign_hash']
123 if 'sign' in data:
124 self.sign = data['sign']
126 def received_now(self):
127 # self._logger.debug('received_now()')
129 self.received_at = dt.datetime.now(dt.UTC)
131 def set_sender(self, sender: str):
132 # self._logger.debug('set_sender(%s)', sender)
134 try:
135 self.origin = overlay.Node.parse(sender)
136 except:
137 self.origin = None
138 self.sender = None
139 else:
140 self.sender = sender
142 def set_receiver(self, receiver: str):
143 # self._logger.debug('set_receiver(%s)', receiver)
145 try:
146 self.target = overlay.Node.parse(receiver)
147 except:
148 self.target = None
149 self.receiver = None
150 else:
151 self.receiver = receiver
153 def encode(self) -> str:
154 self._logger.debug('encode()')
156 # uuid_len = len(self.uuid).to_bytes(1, 'little')
157 sender_len = len(self.sender).to_bytes(1, 'little')
158 receiver_len = len(self.receiver).to_bytes(1, 'little')
159 subject_len = len(self.subject).to_bytes(1, 'little')
160 body_len = len(self.body).to_bytes(4, 'little')
161 items = [
162 # b'\x00', uuid_len, self.uuid.encode(),
163 b'\x01\x13', self.created_at.strftime('%FT%T').encode(),
164 b'\x10', sender_len, self.sender.encode(),
165 b'\x11', receiver_len, self.receiver.encode(),
166 b'\x20', subject_len, self.subject.encode(),
167 b'\x21', body_len, self.body.encode(),
168 ]
169 raw = b''.join(items)
170 return b64encode(raw).decode()
172 def decode(self, data: bytes):
173 self._logger.debug('decode(%s)', data)
174 # print('data', data)
176 data_len = len(data)
178 pos = 0
179 while pos < data_len:
180 item_t = int.from_bytes(data[pos:pos+1], 'little')
181 pos += 1
183 # if item_t == 0x00:
184 if item_t == 0x01:
185 item_l = int.from_bytes(data[pos:pos+1], 'little')
186 pos += 1
187 val = data[pos:pos+item_l].decode()
188 self.created_at = dt.datetime.fromisoformat(val)
190 elif item_t == 0x10:
191 item_l = int.from_bytes(data[pos:pos+1], 'little')
192 pos += 1
193 val = data[pos:pos+item_l].decode()
194 self.set_sender(val)
196 elif item_t == 0x11:
197 item_l = int.from_bytes(data[pos:pos+1], 'little')
198 pos += 1
199 val = data[pos:pos+item_l].decode()
200 self.set_receiver(val)
202 elif item_t == 0x20:
203 item_l = int.from_bytes(data[pos:pos+1], 'little')
204 pos += 1
205 val = data[pos:pos+item_l].decode()
206 self.subject = val
208 elif item_t == 0x21:
209 item_l = int.from_bytes(data[pos:pos+4], 'little')
210 pos += 4
211 self._logger.debug('body length: %d', item_l)
213 val = data[pos:pos+item_l].decode()
214 self._logger.debug('body: "%s"', val)
216 self.body = val
218 else:
219 self._logger.warning('unknown type: %s', item_t)
220 val = None
221 item_l = 0
223 pos += item_l
225 self._logger.debug('type=%s(%s), length=%d(%s), value=%s(%s)', item_t, type(item_t), item_l, type(item_l), val, type(val))
227 def ipc_encode(self) -> bytes:
228 self._logger.debug('ipc_encode()')
230 data = {}
231 if self.uuid is not None:
232 data[0x00] = self.uuid
233 if self.created_at is not None:
234 data[0x01] = self.created_at.strftime('%FT%T')
235 if self.received_at is not None:
236 data[0x02] = self.received_at.strftime('%FT%T')
237 if self.valid_until is not None:
238 data[0x03] = self.valid_until.strftime('%FT%T')
239 if self.verified is not None:
240 data[0x10] = self.verified
241 if self.sender is not None:
242 data[0x20] = self.sender
243 if self.receiver is not None:
244 data[0x21] = self.receiver
245 if self.subject is not None:
246 data[0x30] = self.subject
247 if self.body is not None:
248 data[0x31] = self.body
250 self._logger.debug('data: %s', data)
252 return binary_encode(data)
254 def ipc_decode(self, raw):
255 self._logger.debug('ipc_decode()')
256 self._logger.debug('raw: %s "%s"', type(raw), raw)
258 data = binary_decode(raw)
260 self._logger.debug('data: %s %s', type(data), data)
262 if 0x00 in data:
263 self.uuid = data[0x00].decode()
264 if 0x01 in data:
265 item = data[0x01].decode()
266 self.created_at = dt.datetime.fromisoformat(item)
267 if 0x02 in data:
268 item = data[0x02].decode()
269 self.received_at = dt.datetime.fromisoformat(item)
270 if 0x03 in data:
271 item = data[0x03].decode()
272 self.valid_until = dt.datetime.fromisoformat(item)
273 if 0x10 in data:
274 self.verified = data[0x10].decode()
275 if 0x20 in data:
276 self.sender = data[0x20].decode()
277 if 0x21 in data:
278 self.receiver = data[0x21].decode()
279 if 0x30 in data:
280 self.subject = data[0x30].decode()
281 if 0x31 in data:
282 self.body = data[0x31].decode()
class Queue():
    """Mails keyed by uuid, kept in memory and persisted to one JSON file.

    Every mail added receives a ``valid_until`` derived from the configured
    retention time; ``clean_up()`` drops the ones that have expired.
    """

    _path: str
    _config: dict
    _mail_config: dict
    _mails_by_uuid: dict[str, Mail]
    _changes: bool
    _retention_time: dt.timedelta

    def __init__(self, path: str, config: dict = None):
        """Set up an empty queue.

        :param path: location of the JSON file used by ``load()``/``save()``.
        :param config: configuration dict; ``config['mail']['retention_time']``
            is read as a number of hours.
        """
        self._path = path
        self._changes = False
        self._mails_by_uuid = {}

        self._config = config
        self._mail_config = self._config['mail']
        hours = self._mail_config['retention_time']
        self._retention_time = dt.timedelta(hours=hours)

        self._logger = getLogger('app.mail.Queue')
        self._logger.info('init()')

    def load(self):
        """Read the JSON file and rebuild one ``Mail`` per stored entry."""
        self._logger.info('load')

        stored = read_json_file(self._path, {})
        for key in stored:
            entry = Mail(key)
            entry.from_dict(stored[key])

            self._logger.debug('load mail: %s', entry)

            self._mails_by_uuid[key] = entry

    def save(self) -> bool:
        """Write all mails to the JSON file if anything changed.

        :return: ``True`` when the file was written, ``False`` when there were
            no pending changes.
        """
        self._logger.info('save() changes=%s', self._changes)

        if self._changes:
            snapshot = {
                key: entry.as_dict()
                for key, entry in self._mails_by_uuid.items()
            }
            write_json_file(self._path, snapshot)
            self._changes = False
            return True

        return False

    def add_mail(self, mail: Mail):
        """Store ``mail`` and set its expiry to now plus the retention time."""
        self._logger.debug('add_mail(%s)', mail)

        expires = dt.datetime.now(dt.UTC) + self._retention_time
        mail.valid_until = expires

        self._mails_by_uuid[mail.uuid] = mail
        self._changes = True

    def get_mails(self) -> dict[str, Mail]:
        """Return the internal uuid -> mail mapping (not a copy)."""
        return self._mails_by_uuid

    def has_mail(self, mail_uuid: str) -> bool:
        """Tell whether a mail with this uuid is queued."""
        self._logger.debug('has_mail(%s)', mail_uuid)
        known = mail_uuid in self._mails_by_uuid
        return known

    def changed(self):
        """Flag the queue as modified so the next ``save()`` writes it."""
        self._changes = True

    def clean_up(self):
        """Remove every mail whose ``valid_until`` lies in the past."""
        self._logger.info('clean up')

        # Collect first; the mapping must not shrink while it is iterated.
        old_mails = [
            (key, entry)
            for key, entry in self._mails_by_uuid.items()
            if entry.valid_until is not None
            and dt.datetime.now(dt.UTC) >= entry.valid_until
        ]
        self._logger.debug('old mails A: %s', old_mails)

        for key, entry in old_mails:
            self._logger.debug('remove mail: %s', entry)

            self._mails_by_uuid.pop(key)
            self._changes = True
class Database():
    """Mails keyed by uuid, kept in memory and persisted to one JSON file."""

    _path: str
    _data: dict[str, Mail]
    _changes: bool

    def __init__(self, path: str) -> None:
        """Set up an empty database backed by the JSON file at ``path``."""
        self._changes = False
        self._data = {}
        self._path = path

        self._logger = getLogger('app.mail.Database')
        self._logger.info('init()')

    def load(self):
        """Read the JSON file and rebuild one ``Mail`` per stored entry."""
        self._logger.info('load()')

        stored = read_json_file(self._path, {})
        for key in stored:
            entry = Mail(key)
            entry.from_dict(stored[key])

            self._logger.debug('load mail: %s', entry)

            self._data[key] = entry

    def save(self):
        """Write all mails to the JSON file; does nothing without changes."""
        self._logger.info('save() changes=%s', self._changes)

        if self._changes:
            snapshot = {
                key: entry.as_dict() for key, entry in self._data.items()
            }
            write_json_file(self._path, snapshot)
            self._changes = False

    def changed(self):
        """Flag the database as modified so the next ``save()`` writes it."""
        self._changes = True

    def add_mail(self, mail: Mail):
        """Store ``mail`` under its uuid, replacing any previous entry."""
        self._logger.debug('add_mail %s', mail)

        self._data[mail.uuid] = mail
        self._changes = True

    def has_mail(self, mail_uuid: str) -> bool:
        """Tell whether a mail with this uuid is stored."""
        self._logger.debug('has_mail %s', mail_uuid)
        known = mail_uuid in self._data
        return known

    def get_mails(self) -> dict[str, Mail]:
        """Return the internal uuid -> mail mapping (not a copy)."""
        return self._data

    def get_mail(self, mail_uuid: str) -> Mail:
        """Return the mail stored under ``mail_uuid``, or ``None`` if absent."""
        self._logger.debug('get_mail %s', mail_uuid)
        self._logger.debug('_data %s', self._data)

        if mail_uuid in self._data:
            return self._data[mail_uuid]
        return None
427 return None