jobworker

package
Version: v0.0.0-...-20cb493

This package is not in the latest version of its module.

Published: Sep 25, 2024 | License: MIT | Imports: 15 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (

	// OnError will be called for every error that
	// would also be logged.
	OnError = func(error) {}
)
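
The hook above can be replaced to observe errors, for example in tests. The following is a minimal sketch that uses a local copy of the variable rather than the real package; reportError, collectErrors, and errJobFailed are hypothetical names introduced only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// OnError is a local stand-in for the package-level hook documented above.
// It is not the real jobworker variable.
var OnError = func(error) {}

// errJobFailed is a hypothetical error value used by the example.
var errJobFailed = errors.New("job failed")

// reportError is a hypothetical helper that mimics code which would
// log an error and also pass it to the hook. Nil errors are ignored.
func reportError(err error) {
	if err != nil {
		OnError(err)
	}
}

// collectErrors installs a hook that records every reported error
// and returns a function that reads the recorded errors back.
func collectErrors() func() []error {
	var seen []error
	OnError = func(err error) { seen = append(seen, err) }
	return func() []error { return seen }
}

func main() {
	collected := collectErrors()
	reportError(errJobFailed)
	reportError(nil)
	fmt.Println(len(collected()))
}
```

This collector is not safe for concurrent use. A hook that may be called from several worker threads would need its own synchronization.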

Functions

func DeleteAllJobsAndBundles

func DeleteAllJobsAndBundles(ctx context.Context) error

func DoJob

func DoJob(ctx context.Context, job *jobqueue.Job) (err error)

DoJob runs a job synchronously. On success it sets job.Result; on failure it sets job.ErrorMsg and job.ErrorData and also returns the error.

The job.ID is added to the context that is passed to the job worker function as a golog attribute with the key "jobID".

StartedAt, StoppedAt, and UpdatedAt are not modified.
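
The success and failure behavior described above can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: Job is a simplified local stand-in for jobqueue.Job with assumed field types, and workerFunc, doJob, runExample, succeed, and fail are hypothetical names. The golog context attribute and ErrorData are left out.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// Job is a simplified stand-in for jobqueue.Job that keeps only
// some of the fields named above. The field types are assumptions.
type Job struct {
	ID       string
	Type     string
	Result   json.RawMessage
	ErrorMsg string
}

// workerFunc is a hypothetical worker signature for this sketch.
type workerFunc func(ctx context.Context, job *Job) (any, error)

// doJob runs work synchronously and records the outcome on the job:
// Result on success, ErrorMsg on failure. Any error is also returned.
func doJob(ctx context.Context, job *Job, work workerFunc) error {
	result, err := work(ctx, job)
	if err != nil {
		job.ErrorMsg = err.Error()
		return err
	}
	data, err := json.Marshal(result)
	if err != nil {
		job.ErrorMsg = err.Error()
		return err
	}
	job.Result = data
	return nil
}

// succeed and fail are hypothetical workers used by the example.
func succeed(ctx context.Context, job *Job) (any, error) { return "done", nil }
func fail(ctx context.Context, job *Job) (any, error)    { return nil, errors.New("boom") }

// runExample creates a fresh job and runs it with the given worker.
func runExample(work workerFunc) (*Job, error) {
	job := &Job{ID: "job-1", Type: "example"}
	err := doJob(context.Background(), job, work)
	return job, err
}

func main() {
	job, err := runExample(succeed)
	fmt.Println(string(job.Result), job.ErrorMsg, err)
	job, err = runExample(fail)
	fmt.Println(string(job.Result), job.ErrorMsg, err)
}
```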

func FinishThreads

func FinishThreads()

FinishThreads waits until all worker threads have finished their current jobs and stops them before they start working on new jobs.
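
One way to get the "finish the current job, then stop" behavior is a stop channel that workers check before taking a new job, combined with a sync.WaitGroup. The sketch below assumes an in-process pool fed by a channel; pool, startPool, finish, and demo are hypothetical names, and the real package may coordinate its threads differently.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical in-process worker pool used only for this sketch.
type pool struct {
	jobs chan func()
	stop chan struct{}
	wg   sync.WaitGroup
}

// startPool starts numThreads goroutines that take jobs from queue.
func startPool(numThreads int, queue chan func()) *pool {
	p := &pool{jobs: queue, stop: make(chan struct{})}
	for i := 0; i < numThreads; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for {
				// Check stop first so a stopped worker never picks up a new job.
				select {
				case <-p.stop:
					return
				default:
				}
				select {
				case <-p.stop:
					return
				case job, ok := <-p.jobs:
					if !ok {
						return
					}
					job() // a job that has started always runs to completion
				}
			}
		}()
	}
	return p
}

// finish tells all workers to stop taking new jobs and
// waits until every job that is already running has returned.
func (p *pool) finish() {
	close(p.stop)
	p.wg.Wait()
}

// demo starts one long-running job, calls finish while the job is
// still in progress, and reports whether the job ran to completion.
func demo() bool {
	queue := make(chan func(), 1)
	started := make(chan struct{})
	release := make(chan struct{})
	done := false
	queue <- func() {
		close(started)
		<-release
		done = true
	}
	p := startPool(2, queue)
	<-started // the job is now in progress
	go func() { close(release) }()
	p.finish()
	return done
}

func main() {
	fmt.Println(demo())
}
```

Reading done after finish returns is safe because the worker writes it before calling wg.Done, and wg.Wait returns only after that.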

func IsRegistered

func IsRegistered(jobType string) bool

IsRegistered checks if a worker is registered for the given job type.

func Register

func Register(jobType string, worker Worker)

Register registers a Worker implementation for a jobType. See also RegisterFunc.

func RegisterFunc

func RegisterFunc(workerFunc any)

RegisterFunc uses reflection to register a function with a custom payload argument type as a Worker for jobs of type ReflectJobType(arg). The payload JSON of the job will be unmarshalled to the type of the argument.

func RegisterFuncForJobType

func RegisterFuncForJobType(jobType string, workerFunc any)

RegisterFuncForJobType uses reflection to register a function with a custom payload argument type as a Worker for jobs of jobType. The payload JSON of the job will be unmarshalled to the type of the argument.
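
The reflection step described above can be sketched with the standard reflect and encoding/json packages. This is an illustration, not the package's code: it assumes worker functions of the shape func(context.Context, T) (R, error), which the documentation does not confirm, and the names worker, registry, registerFuncForJobType, runRegistered, emailPayload, and sendEmail are hypothetical.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"reflect"
)

// worker is a local stand-in for a registered job worker.
type worker func(ctx context.Context, payload []byte) (any, error)

// registry maps job types to workers. Hypothetical, for this sketch only.
var registry = map[string]worker{}

var (
	ctxType = reflect.TypeOf((*context.Context)(nil)).Elem()
	errType = reflect.TypeOf((*error)(nil)).Elem()
)

// registerFuncForJobType accepts a function shaped like
// func(context.Context, T) (R, error) and wraps it so that the
// payload JSON is unmarshalled into a new T before each call.
func registerFuncForJobType(jobType string, workerFunc any) error {
	fn := reflect.ValueOf(workerFunc)
	if fn.Kind() != reflect.Func {
		return fmt.Errorf("expected a function, got %T", workerFunc)
	}
	t := fn.Type()
	if t.NumIn() != 2 || t.In(0) != ctxType || t.NumOut() != 2 || t.Out(1) != errType {
		return fmt.Errorf("unsupported worker signature %s", t)
	}
	argType := t.In(1)
	registry[jobType] = func(ctx context.Context, payload []byte) (any, error) {
		arg := reflect.New(argType) // pointer to a zero value of T
		if err := json.Unmarshal(payload, arg.Interface()); err != nil {
			return nil, fmt.Errorf("unmarshal payload for %s: %w", jobType, err)
		}
		// ctx must be non-nil, otherwise reflect.ValueOf(ctx) is invalid.
		out := fn.Call([]reflect.Value{reflect.ValueOf(ctx), arg.Elem()})
		err, _ := out[1].Interface().(error) // a nil interface yields a nil error
		return out[0].Interface(), err
	}
	return nil
}

// emailPayload and sendEmail are a hypothetical payload type and worker.
type emailPayload struct {
	To string `json:"to"`
}

func sendEmail(ctx context.Context, p emailPayload) (string, error) {
	return "sent to " + p.To, nil
}

// runRegistered looks up the worker for jobType and runs it with payloadJSON.
func runRegistered(jobType, payloadJSON string) (any, error) {
	w, ok := registry[jobType]
	if !ok {
		return nil, fmt.Errorf("no worker registered for %q", jobType)
	}
	return w(context.Background(), []byte(payloadJSON))
}

func main() {
	if err := registerFuncForJobType("email", sendEmail); err != nil {
		panic(err)
	}
	result, err := runRegistered("email", `{"to":"a@example.com"}`)
	fmt.Println(result, err)
}
```

Validating the signature at registration time means a mismatched function is reported when it is registered, not when the first job of that type arrives.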

func RegisteredJobTypes

func RegisteredJobTypes() notnull.StringArray

func SetDataBase

func SetDataBase(newDB DataBase)

func StartPollingAvailableJobs

func StartPollingAvailableJobs(interval time.Duration) error

StartPollingAvailableJobs polls the database for available jobs once per interval.
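
Interval polling of this kind is commonly built on time.Ticker. The sketch below shows the general pattern only; pollEvery and countPolls are hypothetical names, the check callback stands in for the database query, and cancellation through a context is an assumption, since the documented function takes no context.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pollEvery calls check once per interval until ctx is canceled.
// It blocks; run it in its own goroutine to poll in the background.
// The interval must be positive, otherwise time.NewTicker panics.
func pollEvery(ctx context.Context, interval time.Duration, check func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			check()
		}
	}
}

// countPolls is a hypothetical helper that polls every millisecond
// until check has run at least n times, then cancels the context
// and returns how often check was called.
func countPolls(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	count := 0
	pollEvery(ctx, time.Millisecond, func() {
		count++
		if count >= n {
			cancel()
		}
	})
	return count
}

func main() {
	fmt.Println(countPolls(3) >= 3)
}
```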

func StartThreads

func StartThreads(ctx context.Context, numThreads int) error

StartThreads starts numThreads new threads that poll postgresdb for jobs to work on. Canceling the passed context does not stop the started threads.

func StopThreads

func StopThreads(ctx context.Context)

StopThreads stops the threads listening for new jobs.

func Unregister

func Unregister(jobTypes ...string)

Types

type DataBase

type DataBase interface {
	jobqueue.Service

	SetJobAvailableListener(context.Context, func()) error
	StartNextJobOrNil(ctx context.Context) (*jobqueue.Job, error)

	SetJobError(ctx context.Context, jobID uu.ID, errorMsg string, errorData nullable.JSON) error
	SetJobResult(ctx context.Context, jobID uu.ID, result nullable.JSON) error
	SetJobStart(ctx context.Context, jobID uu.ID, startAt time.Time) error

	DeleteJobsFromOrigin(ctx context.Context, origin string) error
	DeleteJobsOfType(ctx context.Context, jobType string) error
	DeleteJobBundlesFromOrigin(ctx context.Context, origin string) error
	DeleteJobBundlesOfType(ctx context.Context, bundleType string) error
	DeleteAllJobsAndBundles(ctx context.Context) error
}

type Worker

type Worker interface {
	DoJob(ctx context.Context, job *jobqueue.Job) (result any, err error)
}

type WorkerFunc

type WorkerFunc func(ctx context.Context, job *jobqueue.Job) (result any, err error)

func (WorkerFunc) DoJob

func (f WorkerFunc) DoJob(ctx context.Context, job *jobqueue.Job) (result any, err error)
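
WorkerFunc follows the same adapter idiom as http.HandlerFunc in the standard library: a function type with a method, so that a plain function can satisfy the Worker interface. The sketch below mirrors the declarations above with a local Job stand-in instead of jobqueue.Job; that DoJob simply calls f is an assumption based on that idiom, and echoType and run are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Job is a simplified stand-in for jobqueue.Job.
type Job struct {
	ID   string
	Type string
}

// Worker and WorkerFunc mirror the declarations documented above,
// using the local Job type instead of jobqueue.Job.
type Worker interface {
	DoJob(ctx context.Context, job *Job) (result any, err error)
}

type WorkerFunc func(ctx context.Context, job *Job) (result any, err error)

// DoJob calls f, which lets a plain function satisfy Worker.
func (f WorkerFunc) DoJob(ctx context.Context, job *Job) (any, error) {
	return f(ctx, job)
}

// echoType is a hypothetical worker function.
func echoType(ctx context.Context, job *Job) (any, error) {
	return "handled " + job.Type, nil
}

// run passes a new job of the given type to any Worker.
func run(w Worker, jobType string) (any, error) {
	return w.DoJob(context.Background(), &Job{ID: "job-1", Type: jobType})
}

func main() {
	var w Worker = WorkerFunc(echoType)
	result, err := run(w, "email")
	fmt.Println(result, err)
}
```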
