internal

package
v0.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 23, 2024 License: Apache-2.0 Imports: 39 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MaxMessageSize limit message size for transfer
	MaxMessageSize = 5 * 1024 * 1024
	// MessageFramePadding is for metadata and other frame headers
	MessageFramePadding = 10 * 1024
	// MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
	MaxFrameSize = MaxMessageSize + MessageFramePadding
)
View Source
const (
	Persistent    GetTopicsOfNamespaceMode = "PERSISTENT"
	NonPersistent                          = "NON_PERSISTENT"
	All                                    = "ALL"
)
View Source
const (
	BinaryService = "pulsar"
	HTTPService   = "http"
	HTTPSService  = "https"
	SSLService    = "ssl"
	BinaryPort    = 6650
	BinaryTLSPort = 6651
	HTTPPort      = 80
	HTTPSPort     = 443
)
View Source
const HTTPAdminServiceV1Format string = "/admin/%s/partitions"
View Source
const HTTPAdminServiceV2Format string = "/admin/v2/%s/partitions"
View Source
const HTTPLookupServiceBasePathV1 string = "/lookup/v2/destination/"
View Source
const HTTPLookupServiceBasePathV2 string = "/lookup/v2/topic/"
View Source
const HTTPTopicUnderNamespaceV1 string = "/admin/namespaces/%s/destinations?mode=%s"
View Source
const HTTPTopicUnderNamespaceV2 string = "/admin/v2/namespaces/%s/topics?mode=%s"
View Source
const (
	PulsarProtocolVersion = int32(pb.ProtocolVersion_v20)
)

Variables

View Source
var (
	Version             string
	ClientVersionString string
)
View Source
var ErrConnectionClosed = errors.New("connection closed")
View Source
var ErrCorruptedMessage = errors.New("corrupted message")

ErrCorruptedMessage is the error returned by ReadMessageData when it has detected corrupted data. The data is considered corrupted if it's missing a header, a checksum mismatch or there was an error when unmarshalling the message metadata.

View Source
var ErrEOM = errors.New("EOF")

ErrEOM is the error returned by ReadMessage when no more input is available.

View Source
var ErrExceedMaxMessageSize = errors.New("encryptedPayload exceeds MaxMessageSize")
View Source
var (
	// ErrRequestTimeOut happens when request not finished in given requestTimeout.
	ErrRequestTimeOut = errors.New("request timed out")
)

Functions

func ConvertFromStringMap

func ConvertFromStringMap(m map[string]string) []*pb.KeyValue

ConvertFromStringMap convert a string map to a KeyValue []byte

func ConvertToStringMap

func ConvertToStringMap(pbb []*pb.KeyValue) map[string]string

ConvertToStringMap convert a KeyValue []byte to string map

func Crc32cCheckSum

func Crc32cCheckSum(data []byte) uint32

Crc32cCheckSum handles computing the checksum.

func GetAndAdd

func GetAndAdd(n *uint64, diff uint64) uint64

GetAndAdd perform atomic read and update

func GetCompressionProvider added in v0.10.0

func GetCompressionProvider(
	compressionType pb.CompressionType,
	level compression.Level,
) compression.Provider

func GetConnectionsCount added in v0.10.0

func GetConnectionsCount(p *ConnectionPool) int

func GetTopicRestPath added in v0.5.0

func GetTopicRestPath(tn *TopicName) string

func IsV2Namespace added in v0.6.0

func IsV2Namespace(namespace string) bool

func IsV2TopicName added in v0.5.0

func IsV2TopicName(tn *TopicName) bool

func JavaStringHash

func JavaStringHash(s string) uint32

JavaStringHash and Java String.hashCode() equivalent

func MarshalToSizedBuffer added in v0.10.0

func MarshalToSizedBuffer(m proto.Message, out []byte) error

func Murmur3_32Hash

func Murmur3_32Hash(s string) uint32

Murmur3_32Hash use Murmur3 hashing function

func ParseRelativeTimeInSeconds

func ParseRelativeTimeInSeconds(relativeTime string) (time.Duration, error)

func SingleSend added in v0.10.0

func SingleSend(wb Buffer,
	producerID, sequenceID uint64,
	msgMetadata *pb.MessageMetadata,
	compressedPayload Buffer,
	encryptor crypto.Encryptor,
	maxMassageSize uint32,
	useTxn bool,
	mostSigBits uint64,
	leastSigBits uint64) error

func StartCleanConnectionsTask added in v0.10.0

func StartCleanConnectionsTask(p *ConnectionPool, connectionMaxIdleTime time.Duration)

func TimestampMillis

func TimestampMillis(t time.Time) uint64

TimestampMillis return a time unix nano.

func TopicNameWithoutPartitionPart

func TopicNameWithoutPartitionPart(tn *TopicName) string

Types

type BatchBuilder

type BatchBuilder interface {
	// IsFull check if the size in the current batch exceeds the maximum size allowed by the batch
	IsFull() bool

	// Add will add single message to batch.
	Add(
		metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64,
		payload []byte,
		callback interface{}, replicateTo []string, deliverAt time.Time,
		schemaVersion []byte, multiSchemaEnabled bool,
		useTxn bool,
		mostSigBits uint64,
		leastSigBits uint64,
	) bool

	// Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
	Flush() *FlushBatch

	// FlushBatches all the messages buffered in multiple batches and wait until all
	// messages have been successfully persisted.
	FlushBatches() []*FlushBatch

	// Return the batch container batch message in multiple batches.
	IsMultiBatches() bool

	Close() error
	// contains filtered or unexported methods
}

BatchBuilder is a interface of batch builders

func NewBatchBuilder

func NewBatchBuilder(
	maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64,
	compressionType pb.CompressionType, level compression.Level,
	bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
) (BatchBuilder, error)

NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.

func NewKeyBasedBatchBuilder added in v0.4.0

func NewKeyBasedBatchBuilder(
	maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64,
	compressionType pb.CompressionType, level compression.Level,
	bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
) (BatchBuilder, error)

NewKeyBasedBatchBuilder init batch builder and return BatchBuilder pointer. Build a new key based batch message container.

type BatcherBuilderProvider added in v0.4.0

type BatcherBuilderProvider func(
	maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64,
	compressionType pb.CompressionType, level compression.Level,
	bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
) (BatchBuilder, error)

BatcherBuilderProvider defines func which returns the BatchBuilder.

type BlockingQueue

type BlockingQueue interface {
	// Put enqueue one item, block if the queue is full
	Put(item interface{})

	// Take dequeue one item, block until it's available
	Take() interface{}

	// Poll dequeue one item, return nil if queue is empty
	Poll() interface{}

	// CompareAndPoll compare the first item and poll it if meet the conditions
	CompareAndPoll(compare func(item interface{}) bool) interface{}

	// Peek return the first item without dequeing, return nil if queue is empty
	Peek() interface{}

	// PeekLast return last item in queue without dequeing, return nil if queue is empty
	PeekLast() interface{}

	// Size return the current size of the queue
	Size() int

	// ReadableSlice returns a new view of the readable items in the queue
	ReadableSlice() []interface{}
}

BlockingQueue is a interface of block queue

func NewBlockingQueue

func NewBlockingQueue(maxSize int) BlockingQueue

NewBlockingQueue init block queue and returns a BlockingQueue

type Buffer

type Buffer interface {
	ReadableBytes() uint32

	WritableBytes() uint32

	// Capacity returns the capacity of the buffer's underlying byte slice,
	// that is, the total space allocated for the buffer's data.
	Capacity() uint32

	IsWritable() bool

	Read(size uint32) []byte

	Skip(size uint32)

	Get(readerIndex uint32, size uint32) []byte

	ReadableSlice() []byte

	WritableSlice() []byte

	// WrittenBytes advance the writer index when data was written in a slice
	WrittenBytes(size uint32)

	// MoveToFront copy the available portion of data at the beginning of the buffer
	MoveToFront()

	ReadUint16() uint16
	ReadUint32() uint32

	WriteUint16(n uint16)
	WriteUint32(n uint32)

	WriterIndex() uint32
	ReaderIndex() uint32

	Write(s []byte)

	Put(writerIdx uint32, s []byte)
	PutUint32(n uint32, writerIdx uint32)

	Resize(newSize uint32)
	ResizeIfNeeded(spaceNeeded uint32)

	// Clear will clear the current buffer data.
	Clear()
}

Buffer is a variable-sized buffer of bytes with Read and Write methods. The zero value for Buffer is an empty buffer ready to use.

func NewBuffer

func NewBuffer(size int) Buffer

NewBuffer creates and initializes a new Buffer using buf as its initial contents.

func NewBufferWrapper

func NewBufferWrapper(buf []byte) Buffer

type BuffersPool added in v0.2.0

type BuffersPool interface {
	GetBuffer() Buffer
}

type CheckSum

type CheckSum struct {
	// contains filtered or unexported fields
}

func (*CheckSum) Write

func (cs *CheckSum) Write(p []byte) (int, error)

type ClientHandlers

type ClientHandlers struct {
	// contains filtered or unexported fields
}

ClientHandlerMap is a simple concurrent-safe map for the client type

func NewClientHandlers

func NewClientHandlers() ClientHandlers

func (*ClientHandlers) Add

func (h *ClientHandlers) Add(c Closable)

func (*ClientHandlers) Close

func (h *ClientHandlers) Close()

func (*ClientHandlers) Del

func (h *ClientHandlers) Del(c Closable)

func (*ClientHandlers) Val

func (h *ClientHandlers) Val(c Closable) bool

type Closable

type Closable interface {
	Close()
}

type Connection

type Connection interface {
	SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
	SendRequestNoWait(req *pb.BaseCommand) error
	WriteData(data Buffer)
	RegisterListener(id uint64, listener ConnectionListener) error
	UnregisterListener(id uint64)
	AddConsumeHandler(id uint64, handler ConsumerHandler) error
	DeleteConsumeHandler(id uint64)
	ID() string
	GetMaxMessageSize() int32
	Close()
	WaitForClose() <-chan struct{}
	IsProxied() bool
}

Connection is a interface of client cnx.

type ConnectionListener

type ConnectionListener interface {
	// ReceivedSendReceipt receive and process the return value of the send command.
	ReceivedSendReceipt(response *pb.CommandSendReceipt)

	// ConnectionClosed close the TCP connection.
	ConnectionClosed(closeProducer *pb.CommandCloseProducer)

	// SetRedirectedClusterURI set the redirected cluster URI for lookups
	SetRedirectedClusterURI(redirectedClusterURI string)
}

ConnectionListener is a user of a connection (eg. a producer or a consumer) that can register itself to get notified when the connection is closed.

type ConnectionPool

type ConnectionPool interface {
	// GetConnection get a connection from ConnectionPool.
	GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error)

	// GetConnections get all connections in the pool.
	GetConnections() map[string]Connection

	// Close all the connections in the pool
	Close()
}

ConnectionPool is a interface of connection pool.

func NewConnectionPool

func NewConnectionPool(
	tlsOptions *TLSOptions,
	auth auth.Provider,
	connectionTimeout time.Duration,
	keepAliveInterval time.Duration,
	maxConnectionsPerHost int,
	logger log.Logger,
	metrics *Metrics,
	connectionMaxIdleTime time.Duration) ConnectionPool

NewConnectionPool init connection pool.

type ConsumerHandler

type ConsumerHandler interface {
	MessageReceived(response *pb.CommandMessage, headersAndPayload Buffer) error

	ActiveConsumerChanged(isActive bool)

	// ConnectionClosed close the TCP connection.
	ConnectionClosed(closeConsumer *pb.CommandCloseConsumer)

	// SetRedirectedClusterURI set the redirected cluster URI for lookups
	SetRedirectedClusterURI(redirectedClusterURI string)
}

type FlushBatch added in v0.13.1

type FlushBatch struct {
	BatchData  Buffer
	SequenceID uint64
	Callbacks  []interface{}
	Error      error
}

type GetTopicsOfNamespaceMode added in v0.6.0

type GetTopicsOfNamespaceMode string

GetTopicsOfNamespaceMode for CommandGetTopicsOfNamespace_Mode

type HTTPClient added in v0.5.0

type HTTPClient interface {
	Get(endpoint string, obj interface{}, params map[string]string) error
	Closable
}

func NewHTTPClient added in v0.5.0

func NewHTTPClient(serviceURL *url.URL, serviceNameResolver ServiceNameResolver, tlsConfig *TLSOptions,
	requestTimeout time.Duration, logger log.Logger, metrics *Metrics,
	authProvider auth.Provider) (HTTPClient, error)

type LeveledMetrics added in v0.7.0

type LeveledMetrics struct {
	MessagesPublished        prometheus.Counter
	BytesPublished           prometheus.Counter
	MessagesPending          prometheus.Gauge
	BytesPending             prometheus.Gauge
	PublishErrorsTimeout     prometheus.Counter
	PublishErrorsMsgTooLarge prometheus.Counter
	PublishLatency           prometheus.Observer
	PublishRPCLatency        prometheus.Observer

	MessagesReceived   prometheus.Counter
	BytesReceived      prometheus.Counter
	PrefetchedMessages prometheus.Gauge
	PrefetchedBytes    prometheus.Gauge
	AcksCounter        prometheus.Counter
	NacksCounter       prometheus.Counter
	DlqCounter         prometheus.Counter
	ProcessingTime     prometheus.Observer

	ProducersOpened            prometheus.Counter
	ProducersClosed            prometheus.Counter
	ProducersReconnectFailure  prometheus.Counter
	ProducersReconnectMaxRetry prometheus.Counter
	ProducersPartitions        prometheus.Gauge
	ConsumersOpened            prometheus.Counter
	ConsumersClosed            prometheus.Counter
	ConsumersReconnectFailure  prometheus.Counter
	ConsumersReconnectMaxRetry prometheus.Counter
	ConsumersPartitions        prometheus.Gauge
	ReadersOpened              prometheus.Counter
	ReadersClosed              prometheus.Counter
}

type LookupResult

type LookupResult struct {
	LogicalAddr  *url.URL
	PhysicalAddr *url.URL
}

LookupResult encapsulates a struct for lookup a request, containing two parts: LogicalAddr, PhysicalAddr.

type LookupService

type LookupService interface {
	// Lookup perform a lookup for the given topic, confirm the location of the broker
	// where the topic is located, and return the LookupResult.
	Lookup(topic string) (*LookupResult, error)

	// GetPartitionedTopicMetadata perform a CommandPartitionedTopicMetadata request for
	// the given topic, returns the CommandPartitionedTopicMetadataResponse as the result.
	GetPartitionedTopicMetadata(topic string) (*PartitionedTopicMetadata, error)

	// GetTopicsOfNamespace returns all the topics name for a given namespace.
	GetTopicsOfNamespace(namespace string, mode GetTopicsOfNamespaceMode) ([]string, error)

	// GetSchema returns schema for a given version.
	GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error)

	GetBrokerAddress(brokerServiceURL string, proxyThroughServiceURL bool) (*LookupResult, error)

	ServiceNameResolver() *ServiceNameResolver

	// Closable Allow Lookup Service's internal client to be able to closed
	Closable
}

LookupService is a interface of lookup service.

func NewHTTPLookupService added in v0.5.0

func NewHTTPLookupService(httpClient HTTPClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver,
	tlsEnabled bool, logger log.Logger, metrics *Metrics) LookupService

NewHTTPLookupService init a http based lookup service struct and return an object of LookupService.

func NewLookupService

func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver,
	tlsEnabled bool, listenerName string, logger log.Logger, metrics *Metrics) LookupService

NewLookupService init a lookup service struct and return an object of LookupService.

type MemoryLimitController added in v0.10.0

type MemoryLimitController interface {
	ReserveMemory(ctx context.Context, size int64) bool
	TryReserveMemory(size int64) bool
	ForceReserveMemory(size int64)
	ReleaseMemory(size int64)
	CurrentUsage() int64
	CurrentUsagePercent() float64
	IsMemoryLimited() bool
	RegisterTrigger(trigger func())
}

func NewMemoryLimitController added in v0.10.0

func NewMemoryLimitController(limit int64, threshold float64) MemoryLimitController

NewMemoryLimitController threshold valid range is (0, 1.0)

type MessageReader

type MessageReader struct {
	// contains filtered or unexported fields
}

MessageReader provides helper methods to parse the metadata and messages from the binary format Wire format for a messages

Old format (single message) [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]

Batch format [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [METADATA_SIZE][METADATA][PAYLOAD] [METADATA_SIZE][METADATA][PAYLOAD]

func NewMessageReader

func NewMessageReader(headersAndPayload Buffer) *MessageReader

func NewMessageReaderFromArray

func NewMessageReaderFromArray(headersAndPayload []byte) *MessageReader

func (*MessageReader) ReadBrokerMetadata added in v0.9.0

func (r *MessageReader) ReadBrokerMetadata() (*pb.BrokerEntryMetadata, error)

func (*MessageReader) ReadMessage

func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error)

func (*MessageReader) ReadMessageMetadata

func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error)

func (*MessageReader) ResetBuffer

func (r *MessageReader) ResetBuffer(buffer Buffer)

type Metrics added in v0.4.0

type Metrics struct {

	// Metrics that are not labeled with specificity are immediately available
	ConnectionsOpened                     prometheus.Counter
	ConnectionsClosed                     prometheus.Counter
	ConnectionsEstablishmentErrors        prometheus.Counter
	ConnectionsHandshakeErrors            prometheus.Counter
	LookupRequestsCount                   prometheus.Counter
	PartitionedTopicMetadataRequestsCount prometheus.Counter
	RPCRequestCount                       prometheus.Counter
	// contains filtered or unexported fields
}

func NewMetricsProvider added in v0.4.0

func NewMetricsProvider(metricsCardinality int, userDefinedLabels map[string]string,
	registerer prometheus.Registerer) *Metrics

NewMetricsProvider returns metrics registered to registerer.

func (*Metrics) GetLeveledMetrics added in v0.7.0

func (mp *Metrics) GetLeveledMetrics(t string) *LeveledMetrics

type PartitionedTopicMetadata added in v0.5.0

type PartitionedTopicMetadata struct {
	Partitions int `json:"partitions"` // Number of partitions for the topic
}

PartitionedTopicMetadata encapsulates a struct for metadata of a partitioned topic

type PulsarServiceURI added in v0.5.0

type PulsarServiceURI struct {
	ServiceName  string
	ServiceInfos []string
	ServiceHosts []string

	URL *url.URL
	// contains filtered or unexported fields
}

func NewPulsarServiceURIFromURIString added in v0.5.0

func NewPulsarServiceURIFromURIString(uri string) (*PulsarServiceURI, error)

func NewPulsarServiceURIFromURL added in v0.5.0

func NewPulsarServiceURIFromURL(url *url.URL) (*PulsarServiceURI, error)

type RPCClient

type RPCClient interface {
	// Create a new unique request id
	NewRequestID() uint64

	NewProducerID() uint64

	NewConsumerID() uint64

	// Send a request and block until the result is available
	RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)

	RequestToHost(serviceNameResolver *ServiceNameResolver, requestID uint64,
		cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)

	Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
		cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)

	RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error

	RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)

	LookupService(URL string) LookupService
}

func NewRPCClient

func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
	requestTimeout time.Duration, logger log.Logger, metrics *Metrics,
	listenerName string, tlsConfig *TLSOptions, authProvider auth.Provider) RPCClient

type RPCResult

type RPCResult struct {
	Response *pb.BaseCommand
	Cnx      Connection
}

type Semaphore

type Semaphore interface {
	// Acquire a permit, if one is available and returns immediately,
	// reducing the number of available permits by one.
	Acquire(ctx context.Context) bool

	// Try to acquire a permit. The method will return immediately
	// with a `true` if it was possible to acquire a permit and
	// `false` otherwise.
	TryAcquire() bool

	// Release a permit, returning it to the semaphore.
	// Release a permit, increasing the number of available permits by
	// one.  If any threads are trying to acquire a permit, then one is
	// selected and given the permit that was just released.  That thread
	// is (re)enabled for thread scheduling purposes.
	// There is no requirement that a thread that releases a permit must
	// have acquired that permit by calling Acquire().
	// Correct usage of a semaphore is established by programming convention
	// in the application.
	Release()
}

func NewSemaphore added in v0.2.0

func NewSemaphore(maxPermits int32) Semaphore

type ServiceNameResolver added in v0.5.0

type ServiceNameResolver interface {
	ResolveHost() (*url.URL, error)
	ResolveHostURI() (*PulsarServiceURI, error)
	UpdateServiceURL(url *url.URL) error
	GetServiceURI() *PulsarServiceURI
	GetServiceURL() *url.URL
	GetAddressList() []*url.URL
}

func NewPulsarServiceNameResolver added in v0.5.0

func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver

type TLSOptions

type TLSOptions struct {
	KeyFile                 string
	CertFile                string
	TrustCertsFilePath      string
	AllowInsecureConnection bool
	ValidateHostname        bool
	ServerName              string
	CipherSuites            []uint16
	MinVersion              uint16
	MaxVersion              uint16
}

type TopicName

type TopicName struct {
	Domain    string
	Tenant    string
	Namespace string
	Topic     string
	Name      string
	Partition int
}

TopicName abstract a struct contained in a Topic

func ParseTopicName

func ParseTopicName(topic string) (*TopicName, error)

ParseTopicName parse the given topic name and return TopicName.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL