From 5c039c8e564c419d063e6eb47eef4d678f4c236d Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 26 Apr 2026 15:15:14 -0400 Subject: [PATCH] fix: classifier timeout + Featherless concurrency cap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related issues blocking real-world use of the kickoff parse: 1. Classifier calls take ~12s end-to-end on Featherless for the complex KickoffParse schema (Hermes-3-8B generating ~1.3KB of structured JSON). The 10s timeout was firing on most attempts, causing all 3 retries to time out and the empty-fallback to render with blank form values. Bumping the default classifier_timeout_s 10 → 30s gives generous headroom; measured p99 is ~13s, so 30s is comfortable. 2. Featherless caps concurrent connections per account (2 on free / lower paid tiers). Each turn flow can fire 4–5 calls (parse, scene-close detect, narrative stream, two state-update passes) plus the background significance worker. Without a gate, we'd exceed the cap and fail. Added a class-level ``asyncio.Semaphore`` to FeatherlessClient, shared across all instances, configured once in lifespan from ``Settings.featherless_max_concurrent`` (default 2). Both ``generate`` and ``stream`` acquire the semaphore for the duration of the call; the stream holds it until the async generator completes, so token streaming is correctly accounted for. Verified live: 4/4 sequential kickoff parses for the same bot all succeed with real parsed values (previously ~50% blank-fallback). Full suite: 168 passed. --- chat/app.py | 9 +++++-- chat/config.py | 5 +++- chat/llm/featherless.py | 58 +++++++++++++++++++++++++++++------------ 3 files changed, 53 insertions(+), 19 deletions(-) diff --git a/chat/app.py b/chat/app.py index 3e96347..c9daf90 100644 --- a/chat/app.py +++ b/chat/app.py @@ -65,12 +65,17 @@ async def lifespan(app: FastAPI): app.state.settings = settings + # Cap concurrent Featherless connections to the account's limit + # (free / lower paid tiers cap at 2). Shared across all + # FeatherlessClient instances in the process. + from chat.llm.featherless import FeatherlessClient + + FeatherlessClient.configure_concurrency(settings.featherless_max_concurrent) + # Background worker for the async significance pass (T22). Each job # constructs a fresh FeatherlessClient via the factory; tests can # disable enqueue by toggling ``app.state.background_worker.enabled``. def _factory(): - from chat.llm.featherless import FeatherlessClient - return FeatherlessClient( api_key=settings.featherless_api_key, base_url=settings.featherless_base_url, diff --git a/chat/config.py b/chat/config.py index b2aed74..935cb21 100644 --- a/chat/config.py +++ b/chat/config.py @@ -24,7 +24,10 @@ class Settings(BaseModel): narrative_budget_hard: int = 8000 narrative_budget_soft: int = 6000 classifier_budget_hard: int = 4000 - classifier_timeout_s: float = 10.0 + classifier_timeout_s: float = 30.0 + # Featherless free tier and lower paid tiers cap concurrent connections. + # Set this to your account's max-concurrent-connections limit. + featherless_max_concurrent: int = 2 db_path: Path = DEFAULT_DB data_dir: Path = REPO_ROOT / "data" bind_host: str = "127.0.0.1" diff --git a/chat/llm/featherless.py b/chat/llm/featherless.py index 3e1fbcf..cf1138b 100644 --- a/chat/llm/featherless.py +++ b/chat/llm/featherless.py @@ -1,29 +1,55 @@ from __future__ import annotations +import asyncio from typing import AsyncIterator, Sequence from openai import AsyncOpenAI from .client import Message class FeatherlessClient: + """Client for Featherless's OpenAI-compatible API. + + Featherless caps concurrent connections per account (2 on free / lower + paid tiers). A class-level semaphore gates every ``generate`` and + ``stream`` call so the orchestrator never exceeds the configured cap, + regardless of how many ``FeatherlessClient`` instances are alive. + + Configure once at app startup via :meth:`configure_concurrency`. The + default is 2. + """ + + _semaphore: asyncio.Semaphore | None = None + + @classmethod + def configure_concurrency(cls, max_concurrent: int) -> None: + cls._semaphore = asyncio.Semaphore(max(1, int(max_concurrent))) + + @classmethod + def _sem(cls) -> asyncio.Semaphore: + if cls._semaphore is None: + cls._semaphore = asyncio.Semaphore(2) + return cls._semaphore + def __init__(self, api_key: str, base_url: str = "https://api.featherless.ai/v1"): self._client = AsyncOpenAI(api_key=api_key, base_url=base_url) async def generate(self, messages: Sequence[Message], *, model: str, **params) -> str: - resp = await self._client.chat.completions.create( - model=model, - messages=[{"role": m.role, "content": m.content} for m in messages], - **params, - ) - return resp.choices[0].message.content or "" + async with self._sem(): + resp = await self._client.chat.completions.create( + model=model, + messages=[{"role": m.role, "content": m.content} for m in messages], + **params, + ) + return resp.choices[0].message.content or "" async def stream(self, messages: Sequence[Message], *, model: str, **params) -> AsyncIterator[str]: - stream = await self._client.chat.completions.create( - model=model, - messages=[{"role": m.role, "content": m.content} for m in messages], - stream=True, - **params, - ) - async for chunk in stream: - delta = chunk.choices[0].delta.content or "" - if delta: - yield delta + async with self._sem(): + stream = await self._client.chat.completions.create( + model=model, + messages=[{"role": m.role, "content": m.content} for m in messages], + stream=True, + **params, + ) + async for chunk in stream: + delta = chunk.choices[0].delta.content or "" + if delta: + yield delta