taskstorage

package
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 5, 2022 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNoEnt is used in Repository implementation
	// to express situation where TaskInfo for operation does not exist.
	ErrNoEnt = errors.New("no ent")
	// ErrInvalidEnt is used in Repository implementation.
	// This is indication of an invalid TaskInfo insertion or a stored info is invalid.
	ErrInvalidEnt = errors.New("invalid ent")
	// ErrNotUpdatableState is used in Repository implementation.
	// It is returned when TaskInfo for the update is not updatable.
	ErrNotUpdatableState = errors.New("not updatable")
)
View Source
var (
	ErrMiddlewareOrder   = errors.New("invalid middleware order")
	ErrNonexistentWorkId = errors.New("nonexistent work id")
)
View Source
var (
	ErrOtherNodeWorkingOnTheTask = errors.New("other node is already working on the task")
)

Functions

This section is empty.

Types

type ExternalStateChangeErr

type ExternalStateChangeErr struct {
	// contains filtered or unexported fields
}

ExternalStateChangeErr is used to tell that error is caused by external repository manipulations.

func (ExternalStateChangeErr) Error

func (e ExternalStateChangeErr) Error() string

Error implements Error interface.

type MultiNodeTaskStorage

type MultiNodeTaskStorage struct {
	// contains filtered or unexported fields
}

MultiNodeTaskStorage is almost same as SingleNodeTaskStorage but has one additional behavior. MultiNodeTaskStorage tries to mark tasks as Working state right before task is being worked on. If task is already marked, it fails to do task. Multiple nodes can be synced to same data storage through RepositoryUpdater interface. And only one node will do task.

func NewMultiNodeTaskStorage

func NewMultiNodeTaskStorage(
	repo RepositoryUpdater,
	shouldRestore func(TaskInfo) bool,
	workRegistry WorkRegistry,
) *MultiNodeTaskStorage

func (*MultiNodeTaskStorage) Middleware

func (m *MultiNodeTaskStorage) Middleware(freeParam bool) []gokugen.MiddlewareFunc

func (*MultiNodeTaskStorage) Sync

func (m *MultiNodeTaskStorage) Sync(
	schedule func(ctx gokugen.SchedulerContext) (gokugen.Task, error),
) (rescheduled map[string]gokugen.Task, schedulingErr map[string]error, err error)

type Repository

type Repository interface {
	// Insert inserts given model to the repository.
	// Id and LastModified field are ignored and will be newly created.
	Insert(TaskInfo) (taskId string, err error)
	// GetUpdatedSince fetches all elements modified after or equal to t.
	// Result must be ordered by LastModified in ascending order.
	// Implementation may limit the number of fetched elements.
	// Implementation may or may not unmarshal Param to any. Fetched Param may be []byte or naive
	GetUpdatedSince(t time.Time) ([]TaskInfo, error)
	// GetById fetches an element associated with the id.
	// If the id does not exist in the repository,
	// GetById reports it by returning wrapped  or unwrapped ErrNoEnt error.
	// Implementation may or may not unmarshal Param to any.
	GetById(id string) (TaskInfo, error)
	// MarkAsDone marks the task as done.
	// Other than Initialized or Working state is not updatable to done.
	MarkAsDone(id string) (ok bool, err error)
	// MarkAsCancelled marks the task as cancelled.
	// Other than Initialized or Working state is not updatable to cancelled.
	MarkAsCancelled(id string) (ok bool, err error)
	// MarkAsFailed marks the task as failed which means workers failed to do this task.
	// Other than Initialized or Working state is not updatable to failed.
	MarkAsFailed(id string) (ok bool, err error)
}

type RepositoryUpdater

type RepositoryUpdater interface {
	Repository
	StateUpdater
}

type SingleNodeTaskStorage

type SingleNodeTaskStorage struct {
	// contains filtered or unexported fields
}

SingleNodeTaskStorage provides ability to store task information to, and restore from persistent data storage.

func NewSingleNodeTaskStorage

func NewSingleNodeTaskStorage(
	repo Repository,
	shouldRestore func(TaskInfo) bool,
	workRegistry WorkRegistry,
	syncCtxWrapper func(gokugen.SchedulerContext) gokugen.SchedulerContext,
) *SingleNodeTaskStorage

NewSingleNodeTaskStorage creates new SingleNodeTaskStorage instance.

repo is Repository, interface to manipulate persistent data storage.

shouldRestore is used in Sync, to decide if task should be restored and re-scheduled in internal scheduler. (e.g. ignore tasks if they are too old and overdue.)

workRegistry is used to retrieve work function associated to workId. User must register functions to registry beforehand.

syncCtxWrapper is used in Sync. Sync tries to schedule newly craeted context. this context will be wrapped with syncCtxWrapper if non nil.

func (*SingleNodeTaskStorage) Middleware

func (ts *SingleNodeTaskStorage) Middleware(freeParam bool) []gokugen.MiddlewareFunc

Middleware returns gokugen.MiddlewareFunc's. Order must be maintained. Though these middleware(s), task context info is stored in external persistent data storage.

If freeParam is true, param free up functionality is enabled. It let those middlewares to forget param until needed. Setting freeParam true adds one middleware that loads up param from repository right before work execution.

func (*SingleNodeTaskStorage) RetryMarking

func (s *SingleNodeTaskStorage) RetryMarking() (allRemoved bool)

RetryMarking retries to mark of failed marking.

func (*SingleNodeTaskStorage) Sync

func (ts *SingleNodeTaskStorage) Sync(
	schedule func(ctx gokugen.SchedulerContext) (gokugen.Task, error),
) (rescheduled map[string]gokugen.Task, schedulingErr map[string]error, err error)

Sync syncs itnernal state with an external data storage. Normally TaskStorage does it reversely through middlewares, mirroring internal state to the external data storage. But after rebooting the system, or repository is changed externally, Sync is needed to fetch back external data.

type StateUpdater

type StateUpdater interface {
	// UpdateState updates task State to new if current state is old.
	// swapped is true if update is successful, false otherwise.
	UpdateState(taskId string, old, new TaskState) (swapped bool, err error)
}

type SyncStateStore

type SyncStateStore struct {
	// contains filtered or unexported fields
}

func NewSyncStateStore

func NewSyncStateStore() *SyncStateStore

func (*SyncStateStore) Clone

func (s *SyncStateStore) Clone() *SyncStateStore

func (*SyncStateStore) GetAll

func (s *SyncStateStore) GetAll() []TaskStateSet

func (*SyncStateStore) Has

func (s *SyncStateStore) Has(v string) (has bool)

func (*SyncStateStore) Len

func (s *SyncStateStore) Len() int

func (*SyncStateStore) Put

func (s *SyncStateStore) Put(v string, state TaskState)

func (*SyncStateStore) Remove

func (s *SyncStateStore) Remove(v string) (removed bool)

type TaskInfo

type TaskInfo struct {
	Id            string
	WorkId        string
	Param         any
	ScheduledTime time.Time
	State         TaskState
	LastModified  time.Time
}

type TaskMap

type TaskMap struct {
	// contains filtered or unexported fields
}

func NewTaskMap

func NewTaskMap() *TaskMap

func (*TaskMap) AllIds

func (m *TaskMap) AllIds() []string

func (*TaskMap) Clone

func (m *TaskMap) Clone() *TaskMap

func (*TaskMap) Delete

func (m *TaskMap) Delete(k string) (deleted bool)

func (*TaskMap) Has

func (m *TaskMap) Has(k string) (has bool)

func (*TaskMap) LoadAndDelete

func (m *TaskMap) LoadAndDelete(k string) (task gokugen.Task, loaded bool)

func (*TaskMap) LoadOrStore

func (m *TaskMap) LoadOrStore(k string, v gokugen.Task) (actual gokugen.Task, loaded bool)

type TaskState

type TaskState int
const (
	Initialized TaskState = iota
	Working
	Done
	Cancelled
	Failed
)

func NewStateFromString

func NewStateFromString(s string) TaskState

func (TaskState) String

func (ts TaskState) String() string

type TaskStateSet

type TaskStateSet struct {
	Key   string
	Value TaskState
}

type UpdateDiff

type UpdateDiff struct {
	UpdateKey UpdateKey
	Diff      TaskInfo
}

type UpdateKey

type UpdateKey struct {
	WorkId        bool
	Param         bool
	ScheduledTime bool
	State         bool
}

type Updater

type Updater interface {
	// Update updates the info with diff.
	// Implementation may limit updatable state.
	// If state is not updatable, return wrapped or direct ErrNotUpdatableState.
	Update(taskId string, diff UpdateDiff) (err error)
}

type WorkFn

type WorkFn = gokugen.WorkFn

type WorkFnWParam

type WorkFnWParam = gokugen.WorkFnWParam

type WorkRegistry

type WorkRegistry interface {
	Load(key string) (value WorkFnWParam, ok bool)
}

WorkRegistry is used to retrieve work function by workId. impl/work_registry.ParamUnmarshaller will be good enough for almost all users.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL