Documentation ¶
Index ¶
- Constants
- Variables
- func CreateProtoNode(node *memberlist.Node) *model.Node
- func GenerateRepositoryId(group string) int32
- func Get(key string)
- func Put(packetId string, message PacketMessage)
- type Area
- type Capacity
- type Clock
- type Clocks
- type Cluster
- type ClusterDataDelegate
- type ClusterDelegate
- type ClusterEventListener
- type ClusterMessageDelegate
- type ClusterPacketDispatcher
- type ClusterPacketListener
- type ClusterPacketPipeline
- func (p *ClusterPacketPipeline) InRead() <-chan *model.Packet
- func (p *ClusterPacketPipeline) InSyncWrite(packet *model.Packet) *SyncChannel
- func (p *ClusterPacketPipeline) InWrite(packet *model.Packet)
- func (p *ClusterPacketPipeline) OutRead() <-chan *model.Packet
- func (p *ClusterPacketPipeline) OutWrite(packet *model.Packet)
- type ClusterPipeline
- type DataCenter
- func (p *DataCenter) Area(areaId int32) *Area
- func (p *DataCenter) Compare(item core.ArrayItem) int32
- func (p *DataCenter) GetId() int32
- func (p *DataCenter) Group()
- func (p *DataCenter) IfAbsentCreateArea(group string) *Area
- func (p *DataCenter) LeaveNode(nodeId uint32) error
- func (p *DataCenter) NextOfRing(ring uint32) uint32
- func (p *DataCenter) Nodes() []Node
- func (p *DataCenter) UpdateNodeStatus(nodeId uint32, status model.NodeStatus) error
- type DataCenterReplicaStrategy
- type DataIterator
- type DataOperations
- type DataPipeline
- type DataVectorClock
- type EndpointSnitch
- type EventListener
- type HashTable
- type HashTableData
- type HashTableNode
- type LocalDataOperations
- type Node
- type NodeEvent
- type NodeEventType
- type PacketDispatcher
- type PacketHandler
- type PacketListener
- type PacketMessage
- type PacketPipeline
- type PacketQueue
- type Partition
- type PartitionDiskType
- type Pipeline
- type Rack
- type RemoteDataOperations
- type SyncChannel
- type Transport
- type TransportConfig
- type TransportGossip
- func (p *TransportGossip) GetBroadcasts(overhead, limit int) [][]byte
- func (p *TransportGossip) LocalState(join bool) []byte
- func (p *TransportGossip) Me() TransportInfo
- func (p *TransportGossip) MergeRemoteState(buf []byte, join bool)
- func (p *TransportGossip) Node(nodeId int32) *core.Node
- func (p *TransportGossip) NodeMeta(limit int) []byte
- func (p *TransportGossip) NotifyJoin(n *memberlist.Node)
- func (p *TransportGossip) NotifyLeave(n *memberlist.Node)
- func (p *TransportGossip) NotifyMsg(dat []byte)
- func (p *TransportGossip) NotifyUpdate(n *memberlist.Node)
- func (p *TransportGossip) SendToTCP(nodeId int32, msg []byte) error
- func (p *TransportGossip) SendToUDP(nodeId int32, msg []byte) error
- func (p *TransportGossip) Serving() *TransportInfo
- func (p *TransportGossip) Shutdown()
- type TransportInfo
- type TransportStatus
- type UCluster
- type UClusterConfig
- type Version
- type Versions
- type Warehouse
- func (p *Warehouse) AddNode(node *Node, partitionSize int, replicaSize int) error
- func (p *Warehouse) Applicants() *core.Array
- func (p *Warehouse) GetCenter(dc int32) *DataCenter
- func (p *Warehouse) GetNode(dc int32, nodeId int32) *Node
- func (p *Warehouse) Group()
- func (p *Warehouse) IfAbsentCreateDataCenter(group string) *DataCenter
- func (p *Warehouse) IfPresent(ipv4 string) *DataCenter
- func (p *Warehouse) JoinNode(ip string, port int) *Node
- func (p *Warehouse) LeaveNode(ip string, port int) *Node
- func (p *Warehouse) Readying(listener WarehouseListener)
- type WarehouseListener
- type WarehouseStatus
Constants ¶
const PROTO_VERSION = 1
Variables ¶
var (
ErrNotFoundClusterNode = errors.New("not found cluster node")
ErrMessageDispatcherExist = errors.New("message dispatcher already exist")
ErrMessageHandlerExist = errors.New("message handler already exist")
ErrMessageHandlerNotExist = errors.New("message handler not exist")
)
var (
ErrClusterNodeOffline = errors.New("cluster node offline")
)
Functions ¶
func CreateProtoNode ¶
func CreateProtoNode(node *memberlist.Node) *model.Node
func GenerateRepositoryId ¶
func Put ¶
func Put(packetId string, message PacketMessage)
Types ¶
type Area ¶
func (*Area) IfAbsentCreateRack ¶
type Clocks ¶
func (*Clocks) UpdateClock ¶
type Cluster ¶
func NewCluster ¶
func NewCluster(config UClusterConfig) *Cluster
type ClusterDataDelegate ¶
type ClusterDelegate ¶
type ClusterDelegate interface { LocalNodeStorageInfo() *model.NodeStorageInfo LocalNodeStorageStat() *model.NodeStorageStat LocalNodeHealthStat() *model.NodeHealthStat }
type ClusterEventListener ¶
type ClusterEventListener struct {
// contains filtered or unexported fields
}
func NewClusterEventListener ¶
func NewClusterEventListener(warehouse *Warehouse) *ClusterEventListener
func (*ClusterEventListener) OnTopologyChanged ¶
func (p *ClusterEventListener) OnTopologyChanged(event *NodeEvent)
type ClusterMessageDelegate ¶
type ClusterMessageDelegate interface { System(pipeline Pipeline, message model.SystemMessage) error Event(pipeline Pipeline, message model.EventMessage) error Topic(pipeline Pipeline, message model.TopicMessage) error Data(pipeline Pipeline, message model.DataMessage) error }
type ClusterPacketDispatcher ¶
type ClusterPacketDispatcher struct {
// contains filtered or unexported fields
}
func NewClusterPacketDispatcher ¶
func NewClusterPacketDispatcher(cluster *Cluster) *ClusterPacketDispatcher
func (*ClusterPacketDispatcher) Dispatch ¶
func (p *ClusterPacketDispatcher) Dispatch(packet model.Packet) error
func (*ClusterPacketDispatcher) Register ¶
func (p *ClusterPacketDispatcher) Register(packetType model.PacketType, handler PacketHandler) error
type ClusterPacketListener ¶
type ClusterPacketListener struct {
Pipeline PacketPipeline
}
func NewClusterPacketListener ¶
func NewClusterPacketListener(pipeline PacketPipeline) *ClusterPacketListener
func (*ClusterPacketListener) OnReceive ¶
func (p *ClusterPacketListener) OnReceive(packet *model.Packet)
type ClusterPacketPipeline ¶
type ClusterPacketPipeline struct {
// contains filtered or unexported fields
}
func NewClusterPacketPipeline ¶
func NewClusterPacketPipeline() *ClusterPacketPipeline
func (*ClusterPacketPipeline) InRead ¶
func (p *ClusterPacketPipeline) InRead() <-chan *model.Packet
func (*ClusterPacketPipeline) InSyncWrite ¶
func (p *ClusterPacketPipeline) InSyncWrite(packet *model.Packet) *SyncChannel
func (*ClusterPacketPipeline) InWrite ¶
func (p *ClusterPacketPipeline) InWrite(packet *model.Packet)
func (*ClusterPacketPipeline) OutRead ¶
func (p *ClusterPacketPipeline) OutRead() <-chan *model.Packet
func (*ClusterPacketPipeline) OutWrite ¶
func (p *ClusterPacketPipeline) OutWrite(packet *model.Packet)
type ClusterPipeline ¶
type ClusterPipeline struct {
// contains filtered or unexported fields
}
func NewClusterPipeline ¶
func NewClusterPipeline(pipeline PacketPipeline) *ClusterPipeline
func (*ClusterPipeline) ASyncSend ¶
func (p *ClusterPipeline) ASyncSend(packet *model.Packet)
type DataCenter ¶
type DataCenter struct {
Id int32 // data center ID
Name string // data center name
Areas *core.Array
ReplicaStrategy DataCenterReplicaStrategy
// contains filtered or unexported fields
}
func NewDataCenter ¶
func NewDataCenter(id int32) *DataCenter
func (*DataCenter) Area ¶
func (p *DataCenter) Area(areaId int32) *Area
func (*DataCenter) GetId ¶
func (p *DataCenter) GetId() int32
func (*DataCenter) Group ¶
func (p *DataCenter) Group()
func (*DataCenter) IfAbsentCreateArea ¶
func (p *DataCenter) IfAbsentCreateArea(group string) *Area
func (*DataCenter) LeaveNode ¶
func (p *DataCenter) LeaveNode(nodeId uint32) error
func (*DataCenter) NextOfRing ¶
func (p *DataCenter) NextOfRing(ring uint32) uint32
func (*DataCenter) Nodes ¶
func (p *DataCenter) Nodes() []Node
func (*DataCenter) UpdateNodeStatus ¶
func (p *DataCenter) UpdateNodeStatus(nodeId uint32, status model.NodeStatus) error
type DataCenterReplicaStrategy ¶
type DataCenterReplicaStrategy uint8
const (
DataCenterReplicaStrategy_Circle DataCenterReplicaStrategy = iota // replicate along the consistent hash ring
)
type DataIterator ¶
type DataOperations ¶
type DataPipeline ¶
type DataPipeline interface { }
type DataVectorClock ¶
type DataVectorClock struct {
Clocks *Clocks
}
type EndpointSnitch ¶
type EventListener ¶
type EventListener interface {
OnTopologyChanged(event *NodeEvent)
}
type HashTable ¶
type HashTable struct {
// contains filtered or unexported fields
}
func NewHashTable ¶
func (*HashTable) Contains ¶
func (p *HashTable) Contains(data HashTableData) bool
func (*HashTable) Get ¶
func (p *HashTable) Get(data HashTableData) uint32
func (*HashTable) Put ¶
func (p *HashTable) Put(data HashTableData) uint32
type HashTableData ¶
type HashTableData interface {
Key() string
}
type HashTableNode ¶
func NewHashTableNode ¶
func NewHashTableNode(key string, hash uint32) *HashTableNode
type LocalDataOperations ¶
func (*LocalDataOperations) Migrate ¶
func (p *LocalDataOperations) Migrate(from, to int32, startRing int32, endRing int32) error
1 3 5 7 9 11 2
type Node ¶
type NodeEvent ¶
type NodeEvent struct { Type NodeEventType Node *model.Node Native interface{} }
func NewNodeEvent ¶
func NewNodeEvent(t NodeEventType, node *model.Node, native interface{}) *NodeEvent
type NodeEventType ¶
type NodeEventType uint8
const ( NodeEventType_Join NodeEventType = iota NodeEventType_Leave NodeEventType_Update )
type PacketDispatcher ¶
type PacketDispatcher interface { Dispatch(packet model.Packet) error Register(packetType model.PacketType, handler PacketHandler) error }
type PacketHandler ¶
type PacketListener ¶
type PacketMessage ¶
type PacketMessage struct {
// contains filtered or unexported fields
}
type PacketPipeline ¶
type PacketQueue ¶
type PacketQueue struct {
// contains filtered or unexported fields
}
type Partition ¶
type Partition struct {
Id int32 // partition ID
Index int32
DataCenter int32 // data center ID
Area int32 // area ID
Rack int32 // rack ID
Node int32 // node ID
Ip string // node IP
Port int // node port
Path string // partition root path
Replicas *core.Array
DiskCapacity Capacity
}
func NewPartition ¶
type PartitionDiskType ¶
type PartitionDiskType uint8
const ( PartitionDiskType_Physical PartitionDiskType = iota PartitionDiskType_Cloud )
type RemoteDataOperations ¶
type RemoteDataOperations struct { }
type SyncChannel ¶
type SyncChannel struct {
PacketId string
// contains filtered or unexported fields
}
func NewSyncChannel ¶
func NewSyncChannel(packetId string) *SyncChannel
func (*SyncChannel) Read ¶
func (p *SyncChannel) Read() <-chan *model.Packet
func (*SyncChannel) Write ¶
func (p *SyncChannel) Write(packet *model.Packet)
type TransportConfig ¶
type TransportConfig struct { Id int32 BindIp []string BindPort int AdvertisePort int Seeds []string Secret string EventListener EventListener PacketListener PacketListener }
type TransportGossip ¶
type TransportGossip struct {
// contains filtered or unexported fields
}
func NewTransportGossip ¶
func NewTransportGossip(config *TransportConfig) *TransportGossip
func (*TransportGossip) GetBroadcasts ¶
func (p *TransportGossip) GetBroadcasts(overhead, limit int) [][]byte
GetBroadcasts is called when user data messages can be broadcast. It can return a list of buffers to send. Each buffer should assume an overhead as provided with a limit on the total byte size allowed. The total byte size of the resulting data to send must not exceed the limit. Care should be taken that this method does not block, since doing so would block the entire UDP packet receive loop.
func (*TransportGossip) LocalState ¶
func (p *TransportGossip) LocalState(join bool) []byte
LocalState is used for a TCP push/pull. This is sent to the remote side in addition to the membership information. Any data can be sent here. See MergeRemoteState as well. The `join` boolean indicates this is for a join instead of a push/pull.
func (*TransportGossip) Me ¶
func (p *TransportGossip) Me() TransportInfo
func (*TransportGossip) MergeRemoteState ¶
func (p *TransportGossip) MergeRemoteState(buf []byte, join bool)
MergeRemoteState is invoked after a TCP push/pull. This is the state received from the remote side and is the result of the remote side's LocalState call. The 'join' boolean indicates this is for a join instead of a push/pull.
func (*TransportGossip) NodeMeta ¶
func (p *TransportGossip) NodeMeta(limit int) []byte
NodeMeta is used to retrieve meta-data about the current node when broadcasting an alive message. Its length is limited to the given byte size. This metadata is available in the Node structure.
func (*TransportGossip) NotifyJoin ¶
func (p *TransportGossip) NotifyJoin(n *memberlist.Node)
NotifyJoin is invoked when a node is detected to have joined. The Node argument must not be modified.
func (*TransportGossip) NotifyLeave ¶
func (p *TransportGossip) NotifyLeave(n *memberlist.Node)
NotifyLeave is invoked when a node is detected to have left. The Node argument must not be modified.
func (*TransportGossip) NotifyMsg ¶
func (p *TransportGossip) NotifyMsg(dat []byte)
NotifyMsg is called when a user-data message is received. Care should be taken that this method does not block, since doing so would block the entire UDP packet receive loop. Additionally, the byte slice may be modified after the call returns, so it should be copied if needed.
func (*TransportGossip) NotifyUpdate ¶
func (p *TransportGossip) NotifyUpdate(n *memberlist.Node)
NotifyUpdate is invoked when a node is detected to have updated, usually involving the meta data. The Node argument must not be modified.
func (*TransportGossip) SendToTCP ¶
func (p *TransportGossip) SendToTCP(nodeId int32, msg []byte) error
func (*TransportGossip) SendToUDP ¶
func (p *TransportGossip) SendToUDP(nodeId int32, msg []byte) error
func (*TransportGossip) Serving ¶
func (p *TransportGossip) Serving() *TransportInfo
func (*TransportGossip) Shutdown ¶
func (p *TransportGossip) Shutdown()
type TransportInfo ¶
type TransportStatus ¶
type TransportStatus uint8
type UCluster ¶
type UCluster struct {
// contains filtered or unexported fields
}
func NewUCluster ¶
func NewUCluster(config *UClusterConfig) *UCluster
type UClusterConfig ¶
type UClusterConfig struct { Name string `json:"level" yaml:"name"` Seeds []string `json:"seeds" yaml:"seeds"` BindIp []string `json:"bind_ip" yaml:"bind_ip"` BindPort int `json:"bind_port" yaml:"bind_port"` AdvertisePort int `json:"advertise_port" yaml:"advertise_port"` ReplicaCount int `json:"replica_count" yaml:"replica_count"` Secret string `json:"secret" yaml:"secret"` Delegate ClusterDelegate `json:"-" yaml:"-"` DataDelegate ClusterDataDelegate `json:"-" yaml:"-"` MessageDelegate ClusterMessageDelegate `json:"-" yaml:"-"` }
type Warehouse ¶
type Warehouse struct {
Centers *core.Array
sync.RWMutex
// contains filtered or unexported fields
}
func NewWarehouse ¶
func (*Warehouse) Applicants ¶
func (*Warehouse) GetCenter ¶
func (p *Warehouse) GetCenter(dc int32) *DataCenter
func (*Warehouse) IfAbsentCreateDataCenter ¶
func (p *Warehouse) IfAbsentCreateDataCenter(group string) *DataCenter
func (*Warehouse) IfPresent ¶
func (p *Warehouse) IfPresent(ipv4 string) *DataCenter
func (*Warehouse) Readying ¶
func (p *Warehouse) Readying(listener WarehouseListener)
type WarehouseListener ¶
type WarehouseListener func(status WarehouseStatus)
type WarehouseStatus ¶
type WarehouseStatus uint8
const ( WarehouseStatus_Launching WarehouseStatus = iota WarehouseStatus_Normal WarehouseStatus_Node_Changed )
Source Files ¶
- cluster.go
- cluster_communication.go
- cluster_delegate.go
- cluster_event_listener.go
- cluster_node.go
- cluster_packet_handler.go
- cluster_packet_handler_event.go
- cluster_packet_handler_system.go
- cluster_packet_listener.go
- cluster_pipeline.go
- cluster_vector_clock.go
- data_center.go
- data_operations.go
- data_operations_local.go
- data_operations_remote.go
- endpoint_snitch.go
- errors.go
- hash_table.go
- packet_dispatcher.go
- packet_queue.go
- pipeline.go
- transport.go
- transport_gossip.go
- ucluster.go
- warehouse.go