task

package
v3.8.12
Published: Nov 18, 2024 License: Apache-2.0 Imports: 11 Imported by: 1

Documentation

Constants

const (
	NodePoolStateSteady  = "NodePoolStateSteady"
	NodePoolStateUpgrade = "NodePoolStateUpgrade"
)

Variables

var (
	ErrNodePoolIsUpgrading = errors.New("nodePool is upgrading")
	ErrNodePoolIsNil       = errors.New("nodePool is nil")
)
var (
	ErrJobExist     = errors.New("jobName already exist")
	ErrJobNotExist  = errors.New("jobName not exist")
	ErrJobWrongNode = errors.New("job is not running in this node")
)

Functions

This section is empty.

Types

type INodePool added in v3.8.0

type INodePool interface {
	Start(ctx context.Context) error
	CheckJobAvailable(jobName string) (bool, error)
	Stop(ctx context.Context) error

	GetNodeID() string
	GetLastNodesUpdateTime() time.Time
}

func NewNodePool added in v3.8.0

func NewNodePool(
	serviceName string,
	drv driver.DriverV2,
	updateDuration time.Duration,
	hashReplicas int,
) INodePool

type IRecentJobPacker added in v3.8.0

type IRecentJobPacker interface {
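	// AddJob is safe for concurrent use by multiple goroutines.
	// It adds a job to the packer.
	// The packer keeps only recent jobs (for example, those within 2 * heartbeat duration).
	AddJob(jobName string, t time.Time) error

	// PopAllJobs is safe for concurrent use by multiple goroutines.
	// It pops all jobs out of the packer.
	PopAllJobs() (jobNames []string)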
}

IRecentJobPacker is an interface used to pack the jobs that run while the cluster state is `unstable`, for example when some nodes have failed or new nodes were added.

func NewRecentJobPacker added in v3.8.0

func NewRecentJobPacker(timeWindow time.Duration) IRecentJobPacker

type Job

type Job interface {
	Run()
}

Job is the interface that a job must implement.
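
Any type with a `Run()` method satisfies this interface. The sketch below copies the interface locally and shows two ways to satisfy it: a struct that carries state, and a function adapter. The names `counterJob` and `funcJob` are hypothetical and do not exist in this package.

```go
package main

import "fmt"

// Job mirrors the interface documented above.
type Job interface {
	Run()
}

// counterJob is a hypothetical job that counts how often it has run.
type counterJob struct {
	runs int
}

func (c *counterJob) Run() { c.runs++ }

// funcJob is a hypothetical adapter that lets a plain func() act as a Job.
type funcJob func()

func (f funcJob) Run() { f() }

func main() {
	c := &counterJob{}
	jobs := []Job{
		c,
		funcJob(func() { fmt.Println("hello from a func job") }),
	}
	for _, j := range jobs {
		j.Run()
	}
	fmt.Println("counterJob runs:", c.runs)
}
```

A struct is the better fit when the job needs configuration or state; the adapter is convenient for one-off closures.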

type JobWarpper

type JobWarpper struct {
	ID      cron.EntryID
	Task    *Task
	Name    string
	CronStr string
	Job     Job
}

JobWarpper is a job wrapper.

func (JobWarpper) Execute added in v3.8.0

func (job JobWarpper) Execute()

func (JobWarpper) Run

func (job JobWarpper) Run()

Run runs the job.

type JobWithTime added in v3.8.0

type JobWithTime struct {
	JobName     string
	RunningTime time.Time
}

type JobWithTimeHeap added in v3.8.0

type JobWithTimeHeap []JobWithTime

func (*JobWithTimeHeap) Index added in v3.8.0

func (jobHeap *JobWithTimeHeap) Index(i int) interface{}

func (*JobWithTimeHeap) Len added in v3.8.0

func (jobHeap *JobWithTimeHeap) Len() int

func (*JobWithTimeHeap) Less added in v3.8.0

func (jobHeap *JobWithTimeHeap) Less(i, j int) bool

func (*JobWithTimeHeap) Pop added in v3.8.0

func (jobHeap *JobWithTimeHeap) Pop() (ret interface{})

func (*JobWithTimeHeap) Push added in v3.8.0

func (jobHeap *JobWithTimeHeap) Push(x interface{})

func (*JobWithTimeHeap) Swap added in v3.8.0

func (jobHeap *JobWithTimeHeap) Swap(i, j int)

type NodePool

type NodePool struct {
	// contains filtered or unexported fields
}

NodePool is used to keep the cluster stable. NodePool has two states:

  1. Steady: if the node pool list is the same as at the last update, this node's state is marked Steady. In this state, this node can run jobs.
  2. Upgrade: if the node pool list differs from the last update, this node's state is marked Upgrade. In this state, this node cannot run jobs.
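
The two-state rule above can be sketched as a small state tracker. This is only an illustration: `poolTracker`, `update`, and `canRunJobs` are hypothetical names, and comparing sorted copies of the node list is an assumption about how "the same as the last update" is decided.

```go
package main

import (
	"fmt"
	"sort"
)

// State names copied from the package constants.
const (
	stateSteady  = "NodePoolStateSteady"
	stateUpgrade = "NodePoolStateUpgrade"
)

// poolTracker is a hypothetical stand-in for NodePool's state tracking.
type poolTracker struct {
	lastNodes []string
	state     string
}

// update records the latest node list and returns the resulting state:
// Steady if the list matches the previous update, Upgrade otherwise.
func (p *poolTracker) update(nodes []string) string {
	current := append([]string(nil), nodes...) // copy so the caller's slice is untouched
	sort.Strings(current)
	if sameNodes(p.lastNodes, current) {
		p.state = stateSteady
	} else {
		p.state = stateUpgrade
	}
	p.lastNodes = current
	return p.state
}

// sameNodes reports whether two sorted node lists are identical.
func sameNodes(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

// canRunJobs reports whether jobs may run in the current state.
func (p *poolTracker) canRunJobs() bool { return p.state == stateSteady }

func main() {
	p := &poolTracker{}
	for _, nodes := range [][]string{
		{"node-a", "node-b"},           // first sighting: list changed
		{"node-b", "node-a"},           // same members: steady
		{"node-a", "node-b", "node-c"}, // node joined: upgrade again
	} {
		fmt.Println(p.update(nodes), p.canRunJobs())
	}
}
```

Under these assumptions a membership change pauses job execution for at least one update interval, which is the window that the recent-job packer is meant to cover.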

func (*NodePool) CheckJobAvailable added in v3.8.0

func (np *NodePool) CheckJobAvailable(jobName string) (bool, error)

CheckJobAvailable checks whether this job can run on this node.

func (*NodePool) GetLastNodesUpdateTime added in v3.8.0

func (np *NodePool) GetLastNodesUpdateTime() time.Time

func (*NodePool) GetNodeID added in v3.8.0

func (np *NodePool) GetNodeID() string

func (*NodePool) Start added in v3.8.0

func (np *NodePool) Start(ctx context.Context) (err error)

func (*NodePool) Stop added in v3.8.0

func (np *NodePool) Stop(ctx context.Context) error

type Option

type Option func(*Task)

Option is a Task option.

func CronOptionChain

func CronOptionChain(wrappers ...cron.JobWrapper) Option

CronOptionChain wraps the cron chain option.

func CronOptionLocation

func CronOptionLocation(loc *time.Location) Option

CronOptionLocation wraps the cron location option.

func CronOptionParser

func CronOptionParser(p cron.ScheduleParser) Option

CronOptionParser wraps the cron schedule parser option.

func CronOptionSeconds

func CronOptionSeconds() Option

CronOptionSeconds wraps the cron seconds option.

func RunningLocally added in v3.8.0

func RunningLocally() Option

func WithClusterStable added in v3.8.0

func WithClusterStable(timeWindow time.Duration) Option

WithClusterStable enables rerunning recent jobs after the cluster finishes upgrading.

func WithHashReplicas

func WithHashReplicas(d int) Option

WithHashReplicas sets hashReplicas.

func WithNodeUpdateDuration

func WithNodeUpdateDuration(d time.Duration) Option

WithNodeUpdateDuration sets the node update duration.

func WithRecoverFunc

func WithRecoverFunc(recoverFunc RecoverFuncType) Option

WithRecoverFunc sets a user-defined recover function, which adds jobs back to the task when the process restarts.

type RecentJobPacker added in v3.8.0

type RecentJobPacker struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*RecentJobPacker) AddJob added in v3.8.0

func (rjp *RecentJobPacker) AddJob(jobName string, t time.Time) (err error)

func (*RecentJobPacker) PopAllJobs added in v3.8.0

func (rjp *RecentJobPacker) PopAllJobs() (jobNames []string)

type RecoverFuncType

type RecoverFuncType func(d *Task)

type StableJob

type StableJob interface {
	Job
	GetCron() string
	Serialize() ([]byte, error)
	UnSerialize([]byte) error
}

A StableJob is a Job that is recovered when a node of the service restarts.
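
A job that must survive restarts has to be able to turn itself into bytes and back. The sketch below copies the `Job` and `StableJob` interfaces locally and implements them with JSON. The type `greetJob`, its fields, and the choice of JSON as the encoding are assumptions; the package does not prescribe a format.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Job and StableJob mirror the interfaces documented above.
type Job interface {
	Run()
}

type StableJob interface {
	Job
	GetCron() string
	Serialize() ([]byte, error)
	UnSerialize([]byte) error
}

// greetJob is a hypothetical job whose whole state is JSON-encodable.
type greetJob struct {
	Cron string `json:"cron"`
	Name string `json:"name"`
}

func (j *greetJob) Run()                       { fmt.Println("hello,", j.Name) }
func (j *greetJob) GetCron() string            { return j.Cron }
func (j *greetJob) Serialize() ([]byte, error) { return json.Marshal(j) }
func (j *greetJob) UnSerialize(b []byte) error { return json.Unmarshal(b, j) }

// Compile-time check that *greetJob satisfies StableJob.
var _ StableJob = (*greetJob)(nil)

func main() {
	orig := &greetJob{Cron: "*/5 * * * *", Name: "world"}
	data, err := orig.Serialize()
	if err != nil {
		panic(err)
	}

	// Simulate recovery after a restart: rebuild the job from stored bytes.
	restored := &greetJob{}
	if err := restored.UnSerialize(data); err != nil {
		panic(err)
	}
	fmt.Println(restored.GetCron())
	restored.Run()
}
```

Keeping every field the job needs inside the serialized form matters, because anything held only in memory is lost on restart.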

type Task

type Task struct {
	ServerName string

	RecoverFunc RecoverFuncType
	// contains filtered or unexported fields
}

Task is the main struct.

func NewTask

func NewTask(serverName string, driver driver.DriverV2, cronOpts ...cron.Option) *Task

NewTask creates a Task.

func NewTaskWithOption

func NewTaskWithOption(serverName string, driver driver.DriverV2, taskOpts ...Option) *Task

NewTaskWithOption creates a Task with Task options.

func (*Task) AddFunc

func (d *Task) AddFunc(jobName, cronStr string, cmd func()) (err error)

AddFunc adds a cron func.

func (*Task) AddJob

func (d *Task) AddJob(jobName, cronStr string, job Job) (err error)

AddJob adds a job.

func (*Task) GetJob added in v3.8.0

func (d *Task) GetJob(jobName string, thisNodeOnly bool) (*JobWarpper, error)

GetJob gets a job by jobName. If the jobName does not exist, it returns an error.

If `thisNodeOnly` is true and the job is not available on this node, it returns an error.
Otherwise it returns the JobWarpper whose name is jobName.

func (*Task) GetJobs added in v3.8.0

func (d *Task) GetJobs(thisNodeOnly bool) []*JobWarpper

GetJobs gets the job list.

If `thisNodeOnly` is true, it returns all jobs available on this node.
Otherwise it returns all jobs added to the task.

GetJobs never returns nil. If there are no jobs, it returns an empty slice.
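
The filtering rule and the non-nil guarantee can be sketched as follows. `jobInfo` and `listJobs` are hypothetical; the real method works on `*JobWarpper` values and decides availability through the node pool, which this sketch replaces with a boolean field.

```go
package main

import "fmt"

// jobInfo is a hypothetical record: a job name plus whether the job is
// currently assigned to this node.
type jobInfo struct {
	Name       string
	OnThisNode bool
}

// listJobs returns every job, or only the jobs on this node when
// thisNodeOnly is true. The result is never nil, even with no matches.
func listJobs(all []jobInfo, thisNodeOnly bool) []jobInfo {
	out := make([]jobInfo, 0, len(all))
	for _, j := range all {
		if thisNodeOnly && !j.OnThisNode {
			continue
		}
		out = append(out, j)
	}
	return out
}

func main() {
	all := []jobInfo{
		{Name: "report", OnThisNode: true},
		{Name: "cleanup", OnThisNode: false},
	}
	fmt.Println(len(listJobs(all, false))) // 2
	fmt.Println(len(listJobs(all, true)))  // 1
	fmt.Println(listJobs(nil, true) == nil) // false
}
```

Returning an empty slice instead of nil lets callers range over the result or encode it without a nil check.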

func (*Task) NodeID added in v3.8.0

func (d *Task) NodeID() string

func (*Task) Remove

func (d *Task) Remove(jobName string)

Remove removes a job by jobName.

func (*Task) Run

func (d *Task) Run()

Run runs the job.

func (*Task) Start

func (d *Task) Start()

Start starts the job.

func (*Task) Stop

func (d *Task) Stop()

Stop stops the job.

Directories

Path Synopsis
Package cron implements a cron spec parser and job runner.
