dsync

package module
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 31, 2024 License: MIT Imports: 6 Imported by: 0

README

build-img pkg-img reportcard-img license-img

Distributed Sync (dsync)

dsync is for distributed sync. It aims to solve problems where coordination between different pods running the same service are required.

k8s offers an API to allow you to execute leader election. However, that API is much more complex than we actually require. So this library was written as a wrapper around it.

Upfront warning about testing

DO NOT import the fake k8s client. It is huge and will make go vet take forever and make staticcheck OOM kill your pipeline. This package uses it to confirm things so that you do not have to. Which is why this package does not use staticcheck in pipeline.

Rather use dsync.New(drivers.NewMockDriver(), "podName") to construct a client for your tests.

Constructors

For a more complete example look at examples/main.go

package main

import (
	"context"
	"os"

	"github.com/tempcke/dsync"
	"github.com/tempcke/dsync/configs"
	"github.com/tempcke/dsync/drivers"
)

func main() {
	var (
		ctx   = context.Background()
		envFn = os.Getenv // could use viper.GetString
	)

	// construct driver
	conf := configs.WithDefaults(envFn, configs.Defaults)
	driver, err := drivers.NewKubeDriver(conf)
	if err != nil {
		panic(err)
	}

	// construct dsync with driver
	d := dsync.New(driver, conf(configs.KeyPodName))
	e := d.Election(ctx, "token-manager")
	// if you don't check Err() all methods just no-op
	if err := e.Err(); err != nil {
		panic(err)
	}
	e.WhenElected(func(ctx context.Context) {
		// do stuff until ctx is canceled
	})
}
Configuration

conf configs.Config allows you to define the configuration. This is obviously important for tests or if you use different env vars than the ones it expects.

NOTE: For an example of how to setup your helm chart please look in example/pipeline

  • KUBE_NAMESPACE - the namespace you would use with kubectl -n which can be given to you easily in your helm chart
  • KUBE_LEASE_SCOPE - it is recommended to use the name of your service. because all leases are scoped to the namespace, it is possible that lease names from different services could conflict with each-other. Therefore, we provide a service level prefix to ensure that they don't. Because it is used to prefix lease names it must follow the same constraints that lease names have which is lower case letters a-z, numbers 0-9, and -'s only.
  • KUBE_POD_NAME - this can be anything that uniquely identifies your pod in the namespace. You can generate a uuid on start and use that, or you can actually use your real pod_name from your helm chart. If omitted it will use a global var set to a UUID for this running instance.
  • KUBE_CONFIG_PATH - you should never need to use this unless you want to use your own local kube config to run a test which will create and use real leases on the remote kubernetes cluster. These kinds of tests should only ever be created or ran from within this library and obviously could never run in pipeline.

Useful Methods

package dsync

import "context"

type Dsync struct{}
func (Dsync) Election(context.Context, string) Election
func (Dsync) NewLock(context.Context, string) Lock

type Election struct{}
func (Election) Resource() Resource
func (Election) Err() error
func (Election) GetLeader() string
func (Election) IsLeader() bool
func (Election) WhenElected(func(context.Context))
func (Election) Stop()

type Lock struct{}
func (Lock) Resource() Resource
func (Lock) Err() error
func (Lock) Lock() error
func (Lock) LockContext(context.Context) error
func (Lock) TryLock() error
func (Lock) Unlock() error
func (Lock) DoWithLock(context.Context, func() error) error

type Resource struct {
	Namespace string // optional
	Scope     string // optional
	Name      string
}
func (Resource) String() string
func (Resource) Equal(Resource) bool
func (Resource) Validate() error
func (Resource) ElectionName() string
func (Resource) LockName() string
Election
  • IsLeader() bool returns true if you are the leader. It is important to note that it does not wait to ensure that there is a leader before returning. It simply returns the current state and if the election hasn't finished yet, then you are not the leader yet.
  • WhenElected(ctx, func(ctx)) will result in your function being called after every election where you transition from not being the leader, to being the leader. The ctx will be canceled if at any time you stop being the leader for any reason. So given two pods A and B and you are A. If A is elected, then B is elected, then A elected again. What will happen for you is your func will be called, the context will be canceled, then your func will be called again. The context will always be canceled before a second execution can happen however it is up to you to ensure you have correctly reacted to the canceled context.

Documentation

Index

Constants

View Source
const (
	DefaultNamespace = configs.DefNamespace
	DefaultScope     = configs.DefLeaseScope
	MasterTask       = "master"
)
View Source
const NamePattern = `^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$`

NamePattern is enforced kubernetes on lease names. https://regex101.com/r/9zbjQ0/1 example: kebab-case-with-0-9.seperated-by-dots

Matches a domain-like string that: - Starts and ends with an alphanumeric character (a-z0-9). - Can have multiple segments separated by dots (.). - Each segment must start and end with an alphanumeric character and may contain hyphens in between. - Supports optional subdomains or multiple levels. Examples that match: - example - example.com - sub.domain.com - a1-b2.c3-d4 Examples that don’t match: - .example (starts with a dot) - example. (ends with a dot) - ex..ample (consecutive dots)

Variables

View Source
var (
	ErrBadResourceName = errors.New("bad resource scope or name")
	ErrInvalidState    = errors.New("invalid state")
	ErrLockNotFound    = errors.New("lock not found")

	ErrNoLeader      = errors.New("no leader right now")
	ErrNotFound      = errors.New("resource not found")
	ErrAlreadyLocked = errors.New("resource is already locked")
	ErrNotLockHolder = errors.New("resource locked by someone else")
	ErrNotLocked     = errors.New("resource not locked")
)

Functions

func SanitizeName

func SanitizeName(in string) string

Types

type Driver

type Driver interface {
	Resource(string) Resource
	GetLock(context.Context, ResourceName, PodID) (LockDriver, error)
	GetElection(context.Context, ResourceName, PodID) (ElectionDriver, error)
}

type Dsync

type Dsync struct {
	// contains filtered or unexported fields
}

func New

func New(driver Driver, instance string) Dsync

func (Dsync) NewElection

func (d Dsync) NewElection(ctx context.Context, task string) (Election, error)

NewElection will return an Election if there is an error or not the returned election will simply no-op in case of an error

func (Dsync) NewLock

func (d Dsync) NewLock(ctx context.Context, name string) Lock

NewLock constructs and returns a Lock if there is any error with the Lock itself then the methods on Lock will return them therefore no need to return the error here if you want to check the error yourself use Lock.Err()

func (Dsync) Resource

func (d Dsync) Resource(name string) Resource

func (Dsync) SanitizeName

func (d Dsync) SanitizeName(input string) string

SanitizeName returns a valid resource name the best it can upper case letters will be made lower case special chars will be converted to dashes and dashes will be trimmed from start and end

type Election

type Election struct {
	// contains filtered or unexported fields
}

func NewElection

func NewElection(ctx context.Context, d Driver, task, pod string) Election

func (Election) Err

func (e Election) Err() error

func (Election) GetLeader

func (e Election) GetLeader() string

func (Election) IsLeader

func (e Election) IsLeader() bool

func (Election) Resource

func (e Election) Resource() Resource

func (Election) Stop

func (e Election) Stop()

func (Election) WhenElected

func (e Election) WhenElected(f func(context.Context))

type ElectionDriver

type ElectionDriver interface {
	GetLeader() string
	IsLeader() bool
	WhenElected(f func(context.Context))
	Stop()
}

type ElectionID

type ElectionID = string

type Interface

type Interface interface {
	NewElection(context.Context, string) (Election, error)
	NewLock(context.Context, string) Lock
}

type Lock

type Lock struct {
	// contains filtered or unexported fields
}

func NewLock

func NewLock(ctx context.Context, d Driver, name, pod string) Lock

func (Lock) DoWithLock

func (l Lock) DoWithLock(ctx context.Context, f func() error) error

func (Lock) DoWithTryLock

func (l Lock) DoWithTryLock(ctx context.Context, f func() error) error

func (Lock) Err

func (l Lock) Err() error

func (Lock) Lock

func (l Lock) Lock() error

func (Lock) LockContext

func (l Lock) LockContext(ctx context.Context) error

func (Lock) Resource

func (l Lock) Resource() Resource

func (Lock) TryLock

func (l Lock) TryLock() error

func (Lock) Unlock

func (l Lock) Unlock() error

type LockDriver

type LockDriver interface {
	Lock() error
	LockContext(ctx context.Context) error
	Unlock() error
	TryLock() error
}

type LockID

type LockID = string

type Logger

type Logger interface {
	Info(msg string, args ...any)
	Warn(msg string, args ...any)
	Error(msg string, args ...any)
}

type PodID

type PodID = string

type Resource

type Resource struct {
	Namespace string // optional
	Scope     string // optional
	Name      string
}

func NewResource

func NewResource(name string) Resource

func NewScopedResource

func NewScopedResource(ns, scope, name string) Resource

func ToResource

func ToResource(s string) Resource

func (Resource) ElectionName

func (r Resource) ElectionName() string

func (Resource) Equal

func (r Resource) Equal(r2 Resource) bool

func (Resource) LockName

func (r Resource) LockName() string

func (Resource) String

func (r Resource) String() string

func (Resource) Validate

func (r Resource) Validate() error

type ResourceName

type ResourceName = string

type Term

type Term struct {
	// contains filtered or unexported fields
}

func NewTerm

func NewTerm(ctx context.Context) Term

func (Term) Active

func (t Term) Active() bool

func (Term) Close

func (t Term) Close()

func (Term) Context

func (t Term) Context() context.Context

func (Term) Done

func (t Term) Done() <-chan struct{}

func (Term) Err

func (t Term) Err() error

Directories

Path Synopsis
internal
k8s

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL