pool

package
v1.7.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 4, 2020 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Wrap

func Wrap(j interface{}) job.Interface

Wrap returns a new job.Interface based on the wrapped job handler reference.

Types

type DeDuplicator added in v1.7.0

type DeDuplicator interface {
	// Check the uniqueness of the unique job and set the unique flag if it is not set yet.
	//
	// Parameters:
	//  jobName string           : name of the job
	//  params models.Parameters : parameters of the job
	//
	// Returns:
	//  If no unique flag and successfully set it, a nil error is returned;
	//  otherwise, a non nil error is returned.
	Unique(jobName string, params models.Parameters) error

	// Remove the unique flag after job exiting
	// Parameters:
	//  jobName string           : name of the job
	//  params models.Parameters : parameters of the job
	//
	// Returns:
	//  If unique flag is successfully removed, a nil error is returned;
	//  otherwise, a non nil error is returned.
	DelUniqueSign(jobName string, params models.Parameters) error
}

DeDuplicator is designed to handle the uniqueness of the job. Once a job is declared to be unique, the job can be enqueued only if no same job (same job name and parameters) in the queue or running in progress. Adopt the same unique mechanism with the upstream framework.

type GoCraftWorkPool

type GoCraftWorkPool struct {
	// contains filtered or unexported fields
}

GoCraftWorkPool is the pool implementation based on gocraft/work powered by redis.

func NewGoCraftWorkPool

func NewGoCraftWorkPool(ctx *env.Context, namespace string, workerCount uint, redisPool *redis.Pool) *GoCraftWorkPool

NewGoCraftWorkPool is constructor of goCraftWorkPool.

func (*GoCraftWorkPool) CancelJob

func (gcwp *GoCraftWorkPool) CancelJob(jobID string) error

CancelJob will cancel the job

func (*GoCraftWorkPool) Enqueue

func (gcwp *GoCraftWorkPool) Enqueue(jobName string, params models.Parameters, isUnique bool) (models.JobStats, error)

Enqueue job

func (*GoCraftWorkPool) GetJobStats

func (gcwp *GoCraftWorkPool) GetJobStats(jobID string) (models.JobStats, error)

GetJobStats return the job stats of the specified enqueued job.

func (*GoCraftWorkPool) IsKnownJob

func (gcwp *GoCraftWorkPool) IsKnownJob(name string) (interface{}, bool)

IsKnownJob ...

func (*GoCraftWorkPool) PeriodicallyEnqueue

func (gcwp *GoCraftWorkPool) PeriodicallyEnqueue(jobName string, params models.Parameters, cronSetting string) (models.JobStats, error)

PeriodicallyEnqueue job

func (*GoCraftWorkPool) RegisterHook

func (gcwp *GoCraftWorkPool) RegisterHook(jobID string, hookURL string) error

RegisterHook registers status hook url sync method

func (*GoCraftWorkPool) RegisterJob

func (gcwp *GoCraftWorkPool) RegisterJob(name string, j interface{}) error

RegisterJob is used to register the job to the pool. j is the type of job

func (*GoCraftWorkPool) RegisterJobs

func (gcwp *GoCraftWorkPool) RegisterJobs(jobs map[string]interface{}) error

RegisterJobs is used to register multiple jobs to pool.

func (*GoCraftWorkPool) RetryJob

func (gcwp *GoCraftWorkPool) RetryJob(jobID string) error

RetryJob retry the job

func (*GoCraftWorkPool) Schedule

func (gcwp *GoCraftWorkPool) Schedule(jobName string, params models.Parameters, runAfterSeconds uint64, isUnique bool) (models.JobStats, error)

Schedule job

func (*GoCraftWorkPool) Start

func (gcwp *GoCraftWorkPool) Start() error

Start to serve Unblock action

func (*GoCraftWorkPool) Stats

func (gcwp *GoCraftWorkPool) Stats() (models.JobPoolStats, error)

Stats of pool

func (*GoCraftWorkPool) StopJob

func (gcwp *GoCraftWorkPool) StopJob(jobID string) error

StopJob will stop the job

func (*GoCraftWorkPool) ValidateJobParameters

func (gcwp *GoCraftWorkPool) ValidateJobParameters(jobType interface{}, params map[string]interface{}) error

ValidateJobParameters ...

type Interface

type Interface interface {
	// Start to serve
	//
	// Return:
	//  error if failed to start
	Start() error

	// Register job to the pool.
	//
	// name	string     : job name for referring
	// job	interface{}: job handler which must implement the job.Interface.
	//
	// Return:
	//  error if failed to register
	RegisterJob(name string, job interface{}) error

	// Register multiple jobs.
	//
	// jobs	map[string]interface{}: job map, key is job name and value is job handler.
	//
	// Return:
	//  error if failed to register
	RegisterJobs(jobs map[string]interface{}) error

	// Enqueue job
	//
	// jobName string           : the name of enqueuing job
	// params models.Parameters : parameters of enqueuing job
	// isUnique bool            : specify if duplicated job will be discarded
	//
	// Returns:
	//  models.JobStats: the stats of enqueuing job if succeed
	//  error          : if failed to enqueue
	Enqueue(jobName string, params models.Parameters, isUnique bool) (models.JobStats, error)

	// Schedule job to run after the specified interval (seconds).
	//
	// jobName string           : the name of enqueuing job
	// runAfterSeconds uint64   : the waiting interval with seconds
	// params models.Parameters : parameters of enqueuing job
	// isUnique bool            : specify if duplicated job will be discarded
	//
	// Returns:
	//  models.JobStats: the stats of enqueuing job if succeed
	//  error          : if failed to enqueue
	Schedule(jobName string, params models.Parameters, runAfterSeconds uint64, isUnique bool) (models.JobStats, error)

	// Schedule the job periodically running.
	//
	// jobName string           : the name of enqueuing job
	// params models.Parameters : parameters of enqueuing job
	// cronSetting string       : the periodic duration with cron style like '0 * * * * *'
	//
	// Returns:
	//  models.JobStats: the stats of enqueuing job if succeed
	//  error          : if failed to enqueue
	PeriodicallyEnqueue(jobName string, params models.Parameters, cronSetting string) (models.JobStats, error)

	// Return the status info of the pool.
	//
	// Returns:
	//  models.JobPoolStats : the stats info of all running pools
	//  error               :  failed to check
	Stats() (models.JobPoolStats, error)

	// Check if the job has been already registered.
	//
	// name string : name of job
	//
	// Returns:
	// interface{} : the job type of the known job if it's existing
	// bool        : if the known job requires parameters
	IsKnownJob(name string) (interface{}, bool)

	ValidateJobParameters(jobType interface{}, params map[string]interface{}) error

	// Get the stats of the specified job
	//
	// jobID string : ID of the enqueued job
	//
	// Returns:
	//  models.JobStats : job stats data
	//  error           : error returned if meet any problems
	GetJobStats(jobID string) (models.JobStats, error)

	// Stop the job
	//
	// jobID string : ID of the enqueued job
	//
	// Return:
	//  error           : error returned if meet any problems
	StopJob(jobID string) error

	// Cancel the job
	//
	// jobID string : ID of the enqueued job
	//
	// Return:
	//  error           : error returned if meet any problems
	CancelJob(jobID string) error

	// Retry the job
	//
	// jobID string : ID of the enqueued job
	//
	// Return:
	//  error           : error returned if meet any problems
	RetryJob(jobID string) error

	// Register hook
	//
	// jobID string   : ID of job
	// hookURL string : the hook url
	//
	// Return:
	//  error        : error returned if meet any problems
	RegisterHook(jobID string, hookURL string) error
}

Interface for worker pool. More like a driver to transparent the lower queue.

type MessageServer

type MessageServer struct {
	// contains filtered or unexported fields
}

MessageServer implements the sub/pub mechanism via redis to do async message exchanging.

func NewMessageServer

func NewMessageServer(ctx context.Context, namespace string, redisPool *redis.Pool) *MessageServer

NewMessageServer creates a new ptr of MessageServer

func (*MessageServer) Start

func (ms *MessageServer) Start() error

Start to serve

func (*MessageServer) Subscribe

func (ms *MessageServer) Subscribe(event string, callback interface{}) error

Subscribe event with specified callback

type RedisDeDuplicator added in v1.7.0

type RedisDeDuplicator struct {
	// contains filtered or unexported fields
}

RedisDeDuplicator implement the DeDuplicator interface based on redis.

func NewRedisDeDuplicator added in v1.7.0

func NewRedisDeDuplicator(ns string, pool *redis.Pool) *RedisDeDuplicator

NewRedisDeDuplicator is constructor of RedisDeDuplicator

func (*RedisDeDuplicator) DelUniqueSign added in v1.7.0

func (rdd *RedisDeDuplicator) DelUniqueSign(jobName string, params models.Parameters) error

DelUniqueSign delete the job unique sign

func (*RedisDeDuplicator) Unique added in v1.7.0

func (rdd *RedisDeDuplicator) Unique(jobName string, params models.Parameters) error

Unique checks if the job is unique and set unique flag if it is not set yet.

type RedisJob

type RedisJob struct {
	// contains filtered or unexported fields
}

RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis pool.

func NewRedisJob

func NewRedisJob(j interface{}, ctx *env.Context, statsManager opm.JobStatsManager, deDuplicator DeDuplicator) *RedisJob

NewRedisJob is constructor of RedisJob

func (*RedisJob) Run

func (rj *RedisJob) Run(j *work.Job) error

Run the job

type RedisPoolContext

type RedisPoolContext struct{}

RedisPoolContext ... We did not use this context to pass context info so far, just a placeholder.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL