Files
2026-03-17 18:32:44 +03:00

372 lines
13 KiB
Python

from __future__ import annotations
import re
from typing import Any
import yaml
from app.models import ActionIngestStatus, HttpMethod
class OpenAPIService:
    """Load OpenAPI 3.x documents and convert their operations into action payloads.

    All methods are static or class methods; the class only serves as a namespace.
    Problems that are specific to a single operation are reported as ``ValueError``
    internally so that ``extract_actions_with_failures`` can record the operation
    as failed instead of aborting the whole extraction.
    """

    # Lower-cased ``value`` of every HttpMethod member mapped to the member itself.
    SUPPORTED_METHODS = {method.value.lower(): method for method in HttpMethod}
    # Media types that are preferred, in this order, when a content map has several.
    JSON_CONTENT_TYPES = ("application/json", "application/*+json")

    @staticmethod
    def load_document(raw_bytes: bytes) -> dict[str, Any]:
        """Decode, parse and validate a raw OpenAPI document.

        Args:
            raw_bytes: File contents; expected to be UTF-8 encoded YAML or JSON.

        Returns:
            The parsed document.

        Raises:
            ValueError: If the payload is empty, is not UTF-8, cannot be parsed,
                is not a mapping at the root, is not an OpenAPI 3.x document,
                has no non-empty ``paths`` mapping, or has no usable
                ``servers[0].url``.
        """
        if not raw_bytes:
            raise ValueError("OpenAPI file is empty")

        try:
            document = yaml.safe_load(raw_bytes.decode("utf-8"))
        except UnicodeDecodeError as exc:
            raise ValueError("OpenAPI file must be UTF-8 encoded") from exc
        except yaml.YAMLError as exc:
            raise ValueError("OpenAPI file is not valid YAML or JSON") from exc

        if not isinstance(document, dict):
            raise ValueError("OpenAPI root must be an object")

        openapi_version = document.get("openapi")
        if not isinstance(openapi_version, str) or not openapi_version.startswith("3."):
            raise ValueError("Only OpenAPI 3.x documents are supported")

        if not isinstance(document.get("paths"), dict) or not document["paths"]:
            raise ValueError("OpenAPI file must contain a non-empty paths section")

        if OpenAPIService._extract_base_url(document) is None:
            raise ValueError(
                "OpenAPI file must contain servers[0].url (base_url)"
            )

        return document

    @classmethod
    def extract_actions(
        cls,
        document: dict[str, Any],
        *,
        source_filename: str | None = None,
    ) -> list[dict[str, Any]]:
        """Return only the successfully extracted action payloads of ``document``."""
        extraction = cls.extract_actions_with_failures(document, source_filename=source_filename)
        return extraction["succeeded"]

    @classmethod
    def extract_actions_with_failures(
        cls,
        document: dict[str, Any],
        *,
        source_filename: str | None = None,
    ) -> dict[str, list[dict[str, Any]]]:
        """Build one action payload per supported operation in ``document``.

        Args:
            document: Parsed OpenAPI document.
            source_filename: Optional file name copied into every payload.

        Returns:
            A dictionary with the keys ``"succeeded"`` and ``"failed"``, each
            holding a list of payloads. An operation ends up in ``"failed"``
            when it is not a mapping or when building its payload raises
            ``ValueError`` (for example an unresolvable or circular ``$ref``).
        """
        base_url = cls._extract_base_url(document)
        succeeded_actions: list[dict[str, Any]] = []
        failed_actions: list[dict[str, Any]] = []

        paths = document.get("paths")
        if not isinstance(paths, dict):
            # This method is public and may be called on a document that never
            # went through load_document(); a missing or malformed paths section
            # simply yields no operations instead of an AttributeError.
            paths = {}

        for path, path_item in paths.items():
            if not isinstance(path_item, dict):
                continue
            shared_parameters = path_item.get("parameters", [])

            for method_name, operation in path_item.items():
                # Path items also carry non-operation keys (parameters, summary, ...).
                if method_name not in cls.SUPPORTED_METHODS:
                    continue
                try:
                    if not isinstance(operation, dict):
                        raise ValueError("Operation definition must be an object")
                    succeeded_actions.append(
                        cls._build_succeeded_action_payload(
                            method_name=method_name,
                            path=path,
                            operation=operation,
                            shared_parameters=shared_parameters,
                            document=document,
                            base_url=base_url,
                            source_filename=source_filename,
                        )
                    )
                except ValueError as exc:
                    failed_actions.append(
                        cls._build_failed_action_payload(
                            method_name=method_name,
                            path=path,
                            base_url=base_url,
                            source_filename=source_filename,
                            raw_spec=operation,
                            error_message=str(exc),
                        )
                    )

        return {
            "succeeded": succeeded_actions,
            "failed": failed_actions,
        }

    @classmethod
    def _build_succeeded_action_payload(
        cls,
        *,
        method_name: str,
        path: str,
        operation: dict[str, Any],
        shared_parameters: list[Any] | None,
        document: dict[str, Any],
        base_url: str | None,
        source_filename: str | None,
    ) -> dict[str, Any]:
        """Build the payload for an operation that could be fully processed.

        The operation is dereferenced first, so ``raw_spec`` and all derived
        schemas contain resolved ``$ref`` targets.

        Raises:
            ValueError: If a ``$ref`` cannot be resolved or the parameters are
                malformed.
        """
        normalized_operation = cls._dereference(operation, document)
        parameters = cls._merge_parameters(
            shared_parameters,
            normalized_operation.get("parameters", []),
            document,
        )
        operation_id = normalized_operation.get("operationId") or cls._build_operation_id(method_name, path)
        return {
            "operation_id": operation_id,
            "method": cls.SUPPORTED_METHODS[method_name],
            "path": path,
            "base_url": base_url,
            "summary": normalized_operation.get("summary"),
            "description": normalized_operation.get("description"),
            "tags": normalized_operation.get("tags"),
            "parameters_schema": cls._build_parameters_schema(parameters, document),
            "request_body_schema": cls._extract_request_body_schema(normalized_operation, document),
            "response_schema": cls._extract_response_schema(normalized_operation, document),
            "source_filename": source_filename,
            "raw_spec": normalized_operation,
            "ingest_status": ActionIngestStatus.SUCCEEDED,
            "ingest_error": None,
        }

    @classmethod
    def _build_failed_action_payload(
        cls,
        *,
        method_name: str,
        path: str,
        base_url: str | None,
        source_filename: str | None,
        raw_spec: Any,
        error_message: str,
    ) -> dict[str, Any]:
        """Build the payload for an operation that could not be processed.

        Descriptive fields are copied from ``raw_spec`` when it is a mapping;
        all schema fields are ``None`` and ``ingest_error`` carries the reason.
        """
        operation = raw_spec if isinstance(raw_spec, dict) else {}
        return {
            "operation_id": operation.get("operationId") or cls._build_operation_id(method_name, path),
            "method": cls.SUPPORTED_METHODS[method_name],
            "path": path,
            "base_url": base_url,
            "summary": operation.get("summary"),
            "description": operation.get("description"),
            "tags": operation.get("tags"),
            "parameters_schema": None,
            "request_body_schema": None,
            "response_schema": None,
            "source_filename": source_filename,
            "raw_spec": operation or None,
            "ingest_status": ActionIngestStatus.FAILED,
            "ingest_error": error_message,
        }

    @staticmethod
    def _extract_base_url(document: dict[str, Any]) -> str | None:
        """Return the stripped ``servers[0].url`` or ``None`` if it is missing or blank."""
        servers = document.get("servers")
        if not isinstance(servers, list) or not servers:
            return None
        first_server = servers[0]
        if not isinstance(first_server, dict):
            return None
        url = first_server.get("url")
        if not isinstance(url, str):
            return None
        return url.strip() or None

    @classmethod
    def _merge_parameters(
        cls,
        path_parameters: list[Any] | None,
        operation_parameters: list[Any] | None,
        document: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Combine path-level and operation-level parameters.

        Parameters are identified by their ``(name, in)`` pair; an operation-level
        parameter replaces a path-level one with the same pair. Entries that are
        not mappings after dereferencing are ignored.

        Raises:
            ValueError: If a parameters section is not a list, if a parameter's
                ``name`` or ``in`` is a list or mapping, or if a ``$ref`` cannot
                be resolved.
        """
        merged: dict[tuple[Any, Any], dict[str, Any]] = {}
        # Path-level entries go first so that operation-level ones override them.
        for raw_parameters in (path_parameters, operation_parameters):
            if not raw_parameters:
                continue
            if not isinstance(raw_parameters, list):
                # Concatenating a non-list used to raise an uncaught TypeError.
                raise ValueError("OpenAPI parameters section must be a list")
            for raw_parameter in raw_parameters:
                parameter = cls._dereference(raw_parameter, document)
                if not isinstance(parameter, dict):
                    continue
                name = parameter.get("name")
                location = parameter.get("in")
                if isinstance(name, (dict, list)) or isinstance(location, (dict, list)):
                    # Such values cannot be used as a dictionary key.
                    raise ValueError("OpenAPI parameter 'name' and 'in' must be scalar values")
                merged[(name, location)] = parameter
        return list(merged.values())

    @classmethod
    def _build_parameters_schema(
        cls,
        parameters: list[dict[str, Any]],
        document: dict[str, Any],
    ) -> dict[str, Any] | None:
        """Describe ``parameters`` as a single JSON-Schema-like object.

        Each named parameter located in query, path, header or cookie becomes a
        property annotated with ``x-parameter-location``. Parameters without a
        usable schema default to ``{"type": "string"}``.

        Returns:
            The object schema, or ``None`` when no parameter qualifies.
        """
        if not parameters:
            return None

        properties: dict[str, Any] = {}
        required: list[str] = []
        for parameter in parameters:
            name = parameter.get("name")
            location = parameter.get("in")
            if not name or location not in ("query", "path", "header", "cookie"):
                continue

            declared_schema = parameter.get("schema")
            if declared_schema is None:
                # Parameters may describe their value through a content map instead.
                resolved_schema = cls._extract_schema_from_content(parameter.get("content"), document)
            else:
                resolved_schema = cls._dereference(declared_schema, document)
            base_schema = resolved_schema if isinstance(resolved_schema, dict) else {"type": "string"}

            property_schema = {**base_schema, "x-parameter-location": location}
            if parameter.get("description"):
                property_schema["description"] = parameter["description"]
            # Properties are keyed by name only, so a name used in two locations
            # keeps the definition that comes last.
            properties[name] = property_schema
            # The same name may occur once per location; list it only once.
            if parameter.get("required") and name not in required:
                required.append(name)

        if not properties:
            return None

        parameters_schema: dict[str, Any] = {
            "type": "object",
            "properties": properties,
        }
        if required:
            parameters_schema["required"] = required
        return parameters_schema

    @classmethod
    def _extract_request_body_schema(
        cls,
        operation: dict[str, Any],
        document: dict[str, Any],
    ) -> dict[str, Any] | None:
        """Return the request body schema of ``operation``.

        A required request body is marked with ``"x-required": True``. Returns
        ``None`` when there is no request body or no usable schema.
        """
        request_body = operation.get("requestBody")
        if not isinstance(request_body, dict):
            return None
        request_body = cls._dereference(request_body, document)
        schema = cls._extract_schema_from_content(request_body.get("content"), document)
        if not isinstance(schema, dict):
            return None
        if request_body.get("required"):
            schema = {**schema, "x-required": True}
        return schema

    @classmethod
    def _extract_response_schema(
        cls,
        operation: dict[str, Any],
        document: dict[str, Any],
    ) -> dict[str, Any] | None:
        """Return a schema for the first usable response whose status starts with "2".

        Responses are visited in document order. For each one the content schema
        is preferred; if there is none, its description is returned as
        ``{"description": ...}``. Returns ``None`` when nothing usable is found.
        """
        responses = operation.get("responses")
        if not isinstance(responses, dict):
            return None
        for status_code, response in responses.items():
            # YAML may parse status codes as integers, hence the str() call.
            if not str(status_code).startswith("2"):
                continue
            normalized_response = cls._dereference(response, document)
            if not isinstance(normalized_response, dict):
                continue
            schema = cls._extract_schema_from_content(normalized_response.get("content"), document)
            if isinstance(schema, dict):
                return schema
            if normalized_response.get("description"):
                return {"description": normalized_response["description"]}
        return None

    @classmethod
    def _content_type_rank(cls, content_type: Any) -> int:
        """Return a sort rank for ``content_type``; lower ranks are preferred.

        Media type parameters (``; charset=...``) and letter case are ignored.
        Entries of ``JSON_CONTENT_TYPES`` rank by their position, any other
        ``+json`` type comes next, everything else comes last.
        """
        preferred = cls.JSON_CONTENT_TYPES
        if isinstance(content_type, str):
            media_type = content_type.split(";", 1)[0].strip().lower()
            if media_type in preferred:
                return preferred.index(media_type)
            if media_type.endswith("+json"):
                return len(preferred)
        return len(preferred) + 1

    @classmethod
    def _extract_schema_from_content(cls, content: Any, document: dict[str, Any]) -> dict[str, Any] | None:
        """Return the first usable schema of a content map.

        JSON media types are tried before all others (see ``_content_type_rank``);
        within the same rank the document order is kept. The chosen media type is
        recorded under ``x-content-type`` exactly as written in the document.
        """
        if not isinstance(content, dict):
            return None
        # sorted() is stable, so entries of equal rank stay in document order.
        candidates = sorted(content.items(), key=lambda entry: cls._content_type_rank(entry[0]))
        for content_type, media_type_object in candidates:
            if not isinstance(media_type_object, dict):
                continue
            schema = media_type_object.get("schema")
            if not isinstance(schema, dict):
                continue
            normalized_schema = cls._dereference(schema, document)
            if isinstance(normalized_schema, dict):
                return {
                    **normalized_schema,
                    "x-content-type": content_type,
                }
        return None

    @classmethod
    def _dereference(
        cls,
        value: Any,
        document: dict[str, Any],
        _active_refs: tuple[str, ...] = (),
    ) -> Any:
        """Return a copy of ``value`` with every local ``$ref`` replaced by its target.

        Lists and mappings are processed recursively; other values are returned
        unchanged. Keys that sit next to a ``$ref`` override the keys of the
        resolved target.

        Args:
            value: Any fragment of the document.
            document: The full document used to resolve references.
            _active_refs: Internal. References currently being expanded on the
                way down to ``value``; used to detect cycles.

        Raises:
            ValueError: If a reference is not a local string reference, cannot
                be resolved, or is circular.
        """
        if isinstance(value, list):
            return [cls._dereference(item, document, _active_refs) for item in value]
        if not isinstance(value, dict):
            return value
        if "$ref" not in value:
            return {key: cls._dereference(item, document, _active_refs) for key, item in value.items()}

        ref = value["$ref"]
        resolved = cls._resolve_ref(ref, document)
        if ref in _active_refs:
            # Expanding a recursive schema would never terminate. Raising
            # ValueError lets the caller record just this operation as failed
            # instead of dying with an uncaught RecursionError.
            cycle = _active_refs[_active_refs.index(ref):] + (ref,)
            raise ValueError(f"Circular OpenAPI reference detected: {' -> '.join(cycle)}")

        merged = cls._dereference(resolved, document, _active_refs + (ref,))
        if not isinstance(merged, dict):
            return merged
        sibling_fields = {
            key: cls._dereference(item, document, _active_refs)
            for key, item in value.items()
            if key != "$ref"
        }
        return {**merged, **sibling_fields}

    @staticmethod
    def _resolve_ref(ref: str, document: dict[str, Any]) -> Any:
        """Look up the target of a local ``#/...`` reference inside ``document``.

        The part after ``#/`` is treated as a JSON Pointer: ``~1`` and ``~0`` are
        unescaped to ``/`` and ``~``, mappings are indexed by key and lists by
        decimal index.

        Raises:
            ValueError: If ``ref`` is not a string, is not a local reference, or
                does not point to an existing location.
        """
        if not isinstance(ref, str):
            raise ValueError(f"OpenAPI $ref must be a string, got: {ref!r}")
        if not ref.startswith("#/"):
            raise ValueError(f"Only local $ref values are supported, got: {ref}")

        current: Any = document
        for part in ref[2:].split("/"):
            # Order matters: "~1" must be unescaped before "~0".
            token = part.replace("~1", "/").replace("~0", "~")
            if isinstance(current, dict) and token in current:
                current = current[token]
            elif (
                isinstance(current, list)
                and token.isascii()
                and token.isdigit()
                and int(token) < len(current)
            ):
                current = current[int(token)]
            else:
                raise ValueError(f"Could not resolve OpenAPI reference: {ref}")
        return current

    @staticmethod
    def _build_operation_id(method_name: str, path: str) -> str:
        """Derive a fallback operation id such as ``get_users_id`` from method and path.

        Braces are dropped, every run of characters other than ASCII letters,
        digits and ``/`` becomes ``_``, slashes become ``_`` and the result is
        lower-cased. An empty path part is replaced by ``root``.
        """
        normalized_path = re.sub(r"[{}]", "", path).strip("/")
        normalized_path = re.sub(r"[^a-zA-Z0-9/]+", "_", normalized_path)
        normalized_path = normalized_path.replace("/", "_") or "root"
        return f"{method_name.lower()}_{normalized_path.lower()}"