Documentation ¶
Index ¶
- Constants
- Variables
- func Connect(path string, getConnection GetConnection) <-chan client.Connection
- func GeneratePoolPath(poolID string) string
- func GetHostID(leader client.Leader) (string, error)
- func GetLocalConnection(path string) (client.Connection, error)
- func GetRemoteConnection(path string) (client.Connection, error)
- func InitializeLocalClient(client *client.Client)
- func InitializeRemoteClient(client *client.Client)
- func Listen(shutdown <-chan interface{}, ready chan<- error, conn client.Connection, ...)
- func Listen2(shutdown <-chan interface{}, conn client.Connection, s Spawner)
- func Manage(shutdown <-chan interface{}, root string, l Listener2)
- func MonitorRealm(shutdown <-chan interface{}, conn client.Connection, path string) <-chan string
- func PathExists(conn client.Connection, p string) (bool, error)
- func Ready(shutdown <-chan interface{}, conn client.Connection, p string) error
- func ShutdownConnections()
- func Start(shutdown <-chan interface{}, conn client.Connection, master Listener, ...)
- func Sync(conn client.Connection, data []Node, zkpath string) error
- type GetConnection
- type HostLeader
- type LeaderListener
- type Listener
- type Listener2
- type NewListener
- type Node
- type Spawner
- type SyncHandler
- type Synchronizer
Constants ¶
const ( DefaultConnectionTimeout = time.Minute MaxDelay = 15 * time.Second )
const DefaultRetryTime = time.Minute
DefaultRetryTime is the time to retry a failed local operation
Variables ¶
var ( ErrTimeout = errors.New("connection timeout") ErrShutdown = errors.New("listener shutdown") ErrBadConn = errors.New("bad connection") )
Errors
var ErrInvalidType = errors.New("invalid type")
ErrInvalidType is the error for invalid zk data types
var (
ErrNotInitialized = errors.New("client not initialized")
)
Functions ¶
func Connect ¶
func Connect(path string, getConnection GetConnection) <-chan client.Connection
Connect generates a client connection asynchronously
func GeneratePoolPath ¶
GeneratePoolPath generates the path for a pool-based connection
func GetLocalConnection ¶
func GetLocalConnection(path string) (client.Connection, error)
GetLocalConnection acquires a connection from the local zookeeper client
func GetRemoteConnection ¶
func GetRemoteConnection(path string) (client.Connection, error)
GetRemoteConnection acquires a connection from the remote zookeeper client
func InitializeLocalClient ¶
InitializeLocalClient initializes the local zookeeper client
func InitializeRemoteClient ¶
InitializeRemoteClient initializes the remote zookeeper client
func Listen ¶
func Listen(shutdown <-chan interface{}, ready chan<- error, conn client.Connection, l Listener)
Listen initializes a listener for a particular zookeeper node shutdown: signal to shutdown the listener ready: signal to indicate that the listener has started watching its
child nodes (must set buffer size >= 1)
l: object that manages the zk interface for a specific path
func Listen2 ¶
func Listen2(shutdown <-chan interface{}, conn client.Connection, s Spawner)
Listen2 manages spawning threads to handle nodes created under the parent path.
func MonitorRealm ¶
func MonitorRealm(shutdown <-chan interface{}, conn client.Connection, path string) <-chan string
func PathExists ¶
func PathExists(conn client.Connection, p string) (bool, error)
PathExists verifies if a path exists and does not raise an exception if the path does not exist
func Ready ¶
func Ready(shutdown <-chan interface{}, conn client.Connection, p string) error
Ready waits for a node to be available for watching
func ShutdownConnections ¶
func ShutdownConnections()
ShutdownConnections closes all local and remote zookeeper connections
func Start ¶
func Start(shutdown <-chan interface{}, conn client.Connection, master Listener, listeners ...Listener)
Start starts a group of listeners that are governed by a master listener. When the master exits, it shuts down all of the child listeners and waits for all of the subprocesses to exit
Types ¶
type GetConnection ¶
type GetConnection func(string) (client.Connection, error)
GetConnection describes a generic function for acquiring a connection object
type HostLeader ¶
HostLeader is the node to store leader information for a host
func (*HostLeader) SetVersion ¶
func (node *HostLeader) SetVersion(version interface{})
SetVersion implements client.Node
func (*HostLeader) Version ¶
func (node *HostLeader) Version() interface{}
Version implements client.Node
type LeaderListener ¶
type LeaderListener struct {
// contains filtered or unexported fields
}
LeaderListener is generic watcher and broadcaster of leader types
func NewLeaderListener ¶
func NewLeaderListener(path string) *LeaderListener
NewLeaderListener instantiates a listener to watch the leader election at a given path.
func (*LeaderListener) Run ¶
func (l *LeaderListener) Run(cancel <-chan interface{}, conn client.Connection)
Run manages the event loop for this listener
func (*LeaderListener) Wait ¶
func (l *LeaderListener) Wait() <-chan struct{}
Wait enqueues a watcher that will be updated when a new leader is elected
type Listener ¶
type Listener interface { // SetConnection sets the connection object SetConnection(conn client.Connection) // GetPath concatenates the base path with whatever child nodes that are specified GetPath(nodes ...string) string // Ready verifies that the listener can start listening Ready() error // Done performs any cleanup when the listener exits Done() // Spawn is the action to be performed when a child node is found on the parent Spawn(<-chan interface{}, string) // PostProcess performs additional action based on the nodes that are in processing PostProcess(p map[string]struct{}) }
Listener is zookeeper node listener type
type Listener2 ¶
type Listener2 interface { // Listen is the method to call to start the listener Listen(cancel <-chan interface{}, conn client.Connection) // Exited does additional cleanup once shutdown is called Exited() }
Listener2 is for monitoring the listener and its connection to zookeeper
type NewListener ¶
NewListener instantiates a new listener object
type Node ¶
type Node interface { client.Node // GetID relates to the child node mapping in zookeeper GetID() string // Create creates the object in zookeeper Create(conn client.Connection) error // Update updates the object in zookeeper Update(conn client.Connection) error }
Node manages zookeeper actions
type Spawner ¶
type Spawner interface { // SetConn sets the zookeeper connection SetConn(conn client.Connection) // Path returns the parent path of the zookeeper node whose children are // the target of spawn Path() string // Pre performs a synchronous action to occur before spawn Pre() // Spawn is intended to manage individual nodes that exist from Path() Spawn(cancel <-chan struct{}, n string) // Post presents the complete list of nodes that are children of Path() for // further processing and synchronization Post(p map[string]struct{}) }
Spawner manages the spawning of individual goroutines for managing nodes under a particular zookeeper
type SyncHandler ¶
type SyncHandler interface { // GetPath gets the path to the node GetPath(...string) string // Ready implements Listener Ready() error // Done implements Listener Done() // GetConnection acquires a path-based connection GetConnection(string) (client.Connection, error) // Allocate initialized a new Node object Allocate() Node // GetAll gets all local data GetAll() ([]Node, error) // AddUpdate performs a local update AddUpdate(string, Node) (string, error) // Delete deletes a Node locally Delete(string) error }
SyncHandler is the handler for the Synchronizer
type Synchronizer ¶
type Synchronizer struct { SyncHandler // contains filtered or unexported fields }
Synchronizer is the remote synchronizer object
func NewSynchronizer ¶
func NewSynchronizer(handler SyncHandler) *Synchronizer
NewSynchronizer instantiates a new synchronizer
func (*Synchronizer) AddListener ¶
func (l *Synchronizer) AddListener(f NewListener)
AddListener creates new Listener objects based on the Synchronizer's child nodes
func (*Synchronizer) PostProcess ¶
func (l *Synchronizer) PostProcess(processing map[string]struct{})
PostProcess deletes any orphaned data that exists locally
func (*Synchronizer) SetConnection ¶
func (l *Synchronizer) SetConnection(conn client.Connection)
SetConnection implements Listener
func (*Synchronizer) Spawn ¶
func (l *Synchronizer) Spawn(shutdown <-chan interface{}, nodeID string)
Spawn starts the remote Synchronizer based on nodeID