Documentation ¶
Index ¶
- Constants
- Variables
- func Assert(stmt func() bool)
- func BucketSeqnos(cluster, pooln, bucketn string) (l_seqnos []uint64, err error)
- func CPUCount(log bool) int
- func CheckIfRebalanceOngoing(urlSuffix string, nodeAddrs []string) (bool, error)
- func ClusterAuthUrl(cluster string) (string, error)
- func CollectSeqnos(kvfeeds map[string]*kvConn) (l_seqnos []uint64, err error)
- func CompareSlices(s1, s2 []uint16) bool
- func ComputeMD5(data []byte) ([]byte, error)
- func Condense(vbs []uint16) string
- func ConnectBucket(cluster, pooln, bucketn string) (*couchbase.Bucket, error)
- func Console(clusterAddr string, format string, v ...interface{}) error
- func Contains(needle interface{}, haystack interface{}) bool
- func ContainsIgnoreCase(needle string, haystack []string) bool
- func ConvertBigEndianToUint64(cas []byte) (uint64, error)
- func CountActiveKVNodes(bucket, hostaddress string) int
- func CurrentEventingNodeAddress(auth, hostaddress string) (string, error)
- func DeepCopy(kv map[string]interface{}) (newKv map[string]interface{})
- func DeleteAppContent(appPath, checksumPath, appName string) error
- func DeleteStaleAppContent(appPath, appName string) error
- func Do(req *http.Request) (*http.Response, error)
- func EventingNodesAddresses(auth, hostaddress string) ([]string, error)
- func EventingVer() string
- func FloatEquals(a, b float64) bool
- func GenerateHandlerUUID() (uint32, error)
- func Get(url string) (resp *http.Response, err error)
- func GetAggBootstrappingApps(urlSuffix string, nodeAddrs []string) (bool, error)
- func GetAppNameFromPath(path string) string
- func GetDeployedApps(urlSuffix string, nodeAddrs []string) (map[string]map[string]string, error)
- func GetEventProcessingStats(urlSuffix string, nodeAddrs []string) (map[string]int64, error)
- func GetHash(appCode string) string
- func GetIPMode() string
- func GetLogLevel(logLevel string) logging.LogLevel
- func GetMaxVbuckets() int
- func GetNodeUUIDs(urlSuffix string, nodeAddrs []string) (map[string]string, error)
- func GetProgress(urlSuffix string, nodeAddrs []string) (*cm.RebalanceProgress, map[string]interface{}, map[string]error)
- func Head(url string) (resp *http.Response, err error)
- func IsIPv6() bool
- func KVNodesAddresses(auth, hostaddress, bucket string) ([]string, error)
- func KVVbMap(auth, bucket, hostaddress string) (map[uint16]string, error)
- func KillProcess(pid int) error
- func ListChildren(path string) []string
- func LocalEventingServiceHost(auth, hostaddress string) (string, error)
- func LocalKey() (usr, key string)
- func Localhost() string
- func MemcachedErrCode(err error) gomemcached.Status
- func MetaKvDelete(path string, rev interface{}) error
- func MetakvGet(path string) ([]byte, error)
- func MetakvRecursiveDelete(dirpath string) error
- func MetakvSet(path string, value []byte, rev interface{}) error
- func NewRequest(method, url string, body io.Reader) (*http.Request, error)
- func NsServerNodesAddresses(auth, hostaddress string) ([]string, error)
- func Post(url string, contentType string, body io.Reader) (resp *http.Response, err error)
- func PostForm(url string, data url.Values) (resp *http.Response, err error)
- func ReadAppContent(appsPath, checksumPath, appName string) ([]byte, error)
- func RecursiveDelete(dirpath string) error
- func Retry(b Backoff, retryCount *int64, callback CallbackFunc, args ...interface{}) error
- func SeedProcess()
- func SetIPv6(is6 bool)
- func SetMaxVbuckets(sz int)
- func SetServicePorts(portMap map[string]string)
- func SprintDCPCounts(counts map[mcd.CommandCode]uint64) (string, uint64, time.Time)
- func SprintV8Counts(counts map[string]uint64) string
- func StopDebugger(nodeAddr, appName string)
- func StrSliceDiff(kv1, kv2 []string) []string
- func StripScheme(endpoint string) string
- func SuperImpose(source, on map[string]interface{}) map[string]interface{}
- func ToStr(value bool) (strValue string)
- func ToStringArray(from interface{}) (to []string)
- func Uint16SliceDiff(kv1, kv2 []uint16) []uint16
- func VbsSliceDiff(X, Y []uint16) []uint16
- func VbucketByKey(key []byte, numVbuckets int) uint16
- func VbucketDistribution(vbs []uint16, numWorkers int) map[int][]uint16
- func VbucketNodeAssignment(vbs []uint16, numWorkers int) map[int][]uint16
- func WriteAppContent(appsPath, checksumPath, appName string, payload []byte) error
- type Backoff
- type BoundedQueue
- type CallbackFunc
- type CbAuthHandler
- type Client
- func (c *Client) Do(req *http.Request) (*http.Response, error)
- func (c *Client) Get(url string) (resp *http.Response, err error)
- func (c *Client) Head(url string) (resp *http.Response, err error)
- func (c *Client) Post(url string, contentType string, body io.Reader) (resp *http.Response, err error)
- func (c *Client) PostForm(url string, data url.Values) (resp *http.Response, err error)
- type Clock
- type ClusterInfoCache
- func (c *ClusterInfoCache) Fetch() error
- func (c *ClusterInfoCache) GetActiveEventingNodes() (nodes []couchbase.Node)
- func (c *ClusterInfoCache) GetBucketUUID(bucket string) (uuid string)
- func (c *ClusterInfoCache) GetClusterVersion() (int, int)
- func (c *ClusterInfoCache) GetCurrentNode() NodeId
- func (c *ClusterInfoCache) GetFailedEventingNodes() (nodes []couchbase.Node)
- func (c *ClusterInfoCache) GetLocalHostAddress() (string, error)
- func (c *ClusterInfoCache) GetLocalHostname() (string, error)
- func (c *ClusterInfoCache) GetLocalServerGroup() (string, error)
- func (c *ClusterInfoCache) GetLocalServiceAddress(srvc string) (string, error)
- func (c *ClusterInfoCache) GetLocalServiceHost(srvc string) (string, error)
- func (c *ClusterInfoCache) GetLocalServicePort(srvc string) (string, error)
- func (c *ClusterInfoCache) GetNewEventingNodes() (nodes []couchbase.Node)
- func (c *ClusterInfoCache) GetNodeStatus(nid NodeId) (string, error)
- func (c *ClusterInfoCache) GetNodesByBucket(bucket string) (nids []NodeId, err error)
- func (c *ClusterInfoCache) GetNodesByServiceType(srvc string) (nids []NodeId)
- func (c *ClusterInfoCache) GetServerGroup(nid NodeId) string
- func (c *ClusterInfoCache) GetServiceAddress(nid NodeId, srvc string) (addr string, err error)
- func (c *ClusterInfoCache) GetVBuckets(nid NodeId, bucket string) (vbs []uint32, err error)
- func (c *ClusterInfoCache) IsEphemeral(bucket string) (bool, error)
- func (c *ClusterInfoCache) IsMemcached(bucket string) (bool, error)
- func (c *ClusterInfoCache) IsNodeHealthy(nid NodeId) (bool, error)
- func (c *ClusterInfoCache) SetLogPrefix(p string)
- func (c *ClusterInfoCache) SetMaxRetries(r int)
- func (c *ClusterInfoCache) SetServicePorts(portMap map[string]string)
- type Config
- type ConfigHolder
- type DynamicAuthenticator
- type Element
- type ExponentialBackoff
- type FixedBackoff
- type GocbLogger
- type NamedParamsInfo
- type NodeId
- type ParseInfo
- type PayloadHash
- type RetryHelper
- type UUID
- type Uint16Slice
Constants ¶
const ( EVENTING_ADMIN_SERVICE = "eventingAdminPort" EVENTING_SSL_SERVICE = "eventingSSL" )
const ( Stop time.Duration = -1 DefaultInitialInterval = 1 * time.Second DefaultRandomizationFactor = 0.5 DefaultMultiplier = 1.5 DefaultMaxInterval = 10 * time.Second DefaultMaxElapsedTime = time.Minute )
const ( EventingAdminService = "eventingAdminPort" DataService = "kv" MgmtService = "mgmt" HTTPRequestTimeout = time.Duration(5000) * time.Millisecond EPSILON = 1e-5 )
const BUCKET_UUID_NIL = ""
const CLUSTER_INFO_INIT_RETRIES = 5
const CLUSTER_INFO_VALIDATION_RETRIES = 10
const (
MAX_AUTH_RETRIES = 10
)
Variables ¶
var ( ErrInvalidNodeId = errors.New("Invalid NodeId") ErrInvalidService = errors.New("Invalid service") ErrNodeNotBucketMember = errors.New("Node is not a member of bucket") ErrValidationFailed = errors.New("ClusterInfo Validation Failed") )
var DefaultClient = &Client{}
var ErrorClosed = errors.New("bounded_queue is closed")
var ErrorSize = errors.New("element size is more than max memory quota")
var GocbCredsRequestCounter = 0
var ServiceAddrMap map[string]string
var SystemClock = systemClock{}
Functions ¶
func BucketSeqnos ¶
BucketSeqnos return list of {{vbno,seqno}..} for all vbuckets. this call might fail due to,
- concurrent access that can preserve a deleted/failed bucket object.
- pollForDeletedBuckets() did not get a chance to cleanup a deleted bucket.
in both the cases if the call is retried it should get fixed, provided a valid bucket exists.
func CheckIfRebalanceOngoing ¶
func ClusterAuthUrl ¶
func CollectSeqnos ¶
func CompareSlices ¶
func ComputeMD5 ¶
func ConnectBucket ¶
ConnectBucket will instantiate a couchbase-bucket instance with cluster. caller's responsibility to close the bucket.
func ContainsIgnoreCase ¶
func CountActiveKVNodes ¶
func DeleteAppContent ¶
DeleteAppContent delete handler code
func DeleteStaleAppContent ¶
Delete stale app fragments
func EventingNodesAddresses ¶
func EventingVer ¶
func EventingVer() string
func FloatEquals ¶
func GenerateHandlerUUID ¶
func GetAggBootstrappingApps ¶
func GetAppNameFromPath ¶
func GetDeployedApps ¶
func GetEventProcessingStats ¶
func GetLogLevel ¶
func GetMaxVbuckets ¶
func GetMaxVbuckets() int
func GetNodeUUIDs ¶
func GetProgress ¶
func KVNodesAddresses ¶
func KillProcess ¶
func ListChildren ¶
func MemcachedErrCode ¶
func MemcachedErrCode(err error) gomemcached.Status
func MetaKvDelete ¶
func MetakvRecursiveDelete ¶
func NsServerNodesAddresses ¶
func ReadAppContent ¶
ReadAppContent reads Handler Code
func RecursiveDelete ¶
func Retry ¶
func Retry(b Backoff, retryCount *int64, callback CallbackFunc, args ...interface{}) error
func SeedProcess ¶
func SeedProcess()
func SetMaxVbuckets ¶
func SetMaxVbuckets(sz int)
func SetServicePorts ¶
func SprintDCPCounts ¶
func SprintV8Counts ¶
func StopDebugger ¶
func StopDebugger(nodeAddr, appName string)
func StrSliceDiff ¶
func StripScheme ¶
func SuperImpose ¶
func ToStringArray ¶
func ToStringArray(from interface{}) (to []string)
func Uint16SliceDiff ¶
func VbsSliceDiff ¶
func VbucketByKey ¶
VbucketByKey returns doc_id to vbucket mapping
func VbucketDistribution ¶
VbucketDistribution is used by vbucket ownership give up and takeover routines during rebalance
func VbucketNodeAssignment ¶
VbucketNodeAssignment will be used as generic partitioning scheme for vbucket assignment to Eventing.Consumer and Eventing.Producer instances
func WriteAppContent ¶
WriteAppContent fragments the payload and store it to metakv
Types ¶
type BoundedQueue ¶
type BoundedQueue struct {
// contains filtered or unexported fields
}
func NewBoundedQueue ¶
func NewBoundedQueue(maxcount, maxsize uint64) *BoundedQueue
func (*BoundedQueue) Close ¶
func (q *BoundedQueue) Close()
func (*BoundedQueue) Count ¶
func (q *BoundedQueue) Count() uint64
func (*BoundedQueue) IsClosed ¶
func (q *BoundedQueue) IsClosed() bool
func (*BoundedQueue) Pop ¶
func (q *BoundedQueue) Pop() (Element, error)
func (*BoundedQueue) Push ¶
func (q *BoundedQueue) Push(elem Element) error
type CallbackFunc ¶
type CallbackFunc func(arg ...interface{}) error
type CbAuthHandler ¶
cbauth admin authentication helper Uses default cbauth env variables internally to provide auth creds
func (*CbAuthHandler) AuthenticateMemcachedConn ¶
func (ah *CbAuthHandler) AuthenticateMemcachedConn(host string, conn *memcached.Client) error
func (*CbAuthHandler) GetCredentials ¶
func (ah *CbAuthHandler) GetCredentials() (string, string)
type ClusterInfoCache ¶
Helper object for fetching cluster information Can be used by services running on a cluster node to connect with local management service for obtaining cluster information. Info cache can be updated by using Refresh() method.
func FetchNewClusterInfoCache ¶
func FetchNewClusterInfoCache(clusterUrl string) (*ClusterInfoCache, error)
func (*ClusterInfoCache) Fetch ¶
func (c *ClusterInfoCache) Fetch() error
func (*ClusterInfoCache) GetActiveEventingNodes ¶
func (c *ClusterInfoCache) GetActiveEventingNodes() (nodes []couchbase.Node)
func (*ClusterInfoCache) GetBucketUUID ¶
func (c *ClusterInfoCache) GetBucketUUID(bucket string) (uuid string)
Return UUID of a given bucket.
func (*ClusterInfoCache) GetClusterVersion ¶
func (c *ClusterInfoCache) GetClusterVersion() (int, int)
func (*ClusterInfoCache) GetCurrentNode ¶
func (c *ClusterInfoCache) GetCurrentNode() NodeId
func (*ClusterInfoCache) GetFailedEventingNodes ¶
func (c *ClusterInfoCache) GetFailedEventingNodes() (nodes []couchbase.Node)
func (*ClusterInfoCache) GetLocalHostAddress ¶
func (c *ClusterInfoCache) GetLocalHostAddress() (string, error)
func (*ClusterInfoCache) GetLocalHostname ¶
func (c *ClusterInfoCache) GetLocalHostname() (string, error)
func (*ClusterInfoCache) GetLocalServerGroup ¶
func (c *ClusterInfoCache) GetLocalServerGroup() (string, error)
func (*ClusterInfoCache) GetLocalServiceAddress ¶
func (c *ClusterInfoCache) GetLocalServiceAddress(srvc string) (string, error)
func (*ClusterInfoCache) GetLocalServiceHost ¶
func (c *ClusterInfoCache) GetLocalServiceHost(srvc string) (string, error)
func (*ClusterInfoCache) GetLocalServicePort ¶
func (c *ClusterInfoCache) GetLocalServicePort(srvc string) (string, error)
func (*ClusterInfoCache) GetNewEventingNodes ¶
func (c *ClusterInfoCache) GetNewEventingNodes() (nodes []couchbase.Node)
func (*ClusterInfoCache) GetNodeStatus ¶
func (c *ClusterInfoCache) GetNodeStatus(nid NodeId) (string, error)
func (*ClusterInfoCache) GetNodesByBucket ¶
func (c *ClusterInfoCache) GetNodesByBucket(bucket string) (nids []NodeId, err error)
func (*ClusterInfoCache) GetNodesByServiceType ¶
func (c *ClusterInfoCache) GetNodesByServiceType(srvc string) (nids []NodeId)
func (*ClusterInfoCache) GetServerGroup ¶
func (c *ClusterInfoCache) GetServerGroup(nid NodeId) string
func (*ClusterInfoCache) GetServiceAddress ¶
func (c *ClusterInfoCache) GetServiceAddress(nid NodeId, srvc string) (addr string, err error)
func (*ClusterInfoCache) GetVBuckets ¶
func (c *ClusterInfoCache) GetVBuckets(nid NodeId, bucket string) (vbs []uint32, err error)
func (*ClusterInfoCache) IsEphemeral ¶
func (c *ClusterInfoCache) IsEphemeral(bucket string) (bool, error)
func (*ClusterInfoCache) IsMemcached ¶
func (c *ClusterInfoCache) IsMemcached(bucket string) (bool, error)
func (*ClusterInfoCache) IsNodeHealthy ¶
func (c *ClusterInfoCache) IsNodeHealthy(nid NodeId) (bool, error)
func (*ClusterInfoCache) SetLogPrefix ¶
func (c *ClusterInfoCache) SetLogPrefix(p string)
func (*ClusterInfoCache) SetMaxRetries ¶
func (c *ClusterInfoCache) SetMaxRetries(r int)
func (*ClusterInfoCache) SetServicePorts ¶
func (c *ClusterInfoCache) SetServicePorts(portMap map[string]string)
type ConfigHolder ¶
type ConfigHolder struct {
// contains filtered or unexported fields
}
func (*ConfigHolder) Load ¶
func (h *ConfigHolder) Load() Config
func (*ConfigHolder) Store ¶
func (h *ConfigHolder) Store(conf Config)
type DynamicAuthenticator ¶
type DynamicAuthenticator struct {
Caller string
}
func (*DynamicAuthenticator) Credentials ¶
func (dynAuth *DynamicAuthenticator) Credentials(req gocb.AuthCredsRequest) ([]gocb.UserPassPair, error)
type ExponentialBackoff ¶
type ExponentialBackoff struct { InitialInterval time.Duration RandomizationFactor float64 Multiplier float64 MaxInterval time.Duration // After MaxElapsedTime the ExponentialBackoff stops. // It never stops if MaxElapsedTime == 0. MaxElapsedTime time.Duration Clock Clock // contains filtered or unexported fields }
func NewExponentialBackoff ¶
func NewExponentialBackoff() *ExponentialBackoff
func (*ExponentialBackoff) GetElapsedTime ¶
func (b *ExponentialBackoff) GetElapsedTime() time.Duration
func (*ExponentialBackoff) NextBackoff ¶
func (b *ExponentialBackoff) NextBackoff() time.Duration
func (*ExponentialBackoff) Reset ¶
func (b *ExponentialBackoff) Reset()
type FixedBackoff ¶
func NewFixedBackoff ¶
func NewFixedBackoff(d time.Duration) *FixedBackoff
func (*FixedBackoff) NextBackoff ¶
func (b *FixedBackoff) NextBackoff() time.Duration
func (*FixedBackoff) Reset ¶
func (b *FixedBackoff) Reset()
type GocbLogger ¶
type GocbLogger struct{}
type NamedParamsInfo ¶
type NamedParamsInfo struct { PInfo ParseInfo `json:"p_info"` NamedParams []string `json:"named_params"` }
func GetNamedParams ¶
func GetNamedParams(query string) (info *NamedParamsInfo)
type ParseInfo ¶
type ParseInfo struct { IsValid bool `json:"is_valid"` IsSelectQuery bool `json:"is_select_query"` IsDmlQuery bool `json:"is_dml_query"` KeyspaceName string `json:"keyspace_name"` Info string `json:"info"` }
func (*ParseInfo) FlattenParseInfo ¶
type PayloadHash ¶
type RetryHelper ¶
type RetryHelper struct {
// contains filtered or unexported fields
}
Helper object to execute a function with retries and exponential backoff
func NewRetryHelper ¶
func NewRetryHelper( maxRetries int, interval time.Duration, factor int, call retryFunc) *RetryHelper
func (*RetryHelper) Run ¶
func (r *RetryHelper) Run() error
type Uint16Slice ¶
type Uint16Slice []uint16
func (Uint16Slice) Len ¶
func (s Uint16Slice) Len() int
func (Uint16Slice) Less ¶
func (s Uint16Slice) Less(i, j int) bool
func (Uint16Slice) Swap ¶
func (s Uint16Slice) Swap(i, j int)