Coverage for src/lib/client.py: 85%

217 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2025-03-09 17:37 +0000

1 

2import datetime as dt 

3from socket import socket as Socket 

4from uuid import uuid4 

5from base64 import b64encode, b64decode 

6 

7from lib.cash import Cash 

8from lib.overlay import Node, Distance 

9from lib.helper import generate_id_from_public_key_rsa 

10 

11from cryptography.hazmat.primitives import serialization, hashes 

12from cryptography.hazmat.primitives.asymmetric import padding 

13 

14 

class Action:
    """A queued action identified by ``id`` plus an optional ``subid``.

    Equality looks only at ``id`` and ``subid``; ``data``, ``func``,
    ``is_strong`` and ``valid_until`` are ignored when comparing.
    Because ``__eq__`` is defined without ``__hash__``, instances are
    unhashable.
    """

    id: str
    subid: str
    is_strong: bool  # Strong actions are not removed from the queue on soft_reset_actions()
    valid_until: dt.datetime

    def __init__(self, id: str, subid: str = None, data = None):
        """Store the identifiers and payload; all other fields start unset."""
        self.id, self.subid, self.data = id, subid, data
        self.is_strong = False
        self.func = None
        self.valid_until = None

    def __str__(self):  # pragma: no cover
        """Return a verbose form including payload and strong flag."""
        parts = (self.id, self.subid, self.data, self.is_strong)
        return 'Action({}/{},d={},s={})'.format(*parts)

    def __repr__(self):  # pragma: no cover
        """Return a short form showing only ``id`` and ``subid``."""
        parts = (self.id, self.subid)
        return 'Action({}/{})'.format(*parts)

    def __eq__(self, other) -> bool:
        """Return True only for another Action with the same id and subid."""
        if isinstance(other, Action):
            return bool(self.id == other.id and self.subid == other.subid)
        # Anything that is not an Action never compares equal.
        return False

43 

44 

class Challenge:
    """Attribute holder for the state of one challenge exchange.

    Only attribute types are declared; nothing is assigned at class level
    and there is no ``__init__``, so an attribute does not exist on an
    instance until a caller sets it.
    """

    min: int
    max: int
    data: str
    proof: str
    nonce: str

51 

52 

class Client:
    """A peer: contact details, connection state, queued actions and public key.

    ``as_dict()`` / ``from_dict()`` cover the contact and bookkeeping
    fields; connection state, the action queue and the public key are
    runtime-only.

    Because ``__eq__`` is defined without ``__hash__``, instances are
    unhashable.
    """

    uuid: str  # Internal ID
    address: str
    port: int
    id: str
    created_at: dt.datetime
    seen_at: dt.datetime
    used_at: dt.datetime
    meetings: int
    is_bootstrap: bool
    is_trusted: bool
    debug_add: str

    # Unmapped
    node: Node
    sock: Socket

    # Connection Mode
    #   0 = DISCONNECTED
    #   1 = CONNECTED
    #   2 = AUTHENTICATED (has sent ID command and received ID command)
    conn_mode: int
    conn_msg: str

    # Directory Mode
    #   i = incoming
    #   o = outgoing
    dir_mode: str

    # Authenticated (binary flags)
    #   0, 0, 0, 0 = 0  (Not Authenticated)
    #   0, 0, 0, 1 = 1  (sent CHALLENGE command)
    #   0, 0, 1, 0 = 2  (received CHALLENGE command)
    #   0, 1, 0, 0 = 4  (sent ID command)
    #   1, 0, 0, 0 = 8  (received ID command)
    #   1, 1, 1, 1 = 15 (Authenticated both)
    auth: int

    actions: list[Action]
    cash: Cash
    challenge: Challenge

    def __init__(self):
        """Create a disconnected client with a fresh random ``uuid``."""
        self.uuid = str(uuid4())
        self.address = None
        self.port = None
        self.id = None
        self.created_at = dt.datetime.now(dt.UTC)
        # Fixed timestamp far in the past used as the initial seen_at value
        # (same value the original obtained by parsing
        # '2001-01-01 00:00:00+0000').
        self.seen_at = dt.datetime(2001, 1, 1, tzinfo=dt.UTC)
        self.used_at = dt.datetime.now(dt.UTC)
        self.meetings = 0
        self.is_bootstrap = False
        self.is_trusted = False
        self.debug_add = 'Init'

        # Unmapped
        self.node = None
        self.sock = None
        self.conn_mode = 0
        self.conn_msg = 'Init'
        self.dir_mode = None
        self.auth = 0
        self.actions = list()
        self.public_key = None
        self.cash = None

        self.challenge = Challenge()

    def __str__(self):
        """Return a summary with address, ID, modes and queued action count."""
        return 'Client({},{}:{},ID={},c={},d={},a={},ac={})'.format(
            self.uuid,
            self.address,
            self.port,
            self.id,
            self.conn_mode,
            self.dir_mode,
            self.auth,
            len(self.actions),
        )

    def __repr__(self):  # pragma: no cover
        """Return a short form showing only the internal ``uuid``."""
        return 'Client({})'.format(self.uuid)

    def __eq__(self, other) -> bool:
        """Compare two clients by ``uuid`` first, then by ``id``.

        The checks run in this order:

        1. Non-``Client`` objects are never equal.
        2. If either ``id`` is the empty string the result is False
           (this also applies when comparing an object with itself).
        3. Matching non-None ``uuid`` values mean equal.
        4. Otherwise both ``id`` values must be non-None and equal.
        """
        if not isinstance(other, Client):
            return False

        if self.id == '' or other.id == '':
            return False

        if self.uuid is not None and other.uuid is not None and self.uuid == other.uuid:
            return True

        if self.id is None or other.id is None:
            return False

        return self.id == other.id

    def as_dict(self) -> dict:
        """Return the mapped fields as a plain dict.

        ``None`` values are omitted, timestamps are written as ISO 8601
        strings, and ``is_bootstrap`` / ``is_trusted`` are only included
        when truthy.
        """
        data = {}

        for key in ('address', 'port', 'id'):
            value = getattr(self, key)
            if value is not None:
                data[key] = value

        for key in ('created_at', 'seen_at', 'used_at'):
            stamp = getattr(self, key)
            if stamp is not None:
                data[key] = stamp.isoformat()

        if self.meetings is not None:
            data['meetings'] = self.meetings

        # Flags are written only when set, so a missing key means False.
        for key in ('is_bootstrap', 'is_trusted'):
            flag = getattr(self, key)
            if flag:
                data[key] = flag

        if self.debug_add is not None:
            data['debug_add'] = self.debug_add

        return data

    def from_dict(self, data: dict) -> None:
        """Update this client from a dict shaped like the output of ``as_dict()``.

        Keys that are absent leave the corresponding attribute untouched.
        ``port`` and ``meetings`` are converted with ``int()``, timestamps
        are parsed with ``datetime.fromisoformat``, and ``id`` goes through
        ``set_id()`` so ``node`` is updated as well.
        """
        if 'address' in data:
            self.address = data['address']
        if 'port' in data:
            self.port = int(data['port'])
        if 'id' in data:
            self.set_id(data['id'])

        for key in ('created_at', 'seen_at', 'used_at'):
            if key in data:
                setattr(self, key, dt.datetime.fromisoformat(data[key]))

        if 'meetings' in data:
            self.meetings = int(data['meetings'])

        # Copied as-is, without any type conversion.
        for key in ('is_bootstrap', 'is_trusted', 'debug_add'):
            if key in data:
                setattr(self, key, data[key])

    def from_list(self, data: list) -> None:
        """Update this client from ``[id]`` or ``[id, address, port, ...]``.

        Address and port are only read when at least three items are given.
        """
        # NOTE(review): this assigns id directly instead of calling set_id(),
        # so self.node is not refreshed here -- confirm this is intended.
        self.id = data[0]

        if len(data) >= 3:
            self.address = data[1]
            self.port = int(data[2])

    def refresh_seen_at(self) -> None:
        """Set ``seen_at`` to the current UTC time."""
        self.seen_at = dt.datetime.now(dt.UTC)

    def refresh_used_at(self) -> None:
        """Set ``used_at`` to the current UTC time."""
        self.used_at = dt.datetime.now(dt.UTC)

    def inc_meetings(self) -> None:
        """Increase the ``meetings`` counter by one."""
        self.meetings += 1

    def set_id(self, id: str) -> None:
        """Store ``id`` and set ``node`` to ``Node.parse(id)``."""
        self.id = id
        self.node = Node.parse(id)

    # NOTE(review): annotated as int, but the fallback below returns a
    # Distance instance -- confirm whether Distance is int-compatible.
    def distance(self, node: Node) -> int:
        """Return the distance from this client's node to ``node``.

        When this client has no node yet, a default-constructed
        ``Distance()`` is returned instead.
        """
        if self.node is None:
            return Distance()

        return self.node.distance(node)

    def add_action(self, action: Action) -> None:
        """Append ``action`` to the queue."""
        self.actions.append(action)

    def get_actions(self, soft_reset: bool = False) -> list[Action]:
        """Return a copy of the queue, optionally soft-resetting it afterwards.

        The returned list is taken before the reset, so it still contains
        the actions that the reset drops.
        """
        snapshot = list(self.actions)
        if soft_reset:
            self.soft_reset_actions()
        return snapshot

    def soft_reset_actions(self) -> list[Action]:
        """Drop every action with ``is_strong == False``.

        Returns a copy of the queue as it was before the reset.
        """
        previous = list(self.actions)
        self.actions = [action for action in previous if action.is_strong]
        return previous

    def has_action(self, id: str, subid: str = None) -> bool:
        """Return True if an action with this ``id`` and ``subid`` is queued."""
        return any(
            action.id == id and action.subid == subid
            for action in self.actions
        )

    def resolve_action(self, id: str, subid: str = None, force_remove: bool = False) -> Action | None:
        """Find the first queued action matching ``id`` and ``subid``.

        The match is removed from the queue unless it is strong; with
        ``force_remove=True`` strong actions are removed as well.

        Returns the matching action (whether or not it was removed), or
        ``None`` when nothing matches.
        """
        match = next(
            (
                action
                for action in self.actions
                if action.id == id and action.subid == subid
            ),
            None,
        )
        if match is None:
            return None

        if force_remove or not match.is_strong:
            self.actions.remove(match)
        return match

    def remove_action(self, action: Action) -> None:
        """Remove ``action`` from the queue.

        Raises:
            ValueError: if no equal action is queued (``list.remove``).
        """
        self.actions.remove(action)

    def has_contact(self) -> bool:
        """Return True when both ``address`` and ``port`` are set."""
        return self.address is not None and self.port is not None

    def load_public_key_from_pem_file(self, path: str) -> None:
        """Read a PEM-encoded public key from ``path`` into ``public_key``."""
        with open(path, 'rb') as f:
            key = f.read()

        self.public_key = serialization.load_pem_public_key(key)

    def write_public_key_to_pem_file(self, path: str) -> bool:
        """Write the public key to ``path`` in PEM format.

        Returns False without touching the file when no key is loaded,
        True after the file has been written.
        """
        if not self.has_public_key():
            return False

        # PEM is used to store public keys in Base64 encoded format, with header and footer.
        public_key_pem = self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )

        with open(path, 'wb') as f:
            f.write(public_key_pem)

        return True

    def load_public_key_from_pem(self, raw: str) -> None:
        """Load the public key from a base64 string.

        Despite the method name, ``raw`` is not PEM text: it is
        base64-decoded and the result is parsed as DER, which is the
        format produced by ``get_base64_public_key()``.
        """
        der_key = b64decode(raw)
        self.public_key = serialization.load_der_public_key(der_key)

    def get_base64_public_key(self) -> str | None:
        """Return the public key as base64-encoded DER, or None if unset."""
        if not self.has_public_key():
            return None

        # DER is binary representation of public key.
        public_bin = self.public_key.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )

        return b64encode(public_bin).decode()

    def reset_public_key(self) -> None:
        """Forget the loaded public key."""
        self.public_key = None

    def has_public_key(self) -> bool:
        """Return True when a public key is loaded."""
        return self.public_key is not None

    def verify_public_key(self) -> bool:
        """Check that the ID derived from the public key equals ``id``.

        Returns False when no public key is loaded.
        """
        if not self.has_public_key():
            return False

        return generate_id_from_public_key_rsa(self.public_key) == self.id

    def encrypt(self, data: bytes) -> bytes | None:
        """Encrypt ``data`` with the public key using OAEP (MGF1/SHA-256).

        Returns None when no public key is loaded.
        """
        if not self.has_public_key():
            return None

        oaep = padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        )
        return self.public_key.encrypt(data, oaep)

    def reset(self) -> None:
        """Return the connection state to its initial, disconnected values.

        The socket reference is dropped, not closed, and ``conn_msg`` is
        left as it is. The action queue is emptied entirely, including
        strong actions, and a fresh ``Challenge`` is created.
        """
        self.sock = None
        self.conn_mode = 0
        self.dir_mode = None
        self.auth = 0
        self.actions = []
        self.challenge = Challenge()