Files
2026-03-17 18:32:44 +03:00

169 lines
5.8 KiB
Python

from __future__ import annotations
from typing import Any
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Request, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database.session import get_session
from app.models import ExecutionRun, ExecutionStepRun, Pipeline, User, UserRole
from app.schemas.execution_sch import ExecutionRunDetailResponse, ExecutionStepRunResponse
from app.utils.business_logger import log_business_event
from app.utils.token_manager import get_current_user
# Router holding the execution-run endpoints; every route is tagged "Executions".
router = APIRouter(tags=["Executions"])
# HTTP verbs that _extract_method accepts; any other recorded value yields None.
KNOWN_HTTP_METHODS = {"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"}
# Verbs for which _extract_accepted_payload returns the snapshot's "json_body".
REQUEST_BODY_METHODS = {"POST", "PUT", "PATCH"}
def _extract_method(request_snapshot: dict[str, Any] | None) -> str | None:
    """Return the upper-cased HTTP method stored in a request snapshot.

    ``None`` is returned when the snapshot is not a dict, when its
    ``"method"`` entry is not a string, or when the upper-cased value is not
    a member of ``KNOWN_HTTP_METHODS``.
    """
    candidate = request_snapshot.get("method") if isinstance(request_snapshot, dict) else None
    if not isinstance(candidate, str):
        return None
    normalized = candidate.upper()
    return normalized if normalized in KNOWN_HTTP_METHODS else None
def _extract_status_code(response_snapshot: dict[str, Any] | None) -> int | None:
    """Return the HTTP status code stored in a response snapshot.

    Args:
        response_snapshot: Stored response data. Anything other than a dict
            yields ``None``.

    Returns:
        The ``"status_code"`` entry as an ``int`` when it is an integer or a
        string of digits that ``int()`` can parse; otherwise ``None``.
        Booleans are rejected even though ``bool`` is a subclass of ``int``.
    """
    if not isinstance(response_snapshot, dict):
        return None
    raw = response_snapshot.get("status_code")
    # True/False would pass the int check below but are not status codes.
    if isinstance(raw, bool):
        return None
    if isinstance(raw, int):
        return raw
    if isinstance(raw, str) and raw.isdigit():
        # str.isdigit() is also true for characters such as superscript
        # digits, which int() refuses; treat those as "no status code"
        # instead of letting ValueError escape to the caller.
        try:
            return int(raw)
        except ValueError:
            return None
    return None
def _extract_accepted_payload(
    *,
    method: str | None,
    request_snapshot: dict[str, Any] | None,
) -> Any:
    """Return the JSON body a step sent, for methods that carry a body.

    The snapshot's ``"json_body"`` entry is returned only when ``method`` is
    in ``REQUEST_BODY_METHODS`` and the snapshot is a dict; in every other
    case the result is ``None``.
    """
    carries_body = method in REQUEST_BODY_METHODS
    if carries_body and isinstance(request_snapshot, dict):
        return request_snapshot.get("json_body")
    return None
def _extract_output_payload(response_snapshot: dict[str, Any] | None) -> Any:
    """Return the ``"body"`` entry of a response snapshot.

    ``None`` is returned when the snapshot is not a dict or has no such entry.
    """
    return response_snapshot.get("body") if isinstance(response_snapshot, dict) else None
def _build_step_run_response(step_run: ExecutionStepRun) -> ExecutionStepRunResponse:
    """Convert a stored step run into its API response model.

    The persisted attributes are copied into an ``ExecutionStepRunResponse``.
    Four derived fields (``method``, ``status_code``, ``accepted_payload``,
    ``output_payload``) are then computed from the request/response snapshots
    and applied with ``model_copy(update=...)``.
    """
    # Use the status's ``.value`` when it has one; otherwise pass it through.
    raw_status = step_run.status
    status_value = raw_status.value if hasattr(raw_status, "value") else raw_status

    # Attributes copied verbatim from the step run onto the response model.
    passthrough_fields = (
        "step",
        "name",
        "capability_id",
        "action_id",
        "resolved_inputs",
        "request_snapshot",
        "response_snapshot",
        "error",
        "started_at",
        "finished_at",
        "duration_ms",
        "created_at",
        "updated_at",
    )
    base = ExecutionStepRunResponse(
        status=status_value,
        **{field: getattr(step_run, field) for field in passthrough_fields},
    )

    # The snapshots are read back from the response model (not from step_run)
    # and anything that is not a dict is treated as absent.
    request_snapshot = base.request_snapshot
    if not isinstance(request_snapshot, dict):
        request_snapshot = None
    response_snapshot = base.response_snapshot
    if not isinstance(response_snapshot, dict):
        response_snapshot = None

    method = _extract_method(request_snapshot)
    derived = {
        "method": method,
        "status_code": _extract_status_code(response_snapshot),
        "accepted_payload": _extract_accepted_payload(
            method=method,
            request_snapshot=request_snapshot,
        ),
        "output_payload": _extract_output_payload(response_snapshot),
    }
    return base.model_copy(update=derived)
async def _requester_owns_run(session: AsyncSession, run: ExecutionRun, user: User) -> bool:
    """Tell whether *user* counts as the owner of *run*.

    The user owns the run when ``run.initiated_by`` equals their id. The
    pipeline is consulted only when the run has no initiator at all; in that
    case the pipeline must exist and its ``created_by`` must equal the user's id.
    """
    if run.initiated_by == user.id:
        return True
    if run.initiated_by is not None:
        return False
    pipeline = await session.get(Pipeline, run.pipeline_id)
    return pipeline is not None and pipeline.created_by == user.id


@router.get("/{run_id}", response_model=ExecutionRunDetailResponse)
async def get_execution(
    run_id: UUID,
    request: Request,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_user),
):
    # Fetch one execution run together with its step runs.
    #
    # Responds 404 when the run does not exist, and also when a non-admin
    # requester is not its owner, so both cases look the same to the client.
    # Each outcome is reported through log_business_event.
    #
    # NOTE: written as comments rather than a docstring so the description
    # FastAPI generates for this route in the OpenAPI schema stays unchanged.
    trace_id = getattr(request.state, "traceId", None)

    run = await session.get(ExecutionRun, run_id)
    if run is None:
        log_business_event(
            "execution_fetch_rejected",
            trace_id=trace_id,
            user_id=str(current_user.id),
            run_id=str(run_id),
            reason="run_not_found",
        )
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Execution run not found",
        )

    # Admins skip the ownership check entirely.
    needs_ownership_check = current_user.role != UserRole.ADMIN
    if needs_ownership_check and not await _requester_owns_run(session, run, current_user):
        log_business_event(
            "execution_fetch_rejected",
            trace_id=trace_id,
            user_id=str(current_user.id),
            run_id=str(run.id),
            pipeline_id=str(run.pipeline_id),
            reason="run_not_found_or_forbidden",
        )
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Execution run not found",
        )

    # Steps ordered by step number, with creation time as the tie-breaker.
    ordered_steps = select(ExecutionStepRun).where(ExecutionStepRun.run_id == run.id)
    ordered_steps = ordered_steps.order_by(
        ExecutionStepRun.step.asc(),
        ExecutionStepRun.created_at.asc(),
    )
    step_runs = list((await session.execute(ordered_steps)).scalars().all())

    log_business_event(
        "execution_fetched",
        trace_id=trace_id,
        user_id=str(current_user.id),
        run_id=str(run.id),
        pipeline_id=str(run.pipeline_id),
        result_status=run.status.value,
        step_count=len(step_runs),
    )

    return ExecutionRunDetailResponse(
        id=run.id,
        pipeline_id=run.pipeline_id,
        status=run.status.value,
        inputs=run.inputs or {},
        summary=run.summary,
        error=run.error,
        started_at=run.started_at,
        finished_at=run.finished_at,
        created_at=run.created_at,
        updated_at=run.updated_at,
        steps=list(map(_build_step_run_response, step_runs)),
    )