consumer

package
v0.0.0-...-8836d2b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2018 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Index

Constants

View Source
const (

	// ClusterChangeNotifChBufSize limits buffer size for cluster change notif from producer
	ClusterChangeNotifChBufSize = 10
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Consumer is responsible interacting with c++ v8 worker over local tcp port

func NewConsumer

func NewConsumer(hConfig *common.HandlerConfig, pConfig *common.ProcessConfig, rConfig *common.RebalanceConfig,
	index int, uuid string, eventingNodeUUIDs []string, vbnos []uint16, app *common.AppConfig,
	dcpConfig map[string]interface{}, p common.EventingProducer, s common.EventingSuperSup,
	numVbuckets int, retryCount *int64, vbEventingNodeAssignMap map[uint16]string,
	workerVbucketMap map[string][]uint16) *Consumer

NewConsumer called by producer to create consumer handle

func (*Consumer) ClearEventStats

func (c *Consumer) ClearEventStats()

ClearEventStats flushes event processing stats

func (*Consumer) ConsumerName

func (c *Consumer) ConsumerName() string

ConsumerName returns consumer name e.q <event_handler_name>_worker_1

func (*Consumer) DcpEventsRemainingToProcess

func (c *Consumer) DcpEventsRemainingToProcess() uint64

DcpEventsRemainingToProcess reports cached value for dcp events remaining to producer

func (*Consumer) EventingNodeUUIDs

func (c *Consumer) EventingNodeUUIDs() []string

EventingNodeUUIDs return list of known eventing node uuids

func (*Consumer) EventsProcessedPSec

func (c *Consumer) EventsProcessedPSec() *cm.EventProcessingStats

EventsProcessedPSec reports dcp + timer events triggered per sec

func (*Consumer) GetEventProcessingStats

func (c *Consumer) GetEventProcessingStats() map[string]uint64

GetEventProcessingStats exposes dcp/timer processing stats

func (*Consumer) GetExecutionStats

func (c *Consumer) GetExecutionStats() map[string]interface{}

GetExecutionStats returns OnUpdate/OnDelete success/failure stats for event handlers from cpp world

func (*Consumer) GetFailureStats

func (c *Consumer) GetFailureStats() map[string]interface{}

GetFailureStats returns failure stats for event handlers from cpp world

func (*Consumer) GetHandlerCode

func (c *Consumer) GetHandlerCode() string

GetHandlerCode returns handler code to assist V8 debugger

func (*Consumer) GetLatencyStats

func (c *Consumer) GetLatencyStats() map[string]uint64

GetLatencyStats returns latency stats for event handlers from from cpp world

func (*Consumer) GetLcbExceptionsStats

func (c *Consumer) GetLcbExceptionsStats() map[string]uint64

GetLcbExceptionsStats returns libcouchbase exception stats from CPP workers

func (*Consumer) GetMetaStoreStats

func (c *Consumer) GetMetaStoreStats() map[string]uint64

GetMetaStoreStats exposes timer store related stat counters

func (*Consumer) GetSourceMap

func (c *Consumer) GetSourceMap() string

GetSourceMap returns source map to assist V8 debugger

func (*Consumer) HandleV8Worker

func (c *Consumer) HandleV8Worker() error

HandleV8Worker sets up CPP V8 worker post its bootstrap

func (*Consumer) HostPortAddr

func (c *Consumer) HostPortAddr() string

HostPortAddr returns the HostPortAddr combination of current eventing node e.g. 127.0.0.1:25000

func (*Consumer) Index

func (c *Consumer) Index() int

Index returns the index of consumer among all consumers designated for specific handler on an eventing node

func (*Consumer) InternalVbDistributionStats

func (c *Consumer) InternalVbDistributionStats() []uint16

InternalVbDistributionStats returns internal state of vbucket ownership distribution on local eventing node

func (*Consumer) NodeUUID

func (c *Consumer) NodeUUID() string

NodeUUID returns UUID that's supplied by ns_server from command line

func (*Consumer) NotifyClusterChange

func (c *Consumer) NotifyClusterChange()

NotifyClusterChange is called by producer handle to signify each consumer instance about StartTopologyChange rpc call from cbauth service.Manager

func (*Consumer) NotifyRebalanceStop

func (c *Consumer) NotifyRebalanceStop()

NotifyRebalanceStop is called by producer to signal stopping of rebalance operation

func (*Consumer) NotifySettingsChange

func (c *Consumer) NotifySettingsChange()

NotifySettingsChange signals consumer instance of settings update

func (*Consumer) Pid

func (c *Consumer) Pid() int

Pid returns the process id of CPP V8 worker

func (*Consumer) RebalanceStatus

func (c *Consumer) RebalanceStatus() bool

RebalanceStatus returns state of rebalance for consumer instance

func (*Consumer) RebalanceTaskProgress

func (c *Consumer) RebalanceTaskProgress() *cm.RebalanceProgress

RebalanceTaskProgress reports progress to producer

func (*Consumer) Serve

func (c *Consumer) Serve()

Serve acts as init routine for consumer handle

func (*Consumer) SetConnHandle

func (c *Consumer) SetConnHandle(conn net.Conn)

SetConnHandle sets the tcp connection handle for CPP V8 worker

func (*Consumer) SetFeedbackConnHandle

func (c *Consumer) SetFeedbackConnHandle(conn net.Conn)

SetFeedbackConnHandle initialised the socket connect for data channel from eventing-consumer

func (*Consumer) SetRebalanceStatus

func (c *Consumer) SetRebalanceStatus(status bool)

SetRebalanceStatus update rebalance status for consumer instance

func (*Consumer) SignalBootstrapFinish

func (c *Consumer) SignalBootstrapFinish()

SignalBootstrapFinish is leveraged by Eventing.Producer instance to know if corresponding Eventing.Consumer instance has finished bootstrap

func (*Consumer) SignalConnected

func (c *Consumer) SignalConnected()

SignalConnected notifies consumer routine when CPP V8 worker has connected to tcp listener instance

func (*Consumer) SignalFeedbackConnected

func (c *Consumer) SignalFeedbackConnected()

SignalFeedbackConnected notifies consumer routine when CPP V8 worker has connected to data channel

func (*Consumer) SignalStopDebugger

func (c *Consumer) SignalStopDebugger() error

SignalStopDebugger signal C++ consumer to stop debugger

func (*Consumer) SpawnCompilationWorker

func (c *Consumer) SpawnCompilationWorker(appCode, appContent, appName, eventingPort string, handlerHeaders, handlerFooters []string) (*common.CompileStatus, error)

SpawnCompilationWorker bring up a CPP worker to compile the user supplied handler code

func (*Consumer) Stop

func (c *Consumer) Stop()

Stop acts terminate routine for consumer handle

func (*Consumer) String

func (c *Consumer) String() string

func (*Consumer) TimerDebugStats

func (c *Consumer) TimerDebugStats() map[int]map[string]interface{}

TimerDebugStats captures timer related stats to assist in debugging mismatches during rebalance

func (*Consumer) UpdateEventingNodesUUIDs

func (c *Consumer) UpdateEventingNodesUUIDs(keepNodes, ejectNodes []string)

UpdateEventingNodesUUIDs is called by producer instance to notify about updated list of node uuids

func (*Consumer) UpdateWorkerQueueMemCap

func (c *Consumer) UpdateWorkerQueueMemCap(quota int64)

UpdateWorkerQueueMemCap revises the memory cap for cpp worker, dcp and timer queues

func (*Consumer) VbDcpEventsRemainingToProcess

func (c *Consumer) VbDcpEventsRemainingToProcess() map[int]int64

VbDcpEventsRemainingToProcess reports cached dcp events remaining broken down to vbucket level

func (*Consumer) VbEventingNodeAssignMapUpdate

func (c *Consumer) VbEventingNodeAssignMapUpdate(vbEventingNodeAssignMap map[uint16]string)

VbEventingNodeAssignMapUpdate captures updated node to vbucket assignment

func (*Consumer) VbProcessingStats

func (c *Consumer) VbProcessingStats() map[uint16]map[string]interface{}

VbProcessingStats exposes consumer vb metadata to producer

func (*Consumer) VbSeqnoStats

func (c *Consumer) VbSeqnoStats() map[int]map[string]interface{}

VbSeqnoStats returns seq no stats, which can be useful in figuring out missed events during rebalance

func (*Consumer) WorkerVbMapUpdate

func (c *Consumer) WorkerVbMapUpdate(workerVbucketMap map[string][]uint16)

WorkerVbMapUpdate captures updated mapping of active consumers to vbuckets they should handle as per static planner

type OwnershipEntry

type OwnershipEntry struct {
	AssignedWorker string `json:"assigned_worker"`
	CurrentVBOwner string `json:"current_vb_owner"`
	Operation      string `json:"operation"`
	SeqNo          uint64 `json:"seq_no"`
	Timestamp      string `json:"timestamp"`
}

OwnershipEntry captures the state of vbucket within the metadata blob

type TimerInfo

type TimerInfo struct {
	Epoch     int64  `json:"epoch"`
	Vb        uint64 `json:"vb"`
	SeqNum    uint64 `json:"seq_num"`
	Callback  string `json:"callback"`
	Reference string `json:"reference"`
	Context   string `json:"context"`
}

TimerInfo is the struct sent by C++ worker to create the timer

func (*TimerInfo) Size

func (info *TimerInfo) Size() uint64

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL