Documentation ¶
Index ¶
- Constants
- Variables
- func ConvertFromStringMap(m map[string]string) []*pb.KeyValue
- func ConvertToStringMap(pbb []*pb.KeyValue) map[string]string
- func Crc32cCheckSum(data []byte) uint32
- func GetAndAdd(n *uint64, diff uint64) uint64
- func GetPartitionedTopicName(topic string, partitionIdx int) string
- func JavaStringHash(s string) uint32
- func Murmur3_32Hash(s string) uint32
- func NewDefaultRouter(clock Clock, hashFunc func(string) uint32, maxBatchingDelay time.Duration, ...) func(string, uint32) int
- func ParseRelativeTimeInSeconds(relativeTime string) (time.Duration, error)
- func ParseTopicNameToString(topic string) string
- func TimestampMillis(t time.Time) uint64
- func TopicNameWithoutPartitionPart(tn *TopicName) string
- type Backoff
- type BatchBuilder
- type BlockingQueue
- type Buffer
- type BuffersPool
- type CheckSum
- type ClientHandlers
- type Clock
- type Closable
- type Connection
- type ConnectionListener
- type ConnectionPool
- type ConsumerHandler
- type LookupResult
- type LookupService
- type MessageReader
- type RPCClient
- type RPCResult
- type Semaphore
- type TLSOptions
- type TopicName
Constants ¶
const ( // DefaultMaxBatchSize init default for maximum number of bytes per batch DefaultMaxBatchSize = 128 * 1024 // DefaultMaxMessagesPerBatch init default num of entries in per batch. DefaultMaxMessagesPerBatch = 1000 )
const ( // MaxFrameSize limit the maximum size that pulsar allows for messages to be sent. MaxFrameSize = 5 * 1024 * 1024 // MessageFramePadding is for metadata and other frame headers MessageFramePadding = 10 * 1024 // MaxMessageSize limit message size for transfer MaxMessageSize = MaxFrameSize - MessageFramePadding )
const ( // TODO: Find a better way to embed the version in the library code PulsarVersion = "0.1" ClientVersionString = "Pulsar Go " + PulsarVersion PulsarProtocolVersion = int32(pb.ProtocolVersion_v13) )
Variables ¶
var ErrCorruptedMessage = errors.New("corrupted message")
ErrCorruptedMessage is the error returned by ReadMessageData when it has detected corrupted data. The data is considered corrupted if a header is missing, the checksum does not match, or an error occurred while unmarshalling the message metadata.
var ErrEOM = errors.New("EOF")
ErrEOM is the error returned by ReadMessage when no more input is available.
Functions ¶
func ConvertFromStringMap ¶
ConvertFromStringMap converts a string map to a slice of KeyValue pointers ([]*pb.KeyValue).
func ConvertToStringMap ¶
ConvertToStringMap converts a slice of KeyValue pointers ([]*pb.KeyValue) to a string map.
func Crc32cCheckSum ¶
Crc32cCheckSum handles computing the checksum.
func GetPartitionedTopicName ¶
func JavaStringHash ¶
JavaStringHash is the equivalent of Java's String.hashCode().
func Murmur3_32Hash ¶
Murmur3_32Hash hashes a string using the Murmur3 32-bit hashing function.
func NewDefaultRouter ¶
func NewDefaultRouter(clock Clock, hashFunc func(string) uint32, maxBatchingDelay time.Duration, disableBatching bool) func(string, uint32) int
NewDefaultRouter sets the message routing mode for the partitioned producer. The default routing mode is round-robin routing.
func ParseTopicNameToString ¶
func TimestampMillis ¶
TimestampMillis returns the given time as a Unix timestamp expressed in milliseconds.
Types ¶
type BatchBuilder ¶
type BatchBuilder struct {
// contains filtered or unexported fields
}
BatchBuilder wraps the objects needed to build a batch.
func NewBatchBuilder ¶
func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, bufferPool BuffersPool) (*BatchBuilder, error)
NewBatchBuilder initializes a batch builder, i.e. a new batch message container, and returns a pointer to it.
func (*BatchBuilder) Add ¶
func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceID uint64, payload []byte, callback interface{}, replicateTo []string, deliverAt time.Time) bool
Add adds a single message to the batch.
func (*BatchBuilder) Close ¶
func (bb *BatchBuilder) Close() error
func (*BatchBuilder) Flush ¶
func (bb *BatchBuilder) Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{})
Flush flushes all the messages buffered in the client and waits until all messages have been successfully persisted. It returns the batch data, the sequence ID, and the callbacks.
func (*BatchBuilder) IsFull ¶
func (bb *BatchBuilder) IsFull() bool
IsFull checks whether the size of the current batch exceeds the maximum size allowed for a batch.
type BlockingQueue ¶
type BlockingQueue interface { // Put enqueue one item, block if the queue is full Put(item interface{}) // Take dequeue one item, block until it's available Take() interface{} // Poll dequeue one item, return nil if queue is empty Poll() interface{} // Peek return the first item without dequeuing, return nil if queue is empty Peek() interface{} // PeekLast return last item in queue without dequeuing, return nil if queue is empty PeekLast() interface{} // Size return the current size of the queue Size() int // ReadableSlice returns a new view of the readable items in the queue ReadableSlice() []interface{} }
BlockingQueue is an interface for a blocking queue.
func NewBlockingQueue ¶
func NewBlockingQueue(maxSize int) BlockingQueue
NewBlockingQueue initializes a blocking queue with the given maximum size and returns it as a BlockingQueue.
type Buffer ¶
type Buffer interface { ReadableBytes() uint32 WritableBytes() uint32 // Capacity returns the capacity of the buffer's underlying byte slice, // that is, the total space allocated for the buffer's data. Capacity() uint32 IsWritable() bool Read(size uint32) []byte Get(readerIndex uint32, size uint32) []byte ReadableSlice() []byte WritableSlice() []byte // WrittenBytes advance the writer index when data was written in a slice WrittenBytes(size uint32) // MoveToFront copy the available portion of data at the beginning of the buffer MoveToFront() ReadUint16() uint16 ReadUint32() uint32 WriteUint16(n uint16) WriteUint32(n uint32) WriterIndex() uint32 ReaderIndex() uint32 Write(s []byte) Put(writerIdx uint32, s []byte) PutUint32(n uint32, writerIdx uint32) Resize(newSize uint32) ResizeIfNeeded(spaceNeeded uint32) // Clear will clear the current buffer data. Clear() }
Buffer is an interface for a variable-sized buffer of bytes with Read and Write methods. Because Buffer is an interface, its zero value is nil; a usable buffer must be obtained from an implementation.
func NewBufferWrapper ¶
type BuffersPool ¶
type BuffersPool interface {
GetBuffer() Buffer
}
type ClientHandlers ¶
type ClientHandlers struct {
// contains filtered or unexported fields
}
ClientHandlers is a simple concurrent-safe map for the client type.
func NewClientHandlers ¶
func NewClientHandlers() ClientHandlers
func (*ClientHandlers) Add ¶
func (h *ClientHandlers) Add(c Closable)
func (*ClientHandlers) Close ¶
func (h *ClientHandlers) Close()
func (*ClientHandlers) Del ¶
func (h *ClientHandlers) Del(c Closable)
func (*ClientHandlers) Val ¶
func (h *ClientHandlers) Val(c Closable) bool
type Clock ¶
type Clock func() uint64
func NewSystemClock ¶
func NewSystemClock() Clock
NewSystemClock initializes a system clock and returns it as a Clock, a function that returns the current time.
type Connection ¶
type Connection interface { SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error)) SendRequestNoWait(req *pb.BaseCommand) WriteData(data Buffer) RegisterListener(id uint64, listener ConnectionListener) UnregisterListener(id uint64) AddConsumeHandler(id uint64, handler ConsumerHandler) DeleteConsumeHandler(id uint64) ID() string GetMaxMessageSize() int32 Close() }
Connection is an interface for a client connection (cnx).
type ConnectionListener ¶
type ConnectionListener interface { // ReceivedSendReceipt receive and process the return value of the send command. ReceivedSendReceipt(response *pb.CommandSendReceipt) // ConnectionClosed close the TCP connection. ConnectionClosed() }
ConnectionListener is a user of a connection (eg. a producer or a consumer) that can register itself to get notified when the connection is closed.
type ConnectionPool ¶
type ConnectionPool interface { // GetConnection get a connection from ConnectionPool. GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) // Close all the connections in the pool Close() }
ConnectionPool is an interface for a connection pool.
func NewConnectionPool ¶
func NewConnectionPool( tlsOptions *TLSOptions, auth auth.Provider, connectionTimeout time.Duration, maxConnectionsPerHost int) ConnectionPool
NewConnectionPool initializes a connection pool.
func NewConnectionPoolWithAuthCloud ¶
func NewConnectionPoolWithAuthCloud( authCloud authcloud.AuthenticationCloud, tlsOptions *TLSOptions, auth auth.Provider, connectionTimeout time.Duration, maxConnectionsPerHost int) ConnectionPool
type ConsumerHandler ¶
type ConsumerHandler interface { MessageReceived(response *pb.CommandMessage, headersAndPayload Buffer) error // ConnectionClosed close the TCP connection. ConnectionClosed() }
type LookupResult ¶
LookupResult encapsulates the result of a lookup request and contains two parts: LogicalAddr and PhysicalAddr.
type LookupService ¶
type LookupService interface { // Lookup perform a lookup for the given topic, confirm the location of the broker // where the topic is located, and return the LookupResult. Lookup(topic string) (*LookupResult, error) NetModelLookup(topic string, netModel string) (*LookupResult, error) }
LookupService is an interface for a lookup service.
func NewLookupService ¶
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, tlsEnabled bool) LookupService
NewLookupService initializes a lookup service and returns it as a LookupService.
type MessageReader ¶
type MessageReader struct {
// contains filtered or unexported fields
}
MessageReader provides helper methods to parse the metadata and messages from the binary format. The wire format for messages is as follows:
Old format (single message) [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
Batch format [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [METADATA_SIZE][METADATA][PAYLOAD] [METADATA_SIZE][METADATA][PAYLOAD]
func NewMessageReader ¶
func NewMessageReader(headersAndPayload Buffer) *MessageReader
func NewMessageReaderFromArray ¶
func NewMessageReaderFromArray(headersAndPayload []byte) *MessageReader
func (*MessageReader) ReadMessage ¶
func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error)
func (*MessageReader) ReadMessageMetadata ¶
func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error)
func (*MessageReader) ResetBuffer ¶
func (r *MessageReader) ResetBuffer(buffer Buffer)
type RPCClient ¶
type RPCClient interface { // Create a new unique request id NewRequestID() uint64 NewProducerID() uint64 NewConsumerID() uint64 // Send a request and block until the result is available RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) }
func NewRPCClient ¶
type RPCResult ¶
type RPCResult struct { Response *pb.BaseCommand Cnx Connection }
type Semaphore ¶
type Semaphore interface { // Acquire a permit, if one is available and returns immediately, // reducing the number of available permits by one. Acquire() // Try to acquire a permit. The method will return immediately // with a `true` if it was possible to acquire a permit and // `false` otherwise. TryAcquire() bool // Release a permit, returning it to the semaphore. // Release a permit, increasing the number of available permits by // one. If any threads are trying to acquire a permit, then one is // selected and given the permit that was just released. That thread // is (re)enabled for thread scheduling purposes. // There is no requirement that a thread that releases a permit must // have acquired that permit by calling Acquire(). // Correct usage of a semaphore is established by programming convention // in the application. Release() }