Documentation ¶
Overview ¶
Package tasq provides a task queue implementation compapible with multiple repositories
Index ¶
- Variables
- func NoopLogger() *log.Logger
- type Cleaner
- type Client
- type Consumer
- func (c *Consumer) Channel() <-chan *func()
- func (c *Consumer) Forget(taskType string) error
- func (c *Consumer) Learn(taskType string, f HandlerFunc, override bool) error
- func (c *Consumer) Start(ctx context.Context) error
- func (c *Consumer) Stop() error
- func (c *Consumer) WithAutoDeleteOnSuccess(autoDeleteOnSuccess bool) *Consumer
- func (c *Consumer) WithChannelSize(channelSize int) *Consumer
- func (c *Consumer) WithLogger(logger Logger) *Consumer
- func (c *Consumer) WithMaxActiveTasks(maxActiveTasks int) *Consumer
- func (c *Consumer) WithPollInterval(pollInterval time.Duration) *Consumer
- func (c *Consumer) WithPollLimit(pollLimit int) *Consumer
- func (c *Consumer) WithPollStrategy(pollStrategy PollStrategy) *Consumer
- func (c *Consumer) WithQueues(queues ...string) *Consumer
- func (c *Consumer) WithVisibilityTimeout(visibilityTimeout time.Duration) *Consumer
- type HandlerFunc
- type IRepository
- type Inspector
- func (o *Inspector) Count(ctx context.Context, taskStatuses []TaskStatus, taskTypes, queues []string) (int64, error)
- func (o *Inspector) Delete(ctx context.Context, safeDelete bool, tasks ...*Task) error
- func (o *Inspector) Purge(ctx context.Context, safeDelete bool, taskStatuses []TaskStatus, ...) (int64, error)
- func (o *Inspector) Scan(ctx context.Context, taskStatuses []TaskStatus, taskTypes, queues []string, ...) ([]*Task, error)
- type Logger
- type Ordering
- type PollStrategy
- type Producer
- type Task
- type TaskStatus
- type TaskStatusGroup
Constants ¶
This section is empty.
Variables ¶
var ( ErrConsumerAlreadyRunning = errors.New("consumer has already been started") ErrConsumerAlreadyStopped = errors.New("consumer has already been stopped") ErrCouldNotActivateTasks = errors.New("a number of tasks could not be activated") ErrCouldNotPollTasks = errors.New("could not poll tasks") ErrCouldNotPingTasks = errors.New("could not ping tasks") ErrTaskTypeAlreadyLearned = errors.New("task with this type already learned") ErrTaskTypeNotFound = errors.New("task with this type not found") ErrTaskTypeNotKnown = errors.New("task with this type is not known by this consumer") ErrUnknownPollStrategy = errors.New("unknown poll strategy") ErrVisibilityTimeoutTooShort = errors.New("visibility timeout must be longer than poll interval") )
Collection of consumer errors.
Functions ¶
func NoopLogger ¶ added in v0.4.3
NoopLogger discards the log messages written to it.
Types ¶
type Cleaner ¶
type Cleaner struct {
// contains filtered or unexported fields
}
Cleaner is a service instance created by a Client with reference to that client and the task age limit parameter.
func (*Cleaner) Clean ¶
Clean will initiate the removal of finished (either succeeded or failed) tasks from the tasks table if they have been created long enough ago for them to be eligible.
func (*Cleaner) WithTaskAge ¶
WithTaskAge defines the minimum time duration that must have passed since the creation of a finished task in order for it to be eligible for cleanup when the Cleaner's Clean() method is called.
Default value: 15 minutes.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client wraps the tasq repository interface which is used by the different services to access the database.
func NewClient ¶
func NewClient(repository IRepository) *Client
NewClient creates a new tasq client instance with the provided tasq.
func (*Client) NewCleaner ¶
NewCleaner creates a new cleaner with a reference to the original tasq client.
func (*Client) NewConsumer ¶
NewConsumer creates a new consumer with a reference to the original tasq client and default consumer parameters.
func (*Client) NewInspector ¶ added in v1.1.0
NewInspector creates a new inspector with a reference to the original tasq client.
func (*Client) NewProducer ¶
NewProducer creates a new consumer with a reference to the original tasq client.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a service instance created by a Client with reference to that client and the various parameters that define the task consumption behaviour.
func (*Consumer) Channel ¶
func (c *Consumer) Channel() <-chan *func()
Channel returns a read-only channel where the polled jobs can be read from.
func (*Consumer) Forget ¶
Forget removes a handler function for the specified taskType from the map of learned handler functions. If the specified taskType does not exist, it'll return an error.
func (*Consumer) Learn ¶
func (c *Consumer) Learn(taskType string, f HandlerFunc, override bool) error
Learn sets a handler function for the specified taskType. If override is false and a handler function is already set for the specified taskType, it'll return an error.
func (*Consumer) Start ¶
Start launches the go routine which manages the pinging and polling of tasks for the consumer, or returns an error if the consumer is not properly configured.
func (*Consumer) Stop ¶
Stop sends the termination signal to the consumer so it'll no longer poll for news tasks.
func (*Consumer) WithAutoDeleteOnSuccess ¶
WithAutoDeleteOnSuccess sets whether successful tasks should be automatically deleted from the task queue by the consumer.
Default value: false.
func (*Consumer) WithChannelSize ¶
WithChannelSize sets the size of the buffered channel used for outputting the polled messages to.
Default value: 10.
func (*Consumer) WithLogger ¶
WithLogger sets the Logger interface that is used for event logging during task consumption.
Default value: NoopLogger.
func (*Consumer) WithMaxActiveTasks ¶
WithMaxActiveTasks sets the maximum number of tasks a consumer can have enqueued at the same time before polling for additional ones.
Default value: 10.
func (*Consumer) WithPollInterval ¶
WithPollInterval sets the interval at which the consumer will try and poll for new tasks to be executed must not be greater than or equal to visibility timeout.
Default value: 5 seconds.
func (*Consumer) WithPollLimit ¶
WithPollLimit sets the maximum number of messages polled from the task queue.
Default value: 10.
func (*Consumer) WithPollStrategy ¶
func (c *Consumer) WithPollStrategy(pollStrategy PollStrategy) *Consumer
WithPollStrategy sets the ordering to be used when polling for tasks from the task queue.
Default value: PollStrategyByCreatedAt.
func (*Consumer) WithQueues ¶
WithQueues sets the queues from which the consumer may poll for tasks.
Default value: empty slice of strings.
func (*Consumer) WithVisibilityTimeout ¶
WithVisibilityTimeout sets the duration by which each ping will extend a task's visibility timeout; Once this timeout is up, a consumer instance may receive the task again.
Default value: 15 seconds.
type HandlerFunc ¶ added in v1.0.4
HandlerFunc is the function signature for the handler functions that are used to process tasks.
type IRepository ¶ added in v1.0.0
type IRepository interface { Migrate(ctx context.Context) error PingTasks(ctx context.Context, taskIDs []uuid.UUID, visibilityTimeout time.Duration) ([]*Task, error) PollTasks(ctx context.Context, types, queues []string, visibilityTimeout time.Duration, ordering Ordering, limit int) ([]*Task, error) CleanTasks(ctx context.Context, minimumAge time.Duration) (int64, error) RegisterStart(ctx context.Context, task *Task) (*Task, error) RegisterError(ctx context.Context, task *Task, errTask error) (*Task, error) RegisterFinish(ctx context.Context, task *Task, finishStatus TaskStatus) (*Task, error) SubmitTask(ctx context.Context, task *Task) (*Task, error) DeleteTask(ctx context.Context, task *Task, safeDelete bool) error RequeueTask(ctx context.Context, task *Task) (*Task, error) ScanTasks(ctx context.Context, taskStatuses []TaskStatus, taskTypes, queues []string, ordering Ordering, limit int) ([]*Task, error) CountTasks(ctx context.Context, taskStatuses []TaskStatus, taskTypes, queues []string) (int64, error) PurgeTasks(ctx context.Context, taskStatuses []TaskStatus, taskTypes, queues []string, safeDelete bool) (int64, error) }
IRepository describes the mandatory methods a repository must implement in order for tasq to be able to use it.
type Inspector ¶ added in v1.1.0
type Inspector struct {
// contains filtered or unexported fields
}
Inspector is a service instance created by a Client with reference to that client with the purpose of enabling the observation of tasks.
func (*Inspector) Count ¶ added in v1.1.0
func (o *Inspector) Count(ctx context.Context, taskStatuses []TaskStatus, taskTypes, queues []string) (int64, error)
Count returns a the total number of tasks based on the supplied filter arguments.
type Ordering ¶ added in v1.0.0
type Ordering int
Ordering is an enum type describing the polling strategy utitlized during the polling process.
type PollStrategy ¶
type PollStrategy string
PollStrategy is the label assigned to the ordering by which tasks are polled for consumption.
const ( PollStrategyByCreatedAt PollStrategy = "pollByCreatedAt" // Poll by oldest tasks first PollStrategyByPriority PollStrategy = "pollByPriority" // Poll by highest priority task first )
Collection of pollStrategies.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer is a service instance created by a Client with reference to that client with the purpose of enabling the submission of new tasks.
type Task ¶
type Task struct { ID uuid.UUID Type string Args []byte Queue string Priority int16 Status TaskStatus ReceiveCount int32 MaxReceives int32 LastError *string CreatedAt time.Time StartedAt *time.Time FinishedAt *time.Time VisibleAt time.Time }
Task is the struct used to represent an atomic task managed by tasq.
func NewTask ¶ added in v1.0.0
func NewTask(taskType string, taskArgs any, queue string, priority int16, maxReceives int32) (*Task, error)
NewTask creates a new Task struct based on the supplied arguments required to define it.
func (*Task) IsLastReceive ¶ added in v1.0.1
IsLastReceive returns true if the task has reached its maximum number of receives.
func (*Task) SetVisibility ¶ added in v1.0.1
SetVisibility sets the time at which the task will become visible again.
func (*Task) UnmarshalArgs ¶
UnmarshalArgs decodes the task arguments into the passed target interface.
type TaskStatus ¶ added in v1.0.0
type TaskStatus string
TaskStatus is an enum type describing the status a task is currently in.
const ( StatusNew TaskStatus = "NEW" StatusEnqueued TaskStatus = "ENQUEUED" StatusInProgress TaskStatus = "IN_PROGRESS" StatusSuccessful TaskStatus = "SUCCESSFUL" StatusFailed TaskStatus = "FAILED" )
The collection of possible task statuses.
func GetTaskStatuses ¶ added in v1.0.0
func GetTaskStatuses(taskStatusGroup TaskStatusGroup) []TaskStatus
GetTaskStatuses returns a slice of TaskStatuses based on the TaskStatusGroup passed as an argument.
type TaskStatusGroup ¶ added in v1.0.0
type TaskStatusGroup int
TaskStatusGroup is an enum type describing the key used in the map of TaskStatuses which groups them for different purposes.
const ( AllTasks TaskStatusGroup = iota OpenTasks FinishedTasks )
The collection of possible task status groupings.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
_examples
|
|
repository
|
|
mysql
Package mysql provides the implementation of a tasq repository in MySQL
|
Package mysql provides the implementation of a tasq repository in MySQL |
postgres
Package postgres provides the implementation of a tasq repository in PostgreSQL
|
Package postgres provides the implementation of a tasq repository in PostgreSQL |