Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create RTCRtpTransportProcessor and move high freq fields there #68

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion api-outline.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,33 @@ partial interface RTCRtpReceiver {
Promise<sequence<RTCRtpReceiveStream>> replaceReceiveStreams();
}

[Exposed=Window]
interface RTCRtpTransport {
Promise<RTCRtpSendStream> addRtpSendStream(RTCRtpSendStreamInit);
Promise<RTCRtpReceiveStream> addRtpReceiveStream(RTCRtpReceiveStreamInit);
Orphis marked this conversation as resolved.
Show resolved Hide resolved

// Causes RTCRtpTransportProcessorEvent to be fired on |worker|.
createProcessor(Worker worker, optional any options, optional sequence<object> transfer);
}

[Exposed=DedicatedWorker]
interface RTCRtpTransportProcessor {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So are we saying we want to require the use of workers to use RtpTransport? Are we sure that's what we want?

And if that's what we want, why bother calling this this RTCRtpTransportProcessor and not just RTCRtpTransport?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the discussion happening in #33. There's the suggestion it should be worker-only like RTCRtpScriptTransform (vs rely on transferability like MediaStreamTrackProcessor etc). Could you join the discussion there, Peter?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is that it should be worker-first objects given the potential high volume of events and calls.
It might be fine to be able to transfer it to another worker (when it was discussed, value seems small), as well as to transfer by-products of this object (RTCRtpSendStream for instance).
It is always feasible to add this kind of functionality in the future.

// Options passed to createProcessor() triggering this to be created.
readonly attribute any options;

attribute EventHandler onpacketizedrtpavailable; // No payload. Call readPacketizedRtp
// Batch interface to read packetized RTP for senders which don't have associated
// RTCRtpSendStreams. Only provided if the PeerConnection was created with
// { customPacer: true }.
sequence<RTCRtpPacket> readPacketizedRtp(maxNumberOfPackets);

attribute EventHandler onsentrtp; // No payload. Use readSentRtp
// Batch interface to read SentRtp notifications.
// Only provided if the PeerConnection was created with { customPacer: true }.
sequence<SentRtp> readSentRtp(long maxCount);

attribute EventHandler onreceivedrtpacks; // No payload. Use readReceivedRtpAcks
// Batch interface to read RtpAcks as an alternative to onrtpacksreceived.
// Batch interface to read RtpAcks notifications.
sequence<RtpAcks> readReceivedRtpAcks(long maxCount);

readonly attribute unsigned long bandwidthEstimate; // bps
Expand All @@ -104,6 +121,16 @@ interface RTCRtpTransport {
attribute unsigned long customPerPacketOverhead;
}

[Exposed=DedicatedWorker]
interface RTCRtpTransportProcessorEvent : Event {
readonly attribute RTCRtpTransportProcessor processor;
};

partial interface DedicatedWorkerGlobalScope {
// Receives instances of RTCRtpTransportProcessorEvent.
attribute EventHandler onrtcrtptransportprocessor;
};

// RFC 8888 or Transport-cc feedback
interface RTCRtpAcks {
readonly attribute sequence<RTCRtpAck> acks;
Expand Down
19 changes: 12 additions & 7 deletions explainer-use-case-1.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,18 @@ rtpReceiveStream.onreceivedrtp = () => {
### Example 12: Custom bitrate allocation
```javascript
const [pc, rtpTransport] = await setupPeerConnectionWithRtpSender();
setInterval(() => {
for (const [rtpSender, bitrate] of allocateBitrates(rtpTransport.bandwidthEstimate)) { // Custom
const parameters = rtpSender.getParameters();
parameters.encodings[0].maxBitrate = bitrate;
rtpSender.setParameters(parameters);
}
}, 1000);
rtpTransport.createProcessor(new Worker("worker.js"));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createProcessor is a bit odd in terms of naming (it returns undefined for instance).
Also, it might be a bit less flexible/idiomatic than a regular attribute:

  1. We might want in the future to change processors (Worker 1 to worker 2)
  2. Web apps might want to stop using a processor
  3. Web apps might want to preflight creating processors before having RTPTransport.
  4. Having an attribute allows to easily know whether a process is active/which processor is active

I would tend to prefer having a rtpTransport.processor attribute that is set by the web application.
For instance: rtpTransport.processor = new RTCRtpTransportProcessorHandle(worker). Or maybe RTCRtpTransportHandle.


// worker.js
onrtcrtptransportprocessor = (e) => {
setInterval(() => {
for (const [rtpSender, bitrate] of allocateBitrates(e.processor.bandwidthEstimate)) { // Custom bandwidth allocation
const parameters = rtpSender.getParameters();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are window objects so it is a bit of an odd example, we should post message to the window env, or keep exposing these values in both RTCRtpTransport and RTCRtpTransportProcessor.

parameters.encodings[0].maxBitrate = bitrate;
rtpSender.setParameters(parameters);
}
}, 1000);
};
```

## Example 13: Receive with BYOB
Expand Down
161 changes: 95 additions & 66 deletions explainer-use-case-2.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,52 +27,66 @@ Applications need to be be able to batch processing to run much less often than

```javascript
const [pc, rtpTransport] = setupPeerConnectionWithRtpTransport(); // Custom
const estimator = createBandwidthEstimator(); // Custom
rtpTransport.onsentrtp = () => {
for (const sentRtp of rtpTransport.readSentRtp(100)) {
if (sentRtp.ackId) {
estimator.rememberSentRtp(sentRtp);
}
}
}
rtpTransport.onreceivedrtpacks = () => {
for (const rtpAcks in rtpTransport.readReceivedRtpAcks(100)) {
for (const rtpAck in rtpAcks.acks) {
const bwe = estimator.processReceivedAcks(rtpAck);
rtpTransport.customMaxBandwidth = bwe;
rtpTransport.createProcessor(new Worker("worker.js"));

// worker.js
onrtcrtptransportprocessor = (e) => {
const rtpTransportProcessor = e.processor;
const estimator = createBandwidthEstimator(); // Custom
rtpTransportProcessor.onsentrtp = () => {
for (const sentRtp of rtpTransportProcessor.readSentRtp(100)) {
if (sentRtp.ackId) {
estimator.rememberSentRtp(sentRtp);
}
}
}

};
rtpTransportProcessor.onreceivedrtpacks = () => {
for (const rtpAcks in rtpTransportProcessor.readReceivedRtpAcks(100)) {
for (const rtpAck in rtpAcks.acks) {
const bwe = estimator.processReceivedAcks(rtpAck);
rtpTransportProcessor.customMaxBandwidth = bwe;
}
}
};
};
```

## Example 2: Custom Pacing and Probing

```javascript
const [pc, rtpTransport] = setupPeerConnectionWithRtpTransport({customPacer: true}); // Custom
const pacer = createPacer(); // Custom
rtpTransport.onpacketizedrtpavailable = () => {
for (const rtpPacket in rtpTransport.readPacketizedRtp(100)) {
pacer.enqueue(rtpPacket);
rtpTransport.createProcessor(new Worker("worker.js"));

// worker.js
onrtcrtptransportprocessor = (e) => {
runPacing(e.processor);
};

async function runPacing(rtpTransportProcessor) {
const pacer = createPacer(); // Custom
rtpTransportProcessor.onpacketizedrtpavailable = () => {
for (const rtpPacket in rtpTransportProcessor.readPacketizedRtp(100)) {
pacer.enqueue(rtpPacket);
}
};
while (true) {
const [rtpSendStream, originalPacket, paddingBytes, sendTime] = await pacer.dequeue(); // Custom
// Create an RTCRtpPacketInit instance with the desired padding.
const packetInit = {
originalPacket.marker,
originalPacket.payloadType,
originalPacket.timestamp,
originalPacket.csrcs,
originalPacket.headerExtensions,
originalPacket.payload,
paddingBytes
};
const rtpSent = rtpSendStream.sendRtp(packetInit, {sendTime: sendTime});
(async () => {
pacer.handleSent(await rtpSent);
})();
}
}
while (true) {
const [rtpSender, originalPacket, paddingBytes, sendTime] = await pacer.dequeue(); // Custom
// Create an RTCRtpPacketInit instance with the desired padding.
const packetInit = {
originalPacket.marker,
originalPacket.payloadType,
originalPacket.timestamp,
originalPacket.csrcs,
originalPacket.headerExtensions,
originalPacket.payload,
paddingBytes
};
const rtpSent = rtpSender.sendRtp(packetInit, {sendTime: sendTime});
(async () => {
pacer.handleSent(await rtpSent);
})();
}
```

## Example 3: Batched pacing
Expand All @@ -81,19 +95,25 @@ at a controlled frequency.

```javascript
const [pc, rtpTransport] = setupPeerConnectionWithRtpTransport(); // Custom
const pacer = createPacer(); // Custom
rtpTransport.customPacer = true;
rtpTransport.createProcessor(new Worker("worker.js"));

async function pacePacketBatch() {
rtpTransport.onpacketizedrtpavailable = undefined;
// worker.js
onrtcrtptransportprocessor = (e) => {
runPacing(e.processor);
};

async function runPacing(rtpTransportProcessor) {
const pacer = createPacer(); // Custom

rtpTransportProcessor.onpacketizedrtpavailable = undefined;
while(true) {
let pendingPackets = rtpTransport.readPacketizedRtp(100);
let pendingPackets = rtpTransportProcessor.readPacketizedRtp(100);
if (pendingPackets.size() == 0) {
// No packets available synchronously. Wait for the next available packet.
rtpTransport.onpacketizedrtpavailable = pacePacketBatch;
rtpTransportProcessor.onpacketizedrtpavailable = pacePacketBatch;
return;
}
for (const rtpPacket in rtpTransport.readPacketizedRtp(100)) {
for (const rtpPacket in rtpTransportProcessor.readPacketizedRtp(100)) {
pacer.enqueue(rtpPacket);
}
// Wait 20ms before processing more packets.
Expand All @@ -106,33 +126,42 @@ async function pacePacketBatch() {

```javascript
const [pc, rtpTransport] = setupPeerConnectionWithRtpTransport(); // Custom
rtpTransport.createProcessor(new Worker("worker.js"));

// worker.js
onrtcrtptransportprocessor = (e) => {
runPacing(e.processor);
};

const estimator = createBandwidthEstimator(); // Custom

// Every 100ms, notify the estimator of all RTP packets sent.
setInterval(() => {
// Read all synchronously available rtpSents in batches of 100.
while(true) {
let sentRtps = rtpTransport.readSentRtp(100);
if (sentRtps.length == 0) {
break;
async function runPacing(rtpTransportProcessor) {
// Every 100ms, notify the estimator of all RTP packets sent.
setInterval(() => {
// Read all synchronously available rtpSents in batches of 100.
while(true) {
let sentRtps = rtpTransportProcessor.readSentRtp(100);
if (sentRtps.length == 0) {
break;
}
sentRtps.forEach((sentRtp) => estimator.rememberRtpSent(sentRtp));
}
sentRtps.forEach((sentRtp) => estimator.rememberRtpSent(sentRtp));
}
}, 100);

// Every 100ms, notify the estimator of all RTP acks received.
setInterval(() => {
// Read all synchronously available RtpAcks in batches of 100.
while(true) {
let rtpAcks = rtpTransport.readReceivedRtpAcks(100);
if (rtpAcks.length == 0) {
break;
}, 100);

// Every 100ms, notify the estimator of all RTP acks received.
setInterval(() => {
// Read all synchronously available RtpAcks in batches of 100.
while(true) {
let rtpAcks = rtpTransportProcessor.readReceivedRtpAcks(100);
if (rtpAcks.length == 0) {
break;
}
rtpAcks.forEach((ack) => estimator.processReceivedAcks(ack));
}
rtpAcks.forEach((ack) => estimator.processReceivedAcks(ack));
}
// Update bitrate estimations now that estimator is up to date.
doBitrateAllocationAndUpdateEncoders(estimator); // Custom
}, 100);
// Update bitrate estimations now that estimator is up to date.
doBitrateAllocationAndUpdateEncoders(estimator); // Custom
}, 100);
}
```

## Alternative designs considered
Expand Down
Loading