import redis import hashlib import json from typing import Optional, Dict, Any class ChatData: def __init__(self, chat_id: str = "", current_message_id: str = ""): self.chat_id = chat_id self.current_message_id = current_message_id def serialize(self) -> str: """Serialize to string format""" return f"{self.chat_id};{self.current_message_id}" @staticmethod def deserialize(text: str) -> 'ChatData': """Deserialize from string format""" parts = text.split(';') if len(parts) > 2: raise ValueError("Invalid cache data") chat_id = parts[0] if len(parts) > 0 else "" current_message_id = parts[1] if len(parts) > 1 else "" return ChatData(chat_id=chat_id, current_message_id=current_message_id) class Cache: def __init__(self, redis_addr: str = "localhost:6379"): """Initialize cache with Redis connection""" host, port = redis_addr.split(':') self.redis = redis.Redis(host=host, port=int(port), decode_responses=True) # Test connection self.redis.ping() def _get_key(self, token: str, title: str) -> str: """Generate cache key using FNV-1a hash""" combined = f"{token};{title}" # Simple FNV-1a hash implementation hash_value = 0xcbf29ce484222325 for byte in combined.encode('utf-8'): hash_value ^= byte hash_value = (hash_value * 0x100000001b3) & 0xffffffffffffffff return str(hash_value) def get_chat_data(self, token: str, title: str) -> ChatData: """Get chat data from cache""" key = self._get_key(token, title) data = self.redis.get(key) if data is None: return ChatData(chat_id="", current_message_id="") return ChatData.deserialize(data) def set_chat_data(self, token: str, title: str, data: ChatData) -> None: """Set chat data in cache""" key = self._get_key(token, title) self.redis.set(key, data.serialize()) def close(self): """Close Redis connection""" self.redis.close()