aeron

package
v0.0.0-...-7da4229 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 2, 2020 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// NotConnected indicates that this Publication is not connected to the driver
	NotConnected int64 = -1
	// BackPressured indicates that sending ring buffer is full
	BackPressured int64 = -2
	// AdminAction indicates that terms needs to be rotated. User should retry the Offer
	AdminAction int64 = -3
	// PublicationClosed indicates that this Publication is closed an no further Offers shall succeed
	PublicationClosed int64 = -4
	// MaxPositionExceeded indicates that ...
	MaxPositionExceeded int64 = -5
)
View Source
const (
	DefaultFragmentAssemblyBufferLength = int32(0)
)
View Source
const (
	ImageClosed int = -1
)

Variables

View Source
var ControlledPollAction = struct {
	/**
	 * Abort the current polling operation and do not advance the position for this fragment.
	 */
	ABORT int

	/**
	 * Break from the current polling operation and commit the position as of the end of the current fragment
	 * being handled.
	 */
	BREAK int

	/**
	 * Continue processing but commit the position as of the end of the current fragment so that
	 * flow control is applied to this point.
	 */
	COMMIT int

	/**
	 * Continue processing taking the same approach as the in fragment_handler_t.
	 */
	CONTINUE int
}{
	1,
	2,
	3,
	4,
}
View Source
var DefaultAeronDir string = "/dev/shm"

DefaultAeronDir is the location of media driver files

View Source
var RegistrationStatus = struct {
	AwaitingMediaDriver   int
	RegisteredMediaDriver int
	ErroredMediaDriver    int
}{
	0,
	1,
	2,
}
View Source
var UserName = os.Getenv("USER")

UserName is used to determine default directory for driver files

Functions

func IsConnectedTo

func IsConnectedTo(sub *Subscription, pub *Publication) bool

IsConnectedTo is a helper function used primarily by tests, which is used within the same process to verify that subscription is connected to a specific publication.

Types

type Aeron

type Aeron struct {
	// contains filtered or unexported fields
}

Aeron is the primary interface to the media driver for managing subscriptions and publications

func Connect

func Connect(ctx *Context) (*Aeron, error)

Connect is the factory method used to create a new instance of Aeron based on Context settings

func (*Aeron) AddExclusivePublication

func (aeron *Aeron) AddExclusivePublication(channel string, streamID int32) chan *Publication

AddExclusivePublication will add a new exclusive publication to the driver. If such publication already exists within ClientConductor the same instance will be returned. Returns a channel, which can be used for either blocking or non-blocking want for media driver confirmation

func (*Aeron) AddPublication

func (aeron *Aeron) AddPublication(channel string, streamID int32) chan *Publication

AddPublication will add a new publication to the driver. If such publication already exists within ClientConductor the same instance will be returned. Returns a channel, which can be used for either blocking or non-blocking want for media driver confirmation

func (*Aeron) AddSubscription

func (aeron *Aeron) AddSubscription(channel string, streamID int32) chan *Subscription

AddSubscription will add a new subscription to the driver. Returns a channel, which can be used for either blocking or non-blocking want for media driver confirmation

func (*Aeron) ClientID

func (aeron *Aeron) ClientID() int64

ClientID returns the client identity that has been allocated for communicating with the media driver.

func (*Aeron) Close

func (aeron *Aeron) Close() error

Close will terminate client conductor and remove all publications and subscriptions from the media driver

func (*Aeron) NextCorrelationID

func (aeron *Aeron) NextCorrelationID() int64

NextCorrelationID generates the next correlation id that is unique for the connected Media Driver. This is useful generating correlation identifiers for pairing requests with responses in a clients own application protocol.

This method is thread safe and will work across processes that all use the same media driver.

type AvailableImageHandler

type AvailableImageHandler func(*Image)

AvailableImageHandler is the handler type for image available notification from the media driver

type ClientConductor

type ClientConductor struct {
	// contains filtered or unexported fields
}

func (*ClientConductor) AddExclusivePublication

func (cc *ClientConductor) AddExclusivePublication(channel string, streamID int32) int64

AddExclusivePublication sends the add publication command through the driver proxy

func (*ClientConductor) AddPublication

func (cc *ClientConductor) AddPublication(channel string, streamID int32) int64

AddPublication sends the add publication command through the driver proxy

func (*ClientConductor) AddSubscription

func (cc *ClientConductor) AddSubscription(channel string, streamID int32) int64

AddSubscription sends the add subscription command through the driver proxy

func (*ClientConductor) Close

func (cc *ClientConductor) Close() error

Close will terminate the Run() goroutine body and close all active publications and subscription. Run() can be restarted in a another goroutine.

func (*ClientConductor) FindPublication

func (cc *ClientConductor) FindPublication(regID int64) *Publication

func (*ClientConductor) FindSubscription

func (cc *ClientConductor) FindSubscription(regID int64) *Subscription

func (*ClientConductor) Init

func (cc *ClientConductor) Init(driverProxy *driver.Proxy, bcast *broadcast.CopyReceiver,
	interServiceTo, driverTo, pubConnectionTo, lingerTo time.Duration, counters *ctr.MetaDataFlyweight) *ClientConductor

Init is the primary initialization method for ClientConductor

func (*ClientConductor) OnAvailableCounter

func (cc *ClientConductor) OnAvailableCounter(correlationID int64, counterID int32)

func (*ClientConductor) OnAvailableImage

func (cc *ClientConductor) OnAvailableImage(streamID int32, sessionID int32, logFilename string, sourceIdentity string,
	subscriberPositionID int32, subsRegID int64, corrID int64)

func (*ClientConductor) OnClientTimeout

func (cc *ClientConductor) OnClientTimeout(clientID int64)

func (*ClientConductor) OnErrorResponse

func (cc *ClientConductor) OnErrorResponse(corrID int64, errorCode int32, errorMessage string)

func (*ClientConductor) OnNewExclusivePublication

func (cc *ClientConductor) OnNewExclusivePublication(streamID int32, sessionID int32, posLimitCounterID int32,
	channelStatusIndicatorID int32, logFileName string, regID int64, origRegID int64)

TODO Implement logic specific to exclusive publications

func (*ClientConductor) OnNewPublication

func (cc *ClientConductor) OnNewPublication(streamID int32, sessionID int32, posLimitCounterID int32,
	channelStatusIndicatorID int32, logFileName string, regID int64, origRegID int64)

func (*ClientConductor) OnOperationSuccess

func (cc *ClientConductor) OnOperationSuccess(corrID int64)

func (*ClientConductor) OnSubscriptionReady

func (cc *ClientConductor) OnSubscriptionReady(correlationID int64, channelStatusIndicatorID int32)

func (*ClientConductor) OnUnavailableCounter

func (cc *ClientConductor) OnUnavailableCounter(correlationID int64, counterID int32)

func (*ClientConductor) OnUnavailableImage

func (cc *ClientConductor) OnUnavailableImage(corrID int64, subscriptionRegistrationID int64)

func (*ClientConductor) Start

func (cc *ClientConductor) Start(idleStrategy idlestrategy.Idler)

Start begins the main execution loop of ClientConductor on a goroutine.

type Context

type Context struct {
	// contains filtered or unexported fields
}

Context configuration options are located here https://github.com/real-logic/Aeron/wiki/Configuration-Options#aeron-client-options

func NewContext

func NewContext() *Context

NewContext creates and initializes new Context for Aeron

func (*Context) AeronDir

func (ctx *Context) AeronDir(dir string) *Context

AeronDir sets the root directory for media driver files

func (*Context) AvailableImageHandler

func (ctx *Context) AvailableImageHandler(handler func(*Image)) *Context

AvailableImageHandler sets an optional callback for available image notifications

func (*Context) CncFileName

func (ctx *Context) CncFileName() string

CncFileName returns the name of the Counters file

func (*Context) ErrorHandler

func (ctx *Context) ErrorHandler(handler func(error)) *Context

ErrorHandler sets the error handler callback

func (*Context) InterServiceTimeout

func (ctx *Context) InterServiceTimeout(to time.Duration) *Context

InterServiceTimeout sets the timeout for client heartbeat

func (*Context) MediaDriverTimeout

func (ctx *Context) MediaDriverTimeout(to time.Duration) *Context

MediaDriverTimeout sets the timeout for keep alives to media driver

func (*Context) PublicationConnectionTimeout

func (ctx *Context) PublicationConnectionTimeout(to time.Duration) *Context

func (*Context) ResourceLingerTimeout

func (ctx *Context) ResourceLingerTimeout(to time.Duration) *Context

ResourceLingerTimeout sets the timeout for resource cleanup after they're released

func (*Context) UnavailableImageHandler

func (ctx *Context) UnavailableImageHandler(handler func(*Image)) *Context

UnavailableImageHandler sets an optional callback for unavailable image notification

type ControlledPollFragmentHandler

type ControlledPollFragmentHandler func(buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header)

type FragmentAssembler

type FragmentAssembler struct {
	// contains filtered or unexported fields
}

FragmentAssember that sits in a chain-of-responsibility pattern that reassembles fragmented messages so that the next handler in the chain only sees whole messages. Unfragmented messages are delegated without copy. Fragmented messages are copied to a temporary buffer for reassembly before delegation.

The Header passed to the delegate on assembling a message will be that of the last fragment.

Session based buffers will be allocated and grown as necessary based on the length of messages to be assembled.

func NewFragmentAssembler

func NewFragmentAssembler(delegate term.FragmentHandler, initialBufferLength int32) *FragmentAssembler

NewFragmentAssembler constructs an adapter to reassemble message fragments and delegate on whole messages.

func (*FragmentAssembler) OnFragment

func (self *FragmentAssembler) OnFragment(
	buffer *atomic.Buffer,
	offset int32,
	length int32,
	header *logbuffer.Header)

OnFragment reassembles and forwards whole messages to the delegate.

type Image

type Image struct {
	// contains filtered or unexported fields
}

func NewImage

func NewImage(sessionID int32, correlationID int64, logBuffers *logbuffer.LogBuffers) *Image

NewImage wraps around provided LogBuffers setting up the structures for polling

func (*Image) Close

func (image *Image) Close() error

Close the image and mappings. The image becomes unusable after closing.

func (*Image) IsClosed

func (image *Image) IsClosed() bool

IsClosed returns whether this image has been closed. No further operations are valid.

func (*Image) Poll

func (image *Image) Poll(handler term.FragmentHandler, fragmentLimit int) int

type ImageList

type ImageList struct {
	// contains filtered or unexported fields
}

ImageList is a helper class to manage list of images atomically without locks

func NewImageList

func NewImageList() *ImageList

NewImageList is a factory method for ImageList

func (*ImageList) Empty

func (l *ImageList) Empty() (oldList []Image)

Empty is a convenience method to reset the contents of the list

func (*ImageList) Get

func (l *ImageList) Get() []Image

Get returns a pointer to the underlying image array loaded atomically

func (*ImageList) Set

func (l *ImageList) Set(imgs []Image)

Set atomically sets the reference to the underlying array

type NewPublicationHandler

type NewPublicationHandler func(string, int32, int32, int64)

NewPublicationHandler is the handler type for new publication notification from the media driver

type NewSubscriptionHandler

type NewSubscriptionHandler func(string, int32, int64)

NewSubscriptionHandler is the handler type for new subscription notification from the media driver

type Position

type Position struct {
	// contains filtered or unexported fields
}

Position is a wrapper for a buffer location of a position counter

func NewPosition

func NewPosition(buffer *atomic.Buffer, id int32) Position

NewPosition is a factory method to create new Position wrappers

type Publication

type Publication struct {
	// contains filtered or unexported fields
}

Publication is a sender structure

func NewPublication

func NewPublication(logBuffers *logbuffer.LogBuffers) *Publication

NewPublication is a factory method create new publications

func (*Publication) Channel

func (pub *Publication) Channel() string

Channel returns the media address for delivery to the channel.

func (*Publication) ChannelStatusID

func (pub *Publication) ChannelStatusID() int32

ChannelStatusID returns the counter used to represent the channel status for this publication.

func (*Publication) Close

func (pub *Publication) Close() error

Close will close this publication with the driver. This is a blocking call.

func (*Publication) InitialTermID

func (pub *Publication) InitialTermID() int32

InitialTermID returns the initial term id assigned when this publication was created. This can be used to determine how many terms have passed since creation.

func (*Publication) IsClosed

func (pub *Publication) IsClosed() bool

IsClosed returns whether this Publication has been closed

func (*Publication) IsConnected

func (pub *Publication) IsConnected() bool

IsConnected returns whether this publication is connected to the driver (not whether it has any Subscriptions)

func (*Publication) IsOriginal

func (pub *Publication) IsOriginal() bool

IsOriginal return true if this instance is the first added otherwise false.

func (*Publication) Offer

func (pub *Publication) Offer(buffer *atomic.Buffer, offset int32, length int32, reservedValueSupplier term.ReservedValueSupplier) int64

Offer is the primary send mechanism on Publication

func (*Publication) OriginalRegistrationID

func (pub *Publication) OriginalRegistrationID() int64

OriginalRegistrationID returns the original registration id.

func (*Publication) RegistrationID

func (pub *Publication) RegistrationID() int64

RegistrationID returns the registration id.

func (*Publication) SessionID

func (pub *Publication) SessionID() int32

SessionID returns the session id for this publication.

func (*Publication) StreamID

func (pub *Publication) StreamID() int32

StreamID returna Stream identity for scoping within the channel media address.

func (*Publication) TryClaim

func (pub *Publication) TryClaim(length int32, bufferClaim *logbuffer.Claim) int64

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription is the object responsible for receiving messages from media driver. It is specific to a channel and stream ID combination.

func NewSubscription

func NewSubscription(conductor *ClientConductor, channel string, registrationID int64, streamID int32) *Subscription

NewSubscription is a factory method to create new subscription to be added to the media driver

func (*Subscription) Close

func (sub *Subscription) Close() error

Close will release all images in this subscription, send command to the driver and block waiting for response from the media driver. Images will be lingered by the ClientConductor.

func (*Subscription) HasImages

func (sub *Subscription) HasImages() bool

HasImages is a helper method checking whether this subscription has any images associated with it.

func (*Subscription) IsClosed

func (sub *Subscription) IsClosed() bool

IsClosed returns whether this subscription has been closed.

func (*Subscription) Poll

func (sub *Subscription) Poll(handler term.FragmentHandler, fragmentLimit int) int

Poll is the primary receive mechanism on subscription.

type UnavailableImageHandler

type UnavailableImageHandler func(*Image)

UnavailableImageHandler is the handler type for image unavailable notification from the media driver

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL