Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrMessageNoID = errors.New("Message cannot be written without a new UUID") ErrNegativeBatchSize = errors.New("Batch size cannot be negative") )
View Source
var ( ErrInvalidSubscriberID = errors.New("Subscriber ID cannot be blank") ErrInvalidStreamName = errors.New("Stream Name cannot be blank") ErrBlankCategory = errors.New("Category cannot be blank") ErrInvalidCategory = errors.New("Category cannot contain a hyphen") ErrInvalidSubscriberPosition = errors.New("Subscriber position must be greater than or equal to -1") ErrNilMessage = errors.New("Message cannot be nil") ErrInvalidPosition = errors.New("position must be greater than equal to -1") )
Errors
Functions ¶
This section is empty.
Types ¶
type MessageEnvelope ¶
type MessageEnvelope struct { ID uuid.UUID `db:"id"` StreamName string `db:"stream_name"` StreamCategory string `db:"stream_category"` MessageType string `db:"type"` Version int64 `db:"position"` GlobalPosition int64 `db:"global_position"` Data []byte `db:"data"` Metadata []byte `db:"metadata"` Time time.Time `db:"time"` }
Actual values that come out of the database
func (*MessageEnvelope) String ¶
func (msgEnv *MessageEnvelope) String() string
type Repository ¶
type Repository interface { // writes WriteMessage(ctx context.Context, message *MessageEnvelope) error WriteMessageWithExpectedPosition(ctx context.Context, message *MessageEnvelope, position int64) error // reads from stream GetAllMessagesInStream(ctx context.Context, streamName string, batchSize int) ([]*MessageEnvelope, error) GetAllMessagesInStreamSince(ctx context.Context, streamName string, globalPosition int64, batchSize int) ([]*MessageEnvelope, error) GetLastMessageInStream(ctx context.Context, streamName string) (*MessageEnvelope, error) // reads from category GetAllMessagesInCategory(ctx context.Context, category string, batchSize int) ([]*MessageEnvelope, error) GetAllMessagesInCategorySince(ctx context.Context, category string, globalPosition int64, batchSize int) ([]*MessageEnvelope, error) }
Repository the storage implementation for messagestore
func NewPostgresRepository ¶
func NewPostgresRepository(db *sql.DB, log logrus.FieldLogger) Repository
NewPostgresRepository creates a new in memory implementation for the messagestore reop
Source Files
¶
Click to show internal directories.
Click to hide internal directories.