Documentation ¶
Overview ¶
Example (OpenSubscriptionFromURL) ¶
package main

import (
	"context"
	"log"

	"github.com/sraphs/gdk/pubsub"
	_ "github.com/sraphs/gdk/pubsub/pulsarpubsub"
)

func main() {
	// PRAGMA: This example is used on github.com/sraphs/gdk; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On github.com/sraphs/gdk, add a blank import: _ "github.com/sraphs/gdk/pubsub/pulsarpubsub"
	// PRAGMA: On github.com/sraphs/gdk, hide lines until the next blank line.
	ctx := context.Background()

	// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
	// The host + path are used as the subscription name.
	// The "topic" query parameter sets one or more topics to subscribe to.
	// The set of brokers must be in an environment variable KAFKA_BROKERS.
	subscription, err := pubsub.OpenSubscription(ctx, "pulsar://my-sub?topic=my-topic")
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}
Example (OpenTopicFromURL) ¶
package main

import (
	"context"
	"log"

	"github.com/sraphs/gdk/pubsub"
	_ "github.com/sraphs/gdk/pubsub/pulsarpubsub"
)

func main() {
	// PRAGMA: This example is used on github.com/sraphs/gdk; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On github.com/sraphs/gdk, add a blank import: _ "github.com/sraphs/gdk/pubsub/pulsarpubsub"
	// PRAGMA: On github.com/sraphs/gdk, hide lines until the next blank line.
	ctx := context.Background()

	// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
	// The host + path are the topic name to send to.
	// The set of brokers must be in an environment variable KAFKA_BROKERS.
	topic, err := pubsub.OpenTopic(ctx, "pulsar://my-topic")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}
Index ¶
- Constants
- func MinimalConfig(url string) pulsar.ClientOptions
- func OpenSubscription(client pulsar.Client, opts *SubscriptionOptions) (*pubsub.Subscription, error)
- func OpenTopic(client pulsar.Client, opts *TopicOptions) (*pubsub.Topic, error)
- type SubscriptionOptions
- type TopicOptions
- type URLOpener
Constants ¶
const Scheme = "pulsar"
Scheme is the URL scheme pulsarpubsub registers its URLOpeners under on pubsub.DefaultMux.
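As a rough illustration of what registering under a scheme implies, the sketch below checks whether a URL would be routed to this package by comparing its scheme with the constant. It uses only the standard library; hasPulsarScheme is a hypothetical helper written for this illustration and is not part of pulsarpubsub or pubsub.

```go
package main

import (
	"fmt"
	"net/url"
)

// Scheme mirrors the constant documented above.
const Scheme = "pulsar"

// hasPulsarScheme reports whether rawURL parses and uses the "pulsar" scheme.
// Hypothetical helper for illustration only; it is not part of pulsarpubsub.
func hasPulsarScheme(rawURL string) (bool, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return false, err
	}
	return u.Scheme == Scheme, nil
}

func main() {
	for _, raw := range []string{"pulsar://my-topic", "kafka://my-topic"} {
		ok, err := hasPulsarScheme(raw)
		fmt.Println(raw, ok, err)
	}
}
```

Only the first URL in the loop has a matching scheme, so only it would be a candidate for the openers registered by this package.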
Functions ¶
func MinimalConfig ¶
func MinimalConfig(url string) pulsar.ClientOptions
MinimalConfig returns a minimal pulsar.ClientOptions for connecting to the Pulsar service at url.
func OpenSubscription ¶
func OpenSubscription(client pulsar.Client, opts *SubscriptionOptions) (*pubsub.Subscription, error)
OpenSubscription returns a *pubsub.Subscription backed by a Pulsar consumer. The topic to subscribe to and the subscription name are set through opts.ConsumerOptions (the Topic and SubscriptionName fields); for more info, see https://pulsar.apache.org/docs/next/concepts-topic-compaction.
Example ¶
package main

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
	"github.com/sraphs/gdk/pubsub/pulsarpubsub"
)

func main() {
	// PRAGMA: This example is used on github.com/sraphs/gdk; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On github.com/sraphs/gdk, hide lines until the next blank line.
	ctx := context.Background()

	localPulsarURL := "pulsar://localhost:6650"
	config := pulsarpubsub.MinimalConfig(localPulsarURL)
	client, err := pulsar.NewClient(config)
	if err != nil {
		log.Fatal(err)
	}

	// Construct a *pubsub.Subscription, use the SubscriptionName "my-sub"
	// and receiving messages from "my-topic".
	subscription, err := pulsarpubsub.OpenSubscription(client, &pulsarpubsub.SubscriptionOptions{
		ConsumerOptions: pulsar.ConsumerOptions{
			Topic:            "my-topic",
			SubscriptionName: "my-sub",
		},
		KeyName: "",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}
func OpenTopic ¶
func OpenTopic(client pulsar.Client, opts *TopicOptions) (*pubsub.Topic, error)
OpenTopic returns a *pubsub.Topic backed by a Pulsar producer. The topic to publish to is set through opts.ProducerOptions (the Topic field).
Example ¶
package main

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
	"github.com/sraphs/gdk/pubsub/pulsarpubsub"
)

func main() {
	// PRAGMA: This example is used on github.com/sraphs/gdk; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On github.com/sraphs/gdk, hide lines until the next blank line.
	ctx := context.Background()

	localPulsarURL := "pulsar://localhost:6650"
	config := pulsarpubsub.MinimalConfig(localPulsarURL)
	client, err := pulsar.NewClient(config)
	if err != nil {
		log.Fatal(err)
	}

	// Construct a *pubsub.Topic.
	topic, err := pulsarpubsub.OpenTopic(client, &pulsarpubsub.TopicOptions{
		ProducerOptions: pulsar.ProducerOptions{
			Topic: "my-topic",
		},
		KeyName: "",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}
Types ¶
type SubscriptionOptions ¶
type SubscriptionOptions struct {
	pulsar.ConsumerOptions
	KeyName string
}
SubscriptionOptions sets options for constructing a *pubsub.Subscription backed by Pulsar.
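Because the options struct embeds the Pulsar consumer options, Go promotes the embedded fields, so they can be read either directly or through the embedded struct. The sketch below shows this with a local stand-in for pulsar.ConsumerOptions that carries only two of its fields, so that it runs without the Pulsar client; the stand-in type and the newOptions helper are assumptions made for this illustration.

```go
package main

import "fmt"

// ConsumerOptions is a local stand-in for pulsar.ConsumerOptions.
// It keeps only two fields so the sketch compiles without the Pulsar client.
type ConsumerOptions struct {
	Topic            string
	SubscriptionName string
}

// SubscriptionOptions mirrors the shape documented above, using the stand-in type.
type SubscriptionOptions struct {
	ConsumerOptions
	KeyName string
}

// newOptions is a hypothetical helper that fills in the embedded options.
func newOptions(topic, subscription string) SubscriptionOptions {
	return SubscriptionOptions{
		ConsumerOptions: ConsumerOptions{
			Topic:            topic,
			SubscriptionName: subscription,
		},
	}
}

func main() {
	opts := newOptions("my-topic", "my-sub")

	// Promoted fields can be read directly on the outer struct...
	fmt.Println(opts.Topic, opts.SubscriptionName)
	// ...or through the embedded struct; both expressions name the same field.
	fmt.Println(opts.ConsumerOptions.Topic)
}
```

In a composite literal the embedded struct still has to be named explicitly, which is why the examples on this page write ConsumerOptions: pulsar.ConsumerOptions{...}.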
type TopicOptions ¶
type TopicOptions struct {
	pulsar.ProducerOptions
	KeyName string
}
TopicOptions sets options for constructing a *pubsub.Topic backed by Pulsar.
type URLOpener ¶
type URLOpener struct {
	// Client to use for communication with the server.
	Client pulsar.Client
	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}
URLOpener opens Pulsar URLs like "pulsar://my-topic".
The URL host+path is used as the topic name when opening a topic, and as the subscription name when opening a subscription.
No query parameters are supported.
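The naming rule for a topic URL can be sketched with the standard library alone: join the host and path into one name and reject any query string. This follows the description above literally and is not the package's own parsing code; nameFromURL and its error text are hypothetical.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
)

// nameFromURL returns the host+path of rawURL as a single name.
// Hypothetical helper for illustration only; it is not part of pulsarpubsub.
func nameFromURL(rawURL string) (string, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", err
	}
	// Treat any query string as an error, per "no query parameters are supported".
	if u.RawQuery != "" {
		return "", fmt.Errorf("open %q: query parameters are not supported", rawURL)
	}
	// path.Join skips empty elements and collapses duplicate slashes.
	return path.Join(u.Host, u.Path), nil
}

func main() {
	urls := []string{
		"pulsar://my-topic",
		"pulsar://tenant/ns/my-topic",
		"pulsar://my-topic?x=1",
	}
	for _, raw := range urls {
		name, err := nameFromURL(raw)
		fmt.Printf("%s -> %q, err=%v\n", raw, name, err)
	}
}
```

Rejecting unknown parameters up front means a misspelled option fails loudly instead of being silently ignored.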
func (*URLOpener) OpenSubscriptionURL ¶
func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)
OpenSubscriptionURL opens a pubsub.Subscription based on u.