diff --git a/README.md b/README.md index cb0f2a1..59bd848 100644 --- a/README.md +++ b/README.md @@ -101,7 +101,7 @@ In summary, Paota facilitates the asynchronous processing of tasks in a distribu The `Config` struct holds all configuration options for Paota. It includes the following parameters: -- **Broker**: The message broker to be used. Currently, only "amqp" is supported. +- **Broker**: The message broker to be used. Currently, only "amqp" and "redis" is supported. - **Store**: The type of storage to be used (optional). - **TaskQueueName**: The name of the task queue. Default value is "paota_tasks". - **StoreQueueName**: The name of the storage queue (optional). @@ -245,4 +245,32 @@ Total Records Processed: 10 Lakh data records. Thank you for flying Paota! - +# Redis broker +The Redis broker acts as a message queue for tasks, enabling asynchronous task processing using a worker pool. The tasks are serialized and stored in Redis, and workers consume these tasks for execution. + +## Redis broker workflow + 1. Create Worker Pool: Initialize the worker pool with the Redis broker configuration. + 2. Register Tasks: Define and register task handlers (e.g., Print task). + 3. Publish Tasks: Use the SendTaskWithContext method to publish tasks to the Redis queue. + 4. Process Tasks: Workers consume tasks from the Redis queue and execute the corresponding handlers. + +## Sample Task Format + { + "UUID": "task_8341a57b-d26d-4bec-94bc-9ef911dc5072", + "Name": "Print", + "Args": [ + { + "Name": "Arg_Name", + "Type": "string", + "Value": "{\"id\":\"1\",\"name\":\"Jane Doe\",\"email\":\"jane.doe@example.com\"}" + } + ], + "RoutingKey": "", + "Priority": 0, + "RetryCount": 10, + "RetryTimeout": 0, + "WaitTime": 0, + "RetriesDone": 0, + "IgnoreWhenTaskNotRegistered": true, + "ETA": null + } diff --git a/config/config.go b/config/config.go index 41b25d1..db84685 100644 --- a/config/config.go +++ b/config/config.go @@ -17,12 +17,13 @@ type ConfigProvider interface { // Config holds all configuration for Paota type Config struct { - Broker string `env:"BROKER" envDefault:"amqp" validate:"required,oneof=amqp"` //allowed amqp + Broker string `env:"BROKER" envDefault:"amqp" validate:"required,oneof=amqp redis"` //allowed amqp and redis Store string `env:"STORE"` TaskQueueName string `env:"QUEUE_NAME" envDefault:"paota_tasks" validate:"required"` StoreQueueName string `env:"STORE_QUEUE_NAME"` AMQP *AMQPConfig `envPrefix:"AMQP_"` MongoDB *MongoDBConfig `envPrefix:"MONGO_"` + Redis *RedisConfig `envPrefix:"REDIS_"` } type configProvider struct { diff --git a/config/redis.go b/config/redis.go new file mode 100644 index 0000000..bae3160 --- /dev/null +++ b/config/redis.go @@ -0,0 +1,20 @@ +package config + +type RedisConfig struct { + Address string `env:"URL" envDefault:"redis://localhost:6379"` + MaxIdle int `env:"MAX_IDLE" envDefault:"10"` + MaxActive int `env:"MAX_ACTIVE" envDefault:"100"` + IdleTimeout int `env:"IDLE_TIMEOUT" envDefault:"300"` + Wait bool `env:"WAIT" envDefault:"true"` + ReadTimeout int `env:"READ_TIMEOUT" envDefault:"15"` + WriteTimeout int `env:"WRITE_TIMEOUT" envDefault:"15"` + ConnectTimeout int `env:"CONNECT_TIMEOUT" envDefault:"15"` + NormalTasksPollPeriod int `env:"NORMAL_TASKS_POLL_PERIOD" envDefault:"1000"` + DelayedTasksPollPeriod int `env:"DELAYED_TASKS_POLL_PERIOD" envDefault:"1000"` + DelayedTasksKey string `env:"DELAYED_TASKS_KEY" envDefault:"paota_tasks_delayed"` + ClientName string `env:"CLIENT_NAME" envDefault:"app_redis_client"` + MasterName string `env:"MASTER_NAME" envDefault:""` + ClusterEnabled bool `env:"CLUSTER_ENABLED" envDefault:"false"` + RetryTimeout int `env:"RETRY_TIMEOUT" envDefault:"30"` + RetryCount int `env:"RETRY_COUNT" envDefault:"3"` +} diff --git a/example/redis/consumer/main.go b/example/redis/consumer/main.go new file mode 100644 index 0000000..6e6a0ab --- /dev/null +++ b/example/redis/consumer/main.go @@ -0,0 +1,160 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "os" + "sync" + "time" + + "github.com/surendratiwari3/paota/config" + "github.com/surendratiwari3/paota/logger" + "github.com/surendratiwari3/paota/schema" + "github.com/surendratiwari3/paota/workerpool" +) + +type printWorker struct { + workerPool workerpool.Pool +} + +type retryTestWorker struct { + workerPool workerpool.Pool + attempts map[string]int // Track attempts per task + mu sync.Mutex // Protect the map +} + +type scheduledWorker struct { + workerPool workerpool.Pool +} + +func main() { + // Configure Redis Broker + cnf := config.Config{ + Broker: "redis", + TaskQueueName: "paota_task_queue", + Redis: &config.RedisConfig{ + Address: "localhost:6379", // Replace with your Redis server address + DelayedTasksKey: "paota_delayed_tasks", // Key for delayed/scheduled tasks + }, + } + + // Set the configuration + err := config.GetConfigProvider().SetApplicationConfig(cnf) + if err != nil { + logger.ApplicationLogger.Error("config error, exit", err) + return + } + + // Create a new worker pool + newWorkerPool, err := workerpool.NewWorkerPool(context.Background(), 10, "testWorker") + if err != nil { + logger.ApplicationLogger.Error("workerPool is not created", err) + os.Exit(0) + } else if newWorkerPool == nil { + logger.ApplicationLogger.Info("workerPool is nil") + os.Exit(0) + } + + // Create the worker instances + printWorker := printWorker{workerPool: newWorkerPool} + retryWorker := &retryTestWorker{ + workerPool: newWorkerPool, + attempts: make(map[string]int), + } + scheduledWorker := scheduledWorker{workerPool: newWorkerPool} + + logger.ApplicationLogger.Info("newWorkerPool created successfully") + + // Register tasks + regTasks := map[string]interface{}{ + "Print": printWorker.Print, + "RetryTest": retryWorker.RetryTest, + "ScheduledTask": scheduledWorker.ScheduledTask, + } + err = newWorkerPool.RegisterTasks(regTasks) + if err != nil { + logger.ApplicationLogger.Error("error while registering tasks", err) + return + } + + logger.ApplicationLogger.Info("Worker is also started") + + // Start the worker pool + err = newWorkerPool.Start() + if err != nil { + logger.ApplicationLogger.Error("error while starting worker", err) + } +} + +// Print is the task handler for the "Print" task +func (wp printWorker) Print(arg *schema.Signature) error { + // Deserialize the task argument + var user map[string]interface{} + err := json.Unmarshal([]byte(arg.Args[0].Value.(string)), &user) + if err != nil { + logger.ApplicationLogger.Error("failed to parse task argument", err) + return err + } + + logger.ApplicationLogger.Infof("Processing task: %v", user) + return nil +} + +func (w *retryTestWorker) RetryTest(arg *schema.Signature) error { + w.mu.Lock() + w.attempts[arg.UUID]++ + attempts := w.attempts[arg.UUID] + w.mu.Unlock() + + logger.ApplicationLogger.Info("Processing RetryTest task", + "taskID", arg.UUID, + "attempt", attempts, + "data", arg.Args[0].Value, + ) + + // Fail first 3 attempts + if attempts <= 3 { + return fmt.Errorf("intentional failure, attempt %d/3", attempts) + } + + // Succeed on 4th attempt + logger.ApplicationLogger.Info("RetryTest task succeeded", + "taskID", arg.UUID, + "attempts", attempts, + ) + return nil +} + +func (sw scheduledWorker) ScheduledTask(arg *schema.Signature) error { + // Log when the task actually executes + executionTime := time.Now().UTC() + + // Parse the scheduled time from args if provided + var scheduledTime time.Time + if len(arg.Args) > 0 { + if timeStr, ok := arg.Args[0].Value.(string); ok { + if parsed, err := time.Parse(time.RFC3339, timeStr); err == nil { + scheduledTime = parsed + } + } + } + + logger.ApplicationLogger.Info("Executing scheduled task", + "taskID", arg.UUID, + "scheduledFor", scheduledTime, + "executedAt", executionTime, + "data", arg.Args, + ) + + // Calculate drift if we have a scheduled time + if !scheduledTime.IsZero() { + drift := executionTime.Sub(scheduledTime) + logger.ApplicationLogger.Info("Scheduled task timing", + "taskID", arg.UUID, + "driftSeconds", drift.Seconds(), + ) + } + + return nil +} diff --git a/example/redis/publisher/main.go b/example/redis/publisher/main.go new file mode 100644 index 0000000..c05ed6f --- /dev/null +++ b/example/redis/publisher/main.go @@ -0,0 +1,134 @@ +package main + +import ( + "context" + "encoding/json" + "os" + "sync" + "time" + + "github.com/surendratiwari3/paota/config" + "github.com/surendratiwari3/paota/logger" + "github.com/surendratiwari3/paota/schema" + "github.com/surendratiwari3/paota/workerpool" +) + +// UserRecord represents the structure of user records. +type UserRecord struct { + ID string `json:"id"` + Name string `json:"name"` + Email string `json:"email"` +} + +func main() { + + // Configure Redis broker + cnf := config.Config{ + Broker: "redis", + TaskQueueName: "paota_task_queue", + Redis: &config.RedisConfig{ + Address: "localhost:6379", + DelayedTasksKey: "paota_delayed_tasks", + }, + } + + // Create a worker pool + newWorkerPool, err := workerpool.NewWorkerPoolWithConfig(context.Background(), 10, "testWorker", cnf) + if err != nil { + logger.ApplicationLogger.Error("workerPool is not created", err) + os.Exit(1) + } else if newWorkerPool == nil { + logger.ApplicationLogger.Info("workerPool is nil") + os.Exit(1) + } + logger.ApplicationLogger.Info("newWorkerPool created successfully") + + // Prepare a user record as the task payload + user := UserRecord{ + ID: "1", + Name: "Jane Doe", + Email: "jane.doe@example.com", + } + + userJSON, err := json.Marshal(user) + if err != nil { + logger.ApplicationLogger.Error("failed to marshal user record", err) + return + } + + printJob := &schema.Signature{ + Name: "Print", + Args: []schema.Arg{ + { + Type: "string", + Value: string(userJSON), + }, + }, + RetryCount: 10, + IgnoreWhenTaskNotRegistered: true, + } + + // Add this after your existing print job + retryJob := &schema.Signature{ + Name: "RetryTest", + Args: []schema.Arg{ + { + Type: "string", + Value: "test retry mechanism", + }, + }, + RetryCount: 5, // Allow up to 5 retries + RetryTimeout: 20, // Retry every 3 seconds + } + + // Create a scheduled task to run in 1 minute + eta := time.Now().Add(1 * time.Minute) + scheduledJob := &schema.Signature{ + Name: "ScheduledTask", + Args: []schema.Arg{ + { + Type: "string", + Value: "This is a scheduled task", + }, + }, + ETA: &eta, + } + + // Send the scheduled task + _, err = newWorkerPool.SendTaskWithContext(context.Background(), scheduledJob) + if err != nil { + logger.ApplicationLogger.Error("failed to send scheduled task", err) + } else { + logger.ApplicationLogger.Info("Scheduled task published successfully") + } + + // Send the retry test job + _, err = newWorkerPool.SendTaskWithContext(context.Background(), retryJob) + if err != nil { + logger.ApplicationLogger.Error("failed to send retry test task", err) + } else { + logger.ApplicationLogger.Info("Retry test task published successfully") + } + + // Use a WaitGroup to synchronize goroutines + var waitGrp sync.WaitGroup + + for i := 0; i < 1; i++ { + waitGrp.Add(1) // Add to the WaitGroup counter for each goroutine + go func() { + defer waitGrp.Done() // Mark this goroutine as done when it exits + for j := 0; j < 1; j++ { + _, err := newWorkerPool.SendTaskWithContext(context.Background(), printJob) + if err != nil { + logger.ApplicationLogger.Error("failed to send task", err) + } else { + logger.ApplicationLogger.Info("Task published successfully") + } + } + }() + } + + // Wait for all goroutines to finish + waitGrp.Wait() + logger.ApplicationLogger.Info("All tasks have been published successfully") +} diff --git a/go.mod b/go.mod index 12308d7..d43b059 100644 --- a/go.mod +++ b/go.mod @@ -5,33 +5,34 @@ go 1.21.5 require ( github.com/caarlos0/env/v10 v10.0.0 github.com/go-playground/validator/v10 v10.20.0 + github.com/gomodule/redigo v1.9.2 github.com/google/uuid v1.6.0 - github.com/labstack/echo/v4 v4.11.4 github.com/rabbitmq/amqp091-go v1.9.0 github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.8.4 go.mongodb.org/mongo-driver v1.13.1 ) +require ( + github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect +) + require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-redis/redis/v8 v8.11.5 github.com/golang/snappy v0.0.1 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/klauspost/compress v1.17.0 // indirect github.com/kr/text v0.2.0 // indirect - github.com/labstack/gommon v0.4.2 // indirect github.com/leodido/go-urn v1.4.0 // indirect - github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.20 // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/stretchr/objx v0.5.1 // indirect - github.com/valyala/bytebufferpool v1.0.0 // indirect - github.com/valyala/fasttemplate v1.2.2 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect diff --git a/go.sum b/go.sum index 382264e..31b791c 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,14 @@ github.com/caarlos0/env/v10 v10.0.0 h1:yIHUBZGsyqCnpTkbjk8asUlx6RFhhEs+h7TOBdgdzXA= github.com/caarlos0/env/v10 v10.0.0/go.mod h1:ZfulV76NvVPw3tm591U4SwL3Xx9ldzBP9aGxzeN7G18= +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= @@ -15,8 +19,12 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8= github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gomodule/redigo v1.9.2 h1:HrutZBLhSIU8abiSfW8pj8mPhOyMYjZT/wcA4/L9L9s= +github.com/gomodule/redigo v1.9.2/go.mod h1:KsU3hiK/Ay8U42qpaJk+kuNa3C+spxapWpM+ywhcgtw= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -30,17 +38,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/labstack/echo/v4 v4.11.4 h1:vDZmA+qNeh1pd/cCkEicDMrjtrnMGQ1QFI9gWN1zGq8= -github.com/labstack/echo/v4 v4.11.4/go.mod h1:noh7EvLwqDsmh/X/HWKPUl1AjzJrhyptRyEbQJfxen8= -github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0= -github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= -github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= -github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= -github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= -github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= @@ -63,10 +62,6 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= -github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo= -github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= @@ -103,8 +98,6 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/internal/broker/amqp/broker.go b/internal/broker/amqp/broker.go index 5356ca6..33de647 100644 --- a/internal/broker/amqp/broker.go +++ b/internal/broker/amqp/broker.go @@ -310,3 +310,7 @@ func (b *AMQPBroker) getTaskTTL(task *schema.Signature) int64 { } return delayMs } + +func (b *AMQPBroker) BrokerType() string { + return "rabbitmq" // Return "rabbitmq" to indicate it's a RabbitMQ broker +} \ No newline at end of file diff --git a/internal/broker/broker_interface.go b/internal/broker/broker_interface.go index 71e3a98..a7721de 100644 --- a/internal/broker/broker_interface.go +++ b/internal/broker/broker_interface.go @@ -11,4 +11,5 @@ type Broker interface { StartConsumer(ctx context.Context, groupInterface workergroup.WorkerGroupInterface) error StopConsumer() Publish(ctx context.Context, task *schema.Signature) error + BrokerType() string } diff --git a/internal/broker/redis/broker.go b/internal/broker/redis/broker.go new file mode 100644 index 0000000..1683b90 --- /dev/null +++ b/internal/broker/redis/broker.go @@ -0,0 +1,194 @@ +package redis + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/gomodule/redigo/redis" + "github.com/surendratiwari3/paota/config" + "github.com/surendratiwari3/paota/internal/broker" + "github.com/surendratiwari3/paota/internal/provider" + "github.com/surendratiwari3/paota/internal/workergroup" + "github.com/surendratiwari3/paota/logger" + "github.com/surendratiwari3/paota/schema" +) + +type RedisBroker struct { + provider provider.RedisProviderInterface + config *config.Config + stopChan chan struct{} + running bool +} + +func NewRedisBroker(provider provider.RedisProviderInterface, config *config.Config) (broker.Broker, error) { + broker := &RedisBroker{ + provider: provider, + config: config, + stopChan: make(chan struct{}), + } + + broker.running = true + go broker.processDelayedTasks() + + logger.ApplicationLogger.Info("Started Redis delayed task processor") + return broker, nil +} + +func (rb *RedisBroker) getTaskDelay(signature *schema.Signature) time.Duration { + if signature.ETA != nil { + now := time.Now().UTC() + if signature.ETA.After(now) { + return signature.ETA.Sub(now) + } + } + return 0 +} + +func (rb *RedisBroker) Publish(ctx context.Context, signature *schema.Signature) error { + delay := rb.getTaskDelay(signature) + + if delay > 0 { + // Add to delayed queue with future timestamp as score + taskBytes, err := json.Marshal(signature) + if err != nil { + return fmt.Errorf("failed to marshal delayed task: %v", err) + } + + conn := rb.provider.GetConn() + defer conn.Close() + + // Calculate exact execution time + executeAt := time.Now().UTC().Add(delay).Unix() + + _, err = conn.Do("ZADD", rb.config.Redis.DelayedTasksKey, executeAt, taskBytes) + if err != nil { + return fmt.Errorf("failed to add delayed task: %v", err) + } + + logger.ApplicationLogger.Info("Task scheduled for delayed execution", + "task", signature.Name, + "delay", delay, + "executeAt", time.Unix(executeAt, 0)) + return nil + } + + // No delay, publish immediately + return rb.publishToMainQueue(ctx, signature) +} + +func (rb *RedisBroker) StartConsumer(ctx context.Context, workerGroup workergroup.WorkerGroupInterface) error { + return rb.provider.Subscribe(rb.config.TaskQueueName, func(signature *schema.Signature) error { + workerGroup.AssignJob(signature) + return nil + }) +} + +func (rb *RedisBroker) StopConsumer() { + _ = rb.provider.CloseConnection() +} + +func (rb *RedisBroker) BrokerType() string { + return "redis" // Return "redis" to indicate it's a Redis broker +} + +func (rb *RedisBroker) LPush(ctx context.Context, key string, value interface{}) error { + conn := rb.provider.GetConn() + defer conn.Close() + _, err := conn.Do("LPUSH", key, value) + return err +} + +func (rb *RedisBroker) BLPop(ctx context.Context, key string) (interface{}, error) { + conn := rb.provider.GetConn() + defer conn.Close() + return conn.Do("BLPOP", key, 0) // 0 means block indefinitely +} + +func (rb *RedisBroker) StartDelayedTasksProcessor() { + rb.stopChan = make(chan struct{}) + go rb.processDelayedTasks() +} + +func (rb *RedisBroker) processDelayedTasks() { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-rb.stopChan: + return + case <-ticker.C: + delayedKey := rb.config.Redis.DelayedTasksKey + mainQueue := rb.config.TaskQueueName + + conn := rb.provider.GetConn() + now := time.Now().UTC().Unix() + + // Get tasks that are ready (score <= current timestamp) + tasks, err := redis.Values(conn.Do("ZRANGEBYSCORE", delayedKey, "-inf", now)) + if err != nil { + logger.ApplicationLogger.Error("Failed to get delayed tasks", "error", err) + conn.Close() + continue + } + + for _, task := range tasks { + taskBytes, ok := task.([]byte) + if !ok { + continue + } + + // Execute in a transaction to ensure atomicity + conn.Send("MULTI") + // Move to main queue + conn.Send("LPUSH", mainQueue, taskBytes) + // Remove from delayed queue + conn.Send("ZREM", delayedKey, taskBytes) + + if _, err := redis.Values(conn.Do("EXEC")); err != nil { + logger.ApplicationLogger.Error("Failed to move task to main queue", + "error", err, + "task", string(taskBytes)) + } else { + logger.ApplicationLogger.Info("Moved delayed task to main queue", + "task", string(taskBytes)) + } + } + conn.Close() + } + } +} + +func (rb *RedisBroker) Stop() error { + if rb.stopChan != nil { + close(rb.stopChan) + } + return rb.provider.CloseConnection() +} + +func (rb *RedisBroker) GetProvider() provider.RedisProviderInterface { + return rb.provider +} + +func (rb *RedisBroker) Close() error { + if rb.running { + rb.running = false + close(rb.stopChan) + } + return nil +} + +func (rb *RedisBroker) publishToMainQueue(ctx context.Context, signature *schema.Signature) error { + taskBytes, err := json.Marshal(signature) + if err != nil { + return fmt.Errorf("failed to marshal task: %v", err) + } + + conn := rb.provider.GetConn() + defer conn.Close() + + _, err = conn.Do("LPUSH", rb.config.TaskQueueName, taskBytes) + return err +} diff --git a/internal/broker/redis/broker_test.go b/internal/broker/redis/broker_test.go new file mode 100644 index 0000000..8609667 --- /dev/null +++ b/internal/broker/redis/broker_test.go @@ -0,0 +1,290 @@ +package redis + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/surendratiwari3/paota/config" + + "github.com/gomodule/redigo/redis" + "github.com/surendratiwari3/paota/schema" +) + +// MockRedisProviderInterface is a mock implementation of the RedisProviderInterface. +type MockRedisProviderInterface struct { + mock.Mock +} + +func (m *MockRedisProviderInterface) Publish(ctx context.Context, queue string, signature *schema.Signature) error { + args := m.Called(ctx, queue, signature) + return args.Error(0) +} + +func (m *MockRedisProviderInterface) Subscribe(queue string, handler func(*schema.Signature) error) error { + args := m.Called(queue, handler) + return args.Error(0) +} + +func (m *MockRedisProviderInterface) CloseConnection() error { + args := m.Called() + return args.Error(0) +} + +func (m *MockRedisProviderInterface) GetConn() redis.Conn { + args := m.Called() + return args.Get(0).(redis.Conn) +} + +// MockRedisConn mocks redis.Conn +type MockRedisConn struct { + mock.Mock +} + +func (m *MockRedisConn) Close() error { + args := m.Called() + return args.Error(0) +} + +func (m *MockRedisConn) Err() error { + args := m.Called() + return args.Error(0) +} + +func (m *MockRedisConn) Do(commandName string, args ...interface{}) (interface{}, error) { + mockArgs := append([]interface{}{commandName}, args...) + result := m.Called(mockArgs...) + return result.Get(0), result.Error(1) +} + +func (m *MockRedisConn) Send(commandName string, args ...interface{}) error { + mockArgs := append([]interface{}{commandName}, args...) + return m.Called(mockArgs...).Error(0) +} + +func (m *MockRedisConn) Flush() error { + args := m.Called() + return args.Error(0) +} + +func (m *MockRedisConn) Receive() (interface{}, error) { + args := m.Called() + return args.Get(0), args.Error(1) +} + +func TestNewRedisBroker(t *testing.T) { + mockConfigProvider := new(config.MockConfigProvider) + + redisConfig := &config.RedisConfig{ + Address: "localhost:6379", + DelayedTasksKey: "delayed_tasks", + } + + conf := &config.Config{ + Broker: "redis", + TaskQueueName: "test_queue", + Redis: redisConfig, + } + + mockConfigProvider.On("GetConfig").Return(conf, nil) + + // Create a new instance of RedisBroker + broker, err := NewRedisBroker(nil, conf) + + // Assertions + assert.Nil(t, err) + assert.NotNil(t, broker) + assert.Equal(t, "test_queue", broker.(*RedisBroker).config.TaskQueueName) + assert.Equal(t, "redis", broker.BrokerType()) + assert.True(t, broker.(*RedisBroker).running) +} + +func TestRedisBrokerPublish(t *testing.T) { + mockProvider := new(MockRedisProviderInterface) + mockConn := new(MockRedisConn) + queue := "test_queue" + ctx := context.Background() + + // Test immediate task + t.Run("Immediate Task", func(t *testing.T) { + signature := &schema.Signature{ + Name: "TestTask", + Args: []schema.Arg{ + {Type: "string", Value: "test_value"}, + }, + } + + mockProvider.On("GetConn").Return(mockConn) + mockConn.On("Do", "LPUSH", queue, mock.Anything).Return(int64(1), nil) + mockConn.On("Close").Return(nil) + + redisBroker := &RedisBroker{ + provider: mockProvider, + config: &config.Config{ + TaskQueueName: queue, + }, + } + + err := redisBroker.Publish(ctx, signature) + require.NoError(t, err) + }) + + // Test delayed task + t.Run("Delayed Task", func(t *testing.T) { + futureTime := time.Now().Add(1 * time.Hour) + signature := &schema.Signature{ + Name: "DelayedTask", + ETA: &futureTime, + } + + mockProvider.On("GetConn").Return(mockConn) + mockConn.On("Do", "ZADD", mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil) + mockConn.On("Close").Return(nil) + + redisBroker := &RedisBroker{ + provider: mockProvider, + config: &config.Config{ + TaskQueueName: queue, + Redis: &config.RedisConfig{ + DelayedTasksKey: "delayed_tasks", + }, + }, + } + + err := redisBroker.Publish(ctx, signature) + require.NoError(t, err) + }) +} + +func TestGetTaskDelay(t *testing.T) { + rb := &RedisBroker{} + + t.Run("Future Task", func(t *testing.T) { + futureTime := time.Now().Add(1 * time.Hour) + sig := &schema.Signature{ETA: &futureTime} + delay := rb.getTaskDelay(sig) + assert.True(t, delay > 0) + assert.True(t, delay <= time.Hour) + }) + + t.Run("Past Task", func(t *testing.T) { + pastTime := time.Now().Add(-1 * time.Hour) + sig := &schema.Signature{ETA: &pastTime} + delay := rb.getTaskDelay(sig) + assert.Equal(t, time.Duration(0), delay) + }) + + t.Run("No ETA", func(t *testing.T) { + sig := &schema.Signature{} + delay := rb.getTaskDelay(sig) + assert.Equal(t, time.Duration(0), delay) + }) +} + +func TestPublishToMainQueue(t *testing.T) { + mockProvider := new(MockRedisProviderInterface) + mockConn := new(MockRedisConn) + queue := "test_queue" + + signature := &schema.Signature{ + Name: "TestTask", + Args: []schema.Arg{ + {Type: "string", Value: "test_value"}, + }, + } + + mockProvider.On("GetConn").Return(mockConn) + mockConn.On("Do", "LPUSH", queue, mock.Anything).Return(int64(1), nil) + mockConn.On("Close").Return(nil) + + redisBroker := &RedisBroker{ + provider: mockProvider, + config: &config.Config{ + TaskQueueName: queue, + }, + } + + err := redisBroker.publishToMainQueue(context.Background(), signature) + require.NoError(t, err) + mockProvider.AssertExpectations(t) + mockConn.AssertExpectations(t) +} + +func TestStartDelayedTasksProcessor(t *testing.T) { + redisBroker := &RedisBroker{ + stopChan: make(chan struct{}), + } + + redisBroker.StartDelayedTasksProcessor() + assert.NotNil(t, redisBroker.stopChan) + + // Cleanup + close(redisBroker.stopChan) +} + +func TestStop(t *testing.T) { + mockProvider := new(MockRedisProviderInterface) + mockProvider.On("CloseConnection").Return(nil) + + redisBroker := &RedisBroker{ + provider: mockProvider, + stopChan: make(chan struct{}), + } + + err := redisBroker.Stop() + require.NoError(t, err) + mockProvider.AssertExpectations(t) + + // Verify channel is closed + select { + case <-redisBroker.stopChan: + // Channel is closed as expected + default: + t.Error("stopChan should be closed") + } +} + +func TestGetProvider(t *testing.T) { + mockProvider := new(MockRedisProviderInterface) + redisBroker := &RedisBroker{ + provider: mockProvider, + } + + provider := redisBroker.GetProvider() + assert.Equal(t, mockProvider, provider) +} + +func TestRedisBrokerProcessDelayedTasks(t *testing.T) { + mockProvider := new(MockRedisProviderInterface) + mockConn := new(MockRedisConn) + + redisBroker := &RedisBroker{ + provider: mockProvider, + config: &config.Config{ + TaskQueueName: "main_queue", + Redis: &config.RedisConfig{ + DelayedTasksKey: "delayed_queue", + }, + }, + stopChan: make(chan struct{}), + running: true, + } + + mockProvider.On("GetConn").Return(mockConn) + mockConn.On("Do", "ZRANGEBYSCORE", "delayed_queue", "-inf", mock.Anything).Return([]interface{}{[]byte(`{"name":"test_task"}`)}, nil) + mockConn.On("Send", "MULTI").Return(nil) + mockConn.On("Send", "LPUSH", "main_queue", mock.Anything).Return(nil) + mockConn.On("Send", "ZREM", "delayed_queue", mock.Anything).Return(nil) + mockConn.On("Do", "EXEC").Return([]interface{}{int64(1), int64(1)}, nil) + mockConn.On("Close").Return(nil) + + go redisBroker.processDelayedTasks() + time.Sleep(2 * time.Second) + close(redisBroker.stopChan) + + mockProvider.AssertExpectations(t) + mockConn.AssertExpectations(t) +} diff --git a/internal/factory/factory.go b/internal/factory/factory.go index 8310741..e037c54 100644 --- a/internal/factory/factory.go +++ b/internal/factory/factory.go @@ -4,6 +4,8 @@ import ( "github.com/surendratiwari3/paota/config" "github.com/surendratiwari3/paota/internal/broker" amqpBroker "github.com/surendratiwari3/paota/internal/broker/amqp" + "github.com/surendratiwari3/paota/internal/broker/redis" + "github.com/surendratiwari3/paota/internal/provider" "github.com/surendratiwari3/paota/internal/task" "github.com/surendratiwari3/paota/internal/task/memory" "github.com/surendratiwari3/paota/logger" @@ -23,12 +25,19 @@ func (bf *Factory) NewAMQPBroker(configProvider config.ConfigProvider) (broker.B return amqpBroker.NewAMQPBroker(configProvider) } +func (bf *Factory) NewRedisBroker(configProvider config.ConfigProvider) (broker.Broker, error) { + redisProvider := provider.NewRedisProvider(configProvider.GetConfig().Redis) + return redis.NewRedisBroker(redisProvider, configProvider.GetConfig()) +} + // CreateBroker creates a new object of broker.Broker func (bf *Factory) CreateBroker(configProvider config.ConfigProvider) (broker.Broker, error) { brokerType := configProvider.GetConfig().Broker switch brokerType { case "amqp": return bf.NewAMQPBroker(configProvider) + case "redis": + return bf.NewRedisBroker(configProvider) default: logger.ApplicationLogger.Error("unsupported broker") return nil, appErrors.ErrUnsupportedBroker diff --git a/internal/provider/redis.go b/internal/provider/redis.go new file mode 100644 index 0000000..71a2f3a --- /dev/null +++ b/internal/provider/redis.go @@ -0,0 +1,133 @@ +package provider + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/gomodule/redigo/redis" + "github.com/surendratiwari3/paota/config" + "github.com/surendratiwari3/paota/logger" + "github.com/surendratiwari3/paota/schema" +) + +type RedisPoolInterface interface { + Get() redis.Conn + Close() error +} +type RedisProviderInterface interface { + Publish(ctx context.Context, queue string, message *schema.Signature) error + Subscribe(queue string, handler func(*schema.Signature) error) error + CloseConnection() error + GetConn() redis.Conn +} + +type redisProvider struct { + config *config.RedisConfig + pool RedisPoolInterface // Use the interface here +} + +// NewRedisProvider creates a new Redis provider. +func NewRedisProvider(redisConfig *config.RedisConfig) RedisProviderInterface { + return &redisProvider{ + config: redisConfig, + pool: &redis.Pool{ + MaxIdle: redisConfig.MaxIdle, + IdleTimeout: time.Duration(redisConfig.IdleTimeout) * time.Second, + Dial: func() (redis.Conn, error) { + return redis.Dial("tcp", redisConfig.Address) + }, + }, + } +} + +func (rp *redisProvider) Publish(ctx context.Context, queue string, message *schema.Signature) error { + conn := rp.pool.Get() + defer func() { + if conn != nil { + conn.Close() + } + }() + + if conn == nil { + return fmt.Errorf("failed to get connection from pool") + } + + payload, err := json.Marshal(message) + if err != nil { + return err + } + + _, err = redis.DoWithTimeout(conn, time.Duration(rp.config.WriteTimeout)*time.Second, "LPUSH", queue, payload) + return err +} + +func (rp *redisProvider) Subscribe(queue string, handler func(*schema.Signature) error) error { + + // Validate inputs early + if err := rp.validateInputs(queue, handler); err != nil { + return err + } + conn := rp.pool.Get() + if conn == nil { + return errors.New("failed to get connection from pool") + } + defer conn.Close() + + for { + // Execute BRPOP with timeout + payload, err := redis.Strings(redis.DoWithTimeout(conn, time.Duration(rp.config.ReadTimeout)*time.Second, "BRPOP", queue, rp.config.ReadTimeout)) + if err != nil { + if err == redis.ErrNil { + // BRPOP timeout: Log a warning and retry + logger.ApplicationLogger.Warn("BRPOP timed out, no items in queue", "queue", queue) + continue + } + // Log other errors and return to allow caller to handle + logger.ApplicationLogger.Error("Error during BRPOP", "queue", queue, "error", err) + return err + } + + // Ensure payload has the correct structure + if len(payload) < 2 { + logger.ApplicationLogger.Warn("Received invalid payload from Redis", "payload", payload) + continue + } + + var signature schema.Signature + if err := json.Unmarshal([]byte(payload[1]), &signature); err != nil { + logger.ApplicationLogger.Error("Failed to unmarshal payload", "payload", payload[1], "error", err) + continue + } + + // Process the task using the handler + if err := handler(&signature); err != nil { + logger.ApplicationLogger.Error("Task handler returned an error", "signature", signature, "error", err) + return err + } + } +} + +func (rp *redisProvider) CloseConnection() error { + return rp.pool.Close() +} + +// validateInputs performs the necessary input validation +func (rp *redisProvider) validateInputs(queue string, handler func(*schema.Signature) error) error { + if queue == "" { + return errors.New("queue name cannot be empty") + } + if handler == nil { + return errors.New("handler function cannot be nil") + } + if rp.pool == nil { + return errors.New("redis pool is not initialized") + } + return nil +} + +func (r *redisProvider) GetConn() redis.Conn { + return r.pool.Get() +} diff --git a/internal/provider/redis_test.go b/internal/provider/redis_test.go new file mode 100644 index 0000000..bb9ae08 --- /dev/null +++ b/internal/provider/redis_test.go @@ -0,0 +1,368 @@ +package provider + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/gomodule/redigo/redis" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/surendratiwari3/paota/config" + "github.com/surendratiwari3/paota/schema" +) + +// MockRedisConn now includes DoWithTimeout method +type MockRedisConn struct { + mock.Mock +} + +func (m *MockRedisConn) Close() error { + args := m.Called() + return args.Error(0) +} + +func (m *MockRedisConn) Err() error { + args := m.Called() + return args.Error(0) +} + +// Add this method to mock DoWithTimeout +func (m *MockRedisConn) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (interface{}, error) { + // Combine timeout and all other arguments + callArgs := make([]interface{}, 1+len(args)+1) + callArgs[0] = timeout + callArgs[1] = commandName + copy(callArgs[2:], args) + + methodArgs := m.Called(callArgs...) + return methodArgs.Get(0), methodArgs.Error(1) +} + +func (m *MockRedisConn) Do(commandName string, args ...interface{}) (interface{}, error) { + callArgs := make([]interface{}, 1+len(args)) + callArgs[0] = commandName + copy(callArgs[1:], args) + + methodArgs := m.Called(callArgs...) + return methodArgs.Get(0), methodArgs.Error(1) +} + +// MockRedisPool remains the same +type MockRedisPool struct { + mock.Mock +} + +func (m *MockRedisPool) Get() redis.Conn { + args := m.Called() + conn, ok := args.Get(0).(redis.Conn) + if !ok { + return nil + } + return conn +} + +func (m *MockRedisPool) Close() error { + args := m.Called() + return args.Error(0) +} + +func TestPublish(t *testing.T) { + testCases := []struct { + name string + mockSetup func(*MockRedisPool, *MockRedisConn) + message *schema.Signature + queue string + expectedError bool + errorMessage string + }{ + { + name: "Successful Publish", + mockSetup: func(pool *MockRedisPool, conn *MockRedisConn) { + // Setup pool to return mock connection + pool.On("Get").Return(conn) + + // Setup connection to expect Close() call + conn.On("Close").Return(nil) + + // Prepare payload for LPUSH + signature := &schema.Signature{Name: "test_task"} + payload, _ := json.Marshal(signature) + + // Expect DoWithTimeout method with correct arguments + conn.On("DoWithTimeout", + mock.Anything, // timeout duration + "LPUSH", + "test_queue", + payload, + ).Return(1, nil) + }, + message: &schema.Signature{Name: "test_task"}, + queue: "test_queue", + expectedError: false, + }, + { + name: "Failed Connection", + mockSetup: func(pool *MockRedisPool, conn *MockRedisConn) { + // Simulate nil connection + pool.On("Get").Return(nil) + }, + message: &schema.Signature{Name: "test_task"}, + queue: "test_queue", + expectedError: true, + errorMessage: "failed to get connection from pool", + }, + { + name: "JSON Marshaling Error", + mockSetup: func(pool *MockRedisPool, conn *MockRedisConn) { + // Setup pool to return mock connection + pool.On("Get").Return(conn) + + // Setup connection to expect Close() call + conn.On("Close").Return(nil) + }, + queue: "test_queue", + expectedError: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockPool := new(MockRedisPool) + mockConn := new(MockRedisConn) + + // Setup mocks for this specific test case + tc.mockSetup(mockPool, mockConn) + + redisConfig := &config.RedisConfig{ + Address: "localhost:6379", + WriteTimeout: 5, + } + + provider := &redisProvider{ + config: redisConfig, + pool: mockPool, + } + + // Convert nil connection handling for "Failed Connection" case + if mockPool.Get() == nil { + err := provider.Publish(context.Background(), tc.queue, tc.message) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failed to get connection from pool") + return + } + + err := provider.Publish(context.Background(), tc.queue, tc.message) + + if tc.expectedError { + assert.Error(t, err) + if tc.errorMessage != "" { + assert.Contains(t, err.Error(), tc.errorMessage) + } + } else { + assert.NoError(t, err) + } + + // Verify mock expectations + mockPool.AssertExpectations(t) + mockConn.AssertExpectations(t) + }) + } +} + +func TestCloseConnection(t *testing.T) { + mockPool := new(MockRedisPool) + mockPool.On("Close").Return(nil) + + redisConfig := &config.RedisConfig{ + Address: "localhost:6379", + } + + provider := &redisProvider{ + config: redisConfig, + pool: mockPool, + } + + err := provider.CloseConnection() + assert.NoError(t, err) + mockPool.AssertExpectations(t) +} + +func TestNewRedisProvider(t *testing.T) { + redisConfig := &config.RedisConfig{ + Address: "localhost:6379", + MaxIdle: 10, + IdleTimeout: 30, + ReadTimeout: 5, + WriteTimeout: 5, + } + + provider := NewRedisProvider(redisConfig) + assert.NotNil(t, provider) +} + +func TestSubscribe(t *testing.T) { + testCases := []struct { + name string + setupMocks func(*MockRedisPool, *MockRedisConn) + queue string + handler func(*schema.Signature) error + expectedError bool + errorContains string + }{ + { + name: "Empty Queue Name", + setupMocks: func(pool *MockRedisPool, conn *MockRedisConn) { + // No mock setup needed for this case + }, + queue: "", + handler: func(s *schema.Signature) error { + return nil + }, + expectedError: true, + errorContains: "queue name cannot be empty", + }, + { + name: "Nil Handler Function", + setupMocks: func(pool *MockRedisPool, conn *MockRedisConn) { + // No mock setup needed for this case + }, + queue: "test_queue", + handler: nil, + expectedError: true, + errorContains: "handler function cannot be nil", + }, + { + name: "Successful Subscribe", + setupMocks: func(pool *MockRedisPool, conn *MockRedisConn) { + pool.On("Get").Return(conn) + conn.On("Close").Return(nil) + + // Mock successful BRPOP response + signature := &schema.Signature{Name: "test_task"} + payload, _ := json.Marshal(signature) + conn.On("DoWithTimeout", + mock.Anything, + "BRPOP", + "test_queue", + mock.Anything, + ).Return([]interface{}{[]byte("test_queue"), payload}, nil) + }, + queue: "test_queue", + handler: func(s *schema.Signature) error { + return nil + }, + expectedError: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockPool := new(MockRedisPool) + mockConn := new(MockRedisConn) + + tc.setupMocks(mockPool, mockConn) + + redisConfig := &config.RedisConfig{ + Address: "localhost:6379", + ReadTimeout: 5, + WriteTimeout: 5, + } + provider := &redisProvider{ + config: redisConfig, + pool: mockPool, + } + + if provider.pool == nil { + err := provider.Subscribe(tc.queue, tc.handler) + assert.Error(t, err) + assert.Contains(t, err.Error(), tc.errorContains) + return + } + + err := provider.Subscribe(tc.queue, tc.handler) + + if tc.expectedError { + assert.Error(t, err) + if tc.errorContains != "" { + assert.Contains(t, err.Error(), tc.errorContains) + } + } else { + assert.NoError(t, err) + } + + mockPool.AssertExpectations(t) + mockConn.AssertExpectations(t) + }) + } +} + +func TestValidateInputs(t *testing.T) { + provider := &redisProvider{ + config: &config.RedisConfig{}, + pool: new(MockRedisPool), + } + + testCases := []struct { + name string + queue string + handler func(*schema.Signature) error + expectedError string + }{ + { + name: "Empty Queue", + queue: "", + handler: func(*schema.Signature) error { return nil }, + expectedError: "queue name cannot be empty", + }, + { + name: "Nil Handler", + queue: "test_queue", + handler: nil, + expectedError: "handler function cannot be nil", + }, + { + name: "Valid Inputs", + queue: "test_queue", + handler: func(*schema.Signature) error { return nil }, + expectedError: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := provider.validateInputs(tc.queue, tc.handler) + if tc.expectedError != "" { + assert.Error(t, err) + assert.Contains(t, err.Error(), tc.expectedError) + } else { + assert.NoError(t, err) + } + }) + } +} +func TestGetConn(t *testing.T) { + // Create mock pool and conn + mockPool := new(MockRedisPool) + mockConn := new(MockRedisConn) + + // Set up expectations + mockPool.On("Get").Return(mockConn) + + // Create provider with mock pool + provider := &redisProvider{ + config: &config.RedisConfig{}, + pool: mockPool, + } + + // Call GetConn + provider.GetConn() + + // Verify expectations + mockPool.AssertExpectations(t) + mockPool.AssertCalled(t, "Get") + + // Verify returned connection + //assert.Equal(t, mockConn, conn, "Should return the mock connection") +} \ No newline at end of file diff --git a/internal/task/memory/task.go b/internal/task/memory/task.go index 16e308d..bf07249 100644 --- a/internal/task/memory/task.go +++ b/internal/task/memory/task.go @@ -2,21 +2,25 @@ package memory import ( "context" + "encoding/json" "errors" "fmt" + "github.com/google/uuid" amqp "github.com/rabbitmq/amqp091-go" "github.com/surendratiwari3/paota/internal/broker" + "github.com/surendratiwari3/paota/internal/broker/redis" "github.com/surendratiwari3/paota/internal/task" + "sync" + "time" + "github.com/surendratiwari3/paota/config" "github.com/surendratiwari3/paota/internal/utils" "github.com/surendratiwari3/paota/internal/validation" "github.com/surendratiwari3/paota/logger" "github.com/surendratiwari3/paota/schema" appError "github.com/surendratiwari3/paota/schema/errors" - "sync" - "time" ) type DefaultTaskRegistrar struct { @@ -27,11 +31,12 @@ type DefaultTaskRegistrar struct { } func NewDefaultTaskRegistrar(brk broker.Broker, configProvider config.ConfigProvider) task.TaskRegistrarInterface { - return &DefaultTaskRegistrar{ + registrar := &DefaultTaskRegistrar{ registeredTasks: new(sync.Map), broker: brk, configProvider: configProvider, } + return registrar } // RegisterTasks registers all tasks at once @@ -69,7 +74,7 @@ func (r *DefaultTaskRegistrar) IsTaskRegistered(name string) bool { func (r *DefaultTaskRegistrar) GetRegisteredTask(name string) (interface{}, error) { taskFunc, ok := r.registeredTasks.Load(name) if !ok { - return nil, fmt.Errorf("Task not registered error: %s", name) + return nil, fmt.Errorf("task not registered error: %s", name) } return taskFunc, nil } @@ -79,14 +84,18 @@ func (r *DefaultTaskRegistrar) GetRegisteredTaskCount() uint { } func (r *DefaultTaskRegistrar) Processor(job interface{}) error { + logger.ApplicationLogger.Info("Received job of type", "type", fmt.Sprintf("%T", job)) // Perform a type switch to handle different job types switch j := job.(type) { case amqp.Delivery: - return r.amqpMsgProcessor(job) + return r.amqpMsgProcessor(j) // Pass the job as amqp.Delivery + case *schema.Signature: + // Perform a type assertion to ensure job is of type *schema.Signature + return r.redisMsgProcessor(j) // Use j directly since it's already asserted default: // Handle other job types or report an error - logger.ApplicationLogger.Error("Unsupported job type", j) + logger.ApplicationLogger.Error("Unsupported job type", "type", fmt.Sprintf("%T", j), "job", j) } return nil } @@ -134,6 +143,238 @@ func (r *DefaultTaskRegistrar) amqpMsgProcessor(job interface{}) error { return nil } +func (r *DefaultTaskRegistrar) redisMsgProcessor(signature *schema.Signature) error { + // Validate task + fn, err := r.validateTask(signature) + if err != nil { + return err + } + + // Handle task execution + return r.executeTask(signature, fn) +} + +func (r *DefaultTaskRegistrar) validateTask(signature *schema.Signature) (func(*schema.Signature) error, error) { + registeredTaskFunc, err := r.GetRegisteredTask(signature.Name) + if err != nil || registeredTaskFunc == nil { + logger.ApplicationLogger.Error("task not registered or invalid task function", signature.Name) + return nil, appError.ErrTaskNotRegistered + } + + fn, ok := registeredTaskFunc.(func(*schema.Signature) error) + if !ok { + logger.ApplicationLogger.Error("Invalid task function type", signature.Name) + return nil, appError.ErrTaskMustBeFunc + } + return fn, nil +} + +func (r *DefaultTaskRegistrar) executeTask(signature *schema.Signature, fn func(*schema.Signature) error) error { + if err := r.moveToProcessing(signature); err != nil { + logger.ApplicationLogger.Error("Failed to move task to processing", "error", err) + return err + } + + if err := fn(signature); err != nil { + return r.handleTaskError(signature, err) + } + + return r.removeFromProcessing(signature) +} + +func (r *DefaultTaskRegistrar) handleTaskError(signature *schema.Signature, err error) error { + if retryErr := r.retryRedisTask(signature); retryErr != nil { + logger.ApplicationLogger.Error("Failed to retry task", "error", retryErr) + if recoverErr := r.recoverFailedTask(signature); recoverErr != nil { + logger.ApplicationLogger.Error("Failed to recover task", "error", recoverErr) + } + return retryErr + } + return err +} + +// Helper functions to manage task state +func (r *DefaultTaskRegistrar) moveToProcessing(signature *schema.Signature) error { + if redisBroker, ok := r.broker.(*redis.RedisBroker); ok { + processingKey := r.getProcessingKey() + + taskBytes, err := json.Marshal(signature) + if err != nil { + return fmt.Errorf(errSerializeTask, err) + } + + conn := redisBroker.GetProvider().GetConn() + defer conn.Close() + + _, err = conn.Do("ZADD", processingKey, time.Now().Unix(), taskBytes) + return err + } + return nil +} + +func (r *DefaultTaskRegistrar) removeFromProcessing(signature *schema.Signature) error { + if redisBroker, ok := r.broker.(*redis.RedisBroker); ok { + processingKey := r.getProcessingKey() + + taskBytes, err := json.Marshal(signature) + if err != nil { + return fmt.Errorf(errSerializeTask, err) + } + + conn := redisBroker.GetProvider().GetConn() + defer conn.Close() + + _, err = conn.Do("ZREM", processingKey, taskBytes) + return err + } + return nil +} + +func (r *DefaultTaskRegistrar) recoverFailedTask(signature *schema.Signature) error { + if redisBroker, ok := r.broker.(*redis.RedisBroker); ok { + delayedKey := r.configProvider.GetConfig().Redis.DelayedTasksKey + if delayedKey == "" { + delayedKey = "paota_tasks_delayed" + } + processingKey := r.getProcessingKey() + + taskBytes, err := json.Marshal(signature) + if err != nil { + return fmt.Errorf(errSerializeTask, err) + } + + conn := redisBroker.GetProvider().GetConn() + defer conn.Close() + + // Start transaction + if _, err := conn.Do("MULTI"); err != nil { + return err + } + + // Move from processing back to delayed queue + _, err = conn.Do("ZADD", delayedKey, time.Now().Unix(), taskBytes) + if err != nil { + conn.Do("DISCARD") + return err + } + + // Remove from processing + _, err = conn.Do("ZREM", processingKey, taskBytes) + if err != nil { + conn.Do("DISCARD") + return err + } + + // Commit transaction + _, err = conn.Do("EXEC") + return err + } + return nil +} + +func (r *DefaultTaskRegistrar) getProcessingKey() string { + delayedKey := r.configProvider.GetConfig().Redis.DelayedTasksKey + if delayedKey == "" { + delayedKey = "paota_tasks_delayed" + } + return delayedKey + "_processing" +} + +func (r *DefaultTaskRegistrar) retryRedisTask(signature *schema.Signature) error { + // Set default retry values if not specified + if signature.RetryCount == 0 { + signature.RetryCount = r.configProvider.GetConfig().Redis.RetryCount + } + if signature.RetryTimeout == 0 { + signature.RetryTimeout = r.configProvider.GetConfig().Redis.RetryTimeout + } + + // Check if we've exceeded retry count + if signature.RetriesDone >= signature.RetryCount { + logger.ApplicationLogger.Info("Max retry attempts reached", + "task", signature.Name, + "attempts", signature.RetriesDone, + ) + return fmt.Errorf("max retry attempts reached for task: %s", signature.Name) + } + + // Increment retry counter + signature.RetriesDone++ + + // Calculate next retry time using the signature's RetryTimeout + retryDelay := time.Duration(signature.RetryTimeout) * time.Second + eta := time.Now().UTC().Add(retryDelay) + signature.ETA = &eta + + if redisBroker, ok := r.broker.(*redis.RedisBroker); ok { + if err := r.executeRedisRetry(redisBroker, signature, eta); err != nil { + return err + } + + logger.ApplicationLogger.Info("Task scheduled for retry", + "task", signature.Name, + "attempt", signature.RetriesDone, + "nextRetry", eta, + "delaySeconds", signature.RetryTimeout) + + return nil + } + + return fmt.Errorf("broker is not of type RedisBroker") +} + +func (r *DefaultTaskRegistrar) executeRedisRetry(redisBroker *redis.RedisBroker, signature *schema.Signature, eta time.Time) error { + delayedKey := r.configProvider.GetConfig().Redis.DelayedTasksKey + if delayedKey == "" { + delayedKey = "paota_tasks_delayed" + } + processingKey := delayedKey + "_processing" + + taskBytes, err := json.Marshal(signature) + if err != nil { + return fmt.Errorf(errSerializeTask, err) + } + + conn := redisBroker.GetProvider().GetConn() + defer conn.Close() + + // Start a Redis transaction + if _, err := conn.Do("MULTI"); err != nil { + return fmt.Errorf("failed to start transaction: %v", err) + } + + // 1. First move to processing set (temporary storage) + if _, err = conn.Do("ZADD", processingKey, eta.Unix(), taskBytes); err != nil { + conn.Do("DISCARD") + return fmt.Errorf("failed to add task to processing queue: %v", err) + } + + // 2. Remove from original set (if it exists) + if _, err = conn.Do("ZREM", delayedKey, taskBytes); err != nil { + conn.Do("DISCARD") + return fmt.Errorf("failed to remove task from original queue: %v", err) + } + + // 3. Add to delayed set + if _, err = conn.Do("ZADD", delayedKey, eta.Unix(), taskBytes); err != nil { + conn.Do("DISCARD") + return fmt.Errorf("failed to add task to delayed queue: %v", err) + } + + // 4. Remove from processing set + if _, err = conn.Do("ZREM", processingKey, taskBytes); err != nil { + conn.Do("DISCARD") + return fmt.Errorf("failed to remove task from processing queue: %v", err) + } + + // Commit the transaction + if _, err := conn.Do("EXEC"); err != nil { + return fmt.Errorf("failed to commit transaction: %v", err) + } + + return nil +} + func (r *DefaultTaskRegistrar) retryTask(signature *schema.Signature) error { if signature.RetryCount < 1 { return nil @@ -165,8 +406,39 @@ func (r *DefaultTaskRegistrar) SendTaskWithContext(ctx context.Context, signatur taskID := uuid.New().String() signature.UUID = fmt.Sprintf("task_%v", taskID) } - if err := r.broker.Publish(ctx, signature); err != nil { - return fmt.Errorf("Publish message error: %s", err) + + // Use BrokerType to determine the broker and send the task accordingly + switch r.broker.BrokerType() { + case "rabbitmq": + // If the broker is RabbitMQ, publish to the message queue + if err := r.broker.Publish(ctx, signature); err != nil { + return fmt.Errorf("publish message to RabbitMQ error: %s", err) + } + logger.ApplicationLogger.Info("Task published to RabbitMQ", "taskUUID", signature.UUID) + case "redis": + // If the broker is Redis, pass the signature directly to RedisBroker's Publish method + if redisBroker, ok := r.broker.(*redis.RedisBroker); ok { + // Publish directly without serialization because RedisBroker's Publish already handles it + if err := redisBroker.Publish(ctx, signature); err != nil { + return fmt.Errorf("failed to store task in Redis: %s", err) + } + logger.ApplicationLogger.Info("Task published to Redis", "taskUUID", signature.UUID) + } else { + return fmt.Errorf("broker is not of type RedisBroker") + } + + default: + // If the broker type is unsupported + return fmt.Errorf("unsupported broker type: %s", r.broker.BrokerType()) } + return nil } +func SignatureToBytes(signature *schema.Signature) ([]byte, error) { + // Serialize signature to JSON (you can use other formats if needed) + return json.Marshal(signature) +} + +const ( + errSerializeTask = "failed to serialize task: %v" +) diff --git a/internal/task/memory/task_test.go b/internal/task/memory/task_test.go index 3f87ec6..733d43f 100644 --- a/internal/task/memory/task_test.go +++ b/internal/task/memory/task_test.go @@ -1,13 +1,16 @@ package memory import ( - "errors" + "testing" + "time" + + amqp "github.com/rabbitmq/amqp091-go" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "github.com/surendratiwari3/paota/config" "github.com/surendratiwari3/paota/internal/broker" + "github.com/surendratiwari3/paota/internal/broker/redis" "github.com/surendratiwari3/paota/schema" - "testing" + appError "github.com/surendratiwari3/paota/schema/errors" ) func TestTaskRegistrar_RegisterTasks(t *testing.T) { @@ -98,30 +101,159 @@ func TestTaskRegistrar_GetRegisteredTask(t *testing.T) { assert.Nil(t, task) } -func TestTaskRegistrar_SendTaskWithContext(t *testing.T) { +func TestTaskRegistrar_Processor(t *testing.T) { mockConfigProvider := new(config.MockConfigProvider) - mockConfigProvider.On("GetConfig").Return(&config.Config{ - Broker: "amqp", - TaskQueueName: "test", + mockBroker := broker.NewMockBroker(t) + taskRegistrar := NewDefaultTaskRegistrar(mockBroker, mockConfigProvider) + + t.Run("AMQP Delivery", func(t *testing.T) { + delivery := amqp.Delivery{ + Body: []byte(`{"name": "test_task"}`), + } + err := taskRegistrar.Processor(delivery) + assert.Error(t, err) // Should error since task not registered + }) + + t.Run("Redis Signature", func(t *testing.T) { + signature := &schema.Signature{ + Name: "test_task", + } + err := taskRegistrar.Processor(signature) + assert.Error(t, err) // Should error since task not registered + }) + + t.Run("Unsupported Type", func(t *testing.T) { + err := taskRegistrar.Processor("unsupported") + assert.NoError(t, err) // Returns nil for unsupported types + }) +} + +func TestTaskRegistrar_RedisMsgProcessor(t *testing.T) { + mockConfigProvider := new(config.MockConfigProvider) + mockBroker := broker.NewMockBroker(t) + taskRegistrar := NewDefaultTaskRegistrar(mockBroker, mockConfigProvider) + + t.Run("Task Not Registered", func(t *testing.T) { + signature := &schema.Signature{ + Name: "nonexistent_task", + } + err := taskRegistrar.(*DefaultTaskRegistrar).redisMsgProcessor(signature) + assert.Equal(t, appError.ErrTaskNotRegistered, err) + }) + + t.Run("Invalid Task Function", func(t *testing.T) { + // Register invalid task function + concreteRegistrar := taskRegistrar.(*DefaultTaskRegistrar) + concreteRegistrar.registeredTasks.Store("invalid_task", "not a function") + signature := &schema.Signature{ + Name: "invalid_task", + } + err := taskRegistrar.(*DefaultTaskRegistrar).redisMsgProcessor(signature) + assert.Equal(t, appError.ErrTaskMustBeFunc, err) + }) + + t.Run("Successful Task Execution", func(t *testing.T) { + taskFunc := func(*schema.Signature) error { return nil } + concreteRegistrar := taskRegistrar.(*DefaultTaskRegistrar) + concreteRegistrar.registeredTasks.Store("success_task", taskFunc) + signature := &schema.Signature{ + Name: "success_task", + } + err := taskRegistrar.(*DefaultTaskRegistrar).redisMsgProcessor(signature) + assert.NoError(t, err) + }) +} + +func TestTaskRegistrar_RetryRedisTask(t *testing.T) { + mockConfig := &config.Config{ + Redis: &config.RedisConfig{ + RetryCount: 3, + RetryTimeout: 5, + }, + } + mockConfigProvider := new(config.MockConfigProvider) + mockConfigProvider.On("GetConfig").Return(mockConfig, nil) + + mockRedisBroker := new(redis.RedisBroker) + taskRegistrar := NewDefaultTaskRegistrar(mockRedisBroker, mockConfigProvider) + + t.Run("Max Retries Exceeded", func(t *testing.T) { + signature := &schema.Signature{ + Name: "test_task", + RetryCount: 3, + RetriesDone: 3, + } + err := taskRegistrar.(*DefaultTaskRegistrar).retryRedisTask(signature) + assert.Error(t, err) + assert.Contains(t, err.Error(), "max retry attempts reached") + }) +} + +func TestTaskRegistrar_RetryTask(t *testing.T) { + mockConfig := &config.Config{ AMQP: &config.AMQPConfig{ - Url: "amqp://localhost:5672", - HeartBeatInterval: 30, - ConnectionPoolSize: 2, + DelayedQueue: "delayed_queue", }, - }, nil) + } + mockConfigProvider := new(config.MockConfigProvider) + mockConfigProvider.On("GetConfig").Return(mockConfig, nil) + mockBroker := broker.NewMockBroker(t) taskRegistrar := NewDefaultTaskRegistrar(mockBroker, mockConfigProvider) - mockBroker.On("Publish", mock.Anything, mock.Anything).Return(nil) - // Create a mock task signature - mockSignature := &schema.Signature{ - UUID: "mockUUID", + + t.Run("No Retry Required", func(t *testing.T) { + signature := &schema.Signature{ + RetryCount: 0, + } + err := taskRegistrar.(*DefaultTaskRegistrar).retryTask(signature) + assert.NoError(t, err) + }) + + t.Run("Max Retries Exceeded", func(t *testing.T) { + signature := &schema.Signature{ + RetryCount: 3, + RetriesDone: 4, + } + err := taskRegistrar.(*DefaultTaskRegistrar).retryTask(signature) + assert.NoError(t, err) + }) +} + +func TestTaskRegistrar_GetRetryInterval(t *testing.T) { + mockConfigProvider := new(config.MockConfigProvider) + mockBroker := broker.NewMockBroker(t) + taskRegistrar := NewDefaultTaskRegistrar(mockBroker, mockConfigProvider) + + tests := []struct { + retryCount int + expected time.Duration + }{ + {1, 1 * time.Second}, + {2, 1 * time.Second}, + {3, 2 * time.Second}, + {4, 3 * time.Second}, + {5, 5 * time.Second}, } - err := taskRegistrar.SendTask(mockSignature) - assert.Nil(t, err) - mockBroker = broker.NewMockBroker(t) - taskRegistrar = NewDefaultTaskRegistrar(mockBroker, mockConfigProvider) - mockBroker.On("Publish", mock.Anything, mock.Anything).Return(errors.New("test error")) - err = taskRegistrar.SendTask(mockSignature) - assert.NotNil(t, err) + for _, tt := range tests { + result := taskRegistrar.(*DefaultTaskRegistrar).getRetryInterval(tt.retryCount) + assert.Equal(t, tt.expected, result) + } +} + +func TestSignatureToBytes(t *testing.T) { + signature := &schema.Signature{ + UUID: "test-uuid", + Name: "test-task", + Args: []schema.Arg{ + {Type: "string", Value: "arg1"}, + {Type: "string", Value: "arg2"}, + }, + } + + bytes, err := SignatureToBytes(signature) + assert.NoError(t, err) + assert.NotNil(t, bytes) + assert.Contains(t, string(bytes), "test-uuid") + assert.Contains(t, string(bytes), "test-task") } diff --git a/internal/workergroup/workergroup.go b/internal/workergroup/workergroup.go index f7702e5..c380ea8 100644 --- a/internal/workergroup/workergroup.go +++ b/internal/workergroup/workergroup.go @@ -1,9 +1,10 @@ package workergroup import ( + "sync" + "github.com/surendratiwari3/paota/internal/task" "github.com/surendratiwari3/paota/logger" - "sync" ) type WorkerGroupInterface interface { diff --git a/workerpool/workerpool.go b/workerpool/workerpool.go index f64ece6..1cf0e6d 100644 --- a/workerpool/workerpool.go +++ b/workerpool/workerpool.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/google/uuid" "github.com/surendratiwari3/paota/config" "github.com/surendratiwari3/paota/internal/backend" @@ -13,11 +14,12 @@ import ( "github.com/surendratiwari3/paota/logger" "github.com/surendratiwari3/paota/schema" - "github.com/surendratiwari3/paota/internal/workergroup" "os" "os/signal" "reflect" + "github.com/surendratiwari3/paota/internal/workergroup" + "sync" )