I have been really enjoying Protohackers recently. Protohackers feels quite similar to Advent of Code, but on a less rigid schedule and with a focus on low-level networking code (TCP & UDP). I have been doing the challenges with Deno and have learned a lot about networking, binary protocols, and Deno itself.

In the beginning, my code was very imperative, but for one of the more recent challenges I started looking into Deno’s streaming APIs some more. Deno uses the recently standardised Web Streams API, which attempts to make streaming APIs available in a variety of JavaScript runtimes.

For the challenge I was attempting, I needed to parse binary data and pass it on as objects at a higher level of abstraction. I quickly found that a TransformStream would be what I wanted for converting the data, but I could not find much detailed documentation on them. The Deno blog contains a basic example, and I also found it helpful to read the source code of the DelimiterStream class in Deno’s std-lib.

I’m going to try to summarise what I have learned and show a couple more examples here.

Let’s start with the most basic way to use streams. We’ll create a stream that is both writable and readable and pass some data through it:

import { writableStreamFromWriter } from "https://deno.land/std@0.157.0/streams/conversion.ts";

const stream = new TransformStream();

stream.readable
  .pipeThrough(new TextEncoderStream())
  .pipeTo(writableStreamFromWriter(Deno.stdout))
  .catch(console.error);

const texts: string[] = ["message 1\n", "message 2\n"];

const writer = stream.writable.getWriter();
for (const part of texts) {
  await writer.ready;
  await writer.write(part);
}

If we run this, we will get the following output:

message 1
message 2

Not very exciting yet. We create a TransformStream, that is, a stream that has both a readable and a writable side. In reality, you would probably start from an existing readable stream here, something like a Deno.Conn or the result of a fetch call. We then connect an output to the stream by wrapping Deno.stdout with writableStreamFromWriter. Because Deno.stdout expects bytes rather than strings, the data first passes through a TextEncoderStream.
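As a rough sketch of that more realistic setup, the snippet below starts from a ReadableStream of bytes instead of writing into a TransformStream by hand. The `fromChunks` helper is a name I made up for illustration and only stands in for a real source; with an actual connection or fetch response you would use the stream that object already provides.

```typescript
// Hypothetical stand-in for a network source: a ReadableStream that emits a
// fixed list of byte chunks and then closes.
function fromChunks(chunks: Uint8Array[]): ReadableStream<Uint8Array> {
  return new ReadableStream<Uint8Array>({
    start(controller) {
      for (const chunk of chunks) controller.enqueue(chunk);
      controller.close();
    },
  });
}

const encoder = new TextEncoder();
const source = fromChunks([
  encoder.encode("message 1\n"),
  encoder.encode("message 2\n"),
]);

// A simple sink that collects every decoded string it receives.
const received: string[] = [];
const sink = new WritableStream<string>({
  write(chunk) {
    received.push(chunk);
  },
});

// Bytes come in, decoded strings go out to the sink.
const piping = source.pipeThrough(new TextDecoderStream()).pipeTo(sink);

piping
  .then(() => console.log(received.join("")))
  .catch(console.error);
```

The direction is reversed compared to the first example: the source produces bytes, so a TextDecoderStream turns them into strings before anything else looks at them.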

We could do something quite similar to the example from the Deno blog mentioned above and add a transform that upper-cases every chunk:

stream.readable
  .pipeThrough(
    new TransformStream({
      transform: (chunk, controller) => {
        controller.enqueue(chunk.toUpperCase());
      },
    })
  )
  .pipeThrough(new TextEncoderStream())
  .pipeTo(writableStreamFromWriter(Deno.stdout))
  .catch(console.error);

We now get:

MESSAGE 1
MESSAGE 2

What I struggled with, though, and what I think is even more interesting, is not passing data on chunk by chunk but instead changing when it gets forwarded. Let’s change our writer to write a new message every second:

const writer = stream.writable.getWriter();
let count = 0;
setInterval(async () => {
  await writer.ready;
  await writer.write(`message ${count}`);
  count++;
}, 1_000);

We are now going to add a custom transformer which only ever forwards messages in chunks of a fixed number of characters. The limit is passed to the constructor:

class StringLengthTransformer extends TransformStream<string, string> {
  data: string = "";
  LIMIT: number;
  constructor(limit: number) {
    super({
      transform: (chunk, controller) => {
        this.#handle(chunk, controller);
      },
    });
    this.LIMIT = limit;
  }

  #handle(chunk: string, controller: TransformStreamDefaultController<string>) {
    this.data += chunk;
    if (this.data.length >= this.LIMIT) {
      controller.enqueue(this.data.slice(0, this.LIMIT));
      this.data = this.data.slice(this.LIMIT);
    }
  }
}

As you can see, we create a new class which subclasses TransformStream. It holds a data field in which it buffers the incoming text. Every chunk it receives is appended to data, and it then checks whether the limit has been reached. If so, it forwards the first LIMIT characters and keeps the rest in the buffer.

We can also slightly refactor how we construct our stream and how we log the result:

const chunkedStream = stream.readable
  .pipeThrough(new StringLengthTransformer(20));

for await (const message of chunkedStream) {
  console.log(message);
}

On execution we now see (after a while):

message 0message 1me
ssage 2message 3mess
age 4message 5messag
e 6message 7message 
8message 9message 10
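The `for await` loop works because Deno’s ReadableStream is async-iterable. As far as I know, not every runtime implements that, so here is a sketch of the same loop written with an explicit reader. `readAll` is a hypothetical helper name, and the small two-chunk source is only there so the snippet runs on its own.

```typescript
// Hypothetical helper: calls `onChunk` for every chunk until the stream ends.
async function readAll<T>(
  readable: ReadableStream<T>,
  onChunk: (chunk: T) => void,
): Promise<void> {
  const reader = readable.getReader();
  try {
    while (true) {
      const result = await reader.read();
      if (result.done) break;
      onChunk(result.value);
    }
  } finally {
    // Release the lock so the stream is not left locked to this reader.
    reader.releaseLock();
  }
}

// Small stand-in source so the example is self-contained.
const demo = new ReadableStream<string>({
  start(controller) {
    for (const part of ["message 0message 1me", "ssage 2message 3mess"]) {
      controller.enqueue(part);
    }
    controller.close();
  },
});

const seen: string[] = [];
const finished = readAll(demo, (message) => {
  seen.push(message);
  console.log(message);
});
finished.catch(console.error);
```

In the example from this post, `chunkedStream` would take the place of `demo`.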

Note that there is one edge case that we have not handled yet. If we were to change our writer to:

let count = 0;
setInterval(async () => {
  await writer.ready;
  await writer.write(
    `This is a really long message. It is message number ${count}`
  );
  count++;
}, 1_000);

const chunkedStream = stream.readable.pipeThrough(
  new StringLengthTransformer(20)
);

for await (const message of chunkedStream) {
  const timeStamp = new Intl.DateTimeFormat("de-DE", {
    timeStyle: "long",
  }).format(new Date());
  console.log(`[${timeStamp}]: ${message}`);
}

We would get:

[15:21:35 MEZ]: This is a really lon
[15:21:36 MEZ]: g message. It is mes
[15:21:37 MEZ]: sage number 0This is

Our chunk now gets split into three parts, but only one part is logged per second. The problem is that we enqueue at most one chunk each time a new message arrives. This means that by the time the third message arrives, we have only just finished enqueuing the first one, and the backlog keeps growing. To handle messages that are already longer than the limit, we can swap the if for a while in our transformer:

  #handle(chunk: string, controller: TransformStreamDefaultController<string>) {
    this.data += chunk;
    while (this.data.length >= this.LIMIT) {
      controller.enqueue(this.data.slice(0, this.LIMIT));
      this.data = this.data.slice(this.LIMIT);
    }
  }

Now, the output looks much closer to what we expect. We get multiple logs per second so that all data can be written:

[15:24:29 MEZ]: This is a really lon
[15:24:29 MEZ]: g message. It is mes
[15:24:30 MEZ]: sage number 0This is
[15:24:30 MEZ]:  a really long messa
[15:24:30 MEZ]: ge. It is message nu
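One gap remains: when the writable side is closed, anything shorter than the limit stays in the buffer and never reaches the reader. The Streams API lets a transformer define a `flush` callback that runs at that point. Below is a sketch of how that could look. I wrote it as a factory function (`chunkByLength` is a made-up name) that keeps the buffer in a closure, so treat it as an alternative to the subclass above rather than a drop-in replacement.

```typescript
// Hypothetical variant of StringLengthTransformer: same chunking loop, plus a
// `flush` callback that emits the remainder when the input ends.
function chunkByLength(limit: number): TransformStream<string, string> {
  let data = "";
  return new TransformStream<string, string>({
    transform(chunk, controller) {
      data += chunk;
      while (data.length >= limit) {
        controller.enqueue(data.slice(0, limit));
        data = data.slice(limit);
      }
    },
    flush(controller) {
      // Runs once after the writable side has been closed.
      if (data.length > 0) controller.enqueue(data);
    },
  });
}

const chunker = chunkByLength(20);

// Collect everything that comes out of the readable side.
const pieces: string[] = [];
const piped = chunker.readable.pipeTo(
  new WritableStream<string>({
    write(piece) {
      pieces.push(piece);
    },
  }),
);

// Write a single long message and then close the stream.
async function feed(): Promise<void> {
  const input = chunker.writable.getWriter();
  await input.write("This is a really long message. It is message number 0");
  await input.close();
}

const finishedChunking = Promise.all([feed(), piped]);
finishedChunking.then(() => console.log(pieces)).catch(console.error);
```

With this input, the result is the two full 20-character chunks followed by the 13-character remainder `sage number 0`, which would otherwise have stayed in the buffer.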

There are many more things that one could do with TransformStreams and I’m quite happy to have learned about them. I’m already looking forward to the next time I get to use them.
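For instance, the binary parsing that sent me down this path could reuse the same buffer-and-loop pattern. The following is only a minimal sketch under a loud assumption: the wire format (one length byte followed by that many bytes of UTF-8 text) is invented for this example and is not the format of any Protohackers challenge. `Frame` and `frameParser` are hypothetical names as well.

```typescript
// Hypothetical wire format, invented for this sketch: every frame is one
// length byte N followed by N bytes of UTF-8 text.
interface Frame {
  text: string;
}

function frameParser(): TransformStream<Uint8Array, Frame> {
  let buffer = new Uint8Array(0);
  const decoder = new TextDecoder();
  return new TransformStream<Uint8Array, Frame>({
    transform(chunk, controller) {
      // Append the new bytes to whatever was left over from earlier chunks.
      const merged = new Uint8Array(buffer.length + chunk.length);
      merged.set(buffer);
      merged.set(chunk, buffer.length);
      buffer = merged;

      // Emit every complete frame that is currently in the buffer.
      while (buffer.length >= 1 && buffer.length >= 1 + buffer[0]) {
        const end = 1 + buffer[0];
        controller.enqueue({ text: decoder.decode(buffer.subarray(1, end)) });
        buffer = buffer.slice(end);
      }
    },
  });
}

const parser = frameParser();
const frames: Frame[] = [];
const parsed = parser.readable.pipeTo(
  new WritableStream<Frame>({
    write(frame) {
      frames.push(frame);
    },
  }),
);

async function sendBytes(): Promise<void> {
  const out = parser.writable.getWriter();
  // Two frames ("hi" and "deno"), deliberately split in an awkward place.
  await out.write(new Uint8Array([2, 104, 105, 4, 100]));
  await out.write(new Uint8Array([101, 110, 111]));
  await out.close();
}

const parsedAll = Promise.all([sendBytes(), parsed]);
parsedAll.then(() => console.log(frames)).catch(console.error);
```

The second frame only becomes complete once the second chunk arrives, so the parser holds on to the partial bytes in between, just like the string transformer holds on to its leftover characters.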