archive

package
v0.0.0-...-8b05ad1
Published: Jun 6, 2024 License: Apache-2.0 Imports: 14 Imported by: 5

README

aeron-go/archive

Implementation of Aeron Archive client in Go.

The Aeron Archive protocol is specified in XML using Simple Binary Encoding (SBE).

Current State

The implementation is the second beta release. The API may still be changed for bug fixes or significant issues.

Design

Guidelines

The structure of the archive client library is heavily based on the Java archive library. It's hoped this will aid comprehension, bug fixing, feature additions etc.

Many design choices are also based upon the golang client library as the archive library is a layering on top of that.

Finally, golang idioms are used where reasonable.

The archive library does not lock. Concurrent calls on an archive client that invoke aeron-archive protocol calls should be externally locked so that only one call is in progress at a time.
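
The external locking described above can be sketched as a thin wrapper that holds a mutex around each call. This is a minimal sketch, not part of the library: recorder, lockedRecorder and fakeRecorder are hypothetical names declared locally so the example runs without the aeron-go module, and only the StartRecording signature is taken from the API listing. The channel and stream values are illustrative.

```go
package main

import (
	"fmt"
	"sync"
)

// recorder is a hypothetical subset of the archive client API, declared
// locally so the sketch compiles without the aeron-go module.
type recorder interface {
	StartRecording(channel string, stream int32, isLocal bool, autoStop bool) (int64, error)
}

// lockedRecorder serialises every call with a mutex so that only one
// protocol call is in progress at a time.
type lockedRecorder struct {
	mu    sync.Mutex
	inner recorder
}

func (l *lockedRecorder) StartRecording(channel string, stream int32, isLocal bool, autoStop bool) (int64, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.inner.StartRecording(channel, stream, isLocal, autoStop)
}

// fakeRecorder stands in for a real archive client and remembers the
// highest number of overlapping calls it has seen.
type fakeRecorder struct {
	guard  sync.Mutex
	active int
	max    int
	nextID int64
}

func (f *fakeRecorder) StartRecording(string, int32, bool, bool) (int64, error) {
	f.guard.Lock()
	f.active++
	if f.active > f.max {
		f.max = f.active
	}
	f.guard.Unlock()

	f.guard.Lock()
	defer f.guard.Unlock()
	f.active--
	f.nextID++
	return f.nextID, nil
}

// maxOverlap issues n concurrent calls through the locked wrapper and
// returns the largest overlap the fake observed.
func maxOverlap(n int) int {
	fake := &fakeRecorder{}
	locked := &lockedRecorder{inner: fake}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, _ = locked.StartRecording("aeron:ipc", 1001, true, false)
		}()
	}
	wg.Wait()
	return fake.max
}

func main() {
	fmt.Println("max overlapping calls:", maxOverlap(16))
}
```

In real code the same mutex would need to guard every protocol call made on a given archive client, not just StartRecording.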

Naming and other choices

Function names used in archive.go, which contains the main API, are based on the Java names so that developers can more easily switch between languages and so that any API documentation is more useful across implementations. Some differences exist due to capitalization requirements, lack of polymorphism, etc.

Function names used in encoders.go and proxy.go are based on the protocol specification. Where the protocol specifies a type that can be naturally represented in golang, the golang type is used where possible until encoding. Examples include the use of bool rather than BooleanType and string over []uint8.

Structure

The archive protocol is largely an RPC mechanism built on top of Aeron. Each Archive instance has its own aeron instance running a proxy (publication/request) and control (subscription/response) pair. This mirrors the Java implementation. The proxy invokes the encoders to marshal packets using SBE.

Additionally there are some asynchronous events that can arrive on a recordingevents subscription. These are not enabled by default to avoid using resources when not required.

Synchronous unlocked API optionally using polling

The implementation provides a synchronous API as the underlying mechanism is largely an RPC mechanism and archive operations are not considered high frequency.

If needed, it is simple in golang to wrap a synchronous API with a channel (see for example aeron.AddSubscription()).
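
One way to wrap a synchronous call with a channel is sketched below. The result type and the async helper are hypothetical names introduced for illustration, and the closures stand in for a blocking archive call that returns (int64, error).

```go
package main

import (
	"errors"
	"fmt"
)

// result carries the two return values of a synchronous call.
type result struct {
	id  int64
	err error
}

// async runs a blocking call in its own goroutine and delivers the outcome
// on a buffered channel, so the goroutine can finish even if the caller
// never reads the result.
func async(call func() (int64, error)) <-chan result {
	out := make(chan result, 1)
	go func() {
		id, err := call()
		out <- result{id: id, err: err}
	}()
	return out
}

func main() {
	// Stand-ins for a blocking archive call such as StartRecording.
	ok := async(func() (int64, error) { return 42, nil })
	failed := async(func() (int64, error) { return 0, errors.New("not connected") })

	r := <-ok
	fmt.Println(r.id, r.err)
	r = <-failed
	fmt.Println(r.id, r.err)
}
```

Because the library does not lock, calls wrapped this way still need to be serialised if they target the same archive client.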

Some asynchronous events do exist (e.g. recording events and heartbeats). To have them delivered, the polling mechanisms RecordingEventsPoll() and PollForErrorResponse() are provided. These may be easily wrapped in a goroutine if desired.
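
A polling goroutine could look roughly like the following. This is a sketch under assumptions: poller is a hypothetical interface that copies the two documented method signatures, fakePoller replaces a real archive client, and the shared mutex reflects the external locking requirement described in the guidelines.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// poller is a hypothetical interface copying the two documented polling
// method signatures so the sketch compiles without the aeron-go module.
type poller interface {
	RecordingEventsPoll() int
	PollForErrorResponse() (int, error)
}

// pollLoop polls until ctx is cancelled. It holds mu while polling so that
// polling never overlaps with other calls guarded by the same mutex, and it
// backs off for the idle duration when no work was done.
func pollLoop(ctx context.Context, mu *sync.Mutex, p poller, idle time.Duration, onErr func(error)) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		mu.Lock()
		events := p.RecordingEventsPoll()
		responses, err := p.PollForErrorResponse()
		mu.Unlock()
		if err != nil {
			onErr(err)
		}
		if events+responses == 0 {
			select {
			case <-ctx.Done():
				return
			case <-time.After(idle):
			}
		}
	}
}

// fakePoller stands in for an archive client and counts poll calls.
type fakePoller struct{ polls int }

func (f *fakePoller) RecordingEventsPoll() int           { f.polls++; return 0 }
func (f *fakePoller) PollForErrorResponse() (int, error) { return 0, nil }

// demo runs the loop briefly against the fake and returns the poll count.
func demo() int {
	var mu sync.Mutex
	fake := &fakePoller{}
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		defer close(done)
		pollLoop(ctx, &mu, fake, time.Millisecond, func(err error) {
			fmt.Println("archive error:", err)
		})
	}()
	time.Sleep(20 * time.Millisecond)
	cancel()
	<-done
	return fake.polls
}

func main() {
	fmt.Println("polls performed:", demo())
}
```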

Examples

Examples are provided for a basic_recording_publisher and basic_replayed_subscriber that interoperate with the Java examples.

Security

Enabling security is done via setting the various auth options. config_test.go and archive_test.go provide an example.

The actual semantics of the security depend upon which authenticator supplier you use. The implementation is tested against secure-logging-archiving-media-driver.

Backlog

  • godoc improvements
  • more testing
  • archive-media-driver mocking/execution
  • test cleanup in the media driver can be problematic
  • Auth should provide some callout mechanism
  • various FIXMEs
  • There seem to be problems if there are multiple archive instances. This is particularly noticeable when calling aeron.Close()

Bigger picture issues

  • Java and C++ poll the counters to determine when a recording has actually started but the counters are not available in go. As a result we use delays and 'hope' which isn't ideal.
  • It would be nice to silence the OnAvailableCounter noise
  • Within aeron-go there are cases of Log.Fatalf(), see for example trying to add a publication on a "bogus" channel.

Release Notes

1.0b3 (in-progress)
  • Add PollForErrorResponse()
  • concurrency improvements by having the library lock around RPCs
1.0b2
  • Handle different archive clients using same channel/stream pairing
  • Provide Subscription.IsConnected() and IsRecordingEventsConnected()
  • Replace go-logging with zap to avoid reentrancy crashes in logging library
  • Improve the logging by downgrading in severity and message tone some warning level messages
  • Fix a race condition looking up correlationIDs on ControlResponse
  • Fix a return code error in StopRecordingById()
  • Fix unused argument in StopRecording()
  • Cosmetic improvements for golint and staticcheck
  • Make the Listeners used for async events be per archive client instead of global

Documentation

Overview

Package archive provides API access to Aeron's archive-media-driver

Index

Constants

const (
	RecordingPositionNull = int64(-1)        // Replay a stream from the start.
	RecordingLengthNull   = int64(-1)        // Replay will follow a live recording
	RecordingLengthMax    = int64(2<<31 - 1) // Replay the whole stream
)

Constant values used to control behaviour of StartReplay
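
The values of these constants can be checked with a small program. The block below redeclares them locally, copied from the listing above, so that it compiles on its own. In Go the shift operator binds tighter than subtraction, so 2<<31 - 1 is evaluated as (2<<31) - 1.

```go
package main

import "fmt"

// Local copies of the constants listed above, so the sketch is self-contained.
const (
	RecordingPositionNull = int64(-1)
	RecordingLengthNull   = int64(-1)
	RecordingLengthMax    = int64(2<<31 - 1)
)

func main() {
	fmt.Println(RecordingPositionNull, RecordingLengthNull, RecordingLengthMax)
	// (2<<31) - 1 is the same value as 2^32 - 1.
	fmt.Println(RecordingLengthMax == 1<<32-1)
}
```

This prints "-1 -1 4294967295" followed by "true".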

const (
	ControlStateError              = -1
	ControlStateNew                = iota
	ControlStateConnectRequestSent = iota
	ControlStateChallenged         = iota
	ControlStateConnectRequestOk   = iota
	ControlStateConnected          = iota
	ControlStateTimedOut           = iota
)

Archive Connection State used internally for connection establishment
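
Because iota is the index of each line within the const block, and ControlStateError occupies index 0, the remaining states number from 1 rather than 0. The block below is a local copy of the declaration above, used only to print the resulting values.

```go
package main

import "fmt"

// Local copy of the connection state constants listed above.
const (
	ControlStateError              = -1
	ControlStateNew                = iota
	ControlStateConnectRequestSent = iota
	ControlStateChallenged         = iota
	ControlStateConnectRequestOk   = iota
	ControlStateConnected          = iota
	ControlStateTimedOut           = iota
)

func main() {
	fmt.Println(
		ControlStateError,
		ControlStateNew,
		ControlStateConnectRequestSent,
		ControlStateChallenged,
		ControlStateConnectRequestOk,
		ControlStateConnected,
		ControlStateTimedOut,
	)
}
```

This prints "-1 1 2 3 4 5 6".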

const (
	RECORDING_ID_OFFSET           = 0
	SESSION_ID_OFFSET             = RECORDING_ID_OFFSET + 8
	SOURCE_IDENTITY_LENGTH_OFFSET = SESSION_ID_OFFSET + 4
	SOURCE_IDENTITY_OFFSET        = SOURCE_IDENTITY_LENGTH_OFFSET + 4
)

const NULL_RECORDING_ID = -1

const RECORDING_POSITION_COUNTER_TYPE_ID = 100

const (
	RecordingIdNullValue = int32(-1)
)

replication flag used for duplication instead of extension, see Replicate and variants
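
The RECORDING_ID_OFFSET, SESSION_ID_OFFSET, SOURCE_IDENTITY_LENGTH_OFFSET and SOURCE_IDENTITY_OFFSET constants above resolve to 0, 8, 12 and 16. The following sketch shows how such offsets could be used, assuming they describe a key buffer holding an int64 recording ID, an int32 session ID and a length-prefixed source identity. The layout interpretation, the little-endian byte order and the counterKey, encodeKey and decodeKey names are assumptions made for illustration.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Local copies of the offsets listed above.
const (
	recordingIDOffset          = 0
	sessionIDOffset            = recordingIDOffset + 8
	sourceIdentityLengthOffset = sessionIDOffset + 4
	sourceIdentityOffset       = sourceIdentityLengthOffset + 4
)

// counterKey is a hypothetical decoded form of the key buffer.
type counterKey struct {
	RecordingID    int64
	SessionID      int32
	SourceIdentity string
}

// encodeKey writes the fields at the documented offsets (little-endian assumed).
func encodeKey(k counterKey) []byte {
	buf := make([]byte, sourceIdentityOffset+len(k.SourceIdentity))
	binary.LittleEndian.PutUint64(buf[recordingIDOffset:], uint64(k.RecordingID))
	binary.LittleEndian.PutUint32(buf[sessionIDOffset:], uint32(k.SessionID))
	binary.LittleEndian.PutUint32(buf[sourceIdentityLengthOffset:], uint32(len(k.SourceIdentity)))
	copy(buf[sourceIdentityOffset:], k.SourceIdentity)
	return buf
}

// decodeKey reads the fields back, checking lengths before slicing.
func decodeKey(buf []byte) (counterKey, error) {
	if len(buf) < sourceIdentityOffset {
		return counterKey{}, errors.New("key buffer too short")
	}
	n := int(binary.LittleEndian.Uint32(buf[sourceIdentityLengthOffset:]))
	if n < 0 || n > len(buf)-sourceIdentityOffset {
		return counterKey{}, errors.New("source identity truncated")
	}
	return counterKey{
		RecordingID:    int64(binary.LittleEndian.Uint64(buf[recordingIDOffset:])),
		SessionID:      int32(binary.LittleEndian.Uint32(buf[sessionIDOffset:])),
		SourceIdentity: string(buf[sourceIdentityOffset : sourceIdentityOffset+n]),
	}, nil
}

func main() {
	buf := encodeKey(counterKey{RecordingID: 9, SessionID: -3, SourceIdentity: "127.0.0.1:40123"})
	key, err := decodeKey(buf)
	fmt.Println(key, err)
}
```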

Variables

var ErrNotConnected = fmt.Errorf("not connected")

Functions

func AddSessionIdToChannel

func AddSessionIdToChannel(channel string, sessionID int32) (string, error)

AddSessionIdToChannel is a utility function to add a session to a channel URI. On failure it will return the original channel and an error.
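
To illustrate the contract described above (return the original channel together with an error on failure), here is a naive sketch. It is not the library's implementation: withSessionID is a hypothetical helper, and it assumes Aeron channel URIs of the form aeron:media?key=value|key=value with a session-id parameter.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// withSessionID is a naive, hypothetical stand-in for AddSessionIdToChannel.
// It appends a session-id parameter to an Aeron channel URI and returns the
// original channel unchanged, plus an error, when it cannot do so.
func withSessionID(channel string, sessionID int32) (string, error) {
	if !strings.HasPrefix(channel, "aeron:") {
		return channel, errors.New("channel URI must start with aeron:")
	}
	if strings.Contains(channel, "session-id=") {
		return channel, errors.New("channel already has a session-id parameter")
	}
	// The first parameter follows '?', later ones are separated by '|'.
	sep := "?"
	if strings.Contains(channel, "?") {
		sep = "|"
	}
	return fmt.Sprintf("%s%ssession-id=%d", channel, sep, sessionID), nil
}

func main() {
	fmt.Println(withSessionID("aeron:udp?endpoint=localhost:20121", 42))
	fmt.Println(withSessionID("aeron:ipc", 42))
	fmt.Println(withSessionID("not-a-channel", 42))
}
```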

func ConnectionControlFragmentHandler

func ConnectionControlFragmentHandler(context *PollContext, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header)

ConnectionControlFragmentHandler is the fragment handler specific to connection handling. This mechanism only allows us to pass results back via global state, which we do in control.State.

func DescriptorFragmentHandler

func DescriptorFragmentHandler(context interface{}, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header)

DescriptorFragmentHandler is used to poll for descriptors (both recording and subscription). The current subscription handler doesn't provide a mechanism for passing a context, so we return data via the control's Results.

func FindCounterIdByRecording

func FindCounterIdByRecording(countersReader *counters.Reader, recordingId int64) int32

FindCounterIdByRecording finds the active counter id for a stream based on the recording id. Returns the counter id if found otherwise NullCounterId

func FindCounterIdBySession

func FindCounterIdBySession(countersReader *counters.Reader, sessionId int32) int32

FindCounterIdBySession finds the active counterID for a stream based on the session id. Returns the counter id if found otherwise NullCounterId

func GetRecordingId

func GetRecordingId(countersReader *counters.Reader, counterId int32) int64

func GetSourceIdentity

func GetSourceIdentity(countersReader *counters.Reader, counterId int32) string

GetSourceIdentity returns the source identity for the recording

func IsRecordingActive

func IsRecordingActive(countersReader *counters.Reader, counterId int32, recordingId int64) bool

IsRecordingActive tells us if the recording counter is still active.

func LoggingAvailableImageListener

func LoggingAvailableImageListener(image aeron.Image)

LoggingAvailableImageListener from underlying aeron (called by default only in DEBUG)

func LoggingErrorListener

func LoggingErrorListener(err error)

LoggingErrorListener is set by default and will report internal failures when returning an error is not possible

func LoggingNewPublicationListener

func LoggingNewPublicationListener(channel string, stream int32, session int32, regID int64)

LoggingNewPublicationListener from underlying aeron (called by default only in DEBUG)

func LoggingNewSubscriptionListener

func LoggingNewSubscriptionListener(channel string, stream int32, correlationID int64)

LoggingNewSubscriptionListener from underlying aeron (called by default only in DEBUG)

func LoggingRecordingEventProgressListener

func LoggingRecordingEventProgressListener(rp *codecs.RecordingProgress)

LoggingRecordingEventProgressListener (called by default only in DEBUG)

func LoggingRecordingEventStartedListener

func LoggingRecordingEventStartedListener(rs *codecs.RecordingStarted)

LoggingRecordingEventStartedListener (called by default only in DEBUG)

func LoggingRecordingEventStoppedListener

func LoggingRecordingEventStoppedListener(rs *codecs.RecordingStopped)

LoggingRecordingEventStoppedListener (called by default only in DEBUG)

func LoggingRecordingSignalListener

func LoggingRecordingSignalListener(rse *codecs.RecordingSignalEvent)

LoggingRecordingSignalListener (called by default only in DEBUG)

func LoggingUnavailableImageListener

func LoggingUnavailableImageListener(image aeron.Image)

LoggingUnavailableImageListener from underlying aeron (called by default only in DEBUG)

func ReplaySessionIdToSessionId

func ReplaySessionIdToSessionId(replaySessionID int64) int32

ReplaySessionIdToSessionId utility function to convert a ReplaySessionID into a session ID

Types

type Archive

type Archive struct {
	Options   *Options                // Configuration options
	SessionID int64                   // Allocated by the archiving media driver
	Proxy     *Proxy                  // For outgoing protocol messages (publish/request)
	Control   *Control                // For incoming protocol messages (subscribe/response)
	Events    *RecordingEventsAdapter // For async recording events (must be enabled)
	Listeners *ArchiveListeners       // Per client event listeners for async callbacks
	// contains filtered or unexported fields
}

Archive is the primary interface to the media driver for managing archiving

func NewArchive

func NewArchive(options *Options, context *aeron.Context) (*Archive, error)

NewArchive is a factory method to create an Archive instance. You may provide your own archive Options, otherwise one will be created from defaults. You may provide your own aeron Context, otherwise one will be created from defaults.

func (*Archive) AddExclusivePublication

func (archive *Archive) AddExclusivePublication(channel string, streamID int32) (*aeron.Publication, error)

AddExclusivePublication will add a new exclusive publication to the driver.

If such a publication already exists within ClientConductor the same instance will be returned.

Returns the publication, or an error on failure.

func (*Archive) AddPublication

func (archive *Archive) AddPublication(channel string, streamID int32) (*aeron.Publication, error)

AddPublication will add a new publication to the driver.

If such a publication already exists within ClientConductor the same instance will be returned.

Returns the publication, or an error on failure.

func (*Archive) AddRecordedPublication

func (archive *Archive) AddRecordedPublication(channel string, stream int32) (*aeron.Publication, error)

AddRecordedPublication adds a publication and sets it up for recording.

This creates a per-session recording which can fail if:
  • Sending the request fails - see error for detail
  • Publication.IsOriginal() is false // FIXME: check semantics

func (*Archive) AddSubscription

func (archive *Archive) AddSubscription(channel string, streamID int32) (*aeron.Subscription, error)

AddSubscription will add a new subscription to the driver.

Returns the subscription, or an error on failure.

func (*Archive) Aeron

func (archive *Archive) Aeron() *aeron.Aeron

Aeron returns the archive's aeron client.

func (*Archive) AeronCncFileName

func (archive *Archive) AeronCncFileName() string

AeronCncFileName returns the name of the Counters file

func (*Archive) AttachSegments

func (archive *Archive) AttachSegments(recordingID int64) (int64, error)

AttachSegments to the beginning of a recording to restore history that was previously detached. Segment files must match the existing recording and join exactly to the start position of the recording they are being attached to.

Returns the count of attached segment files.

func (*Archive) BoundedReplay

func (archive *Archive) BoundedReplay(recordingID int64, position int64, length int64, limitCounterID int32, replayStream int32, replayChannel string) (int64, error)

BoundedReplay to start a replay for a length in bytes of a recording from a position bounded by a position counter.

If the position is RecordingPositionNull (-1) then the stream will be replayed from the start.

If the length is RecordingLengthMax the replay will follow a live recording. Note that the constant is declared as int64(2<<31 - 1), which evaluates to 2^32-1 rather than 2^31-1.

If the length is RecordingLengthNull (-1) the replay will replay the whole stream of unknown length.

The lower 32-bits of the returned value contains the ImageSessionID() of the received replay. All 64-bits are required to uniquely identify the replay when calling StopReplay(). The lower 32-bits can be obtained by casting the int64 value to an int32. See ReplaySessionIdToSessionId() helper.

Returns a ReplaySessionID - the id of the replay session which will be the same as the Image sessionId of the received replay for correlation with the matching channel and stream id in the lower 32 bits
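
The relationship between the 64-bit ReplaySessionID and the 32-bit session ID can be shown with a plain integer conversion. In this sketch lower32 is a hypothetical local stand-in for the ReplaySessionIdToSessionId() helper, and the ID values are made up.

```go
package main

import "fmt"

// lower32 is a hypothetical stand-in for ReplaySessionIdToSessionId: converting
// an int64 variable to int32 keeps only the lower 32 bits.
func lower32(replaySessionID int64) int32 {
	return int32(replaySessionID)
}

func main() {
	// Upper 32 bits hold 7, lower 32 bits hold the session ID 42.
	replaySessionID := int64(7)<<32 | 42
	fmt.Println(lower32(replaySessionID))

	// Session IDs may be negative: lower bits 0xFFFFFFFE decode to -2.
	negative := int64(7)<<32 | int64(uint32(0xFFFFFFFE))
	fmt.Println(lower32(negative))
}
```

This prints 42 and then -2. The full 64-bit value, not the truncated one, is what StopReplay() expects.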

func (*Archive) ClientId

func (archive *Archive) ClientId() int64

ClientId returns the client identity that has been allocated for communicating with the media driver.

func (*Archive) Close

func (archive *Archive) Close() error

Close will terminate client conductor and remove all publications and subscriptions from the media driver

func (*Archive) DeleteDetachedSegments

func (archive *Archive) DeleteDetachedSegments(recordingID int64) (int64, error)

DeleteDetachedSegments which have been previously detached from a recording.

Returns the count of deleted segment files.

func (*Archive) DetachSegments

func (archive *Archive) DetachSegments(recordingID int64, newStartPosition int64) error

DetachSegments from the beginning of a recording up to the provided new start position. The new start position must be first byte position of a segment after the existing start position. It is not possible to detach segments which are active for recording or being replayed.

Returns error on failure, nil on success

func (*Archive) DisableRecordingEvents

func (archive *Archive) DisableRecordingEvents()

DisableRecordingEvents stops recording events flowing

func (*Archive) EnableRecordingEvents

func (archive *Archive) EnableRecordingEvents() error

EnableRecordingEvents starts recording events flowing. Events are returned via the three callbacks, which should be overridden from the default logging listeners defined in the Listeners.

func (*Archive) ExtendRecording

func (archive *Archive) ExtendRecording(recordingID int64, stream int32, sourceLocation codecs.SourceLocationEnum, autoStop bool, channel string) (int64, error)

ExtendRecording to extend an existing non-active recording of a channel and stream pairing.

The channel must be configured for the initial position from which it will be extended.

Returns the subscriptionId of the recording that can be passed to StopRecording()

func (*Archive) FindLastMatchingRecording

func (archive *Archive) FindLastMatchingRecording(minRecordingID int64, sessionID int32, stream int32, channel string) (int64, error)

FindLastMatchingRecording that matches the given criteria.

Returns the RecordingID or RecordingIdNullValue if no match

func (*Archive) GetRecordingPosition

func (archive *Archive) GetRecordingPosition(recordingID int64) (int64, error)

GetRecordingPosition of the position recorded for an active recording.

Returns the recording position or if there are no active recordings then RecordingPositionNull.

func (*Archive) GetStartPosition

func (archive *Archive) GetStartPosition(recordingID int64) (int64, error)

GetStartPosition for a recording.

Return the start position of the recording or (0, error) on failure

func (*Archive) GetStopPosition

func (archive *Archive) GetStopPosition(recordingID int64) (int64, error)

GetStopPosition for a recording.

Return the stop position, or RecordingPositionNull if still active.

func (*Archive) IsRecordingEventsConnected

func (archive *Archive) IsRecordingEventsConnected() (bool, error)

IsRecordingEventsConnected returns true if the recording events subscription is connected.

func (*Archive) KeepAlive

func (archive *Archive) KeepAlive() error

KeepAlive sends a simple packet to the media-driver

Returns error on failure, nil on success

func (*Archive) ListRecording

func (archive *Archive) ListRecording(recordingID int64) (*codecs.RecordingDescriptor, error)

ListRecording will fetch the recording descriptor for a recordingID

Returns a single recording descriptor or nil if there was no match

func (*Archive) ListRecordingSubscriptions

func (archive *Archive) ListRecordingSubscriptions(pseudoIndex int32, subscriptionCount int32, applyStreamID bool, stream int32, channelFragment string) ([]*codecs.RecordingSubscriptionDescriptor, error)

ListRecordingSubscriptions lists the active recording subscriptions in the archive created via StartRecording or ExtendRecording.

Returns a (possibly empty) list of RecordingSubscriptionDescriptors

func (*Archive) ListRecordings

func (archive *Archive) ListRecordings(fromRecordingID int64, recordCount int32) ([]*codecs.RecordingDescriptor, error)

ListRecordings up to recordCount recording descriptors

func (*Archive) ListRecordingsForUri

func (archive *Archive) ListRecordingsForUri(fromRecordingID int64, recordCount int32, channelFragment string, stream int32) ([]*codecs.RecordingDescriptor, error)

ListRecordingsForUri will list up to recordCount recording descriptors, starting at fromRecordingID, for a given channel fragment and stream.

Returns the matching descriptors. If fromRecordingID is greater than the largest known recording ID, no descriptors are returned.

func (*Archive) MigrateSegments

func (archive *Archive) MigrateSegments(recordingID int64, position int64) (int64, error)

MigrateSegments from a source recording and attach them to the beginning of a destination recording.

The source recording must match the destination recording for segment length, term length, mtu length, stream id, plus the stop position and term id of the source must join with the start position of the destination and be on a segment boundary.

The source recording will be effectively truncated back to its start position after the migration.

Returns the count of attached segment files.

func (*Archive) PollForErrorResponse

func (archive *Archive) PollForErrorResponse() (int, error)

PollForErrorResponse polls the response stream for an error, draining the queue.

This may be used to check for errors, to dispatch async events, and to catch up on messages not for this session if for example the same channel and stream are in use by other sessions.

Returns an error if we detect an archive operation in progress and a count of how many messages were consumed

func (*Archive) PurgeRecording

func (archive *Archive) PurgeRecording(recordingID int64) error

PurgeRecording of a stopped recording, i.e. mark recording as Invalid and delete the corresponding segment files. The space in the Catalog will be reclaimed upon compaction.

Returns error on failure, nil on success

func (*Archive) PurgeSegments

func (archive *Archive) PurgeSegments(recordingID int64, newStartPosition int64) (int64, error)

PurgeSegments purges (detaches and deletes) segments from the beginning of a recording up to the provided new start position. The new start position must be the first byte position of a segment after the existing start position. It is not possible to detach segments which are active for recording or being replayed.

Returns the count of deleted segment files.

func (*Archive) RecordingEventsPoll

func (archive *Archive) RecordingEventsPoll() int

RecordingEventsPoll is used to poll for recording events

func (*Archive) Replicate

func (archive *Archive) Replicate(srcRecordingID int64, dstRecordingID int64, srcControlStreamID int32, srcControlChannel string, liveDestination string) (int64, error)

Replicate a recording from a source archive to a destination which can be considered a backup for a primary archive. The source recording will be replayed via the provided replay channel and use the original stream id. If the destination recording id is RecordingIdNullValue (-1) then a new destination recording is created, otherwise the provided destination recording id will be extended. The details of the source recording descriptor will be replicated.

For a source recording that is still active the replay can merge with the live stream and then follow it directly and no longer require the replay from the source. This would require a multicast live destination.

  • srcRecordingID: recording id which must exist in the source archive.
  • dstRecordingID: recording to extend in the destination, otherwise RecordingIdNullValue.
  • srcControlStreamID: remote control stream id for the source archive to instruct the replay on.
  • srcControlChannel: remote control channel for the source archive to instruct the replay on.
  • liveDestination: destination for the live stream if merge is required. nil for no merge.

Returns the replication session id which can be passed to StopReplication()

func (*Archive) Replicate2

func (archive *Archive) Replicate2(srcRecordingID int64, dstRecordingID int64, stopPosition int64, channelTagID int64, srcControlStreamID int32, srcControlChannel string, liveDestination string, replicationChannel string) (int64, error)

Replicate2 will replicate a recording from a source archive to a destination which can be considered a backup for a primary archive. The source recording will be replayed via the provided replay channel and use the original stream id. If the destination recording id is RecordingIdNullValue (-1) then a new destination recording is created, otherwise the provided destination recording id will be extended. The details of the source recording descriptor will be replicated.

For a source recording that is still active the replay can merge with the live stream and then follow it directly and no longer require the replay from the source. This would require a multicast live destination.

  • srcRecordingID: recording id which must exist in the source archive.
  • dstRecordingID: recording to extend in the destination, otherwise RecordingIdNullValue.
  • stopPosition: position to stop the replication. RecordingPositionNull to stop at end of current recording.
  • srcControlStreamID: remote control stream id for the source archive to instruct the replay on.
  • srcControlChannel: remote control channel for the source archive to instruct the replay on.
  • liveDestination: destination for the live stream if merge is required. nil for no merge.
  • replicationChannel: channel over which the replication will occur. Empty or null for default channel.

Returns the replication session id which can be passed to StopReplication()

func (*Archive) SetAeronDir

func (archive *Archive) SetAeronDir(dir string)

SetAeronDir sets the root directory for media driver files

func (*Archive) SetAeronInterServiceTimeout

func (archive *Archive) SetAeronInterServiceTimeout(timeout time.Duration)

SetAeronInterServiceTimeout sets the timeout for client heartbeat

func (*Archive) SetAeronMediaDriverTimeout

func (archive *Archive) SetAeronMediaDriverTimeout(timeout time.Duration)

SetAeronMediaDriverTimeout sets the timeout for keep alives to media driver

func (*Archive) SetAeronPublicationConnectionTimeout

func (archive *Archive) SetAeronPublicationConnectionTimeout(timeout time.Duration)

SetAeronPublicationConnectionTimeout sets the timeout for publications

func (*Archive) SetAeronResourceLingerTimeout

func (archive *Archive) SetAeronResourceLingerTimeout(timeout time.Duration)

SetAeronResourceLingerTimeout sets the timeout for resource cleanup after they're released

func (*Archive) StartRecording

func (archive *Archive) StartRecording(channel string, stream int32, isLocal bool, autoStop bool) (int64, error)

StartRecording a channel/stream

Channels that include sessionId parameters are considered different than channels without sessionIds. If a publication matches both a sessionId specific channel recording and a non-sessionId specific recording, it will be recorded twice.

Returns (subscriptionId, nil) or (0, error) on failure. The subscriptionId can be used in StopRecordingBySubscriptionId()
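
A start/stop pairing based on the returned subscription ID might look like the sketch below. The recordingControl interface copies the two documented method signatures so that the example compiles without the aeron-go module. recordWhile and fakeArchive are hypothetical names, and the channel and stream values are illustrative.

```go
package main

import "fmt"

// recordingControl is a hypothetical interface copying two documented method
// signatures of the archive client.
type recordingControl interface {
	StartRecording(channel string, stream int32, isLocal bool, autoStop bool) (int64, error)
	StopRecordingBySubscriptionId(subscriptionID int64) error
}

// recordWhile starts a recording, runs work, and then stops the recording
// using the subscription ID returned by StartRecording.
func recordWhile(rc recordingControl, channel string, stream int32, work func() error) error {
	subscriptionID, err := rc.StartRecording(channel, stream, true, false)
	if err != nil {
		return fmt.Errorf("start recording: %w", err)
	}
	workErr := work()
	stopErr := rc.StopRecordingBySubscriptionId(subscriptionID)
	if workErr != nil {
		return workErr
	}
	if stopErr != nil {
		return fmt.Errorf("stop recording: %w", stopErr)
	}
	return nil
}

// fakeArchive stands in for a real archive client in this sketch.
type fakeArchive struct {
	next    int64
	stopped []int64
}

func (f *fakeArchive) StartRecording(string, int32, bool, bool) (int64, error) {
	f.next++
	return f.next, nil
}

func (f *fakeArchive) StopRecordingBySubscriptionId(id int64) error {
	f.stopped = append(f.stopped, id)
	return nil
}

func main() {
	fake := &fakeArchive{}
	err := recordWhile(fake, "aeron:ipc", 1001, func() error {
		fmt.Println("publishing while the recording is active")
		return nil
	})
	fmt.Println("stopped subscriptions:", fake.stopped, "error:", err)
}
```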

func (*Archive) StartReplay

func (archive *Archive) StartReplay(recordingID int64, position int64, length int64, replayChannel string, replayStream int32) (int64, error)

StartReplay for a length in bytes of a recording from a position.

If the position is RecordingPositionNull (-1) then the stream will be replayed from the start.

If the length is RecordingLengthMax the replay will follow a live recording. Note that the constant is declared as int64(2<<31 - 1), which evaluates to 2^32-1 rather than 2^31-1.

If the length is RecordingLengthNull (-1) the replay will replay the whole stream of unknown length.

The lower 32-bits of the returned value contains the ImageSessionID() of the received replay. All 64-bits are required to uniquely identify the replay when calling StopReplay(). The lower 32-bits can be obtained by casting the int64 value to an int32. See ReplaySessionIdToSessionId() helper.

Returns a ReplaySessionID - the id of the replay session which will be the same as the Image sessionId of the received replay for correlation with the matching channel and stream id in the lower 32 bits

func (*Archive) StopAllReplays

func (archive *Archive) StopAllReplays(recordingID int64) error

StopAllReplays for a given recordingID

Returns error on failure, nil on success

func (*Archive) StopRecording

func (archive *Archive) StopRecording(channel string, stream int32) error

StopRecording by Channel and Stream

Channels that include sessionId parameters are considered different than channels without sessionIds. Stopping recording on a channel without a sessionId parameter will not stop the recording of any sessionId specific recordings that use the same channel and streamID.

func (*Archive) StopRecordingByIdentity

func (archive *Archive) StopRecordingByIdentity(recordingID int64) (bool, error)

StopRecordingByIdentity for the RecordingIdentity looked up in ListRecording*()

Returns true if the recording was stopped, false if the recording is not currently active, and (false, error) if something went wrong

func (*Archive) StopRecordingByPublication

func (archive *Archive) StopRecordingByPublication(publication aeron.Publication) error

StopRecordingByPublication to stop recording a sessionId specific recording that pertains to the given Publication

Returns error on failure, nil on success

func (*Archive) StopRecordingBySubscriptionId

func (archive *Archive) StopRecordingBySubscriptionId(subscriptionID int64) error

StopRecordingBySubscriptionId as returned by StartRecording

Channels that include sessionId parameters are considered different than channels without sessionIds. Stopping recording on a channel without a sessionId parameter will not stop the recording of any sessionId specific recordings that use the same channel and streamID.

Returns error on failure, nil on success

func (*Archive) StopReplay

func (archive *Archive) StopReplay(replaySessionID int64) error

StopReplay for a session.

Returns error on failure, nil on success

func (*Archive) StopReplication

func (archive *Archive) StopReplication(replicationID int64) error

StopReplication of a replication request

Returns error on failure, nil on success

func (*Archive) TaggedReplicate

func (archive *Archive) TaggedReplicate(srcRecordingID int64, dstRecordingID int64, channelTagID int64, subscriptionTagID int64, srcControlStreamID int32, srcControlChannel string, liveDestination string) (int64, error)

TaggedReplicate to replicate a recording from a source archive to a destination which can be considered a backup for a primary archive. The source recording will be replayed via the provided replay channel and use the original stream id. If the destination recording id is RecordingIdNullValue (-1) then a new destination recording is created, otherwise the provided destination recording id will be extended. The details of the source recording descriptor will be replicated.

The subscription used in the archive will be tagged with the provided tags. For a source recording that is still active the replay can merge with the live stream and then follow it directly and no longer require the replay from the source. This would require a multicast live destination.

  • srcRecordingID: recording id which must exist in the source archive.
  • dstRecordingID: recording to extend in the destination, otherwise RecordingIdNullValue.
  • channelTagID: used to tag the replication subscription.
  • subscriptionTagID: used to tag the replication subscription.
  • srcControlStreamID: remote control stream id for the source archive to instruct the replay on.
  • srcControlChannel: remote control channel for the source archive to instruct the replay on.
  • liveDestination: destination for the live stream if merge is required. nil for no merge.

Returns the replication session id which can be passed to StopReplication()

func (*Archive) TruncateRecording

func (archive *Archive) TruncateRecording(recordingID int64, position int64) error

TruncateRecording truncates a stopped recording to a given position that is less than the stopped position. The provided position must be on a fragment boundary. Truncating a recording to the start position effectively deletes the recording. If the truncate operation will result in deleting segments then this will occur asynchronously. Before extending a truncated recording which has segments being asynchronously deleted, you should await completion via the RecordingSignal Delete.

Returns nil on success, error on failure

type ArchiveListeners

type ArchiveListeners struct {
	// Called on errors for things like uncorrelated control messages
	ErrorListener func(error)

	// Async protocol events if enabled
	RecordingEventStartedListener  func(*codecs.RecordingStarted)
	RecordingEventProgressListener func(*codecs.RecordingProgress)
	RecordingEventStoppedListener  func(*codecs.RecordingStopped)

	// Async protocol event
	RecordingSignalListener func(*codecs.RecordingSignalEvent)

	// Async events from the underlying Aeron instance
	NewSubscriptionListener  func(string, int32, int64)
	NewPublicationListener   func(string, int32, int32, int64)
	AvailableImageListener   func(aeron.Image)
	UnavailableImageListener func(aeron.Image)
}

ArchiveListeners contains all the callbacks. By default only the ErrorListener is set to a logging listener. If "archive" is at loglevel DEBUG then logging listeners are set for all listeners.

The signal listener will be called in normal operation if set.

The image listeners will be called in normal operation if set.

The RecordingEvent listeners require EnableRecordingEvents() to be called, as well as having RecordingEventsPoll() called by user code.

type CodecIds

type CodecIds struct {
	// contains filtered or unexported fields
}

CodecIds stops us allocating every object when we need only one. Arguably SBE should give us a static value.

type Control

type Control struct {
	Subscription *aeron.Subscription
	State        controlState

	// Polling results
	Results ControlResults
	// contains filtered or unexported fields
}

Control contains everything required for the archive subscription/response side

func (*Control) Poll

func (control *Control) Poll() (workCount int)

Poll for control response events. Returns number of fragments read during the operation. Zero if no events are available.

func (*Control) PollForDescriptors

func (control *Control) PollForDescriptors(correlationID int64, sessionID int64, fragmentsWanted int32) error

PollForDescriptors polls for recording descriptors, adding them to the set in the control.

func (*Control) PollForErrorResponse

func (control *Control) PollForErrorResponse() (int, error)

PollForErrorResponse polls the response stream for errors or async events.

It will continue until it either receives an error or the queue is empty.

If any control messages are present then they will be discarded so this call should not be used unless there are no outstanding operations.

This may be used to check for errors, to dispatch async events, and to catch up on messages not for this session if for example the same channel and stream are in use by other sessions.

Returns a count of how many messages were consumed, and an error if we detect an archive operation in progress.
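The "continue until an error arrives or the queue is empty" behaviour can be sketched as a drain loop. Everything here is a stand-in for illustration: pollOnce, drain and scriptedPoller are hypothetical names, and the real method polls an Aeron subscription rather than a scripted function.

```go
package main

import (
	"errors"
	"fmt"
)

// pollOnce is a hypothetical stand-in for one poll of the response stream. It
// reports how many messages it consumed and any error response it decoded.
type pollOnce func() (int, error)

// drain keeps polling until an error is seen or the stream is empty, and
// returns the total number of messages consumed along the way.
func drain(poll pollOnce) (int, error) {
	total := 0
	for {
		n, err := poll()
		total += n
		if err != nil {
			return total, err
		}
		if n == 0 {
			return total, nil
		}
	}
}

// scriptedPoller replays the given per-call message counts and then reports an
// empty stream. A negative count stands for a single error response.
func scriptedPoller(counts ...int) pollOnce {
	i := 0
	return func() (int, error) {
		if i >= len(counts) {
			return 0, nil
		}
		n := counts[i]
		i++
		if n < 0 {
			return 1, errors.New("archive error response")
		}
		return n, nil
	}
}

func main() {
	fmt.Println(drain(scriptedPoller(2, 3)))     // stream drained without error
	fmt.Println(drain(scriptedPoller(2, -1, 4))) // stops at the error response
}
```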

func (*Control) PollForResponse

func (control *Control) PollForResponse(correlationID int64, sessionID int64) (int64, error)

PollForResponse polls for a specific correlationID. Returns (relevantId, nil) on success, (0 or relevantId, error) on failure. More complex responses are contained in Control.Results.ControlResponse after the call.
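Waiting for the response to one specific request amounts to a poll loop that skips responses carrying other correlation IDs and gives up after a timeout. The sketch below shows that idea under simplifying assumptions: response, awaitResponse and the next function are hypothetical, session matching is omitted, and a fixed one millisecond sleep stands in for a configurable idle strategy.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// response is a hypothetical, simplified control response.
type response struct {
	correlationID int64
	relevantID    int64
}

// awaitResponse polls next until a response with the wanted correlation ID
// arrives or the timeout expires. next reports false when nothing is available.
func awaitResponse(next func() (response, bool), wanted int64, timeout time.Duration) (int64, error) {
	deadline := time.Now().Add(timeout)
	for {
		r, ok := next()
		switch {
		case ok && r.correlationID == wanted:
			return r.relevantID, nil
		case ok:
			continue // belongs to another request, so skip it
		case time.Now().After(deadline):
			return 0, errors.New("timed out waiting for control response")
		}
		time.Sleep(time.Millisecond) // idle briefly before polling again
	}
}

func main() {
	queue := []response{{correlationID: 1, relevantID: 10}, {correlationID: 2, relevantID: 20}}
	next := func() (response, bool) {
		if len(queue) == 0 {
			return response{}, false
		}
		r := queue[0]
		queue = queue[1:]
		return r, true
	}
	id, err := awaitResponse(next, 2, time.Second)
	fmt.Println(id, err)
}
```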

type ControlResults

type ControlResults struct {
	CorrelationId                    int64
	ControlResponse                  *codecs.ControlResponse
	RecordingDescriptors             []*codecs.RecordingDescriptor
	RecordingSubscriptionDescriptors []*codecs.RecordingSubscriptionDescriptor
	IsPollComplete                   bool
	FragmentsReceived                int
	ErrorResponse                    error // Used by PollForErrorResponse
}

ControlResults holds state over a Control request/response. The polling mechanism is not parameterized, so we need to set state for the results as we go. These pieces are filled out by various ResponsePollers, which will set IsPollComplete to true.

type FragmentHandlerWithListeners

type FragmentHandlerWithListeners func(listeners *ArchiveListeners, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header)

FragmentHandlerWithListeners provides a FragmentHandler with ArchiveListeners

type Options

type Options struct {
	RequestChannel         string             // [init] Control request publication channel
	RequestStream          int32              // [init] and stream
	ResponseChannel        string             // [init] Control response subscription channel
	ResponseStream         int32              // [init] and stream
	RecordingEventsChannel string             // [enable] Recording progress events
	RecordingEventsStream  int32              // [enable] and stream
	ArchiveLoglevel        zapcore.Level      // [runtime] via logging.SetLevel()
	AeronLoglevel          zapcore.Level      // [runtime] via logging.SetLevel()
	Timeout                time.Duration      // [runtime] How long to try sending/receiving control messages
	IdleStrategy           idlestrategy.Idler // [runtime] Idlestrategy for sending/receiving control messages
	RangeChecking          bool               // [runtime] archive protocol marshalling checks
	AuthEnabled            bool               // [init] enable to require AuthConnect() over Connect()
	AuthCredentials        []uint8            // [init] The credentials to be provided to AuthConnect()
	AuthChallenge          []uint8            // [init] The challenge string we are to expect (checked iff not nil)
	AuthResponse           []uint8            // [init] The challengeResponse we should provide
}

Options are set by NewArchive() with the default options specified below.

Some options may be changed dynamically by setting their values in Archive.Context.Options.*. Those attributes marked [compile] must be changed at compile time. Those attributes marked [init] must be changed before calling ArchiveConnect(). Those attributes marked [runtime] may be changed at any time. Those attributes marked [enable] may be changed when the feature is not enabled.

func DefaultOptions

func DefaultOptions() *Options

DefaultOptions creates and returns a new Options from the defaults.
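Because a new Options value is returned on each call, a caller can adjust its own copy without affecting the defaults seen by anyone else. The sketch below illustrates that copy-on-return pattern with a hypothetical two-field options type; the channel and timeout values are placeholders chosen for the example, not the package's actual defaults.

```go
package main

import (
	"fmt"
	"time"
)

// options is a hypothetical, cut-down stand-in for Options.
type options struct {
	RequestChannel string        // [init] must be set before connecting
	Timeout        time.Duration // [runtime] may be changed at any time
}

// defaults holds placeholder values for this example only.
var defaults = options{
	RequestChannel: "aeron:udp?endpoint=localhost:8010",
	Timeout:        5 * time.Second,
}

// defaultOptions returns a pointer to a fresh copy of the defaults, so
// changes made by one caller are not visible to another.
func defaultOptions() *options {
	o := defaults
	return &o
}

func main() {
	mine := defaultOptions()
	mine.Timeout = 30 * time.Second // adjust this copy only

	fresh := defaultOptions()
	fmt.Println(mine.Timeout, fresh.Timeout)
}
```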

type PollContext

type PollContext struct {
	// contains filtered or unexported fields
}

PollContext contains the information we'll need in the image Poll() callback to match against our request or for async events to invoke the appropriate listener

type Proxy

type Proxy struct {
	Publication *aeron.Publication
	// contains filtered or unexported fields
}

Proxy class for encapsulating encoding and sending of control protocol messages to an archive

func (*Proxy) AttachSegmentsRequest

func (proxy *Proxy) AttachSegmentsRequest(correlationID int64, recordingID int64) error

AttachSegmentsRequest packet and offer

func (*Proxy) AuthConnectRequest

func (proxy *Proxy) AuthConnectRequest(correlationID int64, responseStream int32, responseChannel string, encodedCredentials []uint8) error

AuthConnectRequest packet and offer

func (*Proxy) BoundedReplayRequest

func (proxy *Proxy) BoundedReplayRequest(correlationID int64, recordingID int64, position int64, length int64, limitCounterID int32, replayStream int32, replayChannel string) error

BoundedReplayRequest packet and offer

func (*Proxy) CatalogHeaderRequest

func (proxy *Proxy) CatalogHeaderRequest(version int32, length int32, nextRecordingID int64, alignment int32) error

CatalogHeaderRequest packet and offer

func (*Proxy) ChallengeResponse

func (proxy *Proxy) ChallengeResponse(correlationID int64, encodedCredentials []uint8) error

ChallengeResponse packet and offer

func (*Proxy) CloseSessionRequest

func (proxy *Proxy) CloseSessionRequest() error

CloseSessionRequest packet and offer

func (*Proxy) ConnectRequest

func (proxy *Proxy) ConnectRequest(correlationID int64, responseStream int32, responseChannel string) error

ConnectRequest packet and offer

func (*Proxy) DeleteDetachedSegmentsRequest

func (proxy *Proxy) DeleteDetachedSegmentsRequest(correlationID int64, recordingID int64) error

DeleteDetachedSegmentsRequest packet and offer

func (*Proxy) DetachSegmentsRequest

func (proxy *Proxy) DetachSegmentsRequest(correlationID int64, recordingID int64, newStartPosition int64) error

DetachSegmentsRequest packet and offer

func (*Proxy) ExtendRecordingRequest

func (proxy *Proxy) ExtendRecordingRequest(correlationID int64, recordingID int64, stream int32, sourceLocation codecs.SourceLocationEnum, autoStop bool, channel string) error

ExtendRecordingRequest packet and offer. Uses the more recent protocol addition ExtendRecordingRequest2 which added autoStop.

func (*Proxy) FindLastMatchingRecordingRequest

func (proxy *Proxy) FindLastMatchingRecordingRequest(correlationID int64, minRecordingID int64, sessionID int32, stream int32, channel string) error

FindLastMatchingRecordingRequest packet and offer

func (*Proxy) KeepAliveRequest

func (proxy *Proxy) KeepAliveRequest(correlationID int64) error

KeepAliveRequest packet and offer

func (*Proxy) ListRecordingRequest

func (proxy *Proxy) ListRecordingRequest(correlationID int64, fromRecordingID int64) error

ListRecordingRequest packet and offer. Retrieves a recording descriptor for a specific recordingID.

func (*Proxy) ListRecordingSubscriptionsRequest

func (proxy *Proxy) ListRecordingSubscriptionsRequest(correlationID int64, pseudoIndex int32, subscriptionCount int32, applyStreamID bool, stream int32, channel string) error

ListRecordingSubscriptionsRequest packet and offer

func (*Proxy) ListRecordingsForUriRequest

func (proxy *Proxy) ListRecordingsForUriRequest(correlationID int64, fromRecordingID int64, recordCount int32, stream int32, channel string) error

ListRecordingsForUriRequest packet and offer. Lists up to recordCount recordings that match the channel and stream.

func (*Proxy) ListRecordingsRequest

func (proxy *Proxy) ListRecordingsRequest(correlationID int64, fromRecordingID int64, recordCount int32) error

ListRecordingsRequest packet and offer. Lists up to recordCount recordings starting at fromRecordingID.

func (*Proxy) MigrateSegmentsRequest

func (proxy *Proxy) MigrateSegmentsRequest(correlationID int64, srcRecordingID int64, destRecordingID int64) error

MigrateSegmentsRequest packet and offer

func (*Proxy) Offer

func (proxy *Proxy) Offer(buffer *atomic.Buffer, offset int32, length int32, reservedValueSupplier term.ReservedValueSupplier) int64

Offer offers to our request publication, retrying to allow time for image establishment, some back pressure, etc.
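A retrying offer of this kind can be sketched as a loop that retries only on transient results and gives up once a timeout has passed. This is an illustration under stated assumptions, not the package's implementation: offerWithRetry is a hypothetical helper, the offer and idle functions are stand-ins, and the three constants mirror the usual Aeron publication result codes for "not connected", "back pressured" and "admin action".

```go
package main

import (
	"fmt"
	"time"
)

// Transient offer results, assumed to mirror Aeron's publication result codes.
const (
	notConnected  int64 = -1
	backPressured int64 = -2
	adminAction   int64 = -3
)

// offerWithRetry calls offer until it returns something other than a transient
// result, or until the timeout expires, idling between attempts.
func offerWithRetry(offer func() int64, timeout time.Duration, idle func()) int64 {
	deadline := time.Now().Add(timeout)
	for {
		result := offer()
		switch result {
		case notConnected, backPressured, adminAction:
			if time.Now().After(deadline) {
				return result // give up and report the last transient result
			}
			idle()
		default:
			return result // success (a position) or a permanent failure
		}
	}
}

func main() {
	attempts := 0
	offer := func() int64 {
		attempts++
		if attempts < 3 {
			return backPressured // the first two attempts are pushed back
		}
		return 128 // new stream position on success
	}
	result := offerWithRetry(offer, time.Second, func() { time.Sleep(time.Millisecond) })
	fmt.Println(result, attempts)
}
```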

func (*Proxy) PurgeRecordingRequest

func (proxy *Proxy) PurgeRecordingRequest(correlationID int64, replaySessionID int64) error

PurgeRecordingRequest packet and offer

func (*Proxy) PurgeSegmentsRequest

func (proxy *Proxy) PurgeSegmentsRequest(correlationID int64, recordingID int64, newStartPosition int64) error

PurgeSegmentsRequest packet and offer

func (*Proxy) RecordingPositionRequest

func (proxy *Proxy) RecordingPositionRequest(correlationID int64, recordingID int64) error

RecordingPositionRequest packet and offer

func (*Proxy) ReplayRequest

func (proxy *Proxy) ReplayRequest(correlationID int64, recordingID int64, position int64, length int64, replayChannel string, replayStream int32) error

ReplayRequest packet and offer

func (*Proxy) ReplicateRequest

func (proxy *Proxy) ReplicateRequest(correlationID int64, srcRecordingID int64, dstRecordingID int64, srcControlStreamID int32, srcControlChannel string, liveDestination string) error

ReplicateRequest packet and offer

func (*Proxy) ReplicateRequest2

func (proxy *Proxy) ReplicateRequest2(correlationID int64, srcRecordingID int64, dstRecordingID int64, stopPosition int64, channelTagID int64, srcControlStreamID int32, srcControlChannel string, liveDestination string, replicationChannel string) error

ReplicateRequest2 packet and offer

func (*Proxy) StartPositionRequest

func (proxy *Proxy) StartPositionRequest(correlationID int64, recordingID int64) error

StartPositionRequest packet and offer

func (*Proxy) StartRecordingRequest

func (proxy *Proxy) StartRecordingRequest(correlationID int64, stream int32, isLocal bool, autoStop bool, channel string) error

StartRecordingRequest packet and offer. Uses the more recent protocol addition StartRecordingRequest2 which added autoStop.

func (*Proxy) StopAllReplaysRequest

func (proxy *Proxy) StopAllReplaysRequest(correlationID int64, recordingID int64) error

StopAllReplaysRequest packet and offer

func (*Proxy) StopPositionRequest

func (proxy *Proxy) StopPositionRequest(correlationID int64, recordingID int64) error

StopPositionRequest packet and offer

func (*Proxy) StopRecordingByIdentityRequest

func (proxy *Proxy) StopRecordingByIdentityRequest(correlationID int64, recordingID int64) error

StopRecordingByIdentityRequest packet and offer

func (*Proxy) StopRecordingRequest

func (proxy *Proxy) StopRecordingRequest(correlationID int64, stream int32, channel string) error

StopRecordingRequest packet and offer

func (*Proxy) StopRecordingSubscriptionRequest

func (proxy *Proxy) StopRecordingSubscriptionRequest(correlationID int64, subscriptionID int64) error

StopRecordingSubscriptionRequest packet and offer

func (*Proxy) StopReplayRequest

func (proxy *Proxy) StopReplayRequest(correlationID int64, replaySessionID int64) error

StopReplayRequest packet and offer

func (*Proxy) StopReplicationRequest

func (proxy *Proxy) StopReplicationRequest(correlationID int64, replicationID int64) error

StopReplicationRequest packet and offer

func (*Proxy) TaggedReplicateRequest

func (proxy *Proxy) TaggedReplicateRequest(correlationID int64, srcRecordingID int64, dstRecordingID int64, channelTagID int64, subscriptionTagID int64, srcControlStreamID int32, srcControlChannel string, liveDestination string) error

TaggedReplicateRequest packet and offer

func (*Proxy) TruncateRecordingRequest

func (proxy *Proxy) TruncateRecordingRequest(correlationID int64, recordingID int64, position int64) error

TruncateRecordingRequest packet and offer

type RecordingEventsAdapter

type RecordingEventsAdapter struct {
	Subscription *aeron.Subscription
	Enabled      bool
	// contains filtered or unexported fields
}

RecordingEventsAdapter is used to poll for recording events on a subscription.

func (*RecordingEventsAdapter) PollWithContext

func (rea *RecordingEventsAdapter) PollWithContext(handler FragmentHandlerWithListeners, fragmentLimit int) int

PollWithContext polls the aeron subscription handler. If you pass it a nil handler it will use the builtin and call the Listeners. If you ask for 0 fragments it will only return one fragment (if available).
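The two defaulting rules described for PollWithContext (a nil handler falls back to the builtin, and a fragment limit of 0 is treated as 1) can be sketched as follows. The adapter type, its string fragments and the poll method are hypothetical stand-ins; the real method polls an Aeron subscription and passes buffers and headers to the handler.

```go
package main

import "fmt"

// fragmentHandler is a hypothetical, simplified handler that receives one
// fragment at a time.
type fragmentHandler func(fragment string)

// adapter is a stand-in for a recording events adapter with queued fragments.
type adapter struct {
	pending []string
	builtin fragmentHandler
}

// poll delivers up to fragmentLimit pending fragments and returns how many
// were delivered. A nil handler selects the builtin one and a limit of zero
// is treated as one.
func (a *adapter) poll(handler fragmentHandler, fragmentLimit int) int {
	if handler == nil {
		handler = a.builtin
	}
	if fragmentLimit == 0 {
		fragmentLimit = 1
	}
	count := 0
	for count < fragmentLimit && len(a.pending) > 0 {
		handler(a.pending[0])
		a.pending = a.pending[1:]
		count++
	}
	return count
}

func main() {
	a := &adapter{
		pending: []string{"started", "progress", "stopped"},
		builtin: func(f string) { fmt.Println("builtin:", f) },
	}
	fmt.Println(a.poll(nil, 0))  // builtin handler, one fragment
	fmt.Println(a.poll(nil, 10)) // builtin handler, the remaining fragments
}
```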

Directories

Path Synopsis
Package codecs contains the archive protocol packet encoding and decoding
basic_recording_publisher
An example recorded publisher
basic_replay_merge
An example replayed subscriber
basic_replayed_subscriber
An example replayed subscriber
