diff --git a/task/task.go b/task/task.go index 28fcd946..a0865046 100644 --- a/task/task.go +++ b/task/task.go @@ -209,7 +209,7 @@ func (service *Service) Put(msg *model.InputMessage, traceId string, flushFn fun util.Logger.Fatal("shard number calculation failed", zap.String("task", taskCfg.Name), zap.Error(err)) } } else { - msgRow.Shard = int(msgRow.Msg.Offset>>service.offShift) % service.sharder.shards + msgRow.Shard = int(msgRow.Msg.Offset * (int64(msgRow.Msg.Partition + 1)) >> service.offShift % int64(service.sharder.shards)) } service.sharder.PutElement(&msgRow) }