topicwriter

package
v3.38.1
Published: Sep 21, 2022 License: Apache-2.0 Imports: 2 Imported by: 1

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Message

type Message = topicwriterinternal.Message

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer represents a write session to a topic. It handles connection problems, reconnects to the server when needed, and resends buffered messages.
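The buffering and resend behaviour described above can be pictured with a small toy model. This is only a sketch using the standard library: `fakeConn`, `bufferedWriter`, and `runResendDemo` are hypothetical names invented for illustration, and nothing here reflects how the SDK's Writer is actually implemented.

```go
package main

import (
	"errors"
	"fmt"
)

var errConnLost = errors.New("connection lost")

// fakeConn is a hypothetical stand-in for a server connection (not an SDK type).
type fakeConn struct {
	up       bool
	received []string
}

// send delivers msg only while the connection is up.
func (c *fakeConn) send(msg string) error {
	if !c.up {
		return errConnLost
	}
	c.received = append(c.received, msg)
	return nil
}

// bufferedWriter keeps every message that has not been delivered yet.
type bufferedWriter struct {
	conn    *fakeConn
	pending []string
}

// write buffers the message first, then tries to flush the buffer.
func (w *bufferedWriter) write(msg string) {
	w.pending = append(w.pending, msg)
	w.flush()
}

// flush sends pending messages in order and stops at the first failure,
// leaving the undelivered tail in the buffer.
func (w *bufferedWriter) flush() {
	for len(w.pending) > 0 {
		if err := w.conn.send(w.pending[0]); err != nil {
			return
		}
		w.pending = w.pending[1:]
	}
}

// reconnect restores the connection and resends what is still buffered.
func (w *bufferedWriter) reconnect() {
	w.conn.up = true
	w.flush()
}

// runResendDemo writes one message while connected, two while the
// connection is down, and then reconnects.
func runResendDemo() (received []string, pendingWhileDown int) {
	conn := &fakeConn{up: true}
	w := &bufferedWriter{conn: conn}
	w.write("1")
	conn.up = false
	w.write("2")
	w.write("3")
	pendingWhileDown = len(w.pending)
	w.reconnect()
	return conn.received, pendingWhileDown
}

func main() {
	received, pending := runResendDemo()
	fmt.Println(pending, received) // prints "2 [1 2 3]"
}
```

In this model the caller never sees the connection failure: messages written while the connection is down stay in the buffer and arrive in their original order after the reconnect.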

func NewWriter

func NewWriter(writer *topicwriterinternal.Writer) *Writer

func (*Writer) Close

func (w *Writer) Close(ctx context.Context) error

func (*Writer) Write

func (w *Writer) Write(ctx context.Context, messages ...Message) error

Write sends messages to the topic. In async mode (the default) it returns quickly; in sync mode it waits for an ack from the server. See topicoptions.WithSyncWrite.

The method waits for the initial connection even in async mode, so the first write may be slower, especially when the connection has problems.

Cancelling ctx cancels only the wait for the ack; it does not cancel sending the messages.
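The difference between cancelling the wait and cancelling the send can be illustrated with a toy model. This is a sketch under stated assumptions, not the SDK's code: `ackWaitWriter`, `newAckWaitWriter`, and `demoCancelledWait` are hypothetical names, and the `release` channel stands in for network latency.

```go
package main

import (
	"context"
	"fmt"
)

// ackWaitWriter is a hypothetical model of a sync-mode writer (not an SDK type).
type ackWaitWriter struct {
	queue     chan string   // messages handed to the background sender
	acks      chan struct{} // one signal per delivered message
	delivered chan string   // what the simulated server received
}

// newAckWaitWriter starts a background sender that holds each message
// until release is closed, simulating a slow network.
func newAckWaitWriter(release <-chan struct{}) *ackWaitWriter {
	w := &ackWaitWriter{
		queue:     make(chan string, 16),
		acks:      make(chan struct{}, 16),
		delivered: make(chan string, 16),
	}
	go func() {
		for msg := range w.queue {
			<-release
			w.delivered <- msg
			w.acks <- struct{}{}
		}
	}()
	return w
}

// write enqueues the message unconditionally, then waits for an ack
// until ctx is done. Cancelling ctx abandons the wait, not the send.
// A real writer would match acks to messages; this toy does not.
func (w *ackWaitWriter) write(ctx context.Context, msg string) error {
	w.queue <- msg
	select {
	case <-w.acks:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demoCancelledWait writes with an already cancelled context and then
// checks whether the message still reaches the simulated server.
func demoCancelledWait() (waitErr error, delivered string) {
	release := make(chan struct{})
	w := newAckWaitWriter(release)

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // the caller gives up waiting for the ack

	waitErr = w.write(ctx, "hello")
	close(release)          // the simulated network catches up
	delivered = <-w.delivered // the message arrives anyway
	return waitErr, delivered
}

func main() {
	waitErr, delivered := demoCancelledWait()
	fmt.Println(waitErr, delivered) // prints "context canceled hello"
}
```

The practical consequence in this model is that a context error returned from a write does not mean the message was dropped, so a caller that retries after such an error may produce a duplicate.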

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

Example
ctx := context.Background()
db, err := ydb.Open(ctx, os.Getenv("YDB_CONNECTION_STRING"))
if err != nil {
	log.Fatalf("failed ydb connection: %v", err)
}

writer, err := db.Topic().StartWriter("producer-id", "topicName",
	topicoptions.WithWriterCompressorCount(1),
)
if err != nil {
	log.Fatalf("failed to create topic writer: %v", err)
}

err = writer.Write(ctx,
	topicwriter.Message{Data: strings.NewReader("1")},
	topicwriter.Message{Data: strings.NewReader("2")},
	topicwriter.Message{Data: strings.NewReader("3")},
)
if err != nil {
	log.Fatalf("failed to write to topic: %v", err)
}
fmt.Println("OK")