Skip to content

Commit

Permalink
Merge pull request #112 from ugol/issue/111
Browse files Browse the repository at this point in the history
Added GCS Producer  Fix #111
  • Loading branch information
ugol authored Dec 6, 2023
2 parents 94b707c + b5d4d4b commit cb1adc9
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 52 deletions.
1 change: 1 addition & 0 deletions config/jrconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
"mongoConfig": "./mongoDB/config.json",
"elasticConfig": "./elastic/config.json",
"s3Config": "./s3/config.json",
"gcsConfig": "./gcs/config.json",
"url": ""
}
}
31 changes: 26 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ module github.com/ugol/jr
go 1.20

require (
cloud.google.com/go/storage v1.35.1
github.com/actgardner/gogen-avro/v10 v10.2.1
github.com/aws/aws-sdk-go v1.45.20
github.com/confluentinc/confluent-kafka-go/v2 v2.2.0
github.com/elastic/go-elasticsearch/v8 v8.10.0
github.com/go-chi/chi/v5 v5.0.10
github.com/google/uuid v1.3.1
github.com/google/uuid v1.4.0
github.com/redis/go-redis/v9 v9.2.1
github.com/spf13/cobra v1.7.0
github.com/spf13/viper v1.16.0
Expand All @@ -18,10 +19,19 @@ require (
)

require (
cloud.google.com/go v0.110.8 // indirect
cloud.google.com/go/compute v1.23.1 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.3 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/elastic/elastic-transport-go/v8 v8.3.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand All @@ -31,8 +41,20 @@ require (
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/sync v0.3.0 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.13.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.150.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
)

require (
Expand All @@ -41,7 +63,6 @@ require (
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/heetch/avro v0.4.4 // indirect
github.com/iancoleman/orderedmap v0.3.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/invopop/jsonschema v0.11.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
Expand All @@ -54,7 +75,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/squeeze69/codicefiscale v1.0.4 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/sys v0.13.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
91 changes: 45 additions & 46 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/cmd/emitterCreate.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func init() {
emitterCreateCmd.Flags().StringVar(&valueTemplate, "valueTemplate", constants.DEFAULT_VALUE_TEMPLATE, "template name to use for the value")
emitterCreateCmd.Flags().StringVar(&keyTemplate, "keyTemplate", constants.DEFAULT_KEY, "template to use for the key")
emitterCreateCmd.Flags().StringVar(&outputTemplate, "outputTemplate", constants.DEFAULT_OUTPUT_TEMPLATE, "Formatting of K,V on standard output")
emitterCreateCmd.Flags().StringVarP(&output, "output", "o", constants.DEFAULT_OUTPUT, "can be one of stdout, kafka, redis, mongo, elastic, s3")
emitterCreateCmd.Flags().StringVarP(&output, "output", "o", constants.DEFAULT_OUTPUT, "can be one of stdout, kafka, redis, mongo, elastic, s3, gcs")
emitterCreateCmd.Flags().StringVar(&topic, "topic", constants.DEFAULT_TOPIC, "Default topic to write to if using output='kafka'")
emitterCreateCmd.Flags().BoolVar(&kcat, "kcat", false, "If you want to pipe jr with kcat, use this flag: it is equivalent to --output stdout --outputTemplate '{{key}},{{value}}' --oneline")
emitterCreateCmd.Flags().BoolVarP(&oneline, "oneline", "l", false, "strips /n from output, for example to be pipelined to tools like kcat")
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/producerList.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var producerListCmd = &cobra.Command{
fmt.Printf("%sMongodb%s (--output = mongo)\n", Green, Reset)
fmt.Printf("%sElastic%s (--output = elastic)\n", Green, Reset)
fmt.Printf("%sS3%s (--output = s3)\n", Green, Reset)
fmt.Printf("%sGCS%s (--output = gcs)\n", Green, Reset)
fmt.Println()

},
Expand Down
3 changes: 3 additions & 0 deletions pkg/cmd/templateRun.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ jr template run --template "{{name}}"
configuration.GlobalCfg.ElasticConfig, _ = cmd.Flags().GetString(f.Name)
case "s3Config":
configuration.GlobalCfg.S3Config, _ = cmd.Flags().GetString(f.Name)
case "gcsConfig":
configuration.GlobalCfg.GCSConfig, _ = cmd.Flags().GetString(f.Name)
}
}
})
Expand Down Expand Up @@ -170,5 +172,6 @@ func init() {
templateRunCmd.Flags().String("mongoConfig", "", "MongoDB configuration")
templateRunCmd.Flags().String("elasticConfig", "", "Elastic Search configuration")
templateRunCmd.Flags().String("s3Config", "", "Amazon S3 configuration")
templateRunCmd.Flags().String("gcsConfig", "", "Google GCS configuration")

}
1 change: 1 addition & 0 deletions pkg/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type GlobalConfiguration struct {
MongoConfig string
ElasticConfig string
S3Config string
GCSConfig string
Url string
EmbeddedTemplate bool
FileNameTemplate bool
Expand Down
13 changes: 13 additions & 0 deletions pkg/emitter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/ugol/jr/pkg/producers/mongoDB"
"github.com/ugol/jr/pkg/producers/redis"
"github.com/ugol/jr/pkg/producers/s3"
"github.com/ugol/jr/pkg/producers/gcs"
"github.com/ugol/jr/pkg/producers/server"
"github.com/ugol/jr/pkg/tpl"
"log"
Expand Down Expand Up @@ -123,6 +124,11 @@ func (e *Emitter) Initialize(conf configuration.GlobalConfiguration) {
return
}

if e.Output == "gcs" {
e.Producer = createGCSProducer(conf.GCSConfig)
return
}

if e.Output == "http" {
e.Producer = &server.JsonProducer{OutputTpl: &o}
return
Expand Down Expand Up @@ -171,6 +177,13 @@ func createS3Producer(s3Config string) Producer {
return sProducer
}

func createGCSProducer(gcsConfig string) Producer {
gProducer := &gcs.GCSProducer{}
gProducer.Initialize(gcsConfig)

return gProducer
}

func createKafkaProducer(conf configuration.GlobalConfiguration, topic string, templateType string) *kafka.KafkaManager {

kManager := &kafka.KafkaManager{
Expand Down
3 changes: 3 additions & 0 deletions pkg/producers/gcs/config.json.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"bucket_name": "your-bucket-name"
}
74 changes: 74 additions & 0 deletions pkg/producers/gcs/gcsProducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package gcs

import (
"context"
"encoding/json"
"io/ioutil"
"log"
"fmt"

"cloud.google.com/go/storage"
"github.com/google/uuid"
)

type Config struct {
Bucket string `json:"bucket_name"`
}

type GCSProducer struct {
client storage.Client
bucket string
}

func (p *GCSProducer) Initialize(configFile string) {
var config Config
file, err := ioutil.ReadFile(configFile)
err = json.Unmarshal(file, &config)
if err != nil {
log.Fatalf("Failed to parse configuration parameters: %s", err)
}

ctx := context.Background()
// Use Google Application Default Credentials to authorize and authenticate the client.
// More information about Application Default Credentials and how to enable is at
// https://developers.google.com/identity/protocols/application-default-credentials.
client, err := storage.NewClient(ctx)
if err != nil {
log.Fatalf("Failed to create client: %v", err)
}

p.client = *client
p.bucket = config.Bucket
}

func (p *GCSProducer) Produce(k []byte, v []byte, o any) {
ctx := context.Background()

bucket := p.bucket
var key string

if k == nil || len(k) == 0 {
// generate a UUID as index
id := uuid.New()
key = id.String() + "/.json"
} else {
key = string(k) + "/.json"
}

objectHandle := p.client.Bucket(bucket).Object(key)
writer := objectHandle.NewWriter(ctx)
kvPair := fmt.Sprintf("%s=%s\n", key, v)

_, err := writer.Write([]byte(kvPair))
if err != nil {
log.Fatalf("Failed to write to GCS: %v", err)
}

writer.Close()

}

func (p *GCSProducer) Close() error {
p.client.Close()
return nil
}

0 comments on commit cb1adc9

Please sign in to comment.