Skip to content

Commit

Permalink
restructure
Browse files Browse the repository at this point in the history
  • Loading branch information
Armand Caesar committed Feb 9, 2021
1 parent 48b79c9 commit 6e00990
Show file tree
Hide file tree
Showing 18 changed files with 52 additions and 1,171 deletions.
12 changes: 5 additions & 7 deletions example/consumer_example/consume_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"syscall"

"github.com/kata-ai/messagebus-golang-kafka/messagebus"
"github.com/kata-ai/messagebus-golang-kafka/messagebus/consumer"
"github.com/kata-ai/messagebus-golang-kafka/messagebus/serialization"
)

type handler struct {
Expand All @@ -20,11 +18,11 @@ func (h handler) HandleMessage(context messagebus.MessageContext) {
}

func main() {
consumerConfig := consumer.NewConsumerConfig(
consumerConfig := messagebus.NewConsumerConfig(
"messagebus-golang",
consumer.WithSASLAuth(
consumer.SASL_PLAINTEXT,
consumer.SCRAM_SHA_512,
messagebus.WithConsumerSASLAuth(
messagebus.SASL_PLAINTEXT,
messagebus.SCRAM_SHA_512,
"kafka-dev",
"Cfhj5nJ6Fy1W",
),
Expand All @@ -35,7 +33,7 @@ func main() {
bus, err := messagebus.NewMessageBus(
brokers,
"http://ab14371f4e314424c9eeeb6c4eb707b3-143588661.ap-southeast-1.elb.amazonaws.com:8081",
serialization.RECORD_NAME_STRATEGY,
messagebus.RECORD_NAME_STRATEGY,
nil,
consumerConfig,
)
Expand Down
19 changes: 8 additions & 11 deletions example/publisher_example/publish_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,14 @@ import (

"github.com/kata-ai/messagebus-golang-kafka/example/schemas"
"github.com/kata-ai/messagebus-golang-kafka/messagebus"
"github.com/kata-ai/messagebus-golang-kafka/messagebus/producer"
"github.com/kata-ai/messagebus-golang-kafka/messagebus/record"
"github.com/kata-ai/messagebus-golang-kafka/messagebus/serialization"
)

func main() {
producerConfig := producer.NewProducerConfig(
producer.WithCompressionType("gzip"),
producer.WithSASLAuth(
producer.SASL_PLAINTEXT,
producer.SCRAM_SHA_512,
producerConfig := messagebus.NewProducerConfig(
messagebus.WithCompressionType("gzip"),
messagebus.WithProducerSASLAuth(
messagebus.SASL_PLAINTEXT,
messagebus.SCRAM_SHA_512,
"kafka-dev",
"Cfhj5nJ6Fy1W",
),
Expand All @@ -27,7 +24,7 @@ func main() {
bus, err := messagebus.NewMessageBus(
brokers,
"http://ab14371f4e314424c9eeeb6c4eb707b3-143588661.ap-southeast-1.elb.amazonaws.com:8081",
serialization.RECORD_NAME_STRATEGY,
messagebus.RECORD_NAME_STRATEGY,
producerConfig,
nil,
)
Expand All @@ -40,11 +37,11 @@ func main() {
Name: "johnny",
Age: 21,
}
key, err := record.NewMessageKey("messagebus_test")
key, err := messagebus.NewMessageKey("messagebus_test")
if err != nil {
panic(err)
}
message := &record.ProducerRecord{
message := &messagebus.ProducerRecord{
Key: key,
Value: value,
}
Expand Down
18 changes: 7 additions & 11 deletions example/rpc_example/rpc_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ import (

"github.com/kata-ai/messagebus-golang-kafka/example/schemas"
"github.com/kata-ai/messagebus-golang-kafka/messagebus"
"github.com/kata-ai/messagebus-golang-kafka/messagebus/consumer"
"github.com/kata-ai/messagebus-golang-kafka/messagebus/producer"
"github.com/kata-ai/messagebus-golang-kafka/messagebus/record"
"github.com/kata-ai/messagebus-golang-kafka/messagebus/serialization"
)

type handler struct{}
Expand All @@ -19,24 +15,24 @@ func (handler) HandleMessage(context messagebus.MessageContext) {
Name: requestValue["name"].(string),
Age: int32(requestValue["age"].(float64)) + 10,
}
key, err := record.NewMessageKey("messagebus_test_response")
key, err := messagebus.NewMessageKey("messagebus_test_response")
if err != nil {
panic(err)
}
responseRecord := record.NewProducerRecord(key, responseValue)
responseRecord := messagebus.NewProducerRecord(key, responseValue)
_, _ = context.Reply(responseRecord)
}

func main() {
producerConfig := producer.NewProducerConfig(producer.WithCompressionType("gzip"))
producerConfig := messagebus.NewProducerConfig(messagebus.WithCompressionType("gzip"))
brokers := []string{
"a8714fef67c014a368982de8747cd095-1492289186.ap-southeast-1.elb.amazonaws.com:9094",
}
consumerConfig := consumer.NewConsumerConfig("messagebus-golang")
consumerConfig := messagebus.NewConsumerConfig("messagebus-golang")
bus, err := messagebus.NewMessageBus(
brokers,
"http://ab14371f4e314424c9eeeb6c4eb707b3-143588661.ap-southeast-1.elb.amazonaws.com:8081",
serialization.RECORD_NAME_STRATEGY,
messagebus.RECORD_NAME_STRATEGY,
producerConfig,
consumerConfig,
nil,
Expand All @@ -49,11 +45,11 @@ func main() {
Name: "johnny",
Age: 21,
}
key, err := record.NewMessageKey("messagebus_test", record.WithReplyTopic("message-bus-golang-reply"))
key, err := messagebus.NewMessageKey("messagebus_test", messagebus.WithReplyTopic("message-bus-golang-reply"))
if err != nil {
panic(err)
}
requestRecord := record.NewProducerRecord(key, value)
requestRecord := messagebus.NewProducerRecord(key, value)
bus.RegisterHandler("message-bus-golang", handler{})
err = bus.Subscribe("message-bus-golang")
if err != nil {
Expand Down
12 changes: 0 additions & 12 deletions messagebus/consumer/constants.go

This file was deleted.

114 changes: 0 additions & 114 deletions messagebus/consumer/consumer.go

This file was deleted.

5 changes: 2 additions & 3 deletions messagebus/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ import (
"errors"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/kata-ai/messagebus-golang-kafka/messagebus/record"
)

type MessageContext struct {
Incoming *record.ConsumerRecord
Incoming *ConsumerRecord
Sender IMessageBus
}

func (m MessageContext) Reply(record *record.ProducerRecord) (offset kafka.Offset, err error) {
func (m MessageContext) Reply(record *ProducerRecord) (offset kafka.Offset, err error) {
if m.Incoming.Key.ReplyTopic == "" {
return -1, errors.New("reply topic undefined")
}
Expand Down
24 changes: 21 additions & 3 deletions messagebus/interfaces.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,36 @@
package messagebus

import (
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/kata-ai/messagebus-golang-kafka/messagebus/record"
)

type IMessageBus interface {
Send(service string, message *record.ProducerRecord) (kafka.Offset, error)
Send(service string, message *ProducerRecord) (kafka.Offset, error)
Subscribe(topic string) error
Unsubscribe(topic string) error
Request(service string, message *record.ProducerRecord) (*record.ConsumerRecord, error)
Request(service string, message *ProducerRecord) (*ConsumerRecord, error)
Disconnect() error
}

type Handler interface {
HandleMessage(context MessageContext)
}

type ISerializer interface {
Serialize(topic string, record *ProducerRecord) (*SerializedProducerRecord, error)
Deserialize(message *kafka.Message) (*ConsumerRecord, error)
}

type ISchemaRegistryClient interface {
getSchema(schemaID int) (*Schema, error)
getLatestSchema(subject string, isKey bool) (*Schema, error)
getSchemaVersions(subject string, isKey bool) ([]int, error)
getSchemaByVersion(subject string, version int, isKey bool) (*Schema, error)
createSchema(subject string, schema string, schemaType SchemaType, isKey bool) (*Schema, error)
setCredentials(username string, password string)
setTimeout(timeout time.Duration)
isCachingEnabled(value bool)
isCodecCreationEnabled(value bool)
}
20 changes: 8 additions & 12 deletions messagebus/message_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,26 @@ import (
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/kata-ai/messagebus-golang-kafka/messagebus/consumer"
"github.com/kata-ai/messagebus-golang-kafka/messagebus/producer"
"github.com/kata-ai/messagebus-golang-kafka/messagebus/record"
"github.com/kata-ai/messagebus-golang-kafka/messagebus/serialization"
)

type MessageBus struct {
Producer *kafka.Producer
Consumer *kafka.Consumer
Handlers map[string]Handler
Serializer serialization.ISerializer
Serializer ISerializer
Subscriptions []string
stopChan chan bool
rpcTimeoutMs int
producerConfig *producer.ProducerConfiguration
consumerConfig *consumer.ConsumerConfiguration
producerConfig *ProducerConfiguration
consumerConfig *ConsumerConfiguration
}

type MessageBusOption func(m *MessageBus)

// NewMessageBus -> Create new instance of message bus
// It returns a pointer for message bus object and an error
// Error is nil if message bus instantiation is successful
func NewMessageBus(brokerList []string, schemaRegistry string, strategy serialization.SubjectStrategy, producerConfig *producer.ProducerConfiguration, consumerConfig *consumer.ConsumerConfiguration, opts ...MessageBusOption) (*MessageBus, error) {
func NewMessageBus(brokerList []string, schemaRegistry string, strategy SubjectStrategy, producerConfig *ProducerConfiguration, consumerConfig *ConsumerConfiguration, opts ...MessageBusOption) (*MessageBus, error) {
brokers := strings.Join(brokerList, ",")

var err error
Expand All @@ -56,7 +52,7 @@ func NewMessageBus(brokerList []string, schemaRegistry string, strategy serializ
}
}

serializer, err := serialization.NewSerializer(schemaRegistry, strategy)
serializer, err := NewSerializer(schemaRegistry, strategy)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -94,7 +90,7 @@ func (m *MessageBus) RegisterHandler(topic string, handler Handler) {
// Send message to a topic
// Returns kafka offset object and error
// Error is nil if send operation is successful
func (m MessageBus) Send(service string, message *record.ProducerRecord) (kafka.Offset, error) {
func (m MessageBus) Send(service string, message *ProducerRecord) (kafka.Offset, error) {
serializedRecord, err := m.Serializer.Serialize(service, message)
if err != nil {
return -1, err
Expand Down Expand Up @@ -210,7 +206,7 @@ func (m *MessageBus) Disconnect() error {

// Request-response pattern
// May not work properly with Kafka since it is not designed to do request-response pattern
func (m *MessageBus) Request(service string, message *record.ProducerRecord) (*record.ConsumerRecord, error) {
func (m *MessageBus) Request(service string, message *ProducerRecord) (*ConsumerRecord, error) {
if m.Consumer == nil {
return nil, errors.New("consumer not instantiated")
}
Expand All @@ -219,7 +215,7 @@ func (m *MessageBus) Request(service string, message *record.ProducerRecord) (*r
return nil, errors.New("message should have reply topic")
}

resultChan := make(chan *record.ConsumerRecord)
resultChan := make(chan *ConsumerRecord)
defer close(resultChan)

m.RegisterHandler(replyTopic, replyHandler{
Expand Down
Loading

0 comments on commit 6e00990

Please sign in to comment.