Documentation ¶
Index ¶
- Constants
- Variables
- func IsConnectedTo(sub *Subscription, pub *Publication) bool
- type Aeron
- func (aeron *Aeron) AddExclusivePublication(channel string, streamID int32) chan *Publication
- func (aeron *Aeron) AddPublication(channel string, streamID int32) chan *Publication
- func (aeron *Aeron) AddSubscription(channel string, streamID int32) chan *Subscription
- func (aeron *Aeron) ClientID() int64
- func (aeron *Aeron) Close() error
- func (aeron *Aeron) NextCorrelationID() int64
- type AvailableImageHandler
- type ClientConductor
- func (cc *ClientConductor) AddExclusivePublication(channel string, streamID int32) int64
- func (cc *ClientConductor) AddPublication(channel string, streamID int32) int64
- func (cc *ClientConductor) AddSubscription(channel string, streamID int32) int64
- func (cc *ClientConductor) Close() error
- func (cc *ClientConductor) FindPublication(regID int64) *Publication
- func (cc *ClientConductor) FindSubscription(regID int64) *Subscription
- func (cc *ClientConductor) Init(driverProxy *driver.Proxy, bcast *broadcast.CopyReceiver, ...) *ClientConductor
- func (cc *ClientConductor) OnAvailableCounter(correlationID int64, counterID int32)
- func (cc *ClientConductor) OnAvailableImage(streamID int32, sessionID int32, logFilename string, sourceIdentity string, ...)
- func (cc *ClientConductor) OnClientTimeout(clientID int64)
- func (cc *ClientConductor) OnErrorResponse(corrID int64, errorCode int32, errorMessage string)
- func (cc *ClientConductor) OnNewExclusivePublication(streamID int32, sessionID int32, posLimitCounterID int32, ...)
- func (cc *ClientConductor) OnNewPublication(streamID int32, sessionID int32, posLimitCounterID int32, ...)
- func (cc *ClientConductor) OnOperationSuccess(corrID int64)
- func (cc *ClientConductor) OnSubscriptionReady(correlationID int64, channelStatusIndicatorID int32)
- func (cc *ClientConductor) OnUnavailableCounter(correlationID int64, counterID int32)
- func (cc *ClientConductor) OnUnavailableImage(corrID int64, subscriptionRegistrationID int64)
- func (cc *ClientConductor) Start(idleStrategy idlestrategy.Idler)
- type Context
- func (ctx *Context) AeronDir(dir string) *Context
- func (ctx *Context) AvailableImageHandler(handler func(*Image)) *Context
- func (ctx *Context) CncFileName() string
- func (ctx *Context) ErrorHandler(handler func(error)) *Context
- func (ctx *Context) InterServiceTimeout(to time.Duration) *Context
- func (ctx *Context) MediaDriverTimeout(to time.Duration) *Context
- func (ctx *Context) PublicationConnectionTimeout(to time.Duration) *Context
- func (ctx *Context) ResourceLingerTimeout(to time.Duration) *Context
- func (ctx *Context) UnavailableImageHandler(handler func(*Image)) *Context
- type ControlledPollFragmentHandler
- type FragmentAssembler
- type Image
- type ImageList
- type NewPublicationHandler
- type NewSubscriptionHandler
- type Position
- type Publication
- func (pub *Publication) Channel() string
- func (pub *Publication) ChannelStatusID() int32
- func (pub *Publication) Close() error
- func (pub *Publication) InitialTermID() int32
- func (pub *Publication) IsClosed() bool
- func (pub *Publication) IsConnected() bool
- func (pub *Publication) IsOriginal() bool
- func (pub *Publication) Offer(buffer *atomic.Buffer, offset int32, length int32, ...) int64
- func (pub *Publication) OriginalRegistrationID() int64
- func (pub *Publication) RegistrationID() int64
- func (pub *Publication) SessionID() int32
- func (pub *Publication) StreamID() int32
- func (pub *Publication) TryClaim(length int32, bufferClaim *logbuffer.Claim) int64
- type Subscription
- type UnavailableImageHandler
Constants ¶
const ( // NotConnected indicates that this Publication is not connected to the driver NotConnected int64 = -1 // BackPressured indicates that sending ring buffer is full BackPressured int64 = -2 // AdminAction indicates that terms need to be rotated. User should retry the Offer AdminAction int64 = -3 // PublicationClosed indicates that this Publication is closed and no further Offers shall succeed PublicationClosed int64 = -4 // MaxPositionExceeded indicates that ... MaxPositionExceeded int64 = -5 )
const (
DefaultFragmentAssemblyBufferLength = int32(0)
)
const (
ImageClosed int = -1
)
Variables ¶
var ControlledPollAction = struct { /** * Abort the current polling operation and do not advance the position for this fragment. */ ABORT int /** * Break from the current polling operation and commit the position as of the end of the current fragment * being handled. */ BREAK int /** * Continue processing but commit the position as of the end of the current fragment so that * flow control is applied to this point. */ COMMIT int /** * Continue processing taking the same approach as in fragment_handler_t. */ CONTINUE int }{ 1, 2, 3, 4, }
var DefaultAeronDir string = "/dev/shm"
DefaultAeronDir is the location of media driver files
var RegistrationStatus = struct { AwaitingMediaDriver int RegisteredMediaDriver int ErroredMediaDriver int }{ 0, 1, 2, }
var UserName = os.Getenv("USER")
UserName is used to determine default directory for driver files
Functions ¶
func IsConnectedTo ¶
func IsConnectedTo(sub *Subscription, pub *Publication) bool
IsConnectedTo is a helper function used primarily by tests, which is used within the same process to verify that subscription is connected to a specific publication.
Types ¶
type Aeron ¶
type Aeron struct {
// contains filtered or unexported fields
}
Aeron is the primary interface to the media driver for managing subscriptions and publications
func Connect ¶
Connect is the factory method used to create a new instance of Aeron based on Context settings
func (*Aeron) AddExclusivePublication ¶
func (aeron *Aeron) AddExclusivePublication(channel string, streamID int32) chan *Publication
AddExclusivePublication will add a new exclusive publication to the driver. If such a publication already exists within ClientConductor, the same instance will be returned. Returns a channel, which can be used for either a blocking or non-blocking wait for media driver confirmation.
func (*Aeron) AddPublication ¶
func (aeron *Aeron) AddPublication(channel string, streamID int32) chan *Publication
AddPublication will add a new publication to the driver. If such a publication already exists within ClientConductor, the same instance will be returned. Returns a channel, which can be used for either a blocking or non-blocking wait for media driver confirmation.
func (*Aeron) AddSubscription ¶
func (aeron *Aeron) AddSubscription(channel string, streamID int32) chan *Subscription
AddSubscription will add a new subscription to the driver. Returns a channel, which can be used for either a blocking or non-blocking wait for media driver confirmation.
func (*Aeron) ClientID ¶
ClientID returns the client identity that has been allocated for communicating with the media driver.
func (*Aeron) Close ¶
Close will terminate client conductor and remove all publications and subscriptions from the media driver
func (*Aeron) NextCorrelationID ¶
NextCorrelationID generates the next correlation id that is unique for the connected Media Driver. This is useful for generating correlation identifiers for pairing requests with responses in a client's own application protocol.
This method is thread safe and will work across processes that all use the same media driver.
type AvailableImageHandler ¶
type AvailableImageHandler func(*Image)
AvailableImageHandler is the handler type for image available notification from the media driver
type ClientConductor ¶
type ClientConductor struct {
// contains filtered or unexported fields
}
func (*ClientConductor) AddExclusivePublication ¶
func (cc *ClientConductor) AddExclusivePublication(channel string, streamID int32) int64
AddExclusivePublication sends the add publication command through the driver proxy
func (*ClientConductor) AddPublication ¶
func (cc *ClientConductor) AddPublication(channel string, streamID int32) int64
AddPublication sends the add publication command through the driver proxy
func (*ClientConductor) AddSubscription ¶
func (cc *ClientConductor) AddSubscription(channel string, streamID int32) int64
AddSubscription sends the add subscription command through the driver proxy
func (*ClientConductor) Close ¶
func (cc *ClientConductor) Close() error
Close will terminate the Run() goroutine body and close all active publications and subscriptions. Run() can be restarted in another goroutine.
func (*ClientConductor) FindPublication ¶
func (cc *ClientConductor) FindPublication(regID int64) *Publication
func (*ClientConductor) FindSubscription ¶
func (cc *ClientConductor) FindSubscription(regID int64) *Subscription
func (*ClientConductor) Init ¶
func (cc *ClientConductor) Init(driverProxy *driver.Proxy, bcast *broadcast.CopyReceiver, interServiceTo, driverTo, pubConnectionTo, lingerTo time.Duration, counters *ctr.MetaDataFlyweight) *ClientConductor
Init is the primary initialization method for ClientConductor
func (*ClientConductor) OnAvailableCounter ¶
func (cc *ClientConductor) OnAvailableCounter(correlationID int64, counterID int32)
func (*ClientConductor) OnAvailableImage ¶
func (*ClientConductor) OnClientTimeout ¶
func (cc *ClientConductor) OnClientTimeout(clientID int64)
func (*ClientConductor) OnErrorResponse ¶
func (cc *ClientConductor) OnErrorResponse(corrID int64, errorCode int32, errorMessage string)
func (*ClientConductor) OnNewExclusivePublication ¶
func (cc *ClientConductor) OnNewExclusivePublication(streamID int32, sessionID int32, posLimitCounterID int32, channelStatusIndicatorID int32, logFileName string, regID int64, origRegID int64)
TODO Implement logic specific to exclusive publications
func (*ClientConductor) OnNewPublication ¶
func (*ClientConductor) OnOperationSuccess ¶
func (cc *ClientConductor) OnOperationSuccess(corrID int64)
func (*ClientConductor) OnSubscriptionReady ¶
func (cc *ClientConductor) OnSubscriptionReady(correlationID int64, channelStatusIndicatorID int32)
func (*ClientConductor) OnUnavailableCounter ¶
func (cc *ClientConductor) OnUnavailableCounter(correlationID int64, counterID int32)
func (*ClientConductor) OnUnavailableImage ¶
func (cc *ClientConductor) OnUnavailableImage(corrID int64, subscriptionRegistrationID int64)
func (*ClientConductor) Start ¶
func (cc *ClientConductor) Start(idleStrategy idlestrategy.Idler)
Start begins the main execution loop of ClientConductor on a goroutine.
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
Context configuration options are located here https://github.com/real-logic/Aeron/wiki/Configuration-Options#aeron-client-options
func NewContext ¶
func NewContext() *Context
NewContext creates and initializes new Context for Aeron
func (*Context) AvailableImageHandler ¶
AvailableImageHandler sets an optional callback for available image notifications
func (*Context) CncFileName ¶
CncFileName returns the name of the Counters file
func (*Context) ErrorHandler ¶
ErrorHandler sets the error handler callback
func (*Context) InterServiceTimeout ¶
InterServiceTimeout sets the timeout for client heartbeat
func (*Context) MediaDriverTimeout ¶
MediaDriverTimeout sets the timeout for keep alives to media driver
func (*Context) PublicationConnectionTimeout ¶
func (*Context) ResourceLingerTimeout ¶
ResourceLingerTimeout sets the timeout for resource cleanup after they're released
func (*Context) UnavailableImageHandler ¶
UnavailableImageHandler sets an optional callback for unavailable image notification
type FragmentAssembler ¶
type FragmentAssembler struct {
// contains filtered or unexported fields
}
FragmentAssembler sits in a chain-of-responsibility pattern and reassembles fragmented messages so that the next handler in the chain only sees whole messages. Unfragmented messages are delegated without copy. Fragmented messages are copied to a temporary buffer for reassembly before delegation.
The Header passed to the delegate on assembling a message will be that of the last fragment.
Session based buffers will be allocated and grown as necessary based on the length of messages to be assembled.
func NewFragmentAssembler ¶
func NewFragmentAssembler(delegate term.FragmentHandler, initialBufferLength int32) *FragmentAssembler
NewFragmentAssembler constructs an adapter to reassemble message fragments and delegate on whole messages.
func (*FragmentAssembler) OnFragment ¶
func (self *FragmentAssembler) OnFragment( buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header)
OnFragment reassembles and forwards whole messages to the delegate.
type Image ¶
type Image struct {
// contains filtered or unexported fields
}
func NewImage ¶
func NewImage(sessionID int32, correlationID int64, logBuffers *logbuffer.LogBuffers) *Image
NewImage wraps around provided LogBuffers setting up the structures for polling
type ImageList ¶
type ImageList struct {
// contains filtered or unexported fields
}
ImageList is a helper class to manage a list of images atomically without locks
type NewPublicationHandler ¶
NewPublicationHandler is the handler type for new publication notification from the media driver
type NewSubscriptionHandler ¶
NewSubscriptionHandler is the handler type for new subscription notification from the media driver
type Position ¶
type Position struct {
// contains filtered or unexported fields
}
Position is a wrapper for a buffer location of a position counter
type Publication ¶
type Publication struct {
// contains filtered or unexported fields
}
Publication is a sender structure
func NewPublication ¶
func NewPublication(logBuffers *logbuffer.LogBuffers) *Publication
NewPublication is a factory method to create new publications
func (*Publication) Channel ¶
func (pub *Publication) Channel() string
Channel returns the media address for delivery to the channel.
func (*Publication) ChannelStatusID ¶
func (pub *Publication) ChannelStatusID() int32
ChannelStatusID returns the counter used to represent the channel status for this publication.
func (*Publication) Close ¶
func (pub *Publication) Close() error
Close will close this publication with the driver. This is a blocking call.
func (*Publication) InitialTermID ¶
func (pub *Publication) InitialTermID() int32
InitialTermID returns the initial term id assigned when this publication was created. This can be used to determine how many terms have passed since creation.
func (*Publication) IsClosed ¶
func (pub *Publication) IsClosed() bool
IsClosed returns whether this Publication has been closed
func (*Publication) IsConnected ¶
func (pub *Publication) IsConnected() bool
IsConnected returns whether this publication is connected to the driver (not whether it has any Subscriptions)
func (*Publication) IsOriginal ¶
func (pub *Publication) IsOriginal() bool
IsOriginal returns true if this instance is the first added, otherwise false.
func (*Publication) Offer ¶
func (pub *Publication) Offer(buffer *atomic.Buffer, offset int32, length int32, reservedValueSupplier term.ReservedValueSupplier) int64
Offer is the primary send mechanism on Publication
func (*Publication) OriginalRegistrationID ¶
func (pub *Publication) OriginalRegistrationID() int64
OriginalRegistrationID returns the original registration id.
func (*Publication) RegistrationID ¶
func (pub *Publication) RegistrationID() int64
RegistrationID returns the registration id.
func (*Publication) SessionID ¶
func (pub *Publication) SessionID() int32
SessionID returns the session id for this publication.
func (*Publication) StreamID ¶
func (pub *Publication) StreamID() int32
StreamID returns the stream identity for scoping within the channel media address.
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription is the object responsible for receiving messages from media driver. It is specific to a channel and stream ID combination.
func NewSubscription ¶
func NewSubscription(conductor *ClientConductor, channel string, registrationID int64, streamID int32) *Subscription
NewSubscription is a factory method to create new subscription to be added to the media driver
func (*Subscription) Close ¶
func (sub *Subscription) Close() error
Close will release all images in this subscription, send command to the driver and block waiting for response from the media driver. Images will be lingered by the ClientConductor.
func (*Subscription) HasImages ¶
func (sub *Subscription) HasImages() bool
HasImages is a helper method checking whether this subscription has any images associated with it.
func (*Subscription) IsClosed ¶
func (sub *Subscription) IsClosed() bool
IsClosed returns whether this subscription has been closed.
func (*Subscription) Poll ¶
func (sub *Subscription) Poll(handler term.FragmentHandler, fragmentLimit int) int
Poll is the primary receive mechanism on subscription.
type UnavailableImageHandler ¶
type UnavailableImageHandler func(*Image)
UnavailableImageHandler is the handler type for image unavailable notification from the media driver