vshard_router

package module
v1.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 27, 2024 License: MIT Imports: 16 Imported by: 0

README

Go VShard Router

логотип go vshard router

Go Reference Actions Status Go Report Card codecov License

Translations:

go-vshard-router is a library for sending requests to a sharded tarantool cluster directly, without using tarantool-router. This library based on tarantool vhsard library router. go-vshard-router takes a new approach to creating your cluster

Old cluster schema

graph TD
    subgraph Tarantool Database Cluster
        subgraph Replicaset 1
            Master_001_1
            Replica_001_2
        end

    end

ROUTER1["Tarantool vshard-router 1_1"] --> Master_001_1
ROUTER2["Tarantool vshard-router 1_2"] --> Master_001_1
ROUTER3["Tarantool vshard-router 1_3"] --> Master_001_1
ROUTER1["Tarantool vshard-router 1_1"] --> Replica_001_2
ROUTER2["Tarantool vshard-router 1_2"] --> Replica_001_2
ROUTER3["Tarantool vshard-router 1_3"] --> Replica_001_2

GO["Golang service"]
GO --> ROUTER1
GO --> ROUTER2
GO --> ROUTER3

New cluster schema

graph TD
    subgraph Application Host
        Golang-Service
    end

    Golang-Service --> |iproto| MASTER1
    Golang-Service --> |iproto| REPLICA1
    
    MASTER1["Master 001_1"]
    REPLICA1["Replica 001_2"]
    
    subgraph Tarantool Database Cluster
        subgraph Replicaset 1
            MASTER1
            REPLICA1
        end
    end

    ROUTER1["Tarantool vshard-router(As contorol plane)"]
    ROUTER1 --> MASTER1
    ROUTER1 --> REPLICA1

Getting started

Prerequisites
  • Go: any one of the two latest major releases (we test it with these).
Getting Go-Vshard-Router

With Go module support, simply add the following import

import "github.com/tarantool/go-vshard-router"

to your code, and then go [build|run|test] will automatically fetch the necessary dependencies.

Otherwise, run the following Go command to install the go-vshard-router package:

$ go get -u github.com/tarantool/go-vshard-router
Running Go-Vshard-Router

First you need to import Go-Vshard-Router package for using Go-Vshard-Router

package main

import (
  "context"
  "fmt"
  "strconv"
  "time"

  vshardrouter "github.com/tarantool/go-vshard-router"
  "github.com/tarantool/go-vshard-router/providers/static"

  "github.com/google/uuid"
  "github.com/tarantool/go-tarantool/v2"
  "github.com/tarantool/go-tarantool/v2/pool"
)

func main() {
  ctx := context.Background()

  directRouter, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{
    DiscoveryTimeout: time.Minute,
    DiscoveryMode:    vshardrouter.DiscoveryModeOn,
    TopologyProvider: static.NewProvider(map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo{
      vshardrouter.ReplicasetInfo{
        Name: "replcaset_1",
        UUID: uuid.New(),
      }: {
        {
          Addr: "127.0.0.1:1001",
          UUID: uuid.New(),
        },
        {
          Addr: "127.0.0.1:1002",
          UUID: uuid.New(),
        },
      },
      vshardrouter.ReplicasetInfo{
        Name: "replcaset_2",
        UUID: uuid.New(),
      }: {
        {
          Addr: "127.0.0.1:2001",
          UUID: uuid.New(),
        },
        {
          Addr: "127.0.0.1:2002",
          UUID: uuid.New(),
        },
      },
    }),
    TotalBucketCount: 128000,
    PoolOpts: tarantool.Opts{
      Timeout: time.Second,
    },
  })
  if err != nil {
    panic(err)
  }

  user := struct {
    ID uint64
  }{
    ID: 123,
  }

  bucketID := vshardrouter.BucketIDStrCRC32(strconv.FormatUint(user.ID, 10), directRouter.RouterBucketCount())

  interfaceResult, getTyped, err := directRouter.RouterCallImpl(
    ctx,
    bucketID,
    vshardrouter.CallOpts{VshardMode: vshardrouter.ReadMode, PoolMode: pool.PreferRO, Timeout: time.Second * 2},
    "storage.api.get_user_info",
    []interface{}{&struct {
      BucketID uint64                 `msgpack:"bucket_id" json:"bucket_id,omitempty"`
      Body     map[string]interface{} `msgpack:"body"`
    }{
      BucketID: bucketID,
      Body: map[string]interface{}{
        "user_id": "123456",
      },
    }},
  )

  info := &struct {
    BirthDay int
  }{}

  err = getTyped(&[]interface{}{info})
  if err != nil {
    panic(err)
  }

  fmt.Printf("interface result: %v", interfaceResult)
  fmt.Printf("get typed result: %v", info)
}
Providers

You can use topology (configuration) providers as the source of router configuration.
Currently, the following providers are supported:

  • etcd (for configurations similar to moonlibs/config in etcd v2 for Tarantool versions below 3)
  • static (for specifying configuration directly in the code for ease of testing)
  • viper
    • etcd v3
    • consul
    • files
Learn more examples
Quick Start

Learn with th Quick Start, which include examples and theory.

Customer service

Service with go-vshard-router on top of the tarantool example from the original vshard library using raft

Benchmarks

Go Bench
Benchmark Runs Time (ns/op) Memory (B/op) Allocations (allocs/op)
BenchmarkCallSimpleInsert_GO-12 14216 81118 1419 29
BenchmarkCallSimpleInsert_Lua-12 9580 123307 1131 19
BenchmarkCallSimpleSelect_GO-12 18832 65190 1879 38
BenchmarkCallSimpleSelect_Lua-12 9963 104781 1617 28
K6

Topology:

  • 4 replicasets (x2 instances per rs)
  • 4 tarantool proxy
  • 1 golang service

constant VUes scenario: at a load close to production

select

  • go-vshard-router: uncritically worse latency, but 3 times more rps Image alt
  • tarantool-router: (80% cpu, heavy rps kills proxy at 100% cpu) Image alt

Documentation

Index

Constants

View Source
const (
	VShardErrCodeWrongBucket            = 1
	VShardErrCodeNonMaster              = 2
	VShardErrCodeBucketAlreadyExists    = 3
	VShardErrCodeNoSuchReplicaset       = 4
	VShardErrCodeMoveToSelf             = 5
	VShardErrCodeMissingMaster          = 6
	VShardErrCodeTransferIsInProgress   = 7
	VShardErrCodeUnreachableReplicaset  = 8
	VShardErrCodeNoRouteToBucket        = 9
	VShardErrCodeNonEmpty               = 10
	VShardErrCodeUnreachableMaster      = 11
	VShardErrCodeOutOfSync              = 12
	VShardErrCodeHighReplicationLag     = 13
	VShardErrCodeUnreachableReplica     = 14
	VShardErrCodeLowRedundancy          = 15
	VShardErrCodeInvalidRebalancing     = 16
	VShardErrCodeSuboptimalReplica      = 17
	VShardErrCodeUnknownBuckets         = 18
	VShardErrCodeReplicasetIsLocked     = 19
	VShardErrCodeObjectIsOutdated       = 20
	VShardErrCodeRouterAlreadyExists    = 21
	VShardErrCodeBucketIsLocked         = 22
	VShardErrCodeInvalidCfg             = 23
	VShardErrCodeBucketIsPinned         = 24
	VShardErrCodeTooManyReceiving       = 25
	VShardErrCodeStorageIsReferenced    = 26
	VShardErrCodeStorageRefAdd          = 27
	VShardErrCodeStorageRefUse          = 28
	VShardErrCodeStorageRefDel          = 29
	VShardErrCodeBucketRecvDataError    = 30
	VShardErrCodeMultipleMastersFound   = 31
	VShardErrCodeReplicasetInBackoff    = 32
	VShardErrCodeStorageIsDisabled      = 33
	VShardErrCodeBucketIsCorrupted      = 34
	VShardErrCodeRouterIsDisabled       = 35
	VShardErrCodeBucketGCError          = 36
	VShardErrCodeStorageCfgIsInProgress = 37
	VShardErrCodeRouterCfgIsInProgress  = 38
	VShardErrCodeBucketInvalidUpdate    = 39
	VShardErrCodeVhandshakeNotComplete  = 40
	VShardErrCodeInstanceNameMismatch   = 41
)

VShard error codes

View Source
const (
	VShardErrNameWrongBucket            = "WRONG_BUCKET"
	VShardErrNameNonMaster              = "NON_MASTER"
	VShardErrNameBucketAlreadyExists    = "BUCKET_ALREADY_EXISTS"
	VShardErrNameNoSuchReplicaset       = "NO_SUCH_REPLICASET"
	VShardErrNameMoveToSelf             = "MOVE_TO_SELF"
	VShardErrNameMissingMaster          = "MISSING_MASTER"
	VShardErrNameTransferIsInProgress   = "TRANSFER_IS_IN_PROGRESS"
	VShardErrNameUnreachableReplicaset  = "UNREACHABLE_REPLICASET"
	VShardErrNameNoRouteToBucket        = "NO_ROUTE_TO_BUCKET"
	VShardErrNameNonEmpty               = "NON_EMPTY"
	VShardErrNameUnreachableMaster      = "UNREACHABLE_MASTER"
	VShardErrNameOutOfSync              = "OUT_OF_SYNC"
	VShardErrNameHighReplicationLag     = "HIGH_REPLICATION_LAG"
	VShardErrNameUnreachableReplica     = "UNREACHABLE_REPLICA"
	VShardErrNameLowRedundancy          = "LOW_REDUNDANCY"
	VShardErrNameInvalidRebalancing     = "INVALID_REBALANCING"
	VShardErrNameSuboptimalReplica      = "SUBOPTIMAL_REPLICA"
	VShardErrNameUnknownBuckets         = "UNKNOWN_BUCKETS"
	VShardErrNameReplicasetIsLocked     = "REPLICASET_IS_LOCKED"
	VShardErrNameObjectIsOutdated       = "OBJECT_IS_OUTDATED"
	VShardErrNameRouterAlreadyExists    = "ROUTER_ALREADY_EXISTS"
	VShardErrNameBucketIsLocked         = "BUCKET_IS_LOCKED"
	VShardErrNameInvalidCfg             = "INVALID_CFG"
	VShardErrNameBucketIsPinned         = "BUCKET_IS_PINNED"
	VShardErrNameTooManyReceiving       = "TOO_MANY_RECEIVING"
	VShardErrNameStorageIsReferenced    = "STORAGE_IS_REFERENCED"
	VShardErrNameStorageRefAdd          = "STORAGE_REF_ADD"
	VShardErrNameStorageRefUse          = "STORAGE_REF_USE"
	VShardErrNameStorageRefDel          = "STORAGE_REF_DEL"
	VShardErrNameBucketRecvDataError    = "BUCKET_RECV_DATA_ERROR"
	VShardErrNameMultipleMastersFound   = "MULTIPLE_MASTERS_FOUND"
	VShardErrNameReplicasetInBackoff    = "REPLICASET_IN_BACKOFF"
	VShardErrNameStorageIsDisabled      = "STORAGE_IS_DISABLED"
	VShardErrNameBucketIsCorrupted      = "BUCKET_IS_CORRUPTED"
	VShardErrNameRouterIsDisabled       = "ROUTER_IS_DISABLED"
	VShardErrNameBucketGCError          = "BUCKET_GC_ERROR"
	VShardErrNameStorageCfgIsInProgress = "STORAGE_CFG_IS_IN_PROGRESS"
	VShardErrNameRouterCfgIsInProgress  = "ROUTER_CFG_IS_IN_PROGRESS"
	VShardErrNameBucketInvalidUpdate    = "BUCKET_INVALID_UPDATE"
	VShardErrNameVhandshakeNotComplete  = "VHANDSHAKE_NOT_COMPLETE"
	VShardErrNameInstanceNameMismatch   = "INSTANCE_NAME_MISMATCH"
)

VShard error names

Variables

View Source
var (
	ErrReplicasetExists    = fmt.Errorf("replicaset already exists")
	ErrReplicasetNotExists = fmt.Errorf("replicaset not exists")
)
View Source
var (
	ErrInvalidConfig       = fmt.Errorf("config invalid")
	ErrInvalidInstanceInfo = fmt.Errorf("invalid instance info")
	ErrTopologyProvider    = fmt.Errorf("got error from topology provider")
)

Functions

func BucketIDStrCRC32

func BucketIDStrCRC32(shardKey string, totalBucketCount uint64) uint64

func CalculateEtalonBalance

func CalculateEtalonBalance(replicasets []Replicaset, bucketCount uint64) error

CalculateEtalonBalance computes the ideal bucket count for each replicaset. This iterative algorithm seeks the optimal balance within a cluster by calculating the ideal bucket count for each replicaset at every step. If the ideal count cannot be achieved due to pinned buckets, the algorithm makes a best effort to approximate balance by ignoring the replicaset with pinned buckets and its associated pinned count. After each iteration, a new balance is recalculated. However, this can lead to scenarios where the conditions are still unmet; ignoring pinned buckets in overloaded replicasets can reduce the ideal bucket count in others, potentially causing new values to fall below their pinned count.

At each iteration, the algorithm either concludes or disregards at least one new overloaded replicaset. Therefore, its time complexity is O(N^2), where N is the number of replicasets. based on https://github.com/tarantool/vshard/blob/master/vshard/replicaset.lua#L1358

func RouterBucketIDMPCRC32

func RouterBucketIDMPCRC32(total uint64, keys ...string)

RouterBucketIDMPCRC32 is not implemented yet

func RouterMapCallRW

func RouterMapCallRW[T any](r *Router, ctx context.Context,
	fnc string, args interface{}, opts RouterMapCallRWOptions,
) (map[uuid.UUID]T, error)

RouterMapCallRW is a consistent Map-Reduce. The given function is called on all masters in the cluster with a guarantee that in case of success it was executed with all buckets being accessible for reads and writes. T is a return type of user defined function 'fnc'. We define it as a distinct function, not a Router method, because golang limitations, see: https://github.com/golang/go/issues/49085.

Types

type BucketStatInfo

type BucketStatInfo struct {
	BucketID uint64 `msgpack:"id"`
	Status   string `msgpack:"status"`
}

func (*BucketStatInfo) DecodeMsgpack

func (bsi *BucketStatInfo) DecodeMsgpack(d *msgpack.Decoder) error

tnt vshard storage returns map with 'int' keys for bucketStatInfo, example: map[id:48 status:active 1:48 2:active]. But msgpackv5 supports only string keys when decoding maps into structs, see issue: https://github.com/vmihailenco/msgpack/issues/372 To workaround this we decode BucketStatInfo manually. When the issue above will be resolved, this code can be (and should be) deleted.

type BucketsSearchMode

type BucketsSearchMode int

BucketsSearchMode a type, that used to define policy for BucketDiscovery method. See type Config for further details.

const (
	// BucketsSearchLegacy implements the same logic as lua router:
	// send bucket_stat request to every replicaset,
	// return a response immediately if any of them succeed.
	BucketsSearchLegacy BucketsSearchMode = iota
	// BucketsSearchBatchedQuick and BucketsSearchBatchedFull implement another logic:
	// send buckets_discovery request to every replicaset with from=bucketID,
	// seek our bucketID in their responses.
	// Additionally, store other bucketIDs in the route map.
	// BucketsSearchBatchedQuick stops iterating over replicasets responses as soon as our bucketID is found.
	BucketsSearchBatchedQuick
	// BucketsSearchBatchedFull implements the same logic as BucketsSearchBatchedQuick,
	// but doesn't stop iterating over replicasets responses as soon as our bucketID is found.
	// Instead, it always iterates over all replicasets responses even bucketID is found.
	BucketsSearchBatchedFull
)

type CallOpts

type CallOpts struct {
	VshardMode VshardMode // vshard mode in call
	PoolMode   pool.Mode
	Timeout    time.Duration
}

type CallRequest

type CallRequest struct {
	// contains filtered or unexported fields
}

CallRequest helps you to create a call request object for execution by a Connection.

func NewCallRequest

func NewCallRequest(function string) *CallRequest

NewCallRequest returns a new empty CallRequest.

func (*CallRequest) Args

func (req *CallRequest) Args(args interface{}) *CallRequest

Args sets the args for the eval request. Note: default value is empty.

func (*CallRequest) BucketID

func (req *CallRequest) BucketID(bucketID uint64) *CallRequest

BucketID method that sets the bucketID for your request. You can ignore this parameter if you have a bucketGetter. However, this method has a higher priority.

func (*CallRequest) Context

func (req *CallRequest) Context(ctx context.Context) *CallRequest

Context sets a passed context to the request.

type CallResponse

type CallResponse struct {
	// contains filtered or unexported fields
}

CallResponse is a backwards-compatible structure with go-tarantool for easier replacement.

func (*CallResponse) Get

func (resp *CallResponse) Get() ([]interface{}, error)

Get implementation now works synchronously for response. The interface was created purely for convenient migration to go-vshard-router from go-tarantool.

func (*CallResponse) GetTyped

func (resp *CallResponse) GetTyped(result interface{}) error

GetTyped waits synchronously for response and calls msgpack.Decoder.Decode(result) if no error happens.

type Config

type Config struct {
	// Providers
	// Loggerf injects a custom logger. By default there is no logger is used.
	Loggerf          LogfProvider     // Loggerf is not required
	Metrics          MetricsProvider  // Metrics is not required
	TopologyProvider TopologyProvider // TopologyProvider is required provider

	// Discovery
	// DiscoveryTimeout is timeout between cron discovery job; by default there is no timeout.
	DiscoveryTimeout time.Duration
	DiscoveryMode    DiscoveryMode
	// DiscoveryWorkStep is a pause between calling buckets_discovery on storage
	// in buckets discovering logic. Default is 10ms.
	DiscoveryWorkStep time.Duration

	// BucketsSearchMode defines policy for BucketDiscovery method.
	// Default value is BucketsSearchLegacy.
	// See BucketsSearchMode constants for more detail.
	BucketsSearchMode BucketsSearchMode

	TotalBucketCount uint64
	User             string
	Password         string
	PoolOpts         tarantool.Opts

	// BucketGetter is an optional argument.
	// You can specify a function that will receive the bucket id from the context.
	// This is useful if you use middleware that inserts the calculated bucket id into the request context.
	BucketGetter func(ctx context.Context) uint64
	// RequestTimeout timeout for requests to Tarantool.
	// Don't rely on using this timeout.
	// This is the difference between the timeout of the library itself
	// that is, our retry timeout if the buckets, for example, move.
	// Currently, it only works for sugar implementations .
	RequestTimeout time.Duration
}

type DiscoveryMode

type DiscoveryMode int
const (
	// DiscoveryModeOn is cron discovery with cron timeout
	DiscoveryModeOn DiscoveryMode = iota
	DiscoveryModeOnce
)

type EmptyMetrics

type EmptyMetrics struct{}

EmptyMetrics is default empty metrics provider you can embed this type and realize just some metrics

func (*EmptyMetrics) CronDiscoveryEvent

func (e *EmptyMetrics) CronDiscoveryEvent(_ bool, _ time.Duration, _ string)

func (*EmptyMetrics) RequestDuration

func (e *EmptyMetrics) RequestDuration(_ time.Duration, _ bool, _ bool)

func (*EmptyMetrics) RetryOnCall

func (e *EmptyMetrics) RetryOnCall(_ string)

type InstanceInfo

type InstanceInfo struct {
	// Name is human-readable id for instance
	// Starting with tarantool 3.0, the definition is made into a human-readable name,
	// so far it is not used directly inside the library
	Name string
	Addr string
	UUID uuid.UUID
}

func (InstanceInfo) String

func (ii InstanceInfo) String() string

func (InstanceInfo) Validate

func (ii InstanceInfo) Validate() error

type LogfProvider

type LogfProvider interface {
	Debugf(ctx context.Context, format string, v ...any)
	Infof(ctx context.Context, format string, v ...any)
	Warnf(ctx context.Context, format string, v ...any)
	Errorf(ctx context.Context, format string, v ...any)
}

LogfProvider an interface to inject a custom logger.

type MetricsProvider

type MetricsProvider interface {
	CronDiscoveryEvent(ok bool, duration time.Duration, reason string)
	RetryOnCall(reason string)
	RequestDuration(duration time.Duration, ok bool, mapReduce bool)
}

MetricsProvider is an interface for passing library metrics to your prometheus/graphite and other metrics

type Replicaset

type Replicaset struct {
	EtalonBucketCount uint64
	// contains filtered or unexported fields
}

func (*Replicaset) BucketForceCreate

func (rs *Replicaset) BucketForceCreate(ctx context.Context, firstBucketID, count uint64) error

func (*Replicaset) BucketStat

func (rs *Replicaset) BucketStat(ctx context.Context, bucketID uint64) (BucketStatInfo, error)

func (*Replicaset) BucketsCount

func (rs *Replicaset) BucketsCount(ctx context.Context) (uint64, error)

func (*Replicaset) CallAsync

func (rs *Replicaset) CallAsync(ctx context.Context, opts ReplicasetCallOpts, fnc string, args interface{}) *tarantool.Future

CallAsync sends async request to remote storage

func (*Replicaset) ReplicaCall

func (rs *Replicaset) ReplicaCall(
	ctx context.Context,
	opts ReplicasetCallOpts,
	fnc string,
	args interface{},
) (interface{}, StorageResultTypedFunc, error)

ReplicaCall perform function on remote storage link https://github.com/tarantool/vshard/blob/99ceaee014ea3a67424c2026545838e08d69b90c/vshard/replicaset.lua#L661 Deprecated: ReplicaCall is deprecated, because looks like it has a little bit broken interface. See https://github.com/tarantool/go-vshard-router/issues/42. Use CallAsync instead.

func (*Replicaset) String

func (rs *Replicaset) String() string

type ReplicasetCallOpts

type ReplicasetCallOpts struct {
	PoolMode pool.Mode
	Timeout  time.Duration
}

type ReplicasetInfo

type ReplicasetInfo struct {
	Name             string
	UUID             uuid.UUID
	Weight           float64
	PinnedCount      uint64
	IgnoreDisbalance bool
}

func (ReplicasetInfo) String

func (rsi ReplicasetInfo) String() string

type Router

type Router struct {
	// contains filtered or unexported fields
}

func NewRouter

func NewRouter(ctx context.Context, cfg Config) (*Router, error)

func (*Router) AddInstance

func (r *Router) AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceInfo) error

func (*Router) AddReplicaset

func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error

func (*Router) AddReplicasets

func (r *Router) AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error

func (*Router) BucketDiscovery

func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replicaset, error)

BucketDiscovery search bucket in whole cluster

func (*Router) BucketReset

func (r *Router) BucketReset(bucketID uint64)

func (*Router) BucketResolve

func (r *Router) BucketResolve(ctx context.Context, bucketID uint64) (*Replicaset, error)

BucketResolve resolve bucket id to replicaset

func (*Router) BucketSet

func (r *Router) BucketSet(bucketID uint64, rsID uuid.UUID) (*Replicaset, error)

BucketSet Set a bucket to a replicaset.

func (*Router) Call

func (r *Router) Call(ctx context.Context, bucketID uint64, mode VshardRouterCallMode,
	fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error)

Call calls the function identified by 'fnc' on the shard storing the bucket identified by 'bucket_id'.

func (*Router) CallBRE

func (r *Router) CallBRE(ctx context.Context, bucketID uint64,
	fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error)

CallBRE is an alias for Call with VshardRouterCallModeBRE.

func (*Router) CallBRO

func (r *Router) CallBRO(ctx context.Context, bucketID uint64,
	fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error)

CallBRO is an alias for Call with VshardRouterCallModeBRO.

func (*Router) CallRE

func (r *Router) CallRE(ctx context.Context, bucketID uint64,
	fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error)

CallRE is an alias for Call with VshardRouterCallModeRE.

func (*Router) CallRO

func (r *Router) CallRO(ctx context.Context, bucketID uint64,
	fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error)

CallRO is an alias for Call with VshardRouterCallModeRO.

func (*Router) CallRW

func (r *Router) CallRW(ctx context.Context, bucketID uint64,
	fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error)

CallRW is an alias for Call with VshardRouterCallModeRW.

func (*Router) ClusterBootstrap

func (r *Router) ClusterBootstrap(ctx context.Context, ifNotBootstrapped bool) error

ClusterBootstrap initializes the cluster by bootstrapping the necessary buckets across the available replicasets. It checks the current state of each replicaset and creates buckets if required. The function takes a context for managing cancellation and deadlines, and a boolean parameter ifNotBootstrapped to control error handling. If ifNotBootstrapped is true, the function will log any errors encountered during the bootstrapping process but will not halt execution; instead, it will return the last error encountered. If ifNotBootstrapped is false, any error will result in an immediate return, ensuring that the operation either succeeds fully or fails fast.

func (*Router) DiscoveryAllBuckets

func (r *Router) DiscoveryAllBuckets(ctx context.Context) error

func (*Router) DiscoveryHandleBuckets

func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buckets []uint64)

DiscoveryHandleBuckets arrange downloaded buckets to the route map so as they reference a given replicaset.

func (*Router) Do

func (r *Router) Do(req *CallRequest, userMode pool.Mode) *CallResponse

Do perform a request synchronously on the connection. It is important that the logic of this method is different from go-tarantool.

func (*Router) RemoveInstance

func (r *Router) RemoveInstance(ctx context.Context, rsID, instanceID uuid.UUID) error

func (*Router) RemoveReplicaset

func (r *Router) RemoveReplicaset(ctx context.Context, rsID uuid.UUID) []error

func (*Router) RouteMapClean

func (r *Router) RouteMapClean()

func (*Router) RouterBucketCount

func (r *Router) RouterBucketCount() uint64

func (*Router) RouterBucketID

func (r *Router) RouterBucketID(shardKey string) uint64

RouterBucketID return the bucket identifier from the parameter used for sharding Deprecated: RouterBucketID() is deprecated, use RouterBucketIDStrCRC32() RouterBucketIDMPCRC32() instead

func (*Router) RouterBucketIDStrCRC32

func (r *Router) RouterBucketIDStrCRC32(shardKey string) uint64

func (*Router) RouterCallImpl

func (r *Router) RouterCallImpl(ctx context.Context,
	bucketID uint64,
	opts CallOpts,
	fnc string,
	args interface{}) (interface{}, StorageResultTypedFunc, error)

RouterCallImpl Perform shard operation function will restart operation after wrong bucket response until timeout is reached Deprecated: RouterCallImpl is deprecated. See https://github.com/tarantool/go-vshard-router/issues/110. Use Call method with RO, RW, RE, BRO, BRE modes instead.

func (*Router) RouterMapCallRWImpl

func (r *Router) RouterMapCallRWImpl(
	ctx context.Context,
	fnc string,
	args interface{},
	opts CallOpts,
) (map[uuid.UUID]interface{}, error)

RouterMapCallRWImpl perform call function on all masters in the cluster with a guarantee that in case of success it was executed with all buckets being accessible for reads and writes. Deprecated: RouterMapCallRWImpl is deprecated. Use more general RouterMapCallRW instead.

func (*Router) RouterRoute

func (r *Router) RouterRoute(ctx context.Context, bucketID uint64) (*Replicaset, error)

RouterRoute get replicaset object by bucket identifier. alias to BucketResolve

func (*Router) RouterRouteAll

func (r *Router) RouterRouteAll() map[uuid.UUID]*Replicaset

RouterRouteAll return map of all replicasets.

func (*Router) Topology

func (r *Router) Topology() TopologyController

type RouterMapCallRWOptions

type RouterMapCallRWOptions struct {
	// Timeout defines timeout for RouterMapCallRW.
	Timeout time.Duration
}

RouterMapCallRWOptions sets options for RouterMapCallRW.

type StdoutLogLevel

type StdoutLogLevel int

StdoutLogLevel is a type to control log level for StdoutLoggerf.

const (
	// StdoutLogDefault is equal to default value of StdoutLogLevel. Acts like StdoutLogInfo.
	StdoutLogDefault StdoutLogLevel = iota
	// StdoutLogDebug enables debug or higher level logs for StdoutLoggerf
	StdoutLogDebug
	// StdoutLogInfo enables only info or higher level logs for StdoutLoggerf
	StdoutLogInfo
	// StdoutLogWarn enables only warn or higher level logs for StdoutLoggerf
	StdoutLogWarn
	// StdoutLogError enables error level logs for StdoutLoggerf
	StdoutLogError
)

type StdoutLoggerf

type StdoutLoggerf struct {
	// LogLevel controls log level to print, see StdoutLogLevel constants for details.
	LogLevel StdoutLogLevel
}

StdoutLoggerf a logger that prints into stderr

func (StdoutLoggerf) Debugf

func (s StdoutLoggerf) Debugf(_ context.Context, format string, v ...any)

Debugf implements Debugf method for LogfProvider interface

func (StdoutLoggerf) Errorf

func (s StdoutLoggerf) Errorf(_ context.Context, format string, v ...any)

Errorf implements Errorf method for LogfProvider interface

func (StdoutLoggerf) Infof

func (s StdoutLoggerf) Infof(_ context.Context, format string, v ...any)

Infof implements Infof method for LogfProvider interface

func (StdoutLoggerf) Warnf

func (s StdoutLoggerf) Warnf(_ context.Context, format string, v ...any)

Warnf implements Warnf method for LogfProvider interface

type StorageCallVShardError

type StorageCallVShardError struct {
	BucketID uint64 `msgpack:"bucket_id"`
	Reason   string `msgpack:"reason"`
	Code     int    `msgpack:"code"`
	Type     string `msgpack:"type"`
	Message  string `msgpack:"message"`
	Name     string `msgpack:"name"`
	// These 3 fields below are send as string by vshard storage, so we decode them into string, not uuid.UUID type
	// Example: 00000000-0000-0002-0002-000000000000
	MasterUUID     string `msgpack:"master"`
	ReplicasetUUID string `msgpack:"replicaset"`
	ReplicaUUID    string `msgpack:"replica"`
	Destination    string `msgpack:"destination"`
}

func (StorageCallVShardError) Error

func (s StorageCallVShardError) Error() string

type StorageResultTypedFunc

type StorageResultTypedFunc = func(result ...interface{}) error

type TopologyController

type TopologyController interface {
	AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceInfo) error
	RemoveReplicaset(ctx context.Context, rsID uuid.UUID) []error
	RemoveInstance(ctx context.Context, rsID, instanceID uuid.UUID) error
	AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error
	AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error
}

TopologyController is an entity that allows you to interact with the topology. TopologyController is not concurrent safe. This decision is made intentionally because there is no point in providing concurrence safety for this case. In any case, a caller can use his own external synchronization primitive to handle concurrent access.

type TopologyProvider

type TopologyProvider interface {
	// Init should create the current topology at the beginning
	// and change the state during the process of changing the point of receiving the cluster configuration
	Init(t TopologyController) error
	// Close closes all connections if the provider created them
	Close()
}

TopologyProvider is external module that can lookup current topology of cluster it might be etcd/config/consul or smth else

type VshardMode

type VshardMode string
const (
	ReadMode  VshardMode = "read"
	WriteMode VshardMode = "write"
)

func (VshardMode) String

func (c VshardMode) String() string

type VshardRouterCallMode

type VshardRouterCallMode int

VshardRouterCallMode is a type to represent call mode for Router.Call method.

const (
	// VshardRouterCallModeRO sets a read-only mode for Router.Call.
	VshardRouterCallModeRO VshardRouterCallMode = iota
	// VshardRouterCallModeRW sets a read-write mode for Router.Call.
	VshardRouterCallModeRW
	// VshardRouterCallModeRE acts like VshardRouterCallModeRO
	// with preference for a replica rather than a master.
	// This mode is not supported yet.
	VshardRouterCallModeRE
	// VshardRouterCallModeBRO acts like VshardRouterCallModeRO with balancing.
	VshardRouterCallModeBRO
	// VshardRouterCallModeBRE acts like VshardRouterCallModeRO with balancing
	// and preference for a replica rather than a master.
	VshardRouterCallModeBRE
)

type VshardRouterCallOptions

type VshardRouterCallOptions struct {
	Timeout time.Duration
}

VshardRouterCallOptions represents options to Router.Call[XXX] methods.

type VshardRouterCallResp

type VshardRouterCallResp struct {
	// contains filtered or unexported fields
}

VshardRouterCallResp represents a response from Router.Call[XXX] methods.

func (VshardRouterCallResp) Get

func (r VshardRouterCallResp) Get() ([]interface{}, error)

Get returns a response from user defined function as []interface{}.

func (VshardRouterCallResp) GetTyped

func (r VshardRouterCallResp) GetTyped(result []interface{}) error

GetTyped decodes a response from user defined function into custom values.

Directories

Path Synopsis
mocks
providers
etcd
Package etcd based on moonlibs config library https://github.com/moonlibs/config?tab=readme-ov-file#multi-shard-topology-for-custom-sharding-etcdclustermaster
Package etcd based on moonlibs config library https://github.com/moonlibs/config?tab=readme-ov-file#multi-shard-topology-for-custom-sharding-etcdclustermaster
nolint:revive
nolint:revive
tests
tnt

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL