feat(realtime): Realtime streams v2#2632

Merged
ericallam merged 65 commits into main from feat/realtime-streams-2 on Nov 11, 2025
Conversation

@ericallam ericallam commented Oct 24, 2025

This PR introduces an upgraded Realtime streams backend and SDK that makes streams more reliable (and resumable), and that raises or removes the previous limits. We've also improved the visibility of streams in the run dashboard.

New limits

See the limits table below for more details:

| Limit | Streams v1 | Streams v2 |
| --- | --- | --- |
| Maximum stream length | 2000 | Unlimited |
| Number of active streams per run | 5 | Unlimited |
| Maximum streams per run | 10 | Unlimited |
| Maximum stream TTL | 1 day | 28 days |
| Maximum stream size | 10 MB | 300 MiB |

Additionally, whereas previously only a single client stream could be sent to a Realtime stream, you can now send multiple client streams to a single Realtime stream.
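As a self-contained sketch of that multi-producer behavior (illustrative code only, not the SDK's implementation): several client streams feed one logical stream key, and every chunk lands in a single ordered log, similar to calling streams.pipe more than once with the same key.

```typescript
// Hypothetical fan-in sketch: drain several client streams into one
// ordered chunk log, mimicking multiple producers piping to the same
// Realtime stream key.
async function fanIn<T>(sources: AsyncIterable<T>[]): Promise<T[]> {
  const log: T[] = [];
  // Drain all sources concurrently; chunks are appended as they arrive.
  await Promise.all(
    sources.map(async (source) => {
      for await (const chunk of source) {
        log.push(chunk);
      }
    })
  );
  return log;
}

async function* clientStream(chunks: string[]): AsyncGenerator<string> {
  for (const chunk of chunks) yield chunk;
}

async function main() {
  // Two client streams targeting the same logical stream.
  const merged = await fanIn([
    clientStream(["a1", "a2"]),
    clientStream(["b1", "b2"]),
  ]);
  console.log(merged.length); // 4
}

main();
```

Note that the real backend, not the client, is responsible for ordering concurrent appends; this sketch only shows the fan-in shape.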

Reliability improvements

When appending to a stream, the backend will now reliably resume appending from the last chunk index if there is a lost connection. Additionally, we've improved the reliability of reading from a stream by automatically resuming failed reads from the last chunk index if there is a lost connection.

This means that both sides of the stream will be much more reliable and will not lose data even when faced with network issues or other disruptions.
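The reading side of that resume behavior can be sketched like this (illustrative names and logic, not the SDK's internals): when the connection drops, re-issue the read from the last successfully received chunk index, so no data is lost or duplicated.

```typescript
// Hypothetical resume-on-disconnect sketch. `Reader` stands in for a
// network read that may fail partway through.
type Reader<T> = (startIndex: number) => AsyncIterable<T>;

async function readWithResume<T>(read: Reader<T>, maxRetries = 3): Promise<T[]> {
  const chunks: T[] = [];
  let retries = 0;
  while (true) {
    try {
      // `chunks.length` doubles as the next chunk index to resume from.
      for await (const chunk of read(chunks.length)) {
        chunks.push(chunk);
      }
      return chunks; // the source finished cleanly
    } catch (err) {
      if (++retries > maxRetries) throw err;
      // Otherwise loop and resume from the last received index.
    }
  }
}

async function main() {
  const data = ["c0", "c1", "c2", "c3"];
  let failedOnce = false;
  // A reader that drops the connection once, partway through.
  const flaky: Reader<string> = async function* (start) {
    for (let i = start; i < data.length; i++) {
      if (!failedOnce && i === 2) {
        failedOnce = true;
        throw new Error("connection lost");
      }
      yield data[i];
    }
  };
  console.log((await readWithResume(flaky)).join(",")); // c0,c1,c2,c3
}

main();
```

The appending side works analogously, with the backend reporting the last chunk index it has durably stored.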

SDK improvements

We've moved the stream logic into its own dedicated namespace in the SDK, instead of mixing it in with the other metadata methods:

import { streams, metadata, task } from "@trigger.dev/sdk";

const myTask = task({
  id: "my-task",
  run: async (payload: { message: string }) => {
    // ... get a stream from somewhere, e.g.:
    const stream = ReadableStream.from(["chunk 1", "chunk 2"]);

    // Before:
    await metadata.stream("my-stream", stream);

    // After: (notice we don't await the result)
    streams.pipe("my-stream", stream);
  },
});

You can now pipe to a stream using the streams.pipe method, which returns a result that can be used to wait until the stream is complete:

import { streams, task } from "@trigger.dev/sdk";

const myTask = task({
  id: "my-task",
  run: async (payload: { message: string }) => {
    // ... get a stream from somewhere, e.g.:
    const myAIStream = ReadableStream.from(["chunk 1", "chunk 2"]);

    const { stream, waitUntilComplete } = streams.pipe("ai-stream", myAIStream);

    // A. Iterate over the stream which is an async iterable and a ReadableStream
    for await (const chunk of stream) {
      console.log(chunk);
    }

    // B. Wait until the stream is complete
    await waitUntilComplete();

    return {
      message: "Stream completed successfully",
    };
  },
});

When calling streams.pipe from inside a task, the stream is automatically associated with the current run. You can also optionally specify a target run ID to pipe to a stream on a different run:

import { streams, task } from "@trigger.dev/sdk";

const myTask = task({
  id: "my-task",
  run: async (payload: { message: string; otherRunId?: string }, { ctx }) => {
    // ... get a stream from somewhere, e.g.:
    const myAIStream = ReadableStream.from(["chunk 1", "chunk 2"]);

    // (Pick one target per stream; distinct names are used here so the example compiles.)

    // Pipe to a stream on the root run
    const rootPipe = streams.pipe("ai-stream", myAIStream, {
      target: ctx.run.rootTaskRunId,
    });

    // Pipe to a stream on the parent run
    const parentPipe = streams.pipe("ai-stream", myAIStream, {
      target: ctx.run.parentTaskRunId,
    });

    // Pipe to a stream on another run by ID
    const otherPipe = streams.pipe("ai-stream", myAIStream, {
      target: payload.otherRunId,
    });
  },
});

This means that, by specifying a target run ID, you can pipe to a stream from outside of a task:

import { streams } from "@trigger.dev/sdk";
import { openai } from "@ai-sdk/openai";
import {
  convertToModelMessages,
  streamText,
  UIMessage,
  createUIMessageStreamResponse,
} from "ai";

// Allow streaming responses up to 30 seconds
export const maxDuration = 30;

export async function POST(req: Request) {
  const { messages, runId }: { messages: UIMessage[]; runId: string } =
    await req.json();

  const result = streamText({
    model: openai("gpt-4.1"),
    system: "You are a helpful assistant.",
    messages: convertToModelMessages(messages),
  });

  const { stream } = streams.pipe("ai-stream", result.toUIMessageStream(), {
    target: runId,
  });

  return createUIMessageStreamResponse({
    stream,
  });
}

We've also added a new streams.read method to read from a stream:

import { streams } from "@trigger.dev/sdk";

const stream = await streams.read(runId, "my-stream");

for await (const chunk of stream) {
  console.log(chunk);
}

You can also specify a timeout and start index to read from:

import { streams } from "@trigger.dev/sdk";

const stream = await streams.read(runId, "my-stream", {
  timeoutInSeconds: 10, // Will stop the stream if no data is received within 10 seconds
  startIndex: 10, // Will start reading from the 10th chunk
});

Default stream

Runs now also have a "default" stream, which means you can optionally skip specifying a stream key:

import { streams } from "@trigger.dev/sdk";

// Skip the stream key, uses the default run stream
const { stream } = streams.pipe(result.toUIMessageStream(), {
  target: runId,
});

// Read from the default run stream
const readStream = await streams.read(runId);

streams.append/writer

You can append a single chunk to a stream using the streams.append method:

import { streams } from "@trigger.dev/sdk";

// Call this from inside a task
await streams.append("my-stream", "Hello, world!");

Or you can use the streams.writer method to write multiple chunks to a stream, or to merge another stream into it:

import { streams } from "@trigger.dev/sdk";

// Call this from inside a task
streams.writer("my-stream", {
  execute: ({ write, merge }) => {
    write("Hello, world!");
    write("Hello, world 2!");
    merge(ReadableStream.from(["Hello, world 3!", "Hello, world 4!"]));
  },
});

Both of these methods accept a target run ID to append/write to a stream on a different run:

import { streams } from "@trigger.dev/sdk";

// Append to a stream on the root run
await streams.append("my-stream", "Hello, world!", {
  target: ctx.run.rootTaskRunId,
});

// Write to a stream on the parent run
streams.writer("my-stream", {
  execute: ({ write, merge }) => {
    write("Hello, world!");
  },
  target: ctx.run.parentTaskRunId,
});

streams.define

You can now define a stream in one place, with its chunk type and stream ID, and use it in multiple places, DRYing up your code:

import { streams } from "@trigger.dev/sdk";

// Define a stream with a string chunk type
const myStream = streams.define<string>({
  id: "my-stream",
});

// Pipe to the stream from inside a task
myStream.pipe(result.toUIMessageStream());
// Append to the stream from inside a task
await myStream.append("Hello, world!");
// Write to the stream from inside a task
myStream.writer({
  execute: ({ write, merge }) => {
    write("Hello, world!");
    merge(ReadableStream.from(["Hello, world 2!", "Hello, world 3!"]));
  },
});
// Read from the stream from anywhere
const stream = await myStream.read(runId);
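As a rough illustration of why defining the chunk type once helps (a minimal in-memory model with a hypothetical defineStream, not the SDK's implementation): every method on the returned handle inherits the type parameter, so appends and reads are checked at compile time.

```typescript
// Minimal model of the define pattern: the chunk type is fixed once
// and every method on the handle inherits it.
interface StreamHandle<TChunk> {
  id: string;
  append(chunk: TChunk): Promise<void>;
  read(): Promise<TChunk[]>;
}

function defineStream<TChunk>(opts: { id: string }): StreamHandle<TChunk> {
  const buffer: TChunk[] = []; // in-memory stand-in for the backend
  return {
    id: opts.id,
    async append(chunk) {
      buffer.push(chunk);
    },
    async read() {
      return [...buffer];
    },
  };
}

// The chunk type is declared exactly once...
const progress = defineStream<{ step: number; label: string }>({ id: "progress" });

async function main() {
  // ...and append/read are typed accordingly; appending a plain string
  // here would be a compile-time error.
  await progress.append({ step: 1, label: "started" });
  const chunks = await progress.read();
  console.log(chunks[0].label); // started
}

main();
```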

New useRealtimeStream hook

We've added a new useRealtimeStream hook to subscribe to a stream by its run ID and optional stream key:

"use client";

import { useRealtimeStream } from "@trigger.dev/react-hooks";
import { Streamdown } from "streamdown";

export function Streams({
  accessToken,
  runId,
  startIndex,
}: {
  accessToken: string;
  runId: string;
  startIndex?: number;
}) {
  // parts will be typed as `string[]`
  const { parts, error } = useRealtimeStream<string>(runId, "stream", {
    accessToken, // Pass in a public access token to authenticate the request
    onData: (data) => {
      console.log(data); // Optionally, you can listen to the data as it comes in
    },
    timeoutInSeconds: 600, // Will stop the stream if no data is received within 600 seconds (default is 60 seconds)
    startIndex, // Will start reading from the xth chunk if provided (default is 0)
    throttleInMs: 50, // Will throttle the stream updates to 50ms (default is 16ms)
  });

  if (error)
    return (
      <div className="text-red-600 font-semibold">Error: {error.message}</div>
    );

  if (!parts) return <div className="text-gray-600">Loading...</div>;

  const stream = parts.join("");

  return (
    <div className="space-y-4">
      <div className="text-sm font-medium text-gray-700">
        <span className="font-semibold">Run:</span> {runId}
      </div>
      <div className="prose prose-sm max-w-none text-gray-900">
        <Streamdown isAnimating={true}>{stream}</Streamdown>
      </div>
    </div>
  );
}

As with the other new functions, you can skip specifying the stream key when using useRealtimeStream:

// Uses the default stream
const { parts, error } = useRealtimeStream<string>(runId);

You can also pass in the stream instance directly:

import { streams } from "@trigger.dev/sdk";

const myStream = streams.define<string>({
  id: "my-stream",
});

const { parts, error } = useRealtimeStream(myStream, runId);

Dashboard improvements

We're now surfacing streams in the runs dashboard, allowing you to view the stream data in real time:

(Demo video: CleanShot.2025-10-24.at.17.19.22.mp4)