common

package
v0.0.0-...-f15e7a6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2024 License: Apache-2.0 Imports: 13 Imported by: 9

Documentation

Index

Constants

This section is empty.

Variables

View Source
var BALLOT_FINALIZE_WAIT time.Duration = 200 // wait this much for new votes before completing leader election
View Source
var BALLOT_MAX_TIMEOUT time.Duration = 500 // max timeout for a ballot (millisecond)
View Source
var BALLOT_TIMEOUT time.Duration = 50 // timeout for a ballot (millisecond)
View Source
var BOOTSTRAP_ACCEPTED_EPOCH uint32 = 0 // Boostrap value of accepted epoch
View Source
var BOOTSTRAP_CURRENT_EPOCH uint32 = 0 // Boostrap value of current epoch
View Source
var CONFIG_ACCEPTED_EPOCH = "AcceptedEpoch" // Server Config Param : AcceptedEpoch
View Source
var CONFIG_CURRENT_EPOCH = "CurrentEpoch" // Server Config Param : CurrentEpoch
View Source
var CONFIG_LAST_COMMITTED_TXID = "LastCommittedTxid" // Server Config Param : LastCommittedTxid
View Source
var CONFIG_LAST_LOGGED_TXID = "LastLoggedTxid" // Server Config Param : LastLoggedTxid
View Source
var CONFIG_MAGIC = "MagicNumber" // Server Config Param : Magic Number
View Source
var CONFIG_MAGIC_VALUE uint64 = 0x0123456789 // Server Config Param : Magic Number Value
View Source
var ELECTION_PORT = 9998 // port for receving election votes from peer
View Source
var ELECTION_TRANSPORT_TYPE = "udp" // network protocol for election vote transport
View Source
var LEADER_TIMEOUT time.Duration = 100000 // timeout for leader (millisecond)
View Source
var MAX_COUNTER uint32 = math.MaxUint32 // Max value for counter
View Source
var MAX_DATAGRAM_SIZE = 1000 // maximum size of datagram
View Source
var MAX_EPOCH uint32 = math.MaxUint32 // Max value for epoch
View Source
var MAX_FOLLOWERS = 100 // maximum number of followers
View Source
var MAX_PARTICIPANTS = 50 // maximum number of participants
View Source
var MAX_PEERS = 150 // maximum number of peers
View Source
var MAX_PROPOSALS = 1000 // maximum number of proposals
View Source
var MAX_RETRY_BACKOFF time.Duration = 30000 // max backoff time for retry (millisecond)
View Source
var MESSAGE_PORT = 9999 // port for receving message from peer (e.g. request/proposal)
View Source
var MESSAGE_TRANSPORT_TYPE = "tcp" // network protocol for message transport
View Source
var MIN_FOREST_DB_CACHE_SIZE = uint64(1024 * 1024) // Minimum Forestdb size
View Source
var REPOSITORY_NAME = "MetadataStore" // Forest db name for metadata store
View Source
var RETRY_BACKOFF time.Duration = 1000 // backoff time for retry (millisecond)
View Source
var SYNC_TIMEOUT time.Duration = 300000 // timeout for synchronization (millisecond)
View Source
var TCP_KEEP_ALIVE_PERIOD time.Duration = 100 * time.Millisecond // TCP keep alive period
View Source
var XACT_COMMIT_WAIT_TIME time.Duration = 20 * time.Millisecond // Time to wait for the commit log before returning an error

Functions

func CompareAndIncrementEpoch

func CompareAndIncrementEpoch(epoch1, epoch2 uint32) uint32

Compare epoch1 and epoch2. If epoch1 is equal or more recent, return the next more recent epoch value. If epoch1 is less recent than epoch2, return epoch2 as it is.

func FindPacketConcreteType

func FindPacketConcreteType(packet Packet) reflect.Type

func GetOpCodeStr

func GetOpCodeStr(r OpCode) string

func IsCustomOpCode

func IsCustomOpCode(opCode OpCode) bool

func IsNextInSequence

func IsNextInSequence(new, old Txnid) bool

Return true if txid2 is logically next in sequence from txid1. If txid2 and txid1 have different epoch, then only check if txid2 has a larger epoch. Otherwise, compare the counter such that txid2 is txid1 + 1

func IsTransactionalOpCode

func IsTransactionalOpCode(opCode OpCode) bool

func IsValidType

func IsValidType(name string) bool

func Marshall

func Marshall(packet Packet) ([]byte, error)

Take a Packet and create a serialized byte array.

8 bytes : total length of msg (excluding size of total length itself)
8 bytes : length of the message type (protobuf struct)
n bytes : message type (protobuf struct)
n bytes : message content  (protobuf)

func NewUUID

func NewUUID() (uint64, error)

func RegisterPacketByName

func RegisterPacketByName(name string, instance Packet)

func SafeRun

func SafeRun(funcName string, f FuncToRun)

Types

type BackoffTimer

type BackoffTimer struct {
	// contains filtered or unexported fields
}

func NewBackoffTimer

func NewBackoffTimer(duration time.Duration,
	maxDuration time.Duration, factor int) *BackoffTimer

func (*BackoffTimer) Backoff

func (t *BackoffTimer) Backoff()

func (*BackoffTimer) GetChannel

func (t *BackoffTimer) GetChannel() <-chan time.Time

func (*BackoffTimer) Reset

func (t *BackoffTimer) Reset()

func (*BackoffTimer) Stop

func (t *BackoffTimer) Stop() bool

type Cleanup

type Cleanup struct {
	// contains filtered or unexported fields
}

func NewCleanup

func NewCleanup(f func()) *Cleanup

func (*Cleanup) Cancel

func (c *Cleanup) Cancel()

func (*Cleanup) Run

func (c *Cleanup) Run()

type CompareResult

type CompareResult byte
const (
	EQUAL CompareResult = iota
	GREATER
	LESSER
	MORE_RECENT
	LESS_RECENT
)

func CompareEpoch

func CompareEpoch(epoch1, epoch2 uint32) CompareResult

Compare function to compare epoch1 with epoch2

return common.EQUAL if epoch1 is the same as epoch2 return common.MORE_RECENT if epoch1 is more recent return common.LESS_RECENT if epoch1 is less recent

This is just to prepare in the future if we support rolling over the epoch (but it will also require changes to comparing txnid as well).

type Error

type Error struct {
	// contains filtered or unexported fields
}

func NewError

func NewError(code ErrorCode, reason string) *Error

func WrapError

func WrapError(code ErrorCode, reason string, cause error) *Error

func (*Error) Error

func (e *Error) Error() string

func (*Error) IsFatal

func (e *Error) IsFatal() bool

type ErrorCode

type ErrorCode byte
const (
	PROTOCOL_ERROR ErrorCode = iota
	SERVER_ERROR
	SERVER_CONFIG_ERROR
	FATAL_ERROR
	ARG_ERROR
	ELECTION_ERROR
	CLIENT_ERROR
	REPO_ERROR
)

type FuncToRun

type FuncToRun func()

type Message

type Message struct {
	Content Packet
	Peer    net.Addr
}

A wrapper of a UPD message (content + sender addr)

type OpCode

type OpCode byte
const (
	OPCODE_INVALID OpCode = iota
	OPCODE_ADD
	OPCODE_SET
	OPCODE_DELETE
	OPCODE_GET
	OPCODE_STREAM_BEGIN_MARKER
	OPCODE_STREAM_END_MARKER
	OPCODE_BROADCAST
	OPCODE_CUSTOM = 50
)

func GetOpCode

func GetOpCode(s string) OpCode

func GetOpCodeFromInt

func GetOpCodeFromInt(i uint32) OpCode

type Packet

type Packet interface {
	// Name of the message
	Name() string

	// Encode function shall marshal message to byte array.
	Encode() (data []byte, err error)

	// Decode function shall unmarshal byte array back to message.
	Decode(data []byte) (err error)

	GetVersion() uint32

	// Debug Representation
	String() string
}

func CreatePacketByName

func CreatePacketByName(name string) (Packet, error)

func UnMarshall

func UnMarshall(payload []byte) (Packet, error)

Take a byte stream and create a Packet

type PacketRegistry

type PacketRegistry struct {
	// contains filtered or unexported fields
}

func NewPacketRegistry

func NewPacketRegistry() *PacketRegistry

type PeerConn

type PeerConn struct {
	// contains filtered or unexported fields
}

PeerConn encapsulates TCP/TLS connection, its corresponding PeerPipe and the first non-auth packet received by the auth function, if any.

func NewPeerConn

func NewPeerConn(conn net.Conn, pipe *PeerPipe, packet Packet) *PeerConn

func (*PeerConn) GetConn

func (peerConn *PeerConn) GetConn() net.Conn

func (*PeerConn) GetPacket

func (peerConn *PeerConn) GetPacket() Packet

func (*PeerConn) GetPeerPipe

func (peerConn *PeerConn) GetPeerPipe() *PeerPipe

type PeerListener

type PeerListener struct {
	// contains filtered or unexported fields
}

PeerListener - Listener for TCP connection If there is a new connection request, send the new connection through connch. If the listener is closed, connch will be closed.

func StartPeerListener

func StartPeerListener(laddr string) (*PeerListener, error)

Start a new PeerListener for listening to new connection request for processing messages. laddr - local network address (host:port)

func StartPeerListener2

func StartPeerListener2(laddr string, authfn ServerAuthFunction) (*PeerListener, error)

func (*PeerListener) Close

func (l *PeerListener) Close() bool

Close the PeerListener. The connection channel will be closed as well. This function is syncronized and will not close the channel twice.

func (*PeerListener) CloseNoLock

func (l *PeerListener) CloseNoLock() bool

func (*PeerListener) ConnChannel

func (l *PeerListener) ConnChannel() <-chan *PeerConn

Get the channel for new peer connection request. Return nil if the listener is closed. The consumer should also check if the channel is closed when dequeueing from the channel.

func (*PeerListener) ResetConnections

func (l *PeerListener) ResetConnections() (*PeerListener, error)

IMP: ResetConnections algorithm will trigger closing of the existing PeerListener and starting of a new one. But it uses the same connch so that the leaderServer shouldn't need a restart.

type PeerMessenger

type PeerMessenger struct {
	// contains filtered or unexported fields
}

PeerMessenger sends packets between peers.

func NewPeerMessenger

func NewPeerMessenger(laddr string, splitter map[string]chan *Message) (*PeerMessenger, error)

Create a new PeerMessenger. The consumer can call ReceiveChannel() to get the channel for receving message packets. If the splitter is not specified, then the message will be sent out through the channel returned by ReceiveChannel(). Otherwise, the splitter map will be used to decide which channel to use for incoming message. The key to the map is name of the Packet (Packet.Name()). If the splitter does not map to a channel for the given name, then it will send the message out through the default channel.

If the messenger is closed, the splitter channels will be closed as well.

func (*PeerMessenger) Close

func (p *PeerMessenger) Close() bool

Close the PeerMessenger. It is safe to call this method multiple times without causing panic.

func (*PeerMessenger) DefaultReceiveChannel

func (p *PeerMessenger) DefaultReceiveChannel() <-chan *Message

Return the default receive channel. If a splitter is specified, then it will first use the channel in the splitter map. If the splitter is not specified or the splitter map does not map to a channel, then the default receive channel is used.

func (*PeerMessenger) GetLocalAddr

func (p *PeerMessenger) GetLocalAddr() string

Get the local net address.

func (*PeerMessenger) Multicast

func (p *PeerMessenger) Multicast(packet Packet, peers []net.Addr) bool

Send a packet to the all the peers. This method will return false if the pipe is already closed.

func (*PeerMessenger) ReceiveChannel

func (p *PeerMessenger) ReceiveChannel(msgName string) <-chan *Message

Get the receiving channel for the specific message name. If there is no match, the return the default receiving channel.

func (*PeerMessenger) Send

func (p *PeerMessenger) Send(packet Packet, peer net.Addr) bool

Send a packet to the peer. This method will return false if the pipe is already closed.

func (*PeerMessenger) SendByName

func (p *PeerMessenger) SendByName(packet Packet, peer string) bool

Send a packet to the peer. This method will return false if the pipe is already closed or there is error in resolving the peer addr.

type PeerPipe

type PeerPipe struct {
	// contains filtered or unexported fields
}

PeerPipe is to maintain the messaging channel between two peers. This is not used for leader election.

func NewPeerPipe

func NewPeerPipe(pconn net.Conn) *PeerPipe

Create a new PeerPipe. The consumer can call ReceiveChannel() to get the channel for receving message packets.

func (*PeerPipe) Close

func (p *PeerPipe) Close() bool

Close the PeerPipe. It is safe to call this method multiple times without causing panic.

func (*PeerPipe) GetAddr

func (p *PeerPipe) GetAddr() string

Get the net address of the remote peer.

func (*PeerPipe) ReceiveChannel

func (p *PeerPipe) ReceiveChannel() <-chan Packet

Return the receive channel.

func (*PeerPipe) Send

func (p *PeerPipe) Send(packet Packet) bool

Send a packet to the peer. This method will return false if the pipe is already closed.

type RecoverableError

type RecoverableError struct {
	Reason string
}

func (*RecoverableError) Error

func (e *RecoverableError) Error() string

type ResettableTimer

type ResettableTimer struct {
	*time.Timer
	// contains filtered or unexported fields
}

func NewResettableTimer

func NewResettableTimer(d time.Duration) *ResettableTimer

func NewStoppedResettableTimer

func NewStoppedResettableTimer(d time.Duration) *ResettableTimer

func (*ResettableTimer) Reset

func (t *ResettableTimer) Reset()

type ServerAuthFunction

type ServerAuthFunction func(conn net.Conn) (*PeerPipe, Packet, error)

type TxnState

type TxnState struct {
	// contains filtered or unexported fields
}

func NewTxnState

func NewTxnState() *TxnState

func (*TxnState) GetNextTxnId

func (t *TxnState) GetNextTxnId() Txnid

func (*TxnState) InitCurrentTxnid

func (t *TxnState) InitCurrentTxnid(txnid Txnid)

func (*TxnState) SetEpoch

func (t *TxnState) SetEpoch(newEpoch uint32)

type Txnid

type Txnid uint64
var BOOTSTRAP_LAST_COMMITTED_TXID Txnid = Txnid(0) // Boostrap value of last committed txid
var BOOTSTRAP_LAST_LOGGED_TXID Txnid = Txnid(0) // Boostrap value of last logged txid

func (Txnid) GetCounter

func (id Txnid) GetCounter() uint64

func (Txnid) GetEpoch

func (id Txnid) GetEpoch() uint64

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL