Documentation ¶
Index ¶
- Constants
- Variables
- func CloseIdleConnections(c *http.Client)
- func ConvertFromStringMap(m map[string]string) []*pb.KeyValue
- func ConvertToStringMap(pbb []*pb.KeyValue) map[string]string
- func Crc32cCheckSum(data []byte) uint32
- func GetAndAdd(n *uint64, diff uint64) uint64
- func GetTopicRestPath(tn *TopicName) string
- func IsV2Namespace(namespace string) bool
- func IsV2TopicName(tn *TopicName) bool
- func JavaStringHash(s string) uint32
- func Murmur3_32Hash(s string) uint32
- func ParseRelativeTimeInSeconds(relativeTime string) (time.Duration, error)
- func TimestampMillis(t time.Time) uint64
- func TopicNameWithoutPartitionPart(tn *TopicName) string
- type Backoff
- type BatchBuilder
- type BatcherBuilderProvider
- type BlockingQueue
- type Buffer
- type BuffersPool
- type CheckSum
- type ClientHandlers
- type Closable
- type Connection
- type ConnectionListener
- type ConnectionPool
- type ConsumerHandler
- type GetTopicsOfNamespaceMode
- type HTTPClient
- type LeveledMetrics
- type LookupResult
- type LookupService
- type MessageReader
- type Metrics
- type PartitionedTopicMetadata
- type PulsarServiceURI
- type RPCClient
- type RPCResult
- type Semaphore
- type ServiceNameResolver
- type TLSOptions
- type TopicName
Constants ¶
const ( // MaxMessageSize limit message size for transfer MaxMessageSize = 5 * 1024 * 1024 // MessageFramePadding is for metadata and other frame headers MessageFramePadding = 10 * 1024 // MaxFrameSize limit the maximum size that pulsar allows for messages to be sent. MaxFrameSize = MaxMessageSize + MessageFramePadding )
const ( // TODO: Find a better way to embed the version in the library code PulsarVersion = "0.1" ClientVersionString = "Pulsar Go " + PulsarVersion PulsarProtocolVersion = int32(pb.ProtocolVersion_v13) )
const ( Persistent GetTopicsOfNamespaceMode = "PERSISTENT" NonPersistent = "NON_PERSISTENT" All = "ALL" )
const ( BinaryService = "pulsar" HTTPService = "http" HTTPSService = "https" SSLService = "ssl" BinaryPort = 6650 BinaryTLSPort = 6651 HTTPPort = 80 HTTPSPort = 443 )
const HTTPAdminServiceV1Format string = "/admin/%s/partitions"
const HTTPAdminServiceV2Format string = "/admin/v2/%s/partitions"
const HTTPLookupServiceBasePathV1 string = "/lookup/v2/destination/"
const HTTPLookupServiceBasePathV2 string = "/lookup/v2/topic/"
const HTTPTopicUnderNamespaceV1 string = "/admin/namespaces/%s/destinations?mode=%s"
const HTTPTopicUnderNamespaceV2 string = "/admin/v2/namespaces/%s/topics?mode=%s"
Variables ¶
var ErrConnectionClosed = errors.New("connection closed")
var ErrCorruptedMessage = errors.New("corrupted message")
ErrCorruptedMessage is the error returned by ReadMessageData when it has detected corrupted data. The data is considered corrupted if a header is missing, the checksum does not match, or an error occurs when unmarshalling the message metadata.
var ErrEOM = errors.New("EOF")
ErrEOM is the error returned by ReadMessage when no more input is available.
var ( // ErrRequestTimeOut happens when request not finished in given requestTimeout. ErrRequestTimeOut = errors.New("request timed out") )
Functions ¶
func CloseIdleConnections ¶
func ConvertFromStringMap ¶
ConvertFromStringMap converts a string map to a slice of KeyValue pointers ([]*pb.KeyValue).
func ConvertToStringMap ¶
ConvertToStringMap converts a slice of KeyValue pointers ([]*pb.KeyValue) to a string map.
func Crc32cCheckSum ¶
Crc32cCheckSum handles computing the checksum.
func GetTopicRestPath ¶
func IsV2Namespace ¶
func IsV2TopicName ¶
func JavaStringHash ¶
JavaStringHash is the equivalent of Java's String.hashCode().
func Murmur3_32Hash ¶
Murmur3_32Hash hashes the given string using the Murmur3 32-bit hashing function.
func TimestampMillis ¶
TimestampMillis returns the given time as a Unix timestamp in milliseconds.
Types ¶
type Backoff ¶
type Backoff struct {
// contains filtered or unexported fields
}
Backoff computes the delay before retrying an action. It uses an exponential backoff with jitter. The jitter represents up to 20 percent of the delay.
type BatchBuilder ¶
type BatchBuilder interface { // IsFull check if the size in the current batch exceeds the maximum size allowed by the batch IsFull() bool // Add will add single message to batch. Add( metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64, payload []byte, callback interface{}, replicateTo []string, deliverAt time.Time, ) bool // Flush all the messages buffered in the client and wait until all messages have been successfully persisted. Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}, err error) // Flush all the messages buffered in multiple batches and wait until all // messages have been successfully persisted. FlushBatches() ( batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}, errors []error, ) // Return the batch container batch message in multiple batches. IsMultiBatches() bool Close() error // contains filtered or unexported methods }
BatchBuilder is an interface for batch builders.
func NewBatchBuilder ¶
func NewBatchBuilder( maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor, ) (BatchBuilder, error)
NewBatchBuilder initializes a batch builder and returns it as a BatchBuilder. It builds a new batch message container.
func NewKeyBasedBatchBuilder ¶
func NewKeyBasedBatchBuilder( maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor, ) (BatchBuilder, error)
NewKeyBasedBatchBuilder initializes a batch builder and returns it as a BatchBuilder. It builds a new key-based batch message container.
type BatcherBuilderProvider ¶
type BatcherBuilderProvider func( maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor, ) (BatchBuilder, error)
BatcherBuilderProvider defines func which returns the BatchBuilder.
type BlockingQueue ¶
type BlockingQueue interface { // Put enqueue one item, block if the queue is full Put(item interface{}) // Take dequeue one item, block until it's available Take() interface{} // Poll dequeue one item, return nil if queue is empty Poll() interface{} // CompareAndPoll compare the first item and poll it if meet the conditions CompareAndPoll(compare func(item interface{}) bool) interface{} // Peek return the first item without dequeing, return nil if queue is empty Peek() interface{} // PeekLast return last item in queue without dequeing, return nil if queue is empty PeekLast() interface{} // Size return the current size of the queue Size() int // ReadableSlice returns a new view of the readable items in the queue ReadableSlice() []interface{} }
BlockingQueue is an interface for a blocking queue.
func NewBlockingQueue ¶
func NewBlockingQueue(maxSize int) BlockingQueue
NewBlockingQueue initializes a blocking queue with the given maximum size and returns it as a BlockingQueue.
type Buffer ¶
type Buffer interface { ReadableBytes() uint32 WritableBytes() uint32 // Capacity returns the capacity of the buffer's underlying byte slice, // that is, the total space allocated for the buffer's data. Capacity() uint32 IsWritable() bool Read(size uint32) []byte Get(readerIndex uint32, size uint32) []byte ReadableSlice() []byte WritableSlice() []byte // WrittenBytes advance the writer index when data was written in a slice WrittenBytes(size uint32) // MoveToFront copy the available portion of data at the beginning of the buffer MoveToFront() ReadUint16() uint16 ReadUint32() uint32 WriteUint16(n uint16) WriteUint32(n uint32) WriterIndex() uint32 ReaderIndex() uint32 Write(s []byte) Put(writerIdx uint32, s []byte) PutUint32(n uint32, writerIdx uint32) Resize(newSize uint32) ResizeIfNeeded(spaceNeeded uint32) // Clear will clear the current buffer data. Clear() }
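Buffer is a variable-sized buffer of bytes with Read and Write methods. Because Buffer is an interface, its zero value is nil and is not usable as is; obtain a buffer from a constructor such as NewBufferWrapper or from a BuffersPool.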
func NewBufferWrapper ¶
type BuffersPool ¶
type BuffersPool interface {
GetBuffer() Buffer
}
type ClientHandlers ¶
type ClientHandlers struct {
// contains filtered or unexported fields
}
ClientHandlers is a simple concurrent-safe map for the client type.
func NewClientHandlers ¶
func NewClientHandlers() ClientHandlers
func (*ClientHandlers) Add ¶
func (h *ClientHandlers) Add(c Closable)
func (*ClientHandlers) Close ¶
func (h *ClientHandlers) Close()
func (*ClientHandlers) Del ¶
func (h *ClientHandlers) Del(c Closable)
func (*ClientHandlers) Val ¶
func (h *ClientHandlers) Val(c Closable) bool
type Connection ¶
type Connection interface { SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error)) SendRequestNoWait(req *pb.BaseCommand) error WriteData(data Buffer) RegisterListener(id uint64, listener ConnectionListener) UnregisterListener(id uint64) AddConsumeHandler(id uint64, handler ConsumerHandler) DeleteConsumeHandler(id uint64) ID() string GetMaxMessageSize() int32 Close() }
Connection is an interface for a client connection (cnx).
type ConnectionListener ¶
type ConnectionListener interface { // ReceivedSendReceipt receive and process the return value of the send command. ReceivedSendReceipt(response *pb.CommandSendReceipt) // ConnectionClosed close the TCP connection. ConnectionClosed() }
ConnectionListener is a user of a connection (eg. a producer or a consumer) that can register itself to get notified when the connection is closed.
type ConnectionPool ¶
type ConnectionPool interface { // GetConnection get a connection from ConnectionPool. GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) // Close all the connections in the pool Close() }
ConnectionPool is an interface for a connection pool.
func NewConnectionPool ¶
func NewConnectionPool( tlsOptions *TLSOptions, auth auth.Provider, connectionTimeout time.Duration, maxConnectionsPerHost int, logger log.Logger, metrics *Metrics) ConnectionPool
NewConnectionPool initializes a connection pool and returns it as a ConnectionPool.
type ConsumerHandler ¶
type ConsumerHandler interface { MessageReceived(response *pb.CommandMessage, headersAndPayload Buffer) error // ConnectionClosed close the TCP connection. ConnectionClosed() }
type GetTopicsOfNamespaceMode ¶
type GetTopicsOfNamespaceMode string
GetTopicsOfNamespaceMode for CommandGetTopicsOfNamespace_Mode
type HTTPClient ¶
type HTTPClient interface { Get(endpoint string, obj interface{}, params map[string]string) error Closable }
func NewHTTPClient ¶
func NewHTTPClient(serviceURL *url.URL, serviceNameResolver ServiceNameResolver, tlsConfig *TLSOptions, requestTimeout time.Duration, logger log.Logger, metrics *Metrics, authProvider auth.Provider) (HTTPClient, error)
type LeveledMetrics ¶
type LeveledMetrics struct { MessagesPublished prometheus.Counter BytesPublished prometheus.Counter MessagesPending prometheus.Gauge BytesPending prometheus.Gauge PublishErrorsTimeout prometheus.Counter PublishErrorsMsgTooLarge prometheus.Counter PublishLatency prometheus.Observer PublishRPCLatency prometheus.Observer MessagesReceived prometheus.Counter BytesReceived prometheus.Counter PrefetchedMessages prometheus.Gauge PrefetchedBytes prometheus.Gauge AcksCounter prometheus.Counter NacksCounter prometheus.Counter DlqCounter prometheus.Counter ProcessingTime prometheus.Observer ProducersOpened prometheus.Counter ProducersClosed prometheus.Counter ProducersPartitions prometheus.Gauge ConsumersOpened prometheus.Counter ConsumersClosed prometheus.Counter ConsumersPartitions prometheus.Gauge ReadersOpened prometheus.Counter ReadersClosed prometheus.Counter }
type LookupResult ¶
LookupResult encapsulates the result of a lookup request. It contains two parts: LogicalAddr and PhysicalAddr.
type LookupService ¶
type LookupService interface { // Lookup perform a lookup for the given topic, confirm the location of the broker // where the topic is located, and return the LookupResult. Lookup(topic string) (*LookupResult, error) // GetPartitionedTopicMetadata perform a CommandPartitionedTopicMetadata request for // the given topic, returns the CommandPartitionedTopicMetadataResponse as the result. GetPartitionedTopicMetadata(topic string) (*PartitionedTopicMetadata, error) // GetTopicsOfNamespace returns all the topics name for a given namespace. GetTopicsOfNamespace(namespace string, mode GetTopicsOfNamespaceMode) ([]string, error) // Closable Allow Lookup Service's internal client to be able to closed Closable }
LookupService is an interface for a lookup service.
func NewHTTPLookupService ¶
func NewHTTPLookupService(httpClient HTTPClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver, tlsEnabled bool, logger log.Logger, metrics *Metrics) LookupService
NewHTTPLookupService initializes an HTTP-based lookup service and returns it as a LookupService.
func NewLookupService ¶
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver, tlsEnabled bool, listenerName string, logger log.Logger, metrics *Metrics) LookupService
NewLookupService initializes a lookup service and returns it as a LookupService.
type MessageReader ¶
type MessageReader struct {
// contains filtered or unexported fields
}
MessageReader provides helper methods to parse the metadata and messages from the binary wire format. The wire format for a message is one of the following:
Old format (single message) [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
Batch format [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [METADATA_SIZE][METADATA][PAYLOAD] [METADATA_SIZE][METADATA][PAYLOAD]
func NewMessageReader ¶
func NewMessageReader(headersAndPayload Buffer) *MessageReader
func NewMessageReaderFromArray ¶
func NewMessageReaderFromArray(headersAndPayload []byte) *MessageReader
func (*MessageReader) ReadMessage ¶
func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error)
func (*MessageReader) ReadMessageMetadata ¶
func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error)
func (*MessageReader) ResetBuffer ¶
func (r *MessageReader) ResetBuffer(buffer Buffer)
type Metrics ¶
type Metrics struct { // Metrics that are not labeled with specificity are immediately available ConnectionsOpened prometheus.Counter ConnectionsClosed prometheus.Counter ConnectionsEstablishmentErrors prometheus.Counter ConnectionsHandshakeErrors prometheus.Counter LookupRequestsCount prometheus.Counter PartitionedTopicMetadataRequestsCount prometheus.Counter RPCRequestCount prometheus.Counter // contains filtered or unexported fields }
func NewMetricsProvider ¶
func (*Metrics) GetLeveledMetrics ¶
func (mp *Metrics) GetLeveledMetrics(t string) *LeveledMetrics
type PartitionedTopicMetadata ¶
type PartitionedTopicMetadata struct {
Partitions int `json:"partitions"` // Number of partitions for the topic
}
PartitionedTopicMetadata encapsulates a struct for metadata of a partitioned topic
type PulsarServiceURI ¶
type PulsarServiceURI struct { ServiceName string ServiceInfos []string ServiceHosts []string URL *url.URL // contains filtered or unexported fields }
func NewPulsarServiceURIFromURIString ¶
func NewPulsarServiceURIFromURIString(uri string) (*PulsarServiceURI, error)
func NewPulsarServiceURIFromURL ¶
func NewPulsarServiceURIFromURL(url *url.URL) (*PulsarServiceURI, error)
type RPCClient ¶
type RPCClient interface { // Create a new unique request id NewRequestID() uint64 NewProducerID() uint64 NewConsumerID() uint64 // Send a request and block until the result is available RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) }
func NewRPCClient ¶
func NewRPCClient(serviceURL *url.URL, serviceNameResolver ServiceNameResolver, pool ConnectionPool, requestTimeout time.Duration, logger log.Logger, metrics *Metrics) RPCClient
type RPCResult ¶
type RPCResult struct { Response *pb.BaseCommand Cnx Connection }
type Semaphore ¶
type Semaphore interface { // Acquire a permit, if one is available and returns immediately, // reducing the number of available permits by one. Acquire(ctx context.Context) bool // Try to acquire a permit. The method will return immediately // with a `true` if it was possible to acquire a permit and // `false` otherwise. TryAcquire() bool // Release a permit, returning it to the semaphore. // Release a permit, increasing the number of available permits by // one. If any threads are trying to acquire a permit, then one is // selected and given the permit that was just released. That thread // is (re)enabled for thread scheduling purposes. // There is no requirement that a thread that releases a permit must // have acquired that permit by calling Acquire(). // Correct usage of a semaphore is established by programming convention // in the application. Release() }
func NewSemaphore ¶
type ServiceNameResolver ¶
type ServiceNameResolver interface { ResolveHost() (*url.URL, error) ResolveHostURI() (*PulsarServiceURI, error) UpdateServiceURL(url *url.URL) error GetServiceURI() *PulsarServiceURI GetServiceURL() *url.URL GetAddressList() []*url.URL }
func NewPulsarServiceNameResolver ¶
func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver
type TLSOptions ¶
Source Files ¶
- backoff.go
- batch_builder.go
- blocking_queue.go
- buffer.go
- checksum.go
- client_handlers.go
- closable.go
- commands.go
- connection.go
- connection_pool.go
- connection_reader.go
- hash.go
- http_client.go
- http_client_go_1.12.go
- key_based_batch_builder.go
- lookup_service.go
- metrics.go
- namespace_name.go
- rpc_client.go
- semaphore.go
- service_name_resolver.go
- service_uri.go
- topic_name.go
- utils.go