Documentation
¶
Index ¶
- Constants
- Variables
- func NewCarnaxRaft(raftDir string, retainSnapshotCount int, raftId string, myAddr string, ...) (*raft.Raft, *transport.Manager)
- func ValidateTopicConfig(config *apiv1.TopicConfig) ([]error, bool)
- type CarnaxConfig
- type CarnaxController
- func (m *CarnaxController) CommitSync(consumerGroupId string) error
- func (m *CarnaxController) CreateTopic(config *apiv1.TopicConfig) error
- func (m *CarnaxController) Flush() error
- func (m *CarnaxController) OffsetsForTimes(id string, clientId string, m2 map[*TopicPartitionHash]*apiv1.SeekIndex)
- func (m *CarnaxController) Poll(consumerGroupId string, clientId string, _ time.Duration) (*controllerv1.Poll_Response, error)
- func (m *CarnaxController) Shutdown() error
- func (m *CarnaxController) Start(raftInst *raft.Raft) error
- func (m *CarnaxController) Subscribe(id string, clientId string, topics ...string) (*apiv1.ConsumerGroupNode, error)
- func (m *CarnaxController) Write(topic string, record *apiv1.Record) (*commandv1.Address, error)
- type CarnaxControllerFSM
- type ExecutionMode
- type IndexFile
- type ObjectStore
- type ObjectStoreS3
- type Properties
- type SegmentCache
- type SegmentKey
- type SegmentLog
- type SegmentLookupPred
- type SegmentPath
- type SegmentType
- type TimeIndexFile
- type TopicPartitionHash
- type TopicPartitionSegment
- type TopicSegment
- Bugs
Constants ¶
const SegmentIncrement = 1
SegmentIncrement is the additional padding to enforce new segments to always be incremented by an offset of 1. this introduces an idea of 'time' in segments rather as, if we write a message to the current segment and do not publish anything to it, by the time we have to roll the next segment (every 250ms by default) we will over-write the segment due to the starting offset being identical.
Variables ¶
var ( ErrNoIndexFound = errors.New("no index found") ErrNotALeader = errors.New("not a leader") ErrTopicNotFound = errors.New("topic not found") )
var ( KiB = 1024 MiB = 1 * KiB )
var DefaultCarnaxConfig = ParseFromProperties(Properties{ "log.segment.bytes": 8 * MiB, "segment.ms": int(250 * time.Millisecond.Milliseconds()), "bootstrap.servers": "", })
Functions ¶
func NewCarnaxRaft ¶
func NewCarnaxRaft(raftDir string, retainSnapshotCount int, raftId string, myAddr string, msgLog *CarnaxController, raftBootstrap bool) (*raft.Raft, *transport.Manager)
NewCarnaxRaft ... this is a bit messy
func ValidateTopicConfig ¶
func ValidateTopicConfig(config *apiv1.TopicConfig) ([]error, bool)
ValidateTopicConfig will run some validations on the given TopicConfig returns any errors and a boolean value signalling if the topic config is valid (i.e. OK).
Types ¶
type CarnaxConfig ¶
type CarnaxConfig struct { BootstrapServers []string ExecutionMode ExecutionMode MaxPollRecords int SegmentIndexBytes int SegmentJitterMs int SegmentMs int MaxMessageBytes int IndexIntervalBytes int LogIndexIntervalBytes int LogSegmentBytes int // contains filtered or unexported fields }
func ParseFromProperties ¶
func ParseFromProperties(props Properties) CarnaxConfig
type CarnaxController ¶
type CarnaxController struct { // raft consensus related bits RaftBind string RaftDir string // contains filtered or unexported fields }
CarnaxController ...
func NewCarnaxControllerWithConfig ¶
func NewCarnaxControllerWithConfig(store ObjectStore, config CarnaxConfig) *CarnaxController
func (*CarnaxController) CommitSync ¶
func (m *CarnaxController) CommitSync(consumerGroupId string) error
func (*CarnaxController) CreateTopic ¶
func (m *CarnaxController) CreateTopic(config *apiv1.TopicConfig) error
func (*CarnaxController) Flush ¶
func (m *CarnaxController) Flush() error
Flush will flush the open segment into the object-storeBackedLog.
func (*CarnaxController) OffsetsForTimes ¶
func (m *CarnaxController) OffsetsForTimes(id string, clientId string, m2 map[*TopicPartitionHash]*apiv1.SeekIndex)
OffsetsForTimes ... default.api.timeout.ms
func (*CarnaxController) Poll ¶
func (m *CarnaxController) Poll(consumerGroupId string, clientId string, _ time.Duration) (*controllerv1.Poll_Response, error)
Poll ...
This signature is inconsistent in how it returns information as opposed to the rest of the API
nit: we should respect the state of the consumer group node here e.g. if it's in a rebalance or not.
func (*CarnaxController) Shutdown ¶
func (m *CarnaxController) Shutdown() error
func (*CarnaxController) Subscribe ¶
func (m *CarnaxController) Subscribe(id string, clientId string, topics ...string) (*apiv1.ConsumerGroupNode, error)
Subscribe ...
NOTE: Ideally the client.id could just be generated as an uuid from the API wrapper.
type CarnaxControllerFSM ¶
type CarnaxControllerFSM CarnaxController
CarnaxControllerFSM implements the finite-state-machine mechanism used in hashicorp/raft
func (*CarnaxControllerFSM) Apply ¶
func (f *CarnaxControllerFSM) Apply(l *raft.Log) interface{}
func (*CarnaxControllerFSM) Restore ¶
func (f *CarnaxControllerFSM) Restore(snapshot io.ReadCloser) error
func (*CarnaxControllerFSM) Snapshot ¶
func (f *CarnaxControllerFSM) Snapshot() (raft.FSMSnapshot, error)
type IndexFile ¶
func IndexFromBytes ¶
type ObjectStore ¶
type ObjectStoreS3 ¶
type ObjectStoreS3 struct {
// contains filtered or unexported fields
}
ObjectStoreS3 this should probably be fixed to a particular bucket
func NewObjectStoreS3 ¶
func NewObjectStoreS3(s3Config aws.Config) *ObjectStoreS3
func (*ObjectStoreS3) Delete ¶
func (o *ObjectStoreS3) Delete(s string) error
func (*ObjectStoreS3) List ¶
func (o *ObjectStoreS3) List(s string) []string
type Properties ¶
type SegmentCache ¶
type SegmentCache struct {
// contains filtered or unexported fields
}
type SegmentKey ¶
type SegmentKey string
func SegmentName ¶
func SegmentName(topic string, partitionIndex uint32, offset uint64) SegmentKey
func (SegmentKey) Format ¶
func (s SegmentKey) Format(typ SegmentType) string
type SegmentLog ¶
type SegmentLog struct {
// contains filtered or unexported fields
}
func NewSegmentLog ¶
func NewSegmentLog() *SegmentLog
func (*SegmentLog) Size ¶
func (s *SegmentLog) Size() uint64
type SegmentLookupPred ¶
func SegmentByTimestamp ¶
func SegmentByTimestamp(store ObjectStore, topic string, partition uint32) SegmentLookupPred
type SegmentPath ¶
type SegmentPath string
func (SegmentPath) Valid ¶
func (s SegmentPath) Valid() bool
type SegmentType ¶
type SegmentType int
const ( SegmentIndex SegmentType = iota SegmentTimeIndex SegmentLogFile )
type TimeIndexFile ¶
func TimeIndexFromBytes ¶
func TimeIndexFromBytes(data []byte) TimeIndexFile
type TopicPartitionHash ¶
type TopicPartitionHash apiv1.TopicPartition
func (*TopicPartitionHash) String ¶
func (t *TopicPartitionHash) String() string
type TopicPartitionSegment ¶
type TopicPartitionSegment struct {
// contains filtered or unexported fields
}
func NewTopicPartitionSegment ¶
func NewTopicPartitionSegment(config CarnaxConfig, start ...uint64) *TopicPartitionSegment
func (*TopicPartitionSegment) CommitRecord ¶
func (s *TopicPartitionSegment) CommitRecord(rec *apiv1.Record, offset uint64)
func (*TopicPartitionSegment) Data ¶
func (s *TopicPartitionSegment) Data() []byte
func (*TopicPartitionSegment) Index ¶
func (s *TopicPartitionSegment) Index() []byte
func (*TopicPartitionSegment) TimeIndex ¶
func (s *TopicPartitionSegment) TimeIndex() []byte
type TopicSegment ¶
type TopicSegment struct {
// contains filtered or unexported fields
}
func NewTopicSegment ¶
func NewTopicSegment(prev ...*TopicSegment) *TopicSegment
Notes ¶
Bugs ¶
We don't handle misses in the offsIndex.