Documentation ¶
Index ¶
- func Wrap(j interface{}) job.Interface
- type DeDuplicator
- type GoCraftWorkPool
- func (gcwp *GoCraftWorkPool) CancelJob(jobID string) error
- func (gcwp *GoCraftWorkPool) Enqueue(jobName string, params models.Parameters, isUnique bool) (models.JobStats, error)
- func (gcwp *GoCraftWorkPool) GetJobStats(jobID string) (models.JobStats, error)
- func (gcwp *GoCraftWorkPool) IsKnownJob(name string) (interface{}, bool)
- func (gcwp *GoCraftWorkPool) PeriodicallyEnqueue(jobName string, params models.Parameters, cronSetting string) (models.JobStats, error)
- func (gcwp *GoCraftWorkPool) RegisterHook(jobID string, hookURL string) error
- func (gcwp *GoCraftWorkPool) RegisterJob(name string, j interface{}) error
- func (gcwp *GoCraftWorkPool) RegisterJobs(jobs map[string]interface{}) error
- func (gcwp *GoCraftWorkPool) RetryJob(jobID string) error
- func (gcwp *GoCraftWorkPool) Schedule(jobName string, params models.Parameters, runAfterSeconds uint64, ...) (models.JobStats, error)
- func (gcwp *GoCraftWorkPool) Start() error
- func (gcwp *GoCraftWorkPool) Stats() (models.JobPoolStats, error)
- func (gcwp *GoCraftWorkPool) StopJob(jobID string) error
- func (gcwp *GoCraftWorkPool) ValidateJobParameters(jobType interface{}, params map[string]interface{}) error
- type Interface
- type MessageServer
- type RedisDeDuplicator
- type RedisJob
- type RedisPoolContext
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type DeDuplicator ¶ added in v1.7.0
type DeDuplicator interface {
	// Check the uniqueness of the unique job and set the unique flag if it is not set yet.
	//
	// Parameters:
	//   jobName string           : name of the job
	//   params models.Parameters : parameters of the job
	//
	// Returns:
	//   If no unique flag and successfully set it, a nil error is returned;
	//   otherwise, a non nil error is returned.
	Unique(jobName string, params models.Parameters) error

	// Remove the unique flag after job exiting
	// Parameters:
	//   jobName string           : name of the job
	//   params models.Parameters : parameters of the job
	//
	// Returns:
	//   If unique flag is successfully removed, a nil error is returned;
	//   otherwise, a non nil error is returned.
	DelUniqueSign(jobName string, params models.Parameters) error
}
DeDuplicator is designed to handle the uniqueness of jobs. Once a job is declared to be unique, it can be enqueued only if no identical job (same job name and parameters) is already in the queue or currently running. It adopts the same uniqueness mechanism as the upstream framework.
type GoCraftWorkPool ¶
type GoCraftWorkPool struct {
// contains filtered or unexported fields
}
GoCraftWorkPool is the pool implementation based on gocraft/work powered by redis.
func NewGoCraftWorkPool ¶
func NewGoCraftWorkPool(ctx *env.Context, namespace string, workerCount uint, redisPool *redis.Pool) *GoCraftWorkPool
NewGoCraftWorkPool is the constructor of GoCraftWorkPool.
func (*GoCraftWorkPool) CancelJob ¶
func (gcwp *GoCraftWorkPool) CancelJob(jobID string) error
CancelJob will cancel the job
func (*GoCraftWorkPool) Enqueue ¶
func (gcwp *GoCraftWorkPool) Enqueue(jobName string, params models.Parameters, isUnique bool) (models.JobStats, error)
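Enqueue enqueues the job with the given name and parameters. isUnique specifies whether a duplicated job will be discarded.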
func (*GoCraftWorkPool) GetJobStats ¶
func (gcwp *GoCraftWorkPool) GetJobStats(jobID string) (models.JobStats, error)
GetJobStats returns the job stats of the specified enqueued job.
func (*GoCraftWorkPool) IsKnownJob ¶
func (gcwp *GoCraftWorkPool) IsKnownJob(name string) (interface{}, bool)
IsKnownJob checks whether the job with the given name has already been registered.
func (*GoCraftWorkPool) PeriodicallyEnqueue ¶
func (gcwp *GoCraftWorkPool) PeriodicallyEnqueue(jobName string, params models.Parameters, cronSetting string) (models.JobStats, error)
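PeriodicallyEnqueue schedules the job to run periodically. cronSetting is the periodic duration in cron style, like '0 * * * * *'.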
func (*GoCraftWorkPool) RegisterHook ¶
func (gcwp *GoCraftWorkPool) RegisterHook(jobID string, hookURL string) error
RegisterHook registers the status hook URL for the specified job. It is a sync method.
func (*GoCraftWorkPool) RegisterJob ¶
func (gcwp *GoCraftWorkPool) RegisterJob(name string, j interface{}) error
RegisterJob is used to register the job to the pool. j is the type of the job.
func (*GoCraftWorkPool) RegisterJobs ¶
func (gcwp *GoCraftWorkPool) RegisterJobs(jobs map[string]interface{}) error
RegisterJobs is used to register multiple jobs to pool.
func (*GoCraftWorkPool) RetryJob ¶
func (gcwp *GoCraftWorkPool) RetryJob(jobID string) error
RetryJob retries the job.
func (*GoCraftWorkPool) Schedule ¶
func (gcwp *GoCraftWorkPool) Schedule(jobName string, params models.Parameters, runAfterSeconds uint64, isUnique bool) (models.JobStats, error)
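Schedule schedules the job to run after the specified interval (in seconds). isUnique specifies whether a duplicated job will be discarded.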
func (*GoCraftWorkPool) Start ¶
func (gcwp *GoCraftWorkPool) Start() error
Start starts the pool serving. It is an "unblock" (i.e. non-blocking) action.
func (*GoCraftWorkPool) Stats ¶
func (gcwp *GoCraftWorkPool) Stats() (models.JobPoolStats, error)
Stats returns the status info of the pool.
func (*GoCraftWorkPool) StopJob ¶
func (gcwp *GoCraftWorkPool) StopJob(jobID string) error
StopJob will stop the job
func (*GoCraftWorkPool) ValidateJobParameters ¶
func (gcwp *GoCraftWorkPool) ValidateJobParameters(jobType interface{}, params map[string]interface{}) error
ValidateJobParameters ...
type Interface ¶
type Interface interface {
	// Start to serve
	//
	// Return:
	//   error if failed to start
	Start() error

	// Register job to the pool.
	//
	// name string     : job name for referring
	// job interface{} : job handler which must implement the job.Interface.
	//
	// Return:
	//   error if failed to register
	RegisterJob(name string, job interface{}) error

	// Register multiple jobs.
	//
	// jobs map[string]interface{}: job map, key is job name and value is job handler.
	//
	// Return:
	//   error if failed to register
	RegisterJobs(jobs map[string]interface{}) error

	// Enqueue job
	//
	// jobName string           : the name of enqueuing job
	// params models.Parameters : parameters of enqueuing job
	// isUnique bool            : specify if duplicated job will be discarded
	//
	// Returns:
	//   models.JobStats: the stats of enqueuing job if succeed
	//   error          : if failed to enqueue
	Enqueue(jobName string, params models.Parameters, isUnique bool) (models.JobStats, error)

	// Schedule job to run after the specified interval (seconds).
	//
	// jobName string           : the name of enqueuing job
	// runAfterSeconds uint64   : the waiting interval with seconds
	// params models.Parameters : parameters of enqueuing job
	// isUnique bool            : specify if duplicated job will be discarded
	//
	// Returns:
	//   models.JobStats: the stats of enqueuing job if succeed
	//   error          : if failed to enqueue
	Schedule(jobName string, params models.Parameters, runAfterSeconds uint64, isUnique bool) (models.JobStats, error)

	// Schedule the job periodically running.
	//
	// jobName string           : the name of enqueuing job
	// params models.Parameters : parameters of enqueuing job
	// cronSetting string       : the periodic duration with cron style like '0 * * * * *'
	//
	// Returns:
	//   models.JobStats: the stats of enqueuing job if succeed
	//   error          : if failed to enqueue
	PeriodicallyEnqueue(jobName string, params models.Parameters, cronSetting string) (models.JobStats, error)

	// Return the status info of the pool.
	//
	// Returns:
	//   models.JobPoolStats : the stats info of all running pools
	//   error               : failed to check
	Stats() (models.JobPoolStats, error)

	// Check if the job has been already registered.
	//
	// name string : name of job
	//
	// Returns:
	//   interface{} : the job type of the known job if it's existing
	//   bool        : if the known job requires parameters
	IsKnownJob(name string) (interface{}, bool)

	ValidateJobParameters(jobType interface{}, params map[string]interface{}) error

	// Get the stats of the specified job
	//
	// jobID string : ID of the enqueued job
	//
	// Returns:
	//   models.JobStats : job stats data
	//   error           : error returned if meet any problems
	GetJobStats(jobID string) (models.JobStats, error)

	// Stop the job
	//
	// jobID string : ID of the enqueued job
	//
	// Return:
	//   error : error returned if meet any problems
	StopJob(jobID string) error

	// Cancel the job
	//
	// jobID string : ID of the enqueued job
	//
	// Return:
	//   error : error returned if meet any problems
	CancelJob(jobID string) error

	// Retry the job
	//
	// jobID string : ID of the enqueued job
	//
	// Return:
	//   error : error returned if meet any problems
	RetryJob(jobID string) error

	// Register hook
	//
	// jobID string   : ID of job
	// hookURL string : the hook url
	//
	// Return:
	//   error : error returned if meet any problems
	RegisterHook(jobID string, hookURL string) error
}
Interface is the interface for the worker pool. It acts more like a driver that makes the underlying queue transparent to callers.
type MessageServer ¶
type MessageServer struct {
// contains filtered or unexported fields
}
MessageServer implements a pub/sub mechanism via redis for asynchronous message exchange.
func NewMessageServer ¶
NewMessageServer creates a new pointer to a MessageServer.
func (*MessageServer) Subscribe ¶
func (ms *MessageServer) Subscribe(event string, callback interface{}) error
Subscribe event with specified callback
type RedisDeDuplicator ¶ added in v1.7.0
type RedisDeDuplicator struct {
// contains filtered or unexported fields
}
RedisDeDuplicator implements the DeDuplicator interface based on redis.
func NewRedisDeDuplicator ¶ added in v1.7.0
func NewRedisDeDuplicator(ns string, pool *redis.Pool) *RedisDeDuplicator
NewRedisDeDuplicator is the constructor of RedisDeDuplicator.
func (*RedisDeDuplicator) DelUniqueSign ¶ added in v1.7.0
func (rdd *RedisDeDuplicator) DelUniqueSign(jobName string, params models.Parameters) error
DelUniqueSign deletes the unique sign of the job.
func (*RedisDeDuplicator) Unique ¶ added in v1.7.0
func (rdd *RedisDeDuplicator) Unique(jobName string, params models.Parameters) error
Unique checks if the job is unique and sets the unique flag if it is not set yet.
type RedisJob ¶
type RedisJob struct {
// contains filtered or unexported fields
}
RedisJob is a job wrapper that adapts a job.Interface to the form that can be recognized by the redis pool.
func NewRedisJob ¶
func NewRedisJob(j interface{}, ctx *env.Context, statsManager opm.JobStatsManager, deDuplicator DeDuplicator) *RedisJob
NewRedisJob is the constructor of RedisJob.
type RedisPoolContext ¶
type RedisPoolContext struct{}
RedisPoolContext is just a placeholder; so far this context has not been used to pass any context info.