Documentation ¶
Index ¶
- Constants
- Variables
- func TestOnly_ResetDb(cfg *MySQLConfig)
- type CompareFunc
- type Config
- type ConsumerInterface
- type EncoderPool
- type Event
- type EventStatus
- type Heap
- type HeapItem
- type MySQLConfig
- type MySQLConsumer
- type MySQLStore
- func (self *MySQLStore) Cancel(evId string) error
- func (self *MySQLStore) Close()
- func (self *MySQLStore) GetDb() *sql.DB
- func (self *MySQLStore) GetStat(reset bool) map[string]interface{}
- func (self *MySQLStore) Open() error
- func (self *MySQLStore) Save(ev *Event) string
- func (self *MySQLStore) UpdateForRetry(ev *Event, retryParam interface{}) error
- func (self *MySQLStore) UpdateStatus(evId string, status EventStatus) error
- type NoTrigger
- type PQ
- type PQItem
- type Queue
- func (self *Queue) Cancel(evId string) error
- func (self *Queue) Create(triggerType string, triggerTime time.Time, data interface{}) string
- func (self *Queue) GetStat() map[string]interface{}
- func (self *Queue) Populate(store StoreInterface, consumer ConsumerInterface) (*Queue, error)
- func (self *Queue) Start() error
- func (self *Queue) Stop()
- type QueueDepsContainer
- type Scheduler
- type SchedulerConfig
- type SchedulerDepsContainer
- type Seq32
- type Stat
- type StatInterface
- type StoreInterface
- type TriggerInterface
- type TriggerResult
Constants ¶
View Source
const ( EventStatus_DEFAULT = 1 + iota EventStatus_OK EventStatus_CANCEL EventStatus_ERROR EventStatus_RETRY )
View Source
const ( SQL_TMPL_CREATE_DATABASE = `CREATE DATABASE IF NOT EXISTS %s` SQL_TMPL_CREATE_TABLE = `` /* 340-byte string literal not displayed */ )
View Source
const SEQ_MASK_INT32 = 0x7fffffff
Variables ¶
View Source
var ( SQL_SAVE_EVENT string SQL_DELETE_EVENT string SQL_UPDATE_EVENT_STATUS string SQL_UPDATE_EVENT_FOR_RETRY string SQL_RESET_DELAYED_EVENTS string SQL_DECLARE_OWNERSHIP string SQL_SELECT_EVENTS string )
View Source
var Encoder = newEncoderPool()
View Source
var SchedulerDeps = &SchedulerDepsContainer{}
Functions ¶
Types ¶
type CompareFunc ¶
type Config ¶
type Config struct { StatIntervalSec int `json:"stat_interval_sec"` SchedulerConfig MySQLConfig }
func DefaultConfig ¶
func DefaultConfig() *Config
type ConsumerInterface ¶
type ConsumerInterface interface { Start() Stop() Events() <-chan []*Event }
type EncoderPool ¶
type EncoderPool struct {
// contains filtered or unexported fields
}
func (*EncoderPool) Marshal ¶
func (self *EncoderPool) Marshal(v interface{}) ([]byte, error)
type Event ¶
type Event struct { Id string TriggerType string TriggerTime time.Time Owner string Attempts int Status EventStatus Created time.Time Updated time.Time Completed time.Time Locked time.Time Data interface{} // contains filtered or unexported fields }
func TestOnly_SelectEvents ¶
func TestOnly_SelectEvents(cfg *MySQLConfig) []*Event
type EventStatus ¶
type EventStatus uint32
func (EventStatus) String ¶
func (self EventStatus) String() string
type HeapItem ¶
type HeapItem struct { // public Value interface{} // contains filtered or unexported fields }
func NewHeapItem ¶
type MySQLConfig ¶
type MySQLConfig struct { MySQL6 bool `json:"mysql6"` User string `json:"username"` Pass string `json:"password"` Host string `json:"host"` Port int `json:"port"` DbName string `json:"db_name"` TableName string `json:"table_name"` MaxOpenConnection int `json:"max_open_connection"` ConsumerName string `json:"queue_name"` ConsumerLockTimeoutSec int `json:"consumer_lock_timeout_sec"` ConsumerTimeWindowSec int `json:"consumer_time_window_sec"` ConsumerSelectLimit int `json:"consumer_select_limit"` ConsumerSleepMSec int `json:"consumer_sleep_msec"` }
type MySQLConsumer ¶
type MySQLConsumer struct {
// contains filtered or unexported fields
}
func NewMySQLConsumer ¶
func NewMySQLConsumer(cfg *Config, store *MySQLStore) *MySQLConsumer
func (*MySQLConsumer) Events ¶
func (self *MySQLConsumer) Events() <-chan []*Event
func (*MySQLConsumer) GetStat ¶
func (self *MySQLConsumer) GetStat(reset bool) map[string]interface{}
func (*MySQLConsumer) Start ¶
func (self *MySQLConsumer) Start()
func (*MySQLConsumer) Stop ¶
func (self *MySQLConsumer) Stop()
type MySQLStore ¶
type MySQLStore struct {
// contains filtered or unexported fields
}
func NewMySQLStore ¶
func NewMySQLStore(cfg *Config) *MySQLStore
func (*MySQLStore) Cancel ¶
func (self *MySQLStore) Cancel(evId string) error
func (*MySQLStore) Close ¶
func (self *MySQLStore) Close()
func (*MySQLStore) GetDb ¶
func (self *MySQLStore) GetDb() *sql.DB
func (*MySQLStore) GetStat ¶
func (self *MySQLStore) GetStat(reset bool) map[string]interface{}
func (*MySQLStore) Open ¶
func (self *MySQLStore) Open() error
func (*MySQLStore) Save ¶
func (self *MySQLStore) Save(ev *Event) string
func (*MySQLStore) UpdateForRetry ¶
func (self *MySQLStore) UpdateForRetry(ev *Event, retryParam interface{}) error
func (*MySQLStore) UpdateStatus ¶
func (self *MySQLStore) UpdateStatus(evId string, status EventStatus) error
type NoTrigger ¶
type NoTrigger struct{}
NoTrigger is the default trigger.
func (*NoTrigger) Trigger ¶
func (self *NoTrigger) Trigger(ev *Event) *TriggerResult
type PQ ¶
type PQ struct {
// contains filtered or unexported fields
}
func (*PQ) Push ¶
Meanings of the returned values:
- index >= 0, poped == nil: the queue is not full; the new item has been pushed.
- index >= 0, poped != nil: the queue is full; the new item has been pushed, and the item with the lowest priority has been popped.
- index < 0, poped == nil: not possible for PQ (only in case of wrong usage/implementation, e.g. max is set to 0).
- index < 0, poped != nil: the queue is full; the new item has not been pushed because it has the lowest priority.
type Queue ¶
type Queue struct { QueueDepsContainer `inject:"inline"` // contains filtered or unexported fields }
func CreateCustomQueue ¶
func CreateCustomQueue(cfg *Config, triggers map[string]TriggerInterface) *Queue
func CreateQueue ¶
func CreateQueue(cfg *Config, triggers map[string]TriggerInterface) (*Queue, error)
func (*Queue) Populate ¶
func (self *Queue) Populate(store StoreInterface, consumer ConsumerInterface) (*Queue, error)
type QueueDepsContainer ¶
type QueueDepsContainer struct { Store StoreInterface `inject:""` Consumer ConsumerInterface `inject:""` }
type Scheduler ¶
type Scheduler struct { SchedulerDepsContainer `inject:"inline"` // contains filtered or unexported fields }
type SchedulerConfig ¶
type SchedulerDepsContainer ¶
type SchedulerDepsContainer struct {
Store StoreInterface `inject:""`
}
type Stat ¶
type Stat struct {
// contains filtered or unexported fields
}
func (*Stat) Add ¶
func (self *Stat) Add(s StatInterface)
type StatInterface ¶
type StoreInterface ¶
type TriggerInterface ¶
type TriggerInterface interface {
Trigger(ev *Event) *TriggerResult
}
type TriggerResult ¶
type TriggerResult struct { Status EventStatus TriggerTime time.Time Data interface{} }
Source Files ¶
Click to show internal directories.
Click to hide internal directories.