Documentation
Index ¶
- Constants
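- type ReplayMerge
- func NewReplayMerge(subscription *aeron.Subscription, archive *archive.Archive, replayChannel string, replayDestination string, liveDestination string, recordingId int64, startPosition int64, mergeProgressTimeoutMs int64) (rm *ReplayMerge, err error)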
- func (rm *ReplayMerge) Close()
- func (rm *ReplayMerge) DoWork() (workCount int, err error)
- func (rm *ReplayMerge) HasFailed() bool
- func (rm *ReplayMerge) Image() aeron.Image
- func (rm *ReplayMerge) IsLiveAdded() bool
- func (rm *ReplayMerge) IsMerged() bool
- func (rm *ReplayMerge) Poll(fragmentHandler term.FragmentHandler, fragmentLimit int) (workCount int, err error)
- func (rm *ReplayMerge) Subscription() *aeron.Subscription
- type State
Constants ¶
const LiveAddMaxWindow = int32(32 * 1024 * 1024)
const MergeProgressTimeoutDefaultMs = int64(5 * time.Millisecond)
const ReplayRemoveThreshold = int64(0)
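To make the declared values concrete, the following sketch re-declares the three constants locally (copied from the declarations above rather than imported from the package) and prints what they evaluate to. One point worth noticing: time.Millisecond is a time.Duration counted in nanoseconds, so int64(5 * time.Millisecond) evaluates to 5000000. Callers who want a specific timeout in milliseconds may prefer to pass an explicit mergeProgressTimeoutMs rather than rely on this default.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the declarations shown above, so that the example runs
// without importing the package itself.
const (
	LiveAddMaxWindow              = int32(32 * 1024 * 1024)
	MergeProgressTimeoutDefaultMs = int64(5 * time.Millisecond)
	ReplayRemoveThreshold         = int64(0)
)

func main() {
	fmt.Println(LiveAddMaxWindow)              // 33554432, i.e. 32 MiB
	fmt.Println(MergeProgressTimeoutDefaultMs) // 5000000
	fmt.Println(ReplayRemoveThreshold)         // 0
}
```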
Types ¶
type ReplayMerge ¶
type ReplayMerge struct {
// contains filtered or unexported fields
}
ReplayMerge replays a recorded stream from a starting position and merges it with the live stream, giving a full history of the stream.
Once constructed, either Poll, or DoWork interleaved with consumption of the Image, should be called in a duty cycle loop until IsMerged returns true. After that, the ReplayMerge can be closed and the Image or its parent Subscription can continue to be used. If an error occurs or progress stops, the merge fails and HasFailed returns true.
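The duty cycle described above can be sketched as follows. To keep the example runnable without the Aeron libraries, it is written against a hypothetical local interface, merger, that mirrors a subset of the ReplayMerge method set, with Poll simplified to take a plain callback instead of a term.FragmentHandler. The fakeMerger type is only a stand-in for demonstration, and the comment about an idle strategy is an assumption about how a real loop would typically be written.

```go
package main

import (
	"errors"
	"fmt"
)

// merger is a hypothetical interface mirroring the ReplayMerge methods
// used in the loop. The callback type is a simplification.
type merger interface {
	Poll(handler func(msg []byte), fragmentLimit int) (int, error)
	IsMerged() bool
	HasFailed() bool
	Close()
}

var errMergeFailed = errors.New("replay merge failed")

// runUntilMerged polls until the merge completes, stopping early on an
// error or a failed merge. It closes the merger before returning.
func runUntilMerged(m merger, handler func(msg []byte), fragmentLimit int) (int, error) {
	defer m.Close()
	total := 0
	for !m.IsMerged() {
		n, err := m.Poll(handler, fragmentLimit)
		if err != nil {
			return total, err
		}
		total += n
		if m.HasFailed() {
			return total, errMergeFailed
		}
		// A real loop would usually idle here when n == 0.
	}
	return total, nil
}

// fakeMerger delivers a fixed number of fragments, then reports merged.
type fakeMerger struct {
	remaining int
	closed    bool
}

func (f *fakeMerger) Poll(handler func(msg []byte), fragmentLimit int) (int, error) {
	n := fragmentLimit
	if f.remaining < n {
		n = f.remaining
	}
	for i := 0; i < n; i++ {
		handler([]byte("fragment"))
	}
	f.remaining -= n
	return n, nil
}

func (f *fakeMerger) IsMerged() bool  { return f.remaining == 0 }
func (f *fakeMerger) HasFailed() bool { return false }
func (f *fakeMerger) Close()          { f.closed = true }

func main() {
	f := &fakeMerger{remaining: 25}
	total, err := runUntilMerged(f, func(msg []byte) {}, 10)
	fmt.Println(total, err, f.closed) // 25 <nil> true
}
```

With a real ReplayMerge, consumption would continue on the Image or Subscription after the loop returns, since closing does not remove the live destination.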
If the endpoint on the replay destination uses a port of 0, then the OS will assign a port from the ephemeral range and this will be added to the replay channel for instructing the archive.
NOTE: Merging is only supported with UDP streams.
func NewReplayMerge ¶
func NewReplayMerge(
	subscription *aeron.Subscription,
	archive *archive.Archive,
	replayChannel string,
	replayDestination string,
	liveDestination string,
	recordingId int64,
	startPosition int64,
	mergeProgressTimeoutMs int64,
) (rm *ReplayMerge, err error)
NewReplayMerge creates a ReplayMerge to manage the merging of a replayed stream and switching over to live stream as appropriate.
Parameters:
- subscription to use for the replay and live stream. Must be a multi-destination subscription.
- archive to use for the replay.
- replayChannel to use as a template for what the archive will use.
- replayDestination to send the replay to and the destination added by the Subscription.
- liveDestination for the live stream and the destination added by the Subscription.
- recordingId for the replay.
- startPosition for the replay.
- mergeProgressTimeoutMs to use for progress checks.
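As an illustration of the three channel-related parameters, the sketch below lists example channel strings and checks whether a destination asks for an OS-assigned port, that is, an endpoint with port 0. The hosts, ports, and the bare "aeron:udp" template are illustrative assumptions rather than defaults of this package, and channelParam and usesEphemeralPort are hypothetical helpers written for this example, not part of the API.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// Illustrative values only. Hosts and ports are assumptions.
const (
	replayChannel     = "aeron:udp"
	replayDestination = "aeron:udp?endpoint=localhost:0"
	liveDestination   = "aeron:udp?endpoint=localhost:23267|control=localhost:23265"
)

// channelParam returns the value of key from a channel URI of the form
// "aeron:udp?k1=v1|k2=v2". Hypothetical helper for this example.
func channelParam(channel, key string) (string, bool) {
	_, query, found := strings.Cut(channel, "?")
	if !found {
		return "", false
	}
	for _, pair := range strings.Split(query, "|") {
		k, v, ok := strings.Cut(pair, "=")
		if ok && k == key {
			return v, true
		}
	}
	return "", false
}

// usesEphemeralPort reports whether the channel's endpoint has port 0,
// in which case the OS picks a port from the ephemeral range.
func usesEphemeralPort(channel string) bool {
	endpoint, ok := channelParam(channel, "endpoint")
	if !ok {
		return false
	}
	_, port, err := net.SplitHostPort(endpoint)
	return err == nil && port == "0"
}

func main() {
	fmt.Println(usesEphemeralPort(replayDestination)) // true
	fmt.Println(usesEphemeralPort(liveDestination))   // false
}
```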
func (*ReplayMerge) Close ¶
func (rm *ReplayMerge) Close()
Close stops any active replay and removes the replay destination from the subscription. It does NOT remove the live destination if it has been added, so the subscription can still be used for live consumption.
func (*ReplayMerge) DoWork ¶
func (rm *ReplayMerge) DoWork() (workCount int, err error)
DoWork performs the work of replaying and merging. It should only be used when polling the underlying Image directly. Returns an indication of the work done processing the merge.
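A single duty cycle iteration that uses DoWork and polls the Image directly might look like the sketch below. It is written against hypothetical local interfaces (directMerger and imagePoller) with a simplified callback type, so that it runs without the Aeron libraries. The nil check assumes that the image may be unavailable until the replay has started, which is an assumption of this example rather than documented behavior.

```go
package main

import "fmt"

// imagePoller and directMerger are hypothetical interfaces that mirror
// the methods used here. The callback type is a simplification.
type imagePoller interface {
	Poll(handler func(msg []byte), fragmentLimit int) int
}

type directMerger interface {
	DoWork() (int, error)
	Image() imagePoller
}

// dutyCycle advances the merge, then polls the image if one is available.
// It returns the combined work count for this iteration.
func dutyCycle(m directMerger, handler func(msg []byte), fragmentLimit int) (int, error) {
	work, err := m.DoWork()
	if err != nil {
		return work, err
	}
	if img := m.Image(); img != nil {
		work += img.Poll(handler, fragmentLimit)
	}
	return work, nil
}

// fakeImage and fakeDirect are stand-ins for demonstration only.
type fakeImage struct{ pending int }

func (i *fakeImage) Poll(handler func(msg []byte), fragmentLimit int) int {
	n := fragmentLimit
	if i.pending < n {
		n = i.pending
	}
	for k := 0; k < n; k++ {
		handler([]byte("fragment"))
	}
	i.pending -= n
	return n
}

type fakeDirect struct {
	calls int
	img   *fakeImage
}

// DoWork makes the image available on its second call.
func (f *fakeDirect) DoWork() (int, error) {
	f.calls++
	if f.calls == 2 {
		f.img = &fakeImage{pending: 3}
		return 1, nil
	}
	return 0, nil
}

func (f *fakeDirect) Image() imagePoller {
	if f.img == nil {
		return nil
	}
	return f.img
}

func main() {
	f := &fakeDirect{}
	for i := 0; i < 3; i++ {
		work, _ := dutyCycle(f, func(msg []byte) {}, 2)
		fmt.Println(work) // prints 0, then 3, then 1
	}
}
```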
func (*ReplayMerge) HasFailed ¶
func (rm *ReplayMerge) HasFailed() bool
HasFailed reports whether the replay merge has failed due to an error.
func (*ReplayMerge) Image ¶
func (rm *ReplayMerge) Image() aeron.Image
Image returns the image which is a merge of the replay and live stream.
func (*ReplayMerge) IsLiveAdded ¶
func (rm *ReplayMerge) IsLiveAdded() bool
IsLiveAdded reports whether the live destination has been added to the subscription.
func (*ReplayMerge) IsMerged ¶
func (rm *ReplayMerge) IsMerged() bool
IsMerged reports whether the live stream has been merged and the replay stopped.
func (*ReplayMerge) Poll ¶
func (rm *ReplayMerge) Poll(fragmentHandler term.FragmentHandler, fragmentLimit int) (workCount int, err error)
Poll polls the Image used for the replay, the merge, and the live stream. DoWork is called before the poll so that processing of the merge can be done.
Returns the number of fragments processed.
func (*ReplayMerge) Subscription ¶
func (rm *ReplayMerge) Subscription() *aeron.Subscription
Subscription returns the Subscription used to consume the replayed and merged stream.