jobs

package
v0.17.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 7, 2024 License: Apache-2.0 Imports: 24 Imported by: 0

README

Alya Batch Package

Alya Jobs Package is a Go library for processing batch jobs and slow queries in a distributed and scalable manner. It provides a framework for registering and executing custom processing functions, managing job statuses, and handling output files.

Table of Contents

Prerequisites

  • PostgreSQL
  • Redis
  • Minio

Installation

Import the package in your code:

import "github.com/remiges-tech/alya/jobs"

JobManager

To start using the Alya Jobs Package, you need to initialize a JobManager instance. The JobManager is responsible for managing the execution of batch jobs and slow queries.

pool := getDb() // Initialize the database connection pool
redisClient := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // Initialize Redis client
minioClient := createMinioClient() // Initialize Minio client (optional)

jm := jobs.NewJobManager(pool, redisClient, minioClient)

Registering Initializers

Initializers are used to set up any necessary resources or configuration for processing batch jobs or slow queries. You need to register an initializer for each application that will use the Alya Jobs Package.

err := jm.RegisterInitializer("banking", &BankingInitializer{})
if err != nil {
    log.Fatal("Failed to register initializer:", err)
}

Registering Processors

Processors are custom functions that define how batch jobs or slow queries should be processed. You need to register a processor for each operation type within an application.

err := jm.RegisterProcessorBatch("banking", "process_transactions", &TransactionBatchProcessor{})
if err != nil {
    log.Fatal("Failed to register batch processor:", err)
}

err := jm.RegisterProcessorSlowQuery("banking", "generate_statement", &StatementSlowQueryProcessor{})
if err != nil {
    log.Fatal("Failed to register slow query processor:", err)
}

Submitting Batch Jobs

To submit a batch job, use the BatchSubmit method of the JobManager. You need to provide the application name, operation type, batch context, batch input data, and a flag indicating whether to wait before processing.

csvFile := "transactions.csv"
batchInput, err := loadBatchInputFromCSV(csvFile)
if err != nil {
    log.Fatal("Failed to load batch input from CSV:", err)
}

batchID, err := jm.BatchSubmit("banking", "process_transactions", jobs.JSONstr("{}"), batchInput, false)
if err != nil {
    log.Fatal("Failed to submit batch:", err)
}

Submitting Slow Queries

To submit a slow query, use the SlowQuerySubmit method of the JobManager. You need to provide the application name, operation type, query context, and query input data.

context := jobs.JSONstr(`{"accountID": "1234567890"}`)
input := jobs.JSONstr(`{"startDate": "2023-01-01", "endDate": "2023-12-31"}`)

reqID, err := jm.SlowQuerySubmit("banking", "generate_statement", context, input)
if err != nil {
    log.Fatal("Failed to submit slow query:", err)
}

Checking Job Status

To check the status of a batch job or slow query, use the BatchDone or SlowQueryDone method of the JobManager, respectively. These methods return the current status of the job, along with any output files or error messages.

status, batchOutput, outputFiles, nsuccess, nfailed, naborted, err := jm.BatchDone(batchID)
if err != nil {
    log.Fatal("Error while polling for batch status:", err)
}

status, result, messages, err := jm.SlowQueryDone(reqID)
if err != nil {
    log.Fatal("Error while polling for slow query status:", err)
}

Aborting Jobs

To abort a batch job or slow query, use the BatchAbort or SlowQueryAbort method of the JobManager, respectively. These methods will mark the job as aborted and stop any further processing.

err := jm.BatchAbort(batchID)
if err != nil {
    log.Fatal("Failed to abort batch:", err)
}

err := jm.SlowQueryAbort(reqID)
if err != nil {
    log.Fatal("Failed to abort slow query:", err)
}

Example

Here's an example of processing bank transactions from a CSV file:

csvFile := "transactions.csv"
batchInput, err := loadBatchInputFromCSV(csvFile)
if err != nil {
    log.Fatal("Failed to load batch input from CSV:", err)
}

batchID, err := jm.BatchSubmit("banking", "process_transactions", jobs.JSONstr("{}"), batchInput, false)
if err != nil {
    log.Fatal("Failed to submit batch:", err)
}

go jm.Run()

status, _, outputFiles, nsuccess, nfailed, naborted, err := jm.BatchDone(batchID)
if err != nil {
    log.Fatal("Error while polling for batch status:", err)
}

fmt.Println("Batch completed with status:", status)
fmt.Println("Output files:", outputFiles)
fmt.Println("Success count:", nsuccess)
fmt.Println("Failed count:", nfailed)
fmt.Println("Aborted count:", naborted)

You can find more examples in the examples directory of the package.

Configuration

The Alya Jobs Package uses config parameters:

  • ALYA_BATCHCHUNK_NROWS: The number of rows to fetch in each batch chunk (default: 10).
  • ALYA_BATCHSTATUS_CACHEDUR_SEC: The duration (in seconds) for which batch status is cached in Redis (default: 100).

Documentation

Index

Constants

View Source
const ALYA_BATCHCHUNK_NROWS = 10
View Source
const ALYA_BATCHSTATUS_CACHEDUR_SEC = 60

Variables

View Source
var ErrInitializerAlreadyRegistered = errors.New("initializer already registered for this app")
View Source
var ErrProcessorAlreadyRegistered = errors.New("processor already registered for this app and operation")

ErrProcessorAlreadyRegistered is returned when attempting to register a second processor for the same (app, op) combination.

Functions

func GetBatchStatusRedisKey

func GetBatchStatusRedisKey(batchID string) string

func MigrateDatabase

func MigrateDatabase(conn *pgx.Conn) error

MigrateDatabase runs the migrations using Tern.

Types

type Batch

type Batch struct {
	Db          *pgxpool.Pool
	Queries     batchsqlc.Querier
	RedisClient *redis.Client
}

type BatchDetails_t added in v0.14.0

type BatchDetails_t struct {
	ID          string
	App         string
	Op          string
	Context     JSONstr
	Status      batchsqlc.StatusEnum
	OutputFiles map[string]string
	NSuccess    int
	NFailed     int
	NAborted    int
}

BatchDetails_t struct

type BatchInput_t

type BatchInput_t struct {
	Line  int     `json:"line"`
	Input JSONstr `json:"input"`
}

BatchInput_t represents a single input row for a batch job.

type BatchJob_t

type BatchJob_t struct {
	App     string
	Op      string
	Batch   uuid.UUID
	RowID   int
	Context JSONstr
	Line    int
	Input   JSONstr
}

type BatchOutput_t

type BatchOutput_t struct {
	Line     int
	Status   BatchStatus_t
	Res      JSONstr
	Messages JSONstr
}

type BatchProcessor

type BatchProcessor interface {
	DoBatchJob(InitBlock InitBlock, context JSONstr, line int, input JSONstr) (status batchsqlc.StatusEnum, result JSONstr, messages []wscutils.ErrorMessage, blobRows map[string]string, err error)
	MarkDone(InitBlock InitBlock, context JSONstr, details BatchDetails_t) error
}

type BatchStatus_t

type BatchStatus_t int
const (
	BatchTryLater BatchStatus_t = iota
	BatchSuccess
	BatchFailed
	BatchAborted
	BatchWait
	BatchQueued
	BatchInProgress
)

type InitBlock

type InitBlock interface {
	Close() error
}

maybe combine initblock and initializer InitBlock is used to store and manage resources needed for processing batch jobs and slow queries.

type Initializer

type Initializer interface {
	Init(app string) (InitBlock, error)
}

Initializer is an interface that allows applications to initialize and provide any necessary resources or configuration for batch processing or slow queries. Implementers of this interface should define a struct that holds the required resources, and provide an implementation for the Init method to create and initialize an instance of that struct (InitBlock).

The Init method is expected to return an InitBlock that can be used by the processing functions (BatchProcessor or SlowQueryProcessor) to access the initialized resources.

type JSONstr

type JSONstr struct {
	// contains filtered or unexported fields
}

func NewJSONstr

func NewJSONstr(s string) (JSONstr, error)

JSONstr is a custom type that represents a JSON string. It provides methods to create a new JSONstr from a string, convert it back to a string, and check if it contains valid JSON.

func (JSONstr) IsValid

func (j JSONstr) IsValid() bool

IsValid returns true if the JSONstr is valid, false otherwise.

func (JSONstr) String

func (j JSONstr) String() string

String returns the string representation of the JSONstr.

type JobManager

type JobManager struct {
	Db          *pgxpool.Pool
	Queries     batchsqlc.Querier
	RedisClient *redis.Client
	ObjStore    objstore.ObjectStore

	Logger *logharbour.Logger
	Config JobManagerConfig
	// contains filtered or unexported fields
}

JobManager is the main struct that manages the processing of batch jobs and slow queries. It is responsible for fetching jobs from the database, processing them using the registered processors. Life cycle of a batch job or slow query is as follows: 1. Fetch a block of rows from the database 2. Process each row in the block 3. Update the corresponding batchrows and batches records with the results 4. Check for completed batches and summarize them

func NewJobManager

func NewJobManager(db *pgxpool.Pool, redisClient *redis.Client, minioClient *minio.Client, logger *logharbour.Logger, config *JobManagerConfig) *JobManager

NewJobManager creates a new instance of JobManager. It initializes the necessary fields and returns a pointer to the JobManager.

func (*JobManager) BatchAbort

func (jm *JobManager) BatchAbort(batchID string) (status batchsqlc.StatusEnum, nsuccess, nfailed, naborted int, err error)

func (*JobManager) BatchAppend

func (jm *JobManager) BatchAppend(batchID string, batchinput []batchsqlc.InsertIntoBatchRowsParams, waitabit bool) (nrows int, err error)

func (*JobManager) BatchDone

func (jm *JobManager) BatchDone(batchID string) (status batchsqlc.StatusEnum, batchOutput []BatchOutput_t, outputFiles map[string]string, nsuccess, nfailed, naborted int, err error)

func (*JobManager) BatchSubmit

func (jm *JobManager) BatchSubmit(app, op string, batchctx JSONstr, batchInput []BatchInput_t, waitabit bool) (batchID string, err error)

BatchSubmit submits a new batch for processing. It generates a unique batch ID, inserts a record into the "batches" table, and inserts multiple records into the "batchrows" table corresponding to the provided batch input. The batch is then picked up and processed by the JobManager's worker goroutines spawned by Run(). Note that the operation or task to be performed on each batch row (value is converted to lowercase). The 'waitabit' parameter determines the initial status of the batch. If 'waitabit' is true, the batch status will be set to 'wait', indicating that the batch should be held back from immediate processing. If 'waitabit' is false, the batch status will be set to 'queued', making it available for processing.

func (*JobManager) RegisterInitializer

func (jm *JobManager) RegisterInitializer(app string, initializer Initializer) error

RegisterInitializer registers an initializer for a specific application. The initializer is responsible for initializing any required resources or state needed for processing batches or slow queries for that application.

The initializer will be called by Alya to create an InitBlock instance that can be used by the processing functions (BatchProcessor or SlowQueryProcessor) to access any necessary resources or configuration for the application.

Applications must register an initializer before registering any batch processor or slow query processor. It allows for proper initialization and cleanup of resources used by the processing functions.

func (*JobManager) RegisterProcessorBatch

func (jm *JobManager) RegisterProcessorBatch(app string, op string, p BatchProcessor) error

RegisterProcessorBatch allows applications to register a processing function for a specific batch operation type. The processing function implements the BatchProcessor interface. Each (app, op) combination can only have one registered processor. Attempting to register a second processor for the same combination will result in an error. The 'op' parameter is case-insensitive and will be converted to lowercase before registration.

func (*JobManager) RegisterProcessorSlowQuery

func (jm *JobManager) RegisterProcessorSlowQuery(app string, op string, p SlowQueryProcessor) error

RegisterProcessorSlowQuery allows applications to register a processing function for a specific operation type. The processing function implements the SlowQueryProcessor interface. Each (app, op) combination can only have one registered processor. Attempting to register a second processor for the same combination will result in an error. The 'op' parameter is case-insensitive and will be converted to lowercase before registration.

func (*JobManager) Run

func (jm *JobManager) Run()

Run is the main loop of the JobManager. It continuously fetches a block of rows from the database, processes each row either as a slow query or a batch job. After processing a block, it checks for completed batches and summarizes them. Fetching, processing and updating happens in the same transaction. This method should be called in a separate goroutine. It is thread safe -- updates to database and Redis are executed atomically (check updateStatusInRedis()).

func (*JobManager) SlowQueryAbort

func (jm *JobManager) SlowQueryAbort(reqID string) (err error)

func (*JobManager) SlowQueryDone

func (jm *JobManager) SlowQueryDone(reqID string) (status BatchStatus_t, result JSONstr, messages []wscutils.ErrorMessage, outputfiles map[string]string, err error)

func (*JobManager) SlowQuerySubmit

func (jm *JobManager) SlowQuerySubmit(app, op string, inputContext, input JSONstr) (reqID string, err error)

func (*JobManager) WaitOff

func (jm *JobManager) WaitOff(batchID string) (string, int, error)

type JobManagerConfig

type JobManagerConfig struct {
	BatchChunkNRows        int    // number of rows to send to the batch processor in each chunk
	BatchStatusCacheDurSec int    // duration in seconds to cache the batch status
	BatchOutputBucket      string // bucket name for batch files
}

JobManagerConfig holds the configuration for the job manager.

type SlowQuery

type SlowQuery struct {
	Db          *pgxpool.Pool
	Queries     batchsqlc.Querier
	RedisClient *redis.Client
}

type SlowQueryProcessor

type SlowQueryProcessor interface {
	DoSlowQuery(InitBlock InitBlock, context JSONstr, input JSONstr) (status batchsqlc.StatusEnum, result JSONstr, messages []wscutils.ErrorMessage, outputFiles map[string]string, err error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL