From 363f016fc530cbfe0290d8f34401770ca31322df Mon Sep 17 00:00:00 2001 From: Kyle Scheuing Date: Mon, 30 Dec 2024 02:24:16 -0500 Subject: [PATCH] fix(lib): with bluer backend, use RFCOMM instead of BLE --- lib/Cargo.toml | 5 +- lib/src/api/connection/connection.rs | 1 + lib/src/device_utils.rs | 2 + .../connection/bluer/connection.rs | 227 +++++++++--------- .../connection/bluer/connection_registry.rs | 135 ++++++----- 5 files changed, 196 insertions(+), 174 deletions(-) diff --git a/lib/Cargo.toml b/lib/Cargo.toml index dbd62c6e..9ba4efd7 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -33,7 +33,10 @@ float-cmp = { workspace = true, features = ["std"], default-features = false } static_assertions = { workspace = true } [target.'cfg(target_os = "linux")'.dependencies] -bluer = { workspace = true, optional = true, features = ["bluetoothd"] } +bluer = { workspace = true, optional = true, features = [ + "bluetoothd", + "rfcomm", +] } tokio = { workspace = true, features = ["rt-multi-thread"] } [target.'cfg(any(target_os = "macos"))'.dependencies] diff --git a/lib/src/api/connection/connection.rs b/lib/src/api/connection/connection.rs index a82529d3..f816864d 100644 --- a/lib/src/api/connection/connection.rs +++ b/lib/src/api/connection/connection.rs @@ -11,5 +11,6 @@ pub trait Connection { async fn write_with_response(&self, data: &[u8]) -> crate::Result<()>; async fn write_without_response(&self, data: &[u8]) -> crate::Result<()>; async fn inbound_packets_channel(&self) -> crate::Result>>; + // TODO remove in v2 fn service_uuid(&self) -> Uuid; } diff --git a/lib/src/device_utils.rs b/lib/src/device_utils.rs index e5bdcc9a..0d47f1c6 100644 --- a/lib/src/device_utils.rs +++ b/lib/src/device_utils.rs @@ -2,6 +2,8 @@ use macaddr::MacAddr6; use static_assertions::const_assert; use uuid::Uuid; +pub const RFCOMM_UUID: Uuid = uuid::uuid!("00001101-0000-1000-8000-00805f9b34fb"); + // The devices have the same UUID except the first two bytes. I assume one device was chosen with an initial value, // and then the first two bytes increment by one for each device going from there. Unsure of the initial value or // the number of devices in existence. diff --git a/lib/src/soundcore_device/connection/bluer/connection.rs b/lib/src/soundcore_device/connection/bluer/connection.rs index f1028580..aa9de736 100644 --- a/lib/src/soundcore_device/connection/bluer/connection.rs +++ b/lib/src/soundcore_device/connection/bluer/connection.rs @@ -1,75 +1,92 @@ use std::{sync::Arc, time::Duration}; use bluer::{ - gatt::remote::{Characteristic, Service}, + gatt::remote::Service, + rfcomm::{ + stream::{OwnedReadHalf, OwnedWriteHalf}, + Stream, + }, Device, DeviceEvent, DeviceProperty, }; -use futures::{pin_mut, StreamExt}; +use futures::StreamExt; use macaddr::MacAddr6; use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, runtime::Handle, select, sync::{ mpsc::{self, error::TrySendError}, - watch, Semaphore, + watch, Mutex, Semaphore, }, task::JoinHandle, }; -use tracing::{debug, debug_span, info_span, trace, trace_span, warn, Instrument}; +use tracing::{debug, info_span, trace, trace_span, warn, Instrument}; use uuid::Uuid; use crate::{ api::connection::{Connection, ConnectionStatus}, - device_utils::{self, READ_CHARACTERISTIC_UUID, SERVICE_UUID, WRITE_CHARACTERISTIC_UUID}, + device_utils::{self, SERVICE_UUID}, }; #[derive(Debug)] pub struct BluerConnection { handle: Handle, device: Device, - write_characteristic: Characteristic, - read_characteristic: Characteristic, + write_stream: Arc>, + read_stream: Mutex>>>, + service_uuid: Uuid, connection_status_receiver: watch::Receiver, connection_status_handle: JoinHandle<()>, - service_uuid: Uuid, quit: Arc, } impl BluerConnection { - pub async fn new(device: Device, handle: Handle) -> crate::Result { + pub async fn new(device: Device, stream: Stream, handle: Handle) -> crate::Result { handle .clone() - .spawn(async move { - device.connect().await?; - let service = Self::get_service_with_retry(&device, Duration::from_secs(5)).await?; - let service_uuid = service.uuid().await?; - - let [read_characteristic, write_characteristic] = Self::get_characteristics( - &service, - [READ_CHARACTERISTIC_UUID, WRITE_CHARACTERISTIC_UUID], - ) - .await?; - - let (connection_status_receiver, connection_status_handle) = - Self::spawn_connection_status(&handle, device.to_owned()).await?; + .spawn( + async move { + let (connection_status_receiver, connection_status_handle) = + Self::spawn_connection_status(&handle, device.to_owned()).await?; + let (read_stream, write_stream) = stream.into_split(); + let quit = Arc::new(Semaphore::new(0)); + let service_uuid = Self::get_service_uuid_with_retry(&device).await?; - let connection = BluerConnection { - device, - service_uuid, - write_characteristic, - read_characteristic, - connection_status_receiver, - connection_status_handle, - handle, - quit: Arc::new(Semaphore::new(0)), - }; - Ok(connection) - }) - .instrument(info_span!("BluerConnection::new")) + let connection = BluerConnection { + device, + service_uuid, + write_stream: Arc::new(Mutex::new(write_stream)), + read_stream: Mutex::new(Some( + Self::spawn_inbound_packet_channel( + read_stream, + handle.to_owned(), + quit.to_owned(), + ) + .await?, + )), + connection_status_receiver, + connection_status_handle, + handle, + quit, + }; + Ok(connection) + } + .instrument(info_span!("BluerConnection::new")), + ) .await .unwrap() } + async fn get_service_uuid_with_retry(device: &Device) -> crate::Result { + match Self::get_service_with_retry(&device, Duration::from_secs(2)).await { + Ok(service) => service.uuid().await.map_err(Into::into), + Err(err) => { + debug!("GATT service not found, but that's okay since we're using RFCOMM: {err:?}"); + Ok(Uuid::nil()) + } + } + } + #[tracing::instrument] async fn get_service_with_retry(device: &Device, timeout: Duration) -> crate::Result { let service_found = device.events().await?.any(|event| async move { @@ -113,32 +130,6 @@ impl BluerConnection { }) } - async fn get_characteristics( - service: &Service, - uuids: [Uuid; SIZE], - ) -> crate::Result<[Characteristic; SIZE]> { - let mut characteristics: [Option; SIZE] = [const { None }; SIZE]; - for characteristic in service.characteristics().await? { - let uuid = characteristic.uuid().await?; - if let Some(index) = uuids.iter().position(|u| *u == uuid) { - characteristics[index] = Some(characteristic); - } - } - - characteristics - .iter() - .enumerate() - .find(|(_, c)| c.is_none()) - .map(|(i, _)| { - Err(crate::Error::CharacteristicNotFound { - uuid: uuids[i], - source: None, - }) as Result<(), crate::Error> - }) - .transpose()?; - Ok(characteristics.map(|v| v.expect("we already made sure every element is some"))) - } - async fn spawn_connection_status( handle: &Handle, device: Device, @@ -169,6 +160,53 @@ impl BluerConnection { Ok((connection_status_receiver, connection_status_handle)) } + + async fn spawn_inbound_packet_channel( + mut read_stream: OwnedReadHalf, + handle: Handle, + quit: Arc, + ) -> crate::Result>> { + // This queue should always be really small unless something is malfunctioning + let (sender, receiver) = mpsc::channel(100); + handle.spawn( + async move { + let mut buffer: Vec = vec![0; 1024]; + loop { + select! { + _ = quit.acquire() => { + break; + } + // Does this read one packet at a time? + result = read_stream.read(&mut buffer) => { + match result { + Ok(bytes_read) => { + let bytes = &buffer[0..bytes_read]; + trace!(event = "rfcomm read", ?bytes); + if bytes_read > 0 { + if let Err(err) = sender.try_send(bytes.to_vec()) { + if let TrySendError::Closed(_) = err { + break; + } + warn!("error forwarding packet to channel: {err}",) + } + } + } + Err(err) => { + debug!("read failed: {err:?}"); + break; + }, + } + }, + } + } + } + .instrument(trace_span!( + "bluer_connection inbound_packets_channel reader" + )), + ); + + Ok(receiver) + } } impl Connection for BluerConnection { @@ -176,7 +214,7 @@ impl Connection for BluerConnection { let device = self.device.to_owned(); self.handle .spawn(async move { - match device.name().await.unwrap() { + match device.name().await? { Some(name) => Ok(name), None => Err(crate::Error::NameNotFound { mac_address: device.address().to_string(), @@ -204,64 +242,23 @@ impl Connection for BluerConnection { } async fn write_without_response(&self, data: &[u8]) -> crate::Result<()> { - let data = data.to_owned(); - let write_characteristic = self.write_characteristic.to_owned(); - self.handle - .spawn( - async move { write_characteristic.write(&data).await } - .instrument(debug_span!("BluerConnection::write_without_response")), - ) + self.write_stream + .lock() .await - .unwrap() - .map_err(Into::into) + .write_all(data) + .await + .map_err(|err| crate::Error::WriteFailed { + source: Box::new(err), + }) } async fn inbound_packets_channel(&self) -> crate::Result>> { - // This queue should always be really small unless something is malfunctioning - let (sender, receiver) = mpsc::channel(100); - - let read_characteristic = self.read_characteristic.to_owned(); - let notify = self - .handle - .spawn(async move { read_characteristic.notify().await }) + Ok(self + .read_stream + .lock() .await - .unwrap()?; - let quit = self.quit.to_owned(); - self.handle.spawn( - async move { - pin_mut!(notify); - - loop { - select! { - _ = quit.acquire() => { - break; - } - result = notify.next() => { - match result { - Some(data) => { - trace!(event = "bluer notification", ?data); - if let Err(err) = sender.try_send(data) { - if let TrySendError::Closed(_) = err { - break; - } - warn!("error forwarding packet to channel: {err}",) - } - } - None => { - debug!("notify channel ended"); - break; - }, - } - }, - } - } - } - .instrument(trace_span!( - "bluer_connection inbound_packets_channel reader" - )), - ); - - Ok(receiver) + .take() + .expect("inbound_packets_channel should only be called once")) } } diff --git a/lib/src/soundcore_device/connection/bluer/connection_registry.rs b/lib/src/soundcore_device/connection/bluer/connection_registry.rs index 62a7b6ea..0338cc18 100644 --- a/lib/src/soundcore_device/connection/bluer/connection_registry.rs +++ b/lib/src/soundcore_device/connection/bluer/connection_registry.rs @@ -2,10 +2,14 @@ use std::collections::HashSet; use std::sync::{Arc, Weak}; use std::time::Duration; -use bluer::{Adapter, Address, DiscoveryFilter, DiscoveryTransport, Session}; +use bluer::rfcomm::{Profile, ProfileHandle, ReqError}; +use bluer::{Adapter, Address, Session}; use futures::StreamExt; use macaddr::MacAddr6; +use tokio::select; use tokio::sync::Mutex; +use tokio::time::sleep; +use tracing::{debug, debug_span, warn, warn_span, Instrument}; use weak_table::weak_value_hash_map::Entry; use weak_table::WeakValueHashMap; @@ -18,74 +22,56 @@ use super::RuntimeOrHandle; pub struct BluerConnectionRegistry { runtime: RuntimeOrHandle, session: Session, + rfcomm_handle: Arc>, connections: Mutex>>, } impl BluerConnectionRegistry { pub async fn new(runtime: RuntimeOrHandle) -> crate::Result { + let session = runtime + .spawn(async move { Session::new().await }) + .await + .unwrap()?; + let rfcomm_handle = session + .register_profile(Profile { + uuid: device_utils::RFCOMM_UUID, + ..Default::default() + }) + .await?; Ok(Self { - session: runtime - .spawn(async move { Session::new().await }) - .await - .unwrap()?, + session, runtime, + rfcomm_handle: Arc::new(Mutex::new(rfcomm_handle)), connections: Mutex::new(WeakValueHashMap::new()), }) } - #[tracing::instrument(skip(self))] async fn all_connected(&self) -> crate::Result> { let session = self.session.to_owned(); self.runtime - .spawn(async move { - let adapter = session.default_adapter().await?; - tracing::debug!("starting scan on adapter {}", adapter.name()); - let device_addresses = Self::ble_scan(&adapter).await?; - tracing::debug!("discovered {} devices", device_addresses.len()); - let mut descriptors = HashSet::new(); - for address in device_addresses { - if let Some(descriptor) = Self::address_to_descriptor(&adapter, address).await? - { - descriptors.insert(descriptor); + .spawn( + async move { + let adapter = session.default_adapter().await?; + let device_addresses = adapter.device_addresses().await?; + let mut descriptors = HashSet::new(); + for address in device_addresses { + if device_utils::is_mac_address_soundcore_device(address.into()) { + if let Some(descriptor) = + Self::address_to_descriptor(&adapter, address).await? + { + descriptors.insert(descriptor); + } + } } + tracing::debug!("filtered down to {} descriptors", descriptors.len()); + Ok(descriptors) } - tracing::debug!("filtered down to {} descriptors", descriptors.len()); - Ok(descriptors) - }) + .instrument(debug_span!("BluerConnectionRegistry::all_connected")), + ) .await .unwrap() } - /// Scans for connected BLE devices and attempt to filter out devices without the service UUID. - /// Not guaranteed to filter out all devices if another process is scanning at the same time. - async fn ble_scan(adapter: &Adapter) -> crate::Result> { - adapter - .set_discovery_filter(DiscoveryFilter { - transport: DiscoveryTransport::Le, - ..Default::default() - }) - .await?; - - let discover = adapter.discover_devices().await?; - - let device_addresses = discover - .take_until(tokio::time::sleep(Duration::from_secs(1))) - .filter_map(|event| async move { - match event { - bluer::AdapterEvent::DeviceAdded(address) - if device_utils::is_mac_address_soundcore_device(address.into()) => - { - Some(address) - } - _ => None, - } - }) - .collect::>() - .await; - - Ok(device_addresses) - } - /// Filters out devices that are not connected and returns descriptors async fn address_to_descriptor( adapter: &Adapter, @@ -108,17 +94,50 @@ impl BluerConnectionRegistry { ) -> crate::Result> { let handle = self.runtime.handle(); let session = self.session.to_owned(); + let rfcomm_handle_lock = self.rfcomm_handle.to_owned(); self.runtime - .spawn(async move { - let adapter = session.default_adapter().await?; - match adapter.device(mac_address.into_array().into()) { - Ok(device) => Ok(Some(BluerConnection::new(device, handle).await?)), - Err(err) => match err.kind { - bluer::ErrorKind::NotFound => Ok(None), - _ => Err(crate::Error::from(err)), - }, + .spawn( + async move { + let mut rfcomm_handle = rfcomm_handle_lock.lock().await; + let adapter = session.default_adapter().await?; + let device = match adapter.device(mac_address.into_array().into()) { + Ok(device) => device, + Err(err) => { + return match err.kind { + bluer::ErrorKind::NotFound => Ok(None), + _ => Err(crate::Error::from(err)), + } + } + }; + debug!("connecting"); + let stream = loop { + select! { + res = async { + let _ = device.connect().await; + device.connect_profile(&device_utils::RFCOMM_UUID).await + } => { + if let Err(err)=res{ + warn!("connect profile failed: {err:?}") + } + sleep(Duration::from_secs(3)).await; + } + req = rfcomm_handle.next() => { + let req = req.unwrap(); + if req.device() == device.address() { + debug!("accepting request from {}", req.device()); + break req.accept()?; + } else { + debug!("rejecting request from {}", req.device()); + req.reject(ReqError::Rejected); + } + } + } + }; + debug!("connected"); + BluerConnection::new(device, stream, handle).await.map(Some) } - }) + .instrument(warn_span!("BluerConnectionRegistry::new_connection")), + ) .await .unwrap() }