Documentation ¶
Index ¶
- func FeedConfigParams() []string
- func NewFakeBuckets(buckets []string) map[string]*FakeBucket
- type Average
- type BucketAccess
- type BucketFeeder
- type Engine
- func (engine *Engine) Endpoints() []string
- func (engine *Engine) SnapshotData(m *mc.DcpEvent, vbno uint16, vbuuid, seqno uint64) interface{}
- func (engine *Engine) StreamBeginData(vbno uint16, vbuuid, seqno uint64) interface{}
- func (engine *Engine) StreamEndData(vbno uint16, vbuuid, seqno uint64) interface{}
- func (engine *Engine) SyncData(vbno uint16, vbuuid, seqno uint64) interface{}
- func (engine *Engine) TransformRoute(vbuuid uint64, m *mc.DcpEvent, data map[string]interface{}, encodeBuf []byte) ([]byte, error)
- type FakeBucket
- func (b *FakeBucket) Close(kvaddr string)
- func (b *FakeBucket) CloseFeed() (err error)
- func (b *FakeBucket) EndVbStreams(opaque uint16, ts *protobuf.TsVbuuid) (err error)
- func (b *FakeBucket) GetChannel() <-chan *mc.DcpEvent
- func (b *FakeBucket) GetFailoverLogs(opaque uint16, vbnos []uint16, conf map[string]interface{}) (couchbase.FailoverLog, error)
- func (b *FakeBucket) GetVBmap(kvaddrs []string) (map[string][]uint16, error)
- func (b *FakeBucket) OpenKVFeed(kvaddr string) (BucketFeeder, error)
- func (b *FakeBucket) SetFailoverLog(vbno uint16, flog [][2]uint64)
- func (b *FakeBucket) SetVbmap(kvaddr string, vbnos []uint16)
- func (b *FakeBucket) StartVbStreams(opaque uint16, ts *protobuf.TsVbuuid) (err error)
- type FakeStream
- type Feed
- func (feed *Feed) AddBuckets(req *protobuf.AddBucketsRequest, opaque uint16) (*protobuf.TopicResponse, error)
- func (feed *Feed) AddInstances(req *protobuf.AddInstancesRequest, opaque uint16) (*protobuf.TimestampResponse, error)
- func (feed *Feed) DelBuckets(req *protobuf.DelBucketsRequest, opaque uint16) error
- func (feed *Feed) DelInstances(req *protobuf.DelInstancesRequest, opaque uint16) error
- func (feed *Feed) DeleteEndpoint(raddr string) error
- func (feed *Feed) GetOpaque() uint16
- func (feed *Feed) GetStatistics() c.Statistics
- func (feed *Feed) GetTopicResponse() *protobuf.TopicResponse
- func (feed *Feed) MutationTopic(req *protobuf.MutationTopicRequest, opaque uint16) (*protobuf.TopicResponse, error)
- func (feed *Feed) Ping() error
- func (feed *Feed) PostFinKVdata(bucket string)
- func (feed *Feed) PostStreamEnd(bucket string, m *mc.DcpEvent)
- func (feed *Feed) PostStreamRequest(bucket string, m *mc.DcpEvent)
- func (feed *Feed) RepairEndpoints(req *protobuf.RepairEndpointsRequest, opaque uint16) error
- func (feed *Feed) ResetConfig(config c.Config) error
- func (feed *Feed) RestartVbuckets(req *protobuf.RestartVbucketsRequest, opaque uint16) (*protobuf.TopicResponse, error)
- func (feed *Feed) Shutdown(opaque uint16) error
- func (feed *Feed) ShutdownVbuckets(req *protobuf.ShutdownVbucketsRequest, opaque uint16) error
- func (feed *Feed) StaleCheck(staleTimeout int) (string, error)
- type KVData
- func (kvdata *KVData) AddEngines(opaque uint16, engines map[uint64]*Engine, ...) (map[uint16]uint64, error)
- func (kvdata *KVData) Close() error
- func (kvdata *KVData) DeleteEngines(opaque uint16, engineKeys []uint64) error
- func (kvdata *KVData) GetStatistics() map[string]interface{}
- func (kvdata *KVData) ReloadHeartbeat() error
- func (kvdata *KVData) ResetConfig(config c.Config) error
- func (kvdata *KVData) UpdateTs(opaque uint16, ts *protobuf.TsVbuuid) error
- type Projector
- func (p *Projector) AddFeed(topic string, feed *Feed) (err error)
- func (p *Projector) DelFeed(topic string) (err error)
- func (p *Projector) GetConfig() c.Config
- func (p *Projector) GetFeed(topic string) (*Feed, error)
- func (p *Projector) GetFeedConfig() c.Config
- func (p *Projector) GetFeeds() []*Feed
- func (p *Projector) ResetConfig(config c.Config)
- type Subscriber
- type Vbucket
- type VbucketWorker
- func (worker *VbucketWorker) AddEngines(opaque uint16, engines map[uint64]*Engine, ...) (map[uint16]uint64, error)
- func (worker *VbucketWorker) Close() error
- func (worker *VbucketWorker) DeleteEngines(opaque uint16, engines []uint64) error
- func (worker *VbucketWorker) Event(m *mc.DcpEvent) error
- func (worker *VbucketWorker) GetStatistics() (map[string]interface{}, error)
- func (worker *VbucketWorker) GetVbuckets() ([]*Vbucket, error)
- func (worker *VbucketWorker) ResetConfig(config c.Config) error
- func (worker *VbucketWorker) SyncPulse() error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FeedConfigParams ¶
func FeedConfigParams() []string
FeedConfigParams return the list of configuration params supported by a feed.
func NewFakeBuckets ¶
func NewFakeBuckets(buckets []string) map[string]*FakeBucket
NewFakeBuckets returns a reference to new FakeBucket.
Types ¶
type Average ¶
type Average struct {
// contains filtered or unexported fields
}
Average maintains the average and variance of a stream of numbers in a space-efficient manner.
type BucketAccess ¶
type BucketAccess interface { // Refresh bucket meta information like vbmap Refresh() error // GetVBmap returns a map of `kvaddr` to list of vbuckets hosted in a kv // node. GetVBmap(kvaddrs []string) (map[string][]uint16, error) // FailoverLog fetch the failover log for specified vbucket GetFailoverLogs( opaque uint16, vbuckets []uint16, config map[string]interface{}) (couchbase.FailoverLog, error) // Close this bucket. Close() }
BucketAccess interface manage a subset of vbucket streams with mutiple KV nodes. To be implemented by couchbase.Bucket type.
type BucketFeeder ¶
type BucketFeeder interface { // GetChannel return a mutation channel. GetChannel() (mutch <-chan *mc.DcpEvent) // StartVbStreams starts a set of vbucket streams on this feed. // returns list of vbuckets for which StreamRequest is successfully // posted. StartVbStreams(opaque uint16, ts *protobuf.TsVbuuid) error // EndVbStreams ends an existing vbucket stream from this feed. EndVbStreams(opaque uint16, endTs *protobuf.TsVbuuid) error // CloseFeed ends all active streams on this feed and free its resources. CloseFeed() (err error) }
BucketFeeder interface from a BucketAccess object.
func OpenBucketFeed ¶
func OpenBucketFeed( feedname couchbase.DcpFeedName, b *couchbase.Bucket, opaque uint16, kvaddrs []string, config map[string]interface{}) (feeder BucketFeeder, err error)
OpenBucketFeed opens feed for bucket.
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine is immutable structure defined for each index, or any other entity that wants projection and routing over kv-mutations.
func (*Engine) SnapshotData ¶
SnapshotData from this engine.
func (*Engine) StreamBeginData ¶
StreamBeginData from this engine.
func (*Engine) StreamEndData ¶
StreamEndData from this engine.
type FakeBucket ¶
FakeBucket fot unit testing.
func (*FakeBucket) Close ¶
func (b *FakeBucket) Close(kvaddr string)
Close is method receiver for BucketAccess interface
func (*FakeBucket) CloseFeed ¶
func (b *FakeBucket) CloseFeed() (err error)
CloseFeed is method receiver for BucketFeeder interface
func (*FakeBucket) EndVbStreams ¶
func (b *FakeBucket) EndVbStreams( opaque uint16, ts *protobuf.TsVbuuid) (err error)
EndVbStreams is method receiver for BucketFeeder interface
func (*FakeBucket) GetChannel ¶
func (b *FakeBucket) GetChannel() <-chan *mc.DcpEvent
GetChannel is method receiver for BucketFeeder interface
func (*FakeBucket) GetFailoverLogs ¶
func (b *FakeBucket) GetFailoverLogs( opaque uint16, vbnos []uint16, conf map[string]interface{}) (couchbase.FailoverLog, error)
GetFailoverLogs is method receiver for BucketAccess interface
func (*FakeBucket) GetVBmap ¶
func (b *FakeBucket) GetVBmap(kvaddrs []string) (map[string][]uint16, error)
GetVBmap is method receiver for BucketAccess interface
func (*FakeBucket) OpenKVFeed ¶
func (b *FakeBucket) OpenKVFeed(kvaddr string) (BucketFeeder, error)
OpenKVFeed is method receiver for BucketAccess interface
func (*FakeBucket) SetFailoverLog ¶
func (b *FakeBucket) SetFailoverLog(vbno uint16, flog [][2]uint64)
SetFailoverLog fake initialization method.
func (*FakeBucket) SetVbmap ¶
func (b *FakeBucket) SetVbmap(kvaddr string, vbnos []uint16)
SetVbmap fake initialization method.
func (*FakeBucket) StartVbStreams ¶
func (b *FakeBucket) StartVbStreams( opaque uint16, ts *protobuf.TsVbuuid) (err error)
StartVbStreams is method receiver for BucketFeeder interface
type FakeStream ¶
type FakeStream struct {
// contains filtered or unexported fields
}
FakeStream fot unit testing.
type Feed ¶
type Feed struct {
// contains filtered or unexported fields
}
Feed is mutation stream - for maintenance, initial-load, catchup etc...
func NewFeed ¶
func NewFeed( pooln, topic string, projector *Projector, config c.Config, opaque uint16) (*Feed, error)
NewFeed creates a new topic feed. `config` contains following keys.
clusterAddr: KV cluster address <host:port>. feedWaitStreamReqTimeout: wait for a response to StreamRequest feedWaitStreamEndTimeout: wait for a response to StreamEnd feedChanSize: channel size for feed's control path and back path mutationChanSize: channel size of projector's data path routine syncTimeout: timeout, in ms, for sending periodic Sync messages kvstatTick: timeout, in ms, for logging kvstats routerEndpointFactory: endpoint factory
func (*Feed) AddBuckets ¶
func (feed *Feed) AddBuckets( req *protobuf.AddBucketsRequest, opaque uint16) (*protobuf.TopicResponse, error)
AddBuckets will remove buckets and all its upstream and downstream elements, except endpoints. Synchronous call.
func (*Feed) AddInstances ¶
func (feed *Feed) AddInstances( req *protobuf.AddInstancesRequest, opaque uint16) (*protobuf.TimestampResponse, error)
AddInstances will restart specified endpoint-address if it is not active already. Synchronous call.
func (*Feed) DelBuckets ¶
func (feed *Feed) DelBuckets( req *protobuf.DelBucketsRequest, opaque uint16) error
DelBuckets will remove buckets and all its upstream and downstream elements, except endpoints. Synchronous call.
func (*Feed) DelInstances ¶
func (feed *Feed) DelInstances( req *protobuf.DelInstancesRequest, opaque uint16) error
DelInstances will restart specified endpoint-address if it is not active already. Synchronous call.
func (*Feed) DeleteEndpoint ¶
DeleteEndpoint will delete the specified endpoint address from feed.
func (*Feed) GetStatistics ¶
func (feed *Feed) GetStatistics() c.Statistics
GetStatistics for this feed. Synchronous call.
func (*Feed) GetTopicResponse ¶
func (feed *Feed) GetTopicResponse() *protobuf.TopicResponse
GetTopicResponse for this feed. Synchronous call.
func (*Feed) MutationTopic ¶
func (feed *Feed) MutationTopic( req *protobuf.MutationTopicRequest, opaque uint16) (*protobuf.TopicResponse, error)
MutationTopic will start the feed. Synchronous call.
func (*Feed) PostFinKVdata ¶
PostFinKVdata feedback from data-path. Asynchronous call.
func (*Feed) PostStreamEnd ¶
PostStreamEnd feedback from data-path. Asynchronous call.
func (*Feed) PostStreamRequest ¶
PostStreamRequest feedback from data-path. Asynchronous call.
func (*Feed) RepairEndpoints ¶
func (feed *Feed) RepairEndpoints( req *protobuf.RepairEndpointsRequest, opaque uint16) error
RepairEndpoints will restart specified endpoint-address if it is not active already. Synchronous call.
func (*Feed) ResetConfig ¶
ResetConfig for this feed.
func (*Feed) RestartVbuckets ¶
func (feed *Feed) RestartVbuckets( req *protobuf.RestartVbucketsRequest, opaque uint16) (*protobuf.TopicResponse, error)
RestartVbuckets will restart upstream vbuckets for specified buckets. Synchronous call.
func (*Feed) Shutdown ¶
Shutdown feed, its upstream connection with kv and downstream endpoints. Synchronous call.
func (*Feed) ShutdownVbuckets ¶
func (feed *Feed) ShutdownVbuckets( req *protobuf.ShutdownVbucketsRequest, opaque uint16) error
ShutdownVbuckets will shutdown streams for specified buckets. Synchronous call.
type KVData ¶
type KVData struct {
// contains filtered or unexported fields
}
KVData captures an instance of data-path for single kv-node from upstream connection.
func NewKVData ¶
func NewKVData( feed *Feed, bucket string, opaque uint16, reqTs *protobuf.TsVbuuid, engines map[uint64]*Engine, endpoints map[string]c.RouterEndpoint, mutch <-chan *mc.DcpEvent, config c.Config) *KVData
NewKVData create a new data-path instance.
func (*KVData) AddEngines ¶
func (kvdata *KVData) AddEngines( opaque uint16, engines map[uint64]*Engine, endpoints map[string]c.RouterEndpoint) (map[uint16]uint64, error)
AddEngines and endpoints, synchronous call.
func (*KVData) DeleteEngines ¶
DeleteEngines synchronous call.
func (*KVData) GetStatistics ¶
GetStatistics from kv data path, synchronous call.
func (*KVData) ReloadHeartbeat ¶
ReloadHeartbeat for kvdata.
func (*KVData) ResetConfig ¶
ResetConfig for kvdata.
type Projector ¶
type Projector struct {
// contains filtered or unexported fields
}
Projector data structure, a projector is connected to one or more upstream kv-nodes. Works in tandem with projector's adminport.
func NewProjector ¶
NewProjector creates a news projector instance and starts a corresponding adminport.
func (*Projector) AddFeed ¶
AddFeed object for `topic`. - return ErrorTopicExist if topic is duplicate.
func (*Projector) DelFeed ¶
DelFeed object for `topic`. - return ErrorTopicMissing if topic is not started.
func (*Projector) GetFeed ¶
GetFeed object for `topic`. - return ErrorTopicMissing if topic is not started.
func (*Projector) GetFeedConfig ¶
GetFeedConfig from current configuration settings.
func (*Projector) ResetConfig ¶
ResetConfig accepts a full-set or subset of global configuration and updates projector related fields.
type Subscriber ¶
type Subscriber interface { // GetEvaluators will return a map of uuid to Evaluator interface. // - return ErrorInconsistentFeed for malformed tables. GetEvaluators() (map[uint64]c.Evaluator, error) // GetRouters will return a map of uuid to Router interface. // - return ErrorInconsistentFeed for malformed tables. GetRouters() (map[uint64]c.Router, error) }
Subscriber interface abstracts engines (aka instances) that can supply `evaluators`, to transform mutations into custom-messages, and `routers`, to supply distribution topology for custom-messages.
type Vbucket ¶
type Vbucket struct {
// contains filtered or unexported fields
}
Vbucket is immutable structure defined for each vbucket.
type VbucketWorker ¶
type VbucketWorker struct {
// contains filtered or unexported fields
}
VbucketWorker is immutable structure defined for each vbucket.
func NewVbucketWorker ¶
func NewVbucketWorker( id int, feed *Feed, bucket string, opaque uint16, config c.Config) *VbucketWorker
NewVbucketWorker creates a new routine to handle this vbucket stream.
func (*VbucketWorker) AddEngines ¶
func (worker *VbucketWorker) AddEngines( opaque uint16, engines map[uint64]*Engine, endpoints map[string]c.RouterEndpoint) (map[uint16]uint64, error)
AddEngines update active set of engines and endpoints, synchronous call.
func (*VbucketWorker) Close ¶
func (worker *VbucketWorker) Close() error
Close worker-routine, synchronous call.
func (*VbucketWorker) DeleteEngines ¶
func (worker *VbucketWorker) DeleteEngines( opaque uint16, engines []uint64) error
DeleteEngines delete engines and update endpoints synchronous call.
func (*VbucketWorker) Event ¶
func (worker *VbucketWorker) Event(m *mc.DcpEvent) error
Event will post an DcpEvent, asychronous call.
func (*VbucketWorker) GetStatistics ¶
func (worker *VbucketWorker) GetStatistics() (map[string]interface{}, error)
GetStatistics for worker vbucket, synchronous call.
func (*VbucketWorker) GetVbuckets ¶
func (worker *VbucketWorker) GetVbuckets() ([]*Vbucket, error)
GetVbuckets will return the list of active vbuckets managed by this workers.
func (*VbucketWorker) ResetConfig ¶
func (worker *VbucketWorker) ResetConfig(config c.Config) error
ResetConfig for worker-routine, synchronous call.
func (*VbucketWorker) SyncPulse ¶
func (worker *VbucketWorker) SyncPulse() error
SyncPulse will trigger worker to generate a sync pulse for all its vbuckets, asychronous call.