stomp

package module
v3.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 24, 2024 License: Apache-2.0 Imports: 12 Imported by: 78

README

stomp

Go language implementation of a STOMP client library.

Build Status Go Reference

Features:

  • Supports STOMP Specifications Versions 1.0, 1.1, 1.2 (https://stomp.github.io/)
  • Protocol negotiation to select the latest mutually supported protocol
  • Heart beating for testing the underlying network connection
  • Tested against RabbitMQ v3.0.1

Usage Instructions

go get github.com/go-stomp/stomp/v3

For API documentation, see https://pkg.go.dev/github.com/go-stomp/stomp/v3

Breaking changes between this previous version and the current version are documented in breaking_changes.md.

License

Copyright 2012 - Present The go-stomp authors

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Documentation

Overview

Package stomp provides operations that allow communication with a message broker that supports the STOMP protocol. STOMP is the Streaming Text-Oriented Messaging Protocol. See http://stomp.github.com/ for more details.

This package provides support for all STOMP protocol features in the STOMP protocol specifications, versions 1.0, 1.1 and 1.2. These features including protocol negotiation, heart-beating, value encoding, and graceful shutdown.

Connecting to a STOMP server is achieved using the stomp.Dial function, or the stomp.Connect function. See the examples section for a summary of how to use these functions. Both functions return a stomp.Conn object for subsequent interaction with the STOMP server.

Once a connection (stomp.Conn) is created, it can be used to send messages to the STOMP server, or create subscriptions for receiving messages from the STOMP server. Transactions can be created to send multiple messages and/ or acknowledge multiple received messages from the server in one, atomic transaction. The examples section has examples of using subscriptions and transactions.

The client program can instruct the stomp.Conn to gracefully disconnect from the STOMP server using the Disconnect method. This will perform a graceful shutdown sequence as specified in the STOMP specification.

Source code and other details for the project are available at GitHub:

https://github.com/go-stomp/stomp

Index

Examples

Constants

View Source
const DefaultDisconnectReceiptTimeout = 30 * time.Second

Default receipt timeout in Conn.Disconnect function

View Source
const DefaultHeartBeatError = 5 * time.Second

Default time span to add to read/write heart-beat timeouts to avoid premature disconnections due to network latency.

View Source
const DefaultMsgSendTimeout = 10 * time.Second

Default send timeout in Conn.Send function

View Source
const DefaultRcvReceiptTimeout = 30 * time.Second

Default receipt timeout in Conn.Send function

View Source
const DefaultUnsubscribeReceiptTimeout = 30 * time.Second

Default receipt timeout in Subscription.Unsubscribe function

View Source
const ReplyToHeader = "reply-to"

Reply-To header used for temporary queues/RPC with rabbit.

Variables

View Source
var (
	ErrInvalidCommand            = newErrorMessage("invalid command")
	ErrInvalidFrameFormat        = newErrorMessage("invalid frame format")
	ErrUnsupportedVersion        = newErrorMessage("unsupported version")
	ErrCompletedTransaction      = newErrorMessage("transaction is completed")
	ErrNackNotSupported          = newErrorMessage("NACK not supported in STOMP 1.0")
	ErrNotReceivedMessage        = newErrorMessage("cannot ack/nack a message, not from server")
	ErrCannotNackAutoSub         = newErrorMessage("cannot send NACK for a subscription with ack:auto")
	ErrCompletedSubscription     = newErrorMessage("subscription is unsubscribed")
	ErrClosedUnexpectedly        = newErrorMessage("connection closed unexpectedly")
	ErrAlreadyClosed             = newErrorMessage("connection already closed")
	ErrMsgSendTimeout            = newErrorMessage("msg send timeout")
	ErrMsgReceiptTimeout         = newErrorMessage("msg receipt timeout")
	ErrDisconnectReceiptTimeout  = newErrorMessage("disconnect receipt timeout")
	ErrUnsubscribeReceiptTimeout = newErrorMessage("unsubscribe receipt timeout")
	ErrNilOption                 = newErrorMessage("nil option")
)

Error values

View Source
var ConnOpt struct {
	// Login is a connect option that allows the calling program to
	// specify the "login" and "passcode" values to send to the STOMP
	// server.
	Login func(login, passcode string) func(*Conn) error

	// Host is a connect option that allows the calling program to
	// specify the value of the "host" header.
	Host func(host string) func(*Conn) error

	// UseStomp is a connect option that specifies that the client
	// should use the "STOMP" command instead of the "CONNECT" command.
	// Note that using "STOMP" is only valid for STOMP version 1.1 and later.
	UseStomp func(*Conn) error

	// AcceptVersoin is a connect option that allows the client to
	// specify one or more versions of the STOMP protocol that the
	// client program is prepared to accept. If this option is not
	// specified, the client program will accept any of STOMP versions
	// 1.0, 1.1 or 1.2.
	AcceptVersion func(versions ...Version) func(*Conn) error

	// HeartBeat is a connect option that allows the client to specify
	// the send and receive timeouts for the STOMP heartbeat negotiation mechanism.
	// The sendTimeout parameter specifies the maximum amount of time
	// between the client sending heartbeat notifications from the server.
	// The recvTimeout paramter specifies the minimum amount of time between
	// the client expecting to receive heartbeat notifications from the server.
	// If not specified, this option defaults to one minute for both send and receive
	// timeouts.
	HeartBeat func(sendTimeout, recvTimeout time.Duration) func(*Conn) error

	// HeartBeatError is a connect option that will normally only be specified during
	// testing. It specifies a short time duration that is larger than the amount of time
	// that will take for a STOMP frame to be transmitted from one station to the other.
	// When not specified, this value defaults to 5 seconds. This value is set to a much
	// shorter time duration during unit testing.
	HeartBeatError func(errorTimeout time.Duration) func(*Conn) error

	// MsgSendTimeout is a connect option that allows the client to specify
	// the timeout for the Conn.Send function.
	// The msgSendTimeout parameter specifies maximum blocking time for calling
	// the Conn.Send function.
	// If not specified, this option defaults to 10 seconds.
	// Less than or equal to zero means infinite
	MsgSendTimeout func(msgSendTimeout time.Duration) func(*Conn) error

	// RcvReceiptTimeout is a connect option that allows the client to specify
	// how long to wait for a receipt in the Conn.Send function. This helps
	// avoid deadlocks. If this is not specified, the default is 30 seconds.
	RcvReceiptTimeout func(rcvReceiptTimeout time.Duration) func(*Conn) error

	// DisconnectReceiptTimeout is a connect option that allows the client to specify
	// how long to wait for a receipt in the Conn.Disconnect function. This helps
	// avoid deadlocks. If this is not specified, the default is 30 seconds.
	DisconnectReceiptTimeout func(disconnectReceiptTimeout time.Duration) func(*Conn) error

	// UnsubscribeReceiptTimeout is a connect option that allows the client to specify
	// how long to wait for a receipt in the Conn.Unsubscribe function. This helps
	// avoid deadlocks. If this is not specified, the default is 30 seconds.
	UnsubscribeReceiptTimeout func(unsubscribeReceiptTimeout time.Duration) func(*Conn) error

	// HeartBeatGracePeriodMultiplier is used to calculate the effective read heart-beat timeout
	// the broker will enforce for each client’s connection. The multiplier is applied to
	// the read-timeout interval the client specifies in its CONNECT frame
	HeartBeatGracePeriodMultiplier func(multiplier float64) func(*Conn) error

	// Header is a connect option that allows the client to specify a custom
	// header entry in the STOMP frame. This connect option can be specified
	// multiple times for multiple custom headers.
	Header func(key, value string) func(*Conn) error

	// ReadChannelCapacity is the number of messages that can be on the read channel at the
	// same time. A high number may affect memory usage while a too low number may lock the
	// system up. Default is set to 20.
	ReadChannelCapacity func(capacity int) func(*Conn) error

	// WriteChannelCapacity is the number of messages that can be on the write channel at the
	// same time. A high number may affect memory usage while a too low number may lock the
	// system up. Default is set to 20.
	WriteChannelCapacity func(capacity int) func(*Conn) error

	// ReadBufferSize specifies number of bytes that can be used to read the message
	// A high number may affect memory usage while a too low number may lock the
	// system up. Default is set to 4096.
	ReadBufferSize func(size int) func(*Conn) error

	// WriteBufferSize specifies number of bytes that can be used to write the message
	// A high number may affect memory usage while a too low number may lock the
	// system up. Default is set to 4096.
	WriteBufferSize func(size int) func(*Conn) error

	// ResponseHeaders lets you provide a callback function to get the headers from the CONNECT response
	ResponseHeaders func(func(*frame.Header)) func(*Conn) error

	// Logger lets you provide a callback function that sets the logger used by a connection
	Logger func(logger Logger) func(*Conn) error

	// Enable statistical gathering for reading/writing messages from the server
	WithStats func() func(*Conn) error
}

Options for connecting to the STOMP server. Used with the stomp.Dial and stomp.Connect functions, both of which have examples.

View Source
var SendOpt struct {
	// Receipt specifies that the client should request acknowledgement
	// from the server before the send operation successfully completes.
	Receipt func(*frame.Frame) error

	// NoContentLength specifies that the SEND frame should not include
	// a content-length header entry. By default the content-length header
	// entry is always included, but some message brokers assign special
	// meaning to STOMP frames that do not contain a content-length
	// header entry. (In particular ActiveMQ interprets STOMP frames
	// with no content-length as being a text message)
	NoContentLength func(*frame.Frame) error

	// Header provides the opportunity to include custom header entries
	// in the SEND frame that the client sends to the server. This option
	// can be specified multiple times if multiple custom header entries
	// are required.
	Header func(key, value string) func(*frame.Frame) error
}

SendOpt contains options for for the Conn.Send and Transaction.Send functions.

View Source
var SubscribeOpt struct {
	// Id provides the opportunity to specify the value of the "id" header
	// entry in the STOMP SUBSCRIBE frame.
	//
	// If the client program does specify the value for "id",
	// it is responsible for choosing a unique value.
	Id func(id string) func(*frame.Frame) error

	// Header provides the opportunity to include custom header entries
	// in the SUBSCRIBE frame that the client sends to the server.
	Header func(key, value string) func(*frame.Frame) error
}

SubscribeOpt contains options for for the Conn.Subscribe function.

Functions

This section is empty.

Types

type AckMode

type AckMode int

The AckMode type is an enumeration of the acknowledgement modes for a STOMP subscription.

const (
	// No acknowledgement is required, the server assumes that the client
	// received the message.
	AckAuto AckMode = iota

	// Client acknowledges messages. When a client acknowledges a message,
	// any previously received messages are also acknowledged.
	AckClient

	// Client acknowledges message. Each message is acknowledged individually.
	AckClientIndividual
)

func (AckMode) ShouldAck

func (a AckMode) ShouldAck() bool

ShouldAck returns true if this AckMode is an acknowledgement mode which requires acknowledgement. Returns true for all values except AckAuto, which returns false.

func (AckMode) String

func (a AckMode) String() string

String returns the string representation of the AckMode value.

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

A Conn is a connection to a STOMP server. Create a Conn using either the Dial or Connect function.

func Connect

func Connect(conn io.ReadWriteCloser, opts ...func(*Conn) error) (*Conn, error)

Connect creates a STOMP connection and performs the STOMP connect protocol sequence. The connection to the STOMP server has already been created by the program. The opts parameter provides the opportunity to specify STOMP protocol options.

Example

Example of connecting to a STOMP server using an existing network connection.

netConn, err := net.DialTimeout("tcp", "stomp.server.com:61613", 10*time.Second)
if err != nil {
	return err
}

stompConn, err := stomp.Connect(netConn)
if err != nil {
	return err
}

defer stompConn.Disconnect()

doSomethingWith(stompConn)
return nil
Output:

func ConnectWithContext added in v3.1.3

func ConnectWithContext(ctx context.Context, conn io.ReadWriteCloser, opts ...func(*Conn) error) (*Conn, error)

func Dial

func Dial(network, addr string, opts ...func(*Conn) error) (*Conn, error)

Dial creates a network connection to a STOMP server and performs the STOMP connect protocol sequence. The network endpoint of the STOMP server is specified by network and addr. STOMP protocol options can be specified in opts.

func DialWithContext added in v3.1.3

func DialWithContext(ctx context.Context, network, addr string, opts ...func(*Conn) error) (*Conn, error)

func (*Conn) Ack

func (c *Conn) Ack(m *Message) error

Ack acknowledges a message received from the STOMP server. If the message was received on a subscription with AckMode == AckAuto, then no operation is performed.

func (*Conn) Begin

func (c *Conn) Begin() *Transaction

Begin is used to start a transaction. Transactions apply to sending and acknowledging. Any messages sent or acknowledged during a transaction will be processed atomically by the STOMP server based on the transaction.

func (*Conn) BeginWithError

func (c *Conn) BeginWithError() (*Transaction, error)

BeginWithError is used to start a transaction, but also returns the error (if any) from sending the frame to start the transaction.

func (*Conn) Disconnect

func (c *Conn) Disconnect() error

Disconnect will disconnect from the STOMP server. This function follows the STOMP standard's recommended protocol for graceful disconnection: it sends a DISCONNECT frame with a receipt header element. Once the RECEIPT frame has been received, the connection with the STOMP server is closed and any further attempt to write to the server will fail.

func (*Conn) MustDisconnect

func (c *Conn) MustDisconnect() error

MustDisconnect will disconnect 'ungracefully' from the STOMP server. This method should be used only as last resort when there are fatal network errors that prevent to do a proper disconnect from the server.

func (*Conn) Nack

func (c *Conn) Nack(m *Message) error

Nack indicates to the server that a message was not received by the client. Returns an error if the STOMP version does not support the NACK message.

func (*Conn) Send

func (c *Conn) Send(destination, contentType string, body []byte, opts ...func(*frame.Frame) error) error

Send sends a message to the STOMP server, which in turn sends the message to the specified destination. If the STOMP server fails to receive the message for any reason, the connection will close.

The content type should be specified, according to the STOMP specification, but if contentType is an empty string, the message will be delivered without a content-type header entry. The body array contains the message body, and its content should be consistent with the specified content type.

Any number of options can be specified in opts. See the examples for usage. Options include whether to receive a RECEIPT, should the content-length be suppressed, and sending custom header entries.

func (*Conn) Server

func (c *Conn) Server() string

Server returns the STOMP server identification, which can be returned by the STOMP server during the connect sequence. If the STOMP server does not return a server header entry, this value will be a blank string.

func (*Conn) Session

func (c *Conn) Session() string

Session returns the session identifier, which can be returned by the STOMP server during the connect sequence. If the STOMP server does not return a session header entry, this value will be a blank string.

func (*Conn) Stats added in v3.1.2

func (c *Conn) Stats() ConnectionStats

func (*Conn) Subscribe

func (c *Conn) Subscribe(destination string, ack AckMode, opts ...func(*frame.Frame) error) (*Subscription, error)

Subscribe creates a subscription on the STOMP server. The subscription has a destination, and messages sent to that destination will be received by this subscription. A subscription has a channel on which the calling program can receive messages.

func (*Conn) Version

func (c *Conn) Version() Version

Version returns the version of the STOMP protocol that is being used to communicate with the STOMP server. This version is negotiated with the server during the connect sequence.

type ConnectionStats added in v3.1.2

type ConnectionStats struct {
	CurrentWriteChanSize int
	CurrentReadChanSize  int
	WritesSent           int64
	ReadsReceived        int64
}

ConnectionStats contains the statistics of a connection. Retrieve via Connection.Stats()

type Error

type Error struct {
	Message string
	Frame   *frame.Frame
}

StompError implements the Error interface, and provides additional information about a STOMP error.

func (Error) Error

func (e Error) Error() string

type Logger added in v3.0.2

type Logger interface {
	Debugf(format string, value ...interface{})
	Infof(format string, value ...interface{})
	Warningf(format string, value ...interface{})
	Errorf(format string, value ...interface{})

	Debug(message string)
	Info(message string)
	Warning(message string)
	Error(message string)
}

type Message

type Message struct {
	// Indicates whether an error was received on the subscription.
	// The error will contain details of the error. If the server
	// sent an ERROR frame, then the Body, ContentType and Header fields
	// will be populated according to the contents of the ERROR frame.
	Err error

	// Destination the message has been sent to.
	Destination string

	// MIME content type.
	ContentType string // MIME content

	// Connection that the message was received on.
	Conn *Conn

	// Subscription associated with the message.
	Subscription *Subscription

	// Optional header entries. When received from the server,
	// these are the header entries received with the message.
	Header *frame.Header

	// The message body, which is an arbitrary sequence of bytes.
	// The ContentType indicates the format of this body.
	Body []byte // Content of message
}

A Message represents a message received from the STOMP server. In most cases a message corresponds to a single STOMP MESSAGE frame received from the STOMP server. If, however, the Err field is non-nil, then the message corresponds to a STOMP ERROR frame, or a connection error between the client and the server.

func (*Message) Read

func (msg *Message) Read(p []byte) (int, error)

func (*Message) ReadByte

func (msg *Message) ReadByte() (byte, error)

func (*Message) ShouldAck

func (msg *Message) ShouldAck() bool

ShouldAck returns true if this message should be acknowledged to the STOMP server that sent it.

type Subscription

type Subscription struct {
	C chan *Message
	// contains filtered or unexported fields
}

The Subscription type represents a client subscription to a destination. The subscription is created by calling Conn.Subscribe.

Once a client has subscribed, it can receive messages from the C channel.

func (*Subscription) AckMode

func (s *Subscription) AckMode() AckMode

AckMode returns the Acknowledgement mode specified when the subscription was created.

func (*Subscription) Active

func (s *Subscription) Active() bool

Active returns whether the subscription is still active. Returns false if the subscription has been unsubscribed.

func (*Subscription) Destination

func (s *Subscription) Destination() string

Destination for which the subscription applies.

func (*Subscription) Id

func (s *Subscription) Id() string

Identification for this subscription. Unique among all subscriptions for the same Client.

func (*Subscription) Read

func (s *Subscription) Read() (*Message, error)

Read a message from the subscription. This is a convenience method: many callers will prefer to read from the channel C directly.

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe(opts ...func(*frame.Frame) error) error

Unsubscribes and closes the channel C.

type Transaction

type Transaction struct {
	// contains filtered or unexported fields
}

A Transaction applies to the sending of messages to the STOMP server, and the acknowledgement of messages received from the STOMP server. All messages sent and and acknowledged in the context of a transaction are processed atomically by the STOMP server.

Transactions are committed with the Commit method. When a transaction is committed, all sent messages, acknowledgements and negative acknowledgements, are processed by the STOMP server. Alternatively transactions can be aborted, in which case all sent messages, acknowledgements and negative acknowledgements are discarded by the STOMP server.

Example
conn, err := stomp.Dial("tcp", "localhost:61613")
if err != nil {
	return err
}
defer conn.Disconnect()

sub, err := conn.Subscribe("/queue/test-2", stomp.AckClient)
if err != nil {
	return err
}

// receive 5 messages and then quit
for i := 0; i < 5; i++ {
	msg := <-sub.C
	if msg.Err != nil {
		return msg.Err
	}

	tx := conn.Begin()

	doAnotherThingWith(msg, tx)

	tx.Send("/queue/another-one", "text/plain",
		[]byte(fmt.Sprintf("Message #%d", i)), nil)

	// acknowledge the message
	err = tx.Ack(msg)
	if err != nil {
		return err
	}

	err = tx.Commit()
	if err != nil {
		return err
	}
}

err = sub.Unsubscribe()
if err != nil {
	return err
}

return nil
Output:

func (*Transaction) Abort

func (tx *Transaction) Abort() error

Abort will abort the transaction. Any calls to Send, SendWithReceipt, Ack and Nack on this transaction will be discarded. This function does not wait for the server to process the ABORT frame. See AbortWithReceipt if you want to ensure the ABORT is processed.

func (*Transaction) AbortWithReceipt

func (tx *Transaction) AbortWithReceipt() error

Abort will abort the transaction. Any calls to Send, SendWithReceipt, Ack and Nack on this transaction will be discarded.

func (*Transaction) Ack

func (tx *Transaction) Ack(msg *Message) error

Ack sends an acknowledgement for the message to the server. The STOMP server will not process the acknowledgement until the transaction has been committed. If the subscription has an AckMode of AckAuto, calling this function has no effect.

func (*Transaction) Commit

func (tx *Transaction) Commit() error

Commit will commit the transaction. All messages and acknowledgements sent to the STOMP server on this transaction will be processed atomically. This function does not wait for the server to process the COMMIT frame. See CommitWithReceipt if you want to ensure the COMMIT is processed.

func (*Transaction) CommitWithReceipt

func (tx *Transaction) CommitWithReceipt() error

Commit will commit the transaction. All messages and acknowledgements sent to the STOMP server on this transaction will be processed atomically.

func (*Transaction) Conn

func (tx *Transaction) Conn() *Conn

Conn returns the connection associated with this transaction.

func (*Transaction) Id

func (tx *Transaction) Id() string

Id returns the unique identifier for the transaction.

func (*Transaction) Nack

func (tx *Transaction) Nack(msg *Message) error

Nack sends a negative acknowledgement for the message to the server, indicating that this client cannot or will not process the message and that it should be processed elsewhere. The STOMP server will not process the negative acknowledgement until the transaction has been committed. It is an error to call this method if the subscription has an AckMode of AckAuto, because the STOMP server will not be expecting any kind of acknowledgement (positive or negative) for this message.

func (*Transaction) Send

func (tx *Transaction) Send(destination, contentType string, body []byte, opts ...func(*frame.Frame) error) error

Send sends a message to the STOMP server as part of a transaction. The server will not process the message until the transaction is committed. This method returns without confirming that the STOMP server has received the message. If the STOMP server does fail to receive the message for any reason, the connection will close.

The content type should be specified, according to the STOMP specification, but if contentType is an empty string, the message will be delivered without a content type header entry. The body array contains the message body, and its content should be consistent with the specified content type.

TODO: document opts

type Validator

type Validator interface {
	// Validate returns nil if the frame is valid, or an error if not valid.
	Validate(f *frame.Frame) error
}

Validator is an interface for validating STOMP frames.

func NewValidator

func NewValidator(version Version) Validator

type Version

type Version string

Version is the STOMP protocol version.

const (
	V10 Version = "1.0"
	V11 Version = "1.1"
	V12 Version = "1.2"
)

func (Version) CheckSupported

func (v Version) CheckSupported() error

CheckSupported is used to determine whether a particular STOMP version is supported by this library. Returns nil if the version is supported, or ErrUnsupportedVersion if not supported.

func (Version) String

func (v Version) String() string

String returns a string representation of the STOMP version.

func (Version) SupportsNack

func (v Version) SupportsNack() bool

SupportsNack indicates whether this version of the STOMP protocol supports use of the NACK command.

Notes

Bugs

  • If the client does not read messages from the Subscription.C channel quickly enough, the client will stop reading messages from the server.

Directories

Path Synopsis
examples
Package frame provides functionality for manipulating STOMP frames.
Package frame provides functionality for manipulating STOMP frames.
internal
log
Package server contains a simple STOMP server implementation.
Package server contains a simple STOMP server implementation.
client
Package client implements client connectivity in the STOMP server.
Package client implements client connectivity in the STOMP server.
queue
Package queue provides implementations of server-side queues.
Package queue provides implementations of server-side queues.
topic
Package topic provides implementations of server-side topics.
Package topic provides implementations of server-side topics.
A simple, stand-alone STOMP server.
A simple, stand-alone STOMP server.
Package testutil is a generated GoMock package.
Package testutil is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL