store

package
v1.0.0-rc1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 10, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. *

* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. *

* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. *

* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. *

* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. *

* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. *

Index

Constants

View Source
const (
	RoleMaster = "master"
	RoleSlave  = "slave"

	NodeIDLen = 40
)
View Source
const (
	CommandCreate = iota + 1
	CommandUpdate = iota + 1
	CommandRemove
)
View Source
const (
	MinSlotID = 0
	MaxSlotID = 16383
)

Variables

View Source
var ErrSlotOutOfRange = errors.New("slot id was out of range, should be between 0 and 16383")

Functions

This section is empty.

Types

type Cluster

type Cluster struct {
	Name    string       `json:"name"`
	Version atomic.Int64 `json:"version"`
	Shards  []*Shard     `json:"shards"`
}

func NewCluster

func NewCluster(name string, nodes []string, replicas int) (*Cluster, error)

func ParseCluster

func ParseCluster(clusterStr string) (*Cluster, error)

ParseCluster will parse the cluster string into cluster topology.

func (*Cluster) AddNode

func (cluster *Cluster) AddNode(shardIndex int, addr, role, password string) error

func (*Cluster) GetNodes

func (cluster *Cluster) GetNodes() []Node

func (*Cluster) GetShard

func (cluster *Cluster) GetShard(shardIndex int) (*Shard, error)

func (*Cluster) MigrateSlot

func (cluster *Cluster) MigrateSlot(ctx context.Context, slot int, targetShardIdx int, slotOnly bool) error

func (*Cluster) PromoteNewMaster

func (cluster *Cluster) PromoteNewMaster(ctx context.Context,
	shardIdx int, masterNodeID, preferredNodeID string) (string, error)

func (*Cluster) RemoveNode

func (cluster *Cluster) RemoveNode(shardIndex int, nodeID string) error

func (*Cluster) Reset

func (cluster *Cluster) Reset(ctx context.Context) error

func (*Cluster) SetPassword

func (cluster *Cluster) SetPassword(password string)

SetPassword will set the password for all nodes in the cluster.

func (*Cluster) SetSlot

func (cluster *Cluster) SetSlot(ctx context.Context, slot int, targetNodeID string) error

func (*Cluster) SyncToNodes

func (cluster *Cluster) SyncToNodes(ctx context.Context) error

func (*Cluster) ToSlotString

func (cluster *Cluster) ToSlotString() (string, error)

type ClusterInfo

type ClusterInfo struct {
	CurrentEpoch   int64  `json:"cluster_current_epoch"`
	MigratingSlot  int    `json:"migrating_slot"`
	MigratingState string `json:"migrating_state"`
}

type ClusterMockNode

type ClusterMockNode struct {
	*ClusterNode

	Sequence uint64
}

ClusterMockNode is a mock implementation of the Node interface, it is used for testing purposes.

func NewClusterMockNode

func NewClusterMockNode() *ClusterMockNode

func (*ClusterMockNode) GetClusterInfo

func (mock *ClusterMockNode) GetClusterInfo(ctx context.Context) (*ClusterInfo, error)

func (*ClusterMockNode) GetClusterNodeInfo

func (mock *ClusterMockNode) GetClusterNodeInfo(ctx context.Context) (*ClusterNodeInfo, error)

func (*ClusterMockNode) Reset

func (mock *ClusterMockNode) Reset(ctx context.Context) error

func (*ClusterMockNode) SyncClusterInfo

func (mock *ClusterMockNode) SyncClusterInfo(ctx context.Context, cluster *Cluster) error

type ClusterNode

type ClusterNode struct {
	// contains filtered or unexported fields
}

func NewClusterNode

func NewClusterNode(addr, password string) *ClusterNode

func (*ClusterNode) Addr

func (n *ClusterNode) Addr() string

func (*ClusterNode) CheckClusterMode

func (n *ClusterNode) CheckClusterMode(ctx context.Context) (int64, error)

func (*ClusterNode) GetClient

func (n *ClusterNode) GetClient() *redis.Client

func (*ClusterNode) GetClusterInfo

func (n *ClusterNode) GetClusterInfo(ctx context.Context) (*ClusterInfo, error)

func (*ClusterNode) GetClusterNodeInfo

func (n *ClusterNode) GetClusterNodeInfo(ctx context.Context) (*ClusterNodeInfo, error)

func (*ClusterNode) GetClusterNodesString

func (n *ClusterNode) GetClusterNodesString(ctx context.Context) (string, error)

func (*ClusterNode) ID

func (n *ClusterNode) ID() string

func (*ClusterNode) IsMaster

func (n *ClusterNode) IsMaster() bool

func (*ClusterNode) MarshalJSON

func (n *ClusterNode) MarshalJSON() ([]byte, error)

func (*ClusterNode) MigrateSlot

func (n *ClusterNode) MigrateSlot(ctx context.Context, slot int, targetNodeID string) error

func (*ClusterNode) Password

func (n *ClusterNode) Password() string

func (*ClusterNode) Reset

func (n *ClusterNode) Reset(ctx context.Context) error

func (*ClusterNode) SetPassword

func (n *ClusterNode) SetPassword(password string)

func (*ClusterNode) SetRole

func (n *ClusterNode) SetRole(role string)

func (*ClusterNode) SyncClusterInfo

func (n *ClusterNode) SyncClusterInfo(ctx context.Context, cluster *Cluster) error

func (*ClusterNode) UnmarshalJSON

func (n *ClusterNode) UnmarshalJSON(bytes []byte) error

func (*ClusterNode) Validate

func (n *ClusterNode) Validate() error

type ClusterNodeInfo

type ClusterNodeInfo struct {
	Sequence uint64 `json:"sequence"`
	Role     string `json:"role"`
}

type ClusterStore

type ClusterStore struct {
	// contains filtered or unexported fields
}

func NewClusterStore

func NewClusterStore(e engine.Engine) *ClusterStore

func (*ClusterStore) CheckNewNodes

func (s *ClusterStore) CheckNewNodes(ctx context.Context, nodes []string) error

func (*ClusterStore) Close

func (s *ClusterStore) Close() error

func (*ClusterStore) CreateCluster

func (s *ClusterStore) CreateCluster(ctx context.Context, ns string, clusterInfo *Cluster) error

func (*ClusterStore) CreateNamespace

func (s *ClusterStore) CreateNamespace(ctx context.Context, ns string) error

CreateNamespace will create a namespace for clusters

func (*ClusterStore) EmitEvent

func (s *ClusterStore) EmitEvent(event EventPayload)

func (*ClusterStore) ExistsNamespace

func (s *ClusterStore) ExistsNamespace(ctx context.Context, ns string) (bool, error)

ExistsNamespace return an indicator whether the specified namespace exists

func (*ClusterStore) GetCluster

func (s *ClusterStore) GetCluster(ctx context.Context, ns, cluster string) (*Cluster, error)

func (*ClusterStore) ID

func (s *ClusterStore) ID() string

func (*ClusterStore) IsLeader

func (s *ClusterStore) IsLeader() bool

func (*ClusterStore) IsReady

func (s *ClusterStore) IsReady(ctx context.Context) bool

func (*ClusterStore) Leader

func (s *ClusterStore) Leader() string

func (*ClusterStore) LeaderChange

func (s *ClusterStore) LeaderChange() <-chan bool

func (*ClusterStore) ListCluster

func (s *ClusterStore) ListCluster(ctx context.Context, ns string) ([]string, error)

ListCluster return the list of name of cluster under the specified namespace

func (*ClusterStore) ListNamespace

func (s *ClusterStore) ListNamespace(ctx context.Context) ([]string, error)

ListNamespace return the list of name of all namespaces

func (*ClusterStore) Notify

func (s *ClusterStore) Notify() <-chan EventPayload

func (*ClusterStore) RemoveCluster

func (s *ClusterStore) RemoveCluster(ctx context.Context, ns, cluster string) error

func (*ClusterStore) RemoveNamespace

func (s *ClusterStore) RemoveNamespace(ctx context.Context, ns string) error

RemoveNamespace delete the specified namespace from store

func (*ClusterStore) SetCluster

func (s *ClusterStore) SetCluster(ctx context.Context, ns string, clusterInfo *Cluster) error

func (*ClusterStore) Stop

func (s *ClusterStore) Stop() error

func (*ClusterStore) UpdateCluster

func (s *ClusterStore) UpdateCluster(ctx context.Context, ns string, clusterInfo *Cluster) error

UpdateCluster update the Name to store under the specified namespace

type Command

type Command int

type EventPayload

type EventPayload struct {
	Namespace string
	Cluster   string
	Type      EventType
	Command   Command
}

type EventType

type EventType int
const (
	EventNamespace EventType = iota + 1
	EventCluster
)

type Node

type Node interface {
	ID() string
	Password() string
	Addr() string
	IsMaster() bool

	SetRole(string)
	SetPassword(string)

	Reset(ctx context.Context) error
	GetClusterNodeInfo(ctx context.Context) (*ClusterNodeInfo, error)
	GetClusterInfo(ctx context.Context) (*ClusterInfo, error)
	SyncClusterInfo(ctx context.Context, cluster *Cluster) error
	CheckClusterMode(ctx context.Context) (int64, error)
	MigrateSlot(ctx context.Context, slot int, NodeID string) error

	MarshalJSON() ([]byte, error)
	UnmarshalJSON(data []byte) error
}

type Shard

type Shard struct {
	Nodes            []Node      `json:"nodes"`
	SlotRanges       []SlotRange `json:"slot_ranges"`
	TargetShardIndex int         `json:"target_shard_index"`
	MigratingSlot    int         `json:"migrating_slot"`
}

func NewShard

func NewShard() *Shard

func (*Shard) ClearMigrateState

func (shard *Shard) ClearMigrateState()

func (*Shard) GetMasterNode

func (shard *Shard) GetMasterNode() Node

func (*Shard) HasOverlap

func (shard *Shard) HasOverlap(slotRange *SlotRange) bool

func (*Shard) IsMigrating

func (shard *Shard) IsMigrating() bool

func (*Shard) IsServicing

func (shard *Shard) IsServicing() bool

func (*Shard) ToSlotsString

func (shard *Shard) ToSlotsString() (string, error)

func (*Shard) UnmarshalJSON

func (shard *Shard) UnmarshalJSON(bytes []byte) error

UnmarshalJSON unmarshal a Shard from JSON bytes, it's required since Shard.Nodes is an interface slice. So we need to take into a concrete type.

type Shards

type Shards []*Shard

func (Shards) Len

func (s Shards) Len() int

func (Shards) Less

func (s Shards) Less(i, j int) bool

func (Shards) Swap

func (s Shards) Swap(i, j int)

type SlotRange

type SlotRange struct {
	Start int `json:"start"`
	Stop  int `json:"stop"`
}

func NewSlotRange

func NewSlotRange(start, stop int) (*SlotRange, error)

func ParseSlotRange

func ParseSlotRange(s string) (*SlotRange, error)

func (*SlotRange) Contains

func (slotRange *SlotRange) Contains(slot int) bool

func (*SlotRange) HasOverlap

func (slotRange *SlotRange) HasOverlap(that *SlotRange) bool

func (*SlotRange) MarshalJSON

func (slotRange *SlotRange) MarshalJSON() ([]byte, error)

func (*SlotRange) String

func (slotRange *SlotRange) String() string

func (*SlotRange) UnmarshalJSON

func (slotRange *SlotRange) UnmarshalJSON(data []byte) error

type SlotRanges

type SlotRanges []SlotRange

func AddSlotToSlotRanges

func AddSlotToSlotRanges(source SlotRanges, slot int) SlotRanges

func CalculateSlotRanges

func CalculateSlotRanges(n int) SlotRanges

func RemoveSlotFromSlotRanges

func RemoveSlotFromSlotRanges(source SlotRanges, slot int) SlotRanges

func (*SlotRanges) Contains

func (SlotRanges *SlotRanges) Contains(slot int) bool

type Store

type Store interface {
	IsReady(ctx context.Context) bool

	ListNamespace(ctx context.Context) ([]string, error)
	CreateNamespace(ctx context.Context, ns string) error
	ExistsNamespace(ctx context.Context, ns string) (bool, error)
	RemoveNamespace(ctx context.Context, ns string) error

	ListCluster(ctx context.Context, ns string) ([]string, error)
	GetCluster(ctx context.Context, ns, cluster string) (*Cluster, error)
	RemoveCluster(ctx context.Context, ns, cluster string) error
	CreateCluster(ctx context.Context, ns string, cluster *Cluster) error
	UpdateCluster(ctx context.Context, ns string, cluster *Cluster) error
	SetCluster(ctx context.Context, ns string, clusterInfo *Cluster) error

	CheckNewNodes(ctx context.Context, nodes []string) error
}

Directories

Path Synopsis
* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.
* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.
etcd
* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.
* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.
zookeeper
* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.
* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL