diff --git a/messagebus/constants.go b/messagebus/constants.go
new file mode 100644
index 0000000..277f2c3
--- /dev/null
+++ b/messagebus/constants.go
@@ -0,0 +1,13 @@
+package messagebus
+
+type SecurityProtocol string
+type SASLMechanism string
+
+const VERSION = "1.0.0"
+const (
+	SASL_PLAINTEXT SecurityProtocol = "SASL_PLAINTEXT"
+)
+
+const (
+	SCRAM_SHA_512 SASLMechanism = "SCRAM-SHA-512"
+)
diff --git a/messagebus/consumer.go b/messagebus/consumer.go
new file mode 100644
index 0000000..595feb0
--- /dev/null
+++ b/messagebus/consumer.go
@@ -0,0 +1,114 @@
+package messagebus
+
+import "github.com/confluentinc/confluent-kafka-go/kafka"
+
+type ConsumerConfiguration struct {
+	PollIntervalMs int
+	KafkaConfig    *kafka.ConfigMap
+}
+
+type ConsumerOption func(c *ConsumerConfiguration)
+
+// Create instance of consumer configuration.
+// Kafka configuration customization can be done through variadic parameters.
+// Complete Kafka configuration can be seen at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+// Example:
+//	NewConsumerConfig("group-1", WithPollIntervalMs(150), WithFetchMinBytes(20))
+// Default values:
+//	pollIntervalMs: 100
+//	fetch.min.bytes: 10
+//	fetch.wait.max.ms: 500
+//	max.partition.fetch.bytes: 1048576
+//	session.timeout.ms: 10000
+//	heartbeat.interval.ms: 3000
+//	enable.auto.commit: false
+//	auto.offset.reset: "earliest"
+func NewConsumerConfig(groupId string, opts ...ConsumerOption) *ConsumerConfiguration {
+	consumerConfig := &ConsumerConfiguration{
+		PollIntervalMs: 100,
+		KafkaConfig: &kafka.ConfigMap{
+			"group.id":                  groupId,
+			"fetch.min.bytes":           10,
+			"fetch.wait.max.ms":         500,
+			"max.partition.fetch.bytes": 1048576,
+			"session.timeout.ms":        10000,
+			"heartbeat.interval.ms":     3000,
+			"enable.auto.commit":        false,
+			"auto.offset.reset":         "earliest",
+		},
+	}
+	for _, opt := range opts {
+		opt(consumerConfig)
+	}
+	return consumerConfig
+}
+
+// Configure subscription poll interval in milliseconds
func WithPollIntervalMs(ms int) ConsumerOption {
+	return func(c *ConsumerConfiguration) {
+		c.PollIntervalMs = ms
+	}
+}
+
+// Configure minimum number of bytes the broker responds with
+func WithFetchMinBytes(fetchMinBytes int) ConsumerOption {
+	return func(c *ConsumerConfiguration) {
+		_ = c.KafkaConfig.SetKey("fetch.min.bytes", fetchMinBytes)
+	}
+}
+
+// Configure maximum time the broker may wait to fill the
+// fetch response with fetch.min.bytes of messages
+func WithFetchWaitMaxMs(fetchWaitMaxMs int) ConsumerOption {
+	return func(c *ConsumerConfiguration) {
+		_ = c.KafkaConfig.SetKey("fetch.wait.max.ms", fetchWaitMaxMs)
+	}
+}
+
+// Configure initial maximum number of bytes per topic+partition
+// to request when fetching messages from the broker
+func WithMaxPartitionFetchBytes(bytes int) ConsumerOption {
+	return func(c *ConsumerConfiguration) {
+		_ = c.KafkaConfig.SetKey("max.partition.fetch.bytes", bytes)
+	}
+}
+
+// Configure client group session and failure detection timeout
+func WithSessionTimeoutMs(ms int) ConsumerOption {
+	return func(c *ConsumerConfiguration) {
+		_ = c.KafkaConfig.SetKey("session.timeout.ms", ms)
+	}
+}
+
+// Configure group session keepalive heartbeat interval
+func WithHeartbeatIntervalMs(ms int) ConsumerOption {
+	return func(c *ConsumerConfiguration) {
+		_ = c.KafkaConfig.SetKey("heartbeat.interval.ms", ms)
+	}
+}
+
+// Configure the action to take when there is no initial offset
+// in the offset store or the desired offset is out of range
+func WithAutoOffsetReset(autoReset string) ConsumerOption {
+	return func(c 
*ConsumerConfiguration) { + _ = c.KafkaConfig.SetKey("auto.offset.reset", autoReset) + } +} + +// Configure whether to automatically and periodically commit +// offsets in the background +func WithEnableAutoCommit(autoCommit bool) ConsumerOption { + return func(c *ConsumerConfiguration) { + _ = c.KafkaConfig.SetKey("enable.auto.commit", autoCommit) + } +} + +// Configure SASL auth properties +func WithConsumerSASLAuth(protocol SecurityProtocol, mechanism SASLMechanism, username string, password string) ConsumerOption { + return func(p *ConsumerConfiguration) { + _ = p.KafkaConfig.SetKey("security.protocol", string(protocol)) + _ = p.KafkaConfig.SetKey("sasl.mechanism", string(mechanism)) + _ = p.KafkaConfig.SetKey("sasl.username", username) + _ = p.KafkaConfig.SetKey("sasl.password", password) + } +} diff --git a/messagebus/message_key.go b/messagebus/message_key.go new file mode 100644 index 0000000..a8bf67c --- /dev/null +++ b/messagebus/message_key.go @@ -0,0 +1,209 @@ +// Code generated by github.com/actgardner/gogen-avro/v7. DO NOT EDIT. +/* + * SOURCE: + * message_header.avsc + */ +package messagebus + +import ( + "github.com/actgardner/gogen-avro/v7/compiler" + "github.com/actgardner/gogen-avro/v7/vm" + "github.com/actgardner/gogen-avro/v7/vm/types" + "github.com/google/uuid" + "io" + "os" + "time" +) + +type MessageKeyOption func(k *MessageKey) + +type MessageKey struct { + ValueSubject string `json:"valueSubject"` + + MessageId string `json:"messageId"` + + CorrelationId string `json:"correlationId"` + + ConversationId string `json:"conversationId"` + + ReplyTopic string `json:"replyTopic"` + + OriginService string `json:"originService"` + + OriginHostname string `json:"originHostname"` + + MessageBusVersion string `json:"messageBusVersion"` + + Timestamp int64 `json:"timestamp"` +} + +const MessageKeyAvroCRC64Fingerprint = "k\xe9\xb8G\xb5\x9a=v" + +func NewMessageKey(originService string, options ...MessageKeyOption) (*MessageKey, error) { + id := uuid.New().String() + hostname, err := os.Hostname() + if err != nil { + return nil, err + } + key := &MessageKey{ + OriginService: originService, + MessageId: id, + CorrelationId: id, + OriginHostname: hostname, + MessageBusVersion: VERSION, + Timestamp: time.Now().Unix() * 1000, + ConversationId: id, + } + for _, opt := range options { + opt(key) + } + return key, nil +} + +func (r *MessageKey) SetValueSubject(valueSubject string) { + r.ValueSubject = valueSubject +} + +func DeserializeMessageKey(r io.Reader) (*MessageKey, error) { + t := &MessageKey{} + deser, err := compiler.CompileSchemaBytes([]byte(t.Schema()), []byte(t.Schema())) + if err != nil { + return nil, err + } + + err = vm.Eval(r, deser, t) + if err != nil { + return nil, err + } + return t, err +} + +func WithReplyTopic(topic string) MessageKeyOption { + return func(k *MessageKey) { + k.ReplyTopic = topic + } +} + +func DeserializeMessageKeyFromSchema(r io.Reader, schema string) (*MessageKey, error) { + t := &MessageKey{} + + deser, err := compiler.CompileSchemaBytes([]byte(schema), []byte(t.Schema())) + if err != nil { + return nil, err + } + + err = vm.Eval(r, deser, t) + if err != nil { + return nil, err + } + return t, err +} + +func writeMessageKey(r *MessageKey, w io.Writer) error { + var err error + err = vm.WriteString(r.ValueSubject, w) + if err != nil { + return err + } + err = vm.WriteString(r.MessageId, w) + if err != nil { + return err + } + err = vm.WriteString(r.CorrelationId, w) + if err != nil { + return err + } + err = vm.WriteString(r.ConversationId, 
w) + if err != nil { + return err + } + err = vm.WriteString(r.ReplyTopic, w) + if err != nil { + return err + } + err = vm.WriteString(r.OriginService, w) + if err != nil { + return err + } + err = vm.WriteString(r.OriginHostname, w) + if err != nil { + return err + } + err = vm.WriteString(r.MessageBusVersion, w) + if err != nil { + return err + } + err = vm.WriteLong(r.Timestamp, w) + if err != nil { + return err + } + return err +} + +func (r *MessageKey) Serialize(w io.Writer) error { + return writeMessageKey(r, w) +} + +func (r *MessageKey) Schema() string { + return "{\"fields\":[{\"name\":\"valueSubject\",\"type\":\"string\"},{\"name\":\"messageId\",\"type\":\"string\"},{\"name\":\"correlationId\",\"type\":\"string\"},{\"name\":\"conversationId\",\"type\":\"string\"},{\"default\":\"\",\"name\":\"replyTopic\",\"type\":\"string\"},{\"name\":\"originService\",\"type\":\"string\"},{\"name\":\"originHostname\",\"type\":\"string\"},{\"name\":\"messageBusVersion\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"long\"}],\"name\":\"ai.kata.kafka.MessageKey\",\"type\":\"record\"}" +} + +func (r *MessageKey) SchemaName() string { + return "ai.kata.kafka.MessageKey" +} + +func (_ *MessageKey) SetBoolean(v bool) { panic("Unsupported operation") } +func (_ *MessageKey) SetInt(v int32) { panic("Unsupported operation") } +func (_ *MessageKey) SetLong(v int64) { panic("Unsupported operation") } +func (_ *MessageKey) SetFloat(v float32) { panic("Unsupported operation") } +func (_ *MessageKey) SetDouble(v float64) { panic("Unsupported operation") } +func (_ *MessageKey) SetBytes(v []byte) { panic("Unsupported operation") } +func (_ *MessageKey) SetString(v string) { panic("Unsupported operation") } +func (_ *MessageKey) SetUnionElem(v int64) { panic("Unsupported operation") } + +func (r *MessageKey) Get(i int) types.Field { + switch i { + case 0: + return &types.String{Target: &r.ValueSubject} + case 1: + return &types.String{Target: &r.MessageId} + case 2: + return &types.String{Target: &r.CorrelationId} + case 3: + return &types.String{Target: &r.ConversationId} + case 4: + return &types.String{Target: &r.ReplyTopic} + case 5: + return &types.String{Target: &r.OriginService} + case 6: + return &types.String{Target: &r.OriginHostname} + case 7: + return &types.String{Target: &r.MessageBusVersion} + case 8: + return &types.Long{Target: &r.Timestamp} + } + panic("Unknown field index") +} + +func (r *MessageKey) SetDefault(i int) { + switch i { + case 4: + r.ReplyTopic = "" + return + } + panic("Unknown field index") +} + +func (r *MessageKey) NullField(i int) { + switch i { + } + panic("Not a nullable field index") +} + +func (_ *MessageKey) AppendMap(key string) types.Field { panic("Unsupported operation") } +func (_ *MessageKey) AppendArray() types.Field { panic("Unsupported operation") } +func (_ *MessageKey) Finalize() {} + +func (_ *MessageKey) AvroCRC64Fingerprint() []byte { + return []byte(MessageKeyAvroCRC64Fingerprint) +} diff --git a/messagebus/producer.go b/messagebus/producer.go new file mode 100644 index 0000000..7bd81a6 --- /dev/null +++ b/messagebus/producer.go @@ -0,0 +1,128 @@ +package messagebus + +import ( + "github.com/confluentinc/confluent-kafka-go/kafka" +) + +type ProducerConfiguration struct { + FlushTimeoutMs int + KafkaConfig *kafka.ConfigMap +} + +type ProducerOption func(p *ProducerConfiguration) + +// Create instance of producer configuration. +// Kafka configuration customization can be done through variadic parameters. 
+// Complete Kafka configuration can be seen at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+// Example:
+//	NewProducerConfig(WithFlushTimeout(150), WithAcks(-1), WithRetries(5))
+// Default values:
+//	FlushTimeoutMs: 3000
+//	acks: 1
+//	retries: 5
+//	max.in.flight: 1
+//	message.max.bytes: 1000000
+//	compression.type: "snappy"
+//	retry.backoff.ms: 100
+//	linger.ms: 100
+//	batch.num.messages: 10000
+func NewProducerConfig(opts ...ProducerOption) *ProducerConfiguration {
+	producerConfig := &ProducerConfiguration{
+		FlushTimeoutMs: 3000,
+		KafkaConfig: &kafka.ConfigMap{
+			"acks":               1,
+			"retries":            5,
+			"max.in.flight":      1,
+			"message.max.bytes":  1000000,
+			"compression.type":   "snappy",
+			"retry.backoff.ms":   100,
+			"linger.ms":          100,
+			"batch.num.messages": 10000,
+		},
+	}
+	for _, opt := range opts {
+		opt(producerConfig)
+	}
+	return producerConfig
+}
+
+// Configure timeout when flushing the producer before disconnecting
+func WithFlushTimeout(flushTimeoutMs int) ProducerOption {
+	return func(p *ProducerConfiguration) {
+		p.FlushTimeoutMs = flushTimeoutMs
+	}
+}
+
+// Configure the required number of acknowledgements
+func WithAcks(acks int) ProducerOption {
+	return func(p *ProducerConfiguration) {
+		_ = p.KafkaConfig.SetKey("acks", acks)
+	}
+}
+
+// Configure the number of retries when sending
+func WithRetries(retries int) ProducerOption {
+	return func(p *ProducerConfiguration) {
+		_ = p.KafkaConfig.SetKey("retries", retries)
+	}
+}
+
+// Configure maximum number of in-flight requests per broker connection
+func WithMaxInFlight(maxInFlight int) ProducerOption {
+	return func(p *ProducerConfiguration) {
+		_ = p.KafkaConfig.SetKey("max.in.flight", maxInFlight)
+	}
+}
+
+// Configure maximum request message size
+func WithMaxBytes(maxBytes int) ProducerOption {
+	return func(p *ProducerConfiguration) {
+		_ = p.KafkaConfig.SetKey("message.max.bytes", maxBytes)
+	}
+}
+
+// Configure message compression type
+func WithCompressionType(compressionType string) ProducerOption {
+	return func(p *ProducerConfiguration) {
+		_ = p.KafkaConfig.SetKey("compression.type", compressionType)
+	}
+}
+
+// Configure backoff time in milliseconds before retrying a protocol request
+func WithRetryBackoffMs(retryBackoffMs int) ProducerOption {
+	return func(p *ProducerConfiguration) {
+		_ = p.KafkaConfig.SetKey("retry.backoff.ms", retryBackoffMs)
+	}
+}
+
+// Configure delay in milliseconds to wait for messages in the producer
+// queue to accumulate before constructing message batches to transmit to brokers
+func WithLingerMs(lingerMs int) ProducerOption {
+	return func(p *ProducerConfiguration) {
+		_ = p.KafkaConfig.SetKey("linger.ms", lingerMs)
+	}
+}
+
+// Configure maximum number of messages batched in one MessageSet
+func WithBatchNumMessages(batchNumMessages int) ProducerOption {
+	return func(p *ProducerConfiguration) {
+		_ = p.KafkaConfig.SetKey("batch.num.messages", batchNumMessages)
+	}
+}
+
+// Configure the ack timeout of the producer request in milliseconds
+func WithRequestTimeoutMs(requestTimeoutMs int) ProducerOption {
+	return func(p *ProducerConfiguration) {
+		_ = p.KafkaConfig.SetKey("request.timeout.ms", requestTimeoutMs)
+	}
+}
+
+// Configure SASL auth properties
+func WithProducerSASLAuth(protocol SecurityProtocol, mechanism SASLMechanism, username string, password string) ProducerOption {
+	return func(p *ProducerConfiguration) {
+		_ = p.KafkaConfig.SetKey("security.protocol", string(protocol))
+		_ = p.KafkaConfig.SetKey("sasl.mechanism", string(mechanism))
+		_ = 
p.KafkaConfig.SetKey("sasl.username", username) + _ = p.KafkaConfig.SetKey("sasl.password", password) + } +} diff --git a/messagebus/record.go b/messagebus/record.go new file mode 100644 index 0000000..e4d09d7 --- /dev/null +++ b/messagebus/record.go @@ -0,0 +1,28 @@ +package messagebus + +import ( + "time" + + "github.com/actgardner/gogen-avro/v7/container" +) + +type ProducerRecord struct { + Key *MessageKey + Value container.AvroRecord +} + +func NewProducerRecord(key *MessageKey, value container.AvroRecord) *ProducerRecord { + return &ProducerRecord{ + Key: key, + Value: value, + } +} + +type ConsumerRecord struct { + Key *MessageKey + Topic string + Value map[string]interface{} + Partition int32 + Offset string + Timestamp time.Time +} diff --git a/messagebus/schema_registry_client.go b/messagebus/schema_registry_client.go new file mode 100644 index 0000000..d5ac994 --- /dev/null +++ b/messagebus/schema_registry_client.go @@ -0,0 +1,398 @@ +package messagebus + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "regexp" + "strconv" + "sync" + "time" + + "github.com/linkedin/goavro/v2" +) + +// schemaRegistryClient allows interactions with +// Schema Registry over HTTP. Applications using +// this client can retrieve data about schemas, +// which in turn can be used to Serialize and +// Deserialize data. +type schemaRegistryClient struct { + schemaRegistryURL string + credentials *credentials + httpClient *http.Client + cachingEnabled bool + codecCreationEnabled bool + idSchemaCache map[int]*Schema + idSchemaCacheLock sync.RWMutex + subjectSchemaCache map[string]*Schema + subjectSchemaCacheLock sync.RWMutex +} + +type SchemaType string + +const ( + Protobuf SchemaType = "PROTOBUF" + Avro SchemaType = "AVRO" + Json SchemaType = "JSON" +) + +func (s SchemaType) String() string { + return string(s) +} + +// Schema is a data structure that holds all +// the relevant information about schemas. +type Schema struct { + id int + schema string + version int + codec *goavro.Codec +} + +type credentials struct { + username string + password string +} + +type schemaRequest struct { + Schema string `json:"schema"` +} + +type schemaResponse struct { + Subject string `json:"subject"` + Version int `json:"version"` + Schema string `json:"schema"` + ID int `json:"id"` +} + +const ( + schemaByID = "/schemas/ids/%d" + subjectVersions = "/subjects/%s/versions" + subjectByVersion = "/subjects/%s/versions/%s" + contentType = "application/vnd.schemaregistry.v1+json" +) + +// createSchemaRegistryClient creates a client that allows +// interactions with Schema Registry over HTTP. Applications +// using this client can retrieve data about schemas, which +// in turn can be used to Serialize and Deserialize records. +func createSchemaRegistryClient(schemaRegistryURL string) *schemaRegistryClient { + return &schemaRegistryClient{schemaRegistryURL: schemaRegistryURL, + httpClient: &http.Client{Timeout: 5 * time.Second}, + cachingEnabled: true, codecCreationEnabled: true, + idSchemaCache: make(map[int]*Schema), + subjectSchemaCache: make(map[string]*Schema)} +} + +// getSchema gets the schema associated with the given id. 
+func (client *schemaRegistryClient) getSchema(schemaID int) (*Schema, error) {
+
+	if client.cachingEnabled {
+		client.idSchemaCacheLock.RLock()
+		cachedSchema := client.idSchemaCache[schemaID]
+		client.idSchemaCacheLock.RUnlock()
+		if cachedSchema != nil {
+			return cachedSchema, nil
+		}
+	}
+
+	resp, err := client.httpRequest("GET", fmt.Sprintf(schemaByID, schemaID), nil)
+	if err != nil {
+		return nil, err
+	}
+
+	var schemaResp = new(schemaResponse)
+	err = json.Unmarshal(resp, &schemaResp)
+	if err != nil {
+		return nil, err
+	}
+	var codec *goavro.Codec
+	if client.codecCreationEnabled {
+		codec, err = goavro.NewCodec(schemaResp.Schema)
+		if err != nil {
+			return nil, err
+		}
+	}
+	var schema = &Schema{
+		id:     schemaID,
+		schema: schemaResp.Schema,
+		codec:  codec,
+	}
+
+	if client.cachingEnabled {
+		client.idSchemaCacheLock.Lock()
+		client.idSchemaCache[schemaID] = schema
+		client.idSchemaCacheLock.Unlock()
+	}
+
+	return schema, nil
+}
+
+// getLatestSchema gets the schema associated with the given subject.
+// The schema returned contains the latest version for that subject.
+func (client *schemaRegistryClient) getLatestSchema(subject string, isKey bool) (*Schema, error) {
+
+	// In order to ensure consistency, we need
+	// to temporarily disable caching to force
+	// the retrieval of the latest version from
+	// Schema Registry.
+	cachingEnabled := client.cachingEnabled
+	client.isCachingEnabled(false)
+	schema, err := client.getVersion(subject, "latest", isKey)
+	client.isCachingEnabled(cachingEnabled)
+
+	return schema, err
+}
+
+// getSchemaVersions returns a list of versions from a given subject.
+func (client *schemaRegistryClient) getSchemaVersions(subject string, isKey bool) ([]int, error) {
+
+	concreteSubject := subject
+	resp, err := client.httpRequest("GET", fmt.Sprintf(subjectVersions, concreteSubject), nil)
+	if err != nil {
+		return nil, err
+	}
+
+	var versions = []int{}
+	err = json.Unmarshal(resp, &versions)
+	if err != nil {
+		return nil, err
+	}
+
+	return versions, nil
+}
+
+// getSchemaByVersion gets the schema associated with the given subject.
+// The schema returned contains the version specified as a parameter.
+func (client *schemaRegistryClient) getSchemaByVersion(subject string, version int, isKey bool) (*Schema, error) {
+	return client.getVersion(subject, strconv.Itoa(version), isKey)
+}
+
+// createSchema creates a new schema in Schema Registry and associates
+// it with the subject provided. It returns the newly created schema with
+// all its associated information.
+func (client *schemaRegistryClient) createSchema(subject string, schema string,
+	schemaType SchemaType, isKey bool) (*Schema, error) {
+
+	concreteSubject := subject
+
+	switch schemaType {
+	case Avro, Json:
+		compiledRegex := regexp.MustCompile(`\r?\n`)
+		schema = compiledRegex.ReplaceAllString(schema, " ")
+	case Protobuf:
+		break
+	default:
+		return nil, fmt.Errorf("invalid schema type. valid values are Avro, Json, or Protobuf")
+	}
+	schemaReq := schemaRequest{Schema: schema}
+	schemaBytes, err := json.Marshal(schemaReq)
+	if err != nil {
+		return nil, err
+	}
+	payload := bytes.NewBuffer(schemaBytes)
+	resp, err := client.httpRequest("POST", fmt.Sprintf(subjectVersions, concreteSubject), payload)
+	if err != nil {
+		return nil, err
+	}
+
+	schemaResp := new(schemaResponse)
+	err = json.Unmarshal(resp, &schemaResp)
+	if err != nil {
+		return nil, err
+	}
+	// Conceptually, the schema returned below will be the
+	// exact same one created above. However, since Schema
+	// Registry can have multiple concurrent clients writing
+	// schemas, this may produce an incorrect result. Thus,
+	// this logic strongly relies on the idempotency guarantees
+	// of Schema Registry, as well as on the best practice
+	// that schemas don't change very often.
+	newSchema, err := client.getSchema(schemaResp.ID)
+	if err != nil {
+		return nil, err
+	}
+
+	if client.cachingEnabled {
+
+		// Update the subject-2-schema cache
+		cacheKey := cacheKey(concreteSubject,
+			strconv.Itoa(newSchema.version))
+		client.subjectSchemaCacheLock.Lock()
+		client.subjectSchemaCache[cacheKey] = newSchema
+		client.subjectSchemaCacheLock.Unlock()
+
+		// Update the id-2-schema cache
+		client.idSchemaCacheLock.Lock()
+		client.idSchemaCache[newSchema.id] = newSchema
+		client.idSchemaCacheLock.Unlock()
+
+	}
+
+	return newSchema, nil
+}
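The caching behavior above is easiest to see end to end: createSchema registers a schema under a subject and warms both caches, after which lookups by the registry-assigned ID are served locally. Below is a minimal, package-internal sketch; the registry URL, subject, and schema literal are illustrative placeholders, not part of this change.

```go
package messagebus

import (
	"fmt"
	"log"
)

// registerAndFetch is a hypothetical sketch, not part of this change.
func registerAndFetch() {
	client := createSchemaRegistryClient("http://localhost:8081") // placeholder URL
	schemaStr := `{"type":"record","name":"Ping","namespace":"example","fields":[{"name":"ts","type":"long"}]}`

	// createSchema registers the schema under the subject "pings-value"
	// and populates both the subject-2-schema and id-2-schema caches.
	registered, err := client.createSchema("pings-value", schemaStr, Avro, false)
	if err != nil {
		log.Fatal(err)
	}

	// With caching enabled, this lookup by the registry-assigned ID is
	// served from the id-2-schema cache without an HTTP round trip.
	byID, err := client.getSchema(registered.ID())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(byID.FullName(), byID.ID())
}
```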
+
+// setCredentials allows users to set credentials to be
+// used with Schema Registry, for scenarios when Schema
+// Registry has authentication enabled.
+func (client *schemaRegistryClient) setCredentials(username string, password string) {
+	if len(username) > 0 && len(password) > 0 {
+		credentials := credentials{username, password}
+		client.credentials = &credentials
+	}
+}
+
+// setTimeout allows the client to be reconfigured with
+// how much time internal HTTP requests may take until
+// they time out. It defaults to five seconds.
+func (client *schemaRegistryClient) setTimeout(timeout time.Duration) {
+	client.httpClient.Timeout = timeout
+}
+
+// isCachingEnabled allows the client to cache any values
+// that have been returned, which may speed up performance
+// if these values rarely change.
+func (client *schemaRegistryClient) isCachingEnabled(value bool) {
+	client.cachingEnabled = value
+}
+
+// isCodecCreationEnabled allows the application to enable/disable
+// the automatic creation of codecs when schemas are returned.
+func (client *schemaRegistryClient) isCodecCreationEnabled(value bool) { + client.codecCreationEnabled = value +} + +func (client *schemaRegistryClient) getVersion(subject string, + version string, isKey bool) (*Schema, error) { + + concreteSubject := subject + + if client.cachingEnabled { + cacheKey := cacheKey(concreteSubject, version) + client.subjectSchemaCacheLock.RLock() + cachedResult := client.subjectSchemaCache[cacheKey] + client.subjectSchemaCacheLock.RUnlock() + if cachedResult != nil { + return cachedResult, nil + } + } + + resp, err := client.httpRequest("GET", fmt.Sprintf(subjectByVersion, concreteSubject, version), nil) + if err != nil { + return nil, err + } + + schemaResp := new(schemaResponse) + err = json.Unmarshal(resp, &schemaResp) + if err != nil { + return nil, err + } + var codec *goavro.Codec + if client.codecCreationEnabled { + codec, err = goavro.NewCodec(schemaResp.Schema) + if err != nil { + return nil, err + } + } + var schema = &Schema{ + id: schemaResp.ID, + schema: schemaResp.Schema, + version: schemaResp.Version, + codec: codec, + } + + if client.cachingEnabled { + + // Update the subject-2-schema cache + cacheKey := cacheKey(concreteSubject, version) + client.subjectSchemaCacheLock.Lock() + client.subjectSchemaCache[cacheKey] = schema + client.subjectSchemaCacheLock.Unlock() + + // Update the id-2-schema cache + client.idSchemaCacheLock.Lock() + client.idSchemaCache[schema.id] = schema + client.idSchemaCacheLock.Unlock() + + } + + return schema, nil +} + +func (client *schemaRegistryClient) httpRequest(method, uri string, payload io.Reader) ([]byte, error) { + + url := fmt.Sprintf("%s%s", client.schemaRegistryURL, uri) + req, err := http.NewRequest(method, url, payload) + if err != nil { + return nil, err + } + if client.credentials != nil { + req.SetBasicAuth(client.credentials.username, client.credentials.password) + } + req.Header.Set("Content-Type", contentType) + resp, err := client.httpClient.Do(req) + if err != nil { + return nil, err + } + + if resp != nil { + defer resp.Body.Close() + } + if resp.StatusCode < 200 || resp.StatusCode > 299 { + return nil, createError(resp) + } + + return ioutil.ReadAll(resp.Body) +} + +// ID ensures access to ID +func (schema *Schema) ID() int { + return schema.id +} + +// Schema ensures access to Schema +func (schema *Schema) Schema() string { + return schema.schema +} + +// Version ensures access to Version +func (schema *Schema) Version() int { + return schema.version +} + +// Codec ensures access to Codec +func (schema *Schema) Codec() *goavro.Codec { + return schema.codec +} + +func (schema Schema) FullName() string { + var dat map[string]string + _ = json.Unmarshal([]byte(schema.schema), &dat) + switch dat["namespace"] { + case "": + return dat["name"] + default: + return fmt.Sprintf("%s.%s", dat["namespace"], dat["name"]) + } +} + +func cacheKey(subject string, version string) string { + return fmt.Sprintf("%s-%s", subject, version) +} + +func createError(resp *http.Response) error { + decoder := json.NewDecoder(resp.Body) + var errorResp struct { + ErrorCode int `json:"error_code"` + Message string `json:"message"` + } + err := decoder.Decode(&errorResp) + if err == nil { + return fmt.Errorf("%s: %s", resp.Status, errorResp.Message) + } + return fmt.Errorf("%s", resp.Status) +} diff --git a/messagebus/serializer.go b/messagebus/serializer.go new file mode 100644 index 0000000..484c375 --- /dev/null +++ b/messagebus/serializer.go @@ -0,0 +1,157 @@ +package messagebus + +import ( + "bytes" + 
"encoding/binary" + "encoding/json" + + "github.com/confluentinc/confluent-kafka-go/kafka" +) + +type Serializer struct { + schemaRegistry ISchemaRegistryClient + strategy SubjectStrategy +} + +type SerializedProducerRecord struct { + Key []byte + Value []byte +} + +func NewSerializer(srUrl string, strategy SubjectStrategy) (*Serializer, error) { + client := createSchemaRegistryClient(srUrl) + return &Serializer{schemaRegistry: client, strategy: strategy}, nil +} + +func (s Serializer) Serialize(topic string, record *ProducerRecord) (*SerializedProducerRecord, error) { + valueBytes, valueSubject, err := s.serializeValue(topic, record) + if err != nil { + return nil, err + } + record.Key.SetValueSubject(valueSubject) + keyBytes, err := s.serializeKey(topic, record) + if err != nil { + return nil, err + } + serializedRecord := &SerializedProducerRecord{ + Key: keyBytes, + Value: valueBytes, + } + return serializedRecord, nil +} + +func (s Serializer) serializeValue(topic string, record *ProducerRecord) ([]byte, string, error) { + schemaStr := record.Value.Schema() + valueSubject, err := prepareSubjectName(topic, schemaStr, s.strategy, false) + if err != nil { + return nil, "", err + } + + schema, err := s.schemaRegistry.createSchema(valueSubject, schemaStr, Avro, false) + if err != nil { + return nil, "", err + } + var buf bytes.Buffer + err = record.Value.Serialize(&buf) + if err != nil { + return nil, "", err + } + schemaIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID())) + + var data []byte + data = append(data, byte(0)) + data = append(data, schemaIDBytes...) + data = append(data, buf.Bytes()...) + + return data, valueSubject, nil +} + +func (s Serializer) serializeKey(topic string, record *ProducerRecord) ([]byte, error) { + subject, err := prepareSubjectName(topic, record.Key.Schema(), s.strategy, true) + if err != nil { + return nil, err + } + schema, err := s.schemaRegistry.createSchema(subject, record.Key.Schema(), Avro, true) + if err != nil { + return nil, err + } + schemaIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID())) + + var buf bytes.Buffer + err = record.Key.Serialize(&buf) + + var data []byte + data = append(data, byte(0)) + data = append(data, schemaIDBytes...) + data = append(data, buf.Bytes()...) 
+
+func (s Serializer) Deserialize(message *kafka.Message) (*ConsumerRecord, error) {
+	var key *MessageKey
+	if message.Key != nil {
+		avroDeserializedKey, err := s.deserializeBytes(message.Key)
+		if err != nil {
+			return nil, err
+		}
+		key, err = s.decodeKey(avroDeserializedKey)
+		if err != nil {
+			return nil, err
+		}
+	}
+	var value map[string]interface{}
+	if message.Value != nil {
+		avroDeserializedValue, err := s.deserializeBytes(message.Value)
+		if err != nil {
+			return nil, err
+		}
+		value, err = s.decodeValue(avroDeserializedValue)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return &ConsumerRecord{
+		Key:       key,
+		Topic:     *message.TopicPartition.Topic,
+		Value:     value,
+		Partition: message.TopicPartition.Partition,
+		Offset:    message.TopicPartition.Offset.String(),
+		Timestamp: message.Timestamp,
+	}, nil
+}
+
+func (s Serializer) deserializeBytes(payload []byte) ([]byte, error) {
+	schemaID := binary.BigEndian.Uint32(payload[1:5])
+	schema, err := s.schemaRegistry.getSchema(int(schemaID))
+	if err != nil {
+		return nil, err
+	}
+	native, _, err := schema.Codec().NativeFromBinary(payload[5:])
+	if err != nil {
+		return nil, err
+	}
+	deserializedBytes, err := schema.Codec().TextualFromNative(nil, native)
+	return deserializedBytes, err
+}
+
+func (s Serializer) decodeKey(deserializedKey []byte) (*MessageKey, error) {
+	var key MessageKey
+	err := json.Unmarshal(deserializedKey, &key)
+	if err != nil {
+		return nil, err
+	}
+	return &key, nil
+}
+
+func (s Serializer) decodeValue(deserializedValue []byte) (map[string]interface{}, error) {
+	var value map[string]interface{}
+	err := json.Unmarshal(deserializedValue, &value)
+	if err != nil {
+		return nil, err
+	}
+	return value, nil
+}
diff --git a/messagebus/subject_strategy.go b/messagebus/subject_strategy.go
new file mode 100644
index 0000000..39dc6aa
--- /dev/null
+++ b/messagebus/subject_strategy.go
@@ -0,0 +1,35 @@
+package messagebus
+
+import (
+	"errors"
+	"fmt"
+)
+
+type SubjectStrategy int
+
+const (
+	TOPIC_NAME_STRATEGY SubjectStrategy = iota
+	TOPIC_RECORD_NAME_STRATEGY
+	RECORD_NAME_STRATEGY
+)
+
+func (s SubjectStrategy) String() string {
+	return [...]string{"TOPIC_NAME_STRATEGY", "TOPIC_RECORD_NAME_STRATEGY", "RECORD_NAME_STRATEGY"}[s]
+}
+
+func prepareSubjectName(topic string, schemaStr string, strategy SubjectStrategy, isKey bool) (string, error) {
+	schema := Schema{schema: schemaStr}
+	switch strategy {
+	case TOPIC_NAME_STRATEGY:
+		if isKey {
+			return fmt.Sprintf("%s-key", topic), nil
+		}
+		return fmt.Sprintf("%s-value", topic), nil
+	case TOPIC_RECORD_NAME_STRATEGY:
+		return fmt.Sprintf("%s-%s", topic, schema.FullName()), nil
+	case RECORD_NAME_STRATEGY:
+		return schema.FullName(), nil
+	default:
+		return "", errors.New("unknown subject strategy")
+	}
+}
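Putting the pieces together, a producer-side flow configures the client, builds a message key, serializes the record, and hands the framed bytes to confluent-kafka-go. This is a hedged sketch only: the module import path, broker and registry addresses, credentials, and topic are assumptions, and a second MessageKey stands in for a real gogen-avro value record purely so the example stays self-contained.

```go
package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"

	"example.com/yourapp/messagebus" // assumed module path
)

func main() {
	cfg := messagebus.NewProducerConfig(
		messagebus.WithAcks(-1),
		messagebus.WithProducerSASLAuth(messagebus.SASL_PLAINTEXT, messagebus.SCRAM_SHA_512, "user", "secret"), // placeholder credentials
	)
	_ = cfg.KafkaConfig.SetKey("bootstrap.servers", "localhost:9092") // placeholder broker

	serializer, err := messagebus.NewSerializer("http://localhost:8081", messagebus.TOPIC_NAME_STRATEGY) // placeholder registry
	if err != nil {
		log.Fatal(err)
	}

	key, err := messagebus.NewMessageKey("order-service", messagebus.WithReplyTopic("orders-reply"))
	if err != nil {
		log.Fatal(err)
	}
	// Any gogen-avro generated record satisfies container.AvroRecord; a second
	// MessageKey is used here only to keep the sketch self-contained.
	value, err := messagebus.NewMessageKey("order-service")
	if err != nil {
		log.Fatal(err)
	}

	topic := "orders"
	serialized, err := serializer.Serialize(topic, messagebus.NewProducerRecord(key, value))
	if err != nil {
		log.Fatal(err)
	}

	producer, err := kafka.NewProducer(cfg.KafkaConfig)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	err = producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Key:            serialized.Key,
		Value:          serialized.Value,
	}, nil)
	if err != nil {
		log.Fatal(err)
	}
	producer.Flush(cfg.FlushTimeoutMs)
}
```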