storage

package
v0.0.22 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 7, 2023 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type FindOptions

// FindOptions groups the parameters accepted by the Find and Count methods of
// a Storage.
//
// NOTE(review): the per-field notes below are inferred from field names and
// types only; confirm them against the storage implementations.
type FindOptions struct {
	// Projection: presumably maps a field name to an include/exclude flag
	// (MongoDB-style projection) — TODO confirm. Optional (pointer).
	Projection *map[string]int
	// Sort: presumably maps a field name to a sort direction; an ordered map
	// keeps the key order stable — TODO confirm how the order is used.
	Sort       *orderedmap.OrderedMap[string, int]
	// Limit: presumably the maximum number of messages to return — TODO
	// confirm what a zero value means.
	Limit      int64
	// InternalFilter is embedded by pointer, so its fields are promoted onto
	// FindOptions. Accessing a promoted field while this pointer is nil panics.
	*InternalFilter

	// Retry is a flag used to deal with storage retries when the storage
	// reports missing elements.
	Retry bool
}

type InternalFilter

// InternalFilter holds the filtering criteria that are embedded (by pointer)
// in FindOptions.
//
// NOTE(review): except for ExpiryDate, the field notes below are inferred from
// names only; confirm them against the storage implementations.
type InternalFilter struct {
	// Presumably lower (exclusive, "greater than") and upper (inclusive,
	// "less than or equal") bounds on the storage-internal id — TODO confirm.
	InternalIdBreakpointGt  string
	InternalIdBreakpointLte string
	// Ids: presumably restricts the search to these message ids — TODO confirm
	// how a nil pointer or an empty slice is treated.
	Ids                     *[]string
	// Queue and QueuePrefix: presumably match a queue by exact name or by name
	// prefix respectively — TODO confirm.
	Queue                   string
	QueuePrefix             string

	// ExpiryDate is used to manage the TTL cleaner. The search always looks for
	// elements whose expiry date is greater than the value provided here.
	ExpiryDate *time.Time
}

type MemoryStorage

// MemoryStorage is an implementation of the Storage interface using memory.
// Instances are created with NewMemoryStorage.
type MemoryStorage struct {
	// contains filtered or unexported fields
}

MemoryStorage is an in-memory implementation of the Storage interface. Currently only the insert and pull functions are implemented.

func NewMemoryStorage

func NewMemoryStorage(ctx context.Context) *MemoryStorage

func (*MemoryStorage) Ack

func (storage *MemoryStorage) Ack(_ context.Context, msg *message.Message) (modifiedCount int64, err error)

func (*MemoryStorage) Close

func (storage *MemoryStorage) Close(ctx context.Context) error

func (*MemoryStorage) Count

func (storage *MemoryStorage) Count(_ context.Context, opts *FindOptions) (int64, error)

func (*MemoryStorage) EditQueueConfiguration

func (storage *MemoryStorage) EditQueueConfiguration(_ context.Context, configuration *configuration.QueueConfiguration) error

func (*MemoryStorage) Find

func (storage *MemoryStorage) Find(_ context.Context, opt *FindOptions) ([]message.Message, error)

func (*MemoryStorage) Flush

func (storage *MemoryStorage) Flush(_ context.Context) (deletedCount int64, err error)

func (*MemoryStorage) GetQueueConfiguration

func (storage *MemoryStorage) GetQueueConfiguration(_ context.Context, queue string) (*configuration.QueueConfiguration, error)

func (*MemoryStorage) GetStringInternalId

func (storage *MemoryStorage) GetStringInternalId(_ context.Context, message *message.Message) string

func (*MemoryStorage) Insert

func (storage *MemoryStorage) Insert(_ context.Context, messages ...*message.Message) (int64, int64, error)

func (*MemoryStorage) ListQueueConfigurations

func (storage *MemoryStorage) ListQueueConfigurations(ctx context.Context) ([]*configuration.QueueConfiguration, error)

func (*MemoryStorage) ListQueueNames

func (storage *MemoryStorage) ListQueueNames(_ context.Context) (queues []string, err error)

func (*MemoryStorage) ListQueuePrefixes

func (storage *MemoryStorage) ListQueuePrefixes(_ context.Context) (queues []string, err error)

func (*MemoryStorage) Nack added in v0.0.18

func (storage *MemoryStorage) Nack(_ context.Context, msg *message.Message) (modifiedCount int64, err error)

func (*MemoryStorage) Remove

func (storage *MemoryStorage) Remove(_ context.Context, queue string, ids ...string) (deleted int64, err error)

type MongoStorage

// MongoStorage is an implementation of the Storage interface using MongoDB.
// Instances are created with NewMongoStorage, which may return an error.
type MongoStorage struct {
	// contains filtered or unexported fields
}

MongoStorage is an implementation of the Storage Interface using MongoDB.

func NewMongoStorage

func NewMongoStorage(ctx context.Context) (*MongoStorage, error)

func (*MongoStorage) Ack

func (storage *MongoStorage) Ack(ctx context.Context, message *message.Message) (modifiedCount int64, err error)

Ack updates the messages on mongostorage with updated status, score and diagnostic information.

func (*MongoStorage) Close

func (storage *MongoStorage) Close(ctx context.Context) error

func (*MongoStorage) Count

func (storage *MongoStorage) Count(ctx context.Context, opt *FindOptions) (int64, error)

func (*MongoStorage) EditQueueConfiguration

func (storage *MongoStorage) EditQueueConfiguration(ctx context.Context, configuration *configuration.QueueConfiguration) error

func (*MongoStorage) Find

func (storage *MongoStorage) Find(ctx context.Context, opt *FindOptions) ([]message.Message, error)

Find returns the valid messages, with the specified projection applied, sorted by their ascending insertion date.

func (*MongoStorage) Flush

func (storage *MongoStorage) Flush(ctx context.Context) (int64, error)

func (*MongoStorage) GetQueueConfiguration

func (storage *MongoStorage) GetQueueConfiguration(ctx context.Context, queue string) (*configuration.QueueConfiguration, error)

func (*MongoStorage) GetStringInternalId

func (storage *MongoStorage) GetStringInternalId(_ context.Context, message *message.Message) string

func (*MongoStorage) Insert

func (storage *MongoStorage) Insert(ctx context.Context, messages ...*message.Message) (insertedCount int64, modifiedCount int64, err error)

func (*MongoStorage) ListQueueConfigurations

func (storage *MongoStorage) ListQueueConfigurations(ctx context.Context) ([]*configuration.QueueConfiguration, error)

func (*MongoStorage) ListQueueNames

func (storage *MongoStorage) ListQueueNames(ctx context.Context) (queues []string, err error)

func (*MongoStorage) ListQueuePrefixes

func (storage *MongoStorage) ListQueuePrefixes(ctx context.Context) (queues []string, err error)

func (*MongoStorage) Nack added in v0.0.18

func (storage *MongoStorage) Nack(ctx context.Context, message *message.Message) (modifiedCount int64, err error)

Nack updates the messages on mongostorage with updated status, score and diagnostic information.

func (*MongoStorage) Remove

func (storage *MongoStorage) Remove(ctx context.Context, queue string, ids ...string) (deleted int64, err error)

type Storage

// Storage is the interface implemented by the backends that store the messages
// that have to be routed. MemoryStorage and MongoStorage both provide this
// method set.
type Storage interface {
	// Insert stores the given messages and reports how many were inserted and
	// how many were updated.
	Insert(ctx context.Context, messages ...*message.Message) (inserted int64, updated int64, err error)

	// Find returns the messages selected by opt.
	Find(ctx context.Context, opt *FindOptions) ([]message.Message, error)
	// Remove deletes from queue the messages identified by ids and reports how
	// many were deleted.
	Remove(ctx context.Context, queue string, ids ...string) (deleted int64, err error)
	// Ack and Nack apply an acknowledgement or negative acknowledgement for
	// message and report how many stored entries were modified.
	// NOTE(review): which message fields are updated is not visible here.
	Ack(ctx context.Context, message *message.Message) (modifiedCount int64, err error)
	Nack(ctx context.Context, message *message.Message) (modifiedCount int64, err error)

	// ListQueueNames and ListQueuePrefixes return the known queue names and
	// queue prefixes; Count returns the number of entries selected by opt.
	ListQueueNames(ctx context.Context) (queues []string, err error)
	ListQueuePrefixes(ctx context.Context) (queues []string, err error)
	Count(ctx context.Context, opt *FindOptions) (int64, error)

	// GetStringInternalId returns a string form of the storage-internal id of
	// message. NOTE(review): the id format is backend-specific — confirm.
	GetStringInternalId(ctx context.Context, message *message.Message) string

	// Queue configuration management: edit one configuration, fetch the
	// configuration of a single queue, or list all configurations.
	EditQueueConfiguration(ctx context.Context, configuration *configuration.QueueConfiguration) error
	GetQueueConfiguration(ctx context.Context, queue string) (*configuration.QueueConfiguration, error)
	ListQueueConfigurations(ctx context.Context) ([]*configuration.QueueConfiguration, error)

	// Flush is available to clean up tests; it reports how many entries were
	// deleted.
	Flush(ctx context.Context) (deletedCount int64, err error)

	// Close closes the connection to the storage.
	Close(ctx context.Context) error
}

Storage is the interface for the backend that stores the messages that have to be routed. It holds all of the data of each message and is used for storage only.

func CreateStorage

func CreateStorage(ctx context.Context, storageType Type) (Storage, error)

type Type

// Type names a storage backend; it is the selector argument of CreateStorage.
type Type string
const (
	// MONGODB: presumably selects the MongoStorage backend — TODO confirm in
	// CreateStorage.
	MONGODB Type = "MONGODB"
	// MEMORY: presumably selects the MemoryStorage backend — TODO confirm in
	// CreateStorage.
	MEMORY  Type = "MEMORY"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL