0
0
mirror of https://gitflic.ru/project/maks1ms/ocab.git synced 2025-04-21 00:43:45 +03:00
ocab/src/ocab_modules/standard/fsm_database_storage/fsm.py
2024-07-13 22:33:10 +03:00

123 lines
3.3 KiB
Python

import json
from typing import Any, Dict, Optional
from aiogram.fsm.state import State
from aiogram.fsm.storage.base import BaseStorage, StorageKey
from ocab_core.modules_system.public_api import get_module, log
from ocab_core.modules_system.public_api.public_api import set_fsm
FSMDataRepository = get_module("standard.database", "repositories.FSMDataRepository")
def serialize_key(key: StorageKey) -> str:
return f"{key.bot_id}:{key.chat_id}:{key.user_id}"
def serialize_object(obj: object) -> str | None:
try:
return json.dumps(obj)
except Exception as e:
log(f"Serializing error! {e}")
return None
def deserialize_object(obj):
try:
return json.loads(obj)
except Exception as e:
log(f"Deserializing error! {e}")
return None
class SQLStorage(BaseStorage):
def __init__(self):
super().__init__()
self.repo = FSMDataRepository()
async def set_state(self, key: StorageKey, state: State | None = None) -> None:
"""
Set state for specified key
:param key: storage key
:param state: new state
"""
s_key = serialize_key(key)
s_state = state.state if isinstance(state, State) else state
try:
self.repo.set_state(s_key, s_state)
except Exception as e:
log(f"FSM Storage database error: {e}")
async def get_state(self, key: StorageKey) -> Optional[str]:
"""
Get key state
:param key: storage key
:return: current state
"""
s_key = serialize_key(key)
try:
s_state = self.repo.get(s_key)
return s_state.state if s_state else None
except Exception as e:
log(f"FSM Storage database error: {e}")
return None
async def set_data(self, key: StorageKey, data: Dict[str, Any]) -> None:
"""
Write data (replace)
:param key: storage key
:param data: new data
"""
s_key = serialize_key(key)
s_data = serialize_object(data)
try:
self.repo.set_data(s_key, s_data)
except Exception as e:
log(f"FSM Storage database error: {e}")
async def get_data(self, key: StorageKey) -> Optional[Dict[str, Any]]:
"""
Get current data for key
:param key: storage key
:return: current data
"""
s_key = serialize_key(key)
try:
s_data = self.repo.get(s_key)
return deserialize_object(s_data.data) if s_data else None
except Exception as e:
log(f"FSM Storage database error: {e}")
return None
async def update_data(
self, key: StorageKey, data: Dict[str, Any]
) -> Dict[str, Any]:
"""
Update data in the storage for key (like dict.update)
:param key: storage key
:param data: partial data
:return: new data
"""
current_data = await self.get_data(key=key)
if not current_data:
current_data = {}
current_data.update(data)
await self.set_data(key=key, data=current_data)
return current_data.copy()
async def close(self) -> None: # pragma: no cover
pass
async def module_init():
set_fsm(SQLStorage())