Documentation ¶
Overview ¶
Package proton wraps Proton-C, an event-driven, concurrent-unsafe AMQP 1.0 C library (package 'electron' is more "Go-like" and concurrent-safe)
Consult the C API documentation at http://qpid.apache.org/proton for more information about the types here. There is a 1-1 correspondence between C type pn_foo_t and Go type proton.Foo, and between C function
pn_foo_do_something(pn_foo_t*, ...)
and Go method
func (proton.Foo) DoSomething(...)
The proton.Engine type pumps data between a Go net.Conn and a proton event loop goroutine that feeds events to a proton.MessagingHandler, which you must implement. See the Engine documentation for more.
MessagingHandler defines an event handling interface that you can implement to react to AMQP protocol events. There is also a lower-level EventHandler, but MessagingHandler provides a simpler set of events and automates common tasks for you, for most applications it will be more convenient.
NOTE: Methods on most types defined in this package (Sessions, Links etc.) can *only* be called in the event handler goroutine of the relevant Connection/Engine, either by the HandleEvent method of a handler type or in a function injected into the goroutine via Inject() or InjectWait() Handlers and injected functions can set up channels to communicate with other goroutines. Note the Injecter associated with a handler available as part of the Event value passed to HandleEvent.
Separate Engine instances are independent, and can run concurrently.
The 'electron' package is built on the proton package but instead offers a concurrent-safe API that can use simple procedural loops rather than event handlers to express application logic. It is easier to use for most applications.
Internal implementation details - ignore.
Index ¶
- Constants
- func CloseError(e Endpoint, err error)
- func Decref(c CHandle)
- func EndpointError(e Endpoint) error
- func Incref(c CHandle)
- func PnError(e *C.pn_error_t) error
- type CHandle
- type Condition
- func (c Condition) CPtr() unsafe.Pointer
- func (c Condition) Clear()
- func (c Condition) Description() string
- func (c Condition) Error() error
- func (c Condition) Info() Data
- func (c Condition) IsNil() bool
- func (c Condition) IsRedirect() bool
- func (c Condition) IsSet() bool
- func (c Condition) Name() string
- func (c Condition) RedirectHost() string
- func (c Condition) RedirectPort() int
- func (c Condition) SetDescription(description string) int
- func (c Condition) SetError(err error)
- func (c Condition) SetName(name string) int
- type Connection
- func (c Connection) CPtr() unsafe.Pointer
- func (c Connection) Close()
- func (c Connection) Condition() Condition
- func (c Connection) Container() string
- func (c Connection) DesiredCapabilities() Data
- func (c Connection) Error() error
- func (c Connection) Free()
- func (c Connection) Hostname() string
- func (c Connection) IsNil() bool
- func (c Connection) LinkHead(s State) Link
- func (c Connection) Links(state State) (links []Link)
- func (c Connection) OfferedCapabilities() Data
- func (c Connection) Open()
- func (c Connection) Properties() Data
- func (c Connection) Release()
- func (c Connection) RemoteCondition() Condition
- func (c Connection) RemoteContainer() string
- func (c Connection) RemoteDesiredCapabilities() Data
- func (c Connection) RemoteHostname() string
- func (c Connection) RemoteOfferedCapabilities() Data
- func (c Connection) RemoteProperties() Data
- func (c Connection) Reset()
- func (c Connection) Session() (Session, error)
- func (c Connection) SessionHead(s State) Session
- func (c Connection) Sessions(state State) (sessions []Session)
- func (c Connection) SetContainer(container string)
- func (c Connection) SetHostname(hostname string)
- func (c Connection) SetPassword(password []byte)
- func (c Connection) SetUser(user string)
- func (c Connection) State() State
- func (c Connection) String() string
- func (c Connection) Transport() Transport
- func (c Connection) Type() string
- func (c Connection) User() string
- type Data
- type Delivery
- func (d Delivery) Accept()
- func (d Delivery) Buffered() bool
- func (d Delivery) CPtr() unsafe.Pointer
- func (d Delivery) Clear()
- func (d Delivery) Current() bool
- func (d Delivery) Dump()
- func (d Delivery) HasMessage() bool
- func (d Delivery) IsNil() bool
- func (d Delivery) Link() Link
- func (d Delivery) Local() Disposition
- func (d Delivery) LocalState() uint64
- func (delivery Delivery) Message() (m amqp.Message, err error)
- func (d Delivery) Partial() bool
- func (d Delivery) Pending() uint
- func (d Delivery) Readable() bool
- func (d Delivery) Reject()
- func (d Delivery) Release(delivered bool)
- func (d Delivery) Remote() Disposition
- func (d Delivery) RemoteState() uint64
- func (d Delivery) Settle()
- func (d Delivery) SettleAs(disposition uint64)
- func (d Delivery) Settled() bool
- func (d Delivery) Tag() DeliveryTag
- func (d Delivery) Update(state uint64)
- func (d Delivery) Updated() bool
- func (d Delivery) Writable() bool
- type DeliveryTag
- type Disposition
- func (d Disposition) Annotations() Data
- func (d Disposition) CPtr() unsafe.Pointer
- func (d Disposition) Condition() Condition
- func (d Disposition) Data() Data
- func (d Disposition) IsFailed() bool
- func (d Disposition) IsNil() bool
- func (d Disposition) IsUndeliverable() bool
- func (d Disposition) SectionNumber() uint16
- func (d Disposition) SectionOffset() uint64
- func (d Disposition) SetFailed(failed bool)
- func (d Disposition) SetSectionNumber(section_number uint16)
- func (d Disposition) SetSectionOffset(section_offset uint64)
- func (d Disposition) SetUndeliverable(undeliverable bool)
- func (d Disposition) Type() uint64
- type DistributionMode
- type Durability
- type Endpoint
- type Engine
- func (eng *Engine) Close(err error)
- func (eng *Engine) CloseTimeout(err error, timeout time.Duration)
- func (eng *Engine) Connection() Connection
- func (eng *Engine) Disconnect(err error)
- func (eng *Engine) Error() error
- func (eng *Engine) Id() string
- func (eng *Engine) Initialize(conn net.Conn, handlers ...EventHandler) error
- func (eng *Engine) Inject(f func()) error
- func (eng *Engine) InjectWait(f func() error) error
- func (eng *Engine) Run() error
- func (eng *Engine) Server()
- func (eng *Engine) String() string
- func (eng *Engine) Transport() Transport
- type ErrorHolder
- type Event
- type EventHandler
- type EventType
- type ExpiryPolicy
- type Injecter
- type Link
- func (l Link) Advance() bool
- func (l Link) Available() int
- func (l Link) CPtr() unsafe.Pointer
- func (l Link) Close()
- func (l Link) Condition() Condition
- func (l Link) Connection() Connection
- func (l Link) Credit() int
- func (l Link) Current() Delivery
- func (l Link) Delivery(tag string) Delivery
- func (l Link) Detach()
- func (l Link) Drain(credit int)
- func (l Link) Drained() int
- func (l Link) Draining() bool
- func (l Link) Error() error
- func (l Link) Flow(credit int)
- func (l Link) Free()
- func (l Link) IsDrain() bool
- func (l Link) IsNil() bool
- func (l Link) IsReceiver() bool
- func (l Link) IsSender() bool
- func (l Link) Name() string
- func (l Link) Next(state State) Link
- func (l Link) Offered(credit int)
- func (l Link) Open()
- func (l Link) Queued() int
- func (l Link) RcvSettleMode() RcvSettleMode
- func (l Link) Recv(buf []byte) int
- func (l Link) RemoteCondition() Condition
- func (l Link) RemoteCredit() int
- func (l Link) RemoteRcvSettleMode() RcvSettleMode
- func (l Link) RemoteSndSettleMode() SndSettleMode
- func (l Link) RemoteSource() Terminus
- func (l Link) RemoteTarget() Terminus
- func (link Link) Send(m amqp.Message) (Delivery, error)
- func (l Link) SendBytes(bytes []byte) int
- func (l Link) Session() Session
- func (l Link) SetDrain(drain bool)
- func (l Link) SetRcvSettleMode(mode RcvSettleMode)
- func (l Link) SetSndSettleMode(mode SndSettleMode)
- func (l Link) SndSettleMode() SndSettleMode
- func (l Link) Source() Terminus
- func (l Link) State() State
- func (l Link) String() string
- func (l Link) Target() Terminus
- func (l Link) Type() string
- func (l Link) Unsettled() int
- type MessagingAdapter
- type MessagingEvent
- type MessagingHandler
- type PnErrorCode
- type RcvSettleMode
- type SASL
- func (s SASL) AllowInsecureMechs() bool
- func (s SASL) AllowedMechs(mechs string)
- func (s SASL) CPtr() unsafe.Pointer
- func (s SASL) ConfigName(name string)
- func (s SASL) ConfigPath(path string)
- func (s SASL) Done(outcome SASLOutcome)
- func (s SASL) IsNil() bool
- func (s SASL) Mech() string
- func (s SASL) Outcome() SASLOutcome
- func (s SASL) SetAllowInsecureMechs(insecure bool)
- func (s SASL) User() string
- type SASLOutcome
- type Session
- func (s Session) CPtr() unsafe.Pointer
- func (s Session) Close()
- func (s Session) Condition() Condition
- func (s Session) Connection() Connection
- func (s Session) Error() error
- func (s Session) Free()
- func (s Session) IncomingBytes() uint
- func (s Session) IncomingCapacity() uint
- func (s Session) IsNil() bool
- func (s Session) Next(state State) Session
- func (s Session) Open()
- func (s Session) OutgoingBytes() uint
- func (s Session) OutgoingWindow() uint
- func (s Session) Receiver(name string) Link
- func (s Session) RemoteCondition() Condition
- func (s Session) Sender(name string) Link
- func (s Session) SetIncomingCapacity(capacity uint)
- func (s Session) SetOutgoingWindow(window uint)
- func (s Session) State() State
- func (s Session) String() string
- func (s Session) Type() string
- type SndSettleMode
- type State
- func (s State) Has(bits State) bool
- func (s State) Local() State
- func (s State) LocalActive() bool
- func (s State) LocalClosed() bool
- func (s State) LocalUninit() bool
- func (s State) Remote() State
- func (s State) RemoteActive() bool
- func (s State) RemoteClosed() bool
- func (s State) RemoteUninit() bool
- type Terminus
- func (t Terminus) Address() string
- func (t Terminus) CPtr() unsafe.Pointer
- func (t Terminus) Capabilities() Data
- func (t Terminus) Copy(src Terminus) int
- func (t Terminus) Durability() Durability
- func (t Terminus) ExpiryPolicy() ExpiryPolicy
- func (t Terminus) Filter() Data
- func (t Terminus) IsDynamic() bool
- func (t Terminus) IsNil() bool
- func (t Terminus) Outcomes() Data
- func (t Terminus) Properties() Data
- func (t Terminus) SetAddress(address string) int
- func (t Terminus) SetDistributionMode(mode DistributionMode) int
- func (t Terminus) SetDurability(durability Durability) int
- func (t Terminus) SetDynamic(dynamic bool) int
- func (t Terminus) SetExpiryPolicy(policy ExpiryPolicy) int
- func (t Terminus) SetTimeout(timeout time.Duration) int
- func (t Terminus) SetType(type_ TerminusType) int
- func (t Terminus) Timeout() time.Duration
- func (t Terminus) Type() TerminusType
- type TerminusType
- type Transport
- func (t Transport) Bind(connection Connection) int
- func (t Transport) CPtr() unsafe.Pointer
- func (t Transport) Capacity() int
- func (t Transport) ChannelMax() uint32
- func (t Transport) CloseHead() int
- func (t Transport) CloseTail() int
- func (t Transport) Closed() bool
- func (t Transport) Condition() Condition
- func (t Transport) Connection() Connection
- func (t Transport) Error() error
- func (t Transport) Free()
- func (t Transport) Head() unsafe.Pointer
- func (t Transport) IdleTimeout() time.Duration
- func (t Transport) Input(bytes string, available uint) int
- func (t Transport) IsAuthenticated() bool
- func (t Transport) IsEncrypted() bool
- func (t Transport) IsNil() bool
- func (t Transport) Log(message string)
- func (t Transport) MaxFrame() uint16
- func (t Transport) Output(bytes string, size uint) int
- func (t Transport) Peek(dst string, size uint) int
- func (t Transport) Pending() int
- func (t Transport) Pop(size uint)
- func (t Transport) Process(size uint) int
- func (t Transport) Push(bytes []byte) int
- func (t Transport) Quiesced() bool
- func (t Transport) RemoteChannelMax() uint32
- func (t Transport) RemoteIdleTimeout() time.Duration
- func (t Transport) RemoteMaxFrame() uint16
- func (t Transport) RequireAuth(required bool)
- func (t Transport) RequireEncryption(required bool)
- func (t Transport) SASL() SASL
- func (t Transport) SetChannelMax(channel_max uint32) int
- func (t Transport) SetIdleTimeout(timeout time.Duration)
- func (t Transport) SetMaxFrame(size uint16)
- func (t Transport) SetServer()
- func (t Transport) String() string
- func (t Transport) Tail() unsafe.Pointer
- func (t Transport) Tick(now time.Time) time.Time
- func (t Transport) Unbind() int
- func (t Transport) User() string
- type UUID
Constants ¶
const ( SLocalUninit State = C.PN_LOCAL_UNINIT SLocalActive = C.PN_LOCAL_ACTIVE SLocalClosed = C.PN_LOCAL_CLOSED SRemoteUninit = C.PN_REMOTE_UNINIT SRemoteActive = C.PN_REMOTE_ACTIVE SRemoteClosed = C.PN_REMOTE_CLOSED )
const ( Received uint64 = C.PN_RECEIVED Accepted = C.PN_ACCEPTED Rejected = C.PN_REJECTED Released = C.PN_RELEASED Modified = C.PN_MODIFIED )
Variables ¶
This section is empty.
Functions ¶
func CloseError ¶
CloseError sets an error condition (if err != nil) on an endpoint and closes the endpoint if not already closed
func Decref ¶
func Decref(c CHandle)
Decref decreases the refcount of a proton value, freeing the underlying C struct if this is the last reference. Only call this if you previously called Incref() for this value.
func EndpointError ¶
EndpointError returns the remote error if there is one, the local error if not nil if there is no error.
func Incref ¶
func Incref(c CHandle)
Incref increases the refcount of a proton value, which prevents the underlying C struct being freed until you call Decref().
It can be useful to "pin" a proton value in memory while it is in use by goroutines other than the event loop goroutine. For example if you Incref() a Link, the underlying object is not freed when the link is closed, so means other goroutines can continue to safely use it as an index in a map or inject it into the event loop goroutine. There will of course be an error if you try to use a link after it is closed, but not a segmentation fault.
func PnError ¶
func PnError(e *C.pn_error_t) error
Types ¶
type CHandle ¶
type CHandle interface { // CPtr returns the unsafe C pointer, equivalent to a C void*. CPtr() unsafe.Pointer }
CHandle holds an unsafe.Pointer to a proton C struct, the C type depends on the Go type implementing this interface. For low level, at-your-own-risk use only.
type Condition ¶
type Condition struct {
// contains filtered or unexported fields
}
func (Condition) Description ¶
func (Condition) IsRedirect ¶
func (Condition) RedirectHost ¶
func (Condition) RedirectPort ¶
func (Condition) SetDescription ¶
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
func (Connection) CPtr ¶
func (c Connection) CPtr() unsafe.Pointer
func (Connection) Close ¶
func (c Connection) Close()
func (Connection) Condition ¶
func (c Connection) Condition() Condition
func (Connection) Container ¶
func (c Connection) Container() string
func (Connection) DesiredCapabilities ¶
func (c Connection) DesiredCapabilities() Data
func (Connection) Error ¶
func (c Connection) Error() error
func (Connection) Free ¶
func (c Connection) Free()
func (Connection) Hostname ¶
func (c Connection) Hostname() string
func (Connection) IsNil ¶
func (c Connection) IsNil() bool
func (Connection) LinkHead ¶
func (c Connection) LinkHead(s State) Link
func (Connection) Links ¶
func (c Connection) Links(state State) (links []Link)
func (Connection) OfferedCapabilities ¶
func (c Connection) OfferedCapabilities() Data
func (Connection) Open ¶
func (c Connection) Open()
func (Connection) Properties ¶
func (c Connection) Properties() Data
func (Connection) Release ¶
func (c Connection) Release()
func (Connection) RemoteCondition ¶
func (c Connection) RemoteCondition() Condition
func (Connection) RemoteContainer ¶
func (c Connection) RemoteContainer() string
func (Connection) RemoteDesiredCapabilities ¶
func (c Connection) RemoteDesiredCapabilities() Data
func (Connection) RemoteHostname ¶
func (c Connection) RemoteHostname() string
func (Connection) RemoteOfferedCapabilities ¶
func (c Connection) RemoteOfferedCapabilities() Data
func (Connection) RemoteProperties ¶
func (c Connection) RemoteProperties() Data
func (Connection) Reset ¶
func (c Connection) Reset()
func (Connection) Session ¶
func (c Connection) Session() (Session, error)
func (Connection) SessionHead ¶
func (c Connection) SessionHead(s State) Session
func (Connection) Sessions ¶
func (c Connection) Sessions(state State) (sessions []Session)
func (Connection) SetContainer ¶
func (c Connection) SetContainer(container string)
func (Connection) SetHostname ¶
func (c Connection) SetHostname(hostname string)
func (Connection) SetPassword ¶
func (c Connection) SetPassword(password []byte)
SetPassword takes []byte not string because it is impossible to erase a string from memory reliably. Proton will not keep the password in memory longer than needed, the caller should overwrite their copy on return.
The password must not contain embedded nul characters, a trailing nul is ignored.
func (Connection) SetUser ¶
func (c Connection) SetUser(user string)
func (Connection) State ¶
func (c Connection) State() State
func (Connection) String ¶
func (c Connection) String() string
Unique (per process) string identifier for a connection, useful for debugging.
func (Connection) Transport ¶
func (c Connection) Transport() Transport
func (Connection) Type ¶
func (c Connection) Type() string
func (Connection) User ¶
func (c Connection) User() string
type Data ¶
type Data struct {
// contains filtered or unexported fields
}
Data holds a pointer to decoded AMQP data. Use amqp.marshal/unmarshal to access it as Go data types.
type Delivery ¶
type Delivery struct {
// contains filtered or unexported fields
}
func (Delivery) HasMessage ¶
HasMessage is true if all message data is available. Equivalent to !d.isNil && d.Readable() && !d.Partial()
func (Delivery) Local ¶
func (d Delivery) Local() Disposition
func (Delivery) LocalState ¶
func (Delivery) Message ¶
Message decodes the message containined in a delivery.
Must be called in the correct link context with this delivery as the current message, handling an MMessage event is always a safe context to call this function.
Will return an error if message is incomplete or not current.
func (Delivery) Release ¶
Release releases and settles a delivery If delivered is true the delivery count for the message will be increased.
func (Delivery) Remote ¶
func (d Delivery) Remote() Disposition
func (Delivery) RemoteState ¶
func (Delivery) Tag ¶
func (d Delivery) Tag() DeliveryTag
type DeliveryTag ¶
type DeliveryTag struct {
// contains filtered or unexported fields
}
func (DeliveryTag) String ¶
func (t DeliveryTag) String() string
type Disposition ¶
type Disposition struct {
// contains filtered or unexported fields
}
func (Disposition) Annotations ¶
func (d Disposition) Annotations() Data
func (Disposition) CPtr ¶
func (d Disposition) CPtr() unsafe.Pointer
func (Disposition) Condition ¶
func (d Disposition) Condition() Condition
func (Disposition) Data ¶
func (d Disposition) Data() Data
func (Disposition) IsFailed ¶
func (d Disposition) IsFailed() bool
func (Disposition) IsNil ¶
func (d Disposition) IsNil() bool
func (Disposition) IsUndeliverable ¶
func (d Disposition) IsUndeliverable() bool
func (Disposition) SectionNumber ¶
func (d Disposition) SectionNumber() uint16
func (Disposition) SectionOffset ¶
func (d Disposition) SectionOffset() uint64
func (Disposition) SetFailed ¶
func (d Disposition) SetFailed(failed bool)
func (Disposition) SetSectionNumber ¶
func (d Disposition) SetSectionNumber(section_number uint16)
func (Disposition) SetSectionOffset ¶
func (d Disposition) SetSectionOffset(section_offset uint64)
func (Disposition) SetUndeliverable ¶
func (d Disposition) SetUndeliverable(undeliverable bool)
func (Disposition) Type ¶
func (d Disposition) Type() uint64
type DistributionMode ¶
type DistributionMode C.pn_distribution_mode_t
const ( DistModeUnspecified DistributionMode = C.PN_DIST_MODE_UNSPECIFIED DistModeCopy DistributionMode = C.PN_DIST_MODE_COPY DistModeMove DistributionMode = C.PN_DIST_MODE_MOVE )
func (DistributionMode) String ¶
func (e DistributionMode) String() string
type Durability ¶
type Durability C.pn_durability_t
const ( Nondurable Durability = C.PN_NONDURABLE Configuration Durability = C.PN_CONFIGURATION Deliveries Durability = C.PN_DELIVERIES )
func (Durability) String ¶
func (e Durability) String() string
type Endpoint ¶
type Endpoint interface { // State is the open/closed state. State() State // Open an endpoint. Open() // Close an endpoint. Close() // Condition holds a local error condition. Condition() Condition // RemoteCondition holds a remote error condition. RemoteCondition() Condition // Human readable name String() string // Human readable endpoint type "link", "session" etc. Type() string }
Endpoint is the common interface for Connection, Link and Session.
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine reads from a net.Conn, decodes AMQP events and calls the appropriate Handler functions sequentially in a single goroutine. Actions taken by Handler functions (such as sending messages) are encoded and written to the net.Conn. You can create multiple Engines to handle multiple connections concurrently.
You implement the EventHandler and/or MessagingHandler interfaces and provide those values to NewEngine(). Their HandleEvent method will be called in the event-handling goroutine.
Handlers can pass values from an event (Connections, Links, Deliveries etc.) to other goroutines, store them, or use them as map indexes. Effectively they are just pointers. Other goroutines cannot call their methods directly but they can can create a function closure to call such methods and pass it to Engine.Inject() to have it evaluated in the engine goroutine.
You are responsible for ensuring you don't use an event value after it is invalid. The handler methods will tell you when a value is no longer valid. For example after a LinkClosed event, that link is no longer valid. If you do Link.Close() yourself (in a handler or injected function) the link remains valid until the corresponing LinkClosed event is received by the handler.
Engine.Close() will take care of cleaning up any remaining values when you are done with the Engine. All values associated with a engine become invalid when you call Engine.Close()
The qpid.apache.org/proton/concurrent package will do all this for you, so it may be a better choice for some applications.
func NewEngine ¶
func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error)
Create a new Engine and call Initialize() with conn and handlers
func (*Engine) Close ¶
Close the engine's connection. If err != nil pass it to the remote end as the close condition. Returns when the remote end closes or disconnects.
func (*Engine) CloseTimeout ¶
CloseTimeout like Close but disconnect if the remote end doesn't close within timeout.
func (*Engine) Connection ¶
func (eng *Engine) Connection() Connection
func (*Engine) Disconnect ¶
Disconnect the engine's connection immediately without an AMQP close. Process any termination events before returning.
func (*Engine) Initialize ¶
func (eng *Engine) Initialize(conn net.Conn, handlers ...EventHandler) error
Initialize an Engine with a connection and handlers. Start it with Run()
func (*Engine) Inject ¶
Inject a function into the Engine's event loop.
f() will be called in the same event-processing goroutine that calls Handler methods. f() can safely call methods on values that belong to this engine (Sessions, Links etc)
The injected function has no parameters or return values. It is normally a closure and can use channels to communicate with the injecting goroutine if necessary.
Returns a non-nil error if the engine is closed before the function could be injected.
func (*Engine) InjectWait ¶
InjectWait is like Inject but does not return till f() has completed or the engine is closed, and returns an error value from f()
func (*Engine) Run ¶
Run the engine. Engine.Run() will exit when the engine is closed or disconnected. You can check for errors after exit with Engine.Error().
type ErrorHolder ¶
type ErrorHolder struct {
// contains filtered or unexported fields
}
ErrorHolder is a goroutine-safe error holder that keeps the first error that is set.
func (*ErrorHolder) Set ¶
func (e *ErrorHolder) Set(err error)
Set the error if not already set, return the error in the Holder.
type Event ¶
type Event struct {
// contains filtered or unexported fields
}
Event is an AMQP protocol event.
func (Event) Connection ¶
func (e Event) Connection() Connection
type EventHandler ¶
type EventHandler interface { // HandleEvent is called with an event. // Typically HandleEvent() is implemented as a switch on e.Type() // Returning an error will stop the Engine. HandleEvent(e Event) }
EventHandler handles core proton events.
type EventType ¶
type EventType int
const ( EConnectionInit EventType = C.PN_CONNECTION_INIT EConnectionBound EventType = C.PN_CONNECTION_BOUND EConnectionUnbound EventType = C.PN_CONNECTION_UNBOUND EConnectionLocalOpen EventType = C.PN_CONNECTION_LOCAL_OPEN EConnectionRemoteOpen EventType = C.PN_CONNECTION_REMOTE_OPEN EConnectionLocalClose EventType = C.PN_CONNECTION_LOCAL_CLOSE EConnectionRemoteClose EventType = C.PN_CONNECTION_REMOTE_CLOSE EConnectionFinal EventType = C.PN_CONNECTION_FINAL ESessionInit EventType = C.PN_SESSION_INIT ESessionLocalOpen EventType = C.PN_SESSION_LOCAL_OPEN ESessionRemoteOpen EventType = C.PN_SESSION_REMOTE_OPEN ESessionLocalClose EventType = C.PN_SESSION_LOCAL_CLOSE ESessionRemoteClose EventType = C.PN_SESSION_REMOTE_CLOSE ESessionFinal EventType = C.PN_SESSION_FINAL ELinkInit EventType = C.PN_LINK_INIT ELinkLocalOpen EventType = C.PN_LINK_LOCAL_OPEN ELinkRemoteOpen EventType = C.PN_LINK_REMOTE_OPEN ELinkLocalClose EventType = C.PN_LINK_LOCAL_CLOSE ELinkRemoteClose EventType = C.PN_LINK_REMOTE_CLOSE ELinkLocalDetach EventType = C.PN_LINK_LOCAL_DETACH ELinkRemoteDetach EventType = C.PN_LINK_REMOTE_DETACH ELinkFlow EventType = C.PN_LINK_FLOW ELinkFinal EventType = C.PN_LINK_FINAL EDelivery EventType = C.PN_DELIVERY ETransport EventType = C.PN_TRANSPORT ETransportAuthenticated EventType = C.PN_TRANSPORT_AUTHENTICATED ETransportError EventType = C.PN_TRANSPORT_ERROR ETransportHeadClosed EventType = C.PN_TRANSPORT_HEAD_CLOSED ETransportTailClosed EventType = C.PN_TRANSPORT_TAIL_CLOSED ETransportClosed EventType = C.PN_TRANSPORT_CLOSED )
type ExpiryPolicy ¶
type ExpiryPolicy C.pn_expiry_policy_t
const ( ExpireWithLink ExpiryPolicy = C.PN_EXPIRE_WITH_LINK ExpireWithSession ExpiryPolicy = C.PN_EXPIRE_WITH_SESSION ExpireWithConnection ExpiryPolicy = C.PN_EXPIRE_WITH_CONNECTION ExpireNever ExpiryPolicy = C.PN_EXPIRE_NEVER )
func (ExpiryPolicy) String ¶
func (e ExpiryPolicy) String() string
type Injecter ¶
type Injecter interface { // Inject a function into the engine goroutine. // // f() will be called in the same goroutine as event handlers, so it can safely // use values belonging to event handlers without synchronization. f() should // not block, no further events or injected functions can be processed until // f() returns. // // Returns a non-nil error if the function could not be injected and will // never be called. Otherwise the function will eventually be called. // // Note that proton values (Link, Session, Connection etc.) that existed when // Inject(f) was called may have become invalid by the time f() is executed. // Handlers should handle keep track of Closed events to ensure proton values // are not used after they become invalid. One technique is to have map from // proton values to application values. Check that the map has the correct // proton/application value pair at the start of the injected function and // delete the value from the map when handling a Closed event. Inject(f func()) error // InjectWait is like Inject but does not return till f() has completed. // If f() cannot be injected it returns the error from Inject(), otherwise // it returns the error from f() InjectWait(f func() error) error }
Injecter allows functions to be "injected" into the event-processing loop, to be called in the same goroutine as event handlers.
type Link ¶
type Link struct {
// contains filtered or unexported fields
}
func (Link) Connection ¶
func (l Link) Connection() Connection
func (Link) IsDrain ¶
IsDrain calls pn_link_get_drain(), it conflicts with pn_link_drain() under the normal mapping.
func (Link) IsReceiver ¶
func (Link) RcvSettleMode ¶
func (l Link) RcvSettleMode() RcvSettleMode
func (Link) RemoteCondition ¶
func (Link) RemoteCredit ¶
func (Link) RemoteRcvSettleMode ¶
func (l Link) RemoteRcvSettleMode() RcvSettleMode
func (Link) RemoteSndSettleMode ¶
func (l Link) RemoteSndSettleMode() SndSettleMode
func (Link) RemoteSource ¶
func (Link) RemoteTarget ¶
func (Link) Send ¶
Send sends a amqp.Message over a Link. Returns a Delivery that can be use to determine the outcome of the message.
func (Link) SetRcvSettleMode ¶
func (l Link) SetRcvSettleMode(mode RcvSettleMode)
func (Link) SetSndSettleMode ¶
func (l Link) SetSndSettleMode(mode SndSettleMode)
func (Link) SndSettleMode ¶
func (l Link) SndSettleMode() SndSettleMode
type MessagingAdapter ¶
type MessagingAdapter struct { // AutoSettle (default true) automatically pre-settle outgoing messages. AutoSettle bool // AutoAccept (default true) automatically accept and settle incoming messages // if they are not settled by the delegate. AutoAccept bool // AutoOpen (default true) automatically open remotely opened endpoints. AutoOpen bool // Prefetch (default 10) initial credit to issue for incoming links. Prefetch int // PeerCloseIsError (default false) if true a close by the peer will be treated as an error. PeerCloseError bool // contains filtered or unexported fields }
MessagingAdapter implments a EventHandler and delegates to a MessagingHandler. You can modify the exported fields before you pass the MessagingAdapter to a Engine.
func NewMessagingAdapter ¶
func NewMessagingAdapter(h MessagingHandler) *MessagingAdapter
func (*MessagingAdapter) HandleEvent ¶
func (d *MessagingAdapter) HandleEvent(e Event)
Handle a proton event by passing the corresponding MessagingEvent(s) to the MessagingHandler.
type MessagingEvent ¶
type MessagingEvent int
MessagingEvent provides a set of events that are easier to work with than the core events defined by EventType
There are 3 types of "endpoint": Connection, Session and Link. For each endpoint there are 5 events: Opening, Opened, Closing, Closed and Error.
The meaning of these events is as follows:
Opening: The remote end opened, the local end will open automatically.
Opened: Both ends are open, regardless of which end opened first.
Closing: The remote end closed without error, the local end will close automatically.
Error: The remote end closed with an error, the local end will close automatically.
Closed: Both ends are closed, regardless of which end closed first or if there was an error. No further events will be received for the endpoint.
const ( // The event loop starts. MStart MessagingEvent = iota // The peer closes the connection with an error condition. MConnectionError // The peer closes the session with an error condition. MSessionError // The peer closes the link with an error condition. MLinkError // The peer Initiates the opening of the connection. MConnectionOpening // The peer initiates the opening of the session. MSessionOpening // The peer initiates the opening of the link. MLinkOpening // The connection is opened. MConnectionOpened // The session is opened. MSessionOpened // The link is opened. MLinkOpened // The peer initiates the closing of the connection. MConnectionClosing // The peer initiates the closing of the session. MSessionClosing // The peer initiates the closing of the link. MLinkClosing // Both ends of the connection are closed. MConnectionClosed // Both ends of the session are closed. MSessionClosed // Both ends of the link are closed. MLinkClosed // The sender link has credit and messages can // therefore be transferred. MSendable // The remote peer accepts an outgoing message. MAccepted // The remote peer rejects an outgoing message. MRejected // The peer releases an outgoing message. Note that this may be in response to // either the RELEASE or MODIFIED state as defined by the AMQP specification. MReleased // The peer has settled the outgoing message. This is the point at which it // should never be re-transmitted. MSettled // A message is received. Call Event.Delivery().Message() to decode as an amqp.Message. // To manage the outcome of this messages (e.g. to accept or reject the message) // use Event.Delivery(). MMessage // A network connection was disconnected. MDisconnected )
func (MessagingEvent) String ¶
func (t MessagingEvent) String() string
type MessagingHandler ¶
type MessagingHandler interface { // HandleMessagingEvent is called with MessagingEvent. // Typically HandleEvent() is implemented as a switch on e.Type() // Returning an error will stop the Engine. HandleMessagingEvent(MessagingEvent, Event) }
MessagingHandler provides an alternative interface to EventHandler. it is easier to use for most applications that send and receive messages.
Implement this interface and then wrap your value with a MessagingHandlerDelegator. MessagingHandlerDelegator implements EventHandler and can be registered with a Engine.
type PnErrorCode ¶
type PnErrorCode int
func (PnErrorCode) String ¶
func (e PnErrorCode) String() string
type RcvSettleMode ¶
type RcvSettleMode C.pn_rcv_settle_mode_t
const ( RcvFirst RcvSettleMode = C.PN_RCV_FIRST RcvSecond RcvSettleMode = C.PN_RCV_SECOND )
func (RcvSettleMode) String ¶
func (e RcvSettleMode) String() string
type SASL ¶
type SASL struct {
// contains filtered or unexported fields
}
func (SASL) AllowInsecureMechs ¶
func (SASL) AllowedMechs ¶
func (SASL) ConfigName ¶
func (SASL) ConfigPath ¶
func (SASL) Done ¶
func (s SASL) Done(outcome SASLOutcome)
func (SASL) Outcome ¶
func (s SASL) Outcome() SASLOutcome
func (SASL) SetAllowInsecureMechs ¶
type SASLOutcome ¶
type SASLOutcome C.pn_sasl_outcome_t
const ( SASLNone SASLOutcome = C.PN_SASL_NONE SASLOk SASLOutcome = C.PN_SASL_OK SASLAuth SASLOutcome = C.PN_SASL_AUTH SASLSys SASLOutcome = C.PN_SASL_SYS SASLPerm SASLOutcome = C.PN_SASL_PERM SASLTemp SASLOutcome = C.PN_SASL_TEMP )
func (SASLOutcome) String ¶
func (e SASLOutcome) String() string
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
func (Session) Connection ¶
func (s Session) Connection() Connection
func (Session) IncomingBytes ¶
func (Session) IncomingCapacity ¶
func (Session) OutgoingBytes ¶
func (Session) OutgoingWindow ¶
func (Session) RemoteCondition ¶
func (Session) SetIncomingCapacity ¶
func (Session) SetOutgoingWindow ¶
type SndSettleMode ¶
type SndSettleMode C.pn_snd_settle_mode_t
const ( SndUnsettled SndSettleMode = C.PN_SND_UNSETTLED SndSettled SndSettleMode = C.PN_SND_SETTLED SndMixed SndSettleMode = C.PN_SND_MIXED )
func (SndSettleMode) String ¶
func (e SndSettleMode) String() string
type State ¶
type State byte
State holds the state flags for an AMQP endpoint.
func (State) LocalActive ¶
func (State) LocalClosed ¶
func (State) LocalUninit ¶
func (State) RemoteActive ¶
func (State) RemoteClosed ¶
func (State) RemoteUninit ¶
type Terminus ¶
type Terminus struct {
// contains filtered or unexported fields
}
func (Terminus) Capabilities ¶
func (Terminus) Durability ¶
func (t Terminus) Durability() Durability
func (Terminus) ExpiryPolicy ¶
func (t Terminus) ExpiryPolicy() ExpiryPolicy
func (Terminus) Properties ¶
func (Terminus) SetAddress ¶
func (Terminus) SetDistributionMode ¶
func (t Terminus) SetDistributionMode(mode DistributionMode) int
func (Terminus) SetDurability ¶
func (t Terminus) SetDurability(durability Durability) int
func (Terminus) SetDynamic ¶
func (Terminus) SetExpiryPolicy ¶
func (t Terminus) SetExpiryPolicy(policy ExpiryPolicy) int
func (Terminus) SetType ¶
func (t Terminus) SetType(type_ TerminusType) int
func (Terminus) Type ¶
func (t Terminus) Type() TerminusType
type TerminusType ¶
type TerminusType C.pn_terminus_type_t
const ( Unspecified TerminusType = C.PN_UNSPECIFIED Source TerminusType = C.PN_SOURCE Target TerminusType = C.PN_TARGET Coordinator TerminusType = C.PN_COORDINATOR )
func (TerminusType) String ¶
func (e TerminusType) String() string
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
func (Transport) Bind ¶
func (t Transport) Bind(connection Connection) int
func (Transport) ChannelMax ¶
func (Transport) Connection ¶
func (t Transport) Connection() Connection
func (Transport) Head ¶
Special treatment for Transport.Head, return value is unsafe.Pointer not string