Skip to content

Commit

Permalink
Merge pull request #20 from jeroenrinzema/0.3.0-rc
Browse files Browse the repository at this point in the history
v0.3.0
  • Loading branch information
jeroenrinzema authored Aug 16, 2019
2 parents 5cbe493 + b5b734c commit c295f3c
Show file tree
Hide file tree
Showing 39 changed files with 717 additions and 1,136 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ go:

install: false
script:
- go test ./... -v -mod=vendor -race -count=1 -coverprofile=coverage.txt -covermode=atomic -timeout=5s
- go test ./... -v -mod=vendor -race -count=1 -coverprofile=coverage.txt -covermode=atomic -timeout=30s

after_success:
- bash <(curl -s https://codecov.io/bash)
2 changes: 1 addition & 1 deletion breaker.go → circuit/main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package commander
package circuit

import (
"sync"
Expand Down
6 changes: 5 additions & 1 deletion codecov.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
ignore:
- "examples"
- "examples"
coverage:
range: 70..100
round: down
  precision: 2
108 changes: 0 additions & 108 deletions command.go

This file was deleted.

87 changes: 0 additions & 87 deletions command_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion dialects/kafka/consumer/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (handle *GroupHandle) ConsumeClaim(session sarama.ConsumerGroupSession, cla

go func(message *sarama.ConsumerMessage) {
err := handle.client.Claim(message)
if err != nil {
if err == ErrRetry {
// Mark the message to be consumed again
session.ResetOffset(message.Topic, message.Partition, message.Offset, "")
return
Expand Down
31 changes: 16 additions & 15 deletions dialects/kafka/consumer/main.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package consumer

import (
"errors"
"sync"

"github.com/Shopify/sarama"
"github.com/jeroenrinzema/commander/dialects/kafka/metadata"
"github.com/jeroenrinzema/commander/types"
)

var (
// ErrRetry error retry type representation
ErrRetry = errors.New("retry message")
)

// HandleType represents the type of consumer that is advised to use for the given connection string
type HandleType int8

Expand Down Expand Up @@ -41,7 +47,6 @@ type Claimer interface {
// Subscription represents a consumer topic(s) subscription
type Subscription struct {
messages chan *types.Message
marked chan error
}

// Topic represents a thread safe list of subscriptions
Expand Down Expand Up @@ -95,10 +100,9 @@ func (client *Client) Connect(initialOffset int64, config *sarama.Config, ts ...
}

// Subscribe subscribes to the given topics and returns a message channel
func (client *Client) Subscribe(topics ...types.Topic) (<-chan *types.Message, func(error), error) {
func (client *Client) Subscribe(topics ...types.Topic) (<-chan *types.Message, error) {
subscription := &Subscription{
marked: make(chan error, 1),
messages: make(chan *types.Message, 1),
messages: make(chan *types.Message, 0),
}

for _, topic := range topics {
Expand All @@ -109,11 +113,7 @@ func (client *Client) Subscribe(topics ...types.Topic) (<-chan *types.Message, f
client.topics[topic.Name].subscriptions[subscription.messages] = subscription
}

next := func(err error) {
subscription.marked <- err
}

return subscription.messages, next, nil
return subscription.messages, nil
}

// Unsubscribe removes the given channel from the available subscriptions.
Expand All @@ -126,7 +126,6 @@ func (client *Client) Unsubscribe(sub <-chan *types.Message) error {
if has {
delete(topic.subscriptions, sub)
close(subscription.messages)
close(subscription.marked)
}
topic.mutex.Unlock()
}(topic, sub)
Expand All @@ -148,19 +147,21 @@ func (client *Client) Claim(consumed *sarama.ConsumerMessage) (err error) {
message := metadata.MessageFromMessage(consumed)

client.topics[topic].mutex.RLock()
defer client.topics[topic].mutex.RUnlock()

for _, subscription := range client.topics[topic].subscriptions {
message.Async()
select {
case subscription.messages <- message:
err = <-subscription.marked
if err != nil {
break
result := message.Await()
if result != nil {
return ErrRetry
}
default:
}
}
client.topics[topic].mutex.RUnlock()

return err
return nil
}

// Close closes the Kafka consumer
Expand Down
23 changes: 9 additions & 14 deletions dialects/kafka/metadata/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/Shopify/sarama"
"github.com/jeroenrinzema/commander"
"github.com/jeroenrinzema/commander/metadata"
"github.com/jeroenrinzema/commander/types"
)

Expand Down Expand Up @@ -50,7 +49,7 @@ headers:
continue headers
}

message.Ctx = metadata.NewStatusCodeContext(message.Ctx, types.StatusCode(status))
message.Status = types.StatusCode(status)
break
case HeaderVersion:
version, err := strconv.ParseInt(string(record.Value), 10, 8)
Expand All @@ -64,7 +63,7 @@ headers:
message.EOS = message.EOS.Parse(string(record.Value))
break
case HeaderParentID:
message.Ctx = metadata.NewParentIDContext(message.Ctx, types.ParentID(record.Value))
message.Ctx = types.NewParentIDContext(message.Ctx, types.ParentID(record.Value))
break
case HeaderParentTimestamp:
unix, err := strconv.ParseInt(string(record.Value), 10, 64)
Expand All @@ -73,7 +72,7 @@ headers:
}

time := types.ParentTimestamp(time.Unix(0, unix))
message.Ctx = metadata.NewParentTimestampContext(message.Ctx, time)
message.Ctx = types.NewParentTimestampContext(message.Ctx, time)
break
default:
headers[key] = strings.Split(string(record.Value), types.HeaderValueDevider)
Expand Down Expand Up @@ -103,33 +102,29 @@ func MessageToMessage(produce *commander.Message) *sarama.ProducerMessage {
Key: []byte(HeaderEOS),
Value: []byte(produce.EOS.String()),
},
}

status, has := metadata.StatusCodeFromContext(produce.Ctx)
if has {
headers = append(headers, sarama.RecordHeader{
sarama.RecordHeader{
Key: []byte(HeaderStatusCode),
Value: []byte(strconv.Itoa(int(status))),
})
Value: []byte(strconv.Itoa(int(produce.Status))),
},
}

parent, has := metadata.ParentIDFromContext(produce.Ctx)
parent, has := types.ParentIDFromContext(produce.Ctx)
if has {
headers = append(headers, sarama.RecordHeader{
Key: []byte(HeaderParentID),
Value: []byte(parent),
})
}

timestamp, has := metadata.ParentTimestampFromContext(produce.Ctx)
timestamp, has := types.ParentTimestampFromContext(produce.Ctx)
if has {
headers = append(headers, sarama.RecordHeader{
Key: []byte(HeaderParentTimestamp),
Value: []byte(strconv.Itoa(int(time.Time(timestamp).UnixNano()))),
})
}

kv, has := metadata.HeaderFromContext(produce.Ctx)
kv, has := types.HeaderFromContext(produce.Ctx)
if has {
for key, value := range kv {
headers = append(headers, sarama.RecordHeader{
Expand Down
Loading

0 comments on commit c295f3c

Please sign in to comment.