Documentation
Overview
Package natsio contains transforms for interacting with NATS.
Functions
func Read
func Read(s beam.Scope, uri string, stream string, subject string, opts ...ReadOptionFn) beam.PCollection
Read reads messages from NATS JetStream and returns a PCollection<ConsumerMessage>. Read takes a variable number of ReadOptionFn to configure the read operation:
- UserCredentials: path to the user credentials file. Defaults to empty.
- ProcessingTimePolicy: whether to use the pipeline processing time of the messages as the event time. Defaults to true.
- PublishingTimePolicy: whether to use the publishing time of the messages as the event time. Defaults to false.
- FetchSize: the maximum number of messages to retrieve at a time. Defaults to 100.
- StartSeqNo: the start sequence number of messages to read. Defaults to 1.
- EndSeqNo: the end sequence number of messages to read (exclusive). Defaults to math.MaxInt64.
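To make the interplay of StartSeqNo, EndSeqNo, and FetchSize concrete, the following is a minimal sketch of how a half-open sequence range [start, end) could be divided into fetches of at most FetchSize messages. The seqRange type and the splitIntoFetches helper are hypothetical names used for illustration only; they are not part of natsio, and the package may partition work differently.

```go
package main

import "fmt"

// seqRange is a half-open range [Start, End) of sequence numbers.
// Hypothetical type for illustration; not part of natsio.
type seqRange struct {
	Start, End int64
}

// splitIntoFetches divides [start, end) into consecutive chunks that each
// cover at most fetchSize sequence numbers. It returns nil for an empty
// range or a non-positive fetch size.
func splitIntoFetches(start, end int64, fetchSize int) []seqRange {
	if fetchSize <= 0 || start >= end {
		return nil
	}
	var out []seqRange
	for lo := start; lo < end; {
		hi := end
		// Compare against the remaining span instead of computing
		// lo+fetchSize first, so an end near math.MaxInt64 cannot overflow.
		if end-lo > int64(fetchSize) {
			hi = lo + int64(fetchSize)
		}
		out = append(out, seqRange{Start: lo, End: hi})
		lo = hi
	}
	return out
}

func main() {
	// Sequence numbers 1 through 249 (250 is exclusive), fetch size 100.
	for _, r := range splitIntoFetches(1, 250, 100) {
		fmt.Printf("[%d, %d)\n", r.Start, r.End)
	}
}
```

With these inputs the sketch prints [1, 101), [101, 201), and [201, 250): two full fetches of 100 messages and a final fetch of 49, with sequence number 250 never read because the end is exclusive.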
Example
	package main

	import (
		"context"
		"log"

		"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam"
		"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/io/natsio"
		"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/x/beamx"
		"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/x/debug"
	)

	func main() {
		beam.Init()

		p, s := beam.NewPipelineWithRoot()

		uri := "nats://localhost:4222"
		stream := "EVENTS"
		subject := "events.*"

		col := natsio.Read(s, uri, stream, subject)
		debug.Print(s, col)

		if err := beamx.Run(context.Background(), p); err != nil {
			log.Fatalf("Failed to execute job: %v", err)
		}
	}
func Write
func Write(s beam.Scope, uri string, col beam.PCollection, opts ...WriteOptionFn) beam.PCollection
Write writes a PCollection<ProducerMessage> to NATS JetStream and returns a PCollection<PublishAck> of the acknowledged messages. The ID field can be set in the ProducerMessage to utilize JetStream's support for deduplication of messages. Write takes a variable number of WriteOptionFn to configure the write operation:
- UserCredentials: path to the user credentials file. Defaults to empty.
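The effect of setting the ID field can be pictured with a small in-memory model. The sketch below is an illustration of ID-based deduplication only, not JetStream's implementation: the message and deduper types are hypothetical, and the real server applies deduplication within a time window configured on the stream, which this sketch ignores.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a published message.
type message struct {
	ID   string
	Data string
}

// deduper remembers the IDs of messages it has already accepted.
type deduper struct {
	seen map[string]struct{}
}

func newDeduper() *deduper {
	return &deduper{seen: make(map[string]struct{})}
}

// accept reports whether m should be stored. A message with an empty ID is
// always accepted, since there is nothing to deduplicate on.
func (d *deduper) accept(m message) bool {
	if m.ID == "" {
		return true
	}
	if _, dup := d.seen[m.ID]; dup {
		return false
	}
	d.seen[m.ID] = struct{}{}
	return true
}

func main() {
	d := newDeduper()
	msgs := []message{
		{ID: "123", Data: "hello"},
		{ID: "124", Data: "world"},
		{ID: "123", Data: "hello"}, // retry of the first message
	}
	for _, m := range msgs {
		fmt.Printf("id=%s accepted=%t\n", m.ID, d.accept(m))
	}
}
```

In this model the retried message with ID "123" is rejected, which is the behavior a pipeline relies on when a bundle is retried and the same ProducerMessage is published twice.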
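Example
	package main

	import (
		"context"
		"log"

		"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam"
		"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/io/natsio"
		"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/x/beamx"
		"github.com/nats-io/nats.go"
	)

	func main() {
		beam.Init()

		p, s := beam.NewPipelineWithRoot()

		uri := "nats://localhost:4222"

		// Each message carries an ID so that JetStream can deduplicate it.
		msgs := []natsio.ProducerMessage{
			{
				Subject: "events.1",
				ID:      "123",
				Data:    []byte("hello"),
				Headers: nats.Header{"key": []string{"val1"}},
			},
			{
				Subject: "events.2",
				ID:      "124",
				Data:    []byte("world"),
				Headers: nats.Header{"key": []string{"val2"}},
			},
		}

		input := beam.CreateList(s, msgs)
		natsio.Write(s, uri, input)

		if err := beamx.Run(context.Background(), p); err != nil {
			log.Fatalf("Failed to execute job: %v", err)
		}
	}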
Types
type ConsumerMessage
type ProducerMessage
ProducerMessage represents a message to be published to NATS.
type PublishAck
PublishAck represents an acknowledgement from NATS after publishing a message.
type ReadOptionFn
type ReadOptionFn func(option *readOption) error
ReadOptionFn is a function that can be passed to Read to configure options for reading from NATS.
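ReadOptionFn follows the functional options pattern: each option is a function that mutates an unexported options struct and may return an error. Since readOption is unexported and its fields are not documented here, the sketch below uses a hypothetical struct whose fields and defaults are taken from the option list for Read; the lowercase helper names and the validation rule are assumptions for illustration, not the package's actual code.

```go
package main

import (
	"errors"
	"fmt"
	"math"
)

// readOption is a hypothetical options struct. The field names are
// assumptions; the defaults follow the option list documented for Read.
type readOption struct {
	FetchSize  int
	StartSeqNo int64
	EndSeqNo   int64
}

// readOptionFn has the same shape as natsio.ReadOptionFn.
type readOptionFn func(option *readOption) error

// fetchSize sets the maximum number of messages per fetch.
// Rejecting non-positive sizes is an assumed validation rule.
func fetchSize(size int) readOptionFn {
	return func(o *readOption) error {
		if size <= 0 {
			return errors.New("fetch size must be positive")
		}
		o.FetchSize = size
		return nil
	}
}

// startSeqNo sets the first sequence number to read.
func startSeqNo(seqNo int64) readOptionFn {
	return func(o *readOption) error {
		o.StartSeqNo = seqNo
		return nil
	}
}

// buildReadOption starts from the defaults and applies each option in
// order, stopping at the first error.
func buildReadOption(opts ...readOptionFn) (*readOption, error) {
	o := &readOption{FetchSize: 100, StartSeqNo: 1, EndSeqNo: math.MaxInt64}
	for _, opt := range opts {
		if err := opt(o); err != nil {
			return nil, fmt.Errorf("applying read option: %w", err)
		}
	}
	return o, nil
}

func main() {
	o, err := buildReadOption(fetchSize(50), startSeqNo(10))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(o.FetchSize, o.StartSeqNo, o.EndSeqNo == math.MaxInt64)

	if _, err := buildReadOption(fetchSize(0)); err != nil {
		fmt.Println("error:", err)
	}
}
```

Options that are not passed keep their defaults, and later options override earlier ones because they are applied in order. Returning an error from the option function lets a caller's invalid argument surface when the options are applied rather than later during the read.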
func ReadEndSeqNo
func ReadEndSeqNo(seqNo int64) ReadOptionFn
ReadEndSeqNo sets the end sequence number of messages to read (exclusive).
func ReadFetchSize
func ReadFetchSize(size int) ReadOptionFn
ReadFetchSize sets the maximum number of messages to retrieve at a time.
func ReadProcessingTimePolicy
func ReadProcessingTimePolicy() ReadOptionFn
ReadProcessingTimePolicy specifies that the pipeline processing time of the messages should be used to compute the watermark estimate.
func ReadPublishingTimePolicy
func ReadPublishingTimePolicy() ReadOptionFn
ReadPublishingTimePolicy specifies that the publishing time of the messages should be used to compute the watermark estimate.
func ReadStartSeqNo
func ReadStartSeqNo(seqNo int64) ReadOptionFn
ReadStartSeqNo sets the start sequence number of messages to read.
func ReadUserCredentials
func ReadUserCredentials(credsFile string) ReadOptionFn
ReadUserCredentials sets the user credentials when connecting to NATS.
type WriteOptionFn
type WriteOptionFn func(option *writeOption)
WriteOptionFn is a function that can be passed to Write to configure options for writing messages.
func WriteUserCredentials
func WriteUserCredentials(credsFile string) WriteOptionFn
WriteUserCredentials sets the user credentials when connecting to NATS.