Making this more dev friendly and quick setup #391

Draft: wants to merge 2 commits into base: master
3 changes: 3 additions & 0 deletions .dockerignore
@@ -0,0 +1,3 @@
.git
.github
venv
13 changes: 13 additions & 0 deletions Dockerfile
@@ -0,0 +1,13 @@
FROM apache/airflow:1.10.15-python3.8

RUN python -m pip install --upgrade pip

ENV DAGS_FOLDER=/opt/airflow/dags/repo/dags

COPY elaborate-baton-357506-f9435b87997e.json /usr/sa.json

USER airflow

COPY requirements.txt requirements.txt

RUN pip install -r requirements.txt
147 changes: 120 additions & 27 deletions README.md
@@ -1,62 +1,155 @@
# Ethereum ETL Airflow

Read this article: https://cloud.google.com/blog/products/data-analytics/ethereum-bigquery-how-we-built-dataset
Read this article: [https://cloud.google.com/blog/products/data-analytics/ethereum-bigquery-how-we-built-dataset](https://cloud.google.com/blog/products/data-analytics/ethereum-bigquery-how-we-built-dataset)

## Setting up Airflow DAGs using Google Cloud Composer
## Support

### Create BigQuery Datasets
This repository can be deployed on any cloud compute platform.
Some DAGs are available only on GCP.

- Sign in to BigQuery https://bigquery.cloud.google.com/
- Create new datasets called `crypto_ethereum`, `crypto_ethereum_raw`, `crypto_ethereum_temp`

### Create Google Cloud Storage bucket
| Feature | GCP | AWS |
|---------|-----|-----|
| Can be deployed? | Yes | Yes |
| ethereum_amend_dag | Yes | No |
| ethereum_clean_dag | Yes ? | Yes ? |
| ethereum_export_dag | Yes (Provider → GCS) | Yes (Provider → S3) |
| ethereum_load_dag | Yes (GCS → BigQuery) | Yes (S3 → Redshift) |
| ethereum_parse_dag | Yes | No |
| ethereum_partition_dag | Yes | No |
| ethereum_sessions_dag | Yes | No |

- Create a new Google Storage bucket to store exported files https://console.cloud.google.com/storage/browser

### Create Google Cloud Composer environment
# Setting up Airflow DAGs

## Google Cloud Composer

This assumes that you have `gcloud` installed and configured. If not, [install the Google Cloud CLI](https://cloud.google.com/sdk/docs/install-sdk).


Create a new Cloud Composer environment:

```bash
export ENVIRONMENT_NAME=ethereum-etl-0
gcloud composer environments create $ENVIRONMENT_NAME --location=us-central1 --zone=us-central1-a \
--disk-size=50GB --machine-type=n1-standard-2 --node-count=3 --python-version=3 --image-version=composer-1.17.6-airflow-1.10.15 \
--network=default --subnetwork=default
export NODE_TYPE=n1-standard-2
export ZONE=us-central1-a
gcloud composer environments create $ENVIRONMENT_NAME \
--location=us-central1 \
--zone=$ZONE \
--disk-size=50GB \
--machine-type=$NODE_TYPE \
--node-count=3 \
--python-version=3 \
--image-version=composer-1.17.6-airflow-1.10.15 \
--network=default \
--subnetwork=default

gcloud composer environments update $ENVIRONMENT_NAME \
--location=us-central1 \
--update-pypi-package=ethereum-etl==1.7.2
```

gcloud composer environments update $ENVIRONMENT_NAME --location=us-central1 --update-pypi-package=ethereum-etl==1.7.2
### Upload DAGs

```bash
> ./upload_dags.sh <airflow_bucket>
```

Create variables in Airflow (**Admin > Variables** in the UI):
## AWS EKS

| Variable | Description |
|-----------------------------------------|-----------------------------------------|
| ethereum_output_bucket | GCS bucket to store exported files |
| ethereum_provider_uris | Comma separated URIs of Ethereum nodes |
| ethereum_destination_dataset_project_id | Project ID of BigQuery datasets |
| notification_emails | email for notifications |
This assumes you have docker, kubectl, helm, eksctl, and the AWS CLI installed and configured.
The ECR repository is created if it does not exist.

Check other variables in `dags/ethereumetl_airflow/variables.py`.
Airflow comes with its own Postgres container, but for most purposes
an external Postgres connection is recommended.
Set one up before deploying: `deploy-airflow.sh` requires it through `--pg-url`.
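
The external Postgres connection is passed to `deploy-airflow.sh` as `--pg-url` in the form `USER:PASSWORD@HOST:PORT/DB`, and the script prefixes it with `postgresql+psycopg2://` and `db+postgresql://` when it creates its two database secrets. A minimal sketch for checking the value before deploying follows. The function name `build_connections` and the sample credentials are hypothetical, and the check assumes the password contains no `@`, `/`, or `:` characters that would need escaping.

```python
from urllib.parse import urlsplit


def build_connections(pg_url: str) -> dict:
    """Validate USER:PASSWORD@HOST:PORT/DB and return both connection strings."""
    # A scheme is prepended only so that urlsplit can separate the parts.
    parts = urlsplit("postgresql://" + pg_url)
    try:
        port = parts.port  # raises ValueError when the port is not numeric
    except ValueError as exc:
        raise ValueError(f"invalid port in {pg_url!r}") from exc
    database = parts.path.lstrip("/")
    if not all([parts.username, parts.password, parts.hostname, port, database]):
        raise ValueError(f"expected USER:PASSWORD@HOST:PORT/DB, got {pg_url!r}")
    return {
        # Same prefixes that deploy-airflow.sh writes into its secrets.
        "airflow-database": "postgresql+psycopg2://" + pg_url,
        "airflow-result-database": "db+postgresql://" + pg_url,
    }


if __name__ == "__main__":
    sample = "airflow:secret@db.example.com:5432/airflow"  # made-up credentials
    for name, connection in build_connections(sample).items():
        print(name, connection)
```

Running a check like this first fails fast on a malformed URL, before the namespace is recreated.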

### Upload DAGs

```bash
> ./upload_dags.sh <airflow_bucket>
export ENVIRONMENT_NAME=ethereum-etl-0
export NODE_TYPE=m5.large
export ZONE=ap-south-1
eksctl create cluster \
--name $ENVIRONMENT_NAME \
--region $ZONE
eksctl create nodegroup \
--cluster $ENVIRONMENT_NAME \
--region $ZONE \
--name $ENVIRONMENT_NAME-nodegroup \
--node-type $NODE_TYPE \
--nodes 3 \
--nodes-min 2 \
--nodes-max 10
./deploy-airflow.sh \
-n $ENVIRONMENT_NAME \
--pg-url USER:PASSWORD@HOST:PORT/DB \
--ecs-host 289344454031.dkr.ecr.ap-south-1.amazonaws.com \
--image-name ethetl \
--image-tag latest \
--build-image \
--fernet-key 15NrZQ5lfysmX9HggBJgl8qlFVrsTys8-XJcK_qN4hQ=
```
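
The Fernet key in the command above is a literal value that anyone who can read this README can see, so it is safer to generate your own. A minimal sketch, assuming the standard Fernet key format (32 random bytes, URL-safe base64 encoded). `generate_fernet_key` is a hypothetical helper name; `cryptography.fernet.Fernet.generate_key()` produces the same format if that package is installed.

```python
import base64
import os


def generate_fernet_key() -> str:
    """Return 32 random bytes encoded as URL-safe base64 (the Fernet key format)."""
    return base64.urlsafe_b64encode(os.urandom(32)).decode("ascii")


if __name__ == "__main__":
    # Pass the printed value to deploy-airflow.sh via --fernet-key.
    print(generate_fernet_key())
```

URL-safe base64 never contains `/`, so a key generated this way is compatible with the `s/{{FERNET_KEY}}/.../` substitution in `deploy-airflow.sh`.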

### Running Tests
You might also want to change the storage policy for airflow-worker-logs so that the logs are retained when you redeploy. To do this, follow the [retain volume steps](https://kubernetes.io/docs/tasks/administer-cluster/change-pv-reclaim-policy/).

To access the Airflow UI, forward the webserver port:

`kubectl port-forward svc/airflow-webserver 8080:8080 --namespace $ENVIRONMENT_NAME`

You can now log in at [http://localhost:8080](http://localhost:8080/).


# Creating variables

Create variables by following the steps in [variables](docs/Variables.md) and importing them through the Airflow UI.
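
The Airflow UI (**Admin > Variables**) can import variables from a JSON file of name/value pairs. Below is a sketch of generating such a file. The variable names are the ones this README listed before this change; every value and the file name `variables.json` are placeholder assumptions, and `docs/Variables.md` remains the reference for the full set.

```python
import json

# Names come from the project's variable list; all values are placeholders.
variables = {
    "ethereum_output_bucket": "my-ethereum-export-bucket",
    "ethereum_provider_uris": "https://node-1.example.com,https://node-2.example.com",
    "ethereum_destination_dataset_project_id": "my-gcp-project",
    "notification_emails": "alerts@example.com",
}


def write_variables(path: str, values: dict) -> None:
    """Write the variables as a JSON object that the Airflow UI can import."""
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(values, handle, indent=2)


if __name__ == "__main__":
    write_variables("variables.json", variables)
```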

# Creating Connections

You will need to [create connections in Airflow UI](https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html#creating-a-connection-with-the-ui) for connecting to the cloud
provider of your choice.

## GCP

1. If you want to use GCP for storage and processing of the data,
create a service account for your GCP app.
[Follow the instructions](https://cloud.google.com/docs/authentication/production).
Store the JSON file somewhere secure. You will need its contents for the next step.
2. Create the following connection IDs.
[Refer here for specifics](https://airflow.apache.org/docs/apache-airflow/1.10.13/howto/connection/gcp.html). Copy the contents of the JSON file above into the `Keyfile JSON` field.

- `google_cloud_default`
- `bigquery_default`

# Starting DAGs

Once in the Airflow UI, make sure to start the following DAGs:

- airflow_monitoring
- ethereum_amend_dag
- ethereum_clean_dag
- ethereum_export_dag
- ethereum_load_dag
- ethereum_partition_dag
- ethereum_sessions_dag
- ethereum_verify_streaming_dag

There are 120+ other DAGs that parse contract-specific logic. You can choose to start some, all, or none of them.

# Running Tests

```bash
pip install -r requirements.txt
export PYTHONPATH='dags'
pytest -vv -s
```

### Creating Table Definition Files for Parsing Events and Function Calls
# Creating Table Definition Files for Parsing Events and Function Calls

Read this article: https://medium.com/@medvedev1088/query-ens-and-0x-events-with-sql-in-google-bigquery-4d197206e644
Read this article: [https://medium.com/@medvedev1088/query-ens-and-0x-events-with-sql-in-google-bigquery-4d197206e644](https://medium.com/@medvedev1088/query-ens-and-0x-events-with-sql-in-google-bigquery-4d197206e644)

### More Information
# More Information

You can follow the instructions here for Polygon DAGs https://github.com/blockchain-etl/polygon-etl. The architecture
You can follow the instructions here for Polygon DAGs [https://github.com/blockchain-etl/polygon-etl](https://github.com/blockchain-etl/polygon-etl). The architecture
there is very similar to Ethereum's, so in most cases replacing `polygon` with `ethereum` will work. Contributions
to this README file for porting documentation from Polygon to Ethereum are welcome.
2 changes: 1 addition & 1 deletion dags/ethereumetl_airflow/variables.py
@@ -6,7 +6,7 @@
def read_export_dag_vars(var_prefix, **kwargs):
export_start_date = read_var('export_start_date', var_prefix, True, **kwargs)
export_start_date = datetime.strptime(export_start_date, '%Y-%m-%d')

provider_uris = read_var('provider_uris', var_prefix, True, **kwargs)
provider_uris = [uri.strip() for uri in provider_uris.split(',')]
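
The two parsing steps in this hunk turn string-valued Airflow variables into typed values: a date string becomes a `datetime`, and a comma-separated string becomes a list of URIs. A standalone sketch of the same logic follows. `parse_export_vars` is a hypothetical name and the inputs are made up, since `read_var` itself is not shown in this diff.

```python
from datetime import datetime


def parse_export_vars(export_start_date: str, provider_uris: str):
    """Apply the same conversions as read_export_dag_vars to raw strings."""
    # '%Y-%m-%d' accepts dates such as '2015-07-30' and rejects other layouts.
    start = datetime.strptime(export_start_date, '%Y-%m-%d')
    # Whitespace around each comma-separated URI is dropped.
    uris = [uri.strip() for uri in provider_uris.split(',')]
    return start, uris


if __name__ == "__main__":
    print(parse_export_vars('2015-07-30', 'https://a.example.com , https://b.example.com'))
```

One consequence worth noting: an empty `provider_uris` string yields a list containing one empty string rather than an empty list.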

114 changes: 114 additions & 0 deletions deploy-airflow.sh
@@ -0,0 +1,114 @@
#!/bin/bash
while [[ $# -gt 0 ]]; do
case $1 in
-n|--namespace)
NAMESPACE="$2"
shift # past argument
shift # past value
;;
-p|--pg-url)
PG_URL="$2"
shift # past argument
shift # past value
;;
-f|--fernet-key)
FERNET_KEY="$2"
shift # past argument
shift # past value
;;
-e|--ecs-host)
ECS_HOST="$2"
shift # past argument
shift # past value
;;
--image-name)
IMAGE_NAME="$2"
shift # past argument
shift # past value
;;
--image-tag)
IMAGE_TAG="$2"
shift # past argument
shift # past value
;;
--build-image)
BUILD_IMAGE=true
shift # past argument
;;
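*)
# Catch-all: also rejects positional arguments, which would otherwise
# match no case, never be shifted, and loop forever.
echo "Unknown option $1" >&2
exit 1
;;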
esac
done

# Check all required arguments
if [[ -z "$NAMESPACE" || -z "$PG_URL" || -z "$ECS_HOST" || -z "$IMAGE_NAME" || -z "$IMAGE_TAG" || -z "$FERNET_KEY" ]]
then
echo "Missing required argument. Required: --namespace, --pg-url, --ecs-host, --image-name, --image-tag, --fernet-key." >&2
exit 1
fi

# Prepare some arguments
PROJECT_DIR=$(cd "$(dirname "$0")" && pwd)
TEMP_DIR="$PROJECT_DIR"/.tmp
HELM_VALUE_YAML="$TEMP_DIR"/value.yaml
IMAGE_REPOSITORY="$ECS_HOST/$IMAGE_NAME"

if [ -n "$BUILD_IMAGE" ]
then
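# Create the repository in ECR if it does not exist yet.
# The repository name is the image name only, without the registry host.
aws ecr describe-repositories --repository-names "$IMAGE_NAME" > /dev/null 2>&1 || \
aws ecr create-repository --repository-name "$IMAGE_NAME"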

# Login to ECR
aws ecr get-login-password --region ap-south-1 | docker login --username AWS --password-stdin "$ECS_HOST"

# Build and push the image
docker buildx build \
--platform linux/amd64 \
--file Dockerfile \
--no-cache \
--load \
-t "$IMAGE_REPOSITORY:$IMAGE_TAG" .

# Check if build was success
if [ $? -ne 0 ]
then
echo "Docker Build failed. Not proceeding."
exit 1
fi

docker push "$IMAGE_REPOSITORY:$IMAGE_TAG"

# Check if push was success
if [ $? -ne 0 ]
then
echo "Docker Push failed. Not proceeding."
exit 1
fi

fi

# Create temp folder and write helm values yaml to it.
mkdir -p -- "$TEMP_DIR"

sed \
-e "s={{IMAGE_REPOSITORY}}=$IMAGE_REPOSITORY=" \
-e "s={{IMAGE_TAG}}=$IMAGE_TAG=" \
-e "s/{{FERNET_KEY}}/$FERNET_KEY/" \
"$PROJECT_DIR"/helm-values.yaml > "$HELM_VALUE_YAML"

# Recreate namespace and install all resources.
# --ignore-not-found keeps the first deployment from reporting an error.
kubectl delete namespace "$NAMESPACE" --ignore-not-found
kubectl create namespace "$NAMESPACE"

kubectl create secret generic airflow-database --from-literal=connection=postgresql+psycopg2://"$PG_URL" -n "$NAMESPACE"

kubectl create secret generic airflow-result-database --from-literal=connection=db+postgresql://"$PG_URL" -n "$NAMESPACE"

kubectl create secret generic airflow-webserver-secret --from-literal="webserver-secret-key=$(python3 -c 'import secrets; print(secrets.token_hex(16))')" -n "$NAMESPACE"

helm upgrade --install airflow apache-airflow/airflow --namespace "$NAMESPACE" --create-namespace -f "$HELM_VALUE_YAML" --debug

# Clean up temp folder
rm -rf "$TEMP_DIR"
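
The `sed` substitutions in this script fill `{{IMAGE_REPOSITORY}}`, `{{IMAGE_TAG}}`, and `{{FERNET_KEY}}` into `helm-values.yaml`. They work only while no value contains the chosen `sed` delimiter (`=` or `/`). Below is a sketch of the same substitution in Python, which has no delimiter to collide with. The template text is invented for illustration because `helm-values.yaml` is not part of this diff, and `render_values` is a hypothetical helper.

```python
def render_values(template: str, values: dict) -> str:
    """Replace each {{NAME}} placeholder in the template with its value."""
    rendered = template
    for name, value in values.items():
        rendered = rendered.replace("{{" + name + "}}", value)
    return rendered


# Invented template; the real helm-values.yaml may look different.
TEMPLATE = (
    "repository: {{IMAGE_REPOSITORY}}\n"
    "tag: {{IMAGE_TAG}}\n"
    "fernetKey: {{FERNET_KEY}}\n"
)

if __name__ == "__main__":
    print(render_values(TEMPLATE, {
        "IMAGE_REPOSITORY": "registry.example.com/ethetl",
        "IMAGE_TAG": "latest",
        "FERNET_KEY": "placeholder-key=",
    }))
```

Unlike the `sed` calls, which replace only the first match on each line, `str.replace` replaces every occurrence of a placeholder.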