Documentation ¶
Index ¶
- Constants
- func IsRecoverable(err error) bool
- func NewErr(msg string) error
- func Unrecoverable(err error) error
- type Base
- func (b *Base) Cancel()
- func (b *Base) Ctx() context.Context
- func (b *Base) CtxDone() <-chan struct{}
- func (b *Base) GetErr() error
- func (b *Base) GetID() string
- func (b *Base) GetProgress() float64
- func (b *Base) GetRetry() (int, int)
- func (b *Base) GetState() State
- func (b *Base) Persist()
- func (b *Base) SetCancelFunc(cancelFunc context.CancelFunc)
- func (b *Base) SetCtx(ctx context.Context)
- func (b *Base) SetErr(err error)
- func (b *Base) SetID(id string)
- func (b *Base) SetPersist(persist func())
- func (b *Base) SetProgress(progress float64)
- func (b *Base) SetRetry(retry int, maxRetry int)
- func (b *Base) SetState(state State)
- type Info
- type Manager
- func (m *Manager[T]) Add(task T)
- func (m *Manager[T]) Cancel(id string)
- func (m *Manager[T]) CancelAll()
- func (m *Manager[T]) CancelByCondition(condition func(task T) bool)
- func (m *Manager[T]) GetAll() []T
- func (m *Manager[T]) GetByCondition(condition func(task T) bool) []T
- func (m *Manager[T]) GetByID(id string) (T, bool)
- func (m *Manager[T]) GetByState(state ...State) []T
- func (m *Manager[T]) Pause()
- func (m *Manager[T]) Remove(id string)
- func (m *Manager[T]) RemoveAll()
- func (m *Manager[T]) RemoveByCondition(condition func(task T) bool)
- func (m *Manager[T]) RemoveByState(state ...State)
- func (m *Manager[T]) Retry(id string)
- func (m *Manager[T]) RetryAllFailed()
- func (m *Manager[T]) Start()
- func (m *Manager[T]) Wait()
- type OnBeforeRetry
- type OnFailed
- type OnSucceeded
- type Option
- func WithLogger(logger *slog.Logger) Option
- func WithMaxRetry(maxRetry int) Option
- func WithOptions(opts Options) Option
- func WithPersistDebounce(debounce time.Duration) Option
- func WithPersistFunction(r func() ([]byte, error), w func([]byte) error) Option
- func WithPersistPath(path string) Option
- func WithRunning(running bool) Option
- func WithTimeout(timeout time.Duration) Option
- func WithWorks(works int) Option
- type Options
- type Persistable
- type Recoverable
- type Retryable
- type State
- type TacheError
- type Task
- type TaskBase
- type TaskWithInfo
- type Worker
- type WorkerPool
Constants ¶
const (
	// StatePending is the state of a task when it is pending
	StatePending = iota
	// StateRunning is the state of a task when it is running
	StateRunning
	// StateSucceeded is the state of a task when it succeeded
	StateSucceeded
	// StateCanceling is the state of a task when it is canceling
	StateCanceling
	// StateCanceled is the state of a task when it is canceled
	StateCanceled
	// StateErrored is the state of a task when it is errored (it will be retried)
	StateErrored
	// StateFailing is the state of a task when it is failing (executed OnFailed hook)
	StateFailing
	// StateFailed is the state of a task when it failed (no retry times left)
	StateFailed
	// StateWaitingRetry is the state of a task when it is waiting for retry
	StateWaitingRetry
	// StateBeforeRetry is the state of a task when it is executing OnBeforeRetry hook
	StateBeforeRetry
)
Variables ¶
This section is empty.
Functions ¶
func IsRecoverable ¶
IsRecoverable reports whether err is recoverable, that is, whether it is not an instance of `unrecoverableError`.
func Unrecoverable ¶
Unrecoverable wraps an error in an `unrecoverableError` struct.
Types ¶
type Base ¶
type Base struct {
	ID       string `json:"id"`
	State    State  `json:"state"`
	Retry    int    `json:"retry"`
	MaxRetry int    `json:"max_retry"`
	// contains filtered or unexported fields
}
Base is the base struct for all tasks to implement the TaskBase interface.
func (*Base) GetProgress ¶
func (*Base) SetCancelFunc ¶
func (b *Base) SetCancelFunc(cancelFunc context.CancelFunc)
func (*Base) SetPersist ¶
func (b *Base) SetPersist(persist func())
func (*Base) SetProgress ¶
type Manager ¶
type Manager[T Task] struct {
	// contains filtered or unexported fields
}
Manager is the manager of all tasks
func NewManager ¶
NewManager creates a new manager.
func (*Manager[T]) CancelByCondition ¶ added in v0.1.3
CancelByCondition cancels the tasks that satisfy the condition given by a function.
func (*Manager[T]) GetByCondition ¶ added in v0.1.3
GetByCondition gets the tasks that satisfy the condition given by a function.
func (*Manager[T]) GetByState ¶
GetByState gets tasks by the given state(s).
func (*Manager[T]) RemoveByCondition ¶ added in v0.1.3
RemoveByCondition removes the tasks that satisfy the condition given by a function.
func (*Manager[T]) RemoveByState ¶
RemoveByState removes tasks by the given state(s).
func (*Manager[T]) RetryAllFailed ¶
func (m *Manager[T]) RetryAllFailed()
RetryAllFailed retries all failed tasks.
type OnBeforeRetry ¶
type OnBeforeRetry interface {
OnBeforeRetry()
}
OnBeforeRetry is the interface for tasks that need to execute a hook before retrying.
type OnFailed ¶
type OnFailed interface {
OnFailed()
}
OnFailed is the interface for tasks that need to execute a hook when they fail.
type OnSucceeded ¶
type OnSucceeded interface {
OnSucceeded()
}
OnSucceeded is the interface for tasks that need to execute a hook when they succeed.
type Option ¶
type Option func(*Options)
Option is a function that modifies the manager's Options.
func WithPersistDebounce ¶
WithPersistDebounce sets the persist debounce.
func WithPersistFunction ¶ added in v0.1.2
type Options ¶
type Options struct {
	Works                int
	MaxRetry             int
	Timeout              *time.Duration
	PersistPath          string
	PersistDebounce      *time.Duration
	Running              bool
	Logger               *slog.Logger
	PersistReadFunction  func() ([]byte, error)
	PersistWriteFunction func([]byte) error
}
Options holds the options for the manager.
type Persistable ¶
type Persistable interface {
Persistable() bool
}
Persistable reports whether the task is persistable.
type Recoverable ¶
type Recoverable interface {
Recoverable() bool
}
Recoverable reports whether the task is recoverable.
type Retryable ¶
type Retryable interface {
Retryable() bool
}
Retryable reports whether the task is retryable.
type TacheError ¶
type TacheError struct {
Msg string
}
TacheError is a custom error type
func (*TacheError) Error ¶
func (e *TacheError) Error() string
type TaskBase ¶
type TaskBase interface {
	// SetProgress sets the progress of the task
	SetProgress(progress float64)
	// GetProgress gets the progress of the task
	GetProgress() float64
	// SetState sets the state of the task
	SetState(state State)
	// GetState gets the state of the task
	GetState() State
	// GetID gets the ID of the task
	GetID() string
	// SetID sets the ID of the task
	SetID(id string)
	// SetErr sets the error of the task
	SetErr(err error)
	// GetErr gets the error of the task
	GetErr() error
	// SetCtx sets the context of the task
	SetCtx(ctx context.Context)
	// CtxDone gets the context done channel of the task
	CtxDone() <-chan struct{}
	// Cancel cancels the task
	Cancel()
	// Ctx gets the context of the task
	Ctx() context.Context
	// SetCancelFunc sets the cancel function of the task
	SetCancelFunc(cancelFunc context.CancelFunc)
	// GetRetry gets the retry of the task
	GetRetry() (int, int)
	// SetRetry sets the retry of the task
	SetRetry(retry int, maxRetry int)
	// Persist persists the task
	Persist()
	// SetPersist sets the persist function of the task
	SetPersist(persist func())
}
TaskBase is the base interface for all tasks
type TaskWithInfo ¶
type WorkerPool ¶
type WorkerPool[T Task] struct {
	// contains filtered or unexported fields
}
WorkerPool is the pool of workers
func NewWorkerPool ¶
func NewWorkerPool[T Task](size int) *WorkerPool[T]
NewWorkerPool creates a new worker pool
func (*WorkerPool[T]) Put ¶
func (wp *WorkerPool[T]) Put(worker *Worker[T])
Put puts a worker back into the pool.