From d46605e7b7b7ba62a6b64b5874001e23bad74e2e Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Thu, 12 Dec 2024 14:14:32 +0800 Subject: [PATCH] fix: drop channel if partition receiver finished (#17037) * fix: partition receiver should implement on_finish * fix: partition receiver should implement on_finish --------- Co-authored-by: Winter Zhang --- .../pipeline/sources/src/async_source.rs | 20 ++++++++++- .../read/block_partition_receiver_source.rs | 36 ++++++++++++------- .../pruning_pipeline/send_part_info_sink.rs | 4 ++- 3 files changed, 45 insertions(+), 15 deletions(-) diff --git a/src/query/pipeline/sources/src/async_source.rs b/src/query/pipeline/sources/src/async_source.rs index 84bf331d0df49..77360d4045e2d 100644 --- a/src/query/pipeline/sources/src/async_source.rs +++ b/src/query/pipeline/sources/src/async_source.rs @@ -38,6 +38,11 @@ pub trait AsyncSource: Send { fn un_reacted(&self) -> Result<()> { Ok(()) } + + #[async_backtrace::framed] + async fn on_finish(&mut self) -> Result<()> { + Ok(()) + } } // TODO: This can be refactored using proc macros @@ -50,6 +55,7 @@ pub struct AsyncSourcer { output: Arc, scan_progress: Arc, generated_data: Option, + called_on_finish: bool, } impl AsyncSourcer { @@ -65,6 +71,7 @@ impl AsyncSourcer { scan_progress, is_finish: false, generated_data: None, + called_on_finish: false, }))) } } @@ -81,12 +88,16 @@ impl Processor for AsyncSourcer { fn event(&mut self) -> Result { if self.is_finish { + if !self.called_on_finish { + return Ok(Event::Async); + } self.output.finish(); return Ok(Event::Finished); } if self.output.is_finished() { - return Ok(Event::Finished); + self.is_finish = true; + return Ok(Event::Async); } if !self.output.can_push() { @@ -112,6 +123,13 @@ impl Processor for AsyncSourcer { #[async_backtrace::framed] async fn async_process(&mut self) -> Result<()> { + if self.is_finish { + if !self.called_on_finish { + self.called_on_finish = true; + self.inner.on_finish().await?; + } + return Ok(()); + } match self.inner.generate().await? { None => self.is_finish = true, Some(data_block) => { diff --git a/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs b/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs index 9922d1651c63a..f4fc7f3172bca 100644 --- a/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs +++ b/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs @@ -27,7 +27,7 @@ use databend_common_pipeline_sources::AsyncSourcer; use crate::operations::read::block_partition_meta::BlockPartitionMeta; pub struct BlockPartitionReceiverSource { - pub meta_receiver: Receiver>, + pub meta_receiver: Option>>, } impl BlockPartitionReceiverSource { @@ -37,7 +37,7 @@ impl BlockPartitionReceiverSource { output_port: Arc, ) -> Result { AsyncSourcer::create(ctx, output_port, Self { - meta_receiver: receiver, + meta_receiver: Some(receiver), }) } } @@ -49,18 +49,28 @@ impl AsyncSource for BlockPartitionReceiverSource { #[async_backtrace::framed] async fn generate(&mut self) -> Result> { - match self.meta_receiver.recv().await { - Ok(Ok(part)) => Ok(Some(DataBlock::empty_with_meta( - BlockPartitionMeta::create(vec![part]), - ))), - Ok(Err(e)) => Err( - // The error is occurred in pruning process - e, - ), - Err(_) => { - // The channel is closed, we should return None to stop generating - Ok(None) + if let Some(rx) = &self.meta_receiver { + match rx.recv().await { + Ok(Ok(part)) => Ok(Some(DataBlock::empty_with_meta( + BlockPartitionMeta::create(vec![part]), + ))), + Ok(Err(e)) => Err( + // The error is occurred in pruning process + e, + ), + Err(_) => { + // The channel is closed, we should return None to stop generating + Ok(None) + } } + } else { + Ok(None) } } + + #[async_backtrace::framed] + async fn on_finish(&mut self) -> Result<()> { + drop(self.meta_receiver.take()); + Ok(()) + } } diff --git a/src/query/storages/fuse/src/pruning_pipeline/send_part_info_sink.rs b/src/query/storages/fuse/src/pruning_pipeline/send_part_info_sink.rs index be58d04fd5029..c0a97d821dabf 100644 --- a/src/query/storages/fuse/src/pruning_pipeline/send_part_info_sink.rs +++ b/src/query/storages/fuse/src/pruning_pipeline/send_part_info_sink.rs @@ -189,7 +189,9 @@ impl AsyncSink for SendPartInfoSink { for info in info_ptr { if let Some(sender) = &self.sender { - let _ = sender.send(Ok(info)).await; + if let Err(_e) = sender.send(Ok(info)).await { + break; + } } }