Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly handle broadcast capture state #551

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions Sources/LiveKit/Broadcast/BroadcastExtensionState.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2025 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#if os(iOS)

import Combine
import Foundation

#if canImport(ReplayKit)
import ReplayKit
#endif

class BroadcastExtensionState {
/// A publisher that emits a Boolean value indicating whether or not the extension is currently broadcasting.
static var isBroadcasting: some Publisher<Bool, Never> {
Publishers.Merge(
DarwinNotificationCenter.shared.publisher(for: .broadcastStarted).map { _ in true },
DarwinNotificationCenter.shared.publisher(for: .broadcastStopped).map { _ in false }
)
.eraseToAnyPublisher()
}

/// Displays the system broadcast picker, allowing the user to start the broadcast.
/// - Note: This is merely a request and does not guarantee the user will choose to start the broadcast.
static func requestActivation() async {
await RPSystemBroadcastPickerView.show(
for: BroadcastScreenCapturer.screenSharingExtension,
showsMicrophoneButton: false
)
}
}

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

import Combine
import Foundation

enum DarwinNotification: String {
Expand All @@ -23,7 +24,6 @@ enum DarwinNotification: String {

final class DarwinNotificationCenter: @unchecked Sendable {
public static let shared = DarwinNotificationCenter()

private let notificationCenter = CFNotificationCenterGetDarwinNotifyCenter()

func postNotification(_ name: DarwinNotification) {
Expand All @@ -34,3 +34,99 @@ final class DarwinNotificationCenter: @unchecked Sendable {
true)
}
}

extension DarwinNotificationCenter {
/// Returns a publisher that emits events when broadcasting notifications matching the given name.
func publisher(for name: DarwinNotification) -> Publisher {
Publisher(notificationCenter, name)
}

/// A publisher that emits notifications.
struct Publisher: Combine.Publisher {
typealias Output = DarwinNotification
typealias Failure = Never

private let name: DarwinNotification
private let center: CFNotificationCenter?

fileprivate init(_ center: CFNotificationCenter?, _ name: DarwinNotification) {
self.name = name
self.center = center
}

func receive<S>(
subscriber: S
) where S: Subscriber, Never == S.Failure, DarwinNotification == S.Input {
subscriber.receive(subscription: Subscription(subscriber, center, name))
}
}

private class SubscriptionBase {
let name: DarwinNotification
let center: CFNotificationCenter?

init(_ center: CFNotificationCenter?, _ name: DarwinNotification) {
self.name = name
self.center = center
}

static var callback: CFNotificationCallback = { _, observer, _, _, _ in
guard let observer else { return }
Unmanaged<SubscriptionBase>
.fromOpaque(observer)
.takeUnretainedValue()
.notifySubscriber()
}

func notifySubscriber() {
// Overridden by generic subclass to call specific subscriber's
// receive method. This allows forming a C function pointer to the callback.
}
}

private class Subscription<S: Subscriber>: SubscriptionBase, Combine.Subscription where S.Input == DarwinNotification, S.Failure == Never {
private var subscriber: S?

init(_ subscriber: S, _ center: CFNotificationCenter?, _ name: DarwinNotification) {
self.subscriber = subscriber
super.init(center, name)
addObserver()
}

func request(_: Subscribers.Demand) {}

private var opaqueSelf: UnsafeRawPointer {
UnsafeRawPointer(Unmanaged.passUnretained(self).toOpaque())
}

private func addObserver() {
CFNotificationCenterAddObserver(center,
opaqueSelf,
Self.callback,
name.rawValue as CFString,
nil,
.deliverImmediately)
}

private func removeObserver() {
guard subscriber != nil else { return }
CFNotificationCenterRemoveObserver(center,
opaqueSelf,
CFNotificationName(name.rawValue as CFString),
nil)
subscriber = nil
}

override func notifySubscriber() {
_ = subscriber?.receive(name)
}

func cancel() {
removeObserver()
}

deinit {
removeObserver()
}
}
}
67 changes: 59 additions & 8 deletions Sources/LiveKit/Participant/LocalParticipant.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@
* limitations under the License.
*/

import Combine
import Foundation

#if canImport(ReplayKit)
import ReplayKit
#endif

#if swift(>=5.9)
internal import LiveKitWebRTC
#else
Expand Down Expand Up @@ -229,6 +226,59 @@ public class LocalParticipant: Participant {

return didUpdate
}

// MARK: - Broadcast Activation

#if os(iOS)

/// An optional hook called just after a broadcast starts.
///
/// Returns a Boolean value indicating weather or not the broadcast should be
/// published as a screen share track. This could be useful to present a confirmation dialog,
/// returning `true` or `false` based on the user's response .
///
public var broadcastStarted: (() async -> Bool)?

private var isBroadcasting = false {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this or something like it needs to be publicly gettable, because developers may have UI that hinges on this state, in the case that they aren't starting publishing right away

Copy link
Contributor Author

@ladvoc ladvoc Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it okay if we expose a Combine AnyPublisher<Bool, Never> in the public API? If so, I could see making the isBroadcasting property a CurrentValueSubject which is type erased into AnyPublisher. This would allow both getting the current value and listening for changes. We could then replace the broadcastStarted closure with a Boolean publishOnBroadcast property which allows the developer to disable the default behavior to manually control track publication as per your suggestion. The public interface would then look something like this:

public var publishOnBroadcast: Bool
public var isBroadcasting: AnyPublisher<Bool, Never>

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's an interesting idea and i'd defer to @hiroshihorie but my instinct is that we should consider introducing new patterns carefully and deliberately. i'm not sure it's worth using something novel here that is unlike other things in our SDK, especially because I'm not sure there's a really strong reason we'd need to besides that it might be a little nicer.

we use a delegate pattern everywhere else. maybe good to make a new BroadcastDelegate that has this and the onStarted method in it

Copy link
Member

@hiroshihorie hiroshihorie Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @bcherry is right, I'm open to adding a AnyPublisher in addition to current design of reading value synchronously and notifying with delegate perhaps.

We eventually need to modernize the SDK and I think we can start here incrementally.

Copy link
Contributor Author

@ladvoc ladvoc Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hiroshihorie, just to make sure I’m following you, are you thinking the public interface would look something like this then?

public var isBroadcasting: Bool
public var isBroadcastingPublisher: AnyPublisher<Bool, Never>

This would be in addition to the delegate methods.

didSet { broadcastStateChanged() }
}

private var cancellable = Set<AnyCancellable>()

override init(room: Room, sid: Participant.Sid? = nil, identity: Participant.Identity? = nil) {
super.init(room: room, sid: sid, identity: identity)

BroadcastExtensionState
.isBroadcasting
.sink { [weak self] in
guard let self else { return }
self.isBroadcasting = $0
}
.store(in: &cancellable)
}

private func broadcastStateChanged() {
guard isBroadcasting else {
logger.debug("Broadcast stopped")
return
}
logger.debug("Broadcast started")

Task { [weak self] in
guard let self else { return }
let shouldPublish = await self.broadcastStarted?() ?? true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think that method could have a clearer name, such as shouldPublishBroadcast, to make it more obvious how it fits in

although, thinking more about it, perhaps it would be clearest if the method were simply called onBroadcastStart. if the developer implements it, then they're given complete control over what to do next. otherwise the default implementation is to call try await self.setScreenShare(enabled: true) if they're connected and to cancel the broadcast if they're not.

that way we don't need to let this task hang indefinitely if the developer has more complex needs (such as maybe the user is running broadcast in a "preview" mode not meant to be published right away)

guard shouldPublish else {
logger.debug("Will not publish screen share track")
return
}
do {
try await self.setScreenShare(enabled: true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if you're not already actively connected to a room?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setScreenShare(enabled:) calls down to set(source:enabled:captureOptions:publishOptions:) which in turn calls requireRoom(), performing a check to ensure the room isn't nil. I will need to perform some more examination to see if additional checks are necessary.

} catch {
logger.error("Failed to enable screen share: \(error)")
}
}
}
#endif
}

// MARK: - Session Migration
Expand Down Expand Up @@ -339,10 +389,11 @@ public extension LocalParticipant {
let localTrack: LocalVideoTrack
let options = (captureOptions as? ScreenShareCaptureOptions) ?? room._state.roomOptions.defaultScreenShareCaptureOptions
if options.useBroadcastExtension {
await RPSystemBroadcastPickerView.show(
for: BroadcastScreenCapturer.screenSharingExtension,
showsMicrophoneButton: false
)
guard self.isBroadcasting else {
await BroadcastExtensionState.requestActivation()
return nil
}
// Wait until broadcasting to publish track
localTrack = LocalVideoTrack.createBroadcastScreenCapturerTrack(options: options)
} else {
localTrack = LocalVideoTrack.createInAppScreenShareTrack(options: options)
Expand Down
47 changes: 47 additions & 0 deletions Tests/LiveKitTests/DarwinNotificationCenterTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2025 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Combine
@testable import LiveKit
import XCTest

class DarwinNotificationCenterTests: XCTestCase {
func testPublisher() throws {
let receiveFirst = XCTestExpectation(description: "Receive from 1st subscriber")
let receiveSecond = XCTestExpectation(description: "Receive from 2nd subscriber")

let name = DarwinNotification.broadcastStarted

var cancellable = Set<AnyCancellable>()
DarwinNotificationCenter.shared
.publisher(for: name)
.sink {
XCTAssertEqual($0, name)
receiveFirst.fulfill()
}
.store(in: &cancellable)
DarwinNotificationCenter.shared
.publisher(for: name)
.sink {
XCTAssertEqual($0, name)
receiveSecond.fulfill()
}
.store(in: &cancellable)

DarwinNotificationCenter.shared.postNotification(name)
wait(for: [receiveFirst, receiveSecond], timeout: 10.0)
}
}
Loading