359 lines
14 KiB
Python
359 lines
14 KiB
Python
import json
|
|
import gzip
|
|
import io
|
|
import requests
|
|
import base64
|
|
from typing import Optional, Dict, Any, Callable
|
|
from .models import PowChallenge, CompletionData
|
|
|
|
|
|
class Client:
|
|
# API endpoints
|
|
AUTH_URL = "https://chat.deepseek.com/api/v0/users/login"
|
|
CHAT_CREATE_URL = "https://chat.deepseek.com/api/v0/chat_session/create"
|
|
CHAT_DELETE_URL = "https://chat.deepseek.com/api/v0/chat_session/delete"
|
|
CHAT_EDIT_URL = "https://chat.deepseek.com/api/v0/chat_session/update_title"
|
|
CHAT_LIST_URL = "https://chat.deepseek.com/api/v0/chat_session/fetch_page"
|
|
COMPLETION_URL = "https://chat.deepseek.com/api/v0/chat/completion"
|
|
HISTORY_BASE_URL = "https://chat.deepseek.com/api/v0/chat/history_messages?chat_session_id="
|
|
LOGOUT_URL = "https://chat.deepseek.com/api/v0/users/logout"
|
|
POW_URL = "https://chat.deepseek.com/api/v0/chat/create_pow_challenge"
|
|
PROFILE_URL = "https://chat.deepseek.com/api/v0/users/current"
|
|
QUOTA_URL = "https://chat.deepseek.com/api/v0/users/feature_quota"
|
|
|
|
CHAT_CREATE_BODY = '{"agent":"chat"}'
|
|
|
|
def __init__(self, pow_solver, api_key: str):
|
|
self.api_key = api_key
|
|
self.pow_solver = pow_solver
|
|
self.http_client = requests.Session()
|
|
# Configure TLS with modern user agent
|
|
self.http_client.headers.update({
|
|
'User-Agent': 'DeepSeek/2.0.0 Android/31', # Updated version
|
|
})
|
|
|
|
def _apply_headers(self, req: requests.Request, body_len: int = 0) -> None:
|
|
"""Apply standard headers to request"""
|
|
if 'User-Agent' not in req.headers:
|
|
req.headers['User-Agent'] = "DeepSeek/2.0.0 Android/31"
|
|
req.headers['Content-Type'] = "application/json"
|
|
if body_len > 0:
|
|
req.headers['Content-Length'] = str(body_len)
|
|
if self.api_key:
|
|
req.headers['Authorization'] = f"Bearer {self.api_key}"
|
|
req.headers['Accept'] = "application/json"
|
|
req.headers['Accept-Encoding'] = "gzip"
|
|
req.headers['Accept-Charset'] = "UTF-8"
|
|
req.headers['X-Client-Locale'] = "en_US"
|
|
req.headers['X-Client-Version'] = "2.0.0"
|
|
req.headers['X-Client-Platform'] = "android"
|
|
|
|
def _apply_pow_header(self, req: requests.Request, answer: int, pow: PowChallenge) -> None:
|
|
"""Apply Proof-of-Work header to request"""
|
|
header = {
|
|
"algorithm": pow.algorithm,
|
|
"challenge": pow.challenge,
|
|
"salt": pow.salt,
|
|
"signature": pow.signature,
|
|
"answer": answer,
|
|
"target_path": pow.target_path,
|
|
}
|
|
encoded_header = base64.b64encode(json.dumps(header).encode()).decode()
|
|
req.headers['X-Ds-Pow-Response'] = encoded_header
|
|
|
|
def _unmarshal(self, body: bytes) -> Dict[str, Any]:
|
|
"""Parse gzipped JSON response"""
|
|
try:
|
|
decompressed = gzip.decompress(body)
|
|
except:
|
|
decompressed = body
|
|
return json.loads(decompressed.decode('utf-8'))
|
|
|
|
def _execute(self, url: str, body: str, method: str) -> Dict[str, Any]:
|
|
"""Send a request"""
|
|
if body and method != "GET":
|
|
req = requests.Request(method, url, data=body)
|
|
else:
|
|
req = requests.Request(method, url)
|
|
|
|
prepared = self.http_client.prepare_request(req)
|
|
self._apply_headers(prepared, len(body) if body else 0)
|
|
|
|
print(f"[DEBUG] Sending {method} to {url}")
|
|
resp = self.http_client.send(prepared)
|
|
print(f"[DEBUG] Response status: {resp.status_code}")
|
|
print(f"[DEBUG] Response headers: {resp.headers}")
|
|
|
|
# Check HTTP status code
|
|
if resp.status_code >= 400:
|
|
try:
|
|
error_data = self._unmarshal(resp.content)
|
|
error_msg = error_data.get('msg', error_data.get('message', 'Unknown error'))
|
|
except Exception as parse_err:
|
|
error_msg = resp.text or f"HTTP {resp.status_code}"
|
|
print(f"[DEBUG] Failed to parse error response: {parse_err}")
|
|
print(f"[DEBUG] API Error: {error_msg}")
|
|
raise Exception(f"API Error ({resp.status_code}): {error_msg}")
|
|
|
|
result = self._unmarshal(resp.content)
|
|
print(f"[DEBUG] Response data: {result}")
|
|
|
|
# Check for API-level error codes in the response
|
|
if result and isinstance(result, dict):
|
|
code = result.get('code')
|
|
if code and code != 0: # 0 or missing code = success
|
|
msg = result.get('msg', 'Unknown error')
|
|
print(f"[DEBUG] API returned error code {code}: {msg}")
|
|
raise Exception(f"API Error ({code}): {msg}")
|
|
|
|
return result
|
|
|
|
def create_chat(self) -> str:
|
|
"""Create a new chat session. Returns UUID of a new chat session."""
|
|
data = self._execute(self.CHAT_CREATE_URL, self.CHAT_CREATE_BODY, "POST")
|
|
try:
|
|
if not data or not data.get('data'):
|
|
raise Exception(f"Invalid response structure - missing data field. Response: {data}")
|
|
|
|
biz_data = data['data'].get('biz_data')
|
|
if not biz_data:
|
|
raise Exception(f"Invalid response structure - missing biz_data. Response: {data}")
|
|
|
|
# Try both possible structures
|
|
if 'chat_session' in biz_data:
|
|
return biz_data['chat_session']['id']
|
|
elif 'id' in biz_data:
|
|
return biz_data['id']
|
|
else:
|
|
raise Exception(f"Could not find chat ID in response. biz_data keys: {biz_data.keys()}")
|
|
|
|
except (KeyError, TypeError) as e:
|
|
raise Exception(f"Failed to create chat: invalid response structure - {str(e)}. Response: {data}")
|
|
|
|
def get_all_chats(self) -> list:
|
|
"""Get all chat sessions"""
|
|
data = self._execute(self.CHAT_LIST_URL, "", "GET")
|
|
return data['data']['biz_data']['chat_sessions']
|
|
|
|
def change_title(self, chat_session_id: str, title: str) -> None:
|
|
"""Change chat session title"""
|
|
body = json.dumps({
|
|
"chat_session_id": chat_session_id,
|
|
"title": title,
|
|
})
|
|
self._execute(self.CHAT_EDIT_URL, body, "POST")
|
|
|
|
def delete_chat_session(self, chat_session_id: str) -> None:
|
|
"""Delete a chat session"""
|
|
body = json.dumps({
|
|
"chat_session_id": chat_session_id,
|
|
})
|
|
self._execute(self.CHAT_DELETE_URL, body, "POST")
|
|
|
|
def get_message_history(self, chat_session_id: str) -> Dict[str, Any]:
|
|
"""Get message history for a chat session"""
|
|
data = self._execute(self.HISTORY_BASE_URL + chat_session_id, "", "GET")
|
|
return data['data']['biz_data']
|
|
|
|
def login(self, email: str, password: str, device_id: str) -> str:
|
|
"""Login and get API token"""
|
|
body = json.dumps({
|
|
"email": email,
|
|
"password": password,
|
|
"device_id": device_id,
|
|
"os": "android",
|
|
})
|
|
data = self._execute(self.AUTH_URL, body, "POST")
|
|
return data['data']['biz_data']['user']['token']
|
|
|
|
def logout(self) -> None:
|
|
"""Logout"""
|
|
self._execute(self.LOGOUT_URL, "", "POST")
|
|
|
|
def get_profile(self) -> Dict[str, Any]:
|
|
"""Get user profile"""
|
|
data = self._execute(self.PROFILE_URL, "", "GET")
|
|
return data['data']['biz_data']
|
|
|
|
def get_quota(self) -> Dict[str, Any]:
|
|
"""Get thinking quota"""
|
|
data = self._execute(self.QUOTA_URL, "", "GET")
|
|
return data['data']['biz_data']['thinking']
|
|
|
|
def _get_pow(self, endpoint: str) -> PowChallenge:
|
|
"""Get Proof-of-Work challenge"""
|
|
body = json.dumps({
|
|
"target_path": endpoint,
|
|
})
|
|
data = self._execute(self.POW_URL, body, "POST")
|
|
challenge = data['data']['biz_data']['challenge']
|
|
return PowChallenge(
|
|
algorithm=challenge.get('algorithm', ''),
|
|
challenge=challenge.get('challenge', ''),
|
|
salt=challenge.get('salt', ''),
|
|
signature=challenge.get('signature', ''),
|
|
target_path=challenge.get('target_path', ''),
|
|
expire_at=challenge.get('expire_at', 0),
|
|
)
|
|
|
|
def completion(
|
|
self,
|
|
chat_session_id: str,
|
|
parent_message: str,
|
|
prompt: str,
|
|
think: bool,
|
|
search: bool,
|
|
response_callback: Callable[[str], None],
|
|
) -> None:
|
|
"""Send completion request and stream responses"""
|
|
print(f"[DEBUG] Starting completion for chat {chat_session_id}")
|
|
think = False
|
|
search = False
|
|
|
|
pow = self._get_pow("/api/v0/chat/completion")
|
|
print(f"[DEBUG] Got PoW challenge")
|
|
|
|
answer = self.pow_solver.calculate_hash(
|
|
pow.challenge,
|
|
pow.salt,
|
|
pow.expire_at,
|
|
pow.expire_at,
|
|
)
|
|
print(f"[DEBUG] Calculated PoW answer: {answer}")
|
|
|
|
completion_data = CompletionData(
|
|
chat_session_id=chat_session_id,
|
|
prompt=prompt,
|
|
thinking_enabled=think,
|
|
search_enabled=search,
|
|
parent_message_id=None,
|
|
ref_file_ids=[],
|
|
)
|
|
|
|
if parent_message:
|
|
try:
|
|
completion_data.parent_message_id = int(parent_message)
|
|
except ValueError:
|
|
pass
|
|
|
|
body = json.dumps(completion_data.to_dict())
|
|
print(f"[DEBUG] Completion request body: {body}")
|
|
|
|
req = requests.Request("POST", self.COMPLETION_URL, data=body)
|
|
prepared = self.http_client.prepare_request(req)
|
|
self._apply_headers(prepared, len(body))
|
|
self._apply_pow_header(prepared, int(answer), pow)
|
|
|
|
print(f"[DEBUG] Sending completion request to {self.COMPLETION_URL}")
|
|
print(f"[DEBUG] thinking_enabled={think}, search_enabled={search}")
|
|
resp = self.http_client.send(prepared, stream=True)
|
|
|
|
print(f"[DEBUG] Completion response status: {resp.status_code}")
|
|
print(f"[DEBUG] Completion response headers: {dict(resp.headers)}")
|
|
|
|
if resp.status_code != 200:
|
|
try:
|
|
content = resp.content.decode('utf-8')
|
|
except:
|
|
content = str(resp.content)
|
|
print(f"[DEBUG] Error response: {content}")
|
|
raise Exception(f"Completion failed with status {resp.status_code}: {content}")
|
|
|
|
self._parse_events(resp, response_callback)
|
|
|
|
def _parse_events(self, resp, response_callback: Callable[[str], None]) -> None:
|
|
"""Parse SSE events from response"""
|
|
print(f"[DEBUG] Starting to parse events")
|
|
line_count = 0
|
|
events_received = []
|
|
think = False
|
|
search = False
|
|
raw_events_logged = 0
|
|
|
|
def emit_value(value: Any) -> None:
|
|
if isinstance(value, str):
|
|
if value == "FINISHED":
|
|
return
|
|
response_callback(value)
|
|
return
|
|
|
|
if isinstance(value, list):
|
|
for item in value:
|
|
emit_value(item)
|
|
return
|
|
|
|
if isinstance(value, dict):
|
|
if 'content' in value and isinstance(value['content'], str):
|
|
response_callback(value['content'])
|
|
for nested_value in value.values():
|
|
if isinstance(nested_value, (dict, list)):
|
|
emit_value(nested_value)
|
|
|
|
try:
|
|
for line in resp.iter_lines():
|
|
line_count += 1
|
|
if not line:
|
|
continue
|
|
|
|
line = line.decode('utf-8').strip() if isinstance(line, bytes) else line.strip()
|
|
|
|
if line.startswith("event: "):
|
|
continue
|
|
|
|
raw = line[6:] if line.startswith("data: ") else line
|
|
if not raw:
|
|
continue
|
|
|
|
try:
|
|
data = json.loads(raw)
|
|
except json.JSONDecodeError as e:
|
|
print(f"[DEBUG] Failed to parse JSON on line {line_count}: {e}")
|
|
print(f"[DEBUG] Raw line: {raw}")
|
|
continue
|
|
|
|
if raw_events_logged < 5:
|
|
print(f"[DEBUG] Raw event #{raw_events_logged + 1}: {raw}")
|
|
raw_events_logged += 1
|
|
|
|
p = data.get('p', '')
|
|
v = data.get('v')
|
|
|
|
events_received.append(p)
|
|
print(f"[DEBUG] Parsed event type: {p}, has value: {v is not None}")
|
|
|
|
if p:
|
|
if p == "response/search_status":
|
|
if not search:
|
|
response_callback("\n<thinking>\n")
|
|
search = True
|
|
if p == "response/thinking_content":
|
|
if not search:
|
|
response_callback("\n<thinking>\n")
|
|
response_callback("\n\n")
|
|
think = True
|
|
elif p == "response/content":
|
|
if think:
|
|
response_callback("\n</thinking>\n")
|
|
|
|
if isinstance(v, dict):
|
|
fragments = v.get('response', {}).get('message', {}).get('fragments', [])
|
|
if isinstance(fragments, list) and fragments:
|
|
for fragment in fragments:
|
|
if isinstance(fragment, dict):
|
|
fragment_content = fragment.get('content')
|
|
if isinstance(fragment_content, str):
|
|
response_callback(fragment_content)
|
|
elif fragment_content is not None:
|
|
emit_value(fragment_content)
|
|
else:
|
|
emit_value(v)
|
|
else:
|
|
emit_value(v)
|
|
|
|
print(f"[DEBUG] Finished parsing events. Events received: {events_received}")
|
|
except Exception as e:
|
|
print(f"[DEBUG] Error in _parse_events: {e}")
|
|
print(f"[DEBUG] Line count: {line_count}")
|
|
print(f"[DEBUG] Events received so far: {events_received}")
|
|
raise
|