etcdseq

v0.2.0 Latest

This package is not in the latest version of its module.

Published: Jun 13, 2024 License: MIT Imports: 9 Imported by: 0

README

etcdseq

Etcdseq is a lightweight library that supports sequence nodes via etcd.

Sequence nodes can be achieved in other ways, such as:

  • Zookeeper sequence nodes
  • K8S StatefulSet

However, if you're using etcd rather than Zookeeper and a K8S Deployment rather than a StatefulSet, etcdseq can help.

What's more, etcdseq provides an index/count pair as node info, which is useful when you need the total number of nodes. Zookeeper and K8S only provide the index.

Design

Assume you have 3 nodes. Every node is assigned an index/count pair.

  • node1: index=0 count=3
  • node2: index=1 count=3
  • node3: index=2 count=3

When new nodes are added, count changes.

  • node1: index=0 count=4
  • node2: index=1 count=4
  • node3: index=2 count=4
  • node4: index=3 count=4

When nodes are removed, the indexes of the later nodes change, and count changes too. Here node2 has been removed:

  • node1: index=0 count=3
  • node3: index=1 count=3
  • node4: index=2 count=3
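
The index/count behaviour above can be illustrated with a small standalone sketch. It does not use etcd or the library; the Info type, the assign function, and the owns function are hypothetical stand-ins, and ordering nodes by name is an assumption made only for this example (the library may order nodes differently). It also shows one common use of the pair: modulo sharding of partitions across nodes.

```go
package main

import (
	"fmt"
	"sort"
)

// Info is a local stand-in for the index/count pair described above.
// It is not the library's etcdseq.Info type.
type Info struct {
	Index int
	Count int
}

// assign gives every node an index equal to its position in sorted order.
// Sorting by name is an assumption of this sketch only.
func assign(nodes []string) map[string]Info {
	sorted := append([]string(nil), nodes...)
	sort.Strings(sorted)
	out := make(map[string]Info, len(sorted))
	for i, n := range sorted {
		out[n] = Info{Index: i, Count: len(sorted)}
	}
	return out
}

// owns reports whether the node holding info is responsible for the
// given partition, using simple modulo sharding.
func owns(info Info, partition int) bool {
	return info.Count > 0 && partition%info.Count == info.Index
}

func main() {
	before := assign([]string{"node1", "node2", "node3", "node4"})
	after := assign([]string{"node1", "node3", "node4"}) // node2 removed

	fmt.Println(before["node3"], after["node3"]) // {2 4} {1 3}
	fmt.Println(owns(after["node3"], 4))         // true, since 4 % 3 == 1
}
```

Because every node sees the same count, each partition is owned by exactly one node as long as all nodes have observed the same membership.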

Usage

Simply use a ChanHandler to receive node change events sequentially.

    // first create an etcd connection
    etcdClient, err := clientv3.New(clientv3.Config{
        Endpoints: []string{"http://localhost:2379"},
    })
    if err != nil {
        log.Fatal(err)
    }

    // create a channel to receive events
    ch := make(chan etcdseq.Info, 100)

    // key is the identifier of the node group
    key := "service-group"

    // create an EtcdSeq
    seq := etcdseq.NewEtcdSeq(etcdClient, key, etcdseq.NewChanHandler(ch))

    // start the EtcdSeq service
    if err := seq.Start(); err != nil {
        log.Fatal(err)
    }

    // make sure you stop EtcdSeq once you no longer need it
    defer seq.Stop()

Use the following code structure to handle node change events. The key call is startService(info), where you start your own service and allocate resources according to the index/count pair. Remember to call stopService to release those resources when the node info changes. If info.Invalid() returns true, no node is alive anymore, so just stop your service.

func run(ctx context.Context) {
	var oldInfo etcdseq.Info
	for {
		select {
		case <-ctx.Done():
			stopService()
			return
		case info := <-ch:
			// drain the channel to get the latest info (optional optimization);
			// read len(ch) once, because it shrinks with every receive
			for n := len(ch); n > 0; n-- {
				info = <-ch
			}
			if info.Invalid() {
				stopService()
				// forget the old info so the service restarts
				// when a valid info arrives again
				oldInfo = etcdseq.DefaultInfo
				continue
			}
			if oldInfo.Equal(info) {
				// unchanged (optional optimization)
				continue
			}
			stopService()
			startService(info)
			oldInfo = info
		}
	}
}
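
The "drain for the latest" step can also be written as a small helper. The following is a minimal standalone sketch, assuming a buffered channel with a single consumer; Info is a local stand-in for etcdseq.Info, and latest is a hypothetical helper name that is not part of the library.

```go
package main

import "fmt"

// Info is a local stand-in for etcdseq.Info.
type Info struct {
	Index int
	Count int
}

// latest blocks until one value is available, then consumes whatever is
// already buffered and returns only the most recent value. After the first
// receive it never blocks.
func latest(ch <-chan Info) Info {
	v := <-ch
	for {
		select {
		case next, ok := <-ch:
			if !ok {
				return v // channel closed: keep the last real value
			}
			v = next
		default:
			return v // nothing else buffered
		}
	}
}

func main() {
	ch := make(chan Info, 8)
	ch <- Info{Index: 0, Count: 1}
	ch <- Info{Index: 0, Count: 2}
	ch <- Info{Index: 1, Count: 3}

	info := latest(ch)
	fmt.Println(info, len(ch)) // {1 3} 0
}
```

Using select with a default case avoids depending on len(ch), which changes while the loop is receiving.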

License

Released under the MIT License.

Documentation

Constants

const TimeToLive = 10

Variables

var DefaultInfo = Info{
	Index: indexInvalid,
	Count: countInvalid,
}

Functions

This section is empty.

Types

type ChanHandler

type ChanHandler struct {
	// contains filtered or unexported fields
}

func NewChanHandler

func NewChanHandler(ch chan<- Info) *ChanHandler

func (*ChanHandler) OnChange

func (h *ChanHandler) OnChange(info Info)

type DoneChan

type DoneChan struct {
	// contains filtered or unexported fields
}

A DoneChan wraps a channel that can safely be closed multiple times and that callers can wait on until it is done.

func NewDoneChan

func NewDoneChan() *DoneChan

NewDoneChan returns a DoneChan.

func (*DoneChan) Close

func (dc *DoneChan) Close()

Close closes dc. It is safe to call Close more than once.

func (*DoneChan) Done

func (dc *DoneChan) Done() chan struct{}

Done returns a channel that signals when dc has been closed.
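
A channel that tolerates repeated Close calls is commonly built on sync.Once. The sketch below shows that general technique with a hypothetical onceChan type; it is an assumption about how such a type can be written, not a description of how DoneChan is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// onceChan is a hypothetical stand-in with the same method set as DoneChan.
type onceChan struct {
	once sync.Once
	done chan struct{}
}

func newOnceChan() *onceChan {
	return &onceChan{done: make(chan struct{})}
}

// Close closes the underlying channel at most once, so repeated calls
// do not panic.
func (c *onceChan) Close() {
	c.once.Do(func() { close(c.done) })
}

// Done returns the channel that is closed by Close.
func (c *onceChan) Done() chan struct{} {
	return c.done
}

func main() {
	c := newOnceChan()
	c.Close()
	c.Close() // second call is a no-op
	<-c.Done()
	fmt.Println("done")
}
```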

type EtcdSeq

type EtcdSeq struct {
	// contains filtered or unexported fields
}

func NewEtcdSeq

func NewEtcdSeq(client *clientv3.Client, key string, handler Handler, opts ...EtcdSeqOption) *EtcdSeq

func (*EtcdSeq) Lock added in v0.2.0

func (l *EtcdSeq) Lock(ctx context.Context, info Info) (Unlocker, error)

func (*EtcdSeq) Start

func (l *EtcdSeq) Start() error

func (*EtcdSeq) Stop

func (l *EtcdSeq) Stop()

type EtcdSeqOption

type EtcdSeqOption func(client *EtcdSeq)

func WithLogger

func WithLogger(logger Logger) EtcdSeqOption

type Handler

type Handler interface {
	OnChange(info Info)
}
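
Any type with an OnChange method satisfies Handler, so a plain function can be adapted in the style of http.HandlerFunc. This is a minimal sketch using local copies of Info and Handler so that it runs on its own; HandlerFunc is a hypothetical name that the package does not provide. Whether the library invokes OnChange from its own goroutine is not documented here, so the callback should be assumed to need its own synchronization.

```go
package main

import "fmt"

// Info and Handler are local copies of the package's declarations,
// reduced to the exported fields.
type Info struct {
	Index int
	Count int
}

type Handler interface {
	OnChange(info Info)
}

// HandlerFunc adapts an ordinary function to the Handler interface.
// It is a hypothetical helper, not part of etcdseq.
type HandlerFunc func(info Info)

// OnChange calls f(info).
func (f HandlerFunc) OnChange(info Info) { f(info) }

func main() {
	var h Handler = HandlerFunc(func(info Info) {
		fmt.Printf("index=%d count=%d\n", info.Index, info.Count)
	})
	h.OnChange(Info{Index: 1, Count: 3}) // index=1 count=3
}
```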

type Info

type Info struct {
	Index int
	Count int
	// contains filtered or unexported fields
}

func (Info) Equal

func (i Info) Equal(other Info) bool

func (Info) Invalid

func (i Info) Invalid() bool

func (*Info) Reset

func (i *Info) Reset()

func (Info) String

func (i Info) String() string

type Logger

type Logger interface {
	Debugf(format string, args ...interface{})
	Infof(format string, args ...interface{})
	Errorf(format string, args ...interface{})
}

type Unlocker added in v0.2.0

type Unlocker func(ctx context.Context) error
