topicreadercommon

package
v3.81.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 26, 2024 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultBufferSize = 1024 * 1024

Variables

View Source
var (
	ErrCommitDisabled             = xerrors.Wrap(errors.New("ydb: commits disabled"))
	ErrWrongCommitOrderInSyncMode = xerrors.Wrap(errors.New("ydb: wrong commit order in sync mode"))
)
View Source
var ErrPublicUnexpectedCodec = xerrors.Wrap(errors.New("ydb: unexpected codec"))

ErrPublicUnexpectedCodec is returned when trying to read message content with an unknown codec.

View Source
var PublicErrCommitSessionToExpiredSession = xerrors.Wrap(errors.New("ydb: commit to expired session"))

Functions

func BatchIsEmpty

func BatchIsEmpty(b *PublicBatch) bool

func CreateInitMessage

func CreateInitMessage(consumer string, selectors []*PublicReadSelector) *rawtopicreader.InitRequest

func MessageGetBufferBytesAccount

func MessageGetBufferBytesAccount(m *PublicMessage) int

func MessageSetNilDataForTest

func MessageSetNilDataForTest(m *PublicMessage)

func NextReaderID

func NextReaderID() int64

Types

type CallbackWithMessageContentFunc

type CallbackWithMessageContentFunc func(data []byte) error

CallbackWithMessageContentFunc is a callback function for working with message content. The data bytes MUST NOT be used after the callback has returned; if you need the content for longer, copy it to another slice.

type CommitRange

type CommitRange struct {
	CommitOffsetStart rawtopiccommon.Offset
	CommitOffsetEnd   rawtopiccommon.Offset
	PartitionSession  *PartitionSession
}

func GetCommitRange

func GetCommitRange(item PublicCommitRangeGetter) CommitRange

type CommitRanges

type CommitRanges struct {
	Ranges []CommitRange
}

func NewCommitRangesFromPublicCommits

func NewCommitRangesFromPublicCommits(ranges []PublicCommitRange) CommitRanges

func NewCommitRangesWithCapacity

func NewCommitRangesWithCapacity(capacity int) CommitRanges

func (*CommitRanges) Append

func (r *CommitRanges) Append(ranges ...PublicCommitRangeGetter)

func (*CommitRanges) AppendCommitRange

func (r *CommitRanges) AppendCommitRange(cr CommitRange)

func (*CommitRanges) AppendCommitRanges

func (r *CommitRanges) AppendCommitRanges(ranges []CommitRange)

func (*CommitRanges) AppendMessages

func (r *CommitRanges) AppendMessages(messages ...PublicMessage)

func (*CommitRanges) GetCommitsInfo

func (r *CommitRanges) GetCommitsInfo() []trace.TopicReaderStreamCommitInfo

GetCommitsInfo implements trace.TopicReaderStreamSendCommitMessageStartMessageInfo

func (*CommitRanges) Len

func (r *CommitRanges) Len() int

func (*CommitRanges) Optimize

func (r *CommitRanges) Optimize()

func (*CommitRanges) Reset

func (r *CommitRanges) Reset()

func (*CommitRanges) ToPartitionsOffsets

func (r *CommitRanges) ToPartitionsOffsets() []rawtopicreader.PartitionCommitOffset

func (*CommitRanges) ToRawMessage added in v3.79.0

func (r *CommitRanges) ToRawMessage() *rawtopicreader.CommitOffsetRequest

type Committer added in v3.79.0

type Committer struct {
	BufferTimeLagTrigger time.Duration // 0 mean no additional time lag
	BufferCountTrigger   int
	// contains filtered or unexported fields
}

func NewCommitterStopped added in v3.79.0

func NewCommitterStopped(
	tracer *trace.Topic,
	lifeContext context.Context,
	mode PublicCommitMode,
	send SendMessageToServerFunc,
) *Committer

func (*Committer) Close added in v3.79.0

func (c *Committer) Close(ctx context.Context, err error) error

func (*Committer) Commit added in v3.79.0

func (c *Committer) Commit(ctx context.Context, commitRange CommitRange) error

func (*Committer) OnCommitNotify added in v3.79.0

func (c *Committer) OnCommitNotify(session *PartitionSession, offset rawtopiccommon.Offset)

func (*Committer) Start added in v3.79.0

func (c *Committer) Start()

type DecoderMap

type DecoderMap struct {
	// contains filtered or unexported fields
}

func NewDecoderMap

func NewDecoderMap() DecoderMap

func (*DecoderMap) AddDecoder

func (m *DecoderMap) AddDecoder(codec rawtopiccommon.Codec, createFunc PublicCreateDecoderFunc)

func (*DecoderMap) Decode

func (m *DecoderMap) Decode(codec rawtopiccommon.Codec, input io.Reader) (io.Reader, error)

type PartitionSession

type PartitionSession struct {
	Topic       string
	PartitionID int64

	ReaderID int64

	StreamPartitionSessionID rawtopicreader.PartitionSessionID
	ClientPartitionSessionID int64
	// contains filtered or unexported fields
}

func BatchGetPartitionSession

func BatchGetPartitionSession(item *PublicBatch) *PartitionSession

func NewPartitionSession

func NewPartitionSession(
	partitionContext context.Context,
	topic string,
	partitionID int64,
	readerID int64,
	connectionID string,
	partitionSessionID rawtopicreader.PartitionSessionID,
	clientPartitionSessionID int64,
	committedOffset rawtopiccommon.Offset,
) *PartitionSession

func (*PartitionSession) Close

func (s *PartitionSession) Close()

func (*PartitionSession) CommittedOffset

func (s *PartitionSession) CommittedOffset() rawtopiccommon.Offset

func (*PartitionSession) Context

func (s *PartitionSession) Context() context.Context

func (*PartitionSession) LastReceivedMessageOffset

func (s *PartitionSession) LastReceivedMessageOffset() rawtopiccommon.Offset

func (*PartitionSession) SetCommittedOffsetForward added in v3.76.4

func (s *PartitionSession) SetCommittedOffsetForward(v rawtopiccommon.Offset)

SetCommittedOffsetForward sets the new offset only if it is greater than the old one.

func (*PartitionSession) SetContext

func (s *PartitionSession) SetContext(ctx context.Context)

func (*PartitionSession) SetLastReceivedMessageOffset

func (s *PartitionSession) SetLastReceivedMessageOffset(v rawtopiccommon.Offset)

func (*PartitionSession) ToPublic

type PartitionSessionStorage

type PartitionSessionStorage struct {
	// contains filtered or unexported fields
}

func (*PartitionSessionStorage) Add

func (*PartitionSessionStorage) Get

func (*PartitionSessionStorage) GetAll

func (*PartitionSessionStorage) Remove

type Pool

type Pool interface {
	Get() interface{}
	Put(x interface{})
}

Pool is an interface for sync.Pool and may be extended to follow the original type.

type PublicBatch

type PublicBatch struct {
	empty.DoNotCopy

	Messages []*PublicMessage
	// contains filtered or unexported fields
}

PublicBatch is an ordered group of messages from one partition.

func BatchAppend

func BatchAppend(original, appended *PublicBatch) (*PublicBatch, error)

func BatchCutMessages

func BatchCutMessages(b *PublicBatch, count int) (head, rest *PublicBatch)

func BatchSetCommitRangeForTest

func BatchSetCommitRangeForTest(b *PublicBatch, commitRange CommitRange) *PublicBatch

func NewBatch

func NewBatch(session *PartitionSession, messages []*PublicMessage) (*PublicBatch, error)

func NewBatchFromStream

func NewBatchFromStream(
	decoders DecoderMap,
	session *PartitionSession,
	sb rawtopicreader.Batch,
) (*PublicBatch, error)

func ReadRawBatchesToPublicBatches

func ReadRawBatchesToPublicBatches(
	msg *rawtopicreader.ReadResponse,
	sessions *PartitionSessionStorage,
	decoders DecoderMap,
) ([]*PublicBatch, error)

func (*PublicBatch) Context

func (m *PublicBatch) Context() context.Context

Context is cancelled when the code should stop processing the message batch, for example when the connection to the server is lost or a stop-partition signal is received without the graceful flag.

func (*PublicBatch) PartitionID

func (m *PublicBatch) PartitionID() int64

PartitionID of messages in the batch

func (*PublicBatch) Topic

func (m *PublicBatch) Topic() string

Topic is the path of the source topic of the messages in the batch.

type PublicCommitMode added in v3.79.0

type PublicCommitMode int
const (
	CommitModeAsync PublicCommitMode = iota // default
	CommitModeNone
	CommitModeSync
)

func (PublicCommitMode) CommitsEnabled added in v3.79.0

func (m PublicCommitMode) CommitsEnabled() bool

type PublicCommitRange

type PublicCommitRange struct {
	// contains filtered or unexported fields
}

PublicCommitRange contains the data for committing a range of messages.

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

type PublicCommitRangeGetter

type PublicCommitRangeGetter interface {
	// contains filtered or unexported methods
}

PublicCommitRangeGetter returns a piece of data for committing a range of messages.

type PublicCreateDecoderFunc

type PublicCreateDecoderFunc func(input io.Reader) (io.Reader, error)

type PublicMessage

type PublicMessage struct {
	empty.DoNotCopy

	SeqNo                int64
	CreatedAt            time.Time
	MessageGroupID       string
	WriteSessionMetadata map[string]string
	Offset               int64
	WrittenAt            time.Time
	ProducerID           string
	Metadata             map[string][]byte // Metadata, nil if no metadata

	UncompressedSize int // as sent by sender, server/sdk doesn't check the field. It may be empty or wrong.
	// contains filtered or unexported fields
}

PublicMessage is a representation of a topic message.

func MessageWithSetCommitRangeForTest

func MessageWithSetCommitRangeForTest(m *PublicMessage, commitRange CommitRange) *PublicMessage

func (*PublicMessage) Context

func (m *PublicMessage) Context() context.Context

func (*PublicMessage) PartitionID

func (m *PublicMessage) PartitionID() int64

func (*PublicMessage) Read

func (m *PublicMessage) Read(p []byte) (n int, err error)

Read implements io.Reader. It reads the uncompressed message content and returns topicreader.UnexpectedCodec if the message is compressed with an unknown codec.

The content of the message is released from memory after the first read error, including io.EOF.

func (*PublicMessage) Topic

func (m *PublicMessage) Topic() string

func (*PublicMessage) UnmarshalTo

UnmarshalTo can be called at most once per message. It reads all data from the internal reader and calls PublicMessageContentUnmarshaler.UnmarshalYDBTopicMessage with the uncompressed content.

type PublicMessageBuilder

type PublicMessageBuilder struct {
	// contains filtered or unexported fields
}

func NewPublicMessageBuilder

func NewPublicMessageBuilder() *PublicMessageBuilder

func (*PublicMessageBuilder) Build

func (pmb *PublicMessageBuilder) Build() *PublicMessage

Build returns the built message and resets the internal state so that a new message can be created.

func (*PublicMessageBuilder) CommitRange

func (*PublicMessageBuilder) Context

Context sets the message Context.

func (*PublicMessageBuilder) CreatedAt

func (pmb *PublicMessageBuilder) CreatedAt(createdAt time.Time) *PublicMessageBuilder

CreatedAt sets the message CreatedAt.

func (*PublicMessageBuilder) DataAndUncompressedSize

func (pmb *PublicMessageBuilder) DataAndUncompressedSize(data []byte) *PublicMessageBuilder

DataAndUncompressedSize sets the uncompressed message content and the UncompressedSize field.

func (*PublicMessageBuilder) MessageGroupID

func (pmb *PublicMessageBuilder) MessageGroupID(messageGroupID string) *PublicMessageBuilder

MessageGroupID sets the message MessageGroupID.

func (*PublicMessageBuilder) Metadata

func (pmb *PublicMessageBuilder) Metadata(metadata map[string][]byte) *PublicMessageBuilder

func (*PublicMessageBuilder) Offset

func (pmb *PublicMessageBuilder) Offset(offset int64) *PublicMessageBuilder

Offset sets the message Offset.

func (*PublicMessageBuilder) PartitionID

func (pmb *PublicMessageBuilder) PartitionID(partitionID int64) *PublicMessageBuilder

PartitionID sets the message PartitionID.

func (*PublicMessageBuilder) PartitionSession added in v3.76.4

func (pmb *PublicMessageBuilder) PartitionSession(session *PartitionSession) *PublicMessageBuilder

func (*PublicMessageBuilder) ProducerID

func (pmb *PublicMessageBuilder) ProducerID(producerID string) *PublicMessageBuilder

ProducerID sets the message ProducerID.

func (*PublicMessageBuilder) RawDataLen

func (pmb *PublicMessageBuilder) RawDataLen(val int) *PublicMessageBuilder

func (*PublicMessageBuilder) Seqno

func (pmb *PublicMessageBuilder) Seqno(seqNo int64) *PublicMessageBuilder

Seqno sets the message Seqno.

func (*PublicMessageBuilder) Topic

Topic sets the message Topic.

func (*PublicMessageBuilder) UncompressedSize

func (pmb *PublicMessageBuilder) UncompressedSize(uncompressedSize int) *PublicMessageBuilder

UncompressedSize sets the message UncompressedSize.

func (*PublicMessageBuilder) WriteSessionMetadata

func (pmb *PublicMessageBuilder) WriteSessionMetadata(writeSessionMetadata map[string]string) *PublicMessageBuilder

WriteSessionMetadata sets the message WriteSessionMetadata.

func (*PublicMessageBuilder) WrittenAt

func (pmb *PublicMessageBuilder) WrittenAt(writtenAt time.Time) *PublicMessageBuilder

WrittenAt sets the message WrittenAt.

type PublicMessageContentUnmarshaler

type PublicMessageContentUnmarshaler interface {
	// UnmarshalYDBTopicMessage MUST NOT use data after return.
	// If you need content after return from Consume - copy data content to
	// own slice with copy(dst, data)
	UnmarshalYDBTopicMessage(data []byte) error
}

PublicMessageContentUnmarshaler is an interface for unmarshaling message content.

type PublicPartitionSession

type PublicPartitionSession struct {
	// PartitionSessionID is unique session ID per listener object
	PartitionSessionID int64

	// TopicPath contains path for the topic
	TopicPath string

	// PartitionID contains partition id. It can be repeated for one reader if the partition will stop/start few times.
	PartitionID int64
}

PublicPartitionSession contains information about the partition session for the event.

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

type PublicReadSelector

type PublicReadSelector struct {
	Path       string
	Partitions []int64
	ReadFrom   time.Time     // zero value mean skip read from filter
	MaxTimeLag time.Duration // 0 mean skip time lag filter
}

func (PublicReadSelector) Clone

Clone creates a deep clone of the selector.

type RawTopicReaderStream added in v3.79.0

type RawTopicReaderStream interface {
	Recv() (rawtopicreader.ServerMessage, error)
	Send(msg rawtopicreader.ClientMessage) error
	CloseSend() error
}

type SendMessageToServerFunc added in v3.79.0

type SendMessageToServerFunc func(msg rawtopicreader.ClientMessage) error

type SyncedStream added in v3.79.0

type SyncedStream struct {
	// contains filtered or unexported fields
}

func NewSyncedStream added in v3.79.0

func NewSyncedStream(stream RawTopicReaderStream) *SyncedStream

func (*SyncedStream) CloseSend added in v3.79.0

func (s *SyncedStream) CloseSend() error

func (*SyncedStream) Recv added in v3.79.0

func (*SyncedStream) Send added in v3.79.0

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL