Documentation
¶
Overview ¶
Package amqpextra provides Dialer for dialing in case the connection lost.
Index ¶
- func Dial(opts ...Option) (*amqp.Connection, error)
- func NewConsumer(connCh <-chan *Connection, opts ...consumer.Option) (*consumer.Consumer, error)
- func NewPublisher(connCh <-chan *Connection, opts ...publisher.Option) (*publisher.Publisher, error)
- type AMQPConnection
- type Connection
- type Dialer
- func (c *Dialer) Close()
- func (c *Dialer) Connection(ctx context.Context) (*amqp.Connection, error)
- func (c *Dialer) ConnectionCh() <-chan *Connection
- func (c *Dialer) Consumer(opts ...consumer.Option) (*consumer.Consumer, error)
- func (c *Dialer) Notify(stateCh chan State) <-chan State
- func (c *Dialer) NotifyClosed() <-chan struct{}
- func (c *Dialer) Publisher(opts ...publisher.Option) (*publisher.Publisher, error)
- type Option
- func WithAMQPDial(dial func(url string, c amqp.Config) (AMQPConnection, error)) Option
- func WithConnectionProperties(props amqp.Table) Option
- func WithContext(ctx context.Context) Option
- func WithLogger(l logger.Logger) Option
- func WithNotify(stateCh chan State) Option
- func WithRetryPeriod(dur time.Duration) Option
- func WithTLS(tlsConfig *tls.Config) Option
- func WithURL(urls ...string) Option
- type Ready
- type State
- type Unready
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Dial ¶
func Dial(opts ...Option) (*amqp.Connection, error)
Dial returns established connection or an error. It keeps retrying until timeout 30sec is reached.
func NewConsumer ¶
Example ¶
package main import ( "context" "log" "github.com/makasim/amqpextra" "github.com/makasim/amqpextra/consumer" amqp "github.com/rabbitmq/amqp091-go" ) func main() { // you can get connCh from dialer.ConnectionCh() method var connCh chan *amqpextra.Connection h := consumer.HandlerFunc( func(ctx context.Context, msg amqp.Delivery) interface{} { // process message msg.Ack(false) return nil }) // create consumer c, err := amqpextra.NewConsumer( connCh, consumer.WithHandler(h), consumer.WithQueue("a_queue"), ) if err != nil { log.Fatal(err) } // close consumer c.Close() <-c.NotifyClosed() }
Output:
func NewPublisher ¶
func NewPublisher( connCh <-chan *Connection, opts ...publisher.Option, ) (*publisher.Publisher, error)
Example ¶
package main import ( "log" "github.com/makasim/amqpextra" "github.com/makasim/amqpextra/publisher" amqp "github.com/rabbitmq/amqp091-go" ) func main() { // you can get readyCh from dialer.ConnectionCh() method var connCh chan *amqpextra.Connection // create publisher p, err := amqpextra.NewPublisher(connCh) if err != nil { log.Fatal(err) } // publish a message go p.Publish(publisher.Message{ Key: "test_queue", Publishing: amqp.Publishing{ Body: []byte(`{"foo": "fooVal"}`), }, }) // close publisher p.Close() <-p.NotifyClosed() }
Output:
Types ¶
type AMQPConnection ¶ added in v0.15.0
AMQPConnection is an interface for streadway's *amqp.Connection
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection provides access to streadway's *amqp.Connection as well as notification channels A notification indicates that something wrong has happened to the connection. The client should get a fresh connection from Dialer.
func (*Connection) AMQPConnection ¶ added in v0.15.0
func (c *Connection) AMQPConnection() *amqp.Connection
AMQPConnection returns streadway's *amqp.Connection
func (*Connection) NotifyLost ¶ added in v0.15.0
func (c *Connection) NotifyLost() chan struct{}
NotifyLost notifies when current connection is lost and new once should be requested
type Dialer ¶
type Dialer struct {
// contains filtered or unexported fields
}
Dialer is responsible for keeping the connection up. If connection is lost or closed. It tries dial a server again and again with some wait periods. Dialer keep connection up until it Dialer.Close() method called or the context is canceled.
func (*Dialer) Close ¶ added in v0.15.0
func (c *Dialer) Close()
Close initiate Dialer close. Subscribe Dialer.NotifyClosed() to know when it was finally closed.
func (*Dialer) Connection ¶ added in v0.15.0
Connection returns streadway's *amqp.Connection. The client should subscribe on Dialer.NotifyReady(), Dialer.NotifyUnready() events in order to know when the connection is lost.
func (*Dialer) ConnectionCh ¶ added in v0.15.0
func (c *Dialer) ConnectionCh() <-chan *Connection
ConnectionCh returns Connection channel. The channel should be used to get established connections. The client must subscribe on Connection.NotifyLost(). Once lost, client must stop using current connection and get new one from Connection channel. Connection channel is closed when Dialer is closed. Don't forget to check for closed connection.
Example ¶
nolint:gosimple // the purpose of select case is to stress the connCh close case.
package main import ( "log" "time" "github.com/makasim/amqpextra" ) func main() { dialer, err := amqpextra.NewDialer(amqpextra.WithURL("amqp://guest:guest@localhost:5672/%2f")) if err != nil { log.Fatal(err) } connCh := dialer.ConnectionCh() go func() { for { select { case conn, ok := <-connCh: if !ok { // connection is permanently closed return } <-conn.NotifyLost() } } }() time.Sleep(time.Second) dialer.Close() }
Output:
func (*Dialer) Consumer ¶ added in v0.15.0
Consumer returns a consumer that support reconnection feature.
Example ¶
package main import ( "context" "github.com/makasim/amqpextra" "github.com/makasim/amqpextra/consumer" amqp "github.com/rabbitmq/amqp091-go" ) func main() { // open connection d, _ := amqpextra.NewDialer(amqpextra.WithURL("amqp://guest:guest@localhost:5672/%2f")) h := consumer.HandlerFunc(func(ctx context.Context, msg amqp.Delivery) interface{} { // process message msg.Ack(false) return nil }) c, _ := d.Consumer( consumer.WithQueue("a_queue"), consumer.WithHandler(h), ) // close consumer c.Close() // close dialer d.Close() }
Output:
func (*Dialer) Notify ¶ added in v0.16.0
Notify could be used to subscribe on Dialer ready/unready events
func (*Dialer) NotifyClosed ¶ added in v0.15.0
func (c *Dialer) NotifyClosed() <-chan struct{}
NotifyClosed could be used to subscribe on Dialer closed event. Dialer.ConnectionCh() could no longer be used after this point
func (*Dialer) Publisher ¶ added in v0.15.0
Publisher returns a consumer that support reconnection feature.
Example ¶
package main import ( "context" "time" "github.com/makasim/amqpextra" "github.com/makasim/amqpextra/publisher" amqp "github.com/rabbitmq/amqp091-go" ) func main() { // open connection d, _ := amqpextra.NewDialer(amqpextra.WithURL("amqp://guest:guest@localhost:5672/%2f")) // create publisher p, _ := d.Publisher() ctx, cancelFunc := context.WithTimeout(context.Background(), time.Millisecond*100) defer cancelFunc() // publish a message p.Publish(publisher.Message{ Key: "test_queue", Context: ctx, Publishing: amqp.Publishing{ Body: []byte(`{"foo": "fooVal"}`), }, }) // close publisher p.Close() // close connection d.Close() }
Output:
type Option ¶ added in v0.15.0
type Option func(c *Dialer)
Option could be used to configure Dialer
func WithAMQPDial ¶ added in v0.15.0
WithAMQPDial configure dial function. The function takes the url and amqp.Config and returns AMQPConnection.
func WithConnectionProperties ¶ added in v0.15.0
WithConnectionProperties configure connection properties set on dial.
func WithContext ¶ added in v0.15.0
WithLogger configure Dialer context The context could used later to stop Dialer
func WithLogger ¶ added in v0.15.0
WithLogger configure the logger used by Dialer
func WithNotify ¶ added in v0.16.0
WithNotify helps subscribe on Dialer ready/unready events.
func WithRetryPeriod ¶ added in v0.15.0
WithRetryPeriod configure how much time to wait before next dial attempt. Default: 5sec.
Directories
¶
Path | Synopsis |
---|---|
mock_consumer
Package mock_consumer is a generated GoMock package.
|
Package mock_consumer is a generated GoMock package. |
e2e_test
|
|
logrus
module
|
|
Package mock_amqpextra is a generated GoMock package.
|
Package mock_amqpextra is a generated GoMock package. |
mock_publisher
Package mock_publisher is a generated GoMock package.
|
Package mock_publisher is a generated GoMock package. |