Documentation
¶
Index ¶
- Constants
- Variables
- func IsConnectedTo(sub *Subscription, pub *Publication) bool
- type Aeron
- func (aeron *Aeron) AddExclusivePublication(channel string, streamID int32) chan *Publication
- func (aeron *Aeron) AddPublication(channel string, streamID int32) chan *Publication
- func (aeron *Aeron) AddSubscription(channel string, streamID int32) chan *Subscription
- func (aeron *Aeron) ClientID() int64
- func (aeron *Aeron) Close() error
- func (aeron *Aeron) NextCorrelationID() int64
- type AvailableImageHandler
- type ChannelUri
- func (uri ChannelUri) Clone() (res ChannelUri)
- func (uri ChannelUri) Get(key string) string
- func (uri ChannelUri) IsIpc() bool
- func (uri ChannelUri) IsUdp() bool
- func (uri ChannelUri) Media() string
- func (uri ChannelUri) Prefix() string
- func (uri ChannelUri) Remove(key string)
- func (uri ChannelUri) Scheme() string
- func (uri ChannelUri) Set(key string, value string)
- func (uri ChannelUri) SetMedia(media string)
- func (uri *ChannelUri) SetPrefix(prefix string)
- func (uri ChannelUri) String() (result string)
- type ClientConductor
- func (cc *ClientConductor) AddExclusivePublication(channel string, streamID int32) int64
- func (cc *ClientConductor) AddPublication(channel string, streamID int32) int64
- func (cc *ClientConductor) AddSubscription(channel string, streamID int32) int64
- func (cc *ClientConductor) Close() error
- func (cc *ClientConductor) CounterReader() *ctr.Reader
- func (cc *ClientConductor) FindPublication(regID int64) *Publication
- func (cc *ClientConductor) FindSubscription(regID int64) *Subscription
- func (cc *ClientConductor) Init(driverProxy *driver.Proxy, bcast *broadcast.CopyReceiver, ...) *ClientConductor
- func (cc *ClientConductor) OnAvailableCounter(correlationID int64, counterID int32)
- func (cc *ClientConductor) OnAvailableImage(streamID int32, sessionID int32, logFilename string, sourceIdentity string, ...)
- func (cc *ClientConductor) OnClientTimeout(clientID int64)
- func (cc *ClientConductor) OnErrorResponse(corrID int64, errorCode int32, errorMessage string)
- func (cc *ClientConductor) OnNewExclusivePublication(streamID int32, sessionID int32, posLimitCounterID int32, ...)
- func (cc *ClientConductor) OnNewPublication(streamID int32, sessionID int32, posLimitCounterID int32, ...)
- func (cc *ClientConductor) OnOperationSuccess(corrID int64)
- func (cc *ClientConductor) OnSubscriptionReady(correlationID int64, channelStatusIndicatorID int32)
- func (cc *ClientConductor) OnUnavailableCounter(correlationID int64, counterID int32)
- func (cc *ClientConductor) OnUnavailableImage(corrID int64, subscriptionRegistrationID int64)
- func (cc *ClientConductor) Start(idleStrategy idlestrategy.Idler)
- type Context
- func (ctx *Context) AeronDir(dir string) *Context
- func (ctx *Context) AvailableImageHandler(handler func(*Image)) *Context
- func (ctx *Context) CncFileName() string
- func (ctx *Context) ErrorHandler(handler func(error)) *Context
- func (ctx *Context) IdleStrategy(idleStrategy idlestrategy.Idler) *Context
- func (ctx *Context) InterServiceTimeout(to time.Duration) *Context
- func (ctx *Context) MediaDriverTimeout(to time.Duration) *Context
- func (ctx *Context) NewPublicationHandler(handler func(string, int32, int32, int64)) *Context
- func (ctx *Context) NewSubscriptionHandler(handler func(string, int32, int64)) *Context
- func (ctx *Context) PublicationConnectionTimeout(to time.Duration) *Context
- func (ctx *Context) ResourceLingerTimeout(to time.Duration) *Context
- func (ctx *Context) UnavailableImageHandler(handler func(*Image)) *Context
- type ControlledPollFragmentHandler
- type FragmentAssembler
- type Image
- func (image *Image) BoundedPoll(handler term.FragmentHandler, limitPosition int64, fragmentLimit int) int
- func (image *Image) Close() error
- func (image *Image) CorrelationID() int64
- func (image *Image) IsClosed() bool
- func (image *Image) IsEndOfStream() bool
- func (image *Image) Poll(handler term.FragmentHandler, fragmentLimit int) int
- func (image *Image) PollWithContext(handler term.FragmentHandler, fragmentLimit int) int
- func (image *Image) Position() int64
- func (image *Image) SessionID() int32
- func (image *Image) SubscriptionRegistrationID() int64
- type ImageList
- type NewPublicationHandler
- type NewSubscriptionHandler
- type Position
- type Publication
- func (pub *Publication) Channel() string
- func (pub *Publication) ChannelStatusID() int32
- func (pub *Publication) Close() error
- func (pub *Publication) InitialTermID() int32
- func (pub *Publication) IsClosed() bool
- func (pub *Publication) IsConnected() bool
- func (pub *Publication) IsOriginal() bool
- func (pub *Publication) Offer(buffer *atomic.Buffer, offset int32, length int32, ...) int64
- func (pub *Publication) Offer2(bufferOne *atomic.Buffer, offsetOne int32, lengthOne int32, ...) int64
- func (pub *Publication) OriginalRegistrationID() int64
- func (pub *Publication) RegistrationID() int64
- func (pub *Publication) SessionID() int32
- func (pub *Publication) StreamID() int32
- func (pub *Publication) TryClaim(length int32, bufferClaim *logbuffer.Claim) int64
- type Subscription
- func (sub *Subscription) Channel() string
- func (sub *Subscription) Close() error
- func (sub *Subscription) HasImages() bool
- func (sub *Subscription) ImageBySessionID(sessionID int32) *Image
- func (sub *Subscription) ImageCount() int
- func (sub *Subscription) IsClosed() bool
- func (sub *Subscription) IsConnected() bool
- func (sub *Subscription) Poll(handler term.FragmentHandler, fragmentLimit int) int
- func (sub *Subscription) PollWithContext(handler term.FragmentHandler, fragmentLimit int) int
- func (sub *Subscription) RegistrationID() int64
- func (sub *Subscription) StreamID() int32
- type UnavailableImageHandler
Constants ¶
const ( // NotConnected indicates that this Publication is not connected to the driver NotConnected int64 = -1 // BackPressured indicates that sending ring buffer is full BackPressured int64 = -2 // AdminAction indicates that terms need to be rotated. User should retry the Offer AdminAction int64 = -3 // PublicationClosed indicates that this Publication is closed and no further Offers shall succeed PublicationClosed int64 = -4 // MaxPositionExceeded indicates that ... MaxPositionExceeded int64 = -5 )
const AeronScheme = "aeron"
AeronScheme is a URI Scheme for Aeron channels and destinations.
const (
DefaultFragmentAssemblyBufferLength = int32(4096)
)
const IpcMedia = "ipc"
IpcMedia is the media for IPC.
const SpyQualifier = "aeron-spy"
SpyQualifier is a qualifier for spy subscriptions which spy on outgoing network destined traffic efficiently.
const UdpMedia = "udp"
UdpMedia is the media for UDP.
Variables ¶
var ControlledPollAction = struct { /** * Abort the current polling operation and do not advance the position for this fragment. */ ABORT int /** * Break from the current polling operation and commit the position as of the end of the current fragment * being handled. */ BREAK int /** * Continue processing but commit the position as of the end of the current fragment so that * flow control is applied to this point. */ COMMIT int /** * Continue processing taking the same approach as in fragment_handler_t. */ CONTINUE int }{ 1, 2, 3, 4, }
var DefaultAeronDir string = "/dev/shm"
DefaultAeronDir is the location of media driver files
var RegistrationStatus = struct { AwaitingMediaDriver int RegisteredMediaDriver int ErroredMediaDriver int }{ 0, 1, 2, }
var UserName = os.Getenv("USER")
UserName is used to determine default directory for driver files
Functions ¶
func IsConnectedTo ¶
func IsConnectedTo(sub *Subscription, pub *Publication) bool
IsConnectedTo is a helper function used primarily by tests, which is used within the same process to verify that subscription is connected to a specific publication.
Types ¶
type Aeron ¶
type Aeron struct {
// contains filtered or unexported fields
}
Aeron is the primary interface to the media driver for managing subscriptions and publications
func Connect ¶
Connect is the factory method used to create a new instance of Aeron based on Context settings
func (*Aeron) AddExclusivePublication ¶
func (aeron *Aeron) AddExclusivePublication(channel string, streamID int32) chan *Publication
AddExclusivePublication will add a new exclusive publication to the driver. If such a publication already exists within ClientConductor the same instance will be returned. Returns a channel, which can be used for either blocking or non-blocking wait for media driver confirmation
func (*Aeron) AddPublication ¶
func (aeron *Aeron) AddPublication(channel string, streamID int32) chan *Publication
AddPublication will add a new publication to the driver. If such a publication already exists within ClientConductor the same instance will be returned. Returns a channel, which can be used for either blocking or non-blocking wait for media driver confirmation
func (*Aeron) AddSubscription ¶
func (aeron *Aeron) AddSubscription(channel string, streamID int32) chan *Subscription
AddSubscription will add a new subscription to the driver. Returns a channel, which can be used for either blocking or non-blocking wait for media driver confirmation
func (*Aeron) ClientID ¶
ClientID returns the client identity that has been allocated for communicating with the media driver.
func (*Aeron) Close ¶
Close will terminate client conductor and remove all publications and subscriptions from the media driver
func (*Aeron) NextCorrelationID ¶
NextCorrelationID generates the next correlation id that is unique for the connected Media Driver. This is useful for generating correlation identifiers for pairing requests with responses in a client's own application protocol.
This method is thread safe and will work across processes that all use the same media driver.
type AvailableImageHandler ¶
type AvailableImageHandler func(*Image)
AvailableImageHandler is the handler type for image available notification from the media driver
type ChannelUri ¶
type ChannelUri struct {
// contains filtered or unexported fields
}
ChannelUri is a parser for Aeron channel URIs. The format is: aeron-uri = "aeron:" media [ "?" param *( "|" param ) ] media = *( "[^?:]" ) param = key "=" value key = *( "[^=]" ) value = *( "[^|]" )
Multiple params with the same key are allowed, the last value specified takes precedence.
func ParseChannelUri ¶
func ParseChannelUri(uriStr string) (uri ChannelUri, err error)
ParseChannelUri parses a string which contains an Aeron URI.
func (ChannelUri) Clone ¶
func (uri ChannelUri) Clone() (res ChannelUri)
Clone returns a deep copy of a ChannelUri.
func (ChannelUri) Get ¶
func (uri ChannelUri) Get(key string) string
func (ChannelUri) IsIpc ¶ added in v1.2.0
func (uri ChannelUri) IsIpc() bool
func (ChannelUri) IsUdp ¶ added in v1.2.0
func (uri ChannelUri) IsUdp() bool
func (ChannelUri) Media ¶
func (uri ChannelUri) Media() string
func (ChannelUri) Prefix ¶
func (uri ChannelUri) Prefix() string
func (ChannelUri) Remove ¶
func (uri ChannelUri) Remove(key string)
func (ChannelUri) Scheme ¶
func (uri ChannelUri) Scheme() string
func (ChannelUri) Set ¶
func (uri ChannelUri) Set(key string, value string)
func (ChannelUri) SetMedia ¶
func (uri ChannelUri) SetMedia(media string)
func (*ChannelUri) SetPrefix ¶
func (uri *ChannelUri) SetPrefix(prefix string)
func (ChannelUri) String ¶
func (uri ChannelUri) String() (result string)
type ClientConductor ¶
type ClientConductor struct {
// contains filtered or unexported fields
}
func (*ClientConductor) AddExclusivePublication ¶
func (cc *ClientConductor) AddExclusivePublication(channel string, streamID int32) int64
AddExclusivePublication sends the add publication command through the driver proxy
func (*ClientConductor) AddPublication ¶
func (cc *ClientConductor) AddPublication(channel string, streamID int32) int64
AddPublication sends the add publication command through the driver proxy
func (*ClientConductor) AddSubscription ¶
func (cc *ClientConductor) AddSubscription(channel string, streamID int32) int64
AddSubscription sends the add subscription command through the driver proxy
func (*ClientConductor) Close ¶
func (cc *ClientConductor) Close() error
Close will terminate the Run() goroutine body and close all active publications and subscriptions. Run() can be restarted in another goroutine.
func (*ClientConductor) CounterReader ¶
func (cc *ClientConductor) CounterReader() *ctr.Reader
CounterReader returns the counter reader.
func (*ClientConductor) FindPublication ¶
func (cc *ClientConductor) FindPublication(regID int64) *Publication
func (*ClientConductor) FindSubscription ¶
func (cc *ClientConductor) FindSubscription(regID int64) *Subscription
func (*ClientConductor) Init ¶
func (cc *ClientConductor) Init(driverProxy *driver.Proxy, bcast *broadcast.CopyReceiver, interServiceTo, driverTo, pubConnectionTo, lingerTo time.Duration, counters *ctr.MetaDataFlyweight) *ClientConductor
Init is the primary initialization method for ClientConductor
func (*ClientConductor) OnAvailableCounter ¶
func (cc *ClientConductor) OnAvailableCounter(correlationID int64, counterID int32)
func (*ClientConductor) OnAvailableImage ¶
func (*ClientConductor) OnClientTimeout ¶
func (cc *ClientConductor) OnClientTimeout(clientID int64)
func (*ClientConductor) OnErrorResponse ¶
func (cc *ClientConductor) OnErrorResponse(corrID int64, errorCode int32, errorMessage string)
func (*ClientConductor) OnNewExclusivePublication ¶
func (cc *ClientConductor) OnNewExclusivePublication(streamID int32, sessionID int32, posLimitCounterID int32, channelStatusIndicatorID int32, logFileName string, regID int64, origRegID int64)
TODO Implement logic specific to exclusive publications
func (*ClientConductor) OnNewPublication ¶
func (*ClientConductor) OnOperationSuccess ¶
func (cc *ClientConductor) OnOperationSuccess(corrID int64)
func (*ClientConductor) OnSubscriptionReady ¶
func (cc *ClientConductor) OnSubscriptionReady(correlationID int64, channelStatusIndicatorID int32)
func (*ClientConductor) OnUnavailableCounter ¶
func (cc *ClientConductor) OnUnavailableCounter(correlationID int64, counterID int32)
func (*ClientConductor) OnUnavailableImage ¶
func (cc *ClientConductor) OnUnavailableImage(corrID int64, subscriptionRegistrationID int64)
func (*ClientConductor) Start ¶
func (cc *ClientConductor) Start(idleStrategy idlestrategy.Idler)
Start begins the main execution loop of ClientConductor on a goroutine.
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
Context configuration options are located here https://github.com/real-logic/Aeron/wiki/Configuration-Options#aeron-client-options
func NewContext ¶
func NewContext() *Context
NewContext creates and initializes new Context for Aeron
func (*Context) AvailableImageHandler ¶
AvailableImageHandler sets an optional callback for available image notifications
func (*Context) CncFileName ¶
CncFileName returns the name of the Counters file
func (*Context) ErrorHandler ¶
ErrorHandler sets the error handler callback
func (*Context) IdleStrategy ¶
func (ctx *Context) IdleStrategy(idleStrategy idlestrategy.Idler) *Context
IdleStrategy provides an IdleStrategy for the thread responsible for communicating with the Aeron Media Driver.
func (*Context) InterServiceTimeout ¶
InterServiceTimeout sets the timeout for client heartbeat
func (*Context) MediaDriverTimeout ¶
MediaDriverTimeout sets the timeout for keep alives to media driver
func (*Context) NewPublicationHandler ¶
NewPublicationHandler sets an optional callback for new publications
func (*Context) NewSubscriptionHandler ¶
NewSubscriptionHandler sets an optional callback for new subscriptions
func (*Context) PublicationConnectionTimeout ¶
func (*Context) ResourceLingerTimeout ¶
ResourceLingerTimeout sets the timeout for resource cleanup after they're released
func (*Context) UnavailableImageHandler ¶
UnavailableImageHandler sets an optional callback for unavailable image notification
type FragmentAssembler ¶
type FragmentAssembler struct {
// contains filtered or unexported fields
}
FragmentAssembler that sits in a chain-of-responsibility pattern that reassembles fragmented messages so that the next handler in the chain only sees whole messages.
Unfragmented messages are delegated without copy. Fragmented messages are copied to a temporary buffer for reassembly before delegation.
The Header passed to the delegate on assembling a message will be that of the last fragment.
Session based buffers will be allocated and grown as necessary based on the length of messages to be assembled.
func NewFragmentAssembler ¶
func NewFragmentAssembler(delegate term.FragmentHandler, initialBufferLength int32) *FragmentAssembler
NewFragmentAssembler constructs an adapter to reassemble message fragments and delegate on whole messages.
func (*FragmentAssembler) Clear ¶ added in v1.2.0
func (f *FragmentAssembler) Clear()
Clear removes all existing session buffers.
func (*FragmentAssembler) OnFragment ¶
func (f *FragmentAssembler) OnFragment( buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header)
OnFragment reassembles and forwards whole messages to the delegate.
type Image ¶
type Image struct {
// contains filtered or unexported fields
}
func NewImage ¶
func NewImage(sessionID int32, correlationID int64, logBuffers *logbuffer.LogBuffers) *Image
NewImage wraps around provided LogBuffers setting up the structures for polling
func (*Image) BoundedPoll ¶
func (*Image) CorrelationID ¶
CorrelationID returns the correlationId for identification of the image with the media driver.
func (*Image) IsClosed ¶
IsClosed returns whether this image has been closed. No further operations are valid.
func (*Image) IsEndOfStream ¶
IsEndOfStream returns whether the current consumed position is at the end of the stream.
func (*Image) PollWithContext ¶
func (image *Image) PollWithContext(handler term.FragmentHandler, fragmentLimit int) int
func (*Image) Position ¶
Position returns the position this Image has been consumed to by the subscriber.
func (*Image) SubscriptionRegistrationID ¶
SubscriptionRegistrationID returns the registrationId for the Subscription of the Image.
type ImageList ¶
type ImageList struct {
// contains filtered or unexported fields
}
ImageList is a helper class to manage list of images atomically without locks
type NewPublicationHandler ¶
NewPublicationHandler is the handler type for new publication notification from the media driver
type NewSubscriptionHandler ¶
NewSubscriptionHandler is the handler type for new subscription notification from the media driver
type Position ¶
type Position struct {
// contains filtered or unexported fields
}
Position is a wrapper for a buffer location of a position counter
type Publication ¶
type Publication struct {
// contains filtered or unexported fields
}
Publication is a sender structure
func NewPublication ¶
func NewPublication(logBuffers *logbuffer.LogBuffers) *Publication
NewPublication is a factory method to create new publications
func (*Publication) Channel ¶
func (pub *Publication) Channel() string
Channel returns the media address for delivery to the channel.
func (*Publication) ChannelStatusID ¶
func (pub *Publication) ChannelStatusID() int32
ChannelStatusID returns the counter used to represent the channel status for this publication.
func (*Publication) Close ¶
func (pub *Publication) Close() error
Close will close this publication with the driver. This is a blocking call.
func (*Publication) InitialTermID ¶
func (pub *Publication) InitialTermID() int32
InitialTermID returns the initial term id assigned when this publication was created. This can be used to determine how many terms have passed since creation.
func (*Publication) IsClosed ¶
func (pub *Publication) IsClosed() bool
IsClosed returns whether this Publication has been closed
func (*Publication) IsConnected ¶
func (pub *Publication) IsConnected() bool
IsConnected returns whether this publication is connected to the driver (not whether it has any Subscriptions)
func (*Publication) IsOriginal ¶
func (pub *Publication) IsOriginal() bool
IsOriginal returns true if this instance is the first added, otherwise false.
func (*Publication) Offer ¶
func (pub *Publication) Offer(buffer *atomic.Buffer, offset int32, length int32, reservedValueSupplier term.ReservedValueSupplier) int64
Offer is the primary send mechanism on Publication
func (*Publication) Offer2 ¶ added in v1.1.0
func (pub *Publication) Offer2( bufferOne *atomic.Buffer, offsetOne int32, lengthOne int32, bufferTwo *atomic.Buffer, offsetTwo int32, lengthTwo int32, reservedValueSupplier term.ReservedValueSupplier, ) int64
Offer2 attempts to publish a message composed of two parts, e.g. a header and encapsulated payload.
func (*Publication) OriginalRegistrationID ¶
func (pub *Publication) OriginalRegistrationID() int64
OriginalRegistrationID returns the original registration id.
func (*Publication) RegistrationID ¶
func (pub *Publication) RegistrationID() int64
RegistrationID returns the registration id.
func (*Publication) SessionID ¶
func (pub *Publication) SessionID() int32
SessionID returns the session id for this publication.
func (*Publication) StreamID ¶
func (pub *Publication) StreamID() int32
StreamID returns Stream identity for scoping within the channel media address.
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription is the object responsible for receiving messages from media driver. It is specific to a channel and stream ID combination.
func NewSubscription ¶
func NewSubscription(conductor *ClientConductor, channel string, registrationID int64, streamID int32) *Subscription
NewSubscription is a factory method to create new subscription to be added to the media driver
func (*Subscription) Channel ¶
func (sub *Subscription) Channel() string
Channel returns the media address for delivery to the channel.
func (*Subscription) Close ¶
func (sub *Subscription) Close() error
Close will release all images in this subscription, send command to the driver and block waiting for response from the media driver. Images will be lingered by the ClientConductor.
func (*Subscription) HasImages ¶
func (sub *Subscription) HasImages() bool
HasImages is a helper method checking whether this subscription has any images associated with it.
func (*Subscription) ImageBySessionID ¶
func (sub *Subscription) ImageBySessionID(sessionID int32) *Image
ImageBySessionID returns the Image associated with the given sessionID.
func (*Subscription) ImageCount ¶
func (sub *Subscription) ImageCount() int
ImageCount returns the count of images associated with this subscription.
func (*Subscription) IsClosed ¶
func (sub *Subscription) IsClosed() bool
IsClosed returns whether this subscription has been closed.
func (*Subscription) IsConnected ¶
func (sub *Subscription) IsConnected() bool
IsConnected returns if this subscription is connected by having at least one open publication Image.
func (*Subscription) Poll ¶
func (sub *Subscription) Poll(handler term.FragmentHandler, fragmentLimit int) int
Poll is the primary receive mechanism on subscription.
func (*Subscription) PollWithContext ¶
func (sub *Subscription) PollWithContext(handler term.FragmentHandler, fragmentLimit int) int
PollWithContext as for Poll() but provides an integer argument for passing contextual information
func (*Subscription) RegistrationID ¶
func (sub *Subscription) RegistrationID() int64
RegistrationID returns the registration id.
func (*Subscription) StreamID ¶
func (sub *Subscription) StreamID() int32
StreamID returns Stream identity for scoping within the channel media address.
type UnavailableImageHandler ¶
type UnavailableImageHandler func(*Image)
UnavailableImageHandler is the handler type for image unavailable notification from the media driver
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
Provides a transition layer from "github.com/op/go-logging" to "go.uber.org/zap" to simply resolve some reentrancy issues in go-logging.
|
Provides a transition layer from "github.com/op/go-logging" to "go.uber.org/zap" to simply resolve some reentrancy issues in go-logging. |