Documentation ¶
Overview ¶
Package archive provides API access to Aeron's archive-media-driver
Index ¶
- Constants
- Variables
- func AddSessionIdToChannel(channel string, sessionID int32) (string, error)
- func ConnectionControlFragmentHandler(context *PollContext, buffer *atomic.Buffer, offset int32, length int32, ...)
- func DescriptorFragmentHandler(context interface{}, buffer *atomic.Buffer, offset int32, length int32, ...)
- func FindCounterIdByRecording(countersReader *counters.Reader, recordingId int64) int32
- func FindCounterIdBySession(countersReader *counters.Reader, sessionId int32) int32
- func GetRecordingId(countersReader *counters.Reader, counterId int32) int64
- func GetSourceIdentity(countersReader *counters.Reader, counterId int32) string
- func IsRecordingActive(countersReader *counters.Reader, counterId int32, recordingId int64) bool
- func LoggingAvailableImageListener(image aeron.Image)
- func LoggingErrorListener(err error)
- func LoggingNewPublicationListener(channel string, stream int32, session int32, regID int64)
- func LoggingNewSubscriptionListener(channel string, stream int32, correlationID int64)
- func LoggingRecordingEventProgressListener(rp *codecs.RecordingProgress)
- func LoggingRecordingEventStartedListener(rs *codecs.RecordingStarted)
- func LoggingRecordingEventStoppedListener(rs *codecs.RecordingStopped)
- func LoggingRecordingSignalListener(rse *codecs.RecordingSignalEvent)
- func LoggingUnavailableImageListener(image aeron.Image)
- func ReplaySessionIdToSessionId(replaySessionID int64) int32
- type Archive
- func (archive *Archive) AddExclusivePublication(channel string, streamID int32) (*aeron.Publication, error)
- func (archive *Archive) AddPublication(channel string, streamID int32) (*aeron.Publication, error)
- func (archive *Archive) AddRecordedPublication(channel string, stream int32) (*aeron.Publication, error)
- func (archive *Archive) AddSubscription(channel string, streamID int32) (*aeron.Subscription, error)
- func (archive *Archive) Aeron() *aeron.Aeron
- func (archive *Archive) AeronCncFileName() string
- func (archive *Archive) AttachSegments(recordingID int64) (int64, error)
- func (archive *Archive) BoundedReplay(recordingID int64, position int64, length int64, limitCounterID int32, ...) (int64, error)
- func (archive *Archive) ClientId() int64
- func (archive *Archive) Close() error
- func (archive *Archive) DeleteDetachedSegments(recordingID int64) (int64, error)
- func (archive *Archive) DetachSegments(recordingID int64, newStartPosition int64) error
- func (archive *Archive) DisableRecordingEvents()
- func (archive *Archive) EnableRecordingEvents() error
- func (archive *Archive) ExtendRecording(recordingID int64, stream int32, sourceLocation codecs.SourceLocationEnum, ...) (int64, error)
- func (archive *Archive) FindLastMatchingRecording(minRecordingID int64, sessionID int32, stream int32, channel string) (int64, error)
- func (archive *Archive) GetRecordingPosition(recordingID int64) (int64, error)
- func (archive *Archive) GetStartPosition(recordingID int64) (int64, error)
- func (archive *Archive) GetStopPosition(recordingID int64) (int64, error)
- func (archive *Archive) IsRecordingEventsConnected() (bool, error)
- func (archive *Archive) KeepAlive() error
- func (archive *Archive) ListRecording(recordingID int64) (*codecs.RecordingDescriptor, error)
- func (archive *Archive) ListRecordingSubscriptions(pseudoIndex int32, subscriptionCount int32, applyStreamID bool, stream int32, ...) ([]*codecs.RecordingSubscriptionDescriptor, error)
- func (archive *Archive) ListRecordings(fromRecordingID int64, recordCount int32) ([]*codecs.RecordingDescriptor, error)
- func (archive *Archive) ListRecordingsForUri(fromRecordingID int64, recordCount int32, channelFragment string, stream int32) ([]*codecs.RecordingDescriptor, error)
- func (archive *Archive) MigrateSegments(recordingID int64, position int64) (int64, error)
- func (archive *Archive) PollForErrorResponse() (int, error)
- func (archive *Archive) PurgeRecording(recordingID int64) error
- func (archive *Archive) PurgeSegments(recordingID int64, newStartPosition int64) (int64, error)
- func (archive *Archive) RecordingEventsPoll() int
- func (archive *Archive) Replicate(srcRecordingID int64, dstRecordingID int64, srcControlStreamID int32, ...) (int64, error)
- func (archive *Archive) Replicate2(srcRecordingID int64, dstRecordingID int64, stopPosition int64, ...) (int64, error)
- func (archive *Archive) SetAeronDir(dir string)
- func (archive *Archive) SetAeronInterServiceTimeout(timeout time.Duration)
- func (archive *Archive) SetAeronMediaDriverTimeout(timeout time.Duration)
- func (archive *Archive) SetAeronPublicationConnectionTimeout(timeout time.Duration)
- func (archive *Archive) SetAeronResourceLingerTimeout(timeout time.Duration)
- func (archive *Archive) StartRecording(channel string, stream int32, isLocal bool, autoStop bool) (int64, error)
- func (archive *Archive) StartReplay(recordingID int64, position int64, length int64, replayChannel string, ...) (int64, error)
- func (archive *Archive) StopAllReplays(recordingID int64) error
- func (archive *Archive) StopRecording(channel string, stream int32) error
- func (archive *Archive) StopRecordingByIdentity(recordingID int64) (bool, error)
- func (archive *Archive) StopRecordingByPublication(publication aeron.Publication) error
- func (archive *Archive) StopRecordingBySubscriptionId(subscriptionID int64) error
- func (archive *Archive) StopReplay(replaySessionID int64) error
- func (archive *Archive) StopReplication(replicationID int64) error
- func (archive *Archive) TaggedReplicate(srcRecordingID int64, dstRecordingID int64, channelTagID int64, ...) (int64, error)
- func (archive *Archive) TruncateRecording(recordingID int64, position int64) error
- type ArchiveListeners
- type CodecIds
- type Control
- func (control *Control) Poll() (workCount int)
- func (control *Control) PollForDescriptors(correlationID int64, sessionID int64, fragmentsWanted int32) error
- func (control *Control) PollForErrorResponse() (int, error)
- func (control *Control) PollForResponse(correlationID int64, sessionID int64) (int64, error)
- type ControlResults
- type FragmentHandlerWithListeners
- type Options
- type PollContext
- type Proxy
- func (proxy *Proxy) AttachSegmentsRequest(correlationID int64, recordingID int64) error
- func (proxy *Proxy) AuthConnectRequest(correlationID int64, responseStream int32, responseChannel string, ...) error
- func (proxy *Proxy) BoundedReplayRequest(correlationID int64, recordingID int64, position int64, length int64, ...) error
- func (proxy *Proxy) CatalogHeaderRequest(version int32, length int32, nextRecordingID int64, alignment int32) error
- func (proxy *Proxy) ChallengeResponse(correlationID int64, encodedCredentials []uint8) error
- func (proxy *Proxy) CloseSessionRequest() error
- func (proxy *Proxy) ConnectRequest(correlationID int64, responseStream int32, responseChannel string) error
- func (proxy *Proxy) DeleteDetachedSegmentsRequest(correlationID int64, recordingID int64) error
- func (proxy *Proxy) DetachSegmentsRequest(correlationID int64, recordingID int64, newStartPosition int64) error
- func (proxy *Proxy) ExtendRecordingRequest(correlationID int64, recordingID int64, stream int32, ...) error
- func (proxy *Proxy) FindLastMatchingRecordingRequest(correlationID int64, minRecordingID int64, sessionID int32, stream int32, ...) error
- func (proxy *Proxy) KeepAliveRequest(correlationID int64) error
- func (proxy *Proxy) ListRecordingRequest(correlationID int64, fromRecordingID int64) error
- func (proxy *Proxy) ListRecordingSubscriptionsRequest(correlationID int64, pseudoIndex int32, subscriptionCount int32, ...) error
- func (proxy *Proxy) ListRecordingsForUriRequest(correlationID int64, fromRecordingID int64, recordCount int32, stream int32, ...) error
- func (proxy *Proxy) ListRecordingsRequest(correlationID int64, fromRecordingID int64, recordCount int32) error
- func (proxy *Proxy) MigrateSegmentsRequest(correlationID int64, srcRecordingID int64, destRecordingID int64) error
- func (proxy *Proxy) Offer(buffer *atomic.Buffer, offset int32, length int32, ...) int64
- func (proxy *Proxy) PurgeRecordingRequest(correlationID int64, replaySessionID int64) error
- func (proxy *Proxy) PurgeSegmentsRequest(correlationID int64, recordingID int64, newStartPosition int64) error
- func (proxy *Proxy) RecordingPositionRequest(correlationID int64, recordingID int64) error
- func (proxy *Proxy) ReplayRequest(correlationID int64, recordingID int64, position int64, length int64, ...) error
- func (proxy *Proxy) ReplicateRequest(correlationID int64, srcRecordingID int64, dstRecordingID int64, ...) error
- func (proxy *Proxy) ReplicateRequest2(correlationID int64, srcRecordingID int64, dstRecordingID int64, ...) error
- func (proxy *Proxy) StartPositionRequest(correlationID int64, recordingID int64) error
- func (proxy *Proxy) StartRecordingRequest(correlationID int64, stream int32, isLocal bool, autoStop bool, channel string) error
- func (proxy *Proxy) StopAllReplaysRequest(correlationID int64, recordingID int64) error
- func (proxy *Proxy) StopPositionRequest(correlationID int64, recordingID int64) error
- func (proxy *Proxy) StopRecordingByIdentityRequest(correlationID int64, recordingID int64) error
- func (proxy *Proxy) StopRecordingRequest(correlationID int64, stream int32, channel string) error
- func (proxy *Proxy) StopRecordingSubscriptionRequest(correlationID int64, subscriptionID int64) error
- func (proxy *Proxy) StopReplayRequest(correlationID int64, replaySessionID int64) error
- func (proxy *Proxy) StopReplicationRequest(correlationID int64, replicationID int64) error
- func (proxy *Proxy) TaggedReplicateRequest(correlationID int64, srcRecordingID int64, dstRecordingID int64, ...) error
- func (proxy *Proxy) TruncateRecordingRequest(correlationID int64, recordingID int64, position int64) error
- type RecordingEventsAdapter
Constants ¶
const ( RecordingPositionNull = int64(-1) // Replay a stream from the start. RecordingLengthNull = int64(-1) // Replay will follow a live recording RecordingLengthMax = int64(2<<31 - 1) // Replay the whole stream )
Constant values used to control behaviour of StartReplay
const ( ControlStateError = -1 ControlStateNew = iota ControlStateConnectRequestSent = iota ControlStateChallenged = iota ControlStateConnectRequestOk = iota ControlStateConnected = iota ControlStateTimedOut = iota )
Archive Connection State used internally for connection establishment
const ( RECORDING_ID_OFFSET = 0 SESSION_ID_OFFSET = RECORDING_ID_OFFSET + 8 SOURCE_IDENTITY_LENGTH_OFFSET = SESSION_ID_OFFSET + 4 SOURCE_IDENTITY_OFFSET = SOURCE_IDENTITY_LENGTH_OFFSET + 4 )
const NULL_RECORDING_ID = -1
const RECORDING_POSITION_COUNTER_TYPE_ID = 100
const (
RecordingIdNullValue = int32(-1)
)
replication flag used for duplication instead of extension, see Replicate and variants
Variables ¶
var ErrNotConnected = fmt.Errorf("not connected")
Functions ¶
func AddSessionIdToChannel ¶
AddSessionIdToChannel utility function to add a session to a channel URI. On failure it will return the original and an error.
func ConnectionControlFragmentHandler ¶
func ConnectionControlFragmentHandler(context *PollContext, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header)
ConnectionControlFragmentHandler is the connection handling specific fragment handler. This mechanism only allows us to pass results back via global state which we do in control.State
func DescriptorFragmentHandler ¶
func DescriptorFragmentHandler(context interface{}, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header)
DescriptorFragmentHandler is used to poll for descriptors (both recording and subscription). The current subscription handler doesn't provide a mechanism for passing a context so we return data via the control's Results
func FindCounterIdByRecording ¶
FindCounterIdByRecording finds the active counter id for a stream based on the recording id. Returns the counter id if found otherwise NullCounterId
func FindCounterIdBySession ¶
FindCounterIdBySession finds the active counterID for a stream based on the session id. Returns the counter id if found otherwise NullCounterId
func GetSourceIdentity ¶
GetSourceIdentity returns the source identity for the recording
func IsRecordingActive ¶
IsRecordingActive tells us if the recording counter is still active.
func LoggingAvailableImageListener ¶
LoggingAvailableImageListener from underlying aeron (called by default only in DEBUG)
func LoggingErrorListener ¶
func LoggingErrorListener(err error)
LoggingErrorListener is set by default and will report internal failures when returning an error is not possible
func LoggingNewPublicationListener ¶
LoggingNewPublicationListener from underlying aeron (called by default only in DEBUG)
func LoggingNewSubscriptionListener ¶
LoggingNewSubscriptionListener from underlying aeron (called by default only in DEBUG)
func LoggingRecordingEventProgressListener ¶
func LoggingRecordingEventProgressListener(rp *codecs.RecordingProgress)
LoggingRecordingEventProgressListener (called by default only in DEBUG)
func LoggingRecordingEventStartedListener ¶
func LoggingRecordingEventStartedListener(rs *codecs.RecordingStarted)
LoggingRecordingEventStartedListener (called by default only in DEBUG)
func LoggingRecordingEventStoppedListener ¶
func LoggingRecordingEventStoppedListener(rs *codecs.RecordingStopped)
LoggingRecordingEventStoppedListener (called by default only in DEBUG)
func LoggingRecordingSignalListener ¶
func LoggingRecordingSignalListener(rse *codecs.RecordingSignalEvent)
LoggingRecordingSignalListener (called by default only in DEBUG)
func LoggingUnavailableImageListener ¶
LoggingUnavailableImageListener from underlying aeron (called by default only in DEBUG)
func ReplaySessionIdToSessionId ¶
ReplaySessionIdToSessionId utility function to convert a ReplaySessionID into a session ID
Types ¶
type Archive ¶
type Archive struct { Options *Options // Configuration options SessionID int64 // Allocated by the archiving media driver Proxy *Proxy // For outgoing protocol messages (publish/request) Control *Control // For incoming protocol messages (subscribe/reponse) Events *RecordingEventsAdapter // For async recording events (must be enabled) Listeners *ArchiveListeners // Per client event listeners for async callbacks // contains filtered or unexported fields }
Archive is the primary interface to the media driver for managing archiving
func NewArchive ¶
NewArchive factory method to create an Archive instance. You may provide your own archive Options or otherwise one will be created from defaults. You may provide your own aeron Context or otherwise one will be created from defaults.
func (*Archive) AddExclusivePublication ¶
func (archive *Archive) AddExclusivePublication(channel string, streamID int32) (*aeron.Publication, error)
AddExclusivePublication will add a new exclusive publication to the driver.
If such a publication already exists within ClientConductor the same instance will be returned.
Returns a channel, which can be used for either blocking or non-blocking wait for media driver confirmation
func (*Archive) AddPublication ¶
AddPublication will add a new publication to the driver.
If such a publication already exists within ClientConductor the same instance will be returned.
Returns a channel, which can be used for either blocking or non-blocking wait for media driver confirmation
func (*Archive) AddRecordedPublication ¶
func (archive *Archive) AddRecordedPublication(channel string, stream int32) (*aeron.Publication, error)
AddRecordedPublication to set it up for recording.
This creates a per-session recording which can fail if: Sending the request fails - see error for detail Publication.IsOriginal() is false // FIXME: check semantics
func (*Archive) AddSubscription ¶
func (archive *Archive) AddSubscription(channel string, streamID int32) (*aeron.Subscription, error)
AddSubscription will add a new subscription to the driver.
Returns a channel, which can be used for either blocking or non-blocking wait for media driver confirmation
func (*Archive) AeronCncFileName ¶
AeronCncFileName returns the name of the Counters file
func (*Archive) AttachSegments ¶
AttachSegments to the beginning of a recording to restore history that was previously detached. Segment files must match the existing recording and join exactly to the start position of the recording they are being attached to.
Returns the count of attached segment files.
func (*Archive) BoundedReplay ¶
func (archive *Archive) BoundedReplay(recordingID int64, position int64, length int64, limitCounterID int32, replayStream int32, replayChannel string) (int64, error)
BoundedReplay to start a replay for a length in bytes of a recording from a position bounded by a position counter.
If the position is RecordingPositionNull (-1) then the stream will be replayed from the start.
If the length is RecordingLengthMax (2^32-1) the replay will follow a live recording.
If the length is RecordingLengthNull (-1) the replay will replay the whole stream of unknown length.
The lower 32-bits of the returned value contains the ImageSessionID() of the received replay. All 64-bits are required to uniquely identify the replay when calling StopReplay(). The lower 32-bits can be obtained by casting the int64 value to an int32. See ReplaySessionIdToSessionId() helper.
Returns a ReplaySessionID - the id of the replay session which will be the same as the Image sessionId of the received replay for correlation with the matching channel and stream id in the lower 32 bits
func (*Archive) ClientId ¶
ClientId returns the client identity that has been allocated for communicating with the media driver.
func (*Archive) Close ¶
Close will terminate client conductor and remove all publications and subscriptions from the media driver
func (*Archive) DeleteDetachedSegments ¶
DeleteDetachedSegments which have been previously detached from a recording.
Returns the count of deleted segment files.
func (*Archive) DetachSegments ¶
DetachSegments from the beginning of a recording up to the provided new start position. The new start position must be first byte position of a segment after the existing start position. It is not possible to detach segments which are active for recording or being replayed.
Returns error on failure, nil on success
func (*Archive) DisableRecordingEvents ¶
func (archive *Archive) DisableRecordingEvents()
DisableRecordingEvents stops recording events flowing
func (*Archive) EnableRecordingEvents ¶
EnableRecordingEvents starts recording events flowing. Events are returned via the three callbacks which should be overridden from the default logging listeners defined in the Listeners
func (*Archive) ExtendRecording ¶
func (archive *Archive) ExtendRecording(recordingID int64, stream int32, sourceLocation codecs.SourceLocationEnum, autoStop bool, channel string) (int64, error)
ExtendRecording to extend an existing non-active recording of a channel and stream pairing.
The channel must be configured for the initial position from which it will be extended.
Returns the subscriptionId of the recording that can be passed to StopRecordingBySubscriptionId()
func (*Archive) FindLastMatchingRecording ¶
func (archive *Archive) FindLastMatchingRecording(minRecordingID int64, sessionID int32, stream int32, channel string) (int64, error)
FindLastMatchingRecording that matches the given criteria.
Returns the RecordingID or RecordingIdNullValue if no match
func (*Archive) GetRecordingPosition ¶
GetRecordingPosition of the position recorded for an active recording.
Returns the recording position or if there are no active recordings then RecordingPositionNull.
func (*Archive) GetStartPosition ¶
GetStartPosition for a recording.
Return the start position of the recording or (0, error) on failure
func (*Archive) GetStopPosition ¶
GetStopPosition for a recording.
Return the stop position, or RecordingPositionNull if still active.
func (*Archive) IsRecordingEventsConnected ¶
IsRecordingEventsConnected returns true if the recording events subscription is connected.
func (*Archive) KeepAlive ¶
KeepAlive sends a simple packet to the media-driver
Returns error on failure, nil on success
func (*Archive) ListRecording ¶
func (archive *Archive) ListRecording(recordingID int64) (*codecs.RecordingDescriptor, error)
ListRecording will fetch the recording descriptor for a recordingID
Returns a single recording descriptor or nil if there was no match
func (*Archive) ListRecordingSubscriptions ¶
func (archive *Archive) ListRecordingSubscriptions(pseudoIndex int32, subscriptionCount int32, applyStreamID bool, stream int32, channelFragment string) ([]*codecs.RecordingSubscriptionDescriptor, error)
ListRecordingSubscriptions to list the active recording subscriptions in the archive created via StartRecording or ExtendRecording.
Returns a (possibly empty) list of RecordingSubscriptionDescriptors
func (*Archive) ListRecordings ¶
func (archive *Archive) ListRecordings(fromRecordingID int64, recordCount int32) ([]*codecs.RecordingDescriptor, error)
ListRecordings up to recordCount recording descriptors
func (*Archive) ListRecordingsForUri ¶
func (archive *Archive) ListRecordingsForUri(fromRecordingID int64, recordCount int32, channelFragment string, stream int32) ([]*codecs.RecordingDescriptor, error)
ListRecordingsForUri will list up to recordCount recording descriptors, starting at fromRecordingID, for a given channel and stream.
Returns the descriptors found. If fromRecordingID is greater than the largest known recording id, no descriptors are returned.
func (*Archive) MigrateSegments ¶
MigrateSegments from a source recording and attach them to the beginning of a destination recording.
The source recording must match the destination recording for segment length, term length, mtu length, stream id, plus the stop position and term id of the source must join with the start position of the destination and be on a segment boundary.
The source recording will be effectively truncated back to its start position after the migration.
Returns the count of attached segment files.
func (*Archive) PollForErrorResponse ¶
PollForErrorResponse polls the response stream for an error draining the queue.
This may be used to check for errors, to dispatch async events, and to catch up on messages not for this session if for example the same channel and stream are in use by other sessions.
Returns an error if we detect an archive operation in progress and a count of how many messages were consumed
func (*Archive) PurgeRecording ¶
PurgeRecording of a stopped recording, i.e. mark recording as Invalid and delete the corresponding segment files. The space in the Catalog will be reclaimed upon compaction.
Returns error on failure, nil on success
func (*Archive) PurgeSegments ¶
PurgeSegments (detach and delete) segments from the beginning of a recording up to the provided new start position. The new start position must be first byte position of a segment after the existing start position. It is not possible to detach segments which are active for recording or being replayed.
Returns the count of deleted segment files.
func (*Archive) RecordingEventsPoll ¶
RecordingEventsPoll is used to poll for recording events
func (*Archive) Replicate ¶
func (archive *Archive) Replicate(srcRecordingID int64, dstRecordingID int64, srcControlStreamID int32, srcControlChannel string, liveDestination string) (int64, error)
Replicate a recording from a source archive to a destination which can be considered a backup for a primary archive. The source recording will be replayed via the provided replay channel and use the original stream id. If the destination recording id is RecordingIdNullValue (-1) then a new destination recording is created, otherwise the provided destination recording id will be extended. The details of the source recording descriptor will be replicated.
For a source recording that is still active the replay can merge with the live stream and then follow it directly and no longer require the replay from the source. This would require a multicast live destination.
srcRecordingID recording id which must exist in the source archive. dstRecordingID recording to extend in the destination, otherwise RecordingIdNullValue. srcControlStreamID remote control stream id for the source archive to instruct the replay on. srcControlChannel remote control channel for the source archive to instruct the replay on. liveDestination destination for the live stream if merge is required. nil for no merge.
Returns the replication session id which can be passed to StopReplication()
func (*Archive) Replicate2 ¶
func (archive *Archive) Replicate2(srcRecordingID int64, dstRecordingID int64, stopPosition int64, channelTagID int64, srcControlStreamID int32, srcControlChannel string, liveDestination string, replicationChannel string) (int64, error)
Replicate2 will replicate a recording from a source archive to a destination which can be considered a backup for a primary archive. The source recording will be replayed via the provided replay channel and use the original stream id. If the destination recording id is RecordingIdNullValue (-1) then a new destination recording is created, otherwise the provided destination recording id will be extended. The details of the source recording descriptor will be replicated.
For a source recording that is still active the replay can merge with the live stream and then follow it directly and no longer require the replay from the source. This would require a multicast live destination.
srcRecordingID recording id which must exist in the source archive. dstRecordingID recording to extend in the destination, otherwise RecordingIdNullValue. stopPosition position to stop the replication. RecordingPositionNull to stop at end of current recording. srcControlStreamID remote control stream id for the source archive to instruct the replay on. srcControlChannel remote control channel for the source archive to instruct the replay on. liveDestination destination for the live stream if merge is required. nil for no merge. replicationChannel channel over which the replication will occur. Empty or null for default channel.
Returns the replication session id which can be passed to StopReplication()
func (*Archive) SetAeronDir ¶
SetAeronDir sets the root directory for media driver files
func (*Archive) SetAeronInterServiceTimeout ¶
SetAeronInterServiceTimeout sets the timeout for client heartbeat
func (*Archive) SetAeronMediaDriverTimeout ¶
SetAeronMediaDriverTimeout sets the timeout for keep alives to media driver
func (*Archive) SetAeronPublicationConnectionTimeout ¶
SetAeronPublicationConnectionTimeout sets the timeout for publications
func (*Archive) SetAeronResourceLingerTimeout ¶
SetAeronResourceLingerTimeout sets the timeout for resource cleanup after they're released
func (*Archive) StartRecording ¶
func (archive *Archive) StartRecording(channel string, stream int32, isLocal bool, autoStop bool) (int64, error)
StartRecording a channel/stream
Channels that include sessionId parameters are considered different than channels without sessionIds. If a publication matches both a sessionId specific channel recording and a non-sessionId specific recording, it will be recorded twice.
Returns (subscriptionId, nil) or (0, error) on failure. The SubscriptionId can be used in StopRecordingBySubscriptionId()
func (*Archive) StartReplay ¶
func (archive *Archive) StartReplay(recordingID int64, position int64, length int64, replayChannel string, replayStream int32) (int64, error)
StartReplay for a length in bytes of a recording from a position.
If the position is RecordingPositionNull (-1) then the stream will be replayed from the start.
If the length is RecordingLengthMax (2^32-1) the replay will follow a live recording.
If the length is RecordingLengthNull (-1) the replay will replay the whole stream of unknown length.
The lower 32-bits of the returned value contains the ImageSessionID() of the received replay. All 64-bits are required to uniquely identify the replay when calling StopReplay(). The lower 32-bits can be obtained by casting the int64 value to an int32. See ReplaySessionIdToSessionId() helper.
Returns a ReplaySessionID - the id of the replay session which will be the same as the Image sessionId of the received replay for correlation with the matching channel and stream id in the lower 32 bits
func (*Archive) StopAllReplays ¶
StopAllReplays for a given recordingID
Returns error on failure, nil on success
func (*Archive) StopRecording ¶
StopRecording by Channel and Stream
Channels that include sessionId parameters are considered different than channels without sessionIds. Stopping recording on a channel without a sessionId parameter will not stop the recording of any sessionId specific recordings that use the same channel and streamID.
func (*Archive) StopRecordingByIdentity ¶
StopRecordingByIdentity for the RecordingIdentity looked up in ListRecording*()
Returns true if the recording was stopped, false if the recording is not currently active, and (false, error) if something went wrong
func (*Archive) StopRecordingByPublication ¶
func (archive *Archive) StopRecordingByPublication(publication aeron.Publication) error
StopRecordingByPublication to stop recording a sessionId specific recording that pertains to the given Publication
Returns error on failure, nil on success
func (*Archive) StopRecordingBySubscriptionId ¶
StopRecordingBySubscriptionId as returned by StartRecording
Channels that include sessionId parameters are considered different than channels without sessionIds. Stopping recording on a channel without a sessionId parameter will not stop the recording of any sessionId specific recordings that use the same channel and streamID.
Returns error on failure, nil on success
func (*Archive) StopReplication ¶
StopReplication of a replication request
Returns error on failure, nil on success
func (*Archive) TaggedReplicate ¶
func (archive *Archive) TaggedReplicate(srcRecordingID int64, dstRecordingID int64, channelTagID int64, subscriptionTagID int64, srcControlStreamID int32, srcControlChannel string, liveDestination string) (int64, error)
TaggedReplicate to replicate a recording from a source archive to a destination which can be considered a backup for a primary archive. The source recording will be replayed via the provided replay channel and use the original stream id. If the destination recording id is RecordingIdNullValue (-1) then a new destination recording is created, otherwise the provided destination recording id will be extended. The details of the source recording descriptor will be replicated.
The subscription used in the archive will be tagged with the provided tags. For a source recording that is still active the replay can merge with the live stream and then follow it directly and no longer require the replay from the source. This would require a multicast live destination.
srcRecordingID recording id which must exist in the source archive. dstRecordingID recording to extend in the destination, otherwise RecordingIdNullValue. channelTagID used to tag the replication subscription. subscriptionTagID used to tag the replication subscription. srcControlStreamID remote control stream id for the source archive to instruct the replay on. srcControlChannel remote control channel for the source archive to instruct the replay on. liveDestination destination for the live stream if merge is required. nil for no merge.
Returns the replication session id which can be passed to StopReplication()
func (*Archive) TruncateRecording ¶
TruncateRecording of a stopped recording to a given position that is less than the stopped position. The provided position must be on a fragment boundary. Truncating a recording to the start position effectively deletes the recording. If the truncate operation will result in deleting segments then this will occur asynchronously. Before extending a truncated recording which has segments being asynchronously deleted, you should await completion via the RecordingSignal Delete
Returns nil on success, error on failure
type ArchiveListeners ¶
type ArchiveListeners struct { // Called on errors for things like uncorrelated control messages ErrorListener func(error) // Async protocol events if enabled RecordingEventStartedListener func(*codecs.RecordingStarted) RecordingEventProgressListener func(*codecs.RecordingProgress) RecordingEventStoppedListener func(*codecs.RecordingStopped) // Async protocol event RecordingSignalListener func(*codecs.RecordingSignalEvent) // Async events from the underlying Aeron instance NewSubscriptionListener func(string, int32, int64) NewPublicationListener func(string, int32, int32, int64) AvailableImageListener func(aeron.Image) }
ArchiveListeners contains all the callbacks. By default only the ErrorListener is set to a logging listener. If "archive" is at loglevel DEBUG then logging listeners are set for all listeners.
The signal listener will be called in normal operation if set.
The image listeners will be called in normal operation if set.
The RecordingEvent listeners require RecordingEventEnable() to be called as well as having the RecordingEvent Poll() called by user code.
type CodecIds ¶
type CodecIds struct {
// contains filtered or unexported fields
}
CodecIds stops us allocating every object when we need only one. Arguably SBE should give us a static value.
type Control ¶
type Control struct { Subscription *aeron.Subscription State controlState // Polling results Results ControlResults // contains filtered or unexported fields }
Control contains everything required for the archive subscription/response side
func (*Control) Poll ¶
Poll for control response events. Returns number of fragments read during the operation. Zero if no events are available.
func (*Control) PollForDescriptors ¶
func (control *Control) PollForDescriptors(correlationID int64, sessionID int64, fragmentsWanted int32) error
PollForDescriptors to poll for recording descriptors, adding them to the set in the control
func (*Control) PollForErrorResponse ¶
PollForErrorResponse polls the response stream for errors or async events.
It will continue until it either receives an error or the queue is empty.
If any control messages are present then they will be discarded so this call should not be used unless there are no outstanding operations.
This may be used to check for errors, to dispatch async events, and to catch up on messages not for this session if for example the same channel and stream are in use by other sessions.
Returns an error if we detect an archive operation in progress and a count of how many messages were consumed
func (*Control) PollForResponse ¶
PollForResponse polls for a specific correlationID. Returns (relevantId, nil) on success, (0 or relevantId, error) on failure. More complex responses are contained in Control.ControlResponse after the call.
type ControlResults ¶
type ControlResults struct { CorrelationId int64 ControlResponse *codecs.ControlResponse RecordingDescriptors []*codecs.RecordingDescriptor RecordingSubscriptionDescriptors []*codecs.RecordingSubscriptionDescriptor IsPollComplete bool FragmentsReceived int ErrorResponse error // Used by PollForErrorResponse }
ControlResults for holding state over a Control request/response. The polling mechanism is not parameterized so we need to set state for the results as we go. These pieces are filled out by various ResponsePollers which will set IsPollComplete to true.
type FragmentHandlerWithListeners ¶
type FragmentHandlerWithListeners func(listeners *ArchiveListeners, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header)
FragmentHandlerWithListeners provides a FragmentHandler with ArchiveListeners
type Options ¶
type Options struct { RequestChannel string // [init] Control request publication channel RequestStream int32 // [init] and stream ResponseChannel string // [init] Control response subscription channel ResponseStream int32 // [init] and stream RecordingEventsChannel string // [enable] Recording progress events RecordingEventsStream int32 // [enable] and stream ArchiveLoglevel zapcore.Level // [runtime] via logging.SetLevel() AeronLoglevel zapcore.Level // [runtime] via logging.SetLevel() Timeout time.Duration // [runtime] How long to try sending/receiving control messages IdleStrategy idlestrategy.Idler // [runtime] Idlestrategy for sending/receiving control messages RangeChecking bool // [runtime] archive protocol marshalling checks AuthEnabled bool // [init] enable to require AuthConnect() over Connect() AuthCredentials []uint8 // [init] The credentials to be provided to AuthConnect() AuthChallenge []uint8 // [init] The challenge string we are to expect (checked iff not nil) AuthResponse []uint8 // [init] The challengeResponse we should provide }
Options are set by NewArchive() with the default options specified below
Some options may be changed dynamically by setting their values in Archive.Context.Options.*. Those attributes marked [compile] must be changed at compile time. Those attributes marked [init] must be changed before calling ArchiveConnect(). Those attributes marked [runtime] may be changed at any time. Those attributes marked [enable] may be changed when the feature is not enabled.
func DefaultOptions ¶
func DefaultOptions() *Options
DefaultOptions creates and returns a new Options from the defaults.
type PollContext ¶
type PollContext struct {
// contains filtered or unexported fields
}
PollContext contains the information we'll need in the image Poll() callback to match against our request or for async events to invoke the appropriate listener
type Proxy ¶
type Proxy struct { Publication *aeron.Publication // contains filtered or unexported fields }
Proxy class for encapsulating encoding and sending of control protocol messages to an archive
func (*Proxy) AttachSegmentsRequest ¶
AttachSegmentsRequest packet and offer
func (*Proxy) AuthConnectRequest ¶
func (proxy *Proxy) AuthConnectRequest(correlationID int64, responseStream int32, responseChannel string, encodedCredentials []uint8) error
AuthConnectRequest packet and offer
func (*Proxy) BoundedReplayRequest ¶
func (proxy *Proxy) BoundedReplayRequest(correlationID int64, recordingID int64, position int64, length int64, limitCounterID int32, replayStream int32, replayChannel string) error
BoundedReplayRequest packet and offer
func (*Proxy) CatalogHeaderRequest ¶
func (proxy *Proxy) CatalogHeaderRequest(version int32, length int32, nextRecordingID int64, alignment int32) error
CatalogHeaderRequest packet and offer
func (*Proxy) ChallengeResponse ¶
ChallengeResponse packet and offer
func (*Proxy) CloseSessionRequest ¶
CloseSessionRequest packet and offer
func (*Proxy) ConnectRequest ¶
func (proxy *Proxy) ConnectRequest(correlationID int64, responseStream int32, responseChannel string) error
ConnectRequest packet and offer
func (*Proxy) DeleteDetachedSegmentsRequest ¶
DeleteDetachedSegmentsRequest packet and offer
func (*Proxy) DetachSegmentsRequest ¶
func (proxy *Proxy) DetachSegmentsRequest(correlationID int64, recordingID int64, newStartPosition int64) error
DetachSegmentsRequest packet and offer
func (*Proxy) ExtendRecordingRequest ¶
func (proxy *Proxy) ExtendRecordingRequest(correlationID int64, recordingID int64, stream int32, sourceLocation codecs.SourceLocationEnum, autoStop bool, channel string) error
ExtendRecordingRequest packet and offer. Uses the more recent protocol addition ExtendRecordingRequest2 which added autoStop.
func (*Proxy) FindLastMatchingRecordingRequest ¶
func (proxy *Proxy) FindLastMatchingRecordingRequest(correlationID int64, minRecordingID int64, sessionID int32, stream int32, channel string) error
FindLastMatchingRecordingRequest packet and offer
func (*Proxy) KeepAliveRequest ¶
KeepAliveRequest packet and offer
func (*Proxy) ListRecordingRequest ¶
ListRecordingRequest packet and offer Retrieves a recording descriptor for a specific recordingID
func (*Proxy) ListRecordingSubscriptionsRequest ¶
func (proxy *Proxy) ListRecordingSubscriptionsRequest(correlationID int64, pseudoIndex int32, subscriptionCount int32, applyStreamID bool, stream int32, channel string) error
ListRecordingSubscriptionsRequest packet and offer
func (*Proxy) ListRecordingsForUriRequest ¶
func (proxy *Proxy) ListRecordingsForUriRequest(correlationID int64, fromRecordingID int64, recordCount int32, stream int32, channel string) error
ListRecordingsForUriRequest packet and offer Lists up to recordCount recordings that match the channel and stream
func (*Proxy) ListRecordingsRequest ¶
func (proxy *Proxy) ListRecordingsRequest(correlationID int64, fromRecordingID int64, recordCount int32) error
ListRecordingsRequest packet and offer Lists up to recordCount recordings starting at fromRecordingID
func (*Proxy) MigrateSegmentsRequest ¶
func (proxy *Proxy) MigrateSegmentsRequest(correlationID int64, srcRecordingID int64, destRecordingID int64) error
MigrateSegmentsRequest packet and offer
func (*Proxy) Offer ¶
func (proxy *Proxy) Offer(buffer *atomic.Buffer, offset int32, length int32, reservedValueSupplier term.ReservedValueSupplier) int64
Offer to our request publication with a retry to allow time for the image establishment, some back pressure etc
func (*Proxy) PurgeRecordingRequest ¶
PurgeRecordingRequest packet and offer
func (*Proxy) PurgeSegmentsRequest ¶
func (proxy *Proxy) PurgeSegmentsRequest(correlationID int64, recordingID int64, newStartPosition int64) error
PurgeSegmentsRequest packet and offer
func (*Proxy) RecordingPositionRequest ¶
RecordingPositionRequest packet and offer
func (*Proxy) ReplayRequest ¶
func (proxy *Proxy) ReplayRequest(correlationID int64, recordingID int64, position int64, length int64, replayChannel string, replayStream int32) error
ReplayRequest packet and offer
func (*Proxy) ReplicateRequest ¶
func (proxy *Proxy) ReplicateRequest(correlationID int64, srcRecordingID int64, dstRecordingID int64, srcControlStreamID int32, srcControlChannel string, liveDestination string) error
ReplicateRequest packet and offer
func (*Proxy) ReplicateRequest2 ¶
func (proxy *Proxy) ReplicateRequest2(correlationID int64, srcRecordingID int64, dstRecordingID int64, stopPosition int64, channelTagID int64, srcControlStreamID int32, srcControlChannel string, liveDestination string, replicationChannel string) error
ReplicateRequest2 packet and offer
func (*Proxy) StartPositionRequest ¶
StartPositionRequest packet and offer
func (*Proxy) StartRecordingRequest ¶
func (proxy *Proxy) StartRecordingRequest(correlationID int64, stream int32, isLocal bool, autoStop bool, channel string) error
StartRecordingRequest packet and offer. Uses the more recent protocol addition StartRecordingRequest2 which added autoStop.
func (*Proxy) StopAllReplaysRequest ¶
StopAllReplaysRequest packet and offer
func (*Proxy) StopPositionRequest ¶
StopPositionRequest packet and offer
func (*Proxy) StopRecordingByIdentityRequest ¶
StopRecordingByIdentityRequest packet and offer
func (*Proxy) StopRecordingRequest ¶
StopRecordingRequest packet and offer
func (*Proxy) StopRecordingSubscriptionRequest ¶
func (proxy *Proxy) StopRecordingSubscriptionRequest(correlationID int64, subscriptionID int64) error
StopRecordingSubscriptionRequest packet and offer
func (*Proxy) StopReplayRequest ¶
StopReplayRequest packet and offer
func (*Proxy) StopReplicationRequest ¶
StopReplicationRequest packet and offer
func (*Proxy) TaggedReplicateRequest ¶
func (proxy *Proxy) TaggedReplicateRequest(correlationID int64, srcRecordingID int64, dstRecordingID int64, channelTagID int64, subscriptionTagID int64, srcControlStreamID int32, srcControlChannel string, liveDestination string) error
TaggedReplicateRequest packet and offer
type RecordingEventsAdapter ¶
type RecordingEventsAdapter struct { Subscription *aeron.Subscription Enabled bool // contains filtered or unexported fields }
RecordingEventsAdapter is used to poll for recording events on a subscription.
func (*RecordingEventsAdapter) PollWithContext ¶
func (rea *RecordingEventsAdapter) PollWithContext(handler FragmentHandlerWithListeners, fragmentLimit int) int
PollWithContext the aeron subscription handler. If you pass it a nil handler it will use the builtin and call the Listeners. If you ask for 0 fragments it will only return one fragment (if available).
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package codecs contains the archive protocol packet encoding and decoding
|
Package codecs contains the archive protocol packet encoding and decoding |
basic_recording_publisher
An example recorded publisher
|
An example recorded publisher |
basic_replay_merge
An example replayed subscriber
|
An example replayed subscriber |
basic_replayed_subscriber
An example replayed subscriber
|
An example replayed subscriber |