app

package
v0.0.0-...-db14c7d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2021 License: MIT Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultLogFlags is the bitwise OR of the mdbx environment flags listed
	// below. It differs from DefaultStableFlags only in its first flag:
	// EnvNoMetaSync here versus EnvSyncDurable there.
	// NOTE(review): presumably intended as the logFlags argument of
	// OpenStore — confirm against the caller.
	DefaultLogFlags = mdbx.EnvNoMetaSync |
		mdbx.EnvNoTLS |
		mdbx.EnvWriteMap |
		mdbx.EnvLIFOReclaim |
		mdbx.EnvNoMemInit |
		mdbx.EnvCoalesce

	// DefaultStableFlags is the bitwise OR of the mdbx environment flags
	// listed below. It differs from DefaultLogFlags only in its first flag:
	// EnvSyncDurable here versus EnvNoMetaSync there.
	// NOTE(review): presumably intended as the stableFlags argument of
	// OpenStore — confirm against the caller.
	DefaultStableFlags = mdbx.EnvSyncDurable |
		mdbx.EnvNoTLS |
		mdbx.EnvWriteMap |
		mdbx.EnvLIFOReclaim |
		mdbx.EnvNoMemInit |
		mdbx.EnvCoalesce

	// Binary size multipliers in bytes; each unit is 1024 times the
	// previous one.
	Kilobyte = 1024
	Megabyte = 1024 * 1024
	Gigabyte = Megabyte * 1024
	Terabyte = Gigabyte * 1024
)

Variables

View Source
var (
	// DefaultStableGeometry is an mdbx.Geometry sized in kilobytes: a
	// 256 KiB upper bound, 64 KiB growth step, 128 KiB shrink threshold and
	// 4 KiB pages.
	// NOTE(review): SizeNow (8 KiB) is smaller than SizeLower (64 KiB) —
	// confirm this is intentional and how mdbx treats it.
	DefaultStableGeometry = mdbx.Geometry{
		SizeLower:       64 * Kilobyte,
		SizeNow:         8 * Kilobyte,
		SizeUpper:       256 * Kilobyte,
		GrowthStep:      64 * Kilobyte,
		ShrinkThreshold: 128 * Kilobyte,
		PageSize:        4 * Kilobyte,
	}
	// DefaultLogGeometry is an mdbx.Geometry that starts at 1 MiB, may grow
	// to 4 GiB in 16 MiB steps, has an 8 MiB shrink threshold and uses
	// 8 KiB pages.
	DefaultLogGeometry = mdbx.Geometry{
		SizeLower:       1 * Megabyte,
		SizeNow:         1 * Megabyte,
		SizeUpper:       4 * Gigabyte,
		GrowthStep:      16 * Megabyte,
		ShrinkThreshold: 8 * Megabyte,
		PageSize:        8 * Kilobyte,
	}
)
View Source
var ErrCorrupt = errors.New("corrupt")

ErrCorrupt is returned when data is invalid or corrupt

View Source
var ErrInvalid = errors.New("invalid")

ErrInvalid is returned when an operation has invalid arguments or options

View Source
var ErrNotLeader = raft.ErrNotLeader

ErrNotLeader is returned when the raft leader is unknown

View Source
var (
	ErrPathNotDir = errors.New("path is not a directory")
)
View Source
var ErrSyntax = errors.New("syntax error")

ErrSyntax is returned when there is a syntax error

View Source
var ErrUnauthorized = errors.New("unauthorized")

ErrUnauthorized is returned when a client connection has not been authorized

View Source
var ErrUnknownCommand = errors.New("unknown command")

ErrUnknownCommand is returned when a command is not known

View Source
var ErrWrongNumArgs = errors.New("wrong number of arguments")

ErrWrongNumArgs is returned when the arg count is wrong

Functions

func Main

func Main(conf Config) error

Main is the entrypoint for the cluster node. It must be called once and only once, and as the last call in the Go main() function. Control is not expected to come back to the caller during normal operation, as all application operations, logging, and I/O will be forever transferred; the returned error reports a failure.

func ReadRawMachineInfo

func ReadRawMachineInfo(m Machine, info *RawMachineInfo)

ReadRawMachineInfo reads the raw machine components.

func RedisDial

func RedisDial(addr, auth string, tlscfg *tls.Config) (redis.Conn, error)

RedisDial is a helper function that dials out to another Uhaha server with the redis protocol, using the provided TLS config and Auth token. The TLS/Auth must be correct in order to establish a connection.

func SerializedSize

func SerializedSize(log *raft.Log) int

func WriteRawMachineInfo

func WriteRawMachineInfo(m Machine, info *RawMachineInfo)

WriteRawMachineInfo writes raw components to the machine. Use with care as this operation may destroy the consistency of your cluster.

Types

type Backend

type Backend int

The Backend database format used for storing Raft logs and meta data.

const (
	// LevelDB is an on-disk LSM (log-structured merge-tree) database. This
	// format is optimized for fast sequential reads and writes, which is ideal
	// for most Raft implementations. This is the default format used by Uhaha.
	LevelDB Backend = iota
	// Bolt is an on-disk single-file b+tree database. This format has been a
	// popular choice for Go-based Raft implementations for years.
	Bolt
	// MDBX is the third Backend value.
	// NOTE(review): presumably selects the mdbx-based Store declared in
	// this package — confirm.
	MDBX
)

type Config

type Config struct {

	// Name gives the server application a name. Default "uhaha-app"
	Name string

	// Version of the application. Default "0.0.0"
	Version string

	// GitSHA of the application.
	GitSHA string

	// Flag is used to manage the application startup flags.
	Flag struct {
		// Custom tells Main to not automatically parse the application startup
		// flags. When set it is up to the user to parse the os.Args manually
		// or with a different library.
		Custom bool
		// Usage is an optional function that allows for altering the usage
		// message.
		Usage func(usage string) string
		// PreParse is an optional function that allows for adding command line
		// flags before the user flags are parsed.
		PreParse func()
		// PostParse is an optional function that fires after user flags are
		// parsed.
		PostParse func()
	}

	// Snapshot fires when a snapshot is taken. It receives the data object
	// and returns a Snapshot, or an error.
	Snapshot func(data interface{}) (Snapshot, error)

	// Restore returns a data object that is fully restored from the previous
	// snapshot using the input Reader. A restore operation only happens once,
	// if needed, at the start of the application.
	Restore func(rd io.Reader) (data interface{}, err error)

	// UseJSONSnapshots is a convenience field that tells the machine to use
	// JSON as the format for all snapshots and restores. This may be good for
	// small simple data models which have types that can be fully marshalled
	// into JSON, i.e. all important data fields need to be exported
	// (Capitalized).
	// For more complicated or specialized data, it's probably best to assign
	// custom functions to the Config.Snapshot and Config.Restore fields.
	// It's invalid to set this field while also setting Snapshot and/or
	// Restore. Default false
	UseJSONSnapshots bool

	// Tick fires at regular intervals as specified by TickDelay. This function
	// can be used to make updates to the database.
	Tick func(m Machine)

	// DataDirReady is an optional callback function that fires containing the
	// path to the directory where all the logs and snapshots are stored.
	DataDirReady func(dir string)

	// ServerReady is an optional callback function that fires when the server
	// socket is listening and is ready to accept incoming connections. The
	// network address, auth, and tls-config are provided to allow for
	// background connections to be made to self, if desired.
	ServerReady func(addr, auth string, tlscfg *tls.Config)

	// ConnOpened is an optional callback function that fires when a new
	// network connection was opened on this machine. You can accept or deny
	// the connection, and optionally provide a client-specific context that
	// sticks around until the connection is closed with ConnClosed.
	ConnOpened func(addr string) (context interface{}, accept bool)

	// ConnClosed is an optional callback function that fires when a network
	// connection has been closed on this machine.
	ConnClosed func(context interface{}, addr string)

	LocalTime   bool          // default false
	TickDelay   time.Duration // default 200ms
	BackupPath  string        // default ""
	InitialData interface{}   // default nil
	NodeID      string        // default "1"
	Addr        string        // default ":11001"
	DataDir     string        // default "data"
	LogOutput   io.Writer     // default os.Stderr
	LogLevel    string        // default "notice"
	JoinAddr    string        // default ""
	Backend     Backend       // default LevelDB
	NoSync      bool          // default false
	OpenReads   bool          // default false
	MaxPool     int           // default 8
	TLSCertPath string        // default ""
	TLSKeyPath  string        // default ""
	Auth        string        // default ""
	Advertise   string        // default ""
	TryErrors   bool          // default false (return TRY instead of MOVED)
	InitRunQuit bool          // default false
	// contains filtered or unexported fields
}

Config is the configuration for managing the behavior of the application. This must be filled out beforehand and then passed to the uhaha.Main() function.

func (*Config) AddCatchallCommand

func (conf *Config) AddCatchallCommand(
	fn func(m Machine, args []string) (interface{}, error),
)

AddCatchallCommand adds an intermediate command that will execute for any input that was not previously defined with AddIntermediateCommand, AddWriteCommand, or AddReadCommand.

func (*Config) AddIntermediateCommand

func (conf *Config) AddIntermediateCommand(name string,
	fn func(m Machine, args []string) (interface{}, error),
)

AddIntermediateCommand adds a command that is for performing client and system specific operations. It *is not* intended for working with the machine data, and doing so will risk data corruption.

func (*Config) AddReadCommand

func (conf *Config) AddReadCommand(name string,
	fn func(m Machine, args []string) (interface{}, error),
)

AddReadCommand adds a command for reading machine data.

func (*Config) AddService

func (conf *Config) AddService(sniff func(rd io.Reader) bool,
	acceptor func(s Service, ln net.Listener),
)

AddService adds a custom client network service, such as HTTP or gRPC. By default, a Redis compatible service is already included.

func (*Config) AddWriteCommand

func (conf *Config) AddWriteCommand(name string,
	fn func(m Machine, args []string) (interface{}, error),
)

AddWriteCommand adds a command for reading or altering machine data.

type FilterArgs

type FilterArgs []string

FilterArgs ...

type Hijack

type Hijack func(s Service, conn HijackedConn)

Hijack is a function type that can be used to "hijack" a service client connection, allowing I/O operations to be performed outside the standard network loop. An example of its usage can be found in the examples/kvdb project.

type HijackedConn

type HijackedConn interface {
	// RemoteAddr is the connection remote tcp address.
	RemoteAddr() string
	// ReadCommands is an iterator function that reads pipelined commands.
	// Returns an error when the connection encountered an error.
	ReadCommands(func(args []string) bool) error
	// ReadCommand reads one command at a time.
	ReadCommand() (args []string, err error)
	// WriteAny writes any type to the write buffer using the format rules that
	// are defined by the original Service.
	WriteAny(v interface{})
	// WriteRaw writes raw data to the write buffer.
	WriteRaw(data []byte)
	// Flush the write buffer and send data to the connection.
	Flush() error
	// Close the connection
	Close() error
}

HijackedConn is a connection that has been detached from the main service network loop. It's entirely up to the hijacker to perform all I/O operations. The Write* functions buffer write data, and Flush must be called to do the actual sending of the data to the connection. Close the connection when done.

type Machine

type Machine interface {
	// Data is the original user data interface that was assigned at startup.
	// It's safe to alter the data in this interface while inside a Write
	// command, but it's only safe to read from this interface for Read
	// commands.
	// Returns nil for Intermediate Commands.
	Data() interface{}
	// Now generates a stable timestamp that is synced with internet time
	// and for Write commands is always monotonically increasing. It's made to
	// be a trusted source of time for performing operations on the user data.
	// Always use this function instead of the builtin time.Now().
	// Not available for Intermediate Commands.
	// NOTE(review): this comment previously said "Returns nil", but a
	// time.Time cannot be nil — presumably the zero time.Time is returned;
	// confirm.
	Now() time.Time
	// Rand is a random number generator that must be used instead of the
	// standard Go packages `crypto/rand` and `math/rand`. For Write commands
	// the values returned from this generator are crypto seeded, guaranteed
	// to be reproduced in exact order when the server restarts, and identical
	// across all machines in the cluster. The underlying implementation is
	// PCG. Check out http://www.pcg-random.org/ for more information.
	// Returns nil for Intermediate Commands.
	Rand() Rand
	// Context returns the connection context that was defined by the
	// Config.ConnOpened callback. Only available for Intermediate commands.
	// Returns nil for Read and Write Commands.
	Context() interface{}
}

The Machine interface is passed to every command. It includes the user data and various utilities that should be used from Write, Read, and Intermediate commands.

It's important to note that the Data(), Now(), and Rand() functions can be used safely for Write and Read commands, but are not available for Intermediate commands. The Context() is ONLY available for Intermediate commands.

A call to Rand() and Now() from inside of a Read command will always return back the same last known value of its respective type. From a Write command, in contrast, you'll get freshly generated values. This is to ensure that every single command ALWAYS generates the same series of data on every server.

type Message

type Message struct {
	// Args are the original command arguments.
	Args []string
	// Resp is the command response, if not an error.
	Resp interface{}
	// Err is the command error, if not successful.
	Err error
	// Elapsed is the amount of time that the command took to process.
	Elapsed time.Duration
	// Addr is the remote TCP address of the connection that generated
	// this message.
	Addr string
}

A Message represents a command and is in a format that is consumed by an Observer.

type Monitor

type Monitor interface {
	// Send a message to observers
	Send(msg Message)
	// NewObserver returns a new Observer containing a channel that will send
	// the messages for every command processed by the service.
	// Stop the observer to release associated resources.
	NewObserver() Observer
}

Monitor represents an interface for sending and consuming command messages that are processed by a Service.

type Observer

type Observer interface {
	// Stop stops the observer; per the Monitor.NewObserver documentation,
	// stopping releases the resources associated with it.
	Stop()
	// C returns the receive-only channel on which Messages are delivered.
	C() <-chan Message
}

An Observer holds a channel that delivers the messages for all commands processed by a Service.

type Rand

type Rand interface {
	// Int, Uint64, Uint32 and Float64 each return a generated value of the
	// type named in their signature.
	// NOTE(review): value ranges (e.g. whether Float64 is in [0,1)) are not
	// stated here — confirm against the implementation.
	Int() int
	Uint64() uint64
	Uint32() uint32
	Float64() float64
	// Read has the same signature as io.Reader's Read.
	// NOTE(review): presumably fills the slice with random bytes — confirm.
	Read([]byte) (n int, err error)
}

Rand is a random number interface used by Machine

type RawMachineInfo

type RawMachineInfo struct {
	// NOTE(review): TS is presumably the machine's timestamp component
	// (units not stated here) — confirm.
	TS   int64
	// NOTE(review): Seed is presumably the seed of the machine's random
	// number generator — confirm.
	Seed int64
}

RawMachineInfo represents the raw components of the machine

type Receiver

type Receiver interface {
	// Recv returns a value, a duration and an error, mirroring the
	// (v, elapsed, err) arguments of the Response constructor.
	// NOTE(review): presumably blocks until the command result is
	// available — confirm.
	Recv() (interface{}, time.Duration, error)
}

Receiver ...

func Response

func Response(v interface{}, elapsed time.Duration, err error) Receiver

Response ...

type SendOptions

type SendOptions struct {
	// NOTE(review): Context is presumably the connection context exposed
	// through Machine.Context — confirm.
	Context        interface{}
	// NOTE(review): From presumably identifies the sender of the command —
	// confirm.
	From           interface{}
	// NOTE(review): AllowOpenReads and DenyOpenReads presumably override
	// Config.OpenReads for a single Send — confirm, including which one
	// wins when both are set.
	AllowOpenReads bool
	DenyOpenReads  bool
}

SendOptions ...

type Service

type Service interface {
	// Send a command with args from a client
	Send(args []string, opts *SendOptions) Receiver
	// Auth authorizes a client
	Auth(auth string) error
	// Monitor returns a service monitor for observing client commands.
	Monitor() Monitor
	// Opened takes a connection address and returns a context plus whether
	// the connection is accepted; it has the same signature as
	// Config.ConnOpened.
	// NOTE(review): presumably forwards to that callback — confirm.
	Opened(addr string) (context interface{}, accept bool)
	// Closed takes a context and a connection address; it has the same
	// signature as Config.ConnClosed.
	// NOTE(review): presumably forwards to that callback — confirm.
	Closed(context interface{}, addr string)
}

Service is a client facing service.

type Snapshot

type Snapshot interface {
	// Persist takes an io.Writer and returns an error.
	// NOTE(review): presumably writes the snapshot data to the writer —
	// confirm.
	Persist(io.Writer) error
	// Done takes a path.
	// NOTE(review): presumably called once the snapshot has been written,
	// with the path of the resulting file — confirm.
	Done(path string)
}

A Snapshot is an interface that allows for Raft snapshots to be taken.

type Store

type Store struct {
	// contains filtered or unexported fields
}

func OpenStore

func OpenStore(path string, logFlags, stableFlags mdbx.EnvFlags, mode os.FileMode) (*Store, error)

func (*Store) Close

func (s *Store) Close() error

func (*Store) DeleteRange

func (s *Store) DeleteRange(min, max uint64) error

DeleteRange deletes a range of log entries. The range is inclusive.

func (*Store) FirstIndex

func (s *Store) FirstIndex() (uint64, error)

FirstIndex returns the first index written. 0 for no entries.

func (*Store) Get

func (s *Store) Get(key []byte) (result []byte, err error)

Get returns the value for key, or an empty byte slice if key was not found.

func (*Store) GetLog

func (s *Store) GetLog(index uint64, log *raft.Log) error

GetLog gets a log entry at a given index.

func (*Store) GetUint64

func (s *Store) GetUint64(key []byte) (result uint64, err error)

GetUint64 returns the uint64 value for key, or 0 if key was not found.

func (*Store) LastIndex

func (s *Store) LastIndex() (uint64, error)

LastIndex returns the last index written. 0 for no entries.

func (*Store) Set

func (s *Store) Set(key []byte, val []byte) error

func (*Store) SetUint64

func (s *Store) SetUint64(key []byte, val uint64) error

func (*Store) StoreLog

func (s *Store) StoreLog(log *raft.Log) error

StoreLog stores a log entry.

func (*Store) StoreLogs

func (s *Store) StoreLogs(logs []*raft.Log) error

StoreLogs stores multiple log entries.

func (*Store) Sync

func (s *Store) Sync() error

func (*Store) SyncLog

func (s *Store) SyncLog() error

func (*Store) SyncStable

func (s *Store) SyncStable() error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL