taskmanager

package
v0.4.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2023 License: Apache-2.0 Imports: 12 Imported by: 3

Documentation

Overview

Package taskmanager is the task/job management service layer for concurrent and asynchronous tasks

Index

Constants

This section is empty.

Variables

View Source
var ErrEngineNotSupported = errors.New("engine not supported")

ErrEngineNotSupported is when a feature is not supported by another engine

View Source
var ErrInvalidTaskDuration = errors.New("invalid duration for task")

ErrInvalidTaskDuration is when the task duration is invalid

View Source
var ErrMissingFactory = errors.New("missing factory type to load taskq")

ErrMissingFactory is when the factory type is missing or empty

View Source
var ErrMissingRedis = errors.New("missing redis connection")

ErrMissingRedis is when the Redis connection is missing prior to loading taskq

View Source
var ErrMissingTaskName = errors.New("missing task name")

ErrMissingTaskName is when the task name is missing

View Source
var ErrMissingTaskQConfig = errors.New("missing taskq configuration")

ErrMissingTaskQConfig is when the taskq configuration is missing prior to loading taskq

View Source
var ErrNoEngine = errors.New("task manager engine is empty: choose taskq or machinery (IE: WithTaskQ())")

ErrNoEngine is returned when there is no engine set (missing engine)

View Source
var ErrNoTasksFound = errors.New("no tasks found")

ErrNoTasksFound is when there are no tasks found in the taskmanager

View Source
var ErrTaskNotFound = errors.New("task not found")

ErrTaskNotFound is when a task was not found

Functions

func DefaultTaskQConfig

func DefaultTaskQConfig(name string) *taskq.QueueOptions

DefaultTaskQConfig will return a default configuration that can be modified

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is the taskmanager client (configuration)

func (*Client) Close

func (c *Client) Close(ctx context.Context) error

Close will close client and any open connections

func (*Client) Debug

func (c *Client) Debug(on bool)

Debug will set the debug flag

func (*Client) DebugLog

func (c *Client) DebugLog(text string)

DebugLog will display verbose logs

func (*Client) Engine

func (c *Client) Engine() Engine

Engine will return the engine that is set

func (*Client) Factory

func (c *Client) Factory() Factory

Factory will return the factory that is set

func (*Client) GetTxnCtx

func (c *Client) GetTxnCtx(ctx context.Context) context.Context

GetTxnCtx will check for an existing transaction

func (*Client) IsDebug

func (c *Client) IsDebug() bool

IsDebug will return if debugging is enabled

func (*Client) IsNewRelicEnabled

func (c *Client) IsNewRelicEnabled() bool

IsNewRelicEnabled will return if new relic is enabled

func (*Client) RegisterTask

func (c *Client) RegisterTask(task *Task) error

RegisterTask is a universal method to register a task

func (*Client) ResetCron

func (c *Client) ResetCron()

ResetCron will reset the cron scheduler and all loaded tasks

func (*Client) RunTask

func (c *Client) RunTask(ctx context.Context, options *TaskOptions) error

RunTask is a universal method to run a task

func (*Client) Tasks

func (c *Client) Tasks() map[string]*taskq.Task

Tasks will return the list of tasks

type ClientInterface

type ClientInterface interface {
	TaskService
	Close(ctx context.Context) error
	Debug(on bool)
	Engine() Engine
	Factory() Factory
	GetTxnCtx(ctx context.Context) context.Context
	IsDebug() bool
	IsNewRelicEnabled() bool
}

ClientInterface is the taskmanager client interface

func NewClient

func NewClient(_ context.Context, opts ...ClientOps) (ClientInterface, error)

NewClient creates a new client for all TaskManager functionality

If no options are given, it will use the defaultClientOptions() ctx may contain a NewRelic txn (or one will be created)

type ClientOps

type ClientOps func(c *clientOptions)

ClientOps allow functional options to be supplied that overwrite default client options.

func WithCronService added in v0.2.14

func WithCronService(cronService CronService) ClientOps

WithCronService will set the cron service

func WithDebugging

func WithDebugging() ClientOps

WithDebugging will enable debugging mode

func WithLogger

func WithLogger(customLogger zLogger.GormLoggerInterface) ClientOps

WithLogger will set the custom logger interface

func WithNewRelic

func WithNewRelic() ClientOps

WithNewRelic will enable the NewRelic wrapper

func WithTaskQ

func WithTaskQ(config *taskq.QueueOptions, factory Factory) ClientOps

WithTaskQ will use the TaskQ engine

type CronService added in v0.2.14

type CronService interface {
	AddFunc(spec string, cmd func()) (int, error)
	New()
	Start()
	Stop()
}

CronService is the cron service provider

type Engine

type Engine string

Engine is the different types of task manager's that are supported

const (
	Empty     Engine = "empty"
	Machinery Engine = "machinery"
	TaskQ     Engine = "taskq"
)

Supported engines

func (Engine) IsEmpty

func (e Engine) IsEmpty() bool

IsEmpty will return true if the engine is not set

func (Engine) String

func (e Engine) String() string

String is the string version of engine

type Factory

type Factory string

Factory is the different types of task factories that are supported

const (
	FactoryEmpty  Factory = "empty"
	FactoryMemory Factory = "memory"
	FactoryRedis  Factory = "redis"
)

Supported factories

func (Factory) IsEmpty

func (f Factory) IsEmpty() bool

IsEmpty will return true if the factory is not set

func (Factory) String

func (f Factory) String() string

String is the string version of factory

type Task

type Task struct {
	Name string // Task name.

	// Function called to process a message.
	// There are three permitted types of signature:
	// 1. A zero-argument function
	// 2. A function whose arguments are assignable in type from those which are passed in the message
	// 3. A function which takes a single `*Message` argument
	// The handler function may also optionally take a Context as a first argument and may optionally return an error.
	// If the handler takes a Context, when it is invoked it will be passed the same Context as that which was passed to
	// `StartConsumer`. If the handler returns a non-nil error the message processing will fail and will be retried/.
	Handler interface{}
	// Function called to process failed message after the specified number of retries have all failed.
	// The FallbackHandler accepts the same types of function as the Handler.
	FallbackHandler interface{}

	// Optional function used by Consumer with defer statement to recover from panics.
	DeferFunc func()

	// Number of tries/releases after which the message fails permanently and is deleted. Default is 64 retries.
	RetryLimit int

	// Minimum backoff time between retries. Default is 30 seconds.
	MinBackoff time.Duration

	// Maximum backoff time between retries. Default is 30 minutes.
	MaxBackoff time.Duration
}

Task is the options for a new task (mimics TaskQ)

type TaskOptions

type TaskOptions struct {
	Arguments      []interface{} `json:"arguments"`        // Arguments for the task
	Delay          time.Duration `json:"delay"`            // Run after X delay
	OnceInPeriod   time.Duration `json:"once_in_period"`   // Run once in X period
	RunEveryPeriod time.Duration `json:"run_every_period"` // Cron job!
	TaskName       string        `json:"task_name"`        // Name of the task
}

TaskOptions are used for running a task

type TaskService

type TaskService interface {
	RegisterTask(task *Task) error
	ResetCron()
	RunTask(ctx context.Context, options *TaskOptions) error
	Tasks() map[string]*taskq.Task
}

TaskService is the task related methods

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL