diff --git a/demos/index.html b/demos/index.html index 2848fcfde..672690080 100644 --- a/demos/index.html +++ b/demos/index.html @@ -8,14 +8,17 @@

Streams Demos

Append child WritableStream
Piping through a series of transforms to a custom writable stream that appends children to - the DOM. + the DOM.
-
Streaming element -
Using custom element to stream into a given DOM location. +
Streaming element
+
Using custom element to stream into a given DOM location.
Streaming element with backpressure
A variation on the streaming element demo that adds backpressure to the element's writable - stream, decreasing jank. + stream, decreasing jank.
+ +
Random Values Stream
+
Piping a readable stream generating random values through a transform stream and then eventually writing it to the UI

Feel free to submit more demos by sending a pull request to the diff --git a/demos/random-values-stream/README.md b/demos/random-values-stream/README.md new file mode 100644 index 000000000..f074959e9 --- /dev/null +++ b/demos/random-values-stream/README.md @@ -0,0 +1,12 @@ +# Random Values Readable Stream + +Random Values Cryptography Stream generates random values in a readable stream using the [getRandomValues](https://developer.mozilla.org/en-US/docs/Web/API/RandomSource/getRandomValues) in the [WebCrypto API](https://developer.mozilla.org/en-US/docs/Web/API/Web_Crypto_API). The stream is then piped through a transform stream to make the output look prettier and then eventually written to a writable stream with an underlying sink that has relation to the User Interface. + +## About the Code + +A function called `pipeStream` initializes all the streams calling their respective methods along +with the arguments required and pipes the stream using the following code below + +```js +readableStream.pipeThrough(transformStream).pipeTo(writableStream); +``` diff --git a/demos/random-values-stream/index.html b/demos/random-values-stream/index.html new file mode 100644 index 000000000..099b040fc --- /dev/null +++ b/demos/random-values-stream/index.html @@ -0,0 +1,53 @@ + + + BackPressure built stream + + + + +

+

Random Cryptography Stream

+

Demonstration for reading random values array generated through WebCrypto getRandomValues function from a Readable Stream, passing it through a Transform Stream which converts the chunk into a readable form and writing it to the UI in the console using a Writable Stream with backpressure implemented. + +

+

Source Code

+

Read the the README.md file for explaination about the source code

+
+
+ + +
+
+
+

Readable Stream Pipe

+

Pipe readable stream through a transform stream to eventually write to a writable stream with an underlying sink that writes to the Web UI. Click the button below to start piping through the stream

+ +
+
+ + +
+
+ +

Status

+
+ Logs the status of the Readable and Writable Streams and data written to them +
+ +

Output

+ +
+ Output for the underlying sink will appear here +
+
+
+ + + + + + \ No newline at end of file diff --git a/demos/random-values-stream/index.js b/demos/random-values-stream/index.js new file mode 100644 index 000000000..5ac65906b --- /dev/null +++ b/demos/random-values-stream/index.js @@ -0,0 +1,217 @@ +/** + * Creates random values with the help of WebCrypto API + */ +function createRandomValuesStream(numberOfBytes = 10, valueInterval = 1000, maxValues = null) { + const cqs = new CountQueuingStrategy({ highWaterMark: 4 }); + const readableStream = new ReadableStream({ + totalEnqueuedItemsCount: 0, + interval: null, + + start(controller) { + logStatusText('`start` method of the readable stream called') + this.startValueInterval(controller); + }, + + /** + * Starting the random values generation again after a certain period + * @param {*} controller + */ + async pull(controller) { + if (controller.desiredSize > 2 && !this.interval) { + this.startValueInterval(controller); + } + }, + + async close(controller) { + logStatusText('`close` method of the readable stream called'); + this.clearValueInterval(); + controller.close(); + return; + }, + + async cancel() { + logStatusText('`cancel` method of the readable stream called') + this.clearValueInterval(); + }, + + throwFinalError(error) { + console.log('Errored out'); + console.error(error); + + this.clearValueInterval(); + }, + + startValueInterval(controller) { + if (this.interval) { + return; + } + + this.interval = setInterval(() => { + try { + controller.enqueue(randomValuesUint8Array(20)); + this.totalEnqueuedItemsCount++; + this.checkBackpressureSignal(controller); + + // Close the stream and clear the interval + if (maxValues && this.totalEnqueuedItemsCount >= maxValues) { + return this.close(controller); + } + } catch (error) { + this.throwFinalError(error); + } + }, valueInterval); + }, + + /** + * Clears the value interval stored in this.interval reference + */ + clearValueInterval() { + if (this.interval) { + clearInterval(this.interval); + this.interval = null; + } + }, + + /** + * Checks a backpressure signal and clears the interval + * not enqueuing any more values + * + * @param {*} controller + * @param {*} interval + */ + checkBackpressureSignal(controller, interval) { + if (controller.desiredSize <= 0) { + this.clearValueInterval(); + } + } + }, cqs); + + return readableStream; +} + +/** + * Creates a random values Uint8Array + * @param {*} numberOfBytes + */ +function randomValuesUint8Array(numberOfBytes) { + const uint8Array = new Uint8Array(numberOfBytes); + return window.crypto.getRandomValues(uint8Array); +} + +/** + * Create a writable stream to display the output with a builtin backpressure + */ +function createOutputWritableStream(parentElement) { + /** + * Equivalent to + * + * const cqs = new CountQueuingStrategy({ + * highWaterMark: 3, + * }); + */ + const queuingStrategy = { + highWaterMark: 3, + size() { return 1; } + } + + const writable = new WritableStream({ + async write(chunk, controller) { + try { + await writeChunk(chunk); + return; + } catch (error) { + return this.finalErrorHandler(error, controller); + } + }, + + finalErrorHandler(error, controller) { + controller.error(error); + logStatusText('Error occured in the writable stream'); + return error; + }, + + close() { + logStatusText('Closing the stream'); + console.log('Stream closed'); + } + }); + + /** + * Writes a chunk to the span and appends it to the parent element + * @param {*} chunk + */ + async function writeChunk(chunk) { + const containerElement = document.createElement('div'); + containerElement.className = 'output-chunk'; + containerElement.textContent = chunk; + parentElement.appendChild(containerElement); + return containerElement; + } + + return writable; +} + +function createArrayToStringTransform() { + const transformStream = new TransformStream({ + transform (chunk, controller) { + controller.enqueue(`${chunk.constructor.name}(${chunk.join(', ')})`); + } + }); + + return transformStream; +} + +/** + * Logs text regarding a status which is apart from the + * data written to the underlying sink and is related to status + * of the readable and writable streams + */ + +const statusContainer = document.querySelector('.output .status-container'); + +/** + * Logs status text + * @param {*} statusText + */ +function logStatusText(statusText) { + const divElement = document.createElement('div'); + divElement.className = 'status-chunk'; + divElement.textContent = statusText; + + statusContainer.appendChild(divElement); +} + +/** + * Demo related code + */ +async function pipeThroughHandler() { + const outputContainer = document.querySelector('.output .output-container'); + const pipeThroughButton = document.querySelector('.pipe-controls #pipe-through'); + outputContainer.innerHTML = statusContainer.innerHTML = ''; + + try { + pipeThroughButton.disabled = true; + logStatusText('Started writing to the stream'); + await pipeStream(outputContainer); + } catch (error) { + console.error(error); + } + + logStatusText('Done writing to the stream'); + pipeThroughButton.disabled = false; +} + +async function pipeStream(parentElement) { + const readableStream = createRandomValuesStream(10, 1000, 10); + const writableStream = createOutputWritableStream(parentElement); + const transformStream = createArrayToStringTransform(); + + return readableStream.pipeThrough(transformStream).pipeTo(writableStream); +} + +function initDemo() { + const pipeThroughButton = document.querySelector('.pipe-controls #pipe-through'); + pipeThroughButton.addEventListener('click', pipeThroughHandler); +} + +initDemo(); \ No newline at end of file diff --git a/demos/random-values-stream/styles.css b/demos/random-values-stream/styles.css new file mode 100644 index 000000000..4f8184fab --- /dev/null +++ b/demos/random-values-stream/styles.css @@ -0,0 +1,11 @@ +body { + margin: 2em auto ; + max-width: 970px; +} + +.output .output-container { + background-color: yellow; + overflow: auto; + padding: 10px; + max-height: 400px; +} \ No newline at end of file