Documentation
¶
Overview ¶
Package endpoint implements replication endpoints for use with package replication.
Index ¶
- Constants
- Variables
- func AbstractionEquals(a, b Abstraction) bool
- func BatchDestroy(ctx context.Context, abs []Abstraction) <-chan BatchDestroyResult
- func GetMostRecentReplicationCursorOfJob(ctx context.Context, fs string, jobID JobID) (*zfs.FilesystemVersion, error)
- func GetReplicationCursors(ctx context.Context, dp *zfs.DatasetPath, jobID JobID) ([]zfs.FilesystemVersion, error)
- func LastReceivedHoldTag(jobID JobID) (string, error)
- func ListAbstractions(ctx context.Context, query ListZFSHoldsAndBookmarksQuery) (out []Abstraction, outErrs []ListAbstractionsError, err error)
- func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmarksQuery) (<-chan Abstraction, <-chan ListAbstractionsError, error)
- func MoveLastReceivedHold(ctx context.Context, fs string, to zfs.FilesystemVersion, jobID JobID) error
- func RegisterMetrics(r prometheus.Registerer)
- func ReplicationCursorBookmarkName(fs string, guid uint64, id JobID) (string, error)
- func SendAbstractionsCacheInvalidate(fs string)
- func StepBookmarkName(fs string, guid uint64, id JobID) (string, error)
- func StepHoldTag(jobid JobID) (string, error)
- func TestClientIdentity(rootFS *zfs.DatasetPath, clientIdentity string) error
- type Abstraction
- func CreateLastReceivedHold(ctx context.Context, fs string, to zfs.FilesystemVersion, jobID JobID) (Abstraction, error)
- func CreateReplicationCursor(ctx context.Context, fs string, target zfs.FilesystemVersion, jobID JobID) (a Abstraction, err error)
- func HoldStep(ctx context.Context, fs string, v zfs.FilesystemVersion, jobID JobID) (Abstraction, error)
- func LastReceivedHoldExtractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion, holdTag string) Abstraction
- func ReplicationCursorV1Extractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion) (_ Abstraction)
- func ReplicationCursorV2Extractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion) (_ Abstraction)
- func StepBookmarkExtractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion) (_ Abstraction)
- func StepHoldExtractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion, holdTag string) Abstraction
- type AbstractionJSON
- type AbstractionType
- type AbstractionTypeSet
- type BatchDestroyResult
- type BookmarkExtractor
- type CreateTXGRange
- type CreateTXGRangeBound
- type FSFilter
- type FSMap
- type HoldExtractor
- type JobID
- func MakeJobID(s string) (JobID, error)
- func MustMakeJobID(s string) JobID
- func ParseLastReceivedHoldTag(tag string) (JobID, error)
- func ParseReplicationCursorBookmarkName(fullname string) (uint64, JobID, error)
- func ParseStepBookmarkName(fullname string) (guid uint64, jobID JobID, err error)
- func ParseStepHoldTag(tag string) (JobID, error)
- type ListAbstractionsError
- type ListAbstractionsErrors
- type ListStaleQueryError
- type ListZFSHoldsAndBookmarksQuery
- type ListZFSHoldsAndBookmarksQueryFilesystemFilter
- type Logger
- type Receiver
- func (s *Receiver) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error)
- func (s *Receiver) ListFilesystemVersions(ctx context.Context, req *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error)
- func (s *Receiver) ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error)
- func (s *Receiver) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error)
- func (s *Receiver) PingDataconn(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error)
- func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.ReadCloser) (*pdu.ReceiveRes, error)
- func (s *Receiver) ReplicationCursor(ctx context.Context, _ *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error)
- func (s *Receiver) Send(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error)
- func (p *Receiver) SendCompleted(ctx context.Context, _ *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error)
- func (s *Receiver) WaitForConnectivity(ctx context.Context) error
- type ReceiverConfig
- type ReplicationCursorV1
- func (c ReplicationCursorV1) Destroy(ctx context.Context) error
- func (c ReplicationCursorV1) GetFS() string
- func (c ReplicationCursorV1) GetFilesystemVersion() zfs.FilesystemVersion
- func (c ReplicationCursorV1) GetFullPath() string
- func (c ReplicationCursorV1) GetJobID() *JobID
- func (c ReplicationCursorV1) GetType() AbstractionType
- func (c ReplicationCursorV1) MarshalJSON() ([]byte, error)
- func (c ReplicationCursorV1) String() string
- type Sender
- func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error)
- func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error)
- func (s *Sender) ListFilesystems(ctx context.Context, r *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error)
- func (p *Sender) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error)
- func (p *Sender) PingDataconn(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error)
- func (p *Sender) Receive(ctx context.Context, r *pdu.ReceiveReq, _ io.ReadCloser) (*pdu.ReceiveRes, error)
- func (p *Sender) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error)
- func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error)
- func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error)
- func (p *Sender) WaitForConnectivity(ctx context.Context) error
- type SenderConfig
- type StalenessInfo
Constants ¶
const (
ClientIdentityKey contextKey = iota
)
const (
ReplicationCursorBookmarkNamePrefix = "zrepl_last_received_J_"
)
Variables ¶
var AbstractionTypesAll = map[AbstractionType]bool{ AbstractionStepBookmark: true, AbstractionStepHold: true, AbstractionLastReceivedHold: true, AbstractionReplicationCursorBookmarkV1: true, AbstractionReplicationCursorBookmarkV2: true, }
var ErrV1ReplicationCursor = fmt.Errorf("bookmark name is a v1-replication cursor")
Functions ¶
func AbstractionEquals ¶ added in v0.3.0
func AbstractionEquals(a, b Abstraction) bool
func BatchDestroy ¶ added in v0.3.0
func BatchDestroy(ctx context.Context, abs []Abstraction) <-chan BatchDestroyResult
func GetMostRecentReplicationCursorOfJob ¶ added in v0.3.0
func GetMostRecentReplicationCursorOfJob(ctx context.Context, fs string, jobID JobID) (*zfs.FilesystemVersion, error)
May return nil for both return values (the version and the error), indicating that there is no cursor.
func GetReplicationCursors ¶ added in v0.3.0
func GetReplicationCursors(ctx context.Context, dp *zfs.DatasetPath, jobID JobID) ([]zfs.FilesystemVersion, error)
func LastReceivedHoldTag ¶ added in v0.3.0
func ListAbstractions ¶ added in v0.3.0
func ListAbstractions(ctx context.Context, query ListZFSHoldsAndBookmarksQuery) (out []Abstraction, outErrs []ListAbstractionsError, err error)
func ListAbstractionsStreamed ¶ added in v0.3.0
func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmarksQuery) (<-chan Abstraction, <-chan ListAbstractionsError, error)
If err != nil, the returned channels are both nil. If err == nil, both channels must be fully drained by the caller to avoid leaking goroutines.
func MoveLastReceivedHold ¶
func RegisterMetrics ¶ added in v0.3.0
func RegisterMetrics(r prometheus.Registerer)
func ReplicationCursorBookmarkName ¶ added in v0.3.0
func SendAbstractionsCacheInvalidate ¶
func SendAbstractionsCacheInvalidate(fs string)
func StepBookmarkName ¶
v must be validated by caller
func StepHoldTag ¶ added in v0.3.0
func TestClientIdentity ¶
func TestClientIdentity(rootFS *zfs.DatasetPath, clientIdentity string) error
Types ¶
type Abstraction ¶ added in v0.3.0
type Abstraction interface { GetType() AbstractionType GetFS() string GetName() string GetFullPath() string GetJobID() *JobID // may return nil if the abstraction does not have a JobID GetCreateTXG() uint64 GetFilesystemVersion() zfs.FilesystemVersion String() string // destroy the abstraction: either releases the hold or destroys the bookmark Destroy(context.Context) error json.Marshaler }
Implementation Note: Whenever you add a new accessor, adjust AbstractionJSON.MarshalJSON accordingly
func CreateLastReceivedHold ¶ added in v0.3.0
func CreateLastReceivedHold(ctx context.Context, fs string, to zfs.FilesystemVersion, jobID JobID) (Abstraction, error)
func CreateReplicationCursor ¶ added in v0.3.0
func CreateReplicationCursor(ctx context.Context, fs string, target zfs.FilesystemVersion, jobID JobID) (a Abstraction, err error)
Idempotently creates a replication cursor targeting `target`.
Returns ErrBookmarkCloningNotSupported if `target` is a bookmark and bookmarking bookmarks is not supported by ZFS.
func HoldStep ¶ added in v0.3.0
func HoldStep(ctx context.Context, fs string, v zfs.FilesystemVersion, jobID JobID) (Abstraction, error)
Idempotently holds / step-bookmarks the version `v`.
Returns ErrBookmarkCloningNotSupported if `v` is a bookmark and bookmarking bookmarks is not supported by ZFS.
func LastReceivedHoldExtractor ¶ added in v0.3.0
func LastReceivedHoldExtractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion, holdTag string) Abstraction
func ReplicationCursorV1Extractor ¶ added in v0.3.0
func ReplicationCursorV1Extractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion) (_ Abstraction)
func ReplicationCursorV2Extractor ¶ added in v0.3.0
func ReplicationCursorV2Extractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion) (_ Abstraction)
func StepBookmarkExtractor ¶
func StepBookmarkExtractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion) (_ Abstraction)
func StepHoldExtractor ¶ added in v0.3.0
func StepHoldExtractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion, holdTag string) Abstraction
type AbstractionJSON ¶ added in v0.3.0
type AbstractionJSON struct{ Abstraction }
func (AbstractionJSON) MarshalJSON ¶ added in v0.3.0
func (a AbstractionJSON) MarshalJSON() ([]byte, error)
type AbstractionType ¶ added in v0.3.0
type AbstractionType string
const ( AbstractionStepBookmark AbstractionType = "step-bookmark" AbstractionStepHold AbstractionType = "step-hold" AbstractionLastReceivedHold AbstractionType = "last-received-hold" AbstractionReplicationCursorBookmarkV1 AbstractionType = "replication-cursor-bookmark-v1" AbstractionReplicationCursorBookmarkV2 AbstractionType = "replication-cursor-bookmark-v2" )
Implementation note: There are a lot of exhaustive switches on AbstractionType in the code base. When adding a new abstraction type, make sure to search and update them!
func (AbstractionType) BookmarkExtractor ¶ added in v0.3.0
func (t AbstractionType) BookmarkExtractor() BookmarkExtractor
returns nil if the abstraction type is not bookmark-based
func (AbstractionType) HoldExtractor ¶ added in v0.3.0
func (t AbstractionType) HoldExtractor() HoldExtractor
returns nil if the abstraction type is not hold-based
func (AbstractionType) MustValidate ¶ added in v0.3.0
func (t AbstractionType) MustValidate() error
func (AbstractionType) Validate ¶ added in v0.3.0
func (t AbstractionType) Validate() error
type AbstractionTypeSet ¶ added in v0.3.0
type AbstractionTypeSet map[AbstractionType]bool
func AbstractionTypeSetFromStrings ¶ added in v0.3.0
func AbstractionTypeSetFromStrings(sts []string) (AbstractionTypeSet, error)
func (AbstractionTypeSet) ContainsAll ¶ added in v0.3.0
func (s AbstractionTypeSet) ContainsAll(q AbstractionTypeSet) bool
func (AbstractionTypeSet) ContainsAnyOf ¶ added in v0.3.0
func (s AbstractionTypeSet) ContainsAnyOf(q AbstractionTypeSet) bool
func (AbstractionTypeSet) String ¶ added in v0.3.0
func (s AbstractionTypeSet) String() string
func (AbstractionTypeSet) Validate ¶ added in v0.3.0
func (s AbstractionTypeSet) Validate() error
type BatchDestroyResult ¶ added in v0.3.0
type BatchDestroyResult struct { Abstraction DestroyErr error }
func (BatchDestroyResult) MarshalJSON ¶ added in v0.3.0
func (r BatchDestroyResult) MarshalJSON() ([]byte, error)
type BookmarkExtractor ¶ added in v0.3.0
type BookmarkExtractor func(fs *zfs.DatasetPath, v zfs.FilesystemVersion) Abstraction
type CreateTXGRange ¶ added in v0.3.0
type CreateTXGRange struct { // if not nil: The hold's snapshot or the bookmark's createtxg must be greater than (or equal) Since // else: CreateTXG of the hold or bookmark can be any value accepted by Until Since *CreateTXGRangeBound // if not nil: The hold's snapshot or the bookmark's createtxg must be less than (or equal) Until // else: CreateTXG of the hold or bookmark can be any value accepted by Since Until *CreateTXGRangeBound }
A non-empty range of CreateTXGs
If both Since and Until are nil, any CreateTXG is acceptable
func (*CreateTXGRange) Contains ¶ added in v0.3.0
func (r *CreateTXGRange) Contains(qCreateTxg uint64) bool
panics if not .Validate()
func (*CreateTXGRange) IsUnbounded ¶ added in v0.3.0
func (r *CreateTXGRange) IsUnbounded() bool
panics if not .Validate()
func (*CreateTXGRange) String ¶ added in v0.3.0
func (r *CreateTXGRange) String() string
func (*CreateTXGRange) Validate ¶ added in v0.3.0
func (r *CreateTXGRange) Validate() error
type CreateTXGRangeBound ¶ added in v0.3.0
func (*CreateTXGRangeBound) Validate ¶ added in v0.3.0
func (i *CreateTXGRangeBound) Validate() error
type FSMap ¶
type FSMap interface { FSFilter Map(path *zfs.DatasetPath) (*zfs.DatasetPath, error) Invert() (FSMap, error) AsFilter() FSFilter }
FIXME: can we get away without error types here?
type HoldExtractor ¶ added in v0.3.0
type HoldExtractor = func(fs *zfs.DatasetPath, v zfs.FilesystemVersion, tag string) Abstraction
type JobID ¶ added in v0.3.0
type JobID struct {
// contains filtered or unexported fields
}
JobID instances returned by MakeJobID() guarantee their JobID.String() can be used in ZFS dataset names and hold tags.
func MustMakeJobID ¶ added in v0.3.0
func ParseLastReceivedHoldTag ¶ added in v0.3.0
err != nil always means that the tag is not a last-received hold tag
func ParseReplicationCursorBookmarkName ¶ added in v0.3.0
err != nil always means that the bookmark is not a valid replication bookmark
Returns ErrV1ReplicationCursor as error if the bookmark is a v1 replication cursor
func ParseStepBookmarkName ¶
fullname is the full bookmark name, including the dataset path
err != nil always means that the bookmark is not a step bookmark
func ParseStepHoldTag ¶ added in v0.3.0
err != nil always means that the tag is not a step hold tag
func (JobID) MarshalJSON ¶ added in v0.3.0
func (JobID) MustValidate ¶ added in v0.3.0
func (j JobID) MustValidate()
func (*JobID) UnmarshalJSON ¶ added in v0.3.0
type ListAbstractionsError ¶ added in v0.3.0
func (ListAbstractionsError) Error ¶ added in v0.3.0
func (e ListAbstractionsError) Error() string
type ListAbstractionsErrors ¶ added in v0.3.0
type ListAbstractionsErrors []ListAbstractionsError
func (ListAbstractionsErrors) Error ¶ added in v0.3.0
func (e ListAbstractionsErrors) Error() string
type ListStaleQueryError ¶ added in v0.3.0
type ListStaleQueryError struct {
// contains filtered or unexported fields
}
type ListZFSHoldsAndBookmarksQuery ¶ added in v0.3.0
type ListZFSHoldsAndBookmarksQuery struct { FS ListZFSHoldsAndBookmarksQueryFilesystemFilter // What abstraction types should match (any contained in the set) What AbstractionTypeSet // if not nil: JobID of the hold or bookmark in question must be equal // else: JobID of the hold or bookmark can be any value JobID *JobID // zero-value means any CreateTXG is acceptable CreateTXG CreateTXGRange // Number of concurrently queried filesystems. Must be >= 1 Concurrency int64 }
func (*ListZFSHoldsAndBookmarksQuery) Validate ¶ added in v0.3.0
func (q *ListZFSHoldsAndBookmarksQuery) Validate() error
type ListZFSHoldsAndBookmarksQueryFilesystemFilter ¶ added in v0.3.0
type ListZFSHoldsAndBookmarksQueryFilesystemFilter struct { FS *string Filter zfs.DatasetFilter }
FS == nil XOR Filter == nil
func (*ListZFSHoldsAndBookmarksQueryFilesystemFilter) Filesystems ¶ added in v0.3.0
func (f *ListZFSHoldsAndBookmarksQueryFilesystemFilter) Filesystems(ctx context.Context) ([]string, error)
func (*ListZFSHoldsAndBookmarksQueryFilesystemFilter) Validate ¶ added in v0.3.0
func (f *ListZFSHoldsAndBookmarksQueryFilesystemFilter) Validate() error
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
Receiver implements replication.ReplicationEndpoint for a receiving side
func NewReceiver ¶
func NewReceiver(config ReceiverConfig) *Receiver
func (*Receiver) DestroySnapshots ¶
func (s *Receiver) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error)
func (*Receiver) ListFilesystemVersions ¶
func (s *Receiver) ListFilesystemVersions(ctx context.Context, req *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error)
func (*Receiver) ListFilesystems ¶
func (s *Receiver) ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error)
func (*Receiver) PingDataconn ¶
func (*Receiver) Receive ¶
func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.ReadCloser) (*pdu.ReceiveRes, error)
func (*Receiver) ReplicationCursor ¶
func (s *Receiver) ReplicationCursor(ctx context.Context, _ *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error)
func (*Receiver) SendCompleted ¶ added in v0.3.0
func (p *Receiver) SendCompleted(ctx context.Context, _ *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error)
type ReceiverConfig ¶ added in v0.3.0
type ReceiverConfig struct { JobID JobID RootWithoutClientComponent *zfs.DatasetPath // TODO use AppendClientIdentity bool UpdateLastReceivedHold bool }
func (*ReceiverConfig) Validate ¶ added in v0.3.0
func (c *ReceiverConfig) Validate() error
type ReplicationCursorV1 ¶ added in v0.3.0
type ReplicationCursorV1 struct { Type AbstractionType FS string zfs.FilesystemVersion }
func (ReplicationCursorV1) Destroy ¶ added in v0.3.0
func (c ReplicationCursorV1) Destroy(ctx context.Context) error
func (ReplicationCursorV1) GetFS ¶ added in v0.3.0
func (c ReplicationCursorV1) GetFS() string
func (ReplicationCursorV1) GetFilesystemVersion ¶ added in v0.3.0
func (c ReplicationCursorV1) GetFilesystemVersion() zfs.FilesystemVersion
func (ReplicationCursorV1) GetFullPath ¶ added in v0.3.0
func (c ReplicationCursorV1) GetFullPath() string
func (ReplicationCursorV1) GetJobID ¶ added in v0.3.0
func (c ReplicationCursorV1) GetJobID() *JobID
func (ReplicationCursorV1) GetType ¶ added in v0.3.0
func (c ReplicationCursorV1) GetType() AbstractionType
func (ReplicationCursorV1) MarshalJSON ¶ added in v0.3.0
func (c ReplicationCursorV1) MarshalJSON() ([]byte, error)
func (ReplicationCursorV1) String ¶ added in v0.3.0
func (c ReplicationCursorV1) String() string
type Sender ¶
type Sender struct { FSFilter zfs.DatasetFilter // contains filtered or unexported fields }
Sender implements replication.ReplicationEndpoint for a sending side
func NewSender ¶
func NewSender(conf SenderConfig) *Sender
func (*Sender) DestroySnapshots ¶
func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error)
func (*Sender) ListFilesystemVersions ¶
func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error)
func (*Sender) ListFilesystems ¶
func (s *Sender) ListFilesystems(ctx context.Context, r *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error)
func (*Sender) PingDataconn ¶
func (*Sender) Receive ¶
func (p *Sender) Receive(ctx context.Context, r *pdu.ReceiveReq, _ io.ReadCloser) (*pdu.ReceiveRes, error)
func (*Sender) ReplicationCursor ¶
func (p *Sender) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error)
func (*Sender) SendCompleted ¶ added in v0.3.0
func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error)
type SenderConfig ¶ added in v0.3.0
type SenderConfig struct { FSF zfs.DatasetFilter Encrypt *zfs.NilBool DisableIncrementalStepHolds bool JobID JobID }
func (*SenderConfig) Validate ¶ added in v0.3.0
func (c *SenderConfig) Validate() error
type StalenessInfo ¶ added in v0.3.0
type StalenessInfo struct { ConstructedWithQuery ListZFSHoldsAndBookmarksQuery Live []Abstraction Stale []Abstraction }
func ListStale ¶ added in v0.3.0
func ListStale(ctx context.Context, q ListZFSHoldsAndBookmarksQuery) (*StalenessInfo, error)
returns *ListStaleQueryError if the given query cannot be used for determining staleness info