Skip to content

Commit

Permalink
Merge pull request #182 from ugol/178-feat-add-producer-for-azure-cos…
Browse files Browse the repository at this point in the history
…mosdb

feat: add cosmosdb
  • Loading branch information
vmarchese authored Aug 19, 2024
2 parents e1745a8 + 378d3dd commit 678f331
Show file tree
Hide file tree
Showing 11 changed files with 228 additions and 5 deletions.
36 changes: 36 additions & 0 deletions .localci/lint/golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
run:
# The default concurrency value is the number of available CPU.
# concurrency: 2
tests: true
timeout: 20m
allow-parallel-runners: true

linters:
enable:
- errcheck
- typecheck
- asciicheck
- bidichk
- bodyclose
- dupl
- gocritic
#- gofmt
- gosimple
- gosec
- govet
- goconst
- ineffassign
- nakedret
- revive
- staticcheck
- unused
fast: false

linters-settings:
dupl:
threshold: 400

output:
formats:
- format: html
path: golangci.html
23 changes: 23 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: trailing-whitespace
args: [--markdown-linebreak-ext=md]
- id: end-of-file-fixer
- id: fix-byte-order-marker
- id: mixed-line-ending
- id: check-merge-conflict
- id: check-case-conflict
- id: check-docstring-first
- repo: https://github.com/Lucas-C/pre-commit-hooks
rev: v1.3.1
hooks:
- id: remove-crlf
- repo: https://github.com/golangci/golangci-lint
rev: v1.54.2
hooks:
- id: golangci-lint
args:
- --config
- .localci/lint/golangci.yml
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.22.4

require (
cloud.google.com/go/storage v1.43.0
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v1.0.3
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0
github.com/actgardner/gogen-avro/v10 v10.2.1
github.com/adrg/xdg v0.5.0
Expand Down Expand Up @@ -32,6 +33,7 @@ require (
cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/iam v1.1.8 // indirect
github.com/Azure/azure-sdk-for-go v68.0.0+incompatible // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8=
github.com/AlecAivazis/survey/v2 v2.3.7 h1:6I/u8FvytdGsgonrYsVn2t8t4QiRnh6QSTqkkhIiSjQ=
github.com/AlecAivazis/survey/v2 v2.3.7/go.mod h1:xUTIdE4KCOIjsBAE1JYsUPoCqYdZ1reCfTwbto0Fduo=
github.com/Azure/azure-sdk-for-go v68.0.0+incompatible h1:fcYLmCpyNYRnvJbPerq7U0hS+6+I79yEDJBqVNcqUzU=
github.com/Azure/azure-sdk-for-go v68.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0 h1:GJHeeA2N7xrG3q30L2UXDyuWRzDM900/65j70wcM4Ww=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0/go.mod h1:l38EPgmsp71HHLq9j7De57JcKOWPyhrsW1Awm1JS6K0=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 h1:tfLQ34V6F7tVSwoTf/4lH5sE0o6eCJuNDTmH09nDpbc=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0/go.mod h1:9kIvujWAA58nmPmWB1m23fyWic1kYZMxD9CxaWn4Qpg=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v1.0.3 h1:gBWC0dYF3aO+7xGxL0Ccjv9BmnV30C8VZIrUPlMct6g=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v1.0.3/go.mod h1:7LBWaO4KRASAo9VpfhpxQKkdY6PBwkv9UDKzL9Sajuw=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0 h1:PiSrjRPpkQNjrM8H0WwKMnZUdu1RGMtd/LdGKUrOo+c=
Expand Down
3 changes: 2 additions & 1 deletion pkg/cmd/producerList.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ var producerListCmd = &cobra.Command{
fmt.Printf("%sS3%s (--output = s3)\n", Green, Reset)
fmt.Printf("%sGCS%s (--output = gcs)\n", Green, Reset)
fmt.Printf("%sAZBlobStorage%s (--output = azblobstorage)\n", Green, Reset)
fmt.Printf("%sCassandra%s (--output = cassandra)\n", Green, Reset)
fmt.Printf("%sAZCosmosDB%s (--output = azcosmosdb)\n", Green, Reset)
fmt.Printf("%sCassandra%s (--output = cassandra)\n", Green, Reset)
fmt.Println()

},
Expand Down
5 changes: 4 additions & 1 deletion pkg/cmd/templateRun.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ jr template run --template "{{name}}"
configuration.GlobalCfg.GCSConfig, _ = cmd.Flags().GetString(f.Name)
case "azBlobStorageConfig":
configuration.GlobalCfg.AzBlobStorageConfig, _ = cmd.Flags().GetString(f.Name)
case "azCosmosDBConfig":
configuration.GlobalCfg.AzCosmosDBConfig, _ = cmd.Flags().GetString(f.Name)
case "httpConfig":
configuration.GlobalCfg.HTTPConfig, _ = cmd.Flags().GetString(f.Name)
case "cassandraConfig":
Expand Down Expand Up @@ -181,6 +183,7 @@ func init() {
templateRunCmd.Flags().String("s3Config", "", "Amazon S3 configuration")
templateRunCmd.Flags().String("gcsConfig", "", "Google GCS configuration")
templateRunCmd.Flags().String("azBlobStorageConfig", "", "Azure Blob storage configuration")
templateRunCmd.Flags().String("cassandraConfig", "", "Cassandra configuration")
templateRunCmd.Flags().String("azCosmosDBConfig", "", "Azure CosmosDB configuration")
templateRunCmd.Flags().String("cassandraConfig", "", "Cassandra configuration")

}
5 changes: 3 additions & 2 deletions pkg/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ type GlobalConfiguration struct {
RedisConfig string
MongoConfig string
AzBlobStorageConfig string
AzCosmosDBConfig string
ElasticConfig string
S3Config string
GCSConfig string
HTTPConfig string
CassandraConfig string
CassandraConfig string
Url string
EmbeddedTemplate bool
FileNameTemplate bool
}
}
14 changes: 13 additions & 1 deletion pkg/emitter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ import (
"github.com/ugol/jr/pkg/constants"
"github.com/ugol/jr/pkg/ctx"
"github.com/ugol/jr/pkg/functions"
"github.com/ugol/jr/pkg/producers/cassandra"
"github.com/ugol/jr/pkg/producers/azblobstorage"
"github.com/ugol/jr/pkg/producers/azcosmosdb"
"github.com/ugol/jr/pkg/producers/cassandra"
"github.com/ugol/jr/pkg/producers/console"
"github.com/ugol/jr/pkg/producers/elastic"
"github.com/ugol/jr/pkg/producers/gcs"
Expand Down Expand Up @@ -136,6 +137,10 @@ func (e *Emitter) Initialize(conf configuration.GlobalConfiguration) {
e.Producer = createAZBlobStorageProducer(conf.AzBlobStorageConfig)
return
}
if e.Output == "azcosmosdb" {
e.Producer = createAZCosmosDBProducer(conf.AzCosmosDBConfig)
return
}

if e.Output == "json" {
e.Producer = &server.JsonProducer{OutputTpl: &o}
Expand Down Expand Up @@ -210,6 +215,13 @@ func createAZBlobStorageProducer(azConfig string) Producer {
return producer
}

func createAZCosmosDBProducer(azConfig string) Producer {
producer := &azcosmosdb.Producer{}
producer.Initialize(azConfig)

return producer
}

func createGCSProducer(gcsConfig string) Producer {
gProducer := &gcs.GCSProducer{}
gProducer.Initialize(gcsConfig)
Expand Down
28 changes: 28 additions & 0 deletions pkg/producers/azcosmosdb/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright © 2022 Vincenzo Marchese <vincenzo.marchese@gmail.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package azcosmosdb

type Config struct {
Endpoint string `json:"endpoint"`
PrimaryAccountKey string `json:"primary_account_key"`
Database string `json:"database"`
Container string `json:"container"`
PartitionKey string `json:"partition_key"`
}
7 changes: 7 additions & 0 deletions pkg/producers/azcosmosdb/config.json.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"endpoint": "https://<account>.documents.azure.com:443/",
"primary_account_key": "<security primary access key>",
"database": "<database name>",
"container":"<container name>",
"partition_key": "<name of partition key field>"
}
106 changes: 106 additions & 0 deletions pkg/producers/azcosmosdb/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright © 2022 Vincenzo Marchese <vincenzo.marchese@gmail.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package azcosmosdb

import (
"context"
"encoding/json"
"os"

"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
"github.com/rs/zerolog/log"
)

type Producer struct {
configuration Config
client *azcosmos.Client
}

func (p *Producer) Initialize(configFile string) {
cfgBytes, err := os.ReadFile(configFile)
if err != nil {
log.Fatal().Err(err).Msg("Failed to read config file")
}

config := Config{}
if err := json.Unmarshal(cfgBytes, &config); err != nil {
log.Fatal().Err(err).Msg("Failed to unmarshal config")
}

if config.Endpoint == "" {
log.Fatal().Msg("Endpoint is mandatory")
}

if config.PrimaryAccountKey == "" {
log.Fatal().Msg("PrimaryAccountKey is mandatory")
}

if config.PartitionKey == "" {
log.Fatal().Msg("PartitionKey is mandatory")
}

cred, err := azcosmos.NewKeyCredential(config.PrimaryAccountKey)
if err != nil {
log.Fatal().Err(err).Msg("Failed to create key credential")
}

client, err := azcosmos.NewClientWithKey(config.Endpoint, cred, nil)
if err != nil {
log.Fatal().Err(err).Msg("Failed to create azure cosmosdb client")
}

p.configuration = config
p.client = client

}

func (p *Producer) Produce(_ []byte, v []byte, _ any) {

// This is ugly but it works
var jsonMap map[string]interface{}
if err := json.Unmarshal(v, &jsonMap); err != nil {
log.Fatal().Err(err).Msg("Failed to unmarshal json")
}

// getting partition key value
pkValue := jsonMap[p.configuration.PartitionKey]
if pkValue == nil {
log.Fatal().Str("partition_key", p.configuration.PartitionKey).Msg("Partition key not found in value")
}
log.Debug().Str("pkValue", pkValue.(string)).Msg("Partition key value")

container, err := p.client.NewContainer(p.configuration.Database, p.configuration.Container)
if err != nil {
log.Fatal().Err(err).Msg("Failed to create container")
}

pk := azcosmos.NewPartitionKeyString(pkValue.(string))
resp, err := container.CreateItem(context.Background(), pk, v, nil)
if err != nil {
log.Fatal().Err(err).Msg("Failed to create item")
}

log.Debug().Interface("resp", resp).Msg("Item created")

}

func (p *Producer) Close() error {
return nil
}

0 comments on commit 678f331

Please sign in to comment.