Documentation ¶
Overview ¶
Package goneli implements the (Non-)Exclusive Leader Induction (NELI) protocol, published at https://github.com/obsidiandynamics/NELI.
This implementation is for the 'fast' variation of the protocol, running in exclusive mode over a single NELI group.
This implementation is thread-safe.
Example ¶
	// Create a new Neli curator.
	neli, err := New(Config{
		KafkaConfig: KafkaConfigMap{
			"bootstrap.servers": "localhost:9092",
		},
		LeaderGroupID: "my-app-name.group",
		LeaderTopic:   "my-app-name.topic",
	})
	if err != nil {
		panic(err)
	}

	// Starts a pulser Goroutine in the background, which will automatically terminate when Neli is closed.
	p, _ := neli.Background(func() {
		// An activity performed by the client application if it is the elected leader. This task should
		// perform a small amount of work that is exclusively attributable to a leader, and return immediately.
		// For as long as the associated Neli instance is the leader, this task will be invoked repeatedly;
		// therefore, it should break down any long-running work into bite-sized chunks that can be safely
		// performed without causing excessive blocking.
		log.Printf("Do important leader stuff")
		time.Sleep(100 * time.Millisecond)
	})

	// Blocks until Neli is closed or an unrecoverable error occurs.
	panic(p.Await())
Example (LowLevel) ¶
	// Bootstrap a custom logger.
	log := logrus.StandardLogger()
	log.SetLevel(logrus.TraceLevel)

	// Configure Neli.
	config := Config{
		KafkaConfig: KafkaConfigMap{
			"bootstrap.servers": "localhost:9092",
		},
		LeaderGroupID: "my-app-name.group",
		LeaderTopic:   "my-app-name.topic",
		Scribe:        scribe.New(scribelogrus.Bind()),
	}

	// Handler of leader status updates. Used to initialise state upon leader acquisition, and to
	// wrap up in-flight work upon loss of leader status.
	barrier := func(e Event) {
		switch e.(type) {
		case LeaderAcquired:
			// The application may initialise any state necessary to perform work as a leader.
			log.Infof("Received event: leader elected")
		case LeaderRevoked:
			// The application may block the Barrier callback until it wraps up any in-flight
			// activity. A new leader will be elected only once the callback returns.
			log.Infof("Received event: leader revoked")
		case LeaderFenced:
			// The application must immediately terminate any ongoing activity, on the assumption
			// that another leader may be imminently elected. Unlike the handling of LeaderRevoked,
			// blocking in the Barrier callback will not prevent a new leader from being elected.
			log.Infof("Received event: leader fenced")
		}
	}

	// Create a new Neli curator, supplying the barrier as an optional argument.
	neli, err := New(config, barrier)
	if err != nil {
		panic(err)
	}

	// Pulsing is done in a separate Goroutine. (We don't have to, but it's often practical to do so.)
	go func() {
		defer neli.Close()

		for {
			// Pulse our presence, allowing for some time to acquire leader status.
			// Will return instantly if already leader.
			isLeader, err := neli.Pulse(10 * time.Millisecond)
			if err != nil {
				// Only fatal errors are returned from Pulse().
				panic(err)
			}

			if isLeader {
				// We hold leader status... can safely do some work.
				// Avoid blocking for too long, otherwise we may miss a poll and lose leader status.
				log.Infof("Do important leader stuff")
				time.Sleep(100 * time.Millisecond)
			}
		}
	}()

	// Blocks until Neli is closed.
	neli.Await()
Example (SecureBroker) ¶
	// Connects to a secure broker over TLS, using SASL authentication.
	neli, err := New(Config{
		KafkaConfig: KafkaConfigMap{
			"bootstrap.servers": "localhost:9092",
			"security.protocol": "sasl_ssl",
			"ssl.ca.location":   "ca-cert.pem",
			"sasl.mechanism":    "SCRAM-SHA-256",
			"sasl.username":     "user",
			"sasl.password":     "secret",
		},
		LeaderGroupID: "my-app-name.group",
		LeaderTopic:   "my-app-name.topic",
	})
	if err != nil {
		panic(err)
	}

	p, _ := neli.Background(func() {
		log.Printf("Do important leader stuff")
		time.Sleep(100 * time.Millisecond)
	})

	panic(p.Await())
Index ¶
- Constants
- Variables
- func Duration(d time.Duration) *time.Duration
- func Sanitise(name string) string
- type Barrier
- type Config
- type Event
- type KafkaConfigMap
- type KafkaConsumer
- type KafkaConsumerProvider
- type KafkaProducer
- type KafkaProducerProvider
- type LeaderAcquired
- type LeaderFenced
- type LeaderRevoked
- type LeaderTask
- type MockConfig
- type MockNeli
- type Neli
- type Pulser
- type State
Constants ¶
	const (
		// DefaultPollDuration is the default value of Config.PollDuration
		DefaultPollDuration = 1 * time.Millisecond

		// DefaultMinPollInterval is the default value of Config.MinPollInterval
		DefaultMinPollInterval = 100 * time.Millisecond

		// DefaultHeartbeatTimeout is the default value of Config.HeartbeatTimeout
		DefaultHeartbeatTimeout = 5 * time.Second
	)
Variables ¶
var ErrNonLivePulse = fmt.Errorf("cannot pulse in non-live state")
ErrNonLivePulse is returned by Pulse() if the Neli instance has been closed.
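A pulse loop would typically treat this error as a signal to stop, not as a failure. The following is a minimal sketch of that idea. It does not import goneli: `errNonLivePulse` is a local stand-in for ErrNonLivePulse, and `fakePulse` and `runLoop` are hypothetical helpers invented for illustration. Whether Pulse() wraps the sentinel is not stated here, so the sketch uses `errors.Is`, which handles both the wrapped and unwrapped cases.

```go
package main

import (
	"errors"
	"fmt"
)

// errNonLivePulse is a local stand-in for goneli.ErrNonLivePulse, so that the
// sketch compiles without the goneli package.
var errNonLivePulse = fmt.Errorf("cannot pulse in non-live state")

// fakePulse is a hypothetical substitute for Neli.Pulse: it reports leadership
// for the first `live` calls and then behaves like a closed instance.
func fakePulse(calls *int, live int) (bool, error) {
	*calls++
	if *calls > live {
		return false, errNonLivePulse
	}
	return true, nil
}

// runLoop pulses until the instance is no longer live and returns the number
// of pulses for which leadership was held. Any other error goes to the caller.
func runLoop(live int) (int, error) {
	calls, led := 0, 0
	for {
		isLeader, err := fakePulse(&calls, live)
		if errors.Is(err, errNonLivePulse) {
			return led, nil // instance closed: stop quietly
		}
		if err != nil {
			return led, err
		}
		if isLeader {
			led++
		}
	}
}

func main() {
	led, err := runLoop(3)
	fmt.Println(led, err)
}
```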
Functions ¶
Types ¶
type Barrier ¶
type Barrier func(e Event)
Barrier is a callback function for handling Neli events during group rebalancing.
type Config ¶
	type Config struct {
		KafkaConfig           KafkaConfigMap
		LeaderTopic           string
		LeaderGroupID         string
		KafkaConsumerProvider KafkaConsumerProvider
		KafkaProducerProvider KafkaProducerProvider
		Scribe                scribe.Scribe
		Name                  string
		PollDuration          *time.Duration
		MinPollInterval       *time.Duration
		HeartbeatTimeout      *time.Duration
	}
Config encapsulates configuration for Neli.
func (*Config) SetDefaults ¶
func (c *Config) SetDefaults()
SetDefaults assigns the default values to optional fields.
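The duration fields of Config are pointers, which lets a nil field stand for "not set". The sketch below shows one way such defaulting could work; it is an assumption about the pattern, not the package's actual implementation. The `settings` type, `durationPtr`, `setDefaults` and `effectiveMillis` are hypothetical names; `durationPtr` only mirrors the signature of the Duration function listed in the index, and the default values are copied from the documented constants.

```go
package main

import (
	"fmt"
	"time"
)

// Values copied from the constants documented for this package.
const (
	defaultPollDuration     = 1 * time.Millisecond
	defaultMinPollInterval  = 100 * time.Millisecond
	defaultHeartbeatTimeout = 5 * time.Second
)

// settings is a hypothetical stand-in for the optional part of Config.
type settings struct {
	PollDuration     *time.Duration
	MinPollInterval  *time.Duration
	HeartbeatTimeout *time.Duration
}

// durationPtr has the same shape as the Duration helper listed in the index:
// it returns a pointer to a copy of d.
func durationPtr(d time.Duration) *time.Duration { return &d }

// setDefaults fills in only the fields that were left nil.
func (s *settings) setDefaults() {
	if s.PollDuration == nil {
		s.PollDuration = durationPtr(defaultPollDuration)
	}
	if s.MinPollInterval == nil {
		s.MinPollInterval = durationPtr(defaultMinPollInterval)
	}
	if s.HeartbeatTimeout == nil {
		s.HeartbeatTimeout = durationPtr(defaultHeartbeatTimeout)
	}
}

// effectiveMillis reports the three values in milliseconds.
// It must be called after setDefaults, since it dereferences every field.
func effectiveMillis(s settings) [3]int64 {
	return [3]int64{
		s.PollDuration.Milliseconds(),
		s.MinPollInterval.Milliseconds(),
		s.HeartbeatTimeout.Milliseconds(),
	}
}

func main() {
	// Only the heartbeat timeout is overridden; the rest fall back to defaults.
	s := settings{HeartbeatTimeout: durationPtr(2 * time.Second)}
	s.setDefaults()
	fmt.Println(effectiveMillis(s))
}
```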
type KafkaConfigMap ¶
type KafkaConfigMap map[string]interface{}
KafkaConfigMap represents the Kafka key-value configuration.
type KafkaConsumer ¶
	type KafkaConsumer interface {
		Subscribe(topic string, rebalanceCb kafka.RebalanceCb) error
		ReadMessage(timeout time.Duration) (*kafka.Message, error)
		Close() error
	}
KafkaConsumer specifies the methods of a minimal consumer.
type KafkaConsumerProvider ¶
type KafkaConsumerProvider func(conf *KafkaConfigMap) (KafkaConsumer, error)
KafkaConsumerProvider is a factory for creating KafkaConsumer instances.
func StandardKafkaConsumerProvider ¶
func StandardKafkaConsumerProvider() KafkaConsumerProvider
StandardKafkaConsumerProvider returns a factory for creating a conventional KafkaConsumer, backed by the real client API.
type KafkaProducer ¶
	type KafkaProducer interface {
		Events() chan kafka.Event
		Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error
		Close()
	}
KafkaProducer specifies the methods of a minimal producer.
type KafkaProducerProvider ¶
type KafkaProducerProvider func(conf *KafkaConfigMap) (KafkaProducer, error)
KafkaProducerProvider is a factory for creating KafkaProducer instances.
func StandardKafkaProducerProvider ¶
func StandardKafkaProducerProvider() KafkaProducerProvider
StandardKafkaProducerProvider returns a factory for creating a conventional KafkaProducer, backed by the real client API.
type LeaderAcquired ¶
type LeaderAcquired struct{}
LeaderAcquired is emitted upon successful acquisition of leader status, either through explicit partition assignment, or when a heartbeat is eventually received following a previously fenced state.
An application responding to a LeaderAcquired may initialise any state necessary to perform work as a leader.
func (LeaderAcquired) String ¶
func (e LeaderAcquired) String() string
String obtains a textual representation of the LeaderAcquired event.
type LeaderFenced ¶
type LeaderFenced struct{}
LeaderFenced is emitted when a suspected network partition occurs, and the leader voluntarily fences itself.
An application reacting to a LeaderFenced event must immediately terminate any ongoing activity, on the assumption that another leader may be imminently elected. Unlike the handling of LeaderRevoked, blocking in the Barrier callback will not prevent a new leader from being elected.
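One way to meet the "terminate immediately" requirement is to run leader work under a cancellable context and cancel it from the barrier. The sketch below illustrates that approach under stated assumptions: the lowercase event types are local stand-ins for the goneli events, and `leaderScope` is a hypothetical helper, not part of the package.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Local stand-ins for the goneli event types, so the sketch is self-contained.
type event interface{}
type leaderAcquired struct{}
type leaderRevoked struct{}
type leaderFenced struct{}

// leaderScope owns a context that is live only while leadership is held.
type leaderScope struct {
	mu     sync.Mutex
	ctx    context.Context
	cancel context.CancelFunc
}

// barrier has the same shape as a Barrier callback. It never blocks: on a
// fence it cancels the context and returns straight away.
func (s *leaderScope) barrier(e event) {
	s.mu.Lock()
	defer s.mu.Unlock()
	switch e.(type) {
	case leaderAcquired:
		if s.cancel != nil {
			s.cancel() // release a context left over from an earlier term
		}
		s.ctx, s.cancel = context.WithCancel(context.Background())
	case leaderFenced, leaderRevoked:
		if s.cancel != nil {
			s.cancel() // workers watching the context's Done channel stop here
		}
	}
}

// active reports whether leader work may proceed right now.
func (s *leaderScope) active() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.ctx != nil && s.ctx.Err() == nil
}

func main() {
	s := &leaderScope{}
	s.barrier(leaderAcquired{})
	fmt.Println("after acquire:", s.active())
	s.barrier(leaderFenced{})
	fmt.Println("after fence:", s.active())
}
```

Leader work would check the context between steps, or pass it to any blocking calls, so that a fence takes effect without waiting for the current step to finish on its own.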
func (LeaderFenced) String ¶
func (e LeaderFenced) String() string
String obtains a textual representation of the LeaderFenced event.
type LeaderRevoked ¶
type LeaderRevoked struct{}
LeaderRevoked is emitted when the leader status has been revoked.
An application reacting to a LeaderRevoked event may block the Barrier callback until it wraps up any in-flight activity. A new leader will be elected only once the callback returns.
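Blocking the callback until in-flight work has drained could be done with a `sync.WaitGroup`, as in the sketch below. This is an illustration only: `leaderRevoked` is a local stand-in for the goneli event, and `inFlight` and `drainDemo` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Local stand-ins for the goneli event types.
type event interface{}
type leaderRevoked struct{}

// inFlight tracks units of leader work that have started but not yet finished.
type inFlight struct {
	wg sync.WaitGroup
}

// start runs work on its own goroutine and registers it as in flight.
// The sketch assumes start is not called once revocation handling has begun.
func (f *inFlight) start(work func()) {
	f.wg.Add(1)
	go func() {
		defer f.wg.Done()
		work()
	}()
}

// barrier has the same shape as a Barrier callback. On revocation it blocks
// until every registered unit of work has finished.
func (f *inFlight) barrier(e event) {
	if _, ok := e.(leaderRevoked); ok {
		f.wg.Wait()
	}
}

// drainDemo starts three units of work, simulates a revocation, and returns
// how many units had completed by the time the barrier returned.
func drainDemo() int32 {
	var f inFlight
	var done int32
	for i := 0; i < 3; i++ {
		f.start(func() { atomic.AddInt32(&done, 1) })
	}
	f.barrier(leaderRevoked{})
	return atomic.LoadInt32(&done)
}

func main() {
	fmt.Println("completed before barrier returned:", drainDemo())
}
```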
func (LeaderRevoked) String ¶
func (e LeaderRevoked) String() string
String obtains a textual representation of the LeaderRevoked event.
type LeaderTask ¶
type LeaderTask func()
LeaderTask is an activity performed by the client application if it is the elected leader. The task should perform a small amount of work that is exclusively attributable to a leader, and return immediately. For as long as the associated Neli instance is the leader, the task will be invoked repeatedly; therefore, it should break down any long-running work into bite-sized chunks that can be safely performed without causing excessive blocking.
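The "bite-sized chunks" advice can be sketched as a task that handles one small batch per invocation and remembers where it left off. The helper below is hypothetical (`chunkedTask` is not part of goneli); it returns a function with the same `func()` shape as LeaderTask. It assumes the task is invoked from a single goroutine and that `batchSize` is positive.

```go
package main

import "fmt"

// chunkedTask returns a task that processes at most batchSize items per call,
// plus a function reporting how many items are still pending.
// The returned task keeps its position between calls and is not safe for
// concurrent invocation.
func chunkedTask(items []string, batchSize int, handle func(string)) (task func(), remaining func() int) {
	next := 0
	task = func() {
		end := next + batchSize
		if end > len(items) {
			end = len(items)
		}
		for _, item := range items[next:end] {
			handle(item)
		}
		next = end
	}
	remaining = func() int { return len(items) - next }
	return task, remaining
}

func main() {
	task, remaining := chunkedTask(
		[]string{"a", "b", "c", "d", "e"},
		2,
		func(s string) { fmt.Println("processed", s) },
	)
	// Each call stands in for one invocation by the pulser while leader.
	for remaining() > 0 {
		task()
		fmt.Println("remaining:", remaining())
	}
}
```

Because each invocation returns quickly, control goes back to the caller between batches, which is the behaviour the description above asks for.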
type MockConfig ¶
MockConfig encapsulates the configuration for MockNeli.
func (*MockConfig) SetDefaults ¶
func (c *MockConfig) SetDefaults()
SetDefaults assigns the default values to optional fields.
func (MockConfig) String ¶
func (c MockConfig) String() string
String obtains a textual representation of the configuration.
func (MockConfig) Validate ¶
func (c MockConfig) Validate() error
Validate checks the MockConfig, returning an error if it is invalid.
type MockNeli ¶
MockNeli is a mock of Neli, producing the same behaviour as the real thing, but without contending for leadership. Instead, leader status is assigned/revoked via the Transition method.
type Neli ¶
	type Neli interface {
		IsLeader() bool
		Pulse(timeout time.Duration) (bool, error)
		PulseCtx(ctx context.Context) (bool, error)
		Deadline() concurrent.Deadline
		Close() error
		Await()
		State() State
		Background(task LeaderTask) (Pulser, error)
	}
Neli is a curator for leader election.
type Pulser ¶
Pulser performs continuous pulsing of a Neli instance from a dedicated background Goroutine.