Documentation ¶
Constants ¶
const (
	// DefaultNamespace is the default cluster namespace to use if one is not
	// specified.
	DefaultNamespace = "liftbridge-default"

	// DefaultPort is the port to bind to if one is not specified.
	DefaultPort = 9292
)
Variables ¶
var ErrStreamExists = errors.New("stream already exists")
ErrStreamExists is returned by CreateStream when a stream with the provided subject and name already exists.
Functions ¶
func GetLogLevel ¶
GetLogLevel converts the level string to its corresponding int value. It returns an error if the level is invalid.
Types ¶
type ClusteringConfig ¶
type ClusteringConfig struct {
	ServerID                string
	Namespace               string
	RaftSnapshots           int
	RaftSnapshotThreshold   uint64
	RaftCacheSize           int
	RaftBootstrapSeed       bool
	RaftBootstrapPeers      []string
	RaftLogging             bool
	ReplicaMaxLagTime       time.Duration
	ReplicaMaxLeaderTimeout time.Duration
	ReplicaFetchTimeout     time.Duration
	MinISR                  int
}
ClusteringConfig contains settings for controlling cluster behavior.
type CommitLog ¶
type CommitLog interface {
	// Delete closes the log and removes all data associated with it from the
	// filesystem.
	Delete() error

	// NewReader creates a new Reader starting at the given offset. If
	// uncommitted is true, the Reader will read uncommitted messages from the
	// log. Otherwise, it will only return committed messages.
	NewReader(offset int64, uncommitted bool) (*commitlog.Reader, error)

	// Truncate removes all messages from the log starting at the given offset.
	Truncate(offset int64) error

	// NewestOffset returns the offset of the last message in the log or -1 if
	// empty.
	NewestOffset() int64

	// OldestOffset returns the offset of the first message in the log or -1 if
	// empty.
	OldestOffset() int64

	// OffsetForTimestamp returns the earliest offset whose timestamp is
	// greater than or equal to the given timestamp.
	OffsetForTimestamp(timestamp int64) (int64, error)

	// SetHighWatermark sets the high watermark on the log. All messages up to
	// and including the high watermark are considered committed.
	SetHighWatermark(hw int64)

	// HighWatermark returns the high watermark for the log.
	HighWatermark() int64

	// Append writes the given batch of messages to the log and returns their
	// corresponding offsets in the log.
	Append(msg []*proto.Message) ([]int64, error)

	// AppendMessageSet writes the given message set data to the log and
	// returns the corresponding offsets in the log.
	AppendMessageSet(ms []byte) ([]int64, error)

	// Clean applies retention and compaction rules against the log, if
	// applicable.
	Clean() error

	// Close closes each log segment file and stops the background goroutine
	// checkpointing the high watermark to disk.
	Close() error
}
CommitLog is the durable write-ahead log interface used to back each stream.
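The offset and high watermark semantics described in the interface comments can be illustrated with a toy in-memory log. This is only a sketch under stated assumptions: `memLog` and `entry` are hypothetical types, nothing is written to disk, timestamps are assumed to be non-decreasing, and returning an error when no message matches a timestamp is a choice made for this example rather than documented behavior.

```go
package main

import (
	"fmt"
	"sort"
)

// entry is a hypothetical in-memory message: a timestamp and a payload.
type entry struct {
	timestamp int64
	value     string
}

// memLog is a toy, non-durable log. The entry at offset i lives at entries[i].
type memLog struct {
	entries []entry
	hw      int64 // high watermark; -1 means nothing is committed yet
}

func newMemLog() *memLog { return &memLog{hw: -1} }

// Append adds the batch and returns the offset assigned to each entry.
func (l *memLog) Append(batch ...entry) []int64 {
	offsets := make([]int64, 0, len(batch))
	for _, e := range batch {
		l.entries = append(l.entries, e)
		offsets = append(offsets, int64(len(l.entries)-1))
	}
	return offsets
}

// NewestOffset returns the last offset, or -1 if the log is empty.
func (l *memLog) NewestOffset() int64 { return int64(len(l.entries)) - 1 }

// OldestOffset returns the first offset, or -1 if the log is empty.
func (l *memLog) OldestOffset() int64 {
	if len(l.entries) == 0 {
		return -1
	}
	return 0
}

func (l *memLog) SetHighWatermark(hw int64) { l.hw = hw }
func (l *memLog) HighWatermark() int64      { return l.hw }

// Committed returns the entries up to and including the high watermark.
func (l *memLog) Committed() []entry {
	end := l.hw + 1
	if end > int64(len(l.entries)) {
		end = int64(len(l.entries))
	}
	if end < 0 {
		end = 0
	}
	return l.entries[:end]
}

// OffsetForTimestamp returns the earliest offset whose timestamp is greater
// than or equal to ts, using a binary search over the sorted timestamps.
func (l *memLog) OffsetForTimestamp(ts int64) (int64, error) {
	i := sort.Search(len(l.entries), func(i int) bool {
		return l.entries[i].timestamp >= ts
	})
	if i == len(l.entries) {
		return 0, fmt.Errorf("no message at or after timestamp %d", ts)
	}
	return int64(i), nil
}

func main() {
	l := newMemLog()
	fmt.Println("empty log:", l.OldestOffset(), l.NewestOffset())

	l.Append(entry{100, "a"}, entry{200, "b"}, entry{300, "c"})
	l.SetHighWatermark(1) // offsets 0 and 1 are now committed

	fmt.Println("committed messages:", len(l.Committed()))
	off, err := l.OffsetForTimestamp(150)
	fmt.Println("offset for timestamp 150:", off, err)
}
```

The gap between `HighWatermark` and `NewestOffset` is what the `uncommitted` flag on `NewReader` exposes: a reader of committed messages stops at the high watermark, while an uncommitted reader can continue to the newest offset.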
type Config ¶
type Config struct {
	Host                string
	Port                int
	LogLevel            uint32
	NoLog               bool
	DataDir             string
	BatchMaxMessages    int
	BatchWaitTime       time.Duration
	MetadataCacheMaxAge time.Duration
	TLSKey              string
	TLSCert             string
	NATS                nats.Options
	Log                 LogConfig
	Clustering          ClusteringConfig
}
Config contains all settings for a Liftbridge Server.
func NewConfig ¶
NewConfig creates a new Config with default settings and applies any settings from the given configuration file.
func NewDefaultConfig ¶
func NewDefaultConfig() *Config
NewDefaultConfig creates a new Config with default settings.
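The "defaults first, then overrides" pattern that NewConfig and NewDefaultConfig describe can be sketched as follows. This is an illustration only: `miniConfig`, `newDefaultMiniConfig`, and `newMiniConfig` are hypothetical, the override keys are invented for the example, and a plain map stands in for the parsed configuration file. Only the two default values are taken from the Constants section.

```go
package main

import (
	"fmt"
	"strconv"
)

// Defaults copied from the Constants section above.
const (
	DefaultNamespace = "liftbridge-default"
	DefaultPort      = 9292
)

// miniConfig is a hypothetical, trimmed-down stand-in for Config.
type miniConfig struct {
	Host      string
	Port      int
	Namespace string
}

// newDefaultMiniConfig returns a config populated with defaults only.
func newDefaultMiniConfig() *miniConfig {
	return &miniConfig{Port: DefaultPort, Namespace: DefaultNamespace}
}

// newMiniConfig starts from the defaults and applies overrides on top. The
// map is a stand-in for settings parsed from a configuration file; the keys
// are assumptions made for this sketch.
func newMiniConfig(overrides map[string]string) (*miniConfig, error) {
	cfg := newDefaultMiniConfig()
	for key, value := range overrides {
		switch key {
		case "host":
			cfg.Host = value
		case "port":
			port, err := strconv.Atoi(value)
			if err != nil {
				return nil, fmt.Errorf("invalid port %q: %w", value, err)
			}
			cfg.Port = port
		case "namespace":
			cfg.Namespace = value
		default:
			return nil, fmt.Errorf("unknown setting %q", key)
		}
	}
	return cfg, nil
}

func main() {
	cfg, err := newMiniConfig(map[string]string{"port": "9393"})
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	// Port is overridden; Namespace keeps its default.
	fmt.Printf("%+v\n", *cfg)
}
```

Starting from a fully populated default value means a configuration file only needs to list the settings it changes.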
type LogConfig ¶
type LogConfig struct {
	RetentionMaxBytes    int64
	RetentionMaxMessages int64
	RetentionMaxAge      time.Duration
	CleanerInterval      time.Duration
	SegmentMaxBytes      int64
	LogRollTime          time.Duration
	Compact              bool
	CompactMaxGoroutines int
}
LogConfig contains settings for controlling the message log for a stream.
func (LogConfig) RetentionString ¶
RetentionString returns a human-readable string representation of the retention policy.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the main Liftbridge object. Create it by calling New or RunServerWithConfig.
func RunServerWithConfig ¶
RunServerWithConfig creates and starts a new Server with the given configuration. It returns an error if the Server failed to start.
func (*Server) Apply ¶
Apply applies a Raft log entry to the controller FSM. This is invoked by Raft once a log entry is committed. It returns a value which will be made available on the ApplyFuture returned by Raft.Apply if that method was called on the same Raft node as the FSM.
Note that, on restart, this can be called for entries that have already been committed to Raft as part of the recovery process. As such, this should be an idempotent call.
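One way to satisfy the idempotency requirement is to make each operation safe to repeat, so that a replayed entry leaves the state unchanged. The sketch below assumes a simplified signature and hypothetical types (`createStreamOp`, `toyFSM`); it does not use the Raft library and is not the server's implementation.

```go
package main

import "fmt"

// createStreamOp is a hypothetical decoded log entry asking for a stream.
type createStreamOp struct {
	Subject string
	Name    string
}

// toyFSM is a toy state machine. creations counts how many times a stream was
// actually created, so repeated applies can be observed.
type toyFSM struct {
	streams   map[string]createStreamOp
	creations int
}

func newToyFSM() *toyFSM {
	return &toyFSM{streams: make(map[string]createStreamOp)}
}

// Apply creates the stream if it is missing. Applying the same entry again,
// as can happen during recovery, is a no-op rather than an error.
func (f *toyFSM) Apply(op createStreamOp) error {
	key := op.Subject + "/" + op.Name
	if _, ok := f.streams[key]; ok {
		return nil // replayed entry: already applied
	}
	f.streams[key] = op
	f.creations++
	return nil
}

func main() {
	f := newToyFSM()
	op := createStreamOp{Subject: "foo", Name: "foo-stream"}

	// The second call simulates the entry being replayed after a restart.
	for i := 0; i < 2; i++ {
		if err := f.Apply(op); err != nil {
			fmt.Println("apply failed:", err)
		}
	}
	fmt.Println("streams:", len(f.streams), "creations:", f.creations)
}
```

The key point is that the replayed entry is neither rejected nor applied twice; the state after two applies is identical to the state after one.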
func (*Server) Restore ¶
func (s *Server) Restore(snapshot io.ReadCloser) error
Restore is used to restore an FSM from a snapshot. It is not called concurrently with any other command. The FSM must discard all previous state.
func (*Server) Snapshot ¶
func (s *Server) Snapshot() (raft.FSMSnapshot, error)
Snapshot is used to support log compaction. It should return an FSMSnapshot that can be used to save a point-in-time snapshot of the FSM. Apply and Snapshot are never called concurrently with each other, but Apply will be called concurrently with the Persist method of the returned FSMSnapshot. The FSM must therefore be implemented so that updates can proceed while a snapshot is being persisted.
Directories ¶
Path | Synopsis |
---|---|
 | Package commitlog provides an implementation for a file-backed write-ahead log. |
 | Package conf supports a configuration file format used by gnatsd. |
 | Package proto is a generated protocol buffer package. |