meta

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2023 License: BSD-3-Clause Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const MaxTopicNameLength = 512

Variables

View Source
var (
	TopicSegment      = Segment{0x74, 0x70}
	TopicNamesSegment = Segment{0x54, 0x6e}
	GroupSegment      = Segment{0x47, 0x50}
)

Segments currently in use by Ensign

Functions

func IsValidTimestamp added in v0.5.0

func IsValidTimestamp(s *timestamppb.Timestamp) bool

func ValidateGroup added in v0.5.1

func ValidateGroup(group *api.ConsumerGroup, partial bool) error

func ValidateTopic added in v0.5.0

func ValidateTopic(topic *api.Topic, partial bool) error

Validate a topic is ready for storage in the database. If partial is true, then the fields that may be set by this package (e.g. ID, created, modified) are not checked, otherwise the entire struct is validated.

Types

type GroupIterator added in v0.5.1

type GroupIterator struct {
	ldbiter.Iterator
}

Implements the iterator.GroupIterator to provide access to a list of groups.

func (*GroupIterator) Error added in v0.5.1

func (i *GroupIterator) Error() (err error)

func (*GroupIterator) Group added in v0.5.1

func (i *GroupIterator) Group() (*api.ConsumerGroup, error)

Group unmarshals the next ConsumerGroup in the iterator.

type IndexKey added in v0.5.0

type IndexKey [16]byte

An Index Key is a ULID that must be unique across all objects; e.g. Topics must have unique keys across projects. This can be guaranteed by using ulids.New for creating IDs. If a key can be created by a user e.g. for Groups, it must not use the object key accessor since we cannot guarantee that users will create unique keys.

func CreateIndex added in v0.5.0

func CreateIndex(objectID ulid.ULID) (key IndexKey, err error)

type ObjectKey added in v0.5.0

type ObjectKey [34]byte

ObjectKey is the 16 byte project ID followed by a 2 byte object segment then a 16 byte unique object ID. The IndexKey maps to the object key to allow for easy lookups.

func CreateKey added in v0.5.0

func CreateKey(parentID, objectID ulid.ULID, segment Segment) (key ObjectKey, err error)

func GroupKey added in v0.5.1

func GroupKey(group *api.ConsumerGroup) ObjectKey

GroupKey is a 34 byte value that is the concatenated projectID followed by the group segment and then the murmur3 hashed key of the group (unless a 16 byte ID is specified by the user). If there are any errors, e.g. the group is invalid, this function will panic. It is the responsibility of the caller to validate the group.

func TopicKey added in v0.5.0

func TopicKey(topic *api.Topic) ObjectKey

TopicKey is a 34 byte value that is the concatenated projectID followed by the topic segment and then the topicID. We expect that topicIDs are unique in the database.

func TopicNameKey added in v0.5.1

func TopicNameKey(topic *api.Topic) ObjectKey

TopicNameKey is a 34 byte value that is the concatenated projectID followed by the topic segment and then the murmur3 hashed topic name. This allows us to ensure that topic names are unique to the project.

func (*ObjectKey) Key added in v0.5.0

func (k *ObjectKey) Key() IndexKey

func (*ObjectKey) ObjectID added in v0.5.0

func (k *ObjectKey) ObjectID() (id ulid.ULID, err error)

func (*ObjectKey) ParentID added in v0.5.0

func (k *ObjectKey) ParentID() (id ulid.ULID, err error)

func (*ObjectKey) Segment added in v0.5.1

func (k *ObjectKey) Segment() (Segment, error)

func (*ObjectKey) UnmarshalValue added in v0.5.0

func (k *ObjectKey) UnmarshalValue(data []byte) error

type Segment added in v0.5.1

type Segment [2]byte

Segments ensure that different objects are stored contiguously in the database ordered by their project then their ID to make it easy to scan for objects.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store implements the store.MetaStore interface for interacting with topics and other persistent data in the ensign database. Store can be readonly which is enforced both by the underlying disk reads to leveldb and the readonly flag at the top level.

func Open

func Open(conf config.StorageConfig) (store *Store, err error)

Open a new metadata store at the metapath supplied by the configuration.

func (*Store) AllowedTopics added in v0.5.1

func (s *Store) AllowedTopics(projectID ulid.ULID) (_ []ulid.ULID, err error)

AllowedTopics returns the topic ids for all of the topics specified in the project. This method is routinely used for validating topics available to the user.

func (*Store) Close

func (s *Store) Close() error

Close the underlying leveldb gracefully to avoid database corruption.

func (*Store) Count added in v0.5.0

func (s *Store) Count(slice *util.Range) (count uint64, err error)

Count the number of objects that match the specified range by iterating through all of the keys and counting them. This is primarily used for testing.

func (*Store) Create added in v0.5.0

func (s *Store) Create(key ObjectKey, value []byte, uniqueIndices ...ObjectKey) (err error)

Create an object in the database with the specified key by saving both the object key in the database as well as the object mapped to the key value. If the object already exists, then an error is returned. To prevent concurrency issues, Create locks the object key before access and performs all writes in batch. TODO: extend tests for this method to validate concurrency.

func (*Store) CreateGroup added in v0.5.1

func (s *Store) CreateGroup(group *api.ConsumerGroup) (err error)

Create a consumer group in the database. If the group already exists or if the topic is not valid an error is returned. The entire object key for the group is locked in the keymu to prevent concurrent writes to the group.

func (*Store) CreateTopic

func (s *Store) CreateTopic(topic *api.Topic) (err error)

Create a topic in the database; if the topic already exists or if the topic is not valid an error is returned. This method uses the keymu lock to avoid concurrency issues for multiple writers.

func (*Store) Delete added in v0.5.0

func (s *Store) Delete(key []byte) error

Delete the specified key, wrapping any leveldb errors in an errors.Error. NOTE: if deleting objects it is preferred to use Destroy to avoid concurrency issues.

func (*Store) DeleteGroup added in v0.5.1

func (s *Store) DeleteGroup(group *api.ConsumerGroup) (err error)

Delete a group from the database. If the group does not exist, no error is returned.

func (*Store) DeleteTopic

func (s *Store) DeleteTopic(topicID ulid.ULID) (err error)

Delete a topic from the database. If the topic does not exist, no error is returned. This method uses the keymu lock to avoid concurrency issues and also cleans up any indices associated with the topic.

func (*Store) Destroy added in v0.5.0

func (s *Store) Destroy(key IndexKey, uniqueIndices ...ObjectKey) (err error)

Destroy an object in the database by first looking its object key with the specified objectID then deleting the value at the retrieved object key. To avoid concurrency issues, destroy locks the key and deletes both the object key and the object in a batch write. If the object does not exist, no error is returned.

func (*Store) Get added in v0.5.0

func (s *Store) Get(key []byte) (value []byte, err error)

Gets a value for the specified key, wrapping any leveldb errors in an errors.Error. NOTE: if getting objects it is preferred to use Retrieve to avoid concurrency issues.

func (*Store) GetOrCreateGroup added in v0.5.1

func (s *Store) GetOrCreateGroup(group *api.ConsumerGroup) (created bool, err error)

GetOrCreate a consumer group in the database. This method is the primary mechanism of retrieving a consumer group object from the database. The user should only specify the minimum required fields on the group and if the group exists the object will be updated in place with other values. Otherwise, the group will be created with default values set ready service a new group. If the group was created this function will return true.

func (*Store) Has added in v0.5.0

func (s *Store) Has(key []byte) (ret bool, err error)

Has checks the existence of a key in the database, wrapping any leveldb errors in an errors.Error for easy error checking and verification.

func (*Store) ListGroups added in v0.5.1

func (s *Store) ListGroups(projectID ulid.ULID) iterator.GroupIterator

List all consumer groups associated with the specified projectID. Returns a GroupIterator that can be used to retrieve each individual consumer group.

func (*Store) ListTopicNames added in v0.5.1

func (s *Store) ListTopicNames(projectID ulid.ULID) iterator.TopicNamesIterator

func (*Store) ListTopics

func (s *Store) ListTopics(projectID ulid.ULID) iterator.TopicIterator

List all of the topics associated with the specified projectID. Returns a TopicIterator that can be used to retrieve each individual topic or to create a page of topics for paginated requests.

func (*Store) Put added in v0.5.0

func (s *Store) Put(key, value []byte) error

Put a value for the specified pair, wrapping any leveldb errors in an errors.Error. NOTE: if putting objects it is preferred to use Create or Update to avoid concurrency issues by using key-specific transactions.

func (*Store) ReadOnly

func (s *Store) ReadOnly() bool

Returns the readonly state of the meta store.

func (*Store) Retrieve added in v0.5.0

func (s *Store) Retrieve(key IndexKey) (value []byte, err error)

Retrieve an object from the database by first looking up its object key with the specified objectID then returning the value at the retrieved object key. TODO: implement read locks to avoid concurrency issues.

func (*Store) RetrieveGroup added in v0.5.1

func (s *Store) RetrieveGroup(group *api.ConsumerGroup) (err error)

Retrieve updates the group pointer in place with the current values in the database.

func (*Store) RetrieveTopic

func (s *Store) RetrieveTopic(topicID ulid.ULID) (topic *api.Topic, err error)

Retrieve a topic from the database.

func (*Store) RetrieveUnique added in v0.5.1

func (s *Store) RetrieveUnique(uniqueKey ObjectKey) (value []byte, err error)

Retrieve an object by its unique index by first looking up the object key associated with the unique ID and then returning the value at the retrieved object key.

func (*Store) TopicExists added in v0.5.1

func (s *Store) TopicExists(in *api.TopicName) (_ *api.TopicExistsInfo, err error)

func (*Store) Update added in v0.5.0

func (s *Store) Update(key ObjectKey, value []byte, uniqueIndices ...ObjectKey) (err error)

Update an object in the database with the specified object key. If the object does not exist, an error is returned (unlike normal Put semantics). To avoid concurrency issues, update locks the specified key before performing any writes.

func (*Store) UpdateGroup added in v0.5.1

func (s *Store) UpdateGroup(group *api.ConsumerGroup) (err error)

Update a group by putting the data from the input group into the database. If the group does not exist, an error is returned (unlike normal Put semantics). To avoid concurrency issues, this method locks the object key before performing writes.

NOTE: it is important that only one go routine updates the group otherwise go routines may update the group to conflicting values since there is no ability to check if one group's updates are "more recent" than another's (e.g. through a versioning mechanism). The database does not guard the order of writes.

func (*Store) UpdateTopic

func (s *Store) UpdateTopic(topic *api.Topic) (err error)

Update a topic by putting the specified topic into the database. This method uses the keymu lock to avoid concurrency issues and returns an error if the specified topic does not exist or is not valid.

NOTE: We must return an error if the topic name has changed otherwise we will also have to modify the uniqueness constraints on topic name and check them as well.

type TopicIterator added in v0.5.0

type TopicIterator struct {
	ldbiter.Iterator
}

Implements iterator.TopicIterator to provide access to a list of topics.

func (*TopicIterator) Error added in v0.5.1

func (i *TopicIterator) Error() (err error)

func (*TopicIterator) NextPage added in v0.5.0

func (i *TopicIterator) NextPage(in *api.PageInfo) (page *api.TopicsPage, err error)

NextPage seeks the iterator to the next page of results and returns a page of topics.

func (*TopicIterator) Topic added in v0.5.0

func (i *TopicIterator) Topic() (*api.Topic, error)

Topic unmarshals the next topic in the iterator.

type TopicNamesIterator added in v0.5.1

type TopicNamesIterator struct {
	ldbiter.Iterator
}

Implements iterator.TopicNamesIterator to provide access to the topic names index.

func (*TopicNamesIterator) Error added in v0.5.1

func (i *TopicNamesIterator) Error() (err error)

func (*TopicNamesIterator) NextPage added in v0.5.1

func (i *TopicNamesIterator) NextPage(in *api.PageInfo) (page *api.TopicNamesPage, err error)

func (*TopicNamesIterator) TopicName added in v0.5.1

func (i *TopicNamesIterator) TopicName() (_ *api.TopicName, err error)

TopicName unmarshals the next topic name in the iterator

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL