topicreaderinternal

package
v3.54.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 27, 2023 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrCommitDisabled             = xerrors.Wrap(errors.New("ydb: commits disabled"))
	ErrWrongCommitOrderInSyncMode = xerrors.Wrap(errors.New("ydb: wrong commit order in sync mode"))
)
View Source
var (
	PublicErrCommitSessionToExpiredSession = xerrors.Wrap(errors.New("ydb: commit to expired session"))
)
View Source
var PublicErrUnexpectedCodec = errors.New("unexpected codec") //nolint:revive,stylecheck

PublicErrUnexpectedCodec return when try to read message content with unknown codec

Functions

This section is empty.

Types

type CallbackWithMessageContentFunc

type CallbackWithMessageContentFunc func(data []byte) error

CallbackWithMessageContentFunc is callback function for work with message content data bytes MUST NOT be used after f returned if you need content longer - copy content to other slice

type CommitRanges

type CommitRanges struct {
	// contains filtered or unexported fields
}

func NewCommitRangesFromPublicCommits

func NewCommitRangesFromPublicCommits(ranges []PublicCommitRange) CommitRanges

func NewCommitRangesWithCapacity

func NewCommitRangesWithCapacity(capacity int) CommitRanges

func (*CommitRanges) Append

func (r *CommitRanges) Append(ranges ...PublicCommitRangeGetter)

func (*CommitRanges) AppendMessages

func (r *CommitRanges) AppendMessages(messages ...PublicMessage)

func (*CommitRanges) GetCommitsInfo added in v3.52.0

func (r *CommitRanges) GetCommitsInfo() []trace.TopicReaderStreamCommitInfo

GetCommitsInfo implements trace.TopicReaderStreamSendCommitMessageStartMessageInfo

func (*CommitRanges) Reset

func (r *CommitRanges) Reset()

type Pool

type Pool interface {
	Get() interface{}
	Put(x interface{})
}

Pool is interface for sync.Pool and may be extended by follow to original type

type PublicBatch

type PublicBatch struct {
	empty.DoNotCopy

	Messages []*PublicMessage
	// contains filtered or unexported fields
}

PublicBatch is ordered group of message from one partition

func (*PublicBatch) Context

func (m *PublicBatch) Context() context.Context

Context is cancelled when code should stop to process messages batch for example - lost connection to server or receive stop partition signal without graceful flag

func (*PublicBatch) PartitionID

func (m *PublicBatch) PartitionID() int64

PartitionID of messages in the batch

func (*PublicBatch) Topic

func (m *PublicBatch) Topic() string

Topic is path of source topic of the messages in the batch

type PublicCommitMode

type PublicCommitMode int
const (
	CommitModeAsync PublicCommitMode = iota // default
	CommitModeNone
	CommitModeSync
)

type PublicCommitRange

type PublicCommitRange struct {
	// contains filtered or unexported fields
}

PublicCommitRange contains data for commit messages range

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

type PublicCommitRangeGetter

type PublicCommitRangeGetter interface {
	// contains filtered or unexported methods
}

PublicCommitRangeGetter return data piece for commit messages range

type PublicCreateDecoderFunc

type PublicCreateDecoderFunc func(input io.Reader) (io.Reader, error)

type PublicGetPartitionStartOffsetFunc

type PublicGetPartitionStartOffsetFunc func(
	ctx context.Context,
	req PublicGetPartitionStartOffsetRequest,
) (res PublicGetPartitionStartOffsetResponse, err error)

PublicGetPartitionStartOffsetFunc callback function for optional manage read progress store at own side

type PublicGetPartitionStartOffsetRequest

type PublicGetPartitionStartOffsetRequest struct {
	Topic       string
	PartitionID int64
}

PublicGetPartitionStartOffsetRequest info about partition

type PublicGetPartitionStartOffsetResponse

type PublicGetPartitionStartOffsetResponse struct {
	// contains filtered or unexported fields
}

PublicGetPartitionStartOffsetResponse allow to set start offset for read messages for the partition

func (*PublicGetPartitionStartOffsetResponse) StartFrom

func (r *PublicGetPartitionStartOffsetResponse) StartFrom(offset int64)

StartFrom set start offset for read the partition

type PublicMessage

type PublicMessage struct {
	empty.DoNotCopy

	SeqNo                int64
	CreatedAt            time.Time
	MessageGroupID       string
	WriteSessionMetadata map[string]string
	Offset               int64
	WrittenAt            time.Time
	ProducerID           string

	UncompressedSize int // as sent by sender, server/sdk doesn't check the field. It may be empty or wrong.
	// contains filtered or unexported fields
}

PublicMessage is representation of topic message

func (*PublicMessage) Context

func (m *PublicMessage) Context() context.Context

func (*PublicMessage) PartitionID

func (m *PublicMessage) PartitionID() int64

func (*PublicMessage) Read

func (m *PublicMessage) Read(p []byte) (n int, err error)

Read implements io.Reader Read uncompressed message content return topicreader.UnexpectedCodec if message compressed with unknown codec

func (*PublicMessage) Topic

func (m *PublicMessage) Topic() string

func (*PublicMessage) UnmarshalTo

UnmarshalTo can call most once per message, it read all data from internal reader and call PublicMessageContentUnmarshaler.UnmarshalYDBTopicMessage with uncompressed content

type PublicMessageBuilder added in v3.48.6

type PublicMessageBuilder struct {
	// contains filtered or unexported fields
}

func NewPublicMessageBuilder added in v3.48.6

func NewPublicMessageBuilder() *PublicMessageBuilder

func (*PublicMessageBuilder) Build added in v3.48.6

func (pmb *PublicMessageBuilder) Build() *PublicMessage

Build return builded message and reset internal state for create new message

func (*PublicMessageBuilder) Context added in v3.48.7

func (pmb *PublicMessageBuilder) Context(ctx context.Context)

Context set message Context

func (*PublicMessageBuilder) CreatedAt added in v3.48.6

func (pmb *PublicMessageBuilder) CreatedAt(createdAt time.Time) *PublicMessageBuilder

CreatedAt set message CreatedAt

func (*PublicMessageBuilder) DataAndUncompressedSize added in v3.48.6

func (pmb *PublicMessageBuilder) DataAndUncompressedSize(data []byte) *PublicMessageBuilder

DataAndUncompressedSize set message uncompressed content and field UncompressedSize

func (*PublicMessageBuilder) MessageGroupID added in v3.48.6

func (pmb *PublicMessageBuilder) MessageGroupID(messageGroupID string) *PublicMessageBuilder

MessageGroupID set message MessageGroupID

func (*PublicMessageBuilder) Offset added in v3.48.6

func (pmb *PublicMessageBuilder) Offset(offset int64) *PublicMessageBuilder

Offset set message Offset

func (*PublicMessageBuilder) PartitionID added in v3.48.7

func (pmb *PublicMessageBuilder) PartitionID(partitionID int64)

PartitionID set message PartitionID

func (*PublicMessageBuilder) ProducerID added in v3.48.6

func (pmb *PublicMessageBuilder) ProducerID(producerID string) *PublicMessageBuilder

ProducerID set message ProducerID

func (*PublicMessageBuilder) Seqno added in v3.48.6

func (pmb *PublicMessageBuilder) Seqno(seqNo int64) *PublicMessageBuilder

Seqno set message Seqno

func (*PublicMessageBuilder) Topic added in v3.48.7

func (pmb *PublicMessageBuilder) Topic(topic string)

Topic set message Topic

func (*PublicMessageBuilder) UncompressedSize added in v3.48.6

func (pmb *PublicMessageBuilder) UncompressedSize(uncompressedSize int) *PublicMessageBuilder

UncompressedSize set message UncompressedSize

func (*PublicMessageBuilder) WriteSessionMetadata added in v3.48.6

func (pmb *PublicMessageBuilder) WriteSessionMetadata(writeSessionMetadata map[string]string) *PublicMessageBuilder

WriteSessionMetadata set message WriteSessionMetadata

func (*PublicMessageBuilder) WrittenAt added in v3.48.6

func (pmb *PublicMessageBuilder) WrittenAt(writtenAt time.Time) *PublicMessageBuilder

WrittenAt set message WrittenAt

type PublicMessageContentUnmarshaler

type PublicMessageContentUnmarshaler interface {
	// UnmarshalYDBTopicMessage MUST NOT use data after return.
	// If you need content after return from Consume - copy data content to
	// own slice with copy(dst, data)
	UnmarshalYDBTopicMessage(data []byte) error
}

PublicMessageContentUnmarshaler is interface for unmarshal message content

type PublicReadBatchOption

type PublicReadBatchOption interface {
	Apply(options ReadMessageBatchOptions) ReadMessageBatchOptions
}

PublicReadBatchOption для различных пожеланий к батчу вроде WithMaxMessages(int)

type PublicReadSelector

type PublicReadSelector struct {
	Path       string
	Partitions []int64
	ReadFrom   time.Time     // zero value mean skip read from filter
	MaxTimeLag time.Duration // 0 mean skip time lag filter
}

func (PublicReadSelector) Clone

Clone create deep clone of the selector

type PublicReaderOption

type PublicReaderOption func(cfg *ReaderConfig)

func WithCredentials added in v3.37.8

func WithCredentials(cred credentials.Credentials) PublicReaderOption

func WithTrace added in v3.38.0

func WithTrace(tracer *trace.Topic) PublicReaderOption

type RawTopicReaderStream

type RawTopicReaderStream interface {
	Recv() (rawtopicreader.ServerMessage, error)
	Send(msg rawtopicreader.ClientMessage) error
	CloseSend() error
}

type ReadMessageBatchOptions

type ReadMessageBatchOptions struct {
	// contains filtered or unexported fields
}

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

func NewReader

func NewReader(
	connector TopicSteamReaderConnect,
	consumer string,
	readSelectors []PublicReadSelector,
	opts ...PublicReaderOption,
) Reader

func (*Reader) Close

func (r *Reader) Close(ctx context.Context) error

func (*Reader) Commit

func (r *Reader) Commit(ctx context.Context, offsets PublicCommitRangeGetter) (err error)

func (*Reader) CommitRanges

func (r *Reader) CommitRanges(ctx context.Context, ranges []PublicCommitRange) error

func (*Reader) ID added in v3.52.0

func (r *Reader) ID() int64

func (*Reader) ReadMessage

func (r *Reader) ReadMessage(ctx context.Context) (*PublicMessage, error)

ReadMessage read exactly one message

func (*Reader) ReadMessageBatch

func (r *Reader) ReadMessageBatch(ctx context.Context, opts ...PublicReadBatchOption) (batch *PublicBatch, err error)

ReadMessageBatch read batch of messages. Batch is collection of messages, which can be atomically committed

func (*Reader) Tracer added in v3.52.0

func (r *Reader) Tracer() *trace.Topic

func (*Reader) WaitInit added in v3.52.3

func (r *Reader) WaitInit(ctx context.Context) error

type ReaderConfig

type ReaderConfig struct {
	config.Common

	RetrySettings      topic.RetrySettings
	DefaultBatchConfig ReadMessageBatchOptions
	// contains filtered or unexported fields
}

type TopicSteamReaderConnect

type TopicSteamReaderConnect func(connectionCtx context.Context) (RawTopicReaderStream, error)

TopicSteamReaderConnect connect to grpc stream when connectionCtx closed stream must stop work and return errors for all methods

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL