Examples
Practical examples of using Swig
Examples
Basic Usage
Here's a simple example of setting up and using Swig. Choose your preferred PostgreSQL driver; the version shown below uses pgx:
package main
import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/glamboyosa/swig"
	"github.com/glamboyosa/swig/drivers"
	"github.com/glamboyosa/swig/workers"
	"github.com/jackc/pgx/v5/pgxpool"
)
// EmailWorker is the job payload used by the basic example. Each field is
// serialized under the lowercase JSON key given in its tag.
type EmailWorker struct {
	// To is the recipient address.
	To string `json:"to"`
	// Subject is the subject line.
	Subject string `json:"subject"`
	// Body is the message text. Process does not read it.
	Body string `json:"body"`
}

// JobName returns the fixed job name "send_email".
func (e *EmailWorker) JobName() string {
	const name = "send_email"
	return name
}

// Process prints the recipient and subject to standard output and always
// returns nil. The context argument is accepted but not consulted.
func (e *EmailWorker) Process(ctx context.Context) error {
	recipient, subject := e.To, e.Subject
	fmt.Printf("Sending email to: %s with subject: %s\n", recipient, subject)
	return nil
}
// main runs the basic example and exits with a non-zero status if it fails.
//
// All work happens in run so that its deferred cleanup always executes;
// log.Fatal is only called here, where no defers are pending.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run connects to PostgreSQL through pgx, starts Swig with a default and a
// priority queue, enqueues one EmailWorker job, and then blocks until SIGINT
// or SIGTERM is received.
//
// It returns the first setup or enqueue error, wrapped with the step that
// failed. Errors from the shutdown sequence are logged, not returned.
func run() error {
	// ctx is cancelled on Ctrl+C or SIGTERM; that cancellation is what ends
	// the wait at the bottom of this function.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Connect to PostgreSQL using pgx
	pgxConfig, err := pgxpool.ParseConfig("postgres://postgres:postgres@localhost:5432/swig_example?sslmode=disable")
	if err != nil {
		return fmt.Errorf("unable to parse config: %w", err)
	}
	pgxPool, err := pgxpool.NewWithConfig(ctx, pgxConfig)
	if err != nil {
		return fmt.Errorf("unable to connect to database: %w", err)
	}
	defer pgxPool.Close()

	// Create pgx driver
	driver, err := drivers.NewPgxDriver(pgxPool)
	if err != nil {
		return fmt.Errorf("unable to create driver: %w", err)
	}

	// Create and register workers. The variable is named registry so it does
	// not shadow the imported workers package.
	registry := workers.NewWorkerRegistry()
	registry.RegisterWorker(&EmailWorker{})

	// Configure queues
	configs := []swig.SwigQueueConfig{
		{QueueType: swig.Default, MaxWorkers: 5},
		{QueueType: swig.Priority, MaxWorkers: 2},
	}

	// Create and start Swig
	swigClient := swig.NewSwig(driver, configs, *registry)
	swigClient.Start(ctx)

	// Stop and close the client on every exit path. Deferred calls run in
	// reverse order, so this runs before pgxPool.Close above. A fresh context
	// is used because ctx is already cancelled by the time shutdown begins.
	defer func() {
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		if err := swigClient.Stop(shutdownCtx); err != nil {
			log.Printf("Error during shutdown: %v", err)
		}
		if err := swigClient.Close(shutdownCtx); err != nil {
			log.Printf("Error closing swig client: %v", err)
		}
	}()

	// Add a job
	err = swigClient.AddJob(ctx, &EmailWorker{
		To:      "user@example.com",
		Subject: "Welcome!",
		Body:    "Hello from Swig",
	})
	if err != nil {
		return fmt.Errorf("failed to add job: %w", err)
	}

	// Keep the process alive until a shutdown signal arrives. Returning right
	// after AddJob would end the program immediately.
	// NOTE(review): presumably Start launches the workers in the background
	// and this wait is what lets them pick the job up; confirm against the
	// Swig API.
	log.Println("Job queued; press Ctrl+C to shut down")
	<-ctx.Done()
	return nil
}
Transactional Usage
Example of using Swig within a transaction:
// Create users table for transaction examples
createTableSQL := `
CREATE TABLE IF NOT EXISTS users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
email TEXT NOT NULL UNIQUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);`
if _, err := db.ExecContext(ctx, createTableSQL); err != nil {
log.Fatalf("Failed to create users table: %v", err)
}
// Example 1: Using AddJobWithTx - Create user and send welcome email in same transaction
tx, err := db.BeginTx(ctx, nil)
if err != nil {
log.Printf("Failed to begin transaction: %v", err)
}
defer tx.Rollback() // Will be no-op if committed
// Insert user
var userEmail = "new@example.com"
if _, err := tx.ExecContext(ctx, `
INSERT INTO users (email) VALUES ($1)
ON CONFLICT (email) DO NOTHING
`, userEmail); err != nil {
log.Printf("Failed to insert user: %v", err)
return
}
// Add welcome email job in same transaction
err = swigClient.AddJobWithTx(ctx, tx, &EmailWorker{
To: userEmail,
Subject: "Welcome to our platform!",
Body: "Thanks for joining.",
})
if err != nil {
log.Printf("Failed to add welcome email job: %v", err)
return
}
// Commit the transaction
if err := tx.Commit(); err != nil {
log.Printf("Failed to commit transaction: %v", err)
return
}
log.Printf("Successfully created user and queued welcome email")
Batch Processing
Example of adding multiple jobs in a single operation:
// Create batch jobs
batchJobs := []drivers.BatchJob{
{
Worker: &EmailWorker{
To: "batch1@example.com",
Subject: "Batch Welcome",
Body: "Welcome to our platform!",
},
Opts: drivers.JobOptions{
Queue: "default",
Priority: 1,
RunAt: time.Now(),
},
},
{
Worker: &EmailWorker{
To: "batch2@example.com",
Subject: "Batch Welcome",
Body: "Welcome to our platform!",
},
Opts: drivers.JobOptions{
Queue: "default",
Priority: 1,
RunAt: time.Now(),
},
},
}
// Add jobs in a single operation
err := swigClient.AddJobs(ctx, batchJobs)
if err != nil {
log.Printf("Failed to add batch jobs: %v", err)
} else {
log.Println("Batch jobs added successfully")
}
// Example: Transactional batch insertion
// The transaction is started with ctx, matching the Transactional Usage
// example.
// NOTE(review): presumably db is a database/sql *sql.DB, in which case the
// transaction is rolled back if ctx is cancelled before Commit — confirm.
tx, err := db.BeginTx(ctx, nil)
if err != nil {
	log.Printf("Failed to begin transaction: %v", err)
	return
}
// Discard the queued jobs on any early return below.
defer tx.Rollback()

txBatchJobs := []drivers.BatchJob{
	{
		Worker: &EmailWorker{
			To:      "tx1@example.com",
			Subject: "Transactional Welcome",
			Body:    "Welcome to our platform!",
		},
		Opts: drivers.JobOptions{
			Queue:    "default",
			Priority: 1,
			RunAt:    time.Now(),
		},
	},
	{
		Worker: &EmailWorker{
			To:      "tx2@example.com",
			Subject: "Transactional Welcome",
			Body:    "Welcome to our platform!",
		},
		Opts: drivers.JobOptions{
			Queue:    "default",
			Priority: 1,
			RunAt:    time.Now(),
		},
	},
}

log.Println("Adding transactional batch jobs...")
if err := swigClient.AddJobsWithTx(ctx, tx, txBatchJobs); err != nil {
	log.Printf("Failed to add transactional batch jobs: %v", err)
	return
}

// Commit so that both jobs are persisted together.
if err := tx.Commit(); err != nil {
	log.Printf("Failed to commit transaction: %v", err)
	return
}
log.Println("Transactional batch jobs added successfully")
Priority Queues
Example of using priority queues:
// Configure priority queues: up to 5 workers for the default queue and up to
// 3 for the priority queue. Pass configs to swig.NewSwig when creating the
// client, as in Basic Usage.
configs := []swig.SwigQueueConfig{
	{
		QueueType:  swig.Default,
		MaxWorkers: 5,
	},
	{
		QueueType:  swig.Priority,
		MaxWorkers: 3,
	},
}

// Add a high-priority job by routing it to the priority queue.
err := swigClient.AddJob(ctx, &EmailWorker{
	To:      "user@example.com",
	Subject: "Urgent: Action Required",
	Body:    "Please take action immediately.",
}, swig.JobOptions{
	Queue:    swig.Priority,
	Priority: 10,
})
// Report a failed enqueue instead of silently dropping the error.
if err != nil {
	log.Printf("Failed to add high-priority job: %v", err)
}
Scheduled Jobs
Example of scheduling jobs for future execution:
// Schedule a job to run in 1 hour by setting RunAt to a time in the future.
err := swigClient.AddJob(ctx, &EmailWorker{
	To:      "user@example.com",
	Subject: "Reminder",
	Body:    "Don't forget about our meeting!",
}, swig.JobOptions{
	RunAt: time.Now().Add(time.Hour),
})
// Report a failed enqueue instead of silently dropping the error.
if err != nil {
	log.Printf("Failed to schedule job: %v", err)
}
Error Handling
Example of a worker that reports job processing failures by returning an error from Process:
// EmailWorker is an email job payload whose Process method simulates both
// permanent and temporary failures.
type EmailWorker struct {
	// To is the recipient address.
	To string `json:"to"`
	// Subject is the subject line.
	Subject string `json:"subject"`
	// Body is the message text. Process does not read it.
	Body string `json:"body"`
}

// JobName returns the fixed job name "send_email".
func (w *EmailWorker) JobName() string {
	return "send_email"
}

// Process simulates sending the email.
//
// It returns a "permanent error" when To is "invalid@example.com", otherwise
// a "temporary error" on roughly half of the calls (chosen at random), and
// otherwise logs the send and returns nil. The context argument is accepted
// but not consulted.
func (w *EmailWorker) Process(ctx context.Context) error {
	// Check the deterministic failure first, so an invalid address is always
	// reported as permanent and never masked by the random outage below.
	if w.To == "invalid@example.com" {
		return fmt.Errorf("permanent error: invalid email address")
	}

	// Simulate a temporary error
	if rand.Intn(2) == 0 {
		return fmt.Errorf("temporary error: email service unavailable")
	}

	log.Printf("Sending email to %s: %s", w.To, w.Subject)
	return nil
}
Advanced Usage
Graceful Shutdown
For production applications, you might want to handle graceful shutdown:
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
// ... (other imports)
)
// main blocks until SIGINT or SIGTERM is received, then stops and closes the
// Swig client, giving both calls a shared 30-second deadline.
func main() {
	// Root context for the application; the signal watcher below cancels it.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Setup signal handling: the channel is buffered with capacity 1.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	watchSignals := func() {
		received := <-signals
		log.Printf("Received signal %v, initiating shutdown...", received)
		cancel()
	}
	go watchSignals()

	// ... (Swig setup as shown in basic usage)

	// Wait for shutdown signal
	<-ctx.Done()

	// Graceful shutdown uses a fresh context, because ctx is already
	// cancelled at this point.
	stopCtx, release := context.WithTimeout(context.Background(), 30*time.Second)
	defer release()

	if stopErr := swigClient.Stop(stopCtx); stopErr != nil {
		log.Printf("Error during shutdown: %v", stopErr)
	}
	if closeErr := swigClient.Close(stopCtx); closeErr != nil {
		log.Printf("Error closing swig client: %v", closeErr)
	}
}
Next Steps
- API Reference - Detailed API documentation
- Configuration - Learn how to configure Swig for your needs