natsio

package
v3.0.0-...-16f56ce Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2024 License: Apache-2.0, BSD-3-Clause, MIT Imports: 13 Imported by: 0

Documentation

Overview

Package natsio contains transforms for interacting with NATS.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Read

func Read(
	s beam.Scope,
	uri string,
	stream string,
	subject string,
	opts ...ReadOptionFn,
) beam.PCollection

Read reads messages from NATS JetStream and returns a PCollection<ConsumerMessage>. Read takes a variable number of ReadOptionFn to configure the read operation:

  • UserCredentials: path to the user credentials file. Defaults to empty.
  • ProcessingTimePolicy: whether to use the pipeline processing time of the messages as the event time. Defaults to true.
  • PublishingTimePolicy: whether to use the publishing time of the messages as the event time. Defaults to false.
  • FetchSize: the maximum number of messages to retrieve at a time. Defaults to 100.
  • StartSeqNo: the start sequence number of messages to read. Defaults to 1.
  • EndSeqNo: the end sequence number of messages to read (exclusive). Defaults to math.MaxInt64.
Example
package main

import (
	"context"
	"log"

	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam"
	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/io/natsio"
	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/x/beamx"
	"github.com/cd-paliv/beam-fork/sdks/v3/go/pkg/beam/x/debug"
)

func main() {
	beam.Init()

	p, s := beam.NewPipelineWithRoot()

	uri := "nats://localhost:4222"
	stream := "EVENTS"
	subject := "events.*"

	col := natsio.Read(s, uri, stream, subject)
	debug.Print(s, col)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
Output:

func Write

func Write(s beam.Scope, uri string, col beam.PCollection, opts ...WriteOptionFn) beam.PCollection

Write writes a PCollection<ProducerMessage> to NATS JetStream and returns a PCollection<PublishAck> of the acknowledged messages. The ID field can be set in the ProducerMessage to utilize JetStream's support for deduplication of messages. Write takes a variable number of WriteOptionFn to configure the write operation:

  • UserCredentials: path to the user credentials file. Defaults to empty.
Example
beam.Init()

p, s := beam.NewPipelineWithRoot()

uri := "nats://localhost:4222"
msgs := []natsio.ProducerMessage{
	{
		Subject: "events.1",
		ID:      "123",
		Data:    []byte("hello"),
		Headers: nats.Header{"key": []string{"val1"}},
	},
	{
		Subject: "events.2",
		ID:      "124",
		Data:    []byte("world"),
		Headers: nats.Header{"key": []string{"val2"}},
	},
}

input := beam.CreateList(s, msgs)
natsio.Write(s, uri, input)

if err := beamx.Run(context.Background(), p); err != nil {
	log.Fatalf("Failed to execute job: %v", err)
}
Output:

Types

type ConsumerMessage

type ConsumerMessage struct {
	Subject        string
	PublishingTime time.Time
	ID             string
	Headers        map[string][]string
	Data           []byte
}

type ProducerMessage

type ProducerMessage struct {
	Subject string
	ID      string
	Headers map[string][]string
	Data    []byte
}

ProducerMessage represents a message to be published to NATS.

type PublishAck

type PublishAck struct {
	Stream    string
	Subject   string
	ID        string
	Sequence  uint64
	Duplicate bool
}

PublishAck represents an acknowledgement from NATS after publishing a message.

type ReadOptionFn

type ReadOptionFn func(option *readOption) error

ReadOptionFn is a function that can be passed to Read to configure options for reading from NATS.

func ReadEndSeqNo

func ReadEndSeqNo(seqNo int64) ReadOptionFn

ReadEndSeqNo sets the end sequence number of messages to read (exclusive).

func ReadFetchSize

func ReadFetchSize(size int) ReadOptionFn

ReadFetchSize sets the maximum number of messages to retrieve at a time.

func ReadProcessingTimePolicy

func ReadProcessingTimePolicy() ReadOptionFn

ReadProcessingTimePolicy specifies that the pipeline processing time of the messages should be used to compute the watermark estimate.

func ReadPublishingTimePolicy

func ReadPublishingTimePolicy() ReadOptionFn

ReadPublishingTimePolicy specifies that the publishing time of the messages should be used to compute the watermark estimate.

func ReadStartSeqNo

func ReadStartSeqNo(seqNo int64) ReadOptionFn

ReadStartSeqNo sets the start sequence number of messages to read.

func ReadUserCredentials

func ReadUserCredentials(credsFile string) ReadOptionFn

ReadUserCredentials sets the user credentials when connecting to NATS.

type WriteOptionFn

type WriteOptionFn func(option *writeOption)

WriteOptionFn is a function that can be passed to Write to configure options for writing messages.

func WriteUserCredentials

func WriteUserCredentials(credsFile string) WriteOptionFn

WriteUserCredentials sets the user credentials when connecting to NATS.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL