tache

package module
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 26, 2024 License: MIT Imports: 13 Imported by: 3

README

tache

A task manager for golang

Documentation

Index

Constants

View Source
const (
	// StatePending is the state of a task when it is pending
	StatePending = iota
	// StateRunning is the state of a task when it is running
	StateRunning
	// StateSucceeded is the state of a task when it succeeded
	StateSucceeded
	// StateCanceling is the state of a task when it is canceling
	StateCanceling
	// StateCanceled is the state of a task when it is canceled
	StateCanceled
	// StateErrored is the state of a task when it is errored (it will be retried)
	StateErrored
	// StateFailing is the state of a task when it is failing (executed OnFailed hook)
	StateFailing
	// StateFailed is the state of a task when it failed (no retry times left)
	StateFailed
	// StateWaitingRetry is the state of a task when it is waiting for retry
	StateWaitingRetry
	// StateBeforeRetry is the state of a task when it is executing OnBeforeRetry hook
	StateBeforeRetry
)

Variables

This section is empty.

Functions

func IsRecoverable

func IsRecoverable(err error) bool

IsRecoverable checks if error is an instance of `unrecoverableError`

func NewErr

func NewErr(msg string) error

NewErr creates a new TacheError

func Unrecoverable

func Unrecoverable(err error) error

Unrecoverable wraps an error in `unrecoverableError` struct

Types

type Base

type Base struct {
	ID       string `json:"id"`
	State    State  `json:"state"`
	Retry    int    `json:"retry"`
	MaxRetry int    `json:"max_retry"`
	// contains filtered or unexported fields
}

Base is the base struct for all tasks to implement TaskBase interface

func (*Base) Cancel

func (b *Base) Cancel()

func (*Base) Ctx

func (b *Base) Ctx() context.Context

func (*Base) CtxDone

func (b *Base) CtxDone() <-chan struct{}

func (*Base) GetErr

func (b *Base) GetErr() error

func (*Base) GetID

func (b *Base) GetID() string

func (*Base) GetProgress

func (b *Base) GetProgress() float64

func (*Base) GetRetry

func (b *Base) GetRetry() (int, int)

func (*Base) GetState

func (b *Base) GetState() State

func (*Base) Persist

func (b *Base) Persist()

func (*Base) SetCancelFunc

func (b *Base) SetCancelFunc(cancelFunc context.CancelFunc)

func (*Base) SetCtx

func (b *Base) SetCtx(ctx context.Context)

func (*Base) SetErr

func (b *Base) SetErr(err error)

func (*Base) SetID

func (b *Base) SetID(id string)

func (*Base) SetPersist

func (b *Base) SetPersist(persist func())

func (*Base) SetProgress

func (b *Base) SetProgress(progress float64)

func (*Base) SetRetry

func (b *Base) SetRetry(retry int, maxRetry int)

func (*Base) SetState

func (b *Base) SetState(state State)

type Info

type Info interface {
	GetName() string
	GetStatus() string
}

type Manager

type Manager[T Task] struct {
	// contains filtered or unexported fields
}

Manager is the manager of all tasks

func NewManager

func NewManager[T Task](opts ...Option) *Manager[T]

NewManager create a new manager

func (*Manager[T]) Add

func (m *Manager[T]) Add(task T)

Add a task to manager

func (*Manager[T]) Cancel

func (m *Manager[T]) Cancel(id string)

Cancel a task by ID

func (*Manager[T]) CancelAll

func (m *Manager[T]) CancelAll()

CancelAll cancel all tasks

func (*Manager[T]) CancelByCondition added in v0.1.3

func (m *Manager[T]) CancelByCondition(condition func(task T) bool)

CancelByCondition cancel tasks under specific condition given by a function

func (*Manager[T]) GetAll

func (m *Manager[T]) GetAll() []T

GetAll get all tasks

func (*Manager[T]) GetByCondition added in v0.1.3

func (m *Manager[T]) GetByCondition(condition func(task T) bool) []T

GetByCondition get tasks under specific condition given by a function

func (*Manager[T]) GetByID

func (m *Manager[T]) GetByID(id string) (T, bool)

GetByID get task by ID

func (*Manager[T]) GetByState

func (m *Manager[T]) GetByState(state ...State) []T

GetByState get tasks by state

func (*Manager[T]) Pause

func (m *Manager[T]) Pause()

Pause manager

func (*Manager[T]) Remove

func (m *Manager[T]) Remove(id string)

Remove a task by ID

func (*Manager[T]) RemoveAll

func (m *Manager[T]) RemoveAll()

RemoveAll remove all tasks

func (*Manager[T]) RemoveByCondition added in v0.1.3

func (m *Manager[T]) RemoveByCondition(condition func(task T) bool)

RemoveByCondition remove tasks under specific condition given by a function

func (*Manager[T]) RemoveByState

func (m *Manager[T]) RemoveByState(state ...State)

RemoveByState remove tasks by state

func (*Manager[T]) Retry

func (m *Manager[T]) Retry(id string)

Retry a task by ID

func (*Manager[T]) RetryAllFailed

func (m *Manager[T]) RetryAllFailed()

RetryAllFailed retry all failed tasks

func (*Manager[T]) Start

func (m *Manager[T]) Start()

Start manager

func (*Manager[T]) Wait

func (m *Manager[T]) Wait()

Wait wait all tasks done, just for test

type OnBeforeRetry

type OnBeforeRetry interface {
	OnBeforeRetry()
}

OnBeforeRetry is the interface for tasks that need to be executed before retrying

type OnFailed

type OnFailed interface {
	OnFailed()
}

OnFailed is the interface for tasks that need to be executed when they fail

type OnSucceeded

type OnSucceeded interface {
	OnSucceeded()
}

OnSucceeded is the interface for tasks that need to be executed when they succeed

type Option

type Option func(*Options)

Option is the option for manager

func WithLogger

func WithLogger(logger *slog.Logger) Option

WithLogger set logger

func WithMaxRetry

func WithMaxRetry(maxRetry int) Option

WithMaxRetry set retry

func WithOptions

func WithOptions(opts Options) Option

WithOptions set options

func WithPersistDebounce

func WithPersistDebounce(debounce time.Duration) Option

WithPersistDebounce set persist debounce

func WithPersistFunction added in v0.1.2

func WithPersistFunction(r func() ([]byte, error), w func([]byte) error) Option

func WithPersistPath

func WithPersistPath(path string) Option

WithPersistPath set persist path

func WithRunning

func WithRunning(running bool) Option

WithRunning set running

func WithTimeout

func WithTimeout(timeout time.Duration) Option

WithTimeout set timeout

func WithWorks

func WithWorks(works int) Option

WithWorks set works

type Options

type Options struct {
	Works                int
	MaxRetry             int
	Timeout              *time.Duration
	PersistPath          string
	PersistDebounce      *time.Duration
	Running              bool
	Logger               *slog.Logger
	PersistReadFunction  func() ([]byte, error)
	PersistWriteFunction func([]byte) error
}

Options is the options for manager

func DefaultOptions

func DefaultOptions() *Options

DefaultOptions returns default options

type Persistable

type Persistable interface {
	Persistable() bool
}

Persistable judge whether the task is persistable

type Recoverable

type Recoverable interface {
	Recoverable() bool
}

Recoverable judge whether the task is recoverable

type Retryable

type Retryable interface {
	Retryable() bool
}

Retryable judge whether the task is retryable

type State

type State int

State is the state of a task

type TacheError

type TacheError struct {
	Msg string
}

TacheError is a custom error type

func (*TacheError) Error

func (e *TacheError) Error() string

type Task

type Task interface {
	TaskBase
	Run() error
}

Task is the interface for all tasks

type TaskBase

type TaskBase interface {
	// SetProgress sets the progress of the task
	SetProgress(progress float64)
	// GetProgress gets the progress of the task
	GetProgress() float64
	// SetState sets the state of the task
	SetState(state State)
	// GetState gets the state of the task
	GetState() State
	// GetID gets the ID of the task
	GetID() string
	// SetID sets the ID of the task
	SetID(id string)
	// SetErr sets the error of the task
	SetErr(err error)
	// GetErr gets the error of the task
	GetErr() error
	// SetCtx sets the context of the task
	SetCtx(ctx context.Context)
	// CtxDone gets the context done channel of the task
	CtxDone() <-chan struct{}
	// Cancel cancels the task
	Cancel()
	// Ctx gets the context of the task
	Ctx() context.Context
	// SetCancelFunc sets the cancel function of the task
	SetCancelFunc(cancelFunc context.CancelFunc)
	// GetRetry gets the retry of the task
	GetRetry() (int, int)
	// SetRetry sets the retry of the task
	SetRetry(retry int, maxRetry int)
	// Persist persists the task
	Persist()
	// SetPersist sets the persist function of the task
	SetPersist(persist func())
}

TaskBase is the base interface for all tasks

type TaskWithInfo

type TaskWithInfo interface {
	Task
	Info
}

type Worker

type Worker[T Task] struct {
	ID int
}

Worker is the worker to execute task

func (Worker[T]) Execute

func (w Worker[T]) Execute(task T)

Execute executes the task

type WorkerPool

type WorkerPool[T Task] struct {
	// contains filtered or unexported fields
}

WorkerPool is the pool of workers

func NewWorkerPool

func NewWorkerPool[T Task](size int) *WorkerPool[T]

NewWorkerPool creates a new worker pool

func (*WorkerPool[T]) Get

func (wp *WorkerPool[T]) Get() *Worker[T]

Get gets a worker from pool

func (*WorkerPool[T]) Put

func (wp *WorkerPool[T]) Put(worker *Worker[T])

Put puts a worker back to pool

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL