Skip to content

Commit 33b1574

Browse files
authored
lib: add support for readable byte streams to .toWeb()
Add support for the creation of ReadableByteStream to Readable.toWeb() and Duplex.toWeb() This enables the use of .getReader({ mode: "byob" }) on e.g. socket().toWeb() Refs: #56004 (comment) Refs: https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_byte_streams PR-URL: #58664 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Ethan Arrowood <[email protected]> Reviewed-By: Mattias Buelens <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 1ce22dd commit 33b1574

File tree

7 files changed

+132
-10
lines changed

7 files changed

+132
-10
lines changed

doc/api/stream.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3188,6 +3188,9 @@ Returns whether the stream has been read from or cancelled.
31883188
<!-- YAML
31893189
added: v17.0.0
31903190
changes:
3191+
- version: REPLACEME
3192+
pr-url: https://github.com/nodejs/node/pull/58664
3193+
description: Add 'type' option to specify 'bytes'.
31913194
- version:
31923195
- v24.0.0
31933196
- v22.17.0
@@ -3210,6 +3213,7 @@ changes:
32103213
If no value is provided, the size will be `1` for all the chunks.
32113214
* `chunk` {any}
32123215
* Returns: {number}
3216+
* `type` {string} Must be 'bytes' or undefined.
32133217
* Returns: {ReadableStream}
32143218

32153219
### `stream.Writable.fromWeb(writableStream[, options])`
@@ -3383,11 +3387,14 @@ duplex.write('hello');
33833387
duplex.once('readable', () => console.log('readable', duplex.read()));
33843388
```
33853389

3386-
### `stream.Duplex.toWeb(streamDuplex)`
3390+
### `stream.Duplex.toWeb(streamDuplex[, options])`
33873391

33883392
<!-- YAML
33893393
added: v17.0.0
33903394
changes:
3395+
- version: REPLACEME
3396+
pr-url: https://github.com/nodejs/node/pull/58664
3397+
description: Add 'type' option to specify 'bytes'.
33913398
- version:
33923399
- v24.0.0
33933400
- v22.17.0
@@ -3396,6 +3403,8 @@ changes:
33963403
-->
33973404

33983405
* `streamDuplex` {stream.Duplex}
3406+
* `options` {Object}
3407+
* `type` {string} Must be 'bytes' or undefined.
33993408
* Returns: {Object}
34003409
* `readable` {ReadableStream}
34013410
* `writable` {WritableStream}

doc/api/webstreams.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1777,7 +1777,7 @@ text(readable).then((data) => {
17771777
[Streams]: stream.md
17781778
[WHATWG Streams Standard]: https://streams.spec.whatwg.org/
17791779
[`stream.Duplex.fromWeb`]: stream.md#streamduplexfromwebpair-options
1780-
[`stream.Duplex.toWeb`]: stream.md#streamduplextowebstreamduplex
1780+
[`stream.Duplex.toWeb`]: stream.md#streamduplextowebstreamduplex-options
17811781
[`stream.Duplex`]: stream.md#class-streamduplex
17821782
[`stream.Readable.fromWeb`]: stream.md#streamreadablefromwebreadablestream-options
17831783
[`stream.Readable.toWeb`]: stream.md#streamreadabletowebstreamreadable-options

lib/internal/streams/duplex.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,8 @@ Duplex.fromWeb = function(pair, options) {
191191
options);
192192
};
193193

194-
Duplex.toWeb = function(duplex) {
195-
return lazyWebStreams().newReadableWritablePairFromDuplex(duplex);
194+
Duplex.toWeb = function(duplex, options) {
195+
return lazyWebStreams().newReadableWritablePairFromDuplex(duplex, options);
196196
};
197197

198198
let duplexify;

lib/internal/webstreams/adapters.js

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ const {
7373
validateBoolean,
7474
validateFunction,
7575
validateObject,
76+
validateOneOf,
7677
} = require('internal/validators');
7778

7879
const {
@@ -417,7 +418,8 @@ function newStreamWritableFromWritableStream(writableStream, options = kEmptyObj
417418
* @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
418419
* @param {Readable} streamReadable
419420
* @param {{
420-
* strategy : QueuingStrategy
421+
* strategy? : QueuingStrategy
422+
* type? : 'bytes',
421423
* }} [options]
422424
* @returns {ReadableStream}
423425
*/
@@ -432,6 +434,12 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
432434
'stream.Readable',
433435
streamReadable);
434436
}
437+
validateObject(options, 'options');
438+
if (options.type !== undefined) {
439+
validateOneOf(options.type, 'options.type', ['bytes', undefined]);
440+
}
441+
442+
const isBYOB = options.type === 'bytes';
435443

436444
if (isDestroyed(streamReadable) || !isReadable(streamReadable)) {
437445
const readable = new ReadableStream();
@@ -443,6 +451,9 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
443451
const highWaterMark = streamReadable.readableHighWaterMark;
444452

445453
const evaluateStrategyOrFallback = (strategy) => {
454+
// If the stream is BYOB, we only use highWaterMark
455+
if (isBYOB)
456+
return { highWaterMark };
446457
// If there is a strategy available, use it
447458
if (strategy)
448459
return strategy;
@@ -491,7 +502,19 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
491502
streamReadable.on('data', onData);
492503

493504
return new ReadableStream({
494-
start(c) { controller = c; },
505+
type: isBYOB ? 'bytes' : undefined,
506+
start(c) {
507+
controller = c;
508+
if (isBYOB) {
509+
streamReadable.once('end', () => {
510+
// close the controller
511+
controller.close();
512+
// And unlock the last BYOB read request
513+
controller.byobRequest?.respond(0);
514+
wasCanceled = true;
515+
});
516+
}
517+
},
495518

496519
pull() { streamReadable.resume(); },
497520

@@ -601,9 +624,10 @@ function newStreamReadableFromReadableStream(readableStream, options = kEmptyObj
601624

602625
/**
603626
* @param {Duplex} duplex
627+
* @param {{ type?: 'bytes' }} [options]
604628
* @returns {ReadableWritablePair}
605629
*/
606-
function newReadableWritablePairFromDuplex(duplex) {
630+
function newReadableWritablePairFromDuplex(duplex, options = kEmptyObject) {
607631
// Not using the internal/streams/utils isWritableNodeStream and
608632
// isReadableNodeStream utilities here because they will return false
609633
// if the duplex was created with writable or readable options set to
@@ -615,9 +639,11 @@ function newReadableWritablePairFromDuplex(duplex) {
615639
throw new ERR_INVALID_ARG_TYPE('duplex', 'stream.Duplex', duplex);
616640
}
617641

642+
validateObject(options, 'options');
643+
618644
if (isDestroyed(duplex)) {
619645
const writable = new WritableStream();
620-
const readable = new ReadableStream();
646+
const readable = new ReadableStream({ type: options.type });
621647
writable.close();
622648
readable.cancel();
623649
return { readable, writable };
@@ -633,8 +659,8 @@ function newReadableWritablePairFromDuplex(duplex) {
633659

634660
const readable =
635661
isReadable(duplex) ?
636-
newReadableStreamFromStreamReadable(duplex) :
637-
new ReadableStream();
662+
newReadableStreamFromStreamReadable(duplex, options) :
663+
new ReadableStream({ type: options.type });
638664

639665
if (!isReadable(duplex))
640666
readable.cancel();

test/parallel/test-stream-duplex.js

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,26 @@ process.on('exit', () => {
131131
assert.deepStrictEqual(Buffer.from(result.value), dataToRead);
132132
}));
133133
}
134+
135+
// Duplex.toWeb BYOB
136+
{
137+
const dataToRead = Buffer.from('hello');
138+
const dataToWrite = Buffer.from('world');
139+
140+
const duplex = Duplex({
141+
read() {
142+
this.push(dataToRead);
143+
this.push(null);
144+
},
145+
write: common.mustCall((chunk) => {
146+
assert.strictEqual(chunk, dataToWrite);
147+
})
148+
});
149+
150+
const { writable, readable } = Duplex.toWeb(duplex, { type: 'bytes' });
151+
writable.getWriter().write(dataToWrite);
152+
const data = new Uint8Array(dataToRead.length);
153+
readable.getReader({ mode: 'byob' }).read(data).then(common.mustCall((result) => {
154+
assert.deepStrictEqual(Buffer.from(result.value), dataToRead);
155+
}));
156+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
'use strict';
2+
require('../common');
3+
const { Readable } = require('stream');
4+
const assert = require('assert');
5+
const common = require('../common');
6+
7+
let count = 0;
8+
9+
const nodeStream = new Readable({
10+
read(size) {
11+
if (this.destroyed) {
12+
return;
13+
}
14+
// Simulate a stream that pushes sequences of 16 bytes
15+
const buffer = Buffer.alloc(size);
16+
for (let i = 0; i < size; i++) {
17+
buffer[i] = count++ % 16;
18+
}
19+
this.push(buffer);
20+
}
21+
});
22+
23+
// Test validation of 'type' option
24+
assert.throws(
25+
() => {
26+
Readable.toWeb(nodeStream, { type: 'wrong type' });
27+
},
28+
{
29+
code: 'ERR_INVALID_ARG_VALUE'
30+
}
31+
);
32+
33+
// Test normal operation with ReadableByteStream
34+
const webStream = Readable.toWeb(nodeStream, { type: 'bytes' });
35+
const reader = webStream.getReader({ mode: 'byob' });
36+
const expected = new Uint8Array(16);
37+
for (let i = 0; i < 16; i++) {
38+
expected[i] = count++;
39+
}
40+
41+
for (let i = 0; i < 1000; i++) {
42+
// Read 16 bytes of data from the stream
43+
const receive = new Uint8Array(16);
44+
reader.read(receive).then(common.mustCall((result) => {
45+
// Verify the data received
46+
assert.ok(!result.done);
47+
assert.deepStrictEqual(result.value, expected);
48+
}));
49+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
'use strict';
2+
require('../common');
3+
const { Readable } = require('stream');
4+
const assert = require('assert');
5+
const common = require('../common');
6+
{
7+
const r = Readable.from([]);
8+
// Cancelling reader while closing should not cause uncaught exceptions
9+
r.on('close', common.mustCall(() => reader.cancel()));
10+
11+
const reader = Readable.toWeb(r, { type: 'bytes' }).getReader({ mode: 'byob' });
12+
reader.read(new Uint8Array(16)).then(common.mustCall((result) => {
13+
assert.ok(result.done);
14+
}));
15+
}

0 commit comments

Comments
 (0)