redisReplicaManager

package module
v1.2.11
Published: Jan 6, 2023 License: MIT Imports: 13 Imported by: 0

README

redis-replica-manager

Group membership, sharding, replication and request routing manager relying on Redis for coordination.

TL;DR

This library allows building distributed applications where partitioning of work is important, replication is desired and the cost of moving a work partition from site to site is high.

It achieves its goal by partitioning work into slots and assigning each site (cluster node) a set of slots to be responsible for.

A slot can be assigned to more than one site (replication scenario), yet only one site will be designated a primary for each slot.

The differentiation between a primary and a secondary site role for a slot is determined by the application.

Assignment of slots to sites is determined by Rendezvous hashing (see below for more details).

Guarantees

High Availability

The library allows replication of slots to more than one site.

A site is not allowed to relinquish responsibility for a slot before the slot has been migrated to enough replicas. The only exception to this rule is when a site is determined to be faulty.
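
The rule above can be pictured as a simple count of the other sites that already hold the slot. This is a minimal sketch, not the library's implementation: `canRelinquish`, `holders` and `minReplicas` are illustrative names, and the real check is performed by the cluster through Redis.

```go
package main

import "fmt"

// canRelinquish reports whether localSite may give up a slot.
// holders lists the sites currently serving the slot; minReplicas is the
// number of OTHER sites that must hold it first. All names are illustrative.
func canRelinquish(holders []string, localSite string, minReplicas int) bool {
	others := 0
	for _, site := range holders {
		if site != localSite {
			others++
		}
	}
	return others >= minReplicas
}

func main() {
	holders := []string{"site1", "site2"}
	fmt.Println(canRelinquish(holders, "site1", 1)) // true: site2 also holds the slot
	fmt.Println(canRelinquish(holders, "site1", 2)) // false: only one other holder
}
```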

Minimum Partition Migration

Removing a site from the cluster redistributes the slots that site was responsible for among the remaining sites.

When a site is added to the cluster, the slots it becomes responsible for move from other sites to the newly available site.
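
To illustrate why only the new site's slots move, here is a small sketch that assumes each slot is owned by the site with the highest hash weight (the rendezvous scheme this README describes). The `weight` function is a hypothetical FNV-based stand-in; the library's actual hash is not shown in this README.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// weight is a hypothetical scoring function for a (site, slot) pair.
func weight(siteID string, slotID uint32) uint64 {
	h := fnv.New64a()
	fmt.Fprintf(h, "%s/%d", siteID, slotID)
	return h.Sum64()
}

// owner returns the site with the highest weight for slotID.
// Ties are broken by the smaller site name so the result is deterministic.
func owner(sites []string, slotID uint32) string {
	best, bestWeight := "", uint64(0)
	for _, s := range sites {
		w := weight(s, slotID)
		if best == "" || w > bestWeight || (w == bestWeight && s < best) {
			best, bestWeight = s, w
		}
	}
	return best
}

// movedSlots lists the slots whose owner differs between two site sets.
func movedSlots(before, after []string, totalSlots uint32) []uint32 {
	var moved []uint32
	for slot := uint32(0); slot < totalSlots; slot++ {
		if owner(before, slot) != owner(after, slot) {
			moved = append(moved, slot)
		}
	}
	return moved
}

func main() {
	before := []string{"site1", "site2", "site3"}
	after := append(append([]string(nil), before...), "site4")
	moved := movedSlots(before, after, 512)
	fmt.Printf("%d of 512 slots changed owner after adding site4\n", len(moved))
}
```

Because the owner is a maximum over the site set, adding a site can only change a slot's owner to the new site, so slots never shuffle between the existing sites.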

Application Control

Migration of slots between sites is requested by the library, and executed by the application.

The library makes no assumptions about whether a migration has completed.

The application determines when migration has completed, and notifies the library of that fact.

Faulty Site Detection

A heartbeat and a watchdog timer guarantee that faulty sites are removed from the cluster.

Faulty Slot Handler Detection

If the application detects that a handler for a slot has failed, it should notify the cluster by calling LocalSiteManager.RemoveFailedSlot(context.Background(), slotId). This immediately and unconditionally removes the slot from the local site, triggers failover to one of the secondary replicas, and eventually triggers a retry to add the slot to the local site.

Dynamic Routing

The library maintains a routing table that maps each slot to the list of sites serving it. The table is used to route requests for a slot to the right site.
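
As a rough picture of such a table, the sketch below maps a slot to its entries and looks up the primary. It is illustrative only: `routeEntry` mirrors the fields of the package's `RouteTableEntry`, while `routeTable` and `slotForObject` (FNV-1a modulo the slot count) are hypothetical and do not claim to match the library's own mapping.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

const (
	rolePrimary   = "primary"
	roleSecondary = "secondary"
)

// routeEntry mirrors the fields of the package's RouteTableEntry.
type routeEntry struct {
	SlotID uint32
	SiteID string
	Role   string
}

// routeTable maps a slot to the sites serving it.
type routeTable map[uint32][]routeEntry

// primary returns the primary entry for slotID, if one is known.
func (t routeTable) primary(slotID uint32) (routeEntry, bool) {
	for _, e := range t[slotID] {
		if e.Role == rolePrimary {
			return e, true
		}
	}
	return routeEntry{}, false
}

// slotForObject is a hypothetical object-to-slot mapping.
func slotForObject(objectID string, totalSlots uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(objectID)) // Write on a hash.Hash never returns an error
	return h.Sum32() % totalSlots
}

func main() {
	const totalSlots = 512
	slot := slotForObject("abcdefg", totalSlots)
	table := routeTable{slot: {
		{SlotID: slot, SiteID: "site2", Role: roleSecondary},
		{SlotID: slot, SiteID: "site1", Role: rolePrimary},
	}}
	if e, ok := table.primary(slot); ok {
		fmt.Printf("object abcdefg -> slot %d -> %s\n", slot, e.SiteID)
	}
}
```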

Rendezvous hashing

Rendezvous or highest random weight (HRW) hashing is an algorithm that allows clients to achieve distributed agreement on a set of k options out of a possible set of n options. A typical application is when clients need to agree on which sites (or proxies) objects are assigned to.

Rendezvous hashing is both much simpler and more general than consistent hashing, which becomes a special case (for k=1) of rendezvous hashing.

More information: https://en.wikipedia.org/wiki/Rendezvous_hashing
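
A minimal sketch of the idea, assuming an FNV-1a hash as the weight function: every participant scores each (site, slot) pair and keeps the k highest-scoring sites, so all participants reach the same answer without talking to each other. `score` and `pickSites` are hypothetical names, and this is not the library's own hashing code.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// score returns a deterministic pseudo-random weight for a (site, slot) pair.
func score(siteID string, slotID uint32) uint64 {
	h := fnv.New64a()
	fmt.Fprintf(h, "%s/%d", siteID, slotID)
	return h.Sum64()
}

// pickSites returns the k sites with the highest weight for slotID.
// The first entry can be treated as the primary, the rest as secondaries.
func pickSites(sites []string, slotID uint32, k int) []string {
	ranked := append([]string(nil), sites...) // do not reorder the caller's slice
	sort.Slice(ranked, func(i, j int) bool {
		si, sj := score(ranked[i], slotID), score(ranked[j], slotID)
		if si != sj {
			return si > sj
		}
		return ranked[i] < ranked[j] // deterministic tie-break
	})
	if k > len(ranked) {
		k = len(ranked)
	}
	return ranked[:k]
}

func main() {
	sites := []string{"site1", "site2", "site3"}
	fmt.Println(pickSites(sites, 42, 2))
}
```

The result depends only on the set of sites and the slot number, not on the order in which sites are listed.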

Example

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/rs/zerolog"
	redisReplicaManager "github.com/zavitax/redis-replica-manager-go"
)

var redisOptions = &redis.Options{
	Addr:     "127.0.0.1:6379",
	Password: "",
	DB:       0,
}

func createReplicaManagerOptions(
	testId string,
	siteId string,
) *redisReplicaManager.ReplicaManagerOptions {
	result := &redisReplicaManager.ReplicaManagerOptions{
		RedisOptions:   redisOptions,
		SiteTimeout:    time.Second * 5,
		RedisKeyPrefix: fmt.Sprintf("{test-redis-replica-manager}::%v", testId),
		SiteID:         siteId,
	}

	return result
}

func createReplicaManagerClient(options *redisReplicaManager.ReplicaManagerOptions) (redisReplicaManager.ReplicaManagerClient, error) {
	return redisReplicaManager.NewRedisReplicaManagerClient(context.TODO(), options)
}

func main() {
	ctx := context.Background()

	options1 := createReplicaManagerOptions("main", "site1")

	client1, _ := createReplicaManagerClient(options1)

	balancerOptions := &redisReplicaManager.ReplicaBalancerOptions{
		TotalSlotsCount:   512,
		SlotReplicaCount:  1,
		MinimumSitesCount: 1,
	}

	balancer1, _ := redisReplicaManager.NewReplicaBalancer(ctx, balancerOptions)

	manager1, _ := redisReplicaManager.NewLocalSiteManager(ctx, &redisReplicaManager.ClusterNodeManagerOptions{
		ReplicaManagerClient: client1,
		ReplicaBalancer:      balancer1,
		RefreshInterval:      time.Second * 15,
		NotifyMissingSlotsHandler: func(ctx context.Context, manager redisReplicaManager.LocalSiteManager, slots *[]uint32) error {
			fmt.Printf("m1: missing slots to be added to local site: %v\n", len(*slots))

			for _, slotId := range *slots {
				// Perform necessary operations to be able to fully serve requests for `slotId`
			
				// Notify the cluster that the slot was added to the local site.
				// This should happen only when the slot is completely ready to be served.
				// Calling `manager.RequestAddSlot()` tells the site manager "I am now ready to serve all requests for `slotId`".
				manager.RequestAddSlot(ctx, slotId)
			}

			return nil
		},
		NotifyRedundantSlotsHandler: func(ctx context.Context, manager redisReplicaManager.LocalSiteManager, slots *[]uint32) error {
			fmt.Printf("m1: redundant slots to be removed from local site: %v\n", len(*slots))

			for _, slotId := range *slots {
				// Ask the cluster manager if we are allowed to remove a redundant slot
				// (if it satisfies minimum replica count on other sites)
				if allowed, _ := manager.RequestRemoveSlot(ctx, slotId); allowed {
					// Slot has been approved for removal by the cluster (and has been removed from the routing table)

					// Only after the cluster approved our request to remove the slot,
					// we can release resources which were allocated to serve requests for `slotId`
					fmt.Printf("m1: allowed to remove slot from local site: %v\n", allowed)
				}
			}

			return nil
		},
		NotifyPrimarySlotsChangedHandler: func(ctx context.Context, manager redisReplicaManager.LocalSiteManager) error {
			slots, _ := manager.GetAllSlotsLocalSiteIsPrimaryFor(ctx)

			fmt.Printf("m1: primary slots changed: %v\n", len(*slots))

			return nil
		},
	})

	slots1, _ := manager1.GetSlotIdentifiers(ctx)

	fmt.Printf("manager1 slots count: %v\n", len(*slots1))

	fmt.Printf("m1: sites for slot 1: %v\n", manager1.GetSlotRouteTable(ctx, 1))
	fmt.Printf("m1: sites for slot 497: %v\n", manager1.GetSlotRouteTable(ctx, 497))

	fmt.Printf("m1: primary site for slot 1: %v\n", manager1.GetSlotPrimarySiteRoute(ctx, 1))
	fmt.Printf("m1: primary site for slot 497: %v\n", manager1.GetSlotPrimarySiteRoute(ctx, 497))

	fmt.Printf("m1: slot for object abcdefg: %v\n", manager1.GetSlotForObject("abcdefg"))

	manager1.Close()
}

Documentation

Constants

const (
	ROLE_PRIMARY   = "primary"
	ROLE_SECONDARY = "secondary"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ClusterNodeManagerOptions

type ClusterNodeManagerOptions struct {
	ReplicaManagerClient ReplicaManagerClient
	ReplicaBalancer      ReplicaBalancer

	RefreshInterval time.Duration

	NotifyMissingSlotsHandler        func(ctx context.Context, manager LocalSiteManager, slots *[]uint32) error
	NotifyRedundantSlotsHandler      func(ctx context.Context, manager LocalSiteManager, slots *[]uint32) error
	NotifyPrimarySlotsChangedHandler func(ctx context.Context, manager LocalSiteManager) error
}

func (*ClusterNodeManagerOptions) Validate

func (o *ClusterNodeManagerOptions) Validate() error

type LocalSiteManager added in v1.0.0

type LocalSiteManager interface {
	RequestAddSlot(ctx context.Context, slotId uint32) (bool, error)

	RequestRemoveSlot(ctx context.Context, slotId uint32) (bool, error)
	RemoveFailedSlot(ctx context.Context, slotId uint32) error

	GetSlotIdentifiers(ctx context.Context) (*[]uint32, error)

	GetSlotForObject(objectId string) uint32

	GetSlotRouteTable(ctx context.Context, slotId uint32) *[]*RouteTableEntry
	GetSlotPrimarySiteRoute(ctx context.Context, slotId uint32) *RouteTableEntry

	IsLocalSitePrimaryForSlot(ctx context.Context, slotId uint32) (bool, error)
	GetAllSlotsLocalSiteIsPrimaryFor(ctx context.Context) (*[]uint32, error)

	Close() error
}

func NewLocalSiteManager added in v1.0.0

func NewLocalSiteManager(ctx context.Context, opts *ClusterNodeManagerOptions) (LocalSiteManager, error)

type RedisReplicaManagerSite

type RedisReplicaManagerSite struct {
	SiteID string
}

type RedisReplicaManagerSiteSlot

type RedisReplicaManagerSiteSlot struct {
	SiteID string
	SlotID string
	Role   string
}

type RedisReplicaManagerUpdate

type RedisReplicaManagerUpdate struct {
	Event  string `json:"event"`
	SlotID string `json:"slot,omitempty"`
	SiteID string `json:"site"`
	Reason string `json:"reason"`
	Role   string `json:"role,omitempty"`
}

type RedisReplicaManagerUpdateFunc

type RedisReplicaManagerUpdateFunc func(ctx context.Context, update *RedisReplicaManagerUpdate) error

type ReplicaBalancer

type ReplicaBalancer interface {
	AddSite(ctx context.Context, siteId string) error
	RemoveSite(ctx context.Context, siteId string) error

	GetSites() *[]string

	GetTargetSlotsForSite(ctx context.Context, siteId string) *[]uint32
	GetSlotSites(ctx context.Context, slotId uint32) *[]string

	GetTotalSitesCount() uint32
	GetTotalSlotsCount() uint32
	GetSlotReplicaCount() uint32

	GetSlotForObject(objectId string) uint32
}

func NewReplicaBalancer

func NewReplicaBalancer(ctx context.Context, opts *ReplicaBalancerOptions) (ReplicaBalancer, error)

type ReplicaBalancerOptions

type ReplicaBalancerOptions struct {
	TotalSlotsCount   int
	SlotReplicaCount  int
	MinimumSitesCount int
}

func (*ReplicaBalancerOptions) Validate

func (o *ReplicaBalancerOptions) Validate() error

type ReplicaManagerClient

type ReplicaManagerClient interface {
	AddSlot(ctx context.Context, slotId string) error
	RemoveSlot(ctx context.Context, slotId string, minReplicaCount int, reason string) error
	RemoveFailedSlot(ctx context.Context, slotId string, minReplicaCount int) error

	GetSlot(ctx context.Context, slotId string) (*RedisReplicaManagerSiteSlot, error)
	GetSlots(ctx context.Context) (*[]*RedisReplicaManagerSiteSlot, error)

	GetSlotsRouting(ctx context.Context) (*[]*RedisReplicaManagerSiteSlot, error)

	GetSiteID() string

	GetLiveSites(ctx context.Context) (*[]*RedisReplicaManagerSite, error)

	Channel() <-chan *RedisReplicaManagerUpdate

	Close() error
}

func NewRedisReplicaManagerClient

func NewRedisReplicaManagerClient(ctx context.Context, options *ReplicaManagerOptions) (ReplicaManagerClient, error)

type ReplicaManagerClientProviderOptions

type ReplicaManagerClientProviderOptions struct {
	SiteID                    string
	UpdateNotificationHandler RedisReplicaManagerUpdateFunc
}

type ReplicaManagerOptions

type ReplicaManagerOptions struct {
	RedisOptions              *redis.Options
	SiteID                    string
	SiteTimeout               time.Duration
	RedisKeyPrefix            string
	UpdateNotificationHandler RedisReplicaManagerUpdateFunc
	ManualHeartbeat           bool
}

func (*ReplicaManagerOptions) Validate

func (o *ReplicaManagerOptions) Validate() error

type RouteTableEntry

type RouteTableEntry struct {
	SlotID uint32
	SiteID string
	Role   string
}

Directories

Path Synopsis
cmd
internal
