Documentation ¶
Index ¶
- Variables
- func NewRepository(dataSource any, driver, prefix string, migrate bool, ...) (repository repository.IRepository, err error)
- type Cleaner
- type Client
- type Consumer
- func (c *Consumer) Channel() <-chan *func()
- func (c *Consumer) Forget(taskType string) error
- func (c *Consumer) Learn(taskType string, f handlerFunc, override bool) error
- func (c *Consumer) Start() error
- func (c *Consumer) Stop() error
- func (c *Consumer) WithAutoDeleteOnSuccess(autoDeleteOnSuccess bool) *Consumer
- func (c *Consumer) WithChannelSize(channelSize int) *Consumer
- func (c *Consumer) WithLogger(logger Logger) *Consumer
- func (c *Consumer) WithMaxActiveTasks(maxActiveTasks int) *Consumer
- func (c *Consumer) WithPollInterval(pollInterval time.Duration) *Consumer
- func (c *Consumer) WithPollLimit(pollLimit int) *Consumer
- func (c *Consumer) WithPollStrategy(pollStrategy PollStrategy) *Consumer
- func (c *Consumer) WithQueues(queues ...string) *Consumer
- func (c *Consumer) WithVisibilityTimeout(visibilityTimeout time.Duration) *Consumer
- type Logger
- type PollStrategy
- type Producer
- type Task
Constants ¶
This section is empty.
Variables ¶
var (
NoopLogger = log.New(io.Discard, "", 0) // discards the log messages written to it
)
Functions ¶
func NewRepository ¶
func NewRepository(dataSource any, driver, prefix string, migrate bool, migrationTimeout time.Duration) (repository repository.IRepository, err error)
NewRepository creates a repository instance for the provided SQL driver and optionally migrates the required type and table. The dataSource argument may be an initialized *sql.DB instance or a DSN string. The prefix argument specifies a prefix for both the task status enum type and the table (e.g. "tasq" results in the table name "tasq_tasks").
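The two argument conventions described above can be illustrated with a small stand-alone sketch. It does not call the library; tableName and dataSourceKind are hypothetical helpers written only for this example. The "_tasks" suffix follows the documented "tasq" to "tasq_tasks" example, and the type switch shows one way an `any` argument might be separated into a *sql.DB and a DSN string.

```go
package main

import (
	"database/sql"
	"fmt"
)

// tableName is a hypothetical helper mirroring the documented example:
// the prefix "tasq" yields the table name "tasq_tasks".
func tableName(prefix string) string {
	return prefix + "_tasks"
}

// dataSourceKind is a hypothetical helper showing how an `any` argument
// could be told apart as an open *sql.DB or a DSN string.
func dataSourceKind(dataSource any) (string, error) {
	switch ds := dataSource.(type) {
	case *sql.DB:
		if ds == nil {
			return "", fmt.Errorf("nil *sql.DB")
		}
		return "db", nil
	case string:
		return "dsn", nil
	default:
		return "", fmt.Errorf("unsupported data source type %T", dataSource)
	}
}

func main() {
	fmt.Println(tableName("tasq")) // tasq_tasks

	kind, err := dataSourceKind("some-dsn-string")
	fmt.Println(kind, err)

	_, err = dataSourceKind(42)
	fmt.Println(err)
}
```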
Types ¶
type Cleaner ¶
type Cleaner struct {
// contains filtered or unexported fields
}
Cleaner is a service instance created by a Client with reference to that client and the task age limit parameter
func (*Cleaner) Clean ¶
Clean will initiate the removal of finished (either succeeded or failed) tasks from the queue table if they have been created long enough ago for them to be eligible
func (*Cleaner) WithTaskAge ¶
WithTaskAge defines the minimum time duration that must have passed since the creation of a finished task in order for it to be eligible for cleanup when the Cleaner's Clean() method is called.
default value: 15 minutes
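The eligibility rule described for Clean and WithTaskAge can be sketched as a single predicate. This is an illustration only, not the library's implementation; eligibleForCleanup is a hypothetical name, and treating an age exactly equal to the minimum as eligible is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// eligibleForCleanup is a hypothetical predicate: a task qualifies when it is
// finished (succeeded or failed) and at least minAge has passed since it was
// created. Treating age == minAge as eligible is an assumption.
func eligibleForCleanup(finished bool, age, minAge time.Duration) bool {
	return finished && age >= minAge
}

func main() {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	minAge := 15 * time.Minute // documented default

	oldFinished := now.Sub(now.Add(-20 * time.Minute))
	recentFinished := now.Sub(now.Add(-5 * time.Minute))
	oldUnfinished := now.Sub(now.Add(-time.Hour))

	fmt.Println(eligibleForCleanup(true, oldFinished, minAge))    // true
	fmt.Println(eligibleForCleanup(true, recentFinished, minAge)) // false
	fmt.Println(eligibleForCleanup(false, oldUnfinished, minAge)) // false
}
```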
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client wraps the tasq repository interface which is used by the different services to access the database
func NewClient ¶
func NewClient(ctx context.Context, repository repository.IRepository) *Client
NewClient creates a new tasq client instance with the provided tasq repository
func (*Client) NewCleaner ¶
NewCleaner creates a new cleaner with a reference to the original tasq client
func (*Client) NewConsumer ¶
NewConsumer creates a new consumer with a reference to the original tasq client and default consumer parameters
func (*Client) NewProducer ¶
NewProducer creates a new producer with a reference to the original tasq client
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a service instance created by a Client with reference to that client and the various parameters that define the task consumption behaviour
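Each With* method on Consumer returns *Consumer, so configuration calls can be chained. The following stand-alone sketch shows that pattern with a hypothetical config type; it is not the library's Consumer. The default values are the ones documented for the corresponding With* methods.

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical stand-in for a consumer's settings.
type config struct {
	channelSize  int
	pollInterval time.Duration
	pollLimit    int
	queues       []string
}

// newConfig starts from the documented defaults.
func newConfig() *config {
	return &config{
		channelSize:  10,
		pollInterval: 5 * time.Second,
		pollLimit:    10,
		queues:       []string{},
	}
}

// Each setter mutates the receiver and returns it so calls can be chained.
func (c *config) WithChannelSize(n int) *config {
	c.channelSize = n
	return c
}

func (c *config) WithPollInterval(d time.Duration) *config {
	c.pollInterval = d
	return c
}

func (c *config) WithPollLimit(n int) *config {
	c.pollLimit = n
	return c
}

func (c *config) WithQueues(queues ...string) *config {
	c.queues = queues
	return c
}

func main() {
	c := newConfig().
		WithChannelSize(20).
		WithPollInterval(2 * time.Second).
		WithQueues("emails", "reports")

	// pollLimit was not set, so it keeps its default.
	fmt.Println(c.channelSize, c.pollInterval, c.pollLimit, c.queues)
}
```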
func (*Consumer) Channel ¶
func (c *Consumer) Channel() <-chan *func()
Channel returns a read-only channel where the polled jobs can be read from
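Because the channel carries *func() values, a reader has to dereference each job before calling it. The sketch below shows that loop in isolation. The produce function is a hypothetical stand-in for the consumer's polling side, and closing the channel to end the loop is an assumption of this example, not documented library behaviour.

```go
package main

import "fmt"

// drain reads jobs from a receive-only channel until it is closed, runs each
// one, and returns how many jobs were executed.
func drain(jobs <-chan *func()) int {
	executed := 0
	for job := range jobs {
		if job == nil {
			continue // skip nil pointers defensively
		}
		(*job)() // dereference the pointer, then call the function
		executed++
	}
	return executed
}

// produce is a hypothetical stand-in for the polling side: it fills a
// buffered channel with n jobs that record their index, then closes it.
func produce(n int, results *[]int) <-chan *func() {
	ch := make(chan *func(), n)
	for i := 0; i < n; i++ {
		i := i
		job := func() { *results = append(*results, i) }
		ch <- &job
	}
	close(ch)
	return ch
}

func main() {
	var results []int
	n := drain(produce(3, &results))
	fmt.Println(n, results) // 3 [0 1 2]
}
```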
func (*Consumer) Forget ¶
Forget removes the handler function for the specified taskType from the map of learned handler functions. If no handler exists for the specified taskType, it returns an error.
func (*Consumer) Learn ¶
Learn sets a handler function for the specified taskType. If override is false and a handler function is already set for the specified taskType, it returns an error.
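The Learn and Forget semantics described above amount to guarded writes on a map keyed by task type. A minimal sketch, assuming a hypothetical registry type and a hypothetical handler signature (the library's handlerFunc type is unexported and its signature is not shown here):

```go
package main

import "fmt"

// handler is a hypothetical handler signature used only for this sketch.
type handler func() error

// registry is a hypothetical stand-in for the consumer's handler map.
type registry struct {
	handlers map[string]handler
}

func newRegistry() *registry {
	return &registry{handlers: make(map[string]handler)}
}

// Learn stores f for taskType. It refuses to replace an existing handler
// unless override is true.
func (r *registry) Learn(taskType string, f handler, override bool) error {
	if _, exists := r.handlers[taskType]; exists && !override {
		return fmt.Errorf("handler for task type %q already learned", taskType)
	}
	r.handlers[taskType] = f
	return nil
}

// Forget removes the handler for taskType, or returns an error if none is set.
func (r *registry) Forget(taskType string) error {
	if _, exists := r.handlers[taskType]; !exists {
		return fmt.Errorf("no handler learned for task type %q", taskType)
	}
	delete(r.handlers, taskType)
	return nil
}

func main() {
	r := newRegistry()
	noop := func() error { return nil }

	fmt.Println(r.Learn("email", noop, false)) // <nil>
	fmt.Println(r.Learn("email", noop, false)) // error: already learned
	fmt.Println(r.Learn("email", noop, true))  // <nil>, override allowed
	fmt.Println(r.Forget("email"))             // <nil>
	fmt.Println(r.Forget("email"))             // error: nothing to forget
}
```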
func (*Consumer) Start ¶
Start launches the goroutine which manages the pinging and polling of tasks for the consumer, or returns an error if the consumer is not properly configured
func (*Consumer) Stop ¶
Stop sends the termination signal to the consumer so it will no longer poll for new tasks
func (*Consumer) WithAutoDeleteOnSuccess ¶
WithAutoDeleteOnSuccess sets whether successful tasks should be automatically deleted from the task queue by the consumer
default value: false
func (*Consumer) WithChannelSize ¶
WithChannelSize sets the size of the buffered channel to which the polled messages are output
default value: 10
func (*Consumer) WithLogger ¶
WithLogger sets the Logger interface that is used for event logging during task consumption
default value: NoopLogger
func (*Consumer) WithMaxActiveTasks ¶
WithMaxActiveTasks sets the maximum number of tasks a consumer can have enqueued at the same time before polling for additional ones
default value: 10
func (*Consumer) WithPollInterval ¶
WithPollInterval sets the interval at which the consumer will try to poll for new tasks to be executed. It must be less than the visibility timeout.
default value: 5 seconds
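The constraint between the poll interval and the visibility timeout can be expressed as a simple check. This is a sketch of the documented rule only; validateTimings is a hypothetical helper, and where or how the library enforces the rule is not stated here.

```go
package main

import (
	"fmt"
	"time"
)

// validateTimings is a hypothetical check for the documented rule: the poll
// interval must be strictly less than the visibility timeout.
func validateTimings(pollInterval, visibilityTimeout time.Duration) error {
	if pollInterval >= visibilityTimeout {
		return fmt.Errorf("poll interval %v must be less than visibility timeout %v",
			pollInterval, visibilityTimeout)
	}
	return nil
}

func main() {
	// Documented defaults: 5 seconds and 15 seconds.
	fmt.Println(validateTimings(5*time.Second, 15*time.Second)) // <nil>

	// Equal values violate the rule.
	fmt.Println(validateTimings(15*time.Second, 15*time.Second))
}
```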
func (*Consumer) WithPollLimit ¶
WithPollLimit sets the maximum number of messages polled from the task queue
default value: 10
func (*Consumer) WithPollStrategy ¶
func (c *Consumer) WithPollStrategy(pollStrategy PollStrategy) *Consumer
WithPollStrategy sets the ordering to be used when polling for tasks from the task queue
default value: PollStrategyByCreatedAt
func (*Consumer) WithQueues ¶
WithQueues sets the queues from which the consumer may poll for tasks
default value: empty slice of strings
func (*Consumer) WithVisibilityTimeout ¶
WithVisibilityTimeout sets the duration by which each ping will extend a task's visibility timeout. Once this timeout is up, a consumer instance may receive the task again.
default value: 15 seconds
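The interaction between pings and the visibility timeout can be modelled with a small lease type. This is a simplified model, not the library's code: the lease type and its methods are hypothetical, and the sketch assumes that a ping resets the deadline to the ping time plus the timeout and that a ping arriving after expiry has no effect.

```go
package main

import (
	"fmt"
	"time"
)

// lease is a hypothetical model of a claimed task's visibility deadline.
type lease struct {
	timeout   time.Duration
	visibleAt time.Time
}

// claim hides the task until now + timeout.
func claim(now time.Time, timeout time.Duration) *lease {
	return &lease{timeout: timeout, visibleAt: now.Add(timeout)}
}

// visible reports whether the task may be received again at the given time.
func (l *lease) visible(now time.Time) bool {
	return !now.Before(l.visibleAt)
}

// ping extends the deadline to now + timeout, unless the lease has already
// expired (assumption of this sketch). It reports whether it took effect.
func (l *lease) ping(now time.Time) bool {
	if l.visible(now) {
		return false
	}
	l.visibleAt = now.Add(l.timeout)
	return true
}

// visibleAgain claims a task at offset 0, applies pings at the given
// ascending offsets, and reports whether the task is visible at checkAt.
func visibleAgain(timeout, checkAt time.Duration, pings ...time.Duration) bool {
	start := time.Unix(0, 0)
	l := claim(start, timeout)
	for _, p := range pings {
		if p <= checkAt {
			l.ping(start.Add(p))
		}
	}
	return l.visible(start.Add(checkAt))
}

func main() {
	timeout := 15 * time.Second // documented default

	// No pings: the task is visible again after 20 seconds.
	fmt.Println(visibleAgain(timeout, 20*time.Second)) // true

	// A ping at 10 seconds moves the deadline to 25 seconds.
	fmt.Println(visibleAgain(timeout, 20*time.Second, 10*time.Second)) // false
}
```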
type PollStrategy ¶
type PollStrategy string
PollStrategy is the label assigned to the ordering by which tasks are polled for consumption
const (
PollStrategyByCreatedAt PollStrategy = "pollByCreatedAt" // Poll by oldest tasks first
PollStrategyByPriority PollStrategy = "pollByPriority" // Poll by highest priority task first
)
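The two strategies differ only in the ordering applied to the tasks being polled. The sketch below reproduces the two constants locally and sorts an in-memory slice to show the effect. The task struct and the order function are hypothetical, createdAt is a plain sequence number, and treating a larger priority value as higher priority is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// PollStrategy and its values are reproduced locally so the sketch is
// self-contained.
type PollStrategy string

const (
	PollStrategyByCreatedAt PollStrategy = "pollByCreatedAt"
	PollStrategyByPriority  PollStrategy = "pollByPriority"
)

// task is a hypothetical, simplified task record.
type task struct {
	id        string
	createdAt int // sequence number: smaller means older
	priority  int // assumption: larger means higher priority
}

// order returns the task IDs in the order the given strategy would poll them.
func order(tasks []task, strategy PollStrategy) []string {
	sorted := append([]task(nil), tasks...) // copy to leave the input untouched
	sort.SliceStable(sorted, func(i, j int) bool {
		if strategy == PollStrategyByPriority {
			return sorted[i].priority > sorted[j].priority
		}
		return sorted[i].createdAt < sorted[j].createdAt
	})

	ids := make([]string, len(sorted))
	for i, t := range sorted {
		ids[i] = t.id
	}
	return ids
}

func main() {
	tasks := []task{
		{id: "a", createdAt: 1, priority: 0},
		{id: "b", createdAt: 2, priority: 5},
		{id: "c", createdAt: 3, priority: 1},
	}

	fmt.Println(order(tasks, PollStrategyByCreatedAt)) // [a b c]
	fmt.Println(order(tasks, PollStrategyByPriority))  // [b c a]
}
```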