archiver

package
v1.2.3 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 18, 2021 License: MIT Imports: 29 Imported by: 2

README

What

This README explains how to add new Archiver implementations.

Steps

Step 1: Create a new package for your implementation

Create a new directory in the archiver folder. The structure should look like the following:

./common/archiver
  - filestore/                      -- Filestore implementation 
  - provider/
      - provider.go                 -- Provider of archiver instances
  - yourImplementation/
      - historyArchiver.go          -- HistoryArchiver implementation
      - historyArchiver_test.go     -- Unit tests for HistoryArchiver
      - visibilityArchiver.go       -- VisibilityArchiver implementation
      - visibilityArchiver_test.go  -- Unit tests for VisibilityArchiver

Step 2: Implement the HistoryArchiver interface

type HistoryArchiver interface {
	// Archive is used to archive a workflow history. When the context expires the method should stop trying to archive.
	// Implementors are free to archive however they want, including implementing retries of sub-operations. The URI defines
	// the resource that histories should be archived into. The implementor gets to determine how to interpret the URI.
	// The Archive method may or may not be automatically retried by the caller. The ArchiveOptions are used
	// to interact with these retries including giving the implementor the ability to cancel retries and record progress
	// between retry attempts.
	// This method will be invoked after a workflow passes its retention period.
	Archive(context.Context, URI, *ArchiveHistoryRequest, ...ArchiveOption) error

	// Get is used to access an archived history. When the context expires this method should stop trying to fetch history.
	// The URI identifies the resource from which history should be accessed and it is up to the implementor to interpret this URI.
	// This method should emit api service errors - see filestore as an example.
	Get(context.Context, URI, *GetHistoryRequest) (*GetHistoryResponse, error)

	// ValidateURI is used to define what a valid URI for an implementation is.
	ValidateURI(URI) error
}
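As a rough illustration of the contract above, the following sketch validates the URI scheme, stops when the context has expired, and then stores the record. Everything in it is a stand-in: memoryHistoryArchiver, the "memory" scheme, the trimmed request type, and the use of *url.URL in place of the package's URI interface are assumptions made so the example compiles on its own.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/url"
)

// archiveHistoryRequest is a trimmed stand-in for ArchiveHistoryRequest.
type archiveHistoryRequest struct {
	WorkflowID string
	RunID      string
}

// memoryScheme is a hypothetical URI scheme owned by this sketch.
const memoryScheme = "memory"

var (
	errInvalidURI        = errors.New("URI is invalid")
	errURISchemeMismatch = errors.New("URI scheme does not match the archiver")
)

// memoryHistoryArchiver is a hypothetical archiver that keeps records in a map.
type memoryHistoryArchiver struct {
	store map[string]archiveHistoryRequest
}

func newMemoryHistoryArchiver() *memoryHistoryArchiver {
	return &memoryHistoryArchiver{store: make(map[string]archiveHistoryRequest)}
}

// ValidateURI accepts only URIs that use this archiver's scheme and carry a path.
func (a *memoryHistoryArchiver) ValidateURI(uri *url.URL) error {
	if uri.Scheme != memoryScheme {
		return errURISchemeMismatch
	}
	if uri.Path == "" {
		return errInvalidURI
	}
	return nil
}

// Archive validates the URI, stops if the context has expired, then stores the record.
func (a *memoryHistoryArchiver) Archive(ctx context.Context, uri *url.URL, req *archiveHistoryRequest) error {
	if err := a.ValidateURI(uri); err != nil {
		return err
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	a.store[uri.Path+"/"+req.WorkflowID+"/"+req.RunID] = *req
	return nil
}

// archiveSample parses rawURI and archives one fixed record with a background context.
func archiveSample(a *memoryHistoryArchiver, rawURI string) error {
	uri, err := url.Parse(rawURI)
	if err != nil {
		return err
	}
	return a.Archive(context.Background(), uri, &archiveHistoryRequest{WorkflowID: "wf", RunID: "run"})
}

func main() {
	a := newMemoryHistoryArchiver()
	fmt.Println(archiveSample(a, "memory:///archive"), len(a.store))
	fmt.Println(archiveSample(a, "s3://bucket/archive"))
}
```

A real implementation would write to durable storage and would also implement Get; the map here only keeps the example short.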

Step 3: Implement the VisibilityArchiver interface

type VisibilityArchiver interface {
    // Archive is used to archive one workflow visibility record.
    // Check the Archive() method of the HistoryArchiver interface in Step 2 for parameters' meaning and requirements.
    // The only difference is that the ArchiveOption parameter won't include an option for recording progress.
    // Please make sure your implementation is lossless. If any in-memory batching mechanism is used, then those batched records will be lost during server restarts.
    // This method will be invoked when a workflow closes. Note that because of conflict resolution, it is possible for a workflow to go through the closing process multiple times, which means that this method can be invoked more than once after a workflow closes.
    Archive(context.Context, URI, *ArchiveVisibilityRequest, ...ArchiveOption) error
    
    // Query is used to retrieve archived visibility records. 
    // Check the Get() method of the HistoryArchiver interface in Step 2 for parameters' meaning and requirements.
    // The request includes a string field called query, which describes what kind of visibility records should be returned. For example, it can be some SQL-like syntax query string. 
    // Your implementation is responsible for parsing and validating the query, and also returning all visibility records that match the query. 
    // Currently the maximum context timeout passed into the method is 3 minutes, so it's ok if this method takes a long time to run.
    Query(context.Context, URI, *QueryVisibilityRequest) (*QueryVisibilityResponse, error)

    // ValidateURI is used to define what a valid URI for an implementation is.
    ValidateURI(URI) error
}
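Because Archive can be invoked more than once for the same closed workflow, one way to stay correct is to make the write idempotent. The sketch below assumes a deterministic key built from the namespace ID and run ID; visibilityRecord and idempotentStore are hypothetical stand-ins, and the in-memory map is for illustration only, since a real archiver must write to durable storage to remain lossless.

```go
package main

import "fmt"

// visibilityRecord is a trimmed stand-in for the real archive visibility request.
type visibilityRecord struct {
	NamespaceID string
	RunID       string
	Status      string
}

// idempotentStore is a hypothetical sink keyed by namespace ID and run ID.
type idempotentStore struct {
	records map[string]visibilityRecord
}

func newIdempotentStore() *idempotentStore {
	return &idempotentStore{records: make(map[string]visibilityRecord)}
}

// Archive writes the record under a deterministic key, so a repeated call
// for the same run overwrites the earlier record instead of duplicating it.
func (s *idempotentStore) Archive(r visibilityRecord) {
	s.records[r.NamespaceID+"/"+r.RunID] = r
}

func main() {
	s := newIdempotentStore()
	record := visibilityRecord{NamespaceID: "ns", RunID: "run-1", Status: "Completed"}
	s.Archive(record)
	s.Archive(record) // the workflow went through the closing process again
	fmt.Println(len(s.records)) // prints 1
}
```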

Step 4: Update provider to provide access to your implementation

Modify the ./provider/provider.go file so that the ArchiverProvider knows how to create an instance of your archiver. Also, add configs for your archiver to the static yaml config files and modify the HistoryArchiverProvider and VisibilityArchiverProvider structs in ../common/service/config.go accordingly.
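The general idea of a provider that hands out archivers by URI scheme can be sketched as follows. This is not the code in provider.go: the provider type, its constructor map, the "myscheme" scheme, and the one-method historyArchiver interface are all hypothetical, chosen only to show where a new implementation would be registered.

```go
package main

import (
	"errors"
	"fmt"
)

// historyArchiver is a one-method stand-in for the real HistoryArchiver interface.
type historyArchiver interface {
	Name() string
}

type filestoreArchiver struct{}

func (filestoreArchiver) Name() string { return "filestore" }

// myArchiver represents the new implementation being added.
type myArchiver struct{}

func (myArchiver) Name() string { return "myscheme" }

var errUnknownScheme = errors.New("no archiver registered for scheme")

// provider maps a URI scheme to a constructor for the matching archiver.
type provider struct {
	constructors map[string]func() historyArchiver
}

func newProvider() *provider {
	return &provider{constructors: map[string]func() historyArchiver{
		"file":     func() historyArchiver { return filestoreArchiver{} },
		"myscheme": func() historyArchiver { return myArchiver{} }, // new entry
	}}
}

// GetHistoryArchiver returns an archiver for the scheme or an error if none is registered.
func (p *provider) GetHistoryArchiver(scheme string) (historyArchiver, error) {
	ctor, ok := p.constructors[scheme]
	if !ok {
		return nil, errUnknownScheme
	}
	return ctor(), nil
}

func main() {
	p := newProvider()
	a, err := p.GetHistoryArchiver("myscheme")
	fmt.Println(a.Name(), err) // prints myscheme <nil>
	_, err = p.GetHistoryArchiver("unknown")
	fmt.Println(err) // prints no archiver registered for scheme
}
```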

FAQ

If my Archive method can automatically be retried by caller how can I record and access progress between retries?

ArchiveOption is used to handle this. The following shows an example:

func (a *Archiver) Archive(
	ctx context.Context,
	uri URI,
	request *ArchiveHistoryRequest,
	opts ...ArchiveOption,
) error {
	featureCatalog := GetFeatureCatalog(opts...) // this function is defined in options.go

	// currentProgress starts empty and is filled from the previous attempt, if any.
	var currentProgress progress

	// Check if the feature for recording progress is enabled.
	if featureCatalog.ProgressManager != nil {
		if err := featureCatalog.ProgressManager.LoadProgress(ctx, &currentProgress); err != nil {
			// log some error message and return error if needed.
		}
	}

	// Your archiver implementation...

	// Record current progress
	if featureCatalog.ProgressManager != nil {
		if err := featureCatalog.ProgressManager.RecordProgress(ctx, currentProgress); err != nil {
			// log some error message and return error if needed.
		}
	}
	return nil
}

If my Archive method encounters an error which is non-retryable how do I indicate that the caller should not retry?

func (a *Archiver) Archive(
	ctx context.Context,
	uri URI,
	request *ArchiveHistoryRequest,
	opts ...ArchiveOption,
) error {
	featureCatalog := GetFeatureCatalog(opts...) // this function is defined in options.go

	err := yourArchiverImpl()
	if nonRetryableErr(err) {
		if featureCatalog.NonRetryableError != nil {
			return featureCatalog.NonRetryableError() // when the caller gets this error type back it will not retry anymore.
		}
	}
	return err
}

How does my history archiver implementation read history?

The archiver package provides a utility type called HistoryIterator, which wraps HistoryManager. It is simpler to use than the HistoryManager given in the HistoryBootstrapContainer, so archiver implementations can choose to use it when reading workflow histories. See the historyIterator.go file for more details. Sample usage can be found in the filestore historyArchiver implementation.

Should my archiver define all its own error types?

Each archiver is free to define and return any errors it wants. However, many errors that are common across archivers are already defined in constants.go.
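One common Go pattern for combining shared and implementation-specific errors is to wrap a shared sentinel with extra context, so callers can still match it with errors.Is. The sketch below redeclares one sentinel locally so it runs on its own; fetch and isNotExist are hypothetical helpers, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// errHistoryNotExist is a local copy of a shared sentinel, redeclared so the sketch is self-contained.
var errHistoryNotExist = errors.New("requested workflow history does not exist")

// fetch is a hypothetical lookup that always fails, wrapping the sentinel with context.
func fetch(runID string) error {
	return fmt.Errorf("fetch run %q: %w", runID, errHistoryNotExist)
}

// isNotExist reports whether err is, or wraps, the shared sentinel.
func isNotExist(err error) bool {
	return errors.Is(err, errHistoryNotExist)
}

func main() {
	err := fetch("run-1")
	fmt.Println(err)             // prints fetch run "run-1": requested workflow history does not exist
	fmt.Println(isNotExist(err)) // prints true
}
```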

Is there a generic query syntax for visibility archiver?

Currently no, but this is something we plan to add in the future. For now, try to make your syntax similar to the one used by our advanced list workflow API.

Documentation

Overview

Package archiver is a generated GoMock package.

Index

Constants

const (
	// ArchiveNonRetryableErrorMsg is the log message when the Archive() method encounters a non-retryable error
	ArchiveNonRetryableErrorMsg = "Archive method encountered an non-retryable error."
	// ArchiveTransientErrorMsg is the log message when the Archive() method encounters a transient error
	ArchiveTransientErrorMsg = "Archive method encountered a transient error."

	// ErrReasonInvalidURI is the error reason for invalid URI
	ErrReasonInvalidURI = "URI is invalid"
	// ErrReasonInvalidArchiveRequest is the error reason for invalid archive request
	ErrReasonInvalidArchiveRequest = "archive request is invalid"
	// ErrReasonConstructHistoryIterator is the error reason for failing to construct history iterator
	ErrReasonConstructHistoryIterator = "failed to construct history iterator"
	// ErrReasonReadHistory is the error reason for failing to read history
	ErrReasonReadHistory = "failed to read history batches"
	// ErrReasonHistoryMutated is the error reason for mutated history
	ErrReasonHistoryMutated = "history was mutated"
)

Variables

var (
	// ErrInvalidURI is the error for invalid URI
	ErrInvalidURI = errors.New("URI is invalid")
	// ErrURISchemeMismatch is the error for mismatch between URI scheme and archiver
	ErrURISchemeMismatch = errors.New("URI scheme does not match the archiver")
	// ErrHistoryMutated is the error for mutated history
	ErrHistoryMutated = errors.New("history was mutated")
	// ErrContextTimeout is the error for context timeout
	ErrContextTimeout = errors.New("archive aborted because context timed out")
	// ErrInvalidGetHistoryRequest is the error for invalid GetHistory request
	ErrInvalidGetHistoryRequest = errors.New("get archived history request is invalid")
	// ErrInvalidQueryVisibilityRequest is the error for invalid Query Visibility request
	ErrInvalidQueryVisibilityRequest = errors.New("query visiblity request is invalid")
	// ErrNextPageTokenCorrupted is the error for corrupted GetHistory token
	ErrNextPageTokenCorrupted = errors.New("next page token is corrupted")
	// ErrHistoryNotExist is the error for non-exist history
	ErrHistoryNotExist = errors.New("requested workflow history does not exist")
)

Functions

func ConvertSearchAttrToPayload added in v0.27.0

func ConvertSearchAttrToPayload(searchAttrStr map[string]string) map[string]*commonpb.Payload

ConvertSearchAttrToPayload converts search attribute value from string back to byte array

func TagLoggerWithArchiveHistoryRequestAndURI added in v0.27.0

func TagLoggerWithArchiveHistoryRequestAndURI(logger log.Logger, request *ArchiveHistoryRequest, URI string) log.Logger

TagLoggerWithArchiveHistoryRequestAndURI tags logger with fields in the archive history request and the URI

func TagLoggerWithArchiveVisibilityRequestAndURI added in v0.27.0

func TagLoggerWithArchiveVisibilityRequestAndURI(logger log.Logger, request *archiverspb.ArchiveVisibilityRequest, URI string) log.Logger

TagLoggerWithArchiveVisibilityRequestAndURI tags logger with fields in the archive visibility request and the URI

func ValidateGetRequest added in v0.27.0

func ValidateGetRequest(request *GetHistoryRequest) error

ValidateGetRequest validates the get archived history request

func ValidateHistoryArchiveRequest added in v0.27.0

func ValidateHistoryArchiveRequest(request *ArchiveHistoryRequest) error

ValidateHistoryArchiveRequest validates the archive history request

func ValidateQueryRequest added in v0.27.0

func ValidateQueryRequest(request *QueryVisibilityRequest) error

ValidateQueryRequest validates the query visibility request

func ValidateVisibilityArchivalRequest added in v0.27.0

func ValidateVisibilityArchivalRequest(request *archiverspb.ArchiveVisibilityRequest) error

ValidateVisibilityArchivalRequest validates the archive visibility request

Types

type ArchivalConfig added in v0.7.0

type ArchivalConfig interface {
	ClusterConfiguredForArchival() bool
	GetClusterState() ArchivalState
	ReadEnabled() bool
	GetNamespaceDefaultState() enumspb.ArchivalState
	GetNamespaceDefaultURI() string
}

ArchivalConfig is an immutable representation of the archival configuration of the cluster. This config is determined at cluster startup time.

func NewArchivalConfig added in v0.7.0

func NewArchivalConfig(
	staticClusterStateStr string,
	dynamicClusterState dynamicconfig.StringPropertyFn,
	enableRead dynamicconfig.BoolPropertyFn,
	namespaceDefaultStateStr string,
	namespaceDefaultURI string,
) ArchivalConfig

NewArchivalConfig constructs a new valid ArchivalConfig

func NewDisabledArchvialConfig added in v0.8.0

func NewDisabledArchvialConfig() ArchivalConfig

NewDisabledArchvialConfig returns a disabled ArchivalConfig

type ArchivalMetadata added in v0.7.0

type ArchivalMetadata interface {
	GetHistoryConfig() ArchivalConfig
	GetVisibilityConfig() ArchivalConfig
}

ArchivalMetadata provides cluster level archival information

func NewArchivalMetadata added in v0.7.0

func NewArchivalMetadata(
	dc *dynamicconfig.Collection,
	historyState string,
	historyReadEnabled bool,
	visibilityState string,
	visibilityReadEnabled bool,
	namespaceDefaults *config.ArchivalNamespaceDefaults,
) ArchivalMetadata

NewArchivalMetadata constructs a new ArchivalMetadata

type ArchivalState added in v0.27.0

type ArchivalState int

ArchivalState represents the archival state of the cluster

const (
	// ArchivalDisabled means this cluster is not configured to handle archival
	ArchivalDisabled ArchivalState = iota
	// ArchivalPaused means this cluster is configured to handle archival but is currently not archiving
	// This state is not yet implemented, as of now ArchivalPaused is treated the same way as ArchivalDisabled
	ArchivalPaused
	// ArchivalEnabled means this cluster is currently archiving
	ArchivalEnabled
)

type ArchiveFeatureCatalog

type ArchiveFeatureCatalog struct {
	ProgressManager   ProgressManager
	NonRetryableError NonRetryableError
}

ArchiveFeatureCatalog is a collection of features for the Archive method of History/Visibility Archiver

func GetFeatureCatalog

func GetFeatureCatalog(opts ...ArchiveOption) *ArchiveFeatureCatalog

GetFeatureCatalog applies all the ArchiveOptions to the catalog and returns the catalog. It should be called inside the Archive method.

type ArchiveHistoryRequest

type ArchiveHistoryRequest struct {
	ShardID              int32
	NamespaceID          string
	Namespace            string
	WorkflowID           string
	RunID                string
	BranchToken          []byte
	NextEventID          int64
	CloseFailoverVersion int64
}

ArchiveHistoryRequest is the request to archive workflow history

type ArchiveOption

type ArchiveOption func(featureCatalog *ArchiveFeatureCatalog)

ArchiveOption is used to provide options for adding features to the Archive method of History/Visibility Archiver

func GetHeartbeatArchiveOption

func GetHeartbeatArchiveOption() ArchiveOption

GetHeartbeatArchiveOption returns an ArchiveOption for enabling heartbeating. It should be used when the Archive method is invoked inside an activity.

func GetNonRetryableErrorOption added in v0.27.0

func GetNonRetryableErrorOption(nonRetryableErr error) ArchiveOption

GetNonRetryableErrorOption returns an ArchiveOption so that the archiver knows what should be returned when a non-retryable error is encountered.
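ArchiveOption follows the functional options pattern: each option is a function that mutates the catalog, and the catalog is built by applying them in order. The sketch below reproduces that pattern with lower-case stand-in names (featureCatalog, archiveOption, withNonRetryableError, getFeatureCatalog); it is an assumption about the shape of the mechanism, not the package's source.

```go
package main

import (
	"errors"
	"fmt"
)

// featureCatalog is a stand-in holding a single optional feature.
type featureCatalog struct {
	NonRetryableError func() error
}

// archiveOption mutates a catalog to switch a feature on.
type archiveOption func(*featureCatalog)

// withNonRetryableError tells the archiver which error to return when it must not be retried.
func withNonRetryableError(err error) archiveOption {
	return func(c *featureCatalog) {
		c.NonRetryableError = func() error { return err }
	}
}

// getFeatureCatalog applies every option to an empty catalog and returns it.
func getFeatureCatalog(opts ...archiveOption) *featureCatalog {
	c := &featureCatalog{}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

// errStop is a hypothetical error the caller treats as "do not retry".
var errStop = errors.New("do not retry")

func main() {
	without := getFeatureCatalog()
	with := getFeatureCatalog(withNonRetryableError(errStop))
	fmt.Println(without.NonRetryableError == nil)    // prints true
	fmt.Println(with.NonRetryableError() == errStop) // prints true
}
```

Features that were not requested stay nil, which is why archiver code checks for nil before using them.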

type GetHistoryRequest

type GetHistoryRequest struct {
	NamespaceID          string
	WorkflowID           string
	RunID                string
	CloseFailoverVersion *int64
	NextPageToken        []byte
	PageSize             int
}

GetHistoryRequest is the request to Get archived history

type GetHistoryResponse

type GetHistoryResponse struct {
	HistoryBatches []*historypb.History
	NextPageToken  []byte
}

GetHistoryResponse is the response of Get archived history
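A caller would typically page through results by feeding each response's NextPageToken into the next request. The sketch below assumes that an empty token marks the last page and encodes the token as a decimal offset; both are assumptions for illustration, since each archiver chooses its own token format. getRequest, getResponse, fakeGet, and readAll are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// getRequest and getResponse are trimmed stand-ins for the real request and response types.
type getRequest struct {
	NextPageToken []byte
	PageSize      int
}

type getResponse struct {
	Batches       []string
	NextPageToken []byte
}

var (
	errTokenCorrupted = errors.New("next page token is corrupted")
	errInvalidRequest = errors.New("get archived history request is invalid")
)

// fakeGet pages through a fixed slice; the token is the decimal offset of the next item.
func fakeGet(all []string, req getRequest) (getResponse, error) {
	if req.PageSize <= 0 {
		return getResponse{}, errInvalidRequest
	}
	offset := 0
	if len(req.NextPageToken) > 0 {
		n, err := strconv.Atoi(string(req.NextPageToken))
		if err != nil || n < 0 || n > len(all) {
			return getResponse{}, errTokenCorrupted
		}
		offset = n
	}
	end := offset + req.PageSize
	if end > len(all) {
		end = len(all)
	}
	resp := getResponse{Batches: all[offset:end]}
	if end < len(all) {
		resp.NextPageToken = []byte(strconv.Itoa(end))
	}
	return resp, nil
}

// readAll keeps requesting pages until the response carries no token.
func readAll(all []string, pageSize int) ([]string, error) {
	var out []string
	req := getRequest{PageSize: pageSize}
	for {
		resp, err := fakeGet(all, req)
		if err != nil {
			return nil, err
		}
		out = append(out, resp.Batches...)
		if len(resp.NextPageToken) == 0 {
			return out, nil
		}
		req.NextPageToken = resp.NextPageToken
	}
}

func main() {
	out, err := readAll([]string{"a", "b", "c", "d", "e"}, 2)
	fmt.Println(out, err) // prints [a b c d e] <nil>
}
```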

type HistoryArchiver

type HistoryArchiver interface {
	// Archive is used to archive a Workflow's history. When the context expires the method should stop trying to archive.
	// Implementors are free to archive however they want, including implementing retries of sub-operations. The URI defines
	// the resource that histories should be archived into. The implementor gets to determine how to interpret the URI.
	// The Archive method may or may not be automatically retried by the caller. ArchiveOptions are used
	// to interact with these retries including giving the implementor the ability to cancel retries and record progress
	// between retry attempts.
	// This method will be invoked after a workflow passes its retention period.
	Archive(context.Context, URI, *ArchiveHistoryRequest, ...ArchiveOption) error
	// Get is used to access an archived history. When context expires this method should stop trying to fetch history.
	// The URI identifies the resource from which history should be accessed and it is up to the implementor to interpret this URI.
	// This method should emit api service errors - see the filestore as an example.
	Get(context.Context, URI, *GetHistoryRequest) (*GetHistoryResponse, error)
	// ValidateURI is used to define what a valid URI for an implementation is.
	ValidateURI(URI) error
}

HistoryArchiver is used to archive history and read archived history

type HistoryArchiverMock added in v0.7.0

type HistoryArchiverMock struct {
	mock.Mock
}

HistoryArchiverMock is an autogenerated mock type for the HistoryArchiver type

func (*HistoryArchiverMock) Archive added in v0.7.0

func (_m *HistoryArchiverMock) Archive(ctx context.Context, uri URI, request *ArchiveHistoryRequest, opts ...ArchiveOption) error

Archive provides a mock function with given fields: ctx, uri, request, opts

func (*HistoryArchiverMock) Get added in v0.7.0

Get provides a mock function with given fields: ctx, uri, request

func (*HistoryArchiverMock) ValidateURI added in v0.7.0

func (_m *HistoryArchiverMock) ValidateURI(uri URI) error

ValidateURI provides a mock function with given fields: uri

type HistoryBootstrapContainer

type HistoryBootstrapContainer struct {
	HistoryV2Manager persistence.HistoryManager
	Logger           log.Logger
	MetricsClient    metrics.Client
	ClusterMetadata  cluster.Metadata
	NamespaceCache   cache.NamespaceCache
}

HistoryBootstrapContainer contains components needed by all history Archiver implementations

type HistoryIterator

type HistoryIterator interface {
	Next() (*archiverspb.HistoryBlob, error)
	HasNext() bool
	GetState() ([]byte, error)
}

HistoryIterator is used to get history batches

func NewHistoryIterator

func NewHistoryIterator(
	request *ArchiveHistoryRequest,
	historyV2Manager persistence.HistoryManager,
	targetHistoryBlobSize int,
) HistoryIterator

NewHistoryIterator returns a new HistoryIterator

func NewHistoryIteratorFromState added in v0.7.0

func NewHistoryIteratorFromState(
	request *ArchiveHistoryRequest,
	historyV2Manager persistence.HistoryManager,
	targetHistoryBlobSize int,
	initialState []byte,
) (HistoryIterator, error)

NewHistoryIteratorFromState returns a new HistoryIterator with specified state
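The pairing of GetState with NewHistoryIteratorFromState suggests a resume pattern: save the iterator state after some progress, then rebuild an iterator from that state on a later attempt. The sketch below shows the pattern with a hypothetical sliceIterator that yields strings and stores its position as JSON; the real iterator's state format is not documented here and may differ.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// sliceIterator is a stand-in for HistoryIterator that yields strings
// instead of history blobs.
type sliceIterator struct {
	blobs []string
	next  int // index of the next blob to return
}

func (it *sliceIterator) HasNext() bool { return it.next < len(it.blobs) }

func (it *sliceIterator) Next() (string, error) {
	if !it.HasNext() {
		return "", errors.New("iterator exhausted")
	}
	blob := it.blobs[it.next]
	it.next++
	return blob, nil
}

// GetState serializes the position so a later attempt can resume from it.
func (it *sliceIterator) GetState() ([]byte, error) { return json.Marshal(it.next) }

// newIteratorFromState rebuilds an iterator at the saved position.
func newIteratorFromState(blobs []string, state []byte) (*sliceIterator, error) {
	it := &sliceIterator{blobs: blobs}
	if err := json.Unmarshal(state, &it.next); err != nil {
		return nil, err
	}
	if it.next < 0 || it.next > len(blobs) {
		return nil, errors.New("iterator state is out of range")
	}
	return it, nil
}

// resumeAfterFirst reads one blob, saves the state, and drains the rest
// with a fresh iterator, as a retried attempt might.
func resumeAfterFirst(blobs []string) ([]string, error) {
	first := &sliceIterator{blobs: blobs}
	if _, err := first.Next(); err != nil {
		return nil, err
	}
	state, err := first.GetState()
	if err != nil {
		return nil, err
	}
	resumed, err := newIteratorFromState(blobs, state)
	if err != nil {
		return nil, err
	}
	var rest []string
	for resumed.HasNext() {
		blob, err := resumed.Next()
		if err != nil {
			return nil, err
		}
		rest = append(rest, blob)
	}
	return rest, nil
}

func main() {
	rest, err := resumeAfterFirst([]string{"blob-1", "blob-2", "blob-3"})
	fmt.Println(rest, err) // prints [blob-2 blob-3] <nil>
}
```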

type MockArchivalMetadata added in v0.7.0

type MockArchivalMetadata struct {
	mock.Mock
}

MockArchivalMetadata is an autogenerated mock type for the ArchivalMetadata type

func (*MockArchivalMetadata) GetHistoryConfig added in v0.7.0

func (_m *MockArchivalMetadata) GetHistoryConfig() ArchivalConfig

GetHistoryConfig provides a mock function with given fields:

func (*MockArchivalMetadata) GetVisibilityConfig added in v0.7.0

func (_m *MockArchivalMetadata) GetVisibilityConfig() ArchivalConfig

GetVisibilityConfig provides a mock function with given fields:

type MockHistoryIterator

type MockHistoryIterator struct {
	// contains filtered or unexported fields
}

MockHistoryIterator is a mock of HistoryIterator interface

func NewMockHistoryIterator

func NewMockHistoryIterator(ctrl *gomock.Controller) *MockHistoryIterator

NewMockHistoryIterator creates a new mock instance

func (*MockHistoryIterator) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MockHistoryIterator) GetState

func (m *MockHistoryIterator) GetState() ([]byte, error)

GetState mocks base method

func (*MockHistoryIterator) HasNext

func (m *MockHistoryIterator) HasNext() bool

HasNext mocks base method

func (*MockHistoryIterator) Next

Next mocks base method

type MockHistoryIteratorMockRecorder

type MockHistoryIteratorMockRecorder struct {
	// contains filtered or unexported fields
}

MockHistoryIteratorMockRecorder is the mock recorder for MockHistoryIterator

func (*MockHistoryIteratorMockRecorder) GetState

GetState indicates an expected call of GetState

func (*MockHistoryIteratorMockRecorder) HasNext

HasNext indicates an expected call of HasNext

func (*MockHistoryIteratorMockRecorder) Next

Next indicates an expected call of Next

type NonRetryableError added in v0.27.0

type NonRetryableError func() error

NonRetryableError returns an error indicating that the archiver has encountered a non-retryable error

type ProgressManager

type ProgressManager interface {
	RecordProgress(ctx context.Context, progress interface{}) error
	LoadProgress(ctx context.Context, valuePtr interface{}) error
	HasProgress(ctx context.Context) bool
}

ProgressManager is used to record and load archive progress
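For unit tests it can be handy to have a trivial ProgressManager. The sketch below is a hypothetical in-memory version that serializes progress with encoding/json; memoryProgressManager and uploadProgress are illustrative names, and the manager supplied to a running archiver may store progress quite differently.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// memoryProgressManager is a hypothetical ProgressManager that keeps the
// most recent progress as JSON bytes in memory.
type memoryProgressManager struct {
	data []byte
}

// RecordProgress replaces any previously recorded progress.
func (m *memoryProgressManager) RecordProgress(_ context.Context, progress interface{}) error {
	data, err := json.Marshal(progress)
	if err != nil {
		return err
	}
	m.data = data
	return nil
}

// LoadProgress decodes the recorded progress into valuePtr.
func (m *memoryProgressManager) LoadProgress(ctx context.Context, valuePtr interface{}) error {
	if !m.HasProgress(ctx) {
		return errors.New("no progress recorded")
	}
	return json.Unmarshal(m.data, valuePtr)
}

func (m *memoryProgressManager) HasProgress(_ context.Context) bool { return len(m.data) > 0 }

// uploadProgress is a hypothetical progress payload; its fields are exported
// so that encoding/json can see them.
type uploadProgress struct {
	BlobsUploaded int
	IteratorState []byte
}

// roundTrip records p and loads it back through a fresh manager.
func roundTrip(p uploadProgress) (uploadProgress, error) {
	ctx := context.Background()
	m := &memoryProgressManager{}
	if err := m.RecordProgress(ctx, p); err != nil {
		return uploadProgress{}, err
	}
	var loaded uploadProgress
	err := m.LoadProgress(ctx, &loaded)
	return loaded, err
}

func main() {
	loaded, err := roundTrip(uploadProgress{BlobsUploaded: 3, IteratorState: []byte("state")})
	fmt.Println(loaded.BlobsUploaded, string(loaded.IteratorState), err) // prints 3 state <nil>
}
```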

type QueryVisibilityRequest added in v0.27.0

type QueryVisibilityRequest struct {
	NamespaceID   string
	PageSize      int
	NextPageToken []byte
	Query         string
}

QueryVisibilityRequest is the request to query archived visibility records

type QueryVisibilityResponse added in v0.27.0

type QueryVisibilityResponse struct {
	Executions    []*workflowpb.WorkflowExecutionInfo
	NextPageToken []byte
}

QueryVisibilityResponse is the response of querying archived visibility records

type SizeEstimator

type SizeEstimator interface {
	EstimateSize(v interface{}) (int, error)
}

SizeEstimator is used to estimate the size of any object

func NewJSONSizeEstimator

func NewJSONSizeEstimator() SizeEstimator

NewJSONSizeEstimator returns a new SizeEstimator which uses json encoding to estimate size
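A JSON-based estimator can be as small as "marshal the value and count the bytes". The sketch below shows that idea with a hypothetical jsonSizeEstimator type; it is not taken from the package source, and the estimate reflects the JSON encoding only, not any other wire format.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonSizeEstimator is a hypothetical SizeEstimator backed by encoding/json.
type jsonSizeEstimator struct{}

// EstimateSize returns the length in bytes of v's JSON encoding.
func (jsonSizeEstimator) EstimateSize(v interface{}) (int, error) {
	data, err := json.Marshal(v)
	if err != nil {
		return 0, err
	}
	return len(data), nil
}

func main() {
	est := jsonSizeEstimator{}
	size, err := est.EstimateSize(map[string]int{"a": 1}) // encodes as {"a":1}
	fmt.Println(size, err)                                // prints 7 <nil>
}
```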

type URI added in v0.7.0

type URI interface {
	Scheme() string
	Path() string
	Hostname() string
	Port() string
	Username() string
	Password() string
	String() string
	Opaque() string
	Query() map[string][]string
}

URI identifies the archival resource to which records are written and from which they are read.

func NewURI added in v0.7.0

func NewURI(s string) (URI, error)

NewURI constructs a new archiver URI from string.
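The accessor names on URI mirror those of the standard library's net/url package, so the components of an archival URI can be explored with url.Parse. The sketch below uses only the standard library; describeURI is a hypothetical helper, and whether NewURI applies the same parsing rules is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// describeURI parses raw and reports the parts an archiver would typically inspect.
func describeURI(raw string) (string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", err
	}
	if u.Scheme == "" {
		return "", errors.New("URI is invalid")
	}
	return fmt.Sprintf("scheme=%s host=%s port=%s path=%s", u.Scheme, u.Hostname(), u.Port(), u.Path), nil
}

func main() {
	for _, raw := range []string{"file:///tmp/archive", "gs://my-bucket/archive", "/no/scheme"} {
		desc, err := describeURI(raw)
		fmt.Println(desc, err)
	}
}
```

An implementation's ValidateURI would usually start from the scheme and then check whichever remaining parts it relies on, such as the path for a file-based store or the hostname for a bucket-based one.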

type VisibilityArchiver

type VisibilityArchiver interface {
	// Archive is used to archive one Workflow visibility record.
	// Check the Archive method of the HistoryArchiver interface for parameters' meaning and requirements.
	// The only difference is that the ArchiveOption parameter won't include an option for recording progress.
	// Please make sure your implementation is lossless. If any in-memory batching mechanism is used
	// then those batched records will be lost during server restarts. This method will be invoked when the Workflow closes.
	// Note that because of conflict resolution, it is possible for a Workflow to go through the closing process multiple times,
	// which means that this method can be invoked more than once after a Workflow closes.
	Archive(context.Context, URI, *archiverspb.ArchiveVisibilityRequest, ...ArchiveOption) error
	// Query is used to retrieve archived visibility records.
	// Check the Get() method of the HistoryArchiver interface in Step 2 for parameters' meaning and requirements.
	// The request includes a string field called query, which describes what kind of visibility records should be returned.
	// For example, it can be some SQL-like syntax query string.
	// Your implementation is responsible for parsing and validating the query, and also returning all visibility records that match the query.
	// Currently the maximum context timeout passed into the method is 3 minutes, so it's acceptable if this method takes some time to run.
	Query(context.Context, URI, *QueryVisibilityRequest) (*QueryVisibilityResponse, error)
	// ValidateURI is used to define what a valid URI for an implementation is.
	ValidateURI(URI) error
}

VisibilityArchiver is used to archive visibility and read archived visibility

type VisibilityArchiverMock added in v0.7.0

type VisibilityArchiverMock struct {
	mock.Mock
}

VisibilityArchiverMock is an autogenerated mock type for the VisibilityArchiver type

func (*VisibilityArchiverMock) Archive added in v0.27.0

Archive provides a mock function with given fields: _a0, _a1, _a2, _a3

func (*VisibilityArchiverMock) Query added in v0.27.0

Query provides a mock function with given fields: _a0, _a1, _a2

func (*VisibilityArchiverMock) ValidateURI added in v0.7.0

func (_m *VisibilityArchiverMock) ValidateURI(uri URI) error

ValidateURI provides a mock function with given fields: uri

type VisibilityBootstrapContainer

type VisibilityBootstrapContainer struct {
	Logger          log.Logger
	MetricsClient   metrics.Client
	ClusterMetadata cluster.Metadata
	NamespaceCache  cache.NamespaceCache
}

VisibilityBootstrapContainer contains components needed by all visibility Archiver implementations

Directories

Path Synopsis
Package filestore is a generated GoMock package.
Package filestore is a generated GoMock package.
Package gcloud is a generated GoMock package.
Package gcloud is a generated GoMock package.
Package s3store is a generated GoMock package.
Package s3store is a generated GoMock package.
