Swig 🍺

Configuration

How to configure Swig for your needs

Queue Configuration

Swig allows you to configure multiple queues with different worker pools:

configs := []swig.SwigQueueConfig{
    {
        QueueType: swig.Default,
        MaxWorkers: 5,
    },
    {
        QueueType: swig.Priority,
        MaxWorkers: 3,
    },
}

Each queue operates independently with its own worker pool, allowing you to:

  • Process priority jobs faster with dedicated workers
  • Prevent low-priority jobs from blocking important tasks
  • Scale worker pools based on queue requirements
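The isolation described above can be illustrated with plain goroutines and channels. This is a minimal sketch, not Swig's implementation: `queueConfig` and `runPools` are hypothetical names, and jobs are plain strings. It only shows why a backlog on one queue does not consume workers that belong to another.

```go
package main

import (
	"fmt"
	"sync"
)

// queueConfig is a hypothetical stand-in for swig.SwigQueueConfig.
type queueConfig struct {
	Name       string
	MaxWorkers int // assumed to be at least 1 in this sketch
}

// runPools starts one fixed-size pool per queue and returns how many jobs
// each queue processed. Each queue has its own channel, so its workers
// never pick up jobs that belong to another queue.
func runPools(configs []queueConfig, jobs map[string][]string) map[string]int {
	var (
		mu        sync.Mutex
		wg        sync.WaitGroup
		processed = make(map[string]int)
	)
	for _, cfg := range configs {
		ch := make(chan string) // private to this queue
		for i := 0; i < cfg.MaxWorkers; i++ {
			wg.Add(1)
			go func(name string) {
				defer wg.Done()
				for range ch {
					// A real worker would run the job here.
					mu.Lock()
					processed[name]++
					mu.Unlock()
				}
			}(cfg.Name)
		}
		// Feed this queue's jobs, then close the channel so its workers exit.
		wg.Add(1)
		go func(queued []string) {
			defer wg.Done()
			for _, j := range queued {
				ch <- j
			}
			close(ch)
		}(jobs[cfg.Name])
	}
	wg.Wait()
	return processed
}

func main() {
	configs := []queueConfig{
		{Name: "default", MaxWorkers: 5},
		{Name: "priority", MaxWorkers: 3},
	}
	jobs := map[string][]string{
		"default":  {"a", "b", "c", "d"},
		"priority": {"x", "y"},
	}
	counts := runPools(configs, jobs)
	fmt.Println("default:", counts["default"], "priority:", counts["priority"])
}
```

In this sketch the pool size is the only tuning knob per queue, which mirrors the role of MaxWorkers in the configuration above.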

Worker Configuration

Workers must implement the Worker interface:

type Worker interface {
    JobName() string
    Process(ctx context.Context) error
}

Example worker implementation:

type EmailWorker struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
    Body    string `json:"body"`
}
 
func (w *EmailWorker) JobName() string {
    return "send_email"
}
 
func (w *EmailWorker) Process(ctx context.Context) error {
    log.Printf("Sending email to %s: %s", w.To, w.Subject)
    return sendEmail(w.To, w.Subject, w.Body)
}
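A common Go idiom is to assert at compile time that a worker type satisfies the interface. The sketch below redeclares the Worker interface locally so it runs on its own, and adds a hypothetical helper, `encodePayload`, to show the JSON that the struct tags produce. Whether Swig serializes workers exactly this way is an assumption.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Worker mirrors the interface shown above.
type Worker interface {
	JobName() string
	Process(ctx context.Context) error
}

type EmailWorker struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
	Body    string `json:"body"`
}

func (w *EmailWorker) JobName() string { return "send_email" }

// Process is a stub: it only reports whether the context was already cancelled.
func (w *EmailWorker) Process(ctx context.Context) error { return ctx.Err() }

// Compile-time check: the build fails if *EmailWorker stops satisfying Worker.
var _ Worker = (*EmailWorker)(nil)

// encodePayload is a hypothetical helper that marshals a worker's exported
// fields to JSON using its struct tags.
func encodePayload(w Worker) (string, error) {
	b, err := json.Marshal(w)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	payload, err := encodePayload(&EmailWorker{To: "user@example.com", Subject: "Welcome!"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(payload)
}
```

Note that the methods use pointer receivers, so it is `*EmailWorker`, not `EmailWorker`, that implements the interface.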

Worker Registration

Workers must be registered with Swig before they can process jobs:

// Create workers registry
workers := swig.NewWorkerRegistry()
 
// Register workers
workers.RegisterWorker(&EmailWorker{})
workers.RegisterWorker(&ImageResizeWorker{})
 
// Pass workers to Swig
swigClient := swig.NewSwig(driver, configs, workers)

The registry stores a factory for each worker type. Every claimed job gets a fresh worker instance before its JSON payload is unmarshaled, which avoids sharing mutable worker state across concurrent jobs.
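One way such a factory-based registry could work is sketched below using reflection. The names `registry`, `register`, and `build` are hypothetical and are not Swig's API; the sketch only demonstrates that each job gets its own freshly allocated worker before the payload is unmarshaled into it.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"reflect"
)

// Worker mirrors the interface shown earlier on this page.
type Worker interface {
	JobName() string
	Process(ctx context.Context) error
}

// registry is a hypothetical stand-in for Swig's worker registry.
type registry struct {
	factories map[string]func() Worker
}

func newRegistry() *registry {
	return &registry{factories: make(map[string]func() Worker)}
}

// register stores a factory that allocates a new zero-valued worker of the
// prototype's concrete type. The prototype must be a pointer to a struct.
func (r *registry) register(proto Worker) {
	elem := reflect.TypeOf(proto).Elem()
	r.factories[proto.JobName()] = func() Worker {
		return reflect.New(elem).Interface().(Worker)
	}
}

// build creates a fresh worker for the job name and fills it from the payload.
func (r *registry) build(name string, payload []byte) (Worker, error) {
	factory, ok := r.factories[name]
	if !ok {
		return nil, fmt.Errorf("no worker registered for %q", name)
	}
	w := factory()
	if err := json.Unmarshal(payload, w); err != nil {
		return nil, err
	}
	return w, nil
}

type EmailWorker struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

func (w *EmailWorker) JobName() string                   { return "send_email" }
func (w *EmailWorker) Process(ctx context.Context) error { return nil }

func main() {
	reg := newRegistry()
	reg.register(&EmailWorker{})

	a, errA := reg.build("send_email", []byte(`{"to":"a@example.com"}`))
	b, errB := reg.build("send_email", []byte(`{"to":"b@example.com"}`))
	if errA != nil || errB != nil {
		fmt.Println("build failed:", errA, errB)
		return
	}
	// Each job has its own instance, so fields set for one job never leak into another.
	fmt.Println(a.(*EmailWorker).To, b.(*EmailWorker).To, a != b)
}
```

Because the prototype passed to `register` is only used for its type and job name, any fields set on it are not copied into the workers that process jobs in this sketch.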

Job Options

When adding a job, you can pass swig.JobOptions to choose its queue, priority, or run time:

// Basic job
err := swigClient.AddJob(ctx, &EmailWorker{
    To: "user@example.com",
    Subject: "Welcome!",
})
 
// Priority job
err = swigClient.AddJob(ctx, &EmailWorker{
    To: "urgent@example.com",
    Subject: "Urgent Notice!",
}, swig.JobOptions{
    Queue: swig.Priority,
    Priority: 5,
})
 
// Scheduled job
err = swigClient.AddJob(ctx, &EmailWorker{
    To: "scheduled@example.com",
    Subject: "Scheduled Email",
}, swig.JobOptions{
    RunAt: time.Now().Add(time.Hour),
})

Batch Processing

Swig can insert multiple jobs in a single operation with AddJobs:

// Create batch jobs
batchJobs := []drivers.BatchJob{
    {
        Worker: &EmailWorker{
            To: "batch1@example.com",
            Subject: "Batch Welcome",
            Body: "Welcome to our platform!",
        },
        Opts: drivers.JobOptions{
            Queue: "default",
            Priority: 1,
            RunAt: time.Now(),
        },
    },
    {
        Worker: &EmailWorker{
            To: "batch2@example.com",
            Subject: "Batch Welcome",
            Body: "Welcome to our platform!",
        },
        Opts: drivers.JobOptions{
            Queue: "default",
            Priority: 1,
            RunAt: time.Now(),
        },
    },
}
 
// Add jobs in a single operation
err := swigClient.AddJobs(ctx, batchJobs)
if err != nil {
    log.Printf("Failed to add batch jobs: %v", err)
} else {
    log.Println("Batch jobs added successfully")
}

Transactional Batch Processing

Batch jobs can also be inserted within a transaction:

tx, err := db.Begin()
if err != nil {
    log.Printf("Failed to begin transaction: %v", err)
    return
}
defer tx.Rollback()
 
txBatchJobs := []drivers.BatchJob{
    {
        Worker: &EmailWorker{
            To: "tx1@example.com",
            Subject: "Transactional Welcome",
            Body: "Welcome to our platform!",
        },
        Opts: drivers.JobOptions{
            Queue: "default",
            Priority: 1,
            RunAt: time.Now(),
        },
    },
    {
        Worker: &EmailWorker{
            To: "tx2@example.com",
            Subject: "Transactional Welcome",
            Body: "Welcome to our platform!",
        },
        Opts: drivers.JobOptions{
            Queue: "default",
            Priority: 1,
            RunAt: time.Now(),
        },
    },
}
 
log.Println("Adding transactional batch jobs...")
if err := swigClient.AddJobsWithTx(ctx, tx, txBatchJobs); err != nil {
    log.Printf("Failed to add transactional batch jobs: %v", err)
    return
}
 
if err := tx.Commit(); err != nil {
    log.Printf("Failed to commit transaction: %v", err)
    return
}
log.Println("Transactional batch jobs added successfully")

Transaction Support

Swig can add jobs inside a transaction that you already manage:

// Using your own transaction
tx, err := db.BeginTx(ctx, nil)
if err != nil {
    return err
}
defer tx.Rollback()
 
// Add job in transaction
err = swigClient.AddJobWithTx(ctx, tx, &EmailWorker{
    To: "user@example.com",
    Subject: "Welcome!",
})
if err != nil {
    return err
}
 
// Commit transaction
return tx.Commit()

Alternatively, use Swig's transaction helper, which passes a transaction to your callback:

err := swigClient.WithTx(ctx, func(tx interface{}) error {
    // Add job in transaction
    return swigClient.AddJobWithTx(ctx, tx, &EmailWorker{
        To: "user@example.com",
        Subject: "Welcome!",
    })
})

Error Handling

Swig handles failed jobs as follows:

  • Failed jobs are automatically retried with exponential backoff
  • Maximum retry attempts can be configured
  • Error details are stored in the database for debugging

Connection Lifecycle

Swig uses PostgreSQL features that are tied to a database session, so it keeps those responsibilities on dedicated connections:

  • Worker notifications use a long-lived LISTEN/NOTIFY listener per worker.
  • Leader election uses a dedicated advisory-lock connection for the active leader.
  • Shutdown cancels worker contexts, closes listeners, waits for workers, and releases the leader advisory lock from the same session that acquired it.

Application queries and job inserts still use the normal driver pool. Dedicated connections are only used where PostgreSQL requires session ownership.
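The first steps of the shutdown sequence listed above (cancel worker contexts, then wait for workers) can be sketched with the standard library alone. `runWorker` and `startAndStop` are hypothetical names, and the sketch leaves out listeners and the advisory lock, which require a live PostgreSQL session.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runWorker loops until ctx is cancelled, simulating a worker that
// periodically checks for jobs. It reports its id when it exits cleanly.
func runWorker(ctx context.Context, id int, stopped chan<- int) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			stopped <- id
			return
		case <-ticker.C:
			// A real worker would claim and process a job here.
		}
	}
}

// startAndStop launches n workers, then shuts them down and returns how
// many exited cleanly.
func startAndStop(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	stopped := make(chan int, n) // buffered so exiting workers never block
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			runWorker(ctx, id, stopped)
		}(i)
	}

	cancel()  // step 1: cancel worker contexts
	wg.Wait() // step 2: wait for every worker to return
	close(stopped)

	count := 0
	for range stopped {
		count++
	}
	return count
}

func main() {
	fmt.Println("workers stopped cleanly:", startAndStop(3))
}
```

For this to work in practice, each worker's Process method needs to watch ctx.Done() during long-running work; otherwise the wait step blocks until the job finishes on its own.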

Cleanup and Testing

Swig provides methods for both graceful shutdown and complete cleanup:

// Graceful shutdown: cancel workers, wait for them, and release leader resources
err := swigClient.Stop(ctx)
 
// Complete cleanup: drop all Swig tables
err = swigClient.Close(ctx)

The Close method is particularly useful in:

  • Testing environments to clean up after tests
  • Development scenarios to reset state
  • CI/CD pipelines that need a clean slate between runs

Example usage in tests:

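func TestJobProcessing(t *testing.T) {
    ctx := context.Background()

    // Set up Swig (driver, configs, and workers are assumed to be created elsewhere in the test package)
    swigClient := swig.NewSwig(driver, configs, workers)
    defer swigClient.Close(ctx) // Drop all Swig tables after the test

    // Run your tests...
}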
