leader

package
v0.13.1-beta1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2022 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package leader provides a simple leader election implementation.

Introduction

Leader election is particularly useful if state cannot be ruled out of your application. For example, you want to run some cron jobs to scan the database, but you have more than one instance up and running. Running cron jobs on all instances may not be desired. With the help of package leader, you can opt to run such jobs only on the leader node. When the leader goes down, a new leader will be elected. The cron job runner is therefore highly available.

Usage

The package leader exports configuration in this format:

leader:
  etcdName: default

To use package leader with package core:

// Build the default application container.
var c *core.C = core.Default()
// Providers are functions returning di.Deps, so they must be called.
c.Provide(otetcd.Providers()) // to provide the underlying driver
c.Provide(leader.Providers())
c.Invoke(func(status *leader.Status) {
	// IsLeader is a method on Status; followers return without doing work.
	if !status.IsLeader() {
		return
	}
	// DO SOMETHING ON LEADER
})
Example (Cronjob)
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/DoNewsCode/core"
	"github.com/DoNewsCode/core/di"
	"github.com/DoNewsCode/core/leader"
	"github.com/DoNewsCode/core/otetcd"

	"github.com/robfig/cron/v3"
)

// CronModule is an example module that carries the leader status so its
// scheduled job can check leadership before doing any work.
type CronModule struct {
	// Sts is the leader status consulted by the scheduled job.
	Sts *leader.Status
}

// ProvideCron registers a job on the "* * * * * *" schedule. The job prints a
// message only while this node reports itself as the leader.
func (s CronModule) ProvideCron(crontab *cron.Cron) {
	job := func() {
		if !s.Sts.IsLeader() {
			// Followers skip the work entirely.
			return
		}
		fmt.Println("do work as leader")
	}
	crontab.AddFunc("* * * * * *", job)
}

// main wires a cron scheduler, the etcd providers and the leader election
// providers into the application, registers CronModule, and serves. It prints
// a hint and returns immediately when ETCD_ADDR is not set.
func main() {
	// The example expects ETCD_ADDR to be set; otherwise print a hint and exit.
	if os.Getenv("ETCD_ADDR") == "" {
		fmt.Println("set ETCD_ADDR to run this example")
		return
	}
	c := core.Default(core.WithInline("log.level", "none"))
	// Register a constructor for *cron.Cron built with cron.WithSeconds();
	// the job added in ProvideCron uses a six-field spec.
	c.Provide(di.Deps{func() *cron.Cron {
		return cron.New(cron.WithSeconds())
	}})
	c.Provide(otetcd.Providers())
	c.Provide(leader.Providers())
	// Obtain the *leader.Status from the container and hand it to the module.
	c.Invoke(func(sts *leader.Status) {
		c.AddModule(CronModule{Sts: sts})
	})
	c.Serve(context.Background())
}
Output:

Example (Providers)
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/DoNewsCode/core"
	"github.com/DoNewsCode/core/leader"
)

// AlwaysLeaderDriver is a stateless example driver that claims leadership as
// soon as it campaigns and keeps it until the campaign context ends.
type AlwaysLeaderDriver struct{}

// Campaign reports leadership through toLeader(true), blocks until ctx is
// done, and reports toLeader(false) on the way out. It always returns nil.
func (AlwaysLeaderDriver) Campaign(ctx context.Context, toLeader func(bool)) error {
	// Registered first so the follower transition is reported whenever the
	// function exits.
	defer toLeader(false)

	toLeader(true)

	// Hold leadership until the caller cancels or times out the context.
	done := ctx.Done()
	<-done
	return nil
}

// Resign does nothing and always returns nil.
func (AlwaysLeaderDriver) Resign(_ context.Context) error {
	return nil
}

// main runs the application with AlwaysLeaderDriver supplied through
// leader.WithDriver and prints each leader status change observed before the
// one-second serve context expires. It prints a hint and returns immediately
// when ETCD_ADDR is not set.
func main() {
	if os.Getenv("ETCD_ADDR") == "" {
		fmt.Println("set ETCD_ADDR to run this example")
		return
	}
	c := core.Default(core.WithInline("log.level", "none"))
	// Supply the custom driver instead of the default one.
	c.Provide(leader.Providers(leader.WithDriver(AlwaysLeaderDriver{})))

	c.Invoke(func(statusChanged leader.StatusChanged) {
		statusChanged.On(func(ctx context.Context, status *leader.Status) error {
			// Becomes true when the campaign succeeds and false on resign.
			fmt.Println(status.IsLeader())
			return nil
		})
	})

	// Serve with a context that times out after one second so the example ends.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	c.Serve(ctx)

}
Output:

true
false
Example (Server)
package main

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"time"

	"github.com/DoNewsCode/core"
	"github.com/DoNewsCode/core/leader"
	"github.com/DoNewsCode/core/otetcd"

	"github.com/gorilla/mux"
)

// ServerModule is an example module that carries the leader status so its
// HTTP handler can report whether this node is the leader.
type ServerModule struct {
	// Sts is the leader status consulted on every request.
	Sts *leader.Status
}

// ProvideHTTP mounts a handler on "/" that answers "I am leader" or
// "I am follower" depending on the current leader status.
func (s ServerModule) ProvideHTTP(router *mux.Router) {
	router.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
		// Start from the follower reply and upgrade it if this node leads.
		reply := "I am follower"
		if s.Sts.IsLeader() {
			reply = "I am leader"
		}
		w.Write([]byte(reply))
	})
}

// main wires the etcd and leader election providers into the application,
// prints each leader status change, registers ServerModule, and serves with a
// one-second timeout. It prints a hint and returns immediately when ETCD_ADDR
// is not set.
func main() {
	// The example expects ETCD_ADDR to be set; otherwise print a hint and exit.
	if os.Getenv("ETCD_ADDR") == "" {
		fmt.Println("set ETCD_ADDR to run this example")
		return
	}
	c := core.Default(core.WithInline("log.level", "none"))
	c.Provide(otetcd.Providers())
	c.Provide(leader.Providers())
	c.Invoke(func(statusChanged leader.StatusChanged) {
		// This listener will be called twice: once on becoming the leader and once on resigning.
		statusChanged.On(func(ctx context.Context, status *leader.Status) error {
			fmt.Println(status.IsLeader())
			return nil
		})
	})
	// Obtain the *leader.Status from the container and hand it to the module.
	c.Invoke(func(sts *leader.Status) {
		c.AddModule(ServerModule{Sts: sts})
	})
	// Serve with a context that times out after one second so the example ends.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	c.Serve(ctx)

}
Output:

true
false

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrNotALeader = errors.New("not a leader")

ErrNotALeader is an error triggered when Resign is called but the current node is not leader.

Functions

func Providers

func Providers(opt ...ProvidersOptionFunc) di.Deps

Providers returns a set of dependency providers for *Election and *Status.

Depends On:
	contract.ConfigAccessor
	contract.Dispatcher
	contract.DIPopulator
Provides:
	*Election
	*Status
	StatusChanged

Types

type Dispatcher

type Dispatcher interface {
	// Fire dispatches leader election status
	Fire(ctx context.Context, status *Status) error
}

type Driver

// Driver models an external storage that can be used for leader election.
type Driver interface {
	// Campaign starts a leader election. It should block until the context is canceled.
	// The status must be updated through the toLeader callback.
	Campaign(ctx context.Context, toLeader func(bool)) error
	// Resign makes the current node a follower.
	Resign(context.Context) error
}

Driver models an external storage that can be used for leader election.

type DriverArgs added in v0.9.1

type DriverArgs struct {
	Populator contract.DIPopulator
}

DriverArgs is the argument for constructing new drivers.

type Election

type Election struct {
	// contains filtered or unexported fields
}

Election is a struct that controls the leader election. Whenever the leader status changes on this node, an event will be triggered. See the examples for how to listen for this event.

func NewElection

func NewElection(dispatcher Dispatcher, driver Driver) *Election

NewElection returns a pointer to the newly constructed Election instance.

func (*Election) Campaign

func (e *Election) Campaign(ctx context.Context) error

Campaign starts a leader election. It will block until this node becomes a leader or the context is cancelled.

func (*Election) Resign

func (e *Election) Resign(ctx context.Context) error

Resign gives up the leadership.

func (*Election) Status

func (e *Election) Status() *Status

Status returns the current status of the election.

type Option

type Option struct {
	// The name of the etcd instance.
	EtcdName string `json:"etcdName" yaml:"etcdName"`
}

Option is the available options to configure package leader.

type ProvidersOptionFunc added in v0.9.0

type ProvidersOptionFunc func(options *providersOption)

ProvidersOptionFunc is the type of functional providersOption for Providers. Use this type to change how Providers work.

func WithDriver added in v0.9.0

func WithDriver(driver Driver) ProvidersOptionFunc

WithDriver instructs the Providers to accept a leader election driver different from the default one. This option supersedes the WithDriverConstructor option.

func WithDriverConstructor added in v0.9.0

func WithDriverConstructor(f func(args DriverArgs) (Driver, error)) ProvidersOptionFunc

WithDriverConstructor instructs the Providers to accept an alternative constructor for the election driver. If the WithDriver option is set, this option becomes a no-op.

type Status

type Status struct {
	// contains filtered or unexported fields
}

Status is a type that describes whether the current node is leader.

func (Status) IsLeader

func (s Status) IsLeader() bool

IsLeader returns true if the current node is leader.

type StatusChanged

// StatusChanged lets callers subscribe to leader status change events.
type StatusChanged interface {
	// On registers a listener that receives the Status when a change is
	// dispatched. The returned function presumably removes the listener
	// (it is named unsubscribe) — confirm against the implementation.
	On(func(ctx context.Context, status *Status) error) (unsubscribe func())
}

StatusChanged is a channel for receiving StatusChanged events.

Directories

Path Synopsis
Package leaderetcd provides an etcd driver for package leader
Package leaderetcd provides an etcd driver for package leader
Package leaderredis provides a redis driver for package leader
Package leaderredis provides a redis driver for package leader

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL