gta

Version: v0.4.1 | Published: May 19, 2021 | License: MIT | Imports: 18 | Imported by: 0

README

GTA - Go Task Async

A lightweight and reliable asynchronous task and transaction message library for Go.

Overview

GTA (Go Task Async) is a lightweight and reliable asynchronous task and transaction message library for Go. The framework has the following characteristics:

  • High reliability: asynchronous tasks are scheduled and executed at least once, and the status of every submitted task can be traced
  • Flexible configuration: a set of simple optional configuration items lets the framework fit different situations
  • Multiple tasks per transaction: several tasks can be submitted in the same transaction (execution in the order of submission is not guaranteed)
  • Nested tasks: new asynchronous tasks can be submitted from inside an already submitted task (these tasks are executed in the order of submission)
  • Multiple scheduling methods: low-latency scheduling similar to a 'Commit Hook' mechanism, and preemptive scheduling based on a scan mechanism. The former gives priority to the current instance, while with the latter the instance that wins the competition among all instances schedules the task
  • Built-in tasks: several built-in tasks run on the framework itself, for example abnormal task monitoring and historical task cleaning
  • Graceful stop: a graceful stop mechanism tries to keep running tasks from being killed abruptly when the instance exits
  • Pooling: asynchronous tasks run in a goroutine pool whose size can be configured
  • Lightweight: the only external dependencies are GORM and a relational database

Users can submit, schedule, execute and monitor asynchronous tasks through this framework. It relies on a relational database to ensure the reliability and traceability of asynchronous tasks. It can be used wherever a task must eventually succeed (the framework makes a best effort to ensure success, unless the task itself or an external resource is abnormal).

In addition, the framework allows asynchronous tasks to be submitted inside a transaction, which ties the task to the outcome of that transaction. If the transaction fails and is rolled back, the asynchronous task will not be executed. If the transaction is committed successfully, the asynchronous task will be executed. The framework is therefore also an implementation of transaction messages.
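
The commit-or-rollback behavior described above can be pictured with a small in-memory model. This is only a conceptual sketch, not how GTA is implemented: the tx type, the transaction function and errRollback are hypothetical names, and a real setup would persist the tasks in the task table inside the database transaction.

```go
package main

import (
	"errors"
	"fmt"
)

// errRollback is a hypothetical error used to force a rollback in the demo.
var errRollback = errors.New("rollback")

// tx is a stand-in for a database transaction that buffers submitted tasks.
type tx struct {
	pending []string
}

// submit records a task as part of the transaction.
func (t *tx) submit(task string) {
	t.pending = append(t.pending, task)
}

// transaction runs fn. Buffered tasks are handed to dispatch only when fn
// returns nil (commit); on error (rollback) they are dropped.
func transaction(fn func(t *tx) error, dispatch func(task string)) error {
	t := &tx{}
	if err := fn(t); err != nil {
		return err
	}
	for _, task := range t.pending {
		dispatch(task)
	}
	return nil
}

func main() {
	dispatch := func(task string) { fmt.Println("scheduled:", task) }

	// Committed transaction: the task is scheduled.
	_ = transaction(func(t *tx) error {
		t.submit("send_email")
		return nil
	}, dispatch)

	// Rolled-back transaction: the task never runs.
	err := transaction(func(t *tx) error {
		t.submit("send_sms")
		return errRollback
	}, dispatch)
	fmt.Println("second transaction:", err)
}
```

In this model a task can never run for a transaction that did not commit, which is the property that makes the library usable as a transaction message mechanism.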

Install

go get -u github.com/ycydsxy/gta

Getting Started

package main

import (
	"context"
	"time"

	"github.com/sirupsen/logrus"
	"github.com/ycydsxy/gta"
	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
	"gorm.io/gorm/logger"
)

func main() {
	// The database and the task table (see model.sql for the table schema) must be prepared first.
	// SQLite and AutoMigrate are used here for testing only; do not use this setup in production.
	db, err := gorm.Open(sqlite.Open("test.db"), &gorm.Config{Logger: logger.Default.LogMode(logger.Silent)})
	if err != nil {
		panic(err)
	}
	if err = db.AutoMigrate(&gta.Task{}); err != nil {
		panic(err)
	}

	// start gta
	gta.StartWithOptions(db, "tasks")
	defer gta.Stop(true)

	// register a certain async task
	gta.Register("foo_task", gta.TaskDefinition{
		Handler: func(ctx context.Context, arg interface{}) (err error) {
			time.Sleep(time.Second)
			logrus.Warn("task done")
			return nil
		},
	})

	// run simple async task
	if err := gta.Run(context.TODO(), "foo_task", nil); err != nil {
		logrus.Errorf("error in async task, err: %v", err)
	}

	// run async task in transaction
	if err := gta.Transaction(func(tx *gorm.DB) error {
		if err := gta.RunWithTx(tx, context.TODO(), "foo_task", nil); err != nil {
			return err
		}
		return nil
	}); err != nil {
		logrus.Errorf("error in transaction with async task, err: %v", err)
	}
}

Configuration

Global optional configuration

When calling StartWithOptions or NewTaskManager, one or more optional configurations can be passed; they are applied in the order in which they are passed. A configuration named XXX is set with WithXXX. For example, the following code sets the pool size to 10 and the dry run flag to true:

gta.StartWithOptions(db, "tasks", gta.WithPoolSize(10), gta.WithDryRun(true))

All optional configurations and default values are as follows:

| Name | Type | Default value | Meaning |
|---|---|---|---|
| Context | context.Context | context.Background() | root context, used by the framework itself |
| LoggerFactory | func(ctx context.Context) Logger | defaultLoggerFactory | factory method that creates the logger used for log output |
| StorageTimeout | time.Duration | 1 week | how long a completed task is kept before it is cleaned up |
| InitializedTimeout | time.Duration | 5 minutes | how long a task may stay initialized before it is considered abnormal |
| RunningTimeout | time.Duration | 30 minutes | how long a task may stay running before it is considered abnormal |
| WaitTimeout | time.Duration | no limit (waits indefinitely) | the longest time the Stop function may take while tasks are running |
| ScanInterval | time.Duration | 5 seconds | the interval for scanning initialized tasks under normal circumstances |
| InstantScanInterval | time.Duration | 100 ms | the scan interval when there are unprocessed initialized tasks |
| CtxMarshaler | CtxMarshaler | defaultCtxMarshaler | how the context is serialized |
| CheckCallback | func(logger Logger, abnormalTasks []Task) | defaultCheckCallback | how detected abnormal tasks are handled |
| DryRun | bool | false | test flag; when true the framework runs without relying on the database |
| PoolSize | int | math.MaxInt32 | how many goroutines can be used to run tasks |
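
The WithXXX convention follows the functional options pattern that is common in Go. The sketch below shows the general idea of that pattern only. The options struct and newOptions are hypothetical, and GTA's real Option is an interface with unexported methods rather than a plain function type; the defaults are copied from the list above.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// options is a hypothetical stand-in for the manager configuration.
type options struct {
	poolSize     int
	dryRun       bool
	scanInterval time.Duration
}

// Option mutates the configuration. A function type is used for brevity.
type Option func(*options)

// WithPoolSize sets the pool size.
func WithPoolSize(size int) Option { return func(o *options) { o.poolSize = size } }

// WithDryRun sets the dry run flag.
func WithDryRun(flag bool) Option { return func(o *options) { o.dryRun = flag } }

// WithScanInterval sets the normal scan interval.
func WithScanInterval(d time.Duration) Option { return func(o *options) { o.scanInterval = d } }

// newOptions starts from the documented defaults and applies the given
// options in order, so a later option overrides an earlier one.
func newOptions(opts ...Option) options {
	o := options{poolSize: math.MaxInt32, dryRun: false, scanInterval: 5 * time.Second}
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	o := newOptions(WithPoolSize(10), WithDryRun(true))
	fmt.Printf("pool=%d dryRun=%v scan=%s\n", o.poolSize, o.dryRun, o.scanInterval)
}
```

Options that are not passed keep their defaults, which is why only the settings that differ from the defaults need to appear in the call.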

Single task definition

When calling Register for task registration, you need to pass in the corresponding task definition, as follows:

| Name | Type | Default value | Meaning |
|---|---|---|---|
| Handler | func(ctx context.Context, arg interface{}) (err error) | none | required; the task handler |
| ArgType | reflect.Type | nil | the actual type of arg in the task handler. If it is nil, the type of arg is map[string]interface{} |
| CtxMarshaler | CtxMarshaler | global CtxMarshaler | how the context.Context of the task is serialized |
| RetryTimes | int | 0 | the maximum number of retries when a task fails. Tasks exceeding this value are marked as failed |
| RetryInterval | func(times int) time.Duration | 1 second | the interval between two retries after a task execution error |
| CleanSucceeded | bool | false | whether the task record is removed immediately after the task succeeds |
| InitTimeoutSensitive | bool | false | whether the task is sensitive to InitializedTimeout. If so, it can no longer be scanned and scheduled after InitializedTimeout |
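
Because RetryInterval is a function of the retry count, it can implement a backoff policy instead of a fixed delay. The following is a minimal sketch of a capped exponential backoff with the same signature. The exponentialBackoff helper is hypothetical, and whether the library starts counting times at 0 or 1 is not stated here, so the function treats 0 as the first attempt.

```go
package main

import (
	"fmt"
	"time"
)

// exponentialBackoff returns a function with the RetryInterval signature.
// The delay starts at base, doubles with every retry and never exceeds limit.
func exponentialBackoff(base, limit time.Duration) func(times int) time.Duration {
	return func(times int) time.Duration {
		d := base
		for i := 0; i < times; i++ {
			d *= 2
			if d >= limit {
				return limit // stop early so the value cannot overflow
			}
		}
		if d > limit {
			return limit
		}
		return d
	}
}

// retryInterval could be assigned to the RetryInterval field of a task definition.
var retryInterval = exponentialBackoff(time.Second, 30*time.Second)

func main() {
	for times := 0; times <= 6; times++ {
		fmt.Println(times, retryInterval(times))
	}
}
```

Capping the delay keeps a task with a large RetryTimes value from waiting arbitrarily long between attempts.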

Frequently asked questions

What is an abnormal task? How to detect abnormal tasks?

Abnormal tasks include tasks that have not been scheduled for a long time, tasks that have timed out, and tasks that were aborted by a non-graceful shutdown.

Abnormal tasks are detected by a built-in task that runs periodically and calls the configured CheckCallback. By default, the number of abnormal tasks and their IDs are written to the log.

Will the pool block when it is full?

In the current design, submitting a task never blocks. When the pool of an instance is full, the submitted tasks are left to other instances for execution and the scan mechanism of that instance is suspended. When the pools of all instances are full, the tasks accumulate in the database.

Is there a delay in scheduling asynchronous tasks?

If scheduling is based on the Commit Hook mechanism, there is almost no delay. This is the case when Run is called while the goroutine pool has free capacity, or when RunWithTx is called inside the built-in Transaction.

If the pool is full, or RunWithTx is called in a transaction other than the built-in Transaction, the asynchronous task is scheduled by the scan mechanism and scheduling is delayed. The delay is the time all instances need to compete for and schedule the task, and it depends on the scan interval, the idle time of the pool and the backlog of tasks.

What is the task consumption capacity of the scan mechanism?

The maximum consumption capacity is N * (1 / InstantScanInterval) tasks per second, where N is the number of instances and InstantScanInterval is the fast scan interval. With the default interval of 100 ms, a single instance consumes at most 10 tasks per second. The scheduling capacity of the scan mechanism is limited, and it is meant as an auxiliary scheduling method. Reducing InstantScanInterval improves the consumption capacity but also increases the load on the database. Therefore, under normal circumstances, the Commit Hook mechanism should be preferred.
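
The capacity formula can be checked with a few lines of arithmetic. The helper name maxScanThroughput is hypothetical; it only evaluates N * (1 / InstantScanInterval) and says nothing about the throughput an actual deployment reaches.

```go
package main

import (
	"fmt"
	"time"
)

// maxScanThroughput returns the upper bound, in tasks per second, of what the
// scan mechanism can consume: instances * (1 / instantScanInterval).
func maxScanThroughput(instances int, instantScanInterval time.Duration) float64 {
	if instances <= 0 || instantScanInterval <= 0 {
		return 0
	}
	return float64(instances) * float64(time.Second) / float64(instantScanInterval)
}

func main() {
	// One instance with the default 100 ms interval.
	fmt.Println(maxScanThroughput(1, 100*time.Millisecond))
	// Three instances with the same interval.
	fmt.Println(maxScanThroughput(3, 100*time.Millisecond))
	// Halving the interval doubles the bound, at the cost of more database queries.
	fmt.Println(maxScanThroughput(3, 50*time.Millisecond))
}
```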

How to handle abnormal and failed tasks?

Under normal circumstances, abnormal and failed tasks are rare. If a task becomes abnormal or fails because of external factors (such as an unavailable external resource or an unexpected crash), it can be rescheduled manually with the APIs provided by TaskManager, such as QueryUnsuccessfulTasks and ForceRerunTasks.

How to test?

You can use WithDryRun(true) to put the framework into dry run mode, which keeps tests from affecting the task tables used by other instances. In this mode, the framework neither reads nor writes the task table, and it does not record task status or other information.

License

MIT

Documentation

Index

Constants

This section is empty.

Variables

var (
	// ErrZeroRowsAffected represents zero rows affected in a database operation.
	ErrZeroRowsAffected = errors.New("zero rows affected")
	// ErrUnexpected represents unexpected error occurred.
	ErrUnexpected = errors.New("unexpected")
	// ErrTaskNotFound represents certain task not found.
	ErrTaskNotFound = errors.New("task not found")

	// ErrOption represents option is invalid.
	ErrOption = errors.New("option invalid")

	// ErrDefNilHandler represents Handler in the task definition is nil.
	ErrDefNilHandler = errors.New("definition handler is nil")
	// ErrDefEmptyPrimaryKey represents primaryKey in the task definition is empty.
	ErrDefEmptyPrimaryKey = errors.New("definition primary key is empty")
	// ErrDefInvalidLoopInterval represents loopInterval in the task definition is invalid.
	ErrDefInvalidLoopInterval = errors.New("definition loop interval is invalid")
	// ErrDefInvalidArgument represents argument in the task definition is invalid.
	ErrDefInvalidArgument = errors.New("definition argument is invalid")
)

Functions

func Register

func Register(key TaskKey, definition TaskDefinition)

Register binds a task definition to a certain task key.

func Run

func Run(ctx context.Context, key TaskKey, arg interface{}) error

Run provides the ability to asynchronously run a registered task reliably.

func RunWithTx

func RunWithTx(tx *gorm.DB, ctx context.Context, key TaskKey, arg interface{}) error

RunWithTx makes it possible to create a task along with other database operations in the same transaction.

func StartWithOptions

func StartWithOptions(db *gorm.DB, table string, options ...Option)

StartWithOptions configures the default task manager and starts it. This function should be called before any other function is called.

func Stop

func Stop(wait bool)

Stop provides the ability to gracefully stop currently running tasks.

func Transaction

func Transaction(fc func(tx *gorm.DB) error) (err error)

Transaction wraps the 'Transaction' function of *gorm.DB

Types

type CtxMarshaler

type CtxMarshaler interface {
	MarshalCtx(ctx context.Context) ([]byte, error)
	UnmarshalCtx(bytes []byte) (context.Context, error)
}

CtxMarshaler is used to marshal or unmarshal context.
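
As an illustration, a custom marshaler could persist only the values a task needs after it has been stored and reloaded. The sketch below has the two methods of the interface and carries a single request ID. The requestIDMarshaler type, the context key and the JSON layout are all assumptions made for this example and are not part of the package.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// requestIDKey is a private context key used only by this example.
type requestIDKey struct{}

// ctxPayload is the serialized form of the context.
type ctxPayload struct {
	RequestID string `json:"request_id"`
}

// requestIDMarshaler stores and restores only the request ID of a context.
type requestIDMarshaler struct{}

// requestIDFrom reads the request ID, or "" if none is set.
func requestIDFrom(ctx context.Context) string {
	id, _ := ctx.Value(requestIDKey{}).(string)
	return id
}

// MarshalCtx serializes the request ID as JSON.
func (requestIDMarshaler) MarshalCtx(ctx context.Context) ([]byte, error) {
	return json.Marshal(ctxPayload{RequestID: requestIDFrom(ctx)})
}

// UnmarshalCtx rebuilds a fresh context that carries the stored request ID.
func (requestIDMarshaler) UnmarshalCtx(bytes []byte) (context.Context, error) {
	var p ctxPayload
	if err := json.Unmarshal(bytes, &p); err != nil {
		return nil, err
	}
	return context.WithValue(context.Background(), requestIDKey{}, p.RequestID), nil
}

func main() {
	m := requestIDMarshaler{}
	ctx := context.WithValue(context.Background(), requestIDKey{}, "req-42")

	raw, err := m.MarshalCtx(ctx)
	if err != nil {
		panic(err)
	}
	restored, err := m.UnmarshalCtx(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw), requestIDFrom(restored))
}
```

Deadlines and cancellation are deliberately not restored here, since a task may run long after the original request has finished.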

type Logger

type Logger interface {
	Printf(format string, args ...interface{})
	Infof(format string, args ...interface{})
	Warnf(format string, args ...interface{})
	Errorf(format string, args ...interface{})
}

Logger is a logging interface for logging necessary messages.
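
Any type with these four methods can be returned from a logger factory. The following sketch adapts the standard library logger. The Logger interface is repeated locally so the example compiles without importing gta, and stdLogger, withLevel and loggerFactory are hypothetical names.

```go
package main

import (
	"context"
	"log"
	"os"
)

// Logger repeats the interface documented above so the sketch is self-contained.
type Logger interface {
	Printf(format string, args ...interface{})
	Infof(format string, args ...interface{})
	Warnf(format string, args ...interface{})
	Errorf(format string, args ...interface{})
}

// withLevel prefixes a format string with a severity tag.
func withLevel(level, format string) string {
	return "[" + level + "] " + format
}

// stdLogger adapts *log.Logger to the Logger interface.
type stdLogger struct {
	l *log.Logger
}

func (s stdLogger) Printf(format string, args ...interface{}) { s.l.Printf(format, args...) }
func (s stdLogger) Infof(format string, args ...interface{}) {
	s.l.Printf(withLevel("INFO", format), args...)
}
func (s stdLogger) Warnf(format string, args ...interface{}) {
	s.l.Printf(withLevel("WARN", format), args...)
}
func (s stdLogger) Errorf(format string, args ...interface{}) {
	s.l.Printf(withLevel("ERROR", format), args...)
}

// Compile-time check that the adapter satisfies the interface.
var _ Logger = stdLogger{}

// loggerFactory has the shape of the LoggerFactory option. The context is
// ignored here; a real factory might read a trace ID from it.
func loggerFactory(ctx context.Context) Logger {
	return stdLogger{l: log.New(os.Stderr, "gta ", log.LstdFlags)}
}

func main() {
	logger := loggerFactory(context.Background())
	logger.Warnf("abnormal tasks: %d", 2)
}
```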

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option is an interface.

func WithCheckCallback

func WithCheckCallback(f func(logger Logger, abnormalTasks []Task)) Option

WithCheckCallback sets the checkCallback option.

func WithContext

func WithContext(ctx context.Context) Option

WithContext sets the context option.

func WithCtxMarshaler

func WithCtxMarshaler(m CtxMarshaler) Option

WithCtxMarshaler sets the ctxMarshaler option.

func WithDryRun

func WithDryRun(flag bool) Option

WithDryRun sets the dryRun option.

func WithInitializedTimeout

func WithInitializedTimeout(d time.Duration) Option

WithInitializedTimeout sets the initializedTimeout option.

func WithInstantScanInterval

func WithInstantScanInterval(d time.Duration) Option

WithInstantScanInterval sets the instantScanInterval option.

func WithLoggerFactory

func WithLoggerFactory(f func(ctx context.Context) Logger) Option

WithLoggerFactory sets the loggerFactory option.

func WithPoolSize

func WithPoolSize(size int) Option

WithPoolSize sets the poolSize option.

func WithRunningTimeout

func WithRunningTimeout(d time.Duration) Option

WithRunningTimeout sets the runningTimeout option.

func WithScanInterval

func WithScanInterval(d time.Duration) Option

WithScanInterval sets the scanInterval option.

func WithStorageTimeout

func WithStorageTimeout(d time.Duration) Option

WithStorageTimeout sets the storageTimeout option.

func WithWaitTimeout

func WithWaitTimeout(d time.Duration) Option

WithWaitTimeout sets the waitTimeout option.

type Task

type Task struct {
	ID         uint64
	TaskKey    TaskKey
	TaskStatus TaskStatus
	Context    []byte
	Argument   []byte
	Extra      TaskExtra
	CreatedAt  time.Time
	UpdatedAt  time.Time
}

Task is an entity in database.

type TaskDefinition

type TaskDefinition struct {
	// must provide, task handler
	Handler TaskHandler

	// optional, task argument type in the handler
	ArgType reflect.Type
	// optional, to replace default config
	CtxMarshaler CtxMarshaler
	// optional, max retry times before fail
	RetryTimes int
	// optional, retry interval
	RetryInterval func(times int) time.Duration
	// optional, determine whether the task will be cleaned immediately once succeeded
	CleanSucceeded bool
	// optional, determine whether the initialized task can still be scheduled after timeout
	InitTimeoutSensitive bool
	// contains filtered or unexported fields
}

TaskDefinition is a definition of a certain task

type TaskExtra

type TaskExtra struct{}

TaskExtra contains other information of a task.

func (*TaskExtra) Scan

func (s *TaskExtra) Scan(v interface{}) error

Scan implements Scanner.

func (TaskExtra) Value

func (s TaskExtra) Value() (driver.Value, error)

Value implements Valuer.

type TaskHandler

type TaskHandler func(ctx context.Context, arg interface{}) (err error)

TaskHandler is a handler to a certain task

type TaskKey

type TaskKey string

TaskKey is a unique ID for a set of tasks with same definition.

type TaskManager

type TaskManager struct {
	// contains filtered or unexported fields
}

TaskManager is the overall processor of tasks, which includes a scheduler, a scanner and other components.

func DefaultManager

func DefaultManager() *TaskManager

DefaultManager returns the default task manager.

func NewTaskManager

func NewTaskManager(db *gorm.DB, table string, options ...Option) *TaskManager

NewTaskManager generates a new instance of TaskManager.

The database and task table must be provided because this tool relies heavily on the database. For more information about the table schema, please refer to 'model.sql'.

func (*TaskManager) ForceRerunTasks

func (s *TaskManager) ForceRerunTasks(taskIDs []uint64, status TaskStatus) (int64, error)

ForceRerunTasks changes specific tasks to 'initialized'.

func (*TaskManager) QueryUnsuccessfulTasks

func (s *TaskManager) QueryUnsuccessfulTasks(limit, offset int) ([]Task, error)

QueryUnsuccessfulTasks checks initialized, running or failed tasks.

func (*TaskManager) Register

func (s *TaskManager) Register(key TaskKey, definition TaskDefinition)

Register binds a task definition to a certain task key. Tasks of same type usually have the same task key.

Task key is a unique ID for a set of tasks with same definition. Task handler should be idempotent because a task may be scheduled more than once in some cases.

Handler must be provided in the task definition. It is better to provide the argument type as well, unless you want to use the default argument type (i.e. map[string]interface{} for a struct) inside the handler.
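
Since a task may be scheduled more than once, the handler has to tolerate duplicate deliveries. The sketch below shows one way to do that for a handler with the TaskHandler signature that receives the default map[string]interface{} argument. The ledger type, the order_id field and the in-memory map are assumptions made to keep the example self-contained; a real handler would more likely rely on a unique key or a status column in the database.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// ledger remembers which orders were already processed.
type ledger struct {
	mu      sync.Mutex
	done    map[string]bool
	applied int // how many times the side effect actually ran
}

func newLedger() *ledger {
	return &ledger{done: make(map[string]bool)}
}

// handle has the TaskHandler signature. Running it again with the same
// order_id is a no-op, which makes the handler idempotent.
func (l *ledger) handle(ctx context.Context, arg interface{}) error {
	fields, ok := arg.(map[string]interface{})
	if !ok {
		return fmt.Errorf("unexpected argument type %T", arg)
	}
	orderID, ok := fields["order_id"].(string)
	if !ok || orderID == "" {
		return errors.New("missing order_id")
	}

	l.mu.Lock()
	defer l.mu.Unlock()
	if l.done[orderID] {
		return nil // duplicate delivery, the work is already done
	}
	l.done[orderID] = true
	l.applied++ // the real side effect would happen here
	return nil
}

func main() {
	l := newLedger()
	arg := map[string]interface{}{"order_id": "o-1"}
	for i := 0; i < 3; i++ {
		if err := l.handle(context.Background(), arg); err != nil {
			fmt.Println("error:", err)
		}
	}
	fmt.Println("side effect applied", l.applied, "time(s)")
}
```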

func (*TaskManager) Run

func (s *TaskManager) Run(ctx context.Context, key TaskKey, arg interface{}) error

Run provides the ability to asynchronously run a registered task reliably. It is an alternative to 'go func() {}()' when you need to care about the ultimate success of a task.

An error is returned if creating the task fails; otherwise, the task is scheduled asynchronously later. If an error or panic occurs while the task is running, the task is rescheduled according to the 'RetryTimes' value. If the number of retries exceeds the configured maximum, the task is marked 'failed' in the database and error logs are recorded. In these cases, manual intervention may be necessary.

The context passed in should be consistent with the 'ctxMarshaler' value defined in the overall configuration or the task definition.

func (*TaskManager) RunWithTx

func (s *TaskManager) RunWithTx(tx *gorm.DB, ctx context.Context, key TaskKey, arg interface{}) error

RunWithTx makes it possible to create a task along with other database operations in the same transaction. The task will be scheduled if the transaction is committed successfully, or canceled if the transaction is rolled back. Thus, this is a simple implementation of BASE (basically available, soft state, eventually consistent) that can be used in distributed transaction situations.

The task will be scheduled immediately after the transaction is committed if you use the built-in 'Transaction' function below. Otherwise, it will be scheduled later by the scan process.

You can create more than one task in a single transaction, like this:

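_ = Transaction(func(tx *gorm.DB) error {
	// other database operations in the same transaction (doSomething is a placeholder)
	if err := doSomething(tx); err != nil {
		return err
	}

	// task1: ctx, the task keys and the arguments are placeholders
	if err := RunWithTx(tx, ctx, "task1", arg1); err != nil {
		return err
	}

	// task2
	if err := RunWithTx(tx, ctx, "task2", arg2); err != nil {
		return err
	}
	return nil
})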

func (*TaskManager) Start

func (s *TaskManager) Start()

Start starts the TaskManager. This function should be called before any other function of the TaskManager is called.

func (*TaskManager) Stop

func (s *TaskManager) Stop(wait bool)

Stop provides the ability to gracefully stop currently running tasks. If you cannot tolerate task failure or loss when a termination signal is received or the pod is migrated, it is better to call this function explicitly before the main process exits. Otherwise, these tasks are likely to be killed and will be reported by the abnormal task check later.

The wait parameter determines whether to wait for all running tasks to complete.

func (*TaskManager) Transaction

func (s *TaskManager) Transaction(fc func(tx *gorm.DB) error) (err error)

Transaction wraps the 'Transaction' function of *gorm.DB, providing the ability to schedule the tasks created inside once the transaction is committed successfully.

type TaskStatus

type TaskStatus string

TaskStatus represents the status of a task.

const (
	TaskStatusUnKnown     TaskStatus = ""
	TaskStatusInitialized TaskStatus = "initialized"
	TaskStatusRunning     TaskStatus = "running"
	TaskStatusSucceeded   TaskStatus = "succeeded"
	TaskStatusFailed      TaskStatus = "failed"
)

These are the constants for task status.
