syncmember

package module
v0.0.0-...-f89825b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 30, 2024 License: MIT Imports: 17 Imported by: 0

README

SyncMember

基于Gossip协议的成员同步组件

支持同步Key-Value数据

支持监听Key-Value数据变化

Member synchronization component based on Gossip protocol

Support synchronization of Key-Value data

Support listening for changes in Key-Value data

安装 Install
go get github.com/ciiim/syncmember
例子 Example
func main() {
    // create two nodes
    // once created, nodes can be joined
    s1 := syncmember.NewSyncMember("node1",syncmember.DefaultConfig().SetPort(9001))
    s2 := syncmember.NewSyncMember("node2",syncmember.DefaultConfig().SetPort(9002))

    defer s1.Shutdown()
    defer s2.Shutdown()

    go s1.Run()

    //join node1
    s1.Join("127.0.0.1:9002")
    s2.Run()
}
Key-Value 操作 Key-Value operation
func main() {
    s1 := ...
    s2 := ...

    s1.SetKV("key1", []byte("value1"))
    // ...
    s2.GetValue("key1") // return []byte("value1")
    // ...
    s2.DeleteKV("key1")
    // ...
    s1.GetValue("key1") // return nil
}
TODO List
  • 支持间接通信 Support Indirect communication
  • 支持kv数据持久化 Support kv data persistence

Documentation

Index

Constants

View Source
const (
	KVEventNums int8 = 3
)

Variables

View Source
var (
	//Interval
	FastPingInterval    = 200 * time.Millisecond
	NormalPingInterval  = 400 * time.Millisecond
	SlowPingInterval    = 1 * time.Second
	DefaultPingInterval = NormalPingInterval

	FastPushPullInterval    = 20 * time.Second
	NormalPushPullInterval  = 40 * time.Second
	SlowPushPullInterval    = 60 * time.Second
	DefaultPushPullInterval = NormalPushPullInterval

	FastGossipInterval    = 200 * time.Millisecond
	NormalGossipInterval  = 400 * time.Millisecond
	SlowGossipInterval    = 1 * time.Second
	DefaultGossipInterval = NormalGossipInterval

	//Ping and Goosip
	DefaultFanout        = 3
	DefaultUDPBufferSize = 1500
	DefaultPushPullNums  = 1

	//Net
	DefaultTCPTimeout = 5 * time.Second
	BindAllIP         = net.ParseIP("0.0.0.0")
	BindLoopBackIP    = net.ParseIP("127.0.0.1")
	DefaultBindPort   = 9632

	LocalAdvertiseIP     = net.ParseIP("127.0.0.1")
	DefaultAdvertisePort = DefaultBindPort

	//Log
	DefaultLogLevel  = slog.LevelInfo
	OpenLogDetail    = true
	CloseLogDetail   = false
	DefaultLogWriter = os.Stderr
)
View Source
var (
	DefaultConfig = func() *Config {
		return &Config{
			BindIP:   BindAllIP,
			BindPort: DefaultBindPort,

			AdvertisePort: DefaultAdvertisePort,

			PingInterval:     DefaultPingInterval,
			PushPullInterval: DefaultPushPullInterval,
			GossipInterval:   DefaultGossipInterval,

			TCPTimeout: DefaultTCPTimeout,

			LogDetail: CloseLogDetail,
			LogLevel:  DefaultLogLevel,
			LogWriter: DefaultLogWriter,

			Fanout: DefaultFanout,

			UDPBufferSize: DefaultUDPBufferSize,
		}

	}

	DebugConfig = func() *Config {
		return &Config{
			BindIP:   BindAllIP,
			BindPort: DefaultBindPort,

			AdvertisePort: DefaultAdvertisePort,

			PingInterval:     FastGossipInterval,
			PushPullInterval: FastPushPullInterval,
			GossipInterval:   FastGossipInterval,

			TCPTimeout: DefaultTCPTimeout,

			LogDetail: OpenLogDetail,
			LogLevel:  slog.LevelDebug,
			LogWriter: DefaultLogWriter,

			Fanout:       DefaultFanout,
			PushPullNums: DefaultPushPullNums,

			UDPBufferSize: DefaultUDPBufferSize,
		}
	}
)

Functions

This section is empty.

Types

type Address

type Address struct {
	IP net.IP

	Port int
	Name string
	// contains filtered or unexported fields
}

func (Address) String

func (a Address) String() string

type Boardcast

type Boardcast interface {
	Less(than btree.Item) bool
}

type BoardcastQueue

type BoardcastQueue struct {
	// contains filtered or unexported fields
}

func (*BoardcastQueue) GetGossipBoardcast

func (b *BoardcastQueue) GetGossipBoardcast(availableBytes int) []*Message

func (*BoardcastQueue) PutMessage

func (b *BoardcastQueue) PutMessage(msgType MessageType, name string, payload []byte)

type BytesPool

type BytesPool struct {
}

type Config

type Config struct {
	BindIP        net.IP
	BindPort      int
	AdvertiseIP   net.IP
	AdvertisePort int

	PingInterval     time.Duration
	PushPullInterval time.Duration
	GossipInterval   time.Duration

	TCPTimeout time.Duration

	LogLevel  slog.Level
	LogWriter io.Writer
	LogDetail bool

	Fanout       int
	PushPullNums int

	UDPBufferSize int
}

func (*Config) OpenLogDetail

func (c *Config) OpenLogDetail(detail bool) *Config

func (*Config) SetAdvertiserIP

func (c *Config) SetAdvertiserIP(ip string) *Config

func (*Config) SetBindIP

func (c *Config) SetBindIP(ip string) *Config

func (*Config) SetFanout

func (c *Config) SetFanout(fanout int) *Config

func (*Config) SetGossipInterval

func (c *Config) SetGossipInterval(d time.Duration) *Config

func (*Config) SetLogLevel

func (c *Config) SetLogLevel(level slog.Level) *Config

func (*Config) SetLogWriter

func (c *Config) SetLogWriter(w io.Writer) *Config

func (*Config) SetPingInterval

func (c *Config) SetPingInterval(d time.Duration) *Config

func (*Config) SetPort

func (c *Config) SetPort(port int) *Config

func (*Config) SetPushPullInterval

func (c *Config) SetPushPullInterval(d time.Duration) *Config

func (*Config) SetTCPTimeout

func (c *Config) SetTCPTimeout(d time.Duration) *Config

func (*Config) SetUDPBufferSize

func (c *Config) SetUDPBufferSize(size int) *Config

type GossipBoardcast

type GossipBoardcast struct {
	// contains filtered or unexported fields
}

func (*GossipBoardcast) Less

func (g *GossipBoardcast) Less(than btree.Item) bool

type KV

type KV kVItem

type KVEventFunc

type KVEventFunc func(kv *KV)

KVEventFunc

EventKVSet kv <- after set

EventKVDelete kv <- before delete

EventKVUpdate kv <- after update

type KVEventType

type KVEventType int8
const (
	EventKVSet KVEventType = iota
	EventKVDelete
	EventKVUpdate
)

type KeyValuePayload

type KeyValuePayload struct {
	Key   string
	Value []byte
}

func (*KeyValuePayload) Decode

func (p *KeyValuePayload) Decode(b []byte) error

func (*KeyValuePayload) Encode

func (p *KeyValuePayload) Encode() *bytes.Buffer

type Message

type Message struct {
	MsgType MessageType
	Seq     uint64 //FIXME: 暂时不起作用
	Payload []byte
}

func (*Message) GetPayload

func (m *Message) GetPayload() []byte

type MessageType

type MessageType int8
const (
	Ping MessageType = iota
	Pong

	Alive
	Dead

	KVSet
	KVDelete
	KVUpdate
)

func (MessageType) String

func (m MessageType) String() string

type Node

type Node struct {
	// contains filtered or unexported fields
}

func (*Node) Addr

func (n *Node) Addr() Address

func (*Node) GetInfo

func (n *Node) GetInfo() NodeInfoPayload

func (*Node) IsCredible

func (n *Node) IsCredible() bool

func (*Node) NodeState

func (n *Node) NodeState() NodeStateType

type NodeEventDelegate

type NodeEventDelegate interface {

	// 当新节点加入时被调用
	NotifyJoin(n *Node)

	// 当已存在的节点变为存活时被调用
	NotifyAlive(n *Node)

	// 当已存在的节点变为死亡时被调用
	NotifyDead(n *Node)
}

type NodeEventType

type NodeEventType int8
const (
	DelegateNodeAlive NodeEventType = iota
	DelegateNodeDead
)

type NodeInfoPayload

type NodeInfoPayload struct {
	Addr      Address
	NodeState NodeStateType
	Version   int64
}

func (*NodeInfoPayload) Decode

func (p *NodeInfoPayload) Decode(b []byte) error

func (*NodeInfoPayload) Encode

func (p *NodeInfoPayload) Encode() *bytes.Buffer

type NodeLocalInfo

type NodeLocalInfo struct {
	// contains filtered or unexported fields
}

type NodeStateType

type NodeStateType int8
const (
	NodeUnknown NodeStateType = iota
	NodeDead
	NodeAlive
)

type Packet

type Packet struct {
	MessageBody *Message
	From        Address
	To          Address
}

type PacketHandlerFunc

type PacketHandlerFunc func(packet *Packet)

type SyncMember

type SyncMember struct {
	// contains filtered or unexported fields
}

func NewSyncMember

func NewSyncMember(nodeName string, config *Config) *SyncMember

func (*SyncMember) AddNode

func (s *SyncMember) AddNode(node *Node)

func (*SyncMember) DeleteKV

func (s *SyncMember) DeleteKV(key string)

func (*SyncMember) GetNodeState

func (s *SyncMember) GetNodeState(addr string) NodeStateType

func (*SyncMember) GetValue

func (s *SyncMember) GetValue(key string) []byte

func (*SyncMember) Join

func (s *SyncMember) Join(addr string) error

func (*SyncMember) MergeNodes

func (s *SyncMember) MergeNodes(remote []NodeInfoPayload) error

func (*SyncMember) Node

func (s *SyncMember) Node() Address

func (*SyncMember) RemoveKVWatcher

func (s *SyncMember) RemoveKVWatcher(key string, kvEventType KVEventType)

func (*SyncMember) Run

func (s *SyncMember) Run() error

func (*SyncMember) SetKV

func (s *SyncMember) SetKV(key string, value []byte)

func (*SyncMember) SetKVWatcher

func (s *SyncMember) SetKVWatcher(key string, kvEventType KVEventType, fn KVEventFunc)

func (*SyncMember) SetNodeDelegate

func (s *SyncMember) SetNodeDelegate(delegate NodeEventDelegate)

func (*SyncMember) Shutdown

func (s *SyncMember) Shutdown()

func (*SyncMember) UpdateKV

func (s *SyncMember) UpdateKV(key string, value []byte)

func (*SyncMember) WaitKVDelete

func (s *SyncMember) WaitKVDelete(key string) <-chan []byte

WaitKVDelete 用于等待某个key被删除

return 被删除的值

会覆盖之前的watcher

func (*SyncMember) WaitKVSet

func (s *SyncMember) WaitKVSet(key string) <-chan []byte

WaitKVSet 用于等待某个key被设置

return 设置的值

会覆盖之前的watcher

func (*SyncMember) WaitKVUpdate

func (s *SyncMember) WaitKVUpdate(key string) <-chan []byte

WaitKVUpdate 用于等待某个key被更新

return 更新后的值

会覆盖之前的watcher

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL