migration

package
v1.27.0-128.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 10, 2025 License: MIT Imports: 35 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	NamespaceTagName           = "namespace"
	ForceReplicationRpsTagName = "force_replication_rps"
)

Functions

func ForceReplicationWorkflow

func ForceReplicationWorkflow(ctx workflow.Context, params ForceReplicationParams) error

func ForceTaskQueueUserDataReplicationWorkflow added in v1.21.0

func ForceTaskQueueUserDataReplicationWorkflow(ctx workflow.Context, params TaskQueueUserDataReplicationParamsWithNamespace) error

func NamespaceHandoverWorkflow

func NamespaceHandoverWorkflow(ctx workflow.Context, params NamespaceHandoverParams) (retErr error)

func NewResult

func NewResult(params initParams) fxResult

Types

type CatchUpOutput

type CatchUpOutput struct{}

func CatchupWorkflow

func CatchupWorkflow(ctx workflow.Context, params CatchUpParams) (CatchUpOutput, error)

type CatchUpParams

type CatchUpParams struct {
	Namespace     string
	RemoteCluster string
}

type ForceReplicationOutput

type ForceReplicationOutput struct {
}

type ForceReplicationParams

type ForceReplicationParams struct {
	Namespace               string `validate:"required"`
	Query                   string `validate:"required"` // query to list workflows for replication
	ConcurrentActivityCount int
	OverallRps              float64 // RPS for enqueuing of replication tasks
	GetParentInfoRPS        float64 // RPS for getting parent child info
	ListWorkflowsPageSize   int     // PageSize of ListWorkflow, will paginate through results.
	PageCountPerExecution   int     // number of pages to be processed before continue as new, max is 1000.
	NextPageToken           []byte  // used by continue as new

	// Used for verifying workflow executions were replicated successfully on target cluster.
	EnableVerification      bool
	TargetClusterEndpoint   string
	TargetClusterName       string
	VerifyIntervalInSeconds int `validate:"gte=0"`

	// Used by query handler to indicate overall progress of replication
	LastCloseTime                      time.Time
	LastStartTime                      time.Time
	ContinuedAsNewCount                int
	TaskQueueUserDataReplicationParams TaskQueueUserDataReplicationParams
	ReplicatedWorkflowCount            int64
	TotalForceReplicateWorkflowCount   int64
	ReplicatedWorkflowCountPerSecond   float64

	// Used to calculate QPS
	QPSQueue QPSQueue

	// Carry over the replication status after continue-as-new.
	TaskQueueUserDataReplicationStatus TaskQueueUserDataReplicationStatus
}

type ForceReplicationStatus added in v1.16.3

type ForceReplicationStatus struct {
	LastCloseTime                      time.Time
	LastStartTime                      time.Time
	TaskQueueUserDataReplicationStatus TaskQueueUserDataReplicationStatus
	ContinuedAsNewCount                int
	TotalWorkflowCount                 int64
	ReplicatedWorkflowCount            int64
	ReplicatedWorkflowCountPerSecond   float64
	PageTokenForRestart                []byte
}

type NamespaceHandoverParams

type NamespaceHandoverParams struct {
	Namespace     string
	RemoteCluster string

	// how far behind on replication is allowed for remote cluster before handover is initiated
	AllowedLaggingSeconds int
	AllowedLaggingTasks   int64

	// how long to wait for handover to complete before rollback
	HandoverTimeoutSeconds int
}

type QPSData

type QPSData struct {
	Count     int64
	Timestamp time.Time
}

type QPSQueue

type QPSQueue struct {
	MaxSize int
	Data    []QPSData
}

func NewQPSQueue

func NewQPSQueue(concurrentActivityCount int) QPSQueue

NewQPSQueue initializes a QPSQueue to collect data points for each workflow execution. The queue size is set to concurrency + 1 to account for up to 'concurrency' activities running simultaneously and the initial starting point.

func (*QPSQueue) CalculateQPS

func (q *QPSQueue) CalculateQPS() float64

func (*QPSQueue) Enqueue

func (q *QPSQueue) Enqueue(count int64)

type SkippedWorkflowExecution added in v1.21.5

type SkippedWorkflowExecution struct {
	WorkflowExecution *commonpb.WorkflowExecution
	Reason            string
}

type TaskQueueUserDataReplicationParams added in v1.21.0

type TaskQueueUserDataReplicationParams struct {
	// PageSize for the SeedReplicationQueueWithUserDataEntries activity
	PageSize int
	// RPS limits the number of task queue user data entries pages requested per second.
	RPS float64
}

type TaskQueueUserDataReplicationParamsWithNamespace added in v1.21.0

type TaskQueueUserDataReplicationParamsWithNamespace struct {
	TaskQueueUserDataReplicationParams
	// Namespace name
	Namespace string
}

TaskQueueUserDataReplicationParamsWithNamespace is used for child workflow / activity input

type TaskQueueUserDataReplicationStatus added in v1.21.0

type TaskQueueUserDataReplicationStatus struct {
	Done           bool
	FailureMessage string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL