2183 lines
84 KiB
Python
2183 lines
84 KiB
Python
from __future__ import annotations
|
||
|
||
import json
|
||
import re
|
||
from typing import Any
|
||
from uuid import UUID
|
||
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.models import Pipeline, PipelineStatus
|
||
from app.services.capability_service import CapabilityService
|
||
from app.services.dialog_memory import DialogMemoryService
|
||
from app.services.semantic_selection import SelectedCapability, SemanticSelectionService
|
||
from app.utils.ollama_client import chat_json, reset_model_session
|
||
|
||
|
||
class PipelineServiceError(Exception):
|
||
pass
|
||
|
||
|
||
class PipelineService:
|
||
# Clarification loop is disabled: service should attempt a full graph in one shot.
|
||
LOW_CONFIDENCE_MAX_QUESTIONS = 0
|
||
LOW_CONFIDENCE_QUESTION_MARKER = "нужно уточнить цель, чтобы построить точный сценарий"
|
||
LOW_CONFIDENCE_DIALOG_MARKER = "[[low_confidence_question]]"
|
||
STRICT_CAPABILITY_ISSUES = {
|
||
"graph:invalid_capability_ref",
|
||
"graph:missing_capability_ref",
|
||
}
|
||
|
||
def __init__(self, session: AsyncSession) -> None:
|
||
self.session = session
|
||
self.capability_service = CapabilityService(session)
|
||
self.semantic_selector = SemanticSelectionService()
|
||
self.dialog_memory = DialogMemoryService()
|
||
|
||
async def reset_dialog(self, dialog_id: UUID) -> dict[str, Any]:
|
||
await self.dialog_memory.reset(str(dialog_id))
|
||
return {
|
||
"status": "ok",
|
||
"message_ru": "Диалог сброшен.",
|
||
}
|
||
|
||
async def generate(
|
||
self,
|
||
*,
|
||
dialog_id: UUID,
|
||
message: str,
|
||
user_id: UUID | None = None,
|
||
capability_ids: list[UUID] | None = None,
|
||
previous_pipeline_id: UUID | None = None,
|
||
) -> dict[str, Any]:
|
||
dialog_messages, dialog_summary = await self.dialog_memory.get_context(
|
||
str(dialog_id)
|
||
)
|
||
|
||
if capability_ids:
|
||
try:
|
||
capabilities = await self.capability_service.get_capabilities(
|
||
capability_ids=capability_ids,
|
||
owner_user_id=user_id,
|
||
include_all=False,
|
||
)
|
||
except TypeError:
|
||
# Backward-compatible path for simplified test doubles.
|
||
capabilities = await self.capability_service.get_capabilities(
|
||
capability_ids=capability_ids,
|
||
)
|
||
if len(capabilities) != len(set(capability_ids)):
|
||
return {
|
||
"status": "needs_input",
|
||
"message_ru": "Часть выбранных capabilities недоступна для этого пользователя.",
|
||
"chat_reply_ru": "Некоторые capabilities вам не принадлежат или были удалены. Выберите доступные.",
|
||
"nodes": [],
|
||
"edges": [],
|
||
"context_summary": None,
|
||
}
|
||
selected_capabilities = [
|
||
SelectedCapability(capability=c, score=1.0, confidence_tier="high")
|
||
for c in capabilities
|
||
]
|
||
else:
|
||
selection_query = self._build_selection_query(
|
||
message=message,
|
||
dialog_messages=dialog_messages,
|
||
dialog_summary=dialog_summary,
|
||
)
|
||
selected_capabilities = await self.semantic_selector.select_capabilities(
|
||
self.session,
|
||
selection_query,
|
||
owner_user_id=user_id,
|
||
limit=10,
|
||
)
|
||
|
||
previous_pipeline: Pipeline | None = None
|
||
previous_nodes: list[dict[str, Any]] = []
|
||
previous_edges: list[dict[str, Any]] = []
|
||
if previous_pipeline_id is not None:
|
||
candidate = await self.session.get(Pipeline, previous_pipeline_id)
|
||
if candidate is not None and (
|
||
user_id is None or candidate.created_by in (None, user_id)
|
||
):
|
||
previous_pipeline = candidate
|
||
previous_nodes = (
|
||
candidate.nodes
|
||
if isinstance(candidate.nodes, list)
|
||
else []
|
||
)
|
||
previous_edges = (
|
||
candidate.edges
|
||
if isinstance(candidate.edges, list)
|
||
else []
|
||
)
|
||
|
||
if not selected_capabilities:
|
||
return {
|
||
"status": "needs_input",
|
||
"message_ru": "Не удалось найти доступные инструменты. Загрузите OpenAPI/Swagger.",
|
||
"chat_reply_ru": (
|
||
"Для вашего аккаунта пока нет capabilities. "
|
||
"Загрузите OpenAPI/Swagger, чтобы я смог собрать исполнимый pipeline."
|
||
),
|
||
"nodes": [],
|
||
"edges": [],
|
||
"missing_requirements": ["selection:no_matches"],
|
||
"context_summary": dialog_summary,
|
||
}
|
||
|
||
prompt = self._build_generation_prompt(
|
||
user_query=message,
|
||
selected_capabilities=selected_capabilities,
|
||
dialog_messages=dialog_messages,
|
||
dialog_summary=dialog_summary,
|
||
previous_nodes=previous_nodes,
|
||
previous_edges=previous_edges,
|
||
)
|
||
|
||
try:
|
||
raw_graph = self.generate_raw_graph(message, selected_capabilities, prompt)
|
||
except Exception:
|
||
raw_graph = self._build_minimal_raw_graph(selected_capabilities)
|
||
if raw_graph is None:
|
||
return {
|
||
"status": "cannot_build",
|
||
"message_ru": "Не удалось построить сценарий. Нет доступной исполнимой capability.",
|
||
"chat_reply_ru": "Не удалось построить сценарий. Попробуйте уточнить запрос.",
|
||
"nodes": [],
|
||
"edges": [],
|
||
"context_summary": dialog_summary,
|
||
}
|
||
|
||
normalized_nodes, normalized_edges, is_ready, missing = self._prepare_graph(
|
||
raw_graph=raw_graph,
|
||
selected_capabilities=selected_capabilities,
|
||
)
|
||
chat_reply = self._build_chat_reply_ru(normalized_nodes, normalized_edges)
|
||
has_strict_capability_issues = self._has_strict_capability_issues(missing)
|
||
|
||
if not is_ready and not has_strict_capability_issues:
|
||
fallback_raw_graph = self._build_minimal_raw_graph(selected_capabilities)
|
||
if fallback_raw_graph is not None:
|
||
fallback_nodes, fallback_edges, fallback_ready, fallback_missing = self._prepare_graph(
|
||
raw_graph=fallback_raw_graph,
|
||
selected_capabilities=selected_capabilities,
|
||
)
|
||
if fallback_ready:
|
||
normalized_nodes = fallback_nodes
|
||
normalized_edges = fallback_edges
|
||
is_ready = True
|
||
missing = []
|
||
chat_reply = self._build_chat_reply_ru(normalized_nodes, normalized_edges)
|
||
else:
|
||
missing = fallback_missing
|
||
|
||
if not is_ready:
|
||
if has_strict_capability_issues:
|
||
chat_reply = (
|
||
"Не удалось безопасно собрать сценарий: модель вернула шаги "
|
||
"без подтвержденных capability_id. Уточните задачу и повторите запрос."
|
||
)
|
||
message_ru = (
|
||
"Сценарий отклонен: обнаружены неподтвержденные ссылки на capability."
|
||
)
|
||
else:
|
||
message_ru = "Сценарий не готов к выполнению. Не хватает входных данных."
|
||
await self.dialog_memory.append_and_summarize(
|
||
str(dialog_id), "user", message
|
||
)
|
||
await self.dialog_memory.append_and_summarize(
|
||
str(dialog_id), "assistant", chat_reply
|
||
)
|
||
return {
|
||
"status": "cannot_build",
|
||
"message_ru": message_ru,
|
||
"chat_reply_ru": chat_reply,
|
||
"nodes": normalized_nodes,
|
||
"edges": normalized_edges,
|
||
"missing_requirements": missing,
|
||
"context_summary": dialog_summary,
|
||
}
|
||
|
||
if previous_pipeline is not None:
|
||
previous_pipeline.name = self._build_pipeline_name(message)
|
||
previous_pipeline.description = None
|
||
previous_pipeline.user_prompt = message
|
||
previous_pipeline.nodes = normalized_nodes
|
||
previous_pipeline.edges = normalized_edges
|
||
previous_pipeline.status = PipelineStatus.READY
|
||
if previous_pipeline.created_by is None:
|
||
previous_pipeline.created_by = user_id
|
||
pipeline = previous_pipeline
|
||
else:
|
||
pipeline = Pipeline(
|
||
name=self._build_pipeline_name(message),
|
||
description=None,
|
||
user_prompt=message,
|
||
nodes=normalized_nodes,
|
||
edges=normalized_edges,
|
||
status=PipelineStatus.READY,
|
||
created_by=user_id,
|
||
)
|
||
self.session.add(pipeline)
|
||
await self.session.flush()
|
||
await self.session.refresh(pipeline)
|
||
await self.session.commit()
|
||
|
||
await self.dialog_memory.append_and_summarize(str(dialog_id), "user", message)
|
||
await self.dialog_memory.append_and_summarize(
|
||
str(dialog_id), "assistant", chat_reply
|
||
)
|
||
|
||
return {
|
||
"status": "ready",
|
||
"message_ru": "Сценарий готов.",
|
||
"chat_reply_ru": chat_reply,
|
||
"pipeline_id": pipeline.id,
|
||
"nodes": normalized_nodes,
|
||
"edges": normalized_edges,
|
||
"context_summary": dialog_summary,
|
||
}
|
||
|
||
def _prepare_graph(
|
||
self,
|
||
*,
|
||
raw_graph: dict[str, Any],
|
||
selected_capabilities: list[SelectedCapability],
|
||
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], bool, list[str]]:
|
||
normalized_nodes, normalized_edges, normalization_issues = self._normalize_workflow(
|
||
raw_graph, selected_capabilities
|
||
)
|
||
normalized_edges = self._repair_edges_with_data_flow(normalized_nodes, normalized_edges)
|
||
normalized_nodes, normalized_edges = self._compact_step_sequence(
|
||
normalized_nodes,
|
||
normalized_edges,
|
||
)
|
||
self._sync_node_connections(normalized_nodes, normalized_edges)
|
||
self._ensure_external_inputs(normalized_nodes, normalized_edges)
|
||
|
||
reviewed_nodes, reviewed_edges = self._review_graph_with_llm(
|
||
normalized_nodes,
|
||
normalized_edges,
|
||
selected_capabilities,
|
||
)
|
||
reviewed_edges = self._repair_edges_with_data_flow(reviewed_nodes, reviewed_edges)
|
||
reviewed_edges = self._prune_edges_for_terminal_goal(reviewed_nodes, reviewed_edges)
|
||
reviewed_edges = self._prune_edges_by_required_inputs(reviewed_nodes, reviewed_edges)
|
||
reviewed_nodes, reviewed_edges = self._prune_disconnected_nodes(
|
||
reviewed_nodes,
|
||
reviewed_edges,
|
||
)
|
||
reviewed_nodes, reviewed_edges = self._compact_step_sequence(
|
||
reviewed_nodes,
|
||
reviewed_edges,
|
||
)
|
||
self._sync_node_connections(reviewed_nodes, reviewed_edges)
|
||
self._ensure_external_inputs(reviewed_nodes, reviewed_edges)
|
||
is_ready, missing = self._validate_ready_graph(reviewed_nodes, reviewed_edges)
|
||
if normalization_issues:
|
||
missing = sorted(set(missing + normalization_issues))
|
||
is_ready = False
|
||
return reviewed_nodes, reviewed_edges, is_ready, missing
|
||
|
||
def _compact_step_sequence(
|
||
self,
|
||
nodes: list[dict[str, Any]],
|
||
edges: list[dict[str, Any]],
|
||
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
|
||
step_values = sorted(
|
||
{
|
||
step
|
||
for node in nodes
|
||
if isinstance((step := node.get("step")), int)
|
||
}
|
||
)
|
||
if not step_values:
|
||
return nodes, edges
|
||
|
||
target = list(range(1, len(step_values) + 1))
|
||
if step_values == target:
|
||
return nodes, edges
|
||
|
||
step_map = {
|
||
old_step: new_step
|
||
for new_step, old_step in enumerate(step_values, start=1)
|
||
}
|
||
|
||
compact_nodes: list[dict[str, Any]] = []
|
||
for node in nodes:
|
||
step = node.get("step")
|
||
if not isinstance(step, int) or step not in step_map:
|
||
continue
|
||
|
||
compact_node = dict(node)
|
||
compact_node["step"] = step_map[step]
|
||
compact_node["input_connected_from"] = sorted(
|
||
{
|
||
step_map[src]
|
||
for src in self._normalize_int_list(node.get("input_connected_from"))
|
||
if src in step_map
|
||
}
|
||
)
|
||
compact_node["output_connected_to"] = sorted(
|
||
{
|
||
step_map[dst]
|
||
for dst in self._normalize_int_list(node.get("output_connected_to"))
|
||
if dst in step_map
|
||
}
|
||
)
|
||
|
||
compact_input_types: list[dict[str, Any]] = []
|
||
for item in self._normalize_input_data_types(
|
||
node.get("input_data_type_from_previous")
|
||
):
|
||
from_step = item.get("from_step")
|
||
edge_type = item.get("type")
|
||
if (
|
||
isinstance(from_step, int)
|
||
and from_step in step_map
|
||
and isinstance(edge_type, str)
|
||
):
|
||
compact_input_types.append(
|
||
{
|
||
"from_step": step_map[from_step],
|
||
"type": edge_type,
|
||
}
|
||
)
|
||
compact_node["input_data_type_from_previous"] = compact_input_types
|
||
compact_nodes.append(compact_node)
|
||
|
||
compact_edges: list[dict[str, Any]] = []
|
||
seen_edges: set[tuple[int, int, str]] = set()
|
||
for edge in edges:
|
||
src = edge.get("from_step")
|
||
dst = edge.get("to_step")
|
||
edge_type = edge.get("type")
|
||
if (
|
||
not isinstance(src, int)
|
||
or not isinstance(dst, int)
|
||
or src not in step_map
|
||
or dst not in step_map
|
||
or not isinstance(edge_type, str)
|
||
or not edge_type.strip()
|
||
):
|
||
continue
|
||
remapped = (step_map[src], step_map[dst], edge_type.strip())
|
||
if remapped[0] == remapped[1] or remapped in seen_edges:
|
||
continue
|
||
seen_edges.add(remapped)
|
||
compact_edges.append(
|
||
{
|
||
"from_step": remapped[0],
|
||
"to_step": remapped[1],
|
||
"type": remapped[2],
|
||
}
|
||
)
|
||
|
||
compact_nodes.sort(key=lambda item: item.get("step", 0))
|
||
compact_edges.sort(
|
||
key=lambda item: (
|
||
item.get("from_step", 0),
|
||
item.get("to_step", 0),
|
||
str(item.get("type", "")),
|
||
)
|
||
)
|
||
return compact_nodes, compact_edges
|
||
|
||
def _build_minimal_raw_graph(
|
||
self,
|
||
selected_capabilities: list[SelectedCapability],
|
||
) -> dict[str, Any] | None:
|
||
for item in selected_capabilities:
|
||
cap = item.capability
|
||
cap_id = getattr(cap, "id", None)
|
||
if cap_id is None:
|
||
continue
|
||
cap_type = self._capability_type_value(cap)
|
||
action_id = getattr(cap, "action_id", None)
|
||
if cap_type == "ATOMIC" and action_id is None:
|
||
continue
|
||
if cap_type == "COMPOSITE" and not self._recipe_is_executable(
|
||
getattr(cap, "recipe", None)
|
||
):
|
||
continue
|
||
required_inputs = self._extract_required_inputs(
|
||
getattr(cap, "input_schema", None)
|
||
)
|
||
return {
|
||
"nodes": [
|
||
{
|
||
"step": 1,
|
||
"name": str(getattr(cap, "name", "Step 1") or "Step 1"),
|
||
"description": getattr(cap, "description", None),
|
||
"capability_id": str(cap_id),
|
||
"input_connected_from": [],
|
||
"output_connected_to": [],
|
||
"input_data_type_from_previous": [],
|
||
"external_inputs": required_inputs,
|
||
}
|
||
],
|
||
"edges": [],
|
||
}
|
||
return None
|
||
|
||
def generate_raw_graph(
|
||
self,
|
||
user_query: str,
|
||
selected_capabilities: list[SelectedCapability],
|
||
prompt: str,
|
||
) -> dict[str, Any]:
|
||
system_prompt = (
|
||
"You are Qwen2.5-Coder (7B) building executable workflow DAGs. "
|
||
"Output MUST be a single valid JSON object only. "
|
||
"No markdown, no comments, no extra keys, no prose."
|
||
)
|
||
reset_model_session()
|
||
payload = chat_json(system_prompt=system_prompt, user_prompt=prompt)
|
||
if not isinstance(payload, dict):
|
||
raise PipelineServiceError("Failed to call Ollama")
|
||
return payload
|
||
|
||
def _build_generation_prompt(
|
||
self,
|
||
*,
|
||
user_query: str,
|
||
selected_capabilities: list[SelectedCapability],
|
||
dialog_messages: list[dict[str, Any]],
|
||
dialog_summary: str | None,
|
||
previous_nodes: list[dict[str, Any]] | None = None,
|
||
previous_edges: list[dict[str, Any]] | None = None,
|
||
) -> str:
|
||
capabilities_payload = []
|
||
allowed_capability_ids: list[str] = []
|
||
for sc in selected_capabilities:
|
||
cap = sc.capability
|
||
capabilities_payload.append(self._build_capability_prompt_payload(cap))
|
||
cap_id = str(getattr(cap, "id", "") or "").strip()
|
||
if cap_id:
|
||
allowed_capability_ids.append(cap_id)
|
||
|
||
context_payload = {
|
||
"summary": dialog_summary,
|
||
"recent_messages": dialog_messages[-6:],
|
||
"previous_graph": {
|
||
"nodes": previous_nodes or [],
|
||
"edges": previous_edges or [],
|
||
},
|
||
}
|
||
|
||
instruction = (
|
||
"MODEL_PROFILE: qwen2.5:7b-coder\n"
|
||
"TASK: Build an executable DAG pipeline from USER_QUERY, CONTEXT and CAPABILITIES.\n"
|
||
"LANGUAGE: Keep values human-readable; structure and keys must follow OUTPUT_SCHEMA.\n\n"
|
||
"HARD_RULES:\n"
|
||
"1) Return ONLY a single JSON object matching OUTPUT_SCHEMA.\n"
|
||
"2) Graph must be a DAG (no cycles, no self-links).\n"
|
||
"3) Use ONLY capability_id values from ALLOWED_CAPABILITY_IDS.\n"
|
||
"4) Never replace capability_id with name/path/operation_id/action_id.\n"
|
||
" Никогда не подменяй capability_id значениями name/path/operation_id/action_id.\n"
|
||
"5) Edges must represent data-flow, not just chronological order.\n"
|
||
"6) For each edge from_step->to_step:\n"
|
||
" - to_step must be in from_step.output_connected_to\n"
|
||
" - from_step must be in to_step.input_connected_from\n"
|
||
"7) COMPOSITE capability must remain one node (do not expand substeps).\n"
|
||
"8) If PREVIOUS_GRAPH is non-empty, edit it in-place and keep unchanged valid parts.\n"
|
||
"9) If exact capability choice is impossible, return empty graph: {\"nodes\": [], \"edges\": []}.\n\n"
|
||
"MERGE_PATTERN_EXAMPLE:\n"
|
||
"- Step 1 produce users\n"
|
||
"- Step 2 produce hotels\n"
|
||
"- Step 3 consumes users and hotels\n"
|
||
"Expected edges:\n"
|
||
'- (1->3, type=\"users\"), (2->3, type=\"hotels\")\n'
|
||
"Expected links:\n"
|
||
"- node[3].input_connected_from = [1,2]\n"
|
||
"- node[1].output_connected_to contains 3\n"
|
||
"- node[2].output_connected_to contains 3\n\n"
|
||
"TARGET_LINEAR_PATTERN_IF_RELEVANT:\n"
|
||
"- Step 1: get recently active users\n"
|
||
"- Step 2: get top hotels\n"
|
||
"- Step 3: segment users by hotel interests (consumes 1+2)\n"
|
||
"- Step 4: assign specific hotels to users (consumes 3)\n"
|
||
"- Step 5: send personalized offers to users (consumes 4)\n"
|
||
"- Step 6: evaluate lead quality (consumes 5 and/or 4)\n"
|
||
)
|
||
|
||
return (
|
||
f"{instruction}\n\n"
|
||
f"USER_QUERY:\n{user_query}\n\n"
|
||
f"DIALOG_CONTEXT:\n{json.dumps(context_payload, ensure_ascii=False)}\n\n"
|
||
f"ALLOWED_CAPABILITY_IDS:\n{json.dumps(allowed_capability_ids, ensure_ascii=False)}\n\n"
|
||
f"CAPABILITIES:\n{json.dumps(capabilities_payload, ensure_ascii=False)}\n\n"
|
||
"OUTPUT_SCHEMA:\n"
|
||
"{\n"
|
||
' "nodes": [\n'
|
||
" {\n"
|
||
' "step": 1,\n'
|
||
' "name": "Step name",\n'
|
||
' "capability_id": "UUID from CAPABILITIES",\n'
|
||
' "description": "Step purpose",\n'
|
||
' "input_connected_from": [],\n'
|
||
' "output_connected_to": [],\n'
|
||
' "input_data_type_from_previous": [],\n'
|
||
' "external_inputs": []\n'
|
||
" }\n"
|
||
" ],\n"
|
||
' "edges": [\n'
|
||
' {"from_step": 1, "to_step": 2, "type": "field_name"}\n'
|
||
" ]\n"
|
||
"}\n\n"
|
||
"SELF-CHECK (INTERNAL ONLY):\n"
|
||
"- Verify every node.capability_id is in ALLOWED_CAPABILITY_IDS.\n"
|
||
"- Verify every required input has either upstream edge or external_inputs.\n"
|
||
"- Verify edges and node links are synchronized.\n"
|
||
"- Output final JSON only.\n"
|
||
)
|
||
|
||
def _has_strict_capability_issues(self, issues: list[str]) -> bool:
|
||
return any(issue in self.STRICT_CAPABILITY_ISSUES for issue in issues)
|
||
|
||
def _build_selection_query(
|
||
self,
|
||
*,
|
||
message: str,
|
||
dialog_messages: list[dict[str, Any]],
|
||
dialog_summary: str | None,
|
||
) -> str:
|
||
recent_chunks = [
|
||
str(item.get("content", ""))
|
||
for item in dialog_messages[-4:]
|
||
if isinstance(item, dict) and item.get("role") == "user"
|
||
]
|
||
parts = [str(message or "").strip()]
|
||
if dialog_summary:
|
||
parts.append(self._strip_low_confidence_marker(dialog_summary))
|
||
parts.extend(chunk for chunk in recent_chunks if chunk)
|
||
return "\n".join(part for part in parts if part)
|
||
|
||
def _selection_is_low_confidence(
|
||
self, selected_capabilities: list[SelectedCapability]
|
||
) -> bool:
|
||
if not selected_capabilities:
|
||
return False
|
||
return str(selected_capabilities[0].confidence_tier).lower() == "low"
|
||
|
||
def _build_low_confidence_question_ru(
|
||
self,
|
||
*,
|
||
question_number: int = 1,
|
||
message: str = "",
|
||
dialog_messages: list[dict[str, Any]] | None = None,
|
||
selected_capabilities: list[SelectedCapability] | None = None,
|
||
) -> str:
|
||
llm_question = self._generate_clarification_question_ru(
|
||
question_number=question_number,
|
||
message=message,
|
||
dialog_messages=dialog_messages or [],
|
||
selected_capabilities=selected_capabilities or [],
|
||
)
|
||
if llm_question:
|
||
return llm_question
|
||
|
||
outcome_question = (
|
||
"Нужно уточнить цель, чтобы построить точный сценарий. "
|
||
"Какой финальный бизнес-результат нужен: сегмент, рассылка, "
|
||
"обновление CRM или отчёт?"
|
||
)
|
||
if question_number <= 1 or not selected_capabilities:
|
||
return outcome_question
|
||
|
||
context_tokens = self._collect_user_context_tokens(
|
||
message=message,
|
||
dialog_messages=dialog_messages or [],
|
||
)
|
||
missing_inputs = self._collect_missing_required_inputs(
|
||
selected_capabilities=selected_capabilities,
|
||
context_tokens=context_tokens,
|
||
limit=3,
|
||
)
|
||
if not missing_inputs:
|
||
return (
|
||
"Нужно уточнить входные ограничения для точного графа. "
|
||
"Укажите: кого выбираем, по каким фильтрам, и какие поля обязательны в результате."
|
||
)
|
||
|
||
humanized_inputs = ", ".join(self._humanize_input_name(name) for name in missing_inputs)
|
||
return (
|
||
"Нужно уточнить цель, чтобы построить точный сценарий. "
|
||
f"Чтобы собрать исполнимый граф, уточните входные данные: {humanized_inputs}."
|
||
)
|
||
|
||
def _generate_clarification_question_ru(
|
||
self,
|
||
*,
|
||
question_number: int,
|
||
message: str,
|
||
dialog_messages: list[dict[str, Any]],
|
||
selected_capabilities: list[SelectedCapability],
|
||
) -> str | None:
|
||
if not selected_capabilities:
|
||
return None
|
||
|
||
user_messages = [
|
||
str(item.get("content", ""))
|
||
for item in dialog_messages[-8:]
|
||
if isinstance(item, dict)
|
||
and str(item.get("role", "")).lower() == "user"
|
||
]
|
||
previous_questions = [
|
||
self._strip_low_confidence_marker(str(item.get("content", "")))
|
||
for item in dialog_messages[-8:]
|
||
if isinstance(item, dict)
|
||
and str(item.get("role", "")).lower() == "assistant"
|
||
and self._is_low_confidence_question(str(item.get("content", "")))
|
||
]
|
||
|
||
context_tokens = self._collect_user_context_tokens(
|
||
message=message,
|
||
dialog_messages=dialog_messages,
|
||
)
|
||
missing_inputs = self._collect_missing_required_inputs(
|
||
selected_capabilities=selected_capabilities,
|
||
context_tokens=context_tokens,
|
||
limit=5,
|
||
)
|
||
grounding_terms = self._collect_capability_grounding_terms(
|
||
selected_capabilities=selected_capabilities,
|
||
missing_inputs=missing_inputs,
|
||
limit=20,
|
||
)
|
||
|
||
capabilities_payload = []
|
||
for item in selected_capabilities[:5]:
|
||
cap = item.capability
|
||
capabilities_payload.append(
|
||
{
|
||
"name": str(getattr(cap, "name", "") or ""),
|
||
"description": str(getattr(cap, "description", "") or ""),
|
||
"required_inputs": self._extract_required_inputs(
|
||
getattr(cap, "input_schema", None)
|
||
),
|
||
"action_context": self._extract_capability_action_context(cap),
|
||
}
|
||
)
|
||
|
||
prompt_payload = {
|
||
"stage": question_number,
|
||
"max_questions": self.LOW_CONFIDENCE_MAX_QUESTIONS,
|
||
"current_user_message": message,
|
||
"recent_user_messages": user_messages[-4:],
|
||
"previous_clarification_questions": previous_questions[-2:],
|
||
"candidate_capabilities": capabilities_payload,
|
||
"missing_required_inputs": missing_inputs,
|
||
"grounding_terms": grounding_terms,
|
||
}
|
||
|
||
system_prompt = (
|
||
"Ты продуктовый ассистент, который задаёт один уточняющий вопрос на русском "
|
||
"для построения исполнимого pipeline. "
|
||
"Верни только JSON формата {\"question_ru\": \"...\"}. "
|
||
"Не используй префиксы вроде 'Уточнение 1/2'. "
|
||
"Не перечисляй много пунктов: один конкретный вопрос. "
|
||
"Вопрос должен быть привязан к candidate_capabilities: используй термины из grounding_terms "
|
||
"или missing_required_inputs и не придумывай новые сущности/ручки."
|
||
)
|
||
user_prompt = (
|
||
"Сгенерируй один следующий вопрос для пользователя на основе контекста.\n"
|
||
"Правила:\n"
|
||
"- Если stage=1: спроси про финальный бизнес-результат и критерий успеха.\n"
|
||
"- Если stage=2: спроси про самый важный недостающий вход/ограничение для исполнения.\n"
|
||
"- Вопрос должен быть конкретным и связанным с candidate_capabilities.\n"
|
||
"- Вопрос обязан содержать минимум один термин из grounding_terms.\n"
|
||
"- Если есть missing_required_inputs, приоритетно используй их в формулировке.\n"
|
||
"- Верни только JSON.\n\n"
|
||
f"CONTEXT:\n{json.dumps(prompt_payload, ensure_ascii=False)}"
|
||
)
|
||
|
||
try:
|
||
payload = chat_json(system_prompt=system_prompt, user_prompt=user_prompt)
|
||
except Exception:
|
||
return None
|
||
if not isinstance(payload, dict):
|
||
return None
|
||
|
||
question = payload.get("question_ru")
|
||
if not isinstance(question, str):
|
||
return None
|
||
normalized = " ".join(question.strip().split())
|
||
normalized = self._strip_low_confidence_marker(normalized)
|
||
normalized = normalized.replace("Уточнение 1/2:", "").replace("Уточнение 2/2:", "").strip()
|
||
if len(normalized) < 12:
|
||
return None
|
||
if not self._is_question_grounded_in_capabilities(
|
||
question=normalized,
|
||
grounding_terms=grounding_terms,
|
||
missing_inputs=missing_inputs,
|
||
):
|
||
return self._build_grounded_fallback_question_ru(
|
||
question_number=question_number,
|
||
selected_capabilities=selected_capabilities,
|
||
missing_inputs=missing_inputs,
|
||
)
|
||
if not normalized.endswith("?"):
|
||
normalized = f"{normalized}?"
|
||
return normalized
|
||
|
||
def _collect_capability_grounding_terms(
|
||
self,
|
||
*,
|
||
selected_capabilities: list[SelectedCapability],
|
||
missing_inputs: list[str],
|
||
limit: int = 20,
|
||
) -> list[str]:
|
||
terms: list[str] = []
|
||
seen: set[str] = set()
|
||
|
||
for item in selected_capabilities[:5]:
|
||
cap = item.capability
|
||
context = self._extract_capability_action_context(cap)
|
||
for value in (
|
||
getattr(cap, "name", None),
|
||
getattr(cap, "description", None),
|
||
context.get("operation_id"),
|
||
context.get("path"),
|
||
context.get("method"),
|
||
context.get("summary"),
|
||
):
|
||
if not isinstance(value, str):
|
||
continue
|
||
cleaned = value.strip()
|
||
key = cleaned.lower()
|
||
if not cleaned or key in seen:
|
||
continue
|
||
seen.add(key)
|
||
terms.append(cleaned)
|
||
if len(terms) >= limit:
|
||
return terms
|
||
|
||
tags = context.get("tags")
|
||
if isinstance(tags, list):
|
||
for tag in tags:
|
||
if not isinstance(tag, str):
|
||
continue
|
||
cleaned = tag.strip()
|
||
key = cleaned.lower()
|
||
if not cleaned or key in seen:
|
||
continue
|
||
seen.add(key)
|
||
terms.append(cleaned)
|
||
if len(terms) >= limit:
|
||
return terms
|
||
|
||
required = context.get("required_inputs")
|
||
if isinstance(required, list):
|
||
for item_value in required:
|
||
if not isinstance(item_value, str):
|
||
continue
|
||
cleaned = item_value.strip()
|
||
key = cleaned.lower()
|
||
if not cleaned or key in seen:
|
||
continue
|
||
seen.add(key)
|
||
terms.append(cleaned)
|
||
if len(terms) >= limit:
|
||
return terms
|
||
|
||
for value in missing_inputs:
|
||
cleaned = str(value).strip()
|
||
key = cleaned.lower()
|
||
if not cleaned or key in seen:
|
||
continue
|
||
seen.add(key)
|
||
terms.append(cleaned)
|
||
if len(terms) >= limit:
|
||
break
|
||
|
||
return terms
|
||
|
||
def _is_question_grounded_in_capabilities(
|
||
self,
|
||
*,
|
||
question: str,
|
||
grounding_terms: list[str],
|
||
missing_inputs: list[str],
|
||
) -> bool:
|
||
question_lower = question.lower()
|
||
question_tokens = self._tokenize_text(question)
|
||
if not question_tokens:
|
||
return False
|
||
|
||
for term in grounding_terms + missing_inputs:
|
||
term_text = str(term).strip()
|
||
if not term_text:
|
||
continue
|
||
term_lower = term_text.lower()
|
||
if term_lower in question_lower:
|
||
return True
|
||
term_tokens = self._tokenize_field_name(term_text)
|
||
if term_tokens and term_tokens & question_tokens:
|
||
return True
|
||
|
||
return False
|
||
|
||
def _build_grounded_fallback_question_ru(
|
||
self,
|
||
*,
|
||
question_number: int,
|
||
selected_capabilities: list[SelectedCapability],
|
||
missing_inputs: list[str],
|
||
) -> str:
|
||
primary_signature = "выбранной capability"
|
||
if selected_capabilities:
|
||
cap = selected_capabilities[0].capability
|
||
context = self._extract_capability_action_context(cap)
|
||
method = context.get("method")
|
||
path = context.get("path")
|
||
if isinstance(method, str) and isinstance(path, str) and method and path:
|
||
primary_signature = f"{method} {path}"
|
||
else:
|
||
cap_name = str(getattr(cap, "name", "") or "").strip()
|
||
if cap_name:
|
||
primary_signature = cap_name
|
||
|
||
if question_number <= 1:
|
||
return (
|
||
f"Какой конечный бизнес-результат нужен для сценария с {primary_signature}, "
|
||
"чтобы я собрал исполнимый pipeline?"
|
||
)
|
||
|
||
if missing_inputs:
|
||
first_input = self._humanize_input_name(missing_inputs[0])
|
||
return (
|
||
f"Для шага {primary_signature} какое значение вы передадите в поле "
|
||
f"\"{first_input}\"?"
|
||
)
|
||
|
||
return (
|
||
f"Для шага {primary_signature} какие входные параметры обязательны, "
|
||
"чтобы можно было выполнить pipeline без ошибок?"
|
||
)
|
||
|
||
def _count_low_confidence_questions(
|
||
self,
|
||
dialog_messages: list[dict[str, Any]],
|
||
) -> int:
|
||
count = 0
|
||
# Count only the current clarification chain (from the end of dialog),
|
||
# so each new request can start a fresh 2-step clarification when needed.
|
||
for item in reversed(dialog_messages):
|
||
if not isinstance(item, dict):
|
||
continue
|
||
role = str(item.get("role", "")).lower()
|
||
if role == "user":
|
||
continue
|
||
if role != "assistant":
|
||
continue
|
||
content = str(item.get("content", "")).lower()
|
||
if self._is_low_confidence_question(content):
|
||
count += 1
|
||
continue
|
||
break
|
||
return count
|
||
|
||
def _is_low_confidence_question(self, content: str) -> bool:
|
||
content = content.strip().lower()
|
||
return (
|
||
self.LOW_CONFIDENCE_DIALOG_MARKER in content
|
||
or self.LOW_CONFIDENCE_QUESTION_MARKER in content
|
||
or "какой финальный бизнес-результат нужен" in content
|
||
)
|
||
|
||
def _attach_low_confidence_marker(self, question: str) -> str:
|
||
return f"{question}\n{self.LOW_CONFIDENCE_DIALOG_MARKER}"
|
||
|
||
def _strip_low_confidence_marker(self, content: str) -> str:
|
||
return content.replace(self.LOW_CONFIDENCE_DIALOG_MARKER, "").strip()
|
||
|
||
def _collect_user_context_tokens(
|
||
self,
|
||
*,
|
||
message: str,
|
||
dialog_messages: list[dict[str, Any]],
|
||
) -> set[str]:
|
||
tokens = set(self._tokenize_text(message or ""))
|
||
for item in dialog_messages[-6:]:
|
||
if not isinstance(item, dict):
|
||
continue
|
||
if str(item.get("role", "")).lower() != "user":
|
||
continue
|
||
tokens.update(self._tokenize_text(str(item.get("content", ""))))
|
||
return tokens
|
||
|
||
def _collect_missing_required_inputs(
|
||
self,
|
||
*,
|
||
selected_capabilities: list[SelectedCapability],
|
||
context_tokens: set[str],
|
||
limit: int = 3,
|
||
) -> list[str]:
|
||
missing: list[str] = []
|
||
seen: set[str] = set()
|
||
|
||
for item in selected_capabilities[:3]:
|
||
required_inputs = self._extract_required_inputs(
|
||
getattr(item.capability, "input_schema", None)
|
||
)
|
||
for required_input in required_inputs:
|
||
key = str(required_input).strip().lower()
|
||
if not key or key in seen:
|
||
continue
|
||
seen.add(key)
|
||
|
||
required_tokens = self._tokenize_field_name(required_input)
|
||
if required_tokens and required_tokens & context_tokens:
|
||
continue
|
||
|
||
missing.append(str(required_input))
|
||
if len(missing) >= limit:
|
||
return missing
|
||
|
||
return missing
|
||
|
||
def _tokenize_field_name(self, value: str) -> set[str]:
|
||
normalized = re.sub(r"([a-z])([A-Z])", r"\1 \2", str(value))
|
||
normalized = normalized.replace("_", " ").replace("-", " ")
|
||
tokens = self._tokenize_text(normalized)
|
||
|
||
value_lower = str(value).lower()
|
||
if value_lower.endswith("_id") and len(value_lower) > 3:
|
||
tokens.add(value_lower[:-3])
|
||
return tokens
|
||
|
||
def _humanize_input_name(self, name: str) -> str:
|
||
label = re.sub(r"([a-z])([A-Z])", r"\1 \2", str(name))
|
||
label = label.replace("_", " ").replace("-", " ").strip()
|
||
return label or str(name)
|
||
|
||
def _extract_capability_action_context(self, capability: Any) -> dict[str, Any]:
|
||
llm_payload = getattr(capability, "llm_payload", None)
|
||
if not isinstance(llm_payload, dict):
|
||
return {}
|
||
|
||
brief = llm_payload.get("action_context_brief")
|
||
if isinstance(brief, dict):
|
||
return brief
|
||
|
||
fallback = llm_payload.get("action_context")
|
||
if not isinstance(fallback, dict):
|
||
return {}
|
||
|
||
# Keep prompt payload compact even if full action context is stored.
|
||
compact: dict[str, Any] = {}
|
||
for key in (
|
||
"operation_id",
|
||
"method",
|
||
"path",
|
||
"base_url",
|
||
"summary",
|
||
"description",
|
||
"tags",
|
||
"source_filename",
|
||
):
|
||
if key in fallback:
|
||
compact[key] = fallback.get(key)
|
||
return compact
|
||
|
||
def _build_capability_prompt_payload(self, capability: Any) -> dict[str, Any]:
|
||
payload = {
|
||
"id": str(getattr(capability, "id", "")),
|
||
"action_id": str(getattr(capability, "action_id", ""))
|
||
if getattr(capability, "action_id", None) is not None
|
||
else None,
|
||
"type": self._capability_type_value(capability),
|
||
"name": getattr(capability, "name", None),
|
||
"description": getattr(capability, "description", None),
|
||
"input_type": getattr(capability, "input_schema", None),
|
||
"output_type": getattr(capability, "output_schema", None),
|
||
"required_inputs": self._extract_required_inputs(
|
||
getattr(capability, "input_schema", None)
|
||
),
|
||
"data_format": getattr(capability, "data_format", None),
|
||
"action_context": self._extract_capability_action_context(capability),
|
||
}
|
||
recipe_summary = self._extract_recipe_summary(capability)
|
||
if recipe_summary is not None:
|
||
payload["recipe_summary"] = recipe_summary
|
||
return payload
|
||
|
||
def _extract_recipe_summary(self, capability: Any) -> dict[str, Any] | None:
|
||
llm_payload = getattr(capability, "llm_payload", None)
|
||
if isinstance(llm_payload, dict):
|
||
summary = llm_payload.get("recipe_summary")
|
||
if isinstance(summary, dict):
|
||
return summary
|
||
|
||
recipe = getattr(capability, "recipe", None)
|
||
if not isinstance(recipe, dict):
|
||
return None
|
||
steps = recipe.get("steps")
|
||
if not isinstance(steps, list):
|
||
return None
|
||
return {
|
||
"version": recipe.get("version"),
|
||
"steps_count": len(steps),
|
||
}
|
||
|
||
def _capability_type_value(self, capability: Any) -> str:
|
||
cap_type = getattr(capability, "type", None)
|
||
if isinstance(cap_type, str):
|
||
return cap_type.upper()
|
||
if hasattr(cap_type, "value"):
|
||
return str(cap_type.value).upper()
|
||
return "ATOMIC"
|
||
|
||
def _recipe_is_executable(self, recipe: Any) -> bool:
|
||
if not isinstance(recipe, dict):
|
||
return False
|
||
if recipe.get("version") != 1:
|
||
return False
|
||
steps = recipe.get("steps")
|
||
return isinstance(steps, list) and bool(steps)
|
||
|
||
def _normalize_workflow(
|
||
self,
|
||
raw_graph: dict[str, Any],
|
||
selected_capabilities: list[SelectedCapability],
|
||
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[str]]:
|
||
capabilities_by_id = {
|
||
str(sc.capability.id): sc.capability for sc in selected_capabilities
|
||
}
|
||
capabilities = [sc.capability for sc in selected_capabilities]
|
||
issues: list[str] = []
|
||
|
||
raw_nodes = raw_graph.get("nodes") if isinstance(raw_graph, dict) else None
|
||
if not isinstance(raw_nodes, list):
|
||
raw_nodes = []
|
||
|
||
normalized_nodes: list[dict[str, Any]] = []
|
||
step_map: dict[Any, int] = {}
|
||
known_steps: set[int] = set()
|
||
next_step = 1
|
||
|
||
for raw_node in raw_nodes:
|
||
if not isinstance(raw_node, dict):
|
||
continue
|
||
|
||
raw_step = raw_node.get("step")
|
||
raw_id = raw_node.get("id")
|
||
step = self._resolve_step_reference(raw_step, step_map=step_map, known_steps=known_steps)
|
||
if step is None:
|
||
step = self._resolve_step_reference(raw_id, step_map=step_map, known_steps=known_steps)
|
||
if step is None:
|
||
while next_step in known_steps:
|
||
next_step += 1
|
||
step = next_step
|
||
next_step += 1
|
||
|
||
known_steps.add(step)
|
||
self._register_step_alias(step_map, raw_step, step)
|
||
self._register_step_alias(step_map, raw_id, step)
|
||
self._register_step_alias(step_map, step, step)
|
||
|
||
capability_id_raw = raw_node.get("capability_id")
|
||
raw_endpoints = raw_node.get("endpoints")
|
||
has_raw_endpoints = isinstance(raw_endpoints, list) and bool(raw_endpoints)
|
||
cap = self._resolve_capability_for_node(
|
||
raw_node=raw_node,
|
||
capability_id=capability_id_raw,
|
||
capabilities=capabilities,
|
||
capabilities_by_id=capabilities_by_id,
|
||
)
|
||
has_explicit_capability_ref = (
|
||
capability_id_raw is not None and str(capability_id_raw).strip() != ""
|
||
)
|
||
if has_explicit_capability_ref and cap is None:
|
||
issues.append("graph:invalid_capability_ref")
|
||
elif cap is None and len(capabilities) > 1 and not has_raw_endpoints:
|
||
issues.append("graph:missing_capability_ref")
|
||
|
||
endpoints_payload = self._resolve_endpoints_for_node(
|
||
raw_node=raw_node,
|
||
fallback_capability=cap,
|
||
capabilities=capabilities,
|
||
capabilities_by_id=capabilities_by_id,
|
||
issues=issues,
|
||
)
|
||
if not endpoints_payload and cap is not None:
|
||
endpoints_payload = [self._build_endpoint_payload(cap)]
|
||
|
||
normalized_nodes.append(
|
||
{
|
||
"step": step,
|
||
"name": raw_node.get("name")
|
||
or (cap.name if cap else f"Шаг {step}"),
|
||
"description": raw_node.get("description"),
|
||
"input_connected_from": self._normalize_int_list(
|
||
raw_node.get("input_connected_from")
|
||
),
|
||
"output_connected_to": self._normalize_int_list(
|
||
raw_node.get("output_connected_to")
|
||
),
|
||
"input_data_type_from_previous": self._normalize_input_data_types(
|
||
raw_node.get("input_data_type_from_previous")
|
||
),
|
||
"external_inputs": self._normalize_str_list(
|
||
raw_node.get("external_inputs")
|
||
),
|
||
"endpoints": endpoints_payload,
|
||
}
|
||
)
|
||
|
||
raw_edges = raw_graph.get("edges") if isinstance(raw_graph, dict) else None
|
||
if not isinstance(raw_edges, list):
|
||
raw_edges = []
|
||
|
||
normalized_edges: list[dict[str, Any]] = []
|
||
for raw_edge in raw_edges:
|
||
if not isinstance(raw_edge, dict):
|
||
continue
|
||
from_ref = (
|
||
raw_edge.get("from_step")
|
||
or raw_edge.get("from")
|
||
or raw_edge.get("source")
|
||
)
|
||
to_ref = (
|
||
raw_edge.get("to_step") or raw_edge.get("to") or raw_edge.get("target")
|
||
)
|
||
edge_type = raw_edge.get("type")
|
||
from_step = self._resolve_step_reference(from_ref, step_map=step_map, known_steps=known_steps)
|
||
to_step = self._resolve_step_reference(to_ref, step_map=step_map, known_steps=known_steps)
|
||
if not isinstance(from_step, int) or not isinstance(to_step, int):
|
||
continue
|
||
if not isinstance(edge_type, str) or not edge_type.strip():
|
||
continue
|
||
normalized_edges.append(
|
||
{
|
||
"from_step": from_step,
|
||
"to_step": to_step,
|
||
"type": edge_type.strip(),
|
||
}
|
||
)
|
||
|
||
return normalized_nodes, normalized_edges, sorted(set(issues))
|
||
|
||
def _resolve_endpoints_for_node(
|
||
self,
|
||
*,
|
||
raw_node: dict[str, Any],
|
||
fallback_capability: Any | None,
|
||
capabilities: list[Any],
|
||
capabilities_by_id: dict[str, Any],
|
||
issues: list[str],
|
||
) -> list[dict[str, Any]]:
|
||
raw_endpoints = raw_node.get("endpoints")
|
||
if not isinstance(raw_endpoints, list):
|
||
return []
|
||
|
||
resolved_endpoints: list[dict[str, Any]] = []
|
||
fallback_capability_id = raw_node.get("capability_id")
|
||
fallback_capability_str = (
|
||
str(fallback_capability_id).strip()
|
||
if fallback_capability_id is not None
|
||
else ""
|
||
)
|
||
fallback_from_node = (
|
||
capabilities_by_id.get(fallback_capability_str)
|
||
if fallback_capability_str
|
||
else None
|
||
)
|
||
|
||
for raw_endpoint in raw_endpoints:
|
||
if not isinstance(raw_endpoint, dict):
|
||
issues.append("graph:invalid_capability_ref")
|
||
continue
|
||
|
||
endpoint_capability_id = raw_endpoint.get("capability_id")
|
||
endpoint_capability: Any | None = None
|
||
if endpoint_capability_id is not None and str(endpoint_capability_id).strip():
|
||
endpoint_capability = capabilities_by_id.get(str(endpoint_capability_id).strip())
|
||
elif fallback_from_node is not None:
|
||
endpoint_capability = fallback_from_node
|
||
elif fallback_capability is not None:
|
||
endpoint_capability = fallback_capability
|
||
elif len(capabilities) == 1:
|
||
endpoint_capability = capabilities[0]
|
||
|
||
if endpoint_capability is None:
|
||
issues.append("graph:invalid_capability_ref")
|
||
continue
|
||
|
||
resolved_endpoints.append(
|
||
self._build_endpoint_payload(endpoint_capability, raw_endpoint=raw_endpoint)
|
||
)
|
||
|
||
return resolved_endpoints
|
||
|
||
def _build_endpoint_payload(
|
||
self,
|
||
capability: Any,
|
||
*,
|
||
raw_endpoint: dict[str, Any] | None = None,
|
||
) -> dict[str, Any]:
|
||
cap_type = getattr(capability, "type", None)
|
||
if hasattr(cap_type, "value"):
|
||
cap_type_value = cap_type.value
|
||
elif cap_type is None:
|
||
cap_type_value = None
|
||
else:
|
||
cap_type_value = str(cap_type)
|
||
|
||
endpoint_name = None
|
||
if isinstance(raw_endpoint, dict):
|
||
raw_name = raw_endpoint.get("name")
|
||
if isinstance(raw_name, str) and raw_name.strip():
|
||
endpoint_name = raw_name.strip()
|
||
|
||
return {
|
||
"name": endpoint_name or capability.name,
|
||
"capability_id": str(capability.id),
|
||
"action_id": str(capability.action_id) if capability.action_id is not None else None,
|
||
"type": cap_type_value,
|
||
"input_type": capability.input_schema,
|
||
"output_type": capability.output_schema,
|
||
}
|
||
|
||
def _review_graph_with_llm(
|
||
self,
|
||
nodes: list[dict[str, Any]],
|
||
edges: list[dict[str, Any]],
|
||
selected_capabilities: list[SelectedCapability],
|
||
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
|
||
if not nodes:
|
||
return nodes, edges
|
||
|
||
capabilities_payload = [
|
||
self._build_capability_prompt_payload(sc.capability)
|
||
for sc in selected_capabilities
|
||
]
|
||
|
||
review_prompt = (
|
||
"ROLE: Graph reviewer for qwen2.5:7b-coder.\n"
|
||
"INPUT: GRAPH and CAPABILITIES only.\n"
|
||
"TASK: Keep executable DAG, remove dead branches, and fix edges.\n"
|
||
"CONSTRAINTS:\n"
|
||
"- Do not invent new steps.\n"
|
||
"- Keep only steps that contribute to final goal.\n"
|
||
"- Return JSON only.\n\n"
|
||
"OUTPUT_FORMAT:\n"
|
||
"{\n"
|
||
" \"keep_steps\": [1,2,3],\n"
|
||
" \"edges\": [\n"
|
||
" {\"from_step\": 1, \"to_step\": 2, \"type\": \"field_name\"}\n"
|
||
" ]\n"
|
||
"}\n\n"
|
||
f"CAPABILITIES:\n{json.dumps(capabilities_payload, ensure_ascii=False)}\n\n"
|
||
f"GRAPH:\n{json.dumps({'nodes': nodes, 'edges': edges}, ensure_ascii=False)}"
|
||
)
|
||
system_prompt = (
|
||
"You validate workflow graph connectivity for qwen2.5-coder. "
|
||
"Return ONLY one valid JSON object."
|
||
)
|
||
|
||
try:
|
||
payload = chat_json(system_prompt=system_prompt, user_prompt=review_prompt)
|
||
except Exception:
|
||
return nodes, edges
|
||
|
||
if not isinstance(payload, dict):
|
||
return nodes, edges
|
||
|
||
known_steps = {
|
||
step for node in nodes if isinstance((step := node.get("step")), int)
|
||
}
|
||
if not known_steps:
|
||
return nodes, edges
|
||
|
||
keep_steps = set(self._normalize_int_list(payload.get("keep_steps")))
|
||
keep_steps = keep_steps & known_steps if keep_steps else set(known_steps)
|
||
|
||
reviewed_edges = self._normalize_review_edges(payload.get("edges"), keep_steps)
|
||
if not reviewed_edges:
|
||
reviewed_edges = [
|
||
edge
|
||
for edge in edges
|
||
if isinstance(edge.get("from_step"), int)
|
||
and isinstance(edge.get("to_step"), int)
|
||
and edge["from_step"] in keep_steps
|
||
and edge["to_step"] in keep_steps
|
||
]
|
||
|
||
reviewed_nodes = [
|
||
node
|
||
for node in nodes
|
||
if isinstance(node.get("step"), int) and node["step"] in keep_steps
|
||
]
|
||
return reviewed_nodes, reviewed_edges
|
||
|
||
def _normalize_review_edges(
|
||
self,
|
||
raw_edges: Any,
|
||
keep_steps: set[int],
|
||
) -> list[dict[str, Any]]:
|
||
if not isinstance(raw_edges, list):
|
||
return []
|
||
|
||
result: list[dict[str, Any]] = []
|
||
seen: set[tuple[int, int, str]] = set()
|
||
for edge in raw_edges:
|
||
if not isinstance(edge, dict):
|
||
continue
|
||
src = edge.get("from_step")
|
||
dst = edge.get("to_step")
|
||
edge_type = edge.get("type")
|
||
if not isinstance(src, int) or not isinstance(dst, int):
|
||
continue
|
||
if src not in keep_steps or dst not in keep_steps:
|
||
continue
|
||
if not isinstance(edge_type, str) or not edge_type.strip():
|
||
continue
|
||
key = (src, dst, edge_type.strip())
|
||
if key in seen:
|
||
continue
|
||
seen.add(key)
|
||
result.append(
|
||
{
|
||
"from_step": src,
|
||
"to_step": dst,
|
||
"type": edge_type.strip(),
|
||
}
|
||
)
|
||
return result
|
||
|
||
def _resolve_capability_for_node(
|
||
self,
|
||
*,
|
||
raw_node: dict[str, Any],
|
||
capability_id: Any,
|
||
capabilities: list[Any],
|
||
capabilities_by_id: dict[str, Any],
|
||
) -> Any | None:
|
||
_ = raw_node
|
||
has_explicit_capability_ref = capability_id is not None and str(capability_id).strip() != ""
|
||
if has_explicit_capability_ref:
|
||
return capabilities_by_id.get(str(capability_id).strip())
|
||
|
||
if len(capabilities) == 1 and not has_explicit_capability_ref:
|
||
return capabilities[0]
|
||
|
||
return None
|
||
|
||
def _collect_node_capability_hints(self, raw_node: dict[str, Any]) -> list[str]:
|
||
hints: list[str] = []
|
||
for key in ("name", "description", "title", "operation_id", "action_id"):
|
||
value = raw_node.get(key)
|
||
if isinstance(value, str) and value.strip():
|
||
hints.append(value.strip())
|
||
|
||
endpoints = raw_node.get("endpoints")
|
||
if isinstance(endpoints, list):
|
||
for endpoint in endpoints:
|
||
if not isinstance(endpoint, dict):
|
||
continue
|
||
for key in ("name", "description", "summary", "operation_id", "action_id", "path"):
|
||
value = endpoint.get(key)
|
||
if isinstance(value, str) and value.strip():
|
||
hints.append(value.strip())
|
||
|
||
return hints
|
||
|
||
def _match_capability_by_alias(self, capabilities: list[Any], lookup_value: str) -> Any | None:
|
||
query = str(lookup_value or "").strip()
|
||
if not query:
|
||
return None
|
||
|
||
lowered = query.lower()
|
||
normalized_query = self._normalize_lookup_token(query)
|
||
strong_matches: list[Any] = []
|
||
normalized_matches: list[Any] = []
|
||
fuzzy_matches: list[Any] = []
|
||
|
||
for capability in capabilities:
|
||
aliases = self._collect_capability_aliases(capability)
|
||
for alias in aliases:
|
||
alias_lower = alias.lower()
|
||
if alias_lower == lowered:
|
||
strong_matches.append(capability)
|
||
break
|
||
|
||
if capability in strong_matches:
|
||
continue
|
||
|
||
for alias in aliases:
|
||
alias_normalized = self._normalize_lookup_token(alias)
|
||
if normalized_query and alias_normalized == normalized_query:
|
||
normalized_matches.append(capability)
|
||
break
|
||
|
||
if capability in normalized_matches:
|
||
continue
|
||
|
||
if len(normalized_query) >= 4:
|
||
for alias in aliases:
|
||
alias_normalized = self._normalize_lookup_token(alias)
|
||
if not alias_normalized:
|
||
continue
|
||
if normalized_query in alias_normalized or alias_normalized in normalized_query:
|
||
fuzzy_matches.append(capability)
|
||
break
|
||
|
||
unique_strong = self._single_or_none(strong_matches)
|
||
if unique_strong is not None:
|
||
return unique_strong
|
||
|
||
unique_normalized = self._single_or_none(normalized_matches)
|
||
if unique_normalized is not None:
|
||
return unique_normalized
|
||
|
||
return self._single_or_none(fuzzy_matches)
|
||
|
||
def _collect_capability_aliases(self, capability: Any) -> list[str]:
|
||
aliases: list[str] = []
|
||
seen: set[str] = set()
|
||
|
||
def add_alias(raw_value: Any) -> None:
|
||
if raw_value is None:
|
||
return
|
||
value = str(raw_value).strip()
|
||
if not value:
|
||
return
|
||
key = value.lower()
|
||
if key in seen:
|
||
return
|
||
seen.add(key)
|
||
aliases.append(value)
|
||
|
||
add_alias(getattr(capability, "id", None))
|
||
add_alias(getattr(capability, "name", None))
|
||
add_alias(getattr(capability, "description", None))
|
||
add_alias(getattr(capability, "action_id", None))
|
||
|
||
action_context = self._extract_capability_action_context(capability)
|
||
if isinstance(action_context, dict):
|
||
add_alias(action_context.get("operation_id"))
|
||
add_alias(action_context.get("path"))
|
||
add_alias(action_context.get("summary"))
|
||
add_alias(action_context.get("description"))
|
||
method = action_context.get("method")
|
||
path = action_context.get("path")
|
||
if isinstance(method, str) and isinstance(path, str):
|
||
add_alias(f"{method} {path}")
|
||
|
||
return aliases
|
||
|
||
def _normalize_lookup_token(self, value: Any) -> str:
|
||
return re.sub(r"[^a-zа-я0-9]+", "", str(value or "").lower())
|
||
|
||
def _single_or_none(self, items: list[Any]) -> Any | None:
|
||
if not items:
|
||
return None
|
||
first = items[0]
|
||
if all(candidate is first for candidate in items):
|
||
return first
|
||
return None
|
||
|
||
def _repair_edges_with_data_flow(
|
||
self,
|
||
nodes: list[dict[str, Any]],
|
||
edges: list[dict[str, Any]],
|
||
) -> list[dict[str, Any]]:
|
||
known_steps = {
|
||
step for node in nodes if isinstance((step := node.get("step")), int)
|
||
}
|
||
sanitized: list[dict[str, Any]] = []
|
||
seen: set[tuple[int, int, str]] = set()
|
||
for edge in edges:
|
||
src = edge.get("from_step")
|
||
dst = edge.get("to_step")
|
||
edge_type = edge.get("type")
|
||
if not isinstance(src, int) or not isinstance(dst, int):
|
||
continue
|
||
if src not in known_steps or dst not in known_steps or src == dst:
|
||
continue
|
||
if not isinstance(edge_type, str) or not edge_type.strip():
|
||
continue
|
||
key = (src, dst, edge_type.strip())
|
||
if key in seen:
|
||
continue
|
||
seen.add(key)
|
||
sanitized.append(
|
||
{
|
||
"from_step": src,
|
||
"to_step": dst,
|
||
"type": edge_type.strip(),
|
||
}
|
||
)
|
||
return sanitized
|
||
|
||
def _prune_edges_for_terminal_goal(
|
||
self,
|
||
nodes: list[dict[str, Any]],
|
||
edges: list[dict[str, Any]],
|
||
) -> list[dict[str, Any]]:
|
||
known_steps = {
|
||
step for node in nodes if isinstance((step := node.get("step")), int)
|
||
}
|
||
if not known_steps:
|
||
return edges
|
||
|
||
out_degree: dict[int, int] = {step: 0 for step in known_steps}
|
||
reverse_adjacency: dict[int, set[int]] = {}
|
||
for edge in edges:
|
||
src = edge.get("from_step")
|
||
dst = edge.get("to_step")
|
||
if isinstance(src, int) and isinstance(dst, int):
|
||
out_degree[src] = out_degree.get(src, 0) + 1
|
||
reverse_adjacency.setdefault(dst, set()).add(src)
|
||
|
||
sink_steps = [step for step in known_steps if out_degree.get(step, 0) == 0]
|
||
if not sink_steps:
|
||
return []
|
||
|
||
reachable: set[int] = set()
|
||
stack: list[int] = list(sink_steps)
|
||
while stack:
|
||
current = stack.pop()
|
||
if current in reachable:
|
||
continue
|
||
reachable.add(current)
|
||
stack.extend(reverse_adjacency.get(current, set()))
|
||
|
||
return [
|
||
edge
|
||
for edge in edges
|
||
if isinstance(edge.get("from_step"), int)
|
||
and isinstance(edge.get("to_step"), int)
|
||
and edge["from_step"] in reachable
|
||
and edge["to_step"] in reachable
|
||
]
|
||
|
||
def _prune_edges_by_required_inputs(
|
||
self,
|
||
nodes: list[dict[str, Any]],
|
||
edges: list[dict[str, Any]],
|
||
) -> list[dict[str, Any]]:
|
||
required_by_step: dict[int, set[str]] = {}
|
||
explicit_by_step: dict[int, set[str]] = {}
|
||
for node in nodes:
|
||
step = node.get("step")
|
||
if not isinstance(step, int):
|
||
continue
|
||
required_inputs = self._extract_required_inputs_from_node(node)
|
||
if required_inputs:
|
||
required_by_step[step] = set(required_inputs)
|
||
explicit_types = {
|
||
edge_ref.get("type")
|
||
for edge_ref in self._normalize_input_data_types(
|
||
node.get("input_data_type_from_previous")
|
||
)
|
||
if isinstance(edge_ref.get("type"), str)
|
||
}
|
||
if explicit_types:
|
||
explicit_by_step[step] = explicit_types
|
||
|
||
edges_by_target: dict[int, list[dict[str, Any]]] = {}
|
||
passthrough_edges: list[dict[str, Any]] = []
|
||
for edge in edges:
|
||
to_step = edge.get("to_step")
|
||
if isinstance(to_step, int):
|
||
edges_by_target.setdefault(to_step, []).append(edge)
|
||
else:
|
||
passthrough_edges.append(edge)
|
||
|
||
filtered: list[dict[str, Any]] = []
|
||
for to_step, target_edges in edges_by_target.items():
|
||
required_inputs = required_by_step.get(to_step, set())
|
||
explicit_types = explicit_by_step.get(to_step, set())
|
||
if not required_inputs and not explicit_types:
|
||
filtered.extend(target_edges)
|
||
continue
|
||
|
||
matched_edges = [
|
||
edge
|
||
for edge in target_edges
|
||
if self._edge_matches_expected_inputs(
|
||
edge_type=edge.get("type"),
|
||
required_inputs=required_inputs,
|
||
explicit_types=explicit_types,
|
||
)
|
||
]
|
||
# Keep original edges if aliases did not match any schema field.
|
||
filtered.extend(matched_edges if matched_edges else target_edges)
|
||
|
||
filtered.extend(passthrough_edges)
|
||
return filtered
|
||
|
||
def _prune_disconnected_nodes(
|
||
self,
|
||
nodes: list[dict[str, Any]],
|
||
edges: list[dict[str, Any]],
|
||
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
|
||
if len(nodes) <= 1:
|
||
return nodes, edges
|
||
|
||
steps = [n.get("step") for n in nodes if isinstance(n.get("step"), int)]
|
||
if not steps:
|
||
return nodes, edges
|
||
|
||
connected_steps: set[int] = set()
|
||
for edge in edges:
|
||
src = edge.get("from_step")
|
||
dst = edge.get("to_step")
|
||
if isinstance(src, int):
|
||
connected_steps.add(src)
|
||
if isinstance(dst, int):
|
||
connected_steps.add(dst)
|
||
|
||
if connected_steps:
|
||
keep_steps = connected_steps
|
||
else:
|
||
# No usable data-flow edges: keep only one primary step
|
||
# to avoid returning multiple hanging nodes.
|
||
keep_steps = {max(steps)}
|
||
|
||
pruned_nodes = [
|
||
node
|
||
for node in nodes
|
||
if isinstance(node.get("step"), int) and node["step"] in keep_steps
|
||
]
|
||
pruned_edges = [
|
||
edge
|
||
for edge in edges
|
||
if isinstance(edge.get("from_step"), int)
|
||
and isinstance(edge.get("to_step"), int)
|
||
and edge["from_step"] in keep_steps
|
||
and edge["to_step"] in keep_steps
|
||
]
|
||
|
||
return pruned_nodes, pruned_edges
|
||
|
||
def _ensure_external_inputs(
|
||
self,
|
||
nodes: list[dict[str, Any]],
|
||
edges: list[dict[str, Any]],
|
||
) -> None:
|
||
edges_by_target: dict[int, set[str]] = {}
|
||
for edge in edges:
|
||
to_step = edge.get("to_step")
|
||
edge_type = edge.get("type")
|
||
if isinstance(to_step, int) and isinstance(edge_type, str):
|
||
edges_by_target.setdefault(to_step, set()).add(edge_type)
|
||
|
||
for node in nodes:
|
||
step = node.get("step")
|
||
required_inputs = self._extract_required_inputs_from_node(node)
|
||
if not required_inputs:
|
||
continue
|
||
external_inputs = set(self._normalize_str_list(node.get("external_inputs")))
|
||
incoming_types = edges_by_target.get(step, set())
|
||
for required_input in required_inputs:
|
||
if self._is_input_satisfied_by_values(required_input, incoming_types):
|
||
continue
|
||
external_inputs.add(required_input)
|
||
node["external_inputs"] = sorted(external_inputs)
|
||
|
||
def _sync_node_connections(
|
||
self,
|
||
nodes: list[dict[str, Any]],
|
||
edges: list[dict[str, Any]],
|
||
) -> None:
|
||
incoming_by_step: dict[int, set[int]] = {}
|
||
outgoing_by_step: dict[int, set[int]] = {}
|
||
known_steps = {
|
||
step for node in nodes if isinstance((step := node.get("step")), int)
|
||
}
|
||
|
||
for step in known_steps:
|
||
incoming_by_step[step] = set()
|
||
outgoing_by_step[step] = set()
|
||
|
||
for edge in edges:
|
||
src = edge.get("from_step")
|
||
dst = edge.get("to_step")
|
||
if not isinstance(src, int) or not isinstance(dst, int):
|
||
continue
|
||
if src not in known_steps or dst not in known_steps:
|
||
continue
|
||
outgoing_by_step[src].add(dst)
|
||
incoming_by_step[dst].add(src)
|
||
|
||
for node in nodes:
|
||
step = node.get("step")
|
||
if not isinstance(step, int):
|
||
node["input_connected_from"] = []
|
||
node["output_connected_to"] = []
|
||
continue
|
||
node["input_connected_from"] = sorted(incoming_by_step.get(step, set()))
|
||
node["output_connected_to"] = sorted(outgoing_by_step.get(step, set()))
|
||
|
||
def _validate_ready_graph(
|
||
self,
|
||
nodes: list[dict[str, Any]],
|
||
edges: list[dict[str, Any]],
|
||
) -> tuple[bool, list[str]]:
|
||
missing: list[str] = []
|
||
structure_issues = self._collect_graph_structure_issues(nodes, edges)
|
||
# Multiple sink nodes are valid for fan-out scenarios and should not block execution.
|
||
missing.extend(
|
||
issue for issue in structure_issues if issue != "graph:ambiguous_terminal"
|
||
)
|
||
edges_by_target: dict[int, set[str]] = {}
|
||
for edge in edges:
|
||
to_step = edge.get("to_step")
|
||
edge_type = edge.get("type")
|
||
if isinstance(to_step, int) and isinstance(edge_type, str):
|
||
edges_by_target.setdefault(to_step, set()).add(edge_type)
|
||
|
||
for node in nodes:
|
||
step = node.get("step")
|
||
endpoints = node.get("endpoints") or []
|
||
if not endpoints:
|
||
missing.append(f"node_{step}: missing_endpoint")
|
||
continue
|
||
|
||
for endpoint in endpoints:
|
||
if not endpoint.get("capability_id"):
|
||
missing.append(f"node_{step}: invalid_endpoint")
|
||
continue
|
||
endpoint_type = str(endpoint.get("type") or "").upper()
|
||
if endpoint_type == "COMPOSITE":
|
||
continue
|
||
if not endpoint.get("action_id"):
|
||
missing.append(f"node_{step}: invalid_endpoint")
|
||
|
||
required_inputs = self._extract_required_inputs_from_node(node)
|
||
if not required_inputs:
|
||
continue
|
||
|
||
external_inputs = set(self._normalize_str_list(node.get("external_inputs")))
|
||
incoming_types = edges_by_target.get(step, set())
|
||
for required_input in required_inputs:
|
||
if self._is_input_satisfied_by_values(required_input, external_inputs):
|
||
continue
|
||
if self._is_input_satisfied_by_values(required_input, incoming_types):
|
||
continue
|
||
missing.append(f"node_{step}: missing_input:{required_input}")
|
||
|
||
return len(missing) == 0, missing
|
||
|
||
def _collect_graph_structure_issues(
|
||
self,
|
||
nodes: list[dict[str, Any]],
|
||
edges: list[dict[str, Any]],
|
||
) -> list[str]:
|
||
issues: list[str] = []
|
||
valid_steps = {
|
||
step for node in nodes if isinstance((step := node.get("step")), int)
|
||
}
|
||
if not valid_steps:
|
||
return ["graph: empty"]
|
||
|
||
valid_edges: list[dict[str, Any]] = []
|
||
for edge in edges:
|
||
src = edge.get("from_step")
|
||
dst = edge.get("to_step")
|
||
edge_type = edge.get("type")
|
||
if not isinstance(src, int) or not isinstance(dst, int):
|
||
issues.append("graph: invalid_edge_reference")
|
||
continue
|
||
if src not in valid_steps or dst not in valid_steps:
|
||
issues.append("graph: edge_to_missing_node")
|
||
continue
|
||
if not isinstance(edge_type, str) or not edge_type.strip():
|
||
issues.append("graph: invalid_edge_type")
|
||
continue
|
||
valid_edges.append(edge)
|
||
|
||
if len(valid_steps) > 1 and not valid_edges:
|
||
issues.append("graph: missing_edges")
|
||
|
||
if valid_edges and self._graph_has_cycle(valid_steps, valid_edges):
|
||
issues.append("graph: cycle")
|
||
|
||
expected_inputs: dict[int, set[int]] = {step: set() for step in valid_steps}
|
||
expected_outputs: dict[int, set[int]] = {step: set() for step in valid_steps}
|
||
reverse_adjacency: dict[int, set[int]] = {step: set() for step in valid_steps}
|
||
for edge in valid_edges:
|
||
src = edge["from_step"]
|
||
dst = edge["to_step"]
|
||
expected_outputs[src].add(dst)
|
||
expected_inputs[dst].add(src)
|
||
reverse_adjacency[dst].add(src)
|
||
|
||
if len(valid_steps) > 1 and valid_edges:
|
||
sink_steps = [
|
||
step for step in valid_steps if len(expected_outputs.get(step, set())) == 0
|
||
]
|
||
if len(sink_steps) > 1:
|
||
issues.append("graph:ambiguous_terminal")
|
||
reachable_to_terminal: set[int] = set()
|
||
stack: list[int] = list(sink_steps)
|
||
while stack:
|
||
current = stack.pop()
|
||
if current in reachable_to_terminal:
|
||
continue
|
||
reachable_to_terminal.add(current)
|
||
stack.extend(reverse_adjacency.get(current, set()))
|
||
|
||
for step in sorted(valid_steps - reachable_to_terminal):
|
||
issues.append(f"graph: unreachable_to_terminal:{step}")
|
||
|
||
for node in nodes:
|
||
step = node.get("step")
|
||
if not isinstance(step, int):
|
||
continue
|
||
actual_inputs = set(self._normalize_int_list(node.get("input_connected_from")))
|
||
actual_outputs = set(self._normalize_int_list(node.get("output_connected_to")))
|
||
if actual_inputs != expected_inputs.get(step, set()):
|
||
issues.append(f"graph: node_link_mismatch_input:{step}")
|
||
if actual_outputs != expected_outputs.get(step, set()):
|
||
issues.append(f"graph: node_link_mismatch_output:{step}")
|
||
|
||
return issues
|
||
|
||
def _graph_has_cycle(
|
||
self,
|
||
valid_steps: set[int],
|
||
edges: list[dict[str, Any]],
|
||
) -> bool:
|
||
adjacency: dict[int, set[int]] = {step: set() for step in valid_steps}
|
||
for edge in edges:
|
||
adjacency.setdefault(edge["from_step"], set()).add(edge["to_step"])
|
||
|
||
visiting: set[int] = set()
|
||
visited: set[int] = set()
|
||
|
||
def dfs(step: int) -> bool:
|
||
if step in visiting:
|
||
return True
|
||
if step in visited:
|
||
return False
|
||
visiting.add(step)
|
||
for neighbor in adjacency.get(step, set()):
|
||
if dfs(neighbor):
|
||
return True
|
||
visiting.remove(step)
|
||
visited.add(step)
|
||
return False
|
||
|
||
return any(dfs(step) for step in valid_steps)
|
||
|
||
def _build_chat_reply_ru(
|
||
self, nodes: list[dict[str, Any]], edges: list[dict[str, Any]]
|
||
) -> str:
|
||
if not nodes:
|
||
return "Мне не удалось построить шаги выполнения."
|
||
|
||
steps = sorted(nodes, key=lambda n: n.get("step", 0))
|
||
if len(steps) > 1 and not edges:
|
||
return (
|
||
"Мне удалось выделить шаги, но не удалось корректно связать их "
|
||
"в исполнимый сценарий."
|
||
)
|
||
is_linear = self._is_linear_chain(steps, edges)
|
||
|
||
if is_linear:
|
||
names = [f"{n.get('step')}. {n.get('name')}" for n in steps]
|
||
return "План выполнения: " + " -> ".join(names)
|
||
|
||
lines = ["План выполнения:"]
|
||
for node in steps:
|
||
lines.append(f"Шаг {node.get('step')}: {node.get('name')}")
|
||
return "\n".join(lines)
|
||
|
||
def _build_pipeline_name(self, message: str) -> str:
|
||
cleaned = message.strip().split("\n", 1)[0]
|
||
return cleaned[:120] if cleaned else "Generated pipeline"
|
||
|
||
def _is_low_quality_message(self, message: str) -> bool:
|
||
normalized = (message or "").strip().lower()
|
||
if not normalized:
|
||
return True
|
||
|
||
tokens = self._tokenize_text(normalized)
|
||
if not tokens:
|
||
return True
|
||
|
||
explicit_noise = {"писяпопа", "asdf", "qwerty", "лол", "хз", "test", "тест"}
|
||
if any(token in explicit_noise for token in tokens):
|
||
return True
|
||
|
||
intent_markers = {
|
||
"сдел",
|
||
"получ",
|
||
"отправ",
|
||
"найд",
|
||
"сегмент",
|
||
"собер",
|
||
"рассыл",
|
||
"обнов",
|
||
"созд",
|
||
"удал",
|
||
"assign",
|
||
"send",
|
||
"segment",
|
||
"build",
|
||
"get",
|
||
"email",
|
||
"user",
|
||
"hotel",
|
||
"pipeline",
|
||
}
|
||
has_intent = any(marker in normalized for marker in intent_markers)
|
||
if len(tokens) == 1 and not has_intent:
|
||
return True
|
||
if len(tokens) <= 2 and not has_intent and len(normalized) < 18:
|
||
return True
|
||
return False
|
||
|
||
def _tokenize_text(self, value: str) -> set[str]:
|
||
tokens = set(re.findall(r"[a-zA-Zа-яА-Я0-9]+", value.lower()))
|
||
return {token for token in tokens if len(token) >= 3}
|
||
|
||
def _extract_primary_type(self, node: dict[str, Any], field: str) -> str | None:
|
||
endpoints = node.get("endpoints") or []
|
||
if not endpoints:
|
||
return None
|
||
for endpoint in endpoints:
|
||
if not isinstance(endpoint, dict):
|
||
continue
|
||
raw = endpoint.get(field)
|
||
if isinstance(raw, str) and raw.strip():
|
||
return raw
|
||
if isinstance(raw, dict):
|
||
endpoint_type = raw.get("type")
|
||
if isinstance(endpoint_type, str) and endpoint_type.strip():
|
||
return endpoint_type
|
||
return None
|
||
|
||
def _extract_required_inputs(
|
||
self, input_schema: dict[str, Any] | None
|
||
) -> list[str]:
|
||
if not isinstance(input_schema, dict):
|
||
return []
|
||
required = input_schema.get("required")
|
||
if isinstance(required, list):
|
||
return [str(item) for item in required if item]
|
||
return []
|
||
|
||
def _edge_matches_expected_inputs(
|
||
self,
|
||
*,
|
||
edge_type: Any,
|
||
required_inputs: set[str],
|
||
explicit_types: set[str],
|
||
) -> bool:
|
||
if not isinstance(edge_type, str):
|
||
return False
|
||
if not required_inputs and not explicit_types:
|
||
return True
|
||
for expected in required_inputs | explicit_types:
|
||
if self._field_alias_matches(edge_type=edge_type, expected_input=expected):
|
||
return True
|
||
return False
|
||
|
||
def _is_input_satisfied_by_values(
|
||
self,
|
||
required_input: str,
|
||
candidate_values: set[str],
|
||
) -> bool:
|
||
for candidate in candidate_values:
|
||
if self._field_alias_matches(edge_type=candidate, expected_input=required_input):
|
||
return True
|
||
return False
|
||
|
||
def _field_alias_matches(self, *, edge_type: str, expected_input: str) -> bool:
|
||
left = str(edge_type).strip()
|
||
right = str(expected_input).strip()
|
||
if not left or not right:
|
||
return False
|
||
if left == right:
|
||
return True
|
||
|
||
left_base = left[:-2] if left.endswith("[]") else left
|
||
right_base = right[:-2] if right.endswith("[]") else right
|
||
if left_base == right_base:
|
||
return True
|
||
|
||
left_normalized = self._normalize_lookup_token(left_base)
|
||
right_normalized = self._normalize_lookup_token(right_base)
|
||
if left_normalized and right_normalized and left_normalized == right_normalized:
|
||
return True
|
||
|
||
left_tokens = self._tokenize_field_name(left_base)
|
||
right_tokens = self._tokenize_field_name(right_base)
|
||
return bool(left_tokens and right_tokens and left_tokens == right_tokens)
|
||
|
||
def _extract_required_inputs_from_node(self, node: dict[str, Any]) -> list[str]:
|
||
endpoints = node.get("endpoints") or []
|
||
if not endpoints:
|
||
return []
|
||
required_inputs: list[str] = []
|
||
seen: set[str] = set()
|
||
for endpoint in endpoints:
|
||
if not isinstance(endpoint, dict):
|
||
continue
|
||
input_type = endpoint.get("input_type")
|
||
if not isinstance(input_type, dict):
|
||
continue
|
||
for input_name in self._extract_required_inputs(input_type):
|
||
normalized = str(input_name).strip()
|
||
if not normalized or normalized in seen:
|
||
continue
|
||
seen.add(normalized)
|
||
required_inputs.append(normalized)
|
||
return required_inputs
|
||
|
||
def _normalize_int_list(self, value: Any) -> list[int]:
|
||
if not isinstance(value, list):
|
||
return []
|
||
result = []
|
||
for item in value:
|
||
if isinstance(item, int):
|
||
result.append(item)
|
||
continue
|
||
if isinstance(item, str):
|
||
stripped = item.strip()
|
||
if stripped.lstrip("-").isdigit():
|
||
result.append(int(stripped))
|
||
return result
|
||
|
||
def _normalize_input_data_types(self, value: Any) -> list[dict[str, Any]]:
|
||
if not isinstance(value, list):
|
||
return []
|
||
result = []
|
||
for item in value:
|
||
if not isinstance(item, dict):
|
||
continue
|
||
from_step = item.get("from_step") or item.get("step")
|
||
edge_type = item.get("type") or item.get("output") or item.get("data_type")
|
||
normalized_from_step: int | None = None
|
||
if isinstance(from_step, int):
|
||
normalized_from_step = from_step
|
||
elif isinstance(from_step, str):
|
||
stripped = from_step.strip()
|
||
if stripped.lstrip("-").isdigit():
|
||
normalized_from_step = int(stripped)
|
||
if normalized_from_step is not None and isinstance(edge_type, str):
|
||
result.append({"from_step": normalized_from_step, "type": edge_type})
|
||
return result
|
||
|
||
def _register_step_alias(self, step_map: dict[Any, int], alias: Any, step: int) -> None:
|
||
if alias is None:
|
||
return
|
||
step_map[alias] = step
|
||
if isinstance(alias, str):
|
||
stripped = alias.strip()
|
||
if stripped:
|
||
step_map[stripped] = step
|
||
if stripped.lstrip("-").isdigit():
|
||
step_map[int(stripped)] = step
|
||
step_map[stripped.lower()] = step
|
||
if isinstance(alias, int):
|
||
step_map[str(alias)] = step
|
||
|
||
def _resolve_step_reference(
|
||
self,
|
||
raw_ref: Any,
|
||
*,
|
||
step_map: dict[Any, int],
|
||
known_steps: set[int],
|
||
) -> int | None:
|
||
if isinstance(raw_ref, int):
|
||
if raw_ref in known_steps:
|
||
return raw_ref
|
||
mapped = step_map.get(raw_ref)
|
||
if isinstance(mapped, int):
|
||
return mapped
|
||
return raw_ref
|
||
|
||
if isinstance(raw_ref, str):
|
||
stripped = raw_ref.strip()
|
||
if not stripped:
|
||
return None
|
||
mapped = step_map.get(stripped)
|
||
if isinstance(mapped, int):
|
||
return mapped
|
||
mapped = step_map.get(stripped.lower())
|
||
if isinstance(mapped, int):
|
||
return mapped
|
||
if stripped.lstrip("-").isdigit():
|
||
numeric = int(stripped)
|
||
if numeric in known_steps:
|
||
return numeric
|
||
mapped = step_map.get(numeric)
|
||
if isinstance(mapped, int):
|
||
return mapped
|
||
return numeric
|
||
|
||
mapped = step_map.get(raw_ref)
|
||
if isinstance(mapped, int):
|
||
return mapped
|
||
return None
|
||
|
||
def _normalize_str_list(self, value: Any) -> list[str]:
|
||
if not isinstance(value, list):
|
||
return []
|
||
return [str(item) for item in value if isinstance(item, (str, int))]
|
||
|
||
def _edge_creates_cycle(
|
||
self, edges: list[dict[str, Any]], from_step: int, to_step: int
|
||
) -> bool:
|
||
adjacency: dict[int, set[int]] = {}
|
||
for edge in edges:
|
||
src = edge.get("from_step")
|
||
dst = edge.get("to_step")
|
||
if isinstance(src, int) and isinstance(dst, int):
|
||
adjacency.setdefault(src, set()).add(dst)
|
||
adjacency.setdefault(from_step, set()).add(to_step)
|
||
|
||
visiting: set[int] = set()
|
||
visited: set[int] = set()
|
||
|
||
def dfs(node: int) -> bool:
|
||
if node in visiting:
|
||
return True
|
||
if node in visited:
|
||
return False
|
||
visiting.add(node)
|
||
for neighbor in adjacency.get(node, set()):
|
||
if dfs(neighbor):
|
||
return True
|
||
visiting.remove(node)
|
||
visited.add(node)
|
||
return False
|
||
|
||
return any(dfs(node) for node in list(adjacency.keys()))
|
||
|
||
def _is_linear_chain(
|
||
self, nodes: list[dict[str, Any]], edges: list[dict[str, Any]]
|
||
) -> bool:
|
||
if not nodes:
|
||
return False
|
||
if len(nodes) == 1:
|
||
return True
|
||
if len(edges) != len(nodes) - 1:
|
||
return False
|
||
|
||
outgoing = {n["step"]: set() for n in nodes}
|
||
incoming = {n["step"]: set() for n in nodes}
|
||
for edge in edges:
|
||
src = edge.get("from_step")
|
||
dst = edge.get("to_step")
|
||
if src in outgoing and dst in incoming:
|
||
outgoing[src].add(dst)
|
||
incoming[dst].add(src)
|
||
|
||
for step in outgoing:
|
||
if len(outgoing[step]) > 1:
|
||
return False
|
||
for step in incoming:
|
||
if len(incoming[step]) > 1:
|
||
return False
|
||
|
||
return True
|