common

package
v0.0.0-...-31abfad Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2024 License: Apache-2.0 Imports: 29 Imported by: 7

Documentation

Index

Constants

View Source
// Named DcpStreamBoundary values. Each constant is a typed string constant
// whose value is the literal shown.
const (
	DcpEverything DcpStreamBoundary = "everything"
	DcpFromNow    DcpStreamBoundary = "from_now"
	DcpFromPrior  DcpStreamBoundary = "from_prior"
)
View Source
// Untyped string constants "appLocation", "appLocations" and "reason".
// NOTE(review): presumably used as attribute/field keys — confirm against callers.
const (
	AppLocationTag  = "appLocation"
	AppLocationsTag = "appLocations"
	ReasonTag       = "reason"
)
View Source
// Named ChangeType values. Each constant is a typed string constant whose
// value is the literal shown.
const (
	StartRebalanceCType ChangeType = "start-rebalance"
	StopRebalanceCType  ChangeType = "stop-rebalance"
	StartFailoverCType  ChangeType = "start-failover"
)
View Source
// int8 state values numbered sequentially from 0 via iota: AppState is 0,
// AppStateUndeployed is 1, AppStateEnabled is 2, AppStatePaused is 3 and
// AppStateUnexpected is 4. Note that the first constant is itself named
// AppState.
// NOTE(review): presumably these are the values returned by GetCompositeState — confirm.
const (
	AppState int8 = iota
	AppStateUndeployed
	AppStateEnabled
	AppStatePaused
	AppStateUnexpected
)
View Source
// Debugger status strings, the debugger token key, and metakv paths built by
// appending a suffix to MetakvEventingPath.
//
// NOTE(review): MetakvEventingPath is declared as "/eventing" with no trailing
// slash, and the suffixes here have no leading slash, so these three paths
// evaluate to "/eventingdebugger/", "/eventingtempApps/" and
// "/eventingcredentials/". EventingDebuggerPath and EventingFunctionPath insert
// the "/" themselves. Confirm whether the values here are intended before
// relying on them.
const (
	WaitingForMutation    = "WaitingForMutation" // Debugger has been started and consumers are waiting to trap
	MutationTrapped       = "MutationTrapped"    // One of the consumers has trapped the mutation
	DebuggerTokenKey      = "debugger"
	MetakvDebuggerPath    = MetakvEventingPath + "debugger/"
	MetakvTempAppsPath    = MetakvEventingPath + "tempApps/"
	MetakvCredentialsPath = MetakvEventingPath + "credentials/"
)
View Source
// Values of the unexported checkpointType, numbered from 0 via iota:
// DebuggerCheckpoint is 0 and Checkpoint is 1. GetCheckpointKey takes one of
// these as its cType argument.
const (
	DebuggerCheckpoint checkpointType = iota
	Checkpoint
)
View Source
// Named IpMode values. Each constant is a typed string constant whose value
// is the literal shown.
const (
	Ipv4 IpMode = "ipv4"
	Ipv6 IpMode = "ipv6"
)
View Source
// Binary size multipliers, all of type int. Each successive constant shifts
// 1 left by a further 10 bits.
const (
	KiB int = 1 << (10 * (iota + 1)) // 1 << 10 = 1024
	MiB                              // 1 << 20 = 1048576
	GiB                              // 1 << 30 = 1073741824
)
View Source
// Untyped string constants "version", "newResponse" and "tenantID".
// NOTE(review): presumably query-parameter or header names — confirm against callers.
const (
	QueryVbMapVersion = "version"
	NewResponse       = "newResponse"
	TenantID          = "tenantID"
)
View Source
// Metakv paths. MetakvEventingPath is the root ("/eventing", no trailing
// slash); every other constant appends a suffix that begins with "/".
const (
	MetakvEventingPath             = "/eventing"
	EventingTopologyPath           = MetakvEventingPath + "/rebalanceToken/" // Listen to this and, based on the full path, decide whether the first one or the second one is triggered
	EventingFunctionPath           = MetakvEventingPath + "/tempApps/"
	EventingConfigPath             = MetakvEventingPath + "/settings/config/"
	EventingMetakvConfigKeepNodes  = MetakvEventingPath + "/config/keepNodes"
	EventingTenantDistributionPath = MetakvEventingPath + "/tenantDistribution"
	EventingDebuggerPath           = MetakvEventingPath + "/debugger/"
)
View Source
// Format strings for metakv paths. Each appends one or more format verbs to a
// base path: EventingFunctionPathTemplate takes a %s and a %d, the others take
// a single %s.
const (
	EventingFunctionPathTemplate       = EventingFunctionPath + "%s/%d"
	EventingFunctionCredentialTemplate = MetakvEventingPath + "/credentials/%s"
	EventingConfigPathTemplate         = EventingConfigPath + "%s"
	EventingDebuggerPathTemplate       = EventingDebuggerPath + "%s"
)
View Source
// Format strings for function REST paths; each takes a single %s.
// NOTE(review): presumably the %s is the function name — confirm against callers.
const (
	PauseFunctionTemplate    = "/api/v1/functions/%s/pause"
	UndeployFunctionTemplate = "/api/v1/functions/%s/undeploy"
)
View Source
// System event logging constants: the component name, the producer
// sub-component name, and a default timeout of 2 (seconds, per the name).
// The ALL_CAPS names are exported and therefore kept as they are.
const (
	SYSTEM_EVENT_COMPONENT = "eventing"

	SUB_COMPONENT_EVENTING_PRODUCER = "eventing-producer"

	DEFAULT_TIMEOUT_SECS = 2
)
View Source
// System event ids of type systemeventlog.EventId. The values are consecutive,
// running from 4096 through 4118.
const (
	// Producer startup.
	EVENTID_PRODUCER_STARTUP systemeventlog.EventId = 4096

	// Consumer startup and crash.
	EVENTID_CONSUMER_STARTUP systemeventlog.EventId = 4097
	EVENTID_CONSUMER_CRASH   systemeventlog.EventId = 4098

	// Tracing start/stop.
	EVENTID_START_TRACING systemeventlog.EventId = 4099
	EVENTID_STOP_TRACING  systemeventlog.EventId = 4100

	// Debugger start/stop.
	EVENTID_START_DEBUGGER systemeventlog.EventId = 4101
	EVENTID_STOP_DEBUGGER  systemeventlog.EventId = 4102

	// Function lifecycle operations.
	EVENTID_CREATE_FUNCTION   systemeventlog.EventId = 4103
	EVENTID_DELETE_FUNCTION   systemeventlog.EventId = 4104
	EVENTID_IMPORT_FUNCTIONS  systemeventlog.EventId = 4105
	EVENTID_EXPORT_FUNCTIONS  systemeventlog.EventId = 4106
	EVENTID_BACKUP_FUNCTION   systemeventlog.EventId = 4107
	EVENTID_RESTORE_FUNCTION  systemeventlog.EventId = 4108
	EVENTID_DEPLOY_FUNCTION   systemeventlog.EventId = 4109
	EVENTID_UNDEPLOY_FUNCTION systemeventlog.EventId = 4110
	EVENTID_PAUSE_FUNCTION    systemeventlog.EventId = 4111
	EVENTID_RESUME_FUNCTION   systemeventlog.EventId = 4112

	// Process-level maintenance operations.
	EVENTID_CLEANUP_EVENTING systemeventlog.EventId = 4113
	EVENTID_DIE              systemeventlog.EventId = 4114
	EVENTID_TRIGGER_GC       systemeventlog.EventId = 4115
	EVENTID_FREE_OS_MEMORY   systemeventlog.EventId = 4116

	// Configuration and statistics.
	EVENTID_UPDATE_CONFIG    systemeventlog.EventId = 4117
	EVENTID_CLEAR_STATISTICS systemeventlog.EventId = 4118
)
View Source
// Feature bit flags of type uint32, assigned as 1 << iota. CurlFeature is the
// only flag declared here and has value 1 (bit 0).
// NOTE(review): presumably combined into the featureMatrix passed to SetFeatureMatrix — confirm.
const (
	CurlFeature uint32 = 1 << iota
)
View Source
// HttpCallWaitTime is a fixed duration of two seconds.
const HttpCallWaitTime = 2 * time.Second
View Source
// SystemScopeName is the scope name "_system".
const SystemScopeName = "_system"

Variables

View Source
// Named StorageEngine values. These are variables, not constants.
var (
	Couchstore StorageEngine = "couchstore"
	Magma      StorageEngine = "magma"
)
View Source
// Sentinel errors exported by this package. Compare against them with
// errors.Is rather than by message text.
var (
	// ErrRetryTimeout carries the message "retry timeout".
	ErrRetryTimeout = errors.New("retry timeout")
	// ErrEncryptionLevelChanged reports that the encryption level changed
	// during bootstrap. The message previously misspelled "bootstrap".
	ErrEncryptionLevelChanged = errors.New("Encryption Level changed during bootstrap")
	// ErrHandleEmpty reports that a bucket handle is not initialized.
	ErrHandleEmpty = errors.New("Bucket handle not initialized")
	// ErrOnDeployFail reports that OnDeploy execution failed.
	ErrOnDeployFail = errors.New("OnDeploy execution failed")
)
View Source
// Byte-slice prefixes "_txn:", "_sync:" and "_sync:att". Note that
// SyncGatewayAttachmentPrefix itself begins with SyncGatewayMutationPrefix.
// These are mutable package-level slices; callers should not modify them.
// NOTE(review): presumably matched against document keys — confirm against callers.
var (
	TransactionMutationPrefix   = []byte("_txn:")
	SyncGatewayMutationPrefix   = []byte("_sync:")
	SyncGatewayAttachmentPrefix = []byte("_sync:att")
)
View Source
// BucketNotWatched is a sentinel error with the message "Bucket not being
// watched". Unlike the other sentinel errors in this package its name has no
// Err prefix; it is exported, so the name is kept.
var BucketNotWatched = errors.New("Bucket not being watched")
View Source
var CouchbaseVerMap = map[string]CouchbaseVer{
	"vulcan": CouchbaseVer{
		// contains filtered or unexported fields
	},
	"alice": CouchbaseVer{
		// contains filtered or unexported fields
	},
	"mad-hatter": CouchbaseVer{
		// contains filtered or unexported fields
	},
	"cheshire-cat": CouchbaseVer{
		// contains filtered or unexported fields
	},
	"6.6.2": CouchbaseVer{
		// contains filtered or unexported fields
	},
}
View Source
// DisableCurl holds the string "disable_curl". It is a variable, not a
// constant.
var DisableCurl = "disable_curl"
View Source
// ErrInvalidVersion is a sentinel error with the message "invalid eventing
// version".
var ErrInvalidVersion = errors.New("invalid eventing version")
View Source
// ErrNodeNotAvailable is a sentinel error with the message "node not
// available".
var ErrNodeNotAvailable = errors.New("node not available")
View Source
// LanguageCompatibility lists version strings. The entry at index 0 ("7.2.0")
// is used to fill in a missing default. The remaining entries are not in
// ascending or descending order ("6.6.2", "6.0.0", "6.5.0").
// NOTE(review): confirm whether any caller depends on the order beyond index 0.
var LanguageCompatibility = []string{"7.2.0", "6.6.2", "6.0.0", "6.5.0"}

A missing default is filled by the entry at index 0.

View Source
// MetakvMaxRetries is a mutable package-level int64, initially 60.
// NOTE(review): presumably the retry limit for metakv operations — confirm against callers.
var MetakvMaxRetries int64 = 60

Functions

func CheckAndReturnDefaultForScopeOrCollection

func CheckAndReturnDefaultForScopeOrCollection(key string) string

func CheckKeyspaceExist

func CheckKeyspaceExist(observer notifier.Observer, keyspace application.Keyspace) bool

func DistributeAndWaitWork

func DistributeAndWaitWork[T comparable](parallism int, batchSize int, initialiser func(chan<- T) error, cb func(waitGroup *sync.WaitGroup, workerChan <-chan T))

func GetCheckpointKey

func GetCheckpointKey(app *AppConfig, vb uint16, cType checkpointType) string

func GetCompositeState

func GetCompositeState(dStatus, pStatus bool) int8

func GetDefaultHandlerHeaders

func GetDefaultHandlerHeaders() []string

func GetLocalhost

func GetLocalhost(ipMode IpMode) string

func GetLogfileLocationAndName

func GetLogfileLocationAndName(parentDir string, locationString string) (string, string)

func GetRand16Byte

func GetRand16Byte() (uint32, error)

func HexLittleEndianToUint64

func HexLittleEndianToUint64(hexLE []byte) uint64

func InitialiseSystemEventLogger

func InitialiseSystemEventLogger(baseNsserverURL string)

func LogSystemEvent

func LogSystemEvent(eventId systemeventlog.EventId,
	severity systemeventlog.EventSeverity, extraAttributes interface{})

func RandomID

func RandomID() (string, error)

func RandomIDFromDict

func RandomIDFromDict(dict string) (string, error)

func StopServer

func StopServer(server *http.Server) error

StopServer shuts down the given server.

func Uint32ToHex

func Uint32ToHex(uint32Val uint32) string

Types

type AppConfig

// AppConfig holds the application/event handler configuration.
type AppConfig struct {
	AppCode            string
	ParsedAppCode      string
	AppDeployState     string
	AppName            string
	AppLocation        string
	AppState           string
	AppVersion         string
	FunctionID         uint32
	FunctionInstanceID string
	LastDeploy         string
	// Settings is an untyped key/value map; value types are not fixed here.
	Settings           map[string]interface{}
	UserPrefix         string
	FunctionScope      FunctionScope
}

AppConfig holds the application/event handler configuration.

type AppRebalanceProgress

// AppRebalanceProgress is a JSON-serializable rebalance progress record: three
// uint16 slices ("to_own", "to_close", "currently_owned") and a
// "running_rebalance" flag.
// NOTE(review): the slices presumably hold vbucket numbers — confirm.
type AppRebalanceProgress struct {
	ToOwn               []uint16 `json:"to_own"`
	ToClose             []uint16 `json:"to_close"`
	OwnedVbs            []uint16 `json:"currently_owned"`
	RebalanceInProgress bool     `json:"running_rebalance"`
}

type AppStatus

// AppStatus is the JSON-serializable status record of a single function.
type AppStatus struct {
	CompositeStatus       string                `json:"composite_status"`
	Name                  string                `json:"name"`
	FunctionScope         application.Namespace `json:"function_scope"`
	NumBootstrappingNodes int                   `json:"num_bootstrapping_nodes"`
	NumDeployedNodes      int                   `json:"num_deployed_nodes"`
	DeploymentStatus      bool                  `json:"deployment_status"`
	ProcessingStatus      bool                  `json:"processing_status"`
	RedeployRequired      bool                  `json:"redeploy_required"`
	// AppState is excluded from JSON by the "-" tag.
	AppState              stateMachine.AppState `json:"-"`
}

type AppStatusResponse

// AppStatusResponse is a JSON-serializable list of AppStatus pointers under
// "apps" together with a "num_eventing_nodes" count.
type AppStatusResponse struct {
	Apps             []*AppStatus `json:"apps"`
	NumEventingNodes int          `json:"num_eventing_nodes"`
}

type Application

// Application is the JSON-serializable definition of a function. The JSON keys
// differ from the Go field names in several places (AppHandlers is "appcode",
// FunctionID is "handleruuid", Name is "appname").
type Application struct {
	AppHandlers        string                 `json:"appcode"`
	DeploymentConfig   DepCfg                 `json:"depcfg"`
	EventingVersion    string                 `json:"version"`
	EnforceSchema      bool                   `json:"enforce_schema"`
	FunctionID         uint32                 `json:"handleruuid"`
	FunctionInstanceID string                 `json:"function_instance_id"`
	Name               string                 `json:"appname"`
	Settings           map[string]interface{} `json:"settings"`
	Metainfo           map[string]interface{} `json:"metainfo,omitempty"`
	// Owner has no JSON tag, so encoding/json uses the field name "Owner" as its key.
	Owner              *Owner
	FunctionScope      FunctionScope `json:"function_scope"`
}

type Broadcaster

// Broadcaster sends a request to a path and returns the collected response
// bodies, a *pc.Response and an error. Request takes an onlyThisNode flag;
// RequestFor targets a single node by UUID. CloseBroadcaster takes no
// arguments and returns nothing.
// NOTE(review): presumably CloseBroadcaster releases resources held by the broadcaster — confirm in the implementation.
type Broadcaster interface {
	Request(onlyThisNode bool, path string, request *pc.Request) ([][]byte, *pc.Response, error)
	RequestFor(nodeUUID string, path string, request *pc.Request) ([][]byte, *pc.Response, error)

	CloseBroadcaster()
}

func NewBroadcaster

func NewBroadcaster(observer notifier.Observer) (Broadcaster, error)

type Bucket

// Bucket is a JSON-serializable bucket binding: an alias, a
// bucket/scope/collection name triple and an access string.
// NOTE(review): the set of valid Access values is not visible here.
type Bucket struct {
	Alias          string `json:"alias"`
	BucketName     string `json:"bucket_name"`
	ScopeName      string `json:"scope_name"`
	CollectionName string `json:"collection_name"`
	Access         string `json:"access"`
}

type ChangeType

// ChangeType is a string-based type; the package declares the values
// StartRebalanceCType, StopRebalanceCType and StartFailoverCType.
type ChangeType string

type ClusterSettings

// ClusterSettings groups node-level settings: local credentials and address,
// admin ports, TLS and client certificate file paths, directories, service
// ports, the node UUID, the IP mode and the current version.
// Ports are held as strings.
type ClusterSettings struct {
	LocalUsername string
	LocalPassword string
	LocalAddress  string

	AdminHTTPPort string
	AdminSSLPort  string

	SslCAFile       string
	SslCertFile     string
	SslKeyFile      string
	ClientKeyFile   string
	ClientCertFile  string
	ExecutablePath  string
	EventingDir     string
	KvPort          string
	RestPort        string
	DebugPort       string
	UUID            string
	DiagDir         string
	MaxRunningNodes int
	IpMode          IpMode

	CurrentVersion *notifier.Version
}

func (*ClusterSettings) ProtocolVer

func (cs *ClusterSettings) ProtocolVer() string

func (*ClusterSettings) String

func (cs *ClusterSettings) String() string

type CompileStatus

// CompileStatus is the JSON-serializable result of compiling handler code: a
// success flag plus location (line_number, column_number, index) and
// descriptive fields.
type CompileStatus struct {
	Area           string `json:"area"`
	Column         int    `json:"column_number"`
	CompileSuccess bool   `json:"compile_success"`
	Description    string `json:"description"`
	Index          int    `json:"index"`
	Language       string `json:"language"`
	Line           int    `json:"line_number"`
}

type Config

// Config is an untyped key/value configuration map.
type Config map[string]any

type Constant

// Constant is a JSON-serializable pair of strings, "value" and "literal".
// NOTE(review): which of the two is the name and which is the content is not
// visible here — confirm against callers.
type Constant struct {
	Value   string `json:"value"`
	Literal string `json:"literal"`
}

type CouchbaseVer

type CouchbaseVer struct {
	// contains filtered or unexported fields
}

func FrameCouchbaseVerFromNsServerStreamingRestApi

func FrameCouchbaseVerFromNsServerStreamingRestApi(ver string) (CouchbaseVer, error)

The version string has the form major.minor.mpVersion-build-type.

func FrameCouchbaseVersion

func FrameCouchbaseVersion(ver string) (CouchbaseVer, error)

func FrameCouchbaseVersionShort

func FrameCouchbaseVersionShort(ver string) (CouchbaseVer, error)

For shorthand versions such as x.x.x.

func (CouchbaseVer) Compare

func (e CouchbaseVer) Compare(need CouchbaseVer) bool

Compare reports whether e >= need.

func (CouchbaseVer) Equals

func (e CouchbaseVer) Equals(need CouchbaseVer) bool

Equals reports whether e == need.

func (CouchbaseVer) String

func (e CouchbaseVer) String() string

type Credential

// Credential is a JSON-serializable set of secrets: username, password and
// bearer key. All three are encoded in clear text under their JSON keys.
type Credential struct {
	Username  string `json:"username"`
	Password  string `json:"password"`
	BearerKey string `json:"bearer_key"`
}

type Curl

// Curl is a JSON-serializable curl binding: a hostname, a value, an auth type
// with its credentials (username, password, bearer key), and flags for cookies
// and SSL certificate validation.
// NOTE(review): the set of valid AuthType values is not visible here.
type Curl struct {
	Hostname               string `json:"hostname"`
	Value                  string `json:"value"`
	AuthType               string `json:"auth_type"`
	Username               string `json:"username"`
	Password               string `json:"password"`
	BearerKey              string `json:"bearer_key"`
	AllowCookies           bool   `json:"allow_cookies"`
	ValidateSSLCertificate bool   `json:"validate_ssl_certificate"`
}

type CursorRegistryMgr

// CursorRegistryMgr combines CursorRegistryWriter and CursorRegistryReader and
// adds UpdateLimit, which takes the new limit as a uint8.
type CursorRegistryMgr interface {
	UpdateLimit(newlimit uint8)
	CursorRegistryWriter
	CursorRegistryReader
}

type CursorRegistryReader

// CursorRegistryReader is the read side of the cursor registry. GetCursors
// returns a set of strings for a keyspace along with a bool.
// NOTE(review): the bool presumably reports whether the keyspace was found — confirm.
type CursorRegistryReader interface {
	GetCursors(k KeyspaceName) (map[string]struct{}, bool)
	PrintTree()
}

type CursorRegistryWriter

// CursorRegistryWriter is the write side of the cursor registry: it registers
// and unregisters a function id for a keyspace. Register returns a bool.
// NOTE(review): the bool presumably reports whether registration succeeded — confirm.
type CursorRegistryWriter interface {
	Register(k KeyspaceName, funcId string) bool
	Unregister(k KeyspaceName, funcId string)
}

type DcpStreamBoundary

// DcpStreamBoundary is a string-based type; the package declares the values
// DcpEverything, DcpFromNow and DcpFromPrior.
type DcpStreamBoundary string

func StreamBoundary

func StreamBoundary(boundary string) DcpStreamBoundary

type DebuggerInstance

// DebuggerInstance is the JSON-serializable record of one debugging session.
type DebuggerInstance struct {
	Token           string   `json:"token"`             // An ID for a debugging session
	Host            string   `json:"host"`              // The node where debugger has been spawned
	Status          string   `json:"status"`            // Possible values are WaitingForMutation, MutationTrapped
	URL             string   `json:"url"`               // Chrome-Devtools URL for debugging
	NodesExternalIP []string `json:"nodes_external_ip"` // List of external IP address of the nodes in the cluster
}

type DebuggerOp

// DebuggerOp is a uint8 enumeration of debugger operations.
type DebuggerOp uint8
// Values are numbered from 0 via iota: StartDebuggerOp is 0, GetDebuggerURl is
// 1, WriteDebuggerURL is 2 and StopDebuggerOp is 3.
// GetDebuggerURl ends in a lowercase "l", unlike WriteDebuggerURL; the name is
// exported, so it is kept as is.
const (
	StartDebuggerOp DebuggerOp = iota
	GetDebuggerURl
	WriteDebuggerURL
	StopDebuggerOp
)

func (DebuggerOp) String

func (do DebuggerOp) String() string

type DepCfg

// DepCfg is the JSON-serializable deployment configuration: optional bucket,
// curl and constant bindings (omitted from JSON when empty) plus the source
// and metadata bucket/scope/collection names.
type DepCfg struct {
	Buckets            []Bucket   `json:"buckets,omitempty"`
	Curl               []Curl     `json:"curl,omitempty"`
	Constants          []Constant `json:"constants,omitempty"`
	SourceBucket       string     `json:"source_bucket"`
	SourceScope        string     `json:"source_scope"`
	SourceCollection   string     `json:"source_collection"`
	MetadataBucket     string     `json:"metadata_bucket"`
	MetadataScope      string     `json:"metadata_scope"`
	MetadataCollection string     `json:"metadata_collection"`
}

type EventProcessingStats

// EventProcessingStats is a JSON-serializable sample of two integer rates
// ("dcp_events_processed_psec", "timer_events_processed_psec") with a
// timestamp string.
// NOTE(review): the timestamp format is not visible here.
type EventProcessingStats struct {
	DcpEventsProcessedPSec   int    `json:"dcp_events_processed_psec"`
	TimerEventsProcessedPSec int    `json:"timer_events_processed_psec"`
	Timestamp                string `json:"timestamp"`
}

type EventingConsumer

// EventingConsumer is the interface that exports functions from
// eventing_consumer.
type EventingConsumer interface {
	BootstrapStatus() bool
	CheckIfQueuesAreDrained() error
	ClearEventStats()
	CloseAllRunningDcpFeeds()
	ConsumerName() string
	DcpEventsRemainingToProcess() uint64
	EventingNodeUUIDs() []string
	EventsProcessedPSec() *EventProcessingStats
	GetEventProcessingStats() map[string]uint64
	GetExecutionStats() map[string]interface{}
	GetFailureStats() map[string]interface{}
	GetInsight() *Insight
	GetLcbExceptionsStats() map[string]uint64
	GetMetaStoreStats() map[string]uint64
	HandleV8Worker() error
	HostPortAddr() string
	Index() int
	InternalVbDistributionStats() []uint16
	NodeUUID() string
	NotifyClusterChange()
	NotifyRebalanceStop()
	NotifySettingsChange()
	Pid() int
	// Both RebalanceStatus and GetRebalanceStatus (below) return a bool.
	// NOTE(review): whether they report the same value is not visible here.
	RebalanceStatus() bool
	RebalanceTaskProgress() *RebalanceProgress
	RemoveSupervisorToken() error
	ResetBootstrapDone()
	ResetCounters()
	Serve()
	SetConnHandle(net.Conn)
	SetFeedbackConnHandle(net.Conn)
	SetRebalanceStatus(status bool)
	GetRebalanceStatus() bool
	GetPrevRebalanceInCompleteStatus() bool
	SignalBootstrapFinish()
	SignalConnected()
	SignalFeedbackConnected()
	SignalStopDebugger() error
	SpawnCompilationWorker(appCode, appContent, appName, eventingPort string, handlerHeaders, handlerFooters []string) (*CompileStatus, error)
	Stop(context string)
	String() string
	TimerDebugStats() map[int]map[string]interface{}
	// The parameter order here is (keepNodes, ejectNodes), the reverse of
	// EventingProducer and EventingSuperSup, which take (ejectNodes, keepNodes, ...).
	NotifyPrepareTopologyChange(keepNodes, ejectNodes []string)
	UpdateEncryptionLevel(enforceTLS, encryptOn bool)
	UpdateWorkerQueueMemCap(quota int64)
	VbDcpEventsRemainingToProcess() map[int]int64
	VbEventingNodeAssignMapUpdate(map[uint16]string)
	VbProcessingStats() map[uint16]map[string]interface{}
	VbSeqnoStats() map[int]map[string]interface{}
	WorkerVbMapUpdate(map[string][]uint16)

	SendAssignedVbs()
	PauseConsumer()
	GetAssignedVbs(workerName string) ([]uint16, error)
	NotifyWorker()
	GetOwner() *Owner

	SetFeatureMatrix(featureMatrix uint32)

	GetSuperSup() EventingSuperSup
}

EventingConsumer is the interface that exports functions from eventing_consumer.

type EventingProducer

// EventingProducer is the interface that exports functions from
// eventing_producer.
type EventingProducer interface {
	AddMetadataPrefix(key string) Key
	AppendCurlLatencyStats(deltas StatsData)
	AppendLatencyStats(deltas StatsData)
	BootstrapStatus() bool
	CfgData() string
	CheckpointBlobDump() map[string]interface{}
	CleanupMetadataBucket(skipCheckpointBlobs bool) error
	CleanupUDSs()
	ClearEventStats()
	DcpFeedBoundary() string
	GetAppCode() string
	GetAppLog(sz int64) []string
	GetDcpEventsRemainingToProcess() uint64
	GetDebuggerURL() (string, error)
	GetEventingConsumerPids() map[string]int
	GetEventProcessingStats() map[string]uint64
	GetExecutionStats() map[string]interface{}
	GetFailureStats() map[string]interface{}
	GetLatencyStats() StatsData
	GetCurlLatencyStats() StatsData
	GetInsight() *Insight
	GetLcbExceptionsStats() map[string]uint64
	GetMetaStoreStats() map[string]uint64
	GetMetadataPrefix() string
	GetNsServerPort() string
	GetVbOwner(vb uint16) (string, string, error)
	GetSeqsProcessed() map[int]int64
	GetDebuggerToken() string
	InternalVbDistributionStats() map[string]string
	IsEventingNodeAlive(eventingHostPortAddr, nodeUUID string) bool
	IsPlannerRunning() bool
	IsTrapEvent() bool
	KillAllConsumers()
	KillAndRespawnEventingConsumer(consumer EventingConsumer)
	KvHostPorts() []string
	LenRunningConsumers() int
	MetadataBucket() string
	MetadataScope() string
	MetadataCollection() string
	NotifyInit()
	// The parameter order is (ejectNodes, keepNodes, changeType); the
	// EventingConsumer method of the same name takes (keepNodes, ejectNodes).
	NotifyPrepareTopologyChange(ejectNodes, keepNodes []string, changeType service.TopologyChangeType)
	NotifySettingsChange()
	NotifySupervisor()
	NotifyTopologyChange(msg *TopologyChangeMsg)
	NsServerHostPort() string
	NsServerNodeCount() int
	PauseProducer()
	PlannerStats() []*PlannerNodeVbMapping
	ResumeProducer()
	RebalanceStatus() bool
	RebalanceTaskProgress() *RebalanceProgress
	RemoveConsumerToken(workerName string)
	ResetCounters()
	SignalBootstrapFinish()
	SignalStartDebugger(token string) error
	SignalStopDebugger() error
	SetRetryCount(retryCount int64)
	SpanBlobDump() map[string]interface{}
	Serve()
	SourceBucket() string
	SourceScope() string
	SourceCollection() string
	GetSourceKeyspaceID() (KeyspaceID, bool)
	GetMetadataKeyspaceID() (KeyspaceID, bool)
	GetFunctionInstanceId() string
	GetCursorAware() bool
	SrcMutation() bool
	Stop(context string)
	StopRunningConsumers()
	String() string
	SetTrapEvent(value bool)
	TimerDebugStats() map[int]map[string]interface{}
	UndeployHandler(msg UndeployAction)
	UpdateEncryptionLevel(enforceTLS, encryptOn bool)
	UpdateMemoryQuota(quota int64)
	UsingTimer() bool
	VbDcpEventsRemainingToProcess() map[int]int64
	VbDistributionStatsFromMetadata() map[string]map[string]string
	VbSeqnoStats() map[int][]map[string]interface{}
	WriteAppLog(log string)
	WriteDebuggerURL(url string)
	WriteDebuggerToken(token string, hostnames []string) error
	GetOwner() *Owner
	GetFuncScopeDetails() (string, uint32)
	FunctionManageBucket() string
	FunctionManageScope() string
	SetFeatureMatrix(featureMatrix uint32)
}

EventingProducer is the interface that exports functions from eventing_producer.

type EventingServiceMgr

// EventingServiceMgr is the set of service-manager operations exposed to other
// components: bucket graph updates, failover status, lifecycle checks during
// rebalance, function lookup and settings updates.
type EventingServiceMgr interface {
	UpdateBucketGraphFromMetakv(functionName string) error
	ResetFailoverStatus()
	GetFailoverStatus() (failoverNotifTs int64, changeId string)
	CheckLifeCycleOpsDuringRebalance() bool
	NotifySupervisorWaitCh()
	// TODO: Replace it with getting back the whole application.
	GetFunctionId(id Identity) (uint32, error)
	GetPreparedApp(appLocation string) Application
	GetAdminHTTPPort() string
	SetSettings(appLocation string, data []byte, force bool)
}

type EventingSuperSup

// EventingSuperSup is the super-supervisor interface. Most methods take an
// appName and mirror a per-function operation that EventingProducer also
// declares (stats, debugger, rebalance progress); the rest cover bucket
// handles, security settings, OnDeploy bookkeeping and bucket watching.
type EventingSuperSup interface {
	PausingAppList() map[string]string
	BootstrapAppList() map[string]string
	BootstrapAppStatus(appName string) bool
	BootstrapStatus() bool
	CheckAndSwitchgocbBucket(bucketName, appName string, setting *SecuritySetting) error
	CheckpointBlobDump(appName string) (interface{}, error)
	ClearEventStats() []string
	DcpFeedBoundary(fnName string) (string, error)
	DeployedAppList() []string
	GetEventProcessingStats(appName string) map[string]uint64
	GetAppCode(appName string) string
	GetAppLog(appName string, sz int64) []string
	GetAppCompositeState(appName string) int8
	GetDcpEventsRemainingToProcess(appName string) uint64
	GetDebuggerURL(appName string) (string, error)
	GetDeployedApps() map[string]string
	GetUndeployedApps() []string
	GetEventingConsumerPids(appName string) map[string]int
	GetExecutionStats(appName string) map[string]interface{}
	GetFailureStats(appName string) map[string]interface{}
	GetLatencyStats(appName string) StatsData
	GetCurlLatencyStats(appName string) StatsData
	GetInsight(appName string) *Insight
	GetLcbExceptionsStats(appName string) map[string]uint64
	GetLocallyDeployedApps() map[string]string
	GetMetaStoreStats(appName string) map[string]uint64
	GetBucket(bucketName, appName string) (*couchbase.Bucket, error)
	GetMetadataHandle(bucketName, scopeName, collectionName, appName string) (*gocb.Collection, error)
	GetKeyspaceID(bucketName, scopeName, collectionName string) (keyspaceID KeyspaceID, err error)
	GetCurrentManifestId(bucketName string) (string, error)
	GetRegisteredPool() string
	GetSeqsProcessed(appName string) map[int]int64
	GetNumVbucketsForBucket(bucketName string) int
	InternalVbDistributionStats(appName string) map[string]string
	KillAllConsumers()
	// The parameter order is (ejectNodes, keepNodes, changeType); the
	// EventingConsumer method of the same name takes (keepNodes, ejectNodes).
	NotifyPrepareTopologyChange(ejectNodes, keepNodes []string, changeType service.TopologyChangeType)
	TopologyChangeNotifCallback(kve metakv.KVEntry) error
	PlannerStats(appName string) []*PlannerNodeVbMapping
	RebalanceStatus() bool
	RebalanceTaskProgress(appName string) (*RebalanceProgress, error)
	RemoveProducerToken(appName string)
	RestPort() string
	ResetCounters(appName string) error
	SetSecuritySetting(setting *SecuritySetting) bool
	GetSecuritySetting() *SecuritySetting
	EncryptionChangedDuringLifecycle() bool
	GetGocbSubscribedApps(encryptionEnabled bool) map[string]struct{}
	SignalStopDebugger(appName string) error
	SpanBlobDump(appName string) (interface{}, error)
	StopProducer(appName string, msg UndeployAction)
	TimerDebugStats(appName string) (map[int]map[string]interface{}, error)
	UpdateEncryptionLevel(enforceTLS, encryptOn bool)
	VbDcpEventsRemainingToProcess(appName string) map[int]int64
	VbDistributionStatsFromMetadata(appName string) map[string]map[string]string
	VbSeqnoStats(appName string) (map[int][]map[string]interface{}, error)
	WriteDebuggerURL(appName, url string)
	WriteDebuggerToken(appName, token string, hostnames []string)
	IncWorkerRespawnedCount()
	WorkerRespawnedCount() uint32
	CheckLifeCycleOpsDuringRebalance() bool
	GetBSCSnapshot() (map[string]map[string][]string, error)
	GetSystemMemoryQuota() float64
	// The next four methods take unnamed parameters; their meaning is not
	// visible from the signatures alone.
	ReadOnDeployDoc(string) (string, string, string)
	RemoveOnDeployLeader(string)
	PublishOnDeployStatus(string, string)
	WritePauseTimestamp(string, time.Time)
	RemovePauseTimestampDoc(appName string)
	WriteOnDeployMsgBuffer(appName, msg string)
	GetOnDeployMsgBuffer(appName string) []string
	ClearOnDeployMsgBuffer(appName string)
	GetOnDeployStatus(appName string) OnDeployState
	GetPreviousOnDeployStatus(appName string) OnDeployState
	UpdateFailedOnDeployStatus(appName string)
	CleanupOnDeployTimers(appName string, skipCheckpointBlobs bool) error

	WatchBucket(keyspace Keyspace, appName string, mType MonitorType) error
	UnwatchBucket(keyspace Keyspace, appName string)
}

type FunctionScope

// FunctionScope is a JSON-serializable bucket/scope name pair ("bucket",
// "scope"). It is needed only during the first creation of the function.
type FunctionScope struct {
	BucketName string `json:"bucket"`
	ScopeName  string `json:"scope"`
}

FunctionScope is needed only during the first creation of the function.

func GetFunctionScope

func GetFunctionScope(identity Identity) FunctionScope

func (FunctionScope) String

func (fs FunctionScope) String() string

func (*FunctionScope) ToKeyspace

func (fs *FunctionScope) ToKeyspace() *Keyspace

type GlobalStatsCounter

// GlobalStatsCounter holds two atomic uint64 counters. Because it contains
// atomic.Uint64 fields it must not be copied after first use; share it by
// pointer (NewGlobalStatsCounters returns a *GlobalStatsCounter).
type GlobalStatsCounter struct {
	LcbCredsStats  atomic.Uint64
	GocbCredsStats atomic.Uint64
}

func NewGlobalStatsCounters

func NewGlobalStatsCounters() *GlobalStatsCounter

type HandlerConfig

// HandlerConfig groups the per-handler tuning settings: feature flags,
// intervals and timeouts, queue and cache capacities, worker and routine
// counts, and the source keyspace and stream boundary.
// NOTE(review): units of the interval, timeout and capacity fields are not
// visible here — confirm before setting them.
type HandlerConfig struct {
	N1qlPrepareAll            bool
	LanguageCompatibility     string
	AllowTransactionMutations bool
	AllowSyncDocuments        bool
	CursorAware               bool
	AggDCPFeedMemCap          int64
	CheckpointInterval        int
	IdleCheckpointInterval    int
	CPPWorkerThrCount         int
	ExecuteTimerRoutineCount  int
	ExecutionTimeout          int
	CursorCheckpointTimeout   int
	FeedbackBatchSize         int
	FeedbackQueueCap          int64
	FeedbackReadBufferSize    int
	HandlerHeaders            []string
	HandlerFooters            []string
	LcbInstCapacity           int
	N1qlConsistency           string
	LogLevel                  string
	SocketWriteBatchSize      int
	SourceKeyspace            *Keyspace
	StatsLogInterval          int
	StreamBoundary            DcpStreamBoundary
	TimerContextSize          int64
	TimerQueueMemCap          uint64
	TimerQueueSize            uint64
	UndeployRoutineCount      int
	WorkerCount               int
	WorkerQueueCap            int64
	WorkerQueueMemCap         int64
	WorkerResponseTimeout     int
	LcbRetryCount             int
	LcbTimeout                int
	BucketCacheSize           int64
	BucketCacheAge            int64
	NumTimerPartitions        int
	CurlMaxAllowedRespSize    int
}

type HistogramStats

type HistogramStats struct {
	// contains filtered or unexported fields
}

func NewHistogramStats

func NewHistogramStats() *HistogramStats

func (*HistogramStats) Copy

func (hs *HistogramStats) Copy() *HistogramStats

func (*HistogramStats) Get

func (hs *HistogramStats) Get() map[string]uint64

func (*HistogramStats) MarshalJSON

func (hs *HistogramStats) MarshalJSON() ([]byte, error)

func (*HistogramStats) PercentileN

func (hs *HistogramStats) PercentileN(p int) int

func (*HistogramStats) Reset

func (hs *HistogramStats) Reset()

func (*HistogramStats) Update

func (hs *HistogramStats) Update(delta map[string]uint64)

func (*HistogramStats) UpdateWithHistogram

func (hs *HistogramStats) UpdateWithHistogram(hs1 *HistogramStats)

type Identity

// Identity names a function by application name, bucket and scope.
// GetIdentityFromLocation builds one from a location string and ToLocation
// converts it back.
type Identity struct {
	AppName string
	Bucket  string
	Scope   string
}

func GetIdentityFromLocation

func GetIdentityFromLocation(locationString string) (Identity, error)

func (Identity) String

func (id Identity) String() string

func (Identity) ToLocation

func (id Identity) ToLocation() string

type Insight

// Insight is a JSON-serializable script string together with a map from int
// to InsightLine.
// NOTE(review): the map key is presumably the source line number — confirm.
type Insight struct {
	Script string              `json:"script"`
	Lines  map[int]InsightLine `json:"lines"`
}

func NewInsight

func NewInsight() *Insight

func (*Insight) Accumulate

func (dst *Insight) Accumulate(src *Insight)

type InsightLine

// InsightLine is a JSON-serializable per-line record: call count, call time,
// exception count, last exception text and last log text.
// ExceptionCount and LastException are encoded under the keys "error_count"
// and "error_msg".
type InsightLine struct {
	CallCount      int64   `json:"call_count"`
	CallTime       float64 `json:"call_time"`
	ExceptionCount int64   `json:"error_count"`
	LastException  string  `json:"error_msg"`
	LastLog        string  `json:"last_log"`
}

type Insights

// Insights maps a string key to an *Insight.
// NOTE(review): the key is presumably the function name — confirm.
type Insights map[string]*Insight

func NewInsights

func NewInsights() *Insights

func (*Insights) Accumulate

func (dst *Insights) Accumulate(src *Insights)

type IpMode

// IpMode is a string-based type; the package declares the values Ipv4 and
// Ipv6.
type IpMode string

type Key

type Key struct {
	// contains filtered or unexported fields
}

func NewKey

func NewKey(userPrefix, clusterPrefix, key string) Key

func (Key) GetPrefix

func (k Key) GetPrefix() string

func (Key) Raw

func (k Key) Raw() string

type Keyspace

// Keyspace is a bucket/scope/collection name triple. It has no JSON tags;
// KeyspaceName is the tagged counterpart with the same three names.
type Keyspace struct {
	BucketName     string
	ScopeName      string
	CollectionName string
}

func (Keyspace) Equals

func (k1 Keyspace) Equals(k2 Keyspace) bool

func (Keyspace) IsWildcard

func (k Keyspace) IsWildcard() bool

func (Keyspace) String

func (k Keyspace) String() string

func (Keyspace) ToFunctionScope

func (k Keyspace) ToFunctionScope() *FunctionScope

type KeyspaceConvertor

// KeyspaceConvertor is a generic interface over any T: it marshals itself to
// JSON and returns the original value of type T. *MarshalledData[T] declares
// both methods.
type KeyspaceConvertor[T any] interface {
	MarshalJSON() ([]byte, error)
	GetOriginalKeyspace() T
}

type KeyspaceID

// KeyspaceID identifies a keyspace by ids rather than names: a string Bid,
// uint32 Cid and Sid, and a StreamType.
// NOTE(review): Bid/Sid/Cid are presumably bucket, scope and collection ids — confirm.
type KeyspaceID struct {
	Bid        string
	Cid        uint32
	Sid        uint32
	StreamType StreamType
}

func (KeyspaceID) Equals

func (keyspaceID KeyspaceID) Equals(keyspaceID2 KeyspaceID) bool

type KeyspaceName

// KeyspaceName is a JSON-serializable bucket/scope/collection name triple,
// encoded as "bucket_name", "scope_name" and "collection_name".
type KeyspaceName struct {
	Bucket     string `json:"bucket_name"`
	Scope      string `json:"scope_name"`
	Collection string `json:"collection_name"`
}

type LifecycleMsg

// LifecycleMsg describes a lifecycle request for one function instance: its
// instance id and location, three boolean action flags and a description.
// UndeloyFunction is spelled without the "p"; the field is exported, so the
// name is kept as is.
type LifecycleMsg struct {
	InstanceID      string
	Applocation     application.AppLocation
	DeleteFunction  bool
	PauseFunction   bool
	UndeloyFunction bool
	Description     string
}

func (LifecycleMsg) String

func (u LifecycleMsg) String() string

type MarshalledData

type MarshalledData[T any] struct {
	// contains filtered or unexported fields
}

func NewMarshalledData

func NewMarshalledData[T any](val T) *MarshalledData[T]

func (*MarshalledData[T]) GetOriginalKeyspace

func (m *MarshalledData[T]) GetOriginalKeyspace() T

func (*MarshalledData[T]) MarshalJSON

func (m *MarshalledData[T]) MarshalJSON() ([]byte, error)

type MonitorType

// MonitorType is an int8 enumeration passed to EventingSuperSup.WatchBucket.
type MonitorType int8
// Values are numbered from 0 via iota: SrcWatch is 0, MetaWatch is 1 and
// FunctionScopeWatch is 2.
const (
	SrcWatch MonitorType = iota
	MetaWatch
	FunctionScopeWatch
)

type OnDeployState

// OnDeployState is an int8 enumeration of OnDeploy states.
type OnDeployState int8
// Values start at 1 (iota + 1): PENDING is 1, FINISHED is 2 and FAILED is 3.
// The zero value of OnDeployState therefore matches none of the named states.
const (
	PENDING OnDeployState = iota + 1
	FINISHED
	FAILED
)

func (OnDeployState) String

func (state OnDeployState) String() string

type Owner

// Owner identifies an owner by UUID, user name and domain. It carries no JSON
// tags, so encoding/json uses the Go field names as keys.
type Owner struct {
	UUID   string
	User   string
	Domain string
}

func (*Owner) String

func (o *Owner) String() string

type OwnershipRoutine

// OwnershipRoutine exposes GetVbMap, which takes a keyspace and a vbucket
// count and returns a string, a uint16 slice and an error.
// NOTE(review): the meaning of the returned string is not visible here — confirm in the implementation.
type OwnershipRoutine interface {
	GetVbMap(namespace *application.KeyspaceInfo, numVb uint16) (string, []uint16, error)
}

type PlannerNodeVbMapping

// PlannerNodeVbMapping captures the vbucket distribution across all eventing
// nodes as per the planner: a host name, a starting vbucket and a vbucket
// count.
type PlannerNodeVbMapping struct {
	Hostname string `json:"host_name"`
	StartVb  int    `json:"start_vb"`
	VbsCount int    `json:"vb_count"`
}

PlannerNodeVbMapping captures the vbucket distribution across all eventing nodes as per planner

type ProcessConfig

// ProcessConfig groups process-level settings: a breakpad flag, ports and
// directories (all strings), the IPC type and two socket identifiers.
// NOTE(review): the set of valid IPCType values is not visible here.
type ProcessConfig struct {
	BreakpadOn             bool
	DebuggerPort           string
	DiagDir                string
	EventingDir            string
	EventingPort           string
	EventingSSLPort        string
	FeedbackSockIdentifier string
	IPCType                string
	SockIdentifier         string
}

type RebalanceConfig

// RebalanceConfig holds two integer routine counts, one for giving up vbucket
// ownership and one for taking it over.
type RebalanceConfig struct {
	VBOwnershipGiveUpRoutineCount   int
	VBOwnershipTakeoverRoutineCount int
}

type RebalanceProgress

// RebalanceProgress holds four integer counters and an untyped NodeLevelStats
// value. It carries no JSON tags.
// NOTE(review): the concrete type stored in NodeLevelStats is not visible here.
type RebalanceProgress struct {
	CloseStreamVbsLen     int
	StreamReqVbsLen       int
	VbsRemainingToShuffle int
	VbsOwnedPerPlan       int
	NodeLevelStats        interface{}
}

type SecuritySetting

// SecuritySetting groups TLS-related settings: two boolean flags, the CA,
// certificate and key file paths, and a root CA pool.
type SecuritySetting struct {
	EncryptData        bool
	DisableNonSSLPorts bool
	CAFile             string
	CertFile           string
	KeyFile            string
	RootCAs            *x509.CertPool
}

type Signal

type Signal struct {
	// contains filtered or unexported fields
}

func NewSignal

func NewSignal() *Signal

func (*Signal) Close

func (n *Signal) Close()

func (*Signal) Notify

func (n *Signal) Notify()

func (*Signal) Pause

func (n *Signal) Pause()

func (*Signal) PauseWait

func (n *Signal) PauseWait() <-chan struct{}

func (*Signal) Ready

func (n *Signal) Ready()

func (*Signal) Resume

func (n *Signal) Resume()

func (*Signal) Wait

func (n *Signal) Wait() <-chan struct{}

func (*Signal) WaitResume

func (n *Signal) WaitResume()

type Stats

// Stats is the statistics record of a single function. The first group of
// fields is JSON-serializable; the second group is excluded from JSON by the
// "-" tag.
type Stats struct {
	ExecutionStats          map[string]interface{} `json:"execution_stats"`
	FailureStats            map[string]interface{} `json:"failure_stats"`
	EventProcessingStats    map[string]uint64      `json:"event_processing_stats"`
	LCBExceptionStats       map[string]uint64      `json:"lcb_exception_stats"`
	FunctionScope           application.Namespace  `json:"function_scope"`
	FunctionName            string                 `json:"function_name"`
	FunctionID              uint32                 `json:"function_id"`
	LatencyPercentileStats  map[string]int         `json:"latency_percentile_stats"`
	CurlLatency             *HistogramStats        `json:"curl_latency_percentile_stats"`
	DcpFeedBoundary         string                 `json:"dcp_feed_boundary"`
	EventRemaining          map[string]uint64      `json:"events_remaining"`
	// The two counters below are omitted from JSON when zero (omitempty).
	LcbCredsRequestCounter  uint64                 `json:"lcb_creds_request_counter,omitempty"`
	GoCbCredsRequestCounter uint64                 `json:"gocb_creds_request_counter,omitempty"`

	// Never encoded to JSON.
	Insight          *Insight          `json:"-"`
	LatencyHistogram *HistogramStats   `json:"-"`
	ProcessedSeq     map[uint16]uint64 `json:"-"`
}

func NewStats

func NewStats(statsInit bool, functionScope application.Namespace, appName string) *Stats

func (*Stats) Add

func (s2 *Stats) Add(s1 *Stats)

s2 = s2 + s1

func (*Stats) Copy

func (s *Stats) Copy(allStats bool) *Stats

func (*Stats) String

func (s *Stats) String() string

func (*Stats) Sub

func (s2 *Stats) Sub(s1 *Stats, copyNonSubtracted bool) *Stats

Sub returns s2 - s1.

type StatsData

// StatsData maps a string key to a uint64 value. It is the type of the
// latency deltas passed to and returned from the producer and
// super-supervisor latency methods.
type StatsData map[string]uint64

type StorageEngine

// StorageEngine is a string-based type; the package declares the values
// Couchstore and Magma.
type StorageEngine string

type StreamType

// StreamType is a uint8 enumeration used by KeyspaceID.
type StreamType uint8
// Values are numbered from 0 via iota: STREAM_BUCKET is 0, STREAM_SCOPE is 1,
// STREAM_COLLECTION is 2 and STREAM_UNKNOWN is 3. The zero value of StreamType
// is therefore STREAM_BUCKET, not STREAM_UNKNOWN.
const (
	STREAM_BUCKET StreamType = iota
	STREAM_SCOPE
	STREAM_COLLECTION
	STREAM_UNKNOWN
)

type TopologyChangeMsg

// TopologyChangeMsg pairs a ChangeType with a string naming the message
// source. It is the argument of EventingProducer.NotifyTopologyChange.
type TopologyChangeMsg struct {
	CType     ChangeType
	MsgSource string
}

type UndeployAction

// UndeployAction carries three boolean options and a reason string for an
// undeploy request. DefaultUndeployAction returns a pre-filled value.
// NOTE(review): the defaults chosen by DefaultUndeployAction are not visible here.
type UndeployAction struct {
	UpdateMetakv        bool
	SkipMetadataCleanup bool
	DeleteFunction      bool

	Reason string
}

func DefaultUndeployAction

func DefaultUndeployAction() UndeployAction

func (UndeployAction) String

func (msg UndeployAction) String() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL