batch_job

package
v1.30.0
Published: Dec 8, 2024 License: MIT Imports: 5 Imported by: 0

README

Solomon AI Batch Job Library

Efficient and Type-safe Batch Job Processing for Go Applications

Overview

The Solomon AI Batch Job Library is a production-ready Go package that simplifies batch job processing with a focus on type safety, reliability, and developer experience. Built on top of Asynq, it provides a robust foundation for scheduling and executing recurring tasks in your Go applications.

Key Features

🚀 High Performance

  • Efficient job scheduling with minimal overhead
  • Optimized for high-throughput scenarios
  • Built-in connection pooling and resource management

Developer Experience

  • Intuitive builder pattern API
  • Strongly typed configurations
  • Comprehensive validation out of the box
  • Clear error messages and debugging support

🛡️ Reliability

  • Sophisticated retry mechanism with exponential backoff
  • Job persistence and recovery
  • Graceful shutdown handling
  • Detailed job execution metrics

🔧 Flexibility

  • Multiple scheduling interval options
  • Customizable job parameters
  • Easy integration with existing applications
  • Extensible architecture

Installation

go get github.com/SolomonAIEngineering/backend-core-library/batch-job

Quick Start

Here's a simple example to get you started:

package main

import (
    "context"
    "log"
    
    batchjob "github.com/SolomonAIEngineering/backend-core-library/batch-job"
    taskprocessor "github.com/SolomonAIEngineering/backend-core-library/task-processor"
    "github.com/hibiken/asynq"
)

func main() {
    // 1. Create a batch job
    task := asynq.NewTask("cleanup", nil)
    job := batchjob.NewBatchJob(
        batchjob.WithBaseConfig(batchjob.BaseConfig{
            Enabled:    true,
            Interval:   "@daily",
            MaxRetries: 3,
        }),
        batchjob.WithTask(task),
        batchjob.WithTaskId(stringPtr("daily-cleanup")),
    )

    // 2. Create batch jobs collection
    jobs := batchjob.NewBatchJobs(
        batchjob.WithBatchJob(job),
    )

    // 3. Initialize the task processor
    processorConfig := taskprocessor.Config{
        RedisAddr: "localhost:6379",
        // Add other configuration options as needed
    }
    processor := taskprocessor.NewTaskProcessor(processorConfig)

    // 4. Initialize and start the batcher
    batcher := batchjob.NewBatcher(
        batchjob.WithBatchJobsRef(jobs),
        batchjob.WithTaskProcessor(processor),
    )

    // 5. Register and start the jobs
    if err := batcher.RegisterRecurringBatchJobs(context.Background()); err != nil {
        log.Fatal(err)
    }
}

func stringPtr(s string) *string {
    return &s
}

Scheduling Options

The library supports various interval patterns:

Standard Intervals

  • @yearly or @annually - Run once a year at midnight of January 1
  • @monthly - Run once a month at midnight of the first day
  • @weekly - Run once a week at midnight of Sunday
  • @daily or @midnight - Run once a day at midnight
  • @hourly - Run once an hour at the beginning of the hour

Custom Intervals

  • Minutes: @every 1m, @every 5m, @every 15m
  • Hours: @every 1h, @every 2h, @every 6h
  • Custom: @every 30s, @every 2h30m

Cron Expressions

job := batchjob.NewBatchJob(
    batchjob.WithBaseConfig(batchjob.BaseConfig{
        Interval: "0 */2 * * *", // Every 2 hours
    }),
    // ... other configurations
)
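As an illustration of how the three interval families above might be told apart, the sketch below classifies an interval string as a predefined descriptor, an @every duration, or a five-field cron expression. classifyInterval and the labels it returns are hypothetical names, not part of this package, and the cron branch only counts fields without validating their contents.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// descriptors lists the predefined schedules named under "Standard Intervals".
var descriptors = map[string]bool{
	"@yearly": true, "@annually": true, "@monthly": true,
	"@weekly": true, "@daily": true, "@midnight": true, "@hourly": true,
}

// classifyInterval is a hypothetical helper (not a library function). It
// reports which family an interval string belongs to, or an error.
func classifyInterval(s string) (string, error) {
	s = strings.TrimSpace(s)
	switch {
	case s == "":
		return "", fmt.Errorf("interval must not be empty")
	case descriptors[s]:
		return "descriptor", nil
	case strings.HasPrefix(s, "@every "):
		raw := strings.TrimSpace(strings.TrimPrefix(s, "@every "))
		d, err := time.ParseDuration(raw)
		if err != nil {
			return "", fmt.Errorf("invalid @every duration: %w", err)
		}
		if d <= 0 {
			return "", fmt.Errorf("@every duration must be positive, got %s", d)
		}
		return "every", nil
	case len(strings.Fields(s)) == 5:
		// Only the field count is checked; field contents are not validated.
		return "cron", nil
	}
	return "", fmt.Errorf("unrecognized interval %q", s)
}

func main() {
	for _, s := range []string{"@daily", "@every 2h30m", "0 */2 * * *", "@every soon"} {
		kind, err := classifyInterval(s)
		fmt.Printf("%q -> kind=%q err=%v\n", s, kind, err)
	}
}
```

Running a check like this before building a BaseConfig can surface a malformed interval at startup rather than at registration time.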

Advanced Usage

Configuring Retries

job := batchjob.NewBatchJob(
    batchjob.WithBaseConfig(batchjob.BaseConfig{
        MaxRetries: 5,
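        // RetryDelays is not among the BaseConfig fields documented below
        // (Enabled, Interval, MaxRetries); confirm it exists in your version.
        RetryDelays: []time.Duration{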
            time.Second * 30,
            time.Minute * 1,
            time.Minute * 5,
            time.Minute * 15,
            time.Hour * 1,
        },
    }),
    // ... other configurations
)
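The source does not show how a delay schedule like the one above is consumed. One plausible reading, sketched here under that assumption, is that the n-th retry waits for the n-th entry and later retries reuse the last entry. delayFor and exampleDelays are hypothetical helpers, not library functions.

```go
package main

import (
	"fmt"
	"time"
)

// exampleDelays returns the same schedule used in the retry snippet.
func exampleDelays() []time.Duration {
	return []time.Duration{
		30 * time.Second,
		1 * time.Minute,
		5 * time.Minute,
		15 * time.Minute,
		1 * time.Hour,
	}
}

// delayFor returns the wait before retry number attempt (1-based).
// Attempts past the end of the schedule reuse the final delay; an empty
// schedule or a non-positive attempt yields zero.
func delayFor(delays []time.Duration, attempt int) time.Duration {
	if len(delays) == 0 || attempt < 1 {
		return 0
	}
	if attempt > len(delays) {
		return delays[len(delays)-1]
	}
	return delays[attempt-1]
}

func main() {
	delays := exampleDelays()
	for attempt := 1; attempt <= len(delays)+1; attempt++ {
		fmt.Printf("retry %d waits %s\n", attempt, delayFor(delays, attempt))
	}
}
```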

Custom Task Parameters

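// asynq.NewTask takes the payload as []byte, so encode the parameters first.
payload, err := json.Marshal(map[string]interface{}{
    "batchSize": 1000,
    "priority":  "high",
})
if err != nil {
    log.Fatalf("failed to encode task payload: %v", err)
}
task := asynq.NewTask("data-processing", payload)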

job := batchjob.NewBatchJob(
    batchjob.WithTask(task),
    // ... other configurations
)
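Because an asynq task carries its payload as raw bytes, a typed struct on both the producing and consuming side can keep job parameters consistent. The following is a minimal sketch using only encoding/json; processingParams, encodeParams, and decodeParams are hypothetical names chosen for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// processingParams is a hypothetical typed payload for a "data-processing" task.
type processingParams struct {
	BatchSize int    `json:"batchSize"`
	Priority  string `json:"priority"`
}

// encodeParams produces the bytes that would be handed to the task constructor.
func encodeParams(p processingParams) ([]byte, error) {
	return json.Marshal(p)
}

// decodeParams is what a handler could call on the task's payload bytes.
func decodeParams(raw []byte) (processingParams, error) {
	var p processingParams
	if err := json.Unmarshal(raw, &p); err != nil {
		return processingParams{}, fmt.Errorf("decode payload: %w", err)
	}
	return p, nil
}

func main() {
	raw, err := encodeParams(processingParams{BatchSize: 1000, Priority: "high"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))

	p, err := decodeParams(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", p)
}
```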

Job Monitoring and Metrics

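The library describes built-in monitoring through the types below. Note that JobMetrics and GetJobMetrics are not listed in the v1.30.0 package documentation, so confirm they are available in your version: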

type JobMetrics struct {
    SuccessCount   int64
    FailureCount   int64
    LastExecuted   time.Time
    ExecutionTime  time.Duration
    RetryCount     int
}

// Access metrics for a specific job
metrics := batcher.GetJobMetrics("daily-cleanup")

Best Practices

  1. Error Handling: Always implement proper error handling and logging

    if err := batcher.RegisterRecurringBatchJobs(ctx); err != nil {
        log.Printf("Failed to register batch jobs: %v", err)
        // Implement your error handling strategy
    }
    
  2. Resource Management: Properly close resources when shutting down

    defer processor.Shutdown()
    defer batcher.Shutdown(ctx)
    
  3. Configuration: Use environment variables for configuration

    config := batchjob.BaseConfig{
        Enabled:    os.Getenv("BATCH_JOB_ENABLED") == "true",
        Interval:   os.Getenv("BATCH_JOB_INTERVAL"),
        MaxRetries: 3,
    }
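A slightly fuller sketch of the environment-based configuration in item 3 follows. It adds defaults and parse errors, and takes the lookup function as a parameter so it can be exercised without touching the real environment. baseConfig mirrors the three documented BaseConfig fields so the example compiles on its own; the BATCH_JOB_MAX_RETRIES variable and the default values are assumptions.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// baseConfig mirrors the documented BaseConfig fields for this sketch.
type baseConfig struct {
	Enabled    bool
	Interval   string
	MaxRetries int64
}

// loadBaseConfig builds a config from a lookup function with the same shape
// as os.LookupEnv. Unset variables fall back to the defaults below.
func loadBaseConfig(lookup func(string) (string, bool)) (baseConfig, error) {
	cfg := baseConfig{Enabled: false, Interval: "@daily", MaxRetries: 3}

	if v, ok := lookup("BATCH_JOB_ENABLED"); ok {
		enabled, err := strconv.ParseBool(v)
		if err != nil {
			return baseConfig{}, fmt.Errorf("BATCH_JOB_ENABLED: %w", err)
		}
		cfg.Enabled = enabled
	}
	if v, ok := lookup("BATCH_JOB_INTERVAL"); ok && v != "" {
		cfg.Interval = v
	}
	// BATCH_JOB_MAX_RETRIES is a hypothetical variable name.
	if v, ok := lookup("BATCH_JOB_MAX_RETRIES"); ok {
		n, err := strconv.ParseInt(v, 10, 64)
		if err != nil || n < 0 {
			return baseConfig{}, fmt.Errorf("BATCH_JOB_MAX_RETRIES must be a non-negative integer, got %q", v)
		}
		cfg.MaxRetries = n
	}
	return cfg, nil
}

func main() {
	cfg, err := loadBaseConfig(os.LookupEnv)
	if err != nil {
		fmt.Fprintln(os.Stderr, "config error:", err)
		os.Exit(1)
	}
	fmt.Printf("%+v\n", cfg)
}
```

Unlike a plain comparison against "true", strconv.ParseBool also accepts values such as "1" and "TRUE" and rejects anything it does not recognize, which makes typos visible at startup.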
    

Contributing

We welcome contributions! Please see our Contributing Guide for details.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Support


Built with ❤️ by Solomon AI Engineering

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseConfig

type BaseConfig struct {
	// Enabled determines if the batch job is active and should be processed
	Enabled bool `json:"enabled"`

	// Interval specifies the frequency at which the job should run.
	// Supports cron-like syntax and duration strings (e.g., "@daily", "10s", "1m", "1h")
	Interval string `json:"interval"`

	// MaxRetries defines the maximum number of retry attempts for failed jobs
	MaxRetries int64 `json:"maxRetries"`
}

BaseConfig provides fundamental configuration options for batch jobs. It defines common settings such as job scheduling intervals and retry policies.

func (*BaseConfig) ProcessingInterval

func (c *BaseConfig) ProcessingInterval() taskprocessor.ProcessingInterval

ProcessingInterval converts the string interval configuration into a structured taskprocessor.ProcessingInterval value.

Example:

cfg := BaseConfig{Interval: "@daily"}
interval := cfg.ProcessingInterval() // Returns taskprocessor.EveryDayAtMidnight

func (*BaseConfig) Validate

func (c *BaseConfig) Validate() error

Validate checks the correctness of the base configuration values. Returns an error if any values are invalid.

Validation rules:

  • MaxRetries must be non-negative
  • Interval must not be empty
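The two rules can be sketched as follows. This is not the package's source: baseConfig is a local stand-in for BaseConfig, and the error messages and the nil-receiver check are assumptions added for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// baseConfig is a local stand-in with the documented BaseConfig fields.
type baseConfig struct {
	Enabled    bool
	Interval   string
	MaxRetries int64
}

// validate applies the two documented rules. The nil check is an extra
// safeguard assumed here, not something the documentation states.
func (c *baseConfig) validate() error {
	if c == nil {
		return errors.New("base config is nil")
	}
	if c.MaxRetries < 0 {
		return fmt.Errorf("maxRetries must be non-negative, got %d", c.MaxRetries)
	}
	if c.Interval == "" {
		return errors.New("interval must not be empty")
	}
	return nil
}

func main() {
	configs := []baseConfig{
		{Enabled: true, Interval: "@daily", MaxRetries: 3},
		{Enabled: true, Interval: "@daily", MaxRetries: -1},
		{Enabled: true, Interval: "", MaxRetries: 0},
	}
	for _, cfg := range configs {
		fmt.Printf("%+v -> %v\n", cfg, cfg.validate())
	}
}
```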

type BatchJob

type BatchJob struct {
	BaseConfig
	// contains filtered or unexported fields
}

BatchJob represents a configurable batch job with its associated task and identifier. It extends BaseConfig with task-specific configuration for asynq processing.

func NewBatchJob

func NewBatchJob(options ...BatchJobConfigOption) *BatchJob

NewBatchJob creates a new BatchJob instance with the provided options. It uses the functional options pattern to allow flexible configuration.

Example:

taskId := "job-123"
job := NewBatchJob(
    WithBaseConfig(baseCfg),
    WithTask(task),
    WithTaskId(&taskId),
)
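For readers new to the functional options pattern that NewBatchJob uses, here is a minimal standalone sketch of the mechanics. The job type and the withInterval and withTaskID options are hypothetical stand-ins, not the package's own definitions.

```go
package main

import "fmt"

// job is a hypothetical stand-in for a configurable value.
type job struct {
	interval string
	taskID   *string
}

// jobOption mutates a job during construction.
type jobOption func(*job)

func withInterval(interval string) jobOption {
	return func(j *job) { j.interval = interval }
}

func withTaskID(id *string) jobOption {
	return func(j *job) { j.taskID = id }
}

// newJob starts from the zero value and applies options in order, so a
// later option overrides an earlier one that sets the same field.
func newJob(opts ...jobOption) *job {
	j := &job{}
	for _, opt := range opts {
		if opt != nil {
			opt(j)
		}
	}
	return j
}

func main() {
	id := "job-123"
	j := newJob(withInterval("@daily"), withTaskID(&id))
	fmt.Println(j.interval, *j.taskID)
}
```

The main benefit of this design is that new settings can be added as new option functions without changing the constructor's signature.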

func (*BatchJob) String

func (c *BatchJob) String() string

String provides a string representation of the BatchJob for debugging and logging purposes.

Example output: "BatchJob: cfg - {BaseConfig}, taskId - abc123, task - {Task}"

func (*BatchJob) Validate

func (c *BatchJob) Validate() error

Validate ensures that the BatchJob is properly configured with all required fields. Returns an error if any required field is missing or if the base configuration is invalid.

type BatchJobConfigOption

type BatchJobConfigOption func(*BatchJob)

BatchJobConfigOption defines a function type for configuring BatchJob instances. This follows the functional options pattern for flexible configuration.

func WithBaseConfig

func WithBaseConfig(cfg BaseConfig) BatchJobConfigOption

WithBaseConfig sets the base configuration for the batch job. This includes common settings like processing interval and retry policy.

Example:

baseCfg := BaseConfig{...}
job := NewBatchJob(WithBaseConfig(baseCfg))

func WithTask

func WithTask(task *asynq.Task) BatchJobConfigOption

WithTask sets the asynq task for the batch job. The task contains the actual work to be performed when the job executes.

Example:

task := asynq.NewTask(...)
job := NewBatchJob(WithTask(task))

func WithTaskId

func WithTaskId(taskId *string) BatchJobConfigOption

WithTaskId sets the unique identifier for the batch job. This ID is used to prevent duplicate job registrations.

Example:

taskId := "unique-job-id"
job := NewBatchJob(WithTaskId(&taskId))

type BatchJobs

type BatchJobs struct {
	// contains filtered or unexported fields
}

BatchJobs manages a collection of batch jobs that can be scheduled and executed. It provides methods for adding and managing individual BatchJob instances.

func NewBatchJobs

func NewBatchJobs(options ...BatchJobsOption) *BatchJobs

NewBatchJobs creates a new BatchJobs instance with the provided options. It uses the functional options pattern to allow flexible configuration.

Example:

jobs := NewBatchJobs(
    WithBatchJob(job1),
    WithBatchJob(job2),
)

func (*BatchJobs) AddJob

func (b *BatchJobs) AddJob(job *BatchJob)

AddJob appends a new BatchJob to the jobs collection. This method can be used to add jobs after initialization.

Example:

jobs := NewBatchJobs()
jobs.AddJob(newJob)

type BatchJobsOption

type BatchJobsOption func(*BatchJobs)

BatchJobsOption defines a function type for configuring BatchJobs instances. This follows the functional options pattern for flexible configuration.

func WithBatchJob

func WithBatchJob(batchJob *BatchJob) BatchJobsOption

WithBatchJob adds a single batch job to the collection. If the jobs slice hasn't been initialized, it will create a new slice.

Example:

job := &BatchJob{...}
jobs := NewBatchJobs(WithBatchJob(job))

func WithBatchJobs

func WithBatchJobs(batchJobs *BatchJobs) BatchJobsOption

WithBatchJobs replaces the entire collection of batch jobs with the provided jobs. This is useful when you want to configure multiple jobs at once.

Example:

existingJobs := &BatchJobs{...}
newJobs := NewBatchJobs(WithBatchJobs(existingJobs))
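Since WithBatchJob appends and WithBatchJobs replaces, the order in which the two options are passed matters. The sketch below illustrates that difference with local stand-in types; whether the real option copies the slice or shares it is not stated in the source, and the copy here is an assumption.

```go
package main

import "fmt"

// job and jobs are hypothetical stand-ins for BatchJob and BatchJobs.
type job struct{ id string }

type jobs struct{ items []*job }

type jobsOption func(*jobs)

// withJob appends one job; append on a nil slice allocates it.
func withJob(j *job) jobsOption {
	return func(c *jobs) { c.items = append(c.items, j) }
}

// withJobs discards anything collected so far and copies src's jobs.
func withJobs(src *jobs) jobsOption {
	return func(c *jobs) {
		c.items = nil
		if src != nil {
			c.items = append(c.items, src.items...)
		}
	}
}

func newJobs(opts ...jobsOption) *jobs {
	c := &jobs{}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

// ids lists job identifiers in collection order.
func (c *jobs) ids() []string {
	out := make([]string, 0, len(c.items))
	for _, j := range c.items {
		out = append(out, j.id)
	}
	return out
}

func main() {
	existing := newJobs(withJob(&job{id: "a"}), withJob(&job{id: "b"}))
	extra := &job{id: "c"}

	// Append first, then replace: the appended job is discarded.
	fmt.Println(newJobs(withJob(extra), withJobs(existing)).ids())
	// Replace first, then append: the appended job is kept.
	fmt.Println(newJobs(withJobs(existing), withJob(extra)).ids())
}
```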

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

Batcher implements the Runnable interface and provides functionality to register and manage recurring batch jobs.

func NewBatcher

func NewBatcher(options ...BatcherOption) *Batcher

NewBatcher creates a new Batcher instance with the provided options. It uses the functional options pattern to allow flexible configuration.

Example:

batcher := NewBatcher(
    WithBatchJobsRef(jobs),
    WithTaskProcessor(processor),
)

func (*Batcher) RegisterRecurringBatchJobs

func (b *Batcher) RegisterRecurringBatchJobs(ctx context.Context) error

RegisterRecurringBatchJobs implements the Runnable interface. It validates and registers each batch job with the task processor. If a job with the same ID already exists, it will be skipped without error. Any other registration errors will be returned immediately.
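The control flow described above (validate each job, skip duplicates silently, stop on any other error) can be sketched as follows. The registrar interface, the errDuplicate sentinel, and the in-memory implementation are all hypothetical; the real method delegates to the task processor, whose API is not shown in this document.

```go
package main

import (
	"errors"
	"fmt"
)

// errDuplicate is a hypothetical sentinel for "job ID already registered".
var errDuplicate = errors.New("job already registered")

type job struct {
	id       string
	interval string
}

func (j job) validate() error {
	if j.id == "" || j.interval == "" {
		return fmt.Errorf("job %q: id and interval are required", j.id)
	}
	return nil
}

// registrar stands in for whatever component accepts recurring jobs.
type registrar interface {
	register(j job) error
}

// memoryRegistrar rejects IDs it has already seen.
type memoryRegistrar struct{ seen map[string]bool }

func (m *memoryRegistrar) register(j job) error {
	if m.seen[j.id] {
		return errDuplicate
	}
	m.seen[j.id] = true
	return nil
}

// registerAll returns how many jobs were newly registered. Duplicates are
// skipped without error; validation or other registration errors abort.
func registerAll(r registrar, js []job) (int, error) {
	registered := 0
	for _, j := range js {
		if err := j.validate(); err != nil {
			return registered, err
		}
		if err := r.register(j); err != nil {
			if errors.Is(err, errDuplicate) {
				continue
			}
			return registered, fmt.Errorf("register %q: %w", j.id, err)
		}
		registered++
	}
	return registered, nil
}

func main() {
	r := &memoryRegistrar{seen: map[string]bool{}}
	n, err := registerAll(r, []job{
		{id: "daily-cleanup", interval: "@daily"},
		{id: "daily-cleanup", interval: "@daily"}, // duplicate, skipped
		{id: "hourly-sync", interval: "@hourly"},
	})
	fmt.Println(n, err)
}
```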

type BatcherOption

type BatcherOption func(*Batcher)

BatcherOption defines a function type for configuring a Batcher instance. This follows the functional options pattern for flexible configuration.

func WithBatchJobsRef

func WithBatchJobsRef(batchJobs *BatchJobs) BatcherOption

WithBatchJobsRef sets the batch jobs configuration for the batcher. The BatchJobs parameter contains the collection of jobs to be registered.

Example:

jobs := &BatchJobs{...}
batcher := NewBatcher(WithBatchJobsRef(jobs))

func WithTaskProcessor

func WithTaskProcessor(processor *taskprocessor.TaskProcessor) BatcherOption

WithTaskProcessor sets the task processor that will handle the execution of batch jobs. The task processor is responsible for managing the job queue and executing jobs according to their schedules.

Example:

processor := taskprocessor.New(...)
batcher := NewBatcher(WithTaskProcessor(processor))

type Runnable

type Runnable interface {
	// RegisterRecurringBatchJobs registers all batch jobs with the task processor.
	// It validates each job's configuration before registration and returns an error
	// if any job fails validation or registration.
	//
	// Example:
	//   batcher := NewBatcher(WithBatchJobsRef(jobs), WithTaskProcessor(processor))
	//   if err := batcher.RegisterRecurringBatchJobs(ctx); err != nil {
	//       log.Fatal(err)
	//   }
	RegisterRecurringBatchJobs(ctx context.Context) error
}

Runnable defines the interface for registering recurring batch jobs. Implementations of this interface are responsible for setting up and configuring batch jobs that need to run on a recurring schedule.
