Documentation ¶
Index ¶
- Constants
- Variables
- type AlterOption
- func AlterConsumerWithAttributes(name string, attributes map[string]string) AlterOption
- func AlterConsumerWithImportant(name string, important bool) AlterOption
- func AlterConsumerWithReadFrom(name string, readFrom time.Time) AlterOption
- func AlterConsumerWithSupportedCodecs(name string, codecs []topictypes.Codec) AlterOption
- func AlterWithAddConsumers(consumers ...topictypes.Consumer) AlterOption
- func AlterWithAttributes(attributes map[string]string) AlterOption
- func AlterWithDropConsumers(consumersName ...string) AlterOption
- func AlterWithMeteringMode(m topictypes.MeteringMode) AlterOption
- func AlterWithMinActivePartitions(minActivePartitions int64) AlterOption
- func AlterWithPartitionCountLimit(partitionCountLimit int64) AlterOption
- func AlterWithPartitionWriteBurstBytes(burstBytes int64) AlterOption
- func AlterWithPartitionWriteSpeedBytesPerSecond(bytesPerSecond int64) AlterOption
- func AlterWithRetentionPeriod(retentionPeriod time.Duration) AlterOption
- func AlterWithRetentionStorageMB(retentionStorageMB int64) AlterOption
- func AlterWithSupportedCodecs(codecs ...topictypes.Codec) AlterOption
- type CheckErrorRetryArgs
- type CheckErrorRetryFunction
- type CheckErrorRetryResult
- type CommitMode
- type CreateDecoderFunc
- type CreateEncoderFunc
- type CreateOption
- func CreateWithAttributes(attributes map[string]string) CreateOption
- func CreateWithConsumer(consumers ...topictypes.Consumer) CreateOption
- func CreateWithMeteringMode(mode topictypes.MeteringMode) CreateOption
- func CreateWithMinActivePartitions(count int64) CreateOption
- func CreateWithPartitionCountLimit(count int64) CreateOption
- func CreateWithPartitionWriteBurstBytes(partitionWriteBurstBytes int64) CreateOption
- func CreateWithPartitionWriteSpeedBytesPerSecond(partitionWriteSpeedBytesPerSecond int64) CreateOption
- func CreateWithRetentionPeriod(retentionPeriod time.Duration) CreateOption
- func CreateWithRetentionStorageMB(retentionStorageMB int64) CreateOption
- func CreateWithSupportedCodecs(codecs ...topictypes.Codec) CreateOption
- type DescribeOption
- type DropOption
- type GetPartitionStartOffsetFunc
- type GetPartitionStartOffsetRequest
- type GetPartitionStartOffsetResponse
- type OnWriterInitResponseCallback
- type ReadSelector
- type ReadSelectors
- type ReaderOption
- func WithAddDecoder(codec topictypes.Codec, decoderCreate CreateDecoderFunc) ReaderOption
- func WithBatchReadMaxCount(count int) ReaderOption
- func WithBatchReadMinCount(count int) ReaderOption (deprecated)
- func WithCommitCountTrigger(count int) ReaderOption
- func WithCommitMode(mode CommitMode) ReaderOption
- func WithCommitTimeLagTrigger(lag time.Duration) ReaderOption
- func WithCommonConfig(common config.Common) ReaderOption
- func WithGetPartitionStartOffset(f GetPartitionStartOffsetFunc) ReaderOption
- func WithMessagesBufferSize(size int) ReaderOption
- func WithReaderBatchMaxCount(count int) ReaderOption
- func WithReaderBufferSizeBytes(size int) ReaderOption
- func WithReaderCheckRetryErrorFunction(callback CheckErrorRetryFunction) ReaderOption
- func WithReaderCommitCountTrigger(count int) ReaderOption
- func WithReaderCommitMode(mode CommitMode) ReaderOption
- func WithReaderCommitTimeLagTrigger(lag time.Duration) ReaderOption
- func WithReaderGetPartitionStartOffset(f GetPartitionStartOffsetFunc) ReaderOption
- func WithReaderOperationCancelAfter(cancelAfter time.Duration) ReaderOption
- func WithReaderOperationTimeout(timeout time.Duration) ReaderOption
- func WithReaderStartTimeout(timeout time.Duration) ReaderOption
- func WithReaderTrace(t trace.Topic) ReaderOption
- func WithReaderUpdateTokenInterval(interval time.Duration) ReaderOption
- type TopicOption
- type WithOnWriterConnectedInfo
- type WriteSessionMetadata
- type WriterOption
- func WithCodec(codec topictypes.Codec) WriterOption
- func WithCodecAutoSelect() WriterOption
- func WithOnWriterFirstConnected(f OnWriterInitResponseCallback) WriterOption
- func WithPartitionID(partitionID int64) WriterOption
- func WithProducerID(producerID string) WriterOption
- func WithSyncWrite(sync bool) WriterOption
- func WithWriteSessionMeta(meta map[string]string) WriterOption
- func WithWriterAddEncoder(codec topictypes.Codec, f CreateEncoderFunc) WriterOption
- func WithWriterCheckRetryErrorFunction(callback CheckErrorRetryFunction) WriterOption
- func WithWriterCodec(codec topictypes.Codec) WriterOption
- func WithWriterCodecAutoSelect() WriterOption
- func WithWriterCompressorCount(num int) WriterOption
- func WithWriterMaxQueueLen(num int) WriterOption
- func WithWriterMessageMaxBytesSize(size int) WriterOption
- func WithWriterPartitionID(partitionID int64) WriterOption
- func WithWriterProducerID(producerID string) WriterOption
- func WithWriterSessionMeta(meta map[string]string) WriterOption
- func WithWriterSetAutoCreatedAt(val bool) WriterOption
- func WithWriterSetAutoSeqNo(val bool) WriterOption
- func WithWriterStartTimeout(timeout time.Duration) WriterOption
- func WithWriterTrace(t trace.Topic) WriterOption
- func WithWriterUpdateTokenInterval(interval time.Duration) WriterOption
- func WithWriterWaitServerAck(wait bool) WriterOption
Examples ¶
Constants ¶
const ( // CommitModeAsync - commit return true if commit success add to internal send buffer (but not sent to server) // now it is grpc buffer, in feature it may be internal sdk buffer CommitModeAsync = topicreaderinternal.CommitModeAsync // default // CommitModeNone - reader will not be commit operation CommitModeNone = topicreaderinternal.CommitModeNone // CommitModeSync - commit return true when sdk receive ack of commit from server // The mode needs strong ordering client code for prevent deadlock. // Example: // Good: // CommitOffset(1) // CommitOffset(2) // // Error: // CommitOffset(2) - server will wait commit offset 1 before send ack about offset 1 and 2 committed. // CommitOffset(1) // SDK will detect the problem and return error instead of deadlock. CommitModeSync = topicreaderinternal.CommitModeSync )
Variables ¶
var ( CheckErrorRetryDecisionDefault = topic.PublicRetryDecisionDefault // Apply default behavior for the error CheckErrorRetryDecisionRetry = topic.PublicRetryDecisionRetry // Do once more retry CheckErrorRetryDecisionStop = topic.PublicRetryDecisionStop // Do not retry )
Functions ¶
This section is empty.
Types ¶
type AlterOption ¶
type AlterOption interface {
ApplyAlterOption(req *rawtopic.AlterTopicRequest)
}
AlterOption type of options for change topic settings
func AlterConsumerWithAttributes ¶
func AlterConsumerWithAttributes(name string, attributes map[string]string) AlterOption
AlterConsumerWithAttributes change attributes of the consumer
func AlterConsumerWithImportant ¶
func AlterConsumerWithImportant(name string, important bool) AlterOption
AlterConsumerWithImportant set/remove important flag for the consumer of topic
func AlterConsumerWithReadFrom ¶
func AlterConsumerWithReadFrom(name string, readFrom time.Time) AlterOption
AlterConsumerWithReadFrom change min time of messages, received for the topic
func AlterConsumerWithSupportedCodecs ¶
func AlterConsumerWithSupportedCodecs(name string, codecs []topictypes.Codec) AlterOption
AlterConsumerWithSupportedCodecs change codecs, supported by the consumer
func AlterWithAddConsumers ¶
func AlterWithAddConsumers(consumers ...topictypes.Consumer) AlterOption
AlterWithAddConsumers add consumer to the topic
func AlterWithAttributes ¶
func AlterWithAttributes(attributes map[string]string) AlterOption
AlterWithAttributes change attributes map of topic
func AlterWithDropConsumers ¶
func AlterWithDropConsumers(consumersName ...string) AlterOption
AlterWithDropConsumers drop consumer from the topic
func AlterWithMeteringMode ¶ added in v3.38.2
func AlterWithMeteringMode(m topictypes.MeteringMode) AlterOption
AlterWithMeteringMode change metering mode for topic (need for serverless installations)
func AlterWithMinActivePartitions ¶
func AlterWithMinActivePartitions(minActivePartitions int64) AlterOption
AlterWithMinActivePartitions change min active partitions of the topic
func AlterWithPartitionCountLimit ¶
func AlterWithPartitionCountLimit(partitionCountLimit int64) AlterOption
AlterWithPartitionCountLimit change partition count limit of the topic
func AlterWithPartitionWriteBurstBytes ¶
func AlterWithPartitionWriteBurstBytes(burstBytes int64) AlterOption
AlterWithPartitionWriteBurstBytes change burst size for write to partition of topic
func AlterWithPartitionWriteSpeedBytesPerSecond ¶
func AlterWithPartitionWriteSpeedBytesPerSecond(bytesPerSecond int64) AlterOption
AlterWithPartitionWriteSpeedBytesPerSecond change limit of write speed for partitions of the topic
func AlterWithRetentionPeriod ¶
func AlterWithRetentionPeriod(retentionPeriod time.Duration) AlterOption
AlterWithRetentionPeriod change retention period of topic
func AlterWithRetentionStorageMB ¶
func AlterWithRetentionStorageMB(retentionStorageMB int64) AlterOption
AlterWithRetentionStorageMB change retention storage size in MB.
func AlterWithSupportedCodecs ¶
func AlterWithSupportedCodecs(codecs ...topictypes.Codec) AlterOption
AlterWithSupportedCodecs change set of codec, allowed for the topic
type CheckErrorRetryArgs ¶ added in v3.42.0
type CheckErrorRetryArgs = topic.PublicCheckErrorRetryArgs
type CheckErrorRetryFunction ¶ added in v3.42.0
type CheckErrorRetryFunction = topic.PublicCheckErrorRetryFunction
type CheckErrorRetryResult ¶ added in v3.42.0
type CheckErrorRetryResult = topic.PublicCheckRetryResult
type CommitMode ¶
type CommitMode = topicreaderinternal.PublicCommitMode
CommitMode variants of commit mode of the reader
type CreateDecoderFunc ¶
type CreateDecoderFunc = topicreaderinternal.PublicCreateDecoderFunc
CreateDecoderFunc interface for a factory of message decoders
type CreateEncoderFunc ¶ added in v3.38.0
type CreateEncoderFunc = topicwriterinternal.PublicCreateEncoderFunc
CreateEncoderFunc is used to create message encoders
type CreateOption ¶
type CreateOption interface {
ApplyCreateOption(request *rawtopic.CreateTopicRequest)
}
CreateOption type for options of topic create
func CreateWithAttributes ¶
func CreateWithAttributes(attributes map[string]string) CreateOption
CreateWithAttributes set attributes for the topic.
func CreateWithConsumer ¶
func CreateWithConsumer(consumers ...topictypes.Consumer) CreateOption
CreateWithConsumer create new consumers with the topic
func CreateWithMeteringMode ¶ added in v3.38.2
func CreateWithMeteringMode(mode topictypes.MeteringMode) CreateOption
CreateWithMeteringMode set metering mode for the topic
func CreateWithMinActivePartitions ¶
func CreateWithMinActivePartitions(count int64) CreateOption
CreateWithMinActivePartitions set min active partitions for the topic
func CreateWithPartitionCountLimit ¶
func CreateWithPartitionCountLimit(count int64) CreateOption
CreateWithPartitionCountLimit set partition count limit for the topic
func CreateWithPartitionWriteBurstBytes ¶
func CreateWithPartitionWriteBurstBytes(partitionWriteBurstBytes int64) CreateOption
CreateWithPartitionWriteBurstBytes set burst limit for partitions of the topic
func CreateWithPartitionWriteSpeedBytesPerSecond ¶
func CreateWithPartitionWriteSpeedBytesPerSecond(partitionWriteSpeedBytesPerSecond int64) CreateOption
CreateWithPartitionWriteSpeedBytesPerSecond set write size limit for partitions of the topic
func CreateWithRetentionPeriod ¶
func CreateWithRetentionPeriod(retentionPeriod time.Duration) CreateOption
CreateWithRetentionPeriod set retention time interval for the topic.
func CreateWithRetentionStorageMB ¶
func CreateWithRetentionStorageMB(retentionStorageMB int64) CreateOption
CreateWithRetentionStorageMB set retention size for the topic.
func CreateWithSupportedCodecs ¶ added in v3.38.2
func CreateWithSupportedCodecs(codecs ...topictypes.Codec) CreateOption
CreateWithSupportedCodecs set supported codecs for the topic
type DescribeOption ¶
type DescribeOption func(req *rawtopic.DescribeTopicRequest)
DescribeOption type for options of describe method. Not used now.
type DropOption ¶
type DropOption interface {
ApplyDropOption(request *rawtopic.DropTopicRequest)
}
DropOption type for drop options. Not used now.
type GetPartitionStartOffsetFunc ¶
type GetPartitionStartOffsetFunc = topicreaderinternal.PublicGetPartitionStartOffsetFunc
GetPartitionStartOffsetFunc is a callback function for optionally handling the start partition event and managing read progress on your own side. It can be called multiple times in parallel.
type GetPartitionStartOffsetRequest ¶
type GetPartitionStartOffsetRequest = topicreaderinternal.PublicGetPartitionStartOffsetRequest
GetPartitionStartOffsetRequest info about the partition
type GetPartitionStartOffsetResponse ¶
type GetPartitionStartOffsetResponse = topicreaderinternal.PublicGetPartitionStartOffsetResponse
GetPartitionStartOffsetResponse optionally sets the offset from which to start reading messages for the partition
type OnWriterInitResponseCallback ¶ added in v3.38.0
type OnWriterInitResponseCallback = topicwriterinternal.PublicOnWriterInitResponseCallback
OnWriterInitResponseCallback Deprecated: (was experimental) will be removed soon.
type ReadSelector ¶
type ReadSelector = topicreaderinternal.PublicReadSelector
ReadSelector sets rules for the reader: set of topics, partitions, start time filter, etc.
type ReadSelectors ¶
type ReadSelectors []ReadSelector
ReadSelectors slice of rules for topic reader
func ReadTopic ¶
func ReadTopic(path string) ReadSelectors
ReadTopic creates a simple selector for reading a topic when no special settings are needed.
type ReaderOption ¶
type ReaderOption = topicreaderinternal.PublicReaderOption
ReaderOption options for topic reader
func WithAddDecoder ¶
func WithAddDecoder(codec topictypes.Codec, decoderCreate CreateDecoderFunc) ReaderOption
WithAddDecoder adds a decoder for a codec. It allows setting a decoder factory for a custom codec and replacing internal decoders.
func WithBatchReadMaxCount ¶
func WithBatchReadMaxCount(count int) ReaderOption
WithBatchReadMaxCount Deprecated: (was experimental) will be removed soon. Use WithReaderBatchMaxCount instead.
func WithBatchReadMinCount
deprecated
func WithBatchReadMinCount(count int) ReaderOption
WithBatchReadMinCount sets the preferred minimum count of messages in a batch. Sometimes a batch can contain fewer messages, for example if the local buffer is full and the SDK can't receive more messages.
Deprecated: (was experimental) the method will be removed soon.
The option will be removed for simplify code internals
func WithCommitCountTrigger ¶
func WithCommitCountTrigger(count int) ReaderOption
WithCommitCountTrigger Deprecated: (was experimental) will be removed soon. Use WithReaderCommitCountTrigger instead
func WithCommitMode ¶
func WithCommitMode(mode CommitMode) ReaderOption
WithCommitMode Deprecated: (was experimental) will be removed soon. Use WithReaderCommitMode instead.
func WithCommitTimeLagTrigger ¶
func WithCommitTimeLagTrigger(lag time.Duration) ReaderOption
WithCommitTimeLagTrigger Deprecated: (was experimental) will be removed soon. Use WithReaderCommitTimeLagTrigger instead
func WithCommonConfig ¶
func WithCommonConfig(common config.Common) ReaderOption
WithCommonConfig
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func WithGetPartitionStartOffset ¶
func WithGetPartitionStartOffset(f GetPartitionStartOffsetFunc) ReaderOption
WithGetPartitionStartOffset Deprecated: (was experimental) will be removed soon. Use WithReaderGetPartitionStartOffset instead
func WithMessagesBufferSize ¶
func WithMessagesBufferSize(size int) ReaderOption
WithMessagesBufferSize Deprecated: (was experimental) will be removed soon. Use WithReaderBufferSizeBytes instead.
func WithReaderBatchMaxCount ¶ added in v3.52.3
func WithReaderBatchMaxCount(count int) ReaderOption
WithReaderBatchMaxCount set max messages count, returned by topic.TopicReader.ReadBatch method
func WithReaderBufferSizeBytes ¶ added in v3.52.3
func WithReaderBufferSizeBytes(size int) ReaderOption
WithReaderBufferSizeBytes set size of internal buffer for read ahead messages.
func WithReaderCheckRetryErrorFunction ¶ added in v3.42.0
func WithReaderCheckRetryErrorFunction(callback CheckErrorRetryFunction) ReaderOption
WithReaderCheckRetryErrorFunction can override the default error retry policy. Use CheckErrorRetryDecisionDefault to apply the default behavior for the error. The callback func must be fast and deterministic: it must always return the same result for the same error, because it can be called several times for every error.
Example ¶
var db *ydb.Driver reader, err := db.Topic().StartReader( "consumer", topicoptions.ReadTopic("topic"), topicoptions.WithReaderCheckRetryErrorFunction( func(errInfo topicoptions.CheckErrorRetryArgs) topicoptions.CheckErrorRetryResult { // Retry not found operations if ydb.IsOperationErrorNotFoundError(errInfo.Error) { return topicoptions.CheckErrorRetryDecisionRetry } // and use default behavior for all other errors return topicoptions.CheckErrorRetryDecisionDefault }), ) _, _ = reader, err
Output:
func WithReaderCommitCountTrigger ¶ added in v3.52.3
func WithReaderCommitCountTrigger(count int) ReaderOption
WithReaderCommitCountTrigger sets the count trigger for sending a batch of commits to the server. If count > 0 and the SDK's count of buffered commits >= count, a commit request is sent to the server. 0 means no count limit; only the time lag trigger is used.
func WithReaderCommitMode ¶ added in v3.52.3
func WithReaderCommitMode(mode CommitMode) ReaderOption
WithReaderCommitMode set commit mode to the reader
func WithReaderCommitTimeLagTrigger ¶ added in v3.52.3
func WithReaderCommitTimeLagTrigger(lag time.Duration) ReaderOption
WithReaderCommitTimeLagTrigger sets the time lag from the first commit message before the commit is sent to the server, in order to accumulate many similar-time commits into one server request. 0 means no additional lag: the commit is sent as soon as possible. Default value: 1 second.
func WithReaderGetPartitionStartOffset ¶ added in v3.52.3
func WithReaderGetPartitionStartOffset(f GetPartitionStartOffsetFunc) ReaderOption
WithReaderGetPartitionStartOffset set optional handler for own manage progress of read partitions instead of/additional to commit messages
func WithReaderOperationCancelAfter ¶
func WithReaderOperationCancelAfter(cancelAfter time.Duration) ReaderOption
WithReaderOperationCancelAfter
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func WithReaderOperationTimeout ¶
func WithReaderOperationTimeout(timeout time.Duration) ReaderOption
WithReaderOperationTimeout
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func WithReaderStartTimeout ¶ added in v3.42.0
func WithReaderStartTimeout(timeout time.Duration) ReaderOption
WithReaderStartTimeout mean timeout for connect to reader stream and work some time without errors
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func WithReaderTrace ¶ added in v3.32.0
func WithReaderTrace(t trace.Topic) ReaderOption
WithReaderTrace set tracer for the topic reader
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func WithReaderUpdateTokenInterval ¶ added in v3.41.0
func WithReaderUpdateTokenInterval(interval time.Duration) ReaderOption
WithReaderUpdateTokenInterval set custom interval for send update token message to the server
type TopicOption ¶
TopicOption
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func WithOperationCancelAfter ¶
func WithOperationCancelAfter(operationCancelAfter time.Duration) TopicOption
WithOperationCancelAfter set the maximum amount of time a YDB server will process an operation. After timeout exceeds YDB will try to cancel operation and if it succeeds appropriate error will be returned to the client; otherwise processing will be continued. If OperationCancelAfter is zero then no timeout is used.
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func WithOperationTimeout ¶
func WithOperationTimeout(operationTimeout time.Duration) TopicOption
WithOperationTimeout set the maximum amount of time a YDB server will process an operation. After timeout exceeds YDB will try to cancel operation and regardless of the cancellation appropriate error will be returned to the client. If OperationTimeout is zero then no timeout is used.
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func WithTrace ¶
func WithTrace(trace trace.Topic, opts ...trace.TopicComposeOption) TopicOption
WithTrace defines trace over persqueue client calls
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
type WithOnWriterConnectedInfo ¶ added in v3.38.0
type WithOnWriterConnectedInfo = topicwriterinternal.PublicWithOnWriterConnectedInfo
WithOnWriterConnectedInfo present information, received from server Deprecated: (was experimental) will be removed soon
type WriteSessionMetadata ¶ added in v3.38.0
WriteSessionMetadata sets key-value metadata for the write session. The metadata will be available for messages of the session in the topic reader.
type WriterOption ¶ added in v3.38.0
type WriterOption = topicwriterinternal.PublicWriterOption
WriterOption options for a topic writer
func WithCodec ¶ added in v3.38.0
func WithCodec(codec topictypes.Codec) WriterOption
WithCodec Deprecated: (was experimental) will be removed soon. Use WithWriterCodec instead
func WithCodecAutoSelect ¶ added in v3.38.0
func WithCodecAutoSelect() WriterOption
WithCodecAutoSelect Deprecated: (was experimental) will be removed soon. Use WithWriterCodecAutoSelect instead.
func WithOnWriterFirstConnected ¶ added in v3.38.0
func WithOnWriterFirstConnected(f OnWriterInitResponseCallback) WriterOption
WithOnWriterFirstConnected sets callback f, which will be called once - after the first successful init of the topic writer stream. Deprecated: (was experimental) will be removed soon. Use Writer.WaitInit function instead.
func WithPartitionID ¶ added in v3.38.0
func WithPartitionID(partitionID int64) WriterOption
WithPartitionID Deprecated: (was experimental) will be removed soon. Use WithWriterPartitionID instead
func WithProducerID ¶ added in v3.43.0
func WithProducerID(producerID string) WriterOption
WithProducerID Deprecated: (was experimental) will be removed soon. Use WithWriterProducerID instead
func WithSyncWrite ¶ added in v3.38.0
func WithSyncWrite(sync bool) WriterOption
WithSyncWrite Deprecated: (was experimental) use WithWriterWaitServerAck instead
func WithWriteSessionMeta ¶ added in v3.38.0
func WithWriteSessionMeta(meta map[string]string) WriterOption
WithWriteSessionMeta Deprecated: (was experimental) will be removed soon. Use WithWriterSessionMeta instead
func WithWriterAddEncoder ¶ added in v3.38.0
func WithWriterAddEncoder(codec topictypes.Codec, f CreateEncoderFunc) WriterOption
WithWriterAddEncoder add custom codec implementation to writer. It allows to set custom codecs implementations for custom and internal codecs.
func WithWriterCheckRetryErrorFunction ¶ added in v3.42.0
func WithWriterCheckRetryErrorFunction(callback CheckErrorRetryFunction) WriterOption
WithWriterCheckRetryErrorFunction can override the default error retry policy. Use CheckErrorRetryDecisionDefault to apply the default behavior for the error. The callback func must be fast and deterministic: it must always return the same result for the same error, because it can be called several times for every error.
Example ¶
var db *ydb.Driver writer, err := db.Topic().StartWriter( "", topicoptions.WithWriterCheckRetryErrorFunction( func(errInfo topicoptions.CheckErrorRetryArgs) topicoptions.CheckErrorRetryResult { // Retry for all transport errors if ydb.IsTransportError(errInfo.Error) { return topicoptions.CheckErrorRetryDecisionRetry } // and use default behavior for all other errors return topicoptions.CheckErrorRetryDecisionDefault }), ) _, _ = writer, err
Output:
func WithWriterCodec ¶ added in v3.52.3
func WithWriterCodec(codec topictypes.Codec) WriterOption
WithWriterCodec disable codec auto select and force set codec for the write session
func WithWriterCodecAutoSelect ¶ added in v3.52.3
func WithWriterCodecAutoSelect() WriterOption
WithWriterCodecAutoSelect - auto select the best codec for the message stream. Enabled by default. If the option is enabled, the writer sends a batch of messages for every allowed codec (to prevent a delayed bad codec accident), then from time to time measures all codecs and selects the codec with the smallest resulting message size.
func WithWriterCompressorCount ¶ added in v3.38.0
func WithWriterCompressorCount(num int) WriterOption
WithWriterCompressorCount sets the max count of goroutines for compressing messages. The value must be greater than zero.
panic if num <= 0
func WithWriterMaxQueueLen ¶ added in v3.38.2
func WithWriterMaxQueueLen(num int) WriterOption
WithWriterMaxQueueLen set max len of queue for wait ack
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func WithWriterMessageMaxBytesSize ¶ added in v3.38.2
func WithWriterMessageMaxBytesSize(size int) WriterOption
WithWriterMessageMaxBytesSize sets the max body size of one message in bytes. The writer will return an error if a message is larger than this size.
func WithWriterPartitionID ¶ added in v3.52.3
func WithWriterPartitionID(partitionID int64) WriterOption
WithWriterPartitionID set direct partition id on write session level
func WithWriterProducerID ¶ added in v3.52.3
func WithWriterProducerID(producerID string) WriterOption
WithWriterProducerID set producer for write session
func WithWriterSessionMeta ¶ added in v3.52.3
func WithWriterSessionMeta(meta map[string]string) WriterOption
WithWriterSessionMeta set writer's session metadata
func WithWriterSetAutoCreatedAt ¶ added in v3.38.0
func WithWriterSetAutoCreatedAt(val bool) WriterOption
WithWriterSetAutoCreatedAt sets whether the messages' CreatedAt is filled in by the SDK. Enabled by default. If enabled, the Message.CreatedAt field must be zero.
func WithWriterSetAutoSeqNo ¶ added in v3.38.0
func WithWriterSetAutoSeqNo(val bool) WriterOption
WithWriterSetAutoSeqNo sets whether the messages' SeqNo is filled in by the SDK. Enabled by default. If enabled, the Message.SeqNo field must be zero.
func WithWriterStartTimeout ¶ added in v3.42.0
func WithWriterStartTimeout(timeout time.Duration) WriterOption
WithWriterStartTimeout mean timeout for connect to writer stream and work some time without errors
func WithWriterTrace ¶ added in v3.40.0
func WithWriterTrace(t trace.Topic) WriterOption
WithWriterTrace set tracer for the writer
func WithWriterUpdateTokenInterval ¶ added in v3.41.0
func WithWriterUpdateTokenInterval(interval time.Duration) WriterOption
WithWriterUpdateTokenInterval set time interval between send auth token to the server
func WithWriterWaitServerAck ¶ added in v3.52.3
func WithWriterWaitServerAck(wait bool) WriterOption
WithWriterWaitServerAck - when enabled, every Write call waits for an ack from the server for all messages from the call. Disabled by default. It makes the writer much slower; use it only if you really need it.