dcpConn

package
v0.0.0-...-31abfad Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2024 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ActiveOnly          = uint32(0x10)
	TillLatest          = uint32(0x04)
	IgnorePurgeRollback = uint32(0x80)
)
View Source
const (
	DCP_NO_RES        = resCommandCode(0x00)
	DCP_SEQ_NUM       = resCommandCode(0x48)
	DCP_OPEN_CONN     = resCommandCode(0x50) // Open a DCP connection with a name
	DCP_SELECT_BUCKET = resCommandCode(0x89) // Select bucket
	DCP_ADDSTREAM     = resCommandCode(0x51) // DCP stream addition
	DCP_BUFFERACK     = resCommandCode(0x5d) // DCP Buffer Acknowledgement
	DCP_CONTROL       = resCommandCode(0x5e) // Set flow control params
	DCP_NOOP          = resCommandCode(0x5c) // dcp noop
	DCP_FLUSH         = resCommandCode(0x5a)

	DCP_SNAPSHOT_MARKER = resCommandCode(0x56) //snapshot marker

	DCP_DELETION   = resCommandCode(0x58)
	DCP_EXPIRATION = resCommandCode(0x59)
	DCP_MUTATION   = resCommandCode(0x57)

	DCP_FAILOVER   = resCommandCode(0x54)
	DCP_SEQ_NUMBER = resCommandCode(0x48)

	DCP_STREAMREQ   = resCommandCode(0x53)
	DCP_STREAM_END  = resCommandCode(0x55)
	DCP_CLOSESTREAM = resCommandCode(0x52)

	DCP_SYSTEM_EVENT = resCommandCode(0x5F)
	DCP_ADV_SEQNUM   = resCommandCode(0x64)

	DCP_HELO = resCommandCode(0x1F)

	DCP_SASL_AUTH  = resCommandCode(0x21)
	SASL_AUTH_LIST = resCommandCode(0x20)
)
View Source
const (
	COLLECTION_CREATE  = collectionEvent(0x00) // Collection has been created
	COLLECTION_DROP    = collectionEvent(0x01) // Collection has been dropped
	COLLECTION_FLUSH   = collectionEvent(0x02) // Collection has been flushed
	SCOPE_CREATE       = collectionEvent(0x03) // Scope has been created
	SCOPE_DROP         = collectionEvent(0x04) // Scope has been dropped
	COLLECTION_CHANGED = collectionEvent(0x05) // Collection has changed

	OSO_SNAPSHOT_START = collectionEvent(0x06) // OSO snapshot start
	OSO_SNAPSHOT_END   = collectionEvent(0x07) // OSO snapshot end

	EVENT_UNKNOWN = collectionEvent(0xFF)
)
View Source
const (
	// Can add more status later
	SUCCESS         = status(0x00)
	KEY_ENOENT      = status(0x01)
	FORCED_CLOSED   = status(0x01) // for stream end status
	STATE_CHANGED   = status(0x02)
	KEY_EEXISTS     = status(0x02)
	E2BIG           = status(0x03)
	EINVAL          = status(0x04)
	NOT_STORED      = status(0x05)
	DELTA_BADVAL    = status(0x06)
	FILTER_DELETED  = status(0x07) // Filter is deleted
	NOT_MY_VBUCKET  = status(0x07)
	LOST_PRIVILAGE  = status(0x08)
	ERANGE          = status(0x22)
	ROLLBACK        = status(0x23)
	UNKNOWN_COMMAND = status(0x81)
	ENOMEM          = status(0x82)
	TMPFAIL         = status(0x86)
	UNKNOWN         = status(0xFF)
)
View Source
const (
	FEATURE_COLLECTIONS = heloCommand(0x12)
	FEATURE_XERROR      = heloCommand(0x07)
)
View Source
const (
	RAW    = 0x00
	JSON   = 0x01
	SNAPPY = 0x02
	XATTR  = 0x04
)

Variables

View Source
var (
	ErrConnClosed          = errors.New("connection closed. Retry request")
	ErrFetchingSeqNum      = errors.New("error fetching seq number")
	ErrFetchingFailoverLog = errors.New("error fetching FailoverLog")
	ErrTimeout             = errors.New("request timeout")
)
View Source
var (
	ErrInvalidRequest  = errors.New("invalid Request")
	ErrAlreadyInflight = errors.New("stream request is already in flight")
)
View Source
var (
	ErrDecodingComponentID = errors.New("error decoding keyspaceComponentId")
)

Functions

func GetHexToUint32

func GetHexToUint32(keyspaceComponentHexId string) (uint32, error)

func GetVbUUID

func GetVbUUID(seqNo uint64, failoverLog FailoverLog) (uint64, int)

func LEB128Dec

func LEB128Dec(data []byte) ([]byte, uint32)

Decodes the encoded value according to LEB128 uint32 scheme Returns the decoded key as byte stream, collectionID as uint32 value

Types

type Config

type Config struct {
	Mode       Mode
	ClientName string
	BucketName string
	KvAddress  string

	DcpConfig map[ConfigKey]interface{}
}

func (Config) String

func (c Config) String() string

type ConfigKey

type ConfigKey int
const (
	// KeyOnly specifies whether to open connection with key only
	KeyOnly ConfigKey = iota
	IncludeXattr
)

type DcpConsumer

type DcpConsumer interface {
	Wait() error

	StartStreamReq(sr *StreamReq) error
	PauseStreamReq(sr *StreamReq)
	StopStreamReq(sr *StreamReq) *StreamReq

	GetSeqNumber(collectionID string) map[uint16]uint64
	GetFailoverLog(vbs []uint16) (map[uint16]FailoverLog, error)

	TlsSettingsChange(config *notifier.TlsConfig)

	CloseDcpConsumer() []*DcpEvent
}

func GetDcpConsumer

func GetDcpConsumer(config Config, tlsConfig *notifier.TlsConfig, sendChannel chan<- *DcpEvent, dcpEventPool *sync.Pool) DcpConsumer

func GetDcpConsumerWithContext

func GetDcpConsumerWithContext(ctx context.Context, config Config, tlsConfig *notifier.TlsConfig, sendChannel chan<- *DcpEvent, dcpEventPool *sync.Pool) DcpConsumer

type DcpEvent

type DcpEvent struct {
	Opcode   resCommandCode
	Datatype dcpDatatype
	Vbno     uint16
	ID       uint16
	Version  uint32

	Vbuuid       uint64
	Key, Value   []byte
	Cas          uint64
	ScopeID      uint32
	CollectionID uint32
	Seqno        uint64
	Status       status
	Type         valueType
	Expiry       uint32

	EventType   collectionEvent
	ManifestUID string
	FailoverLog FailoverLog

	Keyspace    *common.MarshalledData[application.Keyspace]
	SystemXattr map[string]xattrVal
	UserXattr   map[string]xattrVal
	SrRequest   *StreamReq
	// contains filtered or unexported fields
}

func (*DcpEvent) Reset

func (event *DcpEvent) Reset()

type FailoverLog

type FailoverLog [][2]uint64

func (FailoverLog) Pop

func (failoverLog FailoverLog) Pop(seq uint64) (FailoverLog, uint64, uint64)

type Mode

type Mode uint8
const (
	InfoMode Mode = iota
	StreamRequestMode
	MixedMode
)

type RequestType

type RequestType int8
const (
	Request_Collections RequestType = iota
	Request_Scope
	Request_Bucket
)

type StreamReq

type StreamReq struct {
	ID      uint16
	Version uint32

	// Status gives whats the status of the current request
	Status status

	RequestType   RequestType
	CollectionIDs []string //array of collction ids
	ScopeID       string   // scope id
	ManifestUID   string   //manifest id

	Vbno        uint16
	Flags       uint32
	StartSeq    uint64
	EndSeq      uint64
	Vbuuid      uint64
	FailoverLog FailoverLog
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL