Documentation ¶
Index ¶
- Constants
- Variables
- func ChannelStatusString(channelStatus int) string
- func IsConnectedTo(sub *Subscription, pub *Publication) bool
- func NewImage(sessionID int32, correlationID int64, logBuffers *logbuffer.LogBuffers) *image
- type Aeron
- func (aeron *Aeron) AddAvailableCounterHandler(handler AvailableCounterHandler) int64
- func (aeron *Aeron) AddCounter(typeId int32, keyBuffer *atomic.Buffer, keyOffset int32, keyLength int32, ...) (int64, error)
- func (aeron *Aeron) AddCounterByLabel(typeId int32, label string) (int64, error)
- func (aeron *Aeron) AddExclusivePublication(channel string, streamID int32) (*Publication, error)
- func (aeron *Aeron) AddExclusivePublicationDeprecated(channel string, streamID int32) chan *Publication
- func (aeron *Aeron) AddPublication(channel string, streamID int32) (*Publication, error)
- func (aeron *Aeron) AddPublicationDeprecated(channel string, streamID int32) chan *Publication
- func (aeron *Aeron) AddSubscription(channel string, streamID int32) (*Subscription, error)
- func (aeron *Aeron) AddSubscriptionDeprecated(channel string, streamID int32) chan *Subscription
- func (aeron *Aeron) AddSubscriptionWithHandlers(channel string, streamID int32, onAvailableImage AvailableImageHandler, ...) (*Subscription, error)
- func (aeron *Aeron) AddUnavailableCounterHandler(handler UnavailableCounterHandler) int64
- func (aeron *Aeron) AsyncAddExclusivePublication(channel string, streamID int32) (int64, error)
- func (aeron *Aeron) AsyncAddPublication(channel string, streamID int32) (int64, error)
- func (aeron *Aeron) AsyncAddSubscription(channel string, streamID int32) (int64, error)
- func (aeron *Aeron) AsyncAddSubscriptionWithHandlers(channel string, streamID int32, onAvailableImage AvailableImageHandler, ...) (int64, error)
- func (aeron *Aeron) ClientID() int64
- func (aeron *Aeron) Close() error
- func (aeron *Aeron) CounterReader() *counters.Reader
- func (aeron *Aeron) FindCounter(registrationID int64) (*Counter, error)
- func (aeron *Aeron) GetExclusivePublication(registrationID int64) (*Publication, error)
- func (aeron *Aeron) GetPublication(registrationID int64) (*Publication, error)
- func (aeron *Aeron) GetSubscription(registrationID int64) (*Subscription, error)
- func (aeron *Aeron) IsClosed() bool
- func (aeron *Aeron) NextCorrelationID() int64
- func (aeron *Aeron) RemoveAvailableCounterHandler(handler AvailableCounterHandler) bool
- func (aeron *Aeron) RemoveAvailableCounterHandlerById(registrationId int64) bool
- func (aeron *Aeron) RemoveUnavailableCounterHandler(handler UnavailableCounterHandler) bool
- func (aeron *Aeron) RemoveUnavailableCounterHandlerById(registrationId int64) bool
- type AvailableCounterHandler
- type AvailableImageHandler
- type ChannelUri
- func (uri ChannelUri) Clone() (res ChannelUri)
- func (uri ChannelUri) Get(key string) string
- func (uri ChannelUri) IsIpc() bool
- func (uri ChannelUri) IsUdp() bool
- func (uri ChannelUri) Media() string
- func (uri ChannelUri) Prefix() string
- func (uri ChannelUri) Remove(key string)
- func (uri ChannelUri) Scheme() string
- func (uri ChannelUri) Set(key string, value string)
- func (uri *ChannelUri) SetControlMode(controlMode string)
- func (uri *ChannelUri) SetMedia(media string)
- func (uri *ChannelUri) SetPrefix(prefix string)
- func (uri *ChannelUri) SetSessionID(sessionID int32)
- func (uri ChannelUri) String() (result string)
- type ClientConductor
- func (cc *ClientConductor) AddAvailableCounterHandler(handler AvailableCounterHandler) int64
- func (cc *ClientConductor) AddCounter(typeId int32, keyBuffer *atomic.Buffer, keyOffset int32, keyLength int32, ...) (int64, error)
- func (cc *ClientConductor) AddCounterByLabel(typeId int32, label string) (int64, error)
- func (cc *ClientConductor) AddDestination(registrationID int64, endpointChannel string) error
- func (cc *ClientConductor) AddExclusivePublication(channel string, streamID int32) (int64, error)
- func (cc *ClientConductor) AddPublication(channel string, streamID int32) (int64, error)
- func (cc *ClientConductor) AddRcvDestination(registrationID int64, endpointChannel string) error
- func (cc *ClientConductor) AddSubscription(channel string, streamID int32) (int64, error)
- func (cc *ClientConductor) AddSubscriptionWithHandlers(channel string, streamID int32, onAvailableImage AvailableImageHandler, ...) (int64, error)
- func (cc *ClientConductor) AddUnavailableCounterHandler(handler UnavailableCounterHandler) int64
- func (cc *ClientConductor) Close() (err error)
- func (cc *ClientConductor) CounterReader() *ctr.Reader
- func (cc *ClientConductor) FindCounter(registrationID int64) (*Counter, error)
- func (cc *ClientConductor) FindPublication(registrationID int64) (*Publication, error)
- func (cc *ClientConductor) FindSubscription(registrationID int64) (*Subscription, error)
- func (cc *ClientConductor) Init(driverProxy DriverProxy, bcast *broadcast.CopyReceiver, ...) *ClientConductor
- func (cc *ClientConductor) OnAvailableCounter(registrationId int64, counterId int32)
- func (cc *ClientConductor) OnAvailableImage(streamID int32, sessionID int32, logFilename string, sourceIdentity string, ...)
- func (cc *ClientConductor) OnChannelEndpointError(corrID int64, errorMessage string)
- func (cc *ClientConductor) OnClientTimeout(clientID int64)
- func (cc *ClientConductor) OnErrorResponse(corrID int64, errorCode int32, errorMessage string)
- func (cc *ClientConductor) OnNewExclusivePublication(streamID int32, sessionID int32, posLimitCounterID int32, ...)
- func (cc *ClientConductor) OnNewPublication(streamID int32, sessionID int32, posLimitCounterID int32, ...)
- func (cc *ClientConductor) OnOperationSuccess(corrID int64)
- func (cc *ClientConductor) OnSubscriptionReady(correlationID int64, channelStatusIndicatorID int32)
- func (cc *ClientConductor) OnUnavailableCounter(registrationId int64, counterId int32)
- func (cc *ClientConductor) OnUnavailableImage(corrID int64, subscriptionRegistrationID int64)
- func (cc *ClientConductor) RemoveAvailableCounterHandler(handler AvailableCounterHandler) bool
- func (cc *ClientConductor) RemoveAvailableCounterHandlerById(registrationId int64) bool
- func (cc *ClientConductor) RemoveDestination(registrationID int64, endpointChannel string) error
- func (cc *ClientConductor) RemoveRcvDestination(registrationID int64, endpointChannel string) error
- func (cc *ClientConductor) RemoveUnavailableCounterHandler(handler UnavailableCounterHandler) bool
- func (cc *ClientConductor) RemoveUnavailableCounterHandlerById(registrationId int64) bool
- func (cc *ClientConductor) Start(idleStrategy idlestrategy.Idler)
- type Context
- func (ctx *Context) AeronDir(dir string) *Context
- func (ctx *Context) AvailableCounterHandler(handler AvailableCounterHandler)
- func (ctx *Context) AvailableImageHandler(handler func(Image)) *Context
- func (ctx *Context) CncFileName() string
- func (ctx *Context) ErrorHandler(handler func(error)) *Context
- func (ctx *Context) GetAvailableCounterHandler() AvailableCounterHandler
- func (ctx *Context) GetUnavailableCounterHandler() UnavailableCounterHandler
- func (ctx *Context) IdleStrategy(idleStrategy idlestrategy.Idler) *Context
- func (ctx *Context) InterServiceTimeout(to time.Duration) *Context
- func (ctx *Context) MediaDriverTimeout(to time.Duration) *Context
- func (ctx *Context) NewPublicationHandler(handler func(string, int32, int32, int64)) *Context
- func (ctx *Context) NewSubscriptionHandler(handler func(string, int32, int64)) *Context
- func (ctx *Context) PublicationConnectionTimeout(to time.Duration) *Context
- func (ctx *Context) ResourceLingerTimeout(to time.Duration) *Context
- func (ctx *Context) UnavailableCounterHandler(handler UnavailableCounterHandler)
- func (ctx *Context) UnavailableImageHandler(handler func(Image)) *Context
- type ControlledFragmentAssembler
- type Counter
- type DriverProxy
- type FragmentAssembler
- type IdAndAvailableCounterHandler
- type IdAndUnavailableCounterHandler
- type Image
- type ImageFactory
- type ImageList
- type MockImage
- func (_m *MockImage) ActiveTransportCount() int32
- func (_m *MockImage) BoundedPoll(handler term.FragmentHandler, limitPosition int64, fragmentLimit int) int
- func (_m *MockImage) Close() error
- func (_m *MockImage) ControlledPoll(handler term.ControlledFragmentHandler, fragmentLimit int) int
- func (_m *MockImage) CorrelationID() int64
- func (_m *MockImage) IsClosed() bool
- func (_m *MockImage) IsEndOfStream() bool
- func (_m *MockImage) Poll(handler term.FragmentHandler, fragmentLimit int) int
- func (_m *MockImage) Position() int64
- func (_m *MockImage) SessionID() int32
- func (_m *MockImage) SubscriptionRegistrationID() int64
- func (_m *MockImage) TermBufferLength() int32
- type NewPublicationHandler
- type NewSubscriptionHandler
- type Position
- type Publication
- func (pub *Publication) Channel() string
- func (pub *Publication) ChannelStatusID() int32
- func (pub *Publication) Close() error
- func (pub *Publication) InitialTermID() int32
- func (pub *Publication) IsClosed() bool
- func (pub *Publication) IsConnected() bool
- func (pub *Publication) IsOriginal() bool
- func (pub *Publication) Offer(buffer *atomic.Buffer, offset int32, length int32, ...) int64
- func (pub *Publication) Offer2(bufferOne *atomic.Buffer, offsetOne int32, lengthOne int32, ...) int64
- func (pub *Publication) OriginalRegistrationID() int64
- func (pub *Publication) Position() int64
- func (pub *Publication) RegistrationID() int64
- func (pub *Publication) SessionID() int32
- func (pub *Publication) StreamID() int32
- func (pub *Publication) TryClaim(length int32, bufferClaim *logbuffer.Claim) int64
- type ReceivingConductor
- type Subscription
- func (sub *Subscription) AddDestination(endpointChannel string) error
- func (sub *Subscription) AvailableImageHandler() AvailableImageHandler
- func (sub *Subscription) Channel() string
- func (sub *Subscription) ChannelStatus() int
- func (sub *Subscription) ChannelStatusId() int32
- func (sub *Subscription) Close() error
- func (sub *Subscription) ControlledPoll(handler term.ControlledFragmentHandler, fragmentLimit int) int
- func (sub *Subscription) HasImages() bool
- func (sub *Subscription) ImageBySessionID(sessionID int32) Image
- func (sub *Subscription) ImageCount() int
- func (sub *Subscription) IsClosed() bool
- func (sub *Subscription) IsConnected() bool
- func (sub *Subscription) LocalSocketAddresses() []string
- func (sub *Subscription) Poll(handler term.FragmentHandler, fragmentLimit int) int
- func (sub *Subscription) RegistrationID() int64
- func (sub *Subscription) RemoveDestination(endpointChannel string) error
- func (sub *Subscription) ResolvedEndpoint() string
- func (sub *Subscription) StreamID() int32
- func (sub *Subscription) TryResolveChannelEndpointPort() string
- func (sub *Subscription) UnavailableImageHandler() UnavailableImageHandler
- type UnavailableCounterHandler
- type UnavailableImageHandler
Constants ¶
const ( // NotConnected indicates that this Publication is not connected to the driver NotConnected int64 = -1 // BackPressured indicates that sending ring buffer is full BackPressured int64 = -2 // AdminAction indicates that terms needs to be rotated. User should retry the Offer AdminAction int64 = -3 // PublicationClosed indicates that this Publication is closed and no further Offers shall succeed PublicationClosed int64 = -4 // MaxPositionExceeded indicates that ... MaxPositionExceeded int64 = -5 )
const ( ChannelStatusNoIdAllocated = -1 // Channel status counter not allocated for IPC channels ChannelStatusErrored = -1 // Channel has errored. Check logs for information ChannelStatusInitializing = 0 // Channel is being initialized ChannelStatusActive = 1 // Channel has finished initialization and is active ChannelStatusClosing = 2 // Channel is being closed )
const ( ChannelStatusIdOffset = 0 LocalSocketAddressLengthOffset = ChannelStatusIdOffset + 4 LocalSocketAddressStringOffset = LocalSocketAddressLengthOffset + 4 )
From LocalSocketAddressStatus.Java
const AeronPrefix = "aeron:"
const AeronScheme = "aeron"
const AliasParamName = "alias"
const ChannelRcvTimestampOffsetParamName = "channel-rcv-ts-offset"
const ChannelSndTimestampOffsetParamName = "channel-snd-ts-offset"
const CongestionControlParamName = "cc"
const (
DefaultFragmentAssemblyBufferLength = int32(4096)
)
const EndpointParamName = "endpoint"
const EosParamName = "eos"
const FlowControlParamName = "fc"
const GroupParamName = "group"
const GroupTagParamName = "gtag"
const InitialTermIdParamName = "init-term-id"
const InterfaceParamName = "interface"
const IpcChannel = "aeron:ipc"
const IpcMedia = "ipc"
const LingerParamName = "linger"
const LocalSocketAddressStatusCounterTypeId = 14
const MdcControlModeDynamic = "dynamic"
const MdcControlModeManual = "manual"
const MdcControlModeParamName = "control-mode"
const MdcControlParamName = "control"
const MediaRcvTimestampOffsetParamName = "media-rcv-ts-offset"
const MtuLengthParamName = "mtu"
const NullValue = -1
NullValue is used to represent a null value for when some value is not yet set.
const ReceiverWindowLengthParamName = "rcv-wnd"
const RejoinParamName = "rejoin"
const ReliableStreamParamName = "reliable"
const SessionIdParamName = "session-id"
const SocketRcvbufParamName = "so-rcvbuf"
const SocketSndbufParamName = "so-sndbuf"
const SparseParamName = "sparse"
const SpiesSimulateConnectionParamName = "ssc"
const SpyPrefix = "aeron-spy:"
const SpyQualifier = "aeron-spy"
const TagPrefix = "tag:"
const TagsParamName = "tags"
const TermIdParamName = "term-id"
const TermLengthParamName = "term-length"
const TermOffsetParamName = "term-offset"
const TetherParamName = "tether"
const TtlParamName = "ttl"
const UdpMedia = "udp"
Variables ¶
var DefaultAeronDir string = "/dev/shm"
DefaultAeronDir is the location of media driver files
var RegistrationStatus = struct { AwaitingMediaDriver int RegisteredMediaDriver int ErroredMediaDriver int }{ 0, 1, 2, }
var UserName = os.Getenv("USER")
UserName is used to determine default directory for driver files
Functions ¶
func ChannelStatusString ¶
ChannelStatusString provides a convenience method for logging and error handling
func IsConnectedTo ¶
func IsConnectedTo(sub *Subscription, pub *Publication) bool
IsConnectedTo is a helper function used primarily by tests, which is used within the same process to verify that subscription is connected to a specific publication.
Types ¶
type Aeron ¶
type Aeron struct {
// contains filtered or unexported fields
}
Aeron is the primary interface to the media driver for managing subscriptions and publications
func Connect ¶
Connect is the factory method used to create a new instance of Aeron based on Context settings
func (*Aeron) AddAvailableCounterHandler ¶
func (aeron *Aeron) AddAvailableCounterHandler(handler AvailableCounterHandler) int64
AddAvailableCounterHandler adds a handler to the list to be called when a counter becomes available. Return the registrationID to use to remove the handler.
func (*Aeron) AddCounter ¶
func (aeron *Aeron) AddCounter( typeId int32, keyBuffer *atomic.Buffer, keyOffset int32, keyLength int32, labelBuffer *atomic.Buffer, labelOffset int32, labelLength int32) (int64, error)
AddCounter allocates a counter on the media driver and returns its registrationId. The Counter should be freed by calling Counter.Close().
func (*Aeron) AddCounterByLabel ¶
AddCounterByLabel allocates a counter on the media driver and returns its registrationId. The Counter should be freed by calling Counter.Close().
func (*Aeron) AddExclusivePublication ¶
func (aeron *Aeron) AddExclusivePublication(channel string, streamID int32) (*Publication, error)
AddExclusivePublication will add a new exclusive publication to the driver. If such publication already exists within ClientConductor the same instance will be returned.
func (*Aeron) AddExclusivePublicationDeprecated ¶
func (aeron *Aeron) AddExclusivePublicationDeprecated(channel string, streamID int32) chan *Publication
AddExclusivePublicationDeprecated will add a new exclusive publication to the driver. If such publication already exists within ClientConductor the same instance will be returned. Returns a channel, which can be used for either blocking or non-blocking want for media driver confirmation
func (*Aeron) AddPublication ¶
func (aeron *Aeron) AddPublication(channel string, streamID int32) (*Publication, error)
AddPublication will add a new publication to the driver. If such publication already exists within ClientConductor the same instance will be returned.
func (*Aeron) AddPublicationDeprecated ¶
func (aeron *Aeron) AddPublicationDeprecated(channel string, streamID int32) chan *Publication
AddPublicationDeprecated will add a new publication to the driver. If such publication already exists within ClientConductor the same instance will be returned. Returns a channel, which can be used for either blocking or non-blocking want for media driver confirmation
func (*Aeron) AddSubscription ¶
func (aeron *Aeron) AddSubscription(channel string, streamID int32) (*Subscription, error)
AddSubscription will add a new subscription to the driver and wait until it is ready.
func (*Aeron) AddSubscriptionDeprecated ¶
func (aeron *Aeron) AddSubscriptionDeprecated(channel string, streamID int32) chan *Subscription
AddSubscriptionDeprecated will add a new subscription to the driver. Returns a channel, which can be used for either blocking or non-blocking want for media driver confirmation
func (*Aeron) AddSubscriptionWithHandlers ¶
func (aeron *Aeron) AddSubscriptionWithHandlers(channel string, streamID int32, onAvailableImage AvailableImageHandler, onUnavailableImage UnavailableImageHandler) (*Subscription, error)
AddSubscriptionWithHandlers will add a new subscription to the driver and wait until it is ready. It will use the specified Handlers for available/unavailable Images instead of the default handlers.
func (*Aeron) AddUnavailableCounterHandler ¶
func (aeron *Aeron) AddUnavailableCounterHandler(handler UnavailableCounterHandler) int64
AddUnavailableCounterHandler adds a handler to the list to be called when Counters become unavailable. Return the registrationID to use to remove the handler.
func (*Aeron) AsyncAddExclusivePublication ¶
AsyncAddExclusivePublication will add a new exclusive publication to the driver and return its registration ID. That ID can be used to get the added exclusive Publication with GetExclusivePublication().
func (*Aeron) AsyncAddPublication ¶
AsyncAddPublication will add a new publication to the driver and return its registration ID. That ID can be used to get the added Publication with GetPublication().
func (*Aeron) AsyncAddSubscription ¶
AsyncAddSubscription will add a new subscription to the driver and return its registration ID. That ID can be used to get the Subscription with GetSubscription().
func (*Aeron) AsyncAddSubscriptionWithHandlers ¶
func (aeron *Aeron) AsyncAddSubscriptionWithHandlers(channel string, streamID int32, onAvailableImage AvailableImageHandler, onUnavailableImage UnavailableImageHandler) (int64, error)
AsyncAddSubscriptionWithHandlers will add a new subscription to the driver and return its registration ID. That ID can be used to get the Subscription with GetSubscription(). This call will use the specified Handlers for available/unavailable Images instead of the default handlers.
func (*Aeron) ClientID ¶
ClientID returns the client identity that has been allocated for communicating with the media driver.
func (*Aeron) Close ¶
Close will terminate client conductor and remove all publications and subscriptions from the media driver
func (*Aeron) CounterReader ¶
CounterReader returns Aeron's clientconductor's counterReader
func (*Aeron) FindCounter ¶
FindCounter retrieves the Counter associated with the given registrationID. This function is non-blocking. The value returned is dependent on what has occurred with respect to the media driver:
- If the registrationID is unknown, an error is returned. - If the media driver has not answered the command, (nil,nil) is returned. - If the media driver has successfully added the Counter then what is returned is the Counter. - If the media driver has returned an error, that error will be returned.
func (*Aeron) GetExclusivePublication ¶
func (aeron *Aeron) GetExclusivePublication(registrationID int64) (*Publication, error)
GetExclusivePublication will attempt to get an exclusive Publication from a registrationID. See AsyncAddExclusivePublication. A pending Publication will return nil,nil signifying that there is neither a Publication nor an error. Also note that while aeron-go currently handles GetPublication and GetExclusivePublication the same way, they may diverge in the future. Other Aeron languages already handle these calls differently.
func (*Aeron) GetPublication ¶
func (aeron *Aeron) GetPublication(registrationID int64) (*Publication, error)
GetPublication will attempt to get a Publication from a registrationID. See AsyncAddPublication. A pending Publication will return nil,nil signifying that there is neither a Publication nor an error.
func (*Aeron) GetSubscription ¶
func (aeron *Aeron) GetSubscription(registrationID int64) (*Subscription, error)
GetSubscription will attempt to get a Subscription from a registrationID. See AsyncAddSubscription. A pending Subscription will return nil,nil signifying that there is neither a Subscription nor an error.
func (*Aeron) NextCorrelationID ¶
NextCorrelationID generates the next correlation id that is unique for the connected Media Driver. This is useful generating correlation identifiers for pairing requests with responses in a clients own application protocol.
This method is thread safe and will work across processes that all use the same media driver.
func (*Aeron) RemoveAvailableCounterHandler ¶
func (aeron *Aeron) RemoveAvailableCounterHandler(handler AvailableCounterHandler) bool
RemoveAvailableCounterHandler removes a previously added handler from the list to be called when Counters become available. Returns true iff the handler was found and removed.
func (*Aeron) RemoveAvailableCounterHandlerById ¶
RemoveAvailableCounterHandlerById removes a previously added handler from the list to be called when Counters become available. Returns true iff the handler was found and removed.
func (*Aeron) RemoveUnavailableCounterHandler ¶
func (aeron *Aeron) RemoveUnavailableCounterHandler(handler UnavailableCounterHandler) bool
RemoveUnavailableCounterHandler removes a previously added handler from the list to be called when Counters become unavailable. Returns true iff the handler was found and removed.
func (*Aeron) RemoveUnavailableCounterHandlerById ¶
RemoveUnavailableCounterHandlerById removes a previously added handler from the list to be called when Counters become unavailable. Returns true iff the handler was found and removed.
type AvailableCounterHandler ¶
type AvailableCounterHandler interface {
Handle(countersReader *counters.Reader, registrationId int64, counterId int32)
}
AvailableCounterHandler is the function called by Aeron to deliver notification of a Counter being available. Implementations should do the minimum work for passing off state to another thread for later processing and should not make a reentrant call back into the Aeron instance. Note that this is an interface instead of a function in order to support RemoveAvailableCounterHandler.
type AvailableImageHandler ¶
type AvailableImageHandler func(Image)
AvailableImageHandler is the handler type for image available notification from the media driver
type ChannelUri ¶
type ChannelUri struct {
// contains filtered or unexported fields
}
ChannelUri is a parser for Aeron channel URIs. The format is: aeron-uri = "aeron:" media [ "?" param *( "|" param ) ] media = *( "[^?:]" ) param = key "=" value key = *( "[^=]" ) value = *( "[^|]" )
Multiple params with the same key are allowed, the last value specified takes precedence.
func ParseChannelUri ¶
func ParseChannelUri(uriStr string) (uri ChannelUri, err error)
ParseChannelUri parses a string which contains an Aeron URI.
func (ChannelUri) Clone ¶
func (uri ChannelUri) Clone() (res ChannelUri)
Clone returns a deep copy of a ChannelUri.
func (ChannelUri) Get ¶
func (uri ChannelUri) Get(key string) string
func (ChannelUri) IsIpc ¶
func (uri ChannelUri) IsIpc() bool
func (ChannelUri) IsUdp ¶
func (uri ChannelUri) IsUdp() bool
func (ChannelUri) Media ¶
func (uri ChannelUri) Media() string
func (ChannelUri) Prefix ¶
func (uri ChannelUri) Prefix() string
func (ChannelUri) Remove ¶
func (uri ChannelUri) Remove(key string)
func (ChannelUri) Scheme ¶
func (uri ChannelUri) Scheme() string
func (ChannelUri) Set ¶
func (uri ChannelUri) Set(key string, value string)
func (*ChannelUri) SetControlMode ¶
func (uri *ChannelUri) SetControlMode(controlMode string)
func (*ChannelUri) SetMedia ¶
func (uri *ChannelUri) SetMedia(media string)
func (*ChannelUri) SetPrefix ¶
func (uri *ChannelUri) SetPrefix(prefix string)
func (*ChannelUri) SetSessionID ¶
func (uri *ChannelUri) SetSessionID(sessionID int32)
func (ChannelUri) String ¶
func (uri ChannelUri) String() (result string)
type ClientConductor ¶
type ClientConductor struct {
// contains filtered or unexported fields
}
func (*ClientConductor) AddAvailableCounterHandler ¶
func (cc *ClientConductor) AddAvailableCounterHandler(handler AvailableCounterHandler) int64
func (*ClientConductor) AddCounter ¶
func (*ClientConductor) AddCounterByLabel ¶
func (cc *ClientConductor) AddCounterByLabel(typeId int32, label string) (int64, error)
func (*ClientConductor) AddDestination ¶
func (cc *ClientConductor) AddDestination(registrationID int64, endpointChannel string) error
AddDestination sends the add destination command through the driver proxy
func (*ClientConductor) AddExclusivePublication ¶
func (cc *ClientConductor) AddExclusivePublication(channel string, streamID int32) (int64, error)
AddExclusivePublication sends the add publication command through the driver proxy
func (*ClientConductor) AddPublication ¶
func (cc *ClientConductor) AddPublication(channel string, streamID int32) (int64, error)
AddPublication sends the add publication command through the driver proxy
func (*ClientConductor) AddRcvDestination ¶
func (cc *ClientConductor) AddRcvDestination(registrationID int64, endpointChannel string) error
AddRcvDestination sends the add rcv destination command through the driver proxy
func (*ClientConductor) AddSubscription ¶
func (cc *ClientConductor) AddSubscription(channel string, streamID int32) (int64, error)
AddSubscription sends the add subscription command through the driver proxy
func (*ClientConductor) AddSubscriptionWithHandlers ¶
func (cc *ClientConductor) AddSubscriptionWithHandlers(channel string, streamID int32, onAvailableImage AvailableImageHandler, onUnavailableImage UnavailableImageHandler) (int64, error)
AddSubscriptionWithHandlers sends the add subscription command through the driver proxy. It will use the specified Handlers for available/unavailable Images instead of the default handlers.
func (*ClientConductor) AddUnavailableCounterHandler ¶
func (cc *ClientConductor) AddUnavailableCounterHandler(handler UnavailableCounterHandler) int64
func (*ClientConductor) Close ¶
func (cc *ClientConductor) Close() (err error)
Close will terminate the Run() goroutine body and close all active publications and subscription. Run() can be restarted in a another goroutine.
func (*ClientConductor) CounterReader ¶
func (cc *ClientConductor) CounterReader() *ctr.Reader
func (*ClientConductor) FindCounter ¶
func (cc *ClientConductor) FindCounter(registrationID int64) (*Counter, error)
func (*ClientConductor) FindPublication ¶
func (cc *ClientConductor) FindPublication(registrationID int64) (*Publication, error)
func (*ClientConductor) FindSubscription ¶
func (cc *ClientConductor) FindSubscription(registrationID int64) (*Subscription, error)
FindSubscription by Registration ID, which is returned by AddSubscription. Returns the Subscription or an error. A pending Subscription will return nil,nil signifying that there is neither a Subscription nor an error.
func (*ClientConductor) Init ¶
func (cc *ClientConductor) Init(driverProxy DriverProxy, bcast *broadcast.CopyReceiver, interServiceTo, driverTo, pubConnectionTo, lingerTo time.Duration, counters *ctr.MetaDataFlyweight) *ClientConductor
Init is the primary initialization method for ClientConductor
func (*ClientConductor) OnAvailableCounter ¶
func (cc *ClientConductor) OnAvailableCounter(registrationId int64, counterId int32)
func (*ClientConductor) OnAvailableImage ¶
func (*ClientConductor) OnChannelEndpointError ¶
func (cc *ClientConductor) OnChannelEndpointError(corrID int64, errorMessage string)
func (*ClientConductor) OnClientTimeout ¶
func (cc *ClientConductor) OnClientTimeout(clientID int64)
func (*ClientConductor) OnErrorResponse ¶
func (cc *ClientConductor) OnErrorResponse(corrID int64, errorCode int32, errorMessage string)
func (*ClientConductor) OnNewExclusivePublication ¶
func (cc *ClientConductor) OnNewExclusivePublication(streamID int32, sessionID int32, posLimitCounterID int32, channelStatusIndicatorID int32, logFileName string, regID int64, origRegID int64)
TODO Implement logic specific to exclusive publications
func (*ClientConductor) OnNewPublication ¶
func (*ClientConductor) OnOperationSuccess ¶
func (cc *ClientConductor) OnOperationSuccess(corrID int64)
func (*ClientConductor) OnSubscriptionReady ¶
func (cc *ClientConductor) OnSubscriptionReady(correlationID int64, channelStatusIndicatorID int32)
func (*ClientConductor) OnUnavailableCounter ¶
func (cc *ClientConductor) OnUnavailableCounter(registrationId int64, counterId int32)
func (*ClientConductor) OnUnavailableImage ¶
func (cc *ClientConductor) OnUnavailableImage(corrID int64, subscriptionRegistrationID int64)
func (*ClientConductor) RemoveAvailableCounterHandler ¶
func (cc *ClientConductor) RemoveAvailableCounterHandler(handler AvailableCounterHandler) bool
func (*ClientConductor) RemoveAvailableCounterHandlerById ¶
func (cc *ClientConductor) RemoveAvailableCounterHandlerById(registrationId int64) bool
func (*ClientConductor) RemoveDestination ¶
func (cc *ClientConductor) RemoveDestination(registrationID int64, endpointChannel string) error
RemoveDestination sends the remove destination command through the driver proxy
func (*ClientConductor) RemoveRcvDestination ¶
func (cc *ClientConductor) RemoveRcvDestination(registrationID int64, endpointChannel string) error
RemoveRcvDestination sends the remove rcv destination command through the driver proxy
func (*ClientConductor) RemoveUnavailableCounterHandler ¶
func (cc *ClientConductor) RemoveUnavailableCounterHandler(handler UnavailableCounterHandler) bool
func (*ClientConductor) RemoveUnavailableCounterHandlerById ¶
func (cc *ClientConductor) RemoveUnavailableCounterHandlerById(registrationId int64) bool
func (*ClientConductor) Start ¶
func (cc *ClientConductor) Start(idleStrategy idlestrategy.Idler)
Start begins the main execution loop of ClientConductor on a goroutine.
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
Context configuration options are located here https://github.com/real-logic/Aeron/wiki/Configuration-Options#aeron-client-options
func NewContext ¶
func NewContext() *Context
NewContext creates and initializes new Context for Aeron
func (*Context) AvailableCounterHandler ¶
func (ctx *Context) AvailableCounterHandler(handler AvailableCounterHandler)
AvailableCounterHandler sets up a callback for when a Counter is available. This will be added to the list before additional handlers are added with Aeron.AddAvailableCounterHandler.
func (*Context) AvailableImageHandler ¶
AvailableImageHandler sets an optional default callback for available image notifications
func (*Context) CncFileName ¶
CncFileName returns the name of the Counters file
func (*Context) ErrorHandler ¶
ErrorHandler sets the error handler callback
func (*Context) GetAvailableCounterHandler ¶
func (ctx *Context) GetAvailableCounterHandler() AvailableCounterHandler
GetAvailableCounterHandler gets the callback handler for when a counter is available.
func (*Context) GetUnavailableCounterHandler ¶
func (ctx *Context) GetUnavailableCounterHandler() UnavailableCounterHandler
func (*Context) IdleStrategy ¶
func (ctx *Context) IdleStrategy(idleStrategy idlestrategy.Idler) *Context
IdleStrategy provides an IdleStrategy for the thread responsible for communicating with the Aeron Media Driver.
func (*Context) InterServiceTimeout ¶
InterServiceTimeout sets the timeout for client heartbeat
func (*Context) MediaDriverTimeout ¶
MediaDriverTimeout sets the timeout for keep alives to media driver
func (*Context) NewPublicationHandler ¶
newPublicationHandler sets an optional callback for new publications
func (*Context) NewSubscriptionHandler ¶
newSubscriptionHandler sets an optional callback for new subscriptions
func (*Context) PublicationConnectionTimeout ¶
func (*Context) ResourceLingerTimeout ¶
ResourceLingerTimeout sets the timeout for resource cleanup after they're released
func (*Context) UnavailableCounterHandler ¶
func (ctx *Context) UnavailableCounterHandler(handler UnavailableCounterHandler)
UnavailableCounterHandler sets up a callback for when a Counter is unavailable. This will be added to the list first before additional handlers are added with Aeron.AddUnavailableCounterHandler.
func (*Context) UnavailableImageHandler ¶
UnavailableImageHandler sets an optional default callback for unavailable image notification
type ControlledFragmentAssembler ¶
type ControlledFragmentAssembler struct {
// contains filtered or unexported fields
}
ControlledFragmentAssembler that sits in a chain-of-responsibility pattern that reassembles fragmented messages so that the next handler in the chain only sees whole messages.
Unfragmented messages are delegated without copy. Fragmented messages are copied to a temporary buffer for reassembly before delegation.
The Header passed to the delegate on assembling a message will be that of the last fragment.
Session based buffers will be allocated and grown as necessary based on the length of messages to be assembled.
func NewControlledFragmentAssembler ¶
func NewControlledFragmentAssembler(delegate term.ControlledFragmentHandler, initialBufferLength int32) *ControlledFragmentAssembler
NewControlledFragmentAssembler constructs an adapter to reassemble message fragments and delegate on whole messages.
func (*ControlledFragmentAssembler) OnFragment ¶
func (f *ControlledFragmentAssembler) OnFragment( buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) (action term.ControlledPollAction)
OnFragment reassembles and forwards whole messages to the delegate.
type Counter ¶
type Counter struct {
// contains filtered or unexported fields
}
func NewCounter ¶
func NewCounter( registrationId int64, clientConductor *ClientConductor, counterId int32) (*Counter, error)
func (*Counter) Counter ¶
func (c *Counter) Counter() *counters.AtomicCounter
func (*Counter) RegistrationId ¶
type DriverProxy ¶
type DriverProxy interface { ClientID() int64 TimeOfLastDriverKeepalive() int64 NextCorrelationID() int64 AddSubscription(channel string, streamID int32) (int64, error) RemoveSubscription(registrationID int64) error AddPublication(channel string, streamID int32) (int64, error) AddExclusivePublication(channel string, streamID int32) (int64, error) RemovePublication(registrationID int64) error ClientClose() error AddDestination(registrationID int64, channel string) (int64, error) RemoveDestination(registrationID int64, channel string) (int64, error) AddRcvDestination(registrationID int64, channel string) (int64, error) RemoveRcvDestination(registrationID int64, channel string) (int64, error) AddCounter(typeId int32, keyBuffer *atomic.Buffer, keyOffset int32, keyLength int32, labelBuffer *atomic.Buffer, labelOffset int32, labelLength int32) (int64, error) AddCounterByLabel(typeId int32, label string) (int64, error) RemoveCounter(registrationId int64) (int64, error) }
type FragmentAssembler ¶
type FragmentAssembler struct {
// contains filtered or unexported fields
}
FragmentAssembler that sits in a chain-of-responsibility pattern that reassembles fragmented messages so that the next handler in the chain only sees whole messages.
Unfragmented messages are delegated without copy. Fragmented messages are copied to a temporary buffer for reassembly before delegation.
The Header passed to the delegate on assembling a message will be that of the last fragment.
Session based buffers will be allocated and grown as necessary based on the length of messages to be assembled.
func NewFragmentAssembler ¶
func NewFragmentAssembler(delegate term.FragmentHandler, initialBufferLength int32) *FragmentAssembler
NewFragmentAssembler constructs an adapter to reassemble message fragments and delegate on whole messages.
func (*FragmentAssembler) Clear ¶
func (f *FragmentAssembler) Clear()
Clear removes all existing session buffers.
func (*FragmentAssembler) OnFragment ¶
func (f *FragmentAssembler) OnFragment( buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header)
OnFragment reassembles and forwards whole messages to the delegate.
type IdAndAvailableCounterHandler ¶
type IdAndAvailableCounterHandler struct {
// contains filtered or unexported fields
}
func NewIdAndAvailablePair ¶
func NewIdAndAvailablePair(registrationId int64, handler AvailableCounterHandler) *IdAndAvailableCounterHandler
type IdAndUnavailableCounterHandler ¶
type IdAndUnavailableCounterHandler struct {
// contains filtered or unexported fields
}
func NewIdAndUnavailablePair ¶
func NewIdAndUnavailablePair(registrationId int64, handler UnavailableCounterHandler) *IdAndUnavailableCounterHandler
type Image ¶
type Image interface { IsClosed() bool Poll(handler term.FragmentHandler, fragmentLimit int) int BoundedPoll(handler term.FragmentHandler, limitPosition int64, fragmentLimit int) int ControlledPoll(handler term.ControlledFragmentHandler, fragmentLimit int) int Position() int64 IsEndOfStream() bool SessionID() int32 CorrelationID() int64 SubscriptionRegistrationID() int64 TermBufferLength() int32 ActiveTransportCount() int32 Close() error }
Image is a Java-style interface for the image struct. This is to allow dependency injection and testing of the many structs that use image, without deviating from the existing function signatures and code structure.
type ImageFactory ¶
type ImageFactory func(sessionID int32, corrID int64, logFilename string, subRegId int64, sourceIdentity string, counterValuesBuffer *atomic.Buffer, subscriberPositionID int32) Image
ImageFactory allows tests to use fake Images
type ImageList ¶
type ImageList struct {
// contains filtered or unexported fields
}
ImageList is a helper class to manage list of images atomically without locks
type MockImage ¶
MockImage is an autogenerated mock type for the Image type
func NewMockImage ¶
func NewMockImage(t mockConstructorTestingTNewMockImage) *MockImage
NewMockImage creates a new instance of MockImage. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func (*MockImage) ActiveTransportCount ¶
ActiveTransportCount provides a mock function with given fields:
func (*MockImage) BoundedPoll ¶
func (_m *MockImage) BoundedPoll(handler term.FragmentHandler, limitPosition int64, fragmentLimit int) int
BoundedPoll provides a mock function with given fields: handler, limitPosition, fragmentLimit
func (*MockImage) ControlledPoll ¶
func (_m *MockImage) ControlledPoll(handler term.ControlledFragmentHandler, fragmentLimit int) int
ControlledPoll provides a mock function with given fields: handler, fragmentLimit
func (*MockImage) CorrelationID ¶
CorrelationID provides a mock function with given fields:
func (*MockImage) IsEndOfStream ¶
IsEndOfStream provides a mock function with given fields:
func (*MockImage) Poll ¶
func (_m *MockImage) Poll(handler term.FragmentHandler, fragmentLimit int) int
Poll provides a mock function with given fields: handler, fragmentLimit
func (*MockImage) SubscriptionRegistrationID ¶
SubscriptionRegistrationID provides a mock function with given fields:
func (*MockImage) TermBufferLength ¶
TermBufferLength provides a mock function with given fields:
type NewPublicationHandler ¶
NewPublicationHandler is the handler type for new publication notification from the media driver
type NewSubscriptionHandler ¶
NewSubscriptionHandler is the handler type for new subscription notification from the media driver
type Position ¶
type Position struct {
// contains filtered or unexported fields
}
Position is a wrapper for a buffer location of a position counter
type Publication ¶
type Publication struct {
// contains filtered or unexported fields
}
Publication is a sender structure
func NewPublication ¶
func NewPublication(logBuffers *logbuffer.LogBuffers) *Publication
NewPublication is a factory method create new publications
func (*Publication) Channel ¶
func (pub *Publication) Channel() string
Channel returns the media address for delivery to the channel.
func (*Publication) ChannelStatusID ¶
func (pub *Publication) ChannelStatusID() int32
ChannelStatusID returns the counter used to represent the channel status for this publication.
func (*Publication) Close ¶
func (pub *Publication) Close() error
Close will close this publication with the driver. This is a blocking call.
func (*Publication) InitialTermID ¶
func (pub *Publication) InitialTermID() int32
InitialTermID returns the initial term id assigned when this publication was created. This can be used to determine how many terms have passed since creation.
func (*Publication) IsClosed ¶
func (pub *Publication) IsClosed() bool
IsClosed returns whether this Publication has been closed
func (*Publication) IsConnected ¶
func (pub *Publication) IsConnected() bool
IsConnected returns whether this publication is connected to the driver (not whether it has any Subscriptions)
func (*Publication) IsOriginal ¶
func (pub *Publication) IsOriginal() bool
IsOriginal return true if this instance is the first added otherwise false.
func (*Publication) Offer ¶
func (pub *Publication) Offer(buffer *atomic.Buffer, offset int32, length int32, reservedValueSupplier term.ReservedValueSupplier) int64
Offer is the primary send mechanism on Publication
func (*Publication) Offer2 ¶
func (pub *Publication) Offer2( bufferOne *atomic.Buffer, offsetOne int32, lengthOne int32, bufferTwo *atomic.Buffer, offsetTwo int32, lengthTwo int32, reservedValueSupplier term.ReservedValueSupplier, ) int64
Offer2 attempts to publish a message composed of two parts, e.g. a header and encapsulated payload.
func (*Publication) OriginalRegistrationID ¶
func (pub *Publication) OriginalRegistrationID() int64
OriginalRegistrationID returns the original registration id.
func (*Publication) Position ¶
func (pub *Publication) Position() int64
Position returns the current position to which the publication has advanced for this stream or PublicationClosed if closed.
func (*Publication) RegistrationID ¶
func (pub *Publication) RegistrationID() int64
RegistrationID returns the registration id.
func (*Publication) SessionID ¶
func (pub *Publication) SessionID() int32
SessionID returns the session id for this publication.
func (*Publication) StreamID ¶
func (pub *Publication) StreamID() int32
StreamID returns Stream identity for scoping within the channel media address.
type ReceivingConductor ¶
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription is the object responsible for receiving messages from media driver. It is specific to a channel and stream ID combination.
func NewSubscription ¶
func NewSubscription( conductor ReceivingConductor, channel string, registrationID int64, streamID int32, channelStatusID int32, availableImagehandler AvailableImageHandler, unavailableImageHandler UnavailableImageHandler) *Subscription
NewSubscription is a factory method to create new subscription to be added to the media driver
func (*Subscription) AddDestination ¶
func (sub *Subscription) AddDestination(endpointChannel string) error
AddDestination adds a destination manually to a multi-destination Subscription.
func (*Subscription) AvailableImageHandler ¶
func (sub *Subscription) AvailableImageHandler() AvailableImageHandler
AvailableImageHandler returns a callback used to indicate when an Image becomes available under this Subscription. The handler may be nil.
func (*Subscription) Channel ¶
func (sub *Subscription) Channel() string
Channel returns the media address for delivery to the channel.
func (*Subscription) ChannelStatus ¶
func (sub *Subscription) ChannelStatus() int
ChannelStatus returns the status of the media channel for this Subscription. The status will be ChannelStatusErrored if a socket exception on setup or ChannelStatusActive if all is well.
func (*Subscription) ChannelStatusId ¶
func (sub *Subscription) ChannelStatusId() int32
ChannelStatusId returns the counter ID used to represent the channel status of this Subscription.
func (*Subscription) Close ¶
func (sub *Subscription) Close() error
Close will release all images in this subscription, send command to the driver and block waiting for response from the media driver. Images will be lingered by the ClientConductor.
func (*Subscription) ControlledPoll ¶
func (sub *Subscription) ControlledPoll(handler term.ControlledFragmentHandler, fragmentLimit int) int
ControlledPoll polls in a controlled manner the image s under the subscription for available message fragments. Control is applied to fragments in the stream. If more fragments can be read on another stream they will even if BREAK or ABORT is returned from the fragment handler.
Each fragment read will be a whole message if it is under MTU length. If larger than MTU then it will come as a series of fragments ordered within a session. Returns the number of fragments received.
func (*Subscription) HasImages ¶
func (sub *Subscription) HasImages() bool
HasImages is a helper method checking whether this subscription has any images associated with it.
func (*Subscription) ImageBySessionID ¶
func (sub *Subscription) ImageBySessionID(sessionID int32) Image
ImageBySessionID returns the associated with the given sessionId.
func (*Subscription) ImageCount ¶
func (sub *Subscription) ImageCount() int
ImageCount count of images associated with this subscription.
func (*Subscription) IsClosed ¶
func (sub *Subscription) IsClosed() bool
IsClosed returns whether this subscription has been closed.
func (*Subscription) IsConnected ¶
func (sub *Subscription) IsConnected() bool
IsConnected returns if this subscription is connected by having at least one open publication image.
func (*Subscription) LocalSocketAddresses ¶
func (sub *Subscription) LocalSocketAddresses() []string
LocalSocketAddresses fetches the local socket addresses for this subscription.
func (*Subscription) Poll ¶
func (sub *Subscription) Poll(handler term.FragmentHandler, fragmentLimit int) int
Poll is the primary receive mechanism on subscription.
func (*Subscription) RegistrationID ¶
func (sub *Subscription) RegistrationID() int64
RegistrationID returns the registration id.
func (*Subscription) RemoveDestination ¶
func (sub *Subscription) RemoveDestination(endpointChannel string) error
RemoveDestination removes a destination manually from a multi-destination Subscription.
func (*Subscription) ResolvedEndpoint ¶
func (sub *Subscription) ResolvedEndpoint() string
ResolvedEndpoint finds the resolved endpoint for the channel. This may be nil if MDS is used and no destination is yet added. The result is simply the first in the list of addresses found if multiple addresses exist
func (*Subscription) StreamID ¶
func (sub *Subscription) StreamID() int32
StreamID returns Stream identity for scoping within the channel media address.
func (*Subscription) TryResolveChannelEndpointPort ¶
func (sub *Subscription) TryResolveChannelEndpointPort() string
TryResolveChannelEndpointPort resolves the channel endpoint and replaces it with the port from the ephemeral range when 0 was provided. If there are no addresses, or if there is more than one, returned from LocalSocketAddresses() then the original channel is returned. If the channel is not ACTIVE, then empty string will be returned.
func (*Subscription) UnavailableImageHandler ¶
func (sub *Subscription) UnavailableImageHandler() UnavailableImageHandler
UnavailableImageHandler returns a callback used to indicate when an Image goes unavailable under this Subscription. The handler may be nil.
type UnavailableCounterHandler ¶
type UnavailableCounterHandler interface {
}UnavailableCounterHandler is for notification of Counters being removed via an Aeron client. Note that this is an interface instead of a function in order to support RemoveAvailableCounterHandler.
type UnavailableImageHandler ¶
type UnavailableImageHandler func(Image)
UnavailableImageHandler is the handler type for image unavailable notification from the media driver
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Provides a transition layer from "github.com/op/go-logging" to "go.uber.org/zap" to simply resolve some reentrancy issues in go-logging.
|
Provides a transition layer from "github.com/op/go-logging" to "go.uber.org/zap" to simply resolve some reentrancy issues in go-logging. |