Documentation ¶
Index ¶
- func AdjustRoutingKey(b Interface, s *tasks.Signature)
- func IsAMQP(b Interface) bool
- type AMQPBroker
- type Broker
- func (b *Broker) GetConfig() *config.Config
- func (b *Broker) GetPendingTasks(queue string) ([]*tasks.Signature, error)
- func (b *Broker) IsTaskRegistered(name string) bool
- func (b *Broker) Publish(signature *tasks.Signature) error
- func (b *Broker) SetRegisteredTaskNames(names []string)
- func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcessor TaskProcessor) (bool, error)
- func (b *Broker) StopConsuming()
- type EagerBroker
- func (eagerBroker *EagerBroker) AssignWorker(w TaskProcessor)
- func (eagerBroker *EagerBroker) GetPendingTasks(queue string) ([]*tasks.Signature, error)
- func (eagerBroker *EagerBroker) Publish(task *tasks.Signature) error
- func (eagerBroker *EagerBroker) StartConsuming(consumerTag string, concurrency int, p TaskProcessor) (bool, error)
- func (eagerBroker *EagerBroker) StopConsuming()
- type EagerMode
- type ErrCouldNotUnmarshaTaskSignature
- type Interface
- type RedisBroker
- type TaskProcessor
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AdjustRoutingKey ¶ added in v1.0.14
AdjustRoutingKey makes sure the routing key is correct. If the routing key is an empty string: a) for the direct exchange type, set it to the binding key; b) otherwise, set it to the default queue name.
Types ¶
type AMQPBroker ¶
type AMQPBroker struct { Broker common.AMQPConnector // contains filtered or unexported fields }
AMQPBroker represents an AMQP broker
func (*AMQPBroker) Publish ¶
func (b *AMQPBroker) Publish(signature *tasks.Signature) error
Publish places a new message on the default queue
func (*AMQPBroker) StartConsuming ¶
func (b *AMQPBroker) StartConsuming(consumerTag string, concurrency int, taskProcessor TaskProcessor) (bool, error)
StartConsuming enters a loop and waits for incoming messages
func (*AMQPBroker) StopConsuming ¶
func (b *AMQPBroker) StopConsuming()
StopConsuming quits the loop
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker represents a base broker structure
func (*Broker) GetPendingTasks ¶
GetPendingTasks returns a slice of task signatures (tasks.Signature) waiting in the queue
func (*Broker) IsTaskRegistered ¶
IsTaskRegistered returns true if the task is registered with this broker
func (*Broker) SetRegisteredTaskNames ¶
SetRegisteredTaskNames sets registered task names
func (*Broker) StartConsuming ¶ added in v1.0.14
func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcessor TaskProcessor) (bool, error)
StartConsuming enters a loop and waits for incoming messages
func (*Broker) StopConsuming ¶ added in v1.0.14
func (b *Broker) StopConsuming()
StopConsuming quits the loop
type EagerBroker ¶
type EagerBroker struct { Broker // contains filtered or unexported fields }
EagerBroker represents an "eager" in-memory broker
func (*EagerBroker) AssignWorker ¶
func (eagerBroker *EagerBroker) AssignWorker(w TaskProcessor)
AssignWorker assigns a worker to the eager broker
func (*EagerBroker) GetPendingTasks ¶
func (eagerBroker *EagerBroker) GetPendingTasks(queue string) ([]*tasks.Signature, error)
GetPendingTasks returns a slice of task signatures (tasks.Signature) waiting in the queue
func (*EagerBroker) Publish ¶
func (eagerBroker *EagerBroker) Publish(task *tasks.Signature) error
Publish places a new message on the default queue
func (*EagerBroker) StartConsuming ¶
func (eagerBroker *EagerBroker) StartConsuming(consumerTag string, concurrency int, p TaskProcessor) (bool, error)
StartConsuming enters a loop and waits for incoming messages
func (*EagerBroker) StopConsuming ¶
func (eagerBroker *EagerBroker) StopConsuming()
StopConsuming quits the loop
type EagerMode ¶
type EagerMode interface {
AssignWorker(p TaskProcessor)
}
EagerMode is an interface with methods specific to the eager broker
type ErrCouldNotUnmarshaTaskSignature ¶ added in v1.0.12
type ErrCouldNotUnmarshaTaskSignature struct {
// contains filtered or unexported fields
}
ErrCouldNotUnmarshaTaskSignature is an error indicating that a task signature could not be unmarshaled from a message
func NewErrCouldNotUnmarshaTaskSignature ¶ added in v1.0.12
func NewErrCouldNotUnmarshaTaskSignature(msg []byte, err error) ErrCouldNotUnmarshaTaskSignature
NewErrCouldNotUnmarshaTaskSignature returns a new ErrCouldNotUnmarshaTaskSignature instance
func (ErrCouldNotUnmarshaTaskSignature) Error ¶ added in v1.0.12
func (e ErrCouldNotUnmarshaTaskSignature) Error() string
Error implements the error interface
type Interface ¶
type Interface interface { GetConfig() *config.Config SetRegisteredTaskNames(names []string) IsTaskRegistered(name string) bool StartConsuming(consumerTag string, concurrency int, p TaskProcessor) (bool, error) StopConsuming() Publish(task *tasks.Signature) error GetPendingTasks(queue string) ([]*tasks.Signature, error) }
Interface - a common interface for all brokers
func NewAMQPBroker ¶
NewAMQPBroker creates a new AMQPBroker instance
func NewEagerBroker ¶
func NewEagerBroker() Interface
NewEagerBroker creates a new EagerBroker instance
type RedisBroker ¶
type RedisBroker struct { Broker common.RedisConnector // contains filtered or unexported fields }
RedisBroker represents a Redis broker
func (*RedisBroker) GetPendingTasks ¶
func (b *RedisBroker) GetPendingTasks(queue string) ([]*tasks.Signature, error)
GetPendingTasks returns a slice of task signatures waiting in the queue
func (*RedisBroker) Publish ¶
func (b *RedisBroker) Publish(signature *tasks.Signature) error
Publish places a new message on the default queue
func (*RedisBroker) StartConsuming ¶
func (b *RedisBroker) StartConsuming(consumerTag string, concurrency int, taskProcessor TaskProcessor) (bool, error)
StartConsuming enters a loop and waits for incoming messages
func (*RedisBroker) StopConsuming ¶
func (b *RedisBroker) StopConsuming()
StopConsuming quits the loop
type TaskProcessor ¶
TaskProcessor - can process a delivered task. This will probably always be a worker instance.