Documentation ¶
Index ¶
- Constants
- type Event
- type EventQueue
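- type Option
- func WithEventQueue(eq EventQueue) Option
- type PersistentQueue
- func NewPersistentQueue(options ...Option) *PersistentQueue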
- func (q *PersistentQueue) Enqueue(eventContainer *eventsapi.EventContainer) (string, error)
- func (q *PersistentQueue) Retry(routingKey string) (int, error)
- func (q *PersistentQueue) Shutdown() error
- func (q *PersistentQueue) Start() error
- func (q *PersistentQueue) Status(routingKey string) ([]StatusItem, error)
- type StatusItem
Constants ¶
const StatusError = "error"
const StatusPending = "pending"
const StatusSuccess = "success"
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Event ¶
type Event struct {
    ID           int `storm:"id,increment"`
    Key          string `storm:"index"`
    RoutingKey   string `storm:"index"`
    Status       string `storm:"index"`
    Event        *eventsapi.EventContainer
    ResponseBody []byte
    CreatedAt    time.Time `storm:"index"`
    UpdatedAt    time.Time `storm:"index"`
}
Event represents a queued or processed event.
type EventQueue ¶
type EventQueue interface {
    Enqueue(*eventsapi.EventContainer, chan<- eventqueue.Response) error
    Shutdown()
}
type Option ¶
type Option func(*PersistentQueue)
func WithEventQueue ¶
func WithEventQueue(eq EventQueue) Option
type PersistentQueue ¶
type PersistentQueue struct {
    DB         *storm.DB
    Events     storm.Node
    EventQueue EventQueue
    // contains filtered or unexported fields
}
func NewPersistentQueue ¶
func NewPersistentQueue(options ...Option) *PersistentQueue
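The `Option` signature suggests the functional-options pattern: each option is a function that mutates the queue while it is being constructed. The sketch below illustrates that pattern with stand-in types only. The names `queue`, `backend`, `withBackend`, `newQueue`, `noopBackend`, and `recordingBackend` are hypothetical, and the default backend is an assumption; none of this is taken from the package's implementation.

```go
package main

// backend is a hypothetical stand-in for the EventQueue interface.
type backend interface {
	Shutdown()
}

// queue is a hypothetical stand-in for PersistentQueue.
type queue struct {
	backend backend
}

// option mirrors the shape of Option: a function that configures the queue.
type option func(*queue)

// withBackend mirrors the shape of WithEventQueue: it returns an option
// that swaps in the given backend.
func withBackend(b backend) option {
	return func(q *queue) { q.backend = b }
}

// noopBackend is an assumed default used when no option overrides it.
type noopBackend struct{}

func (noopBackend) Shutdown() {}

// recordingBackend is a test double that remembers whether it was stopped.
type recordingBackend struct {
	stopped bool
}

func (r *recordingBackend) Shutdown() { r.stopped = true }

// newQueue applies options in order on top of the defaults,
// so a later option overrides an earlier one.
func newQueue(opts ...option) *queue {
	q := &queue{backend: noopBackend{}}
	for _, opt := range opts {
		opt(q)
	}
	return q
}
```

Under this pattern, a test could construct the queue with `newQueue(withBackend(&recordingBackend{}))` to substitute an in-memory backend, which is presumably the kind of use `WithEventQueue` is meant to support.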
func (*PersistentQueue) Enqueue ¶
func (q *PersistentQueue) Enqueue(eventContainer *eventsapi.EventContainer) (string, error)
Enqueue adds an event to the persistent queue for processing.
Returns the event record's key along with any synchronous errors.
Only synchronous errors (e.g. invalid event) are supported as there are cases where we might not have a per-event response channel (e.g. processing a backlog).
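The contract described above (validate synchronously, persist the record, return its key) can be sketched with an in-memory store. This is a minimal illustration under stated assumptions: `eventStore`, `incomingEvent`, `storedEvent`, the `evt-N` key format, and the "missing routing key" validation rule are all hypothetical and do not reflect how the package builds keys or validates events.

```go
package main

import (
	"errors"
	"strconv"
)

// incomingEvent is a hypothetical stand-in for eventsapi.EventContainer.
type incomingEvent struct {
	RoutingKey string
}

// storedEvent is a hypothetical, trimmed-down version of the Event record.
type storedEvent struct {
	Key        string
	RoutingKey string
	Status     string
}

// eventStore is an in-memory stand-in for the persistent database.
type eventStore struct {
	nextID  int
	records map[string]*storedEvent
}

func newEventStore() *eventStore {
	return &eventStore{records: make(map[string]*storedEvent)}
}

// errInvalidEvent is the only kind of error this sketch reports:
// one that can be detected synchronously, before any delivery attempt.
var errInvalidEvent = errors.New("invalid event: missing routing key")

// enqueue validates the event, stores it as pending, and returns its key.
// Delivery failures are not returned here; in this sketch they would only
// ever show up later as a status change on the stored record.
func (s *eventStore) enqueue(ev *incomingEvent) (string, error) {
	if ev == nil || ev.RoutingKey == "" {
		return "", errInvalidEvent
	}
	s.nextID++
	key := "evt-" + strconv.Itoa(s.nextID) // assumed key format
	s.records[key] = &storedEvent{
		Key:        key,
		RoutingKey: ev.RoutingKey,
		Status:     "pending", // corresponds to StatusPending
	}
	return key, nil
}
```

A caller would keep the returned key and look the record up later to learn whether delivery eventually succeeded or failed.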
func (*PersistentQueue) Retry ¶
func (q *PersistentQueue) Retry(routingKey string) (int, error)
Retry retries events that are in an error state, either for a single routing key or, if no routing key is provided, for all events in error.
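The selection rule for a retry can be sketched as a filter over stored records. This is an illustration only: `retryRecord` and `retryErrored` are hypothetical names, and two details are assumptions rather than documented behavior, namely that the returned `int` is the number of events retried and that a retried event goes back to the pending state.

```go
package main

// retryRecord is a hypothetical, trimmed-down version of the Event record.
type retryRecord struct {
	RoutingKey string
	Status     string
}

// retryErrored marks every record in the "error" state as "pending" again
// and returns how many records it changed. An empty routingKey matches
// all routing keys; otherwise only records with that key are considered.
func retryErrored(records []*retryRecord, routingKey string) int {
	retried := 0
	for _, r := range records {
		if r.Status != "error" { // corresponds to StatusError
			continue
		}
		if routingKey != "" && r.RoutingKey != routingKey {
			continue
		}
		r.Status = "pending" // corresponds to StatusPending
		retried++
	}
	return retried
}
```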
func (*PersistentQueue) Shutdown ¶
func (q *PersistentQueue) Shutdown() error
Shutdown stops a `PersistentQueue`, performing any necessary cleanup.
func (*PersistentQueue) Start ¶
func (q *PersistentQueue) Start() error
func (*PersistentQueue) Status ¶
func (q *PersistentQueue) Status(routingKey string) ([]StatusItem, error)
Status returns aggregate stats per routing key for pending and enqueued events.
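Per-routing-key aggregation can be sketched as a count of records grouped by routing key and status. The fields of `StatusItem` are not shown in this reference, so `routeStats` below is a hypothetical shape, not the real type. Treating an empty routing key as "all keys" is also an assumption, borrowed from the way `Retry` is described.

```go
package main

import "sort"

// statusRecord is a hypothetical, trimmed-down version of the Event record.
type statusRecord struct {
	RoutingKey string
	Status     string
}

// routeStats is a hypothetical stand-in for StatusItem:
// one entry per routing key with a count for each status seen.
type routeStats struct {
	RoutingKey string
	Counts     map[string]int
}

// aggregateStatus groups records by routing key and counts them by status.
// An empty routingKey includes every routing key (assumed behavior).
// Results are sorted by routing key so the output is deterministic.
func aggregateStatus(records []statusRecord, routingKey string) []routeStats {
	byKey := make(map[string]map[string]int)
	for _, r := range records {
		if routingKey != "" && r.RoutingKey != routingKey {
			continue
		}
		if byKey[r.RoutingKey] == nil {
			byKey[r.RoutingKey] = make(map[string]int)
		}
		byKey[r.RoutingKey][r.Status]++
	}

	keys := make([]string, 0, len(byKey))
	for k := range byKey {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	out := make([]routeStats, 0, len(keys))
	for _, k := range keys {
		out = append(out, routeStats{RoutingKey: k, Counts: byKey[k]})
	}
	return out
}
```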