
REDIS BROKER IMPLEMENTATION #33

Open · wants to merge 7 commits into base: `main`
32 changes: 30 additions & 2 deletions README.md
@@ -101,7 +101,7 @@ In summary, Paota facilitates the asynchronous processing of tasks in a distribu

The `Config` struct holds all configuration options for Paota. It includes the following parameters:

- **Broker**: The message broker to be used. Currently, only "amqp" is supported.
- **Broker**: The message broker to be used. Currently, "amqp" and "redis" are supported.
- **Store**: The type of storage to be used (optional).
- **TaskQueueName**: The name of the task queue. Default value is "paota_tasks".
- **StoreQueueName**: The name of the storage queue (optional).
@@ -245,4 +245,32 @@ Total Records Processed: 10 Lakh data records.

Thank you for flying Paota!


# Redis broker
The Redis broker acts as a message queue for tasks, enabling asynchronous processing through a worker pool. Tasks are serialized and stored in Redis, and workers consume them for execution.

## Redis broker workflow
1. Create Worker Pool: Initialize the worker pool with the Redis broker configuration.
2. Register Tasks: Define and register task handlers (e.g., Print task).
3. Publish Tasks: Use the SendTaskWithContext method to publish tasks to the Redis queue.
4. Process Tasks: Workers consume tasks from the Redis queue and execute the corresponding handlers.
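The four steps above can be sketched with plain Go channels, using a buffered channel as a stand-in for the Redis-backed queue. The names and structure here are illustrative only, not the paota API:

```go
package main

import (
	"fmt"
	"sync"
)

// task is a stand-in for a serialized Redis task; Name selects the handler.
type task struct {
	Name string
	Arg  string
}

// runWorkers drains the queue with a small worker pool and returns the
// handler results. A buffered channel stands in for the Redis list.
func runWorkers(tasks []task) []string {
	queue := make(chan task, len(tasks))
	results := make(chan string, len(tasks))

	// Step 2: register handlers by task name.
	handlers := map[string]func(string) string{
		"Print": func(arg string) string { return "processed: " + arg },
	}

	// Step 4: workers consume tasks and dispatch to the registered handler.
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range queue {
				if h, ok := handlers[t.Name]; ok {
					results <- h(t.Arg)
				}
			}
		}()
	}

	// Step 3: publish tasks to the queue.
	for _, t := range tasks {
		queue <- t
	}
	close(queue)
	wg.Wait()
	close(results)

	out := make([]string, 0, len(tasks))
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	fmt.Println(runWorkers([]task{{Name: "Print", Arg: "hello"}}))
}
```

In the real broker, step 3 serializes the signature into Redis and step 4's workers poll that queue instead of ranging over a channel.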

## Sample Task Format

```json
{
  "UUID": "task_8341a57b-d26d-4bec-94bc-9ef911dc5072",
  "Name": "Print",
  "Args": [
    {
      "Name": "Arg_Name",
      "Type": "string",
      "Value": "{\"id\":\"1\",\"name\":\"Jane Doe\",\"email\":\"jane.doe@example.com\"}"
    }
  ],
  "RoutingKey": "",
  "Priority": 0,
  "RetryCount": 10,
  "RetryTimeout": 0,
  "WaitTime": 0,
  "RetriesDone": 0,
  "IgnoreWhenTaskNotRegistered": true,
  "ETA": null
}
```
3 changes: 2 additions & 1 deletion config/config.go
@@ -17,12 +17,13 @@ type ConfigProvider interface {

// Config holds all configuration for Paota
type Config struct {
	Broker         string         `env:"BROKER" envDefault:"amqp" validate:"required,oneof=amqp"` // allowed: amqp
	Broker         string         `env:"BROKER" envDefault:"amqp" validate:"required,oneof=amqp redis"` // allowed: amqp and redis
	Store          string         `env:"STORE"`
	TaskQueueName  string         `env:"QUEUE_NAME" envDefault:"paota_tasks" validate:"required"`
	StoreQueueName string         `env:"STORE_QUEUE_NAME"`
	AMQP           *AMQPConfig    `envPrefix:"AMQP_"`
	MongoDB        *MongoDBConfig `envPrefix:"MONGO_"`
	Redis          *RedisConfig   `envPrefix:"REDIS_"`
}

type configProvider struct {
20 changes: 20 additions & 0 deletions config/redis.go
@@ -0,0 +1,20 @@
package config

type RedisConfig struct {
	Address                string `env:"URL" envDefault:"redis://localhost:6379"`
	MaxIdle                int    `env:"MAX_IDLE" envDefault:"10"`
	MaxActive              int    `env:"MAX_ACTIVE" envDefault:"100"`
	IdleTimeout            int    `env:"IDLE_TIMEOUT" envDefault:"300"`
	Wait                   bool   `env:"WAIT" envDefault:"true"`
	ReadTimeout            int    `env:"READ_TIMEOUT" envDefault:"15"`
	WriteTimeout           int    `env:"WRITE_TIMEOUT" envDefault:"15"`
	ConnectTimeout         int    `env:"CONNECT_TIMEOUT" envDefault:"15"`
	NormalTasksPollPeriod  int    `env:"NORMAL_TASKS_POLL_PERIOD" envDefault:"1000"`
	DelayedTasksPollPeriod int    `env:"DELAYED_TASKS_POLL_PERIOD" envDefault:"1000"`
	DelayedTasksKey        string `env:"DELAYED_TASKS_KEY" envDefault:"paota_tasks_delayed"`
	ClientName             string `env:"CLIENT_NAME" envDefault:"app_redis_client"`
	MasterName             string `env:"MASTER_NAME" envDefault:""`
	ClusterEnabled         bool   `env:"CLUSTER_ENABLED" envDefault:"false"`
	RetryTimeout           int    `env:"RETRY_TIMEOUT" envDefault:"30"`
	RetryCount             int    `env:"RETRY_COUNT" envDefault:"3"`
}
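Since `Config` nests this struct under `envPrefix:"REDIS_"`, the tags suggest that settings would be supplied through environment variables like the following (assuming an `env`-tag parser that honors `envPrefix`; variable names are derived from the tags above, not verified against the library):

```shell
export BROKER=redis
export QUEUE_NAME=paota_tasks
export REDIS_URL=redis://localhost:6379
export REDIS_MAX_ACTIVE=100
export REDIS_DELAYED_TASKS_KEY=paota_tasks_delayed
export REDIS_CLUSTER_ENABLED=false
```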
160 changes: 160 additions & 0 deletions example/redis/consumer/main.go
@@ -0,0 +1,160 @@
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/surendratiwari3/paota/config"
	"github.com/surendratiwari3/paota/logger"
	"github.com/surendratiwari3/paota/schema"
	"github.com/surendratiwari3/paota/workerpool"
)

type printWorker struct {
	workerPool workerpool.Pool
}

type retryTestWorker struct {
	workerPool workerpool.Pool
	attempts   map[string]int // track attempts per task
	mu         sync.Mutex     // protects the attempts map
}

type scheduledWorker struct {
	workerPool workerpool.Pool
}

func main() {
	// Configure the Redis broker
	cnf := config.Config{
		Broker:        "redis",
		TaskQueueName: "paota_task_queue",
		Redis: &config.RedisConfig{
			Address:         "localhost:6379",      // replace with your Redis server address
			DelayedTasksKey: "paota_delayed_tasks", // key for delayed/scheduled tasks
		},
	}

	// Set the configuration
	err := config.GetConfigProvider().SetApplicationConfig(cnf)
	if err != nil {
		logger.ApplicationLogger.Error("config error, exit", err)
		return
	}

	// Create a new worker pool
	newWorkerPool, err := workerpool.NewWorkerPool(context.Background(), 10, "testWorker")
	if err != nil {
		logger.ApplicationLogger.Error("workerPool is not created", err)
		os.Exit(1)
	} else if newWorkerPool == nil {
		logger.ApplicationLogger.Info("workerPool is nil")
		os.Exit(1)
	}

	// Create the worker instances
	printWorker := printWorker{workerPool: newWorkerPool}
	retryWorker := &retryTestWorker{
		workerPool: newWorkerPool,
		attempts:   make(map[string]int),
	}
	scheduledWorker := scheduledWorker{workerPool: newWorkerPool}

	logger.ApplicationLogger.Info("newWorkerPool created successfully")

	// Register task handlers
	regTasks := map[string]interface{}{
		"Print":         printWorker.Print,
		"RetryTest":     retryWorker.RetryTest,
		"ScheduledTask": scheduledWorker.ScheduledTask,
	}
	err = newWorkerPool.RegisterTasks(regTasks)
	if err != nil {
		logger.ApplicationLogger.Error("error while registering tasks", err)
		return
	}

	logger.ApplicationLogger.Info("worker started")

	// Start the worker pool (blocks while consuming tasks)
	err = newWorkerPool.Start()
	if err != nil {
		logger.ApplicationLogger.Error("error while starting worker", err)
	}
}

// Print is the task handler for the "Print" task
func (wp printWorker) Print(arg *schema.Signature) error {
	// Deserialize the task argument
	var user map[string]interface{}
	err := json.Unmarshal([]byte(arg.Args[0].Value.(string)), &user)
	if err != nil {
		logger.ApplicationLogger.Error("failed to parse task argument", err)
		return err
	}

	logger.ApplicationLogger.Infof("Processing task: %v", user)
	return nil
}

func (w *retryTestWorker) RetryTest(arg *schema.Signature) error {
	w.mu.Lock()
	w.attempts[arg.UUID]++
	attempts := w.attempts[arg.UUID]
	w.mu.Unlock()

	logger.ApplicationLogger.Info("Processing RetryTest task",
		"taskID", arg.UUID,
		"attempt", attempts,
		"data", arg.Args[0].Value,
	)

	// Fail the first 3 attempts
	if attempts <= 3 {
		return fmt.Errorf("intentional failure, attempt %d/3", attempts)
	}

	// Succeed on the 4th attempt
	logger.ApplicationLogger.Info("RetryTest task succeeded",
		"taskID", arg.UUID,
		"attempts", attempts,
	)
	return nil
}

func (sw scheduledWorker) ScheduledTask(arg *schema.Signature) error {
	// Log when the task actually executes
	executionTime := time.Now().UTC()

	// Parse the scheduled time from args if provided
	var scheduledTime time.Time
	if len(arg.Args) > 0 {
		if timeStr, ok := arg.Args[0].Value.(string); ok {
			if parsed, err := time.Parse(time.RFC3339, timeStr); err == nil {
				scheduledTime = parsed
			}
		}
	}

	logger.ApplicationLogger.Info("Executing scheduled task",
		"taskID", arg.UUID,
		"scheduledFor", scheduledTime,
		"executedAt", executionTime,
		"data", arg.Args,
	)

	// Calculate drift if we have a scheduled time
	if !scheduledTime.IsZero() {
		drift := executionTime.Sub(scheduledTime)
		logger.ApplicationLogger.Info("Scheduled task timing",
			"taskID", arg.UUID,
			"driftSeconds", drift.Seconds(),
		)
	}

	return nil
}
134 changes: 134 additions & 0 deletions example/redis/publisher/main.go
@@ -0,0 +1,134 @@
package main

import (
	"context"
	"encoding/json"
	"os"
	"sync"
	"time"

	"github.com/surendratiwari3/paota/config"
	"github.com/surendratiwari3/paota/logger"
	"github.com/surendratiwari3/paota/schema"
	"github.com/surendratiwari3/paota/workerpool"
)

// UserRecord represents the structure of user records.
type UserRecord struct {
	ID    string `json:"id"`
	Name  string `json:"name"`
	Email string `json:"email"`
}

func main() {
	// Configure the Redis broker
	cnf := config.Config{
		Broker:        "redis",
		TaskQueueName: "paota_task_queue",
		Redis: &config.RedisConfig{
			Address:         "localhost:6379",
			DelayedTasksKey: "paota_delayed_tasks",
		},
	}

	// Create a worker pool
	newWorkerPool, err := workerpool.NewWorkerPoolWithConfig(context.Background(), 10, "testWorker", cnf)
	if err != nil {
		logger.ApplicationLogger.Error("workerPool is not created", err)
		os.Exit(1)
	} else if newWorkerPool == nil {
		logger.ApplicationLogger.Info("workerPool is nil")
		os.Exit(1)
	}
	logger.ApplicationLogger.Info("newWorkerPool created successfully")

	// Prepare a user record as the task payload
	user := UserRecord{
		ID:    "1",
		Name:  "Jane Doe",
		Email: "jane.doe@example.com",
	}

	userJSON, err := json.Marshal(user)
	if err != nil {
		logger.ApplicationLogger.Error("failed to marshal user record", err)
		return
	}

	printJob := &schema.Signature{
		Name: "Print",
		Args: []schema.Arg{
			{
				Type:  "string",
				Value: string(userJSON),
			},
		},
		RetryCount:                  10,
		IgnoreWhenTaskNotRegistered: true,
	}

	// Build a job that exercises the retry mechanism
	retryJob := &schema.Signature{
		Name: "RetryTest",
		Args: []schema.Arg{
			{
				Type:  "string",
				Value: "test retry mechanism",
			},
		},
		RetryCount:   5,  // allow up to 5 retries
		RetryTimeout: 20, // retry after 20 seconds
	}

	// Create a scheduled task to run in 1 minute
	eta := time.Now().Add(1 * time.Minute)
	scheduledJob := &schema.Signature{
		Name: "ScheduledTask",
		Args: []schema.Arg{
			{
				Type:  "string",
				Value: "This is a scheduled task",
			},
		},
		ETA: &eta,
	}

	// Send the scheduled task
	_, err = newWorkerPool.SendTaskWithContext(context.Background(), scheduledJob)
	if err != nil {
		logger.ApplicationLogger.Error("failed to send scheduled task", err)
	} else {
		logger.ApplicationLogger.Info("Scheduled task published successfully")
	}

	// Send the retry test job
	_, err = newWorkerPool.SendTaskWithContext(context.Background(), retryJob)
	if err != nil {
		logger.ApplicationLogger.Error("failed to send retry test task", err)
	} else {
		logger.ApplicationLogger.Info("Retry test task published successfully")
	}

	// Use a WaitGroup to synchronize publisher goroutines
	var waitGrp sync.WaitGroup

	for i := 0; i < 1; i++ {
		waitGrp.Add(1) // add to the WaitGroup counter for each goroutine
		go func() {
			defer waitGrp.Done() // mark this goroutine as done when it exits
			for j := 0; j < 1; j++ {
				_, err := newWorkerPool.SendTaskWithContext(context.Background(), printJob)
				if err != nil {
					logger.ApplicationLogger.Error("failed to send task", err)
				} else {
					logger.ApplicationLogger.Info("Task published successfully")
				}
			}
		}()
	}

	// Wait for all goroutines to finish
	waitGrp.Wait()
	logger.ApplicationLogger.Info("All tasks have been published successfully")
}