types

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 14, 2023 License: AGPL-3.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MIMETypeJSON           = "application/json"
	MIMETypeNDJSON         = "application/x-ndjson"
	MIMETypeProducerBinary = "application/vnd.polar.producer.frames" // {uint32_length}{bytes}{uint32_length}{bytes}...
	ContentTypeHeaderKey   = "Content-Type"
)
View Source
const DefaultOffsetResetPolicy = StartFromLatest
View Source
const OffsetCompleted = math.MaxInt64

Variables

View Source
var GossipGetNotFound = fmt.Errorf("Information not found")

Peer processed the GET request, but didn't found the information that was looking for

Functions

func GetPrimaryTokenIndex

func GetPrimaryTokenIndex(token Token, clusterSize int, rangesPerToken int) (BrokerIndex, RangeIndex)

GetPrimaryTokenIndex returns the broker index of the start token in a given range

func Intersects

func Intersects(startA, endA, startB, endB Token) bool

func Murmur3H1

func Murmur3H1(data []byte) int64

func OrdinalsPlacementOrder

func OrdinalsPlacementOrder(size int) []uint32

OrdinalsPlacementOrder gets a slice of ordinals in the placement order.

e.g. {0, 1, 2} for 3-broker cluster and {0, 3, 1, 4, 2, 5} for a 6-broker cluster.

func RangeByTokenAndClusterSize

func RangeByTokenAndClusterSize(token Token, index RangeIndex, rangesPerToken int, clusterSize int) (Token, Token)

Gets the start and end of a range based on the token, range index and cluster size. Note that for the last range, the end will be `MaxInt64`.

Types

type BrokerIndex

type BrokerIndex int

BrokerIndex represents the position of a broker in the current broker list. It's exposed as different type to avoid mixing it up w/ Ordinal (replica number)

e.g. in a cluster composed of {0, 3, 1, 4, 2, 3}, the index of 3 is 1.

type BrokerInfo

type BrokerInfo struct {
	// IsSelf determines whether the broker refers to this instance
	IsSelf bool
	// Ordinal represents the unique identifier of the broker in a cluster
	Ordinal int
	// HostName contains the reachable host name of the broker, i.e. "broker-1"
	HostName string
}

BrokerInfo contains information about a broker

func (*BrokerInfo) String

func (b *BrokerInfo) String() string

type BufferBackedWriter

type BufferBackedWriter interface {
	io.Writer
	io.StringWriter

	// Bytes returns a slice holding the unread portion of the internal buffer.
	// The slice aliases the buffer content at least until the next buffer modification.
	Bytes() []byte
}

Represents a writer backed by an in-memory buffer

type Closer

type Closer interface {
	Close()
}

type ConsumerGroup

type ConsumerGroup struct {
	Name       string            `json:"name"`
	Ids        []string          `json:"ids"`
	Topics     []string          `json:"topics"`
	OnNewGroup OffsetResetPolicy `json:"onNewGroup"`
}

ConsumerGroup contains info about a single group of consumers. It's used as an interbroker message to send snapshot of the local view of consumers to other brokers.

type GenId

type GenId struct {
	Start   Token      `json:"start"` // Start token of the generation
	Version GenVersion `json:"version"`
}

Represents a unique reference to a generation

func (GenId) String

func (id GenId) String() string

type GenStatus

type GenStatus int

GenStatus determines the state (proposed, accepted, ...) of the status

const (
	StatusCancelled GenStatus = iota
	StatusProposed
	StatusAccepted
	StatusCommitted
)

func (GenStatus) String

func (s GenStatus) String() string

type GenVersion

type GenVersion uint32

func (GenVersion) String

func (v GenVersion) String() string

type Generation

type Generation struct {
	Start       Token      `json:"start"`
	End         Token      `json:"end"`
	Version     GenVersion `json:"version"`
	Timestamp   int64      `json:"timestamp"` // In unix micros
	Leader      int        `json:"leader"`    // The ordinal of the leader
	Followers   []int      `json:"followers"` // Follower ordinals
	TxLeader    int        `json:"txLeader"`  // The originator of the transaction
	Tx          uuid.UUID  `json:"tx"`
	Status      GenStatus  `json:"status"`
	ToDelete    bool       `json:"toDelete"`
	Parents     []GenId    `json:"parents"`
	ClusterSize int        `json:"clusterSize"` // The size of the cluster at the time of the generation creation
}

func (*Generation) Id

func (g *Generation) Id() GenId

Gets the identifier of the generation

func (*Generation) Time

func (g *Generation) Time() time.Time

Time() returns the timestamp expressed as a time.Time

type GenerationRanges

type GenerationRanges struct {
	Generation *Generation
	Indices    []RangeIndex
}

Represents a point-in-time set of ranges

type HttpError

type HttpError interface {
	error
	StatusCode() int
}

func NewHttpError

func NewHttpError(code int, message string) HttpError

func NewHttpErrorf

func NewHttpErrorf(code int, message string, a ...interface{}) HttpError

type Initializer

type Initializer interface {
	Init() error
}

type Offset

type Offset struct {
	Version     GenVersion   `json:"version"`     // Generation version of the offset
	ClusterSize int          `json:"clusterSize"` // Cluster size of the gen version
	Offset      int64        `json:"value"`       // Numerical offset value
	Token       Token        `json:"token"`       // The start token of the offset generation
	Index       RangeIndex   `json:"index"`       // The range index of the offset
	Source      OffsetSource `json:"source"`      // The point-in-time when the offset was recorded.
}

Represents a topic offset for a given token.

func NewDefaultOffset

func NewDefaultOffset(topic *TopicDataId, clusterSize int, value int64) Offset

func NewOffset

func NewOffset(topic *TopicDataId, clusterSize int, source GenId, value int64) Offset

func (*Offset) Equals

func (o *Offset) Equals(other *Offset) bool

Compares the values in the struct, except the source

func (*Offset) GenId

func (o *Offset) GenId() GenId

func (*Offset) String

func (o *Offset) String() string

type OffsetCommitType

type OffsetCommitType int
const (
	OffsetCommitNone OffsetCommitType = iota
	OffsetCommitLocal
	OffsetCommitAll
)

type OffsetResetPolicy

type OffsetResetPolicy int
const (
	StartFromLatest OffsetResetPolicy = iota
	StartFromEarliest
)

func ParseOffsetResetPolicy

func ParseOffsetResetPolicy(text string) (OffsetResetPolicy, error)

func (OffsetResetPolicy) String

func (p OffsetResetPolicy) String() string

type OffsetSource

type OffsetSource struct {
	Id        GenId `json:"id"` // Gen id of the source
	Timestamp int64 `json:"ts"` // Timestamp in Unix Micros
}

func NewOffsetSource

func NewOffsetSource(id GenId) OffsetSource

type OffsetState

type OffsetState interface {
	Initializer
	fmt.Stringer

	// Gets the offset value for a given group and range.
	// Returns nil, false when not found
	//
	// The caller MUST check whether the current broker can serve the data when ranges don't match
	// The caller MUST check whether the consumer is assigned when ranges don't match
	Get(group string, topic string, token Token, index RangeIndex, clusterSize int) (offset *Offset, rangesMatch bool)

	// Gets offset values in order for a given group and range with defaults values.
	//
	// The caller MUST check whether the current broker can serve the data and that the consumer is assigned.
	GetAllWithDefaults(
		group string,
		topic string,
		token Token,
		rangeIndex RangeIndex,
		clusterSize int,
		policy OffsetResetPolicy) []Offset

	// Sets the known offset value in memory, optionally committing it to the data store
	Set(group string, topic string, value Offset, commit OffsetCommitType) bool

	// Returns the max produced offset from local and peers.
	// When it can not be found, it returns a negative value.
	// When there's an unexpected  error on local and peers, it returns an error
	MaxProducedOffset(topicId *TopicDataId) (int64, error)
}

Represents a local view of the consumer group offsets

type OffsetStoreKey

type OffsetStoreKey struct {
	Group string `json:"group"`
	Topic string `json:"topic"`
}

Represents an identifier of an offset to be persisted

type OffsetStoreKeyValue

type OffsetStoreKeyValue struct {
	Key   OffsetStoreKey `json:"key"`
	Value Offset         `json:"value"`
}

Represents an identifier and a value of an offset

type ProducingError

type ProducingError interface {
	error

	WasWriteAttempted() bool
}

Represents an error while producing that we are certain it caused side effect in the data store.

func NewNoWriteAttemptedError

func NewNoWriteAttemptedError(message string, a ...interface{}) ProducingError

type RangeIndex

type RangeIndex uint8

Represents an index in the token range

func (RangeIndex) String

func (t RangeIndex) String() string

type ReplicationInfo

type ReplicationInfo struct {
	Leader     *BrokerInfo // Determines the leader of the replication plan, it can be nil when not determined
	Followers  []BrokerInfo
	Token      Token
	RangeIndex RangeIndex
}

func NewReplicationInfo

func NewReplicationInfo(topology *TopologyInfo, token Token, leader int, followers []int, index RangeIndex) ReplicationInfo

type Replicator

type Replicator interface {
	// Sends a message to be stored as replica of current broker's datalog
	SendToFollowers(
		replicationInfo ReplicationInfo,
		topic TopicDataId,
		segmentId int64,
		chunk SegmentChunk) error
}

Replicator contains logic to send data to replicas

type SegmentChunk

type SegmentChunk interface {
	DataBlock() []byte
	StartOffset() int64
	RecordLength() uint32
}

SegmentChunk represents a group of compressed records.

type StringSet

type StringSet map[string]bool

func (*StringSet) Add

func (s *StringSet) Add(values ...string)

func (*StringSet) ToSlice

func (s *StringSet) ToSlice() []string

func (*StringSet) ToSortedSlice

func (s *StringSet) ToSortedSlice() []string

type Token

type Token int64

Represents a partition token

const StartToken Token = math.MinInt64

func GetTokenAtIndex

func GetTokenAtIndex(length int, index int) Token

func HashToken

func HashToken(key string) Token

Gets a token based on a murmur3 hash

func (Token) String

func (t Token) String() string

type TokenRanges

type TokenRanges struct {
	Token       Token
	ClusterSize int
	Indices     []RangeIndex
}

Represents slices of the token range between two tokens

func ProjectRangeByClusterSize

func ProjectRangeByClusterSize(
	token Token,
	index RangeIndex,
	rangesPerToken int,
	clusterSize int,
	newClusterSize int,
) []*TokenRanges

type TopicDataId

type TopicDataId struct {
	Name       string     `json:"topic"`
	Token      Token      `json:"token"`
	RangeIndex RangeIndex `json:"rangeIndex"`
	Version    GenVersion `json:"version"`
}

TopicDataId contains information to locate a certain piece of data.

Specifies a topic, for a token, for a defined gen id.

func (*TopicDataId) GenId

func (t *TopicDataId) GenId() GenId

func (*TopicDataId) String

func (t *TopicDataId) String() string

type TopicInfo

type TopicInfo struct {
	Name string
}

type TopologyInfo

type TopologyInfo struct {
	Brokers    []BrokerInfo // Brokers ordered by index (e.g. 0,3,1,4,2,5)
	LocalIndex BrokerIndex  // Index of the current broker relative to this topology instance
	// contains filtered or unexported fields
}

TopologyInfo represents a snapshot of the current placement of the brokers

func NewDevTopology

func NewDevTopology() *TopologyInfo

func NewTopology

func NewTopology(brokersByOrdinal []BrokerInfo, myOrdinal int) TopologyInfo

NewTopology creates a Topology struct using brokers in ordinal order.

func (*TopologyInfo) AmIIncluded

func (t *TopologyInfo) AmIIncluded() bool

Returns true when my ordinal is included in the topology

func (*TopologyInfo) BrokerByOrdinal

func (t *TopologyInfo) BrokerByOrdinal(ordinal int) *BrokerInfo

BrokerByOrdinal gets the broker by a given ordinal.

It returns nil when not found.

func (*TopologyInfo) BrokerByOrdinalList

func (t *TopologyInfo) BrokerByOrdinalList(ordinals []int) []BrokerInfo

BrokerByOrdinal gets the broker by a given ordinal.

func (*TopologyInfo) GetIndex

func (t *TopologyInfo) GetIndex(ordinal int) BrokerIndex

GetIndex gets the position of the broker in the broker slice.

It returns NotFoundIndex when not found.

func (*TopologyInfo) GetToken

func (t *TopologyInfo) GetToken(index BrokerIndex) Token

GetToken gets the token by the broker index.

func (*TopologyInfo) HasBroker

func (t *TopologyInfo) HasBroker(ordinal int) bool

Returns true when my ordinal is included in the topology

func (*TopologyInfo) MyOrdinal

func (t *TopologyInfo) MyOrdinal() int

MyOrdinal gets the current broker's ordinal.

func (*TopologyInfo) MyToken

func (t *TopologyInfo) MyToken() Token

MyToken gets the natural token based on the current broker index.

func (*TopologyInfo) NaturalFollowers

func (t *TopologyInfo) NaturalFollowers(brokerIndex BrokerIndex) []int

NaturalFollowers gets the ordinals of the brokers at position n+1 and n+2

func (*TopologyInfo) NextBroker

func (t *TopologyInfo) NextBroker() *BrokerInfo

NextBroker returns the broker in the position n+1

func (*TopologyInfo) NextBrokers

func (t *TopologyInfo) NextBrokers(index BrokerIndex, length int) []BrokerInfo

NextBrokers returns the broker in the position n+1, n+2, n...

func (*TopologyInfo) NextIndex

func (t *TopologyInfo) NextIndex() BrokerIndex

NextBroker returns the broker in the position n+1

func (*TopologyInfo) Peers

func (t *TopologyInfo) Peers() []BrokerInfo

Returns list of all brokers except itself

func (*TopologyInfo) PreviousBroker

func (t *TopologyInfo) PreviousBroker() *BrokerInfo

PreviousBroker returns the broker in the position n-1 It panics when my ordinal is not included in the Topology

func (*TopologyInfo) PrimaryToken

func (t *TopologyInfo) PrimaryToken(token Token, ranges int) (Token, BrokerIndex, RangeIndex)

Returns the primary token (start of the range), BrokerIndex and Range index for a given token

func (*TopologyInfo) TotalBrokers

func (t *TopologyInfo) TotalBrokers() int

Returns the amount of brokers in the topology

type TrackedConnection

type TrackedConnection struct {
	// contains filtered or unexported fields
}

TrackedConnection represents a net connection over http that contains information about whether it's open/closed.

It benefits from the fact that a Transport will invoke `Close()` when a http request or http2 ping fails

func NewTrackedConnection

func NewTrackedConnection(conn net.Conn, closeHandler func(*TrackedConnection)) *TrackedConnection

Creates a new TrackedConnection using the provided tcp conn. It invokes the close handler once it's closed.

func (*TrackedConnection) Close

func (c *TrackedConnection) Close() error

func (*TrackedConnection) Id

func (c *TrackedConnection) Id() uuid.UUID

func (*TrackedConnection) IsOpen

func (c *TrackedConnection) IsOpen() bool

IsOpen() returns true when the connection is known to be open.

func (*TrackedConnection) LocalAddr

func (c *TrackedConnection) LocalAddr() net.Addr

func (*TrackedConnection) Read

func (c *TrackedConnection) Read(b []byte) (n int, err error)

func (*TrackedConnection) RemoteAddr

func (c *TrackedConnection) RemoteAddr() net.Addr

func (*TrackedConnection) SetDeadline

func (c *TrackedConnection) SetDeadline(t time.Time) error

func (*TrackedConnection) SetReadDeadline

func (c *TrackedConnection) SetReadDeadline(t time.Time) error

func (*TrackedConnection) SetWriteDeadline

func (c *TrackedConnection) SetWriteDeadline(t time.Time) error

func (*TrackedConnection) Write

func (c *TrackedConnection) Write(b []byte) (n int, err error)

type TransactionStatus

type TransactionStatus int
const (
	TransactionStatusCancelled TransactionStatus = iota
	TransactionStatusCommitted
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL