Documentation ¶
Index ¶
- Variables
- func MillisToUTCTime(millis int64) time.Time
- type Job
- type JobID
- type JobMetadata
- type JobType
- type ScheduledTask
- type Scheduler
- type Task
- type TaskHandler
- type TimingWheel
- type TimingWheelCommonMetadata
- type TimingWheelOptions
- type TimingWheelSlot
- type TimingWheelSlotMetadata
- type TimingWheels
Constants ¶
This section is empty.
Variables ¶
var ( ErrTimingWheelStopped = errors.New("timing wheel stopped") ErrTimingWheelTaskNotFound = errors.New("task not found") ErrTimingWheelTaskEmptyJobID = errors.New("empty job id in task") ErrTimingWheelEmptyJob = errors.New("empty job in task") ErrTimingWheelTaskIsExpired = errors.New("task is expired") ErrTimingWheelTaskUnableToBeAddedToSlot = errors.New("task unable to be added to slot") ErrTimingWheelTaskUnableToBeRemoved = errors.New("task unable to be removed") ErrTimingWheelTaskTooShortExpiration = errors.New("task expiration is too short") ErrTimingWheelUnknownScheduler = errors.New("unknown schedule") ErrTimingWheelTaskCancelled = errors.New("task cancelled") )
Functions ¶
func MillisToUTCTime ¶
Types ¶
type Job ¶
type Job func(ctx context.Context, metadata JobMetadata)
Job is the function that will be executed by the timing wheel
type JobMetadata ¶
type JobMetadata interface { // GetJobID returns the jobID of the job, unique identifier. GetJobID() JobID // GetExpiredMs returns the expirationMs of the job. GetExpiredMs() int64 // GetRestLoopCount returns the rest loop count. GetRestLoopCount() int64 // GetJobType returns the job type. GetJobType() JobType }
JobMetadata describes the metadata of a job Each slot in the timing wheel is a linked list of jobs
type ScheduledTask ¶
type ScheduledTask interface { Task UpdateNextScheduledMs() }
ScheduledTask is the interface that wraps the repeat Job
func NewRepeatTask ¶
type Scheduler ¶
type Scheduler interface { // GetRestLoopCount returns the rest loop count. // If the rest loop count is -1, it means that the task will run forever unless cancel manually. GetRestLoopCount() int64 // contains filtered or unexported methods }
func NewFiniteScheduler ¶
func NewInfiniteScheduler ¶
type Task ¶
type Task interface { JobMetadata GetJobMetadata() JobMetadata // GetJob returns the job function. GetJob() Job // GetSlot returns the slot of the job. GetSlot() TimingWheelSlot // GetPreviousSlotMetadata returns the previous slot metadata of the job. GetPreviousSlotMetadata() TimingWheelSlotMetadata Cancel() bool Cancelled() bool // contains filtered or unexported methods }
Task is the interface that wraps the Job
type TaskHandler ¶
type TaskHandler func(Task) // Core function
TaskHandler is a function that reinserts a task into the timing wheel. It means that the task should be executed periodically or repeatedly for a certain times. Reinsert will add current task to next slot, higher level slot (overflow wheel) or the same level slot (current wheel) depending on the expirationMs of the task. When the task is reinserted, the expirationMs of the task should be updated.
- Check if the task is cancelled. If so, stop reinserting.
- Check if the task's loop count is greater than 0. If so, decrease the loop count and reinsert.
- Check if the task's loop count is -1 (run forever unless cancel manually). If so, reinsert and update the expirationMs.
type TimingWheel ¶
type TimingWheel interface { TimingWheelCommonMetadata GetInterval() int64 GetCurrentTimeMs() int64 }
TimingWheel slots is private, they should be provided by the implementation
type TimingWheelOptions ¶
type TimingWheelOptions func(tw *timingWheel)
func WithTimingWheelSlotSize ¶
func WithTimingWheelSlotSize(slotSize int64) TimingWheelOptions
func WithTimingWheelTickMs ¶
func WithTimingWheelTickMs(basicTickMs time.Duration) TimingWheelOptions
type TimingWheelSlot ¶
type TimingWheelSlot interface { TimingWheelSlotMetadata // GetMetadata returns the metadata of the slot. GetMetadata() TimingWheelSlotMetadata // AddTask adds a task to the slot. AddTask(Task) // RemoveTask removes a task from the slot. RemoveTask(Task) bool // Flush flushes all tasks in the slot generally, // but it should be called in a loop. Flush(TaskHandler) }
TimingWheelSlot is the interface that wraps the slot, in kafka, it is called bucket.
func NewXSlot ¶
func NewXSlot() TimingWheelSlot
type TimingWheelSlotMetadata ¶
type TimingWheelSlotMetadata interface { // GetExpirationMs returns the expirationMs of the slot. GetExpirationMs() int64 // GetSlotID returns the slotID of the slot, easy for debugging. GetSlotID() int64 // GetLevel returns the level of the slot, easy for debugging. GetLevel() int64 // contains filtered or unexported methods }
type TimingWheels ¶
type TimingWheels interface { TimingWheelCommonMetadata // GetTaskCounter returns the current task count of the timing wheel. GetTaskCounter() int64 // AddTask adds a task to the timing wheels. AddTask(task Task) error // CancelTask cancels a task by jobID. CancelTask(jobID JobID) error // Shutdown stops the timing wheels Shutdown() // AfterFunc schedules a function to run after the duration delayMs. AfterFunc(delayMs time.Duration, fn Job) (Task, error) // ScheduleFunc schedules a function to run at a certain time generated by the schedule. ScheduleFunc(schedFn func() Scheduler, fn Job) (Task, error) }
func NewTimingWheels ¶
func NewTimingWheels(ctx context.Context, startMs int64, opts ...TimingWheelOptions) TimingWheels
NewTimingWheels creates a new timing wheel. @param startMs the start time in milliseconds, example value time.Now().UnixMilli().
Same as the kafka, Time.SYSTEM.hiResClockMs() is used.