Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Map's concurrency issues #1100

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type pushConsumer struct {
queueMaxSpanFlowControlTimes int
consumeFunc utils.Set
submitToConsume func(*processQueue, *primitive.MessageQueue)
subscribedTopic map[string]string
subscribedTopic *Sets // 原本是一个 map[string]string,且没加锁,存在并发问题
interceptor primitive.Interceptor
queueLock *QueueLock
done chan struct{}
Expand Down Expand Up @@ -112,7 +112,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {

p := &pushConsumer{
defaultConsumer: dc,
subscribedTopic: make(map[string]string, 0),
subscribedTopic: NewSets(),
queueLock: newQueueLock(),
done: make(chan struct{}, 1),
consumeFunc: utils.NewSet(),
Expand Down Expand Up @@ -235,13 +235,20 @@ func (pc *pushConsumer) Start() error {
}

pc.client.UpdateTopicRouteInfo()
for k := range pc.subscribedTopic {
_, exist := pc.topicSubscribeInfoTable.Load(k)
var notExistKey string
pc.subscribedTopic.Each(func(key interface{}) bool {
_, exist := pc.topicSubscribeInfoTable.Load(key)
if !exist {
pc.Shutdown()
return fmt.Errorf("the topic=%s route info not found, it may not exist", k)
notExistKey = key.(string)
return false
}
return true
})
if notExistKey != "" {
pc.shutdown()
return fmt.Errorf("the topic=%s route info not found, it may not exist", notExistKey)
}

pc.client.CheckClientInBroker()
pc.client.SendHeartbeatToAllBrokerWithLock()
go pc.client.RebalanceImmediately()
Expand Down Expand Up @@ -302,7 +309,7 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
}
data := buildSubscriptionData(topic, selector)
pc.subscriptionDataTable.Store(topic, data)
pc.subscribedTopic[topic] = ""
pc.subscribedTopic.Set(topic)

pc.consumeFunc.Add(&PushConsumerCallback{
f: f,
Expand Down Expand Up @@ -502,9 +509,7 @@ func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*pr
rlog.LogKeyValueChangedFrom: data.SubVersion,
rlog.LogKeyValueChangedTo: newVersion,
})
data.Lock()
data.SubVersion = newVersion
data.Unlock()

// TODO: optimize
count := 0
Expand Down Expand Up @@ -550,7 +555,7 @@ func (pc *pushConsumer) validate() error {
return fmt.Errorf("consumerGroup can't equal [%s], please specify another one", internal.DefaultConsumerGroup)
}

if len(pc.subscribedTopic) == 0 {
if pc.subscribedTopic.Len() == 0 {
rlog.Warning("not subscribe any topic yet", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
Expand Down
48 changes: 48 additions & 0 deletions consumer/set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package consumer

import "sync"

// Sets 集合
// pushConsumer.subscribedTopic 原本使用的未加锁的 map[string]string,会有并发问题(concurrent map writes)
// 且原本的使用中只用了 key字段,这里单独写了一个 Sets 实现,替换 pushConsumer.subscribedTopic 解决 concurrent map writes 问题
type Sets struct {
lock *sync.RWMutex
m map[interface{}]struct{}
}

func NewSets() *Sets {
return &Sets{
lock: new(sync.RWMutex),
m: make(map[interface{}]struct{}),
}
}

func (m *Sets) Len() int {
m.lock.RLock()
defer m.lock.RUnlock()
return len(m.m)
}

func (m *Sets) Set(key interface{}) {
m.lock.Lock()
defer m.lock.Unlock()
m.m[key] = struct{}{}
}

func (m *Sets) Exist(key interface{}) bool {
m.lock.RLock()
defer m.lock.RUnlock()
_, ok := m.m[key]
return ok
}

func (m *Sets) Each(f func(k interface{}) bool) bool {
m.lock.RLock()
defer m.lock.RUnlock()
for k := range m.m {
if !f(k) {
return false
}
}
return true
}