Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
// Sentinel error values exported by this package. Each one is a distinct
// value created with errors.New, so callers should compare with errors.Is
// (or ==) rather than by inspecting the message text.
var (
	// ErrIterFinished signals that an iteration ran to completion.
	// NOTE(review): the message keeps its upper-case "ERR" prefix unchanged
	// in case existing callers or logs match on the exact text.
	ErrIterFinished = errors.New("ERR iteration finished successfully")

	// ErrInvalidTopic reports an invalid topic.
	ErrInvalidTopic = errors.New("invalid topic")

	// ErrTopicAlreadyExists reports that the topic already exists.
	ErrTopicAlreadyExists = errors.New("topic already exists")

	// ErrArchivedTopic reports that the topic has been archived.
	ErrArchivedTopic = errors.New("topic has been archived")

	// ErrTopicRecreate reports an attempt to recreate an archived topic.
	ErrTopicRecreate = errors.New("cannot recreate archived topic")

	// ErrMissingBucket reports that the topics bucket is missing.
	ErrMissingBucket = errors.New("missing topics bucket")

	// ErrCreatingTopic reports that a topic could not be created.
	ErrCreatingTopic = errors.New("could not create topic")

	// ErrInvalidConsumer reports a consumer that is invalid for its group.
	ErrInvalidConsumer = errors.New("invalid consumer for group")

	// ErrConsumerAlreadyRegistered reports that the consumer is already
	// registered.
	ErrConsumerAlreadyRegistered = errors.New("consumer is already registered")
)
Functions ¶
func NewSqlStore ¶
Types ¶
type KVItem ¶
func (*KVItem) IsFinished ¶
type Store ¶
type Store interface { CreateTopic(ctx context.Context, topic *models.Topic) error DeleteTopic(ctx context.Context, topic *models.Topic) error AddRecords(ctx context.Context, topic string, records ...*models.Record) error GetRecords(ctx context.Context, topic string, start string, limit int) ([]*models.Record, error) FetchRecord(ctx context.Context, consumer *models.Consumer) (*models.Record, error) ListTopics(ctx context.Context) ([]string, error) RegisterConsumer(ctx context.Context, consumer *models.Consumer) (*models.Consumer, error) GetConsumerPosition(ctx context.Context, consumer *models.Consumer) (string, error) CommitConsumerPosition(ctx context.Context, consumer *models.Consumer) error GetConsumerLag(ctx context.Context, consumer *models.Consumer) (int64, error) LoadMeta(ctx context.Context, topic string) (*models.Topic, error) Stats() map[string]*models.Topic Impl() any Close() SnapshotItems() <-chan DataItem PruneOldRecords(ctx context.Context) error }
Click to show internal directories.
Click to hide internal directories.