Documentation ¶
Index ¶
- Constants
- Variables
- func GetBatchStatusRedisKey(batchID string) string
- func MigrateDatabase(conn *pgx.Conn) error
- type Batch
- type BatchDetails_t
- type BatchInput_t
- type BatchJob_t
- type BatchOutput_t
- type BatchProcessor
- type BatchStatus_t
- type InitBlock
- type Initializer
- type JSONstr
- type JobManager
- func (jm *JobManager) BatchAbort(batchID string) (status batchsqlc.StatusEnum, nsuccess, nfailed, naborted int, err error)
- func (jm *JobManager) BatchAppend(batchID string, batchinput []batchsqlc.InsertIntoBatchRowsParams, ...) (nrows int, err error)
- func (jm *JobManager) BatchDone(batchID string) (status batchsqlc.StatusEnum, batchOutput []BatchOutput_t, ...)
- func (jm *JobManager) BatchSubmit(app, op string, batchctx JSONstr, batchInput []BatchInput_t, waitabit bool) (batchID string, err error)
- func (jm *JobManager) RegisterInitializer(app string, initializer Initializer) error
- func (jm *JobManager) RegisterProcessorBatch(app string, op string, p BatchProcessor) error
- func (jm *JobManager) RegisterProcessorSlowQuery(app string, op string, p SlowQueryProcessor) error
- func (jm *JobManager) Run()
- func (jm *JobManager) SlowQueryAbort(reqID string) (err error)
- func (jm *JobManager) SlowQueryDone(reqID string) (status BatchStatus_t, result JSONstr, messages []wscutils.ErrorMessage, ...)
- func (jm *JobManager) SlowQuerySubmit(app, op string, inputContext, input JSONstr) (reqID string, err error)
- func (jm *JobManager) WaitOff(batchID string) (string, int, error)
- type JobManagerConfig
- type SlowQuery
- type SlowQueryProcessor
Constants ¶
const ALYA_BATCHCHUNK_NROWS = 10
const ALYA_BATCHSTATUS_CACHEDUR_SEC = 60
Variables ¶
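var ErrInitializerAlreadyRegistered = errors.New("initializer already registered for this app")
ErrInitializerAlreadyRegistered is returned by RegisterInitializer when an initializer has already been registered for the same app.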
var ErrProcessorAlreadyRegistered = errors.New("processor already registered for this app and operation")
ErrProcessorAlreadyRegistered is returned when attempting to register a second processor for the same (app, op) combination.
Functions ¶
func GetBatchStatusRedisKey ¶
func GetBatchStatusRedisKey(batchID string) string
func MigrateDatabase ¶
func MigrateDatabase(conn *pgx.Conn) error
MigrateDatabase runs the migrations using Tern.
Types ¶
type BatchDetails_t ¶ added in v0.14.0
type BatchDetails_t struct {
	ID          string
	App         string
	Op          string
	Context     JSONstr
	Status      batchsqlc.StatusEnum
	OutputFiles map[string]string
	NSuccess    int
	NFailed     int
	NAborted    int
}
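BatchDetails_t holds the summary of a batch: its ID, app, operation, context, status, output files, and the number of rows that succeeded, failed, or were aborted. It is passed to BatchProcessor.MarkDone.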
type BatchInput_t ¶
BatchInput_t represents a single input row for a batch job.
type BatchJob_t ¶
type BatchOutput_t ¶
type BatchOutput_t struct {
	Line     int
	Status   BatchStatus_t
	Res      JSONstr
	Messages JSONstr
}
type BatchProcessor ¶
type BatchProcessor interface {
	DoBatchJob(InitBlock InitBlock, context JSONstr, line int, input JSONstr) (status batchsqlc.StatusEnum, result JSONstr, messages []wscutils.ErrorMessage, blobRows map[string]string, err error)
	MarkDone(InitBlock InitBlock, context JSONstr, details BatchDetails_t) error
}
type BatchStatus_t ¶
type BatchStatus_t int
const (
	BatchTryLater BatchStatus_t = iota
	BatchSuccess
	BatchFailed
	BatchAborted
	BatchWait
	BatchQueued
	BatchInProgress
)
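Because the constants are declared with iota, their integer values follow declaration order starting at 0, so BatchQueued is 5 and BatchInProgress is 6. The sketch below mirrors the declaration and adds a String method for readable logs; that method and its lowercase names are hypothetical additions, not part of the package.

```go
package main

import "fmt"

// BatchStatus_t mirrors the documented status enum; iota assigns
// 0, 1, 2, ... in declaration order.
type BatchStatus_t int

const (
	BatchTryLater   BatchStatus_t = iota // 0
	BatchSuccess                         // 1
	BatchFailed                          // 2
	BatchAborted                         // 3
	BatchWait                            // 4
	BatchQueued                          // 5
	BatchInProgress                      // 6
)

// String is a HYPOTHETICAL helper (not part of the package) that renders a
// status as a lowercase name for logging.
func (s BatchStatus_t) String() string {
	names := [...]string{"trylater", "success", "failed", "aborted", "wait", "queued", "inprogress"}
	if s < 0 || int(s) >= len(names) {
		// Convert to int first so Sprintf does not call String recursively.
		return fmt.Sprintf("BatchStatus_t(%d)", int(s))
	}
	return names[s]
}

func main() {
	for s := BatchTryLater; s <= BatchInProgress; s++ {
		fmt.Printf("%d %s\n", int(s), s)
	}
}
```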
type InitBlock ¶
type InitBlock interface {
Close() error
}
InitBlock is used to store and manage resources needed for processing batch jobs and slow queries. Close is expected to release those resources.
type Initializer ¶
Initializer is an interface that allows applications to initialize and provide any necessary resources or configuration for batch processing or slow queries. Implementers of this interface should define a struct that holds the required resources, and provide an implementation for the Init method to create and initialize an instance of that struct (InitBlock).
The Init method is expected to return an InitBlock that can be used by the processing functions (BatchProcessor or SlowQueryProcessor) to access the initialized resources.
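A minimal sketch of the pattern described above. The documentation does not show the Initializer method set, so the Init(app string) (InitBlock, error) signature used here is an assumption, as are the hypothetical types exampleInitializer and exampleResources; only InitBlock's Close() error method is taken from this page.

```go
package main

import (
	"fmt"
	"strings"
)

// InitBlock matches the documented interface.
type InitBlock interface {
	Close() error
}

// Initializer uses an ASSUMED method signature; check the package source
// for the real one.
type Initializer interface {
	Init(app string) (InitBlock, error)
}

// exampleResources is a HYPOTHETICAL InitBlock holding per-app resources.
type exampleResources struct {
	app    string
	closed bool
}

// Close would release connections, file handles, etc.; here it only
// records that cleanup happened.
func (r *exampleResources) Close() error {
	r.closed = true
	return nil
}

// exampleInitializer is a HYPOTHETICAL Initializer implementation.
type exampleInitializer struct{}

func (exampleInitializer) Init(app string) (InitBlock, error) {
	if strings.TrimSpace(app) == "" {
		return nil, fmt.Errorf("app name must not be empty")
	}
	return &exampleResources{app: app}, nil
}

func main() {
	var initializer Initializer = exampleInitializer{}
	block, err := initializer.Init("payroll")
	if err != nil {
		panic(err)
	}
	defer block.Close()
	fmt.Println(block.(*exampleResources).app)
}
```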
type JSONstr ¶
type JSONstr struct {
// contains filtered or unexported fields
}
JSONstr is a custom type that represents a JSON string. It provides methods to create a new JSONstr from a string, convert it back to a string, and check if it contains valid JSON.
func NewJSONstr ¶
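One way such a type can be built is to wrap the raw text in an unexported field and check it with the standard library's encoding/json Valid function. This is a sketch under stated assumptions: jsonStr, newJSONStr, String, and IsValid are hypothetical names, NewJSONstr's real signature is not shown on this page, and whether the package validates eagerly in its constructor is not stated.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonStr is a HYPOTHETICAL stand-in for JSONstr: the raw text lives in an
// unexported field, so callers go through the constructor.
type jsonStr struct {
	value string
}

// newJSONStr wraps s without validating it; validity is checked on demand.
func newJSONStr(s string) jsonStr {
	return jsonStr{value: s}
}

// String converts the value back to a plain string.
func (j jsonStr) String() string { return j.value }

// IsValid reports whether the wrapped text is well-formed JSON.
func (j jsonStr) IsValid() bool { return json.Valid([]byte(j.value)) }

func main() {
	good := newJSONStr(`{"year": 2024}`)
	bad := newJSONStr(`{"year": }`)
	fmt.Println(good.String(), good.IsValid())
	fmt.Println(bad.String(), bad.IsValid())
}
```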
type JobManager ¶
type JobManager struct {
	Db          *pgxpool.Pool
	Queries     batchsqlc.Querier
	RedisClient *redis.Client
	ObjStore    objstore.ObjectStore
	Logger      *logharbour.Logger
	Config      JobManagerConfig
	// contains filtered or unexported fields
}
JobManager is the main struct that manages the processing of batch jobs and slow queries. It is responsible for fetching jobs from the database and processing them using the registered processors. The life cycle of a batch job or slow query is as follows:
1. Fetch a block of rows from the database.
2. Process each row in the block.
3. Update the corresponding batchrows and batches records with the results.
4. Check for completed batches and summarize them.
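The four life-cycle steps can be sketched with in-memory rows standing in for the batchrows and batches tables. Everything here is hypothetical (row, summary, manager, process, runOnce); the sketch treats every row as a batch row rather than distinguishing slow queries, and it omits the database transaction and Redis updates that the real JobManager performs.

```go
package main

import "fmt"

// row is a HYPOTHETICAL in-memory stand-in for a batchrows record.
type row struct {
	batchID string
	line    int
	input   int
	status  string // "queued", "success" or "failed"
}

// summary holds the per-batch counts produced in step 4.
type summary struct {
	nsuccess, nfailed int
}

// manager is a HYPOTHETICAL stand-in for JobManager's state.
type manager struct {
	rows       []*row
	summarized map[string]bool // batches already summarized
}

// process is a HYPOTHETICAL per-row processor: even inputs succeed.
func process(input int) string {
	if input%2 == 0 {
		return "success"
	}
	return "failed"
}

// runOnce performs one pass and returns the batches completed in this pass.
func (m *manager) runOnce(blockSize int) map[string]summary {
	// 1. Fetch a block of queued rows.
	var block []*row
	for _, r := range m.rows {
		if r.status == "queued" && len(block) < blockSize {
			block = append(block, r)
		}
	}
	// 2 and 3. Process each row and record its result.
	for _, r := range block {
		r.status = process(r.input)
	}
	// 4. Summarize batches that have no queued rows left.
	pending := map[string]bool{}
	counts := map[string]summary{}
	for _, r := range m.rows {
		s := counts[r.batchID]
		switch r.status {
		case "queued":
			pending[r.batchID] = true
		case "success":
			s.nsuccess++
		case "failed":
			s.nfailed++
		}
		counts[r.batchID] = s
	}
	done := map[string]summary{}
	for id, s := range counts {
		if !pending[id] && !m.summarized[id] {
			m.summarized[id] = true
			done[id] = s
		}
	}
	return done
}

func main() {
	mgr := &manager{summarized: map[string]bool{}, rows: []*row{
		{batchID: "b1", line: 1, input: 2, status: "queued"},
		{batchID: "b1", line: 2, input: 3, status: "queued"},
		{batchID: "b2", line: 1, input: 4, status: "queued"},
		{batchID: "b2", line: 2, input: 6, status: "queued"},
	}}
	for pass := 1; pass <= 2; pass++ {
		fmt.Println(pass, mgr.runOnce(3))
	}
}
```

Tracking which batches were already summarized keeps step 4 from reporting the same batch on every later pass.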
func NewJobManager ¶
func NewJobManager(db *pgxpool.Pool, redisClient *redis.Client, minioClient *minio.Client, logger *logharbour.Logger, config *JobManagerConfig) *JobManager
NewJobManager creates a new instance of JobManager. It initializes the necessary fields and returns a pointer to the JobManager.
func (*JobManager) BatchAbort ¶
func (jm *JobManager) BatchAbort(batchID string) (status batchsqlc.StatusEnum, nsuccess, nfailed, naborted int, err error)
func (*JobManager) BatchAppend ¶
func (jm *JobManager) BatchAppend(batchID string, batchinput []batchsqlc.InsertIntoBatchRowsParams, waitabit bool) (nrows int, err error)
func (*JobManager) BatchDone ¶
func (jm *JobManager) BatchDone(batchID string) (status batchsqlc.StatusEnum, batchOutput []BatchOutput_t, outputFiles map[string]string, nsuccess, nfailed, naborted int, err error)
func (*JobManager) BatchSubmit ¶
func (jm *JobManager) BatchSubmit(app, op string, batchctx JSONstr, batchInput []BatchInput_t, waitabit bool) (batchID string, err error)
BatchSubmit submits a new batch for processing. It generates a unique batch ID, inserts a record into the "batches" table, and inserts records into the "batchrows" table, one for each element of batchInput. The batch is then picked up and processed by the JobManager's worker goroutines spawned by Run(). Note that op, the operation or task to be performed on each batch row, is converted to lowercase.
The 'waitabit' parameter determines the initial status of the batch. If 'waitabit' is true, the batch status is set to 'wait', which holds the batch back from immediate processing. If 'waitabit' is false, the batch status is set to 'queued', making it available for processing.
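The records BatchSubmit is described as creating can be sketched in memory as follows. The batchRecord and batchRow types and the buildBatch helper are hypothetical, the batch ID is passed in rather than generated, and numbering rows from 1 is an assumption; the lowercasing of op and the 'wait'/'queued' choice follow the description above.

```go
package main

import (
	"fmt"
	"strings"
)

// batchRecord is a HYPOTHETICAL in-memory view of a "batches" row.
type batchRecord struct {
	ID, App, Op, Status string
}

// batchRow is a HYPOTHETICAL in-memory view of a "batchrows" row.
type batchRow struct {
	BatchID string
	Line    int
	Input   string
}

// buildBatch builds one batch record plus one row per input.
func buildBatch(id, app, op string, inputs []string, waitabit bool) (batchRecord, []batchRow) {
	// Documented rule: waitabit holds the batch back, otherwise it is queued.
	status := "queued"
	if waitabit {
		status = "wait"
	}
	b := batchRecord{ID: id, App: app, Op: strings.ToLower(op), Status: status}
	rows := make([]batchRow, 0, len(inputs))
	for i, in := range inputs {
		// ASSUMPTION: line numbers start at 1.
		rows = append(rows, batchRow{BatchID: id, Line: i + 1, Input: in})
	}
	return b, rows
}

func main() {
	rec, rows := buildBatch("batch-1", "billing", "GenInvoice", []string{`{"id": 1}`, `{"id": 2}`}, true)
	fmt.Printf("%+v\n", rec)
	for _, r := range rows {
		fmt.Printf("%+v\n", r)
	}
}
```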
func (*JobManager) RegisterInitializer ¶
func (jm *JobManager) RegisterInitializer(app string, initializer Initializer) error
RegisterInitializer registers an initializer for a specific application. The initializer is responsible for initializing any required resources or state needed for processing batches or slow queries for that application.
The initializer will be called by Alya to create an InitBlock instance that can be used by the processing functions (BatchProcessor or SlowQueryProcessor) to access any necessary resources or configuration for the application.
Applications must register an initializer before registering any batch processor or slow query processor. This ensures that the resources used by the processing functions are properly initialized and cleaned up.
func (*JobManager) RegisterProcessorBatch ¶
func (jm *JobManager) RegisterProcessorBatch(app string, op string, p BatchProcessor) error
RegisterProcessorBatch allows applications to register a processing function for a specific batch operation type. The processing function implements the BatchProcessor interface. Each (app, op) combination can only have one registered processor; attempting to register a second processor for the same combination returns ErrProcessorAlreadyRegistered. The 'op' parameter is case-insensitive and is converted to lowercase before registration.
func (*JobManager) RegisterProcessorSlowQuery ¶
func (jm *JobManager) RegisterProcessorSlowQuery(app string, op string, p SlowQueryProcessor) error
RegisterProcessorSlowQuery allows applications to register a processing function for a specific slow query operation type. The processing function implements the SlowQueryProcessor interface. Each (app, op) combination can only have one registered processor; attempting to register a second processor for the same combination returns ErrProcessorAlreadyRegistered. The 'op' parameter is case-insensitive and is converted to lowercase before registration.
func (*JobManager) Run ¶
func (jm *JobManager) Run()
Run is the main loop of the JobManager. It continuously fetches a block of rows from the database and processes each row either as a slow query or as a batch job. After processing a block, it checks for completed batches and summarizes them. Fetching, processing, and updating happen in the same transaction. This method should be called in a separate goroutine. It is thread-safe: updates to the database and Redis are executed atomically (see updateStatusInRedis()).
func (*JobManager) SlowQueryAbort ¶
func (jm *JobManager) SlowQueryAbort(reqID string) (err error)
func (*JobManager) SlowQueryDone ¶
func (jm *JobManager) SlowQueryDone(reqID string) (status BatchStatus_t, result JSONstr, messages []wscutils.ErrorMessage, outputfiles map[string]string, err error)
func (*JobManager) SlowQuerySubmit ¶
func (jm *JobManager) SlowQuerySubmit(app, op string, inputContext, input JSONstr) (reqID string, err error)
type JobManagerConfig ¶
type JobManagerConfig struct {
	BatchChunkNRows        int    // number of rows to send to the batch processor in each chunk
	BatchStatusCacheDurSec int    // duration in seconds to cache the batch status
	BatchOutputBucket      string // bucket name for batch files
}
JobManagerConfig holds the configuration for the job manager.
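BatchChunkNRows sets how many rows are sent to the batch processor in each chunk. The sketch below shows how a list of rows splits into chunks of that size; chunkRows is a hypothetical helper, and falling back to 10 (the value of ALYA_BATCHCHUNK_NROWS) when the setting is zero or negative is an assumption, not documented behavior.

```go
package main

import "fmt"

// defaultChunkNRows matches the documented ALYA_BATCHCHUNK_NROWS constant.
// Using it as a fallback for a non-positive setting is an ASSUMPTION.
const defaultChunkNRows = 10

// chunkRows is a HYPOTHETICAL helper that splits rows into consecutive
// chunks of at most n rows each; the last chunk may be shorter.
func chunkRows[T any](rows []T, n int) [][]T {
	if n <= 0 {
		n = defaultChunkNRows
	}
	var chunks [][]T
	for start := 0; start < len(rows); start += n {
		end := start + n
		if end > len(rows) {
			end = len(rows)
		}
		chunks = append(chunks, rows[start:end])
	}
	return chunks
}

func main() {
	rows := make([]int, 25)
	for i := range rows {
		rows[i] = i + 1
	}
	// With 25 rows and a chunk size of 10, this yields chunks of 10, 10 and 5.
	for _, c := range chunkRows(rows, 10) {
		fmt.Println(len(c), c[0], c[len(c)-1])
	}
}
```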