Swig 🍺

Configuration

How to configure Swig for your needs

Configuration

Queue Configuration

Swig allows you to configure multiple queues with different worker pools:

configs := []swig.SwigQueueConfig{
    {
        QueueType: swig.Default,
        MaxWorkers: 5,
    },
    {
        QueueType: swig.Priority,
        MaxWorkers: 3,
    },
}

Each queue operates independently with its own worker pool, allowing you to:

  • Process priority jobs faster with dedicated workers
  • Prevent low-priority jobs from blocking important tasks
  • Scale worker pools based on queue requirements

Worker Configuration

Workers must implement the Worker interface:

type Worker interface {
    JobName() string
    Process(ctx context.Context) error
}

Example worker implementation:

type EmailWorker struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
    Body    string `json:"body"`
}
 
func (w *EmailWorker) JobName() string {
    return "send_email"
}
 
func (w *EmailWorker) Process(ctx context.Context) error {
    log.Printf("Sending email to %s: %s", w.To, w.Subject)
    return sendEmail(w.To, w.Subject, w.Body)
}

Worker Registration

Workers must be registered with Swig before they can process jobs:

// Create workers registry
workers := swig.NewWorkerRegistry()
 
// Register workers
workers.RegisterWorker(&EmailWorker{})
workers.RegisterWorker(&ImageResizeWorker{})
 
// Pass workers to Swig
swigClient := swig.NewSwig(driver, configs, workers)

Job Options

When adding jobs, you can specify various options:

// Basic job
err := swigClient.AddJob(ctx, &EmailWorker{
    To: "user@example.com",
    Subject: "Welcome!",
})
 
// Priority job
err = swigClient.AddJob(ctx, &EmailWorker{
    To: "urgent@example.com",
    Subject: "Urgent Notice!",
}, swig.JobOptions{
    Queue: swig.Priority,
    Priority: 5,
})
 
// Scheduled job
err = swigClient.AddJob(ctx, &EmailWorker{
    To: "scheduled@example.com",
    Subject: "Scheduled Email",
}, swig.JobOptions{
    RunAt: time.Now().Add(time.Hour),
})

Batch Processing

Swig supports efficient batch job insertion:

// Create batch jobs
batchJobs := []drivers.BatchJob{
    {
        Worker: &EmailWorker{
            To: "batch1@example.com",
            Subject: "Batch Welcome",
            Body: "Welcome to our platform!",
        },
        Opts: drivers.JobOptions{
            Queue: "default",
            Priority: 1,
            RunAt: time.Now(),
        },
    },
    {
        Worker: &EmailWorker{
            To: "batch2@example.com",
            Subject: "Batch Welcome",
            Body: "Welcome to our platform!",
        },
        Opts: drivers.JobOptions{
            Queue: "default",
            Priority: 1,
            RunAt: time.Now(),
        },
    },
}
 
// Add jobs in a single operation
err := swigClient.AddJobs(ctx, batchJobs)
if err != nil {
    log.Printf("Failed to add batch jobs: %v", err)
} else {
    log.Println("Batch jobs added successfully")
}

Transactional Batch Processing

Batch jobs can also be inserted within a transaction:

tx, err := db.Begin()
if err != nil {
    log.Printf("Failed to begin transaction: %v", err)
    return
}
defer tx.Rollback()
 
txBatchJobs := []drivers.BatchJob{
    {
        Worker: &EmailWorker{
            To: "tx1@example.com",
            Subject: "Transactional Welcome",
            Body: "Welcome to our platform!",
        },
        Opts: drivers.JobOptions{
            Queue: "default",
            Priority: 1,
            RunAt: time.Now(),
        },
    },
    {
        Worker: &EmailWorker{
            To: "tx2@example.com",
            Subject: "Transactional Welcome",
            Body: "Welcome to our platform!",
        },
        Opts: drivers.JobOptions{
            Queue: "default",
            Priority: 1,
            RunAt: time.Now(),
        },
    },
}
 
log.Println("Adding transactional batch jobs...")
if err := swigClient.AddJobsWithTx(ctx, tx, txBatchJobs); err != nil {
    log.Printf("Failed to add transactional batch jobs: %v", err)
    return
}
 
if err := tx.Commit(); err != nil {
    log.Printf("Failed to commit transaction: %v", err)
    return
}
log.Println("Transactional batch jobs added successfully")

Transaction Support

Swig integrates seamlessly with your existing transactions:

// Using your own transaction
tx, err := db.BeginTx(ctx, nil)
if err != nil {
    return err
}
defer tx.Rollback()
 
// Add job in transaction
err = swigClient.AddJobWithTx(ctx, tx, &EmailWorker{
    To: "user@example.com",
    Subject: "Welcome!",
})
if err != nil {
    return err
}
 
// Commit transaction
return tx.Commit()
 
// Or use Swig's transaction helper
err = swigClient.WithTx(ctx, func(tx interface{}) error {
    // Add job in transaction
    return swigClient.AddJobWithTx(ctx, tx, &EmailWorker{
        To: "user@example.com",
        Subject: "Welcome!",
    })
})

Error Handling

Swig provides comprehensive error handling:

  • Failed jobs are automatically retried with exponential backoff
  • Maximum retry attempts can be configured
  • Error details are stored in the database for debugging

Cleanup and Testing

Swig provides methods for both graceful shutdown and complete cleanup:

// Graceful shutdown: Wait for jobs to complete
err := swigClient.Stop(ctx)
 
// Complete cleanup: Drop all Swig tables
err := swigClient.Close(ctx)

The Close method is particularly useful in:

  • Testing environments to clean up after tests
  • Development scenarios to reset state
  • CI/CD pipelines needing clean slate between runs

Example usage in tests:

func TestJobProcessing(t *testing.T) {
    // Setup Swig
    swigClient := swig.NewSwig(driver, configs, workers)
    defer swigClient.Close(ctx) // Clean up after test
    
    // Run your tests...
}

Next Steps

On this page