liftbridge

package module
v1.9.1
Published: Aug 14, 2020 License: Apache-2.0 Imports: 18 Imported by: 0

README

go-liftbridge

Go client for Liftbridge, a system that provides lightweight, fault-tolerant message streams for NATS.

Liftbridge provides the following high-level features:

  • Log-based API for NATS
  • Replicated for fault-tolerance
  • Horizontally scalable
  • Wildcard subscription support
  • At-least-once delivery support and message replay
  • Message key-value support
  • Log compaction by key

Installation

$ go get github.com/liftbridge-io/go-liftbridge

Basic Usage

package main

import (
	"context"
	"fmt"

	lift "github.com/liftbridge-io/go-liftbridge"
)

func main() {
	// Create Liftbridge client.
	addrs := []string{"localhost:9292", "localhost:9293", "localhost:9294"}
	client, err := lift.Connect(addrs)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Create a stream attached to the NATS subject "foo".
	var (
		subject = "foo"
		name    = "foo-stream"
	)
	if err := client.CreateStream(context.Background(), subject, name); err != nil {
		if err != lift.ErrStreamExists {
			panic(err)
		}
	}
	
	// Publish a message to "foo".
	if _, err := client.Publish(context.Background(), name, []byte("hello")); err != nil {
		panic(err)
	}

	// Subscribe to the stream starting from the beginning.
	ctx := context.Background()
	if err := client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
		if err != nil {
			panic(err)
		}
		fmt.Println(msg.Offset(), string(msg.Value()))
	}, lift.StartAtEarliestReceived()); err != nil {
		panic(err)
	}

	<-ctx.Done()
}

Create Stream

A stream is a durable message log attached to a NATS subject. It records messages published to the subject for later consumption.

Streams have a few key properties: a subject, which is the corresponding NATS subject, a name, which is a human-readable identifier for the stream, and a replication factor, which is the number of nodes the stream should be replicated to for redundancy. Optionally, there is a group which is the name of a load-balance group for the stream to join. When there are multiple streams in the same group, messages will be balanced among them.

// Create a stream attached to the NATS subject "foo.*" that is replicated to
// all the brokers in the cluster. ErrStreamExists is returned if a stream with
// the given name already exists.
client.CreateStream(context.Background(), "foo.*", "my-stream", lift.MaxReplication())

We can also configure different properties of the stream with options, which
allow overriding the server settings.

// Create a stream and set the retention to 134217728 bytes (128MB) and enable
// stream compaction.
client.CreateStream(context.Background(), subject, name,
	lift.RetentionMaxBytes(134217728), lift.CompactEnabled(true))

Subscription Start/Replay Options

Subscriptions are how Liftbridge streams are consumed. Clients can choose where in a stream to start consuming messages; this is controlled using options passed to Subscribe. A client can also choose to subscribe to the partition's leader (the default) or to a random replica from the ISR (in-sync replica set).

// Subscribe starting with new messages only.
client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
	fmt.Println(msg.Offset(), string(msg.Value()))
})

// Subscribe starting with the most recently published value.
client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
	fmt.Println(msg.Offset(), string(msg.Value()))
}, lift.StartAtLatestReceived())

// Subscribe starting with the oldest published value.
client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
	fmt.Println(msg.Offset(), string(msg.Value()))
}, lift.StartAtEarliestReceived())

// Subscribe starting at a specific offset.
client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
	fmt.Println(msg.Offset(), string(msg.Value()))
}, lift.StartAtOffset(42))

// Subscribe starting at a specific time.
client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
	fmt.Println(msg.Offset(), string(msg.Value()))
}, lift.StartAtTime(time.Now()))

// Subscribe starting at a specific amount of time in the past.
client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
	fmt.Println(msg.Offset(), string(msg.Value()))
}, lift.StartAtTimeDelta(time.Minute))

// Subscribe to a random ISR replica. This helps reduce the load on the
// partition leader.
client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
	fmt.Println(msg.Offset(), string(msg.Value()))
}, lift.ReadISRReplica())

Publishing

There are two publish APIs provided to make it easy to write messages to streams, Publish and PublishToSubject. These include a number of options for decorating messages with metadata like a message key and headers as well as configuring acking behavior from the server.

Publish sends a message to a Liftbridge stream. The stream partition that gets published to is determined by the provided partition or Partitioner strategy passed through MessageOptions, if any. If a partition or Partitioner is not provided, it will publish to the base partition (partition 0). This partition determines the underlying NATS subject that gets published to. To publish directly to a specific NATS subject, use the low-level PublishToSubject API described below.

Keys are used by Liftbridge's log compaction. When enabled, Liftbridge streams will retain only the last message for a given key.

// Publish a message with a key and header set.
client.Publish(context.Background(), "foo-stream", []byte("hello"),
	lift.Key([]byte("key")),
	lift.Header("foo", []byte("bar")),
)
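
To make the effect of compaction concrete, the following is a minimal sketch of "retain only the last message for a given key". It is not the server's implementation; the record type and compact function are hypothetical names introduced for illustration, and the input is assumed to be ordered by offset.

```go
package main

import "fmt"

// record is a hypothetical stand-in for a message stored in a stream.
type record struct {
	Offset int64
	Key    string
	Value  string
}

// compact keeps only the newest record for each key and preserves offset
// order. The input is assumed to be sorted by ascending offset.
func compact(log []record) []record {
	last := make(map[string]int, len(log)) // key -> index of its newest record
	for i, r := range log {
		last[r.Key] = i
	}
	out := make([]record, 0, len(last))
	for i, r := range log {
		if last[r.Key] == i {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	log := []record{
		{0, "a", "1"},
		{1, "b", "2"},
		{2, "a", "3"},
	}
	for _, r := range compact(log) {
		fmt.Println(r.Offset, r.Key, r.Value)
	}
}
```

In this sketch the record at offset 0 is dropped because a newer record with key "a" exists at offset 2; the surviving records keep their original offsets.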

An AckPolicy tells the server when to send an ack. If a deadline is provided to Publish, it will block up to this amount of time waiting for the ack.

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() // Release the timer once the publish completes
client.Publish(ctx, "foo-stream", []byte("hello"),
	lift.AckPolicyAll(), // Wait for all stream replicas to get the message
)

ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
client.Publish(ctx, "foo-stream", []byte("hello"),
	lift.AckPolicyLeader(), // Wait for just the stream leader to get the message
)

client.Publish(context.Background(), "foo-stream", []byte("hello"),
	lift.AckPolicyNone(), // Don't send an ack
)

PublishToSubject sends a message directly to the provided NATS subject. Note that because this publishes directly to a subject, there may be multiple (or no) streams that receive the message. As a result, MessageOptions related to partitioning will be ignored. To publish at the stream/partition level, use the high-level Publish API described above.

// Publish a message directly to a NATS subject.
client.PublishToSubject(context.Background(), "foo.bar", []byte("hello"))

Alternatively, messages can be published directly to NATS using a NATS client and Liftbridge helpers detailed below.

Publishing Directly with NATS

Since Liftbridge is an extension of NATS, a NATS client can also be used to publish messages. This means existing NATS publishers do not need any changes for messages to be consumed in Liftbridge.

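package main

import "github.com/nats-io/nats.go"

func main() {
	// Connect to NATS.
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	// Publish a message.
	if err := nc.Publish("foo.bar", []byte("Hello, world!")); err != nil {
		panic(err)
	}
	// Make sure the message has been sent to the server before exiting.
	nc.Flush()
}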

As shown with the publish APIs above, Liftbridge allows publishers to add metadata to messages, including a key, ack inbox, correlation ID, and ack policy. The message key can be used for stream compaction in Liftbridge. Acks are used to guarantee Liftbridge has recorded a message to ensure at-least-once delivery. The ack inbox determines a NATS subject to publish an acknowledgement to once Liftbridge has committed the message. The correlation ID is used to correlate an ack back to the original message. The ack policy determines when Liftbridge acknowledges the message: when the stream leader has stored the message, when all replicas have stored it, or no ack at all.

This additional metadata is sent using a message envelope which is a protobuf. The publish APIs handle this for you, but this client library also provides helper APIs to make it easy to create envelopes and deal with acks yourself using a NATS client directly.

var (
	ackInbox = "foo.acks"
	cid      = "some-random-id"
)

// Create a message envelope to publish.
msg := lift.NewMessage([]byte("Hello, world!"),
	lift.Key([]byte("foo")), // Key to set on the message
	lift.AckInbox(ackInbox), // Send ack to this NATS subject
	lift.AckPolicyAll(),     // Send ack once message is fully replicated
	lift.CorrelationID(cid), // Set the ID which will be sent on the ack
)

// Setup a NATS subscription for acks.
sub, _ := nc.SubscribeSync(ackInbox)

// Publish the message.
nc.Publish("foo.bar", msg)

// Wait for ack from Liftbridge.
resp, _ := sub.NextMsg(5*time.Second)
ack, _ := lift.UnmarshalAck(resp.Data)
if ack.CorrelationID() == cid {
	fmt.Println("message acked!")
}
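
When several publishes are in flight at once, acks have to be matched to the messages they belong to. The sketch below shows one way to do that bookkeeping with correlation IDs, using only the standard library. The ackTracker type and its methods are hypothetical and not part of this package; in real use, resolve would be called with ack.CorrelationID() from a NATS subscription on the ack inbox.

```go
package main

import (
	"fmt"
	"sync"
)

// ackTracker matches incoming acks to pending publishes by correlation ID.
type ackTracker struct {
	mu      sync.Mutex
	pending map[string]chan struct{}
}

func newAckTracker() *ackTracker {
	return &ackTracker{pending: make(map[string]chan struct{})}
}

// expect registers a correlation ID and returns a channel that is closed
// when the matching ack arrives.
func (t *ackTracker) expect(cid string) <-chan struct{} {
	ch := make(chan struct{})
	t.mu.Lock()
	t.pending[cid] = ch
	t.mu.Unlock()
	return ch
}

// resolve reports whether cid matched a pending publish. A matched ID is
// removed, so a duplicate ack for the same ID reports false.
func (t *ackTracker) resolve(cid string) bool {
	t.mu.Lock()
	ch, ok := t.pending[cid]
	delete(t.pending, cid)
	t.mu.Unlock()
	if ok {
		close(ch)
	}
	return ok
}

func main() {
	t := newAckTracker()
	done := t.expect("some-random-id")
	fmt.Println(t.resolve("unknown-id"))     // no pending publish with this ID
	fmt.Println(t.resolve("some-random-id")) // matches the pending publish
	<-done
	fmt.Println("message acked!")
}
```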

Partitioning

Liftbridge streams are partitioned to allow for increased parallelism. By default, a stream consists of a single partition, but the number of partitions can be configured when the stream is created.

Creating Partitioned Streams

// Create stream with three partitions.
client.CreateStream(context.Background(), "bar", "bar-stream", lift.Partitions(3))

Each partition maps to a NATS subject derived from the base stream subject. For example, the partitions for a stream with three partitions attached to the subject "bar" map to the NATS subjects "bar", "bar.1", and "bar.2", respectively.
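
The mapping described above can be written as a small helper. This is an illustrative sketch only; partitionSubject is a hypothetical name and is not exported by this package.

```go
package main

import (
	"fmt"
	"strconv"
)

// partitionSubject returns the NATS subject for a partition, following the
// mapping described above: partition 0 uses the base subject, and partition
// N > 0 uses "<base>.<N>".
func partitionSubject(base string, partition int32) string {
	if partition == 0 {
		return base
	}
	return base + "." + strconv.Itoa(int(partition))
}

func main() {
	for p := int32(0); p < 3; p++ {
		fmt.Println(partitionSubject("bar", p))
	}
}
```

Running it prints "bar", "bar.1", and "bar.2", matching the three-partition example.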

Publishing to Stream Partitions

By default, clients will publish to the base partition, but this can be configured by providing a Partitioner.

// Publish to partition based on message key hash.
client.Publish(context.Background(), "bar-stream", []byte("hello"),
	lift.Key([]byte("key")),
	lift.PartitionByKey(),
)

// Publish to partitions in a round-robin fashion.
client.Publish(context.Background(), "bar-stream", []byte("hello"),
	lift.Key([]byte("key")),
	lift.PartitionByRoundRobin(),
)

// Publish to a specific partition.
client.Publish(context.Background(), "bar-stream", []byte("hello"),
	lift.Key([]byte("key")),
	lift.ToPartition(1),
)

// Publish directly to a partition NATS subject.
client.PublishToSubject(context.Background(), "bar.1", []byte("hello"),
	lift.Key([]byte("key")))

A custom Partitioner implementation can also be provided to Publish. PublishToSubject will ignore any partition-related MessageOption.

Subscribing to Stream Partitions

Like publishing, clients will subscribe to the base partition by default. However, a specific partition to consume from can be specified at subscribe time.

// Subscribe to a specific partition.
client.Subscribe(ctx, "bar-stream", func(msg *lift.Message, err error) {
	fmt.Println(msg.Offset(), string(msg.Value()))
}, lift.Partition(1))

Documentation

Overview

Package liftbridge implements a client for the Liftbridge messaging system. Liftbridge provides lightweight, fault-tolerant message streams by implementing a durable stream augmentation for NATS. In particular, it offers a publish-subscribe log API that is highly available and horizontally scalable.

This package provides APIs for creating and consuming Liftbridge streams and some utility APIs for using Liftbridge in combination with NATS.

Constants

const MaxReplicationFactor int32 = -1

MaxReplicationFactor can be used to tell the server to set the replication factor equal to the current number of servers in the cluster when creating a stream.
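
As an illustration of how such a sentinel can be interpreted, here is a minimal sketch. The effectiveReplication helper is hypothetical and only models the behavior described above; it is not taken from the server.

```go
package main

import "fmt"

// maxReplicationFactor mirrors the sentinel value documented above.
const maxReplicationFactor int32 = -1

// effectiveReplication is a hypothetical helper showing how the requested
// factor could be resolved against the current cluster size.
func effectiveReplication(requested, clusterSize int32) int32 {
	if requested == maxReplicationFactor {
		return clusterSize
	}
	return requested
}

func main() {
	fmt.Println(effectiveReplication(maxReplicationFactor, 3)) // whole cluster
	fmt.Println(effectiveReplication(2, 3))                    // explicit factor
}
```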

Variables

var (
	// ErrStreamExists is returned by CreateStream if the specified stream
	// already exists in the Liftbridge cluster.
	ErrStreamExists = errors.New("stream already exists")

	// ErrNoSuchStream is returned by DeleteStream if the specified stream does
	// not exist in the Liftbridge cluster.
	ErrNoSuchStream = errors.New("stream does not exist")

	// ErrNoSuchPartition is returned by Subscribe if the specified stream
	// partition does not exist in the Liftbridge cluster.
	ErrNoSuchPartition = errors.New("stream partition does not exist")

	// ErrStreamDeleted is sent to subscribers when the stream they are
	// subscribed to has been deleted.
	ErrStreamDeleted = errors.New("stream has been deleted")

	// ErrPartitionPaused is sent to subscribers when the stream partition they
	// are subscribed to has been paused.
	ErrPartitionPaused = errors.New("stream partition has been paused")

	// ErrAckTimeout indicates a publish ack was not received in time.
	ErrAckTimeout = errors.New("publish ack timeout")

	// ErrEndOfReadonlyPartition is sent to subscribers when the stream
	// partition they are subscribed to has either been set to readonly or is
	// already readonly and all messages have been read.
	ErrEndOfReadonlyPartition = errors.New("end of readonly partition reached")
)

Functions

func NewMessage

func NewMessage(value []byte, options ...MessageOption) []byte

NewMessage returns a serialized message for the given payload and options.

Example
// Create NATS connection.
conn, err := nats.GetDefaultOptions().Connect()
if err != nil {
	panic(err)
}
// Deferred calls run in last-in, first-out order, so Flush runs before Close.
defer conn.Close()
defer conn.Flush()

// Publish simple message.
msg := NewMessage([]byte("value"))
if err := conn.Publish("foo", msg); err != nil {
	panic(err)
}

// Publish message with options.
msg = NewMessage([]byte("value"),
	Key([]byte("key")),
	AckPolicyAll(),
	AckInbox("ack"),
	CorrelationID("123"),
)
if err := conn.Publish("foo", msg); err != nil {
	panic(err)
}

Types

type Ack

type Ack struct {
	// contains filtered or unexported fields
}

Ack represents an acknowledgement that a message was committed to a stream partition.

func UnmarshalAck

func UnmarshalAck(data []byte) (*Ack, error)

UnmarshalAck deserializes an Ack from the given byte slice. It returns an error if the given data is not actually an Ack.

Example
// Create NATS connection.
conn, err := nats.GetDefaultOptions().Connect()
if err != nil {
	panic(err)
}
defer conn.Close()

// Setup ack inbox.
ackInbox := "acks"
acked := make(chan struct{})
_, err = conn.Subscribe(ackInbox, func(m *nats.Msg) {
	ack, err := UnmarshalAck(m.Data)
	if err != nil {
		panic(err)
	}
	fmt.Println("ack:", ack.Stream(), ack.Offset(), ack.MessageSubject())
	close(acked)
})
if err != nil {
	panic(err)
}

// Publish message.
msg := NewMessage([]byte("value"), Key([]byte("key")), AckInbox(ackInbox))
if err := conn.Publish("foo", msg); err != nil {
	panic(err)
}

<-acked

func (*Ack) AckInbox

func (a *Ack) AckInbox() string

AckInbox is the NATS subject the ack was published to.

func (*Ack) AckPolicy

func (a *Ack) AckPolicy() AckPolicy

AckPolicy sent on the message.

func (*Ack) CorrelationID

func (a *Ack) CorrelationID() string

CorrelationID is the user-supplied value from the message.

func (*Ack) MessageSubject

func (a *Ack) MessageSubject() string

MessageSubject is the NATS subject the message was received on.

func (*Ack) Offset

func (a *Ack) Offset() int64

Offset is the partition offset the message was committed to.

func (*Ack) PartitionSubject

func (a *Ack) PartitionSubject() string

PartitionSubject is the NATS subject the partition is attached to.

func (*Ack) Stream

func (a *Ack) Stream() string

Stream the Message was received on.

type AckHandler

type AckHandler func(ack *Ack, err error)

AckHandler is used to handle the results of asynchronous publishes to a stream. If the AckPolicy on the published message is not NONE, the handler will receive the ack once it's received from the cluster or an error if the message was not received successfully.

type AckPolicy

type AckPolicy int32

AckPolicy controls the behavior of message acknowledgements.

type BrokerInfo

type BrokerInfo struct {
	// contains filtered or unexported fields
}

BrokerInfo contains information for a Liftbridge cluster node.

func (*BrokerInfo) Addr

func (b *BrokerInfo) Addr() string

Addr returns <host>:<port> for the broker server.

func (*BrokerInfo) Host

func (b *BrokerInfo) Host() string

Host of the broker server.

func (*BrokerInfo) ID

func (b *BrokerInfo) ID() string

ID of the broker.

func (*BrokerInfo) Port

func (b *BrokerInfo) Port() int32

Port of the broker server.

type Client

type Client interface {
	// Close the client connection.
	Close() error

	// CreateStream creates a new stream attached to a NATS subject. Subject is
	// the NATS subject the stream is attached to, and name is the stream
	// identifier, unique per subject. It returns ErrStreamExists if a stream
	// with the given subject and name already exists.
	CreateStream(ctx context.Context, subject, name string, opts ...StreamOption) error

	// DeleteStream deletes a stream and all of its partitions. Name is the
	// stream identifier, globally unique.
	DeleteStream(ctx context.Context, name string) error

	// PauseStream pauses a stream and some or all of its partitions. Name is
	// the stream identifier, globally unique. It returns an ErrNoSuchPartition
	// if the given stream or partition does not exist. By default, this will
	// pause all partitions. A partition is resumed when it is published to via
	// the Liftbridge Publish API or ResumeAll is enabled and another partition
	// in the stream is published to.
	PauseStream(ctx context.Context, name string, opts ...PauseOption) error

	// SetStreamReadonly sets the readonly flag on a stream and some or all of
	// its partitions. Name is the stream identifier, globally unique. It
	// returns an ErrNoSuchPartition if the given stream or partition does not
	// exist. By default, this will set the readonly flag on all partitions.
	// Subscribers to a readonly partition will see their subscription ended
	// with an ErrEndOfReadonlyPartition error once all messages currently in
	// the partition have been read.
	SetStreamReadonly(ctx context.Context, name string, opts ...ReadonlyOption) error

	// Subscribe creates an ephemeral subscription for the given stream. It
	// begins receiving messages starting at the configured position and waits
	// for new messages when it reaches the end of the stream. The default
	// start position is the end of the stream. It returns an
	// ErrNoSuchPartition if the given stream or partition does not exist. Use
	// a cancelable Context to close a subscription.
	Subscribe(ctx context.Context, stream string, handler Handler, opts ...SubscriptionOption) error

	// Publish publishes a new message to the Liftbridge stream. The partition
	// that gets published to is determined by the provided partition or
	// Partitioner passed through MessageOptions, if any. If a partition or
	// Partitioner is not provided, this defaults to the base partition. This
	// partition determines the underlying NATS subject that gets published to.
	// To publish directly to a specific NATS subject, use the low-level
	// PublishToSubject API.
	//
	// If the AckPolicy is not NONE, this will synchronously block until the
	// ack is received. If the ack is not received in time, ErrAckTimeout is
	// returned. If AckPolicy is NONE, this returns nil on success.
	Publish(ctx context.Context, stream string, value []byte, opts ...MessageOption) (*Ack, error)

	// PublishAsync publishes a new message to the Liftbridge stream and
	// asynchronously processes the ack or error for the message.
	PublishAsync(ctx context.Context, stream string, value []byte, ackHandler AckHandler, opts ...MessageOption) error

	// PublishToSubject publishes a new message to the NATS subject. Note that
	// because this publishes directly to a subject, there may be multiple (or
	// no) streams that receive the message. As a result, MessageOptions
	// related to partitioning will be ignored. To publish at the
	// stream/partition level, use the high-level Publish API.
	//
	// If the AckPolicy is not NONE and a deadline is provided, this will
	// synchronously block until the first ack is received. If an ack is not
	// received in time, ErrAckTimeout is returned. If an AckPolicy and
	// deadline are configured, this returns the first Ack on success,
	// otherwise it returns nil.
	PublishToSubject(ctx context.Context, subject string, value []byte, opts ...MessageOption) (*Ack, error)

	// FetchMetadata returns cluster metadata including broker and stream
	// information.
	FetchMetadata(ctx context.Context) (*Metadata, error)
}

Client is the main API used to communicate with a Liftbridge cluster. Call Connect to get a Client instance.

Example (CreateStream)
// Connect to Liftbridge.
addr := "localhost:9292"
client, err := Connect([]string{addr})
if err != nil {
	panic(err)
}
defer client.Close()

// Create stream with a single partition.
if err := client.CreateStream(context.Background(), "foo", "foo-stream"); err != nil {
	panic(err)
}

// Create stream with three partitions.
if err := client.CreateStream(context.Background(), "bar", "bar-stream", Partitions(3)); err != nil {
	panic(err)
}

Example (Publish)
// Connect to Liftbridge.
addr := "localhost:9292"
client, err := Connect([]string{addr})
if err != nil {
	panic(err)
}
defer client.Close()

// Publish message to base stream partition.
if _, err := client.Publish(context.Background(), "foo-stream", []byte("hello")); err != nil {
	panic(err)
}

// Publish message to stream partition based on key.
if _, err := client.Publish(context.Background(), "bar-stream", []byte("hello"),
	Key([]byte("key")), PartitionByKey(),
); err != nil {
	panic(err)
}

Example (PublishToSubject)
// Connect to Liftbridge.
addr := "localhost:9292"
client, err := Connect([]string{addr})
if err != nil {
	panic(err)
}
defer client.Close()

// Publish message directly to NATS subject.
if _, err := client.PublishToSubject(context.Background(), "foo.bar", []byte("hello")); err != nil {
	panic(err)
}

Example (Subscribe)
// Connect to Liftbridge.
addr := "localhost:9292"
client, err := Connect([]string{addr})
if err != nil {
	panic(err)
}
defer client.Close()

// Subscribe to base stream partition.
if err := client.Subscribe(context.Background(), "foo-stream", func(msg *Message, err error) {
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.Offset(), string(msg.Value()))
}); err != nil {
	panic(err)
}

// Subscribe to a specific stream partition.
ctx := context.Background()
if err := client.Subscribe(ctx, "bar-stream", func(msg *Message, err error) {
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.Offset(), string(msg.Value()))
}, Partition(1)); err != nil {
	panic(err)
}

<-ctx.Done()

func Connect

func Connect(addrs []string, options ...ClientOption) (Client, error)

Connect creates a Client connection for the given Liftbridge cluster. Multiple addresses can be provided. Connect tries them in random order and uses the first one it connects to successfully. The Client will use the pool of addresses for failover purposes. Note that only one seed address needs to be provided, as the Client will discover the other brokers when fetching metadata for the cluster.

Example
addr := "localhost:9292"
client, err := Connect([]string{addr})
if err != nil {
	panic(err)
}
defer client.Close()

type ClientOption

type ClientOption func(*ClientOptions) error

ClientOption is a function on the ClientOptions for a connection. These are used to configure particular client options.

func AckWaitTime

func AckWaitTime(wait time.Duration) ClientOption

AckWaitTime is a ClientOption to set the default amount of time to wait for an ack to be received for a published message before ErrAckTimeout is returned. This can be overridden on individual requests by setting a timeout on the Context. This defaults to 5 seconds if not set.

func KeepAliveTime

func KeepAliveTime(keepAlive time.Duration) ClientOption

KeepAliveTime is a ClientOption to set the amount of time a pooled connection can be idle before it is closed and removed from the pool. The default is 30 seconds.

func MaxConnsPerBroker

func MaxConnsPerBroker(max int) ClientOption

MaxConnsPerBroker is a ClientOption to set the maximum number of connections to pool for a given broker in the cluster. The default is 2.

func ResubscribeWaitTime

func ResubscribeWaitTime(wait time.Duration) ClientOption

ResubscribeWaitTime is a ClientOption to set the amount of time to attempt to re-establish a stream subscription after being disconnected. For example, if the server serving a subscription dies and the stream is replicated, the client will attempt to re-establish the subscription once the stream leader has failed over. This failover can take several moments, so this option gives the client time to retry. The default is 30 seconds.

func TLSCert

func TLSCert(cert string) ClientOption

TLSCert is a ClientOption to set the TLS certificate for the client.

func TLSConfig

func TLSConfig(config *tls.Config) ClientOption

TLSConfig is a ClientOption to set the TLS configuration for the client. Overrides TLSCert.

type ClientOptions

type ClientOptions struct {
	// Brokers is the set of hosts the client will use when attempting to
	// connect.
	Brokers []string

	// MaxConnsPerBroker is the maximum number of connections to pool for a
	// given broker in the cluster. The default is 2.
	MaxConnsPerBroker int

	// KeepAliveTime is the amount of time a pooled connection can be idle
	// before it is closed and removed from the pool. The default is 30
	// seconds.
	KeepAliveTime time.Duration

	// TLSCert is the TLS certificate file to use. The client does not use a
	// TLS connection if this is not set.
	TLSCert string

	// TLSConfig is the TLS configuration to use. The client does not use a
	// TLS connection if this is not set. Overrides TLSCert if set.
	TLSConfig *tls.Config

	// ResubscribeWaitTime is the amount of time to attempt to re-establish a
	// stream subscription after being disconnected. For example, if the server
	// serving a subscription dies and the stream is replicated, the client
	// will attempt to re-establish the subscription once the stream leader has
	// failed over. This failover can take several moments, so this option
	// gives the client time to retry. The default is 30 seconds.
	ResubscribeWaitTime time.Duration

	// AckWaitTime is the default amount of time to wait for an ack to be
	// received for a published message before ErrAckTimeout is returned. This
	// can be overridden on individual requests by setting a timeout on the
	// Context. This defaults to 5 seconds if not set.
	AckWaitTime time.Duration
}

ClientOptions are used to control the Client configuration.

func DefaultClientOptions

func DefaultClientOptions() ClientOptions

DefaultClientOptions returns the default configuration options for the client.

func (ClientOptions) Connect

func (o ClientOptions) Connect() (Client, error)

Connect will attempt to connect to a Liftbridge server with multiple options.

type Handler

type Handler func(msg *Message, err error)

Handler is the callback invoked by Subscribe when a message is received on the specified stream. If err is not nil, the subscription will be terminated and no more messages will be received.

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message received from a Liftbridge stream.

func UnmarshalMessage

func UnmarshalMessage(data []byte) (*Message, error)

UnmarshalMessage deserializes a message from the given byte slice. It returns an error if the given data is not actually a Message.

func (*Message) Headers

func (m *Message) Headers() map[string][]byte

Headers is a set of key-value pairs.

func (*Message) Key

func (m *Message) Key() []byte

Key is an optional label set on a Message, useful for partitioning and stream compaction.

func (*Message) Offset

func (m *Message) Offset() int64

Offset is a monotonic message sequence in the stream partition.

func (*Message) Partition

func (m *Message) Partition() int32

Partition the Message was received on.

func (*Message) ReplySubject

func (m *Message) ReplySubject() string

ReplySubject is the NATS reply subject on the Message, if any.

func (*Message) Stream

func (m *Message) Stream() string

Stream the Message was received on.

func (*Message) Subject

func (m *Message) Subject() string

Subject is the NATS subject the Message was received on.

func (*Message) Timestamp

func (m *Message) Timestamp() time.Time

Timestamp is the time the Message was received by the server.

func (*Message) Value

func (m *Message) Value() []byte

Value is the Message payload.

type MessageOption

type MessageOption func(*MessageOptions)

MessageOption is a function on the MessageOptions for a Message. These are used to configure particular optional Message fields.

func AckInbox

func AckInbox(ackInbox string) MessageOption

AckInbox is a MessageOption to set the NATS subject Liftbridge should publish the Message ack to. If this is not set, the server will generate a random inbox.

func AckPolicyAll

func AckPolicyAll() MessageOption

AckPolicyAll is a MessageOption that sets the AckPolicy of the Message to ALL. This means the Message ack will be sent when the message has been written to all replicas.

func AckPolicyLeader

func AckPolicyLeader() MessageOption

AckPolicyLeader is a MessageOption that sets the AckPolicy of the Message to LEADER. This means the Message ack will be sent when the stream leader has written it to its write-ahead log.

func AckPolicyNone

func AckPolicyNone() MessageOption

AckPolicyNone is a MessageOption that sets the AckPolicy of the Message to NONE. This means no ack will be sent.

func CorrelationID

func CorrelationID(correlationID string) MessageOption

CorrelationID is a MessageOption to set the identifier used to correlate an ack with the published Message. If this is not set, the ack will not have a correlation id.

func Header

func Header(name string, value []byte) MessageOption

Header is a MessageOption that adds a single header to the Message. This may overwrite previously set headers.

func Headers

func Headers(headers map[string][]byte) MessageOption

Headers is a MessageOption that adds a set of headers to the Message. This may overwrite previously set headers.

func Key

func Key(key []byte) MessageOption

Key is a MessageOption to set the key on a Message. If Liftbridge has stream compaction enabled, the stream will retain only the last value for each key.

func PartitionBy

func PartitionBy(partitioner Partitioner) MessageOption

PartitionBy is a MessageOption that specifies a Partitioner used to map Messages to stream partitions.

func PartitionByKey

func PartitionByKey() MessageOption

PartitionByKey is a MessageOption that maps Messages to stream partitions based on a hash of the Message key. This computes the partition number for a given message by hashing the key and modding by the number of partitions for the first stream found with the subject of the published message. This does not work with streams containing wildcards in their subjects, e.g. "foo.*", since this matches on the subject literal of the published message. This also has undefined behavior if there are multiple streams for the given subject.
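
The "hash the key, then mod by the number of partitions" step can be sketched as below. The hash function is an assumption: FNV-1a from the standard library is used here purely for illustration, and the hash this package actually uses is not stated on this page. partitionForKey is a hypothetical name.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionForKey maps a key to a partition in [0, partitions).
// FNV-1a is an assumption made for this sketch, not the library's hash.
func partitionForKey(key []byte, partitions int32) int32 {
	if partitions <= 0 {
		return 0
	}
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	return int32(h.Sum32() % uint32(partitions))
}

func main() {
	for _, k := range []string{"alpha", "beta", "alpha"} {
		fmt.Println(k, partitionForKey([]byte(k), 3))
	}
}
```

The property that matters is that equal keys always land on the same partition, so per-key ordering is preserved as long as the partition count does not change.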

func PartitionByRoundRobin

func PartitionByRoundRobin() MessageOption

PartitionByRoundRobin is a MessageOption that maps Messages to stream partitions in a round-robin fashion. This computes the partition number for a given message by atomically incrementing a counter for the message subject and modding by the number of partitions for the first stream found with the subject. This does not work with streams containing wildcards in their subjects, e.g. "foo.*", since this matches on the subject literal of the published message. This also has undefined behavior if there are multiple streams for the given subject.
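
A minimal sketch of the counter-per-subject scheme described above, using only the standard library. The roundRobin type is hypothetical and is not this package's implementation; it exists only to show the atomic increment followed by the modulo.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// roundRobin hands out partitions in rotation, with one counter per subject.
type roundRobin struct {
	mu       sync.Mutex
	counters map[string]*uint64
}

func newRoundRobin() *roundRobin {
	return &roundRobin{counters: make(map[string]*uint64)}
}

// next returns the next partition in [0, partitions) for the subject.
func (r *roundRobin) next(subject string, partitions int32) int32 {
	if partitions <= 0 {
		return 0
	}
	// The mutex only guards the map lookup; the increment itself is atomic.
	r.mu.Lock()
	c, ok := r.counters[subject]
	if !ok {
		c = new(uint64)
		r.counters[subject] = c
	}
	r.mu.Unlock()
	// AddUint64 returns the incremented value, so subtract 1 to start at 0.
	n := atomic.AddUint64(c, 1) - 1
	return int32(n % uint64(partitions))
}

func main() {
	rr := newRoundRobin()
	for i := 0; i < 5; i++ {
		fmt.Println(rr.next("bar", 3))
	}
}
```

For a three-partition stream this prints 0, 1, 2, 0, 1: each publish moves to the next partition and wraps around.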

func ToPartition

func ToPartition(partition int32) MessageOption

ToPartition is a MessageOption that specifies the stream partition to publish the Message to. If this is set, any Partitioner will not be used.

type MessageOptions

type MessageOptions struct {
	// Key to set on the Message. If Liftbridge has stream compaction enabled,
	// the stream will retain only the last value for each key.
	Key []byte

	// AckInbox sets the NATS subject Liftbridge should publish the Message ack
	// to. If this is not set, the server will generate a random inbox.
	AckInbox string

	// CorrelationID sets the identifier used to correlate an ack with the
	// published Message. If this is not set, the ack will not have a
	// correlation id.
	CorrelationID string

	// AckPolicy controls the behavior of Message acks sent by the server. By
	// default, Liftbridge will send an ack when the stream leader has written
	// the Message to its write-ahead log.
	AckPolicy AckPolicy

	// Headers are key-value pairs to set on the Message.
	Headers map[string][]byte

	// Partitioner specifies the strategy for mapping a Message to a stream
	// partition.
	Partitioner Partitioner

	// Partition specifies the stream partition to publish the Message to. If
	// this is set, any Partitioner will not be used. This is a pointer to
	// allow distinguishing between unset and 0.
	Partition *int32
}

MessageOptions are used to configure optional settings for a Message.
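The pointer-typed Partition field is what lets an explicit partition 0 take precedence over a Partitioner. The sketch below models that precedence with local, hypothetical types (`messageOptions`, `toPartition`, `partitionBy`, `resolvePartition`); it does not use this package, and the fallback to partition 0 when nothing is set is an assumption.

```go
package main

import "fmt"

// messageOptions mirrors only the two fields relevant to partition selection.
type messageOptions struct {
	Partitioner func(key []byte) int32
	Partition   *int32 // nil means "unset", which differs from 0
}

type messageOption func(*messageOptions)

func toPartition(p int32) messageOption {
	return func(o *messageOptions) { o.Partition = &p }
}

func partitionBy(fn func(key []byte) int32) messageOption {
	return func(o *messageOptions) { o.Partitioner = fn }
}

// resolvePartition applies the options and prefers an explicit partition
// over any partitioner.
func resolvePartition(key []byte, opts ...messageOption) int32 {
	var o messageOptions
	for _, opt := range opts {
		opt(&o)
	}
	if o.Partition != nil {
		return *o.Partition
	}
	if o.Partitioner != nil {
		return o.Partitioner(key)
	}
	return 0 // assumed default when neither option is given
}

func main() {
	alwaysTwo := func(key []byte) int32 { return 2 }
	fmt.Println(resolvePartition(nil, partitionBy(alwaysTwo)))                 // 2
	fmt.Println(resolvePartition(nil, partitionBy(alwaysTwo), toPartition(0))) // 0
}
```

With a plain int32 field, the second call could not tell "partition 0 requested" apart from "no partition requested".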

type Metadata

type Metadata struct {
	// contains filtered or unexported fields
}

Metadata contains an immutable snapshot of information for a cluster and subset of streams.

func (*Metadata) Addrs

func (m *Metadata) Addrs() []string

Addrs returns the list of known broker addresses.

func (*Metadata) Brokers

func (m *Metadata) Brokers() []*BrokerInfo

Brokers returns a list of the cluster nodes.

func (*Metadata) GetStream

func (m *Metadata) GetStream(name string) *StreamInfo

GetStream returns the given stream or nil if unknown.

func (*Metadata) GetStreams

func (m *Metadata) GetStreams() map[string]*StreamInfo

GetStreams returns a map of stream names to streams.

func (*Metadata) LastUpdated

func (m *Metadata) LastUpdated() time.Time

LastUpdated returns the time when this metadata was last updated from the server.

func (*Metadata) PartitionCountForStream

func (m *Metadata) PartitionCountForStream(stream string) int32

PartitionCountForStream returns the number of partitions for the given stream.

type PartitionInfo

type PartitionInfo struct {
	// contains filtered or unexported fields
}

PartitionInfo contains information for a Liftbridge stream partition.

func (*PartitionInfo) ID

func (p *PartitionInfo) ID() int32

ID of the partition.

func (*PartitionInfo) ISR

func (p *PartitionInfo) ISR() []*BrokerInfo

ISR returns the list of replicas currently in the in-sync replica set.

func (*PartitionInfo) Leader

func (p *PartitionInfo) Leader() *BrokerInfo

Leader returns the broker acting as leader for this partition or nil if there is no leader.

func (*PartitionInfo) Replicas

func (p *PartitionInfo) Replicas() []*BrokerInfo

Replicas returns the list of brokers replicating the partition.

type Partitioner

type Partitioner interface {
	// Partition computes the partition number for a given message.
	Partition(stream string, key, value []byte, metadata *Metadata) int32
}

Partitioner is used to map a message to a stream partition.

type PauseOption

type PauseOption func(*PauseOptions) error

PauseOption is a function on the PauseOptions for a pause call. These are used to configure particular pausing options.

func PausePartitions

func PausePartitions(partitions ...int32) PauseOption

PausePartitions sets the list of partitions to pause, or all of them if nil/empty.

func ResumeAll

func ResumeAll() PauseOption

ResumeAll will resume all partitions in the stream when any one of them is published to, instead of resuming only that partition.

type PauseOptions

type PauseOptions struct {
	// Partitions sets the list of partitions to pause or all of them if
	// nil/empty.
	Partitions []int32

	// ResumeAll will resume all partitions in the stream if one of them is
	// published to instead of resuming only that partition.
	ResumeAll bool
}

PauseOptions are used to set up stream pausing.
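The interaction between Partitions and ResumeAll can be pictured with a small state model. This is a sketch under the assumption, implied by the ResumeAll description, that publishing to a paused partition resumes it; the `pausedStream` type is hypothetical and does not talk to a server.

```go
package main

import "fmt"

// pausedStream is a hypothetical in-memory model of pause state.
type pausedStream struct {
	partitions int32
	paused     map[int32]bool
	resumeAll  bool
}

func newPausedStream(partitions int32) *pausedStream {
	return &pausedStream{partitions: partitions, paused: map[int32]bool{}}
}

// pause marks the given partitions as paused, or every partition if none
// are given.
func (s *pausedStream) pause(resumeAll bool, ids ...int32) {
	s.resumeAll = resumeAll
	if len(ids) == 0 {
		for id := int32(0); id < s.partitions; id++ {
			s.paused[id] = true
		}
		return
	}
	for _, id := range ids {
		s.paused[id] = true
	}
}

// publish models a publish to one partition: it resumes that partition,
// or all partitions when resumeAll is set.
func (s *pausedStream) publish(id int32) {
	if !s.paused[id] {
		return
	}
	if s.resumeAll {
		s.paused = map[int32]bool{}
		return
	}
	delete(s.paused, id)
}

func (s *pausedStream) pausedCount() int { return len(s.paused) }

func main() {
	s := newPausedStream(3)
	s.pause(false) // pause all; partitions resume individually
	s.publish(1)
	fmt.Println(s.pausedCount()) // 2

	s.pause(true) // pause all; any publish resumes everything
	s.publish(0)
	fmt.Println(s.pausedCount()) // 0
}
```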

type ReadonlyOption

type ReadonlyOption func(*ReadonlyOptions) error

ReadonlyOption is a function on the ReadonlyOptions for a set readonly call. These are used to configure particular set readonly options.

func Readonly

func Readonly(readonly bool) ReadonlyOption

Readonly defines if the partitions should be set to readonly or to readwrite.

func ReadonlyPartitions

func ReadonlyPartitions(partitions ...int32) ReadonlyOption

ReadonlyPartitions sets the list of partitions on which to set the readonly flag, or all of them if nil/empty.

type ReadonlyOptions

type ReadonlyOptions struct {
	// Partitions sets the list of partitions on which to set the readonly flag
	// or all of them if nil/empty.
	Partitions []int32

	// Readwrite defines if the partitions should be set to readonly (false) or
	// to readwrite (true). This field is called readwrite and not readonly so
	// that the default value corresponds to "enable readonly".
	Readwrite bool
}

ReadonlyOptions are used to set up stream readonly operations.
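The inverted Readwrite field means the zero value of the options struct requests readonly. The sketch below reproduces that design with local, hypothetical types (`readonlyOptions`, `readonly`, `applyReadonly`) rather than this package's own; how Readonly sets the field internally is an assumption based on the field comment.

```go
package main

import "fmt"

// readonlyOptions mirrors the documented struct layout.
type readonlyOptions struct {
	Partitions []int32
	Readwrite  bool // false (the zero value) means "set readonly"
}

type readonlyOption func(*readonlyOptions) error

// readonly stores the inverse of its argument, so that passing no option
// at all is equivalent to readonly(true).
func readonly(ro bool) readonlyOption {
	return func(o *readonlyOptions) error {
		o.Readwrite = !ro
		return nil
	}
}

// applyReadonly folds the options into a single struct.
func applyReadonly(opts ...readonlyOption) (readonlyOptions, error) {
	var o readonlyOptions
	for _, opt := range opts {
		if err := opt(&o); err != nil {
			return o, err
		}
	}
	return o, nil
}

func main() {
	def, _ := applyReadonly()
	rw, _ := applyReadonly(readonly(false))
	fmt.Println(def.Readwrite, rw.Readwrite) // false true
}
```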

type StartPosition

type StartPosition int32

StartPosition controls where to begin consuming in a stream.

type StreamInfo

type StreamInfo struct {
	// contains filtered or unexported fields
}

StreamInfo contains information for a Liftbridge stream.

func (*StreamInfo) GetPartition

func (s *StreamInfo) GetPartition(id int32) *PartitionInfo

GetPartition returns the partition info for the given partition id or nil if no such partition exists.

func (*StreamInfo) Partitions

func (s *StreamInfo) Partitions() map[int32]*PartitionInfo

Partitions returns a map containing partition IDs and partitions for the stream.

type StreamOption

type StreamOption func(*StreamOptions) error

StreamOption is a function on the StreamOptions for a stream. These are used to configure particular stream options.

func CleanerInterval

func CleanerInterval(val time.Duration) StreamOption

CleanerInterval sets the value of the cleaner.interval configuration for the stream. This controls the frequency to check if a new stream log segment file should be rolled and whether any segments are eligible for deletion based on the retention policy or compaction if enabled. If this is not set, it uses the server default value.

func CompactEnabled

func CompactEnabled(val bool) StreamOption

CompactEnabled sets the value of the compact.enabled configuration for the stream. This controls the activation of stream log compaction. If this is not set, it uses the server default value.

func CompactMaxGoroutines

func CompactMaxGoroutines(val int32) StreamOption

CompactMaxGoroutines sets the value of the compact.max.goroutines configuration for the stream. This controls the maximum number of concurrent goroutines to use for compaction on a stream log (only applicable if compact.enabled is true). If this is not set, it uses the server default value.

func Group

func Group(group string) StreamOption

Group is a StreamOption to set the load-balance group for a stream. When there are multiple streams in the same group, messages will be balanced among them.

func MaxReplication

func MaxReplication() StreamOption

MaxReplication is a StreamOption to set the stream replication factor equal to the current number of servers in the cluster.

func Partitions

func Partitions(partitions int32) StreamOption

Partitions is a StreamOption to set the number of partitions for a stream. Partitions are ordered, replicated, and durably stored on disk and serve as the unit of storage and parallelism for a stream. A partitioned stream for NATS subject "foo.bar" with three partitions internally maps to the NATS subjects "foo.bar", "foo.bar.1", and "foo.bar.2". A single partition would map to "foo.bar" to match behavior of an "un-partitioned" stream. If this is not set, it defaults to 1.
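The subject mapping described above is simple enough to write down directly. `partitionSubject` is a hypothetical helper that follows the "foo.bar", "foo.bar.1", "foo.bar.2" example; it is not an exported function of this package.

```go
package main

import "fmt"

// partitionSubject returns the NATS subject for a partition: partition 0
// uses the stream subject itself, others append ".<partition>".
func partitionSubject(subject string, partition int32) string {
	if partition == 0 {
		return subject
	}
	return fmt.Sprintf("%s.%d", subject, partition)
}

func main() {
	for p := int32(0); p < 3; p++ {
		fmt.Println(partitionSubject("foo.bar", p))
	}
}
```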

func ReplicationFactor

func ReplicationFactor(replicationFactor int32) StreamOption

ReplicationFactor is a StreamOption to set the replication factor for a stream. The replication factor controls the number of servers to replicate a stream to. E.g. a value of 1 would mean only 1 server would have the data, and a value of 3 would mean 3 servers would have it. If this is not set, it defaults to 1. A value of -1 will signal to the server to set the replication factor equal to the current number of servers in the cluster.

func RetentionMaxAge

func RetentionMaxAge(val time.Duration) StreamOption

RetentionMaxAge sets the value of the retention.max.age configuration for the stream. This controls the TTL for stream log segment files, after which they are deleted. A value of 0 indicates no TTL. If this is not set, it uses the server default value.

func RetentionMaxBytes

func RetentionMaxBytes(val int64) StreamOption

RetentionMaxBytes sets the value of the retention.max.bytes configuration for the stream. This controls the maximum size a stream's log can grow to, in bytes, before we will discard old log segments to free up space. A value of 0 indicates no limit. If this is not set, it uses the server default value.

func RetentionMaxMessages

func RetentionMaxMessages(val int64) StreamOption

RetentionMaxMessages sets the value of the retention.max.messages configuration for the stream. This controls the maximum size a stream's log can grow to, in number of messages, before we will discard old log segments to free up space. A value of 0 indicates no limit. If this is not set, it uses the server default value.

func SegmentMaxAge

func SegmentMaxAge(val time.Duration) StreamOption

SegmentMaxAge sets the value of the segment.max.age configuration for the stream. This controls the maximum time before a new stream log segment is rolled out. A value of 0 means new segments will only be rolled when segment.max.bytes is reached. Retention is always done a file at a time, so a larger value means fewer files but less granular control over retention. If this is not set, it uses the server default value.

func SegmentMaxBytes

func SegmentMaxBytes(val int64) StreamOption

SegmentMaxBytes sets the value of the segment.max.bytes configuration for the stream. This controls the maximum size of a single stream log segment file in bytes. Retention is always done a file at a time, so a larger segment size means fewer files but less granular control over retention. If this is not set, it uses the server default value.
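To see why segment size affects retention granularity, consider a simplified model in which retention removes whole segments, oldest first, until the log fits the byte limit. This is an illustrative sketch, not the server's cleaner; `applyRetention` is a hypothetical function, and keeping the newest segment even when it exceeds the limit is an assumption.

```go
package main

import "fmt"

// applyRetention drops the oldest segments until the total size fits within
// maxBytes. A maxBytes of 0 means no limit.
func applyRetention(segments []int64, maxBytes int64) []int64 {
	if maxBytes == 0 {
		return segments
	}
	var total int64
	for _, size := range segments {
		total += size
	}
	// Always keep the newest segment (assumption for this sketch).
	for len(segments) > 1 && total > maxBytes {
		total -= segments[0]
		segments = segments[1:]
	}
	return segments
}

func main() {
	// The same 1000 bytes of data, split into small or large segments.
	small := []int64{250, 250, 250, 250}
	large := []int64{500, 500}
	fmt.Println(applyRetention(small, 800)) // [250 250 250]
	fmt.Println(applyRetention(large, 800)) // [500]
}
```

With an 800-byte limit, the small segments retain 750 bytes while the large segments retain only 500, because a whole 500-byte file has to go.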

type StreamOptions

type StreamOptions struct {
	// Group is the name of a load-balance group. When there are multiple
	// streams in the same group, messages will be balanced among them.
	Group string

	// ReplicationFactor controls the number of servers to replicate a stream
	// to. E.g. a value of 1 would mean only 1 server would have the data, and
	// a value of 3 would mean 3 servers would have it. If this is not set, it
	// defaults to 1. A value of -1 will signal to the server to set the
	// replication factor equal to the current number of servers in the
	// cluster.
	ReplicationFactor int32

	// Partitions determines how many partitions to create for a stream. If 0,
	// this will behave as a stream with a single partition. If this is not
	// set, it defaults to 1.
	Partitions int32

	// The maximum size a stream's log can grow to, in bytes, before we will
	// discard old log segments to free up space. A value of 0 indicates no
	// limit. If this is not set, it uses the server default value.
	RetentionMaxBytes *int64

	// The maximum size a stream's log can grow to, in number of messages,
	// before we will discard old log segments to free up space. A value of 0
	// indicates no limit. If this is not set, it uses the server default
	// value.
	RetentionMaxMessages *int64

	// The TTL for stream log segment files, after which they are deleted. A
	// value of 0 indicates no TTL. If this is not set, it uses the server
	// default value.
	RetentionMaxAge *time.Duration

	// The frequency to check if a new stream log segment file should be rolled
	// and whether any segments are eligible for deletion based on the
	// retention policy or compaction if enabled. If this is not set, it uses
	// the server default value.
	CleanerInterval *time.Duration

	// The maximum size of a single stream log segment file in bytes. Retention
	// is always done a file at a time, so a larger segment size means fewer
	// files but less granular control over retention. If this is not set, it
	// uses the server default value.
	SegmentMaxBytes *int64

	// The maximum time before a new stream log segment is rolled out. A value
	// of 0 means new segments will only be rolled when segment.max.bytes is
	// reached. Retention is always done a file at a time, so a larger value
	// means fewer files but less granular control over retention. If this is
	// not set, it uses the server default value.
	SegmentMaxAge *time.Duration

	// The maximum number of concurrent goroutines to use for compaction on a
	// stream log (only applicable if compact.enabled is true). If this is not
	// set, it uses the server default value.
	CompactMaxGoroutines *int32

	// CompactEnabled controls the activation of stream log compaction. If this
	// is not set, it uses the server default value.
	CompactEnabled *bool
}

StreamOptions are used to configure new streams.
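Most of the fields above are pointers so that "not set" (use the server default) can be told apart from an explicit 0 (no limit). A minimal sketch of that three-way distinction, using hypothetical helpers `int64Ptr` and `effectiveMaxBytes` and a made-up server default:

```go
package main

import "fmt"

// int64Ptr returns a pointer to v, which is handy for optional fields.
func int64Ptr(v int64) *int64 { return &v }

// effectiveMaxBytes resolves an optional setting against a server default.
// nil means "not set"; 0 is a real value meaning "no limit".
func effectiveMaxBytes(opt *int64, serverDefault int64) int64 {
	if opt == nil {
		return serverDefault
	}
	return *opt
}

func main() {
	const serverDefault = 1 << 20 // hypothetical server default of 1 MiB

	fmt.Println(effectiveMaxBytes(nil, serverDefault))            // 1048576
	fmt.Println(effectiveMaxBytes(int64Ptr(0), serverDefault))    // 0
	fmt.Println(effectiveMaxBytes(int64Ptr(4096), serverDefault)) // 4096
}
```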

type SubscriptionOption

type SubscriptionOption func(*SubscriptionOptions) error

SubscriptionOption is a function on the SubscriptionOptions for a subscription. These are used to configure particular subscription options.

func Partition

func Partition(partition int32) SubscriptionOption

Partition specifies the stream partition to consume. If not set, this defaults to 0.

func ReadISRReplica

func ReadISRReplica() SubscriptionOption

ReadISRReplica sets the read replica option. If set, the client will request a subscription from a random ISR replica instead of subscribing explicitly to the partition's leader. Because the replica is chosen at random from the ISR, it may turn out to be the partition's leader itself.

func StartAtEarliestReceived

func StartAtEarliestReceived() SubscriptionOption

StartAtEarliestReceived sets the subscription start position to the earliest message received in the stream.

func StartAtLatestReceived

func StartAtLatestReceived() SubscriptionOption

StartAtLatestReceived sets the subscription start position to the last message received in the stream.

func StartAtOffset

func StartAtOffset(offset int64) SubscriptionOption

StartAtOffset sets the desired start offset to begin consuming from in the stream.

func StartAtTime

func StartAtTime(start time.Time) SubscriptionOption

StartAtTime sets the desired timestamp to begin consuming from in the stream.

func StartAtTimeDelta

func StartAtTimeDelta(ago time.Duration) SubscriptionOption

StartAtTimeDelta sets the desired timestamp to begin consuming from in the stream using a time delta in the past.
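A time delta is just a timestamp expressed relative to "now". The sketch below shows that arithmetic with a fixed reference time so the output is deterministic; `startTimestamp` and `referenceTime` are hypothetical names, and whether the client or the server clock supplies "now" is not specified here.

```go
package main

import (
	"fmt"
	"time"
)

// referenceTime is a fixed "now" so the example output is deterministic.
var referenceTime = time.Date(2020, time.August, 14, 12, 0, 0, 0, time.UTC)

// startTimestamp converts a delta in the past into an absolute timestamp.
func startTimestamp(now time.Time, ago time.Duration) time.Time {
	return now.Add(-ago)
}

func main() {
	start := startTimestamp(referenceTime, 30*time.Minute)
	fmt.Println(start.Format(time.RFC3339)) // 2020-08-14T11:30:00Z
}
```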

type SubscriptionOptions

type SubscriptionOptions struct {
	// StartPosition controls where to begin consuming from in the stream.
	StartPosition StartPosition

	// StartOffset sets the stream offset to begin consuming from.
	StartOffset int64

	// StartTimestamp sets the stream start position to the given timestamp.
	StartTimestamp time.Time

	// Partition sets the stream partition to consume.
	Partition int32

	// ReadISRReplica sets the client's ability to subscribe from a random ISR
	// replica.
	ReadISRReplica bool
}

SubscriptionOptions are used to control a subscription's behavior.

Directories

Path Synopsis
example
