Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.vertz.dev/llms.txt

Use this file to discover all available pages before exploring further.

query() consumes streaming endpoints the same way it consumes REST endpoints — pass an SDK call that returns a StreamDescriptor<T>, and the data accumulates in a reactive array. For ad-hoc / hand-rolled iterables (a WebSocket, an EventSource, an agent.stream()), pass a function thunk with an explicit key.

Two ways to consume a stream

@vertz/openapi emits stream endpoints as StreamDescriptors. The call site is identical to a REST endpoint:
import { query } from '@vertz/ui';
import { sdk } from './generated';

export function DeployFeed() {
  const events = query(sdk.events.stream({ topic: 'deploys' }));

  return (
    <div>
      {events.loading && <Spinner />}
      {events.error && <ErrorBanner error={events.error} />}
      {events.data.map((e) => (
        <Event key={e.id} event={e} />
      ))}
    </div>
  );
}
The descriptor carries its own _key (derived from method + path + args, identical to REST descriptors), so cache identity and dedup work the same way they do for query(api.tasks.list({ page: 1 })). No manual key, no wrapper thunk. Note: invalidate(descriptor) operates on entity-backed REST queries today and is not yet supported for stream descriptors. To restart a stream manually, call events.refetch() on the result.

2. Function thunks (escape hatch for ad-hoc iterables)

When the source isn’t a generated SDK method — e.g., a raw WebSocket, a hand-rolled async generator, an SDK from outside @vertz/openapi — use the function-thunk overload with an explicit key:
import { query, fromWebSocket } from '@vertz/ui';

export function TickerWidget() {
  const ticks = query<TickEvent>(
    (signal) => fromWebSocket<TickEvent>('wss://stream.example/ticks', signal),
    { key: 'ticks' },
  );
  // ...
}
Use thunks when the iterable’s identity can’t be expressed as a method:path?args string. Use descriptors otherwise.

When to reach for it

  • Agent run output. agent.stream(sessionId) yields AgentEvents (assistant tokens, tool calls, tool results) as they arrive. Render them progressively without manual subscribe / unsubscribe wiring.
  • Chat transcripts. Each message is a yield; the array IS the transcript.
  • Live dashboards. Stock ticks, sensor readings, deploy events.
  • Log tails / SSE streams. Server-sent events become an iterable with one line.
If your data is a single snapshot that you re-fetch periodically, stick with the promise overload (optionally with refetchInterval).

The shape of the result

A stream-backed query() returns a QueryStreamResult<T> — slightly different from the promise overload’s QueryResult<T>:
interface QueryStreamResult<T> {
  data: T[]; // ← always-array, never undefined
  loading: boolean; // true until the first yield
  reconnecting: boolean; // true between a refetch / restart and the next yield
  error: unknown; // last error from the iterator (iteration halts)
  idle: boolean; // true only when the thunk has not yet run
  refetch: () => void; // cancel + reset data + start a new iterator
  revalidate: () => void; // alias for refetch
  dispose: () => void; // cancel + clean up
}
The most important difference: data is never undefined. Render it directly:
// Always safe — empty array renders nothing
{
  messages.data.map((m) => <Message key={m.id} message={m} />);
}
No if (data) { ... } guard needed. While the stream is connecting, loading is true and data is [].

Lifecycle

The query owns the iterator’s lifecycle:
  • dispose() (or auto-cleanup on component unmount) calls signal.abort() and iterator.return?.() so producers can release resources.
  • refetch() cancels the current iterator, resets data to [], sets reconnecting to true, and starts a fresh iterator.
  • HMR triggers dispose() on the old query before re-evaluating the module — no leaked iterators.
The thunk receives an AbortSignal bound to that lifecycle. Wire it to your producer:
async function* myStream(signal?: AbortSignal) {
  const ws = new WebSocket('wss://example/x');
  signal?.addEventListener('abort', () => ws.close());
  // ... yield messages
}
If you forget to wire signal to your producer, dispose() still stops yields from landing in data (the framework checks signal.aborted between iterator steps), but the underlying socket / fetch / etc. keeps running. Always wire the signal.

Reactive keys

Stream queries support reactive keys the same way promise queries do — read a signal in your thunk and the iterator restarts when that signal changes:
import { signal } from '@vertz/ui';
import { agent } from '~/agents/triage';

const sessionId = signal('s1');

function SessionTranscript() {
  const messages = query((signal) => agent.stream(sessionId.value, { signal }), {
    key: ['session', sessionId.value, 'messages'] as const,
  });
  // ...
}
When sessionId.value changes, the previous iterator aborts (signal fires, iterator.return?.() fires), data resets to [], a new iterator starts for the new id. No useEffect, no manual diffing. Tuple keys (['session', id, 'messages']) serialize deterministically so two queries with equivalent shapes share a cache slot.

Built-in helpers

import { fromWebSocket, fromEventSource, query } from '@vertz/ui';

// WebSocket
const events = query<MyEvent>(
  (signal) => fromWebSocket<MyEvent>('wss://api.example/events', signal),
  { key: 'events' },
);

// Server-Sent Events
const updates = query<UpdateMsg>(
  (signal) => fromEventSource<UpdateMsg>('https://api.example/sse', signal),
  { key: 'updates' },
);
Both helpers:
  • Try JSON.parse(event.data); yield the parsed value (or the raw string on parse failure).
  • Close the underlying source on signal.abort().
  • Throw inside the generator on socket-level errors (lands on .error).
Heads-up on errors: native WebSocket / EventSource error events don’t carry the underlying failure reason (web platform security limitation). The helpers throw a generic new Error('source error'). For diagnostic detail, use DevTools’ network panel or wrap your messages in a richer envelope ({ ok, data, error }).

What stream queries are not

This is intentionally a minimal v1. The following are non-goals — recipes below explain how to handle them in user-land.
  • No SSR for streams. A stream query renders with data: [] during the SSR pass; the iterator attaches on hydration. messages.data.map(...) produces an empty list during SSR, then progressively fills.
  • No accumulated-state cache across navigation. When you navigate away and back, the iterator re-attaches from scratch. Use cursor semantics in your producer (next recipe) if you need replay.
  • No refetchInterval interop. Polling and streaming are mutually exclusive — passing both throws a QueryStreamMisuseError.
  • No reducer / select hooks. Produce the merged shape inside your iterator (next recipe).
  • No multi-tenant re-auth on stream queries. Component remount on auth change is the contract — the iterator does not re-validate tokens mid-flight.
  • No source-type swap inside one query. A thunk that returns an AsyncIterable on Tuesday and a Promise on Wednesday throws — split into two queries with distinct keys.
  • No shared entity-store with REST queries. A query(api.tasks.list()) and a query(sdk.tasks.events()) for the same entity type don’t share cache state. If you mutate a task via the REST API, the optimistic update flows into the REST query immediately — but the stream’s data array only updates when the server pushes the change back through the stream. For mixed REST + stream rendering of the same entity, invalidate the REST query manually when stream events arrive (or treat the stream as authoritative and skip the REST query for that entity).

Recipes

Cursor / replay pattern

When the user navigates back to a chat session, you want to resume from the last seen message — not replay the entire history. Push the cursor into your producer:
const lastEventId = useLastSeenId(sessionId);

const messages = query((signal) => agent.stream(sessionId, { since: lastEventId, signal }), {
  key: ['session', sessionId, 'messages'] as const,
});
The framework re-establishes the iterator on each mount; your producer decides what to send.

Dedup wrapper

Wrap your iterator with an async generator that filters duplicates:
async function* dedupById<T extends { id: string }>(src: AsyncIterable<T>): AsyncIterable<T> {
  const seen = new Set<string>();
  for await (const item of src) {
    if (seen.has(item.id)) continue;
    seen.add(item.id);
    yield item;
  }
}

const messages = query((signal) => dedupById(agent.stream(sessionId, { signal })), {
  key: ['session', sessionId, 'messages'] as const,
});

Forgetting to wire the AbortSignal

This will appear to work but leaks the underlying socket on dispose:
// WRONG — signal never wired to the WebSocket
async function* leaky() {
  const ws = new WebSocket('wss://example/x');
  for await (const m of asEvents(ws, 'message')) yield m;
}
Symptoms: every HMR of the file leaves a dangling WebSocket connection in DevTools’ network panel. Fix by passing signal through:
async function* clean(signal?: AbortSignal) {
  const ws = new WebSocket('wss://example/x');
  signal?.addEventListener('abort', () => ws.close());
  for await (const m of asEvents(ws, 'message')) yield m;
}
The fromWebSocket and fromEventSource helpers handle this for you.