rabbitpubsub

package module
v0.40.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 10, 2024 License: Apache-2.0 Imports: 15 Imported by: 64

Documentation

Overview

Package rabbitpubsub provides an pubsub implementation for RabbitMQ. Use OpenTopic to construct a *pubsub.Topic, and/or OpenSubscription to construct a *pubsub.Subscription.

RabbitMQ follows the AMQP specification, which uses different terminology than the Go CDK Pub/Sub.

A Pub/Sub topic is an AMQP exchange. The exchange kind should be "fanout" to match the Pub/Sub model, although publishing will work with any kind of exchange.

A Pub/Sub subscription is an AMQP queue. The queue should be bound to the exchange that is the topic of the subscription. See the package example for details.

URLs

For pubsub.OpenTopic and pubsub.OpenSubscription, rabbitpubsub registers for the scheme "rabbit". The default URL opener will connect to a default server based on the environment variable "RABBIT_SERVER_URL". To customize the URL opener, or for more details on the URL format, see URLOpener. See https://gocloud.dev/concepts/urls/ for background information.

Message Delivery Semantics

RabbitMQ supports at-least-once semantics; applications must call Message.Ack after processing a message, or it will be redelivered. See https://godoc.org/gocloud.dev/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more background.

As

rabbitpubsub exposes the following types for As:

  • Topic: *amqp.Connection
  • Subscription: *amqp.Connection
  • Message.BeforeSend: *amqp.Publishing
  • Message.AfterSend: None
  • Message: amqp.Delivery
  • Error: *amqp.Error and MultiError
Example (OpenSubscriptionFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/rabbitpubsub"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
	// This URL will Dial the RabbitMQ server at the URL in the environment
	// variable RABBIT_SERVER_URL and open the queue "myqueue".
	subscription, err := pubsub.OpenSubscription(ctx, "rabbit://myqueue")
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}
Output:

Example (OpenTopicFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/rabbitpubsub"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
	// This URL will Dial the RabbitMQ server at the URL in the environment
	// variable RABBIT_SERVER_URL and open the exchange "myexchange".
	topic, err := pubsub.OpenTopic(ctx, "rabbit://myexchange")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}
Output:

Index

Examples

Constants

View Source
const Scheme = "rabbit"

Scheme is the URL scheme rabbitpubsub registers its URLOpeners under on pubsub.DefaultMux.

Variables

This section is empty.

Functions

func OpenSubscription

func OpenSubscription(conn *amqp.Connection, name string, opts *SubscriptionOptions) *pubsub.Subscription

OpenSubscription returns a *pubsub.Subscription corresponding to the named queue. See the package documentation for an example.

The queue must have been previously created (for instance, by using amqp.Channel.QueueDeclare) and bound to an exchange.

OpenSubscription uses the supplied amqp.Connection for all communication. It is the caller's responsibility to establish this connection before calling OpenSubscription and to close it when Close has been called on all Subscriptions opened with it.

The documentation of the amqp package recommends using separate connections for publishing and subscribing.

Example
package main

import (
	"context"
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
	"gocloud.dev/pubsub/rabbitpubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	rabbitConn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatal(err)
	}
	defer rabbitConn.Close()
	subscription := rabbitpubsub.OpenSubscription(rabbitConn, "myqueue", nil)
	defer subscription.Shutdown(ctx)
}
Output:

func OpenTopic

func OpenTopic(conn *amqp.Connection, name string, opts *TopicOptions) *pubsub.Topic

OpenTopic returns a *pubsub.Topic corresponding to the named exchange. See the package documentation for an example.

The exchange should already exist (for instance, by using amqp.Channel.ExchangeDeclare), although this won't be checked until the first call to SendBatch. For the Go CDK Pub/Sub model to make sense, the exchange should be a fanout exchange, although nothing in this package enforces that.

OpenTopic uses the supplied amqp.Connection for all communication. It is the caller's responsibility to establish this connection before calling OpenTopic, and to close it when Close has been called on all Topics opened with it.

The documentation of the amqp package recommends using separate connections for publishing and subscribing.

Example
package main

import (
	"context"
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
	"gocloud.dev/pubsub/rabbitpubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	rabbitConn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatal(err)
	}
	defer rabbitConn.Close()
	topic := rabbitpubsub.OpenTopic(rabbitConn, "myexchange", nil)
	defer topic.Shutdown(ctx)
}
Output:

Types

type MultiError

type MultiError []error

A MultiError is an error that contains multiple errors.

func (MultiError) Error

func (m MultiError) Error() string

type SubscriptionOptions

type SubscriptionOptions struct {
	// KeyName optionally sets the Message.Metadata key in which to store the
	// RabbitMQ message key. If set, and if the RabbitMQ message key is non-empty,
	// the key value will be stored in Message.Metadata under KeyName.
	KeyName string

	// Qos property prefetch count. Optional.
	PrefetchCount *int
}

SubscriptionOptions sets options for constructing a *pubsub.Subscription backed by RabbitMQ.

type TopicOptions

type TopicOptions struct {
	// KeyName optionally sets the Message.Metadata key to use as the optional
	// RabbitMQ message key. If set, and if a matching Message.Metadata key is found,
	// the value for that key will be used as the routing key when sending to
	// RabbitMQ, instead of being added to the message headers.
	KeyName string
}

TopicOptions sets options for constructing a *pubsub.Topic backed by RabbitMQ.

type URLOpener

type URLOpener struct {
	// Connection to use for communication with the server.
	Connection *amqp.Connection

	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}

URLOpener opens RabbitMQ URLs like "rabbit://myexchange" for topics or "rabbit://myqueue" for subscriptions.

For topics, the URL's host+path is used as the exchange name.

For subscriptions, the URL's host+path is used as the queue name.

An optional query string can be used to set the Qos consumer prefetch on subscriptions like "rabbit://myqueue?prefetch_count=1000" to set the consumer prefetch count to 1000 see also https://www.rabbitmq.com/docs/consumer-prefetch

func (*URLOpener) OpenSubscriptionURL

func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)

OpenSubscriptionURL opens a pubsub.Subscription based on u.

func (*URLOpener) OpenTopicURL

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on u.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL