Documentation ¶
Index ¶
- Constants
- Variables
- func IsValidTimestamp(s *timestamppb.Timestamp) bool
- func ValidateGroup(group *api.ConsumerGroup, partial bool) error
- func ValidateTopic(topic *api.Topic, partial bool) error
- type GroupIterator
- type IndexKey
- type ObjectKey
- type Segment
- type Store
- func (s *Store) AllowedTopics(projectID ulid.ULID) (_ []ulid.ULID, err error)
- func (s *Store) Close() error
- func (s *Store) Count(slice *util.Range) (count uint64, err error)
- func (s *Store) Create(key ObjectKey, value []byte, uniqueIndices ...ObjectKey) (err error)
- func (s *Store) CreateGroup(group *api.ConsumerGroup) (err error)
- func (s *Store) CreateTopic(topic *api.Topic) (err error)
- func (s *Store) Delete(key []byte) error
- func (s *Store) DeleteGroup(group *api.ConsumerGroup) (err error)
- func (s *Store) DeleteTopic(topicID ulid.ULID) (err error)
- func (s *Store) Destroy(key IndexKey, uniqueIndices ...ObjectKey) (err error)
- func (s *Store) Get(key []byte) (value []byte, err error)
- func (s *Store) GetOrCreateGroup(group *api.ConsumerGroup) (created bool, err error)
- func (s *Store) Has(key []byte) (ret bool, err error)
- func (s *Store) ListGroups(projectID ulid.ULID) iterator.GroupIterator
- func (s *Store) ListTopicNames(projectID ulid.ULID) iterator.TopicNamesIterator
- func (s *Store) ListTopics(projectID ulid.ULID) iterator.TopicIterator
- func (s *Store) Put(key, value []byte) error
- func (s *Store) ReadOnly() bool
- func (s *Store) Retrieve(key IndexKey) (value []byte, err error)
- func (s *Store) RetrieveGroup(group *api.ConsumerGroup) (err error)
- func (s *Store) RetrieveTopic(topicID ulid.ULID) (topic *api.Topic, err error)
- func (s *Store) RetrieveUnique(uniqueKey ObjectKey) (value []byte, err error)
- func (s *Store) TopicExists(in *api.TopicName) (_ *api.TopicExistsInfo, err error)
- func (s *Store) Update(key ObjectKey, value []byte, uniqueIndices ...ObjectKey) (err error)
- func (s *Store) UpdateGroup(group *api.ConsumerGroup) (err error)
- func (s *Store) UpdateTopic(topic *api.Topic) (err error)
- type TopicIterator
- type TopicNamesIterator
Constants ¶
const MaxTopicNameLength = 512
Variables ¶
var (
	TopicSegment      = Segment{0x74, 0x70}
	TopicNamesSegment = Segment{0x54, 0x6e}
	GroupSegment      = Segment{0x47, 0x50}
)
Segments currently in use by Ensign
Functions ¶
func IsValidTimestamp ¶ added in v0.5.0
func IsValidTimestamp(s *timestamppb.Timestamp) bool
func ValidateGroup ¶ added in v0.5.1
func ValidateGroup(group *api.ConsumerGroup, partial bool) error
func ValidateTopic ¶ added in v0.5.0
Validate that a topic is ready for storage in the database. If partial is true, the fields that may be set by this package (e.g. ID, created, modified) are not checked; otherwise the entire struct is validated.
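The partial flag can be illustrated with a minimal sketch. The topic struct, its field names, the error messages, and the choice of which fields are always checked are assumptions made for illustration; only the 512 character limit (MaxTopicNameLength) and the set of package-managed fields (ID, created, modified) come from this page.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Mirrors MaxTopicNameLength from the Constants section.
const maxTopicNameLength = 512

// topic is a hypothetical stand-in for api.Topic; the real message has
// different fields and types.
type topic struct {
	ID        string
	ProjectID string
	Name      string
	Created   time.Time
	Modified  time.Time
}

// validateTopic is a hypothetical validator: user-supplied fields are always
// checked, package-managed fields are checked only when partial is false.
func validateTopic(t *topic, partial bool) error {
	if t == nil {
		return errors.New("topic is nil")
	}
	if t.ProjectID == "" {
		return errors.New("missing project id")
	}
	if t.Name == "" {
		return errors.New("missing topic name")
	}
	if len(t.Name) > maxTopicNameLength {
		return errors.New("topic name too long")
	}
	if partial {
		// ID, Created and Modified may still be unset at this point.
		return nil
	}
	if t.ID == "" {
		return errors.New("missing topic id")
	}
	if t.Created.IsZero() || t.Modified.IsZero() {
		return errors.New("missing timestamps")
	}
	return nil
}

func main() {
	t := &topic{ProjectID: "project", Name: "events"}
	fmt.Println("partial:", validateTopic(t, true))
	fmt.Println("full:   ", validateTopic(t, false))
}
```

A partial check of this kind would typically run before the store assigns the ID and timestamps, and the full check just before the write.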
Types ¶
type GroupIterator ¶ added in v0.5.1
Implements iterator.GroupIterator to provide access to a list of groups.
func (*GroupIterator) Error ¶ added in v0.5.1
func (i *GroupIterator) Error() (err error)
func (*GroupIterator) Group ¶ added in v0.5.1
func (i *GroupIterator) Group() (*api.ConsumerGroup, error)
Group unmarshals the next ConsumerGroup in the iterator.
type IndexKey ¶ added in v0.5.0
type IndexKey [16]byte
An IndexKey is a ULID that must be unique across all objects; e.g. topics must have unique keys across projects. This can be guaranteed by using ulids.New to create IDs. If a key can be created by a user, e.g. for groups, it must not use the object key accessor, since we cannot guarantee that users will create unique keys.
func CreateIndex ¶ added in v0.5.0
type ObjectKey ¶ added in v0.5.0
type ObjectKey [34]byte
ObjectKey is the 16 byte project ID followed by a 2 byte object segment then a 16 byte unique object ID. The IndexKey maps to the object key to allow for easy lookups.
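The layout described above (16 + 2 + 16 = 34 bytes) can be sketched as follows. The Segment and ObjectKey types are redeclared locally so the example compiles on its own, and makeObjectKey is a hypothetical helper, not a function of this package.

```go
package main

import "fmt"

// Local stand-ins for the package's Segment and ObjectKey types.
type Segment [2]byte
type ObjectKey [34]byte

// makeObjectKey is a hypothetical helper that lays out
// projectID | segment | objectID in a 34 byte array.
func makeObjectKey(projectID [16]byte, segment Segment, objectID [16]byte) (key ObjectKey) {
	copy(key[0:16], projectID[:])
	copy(key[16:18], segment[:])
	copy(key[18:34], objectID[:])
	return key
}

func main() {
	topicSegment := Segment{0x74, 0x70} // value of TopicSegment listed under Variables
	projectID := [16]byte{0x01}         // placeholder IDs, not real ULIDs
	objectID := [16]byte{0x02}

	key := makeObjectKey(projectID, topicSegment, objectID)
	fmt.Printf("project=%x\nsegment=%x\nobject=%x\n", key[0:16], key[16:18], key[18:34])
}
```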
func GroupKey ¶ added in v0.5.1
func GroupKey(group *api.ConsumerGroup) ObjectKey
GroupKey is a 34 byte value that is the concatenated projectID followed by the group segment and then the murmur3 hashed key of the group (unless a 16 byte ID is specified by the user). If there are any errors, e.g. the group is invalid, this function will panic. It is the responsibility of the caller to validate the group.
func TopicKey ¶ added in v0.5.0
TopicKey is a 34 byte value that is the concatenated projectID followed by the topic segment and then the topicID. We expect that topicIDs are unique in the database.
func TopicNameKey ¶ added in v0.5.1
TopicNameKey is a 34 byte value that is the concatenated projectID followed by the topic segment and then the murmur3 hashed topic name. This allows us to ensure that topic names are unique to the project.
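The idea of deriving the final 16 bytes from a hash of the name can be sketched as below. Note the substitution: this page says the package uses murmur3, which is not in the Go standard library, so the sketch swaps in 128-bit FNV-1a from hash/fnv purely to stay self-contained. The nameKey helper and the segment value passed in main are illustrative assumptions.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// nameKey is a hypothetical helper that builds projectID | segment | hash(name).
// FNV-1a (128 bit) stands in for murmur3 here; the real keys will differ.
func nameKey(projectID [16]byte, segment [2]byte, name string) (key [34]byte) {
	h := fnv.New128a()
	h.Write([]byte(name)) // hash.Hash writes never return an error

	copy(key[0:16], projectID[:])
	copy(key[16:18], segment[:])
	copy(key[18:34], h.Sum(nil)) // 16 byte digest
	return key
}

func main() {
	segment := [2]byte{0x74, 0x70} // illustrative segment value
	projectA := [16]byte{0x0A}
	projectB := [16]byte{0x0B}

	// The same name in the same project always maps to the same key, so a
	// second create with that name can be detected as a duplicate.
	fmt.Println(nameKey(projectA, segment, "events") == nameKey(projectA, segment, "events"))

	// The same name in another project maps to a different key.
	fmt.Println(nameKey(projectA, segment, "events") == nameKey(projectB, segment, "events"))
}
```

Because the project ID is part of the key, uniqueness is scoped to the project rather than to the whole database.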
func (*ObjectKey) UnmarshalValue ¶ added in v0.5.0
type Segment ¶ added in v0.5.1
type Segment [2]byte
Segments ensure that different objects are stored contiguously in the database ordered by their project then their ID to make it easy to scan for objects.
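To see why the segment makes scanning easy, consider a sketch of a prefix scan over byte-ordered keys. It uses a sorted in-memory slice in place of the database, shortens object IDs to a single byte for readability, and the prefix and scan helpers are hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// prefix builds the 18 byte scan prefix: projectID followed by the segment.
func prefix(projectID [16]byte, segment [2]byte) []byte {
	p := make([]byte, 0, 18)
	p = append(p, projectID[:]...)
	p = append(p, segment[:]...)
	return p
}

// scan sorts a copy of the keys in byte order, as an ordered key-value store
// would hold them, and returns the keys that start with the prefix.
func scan(keys [][]byte, pfx []byte) [][]byte {
	sorted := make([][]byte, len(keys))
	copy(sorted, keys)
	sort.Slice(sorted, func(i, j int) bool {
		return bytes.Compare(sorted[i], sorted[j]) < 0
	})

	var out [][]byte
	for _, k := range sorted {
		if bytes.HasPrefix(k, pfx) {
			out = append(out, k)
		}
	}
	return out
}

func main() {
	project := [16]byte{0xAA}
	topics := [2]byte{0x74, 0x70} // TopicSegment
	groups := [2]byte{0x47, 0x50} // GroupSegment

	// Object IDs are shortened to one byte to keep the output readable.
	keys := [][]byte{
		append(prefix(project, topics), 0x02),
		append(prefix(project, groups), 0x01),
		append(prefix(project, topics), 0x01),
	}

	// Only the topic keys for the project are returned, ordered by ID.
	for _, k := range scan(keys, prefix(project, topics)) {
		fmt.Printf("%x\n", k)
	}
}
```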
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store implements the store.MetaStore interface for interacting with topics and other persistent data in the ensign database. Store can be read-only, which is enforced both by the underlying disk reads to leveldb and by the readonly flag at the top level.
func Open ¶
func Open(conf config.StorageConfig) (store *Store, err error)
Open a new metadata store at the metapath supplied by the configuration.
func (*Store) AllowedTopics ¶ added in v0.5.1
AllowedTopics returns the topic ids for all of the topics specified in the project. This method is routinely used for validating topics available to the user.
func (*Store) Count ¶ added in v0.5.0
Count the number of objects that match the specified range by iterating through all of the keys and counting them. This is primarily used for testing.
func (*Store) Create ¶ added in v0.5.0
Create an object in the database with the specified key by saving both the object key in the database as well as the object mapped to the key value. If the object already exists, then an error is returned. To prevent concurrency issues, Create locks the object key before access and performs all writes in batch. TODO: extend tests for this method to validate concurrency.
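The create semantics described above (lock, reject duplicates, write the index entry and the object together) can be sketched with an in-memory map. This is a simplification under stated assumptions: memStore, create, and errAlreadyExists are hypothetical names, a single mutex replaces the per-key lock, and a loop over a staged map stands in for a database batch write.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errAlreadyExists is a hypothetical error; the package defines its own.
var errAlreadyExists = errors.New("object already exists")

// memStore is an in-memory stand-in for the leveldb-backed Store.
type memStore struct {
	mu   sync.Mutex
	data map[string][]byte
}

func newMemStore() *memStore {
	return &memStore{data: make(map[string][]byte)}
}

// create stores indexKey -> objectKey and objectKey -> value as one unit,
// returning an error if the object key is already present.
func (s *memStore) create(indexKey, objectKey string, value []byte) error {
	// One mutex for the whole store; the real Store locks per key.
	s.mu.Lock()
	defer s.mu.Unlock()

	if _, exists := s.data[objectKey]; exists {
		return errAlreadyExists
	}

	// Stage both writes, then apply them together while holding the lock.
	batch := map[string][]byte{
		indexKey:  []byte(objectKey),
		objectKey: value,
	}
	for k, v := range batch {
		s.data[k] = v
	}
	return nil
}

func main() {
	s := newMemStore()
	fmt.Println("first create: ", s.create("idx-1", "obj-1", []byte("topic")))
	fmt.Println("second create:", s.create("idx-1", "obj-1", []byte("topic")))
}
```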
func (*Store) CreateGroup ¶ added in v0.5.1
func (s *Store) CreateGroup(group *api.ConsumerGroup) (err error)
Create a consumer group in the database. If the group already exists or if the group is not valid, an error is returned. The entire object key for the group is locked in the keymu to prevent concurrent writes to the group.
func (*Store) CreateTopic ¶
Create a topic in the database; if the topic already exists or if the topic is not valid an error is returned. This method uses the keymu lock to avoid concurrency issues for multiple writers.
func (*Store) Delete ¶ added in v0.5.0
Delete the specified key, wrapping any leveldb errors in an errors.Error. NOTE: if deleting objects it is preferred to use Destroy to avoid concurrency issues.
func (*Store) DeleteGroup ¶ added in v0.5.1
func (s *Store) DeleteGroup(group *api.ConsumerGroup) (err error)
Delete a group from the database. If the group does not exist, no error is returned.
func (*Store) DeleteTopic ¶
Delete a topic from the database. If the topic does not exist, no error is returned. This method uses the keymu lock to avoid concurrency issues and also cleans up any indices associated with the topic.
func (*Store) Destroy ¶ added in v0.5.0
Destroy an object in the database by first looking up its object key with the specified objectID, then deleting the value at the retrieved object key. To avoid concurrency issues, Destroy locks the key and deletes both the object key and the object in a batch write. If the object does not exist, no error is returned.
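The two-step delete can be sketched with an in-memory map. As assumptions for illustration, memStore and destroy are hypothetical names, a single mutex replaces the per-key lock, and two map deletes under that lock stand in for the batch write.

```go
package main

import (
	"fmt"
	"sync"
)

// memStore is an in-memory stand-in for the leveldb-backed Store.
type memStore struct {
	mu   sync.Mutex
	data map[string][]byte
}

// destroy resolves the object key through the index key, then removes both
// entries. A missing index entry is treated as success.
func (s *memStore) destroy(indexKey string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	objectKey, ok := s.data[indexKey]
	if !ok {
		return // nothing to delete; a missing object is not an error
	}

	// Remove the object and its index entry together.
	delete(s.data, string(objectKey))
	delete(s.data, indexKey)
}

func main() {
	s := &memStore{data: map[string][]byte{
		"idx-1": []byte("obj-1"), // index key -> object key
		"obj-1": []byte("topic"), // object key -> value
	}}

	s.destroy("idx-1")
	s.destroy("idx-1") // second call finds nothing and returns quietly
	fmt.Println("remaining entries:", len(s.data))
}
```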
func (*Store) Get ¶ added in v0.5.0
Get a value for the specified key, wrapping any leveldb errors in an errors.Error. NOTE: if getting objects it is preferred to use Retrieve to avoid concurrency issues.
func (*Store) GetOrCreateGroup ¶ added in v0.5.1
func (s *Store) GetOrCreateGroup(group *api.ConsumerGroup) (created bool, err error)
GetOrCreate a consumer group in the database. This method is the primary mechanism for retrieving a consumer group object from the database. The user should specify only the minimum required fields on the group; if the group exists, the object is updated in place with the other values. Otherwise, the group is created with default values set, ready to service a new group. If the group was created, this function returns true.
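The in-place update and the created flag can be sketched as follows. The group struct, its fields, the "default" value, and the groupStore type are all hypothetical; the real method works on *api.ConsumerGroup and also returns an error.

```go
package main

import (
	"fmt"
	"sync"
)

// group is a hypothetical stand-in for api.ConsumerGroup.
type group struct {
	ID       string
	Name     string
	Delivery string // hypothetical field that receives a default on creation
}

// groupStore is an in-memory stand-in for the Store.
type groupStore struct {
	mu     sync.Mutex
	groups map[string]group
}

// getOrCreate fills g from the stored group if one exists, otherwise it
// applies defaults, stores g, and reports that a group was created.
func (s *groupStore) getOrCreate(g *group) (created bool) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if stored, ok := s.groups[g.ID]; ok {
		*g = stored // update the caller's object in place
		return false
	}

	if g.Delivery == "" {
		g.Delivery = "default" // hypothetical default value
	}
	s.groups[g.ID] = *g
	return true
}

func main() {
	s := &groupStore{groups: make(map[string]group)}

	first := &group{ID: "g1", Name: "workers"}
	fmt.Println("created:", s.getOrCreate(first), "delivery:", first.Delivery)

	// A later caller supplies only the ID and receives the stored values.
	second := &group{ID: "g1"}
	fmt.Println("created:", s.getOrCreate(second), "name:", second.Name)
}
```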
func (*Store) Has ¶ added in v0.5.0
Has checks the existence of a key in the database, wrapping any leveldb errors in an errors.Error for easy error checking and verification.
func (*Store) ListGroups ¶ added in v0.5.1
func (s *Store) ListGroups(projectID ulid.ULID) iterator.GroupIterator
List all consumer groups associated with the specified projectID. Returns a GroupIterator that can be used to retrieve each individual consumer group.
func (*Store) ListTopicNames ¶ added in v0.5.1
func (s *Store) ListTopicNames(projectID ulid.ULID) iterator.TopicNamesIterator
func (*Store) ListTopics ¶
func (s *Store) ListTopics(projectID ulid.ULID) iterator.TopicIterator
List all of the topics associated with the specified projectID. Returns a TopicIterator that can be used to retrieve each individual topic or to create a page of topics for paginated requests.
func (*Store) Put ¶ added in v0.5.0
Put a value for the specified key, wrapping any leveldb errors in an errors.Error. NOTE: if putting objects it is preferred to use Create or Update to avoid concurrency issues by using key-specific transactions.
func (*Store) Retrieve ¶ added in v0.5.0
Retrieve an object from the database by first looking up its object key with the specified objectID then returning the value at the retrieved object key. TODO: implement read locks to avoid concurrency issues.
func (*Store) RetrieveGroup ¶ added in v0.5.1
func (s *Store) RetrieveGroup(group *api.ConsumerGroup) (err error)
Retrieve updates the group pointer in place with the current values in the database.
func (*Store) RetrieveTopic ¶
Retrieve a topic from the database.
func (*Store) RetrieveUnique ¶ added in v0.5.1
Retrieve an object by its unique index by first looking up the object key associated with the unique ID and then returning the value at the retrieved object key.
func (*Store) TopicExists ¶ added in v0.5.1
func (*Store) Update ¶ added in v0.5.0
Update an object in the database with the specified object key. If the object does not exist, an error is returned (unlike normal Put semantics). To avoid concurrency issues, update locks the specified key before performing any writes.
func (*Store) UpdateGroup ¶ added in v0.5.1
func (s *Store) UpdateGroup(group *api.ConsumerGroup) (err error)
Update a group by putting the data from the input group into the database. If the group does not exist, an error is returned (unlike normal Put semantics). To avoid concurrency issues, this method locks the object key before performing writes.
NOTE: it is important that only one goroutine updates the group; otherwise goroutines may update the group to conflicting values, since there is no way to check whether one group's updates are "more recent" than another's (e.g. through a versioning mechanism). The database does not guard the order of writes.
func (*Store) UpdateTopic ¶
Update a topic by putting the specified topic into the database. This method uses the keymu lock to avoid concurrency issues and returns an error if the specified topic does not exist or is not valid.
NOTE: We must return an error if the topic name has changed otherwise we will also have to modify the uniqueness constraints on topic name and check them as well.
type TopicIterator ¶ added in v0.5.0
Implements iterator.TopicIterator to provide access to a list of topics.
func (*TopicIterator) Error ¶ added in v0.5.1
func (i *TopicIterator) Error() (err error)
func (*TopicIterator) NextPage ¶ added in v0.5.0
func (i *TopicIterator) NextPage(in *api.PageInfo) (page *api.TopicsPage, err error)
NextPage seeks the iterator to the next page of results and returns a page of topics.
type TopicNamesIterator ¶ added in v0.5.1
Implements iterator.TopicNamesIterator to provide access to the topic names index.
func (*TopicNamesIterator) Error ¶ added in v0.5.1
func (i *TopicNamesIterator) Error() (err error)
func (*TopicNamesIterator) NextPage ¶ added in v0.5.1
func (i *TopicNamesIterator) NextPage(in *api.PageInfo) (page *api.TopicNamesPage, err error)