Skip to content

Commit

Permalink
move update task closures to functions for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
mat-kie committed Dec 8, 2024
1 parent b43183d commit 1686530
Showing 1 changed file with 132 additions and 50 deletions.
182 changes: 132 additions & 50 deletions src/components/bluetooth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use crate::core::events::{AppEvent, MeasurementEvent};
use crate::model::bluetooth::AdapterDescriptor;
use crate::model::bluetooth::{DeviceDescriptor, HeartrateMessage};
use anyhow::{anyhow, Result};
use btleplug::api::Peripheral;

use btleplug::{
api::{BDAddr, Central, Manager as _},
api::{BDAddr, Central, Manager as _, Peripheral},
platform::{Adapter, Manager},
};

Expand Down Expand Up @@ -101,46 +101,73 @@ where
listening: None,
}
}

pub async fn peripheral_listener<T: Peripheral>(
cheststrap: T,
tx: Sender<AppEvent>,
) -> Result<()> {
cheststrap.connect().await?;

cheststrap.discover_services().await?;

let char = cheststrap
.characteristics()
.iter()
.find(|c| c.uuid == HEARTRATE_MEASUREMENT_UUID)
.ok_or(anyhow!("Peripheral has no Heartrate attribute"))?
.clone();

cheststrap.subscribe(&char).await?;

let mut notification_stream = cheststrap.notifications().await?;
while let Some(data) = notification_stream.next().await {
if data.value.len() < 2
|| tx
.send(AppEvent::Measurement(MeasurementEvent::RecordMessage(
HeartrateMessage::new(&data.value),
)))
.is_err()
{
break;

Check warning on line 131 in src/components/bluetooth.rs

View check run for this annotation

Codecov / codecov/patch

src/components/bluetooth.rs#L131

Added line #L131 was not covered by tests
}
}
warn!("BT transceiver terminated");
Err(anyhow!("listener terminated"))
}

pub async fn listen_to_peripheral(
adapter: A,
peripheral_address: BDAddr,
tx: Sender<AppEvent>,
) -> Result<JoinHandle<Result<()>>> {
let fut = tokio::spawn(async move {
let peripherals = adapter.peripherals().await?;
let cheststrap = peripherals
.into_iter()
.find(|p| p.address() == peripheral_address)
.ok_or(anyhow!("Peripheral not found"))?;

let fut = tokio::spawn(Self::peripheral_listener(cheststrap, tx));
Ok(fut)
}

pub async fn adapter_updater(
adapter: A,
devices: Arc<RwLock<Vec<DeviceDescriptor>>>,
) -> Result<()> {
loop {
let peripherals = adapter.peripherals().await?;
let cheststrap = peripherals
.into_iter()
.find(|p| p.address() == peripheral_address)
.ok_or(anyhow!("Peripheral not found"))?;
cheststrap.connect().await?;

cheststrap.discover_services().await?;

let char = cheststrap
.characteristics()
.iter()
.find(|c| c.uuid == HEARTRATE_MEASUREMENT_UUID)
.ok_or(anyhow!("Peripheral has no Heartrate attribute"))?
.clone();

cheststrap.subscribe(&char).await?;

let mut notification_stream = cheststrap.notifications().await?;
while let Some(data) = notification_stream.next().await {
if data.value.len() < 2
|| tx
.send(AppEvent::Measurement(MeasurementEvent::RecordMessage(
HeartrateMessage::new(&data.value),
)))
.is_err()
{
break;
let mut descriptors = Vec::new();
for peripheral in &peripherals {
let address = peripheral.address();
if let Ok(name) = peripheral.get_name().await {
descriptors.push(DeviceDescriptor { name, address });
}
}
warn!("BT transceiver terminated");
Err(anyhow!("listener terminated"))
});
Ok(fut)
// TODO: Send events when an error arises
descriptors.sort();
*devices.write().await = descriptors;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}
}

Expand Down Expand Up @@ -195,22 +222,7 @@ where
trace!("Scanning started on adapter {}.", adapter.get_name().await?);
let devices = self.devices.clone();
if self.peri_updater_handle.is_none() {
self.peri_updater_handle = Some(tokio::spawn(async move {
loop {
let peripherals = adapter.peripherals().await?;
let mut descriptors = Vec::new();
for peripheral in &peripherals {
let address = peripheral.address();
if let Ok(name) = peripheral.get_name().await {
descriptors.push(DeviceDescriptor { name, address });
}
}
// TODO: Send events when an error arises
descriptors.sort();
*devices.write().await = descriptors;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}));
self.peri_updater_handle = Some(tokio::spawn(Self::adapter_updater(adapter, devices)));
}
Ok(())
}
Expand Down Expand Up @@ -611,4 +623,74 @@ mod tests {
assert!(component.stop_scan().await.is_ok());
assert!(!component.scanning);
}
#[tokio::test]
async fn test_peripheral_listener() {
let (tx, _rx) = broadcast::channel(16);
let mut peripheral = MockPeripheral::default();

// Set up expectations for peripheral
peripheral.expect_connect().returning(|| Ok(()));
peripheral.expect_discover_services().returning(|| Ok(()));
peripheral.expect_characteristics().returning(|| {
let mut chars = BTreeSet::new();
chars.insert(Characteristic {
uuid: HEARTRATE_MEASUREMENT_UUID,
service_uuid: Uuid::nil(),
descriptors: BTreeSet::new(),
properties: Default::default(),
});
chars
});
peripheral.expect_subscribe().returning(|_| Ok(()));

// Create notification stream that sends one heartrate message
peripheral.expect_notifications().returning(|| {
Ok(Box::pin(futures::stream::once(async {
ValueNotification {
uuid: HEARTRATE_MEASUREMENT_UUID,
value: vec![0, 60], // Simple heartrate of 60 BPM
}
})))
});

let result = BluetoothComponent::<MockAdapter>::peripheral_listener(peripheral, tx).await;
assert!(result.is_err()); // Should error when stream ends
}

#[tokio::test]
async fn test_adapter_updater() {
let devices = Arc::new(RwLock::new(Vec::<DeviceDescriptor>::new()));
let mut adapter = MockAdapter::default();

adapter.expect_peripherals().returning(|| {
let mut peripheral = MockPeripheral::default();
peripheral.expect_address().returning(BDAddr::default);
peripheral
.expect_get_name()
.returning(|| Ok("TestDevice".to_string()));
Ok(vec![peripheral])
});

// Create future that will cancel adapter_updater after one iteration
let devices_clone = devices.clone();

// Run adapter_updater (will be cancelled by handle)
let hnd2 = tokio::spawn(BluetoothComponent::<MockAdapter>::adapter_updater(
adapter, devices,
));
let handle = tokio::spawn(async move {
// Give adapter_updater time to run one iteration
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

// Verify devices were updated
let devices = devices_clone.read().await;
assert_eq!(devices.len(), 1);
assert_eq!(devices[0].name, "TestDevice");
assert_eq!(devices[0].address, BDAddr::default());
});

// Wait for verification
handle.await.unwrap();
hnd2.abort();
}
}

0 comments on commit 1686530

Please sign in to comment.