rabbithole

package module
v1.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 10, 2019 License: BSD-2-Clause Imports: 11 Imported by: 0

README

Rabbit Hole, a RabbitMQ HTTP API Client for Go

This library is a RabbitMQ HTTP API client for the Go language.

Supported Go Versions

Rabbit Hole supports the 3 most recent Go releases.

Supported RabbitMQ Versions

  • RabbitMQ 3.x

All versions require RabbitMQ Management UI plugin to be installed and enabled.

Project Maturity

Rabbit Hole is a mature library (first released in late 2013) designed after a couple of other RabbitMQ HTTP API clients with stable APIs. Breaking API changes are not out of the question but not without a reasonable version bump.

It is largely (80-90%) feature complete and decently documented.

Installation

go get github.com/michaelklishin/rabbit-hole

Documentation

API Reference

API reference is available on godoc.org.

Continue reading for a list of example snippets.

Overview

To import the package:

import (
       "github.com/michaelklishin/rabbit-hole"
)

All HTTP API operations are accessible via rabbithole.Client, which should be instantiated with rabbithole.NewClient:

// URI, username, password
rmqc, _ := NewClient("http://127.0.0.1:15672", "guest", "guest")

TLS (HTTPS) can be enabled by adding an HTTP transport to the parameters of rabbithole.NewTLSClient:

transport := &http.Transport{TLSClientConfig: tlsConfig}
rmqc, _ := NewTLSClient("https://127.0.0.1:15672", "guest", "guest", transport)

RabbitMQ HTTP API has to be configured to use TLS.

Getting Overview
res, err := rmqc.Overview()
Node and Cluster Status
xs, err := rmqc.ListNodes()
// => []NodeInfo, err

node, err := rmqc.GetNode("rabbit@mercurio")
// => NodeInfo, err
Operations on Connections
xs, err := rmqc.ListConnections()
// => []ConnectionInfo, err

conn, err := rmqc.GetConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
// => ConnectionInfo, err

// Forcefully close connection
_, err := rmqc.CloseConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
// => *http.Response, err
Operations on Channels
xs, err := rmqc.ListChannels()
// => []ChannelInfo, err

ch, err := rmqc.GetChannel("127.0.0.1:50545 -> 127.0.0.1:5672 (1)")
// => ChannelInfo, err
Operations on Vhosts
xs, err := rmqc.ListVhosts()
// => []VhostInfo, err

// information about individual vhost
x, err := rmqc.GetVhost("/")
// => VhostInfo, err

// creates or updates individual vhost
resp, err := rmqc.PutVhost("/", VhostSettings{Tracing: false})
// => *http.Response, err

// deletes individual vhost
resp, err := rmqc.DeleteVhost("/")
// => *http.Response, err
Managing Users
xs, err := rmqc.ListUsers()
// => []UserInfo, err

// information about individual user
x, err := rmqc.GetUser("my.user")
// => UserInfo, err

// creates or updates individual user
resp, err := rmqc.PutUser("my.user", UserSettings{Password: "s3krE7", Tags: "management,policymaker"})
// => *http.Response, err

// creates or updates individual user with no password
resp, err := rmqc.PutUserWithoutPassword("my.user", UserSettings{Tags: "management,policymaker"})
// => *http.Response, err

// deletes individual user
resp, err := rmqc.DeleteUser("my.user")
// => *http.Response, err
// creates or updates individual user with a SHA256 password hash
hash := Base64EncodedSaltedPasswordHashSHA256("password-s3krE7")
resp, err := rmqc.PutUser("my.user", UserSettings{
  PasswordHash: hash,
  HashingAlgorithm: HashingAlgorithmSHA256,
  Tags: "management,policymaker"})
// => *http.Response, err
Managing Permissions
xs, err := rmqc.ListPermissions()
// => []PermissionInfo, err

// permissions of individual user
x, err := rmqc.ListPermissionsOf("my.user")
// => []PermissionInfo, err

// permissions of individual user in vhost
x, err := rmqc.GetPermissionsIn("/", "my.user")
// => PermissionInfo, err

// updates permissions of user in vhost
resp, err := rmqc.UpdatePermissionsIn("/", "my.user", Permissions{Configure: ".*", Write: ".*", Read: ".*"})
// => *http.Response, err

// revokes permissions in vhost
resp, err := rmqc.ClearPermissionsIn("/", "my.user")
// => *http.Response, err
Operations on Exchanges
xs, err := rmqc.ListExchanges()
// => []ExchangeInfo, err

// list exchanges in a vhost
xs, err := rmqc.ListExchangesIn("/")
// => []ExchangeInfo, err

// information about individual exchange
x, err := rmqc.GetExchange("/", "amq.fanout")
// => ExchangeInfo, err

// declares an exchange
resp, err := rmqc.DeclareExchange("/", "an.exchange", ExchangeSettings{Type: "fanout", Durable: false})
// => *http.Response, err

// deletes individual exchange
resp, err := rmqc.DeleteExchange("/", "an.exchange")
// => *http.Response, err
Operations on Queues
qs, err := rmqc.ListQueues()
// => []QueueInfo, err

// list queues in a vhost
qs, err := rmqc.ListQueuesIn("/")
// => []QueueInfo, err

// information about individual queue
q, err := rmqc.GetQueue("/", "a.queue")
// => QueueInfo, err

// declares a queue
resp, err := rmqc.DeclareQueue("/", "a.queue", QueueSettings{Durable: false})
// => *http.Response, err

// deletes individual queue
resp, err := rmqc.DeleteQueue("/", "a.queue")
// => *http.Response, err

// purges all messages in queue
resp, err := rmqc.PurgeQueue("/", "a.queue")
// => *http.Response, err
Operations on Bindings
bs, err := rmqc.ListBindings()
// => []BindingInfo, err

// list bindings in a vhost
bs, err := rmqc.ListBindingsIn("/")
// => []BindingInfo, err

// list bindings of a queue
bs, err := rmqc.ListQueueBindings("/", "a.queue")
// => []BindingInfo, err

// list all bindings having the exchange as source
bs1, err := rmqc.ListExchangeBindingsWithSource("/", "an.exchange")
// => []BindingInfo, err

// list all bindings having the exchange as destination
bs2, err := rmqc.ListExchangeBindingsWithDestination("/", "an.exchange")
// => []BindingInfo, err

// declare a binding
resp, err := rmqc.DeclareBinding("/", BindingInfo{
	Source: "an.exchange",
	Destination: "a.queue",
	DestinationType: "queue",
	RoutingKey: "#",
})
// => *http.Response, err

// deletes individual binding
resp, err := rmqc.DeleteBinding("/", BindingInfo{
	Source: "an.exchange",
	Destination: "a.queue",
	DestinationType: "queue",
	RoutingKey: "#",
	PropertiesKey: "%23",
})
// => *http.Response, err
Operations on Shovels
qs, err := rmqc.ListShovels()
// => []ShovelInfo, err

// list shovels in a vhost
qs, err := rmqc.ListShovelsIn("/")
// => []ShovelInfo, err

// information about an individual shovel
q, err := rmqc.GetShovel("/", "a.shovel")
// => ShovelInfo, err

// declares a shovel
shovelDetails := rabbithole.ShovelDefinition{SourceURI: "amqp://sourceURI", SourceQueue: "mySourceQueue", DestinationURI: "amqp://destinationURI", DestinationQueue: "myDestQueue", AddForwardHeaders: true, AckMode: "on-confirm", DeleteAfter: "never"}
resp, err := rmqc.DeclareShovel("/", "a.shovel", shovelDetails)
// => *http.Response, err

// deletes an individual shovel
resp, err := rmqc.DeleteShovel("/", "a.shovel")
// => *http.Response, err

Operations on cluster name
// Get cluster name
cn, err := rmqc.GetClusterName()
// => ClusterName, err

// Rename cluster
resp, err := rmqc.SetClusterName(ClusterName{Name: "rabbitmq@rabbit-hole"})
// => *http.Response, err

HTTPS Connections
var tlsConfig *tls.Config

...

transport := &http.Transport{TLSClientConfig: tlsConfig}

rmqc, err := NewTLSClient("https://127.0.0.1:15672", "guest", "guest", transport)
Changing Transport Layer
var transport *http.Transport

...

rmqc.SetTransport(transport)

CI Status

Build Status

Contributing

See CONTRIBUTING.md

2-clause BSD license.

(c) Michael S. Klishin, 2013-2019.

Documentation

Overview

Rabbit Hole is a Go client for the RabbitMQ HTTP API.

All HTTP API operations are accessible via `rabbithole.Client`, which should be instantiated with `rabbithole.NewClient`.

// URI, username, password
rmqc, _ := NewClient("http://127.0.0.1:15672", "guest", "guest")

Getting Overview

res, err := rmqc.Overview()

Node and Cluster Status

var err error

// => []NodeInfo, err
xs, err := rmqc.ListNodes()

node, err := rmqc.GetNode("rabbit@mercurio")
// => NodeInfo, err

Operations on Connections

xs, err := rmqc.ListConnections()
// => []ConnectionInfo, err

conn, err := rmqc.GetConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
// => ConnectionInfo, err

// Forcefully close connection
_, err := rmqc.CloseConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
// => *http.Response, err

Operations on Channels

xs, err := rmqc.ListChannels()
// => []ChannelInfo, err

ch, err := rmqc.GetChannel("127.0.0.1:50545 -> 127.0.0.1:5672 (1)")
// => ChannelInfo, err

Operations on Exchanges

xs, err := rmqc.ListExchanges()
// => []ExchangeInfo, err

// list exchanges in a vhost
xs, err := rmqc.ListExchangesIn("/")
// => []ExchangeInfo, err

// information about individual exchange
x, err := rmqc.GetExchange("/", "amq.fanout")
// => ExchangeInfo, err

// declares an exchange
resp, err := rmqc.DeclareExchange("/", "an.exchange", ExchangeSettings{Type: "fanout", Durable: false})
// => *http.Response, err

// deletes individual exchange
resp, err := rmqc.DeleteExchange("/", "an.exchange")
// => *http.Response, err

Operations on Queues

xs, err := rmqc.ListQueues()
// => []QueueInfo, err

// list queues in a vhost
xs, err := rmqc.ListQueuesIn("/")
// => []QueueInfo, err

// information about individual queue
x, err := rmqc.GetQueue("/", "a.queue")
// => QueueInfo, err

// declares a queue
resp, err := rmqc.DeclareQueue("/", "a.queue", QueueSettings{Durable: false})
// => *http.Response, err

// deletes individual queue
resp, err := rmqc.DeleteQueue("/", "a.queue")
// => *http.Response, err

// purges all messages in queue
resp, err := rmqc.PurgeQueue("/", "a.queue")
// => *http.Response, err

Operations on Bindings

bs, err := rmqc.ListBindings()
// => []BindingInfo, err

// list bindings in a vhost
bs, err := rmqc.ListBindingsIn("/")
// => []BindingInfo, err

// list bindings of a queue
bs, err := rmqc.ListQueueBindings("/", "a.queue")
// => []BindingInfo, err

// declare a binding
resp, err := rmqc.DeclareBinding("/", BindingInfo{
    Source: "an.exchange",
    Destination: "a.queue",
    DestinationType: "queue",
    RoutingKey: "#",
})
// => *http.Response, err

// deletes individual binding
resp, err := rmqc.DeleteBinding("/", BindingInfo{
    Source: "an.exchange",
    Destination: "a.queue",
    DestinationType: "queue",
    RoutingKey: "#",
    PropertiesKey: "%23",
})
// => *http.Response, err

Operations on Vhosts

xs, err := rmqc.ListVhosts()
// => []VhostInfo, err

// information about individual vhost
x, err := rmqc.GetVhost("/")
// => VhostInfo, err

// creates or updates individual vhost
resp, err := rmqc.PutVhost("/", VhostSettings{Tracing: false})
// => *http.Response, err

// deletes individual vhost
resp, err := rmqc.DeleteVhost("/")
// => *http.Response, err

Managing Users

xs, err := rmqc.ListUsers()
// => []UserInfo, err

// information about individual user
x, err := rmqc.GetUser("my.user")
// => UserInfo, err

// creates or updates individual user
resp, err := rmqc.PutUser("my.user", UserSettings{Password: "s3krE7", Tags: "management,policymaker"})
// => *http.Response, err

// deletes individual user
resp, err := rmqc.DeleteUser("my.user")
// => *http.Response, err

Managing Permissions

xs, err := rmqc.ListPermissions()
// => []PermissionInfo, err

// permissions of individual user
x, err := rmqc.ListPermissionsOf("my.user")
// => []PermissionInfo, err

// permissions of individual user in vhost
x, err := rmqc.GetPermissionsIn("/", "my.user")
// => PermissionInfo, err

// updates permissions of user in vhost
resp, err := rmqc.UpdatePermissionsIn("/", "my.user", Permissions{Configure: ".*", Write: ".*", Read: ".*"})
// => *http.Response, err

// revokes permissions in vhost
resp, err := rmqc.ClearPermissionsIn("/", "my.user")
// => *http.Response, err

Operations on cluster name

// Get cluster name
cn, err := rmqc.GetClusterName()
// => ClusterName, err

// Rename cluster
resp, err := rmqc.SetClusterName(ClusterName{Name: "rabbitmq@rabbit-hole"})
// => *http.Response, err

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Base64EncodedSaltedPasswordHashSHA256

func Base64EncodedSaltedPasswordHashSHA256(password string) string

Produces a salted hash value expected by the HTTP API. See https://www.rabbitmq.com/passwords.html#computing-password-hash for details.

func Base64EncodedSaltedPasswordHashSHA512

func Base64EncodedSaltedPasswordHashSHA512(password string) string

func GenerateSalt

func GenerateSalt(n int) string

func PathEscape

func PathEscape(s string) string

PathEscape escapes the string so it can be safely placed inside a URL path segment.

func SaltedPasswordHashSHA256

func SaltedPasswordHashSHA256(password string) (string, string)

func SaltedPasswordHashSHA512

func SaltedPasswordHashSHA512(password string) (string, string)

Types

type AuthMechanism

// AuthMechanism is a defined type whose underlying type is
// NameDescriptionEnabled (name, description and enabled flag).
type AuthMechanism NameDescriptionEnabled

type BackingQueueStatus

// BackingQueueStatus carries information about a backing queue (the queue
// storage engine). Every field is populated from the JSON key named in its
// struct tag.
type BackingQueueStatus struct {
	// Q1 through Q4 are read from the "q1" through "q4" keys.
	// NOTE(review): presumably the sizes of the storage engine's internal
	// sub-queues — confirm against the RabbitMQ documentation.
	Q1 int `json:"q1"`
	Q2 int `json:"q2"`
	Q3 int `json:"q3"`
	Q4 int `json:"q4"`
	// Total queue length
	Length int64 `json:"len"`
	// Number of pending acks from consumers
	PendingAcks int64 `json:"pending_acks"`
	// Number of messages held in RAM
	RAMMessageCount int64 `json:"ram_msg_count"`
	// Number of outstanding acks held in RAM
	RAMAckCount int64 `json:"ram_ack_count"`
	// Number of persistent messages in the store
	PersistentCount int64 `json:"persistent_count"`
	// Average ingress (inbound) rate, not including messages
	// that go straight through to auto-acking consumers.
	AverageIngressRate float64 `json:"avg_ingress_rate"`
	// Average egress (outbound) rate, not including messages
	// that go straight through to auto-acking consumers.
	AverageEgressRate float64 `json:"avg_egress_rate"`
	// Rate at which unacknowledged message records enter RAM,
	// e.g. because messages are delivered requiring acknowledgement.
	// Note: float32, unlike the float64 ingress/egress rates above.
	AverageAckIngressRate float32 `json:"avg_ack_ingress_rate"`
	// Rate at which unacknowledged message records leave RAM,
	// e.g. because acks arrive or unacked messages are paged out.
	// Note: float32, unlike the float64 ingress/egress rates above.
	AverageAckEgressRate float32 `json:"avg_ack_egress_rate"`
}

Information about backing queue (queue storage engine).

type BindingInfo

// BindingInfo describes a binding. It is the element type returned by the
// Client.List*Bindings* methods and the argument accepted by
// Client.DeclareBinding and Client.DeleteBinding.
type BindingInfo struct {
	// Binding source (exchange name)
	Source string `json:"source"`
	Vhost  string `json:"vhost"`
	// Binding destination (queue or exchange name)
	Destination string `json:"destination"`
	// Destination type, either "queue" or "exchange"
	DestinationType string                 `json:"destination_type"`
	RoutingKey      string                 `json:"routing_key"`
	Arguments       map[string]interface{} `json:"arguments"`
	PropertiesKey   string                 `json:"properties_key"`
}

type BindingVertex

// BindingVertex names one end of a binding. Client.ListExchangeBindings
// takes one to choose whether the exchange is matched as the binding's
// source or as its destination.
type BindingVertex string
const (
	// BindingSource selects the source end of a binding.
	BindingSource      BindingVertex = "source"
	// BindingDestination selects the destination end of a binding.
	BindingDestination BindingVertex = "destination"
)

func (BindingVertex) String

func (v BindingVertex) String() string

type BriefConnectionDetails

// BriefConnectionDetails is brief (very incomplete) connection information:
// only the connection name and the client's host and port. It is the type
// of ChannelInfo.ConnectionDetails.
type BriefConnectionDetails struct {
	// Connection name
	Name string `json:"name"`
	// Client port
	PeerPort Port `json:"peer_port"`
	// Client host
	PeerHost string `json:"peer_host"`
}

Brief (very incomplete) connection information.

type BrokerContext

// BrokerContext describes a RabbitMQ context (Erlang app) running on a node.
// Each field is populated from the JSON key named in its struct tag; note
// that Ignore is read from the "ignore_in_use" key.
type BrokerContext struct {
	Node        string `json:"node"`
	Description string `json:"description"`
	Path        string `json:"path"`
	Port        Port   `json:"port"`
	Ignore      bool   `json:"ignore_in_use"`
}

RabbitMQ context (Erlang app) running on a node

type ChannelInfo

// ChannelInfo describes a channel. It is the record type returned by
// Client.GetChannel and Client.ListChannels.
type ChannelInfo struct {
	// Channel number
	Number int `json:"number"`
	// Channel name
	Name string `json:"name"`

	// basic.qos (prefetch count) value used
	PrefetchCount int `json:"prefetch_count"`
	// Number of consumers this channel has
	ConsumerCount int `json:"consumer_count"`

	// Number of unacknowledged messages on this channel
	UnacknowledgedMessageCount int `json:"messages_unacknowledged"`
	// Number of messages on this channel unconfirmed to publishers
	UnconfirmedMessageCount int `json:"messages_unconfirmed"`
	// Number of messages on this channel uncommitted to message store
	UncommittedMessageCount int `json:"messages_uncommitted"`
	// Number of acks on this channel uncommitted to message store
	UncommittedAckCount int `json:"acks_uncommitted"`

	// Kept as the raw string from the "idle_since" key; it is not parsed
	// into a time value.
	// TODO(mk): custom deserializer to date/time?
	IdleSince string `json:"idle_since"`

	// True if this channel uses publisher confirms
	UsesPublisherConfirms bool `json:"confirm"`
	// True if this channel uses transactions
	Transactional bool `json:"transactional"`
	// True if this channel is blocked via channel.flow
	ClientFlowBlocked bool `json:"client_flow_blocked"`

	// NOTE(review): presumably the user, virtual host and node this channel
	// belongs to — confirm against the HTTP API documentation.
	User  string `json:"user"`
	Vhost string `json:"vhost"`
	Node  string `json:"node"`

	// Brief details (name, client host, client port) of a connection.
	// NOTE(review): presumably the connection this channel was opened on — confirm.
	ConnectionDetails BriefConnectionDetails `json:"connection_details"`
}

type Client

// Client is the type through which all HTTP API operations are accessed.
// Instantiate it with NewClient, or with NewTLSClient to supply a custom
// HTTP transport.
type Client struct {
	// URI of a RabbitMQ node to use, not including the path, e.g. http://127.0.0.1:15672.
	Endpoint string
	// Username to use. This RabbitMQ user must have the "management" tag.
	Username string
	// Password to use.
	Password string
	// contains filtered or unexported fields
}

func NewClient

func NewClient(uri string, username string, password string) (me *Client, err error)

func NewTLSClient

func NewTLSClient(uri string, username string, password string, transport *http.Transport) (me *Client, err error)

Creates a client with a transport; it is up to the developer to make that layer secure.

func (*Client) ClearPermissionsIn

func (c *Client) ClearPermissionsIn(vhost, username string) (res *http.Response, err error)

Clears (deletes) permissions of user in virtual host.

func (*Client) CloseConnection

func (c *Client) CloseConnection(name string) (res *http.Response, err error)

func (*Client) DeclareBinding

func (c *Client) DeclareBinding(vhost string, info BindingInfo) (res *http.Response, err error)

DeclareBinding updates information about a binding between a source and a target

func (*Client) DeclareExchange

func (c *Client) DeclareExchange(vhost, exchange string, info ExchangeSettings) (res *http.Response, err error)

func (*Client) DeclareQueue

func (c *Client) DeclareQueue(vhost, queue string, info QueueSettings) (res *http.Response, err error)

func (*Client) DeclareShovel

func (c *Client) DeclareShovel(vhost, shovel string, info ShovelDefinition) (res *http.Response, err error)

DeclareShovel creates a shovel

func (*Client) DeleteBinding

func (c *Client) DeleteBinding(vhost string, info BindingInfo) (res *http.Response, err error)

DeleteBinding deletes an individual binding

func (*Client) DeleteExchange

func (c *Client) DeleteExchange(vhost, exchange string) (res *http.Response, err error)

func (*Client) DeleteFederationUpstream

func (c *Client) DeleteFederationUpstream(vhost, upstreamName string) (res *http.Response, err error)

Deletes a federation upstream.

func (*Client) DeletePolicy

func (c *Client) DeletePolicy(vhost, name string) (res *http.Response, err error)

Deletes a policy.

func (*Client) DeleteQueue

func (c *Client) DeleteQueue(vhost, queue string) (res *http.Response, err error)

func (*Client) DeleteShovel

func (c *Client) DeleteShovel(vhost, shovel string) (res *http.Response, err error)

DeleteShovel deletes a shovel

func (*Client) DeleteUser

func (c *Client) DeleteUser(username string) (res *http.Response, err error)

Deletes user.

func (*Client) DeleteVhost

func (c *Client) DeleteVhost(vhostname string) (res *http.Response, err error)

Deletes a virtual host.

func (*Client) EnabledProtocols

func (c *Client) EnabledProtocols() (xs []string, err error)

func (*Client) GetChannel

func (c *Client) GetChannel(name string) (rec *ChannelInfo, err error)

Returns channel information.

func (*Client) GetClusterName

func (c *Client) GetClusterName() (rec *ClusterName, err error)

func (*Client) GetConnection

func (c *Client) GetConnection(name string) (rec *ConnectionInfo, err error)

func (*Client) GetExchange

func (c *Client) GetExchange(vhost, exchange string) (rec *DetailedExchangeInfo, err error)

func (*Client) GetNode

func (c *Client) GetNode(name string) (rec *NodeInfo, err error)

func (*Client) GetPermissionsIn

func (c *Client) GetPermissionsIn(vhost, username string) (rec PermissionInfo, err error)

Returns permissions of user in virtual host.

func (*Client) GetPolicy

func (c *Client) GetPolicy(vhost, name string) (rec *Policy, err error)

Returns individual policy in virtual host.

func (*Client) GetQueue

func (c *Client) GetQueue(vhost, queue string) (rec *DetailedQueueInfo, err error)

func (*Client) GetQueueWithParameters

func (c *Client) GetQueueWithParameters(vhost, queue string, qs url.Values) (rec *DetailedQueueInfo, err error)

func (*Client) GetShovel

func (c *Client) GetShovel(vhost, shovel string) (rec *ShovelInfo, err error)

GetShovel returns a shovel configuration

func (*Client) GetUser

func (c *Client) GetUser(username string) (rec *UserInfo, err error)

Returns information about individual user.

func (*Client) GetVhost

func (c *Client) GetVhost(vhostname string) (rec *VhostInfo, err error)

Returns information about a specific virtual host.

func (*Client) ListBindings

func (c *Client) ListBindings() (rec []BindingInfo, err error)

Returns all bindings

func (*Client) ListBindingsIn

func (c *Client) ListBindingsIn(vhost string) (rec []BindingInfo, err error)

Returns all bindings in a virtual host.

func (*Client) ListChannels

func (c *Client) ListChannels() (rec []ChannelInfo, err error)

Returns information about all open channels.

func (*Client) ListConnections

func (c *Client) ListConnections() (rec []ConnectionInfo, err error)

func (*Client) ListExchangeBindings

func (c *Client) ListExchangeBindings(vhost, exchange string, sourceOrDestination BindingVertex) (rec []BindingInfo, err error)

Returns all bindings having the exchange as source or destination, as selected by the sourceOrDestination argument.

func (*Client) ListExchangeBindingsBetween

func (c *Client) ListExchangeBindingsBetween(vhost, source string, destination string) (rec []BindingInfo, err error)

func (*Client) ListExchangeBindingsWithDestination

func (c *Client) ListExchangeBindingsWithDestination(vhost, exchange string) (rec []BindingInfo, err error)

func (*Client) ListExchangeBindingsWithSource

func (c *Client) ListExchangeBindingsWithSource(vhost, exchange string) (rec []BindingInfo, err error)

func (*Client) ListExchanges

func (c *Client) ListExchanges() (rec []ExchangeInfo, err error)

func (*Client) ListExchangesIn

func (c *Client) ListExchangesIn(vhost string) (rec []ExchangeInfo, err error)

func (*Client) ListNodes

func (c *Client) ListNodes() (rec []NodeInfo, err error)

func (*Client) ListPermissions

func (c *Client) ListPermissions() (rec []PermissionInfo, err error)

Returns permissions for all users and virtual hosts.

func (*Client) ListPermissionsOf

func (c *Client) ListPermissionsOf(username string) (rec []PermissionInfo, err error)

Returns permissions of a specific user.

func (*Client) ListPolicies

func (c *Client) ListPolicies() (rec []Policy, err error)

Returns all policies (across all virtual hosts).

func (*Client) ListPoliciesIn

func (c *Client) ListPoliciesIn(vhost string) (rec []Policy, err error)

Returns policies in a specific virtual host.

func (*Client) ListQueueBindings

func (c *Client) ListQueueBindings(vhost, queue string) (rec []BindingInfo, err error)

Returns all bindings of individual queue.

func (*Client) ListQueueBindingsBetween

func (c *Client) ListQueueBindingsBetween(vhost, exchange string, queue string) (rec []BindingInfo, err error)

func (*Client) ListQueues

func (c *Client) ListQueues() (rec []QueueInfo, err error)

func (*Client) ListQueuesIn

func (c *Client) ListQueuesIn(vhost string) (rec []QueueInfo, err error)

func (*Client) ListQueuesWithParameters

func (c *Client) ListQueuesWithParameters(params url.Values) (rec []QueueInfo, err error)

func (*Client) ListShovels

func (c *Client) ListShovels() (rec []ShovelInfo, err error)

ListShovels returns all shovels

func (*Client) ListShovelsIn

func (c *Client) ListShovelsIn(vhost string) (rec []ShovelInfo, err error)

ListShovelsIn returns all shovels in a vhost

func (*Client) ListUsers

func (c *Client) ListUsers() (rec []UserInfo, err error)

Returns a list of all users in a cluster.

func (*Client) ListVhosts

func (c *Client) ListVhosts() (rec []VhostInfo, err error)

Returns a list of virtual hosts.

func (*Client) Overview

func (c *Client) Overview() (rec *Overview, err error)

func (*Client) PagedListQueuesWithParameters

func (c *Client) PagedListQueuesWithParameters(params url.Values) (rec PagedQueueInfo, err error)

func (*Client) ProtocolPorts

func (c *Client) ProtocolPorts() (res map[string]Port, err error)

func (*Client) PurgeQueue

func (c *Client) PurgeQueue(vhost, queue string) (res *http.Response, err error)

func (*Client) PutFederationUpstream

func (c *Client) PutFederationUpstream(vhost string, upstreamName string, fDef FederationDefinition) (res *http.Response, err error)

Updates a federation upstream

func (*Client) PutPolicy

func (c *Client) PutPolicy(vhost string, name string, policy Policy) (res *http.Response, err error)

Updates a policy.

func (*Client) PutUser

func (c *Client) PutUser(username string, info UserSettings) (res *http.Response, err error)

Updates information about individual user.

func (*Client) PutUserWithoutPassword

func (c *Client) PutUserWithoutPassword(username string, info UserSettings) (res *http.Response, err error)

func (*Client) PutVhost

func (c *Client) PutVhost(vhostname string, settings VhostSettings) (res *http.Response, err error)

Creates or updates a virtual host.

func (*Client) SetClusterName

func (c *Client) SetClusterName(cn ClusterName) (res *http.Response, err error)

func (*Client) SetTimeout

func (c *Client) SetTimeout(timeout time.Duration)

SetTimeout changes the HTTP timeout that the Client will use. By default there is no timeout.

func (*Client) SetTransport

func (c *Client) SetTransport(transport *http.Transport)

SetTransport changes the Transport Layer that the Client will use.

func (*Client) UpdatePermissionsIn

func (c *Client) UpdatePermissionsIn(vhost, username string, permissions Permissions) (res *http.Response, err error)

Updates permissions of user in virtual host.

func (*Client) Whoami

func (c *Client) Whoami() (rec *WhoamiInfo, err error)

type ClusterName

// ClusterName wraps a cluster name. It is returned by Client.GetClusterName
// and accepted by Client.SetClusterName.
type ClusterName struct {
	// Name is read from and written to the "name" JSON key.
	Name string `json:"name"`
}

type ConnectionInfo

// ConnectionInfo provides information about a connection to a RabbitMQ node.
// It is the record type returned by Client.GetConnection and
// Client.ListConnections; every field is populated from the JSON key named
// in its struct tag.
type ConnectionInfo struct {
	// Connection name
	Name string `json:"name"`
	// Node the client is connected to
	Node string `json:"node"`
	// Number of open channels
	Channels int `json:"channels"`
	// Connection state
	State string `json:"state"`
	// Connection type, network (via AMQP client) or direct (via direct Erlang client)
	Type string `json:"type"`

	// Server port
	Port Port `json:"port"`
	// Client port
	PeerPort Port `json:"peer_port"`

	// Server host
	Host string `json:"host"`
	// Client host
	PeerHost string `json:"peer_host"`

	// Last connection blocking reason, if any
	LastBlockedBy string `json:"last_blocked_by"`
	// When connection was last blocked (kept as a string, not parsed)
	LastBlockedAge string `json:"last_blocked_age"`

	// True if connection uses TLS/SSL
	UsesTLS bool `json:"ssl"`
	// Client certificate subject
	PeerCertSubject string `json:"peer_cert_subject"`
	// Client certificate validity
	PeerCertValidity string `json:"peer_cert_validity"`
	// Client certificate issuer
	PeerCertIssuer string `json:"peer_cert_issuer"`

	// TLS/SSL protocol and version
	SSLProtocol string `json:"ssl_protocol"`
	// Key exchange mechanism
	SSLKeyExchange string `json:"ssl_key_exchange"`
	// SSL cipher suite used
	SSLCipher string `json:"ssl_cipher"`
	// SSL hash
	SSLHash string `json:"ssl_hash"`

	// Protocol, e.g. AMQP 0-9-1 or MQTT 3-1
	Protocol string `json:"protocol"`
	User     string `json:"user"`
	// Virtual host
	Vhost string `json:"vhost"`

	// Heartbeat timeout
	// NOTE(review): unit is not stated here — confirm against the HTTP API docs.
	Timeout int `json:"timeout"`
	// Maximum frame size (AMQP 0-9-1)
	FrameMax int `json:"frame_max"`

	// A map of client properties (name, version, capabilities, etc)
	ClientProperties Properties `json:"client_properties"`

	// Octets received
	RecvOct uint64 `json:"recv_oct"`
	// Octets sent. The three fields after it are read from the "recv_cnt",
	// "send_cnt" and "send_pend" keys.
	// NOTE(review): presumably receive/send counts and pending sends — confirm.
	SendOct     uint64 `json:"send_oct"`
	RecvCount   uint64 `json:"recv_cnt"`
	SendCount   uint64 `json:"send_cnt"`
	SendPending uint64 `json:"send_pend"`
	// Ingress data rate
	RecvOctDetails RateDetails `json:"recv_oct_details"`
	// Egress data rate
	SendOctDetails RateDetails `json:"send_oct_details"`
}

Provides information about connection to a RabbitMQ node.

type DetailedExchangeInfo

// DetailedExchangeInfo is the record type returned by Client.GetExchange.
// It has the same descriptive fields as ExchangeInfo, but carries Incoming
// and Outgoing detail lists where ExchangeInfo has MessageStats.
type DetailedExchangeInfo struct {
	Name       string                 `json:"name"`
	Vhost      string                 `json:"vhost"`
	Type       string                 `json:"type"`
	Durable    bool                   `json:"durable"`
	AutoDelete bool                   `json:"auto_delete"`
	Internal   bool                   `json:"internal"`
	Arguments  map[string]interface{} `json:"arguments"`

	// Incoming entries pair message stats with a publishing channel;
	// Outgoing entries pair message stats with a queue.
	Incoming []ExchangeIngressDetails `json:"incoming"`
	Outgoing []ExchangeEgressDetails  `json:"outgoing"`
}

type DetailedQueueInfo

// DetailedQueueInfo is a defined type whose underlying type is QueueInfo.
// It is the record type returned by Client.GetQueue and
// Client.GetQueueWithParameters.
type DetailedQueueInfo QueueInfo

type ErlangApp

// ErlangApp is a defined type whose underlying type is
// NameDescriptionVersion (name, description and version).
type ErlangApp NameDescriptionVersion

type ErrorResponse

// ErrorResponse represents an error reported in a response. It has an
// Error() string method, so it can be used as a Go error value.
type ErrorResponse struct {
	// StatusCode has no JSON tag; Message and Reason are decoded from the
	// "error" and "reason" keys respectively.
	// NOTE(review): StatusCode is presumably the HTTP status code, set by
	// the client rather than decoded from the body — confirm.
	StatusCode int
	Message    string `json:"error"`
	Reason     string `json:"reason"`
}

func (ErrorResponse) Error

func (rme ErrorResponse) Error() string

type ExchangeEgressDetails

// ExchangeEgressDetails pairs message statistics with a queue (name and
// vhost). It is the element type of DetailedExchangeInfo.Outgoing.
type ExchangeEgressDetails struct {
	Stats MessageStats `json:"stats"`
	Queue NameAndVhost `json:"queue"`
}

type ExchangeInfo

// ExchangeInfo describes an exchange. It is the element type returned by
// Client.ListExchanges and Client.ListExchangesIn; every field is populated
// from the JSON key named in its struct tag.
type ExchangeInfo struct {
	Name       string                 `json:"name"`
	Vhost      string                 `json:"vhost"`
	Type       string                 `json:"type"`
	Durable    bool                   `json:"durable"`
	AutoDelete bool                   `json:"auto_delete"`
	Internal   bool                   `json:"internal"`
	Arguments  map[string]interface{} `json:"arguments"`

	// Publish-in / publish-out counters with their rate details.
	MessageStats IngressEgressStats `json:"message_stats"`
}

type ExchangeIngressDetails

// ExchangeIngressDetails pairs message statistics with a publishing
// channel. It is the element type of DetailedExchangeInfo.Incoming.
type ExchangeIngressDetails struct {
	Stats          MessageStats      `json:"stats"`
	ChannelDetails PublishingChannel `json:"channel_details"`
}

type ExchangeSettings

// ExchangeSettings is the settings argument accepted by
// Client.DeclareExchange. AutoDelete and Arguments are tagged omitempty,
// so encoding/json leaves them out when they are false / empty; Type and
// Durable are always encoded.
type ExchangeSettings struct {
	Type       string                 `json:"type"`
	Durable    bool                   `json:"durable"`
	AutoDelete bool                   `json:"auto_delete,omitempty"`
	Arguments  map[string]interface{} `json:"arguments,omitempty"`
}

type ExchangeType

// ExchangeType is a defined type whose underlying type is
// NameDescriptionEnabled (name, description and enabled flag).
type ExchangeType NameDescriptionEnabled

type FederationDefinition

// FederationDefinition is a federation definition: additional arguments
// added to the entities (queues, exchanges or both) that match a policy.
// It is the definition argument of Client.PutFederationUpstream and the
// type of FederationUpstream.Definition.
//
// JSON keys use hyphens (e.g. "message-ttl"). Only AckMode is tagged
// omitempty; every other field is always encoded, including zero values.
// NOTE(review): units of Expires, MessageTTL and ReconnectDelay are not
// stated here — confirm against the RabbitMQ federation reference.
type FederationDefinition struct {
	Uri            string `json:"uri"`
	Expires        int    `json:"expires"`
	MessageTTL     int32  `json:"message-ttl"`
	MaxHops        int    `json:"max-hops"`
	PrefetchCount  int    `json:"prefetch-count"`
	ReconnectDelay int    `json:"reconnect-delay"`
	AckMode        string `json:"ack-mode,omitempty"`
	TrustUserId    bool   `json:"trust-user-id"`
	Exchange       string `json:"exchange"`
	Queue          string `json:"queue"`
}

Federation definition: additional arguments added to the entities (queues, exchanges or both) that match a policy.

type FederationUpstream

// FederationUpstream represents a configured Federation upstream.
type FederationUpstream struct {
	// Definition is read from and written to the "value" JSON key.
	Definition FederationDefinition `json:"value"`
}

Represents a configured Federation upstream.

type HashingAlgorithm

// HashingAlgorithm identifies a password hashing algorithm by its string
// name. The defined values are listed in the const block below.
type HashingAlgorithm string
const (
	// SHA-256 and SHA-512 based password hashing.
	HashingAlgorithmSHA256 HashingAlgorithm = "rabbit_password_hashing_sha256"
	HashingAlgorithmSHA512 HashingAlgorithm = "rabbit_password_hashing_sha512"

	// MD5 is deprecated; it is provided to support responses that include
	// users created before RabbitMQ 3.6 and other legacy scenarios. MK.
	HashingAlgorithmMD5 HashingAlgorithm = "rabbit_password_hashing_md5"
)

func (HashingAlgorithm) String

func (algo HashingAlgorithm) String() string

type IngressEgressStats

// IngressEgressStats holds two counters, "publish_in" and "publish_out",
// each accompanied by a RateDetails value under the matching "*_details"
// key.
type IngressEgressStats struct {
	PublishIn        int         `json:"publish_in"`
	PublishInDetails RateDetails `json:"publish_in_details"`

	PublishOut        int         `json:"publish_out"`
	PublishOutDetails RateDetails `json:"publish_out_details"`
}

type Listener

// Listener describes one listener entry: the node it is reported for, its
// protocol, IP address and port.
type Listener struct {
	Node      string `json:"node"`
	Protocol  string `json:"protocol"`
	IpAddress string `json:"ip_address"`
	Port      Port   `json:"port"`
}

type MessageStats

// MessageStats holds basic message statistics. Every int64 counter
// (publish, deliver, deliver_noack, deliver_get, redeliver, get, get_no_ack,
// ack) is paired with a RateDetails value under the matching "*_details"
// JSON key.
type MessageStats struct {
	Publish             int64       `json:"publish"`
	PublishDetails      RateDetails `json:"publish_details"`
	Deliver             int64       `json:"deliver"`
	DeliverDetails      RateDetails `json:"deliver_details"`
	DeliverNoAck        int64       `json:"deliver_noack"`
	DeliverNoAckDetails RateDetails `json:"deliver_noack_details"`
	DeliverGet          int64       `json:"deliver_get"`
	DeliverGetDetails   RateDetails `json:"deliver_get_details"`
	Redeliver           int64       `json:"redeliver"`
	RedeliverDetails    RateDetails `json:"redeliver_details"`
	Get                 int64       `json:"get"`
	GetDetails          RateDetails `json:"get_details"`
	GetNoAck            int64       `json:"get_no_ack"`
	GetNoAckDetails     RateDetails `json:"get_no_ack_details"`
	Ack                 int64       `json:"ack"`
	AckDetails          RateDetails `json:"ack_details"`
}

Basic published messages statistics

type NameAndVhost

// NameAndVhost is a pair of strings: a name and a virtual host.
type NameAndVhost struct {
	Name  string `json:"name"`
	Vhost string `json:"vhost"`
}

type NameDescriptionEnabled

// NameDescriptionEnabled is a generic record with a name, a description and
// an enabled flag. ExchangeType is defined in terms of it.
type NameDescriptionEnabled struct {
	Name        string `json:"name"`
	Description string `json:"description"`
	Enabled     bool   `json:"enabled"`
}

type NameDescriptionVersion

// NameDescriptionVersion is a generic record with a name, a description and
// a version string.
type NameDescriptionVersion struct {
	Name        string `json:"name"`
	Description string `json:"description"`
	Version     string `json:"version"`
}

type NodeInfo

// NodeInfo describes a single node: its identity, resource usage counters
// with their limits and alarm flags, scheduler and uptime figures, and the
// lists of exchange types, auth mechanisms, applications, contexts and
// partitions reported for it.
//
// NOTE(review): the resource counters (memory, disk) are declared as int,
// whose width is platform dependent; confirm this is adequate on 32-bit
// targets.
type NodeInfo struct {
	Name      string `json:"name"`
	NodeType  string `json:"type"`
	IsRunning bool   `json:"running"`
	OsPid     OsPid  `json:"os_pid"`

	FdUsed        int  `json:"fd_used"`
	FdTotal       int  `json:"fd_total"`
	ProcUsed      int  `json:"proc_used"`
	ProcTotal     int  `json:"proc_total"`
	SocketsUsed   int  `json:"sockets_used"`
	SocketsTotal  int  `json:"sockets_total"`
	MemUsed       int  `json:"mem_used"`
	MemLimit      int  `json:"mem_limit"`
	MemAlarm      bool `json:"mem_alarm"`
	DiskFree      int  `json:"disk_free"`
	DiskFreeLimit int  `json:"disk_free_limit"`
	DiskFreeAlarm bool `json:"disk_free_alarm"`

	// Erlang scheduler run queue length
	RunQueueLength uint32 `json:"run_queue"`
	Processors     uint32 `json:"processors"`
	Uptime         uint64 `json:"uptime"`

	ExchangeTypes  []ExchangeType  `json:"exchange_types"`
	AuthMechanisms []AuthMechanism `json:"auth_mechanisms"`
	ErlangApps     []ErlangApp     `json:"applications"`
	Contexts       []BrokerContext `json:"contexts"`

	Partitions []string `json:"partitions"`
}

type NodeNames

// NodeNames is a list of node names, represented as a slice of strings.
type NodeNames []string

type ObjectTotals

// ObjectTotals holds integer counts of consumers, queues, exchanges,
// connections and channels. It is the type of Overview's "object_totals"
// field.
type ObjectTotals struct {
	Consumers   int `json:"consumers"`
	Queues      int `json:"queues"`
	Exchanges   int `json:"exchanges"`
	Connections int `json:"connections"`
	Channels    int `json:"channels"`
}

type OsPid

// OsPid is an operating system process ID kept as a string. It is the type
// of NodeInfo's "os_pid" field.
type OsPid string

type Overview

// Overview groups version strings (management, RabbitMQ, Erlang), the
// statistics level, message statistics, queue and object totals, node names,
// and the lists of exchange types, listeners and contexts.
type Overview struct {
	ManagementVersion string          `json:"management_version"`
	StatisticsLevel   string          `json:"statistics_level"`
	RabbitMQVersion   string          `json:"rabbitmq_version"`
	ErlangVersion     string          `json:"erlang_version"`
	FullErlangVersion string          `json:"erlang_full_version"`
	ExchangeTypes     []ExchangeType  `json:"exchange_types"`
	MessageStats      MessageStats    `json:"message_stats"`
	QueueTotals       QueueTotals     `json:"queue_totals"`
	ObjectTotals      ObjectTotals    `json:"object_totals"`
	Node              string          `json:"node"`
	StatisticsDBNode  string          `json:"statistics_db_node"`
	Listeners         []Listener      `json:"listeners"`
	Contexts          []BrokerContext `json:"contexts"`
}

type OwnerPidDetails

// OwnerPidDetails holds a name together with a peer host and peer port. It
// is the type of QueueInfo's "owner_pid_details" field.
type OwnerPidDetails struct {
	Name     string `json:"name"`
	PeerPort Port   `json:"peer_port"`
	PeerHost string `json:"peer_host"`
}

type PagedQueueInfo

// PagedQueueInfo is one page of QueueInfo items together with paging
// metadata: the page number, page count, page size and the filtered, item
// and total counts.
type PagedQueueInfo struct {
	Page          int         `json:"page"`
	PageCount     int         `json:"page_count"`
	PageSize      int         `json:"page_size"`
	FilteredCount int         `json:"filtered_count"`
	ItemCount     int         `json:"item_count"`
	TotalCount    int         `json:"total_count"`
	Items         []QueueInfo `json:"items"`
}

type PermissionInfo

// PermissionInfo holds the configure, write and read permissions of one
// user in one virtual host.
type PermissionInfo struct {
	User  string `json:"user"`
	Vhost string `json:"vhost"`

	// Configuration permissions
	Configure string `json:"configure"`
	// Write permissions
	Write string `json:"write"`
	// Read permissions
	Read string `json:"read"`
}

type Permissions

// Permissions holds the three permission strings (configure, write, read)
// without the user and virtual host that PermissionInfo additionally
// carries.
type Permissions struct {
	Configure string `json:"configure"`
	Write     string `json:"write"`
	Read      string `json:"read"`
}

type Policy

// Policy represents a configured policy: the virtual host it is in, the
// pattern it matches, what it applies to, its name and priority, and its
// definition.
type Policy struct {
	// Virtual host this policy is in.
	Vhost string `json:"vhost"`
	// Regular expression pattern used to match queues and exchanges,
	// e.g. "^ha\..+"
	Pattern string `json:"pattern"`
	// What this policy applies to: "queues", "exchanges", etc.
	ApplyTo  string `json:"apply-to"`
	Name     string `json:"name"`
	Priority int    `json:"priority"`
	// Additional arguments added to the entities (queues,
	// exchanges or both) that match a policy
	Definition PolicyDefinition `json:"definition"`
}

Represents a configured policy.

type PolicyDefinition

// PolicyDefinition is a free-form map from string keys to arbitrary values.
// It is the type of Policy's "definition" field.
type PolicyDefinition map[string]interface{}

Policy definition: additional arguments added to the entities (queues, exchanges or both) that match a policy.

type Port

// Port is a port number represented as an int. JSON decoding is customised
// by the UnmarshalJSON method declared on *Port.
type Port int

Port used by RabbitMQ or clients

func (*Port) UnmarshalJSON

func (p *Port) UnmarshalJSON(b []byte) error

type Properties

// Properties is a free-form map from string keys to arbitrary values.
type Properties map[string]interface{}

Extra arguments as a map (on queues, bindings, etc)

type PublishingChannel

// PublishingChannel identifies a channel by number and name, together with
// the name of its connection and the peer host and port. It is the type of
// ExchangeIngressDetails' "channel_details" field.
type PublishingChannel struct {
	Number         int    `json:"number"`
	Name           string `json:"name"`
	ConnectionName string `json:"connection_name"`
	PeerPort       Port   `json:"peer_port"`
	PeerHost       string `json:"peer_host"`
}

type QueueInfo

// QueueInfo describes a queue: its identity and declaration settings, the
// node hosting it, memory and consumer figures, the applied policy, message
// counts and byte totals with their rate details, and nested statistics.
type QueueInfo struct {
	// Queue name
	Name string `json:"name"`
	// Virtual host this queue belongs to
	Vhost string `json:"vhost"`
	// Is this queue durable?
	Durable bool `json:"durable"`
	// Is this queue auto-deleted?
	AutoDelete bool `json:"auto_delete"`
	// Extra queue arguments
	Arguments map[string]interface{} `json:"arguments"`

	// RabbitMQ node that hosts master for this queue
	Node string `json:"node"`
	// Queue status
	Status string `json:"status"`

	// Total amount of RAM used by this queue
	Memory int64 `json:"memory"`
	// How many consumers this queue has
	Consumers int `json:"consumers"`
	// Utilisation of all the consumers
	ConsumerUtilisation float64 `json:"consumer_utilisation"`
	// If there is an exclusive consumer, its consumer tag
	ExclusiveConsumerTag string `json:"exclusive_consumer_tag"`

	// Policy applied to this queue, if any
	Policy string `json:"policy"`

	// Total bytes of messages in this queue
	MessagesBytes           int64 `json:"message_bytes"`
	MessagesBytesPersistent int64 `json:"message_bytes_persistent"`
	MessagesBytesRAM        int64 `json:"message_bytes_ram"`

	// Total number of messages in this queue
	Messages           int         `json:"messages"`
	MessagesDetails    RateDetails `json:"messages_details"`
	MessagesPersistent int         `json:"messages_persistent"`
	MessagesRAM        int         `json:"messages_ram"`

	// Number of messages ready to be delivered
	MessagesReady        int         `json:"messages_ready"`
	MessagesReadyDetails RateDetails `json:"messages_ready_details"`

	// Number of messages delivered and pending acknowledgements from consumers
	MessagesUnacknowledged        int         `json:"messages_unacknowledged"`
	MessagesUnacknowledgedDetails RateDetails `json:"messages_unacknowledged_details"`

	MessageStats MessageStats `json:"message_stats"`

	OwnerPidDetails OwnerPidDetails `json:"owner_pid_details"`

	BackingQueueStatus BackingQueueStatus `json:"backing_queue_status"`

	ActiveConsumers int64 `json:"active_consumers"`
}

type QueueSettings

// QueueSettings holds a queue's type, durability, auto-delete flag and
// arguments together with their JSON mappings.
//
// Type and Durable are always encoded; AutoDelete and Arguments are omitted
// from the encoded JSON when they hold their zero values.
type QueueSettings struct {
	Type       string                 `json:"type"`
	Durable    bool                   `json:"durable"`
	AutoDelete bool                   `json:"auto_delete,omitempty"`
	Arguments  map[string]interface{} `json:"arguments,omitempty"`
}

type QueueTotals

// QueueTotals holds three message counts (total, ready, unacknowledged),
// each paired with a RateDetails value. It is the type of Overview's
// "queue_totals" field.
type QueueTotals struct {
	Messages        int         `json:"messages"`
	MessagesDetails RateDetails `json:"messages_details"`

	MessagesReady        int         `json:"messages_ready"`
	MessagesReadyDetails RateDetails `json:"messages_ready_details"`

	MessagesUnacknowledged        int         `json:"messages_unacknowledged"`
	MessagesUnacknowledgedDetails RateDetails `json:"messages_unacknowledged_details"`
}

type RateDetailSample

// RateDetailSample is a single (sample, timestamp) pair, as found in the
// Samples list of RateDetails.
// NOTE(review): the timestamp unit is not visible here; confirm against the
// HTTP API documentation.
type RateDetailSample struct {
	Sample    int64 `json:"sample"`
	Timestamp int64 `json:"timestamp"`
}

RateDetailSample is a single (sample, timestamp) tuple.

type RateDetails

// RateDetails describes the rate of change of a numerical value: the rate
// itself plus a list of RateDetailSample entries.
type RateDetails struct {
	Rate    float32            `json:"rate"`
	Samples []RateDetailSample `json:"samples"`
}

Rate of change of a numerical value

type ShovelDefinition

// ShovelDefinition contains the details of a shovel's configuration, encoded
// with the hyphenated keys shown in the JSON tags.
//
// SourceURI, DestinationURI and AddForwardHeaders are always encoded. Every
// other field is optional and is omitted from the encoded JSON when it holds
// its zero value. This includes AckMode and DeleteAfter: the shovel plugin
// accepts only a fixed set of values for "ack-mode" and "delete-after", so
// encoding an unset field as "" would send a value the broker does not
// accept instead of letting it apply its default. Decoding is unaffected.
type ShovelDefinition struct {
	SourceURI              string `json:"src-uri"`
	SourceExchange         string `json:"src-exchange,omitempty"`
	SourceExchangeKey      string `json:"src-exchange-key,omitempty"`
	SourceQueue            string `json:"src-queue,omitempty"`
	DestinationURI         string `json:"dest-uri"`
	DestinationExchange    string `json:"dest-exchange,omitempty"`
	DestinationExchangeKey string `json:"dest-exchange-key,omitempty"`
	DestinationQueue       string `json:"dest-queue,omitempty"`
	PrefetchCount          int    `json:"prefetch-count,omitempty"`
	ReconnectDelay         int    `json:"reconnect-delay,omitempty"`
	AddForwardHeaders      bool   `json:"add-forward-headers"`
	AckMode                string `json:"ack-mode,omitempty"`
	DeleteAfter            string `json:"delete-after,omitempty"`
}

ShovelDefinition contains the details of the shovel configuration

type ShovelDefinitionDTO

// ShovelDefinitionDTO is a data transfer object that wraps a
// ShovelDefinition, which is encoded under the JSON key "value".
type ShovelDefinitionDTO struct {
	Definition ShovelDefinition `json:"value"`
}

ShovelDefinitionDTO provides a data transfer object

type ShovelInfo

// ShovelInfo contains the configuration of a shovel: its name, virtual
// host, component and definition (decoded from the JSON key "value").
type ShovelInfo struct {
	// Shovel name
	Name string `json:"name"`
	// Virtual host this shovel belongs to
	Vhost string `json:"vhost"`
	// Component shovels belong to
	Component string `json:"component"`
	// Details the configuration values of the shovel
	Definition ShovelDefinition `json:"value"`
}

ShovelInfo contains the configuration of a shovel

type UserInfo

// UserInfo describes a user: name, password hash, hashing algorithm and
// tags. HashingAlgorithm is omitted from the encoded JSON when empty.
type UserInfo struct {
	Name             string           `json:"name"`
	PasswordHash     string           `json:"password_hash"`
	HashingAlgorithm HashingAlgorithm `json:"hashing_algorithm,omitempty"`
	// Tags control permissions. Built-in tags: administrator, management, policymaker.
	Tags string `json:"tags"`
}

type UserSettings

// UserSettings holds the settings used to create or update a user. Name and
// Tags are always encoded; Password, PasswordHash and HashingAlgorithm are
// omitted from the encoded JSON when empty.
type UserSettings struct {
	Name string `json:"name"`
	// Tags control permissions. Administrator grants full
	// permissions, management grants management UI and HTTP API
	// access, policymaker grants policy management permissions.
	Tags string `json:"tags"`

	// *never* returned by RabbitMQ. Set by the client
	// to create/update a user. MK.
	Password         string           `json:"password,omitempty"`
	PasswordHash     string           `json:"password_hash,omitempty"`
	HashingAlgorithm HashingAlgorithm `json:"hashing_algorithm,omitempty"`
}

Settings used to create users. Tags must be comma-separated.

type VhostInfo

// VhostInfo describes a virtual host: its name and tracing flag, message
// counts with their rate details, and received/sent octet and count
// figures.
type VhostInfo struct {
	// Virtual host name
	Name string `json:"name"`
	// True if tracing is enabled for this virtual host
	Tracing bool `json:"tracing"`

	// Total number of messages in queues of this virtual host
	Messages        int         `json:"messages"`
	MessagesDetails RateDetails `json:"messages_details"`

	// Total number of messages ready to be delivered in queues of this virtual host
	MessagesReady        int         `json:"messages_ready"`
	MessagesReadyDetails RateDetails `json:"messages_ready_details"`

	// Total number of messages pending acknowledgement from consumers in this virtual host
	MessagesUnacknowledged        int         `json:"messages_unacknowledged"`
	MessagesUnacknowledgedDetails RateDetails `json:"messages_unacknowledged_details"`

	// Octets received
	RecvOct uint64 `json:"recv_oct"`
	// Octets sent
	SendOct        uint64      `json:"send_oct"`
	RecvCount      uint64      `json:"recv_cnt"`
	SendCount      uint64      `json:"send_cnt"`
	SendPending    uint64      `json:"send_pend"`
	RecvOctDetails RateDetails `json:"recv_oct_details"`
	SendOctDetails RateDetails `json:"send_oct_details"`
}

type VhostSettings

// VhostSettings holds the settings used to create or modify a virtual host.
// Its only field, Tracing, is always encoded.
type VhostSettings struct {
	// True if tracing should be enabled.
	Tracing bool `json:"tracing"`
}

Settings used to create or modify virtual hosts.

type WhoamiInfo

// WhoamiInfo holds a user name, tags string and auth backend name.
// NOTE(review): presumably describes the user the client authenticated as;
// confirm against the endpoint that returns it.
type WhoamiInfo struct {
	Name        string `json:"name"`
	Tags        string `json:"tags"`
	AuthBackend string `json:"auth_backend"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL