zk

package
v0.0.0-...-ad1a918 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 18, 2014 License: BSD-3-Clause Imports: 23 Imported by: 0

Documentation

Overview

Emulate a "global" namespace across n zk quorums.

Index

Constants

View Source
const (
	DISCONNECTED = 0
	CONNECTING   = 1
	CONNECTED    = 2
)
View Source
const (
	// PERM_DIRECTORY are default permissions for a node.
	PERM_DIRECTORY = zookeeper.PERM_ADMIN | zookeeper.PERM_CREATE | zookeeper.PERM_DELETE | zookeeper.PERM_READ | zookeeper.PERM_WRITE
	// PERM_FILE allows a zk node to emulate file behavior by disallowing child nodes.
	PERM_FILE = zookeeper.PERM_ADMIN | zookeeper.PERM_READ | zookeeper.PERM_WRITE
)
View Source
const (
	DEFAULT_BASE_TIMEOUT = 5 * time.Second
)
View Source
const (
	DEFAULT_MAX_RETRIES = 3
)

Variables

View Source
var (
	// DefaultZkConfigPaths is the default list of config files to check.
	DefaultZkConfigPaths = []string{"/etc/zookeeper/zk_client.json"}
	// MagicPrefix is the Default name for the root note in the zookeeper tree.
	MagicPrefix = "zk"
)
View Source
var (
	// This error is returned by functions that wait for a result
	// when they are interrupted.
	ErrInterrupted = errors.New("zkutil: obtaining lock was interrupted")

	// This error is returned by functions that wait for a result
	// when the timeout value is reached.
	ErrTimeout = errors.New("zkutil: obtaining lock timed out")
)
View Source
var ErrConnectionClosed = errors.New("ZkConn: connection is closed")

ErrConnectionClosed is returned if we try to access a closed connection.

Functions

func ChildrenRecursive

func ChildrenRecursive(zconn Conn, zkPath string) ([]string, error)

func CreateOrUpdate

func CreateOrUpdate(zconn Conn, zkPath, value string, flags int, aclv []zookeeper.ACL, recursive bool) (pathCreated string, err error)

func CreatePidNode

func CreatePidNode(zconn Conn, zkPath string, contents string, done chan struct{}) error

Close the release channel when you want to clean up nicely.

func CreateRecursive

func CreateRecursive(zconn Conn, zkPath, value string, flags int, aclv []zookeeper.ACL) (pathCreated string, err error)

Create a path and any pieces required, think mkdir -p. Intermediate znodes are always created empty.

func DeleteRecursive

func DeleteRecursive(zconn Conn, zkPath string, version int) error

func GetZkSubprocessFlags

func GetZkSubprocessFlags() []string

GetZkSubprocessFlags returns the flags necessary to run a sub-process that would connect to the same zk servers as this process (assuming the environment of the sub-process is the same as ours)

func GuessLocalCell

func GuessLocalCell() string

Read the cell from -zk.local-cell, or the environment ZK_CLIENT_LOCAL_CELL or guess the cell by the hostname. This is either the first two characters or the character before a dash '-'.

func IsDirectory

func IsDirectory(aclv []zookeeper.ACL) bool

IsDirectory returns if this node should be treated as a directory.

func ObtainQueueLock

func ObtainQueueLock(zconn Conn, zkPath string, wait time.Duration, interrupted chan struct{}) error

The lexically lowest node is the lock holder - verify that this path holds the lock. Call this queue-lock because the semantics are a hybrid. Normal zookeeper locks make assumptions about sequential numbering that don't hold when the data in a lock is modified. if the provided 'interrupted' chan is closed, we'll just stop waiting and return an interruption error

func RegisterZkReader

func RegisterZkReader(zkReader ZkReader)

helper method to register the server (does interface checking)

func ResolveWildcards

func ResolveWildcards(zconn Conn, zkPaths []string) ([]string, error)

resolve paths like: /zk/nyc/vt/tablets/*/action /zk/global/vt/keyspaces/*/shards/*/action /zk/*/vt/tablets/*/action into real existing paths

If you send paths that don't contain any wildcard and don't exist, this function will return an empty array.

func ZkCellFromZkPath

func ZkCellFromZkPath(zkPath string) (string, error)

func ZkKnownCells

func ZkKnownCells(useCache bool) []string

returns all the known cells, alphabetically ordered. It will include 'global' if there is a dc-specific global cell or a global cell

func ZkPathToZkAddr

func ZkPathToZkAddr(zkPath string, useCache bool) (string, error)

Types

type ChangeFunc

type ChangeFunc func(oldValue string, oldStat Stat) (newValue string, err error)

type Conn

type Conn interface {
	Get(path string) (data string, stat Stat, err error)
	GetW(path string) (data string, stat Stat, watch <-chan zookeeper.Event, err error)

	Children(path string) (children []string, stat Stat, err error)
	ChildrenW(path string) (children []string, stat Stat, watch <-chan zookeeper.Event, err error)

	Exists(path string) (stat Stat, err error)
	ExistsW(path string) (stat Stat, watch <-chan zookeeper.Event, err error)

	Create(path, value string, flags int, aclv []zookeeper.ACL) (pathCreated string, err error)

	Set(path, value string, version int) (stat Stat, err error)

	Delete(path string, version int) (err error)

	Close() error

	RetryChange(path string, flags int, acl []zookeeper.ACL, changeFunc ChangeFunc) error

	ACL(path string) ([]zookeeper.ACL, Stat, error)
	SetACL(path string, aclv []zookeeper.ACL, version int) error
}

This interface is really close to the zookeeper connection interface. It uses the Stat interface defined here instead of the zookeeper.Stat structure for stats. Everything else is the same as in zookeeper. So refer to the zookeeper docs for the conventions used here (for instance, using -1 as version to specify any version)

type ConnCache

type ConnCache struct {
	// contains filtered or unexported fields
}

func NewConnCache

func NewConnCache(useZkocc bool) *ConnCache

func (*ConnCache) Close

func (cc *ConnCache) Close() error

func (*ConnCache) ConnForPath

func (cc *ConnCache) ConnForPath(zkPath string) (cn Conn, err error)

func (*ConnCache) String

func (cc *ConnCache) String() string

Implements expvar.Var()

type ElectorTask

type ElectorTask interface {
	Run() error
	Stop()
	// Return true if interrupted, false if it died of natural causes.
	// An interrupted task indicates that the election should stop.
	Interrupted() bool
}

ElectorTask is the interface for a task that runs essentially forever or until something bad happens. If a task must be stopped, it should be handled promptly - no second notification will be sent.

type GlobalConn

type GlobalConn struct {
	// contains filtered or unexported fields
}

func Dial

func Dial(serverAddrs []string, recvTimeout time.Duration) (*GlobalConn, <-chan zookeeper.Event, error)

func (*GlobalConn) Children

func (gzc *GlobalConn) Children(path string) (children []string, stat Stat, err error)

func (*GlobalConn) Close

func (gzc *GlobalConn) Close() (err error)

func (*GlobalConn) Create

func (gzc *GlobalConn) Create(path, value string, flags int, aclv []zookeeper.ACL) (pathCreated string, err error)

func (*GlobalConn) Delete

func (gzc *GlobalConn) Delete(path string, version int) (err error)

func (*GlobalConn) Get

func (gzc *GlobalConn) Get(path string) (data string, stat Stat, err error)

func (*GlobalConn) Set

func (gzc *GlobalConn) Set(path, value string, version int) (stat Stat, err error)

type GlobalZookeeperError

type GlobalZookeeperError string

func (GlobalZookeeperError) Error

func (e GlobalZookeeperError) Error() string

type MetaConn

type MetaConn struct {
	// contains filtered or unexported fields
}

func NewMetaConn

func NewMetaConn(useZkocc bool) *MetaConn

func (*MetaConn) ACL

func (conn *MetaConn) ACL(path string) (acl []zookeeper.ACL, stat Stat, err error)

func (*MetaConn) Children

func (conn *MetaConn) Children(path string) (children []string, stat Stat, err error)

func (*MetaConn) ChildrenW

func (conn *MetaConn) ChildrenW(path string) (children []string, stat Stat, watch <-chan zookeeper.Event, err error)

func (*MetaConn) Close

func (conn *MetaConn) Close() error

func (*MetaConn) Create

func (conn *MetaConn) Create(path, value string, flags int, aclv []zookeeper.ACL) (pathCreated string, err error)

func (*MetaConn) Delete

func (conn *MetaConn) Delete(path string, version int) (err error)

func (*MetaConn) Exists

func (conn *MetaConn) Exists(path string) (stat Stat, err error)

func (*MetaConn) ExistsW

func (conn *MetaConn) ExistsW(path string) (stat Stat, watch <-chan zookeeper.Event, err error)

func (*MetaConn) Get

func (conn *MetaConn) Get(path string) (data string, stat Stat, err error)

func (*MetaConn) GetW

func (conn *MetaConn) GetW(path string) (data string, stat Stat, watch <-chan zookeeper.Event, err error)

func (*MetaConn) RetryChange

func (conn *MetaConn) RetryChange(path string, flags int, acl []zookeeper.ACL, changeFunc ChangeFunc) error

func (*MetaConn) Set

func (conn *MetaConn) Set(path, value string, version int) (stat Stat, err error)

func (*MetaConn) SetACL

func (conn *MetaConn) SetACL(path string, aclv []zookeeper.ACL, version int) (err error)

func (*MetaConn) String

func (conn *MetaConn) String() string

Implements expvar.Var()

type Stat

type Stat interface {
	Czxid() int64
	Mzxid() int64
	CTime() time.Time
	MTime() time.Time
	Version() int
	CVersion() int
	AVersion() int
	EphemeralOwner() int64
	DataLength() int
	NumChildren() int
	Pzxid() int64
}

type ZElector

type ZElector struct {
	// contains filtered or unexported fields
}

ZElector stores basic state for running an election.

func CreateElection

func CreateElection(zconn Conn, zkPath string) ZElector

CreateElection returns an initialized elector. An election is really a cycle of events. You are flip-flopping between leader and candidate. It's better to think of this as a stream of events that one needs to react to.

func (ZElector) Interrupt

func (zm ZElector) Interrupt()

Interrupt releases a lock that's held.

func (ZElector) Lock

func (zm ZElector) Lock() error

Lock returns nil when the lock is acquired.

func (ZElector) LockWithTimeout

func (zm ZElector) LockWithTimeout(wait time.Duration) (err error)

LockWithTimeout returns nil when the lock is acquired. A lock is held if the file exists and you are the creator. Setting the wait to zero makes this a nonblocking lock check.

FIXME(msolo) Disallow non-super users from removing the lock?

func (*ZElector) RunTask

func (ze *ZElector) RunTask(task ElectorTask) error

RunTask returns nil when the underlyingtask ends or the error it generated.

func (ZElector) Unlock

func (zm ZElector) Unlock() error

Unlock returns nil if the lock was successfully released. Otherwise, it is most likely a zookeeper related error.

type ZLocker

type ZLocker interface {
	Lock() error
	LockWithTimeout(wait time.Duration) error
	Unlock() error
	Interrupt()
}

ZLocker is an interface for a lock that can fail.

func CreateMutex

func CreateMutex(zconn Conn, zkPath string) ZLocker

CreateMutex initializes an unaquired mutex. A mutex is released only by Unlock. You can clean up a mutex with delete, but you should be careful doing so.

type ZkConn

type ZkConn struct {
	// contains filtered or unexported fields
}

ZkConn is a client class that implements zk.Conn using a zookeeper.Conn. The conn member variable is protected by the mutex.

func DialZk

func DialZk(zkAddr string, baseTimeout time.Duration) (*ZkConn, <-chan zookeeper.Event, error)

Dial a ZK server and waits for connection event. Returns a ZkConn encapsulating the zookeeper.Conn, and the zookeeper session event channel to monitor the connection

The value for baseTimeout is used as a session timeout as well, and will be used to negotiate a 'good' value with the server. From reading the zookeeper source code, it has to be between 6 and 60 seconds (2x and 20x the tickTime by default, with default tick time being 3 seconds). min session time, max session time and ticktime can all be overwritten on the zookeeper server side, so these numbers may vary.

Then this baseTimeout is used to compute other related timeouts: - connect timeout is 1/3 of baseTimeout - recv timeout is 2/3 of baseTimeout minus a ping time - send timeout is 1/3 of baseTimeout - we try to send a ping a least every baseTimeout / 3

Note the baseTimeout has *nothing* to do with the time between we call Dial and the maximum time before we receive the event on the session. The library will actually try to re-connect in the background (after each timeout), and may *never* send an event if the TCP connections always fail. Use DialZkTimeout to enforce a timeout for the initial connect.

func DialZkTimeout

func DialZkTimeout(zkAddr string, baseTimeout time.Duration, connectTimeout time.Duration) (*ZkConn, <-chan zookeeper.Event, error)

func (*ZkConn) ACL

func (conn *ZkConn) ACL(path string) (acls []zookeeper.ACL, stat Stat, err error)

func (*ZkConn) Children

func (conn *ZkConn) Children(path string) (children []string, stat Stat, err error)

func (*ZkConn) ChildrenW

func (conn *ZkConn) ChildrenW(path string) (children []string, stat Stat, watch <-chan zookeeper.Event, err error)

func (*ZkConn) Close

func (conn *ZkConn) Close() error

Close will close the connection asynchronously. It will never fail, even though closing the connection might fail in the background. Accessing this ZkConn after Close has been called will return ErrConnectionClosed.

func (*ZkConn) Create

func (conn *ZkConn) Create(path, value string, flags int, aclv []zookeeper.ACL) (pathCreated string, err error)

func (*ZkConn) Delete

func (conn *ZkConn) Delete(path string, version int) (err error)

func (*ZkConn) Exists

func (conn *ZkConn) Exists(path string) (stat Stat, err error)

func (*ZkConn) ExistsW

func (conn *ZkConn) ExistsW(path string) (stat Stat, watch <-chan zookeeper.Event, err error)

func (*ZkConn) Get

func (conn *ZkConn) Get(path string) (data string, stat Stat, err error)

func (*ZkConn) GetW

func (conn *ZkConn) GetW(path string) (data string, stat Stat, watch <-chan zookeeper.Event, err error)

func (*ZkConn) RetryChange

func (conn *ZkConn) RetryChange(path string, flags int, acl []zookeeper.ACL, changeFunc ChangeFunc) error

func (*ZkConn) Set

func (conn *ZkConn) Set(path, value string, version int) (stat Stat, err error)

func (*ZkConn) SetACL

func (conn *ZkConn) SetACL(path string, aclv []zookeeper.ACL, version int) error

type ZkNode

type ZkNode struct {
	Path     string
	Data     string
	Stat     ZkStat
	Children []string
	Cached   bool // the response comes from the zkocc cache
	Stale    bool // the response is stale because we're not connected
}

func (*ZkNode) MarshalBson

func (zkNode *ZkNode) MarshalBson(buf *bytes2.ChunkedWriter, key string)

MarshalBson bson-encodes ZkNode.

func (*ZkNode) UnmarshalBson

func (zkNode *ZkNode) UnmarshalBson(buf *bytes.Buffer, kind byte)

UnmarshalBson bson-decodes into ZkNode.

type ZkNodeV

type ZkNodeV struct {
	Nodes []*ZkNode
}

func (*ZkNodeV) MarshalBson

func (zkNodeV *ZkNodeV) MarshalBson(buf *bytes2.ChunkedWriter, key string)

MarshalBson bson-encodes ZkNodeV.

func (*ZkNodeV) UnmarshalBson

func (zkNodeV *ZkNodeV) UnmarshalBson(buf *bytes.Buffer, kind byte)

UnmarshalBson bson-decodes into ZkNodeV.

type ZkPath

type ZkPath struct {
	Path string
}

func (*ZkPath) MarshalBson

func (zkPath *ZkPath) MarshalBson(buf *bytes2.ChunkedWriter, key string)

MarshalBson bson-encodes ZkPath.

func (*ZkPath) UnmarshalBson

func (zkPath *ZkPath) UnmarshalBson(buf *bytes.Buffer, kind byte)

UnmarshalBson bson-decodes into ZkPath.

type ZkPathV

type ZkPathV struct {
	Paths []string
}

func (*ZkPathV) MarshalBson

func (zkPathV *ZkPathV) MarshalBson(buf *bytes2.ChunkedWriter, key string)

MarshalBson bson-encodes ZkPathV.

func (*ZkPathV) UnmarshalBson

func (zkPathV *ZkPathV) UnmarshalBson(buf *bytes.Buffer, kind byte)

UnmarshalBson bson-decodes into ZkPathV.

type ZkReader

type ZkReader interface {
	Get(req *ZkPath, reply *ZkNode) error
	GetV(req *ZkPathV, reply *ZkNodeV) error
	Children(req *ZkPath, reply *ZkNode) error
}

defines the RPC services for zkocc the service name to use is 'ZkReader'

type ZkStat

type ZkStat struct {
	// contains filtered or unexported fields
}

func (*ZkStat) AVersion

func (zkStat *ZkStat) AVersion() int

func (*ZkStat) CTime

func (zkStat *ZkStat) CTime() time.Time

func (*ZkStat) CVersion

func (zkStat *ZkStat) CVersion() int

func (*ZkStat) Czxid

func (zkStat *ZkStat) Czxid() int64

ZkStat methods to match zk.Stat interface

func (*ZkStat) DataLength

func (zkStat *ZkStat) DataLength() int

func (*ZkStat) EphemeralOwner

func (zkStat *ZkStat) EphemeralOwner() int64

func (*ZkStat) FromZookeeperStat

func (zkStat *ZkStat) FromZookeeperStat(zStat Stat)

helper method

func (*ZkStat) MTime

func (zkStat *ZkStat) MTime() time.Time

func (*ZkStat) MarshalBson

func (zkStat *ZkStat) MarshalBson(buf *bytes2.ChunkedWriter, key string)

MarshalBson bson-encodes ZkStat.

func (*ZkStat) Mzxid

func (zkStat *ZkStat) Mzxid() int64

func (*ZkStat) NumChildren

func (zkStat *ZkStat) NumChildren() int

func (*ZkStat) Pzxid

func (zkStat *ZkStat) Pzxid() int64

func (*ZkStat) UnmarshalBson

func (zkStat *ZkStat) UnmarshalBson(buf *bytes.Buffer, kind byte)

UnmarshalBson bson-decodes into ZkStat.

func (*ZkStat) Version

func (zkStat *ZkStat) Version() int

type ZkoccConn

type ZkoccConn struct {
	// contains filtered or unexported fields
}

ZkoccConn is a client class that implements zk.Conn but uses a RPC client to talk to a zkocc process

func DialZkocc

func DialZkocc(addr string, connectTimeout time.Duration) (zkocc *ZkoccConn, err error)

From the addr (of the form server1:port1,server2:port2,server3:port3:...) splits it on commas, randomizes the list, and tries to connect to the servers, stopping at the first successful connection

func (*ZkoccConn) ACL

func (conn *ZkoccConn) ACL(path string) ([]zookeeper.ACL, Stat, error)

might want to add ACL in RPC code

func (*ZkoccConn) Children

func (conn *ZkoccConn) Children(path string) (children []string, stat Stat, err error)

func (*ZkoccConn) ChildrenW

func (conn *ZkoccConn) ChildrenW(path string) (children []string, stat Stat, watch <-chan zookeeper.Event, err error)

func (*ZkoccConn) Close

func (conn *ZkoccConn) Close() error

func (*ZkoccConn) Create

func (conn *ZkoccConn) Create(path, value string, flags int, aclv []zookeeper.ACL) (pathCreated string, err error)

func (*ZkoccConn) Delete

func (conn *ZkoccConn) Delete(path string, version int) (err error)

func (*ZkoccConn) Exists

func (conn *ZkoccConn) Exists(path string) (stat Stat, err error)

implement Exists using Get FIXME(alainjobart) Maybe we should add Exists in rpc API?

func (*ZkoccConn) ExistsW

func (conn *ZkoccConn) ExistsW(path string) (stat Stat, watch <-chan zookeeper.Event, err error)

func (*ZkoccConn) Get

func (conn *ZkoccConn) Get(path string) (data string, stat Stat, err error)

func (*ZkoccConn) GetW

func (conn *ZkoccConn) GetW(path string) (data string, stat Stat, watch <-chan zookeeper.Event, err error)

func (*ZkoccConn) RetryChange

func (conn *ZkoccConn) RetryChange(path string, flags int, acl []zookeeper.ACL, changeFunc ChangeFunc) error

func (*ZkoccConn) Set

func (conn *ZkoccConn) Set(path, value string, version int) (stat Stat, err error)

func (*ZkoccConn) SetACL

func (conn *ZkoccConn) SetACL(path string, aclv []zookeeper.ACL, version int) error

type ZkoccUnimplementedError

type ZkoccUnimplementedError string

func (ZkoccUnimplementedError) Error

func (e ZkoccUnimplementedError) Error() string

Directories

Path Synopsis
Package fakezk is a pretty complete mock implementation of a Zookeper connection (see go/zk/zk.Conn).
Package fakezk is a pretty complete mock implementation of a Zookeper connection (see go/zk/zk.Conn).
pdns
To be used with PowerDNS (pdns) as a "pipe backend" CoProcess.
To be used with PowerDNS (pdns) as a "pipe backend" CoProcess.
cache for zkocc
cache for zkocc

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL