Documentation ¶
Index ¶
- Variables
- type ExternalStateChangeErr
- type MultiNodeTaskStorage
- type Repository
- type RepositoryUpdater
- type SingleNodeTaskStorage
- func (ts *SingleNodeTaskStorage) Middleware(freeParam bool) []gokugen.MiddlewareFunc
- func (s *SingleNodeTaskStorage) RetryMarking() (allRemoved bool)
- func (ts *SingleNodeTaskStorage) Sync(schedule func(ctx gokugen.SchedulerContext) (gokugen.Task, error)) (rescheduled map[string]gokugen.Task, schedulingErr map[string]error, err error)
- type StateUpdater
- type SyncStateStore
- type TaskInfo
- type TaskMap
- func (m *TaskMap) AllIds() []string
- func (m *TaskMap) Clone() *TaskMap
- func (m *TaskMap) Delete(k string) (deleted bool)
- func (m *TaskMap) Has(k string) (has bool)
- func (m *TaskMap) LoadAndDelete(k string) (task gokugen.Task, loaded bool)
- func (m *TaskMap) LoadOrStore(k string, v gokugen.Task) (actual gokugen.Task, loaded bool)
- type TaskState
- type TaskStateSet
- type UpdateDiff
- type UpdateKey
- type Updater
- type WorkFn
- type WorkFnWParam
- type WorkRegistry
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrNoEnt is used in Repository implementations
	// to express that the TaskInfo for an operation does not exist.
	ErrNoEnt = errors.New("no ent")
	// ErrInvalidEnt is used in Repository implementations.
	// It indicates that an inserted TaskInfo or a stored info is invalid.
	ErrInvalidEnt = errors.New("invalid ent")
	// ErrNotUpdatableState is used in Repository implementations.
	// It is returned when the TaskInfo targeted by an update is not updatable.
	ErrNotUpdatableState = errors.New("not updatable")
)
var (
	ErrMiddlewareOrder   = errors.New("invalid middleware order")
	ErrNonexistentWorkId = errors.New("nonexistent work id")
)
var (
ErrOtherNodeWorkingOnTheTask = errors.New("other node is already working on the task")
)
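Because Repository implementations may return these sentinel errors either wrapped or unwrapped, callers are probably best served by errors.Is rather than a direct comparison. The following is a minimal sketch of that pattern; ErrNoEnt is redeclared locally so the example is self-contained, and getByID and isMissing are hypothetical helpers, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNoEnt mirrors the package-level sentinel so the sketch is self-contained.
var ErrNoEnt = errors.New("no ent")

// getByID is a hypothetical lookup that wraps ErrNoEnt with extra context.
func getByID(store map[string]string, id string) (string, error) {
	v, ok := store[id]
	if !ok {
		return "", fmt.Errorf("task %q: %w", id, ErrNoEnt)
	}
	return v, nil
}

// isMissing reports whether err is, or wraps, ErrNoEnt.
func isMissing(err error) bool {
	return errors.Is(err, ErrNoEnt)
}

func main() {
	store := map[string]string{"a": "work-a"}
	_, err := getByID(store, "b")
	fmt.Println(isMissing(err))  // true: errors.Is sees through the wrapping
	fmt.Println(err == ErrNoEnt) // false: the returned error is a wrapper
}
```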
Functions ¶
This section is empty.
Types ¶
type ExternalStateChangeErr ¶
type ExternalStateChangeErr struct {
// contains filtered or unexported fields
}
ExternalStateChangeErr indicates that an error was caused by external manipulation of the repository.
func (ExternalStateChangeErr) Error ¶
func (e ExternalStateChangeErr) Error() string
Error implements the error interface.
type MultiNodeTaskStorage ¶
type MultiNodeTaskStorage struct {
// contains filtered or unexported fields
}
MultiNodeTaskStorage is almost the same as SingleNodeTaskStorage but has one additional behavior: it tries to mark a task as being in the Working state right before the task is worked on. If the task is already marked, it fails to do the task. Multiple nodes can be synced to the same data storage through the RepositoryUpdater interface, and only one node will do each task.
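The mark-before-work behavior described above can be sketched as an atomic "claim" on a shared store. This is only an illustration of the idea, not the package's implementation: sharedStore, markWorking, and runOnce are hypothetical names, a mutex-protected map stands in for the shared data storage, and the error value is a local copy of ErrOtherNodeWorkingOnTheTask.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errOtherNodeWorking is a local copy of ErrOtherNodeWorkingOnTheTask for this sketch.
var errOtherNodeWorking = errors.New("other node is already working on the task")

// sharedStore is a hypothetical stand-in for the data storage shared by all nodes.
type sharedStore struct {
	mu      sync.Mutex
	working map[string]string // task id -> node that marked it as Working
}

func newSharedStore() *sharedStore {
	return &sharedStore{working: make(map[string]string)}
}

// markWorking atomically marks the task as Working for node.
// It fails if any node has already marked the task.
func (s *sharedStore) markWorking(taskID, node string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, marked := s.working[taskID]; marked {
		return errOtherNodeWorking
	}
	s.working[taskID] = node
	return nil
}

// runOnce runs work only if this node wins the marking.
func runOnce(s *sharedStore, taskID, node string, work func()) error {
	if err := s.markWorking(taskID, node); err != nil {
		return err
	}
	work()
	return nil
}

// demo lets two nodes compete for one task and reports how often work ran
// and whether the second node was rejected with the sentinel error.
func demo() (runs int, secondRejected bool) {
	s := newSharedStore()
	work := func() { runs++ }
	_ = runOnce(s, "task-1", "node-a", work)
	err := runOnce(s, "task-1", "node-b", work)
	return runs, errors.Is(err, errOtherNodeWorking)
}

func main() {
	runs, rejected := demo()
	fmt.Println(runs, rejected) // 1 true
}
```

In a real deployment the check-and-set would have to be atomic in the storage itself (for example a conditional update), since an in-process mutex does not coordinate separate nodes.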
func NewMultiNodeTaskStorage ¶
func NewMultiNodeTaskStorage(
	repo RepositoryUpdater,
	shouldRestore func(TaskInfo) bool,
	workRegistry WorkRegistry,
) *MultiNodeTaskStorage
func (*MultiNodeTaskStorage) Middleware ¶
func (m *MultiNodeTaskStorage) Middleware(freeParam bool) []gokugen.MiddlewareFunc
type Repository ¶
type Repository interface {
	// Insert inserts the given model into the repository.
	// The Id and LastModified fields are ignored and will be newly created.
	Insert(TaskInfo) (taskId string, err error)
	// GetUpdatedSince fetches all elements modified at or after t.
	// The result must be ordered by LastModified in ascending order.
	// Implementations may limit the number of fetched elements.
	// Implementations may or may not unmarshal Param to any. Fetched Param may be []byte or naive
	GetUpdatedSince(t time.Time) ([]TaskInfo, error)
	// GetById fetches the element associated with the id.
	// If the id does not exist in the repository,
	// GetById reports it by returning a wrapped or unwrapped ErrNoEnt error.
	// Implementations may or may not unmarshal Param to any.
	GetById(id string) (TaskInfo, error)
	// MarkAsDone marks the task as done.
	// Only tasks in the Initialized or Working state can be updated to done.
	MarkAsDone(id string) (ok bool, err error)
	// MarkAsCancelled marks the task as cancelled.
	// Only tasks in the Initialized or Working state can be updated to cancelled.
	MarkAsCancelled(id string) (ok bool, err error)
	// MarkAsFailed marks the task as failed, which means workers failed to do this task.
	// Only tasks in the Initialized or Working state can be updated to failed.
	MarkAsFailed(id string) (ok bool, err error)
}
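Two rules in the Repository contract are easy to get wrong: marking is only allowed from the Initialized or Working state, and GetUpdatedSince must return results in ascending LastModified order. The sketch below illustrates both with an in-memory store. It is not a full Repository: state, entry, and memRepo are hypothetical, reduced stand-ins for TaskState, TaskInfo, and a real implementation, and reporting a non-updatable state as ok == false with a nil error is an assumption about the intended semantics.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"
)

var errNoEnt = errors.New("no ent") // local stand-in for ErrNoEnt

type state int // hypothetical stand-in for TaskState

const (
	initialized state = iota
	working
	done
	cancelled
)

// entry is a hypothetical, reduced stand-in for TaskInfo.
type entry struct {
	ID           string
	State        state
	LastModified time.Time
}

type memRepo struct {
	mu      sync.Mutex
	entries map[string]*entry
}

// mark moves a task to the target state only from Initialized or Working.
func (r *memRepo) mark(id string, to state, now time.Time) (bool, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	e, ok := r.entries[id]
	if !ok {
		return false, fmt.Errorf("mark %q: %w", id, errNoEnt)
	}
	if e.State != initialized && e.State != working {
		return false, nil // assumption: non-updatable state is reported as ok == false
	}
	e.State, e.LastModified = to, now
	return true, nil
}

// updatedSince returns copies of entries modified at or after t, oldest first.
func (r *memRepo) updatedSince(t time.Time) []entry {
	r.mu.Lock()
	defer r.mu.Unlock()
	var out []entry
	for _, e := range r.entries {
		if !e.LastModified.Before(t) {
			out = append(out, *e)
		}
	}
	sort.Slice(out, func(i, j int) bool {
		return out[i].LastModified.Before(out[j].LastModified)
	})
	return out
}

// demo marks one updatable and one finished task, then lists recent changes.
func demo() (okA, okB bool, order []string) {
	base := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	r := &memRepo{entries: map[string]*entry{
		"a": {ID: "a", State: initialized, LastModified: base},
		"b": {ID: "b", State: done, LastModified: base.Add(time.Minute)},
	}}
	okA, _ = r.mark("a", done, base.Add(2*time.Minute))
	okB, _ = r.mark("b", cancelled, base.Add(3*time.Minute))
	for _, e := range r.updatedSince(base.Add(time.Minute)) {
		order = append(order, e.ID)
	}
	return okA, okB, order
}

func main() {
	fmt.Println(demo())
}
```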
type RepositoryUpdater ¶
type RepositoryUpdater interface {
	Repository
	StateUpdater
}
type SingleNodeTaskStorage ¶
type SingleNodeTaskStorage struct {
// contains filtered or unexported fields
}
SingleNodeTaskStorage provides the ability to store task information in, and restore it from, persistent data storage.
func NewSingleNodeTaskStorage ¶
func NewSingleNodeTaskStorage(
	repo Repository,
	shouldRestore func(TaskInfo) bool,
	workRegistry WorkRegistry,
	syncCtxWrapper func(gokugen.SchedulerContext) gokugen.SchedulerContext,
) *SingleNodeTaskStorage
NewSingleNodeTaskStorage creates a new SingleNodeTaskStorage instance.
repo is the Repository, an interface for manipulating persistent data storage.
shouldRestore is used in Sync to decide whether a task should be restored and re-scheduled in the internal scheduler (e.g. to ignore tasks that are too old and overdue).
workRegistry is used to retrieve the work function associated with a workId. Users must register functions in the registry beforehand.
syncCtxWrapper is used in Sync. Sync tries to schedule a newly created context; this context is wrapped with syncCtxWrapper if it is non-nil.
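A shouldRestore predicate that ignores tasks that are too old could look roughly like the following. This is a sketch under assumptions: the fields of TaskInfo are not shown in this documentation, so taskInfo and its ScheduledTime field are hypothetical stand-ins, and restoreIfRecent is an illustrative helper rather than something the package provides.

```go
package main

import (
	"fmt"
	"time"
)

// taskInfo is a hypothetical stand-in for TaskInfo; only the field used here is modeled.
type taskInfo struct {
	ScheduledTime time.Time
}

// restoreIfRecent builds a predicate that rejects tasks overdue by more than maxOverdue.
// The clock is injected so the predicate can be tested deterministically.
func restoreIfRecent(now func() time.Time, maxOverdue time.Duration) func(taskInfo) bool {
	return func(info taskInfo) bool {
		cutoff := now().Add(-maxOverdue)
		return !info.ScheduledTime.Before(cutoff)
	}
}

// demo evaluates the predicate against a fixed clock.
func demo() (recent, stale, future bool) {
	fixed := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	should := restoreIfRecent(func() time.Time { return fixed }, time.Hour)
	recent = should(taskInfo{ScheduledTime: fixed.Add(-30 * time.Minute)})
	stale = should(taskInfo{ScheduledTime: fixed.Add(-2 * time.Hour)})
	future = should(taskInfo{ScheduledTime: fixed.Add(time.Hour)})
	return recent, stale, future
}

func main() {
	fmt.Println(demo()) // true false true
}
```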
func (*SingleNodeTaskStorage) Middleware ¶
func (ts *SingleNodeTaskStorage) Middleware(freeParam bool) []gokugen.MiddlewareFunc
Middleware returns gokugen.MiddlewareFunc values. Their order must be maintained. Through these middlewares, task context info is stored in external persistent data storage.
If freeParam is true, the param-freeing functionality is enabled. It lets those middlewares forget param until it is needed. Setting freeParam to true adds one middleware that loads param from the repository right before work execution.
func (*SingleNodeTaskStorage) RetryMarking ¶
func (s *SingleNodeTaskStorage) RetryMarking() (allRemoved bool)
RetryMarking retries marking for tasks whose earlier marking failed.
func (*SingleNodeTaskStorage) Sync ¶
func (ts *SingleNodeTaskStorage) Sync(
	schedule func(ctx gokugen.SchedulerContext) (gokugen.Task, error),
) (rescheduled map[string]gokugen.Task, schedulingErr map[string]error, err error)
Sync syncs internal state with an external data storage. Normally TaskStorage works in the opposite direction: its middlewares mirror internal state to the external data storage. But after the system reboots, or after the repository is changed externally, Sync is needed to fetch the external data back.
type StateUpdater ¶
type SyncStateStore ¶
type SyncStateStore struct {
// contains filtered or unexported fields
}
func NewSyncStateStore ¶
func NewSyncStateStore() *SyncStateStore
func (*SyncStateStore) Clone ¶
func (s *SyncStateStore) Clone() *SyncStateStore
func (*SyncStateStore) GetAll ¶
func (s *SyncStateStore) GetAll() []TaskStateSet
func (*SyncStateStore) Has ¶
func (s *SyncStateStore) Has(v string) (has bool)
func (*SyncStateStore) Len ¶
func (s *SyncStateStore) Len() int
func (*SyncStateStore) Put ¶
func (s *SyncStateStore) Put(v string, state TaskState)
func (*SyncStateStore) Remove ¶
func (s *SyncStateStore) Remove(v string) (removed bool)
type TaskMap ¶
type TaskMap struct {
// contains filtered or unexported fields
}
func NewTaskMap ¶
func NewTaskMap() *TaskMap
func (*TaskMap) LoadAndDelete ¶
type TaskStateSet ¶
type UpdateDiff ¶
type Updater ¶
type Updater interface {
	// Update updates the info with diff.
	// Implementation may limit updatable state.
	// If state is not updatable, return wrapped or direct ErrNotUpdatableState.
	Update(taskId string, diff UpdateDiff) (err error)
}
type WorkFnWParam ¶
type WorkFnWParam = gokugen.WorkFnWParam
type WorkRegistry ¶
type WorkRegistry interface {
Load(key string) (value WorkFnWParam, ok bool)
}
WorkRegistry is used to retrieve a work function by workId. impl/work_registry.ParamUnmarshaller will be good enough for almost all users.
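For readers who want to see the shape of a registry, a map guarded by a read-write mutex is probably the simplest thing that satisfies a Load(key) (value, ok) method. The sketch below is not the package's ParamUnmarshaller: workFn is a hypothetical stand-in for WorkFnWParam (whose real signature is defined in gokugen and not shown here), and mapRegistry and its Store method are illustrative names.

```go
package main

import (
	"fmt"
	"sync"
)

// workFn is a hypothetical stand-in for WorkFnWParam; the real signature lives in gokugen.
type workFn func(param any) (any, error)

// mapRegistry is a concurrency-safe registry with the same Load shape as WorkRegistry.
type mapRegistry struct {
	mu sync.RWMutex
	m  map[string]workFn
}

func newMapRegistry() *mapRegistry {
	return &mapRegistry{m: make(map[string]workFn)}
}

// Store registers fn under key, replacing any previous entry.
func (r *mapRegistry) Store(key string, fn workFn) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.m[key] = fn
}

// Load returns the work function registered under key, if any.
func (r *mapRegistry) Load(key string) (value workFn, ok bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	value, ok = r.m[key]
	return value, ok
}

// demo registers one work function before use, then looks up a known and an unknown id.
func demo() (found, unknownFound bool, result any) {
	reg := newMapRegistry()
	reg.Store("greet", func(param any) (any, error) {
		return fmt.Sprintf("hello, %v", param), nil
	})
	fn, found := reg.Load("greet")
	if found {
		result, _ = fn("gopher")
	}
	_, unknownFound = reg.Load("missing")
	return found, unknownFound, result
}

func main() {
	fmt.Println(demo()) // true false hello, gopher
}
```

Registering every work function before the storage is used matches the requirement that functions be registered beforehand; a lookup for an unregistered id simply reports ok == false.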