Documentation ¶
Index ¶
- type PgTransport
- func NewPgTransport(stream StreamLayer, maxPool int, timeout time.Duration, logOutput io.Writer) *PgTransport
- func NewPgTransportWithConfig(config *PgTransportConfig) *PgTransport
- func NewPgTransportWithLogger(stream StreamLayer, maxPool int, timeout time.Duration, logger timber.Logger) *PgTransport
- func (p *PgTransport) AppendEntries(id raft.ServerID, target raft.ServerAddress, args *raft.AppendEntriesRequest, ...) error
- func (p *PgTransport) AppendEntriesPipeline(id raft.ServerID, target raft.ServerAddress) (raft.AppendPipeline, error)
- func (p *PgTransport) Close() error
- func (p *PgTransport) CloseStreams()
- func (p *PgTransport) Consumer() <-chan raft.RPC
- func (p *PgTransport) DecodePeer(buf []byte) raft.ServerAddress
- func (p *PgTransport) EncodePeer(id raft.ServerID, a raft.ServerAddress) []byte
- func (p *PgTransport) InstallSnapshot(id raft.ServerID, target raft.ServerAddress, args *raft.InstallSnapshotRequest, ...) error
- func (p *PgTransport) IsShutdown() bool
- func (p *PgTransport) LocalAddr() raft.ServerAddress
- func (p *PgTransport) RequestVote(id raft.ServerID, target raft.ServerAddress, args *raft.RequestVoteRequest, ...) error
- func (p *PgTransport) SetHeartbeatHandler(callback func(rpc raft.RPC))
- func (p *PgTransport) TimeoutNow(id raft.ServerID, target raft.ServerAddress, args *raft.TimeoutNowRequest, ...) error
- type PgTransportConfig
- type ServerAddressProvider
- type StreamLayer
- type Transport
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type PgTransport ¶
type PgTransport struct {
// contains filtered or unexported fields
}
PgTransport is an improved TCP transport for raft whose networking code is modeled on the approach Postgres uses.
func NewPgTransport ¶
func NewPgTransport(
    stream StreamLayer,
    maxPool int,
    timeout time.Duration,
    logOutput io.Writer,
) *PgTransport
NewPgTransport creates a new network transport on top of the given StreamLayer, which supplies both the dialer and the listener. The maxPool parameter controls how many connections we will pool. The timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply the timeout by (SnapshotSize / TimeoutScale).
func NewPgTransportWithConfig ¶
func NewPgTransportWithConfig(
    config *PgTransportConfig,
) *PgTransport
func NewPgTransportWithLogger ¶
func NewPgTransportWithLogger(
    stream StreamLayer,
    maxPool int,
    timeout time.Duration,
    logger timber.Logger,
) *PgTransport
NewPgTransportWithLogger creates a new network transport with the given logger on top of the given StreamLayer, which supplies both the dialer and the listener. The maxPool parameter controls how many connections we will pool. The timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply the timeout by (SnapshotSize / TimeoutScale).
func (*PgTransport) AppendEntries ¶
func (p *PgTransport) AppendEntries(
    id raft.ServerID,
    target raft.ServerAddress,
    args *raft.AppendEntriesRequest,
    resp *raft.AppendEntriesResponse,
) error
func (*PgTransport) AppendEntriesPipeline ¶
func (p *PgTransport) AppendEntriesPipeline(
    id raft.ServerID,
    target raft.ServerAddress,
) (raft.AppendPipeline, error)
func (*PgTransport) Close ¶
func (p *PgTransport) Close() error
func (*PgTransport) CloseStreams ¶
func (p *PgTransport) CloseStreams()
CloseStreams closes the current streams.
func (*PgTransport) Consumer ¶
func (p *PgTransport) Consumer() <-chan raft.RPC
Consumer implements the Transport interface.
func (*PgTransport) DecodePeer ¶
func (p *PgTransport) DecodePeer(buf []byte) raft.ServerAddress
DecodePeer implements the Transport interface.
func (*PgTransport) EncodePeer ¶
func (p *PgTransport) EncodePeer(id raft.ServerID, a raft.ServerAddress) []byte
EncodePeer implements the Transport interface.
func (*PgTransport) InstallSnapshot ¶
func (p *PgTransport) InstallSnapshot(
    id raft.ServerID,
    target raft.ServerAddress,
    args *raft.InstallSnapshotRequest,
    resp *raft.InstallSnapshotResponse,
    data io.Reader,
) error
func (*PgTransport) IsShutdown ¶
func (p *PgTransport) IsShutdown() bool
func (*PgTransport) LocalAddr ¶
func (p *PgTransport) LocalAddr() raft.ServerAddress
LocalAddr implements the Transport interface.
func (*PgTransport) RequestVote ¶
func (p *PgTransport) RequestVote(
    id raft.ServerID,
    target raft.ServerAddress,
    args *raft.RequestVoteRequest,
    resp *raft.RequestVoteResponse,
) error
func (*PgTransport) SetHeartbeatHandler ¶
func (p *PgTransport) SetHeartbeatHandler(callback func(rpc raft.RPC))
SetHeartbeatHandler is used to set up a heartbeat handler as a fast path. This avoids head-of-line blocking from disk I/O.
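The fast-path idea can be illustrated with a minimal sketch. Everything here is hypothetical: `rpc` and `dispatcher` are local stand-ins rather than types from this package, and the `heartbeat` flag replaces whatever check a real transport performs on the request. Locking around the handler is omitted for brevity.

```go
package main

import "fmt"

// rpc is a hypothetical stand-in for raft.RPC. The heartbeat flag replaces
// the real check, which would inspect the request itself.
type rpc struct {
	name      string
	heartbeat bool
}

// dispatcher routes incoming RPCs either to the heartbeat handler or to the
// consumer channel.
type dispatcher struct {
	consumeCh   chan rpc
	heartbeatFn func(rpc)
}

// dispatch reports whether the RPC took the fast path.
func (d *dispatcher) dispatch(r rpc) bool {
	if r.heartbeat && d.heartbeatFn != nil {
		d.heartbeatFn(r) // handled inline, never waits behind queued RPCs
		return true
	}
	d.consumeCh <- r // normal path: queued for the consumer
	return false
}

func main() {
	d := &dispatcher{consumeCh: make(chan rpc, 1)}
	d.heartbeatFn = func(r rpc) { fmt.Println("fast path:", r.name) }

	d.dispatch(rpc{name: "heartbeat", heartbeat: true})
	d.dispatch(rpc{name: "append"})
	fmt.Println("queued:", (<-d.consumeCh).name)
}
```

Because the heartbeat is handled inline, it does not sit in the consumer queue behind requests that are waiting on disk writes.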
func (*PgTransport) TimeoutNow ¶
func (p *PgTransport) TimeoutNow(id raft.ServerID, target raft.ServerAddress, args *raft.TimeoutNowRequest, resp *raft.TimeoutNowResponse) error
type PgTransportConfig ¶
type PgTransportConfig struct {
    ServerAddressProvider ServerAddressProvider
    Logger                timber.Logger
    Stream                StreamLayer
    MaxPool               int
    Timeout               time.Duration
}
PgTransportConfig exposes just a few ways to tweak the internal behavior of the pg transport.
type ServerAddressProvider ¶
type ServerAddressProvider interface {
ServerAddr(id raft.ServerID) (raft.ServerAddress, error)
}
ServerAddressProvider lets us look up a server's address from whatever ID we are given. Although most raft implementations default to using the listen address as the server ID, this is a bad idea and we should absolutely not depend on it.
type StreamLayer ¶
type StreamLayer interface {
    net.Listener

    // Dial is used to create a new outgoing connection
    Dial(address raft.ServerAddress, timeout time.Duration) (net.Conn, error)
}
StreamLayer is a local interface definition for our networking code. In practice, the value passed here comes from core.Wrapper, which we built as a networking workaround.