Documentation ¶
Index ¶
- Constants
- Variables
- func CalcNextRunTime(j Job) (time.Time, error)
- func FuncMapReadable() []map[string]string
- func GetNextRunTimeMax() (time.Time, error)
- func JobToPbJobPtr(j Job) (*pb.Job, error)
- func JobsToPbJobsPtr(js []Job) (*pb.Jobs, error)
- func RegisterFuncs(fps ...FuncPkg)
- func StateDump(j Job) ([]byte, error)
- type Broker
- type ClusterNode
- func (cn *ClusterNode) GetEndpointMain() string
- func (cn *ClusterNode) HANodeMap() TypeNodeMap
- func (cn *ClusterNode) IsMainNode() bool
- func (cn *ClusterNode) MainNode() map[string]any
- func (cn *ClusterNode) NodeMapCopy() TypeNodeMap
- func (cn *ClusterNode) NodeMapToPbNodesPtr() *pb.Nodes
- func (cn *ClusterNode) RPCHeartbeat(args *Node, reply *Node)
- func (cn *ClusterNode) RPCRegister(args *Node, reply *Node)
- func (cn *ClusterNode) RegisterNodeRemote(ctx context.Context) error
- func (cn *ClusterNode) SetEndpointMain(endpoint string)
- type FuncPkg
- type FuncUnregisteredError
- type HeartbeatArgs
- type HeartbeatReply
- type Job
- type JobNotFoundError
- type JobSlice
- type JobTimeoutError
- type Node
- type Queue
- type Raft
- type Role
- type Scheduler
- func (s *Scheduler) AddJob(j Job) (Job, error)
- func (s *Scheduler) DeleteAllJobs() error
- func (s *Scheduler) DeleteJob(id string) error
- func (s *Scheduler) GetAllJobs() ([]Job, error)
- func (s *Scheduler) GetJob(id string) (Job, error)
- func (s *Scheduler) Info() map[string]any
- func (s *Scheduler) IsBrokerMode() bool
- func (s *Scheduler) IsClusterMode() bool
- func (s *Scheduler) IsRunning() bool
- func (s *Scheduler) PauseJob(id string) (Job, error)
- func (s *Scheduler) ResumeJob(id string) (Job, error)
- func (s *Scheduler) RunJob(j Job) error
- func (s *Scheduler) ScheduleJob(j Job) error
- func (s *Scheduler) SetBroker(ctx context.Context, brk *Broker) error
- func (s *Scheduler) SetClusterNode(ctx context.Context, cn *ClusterNode) error
- func (s *Scheduler) SetStore(sto Store) error
- func (s *Scheduler) Start()
- func (s *Scheduler) Stop()
- func (s *Scheduler) UpdateJob(j Job) (Job, error)
- type Store
- type TypeNodeMap
- type VoteArgs
- type VoteReply
Constants ¶
const (
    TYPE_DATETIME = "datetime"
    TYPE_INTERVAL = "interval"
    TYPE_CRON     = "cron"
)
Constants indicating a job's type.
const (
    STATUS_RUNNING = "running"
    STATUS_PAUSED  = "paused"
)
Constants indicating a job's status.
const Version = "0.7.8"
Variables ¶
var FuncMap = make(map[string]FuncPkg)
Maps the actual path of each function to the function itself. Because Go cannot serialize functions, they must be registered with `RegisterFuncs` before use.
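The registration idea can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: `jobFunc`, `registry`, `funcName`, and `register` are hypothetical names, and the function signature is simplified compared with `Job.Func`.

```go
package main

import (
	"context"
	"fmt"
	"reflect"
	"runtime"
)

// jobFunc is a simplified, hypothetical stand-in for the signature of Job.Func.
type jobFunc func(ctx context.Context, args map[string]any)

// registry maps a function's runtime name (its "path") to the function itself.
var registry = make(map[string]jobFunc)

// funcName returns the runtime name of f, which includes its package path.
func funcName(f jobFunc) string {
	return runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
}

// register stores each function under its runtime name so that it can be
// looked up later from a plain string, which is serializable.
func register(fs ...jobFunc) {
	for _, f := range fs {
		registry[funcName(f)] = f
	}
}

func printMsg(ctx context.Context, args map[string]any) {
	fmt.Println("running with args:", args)
}

func main() {
	register(printMsg)

	// Only the name needs to be stored or sent over the wire.
	name := funcName(printMsg)
	if f, ok := registry[name]; ok {
		f(context.Background(), map[string]any{"n": 1})
	}
}
```

Storing the name instead of the function is what lets a job survive serialization: the receiving side resolves the name against its own registry.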
var GetBroker = (*Scheduler).getBroker
var GetClusterNode = (*Scheduler).getClusterNode
var GetStore = (*Scheduler).getStore
Functions ¶
func CalcNextRunTime ¶
Calculates the next run time; each job type is calculated differently. When the job is paused, it returns `9999-09-09 09:09:09`.
func FuncMapReadable ¶
func GetNextRunTimeMax ¶
func RegisterFuncs ¶
func RegisterFuncs(fps ...FuncPkg)
Types ¶
type Broker ¶ added in v0.7.0
type Broker struct {
    // Job queues.
    // def: map[<queue>]Queue
    Queues map[string]Queue
    // Number of workers per queue.
    // Default: `2`
    WorkersPerQueue int
    // contains filtered or unexported fields
}
When using a Broker, jobs are scheduled through queues rather than directly via API calls.
type ClusterNode ¶
type ClusterNode struct {
    // Main node RPC listening address.
    // If you are the main, `EndpointMain` is the same as `Endpoint`.
    // Default: `127.0.0.1:36380`
    EndpointMain string
    // The unique identifier of this node.
    // RPC listening address.
    // Used to expose the cluster's internal API.
    // Default: `127.0.0.1:36380`
    Endpoint string
    // gRPC listening address.
    // Used to expose the external API.
    // Default: `127.0.0.1:36360`
    EndpointGRPC string
    // HTTP listening address.
    // Used to expose the external API.
    // Default: `127.0.0.1:36370`
    EndpointHTTP string
    // Useful when a job specifies a queue.
    // A queue can correspond to multiple nodes.
    // Default: `default`
    Queue string
    // Node mode, for Scheduler high availability.
    // If the value is `HA`, the node will join the raft group.
    // Default: ``, Options `HA`
    Mode string
    // Bind to each other and the Scheduler.
    Scheduler *Scheduler
    // For Scheduler high availability.
    // Bind to each other and the Raft.
    Raft *Raft
    // Used to mark the status of Cluster Scheduler operation.
    SchedulerCanStart bool
    // contains filtered or unexported fields
}
Each node provides `Cluster RPC`, `gRPC`, and `HTTP` services, but only the main node starts the scheduler. The other worker nodes register with the main node and then run jobs from the main node via the RPC's `RunJob` API.
func (*ClusterNode) GetEndpointMain ¶
func (cn *ClusterNode) GetEndpointMain() string
func (*ClusterNode) HANodeMap ¶
func (cn *ClusterNode) HANodeMap() TypeNodeMap
func (*ClusterNode) IsMainNode ¶
func (cn *ClusterNode) IsMainNode() bool
func (*ClusterNode) MainNode ¶
func (cn *ClusterNode) MainNode() map[string]any
func (*ClusterNode) NodeMapCopy ¶
func (cn *ClusterNode) NodeMapCopy() TypeNodeMap
func (*ClusterNode) NodeMapToPbNodesPtr ¶
func (cn *ClusterNode) NodeMapToPbNodesPtr() *pb.Nodes
Used for gRPC Protobuf.
func (*ClusterNode) RPCHeartbeat ¶
func (cn *ClusterNode) RPCHeartbeat(args *Node, reply *Node)
RPC API
func (*ClusterNode) RPCRegister ¶
func (cn *ClusterNode) RPCRegister(args *Node, reply *Node)
RPC API
func (*ClusterNode) RegisterNodeRemote ¶
func (cn *ClusterNode) RegisterNodeRemote(ctx context.Context) error
Used by worker nodes.
After initialization, a node needs to register with the main node and synchronize cluster node information.
func (*ClusterNode) SetEndpointMain ¶
func (cn *ClusterNode) SetEndpointMain(endpoint string)
type FuncUnregisteredError ¶
type FuncUnregisteredError string
func (FuncUnregisteredError) Error ¶
func (e FuncUnregisteredError) Error() string
type HeartbeatArgs ¶
type HeartbeatReply ¶
type HeartbeatReply struct {
Term int
}
type Job ¶
type Job struct {
    // The unique identifier of this job, automatically generated.
    // It should not be set manually.
    Id string `json:"id"`
    // User defined.
    Name string `json:"name"`
    // Optional: `TYPE_DATETIME` | `TYPE_INTERVAL` | `TYPE_CRON`
    Type string `json:"type"`
    // It can be used when Type is `TYPE_DATETIME`.
    StartAt string `json:"start_at"`
    // This field is useless.
    EndAt string `json:"end_at"`
    // It can be used when Type is `TYPE_INTERVAL`.
    Interval string `json:"interval"`
    // It can be used when Type is `TYPE_CRON`.
    CronExpr string `json:"cron_expr"`
    // Refer to `time.LoadLocation`.
    // Default: `UTC`
    Timezone string `json:"timezone"`
    // The job actually runs the function,
    // and you need to register it through `RegisterFuncs` before using it.
    // Since it cannot be stored by serialization,
    // when using gRPC or HTTP calls, you should use `FuncName`.
    Func func(context.Context, Job) `json:"-"`
    // The actual path of `Func`.
    // This field has a higher priority than `Func`
    FuncName string `json:"func_name"`
    // Arguments for `Func`.
    Args map[string]any `json:"args"`
    // The running timeout of `Func`.
    // Default: `1h`
    Timeout string `json:"timeout"`
    // Used in cluster mode, if empty, randomly pick a node to run `Func`.
    // Used in broker mode, if empty, randomly pick a queue to run `Func`.
    Queues []string `json:"queues"`
    // Automatic update, not manual setting.
    LastRunTime time.Time `json:"last_run_time"`
    // Automatic update, not manual setting.
    // When the job is paused, this field is set to `9999-09-09 09:09:09`.
    NextRunTime time.Time `json:"next_run_time"`
    // Optional: `STATUS_RUNNING` | `STATUS_PAUSED`
    // It should not be set manually.
    Status string `json:"status"`
}
Carries the information of the scheduled job.
func (*Job) LastRunTimeWithTimezone ¶
func (*Job) NextRunTimeWithTimezone ¶
type JobNotFoundError ¶
type JobNotFoundError string
func (JobNotFoundError) Error ¶
func (e JobNotFoundError) Error() string
type JobTimeoutError ¶
func (*JobTimeoutError) Error ¶
func (e *JobTimeoutError) Error() string
type Queue ¶ added in v0.7.0
type Queue interface {
    // Initialization functions for each queue,
    // called when the scheduler run `SetBroker`.
    Init(ctx context.Context) error
    // Push job to this queue.
    PushJob(bJ []byte) error
    // Pull job from this queue.
    PullJob() <-chan []byte
    // Clear all resources bound to this queue.
    Clear() error
}
Defines the interface that each queue must implement.
type Raft ¶
type Raft struct {
// contains filtered or unexported fields
}
func (*Raft) RPCHeartbeat ¶
func (rf *Raft) RPCHeartbeat(args HeartbeatArgs, reply *HeartbeatReply) error
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
In standalone mode, the scheduler only needs to run jobs on a regular basis. In cluster mode, the scheduler also needs to be responsible for allocating jobs to cluster nodes.
func (*Scheduler) DeleteAllJobs ¶
func (*Scheduler) GetAllJobs ¶
func (*Scheduler) IsBrokerMode ¶ added in v0.7.0
func (*Scheduler) IsClusterMode ¶
func (*Scheduler) ScheduleJob ¶
Select a worker node or queue.
func (*Scheduler) SetClusterNode ¶
func (s *Scheduler) SetClusterNode(ctx context.Context, cn *ClusterNode) error
Bind the cluster node
func (*Scheduler) Start ¶
func (s *Scheduler) Start()
In addition to being called manually, it is also called after `AddJob`.
type Store ¶
type Store interface {
    // Initialization functions for each store,
    // called when the scheduler run `SetStore`.
    Init() error
    // Add job to this store.
    AddJob(j Job) error
    // Get the job from this store.
    // @return error `JobNotFoundError` if there are no job.
    GetJob(id string) (Job, error)
    // Get all jobs from this store.
    GetAllJobs() ([]Job, error)
    // Update job in store with a newer version.
    UpdateJob(j Job) error
    // Delete the job from this store.
    DeleteJob(id string) error
    // Delete all jobs from this store.
    DeleteAllJobs() error
    // Get the earliest next run time of all the jobs stored in this store,
    // or `time.Time{}` if there are no job.
    // Used to set the wakeup interval for the scheduler.
    GetNextRunTime() (time.Time, error)
    // Clear all resources bound to this store.
    Clear() error
}
Defines the interface that each store must implement.