Dynamic Content with Lambda Response Streaming

Published on November 22, 2023

With some creativity, we can use Lambda Response Streaming to dynamically show status updates, like the progress of long-running processes. And with proper TypeScript types and generator functions, we can write a clean Lambda function that streams dynamic content, with composable payload enrichers that will impress your work colleagues (if they are into this stuff).

🫣 Using Lambda Response Streaming in a (probably) unintended way

The primary use case for Response Streaming is progressively sending payloads without buffering them first in the Lambda. This is useful for large payloads, to reduce the time to first byte, or for data generated in chunks – like massive SQL query results (read with a cursor) or your million-dollar idea ChatGPT integrations.

But we can also use Lambda Response Streaming to send dynamic content, like status updates.

Previously, we looked at how to force Lambda Response Streaming to send the content we write to the client immediately. The trick was to pad each payload chunk to 100 KB with whitespace. As a result, we could show a dynamic progress bar in the browser with nothing but a single HTTP request.

But you must agree that writing responseStream.write('my text' + bigStr) for every chunk you send is not pretty. So now, let’s take our Response Streaming Lambda code to the next level. This is the best approach I have found – it involves yield and generator functions, but don’t worry, it’s really simple.

🤓 TypeScript types

The first step: make it work with TypeScript. If you don’t like to know the type of data you have in your variables, feel free to skip this 🤷‍♂️

To use Response Streaming, we call the awslambda.streamifyResponse() function, which the Lambda runtime provides globally. AWS did not publish any types for it, so we must declare them ourselves.

In our project, we install the @types/aws-lambda library and create an @types/awslambda/index.d.ts file:

import {APIGatewayProxyEventV2, Context, Handler} from "aws-lambda";
import {Writable} from 'stream';

declare global {
  namespace awslambda {
    export namespace HttpResponseStream {
      function from(writable: Writable, metadata: any): Writable;
    }

    export type ResponseStream = Writable & {
      setContentType(type: string): void;
    }

    export type StreamifyHandler = (event: APIGatewayProxyEventV2, responseStream: ResponseStream, context: Context) => Promise<any>;

    export function streamifyResponse(handler: StreamifyHandler): Handler<APIGatewayProxyEventV2>;
  }
}

Credit for the types goes to @llozano and their Response Streaming example.

With that in place, we can create our basic handler in TypeScript:

export const handler = awslambda.streamifyResponse(async (event, responseStream) => {
  responseStream.setContentType("text/plain");
  responseStream.write("Hello!");
  responseStream.end();
});

Note that the event type is APIGatewayProxyEventV2.

🌱 Generate and yield chunks

Have you ever used JavaScript generator functions? I’ll be honest – I had not. To me, they always seemed interesting but with no practical use. Until now.

Generator functions, declared as function*, produce multiple consecutive values with the yield keyword. Since generators are iterable, we can iterate over them in a loop to ask for more values as long as they are available.
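If generators are new to you as well, here is a minimal sketch of a plain, synchronous one, with no Lambda involved. The countdown function is a made-up example, not part of the handler code.

```typescript
// A plain (synchronous) generator: each `yield` hands one value to the caller
// and pauses the function until the next value is requested.
function* countdown(from: number): Generator<number> {
  for (let i = from; i > 0; i--) {
    yield i;
  }
}

// Generators are iterable, so for...of and the spread operator both work.
const collected: number[] = [];
for (const n of countdown(3)) {
  collected.push(n);
}

const viaSpread = [...countdown(3)]; // [3, 2, 1]
```

Nothing inside countdown runs until the loop asks for the first value, which is what makes generators a good fit for producing output step by step.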

In our case, we can yield the next messages to send to the stream.

The first benefit of using generator functions for Response Streaming is that we don’t pass the responseStream as an argument to all nested functions. Instead, we write to the stream in a single place. The business logic functions are unaware of the stream, and their only responsibility is to yield the next messages.

So let’s add this to our Lambda:

export const handler = awslambda.streamifyResponse(async (event, responseStream) => {
  responseStream.setContentType("text/plain");

  const chunks = processRequest(event); // this only creates the generator
  for await (const chunk of chunks) { // this actually runs it
    responseStream.write(chunk);
  }

  responseStream.end();
});

const processRequest = async function* (event: APIGatewayProxyEventV2) {
  yield 'Processing request, please wait';

  for (let i = 0; i < 5; i++) {
    yield '.';
    await sleep(1000);
  }

  yield '\nCompleted!';
};

export const sleep = (ms: number) => new Promise(r => setTimeout(r, ms));

As you can see, generators also work well with async/await.

The result, as you may expect, looks like this:

Processing request, please wait.....
Completed!
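A side effect of keeping the stream out of the business logic is that the generator can be exercised without the Lambda runtime at all. The sketch below assumes a hypothetical collectChunks helper and a shortened stand-in for processRequest (no sleep() calls), both introduced only for illustration.

```typescript
// Hypothetical helper: drains any async iterable of strings into an array.
const collectChunks = async (iterable: AsyncIterable<string>): Promise<string[]> => {
  const chunks: string[] = [];
  for await (const chunk of iterable) {
    chunks.push(chunk);
  }
  return chunks;
};

// A shortened stand-in for processRequest, without the sleep() calls.
const processRequestDemo = async function* () {
  yield 'Processing request, please wait';
  for (let i = 0; i < 3; i++) {
    yield '.';
  }
  yield '\nCompleted!';
};

// No responseStream involved: we simply join whatever the generator yields.
const demoOutput: Promise<string> = collectChunks(processRequestDemo())
  .then(chunks => chunks.join(''));
```

The same helper could back a unit test that asserts on the exact messages a request produces.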

🌿 More generator functions

Okay, the previous code was relatively simple. But what if we have more complex logic split over multiple functions?

Worry not, yield has our back. Or rather yield*, which delegates calls to another generator:

export const handler = awslambda.streamifyResponse(async (event, responseStream) => {
  responseStream.setContentType("text/plain");

  const chunks = processRequest();
  for await (const chunk of chunks) {
    responseStream.write(chunk);
  }

  responseStream.end();
});

const processRequest = async function* () {
  yield 'Processing request, please wait\n';

  yield* stepOne();
  yield* stepTwo();

  yield 'Completed!';
};

const stepOne = async function* () {
  yield 'Doing step one';
  for (let i = 0; i < 5; i++) {
    yield '.';
    await sleep(1000);
  }
  yield '\n';
  yield* finishStepOne();
}

const finishStepOne = async function* () {
  yield 'Finishing step one\n';
}

const stepTwo = async function* () {
  yield 'Doing step two';
  for (let i = 0; i < 3; i++) {
    yield '.';
    await sleep(1000);
  }
  yield '\n';
}

export const sleep = (ms: number) => new Promise(r => setTimeout(r, ms));

Output:

Processing request, please wait
Doing step one.....
Finishing step one
Doing step two...
Completed!

🪃 What about return values?

No worries here either. Generator functions can return a “final” value like any other function.

The return value IS NOT yielded, so, in our case, it won’t be sent to the stream.

export const handler = awslambda.streamifyResponse(async (event, responseStream) => {
  responseStream.setContentType("text/plain");

  const chunks = processRequest();
  for await (const chunk of chunks) {
    responseStream.write(chunk);
  }

  responseStream.end();
});

const processRequest = async function* () {
  yield 'Processing request, please wait\n';

  const value = yield* generateNumber();
  yield* wait(value);

  yield 'Completed!';
};

const generateNumber = async function* () {
  yield 'Generating random integer\n';
  return Math.floor(Math.random() * 3) + 1; // random int from 1 to 3
}

const wait = async function* (value: number) {
  yield `Waiting for ${value} seconds`;
  for (let i = 0; i < value; i++) {
    yield '.';
    await sleep(1000);
  }
  yield '\n';
}

export const sleep = (ms: number) => new Promise(r => setTimeout(r, ms));

Example output:

Processing request, please wait
Generating random integer
Waiting for 2 seconds..
Completed!
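To see why the returned number never reaches the stream, it may help to step through a generator by hand. In this sketch, generateNumberDemo is a simplified stand-in that returns a fixed 42 instead of a random number, so the results are predictable.

```typescript
const generateNumberDemo = async function* (): AsyncGenerator<string, number> {
  yield 'Generating integer\n';
  return 42; // fixed value to keep the sketch deterministic
};

// Manual iteration exposes both yielded values and the final return value.
const inspect = async () => {
  const iterator = generateNumberDemo();
  const first = await iterator.next();  // { value: 'Generating integer\n', done: false }
  const second = await iterator.next(); // { value: 42, done: true }
  return { first, second };
};

// for await stops as soon as done is true, so 42 never reaches the loop body.
const loopOnly = async () => {
  const seen: string[] = [];
  for await (const chunk of generateNumberDemo()) {
    seen.push(chunk);
  }
  return seen; // ['Generating integer\n']
};
```

The return value travels in the final { done: true } result, which yield* hands back to the delegating generator and a for await loop simply ignores.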

🪤 Error handling

One element is still missing in our code, and like in most codebases, it’s proper error handling. Let’s fix it:

export const handler = awslambda.streamifyResponse(async (event, responseStream) => {
  responseStream.setContentType("text/plain");

  try {
    const chunks = processRequest();
    for await (const chunk of chunks) {
      responseStream.write(chunk);
    }
  } catch (err) {
    console.error(err);
    const message = err instanceof Error ? err.message : String(err);
    responseStream.write(`ERROR: ${message}`);
  }

  responseStream.end();
});

Now, if an error is thrown at any point in our request processing, we write it to the logs and the stream.
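Here is a small sketch of that behavior, using an array in place of responseStream so it runs anywhere. The failingRequest generator and its error message are made up for the example.

```typescript
// A generator that yields one chunk and then fails.
const failingRequest = async function* () {
  yield 'Step one done\n';
  throw new Error('step two failed'); // simulated failure
};

// Same try/catch shape as the handler, writing to an array instead of responseStream.
const runWithErrorHandling = async (): Promise<string[]> => {
  const written: string[] = [];
  try {
    for await (const chunk of failingRequest()) {
      written.push(chunk);
    }
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    written.push(`ERROR: ${message}`);
  }
  return written; // ['Step one done\n', 'ERROR: step two failed']
};
```

Note that chunks written before the failure have already gone out, so the error can only be appended to the response; it cannot replace what was sent earlier.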

🙃 A better way that does not work

There is an even simpler way to send chunks to the response stream than iterating over the generator. It is the pipeline() function:

import {pipeline} from 'stream/promises';

export const handler = awslambda.streamifyResponse(async (event, responseStream) => {
  responseStream.setContentType("text/plain");

  try {
    // DO NOT USE
    await pipeline(processRequest(), responseStream);
  } catch (e) {
    // the error is NOT caught here, despite the try-catch block:
    responseStream.write(`ERROR: ${String(e)}`);
  }

  responseStream.end();
});

However, the error handling simply does not work here. If the processRequest() generator function throws an error, it is NOT caught in the catch block, and the Lambda invocation finishes with a Runtime.UnhandledPromiseRejection error.

I suspect an issue in Lambda internals, and I filed a support ticket 2 months ago. No response so far…

🪆 Enrich chunks

I’m sure you love generator functions by now. What if I told you there is one more neat aspect of them?

You may have noticed we didn’t add the “big string” to the yielded response chunks we sent. For this reason, in practice, they would be buffered instead of sent individually as we might expect.

So let’s fix this.

But instead of adding the big string to every yield or the main for-await loop, we can compose various “enricher” generator functions. Let’s add three: one to force chunk flush, one to add line breaks, and one to add timestamps to every chunk.

export const handler = awslambda.streamifyResponse(async (event, responseStream) => {
  responseStream.setContentType("text/html"); // notice we changed to HTML
  responseStream.write("<style>body {font-family: monospace}</style>"); // looks better

  try {
    const enriched = forceFlush(addLineBreak(addTimestamp(processRequest())));
    for await (const chunk of enriched) {
      responseStream.write(chunk);
    }
  } catch (err) {
    console.error(err);
    const message = err instanceof Error ? err.message : String(err);
    responseStream.write(`ERROR: ${message}`);
  }

  responseStream.end();
});

export const addTimestamp = async function* (iterable: AsyncIterable<string>) {
  for await (const line of iterable) {
    yield `${new Date().toTimeString().substring(0, 8)}\t ${line}`;
  }
}

export const addLineBreak = async function* (iterable: AsyncIterable<string>) {
  for await (const line of iterable) {
    yield `${line}<br>\n`;
  }
}

const bigStr = " ".repeat(100_000);
export const forceFlush = async function* (iterable: AsyncIterable<string>) {
  for await (const line of iterable) {
    yield `${line}${bigStr}`;
  }
}

const processRequest = async function* () {
  yield 'Processing request, please wait...';
  await sleep(1000);
  yield 'Planning...';
  await sleep(1000);
  yield 'Doing...';
  await sleep(1000);
  yield 'Testing...';
  await sleep(1000);
  yield 'Completed!';
};

export const sleep = (ms: number) => new Promise(r => setTimeout(r, ms));

The output:

18:34:45 Processing request, please wait...
18:34:46 Planning...
18:34:47 Doing...
18:34:48 Testing...
18:34:49 Completed!

Because enricher functions are composable, we can easily mix and match them as needed.
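If the nested calls get hard to read, one option is a small helper that applies enrichers in order. This is only a sketch: the Enricher type, applyEnrichers, addPrefix, and the demo generators are hypothetical names introduced here.

```typescript
type Enricher = (iterable: AsyncIterable<string>) => AsyncIterable<string>;

// Hypothetical helper: applies enrichers left to right, so the first one
// listed wraps the source directly and the last one produces the final chunks.
const applyEnrichers = (source: AsyncIterable<string>, ...enrichers: Enricher[]): AsyncIterable<string> =>
  enrichers.reduce((iterable, enrich) => enrich(iterable), source);

const addPrefix: Enricher = async function* (iterable) {
  for await (const line of iterable) {
    yield `> ${line}`;
  }
};

const addLineBreakDemo: Enricher = async function* (iterable) {
  for await (const line of iterable) {
    yield `${line}<br>\n`;
  }
};

const sourceDemo = async function* () {
  yield 'Planning...';
  yield 'Doing...';
};

// Equivalent to addLineBreakDemo(addPrefix(sourceDemo())).
const enrichedDemo = applyEnrichers(sourceDemo(), addPrefix, addLineBreakDemo);
```

With such a helper, the handler's chain would read applyEnrichers(processRequest(), addTimestamp, addLineBreak, forceFlush), listing the enrichers in the order they are applied.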

❗ One caveat

Lambda Response Streaming execution does not stop when the client closes the connection (in this case, when you close the browser window). The function invocation runs until it finishes or times out. So keep in mind that even after you close the browser, the execution continues, and you will pay $0.01 for every 80 minutes of Lambda execution with 128 MB of memory (at us-east-1 pricing).
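For the curious, the arithmetic behind that number can be sketched as follows. The per-GB-second price is my assumption of the us-east-1 x86 on-demand rate; check it against the current pricing page, and note that the per-request charge is left out.

```typescript
// Assumed us-east-1 x86 on-demand price per GB-second (verify before relying on it).
const pricePerGbSecond = 0.0000166667;

const memoryGb = 128 / 1024;     // 128 MB expressed in GB = 0.125
const durationSeconds = 80 * 60; // 80 minutes = 4800 seconds

// Duration cost only; request charges are not included.
const cost = pricePerGbSecond * memoryGb * durationSeconds; // roughly 0.01 USD
```

Doubling the memory setting doubles the cost for the same duration, so the figure scales linearly with whatever you configure.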

🧭 What’s next?

Next up should be a practical use case. And I have one – starting an EC2 instance (😱), waiting for the server to load, and redirecting to the app it hosts. But that’s a topic for another post.