Skip to content

Commit

Permalink
fix(flow): deal with flow drop leftover (#5391)
Browse files Browse the repository at this point in the history
* fix: deal with flow drop leftover

* chore: make it warn

* chore: apply suggestion.

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* chore: review

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
  • Loading branch information
3 people authored Jan 19, 2025
1 parent d072801 commit 87c21e2
Showing 1 changed file with 19 additions and 2 deletions.
21 changes: 19 additions & 2 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,29 @@ impl FlowWorkerManager {
let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
let ctx = Arc::new(QueryContext::with(&catalog, &schema));

let (is_ts_placeholder, proto_schema) = self
let (is_ts_placeholder, proto_schema) = match self
.try_fetch_existing_table(&table_name)
.await?
.context(UnexpectedSnafu {
reason: format!("Table not found: {}", table_name.join(".")),
})?;
}) {
Ok(r) => r,
Err(e) => {
if self
.table_info_source
.get_opt_table_id_from_name(&table_name)
.await?
.is_none()
{
// deal with both flow&sink table no longer exists
// but some output is still in output buf
common_telemetry::warn!(e; "Table `{}` no longer exists, skip writeback", table_name.join("."));
continue;
} else {
return Err(e);
}
}
};
let schema_len = proto_schema.len();

let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
Expand Down

0 comments on commit 87c21e2

Please sign in to comment.