Documentation ¶
Index ¶
- Constants
- Variables
- func BucketIDStrCRC32(shardKey string, totalBucketCount uint64) uint64
- func RouterBucketIDMPCRC32(total uint64, keys ...string)
- type BucketStatError
- type BucketStatInfo
- type CallOpts
- type Config
- type DiscoveryMode
- type EmptyLogger
- type EmptyMetrics
- type Error
- type InstanceInfo
- type LogProvider
- type MetricsProvider
- type Replicaset
- type ReplicasetCallOpts
- type ReplicasetInfo
- type Router
- func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replicaset, error)
- func (r *Router) BucketReset(bucketID uint64)
- func (r *Router) BucketResolve(ctx context.Context, bucketID uint64) (*Replicaset, error)
- func (r *Router) BucketSet(bucketID uint64, rsID uuid.UUID) (*Replicaset, error)
- func (r *Router) DiscoveryAllBuckets(ctx context.Context) error
- func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buckets []uint64)
- func (r *Router) RouteMapClean()
- func (r *Router) RouterBucketCount() uint64
- func (r *Router) RouterBucketID(shardKey string) uint64
- func (r *Router) RouterBucketIDStrCRC32(shardKey string) uint64
- func (r *Router) RouterCallImpl(ctx context.Context, bucketID uint64, opts CallOpts, fnc string, ...) (interface{}, StorageResultTypedFunc, error)
- func (r *Router) RouterMapCallRWImpl(ctx context.Context, fnc string, args interface{}, opts CallOpts) (map[uuid.UUID]interface{}, error)
- func (r *Router) RouterRoute(ctx context.Context, bucketID uint64) (*Replicaset, error)
- func (r *Router) RouterRouteAll() map[uuid.UUID]*Replicaset
- func (r *Router) Topology() TopologyController
- type StdoutLogger
- type StorageCallAssertError
- type StorageCallVShardError
- type StorageResultTypedFunc
- type TopologyController
- type TopologyProvider
- type VshardMode
Constants ¶
const CallTimeoutMin = time.Second / 2
Variables ¶
var ErrInvalidConfig = fmt.Errorf("config invalid")
var Errors = map[int]Error{ 1: { Name: "WRONG_BUCKET", Msg: "Cannot perform action with bucket %d, reason: %s", Args: []string{"bucket_id", "reason", "destination"}, }, 2: { Name: "NON_MASTER", Msg: "Replica %s is not a master for replicaset %s anymore", Args: []string{"replica", "replicaset", "master"}, }, 3: { Name: "BUCKET_ALREADY_EXISTS", Msg: "Bucket %d already exists", Args: []string{"bucket_id"}, }, 4: { Name: "NO_SUCH_REPLICASET", Msg: "Replicaset %s not found", Args: []string{"replicaset"}, }, 5: { Name: "MOVE_TO_SELF", Msg: "Cannot move: bucket %d is already on replicaset %s", Args: []string{"bucket_id", "replicaset"}, }, 6: { Name: "MISSING_MASTER", Msg: "Master is not configured for replicaset %s", Args: []string{"replicaset"}, }, 7: { Name: "TRANSFER_IS_IN_PROGRESS", Msg: "Bucket %d is transferring to replicaset %s", Args: []string{"bucket_id", "destination"}, }, 8: { Name: "UNREACHABLE_REPLICASET", Msg: "There is no active replicas in replicaset %s", Args: []string{"replicaset", "bucket_id"}, }, 9: { Name: "NO_ROUTE_TO_BUCKET", Msg: "Bucket %d cannot be found. Is rebalancing in progress?", Args: []string{"bucket_id"}, }, 10: { Name: "NON_EMPTY", Msg: "Cluster is already bootstrapped", }, 11: { Name: "UNREACHABLE_MASTER", Msg: "Master of replicaset %s is unreachable: %s", Args: []string{"replicaset", "reason"}, }, 12: { Name: "OUT_OF_SYNC", Msg: "Replica is out of sync", }, 13: { Name: "HIGH_REPLICATION_LAG", Msg: "High replication lag: %f", Args: []string{"lag"}, }, 14: { Name: "UNREACHABLE_REPLICA", Msg: "Replica %s isn't active", Args: []string{"replica"}, }, 15: { Name: "LOW_REDUNDANCY", Msg: "Only one replica is active", }, 16: { Name: "INVALID_REBALANCING", Msg: "Sending and receiving buckets at same time is not allowed", }, 17: { Name: "SUBOPTIMAL_REPLICA", Msg: "A current read replica in replicaset %s is not optimal", Args: []string{"replicaset"}, }, 18: { Name: "UNKNOWN_BUCKETS", Msg: "%d buckets are not discovered", Args: []string{"not_discovered_cnt"}, }, 19: { Name: "REPLICASET_IS_LOCKED", Msg: "Replicaset is locked", }, 20: { Name: "OBJECT_IS_OUTDATED", Msg: "Object is outdated after module reload/reconfigure. Use new instance.", }, 21: { Name: "ROUTER_ALREADY_EXISTS", Msg: "Router with name %s already exists", Args: []string{"router_name"}, }, 22: { Name: "BUCKET_IS_LOCKED", Msg: "Bucket %d is locked", Args: []string{"bucket_id"}, }, 23: { Name: "INVALID_CFG", Msg: "Invalid configuration: %s", Args: []string{"reason"}, }, 24: { Name: "BUCKET_IS_PINNED", Msg: "Bucket %d is pinned", Args: []string{"bucket_id"}, }, 25: { Name: "TOO_MANY_RECEIVING", Msg: "Too many receiving buckets at once, please, throttle", }, 26: { Name: "STORAGE_IS_REFERENCED", Msg: "Storage is referenced", }, 27: { Name: "STORAGE_REF_ADD", Msg: "Can not add a storage ref: %s", Args: []string{"reason"}, }, 28: { Name: "STORAGE_REF_USE", Msg: "Can not use a storage ref: %s", Args: []string{"reason"}, }, 29: { Name: "STORAGE_REF_DEL", Msg: "Can not delete a storage ref: %s", Args: []string{"reason"}, }, 30: { Name: "BUCKET_RECV_DATA_ERROR", Msg: "Can not receive the bucket %s data in space \"%s\" at tuple %s: %s", Args: []string{"bucket_id", "space", "tuple", "reason"}, }, 31: { Name: "MULTIPLE_MASTERS_FOUND", Msg: "Found more than one master in replicaset %s on nodes %s and %s", Args: []string{"replicaset", "master1", "master2"}, }, 32: { Name: "REPLICASET_IN_BACKOFF", Msg: "Replicaset %s is in backoff, can't take requests right now. Last error was %s", Args: []string{"replicaset", "error"}, }, 33: { Name: "STORAGE_IS_DISABLED", Msg: "Storage is disabled: %s", Args: []string{"reason"}, }, 34: { Name: "BUCKET_IS_CORRUPTED", Msg: "Bucket %d is corrupted: %s", Args: []string{"bucket_id", "reason"}, }, 35: { Name: "ROUTER_IS_DISABLED", Msg: "Router is disabled: %s", Args: []string{"reason"}, }, 36: { Name: "BUCKET_GC_ERROR", Msg: "Error during bucket GC: %s", Args: []string{"reason"}, }, 37: { Name: "STORAGE_CFG_IS_IN_PROGRESS", Msg: "Configuration of the storage is in progress", }, 38: { Name: "ROUTER_CFG_IS_IN_PROGRESS", Msg: "Configuration of the router with name %s is in progress", Args: []string{"router_name"}, }, 39: { Name: "BUCKET_INVALID_UPDATE", Msg: "Bucket %s update is invalid: %s", Args: []string{"bucket_id", "reason"}, }, 40: { Name: "VHANDSHAKE_NOT_COMPLETE", Msg: "Handshake with %s have not been completed yet", Args: []string{"replica"}, }, 41: { Name: "INSTANCE_NAME_MISMATCH", Msg: "Mismatch server name: expected \"%s\", but got \"%s\"", Args: []string{"expected_name", "actual_name"}, }, }
Functions ¶
func BucketIDStrCRC32 ¶
func RouterBucketIDMPCRC32 ¶
RouterBucketIDMPCRC32 is not supported now
Types ¶
type BucketStatError ¶
type BucketStatError struct { BucketID uint64 `msgpack:"bucket_id"` Reason string `msgpack:"reason"` Code int `msgpack:"code"` Type string `msgpack:"type"` Message string `msgpack:"message"` Name string `msgpack:"name"` }
func (BucketStatError) Error ¶
func (bse BucketStatError) Error() string
type BucketStatInfo ¶
type CallOpts ¶
type CallOpts struct { VshardMode VshardMode // vshard mode in call PoolMode pool.Mode Timeout time.Duration }
type Config ¶
type Config struct { // Providers Logger LogProvider // Logger is not required Metrics MetricsProvider // Metrics is not required TopologyProvider TopologyProvider // TopologyProvider is required provider // Discovery DiscoveryTimeout time.Duration // DiscoveryTimeout is timeout between cron discovery job; by default there is no timeout DiscoveryMode DiscoveryMode TotalBucketCount uint64 User string Password string PoolOpts tarantool.Opts NWorkers int32 // todo: rename this, cause NWorkers naming looks strange }
type DiscoveryMode ¶
type DiscoveryMode int
const ( // DiscoveryModeOn is cron discovery with cron timeout DiscoveryModeOn DiscoveryMode = iota DiscoveryModeOnce )
type EmptyLogger ¶
type EmptyLogger struct{}
type EmptyMetrics ¶
type EmptyMetrics struct{}
EmptyMetrics is default empty metrics provider you can embed this type and realize just some metrics
func (*EmptyMetrics) CronDiscoveryEvent ¶
func (e *EmptyMetrics) CronDiscoveryEvent(ok bool, duration time.Duration, reason string)
func (*EmptyMetrics) RequestDuration ¶
func (e *EmptyMetrics) RequestDuration(duration time.Duration, ok bool, mapReduce bool)
func (*EmptyMetrics) RetryOnCall ¶
func (e *EmptyMetrics) RetryOnCall(reason string)
type Error ¶
func ErrorByName ¶
type InstanceInfo ¶
type LogProvider ¶
type MetricsProvider ¶
type MetricsProvider interface { CronDiscoveryEvent(ok bool, duration time.Duration, reason string) RetryOnCall(reason string) RequestDuration(duration time.Duration, ok bool, mapReduce bool) }
MetricsProvider is an interface for passing library metrics to your prometheus/graphite and other metrics
type Replicaset ¶
type Replicaset struct {
// contains filtered or unexported fields
}
func (*Replicaset) BucketStat ¶
func (rs *Replicaset) BucketStat(ctx context.Context, bucketID uint64) (BucketStatInfo, error)
func (*Replicaset) ReplicaCall ¶
func (rs *Replicaset) ReplicaCall( ctx context.Context, opts ReplicasetCallOpts, fnc string, args interface{}, ) (interface{}, StorageResultTypedFunc, error)
ReplicaCall perform function on remote storage link https://github.com/tarantool/vshard/blob/master/vshard/replicaset.lua#L661
func (*Replicaset) String ¶
func (rs *Replicaset) String() string
type ReplicasetCallOpts ¶
type ReplicasetInfo ¶
func (ReplicasetInfo) String ¶
func (rsi ReplicasetInfo) String() string
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
func (*Router) BucketDiscovery ¶
BucketDiscovery search bucket in whole cluster
func (*Router) BucketReset ¶
func (*Router) BucketResolve ¶
BucketResolve resolve bucket id to replicaset
func (*Router) DiscoveryAllBuckets ¶
func (*Router) DiscoveryHandleBuckets ¶
func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buckets []uint64)
DiscoveryHandleBuckets arrange downloaded buckets to the route map so as they reference a given replicaset.
func (*Router) RouteMapClean ¶
func (r *Router) RouteMapClean()
func (*Router) RouterBucketCount ¶
func (*Router) RouterBucketID ¶
RouterBucketID return the bucket identifier from the parameter used for sharding Deprecated: RouterBucketID() is deprecated, use RouterBucketIDStrCRC32() RouterBucketIDMPCRC32() instead
func (*Router) RouterBucketIDStrCRC32 ¶
func (*Router) RouterCallImpl ¶
func (r *Router) RouterCallImpl(ctx context.Context, bucketID uint64, opts CallOpts, fnc string, args interface{}) (interface{}, StorageResultTypedFunc, error)
RouterCallImpl Perform shard operation function will restart operation after wrong bucket response until timeout is reached
func (*Router) RouterMapCallRWImpl ¶
func (r *Router) RouterMapCallRWImpl( ctx context.Context, fnc string, args interface{}, opts CallOpts, ) (map[uuid.UUID]interface{}, error)
RouterMapCallRWImpl perform call function on all masters in the cluster with a guarantee that in case of success it was executed with all buckets being accessible for reads and writes.
func (*Router) RouterRoute ¶
RouterRoute get replicaset object by bucket identifier. alias to BucketResolve
func (*Router) RouterRouteAll ¶
func (r *Router) RouterRouteAll() map[uuid.UUID]*Replicaset
RouterRouteAll return map of all replicasets.
func (*Router) Topology ¶
func (r *Router) Topology() TopologyController
type StdoutLogger ¶
type StdoutLogger struct{}
type StorageCallAssertError ¶
type StorageCallAssertError struct { Code int `msgpack:"code"` BaseType string `msgpack:"base_type"` Type string `msgpack:"type"` Message string `msgpack:"message"` Trace interface{} `msgpack:"trace"` }
func (StorageCallAssertError) Error ¶
func (s StorageCallAssertError) Error() string
type StorageCallVShardError ¶
type StorageCallVShardError struct { BucketID uint64 `msgpack:"bucket_id" mapstructure:"bucket_id"` Reason string `msgpack:"reason"` Code int `msgpack:"code"` Type string `msgpack:"type"` Message string `msgpack:"message"` Name string `msgpack:"name"` MasterUUID *string `msgpack:"master_uuid" mapstructure:"master_uuid"` // mapstructure cant decode to source uuid type ReplicasetUUID *string `msgpack:"replicaset_uuid" mapstructure:"replicaset_uuid"` // mapstructure cant decode to source uuid type }
func (StorageCallVShardError) Error ¶
func (s StorageCallVShardError) Error() string
type StorageResultTypedFunc ¶
type StorageResultTypedFunc = func(result interface{}) error
type TopologyController ¶
type TopologyController interface { AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceInfo) error RemoveReplicaset(ctx context.Context, rsID uuid.UUID) []error RemoveInstance(ctx context.Context, rsID, instanceID uuid.UUID) error AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error }
type TopologyProvider ¶
type TopologyProvider interface { // Init should create the current topology at the beginning // and change the state during the process of changing the point of receiving the cluster configuration Init(t TopologyController) error // Close closes all connections if the provider created them Close() }
TopologyProvider is external module that can lookup current topology of cluster it might be etcd/config/consul or smth else
type VshardMode ¶
type VshardMode string
const ( ReadMode VshardMode = "read" WriteMode VshardMode = "write" )
func (VshardMode) String ¶
func (c VshardMode) String() string