From 1686530180cc04a1694183e55ad81fee525108d7 Mon Sep 17 00:00:00 2001 From: Mattis Kieffer Date: Sun, 8 Dec 2024 21:30:18 +0100 Subject: [PATCH] move update task closures to functions for testing --- src/components/bluetooth.rs | 182 ++++++++++++++++++++++++++---------- 1 file changed, 132 insertions(+), 50 deletions(-) diff --git a/src/components/bluetooth.rs b/src/components/bluetooth.rs index d3919cc..1c91905 100644 --- a/src/components/bluetooth.rs +++ b/src/components/bluetooth.rs @@ -9,9 +9,9 @@ use crate::core::events::{AppEvent, MeasurementEvent}; use crate::model::bluetooth::AdapterDescriptor; use crate::model::bluetooth::{DeviceDescriptor, HeartrateMessage}; use anyhow::{anyhow, Result}; -use btleplug::api::Peripheral; + use btleplug::{ - api::{BDAddr, Central, Manager as _}, + api::{BDAddr, Central, Manager as _, Peripheral}, platform::{Adapter, Manager}, }; @@ -101,46 +101,73 @@ where listening: None, } } + + pub async fn peripheral_listener( + cheststrap: T, + tx: Sender, + ) -> Result<()> { + cheststrap.connect().await?; + + cheststrap.discover_services().await?; + + let char = cheststrap + .characteristics() + .iter() + .find(|c| c.uuid == HEARTRATE_MEASUREMENT_UUID) + .ok_or(anyhow!("Peripheral has no Heartrate attribute"))? + .clone(); + + cheststrap.subscribe(&char).await?; + + let mut notification_stream = cheststrap.notifications().await?; + while let Some(data) = notification_stream.next().await { + if data.value.len() < 2 + || tx + .send(AppEvent::Measurement(MeasurementEvent::RecordMessage( + HeartrateMessage::new(&data.value), + ))) + .is_err() + { + break; + } + } + warn!("BT transceiver terminated"); + Err(anyhow!("listener terminated")) + } + pub async fn listen_to_peripheral( adapter: A, peripheral_address: BDAddr, tx: Sender, ) -> Result>> { - let fut = tokio::spawn(async move { + let peripherals = adapter.peripherals().await?; + let cheststrap = peripherals + .into_iter() + .find(|p| p.address() == peripheral_address) + .ok_or(anyhow!("Peripheral not found"))?; + + let fut = tokio::spawn(Self::peripheral_listener(cheststrap, tx)); + Ok(fut) + } + + pub async fn adapter_updater( + adapter: A, + devices: Arc>>, + ) -> Result<()> { + loop { let peripherals = adapter.peripherals().await?; - let cheststrap = peripherals - .into_iter() - .find(|p| p.address() == peripheral_address) - .ok_or(anyhow!("Peripheral not found"))?; - cheststrap.connect().await?; - - cheststrap.discover_services().await?; - - let char = cheststrap - .characteristics() - .iter() - .find(|c| c.uuid == HEARTRATE_MEASUREMENT_UUID) - .ok_or(anyhow!("Peripheral has no Heartrate attribute"))? - .clone(); - - cheststrap.subscribe(&char).await?; - - let mut notification_stream = cheststrap.notifications().await?; - while let Some(data) = notification_stream.next().await { - if data.value.len() < 2 - || tx - .send(AppEvent::Measurement(MeasurementEvent::RecordMessage( - HeartrateMessage::new(&data.value), - ))) - .is_err() - { - break; + let mut descriptors = Vec::new(); + for peripheral in &peripherals { + let address = peripheral.address(); + if let Ok(name) = peripheral.get_name().await { + descriptors.push(DeviceDescriptor { name, address }); } } - warn!("BT transceiver terminated"); - Err(anyhow!("listener terminated")) - }); - Ok(fut) + // TODO: Send events when an error arises + descriptors.sort(); + *devices.write().await = descriptors; + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + } } } @@ -195,22 +222,7 @@ where trace!("Scanning started on adapter {}.", adapter.get_name().await?); let devices = self.devices.clone(); if self.peri_updater_handle.is_none() { - self.peri_updater_handle = Some(tokio::spawn(async move { - loop { - let peripherals = adapter.peripherals().await?; - let mut descriptors = Vec::new(); - for peripheral in &peripherals { - let address = peripheral.address(); - if let Ok(name) = peripheral.get_name().await { - descriptors.push(DeviceDescriptor { name, address }); - } - } - // TODO: Send events when an error arises - descriptors.sort(); - *devices.write().await = descriptors; - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - } - })); + self.peri_updater_handle = Some(tokio::spawn(Self::adapter_updater(adapter, devices))); } Ok(()) } @@ -611,4 +623,74 @@ mod tests { assert!(component.stop_scan().await.is_ok()); assert!(!component.scanning); } + #[tokio::test] + async fn test_peripheral_listener() { + let (tx, _rx) = broadcast::channel(16); + let mut peripheral = MockPeripheral::default(); + + // Set up expectations for peripheral + peripheral.expect_connect().returning(|| Ok(())); + peripheral.expect_discover_services().returning(|| Ok(())); + peripheral.expect_characteristics().returning(|| { + let mut chars = BTreeSet::new(); + chars.insert(Characteristic { + uuid: HEARTRATE_MEASUREMENT_UUID, + service_uuid: Uuid::nil(), + descriptors: BTreeSet::new(), + properties: Default::default(), + }); + chars + }); + peripheral.expect_subscribe().returning(|_| Ok(())); + + // Create notification stream that sends one heartrate message + peripheral.expect_notifications().returning(|| { + Ok(Box::pin(futures::stream::once(async { + ValueNotification { + uuid: HEARTRATE_MEASUREMENT_UUID, + value: vec![0, 60], // Simple heartrate of 60 BPM + } + }))) + }); + + let result = BluetoothComponent::::peripheral_listener(peripheral, tx).await; + assert!(result.is_err()); // Should error when stream ends + } + + #[tokio::test] + async fn test_adapter_updater() { + let devices = Arc::new(RwLock::new(Vec::::new())); + let mut adapter = MockAdapter::default(); + + adapter.expect_peripherals().returning(|| { + let mut peripheral = MockPeripheral::default(); + peripheral.expect_address().returning(BDAddr::default); + peripheral + .expect_get_name() + .returning(|| Ok("TestDevice".to_string())); + Ok(vec![peripheral]) + }); + + // Create future that will cancel adapter_updater after one iteration + let devices_clone = devices.clone(); + + // Run adapter_updater (will be cancelled by handle) + let hnd2 = tokio::spawn(BluetoothComponent::::adapter_updater( + adapter, devices, + )); + let handle = tokio::spawn(async move { + // Give adapter_updater time to run one iteration + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + // Verify devices were updated + let devices = devices_clone.read().await; + assert_eq!(devices.len(), 1); + assert_eq!(devices[0].name, "TestDevice"); + assert_eq!(devices[0].address, BDAddr::default()); + }); + + // Wait for verification + handle.await.unwrap(); + hnd2.abort(); + } }