Coverage for src/lib/mail.py: 61%

322 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2025-03-09 17:37 +0000

1 

2import datetime as dt 

3from base64 import b64encode 

4from uuid import uuid4 

5from logging import getLogger 

6import lib.overlay as overlay 

7from lib.helper import read_json_file, write_json_file, binary_encode, binary_decode 

8 

9 

10class Mail(): 

11 uuid: str 

12 sender: str 

13 receiver: str 

14 origin: overlay.Node 

15 target: overlay.Node 

16 subject: str 

17 body: str 

18 created_at: dt.datetime 

19 received_at: dt.datetime 

20 valid_until: dt.datetime 

21 forwarded_to: list 

22 is_encrypted: bool 

23 is_delivered: bool 

24 is_new: bool 

25 verified: str 

26 sign_hash: str 

27 sign: str 

28 

29 def __init__(self, uuid_s: str = None): 

30 if uuid_s is None: 

31 self.uuid = str(uuid4()) 

32 else: 

33 self.uuid = uuid_s 

34 self.sender = None 

35 self.receiver = None 

36 self.origin = None 

37 self.target = None 

38 self.subject = None 

39 self.body = None 

40 self.created_at = dt.datetime.now(dt.UTC) 

41 self.received_at = None 

42 self.valid_until = None 

43 self.forwarded_to = [] 

44 self.is_encrypted = False 

45 self.is_delivered = None 

46 self.is_new = None 

47 self.verified = None 

48 self.sign_hash = None 

49 self.sign = None 

50 

51 self._logger = getLogger('app.mail') 

52 

53 def __str__(self): # pragma: no cover 

54 return 'Mail({})'.format(self.uuid) 

55 

56 def __repr__(self): # pragma: no cover 

57 return self.__str__() 

58 

59 def as_dict(self) -> dict: 

60 # self._logger.debug('as_dict()') 

61 

62 data = dict() 

63 if self.sender is not None: 

64 data['sender'] = self.sender 

65 if self.receiver is not None: 

66 data['receiver'] = self.receiver 

67 if self.subject is not None: 

68 data['subject'] = self.subject 

69 if self.body is not None: 

70 data['body'] = self.body 

71 if self.created_at is not None: 

72 data['created_at'] = self.created_at.isoformat() 

73 if self.received_at is not None: 

74 data['received_at'] = self.received_at.isoformat() 

75 if self.valid_until is not None: 

76 data['valid_until'] = self.valid_until.isoformat() 

77 if self.forwarded_to is not None and len(self.forwarded_to) > 0: 

78 data['forwarded_to'] = self.forwarded_to 

79 if self.is_encrypted is not None: 

80 data['is_encrypted'] = self.is_encrypted 

81 if self.is_delivered is not None: 

82 data['is_delivered'] = self.is_delivered 

83 if self.is_new is not None: 

84 data['is_new'] = self.is_new 

85 if self.verified is not None: 

86 data['verified'] = self.verified 

87 if self.sign_hash is not None: 

88 data['sign_hash'] = self.sign_hash 

89 if self.sign is not None: 

90 data['sign'] = self.sign 

91 return data 

92 

93 def from_dict(self, data: dict): 

94 self._logger.debug('from_dict() -> %s', data) 

95 

96 if 'sender' in data: 

97 self.set_sender(data['sender']) 

98 if 'receiver' in data: 

99 self.set_receiver(data['receiver']) 

100 

101 if 'subject' in data: 

102 self.subject = data['subject'] 

103 if 'body' in data: 

104 self.body = data['body'] 

105 if 'created_at' in data: 

106 self.created_at = dt.datetime.fromisoformat(data['created_at']) 

107 if 'received_at' in data: 

108 self.received_at = dt.datetime.fromisoformat(data['received_at']) 

109 if 'valid_until' in data: 

110 self.valid_until = dt.datetime.fromisoformat(data['valid_until']) 

111 if 'forwarded_to' in data: 

112 self.forwarded_to = data['forwarded_to'] 

113 if 'is_encrypted' in data: 

114 self.is_encrypted = data['is_encrypted'] 

115 if 'is_delivered' in data: 

116 self.is_delivered = data['is_delivered'] 

117 if 'is_new' in data: 

118 self.is_new = bool(data['is_new']) 

119 if 'verified' in data: 

120 self.verified = data['verified'] 

121 if 'sign_hash' in data: 

122 self.sign_hash = data['sign_hash'] 

123 if 'sign' in data: 

124 self.sign = data['sign'] 

125 

126 def received_now(self): 

127 # self._logger.debug('received_now()') 

128 

129 self.received_at = dt.datetime.now(dt.UTC) 

130 

131 def set_sender(self, sender: str): 

132 # self._logger.debug('set_sender(%s)', sender) 

133 

134 try: 

135 self.origin = overlay.Node.parse(sender) 

136 except: 

137 self.origin = None 

138 self.sender = None 

139 else: 

140 self.sender = sender 

141 

142 def set_receiver(self, receiver: str): 

143 # self._logger.debug('set_receiver(%s)', receiver) 

144 

145 try: 

146 self.target = overlay.Node.parse(receiver) 

147 except: 

148 self.target = None 

149 self.receiver = None 

150 else: 

151 self.receiver = receiver 

152 

153 def encode(self) -> str: 

154 self._logger.debug('encode()') 

155 

156 # uuid_len = len(self.uuid).to_bytes(1, 'little') 

157 sender_len = len(self.sender).to_bytes(1, 'little') 

158 receiver_len = len(self.receiver).to_bytes(1, 'little') 

159 subject_len = len(self.subject).to_bytes(1, 'little') 

160 body_len = len(self.body).to_bytes(4, 'little') 

161 items = [ 

162 # b'\x00', uuid_len, self.uuid.encode(), 

163 b'\x01\x13', self.created_at.strftime('%FT%T').encode(), 

164 b'\x10', sender_len, self.sender.encode(), 

165 b'\x11', receiver_len, self.receiver.encode(), 

166 b'\x20', subject_len, self.subject.encode(), 

167 b'\x21', body_len, self.body.encode(), 

168 ] 

169 raw = b''.join(items) 

170 return b64encode(raw).decode() 

171 

172 def decode(self, data: bytes): 

173 self._logger.debug('decode(%s)', data) 

174 # print('data', data) 

175 

176 data_len = len(data) 

177 

178 pos = 0 

179 while pos < data_len: 

180 item_t = int.from_bytes(data[pos:pos+1], 'little') 

181 pos += 1 

182 

183 # if item_t == 0x00: 

184 if item_t == 0x01: 

185 item_l = int.from_bytes(data[pos:pos+1], 'little') 

186 pos += 1 

187 val = data[pos:pos+item_l].decode() 

188 self.created_at = dt.datetime.fromisoformat(val) 

189 

190 elif item_t == 0x10: 

191 item_l = int.from_bytes(data[pos:pos+1], 'little') 

192 pos += 1 

193 val = data[pos:pos+item_l].decode() 

194 self.set_sender(val) 

195 

196 elif item_t == 0x11: 

197 item_l = int.from_bytes(data[pos:pos+1], 'little') 

198 pos += 1 

199 val = data[pos:pos+item_l].decode() 

200 self.set_receiver(val) 

201 

202 elif item_t == 0x20: 

203 item_l = int.from_bytes(data[pos:pos+1], 'little') 

204 pos += 1 

205 val = data[pos:pos+item_l].decode() 

206 self.subject = val 

207 

208 elif item_t == 0x21: 

209 item_l = int.from_bytes(data[pos:pos+4], 'little') 

210 pos += 4 

211 self._logger.debug('body length: %d', item_l) 

212 

213 val = data[pos:pos+item_l].decode() 

214 self._logger.debug('body: "%s"', val) 

215 

216 self.body = val 

217 

218 else: 

219 self._logger.warning('unknown type: %s', item_t) 

220 val = None 

221 item_l = 0 

222 

223 pos += item_l 

224 

225 self._logger.debug('type=%s(%s), length=%d(%s), value=%s(%s)', item_t, type(item_t), item_l, type(item_l), val, type(val)) 

226 

227 def ipc_encode(self) -> bytes: 

228 self._logger.debug('ipc_encode()') 

229 

230 data = {} 

231 if self.uuid is not None: 

232 data[0x00] = self.uuid 

233 if self.created_at is not None: 

234 data[0x01] = self.created_at.strftime('%FT%T') 

235 if self.received_at is not None: 

236 data[0x02] = self.received_at.strftime('%FT%T') 

237 if self.valid_until is not None: 

238 data[0x03] = self.valid_until.strftime('%FT%T') 

239 if self.verified is not None: 

240 data[0x10] = self.verified 

241 if self.sender is not None: 

242 data[0x20] = self.sender 

243 if self.receiver is not None: 

244 data[0x21] = self.receiver 

245 if self.subject is not None: 

246 data[0x30] = self.subject 

247 if self.body is not None: 

248 data[0x31] = self.body 

249 

250 self._logger.debug('data: %s', data) 

251 

252 return binary_encode(data) 

253 

254 def ipc_decode(self, raw): 

255 self._logger.debug('ipc_decode()') 

256 self._logger.debug('raw: %s "%s"', type(raw), raw) 

257 

258 data = binary_decode(raw) 

259 

260 self._logger.debug('data: %s %s', type(data), data) 

261 

262 if 0x00 in data: 

263 self.uuid = data[0x00].decode() 

264 if 0x01 in data: 

265 item = data[0x01].decode() 

266 self.created_at = dt.datetime.fromisoformat(item) 

267 if 0x02 in data: 

268 item = data[0x02].decode() 

269 self.received_at = dt.datetime.fromisoformat(item) 

270 if 0x03 in data: 

271 item = data[0x03].decode() 

272 self.valid_until = dt.datetime.fromisoformat(item) 

273 if 0x10 in data: 

274 self.verified = data[0x10].decode() 

275 if 0x20 in data: 

276 self.sender = data[0x20].decode() 

277 if 0x21 in data: 

278 self.receiver = data[0x21].decode() 

279 if 0x30 in data: 

280 self.subject = data[0x30].decode() 

281 if 0x31 in data: 

282 self.body = data[0x31].decode() 

283 

class Queue:
    """Mails kept by uuid in a JSON file, each with a retention deadline.

    The retention period is read from ``config['mail']['retention_time']``
    and interpreted as a number of hours.
    """

    _path: str
    _config: dict
    _mail_config: dict
    _mails_by_uuid: dict[str, Mail]
    _changes: bool
    _retention_time: dt.timedelta

    def __init__(self, path: str, config: dict = None):
        """Remember the storage path and derive the retention period.

        ``config`` must contain a ``'mail'`` mapping with a
        ``'retention_time'`` entry; nothing is read from disk here.
        """
        self._path = path
        self._config = config
        self._mail_config = self._config['mail']
        self._mails_by_uuid = {}
        self._changes = False

        retention_hours = self._mail_config['retention_time']
        self._retention_time = dt.timedelta(hours=retention_hours)

        self._logger = getLogger('app.mail.Queue')
        self._logger.info('init()')

    def load(self):
        """Read the JSON file and rebuild one ``Mail`` per stored entry."""
        self._logger.info('load')

        stored = read_json_file(self._path, {})
        for stored_uuid in stored:
            entry = Mail(stored_uuid)
            entry.from_dict(stored[stored_uuid])

            self._logger.debug('load mail: %s', entry)

            self._mails_by_uuid[stored_uuid] = entry

    def save(self) -> bool:
        """Write all mails to the JSON file if something changed.

        Returns:
            ``True`` when the file was written, ``False`` when there were
            no pending changes.
        """
        self._logger.info('save() changes=%s', self._changes)

        if self._changes:
            serialised = {
                key: entry.as_dict()
                for key, entry in self._mails_by_uuid.items()
            }
            write_json_file(self._path, serialised)
            self._changes = False
            return True

        return False

    def add_mail(self, mail: Mail):
        """Store ``mail`` and set its ``valid_until`` to now + retention."""
        self._logger.debug('add_mail(%s)', mail)

        mail.valid_until = dt.datetime.now(dt.UTC) + self._retention_time

        self._mails_by_uuid[mail.uuid] = mail
        self._changes = True

    def get_mails(self) -> dict[str, Mail]:
        """Return the internal uuid -> mail mapping (not a copy)."""
        return self._mails_by_uuid

    def has_mail(self, mail_uuid: str) -> bool:
        """Tell whether a mail with this uuid is queued."""
        self._logger.debug('has_mail(%s)', mail_uuid)
        return mail_uuid in self._mails_by_uuid

    def changed(self):
        """Mark the queue as modified so the next ``save()`` writes it."""
        self._changes = True

    def clean_up(self):
        """Drop every mail whose ``valid_until`` has been reached.

        Mails without a ``valid_until`` are kept.
        """
        self._logger.info('clean up')

        expired = [
            (key, entry)
            for key, entry in self._mails_by_uuid.items()
            if entry.valid_until is not None
            and dt.datetime.now(dt.UTC) >= entry.valid_until
        ]
        self._logger.debug('old mails A: %s', expired)

        # Deletion happens after the scan so the dict is never mutated
        # while it is being iterated.
        for key, entry in expired:
            self._logger.debug('remove mail: %s', entry)

            del self._mails_by_uuid[key]
            self._changes = True

365 

class Database:
    """Mails kept by uuid and persisted to a JSON file."""

    _path: str
    _data: dict[str, Mail]
    _changes: bool

    def __init__(self, path: str) -> None:
        """Remember the storage path; nothing is read from disk here."""
        self._path = path
        self._data = {}
        self._changes = False

        self._logger = getLogger('app.mail.Database')
        self._logger.info('init()')

    def load(self):
        """Read the JSON file and rebuild one ``Mail`` per stored entry."""
        self._logger.info('load()')

        stored = read_json_file(self._path, {})
        for stored_uuid in stored:
            entry = Mail(stored_uuid)
            entry.from_dict(stored[stored_uuid])

            self._logger.debug('load mail: %s', entry)

            self._data[stored_uuid] = entry

    def save(self):
        """Write all mails to the JSON file if there are pending changes."""
        self._logger.info('save() changes=%s', self._changes)

        if self._changes:
            serialised = {
                key: entry.as_dict() for key, entry in self._data.items()
            }
            write_json_file(self._path, serialised)
            self._changes = False

    def changed(self):
        """Mark the database as modified so the next ``save()`` writes it."""
        self._changes = True

    def add_mail(self, mail: Mail):
        """Store ``mail`` under its uuid and mark the database as modified."""
        self._logger.debug('add_mail %s', mail)

        self._data[mail.uuid] = mail
        self._changes = True

    def has_mail(self, mail_uuid: str) -> bool:
        """Tell whether a mail with this uuid is stored."""
        self._logger.debug('has_mail %s', mail_uuid)
        return mail_uuid in self._data

    def get_mails(self) -> dict[str, Mail]:
        """Return the internal uuid -> mail mapping (not a copy)."""
        return self._data

    def get_mail(self, mail_uuid: str) -> Mail:
        """Return the mail stored under ``mail_uuid``, or ``None`` if absent."""
        self._logger.debug('get_mail %s', mail_uuid)
        self._logger.debug('_data %s', self._data)

        return self._data.get(mail_uuid)