Documentation ¶
Index ¶
- type FindOptions
- type InternalFilter
- type MemoryStorage
- func (storage *MemoryStorage) Ack(_ context.Context, msg *message.Message) (modifiedCount int64, err error)
- func (storage *MemoryStorage) Close(ctx context.Context) error
- func (storage *MemoryStorage) Count(_ context.Context, opts *FindOptions) (int64, error)
- func (storage *MemoryStorage) EditQueueConfiguration(_ context.Context, configuration *configuration.QueueConfiguration) error
- func (storage *MemoryStorage) Find(_ context.Context, opt *FindOptions) ([]message.Message, error)
- func (storage *MemoryStorage) Flush(_ context.Context) (deletedCount int64, err error)
- func (storage *MemoryStorage) GetQueueConfiguration(_ context.Context, queue string) (*configuration.QueueConfiguration, error)
- func (storage *MemoryStorage) GetStringInternalId(_ context.Context, message *message.Message) string
- func (storage *MemoryStorage) Insert(_ context.Context, messages ...*message.Message) (int64, int64, error)
- func (storage *MemoryStorage) ListQueueConfigurations(ctx context.Context) ([]*configuration.QueueConfiguration, error)
- func (storage *MemoryStorage) ListQueueNames(_ context.Context) (queues []string, err error)
- func (storage *MemoryStorage) ListQueuePrefixes(_ context.Context) (queues []string, err error)
- func (storage *MemoryStorage) Nack(_ context.Context, msg *message.Message) (modifiedCount int64, err error)
- func (storage *MemoryStorage) Remove(_ context.Context, queue string, ids ...string) (deleted int64, err error)
- type MongoStorage
- func (storage *MongoStorage) Ack(ctx context.Context, message *message.Message) (modifiedCount int64, err error)
- func (storage *MongoStorage) Close(ctx context.Context) error
- func (storage *MongoStorage) Count(ctx context.Context, opt *FindOptions) (int64, error)
- func (storage *MongoStorage) EditQueueConfiguration(ctx context.Context, configuration *configuration.QueueConfiguration) error
- func (storage *MongoStorage) Find(ctx context.Context, opt *FindOptions) ([]message.Message, error)
- func (storage *MongoStorage) Flush(ctx context.Context) (int64, error)
- func (storage *MongoStorage) GetQueueConfiguration(ctx context.Context, queue string) (*configuration.QueueConfiguration, error)
- func (storage *MongoStorage) GetStringInternalId(_ context.Context, message *message.Message) string
- func (storage *MongoStorage) Insert(ctx context.Context, messages ...*message.Message) (insertedCount int64, modifiedCount int64, err error)
- func (storage *MongoStorage) ListQueueConfigurations(ctx context.Context) ([]*configuration.QueueConfiguration, error)
- func (storage *MongoStorage) ListQueueNames(ctx context.Context) (queues []string, err error)
- func (storage *MongoStorage) ListQueuePrefixes(ctx context.Context) (queues []string, err error)
- func (storage *MongoStorage) Nack(ctx context.Context, message *message.Message) (modifiedCount int64, err error)
- func (storage *MongoStorage) Remove(ctx context.Context, queue string, ids ...string) (deleted int64, err error)
- type Storage
- type Type
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type FindOptions ¶
type InternalFilter ¶
type MemoryStorage ¶
type MemoryStorage struct {
// contains filtered or unexported fields
}
MemoryStorage is an in-memory implementation of the Storage interface. Currently, only the insert and pull functions are implemented.
func NewMemoryStorage ¶
func NewMemoryStorage(ctx context.Context) *MemoryStorage
func (*MemoryStorage) Count ¶
func (storage *MemoryStorage) Count(_ context.Context, opts *FindOptions) (int64, error)
func (*MemoryStorage) EditQueueConfiguration ¶
func (storage *MemoryStorage) EditQueueConfiguration(_ context.Context, configuration *configuration.QueueConfiguration) error
func (*MemoryStorage) Find ¶
func (storage *MemoryStorage) Find(_ context.Context, opt *FindOptions) ([]message.Message, error)
func (*MemoryStorage) Flush ¶
func (storage *MemoryStorage) Flush(_ context.Context) (deletedCount int64, err error)
func (*MemoryStorage) GetQueueConfiguration ¶
func (storage *MemoryStorage) GetQueueConfiguration(_ context.Context, queue string) (*configuration.QueueConfiguration, error)
func (*MemoryStorage) GetStringInternalId ¶
func (*MemoryStorage) ListQueueConfigurations ¶
func (storage *MemoryStorage) ListQueueConfigurations(ctx context.Context) ([]*configuration.QueueConfiguration, error)
func (*MemoryStorage) ListQueueNames ¶
func (storage *MemoryStorage) ListQueueNames(_ context.Context) (queues []string, err error)
func (*MemoryStorage) ListQueuePrefixes ¶
func (storage *MemoryStorage) ListQueuePrefixes(_ context.Context) (queues []string, err error)
type MongoStorage ¶
type MongoStorage struct {
// contains filtered or unexported fields
}
MongoStorage is an implementation of the Storage Interface using MongoDB.
func NewMongoStorage ¶
func NewMongoStorage(ctx context.Context) (*MongoStorage, error)
func (*MongoStorage) Ack ¶
func (storage *MongoStorage) Ack(ctx context.Context, message *message.Message) (modifiedCount int64, err error)
Ack updates the given message in the MongoDB storage with its updated status, score, and diagnostic information.
func (*MongoStorage) Count ¶
func (storage *MongoStorage) Count(ctx context.Context, opt *FindOptions) (int64, error)
func (*MongoStorage) EditQueueConfiguration ¶
func (storage *MongoStorage) EditQueueConfiguration(ctx context.Context, configuration *configuration.QueueConfiguration) error
func (*MongoStorage) Find ¶
func (storage *MongoStorage) Find(ctx context.Context, opt *FindOptions) ([]message.Message, error)
Find fetches all valid messages using the specified projection and returns them as a slice, sorted by ascending insertion date.
func (*MongoStorage) GetQueueConfiguration ¶
func (storage *MongoStorage) GetQueueConfiguration(ctx context.Context, queue string) (*configuration.QueueConfiguration, error)
func (*MongoStorage) GetStringInternalId ¶
func (*MongoStorage) ListQueueConfigurations ¶
func (storage *MongoStorage) ListQueueConfigurations(ctx context.Context) ([]*configuration.QueueConfiguration, error)
func (*MongoStorage) ListQueueNames ¶
func (storage *MongoStorage) ListQueueNames(ctx context.Context) (queues []string, err error)
func (*MongoStorage) ListQueuePrefixes ¶
func (storage *MongoStorage) ListQueuePrefixes(ctx context.Context) (queues []string, err error)
type Storage ¶
type Storage interface {
	Insert(ctx context.Context, messages ...*message.Message) (inserted int64, updated int64, err error)
	Find(ctx context.Context, opt *FindOptions) ([]message.Message, error)
	Remove(ctx context.Context, queue string, ids ...string) (deleted int64, err error)
	Ack(ctx context.Context, message *message.Message) (modifiedCount int64, err error)
	Nack(ctx context.Context, message *message.Message) (modifiedCount int64, err error)
	ListQueueNames(ctx context.Context) (queues []string, err error)
	ListQueuePrefixes(ctx context.Context) (queues []string, err error)
	Count(ctx context.Context, opt *FindOptions) (int64, error)
	GetStringInternalId(ctx context.Context, message *message.Message) string

	EditQueueConfiguration(ctx context.Context, configuration *configuration.QueueConfiguration) error
	GetQueueConfiguration(ctx context.Context, queue string) (*configuration.QueueConfiguration, error)
	ListQueueConfigurations(ctx context.Context) ([]*configuration.QueueConfiguration, error)

	// Available to cleanup tests
	Flush(ctx context.Context) (deletedCount int64, err error)

	// Close connection to the storage
	Close(ctx context.Context) error
}
Storage is the interface for storing the messages that have to be routed. It holds all the data of each message and is used for storage only.
Click to show internal directories.
Click to hide internal directories.