interbroker

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 14, 2023 License: AGPL-3.0 Imports: 33 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerGroupInfoMessage

type ConsumerGroupInfoMessage struct {
	Groups []ConsumerGroup `json:"groups"`
	Origin int             `json:"origin"` // The ordinal of the sender
}

type ConsumerInfoListener

type ConsumerInfoListener interface {
	OnConsumerInfoFromPeer(ordinal int, groups []ConsumerGroup)

	OnOffsetFromPeer(kv *OffsetStoreKeyValue)

	// Invoked when a consumer should be registered as a result of a peer request
	OnRegisterFromPeer(id string, group string, topics []string, onNewGroup OffsetResetPolicy) error

	// Invoked when a consumer offset should be committed locally as a result of a peer request
	OnCommitFromPeer(id string) error

	OnUnregisterFromPeer(id string) error
}

type ConsumerRegisterMessage

type ConsumerRegisterMessage struct {
	Id         string            `json:"id"`
	Group      string            `json:"group"`
	Topics     []string          `json:"topics"`
	OnNewGroup OffsetResetPolicy `json:"onNewGroup"`
}

type GenListener

type GenListener interface {
	OnRemoteSetAsProposed(newGen *Generation, newGen2 *Generation, expectedTx *UUID) error

	OnRemoteSetAsCommitted(token1 Token, token2 *Token, tx UUID, origin int) error

	OnRemoteRangeSplitStart(origin int) error

	// Invoked when scaling down is detected and ranges need to be joined
	OnJoinRange(previousTopology *TopologyInfo, topology *TopologyInfo)
}

Represents a gossip listener to generation-related messages

type GenReadResult

type GenReadResult struct {
	Committed *Generation
	Proposed  *Generation
	Error     error
}

type GenerationCommitMessage

type GenerationCommitMessage struct {
	Tx     UUID   `json:"tx"`
	Token1 Token  `json:"token1"`
	Token2 *Token `json:"token2,omitempty"`
	Origin int    `json:"origin"` // The ordinal of the originator of the transaction
}

GenerationCommitMessage the interbroker api json message for committing a generation to another broker.

type GenerationGossiper

type GenerationGossiper interface {
	// GetGenerations gets the generations for a given token on a peer
	GetGenerations(ordinal int, token Token) GenReadResult

	// IsTokenRangeCovered sends a request to the peer to determine whether the broker
	// has an active range containing (but not starting) the token
	IsTokenRangeCovered(ordinal int, token Token) (bool, error)

	// HasTokenHistoryForToken determines whether the broker has any history matching the token
	HasTokenHistoryForToken(ordinal int, token Token, clusterSize int) (bool, error)

	// Gets the last known generation (not necessary the active one) of a given start token
	ReadTokenHistory(ordinal int, token Token, clusterSize int) (*Generation, error)

	// Compare and sets the generation value to the proposed/accepted state
	SetGenerationAsProposed(ordinal int, newGen *Generation, newGen2 *Generation, expectedTx *UUID) error

	// Compare and sets the generation as committed
	SetAsCommitted(ordinal int, token1 Token, token2 *Token, tx UUID) error

	// Sends a request to the previous broker to start the process of splitting its token range
	RangeSplitStart(ordinal int) error

	// RegisterGenListener adds a listener for new generations received by the gossipper
	RegisterGenListener(listener GenListener)

	// RegisterHostUpDownListener adds a handler to that will be invoked when peers switch from UP->DOWN or DOWN->UP
	RegisterHostUpDownListener(listener PeerStateListener)
}

GenerationGossiper is responsible for communicating actions related to generations.

type GenerationProposeMessage

type GenerationProposeMessage struct {
	Generation  *Generation `json:"gen"`
	Generation2 *Generation `json:"gen2"`
	ExpectedTx  *UUID       `json:"tx,omitempty"`
}

Represents the interbroker api json message for proposing and accepting a generation to another broker.

It's possible to accept multiple generations as part of the same transaction.

type Gossiper

type Gossiper interface {
	Initializer
	Closer
	Replicator
	GenerationGossiper

	// Starts accepting connections from peers.
	AcceptConnections() error

	// Starts opening connections to known peers.
	OpenConnections()

	// Sends a message to be handled as a leader of a token
	SendToLeader(
		replicationInfo ReplicationInfo,
		topic string,
		querystring url.Values,
		contentLength int64,
		contentType string,
		body io.Reader) error

	// Sends a request to get file part to one or more peers
	StreamFile(
		peers []int,
		segmentId int64,
		topic *TopicDataId,
		startOffset int64,
		maxRecords int,
		buf []byte) (int, error)

	// Queries a peer for the state of another broker
	ReadBrokerIsUp(ordinal int, brokerUpOrdinal int) (bool, error)

	// Sends a message to the broker with the ordinal number containing the local snapshot of consumers
	SendConsumerGroups(ordinal int, groups []ConsumerGroup) error

	SendConsumerRegister(ordinal int, id string, group string, topics []string, onNewGroup OffsetResetPolicy) error

	SendConsumerCommit(ordinal int, id string) error

	SendConsumerUnregister(ordinal int, id string) error

	// Sends a message to the broker with the committed offset of a consumer group
	SendCommittedOffset(ordinal int, offsetKv *OffsetStoreKeyValue) error

	// Sends a message to the next broker stating the current broker is shutting down
	SendGoobye()

	// Reads the producer offset of a certain past topic generatoin
	ReadProducerOffset(ordinal int, topic *TopicDataId) (int64, error)

	// Retrieves the file structure from the peers and merge it with the local file structure
	MergeTopicFiles(peers []int, topic *TopicDataId, offset int64) error

	// Adds a listener for consumer information
	RegisterConsumerInfoListener(listener ConsumerInfoListener)

	// Adds a listener for rerouted messages
	RegisterReroutedMessageListener(listener ReroutingListener)

	// WaitForPeersUp blocks until all peers are UP
	WaitForPeersUp()

	// Gets a snapshot information to determine whether a broker is considered as UP
	IsHostUp(ordinal int) bool
}

Gossiper is responsible for communicating with other peers.

func NewGossiper

func NewGossiper(
	config conf.GossipConfig,
	discoverer discovery.Discoverer,
	localDb localdb.Client,
	datalog data.Datalog,
) Gossiper

type PeerStateListener

type PeerStateListener interface {
	OnHostUp(broker BrokerInfo)
	OnHostDown(broker BrokerInfo)

	// Invoked as a result of a peer sending goodbye message
	OnHostShuttingDown(broker BrokerInfo)
}

type ReroutingListener

type ReroutingListener interface {
	OnReroutedMessage(
		topic string,
		querystring url.Values,
		contentLength int64,
		contentType string,
		body io.ReadCloser) error
}

type TopicFileStructureMessage

type TopicFileStructureMessage struct {
	FileNames []string `json:"fileNames"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL