topicwriterinternal

package
v3.80.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 25, 2024 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	PublicErrQueueIsFull = xerrors.Wrap(errors.New("ydb: queue is full"))
)

Functions

This section is empty.

Types

type ConnectFunc

type ConnectFunc func(ctx context.Context) (RawTopicWriterStream, error)

type EncoderMap

type EncoderMap struct {
	// contains filtered or unexported fields
}

func NewEncoderMap

func NewEncoderMap() *EncoderMap

func (*EncoderMap) AddEncoder

func (e *EncoderMap) AddEncoder(codec rawtopiccommon.Codec, creator PublicCreateEncoderFunc)

func (*EncoderMap) CreateLazyEncodeWriter

func (e *EncoderMap) CreateLazyEncodeWriter(codec rawtopiccommon.Codec, target io.Writer) (io.WriteCloser, error)

func (*EncoderMap) GetSupportedCodecs

func (e *EncoderMap) GetSupportedCodecs() rawtopiccommon.SupportedCodecs

func (*EncoderMap) IsSupported

func (e *EncoderMap) IsSupported(codec rawtopiccommon.Codec) bool

type EncoderSelector

type EncoderSelector struct {
	// contains filtered or unexported fields
}

EncoderSelector is not thread-safe.

func NewEncoderSelector

func NewEncoderSelector(
	m *EncoderMap,
	allowedCodecs rawtopiccommon.SupportedCodecs,
	parallelCompressors int,
	tracer *trace.Topic,
	writerReconnectorID, sessionID string,
) EncoderSelector

func (*EncoderSelector) CompressMessages

func (s *EncoderSelector) CompressMessages(messages []messageWithDataContent) (rawtopiccommon.Codec, error)

func (*EncoderSelector) ResetAllowedCodecs

func (s *EncoderSelector) ResetAllowedCodecs(allowedCodecs rawtopiccommon.SupportedCodecs)

type InitialInfo

type InitialInfo struct {
	LastSeqNum int64
}

type MessageQueueAckWaiter

type MessageQueueAckWaiter struct {
	// contains filtered or unexported fields
}

func (*MessageQueueAckWaiter) AddWaitIndex

func (m *MessageQueueAckWaiter) AddWaitIndex(index int)

type PublicCreateEncoderFunc

type PublicCreateEncoderFunc func(writer io.Writer) (io.WriteCloser, error)

type PublicFuturePartitioning

type PublicFuturePartitioning struct {
	// contains filtered or unexported fields
}

PublicFuturePartitioning will be published in the future, after the server implementation is completed.

func NewPartitioningWithMessageGroupID

func NewPartitioningWithMessageGroupID(id string) PublicFuturePartitioning

func NewPartitioningWithPartitionID

func NewPartitioningWithPartitionID(id int64) PublicFuturePartitioning

func (PublicFuturePartitioning) ToRaw

type PublicMessage

type PublicMessage struct {
	SeqNo     int64
	CreatedAt time.Time
	Data      io.Reader
	Metadata  map[string][]byte
	// contains filtered or unexported fields
}

type PublicOnWriterInitResponseCallback

type PublicOnWriterInitResponseCallback func(info PublicWithOnWriterConnectedInfo) error

type PublicWithOnWriterConnectedInfo

type PublicWithOnWriterConnectedInfo struct {
	LastSeqNo        int64
	SessionID        string
	PartitionID      int64
	CodecsFromServer []topictypes.Codec
	AllowedCodecs    []topictypes.Codec // Intersection of the codecs from the server and the codecs supported by the writer
}

type PublicWriterOption

type PublicWriterOption func(cfg *WriterReconnectorConfig)

func WithAddEncoder

func WithAddEncoder(codec rawtopiccommon.Codec, encoderFunc PublicCreateEncoderFunc) PublicWriterOption

func WithAutoCodec

func WithAutoCodec() PublicWriterOption

func WithAutoSetSeqNo

func WithAutoSetSeqNo(val bool) PublicWriterOption

func WithAutosetCreatedTime

func WithAutosetCreatedTime(enable bool) PublicWriterOption

func WithClock

func WithClock(clock clockwork.Clock) PublicWriterOption

WithClock is a private option for tests.

func WithCodec

func WithCodec(codec rawtopiccommon.Codec) PublicWriterOption

func WithCompressorCount

func WithCompressorCount(num int) PublicWriterOption

func WithConnectFunc

func WithConnectFunc(connect ConnectFunc) PublicWriterOption

func WithConnectTimeout

func WithConnectTimeout(timeout time.Duration) PublicWriterOption

func WithCredentials

func WithCredentials(cred credentials.Credentials) PublicWriterOption

WithCredentials is for internal usage only; it is not proxied to the public interface.

func WithMaxQueueLen

func WithMaxQueueLen(num int) PublicWriterOption

func WithPartitioning

func WithPartitioning(partitioning PublicFuturePartitioning) PublicWriterOption

func WithProducerID

func WithProducerID(producerID string) PublicWriterOption

func WithSessionMeta

func WithSessionMeta(meta map[string]string) PublicWriterOption

func WithStartTimeout

func WithStartTimeout(timeout time.Duration) PublicWriterOption

func WithTokenUpdateInterval

func WithTokenUpdateInterval(interval time.Duration) PublicWriterOption

func WithTopic

func WithTopic(topic string) PublicWriterOption

func WithTrace

func WithTrace(tracer *trace.Topic) PublicWriterOption

func WithWaitAckOnWrite

func WithWaitAckOnWrite(val bool) PublicWriterOption

type RawTopicWriterStream

type RawTopicWriterStream interface {
	Recv() (rawtopicwriter.ServerMessage, error)
	Send(mess rawtopicwriter.ClientMessage) error
	CloseSend() error
}

type SingleStreamWriter

type SingleStreamWriter struct {
	Encoder EncoderSelector

	CodecsFromServer rawtopiccommon.SupportedCodecs

	SessionID string

	ReceivedLastSeqNum int64
	PartitionID        int64

	LastSeqNumRequested bool
	// contains filtered or unexported fields
}

func NewSingleStreamWriter

func NewSingleStreamWriter(
	ctxForPProfLabelsOnly context.Context,
	cfg SingleStreamWriterConfig,
) (*SingleStreamWriter, error)

func (*SingleStreamWriter) WaitClose

func (w *SingleStreamWriter) WaitClose(ctx context.Context) error

type SingleStreamWriterConfig

type SingleStreamWriterConfig struct {
	WritersCommonConfig
	// contains filtered or unexported fields
}

type StreamWriter

type StreamWriter interface {
	Write(ctx context.Context, messages []PublicMessage) error
	WaitInit(ctx context.Context) (info InitialInfo, err error)
	Close(ctx context.Context) error
	Flush(ctx context.Context) error
}

type WriterReconnector

type WriterReconnector struct {
	// contains filtered or unexported fields
}

func NewWriterReconnector

func NewWriterReconnector(
	cfg WriterReconnectorConfig,
) (*WriterReconnector, error)

func (*WriterReconnector) Close

func (w *WriterReconnector) Close(ctx context.Context) error

func (*WriterReconnector) Flush

func (w *WriterReconnector) Flush(ctx context.Context) error

func (*WriterReconnector) GetSessionID

func (w *WriterReconnector) GetSessionID() (sessionID string)

func (*WriterReconnector) WaitInit

func (w *WriterReconnector) WaitInit(ctx context.Context) (info InitialInfo, err error)

func (*WriterReconnector) Write

func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage) error

type WriterReconnectorConfig

type WriterReconnectorConfig struct {
	WritersCommonConfig

	MaxMessageSize               int
	MaxQueueLen                  int
	Common                       config.Common
	AdditionalEncoders           map[rawtopiccommon.Codec]PublicCreateEncoderFunc
	Connect                      ConnectFunc
	WaitServerAck                bool
	AutoSetSeqNo                 bool
	AutoSetCreatedTime           bool
	OnWriterInitResponseCallback PublicOnWriterInitResponseCallback
	RetrySettings                topic.RetrySettings
	// contains filtered or unexported fields
}

func NewWriterReconnectorConfig

func NewWriterReconnectorConfig(options ...PublicWriterOption) WriterReconnectorConfig

type WriterWithTransaction

type WriterWithTransaction struct {
	// contains filtered or unexported fields
}

func NewTopicWriterTransaction

func NewTopicWriterTransaction(w *WriterReconnector, tx tx.Transaction, tracer *trace.Topic) *WriterWithTransaction

func (*WriterWithTransaction) Close

func (*WriterWithTransaction) WaitInit

func (w *WriterWithTransaction) WaitInit(ctx context.Context) (info InitialInfo, err error)

func (*WriterWithTransaction) Write

func (w *WriterWithTransaction) Write(ctx context.Context, messages ...PublicMessage) error

type WritersCommonConfig

type WritersCommonConfig struct {
	Tracer *trace.Topic
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL