Coverage for src/lib/helper.py: 100%
71 statements
« prev ^ index » next coverage.py v7.2.7, created at 2025-03-09 17:37 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2025-03-09 17:37 +0000
2from secrets import token_bytes
3from base58 import b58encode
4from base64 import b64encode
5from json import dump, load
6from os import getenv, path
7from uuid import UUID
9from cryptography.hazmat.primitives import serialization, hashes
10from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
11from cryptography.hazmat.primitives.asymmetric import rsa
14# Generate ID from Public Key
def generate_id_from_public_key_file(file_path: str) -> str:
    """Derive an ID from a PEM-encoded public key stored on disk.

    The file is read in binary mode, parsed with
    ``serialization.load_pem_public_key`` and the resulting key object is
    handed to ``generate_id_from_public_key_rsa``.

    Args:
        file_path: Path of the PEM public key file.

    Returns:
        The ID string produced by ``generate_id_from_public_key_rsa``.
    """
    # The context manager closes the handle even when read() raises.
    with open(file_path, 'rb') as key_file:
        key_data = key_file.read()

    public_key = serialization.load_pem_public_key(key_data)

    return generate_id_from_public_key_rsa(public_key)
24# Generate ID from Public Key Data
def generate_id_from_public_key_rsa(public_key: rsa.RSAPublicKey) -> str:
    """Derive an ID from an in-memory public key object.

    The key is serialized to DER (SubjectPublicKeyInfo format) and the
    resulting bytes are passed to ``generate_id_from_public_key_der``.

    Args:
        public_key: Public key object exposing ``public_bytes``.

    Returns:
        The ID string produced by ``generate_id_from_public_key_der``.
    """
    der_encoding = serialization.Encoding.DER
    spki_format = serialization.PublicFormat.SubjectPublicKeyInfo

    # DER is the binary representation of the public key.
    der_bytes = public_key.public_bytes(encoding=der_encoding, format=spki_format)

    return generate_id_from_public_key_der(der_bytes)
def generate_id_from_public_key_der(public_key_bytes: bytes) -> str:
    """Build an ID from raw key bytes.

    The bytes are hashed with SHA-256, the digest is Base58-encoded and the
    result is prefixed with ``FC_``.

    Args:
        public_key_bytes: Bytes to hash.

    Returns:
        ``FC_`` followed by the Base58 text of the SHA-256 digest.
    """
    sha256 = hashes.Hash(hashes.SHA256())
    sha256.update(public_key_bytes)

    encoded_digest = b58encode(sha256.finalize()).decode()
    return f'FC_{encoded_digest}'
def generate_test_id() -> str:  # pragma: no cover
    """Return a random ID in the same format as key-derived IDs.

    20 random bytes from ``secrets.token_bytes`` stand in for the key bytes
    and are passed to ``generate_id_from_public_key_der``, so the hashing,
    encoding and prefix stay in one place instead of being duplicated here.

    Returns:
        The ID string produced by ``generate_id_from_public_key_der``.
    """
    return generate_id_from_public_key_der(token_bytes(20))
def password_key_derivation(key_password: bytes) -> str:
    """Stretch a password with PBKDF2-HMAC-SHA256 and return it as Base64 text.

    The iteration count is read from the ``FLUXCHAT_KEY_DERIVATION_ITERATIONS``
    environment variable and falls back to 600000 when it is unset. The
    derived key is 64 bytes long.

    NOTE(review): the salt is a fixed constant, so the same password and
    iteration count always yield the same output. Changing it would change
    every derived value, so it is left as is here.

    Args:
        key_password: Password bytes to derive from.

    Returns:
        Base64 text of the derived key.
    """
    rounds = int(getenv('FLUXCHAT_KEY_DERIVATION_ITERATIONS', 600000))

    derived = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=64,
        salt=b'FluxChat_Static_Salt',
        iterations=rounds,
    ).derive(key_password)

    return b64encode(derived).decode()
def write_json_file(file_path: str, data) -> None:
    """Serialize ``data`` as JSON (4-space indent) into ``file_path``.

    The file is opened in text write mode, so existing content is replaced.

    Args:
        file_path: Destination path.
        data: Object to serialize with ``json.dump``.
    """
    with open(file_path, 'w') as target:
        dump(obj=data, fp=target, indent=4)
def read_json_file(file_path: str, default = None) -> dict:
    """Load and return the JSON content of ``file_path``.

    When the file does not exist and ``default`` is not ``None``, the default
    is first written to ``file_path`` via ``write_json_file`` and then read
    back. When the file does not exist and no default is given, opening it
    raises ``FileNotFoundError``.

    Args:
        file_path: Path of the JSON file.
        default: Content used to create the file when it is missing.

    Returns:
        The object parsed by ``json.load``.
    """
    file_missing = not path.exists(file_path)
    if file_missing and default is not None:
        write_json_file(file_path, default)

    with open(file_path, 'r') as source:
        parsed = load(source)
    return parsed
def is_valid_uuid(id: str) -> bool:
    """Tell whether ``id`` is a canonical version-4 UUID string.

    The value is parsed with ``UUID(id, version=4)``, which forces the
    version and variant bits, and the parsed object is rendered back to text.
    Only input that is identical to that rendering is accepted, so
    upper-case, brace-wrapped, hyphen-less or non-v4 UUIDs are rejected.

    Args:
        id: Candidate string. Non-string values are treated as invalid
            instead of raising.

    Returns:
        ``True`` when ``id`` equals the canonical v4 rendering, else ``False``.
    """
    # UUID() raises TypeError/AttributeError (not ValueError) for non-string
    # input; a validator should answer False for those as well.
    if not isinstance(id, str):
        return False

    try:
        obj = UUID(id, version=4)
    except ValueError:
        return False
    return str(obj) == id
91# key: data
def binary_encode(data: dict, max_len: int = 4) -> bytes:
    """Serialize a mapping into a flat key/length/payload byte string.

    Every item is emitted as three consecutive fields:

    1. the key as one byte,
    2. the payload length as ``max_len`` little-endian bytes,
    3. the payload itself.

    ``str`` values are encoded with ``str.encode()`` before the length is
    taken, so the length field always counts payload bytes, including for
    non-ASCII text.

    Args:
        data: Mapping of integer keys to ``str`` or bytes-like values.
        max_len: Width in bytes of the length field.

    Returns:
        The concatenated encoding of all items, in mapping order.

    Raises:
        AttributeError: A key has no ``to_bytes`` method (is not an int).
        OverflowError: A key does not fit in one byte, or a payload length
            does not fit in ``max_len`` bytes.
        TypeError: A value is neither ``str`` nor bytes-like.
    """
    items = []
    for key, value in data.items():
        if isinstance(value, str):
            payload = value.encode()
        elif isinstance(value, (bytes, bytearray, memoryview)):
            payload = bytes(value)
        else:
            # Writing a length without a payload would corrupt the stream.
            raise TypeError(
                f'unsupported value type for key {key!r}: {type(value).__name__}'
            )

        items.append(key.to_bytes(1, 'little'))
        items.append(len(payload).to_bytes(max_len, 'little'))
        items.append(payload)

    return b''.join(items)
def binary_decode(data: bytes, max_len: int = 4) -> dict:
    """Parse a key/length/payload byte string back into a mapping.

    The input is read as a sequence of items, each consisting of a one-byte
    key, a ``max_len``-byte little-endian payload length, and the payload.
    A key that occurs more than once keeps its last payload.

    No bounds checks are made: slicing past the end of ``data`` simply
    yields shorter (possibly empty) fields, so truncated input is decoded
    leniently rather than rejected.

    Args:
        data: Encoded bytes.
        max_len: Width in bytes of the length field.

    Returns:
        Mapping of integer keys to payload slices of ``data``.
    """
    decoded = {}
    total = len(data)
    cursor = 0

    while cursor < total:
        key_end = cursor + 1
        length_end = key_end + max_len

        tag = int.from_bytes(data[cursor:key_end], 'little')
        size = int.from_bytes(data[key_end:length_end], 'little')

        # The payload starts right after the length field.
        cursor = length_end + size
        decoded[tag] = data[length_end:cursor]

    return decoded