Documentation
¶
Index ¶
- Constants
- Variables
- func Connect(path string, getConnection GetConnection) <-chan client.Connection
- func GeneratePoolPath(poolID string) string
- func GetHostID(leader client.Leader) (string, error)
- func GetLocalConnection(path string) (client.Connection, error)
- func GetRemoteConnection(path string) (client.Connection, error)
- func InitializeLocalClient(client *client.Client)
- func InitializeRemoteClient(client *client.Client)
- func Listen(shutdown <-chan interface{}, ready chan<- error, conn client.Connection, ...)
- func MonitorRealm(shutdown <-chan interface{}, conn client.Connection, path string) <-chan string
- func NewHostLeader(conn client.Connection, hostID, realm, path string) client.Leader
- func PathExists(conn client.Connection, p string) (bool, error)
- func Ready(shutdown <-chan interface{}, conn client.Connection, p string) error
- func ShutdownConnections()
- func Start(shutdown <-chan interface{}, conn client.Connection, master Listener, ...)
- func Sync(conn client.Connection, data []Node, zkpath string) error
- type GetConnection
- type HostLeader
- type Listener
- type NewListener
- type Node
- type SyncHandler
- type Synchronizer
- type ZZKTestSuite
Constants ¶
const (
DefaultConnectionTimeout = time.Minute
)
const DefaultRetryTime = time.Minute
DefaultRetryTime is the time to retry a failed local operation
const ZKTestTimeout = 5 * time.Second
NOTE: this constant can be adjusted to satisfy race conditions
Variables ¶
var ( ErrTimeout = errors.New("connection timeout") ErrShutdown = errors.New("listener shutdown") )
Errors
var ErrInvalidType = errors.New("invalid type")
ErrInvalidType is the error for invalid zk data types
var (
ErrNotInitialized = errors.New("client not initialized")
)
Functions ¶
func Connect ¶
func Connect(path string, getConnection GetConnection) <-chan client.Connection
Connect generates a client connection asynchronously
func GeneratePoolPath ¶
GeneratePoolPath generates the path for a pool-based connection
func GetLocalConnection ¶
func GetLocalConnection(path string) (client.Connection, error)
GetLocalConnection acquires a connection from the local zookeeper client
func GetRemoteConnection ¶
func GetRemoteConnection(path string) (client.Connection, error)
GetRemoteConnection acquires a connection from the remote zookeeper client
func InitializeLocalClient ¶
InitializeLocalClient initializes the local zookeeper client
func InitializeRemoteClient ¶
InitializeRemoteClient initializes the remote zookeeper client
func Listen ¶
func Listen(shutdown <-chan interface{}, ready chan<- error, conn client.Connection, l Listener)
Listen initializes a listener for a particular zookeeper node shutdown: signal to shutdown the listener ready: signal to indicate that the listener has started watching its
child nodes (must set buffer size >= 1)
l: object that manages the zk interface for a specific path
func MonitorRealm ¶
func MonitorRealm(shutdown <-chan interface{}, conn client.Connection, path string) <-chan string
func NewHostLeader ¶
func NewHostLeader(conn client.Connection, hostID, realm, path string) client.Leader
NewHostLeader initializes a new host leader
func PathExists ¶
func PathExists(conn client.Connection, p string) (bool, error)
PathExists verifies if a path exists and does not raise an exception if the path does not exist
func Ready ¶
func Ready(shutdown <-chan interface{}, conn client.Connection, p string) error
Ready waits for a node to be available for watching
func ShutdownConnections ¶
func ShutdownConnections()
ShutdownConnections closes all local and remote zookeeper connections
func Start ¶
func Start(shutdown <-chan interface{}, conn client.Connection, master Listener, listeners ...Listener)
Start starts a group of listeners that are governed by a master listener. When the master exits, it shuts down all of the child listeners and waits for all of the subprocesses to exit
Types ¶
type GetConnection ¶
type GetConnection func(string) (client.Connection, error)
GetConnection describes a generic function for acquiring a connection object
type HostLeader ¶
HostLeader is the node to store leader information for a host
func (*HostLeader) SetVersion ¶
func (node *HostLeader) SetVersion(version interface{})
SetVersion implements client.Node
func (*HostLeader) Version ¶
func (node *HostLeader) Version() interface{}
Version implements client.Node
type Listener ¶
type Listener interface { // SetConnection sets the connection object SetConnection(conn client.Connection) // GetPath concatenates the base path with whatever child nodes that are specified GetPath(nodes ...string) string // Ready verifies that the listener can start listening Ready() error // Done performs any cleanup when the listener exits Done() // Spawn is the action to be performed when a child node is found on the parent Spawn(<-chan interface{}, string) // PostProcess performs additional action based on the nodes that are in processing PostProcess(p map[string]struct{}) }
Listener is zookeeper node listener type
type NewListener ¶
NewListener instantiates a new listener object
type Node ¶
type Node interface { client.Node // GetID relates to the child node mapping in zookeeper GetID() string // Create creates the object in zookeeper Create(conn client.Connection) error // Update updates the object in zookeeper Update(conn client.Connection) error }
Node manages zookeeper actions
type SyncHandler ¶
type SyncHandler interface { // GetPath gets the path to the node GetPath(...string) string // Ready implements Listener Ready() error // Done implements Listener Done() // GetConnection acquires a path-based connection GetConnection(string) (client.Connection, error) // Allocate initialized a new Node object Allocate() Node // GetAll gets all local data GetAll() ([]Node, error) // AddUpdate performs a local update AddUpdate(string, Node) (string, error) // Delete deletes a Node locally Delete(string) error }
SyncHandler is the handler for the Synchronizer
type Synchronizer ¶
type Synchronizer struct { SyncHandler // contains filtered or unexported fields }
Synchronizer is the remote synchronizer object
func NewSynchronizer ¶
func NewSynchronizer(handler SyncHandler) *Synchronizer
NewSynchronizer instantiates a new synchronizer
func (*Synchronizer) AddListener ¶
func (l *Synchronizer) AddListener(f NewListener)
AddListener creates new Listener objects based on the Synchronizer's child nodes
func (*Synchronizer) PostProcess ¶
func (l *Synchronizer) PostProcess(processing map[string]struct{})
PostProcess deletes any orphaned data that exists locally
func (*Synchronizer) SetConnection ¶
func (l *Synchronizer) SetConnection(conn client.Connection)
SetConnection implements Listener
func (*Synchronizer) Spawn ¶
func (l *Synchronizer) Spawn(shutdown <-chan interface{}, nodeID string)
Spawn starts the remote Synchronizer based on nodeID
type ZZKTestSuite ¶
type ZZKTestSuite struct {
isvcs.ManagerTestSuite
}
func (*ZZKTestSuite) Create ¶
func (t *ZZKTestSuite) Create(c *C)
func (*ZZKTestSuite) Destroy ¶
func (t *ZZKTestSuite) Destroy(c *C)
func (*ZZKTestSuite) GetService ¶
func (t *ZZKTestSuite) GetService(c *C) *isvcs.IService
func (*ZZKTestSuite) SetUp ¶
func (t *ZZKTestSuite) SetUp(c *C)
func (*ZZKTestSuite) SetUpSuite ¶
func (t *ZZKTestSuite) SetUpSuite(c *C)