topicreadercommon

package
v3.76.2-rc0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 31, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultBufferSize = 1024 * 1024

Variables

View Source
var ErrPublicUnexpectedCodec = xerrors.Wrap(errors.New("ydb: unexpected codec"))

ErrPublicUnexpectedCodec is returned when trying to read message content encoded with an unknown codec.

Functions

func BatchIsEmpty

func BatchIsEmpty(b *PublicBatch) bool

func CreateInitMessage

func CreateInitMessage(consumer string, selectors []*PublicReadSelector) *rawtopicreader.InitRequest

func MessageGetBufferBytesAccount

func MessageGetBufferBytesAccount(m *PublicMessage) int

func MessageSetNilDataForTest

func MessageSetNilDataForTest(m *PublicMessage)

func NextReaderID

func NextReaderID() int64

Types

type CallbackWithMessageContentFunc

type CallbackWithMessageContentFunc func(data []byte) error

CallbackWithMessageContentFunc is a callback function for working with message content. The data bytes MUST NOT be used after f has returned; if you need the content for longer, copy it to another slice.

type CommitRange

type CommitRange struct {
	CommitOffsetStart rawtopiccommon.Offset
	CommitOffsetEnd   rawtopiccommon.Offset
	PartitionSession  *PartitionSession
}

func GetCommitRange

func GetCommitRange(item PublicCommitRangeGetter) CommitRange

type CommitRanges

type CommitRanges struct {
	Ranges []CommitRange
}

func NewCommitRangesFromPublicCommits

func NewCommitRangesFromPublicCommits(ranges []PublicCommitRange) CommitRanges

func NewCommitRangesWithCapacity

func NewCommitRangesWithCapacity(capacity int) CommitRanges

func (*CommitRanges) Append

func (r *CommitRanges) Append(ranges ...PublicCommitRangeGetter)

func (*CommitRanges) AppendCommitRange

func (r *CommitRanges) AppendCommitRange(cr CommitRange)

func (*CommitRanges) AppendCommitRanges

func (r *CommitRanges) AppendCommitRanges(ranges []CommitRange)

func (*CommitRanges) AppendMessages

func (r *CommitRanges) AppendMessages(messages ...PublicMessage)

func (*CommitRanges) GetCommitsInfo

func (r *CommitRanges) GetCommitsInfo() []trace.TopicReaderStreamCommitInfo

GetCommitsInfo implements trace.TopicReaderStreamSendCommitMessageStartMessageInfo

func (*CommitRanges) Len

func (r *CommitRanges) Len() int

func (*CommitRanges) Optimize

func (r *CommitRanges) Optimize()

func (*CommitRanges) Reset

func (r *CommitRanges) Reset()

func (*CommitRanges) ToPartitionsOffsets

func (r *CommitRanges) ToPartitionsOffsets() []rawtopicreader.PartitionCommitOffset

type DecoderMap

type DecoderMap struct {
	// contains filtered or unexported fields
}

func NewDecoderMap

func NewDecoderMap() DecoderMap

func (*DecoderMap) AddDecoder

func (m *DecoderMap) AddDecoder(codec rawtopiccommon.Codec, createFunc PublicCreateDecoderFunc)

func (*DecoderMap) Decode

func (m *DecoderMap) Decode(codec rawtopiccommon.Codec, input io.Reader) (io.Reader, error)

type PartitionSession

type PartitionSession struct {
	Topic       string
	PartitionID int64

	ReaderID int64

	StreamPartitionSessionID rawtopicreader.PartitionSessionID
	ClientPartitionSessionID int64
	// contains filtered or unexported fields
}

func BatchGetPartitionSession

func BatchGetPartitionSession(item *PublicBatch) *PartitionSession

func NewPartitionSession

func NewPartitionSession(
	partitionContext context.Context,
	topic string,
	partitionID int64,
	readerID int64,
	connectionID string,
	partitionSessionID rawtopicreader.PartitionSessionID,
	clientPartitionSessionID int64,
	committedOffset rawtopiccommon.Offset,
) *PartitionSession

func (*PartitionSession) Close

func (s *PartitionSession) Close()

func (*PartitionSession) CommittedOffset

func (s *PartitionSession) CommittedOffset() rawtopiccommon.Offset

func (*PartitionSession) Context

func (s *PartitionSession) Context() context.Context

func (*PartitionSession) LastReceivedMessageOffset

func (s *PartitionSession) LastReceivedMessageOffset() rawtopiccommon.Offset

func (*PartitionSession) SetCommittedOffset

func (s *PartitionSession) SetCommittedOffset(v rawtopiccommon.Offset)

func (*PartitionSession) SetContext

func (s *PartitionSession) SetContext(ctx context.Context)

func (*PartitionSession) SetLastReceivedMessageOffset

func (s *PartitionSession) SetLastReceivedMessageOffset(v rawtopiccommon.Offset)

func (*PartitionSession) ToPublic

type PartitionSessionStorage

type PartitionSessionStorage struct {
	// contains filtered or unexported fields
}

func (*PartitionSessionStorage) Add

func (*PartitionSessionStorage) Get

func (*PartitionSessionStorage) GetAll

func (*PartitionSessionStorage) Remove

type Pool

type Pool interface {
	Get() interface{}
	Put(x interface{})
}

Pool is an interface for sync.Pool; it may be extended to follow the original type.

type PublicBatch

type PublicBatch struct {
	empty.DoNotCopy

	Messages []*PublicMessage
	// contains filtered or unexported fields
}

PublicBatch is an ordered group of messages from one partition.

func BatchAppend

func BatchAppend(original, appended *PublicBatch) (*PublicBatch, error)

func BatchCutMessages

func BatchCutMessages(b *PublicBatch, count int) (head, rest *PublicBatch)

func BatchSetCommitRangeForTest

func BatchSetCommitRangeForTest(b *PublicBatch, commitRange CommitRange) *PublicBatch

func NewBatch

func NewBatch(session *PartitionSession, messages []*PublicMessage) (*PublicBatch, error)

func NewBatchFromStream

func NewBatchFromStream(
	decoders DecoderMap,
	session *PartitionSession,
	sb rawtopicreader.Batch,
) (*PublicBatch, error)

func ReadRawBatchesToPublicBatches

func ReadRawBatchesToPublicBatches(
	msg *rawtopicreader.ReadResponse,
	sessions *PartitionSessionStorage,
	decoders DecoderMap,
) ([]*PublicBatch, error)

func (*PublicBatch) Context

func (m *PublicBatch) Context() context.Context

Context is cancelled when the code should stop processing the message batch, for example when the connection to the server is lost or a stop-partition signal is received without the graceful flag.

func (*PublicBatch) PartitionID

func (m *PublicBatch) PartitionID() int64

PartitionID of messages in the batch

func (*PublicBatch) Topic

func (m *PublicBatch) Topic() string

Topic is path of source topic of the messages in the batch

type PublicCommitRange

type PublicCommitRange struct {
	// contains filtered or unexported fields
}

PublicCommitRange contains the data needed to commit a range of messages.

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

type PublicCommitRangeGetter

type PublicCommitRangeGetter interface {
	// contains filtered or unexported methods
}

PublicCommitRangeGetter returns the piece of data needed to commit a range of messages.

type PublicCreateDecoderFunc

type PublicCreateDecoderFunc func(input io.Reader) (io.Reader, error)

type PublicMessage

type PublicMessage struct {
	empty.DoNotCopy

	SeqNo                int64
	CreatedAt            time.Time
	MessageGroupID       string
	WriteSessionMetadata map[string]string
	Offset               int64
	WrittenAt            time.Time
	ProducerID           string
	Metadata             map[string][]byte // Metadata, nil if no metadata

	UncompressedSize int // as sent by sender, server/sdk doesn't check the field. It may be empty or wrong.
	// contains filtered or unexported fields
}

PublicMessage is representation of topic message

func MessageWithSetCommitRangeForTest

func MessageWithSetCommitRangeForTest(m *PublicMessage, commitRange CommitRange) *PublicMessage

func (*PublicMessage) Context

func (m *PublicMessage) Context() context.Context

func (*PublicMessage) PartitionID

func (m *PublicMessage) PartitionID() int64

func (*PublicMessage) Read

func (m *PublicMessage) Read(p []byte) (n int, err error)

Read implements io.Reader. It reads the uncompressed message content and returns topicreader.UnexpectedCodec if the message is compressed with an unknown codec.

func (*PublicMessage) Topic

func (m *PublicMessage) Topic() string

func (*PublicMessage) UnmarshalTo

UnmarshalTo can be called at most once per message. It reads all data from the internal reader and calls PublicMessageContentUnmarshaler.UnmarshalYDBTopicMessage with the uncompressed content.

type PublicMessageBuilder

type PublicMessageBuilder struct {
	// contains filtered or unexported fields
}

func NewPublicMessageBuilder

func NewPublicMessageBuilder() *PublicMessageBuilder

func (*PublicMessageBuilder) Build

func (pmb *PublicMessageBuilder) Build() *PublicMessage

Build returns the built message and resets the internal state so that a new message can be created.

func (*PublicMessageBuilder) CommitRange

func (*PublicMessageBuilder) Context

Context sets the message Context.

func (*PublicMessageBuilder) CreatedAt

func (pmb *PublicMessageBuilder) CreatedAt(createdAt time.Time) *PublicMessageBuilder

CreatedAt sets the message CreatedAt.

func (*PublicMessageBuilder) DataAndUncompressedSize

func (pmb *PublicMessageBuilder) DataAndUncompressedSize(data []byte) *PublicMessageBuilder

DataAndUncompressedSize sets the message's uncompressed content and the UncompressedSize field.

func (*PublicMessageBuilder) MessageGroupID

func (pmb *PublicMessageBuilder) MessageGroupID(messageGroupID string) *PublicMessageBuilder

MessageGroupID sets the message MessageGroupID.

func (*PublicMessageBuilder) Metadata

func (pmb *PublicMessageBuilder) Metadata(metadata map[string][]byte) *PublicMessageBuilder

func (*PublicMessageBuilder) Offset

func (pmb *PublicMessageBuilder) Offset(offset int64) *PublicMessageBuilder

Offset sets the message Offset.

func (*PublicMessageBuilder) PartitionID

func (pmb *PublicMessageBuilder) PartitionID(partitionID int64) *PublicMessageBuilder

PartitionID sets the message PartitionID.

func (*PublicMessageBuilder) ProducerID

func (pmb *PublicMessageBuilder) ProducerID(producerID string) *PublicMessageBuilder

ProducerID sets the message ProducerID.

func (*PublicMessageBuilder) RawDataLen

func (pmb *PublicMessageBuilder) RawDataLen(val int) *PublicMessageBuilder

func (*PublicMessageBuilder) Seqno

func (pmb *PublicMessageBuilder) Seqno(seqNo int64) *PublicMessageBuilder

Seqno sets the message Seqno.

func (*PublicMessageBuilder) Topic

Topic sets the message Topic.

func (*PublicMessageBuilder) UncompressedSize

func (pmb *PublicMessageBuilder) UncompressedSize(uncompressedSize int) *PublicMessageBuilder

UncompressedSize sets the message UncompressedSize.

func (*PublicMessageBuilder) WriteSessionMetadata

func (pmb *PublicMessageBuilder) WriteSessionMetadata(writeSessionMetadata map[string]string) *PublicMessageBuilder

WriteSessionMetadata sets the message WriteSessionMetadata.

func (*PublicMessageBuilder) WrittenAt

func (pmb *PublicMessageBuilder) WrittenAt(writtenAt time.Time) *PublicMessageBuilder

WrittenAt sets the message WrittenAt.

type PublicMessageContentUnmarshaler

type PublicMessageContentUnmarshaler interface {
	// UnmarshalYDBTopicMessage MUST NOT use data after return.
	// If you need content after return from Consume - copy data content to
	// own slice with copy(dst, data)
	UnmarshalYDBTopicMessage(data []byte) error
}

PublicMessageContentUnmarshaler is the interface for unmarshaling message content.

type PublicPartitionSession

type PublicPartitionSession struct {
	// PartitionSessionID is unique session ID per listener object
	PartitionSessionID int64

	// TopicPath contains path for the topic
	TopicPath string

	// PartitionID contains partition id. It can be repeated for one reader if the partition will stop/start few times.
	PartitionID int64
}

PublicPartitionSession contains information about the partition session for the event.

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

type PublicReadSelector

type PublicReadSelector struct {
	Path       string
	Partitions []int64
	ReadFrom   time.Time     // zero value mean skip read from filter
	MaxTimeLag time.Duration // 0 mean skip time lag filter
}

func (PublicReadSelector) Clone

Clone creates a deep clone of the selector.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL