Files
2026-03-17 18:32:44 +03:00

391 lines
10 KiB
Python

from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Annotated
from fastapi import FastAPI, Query
from pydantic import BaseModel, Field
class User(BaseModel):
    # A user record as exposed by `GET /users/recent`.
    id: str
    email: str
    # Timestamp the recent-users endpoint filters on (`last_active_after`).
    last_active: datetime
class Hotel(BaseModel):
    # A hotel catalog entry as exposed by `GET /hotels/top`.
    id: str
    name: str
    # Matched case-insensitively by the `city` filter of `GET /hotels/top`.
    city: str
class Segment(BaseModel):
    # A group of user ids attached to a single hotel.
    # The segmentation endpoint builds `segment_id` as "seg_<hotel id>".
    segment_id: str
    hotel_id: str
    # Defaults to a fresh empty list per instance.
    user_ids: list[str] = Field(default_factory=list)
class Assignment(BaseModel):
    # Pairs one user id with one hotel id.
    user_id: str
    hotel_id: str
class RecentUsersResponse(BaseModel):
    # Response body of `GET /users/recent`.
    users: list[User] = Field(default_factory=list)
class TopHotelsResponse(BaseModel):
    # Response body of `GET /hotels/top`.
    hotels: list[Hotel] = Field(default_factory=list)
class HotelSegmentsRequest(BaseModel):
    # Request body of `POST /segments/hotel`: the users to distribute and the
    # hotels to distribute them over. Both default to empty lists.
    users: list[User] = Field(default_factory=list)
    hotels: list[Hotel] = Field(default_factory=list)
class HotelSegmentsResponse(BaseModel):
    # Response body of `POST /segments/hotel`.
    segments: list[Segment] = Field(default_factory=list)
class AssignmentsRequest(BaseModel):
    # Request body of `POST /assignments/hotels`.
    segments: list[Segment] = Field(default_factory=list)
class AssignmentsResponse(BaseModel):
    # Response body of `POST /assignments/hotels`.
    assignments: list[Assignment] = Field(default_factory=list)
class EmailOfferRequest(BaseModel):
    # Request body of `POST /emails/send-offers`.
    # `template_id` is optional for callers; the handler reads it but does not
    # otherwise use it.
    template_id: str = "offer_template_2026"
    assignments: list[Assignment] = Field(default_factory=list)
class FailedDelivery(BaseModel):
    # One rejected email delivery: which user, and a human-readable reason.
    user_id: str
    reason: str
class EmailOfferResponse(BaseModel):
    # Response body of `POST /emails/send-offers`: delivery counters plus the
    # detailed list of failures.
    sent_count: int
    failed_count: int
    failed: list[FailedDelivery] = Field(default_factory=list)
class Lead(BaseModel):
    # A CRM lead as exposed by `GET /crm/leads/recent`.
    lead_id: str
    email: str
    # Free-form string; matched case-insensitively by the `source` filter of
    # `GET /crm/leads/recent`.
    source: str
class QualifiedLead(BaseModel):
    # A lead after scoring by `POST /crm/leads/qualify`.
    lead_id: str
    email: str
    score: int
    # The qualify endpoint emits "high", "medium" or "low"; the model itself
    # accepts any string.
    tier: str
class PreparedOffer(BaseModel):
    # An offer ready to be sent to a lead, as built by
    # `POST /crm/offers/prepare`.
    offer_id: str
    lead_id: str
    # The prepare endpoint emits "email" or "push"; the model itself accepts
    # any string.
    channel: str
    message: str
class RecentLeadsResponse(BaseModel):
    # Response body of `GET /crm/leads/recent`.
    leads: list[Lead] = Field(default_factory=list)
class QualifyLeadsRequest(BaseModel):
    # Request body of `POST /crm/leads/qualify`.
    leads: list[Lead] = Field(default_factory=list)
class QualifyLeadsResponse(BaseModel):
    # Response body of `POST /crm/leads/qualify`.
    qualified_leads: list[QualifiedLead] = Field(default_factory=list)
class PrepareOffersRequest(BaseModel):
    # Request body of `POST /crm/offers/prepare`.
    qualified_leads: list[QualifiedLead] = Field(default_factory=list)
class PrepareOffersResponse(BaseModel):
    # Response body of `POST /crm/offers/prepare`.
    offers: list[PreparedOffer] = Field(default_factory=list)
class SendOffersRequest(BaseModel):
    # Request body of `POST /crm/offers/send`.
    offers: list[PreparedOffer] = Field(default_factory=list)
class FailedLeadDelivery(BaseModel):
    # One rejected offer delivery: which lead, and a human-readable reason.
    lead_id: str
    reason: str
class SendOffersResponse(BaseModel):
    # Response body of `POST /crm/offers/send`: delivery counters plus the
    # detailed list of failures.
    sent_count: int
    failed_count: int
    failed: list[FailedLeadDelivery] = Field(default_factory=list)
# Text passed to FastAPI as the application description. It lists the two
# demo workflows in the order their endpoints are meant to be called.
# Surrounding newlines are stripped before use.
APP_DESCRIPTION = """
Synthetic API with multiple linear demo workflows.
Travel workflow:
1. `GET /users/recent`
2. `GET /hotels/top`
3. `POST /segments/hotel`
4. `POST /assignments/hotels`
5. `POST /emails/send-offers`
CRM workflow:
1. `GET /crm/leads/recent`
2. `POST /crm/leads/qualify`
3. `POST /crm/offers/prepare`
4. `POST /crm/offers/send`
""".strip()

# The single application object that every route below registers on.
app = FastAPI(
    title="Travel Product Manager API",
    version="1.0.0",
    description=APP_DESCRIPTION,
)
# Fixed UTC reference instant; synthetic users' `last_active` values are
# derived by subtracting offsets from it.
BASE_USERS_TS = datetime(2026, 3, 13, 10, 0, tzinfo=timezone.utc)

# Static hotel catalog. Ids are "hotel_001" .. "hotel_008", numbered in the
# order of the (name, city) rows below.
HOTEL_CATALOG: list[Hotel] = [
    Hotel(id=f"hotel_{number:03d}", name=hotel_name, city=hotel_city)
    for number, (hotel_name, hotel_city) in enumerate(
        [
            ("Hotel Aurora", "Berlin"),
            ("Sea Breeze Resort", "Lisbon"),
            ("Mountain Vista", "Zurich"),
            ("City Loft", "Amsterdam"),
            ("River Palace", "Prague"),
            ("Nordic Harbor", "Stockholm"),
            ("Sunset Bay", "Barcelona"),
            ("Alpine Crown", "Vienna"),
        ],
        start=1,
    )
]
def _build_users() -> list[User]:
    """Build the fixed list of 30 synthetic users.

    User number ``n`` (1-based) gets id ``usr_NNN``, email
    ``userNNN@example.com`` and a ``last_active`` of ``(n - 1) * 5`` minutes
    before ``BASE_USERS_TS``, so the list is ordered most recent first.
    """
    return [
        User(
            id=f"usr_{number:03d}",
            email=f"user{number:03d}@example.com",
            last_active=BASE_USERS_TS - timedelta(minutes=(number - 1) * 5),
        )
        for number in range(1, 31)
    ]
def _build_recent_leads() -> list[Lead]:
    """Build the fixed list of 20 synthetic leads.

    Lead number ``n`` (1-based) gets id ``lead_NNN``, email
    ``leadNNN@example.com`` and a source picked cyclically from the four
    known sources using ``n % 4`` (so lead 1 is "webinar").
    """
    known_sources = ("landing", "webinar", "partner", "organic")
    return [
        Lead(
            lead_id=f"lead_{number:03d}",
            email=f"lead{number:03d}@example.com",
            source=known_sources[number % len(known_sources)],
        )
        for number in range(1, 21)
    ]
@app.get(
    "/users/recent",
    response_model=RecentUsersResponse,
    operation_id="getRecentUsers",
    tags=["travel-offer-workflow"],
)
async def get_recent_users(
    last_active_after: Annotated[datetime | None, Query()] = None,
    limit: Annotated[int, Query(ge=1, le=100)] = 30,
) -> RecentUsersResponse:
    # Return the synthetic users, optionally restricted to those active
    # strictly after `last_active_after`, truncated to `limit` entries.
    #
    # last_active_after: optional cutoff; a value without a UTC offset is
    #     interpreted as UTC.
    # limit: maximum number of users returned (validated to 1..100).
    #
    # NOTE: documented with comments rather than a docstring because FastAPI
    # publishes a handler docstring as the operation description.
    users = _build_users()
    if last_active_after is not None:
        cutoff = last_active_after
        # Stored timestamps are timezone-aware. Comparing them with a naive
        # datetime raises TypeError, so a naive cutoff is pinned to UTC first.
        if cutoff.utcoffset() is None:
            cutoff = cutoff.replace(tzinfo=timezone.utc)
        users = [user for user in users if user.last_active > cutoff]
    return RecentUsersResponse(users=users[:limit])
@app.get(
    "/hotels/top",
    response_model=TopHotelsResponse,
    operation_id="getTopHotels",
    tags=["travel-offer-workflow"],
)
async def get_top_hotels(
    limit: Annotated[int, Query(ge=1, le=20)] = 5,
    city: Annotated[str | None, Query()] = None,
) -> TopHotelsResponse:
    # Return the first `limit` catalog hotels, optionally restricted to one
    # city. The city comparison ignores case and surrounding whitespace in
    # the query value.
    #
    # NOTE: documented with comments rather than a docstring because FastAPI
    # publishes a handler docstring as the operation description.
    if not city:
        # No filter requested (missing or empty string): plain catalog prefix.
        return TopHotelsResponse(hotels=HOTEL_CATALOG[:limit])

    wanted_city = city.strip().lower()
    matching = [
        entry for entry in HOTEL_CATALOG if entry.city.lower() == wanted_city
    ]
    return TopHotelsResponse(hotels=matching[:limit])
@app.post(
    "/segments/hotel",
    response_model=HotelSegmentsResponse,
    operation_id="segmentUsersByHotelPreferences",
    tags=["travel-offer-workflow"],
)
async def segment_users_by_hotel_preferences(
    payload: HotelSegmentsRequest,
) -> HotelSegmentsResponse:
    # Distribute the submitted users over the submitted hotels round-robin
    # (user i goes to hotel i % len(hotels)) and return one segment per
    # distinct hotel id that received at least one user.
    #
    # Returns an empty segment list when either input list is empty.
    #
    # NOTE: documented with comments rather than a docstring because FastAPI
    # publishes a handler docstring as the operation description.
    if not payload.users or not payload.hotels:
        return HotelSegmentsResponse(segments=[])

    # Keyed by hotel id. Repeated ids in the request collapse into one bucket,
    # and dict ordering keeps the position of each id's first occurrence.
    grouped: dict[str, list[str]] = {hotel.id: [] for hotel in payload.hotels}
    hotel_count = len(payload.hotels)
    for index, user in enumerate(payload.users):
        target = payload.hotels[index % hotel_count]
        grouped[target.id].append(user.id)

    # Build segments from the buckets, not from the raw hotel list, so a hotel
    # id repeated in the request yields a single segment instead of several
    # identical ones with the same segment_id.
    segments = [
        Segment(
            segment_id=f"seg_{hotel_id}",
            hotel_id=hotel_id,
            user_ids=user_ids,
        )
        for hotel_id, user_ids in grouped.items()
        if user_ids
    ]
    return HotelSegmentsResponse(segments=segments)
@app.post(
    "/assignments/hotels",
    response_model=AssignmentsResponse,
    operation_id="assignUsersToHotels",
    tags=["travel-offer-workflow"],
)
async def assign_users_to_hotels(payload: AssignmentsRequest) -> AssignmentsResponse:
    # Flatten segments into one (user, hotel) assignment per user id, keeping
    # segment order and, within a segment, user order.
    #
    # NOTE: documented with comments rather than a docstring because FastAPI
    # publishes a handler docstring as the operation description.
    flattened = [
        Assignment(user_id=member_id, hotel_id=group.hotel_id)
        for group in payload.segments
        for member_id in group.user_ids
    ]
    return AssignmentsResponse(assignments=flattened)
@app.post(
    "/emails/send-offers",
    response_model=EmailOfferResponse,
    status_code=200,
    operation_id="sendHotelOffersByEmail",
    tags=["travel-offer-workflow"],
)
async def send_hotel_offers_by_email(payload: EmailOfferRequest) -> EmailOfferResponse:
    # Simulate sending offers: nothing is actually delivered. An assignment
    # whose user id ends in "000" is reported as failed; every other
    # assignment counts as sent.
    #
    # NOTE: documented with comments rather than a docstring because FastAPI
    # publishes a handler docstring as the operation description.

    # The template id is read but deliberately unused.
    _ = payload.template_id

    rejected = [
        FailedDelivery(
            user_id=item.user_id,
            reason="Invalid user id for delivery",
        )
        for item in payload.assignments
        if item.user_id.endswith("000")
    ]
    rejected_total = len(rejected)
    return EmailOfferResponse(
        sent_count=len(payload.assignments) - rejected_total,
        failed_count=rejected_total,
        failed=rejected,
    )
@app.get(
    "/crm/leads/recent",
    response_model=RecentLeadsResponse,
    operation_id="getRecentLeads",
    tags=["crm-linear-workflow"],
)
async def get_recent_leads(
    limit: Annotated[int, Query(ge=1, le=50)] = 20,
    source: Annotated[str | None, Query()] = None,
) -> RecentLeadsResponse:
    # Return the first `limit` synthetic leads, optionally restricted to one
    # source. The source comparison ignores case and surrounding whitespace
    # in the query value.
    #
    # NOTE: documented with comments rather than a docstring because FastAPI
    # publishes a handler docstring as the operation description.
    candidates = _build_recent_leads()
    if not source:
        # No filter requested (missing or empty string).
        return RecentLeadsResponse(leads=candidates[:limit])

    wanted_source = source.strip().lower()
    matching = [
        item for item in candidates if item.source.lower() == wanted_source
    ]
    return RecentLeadsResponse(leads=matching[:limit])
@app.post(
    "/crm/leads/qualify",
    response_model=QualifyLeadsResponse,
    operation_id="qualifyLeadsForOffer",
    tags=["crm-linear-workflow"],
)
async def qualify_leads_for_offer(payload: QualifyLeadsRequest) -> QualifyLeadsResponse:
    # Assign each lead a deterministic score derived only from its position
    # in the request (55 + (7 * position) % 45, i.e. 55..99) and bucket it:
    # >= 80 is "high", >= 65 is "medium", anything lower is "low".
    #
    # NOTE: documented with comments rather than a docstring because FastAPI
    # publishes a handler docstring as the operation description.
    results: list[QualifiedLead] = []
    for position, candidate in enumerate(payload.leads):
        points = 55 + ((position * 7) % 45)
        if points >= 80:
            bucket = "high"
        elif points >= 65:
            bucket = "medium"
        else:
            bucket = "low"
        results.append(
            QualifiedLead(
                lead_id=candidate.lead_id,
                email=candidate.email,
                score=points,
                tier=bucket,
            )
        )
    return QualifyLeadsResponse(qualified_leads=results)
@app.post(
    "/crm/offers/prepare",
    response_model=PrepareOffersResponse,
    operation_id="prepareOffersForLeads",
    tags=["crm-linear-workflow"],
)
async def prepare_offers_for_leads(payload: PrepareOffersRequest) -> PrepareOffersResponse:
    # Build one offer per qualified lead. Leads in the "high" or "medium"
    # tier are routed to the "email" channel; every other tier goes to "push".
    #
    # NOTE: documented with comments rather than a docstring because FastAPI
    # publishes a handler docstring as the operation description.
    email_tiers = {"high", "medium"}
    prepared = [
        PreparedOffer(
            offer_id=f"offer_{entry.lead_id}",
            lead_id=entry.lead_id,
            channel="email" if entry.tier in email_tiers else "push",
            message=f"Special travel offer for {entry.tier} intent lead",
        )
        for entry in payload.qualified_leads
    ]
    return PrepareOffersResponse(offers=prepared)
@app.post(
    "/crm/offers/send",
    response_model=SendOffersResponse,
    operation_id="sendPreparedOffers",
    tags=["crm-linear-workflow"],
)
async def send_prepared_offers(payload: SendOffersRequest) -> SendOffersResponse:
    # Simulate sending prepared offers: nothing is actually delivered. An
    # offer whose lead id ends in "000" is reported as failed; every other
    # offer counts as sent.
    #
    # NOTE: documented with comments rather than a docstring because FastAPI
    # publishes a handler docstring as the operation description.
    rejected = [
        FailedLeadDelivery(
            lead_id=item.lead_id,
            reason="Invalid lead for delivery",
        )
        for item in payload.offers
        if item.lead_id.endswith("000")
    ]
    rejected_total = len(rejected)
    return SendOffersResponse(
        sent_count=len(payload.offers) - rejected_total,
        failed_count=rejected_total,
        failed=rejected,
    )
@app.get("/health")
async def health() -> dict[str, str]:
    # Health probe: unconditionally reports status "ok".
    # NOTE: documented with comments rather than a docstring because FastAPI
    # publishes a handler docstring as the operation description.
    body = {"status": "ok"}
    return body