Documentation ¶
Overview ¶
Curator.go is a Golang porting for Curator, make it easy to access Zookeeper
Learn ZooKeeper ¶
Curator.go users are assumed to know ZooKeeper. A good place to start is http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html
Using Curator ¶
The Curator.go are available from github.com.
$ go get github.com/flier/curator.go
You can easily include Curator.go into your code.
import ( "github.com/flier/curator.go" )
Getting a Connection ¶
Curator uses Fluent Style. If you haven't used this before, it might seem odd so it's suggested that you familiarize yourself with the style.
Curator connection instances (CuratorFramework) are allocated from the CuratorFrameworkBuilder. You only need one CuratorFramework object for each ZooKeeper cluster you are connecting to:
curator.NewClient(connString, retryPolicy)
This will create a connection to a ZooKeeper cluster using default values. The only thing that you need to specify is the retry policy. For most cases, you should use:
retryPolicy := curator.NewExponentialBackoffRetry(time.Second, 3, 15*time.Second) client := curator.NewClient(connString, retryPolicy) client.Start() defer client.Close()
The client must be started (and closed when no longer needed).
Calling ZooKeeper Directly ¶
Once you have a CuratorFramework instance, you can make direct calls to ZooKeeper in a similar way to using the raw ZooKeeper object provided in the ZooKeeper distribution. E.g.:
client.Create().ForPathWithData(path, payload)
The benefit here is that Curator manages the ZooKeeper connection and will retry operations if there are connection problems.
Recipes ¶
Distributed Lock
lock := curator.NewInterProcessMutex(client, lockPath) if ( lock.Acquire(maxWait, waitUnit) ) { defer lock.Release() // do some work inside of the critical section here }
Leader Election
listener := curator.NewLeaderSelectorListener(func(CuratorFramework client) error { // this callback will get called when you are the leader // do whatever leader work you need to and only exit // this method when you want to relinquish leadership })) selector := curator.NewLeaderSelector(client, path, listener) selector.AutoRequeue() // not required, but this is behavior that you will probably expect selector.Start()
Generic API ¶
Curator provides generic API for builder
type Pathable[T] interface { // Commit the currently building operation using the given path ForPath(path string) (T, error) } type PathAndBytesable[T] interface { Pathable[T] // Commit the currently building operation using the given path and data ForPathWithData(path string, payload []byte) (T, error) } type Compressible[T] interface { // Cause the data to be compressed using the configured compression provider Compressed() T } type Decompressible[T] interface { // Cause the data to be de-compressed using the configured compression provider Decompressed() T } type CreateModable[T] interface { // Set a create mode - the default is CreateMode.PERSISTENT WithMode(mode CreateMode) T } type ACLable[T] interface { // Set an ACL list WithACL(acl ...zk.ACL) T } type Versionable[T] interface { // Use the given version (the default is -1) WithVersion(version int) T } type Statable[T] interface { // Have the operation fill the provided stat object StoringStatIn(*zk.Stat) T } type ParentsCreatable[T] interface { // Causes any parent nodes to get created if they haven't already been CreatingParentsIfNeeded() T } type ChildrenDeletable[T] interface { // Will also delete children if they exist. DeletingChildrenIfNeeded() T } type Watchable[T] interface { // Have the operation set a watch Watched() T // Set a watcher for the operation UsingWatcher(watcher Watcher) T } type Backgroundable[T] interface { // Perform the action in the background InBackground() T // Perform the action in the background InBackgroundWithContext(context interface{}) T // Perform the action in the background InBackgroundWithCallback(callback BackgroundCallback) T // Perform the action in the background InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) T }
Index ¶
- Constants
- Variables
- func CloseQuietly(closeable Closeable) (err error)
- func DeleteChildren(conn ZookeeperConnection, path string, deleteSelf bool) error
- func FixForNamespace(namespace, path string, isSequential bool) (string, error)
- func GetNodeFromPath(path string) string
- func JoinPath(parent string, children ...string) string
- func MakeDirs(conn ZookeeperConnection, path string, makeLastNode bool, ...) error
- func NewCuratorZookeeperClient(zookeeperDialer ZookeeperDialer, ensembleProvider EnsembleProvider, ...) *curatorZookeeperClient
- func NewEnsurePath(path string) *ensurePath
- func NewEnsurePathWithAcl(path string, aclProvider ACLProvider) *ensurePath
- func NewEnsurePathWithAclAndHelper(path string, aclProvider ACLProvider, helper EnsurePathHelper) *ensurePath
- func ValidatePath(path string) error
- type ACLProvider
- type AtomicBool
- type AuthInfo
- type BackgroundCallback
- type CheckExistsBuilder
- type Closeable
- type CompressionProvider
- type ConnectionState
- type ConnectionStateListenable
- type ConnectionStateListener
- type CreateBuilder
- type CreateMode
- type CuratorEvent
- type CuratorEventType
- type CuratorFramework
- type CuratorFrameworkBuilder
- func (b *CuratorFrameworkBuilder) Authorization(scheme string, auth []byte) *CuratorFrameworkBuilder
- func (b *CuratorFrameworkBuilder) Build() CuratorFramework
- func (b *CuratorFrameworkBuilder) Compression(name string) *CuratorFrameworkBuilder
- func (b *CuratorFrameworkBuilder) ConnectString(connectString string) *CuratorFrameworkBuilder
- type CuratorListenable
- type CuratorListener
- type CuratorZookeeperClient
- type DefaultZookeeperDialer
- type DeleteBuilder
- type EnsembleProvider
- type EnsurePath
- type EnsurePathHelper
- type ExponentialBackoffRetry
- type FixedEnsembleProvider
- type GetACLBuilder
- type GetChildrenBuilder
- type GetDataBuilder
- type GzipCompressionProvider
- type LZ4CompressionProvider
- type Listenable
- type ListenerContainer
- type OperationType
- type PathAndNode
- type RetryLoop
- type RetryNTimes
- type RetryOneTime
- type RetryPolicy
- type RetrySleeper
- type RetryUntilElapsed
- type SetACLBuilder
- type SetDataBuilder
- type SleepingRetry
- type State
- type SyncBuilder
- type Tracer
- type TracerDriver
- type Transaction
- type TransactionBridge
- type TransactionCheckBuilder
- type TransactionCreateBuilder
- type TransactionDeleteBuilder
- type TransactionFinal
- type TransactionResult
- type TransactionSetDataBuilder
- type UnhandledErrorListenable
- type UnhandledErrorListener
- type Watcher
- type Watchers
- type ZookeeperConnection
- type ZookeeperDialFunc
- type ZookeeperDialer
Constants ¶
const ( PERSISTENT CreateMode = 0 PERSISTENT_SEQUENTIAL = zk.FlagSequence EPHEMERAL = zk.FlagEphemeral EPHEMERAL_SEQUENTIAL = zk.FlagEphemeral + zk.FlagSequence )
const ( DEFAULT_SESSION_TIMEOUT = 60 * time.Second DEFAULT_CONNECTION_TIMEOUT = 15 * time.Second DEFAULT_CLOSE_WAIT = 1 * time.Second )
const ( MAX_RETRIES_LIMIT = 29 DEFAULT_MAX_SLEEP time.Duration = time.Duration(math.MaxInt32 * int64(time.Second)) )
const AnyVersion int32 = -1
const MAX_BACKGROUND_ERRORS = 10
const (
PATH_SEPARATOR = "/"
)
const STATE_QUEUE_SIZE = 25
Variables ¶
var ( OPEN_ACL_UNSAFE = zk.WorldACL(zk.PermAll) CREATOR_ALL_ACL = zk.AuthACL(zk.PermAll) READ_ACL_UNSAFE = zk.WorldACL(zk.PermRead) )
var ( ErrConnectionClosed = zk.ErrConnectionClosed ErrUnknown = zk.ErrUnknown ErrAPIError = zk.ErrAPIError ErrNoNode = zk.ErrNoNode ErrNoAuth = zk.ErrNoAuth ErrBadVersion = zk.ErrBadVersion ErrNoChildrenForEphemerals = zk.ErrNoChildrenForEphemerals ErrNodeExists = zk.ErrNodeExists ErrNotEmpty = zk.ErrNotEmpty ErrSessionExpired = zk.ErrSessionExpired ErrInvalidACL = zk.ErrInvalidACL ErrAuthFailed = zk.ErrAuthFailed ErrClosing = zk.ErrClosing ErrNothing = zk.ErrNothing ErrSessionMoved = zk.ErrSessionMoved )
var ( EventNodeCreated = zk.EventNodeCreated EventNodeDeleted = zk.EventNodeDeleted EventNodeDataChanged = zk.EventNodeDataChanged EventNodeChildrenChanged = zk.EventNodeChildrenChanged )
var ( ErrConnectionLoss = errors.New("connection loss") ErrTimeout = errors.New("timeout") )
var ( CompressionProviders = map[string]CompressionProvider{ "gzip": NewGzipCompressionProvider(), "lz4": NewLZ4CompressionProvider(), } )
var CuratorEventTypeNames = []string{"CREATE", "DELETE", "EXISTS", "GET_DATA", "SET_DATA", "CHILDREN", "SYNC", "GET_ACL", "SET_ACL", "WATCHED", "CLOSING"}
Functions ¶
func CloseQuietly ¶
func DeleteChildren ¶
func DeleteChildren(conn ZookeeperConnection, path string, deleteSelf bool) error
Recursively deletes children of a node.
func FixForNamespace ¶
Apply the namespace to the given path
func GetNodeFromPath ¶
Given a full path, return the node name. i.e. "/one/two/three" will return "three"
func MakeDirs ¶
func MakeDirs(conn ZookeeperConnection, path string, makeLastNode bool, aclProvider ACLProvider) error
Make sure all the nodes in the path are created
func NewCuratorZookeeperClient ¶
func NewCuratorZookeeperClient(zookeeperDialer ZookeeperDialer, ensembleProvider EnsembleProvider, sessionTimeout, connectionTimeout time.Duration, watcher Watcher, retryPolicy RetryPolicy, canReadOnly bool, authInfos []AuthInfo) *curatorZookeeperClient
func NewEnsurePath ¶
func NewEnsurePath(path string) *ensurePath
func NewEnsurePathWithAcl ¶
func NewEnsurePathWithAcl(path string, aclProvider ACLProvider) *ensurePath
func NewEnsurePathWithAclAndHelper ¶
func NewEnsurePathWithAclAndHelper(path string, aclProvider ACLProvider, helper EnsurePathHelper) *ensurePath
Types ¶
type ACLProvider ¶
type ACLProvider interface { // Return the ACL list to use by default GetDefaultAcl() []zk.ACL // Return the ACL list to use for the given path GetAclForPath(path string) []zk.ACL }
func NewDefaultACLProvider ¶
func NewDefaultACLProvider() ACLProvider
type AtomicBool ¶
type AtomicBool int32
const ( FALSE AtomicBool = iota TRUE )
func NewAtomicBool ¶
func NewAtomicBool(b bool) AtomicBool
func (*AtomicBool) CompareAndSwap ¶
func (b *AtomicBool) CompareAndSwap(oldValue, newValue bool) bool
func (*AtomicBool) Load ¶
func (b *AtomicBool) Load() bool
func (*AtomicBool) Set ¶
func (b *AtomicBool) Set(v bool)
func (*AtomicBool) Swap ¶
func (b *AtomicBool) Swap(v bool) bool
type BackgroundCallback ¶
type BackgroundCallback func(client CuratorFramework, event CuratorEvent) error
Called when the async background operation completes
type CheckExistsBuilder ¶
type CheckExistsBuilder interface { // Pathable[T] // // Commit the currently building operation using the given path ForPath(path string) (*zk.Stat, error) // Watchable[T] // // Have the operation set a watch Watched() CheckExistsBuilder // Set a watcher for the operation UsingWatcher(watcher Watcher) CheckExistsBuilder // Backgroundable[T] // // Perform the action in the background InBackground() CheckExistsBuilder // Perform the action in the background InBackgroundWithContext(context interface{}) CheckExistsBuilder // Perform the action in the background InBackgroundWithCallback(callback BackgroundCallback) CheckExistsBuilder // Perform the action in the background InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) CheckExistsBuilder }
type Closeable ¶
type Closeable interface { // Closes this and releases any system resources associated with it. Close() error }
A Closeable is a source or destination of data that can be closed.
type CompressionProvider ¶
type ConnectionState ¶
type ConnectionState int32
const ( UNKNOWN ConnectionState = iota CONNECTED // Sent for the first successful connection to the server. SUSPENDED // There has been a loss of connection. Leaders, locks, etc. RECONNECTED // A suspended, lost, or read-only connection has been re-established LOST // The connection is confirmed to be lost. Close any locks, leaders, etc. READ_ONLY // The connection has gone into read-only mode. )
func (ConnectionState) Connected ¶
func (s ConnectionState) Connected() bool
func (ConnectionState) String ¶
func (s ConnectionState) String() string
type ConnectionStateListenable ¶
type ConnectionStateListenable interface { Listenable /* [T] */ AddListener(listener ConnectionStateListener) RemoveListener(listener ConnectionStateListener) }
type ConnectionStateListener ¶
type ConnectionStateListener interface { // Called when there is a state change in the connection StateChanged(client CuratorFramework, newState ConnectionState) }
func NewConnectionStateListener ¶
func NewConnectionStateListener(callback connectionStateListenerCallback) ConnectionStateListener
type CreateBuilder ¶
type CreateBuilder interface { // PathAndBytesable[T] // // Commit the currently building operation using the given path ForPath(path string) (string, error) // Commit the currently building operation using the given path and data ForPathWithData(path string, payload []byte) (string, error) // ParentsCreatable[T] // // Causes any parent nodes to get created if they haven't already been CreatingParentsIfNeeded() CreateBuilder // CreateModable[T] // // Set a create mode - the default is CreateMode.PERSISTENT WithMode(mode CreateMode) CreateBuilder // ACLable[T] // // Set an ACL list WithACL(acls ...zk.ACL) CreateBuilder // Compressible[T] // // Cause the data to be compressed using the configured compression provider Compressed() CreateBuilder // Backgroundable[T] // // Perform the action in the background InBackground() CreateBuilder // Perform the action in the background InBackgroundWithContext(context interface{}) CreateBuilder // Perform the action in the background InBackgroundWithCallback(callback BackgroundCallback) CreateBuilder // Perform the action in the background InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) CreateBuilder }
type CreateMode ¶
type CreateMode int32
func (CreateMode) IsEphemeral ¶
func (m CreateMode) IsEphemeral() bool
func (CreateMode) IsSequential ¶
func (m CreateMode) IsSequential() bool
type CuratorEvent ¶
type CuratorEvent interface { // check here first - this value determines the type of event and which methods will have valid values Type() CuratorEventType // "rc" from async callbacks Err() error // the path Path() string // the context object passed to Backgroundable.InBackground(interface{}) Context() interface{} // any stat Stat() *zk.Stat // any data Data() []byte // any name Name() string // any children Children() []string // any ACL list or null ACLs() []zk.ACL WatchedEvent() *zk.Event }
A super set of all the various Zookeeper events/background methods.
type CuratorEventType ¶
type CuratorEventType int
const ( CREATE CuratorEventType = iota // CuratorFramework.Create() -> Err(), Path(), Data() DELETE // CuratorFramework.Delete() -> Err(), Path() EXISTS // CuratorFramework.CheckExists() -> Err(), Path(), Stat() GET_DATA // CuratorFramework.GetData() -> Err(), Path(), Stat(), Data() SET_DATA // CuratorFramework.SetData() -> Err(), Path(), Stat() CHILDREN // CuratorFramework.GetChildren() -> Err(), Path(), Stat(), Children() SYNC // CuratorFramework.Sync() -> Err(), Path() GET_ACL // CuratorFramework.GetACL() -> Err(), Path() SET_ACL // CuratorFramework.SetACL() -> Err(), Path() WATCHED // Watchable.UsingWatcher() -> WatchedEvent() CLOSING // Event sent when client is being closed )
func (CuratorEventType) String ¶
func (t CuratorEventType) String() string
type CuratorFramework ¶
type CuratorFramework interface { // Start the client. // Most mutator methods will not work until the client is started Start() error // Stop the client Close() error // Returns the state of this instance State() State // Return true if the client is started, not closed, etc. Started() bool // Start a create builder Create() CreateBuilder // Start a delete builder Delete() DeleteBuilder // Start an exists builder CheckExists() CheckExistsBuilder // Start a get data builder GetData() GetDataBuilder // Start a set data builder SetData() SetDataBuilder // Start a get children builder GetChildren() GetChildrenBuilder // Start a get ACL builder GetACL() GetACLBuilder // Start a set ACL builder SetACL() SetACLBuilder // Start a transaction builder InTransaction() Transaction // Perform a sync on the given path - syncs are always in the background DoSync(path string, backgroundContextObject interface{}) // Start a sync builder. Note: sync is ALWAYS in the background even if you don't use one of the background() methods Sync() SyncBuilder // Returns the listenable interface for the Connect State ConnectionStateListenable() ConnectionStateListenable // Returns the listenable interface for events CuratorListenable() CuratorListenable // Returns the listenable interface for unhandled errors UnhandledErrorListenable() UnhandledErrorListenable // Returns a facade of the current instance that does _not_ automatically pre-pend the namespace to all paths NonNamespaceView() CuratorFramework // Returns a facade of the current instance that uses the specified namespace // or no namespace if newNamespace is empty. UsingNamespace(newNamespace string) CuratorFramework // Return the current namespace or "" if none Namespace() string // Return the managed zookeeper client ZookeeperClient() CuratorZookeeperClient // Allocates an ensure path instance that is namespace aware NewNamespaceAwareEnsurePath(path string) EnsurePath // Block until a connection to ZooKeeper is available. BlockUntilConnected() error // Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded BlockUntilConnectedTimeout(maxWaitTime time.Duration) error }
Zookeeper framework-style client
func NewClient ¶
func NewClient(connString string, retryPolicy RetryPolicy) CuratorFramework
Create a new client with default session timeout and default connection timeout
func NewClientTimeout ¶
func NewClientTimeout(connString string, sessionTimeout, connectionTimeout time.Duration, retryPolicy RetryPolicy) CuratorFramework
Create a new client
type CuratorFrameworkBuilder ¶
type CuratorFrameworkBuilder struct { AuthInfos []AuthInfo // the connection authorization ZookeeperDialer ZookeeperDialer // the zookeeper dialer to use EnsembleProvider EnsembleProvider // the list ensemble provider. DefaultData []byte // the data to use when PathAndBytesable.ForPath(String) is used. Namespace string // as ZooKeeper is a shared space, users of a given cluster should stay within a pre-defined namespace SessionTimeout time.Duration // the session timeout ConnectionTimeout time.Duration // the connection timeout MaxCloseWait time.Duration // the time to wait during close to wait background tasks RetryPolicy RetryPolicy // the retry policy to use CompressionProvider CompressionProvider // the compression provider AclProvider ACLProvider // the provider for ACLs CanBeReadOnly bool // allow ZooKeeper client to enter read only mode in case of a network partition. }
func (*CuratorFrameworkBuilder) Authorization ¶
func (b *CuratorFrameworkBuilder) Authorization(scheme string, auth []byte) *CuratorFrameworkBuilder
Add connection authorization
func (*CuratorFrameworkBuilder) Build ¶
func (b *CuratorFrameworkBuilder) Build() CuratorFramework
Apply the current values and build a new CuratorFramework
func (*CuratorFrameworkBuilder) Compression ¶
func (b *CuratorFrameworkBuilder) Compression(name string) *CuratorFrameworkBuilder
Add compression provider
func (*CuratorFrameworkBuilder) ConnectString ¶
func (b *CuratorFrameworkBuilder) ConnectString(connectString string) *CuratorFrameworkBuilder
Set the list of servers to connect to.
type CuratorListenable ¶
type CuratorListenable interface { Listenable /* [T] */ AddListener(listener CuratorListener) RemoveListener(listener CuratorListener) }
type CuratorListener ¶
type CuratorListener interface { // Called when a background task has completed or a watch has triggered EventReceived(client CuratorFramework, event CuratorEvent) error }
Receives notifications about errors and background events
func NewCuratorListener ¶
func NewCuratorListener(callback curatorListenerCallback) CuratorListener
type CuratorZookeeperClient ¶
type CuratorZookeeperClient interface { // Return the managed ZK connection. Conn() (ZookeeperConnection, error) // Return the current retry policy RetryPolicy() RetryPolicy // Return a new retry loop. All operations should be performed in a retry loop NewRetryLoop() RetryLoop // Returns true if the client is current connected Connected() bool // This method blocks until the connection to ZK succeeds. BlockUntilConnectedOrTimedOut() error // Must be called after created Start() error // Close the client Close() error // Start a new tracer StartTracer(name string) Tracer }
A wrapper around Zookeeper that takes care of some low-level housekeeping
type DefaultZookeeperDialer ¶
type DeleteBuilder ¶
type DeleteBuilder interface { // Pathable[T] // // Commit the currently building operation using the given path ForPath(path string) error // ChildrenDeletable[T] // // Will also delete children if they exist. DeletingChildrenIfNeeded() DeleteBuilder // Versionable[T] // // Use the given version (the default is -1) WithVersion(version int32) DeleteBuilder // Backgroundable[T] // // Perform the action in the background InBackground() DeleteBuilder // Perform the action in the background InBackgroundWithContext(context interface{}) DeleteBuilder // Perform the action in the background InBackgroundWithCallback(callback BackgroundCallback) DeleteBuilder // Perform the action in the background InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) DeleteBuilder }
type EnsembleProvider ¶
type EnsembleProvider interface { // Curator will call this method when CuratorZookeeperClient.Start() is called Start() error // Curator will call this method when CuratorZookeeperClient.Close() is called Close() error // Return the current connection string to use ConnectionString() string }
Abstraction that provides the ZooKeeper connection string
type EnsurePath ¶
type EnsurePath interface { // First time, synchronizes and makes sure all nodes in the path are created. // Subsequent calls with this instance are NOPs. Ensure(client CuratorZookeeperClient) error // Returns a view of this EnsurePath instance that does not make the last node. ExcludingLast() EnsurePath }
type EnsurePathHelper ¶
type EnsurePathHelper interface {
Ensure(client CuratorZookeeperClient, path string, makeLastNode bool) error
}
type ExponentialBackoffRetry ¶
type ExponentialBackoffRetry struct {
SleepingRetry
}
Retry policy that retries a set number of times with increasing sleep time between retries
type FixedEnsembleProvider ¶
type FixedEnsembleProvider struct {
// contains filtered or unexported fields
}
Standard ensemble provider that wraps a fixed connection string
func NewFixedEnsembleProvider ¶
func NewFixedEnsembleProvider(connectString string) *FixedEnsembleProvider
func (*FixedEnsembleProvider) Close ¶
func (p *FixedEnsembleProvider) Close() error
func (*FixedEnsembleProvider) ConnectionString ¶
func (p *FixedEnsembleProvider) ConnectionString() string
func (*FixedEnsembleProvider) Start ¶
func (p *FixedEnsembleProvider) Start() error
type GetACLBuilder ¶
type GetACLBuilder interface { // Pathable[T] // // Commit the currently building operation using the given path ForPath(path string) ([]zk.ACL, error) // Statable[T] // // Have the operation fill the provided stat object StoringStatIn(stat *zk.Stat) GetACLBuilder // Backgroundable[T] // // Perform the action in the background InBackground() GetACLBuilder // Perform the action in the background InBackgroundWithContext(context interface{}) GetACLBuilder // Perform the action in the background InBackgroundWithCallback(callback BackgroundCallback) GetACLBuilder // Perform the action in the background InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) GetACLBuilder }
type GetChildrenBuilder ¶
type GetChildrenBuilder interface { // Pathable[T] // // Commit the currently building operation using the given path ForPath(path string) ([]string, error) // Statable[T] // // Have the operation fill the provided stat object StoringStatIn(stat *zk.Stat) GetChildrenBuilder // Watchable[T] // // Have the operation set a watch Watched() GetChildrenBuilder // Set a watcher for the operation UsingWatcher(watcher Watcher) GetChildrenBuilder // Backgroundable[T] // // Perform the action in the background InBackground() GetChildrenBuilder // Perform the action in the background InBackgroundWithContext(context interface{}) GetChildrenBuilder // Perform the action in the background InBackgroundWithCallback(callback BackgroundCallback) GetChildrenBuilder // Perform the action in the background InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) GetChildrenBuilder }
type GetDataBuilder ¶
type GetDataBuilder interface { // Pathable[T] // // Commit the currently building operation using the given path ForPath(path string) ([]byte, error) // Decompressible[T] // // Cause the data to be de-compressed using the configured compression provider Decompressed() GetDataBuilder // Statable[T] // // Have the operation fill the provided stat object StoringStatIn(stat *zk.Stat) GetDataBuilder // Watchable[T] // // Have the operation set a watch Watched() GetDataBuilder // Set a watcher for the operation UsingWatcher(watcher Watcher) GetDataBuilder // Backgroundable[T] // // Perform the action in the background InBackground() GetDataBuilder // Perform the action in the background InBackgroundWithContext(context interface{}) GetDataBuilder // Perform the action in the background InBackgroundWithCallback(callback BackgroundCallback) GetDataBuilder // Perform the action in the background InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) GetDataBuilder }
type GzipCompressionProvider ¶
type GzipCompressionProvider struct {
// contains filtered or unexported fields
}
func NewGzipCompressionProvider ¶
func NewGzipCompressionProvider() *GzipCompressionProvider
func NewGzipCompressionProviderWithLevel ¶
func NewGzipCompressionProviderWithLevel(level int) *GzipCompressionProvider
func (*GzipCompressionProvider) Compress ¶
func (c *GzipCompressionProvider) Compress(path string, data []byte) ([]byte, error)
func (*GzipCompressionProvider) Decompress ¶
func (c *GzipCompressionProvider) Decompress(path string, compressedData []byte) ([]byte, error)
type LZ4CompressionProvider ¶
type LZ4CompressionProvider struct{}
func NewLZ4CompressionProvider ¶
func NewLZ4CompressionProvider() *LZ4CompressionProvider
func (*LZ4CompressionProvider) Compress ¶
func (c *LZ4CompressionProvider) Compress(path string, data []byte) ([]byte, error)
func (*LZ4CompressionProvider) Decompress ¶
func (c *LZ4CompressionProvider) Decompress(path string, compressedData []byte) ([]byte, error)
type Listenable ¶
type Listenable interface { Len() int Clear() ForEach(callback func(interface{})) }
Abstracts a listenable object
type ListenerContainer ¶
type ListenerContainer struct {
// contains filtered or unexported fields
}
func (*ListenerContainer) Add ¶
func (c *ListenerContainer) Add(listener interface{})
func (*ListenerContainer) Clear ¶
func (c *ListenerContainer) Clear()
func (*ListenerContainer) ForEach ¶
func (c *ListenerContainer) ForEach(callback func(interface{}))
func (*ListenerContainer) Len ¶
func (c *ListenerContainer) Len() int
func (*ListenerContainer) Remove ¶
func (c *ListenerContainer) Remove(listener interface{})
type OperationType ¶
type OperationType int
Transaction operation types
const ( OP_CREATE OperationType = iota OP_DELETE OP_SET_DATA OP_CHECK )
type PathAndNode ¶
type PathAndNode struct {
Path, Node string
}
func SplitPath ¶
func SplitPath(path string) (*PathAndNode, error)
Given a full path, return the the individual parts, without slashes.
type RetryLoop ¶
type RetryLoop interface { // creates a retry loop calling the given proc and retrying if needed CallWithRetry(proc func() (interface{}, error)) (interface{}, error) }
Mechanism to perform an operation on Zookeeper that is safe against disconnections and "recoverable" errors.
type RetryNTimes ¶
type RetryNTimes struct {
SleepingRetry
}
Retry policy that retries a max number of times
func NewRetryNTimes ¶
func NewRetryNTimes(n int, sleepBetweenRetries time.Duration) *RetryNTimes
type RetryOneTime ¶
type RetryOneTime struct {
RetryNTimes
}
A retry policy that retries only once
func NewRetryOneTime ¶
func NewRetryOneTime(sleepBetweenRetry time.Duration) *RetryOneTime
type RetryPolicy ¶
type RetryPolicy interface { // Called when an operation has failed for some reason. // This method should return true to make another attempt. AllowRetry(retryCount int, elapsedTime time.Duration, sleeper RetrySleeper) bool }
Abstracts the policy to use when retrying connections
type RetrySleeper ¶
Abstraction for retry policies to sleep
var DefaultRetrySleeper RetrySleeper = &defaultRetrySleeper{}
type RetryUntilElapsed ¶
type RetryUntilElapsed struct { SleepingRetry // contains filtered or unexported fields }
A retry policy that retries until a given amount of time elapses
func NewRetryUntilElapsed ¶
func NewRetryUntilElapsed(maxElapsedTime, sleepBetweenRetries time.Duration) *RetryUntilElapsed
func (*RetryUntilElapsed) AllowRetry ¶
func (r *RetryUntilElapsed) AllowRetry(retryCount int, elapsedTime time.Duration, sleeper RetrySleeper) bool
type SetACLBuilder ¶
type SetACLBuilder interface { // Pathable[T] // // Commit the currently building operation using the given path ForPath(path string) (*zk.Stat, error) // ACLable[T] // // Set an ACL list WithACL(acls ...zk.ACL) SetACLBuilder // Versionable[T] // // Use the given version (the default is -1) WithVersion(version int32) SetACLBuilder // Backgroundable[T] // // Perform the action in the background InBackground() SetACLBuilder // Perform the action in the background InBackgroundWithContext(context interface{}) SetACLBuilder // Perform the action in the background InBackgroundWithCallback(callback BackgroundCallback) SetACLBuilder // Perform the action in the background InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) SetACLBuilder }
type SetDataBuilder ¶
type SetDataBuilder interface { // PathAndBytesable[T] // // Commit the currently building operation using the given path ForPath(path string) (*zk.Stat, error) // Commit the currently building operation using the given path and data ForPathWithData(path string, payload []byte) (*zk.Stat, error) // Versionable[T] // // Use the given version (the default is -1) WithVersion(version int32) SetDataBuilder // Compressible[T] // // Cause the data to be compressed using the configured compression provider Compressed() SetDataBuilder // Backgroundable[T] // // Perform the action in the background InBackground() SetDataBuilder // Perform the action in the background InBackgroundWithContext(context interface{}) SetDataBuilder // Perform the action in the background InBackgroundWithCallback(callback BackgroundCallback) SetDataBuilder // Perform the action in the background InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) SetDataBuilder }
type SleepingRetry ¶
type SleepingRetry struct { RetryPolicy N int // contains filtered or unexported fields }
func (*SleepingRetry) AllowRetry ¶
func (r *SleepingRetry) AllowRetry(retryCount int, elapsedTime time.Duration, sleeper RetrySleeper) bool
type SyncBuilder ¶
type SyncBuilder interface { // Pathable[T] // // Commit the currently building operation using the given path ForPath(path string) (string, error) // Backgroundable[T] // // Perform the action in the background InBackground() SyncBuilder // Perform the action in the background InBackgroundWithContext(context interface{}) SyncBuilder // Perform the action in the background InBackgroundWithCallback(callback BackgroundCallback) SyncBuilder // Perform the action in the background InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) SyncBuilder }
type TracerDriver ¶
type TracerDriver interface { // Record the given trace event AddTime(name string, d time.Duration) // Add to a named counter AddCount(name string, increment int) }
Mechanism for timing methods and recording counters
type Transaction ¶
type Transaction interface { // Start a create builder in the transaction Create() TransactionCreateBuilder // Start a delete builder in the transaction Delete() TransactionDeleteBuilder // Start a set data builder in the transaction SetData() TransactionSetDataBuilder // Start a check builder in the transaction Check() TransactionCheckBuilder }
Transactional/atomic operations.
The general form for this interface is:
curator.InTransaction().operation().arguments().ForPath(...). And().more-operations. And().Commit()
Here's an example that creates two nodes in a transaction
curator.InTransaction(). Create().ForPathWithData("/path-one", path-one-data). And().Create().ForPathWithData("/path-two", path-two-data). And().Commit()
<b>Important:</b> the operations are not submitted until CuratorTransactionFinal.Commit() is called.
type TransactionBridge ¶
type TransactionBridge interface { TransactionFinal And() TransactionFinal }
Syntactic sugar to make the fluent interface more readable
type TransactionCheckBuilder ¶
type TransactionCheckBuilder interface { // Pathable[T] // // Commit the currently building operation using the given path ForPath(path string) TransactionBridge // Versionable[T] // // Use the given version (the default is -1) WithVersion(version int32) TransactionCheckBuilder }
type TransactionCreateBuilder ¶
type TransactionCreateBuilder interface { // PathAndBytesable[T] // // Commit the currently building operation using the given path ForPath(path string) TransactionBridge // Commit the currently building operation using the given path and data ForPathWithData(path string, payload []byte) TransactionBridge // CreateModable[T] // // Set a create mode - the default is CreateMode.PERSISTENT WithMode(mode CreateMode) TransactionCreateBuilder // ACLable[T] // // Set an ACL list WithACL(acls ...zk.ACL) TransactionCreateBuilder // Compressible[T] // // Cause the data to be compressed using the configured compression provider Compressed() TransactionCreateBuilder }
type TransactionDeleteBuilder ¶
type TransactionDeleteBuilder interface { // Pathable[T] // // Commit the currently building operation using the given path ForPath(path string) TransactionBridge // Versionable[T] // // Use the given version (the default is -1) WithVersion(version int32) TransactionDeleteBuilder }
type TransactionFinal ¶
type TransactionFinal interface { Transaction // Commit all added operations as an atomic unit and return results for the operations. // One result is returned for each operation added. // Further, the ordering of the results matches the ordering that the operations were added. Commit() ([]TransactionResult, error) }
Adds commit to the transaction interface
type TransactionResult ¶
type TransactionResult struct { Type OperationType ForPath string ResultPath string ResultStat *zk.Stat }
Holds the result of one transactional operation
type TransactionSetDataBuilder ¶
type TransactionSetDataBuilder interface { // PathAndBytesable[T] // // Commit the currently building operation using the given path ForPath(path string) TransactionBridge // Commit the currently building operation using the given path and data ForPathWithData(path string, payload []byte) TransactionBridge // Versionable[T] // // Use the given version (the default is -1) WithVersion(version int32) TransactionSetDataBuilder // Compressible[T] // // Cause the data to be compressed using the configured compression provider Compressed() TransactionSetDataBuilder }
type UnhandledErrorListenable ¶
type UnhandledErrorListenable interface { Listenable /* [T] */ AddListener(listener UnhandledErrorListener) RemoveListener(listener UnhandledErrorListener) }
type UnhandledErrorListener ¶
type UnhandledErrorListener interface { // Called when an exception is caught in a background thread, handler, etc. UnhandledError(err error) }
func NewUnhandledErrorListener ¶
func NewUnhandledErrorListener(callback unhandledErrorListenerCallback) UnhandledErrorListener
type Watcher ¶
type Watcher interface {
// contains filtered or unexported methods
}
func NewWatcher ¶
type Watchers ¶
type Watchers struct {
// contains filtered or unexported fields
}
func NewWatchers ¶
type ZookeeperConnection ¶
type ZookeeperConnection interface { // Add the specified scheme:auth information to this connection. AddAuth(scheme string, auth []byte) error // Close this connection Close() // Create a node with the given path. // // The node data will be the given data, and node acl will be the given acl. Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error) // Return the stat of the node of the given path. Return nil if no such a node exists. Exists(path string) (bool, *zk.Stat, error) ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error) // Delete the node with the given path. // // The call will succeed if such a node exists, // and the given version matches the node's version // (if the given version is -1, it matches any node's versions). Delete(path string, version int32) error // Return the data and the stat of the node of the given path. Get(path string) ([]byte, *zk.Stat, error) GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error) // Set the ACL for the node of the given path // if such a node exists and the given version matches the version of the node. // Return the stat of the node. Set(path string, data []byte, version int32) (*zk.Stat, error) // Return the list of the children of the node of the given path. Children(path string) ([]string, *zk.Stat, error) ChildrenW(path string) ([]string, *zk.Stat, <-chan zk.Event, error) // Return the ACL and stat of the node of the given path. GetACL(path string) ([]zk.ACL, *zk.Stat, error) // Set the ACL for the node of the given path // if such a node exists and the given version matches the version of the node. // Return the stat of the node. SetACL(path string, acl []zk.ACL, version int32) (*zk.Stat, error) // Executes multiple ZooKeeper operations or none of them. Multi(ops ...interface{}) ([]zk.MultiResponse, error) // Flushes channel between process and leader. Sync(path string) (string, error) }
type ZookeeperDialFunc ¶
type ZookeeperDialer ¶
type ZookeeperDialer interface {
Dial(connString string, sessionTimeout time.Duration, canBeReadOnly bool) (ZookeeperConnection, <-chan zk.Event, error)
}
Allocate a new ZooKeeper connection
func NewZookeeperDialer ¶
func NewZookeeperDialer(dial ZookeeperDialFunc) ZookeeperDialer