Documentation ¶
Overview ¶
Package tasker is a lightweight distributed producer/consumer task model based on beego.
Task ¶
A Task is the main description of a message or job. Its state machine looks like this:

                                +----> failed
                                |
pending ----+----> running -----+----> success
            ^                   |
            |                   v
            +---------------- retry
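The diagram above can be read as a small transition table. The following is a minimal sketch of that reading, not code from the package: the `transitions` map and `canTransition` helper are hypothetical names, and the edge set (including retry leading back to running) is an interpretation of the diagram.

```go
package main

import "fmt"

// Status values, repeated from the package constants so the sketch
// compiles on its own.
const (
	TaskStatPending = "pending"
	TaskStatRunning = "running"
	TaskStatRetry   = "retry"
	TaskStatFailed  = "failed"
	TaskStatSuccess = "success"
)

// transitions lists the edges as read from the diagram (an assumption).
// failed and success have no outgoing edges, so they are terminal.
var transitions = map[string][]string{
	TaskStatPending: {TaskStatRunning},
	TaskStatRunning: {TaskStatSuccess, TaskStatFailed, TaskStatRetry},
	TaskStatRetry:   {TaskStatRunning},
}

// canTransition reports whether the diagram has an edge from -> to.
func canTransition(from, to string) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(TaskStatPending, TaskStatRunning))
	fmt.Println(canTransition(TaskStatSuccess, TaskStatRunning))
}
```

A table like this makes it easy to reject an update that would, for example, move a finished task back to running.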
Index ¶
- Constants
- Variables
- func Consume(topic string, fn ConsumeFn, concurency ...int) (int, error)
- func FQDN() string
- func Init(MachineID func() (uint16, error), CheckMachineID func(uint16) bool) (err error)
- func InitAllTask()
- func InitIDGEN(MachineID func() (uint16, error), CheckMachineID func(uint16) bool) error
- func MsgQConsume(m MsgQ) error
- func MsgQInitTask(m MsgQ)
- func MsgQPublish(m MsgQ) error
- func MsgQPublishWithRetry(m MsgQ, timeout time.Duration, retry int) error
- func RegisterModel()
- func RegisterTask(item MsgQ)
- func SetStats(s Stats)
- func Stat(topic, status string, duration time.Duration)
- type ConsumeFn
- type Core
- type MsgQ
- type Stats
- type Task
Constants ¶
const (
	TaskStatPending = "pending"
	TaskStatRunning = "running"
	TaskStatRetry   = "retry"
	TaskStatFailed  = "failed"
	TaskStatSuccess = "success"
)
Variables ¶
var InstanceID uint16
InstanceID is the unique key of the tasker instance.
var IsMaster bool
IsMaster is true when this instance is the master instance.
var RegisteredTask map[string]MsgQ
RegisteredTask is a map that records all registered consumer tasks.
var UniqID *sonyflake.Sonyflake
UniqID is used to generate worker IDs.
Functions ¶
func FQDN ¶
func FQDN() string
FQDN returns the fully qualified domain name. In case of error it returns "unknown" or the hostname.
func Init ¶
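func Init(MachineID func() (uint16, error), CheckMachineID func(uint16) bool) (err error)
Init initializes the tasker instance. This includes:
- generating the InstanceID with the MachineID func, or from the instance's private IP address when MachineID is nil
- starting the race for master in a goroutine
- initializing all tasks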
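As an illustration of the two callbacks Init accepts, here is a minimal sketch of a MachineID-style function that derives a 16-bit ID from an IPv4 address by taking its lower 16 bits, which is what sonyflake's default does with the private IP. The names `machineIDFromIP` and `checkMachineID` are hypothetical, and whether tasker derives its fallback ID the same way is an assumption.

```go
package main

import (
	"fmt"
	"net"
)

// machineIDFromIP returns the lower 16 bits of an IPv4 address.
// Hypothetical helper; tasker's own fallback is not shown in these docs.
func machineIDFromIP(s string) (uint16, error) {
	ip := net.ParseIP(s)
	if ip == nil {
		return 0, fmt.Errorf("invalid IP address %q", s)
	}
	v4 := ip.To4()
	if v4 == nil {
		return 0, fmt.Errorf("%q is not an IPv4 address", s)
	}
	return uint16(v4[2])<<8 | uint16(v4[3]), nil
}

// checkMachineID is a hypothetical validator with the same shape as
// Init's CheckMachineID parameter; here it only rejects the zero ID.
func checkMachineID(id uint16) bool {
	return id != 0
}

func main() {
	id, err := machineIDFromIP("10.0.1.2")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(id, checkMachineID(id))
}
```

With the real package, a closure wrapping such a helper could be passed as the first argument to Init, and the validator as the second.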
func InitAllTask ¶
func InitAllTask()
InitAllTask adds every task in RegisteredTask to the beego toolbox, which starts consuming tasks.
func MsgQInitTask ¶
func MsgQInitTask(m MsgQ)
MsgQInitTask adds a consume task to the beego toolbox so that the consumer runs at an interval.
func MsgQPublishWithRetry ¶
func MsgQPublishWithRetry(m MsgQ, timeout time.Duration, retry int) error
MsgQPublishWithRetry publishes a message with the given timeout and retry count.
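The general pattern of publishing with a timeout and a retry count can be sketched as follows. This is not the package's implementation: `publishWithRetry`, `errTimeout`, and `demoTimeout` are hypothetical names, and the sketch assumes that `retry` counts extra attempts after the first and that `timeout` applies to each attempt.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is returned when an attempt does not finish in time.
var errTimeout = errors.New("publish timed out")

// demoTimeout is a per-attempt timeout used by the example below.
var demoTimeout = time.Second

// publishWithRetry calls publish up to retry+1 times and returns nil on
// the first success, otherwise the error of the last attempt.
func publishWithRetry(publish func() error, timeout time.Duration, retry int) error {
	var err error
	for attempt := 0; attempt <= retry; attempt++ {
		// Buffered so a publish that finishes after the timeout
		// does not block its goroutine forever.
		done := make(chan error, 1)
		go func() { done <- publish() }()

		select {
		case err = <-done:
			if err == nil {
				return nil
			}
		case <-time.After(timeout):
			err = errTimeout
		}
	}
	return err
}

func main() {
	calls := 0
	flaky := func() error {
		calls++
		if calls < 3 {
			return errors.New("temporary failure")
		}
		return nil
	}
	err := publishWithRetry(flaky, demoTimeout, 3)
	fmt.Println(err, calls)
}
```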
func RegisterModel ¶
func RegisterModel()
func RegisterTask ¶
func RegisterTask(item MsgQ)
RegisterTask is meant to be called from a consumer task's init func so that the task registers itself with the tasker.
Types ¶
type Core ¶
type Core struct {
	Id                int
	MasterInstanceID  uint16    `orm:"column(master_instance_id)"`
	MasterFQDN        string    `orm:"column(master_fqdn)"`
	Updated           time.Time `orm:"auto_now"`
	MasterOutOfDate   int64     // ms
	InstanceHeartbeat int64     // ms
}
Core is the configuration table of the tasker package.
`MasterOutOfDate` is how long the master state remains valid: an instance may race to become master once the time elapsed since `Updated` exceeds this value. `InstanceHeartbeat` is the maximum interval at which an instance should check the master, and it should be less than `MasterOutOfDate`.
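The relationship between `Updated`, `MasterOutOfDate`, and `InstanceHeartbeat` can be sketched as two small checks. This is only an illustration of the rule described above: `coreConfig`, `valid`, `masterExpired`, `base`, and `msAfter` are hypothetical names, and the package's actual election logic is not shown here.

```go
package main

import (
	"fmt"
	"time"
)

// coreConfig mirrors the Core fields this sketch needs.
type coreConfig struct {
	Updated           time.Time
	MasterOutOfDate   int64 // ms
	InstanceHeartbeat int64 // ms
}

// valid reports whether the heartbeat is positive and shorter than the
// out-of-date window, as the description recommends.
func (c coreConfig) valid() bool {
	return c.InstanceHeartbeat > 0 && c.InstanceHeartbeat < c.MasterOutOfDate
}

// masterExpired reports whether the time since Updated exceeds
// MasterOutOfDate, i.e. whether an instance may race to become master.
func (c coreConfig) masterExpired(now time.Time) bool {
	return now.Sub(c.Updated) > time.Duration(c.MasterOutOfDate)*time.Millisecond
}

// base is a fixed reference time so the example is deterministic.
var base = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)

// msAfter returns t shifted forward by ms milliseconds.
func msAfter(t time.Time, ms int64) time.Time {
	return t.Add(time.Duration(ms) * time.Millisecond)
}

func main() {
	c := coreConfig{Updated: base, MasterOutOfDate: 3000, InstanceHeartbeat: 1000}
	fmt.Println(c.valid())
	fmt.Println(c.masterExpired(msAfter(base, 1000)))
	fmt.Println(c.masterExpired(msAfter(base, 3001)))
}
```

Keeping the heartbeat shorter than the out-of-date window means a healthy master is rechecked at least once before its state can be considered stale.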
type MsgQ ¶
type MsgQ interface {
	New() MsgQ
	Topic() string
	TaskSpec() string
	Concurency() int
	Exec(uint64) error
}
MsgQ is the interface for messages. Every message type should implement it.
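To make the interface concrete, here is a minimal sketch of a message type that satisfies it. The interface is redeclared locally so the example compiles on its own. `emailMsg`, `registry`, and `register` are hypothetical stand-ins (the last two for RegisteredTask and RegisterTask). The cron-style value returned by TaskSpec, the meaning of the `uint64` passed to Exec, and keying the registry by topic are all assumptions.

```go
package main

import "fmt"

// MsgQ repeats the interface from the package so the sketch is standalone.
type MsgQ interface {
	New() MsgQ
	Topic() string
	TaskSpec() string
	Concurency() int
	Exec(uint64) error
}

// emailMsg is a hypothetical message type.
type emailMsg struct{}

// New returns a fresh message value.
func (emailMsg) New() MsgQ { return emailMsg{} }

// Topic names the queue this message belongs to.
func (emailMsg) Topic() string { return "email" }

// TaskSpec is assumed to be a beego toolbox cron-style spec
// (here: every 10 seconds).
func (emailMsg) TaskSpec() string { return "0/10 * * * * *" }

// Concurency is the number of concurrent consumers for this topic.
func (emailMsg) Concurency() int { return 2 }

// Exec handles one task; the argument is treated as an opaque ID.
func (emailMsg) Exec(id uint64) error {
	if id == 0 {
		return fmt.Errorf("email: invalid id 0")
	}
	return nil
}

// registry stands in for RegisteredTask, keyed by topic (an assumption).
var registry = map[string]MsgQ{}

// register stands in for RegisterTask.
func register(m MsgQ) { registry[m.Topic()] = m }

func main() {
	register(emailMsg{})
	m := registry["email"].New()
	fmt.Println(m.Topic(), m.TaskSpec(), m.Concurency(), m.Exec(42))
}
```

With the real package, the registration would typically happen in the message type's own init func by calling RegisterTask.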
type Task ¶
type Task struct {
	Id       int
	Topic    string
	Status   string
	Timeout  int // timeout in ms
	Retry    int
	Input    string `orm:"type(text)"`
	WorkerId uint64
	Created  time.Time `orm:"auto_now_add"`
	Updated  time.Time `orm:"auto_now"`
	Log      string    `orm:"type(text)"`
}
Task is the core object of the tasker package.
func NewSimpleTask ¶
NewSimpleTask creates a new task with default settings.