Skip to content

Commit

Permalink
chore: cherry-pick #18941 and #19172 (SWAP) to release 2.1 (#19287)
Browse files Browse the repository at this point in the history
Co-authored-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
yezizp2012 and BugenZhao authored Nov 8, 2024
1 parent 854b497 commit b207431
Show file tree
Hide file tree
Showing 148 changed files with 1,676 additions and 935 deletions.
2 changes: 1 addition & 1 deletion .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ header:
- "src/sqlparser/**/*.rs"
- "java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/internal/*.java"
- "java/connector-node/risingwave-sink-iceberg/src/main/java/org/apache/iceberg/**/*.java"
- "src/meta/model_v2/migration/**/*.rs"
- "src/meta/model/migration/**/*.rs"
- "lints/ui/**"

comment: on-failure
26 changes: 13 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ members = [
"src/license",
"src/meta",
"src/meta/dashboard",
"src/meta/model_v2",
"src/meta/model_v2/migration",
"src/meta/model",
"src/meta/model/migration",
"src/meta/node",
"src/meta/service",
"src/object_store",
Expand Down Expand Up @@ -228,8 +228,8 @@ risingwave_mem_table_spill_test = { path = "./src/stream/spill_test" }
risingwave_meta = { path = "./src/meta" }
risingwave_meta_dashboard = { path = "./src/meta/dashboard" }
risingwave_meta_service = { path = "./src/meta/service" }
risingwave_meta_model_migration = { path = "src/meta/model_v2/migration" }
risingwave_meta_model_v2 = { path = "./src/meta/model_v2" }
risingwave_meta_model = { path = "src/meta/model" }
risingwave_meta_model_migration = { path = "src/meta/model/migration" }
risingwave_meta_node = { path = "./src/meta/node" }
risingwave_object_store = { path = "./src/object_store" }
risingwave_pb = { path = "./src/prost" }
Expand Down
181 changes: 181 additions & 0 deletions e2e_test/ddl/alter_swap_rename.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

# Create initial tables and views for testing swap
statement ok
CREATE TABLE t1 (v1 INT primary key, v2 STRUCT<v1 INT, v2 STRUCT<v1 INT, v2 INT>>);

statement ok
CREATE TABLE t2 (v1 INT primary key, v2 STRUCT<v1 INT, v2 STRUCT<v1 INT, v2 INT>>);

# Insert some test data
statement ok
INSERT INTO t1 VALUES(1,(1,(1,2)));

statement ok
INSERT INTO t2 VALUES(2,(2,(2,4)));

# Create materialized views referencing the tables
statement ok
CREATE MATERIALIZED VIEW mv1 AS SELECT v1, (t.v2).v1 AS v21 FROM t1 t;

statement ok
CREATE MATERIALIZED VIEW mv2 AS SELECT v1, (t.v2).v1 AS v21 FROM t2 t;

# Create regular views
statement ok
CREATE VIEW v1 AS SELECT t1.v1 FROM t1;

statement ok
CREATE VIEW v2 AS SELECT t2.v2 FROM t2;

# Create sources
statement ok
CREATE SOURCE src1 (v INT) WITH (
connector = 'datagen',
fields.v.kind = 'sequence',
fields.v.start = '1',
fields.v.end = '5',
datagen.rows.per.second='10',
datagen.split.num = '1'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE src2 (v INT) WITH (
connector = 'datagen',
fields.v.kind = 'sequence',
fields.v.start = '6',
fields.v.end = '10',
datagen.rows.per.second='10',
datagen.split.num = '1'
) FORMAT PLAIN ENCODE JSON;

# Create sinks
statement ok
CREATE SINK sink1 AS SELECT * FROM mv1 WITH (
connector = 'blackhole'
);

statement ok
CREATE SINK sink2 AS SELECT * FROM mv2 WITH (
connector = 'blackhole'
);

# Create subscriptions
statement ok
CREATE SUBSCRIPTION sub1 FROM mv1 WITH (
retention = '1D'
);

statement ok
CREATE SUBSCRIPTION sub2 FROM mv2 WITH (
retention = '1D'
);

# Test table swap
statement ok
ALTER TABLE t1 SWAP WITH t2;

statement error Permission denied
ALTER TABLE t1 SWAP WITH mv1;

statement error not found
ALTER TABLE mv1 SWAP WITH mv2;

query II rowsort
SELECT * FROM t1;
----
2 (2,"(2,4)")

query II rowsort
SELECT * FROM t2;
----
1 (1,"(1,2)")

# Test materialized view swap
statement ok
ALTER MATERIALIZED VIEW mv1 SWAP WITH mv2;

# Verify materialized view contents
query II rowsort
SELECT * FROM mv1;
----
2 2

query II rowsort
SELECT * FROM mv2;
----
1 1

# Test view swap
statement ok
ALTER VIEW v1 SWAP WITH v2;

# Verify view definitions are swapped
query TT
SHOW CREATE VIEW v1;
----
public.v1 CREATE VIEW v1 AS SELECT t2.v2 FROM t1 AS t2

query TT
SHOW CREATE VIEW v2;
----
public.v2 CREATE VIEW v2 AS SELECT t1.v1 FROM t2 AS t1

# Test source swap
statement ok
ALTER SOURCE src1 SWAP WITH src2;

# Verify source definitions are swapped
query TT
SHOW CREATE SOURCE src1;
----
public.src1 CREATE SOURCE src1 (v INT) WITH (connector = 'datagen', fields.v.kind = 'sequence', fields.v.start = '6', fields.v.end = '10', datagen.rows.per.second = '10', datagen.split.num = '1') FORMAT PLAIN ENCODE JSON

query TT
SHOW CREATE SOURCE src2;
----
public.src2 CREATE SOURCE src2 (v INT) WITH (connector = 'datagen', fields.v.kind = 'sequence', fields.v.start = '1', fields.v.end = '5', datagen.rows.per.second = '10', datagen.split.num = '1') FORMAT PLAIN ENCODE JSON

# Test sink swap
statement ok
ALTER SINK sink1 SWAP WITH sink2;

# Verify sink definitions are swapped
query TT
SHOW CREATE SINK sink1;
----
public.sink1 CREATE SINK sink1 AS SELECT * FROM mv1 AS mv2 WITH (connector = 'blackhole')

query TT
SHOW CREATE SINK sink2;
----
public.sink2 CREATE SINK sink2 AS SELECT * FROM mv2 AS mv1 WITH (connector = 'blackhole')

# Test subscription swap
statement ok
ALTER SUBSCRIPTION sub1 SWAP WITH sub2;

# Verify subscription definitions are swapped
query TT
SHOW CREATE SUBSCRIPTION sub1;
----
public.sub1 CREATE SUBSCRIPTION sub1 FROM mv1 WITH (retention = '1D')

query TT
SHOW CREATE SUBSCRIPTION sub2;
----
public.sub2 CREATE SUBSCRIPTION sub2 FROM mv2 WITH (retention = '1D')

# Clean up
statement ok
DROP SOURCE src1;

statement ok
DROP SOURCE src2;

statement ok
DROP TABLE t1 CASCADE;

statement ok
DROP TABLE t2 CASCADE;
21 changes: 21 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,26 @@ message AlterOwnerResponse {
WaitVersion version = 2;
}

message AlterSwapRenameRequest {
message ObjectNameSwapPair {
uint32 src_object_id = 1;
uint32 dst_object_id = 2;
}
oneof object {
ObjectNameSwapPair schema = 1;
ObjectNameSwapPair table = 2;
ObjectNameSwapPair view = 3;
ObjectNameSwapPair source = 4;
ObjectNameSwapPair sink = 5;
ObjectNameSwapPair subscription = 6;
}
}

message AlterSwapRenameResponse {
common.Status status = 1;
WaitVersion version = 2;
}

message CreateFunctionRequest {
catalog.Function function = 1;
}
Expand Down Expand Up @@ -513,4 +533,5 @@ service DdlService {
rpc Wait(WaitRequest) returns (WaitResponse);
rpc CommentOn(CommentOnRequest) returns (CommentOnResponse);
rpc AutoSchemaChange(AutoSchemaChangeRequest) returns (AutoSchemaChangeResponse);
rpc AlterSwapRename(AlterSwapRenameRequest) returns (AlterSwapRenameResponse);
}
2 changes: 1 addition & 1 deletion src/ctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ risingwave_connector = { workspace = true }
risingwave_frontend = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
risingwave_meta = { workspace = true }
risingwave_meta_model = { workspace = true }
risingwave_meta_model_migration = { workspace = true }
risingwave_meta_model_v2 = { workspace = true }
risingwave_object_store = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/meta/reschedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use anyhow::{anyhow, Result};
use inquire::Confirm;
use itertools::Itertools;
use regex::Regex;
use risingwave_meta_model_v2::WorkerId;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::meta::{GetClusterInfoResponse, PbWorkerReschedule};
use serde::{Deserialize, Serialize};
Expand Down
12 changes: 10 additions & 2 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ use risingwave_pb::catalog::{
PbSubscription, PbTable, PbView,
};
use risingwave_pb::ddl_service::{
alter_name_request, alter_owner_request, alter_set_schema_request, create_connection_request,
PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType, WaitVersion,
alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request,
create_connection_request, PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType,
WaitVersion,
};
use risingwave_pb::meta::PbTableParallelism;
use risingwave_pb::stream_plan::StreamFragmentGraph;
Expand Down Expand Up @@ -197,6 +198,8 @@ pub trait CatalogWriter: Send + Sync {
object: alter_set_schema_request::Object,
new_schema_id: u32,
) -> Result<()>;

async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
}

#[derive(Clone)]
Expand Down Expand Up @@ -498,6 +501,11 @@ impl CatalogWriter for CatalogWriterImpl {

Ok(())
}

async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
let version = self.meta_client.alter_swap_rename(object).await?;
self.wait_version(version).await
}
}

impl CatalogWriterImpl {
Expand Down
Loading

0 comments on commit b207431

Please sign in to comment.