internal

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 20, 2019 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MaxMessageSize is the upper bound, in bytes, on the size of a message
	// that may be transferred (5 MiB).
	MaxMessageSize = 5 << 20
	// MaxBatchSize is the largest size, in bytes, of a batch sent from this
	// particular producer (128 KiB). It serves as the baseline when allocating
	// a new buffer, so the buffer can hold an entire batch without costly
	// re-allocations.
	MaxBatchSize = 128 << 10
	// DefaultMaxMessagesPerBatch is the default number of entries per batch.
	DefaultMaxMessagesPerBatch = 1000
)
View Source
const (
	// MaxFrameSize limits the maximum size, in bytes, that pulsar allows for
	// messages to be sent: 5 MiB, the same bound as MaxMessageSize.
	//
	// The value was previously written as 5 * 1024 * 1024 * 1024 (5 GiB), which
	// contradicts the 5 MiB message limit and does not fit in a uint32 or a
	// 32-bit int, so any comparison against a typed frame size would not
	// compile.
	MaxFrameSize = 5 * 1024 * 1024
)

Variables

View Source
// ErrCorruptedMessage is the sentinel error reported when message data is
// detected to be corrupted.
var ErrCorruptedMessage = errors.New("corrupted message")

ErrCorruptedMessage is the error returned by ReadMessageData when it has detected corrupted data. The data is considered corrupted if it is missing a header, has a checksum mismatch, or fails to unmarshal into the message metadata.

View Source
// ErrEOM signals that no more input is available. Its message text is "EOF",
// but it is a distinct value created by errors.New, so callers must compare
// against ErrEOM itself rather than io.EOF.
var ErrEOM = errors.New("EOF")

ErrEOM is the error returned by ReadMessage when no more input is available.

Functions

func ConvertFromStringMap

func ConvertFromStringMap(m map[string]string) []*pb.KeyValue

ConvertFromStringMap converts a string map to a slice of KeyValue pointers.

func ConvertToStringMap

func ConvertToStringMap(pbb []*pb.KeyValue) map[string]string

ConvertToStringMap converts a slice of KeyValue pointers to a string map.

func Crc32cCheckSum

func Crc32cCheckSum(data []byte) uint32

Crc32cCheckSum handles computing the checksum.

func GetAndAdd

func GetAndAdd(n *uint64, diff uint64) uint64

GetAndAdd perform atomic read and update

func JavaStringHash

func JavaStringHash(s string) uint32

JavaStringHash is the equivalent of Java's String.hashCode().

func Murmur3_32Hash

func Murmur3_32Hash(s string) uint32

Murmur3_32Hash use Murmur3 hashing function

func NewDefaultRouter

func NewDefaultRouter(clock Clock, hashFunc func(string) uint32, maxBatchingDelay time.Duration) func(string, uint32) int

NewDefaultRouter set the message routing mode for the partitioned producer. Default routing mode is round-robin routing.

func TimestampMillis

func TimestampMillis(t time.Time) uint64

TimestampMillis returns the given time as a Unix timestamp in milliseconds.

Types

type Backoff

type Backoff struct {
	// contains filtered or unexported fields
}

Backoff

func (*Backoff) Next

func (b *Backoff) Next() time.Duration

Next

type BatchBuilder

type BatchBuilder struct {
	// contains filtered or unexported fields
}

BatchBuilder wraps the objects needed to build a batch.

func NewBatchBuilder

func NewBatchBuilder(maxMessages uint, producerName string, producerID uint64,
	compressionType pb.CompressionType) (*BatchBuilder, error)

NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.

func (*BatchBuilder) Add

func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceID uint64, payload []byte,
	callback interface{}, replicateTo []string) bool

Add will add single message to batch.

func (*BatchBuilder) Flush

func (bb *BatchBuilder) Flush() (batchData []byte, sequenceID uint64, callbacks []interface{})

Flush all the messages buffered in the client and wait until all messages have been successfully persisted.

func (*BatchBuilder) IsFull

func (bb *BatchBuilder) IsFull() bool

IsFull check if the size in the current batch exceeds the maximum size allowed by the batch

type BlockingQueue

// BlockingQueue is a queue whose Put blocks while the queue is full and whose
// Take blocks until an item is available.
type BlockingQueue interface {
	// Put enqueues one item, blocking if the queue is full.
	Put(item interface{})

	// Take dequeues one item, blocking until one is available.
	Take() interface{}

	// Poll dequeues one item, returning nil if the queue is empty.
	Poll() interface{}

	// Peek returns the first item without dequeuing it, or nil if the queue is empty.
	Peek() interface{}

	// PeekLast returns the last item in the queue without dequeuing it, or nil if the queue is empty.
	PeekLast() interface{}

	// Size returns the current size of the queue.
	Size() int

	// Iterator returns an iterator for the queue.
	Iterator() BlockingQueueIterator
}

BlockingQueue is an interface for a blocking queue.

func NewBlockingQueue

func NewBlockingQueue(maxSize int) BlockingQueue

NewBlockingQueue init block queue and returns a BlockingQueue

type BlockingQueueIterator

type BlockingQueueIterator interface {
	HasNext() bool
	Next() interface{}
}

BlockingQueueIterator is the interface of a blocking queue iterator.

type Buffer

// Buffer is the interface of a byte buffer that exposes separate reader and
// writer indexes (see ReaderIndex and WriterIndex).
type Buffer interface {
	ReadableBytes() uint32

	WritableBytes() uint32

	// Capacity returns the capacity of the buffer's underlying byte slice,
	// that is, the total space allocated for the buffer's data.
	Capacity() uint32

	IsWritable() bool

	Read(size uint32) []byte

	Get(readerIndex uint32, size uint32) []byte

	ReadableSlice() []byte

	WritableSlice() []byte

	// WrittenBytes advances the writer index after data was written into a slice.
	WrittenBytes(size uint32)

	// MoveToFront copies the available portion of data to the beginning of the buffer.
	MoveToFront()

	ReadUint16() uint16
	ReadUint32() uint32

	WriteUint16(n uint16)
	WriteUint32(n uint32)

	WriterIndex() uint32
	ReaderIndex() uint32

	Write(s []byte)

	// NOTE(review): Put takes the index first and the data second, whereas
	// PutUint32 takes the value first and the index second. Both leading
	// PutUint32 parameters are uint32, so swapped arguments still compile.
	Put(writerIdx uint32, s []byte)
	PutUint32(n uint32, writerIdx uint32)

	Resize(newSize uint32)

	// Clear clears the current buffer data.
	Clear()
}

Buffer is a variable-sized buffer of bytes with Read and Write methods. Because Buffer is an interface, its zero value is nil and is not usable; create one with NewBuffer or NewBufferWrapper.

func NewBuffer

func NewBuffer(size int) Buffer

NewBuffer creates and initializes a new Buffer with the given size.

func NewBufferWrapper

func NewBufferWrapper(buf []byte) Buffer

type CheckSum

type CheckSum struct {
	// contains filtered or unexported fields
}

func (*CheckSum) Write

func (cs *CheckSum) Write(p []byte) (int, error)

type Clock

type Clock func() uint64

func NewSystemClock

func NewSystemClock() Clock

NewSystemClock returns a Clock based on the system clock.

type Closable

type Closable interface {
	Close()
}

type Connection

type Connection interface {
	SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
	SendRequestNoWait(req *pb.BaseCommand)
	WriteData(data []byte)
	RegisterListener(id uint64, listener ConnectionListener)
	UnregisterListener(id uint64)
	AddConsumeHandler(id uint64, handler ConsumerHandler)
	DeleteConsumeHandler(id uint64)
	Close()
}

Connection is an interface for a client connection (cnx).

type ConnectionListener

// ConnectionListener is implemented by users of a connection that want to
// receive send receipts and be told when the connection is closed.
type ConnectionListener interface {
	// ReceivedSendReceipt receives and processes the receipt returned for a send command.
	ReceivedSendReceipt(response *pb.CommandSendReceipt)

	// ConnectionClosed notifies the listener that the connection has been closed.
	// NOTE(review): presumably a callback invoked on close rather than an
	// action that closes the TCP connection itself; confirm against the
	// implementation.
	ConnectionClosed()
}

ConnectionListener is a user of a connection (eg. a producer or a consumer) that can register itself to get notified when the connection is closed.

type ConnectionPool

// ConnectionPool hands out connections and closes them as a group.
type ConnectionPool interface {
	// GetConnection gets a connection from the pool for the given logical and
	// physical addresses.
	GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error)

	// Close closes all the connections in the pool.
	Close()
}

ConnectionPool is an interface for a connection pool.

func NewConnectionPool

func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider) ConnectionPool

NewConnectionPool init connection pool.

type ConsumerHandler

// ConsumerHandler is the set of callbacks a consumer registers on a connection.
type ConsumerHandler interface {
	// MessageReceived handles a received message command together with the
	// buffer holding its headers and payload.
	MessageReceived(response *pb.CommandMessage, headersAndPayload Buffer) error

	// ConnectionClosed notifies the handler that the connection has been closed.
	// NOTE(review): presumably a callback invoked on close rather than an
	// action that closes the TCP connection itself; confirm against the
	// implementation.
	ConnectionClosed()
}

type LookupResult

type LookupResult struct {
	LogicalAddr  *url.URL
	PhysicalAddr *url.URL
}

LookupResult encapsulates a struct for lookup a request, containing two parts: LogicalAddr, PhysicalAddr.

type LookupService

// LookupService resolves a topic to the addresses of the broker serving it.
type LookupService interface {
	// Lookup performs a lookup for the given topic to determine the location of
	// the broker where the topic is located, and returns the LookupResult.
	Lookup(topic string) (*LookupResult, error)
}

LookupService is an interface for a lookup service.

func NewLookupService

func NewLookupService(rpcClient RPCClient, serviceURL *url.URL) LookupService

NewLookupService init a lookup service struct and return an object of LookupService.

type MessageReader

type MessageReader struct {
	// contains filtered or unexported fields
}

MessageReader provides helper methods to parse the metadata and messages from the binary format.

Wire format for a message:

Old format (single message) [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]

Batch format [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [METADATA_SIZE][METADATA][PAYLOAD] [METADATA_SIZE][METADATA][PAYLOAD]

func NewMessageReader

func NewMessageReader(headersAndPayload Buffer) *MessageReader

func NewMessageReaderFromArray

func NewMessageReaderFromArray(headersAndPayload []byte) *MessageReader

func (*MessageReader) ReadMessage

func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error)

func (*MessageReader) ReadMessageMetadata

func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error)

func (*MessageReader) ResetBuffer

func (r *MessageReader) ResetBuffer(buffer Buffer)

type RPCClient

// RPCClient generates request/producer/consumer ids and sends commands,
// either to a broker chosen by address or on an existing connection.
type RPCClient interface {
	// NewRequestID creates a new unique request id.
	NewRequestID() uint64

	// NewProducerID returns a new producer id.
	// NOTE(review): presumably unique like request ids; confirm against the implementation.
	NewProducerID() uint64

	// NewConsumerID returns a new consumer id.
	// NOTE(review): presumably unique like request ids; confirm against the implementation.
	NewConsumerID() uint64

	// RequestToAnyBroker sends a request and blocks until the result is available.
	RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)

	// Request sends a request addressed by the given logical and physical
	// addresses and returns its result.
	// NOTE(review): presumably blocks like RequestToAnyBroker; confirm.
	Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
		cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)

	// RequestOnCnxNoWait sends a command on the given connection; it takes no
	// request id and returns no result.
	RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message)

	// RequestOnCnx sends a request on the given connection and returns its result.
	RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
}

func NewRPCClient

func NewRPCClient(serviceURL *url.URL, pool ConnectionPool) RPCClient

type RPCResult

type RPCResult struct {
	Response *pb.BaseCommand
	Cnx      Connection
}

type Semaphore

type Semaphore chan bool

Semaphore is a channel of bool, used to receive a bool type semaphore.

func (Semaphore) Acquire

func (s Semaphore) Acquire()

Acquire a permit, if one is available and returns immediately, reducing the number of available permits by one.

func (Semaphore) Release

func (s Semaphore) Release()

Release a permit, increasing the number of available permits by one. If any threads are trying to acquire a permit, then one is selected and given the permit that was just released. That thread is (re)enabled for thread scheduling purposes. There is no requirement that a thread that releases a permit must have acquired that permit by calling Acquire(). Correct usage of a semaphore is established by programming convention in the application.

type TLSOptions

type TLSOptions struct {
	TrustCertsFilePath      string
	AllowInsecureConnection bool
	ValidateHostname        bool
}

type TopicName

type TopicName struct {
	Name      string
	Namespace string
	Partition int
}

TopicName holds the parsed components of a topic name: Name, Namespace and Partition.

func ParseTopicName

func ParseTopicName(topic string) (*TopicName, error)

ParseTopicName parse the given topic name and return TopicName.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL