MingaEditor.Agent.Ingest (Minga v0.1.0)

Copy Markdown View Source

Coalesces high-frequency agent stream deltas before they reach the Editor.

Agent token deltas (:text_delta, :thinking_delta, :tool_update) arrive from MingaAgent.Session at potentially hundreds of messages per second. Delivered straight into the Editor mailbox, each one runs a real buffer write and sits FIFO ahead of any key press queued behind it: head-of-line blocking that jitters keystroke latency under streaming load (epic #2220 AC 3, #2289).

Ingest sits between the session and the Editor. It subscribes to the session on the Editor's behalf via MingaAgent.Session.subscribe/2, so deltas land in its mailbox, not the Editor's. It then forwards one {:agent_stream_batch, session_pid, batch} per coalescing window instead of one message per delta. Keystrokes never pass through Ingest; they gain only a shorter mailbox ahead of them.

Debounce strategy (leading + trailing edge)

  • Leading edge (#2289 AC 5): the first delta after an idle period is forwarded immediately as a one-element batch, so time-to-first-token is unchanged. Forwarding then starts a Process.send_after/3 tick.
  • Trailing edge (#2289 AC 2): deltas that arrive while the tick is pending accumulate in state. When the tick fires, the whole accumulation forwards as a single batch and the session returns to idle. N steady-state deltas within one window therefore produce exactly one Editor message and one buffer sync.

Control-event ordering (#2289 AC 3)

Control events (status change, tool start/end, error, turn end, approvals, anything that is not a stream delta) flush the pending batch ahead of themselves, in order, then forward the control event unchanged as a normal {:agent_event, session_pid, event}. The Editor therefore never observes a turn ending before that turn's trailing text.

This is a Layer 2 process. MingaAgent.Session is unchanged.

Ingest is start_link'd from MingaEditor.init/1 and is intentionally linked to the Editor without trap_exit or its own supervision. It is an editor-integral process: it exists only to feed the Editor, holds no state the Editor cannot rebuild on the next subscribe, and its callbacks are total over arbitrary input (route/3 classifies any non-delta term as a control event, handle_info/2 ignores unknown messages, and timer/flush handling tolerates stale ticks). A crash here therefore means the Editor's own assumptions are already broken, so propagating the crash to restart both together is the correct outcome rather than silently losing the coalescer. This differs from Renderer.Server, which is a supervised sibling the Editor resolves by pid; Ingest is cheap to recreate and owned directly by the Editor on each boot.

Summary

Types

A coalesced run of deltas for one session, in arrival order.

Stream delta events that Ingest coalesces.

Per-session accumulation: pending deltas (reversed) and the tick ref.

Functions

Returns a specification to start this module under a supervisor.

Starts the ingest process.

Subscribes Ingest to session_pid so the session's events flow through here.

Types

batch()

@type batch() :: [delta()]

A coalesced run of deltas for one session, in arrival order.

delta()

@type delta() ::
  {:text_delta, term()}
  | {:thinking_delta, term()}
  | {:tool_update, term(), term(), term()}

Stream delta events that Ingest coalesces.

session_state()

@type session_state() :: %{pending: [delta()], timer: reference() | nil}

Per-session accumulation: pending deltas (reversed) and the tick ref.

state()

@type state() :: %{
  editor: pid(),
  window_ms: non_neg_integer(),
  sessions: %{optional(pid()) => session_state()}
}

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

start_link(opts \\ [])

@spec start_link(keyword()) :: GenServer.on_start()

Starts the ingest process.

:editor is the pid that batches and control events are forwarded to and defaults to the calling process. :window_ms overrides the coalescing tick (tests pass 0 so the tick fires on the next message turn deterministically).

subscribe_session(server, session_pid, opts \\ [])

@spec subscribe_session(GenServer.server(), pid(), keyword()) ::
  :ok | {:error, term()}

Subscribes Ingest to session_pid so the session's events flow through here.

Runs the MingaAgent.Session.subscribe/2 call inside the Ingest process so Ingest (not the caller) becomes the session subscriber. Returns the result of the subscribe call.