Documentation ¶
Overview ¶
Package zk is a native Go client library for the ZooKeeper orchestration service.
Index ¶
- Constants
- Variables
- func FLWRuok(servers []string, timeout time.Duration) []bool
- func FormatServers(servers []string) []string
- func WithDialer(dialer Dialer) connOption
- func WithEventCallback(cb EventCallback) connOption
- func WithHostProvider(hostProvider HostProvider) connOption
- func WithLogInfo(logInfo bool) connOption
- func WithLogger(logger Logger) connOption
- func WithMaxBufferSize(maxBufferSize int) connOption
- func WithMaxConnBufferSize(maxBufferSize int) connOption
- type ACL
- type BatchTreeWalker
- func (w *BatchTreeWalker) Walk(visitor BatchVisitorFunc) error
- func (w *BatchTreeWalker) WalkChan(bufferSize int) <-chan VisitEvent
- func (w *BatchTreeWalker) WalkChanCtx(ctx context.Context, bufferSize int) <-chan VisitEvent
- func (w *BatchTreeWalker) WalkCtx(ctx context.Context, visitor BatchVisitorCtxFunc) error
- type BatchVisitorCtxFunc
- type BatchVisitorFunc
- type CheckVersionRequest
- type ChildrenFunc
- type Conn
- func (c *Conn) AddAuth(scheme string, auth []byte) error
- func (c *Conn) AddAuthCtx(ctx context.Context, scheme string, auth []byte) error
- func (c *Conn) AddWatch(path string, recursive bool, options ...WatcherOption) (<-chan Event, error)
- func (c *Conn) AddWatchCtx(ctx context.Context, path string, recursive bool, options ...WatcherOption) (<-chan Event, error)
- func (c *Conn) BatchWalker(path string, batchSize int) *BatchTreeWalker
- func (c *Conn) Children(path string) ([]string, *Stat, error)
- func (c *Conn) ChildrenCtx(ctx context.Context, path string) ([]string, *Stat, error)
- func (c *Conn) ChildrenW(path string) ([]string, *Stat, <-chan Event, error)
- func (c *Conn) ChildrenWCtx(ctx context.Context, path string) ([]string, *Stat, <-chan Event, error)
- func (c *Conn) Close()
- func (c *Conn) Create(path string, data []byte, flags int32, acl []ACL) (string, error)
- func (c *Conn) CreateContainer(path string, data []byte, flags int32, acl []ACL) (string, error)
- func (c *Conn) CreateContainerCtx(ctx context.Context, path string, data []byte, flags int32, acl []ACL) (string, error)
- func (c *Conn) CreateCtx(ctx context.Context, path string, data []byte, flags int32, acl []ACL) (string, error)
- func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl []ACL) (string, error)
- func (c *Conn) CreateProtectedEphemeralSequentialCtx(ctx context.Context, path string, data []byte, acl []ACL) (string, error)
- func (c *Conn) CreateTTL(path string, data []byte, flags int32, acl []ACL, ttl time.Duration) (string, error)
- func (c *Conn) CreateTTLCtx(ctx context.Context, path string, data []byte, flags int32, acl []ACL, ...) (string, error)
- func (c *Conn) Delete(path string, version int32) error
- func (c *Conn) DeleteCtx(ctx context.Context, path string, version int32) error
- func (c *Conn) Exists(path string) (bool, *Stat, error)
- func (c *Conn) ExistsCtx(ctx context.Context, path string) (bool, *Stat, error)
- func (c *Conn) ExistsW(path string) (bool, *Stat, <-chan Event, error)
- func (c *Conn) ExistsWCtx(ctx context.Context, path string) (bool, *Stat, <-chan Event, error)
- func (c *Conn) Get(path string) ([]byte, *Stat, error)
- func (c *Conn) GetACL(path string) ([]ACL, *Stat, error)
- func (c *Conn) GetACLCtx(ctx context.Context, path string) ([]ACL, *Stat, error)
- func (c *Conn) GetCtx(ctx context.Context, path string) ([]byte, *Stat, error)
- func (c *Conn) GetW(path string) ([]byte, *Stat, <-chan Event, error)
- func (c *Conn) GetWCtx(ctx context.Context, path string) ([]byte, *Stat, <-chan Event, error)
- func (c *Conn) IncrementalReconfig(joining, leaving []string, version int64) (*Stat, error)
- func (c *Conn) IncrementalReconfigCtx(ctx context.Context, joining, leaving []string, version int64) (*Stat, error)
- func (c *Conn) Multi(ops ...any) ([]MultiResponse, error)
- func (c *Conn) MultiCtx(ctx context.Context, ops ...any) ([]MultiResponse, error)
- func (c *Conn) MultiRead(ops ...any) ([]MultiResponse, error)
- func (c *Conn) MultiReadCtx(ctx context.Context, ops ...any) ([]MultiResponse, error)
- func (c *Conn) Reconfig(members []string, version int64) (*Stat, error)
- func (c *Conn) ReconfigCtx(ctx context.Context, members []string, version int64) (*Stat, error)
- func (c *Conn) RemoveWatch(ech <-chan Event) error
- func (c *Conn) RemoveWatchCtx(ctx context.Context, ech <-chan Event) error
- func (c *Conn) Server() string
- func (c *Conn) SessionID() int64
- func (c *Conn) Set(path string, data []byte, version int32) (*Stat, error)
- func (c *Conn) SetACL(path string, acl []ACL, version int32) (*Stat, error)
- func (c *Conn) SetACLCtx(ctx context.Context, path string, acl []ACL, version int32) (*Stat, error)
- func (c *Conn) SetCtx(ctx context.Context, path string, data []byte, version int32) (*Stat, error)
- func (c *Conn) SetLogger(l Logger)
- func (c *Conn) State() State
- func (c *Conn) Sync(path string) (string, error)
- func (c *Conn) SyncCtx(ctx context.Context, path string) (string, error)
- func (c *Conn) Walker(path string, order TraversalOrder) *TreeWalker
- type CreateContainerRequest
- type CreateRequest
- type CreateTTLRequest
- type DNSHostProvider
- type DeleteRequest
- type Dialer
- type ErrCode
- type Event
- type EventCallback
- type EventType
- type GetChildrenRequest
- type GetDataRequest
- type HostProvider
- type Lock
- type Logger
- type Mode
- type MultiResponse
- type PathVersionRequest
- type RefreshDNSHostProvider
- type ServerClient
- type ServerClients
- type ServerStats
- type SetDataRequest
- type Stat
- type State
- type TraversalOrder
- type TreeCache
- func (tc *TreeCache) Children(path string) ([]string, *Stat, error)
- func (tc *TreeCache) Exists(path string) (bool, *Stat, error)
- func (tc *TreeCache) Get(path string) ([]byte, *Stat, error)
- func (tc *TreeCache) Sync(ctx context.Context) (err error)
- func (tc *TreeCache) WaitForInitialSync(ctx context.Context) error
- func (tc *TreeCache) Walker(path string, order TraversalOrder) *TreeWalker
- type TreeCacheListener
- type TreeCacheListenerFuncs
- func (l *TreeCacheListenerFuncs) OnNodeCreated(path string, data []byte, stat *Stat)
- func (l *TreeCacheListenerFuncs) OnNodeDataChanged(path string, data []byte, stat *Stat)
- func (l *TreeCacheListenerFuncs) OnNodeDeleted(path string)
- func (l *TreeCacheListenerFuncs) OnNodeDeleting(path string, data []byte, stat *Stat)
- func (l *TreeCacheListenerFuncs) OnSyncError(err error)
- func (l *TreeCacheListenerFuncs) OnSyncStarted()
- func (l *TreeCacheListenerFuncs) OnSyncStopped(err error)
- func (l *TreeCacheListenerFuncs) OnTreeSynced(elapsed time.Duration)
- type TreeCacheListenerMock
- func (m *TreeCacheListenerMock) NodesCreated() map[string][]byte
- func (m *TreeCacheListenerMock) NodesDataChanged() map[string][]byte
- func (m *TreeCacheListenerMock) NodesDeleted() map[string]string
- func (m *TreeCacheListenerMock) NodesDeleting() map[string][]byte
- func (m *TreeCacheListenerMock) OnNodeCreated(path string, data []byte, stat *Stat)
- func (m *TreeCacheListenerMock) OnNodeCreatedCalled() int
- func (m *TreeCacheListenerMock) OnNodeDataChanged(path string, data []byte, stat *Stat)
- func (m *TreeCacheListenerMock) OnNodeDataChangedCalled() int
- func (m *TreeCacheListenerMock) OnNodeDeleted(path string)
- func (m *TreeCacheListenerMock) OnNodeDeletedCalled() int
- func (m *TreeCacheListenerMock) OnNodeDeleting(path string, data []byte, stat *Stat)
- func (m *TreeCacheListenerMock) OnNodeDeletingCalled() int
- func (m *TreeCacheListenerMock) OnSyncError(err error)
- func (m *TreeCacheListenerMock) OnSyncErrorCalled() int
- func (m *TreeCacheListenerMock) OnSyncStarted()
- func (m *TreeCacheListenerMock) OnSyncStartedCalled() int
- func (m *TreeCacheListenerMock) OnSyncStopped(err error)
- func (m *TreeCacheListenerMock) OnSyncStoppedCalled() int
- func (m *TreeCacheListenerMock) OnTreeSynced(elapsed time.Duration)
- func (m *TreeCacheListenerMock) OnTreeSyncedCalled() int
- type TreeCacheOption
- func WithTreeCacheAbsolutePaths(absolutePaths bool) TreeCacheOption
- func WithTreeCacheBatchSize(batchSize int) TreeCacheOption
- func WithTreeCacheIncludeData(includeData bool) TreeCacheOption
- func WithTreeCacheListener(listener TreeCacheListener) TreeCacheOption
- func WithTreeCacheLogger(logger Logger) TreeCacheOption
- func WithTreeCacheReservoirLimit(reservoirLimit int) TreeCacheOption
- type TreeWalker
- type Version
- type VisitEvent
- type VisitorCtxFunc
- type VisitorFunc
- type WatcherOption
Constants ¶
const ( // FlagEphemeral means the node is ephemeral. FlagEphemeral = 1 FlagSequence = 2 FlagTTL = 4 )
const ( // PermRead represents the permission needed to read a znode. PermRead = 1 << iota PermWrite PermCreate PermDelete PermAdmin PermAll = 0x1f )
Constants for ACL permissions
const (
// DefaultPort is the default port listened by server.
DefaultPort = 2181
)
Variables ¶
var ( // ErrConnectionClosed means the connection has been closed. ErrConnectionClosed = errors.New("zk: connection closed") ErrUnknown = errors.New("zk: unknown error") ErrAPIError = errors.New("zk: api error") ErrNoNode = errors.New("zk: node does not exist") ErrNoAuth = errors.New("zk: not authenticated") ErrBadVersion = errors.New("zk: version conflict") ErrNoChildrenForEphemerals = errors.New("zk: ephemeral nodes may not have children") ErrNodeExists = errors.New("zk: node already exists") ErrNotEmpty = errors.New("zk: node has children") ErrSessionExpired = errors.New("zk: session has been expired by the server") ErrInvalidACL = errors.New("zk: invalid ACL specified") ErrInvalidFlags = errors.New("zk: invalid flags specified") ErrAuthFailed = errors.New("zk: client authentication failed") ErrClosing = errors.New("zk: zookeeper is closing") ErrNothing = errors.New("zk: no server responses to process") ErrSessionMoved = errors.New("zk: session moved to another server, so operation is ignored") ErrReconfigDisabled = errors.New("attempts to perform a reconfiguration operation when reconfiguration feature is disabled") ErrBadArguments = errors.New("invalid arguments") ErrNoWatcher = errors.New("watcher does not exist") ErrInvalidCallback = errors.New("zk: invalid callback specified") )
var ( // ErrDeadlock is returned by Lock when trying to lock twice without unlocking first ErrDeadlock = errors.New("zk: trying to acquire a lock twice") // ErrNotLocked is returned by Unlock when trying to release a lock that has not first be acquired. ErrNotLocked = errors.New("zk: not locked") )
var ( ErrUnhandledFieldType = errors.New("zk: unhandled field type") ErrPtrExpected = errors.New("zk: encode/decode expect a non-nil pointer to struct") ErrShortBuffer = errors.New("zk: buffer too small") )
var ErrInvalidPath = errors.New("zk: invalid path")
ErrInvalidPath indicates that an operation was being attempted on an invalid path. (e.g. empty path).
var ErrNoServer = errors.New("zk: could not connect to a server")
ErrNoServer indicates that an operation cannot be completed because attempts to connect to all servers in the list failed.
var ErrPersistentWatcherStalled = fmt.Errorf("persistent watcher has stalled")
ErrPersistentWatcherStalled is passed to OnSyncError hook whenever the pump reservoir limit is hit.
Functions ¶
func FLWRuok ¶
FLWRuok is a FourLetterWord helper function. In particular, this function pulls the ruok output from each server.
func FormatServers ¶
FormatServers takes a slice of addresses, and makes sure they are in a format that resembles <addr>:<port>. If the server has no port provided, the DefaultPort constant is added to the end.
func WithDialer ¶
func WithDialer(dialer Dialer) connOption
WithDialer returns a connection option specifying a non-default Dialer.
func WithEventCallback ¶
func WithEventCallback(cb EventCallback) connOption
WithEventCallback returns a connection option that specifies an event callback. The callback must not block - doing so would delay the ZK go routines.
func WithHostProvider ¶
func WithHostProvider(hostProvider HostProvider) connOption
WithHostProvider returns a connection option specifying a non-default HostProvider.
func WithLogInfo ¶
func WithLogInfo(logInfo bool) connOption
WithLogInfo returns a connection option specifying whether or not information messages should be logged.
func WithLogger ¶
func WithLogger(logger Logger) connOption
WithLogger returns a connection option specifying a non-default Logger.
func WithMaxBufferSize ¶
func WithMaxBufferSize(maxBufferSize int) connOption
WithMaxBufferSize sets the maximum buffer size used to read and decode packets received from the Zookeeper server. The standard Zookeeper client for Java defaults to a limit of 1mb. For backwards compatibility, this Go client defaults to unbounded unless overridden via this option. A value that is zero or negative indicates that no limit is enforced.
This is meant to prevent resource exhaustion in the face of potentially malicious data in ZK. It should generally match the server setting (which also defaults ot 1mb) so that clients and servers agree on the limits for things like the size of data in an individual znode and the total size of a transaction.
For production systems, this should be set to a reasonable value (ideally that matches the server configuration). For ops tooling, it is handy to use a much larger limit, in order to do things like clean-up problematic state in the ZK tree. For example, if a single znode has a huge number of children, it is possible for the response to a "list children" operation to exceed this buffer size and cause errors in clients. The only way to subsequently clean up the tree (by removing superfluous children) is to use a client configured with a larger buffer size that can successfully query for all of the child names and then remove them. (Note there are other tools that can list all of the child names without an increased buffer size in the client, but they work by inspecting the servers' transaction logs to enumerate children instead of sending an online request to a server.
func WithMaxConnBufferSize ¶
func WithMaxConnBufferSize(maxBufferSize int) connOption
WithMaxConnBufferSize sets maximum buffer size used to send and encode packets to Zookeeper server. The standard Zookeeper client for java defaults to a limit of 1mb. This option should be used for non-standard server setup where znode is bigger than default 1mb.
Types ¶
type ACL ¶
type BatchTreeWalker ¶
type BatchTreeWalker struct {
// contains filtered or unexported fields
}
BatchTreeWalker provides traversal of a tree of nodes rooted at a specific path. It fetches children in batches to reduce the number of round trips. The batch size is configurable.
func NewBatchTreeWalker ¶
func NewBatchTreeWalker(conn *Conn, path string, batchSize int) *BatchTreeWalker
NewBatchTreeWalker returns a new BatchTreeWalker for the given connection, root path and batch size.
func (*BatchTreeWalker) Walk ¶
func (w *BatchTreeWalker) Walk(visitor BatchVisitorFunc) error
Walk begins traversing the tree and calls the visitor function for each node visited.
func (*BatchTreeWalker) WalkChan ¶
func (w *BatchTreeWalker) WalkChan(bufferSize int) <-chan VisitEvent
WalkChan begins traversing the tree and sends the results to the returned channel. The channel will be buffered with the given size. The channel is closed when the traversal is complete. If an error occurs, an error event will be sent to the channel before it is closed.
func (*BatchTreeWalker) WalkChanCtx ¶
func (w *BatchTreeWalker) WalkChanCtx(ctx context.Context, bufferSize int) <-chan VisitEvent
WalkChanCtx is like WalkChan, but it takes a context that can be used to cancel the walk.
func (*BatchTreeWalker) WalkCtx ¶
func (w *BatchTreeWalker) WalkCtx(ctx context.Context, visitor BatchVisitorCtxFunc) error
type BatchVisitorCtxFunc ¶
BatchVisitorCtxFunc is like BatchVisitorFunc, but it takes a context.
type BatchVisitorFunc ¶
BatchVisitorFunc is a function that is called for each batch of nodes visited.
type CheckVersionRequest ¶
type CheckVersionRequest PathVersionRequest
type ChildrenFunc ¶
ChildrenFunc is a function that returns the children of a node.
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
Conn is the client connection and tracks all details for communication with the server.
func Connect ¶
func Connect(servers []string, sessionTimeout time.Duration, options ...connOption) (*Conn, <-chan Event, error)
Connect establishes a new connection to a pool of zookeeper servers. The provided session timeout sets the amount of time for which a session is considered valid after losing connection to a server. Within the session timeout it's possible to reestablish a connection to a different server and keep the same session. This is means any ephemeral nodes and watches are maintained.
func ConnectWithDialer ¶
func ConnectWithDialer(servers []string, sessionTimeout time.Duration, dialer Dialer) (*Conn, <-chan Event, error)
ConnectWithDialer establishes a new connection to a pool of zookeeper servers using a custom Dialer. See Connect for further information about session timeout. This method is deprecated and provided for compatibility: use the WithDialer option instead.
func (*Conn) AddAuthCtx ¶
AddAuthCtx adds an authentication config to the connection.
func (*Conn) AddWatch ¶
func (c *Conn) AddWatch(path string, recursive bool, options ...WatcherOption) (<-chan Event, error)
AddWatch creates a persistent (optionally recursive) watch at the given path.
func (*Conn) AddWatchCtx ¶
func (c *Conn) AddWatchCtx(ctx context.Context, path string, recursive bool, options ...WatcherOption) (<-chan Event, error)
AddWatchCtx creates a persistent (optionally recursive) watch at the given path.
func (*Conn) BatchWalker ¶
func (c *Conn) BatchWalker(path string, batchSize int) *BatchTreeWalker
BatchWalker returns a new BatchTreeWalker used to traverse the tree of nodes at the given path. Nodes are traversed in breadth-first order, in batches up to the given size. This method is more efficient than Walker when the number of nodes is large.
func (*Conn) ChildrenCtx ¶
ChildrenCtx returns the children of a znode.
func (*Conn) ChildrenWCtx ¶
func (c *Conn) ChildrenWCtx(ctx context.Context, path string) ([]string, *Stat, <-chan Event, error)
ChildrenWCtx returns the children of a znode and sets a watch.
func (*Conn) Close ¶
func (c *Conn) Close()
Close will submit a close request with ZK and signal the connection to stop sending and receiving packets.
func (*Conn) Create ¶
Create creates a znode. The returned path is the new path assigned by the server, it may not be the same as the input, for example when creating a sequence znode the returned path will be the input path with a sequence number appended.
func (*Conn) CreateContainer ¶
CreateContainer creates a container znode and returns the path.
func (*Conn) CreateContainerCtx ¶
func (c *Conn) CreateContainerCtx(ctx context.Context, path string, data []byte, flags int32, acl []ACL) (string, error)
CreateContainerCtx creates a container znode and returns the path.
func (*Conn) CreateCtx ¶
func (c *Conn) CreateCtx(ctx context.Context, path string, data []byte, flags int32, acl []ACL) (string, error)
CreateCtx creates a znode. The returned path is the new path assigned by the server, it may not be the same as the input, for example when creating a sequence znode the returned path will be the input path with a sequence number appended.
func (*Conn) CreateProtectedEphemeralSequential ¶
func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl []ACL) (string, error)
CreateProtectedEphemeralSequential fixes a race condition if the server crashes after it creates the node. On reconnect the session may still be valid so the ephemeral node still exists. Therefore, on reconnect we need to check if a node with a GUID generated on create exists.
func (*Conn) CreateProtectedEphemeralSequentialCtx ¶
func (c *Conn) CreateProtectedEphemeralSequentialCtx(ctx context.Context, path string, data []byte, acl []ACL) (string, error)
CreateProtectedEphemeralSequentialCtx fixes a race condition if the server crashes after it creates the node. On reconnect the session may still be valid so the ephemeral node still exists. Therefore, on reconnect we need to check if a node with a GUID generated on create exists.
func (*Conn) CreateTTL ¶
func (c *Conn) CreateTTL(path string, data []byte, flags int32, acl []ACL, ttl time.Duration) (string, error)
CreateTTL creates a TTL znode, which will be automatically deleted by server after the TTL.
func (*Conn) CreateTTLCtx ¶
func (c *Conn) CreateTTLCtx(ctx context.Context, path string, data []byte, flags int32, acl []ACL, ttl time.Duration) (string, error)
CreateTTLCtx creates a TTL znode, which will be automatically deleted by server after the TTL.
func (*Conn) ExistsWCtx ¶
ExistsWCtx tells the existence of a znode and sets a watch.
func (*Conn) IncrementalReconfig ¶
IncrementalReconfig is the zookeeper reconfiguration api that allows adding and removing servers by lists of members. For more info refer to the ZK documentation.
An optional version allows for conditional reconfigurations, -1 ignores the condition.
Returns the new configuration znode stat.
func (*Conn) IncrementalReconfigCtx ¶
func (c *Conn) IncrementalReconfigCtx(ctx context.Context, joining, leaving []string, version int64) (*Stat, error)
IncrementalReconfigCtx is the zookeeper reconfiguration api that allows adding and removing servers by lists of members. For more info refer to the ZK documentation.
An optional version allows for conditional reconfigurations, -1 ignores the condition.
Returns the new configuration znode stat.
func (*Conn) Multi ¶
func (c *Conn) Multi(ops ...any) ([]MultiResponse, error)
Multi executes multiple ZooKeeper operations or none of them. The provided ops must be one of *CreateRequest, *DeleteRequest, *SetDataRequest, or *CheckVersionRequest.
func (*Conn) MultiCtx ¶
MultiCtx executes multiple ZooKeeper operations or none of them. The provided ops must be one of *CreateRequest, *DeleteRequest, *SetDataRequest, or *CheckVersionRequest.
func (*Conn) MultiRead ¶
func (c *Conn) MultiRead(ops ...any) ([]MultiResponse, error)
MultiRead executes multiple ZooKeeper read operations. The provided ops must be one of *GetDataRequest or *GetChildrenRequest. A MultiResponse will be returned for each op, with data or children.
func (*Conn) MultiReadCtx ¶
MultiReadCtx executes multiple ZooKeeper read operations. The provided ops must be one of *GetDataRequest or *GetChildrenRequest.
func (*Conn) Reconfig ¶
Reconfig is the non-incremental update functionality for Zookeeper where the list provided is the entire new member list. For more info refer to the ZK documentation.
An optional version allows for conditional reconfigurations, -1 ignores the condition.
Returns the new configuration znode stat.
func (*Conn) ReconfigCtx ¶
ReconfigCtx is the non-incremental update functionality for Zookeeper where the list provided is the entire new member list. For more info refer to the ZK documentation.
An optional version allows for conditional reconfigurations, -1 ignores the condition.
Returns the new configuration znode stat.
func (*Conn) RemoveWatch ¶
RemoveWatch removes a watch associated with the given channel. Note: This method works for any type of watch, not just persistent ones.
func (*Conn) RemoveWatchCtx ¶
RemoveWatchCtx removes a watch associated with the given channel. Note: This method works for any type of watch, not just persistent ones.
func (*Conn) SetLogger ¶
SetLogger sets the logger to be used for printing errors. Logger is an interface provided by this package.
func (*Conn) Sync ¶
Sync flushes the channel between process and the leader of a given znode, you may need it if you want identical views of ZooKeeper data for 2 client instances. Please refer to the "Consistency Guarantees" section of ZK document for more details.
func (*Conn) SyncCtx ¶
SyncCtx flushes the channel between process and the leader of a given znode, you may need it if you want identical views of ZooKeeper data for 2 client instances. Please refer to the "Consistency Guarantees" section of ZK document for more details.
func (*Conn) Walker ¶
func (c *Conn) Walker(path string, order TraversalOrder) *TreeWalker
Walker returns a new TreeWalker used to traverse the tree of nodes at the given path. Nodes are traversed in the specified order (depth-first or breadth-first). For large trees, use BatchWalker instead.
type CreateContainerRequest ¶
type CreateContainerRequest CreateRequest
type CreateRequest ¶
type CreateTTLRequest ¶
type DNSHostProvider ¶
type DNSHostProvider struct {
// contains filtered or unexported fields
}
DNSHostProvider is the default HostProvider. It currently matches the Java StaticHostProvider, resolving hosts from DNS once during the call to Init. It could be easily extended to re-query DNS periodically or if there is trouble connecting.
func (*DNSHostProvider) Connected ¶
func (hp *DNSHostProvider) Connected()
Connected notifies the HostProvider of a successful connection.
func (*DNSHostProvider) Init ¶
func (hp *DNSHostProvider) Init(servers []string) error
Init is called first, with the servers specified in the connection string. It uses DNS to look up addresses for each server, then shuffles them all together.
func (*DNSHostProvider) Len ¶
func (hp *DNSHostProvider) Len() int
Len returns the number of servers available
func (*DNSHostProvider) Next ¶
func (hp *DNSHostProvider) Next() (server string, retryStart bool)
Next returns the next server to connect to. retryStart will be true if we've looped through all known servers without Connected() being called.
type DeleteRequest ¶
type DeleteRequest PathVersionRequest
type ErrCode ¶
type ErrCode int32
ErrCode is the error code defined by server. Refer to ZK documentations for more specifics.
type Event ¶
type Event struct { Type EventType State State Path string // For non-session events, the path of the watched node. Err error Server string // For connection events }
Event is an Znode event sent by the server. Refer to EventType for more details.
type EventCallback ¶
type EventCallback func(Event)
EventCallback is a function that is called when an Event occurs.
type EventType ¶
type EventType int32
EventType represents the event type sent by server.
const ( EventNodeCreated EventType = 1 EventNodeDeleted EventType = 2 EventNodeDataChanged EventType = 3 EventNodeChildrenChanged EventType = 4 )
Events that can be received from the server.
type GetChildrenRequest ¶
type GetChildrenRequest pathWatchRequest
type GetDataRequest ¶
type GetDataRequest pathWatchRequest
type HostProvider ¶
type HostProvider interface { // Init is called first, with the servers specified in the connection string. Init(servers []string) error // Len returns the number of servers. Len() int // Next returns the next server to connect to. retryStart will be true if we've looped through // all known servers without Connected() being called. Next() (server string, retryStart bool) // Notify the HostProvider of a successful connection. Connected() }
HostProvider is used to represent a set of hosts a ZooKeeper client should connect to. It is an analog of the Java equivalent: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/client/HostProvider.java?view=markup
func NewRefreshDNSHostProvider ¶
func NewRefreshDNSHostProvider() HostProvider
type Lock ¶
type Lock struct {
// contains filtered or unexported fields
}
Lock is a mutual exclusion lock.
func NewLock ¶
NewLock creates a new lock instance using the provided connection, path, and acl. The path must be a node that is only used by this lock. A lock instances starts unlocked until Lock() is called.
func (*Lock) Lock ¶
Lock attempts to acquire the lock. It works like LockWithData, but it doesn't write any data to the lock node.
func (*Lock) LockCtx ¶
LockCtx attempts to acquire the lock. It works like LockWithData, but it doesn't write any data to the lock node.
func (*Lock) LockWithData ¶
LockWithData attempts to acquire the lock, writing data into the lock node. It will wait to return until the lock is acquired or an error occurs. If this instance already has the lock then ErrDeadlock is returned.
func (*Lock) LockWithDataCtx ¶
LockWithDataCtx attempts to acquire the lock, writing data into the lock node. It will wait to return until the lock is acquired or an error occurs. If this instance already has the lock then ErrDeadlock is returned.
type Logger ¶
Logger is an interface that can be implemented to provide custom log output.
var DefaultLogger Logger = defaultLogger{}
DefaultLogger uses the stdlib log package for logging.
type MultiResponse ¶
type MultiResponse struct { Path string // The path of the znode. Only set for CreateRequest. Children []string // The children of the znode. Only set for GetChildrenRequest. Stat *Stat // The stat of the znode. Only set for CreateRequest and SetDataRequest Data []byte // The data of the znode. Only set for GetDataRequest. Error error // The error of the operation. Applies to all request types. }
MultiResponse is the result of a Multi or MultiRead call.
type PathVersionRequest ¶
type RefreshDNSHostProvider ¶
type RefreshDNSHostProvider struct { DNSHostProvider // contains filtered or unexported fields }
RefreshDNSHostProvider is a wrapper around DNSHostProvider that will re-resolve server addresses:
- everytime the list of server IPs has been fully tried
- everytime we ask for a server IP for a reconnection
func (*RefreshDNSHostProvider) Connected ¶
func (hp *RefreshDNSHostProvider) Connected()
Connected notifies the HostProvider of a successful connection.
func (*RefreshDNSHostProvider) Init ¶
func (hp *RefreshDNSHostProvider) Init(servers []string) error
func (*RefreshDNSHostProvider) Next ¶
func (hp *RefreshDNSHostProvider) Next() (server string, retryStart bool)
type ServerClient ¶
type ServerClient struct { Queued int64 Received int64 Sent int64 SessionID int64 Lcxid int64 Lzxid int64 Timeout int32 LastLatency int32 MinLatency int32 AvgLatency int32 MaxLatency int32 Established time.Time LastResponse time.Time Addr string LastOperation string // maybe? Error error }
ServerClient is the information for a single Zookeeper client and its session. This is used to parse/extract the output fo the `cons` command.
type ServerClients ¶
type ServerClients struct { Clients []*ServerClient Error error }
ServerClients is a struct for the FLWCons() function. It's used to provide the list of Clients.
This is needed because FLWCons() takes multiple servers.
func FLWCons ¶
func FLWCons(servers []string, timeout time.Duration) ([]*ServerClients, bool)
FLWCons is a FourLetterWord helper function. In particular, this function pulls the ruok output from each server.
As with FLWSrvr, the boolean value indicates whether one of the requests had an issue. The Clients struct has an Error value that can be checked.
type ServerStats ¶
type ServerStats struct { Server string Sent int64 Received int64 NodeCount int64 MinLatency int64 AvgLatency float64 MaxLatency int64 Connections int64 Outstanding int64 Epoch int32 Counter int32 BuildTime time.Time Mode Mode Version string Error error }
ServerStats is the information pulled from the Zookeeper `stat` command.
func FLWSrvr ¶
func FLWSrvr(servers []string, timeout time.Duration) ([]*ServerStats, bool)
FLWSrvr is a FourLetterWord helper function. In particular, this function pulls the srvr output from the zookeeper instances and parses the output. A slice of *ServerStats structs are returned as well as a boolean value to indicate whether this function processed successfully.
If the boolean value is false there was a problem. If the *ServerStats slice is empty or nil, then the error happened before we started to obtain 'srvr' values. Otherwise, one of the servers had an issue and the "Error" value in the struct should be inspected to determine which server had the issue.
type SetDataRequest ¶
type Stat ¶
type Stat struct { Czxid int64 // The zxid of the change that caused this znode to be created. Mzxid int64 // The zxid of the change that last modified this znode. Ctime int64 // The time in milliseconds from epoch when this znode was created. Mtime int64 // The time in milliseconds from epoch when this znode was last modified. Version int32 // The number of changes to the data of this znode. Cversion int32 // The number of changes to the children of this znode. Aversion int32 // The number of changes to the ACL of this znode. EphemeralOwner int64 // The session id of the owner of this znode if the znode is an ephemeral node. If it is not an ephemeral node, it will be zero. DataLength int32 // The length of the data field of this znode. NumChildren int32 // The number of children of this znode. Pzxid int64 // last modified children }
type State ¶
type State int32
State is the session state.
const ( // StateUnknown means the session state is unknown. StateUnknown State = -1 StateDisconnected State = 0 StateConnecting State = 1 StateAuthFailed State = 4 StateConnectedReadOnly State = 5 StateSaslAuthenticated State = 6 StateExpired State = -112 StateConnected = State(100) StateHasSession = State(101) )
type TraversalOrder ¶
type TraversalOrder int
const ( // BreadthFirstOrder indicates that the tree should be traversed in breadth-first order. BreadthFirstOrder TraversalOrder = iota // DepthFirstOrder indicates that the tree should be traversed in depth-first order. DepthFirstOrder )
type TreeCache ¶
type TreeCache struct {
// contains filtered or unexported fields
}
func NewTreeCache ¶
func NewTreeCache(conn *Conn, path string, options ...TreeCacheOption) *TreeCache
func (*TreeCache) WaitForInitialSync ¶
WaitForInitialSync will wait for the cache to start and complete an initial sync of the tree. This method will return when any of the following conditions are met (whichever occurs first):
- The initial sync completes,
- The Sync() method returns before the initial sync completes, or
- The given context is cancelled / timed-out.
In cases (2) and (3), an error will be returned indicating the cause.
func (*TreeCache) Walker ¶
func (tc *TreeCache) Walker(path string, order TraversalOrder) *TreeWalker
type TreeCacheListener ¶
type TreeCacheListener interface { // OnSyncStarted is called when the tree cache has started its sync loop. OnSyncStarted() // OnSyncStopped is called when the tree cache has stopped its sync loop. // The error causing the stop is passed as an argument. OnSyncStopped(err error) // OnSyncError is called when the tree cache encounters an error during sync, prompting a retry. OnSyncError(err error) // OnTreeSynced is called when the tree cache has completed a full sync of state. // This is called once after the tree cache is started, and again after each subsequent sync cycle. // A new sync cycle is triggered by connection loss or watch failure. OnTreeSynced(elapsed time.Duration) // OnNodeCreated is called when a node is created after last full sync. OnNodeCreated(path string, data []byte, stat *Stat) // OnNodeDeleting is called when a node is about to be deleted from the cache. // This is your last chance to get the data for the node before it is deleted. // This only works if the cache is configured to include data WithTreeCacheIncludeData. // data and stat can be nil if the node was not found in the cache, // so, nil handling should be done in the listener OnNodeDeleting(path string, data []byte, stat *Stat) // OnNodeDeleted is called when a node is deleted after last full sync. OnNodeDeleted(path string) // OnNodeDataChanged is called when a node's data is changed after last full sync. OnNodeDataChanged(path string, data []byte, stat *Stat) }
TreeCacheListener is a listener for tree cache events. Events are delivered synchronously, so the listener should not block.
type TreeCacheListenerFuncs ¶
type TreeCacheListenerFuncs struct { OnSyncStartedFunc func() OnSyncStoppedFunc func(err error) OnSyncErrorFunc func(err error) OnTreeSyncedFunc func(elapsed time.Duration) OnNodeCreatedFunc func(path string, data []byte, stat *Stat) OnNodeDeletingFunc func(path string, data []byte, stat *Stat) OnNodeDeletedFunc func(path string) OnNodeDataChangedFunc func(path string, data []byte, stat *Stat) }
TreeCacheListenerFuncs is a convenience type that implements TreeCacheListener with function callbacks. Any callback that is nil is ignored.
func (*TreeCacheListenerFuncs) OnNodeCreated ¶
func (l *TreeCacheListenerFuncs) OnNodeCreated(path string, data []byte, stat *Stat)
func (*TreeCacheListenerFuncs) OnNodeDataChanged ¶
func (l *TreeCacheListenerFuncs) OnNodeDataChanged(path string, data []byte, stat *Stat)
func (*TreeCacheListenerFuncs) OnNodeDeleted ¶
func (l *TreeCacheListenerFuncs) OnNodeDeleted(path string)
func (*TreeCacheListenerFuncs) OnNodeDeleting ¶
func (l *TreeCacheListenerFuncs) OnNodeDeleting(path string, data []byte, stat *Stat)
func (*TreeCacheListenerFuncs) OnSyncError ¶
func (l *TreeCacheListenerFuncs) OnSyncError(err error)
func (*TreeCacheListenerFuncs) OnSyncStarted ¶
func (l *TreeCacheListenerFuncs) OnSyncStarted()
func (*TreeCacheListenerFuncs) OnSyncStopped ¶
func (l *TreeCacheListenerFuncs) OnSyncStopped(err error)
func (*TreeCacheListenerFuncs) OnTreeSynced ¶
func (l *TreeCacheListenerFuncs) OnTreeSynced(elapsed time.Duration)
type TreeCacheListenerMock ¶ added in v1.0.11
type TreeCacheListenerMock struct {
// contains filtered or unexported fields
}
TreeCacheListenerMock is a mock implementation of TreeCacheListener.
func NewTreeCacheListenerMock ¶ added in v1.0.11
func NewTreeCacheListenerMock() *TreeCacheListenerMock
func (*TreeCacheListenerMock) NodesCreated ¶ added in v1.0.11
func (m *TreeCacheListenerMock) NodesCreated() map[string][]byte
func (*TreeCacheListenerMock) NodesDataChanged ¶ added in v1.0.11
func (m *TreeCacheListenerMock) NodesDataChanged() map[string][]byte
func (*TreeCacheListenerMock) NodesDeleted ¶ added in v1.0.11
func (m *TreeCacheListenerMock) NodesDeleted() map[string]string
func (*TreeCacheListenerMock) NodesDeleting ¶ added in v1.0.11
func (m *TreeCacheListenerMock) NodesDeleting() map[string][]byte
func (*TreeCacheListenerMock) OnNodeCreated ¶ added in v1.0.11
func (m *TreeCacheListenerMock) OnNodeCreated(path string, data []byte, stat *Stat)
func (*TreeCacheListenerMock) OnNodeCreatedCalled ¶ added in v1.0.11
func (m *TreeCacheListenerMock) OnNodeCreatedCalled() int
func (*TreeCacheListenerMock) OnNodeDataChanged ¶ added in v1.0.11
func (m *TreeCacheListenerMock) OnNodeDataChanged(path string, data []byte, stat *Stat)
func (*TreeCacheListenerMock) OnNodeDataChangedCalled ¶ added in v1.0.11
func (m *TreeCacheListenerMock) OnNodeDataChangedCalled() int
func (*TreeCacheListenerMock) OnNodeDeleted ¶ added in v1.0.11
func (m *TreeCacheListenerMock) OnNodeDeleted(path string)
func (*TreeCacheListenerMock) OnNodeDeletedCalled ¶ added in v1.0.11
func (m *TreeCacheListenerMock) OnNodeDeletedCalled() int
func (*TreeCacheListenerMock) OnNodeDeleting ¶ added in v1.0.11
func (m *TreeCacheListenerMock) OnNodeDeleting(path string, data []byte, stat *Stat)
func (*TreeCacheListenerMock) OnNodeDeletingCalled ¶ added in v1.0.11
func (m *TreeCacheListenerMock) OnNodeDeletingCalled() int
func (*TreeCacheListenerMock) OnSyncError ¶ added in v1.0.11
func (m *TreeCacheListenerMock) OnSyncError(err error)
func (*TreeCacheListenerMock) OnSyncErrorCalled ¶ added in v1.0.11
func (m *TreeCacheListenerMock) OnSyncErrorCalled() int
func (*TreeCacheListenerMock) OnSyncStarted ¶ added in v1.0.11
func (m *TreeCacheListenerMock) OnSyncStarted()
func (*TreeCacheListenerMock) OnSyncStartedCalled ¶ added in v1.0.11
func (m *TreeCacheListenerMock) OnSyncStartedCalled() int
func (*TreeCacheListenerMock) OnSyncStopped ¶ added in v1.0.11
func (m *TreeCacheListenerMock) OnSyncStopped(err error)
func (*TreeCacheListenerMock) OnSyncStoppedCalled ¶ added in v1.0.11
func (m *TreeCacheListenerMock) OnSyncStoppedCalled() int
func (*TreeCacheListenerMock) OnTreeSynced ¶ added in v1.0.11
func (m *TreeCacheListenerMock) OnTreeSynced(elapsed time.Duration)
func (*TreeCacheListenerMock) OnTreeSyncedCalled ¶ added in v1.0.11
func (m *TreeCacheListenerMock) OnTreeSyncedCalled() int
type TreeCacheOption ¶
type TreeCacheOption func(*TreeCache)
func WithTreeCacheAbsolutePaths ¶
func WithTreeCacheAbsolutePaths(absolutePaths bool) TreeCacheOption
WithTreeCacheAbsolutePaths returns an option to use full/absolute paths in the tree cache. Normally, the cache reports paths relative to the node it is rooted at. For example, if the cache is rooted at "/foo" and "/foo/bar" is created, the cache reports the node as "/bar". With absolute paths enabled, the cache reports the node as "/foo/bar".
func WithTreeCacheBatchSize ¶
func WithTreeCacheBatchSize(batchSize int) TreeCacheOption
WithTreeCacheBatchSize returns an option to use the specified batch size in the tree cache. The batch size determines how many nodes are fetched per request during a tree walk. If the given batch size is <= 0>, the default batch size is used.
func WithTreeCacheIncludeData ¶
func WithTreeCacheIncludeData(includeData bool) TreeCacheOption
WithTreeCacheIncludeData returns an option to include data in the tree cache.
func WithTreeCacheListener ¶
func WithTreeCacheListener(listener TreeCacheListener) TreeCacheOption
WithTreeCacheListener returns an option to use the specified listener in the tree cache.
func WithTreeCacheLogger ¶
func WithTreeCacheLogger(logger Logger) TreeCacheOption
WithTreeCacheLogger returns an option that sets the logger to use for the tree cache.
func WithTreeCacheReservoirLimit ¶
func WithTreeCacheReservoirLimit(reservoirLimit int) TreeCacheOption
WithTreeCacheReservoirLimit returns an option to use the specified reservoir limit in the tree cache. The reservoir limit is the absolute maximum number of events that can be queued by watchers before being forcefully closed. If the given reservoir limit is <= 0>, the default limit is used.
type TreeWalker ¶
type TreeWalker struct {
// contains filtered or unexported fields
}
TreeWalker provides traversal of a tree of nodes rooted at a specific path.
func NewTreeWalker ¶
func NewTreeWalker(fetcher ChildrenFunc, path string, order TraversalOrder) *TreeWalker
NewTreeWalker creates a new TreeWalker with the given fetcher function and root path.
func (*TreeWalker) Walk ¶
func (w *TreeWalker) Walk(visitor VisitorFunc) error
Walk begins traversing the tree and calls the visitor function for each node visited.
func (*TreeWalker) WalkChan ¶
func (w *TreeWalker) WalkChan(bufferSize int) <-chan VisitEvent
WalkChan begins traversing the tree and sends the results to the returned channel. The channel will be buffered with the given size. The channel is closed when the traversal is complete. If an error occurs, an error event will be sent to the channel before it is closed.
func (*TreeWalker) WalkChanCtx ¶
func (w *TreeWalker) WalkChanCtx(ctx context.Context, bufferSize int) <-chan VisitEvent
WalkChanCtx is like WalkChan, but it takes a context that can be used to cancel the walk.
func (*TreeWalker) WalkCtx ¶
func (w *TreeWalker) WalkCtx(ctx context.Context, visitor VisitorCtxFunc) error
WalkCtx is like Walk, but takes a context that can be used to cancel the walk.
type Version ¶
func ParseVersion ¶
ParseVersion parses a version string into a Version struct.
func ParseVersionErr ¶
ParseVersionErr parses a version string into a Version struct; returns an error if the string is invalid.
func (Version) GreaterThan ¶
type VisitEvent ¶
VisitEvent is the event that is sent to the channel returned by various walk functions. If Err is not nil, it indicates that an error occurred while walking the tree.
type VisitorCtxFunc ¶
VisitorCtxFunc is like VisitorFunc, but it takes a context.
type VisitorFunc ¶
VisitorFunc is a function that is called for each node visited.
type WatcherOption ¶
type WatcherOption func(*watcherOptions)
WatcherOption represents an option for a watcher.
func WithStallCallback ¶
func WithStallCallback(stallCallback func()) WatcherOption
WithStallCallback returns a WatcherOption that configures a callback function for when we hit the reservoir limit.
func WithWatcherInvalidateOnDisconnect ¶
func WithWatcherInvalidateOnDisconnect() WatcherOption
WithWatcherInvalidateOnDisconnect returns a WatcherOption that configures the watcher to be invalidated on disconnect.
func WithWatcherReservoirLimit ¶
func WithWatcherReservoirLimit(reservoirLimit int) WatcherOption
WithWatcherReservoirLimit returns a WatcherOption that configures the reservoir limit for a persistent watcher. The reservoir limit is the absolute maximum number of events that can be queued before the watcher is forcefully closed. If the given reservoir lLimit is <= 0, the default value of 2048 will be used.