README ¶
What
This README explains how to add new Archiver implementations.
Steps
Step 1: Create a new package for your implementation
Create a new directory in the archiver
folder. The structure should look like the following:
./common/archiver
- filestore/ -- Filestore implementation
- provider/
- provider.go -- Provider of archiver instances
- yourImplementation/
- historyArchiver.go -- HistoryArchiver implementation
- historyArchiver_test.go -- Unit tests for HistoryArchiver
- visibilityArchiver.go -- VisibilityArchiver implementations
- visibilityArchiver_test.go -- Unit tests for VisibilityArchiver
Step 2: Implement the HistoryArchiver interface
type HistoryArchiver interface {
// Archive is used to archive a workflow history. When the context expires the method should stop trying to archive.
// Implementors are free to archive however they want, including implementing retries of sub-operations. The URI defines
// the resource that histories should be archived into. The implementor gets to determine how to interpret the URI.
// The Archive method may or may not be automatically retried by the caller. The ArchiveOptions are used
// to interact with these retries including giving the implementor the ability to cancel retries and record progress
// between retry attempts.
// This method will be invoked after a workflow passes its retention period.
Archive(context.Context, URI, *ArchiveHistoryRequest, ...ArchiveOption) error
// Get is used to access an archived history. When context expires method should stop trying to fetch history.
// The URI identifies the resource from which history should be accessed and it is up to the implementor to interpret this URI.
// This method should thrift errors - see filestore as an example.
Get(context.Context, URI, *GetHistoryRequest) (*GetHistoryResponse, error)
// ValidateURI is used to define what a valid URI for an implementation is.
ValidateURI(URI) error
}
Step 3: Implement the VisibilityArchiver interface
type VisibilityArchiver interface {
// Archive is used to archive one workflow visibility record.
// Check the Archive() method of the HistoryArchiver interface in Step 2 for parameters' meaning and requirements.
// The only difference is that the ArchiveOption parameter won't include an option for recording process.
// Please make sure your implementation is lossless. If any in-memory batching mechanism is used, then those batched records will be lost during server restarts.
// This method will be invoked when workflow closes. Note that because of conflict resolution, it is possible for a workflow to through the closing process multiple times, which means that this method can be invoked more than once after a workflow closes.
Archive(context.Context, URI, *ArchiveVisibilityRequest, ...ArchiveOption) error
// Query is used to retrieve archived visibility records.
// Check the Get() method of the HistoryArchiver interface in Step 2 for parameters' meaning and requirements.
// The request includes a string field called query, which describes what kind of visibility records should be returned. For example, it can be some SQL-like syntax query string.
// Your implementation is responsible for parsing and validating the query, and also returning all visibility records that match the query.
// Currently the maximum context timeout passed into the method is 3 minutes, so it's ok if this method takes a long time to run.
Query(context.Context, URI, *QueryVisibilityRequest) (*QueryVisibilityResponse, error)
// ValidateURI is used to define what a valid URI for an implementation is.
ValidateURI(URI) error
}
Step 4: Update provider to provide access to your implementation
Modify the ./provider/provider.go
file so that the ArchiverProvider
knows how to create an instance of your archiver.
Also, add configs for you archiver to static yaml config files and modify the HistoryArchiverProvider
and VisibilityArchiverProvider
struct in the ../common/service/config.go
accordingly.
FAQ
If my Archive method can automatically be retried by caller how can I record and access progress between retries?
ArchiverOptions is used to handle this. The following shows and example:
func (a *Archiver) Archive(
ctx context.Context,
URI string,
request *ArchiveRequest,
opts ...ArchiveOption,
) error {
featureCatalog := GetFeatureCatalog(opts...) // this function is defined in options.go
var progress progress
// Check if the feature for recording progress is enabled.
if featureCatalog.ProgressManager != nil {
if err := featureCatalog.ProgressManager.LoadProgress(ctx, &prevProgress); err != nil {
// log some error message and return error if needed.
}
}
// Your archiver implementation...
// Record current progress
if featureCatalog.ProgressManager != nil {
if err := featureCatalog.ProgressManager.RecordProgress(ctx, progress); err != nil {
// log some error message and return error if needed.
}
}
}
If my Archive method encounters an error which is non-retryable how do I indicate that the caller should not retry?
func (a *Archiver) Archive(
ctx context.Context,
URI string,
request *ArchiveRequest,
opts ...ArchiveOption,
) error {
featureCatalog := GetFeatureCatalog(opts...) // this function is defined in options.go
err := youArchiverImpl()
if nonRetryableErr(err) {
if featureCatalog.NonRetryableError != nil {
return featureCatalog.NonRetryableError() // when the caller gets this error type back it will not retry anymore.
}
}
}
How does my history archiver implementation read history?
The archiver
package provides a utility class called HistoryIterator
which is a wrapper of HistoryManager
.
Its usage is simpler than the HistoryManager
given in the BootstrapContainer
,
so archiver implementations can choose to use it when reading workflow histories.
See the historyIterator.go
file for more details.
Sample usage can be found in the filestore historyArchiver implementation.
Should my archiver define all its own error types?
Each archiver is free to define and return any errors it wants. However many common errors which
exist between archivers are already defined in constants.go
.
Is there a generic query syntax for visibility archiver?
Currently no. But this is something we plan to do in the future. As for now, try to make your syntax similar to the one used by our advanced list workflow API.
Documentation ¶
Overview ¶
Package archiver is a generated GoMock package.
Index ¶
- Constants
- Variables
- func ConvertSearchAttrToPayload(searchAttrStr map[string]string) map[string]*commonpb.Payload
- func TagLoggerWithArchiveHistoryRequestAndURI(logger log.Logger, request *ArchiveHistoryRequest, URI string) log.Logger
- func TagLoggerWithArchiveVisibilityRequestAndURI(logger log.Logger, request *archiverspb.ArchiveVisibilityRequest, URI string) log.Logger
- func ValidateGetRequest(request *GetHistoryRequest) error
- func ValidateHistoryArchiveRequest(request *ArchiveHistoryRequest) error
- func ValidateQueryRequest(request *QueryVisibilityRequest) error
- func ValidateVisibilityArchivalRequest(request *archiverspb.ArchiveVisibilityRequest) error
- type ArchivalConfig
- type ArchivalMetadata
- type ArchivalState
- type ArchiveFeatureCatalog
- type ArchiveHistoryRequest
- type ArchiveOption
- type GetHistoryRequest
- type GetHistoryResponse
- type HistoryArchiver
- type HistoryArchiverMock
- type HistoryBootstrapContainer
- type HistoryIterator
- type MockArchivalMetadata
- type MockHistoryIterator
- type MockHistoryIteratorMockRecorder
- type NonRetryableError
- type ProgressManager
- type QueryVisibilityRequest
- type QueryVisibilityResponse
- type SizeEstimator
- type URI
- type VisibilityArchiver
- type VisibilityArchiverMock
- func (_m *VisibilityArchiverMock) Archive(_a0 context.Context, _a1 URI, _a2 *archiverspb.ArchiveVisibilityRequest, ...) error
- func (_m *VisibilityArchiverMock) Query(_a0 context.Context, _a1 URI, _a2 *QueryVisibilityRequest) (*QueryVisibilityResponse, error)
- func (_m *VisibilityArchiverMock) ValidateURI(uri URI) error
- type VisibilityBootstrapContainer
Constants ¶
const ( // ArchiveNonRetryableErrorMsg is the log message when the Archive() method encounters a non-retryable error ArchiveNonRetryableErrorMsg = "Archive method encountered an non-retryable error." // ArchiveTransientErrorMsg is the log message when the Archive() method encounters a transient error ArchiveTransientErrorMsg = "Archive method encountered a transient error." // ErrReasonInvalidURI is the error reason for invalid URI ErrReasonInvalidURI = "URI is invalid" // ErrReasonInvalidArchiveRequest is the error reason for invalid archive request ErrReasonInvalidArchiveRequest = "archive request is invalid" // ErrReasonConstructHistoryIterator is the error reason for failing to construct history iterator ErrReasonConstructHistoryIterator = "failed to construct history iterator" // ErrReasonReadHistory is the error reason for failing to read history ErrReasonReadHistory = "failed to read history batches" // ErrReasonHistoryMutated is the error reason for mutated history ErrReasonHistoryMutated = "history was mutated" )
Variables ¶
var ( // ErrInvalidURI is the error for invalid URI ErrInvalidURI = errors.New("URI is invalid") // ErrURISchemeMismatch is the error for mismatch between URI scheme and archiver ErrURISchemeMismatch = errors.New("URI scheme does not match the archiver") // ErrHistoryMutated is the error for mutated history ErrHistoryMutated = errors.New("history was mutated") // ErrContextTimeout is the error for context timeout ErrContextTimeout = errors.New("archive aborted because context timed out") // ErrInvalidGetHistoryRequest is the error for invalid GetHistory request ErrInvalidGetHistoryRequest = errors.New("get archived history request is invalid") // ErrInvalidQueryVisibilityRequest is the error for invalid Query Visibility request ErrInvalidQueryVisibilityRequest = errors.New("query visiblity request is invalid") // ErrNextPageTokenCorrupted is the error for corrupted GetHistory token ErrNextPageTokenCorrupted = errors.New("next page token is corrupted") // ErrHistoryNotExist is the error for non-exist history ErrHistoryNotExist = errors.New("requested workflow history does not exist") )
Functions ¶
func ConvertSearchAttrToPayload ¶ added in v0.27.0
ConvertSearchAttrToPayload converts search attribute value from string back to byte array
func TagLoggerWithArchiveHistoryRequestAndURI ¶ added in v0.27.0
func TagLoggerWithArchiveHistoryRequestAndURI(logger log.Logger, request *ArchiveHistoryRequest, URI string) log.Logger
TagLoggerWithArchiveHistoryRequestAndURI tags logger with fields in the archive history request and the URI
func TagLoggerWithArchiveVisibilityRequestAndURI ¶ added in v0.27.0
func TagLoggerWithArchiveVisibilityRequestAndURI(logger log.Logger, request *archiverspb.ArchiveVisibilityRequest, URI string) log.Logger
TagLoggerWithArchiveVisibilityRequestAndURI tags logger with fields in the archive visibility request and the URI
func ValidateGetRequest ¶ added in v0.27.0
func ValidateGetRequest(request *GetHistoryRequest) error
ValidateGetRequest validates the get archived history request
func ValidateHistoryArchiveRequest ¶ added in v0.27.0
func ValidateHistoryArchiveRequest(request *ArchiveHistoryRequest) error
ValidateHistoryArchiveRequest validates the archive history request
func ValidateQueryRequest ¶ added in v0.27.0
func ValidateQueryRequest(request *QueryVisibilityRequest) error
ValidateQueryRequest validates the query visibility request
func ValidateVisibilityArchivalRequest ¶ added in v0.27.0
func ValidateVisibilityArchivalRequest(request *archiverspb.ArchiveVisibilityRequest) error
ValidateVisibilityArchivalRequest validates the archive visibility request
Types ¶
type ArchivalConfig ¶ added in v0.7.0
type ArchivalConfig interface { ClusterConfiguredForArchival() bool GetClusterState() ArchivalState ReadEnabled() bool GetNamespaceDefaultState() enumspb.ArchivalState GetNamespaceDefaultURI() string }
ArchivalConfig is an immutable representation of the archival configuration of the cluster This config is determined at cluster startup time
func NewArchivalConfig ¶ added in v0.7.0
func NewArchivalConfig( staticClusterStateStr string, dynamicClusterState dynamicconfig.StringPropertyFn, enableRead dynamicconfig.BoolPropertyFn, namespaceDefaultStateStr string, namespaceDefaultURI string, ) ArchivalConfig
NewArchivalConfig constructs a new valid ArchivalConfig
func NewDisabledArchvialConfig ¶ added in v0.8.0
func NewDisabledArchvialConfig() ArchivalConfig
NewDisabledArchvialConfig returns a disabled ArchivalConfig
type ArchivalMetadata ¶ added in v0.7.0
type ArchivalMetadata interface { GetHistoryConfig() ArchivalConfig GetVisibilityConfig() ArchivalConfig }
ArchivalMetadata provides cluster level archival information
func NewArchivalMetadata ¶ added in v0.7.0
func NewArchivalMetadata( dc *dynamicconfig.Collection, historyState string, historyReadEnabled bool, visibilityState string, visibilityReadEnabled bool, namespaceDefaults *config.ArchivalNamespaceDefaults, ) ArchivalMetadata
NewArchivalMetadata constructs a new ArchivalMetadata
type ArchivalState ¶ added in v0.27.0
type ArchivalState int
ArchivalState represents the archival state of the cluster
const ( // ArchivalDisabled means this cluster is not configured to handle archival ArchivalDisabled ArchivalState = iota // ArchivalPaused means this cluster is configured to handle archival but is currently not archiving // This state is not yet implemented, as of now ArchivalPaused is treated the same way as ArchivalDisabled ArchivalPaused // ArchivalEnabled means this cluster is currently archiving ArchivalEnabled )
type ArchiveFeatureCatalog ¶
type ArchiveFeatureCatalog struct { ProgressManager ProgressManager NonRetryableError NonRetryableError }
ArchiveFeatureCatalog is a collection features for the Archive method of History/Visibility Archiver
func GetFeatureCatalog ¶
func GetFeatureCatalog(opts ...ArchiveOption) *ArchiveFeatureCatalog
GetFeatureCatalog applies all the ArchiveOptions to the catalog and returns the catalog. It should be called inside the Archive method.
type ArchiveHistoryRequest ¶
type ArchiveHistoryRequest struct { ShardID int32 NamespaceID string Namespace string WorkflowID string RunID string BranchToken []byte NextEventID int64 CloseFailoverVersion int64 }
ArchiveHistoryRequest is request to Archive workflow history
type ArchiveOption ¶
type ArchiveOption func(featureCatalog *ArchiveFeatureCatalog)
ArchiveOption is used to provide options for adding features to the Archive method of History/Visibility Archiver
func GetHeartbeatArchiveOption ¶
func GetHeartbeatArchiveOption() ArchiveOption
GetHeartbeatArchiveOption returns an ArchiveOption for enabling heartbeating. It should be used when the Archive method is invoked inside an activity.
func GetNonRetryableErrorOption ¶ added in v0.27.0
func GetNonRetryableErrorOption(nonRetryableErr error) ArchiveOption
GetNonRetryableErrorOption returns an ArchiveOption so that archiver knows what should be returned when an non-retryable error is encountered.
type GetHistoryRequest ¶
type GetHistoryRequest struct { NamespaceID string WorkflowID string RunID string CloseFailoverVersion *int64 NextPageToken []byte PageSize int }
GetHistoryRequest is the request to Get archived history
type GetHistoryResponse ¶
GetHistoryResponse is the response of Get archived history
type HistoryArchiver ¶
type HistoryArchiver interface { // Archive is used to archive a Workflow's history. When the context expires the method should stop trying to archive. // Implementors are free to archive however they want, including implementing retries of sub-operations. The URI defines // the resource that histories should be archived into. The implementor gets to determine how to interpret the URI. // The Archive method may or may not be automatically retried by the caller. ArchiveOptions are used // to interact with these retries including giving the implementor the ability to cancel retries and record progress // between retry attempts. // This method will be invoked after a workflow passes its retention period. Archive(context.Context, URI, *ArchiveHistoryRequest, ...ArchiveOption) error // Get is used to access an archived history. When context expires this method should stop trying to fetch history. // The URI identifies the resource from which history should be accessed and it is up to the implementor to interpret this URI. // This method should emit api service errors - see the filestore as an example. Get(context.Context, URI, *GetHistoryRequest) (*GetHistoryResponse, error) // ValidateURI is used to define what a valid URI for an implementation is. ValidateURI(URI) error }
HistoryArchiver is used to archive history and read archived history
type HistoryArchiverMock ¶ added in v0.7.0
HistoryArchiverMock is an autogenerated mock type for the HistoryArchiver type
func (*HistoryArchiverMock) Archive ¶ added in v0.7.0
func (_m *HistoryArchiverMock) Archive(ctx context.Context, uri URI, request *ArchiveHistoryRequest, opts ...ArchiveOption) error
Archive provides a mock function with given fields: ctx, uri, request, opts
func (*HistoryArchiverMock) Get ¶ added in v0.7.0
func (_m *HistoryArchiverMock) Get(ctx context.Context, uri URI, request *GetHistoryRequest) (*GetHistoryResponse, error)
Get provides a mock function with given fields: ctx, uri, request
func (*HistoryArchiverMock) ValidateURI ¶ added in v0.7.0
func (_m *HistoryArchiverMock) ValidateURI(uri URI) error
ValidateURI provides a mock function with given fields: uri
type HistoryBootstrapContainer ¶
type HistoryBootstrapContainer struct { HistoryV2Manager persistence.HistoryManager Logger log.Logger MetricsClient metrics.Client ClusterMetadata cluster.Metadata NamespaceCache cache.NamespaceCache }
HistoryBootstrapContainer contains components needed by all history Archiver implementations
type HistoryIterator ¶
type HistoryIterator interface { Next() (*archiverspb.HistoryBlob, error) HasNext() bool GetState() ([]byte, error) }
HistoryIterator is used to get history batches
func NewHistoryIterator ¶
func NewHistoryIterator( request *ArchiveHistoryRequest, historyV2Manager persistence.HistoryManager, targetHistoryBlobSize int, ) HistoryIterator
NewHistoryIterator returns a new HistoryIterator
func NewHistoryIteratorFromState ¶ added in v0.7.0
func NewHistoryIteratorFromState( request *ArchiveHistoryRequest, historyV2Manager persistence.HistoryManager, targetHistoryBlobSize int, initialState []byte, ) (HistoryIterator, error)
NewHistoryIteratorFromState returns a new HistoryIterator with specified state
type MockArchivalMetadata ¶ added in v0.7.0
MockArchivalMetadata is an autogenerated mock type for the ArchivalMetadata type
func (*MockArchivalMetadata) GetHistoryConfig ¶ added in v0.7.0
func (_m *MockArchivalMetadata) GetHistoryConfig() ArchivalConfig
GetHistoryConfig provides a mock function with given fields:
func (*MockArchivalMetadata) GetVisibilityConfig ¶ added in v0.7.0
func (_m *MockArchivalMetadata) GetVisibilityConfig() ArchivalConfig
GetVisibilityConfig provides a mock function with given fields:
type MockHistoryIterator ¶
type MockHistoryIterator struct {
// contains filtered or unexported fields
}
MockHistoryIterator is a mock of HistoryIterator interface
func NewMockHistoryIterator ¶
func NewMockHistoryIterator(ctrl *gomock.Controller) *MockHistoryIterator
NewMockHistoryIterator creates a new mock instance
func (*MockHistoryIterator) EXPECT ¶
func (m *MockHistoryIterator) EXPECT() *MockHistoryIteratorMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockHistoryIterator) GetState ¶
func (m *MockHistoryIterator) GetState() ([]byte, error)
GetState mocks base method
func (*MockHistoryIterator) HasNext ¶
func (m *MockHistoryIterator) HasNext() bool
HasNext mocks base method
func (*MockHistoryIterator) Next ¶
func (m *MockHistoryIterator) Next() (*archiverspb.HistoryBlob, error)
Next mocks base method
type MockHistoryIteratorMockRecorder ¶
type MockHistoryIteratorMockRecorder struct {
// contains filtered or unexported fields
}
MockHistoryIteratorMockRecorder is the mock recorder for MockHistoryIterator
func (*MockHistoryIteratorMockRecorder) GetState ¶
func (mr *MockHistoryIteratorMockRecorder) GetState() *gomock.Call
GetState indicates an expected call of GetState
func (*MockHistoryIteratorMockRecorder) HasNext ¶
func (mr *MockHistoryIteratorMockRecorder) HasNext() *gomock.Call
HasNext indicates an expected call of HasNext
func (*MockHistoryIteratorMockRecorder) Next ¶
func (mr *MockHistoryIteratorMockRecorder) Next() *gomock.Call
Next indicates an expected call of Next
type NonRetryableError ¶ added in v0.27.0
type NonRetryableError func() error
NonRetryableError returns an error indicating archiver has encountered an non-retryable error
type ProgressManager ¶
type ProgressManager interface { RecordProgress(ctx context.Context, progress interface{}) error LoadProgress(ctx context.Context, valuePtr interface{}) error HasProgress(ctx context.Context) bool }
ProgressManager is used to record and load archive progress
type QueryVisibilityRequest ¶ added in v0.27.0
type QueryVisibilityRequest struct { NamespaceID string PageSize int NextPageToken []byte Query string }
QueryVisibilityRequest is the request to query archived visibility records
type QueryVisibilityResponse ¶ added in v0.27.0
type QueryVisibilityResponse struct { Executions []*workflowpb.WorkflowExecutionInfo NextPageToken []byte }
QueryVisibilityResponse is the response of querying archived visibility records
type SizeEstimator ¶
SizeEstimator is used to estimate the size of any object
func NewJSONSizeEstimator ¶
func NewJSONSizeEstimator() SizeEstimator
NewJSONSizeEstimator returns a new SizeEstimator which uses json encoding to estimate size
type URI ¶ added in v0.7.0
type URI interface { Scheme() string Path() string Hostname() string Port() string Username() string Password() string String() string Opaque() string Query() map[string][]string }
URI identifies the archival resource to which records are written to and read from.
type VisibilityArchiver ¶
type VisibilityArchiver interface { // Archive is used to archive one Workflow visibility record. // Check the Archive method of the HistoryArchiver interface for parameters' meaning and requirements. // The only difference is that the ArchiveOption parameter won't include an option for recording process. // Please make sure your implementation is lossless. If any in-memory batching mechanism is used // then those batched records will be lost during server restarts. This method will be invoked when the Workflow closes. // Note that because of conflict resolution, it is possible for a Workflow to through the closing process multiple times, // which means that this method can be invoked more than once after a Workflow closes. Archive(context.Context, URI, *archiverspb.ArchiveVisibilityRequest, ...ArchiveOption) error // Query is used to retrieve archived visibility records. // Check the Get() method of the HistoryArchiver interface in Step 2 for parameters' meaning and requirements. // The request includes a string field called query, which describes what kind of visibility records should be returned. // For example, it can be some SQL-like syntax query string. // Your implementation is responsible for parsing and validating the query, and also returning all visibility records that match the query. // Currently the maximum context timeout passed into the method is 3 minutes, so it's accetable if this method takes some time to run. Query(context.Context, URI, *QueryVisibilityRequest) (*QueryVisibilityResponse, error) // ValidateURI is used to define what a valid URI for an implementation is. ValidateURI(URI) error }
VisibilityArchiver is used to archive visibility and read archived visibility
type VisibilityArchiverMock ¶ added in v0.7.0
VisibilityArchiverMock is an autogenerated mock type for the VisibilityArchiver type
func (*VisibilityArchiverMock) Archive ¶ added in v0.27.0
func (_m *VisibilityArchiverMock) Archive(_a0 context.Context, _a1 URI, _a2 *archiverspb.ArchiveVisibilityRequest, _a3 ...ArchiveOption) error
Archive provides a mock function with given fields: _a0, _a1, _a2, _a3
func (*VisibilityArchiverMock) Query ¶ added in v0.27.0
func (_m *VisibilityArchiverMock) Query(_a0 context.Context, _a1 URI, _a2 *QueryVisibilityRequest) (*QueryVisibilityResponse, error)
Query provides a mock function with given fields: _a0, _a1, _a2
func (*VisibilityArchiverMock) ValidateURI ¶ added in v0.7.0
func (_m *VisibilityArchiverMock) ValidateURI(uri URI) error
ValidateURI provides a mock function with given fields: uri
type VisibilityBootstrapContainer ¶
type VisibilityBootstrapContainer struct { Logger log.Logger MetricsClient metrics.Client ClusterMetadata cluster.Metadata NamespaceCache cache.NamespaceCache }
VisibilityBootstrapContainer contains components needed by all visibility Archiver implementations
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package filestore is a generated GoMock package.
|
Package filestore is a generated GoMock package. |
Package gcloud is a generated GoMock package.
|
Package gcloud is a generated GoMock package. |
Package s3store is a generated GoMock package.
|
Package s3store is a generated GoMock package. |