WebBroadcastableMessage

This commit is contained in:
BarsTiger
2023-07-17 15:41:15 +03:00
parent 36d956af09
commit 95dd77d878
4 changed files with 123 additions and 30 deletions

View File

@@ -1,6 +1,7 @@
from attrs import define
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
import base64
@@ -20,9 +21,30 @@ class Identity:
public_exponent=65537,
key_size=4096
)
return self
def public_key(self):
return base64.urlsafe_b64encode(self.private_key.public_key().public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.PKCS1
))
def encrypt(self, content: str):
return base64.urlsafe_b64encode(self.private_key.public_key().encrypt(
content.encode(),
padding=padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
))
def decrypt(self, content: bytes):
return self.private_key.decrypt(
base64.urlsafe_b64decode(content),
padding=padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
).decode()

View File

@@ -1,22 +1 @@
from .webmessage import (
webmessage_type_literal,
webmessage_error_message_literal,
WebMessageMessage,
WebErrorMessage,
WebNotificationMessage,
WebUserMessage,
WebMessage,
webmessages_union
)
__all__ = [
'webmessage_type_literal',
'webmessage_error_message_literal',
'WebMessageMessage',
'WebErrorMessage',
'WebNotificationMessage',
'WebUserMessage',
'WebMessage',
'webmessages_union'
]
pass

View File

@@ -1,7 +1,12 @@
from typing import Literal, Final, Union
from dataclasses_json import dataclass_json
from dataclasses import dataclass
from dataclasses import dataclass, field
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
import base64
from ..encryption.identity import Identity
webmessage_type_literal = Literal[
"connect", "message", "disconnect", "error", "notification"
@@ -23,28 +28,61 @@ class _WebAnyMessage:
@dataclass_json
@dataclass
class WebMessageMessage:
"""
Sent as regular message
:param username: From user
:param message: Encrypted b64-encoded message
"""
username: str
message: str
message: bytes
type: Final = "message"
def decrypt(self, identity: Identity):
return identity.decrypt(self.message)
@dataclass_json
@dataclass
class WebErrorMessage:
"""
Sent when error on server occurs
:param error_message: See webmessage_error_message_literal
"""
error_message: webmessage_error_message_literal
type: Final = "error"
@dataclass_json
@dataclass
class WebUserMessage:
type: Literal["connect", "disconnect"]
class WebConnectionMessage:
"""
Sent when user is connected (sent by user)
:param username: Username of connected
:param public_key: b64-encoded rsa public key
"""
username: str
public_key: bytes
type = "connect"
@dataclass_json
@dataclass
class WebDisconnectMessage:
"""
Sent when user is disconnected
:param username: Username of disconnected
"""
username: str
type = "disconnect"
@dataclass_json
@dataclass
class WebNotificationMessage:
"""
Sent from server name as unencrypted notification
:param message: Message content, not encrypted
"""
message: str
type: Final = "notification"
@@ -52,18 +90,68 @@ class WebNotificationMessage:
webmessages_union = Union[
WebMessageMessage,
WebErrorMessage,
WebUserMessage,
WebConnectionMessage,
WebDisconnectMessage,
WebNotificationMessage
]
class WebMessage:
"""
Class for handling incoming webmessages
"""
@staticmethod
def from_json(data) -> webmessages_union:
"""
Restores webmessage object from json
:param data: Valid json data
:return: One of types from webmessages_union
"""
return {
"connect": WebUserMessage.from_json,
"disconnect": WebUserMessage.from_json,
"connect": WebConnectionMessage.from_json,
"disconnect": WebDisconnectMessage.from_json,
"message": WebMessageMessage.from_json,
"error": WebErrorMessage.from_json,
"notification": WebNotificationMessage.from_json
}[_WebAnyMessage.from_json(data).type](data)
@dataclass
class WebBroadcastableMessage:
"""
Class for creating outcoming messages
:param from_user: User, that send message
:param message_content: Text of message
:param keys: Dict with public keys in format username:public_key
"""
from_user: str
message_content: str
keys: dict[str, bytes]
encrypted_messages: dict[str, webmessages_union] = field(default_factory=dict)
def __post_init__(self):
for username in self.keys.keys():
public_key = serialization.load_der_public_key(
base64.urlsafe_b64decode(self.keys[username])
)
self.encrypted_messages[username] = WebMessageMessage(
username=self.from_user,
message=base64.urlsafe_b64encode(public_key.encrypt(
self.message_content.encode(),
padding=padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
))
)
def json(self):
return dict(
map(
lambda i, j: (i, j),
self.encrypted_messages.keys(),
[item.to_json() for item in self.encrypted_messages.values()]
)
)

View File

@@ -14,6 +14,10 @@ cryptography = "^41.0.2"
attrs = "^23.1.0"
[tool.poetry.dev-dependencies]
rich = "^13.4.2"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"