169 lines
5.8 KiB
Python
169 lines
5.8 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
from uuid import UUID
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Request, status
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.database.session import get_session
|
|
from app.models import ExecutionRun, ExecutionStepRun, Pipeline, User, UserRole
|
|
from app.schemas.execution_sch import ExecutionRunDetailResponse, ExecutionStepRunResponse
|
|
from app.utils.business_logger import log_business_event
|
|
from app.utils.token_manager import get_current_user
|
|
|
|
|
|
router = APIRouter(tags=["Executions"])
|
|
KNOWN_HTTP_METHODS = {"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"}
|
|
REQUEST_BODY_METHODS = {"POST", "PUT", "PATCH"}
|
|
|
|
|
|
def _extract_method(request_snapshot: dict[str, Any] | None) -> str | None:
|
|
if not isinstance(request_snapshot, dict):
|
|
return None
|
|
|
|
method_raw = request_snapshot.get("method")
|
|
if not isinstance(method_raw, str):
|
|
return None
|
|
|
|
method = method_raw.upper()
|
|
if method in KNOWN_HTTP_METHODS:
|
|
return method
|
|
return None
|
|
|
|
|
|
def _extract_status_code(response_snapshot: dict[str, Any] | None) -> int | None:
|
|
if not isinstance(response_snapshot, dict):
|
|
return None
|
|
|
|
status_code_raw = response_snapshot.get("status_code")
|
|
if isinstance(status_code_raw, int):
|
|
return status_code_raw
|
|
if isinstance(status_code_raw, str) and status_code_raw.isdigit():
|
|
return int(status_code_raw)
|
|
return None
|
|
|
|
|
|
def _extract_accepted_payload(
|
|
*,
|
|
method: str | None,
|
|
request_snapshot: dict[str, Any] | None,
|
|
) -> Any:
|
|
if method not in REQUEST_BODY_METHODS:
|
|
return None
|
|
if not isinstance(request_snapshot, dict):
|
|
return None
|
|
return request_snapshot.get("json_body")
|
|
|
|
|
|
def _extract_output_payload(response_snapshot: dict[str, Any] | None) -> Any:
|
|
if not isinstance(response_snapshot, dict):
|
|
return None
|
|
return response_snapshot.get("body")
|
|
|
|
|
|
def _build_step_run_response(step_run: ExecutionStepRun) -> ExecutionStepRunResponse:
|
|
status_value = step_run.status.value if hasattr(step_run.status, "value") else step_run.status
|
|
base = ExecutionStepRunResponse(
|
|
step=step_run.step,
|
|
name=step_run.name,
|
|
capability_id=step_run.capability_id,
|
|
action_id=step_run.action_id,
|
|
status=status_value,
|
|
resolved_inputs=step_run.resolved_inputs,
|
|
request_snapshot=step_run.request_snapshot,
|
|
response_snapshot=step_run.response_snapshot,
|
|
error=step_run.error,
|
|
started_at=step_run.started_at,
|
|
finished_at=step_run.finished_at,
|
|
duration_ms=step_run.duration_ms,
|
|
created_at=step_run.created_at,
|
|
updated_at=step_run.updated_at,
|
|
)
|
|
request_snapshot = base.request_snapshot if isinstance(base.request_snapshot, dict) else None
|
|
response_snapshot = base.response_snapshot if isinstance(base.response_snapshot, dict) else None
|
|
method = _extract_method(request_snapshot)
|
|
status_code = _extract_status_code(response_snapshot)
|
|
accepted_payload = _extract_accepted_payload(method=method, request_snapshot=request_snapshot)
|
|
output_payload = _extract_output_payload(response_snapshot)
|
|
return base.model_copy(
|
|
update={
|
|
"method": method,
|
|
"status_code": status_code,
|
|
"accepted_payload": accepted_payload,
|
|
"output_payload": output_payload,
|
|
}
|
|
)
|
|
|
|
|
|
@router.get("/{run_id}", response_model=ExecutionRunDetailResponse)
|
|
async def get_execution(
|
|
run_id: UUID,
|
|
request: Request,
|
|
session: AsyncSession = Depends(get_session),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
trace_id = getattr(request.state, "traceId", None)
|
|
run = await session.get(ExecutionRun, run_id)
|
|
if run is None:
|
|
log_business_event(
|
|
"execution_fetch_rejected",
|
|
trace_id=trace_id,
|
|
user_id=str(current_user.id),
|
|
run_id=str(run_id),
|
|
reason="run_not_found",
|
|
)
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Execution run not found")
|
|
|
|
if current_user.role != UserRole.ADMIN:
|
|
is_owner = run.initiated_by == current_user.id
|
|
if not is_owner and run.initiated_by is None:
|
|
pipeline = await session.get(Pipeline, run.pipeline_id)
|
|
is_owner = pipeline is not None and pipeline.created_by == current_user.id
|
|
if not is_owner:
|
|
log_business_event(
|
|
"execution_fetch_rejected",
|
|
trace_id=trace_id,
|
|
user_id=str(current_user.id),
|
|
run_id=str(run.id),
|
|
pipeline_id=str(run.pipeline_id),
|
|
reason="run_not_found_or_forbidden",
|
|
)
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Execution run not found")
|
|
|
|
step_query = (
|
|
select(ExecutionStepRun)
|
|
.where(ExecutionStepRun.run_id == run.id)
|
|
.order_by(ExecutionStepRun.step.asc(), ExecutionStepRun.created_at.asc())
|
|
)
|
|
step_result = await session.execute(step_query)
|
|
step_runs = list(step_result.scalars().all())
|
|
|
|
log_business_event(
|
|
"execution_fetched",
|
|
trace_id=trace_id,
|
|
user_id=str(current_user.id),
|
|
run_id=str(run.id),
|
|
pipeline_id=str(run.pipeline_id),
|
|
result_status=run.status.value,
|
|
step_count=len(step_runs),
|
|
)
|
|
|
|
return ExecutionRunDetailResponse(
|
|
id=run.id,
|
|
pipeline_id=run.pipeline_id,
|
|
status=run.status.value,
|
|
inputs=run.inputs or {},
|
|
summary=run.summary,
|
|
error=run.error,
|
|
started_at=run.started_at,
|
|
finished_at=run.finished_at,
|
|
created_at=run.created_at,
|
|
updated_at=run.updated_at,
|
|
steps=[
|
|
_build_step_run_response(step_run)
|
|
for step_run in step_runs
|
|
],
|
|
)
|