Skip to content

Commit

Permalink
update backfill-sink to use argo workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
coutug committed Aug 20, 2024
1 parent ddeb480 commit f9fc1e2
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 155 deletions.
2 changes: 1 addition & 1 deletion charts/substreams-sink-sql-backfill/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.0.13
version: 0.1.0

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
Expand Down
146 changes: 0 additions & 146 deletions charts/substreams-sink-sql-backfill/templates/job.yaml

This file was deleted.

26 changes: 26 additions & 0 deletions charts/substreams-sink-sql-backfill/templates/resources.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
apiVersion: v1
kind: ResourceQuota
metadata:
name: {{ .Values.namespace.name }}-quota
namespace: {{ .Values.namespace.name }}
spec:
hard:
requests.cpu: {{ .Values.namespace.cpuLimit | quote }}
requests.memory: {{ .Values.namespace.memoryLimit | quote }}
limits.cpu: {{ .Values.namespace.cpuLimit | quote }}
limits.memory: {{ .Values.namespace.memoryLimit | quote }}
---
apiVersion: v1
kind: LimitRange
metadata:
name: {{ .Values.namespace.name }}-limit-range
namespace: {{ .Values.namespace.name }}
spec:
limits:
- default:
cpu: 50m
memory: 64Mi
defaultRequest:
cpu: 40m
memory: 64Mi
type: Container
169 changes: 169 additions & 0 deletions charts/substreams-sink-sql-backfill/templates/workflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: {{ .Release.Name }}
generateName: backfill-
namespace: {{ .Values.namespace.name }}
spec:
serviceAccountName: argo-workflow
{{- if .Values.ttlStrategy }}
ttlStrategy:
{{- .Values.ttlStrategy | toYaml | nindent 4}}
{{- end }}
{{- if .Values.nodeAffinity }}
podSpecPatch: |
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
{{- .Values.nodeAffinity | toYaml | nindent 12}}
{{- end }}
{{- if .Values.imagePullSecret }}
imagePullSecrets:
- name: {{ .Values.imagePullSecret }}
{{- end }}
entrypoint: main
templates:
- name: main
retryStrategy:
retryPolicy: "Always"
limit: "3"
backoff:
duration: "10"
factor: "2"
maxDuration: "1h"
steps:
{{- range $rangeIndex, $range := .Values.blockRanges }}
{{- if eq $rangeIndex 0 }}
- - name: full-run-{{ $rangeIndex }}
{{- else }}
- name: full-run-{{ $rangeIndex }}
{{- end }}
template: full-run
arguments:
parameters:
- name: start
value: {{ "{{item.start}}" | quote }}
- name: stop
value: {{ "{{item.stop}}" | quote }}
withItems:
{{- range $currentBlock := untilStep (index $range.range 0 | int) (index $range.range 1 | int) ($range.blocksPerJob | int) }}
{{- if lt (add $currentBlock $range.blocksPerJob | int) (index $range.range 1 | int) }}
- { start: {{ $currentBlock | quote }}, stop: {{ add $currentBlock $range.blocksPerJob | quote }} }
{{- else }}
- { start: {{ $currentBlock | quote }}, stop: {{ (index $range.range 1 | int) | quote }} }
{{- end }}
{{- end }}
{{- end }}
- name: full-run
inputs:
parameters:
- name: start
- name: stop
steps:
- - name: create-tables
template: ch-job
arguments:
parameters:
- name: start
value: {{ "{{inputs.parameters.start}}" | quote }}
- name: script
value: |
set -e
cat <<EOF > /tmp/create-tables.sql
CREATE TABLE IF NOT EXISTS {{ .Values.database }}.{{ .Values.tablePrefix }}_{{ "{{inputs.parameters.start}}" | quote }}
(
id String,
cursor String,
block_num Int64,
block_id String
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY (id)
ORDER BY (id);
EOF
exit 1
curl -X POST "http://{{ .Values.host }}:8123/?user=$(USERNAME)\&password=$(PASSWORD)" --data-binary @/tmp/create-tables.sql
- - name: sink-backfill
template: sink-backfill-job
arguments:
parameters:
- name: start
value: {{ "{{inputs.parameters.start}}" | quote }}
- name: stop
value: {{ "{{inputs.parameters.stop}}" | quote }}
- - name: drop-tables
template: ch-job
arguments:
parameters:
- name: start
value: {{ "{{inputs.parameters.start}}" | quote }}
- name: script
value: |
set -e
echo 'DROP TABLE {{ .Values.database }}.{{ .Values.tablePrefix }}_{{ "{{inputs.parameters.start}}" | quote }}' > /tmp/drop-tables.sql
curl -X POST "http://{{ .Values.host }}:8123/?user=$(USERNAME)\&password=$(PASSWORD)" --data-binary @/tmp/drop-tables.sql
- name: ch-job
inputs:
parameters:
- name: start
- name: script
container:
image: curlimages/curl:8.9.1
env:
- name: USERNAME
valueFrom:
secretKeyRef:
name: {{ .Values.clickhouseAuth.secretName }}
key: {{ .Values.clickhouseAuth.keyUsername }}
- name: PASSWORD
valueFrom:
secretKeyRef:
name: {{ .Values.clickhouseAuth.secretName }}
key: {{ .Values.clickhouseAuth.keyPassword }}
command: ["/bin/sh", "-c"]
args:
- |
{{ "{{inputs.parameters.script}}" | quote }}
- name: sink-backfill-job
inputs:
parameters:
- name: start
- name: stop
podSpecPatch: |
containers:
- name: main
resources:
limits:
cpu: {{ .Values.container.cpuLimit | quote }}
memory: {{ .Values.container.memoryLimit | quote }}
container:
image: {{ .Values.image.repository }}:{{ .Values.image.tag }}
env:
{{- range $key, $val := .Values.env }}
- name: {{ $key | upper }}
value: {{ $val | quote }}
{{- end }}
- name: SINK_SQL_RUN_CURSORS_TABLE
value: {{ .Values.tablePrefix }}_{{ "{{inputs.parameters.start}}" | quote }}
- name: USERNAME
valueFrom:
secretKeyRef:
name: {{ .Values.clickhouseAuth.secretName }}
key: {{ .Values.clickhouseAuth.keyUsername }}
- name: PASSWORD
valueFrom:
secretKeyRef:
name: {{ .Values.clickhouseAuth.secretName }}
key: {{ .Values.clickhouseAuth.keyPassword }}
command: ["/bin/sh", "-c"]
args:
- |
./substreams-sink-sql run clickhouse://$(USERNAME):$(PASSWORD)@{{ .Values.host }}:9000/{{ .Values.database }} {{ .Values.manifest }} {{ "{{inputs.parameters.start}}" | quote }}:{{ "{{inputs.parameters.stop}}" | quote }}
Loading

0 comments on commit f9fc1e2

Please sign in to comment.