zk

package
v0.0.0-...-9a5932e
Warning

This package is not in the latest version of its module.

Published: Mar 10, 2014 License: BSD-3-Clause Imports: 23 Imported by: 0

Documentation

Overview

Emulate a "global" namespace across n zk quorums.

Index

Constants

const (
	DISCONNECTED = 0
	CONNECTING   = 1
	CONNECTED    = 2
)
const (
	DEFAULT_BASE_TIMEOUT = 5 * time.Second
)
const (
	DEFAULT_MAX_RETRIES = 3
)

Variables

var (
	// This error is returned by functions that wait for a result
	// when they are interrupted.
	ErrInterrupted = errors.New("zkutil: obtaining lock was interrupted")

	// This error is returned by functions that wait for a result
	// when the timeout value is reached.
	ErrTimeout = errors.New("zkutil: obtaining lock timed out")
)

Functions

func ChildrenRecursive

func ChildrenRecursive(zconn Conn, zkPath string) ([]string, error)

func CreateOrUpdate

func CreateOrUpdate(zconn Conn, zkPath, value string, flags int, aclv []zookeeper.ACL, recursive bool) (pathCreated string, err error)

func CreatePidNode

func CreatePidNode(zconn Conn, zkPath string, contents string, done chan struct{}) error

Close done when you want to exit cleanly.
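The done-channel convention can be illustrated with a minimal, self-contained sketch. The names runUntilDone, refresh and tick are hypothetical and are not part of this package; the sketch only shows how a background loop exits once the caller closes done, and makes no claim about what CreatePidNode does internally.

```go
package main

import "fmt"

// runUntilDone is a hypothetical helper, not part of package zk.
// It calls refresh each time tick fires, and returns a channel that is
// closed once the loop has observed that done was closed.
func runUntilDone(refresh func(), tick <-chan struct{}, done <-chan struct{}) <-chan struct{} {
	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		for {
			select {
			case <-done:
				return // clean exit requested by the caller
			case <-tick:
				refresh()
			}
		}
	}()
	return stopped
}

func main() {
	done := make(chan struct{})
	// A nil tick channel never fires, so only done can end the loop here.
	stopped := runUntilDone(func() {}, nil, done)
	close(done) // request a clean exit
	<-stopped   // wait for the background loop to finish
	fmt.Println("background loop stopped")
}
```

Closing the channel, rather than sending on it, lets any number of goroutines observe the shutdown request.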

func CreateRecursive

func CreateRecursive(zconn Conn, zkPath, value string, flags int, aclv []zookeeper.ACL) (pathCreated string, err error)

Create a path and any missing parent znodes, like mkdir -p. Intermediate znodes are always created empty.
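As an illustration of the mkdir -p behavior, the sketch below computes which znodes a recursive create would visit and which value each would receive. The znode type and the createPlan function are hypothetical names invented for this example; the sketch does not talk to ZooKeeper and is not the package's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// znode is a hypothetical pair of path and value used only in this sketch.
type znode struct {
	Path  string
	Value string
}

// createPlan lists zkPath and all of its ancestors, shallowest first.
// Intermediate znodes get an empty value; only the final znode gets value.
func createPlan(zkPath, value string) []znode {
	var plan []znode
	current := ""
	for _, part := range strings.Split(zkPath, "/") {
		if part == "" {
			continue // skip the leading slash and any doubled slashes
		}
		current += "/" + part
		plan = append(plan, znode{Path: current})
	}
	if len(plan) > 0 {
		plan[len(plan)-1].Value = value
	}
	return plan
}

func main() {
	for _, n := range createPlan("/zk/nyc/vt/tablets", "payload") {
		fmt.Printf("%s -> %q\n", n.Path, n.Value)
	}
}
```

A real implementation would also have to tolerate parents that already exist, which this sketch does not model.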

func DeleteRecursive

func DeleteRecursive(zconn Conn, zkPath string, version int) error

func GetZkSubprocessFlags

func GetZkSubprocessFlags() []string

GetZkSubprocessFlags returns the flags necessary to run a sub-process that would connect to the same zk servers as this process (assuming the environment of the sub-process is the same as ours).

func GuessLocalCell

func GuessLocalCell() string

Read the cell from the -zk.local-cell flag or the ZK_CLIENT_LOCAL_CELL environment variable, or guess the cell from the hostname. The guess is either the first two characters of the hostname or the characters before a dash '-'.
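The lookup order can be sketched as follows. The functions resolveCell and guessCellFromHostname are hypothetical names for this example. The sketch assumes that the dash rule takes precedence over the two-character rule and that any domain suffix is ignored; the description above does not confirm either assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// guessCellFromHostname is a hypothetical helper. It assumes the cell is the
// part of the short hostname before the first dash, or else its first two
// characters.
func guessCellFromHostname(hostname string) string {
	short := strings.SplitN(hostname, ".", 2)[0] // drop any domain suffix (assumption)
	if i := strings.Index(short, "-"); i > 0 {
		return short[:i]
	}
	if len(short) > 2 {
		return short[:2]
	}
	return short
}

// resolveCell applies the precedence: flag value, then environment value,
// then the hostname guess.
func resolveCell(flagValue, envValue, hostname string) string {
	if flagValue != "" {
		return flagValue
	}
	if envValue != "" {
		return envValue
	}
	return guessCellFromHostname(hostname)
}

func main() {
	fmt.Println(resolveCell("", "", "nyc-db01.example.com")) // prints "nyc"
	fmt.Println(resolveCell("", "", "sjdb01"))               // prints "sj"
	fmt.Println(resolveCell("", "lax", "nyc-db01"))          // prints "lax"
}
```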

func ObtainQueueLock

func ObtainQueueLock(zconn Conn, zkPath string, wait time.Duration, interrupted chan struct{}) error

The lexically lowest node is the lock holder; ObtainQueueLock verifies that zkPath holds the lock. It is called a queue-lock because the semantics are a hybrid: normal zookeeper locks make assumptions about sequential numbering that don't hold when the data in a lock is modified. If the provided 'interrupted' chan is closed, the function stops waiting and returns an interruption error.
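The two ideas in this description, "lowest name holds the lock" and "stop waiting when interrupted", can be sketched without a ZooKeeper connection. The functions holdsLock and waitForChange, the changed channel, and the sketch-local error values are all hypothetical and stand in for the package's own watch handling, which is not shown here.

```go
package main

import (
	"errors"
	"fmt"
	"path"
	"sort"
	"time"
)

// Sketch-local stand-ins for the package's ErrInterrupted and ErrTimeout.
var (
	errInterrupted = errors.New("sketch: obtaining lock was interrupted")
	errTimeout     = errors.New("sketch: obtaining lock timed out")
)

// holdsLock reports whether zkPath names the lexically lowest entry among
// the children of its parent node.
func holdsLock(zkPath string, children []string) bool {
	if len(children) == 0 {
		return false
	}
	sorted := append([]string(nil), children...) // do not reorder the caller's slice
	sort.Strings(sorted)
	return sorted[0] == path.Base(zkPath)
}

// waitForChange blocks until the queue changes, the wait elapses, or the
// interrupted channel is closed, whichever happens first.
func waitForChange(changed <-chan struct{}, wait time.Duration, interrupted <-chan struct{}) error {
	timer := time.NewTimer(wait)
	defer timer.Stop()
	select {
	case <-changed:
		return nil
	case <-interrupted:
		return errInterrupted
	case <-timer.C:
		return errTimeout
	}
}

func main() {
	children := []string{"0000000007", "0000000003", "0000000005"}
	fmt.Println(holdsLock("/zk/locks/0000000003", children)) // prints "true"
	fmt.Println(holdsLock("/zk/locks/0000000005", children)) // prints "false"

	interrupted := make(chan struct{})
	close(interrupted)
	fmt.Println(waitForChange(nil, time.Minute, interrupted))
}
```

A caller that does not hold the lock would typically wait for a change and then check again, repeating until it holds the lock or an error is returned.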

func RegisterZkReader

func RegisterZkReader(zkReader ZkReader)

Helper method to register the server (does interface checking).

func ResolveWildcards

func ResolveWildcards(zconn Conn, zkPaths []string) ([]string, error)

Resolve paths like:

	/zk/nyc/vt/tablets/*/action
	/zk/global/vt/keyspaces/*/shards/*/action
	/zk/*/vt/tablets/*/action

into real existing paths.

If you send paths that don't contain any wildcard and don't exist, this function will return an empty array.
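The matching semantics can be approximated offline with the standard library's path.Match, where '*' matches any run of characters within one path component. The function matchExisting and the in-memory list of existing paths are hypothetical; the real function walks ZooKeeper instead, and its exact matching rules are not documented here.

```go
package main

import (
	"fmt"
	"path"
)

// matchExisting is a hypothetical stand-in that resolves patterns against a
// known list of existing paths instead of a live ZooKeeper tree.
func matchExisting(existing, patterns []string) ([]string, error) {
	var result []string
	for _, pattern := range patterns {
		for _, candidate := range existing {
			ok, err := path.Match(pattern, candidate)
			if err != nil {
				return nil, err // malformed pattern
			}
			if ok {
				result = append(result, candidate)
			}
		}
	}
	return result, nil
}

func main() {
	existing := []string{
		"/zk/nyc/vt/tablets/1/action",
		"/zk/nyc/vt/tablets/2/action",
		"/zk/sjc/vt/tablets/7/action",
	}
	matches, err := matchExisting(existing, []string{"/zk/nyc/vt/tablets/*/action"})
	fmt.Println(matches, err)

	// A path with no wildcard that does not exist yields no results.
	none, err := matchExisting(existing, []string{"/zk/nyc/vt/missing"})
	fmt.Println(len(none), err)
}
```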

func ZkCellFromZkPath

func ZkCellFromZkPath(zkPath string) (string, error)

func ZkKnownCells

func ZkKnownCells(useCache bool) []string

Returns all the known cells, alphabetically ordered. It will include 'global' if there is a dc-specific global cell or a global cell.

func ZkPathToZkAddr

func ZkPathToZkAddr(zkPath string, useCache bool) (string, error)

Types

type ChangeFunc

type ChangeFunc func(oldValue string, oldStat Stat) (newValue string, err error)

type Conn

type Conn interface {
	Get(path string) (data string, stat Stat, err error)
	GetW(path string) (data string, stat Stat, watch <-chan zookeeper.Event, err error)

	Children(path string) (children []string, stat Stat, err error)
	ChildrenW(path string) (children []string, stat Stat, watch <-chan zookeeper.Event, err error)

	Exists(path string) (stat Stat, err error)
	ExistsW(path string) (stat Stat, watch <-chan zookeeper.Event, err error)

	Create(path, value string, flags int, aclv []zookeeper.ACL) (pathCreated string, err error)

	Set(path, value string, version int) (stat Stat, err error)

	Delete(path string, version int) (err error)

	Close() error

	RetryChange(path string, flags int, acl []zookeeper.ACL, changeFunc ChangeFunc) error

	ACL(path string) ([]zookeeper.ACL, Stat, error)
	SetACL(path string, aclv []zookeeper.ACL, version int) error
}

This interface is really close to the zookeeper connection interface. It uses the Stat interface defined here instead of the zookeeper.Stat structure for stats. Everything else is the same as in zookeeper, so refer to the zookeeper docs for the conventions used here (for instance, using -1 as version to specify any version).
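The version convention mentioned above can be shown with a small in-memory model. The memStore type and its error values are hypothetical and exist only for this sketch; it mimics the ZooKeeper rule that a write succeeds when the supplied version is -1 or equals the znode's current data version, and that each successful write increments that version.

```go
package main

import (
	"errors"
	"fmt"
)

// Sketch-local errors; the real errors come from the zookeeper library.
var (
	errNoNode     = errors.New("sketch: no such node")
	errBadVersion = errors.New("sketch: bad version")
)

type entry struct {
	value   string
	version int
}

// memStore is a hypothetical in-memory stand-in for a versioned znode store.
type memStore struct {
	nodes map[string]*entry
}

// Set writes value if version is -1 (any version) or matches the current
// version, and returns the new version.
func (s *memStore) Set(path, value string, version int) (int, error) {
	e, ok := s.nodes[path]
	if !ok {
		return 0, errNoNode
	}
	if version != -1 && version != e.version {
		return 0, errBadVersion
	}
	e.value = value
	e.version++
	return e.version, nil
}

func main() {
	s := &memStore{nodes: map[string]*entry{"/zk/test": {value: "a"}}}
	fmt.Println(s.Set("/zk/test", "b", -1)) // unconditional write
	fmt.Println(s.Set("/zk/test", "c", 0))  // stale version is rejected
	fmt.Println(s.Set("/zk/test", "c", 1))  // matching version succeeds
}
```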

type ConnCache

type ConnCache struct {
	// contains filtered or unexported fields
}

func NewConnCache

func NewConnCache(useZkocc bool) *ConnCache

func (*ConnCache) Close

func (cc *ConnCache) Close() error

func (*ConnCache) ConnForPath

func (cc *ConnCache) ConnForPath(zkPath string) (cn Conn, err error)

func (*ConnCache) String

func (cc *ConnCache) String() string

Implements expvar.Var.

type GlobalConn

type GlobalConn struct {
	// contains filtered or unexported fields
}

func Dial

func Dial(serverAddrs []string, recvTimeout time.Duration) (*GlobalConn, <-chan zookeeper.Event, error)

func (*GlobalConn) Children

func (gzc *GlobalConn) Children(path string) (children []string, stat Stat, err error)

func (*GlobalConn) Close

func (gzc *GlobalConn) Close() (err error)

func (*GlobalConn) Create

func (gzc *GlobalConn) Create(path, value string, flags int, aclv []zookeeper.ACL) (pathCreated string, err error)

func (*GlobalConn) Delete

func (gzc *GlobalConn) Delete(path string, version int) (err error)

func (*GlobalConn) Get

func (gzc *GlobalConn) Get(path string) (data string, stat Stat, err error)

func (*GlobalConn) Set

func (gzc *GlobalConn) Set(path, value string, version int) (stat Stat, err error)

type GlobalZookeeperError

type GlobalZookeeperError string

func (GlobalZookeeperError) Error

func (e GlobalZookeeperError) Error() string

type MetaConn

type MetaConn struct {
	// contains filtered or unexported fields
}

func NewMetaConn

func NewMetaConn(useZkocc bool) *MetaConn

func (*MetaConn) ACL

func (conn *MetaConn) ACL(path string) (acl []zookeeper.ACL, stat Stat, err error)

func (*MetaConn) Children

func (conn *MetaConn) Children(path string) (children []string, stat Stat, err error)

func (*MetaConn) ChildrenW

func (conn *MetaConn) ChildrenW(path string) (children []string, stat Stat, watch <-chan zookeeper.Event, err error)

func (*MetaConn) Close

func (conn *MetaConn) Close() error

func (*MetaConn) Create

func (conn *MetaConn) Create(path, value string, flags int, aclv []zookeeper.ACL) (pathCreated string, err error)

func (*MetaConn) Delete

func (conn *MetaConn) Delete(path string, version int) (err error)

func (*MetaConn) Exists

func (conn *MetaConn) Exists(path string) (stat Stat, err error)

func (*MetaConn) ExistsW

func (conn *MetaConn) ExistsW(path string) (stat Stat, watch <-chan zookeeper.Event, err error)

func (*MetaConn) Get

func (conn *MetaConn) Get(path string) (data string, stat Stat, err error)

func (*MetaConn) GetW

func (conn *MetaConn) GetW(path string) (data string, stat Stat, watch <-chan zookeeper.Event, err error)

func (*MetaConn) RetryChange

func (conn *MetaConn) RetryChange(path string, flags int, acl []zookeeper.ACL, changeFunc ChangeFunc) error

func (*MetaConn) Set

func (conn *MetaConn) Set(path, value string, version int) (stat Stat, err error)

func (*MetaConn) SetACL

func (conn *MetaConn) SetACL(path string, aclv []zookeeper.ACL, version int) (err error)

func (*MetaConn) String

func (conn *MetaConn) String() string

Implements expvar.Var.

type Stat

type Stat interface {
	Czxid() int64
	Mzxid() int64
	CTime() time.Time
	MTime() time.Time
	Version() int
	CVersion() int
	AVersion() int
	EphemeralOwner() int64
	DataLength() int
	NumChildren() int
	Pzxid() int64
}

type ZkConn

type ZkConn struct {
	// contains filtered or unexported fields
}

ZkConn is a client class that implements zk.Conn using a zookeeper.Conn.

func DialZk

func DialZk(zkAddr string, baseTimeout time.Duration) (*ZkConn, <-chan zookeeper.Event, error)

DialZk dials a ZK server and waits for the connection event. It returns a ZkConn encapsulating the zookeeper.Conn, and the zookeeper session event channel to monitor the connection.

The value for baseTimeout is used as a session timeout as well, and will be used to negotiate a 'good' value with the server. From reading the zookeeper source code, it has to be between 6 and 60 seconds (2x and 20x the tickTime by default, with default tick time being 3 seconds). The min session time, max session time and tick time can all be overridden on the zookeeper server side, so these numbers may vary.

Then this baseTimeout is used to compute other related timeouts:
- connect timeout is 1/3 of baseTimeout
- recv timeout is 2/3 of baseTimeout minus a ping time
- send timeout is 1/3 of baseTimeout
- we try to send a ping at least every baseTimeout / 3

Note that baseTimeout has *nothing* to do with how long it may take, after calling Dial, to receive the first event on the session channel. The library will actually try to re-connect in the background (after each timeout), and may *never* send an event if the TCP connections always fail. Use DialZkTimeout to enforce a timeout for the initial connect.

func DialZkTimeout

func DialZkTimeout(zkAddr string, baseTimeout time.Duration, connectTimeout time.Duration) (*ZkConn, <-chan zookeeper.Event, error)

func (*ZkConn) ACL

func (conn *ZkConn) ACL(path string) (acls []zookeeper.ACL, stat Stat, err error)

func (*ZkConn) Children

func (conn *ZkConn) Children(path string) (children []string, stat Stat, err error)

func (*ZkConn) ChildrenW

func (conn *ZkConn) ChildrenW(path string) (children []string, stat Stat, watch <-chan zookeeper.Event, err error)

func (*ZkConn) Close

func (conn *ZkConn) Close() error

Close will close the connection asynchronously. It will never fail, even though closing the connection might fail in the background. Accessing this ZkConn after Close has been called will panic.

func (*ZkConn) Create

func (conn *ZkConn) Create(path, value string, flags int, aclv []zookeeper.ACL) (pathCreated string, err error)

func (*ZkConn) Delete

func (conn *ZkConn) Delete(path string, version int) (err error)

func (*ZkConn) Exists

func (conn *ZkConn) Exists(path string) (stat Stat, err error)

func (*ZkConn) ExistsW

func (conn *ZkConn) ExistsW(path string) (stat Stat, watch <-chan zookeeper.Event, err error)

func (*ZkConn) Get

func (conn *ZkConn) Get(path string) (data string, stat Stat, err error)

func (*ZkConn) GetW

func (conn *ZkConn) GetW(path string) (data string, stat Stat, watch <-chan zookeeper.Event, err error)

func (*ZkConn) RetryChange

func (conn *ZkConn) RetryChange(path string, flags int, acl []zookeeper.ACL, changeFunc ChangeFunc) error

func (*ZkConn) Set

func (conn *ZkConn) Set(path, value string, version int) (stat Stat, err error)

func (*ZkConn) SetACL

func (conn *ZkConn) SetACL(path string, aclv []zookeeper.ACL, version int) error

type ZkNode

type ZkNode struct {
	Path     string
	Data     string
	Stat     ZkStat
	Children []string
	Cached   bool // the response comes from the zkocc cache
	Stale    bool // the response is stale because we're not connected
}

func (*ZkNode) MarshalBson

func (zkNode *ZkNode) MarshalBson(buf *bytes2.ChunkedWriter, key string)

func (*ZkNode) UnmarshalBson

func (zkNode *ZkNode) UnmarshalBson(buf *bytes.Buffer, kind byte)

type ZkNodeV

type ZkNodeV struct {
	Nodes []*ZkNode
}

func (*ZkNodeV) MarshalBson

func (zkNodeV *ZkNodeV) MarshalBson(buf *bytes2.ChunkedWriter, key string)

func (*ZkNodeV) UnmarshalBson

func (zkNodeV *ZkNodeV) UnmarshalBson(buf *bytes.Buffer, kind byte)

type ZkPath

type ZkPath struct {
	Path string
}

func (*ZkPath) MarshalBson

func (zkPath *ZkPath) MarshalBson(buf *bytes2.ChunkedWriter, key string)

func (*ZkPath) UnmarshalBson

func (zkPath *ZkPath) UnmarshalBson(buf *bytes.Buffer, kind byte)

type ZkPathV

type ZkPathV struct {
	Paths []string
}

func (*ZkPathV) MarshalBson

func (zkPathV *ZkPathV) MarshalBson(buf *bytes2.ChunkedWriter, key string)

func (*ZkPathV) UnmarshalBson

func (zkPathV *ZkPathV) UnmarshalBson(buf *bytes.Buffer, kind byte)

type ZkReader

type ZkReader interface {
	Get(req *ZkPath, reply *ZkNode) error
	GetV(req *ZkPathV, reply *ZkNodeV) error
	Children(req *ZkPath, reply *ZkNode) error
}

ZkReader defines the RPC services for zkocc. The service name to use is 'ZkReader'.

type ZkStat

type ZkStat struct {
	// contains filtered or unexported fields
}

func (*ZkStat) AVersion

func (zkStat *ZkStat) AVersion() int

func (*ZkStat) CTime

func (zkStat *ZkStat) CTime() time.Time

func (*ZkStat) CVersion

func (zkStat *ZkStat) CVersion() int

func (*ZkStat) Czxid

func (zkStat *ZkStat) Czxid() int64

ZkStat methods to match zk.Stat interface

func (*ZkStat) DataLength

func (zkStat *ZkStat) DataLength() int

func (*ZkStat) EphemeralOwner

func (zkStat *ZkStat) EphemeralOwner() int64

func (*ZkStat) FromZookeeperStat

func (zkStat *ZkStat) FromZookeeperStat(zStat Stat)

Helper method that populates zkStat from zStat.

func (*ZkStat) MTime

func (zkStat *ZkStat) MTime() time.Time

func (*ZkStat) MarshalBson

func (zkStat *ZkStat) MarshalBson(buf *bytes2.ChunkedWriter, key string)

func (*ZkStat) Mzxid

func (zkStat *ZkStat) Mzxid() int64

func (*ZkStat) NumChildren

func (zkStat *ZkStat) NumChildren() int

func (*ZkStat) Pzxid

func (zkStat *ZkStat) Pzxid() int64

func (*ZkStat) UnmarshalBson

func (zkStat *ZkStat) UnmarshalBson(buf *bytes.Buffer, kind byte)

func (*ZkStat) Version

func (zkStat *ZkStat) Version() int

type ZkoccConn

type ZkoccConn struct {
	// contains filtered or unexported fields
}

ZkoccConn is a client class that implements zk.Conn but uses an RPC client to talk to a zkocc process.

func DialZkocc

func DialZkocc(addr string, connectTimeout time.Duration) (zkocc *ZkoccConn, err error)

DialZkocc splits addr (of the form server1:port1,server2:port2,server3:port3,...) on commas, randomizes the list, and tries to connect to the servers, stopping at the first successful connection.
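The split, shuffle and first-success steps can be sketched as below. The function dialFirst, its dial callback and the errUnreachable value are hypothetical; the callback stands in for the real RPC connection attempt, whose details are not documented here.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"strings"
)

// errUnreachable is a sketch-local error used by the fake dialer below.
var errUnreachable = errors.New("sketch: server unreachable")

// dialFirst splits addr on commas, shuffles the servers, and returns the
// first server for which dial succeeds. dial is a hypothetical callback.
func dialFirst(addr string, dial func(server string) error) (string, error) {
	servers := strings.Split(addr, ",")
	rand.Shuffle(len(servers), func(i, j int) {
		servers[i], servers[j] = servers[j], servers[i]
	})
	var lastErr error
	for _, server := range servers {
		if err := dial(server); err != nil {
			lastErr = err
			continue // try the next server in the shuffled list
		}
		return server, nil // stop at the first successful connection
	}
	return "", fmt.Errorf("no server reachable in %q: %w", addr, lastErr)
}

func main() {
	// Fake dialer: only server2:port2 accepts connections.
	dial := func(server string) error {
		if server == "server2:port2" {
			return nil
		}
		return errUnreachable
	}
	fmt.Println(dialFirst("server1:port1,server2:port2,server3:port3", dial))
}
```

Shuffling before dialing spreads clients across the listed servers instead of always loading the first one.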

func (*ZkoccConn) ACL

func (conn *ZkoccConn) ACL(path string) ([]zookeeper.ACL, Stat, error)

Might want to add ACL in RPC code.

func (*ZkoccConn) Children

func (conn *ZkoccConn) Children(path string) (children []string, stat Stat, err error)

func (*ZkoccConn) ChildrenW

func (conn *ZkoccConn) ChildrenW(path string) (children []string, stat Stat, watch <-chan zookeeper.Event, err error)

func (*ZkoccConn) Close

func (conn *ZkoccConn) Close() error

func (*ZkoccConn) Create

func (conn *ZkoccConn) Create(path, value string, flags int, aclv []zookeeper.ACL) (pathCreated string, err error)

func (*ZkoccConn) Delete

func (conn *ZkoccConn) Delete(path string, version int) (err error)

func (*ZkoccConn) Exists

func (conn *ZkoccConn) Exists(path string) (stat Stat, err error)

Exists is implemented using Get. FIXME(alainjobart): maybe we should add Exists to the RPC API?

func (*ZkoccConn) ExistsW

func (conn *ZkoccConn) ExistsW(path string) (stat Stat, watch <-chan zookeeper.Event, err error)

func (*ZkoccConn) Get

func (conn *ZkoccConn) Get(path string) (data string, stat Stat, err error)

func (*ZkoccConn) GetW

func (conn *ZkoccConn) GetW(path string) (data string, stat Stat, watch <-chan zookeeper.Event, err error)

func (*ZkoccConn) RetryChange

func (conn *ZkoccConn) RetryChange(path string, flags int, acl []zookeeper.ACL, changeFunc ChangeFunc) error

func (*ZkoccConn) Set

func (conn *ZkoccConn) Set(path, value string, version int) (stat Stat, err error)

func (*ZkoccConn) SetACL

func (conn *ZkoccConn) SetACL(path string, aclv []zookeeper.ACL, version int) error

type ZkoccUnimplementedError

type ZkoccUnimplementedError string

func (ZkoccUnimplementedError) Error

func (e ZkoccUnimplementedError) Error() string

Directories

Path Synopsis
Package fakezk is a pretty complete mock implementation of a Zookeeper connection (see go/zk/zk.Conn).
cache for zkocc