fix: classifier timeout + Featherless concurrency cap

Two related issues blocking real-world use of the kickoff parse:

1. Classifier calls take ~12s end-to-end on Featherless for the
   complex KickoffParse schema (Hermes-3-8B generating ~1.3KB of
   structured JSON). The 10s timeout was firing on most attempts,
   causing all 3 retries to time out and the empty-fallback to render
   with blank form values. Bumping the default
   classifier_timeout_s 10 → 30s gives generous headroom; measured
   p99 is ~13s, so 30s is comfortable.

2. Featherless caps concurrent connections per account (2 on free /
   lower paid tiers). Each turn flow can fire 4–5 calls (parse,
   scene-close detect, narrative stream, two state-update passes)
   plus the background significance worker. Without a gate, we'd
   exceed the cap and fail.

   Added a class-level ``asyncio.Semaphore`` to FeatherlessClient,
   shared across all instances, configured once in lifespan from
   ``Settings.featherless_max_concurrent`` (default 2). Both
   ``generate`` and ``stream`` acquire the semaphore for the duration
   of the call; the stream holds it until the async generator
   completes, so token streaming is correctly accounted for.

Verified live: 4/4 sequential kickoff parses for the same bot all
succeed with real parsed values (previously ~50% blank-fallback).
Full suite: 168 passed.
This commit is contained in:
Joseph Doherty
2026-04-26 15:15:14 -04:00
parent 5aab98e4d7
commit 5c039c8e56
3 changed files with 53 additions and 19 deletions
+7 -2
View File
@@ -65,12 +65,17 @@ async def lifespan(app: FastAPI):
app.state.settings = settings app.state.settings = settings
# Cap concurrent Featherless connections to the account's limit
# (free / lower paid tiers cap at 2). Shared across all
# FeatherlessClient instances in the process.
from chat.llm.featherless import FeatherlessClient
FeatherlessClient.configure_concurrency(settings.featherless_max_concurrent)
# Background worker for the async significance pass (T22). Each job # Background worker for the async significance pass (T22). Each job
# constructs a fresh FeatherlessClient via the factory; tests can # constructs a fresh FeatherlessClient via the factory; tests can
# disable enqueue by toggling ``app.state.background_worker.enabled``. # disable enqueue by toggling ``app.state.background_worker.enabled``.
def _factory(): def _factory():
from chat.llm.featherless import FeatherlessClient
return FeatherlessClient( return FeatherlessClient(
api_key=settings.featherless_api_key, api_key=settings.featherless_api_key,
base_url=settings.featherless_base_url, base_url=settings.featherless_base_url,
+4 -1
View File
@@ -24,7 +24,10 @@ class Settings(BaseModel):
narrative_budget_hard: int = 8000 narrative_budget_hard: int = 8000
narrative_budget_soft: int = 6000 narrative_budget_soft: int = 6000
classifier_budget_hard: int = 4000 classifier_budget_hard: int = 4000
classifier_timeout_s: float = 10.0 classifier_timeout_s: float = 30.0
# Featherless free tier and lower paid tiers cap concurrent connections.
# Set this to your account's max-concurrent-connections limit.
featherless_max_concurrent: int = 2
db_path: Path = DEFAULT_DB db_path: Path = DEFAULT_DB
data_dir: Path = REPO_ROOT / "data" data_dir: Path = REPO_ROOT / "data"
bind_host: str = "127.0.0.1" bind_host: str = "127.0.0.1"
+42 -16
View File
@@ -1,29 +1,55 @@
from __future__ import annotations from __future__ import annotations
import asyncio
from typing import AsyncIterator, Sequence from typing import AsyncIterator, Sequence
from openai import AsyncOpenAI from openai import AsyncOpenAI
from .client import Message from .client import Message
class FeatherlessClient: class FeatherlessClient:
"""Client for Featherless's OpenAI-compatible API.
Featherless caps concurrent connections per account (2 on free / lower
paid tiers). A class-level semaphore gates every ``generate`` and
``stream`` call so the orchestrator never exceeds the configured cap,
regardless of how many ``FeatherlessClient`` instances are alive.
Configure once at app startup via :meth:`configure_concurrency`. The
default is 2.
"""
_semaphore: asyncio.Semaphore | None = None
@classmethod
def configure_concurrency(cls, max_concurrent: int) -> None:
cls._semaphore = asyncio.Semaphore(max(1, int(max_concurrent)))
@classmethod
def _sem(cls) -> asyncio.Semaphore:
if cls._semaphore is None:
cls._semaphore = asyncio.Semaphore(2)
return cls._semaphore
def __init__(self, api_key: str, base_url: str = "https://api.featherless.ai/v1"): def __init__(self, api_key: str, base_url: str = "https://api.featherless.ai/v1"):
self._client = AsyncOpenAI(api_key=api_key, base_url=base_url) self._client = AsyncOpenAI(api_key=api_key, base_url=base_url)
async def generate(self, messages: Sequence[Message], *, model: str, **params) -> str: async def generate(self, messages: Sequence[Message], *, model: str, **params) -> str:
resp = await self._client.chat.completions.create( async with self._sem():
model=model, resp = await self._client.chat.completions.create(
messages=[{"role": m.role, "content": m.content} for m in messages], model=model,
**params, messages=[{"role": m.role, "content": m.content} for m in messages],
) **params,
return resp.choices[0].message.content or "" )
return resp.choices[0].message.content or ""
async def stream(self, messages: Sequence[Message], *, model: str, **params) -> AsyncIterator[str]: async def stream(self, messages: Sequence[Message], *, model: str, **params) -> AsyncIterator[str]:
stream = await self._client.chat.completions.create( async with self._sem():
model=model, stream = await self._client.chat.completions.create(
messages=[{"role": m.role, "content": m.content} for m in messages], model=model,
stream=True, messages=[{"role": m.role, "content": m.content} for m in messages],
**params, stream=True,
) **params,
async for chunk in stream: )
delta = chunk.choices[0].delta.content or "" async for chunk in stream:
if delta: delta = chunk.choices[0].delta.content or ""
yield delta if delta:
yield delta