topicreaderinternal

package
v3.69.0-rc0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 29, 2024 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrCommitDisabled             = xerrors.Wrap(errors.New("ydb: commits disabled"))
	ErrWrongCommitOrderInSyncMode = xerrors.Wrap(errors.New("ydb: wrong commit order in sync mode"))
)
View Source
var ErrPublicUnexpectedCodec = errors.New("unexpected codec")

ErrPublicUnexpectedCodec return when try to read message content with unknown codec

View Source
var (
	PublicErrCommitSessionToExpiredSession = xerrors.Wrap(errors.New("ydb: commit to expired session"))
)

Functions

This section is empty.

Types

type CallbackWithMessageContentFunc

type CallbackWithMessageContentFunc func(data []byte) error

CallbackWithMessageContentFunc is callback function for work with message content data bytes MUST NOT be used after f returned if you need content longer - copy content to other slice

type CommitRanges

type CommitRanges struct {
	// contains filtered or unexported fields
}

func NewCommitRangesFromPublicCommits

func NewCommitRangesFromPublicCommits(ranges []PublicCommitRange) CommitRanges

func NewCommitRangesWithCapacity

func NewCommitRangesWithCapacity(capacity int) CommitRanges

func (*CommitRanges) Append

func (r *CommitRanges) Append(ranges ...PublicCommitRangeGetter)

func (*CommitRanges) AppendMessages

func (r *CommitRanges) AppendMessages(messages ...PublicMessage)

func (*CommitRanges) GetCommitsInfo added in v3.52.0

func (r *CommitRanges) GetCommitsInfo() []trace.TopicReaderStreamCommitInfo

GetCommitsInfo implements trace.TopicReaderStreamSendCommitMessageStartMessageInfo

func (*CommitRanges) Reset

func (r *CommitRanges) Reset()

type Pool

type Pool interface {
	Get() interface{}
	Put(x interface{})
}

Pool is interface for sync.Pool and may be extended by follow to original type

type PublicBatch

type PublicBatch struct {
	empty.DoNotCopy

	Messages []*PublicMessage
	// contains filtered or unexported fields
}

PublicBatch is ordered group of message from one partition

func (*PublicBatch) Context

func (m *PublicBatch) Context() context.Context

Context is cancelled when code should stop to process messages batch for example - lost connection to server or receive stop partition signal without graceful flag

func (*PublicBatch) PartitionID

func (m *PublicBatch) PartitionID() int64

PartitionID of messages in the batch

func (*PublicBatch) Topic

func (m *PublicBatch) Topic() string

Topic is path of source topic of the messages in the batch

type PublicCommitMode

type PublicCommitMode int
const (
	CommitModeAsync PublicCommitMode = iota // default
	CommitModeNone
	CommitModeSync
)

type PublicCommitRange

type PublicCommitRange struct {
	// contains filtered or unexported fields
}

PublicCommitRange contains data for commit messages range

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

type PublicCommitRangeGetter

type PublicCommitRangeGetter interface {
	// contains filtered or unexported methods
}

PublicCommitRangeGetter return data piece for commit messages range

type PublicCreateDecoderFunc

type PublicCreateDecoderFunc func(input io.Reader) (io.Reader, error)

type PublicGetPartitionStartOffsetFunc

type PublicGetPartitionStartOffsetFunc func(
	ctx context.Context,
	req PublicGetPartitionStartOffsetRequest,
) (res PublicGetPartitionStartOffsetResponse, err error)

PublicGetPartitionStartOffsetFunc callback function for optional manage read progress store at own side

type PublicGetPartitionStartOffsetRequest

type PublicGetPartitionStartOffsetRequest struct {
	Topic       string
	PartitionID int64
}

PublicGetPartitionStartOffsetRequest info about partition

type PublicGetPartitionStartOffsetResponse

type PublicGetPartitionStartOffsetResponse struct {
	// contains filtered or unexported fields
}

PublicGetPartitionStartOffsetResponse allow to set start offset for read messages for the partition

func (*PublicGetPartitionStartOffsetResponse) StartFrom

func (r *PublicGetPartitionStartOffsetResponse) StartFrom(offset int64)

StartFrom set start offset for read the partition

type PublicMessage

type PublicMessage struct {
	empty.DoNotCopy

	SeqNo                int64
	CreatedAt            time.Time
	MessageGroupID       string
	WriteSessionMetadata map[string]string
	Offset               int64
	WrittenAt            time.Time
	ProducerID           string
	Metadata             map[string][]byte // Metadata, nil if no metadata

	UncompressedSize int // as sent by sender, server/sdk doesn't check the field. It may be empty or wrong.
	// contains filtered or unexported fields
}

PublicMessage is representation of topic message

func (*PublicMessage) Context

func (m *PublicMessage) Context() context.Context

func (*PublicMessage) PartitionID

func (m *PublicMessage) PartitionID() int64

func (*PublicMessage) Read

func (m *PublicMessage) Read(p []byte) (n int, err error)

Read implements io.Reader Read uncompressed message content return topicreader.UnexpectedCodec if message compressed with unknown codec

func (*PublicMessage) Topic

func (m *PublicMessage) Topic() string

func (*PublicMessage) UnmarshalTo

UnmarshalTo can call most once per message, it read all data from internal reader and call PublicMessageContentUnmarshaler.UnmarshalYDBTopicMessage with uncompressed content

type PublicMessageBuilder added in v3.48.6

type PublicMessageBuilder struct {
	// contains filtered or unexported fields
}

func NewPublicMessageBuilder added in v3.48.6

func NewPublicMessageBuilder() *PublicMessageBuilder

func (*PublicMessageBuilder) Build added in v3.48.6

func (pmb *PublicMessageBuilder) Build() *PublicMessage

Build return builded message and reset internal state for create new message

func (*PublicMessageBuilder) Context added in v3.48.7

func (pmb *PublicMessageBuilder) Context(ctx context.Context)

Context set message Context

func (*PublicMessageBuilder) CreatedAt added in v3.48.6

func (pmb *PublicMessageBuilder) CreatedAt(createdAt time.Time) *PublicMessageBuilder

CreatedAt set message CreatedAt

func (*PublicMessageBuilder) DataAndUncompressedSize added in v3.48.6

func (pmb *PublicMessageBuilder) DataAndUncompressedSize(data []byte) *PublicMessageBuilder

DataAndUncompressedSize set message uncompressed content and field UncompressedSize

func (*PublicMessageBuilder) MessageGroupID added in v3.48.6

func (pmb *PublicMessageBuilder) MessageGroupID(messageGroupID string) *PublicMessageBuilder

MessageGroupID set message MessageGroupID

func (*PublicMessageBuilder) Metadata added in v3.54.3

func (pmb *PublicMessageBuilder) Metadata(metadata map[string][]byte) *PublicMessageBuilder

func (*PublicMessageBuilder) Offset added in v3.48.6

func (pmb *PublicMessageBuilder) Offset(offset int64) *PublicMessageBuilder

Offset set message Offset

func (*PublicMessageBuilder) PartitionID added in v3.48.7

func (pmb *PublicMessageBuilder) PartitionID(partitionID int64)

PartitionID set message PartitionID

func (*PublicMessageBuilder) ProducerID added in v3.48.6

func (pmb *PublicMessageBuilder) ProducerID(producerID string) *PublicMessageBuilder

ProducerID set message ProducerID

func (*PublicMessageBuilder) Seqno added in v3.48.6

func (pmb *PublicMessageBuilder) Seqno(seqNo int64) *PublicMessageBuilder

Seqno set message Seqno

func (*PublicMessageBuilder) Topic added in v3.48.7

func (pmb *PublicMessageBuilder) Topic(topic string)

Topic set message Topic

func (*PublicMessageBuilder) UncompressedSize added in v3.48.6

func (pmb *PublicMessageBuilder) UncompressedSize(uncompressedSize int) *PublicMessageBuilder

UncompressedSize set message UncompressedSize

func (*PublicMessageBuilder) WriteSessionMetadata added in v3.48.6

func (pmb *PublicMessageBuilder) WriteSessionMetadata(writeSessionMetadata map[string]string) *PublicMessageBuilder

WriteSessionMetadata set message WriteSessionMetadata

func (*PublicMessageBuilder) WrittenAt added in v3.48.6

func (pmb *PublicMessageBuilder) WrittenAt(writtenAt time.Time) *PublicMessageBuilder

WrittenAt set message WrittenAt

type PublicMessageContentUnmarshaler

type PublicMessageContentUnmarshaler interface {
	// UnmarshalYDBTopicMessage MUST NOT use data after return.
	// If you need content after return from Consume - copy data content to
	// own slice with copy(dst, data)
	UnmarshalYDBTopicMessage(data []byte) error
}

PublicMessageContentUnmarshaler is interface for unmarshal message content

type PublicReadBatchOption

type PublicReadBatchOption interface {
	Apply(options ReadMessageBatchOptions) ReadMessageBatchOptions
}

PublicReadBatchOption для различных пожеланий к батчу вроде WithMaxMessages(int)

type PublicReadSelector

type PublicReadSelector struct {
	Path       string
	Partitions []int64
	ReadFrom   time.Time     // zero value mean skip read from filter
	MaxTimeLag time.Duration // 0 mean skip time lag filter
}

func (PublicReadSelector) Clone

Clone create deep clone of the selector

type PublicReaderOption

type PublicReaderOption func(cfg *ReaderConfig)

func WithCredentials added in v3.37.8

func WithCredentials(cred credentials.Credentials) PublicReaderOption

func WithTrace added in v3.38.0

func WithTrace(tracer *trace.Topic) PublicReaderOption

type RawTopicReaderStream

type RawTopicReaderStream interface {
	Recv() (rawtopicreader.ServerMessage, error)
	Send(msg rawtopicreader.ClientMessage) error
	CloseSend() error
}

type ReadMessageBatchOptions

type ReadMessageBatchOptions struct {
	// contains filtered or unexported fields
}

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

func NewReader

func NewReader(
	connector TopicSteamReaderConnect,
	consumer string,
	readSelectors []PublicReadSelector,
	opts ...PublicReaderOption,
) (Reader, error)

func (*Reader) Close

func (r *Reader) Close(ctx context.Context) error

func (*Reader) Commit

func (r *Reader) Commit(ctx context.Context, offsets PublicCommitRangeGetter) (err error)

func (*Reader) CommitRanges

func (r *Reader) CommitRanges(ctx context.Context, ranges []PublicCommitRange) error

func (*Reader) ID added in v3.52.0

func (r *Reader) ID() int64

func (*Reader) ReadMessage

func (r *Reader) ReadMessage(ctx context.Context) (*PublicMessage, error)

ReadMessage read exactly one message

func (*Reader) ReadMessageBatch

func (r *Reader) ReadMessageBatch(ctx context.Context, opts ...PublicReadBatchOption) (batch *PublicBatch, err error)

ReadMessageBatch read batch of messages. Batch is collection of messages, which can be atomically committed

func (*Reader) Tracer added in v3.52.0

func (r *Reader) Tracer() *trace.Topic

func (*Reader) WaitInit added in v3.52.3

func (r *Reader) WaitInit(ctx context.Context) error

type ReaderConfig

type ReaderConfig struct {
	config.Common

	RetrySettings      topic.RetrySettings
	DefaultBatchConfig ReadMessageBatchOptions
	// contains filtered or unexported fields
}

func (*ReaderConfig) Validate added in v3.68.0

func (c *ReaderConfig) Validate() error

type TopicSteamReaderConnect

type TopicSteamReaderConnect func(connectionCtx context.Context) (RawTopicReaderStream, error)

TopicSteamReaderConnect connect to grpc stream when connectionCtx closed stream must stop work and return errors for all methods

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL