Skip to content

Commit

Permalink
Avoid potential memory leak when dropping mqtt clients (#464)
Browse files Browse the repository at this point in the history
* Improve docs for mqtt/client

* Improve docs for private/zerocopy

* Avoid potential memory leak when dropping mqtt clients

* Improve docs for private/zerocopy

* Fix grammar

* Clarify docs in private/zerocopy

* Improve docs for Unblocker

* Replace vTaskDelete with task::destroy

* Use core instead of std
  • Loading branch information
HalfVoxel authored Jul 28, 2024
1 parent 6b2d9bd commit 35e59fa
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 14 deletions.
11 changes: 11 additions & 0 deletions src/mqtt/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,8 @@ impl EspAsyncMqttClient {
topic: Option<&str>,
payload: Option<&[u8]>,
) -> Result<MessageId, EspError> {
// Get the shared reference to the work item (as processed by the Self::work thread),
// and replace it with the next work item we want to process.
let work = self.0.exec_in_out().await.unwrap();

work.command = command;
Expand All @@ -814,21 +816,27 @@ impl EspAsyncMqttClient {
work.payload.extend_from_slice(payload);
}

// Signal the worker thread that it can process the work item.
self.0.do_exec().await;

// Wait for the worker thread to finish and return the result.
let work = self.0.exec_in_out().await.unwrap();

work.result
}

fn work(channel: Arc<Channel<AsyncWork>>, mut client: EspMqttClient) {
// Placeholder work item. This will be replaced by the first actual work item.
let mut work = AsyncWork {
command: AsyncCommand::Unsubscribe,
topic: alloc::vec::Vec::new(),
payload: alloc::vec::Vec::new(),
result: Ok(0),
};

// Repeatedly share a reference to the work until the channel is closed.
// The receiver will replace the data with the next work item, then wait for
// this thread to process it by calling into the C library and write the result.
while channel.share(&mut work) {
let topic = unsafe { core::ffi::CStr::from_bytes_with_nul_unchecked(&work.topic) };

Expand Down Expand Up @@ -984,7 +992,10 @@ impl<'a> EspMqttEvent<'a> {
}
}

/// SAFETY: EspMqttEvent contains no thread-specific data.
unsafe impl<'a> Send for EspMqttEvent<'a> {}

/// SAFETY: EspMqttEvent is a read-only struct, so sharing it between threads is fine.
unsafe impl<'a> Sync for EspMqttEvent<'a> {}

impl<'a> ErrorType for EspMqttEvent<'a> {
Expand Down
43 changes: 29 additions & 14 deletions src/private/unblocker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,22 @@ where
T: Send + 'static,
{
receiver: Receiver<T>,
task: TaskHandle_t,
}

/// SAFETY: Unblocker uses a raw pointer in Unblocker::task. This causes rust to treat it as not Send.
/// However, this task handle is only ever used to delete the FreeRTOS task, which is safe to do from any thread.
unsafe impl<T: Send + 'static> Send for Unblocker<T> {}

impl<T> Unblocker<T>
where
T: Send + 'static,
{
/// Create a new Unblocker
///
/// # Arguments
///
/// * `task_name` - The name of the task.
/// * `stack_size` - The size of the stack for the task, in bytes.
/// * `priority` - The priority of the task.
/// * `pin_to_core` - The core to pin the task to, if any.
/// * `worker` - The function to run in the task. A channel will be passed to the worker with all events.
/// The channel will be closed when the Unblocker is dropped, at which point the worker should exit.
pub fn new<F>(
task_name: &'static CStr,
stack_size: usize,
Expand All @@ -47,7 +52,7 @@ where

let worker: *mut Box<dyn FnOnce() + Send + 'static> = Box::into_raw(worker);

let task = unsafe {
let _task = unsafe {
task::create(
Self::work,
task_name,
Expand All @@ -58,10 +63,11 @@ where
)
}
.inspect_err(|_| unsafe {
// Avoid memory leak if task creation fails
Box::from_raw(worker);
})?;

Ok(Self { receiver, task })
Ok(Self { receiver })
}

pub async fn exec_in_out(&mut self) -> Option<&mut T> {
Expand All @@ -73,10 +79,22 @@ where
}

extern "C" fn work(arg: *mut core::ffi::c_void) {
let worker: Box<Box<dyn FnOnce() + Send + 'static>> =
unsafe { Box::from_raw(arg as *mut _) };
{
let worker: Box<Box<dyn FnOnce() + Send + 'static>> =
unsafe { Box::from_raw(arg as *mut _) };

worker();
worker();
}
unsafe {
// FreeRTOS tasks must delete themselves. Returning from the task function
// without deleting the task will cause a crash.
// This function will immediately stop running. This means we have to ensure
// that all memory is freed before calling vTaskDelete.
// Fortunately, this function does not manage any memory at this point.
// The scope above guarantees that all relevant variables are dropped at this point.
// See https://www.freertos.org/implementing-a-FreeRTOS-task.html
task::destroy(core::ptr::null_mut());
}
}
}

Expand All @@ -85,10 +103,7 @@ where
T: Send + 'static,
{
fn drop(&mut self) {
// This should cause the worker task to exit
self.receiver.done();

unsafe {
task::destroy(self.task);
}
}
}
27 changes: 27 additions & 0 deletions src/private/zerocopy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ where
}
}

/// SAFETY: The receiver will be able to access mutable state from other threads (even from their stack),
/// however, the channel will ensure that the state is only accessed by one receiver at a time,
/// and that the mutable state does not disappear while the receiver is using it.
///
/// Even though Receiver<T> requires T: 'static, we need to implement this for all lifetimes.
/// This is due to a rustc bug: https://github.com/rust-lang/rust/issues/110338
unsafe impl<'a, T> Send for Receiver<T> where T: Send + 'a {}

pub struct QuitOnDrop<T>(Arc<Channel<T>>)
Expand Down Expand Up @@ -138,6 +144,14 @@ where
(this, receiver)
}

/// Share a mutable reference, that the receiver can read or write to.
///
/// This will block until the receiver has processed the data, or the channel is closed.
///
/// Returns `true` if the data has been processed by a receiver, and `false` if the channel was closed.
///
/// This allows different threads to communicate without passing the data via the heap.
/// Instead, a sender can share a mutable reference *from its own stack* with a receiver.
pub fn share(&self, mut data: &mut T) -> bool {
self.set(State::Data(data))
}
Expand Down Expand Up @@ -182,7 +196,20 @@ where
}
}

/// SAFETY: The channel uses a mutex to synchronize access to the shared state.
/// The shared state also contain a raw pointer, which can point into the stack of the sender thread.
/// Despite this, the channel is constructed to be safe to send between threads.
///
/// Even though Channel<T> requires T: 'static, we need to implement this for all lifetimes.
/// This is due to a rustc bug: https://github.com/rust-lang/rust/issues/110338
unsafe impl<'a, T> Send for Channel<T> where T: Send + 'a {}

/// SAFETY: The channel uses a mutex to synchronize access to the shared state.
/// The shared state also contain a raw pointer, which can point into the stack of the sender thread.
/// Despite this, the channel is constructed to be safe to shared between threads.
///
/// Even though Channel<T> requires T: 'static, we need to implement this for all lifetimes.
/// This is due to a rustc bug: https://github.com/rust-lang/rust/issues/110338
unsafe impl<'a, T> Sync for Channel<T> where T: Send + 'a {}

#[derive(Copy, Clone, Debug)]
Expand Down

0 comments on commit 35e59fa

Please sign in to comment.