Using readable byte streams

Readable byte streams are readable streams that have an underlying byte source of type: "bytes", and which support efficient zero-copy transfer of data from the underlying source to a consumer (bypassing the stream's internal queues). They are intended for use cases where data might be supplied or requested in arbitrarily sized and potentially very large chunks, and hence where avoiding copies is likely to improve efficiency.

This article explains how readable byte streams compare to normal "default" streams, and how you create and consume them.

Note: Readable byte streams are almost identical to "normal" readable streams and almost all of the concepts are the same. This article assumes that you already understand those concepts and will only be covering them superficially (if at all). If you're not familiar with the relevant concepts, please first read: Using readable streams, Streams concepts and usage overview, and Streams API concepts.

Overview

Readable streams provide a consistent interface for streaming data from some underlying source, such as a file or socket, to a consumer, such as a reader, transform stream or writable stream. In a normal readable stream, data from the underlying source always passes to a consumer through the internal queues. A readable byte stream differs in that if the internal queues are empty, the underlying source can write directly to the consumer (an efficient zero-copy transfer).

A readable byte stream is created by specifying type: "bytes" in the underlyingSource object that may be passed as the first parameter to the ReadableStream() constructor. With this value set, the stream is created with a ReadableByteStreamController, and this is the object that is passed to the underlying source when the start(controller) and pull(controller) callback functions are invoked.
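As a minimal sketch of the paragraph above (the variable names are illustrative and not part of the live examples), the following creates a byte stream and records which kind of controller is handed to start():

```js
// Illustrative names only; not taken from the live examples in this article.
let controllerType = null;

const byteStream = new ReadableStream({
  type: "bytes", // this is what makes the stream a readable byte stream
  start(controller) {
    // start() is invoked synchronously by the ReadableStream() constructor
    controllerType = controller.constructor.name;
  },
});

console.log(controllerType); // "ReadableByteStreamController"
```

Omitting type: "bytes" from the same object would instead create a default stream with a ReadableStreamDefaultController.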

The main difference between ReadableByteStreamController and the default controller (ReadableStreamDefaultController) is that it has an additional property ReadableByteStreamController.byobRequest of type ReadableStreamBYOBRequest. This represents a pending read request by a consumer that will be made as a zero-copy transfer from the underlying source. The property will be null if there is no pending request.

A byobRequest is only made available when a read request is made on a readable byte stream and there is no data in the stream's internal queues (if there is data then the request is satisfied from those queues).
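The following sketch illustrates this under a simple assumption: the underlying source enqueues three bytes in start(), before any read is made. The helper names (makePrefilledByteStream(), readOnce()) are hypothetical. Because the queue already holds data, the read is satisfied from the queue and pull() is never invoked, so no byobRequest is needed:

```js
// Hypothetical helper: a byte stream whose internal queue already holds
// data by the time the first read request arrives.
function makePrefilledByteStream(counters) {
  return new ReadableStream({
    type: "bytes",
    start(controller) {
      controller.enqueue(new Uint8Array([1, 2, 3]));
    },
    pull() {
      // Counted only to show that the read below does not need it
      counters.pullCalls++;
    },
  });
}

async function readOnce() {
  const counters = { pullCalls: 0 };
  const reader = makePrefilledByteStream(counters).getReader({ mode: "byob" });
  // The queued bytes are copied into the view supplied by the consumer
  const { value } = await reader.read(new Uint8Array(8));
  return { bytes: Array.from(value), pullCalls: counters.pullCalls };
}

readOnce().then(({ bytes, pullCalls }) => {
  console.log(bytes, pullCalls); // [ 1, 2, 3 ] 0
});
```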

An underlying byte source that needs to transfer data must check the byobRequest property and, if it is available, use it to transfer data. If the property is null, incoming data should instead be added to the stream's internal queues using ReadableByteStreamController.enqueue() (this is the only way to transfer data when using a "default" stream).

The ReadableStreamBYOBRequest has a view property, which is a view on the buffer allocated for transfer. Data from an underlying source should be written into this view, and then the underlying source must call respond() indicating the number of bytes written. This signals that the data should be transferred, and the pending read request by the consumer resolved. After calling respond() the view can no longer be written to.
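A minimal sketch of this write-then-respond pattern is shown below. The function makeCountingByteStream() is a hypothetical helper, and the sketch assumes the stream is read with a BYOB reader, so that controller.byobRequest is available whenever pull() is called:

```js
// Hypothetical pull source that produces `totalBytes` bytes with values 0, 1, 2, ...
// Assumes a BYOB reader, so byobRequest is non-null inside pull().
function makeCountingByteStream(totalBytes) {
  let sent = 0;
  return new ReadableStream({
    type: "bytes",
    pull(controller) {
      const request = controller.byobRequest;
      const view = request.view; // view on the consumer's buffer
      const count = Math.min(view.byteLength, totalBytes - sent);
      if (count === 0) {
        // No more data: close the stream, then respond with 0 bytes so
        // that the pending read is resolved with done set to true.
        controller.close();
        request.respond(0);
        return;
      }
      for (let i = 0; i < count; i++) {
        view[i] = (sent + i) % 256; // write directly into the view
      }
      sent += count;
      request.respond(count); // report how many bytes were written
    },
  });
}

// Usage: read 20 bytes through an 8-byte buffer
async function readSizes() {
  const reader = makeCountingByteStream(20).getReader({ mode: "byob" });
  let buffer = new ArrayBuffer(8);
  const sizes = [];
  while (true) {
    const { done, value } = await reader.read(new Uint8Array(buffer));
    if (done) break;
    sizes.push(value.byteLength);
    buffer = value.buffer; // the buffer is handed back with each result
  }
  return sizes;
}

readSizes().then((sizes) => console.log(sizes)); // [ 8, 8, 4 ]
```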

There is also an additional method ReadableStreamBYOBRequest.respondWithNewView() to which an underlying source can pass a "new" view containing data to be transferred. This new view must be over the same memory buffer as the original, and from the same starting offset. This method might be used if the underlying byte source needs to first transfer the view to a worker thread to populate (for example) and then get it back before responding to the byobRequest. In most cases this method will not be needed.

Readable byte streams are normally read using a ReadableStreamBYOBReader, which can be obtained by calling ReadableStream.getReader() on the stream, specifying mode: "byob" in the options parameter.

A readable byte stream can also be read using a default reader (ReadableStreamDefaultReader), but in this case byobRequest objects are only created when automatic buffer allocation is enabled for the stream (autoAllocateChunkSize was set for the stream's underlyingSource). Note that the size indicated by autoAllocateChunkSize is used for the buffer size in this case; for a byte reader the buffer used is supplied by the consumer. If the property was not specified, the default reader will still "work" but the underlying source will never be offered a byobRequest, and all data will be transferred through the stream's internal queues.
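The effect of autoAllocateChunkSize on a default reader can be sketched as follows. The helper defaultReaderGetsByobRequest() is hypothetical: it reads one chunk with a default reader and reports whether pull() was offered a byobRequest:

```js
// Hypothetical helper: reads once with a default reader and reports
// whether the underlying source was offered a byobRequest in pull().
async function defaultReaderGetsByobRequest(autoAllocateChunkSize) {
  let offered = false;
  const source = {
    type: "bytes",
    pull(controller) {
      const request = controller.byobRequest;
      offered = Boolean(request);
      if (request) {
        // Buffer was allocated by the stream (autoAllocateChunkSize bytes)
        request.view[0] = 42;
        request.respond(1);
      } else {
        // No request offered: data must go through the internal queues
        controller.enqueue(new Uint8Array([42]));
      }
    },
  };
  if (autoAllocateChunkSize !== undefined) {
    source.autoAllocateChunkSize = autoAllocateChunkSize;
  }
  const reader = new ReadableStream(source).getReader(); // default reader
  await reader.read();
  return offered;
}

defaultReaderGetsByobRequest(16).then((offered) => console.log(offered)); // true
defaultReaderGetsByobRequest().then((offered) => console.log(offered)); // false
```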

Other than the differences outlined above, the controller and underlying source for byte streams are very similar to those for default streams, and are used in much the same way.

Examples

Underlying push source with byte reader

This live example shows how to create a readable byte stream with a push underlying byte source, and read it using a byte reader.

Unlike with a pull underlying byte source, data can arrive at any time. Therefore the underlying source must use controller.byobRequest to transfer incoming data if one exists, and otherwise enqueue the data into the stream's internal queues. Further, since the data can arrive at any time the monitoring behavior is set up in the underlyingSource.start() callback function.

The example is highly influenced by a push byte source example in the stream specification. It uses a mocked "hypothetical socket" source that supplies data of arbitrary sizes. The reader is deliberately delayed at various points to allow the underlying source to use both transfer and enqueuing to send data to the stream. Backpressure support is not demonstrated.

Note: An underlying byte source can also be used with a default reader. If automatic buffer allocation is enabled the controller will supply fixed-size buffers for zero-copy transfers when there is an outstanding request from a reader and the stream's internal queues are empty. If automatic buffer allocation is not enabled then all data from the byte stream will always be enqueued. This is similar to the behavior shown in the "pull" underlying byte source examples.

Mocked underlying socket source

The mocked underlying source has three important methods:

  • select2() represents an outstanding request on the socket. It returns a promise that is resolved when data is available.
  • readInto() reads data from the socket into a supplied buffer and then clears the data.
  • close() closes the socket.

The implementation is very simplistic. As shown below, select2() creates a randomly sized buffer of random data on a timeout. The created data is read into a buffer then cleared in readInto().

class MockHypotheticalSocket {
  constructor() {
    this.max_data = 800; // total amount of data to stream from "socket"
    this.max_per_read = 100; // max data per read
    this.min_per_read = 40; // min data per read
    this.data_read = 0; // total data read so far (capped at max_data)
    this.socketdata = null;
  }

  // Method returning promise when this socket is readable.
  select2() {
    // Object used to resolve promise
    const resultobj = {};
    resultobj["bytesRead"] = 0;

    return new Promise((resolve/*, reject*/) => {
      if (this.data_read >= this.max_data) { //out of data
        resolve(resultobj);
        return;
      }

      // Emulate slow read of data
      setTimeout(() => {
        const numberBytesReceived = this.getNumberRandomBytesSocket();
        this.data_read += numberBytesReceived;
        this.socketdata = this.randomByteArray(numberBytesReceived);
        resultobj["bytesRead"] = numberBytesReceived;
        resolve(resultobj);
      }, 500);
    });
  }

  /* Read data into specified buffer offset */
  readInto(buffer, offset, length) {
    let length_data = 0;
    if (this.socketdata) {
      length_data = this.socketdata.length;
      const myview = new Uint8Array(buffer, offset, length);
      // Write the length of data specified into buffer
      // Code assumes buffer always bigger than incoming data
      for (let i = 0; i < length_data; i++) {
        myview[i]=this.socketdata[i];
      }
      this.socketdata = null; // Clear "socket" data after reading
    }
    return length_data;
  }

  // Dummy close function
  close() {
    return;
  }

  // Return random number bytes in this call of socket
  getNumberRandomBytesSocket() {
    // Capped to remaining data and the max min return-per-read range
    const remaining_data = this.max_data - this.data_read;
    const numberBytesReceived = remaining_data < this.min_per_read
      ? remaining_data
      : this.getRandomIntInclusive(this.min_per_read, Math.min(this.max_per_read, remaining_data));
    return numberBytesReceived;
  }

  // Return random number between two values
  getRandomIntInclusive(min, max) {
    min = Math.ceil(min);
    max = Math.floor(max);
    return Math.floor(Math.random() * (max - min + 1) + min);
  }

  // Return random character string
  randomChars(length = 8) {
    let string = "";
    let choices = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()";

    for (let i = 0; i < length; i++) {
      string += choices.charAt(Math.floor(Math.random() * choices.length));
    }
    return string;
  }

  /* Return random Uint8Array of bytes */
  randomByteArray(bytes = 8) {
    const textEncoder = new TextEncoder();
    return textEncoder.encode(this.randomChars(bytes));
  }
};

Creating a readable socket push byte stream

The following code shows how to define a readable socket "push" byte stream.

The underlyingSource object definition is passed as the first parameter to the ReadableStream() constructor. To make this a readable "byte" stream, we specify type: "bytes" as a property of the object. This ensures that the stream is handed a ReadableByteStreamController instead of the default controller (ReadableStreamDefaultController).

Since data can arrive at the socket before the consumer is ready to handle it, everything about reading the underlying source is configured in the start() callback method (we don't wait on a pull to start handling data). The implementation opens the "socket" and calls select2() to request data. When the returned promise resolves, the code checks if controller.byobRequest exists (is not null), and if so calls socket.readInto() to copy data into the request and transfer it. If byobRequest does not exist, there is no outstanding request from a consuming stream that can be satisfied as a zero-copy transfer. In this case, controller.enqueue() is used to copy data to the stream's internal queues.

The select2() request for more data is reposted until a request is returned with no data. At this point the controller is used to close the stream.

const stream = makeSocketStream("dummy host", "dummy port")

const DEFAULT_CHUNK_SIZE = 400;

function makeSocketStream(host, port) {
  const socket = new MockHypotheticalSocket();

  return new ReadableStream({
    type: "bytes",

    start(controller) {
      readRepeatedly().catch((e) => controller.error(e));
      function readRepeatedly() {
        return socket.select2().then(() => {
          // Since the socket can become readable even when there's
          // no pending BYOB requests, we need to handle both cases.
          let bytesRead;
          if (controller.byobRequest) {
            const v = controller.byobRequest.view;
            bytesRead = socket.readInto(v.buffer, v.byteOffset, v.byteLength);
            if (bytesRead === 0) {
              controller.close();
            }
            controller.byobRequest.respond(bytesRead);
            logSource(`byobRequest with ${bytesRead} bytes`);
          } else {
            const buffer = new ArrayBuffer(DEFAULT_CHUNK_SIZE);
            bytesRead = socket.readInto(buffer, 0, DEFAULT_CHUNK_SIZE);
            if (bytesRead === 0) {
              controller.close();
            } else {
              controller.enqueue(new Uint8Array(buffer, 0, bytesRead));
            }
            logSource(`enqueue() ${bytesRead} bytes (no byobRequest)`);
          }

          if (bytesRead === 0) {
            return;
            // no more bytes in source
          }
          return readRepeatedly();
        });
      }
    },

    cancel() {
      socket.close();
      logSource(`cancel(): socket closed`);
    }
  });
}

Note that readRepeatedly() returns a promise, and we use this to catch any errors from setting up or handling the read operation. The errors are then passed to the controller as shown above (see readRepeatedly().catch((e) => controller.error(e));).

A cancel() method is provided at the end to close the underlying source; the pull() callback is not needed, and is therefore not implemented.

Consuming the push byte stream

The following code creates a ReadableStreamBYOBReader for the socket byte stream and uses it to read data into a buffer. Note that processText() is called recursively to read more data until the buffer is filled. When the underlying source signals that it has no more data, the result of reader.read() will have done set to true, which in turn completes the read operation.

This code is almost exactly the same as for the Underlying pull source with byte reader example below. The only difference is that the reader includes some code to slow down reading, so the log output can demonstrate that data will be enqueued if not read fast enough.

const reader = stream.getReader({ mode: "byob" });
let buffer = new ArrayBuffer(4000);
readStream(reader);

function readStream(reader) {
  let bytesReceived = 0;
  let offset = 0;
  let result = ""; // received bytes, accumulated as text

  // read() returns a promise that resolves when a value has been received.
  // read() transfers the buffer to the stream, so only one read is issued
  // at a time and the buffer is recovered from each result.
  reader
    .read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
    .then(async function processText({ done, value }) {
      // Result objects contain two properties:
      // done  - true if the stream has already given all its data.
      // value - a view on the data read. When done is true this is an
      //         empty view (or undefined if the stream was cancelled).

      if (done) {
        logConsumer(`readStream() complete. Total bytes: ${bytesReceived}`);
        return;
      }

      buffer = value.buffer;
      offset += value.byteLength;
      bytesReceived += value.byteLength;

      logConsumer(`Read ${bytesReceived} bytes`);
      result += value;

      // Add delay to emulate when data can't be read and data is enqueued
      if (bytesReceived > 300 && bytesReceived < 600) {
        logConsumer(`Delaying read to emulate slow stream reading`);
        const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
        await delay(1000);
      }

      // Stop when the buffer is full (a zero-length view cannot be read into)
      if (offset >= buffer.byteLength) {
        logConsumer(`readStream() buffer full. Total bytes: ${bytesReceived}`);
        return;
      }

      // Read some more, and call this function again
      return reader
        .read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
        .then(processText);
    });
}

Cancelling the stream using the reader

We can use ReadableStreamBYOBReader.cancel() to cancel the stream. For this example we call the method if a button is clicked with a reason "user choice" (other HTML and code for the button not shown). We also log when the cancel operation completes.

button.addEventListener('click', () => {
  reader.cancel("user choice").then(() => logConsumer('reader.cancel complete'));
});

ReadableStreamBYOBReader.releaseLock() can be used to release the reader without cancelling the stream. Note however that any outstanding read requests will immediately be rejected. A new reader can be acquired later on to read the remaining chunks.
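As a small sketch of releasing and re-acquiring a reader (the stream and variable names here are illustrative, and no reads are outstanding when the lock is released):

```js
// Illustrative byte stream with no data; only the locking behavior is shown.
const lockDemoStream = new ReadableStream({ type: "bytes" });

const firstReader = lockDemoStream.getReader({ mode: "byob" });
const lockedWithReader = lockDemoStream.locked; // true: a reader holds the lock

firstReader.releaseLock(); // the stream itself is not cancelled
const lockedAfterRelease = lockDemoStream.locked; // false

// A new reader can now be acquired to continue reading
const secondReader = lockDemoStream.getReader({ mode: "byob" });

console.log(lockedWithReader, lockedAfterRelease, lockDemoStream.locked);
// true false true
```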

Monitoring the stream for close/error

The ReadableStreamBYOBReader.closed property returns a promise that will resolve when the stream is closed, and reject if there is an error. While no errors are expected in this case, the following code should log the completion case.

reader.closed
  .then(() => {
    logConsumer("ReadableStreamBYOBReader.closed: resolved");
  })
  .catch(() => {
    logConsumer("ReadableStreamBYOBReader.closed: rejected");
  });

Result

The logging from the underlying push source (left) and consumer (right) is shown below. Note the period in the middle where data is enqueued rather than transferred as a zero-copy operation.

Underlying pull source with byte reader

This live example shows how data might be read from a "pull" underlying byte source, such as a file, and transferred by a stream as a zero-copy transfer to a ReadableStreamBYOBReader.

Mocked underlying file source

For the underlying pull source we use the following class to (very superficially) mock a Node.js FileHandle, and in particular the read() method. The class generates random data to represent a file. The read() method reads from this data into a provided buffer from the specified position. The close() method does nothing: it is only provided to show where you might close the source when defining the constructor for the stream.

Note: This same class is used for all the "pull source" examples. It is shown here for information only (so that it is obvious that it is a mock).

class MockUnderlyingFileHandle {
  constructor() {
    this.maxdata = 1300; // "file size"
    this.filedata = this.randomByteArray(this.maxdata);
    this.position = 0;
  }

  // Read data from "file" at position/length into specified buffer offset
  read(buffer, offset, length, position) {
    // Object used to resolve promise
    const resultobj = {};
    resultobj["buffer"] = buffer;
    resultobj["bytesRead"] = 0;

    return new Promise((resolve/*, reject*/) => {
      if (position >= this.maxdata) { //out of data
        resolve(resultobj);
        return;
      }

      // Read random data into supplied buffer
      const myview = new Uint8Array(buffer, offset, length);
      // Write up to the length of data specified, stopping at end of "file"
      for (let i = 0; i < length; i++) {
        myview[i] = this.filedata[position + i];
        resultobj["bytesRead"] = i + 1; // count of bytes copied so far
        if (position + i + 1 >= this.maxdata) {
          break;
        }
      }
      // Emulate slow read of data
      setTimeout(() => { resolve(resultobj); }, 1000);
    });
  }

  // Dummy close function
  close() {
    return;
  }

  // Return random character string
  randomChars(length = 8) {
    let string = "";
    let choices = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()";

    for (let i = 0; i < length; i++) {
      string += choices.charAt(Math.floor(Math.random() * choices.length));
    }
    return string;
  }

  // Return random Uint8Array of bytes
  randomByteArray(bytes = 8) {
    const textEncoder = new TextEncoder();
    return textEncoder.encode(this.randomChars(bytes));
  }
};

Creating a readable file byte stream

The following code shows how to define a readable file byte stream.

Just as for the previous example, the underlyingSource object definition is passed as the first parameter to the ReadableStream() constructor. To make this a readable "byte" stream, we specify type: "bytes" as a property of the object. This ensures that the stream is handed a ReadableByteStreamController.

The start() function simply opens the file handle. The handle is closed in pull() once all the data has been read, or in the cancel() callback, which is provided to clean up any resources if the consumer cancels the stream (for example by calling ReadableStream.cancel() or ReadableStreamBYOBReader.cancel()).

Most of the interesting code is in the pull() callback. This copies data from the file into the pending read request (ReadableByteStreamController.byobRequest) and then calls respond() to indicate how much data is in the buffer and transfer it. If 0 bytes were transferred from the file then we know it has all been copied, so we close the file handle and call close() on the controller. respond(0) is still called after close() so that the pending read request is resolved with done set to true.

const stream = makeReadableByteFileStream("dummy file.txt")

function makeReadableByteFileStream(filename) {
  let fileHandle;
  let position = 0;
  return new ReadableStream({
    type: "bytes",  // An underlying byte stream!
    start(controller) {
      // Called to initialise the underlying source.
      // For a file source open a file handle (here we just create the mocked object).
      fileHandle = new MockUnderlyingFileHandle();
      logSource(`start(): ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`)
    },
    async pull(controller) {
      // Called when there is a pull request for data
      const theView = controller.byobRequest.view;
      // Typed array views expose byteOffset and byteLength (there is no "offset" property)
      const { bytesRead, buffer } =
        await fileHandle.read(theView.buffer, theView.byteOffset, theView.byteLength, position)
      if (bytesRead === 0) {
        await fileHandle.close();
        controller.close();
        controller.byobRequest.respond(0);
        logSource(`pull() with byobRequest. Close controller (read bytes: ${bytesRead})`);
      } else {
        position += bytesRead;
        controller.byobRequest.respond(bytesRead);
        logSource(`pull() with byobRequest. Transfer ${bytesRead} bytes`);
      }
    },
    cancel(reason) {
      // This is called if the stream is cancelled (via the stream or its reader).
      // Clean up any resources
      fileHandle.close();
      logSource(`cancel() with reason: ${reason}`);
    }
  });
}

Consuming the byte stream

The following code creates a ReadableStreamBYOBReader for the file byte stream and uses it to read data into a buffer. Note that processText() is called recursively to read more data until the buffer is filled. When the underlying source signals that it has no more data, the result of reader.read() will have done set to true, which in turn completes the read operation.

const reader = stream.getReader({ mode: "byob" });
let buffer = new ArrayBuffer(4000);
readStream(reader);

function readStream(reader) {
  let bytesReceived = 0;
  let offset = 0;
  let result = ""; // received bytes, accumulated as text

  // read() returns a promise that resolves when a value has been received.
  // read() transfers the buffer to the stream, so only one read is issued
  // at a time and the buffer is recovered from each result.
  reader
    .read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
    .then(function processText({ done, value }) {
      // Result objects contain two properties:
      // done  - true if the stream has already given all its data.
      // value - a view on the data read. When done is true this is an
      //         empty view (or undefined if the stream was cancelled).

      if (done) {
        logConsumer(`readStream() complete. Total bytes: ${bytesReceived}`);
        return;
      }

      buffer = value.buffer;
      offset += value.byteLength;
      bytesReceived += value.byteLength;

      logConsumer(`Read ${bytesReceived} bytes: ${value}`);
      result += value;

      // Stop when the buffer is full (a zero-length view cannot be read into)
      if (offset >= buffer.byteLength) {
        logConsumer(`readStream() buffer full. Total bytes: ${bytesReceived}`);
        return;
      }

      // Read some more, and call this function again
      return reader
        .read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
        .then(processText);
    });
}

Lastly, we add a handler that will cancel the stream if a button is clicked (other HTML and code for the button not shown).

button.addEventListener('click', () => { reader.cancel("user choice").then(() => { logConsumer(`reader.cancel complete`) }) } );
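The recursive promise chain in readStream() above can also be written with async/await. The following is a sketch only: readIntoBuffer() is a hypothetical helper, and the small in-memory byte stream stands in for the file stream. It reads until the stream is done or the buffer is full, and returns a view on the bytes received:

```js
// Hypothetical helper: read a byte stream into a single buffer of `size` bytes.
async function readIntoBuffer(stream, size) {
  const reader = stream.getReader({ mode: "byob" });
  let buffer = new ArrayBuffer(size);
  let offset = 0;
  try {
    while (offset < size) {
      const { done, value } = await reader.read(
        new Uint8Array(buffer, offset, size - offset),
      );
      // read() transferred our buffer; recover it from the returned view
      // (value can be undefined if the stream was cancelled).
      if (value) {
        buffer = value.buffer;
        offset += value.byteLength;
      }
      if (done) break;
    }
  } finally {
    reader.releaseLock();
  }
  return new Uint8Array(buffer, 0, offset);
}

// Stand-in source: five bytes, enqueued up front, then the stream is closed
function makeFiveByteStream() {
  return new ReadableStream({
    type: "bytes",
    start(controller) {
      controller.enqueue(new Uint8Array([1, 2, 3, 4, 5]));
      controller.close();
    },
  });
}

readIntoBuffer(makeFiveByteStream(), 8).then((bytes) => {
  console.log(Array.from(bytes)); // [ 1, 2, 3, 4, 5 ]
});
```

The loop condition plays the same role as the "buffer filled" check in the promise-based version: once offset reaches the buffer size no further read is attempted.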

Result

The logging from the underlying pull source (left) and consumer (right) is shown below. Of particular note:

  • The start() function is passed a ReadableByteStreamController.
  • The buffer passed to the reader is large enough to encompass the whole "file", so the whole file is transferred in one operation.

Underlying pull source with default reader

This live example shows how the same data might be read as a zero-copy transfer using a default reader (ReadableStreamDefaultReader). This uses the same mocked underlying file source as in the preceding example.

Creating a readable file byte stream with automatic buffer allocation

The only difference in our underlying source is that we must specify autoAllocateChunkSize. This size is used for the view buffer of controller.byobRequest, instead of a buffer supplied by the consumer.

const DEFAULT_CHUNK_SIZE = 200;
const stream = makeReadableByteFileStream("dummy file.txt")

function makeReadableByteFileStream(filename) {
  let fileHandle;
  let position = 0;
  return new ReadableStream({
    type: "bytes",  // An underlying byte stream!
    start(controller) {
      // Called to initialise the underlying source.
      // For a file source open a file handle (here we just create the mocked object).
      fileHandle = new MockUnderlyingFileHandle();
      logSource(`start(): ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`)
    },
    async pull(controller) {
      // Called when there is a pull request for data
      const theView = controller.byobRequest.view;
      // Typed array views expose byteOffset and byteLength (there is no "offset" property)
      const { bytesRead, buffer } =
        await fileHandle.read(theView.buffer, theView.byteOffset, theView.byteLength, position)
      if (bytesRead === 0) {
        await fileHandle.close();
        controller.close();
        controller.byobRequest.respond(0);
        logSource(`pull() with byobRequest. Close controller (read bytes: ${bytesRead})`);
      } else {
        position += bytesRead;
        controller.byobRequest.respond(bytesRead);
        logSource(`pull() with byobRequest. Transfer ${bytesRead} bytes`);
      }
    },
    cancel(reason) {
      // This is called if the stream is cancelled (via the stream or its reader).
      // Clean up any resources
      fileHandle.close();
      logSource(`cancel() with reason: ${reason}`);
    },
    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE // Only relevant if using a default reader
  });
}

Consuming the byte stream with default reader

The following code creates a ReadableStreamDefaultReader for the file byte stream by calling stream.getReader() without specifying the mode, and uses it to read data. The operation of the code is the same as the previous example except that the buffer is supplied by the stream rather than the consumer.

const reader = stream.getReader();
readStream(reader);

function readStream(reader) {
  let bytesReceived = 0;
  let result = '';

  // read() returns a promise that resolves
  // when a value has been received
  reader.read().then(function processText({ done, value }) {
    // Result objects contain two properties:
    // done  - true if the stream has already given you all its data.
    // value - some data. Always undefined when done is true.
    if (done) {
      logConsumer(`readStream() complete. Total bytes: ${bytesReceived}`);
      return;
    }

    bytesReceived += value.length;
    logConsumer(`Read ${bytesReceived} bytes so far. Current bytes = ${value}`);
    result += value;

    // Read some more, and call this function again
    return reader.read().then(processText);
  });
}

Lastly, we add a handler that will cancel the stream if a button is clicked (other HTML and code for the button not shown).

button.addEventListener('click', () => { reader.cancel("user choice").then(() => { logConsumer(`reader.cancel complete`) }) } );

Result

The logging from the underlying byte pull source (left) and consumer (right) is shown below.

Note that the chunks are now 200 bytes wide, as specified in the underlying byte source. These are made as zero-copy transfers.

Underlying pull source with default reader and no allocation

For completeness, we can also use a default reader with a byte source that does not support automatic buffer allocation.

However, in this case the controller will not supply a byobRequest for the underlying source to write into. Instead the underlying source has to enqueue the data. Note below that to support this case, in pull() we need to check if the byobRequest exists.

const stream = makeReadableByteFileStream("dummy file.txt")
const DEFAULT_CHUNK_SIZE = 300;

function makeReadableByteFileStream(filename) {
  let fileHandle;
  let position = 0;
  return new ReadableStream({
    type: "bytes",  // An underlying byte stream!
    start(controller) {
      // Called to initialise the underlying source.
      // For a file source open a file handle (here we just create the mocked object).
      fileHandle = new MockUnderlyingFileHandle();
      logSource(`start(): ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`)
    },
    async pull(controller) {
      // Called when there is a pull request for data
      if (controller.byobRequest) {
        const theView = controller.byobRequest.view;
        const { bytesRead, buffer } = await fileHandle.read(theView.buffer, theView.byteOffset, theView.byteLength, position)
        if (bytesRead === 0) {
          await fileHandle.close();
          controller.close();
          controller.byobRequest.respond(0);
          logSource(`pull() with byobRequest. Close controller (read bytes: ${bytesRead})`);
        } else {
          position += bytesRead;
          controller.byobRequest.respond(bytesRead);
          logSource(`pull() with byobRequest. Transfer ${bytesRead} bytes`);
        }
      } else {
        // No BYOBRequest so enqueue data to stream
        // NOTE, this branch would only execute for a default reader if autoAllocateChunkSize is not defined.
        const mynewBuffer = new Uint8Array(DEFAULT_CHUNK_SIZE);
        const { bytesRead, buffer } = await fileHandle.read(mynewBuffer.buffer, mynewBuffer.byteOffset, mynewBuffer.byteLength, position);
        if (bytesRead === 0) {
          await fileHandle.close();
          // Nothing was read, so just close: enqueue() throws once close() has been called
          controller.close();
          logSource(`pull() with no byobRequest. Close controller (read bytes: ${bytesRead})`);
        } else {
          position += bytesRead;
          // Enqueue only the bytes that were actually read
          controller.enqueue(mynewBuffer.subarray(0, bytesRead));
          logSource(`pull() with no byobRequest. enqueue() ${bytesRead} bytes`);
        }
      }
    },
    cancel(reason) {
      // This is called if the stream is cancelled (via the stream or its reader).
      // Clean up any resources
      fileHandle.close();
      logSource(`cancel() with reason: ${reason}`);
    },
  });
}

Result

The logging from the underlying pull source (left) and consumer (right) is shown below. Note that the underlying source side shows that the data has been enqueued rather than transferred as a zero-copy operation.

See also