api

package
v0.0.0-...-db7883e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 10, 2024 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const SegmentIncrement = 1

SegmentIncrement is the additional padding that forces new segments to always be incremented by an offset of 1. This introduces an idea of 'time' in segments: without it, if we write a message to the current segment and do not publish anything to it, then by the time we have to roll the next segment (every 250ms by default) we will overwrite the segment because the starting offset is identical.

Variables

View Source
var (
	ErrNoIndexFound  = errors.New("no index found")
	ErrNotALeader    = errors.New("not a leader")
	ErrTopicNotFound = errors.New("topic not found")
)
View Source
var (
	KiB = 1024
	MiB = 1 * KiB
)
View Source
var DefaultCarnaxConfig = ParseFromProperties(Properties{
	"log.segment.bytes": 8 * MiB,
	"segment.ms":        int(250 * time.Millisecond.Milliseconds()),
	"bootstrap.servers": "",
})

Functions

func NewCarnaxRaft

func NewCarnaxRaft(raftDir string, retainSnapshotCount int, raftId string, myAddr string, msgLog *CarnaxController, raftBootstrap bool) (*raft.Raft, *transport.Manager)

NewCarnaxRaft ... this is a bit messy

func ValidateTopicConfig

func ValidateTopicConfig(config *apiv1.TopicConfig) ([]error, bool)

ValidateTopicConfig runs some validations on the given TopicConfig. It returns any errors and a boolean value signalling whether the topic config is valid (i.e. OK).

Types

type CarnaxConfig

type CarnaxConfig struct {
	BootstrapServers []string
	ExecutionMode    ExecutionMode

	MaxPollRecords int

	SegmentIndexBytes int
	SegmentJitterMs   int
	SegmentMs         int

	MaxMessageBytes int

	IndexIntervalBytes    int
	LogIndexIntervalBytes int
	LogSegmentBytes       int
	// contains filtered or unexported fields
}

func ParseFromProperties

func ParseFromProperties(props Properties) CarnaxConfig

type CarnaxController

type CarnaxController struct {
	// raft consensus related bits
	RaftBind string
	RaftDir  string
	// contains filtered or unexported fields
}

CarnaxController ...

func NewCarnaxControllerWithConfig

func NewCarnaxControllerWithConfig(store ObjectStore, config CarnaxConfig) *CarnaxController

func (*CarnaxController) CommitSync

func (m *CarnaxController) CommitSync(consumerGroupId string) error

func (*CarnaxController) CreateTopic

func (m *CarnaxController) CreateTopic(config *apiv1.TopicConfig) error

func (*CarnaxController) Flush

func (m *CarnaxController) Flush() error

Flush will flush the open segment into the object-storeBackedLog.

func (*CarnaxController) OffsetsForTimes

func (m *CarnaxController) OffsetsForTimes(id string, clientId string, m2 map[*TopicPartitionHash]*apiv1.SeekIndex)

OffsetsForTimes ... default.api.timeout.ms

func (*CarnaxController) Poll

func (m *CarnaxController) Poll(consumerGroupId string, clientId string, _ time.Duration) (*controllerv1.Poll_Response, error)

Poll ...

This signature is inconsistent with the rest of the API in how it returns information.

nit: we should respect the state of the consumer group node here, e.g. whether it's in a rebalance or not.

func (*CarnaxController) Shutdown

func (m *CarnaxController) Shutdown() error

func (*CarnaxController) Start

func (m *CarnaxController) Start(raftInst *raft.Raft) error

func (*CarnaxController) Subscribe

func (m *CarnaxController) Subscribe(id string, clientId string, topics ...string) (*apiv1.ConsumerGroupNode, error)

Subscribe ...

NOTE: Ideally the client.id could just be generated as a UUID from the API wrapper.

func (*CarnaxController) Write

func (m *CarnaxController) Write(topic string, record *apiv1.Record) (*commandv1.Address, error)

Write is a synchronous write to the log. It is a fire-and-forget write and does not respond with any acknowledgement.

type CarnaxControllerFSM

type CarnaxControllerFSM CarnaxController

CarnaxControllerFSM implements the finite-state-machine mechanism used in hashicorp/raft

func (*CarnaxControllerFSM) Apply

func (f *CarnaxControllerFSM) Apply(l *raft.Log) interface{}

func (*CarnaxControllerFSM) Restore

func (f *CarnaxControllerFSM) Restore(snapshot io.ReadCloser) error

func (*CarnaxControllerFSM) Snapshot

func (f *CarnaxControllerFSM) Snapshot() (raft.FSMSnapshot, error)

type ExecutionMode

type ExecutionMode int
const (
	SingleNode ExecutionMode = iota
	MultiNode
)

type IndexFile

type IndexFile []*apiv1.Index

func IndexFromBytes

func IndexFromBytes(data []byte) IndexFile

func (IndexFile) Search

func (i IndexFile) Search(offs uint64) *apiv1.Index

type ObjectStore

type ObjectStore interface {
	Put(string, []byte) error
	Get(string) ([]byte, error)
	Delete(string) error
	List(string) []string
}

type ObjectStoreS3

type ObjectStoreS3 struct {
	// contains filtered or unexported fields
}

ObjectStoreS3 ... this should probably be fixed to a particular bucket.

func NewObjectStoreS3

func NewObjectStoreS3(s3Config aws.Config) *ObjectStoreS3

func (*ObjectStoreS3) Delete

func (o *ObjectStoreS3) Delete(s string) error

func (*ObjectStoreS3) Get

func (o *ObjectStoreS3) Get(s string) ([]byte, error)

func (*ObjectStoreS3) List

func (o *ObjectStoreS3) List(s string) []string

func (*ObjectStoreS3) Put

func (o *ObjectStoreS3) Put(s string, data []byte) error

type Properties

type Properties map[string]any

type SegmentCache

type SegmentCache struct {
	// contains filtered or unexported fields
}

type SegmentKey

type SegmentKey string

func SegmentName

func SegmentName(topic string, partitionIndex uint32, offset uint64) SegmentKey

func (SegmentKey) Format

func (s SegmentKey) Format(typ SegmentType) string

type SegmentLog

type SegmentLog struct {
	// contains filtered or unexported fields
}

func NewSegmentLog

func NewSegmentLog() *SegmentLog

func (*SegmentLog) Size

func (s *SegmentLog) Size() uint64

type SegmentLookupPred

type SegmentLookupPred func(u uint64, ts int64) bool

func SegmentByTimestamp

func SegmentByTimestamp(store ObjectStore, topic string, partition uint32) SegmentLookupPred

type SegmentPath

type SegmentPath string

func (SegmentPath) Valid

func (s SegmentPath) Valid() bool

type SegmentType

type SegmentType int
const (
	SegmentIndex SegmentType = iota
	SegmentTimeIndex
	SegmentLogFile
)

type TimeIndexFile

type TimeIndexFile []*apiv1.TimeIndex

func TimeIndexFromBytes

func TimeIndexFromBytes(data []byte) TimeIndexFile

type TopicPartitionHash

type TopicPartitionHash apiv1.TopicPartition

func (*TopicPartitionHash) String

func (t *TopicPartitionHash) String() string

type TopicPartitionSegment

type TopicPartitionSegment struct {
	// contains filtered or unexported fields
}

func NewTopicPartitionSegment

func NewTopicPartitionSegment(config CarnaxConfig, start ...uint64) *TopicPartitionSegment

func (*TopicPartitionSegment) CommitRecord

func (s *TopicPartitionSegment) CommitRecord(rec *apiv1.Record, offset uint64)

func (*TopicPartitionSegment) Data

func (s *TopicPartitionSegment) Data() []byte

func (*TopicPartitionSegment) Index

func (s *TopicPartitionSegment) Index() []byte

func (*TopicPartitionSegment) TimeIndex

func (s *TopicPartitionSegment) TimeIndex() []byte

type TopicSegment

type TopicSegment struct {
	// contains filtered or unexported fields
}

func NewTopicSegment

func NewTopicSegment(prev ...*TopicSegment) *TopicSegment

Notes

Bugs

  • We don't handle misses in the offsIndex.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL