Coverage for src/lib/helper.py: 100%

71 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2025-03-09 17:37 +0000

1 

2from secrets import token_bytes 

3from base58 import b58encode 

4from base64 import b64encode 

5from json import dump, load 

6from os import getenv, path 

7from uuid import UUID 

8 

9from cryptography.hazmat.primitives import serialization, hashes 

10from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC 

11from cryptography.hazmat.primitives.asymmetric import rsa 

12 

13 

14# Generate ID from Public Key 

15def generate_id_from_public_key_file(file_path: str) -> str: 

16 f = open(file_path, 'rb') 

17 key_data = f.read() 

18 f.close() 

19 

20 public_key = serialization.load_pem_public_key(key_data) 

21 

22 return generate_id_from_public_key_rsa(public_key) 

23 

24# Generate ID from Public Key Data 

25def generate_id_from_public_key_rsa(public_key: rsa.RSAPublicKey) -> str: 

26 # DER is binary representation of public key. 

27 public_bin = public_key.public_bytes( 

28 encoding=serialization.Encoding.DER, 

29 format=serialization.PublicFormat.SubjectPublicKeyInfo 

30 ) 

31 

32 # print('public_bin:', public_bin) 

33 

34 return generate_id_from_public_key_der(public_bin) 

35 

36def generate_id_from_public_key_der(public_key_bytes: bytes) -> str: 

37 hasher = hashes.Hash(hashes.SHA256()) 

38 hasher.update(public_key_bytes) 

39 digest = hasher.finalize() 

40 

41 # print('digest:', digest.hex()) 

42 

43 base58_hash = b58encode(digest).decode() 

44 return f'FC_{base58_hash}' 

45 

46def generate_test_id() -> str: # pragma: no cover 

47 public_bytes = token_bytes(20) 

48 

49 hasher = hashes.Hash(hashes.SHA256()) 

50 hasher.update(public_bytes) 

51 digest = hasher.finalize() 

52 

53 base58_hash = b58encode(digest).decode() 

54 return f'FC_{base58_hash}' 

55 

56def password_key_derivation(key_password: bytes) -> str: 

57 iterations = int(getenv('FLUXCHAT_KEY_DERIVATION_ITERATIONS', 600000)) 

58 

59 salt = b'FluxChat_Static_Salt' 

60 kdf = PBKDF2HMAC( 

61 algorithm=hashes.SHA256(), 

62 length=64, 

63 salt=salt, 

64 iterations=iterations, 

65 ) 

66 kdf_b = kdf.derive(key_password) 

67 

68 # base64 

69 kdf_b64 = b64encode(kdf_b).decode() 

70 

71 return kdf_b64 

72 

73def write_json_file(file_path: str, data): 

74 with open(file_path, 'w') as write_file: 

75 dump(data, write_file, indent=4) 

76 

77def read_json_file(file_path: str, default = None) -> dict: 

78 if not path.exists(file_path) and default is not None: 

79 write_json_file(file_path, default) 

80 

81 with open(file_path, 'r') as read_file: 

82 return load(read_file) 

83 

84def is_valid_uuid(id: str): 

85 try: 

86 obj = UUID(id, version=4) 

87 except ValueError: 

88 return False 

89 return str(obj) == id 

90 

91# key: data 

92def binary_encode(data: dict, max_len: int = 4) -> bytes: 

93 items = [] 

94 for key, value in data.items(): 

95 d_len = len(value) 

96 

97 try: 

98 items.append(key.to_bytes(1, 'little')) 

99 except AttributeError as e: 

100 # print('type:', type(key)) 

101 # print('key:', key) 

102 raise e 

103 

104 items.append(d_len.to_bytes(max_len, 'little')) 

105 

106 if isinstance(value, bytes): 

107 items.append(value) 

108 elif isinstance(value, str): 

109 items.append(value.encode()) 

110 

111 return b''.join(items) 

112 

113def binary_decode(data: bytes, max_len: int = 4) -> dict: 

114 data_len = len(data) 

115 pos = 0 

116 items = {} 

117 while pos < data_len: 

118 item_t = int.from_bytes(data[pos:pos+1], 'little') 

119 pos += 1 

120 # print('item_t:', item_t) 

121 

122 item_l = int.from_bytes(data[pos:pos+max_len], 'little') 

123 pos += max_len 

124 # print('item_l:', item_l) 

125 

126 items[item_t] = data[pos:pos+item_l] 

127 # print('item:', items[item_t]) 

128 

129 pos += item_l 

130 

131 return items