v1

package
v0.0.0-...-052bb6b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 2, 2019 License: Apache-2.0 Imports: 16 Imported by: 1

Documentation

Index

Constants

View Source
const PROTO_VERSION = 1

Variables

View Source
var (
	ErrNotFoundClusterNode = errors.New("not found cluster node")

	ErrMessageDispatcherExist = errors.New("message dispatcher already exist")
	ErrMessageHandlerExist    = errors.New("message handler already exist")
	ErrMessageHandlerNotExist = errors.New("message handler not exist")
)
View Source
var (
	ErrClusterNodeOffline = errors.New("cluster node offline")
)

Functions

func CreateProtoNode

func CreateProtoNode(node *memberlist.Node) *model.Node

func GenerateRepositoryId

func GenerateRepositoryId(group string) int32

func Get

func Get(key string)

func Put

func Put(packetId string, message PacketMessage)

Types

type Area

type Area struct {
	Id    int32
	Racks *core.Array
}

func NewArea

func NewArea(id int32) *Area

func (*Area) Compare

func (p *Area) Compare(item core.ArrayItem) int32

func (*Area) GetId

func (p *Area) GetId() int32

func (*Area) IfAbsentCreateRack

func (p *Area) IfAbsentCreateRack(group string) *Rack

func (*Area) Rack

func (p *Area) Rack(id int32) *Rack

type Capacity

type Capacity struct {
	Total uint64
	Used  uint64
	Free  uint64
}

type Clock

type Clock struct {
	NodeId    uint32
	Remote    int64
	Local     int64
	Available bool
}

func NewClock

func NewClock(nodeId uint32, remote int64, local int64, available bool) *Clock

type Clocks

type Clocks struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*Clocks) GetClock

func (p *Clocks) GetClock(nodeId uint32) (int, *Clock)

func (*Clocks) Len

func (p *Clocks) Len() int

func (*Clocks) Less

func (p *Clocks) Less(i, j int) bool

时钟按照本地时间从低到高排序

func (*Clocks) Swap

func (p *Clocks) Swap(i, j int)

func (*Clocks) UpdateClock

func (p *Clocks) UpdateClock(clock *Clock)

type Cluster

type Cluster struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewCluster

func NewCluster(config UClusterConfig) *Cluster

func (*Cluster) JoinNode

func (p *Cluster) JoinNode(nodeId int32, partitionSize int, replicaSize int)

func (*Cluster) Listen

func (p *Cluster) Listen()

type ClusterDataDelegate

type ClusterDataDelegate interface {
	ForEach(iterator DataIterator)
	Get(data *model.DataBody) (bool, error)
	Set(data *model.DataBody) error
}

type ClusterDelegate

type ClusterDelegate interface {
	LocalNodeStorageInfo() *model.NodeStorageInfo
	LocalNodeStorageStat() *model.NodeStorageStat
	LocalNodeHealthStat() *model.NodeHealthStat
}

type ClusterEventListener

type ClusterEventListener struct {
	// contains filtered or unexported fields
}

func NewClusterEventListener

func NewClusterEventListener(warehouse *Warehouse) *ClusterEventListener

func (*ClusterEventListener) OnTopologyChanged

func (p *ClusterEventListener) OnTopologyChanged(event *NodeEvent)

type ClusterMessageDelegate

type ClusterMessageDelegate interface {
	System(pipeline Pipeline, message model.SystemMessage) error
	Event(pipeline Pipeline, message model.EventMessage) error
	Topic(pipeline Pipeline, message model.TopicMessage) error
	Data(pipeline Pipeline, message model.DataMessage) error
}

type ClusterPacketDispatcher

type ClusterPacketDispatcher struct {
	// contains filtered or unexported fields
}

func NewClusterPacketDispatcher

func NewClusterPacketDispatcher(cluster *Cluster) *ClusterPacketDispatcher

func (*ClusterPacketDispatcher) Dispatch

func (p *ClusterPacketDispatcher) Dispatch(packet model.Packet) error

func (*ClusterPacketDispatcher) Register

func (p *ClusterPacketDispatcher) Register(packetType model.PacketType, handler PacketHandler) error

type ClusterPacketListener

type ClusterPacketListener struct {
	Pipeline PacketPipeline
}

func NewClusterPacketListener

func NewClusterPacketListener(pipeline PacketPipeline) *ClusterPacketListener

func (*ClusterPacketListener) OnReceive

func (p *ClusterPacketListener) OnReceive(packet *model.Packet)

type ClusterPacketPipeline

type ClusterPacketPipeline struct {
	// contains filtered or unexported fields
}

func NewClusterPacketPipeline

func NewClusterPacketPipeline() *ClusterPacketPipeline

func (*ClusterPacketPipeline) InRead

func (p *ClusterPacketPipeline) InRead() <-chan *model.Packet

func (*ClusterPacketPipeline) InSyncWrite

func (p *ClusterPacketPipeline) InSyncWrite(packet *model.Packet) *SyncChannel

func (*ClusterPacketPipeline) InWrite

func (p *ClusterPacketPipeline) InWrite(packet *model.Packet)

func (*ClusterPacketPipeline) OutRead

func (p *ClusterPacketPipeline) OutRead() <-chan *model.Packet

func (*ClusterPacketPipeline) OutWrite

func (p *ClusterPacketPipeline) OutWrite(packet *model.Packet)

type ClusterPipeline

type ClusterPipeline struct {
	// contains filtered or unexported fields
}

func NewClusterPipeline

func NewClusterPipeline(pipeline PacketPipeline) *ClusterPipeline

func (*ClusterPipeline) ASyncSend

func (p *ClusterPipeline) ASyncSend(packet *model.Packet)

func (*ClusterPipeline) SyncSend

func (p *ClusterPipeline) SyncSend(packet *model.Packet) *model.Packet

type DataCenter

type DataCenter struct {
	Id    int32  //数据中心Id
	Name  string //数据中心名
	Areas *core.Array

	ReplicaStrategy DataCenterReplicaStrategy
	// contains filtered or unexported fields
}

func NewDataCenter

func NewDataCenter(id int32) *DataCenter

func (*DataCenter) Area

func (p *DataCenter) Area(areaId int32) *Area

func (*DataCenter) Compare

func (p *DataCenter) Compare(item core.ArrayItem) int32

func (*DataCenter) GetId

func (p *DataCenter) GetId() int32

func (*DataCenter) Group

func (p *DataCenter) Group()

func (*DataCenter) IfAbsentCreateArea

func (p *DataCenter) IfAbsentCreateArea(group string) *Area

func (*DataCenter) LeaveNode

func (p *DataCenter) LeaveNode(nodeId uint32) error

func (*DataCenter) NextOfRing

func (p *DataCenter) NextOfRing(ring uint32) uint32

func (*DataCenter) Nodes

func (p *DataCenter) Nodes() []Node

func (*DataCenter) UpdateNodeStatus

func (p *DataCenter) UpdateNodeStatus(nodeId uint32, status model.NodeStatus) error

type DataCenterReplicaStrategy

type DataCenterReplicaStrategy uint8
const (
	DataCenterReplicaStrategy_Circle DataCenterReplicaStrategy = iota //按照一致性哈希环复制
)

type DataIterator

type DataIterator func(data *model.DataBody) bool

type DataOperations

type DataOperations interface {
	Migrate(from, to int32, startRing int32, endRing int32) error
	Push(from, to int32, dataArray []*model.DataBody)
	Pull(from, to int32, dataArray []*model.DataBody) error
}

type DataPipeline

type DataPipeline interface {
}

type DataVectorClock

type DataVectorClock struct {
	Clocks *Clocks
}

func (*DataVectorClock) Compare

func (p *DataVectorClock) Compare(left Versions, right Versions) int

type EndpointSnitch

type EndpointSnitch interface {
	DataCenter() uint32

	Rack() uint32
}

type EventListener

type EventListener interface {
	OnTopologyChanged(event *NodeEvent)
}

type HashTable

type HashTable struct {
	// contains filtered or unexported fields
}

func NewHashTable

func NewHashTable(initialSize int) *HashTable

func (*HashTable) Contains

func (p *HashTable) Contains(data HashTableData) bool

func (*HashTable) Get

func (p *HashTable) Get(data HashTableData) uint32

func (*HashTable) IsEmpty

func (p *HashTable) IsEmpty() bool

func (*HashTable) Put

func (p *HashTable) Put(data HashTableData) uint32

func (*HashTable) Size

func (p *HashTable) Size() int

type HashTableData

type HashTableData interface {
	Key() string
}

type HashTableNode

type HashTableNode struct {
	//256
	Key string
	//4
	Hash uint32
}

func NewHashTableNode

func NewHashTableNode(key string, hash uint32) *HashTableNode

type LocalDataOperations

type LocalDataOperations struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*LocalDataOperations) Migrate

func (p *LocalDataOperations) Migrate(from, to int32, startRing int32, endRing int32) error

1 3 5 7 9 11 2

func (*LocalDataOperations) Pull

func (p *LocalDataOperations) Pull(from, to int32, dataArray []*model.DataBody) error

A不发送数据的value,仅发送数据的摘要key和version给B。 B根据版本比较数据,将本地比A新的数据(key,value,version)推送给A A更新自己的本地数据

func (*LocalDataOperations) Push

func (p *LocalDataOperations) Push(from, to int32, dataArray []*model.DataBody)

A节点将数据(key,value,version)及对应的版本号推送给B节点 B节点更新A发过来的数据中比自己新的数据

type Node

type Node struct {
	DataCenter int32
	Area       int32
	Rack       int32
	Id         int32
	Ip         string
	Port       int

	Status model.NodeStatus
	Health model.NodeHealth

	Partitions *core.Array

	Weight        float32
	ReplicaSize   int
	PartitionSize int
}

func NewNode

func NewNode(ip string, port int, weight float32) *Node

func (*Node) Compare

func (p *Node) Compare(item core.ArrayItem) int32

func (*Node) GetId

func (p *Node) GetId() int32

type NodeEvent

type NodeEvent struct {
	Type   NodeEventType
	Node   *model.Node
	Native interface{}
}

func NewNodeEvent

func NewNodeEvent(t NodeEventType, node *model.Node, native interface{}) *NodeEvent

type NodeEventType

type NodeEventType uint8
const (
	NodeEventType_Join NodeEventType = iota
	NodeEventType_Leave
	NodeEventType_Update
)

type PacketDispatcher

type PacketDispatcher interface {
	Dispatch(packet model.Packet) error
	Register(packetType model.PacketType, handler PacketHandler) error
}

type PacketHandler

type PacketHandler func(packet model.Packet) error

type PacketListener

type PacketListener interface {
	OnReceive(packet *model.Packet)
}

type PacketMessage

type PacketMessage struct {
	// contains filtered or unexported fields
}

type PacketPipeline

type PacketPipeline interface {
	InSyncWrite(packet *model.Packet) *SyncChannel
	InWrite(packet *model.Packet)
	InRead() <-chan *model.Packet
	OutWrite(packet *model.Packet)
	OutRead() <-chan *model.Packet
}

type PacketQueue

type PacketQueue struct {
	// contains filtered or unexported fields
}

type Partition

type Partition struct {
	Id         int32 //分区Id
	Index      int32
	DataCenter int32  //数据中心Id
	Area       int32  //区Id
	Rack       int32  //机架Id
	Node       int32  //节点Id
	Ip         string //节点Ip
	Port       int    //节点端口
	Path       string //分区根路径

	Replicas *core.Array

	DiskCapacity Capacity
}

func NewPartition

func NewPartition(node int32, partition int32) *Partition

func (*Partition) Compare

func (p *Partition) Compare(item core.ArrayItem) int32

func (*Partition) GetId

func (p *Partition) GetId() int32

type PartitionDiskType

type PartitionDiskType uint8
const (
	PartitionDiskType_Physical PartitionDiskType = iota
	PartitionDiskType_Cloud
)

type Pipeline

type Pipeline interface {
	SyncSend(packet *model.Packet) *model.Packet
	ASyncSend(packet *model.Packet)
}

type Rack

type Rack struct {
	Id    int32  //机架Id
	Name  string //机架名
	Nodes *core.Array
}

func NewRack

func NewRack(id int32) *Rack

func (*Rack) Compare

func (p *Rack) Compare(item core.ArrayItem) int32

func (*Rack) GetId

func (p *Rack) GetId() int32

func (*Rack) IfAbsentCreateNode

func (p *Rack) IfAbsentCreateNode(ip string, port int) *Node

func (*Rack) Node

func (p *Rack) Node(id int32) *Node

type RemoteDataOperations

type RemoteDataOperations struct {
}

func (*RemoteDataOperations) Pull

func (*RemoteDataOperations) Push

func (p *RemoteDataOperations) Push(dataArray []*model.DataBody) error

type SyncChannel

type SyncChannel struct {
	PacketId string
	// contains filtered or unexported fields
}

func NewSyncChannel

func NewSyncChannel(packetId string) *SyncChannel

func (*SyncChannel) Read

func (p *SyncChannel) Read() <-chan *model.Packet

func (*SyncChannel) Write

func (p *SyncChannel) Write(packet *model.Packet)

type Transport

type Transport interface {
	Serving() *TransportInfo
	Shutdown()
	SendToTCP(nodeId int32, msg []byte) error
	SendToUDP(nodeId int32, msg []byte) error
	Me() TransportInfo
	Node(nodeId int32) *core.Node
}

type TransportConfig

type TransportConfig struct {
	Id            int32
	BindIp        []string
	BindPort      int
	AdvertisePort int
	Seeds         []string
	Secret        string

	EventListener  EventListener
	PacketListener PacketListener
}

type TransportGossip

type TransportGossip struct {
	// contains filtered or unexported fields
}

func NewTransportGossip

func NewTransportGossip(config *TransportConfig) *TransportGossip

func (*TransportGossip) GetBroadcasts

func (p *TransportGossip) GetBroadcasts(overhead, limit int) [][]byte

GetBroadcasts is called when user data messages can be broadcast. It can return a list of buffers to send. Each buffer should assume an overhead as provided with a limit on the total byte size allowed. The total byte size of the resulting data to send must not exceed the limit. Care should be taken that this method does not block, since doing so would block the entire UDP packet receive loop.

func (*TransportGossip) LocalState

func (p *TransportGossip) LocalState(join bool) []byte

LocalState is used for a TCP push/pull. This is sent to the remote side in addition to the membership information. Any data can be sent here. See MergeRemoteState as well. The `join` boolean indicates this is for a join instead of a push/pull.

func (*TransportGossip) Me

func (*TransportGossip) MergeRemoteState

func (p *TransportGossip) MergeRemoteState(buf []byte, join bool)

MergeRemoteState is invoked after a TCP push/pull. This is the state received from the remote side and is the result of the remote side's LocalState call. The 'join' boolean indicates this is for a join instead of a push/pull.

func (*TransportGossip) Node

func (p *TransportGossip) Node(nodeId int32) *core.Node

func (*TransportGossip) NodeMeta

func (p *TransportGossip) NodeMeta(limit int) []byte

NodeMeta is used to retrieve meta-data about the current node when broadcasting an alive message. It's length is limited to the given byte size. This metadata is available in the Node structure.

func (*TransportGossip) NotifyJoin

func (p *TransportGossip) NotifyJoin(n *memberlist.Node)

NotifyJoin is invoked when a node is detected to have joined. The Node argument must not be modified.

func (*TransportGossip) NotifyLeave

func (p *TransportGossip) NotifyLeave(n *memberlist.Node)

NotifyLeave is invoked when a node is detected to have left. The Node argument must not be modified.

func (*TransportGossip) NotifyMsg

func (p *TransportGossip) NotifyMsg(dat []byte)

NotifyMsg is called when a user-data message is received. Care should be taken that this method does not block, since doing so would block the entire UDP packet receive loop. Additionally, the byte slice may be modified after the call returns, so it should be copied if needed

func (*TransportGossip) NotifyUpdate

func (p *TransportGossip) NotifyUpdate(n *memberlist.Node)

NotifyUpdate is invoked when a node is detected to have updated, usually involving the meta data. The Node argument must not be modified.

func (*TransportGossip) SendToTCP

func (p *TransportGossip) SendToTCP(nodeId int32, msg []byte) error

func (*TransportGossip) SendToUDP

func (p *TransportGossip) SendToUDP(nodeId int32, msg []byte) error

func (*TransportGossip) Serving

func (p *TransportGossip) Serving() *TransportInfo

func (*TransportGossip) Shutdown

func (p *TransportGossip) Shutdown()

type TransportInfo

type TransportInfo struct {
	Id     int32           //节点Id
	Name   string          //节点名称
	Status TransportStatus //节点状态
	Addr   net.IP          //节点ip
	Port   uint16          //节点端口
	Native interface{}     //原生实现
}

type TransportStatus

type TransportStatus uint8

type UCluster

type UCluster struct {
	// contains filtered or unexported fields
}

func NewUCluster

func NewUCluster(config *UClusterConfig) *UCluster

func (*UCluster) Serving

func (p *UCluster) Serving() error

type UClusterConfig

type UClusterConfig struct {
	Name          string   `json:"level" yaml:"name"`
	Seeds         []string `json:"seeds" yaml:"seeds"`
	BindIp        []string `json:"bind_ip" yaml:"bind_ip"`
	BindPort      int      `json:"bind_port" yaml:"bind_port"`
	AdvertisePort int      `json:"advertise_port" yaml:"advertise_port"`
	ReplicaCount  int      `json:"replica_count" yaml:"replica_count"`
	Secret        string   `json:"secret" yaml:"secret"`

	Delegate        ClusterDelegate        `json:"-" yaml:"-"`
	DataDelegate    ClusterDataDelegate    `json:"-" yaml:"-"`
	MessageDelegate ClusterMessageDelegate `json:"-" yaml:"-"`
}

type Version

type Version struct {
	NodeId    uint32
	Version   uint32
	Timestamp int64
}

type Versions

type Versions struct {
	Versions []Version
}

func (*Versions) GetVersion

func (p *Versions) GetVersion(nodeId uint32) (int, *Version)

func (*Versions) Len

func (p *Versions) Len() int

func (*Versions) Less

func (p *Versions) Less(i, j int) bool

按版本号倒序排列

func (*Versions) Swap

func (p *Versions) Swap(i, j int)

func (*Versions) UpdateVersion

func (p *Versions) UpdateVersion(ver *Version)

type Warehouse

type Warehouse struct {
	Centers *core.Array

	sync.RWMutex
	// contains filtered or unexported fields
}

func NewWarehouse

func NewWarehouse(cluster *Cluster) *Warehouse

func (*Warehouse) AddNode

func (p *Warehouse) AddNode(node *Node, partitionSize int, replicaSize int) error

func (*Warehouse) Applicants

func (p *Warehouse) Applicants() *core.Array

func (*Warehouse) GetCenter

func (p *Warehouse) GetCenter(dc int32) *DataCenter

func (*Warehouse) GetNode

func (p *Warehouse) GetNode(dc int32, nodeId int32) *Node

func (*Warehouse) Group

func (p *Warehouse) Group()

func (*Warehouse) IfAbsentCreateDataCenter

func (p *Warehouse) IfAbsentCreateDataCenter(group string) *DataCenter

func (*Warehouse) IfPresent

func (p *Warehouse) IfPresent(ipv4 string) *DataCenter

func (*Warehouse) JoinNode

func (p *Warehouse) JoinNode(ip string, port int) *Node

func (*Warehouse) LeaveNode

func (p *Warehouse) LeaveNode(ip string, port int) *Node

func (*Warehouse) Readying

func (p *Warehouse) Readying(listener WarehouseListener)

type WarehouseListener

type WarehouseListener func(status WarehouseStatus)

type WarehouseStatus

type WarehouseStatus uint8
const (
	WarehouseStatus_Launching WarehouseStatus = iota
	WarehouseStatus_Normal
	WarehouseStatus_Node_Changed
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL