subscriber

package
v2.0.14
Warning

This package is not in the latest version of its module.

Published: Jan 21, 2025 | License: Apache-2.0 | Imports: 11 | Imported by: 1

Documentation

Constants

const ProductEventStream = "GVT_%s_DP_%s"
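
The constant is a format string with two `%s` verbs. The sketch below shows how such a string expands with `fmt.Sprintf`. That the first verb takes the domain and the second takes the data product name is an assumption drawn from the naming, not something this page states, and `streamName` is a hypothetical helper that is not part of the package.

```go
package main

import "fmt"

// ProductEventStream mirrors the constant documented above.
const ProductEventStream = "GVT_%s_DP_%s"

// streamName is a hypothetical helper, not part of the package.
// Assumption: the first verb is the domain and the second is the
// data product name.
func streamName(domain, product string) string {
	return fmt.Sprintf(ProductEventStream, domain, product)
}

func main() {
	fmt.Println(streamName("default", "sdk_example")) // prints "GVT_default_DP_sdk_example"
}
```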

Variables

This section is empty.

Functions

func DeliverNew

func DeliverNew() func(*Subscription)

DeliverNew configures the subscription to start from the most recently produced events and ignore earlier ones. Use it when the subscriber only needs the latest data.

func InitialLoad

func InitialLoad(enabled bool) func(*Subscription)

InitialLoad controls whether the subscriber receives a copy of all existing data when it first subscribes to a data product. When enabled, the subscriber gets a snapshot of the existing data before it continues with real-time data change events.

enabled: set to true to receive the initial data load.

func Partition

func Partition(partitions ...int) func(*Subscription)

Partition restricts the subscription to specific partitions of a data product. Gravity divides the data into 256 partitions, so subscribing to a subset lets several subscribers process the data in parallel.

partitions: the partition indices to subscribe to.

func StartSequence

func StartSequence(seq uint64) func(*Subscription)

StartSequence sets the sequence number from which the subscription begins receiving events, so that consumption can start at a specific point in the event sequence.

seq: the sequence number from which to start receiving events.

Types

type Options

type Options struct {
	Domain  string
	Verbose bool
}

func NewOptions

func NewOptions() *Options

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(name string, options *Options) *Subscriber

func NewSubscriberWithClient

func NewSubscriberWithClient(name string, client *core.Client, options *Options) *Subscriber

func (*Subscriber) Connect

func (s *Subscriber) Connect(host string, options *core.Options) error

func (*Subscriber) Disconnect

func (s *Subscriber) Disconnect() error

func (*Subscriber) GetName

func (s *Subscriber) GetName() string

func (*Subscriber) Reset

func (s *Subscriber) Reset(productName string) error

Reset removes the subscription to the specified product on Gravity.

productName: the name of the product whose subscription is reset.

Returns an error if the reset fails.

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(productName string, handler func(*nats.Msg), opts ...SubscriptionOpt) (*Subscription, error)

Subscribe sets up a subscription to the specified product on Gravity and registers a handler function to process incoming messages.

productName: the name of the product to subscribe to.
handler: the function called for each incoming message; it receives a *nats.Msg.
opts: a variadic list of options that customize the subscription's behavior.

Returns a pointer to the Subscription, or an error if the subscription fails.

Example
client := core.NewClient()

// Connect to Gravity
options := core.NewOptions()
err := client.Connect("0.0.0.0:32803", options)
if err != nil {
	panic(err)
}

// Create a subscriber that reuses the connected client
acOpts := NewOptions()
s := NewSubscriberWithClient("sdk_example_subscriber", client, acOpts)

// Subscribe to specific data product
sub, err := s.Subscribe("sdk_example", func(msg *nats.Msg) {
	// TODO: event handling
}, Partition(-1), StartSequence(0))
if err != nil {
	panic(err)
}

defer sub.Close()

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

func NewSubscription

func NewSubscription(s *Subscriber, domain string, productName string, handler func(*nats.Msg), opts ...SubscriptionOpt) *Subscription

func (*Subscription) Close

func (sub *Subscription) Close() error

Close terminates the subscription and closes the connection associated with it. Call it to shut the subscription down cleanly so that all resources are released. Returns an error if closing fails.
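
Any type with a `Close() error` method satisfies `io.Closer`, so several subscriptions can be shut down through that interface. The sketch below closes every item even when an earlier one fails and combines the errors with `errors.Join` (Go 1.20 or later). `closeAll` and `fakeSub` are hypothetical names used only for illustration; `fakeSub` stands in for a real subscription so the example runs without a Gravity server.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// closeAll closes every closer, continuing past failures, and returns
// the combined error (nil when all closes succeed).
func closeAll(closers ...io.Closer) error {
	var errs []error
	for _, c := range closers {
		if err := c.Close(); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}

// fakeSub is a hypothetical stand-in for a subscription.
type fakeSub struct {
	name   string
	fail   bool
	closed bool
}

// Close marks the stand-in as closed and optionally reports a failure.
func (f *fakeSub) Close() error {
	f.closed = true
	if f.fail {
		return fmt.Errorf("close %s: failed", f.name)
	}
	return nil
}

func main() {
	a := &fakeSub{name: "a"}
	b := &fakeSub{name: "b", fail: true}
	if err := closeAll(a, b); err != nil {
		fmt.Println("shutdown error:", err)
	}
}
```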

func (*Subscription) Subscribe

func (sub *Subscription) Subscribe() error

Subscribe starts the subscription for the data product configured in the Subscription. It connects to the Gravity service and begins passing incoming messages to the handler function supplied when the Subscription was created. Returns an error if the subscription fails or if the Subscription is not properly configured.

type SubscriptionOpt

type SubscriptionOpt func(*Subscription)

SubscriptionOpt is a function that modifies a Subscription. Options are passed as the variadic opts argument of Subscribe and NewSubscription.
