tasq

v0.5.1
Published: Nov 14, 2022 License: MIT Imports: 12 Imported by: 0

README

tasq

Tasq is a Go task queue that uses an SQL database for persistence (currently only PostgreSQL is supported).

Install

go get -u github.com/greencoda/tasq

Usage Example

To try tasq locally, you'll need a PostgreSQL DB backend. You may use the supplied docker-compose.yml file to start a local instance:

docker-compose -f example/docker-compose.yml up -d

Afterwards, simply run the example.go file:

go run _example/example.go

Documentation

Constants

This section is empty.

Variables

var (
	NoopLogger = log.New(io.Discard, "", 0) // discards the log messages written to it
)

Functions

func NewRepository

func NewRepository(dataSource any, driver, prefix string, migrate bool, migrationTimeout time.Duration) (repository repository.IRepository, err error)

NewRepository creates a repository instance for the provided SQL driver, and optionally migrates the required type and table. The dataSource argument may be an initiated *sql.DB instance or a DSN string. The prefix argument specifies a prefix for both the task status enum type and the table used (e.g. "tasq" will result in the table name "tasq_tasks").

Types

type Cleaner

type Cleaner struct {
	// contains filtered or unexported fields
}

Cleaner is a service instance created by a Client with reference to that client and the task age limit parameter

func (*Cleaner) Clean

func (c *Cleaner) Clean() (int64, error)

Clean initiates the removal of finished (succeeded or failed) tasks from the queue table, provided they were created long enough ago to be eligible (see WithTaskAge).

func (*Cleaner) WithTaskAge

func (c *Cleaner) WithTaskAge(taskAge time.Duration) *Cleaner

WithTaskAge defines the minimum time duration that must have passed since the creation of a finished task in order for it to be eligible for cleanup when the Cleaner's Clean() method is called.

default value: 15 minutes

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client wraps the tasq repository interface which is used by the different services to access the database

func NewClient

func NewClient(ctx context.Context, repository repository.IRepository) *Client

NewClient creates a new tasq client instance with the provided tasq repository

func (*Client) NewCleaner

func (c *Client) NewCleaner() *Cleaner

NewCleaner creates a new cleaner with a reference to the original tasq client

func (*Client) NewConsumer

func (c *Client) NewConsumer() *Consumer

NewConsumer creates a new consumer with a reference to the original tasq client and default consumer parameters

func (*Client) NewProducer

func (c *Client) NewProducer() *Producer

NewProducer creates a new producer with a reference to the original tasq client

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a service instance created by a Client with reference to that client and the various parameters that define the task consumption behaviour

func (*Consumer) Channel

func (c *Consumer) Channel() <-chan *func()

Channel returns a read-only channel where the polled jobs can be read from
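
Because the channel's element type is `*func()`, each received value has to be dereferenced before it is called. A minimal sketch of such a receive loop follows; the `drain` helper is a hypothetical name, a locally built channel stands in for `consumer.Channel()`, and whether tasq ever closes the real channel is not stated in this documentation.

```go
package main

import "fmt"

// drain reads jobs from a receive-only channel of *func() (the element type
// returned by Consumer.Channel) and runs each one, skipping nil pointers.
// It returns the number of jobs executed once the channel is closed.
func drain(jobs <-chan *func()) int {
	executed := 0
	for job := range jobs {
		if job == nil {
			continue
		}
		(*job)() // dereference the pointer, then call the function
		executed++
	}
	return executed
}

func main() {
	// A locally built channel stands in for consumer.Channel().
	jobs := make(chan *func(), 2)
	for _, name := range []string{"first", "second"} {
		name := name // capture the per-iteration value
		job := func() { fmt.Println("running", name) }
		jobs <- &job
	}
	close(jobs)

	fmt.Println("executed:", drain(jobs))
}
```

In a real program the loop would typically run in its own goroutine, or hand each job to a worker pool.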

func (*Consumer) Forget

func (c *Consumer) Forget(taskType string) error

Forget removes the handler function for the specified taskType from the map of learned handler functions. If no handler is registered for the specified taskType, it returns an error

func (*Consumer) Learn

func (c *Consumer) Learn(taskType string, f handlerFunc, override bool) error

Learn sets a handler function for the specified taskType. If override is false and a handler function is already set for the specified taskType, it returns an error
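
The Learn and Forget semantics can be illustrated with a plain map. This is a sketch under stated assumptions, not tasq's code: the `handler` signature, the `registry` type and both error values are hypothetical, since the unexported `handlerFunc` type is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// handler is a hypothetical handler signature; tasq's unexported handlerFunc
// type is not shown in this documentation.
type handler func() error

// registry maps task types to their handlers.
type registry map[string]handler

var (
	errAlreadyLearned = errors.New("task type already learned")
	errNotLearned     = errors.New("task type not learned")
)

// learn registers f for taskType; an existing entry is replaced only
// when override is true.
func (r registry) learn(taskType string, f handler, override bool) error {
	if _, exists := r[taskType]; exists && !override {
		return errAlreadyLearned
	}
	r[taskType] = f
	return nil
}

// forget removes the handler for taskType, failing if none is registered.
func (r registry) forget(taskType string) error {
	if _, exists := r[taskType]; !exists {
		return errNotLearned
	}
	delete(r, taskType)
	return nil
}

func main() {
	r := registry{}
	noop := func() error { return nil }

	fmt.Println(r.learn("email", noop, false)) // first registration succeeds
	fmt.Println(r.learn("email", noop, false)) // duplicate without override fails
	fmt.Println(r.learn("email", noop, true))  // override replaces the handler
	fmt.Println(r.forget("email"))             // removes the handler
	fmt.Println(r.forget("email"))             // nothing left to forget
}
```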

func (*Consumer) Start

func (c *Consumer) Start() error

Start launches the goroutine which manages the pinging and polling of tasks for the consumer, or returns an error if the consumer is not properly configured

func (*Consumer) Stop

func (c *Consumer) Stop() error

Stop sends the termination signal to the consumer so it'll no longer poll for new tasks

func (*Consumer) WithAutoDeleteOnSuccess

func (c *Consumer) WithAutoDeleteOnSuccess(autoDeleteOnSuccess bool) *Consumer

WithAutoDeleteOnSuccess sets whether successful tasks should be automatically deleted from the task queue by the consumer

default value: false

func (*Consumer) WithChannelSize

func (c *Consumer) WithChannelSize(channelSize int) *Consumer

WithChannelSize sets the size of the buffered channel to which the polled messages are output

default value: 10

func (*Consumer) WithLogger

func (c *Consumer) WithLogger(logger Logger) *Consumer

WithLogger sets the Logger interface that is used for event logging during task consumption

default value: NoopLogger

func (*Consumer) WithMaxActiveTasks

func (c *Consumer) WithMaxActiveTasks(maxActiveTasks int) *Consumer

WithMaxActiveTasks sets the maximum number of tasks a consumer can have enqueued at the same time before polling for additional ones

default value: 10

func (*Consumer) WithPollInterval

func (c *Consumer) WithPollInterval(pollInterval time.Duration) *Consumer

WithPollInterval sets the interval at which the consumer will try to poll for new tasks to be executed. The interval must be less than the visibility timeout

default value: 5 seconds

func (*Consumer) WithPollLimit

func (c *Consumer) WithPollLimit(pollLimit int) *Consumer

WithPollLimit sets the maximum number of messages polled from the task queue

default value: 10

func (*Consumer) WithPollStrategy

func (c *Consumer) WithPollStrategy(pollStrategy PollStrategy) *Consumer

WithPollStrategy sets the ordering to be used when polling for tasks from the task queue

default value: PollStrategyByCreatedAt

func (*Consumer) WithQueues

func (c *Consumer) WithQueues(queues ...string) *Consumer

WithQueues sets the queues from which the consumer may poll for tasks

default value: empty slice of strings

func (*Consumer) WithVisibilityTimeout

func (c *Consumer) WithVisibilityTimeout(visibilityTimeout time.Duration) *Consumer

WithVisibilityTimeout sets the duration by which each ping will extend a task's visibility timeout. Once this timeout is up, a consumer instance may receive the task again

default value: 15 seconds

type Logger

type Logger interface {
	Print(v ...any)
	Printf(format string, v ...any)
}

Logger is the interface used for event logging during task consumption

type PollStrategy

type PollStrategy string

PollStrategy is the label assigned to the ordering by which tasks are polled for consumption

const (
	PollStrategyByCreatedAt PollStrategy = "pollByCreatedAt" // Poll by oldest tasks first
	PollStrategyByPriority  PollStrategy = "pollByPriority"  // Poll by highest priority task first
)

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer is a service instance created by a Client with reference to that client, with the purpose of enabling the submission of new tasks

func (*Producer) Submit

func (p *Producer) Submit(taskType string, taskArgs any, queue string, priority int16, maxReceives int32) (submittedTask Task, err error)

Submit constructs and submits a new task to the queue based on the supplied arguments

type Task

type Task interface {
	GetDetails() *model.Task
	UnmarshalArgs(v any) error
}

Task is the public interface for accessing details of the consumed task and unmarshaling its arguments so the handler function can use them

Directories

Path Synopsis
internal
