From 0c42c3816710eed6a0308003f6c807dd912538cd Mon Sep 17 00:00:00 2001 From: YenchangChan Date: Mon, 6 May 2024 13:52:15 +0800 Subject: [PATCH] fix: rebalance issue - risk of int overflow - rebalance by offset and partition --- task/task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task/task.go b/task/task.go index 28fcd946..a0865046 100644 --- a/task/task.go +++ b/task/task.go @@ -209,7 +209,7 @@ func (service *Service) Put(msg *model.InputMessage, traceId string, flushFn fun util.Logger.Fatal("shard number calculation failed", zap.String("task", taskCfg.Name), zap.Error(err)) } } else { - msgRow.Shard = int(msgRow.Msg.Offset>>service.offShift) % service.sharder.shards + msgRow.Shard = int(msgRow.Msg.Offset * (int64(msgRow.Msg.Partition + 1)) >> service.offShift % int64(service.sharder.shards)) } service.sharder.PutElement(&msgRow) }