Swig 🍺

Advanced Concepts

Deep dive into Swig's advanced features and architecture

Advanced Concepts

Transactional Integrity

In distributed systems, especially job queues, transactional integrity is crucial. Swig offers three levels of transaction control:

Use your existing database transaction for maximum control:

tx, _ := pool.Begin(ctx)
defer tx.Rollback(ctx)
 
// Create user
userID := createUser(tx)
 
// Enqueue welcome email in same transaction
err := swigClient.AddJobWithTx(ctx, tx, &EmailWorker{
    To: email,
    Subject: "Welcome!",
})
if err != nil {
    return err
}
return tx.Commit(ctx)

2. Use Swig's Transaction Helper

Let Swig manage the transaction for you:

err := swigClient.driver.WithTx(ctx, func(tx Transaction) error {
    // Create user
    if err := createUser(tx); err != nil {
        return err // Triggers rollback
    }
    
    // Add job (auto rollback if this fails)
    return tx.Exec(ctx, insertJobSQL, ...)
})

3. No Transaction (Simple)

For simple, non-transactional job enqueueing:

err := swigClient.AddJob(ctx, &EmailWorker{
    To: email,
    Subject: "Welcome!",
})

Benefits of Transactional Job Processing

  • No Lost Jobs: Jobs are either fully committed or not at all
  • Atomic Processing: Jobs are processed exactly once using PostgreSQL's SKIP LOCK
  • Data Consistency: Jobs can be part of your application's transactions

Architecture

Swig uses PostgreSQL's advanced features for efficient job distribution:

SKIP LOCK for Job Distribution

Swig uses PostgreSQL's SKIP LOCK feature to ensure:

  • No duplicate job processing
  • Fair job distribution across workers
  • High availability
  • Transactional integrity

Leader Election

Built-in leader election using PostgreSQL advisory locks ensures:

  • Only one worker processes jobs at a time
  • Automatic failover if the leader fails
  • No external coordination needed

Batch Processing

Swig supports efficient batch insertion of multiple jobs:

Basic Batch Insertion

batchJobs := []drivers.BatchJob{
    {
        Worker: &EmailWorker{
            To: "batch1@example.com",
            Subject: "Batch Welcome",
            Body: "Welcome to our platform!",
        },
        Opts: drivers.JobOptions{
            Queue: "default",
            Priority: 1,
            RunAt: time.Now(),
        },
    },
    {
        Worker: &EmailWorker{
            To: "batch2@example.com",
            Subject: "Batch Welcome",
            Body: "Welcome to our platform!",
        },
        Opts: drivers.JobOptions{
            Queue: "default",
            Priority: 1,
            RunAt: time.Now(),
        },
    },
}
 
err := swigClient.AddJobs(ctx, batchJobs)
if err != nil {
    log.Printf("Failed to add batch jobs: %v", err)
} else {
    log.Println("Batch jobs added successfully")
}

Transactional Batch Insertion

tx, err := db.Begin()
if err != nil {
    log.Printf("Failed to begin transaction: %v", err)
    return
}
defer tx.Rollback()
 
txBatchJobs := []drivers.BatchJob{
    {
        Worker: &EmailWorker{
            To: "tx1@example.com",
            Subject: "Transactional Welcome",
            Body: "Welcome to our platform!",
        },
        Opts: drivers.JobOptions{
            Queue: "default",
            Priority: 1,
            RunAt: time.Now(),
        },
    },
    {
        Worker: &EmailWorker{
            To: "tx2@example.com",
            Subject: "Transactional Welcome",
            Body: "Welcome to our platform!",
        },
        Opts: drivers.JobOptions{
            Queue: "default",
            Priority: 1,
            RunAt: time.Now(),
        },
    },
}
 
log.Println("Adding transactional batch jobs...")
if err := swigClient.AddJobsWithTx(ctx, tx, txBatchJobs); err != nil {
    log.Printf("Failed to add transactional batch jobs: %v", err)
    return
}
 
if err := tx.Commit(); err != nil {
    log.Printf("Failed to commit transaction: %v", err)
    return
}
log.Println("Transactional batch jobs added successfully")

Performance Considerations

Batch Processing

  • Batch insertion is significantly faster than individual inserts
  • Optimal batch size depends on your database configuration
  • Monitor memory usage for large batches

Driver Selection

Swig supports two PostgreSQL drivers with different characteristics:

  1. pgx Driver (Recommended)

    • Better performance
    • Native LISTEN/NOTIFY support
    • Real-time job notifications
    • Optimized for batch operations
  2. database/sql Driver

    • Standard Go database interface
    • Requires github.com/lib/pq for LISTEN/NOTIFY
    • Simpler implementation
    • Good for most use cases

Cleanup and Testing

Swig provides methods for both graceful shutdown and complete cleanup:

// Graceful shutdown: Wait for jobs to complete
err := swigClient.Stop(ctx)
 
// Complete cleanup: Drop all Swig tables
err := swigClient.Close(ctx)

The Close method is particularly useful in:

  • Testing environments
  • Development scenarios
  • CI/CD pipelines

Next Steps