Skip to content

Commit

Permalink
restructure
Browse files Browse the repository at this point in the history
  • Loading branch information
Armand Caesar committed Feb 9, 2021
1 parent 6e00990 commit 89c3005
Show file tree
Hide file tree
Showing 8 changed files with 1,082 additions and 0 deletions.
13 changes: 13 additions & 0 deletions messagebus/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package messagebus

type SecurityProtocol string
type SASLMechanism string

const VERSION = "1.0.0"
const (
SASL_PLAINTEXT SecurityProtocol = "SASL_PLAINTEXT"
)

const (
SCRAM_SHA_512 SASLMechanism = "SCRAM-SHA-512"
)
114 changes: 114 additions & 0 deletions messagebus/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package messagebus

import "github.com/confluentinc/confluent-kafka-go/kafka"

type ConsumerConfiguration struct {
PollIntervalMs int
KafkaConfig *kafka.ConfigMap
}

type ConsumerOption func(c *ConsumerConfiguration)

// Create instance of consumer configuration
// Kafka configuration customization can be done through variadic parameters
// Complete Kafka configuration can be seen at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
// Example:
// NewConsumerConfig("group-1", WithPollIntervalMs(150), WithFetchMinBytes(20))
// Default values:
// pollIntervalMs: 100
// fetch.min.bytes: 10
// fetch.wait.max.ms: 10
// max.partition.fetch.bytes: 1048576
// session.timeout.ms: 10000
// heartbeat.interval.ms: 3000
// enable.auto.commit: false
// auto.offset.reset: "earliest"
func NewConsumerConfig(groupId string, opts ...ConsumerOption) *ConsumerConfiguration {
consumerConfig := &ConsumerConfiguration{
PollIntervalMs: 100,
KafkaConfig: &kafka.ConfigMap{
"group.id": groupId,
"fetch.min.bytes": 10,
"fetch.wait.max.ms": 500,
"max.partition.fetch.bytes": 1048576,
"session.timeout.ms": 10000,
"heartbeat.interval.ms": 3000,
"enable.auto.commit": false,
"auto.offset.reset": "earliest",
},
}
for _, opt := range opts {
opt(consumerConfig)
}
return consumerConfig
}

// Configure subscription poll interval in millisecond
func WithPollIntervalMs(ms int) ConsumerOption {
return func(c *ConsumerConfiguration) {
c.PollIntervalMs = ms
}
}

// Configure minimum number of bytes the broker responds with
func WithFetchMinBytes(fetchMinBytes int) ConsumerOption {
return func(c *ConsumerConfiguration) {
_ = c.KafkaConfig.SetKey("fetch.min.bytes", fetchMinBytes)
}
}

// Configure maximum time the broker may wait to fill the
// fetch response with fetch.min.bytes of messages
func WithFetchWaitMaxMs(fetchWaitMaxMs int) ConsumerOption {
return func(c *ConsumerConfiguration) {
_ = c.KafkaConfig.SetKey("fetch.wait.max.ms", fetchWaitMaxMs)
}
}

// Configure initial maximum number of bytes per topic+partition
// to request when fetching messages from the broker
func WithMaxPartitionFetchBytes(bytes int) ConsumerOption {
return func(c *ConsumerConfiguration) {
_ = c.KafkaConfig.SetKey("max.partition.fetch.bytes", bytes)
}
}

// Configure client group session and failure detection timeout
func WithSessionTimoutMs(ms int) ConsumerOption {
return func(c *ConsumerConfiguration) {
_ = c.KafkaConfig.SetKey("session.timeout.ms", ms)
}
}

// Configure group session keepalive heartbeat interval
func WithHeartbeatIntervalMs(ms int) ConsumerOption {
return func(c *ConsumerConfiguration) {
_ = c.KafkaConfig.SetKey("heartbeat.interval.ms", ms)
}
}

// Configure cction to take when there is no initial offset
// in offset store or the desired offset is out of range
func WithAutoOffsetReset(autoReset string) ConsumerOption {
return func(c *ConsumerConfiguration) {
_ = c.KafkaConfig.SetKey("auto.offset.reset", autoReset)
}
}

// Configure whether to automatically and periodically commit
// offsets in the background
func WithEnableAutoCommit(autoCommit bool) ConsumerOption {
return func(c *ConsumerConfiguration) {
_ = c.KafkaConfig.SetKey("enable.auto.commit", autoCommit)
}
}

// Configure SASL auth properties
func WithConsumerSASLAuth(protocol SecurityProtocol, mechanism SASLMechanism, username string, password string) ConsumerOption {
return func(p *ConsumerConfiguration) {
_ = p.KafkaConfig.SetKey("security.protocol", string(protocol))
_ = p.KafkaConfig.SetKey("sasl.mechanism", string(mechanism))
_ = p.KafkaConfig.SetKey("sasl.username", username)
_ = p.KafkaConfig.SetKey("sasl.password", password)
}
}
209 changes: 209 additions & 0 deletions messagebus/message_key.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 89c3005

Please sign in to comment.