Skip to content

Commit

Permalink
Changing Batcher implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Anjula authored and Anjula committed Mar 31, 2020
1 parent 562e0f8 commit 1acbe8f
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 98 deletions.
124 changes: 74 additions & 50 deletions batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,83 +6,107 @@ import (
"time"
)

type Function func(workerID int, data []interface{})
type Function func(data []interface{}) bool

type BatchConfig struct {
size int
WaitTime time.Duration
Func Function
items []interface{}
batchChan chan []interface{}
MaxCapacity int
WaitTime time.Duration
function Function
batchChan chan interface{}
mutex sync.RWMutex
}

var mutex = &sync.Mutex{}

//Initialises a new instance
func NewBatcher(size int, waitTime time.Duration, numWorkers int, funct Function) (b *BatchConfig, err error) {

func NewBatcher(maxJobs int, waitTime time.Duration, f Function) (*BatchConfig, error) {
switch {
case size <= 0:
case maxJobs <= 0:
return &BatchConfig{}, errors.New("invalid size")
case waitTime <= 0:
return &BatchConfig{}, errors.New("invalid wait time")
case numWorkers <= 0:
return &BatchConfig{}, errors.New("invalid number of workers")
}

batch := &BatchConfig{
size: size,
WaitTime: waitTime,
Func: funct,
items: make([]interface{}, 0), //initialize empty slice
batchChan: make(chan []interface{}, numWorkers),

return &BatchConfig{
MaxCapacity: maxJobs,
WaitTime: waitTime,
function: f,
}, nil
}

//This function helps to insert an item
func (b *BatchConfig) Insert(item interface{}) (bool, error) {
if item == nil {
return false, errors.New("item inserted is null")
}
batch.worker(numWorkers)

go batch.autoDump()
b.mutex.Lock()
defer b.mutex.Unlock()

return batch, nil
if b.batchChan == nil {
b.batchChan = make(chan interface{}, b.MaxCapacity)
go b.dumper()
}

b.batchChan <- item

return true, nil
}

//This function helps to insert item to array
func (b *BatchConfig) Insert(item interface{}) bool {
mutex.Lock()
defer mutex.Unlock()
if len(b.items) < b.size {
b.items = append(b.items, item)
return true
func (b *BatchConfig) InsertItems(items []interface{}) (bool, error) {
if items == nil {
return false, errors.New("items inserted is null")
}

return false
}
b.mutex.Lock()
defer b.mutex.Unlock()

batchLen := len(items)
// If the length of batch is larger than maxCapacity
if batchLen > b.MaxCapacity {
items = items[:b.MaxCapacity]
}

func (b *BatchConfig) dump() {
copiedItems := make([]interface{},len(b.items))
if len(b.items) != 0 {
mutex.Lock()
copy(copiedItems,b.items)
b.items = b.items[:0]
b.batchChan <- copiedItems
mutex.Unlock()
if b.batchChan == nil {
b.batchChan = make(chan interface{}, b.MaxCapacity)
go b.dumper()
}
}

func (b *BatchConfig) autoDump() {
if len(b.items) == b.size {
b.dump()
}else {
b.timeout()
for _, item := range items {
b.batchChan <- item
}
return true, nil
}

//This function heps to with timeout dump
func (b *BatchConfig) timeout() {
ticker := time.NewTicker(b.WaitTime)



func (b *BatchConfig) dumper() {
var batch []interface{}
timer := time.NewTimer(b.WaitTime)

for {
select {
case <- ticker.C:
b.dump()
case <-timer.C:
b.function(batch)
b.close()
return
case item := <-b.batchChan:
batch = append(batch, item)
if len(batch) >= b.MaxCapacity {
// Callback with batch
b.function(batch)
// Init batch array
batch = []interface{}{}
}
}
}
}

func (b *BatchConfig) close() {
b.mutex.Lock()
defer b.mutex.Unlock()
if b.batchChan != nil {
close(b.batchChan)
b.batchChan = nil
}
}
68 changes: 42 additions & 26 deletions batcher_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package batcher

import (
"fmt"
"log"
"testing"
"time"
)

func DummyBatchFn1(workerID int, data []interface{}) {
for _,v := range data{
log.Println(fmt.Sprintf("[WokerID]: %d [data]: %d",workerID, v))
}


func DummyBatchFn1(data []interface{}) bool{
return true
}

type batch struct {
Expand Down Expand Up @@ -59,34 +57,52 @@ func TestNewBatcher(t *testing.T) {
}

for _, tt := range newBatcherTest {
_, err := NewBatcher(tt.batch.size, tt.batch.waitTime, tt.batch.numWorkers, tt.batch.funct)

if err != nil {
response := false
if response != tt.response {
t.Error("Failed: NewBatcher Test")
}
} else {
response := true
if response != tt.response {
t.Error("Failed: NewBatcher Test")
_,err :=NewBatcher(tt.batch.size, tt.batch.waitTime,tt.batch.funct)
if err!= nil{
if tt.response != false{
t.Error("[ERROR]: NewBatcher function")
}
}
}

}

}

func BenchmarkNewBatcher(b *testing.B) {
ba := batch{
size: 1,
waitTime: 15*time.Second,
numWorkers: 1,
funct: DummyBatchFn1,
}
for i :=0; i<b.N; i++ {
_,err :=NewBatcher(ba.size,ba.waitTime,ba.funct)
if err !=nil{
b.Error("[ERROR]: NewBatcher function - Benchmarks")
}
}
}

func TestBatchConfig_Insert(t *testing.T) {
batch, err := NewBatcher(10,3,2, DummyBatchFn1)
if err !=nil{
t.Error("Failed: Insert Function Test : New Batcher")
batch,err := NewBatcher(60, 10*time.Millisecond, DummyBatchFn1)

if err != nil{
t.Error("[ERROR]: NewBatcher function - Insert")
}
for i:=1; i<=10; i++ {
check := batch.Insert(i)

if check != true {
t.Error("Failed: Insert Function Test")
for i:=1; i<=1000; i++ {
_, err :=batch.Insert(i)
if err != nil{
t.Error("[ERROR]: Insert function")
}
}

}
}

func TestBatchConfig_InsertItems(t *testing.T) {
_, err := NewBatcher(10, 10*time.Millisecond, DummyBatchFn1)

if err != nil{
t.Error("[ERROR]: NewBatcher function - Insert")
}
}
22 changes: 0 additions & 22 deletions worker.go

This file was deleted.

0 comments on commit 1acbe8f

Please sign in to comment.