Documentation
¶
Index ¶
- Constants
- Variables
- type Configuration
- type ConnectionHandler
- type Consumer
- func (con *Consumer) Bury(id uint64, pri uint32) error
- func (con *Consumer) Close() error
- func (con *Consumer) ConnectionHandler() ConnectionHandler
- func (con *Consumer) Delete(id uint64) error
- func (con *Consumer) Init() error
- func (con *Consumer) ListTubes() ([]string, error)
- func (con *Consumer) OnEndConsume()
- func (con *Consumer) OnHeartbeat()
- func (con *Consumer) OnReserveTimeout()
- func (con *Consumer) OnStartConsume()
- func (con *Consumer) Release(id uint64, pri uint32, delay time.Duration) error
- func (con *Consumer) Reserve(timeout time.Duration) (id uint64, body []byte, err error)
- func (con *Consumer) SetEventHandler(handler EventHandler)
- func (con *Consumer) SetTaskPayloadHandler(handler common.TaskPayloadHandler)
- func (con *Consumer) StartConsumer() error
- func (con *Consumer) StopConsumer()
- func (con *Consumer) TaskEventChannel() chan<- *common.TaskProcessEvent
- func (con *Consumer) Touch(id uint64) error
- type EventHandler
- type Handler
Constants ¶
View Source
const (
	//Channel size to allocate. It is important for task implementation to send event
	//asynchronously to avoid blocking the execution thread
	TaskEventChannelSize = 1
)
Variables ¶
View Source
var ErrTimeout = errors.New("timeout")
ErrTimeout is hard-coded here to avoid depending on the go-beanstalkd library for a single constant.
View Source
var WireSet = wire.NewSet(NewConsumer, NewConfiguration, wire.Bind(new(Handler), new(*Consumer)))
Functions ¶
This section is empty.
Types ¶
type Configuration ¶
type Configuration struct {
	//Waiting time for consumer reserve
	WaitForConsumerReserve time.Duration
	//Waiting time for quit signal timeout
	Heartbeat       time.Duration
	ReleasePriority uint32
	ReleaseDelay    time.Duration
	BuryPriority    uint32
}
Configuration stores initialization data for worker server
func NewConfiguration ¶
func NewConfiguration() *Configuration
type ConnectionHandler ¶
type ConnectionHandler interface { Reserve(timeout time.Duration) (id uint64, body []byte, err error) Release(id uint64, pri uint32, delay time.Duration) error Delete(id uint64) error Bury(id uint64, pri uint32) error Touch(id uint64) error Put(body []byte, pri uint32, delay, ttr time.Duration) (id uint64, err error) ListTubes() ([]string, error) Close() error }
type Consumer ¶
type Consumer struct { *Configuration // contains filtered or unexported fields }
Consumer stores configuration for consumer activation
func NewConsumer ¶
func NewConsumer(config *Configuration, connectorHandler connector.Handler, connectionHandler ConnectionHandler) *Consumer
NewConsumer creates a Consumer instance from the given configuration, connector handler, and connection handler.
func (*Consumer) ConnectionHandler ¶
func (con *Consumer) ConnectionHandler() ConnectionHandler
func (*Consumer) OnEndConsume ¶
func (con *Consumer) OnEndConsume()
func (*Consumer) OnHeartbeat ¶
func (con *Consumer) OnHeartbeat()
func (*Consumer) OnReserveTimeout ¶
func (con *Consumer) OnReserveTimeout()
func (*Consumer) OnStartConsume ¶
func (con *Consumer) OnStartConsume()
func (*Consumer) SetEventHandler ¶
func (con *Consumer) SetEventHandler(handler EventHandler)
func (*Consumer) SetTaskPayloadHandler ¶
func (con *Consumer) SetTaskPayloadHandler(handler common.TaskPayloadHandler)
func (*Consumer) StartConsumer ¶
func (con *Consumer) StartConsumer() error
StartConsumer starts consumer thread
func (*Consumer) StopConsumer ¶
func (con *Consumer) StopConsumer()
StopConsumer stops consumer thread
func (*Consumer) TaskEventChannel ¶
func (con *Consumer) TaskEventChannel() chan<- *common.TaskProcessEvent
type EventHandler ¶
type EventHandler interface { OnStartConsume() OnEndConsume() OnReserveTimeout() OnHeartbeat() }
type Handler ¶
type Handler interface { Init() error Close() error ConnectionHandler() ConnectionHandler TaskEventChannel() chan<- *common.TaskProcessEvent SetEventHandler(handler EventHandler) SetTaskPayloadHandler(handler common.TaskPayloadHandler) StartConsumer() error StopConsumer() }
Click to show internal directories.
Click to hide internal directories.