From 378d3dde818a0523b578cd2aabe1f51f494d62d3 Mon Sep 17 00:00:00 2001 From: Vincenzo Date: Mon, 19 Aug 2024 11:17:55 +0200 Subject: [PATCH] feat: add cosmosdb --- .localci/lint/golangci.yml | 36 +++++++ .pre-commit-config.yaml | 23 ++++ go.mod | 2 + go.sum | 4 + pkg/cmd/producerList.go | 3 +- pkg/cmd/templateRun.go | 5 +- pkg/configuration/configuration.go | 5 +- pkg/emitter/emitter.go | 14 ++- pkg/producers/azcosmosdb/config.go | 28 +++++ pkg/producers/azcosmosdb/config.json.example | 7 ++ pkg/producers/azcosmosdb/producer.go | 106 +++++++++++++++++++ 11 files changed, 228 insertions(+), 5 deletions(-) create mode 100644 .localci/lint/golangci.yml create mode 100644 .pre-commit-config.yaml create mode 100644 pkg/producers/azcosmosdb/config.go create mode 100644 pkg/producers/azcosmosdb/config.json.example create mode 100644 pkg/producers/azcosmosdb/producer.go diff --git a/.localci/lint/golangci.yml b/.localci/lint/golangci.yml new file mode 100644 index 00000000..a2076b9a --- /dev/null +++ b/.localci/lint/golangci.yml @@ -0,0 +1,36 @@ +run: + # The default concurrency value is the number of available CPU. + # concurrency: 2 + tests: true + timeout: 20m + allow-parallel-runners: true + +linters: + enable: + - errcheck + - typecheck + - asciicheck + - bidichk + - bodyclose + - dupl + - gocritic + #- gofmt + - gosimple + - gosec + - govet + - goconst + - ineffassign + - nakedret + - revive + - staticcheck + - unused + fast: false + +linters-settings: + dupl: + threshold: 400 + +output: + formats: + - format: html + path: golangci.html diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 00000000..5e04772a --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,23 @@ +repos: +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.4.0 + hooks: + - id: trailing-whitespace + args: [--markdown-linebreak-ext=md] + - id: end-of-file-fixer + - id: fix-byte-order-marker + - id: mixed-line-ending + - id: check-merge-conflict + - id: check-case-conflict + - id: check-docstring-first +- repo: https://github.com/Lucas-C/pre-commit-hooks + rev: v1.3.1 + hooks: + - id: remove-crlf +- repo: https://github.com/golangci/golangci-lint + rev: v1.54.2 + hooks: + - id: golangci-lint + args: + - --config + - .localci/lint/golangci.yml diff --git a/go.mod b/go.mod index 891a431b..a73ceb5a 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.22.4 require ( cloud.google.com/go/storage v1.43.0 + github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v1.0.3 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0 github.com/actgardner/gogen-avro/v10 v10.2.1 github.com/adrg/xdg v0.5.0 @@ -32,6 +33,7 @@ require ( cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect cloud.google.com/go/compute/metadata v0.3.0 // indirect cloud.google.com/go/iam v1.1.8 // indirect + github.com/Azure/azure-sdk-for-go v68.0.0+incompatible // indirect github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect diff --git a/go.sum b/go.sum index d38d0186..39c3ebed 100644 --- a/go.sum +++ b/go.sum @@ -19,10 +19,14 @@ github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9 github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= github.com/AlecAivazis/survey/v2 v2.3.7 h1:6I/u8FvytdGsgonrYsVn2t8t4QiRnh6QSTqkkhIiSjQ= github.com/AlecAivazis/survey/v2 v2.3.7/go.mod h1:xUTIdE4KCOIjsBAE1JYsUPoCqYdZ1reCfTwbto0Fduo= +github.com/Azure/azure-sdk-for-go v68.0.0+incompatible h1:fcYLmCpyNYRnvJbPerq7U0hS+6+I79yEDJBqVNcqUzU= +github.com/Azure/azure-sdk-for-go v68.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0 h1:GJHeeA2N7xrG3q30L2UXDyuWRzDM900/65j70wcM4Ww= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0/go.mod h1:l38EPgmsp71HHLq9j7De57JcKOWPyhrsW1Awm1JS6K0= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 h1:tfLQ34V6F7tVSwoTf/4lH5sE0o6eCJuNDTmH09nDpbc= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0/go.mod h1:9kIvujWAA58nmPmWB1m23fyWic1kYZMxD9CxaWn4Qpg= +github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v1.0.3 h1:gBWC0dYF3aO+7xGxL0Ccjv9BmnV30C8VZIrUPlMct6g= +github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v1.0.3/go.mod h1:7LBWaO4KRASAo9VpfhpxQKkdY6PBwkv9UDKzL9Sajuw= github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY= github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0 h1:PiSrjRPpkQNjrM8H0WwKMnZUdu1RGMtd/LdGKUrOo+c= diff --git a/pkg/cmd/producerList.go b/pkg/cmd/producerList.go index c295d7d3..919a7058 100644 --- a/pkg/cmd/producerList.go +++ b/pkg/cmd/producerList.go @@ -55,7 +55,8 @@ var producerListCmd = &cobra.Command{ fmt.Printf("%sS3%s (--output = s3)\n", Green, Reset) fmt.Printf("%sGCS%s (--output = gcs)\n", Green, Reset) fmt.Printf("%sAZBlobStorage%s (--output = azblobstorage)\n", Green, Reset) - fmt.Printf("%sCassandra%s (--output = cassandra)\n", Green, Reset) + fmt.Printf("%sAZCosmosDB%s (--output = azcosmosdb)\n", Green, Reset) + fmt.Printf("%sCassandra%s (--output = cassandra)\n", Green, Reset) fmt.Println() }, diff --git a/pkg/cmd/templateRun.go b/pkg/cmd/templateRun.go index 8a8215aa..6581b4a6 100644 --- a/pkg/cmd/templateRun.go +++ b/pkg/cmd/templateRun.go @@ -113,6 +113,8 @@ jr template run --template "{{name}}" configuration.GlobalCfg.GCSConfig, _ = cmd.Flags().GetString(f.Name) case "azBlobStorageConfig": configuration.GlobalCfg.AzBlobStorageConfig, _ = cmd.Flags().GetString(f.Name) + case "azCosmosDBConfig": + configuration.GlobalCfg.AzCosmosDBConfig, _ = cmd.Flags().GetString(f.Name) case "httpConfig": configuration.GlobalCfg.HTTPConfig, _ = cmd.Flags().GetString(f.Name) case "cassandraConfig": @@ -181,6 +183,7 @@ func init() { templateRunCmd.Flags().String("s3Config", "", "Amazon S3 configuration") templateRunCmd.Flags().String("gcsConfig", "", "Google GCS configuration") templateRunCmd.Flags().String("azBlobStorageConfig", "", "Azure Blob storage configuration") - templateRunCmd.Flags().String("cassandraConfig", "", "Cassandra configuration") + templateRunCmd.Flags().String("azCosmosDBConfig", "", "Azure CosmosDB configuration") + templateRunCmd.Flags().String("cassandraConfig", "", "Cassandra configuration") } diff --git a/pkg/configuration/configuration.go b/pkg/configuration/configuration.go index 1a483e0b..7cbba91f 100644 --- a/pkg/configuration/configuration.go +++ b/pkg/configuration/configuration.go @@ -35,12 +35,13 @@ type GlobalConfiguration struct { RedisConfig string MongoConfig string AzBlobStorageConfig string + AzCosmosDBConfig string ElasticConfig string S3Config string GCSConfig string HTTPConfig string - CassandraConfig string + CassandraConfig string Url string EmbeddedTemplate bool FileNameTemplate bool -} \ No newline at end of file +} diff --git a/pkg/emitter/emitter.go b/pkg/emitter/emitter.go index 65809a55..ada9bf13 100644 --- a/pkg/emitter/emitter.go +++ b/pkg/emitter/emitter.go @@ -30,8 +30,9 @@ import ( "github.com/ugol/jr/pkg/constants" "github.com/ugol/jr/pkg/ctx" "github.com/ugol/jr/pkg/functions" - "github.com/ugol/jr/pkg/producers/cassandra" "github.com/ugol/jr/pkg/producers/azblobstorage" + "github.com/ugol/jr/pkg/producers/azcosmosdb" + "github.com/ugol/jr/pkg/producers/cassandra" "github.com/ugol/jr/pkg/producers/console" "github.com/ugol/jr/pkg/producers/elastic" "github.com/ugol/jr/pkg/producers/gcs" @@ -136,6 +137,10 @@ func (e *Emitter) Initialize(conf configuration.GlobalConfiguration) { e.Producer = createAZBlobStorageProducer(conf.AzBlobStorageConfig) return } + if e.Output == "azcosmosdb" { + e.Producer = createAZCosmosDBProducer(conf.AzCosmosDBConfig) + return + } if e.Output == "json" { e.Producer = &server.JsonProducer{OutputTpl: &o} @@ -210,6 +215,13 @@ func createAZBlobStorageProducer(azConfig string) Producer { return producer } +func createAZCosmosDBProducer(azConfig string) Producer { + producer := &azcosmosdb.Producer{} + producer.Initialize(azConfig) + + return producer +} + func createGCSProducer(gcsConfig string) Producer { gProducer := &gcs.GCSProducer{} gProducer.Initialize(gcsConfig) diff --git a/pkg/producers/azcosmosdb/config.go b/pkg/producers/azcosmosdb/config.go new file mode 100644 index 00000000..e99001e6 --- /dev/null +++ b/pkg/producers/azcosmosdb/config.go @@ -0,0 +1,28 @@ +// Copyright © 2022 Vincenzo Marchese +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. +package azcosmosdb + +type Config struct { + Endpoint string `json:"endpoint"` + PrimaryAccountKey string `json:"primary_account_key"` + Database string `json:"database"` + Container string `json:"container"` + PartitionKey string `json:"partition_key"` +} diff --git a/pkg/producers/azcosmosdb/config.json.example b/pkg/producers/azcosmosdb/config.json.example new file mode 100644 index 00000000..789598fa --- /dev/null +++ b/pkg/producers/azcosmosdb/config.json.example @@ -0,0 +1,7 @@ +{ + "endpoint": "https://.documents.azure.com:443/", + "primary_account_key": "", + "database": "", + "container":"", + "partition_key": "" +} diff --git a/pkg/producers/azcosmosdb/producer.go b/pkg/producers/azcosmosdb/producer.go new file mode 100644 index 00000000..d0d49267 --- /dev/null +++ b/pkg/producers/azcosmosdb/producer.go @@ -0,0 +1,106 @@ +// Copyright © 2022 Vincenzo Marchese +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. +package azcosmosdb + +import ( + "context" + "encoding/json" + "os" + + "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos" + "github.com/rs/zerolog/log" +) + +type Producer struct { + configuration Config + client *azcosmos.Client +} + +func (p *Producer) Initialize(configFile string) { + cfgBytes, err := os.ReadFile(configFile) + if err != nil { + log.Fatal().Err(err).Msg("Failed to read config file") + } + + config := Config{} + if err := json.Unmarshal(cfgBytes, &config); err != nil { + log.Fatal().Err(err).Msg("Failed to unmarshal config") + } + + if config.Endpoint == "" { + log.Fatal().Msg("Endpoint is mandatory") + } + + if config.PrimaryAccountKey == "" { + log.Fatal().Msg("PrimaryAccountKey is mandatory") + } + + if config.PartitionKey == "" { + log.Fatal().Msg("PartitionKey is mandatory") + } + + cred, err := azcosmos.NewKeyCredential(config.PrimaryAccountKey) + if err != nil { + log.Fatal().Err(err).Msg("Failed to create key credential") + } + + client, err := azcosmos.NewClientWithKey(config.Endpoint, cred, nil) + if err != nil { + log.Fatal().Err(err).Msg("Failed to create azure cosmosdb client") + } + + p.configuration = config + p.client = client + +} + +func (p *Producer) Produce(_ []byte, v []byte, _ any) { + + // This is ugly but it works + var jsonMap map[string]interface{} + if err := json.Unmarshal(v, &jsonMap); err != nil { + log.Fatal().Err(err).Msg("Failed to unmarshal json") + } + + // getting partition key value + pkValue := jsonMap[p.configuration.PartitionKey] + if pkValue == nil { + log.Fatal().Str("partition_key", p.configuration.PartitionKey).Msg("Partition key not found in value") + } + log.Debug().Str("pkValue", pkValue.(string)).Msg("Partition key value") + + container, err := p.client.NewContainer(p.configuration.Database, p.configuration.Container) + if err != nil { + log.Fatal().Err(err).Msg("Failed to create container") + } + + pk := azcosmos.NewPartitionKeyString(pkValue.(string)) + resp, err := container.CreateItem(context.Background(), pk, v, nil) + if err != nil { + log.Fatal().Err(err).Msg("Failed to create item") + } + + log.Debug().Interface("resp", resp).Msg("Item created") + +} + +func (p *Producer) Close() error { + return nil +}