api

package
v1.1.0-beta.0...-d6b313f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultStoreNamespace = "default"

DefaultStoreNamespace is the default namespace.

Variables

View Source
var ErrEventIDNotMatch = errors.New("timer event id not match")

ErrEventIDNotMatch indicates that the timer's event id not match.

View Source
var ErrTimerExists = errors.New("timer already exists")

ErrTimerExists indicates that the specified timer already exits.

View Source
var ErrTimerNotExist = errors.New("timer not exist")

ErrTimerNotExist indicates that the specified timer not exist.

View Source
var ErrVersionNotMatch = errors.New("timer version not match")

ErrVersionNotMatch indicates that the timer's version not match.

Functions

func ValidateTimeZone

func ValidateTimeZone(tz string) error

ValidateTimeZone validates the TimeZone field.

Types

type Cond

type Cond interface {
	// Match returns whether a record match the condition.
	Match(timer *TimerRecord) bool
}

Cond is an interface to match a timer record.

type CronPolicy

type CronPolicy struct {
	// contains filtered or unexported fields
}

CronPolicy implements SchedEventPolicy, it is the policy of type `SchedEventCron`.

func NewCronPolicy

func NewCronPolicy(expr string) (*CronPolicy, error)

NewCronPolicy creates a new CronPolicy.

func (*CronPolicy) NextEventTime

func (p *CronPolicy) NextEventTime(watermark time.Time) (time.Time, bool)

NextEventTime returns the next time of the timer event.

type EventExtra

type EventExtra struct {
	// EventManualRequestID is the related request id of the manual trigger.
	// If current event is not triggered manually, it is empty.
	EventManualRequestID string
	// EventWatermark is the watermark when event triggers.
	EventWatermark time.Time
}

EventExtra stores some extra attributes for event.

type GetTimerOption

type GetTimerOption func(*TimerCond)

GetTimerOption is the option to get timers.

func WithID

func WithID(id string) GetTimerOption

WithID indicates to get a timer with the specified id.

func WithKey

func WithKey(key string) GetTimerOption

WithKey indicates to get a timer with the specified key.

func WithKeyPrefix

func WithKeyPrefix(keyPrefix string) GetTimerOption

WithKeyPrefix to get timers with the indicated key prefix.

func WithTag

func WithTag(tags ...string) GetTimerOption

WithTag indicates to get a timer with the specified tags.

type Hook

type Hook interface {
	// Start starts the hook.
	Start()
	// Stop stops the hook. When it is called, this means the framework is shutting down.
	Stop()
	// OnPreSchedEvent will be called before triggering a new event. It's return value tells the next action of the triggering.
	// For example, if `TimerShedEvent.Delay` is a non-zero value, the event triggering will be postponed.
	// Notice that `event.Timer().EventID` will be empty because the current event is not actually triggered,
	// use `event.EventID()` to get the event id instead.
	OnPreSchedEvent(ctx context.Context, event TimerShedEvent) (PreSchedEventResult, error)
	// OnSchedEvent will be called when a new event is triggered.
	OnSchedEvent(ctx context.Context, event TimerShedEvent) error
}

Hook is an interface which should be implemented by user to tell framework how to trigger an event. Several timers with a same hook class can share one hook in a runtime.

type HookFactory

type HookFactory func(hookClass string, cli TimerClient) Hook

HookFactory is the factory function to construct a new Hook object with `hookClass`.

type ManualRequest

type ManualRequest struct {
	// ManualRequestID is the id of manual request.
	ManualRequestID string
	// ManualRequestTime is the request time.
	ManualRequestTime time.Time
	// ManualTimeout is the timeout for the request, if the timer is not triggered after timeout, processed will be set to true
	// with empty event id.
	ManualTimeout time.Duration
	// ManualProcessed indicates the request is processed (triggered or timeout).
	ManualProcessed bool
	// ManualEventID means the triggered event id for the current request.
	ManualEventID string
}

ManualRequest is the request info to trigger timer manually.

func (*ManualRequest) IsManualRequesting

func (r *ManualRequest) IsManualRequesting() bool

IsManualRequesting indicates that whether this timer is requesting to trigger event manually.

func (*ManualRequest) SetProcessed

func (r *ManualRequest) SetProcessed(eventID string) ManualRequest

SetProcessed sets the timer's manual request to processed.

type Operator

type Operator struct {
	// Op indicates the operator type.
	Op OperatorTp
	// Not indicates whether to apply 'NOT' to the condition.
	Not bool
	// Children is the children of the operator.
	Children []Cond
}

Operator implements Cond.

func And

func And(children ...Cond) *Operator

And returns a condition `AND(child1, child2, ...)`.

func Not

func Not(cond Cond) *Operator

Not returns a condition `NOT(cond)`.

func Or

func Or(children ...Cond) *Operator

Or returns a condition `OR(child1, child2, ...)`.

func (*Operator) Match

func (c *Operator) Match(t *TimerRecord) bool

Match will return whether the condition match the timer record.

type OperatorTp

type OperatorTp int8

OperatorTp is the operator type of the condition.

const (
	// OperatorAnd means 'AND' operator.
	OperatorAnd OperatorTp = iota
	// OperatorOr means 'OR' operator.
	OperatorOr
)

type OptionalVal

type OptionalVal[T any] struct {
	// contains filtered or unexported fields
}

OptionalVal is used by `TimerCond` and `TimerUpdate` to indicate. whether some field's condition has been set or whether is field should be updated.

func NewOptionalVal

func NewOptionalVal[T any](val T) (o OptionalVal[T])

NewOptionalVal creates a new OptionalVal.

func (*OptionalVal[T]) Clear

func (o *OptionalVal[T]) Clear()

Clear clears the value. The `Present()` will return false after `Clear` is called.

func (*OptionalVal[T]) Get

func (o *OptionalVal[T]) Get() (v T, present bool)

Get returns the current value, if the second return value is false, this means it is not set.

func (*OptionalVal[T]) Present

func (o *OptionalVal[T]) Present() bool

Present indicates whether the field value is set.

func (*OptionalVal[T]) Set

func (o *OptionalVal[T]) Set(v T)

Set sets the value.

type PreSchedEventResult

type PreSchedEventResult struct {
	// Delay indicates to delay the event after a while.
	// If `Delay` is 0, it means no delay, and then `OnSchedEvent` will be called.
	// Otherwise, after a while according to `Delay`, `OnPreSchedEvent` will be called again to
	// check whether to trigger the event.
	Delay time.Duration
	// EventData indicates the data should be passed to the event that should be triggered.
	// EventData can be used to store some pre-computed configurations of the next event.
	EventData []byte
}

PreSchedEventResult is the result of `OnPreSchedEvent`.

type SchedEventPolicy

type SchedEventPolicy interface {
	// NextEventTime returns the time to schedule the next timer event. If the second return value is true,
	// it means we have a next event schedule after `watermark`. Otherwise, it means there is no more event after `watermark`.
	NextEventTime(watermark time.Time) (time.Time, bool)
}

SchedEventPolicy is an interface to tell the runtime how to schedule a timer's events.

func CreateSchedEventPolicy

func CreateSchedEventPolicy(tp SchedPolicyType, expr string) (SchedEventPolicy, error)

CreateSchedEventPolicy creates a SchedEventPolicy according to `SchedPolicyType` and `SchedPolicyExpr`.

type SchedEventStatus

type SchedEventStatus string

SchedEventStatus is the current schedule status of timer's event.

const (
	// SchedEventIdle means the timer is not in trigger state currently.
	SchedEventIdle SchedEventStatus = "IDLE"
	// SchedEventTrigger means the timer is in trigger state.
	SchedEventTrigger SchedEventStatus = "TRIGGER"
)

type SchedIntervalPolicy

type SchedIntervalPolicy struct {
	// contains filtered or unexported fields
}

SchedIntervalPolicy implements SchedEventPolicy, it is the policy of type `SchedEventInterval`.

func NewSchedIntervalPolicy

func NewSchedIntervalPolicy(expr string) (*SchedIntervalPolicy, error)

NewSchedIntervalPolicy creates a new SchedIntervalPolicy.

func (*SchedIntervalPolicy) NextEventTime

func (p *SchedIntervalPolicy) NextEventTime(watermark time.Time) (time.Time, bool)

NextEventTime returns the next time of the timer event. A next event should be triggered after a time indicated by `interval` after watermark.

type SchedPolicyType

type SchedPolicyType string

SchedPolicyType is the type of the event schedule policy.

const (
	// SchedEventInterval indicates to schedule events every fixed interval.
	SchedEventInterval SchedPolicyType = "INTERVAL"
	// SchedEventCron indicates to schedule events by cron expression.
	SchedEventCron SchedPolicyType = "CRON"
)

type TimerClient

type TimerClient interface {
	// GetDefaultNamespace returns the default namespace of this client.
	GetDefaultNamespace() string
	// CreateTimer creates a new timer.
	CreateTimer(ctx context.Context, spec TimerSpec) (*TimerRecord, error)
	// GetTimerByID queries the timer by ID.
	GetTimerByID(ctx context.Context, timerID string) (*TimerRecord, error)
	// GetTimerByKey queries the timer by key.
	GetTimerByKey(ctx context.Context, key string) (*TimerRecord, error)
	// GetTimers queries timers by options.
	GetTimers(ctx context.Context, opts ...GetTimerOption) ([]*TimerRecord, error)
	// UpdateTimer updates a timer.
	UpdateTimer(ctx context.Context, timerID string, opts ...UpdateTimerOption) error
	// ManualTriggerEvent triggers event manually.
	ManualTriggerEvent(ctx context.Context, timerID string) (string, error)
	// CloseTimerEvent closes the triggering event of a timer.
	CloseTimerEvent(ctx context.Context, timerID string, eventID string, opts ...UpdateTimerOption) error
	// DeleteTimer deletes a timer.
	DeleteTimer(ctx context.Context, timerID string) (bool, error)
}

TimerClient is an interface exposed to user to manage timers.

func NewDefaultTimerClient

func NewDefaultTimerClient(store *TimerStore) TimerClient

NewDefaultTimerClient creates a new defaultTimerClient.

type TimerCond

type TimerCond struct {
	// ID indicates to filter the timer record with ID.
	ID OptionalVal[string]
	// Namespace indicates to filter the timer by Namespace.
	Namespace OptionalVal[string]
	// Key indicates to filter the timer record with ID.
	// The filter behavior is defined by `KeyPrefix`.
	Key OptionalVal[string]
	// KeyPrefix indicates how to filter with timer's key if `Key` is set.
	// If `KeyPrefix is` true, it will check whether the timer's key is prefixed with `TimerCond.Key`.
	// Otherwise, it will check whether the timer's key equals `TimerCond.Key`.
	KeyPrefix bool
	// Tags indicates to filter the timer record with specified tags.
	Tags OptionalVal[[]string]
}

TimerCond is the condition to filter a timer record.

func (*TimerCond) Clear

func (c *TimerCond) Clear()

Clear clears all fields.

func (*TimerCond) FieldsSet

func (c *TimerCond) FieldsSet(excludes ...unsafe.Pointer) (fields []string)

FieldsSet returns all fields that has been set exclude excludes.

func (*TimerCond) Match

func (c *TimerCond) Match(t *TimerRecord) bool

Match will return whether the condition match the timer record.

type TimerRecord

type TimerRecord struct {
	TimerSpec
	// ID is the id of timer, it is unique and auto assigned by the store when created.
	ID string
	// ManualRequest is the request to trigger timer event manually.
	ManualRequest
	// EventStatus indicates the current schedule status of the timer's event.
	EventStatus SchedEventStatus
	// EventID indicates the id of current triggered event.
	// If the `EventStatus` is `IDLE`, this value should be empty.
	EventID string
	// EventData indicates the data of current triggered event.
	// If the `EventStatus` is `IDLE`, this value should be empty.
	EventData []byte
	// EventStart indicates the start time of current triggered event.
	// If the `EventStatus` is `IDLE`, `EventStart.IsZero()` should returns true.
	EventStart time.Time
	// EventExtra stores some extra attributes for event
	EventExtra
	// SummaryData is a binary which is used to store some summary information of the timer.
	// User can update it when closing a timer's event to update the summary.
	SummaryData []byte
	// CreateTime is the creation time of the timer.
	CreateTime time.Time
	// Version is the version of the record, when the record updated, version will be increased.
	Version uint64
	// Location is used to get the alias of TiDB timezone.
	Location *time.Location
}

TimerRecord is the timer record saved in the timer store.

func (*TimerRecord) Clone

func (r *TimerRecord) Clone() *TimerRecord

Clone returns a cloned TimerRecord.

func (*TimerRecord) NextEventTime

func (r *TimerRecord) NextEventTime() (tm time.Time, _ bool, _ error)

NextEventTime returns the next time for timer to schedule

type TimerShedEvent

type TimerShedEvent interface {
	// EventID returns the event ID the current event.
	EventID() string
	// Timer returns the timer record object of the current event.
	Timer() *TimerRecord
}

TimerShedEvent is an interface which gives the timer's schedule event's information.

type TimerSpec

type TimerSpec struct {
	// Namespace is the namespace of the timer.
	Namespace string
	// Key is the key of the timer. Key is unique in each namespace.
	Key string
	// Tags is used to tag a timer.
	Tags []string
	// Data is a binary which is defined by user.
	Data []byte
	// TimeZone is the time zone name of the timer to evaluate the schedule policy.
	// If TimeZone is empty, it means to use the tidb cluster's time zone.
	TimeZone string
	// SchedPolicyType is the type of the event schedule policy.
	SchedPolicyType SchedPolicyType
	// SchedPolicyExpr is the expression of event schedule policy with the type specified by SchedPolicyType.
	SchedPolicyExpr string
	// HookClass is the class of the hook.
	HookClass string
	// Watermark indicates the progress the timer's event schedule.
	Watermark time.Time
	// Enable indicated whether the timer is enabled.
	// If it is false, the new timer event will not be scheduled even it is up to time.
	Enable bool
}

TimerSpec is the specification of a timer without any runtime status.

func (*TimerSpec) Clone

func (t *TimerSpec) Clone() *TimerSpec

Clone returns a cloned TimerSpec.

func (*TimerSpec) CreateSchedEventPolicy

func (t *TimerSpec) CreateSchedEventPolicy() (SchedEventPolicy, error)

CreateSchedEventPolicy creates a SchedEventPolicy according to `SchedPolicyType` and `SchedPolicyExpr`.

func (*TimerSpec) Validate

func (t *TimerSpec) Validate() error

Validate validates the TimerSpec.

type TimerStore

type TimerStore struct {
	TimerStoreCore
}

TimerStore extends TimerStoreCore to provide some extra methods for timer operations.

func NewMemoryTimerStore

func NewMemoryTimerStore() *TimerStore

NewMemoryTimerStore creates a memory store for timers

func (*TimerStore) GetByID

func (s *TimerStore) GetByID(ctx context.Context, timerID string) (record *TimerRecord, err error)

GetByID gets a timer by ID. If the timer with the specified ID not exists, an error `ErrTimerNotExist` will be returned.

func (*TimerStore) GetByKey

func (s *TimerStore) GetByKey(ctx context.Context, namespace, key string) (record *TimerRecord, err error)

GetByKey gets a timer by key. If the timer with the specified key not exists in the namespace, an error `ErrTimerNotExist` will be returned.

type TimerStoreCore

type TimerStoreCore interface {
	// Create creates a new record. If `record.ID` is empty, an id will be assigned automatically.
	// The first return value is the final id of the timer.
	Create(ctx context.Context, record *TimerRecord) (string, error)
	// List lists the timers that match the condition.
	List(ctx context.Context, cond Cond) ([]*TimerRecord, error)
	// Update updates a timer.
	Update(ctx context.Context, timerID string, update *TimerUpdate) error
	// Delete deletes a timer.
	Delete(ctx context.Context, timerID string) (bool, error)
	// WatchSupported indicates whether watch supported for this store.
	WatchSupported() bool
	// Watch watches all changes of the store. A chan will be returned to receive `WatchTimerResponse`
	// The returned chan be closed when the context in the argument is done.
	Watch(ctx context.Context) WatchTimerChan
	// Close closes the store
	Close()
}

TimerStoreCore is an interface, it contains several core methods of store.

type TimerUpdate

type TimerUpdate struct {
	// Tags indicates to set all tags for a timer.
	Tags OptionalVal[[]string]
	// Enable indicates to set the timer's `Enable` field.
	Enable OptionalVal[bool]
	// TimeZone indicates to set the timer's `TimeZone` field.
	TimeZone OptionalVal[string]
	// SchedPolicyType indicates to set the timer's `SchedPolicyType` field.
	SchedPolicyType OptionalVal[SchedPolicyType]
	// SchedPolicyExpr indicates to set the timer's `SchedPolicyExpr` field.
	SchedPolicyExpr OptionalVal[string]
	// ManualRequest indicates to set the timer's manual request.
	ManualRequest OptionalVal[ManualRequest]
	// EventStatus indicates the event status.
	EventStatus OptionalVal[SchedEventStatus]
	// EventID indicates to set the timer event id.
	EventID OptionalVal[string]
	// EventData indicates to set the timer event data.
	EventData OptionalVal[[]byte]
	// EventStart indicates the start time of event.
	EventStart OptionalVal[time.Time]
	// EventExtra indicates to set the `EventExtra` field.
	EventExtra OptionalVal[EventExtra]
	// Watermark indicates to set the timer's `Watermark` field.
	Watermark OptionalVal[time.Time]
	// SummaryData indicates to set the timer's `Summary` field.
	SummaryData OptionalVal[[]byte]
	// CheckVersion indicates to check the timer's version when updated.
	CheckVersion OptionalVal[uint64]
	// CheckEventID indicates to check the timer's eventID.
	CheckEventID OptionalVal[string]
}

TimerUpdate indicates how to update a timer.

func (*TimerUpdate) Clear

func (u *TimerUpdate) Clear()

Clear clears all fields.

func (*TimerUpdate) FieldsSet

func (u *TimerUpdate) FieldsSet(excludes ...unsafe.Pointer) (fields []string)

FieldsSet returns all fields that has been set exclude excludes.

type TimerWatchEventNotifier

type TimerWatchEventNotifier interface {
	// Watch watches all changes of the store. A chan will be returned to receive `WatchTimerResponse`
	// The returned chan be closed when the context in the argument is done.
	Watch(ctx context.Context) WatchTimerChan
	// Notify sends the event to watchers.
	Notify(tp WatchTimerEventType, timerID string)
	// Close closes the notifier.
	Close()
}

TimerWatchEventNotifier is used to notify timer watch events.

func NewMemTimerWatchEventNotifier

func NewMemTimerWatchEventNotifier() TimerWatchEventNotifier

NewMemTimerWatchEventNotifier creates a notifier with memory implement

type UpdateTimerOption

type UpdateTimerOption func(*TimerUpdate)

UpdateTimerOption is the option to update the timer.

func WithSetEnable

func WithSetEnable(enable bool) UpdateTimerOption

WithSetEnable indicates to set the timer's `Enable` field.

func WithSetSchedExpr

func WithSetSchedExpr(tp SchedPolicyType, expr string) UpdateTimerOption

WithSetSchedExpr indicates to set the timer's schedule policy.

func WithSetSummaryData

func WithSetSummaryData(summary []byte) UpdateTimerOption

WithSetSummaryData indicates to set the timer's summary.

func WithSetTags

func WithSetTags(tags []string) UpdateTimerOption

WithSetTags indicates to set the timer's tags.

func WithSetTimeZone

func WithSetTimeZone(name string) UpdateTimerOption

WithSetTimeZone sets the timezone of the timer

func WithSetWatermark

func WithSetWatermark(watermark time.Time) UpdateTimerOption

WithSetWatermark indicates to set the timer's watermark.

type WatchTimerChan

type WatchTimerChan <-chan WatchTimerResponse

WatchTimerChan is the chan of the watch timer.

type WatchTimerEvent

type WatchTimerEvent struct {
	// Tp indicates the event type.
	Tp WatchTimerEventType
	// TimerID indicates the timer id for the event.
	TimerID string
}

WatchTimerEvent is the watch event object.

type WatchTimerEventType

type WatchTimerEventType int8

WatchTimerEventType is the type of the watch event.

const (
	// WatchTimerEventCreate indicates that a new timer is created.
	WatchTimerEventCreate WatchTimerEventType = 1 << iota
	// WatchTimerEventUpdate indicates that a timer is updated.
	WatchTimerEventUpdate
	// WatchTimerEventDelete indicates that a timer is deleted.
	WatchTimerEventDelete
)

type WatchTimerResponse

type WatchTimerResponse struct {
	// Events contains all events in the response.
	Events []*WatchTimerEvent
}

WatchTimerResponse is the response of watch.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL