Skip to content

Commit

Permalink
fix(lib): with bluer backend, use RFCOMM instead of BLE
Browse files Browse the repository at this point in the history
  • Loading branch information
Oppzippy committed Dec 30, 2024
1 parent f8de30f commit 363f016
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 174 deletions.
5 changes: 4 additions & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ float-cmp = { workspace = true, features = ["std"], default-features = false }
static_assertions = { workspace = true }

[target.'cfg(target_os = "linux")'.dependencies]
bluer = { workspace = true, optional = true, features = ["bluetoothd"] }
bluer = { workspace = true, optional = true, features = [
"bluetoothd",
"rfcomm",
] }
tokio = { workspace = true, features = ["rt-multi-thread"] }

[target.'cfg(any(target_os = "macos"))'.dependencies]
Expand Down
1 change: 1 addition & 0 deletions lib/src/api/connection/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ pub trait Connection {
async fn write_with_response(&self, data: &[u8]) -> crate::Result<()>;
async fn write_without_response(&self, data: &[u8]) -> crate::Result<()>;
async fn inbound_packets_channel(&self) -> crate::Result<mpsc::Receiver<Vec<u8>>>;
// TODO remove in v2
fn service_uuid(&self) -> Uuid;
}
2 changes: 2 additions & 0 deletions lib/src/device_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use macaddr::MacAddr6;
use static_assertions::const_assert;
use uuid::Uuid;

pub const RFCOMM_UUID: Uuid = uuid::uuid!("00001101-0000-1000-8000-00805f9b34fb");

// The devices have the same UUID except the first two bytes. I assume one device was chosen with an initial value,
// and then the first two bytes increment by one for each device going from there. Unsure of the initial value or
// the number of devices in existence.
Expand Down
227 changes: 112 additions & 115 deletions lib/src/soundcore_device/connection/bluer/connection.rs
Original file line number Diff line number Diff line change
@@ -1,75 +1,92 @@
use std::{sync::Arc, time::Duration};

use bluer::{
gatt::remote::{Characteristic, Service},
gatt::remote::Service,
rfcomm::{
stream::{OwnedReadHalf, OwnedWriteHalf},
Stream,
},
Device, DeviceEvent, DeviceProperty,
};
use futures::{pin_mut, StreamExt};
use futures::StreamExt;
use macaddr::MacAddr6;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
runtime::Handle,
select,
sync::{
mpsc::{self, error::TrySendError},
watch, Semaphore,
watch, Mutex, Semaphore,
},
task::JoinHandle,
};
use tracing::{debug, debug_span, info_span, trace, trace_span, warn, Instrument};
use tracing::{debug, info_span, trace, trace_span, warn, Instrument};
use uuid::Uuid;

use crate::{
api::connection::{Connection, ConnectionStatus},
device_utils::{self, READ_CHARACTERISTIC_UUID, SERVICE_UUID, WRITE_CHARACTERISTIC_UUID},
device_utils::{self, SERVICE_UUID},
};

#[derive(Debug)]
pub struct BluerConnection {
handle: Handle,
device: Device,
write_characteristic: Characteristic,
read_characteristic: Characteristic,
write_stream: Arc<Mutex<OwnedWriteHalf>>,
read_stream: Mutex<Option<mpsc::Receiver<Vec<u8>>>>,
service_uuid: Uuid,
connection_status_receiver: watch::Receiver<ConnectionStatus>,
connection_status_handle: JoinHandle<()>,
service_uuid: Uuid,
quit: Arc<Semaphore>,
}

impl BluerConnection {
pub async fn new(device: Device, handle: Handle) -> crate::Result<Self> {
pub async fn new(device: Device, stream: Stream, handle: Handle) -> crate::Result<Self> {
handle
.clone()
.spawn(async move {
device.connect().await?;
let service = Self::get_service_with_retry(&device, Duration::from_secs(5)).await?;
let service_uuid = service.uuid().await?;

let [read_characteristic, write_characteristic] = Self::get_characteristics(
&service,
[READ_CHARACTERISTIC_UUID, WRITE_CHARACTERISTIC_UUID],
)
.await?;

let (connection_status_receiver, connection_status_handle) =
Self::spawn_connection_status(&handle, device.to_owned()).await?;
.spawn(
async move {
let (connection_status_receiver, connection_status_handle) =
Self::spawn_connection_status(&handle, device.to_owned()).await?;
let (read_stream, write_stream) = stream.into_split();
let quit = Arc::new(Semaphore::new(0));
let service_uuid = Self::get_service_uuid_with_retry(&device).await?;

let connection = BluerConnection {
device,
service_uuid,
write_characteristic,
read_characteristic,
connection_status_receiver,
connection_status_handle,
handle,
quit: Arc::new(Semaphore::new(0)),
};
Ok(connection)
})
.instrument(info_span!("BluerConnection::new"))
let connection = BluerConnection {
device,
service_uuid,
write_stream: Arc::new(Mutex::new(write_stream)),
read_stream: Mutex::new(Some(
Self::spawn_inbound_packet_channel(
read_stream,
handle.to_owned(),
quit.to_owned(),
)
.await?,
)),
connection_status_receiver,
connection_status_handle,
handle,
quit,
};
Ok(connection)
}
.instrument(info_span!("BluerConnection::new")),
)
.await
.unwrap()
}

async fn get_service_uuid_with_retry(device: &Device) -> crate::Result<Uuid> {
match Self::get_service_with_retry(&device, Duration::from_secs(2)).await {
Ok(service) => service.uuid().await.map_err(Into::into),
Err(err) => {
debug!("GATT service not found, but that's okay since we're using RFCOMM: {err:?}");
Ok(Uuid::nil())
}
}
}

#[tracing::instrument]
async fn get_service_with_retry(device: &Device, timeout: Duration) -> crate::Result<Service> {
let service_found = device.events().await?.any(|event| async move {
Expand Down Expand Up @@ -113,32 +130,6 @@ impl BluerConnection {
})
}

async fn get_characteristics<const SIZE: usize>(
service: &Service,
uuids: [Uuid; SIZE],
) -> crate::Result<[Characteristic; SIZE]> {
let mut characteristics: [Option<Characteristic>; SIZE] = [const { None }; SIZE];
for characteristic in service.characteristics().await? {
let uuid = characteristic.uuid().await?;
if let Some(index) = uuids.iter().position(|u| *u == uuid) {
characteristics[index] = Some(characteristic);
}
}

characteristics
.iter()
.enumerate()
.find(|(_, c)| c.is_none())
.map(|(i, _)| {
Err(crate::Error::CharacteristicNotFound {
uuid: uuids[i],
source: None,
}) as Result<(), crate::Error>
})
.transpose()?;
Ok(characteristics.map(|v| v.expect("we already made sure every element is some")))
}

async fn spawn_connection_status(
handle: &Handle,
device: Device,
Expand Down Expand Up @@ -169,14 +160,61 @@ impl BluerConnection {

Ok((connection_status_receiver, connection_status_handle))
}

async fn spawn_inbound_packet_channel(
mut read_stream: OwnedReadHalf,
handle: Handle,
quit: Arc<Semaphore>,
) -> crate::Result<mpsc::Receiver<Vec<u8>>> {
// This queue should always be really small unless something is malfunctioning
let (sender, receiver) = mpsc::channel(100);
handle.spawn(
async move {
let mut buffer: Vec<u8> = vec![0; 1024];
loop {
select! {
_ = quit.acquire() => {
break;
}
// Does this read one packet at a time?
result = read_stream.read(&mut buffer) => {
match result {
Ok(bytes_read) => {
let bytes = &buffer[0..bytes_read];
trace!(event = "rfcomm read", ?bytes);
if bytes_read > 0 {
if let Err(err) = sender.try_send(bytes.to_vec()) {
if let TrySendError::Closed(_) = err {
break;
}
warn!("error forwarding packet to channel: {err}",)
}
}
}
Err(err) => {
debug!("read failed: {err:?}");
break;
},
}
},
}
}
}
.instrument(trace_span!(
"bluer_connection inbound_packets_channel reader"
)),
);

Ok(receiver)
}
}

impl Connection for BluerConnection {
async fn name(&self) -> crate::Result<String> {
let device = self.device.to_owned();
self.handle
.spawn(async move {
match device.name().await.unwrap() {
match device.name().await? {
Some(name) => Ok(name),
None => Err(crate::Error::NameNotFound {
mac_address: device.address().to_string(),
Expand Down Expand Up @@ -204,64 +242,23 @@ impl Connection for BluerConnection {
}

async fn write_without_response(&self, data: &[u8]) -> crate::Result<()> {
let data = data.to_owned();
let write_characteristic = self.write_characteristic.to_owned();
self.handle
.spawn(
async move { write_characteristic.write(&data).await }
.instrument(debug_span!("BluerConnection::write_without_response")),
)
self.write_stream
.lock()
.await
.unwrap()
.map_err(Into::into)
.write_all(data)
.await
.map_err(|err| crate::Error::WriteFailed {
source: Box::new(err),
})
}

async fn inbound_packets_channel(&self) -> crate::Result<mpsc::Receiver<Vec<u8>>> {
// This queue should always be really small unless something is malfunctioning
let (sender, receiver) = mpsc::channel(100);

let read_characteristic = self.read_characteristic.to_owned();
let notify = self
.handle
.spawn(async move { read_characteristic.notify().await })
Ok(self
.read_stream
.lock()
.await
.unwrap()?;
let quit = self.quit.to_owned();
self.handle.spawn(
async move {
pin_mut!(notify);

loop {
select! {
_ = quit.acquire() => {
break;
}
result = notify.next() => {
match result {
Some(data) => {
trace!(event = "bluer notification", ?data);
if let Err(err) = sender.try_send(data) {
if let TrySendError::Closed(_) = err {
break;
}
warn!("error forwarding packet to channel: {err}",)
}
}
None => {
debug!("notify channel ended");
break;
},
}
},
}
}
}
.instrument(trace_span!(
"bluer_connection inbound_packets_channel reader"
)),
);

Ok(receiver)
.take()
.expect("inbound_packets_channel should only be called once"))
}
}

Expand Down
Loading

0 comments on commit 363f016

Please sign in to comment.