Documentation ¶
Index ¶
- Constants
- Variables
- func GetPrimaryTokenIndex(token Token, clusterSize int, rangesPerToken int) (BrokerIndex, RangeIndex)
- func Intersects(startA, endA, startB, endB Token) bool
- func Murmur3H1(data []byte) int64
- func OrdinalsPlacementOrder(size int) []uint32
- func RangeByTokenAndClusterSize(token Token, index RangeIndex, rangesPerToken int, clusterSize int) (Token, Token)
- type BrokerIndex
- type BrokerInfo
- type BufferBackedWriter
- type Closer
- type ConsumerGroup
- type GenId
- type GenStatus
- type GenVersion
- type Generation
- type GenerationRanges
- type HttpError
- type Initializer
- type Offset
- type OffsetCommitType
- type OffsetResetPolicy
- type OffsetSource
- type OffsetState
- type OffsetStoreKey
- type OffsetStoreKeyValue
- type ProducingError
- type RangeIndex
- type ReplicationInfo
- type Replicator
- type SegmentChunk
- type StringSet
- type Token
- type TokenRanges
- type TopicDataId
- type TopicInfo
- type TopologyInfo
- func (t *TopologyInfo) AmIIncluded() bool
- func (t *TopologyInfo) BrokerByOrdinal(ordinal int) *BrokerInfo
- func (t *TopologyInfo) BrokerByOrdinalList(ordinals []int) []BrokerInfo
- func (t *TopologyInfo) GetIndex(ordinal int) BrokerIndex
- func (t *TopologyInfo) GetToken(index BrokerIndex) Token
- func (t *TopologyInfo) HasBroker(ordinal int) bool
- func (t *TopologyInfo) MyOrdinal() int
- func (t *TopologyInfo) MyToken() Token
- func (t *TopologyInfo) NaturalFollowers(brokerIndex BrokerIndex) []int
- func (t *TopologyInfo) NextBroker() *BrokerInfo
- func (t *TopologyInfo) NextBrokers(index BrokerIndex, length int) []BrokerInfo
- func (t *TopologyInfo) NextIndex() BrokerIndex
- func (t *TopologyInfo) Peers() []BrokerInfo
- func (t *TopologyInfo) PreviousBroker() *BrokerInfo
- func (t *TopologyInfo) PrimaryToken(token Token, ranges int) (Token, BrokerIndex, RangeIndex)
- func (t *TopologyInfo) TotalBrokers() int
- type TrackedConnection
- func (c *TrackedConnection) Close() error
- func (c *TrackedConnection) Id() uuid.UUID
- func (c *TrackedConnection) IsOpen() bool
- func (c *TrackedConnection) LocalAddr() net.Addr
- func (c *TrackedConnection) Read(b []byte) (n int, err error)
- func (c *TrackedConnection) RemoteAddr() net.Addr
- func (c *TrackedConnection) SetDeadline(t time.Time) error
- func (c *TrackedConnection) SetReadDeadline(t time.Time) error
- func (c *TrackedConnection) SetWriteDeadline(t time.Time) error
- func (c *TrackedConnection) Write(b []byte) (n int, err error)
- type TransactionStatus
Constants ¶
const ( MIMETypeJSON = "application/json" MIMETypeNDJSON = "application/x-ndjson" MIMETypeProducerBinary = "application/vnd.polar.producer.frames" // {uint32_length}{bytes}{uint32_length}{bytes}... ContentTypeHeaderKey = "Content-Type" )
const DefaultOffsetResetPolicy = StartFromLatest
const OffsetCompleted = math.MaxInt64
Variables ¶
var GossipGetNotFound = fmt.Errorf("Information not found")
The peer processed the GET request, but didn't find the information it was looking for.
Functions ¶
func GetPrimaryTokenIndex ¶
func GetPrimaryTokenIndex(token Token, clusterSize int, rangesPerToken int) (BrokerIndex, RangeIndex)
GetPrimaryTokenIndex returns the broker index of the start token in a given range
func Intersects ¶
func OrdinalsPlacementOrder ¶
OrdinalsPlacementOrder gets a slice of ordinals in the placement order.
e.g. {0, 1, 2} for 3-broker cluster and {0, 3, 1, 4, 2, 5} for a 6-broker cluster.
func RangeByTokenAndClusterSize ¶
func RangeByTokenAndClusterSize(token Token, index RangeIndex, rangesPerToken int, clusterSize int) (Token, Token)
Gets the start and end of a range based on the token, range index and cluster size. Note that for the last range, the end will be `MaxInt64`.
Types ¶
type BrokerIndex ¶
type BrokerIndex int
BrokerIndex represents the position of a broker in the current broker list. It's exposed as a distinct type to avoid mixing it up with Ordinal (replica number).
e.g. in a cluster composed of {0, 3, 1, 4, 2, 5}, the index of 3 is 1.
type BrokerInfo ¶
type BrokerInfo struct { // IsSelf determines whether the broker refers to this instance IsSelf bool // Ordinal represents the unique identifier of the broker in a cluster Ordinal int // HostName contains the reachable host name of the broker, i.e. "broker-1" HostName string }
BrokerInfo contains information about a broker
func (*BrokerInfo) String ¶
func (b *BrokerInfo) String() string
type BufferBackedWriter ¶
type BufferBackedWriter interface { io.Writer io.StringWriter // Bytes returns a slice holding the unread portion of the internal buffer. // The slice aliases the buffer content at least until the next buffer modification. Bytes() []byte }
Represents a writer backed by an in-memory buffer
type ConsumerGroup ¶
type ConsumerGroup struct { Name string `json:"name"` Ids []string `json:"ids"` Topics []string `json:"topics"` OnNewGroup OffsetResetPolicy `json:"onNewGroup"` }
ConsumerGroup contains info about a single group of consumers. It's used as an interbroker message to send a snapshot of the local view of consumers to other brokers.
type GenId ¶
type GenId struct { Start Token `json:"start"` // Start token of the generation Version GenVersion `json:"version"` }
Represents a unique reference to a generation
type GenStatus ¶
type GenStatus int
GenStatus determines the state (proposed, accepted, ...) of a generation
type GenVersion ¶
type GenVersion uint32
func (GenVersion) String ¶
func (v GenVersion) String() string
type Generation ¶
type Generation struct { Start Token `json:"start"` End Token `json:"end"` Version GenVersion `json:"version"` Timestamp int64 `json:"timestamp"` // In unix micros Leader int `json:"leader"` // The ordinal of the leader Followers []int `json:"followers"` // Follower ordinals TxLeader int `json:"txLeader"` // The originator of the transaction Tx uuid.UUID `json:"tx"` Status GenStatus `json:"status"` ToDelete bool `json:"toDelete"` Parents []GenId `json:"parents"` ClusterSize int `json:"clusterSize"` // The size of the cluster at the time of the generation creation }
func (*Generation) Time ¶
func (g *Generation) Time() time.Time
Time() returns the timestamp expressed as a time.Time
type GenerationRanges ¶
type GenerationRanges struct { Generation *Generation Indices []RangeIndex }
Represents a point-in-time set of ranges
type Initializer ¶
type Initializer interface {
Init() error
}
type Offset ¶
type Offset struct { Version GenVersion `json:"version"` // Generation version of the offset ClusterSize int `json:"clusterSize"` // Cluster size of the gen version Offset int64 `json:"value"` // Numerical offset value Token Token `json:"token"` // The start token of the offset generation Index RangeIndex `json:"index"` // The range index of the offset Source OffsetSource `json:"source"` // The point-in-time when the offset was recorded. }
Represents a topic offset for a given token.
func NewDefaultOffset ¶
func NewDefaultOffset(topic *TopicDataId, clusterSize int, value int64) Offset
func NewOffset ¶
func NewOffset(topic *TopicDataId, clusterSize int, source GenId, value int64) Offset
type OffsetCommitType ¶
type OffsetCommitType int
const ( OffsetCommitNone OffsetCommitType = iota OffsetCommitLocal OffsetCommitAll )
type OffsetResetPolicy ¶
type OffsetResetPolicy int
const ( StartFromLatest OffsetResetPolicy = iota StartFromEarliest )
func ParseOffsetResetPolicy ¶
func ParseOffsetResetPolicy(text string) (OffsetResetPolicy, error)
func (OffsetResetPolicy) String ¶
func (p OffsetResetPolicy) String() string
type OffsetSource ¶
type OffsetSource struct { Id GenId `json:"id"` // Gen id of the source Timestamp int64 `json:"ts"` // Timestamp in Unix Micros }
func NewOffsetSource ¶
func NewOffsetSource(id GenId) OffsetSource
type OffsetState ¶
type OffsetState interface { Initializer fmt.Stringer // Gets the offset value for a given group and range. // Returns nil, false when not found // // The caller MUST check whether the current broker can serve the data when ranges don't match // The caller MUST check whether the consumer is assigned when ranges don't match Get(group string, topic string, token Token, index RangeIndex, clusterSize int) (offset *Offset, rangesMatch bool) // Gets offset values in order for a given group and range with defaults values. // // The caller MUST check whether the current broker can serve the data and that the consumer is assigned. GetAllWithDefaults( group string, topic string, token Token, rangeIndex RangeIndex, clusterSize int, policy OffsetResetPolicy) []Offset // Sets the known offset value in memory, optionally committing it to the data store Set(group string, topic string, value Offset, commit OffsetCommitType) bool // Returns the max produced offset from local and peers. // When it can not be found, it returns a negative value. // When there's an unexpected error on local and peers, it returns an error MaxProducedOffset(topicId *TopicDataId) (int64, error) }
Represents a local view of the consumer group offsets
type OffsetStoreKey ¶
Represents an identifier of an offset to be persisted
type OffsetStoreKeyValue ¶
type OffsetStoreKeyValue struct { Key OffsetStoreKey `json:"key"` Value Offset `json:"value"` }
Represents an identifier and a value of an offset
type ProducingError ¶
Represents an error that occurred while producing and that we are certain caused a side effect in the data store.
func NewNoWriteAttemptedError ¶
func NewNoWriteAttemptedError(message string, a ...interface{}) ProducingError
type RangeIndex ¶
type RangeIndex uint8
Represents an index in the token range
func (RangeIndex) String ¶
func (t RangeIndex) String() string
type ReplicationInfo ¶
type ReplicationInfo struct { Leader *BrokerInfo // Determines the leader of the replication plan, it can be nil when not determined Followers []BrokerInfo Token Token RangeIndex RangeIndex }
func NewReplicationInfo ¶
func NewReplicationInfo(topology *TopologyInfo, token Token, leader int, followers []int, index RangeIndex) ReplicationInfo
type Replicator ¶
type Replicator interface { // Sends a message to be stored as replica of current broker's datalog SendToFollowers( replicationInfo ReplicationInfo, topic TopicDataId, segmentId int64, chunk SegmentChunk) error }
Replicator contains logic to send data to replicas
type SegmentChunk ¶
SegmentChunk represents a group of compressed records.
type StringSet ¶
func (*StringSet) ToSortedSlice ¶
type TokenRanges ¶
type TokenRanges struct { Token Token ClusterSize int Indices []RangeIndex }
Represents slices of the token range between two tokens
func ProjectRangeByClusterSize ¶
func ProjectRangeByClusterSize( token Token, index RangeIndex, rangesPerToken int, clusterSize int, newClusterSize int, ) []*TokenRanges
type TopicDataId ¶
type TopicDataId struct { Name string `json:"topic"` Token Token `json:"token"` RangeIndex RangeIndex `json:"rangeIndex"` Version GenVersion `json:"version"` }
TopicDataId contains information to locate a certain piece of data.
Specifies a topic, for a token, for a defined gen id.
func (*TopicDataId) GenId ¶
func (t *TopicDataId) GenId() GenId
func (*TopicDataId) String ¶
func (t *TopicDataId) String() string
type TopologyInfo ¶
type TopologyInfo struct { Brokers []BrokerInfo // Brokers ordered by index (e.g. 0,3,1,4,2,5) LocalIndex BrokerIndex // Index of the current broker relative to this topology instance // contains filtered or unexported fields }
TopologyInfo represents a snapshot of the current placement of the brokers
func NewDevTopology ¶
func NewDevTopology() *TopologyInfo
func NewTopology ¶
func NewTopology(brokersByOrdinal []BrokerInfo, myOrdinal int) TopologyInfo
NewTopology creates a Topology struct using brokers in ordinal order.
func (*TopologyInfo) AmIIncluded ¶
func (t *TopologyInfo) AmIIncluded() bool
Returns true when my ordinal is included in the topology
func (*TopologyInfo) BrokerByOrdinal ¶
func (t *TopologyInfo) BrokerByOrdinal(ordinal int) *BrokerInfo
BrokerByOrdinal gets the broker by a given ordinal.
It returns nil when not found.
func (*TopologyInfo) BrokerByOrdinalList ¶
func (t *TopologyInfo) BrokerByOrdinalList(ordinals []int) []BrokerInfo
BrokerByOrdinalList gets the brokers for a given list of ordinals.
func (*TopologyInfo) GetIndex ¶
func (t *TopologyInfo) GetIndex(ordinal int) BrokerIndex
GetIndex gets the position of the broker in the broker slice.
It returns NotFoundIndex when not found.
func (*TopologyInfo) GetToken ¶
func (t *TopologyInfo) GetToken(index BrokerIndex) Token
GetToken gets the token by the broker index.
func (*TopologyInfo) HasBroker ¶
func (t *TopologyInfo) HasBroker(ordinal int) bool
HasBroker returns true when the given ordinal is included in the topology
func (*TopologyInfo) MyOrdinal ¶
func (t *TopologyInfo) MyOrdinal() int
MyOrdinal gets the current broker's ordinal.
func (*TopologyInfo) MyToken ¶
func (t *TopologyInfo) MyToken() Token
MyToken gets the natural token based on the current broker index.
func (*TopologyInfo) NaturalFollowers ¶
func (t *TopologyInfo) NaturalFollowers(brokerIndex BrokerIndex) []int
NaturalFollowers gets the ordinals of the brokers at position n+1 and n+2
func (*TopologyInfo) NextBroker ¶
func (t *TopologyInfo) NextBroker() *BrokerInfo
NextBroker returns the broker in the position n+1
func (*TopologyInfo) NextBrokers ¶
func (t *TopologyInfo) NextBrokers(index BrokerIndex, length int) []BrokerInfo
NextBrokers returns the brokers in the positions n+1, n+2, ..., n+length
func (*TopologyInfo) NextIndex ¶
func (t *TopologyInfo) NextIndex() BrokerIndex
NextIndex returns the index of the broker in the position n+1
func (*TopologyInfo) Peers ¶
func (t *TopologyInfo) Peers() []BrokerInfo
Returns list of all brokers except itself
func (*TopologyInfo) PreviousBroker ¶
func (t *TopologyInfo) PreviousBroker() *BrokerInfo
PreviousBroker returns the broker in the position n-1. It panics when my ordinal is not included in the Topology.
func (*TopologyInfo) PrimaryToken ¶
func (t *TopologyInfo) PrimaryToken(token Token, ranges int) (Token, BrokerIndex, RangeIndex)
Returns the primary token (start of the range), BrokerIndex and Range index for a given token
func (*TopologyInfo) TotalBrokers ¶
func (t *TopologyInfo) TotalBrokers() int
Returns the number of brokers in the topology
type TrackedConnection ¶
type TrackedConnection struct {
// contains filtered or unexported fields
}
TrackedConnection represents a net connection over http that contains information about whether it's open/closed.
It benefits from the fact that a Transport will invoke `Close()` when an HTTP request or HTTP/2 ping fails
func NewTrackedConnection ¶
func NewTrackedConnection(conn net.Conn, closeHandler func(*TrackedConnection)) *TrackedConnection
Creates a new TrackedConnection using the provided tcp conn. It invokes the close handler once it's closed.
func (*TrackedConnection) Close ¶
func (c *TrackedConnection) Close() error
func (*TrackedConnection) Id ¶
func (c *TrackedConnection) Id() uuid.UUID
func (*TrackedConnection) IsOpen ¶
func (c *TrackedConnection) IsOpen() bool
IsOpen() returns true when the connection is known to be open.
func (*TrackedConnection) LocalAddr ¶
func (c *TrackedConnection) LocalAddr() net.Addr
func (*TrackedConnection) RemoteAddr ¶
func (c *TrackedConnection) RemoteAddr() net.Addr
func (*TrackedConnection) SetDeadline ¶
func (c *TrackedConnection) SetDeadline(t time.Time) error
func (*TrackedConnection) SetReadDeadline ¶
func (c *TrackedConnection) SetReadDeadline(t time.Time) error
func (*TrackedConnection) SetWriteDeadline ¶
func (c *TrackedConnection) SetWriteDeadline(t time.Time) error
type TransactionStatus ¶
type TransactionStatus int
const ( TransactionStatusCancelled TransactionStatus = iota TransactionStatusCommitted )