Coalesces high-frequency agent stream deltas before they reach the Editor.
Agent token deltas (:text_delta, :thinking_delta, :tool_update) arrive
from MingaAgent.Session at potentially hundreds of messages per second.
If delivered straight into the Editor mailbox, each delta triggers a real
buffer write and is processed (FIFO) ahead of any key press queued behind it.
The result is head-of-line blocking that adds jitter to keystroke latency
under streaming load (epic #2220 AC 3, #2289).
Ingest sits between the session and the Editor. It subscribes to the session
on the Editor's behalf via MingaAgent.Session.subscribe/2, so deltas land in
its mailbox, not the Editor's. It then forwards one
{:agent_stream_batch, session_pid, batch} per coalescing window instead of
one message per delta. Keystrokes never pass through Ingest; they gain only a
shorter mailbox ahead of them.
Debounce strategy (leading + trailing edge)
- Leading edge (#2289 AC 5): the first delta after an idle period is
forwarded immediately as a one-element batch, so time-to-first-token is
unchanged. Forwarding then starts a Process.send_after/3 tick.
- Trailing edge (#2289 AC 2): deltas that arrive while the tick is pending
accumulate in state. When the tick fires, the whole accumulation forwards as
a single batch and the session returns to idle. N steady-state deltas within
one window therefore produce exactly one Editor message and one buffer sync.
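The two edges above can be sketched as a small GenServer. This is an illustrative reduction, not Ingest's actual code: the module name CoalesceSketch, the :sink option, the {:delta, _}, {:batch, _} and {:tick, _} message shapes, the 16 ms default window, and the choice to send nothing when a tick fires with no pending deltas are all assumptions, and it tracks a single stream rather than one accumulation per session.

```elixir
defmodule CoalesceSketch do
  # Hypothetical stand-in for the debounce logic, reduced to one stream.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    {:ok,
     %{
       sink: Keyword.fetch!(opts, :sink),
       # Assumed default; the real window length is not stated in the docs.
       window_ms: Keyword.get(opts, :window_ms, 16),
       # Pending deltas are kept newest-first and reversed on flush.
       pending: [],
       tick: nil
     }}
  end

  @impl true
  # Leading edge: no tick is pending, so forward at once and arm the tick.
  def handle_info({:delta, delta}, %{tick: nil} = state) do
    send(state.sink, {:batch, [delta]})
    ref = make_ref()
    Process.send_after(self(), {:tick, ref}, state.window_ms)
    {:noreply, %{state | tick: ref}}
  end

  # A tick is pending: accumulate instead of forwarding.
  def handle_info({:delta, delta}, state) do
    {:noreply, %{state | pending: [delta | state.pending]}}
  end

  # Trailing edge: the current tick fired, so flush in arrival order
  # and return to idle.
  def handle_info({:tick, ref}, %{tick: ref} = state) do
    if state.pending != [], do: send(state.sink, {:batch, Enum.reverse(state.pending)})
    {:noreply, %{state | pending: [], tick: nil}}
  end

  # Stale ticks and unknown messages are ignored.
  def handle_info(_other, state), do: {:noreply, state}
end
```

Tagging each tick with a fresh reference is one way to make stale ticks harmless: a tick whose reference no longer matches the state falls through to the catch-all clause.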
Control-event ordering (#2289 AC 3)
Control events (status change, tool start/end, error, turn end, approvals,
anything that is not a stream delta) flush the pending batch ahead of
themselves, in order, then forward the control event unchanged as a normal
{:agent_event, session_pid, event}. The Editor therefore never observes a
turn ending before that turn's trailing text.
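The ordering rule can be illustrated with a pure function that computes what would be forwarded, and in what order, when a control event arrives. FlushOrderSketch and on_control/3 are hypothetical names for illustration only; the sketch takes the pending deltas newest-first (as the session state description suggests) and leaves out cancelling the pending tick, which a real implementation would also need to handle.

```elixir
defmodule FlushOrderSketch do
  # Returns {messages_to_forward_in_order, new_pending}.
  # `pending_reversed` holds accumulated deltas, newest first.
  def on_control(session_pid, event, pending_reversed) do
    control = {:agent_event, session_pid, event}

    case pending_reversed do
      # Nothing buffered: only the control event is forwarded.
      [] ->
        {[control], []}

      # Buffered deltas go out first, in arrival order, then the event.
      deltas ->
        batch = {:agent_stream_batch, session_pid, Enum.reverse(deltas)}
        {[batch, control], []}
    end
  end
end
```

Because the batch always precedes the control event in the returned list, a receiver that processes messages in order sees the trailing text before the event that ends the turn.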
This is a Layer 2 process. MingaAgent.Session is unchanged.
Link policy
Ingest is start_link'd from MingaEditor.init/1 and is intentionally linked
to the Editor without trap_exit or its own supervision. It is an
editor-integral process: it exists only to feed the Editor, holds no state the
Editor cannot rebuild on the next subscribe, and its callbacks are total over
arbitrary input (route/3 classifies any non-delta term as a control event,
handle_info/2 ignores unknown messages, and timer/flush handling tolerates
stale ticks). A crash here therefore means the Editor's own assumptions are
already broken, so propagating the crash to restart both together is the
correct outcome rather than silently losing the coalescer. This differs from
Renderer.Server, which is a supervised sibling the Editor resolves by pid;
Ingest is cheap to recreate and owned directly by the Editor on each boot.
Types
@type batch() :: [delta()]
A coalesced run of deltas for one session, in arrival order.
@type delta() :: {:text_delta, term()} | {:thinking_delta, term()} | {:tool_update, term(), term(), term()}
Stream delta events that Ingest coalesces.
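A minimal sketch of how a term could be classified against the delta() shapes above. DeltaSketch and delta?/1 are hypothetical names, not part of this module; the clauses simply mirror the three tuple shapes in the type, and every other term is treated as a control event.

```elixir
defmodule DeltaSketch do
  # True only for the three tuple shapes listed in delta().
  def delta?({:text_delta, _}), do: true
  def delta?({:thinking_delta, _}), do: true
  def delta?({:tool_update, _, _, _}), do: true
  # Any other term counts as a control event.
  def delta?(_other), do: false
end
```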
Per-session accumulation: pending deltas (reversed) and the tick ref.
@type state() :: %{
        editor: pid(),
        window_ms: non_neg_integer(),
        sessions: %{optional(pid()) => session_state()}
      }
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec start_link(keyword()) :: GenServer.on_start()
Starts the ingest process.
:editor is the pid that batches and control events are forwarded to and
defaults to the calling process. :window_ms overrides the coalescing tick
(tests pass 0 so the tick fires on the next message turn deterministically).
@spec subscribe_session(GenServer.server(), pid(), keyword()) :: :ok | {:error, term()}
Subscribes Ingest to session_pid so the session's events flow through here.
Runs the MingaAgent.Session.subscribe/2 call inside the Ingest process so
Ingest (not the caller) becomes the session subscriber. Returns the result of
the subscribe call.
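The reason for running the subscribe call inside the process can be shown with a generic GenServer sketch. SubscribeInsideSketch is a hypothetical module, and the zero-arity function stands in for the real MingaAgent.Session.subscribe/2 call, whose arguments are not shown here. Anything executed in handle_call/3 runs in the server process, so self() there is the server's pid rather than the caller's.

```elixir
defmodule SubscribeInsideSketch do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, nil)

  # The caller hands over the work; the server process executes it.
  def subscribe(server, subscribe_fun) do
    GenServer.call(server, {:subscribe, subscribe_fun})
  end

  @impl true
  def init(nil), do: {:ok, nil}

  @impl true
  def handle_call({:subscribe, subscribe_fun}, _from, state) do
    # Runs in the server process, so any self() inside the function
    # refers to this server. The result is returned to the caller as-is.
    {:reply, subscribe_fun.(), state}
  end
end
```

For example, SubscribeInsideSketch.subscribe(pid, fn -> self() end) returns pid itself, which is the behaviour that lets the server, not the caller, become the subscriber.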