Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[yugabyte] Yugabyte compatible implementation #1143

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,11 @@ jobs:
run: make down-locally
- name: Start local DSS instance
run: make start-locally
- name: (debug) logs
run: sleep 10 && make collect-local-logs && cat core-service-for-testing.log
- name: Probe local DSS instance
run: make probe-locally
- name: Run Qualifier against local DSS instance
run: make qualify-locally
- name: Bring down local DSS instance
run: make down-locally
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
CREATE INDEX s_cell_idx ON subscriptions USING ybgin (cells);
CREATE INDEX isa_cell_idx ON identification_service_areas USING ybgin (cells);
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- This migration drops ybgin indexes due to existing limitation to use it with other indexes.
-- https://docs.yugabyte.com/preview/explore/ysql-language-features/indexes-constraints/gin/#limitations.
DROP INDEX s_cell_idx;
DROP INDEX isa_cell_idx;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE INDEX sc_cells_idx ON scd_constraints USING ybgin (cells);
CREATE INDEX ss_cells_idx ON scd_subscriptions USING ybgin (cells);
CREATE INDEX so_cells_idx ON scd_operations USING ybgin (cells);
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- This migration drops ybgin indexes due to existing limitation to use it with other indexes.
-- https://docs.yugabyte.com/preview/explore/ysql-language-features/indexes-constraints/gin/#limitations.
DROP INDEX sc_cells_idx;
DROP INDEX ss_cells_idx;
DROP INDEX so_cells_idx;
2 changes: 2 additions & 0 deletions build/dev/docker-compose_dss.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ services:
volumes:
- $PWD/../test-certs:/var/test-certs:ro
- $PWD/startup/core_service.sh:/startup/core_service.sh:ro
environment:
COMPOSE_PROFILES: ${COMPOSE_PROFILES}
command: /startup/core_service.sh ${DEBUG_ON:-0}
ports:
- "4000:4000"
Expand Down
19 changes: 15 additions & 4 deletions build/dev/startup/core_service.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,21 @@

DEBUG_ON=${1:-0}

# POSIX compliant test to check if with-yugabyte profile is enabled.
if [ "${COMPOSE_PROFILES#*"with-yugabyte"}" != "${COMPOSE_PROFILES}" ]; then
echo "Using Yugabyte"
DATASTORE_CONNECTION="-cockroach_host local-dss-ybdb -cockroach_user yugabyte --cockroach_port 5433"
else
echo "Using CockroachDB"
DATASTORE_CONNECTION="-cockroach_host local-dss-crdb"
fi

if [ "$DEBUG_ON" = "1" ]; then
echo "Debug Mode: on"

dlv --headless --listen=:4000 --api-version=2 --accept-multiclient exec --continue /usr/bin/core-service -- \
-cockroach_host local-dss-crdb \
# Linter is disabled to properly unwrap $DATASTORE_CONNECTION.
# shellcheck disable=SC2086
dlv --headless --listen=:4000 --api-version=2 --accept-multiclient exec --continue /usr/bin/core-service -- ${DATASTORE_CONNECTION} \
-public_key_files /var/test-certs/auth2.pem \
-log_format console \
-dump_requests \
Expand All @@ -20,8 +30,9 @@ if [ "$DEBUG_ON" = "1" ]; then
else
echo "Debug Mode: off"

/usr/bin/core-service \
-cockroach_host local-dss-crdb \
# Linter is disabled to properly unwrap $DATASTORE_CONNECTION.
# shellcheck disable=SC2086
/usr/bin/core-service ${DATASTORE_CONNECTION} \
-public_key_files /var/test-certs/auth2.pem \
-log_format console \
-dump_requests \
Expand Down
13 changes: 9 additions & 4 deletions pkg/rid/store/cockroach/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import (
)

const (
// currentMajorSchemaVersion is the current major schema version.
currentMajorSchemaVersion = 4
// The current major schema version per datastore type.
currentCrdbMajorSchemaVersion = 4
currentYugabyteMajorSchemaVersion = 1

// Records expire if current time is <expiredDurationInMin> minutes more than records' endTime.
expiredDurationInMin = 30
Expand Down Expand Up @@ -91,8 +92,12 @@ func (s *Store) CheckCurrentMajorSchemaVersion(ctx context.Context) error {
return stacktrace.NewError("Remote ID database has not been bootstrapped with Schema Manager, Please check https://github.com/interuss/dss/tree/master/build#updgrading-database-schemas")
}

if currentMajorSchemaVersion != vs.Major {
return stacktrace.NewError("Unsupported schema version for remote ID! Got %s, requires major version of %d. Please check https://github.com/interuss/dss/tree/master/build#updgrading-database-schemas", vs, currentMajorSchemaVersion)
if s.db.Version.Type == datastore.CockroachDB && currentCrdbMajorSchemaVersion != vs.Major {
return stacktrace.NewError("Unsupported schema version for remote ID! Got %s, requires major version of %d. Please check https://github.com/interuss/dss/tree/master/build#updgrading-database-schemas", vs, currentCrdbMajorSchemaVersion)
}

if s.db.Version.Type == datastore.Yugabyte && currentYugabyteMajorSchemaVersion != vs.Major {
return stacktrace.NewError("Unsupported schema version for remote ID! Got %s, requires major version of %d. Please check https://github.com/interuss/dss/tree/master/build#updgrading-database-schemas", vs, currentYugabyteMajorSchemaVersion)
}

return nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/scd/store/cockroach/availability.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ func init() {
func (u *repo) UpsertUssAvailability(ctx context.Context, s *scdmodels.UssAvailabilityStatus) (*scdmodels.UssAvailabilityStatus, error) {
var (
upsertQuery = fmt.Sprintf(`
UPSERT INTO
INSERT INTO
scd_uss_availability
(%s)
VALUES
($1, $2, transaction_timestamp())
ON CONFLICT (id) DO UPDATE
SET availability = $2, updated_at = transaction_timestamp()
RETURNING
%s`, availabilityFieldsWithoutPrefix, availabilityFieldsWithPrefix)
)
Expand Down
32 changes: 29 additions & 3 deletions pkg/scd/store/cockroach/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,38 @@ func (c *repo) GetConstraint(ctx context.Context, id dssmodels.ID) (*scdmodels.C
func (c *repo) UpsertConstraint(ctx context.Context, s *scdmodels.Constraint) (*scdmodels.Constraint, error) {
var (
upsertQuery = fmt.Sprintf(`
UPSERT INTO
INSERT INTO
scd_constraints
(%s)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, transaction_timestamp())
ON CONFLICT (%s) DO UPDATE
SET %s = $2,
%s = $3,
%s = $4,
%s = $5,
%s = $6,
%s = $7,
%s = $8,
%s = $9,
%s = transaction_timestamp()
WHERE scd_constraints.%s = $1
RETURNING
%s`, constraintFieldsWithoutPrefix, constraintFieldsWithPrefix)
%s`,
constraintFieldsWithoutPrefix,
constraintFieldsWithIndices[0],
constraintFieldsWithIndices[1],
constraintFieldsWithIndices[2],
constraintFieldsWithIndices[3],
constraintFieldsWithIndices[4],
constraintFieldsWithIndices[5],
constraintFieldsWithIndices[6],
constraintFieldsWithIndices[7],
constraintFieldsWithIndices[8],
constraintFieldsWithIndices[9],
constraintFieldsWithIndices[0],
constraintFieldsWithPrefix,
)
)

cids, err := dsssql.CellUnionToCellIdsWithValidation(s.Cells)
Expand Down Expand Up @@ -203,7 +228,8 @@ func (c *repo) SearchConstraints(ctx context.Context, v4d *dssmodels.Volume4D) (
COALESCE(starts_at <= $3, true)
AND
COALESCE(ends_at >= $2, true)
LIMIT $4`, constraintFieldsWithoutPrefix)
LIMIT $4;
`, constraintFieldsWithoutPrefix)
)

// TODO: Lazily calculate & cache spatial covering so that it is only ever
Expand Down
37 changes: 34 additions & 3 deletions pkg/scd/store/cockroach/operational_intents.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,44 @@ func (s *repo) DeleteOperationalIntent(ctx context.Context, id dssmodels.ID) err
func (s *repo) UpsertOperationalIntent(ctx context.Context, operation *scdmodels.OperationalIntent) (*scdmodels.OperationalIntent, error) {
var (
upsertOperationsQuery = fmt.Sprintf(`
UPSERT INTO
INSERT INTO
scd_operations
(%s)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, transaction_timestamp(), $10, $11, $12, $13)
RETURNING
%s`, operationFieldsWithoutPrefix, operationFieldsWithPrefix)
ON CONFLICT (%s) DO UPDATE
SET %s = $2,
%s = $3,
%s = $4,
%s = $5,
%s = $6,
%s = $7,
%s = $8,
%s = $9,
%s = transaction_timestamp(),
%s = $10,
%s = $11,
%s = $12,
%s = $13
RETURNING
%s`,
operationFieldsWithoutPrefix,
operationFieldsWithIndices[0],
operationFieldsWithIndices[1],
operationFieldsWithIndices[2],
operationFieldsWithIndices[3],
operationFieldsWithIndices[4],
operationFieldsWithIndices[5],
operationFieldsWithIndices[6],
operationFieldsWithIndices[7],
operationFieldsWithIndices[8],
operationFieldsWithIndices[9],
operationFieldsWithIndices[10],
operationFieldsWithIndices[11],
operationFieldsWithIndices[12],
operationFieldsWithIndices[13],
operationFieldsWithPrefix,
)
)

cids := make([]int64, len(operation.Cells))
Expand Down
13 changes: 9 additions & 4 deletions pkg/scd/store/cockroach/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (
)

const (
// currentMajorSchemaVersion is the current major schema version.
currentMajorSchemaVersion = 3
// The current major schema version per datastore type.
currentCrdbMajorSchemaVersion = 3
currentYugabyteMajorSchemaVersion = 1
)

var (
Expand Down Expand Up @@ -66,8 +67,12 @@ func (s *Store) CheckCurrentMajorSchemaVersion(ctx context.Context) error {
return stacktrace.NewError("Strategic conflict detection database has not been bootstrapped with Schema Manager, Please check https://github.com/interuss/dss/tree/master/build#upgrading-database-schemas")
}

if currentMajorSchemaVersion != vs.Major {
return stacktrace.NewError("Unsupported schema version for strategic conflict detection! Got %s, requires major version of %d. Please check https://github.com/interuss/dss/tree/master/build#upgrading-database-schemas", vs, currentMajorSchemaVersion)
if s.db.Version.Type == datastore.CockroachDB && currentCrdbMajorSchemaVersion != vs.Major {
return stacktrace.NewError("Unsupported schema version for strategic conflict detection! Got %s, requires major version of %d. Please check https://github.com/interuss/dss/tree/master/build#upgrading-database-schemas", vs, currentCrdbMajorSchemaVersion)
}

if s.db.Version.Type == datastore.Yugabyte && currentYugabyteMajorSchemaVersion != vs.Major {
return stacktrace.NewError("Unsupported schema version for strategic conflict detection! Got %s, requires major version of %d. Please check https://github.com/interuss/dss/tree/master/build#upgrading-database-schemas", vs, currentYugabyteMajorSchemaVersion)
}

return nil
Expand Down
35 changes: 31 additions & 4 deletions pkg/scd/store/cockroach/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (c *repo) fetchSubscriptionByID(ctx context.Context, q dsssql.Queryable, id
FROM
scd_subscriptions
WHERE
id = $1`, subscriptionFieldsWithPrefix)
id = $1::uuid`, subscriptionFieldsWithPrefix)
)
uid, err := id.PgUUID()
if err != nil {
Expand Down Expand Up @@ -185,15 +185,42 @@ func (c *repo) pushSubscription(ctx context.Context, q dsssql.Queryable, s *scdm
FROM
scd_subscriptions
WHERE
id = $1
id = $1::uuid
)
UPSERT INTO
INSERT INTO
scd_subscriptions
(%s)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, transaction_timestamp())
ON CONFLICT (%s) DO UPDATE
SET %s = $2,
%s = $3::int,
%s = $4,
%s = $5::int,
%s = $6,
%s = $7,
%s = $8::bool,
%s = $9,
%s = $10,
%s = $11,
%s = transaction_timestamp()
RETURNING
%s`, subscriptionFieldsWithoutPrefix, subscriptionFieldsWithPrefix)
%s`,
subscriptionFieldsWithoutPrefix,
subscriptionFieldsWithIndices[0],
subscriptionFieldsWithIndices[1],
subscriptionFieldsWithIndices[2],
subscriptionFieldsWithIndices[3],
subscriptionFieldsWithIndices[4],
subscriptionFieldsWithIndices[5],
subscriptionFieldsWithIndices[6],
subscriptionFieldsWithIndices[7],
subscriptionFieldsWithIndices[8],
subscriptionFieldsWithIndices[9],
subscriptionFieldsWithIndices[10],
subscriptionFieldsWithIndices[11],
subscriptionFieldsWithPrefix,
)
)

cids := make([]int64, len(s.Cells))
Expand Down
Loading