cluster

package
v1.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 3, 2023 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ClusterNotifyAdd = iota
	ClusterNotifyModify
	ClusterNotifyDelete
	ClusterNotifyStateOnline
	ClusterNotifyStateOffline
)
View Source
const (
	NodeRoleServer = iota
	NodeRoleClient
)
View Source
const (
	NodeStateAlive = iota
	NodeStateLeft
	NodeStateFail
)
View Source
const CLUSUniackStore string = "uniack/"
View Source
const CLUSUnicastStore string = "unicast/"
View Source
const CONSUL_KV_BASE_URL = "http://localhost:8500/v1/kv"
View Source
const DefaultAgentGRPCPort = 18401
View Source
const DefaultControllerGRPCPort = 18400
View Source
const DefaultDataCenter string = "neuvector"
View Source
const DefaultMaxQLen = 4096
View Source
const DefaultScannerGRPCPort = 18402
View Source
const GRPCMaxMsgSize = 1024 * 1024 * 32

Variables

View Source
var ClusterNotifyName = []string{
	ClusterNotifyAdd:          "add",
	ClusterNotifyModify:       "modify",
	ClusterNotifyDelete:       "delete",
	ClusterNotifyStateOnline:  "connect",
	ClusterNotifyStateOffline: "disconnect",
}
View Source
var ErrEmptyStore error = errors.New("Empty store")
View Source
var ErrKeyNotFound error = errors.New("Key not found")
View Source
var ErrQueueFull = errors.New("Queue full")
View Source
var KVValueSizeMax = 512 * 1024

Functions

func CreateGRPCClient

func CreateGRPCClient(key, endpoint string, autoRemove bool, create CreateService) error

func Delete

func Delete(key string) error

func DeleteGRPCClient

func DeleteGRPCClient(key string)

func DeleteTree

func DeleteTree(keyPrefix string) error

func Exist

func Exist(key string) bool

func FillClusterAddrs

func FillClusterAddrs(cfg *ClusterConfig, sys *system.SystemTools) error

func ForceLeave

func ForceLeave(node string, server bool)

func Get

func Get(key string) ([]byte, error)

func GetClusterLead

func GetClusterLead() string

func GetGRPCClient

func GetGRPCClient(key string, isCompressed IsCompressedFunc, cb GRPCCallback) (interface{}, error)

func GetGRPCClientEndpoint

func GetGRPCClientEndpoint(key string) string

func GetRev

func GetRev(key string) ([]byte, uint64, error)

func GetSelfAddress

func GetSelfAddress() string

func GetStoreKeys

func GetStoreKeys(store string) ([]string, error)

func IsControllerGRPCCommpressed

func IsControllerGRPCCommpressed(endpoint string) bool

func IsEnforcerGRPCCommpressed

func IsEnforcerGRPCCommpressed(endpoint string) bool

func LeaveCluster

func LeaveCluster(server bool)

func List

func List(keyPrefix string) (consulapi.KVPairs, error)

func PauseAllWatchers

func PauseAllWatchers(includeMonitorWatch bool)

func PauseWatcher

func PauseWatcher(key string)

func Put

func Put(key string, value []byte) error

func PutBinary

func PutBinary(key string, value []byte) error

func PutIfNotExist

func PutIfNotExist(key string, value []byte, logKeyOnly bool) error

The difference between putRev(k, v, 0) and PutIfNotExist(k, v) is the later return nil error when the key exists

func PutQuiet

func PutQuiet(key string, value []byte) error

func PutQuietRev

func PutQuietRev(key string, value []byte, rev uint64) error

func PutRev

func PutRev(key string, value []byte, rev uint64) error

func RegisterKeyWatcher

func RegisterKeyWatcher(key string, f KeyWatcher)

func RegisterLeadChangeWatcher

func RegisterLeadChangeWatcher(fn LeadChangeCallback, lead string)

func RegisterNodeWatcher

func RegisterNodeWatcher(f NodeWatcher)

func RegisterStateWatcher

func RegisterStateWatcher(f StateWatcher)

func RegisterStoreWatcher

func RegisterStoreWatcher(store string, f StoreWatcher, bCongestCtl bool)

func RegisterWatcherMonitor

func RegisterWatcherMonitor(failFunc func() bool, recoverFunc func())

func ResolveJoinAndBindAddr

func ResolveJoinAndBindAddr(joinAddr string, sys *system.SystemTools) (string, string, error)

func ResumeAllWatchers

func ResumeAllWatchers()

func ResumeWatcher

func ResumeWatcher(key string)

func SetLogLevel

func SetLogLevel(level log.Level)

func SetWatcherCongestionCtl

func SetWatcherCongestionCtl(key string, enabled bool)

func StartCluster

func StartCluster(cc *ClusterConfig) (string, error)

Types

type CLUSUnicast

type CLUSUnicast struct {
	Expect string `json:"expect"`
	Data   []byte `json:"data"`
}

type ClusterConfig

type ClusterConfig struct {
	ID       string
	Server   bool
	Debug    bool
	Ifaces   map[string][]share.CLUSIPAddr
	JoinAddr string

	BindAddr      string
	AdvertiseAddr string
	DataCenter    string
	RPCPort       uint
	LANPort       uint
	WANPort       uint
	EnableDebug   bool
	// contains filtered or unexported fields
}

type ClusterDriver

type ClusterDriver interface {
	Start(cc *ClusterConfig, eCh chan error, recover bool)
	Join(cc *ClusterConfig) error
	Leave(server bool) error
	ForceLeave(node string, server bool) error
	Reload(cc *ClusterConfig) error

	GetSelfAddress() string
	GetLead() (string, error)
	ServerAlive() (bool, error)
	GetAllMembers() []ClusterMemberInfo

	NewLock(key string, wait time.Duration) (LockInterface, error)
	NewSession(name string, ttl time.Duration) (SessionInterface, error)

	// KV
	Exist(key string) bool
	Get(key string) ([]byte, error)
	GetRev(key string) ([]byte, uint64, error)
	GetStoreKeys(store string) ([]string, error)
	Put(key string, value []byte) error
	PutRev(key string, value []byte, rev uint64) error
	PutIfNotExist(key string, value []byte) error
	Delete(key string) error
	List(keyPrefix string) (consulapi.KVPairs, error)
	DeleteTree(keyPrefix string) error
	Transact([]transactEntry) (bool, error)

	// Watcher
	RegisterKeyWatcher(key string, watcher KeyWatcher)
	RegisterStoreWatcher(store string, watcher StoreWatcher, bCongestCtl bool)
	RegisterStateWatcher(watcher StateWatcher)
	RegisterNodeWatcher(watcher NodeWatcher)
	RegisterWatcherMonitor(failFunc func() bool, recoverFunc func())
	RegisterExistingWatchers()

	StopAllWatchers()
	PauseAllWatchers(includeMonitorWatch bool)
	ResumeAllWatchers()
	PauseWatcher(key string)
	ResumeWatcher(key string)
	SetWatcherCongestionCtl(key string, enabled bool)
}

type ClusterMemberInfo

type ClusterMemberInfo struct {
	Name  string
	Role  int
	State int
}

func GetAllMembers

func GetAllMembers() []ClusterMemberInfo

type ClusterNotifyType

type ClusterNotifyType int

type ClusterTransact

type ClusterTransact struct {
	// contains filtered or unexported fields
}

func Transact

func Transact() *ClusterTransact

func (*ClusterTransact) Apply

func (t *ClusterTransact) Apply() (bool, error)

func (*ClusterTransact) CheckRev

func (t *ClusterTransact) CheckRev(key string, rev uint64)

func (*ClusterTransact) Close

func (t *ClusterTransact) Close()

func (*ClusterTransact) Delete

func (t *ClusterTransact) Delete(key string)

func (*ClusterTransact) DeleteRev

func (t *ClusterTransact) DeleteRev(key string, rev uint64)

func (*ClusterTransact) HasData

func (t *ClusterTransact) HasData() bool

func (*ClusterTransact) Put

func (t *ClusterTransact) Put(key string, value []byte)

func (*ClusterTransact) PutBinary

func (t *ClusterTransact) PutBinary(key string, value []byte)

func (*ClusterTransact) PutQuiet

func (t *ClusterTransact) PutQuiet(key string, value []byte)

func (*ClusterTransact) PutRev

func (t *ClusterTransact) PutRev(key string, value []byte, rev uint64)

func (*ClusterTransact) Reset

func (t *ClusterTransact) Reset()

func (*ClusterTransact) Size

func (t *ClusterTransact) Size() int

type CreateService

type CreateService func(conn *grpc.ClientConn) Service

type GRPCCallback

type GRPCCallback interface {
	Shutdown()
}

type GRPCClient

type GRPCClient struct {
	// contains filtered or unexported fields
}

func (*GRPCClient) Close

func (c *GRPCClient) Close()

func (*GRPCClient) GetClient

func (c *GRPCClient) GetClient() *grpc.ClientConn

type GRPCServer

type GRPCServer struct {
	// contains filtered or unexported fields
}

func NewGRPCServerTCP

func NewGRPCServerTCP(endpoint string) (*GRPCServer, error)

func NewGRPCServerUnix

func NewGRPCServerUnix(socket string) (*GRPCServer, error)

func (*GRPCServer) GetServer

func (s *GRPCServer) GetServer() *grpc.Server

func (*GRPCServer) GracefulStop

func (s *GRPCServer) GracefulStop()

func (*GRPCServer) Start

func (s *GRPCServer) Start()

func (*GRPCServer) Stop

func (s *GRPCServer) Stop()

type IsCompressedFunc

type IsCompressedFunc func(endpoint string) bool

type KeyWatcher

type KeyWatcher func(ClusterNotifyType, string, []byte, uint64)

type LeadChangeCallback

type LeadChangeCallback func(string, string)

type LockInterface

type LockInterface interface {
	Lock(stopCh <-chan struct{}) (<-chan struct{}, error)
	Unlock() error
	Key() string
}

func NewLock

func NewLock(key string, wait time.Duration) (LockInterface, error)

type MessengerInterface

type MessengerInterface interface {
	Unicast(target string, subject string, data []byte,
		cb UnicastCallback, timeout int, args ...interface{}) error
	UnicastStore(target string) string
	UnicastKey2Subject(key string) string
	UniackStore() string
	UniackUpdateHandler(nType ClusterNotifyType, key string, value []byte, unused uint64)
}

func NewMessenger

func NewMessenger(hostID string, devID string) MessengerInterface

type MockEvQueue

type MockEvQueue struct {
}

func (*MockEvQueue) Append

func (evq *MockEvQueue) Append(interface{}) error

func (*MockEvQueue) Flush

func (evq *MockEvQueue) Flush() error

type MockMessenger

type MockMessenger struct {
}

func (*MockMessenger) UniackStore

func (msgr *MockMessenger) UniackStore() string

func (*MockMessenger) UniackUpdateHandler

func (msgr *MockMessenger) UniackUpdateHandler(nType ClusterNotifyType, key string, value []byte, unused uint64)

func (*MockMessenger) Unicast

func (msgr *MockMessenger) Unicast(target string, subject string, data []byte,
	cb UnicastCallback, timeout int, args ...interface{}) error

func (*MockMessenger) UnicastKey2Subject

func (msgr *MockMessenger) UnicastKey2Subject(key string) string

func (*MockMessenger) UnicastStore

func (msgr *MockMessenger) UnicastStore(target string) string

type NodeWatcher

type NodeWatcher func(ClusterNotifyType, string, string)

type ObjectQueueInterface

type ObjectQueueInterface interface {
	Append(obj interface{}) error
	Flush() error
}

func NewObjectQueue

func NewObjectQueue(key string, maxQLen int) ObjectQueueInterface

type Service

type Service interface{}

* ---------------------------------------------------- * ---------- Client management ----------------------- * ----------------------------------------------------

type SessionInterface

type SessionInterface interface {
	Associate(key string) error
	Disassociate(key string) error
}

Session is a mechanism to implement short-lived keys. When the session is created, a TTL value is given. Keys are "associated" with the session will be deleted when the session expires.

func NewSession

func NewSession(name string, ttl time.Duration) (SessionInterface, error)

type StateWatcher

type StateWatcher func(ClusterNotifyType, string, string)

type StoreWatcher

type StoreWatcher func(ClusterNotifyType, string, []byte, uint64)

type UnicastCallback

type UnicastCallback func(string, []byte, ...interface{})

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL