leader

package
v0.10.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 18, 2021 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package leader provides a simple leader election implementation.

Introduction

Leader election is particularly useful if the state cannot be rule out of your application. For example, you want to run some cron jobs to scan the database, but you have more than one instances up and running. Running cron jobs on all instances many not be desired. With the help of package leader, you can opt to only run such jobs on the leader node. When the leader goes down, a new leader will be elected. The cron job runner is therefore highly available.

Usage

The package leader exports configuration in this format:

leader:
  etcdName: default

To use package leader with package core:

var c *core.C = core.Default()
c.Provide(otetcd.Providers) // to provide the underlying driver
c.Provide(leader.Providers)
c.Invoke(func(status *leader.Status) {
	if ! status.IsLeader {
		return
	}
	// DO SOMETHING ON LEADER
})
Example (Cronjob)
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/DoNewsCode/core"
	"github.com/DoNewsCode/core/di"
	"github.com/DoNewsCode/core/leader"
	"github.com/DoNewsCode/core/otetcd"
	"github.com/robfig/cron/v3"
)

type CronModule struct {
	Sts *leader.Status
}

func (s CronModule) ProvideCron(crontab *cron.Cron) {
	crontab.AddFunc("* * * * * *", func() {
		if s.Sts.IsLeader() {
			fmt.Println("do work as leader")
		}
	})
}

func main() {
	if os.Getenv("ETCD_ADDR") == "" {
		fmt.Println("set ETCD_ADDR to run this example")
		return
	}
	c := core.Default(core.WithInline("log.level", "none"))
	c.Provide(di.Deps{func() *cron.Cron {
		return cron.New(cron.WithSeconds())
	}})
	c.Provide(otetcd.Providers())
	c.Provide(leader.Providers())
	c.Invoke(func(sts *leader.Status) {
		c.AddModule(CronModule{Sts: sts})
	})
	c.Serve(context.Background())
}
Output:

Example (Providers)
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/DoNewsCode/core"
	"github.com/DoNewsCode/core/contract"
	"github.com/DoNewsCode/core/events"
	"github.com/DoNewsCode/core/leader"
)

type AlwaysLeaderDriver struct{}

func (a AlwaysLeaderDriver) Campaign(ctx context.Context) error {
	return nil
}

func (a AlwaysLeaderDriver) Resign(ctx context.Context) error {
	return nil
}

func main() {
	if os.Getenv("ETCD_ADDR") == "" {
		fmt.Println("set ETCD_ADDR to run this example")
		return
	}
	c := core.Default(core.WithInline("log.level", "none"))
	c.Provide(leader.Providers(leader.WithDriver(AlwaysLeaderDriver{})))

	c.Invoke(func(dispatcher contract.Dispatcher, sts *leader.Status) {
		dispatcher.Subscribe(events.Listen(leader.OnStatusChanged, func(ctx context.Context, event interface{}) error {
			// Becomes true when campaign succeeds and becomes false when resign
			fmt.Println(event.(leader.OnStatusChangedPayload).Status.IsLeader())
			return nil
		}))
	})

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	c.Serve(ctx)

}
Output:

true
false
Example (Server)
package main

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"time"

	"github.com/DoNewsCode/core"
	"github.com/DoNewsCode/core/contract"
	"github.com/DoNewsCode/core/events"
	"github.com/DoNewsCode/core/leader"
	"github.com/DoNewsCode/core/otetcd"
	"github.com/gorilla/mux"
)

type ServerModule struct {
	Sts *leader.Status
}

func (s ServerModule) ProvideHTTP(router *mux.Router) {
	router.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
		if s.Sts.IsLeader() {
			writer.Write([]byte("I am leader"))
		} else {
			writer.Write([]byte("I am follower"))
		}
	})
}

func main() {
	if os.Getenv("ETCD_ADDR") == "" {
		fmt.Println("set ETCD_ADDR to run this example")
		return
	}
	c := core.Default(core.WithInline("log.level", "none"))
	c.Provide(otetcd.Providers())
	c.Provide(leader.Providers())
	c.Invoke(func(dispatcher contract.Dispatcher) {
		// This listener will be called twice. Once on becoming the leader and once on resigning the leader.
		dispatcher.Subscribe(events.Listen(leader.OnStatusChanged, func(ctx context.Context, event interface{}) error {
			fmt.Println(event.(leader.OnStatusChangedPayload).Status.IsLeader())
			return nil
		}))
	})
	c.Invoke(func(sts *leader.Status) {
		c.AddModule(ServerModule{Sts: sts})
	})
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	c.Serve(ctx)

}
Output:

true
false

Index

Examples

Constants

View Source
const OnStatusChanged event = "onStatusChanged"

OnStatusChanged is an event that triggers when the leadership has transited. It's payload is OnStatusChangedPayload.

Variables

View Source
var ErrNotALeader = errors.New("not a leader")

ErrNotALeader is an error triggered when Resign is called but the current node is not leader.

Functions

func Providers

func Providers(opt ...ProvidersOptionFunc) di.Deps

Providers returns a set of dependency providers for *Election and *Status.

Depends On:
	contract.ConfigAccessor
	contract.Dispatcher
	contract.DIPopulator
Provides:
	Election *Election
	Status   *Status

Types

type Driver

type Driver interface {
	// Campaign starts a leader election. It should block until elected or context canceled.
	Campaign(ctx context.Context) error
	// Resign makes the current node a follower.
	Resign(context.Context) error
}

Driver models a external storage that can be used for leader election.

type DriverArgs added in v0.9.1

type DriverArgs struct {
	Populator contract.DIPopulator
}

DriverArgs is the argument for constructing new drivers.

type Election

type Election struct {
	// contains filtered or unexported fields
}

Election is a struct that controls the leader election. Whenever the leader status changed on this node, an event will be triggered. See example for how to listen this event.

func NewElection

func NewElection(dispatcher contract.Dispatcher, driver Driver) *Election

NewElection returns a pointer to the newly constructed Election instance.

func (*Election) Campaign

func (e *Election) Campaign(ctx context.Context) error

Campaign starts a leader election. It will block until this node becomes a leader or context cancelled.

func (*Election) Resign

func (e *Election) Resign(ctx context.Context) error

Resign gives up the leadership.

func (*Election) Status

func (e *Election) Status() *Status

Status returns the current status of the election.

type OnStatusChangedPayload added in v0.8.0

type OnStatusChangedPayload struct {
	Status *Status
}

OnStatusChangedPayload is the payload of OnStatusChanged.

type Option

type Option struct {
	// The name of the etcd instance.
	EtcdName string `json:"etcdName" yaml:"etcdName"`
}

Option is the available options to configure package leader.

type ProvidersOptionFunc added in v0.9.0

type ProvidersOptionFunc func(options *providersOption)

ProvidersOptionFunc is the type of functional providersOption for Providers. Use this type to change how Providers work.

func WithDriver added in v0.9.0

func WithDriver(driver Driver) ProvidersOptionFunc

WithDriver instructs the Providers to accept a leader election driver different from the default one. This option supersedes the WithDriverConstructor option.

func WithDriverConstructor added in v0.9.0

func WithDriverConstructor(f func(args DriverArgs) (Driver, error)) ProvidersOptionFunc

WithDriverConstructor instructs the Providers to accept an alternative constructor for election driver. If the WithDriver option is set, this option becomes an no-op.

type Status

type Status struct {
	// contains filtered or unexported fields
}

Status is a type that describes whether the current node is leader.

func (Status) IsLeader

func (s Status) IsLeader() bool

IsLeader returns true if the current node is leader.

Directories

Path Synopsis
Package leaderetcd provides a etcd driver for package leader
Package leaderetcd provides a etcd driver for package leader
Package leaderredis provides a redis driver for package leader
Package leaderredis provides a redis driver for package leader

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL