391 lines
10 KiB
Python
391 lines
10 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Annotated
|
|
|
|
from fastapi import FastAPI, Query
|
|
from pydantic import BaseModel, Field
|
|
|
|
|
|
class User(BaseModel):
|
|
id: str
|
|
email: str
|
|
last_active: datetime
|
|
|
|
|
|
class Hotel(BaseModel):
|
|
id: str
|
|
name: str
|
|
city: str
|
|
|
|
|
|
class Segment(BaseModel):
|
|
segment_id: str
|
|
hotel_id: str
|
|
user_ids: list[str] = Field(default_factory=list)
|
|
|
|
|
|
class Assignment(BaseModel):
|
|
user_id: str
|
|
hotel_id: str
|
|
|
|
|
|
class RecentUsersResponse(BaseModel):
|
|
users: list[User] = Field(default_factory=list)
|
|
|
|
|
|
class TopHotelsResponse(BaseModel):
|
|
hotels: list[Hotel] = Field(default_factory=list)
|
|
|
|
|
|
class HotelSegmentsRequest(BaseModel):
|
|
users: list[User] = Field(default_factory=list)
|
|
hotels: list[Hotel] = Field(default_factory=list)
|
|
|
|
|
|
class HotelSegmentsResponse(BaseModel):
|
|
segments: list[Segment] = Field(default_factory=list)
|
|
|
|
|
|
class AssignmentsRequest(BaseModel):
|
|
segments: list[Segment] = Field(default_factory=list)
|
|
|
|
|
|
class AssignmentsResponse(BaseModel):
|
|
assignments: list[Assignment] = Field(default_factory=list)
|
|
|
|
|
|
class EmailOfferRequest(BaseModel):
|
|
template_id: str = "offer_template_2026"
|
|
assignments: list[Assignment] = Field(default_factory=list)
|
|
|
|
|
|
class FailedDelivery(BaseModel):
|
|
user_id: str
|
|
reason: str
|
|
|
|
|
|
class EmailOfferResponse(BaseModel):
|
|
sent_count: int
|
|
failed_count: int
|
|
failed: list[FailedDelivery] = Field(default_factory=list)
|
|
|
|
|
|
class Lead(BaseModel):
|
|
lead_id: str
|
|
email: str
|
|
source: str
|
|
|
|
|
|
class QualifiedLead(BaseModel):
|
|
lead_id: str
|
|
email: str
|
|
score: int
|
|
tier: str
|
|
|
|
|
|
class PreparedOffer(BaseModel):
|
|
offer_id: str
|
|
lead_id: str
|
|
channel: str
|
|
message: str
|
|
|
|
|
|
class RecentLeadsResponse(BaseModel):
|
|
leads: list[Lead] = Field(default_factory=list)
|
|
|
|
|
|
class QualifyLeadsRequest(BaseModel):
|
|
leads: list[Lead] = Field(default_factory=list)
|
|
|
|
|
|
class QualifyLeadsResponse(BaseModel):
|
|
qualified_leads: list[QualifiedLead] = Field(default_factory=list)
|
|
|
|
|
|
class PrepareOffersRequest(BaseModel):
|
|
qualified_leads: list[QualifiedLead] = Field(default_factory=list)
|
|
|
|
|
|
class PrepareOffersResponse(BaseModel):
|
|
offers: list[PreparedOffer] = Field(default_factory=list)
|
|
|
|
|
|
class SendOffersRequest(BaseModel):
|
|
offers: list[PreparedOffer] = Field(default_factory=list)
|
|
|
|
|
|
class FailedLeadDelivery(BaseModel):
|
|
lead_id: str
|
|
reason: str
|
|
|
|
|
|
class SendOffersResponse(BaseModel):
|
|
sent_count: int
|
|
failed_count: int
|
|
failed: list[FailedLeadDelivery] = Field(default_factory=list)
|
|
|
|
|
|
APP_DESCRIPTION = """
|
|
Synthetic API with multiple linear demo workflows.
|
|
|
|
Travel workflow:
|
|
1. `GET /users/recent`
|
|
2. `GET /hotels/top`
|
|
3. `POST /segments/hotel`
|
|
4. `POST /assignments/hotels`
|
|
5. `POST /emails/send-offers`
|
|
|
|
CRM workflow:
|
|
1. `GET /crm/leads/recent`
|
|
2. `POST /crm/leads/qualify`
|
|
3. `POST /crm/offers/prepare`
|
|
4. `POST /crm/offers/send`
|
|
""".strip()
|
|
|
|
|
|
app = FastAPI(
|
|
title="Travel Product Manager API",
|
|
version="1.0.0",
|
|
description=APP_DESCRIPTION,
|
|
)
|
|
|
|
|
|
BASE_USERS_TS = datetime(2026, 3, 13, 10, 0, tzinfo=timezone.utc)
|
|
HOTEL_CATALOG: list[Hotel] = [
|
|
Hotel(id="hotel_001", name="Hotel Aurora", city="Berlin"),
|
|
Hotel(id="hotel_002", name="Sea Breeze Resort", city="Lisbon"),
|
|
Hotel(id="hotel_003", name="Mountain Vista", city="Zurich"),
|
|
Hotel(id="hotel_004", name="City Loft", city="Amsterdam"),
|
|
Hotel(id="hotel_005", name="River Palace", city="Prague"),
|
|
Hotel(id="hotel_006", name="Nordic Harbor", city="Stockholm"),
|
|
Hotel(id="hotel_007", name="Sunset Bay", city="Barcelona"),
|
|
Hotel(id="hotel_008", name="Alpine Crown", city="Vienna"),
|
|
]
|
|
|
|
|
|
def _build_users() -> list[User]:
|
|
users: list[User] = []
|
|
for idx in range(1, 31):
|
|
users.append(
|
|
User(
|
|
id=f"usr_{idx:03d}",
|
|
email=f"user{idx:03d}@example.com",
|
|
last_active=BASE_USERS_TS - timedelta(minutes=(idx - 1) * 5),
|
|
)
|
|
)
|
|
return users
|
|
|
|
|
|
def _build_recent_leads() -> list[Lead]:
|
|
leads: list[Lead] = []
|
|
sources = ["landing", "webinar", "partner", "organic"]
|
|
for idx in range(1, 21):
|
|
leads.append(
|
|
Lead(
|
|
lead_id=f"lead_{idx:03d}",
|
|
email=f"lead{idx:03d}@example.com",
|
|
source=sources[idx % len(sources)],
|
|
)
|
|
)
|
|
return leads
|
|
|
|
|
|
@app.get(
|
|
"/users/recent",
|
|
response_model=RecentUsersResponse,
|
|
operation_id="getRecentUsers",
|
|
tags=["travel-offer-workflow"],
|
|
)
|
|
async def get_recent_users(
|
|
last_active_after: Annotated[datetime | None, Query()] = None,
|
|
limit: Annotated[int, Query(ge=1, le=100)] = 30,
|
|
) -> RecentUsersResponse:
|
|
users = _build_users()
|
|
if last_active_after is not None:
|
|
users = [user for user in users if user.last_active > last_active_after]
|
|
return RecentUsersResponse(users=users[:limit])
|
|
|
|
|
|
@app.get(
|
|
"/hotels/top",
|
|
response_model=TopHotelsResponse,
|
|
operation_id="getTopHotels",
|
|
tags=["travel-offer-workflow"],
|
|
)
|
|
async def get_top_hotels(
|
|
limit: Annotated[int, Query(ge=1, le=20)] = 5,
|
|
city: Annotated[str | None, Query()] = None,
|
|
) -> TopHotelsResponse:
|
|
hotels = HOTEL_CATALOG
|
|
if city:
|
|
city_normalized = city.strip().lower()
|
|
hotels = [hotel for hotel in hotels if hotel.city.lower() == city_normalized]
|
|
return TopHotelsResponse(hotels=hotels[:limit])
|
|
|
|
|
|
@app.post(
|
|
"/segments/hotel",
|
|
response_model=HotelSegmentsResponse,
|
|
operation_id="segmentUsersByHotelPreferences",
|
|
tags=["travel-offer-workflow"],
|
|
)
|
|
async def segment_users_by_hotel_preferences(
|
|
payload: HotelSegmentsRequest,
|
|
) -> HotelSegmentsResponse:
|
|
if not payload.users or not payload.hotels:
|
|
return HotelSegmentsResponse(segments=[])
|
|
|
|
grouped: dict[str, list[str]] = {hotel.id: [] for hotel in payload.hotels}
|
|
for index, user in enumerate(payload.users):
|
|
hotel = payload.hotels[index % len(payload.hotels)]
|
|
grouped[hotel.id].append(user.id)
|
|
|
|
segments: list[Segment] = []
|
|
for hotel in payload.hotels:
|
|
user_ids = grouped.get(hotel.id, [])
|
|
if not user_ids:
|
|
continue
|
|
segments.append(
|
|
Segment(
|
|
segment_id=f"seg_{hotel.id}",
|
|
hotel_id=hotel.id,
|
|
user_ids=user_ids,
|
|
)
|
|
)
|
|
|
|
return HotelSegmentsResponse(segments=segments)
|
|
|
|
|
|
@app.post(
|
|
"/assignments/hotels",
|
|
response_model=AssignmentsResponse,
|
|
operation_id="assignUsersToHotels",
|
|
tags=["travel-offer-workflow"],
|
|
)
|
|
async def assign_users_to_hotels(payload: AssignmentsRequest) -> AssignmentsResponse:
|
|
assignments: list[Assignment] = []
|
|
for segment in payload.segments:
|
|
for user_id in segment.user_ids:
|
|
assignments.append(Assignment(user_id=user_id, hotel_id=segment.hotel_id))
|
|
return AssignmentsResponse(assignments=assignments)
|
|
|
|
|
|
@app.post(
|
|
"/emails/send-offers",
|
|
response_model=EmailOfferResponse,
|
|
status_code=200,
|
|
operation_id="sendHotelOffersByEmail",
|
|
tags=["travel-offer-workflow"],
|
|
|
|
)
|
|
async def send_hotel_offers_by_email(payload: EmailOfferRequest) -> EmailOfferResponse:
|
|
_ = payload.template_id
|
|
|
|
failed: list[FailedDelivery] = []
|
|
for assignment in payload.assignments:
|
|
if assignment.user_id.endswith("000"):
|
|
failed.append(
|
|
FailedDelivery(
|
|
user_id=assignment.user_id,
|
|
reason="Invalid user id for delivery",
|
|
)
|
|
)
|
|
|
|
sent_count = len(payload.assignments) - len(failed)
|
|
return EmailOfferResponse(
|
|
sent_count=sent_count,
|
|
failed_count=len(failed),
|
|
failed=failed,
|
|
)
|
|
|
|
|
|
@app.get(
|
|
"/crm/leads/recent",
|
|
response_model=RecentLeadsResponse,
|
|
operation_id="getRecentLeads",
|
|
tags=["crm-linear-workflow"],
|
|
)
|
|
async def get_recent_leads(
|
|
limit: Annotated[int, Query(ge=1, le=50)] = 20,
|
|
source: Annotated[str | None, Query()] = None,
|
|
) -> RecentLeadsResponse:
|
|
leads = _build_recent_leads()
|
|
if source:
|
|
source_normalized = source.strip().lower()
|
|
leads = [lead for lead in leads if lead.source.lower() == source_normalized]
|
|
return RecentLeadsResponse(leads=leads[:limit])
|
|
|
|
|
|
@app.post(
|
|
"/crm/leads/qualify",
|
|
response_model=QualifyLeadsResponse,
|
|
operation_id="qualifyLeadsForOffer",
|
|
tags=["crm-linear-workflow"],
|
|
)
|
|
async def qualify_leads_for_offer(payload: QualifyLeadsRequest) -> QualifyLeadsResponse:
|
|
qualified: list[QualifiedLead] = []
|
|
for index, lead in enumerate(payload.leads):
|
|
score = 55 + ((index * 7) % 45)
|
|
tier = "high" if score >= 80 else "medium" if score >= 65 else "low"
|
|
qualified.append(
|
|
QualifiedLead(
|
|
lead_id=lead.lead_id,
|
|
email=lead.email,
|
|
score=score,
|
|
tier=tier,
|
|
)
|
|
)
|
|
return QualifyLeadsResponse(qualified_leads=qualified)
|
|
|
|
|
|
@app.post(
|
|
"/crm/offers/prepare",
|
|
response_model=PrepareOffersResponse,
|
|
operation_id="prepareOffersForLeads",
|
|
tags=["crm-linear-workflow"],
|
|
)
|
|
async def prepare_offers_for_leads(payload: PrepareOffersRequest) -> PrepareOffersResponse:
|
|
offers: list[PreparedOffer] = []
|
|
for lead in payload.qualified_leads:
|
|
channel = "email" if lead.tier in {"high", "medium"} else "push"
|
|
offers.append(
|
|
PreparedOffer(
|
|
offer_id=f"offer_{lead.lead_id}",
|
|
lead_id=lead.lead_id,
|
|
channel=channel,
|
|
message=f"Special travel offer for {lead.tier} intent lead",
|
|
)
|
|
)
|
|
return PrepareOffersResponse(offers=offers)
|
|
|
|
|
|
@app.post(
|
|
"/crm/offers/send",
|
|
response_model=SendOffersResponse,
|
|
operation_id="sendPreparedOffers",
|
|
tags=["crm-linear-workflow"],
|
|
)
|
|
async def send_prepared_offers(payload: SendOffersRequest) -> SendOffersResponse:
|
|
failed: list[FailedLeadDelivery] = []
|
|
for offer in payload.offers:
|
|
if offer.lead_id.endswith("000"):
|
|
failed.append(
|
|
FailedLeadDelivery(
|
|
lead_id=offer.lead_id,
|
|
reason="Invalid lead for delivery",
|
|
)
|
|
)
|
|
|
|
sent_count = len(payload.offers) - len(failed)
|
|
return SendOffersResponse(
|
|
sent_count=sent_count,
|
|
failed_count=len(failed),
|
|
failed=failed,
|
|
)
|
|
|
|
|
|
@app.get("/health")
|
|
async def health() -> dict[str, str]:
|
|
return {"status": "ok"}
|