Versions in this module Expand all Collapse all v1 v1.0.2 Mar 24, 2022 v1.0.1 Mar 13, 2022 Changes in this version + const DefaultChunkSize + const DefaultJobPoolSize + const DefaultMaxPartitionSize + const DefaultMinPartitionSize + const DefaultPartitions + const DefaultStepTaskPoolSize + const ErrCodeConcurrency + const ErrCodeDbFail + const ErrCodeGeneral + const ErrCodeRetry + const ErrCodeStop + const FileItemReaderCurrentIndex + const FileItemReaderEnd + const FileItemReaderFileNameKey + const FileItemReaderHandleKey + const FileItemReaderStart + const FileItemWriterFileNameKey + const FileItemWriterHandleKey + const ItemReaderCurrentIndex + const ItemReaderKeyList + const ItemReaderMaxIndex + const PartitionStepPartitionsKey + func NewJob(name string, steps ...Step) *jobBuilder + func NewStep(name string, handler ...interface{}) *stepBuilder + func Register(job Job) error + func Restart(ctx context.Context, jobId interface{}) (int64, error) + func RestartAsync(ctx context.Context, jobId interface{}) (int64, error) + func SetDB(sqlDb *sql.DB) + func SetLogger(l logs.Logger) + func SetMaxRunningJobs(size int) + func SetMaxRunningSteps(size int) + func SetTransactionManager(txMgr TransactionManager) + func Start(ctx context.Context, jobName string, params string) (int64, error) + func StartAsync(ctx context.Context, jobName string, params string) (int64, error) + func Stop(ctx context.Context, jobId interface{}) error + func Unregister(job Job) + type Aggregator interface + Aggregate func(execution *StepExecution, subExecutions []*StepExecution) BatchError + type BatchContext struct + func NewBatchContext() *BatchContext + func (ctx *BatchContext) DeepCopy() *BatchContext + func (ctx *BatchContext) Exists(key string) bool + func (ctx *BatchContext) Get(key string, def ...interface{}) interface{} + func (ctx *BatchContext) GetBool(key string, def ...bool) (bool, error) + func (ctx *BatchContext) GetInt(key string, def ...int) (int, error) + func (ctx *BatchContext) GetInt64(key string, def ...int64) (int64, error) + func (ctx *BatchContext) GetString(key string, def ...string) (string, error) + func (ctx *BatchContext) MarshalJSON() ([]byte, error) + func (ctx *BatchContext) Merge(other *BatchContext) + func (ctx *BatchContext) Put(key string, value interface{}) + func (ctx *BatchContext) Remove(key string) + func (ctx *BatchContext) UnmarshalJSON(b []byte) error + type BatchError interface + Code func() string + Error func() string + Message func() string + StackTrace func() string + func NewBatchError(code string, msg string, args ...interface{}) BatchError + type ChunkContext struct + End bool + StepExecution *StepExecution + Tx interface{} + type ChunkListener interface + AfterChunk func(context *ChunkContext) BatchError + BeforeChunk func(context *ChunkContext) BatchError + OnError func(context *ChunkContext, err BatchError) + type DefaultTxManager struct + func (tm *DefaultTxManager) BeginTx() (interface{}, BatchError) + func (tm *DefaultTxManager) Commit(tx interface{}) BatchError + func (tm *DefaultTxManager) Rollback(tx interface{}) BatchError + type FilePath struct + NamePattern string + func (f *FilePath) Format(execution *StepExecution) (string, error) + type Future interface + Get func() (interface{}, error) + type Handler interface + Handle func(execution *StepExecution) BatchError + type ItemReader interface + ReadItem func(key interface{}) (interface{}, error) + ReadKeys func() ([]interface{}, error) + type Job interface + GetSteps func() []Step + Name func() string + Start func(ctx context.Context, execution *JobExecution) BatchError + Stop func(ctx context.Context, execution *JobExecution) BatchError + type JobExecution struct + CreateTime time.Time + EndTime time.Time + FailError BatchError + JobContext *BatchContext + JobExecutionId int64 + JobInstanceId int64 + JobName string + JobParams map[string]interface{} + JobStatus status.BatchStatus + StartTime time.Time + StepExecutions []*StepExecution + Version int64 + func (e *JobExecution) AddStepExecution(execution *StepExecution) + type JobListener interface + AfterJob func(execution *JobExecution) BatchError + BeforeJob func(execution *JobExecution) BatchError + type OpenCloser interface + Close func(execution *StepExecution) BatchError + Open func(execution *StepExecution) BatchError + type PartitionListener interface + AfterPartition func(execution *StepExecution, subExecutions []*StepExecution) BatchError + BeforePartition func(execution *StepExecution) BatchError + OnError func(execution *StepExecution, err BatchError) + type Partitioner interface + GetPartitionNames func(execution *StepExecution, partitions uint) []string + Partition func(execution *StepExecution, partitions uint) ([]*StepExecution, BatchError) + type PartitionerFactory interface + GetPartitioner func(minPartitionSize, maxPartitionSize uint) Partitioner + type Processor interface + Process func(item interface{}, chunkCtx *ChunkContext) (interface{}, BatchError) + type Reader interface + Read func(chunkCtx *ChunkContext) (interface{}, BatchError) + type Step interface + Exec func(ctx context.Context, execution *StepExecution) BatchError + Name func() string + type StepExecution struct + CommitCount int64 + CreateTime time.Time + EndTime time.Time + FailError BatchError + FilterCount int64 + JobExecution *JobExecution + LastUpdated time.Time + ProcessSkipCount int64 + ReadCount int64 + ReadSkipCount int64 + RollbackCount int64 + StartTime time.Time + StepContext *BatchContext + StepContextId int64 + StepExecutionContext *BatchContext + StepExecutionId int64 + StepName string + StepStatus status.BatchStatus + Version int64 + WriteCount int64 + WriteSkipCount int64 + type StepListener interface + AfterStep func(execution *StepExecution) BatchError + BeforeStep func(execution *StepExecution) BatchError + type Task func(execution *StepExecution) BatchError + type TransactionManager interface + BeginTx func() (tx interface{}, err BatchError) + Commit func(tx interface{}) BatchError + Rollback func(tx interface{}) BatchError + func NewTransactionManager(db *sql.DB) TransactionManager + type Writer interface + Write func(items []interface{}, chunkCtx *ChunkContext) BatchError