Documentation ¶
Index ¶
- Constants
- func DeliverNew() func(*Subscription)
- func InitialLoad(enabled bool) func(*Subscription)
- func Partition(partitions ...int) func(*Subscription)
- func StartSequence(seq uint64) func(*Subscription)
- type Options
- type Subscriber
- func (s *Subscriber) Connect(host string, options *core.Options) error
- func (s *Subscriber) Disconnect() error
- func (s *Subscriber) GetName() string
- func (s *Subscriber) Reset(productName string) error
- func (s *Subscriber) Subscribe(productName string, handler func(*nats.Msg), opts ...SubscriptionOpt) (*Subscription, error)
- type Subscription
- type SubscriptionOpt
Examples ¶
Constants ¶
const ProductEventStream = "GVT_%s_DP_%s"
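The constant is a format string with two %s placeholders. As a minimal sketch of how such a template expands, the snippet below fills it with fmt.Sprintf. The helper streamName, the argument order (domain first, then product name), and the sample values are assumptions made for illustration; the package does not document them here.

```go
package main

import "fmt"

// ProductEventStream mirrors the constant documented above.
const ProductEventStream = "GVT_%s_DP_%s"

// streamName is a hypothetical helper that fills the template.
// The argument order (domain, then product) is an assumption.
func streamName(domain, product string) string {
	return fmt.Sprintf(ProductEventStream, domain, product)
}

func main() {
	fmt.Println(streamName("default", "sdk_example"))
	// prints "GVT_default_DP_sdk_example"
}
```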
Variables ¶
This section is empty.
Functions ¶
func DeliverNew ¶
func DeliverNew() func(*Subscription)
DeliverNew configures the subscription to start from the most recently produced events. Use it when the Subscriber needs only the latest data and can ignore earlier events.
func InitialLoad ¶
func InitialLoad(enabled bool) func(*Subscription)
InitialLoad controls whether the subscriber receives a copy of all existing data when it first subscribes to a data product. When enabled, the subscriber gets a snapshot of the existing data before it continues with real-time change events.
enabled: Set to true to receive the initial data load.
func Partition ¶
func Partition(partitions ...int) func(*Subscription)
Partition selects which partitions of a data product to subscribe to. Gravity divides the data into 256 partitions, so subscribing to a subset lets several subscribers process the data in parallel.
partitions: The partition indices to subscribe to.
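One way to use partitioning for parallelism is to deal the 256 partitions out to a fixed number of workers. The sketch below shows a round-robin split. The helper partitionsFor is hypothetical and not part of this package, and the zero-based index range 0 to 255 is an assumption, since the valid index range is not stated here.

```go
package main

import "fmt"

// totalPartitions is the partition count stated in the Partition docs.
const totalPartitions = 256

// partitionsFor is a hypothetical helper. It returns the partition indices
// assigned to one worker when partitions are dealt round-robin across
// `workers` workers. Indices are assumed to run from 0 to 255.
func partitionsFor(worker, workers int) []int {
	if workers <= 0 || worker < 0 || worker >= workers {
		return nil
	}
	var out []int
	for p := worker; p < totalPartitions; p += workers {
		out = append(out, p)
	}
	return out
}

func main() {
	const workers = 4
	for w := 0; w < workers; w++ {
		ps := partitionsFor(w, workers)
		fmt.Printf("worker %d: %d partitions, first %d, last %d\n",
			w, len(ps), ps[0], ps[len(ps)-1])
	}
}
```

Each worker could then pass its slice to the option as Partition(ps...), since Partition accepts a variadic list of ints.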
func StartSequence ¶
func StartSequence(seq uint64) func(*Subscription)
StartSequence sets the sequence number from which the Subscriber begins receiving events. Use it to start from a particular point in the event sequence.
seq: The sequence number from which to start receiving events.
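A common reason to pick a starting sequence is to resume after a restart. The sketch below keeps a small in-memory checkpoint of the highest sequence processed and derives the next starting point from it. The checkpoint type is hypothetical, and the "last processed plus one" rule assumes the starting sequence is inclusive, which is not confirmed here. How a sequence number is read from each message is outside this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// checkpoint is a hypothetical helper that remembers the highest
// sequence number processed so far. It is safe for concurrent use.
type checkpoint struct {
	mu   sync.Mutex
	last uint64
	seen bool
}

// record notes that the event with the given sequence number was processed.
func (c *checkpoint) record(seq uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.seen || seq > c.last {
		c.last, c.seen = seq, true
	}
}

// resumeFrom returns the sequence to restart from: one past the last
// processed event, or 0 if nothing has been processed yet.
// Assumption: the starting sequence is inclusive.
func (c *checkpoint) resumeFrom() uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.seen {
		return 0
	}
	return c.last + 1
}

func main() {
	var cp checkpoint
	for _, seq := range []uint64{5, 7, 6} {
		cp.record(seq)
	}
	fmt.Println(cp.resumeFrom()) // prints 8
}
```

The returned value could then be passed as StartSequence(cp.resumeFrom()) when subscribing again.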
Types ¶
type Options ¶
func NewOptions ¶
func NewOptions() *Options
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
func NewSubscriber ¶
func NewSubscriber(name string, options *Options) *Subscriber
func NewSubscriberWithClient ¶
func NewSubscriberWithClient(name string, client *core.Client, options *Options) *Subscriber
func (*Subscriber) Disconnect ¶
func (s *Subscriber) Disconnect() error
func (*Subscriber) GetName ¶
func (s *Subscriber) GetName() string
func (*Subscriber) Reset ¶
func (s *Subscriber) Reset(productName string) error
Reset removes the subscription to the specified product on Gravity.
productName: The name of the product whose subscription is reset.
Returns an error if the reset fails.
func (*Subscriber) Subscribe ¶
func (s *Subscriber) Subscribe(productName string, handler func(*nats.Msg), opts ...SubscriptionOpt) (*Subscription, error)
Subscribe sets up a subscription to the specified product on Gravity and registers a handler function to process incoming messages.
productName: The name of the product to subscribe to.
handler: A function called for each incoming message. It takes a *nats.Msg as its parameter.
opts: A variadic list of options that customize the subscription behavior.
Returns a pointer to the Subscription and an error if the subscription fails.
Example ¶
client := core.NewClient()

// Connect to Gravity
options := core.NewOptions()
err := client.Connect("0.0.0.0:32803", options)
if err != nil {
	panic(err)
}

// Create a subscriber that reuses the connected client
acOpts := NewOptions()
s := NewSubscriberWithClient("sdk_example_subscriber", client, acOpts)

// Subscribe to specific data product
sub, err := s.Subscribe("sdk_example", func(msg *nats.Msg) {
	// TODO: event handling
}, Partition(-1), StartSequence(0))
if err != nil {
	panic(err)
}
defer sub.Close()
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
func NewSubscription ¶
func NewSubscription(s *Subscriber, domain string, productName string, handler func(*nats.Msg), opts ...SubscriptionOpt) *Subscription
func (*Subscription) Close ¶
func (sub *Subscription) Close() error
Close terminates the subscription and closes the connection associated with it. Call it to shut the subscription down cleanly so that all resources are released. Returns an error if closing fails.
func (*Subscription) Subscribe ¶
func (sub *Subscription) Subscribe() error
Subscribe initiates the subscription process for the specified data product in the Subscription object. This function starts the reception of messages based on the configuration set in the Subscription. It connects to the Gravity service and begins handling incoming messages using the handler function defined earlier. Returns an error if the subscription process fails or if the Subscription is not properly configured.
type SubscriptionOpt ¶
type SubscriptionOpt func(*Subscription)
SubscriptionOpt is a type of function that modifies a Subscription.
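This is the functional options pattern: each option is a closure that mutates the Subscription before it is used. The self-contained sketch below shows how such options are typically applied. The subscription struct, its field names, and the with... constructors are stand-ins invented for illustration, because the real Subscription fields are unexported.

```go
package main

import "fmt"

// subscription is a stand-in for the package's Subscription type.
// The field names are assumptions; the real fields are unexported.
type subscription struct {
	partitions  []int
	startSeq    uint64
	initialLoad bool
}

// subscriptionOpt has the same shape as SubscriptionOpt.
type subscriptionOpt func(*subscription)

// withPartitions, withStartSequence, and withInitialLoad are hypothetical
// counterparts of Partition, StartSequence, and InitialLoad.
func withPartitions(ps ...int) subscriptionOpt {
	return func(s *subscription) { s.partitions = ps }
}

func withStartSequence(seq uint64) subscriptionOpt {
	return func(s *subscription) { s.startSeq = seq }
}

func withInitialLoad(enabled bool) subscriptionOpt {
	return func(s *subscription) { s.initialLoad = enabled }
}

// newSubscription applies each option in order, so a later option
// overrides an earlier one that sets the same field.
func newSubscription(opts ...subscriptionOpt) *subscription {
	s := &subscription{}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := newSubscription(
		withPartitions(0, 1),
		withStartSequence(42),
		withInitialLoad(true),
	)
	fmt.Printf("%+v\n", *s)
}
```

The pattern keeps the Subscribe signature stable: new behavior can be added as another option function without changing existing callers.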