From f9fc1e20236e4064b287b63808cd40fe42f215ee Mon Sep 17 00:00:00 2001 From: coutug Date: Tue, 20 Aug 2024 08:56:23 -0400 Subject: [PATCH] update backfill-sink to use argo workflows --- .../substreams-sink-sql-backfill/Chart.yaml | 2 +- .../templates/job.yaml | 146 --------------- .../templates/resources.yaml | 26 +++ .../templates/workflow.yaml | 169 ++++++++++++++++++ .../substreams-sink-sql-backfill/values.yaml | 35 +++- 5 files changed, 223 insertions(+), 155 deletions(-) delete mode 100644 charts/substreams-sink-sql-backfill/templates/job.yaml create mode 100644 charts/substreams-sink-sql-backfill/templates/resources.yaml create mode 100644 charts/substreams-sink-sql-backfill/templates/workflow.yaml diff --git a/charts/substreams-sink-sql-backfill/Chart.yaml b/charts/substreams-sink-sql-backfill/Chart.yaml index 378bb56..8e35143 100644 --- a/charts/substreams-sink-sql-backfill/Chart.yaml +++ b/charts/substreams-sink-sql-backfill/Chart.yaml @@ -21,7 +21,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.0.13 +version: 0.1.0 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. 
Versions are not expected to diff --git a/charts/substreams-sink-sql-backfill/templates/job.yaml b/charts/substreams-sink-sql-backfill/templates/job.yaml deleted file mode 100644 index 85a14be..0000000 --- a/charts/substreams-sink-sql-backfill/templates/job.yaml +++ /dev/null @@ -1,146 +0,0 @@ -{{- $totalBlocks := sub .Values.endBlock .Values.startBlock }} -{{- $currentBlock := .Values.startBlock }} -{{- $blockLeftover := mod $totalBlocks .Values.jobAmount }} -{{- $blockIncrement := div (sub $totalBlocks $blockLeftover) .Values.jobAmount }} -{{- $values := .Values }} - -{{- range $i := until ($values.jobAmount | int) }} -{{- if ne $i 0 }} -{{ $currentBlock = add $currentBlock $blockIncrement }} -{{- end }} -{{- if eq $i (sub $values.jobAmount 1) }} -{{ $blockIncrement = add $blockIncrement $blockLeftover }} -{{- end }} ---- -apiVersion: batch/v1 -kind: Job -metadata: - name: {{ include "substreams-sink-sql-backfill.fullname" $ }}{{ $i }}-{{ $currentBlock }} - namespace: {{ $.Release.Namespace }} - labels: - {{- include "substreams-sink-sql-backfill.labels" $ | nindent 4 }} -spec: - ttlSecondsAfterFinished: 60 - template: - metadata: - labels: - {{- include "substreams-sink-sql-backfill.labels" $ | nindent 8 }} - spec: - {{- if $values.nodeAffinity }} - affinity: - nodeAffinity: - requiredDuringSchedulingIgnoredDuringExecution: - nodeSelectorTerms: - {{- $values.nodeAffinity | toYaml | nindent 12}} - {{- end }} - containers: - # Container to create cursor tables - - name: {{ include "substreams-sink-sql-backfill.fullname" $ }}-create-tables - image: curlimages/curl:latest - imagePullPolicy: Always - command: ["/bin/sh", "-c"] - args: - - | - set -e - - cat < /tmp/create-tables.sql - CREATE TABLE IF NOT EXISTS {{ $values.database }}.{{ $values.tablePrefix }}_{{ $currentBlock }} - ( - id String, - cursor String, - block_num Int64, - block_id String - ) - ENGINE = ReplacingMergeTree() - PRIMARY KEY (id) - ORDER BY (id); - EOF - - curl -X POST "http://{{ $values.host 
}}:8123/?user=$(USERNAME)&password=$(PASSWORD)" --data-binary @/tmp/create-tables.sql - - touch /tmp/create-tables - env: - - name: USERNAME - valueFrom: - secretKeyRef: - name: {{ $values.clickhouseAuth.secretName }} - key: {{ $values.clickhouseAuth.keyUsername }} - - name: PASSWORD - valueFrom: - secretKeyRef: - name: {{ $values.clickhouseAuth.secretName }} - key: {{ $values.clickhouseAuth.keyPassword }} - volumeMounts: - - name: jobs-sync - mountPath: /tmp - - # Container to backfill data - - name: {{ include "substreams-sink-sql-backfill.fullname" $ }}-backfill - image: "{{ $values.image.repository }}:{{ $values.image.tag }}" - imagePullPolicy: Always - command: ["/bin/sh", "-c"] - args: - - | - set -e - - while [ ! -f /tmp/create-tables ]; do sleep 1; done; - - ./substreams-sink-sql run clickhouse://$(USERNAME):$(PASSWORD)@{{ $values.host }}:9000/{{ $values.database }} {{ $values.manifest }} {{ $currentBlock }}:{{ add $currentBlock $blockIncrement }} - - touch /tmp/run-backfill - env: - {{- range $key, $val := $values.env }} - - name: {{ $key | upper }} - value: {{ $val | quote }} - {{- end }} - - name: SINK_SQL_RUN_CURSORS_TABLE - value: {{ $values.tablePrefix }}_{{ $currentBlock }} - - name: USERNAME - valueFrom: - secretKeyRef: - name: {{ $values.clickhouseAuth.secretName }} - key: {{ $values.clickhouseAuth.keyUsername }} - - name: PASSWORD - valueFrom: - secretKeyRef: - name: {{ $values.clickhouseAuth.secretName }} - key: {{ $values.clickhouseAuth.keyPassword }} - volumeMounts: - - name: jobs-sync - mountPath: /tmp - - # Container to drop cursor tables - - name: {{ include "substreams-sink-sql-backfill.fullname" $ }}-drop-tables - image: curlimages/curl:latest - imagePullPolicy: Always - command: ["/bin/sh", "-c"] - args: - - | - set -e - - while [ ! 
-f /tmp/run-backfill ]; do sleep 1; done; - - echo "DROP TABLE {{ $values.database }}.{{ $values.tablePrefix }}_{{ $currentBlock }}" > /tmp/drop-tables.sql - - curl -X POST "http://{{ $values.host }}:8123/?user=$(USERNAME)&password=$(PASSWORD)" --data-binary @/tmp/drop-tables.sql - env: - - name: USERNAME - valueFrom: - secretKeyRef: - name: {{ $values.clickhouseAuth.secretName }} - key: {{ $values.clickhouseAuth.keyUsername }} - - name: PASSWORD - valueFrom: - secretKeyRef: - name: {{ $values.clickhouseAuth.secretName }} - key: {{ $values.clickhouseAuth.keyPassword }} - volumeMounts: - - name: jobs-sync - mountPath: /tmp - restartPolicy: OnFailure - imagePullSecrets: - - name: {{ $values.imagePullSecret }} - volumes: - - name: jobs-sync - emptyDir: {} -{{- end }} \ No newline at end of file diff --git a/charts/substreams-sink-sql-backfill/templates/resources.yaml b/charts/substreams-sink-sql-backfill/templates/resources.yaml new file mode 100644 index 0000000..df1aa01 --- /dev/null +++ b/charts/substreams-sink-sql-backfill/templates/resources.yaml @@ -0,0 +1,26 @@ +apiVersion: v1 +kind: ResourceQuota +metadata: + name: {{ .Values.namespace.name }}-quota + namespace: {{ .Values.namespace.name }} +spec: + hard: + requests.cpu: {{ .Values.namespace.cpuLimit | quote }} + requests.memory: {{ .Values.namespace.memoryLimit | quote }} + limits.cpu: {{ .Values.namespace.cpuLimit | quote }} + limits.memory: {{ .Values.namespace.memoryLimit | quote }} +--- +apiVersion: v1 +kind: LimitRange +metadata: + name: {{ .Values.namespace.name }}-limit-range + namespace: {{ .Values.namespace.name }} +spec: + limits: + - default: + cpu: 50m + memory: 64Mi + defaultRequest: + cpu: 40m + memory: 64Mi + type: Container \ No newline at end of file diff --git a/charts/substreams-sink-sql-backfill/templates/workflow.yaml b/charts/substreams-sink-sql-backfill/templates/workflow.yaml new file mode 100644 index 0000000..5021c42 --- /dev/null +++ 
b/charts/substreams-sink-sql-backfill/templates/workflow.yaml @@ -0,0 +1,169 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: {{ .Release.Name }} + generateName: backfill- + namespace: {{ .Values.namespace.name }} +spec: + serviceAccountName: argo-workflow + {{- if .Values.ttlStrategy }} + ttlStrategy: + {{- .Values.ttlStrategy | toYaml | nindent 4}} + {{- end }} + {{- if .Values.nodeAffinity }} + podSpecPatch: | + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + {{- .Values.nodeAffinity | toYaml | nindent 12}} + {{- end }} + {{- if .Values.imagePullSecret }} + imagePullSecrets: + - name: {{ .Values.imagePullSecret }} + {{- end }} + entrypoint: main + templates: + - name: main + retryStrategy: + retryPolicy: "Always" + limit: "3" + backoff: + duration: "10" + factor: "2" + maxDuration: "1h" + steps: + {{- range $rangeIndex, $range := .Values.blockRanges }} + {{- if eq $rangeIndex 0 }} + - - name: full-run-{{ $rangeIndex }} + {{- else }} + - name: full-run-{{ $rangeIndex }} + {{- end }} + template: full-run + arguments: + parameters: + - name: start + value: {{ "{{item.start}}" | quote }} + - name: stop + value: {{ "{{item.stop}}" | quote }} + withItems: + {{- range $currentBlock := untilStep (index $range.range 0 | int) (index $range.range 1 | int) ($range.blocksPerJob | int) }} + {{- if lt (add $currentBlock $range.blocksPerJob | int) (index $range.range 1 | int) }} + - { start: {{ $currentBlock | quote }}, stop: {{ add $currentBlock $range.blocksPerJob | quote }} } + {{- else }} + - { start: {{ $currentBlock | quote }}, stop: {{ (index $range.range 1 | int) | quote }} } + {{- end }} + {{- end }} + {{- end }} + - name: full-run + inputs: + parameters: + - name: start + - name: stop + steps: + - - name: create-tables + template: ch-job + arguments: + parameters: + - name: start + value: {{ "{{inputs.parameters.start}}" | quote }} + - name: script + value: | + set -e + + cat < 
/tmp/create-tables.sql + CREATE TABLE IF NOT EXISTS {{ .Values.database }}.{{ .Values.tablePrefix }}_{{ "{{inputs.parameters.start}}" | quote }} + ( + id String, + cursor String, + block_num Int64, + block_id String + ) + ENGINE = ReplacingMergeTree() + PRIMARY KEY (id) + ORDER BY (id); + EOF + + exit 1 + + curl -X POST "http://{{ .Values.host }}:8123/?user=$(USERNAME)\&password=$(PASSWORD)" --data-binary @/tmp/create-tables.sql + - - name: sink-backfill + template: sink-backfill-job + arguments: + parameters: + - name: start + value: {{ "{{inputs.parameters.start}}" | quote }} + - name: stop + value: {{ "{{inputs.parameters.stop}}" | quote }} + - - name: drop-tables + template: ch-job + arguments: + parameters: + - name: start + value: {{ "{{inputs.parameters.start}}" | quote }} + - name: script + value: | + set -e + + echo 'DROP TABLE {{ .Values.database }}.{{ .Values.tablePrefix }}_{{ "{{inputs.parameters.start}}" | quote }}' > /tmp/drop-tables.sql + + curl -X POST "http://{{ .Values.host }}:8123/?user=$(USERNAME)\&password=$(PASSWORD)" --data-binary @/tmp/drop-tables.sql + + - name: ch-job + inputs: + parameters: + - name: start + - name: script + container: + image: curlimages/curl:8.9.1 + env: + - name: USERNAME + valueFrom: + secretKeyRef: + name: {{ .Values.clickhouseAuth.secretName }} + key: {{ .Values.clickhouseAuth.keyUsername }} + - name: PASSWORD + valueFrom: + secretKeyRef: + name: {{ .Values.clickhouseAuth.secretName }} + key: {{ .Values.clickhouseAuth.keyPassword }} + command: ["/bin/sh", "-c"] + args: + - | + {{ "{{inputs.parameters.script}}" | quote }} + + - name: sink-backfill-job + inputs: + parameters: + - name: start + - name: stop + podSpecPatch: | + containers: + - name: main + resources: + limits: + cpu: {{ .Values.container.cpuLimit | quote }} + memory: {{ .Values.container.memoryLimit | quote }} + container: + image: {{ .Values.image.repository }}:{{ .Values.image.tag }} + env: + {{- range $key, $val := .Values.env }} + - name: {{ $key | 
upper }} + value: {{ $val | quote }} + {{- end }} + - name: SINK_SQL_RUN_CURSORS_TABLE + value: {{ .Values.tablePrefix }}_{{ "{{inputs.parameters.start}}" | quote }} + - name: USERNAME + valueFrom: + secretKeyRef: + name: {{ .Values.clickhouseAuth.secretName }} + key: {{ .Values.clickhouseAuth.keyUsername }} + - name: PASSWORD + valueFrom: + secretKeyRef: + name: {{ .Values.clickhouseAuth.secretName }} + key: {{ .Values.clickhouseAuth.keyPassword }} + command: ["/bin/sh", "-c"] + args: + - | + ./substreams-sink-sql run clickhouse://$(USERNAME):$(PASSWORD)@{{ .Values.host }}:9000/{{ .Values.database }} {{ .Values.manifest }} {{ "{{inputs.parameters.start}}" | quote }}:{{ "{{inputs.parameters.stop}}" | quote }} diff --git a/charts/substreams-sink-sql-backfill/values.yaml b/charts/substreams-sink-sql-backfill/values.yaml index 660b5ce..3a39587 100644 --- a/charts/substreams-sink-sql-backfill/values.yaml +++ b/charts/substreams-sink-sql-backfill/values.yaml @@ -1,6 +1,10 @@ +# Before running the workload, you should have +# - an existing namespace with the argo workflow serviceAccount +# - a secret with the credentials for the clickhouse database + image: repository: ghcr.io/pinax-network/substreams-sink-sql - tag: f9081de + tag: develop pullPolicy: IfNotPresent nameOverride: "" @@ -11,19 +15,34 @@ fullnameOverride: "" # - key: kubernetes.io/hostname # operator: NotIn # In or NotIn # values: -# - dom85 +# - domX # Secret to pull image from private repo imagePullSecret: ghcr-cred -# Number of job to parallelize the filling process -jobAmount: 1 -# Start and end block that will be split between all jobs -startBlock: 0 -endBlock: -1 +# Make sure to use the service account that match with argo workflow server +serviceAccount: argo-workflow + +# If the namespace exist, make sure the serviceAccount mentionned previously is in that namespace +# If the namespace does not existe, create it and then add it to `workflowNamespaces` in argo-workflows helm values. 
This will add the serviceAccount to the new namespace. +# Resource limits for the namespace and the containers running the backfill jobs (both must be specified) +namespace: + name: backfill-workflow + cpuLimit: 4 + memoryLimit: 6Gi +container: + cpuLimit: 100m + memoryLimit: 256Mi + +# Block ranges to run. Will run as many jobs as it can based on resources available. Once the resource limit is hit, the remaining jobs will be queued +blockRanges: + # range of blocks + - range: [0, 10] + # blocks per job to run for that range + blocksPerJob: 5 + # spkg file to use manifest: myFile.spkg - # Clickhouse host name host: myClickhouseDB # Clickhouse database to use