From 1acbe8f6afafb99ba9a0db9a82a33338c7a165fe Mon Sep 17 00:00:00 2001 From: Anjula Date: Wed, 1 Apr 2020 02:24:59 +0530 Subject: [PATCH] Changing Batcher implementation --- batcher.go | 124 +++++++++++++++++++++++++++++------------------- batcher_test.go | 68 ++++++++++++++++---------- worker.go | 22 --------- 3 files changed, 116 insertions(+), 98 deletions(-) delete mode 100644 worker.go diff --git a/batcher.go b/batcher.go index af6c00d..320e136 100644 --- a/batcher.go +++ b/batcher.go @@ -6,83 +6,107 @@ import ( "time" ) -type Function func(workerID int, data []interface{}) +type Function func(data []interface{}) bool type BatchConfig struct { - size int - WaitTime time.Duration - Func Function - items []interface{} - batchChan chan []interface{} + MaxCapacity int + WaitTime time.Duration + function Function + batchChan chan interface{} + mutex sync.RWMutex } -var mutex = &sync.Mutex{} - //Initialises a new instance -func NewBatcher(size int, waitTime time.Duration, numWorkers int, funct Function) (b *BatchConfig, err error) { - +func NewBatcher(maxJobs int, waitTime time.Duration, f Function) (*BatchConfig, error) { switch { - case size <= 0: + case maxJobs <= 0: return &BatchConfig{}, errors.New("invalid size") case waitTime <= 0: return &BatchConfig{}, errors.New("invalid wait time") - case numWorkers <= 0: - return &BatchConfig{}, errors.New("invalid number of workers") } - batch := &BatchConfig{ - size: size, - WaitTime: waitTime, - Func: funct, - items: make([]interface{}, 0), //initialize empty slice - batchChan: make(chan []interface{}, numWorkers), + + return &BatchConfig{ + MaxCapacity: maxJobs, + WaitTime: waitTime, + function: f, + }, nil +} + +//This function helps to insert an item +func (b *BatchConfig) Insert(item interface{}) (bool, error) { + if item == nil { + return false, errors.New("item inserted is null") } - batch.worker(numWorkers) - go batch.autoDump() + b.mutex.Lock() + defer b.mutex.Unlock() - return batch, nil + if b.batchChan == nil { + b.batchChan = make(chan interface{}, b.MaxCapacity) + go b.dumper() + } + + b.batchChan <- item + + return true, nil } -//This function helps to insert item to array -func (b *BatchConfig) Insert(item interface{}) bool { - mutex.Lock() - defer mutex.Unlock() - if len(b.items) < b.size { - b.items = append(b.items, item) - return true +func (b *BatchConfig) InsertItems(items []interface{}) (bool, error) { + if items == nil { + return false, errors.New("items inserted is null") } - return false -} + b.mutex.Lock() + defer b.mutex.Unlock() + batchLen := len(items) + // If the length of batch is larger than maxCapacity + if batchLen > b.MaxCapacity { + items = items[:b.MaxCapacity] + } -func (b *BatchConfig) dump() { - copiedItems := make([]interface{},len(b.items)) - if len(b.items) != 0 { - mutex.Lock() - copy(copiedItems,b.items) - b.items = b.items[:0] - b.batchChan <- copiedItems - mutex.Unlock() + if b.batchChan == nil { + b.batchChan = make(chan interface{}, b.MaxCapacity) + go b.dumper() } -} -func (b *BatchConfig) autoDump() { - if len(b.items) == b.size { - b.dump() - }else { - b.timeout() + for _, item := range items { + b.batchChan <- item } + return true, nil } -//This function heps to with timeout dump -func (b *BatchConfig) timeout() { - ticker := time.NewTicker(b.WaitTime) + + + +func (b *BatchConfig) dumper() { + var batch []interface{} + timer := time.NewTimer(b.WaitTime) + for { select { - case <- ticker.C: - b.dump() + case <-timer.C: + b.function(batch) + b.close() + return + case item := <-b.batchChan: + batch = append(batch, item) + if len(batch) >= b.MaxCapacity { + // Callback with batch + b.function(batch) + // Init batch array + batch = []interface{}{} + } } } } + +func (b *BatchConfig) close() { + b.mutex.Lock() + defer b.mutex.Unlock() + if b.batchChan != nil { + close(b.batchChan) + b.batchChan = nil + } +} diff --git a/batcher_test.go b/batcher_test.go index 4857eb0..b440a6f 100644 --- a/batcher_test.go +++ b/batcher_test.go @@ -1,16 +1,14 @@ package batcher import ( - "fmt" - "log" "testing" "time" ) -func DummyBatchFn1(workerID int, data []interface{}) { - for _,v := range data{ - log.Println(fmt.Sprintf("[WokerID]: %d [data]: %d",workerID, v)) - } + + +func DummyBatchFn1(data []interface{}) bool{ + return true } type batch struct { @@ -59,34 +57,52 @@ func TestNewBatcher(t *testing.T) { } for _, tt := range newBatcherTest { - _, err := NewBatcher(tt.batch.size, tt.batch.waitTime, tt.batch.numWorkers, tt.batch.funct) - - if err != nil { - response := false - if response != tt.response { - t.Error("Failed: NewBatcher Test") - } - } else { - response := true - if response != tt.response { - t.Error("Failed: NewBatcher Test") + _,err :=NewBatcher(tt.batch.size, tt.batch.waitTime,tt.batch.funct) + if err!= nil{ + if tt.response != false{ + t.Error("[ERROR]: NewBatcher function") } - } + } + } } +func BenchmarkNewBatcher(b *testing.B) { + ba := batch{ + size: 1, + waitTime: 15*time.Second, + numWorkers: 1, + funct: DummyBatchFn1, + } + for i :=0; i