Versions in this module Expand all Collapse all v0 v0.12.2 Jun 26, 2024 Changes in this version + const All + const BinaryPort + const BinaryService + const BinaryTLSPort + const HTTPAdminServiceV1Format + const HTTPAdminServiceV2Format + const HTTPLookupServiceBasePathV1 + const HTTPLookupServiceBasePathV2 + const HTTPPort + const HTTPSPort + const HTTPSService + const HTTPService + const HTTPTopicUnderNamespaceV1 + const HTTPTopicUnderNamespaceV2 + const MaxFrameSize + const MaxMessageSize + const MessageFramePadding + const NonPersistent + const Persistent + const PulsarProtocolVersion + const SSLService + var ClientVersionString string + var ErrConnectionClosed = errors.New("connection closed") + var ErrCorruptedMessage = errors.New("corrupted message") + var ErrEOM = errors.New("EOF") + var ErrExceedMaxMessageSize = errors.New("encryptedPayload exceeds MaxMessageSize") + var ErrRequestTimeOut = errors.New("request timed out") + var Version string + func ConvertFromStringMap(m map[string]string) []*pb.KeyValue + func ConvertToStringMap(pbb []*pb.KeyValue) map[string]string + func Crc32cCheckSum(data []byte) uint32 + func GetAndAdd(n *uint64, diff uint64) uint64 + func GetCompressionProvider(compressionType pb.CompressionType, level compression.Level) compression.Provider + func GetConnectionsCount(p *ConnectionPool) int + func GetTopicRestPath(tn *TopicName) string + func IsV2Namespace(namespace string) bool + func IsV2TopicName(tn *TopicName) bool + func JavaStringHash(s string) uint32 + func MarshalToSizedBuffer(m proto.Message, out []byte) error + func Murmur3_32Hash(s string) uint32 + func ParseRelativeTimeInSeconds(relativeTime string) (time.Duration, error) + func SingleSend(wb Buffer, producerID, sequenceID uint64, msgMetadata *pb.MessageMetadata, ...) error + func StartCleanConnectionsTask(p *ConnectionPool, connectionMaxIdleTime time.Duration) + func TimestampMillis(t time.Time) uint64 + func TopicNameWithoutPartitionPart(tn *TopicName) string + type BackoffPolicy interface + Next func() time.Duration + type BatchBuilder interface + Add func(metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64, ...) bool + Close func() error + Flush func() (batchData Buffer, sequenceID uint64, callbacks []interface{}, err error) + FlushBatches func() (batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}, ...) + IsFull func() bool + IsMultiBatches func() bool + func NewBatchBuilder(maxMessages uint, maxBatchSize uint, maxMessageSize uint32, ...) (BatchBuilder, error) + func NewKeyBasedBatchBuilder(maxMessages uint, maxBatchSize uint, maxMessageSize uint32, ...) (BatchBuilder, error) + type BatcherBuilderProvider func(maxMessages uint, maxBatchSize uint, maxMessageSize uint32, ...) (BatchBuilder, error) + type BlockingQueue interface + CompareAndPoll func(compare func(item interface{}) bool) interface{} + Peek func() interface{} + PeekLast func() interface{} + Poll func() interface{} + Put func(item interface{}) + ReadableSlice func() []interface{} + Size func() int + Take func() interface{} + func NewBlockingQueue(maxSize int) BlockingQueue + type Buffer interface + Capacity func() uint32 + Clear func() + Get func(readerIndex uint32, size uint32) []byte + IsWritable func() bool + MoveToFront func() + Put func(writerIdx uint32, s []byte) + PutUint32 func(n uint32, writerIdx uint32) + Read func(size uint32) []byte + ReadUint16 func() uint16 + ReadUint32 func() uint32 + ReadableBytes func() uint32 + ReadableSlice func() []byte + ReaderIndex func() uint32 + Resize func(newSize uint32) + ResizeIfNeeded func(spaceNeeded uint32) + Skip func(size uint32) + WritableBytes func() uint32 + WritableSlice func() []byte + Write func(s []byte) + WriteUint16 func(n uint16) + WriteUint32 func(n uint32) + WriterIndex func() uint32 + WrittenBytes func(size uint32) + func NewBuffer(size int) Buffer + func NewBufferWrapper(buf []byte) Buffer + type BuffersPool interface + GetBuffer func() Buffer + type CheckSum struct + func (cs *CheckSum) Write(p []byte) (int, error) + type ClientHandlers struct + func NewClientHandlers() ClientHandlers + func (h *ClientHandlers) Add(c Closable) + func (h *ClientHandlers) Close() + func (h *ClientHandlers) Del(c Closable) + func (h *ClientHandlers) Val(c Closable) bool + type Closable interface + Close func() + type Connection interface + AddConsumeHandler func(id uint64, handler ConsumerHandler) error + Close func() + DeleteConsumeHandler func(id uint64) + GetMaxMessageSize func() int32 + ID func() string + IsProxied func() bool + RegisterListener func(id uint64, listener ConnectionListener) error + SendRequest func(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error)) + SendRequestNoWait func(req *pb.BaseCommand) error + UnregisterListener func(id uint64) + WriteData func(data Buffer) + type ConnectionListener interface + ConnectionClosed func(closeProducer *pb.CommandCloseProducer) + ReceivedSendReceipt func(response *pb.CommandSendReceipt) + SetRedirectedClusterURI func(redirectedClusterURI string) + type ConnectionPool interface + Close func() + GetConnection func(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) + func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider, connectionTimeout time.Duration, ...) ConnectionPool + type ConsumerHandler interface + ActiveConsumerChanged func(isActive bool) + ConnectionClosed func(closeConsumer *pb.CommandCloseConsumer) + MessageReceived func(response *pb.CommandMessage, headersAndPayload Buffer) error + SetRedirectedClusterURI func(redirectedClusterURI string) + type DefaultBackoff struct + func (b *DefaultBackoff) IsMaxBackoffReached() bool + func (b *DefaultBackoff) Next() time.Duration + type GetTopicsOfNamespaceMode string + type HTTPClient interface + Get func(endpoint string, obj interface{}, params map[string]string) error + func NewHTTPClient(serviceURL *url.URL, serviceNameResolver ServiceNameResolver, ...) (HTTPClient, error) + type LeveledMetrics struct + AcksCounter prometheus.Counter + BytesPending prometheus.Gauge + BytesPublished prometheus.Counter + BytesReceived prometheus.Counter + ConsumersClosed prometheus.Counter + ConsumersOpened prometheus.Counter + ConsumersPartitions prometheus.Gauge + ConsumersReconnectFailure prometheus.Counter + ConsumersReconnectMaxRetry prometheus.Counter + DlqCounter prometheus.Counter + MessagesPending prometheus.Gauge + MessagesPublished prometheus.Counter + MessagesReceived prometheus.Counter + NacksCounter prometheus.Counter + PrefetchedBytes prometheus.Gauge + PrefetchedMessages prometheus.Gauge + ProcessingTime prometheus.Observer + ProducersClosed prometheus.Counter + ProducersOpened prometheus.Counter + ProducersPartitions prometheus.Gauge + ProducersReconnectFailure prometheus.Counter + ProducersReconnectMaxRetry prometheus.Counter + PublishErrorsMsgTooLarge prometheus.Counter + PublishErrorsTimeout prometheus.Counter + PublishLatency prometheus.Observer + PublishRPCLatency prometheus.Observer + ReadersClosed prometheus.Counter + ReadersOpened prometheus.Counter + type LookupResult struct + LogicalAddr *url.URL + PhysicalAddr *url.URL + type LookupService interface + GetBrokerAddress func(brokerServiceURL string, proxyThroughServiceURL bool) (*LookupResult, error) + GetPartitionedTopicMetadata func(topic string) (*PartitionedTopicMetadata, error) + GetSchema func(topic string, schemaVersion []byte) (schema *pb.Schema, err error) + GetTopicsOfNamespace func(namespace string, mode GetTopicsOfNamespaceMode) ([]string, error) + Lookup func(topic string) (*LookupResult, error) + ServiceNameResolver func() *ServiceNameResolver + func NewHTTPLookupService(httpClient HTTPClient, serviceURL *url.URL, ...) LookupService + func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, ...) LookupService + type MemoryLimitController interface + CurrentUsage func() int64 + CurrentUsagePercent func() float64 + ForceReserveMemory func(size int64) + IsMemoryLimited func() bool + RegisterTrigger func(trigger func()) + ReleaseMemory func(size int64) + ReserveMemory func(ctx context.Context, size int64) bool + TryReserveMemory func(size int64) bool + func NewMemoryLimitController(limit int64, threshold float64) MemoryLimitController + type MessageReader struct + func NewMessageReader(headersAndPayload Buffer) *MessageReader + func NewMessageReaderFromArray(headersAndPayload []byte) *MessageReader + func (r *MessageReader) ReadBrokerMetadata() (*pb.BrokerEntryMetadata, error) + func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) + func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) + func (r *MessageReader) ResetBuffer(buffer Buffer) + type Metrics struct + ConnectionsClosed prometheus.Counter + ConnectionsEstablishmentErrors prometheus.Counter + ConnectionsHandshakeErrors prometheus.Counter + ConnectionsOpened prometheus.Counter + LookupRequestsCount prometheus.Counter + PartitionedTopicMetadataRequestsCount prometheus.Counter + RPCRequestCount prometheus.Counter + func NewMetricsProvider(metricsCardinality int, userDefinedLabels map[string]string, ...) *Metrics + func (mp *Metrics) GetLeveledMetrics(t string) *LeveledMetrics + type PartitionedTopicMetadata struct + Partitions int + type PulsarServiceURI struct + ServiceHosts []string + ServiceInfos []string + ServiceName string + URL *url.URL + func NewPulsarServiceURIFromURIString(uri string) (*PulsarServiceURI, error) + func NewPulsarServiceURIFromURL(url *url.URL) (*PulsarServiceURI, error) + type RPCClient interface + LookupService func(URL string) LookupService + NewConsumerID func() uint64 + NewProducerID func() uint64 + NewRequestID func() uint64 + Request func(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64, ...) (*RPCResult, error) + RequestOnCnx func(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, ...) (*RPCResult, error) + RequestOnCnxNoWait func(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error + RequestToAnyBroker func(requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) + RequestToHost func(serviceNameResolver *ServiceNameResolver, requestID uint64, ...) (*RPCResult, error) + func NewRPCClient(serviceURL *url.URL, pool ConnectionPool, requestTimeout time.Duration, ...) RPCClient + type RPCResult struct + Cnx Connection + Response *pb.BaseCommand + type Semaphore interface + Acquire func(ctx context.Context) bool + Release func() + TryAcquire func() bool + func NewSemaphore(maxPermits int32) Semaphore + type ServiceNameResolver interface + GetAddressList func() []*url.URL + GetServiceURI func() *PulsarServiceURI + GetServiceURL func() *url.URL + ResolveHost func() (*url.URL, error) + ResolveHostURI func() (*PulsarServiceURI, error) + UpdateServiceURL func(url *url.URL) error + func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver + type TLSOptions struct + AllowInsecureConnection bool + CertFile string + CipherSuites []uint16 + KeyFile string + MaxVersion uint16 + MinVersion uint16 + ServerName string + TrustCertsFilePath string + ValidateHostname bool + type TopicName struct + Domain string + Name string + Namespace string + Partition int + Tenant string + Topic string + func ParseTopicName(topic string) (*TopicName, error)