Documentation ¶
Index ¶
- Constants
- Variables
- func AuthzStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, ...) error
- func AuthzUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, ...) (interface{}, error)
- func GetLogLevel(level string) (uint32, error)
- type ActivityStreamConfig
- type ClusteringConfig
- type Config
- type CursorsStreamConfig
- type EventTimestamps
- type GroupsConfig
- type HostPort
- type RaftLog
- type RaftLogListener
- type Server
- func (s *Server) AddRaftLogListener(listener RaftLogListener)
- func (s *Server) Apply(l *raft.Log) interface{}
- func (s *Server) GetListenPort() int
- func (s *Server) IsLeader() bool
- func (s *Server) IsRunning() bool
- func (s *Server) Restore(snapshot io.ReadCloser) error
- func (s *Server) Snapshot() (raft.FSMSnapshot, error)
- func (s *Server) Start() (err error)
- func (s *Server) Stop() error
- type StreamsConfig
Constants ¶
const (
	// DefaultNamespace is the default cluster namespace to use if one is not
	// specified.
	DefaultNamespace = "liftbridge-default"

	// DefaultPort is the port to bind to if one is not specified.
	DefaultPort = 9292
)
const Version = "v1.9.0"
Version of the Liftbridge server.
Variables ¶
var (
	// ErrStreamExists is returned by CreateStream when attempting to create a
	// stream that already exists.
	ErrStreamExists = errors.New("stream already exists")

	// ErrStreamNotFound is returned by DeleteStream/PauseStream when
	// attempting to delete/pause a stream that does not exist.
	ErrStreamNotFound = errors.New("stream does not exist")

	// ErrPartitionNotFound is returned by PauseStream when attempting to pause
	// a stream partition that does not exist.
	ErrPartitionNotFound = errors.New("partition does not exist")

	// ErrConsumerGroupExists is returned by createConsumerGroup when
	// attempting to create a group that already exists.
	ErrConsumerGroupExists = errors.New("consumer group already exists")

	// ErrConsumerGroupNotFound is returned by JoinConsumerGroup when
	// attempting to join a group that does not exist.
	ErrConsumerGroupNotFound = errors.New("consumer group does not exist")

	// ErrConsumerAlreadyMember is returned by JoinConsumerGroup when the
	// consumer is already a member of the group.
	ErrConsumerAlreadyMember = errors.New("consumer is already a member of the consumer group")

	// ErrConsumerNotMember is returned by LeaveConsumerGroup when the
	// consumer is not a member of the group.
	ErrConsumerNotMember = errors.New("consumer is not a member of the consumer group")

	// ErrBrokerNotCoordinator is returned by GetConsumerGroupAssignments when
	// this server is not the coordinator for the requested consumer group.
	ErrBrokerNotCoordinator = errors.New("broker is not the consumer group coordinator")

	// ErrGroupEpoch is returned by GetConsumerGroupAssignments when the
	// client-provided group epoch differs from the server-side group epoch.
	ErrGroupEpoch = errors.New("client-provided group epoch differs from broker group epoch")
)
Functions ¶
func AuthzStreamInterceptor ¶ added in v1.9.0
func AuthzStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error
AuthzStreamInterceptor gets the user from a TLS-authenticated stream request and adds the user to ctx.
func AuthzUnaryInterceptor ¶ added in v1.9.0
func AuthzUnaryInterceptor(
	ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (interface{}, error)
AuthzUnaryInterceptor gets the user from a TLS-authenticated request and adds the user to ctx.
func GetLogLevel ¶
func GetLogLevel(level string) (uint32, error)
GetLogLevel converts the level string to its corresponding int value. It returns an error if the level is invalid.
Types ¶
type ActivityStreamConfig ¶
type ActivityStreamConfig struct {
	Enabled          bool
	PublishTimeout   time.Duration
	PublishAckPolicy client.AckPolicy
}
ActivityStreamConfig contains settings for controlling activity stream behavior.
type ClusteringConfig ¶
type ClusteringConfig struct {
	ServerID                string
	Namespace               string
	RaftSnapshots           int
	RaftSnapshotThreshold   uint64
	RaftCacheSize           int
	RaftBootstrapSeed       bool
	RaftBootstrapPeers      []string
	RaftMaxQuorumSize       uint
	ReplicaMaxLagTime       time.Duration
	ReplicaMaxLeaderTimeout time.Duration
	ReplicaFetchTimeout     time.Duration
	ReplicaMaxIdleWait      time.Duration
	MinISR                  int
	ReplicationMaxBytes     int64
}
ClusteringConfig contains settings for controlling cluster behavior.
type Config ¶
type Config struct {
	Listen               HostPort
	Host                 string
	Port                 int
	LogLevel             uint32
	LogRecovery          bool
	LogRaft              bool
	LogNATS              bool
	LogSilent            bool
	DataDir              string
	BatchMaxMessages     int
	BatchMaxTime         time.Duration
	MetadataCacheMaxAge  time.Duration
	TLSKey               string
	TLSCert              string
	TLSClientAuth        bool
	TLSClientAuthCA      string
	TLSClientAuthz       bool
	TLSClientAuthzModel  string
	TLSClientAuthzPolicy string
	NATS                 nats.Options
	EmbeddedNATS         bool
	EmbeddedNATSConfig   string
	Streams              StreamsConfig
	Clustering           ClusteringConfig
	ActivityStream       ActivityStreamConfig
	CursorsStream        CursorsStreamConfig
	Groups               GroupsConfig
}
Config contains all settings for a Liftbridge Server.
func NewConfig ¶
NewConfig creates a new Config with default settings and applies any settings from the given configuration file.
func NewDefaultConfig ¶
func NewDefaultConfig() *Config
NewDefaultConfig creates a new Config with default settings.
func (Config) GetConnectionAddress ¶
GetConnectionAddress returns the host if specified and listen otherwise.
func (Config) GetListenAddress ¶
GetListenAddress returns the address and port to listen to.
func (Config) NATSServersString ¶ added in v1.5.0
NATSServersString returns a human-readable string representation of the list of NATS servers.
type CursorsStreamConfig ¶ added in v1.3.0
type CursorsStreamConfig struct {
	Partitions        int32
	ReplicationFactor int32
	AutoPauseTime     time.Duration
}
CursorsStreamConfig contains settings for controlling cursors stream behavior.
type EventTimestamps ¶ added in v1.4.0
type EventTimestamps struct {
// contains filtered or unexported fields
}
EventTimestamps contains the first and latest times when an event has occurred.
type GroupsConfig ¶ added in v1.8.0
GroupsConfig contains settings for controlling consumer group behavior.
type RaftLogListener ¶ added in v1.6.0
type RaftLogListener interface {
Receive(*RaftLog)
}
RaftLogListener is a listener for Raft logs.
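Any type with a Receive method of the right shape satisfies this interface. The following sketch shows a listener that records the index of each log it receives. The raftLog and raftLogListener types are local stand-ins so the example runs on its own, and the Index field is an assumption about what a log entry exposes. A real listener would implement RaftLogListener and be registered with AddRaftLogListener.

```go
package main

import (
	"fmt"
	"sync"
)

// raftLog is a hypothetical stand-in for the package's RaftLog type; only
// an index field is assumed here.
type raftLog struct{ Index uint64 }

// raftLogListener mirrors the single-method shape of RaftLogListener.
type raftLogListener interface {
	Receive(*raftLog)
}

// indexRecorder is a listener that remembers the index of every log it
// sees. The mutex guards against Receive being called from another
// goroutine than the one reading the results.
type indexRecorder struct {
	mu      sync.Mutex
	indexes []uint64
}

// Receive appends the log's index to the recorded list.
func (r *indexRecorder) Receive(l *raftLog) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.indexes = append(r.indexes, l.Index)
}

// Seen returns a copy of the recorded indexes.
func (r *indexRecorder) Seen() []uint64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	return append([]uint64(nil), r.indexes...)
}

func main() {
	rec := &indexRecorder{}
	var listener raftLogListener = rec
	for i := uint64(1); i <= 3; i++ {
		listener.Receive(&raftLog{Index: i})
	}
	fmt.Println(rec.Seen()) // [1 2 3]
}
```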
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the main Liftbridge object. Create it by calling New or RunServerWithConfig.
func RunServerWithConfig ¶
RunServerWithConfig creates and starts a new Server with the given configuration. It returns an error if the Server failed to start.
func (*Server) AddRaftLogListener ¶ added in v1.6.0
func (s *Server) AddRaftLogListener(listener RaftLogListener)
AddRaftLogListener adds a Raft log listener.
func (*Server) Apply ¶
func (s *Server) Apply(l *raft.Log) interface{}
Apply applies a Raft log entry to the controller FSM. This is invoked by Raft once a log entry is committed. It returns a value which will be made available on the ApplyFuture returned by Raft.Apply if that method was called on the same Raft node as the FSM.
Note that, on restart, this can be called for entries that have already been committed to Raft as part of the recovery process. As such, this should be an idempotent call.
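One common way to make an apply step idempotent is to remember the highest log index already applied and skip anything at or below it. The sketch below shows that idea on a toy state machine. It is not a description of how this Server does it: entry, fsm and apply are hypothetical names, and the guard only survives a restart if lastApplied is persisted and restored together with the state.

```go
package main

import "fmt"

// entry is a hypothetical stand-in for a committed Raft log entry.
type entry struct {
	Index uint64 // position in the Raft log
	Delta int    // amount to add to the running total
}

// fsm is a toy state machine whose only state is a running total.
type fsm struct {
	lastApplied uint64 // highest log index already applied
	total       int
}

// apply adds e.Delta to the total unless the entry was applied before.
// It reports whether the state changed.
func (f *fsm) apply(e entry) bool {
	if e.Index <= f.lastApplied {
		return false // replayed entry: applying it again would double-count
	}
	f.total += e.Delta
	f.lastApplied = e.Index
	return true
}

func main() {
	f := &fsm{}
	entries := []entry{{Index: 1, Delta: 5}, {Index: 2, Delta: 3}}
	for _, e := range entries {
		f.apply(e)
	}
	// Simulate recovery replaying the same committed entries.
	for _, e := range entries {
		f.apply(e)
	}
	fmt.Println(f.total) // 8
}
```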
func (*Server) GetListenPort ¶ added in v1.3.0
func (s *Server) GetListenPort() int
GetListenPort returns the port the server is listening to. Returns 0 if the server is not listening.
func (*Server) IsLeader ¶
func (s *Server) IsLeader() bool
IsLeader indicates if the server is currently the metadata leader or not. If consistency is required for an operation, it should be threaded through the Raft cluster since that is the single source of truth. If a server thinks it's leader when it's not, the operation it proposes to the Raft cluster will fail.
func (*Server) IsRunning ¶
func (s *Server) IsRunning() bool
IsRunning indicates if the server is currently running or has been stopped.
func (*Server) Restore ¶
func (s *Server) Restore(snapshot io.ReadCloser) error
Restore is used to restore an FSM from a snapshot. It is not called concurrently with any other command. The FSM must discard all previous state.
func (*Server) Snapshot ¶
func (s *Server) Snapshot() (raft.FSMSnapshot, error)
Snapshot is used to support log compaction. This call should return an FSMSnapshot which can be used to save a point-in-time snapshot of the FSM. Apply and Snapshot are not called in multiple threads, but Apply will be called concurrently with Persist. This means the FSM should be implemented in a fashion that allows for concurrent updates while a snapshot is happening.
type StreamsConfig ¶
type StreamsConfig struct {
	RetentionMaxBytes             int64
	RetentionMaxMessages          int64
	RetentionMaxAge               time.Duration
	CleanerInterval               time.Duration
	SegmentMaxBytes               int64
	SegmentMaxAge                 time.Duration
	Compact                       bool
	CompactMaxGoroutines          int
	AutoPauseTime                 time.Duration
	AutoPauseDisableIfSubscribers bool
	MinISR                        int
	ConcurrencyControl            bool
	Encryption                    bool
}
StreamsConfig contains settings for controlling the message log for streams.
func (*StreamsConfig) ApplyOverrides ¶ added in v1.2.0
func (l *StreamsConfig) ApplyOverrides(c *proto.StreamConfig)
ApplyOverrides applies the values from the StreamConfig protobuf to the StreamsConfig struct. If the value is present in the request's config section, it will be set in StreamsConfig.
func (StreamsConfig) AutoPauseString ¶ added in v1.3.0
func (l StreamsConfig) AutoPauseString() string
AutoPauseString returns a human-readable string representation of the auto pause setting.
func (StreamsConfig) RetentionString ¶
func (l StreamsConfig) RetentionString() string
RetentionString returns a human-readable string representation of the retention policy.