import base64
import gzip
import io
import json
import logging
import zlib
from typing import Any, Callable, Dict, Optional
from urllib.parse import quote

import requests

from .models import PowChallenge, CompletionData

logger = logging.getLogger(__name__)


class Client:
    """Client for the DeepSeek chat HTTP API.

    Wraps a ``requests.Session`` and exposes one method per endpoint listed
    in the class constants below. Every failure is reported by raising
    ``Exception`` with a descriptive message.
    """

    # API endpoints
    AUTH_URL = "https://chat.deepseek.com/api/v0/users/login"
    CHAT_CREATE_URL = "https://chat.deepseek.com/api/v0/chat_session/create"
    CHAT_DELETE_URL = "https://chat.deepseek.com/api/v0/chat_session/delete"
    CHAT_EDIT_URL = "https://chat.deepseek.com/api/v0/chat_session/update_title"
    CHAT_LIST_URL = "https://chat.deepseek.com/api/v0/chat_session/fetch_page"
    COMPLETION_URL = "https://chat.deepseek.com/api/v0/chat/completion"
    HISTORY_BASE_URL = "https://chat.deepseek.com/api/v0/chat/history_messages?chat_session_id="
    LOGOUT_URL = "https://chat.deepseek.com/api/v0/users/logout"
    POW_URL = "https://chat.deepseek.com/api/v0/chat/create_pow_challenge"
    PROFILE_URL = "https://chat.deepseek.com/api/v0/users/current"
    QUOTA_URL = "https://chat.deepseek.com/api/v0/users/feature_quota"

    CHAT_CREATE_BODY = '{"agent":"chat"}'

    # Single source of truth for the User-Agent sent with every request.
    _USER_AGENT = "DeepSeek/2.0.0 Android/31"

    def __init__(self, pow_solver, api_key: str):
        """Create a client.

        Args:
            pow_solver: Object exposing ``calculate_hash(...)``; used by
                :meth:`completion` to answer the Proof-of-Work challenge.
            api_key: Bearer token. When empty, no ``Authorization`` header
                is sent.
        """
        self.api_key = api_key
        self.pow_solver = pow_solver
        self.http_client = requests.Session()
        # Session-wide default User-Agent.
        self.http_client.headers.update({'User-Agent': self._USER_AGENT})

    def _apply_headers(self, req: requests.PreparedRequest, body_len: int = 0) -> None:
        """Apply the standard header set to a prepared request.

        Args:
            req: The prepared request whose headers are mutated in place.
            body_len: Body length in bytes; ``Content-Length`` is only set
                when this is positive.
        """
        headers = req.headers
        if 'User-Agent' not in headers:
            headers['User-Agent'] = self._USER_AGENT
        headers['Content-Type'] = "application/json"
        if body_len > 0:
            headers['Content-Length'] = str(body_len)
        if self.api_key:
            headers['Authorization'] = f"Bearer {self.api_key}"
        headers['Accept'] = "application/json"
        headers['Accept-Encoding'] = "gzip"
        headers['Accept-Charset'] = "UTF-8"
        headers['X-Client-Locale'] = "en_US"
        headers['X-Client-Version'] = "2.0.0"
        headers['X-Client-Platform'] = "android"

    def _apply_pow_header(self, req: requests.PreparedRequest, answer: int, pow: PowChallenge) -> None:
        """Attach the Proof-of-Work response header to a prepared request.

        The header value is the base64 encoding of a JSON object built from
        the challenge fields plus the computed answer.

        Args:
            req: The prepared request whose headers are mutated in place.
            answer: The solved Proof-of-Work answer.
            pow: The challenge the answer belongs to.
        """
        header = {
            "algorithm": pow.algorithm,
            "challenge": pow.challenge,
            "salt": pow.salt,
            "signature": pow.signature,
            "answer": answer,
            "target_path": pow.target_path,
        }
        encoded_header = base64.b64encode(json.dumps(header).encode()).decode()
        req.headers['X-Ds-Pow-Response'] = encoded_header

    def _unmarshal(self, body: bytes) -> Dict[str, Any]:
        """Parse a JSON response body that may or may not be gzip-compressed.

        Args:
            body: Raw response bytes.

        Returns:
            The decoded JSON value.

        Raises:
            ValueError: If the body is not valid UTF-8 JSON
                (``json.JSONDecodeError`` / ``UnicodeDecodeError``).
        """
        try:
            decompressed = gzip.decompress(body)
        except (OSError, EOFError, zlib.error):
            # Not gzip data (or already decompressed by the HTTP layer):
            # fall back to treating the body as plain JSON.
            decompressed = body
        return json.loads(decompressed.decode('utf-8'))

    def _execute(self, url: str, body: str, method: str) -> Dict[str, Any]:
        """Send a request and return the parsed JSON response.

        Args:
            url: Absolute endpoint URL.
            body: JSON request body; ignored when empty or for ``GET``.
            method: HTTP method name.

        Returns:
            The parsed response.

        Raises:
            Exception: On an HTTP status >= 400, or when the response
                carries a non-zero API-level ``code``.
        """
        # Encode once so the bytes on the wire and Content-Length agree.
        payload = None
        if body and method != "GET":
            payload = body.encode('utf-8') if isinstance(body, str) else body

        if payload is not None:
            req = requests.Request(method, url, data=payload)
        else:
            req = requests.Request(method, url)

        prepared = self.http_client.prepare_request(req)
        self._apply_headers(prepared, len(payload) if payload else 0)

        logger.debug("Sending %s to %s", method, url)
        resp = self.http_client.send(prepared)
        logger.debug("Response status: %s", resp.status_code)
        logger.debug("Response headers: %s", resp.headers)

        # Check HTTP status code
        if resp.status_code >= 400:
            try:
                error_data = self._unmarshal(resp.content)
                error_msg = error_data.get('msg', error_data.get('message', 'Unknown error'))
            except (ValueError, AttributeError) as parse_err:
                # Best effort: the error body is not a JSON object.
                error_msg = resp.text or f"HTTP {resp.status_code}"
                logger.debug("Failed to parse error response: %s", parse_err)
            logger.debug("API Error: %s", error_msg)
            raise Exception(f"API Error ({resp.status_code}): {error_msg}")

        result = self._unmarshal(resp.content)
        # NOTE: may contain tokens (e.g. the login response); DEBUG only.
        logger.debug("Response data: %s", result)

        # Check for API-level error codes: 0 or a missing code means success.
        if isinstance(result, dict):
            code = result.get('code')
            if code:
                msg = result.get('msg', 'Unknown error')
                logger.debug("API returned error code %s: %s", code, msg)
                raise Exception(f"API Error ({code}): {msg}")

        return result

    def create_chat(self) -> str:
        """Create a new chat session.

        Returns:
            The id of the new chat session, read from
            ``data.biz_data.chat_session.id`` or ``data.biz_data.id``.

        Raises:
            Exception: If the request fails or the response does not have
                one of the expected shapes.
        """
        data = self._execute(self.CHAT_CREATE_URL, self.CHAT_CREATE_BODY, "POST")
        try:
            if not data or not data.get('data'):
                raise Exception(f"Invalid response structure - missing data field. Response: {data}")

            biz_data = data['data'].get('biz_data')
            if not biz_data:
                raise Exception(f"Invalid response structure - missing biz_data. Response: {data}")

            # Try both possible structures
            if 'chat_session' in biz_data:
                return biz_data['chat_session']['id']
            if 'id' in biz_data:
                return biz_data['id']
            raise Exception(f"Could not find chat ID in response. biz_data keys: {biz_data.keys()}")
        except (KeyError, TypeError, AttributeError) as e:
            raise Exception(
                f"Failed to create chat: invalid response structure - {str(e)}. Response: {data}"
            ) from e

    def get_all_chats(self) -> list:
        """Return the ``chat_sessions`` list from the session-list endpoint."""
        data = self._execute(self.CHAT_LIST_URL, "", "GET")
        return data['data']['biz_data']['chat_sessions']

    def change_title(self, chat_session_id: str, title: str) -> None:
        """Change the title of a chat session.

        Args:
            chat_session_id: Id of the session to rename.
            title: New title.
        """
        body = json.dumps({
            "chat_session_id": chat_session_id,
            "title": title,
        })
        self._execute(self.CHAT_EDIT_URL, body, "POST")

    def delete_chat_session(self, chat_session_id: str) -> None:
        """Delete a chat session.

        Args:
            chat_session_id: Id of the session to delete.
        """
        body = json.dumps({
            "chat_session_id": chat_session_id,
        })
        self._execute(self.CHAT_DELETE_URL, body, "POST")

    def get_message_history(self, chat_session_id: str) -> Dict[str, Any]:
        """Return the ``biz_data`` of the message history for a chat session.

        Args:
            chat_session_id: Id of the session; percent-encoded before it is
                appended to the query string.
        """
        url = self.HISTORY_BASE_URL + quote(chat_session_id, safe='')
        data = self._execute(url, "", "GET")
        return data['data']['biz_data']

    def login(self, email: str, password: str, device_id: str) -> str:
        """Log in and return the API token from the response.

        Args:
            email: Account e-mail.
            password: Account password.
            device_id: Device identifier sent with the login request.

        Returns:
            The value of ``data.biz_data.user.token``.
        """
        body = json.dumps({
            "email": email,
            "password": password,
            "device_id": device_id,
            "os": "android",
        })
        data = self._execute(self.AUTH_URL, body, "POST")
        return data['data']['biz_data']['user']['token']

    def logout(self) -> None:
        """Call the logout endpoint."""
        self._execute(self.LOGOUT_URL, "", "POST")

    def get_profile(self) -> Dict[str, Any]:
        """Return the ``biz_data`` of the current-user endpoint."""
        data = self._execute(self.PROFILE_URL, "", "GET")
        return data['data']['biz_data']

    def get_quota(self) -> Dict[str, Any]:
        """Return the ``thinking`` entry of the feature-quota endpoint."""
        data = self._execute(self.QUOTA_URL, "", "GET")
        return data['data']['biz_data']['thinking']

    def _get_pow(self, endpoint: str) -> PowChallenge:
        """Request a Proof-of-Work challenge for ``endpoint``.

        Args:
            endpoint: Target path the challenge is requested for.

        Returns:
            A ``PowChallenge`` built from the response; missing fields
            default to ``''`` (``0`` for ``expire_at``).
        """
        body = json.dumps({
            "target_path": endpoint,
        })
        data = self._execute(self.POW_URL, body, "POST")
        challenge = data['data']['biz_data']['challenge']
        return PowChallenge(
            algorithm=challenge.get('algorithm', ''),
            challenge=challenge.get('challenge', ''),
            salt=challenge.get('salt', ''),
            signature=challenge.get('signature', ''),
            target_path=challenge.get('target_path', ''),
            expire_at=challenge.get('expire_at', 0),
        )

    def completion(
        self,
        chat_session_id: str,
        parent_message: str,
        prompt: str,
        think: bool,
        search: bool,
        response_callback: Callable[[str], None],
    ) -> None:
        """Send a completion request and stream the response to a callback.

        Args:
            chat_session_id: Id of the chat session to post into.
            parent_message: Id of the parent message; sent as an integer when
                it parses as one, otherwise omitted (``None``).
            prompt: User prompt text.
            think: Value sent as ``thinking_enabled``.
            search: Value sent as ``search_enabled``.
            response_callback: Called with each text chunk as it arrives.

        Raises:
            Exception: If the Proof-of-Work request fails or the completion
                endpoint answers with a status other than 200.
        """
        logger.debug("Starting completion for chat %s", chat_session_id)

        challenge = self._get_pow("/api/v0/chat/completion")
        logger.debug("Got PoW challenge")

        # NOTE(review): expire_at is passed twice. The solver's signature is
        # not visible from this file; confirm the third argument should not
        # be a different field (e.g. a difficulty value).
        answer = self.pow_solver.calculate_hash(
            challenge.challenge,
            challenge.salt,
            challenge.expire_at,
            challenge.expire_at,
        )
        logger.debug("Calculated PoW answer: %s", answer)

        completion_data = CompletionData(
            chat_session_id=chat_session_id,
            prompt=prompt,
            thinking_enabled=think,
            search_enabled=search,
            parent_message_id=None,
            ref_file_ids=[],
        )

        if parent_message:
            try:
                completion_data.parent_message_id = int(parent_message)
            except ValueError:
                # Deliberate best effort: a non-numeric id is sent as None.
                logger.debug("Ignoring non-integer parent_message %r", parent_message)

        body = json.dumps(completion_data.to_dict())
        logger.debug("Completion request body: %s", body)

        # Encode once so the bytes on the wire and Content-Length agree.
        payload = body.encode('utf-8')
        req = requests.Request("POST", self.COMPLETION_URL, data=payload)
        prepared = self.http_client.prepare_request(req)
        self._apply_headers(prepared, len(payload))
        self._apply_pow_header(prepared, int(answer), challenge)

        logger.debug("Sending completion request to %s", self.COMPLETION_URL)
        logger.debug("thinking_enabled=%s, search_enabled=%s", think, search)

        resp = self.http_client.send(prepared, stream=True)
        try:
            logger.debug("Completion response status: %s", resp.status_code)
            logger.debug("Completion response headers: %s", dict(resp.headers))

            if resp.status_code != 200:
                try:
                    content = resp.content.decode('utf-8')
                except UnicodeDecodeError:
                    content = str(resp.content)
                logger.debug("Error response: %s", content)
                raise Exception(f"Completion failed with status {resp.status_code}: {content}")

            self._parse_events(resp, response_callback)
        finally:
            # A streamed response holds its connection until closed.
            resp.close()

    def _parse_events(self, resp, response_callback: Callable[[str], None]) -> None:
        """Parse the streamed event lines of a completion response.

        Each non-empty line is treated as a JSON object, optionally prefixed
        with ``"data: "``; ``"event: "`` lines and unparsable lines are
        skipped. The object's ``p`` field selects which separators are
        emitted and its ``v`` field carries the content passed to
        ``response_callback``.

        Args:
            resp: Streaming response exposing ``iter_lines()``.
            response_callback: Called with each text chunk / separator.

        Raises:
            Exception: Anything raised while iterating the stream or by the
                callback is logged and re-raised unchanged.
        """
        logger.debug("Starting to parse events")
        line_count = 0
        events_received = []
        think = False
        search = False
        raw_events_logged = 0

        def emit_value(value: Any) -> None:
            # Recursively forward every string found in ``value``.
            if isinstance(value, str):
                # "FINISHED" is a status marker, not content.
                if value == "FINISHED":
                    return
                response_callback(value)
                return

            if isinstance(value, list):
                for item in value:
                    emit_value(item)
                return

            if isinstance(value, dict):
                if 'content' in value and isinstance(value['content'], str):
                    response_callback(value['content'])
                for nested_value in value.values():
                    if isinstance(nested_value, (dict, list)):
                        emit_value(nested_value)

        try:
            for line in resp.iter_lines():
                line_count += 1
                if not line:
                    continue

                line = line.decode('utf-8').strip() if isinstance(line, bytes) else line.strip()
                if line.startswith("event: "):
                    continue

                raw = line[6:] if line.startswith("data: ") else line
                if not raw:
                    continue

                try:
                    data = json.loads(raw)
                except json.JSONDecodeError as e:
                    logger.debug("Failed to parse JSON on line %s: %s", line_count, e)
                    logger.debug("Raw line: %s", raw)
                    continue

                if raw_events_logged < 5:
                    logger.debug("Raw event #%s: %s", raw_events_logged + 1, raw)
                    raw_events_logged += 1

                if not isinstance(data, dict):
                    # Only JSON objects carry the p/v fields read below.
                    logger.debug("Skipping non-object event on line %s", line_count)
                    continue

                p = data.get('p', '')
                v = data.get('v')
                events_received.append(p)
                logger.debug("Parsed event type: %s, has value: %s", p, v is not None)

                # NOTE(review): the nesting of this separator logic was
                # reconstructed from a whitespace-collapsed source; verify
                # it against the intended output format.
                if p:
                    if p == "response/search_status":
                        if not search:
                            response_callback("\n\n")
                            search = True

                    if p == "response/thinking_content":
                        if not search:
                            response_callback("\n\n")
                        response_callback("\n\n")
                        think = True
                    elif p == "response/content":
                        if think:
                            response_callback("\n\n")

                # Events without ``p`` still carry content in ``v``.
                if isinstance(v, dict):
                    fragments = v.get('response', {}).get('message', {}).get('fragments', [])
                    if isinstance(fragments, list) and fragments:
                        for fragment in fragments:
                            if isinstance(fragment, dict):
                                fragment_content = fragment.get('content')
                                if isinstance(fragment_content, str):
                                    response_callback(fragment_content)
                                elif fragment_content is not None:
                                    emit_value(fragment_content)
                    else:
                        emit_value(v)
                else:
                    emit_value(v)

            logger.debug("Finished parsing events. Events received: %s", events_received)
        except Exception as e:
            logger.debug("Error in _parse_events: %s", e)
            logger.debug("Line count: %s", line_count)
            logger.debug("Events received so far: %s", events_received)
            raise