PlanetScale Airbyte Source

Getting Support

If you run into any issues using this source, please reach out to PlanetScale Support at support@planetscale.com.

About

This repository builds a Docker container that is used as a Source Connector in Airbyte.

Official documentation for connecting to PlanetScale databases from your Airbyte installation is available here

Self-hosting

Click here for docs to self-host this docker image.

How the container will be called:

The first argument passed to the image must be the command (e.g. spec, check, discover, read). Additional arguments can be passed after the command. Note: The system running the container will handle mounting the appropriate paths so that the config files are available to the container.

docker run --rm -i <source-image-name> spec

docker run --rm -i <source-image-name> check --config <config-file-path>

docker run --rm -i <source-image-name> discover --config <config-file-path>

docker run --rm -i <source-image-name> read --config <config-file-path> \
 --catalog <catalog-file-path> [--state <state-file-path>] > message_stream.json

Interface Pseudocode:

spec() -> ConnectorSpecification
check(Config) -> AirbyteConnectionStatus
discover(Config) -> AirbyteCatalog
read(Config, ConfiguredAirbyteCatalog, State) -> Stream<AirbyteMessage>
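
The calling convention above (command first, flags after) can be sketched in Go as a small argument parser. This is only an illustration: the Invocation type, the requiredFlags table, and parseArgs are hypothetical names and are not taken from this repository's code.

```go
package main

import "fmt"

// Invocation is a hypothetical holder for one parsed command line.
type Invocation struct {
	Command string
	Flags   map[string]string
}

// requiredFlags lists the flags each command needs, following the
// docker run usage lines above. --state is optional for read.
var requiredFlags = map[string][]string{
	"spec":     {},
	"check":    {"--config"},
	"discover": {"--config"},
	"read":     {"--config", "--catalog"},
}

// parseArgs treats the first argument as the command and the remaining
// arguments as "--flag value" pairs.
func parseArgs(args []string) (Invocation, error) {
	if len(args) == 0 {
		return Invocation{}, fmt.Errorf("missing command")
	}
	need, ok := requiredFlags[args[0]]
	if !ok {
		return Invocation{}, fmt.Errorf("unknown command %q", args[0])
	}
	inv := Invocation{Command: args[0], Flags: map[string]string{}}
	rest := args[1:]
	for i := 0; i+1 < len(rest); i += 2 {
		inv.Flags[rest[i]] = rest[i+1]
	}
	for _, flag := range need {
		if inv.Flags[flag] == "" {
			return Invocation{}, fmt.Errorf("%s requires %s", inv.Command, flag)
		}
	}
	return inv, nil
}

func main() {
	inv, err := parseArgs([]string{"read", "--config", "source.json", "--catalog", "catalog.json"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(inv.Command, inv.Flags["--config"], inv.Flags["--catalog"])
}
```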

Features supported.

  1. Connection check for a given set of credentials.
  2. Schema fetch for a given database.
  3. Reading all records for a given table.
  4. Incremental data fetch, based on VGTID.
  5. Support for sharded databases.
  6. Peek before Sync.

Running the application locally.

1. Check command:

Create a JSON file that holds the connection variables for the PlanetScale database. It looks like this:

{
    "host": "<FQDN for your PS database>",
    "database":"<default keyspace name>",
    "username":"<username>",
    "password":"<some password for your database>"
}

Save it as source.json in this directory.
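
Before running any command, it can help to confirm that the file parses and that none of the four keys is empty. The following is a minimal sketch of such a check. SourceConfig and parseConfig are hypothetical names, and treating all four keys as mandatory is an assumption made for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// SourceConfig is a hypothetical struct mirroring the keys in source.json.
type SourceConfig struct {
	Host     string `json:"host"`
	Database string `json:"database"`
	Username string `json:"username"`
	Password string `json:"password"`
}

// parseConfig decodes the JSON and reports which of the four keys are empty.
func parseConfig(raw []byte) (SourceConfig, error) {
	var cfg SourceConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return cfg, fmt.Errorf("invalid JSON: %w", err)
	}
	fields := []struct{ name, value string }{
		{"host", cfg.Host},
		{"database", cfg.Database},
		{"username", cfg.Username},
		{"password", cfg.Password},
	}
	var missing []string
	for _, f := range fields {
		if strings.TrimSpace(f.value) == "" {
			missing = append(missing, f.name)
		}
	}
	if len(missing) > 0 {
		return cfg, fmt.Errorf("missing keys: %s", strings.Join(missing, ", "))
	}
	return cfg, nil
}

func main() {
	// Example values only; substitute the contents of your own source.json.
	raw := []byte(`{"host":"example.psdb.cloud","database":"mydb","username":"user","password":"secret"}`)
	cfg, err := parseConfig(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("config OK for host", cfg.Host)
}
```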

Now you can run go run main.go check --config source.json and should see output like this:

 go run main.go check --config source.json | jq .
{
  "type": "CONNECTION_STATUS",
  "connectionStatus": {
    "status": "SUCCEEDED",
    "message": "Successfully connected to database planetscaledatabase at host 7hnhokoiid3c.us-east-3.psdb.cloud with username tzmqspqq1wrz"
  }
}
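
If you want to consume this output from a script rather than read it by eye, the message can be decoded as JSON. A minimal sketch follows, using only the fields visible in the sample output above. statusMessage and connectionSucceeded are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// statusMessage covers only the fields shown in the sample check output.
type statusMessage struct {
	Type             string `json:"type"`
	ConnectionStatus struct {
		Status  string `json:"status"`
		Message string `json:"message"`
	} `json:"connectionStatus"`
}

// connectionSucceeded reports whether a CONNECTION_STATUS message has
// status SUCCEEDED. Any other message type is returned as an error.
func connectionSucceeded(raw []byte) (bool, error) {
	var msg statusMessage
	if err := json.Unmarshal(raw, &msg); err != nil {
		return false, err
	}
	if msg.Type != "CONNECTION_STATUS" {
		return false, fmt.Errorf("unexpected message type %q", msg.Type)
	}
	return msg.ConnectionStatus.Status == "SUCCEEDED", nil
}

func main() {
	raw := []byte(`{"type":"CONNECTION_STATUS","connectionStatus":{"status":"SUCCEEDED","message":"ok"}}`)
	ok, err := connectionSucceeded(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("connected:", ok)
}
```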

2. Discover command:

We need the same source.json file as for the check command above.

You can now run go run main.go discover --config source.json and see output similar to:

go run main.go discover --config source.json | jq .
{
  "type": "CATALOG",
  "catalog": {
    "streams": [
      {
        "name": "departments",
        "json_schema": {
          "type": "object",
          "properties": {
            "dept_name": {
              "type": "string"
            },
            "dept_no": {
              "type": "string"
            }
          }
        },
        "supported_sync_modes": [
          "full_refresh"
        ],
        "namespace": "planetscaledatabase"
      }
    ]
  }
}

3. Read command:

Create a catalog.json file that looks similar to the output of "Discover", but with the format of:

{
  "streams": [
    {
      "stream": { // body of stream from above },
      "sync_mode": "full_refresh" // one of the supported_sync_modes from above
    }
  ]
}

For the example output above, a valid catalog.json might look like:

{
  "streams": [
    {
      "stream": {
        "name": "departments",
        "json_schema": {
          "type": "object",
          "properties": {
            "dept_name": {
              "type": "string"
            },
            "dept_no": {
              "type": "string"
            }
          }
        },
        "supported_sync_modes": [
          "full_refresh"
        ],
        "namespace": "planetscaledatabase"
      },
      "sync_mode": "full_refresh"
    }
  ]
}

Then run:

go run main.go read --config source.json --catalog catalog.json

3a. Starting from a specific GTID

You can start replication from a specific GTID per keyspace shard, by setting starting_gtids in your configuration file:

{
    "host": "<FQDN for your PS database>",
    "database":"<default keyspace name>",
    "username":"<username>",
    "password":"<some password for your database>",
    "starting_gtids": "{\"keyspace\": {\"shard\": \"MySQL56/MYGTID:1-3\"}}"
}

Note: When starting_gtids is specified in the configuration file, and a --state file is passed, the --state file will always take precedence. This is so incremental sync continues working.

How to get starting GTIDs

You can get the latest executed GTID for every shard by querying your database.

  1. Access your PlanetScale database. One way to do so is to use pscale shell.
  2. Target the keyspace and shard that you would like the latest GTID for by running use keyspace/shard.
    • e.g. use my_sharded_keyspace/-10
    • If your database is unsharded, you don't have to target a keyspace or shard. Skip this step.
  3. Execute select @@gtid_executed;
  4. You'll get a result that looks something like:
my_sharded_keyspace/-10> select @@gtid_executed\G
*************************** 1. row ***************************
@@`gtid_executed`: 16cec08d-f91b-11ee-8afb-aacaf4984ae5:1-5639808,
b13c2fe0-f91a-11ee-aa81-6251c27c9c24:1-2487289,
fe8e2a3c-f91a-11ee-9812-82f5834c1ba7:1-46602355
1 row in set (0.01 sec)
  5. Use the GTIDs returned to form the starting GTID for that shard (in this example, shard -10):
{"my_sharded_keyspace": {"-10": "MySQL56/16cec08d-f91b-11ee-8afb-aacaf4984ae5:1-5639808,b13c2fe0-f91a-11ee-aa81-6251c27c9c24:1-2487289,fe8e2a3c-f91a-11ee-9812-82f5834c1ba7:1-46602355"}}
  6. Repeat this process for all your shards, if your database is sharded.

Note: Remember to prepend the prefix MySQL56/ onto your starting GTIDs.
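
The @@gtid_executed value is printed across several lines, so it has to be joined into a single comma-separated string before the prefix is added. A minimal sketch of that step follows. formatStartingGTID is a hypothetical helper and not part of this repository.

```go
package main

import (
	"fmt"
	"strings"
)

const gtidPrefix = "MySQL56/"

// formatStartingGTID joins the (possibly multi-line) @@gtid_executed value
// into one comma-separated string and prepends the MySQL56/ prefix.
// Empty input yields an empty string.
func formatStartingGTID(gtidExecuted string) string {
	var sets []string
	for _, part := range strings.Split(gtidExecuted, ",") {
		if part = strings.TrimSpace(part); part != "" {
			sets = append(sets, part)
		}
	}
	if len(sets) == 0 {
		return ""
	}
	joined := strings.Join(sets, ",")
	if strings.HasPrefix(joined, gtidPrefix) {
		return joined // already prefixed, leave as-is
	}
	return gtidPrefix + joined
}

func main() {
	// Value copied from the sample query result above.
	executed := `16cec08d-f91b-11ee-8afb-aacaf4984ae5:1-5639808,
b13c2fe0-f91a-11ee-aa81-6251c27c9c24:1-2487289,
fe8e2a3c-f91a-11ee-9812-82f5834c1ba7:1-46602355`
	fmt.Println(formatStartingGTID(executed))
}
```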

Interpreting logs

Airbyte logs will include logs from the source (this library), the Airbyte connector, and the destination. All source logs will be prefixed with [source] and PlanetScale Source ::.

Source-level logs

Checking connection
  The Airbyte source is checking whether it can connect to the PlanetScale database.

state file detected, parsing provided file <file>
  A state JSON file was passed to the Airbyte source.

Syncing from tabletType <tabletType>
  The tablet type the Airbyte source is syncing from.

Table-level logs

syncing stream <table> with sync mode <sync mode>
  The Airbyte source is beginning the sync for <table> using <sync mode> (e.g. incremental).

Shard-level logs

Shard-level logs will be prefixed with: [<keyspace>:<tablet_type>:<table> shard : <shard>].
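
To group shard-level messages by keyspace, table, or shard, the prefix can be split into its four parts with a regular expression. This is a sketch that assumes the prefix appears exactly in the format shown above. shardPrefix and parseShardPrefix are hypothetical names, and the sample line (including the tablet type value primary) is invented for illustration.

```go
package main

import (
	"fmt"
	"regexp"
)

// shardPrefix matches "[<keyspace>:<tablet_type>:<table> shard : <shard>]".
var shardPrefix = regexp.MustCompile(`\[([^:\[\]]+):([^:\[\]]+):([^\[\]]+?) shard : ([^\[\]]+)\]`)

// ShardInfo holds the four parts of a shard-level log prefix.
type ShardInfo struct {
	Keyspace, TabletType, Table, Shard string
}

// parseShardPrefix extracts the prefix parts from a log line. The boolean
// is false when the line carries no shard-level prefix.
func parseShardPrefix(line string) (ShardInfo, bool) {
	m := shardPrefix.FindStringSubmatch(line)
	if m == nil {
		return ShardInfo{}, false
	}
	return ShardInfo{Keyspace: m[1], TabletType: m[2], Table: m[3], Shard: m[4]}, true
}

func main() {
	line := "[source] PlanetScale Source :: [my_sharded_keyspace:primary:departments shard : -10] peeking to see if there's any new rows"
	if info, ok := parseShardPrefix(line); ok {
		fmt.Printf("keyspace=%s tablet=%s table=%s shard=%s\n",
			info.Keyspace, info.TabletType, info.Table, info.Shard)
	}
}
```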

peeking to see if there's any new rows
  The Airbyte source is checking whether there are any new rows by comparing the current cursor position with the end (latest) cursor position.

new rows found, syncing rows for 1m0s
  New rows were detected (the current cursor position is behind the end/latest cursor position).

syncing rows with cursor <start cursor>
  The Airbyte source will attempt to sync starting from <start cursor>.

Syncing with cursor position : <start cursor>, using last known PK : <usingLastKnownPK>, stop cursor is : <stop cursor>
  The Airbyte source will attempt to read from <start cursor> to <stop cursor>. <usingLastKnownPK> specifies whether the Airbyte source is syncing from a last known primary key.

DEBUG: SyncRequest.Cells = [<cells>]
  The Vitess cells that the Airbyte source will sync from.

Finished reading <recordCount> records for table [<table>]
  The Airbyte source has detected that the client has ended its connection. <recordCount> records were counted as sent to the connector for <table>.