vshard_router

package module
v0.0.0-...-2023994 Latest Latest
Published: May 13, 2024 License: MIT Imports: 12 Imported by: 0

README

Go VShard Router

go-vshard-router is a library for sending requests to a sharded Tarantool cluster directly, without going through tarantool-router. It takes a new approach to the cluster layout, shown in the two schemas below.

Old cluster schema

graph TD
    subgraph Tarantool Database Cluster
        subgraph Replicaset 1
            Master_001_1
            Replica_001_2
        end

    end

ROUTER1["Tarantool vshard-router 1_1"] --> Master_001_1
ROUTER2["Tarantool vshard-router 1_2"] --> Master_001_1
ROUTER3["Tarantool vshard-router 1_3"] --> Master_001_1
ROUTER1["Tarantool vshard-router 1_1"] --> Replica_001_2
ROUTER2["Tarantool vshard-router 1_2"] --> Replica_001_2
ROUTER3["Tarantool vshard-router 1_3"] --> Replica_001_2

GO["Golang service"]
GO --> ROUTER1
GO --> ROUTER2
GO --> ROUTER3

New cluster schema

graph TD
    subgraph Application Host
        Golang-Service
    end

    Golang-Service --> |iproto| MASTER1
    Golang-Service --> |iproto| REPLICA1
    
    MASTER1["Master 001_1"]
    REPLICA1["Replica 001_2"]
    
    subgraph Tarantool Database Cluster
        subgraph Replicaset 1
            MASTER1
            REPLICA1
        end
    end

    ROUTER1["Tarantool vshard-router (as control plane)"]
    ROUTER1 --> MASTER1
    ROUTER1 --> REPLICA1

Getting started

Prerequisites
  • Go: any one of the two latest major releases (we test it with these).

Getting Go-Vshard-Router

With Go module support, simply add the following import

import "github.com/hackallcode/go-vshard-router"

to your code, and then go [build|run|test] will automatically fetch the necessary dependencies.

Otherwise, run the following Go command to install the go-vshard-router package:

$ go get -u github.com/hackallcode/go-vshard-router

Running Go-Vshard-Router

First, import the Go-Vshard-Router package. Then create a router and call a storage function through it:

package main

import (
  "context"
  "fmt"
  "strconv"
  "time"

  vshardrouter "github.com/hackallcode/go-vshard-router"
  "github.com/hackallcode/go-vshard-router/providers/static"

  "github.com/google/uuid"
  "github.com/tarantool/go-tarantool/v2"
  "github.com/tarantool/go-tarantool/v2/pool"
)

func main() {
  ctx := context.Background()

  directRouter, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{
    DiscoveryTimeout: time.Minute,
    DiscoveryMode:    vshardrouter.DiscoveryModeOn,
    TopologyProvider: static.NewProvider(map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo{
      vshardrouter.ReplicasetInfo{
        Name: "replicaset_1",
        UUID: uuid.New(),
      }: {
        {
          Addr: "127.0.0.1:1001",
          UUID: uuid.New(),
        },
        {
          Addr: "127.0.0.1:1002",
          UUID: uuid.New(),
        },
      },
      vshardrouter.ReplicasetInfo{
        Name: "replicaset_2",
        UUID: uuid.New(),
      }: {
        {
          Addr: "127.0.0.1:2001",
          UUID: uuid.New(),
        },
        {
          Addr: "127.0.0.1:2002",
          UUID: uuid.New(),
        },
      },
    }),
    TotalBucketCount: 128000,
    PoolOpts: tarantool.Opts{
      Timeout: time.Second,
    },
  })
  if err != nil {
    panic(err)
  }

  user := struct {
    ID uint64
  }{
    ID: 123,
  }

  bucketID := vshardrouter.BucketIDStrCRC32(strconv.FormatUint(user.ID, 10), directRouter.RouterBucketCount())

  interfaceResult, getTyped, err := directRouter.RouterCallImpl(
    ctx,
    bucketID,
    vshardrouter.CallOpts{VshardMode: vshardrouter.ReadMode, PoolMode: pool.PreferRO, Timeout: time.Second * 2},
    "storage.api.get_user_info",
    []interface{}{&struct {
      BucketID uint64                 `msgpack:"bucket_id" json:"bucket_id,omitempty"`
      Body     map[string]interface{} `msgpack:"body"`
    }{
      BucketID: bucketID,
      Body: map[string]interface{}{
        "user_id": "123456",
      },
    }},
  )
  if err != nil {
    panic(err)
  }

  info := &struct {
    BirthDay int
  }{}

  err = getTyped(&[]interface{}{info})
  if err != nil {
    panic(err)
  }

  fmt.Printf("interface result: %v\n", interfaceResult)
  fmt.Printf("get typed result: %v\n", info)
}

Learn more examples
Customer service

A service built with go-vshard-router on top of the Tarantool example from the original vshard library, using Raft.

Benchmarks

Topology:

  • 4 replicasets (2 instances per replicaset)
  • 4 Tarantool proxies
  • 1 Golang service

K6

Constant VUs scenario, at a load close to production:

select

  • go-vshard-router: latency is worse, though not critically, while RPS is 3 times higher
  • tarantool-router: 80% CPU; heavy RPS kills the proxy at 100% CPU

Documentation

Constants

const CallTimeoutMin = time.Second / 2

Variables

var ErrInvalidConfig = fmt.Errorf("config invalid")
var Errors = map[int]Error{
	1: {
		Name: "WRONG_BUCKET",
		Msg:  "Cannot perform action with bucket %d, reason: %s",
		Args: []string{"bucket_id", "reason", "destination"},
	},
	2: {
		Name: "NON_MASTER",
		Msg:  "Replica %s is not a master for replicaset %s anymore",
		Args: []string{"replica", "replicaset", "master"},
	},
	3: {
		Name: "BUCKET_ALREADY_EXISTS",
		Msg:  "Bucket %d already exists",
		Args: []string{"bucket_id"},
	},
	4: {
		Name: "NO_SUCH_REPLICASET",
		Msg:  "Replicaset %s not found",
		Args: []string{"replicaset"},
	},
	5: {
		Name: "MOVE_TO_SELF",
		Msg:  "Cannot move: bucket %d is already on replicaset %s",
		Args: []string{"bucket_id", "replicaset"},
	},
	6: {
		Name: "MISSING_MASTER",
		Msg:  "Master is not configured for replicaset %s",
		Args: []string{"replicaset"},
	},
	7: {
		Name: "TRANSFER_IS_IN_PROGRESS",
		Msg:  "Bucket %d is transferring to replicaset %s",
		Args: []string{"bucket_id", "destination"},
	},
	8: {
		Name: "UNREACHABLE_REPLICASET",
		Msg:  "There is no active replicas in replicaset %s",
		Args: []string{"replicaset", "bucket_id"},
	},
	9: {
		Name: "NO_ROUTE_TO_BUCKET",
		Msg:  "Bucket %d cannot be found. Is rebalancing in progress?",
		Args: []string{"bucket_id"},
	},
	10: {
		Name: "NON_EMPTY",
		Msg:  "Cluster is already bootstrapped",
	},
	11: {
		Name: "UNREACHABLE_MASTER",
		Msg:  "Master of replicaset %s is unreachable: %s",
		Args: []string{"replicaset", "reason"},
	},
	12: {
		Name: "OUT_OF_SYNC",
		Msg:  "Replica is out of sync",
	},
	13: {
		Name: "HIGH_REPLICATION_LAG",
		Msg:  "High replication lag: %f",
		Args: []string{"lag"},
	},
	14: {
		Name: "UNREACHABLE_REPLICA",
		Msg:  "Replica %s isn't active",
		Args: []string{"replica"},
	},
	15: {
		Name: "LOW_REDUNDANCY",
		Msg:  "Only one replica is active",
	},
	16: {
		Name: "INVALID_REBALANCING",
		Msg:  "Sending and receiving buckets at same time is not allowed",
	},
	17: {
		Name: "SUBOPTIMAL_REPLICA",
		Msg:  "A current read replica in replicaset %s is not optimal",
		Args: []string{"replicaset"},
	},
	18: {
		Name: "UNKNOWN_BUCKETS",
		Msg:  "%d buckets are not discovered",
		Args: []string{"not_discovered_cnt"},
	},
	19: {
		Name: "REPLICASET_IS_LOCKED",
		Msg:  "Replicaset is locked",
	},
	20: {
		Name: "OBJECT_IS_OUTDATED",
		Msg:  "Object is outdated after module reload/reconfigure. Use new instance.",
	},
	21: {
		Name: "ROUTER_ALREADY_EXISTS",
		Msg:  "Router with name %s already exists",
		Args: []string{"router_name"},
	},
	22: {
		Name: "BUCKET_IS_LOCKED",
		Msg:  "Bucket %d is locked",
		Args: []string{"bucket_id"},
	},
	23: {
		Name: "INVALID_CFG",
		Msg:  "Invalid configuration: %s",
		Args: []string{"reason"},
	},
	24: {
		Name: "BUCKET_IS_PINNED",
		Msg:  "Bucket %d is pinned",
		Args: []string{"bucket_id"},
	},
	25: {
		Name: "TOO_MANY_RECEIVING",
		Msg:  "Too many receiving buckets at once, please, throttle",
	},
	26: {
		Name: "STORAGE_IS_REFERENCED",
		Msg:  "Storage is referenced",
	},
	27: {
		Name: "STORAGE_REF_ADD",
		Msg:  "Can not add a storage ref: %s",
		Args: []string{"reason"},
	},
	28: {
		Name: "STORAGE_REF_USE",
		Msg:  "Can not use a storage ref: %s",
		Args: []string{"reason"},
	},
	29: {
		Name: "STORAGE_REF_DEL",
		Msg:  "Can not delete a storage ref: %s",
		Args: []string{"reason"},
	},
	30: {
		Name: "BUCKET_RECV_DATA_ERROR",
		Msg:  "Can not receive the bucket %s data in space \"%s\" at tuple %s: %s",
		Args: []string{"bucket_id", "space", "tuple", "reason"},
	},
	31: {
		Name: "MULTIPLE_MASTERS_FOUND",
		Msg:  "Found more than one master in replicaset %s on nodes %s and %s",
		Args: []string{"replicaset", "master1", "master2"},
	},
	32: {
		Name: "REPLICASET_IN_BACKOFF",
		Msg:  "Replicaset %s is in backoff, can't take requests right now. Last error was %s",
		Args: []string{"replicaset", "error"},
	},
	33: {
		Name: "STORAGE_IS_DISABLED",
		Msg:  "Storage is disabled: %s",
		Args: []string{"reason"},
	},
	34: {
		Name: "BUCKET_IS_CORRUPTED",
		Msg:  "Bucket %d is corrupted: %s",
		Args: []string{"bucket_id", "reason"},
	},
	35: {
		Name: "ROUTER_IS_DISABLED",
		Msg:  "Router is disabled: %s",
		Args: []string{"reason"},
	},
	36: {
		Name: "BUCKET_GC_ERROR",
		Msg:  "Error during bucket GC: %s",
		Args: []string{"reason"},
	},
	37: {
		Name: "STORAGE_CFG_IS_IN_PROGRESS",
		Msg:  "Configuration of the storage is in progress",
	},
	38: {
		Name: "ROUTER_CFG_IS_IN_PROGRESS",
		Msg:  "Configuration of the router with name %s is in progress",
		Args: []string{"router_name"},
	},
	39: {
		Name: "BUCKET_INVALID_UPDATE",
		Msg:  "Bucket %s update is invalid: %s",
		Args: []string{"bucket_id", "reason"},
	},
	40: {
		Name: "VHANDSHAKE_NOT_COMPLETE",
		Msg:  "Handshake with %s have not been completed yet",
		Args: []string{"replica"},
	},
	41: {
		Name: "INSTANCE_NAME_MISMATCH",
		Msg:  "Mismatch server name: expected \"%s\", but got \"%s\"",
		Args: []string{"expected_name", "actual_name"},
	},
}

Functions

func BucketIDStrCRC32

func BucketIDStrCRC32(shardKey string, totalBucketCount uint64) uint64
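
The signature suggests the usual vshard scheme: hash the shard key, reduce it modulo the total bucket count, and shift the result into the 1-based bucket range. The following is a minimal sketch of that idea. `bucketIDSketch` is a hypothetical name, and the choice of the IEEE CRC32 polynomial is an assumption, not a confirmed detail of this package.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// bucketIDSketch is a hypothetical stand-in for BucketIDStrCRC32.
// Assumption: the key is hashed with the IEEE CRC32 polynomial and then
// mapped into the 1-based range [1, totalBucketCount].
func bucketIDSketch(shardKey string, totalBucketCount uint64) uint64 {
	sum := uint64(crc32.ChecksumIEEE([]byte(shardKey)))
	return sum%totalBucketCount + 1
}

func main() {
	const total = 128000 // same bucket count as in the README example
	fmt.Println(bucketIDSketch("123", total))
}
```

Whatever the exact checksum is, the important property is that every client computes the same bucket for the same key and bucket count.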

func RouterBucketIDMPCRC32

func RouterBucketIDMPCRC32(total uint64, keys ...string)

RouterBucketIDMPCRC32 is not supported yet.

Types

type BucketStatError

type BucketStatError struct {
	BucketID uint64 `msgpack:"bucket_id"`
	Reason   string `msgpack:"reason"`
	Code     int    `msgpack:"code"`
	Type     string `msgpack:"type"`
	Message  string `msgpack:"message"`
	Name     string `msgpack:"name"`
}

func (BucketStatError) Error

func (bse BucketStatError) Error() string

type BucketStatInfo

type BucketStatInfo struct {
	BucketID uint64 `mapstructure:"id"`
	Status   string `mapstructure:"status"`
}

type CallOpts

type CallOpts struct {
	VshardMode VshardMode // vshard mode in call
	PoolMode   pool.Mode
	Timeout    time.Duration
}

type Config

type Config struct {
	// Providers
	Logger           LogProvider      // Logger is not required
	Metrics          MetricsProvider  // Metrics is not required
	TopologyProvider TopologyProvider // TopologyProvider is required provider

	// Discovery
	DiscoveryTimeout time.Duration // DiscoveryTimeout is timeout between cron discovery job; by default there is no timeout
	DiscoveryMode    DiscoveryMode

	TotalBucketCount uint64
	User             string
	Password         string
	PoolOpts         tarantool.Opts

	NWorkers int32 // todo: rename this, cause NWorkers naming looks strange
}

type DiscoveryMode

type DiscoveryMode int
const (
	// DiscoveryModeOn is cron discovery with cron timeout
	DiscoveryModeOn DiscoveryMode = iota
	DiscoveryModeOnce
)

type EmptyLogger

type EmptyLogger struct{}

func (*EmptyLogger) Debug

func (e *EmptyLogger) Debug(ctx context.Context, msg string)

func (*EmptyLogger) Error

func (e *EmptyLogger) Error(ctx context.Context, msg string)

func (*EmptyLogger) Info

func (e *EmptyLogger) Info(ctx context.Context, msg string)

func (*EmptyLogger) Warn

func (e *EmptyLogger) Warn(ctx context.Context, msg string)

type EmptyMetrics

type EmptyMetrics struct{}

EmptyMetrics is the default no-op metrics provider. You can embed this type and implement only the metrics you need.

func (*EmptyMetrics) CronDiscoveryEvent

func (e *EmptyMetrics) CronDiscoveryEvent(ok bool, duration time.Duration, reason string)

func (*EmptyMetrics) RequestDuration

func (e *EmptyMetrics) RequestDuration(duration time.Duration, ok bool, mapReduce bool)

func (*EmptyMetrics) RetryOnCall

func (e *EmptyMetrics) RetryOnCall(reason string)

type Error

type Error struct {
	Name string
	Msg  string
	Args []string
}

func ErrorByName

func ErrorByName(name string) Error
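
ErrorByName is listed without a description. Judging by its signature, it presumably looks up an entry of the Errors table by its Name field. The sketch below shows one way to do such a lookup over a two-entry excerpt of the table. `errorByNameSketch`, `errorsExcerpt`, and the extra boolean result are hypothetical and not part of the package API.

```go
package main

import "fmt"

// Local copy of the documented Error struct.
type Error struct {
	Name string
	Msg  string
	Args []string
}

// errorsExcerpt holds two entries copied from the Errors table above.
var errorsExcerpt = map[int]Error{
	1: {
		Name: "WRONG_BUCKET",
		Msg:  "Cannot perform action with bucket %d, reason: %s",
		Args: []string{"bucket_id", "reason", "destination"},
	},
	2: {
		Name: "NON_MASTER",
		Msg:  "Replica %s is not a master for replicaset %s anymore",
		Args: []string{"replica", "replicaset", "master"},
	},
}

// errorByNameSketch scans the code-indexed table for a matching name.
// The second result reports whether the name was found.
func errorByNameSketch(name string) (Error, bool) {
	for _, e := range errorsExcerpt {
		if e.Name == name {
			return e, true
		}
	}
	return Error{}, false
}

func main() {
	if e, ok := errorByNameSketch("NON_MASTER"); ok {
		fmt.Println(e.Msg)
	}
}
```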

func (Error) Error

func (e Error) Error() string

type InstanceInfo

type InstanceInfo struct {
	Addr string
	UUID uuid.UUID
}

type LogProvider

type LogProvider interface {
	Info(context.Context, string)
	Debug(context.Context, string)
	Error(context.Context, string)
	Warn(context.Context, string)
}

type MetricsProvider

type MetricsProvider interface {
	CronDiscoveryEvent(ok bool, duration time.Duration, reason string)
	RetryOnCall(reason string)
	RequestDuration(duration time.Duration, ok bool, mapReduce bool)
}

MetricsProvider is an interface for passing library metrics to Prometheus, Graphite, or another metrics system.

type Replicaset

type Replicaset struct {
	// contains filtered or unexported fields
}

func (*Replicaset) BucketStat

func (rs *Replicaset) BucketStat(ctx context.Context, bucketID uint64) (BucketStatInfo, error)

func (*Replicaset) ReplicaCall

func (rs *Replicaset) ReplicaCall(
	ctx context.Context,
	opts ReplicasetCallOpts,
	fnc string,
	args interface{},
) (interface{}, StorageResultTypedFunc, error)

ReplicaCall performs a function call on a remote storage. See https://github.com/tarantool/vshard/blob/master/vshard/replicaset.lua#L661

func (*Replicaset) String

func (rs *Replicaset) String() string

type ReplicasetCallOpts

type ReplicasetCallOpts struct {
	PoolMode pool.Mode
	Timeout  time.Duration
}

type ReplicasetInfo

type ReplicasetInfo struct {
	Name string
	UUID uuid.UUID
}

func (ReplicasetInfo) String

func (rsi ReplicasetInfo) String() string

type Router

type Router struct {
	// contains filtered or unexported fields
}

func NewRouter

func NewRouter(ctx context.Context, cfg Config) (*Router, error)

func (*Router) BucketDiscovery

func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replicaset, error)

BucketDiscovery searches for the bucket across the whole cluster.

func (*Router) BucketReset

func (r *Router) BucketReset(bucketID uint64)

func (*Router) BucketResolve

func (r *Router) BucketResolve(ctx context.Context, bucketID uint64) (*Replicaset, error)

BucketResolve resolves a bucket ID to its replicaset.

func (*Router) BucketSet

func (r *Router) BucketSet(bucketID uint64, rsID uuid.UUID) (*Replicaset, error)

BucketSet assigns a bucket to a replicaset.

func (*Router) DiscoveryAllBuckets

func (r *Router) DiscoveryAllBuckets(ctx context.Context) error

func (*Router) DiscoveryHandleBuckets

func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buckets []uint64)

DiscoveryHandleBuckets arranges downloaded buckets in the route map so that they reference the given replicaset.
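
The route map idea behind the bucket functions can be sketched as a guarded map from bucket ID to replicaset. This is an illustration only: `routeMap` and its methods are hypothetical, replicasets are represented by plain names instead of *Replicaset, and the package's internal data structure may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// routeMap (hypothetical) remembers which replicaset owns each bucket.
type routeMap struct {
	mu       sync.RWMutex
	byBucket map[uint64]string // bucket ID -> replicaset name
}

func newRouteMap() *routeMap {
	return &routeMap{byBucket: make(map[uint64]string)}
}

// handleBuckets points every listed bucket at the given replicaset.
func (m *routeMap) handleBuckets(rs string, buckets []uint64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, b := range buckets {
		m.byBucket[b] = rs
	}
}

// resolve returns the known owner of a bucket, if any.
func (m *routeMap) resolve(bucketID uint64) (string, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	rs, ok := m.byBucket[bucketID]
	return rs, ok
}

// reset forgets a single bucket, so that it has to be discovered again.
func (m *routeMap) reset(bucketID uint64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.byBucket, bucketID)
}

func main() {
	rm := newRouteMap()
	rm.handleBuckets("replicaset_1", []uint64{1, 2, 3})
	rm.reset(2)

	rs, ok := rm.resolve(1)
	fmt.Println(rs, ok)
}
```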

func (*Router) RouteMapClean

func (r *Router) RouteMapClean()

func (*Router) RouterBucketCount

func (r *Router) RouterBucketCount() uint64

func (*Router) RouterBucketID

func (r *Router) RouterBucketID(shardKey string) uint64

RouterBucketID returns the bucket identifier computed from the parameter used for sharding.

Deprecated: RouterBucketID() is deprecated, use RouterBucketIDStrCRC32() or RouterBucketIDMPCRC32() instead.

func (*Router) RouterBucketIDStrCRC32

func (r *Router) RouterBucketIDStrCRC32(shardKey string) uint64

func (*Router) RouterCallImpl

func (r *Router) RouterCallImpl(ctx context.Context,
	bucketID uint64,
	opts CallOpts,
	fnc string,
	args interface{}) (interface{}, StorageResultTypedFunc, error)

RouterCallImpl performs a shard operation. If the storage responds with a wrong-bucket error, the operation is restarted until the timeout is reached.

func (*Router) RouterMapCallRWImpl

func (r *Router) RouterMapCallRWImpl(
	ctx context.Context,
	fnc string,
	args interface{},
	opts CallOpts,
) (map[uuid.UUID]interface{}, error)

RouterMapCallRWImpl calls the function on all masters in the cluster, with a guarantee that on success it was executed while all buckets were accessible for reads and writes.

func (*Router) RouterRoute

func (r *Router) RouterRoute(ctx context.Context, bucketID uint64) (*Replicaset, error)

RouterRoute returns the replicaset object for a bucket identifier. It is an alias for BucketResolve.

func (*Router) RouterRouteAll

func (r *Router) RouterRouteAll() map[uuid.UUID]*Replicaset

RouterRouteAll returns a map of all replicasets.

func (*Router) Topology

func (r *Router) Topology() TopologyController

type StdoutLogger

type StdoutLogger struct{}

func (*StdoutLogger) Debug

func (e *StdoutLogger) Debug(ctx context.Context, msg string)

func (*StdoutLogger) Error

func (e *StdoutLogger) Error(ctx context.Context, msg string)

func (*StdoutLogger) Info

func (e *StdoutLogger) Info(ctx context.Context, msg string)

func (*StdoutLogger) Warn

func (e *StdoutLogger) Warn(ctx context.Context, msg string)

type StorageCallAssertError

type StorageCallAssertError struct {
	Code     int         `msgpack:"code"`
	BaseType string      `msgpack:"base_type"`
	Type     string      `msgpack:"type"`
	Message  string      `msgpack:"message"`
	Trace    interface{} `msgpack:"trace"`
}

func (StorageCallAssertError) Error

func (s StorageCallAssertError) Error() string

type StorageCallVShardError

type StorageCallVShardError struct {
	BucketID       uint64  `msgpack:"bucket_id" mapstructure:"bucket_id"`
	Reason         string  `msgpack:"reason"`
	Code           int     `msgpack:"code"`
	Type           string  `msgpack:"type"`
	Message        string  `msgpack:"message"`
	Name           string  `msgpack:"name"`
	MasterUUID     *string `msgpack:"master_uuid" mapstructure:"master_uuid"`         // mapstructure cant decode to source uuid type
	ReplicasetUUID *string `msgpack:"replicaset_uuid" mapstructure:"replicaset_uuid"` // mapstructure cant decode to source uuid type
}

func (StorageCallVShardError) Error

func (s StorageCallVShardError) Error() string

type StorageResultTypedFunc

type StorageResultTypedFunc = func(result interface{}) error

type TopologyController

type TopologyController interface {
	AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceInfo) error
	RemoveReplicaset(ctx context.Context, rsID uuid.UUID) []error
	RemoveInstance(ctx context.Context, rsID, instanceID uuid.UUID) error
	AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error
	AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error
}

type TopologyProvider

type TopologyProvider interface {
	// Init should create the current topology at the beginning
	// and change the state during the process of changing the point of receiving the cluster configuration
	Init(t TopologyController) error
	// Close closes all connections if the provider created them
	Close()
}

TopologyProvider is an external module that looks up the current topology of the cluster. It might be backed by etcd, a config file, Consul, or something else.
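
A provider for a fixed topology could plausibly implement Init by handing its whole map to the controller in one call. The sketch below illustrates that shape with simplified stand-ins: `ReplicasetInfo` and `InstanceInfo` drop their UUID fields, `topologyAdder` is a hypothetical one-method subset of TopologyController, and `staticProviderSketch` is not the package's `static` provider.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Simplified stand-ins for the documented types (UUID fields omitted).
type ReplicasetInfo struct{ Name string }
type InstanceInfo struct{ Addr string }

// topologyAdder is the subset of TopologyController this sketch needs.
type topologyAdder interface {
	AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error
}

// staticProviderSketch (hypothetical) serves a topology fixed at construction time.
type staticProviderSketch struct {
	replicasets map[ReplicasetInfo][]InstanceInfo
}

// Init registers the whole static topology with the controller.
func (p *staticProviderSketch) Init(t topologyAdder) error {
	if len(p.replicasets) == 0 {
		return errors.New("static topology is empty")
	}
	return t.AddReplicasets(context.Background(), p.replicasets)
}

// Close has nothing to release: the provider holds no connections.
func (p *staticProviderSketch) Close() {}

// recordingController is a fake controller that remembers what was added.
type recordingController struct {
	added map[ReplicasetInfo][]InstanceInfo
}

func (c *recordingController) AddReplicasets(_ context.Context, rs map[ReplicasetInfo][]InstanceInfo) error {
	if c.added == nil {
		c.added = make(map[ReplicasetInfo][]InstanceInfo)
	}
	for info, instances := range rs {
		c.added[info] = instances
	}
	return nil
}

func main() {
	provider := &staticProviderSketch{replicasets: map[ReplicasetInfo][]InstanceInfo{
		{Name: "replicaset_1"}: {{Addr: "127.0.0.1:1001"}, {Addr: "127.0.0.1:1002"}},
	}}
	ctrl := &recordingController{}
	if err := provider.Init(ctrl); err != nil {
		panic(err)
	}
	defer provider.Close()
	fmt.Println(len(ctrl.added[ReplicasetInfo{Name: "replicaset_1"}]))
}
```

A dynamic provider (for example one watching etcd) would keep the controller and call its add and remove methods as the cluster configuration changes.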

type VshardMode

type VshardMode string
const (
	ReadMode  VshardMode = "read"
	WriteMode VshardMode = "write"
)

func (VshardMode) String

func (c VshardMode) String() string

Directories

Path Synopsis
mocks
providers
