protocol

package
v0.0.0-...-f15e7a6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2024 License: Apache-2.0 Imports: 8 Imported by: 6

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewObserver

func NewObserver() *observer

func RunFollowerServer

func RunFollowerServer(naddr string,
	leader string,
	ss RequestMgr,
	handler ActionHandler,
	factory MsgFactory,
	killch <-chan bool) (err error)

Create a new FollowerServer. This is a blocking call until the FollowerServer terminates. Make sure the killch is a buffered channel such that if the goroutine running RunFollowerServer goes away, the sender won't get blocked.

func RunLeaderServer

func RunLeaderServer(naddr string,
	listener *common.PeerListener,
	ss RequestMgr,
	handler ActionHandler,
	factory MsgFactory,
	killch <-chan bool) (err error)

Create a new LeaderServer. This is a blocking call until the LeaderServer terminates.

killch should be buffered to ensure the sender won't block

func RunLeaderServerWithCustomHandler

func RunLeaderServerWithCustomHandler(naddr string,
	listener *common.PeerListener,
	ss RequestMgr,
	handler ActionHandler,
	factory MsgFactory,
	reqHandler CustomRequestHandler,
	killch <-chan bool,
	resetCh <-chan bool) (err error)

func RunWatcherServer

func RunWatcherServer(leader string,
	handler ActionHandler,
	factory MsgFactory,
	killch <-chan bool,
	readych chan<- bool)

Create a new WatcherServer. This is a blocking call until the WatcherServer terminates. Make sure the killch is a buffered channel such that if the goroutine running RunWatcherServer goes away, the sender won't get blocked.

func RunWatcherServerWithElection

func RunWatcherServerWithElection(host string,
	peerUDP []string,
	peerTCP []string,
	requestMgr RequestMgr,
	handler ActionHandler,
	factory MsgFactory,
	killch <-chan bool,
	readych chan<- bool)

Create a new WatcherServer. This is a blocking call until the WatcherServer terminates. Make sure the killch is a buffered channel such that if the goroutine running RunWatcherServerWithElection goes away, the sender won't get blocked.

func RunWatcherServerWithRequest

func RunWatcherServerWithRequest(leader string,
	requestMgr RequestMgr,
	handler ActionHandler,
	factory MsgFactory,
	killch <-chan bool,
	readych chan<- bool,
	alivech chan<- bool,
	pingch <-chan bool)

Create a new WatcherServer. This is a blocking call until the WatcherServer terminates. Make sure the killch is a buffered channel such that if the goroutine running RunWatcherServerWithRequest goes away, the sender won't get blocked.

func RunWatcherServerWithRequest2

func RunWatcherServerWithRequest2(leader string,
	requestMgr RequestMgr,
	handler ActionHandler,
	factory MsgFactory,
	killch <-chan bool,
	readych chan<- bool,
	alivech chan<- bool,
	pingch <-chan bool,
	authfn ClientAuthFunction)

Types

type AbortMsg

type AbortMsg interface {
	common.Packet
	GetFid() string
	GetReqId() uint64
	GetError() string
}

type AcceptMsg

type AcceptMsg interface {
	common.Packet
	GetTxnid() uint64
	GetFid() string
}

type ActionHandler

type ActionHandler interface {

	//
	// Environment API
	//
	GetEnsembleSize() uint64

	//
	// The following API are used during election
	//
	GetLastLoggedTxid() (common.Txnid, error)

	GetLastCommittedTxid() (common.Txnid, error)

	GetStatus() PeerStatus

	GetQuorumVerifier() QuorumVerifier

	// Current Epoch is set during leader/follower discovery phase.
	// It is the current epoch (term) of the leader.
	GetCurrentEpoch() (uint32, error)

	// This is the Epoch that leader/follower agrees during discovery/sync phase.
	GetAcceptedEpoch() (uint32, error)

	GetCommitedEntries(txid1, txid2 common.Txnid) (<-chan LogEntryMsg, <-chan error, chan<- bool, error)

	LogAndCommit(txid common.Txnid, op uint32, key string, content []byte, toCommit bool) error

	// Set new accepted epoch as well as creating new txnid
	NotifyNewAcceptedEpoch(uint32) error

	NotifyNewCurrentEpoch(uint32) error

	//
	// The following API are used during normal execution
	//
	GetNextTxnId() common.Txnid

	GetFollowerId() string

	LogProposal(proposal ProposalMsg) error

	Commit(txid common.Txnid) error

	Abort(fid string, reqId uint64, err string) error

	Respond(fid string, reqId uint64, err string, content []byte) error
}

type Ballot

type Ballot struct {
	// contains filtered or unexported fields
}

type ClientAuthFunction

type ClientAuthFunction func(*common.PeerPipe) error

type CommitMsg

type CommitMsg interface {
	common.Packet
	GetTxnid() uint64
}

type ConsentState

type ConsentState struct {
	// contains filtered or unexported fields
}

func NewConsentState

func NewConsentState(sid string, epoch uint32, ensemble uint64) *ConsentState

Create a new ConsentState for synchronization. The leader must proceed in 4 stages:

  1. Reach quorum of followers for sending their persisted acceptedEpoch to the leader. The leader uses the followers' acceptedEpoch to determine the next epoch.
  2. Reach quorum of followers to accept the new epoch.
  3. Synchronize the commit log between the leader and each follower.
  4. Reach quorum of followers to accept this leader (NewLeaderAck).

The ConsentState is used to keep the state for stages (1), (2) and (4), where a quorum is required to proceed to the next stage.

The ConsentState is using the physical host (actual port) which is different for each TCP connection. This requires the ConsentState to be cleaned up if synchronization with a particular follower aborts. After synchronization with a follower succeeds, the follower's vote will stay in the ConsentState, since the main purpose of the ConsentState is for voting on a new epoch, as well as establishing that a majority of followers are going to follow the leader. A node cannot establish leadership until stage 4 passes. Once leadership is established, if the node loses the majority of followers, the server should abort and go through re-election again with a new ConsentState.

func (*ConsentState) Terminate

func (s *ConsentState) Terminate()

type CustomRequestHandler

type CustomRequestHandler interface {
	OnNewRequest(fid string, request RequestMsg)
	GetResponseChannel() <-chan common.Packet
}

type ElectionSite

type ElectionSite struct {
	// contains filtered or unexported fields
}

The ElectionSite controls all the participants of an election.

  1. messenger - responsible for sending messages to other voters
  2. ballot master - manages a ballot originated from this node. This includes re-balloting if there is no convergence on the votes.
  3. poll worker - receives votes from other voters and determines if majority is reached

func CreateElectionSite

func CreateElectionSite(laddr string,
	peers []string,
	factory MsgFactory,
	handler ActionHandler,
	solicitOnly bool) (election *ElectionSite, err error)

Create ElectionSite

func (*ElectionSite) Close

func (e *ElectionSite) Close()

Close ElectionSite. Any pending ballot will be closed immediately.

func (*ElectionSite) IsClosed

func (e *ElectionSite) IsClosed() bool

Tell if the ElectionSite is closed.

func (*ElectionSite) StartElection

func (e *ElectionSite) StartElection() <-chan string

Start a new Election. If there is a ballot in progress, this function will return a nil channel. The ballot will happen indefinitely until a winner emerges or there is an error. The winner will be returned through winnerch. If there is an error, the channel will be closed without sending a value.

func (*ElectionSite) UpdateWinningEpoch

func (s *ElectionSite) UpdateWinningEpoch(epoch uint32)

Update the winning epoch. The epoch can change after the synchronization phase (when leader tells the follower what is the actual epoch value -- after the leader gets a quorum of followers). There are other possible implementations (e.g. keeping the winning vote with the server -- not the ballotMaster), but for now, let's just have this API to update the epoch. Note that this is just a public wrapper method on top of ballotMaster.

type EpochAckMsg

type EpochAckMsg interface {
	common.Packet
	GetLastLoggedTxid() uint64
	GetCurrentEpoch() uint32
}

type Follower

type Follower struct {
	// contains filtered or unexported fields
}

func NewFollower

func NewFollower(kind PeerRole,
	pipe *common.PeerPipe,
	handler ActionHandler,
	factory MsgFactory) *Follower

Create a new Follower. This will run the follower protocol to communicate with the leader for voting on proposals as well as sending new proposals to the leader.

func (*Follower) ForwardRequest

func (f *Follower) ForwardRequest(request RequestMsg) bool

Forward the request to the leader

func (*Follower) GetFollowerId

func (f *Follower) GetFollowerId() string

Return the follower ID

func (*Follower) Start

func (f *Follower) Start() <-chan bool

Start the listener. This is running in a goroutine. The follower can be shutdown by calling Terminate() function or by closing the PeerPipe.

func (*Follower) Terminate

func (f *Follower) Terminate()

Terminate. This function is a no-op if the follower has already completed successfully.

type FollowerInfoMsg

type FollowerInfoMsg interface {
	common.Packet
	GetAcceptedEpoch() uint32
	GetFid() string
	GetVoting() bool
}

type FollowerServer

type FollowerServer struct {
	// contains filtered or unexported fields
}

type FollowerStageCode

type FollowerStageCode uint16
const (
	SEND_FOLLOWERINFO FollowerStageCode = iota
	RECEIVE_UPDATE_ACCEPTED_EPOCH
	SYNC_RECEIVE
	RECEIVE_UPDATE_CURRENT_EPOCH
	FOLLOWER_SYNC_DONE
)

type FollowerState

type FollowerState struct {
	// contains filtered or unexported fields
}

type FollowerSyncProxy

type FollowerSyncProxy struct {
	// contains filtered or unexported fields
}

func NewFollowerSyncProxy

func NewFollowerSyncProxy(leader *common.PeerPipe,
	handler ActionHandler,
	factory MsgFactory,
	voting bool) *FollowerSyncProxy

Create a FollowerSyncProxy to synchronize with a leader. The proxy requires 1 stateful variable to be provided as input:

  1. Leader : The leader is a PeerPipe (TCP connection). This is used to exchange messages with the leader node.

func (*FollowerSyncProxy) GetDoneChannel

func (l *FollowerSyncProxy) GetDoneChannel() <-chan bool

Return a channel that tells when the synchronization is done. This is an unbuffered channel such that FollowerSyncProxy will not be blocked upon completion of synchronization (whether successful or not).

func (*FollowerSyncProxy) Start

func (f *FollowerSyncProxy) Start() bool

Start synchronization with a specific leader. This function can be run as a regular function or a go-routine. If the caller runs this as a go-routine, the caller should use GetDoneChannel() to tell when this function completes.

There are 3 cases when this function sends "false" to donech: 1) When there is an error during synchronization 2) When synchronization times out 3) When Terminate() is called

When a failure (false) result is sent to the donech, this function will also close the PeerPipe to the leader. This will force the leader to skip this follower.

This function will catch panic.

func (*FollowerSyncProxy) Terminate

func (l *FollowerSyncProxy) Terminate()

Terminate the synchronization with this leader. Upon termination, the leader will skip this follower. This function is a no-op if the FollowerSyncProxy has already completed successfully.

type IdGen

type IdGen struct {
	// contains filtered or unexported fields
}

type Leader

type Leader struct {
	// contains filtered or unexported fields
}

func NewLeader

func NewLeader(naddr string,
	handler ActionHandler,
	factory MsgFactory) (leader *Leader, err error)

Create a new leader

func NewLeaderWithCustomHandler

func NewLeaderWithCustomHandler(naddr string,
	handler ActionHandler,
	factory MsgFactory,
	reqHandler CustomRequestHandler) (leader *Leader, err error)

func (*Leader) AddFollower

func (l *Leader) AddFollower(fid string,
	peer *common.PeerPipe,
	o *observer)

Add a follower and start a listener for the follower. If the leader is terminated, the pipe between leader and follower will also be closed.

func (*Leader) AddObserver

func (l *Leader) AddObserver(id string, o *observer)

Add observer

func (*Leader) AddWatcher

func (l *Leader) AddWatcher(fid string,
	peer *common.PeerPipe,
	o *observer)

Add a watcher. If the leader is terminated, the pipe between leader and watcher will also be closed.

func (*Leader) GetActiveEnsembleSize

func (l *Leader) GetActiveEnsembleSize() int

Get the current ensemble size of the leader. It is the number of followers + 1 (including leader)

func (*Leader) GetEnsembleChangeChannel

func (l *Leader) GetEnsembleChangeChannel() <-chan bool

Get the channel that notifies when the ensemble of followers changes. The receiver of the channel can then tell if the leader has a quorum of followers.

func (*Leader) GetFollowerId

func (l *Leader) GetFollowerId() string

Return the follower ID. The leader is a follower to itself.

func (*Leader) IsClosed

func (l *Leader) IsClosed() bool

Has the leader terminated/closed?

func (*Leader) QueueRequest

func (l *Leader) QueueRequest(fid string, req common.Packet)

func (*Leader) QueueResponse

func (l *Leader) QueueResponse(req common.Packet)

func (*Leader) RemoveObserver

func (l *Leader) RemoveObserver(id string)

Remove observer

func (*Leader) Terminate

func (l *Leader) Terminate()

Terminate the leader. It is a no-op if the leader has already completed successfully.

type LeaderInfoMsg

type LeaderInfoMsg interface {
	common.Packet
	GetAcceptedEpoch() uint32
}

type LeaderServer

type LeaderServer struct {
	// contains filtered or unexported fields
}

type LeaderStageCode

type LeaderStageCode uint16
const (
	UPDATE_ACCEPTED_EPOCH_AFTER_QUORUM LeaderStageCode = iota
	NOTIFY_NEW_EPOCH
	UPDATE_CURRENT_EPOCH_AFTER_QUORUM
	SYNC_SEND
	DECLARE_NEW_LEADER_AFTER_QUORUM
	LEADER_SYNC_DONE
)

type LeaderState

type LeaderState struct {
	// contains filtered or unexported fields
}

type LeaderSyncProxy

type LeaderSyncProxy struct {
	// contains filtered or unexported fields
}

func NewLeaderSyncProxy

func NewLeaderSyncProxy(leader *Leader,
	state *ConsentState,
	follower *common.PeerPipe,
	handler ActionHandler,
	factory MsgFactory) *LeaderSyncProxy

Create a LeaderSyncProxy to synchronize with a follower. The proxy requires 2 stateful variables to be provided as inputs:

  1. ConsentState: The LeaderSyncProxy requires a quorum of followers to follow before leader can process client request. The consentState is a shared state (shared among multiple LeaderSyncProxy) to keep track of the followers following this leader during synchronization. Note that a follower may leave the leader after synchronization, but the ConsentState will not keep track of follower leaving.
  2. Follower: The follower is a PeerPipe (TCP connection). This is used to exchange messages with the follower node.

func (*LeaderSyncProxy) CanFollowerVote

func (l *LeaderSyncProxy) CanFollowerVote() bool

Can the follower vote?

func (*LeaderSyncProxy) GetDoneChannel

func (l *LeaderSyncProxy) GetDoneChannel() <-chan bool

Return a channel that tells when the synchronization is done. This is an unbuffered channel such that LeaderSyncProxy will not be blocked upon completion of synchronization (whether successful or not).

func (*LeaderSyncProxy) GetFid

func (l *LeaderSyncProxy) GetFid() string

Return the fid (follower id)

func (*LeaderSyncProxy) Start

func (l *LeaderSyncProxy) Start(o *observer, packet common.Packet) bool

Start synchronization with a specific follower. This function can be run as a regular function or a go-routine. If the caller runs this as a go-routine, the caller should use GetDoneChannel() to tell when this function completes.

There are 3 cases when this function sends "false" to donech: 1) When there is an error during synchronization 2) When synchronization times out 3) When Terminate() is called

When a failure (false) result is sent to the donech, this function will also close the PeerPipe to the follower. This will force the follower to restart election.

This function will catch panic.

func (*LeaderSyncProxy) Terminate

func (l *LeaderSyncProxy) Terminate()

Terminate the synchronization with this follower. Upon termination, the follower will enter into election again. This function cannot guarantee that the go-routine will terminate until the given ConsentState is terminated as well. This function is a no-op if the LeaderSyncProxy has already completed successfully.

type ListenerState

type ListenerState struct {
	// contains filtered or unexported fields
}

type LogEntryMsg

type LogEntryMsg interface {
	common.Packet
	GetTxnid() uint64
	GetOpCode() uint32
	GetKey() string
	GetContent() []byte
}

type MsgFactory

type MsgFactory interface {
	CreateProposal(txnid uint64, fid string, reqId uint64, op uint32, key string, content []byte) ProposalMsg

	CreateAccept(txnid uint64, fid string) AcceptMsg

	CreateCommit(txnid uint64) CommitMsg

	CreateAbort(fid string, reqId uint64, err string) AbortMsg

	CreateVote(round uint64, status uint32, epoch uint32, cndId string, cndLoggedTxnId uint64,
		cndCommittedTxnId uint64, solicit bool) VoteMsg

	CreateFollowerInfo(epoch uint32, fid string, voting bool) FollowerInfoMsg

	CreateEpochAck(lastLoggedTxid uint64, epoch uint32) EpochAckMsg

	CreateLeaderInfo(epoch uint32) LeaderInfoMsg

	CreateNewLeader(epoch uint32) NewLeaderMsg

	CreateNewLeaderAck() NewLeaderAckMsg

	CreateLogEntry(txnid uint64, opCode uint32, key string, content []byte) LogEntryMsg

	CreateRequest(id uint64, opCode uint32, key string, content []byte) RequestMsg

	CreateResponse(fid string, reqId uint64, err string, content []byte) ResponseMsg
}

type NewLeaderAckMsg

type NewLeaderAckMsg interface {
	common.Packet
}

type NewLeaderMsg

type NewLeaderMsg interface {
	common.Packet
	GetCurrentEpoch() uint32
}

type PeerRole

type PeerRole byte
const (
	LEADER PeerRole = iota
	FOLLOWER
	WATCHER
)

type PeerStatus

type PeerStatus byte
const (
	ELECTING PeerStatus = iota
	LEADING
	FOLLOWING
	WATCHING
)

type ProposalMsg

type ProposalMsg interface {
	common.Packet
	GetTxnid() uint64
	GetFid() string
	GetReqId() uint64
	GetOpCode() uint32
	GetKey() string
	GetContent() []byte
}

type QuorumVerifier

type QuorumVerifier interface {
	HasQuorum(count int) bool
}

type RequestHandle

type RequestHandle struct {
	Request   RequestMsg
	Err       error
	Mutex     sync.Mutex
	CondVar   *sync.Cond
	StartTime int64
	Content   []byte
}

type RequestMgr

type RequestMgr interface {
	GetRequestChannel() <-chan *RequestHandle
	AddPendingRequest(handle *RequestHandle)
	CleanupOnError()
}

type RequestMsg

type RequestMsg interface {
	common.Packet
	GetReqId() uint64
	GetOpCode() uint32
	GetKey() string
	GetContent() []byte
}

type ResponseMsg

type ResponseMsg interface {
	common.Packet
	GetFid() string
	GetReqId() uint64
	GetError() string
	GetContent() []byte
}

type VoteMsg

type VoteMsg interface {
	common.Packet
	GetRound() uint64
	GetStatus() uint32
	GetEpoch() uint32
	GetCndId() string
	GetCndLoggedTxnId() uint64
	GetCndCommittedTxnId() uint64
	GetSolicit() bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL