Skip to content

Commit

Permalink
feat: add AWS DynamoDB producer
Browse files Browse the repository at this point in the history
  • Loading branch information
Vincenzo authored and Vincenzo committed Aug 21, 2024
1 parent a9f14ca commit 5c22b9c
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 17 deletions.
16 changes: 10 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0
github.com/actgardner/gogen-avro/v10 v10.2.1
github.com/adrg/xdg v0.5.0
github.com/aws/aws-sdk-go-v2 v1.26.1
github.com/aws/aws-sdk-go v1.54.14
github.com/aws/aws-sdk-go-v2 v1.30.4
github.com/aws/aws-sdk-go-v2/config v1.27.10
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.14.11
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.34.5
github.com/aws/aws-sdk-go-v2/service/s3 v1.48.1
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0
github.com/elastic/go-elasticsearch/v8 v8.14.0
Expand Down Expand Up @@ -46,23 +49,24 @@ require (
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
github.com/VividCortex/ewma v1.1.1 // indirect
github.com/alessio/shellescape v1.4.1 // indirect
github.com/aws/aws-sdk-go v1.54.14 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.4 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.10 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.10 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.22.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.10 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.10 // indirect
github.com/aws/aws-sdk-go-v2/service/kms v1.30.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.20.4 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect
github.com/aws/smithy-go v1.20.2 // indirect
github.com/aws/smithy-go v1.20.4 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
Expand Down
28 changes: 18 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -70,28 +70,36 @@ github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj
github.com/aws/aws-sdk-go v1.34.0/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.54.14 h1:llJ60MzLzovyDE/rEDbUjS1cICh7krk1PwQwNlKRoeQ=
github.com/aws/aws-sdk-go v1.54.14/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA=
github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM=
github.com/aws/aws-sdk-go-v2 v1.30.4 h1:frhcagrVNrzmT95RJImMHgabt99vkXGslubDaDagTk8=
github.com/aws/aws-sdk-go-v2 v1.30.4/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.4 h1:OCs21ST2LrepDfD3lwlQiOqIGp6JiEUqG84GzTDoyJs=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.4/go.mod h1:usURWEKSNNAcAZuzRn/9ZYPT8aZQkR7xcCtunK/LkJo=
github.com/aws/aws-sdk-go-v2/config v1.27.10 h1:PS+65jThT0T/snC5WjyfHHyUgG+eBoupSDV+f838cro=
github.com/aws/aws-sdk-go-v2/config v1.27.10/go.mod h1:BePM7Vo4OBpHreKRUMuDXX+/+JWP38FLkzl5m27/Jjs=
github.com/aws/aws-sdk-go-v2/credentials v1.17.10 h1:qDZ3EA2lv1KangvQB6y258OssCHD0xvaGiEDkG4X/10=
github.com/aws/aws-sdk-go-v2/credentials v1.17.10/go.mod h1:6t3sucOaYDwDssHQa0ojH1RpmVmF5/jArkye1b2FKMI=
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.14.11 h1:KUHQows9JhDp+RJRs9KLN+ljsK5D+oLV13Wr/TwlSr4=
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.14.11/go.mod h1:4kdmcGnKW4R9l2ddj6hNgKnJoxztjvJNCoI9eikMgvI=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 h1:FVJ0r5XTHSmIHJV6KuDmdYhEpvlHpiSd38RQWhut5J4=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1/go.mod h1:zusuAeqezXzAB24LGuzuekqMAEgWkVYukBec3kr3jUg=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 h1:aw39xVGeRWlWx9EzGVnhOR4yOjQDHPQ6o6NmBlscyQg=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5/go.mod h1:FSaRudD0dXiMPK2UjknVwwTYyZMRsHv3TtkabsZih5I=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 h1:PG1F3OD1szkuQPzDw3CIQsRIrtTlUC3lP84taWzHlq0=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5/go.mod h1:jU1li6RFryMz+so64PpKtudI+QzbKoIEivqdf6LNpOc=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 h1:TNyt/+X43KJ9IJJMjKfa3bNTiZbUP7DeCxfbTROESwY=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16/go.mod h1:2DwJF39FlNAUiX5pAc0UNeiz16lK2t7IaFcm0LFHEgc=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 h1:jYfy8UPmd+6kJW5YhY0L1/KftReOGxI/4NtVSTh9O/I=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16/go.mod h1:7ZfEPZxkW42Afq4uQB8H2E2e6ebh6mXTueEpYzjCzcs=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.10 h1:5oE2WzJE56/mVveuDZPJESKlg/00AaS2pY2QZcnxg4M=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.10/go.mod h1:FHbKWQtRBYUz4vO5WBWjzMD2by126ny5y/1EoaWoLfI=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 h1:Ji0DY1xUsUr3I8cHps0G+XM3WWU16lP6yG8qu1GAZAs=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2/go.mod h1:5CsjAbs3NlGQyZNFACh+zztPDI7fU6eW9QsxjfnuBKg=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.34.5 h1:Cm77yt+/CV7A6DglkENsWA3H1hq8+4ItJnFKrhxHkvg=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.34.5/go.mod h1:s2fYaueBuCnwv1XQn6T8TfShxJWusv5tWPMcL+GY6+g=
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.22.4 h1:qOvCqaiLTc0MnIdZr0LbdtJKetiRscHxi+9XjjtlEAs=
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.22.4/go.mod h1:3YxVsEoCNYOLIbdA+cCXSp1fom9hrhyB1DsCiYryCaQ=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 h1:KypMCbLPPHEmf9DgMGw51jMj77VfGPAN2Kv4cfhlfgI=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4/go.mod h1:Vz1JQXliGcQktFTN/LN6uGppAIRoLBR2bMvIMP0gOjc=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.10 h1:L0ai8WICYHozIKK+OtPzVJBugL7culcuM4E4JOpIEm8=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.10/go.mod h1:byqfyxJBshFk0fF9YmK0M0ugIO8OWjzH2T3bPG4eGuA=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.17 h1:HDJGz1jlV7RokVgTPfx1UHBHANC0N5Uk++xgyYgz5E0=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.17/go.mod h1:5szDu6TWdRDytfDxUQVv2OYfpTQMKApVFyqpm+TcA98=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 h1:ogRAwT1/gxJBcSWDMZlgyFUM962F51A5CRhDLbxLdmo=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7/go.mod h1:YCsIZhXfRPLFFCl5xxY+1T9RKzOKjCut+28JSX2DnAk=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.10 h1:KOxnQeWy5sXyS37fdKEvAsGHOr9fa/qvwxfJurR/BzE=
Expand All @@ -106,8 +114,8 @@ github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 h1:Jux+gDDyi1Lruk+KHF91tK2K
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4/go.mod h1:mUYPBhaF2lGiukDEjJX2BLRRKTmoUSitGDUgM4tRxak=
github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 h1:cwIxeBttqPN3qkaAjcEcsh8NYr8n2HZPkcKgPAi1phU=
github.com/aws/aws-sdk-go-v2/service/sts v1.28.6/go.mod h1:FZf1/nKNEkHdGGJP/cI2MoIMquumuRK6ol3QQJNDxmw=
github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q=
github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4=
github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk=
github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/producerList.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ var producerListCmd = &cobra.Command{
fmt.Printf("%sAZCosmosDB%s (--output = azcosmosdb)\n", Green, Reset)
fmt.Printf("%sCassandra%s (--output = cassandra)\n", Green, Reset)
fmt.Printf("%sLUA Script%s (--output = luascript)\n", Green, Reset)
fmt.Printf("%sAWS DynamoDB%s (--output = awsdynamodb)\n", Green, Reset)
fmt.Println()

},
Expand Down
5 changes: 4 additions & 1 deletion pkg/cmd/templateRun.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ jr template run --template "{{name}}"
configuration.GlobalCfg.ElasticConfig, _ = cmd.Flags().GetString(f.Name)
case "s3Config":
configuration.GlobalCfg.S3Config, _ = cmd.Flags().GetString(f.Name)
case "awsDynamoDBConfig":
configuration.GlobalCfg.AWSDynamoDBConfig, _ = cmd.Flags().GetString(f.Name)
case "gcsConfig":
configuration.GlobalCfg.GCSConfig, _ = cmd.Flags().GetString(f.Name)
case "azBlobStorageConfig":
Expand Down Expand Up @@ -182,7 +184,8 @@ func init() {
templateRunCmd.Flags().String("redisConfig", "", "Redis configuration")
templateRunCmd.Flags().String("mongoConfig", "", "MongoDB configuration")
templateRunCmd.Flags().String("elasticConfig", "", "Elastic Search configuration")
templateRunCmd.Flags().String("s3Config", "", "Amazon S3 configuration")
templateRunCmd.Flags().String("s3Config", "", "AWS S3 configuration")
templateRunCmd.Flags().String("awsDynamoDBConfig", "", "AWS DynamoDB configuration")
templateRunCmd.Flags().String("gcsConfig", "", "Google GCS configuration")
templateRunCmd.Flags().String("azBlobStorageConfig", "", "Azure Blob storage configuration")
templateRunCmd.Flags().String("azCosmosDBConfig", "", "Azure CosmosDB configuration")
Expand Down
1 change: 1 addition & 0 deletions pkg/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type GlobalConfiguration struct {
GCSConfig string
HTTPConfig string
CassandraConfig string
AWSDynamoDBConfig string
LUAScriptConfig string
Url string
EmbeddedTemplate bool
Expand Down
13 changes: 13 additions & 0 deletions pkg/emitter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/jrnd-io/jr/pkg/constants"
"github.com/jrnd-io/jr/pkg/ctx"
"github.com/jrnd-io/jr/pkg/functions"
"github.com/jrnd-io/jr/pkg/producers/awsdynamodb"
"github.com/jrnd-io/jr/pkg/producers/azblobstorage"
"github.com/jrnd-io/jr/pkg/producers/azcosmosdb"
"github.com/jrnd-io/jr/pkg/producers/cassandra"
Expand Down Expand Up @@ -129,6 +130,11 @@ func (e *Emitter) Initialize(conf configuration.GlobalConfiguration) {
return
}

if e.Output == "awsdynamodb" {
e.Producer = createAWSDynamoDB(conf.AWSDynamoDBConfig)
return
}

if e.Output == "gcs" {
e.Producer = createGCSProducer(conf.GCSConfig)
return
Expand Down Expand Up @@ -213,6 +219,13 @@ func createS3Producer(s3Config string) Producer {
return sProducer
}

func createAWSDynamoDB(config string) Producer {
producer := &awsdynamodb.Producer{}
producer.Initialize(config)

return producer
}

func createAZBlobStorageProducer(azConfig string) Producer {
producer := &azblobstorage.Producer{}
producer.Initialize(azConfig)
Expand Down
25 changes: 25 additions & 0 deletions pkg/producers/awsdynamodb/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright © 2024 JR team
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package awsdynamodb

type Config struct {
Table string `json:"table"`
}
3 changes: 3 additions & 0 deletions pkg/producers/awsdynamodb/config.json.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"table":"<dynamo db table>"
}
90 changes: 90 additions & 0 deletions pkg/producers/awsdynamodb/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright © 2024 JR team
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package awsdynamodb

import (
"context"
"encoding/json"
"os"

awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go/aws"
"github.com/rs/zerolog/log"
)

type Producer struct {
configuration Config

client *dynamodb.Client
}

func (p *Producer) Initialize(configFile string) {
var config Config
file, err := os.ReadFile(configFile)
if err != nil {
log.Fatal().Err(err).Msg("Failed to ReadFile")
}
err = json.Unmarshal(file, &config)
if err != nil {
log.Fatal().Err(err).Msg("Failed to parse configuration parameters")
}

if config.Table == "" {
log.Fatal().Msg("Table is mandatory")
}

awsConfig, err := awsconfig.LoadDefaultConfig(context.TODO())
if err != nil {
log.Fatal().Err(err).Msg("Failed to load default AWS config")
}
client := dynamodb.NewFromConfig(awsConfig)

p.client = client
p.configuration = config
}

func (p *Producer) Produce(_ []byte, v []byte, _ any) {

var jsonMap map[string]interface{}
if err := json.Unmarshal(v, &jsonMap); err != nil {
log.Fatal().Err(err).Msg("Failed to unmarshal json")
}

item, err := attributevalue.MarshalMap(jsonMap)
if err != nil {
log.Fatal().Err(err).Msg("Failed to marshal map")
}

_, err = p.client.PutItem(context.TODO(), &dynamodb.PutItemInput{
TableName: aws.String(p.configuration.Table),
Item: item,
})
if err != nil {
log.Fatal().Err(err).Msg("Failed to put item")
}

}

func (p *Producer) Close() error {
return nil
}

0 comments on commit 5c22b9c

Please sign in to comment.