common

package
v0.5.4
Warning

This package is not in the latest version of its module.

Published: Mar 8, 2019 License: MIT Imports: 18 Imported by: 68

Documentation

Constants

const (
	// FirstEventID is the id of the first event in the history
	FirstEventID int64 = 1
	// EmptyEventID is the id of the empty event
	EmptyEventID int64 = -23
	// EmptyVersion is used as the default value for failover version when no value is provided
	EmptyVersion int64 = -24
	// EndEventID is the id of the end event, here we use the int64 max
	EndEventID int64 = 1<<63 - 1
	// BufferedEventID is the id of the buffered event
	BufferedEventID int64 = -123
	// EmptyEventTaskID is the uninitialized ID of the task within an event
	EmptyEventTaskID int64 = -1234
	// TransientEventID is the id of the transient event
	TransientEventID int64 = -124
	// FirstBlobPageToken is the page token identifying the first blob for each history archival
	FirstBlobPageToken = 1
	// LastBlobNextPageToken is the next page token on the last blob for each history archival
	LastBlobNextPageToken = -1
)
const (
	// FrontendServiceName is the name of the frontend service
	FrontendServiceName = "cadence-frontend"
	// HistoryServiceName is the name of the history service
	HistoryServiceName = "cadence-history"
	// MatchingServiceName is the name of the matching service
	MatchingServiceName = "cadence-matching"
	// WorkerServiceName is the name of the worker service
	WorkerServiceName = "cadence-worker"
)
const (
	EncodingTypeJSON     EncodingType = "json"
	EncodingTypeThriftRW              = "thriftrw"
	EncodingTypeGob                   = "gob"
	EncodingTypeUnknown               = "unknow"
)

Data encoding types

const (
	// GetHistoryWarnSizeLimit is the threshold for emitting warn log
	GetHistoryWarnSizeLimit = 500 * 1024 // Warn when size goes over 500KB
	// GetHistoryMaxPageSize is the max page size for get history
	GetHistoryMaxPageSize = 1000
)
const (

	// DaemonStatusInitialized coroutine pool initialized
	DaemonStatusInitialized int32 = 0
	// DaemonStatusStarted coroutine pool started
	DaemonStatusStarted int32 = 1
	// DaemonStatusStopped coroutine pool stopped
	DaemonStatusStopped int32 = 2
)
const (
	// LibraryVersionHeaderName refers to the name of the
	// tchannel / http header that contains the client
	// library version
	LibraryVersionHeaderName = "cadence-client-library-version"

	// FeatureVersionHeaderName refers to the name of the
	// tchannel / http header that contains the client
	// feature version
	// The feature version sent from the client represents the
	// feature set that the Cadence client library supports.
	// The Cadence server can use it for client capability
	// checks, for backward compatibility.
	FeatureVersionHeaderName = "cadence-client-feature-version"

	// ClientImplHeaderName refers to the name of the
	// header that contains the client implementation
	ClientImplHeaderName = "cadence-client-name"
)
const (

	// FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit
	FailureReasonCompleteResultExceedsLimit = "COMPLETE_RESULT_EXCEEDS_LIMIT"
	// FailureReasonFailureDetailsExceedsLimit is failureReason for failure details exceeds limit
	FailureReasonFailureDetailsExceedsLimit = "FAILURE_DETAILS_EXCEEDS_LIMIT"
	// FailureReasonCancelDetailsExceedsLimit is failureReason for cancel details exceeds limit
	FailureReasonCancelDetailsExceedsLimit = "CANCEL_DETAILS_EXCEEDS_LIMIT"
	// FailureReasonHeartbeatExceedsLimit is failureReason for heartbeat exceeds limit
	FailureReasonHeartbeatExceedsLimit = "HEARTBEAT_EXCEEDS_LIMIT"
	// FailureReasonDecisionBlobSizeExceedsLimit is the failureReason for decision blob exceeds size limit
	FailureReasonDecisionBlobSizeExceedsLimit = "DECISION_BLOB_SIZE_EXCEEDS_LIMIT"
	// TerminateReasonSizeExceedsLimit is the reason to terminate a workflow when its history size exceeds the limit
	TerminateReasonSizeExceedsLimit = "HISTORY_SIZE_EXCEEDS_LIMIT"
)
const MaxTaskTimeout = 31622400

MaxTaskTimeout is the maximum task timeout allowed: 366 days, expressed in seconds.
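The value can be checked with a line of arithmetic. The following is a minimal sketch; `maxTaskTimeout` and `daysInTimeout` are local names introduced for illustration and are not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// maxTaskTimeout is a local copy of the documented MaxTaskTimeout value, in seconds.
const maxTaskTimeout = 31622400

// daysInTimeout converts a timeout given in seconds to whole days.
// It is a hypothetical helper used only for this check.
func daysInTimeout(seconds int64) int64 {
	return int64((time.Duration(seconds) * time.Second) / (24 * time.Hour))
}

func main() {
	fmt.Println(daysInTimeout(maxTaskTimeout)) // 366
}
```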

const NoRetryBackoff = time.Duration(-1)

NoRetryBackoff is used to represent backoff when no retry is needed

const (
	// VisibilityAppName is used to find kafka topics and ES indexName for visibility
	VisibilityAppName = "visibility"
)

Variables

var (
	// ErrBlobSizeExceedsLimit is error for event blob size exceeds limit
	ErrBlobSizeExceedsLimit = &workflow.BadRequestError{Message: "Blob data size exceeds limit."}
)

Functions

func ActivityTypePtr

func ActivityTypePtr(v s.ActivityType) *s.ActivityType

ActivityTypePtr makes a copy and returns the pointer to an ActivityType.

func AddSecondsToBaseTime

func AddSecondsToBaseTime(baseTimeInNanoSec int64, durationInSeconds int64) int64

AddSecondsToBaseTime returns the UnixNano timestamp obtained by adding the given duration (in seconds) to the base time (in nanoseconds).
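As an illustration of the unit handling, here is a minimal sketch with the same signature. `addSecondsToBaseTime` is a local stand-in written for this example; the package's own implementation may differ in detail.

```go
package main

import (
	"fmt"
	"time"
)

// addSecondsToBaseTime is a hypothetical stand-in for AddSecondsToBaseTime.
// It treats the base as Unix nanoseconds, adds the duration given in
// seconds, and returns the result as Unix nanoseconds.
func addSecondsToBaseTime(baseTimeInNanoSec, durationInSeconds int64) int64 {
	timeout := time.Duration(durationInSeconds) * time.Second
	return time.Unix(0, baseTimeInNanoSec).Add(timeout).UnixNano()
}

func main() {
	base := int64(1000000000) // one second after the Unix epoch, in nanoseconds
	fmt.Println(addSecondsToBaseTime(base, 5)) // 6000000000
}
```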

func AggregateYarpcOptions added in v0.3.5

func AggregateYarpcOptions(ctx context.Context, opts ...yarpc.CallOption) []yarpc.CallOption

AggregateYarpcOptions aggregates the header information from the context into the existing yarpc call options

func ArchivalStatusPtr added in v0.5.0

func ArchivalStatusPtr(t s.ArchivalStatus) *s.ArchivalStatus

ArchivalStatusPtr makes a copy and returns the pointer to an ArchivalStatus.

func AwaitWaitGroup

func AwaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool

AwaitWaitGroup calls Wait on the given wait group. It returns true if the Wait() call returned before the timeout, and false if it did not.

func BoolDefault added in v0.3.2

func BoolDefault(v *bool) bool

BoolDefault returns the pointed-to value if the bool pointer is set; otherwise it returns the default value of bool.

func BoolPtr

func BoolPtr(v bool) *bool

BoolPtr makes a copy and returns the pointer to a bool.
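The `Ptr` and `Default` helpers in this package follow the same pattern for every type. A minimal sketch for bool, using local stand-ins (`boolPtr`, `boolDefault`) rather than the package's functions:

```go
package main

import "fmt"

// boolPtr is a hypothetical stand-in for BoolPtr: the argument is passed
// by value, so the returned pointer refers to a copy.
func boolPtr(v bool) *bool { return &v }

// boolDefault is a hypothetical stand-in for BoolDefault: a nil pointer
// yields the zero value of bool.
func boolDefault(v *bool) bool {
	if v == nil {
		return false
	}
	return *v
}

func main() {
	original := true
	p := boolPtr(original)
	*p = false // changes the copy only
	fmt.Println(original, boolDefault(p), boolDefault(nil)) // true false false
}
```

Helpers like these are convenient when filling optional (pointer-typed) fields of generated structs and when reading them back without nil checks at every call site.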

func CancelExternalWorkflowExecutionFailedCausePtr added in v0.3.2

func CancelExternalWorkflowExecutionFailedCausePtr(t s.CancelExternalWorkflowExecutionFailedCause) *s.CancelExternalWorkflowExecutionFailedCause

CancelExternalWorkflowExecutionFailedCausePtr makes a copy and returns the pointer to a CancelExternalWorkflowExecutionFailedCause.

func CheckEventBlobSizeLimit added in v0.5.0

func CheckEventBlobSizeLimit(actualSize, warnLimit, errorLimit int, domainID, workflowID, runID string, metricsClient metrics.Client, scope int, logger bark.Logger) error

CheckEventBlobSizeLimit checks whether blob data exceeds the size limits. It logs a warning if the size exceeds warnLimit, and returns ErrBlobSizeExceedsLimit if it exceeds errorLimit.
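The two-threshold behaviour can be sketched as follows. This is a simplified illustration: `checkBlobSize` takes a plain `warn` callback instead of the metrics client and logger, and `errBlobSizeExceedsLimit` is a local error value; both are assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// errBlobSizeExceedsLimit is a local stand-in for ErrBlobSizeExceedsLimit.
var errBlobSizeExceedsLimit = errors.New("blob data size exceeds limit")

// checkBlobSize is a hypothetical, simplified version of the check:
// it warns above warnLimit and returns an error above errorLimit.
func checkBlobSize(actualSize, warnLimit, errorLimit int, warn func(msg string)) error {
	if actualSize > warnLimit {
		warn(fmt.Sprintf("blob size %d exceeds warn limit %d", actualSize, warnLimit))
	}
	if actualSize > errorLimit {
		return errBlobSizeExceedsLimit
	}
	return nil
}

func main() {
	warnings := 0
	warn := func(msg string) { warnings++ }

	fmt.Println(checkBlobSize(100, 256, 1024, warn))  // <nil>
	fmt.Println(checkBlobSize(512, 256, 1024, warn))  // <nil>, after one warning
	fmt.Println(checkBlobSize(2048, 256, 1024, warn)) // blob data size exceeds limit
	fmt.Println(warnings)                             // 2
}
```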

func ChildPolicyPtr added in v0.3.2

func ChildPolicyPtr(t s.ChildPolicy) *s.ChildPolicy

ChildPolicyPtr makes a copy and returns the pointer to a ChildPolicy.

func ChildWorkflowExecutionFailedCausePtr added in v0.3.2

func ChildWorkflowExecutionFailedCausePtr(t s.ChildWorkflowExecutionFailedCause) *s.ChildWorkflowExecutionFailedCause

ChildWorkflowExecutionFailedCausePtr makes a copy and returns the pointer to a ChildWorkflowExecutionFailedCause.

func CreateAdminServiceRetryPolicy added in v0.5.0

func CreateAdminServiceRetryPolicy() backoff.RetryPolicy

CreateAdminServiceRetryPolicy creates a retry policy for calls to admin service

func CreateBlobstoreClientRetryPolicy added in v0.5.2

func CreateBlobstoreClientRetryPolicy() backoff.RetryPolicy

CreateBlobstoreClientRetryPolicy creates a retry policy for blobstore client

func CreateFrontendServiceRetryPolicy added in v0.3.11

func CreateFrontendServiceRetryPolicy() backoff.RetryPolicy

CreateFrontendServiceRetryPolicy creates a retry policy for calls to frontend service

func CreateHistoryServiceRetryPolicy

func CreateHistoryServiceRetryPolicy() backoff.RetryPolicy

CreateHistoryServiceRetryPolicy creates a retry policy for calls to history service

func CreateHistoryStartWorkflowRequest added in v0.4.0

func CreateHistoryStartWorkflowRequest(domainID string, startRequest *workflow.StartWorkflowExecutionRequest) *h.StartWorkflowExecutionRequest

CreateHistoryStartWorkflowRequest creates a start workflow request for history

func CreateKafkaOperationRetryPolicy added in v0.5.2

func CreateKafkaOperationRetryPolicy() backoff.RetryPolicy

CreateKafkaOperationRetryPolicy creates a retry policy for kafka operation

func CreateMatchingPollForDecisionTaskResponse added in v0.3.12

func CreateMatchingPollForDecisionTaskResponse(historyResponse *h.RecordDecisionTaskStartedResponse, workflowExecution *workflow.WorkflowExecution, token []byte) *m.PollForDecisionTaskResponse

CreateMatchingPollForDecisionTaskResponse creates a response for matching's PollForDecisionTask

func CreateMatchingServiceRetryPolicy added in v0.5.0

func CreateMatchingServiceRetryPolicy() backoff.RetryPolicy

CreateMatchingServiceRetryPolicy creates a retry policy for calls to matching service

func CreatePersistanceRetryPolicy

func CreatePersistanceRetryPolicy() backoff.RetryPolicy

CreatePersistanceRetryPolicy creates a retry policy for persistence layer operations

func CreatePublicClientRetryPolicy added in v0.5.2

func CreatePublicClientRetryPolicy() backoff.RetryPolicy

CreatePublicClientRetryPolicy creates a retry policy for calls to frontend service

func DecisionTaskFailedCausePtr added in v0.3.2

func DecisionTaskFailedCausePtr(t s.DecisionTaskFailedCause) *s.DecisionTaskFailedCause

DecisionTaskFailedCausePtr makes a copy and returns the pointer to a DecisionTaskFailedCause.

func DecisionTypePtr

func DecisionTypePtr(t s.DecisionType) *s.DecisionType

DecisionTypePtr makes a copy and returns the pointer to a DecisionType.

func EventTypePtr

func EventTypePtr(t s.EventType) *s.EventType

EventTypePtr makes a copy and returns the pointer to an EventType.

func Float64Ptr

func Float64Ptr(v float64) *float64

Float64Ptr makes a copy and returns the pointer to a float64.

func GenerateRandomString added in v0.3.12

func GenerateRandomString(n int) string

GenerateRandomString is used to generate test strings

func Int32Default added in v0.3.2

func Int32Default(v *int32) int32

Int32Default returns the pointed-to value if the int32 pointer is set; otherwise it returns the default value of int32.

func Int32Ptr

func Int32Ptr(v int32) *int32

Int32Ptr makes a copy and returns the pointer to an int32.

func Int64Default added in v0.3.2

func Int64Default(v *int64) int64

Int64Default returns the pointed-to value if the int64 pointer is set; otherwise it returns the default value of int64.

func Int64Ptr

func Int64Ptr(v int64) *int64

Int64Ptr makes a copy and returns the pointer to an int64.

func IntPtr

func IntPtr(v int) *int

IntPtr makes a copy and returns the pointer to an int.

func IsBlobstoreNonRetryableError added in v0.5.2

func IsBlobstoreNonRetryableError(err error) bool

IsBlobstoreNonRetryableError checks if the error is a non retryable error.

func IsBlobstoreTransientError added in v0.5.2

func IsBlobstoreTransientError(err error) bool

IsBlobstoreTransientError checks if the error is a retryable error.

func IsKafkaTransientError added in v0.5.2

func IsKafkaTransientError(err error) bool

IsKafkaTransientError checks if the error is a transient kafka error

func IsPersistenceTransientError

func IsPersistenceTransientError(err error) bool

IsPersistenceTransientError checks if the error is a transient persistence error

func IsServiceNonRetryableError

func IsServiceNonRetryableError(err error) bool

IsServiceNonRetryableError checks if the error is a non retryable error.

func IsServiceTransientError added in v0.3.11

func IsServiceTransientError(err error) bool

IsServiceTransientError checks if the error is a retryable error.

func IsValidContext

func IsValidContext(ctx context.Context) error

IsValidContext checks that the thrift context is not expired or cancelled. It returns nil if the context is still valid; otherwise, it returns the result of ctx.Err().

func IsWhitelistServiceTransientError added in v0.3.14

func IsWhitelistServiceTransientError(err error) bool

IsWhitelistServiceTransientError checks if the error is a transient error.

func MinInt32 added in v0.3.14

func MinInt32(a, b int32) int32

MinInt32 returns the smaller of two int32 inputs

func PrettyPrintHistory

func PrettyPrintHistory(history *workflow.History, logger bark.Logger)

PrettyPrintHistory prints history in human readable format

func SignalExternalWorkflowExecutionFailedCausePtr added in v0.3.6

func SignalExternalWorkflowExecutionFailedCausePtr(t s.SignalExternalWorkflowExecutionFailedCause) *s.SignalExternalWorkflowExecutionFailedCause

SignalExternalWorkflowExecutionFailedCausePtr makes a copy and returns the pointer to a SignalExternalWorkflowExecutionFailedCause.

func StringDefault added in v0.3.2

func StringDefault(v *string) string

StringDefault returns the pointed-to value if the string pointer is set; otherwise it returns the default value of string.

func StringPtr

func StringPtr(v string) *string

StringPtr makes a copy and returns the pointer to a string.

func TDeserialize

func TDeserialize(msg thrift.TStruct, b []byte) (err error)

TDeserialize is used to deserialize []byte to thrift TStruct

func TDeserializeString

func TDeserializeString(msg thrift.TStruct, s string) (err error)

TDeserializeString is used to deserialize string to thrift TStruct

func TListDeserialize

func TListDeserialize(msgType reflect.Type, b []byte) (msgs []thrift.TStruct, err error)

TListDeserialize is used to deserialize []byte to list of thrift TStruct

func TListSerialize

func TListSerialize(msgs []thrift.TStruct) (b []byte, err error)

TListSerialize is used to serialize list of thrift TStruct to []byte

func TSerialize

func TSerialize(msg thrift.TStruct) (b []byte, err error)

TSerialize is used to serialize thrift TStruct to []byte

func TSerializeString

func TSerializeString(msg thrift.TStruct) (s string, err error)

TSerializeString is used to serialize thrift TStruct to string

func TaskListKindPtr added in v0.3.6

func TaskListKindPtr(t s.TaskListKind) *s.TaskListKind

TaskListKindPtr makes a copy and returns the pointer to a TaskListKind.

func TaskListPtr

func TaskListPtr(v s.TaskList) *s.TaskList

TaskListPtr makes a copy and returns the pointer to a TaskList.

func TaskListTypePtr added in v0.4.0

func TaskListTypePtr(t s.TaskListType) *s.TaskListType

TaskListTypePtr makes a copy and returns the pointer to a TaskListType.

func TimeoutTypePtr added in v0.3.2

func TimeoutTypePtr(t s.TimeoutType) *s.TimeoutType

TimeoutTypePtr makes a copy and returns the pointer to a TimeoutType.

func Uint32Ptr

func Uint32Ptr(v uint32) *uint32

Uint32Ptr makes a copy and returns the pointer to a uint32.

func Uint64Ptr

func Uint64Ptr(v uint64) *uint64

Uint64Ptr makes a copy and returns the pointer to a uint64.

func ValidateCronSchedule added in v0.5.0

func ValidateCronSchedule(cronSchedule string) error

ValidateCronSchedule validates a cron schedule spec

func ValidateRetryPolicy added in v0.4.0

func ValidateRetryPolicy(policy *workflow.RetryPolicy) error

ValidateRetryPolicy validates a retry policy

func WorkflowIDToHistoryShard

func WorkflowIDToHistoryShard(workflowID string, numberOfShards int) int

WorkflowIDToHistoryShard is used to map workflowID to a shardID
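Mapping an ID to a shard is typically done by hashing the ID and taking the result modulo the shard count. The sketch below illustrates the idea; the choice of FNV-1a is an assumption made for this example (the documentation above does not say which hash the package uses), and `workflowIDToShard` is a local stand-in.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// workflowIDToShard is a hypothetical stand-in for WorkflowIDToHistoryShard.
// FNV-1a is used here only for illustration.
func workflowIDToShard(workflowID string, numberOfShards int) int {
	h := fnv.New32a()
	h.Write([]byte(workflowID)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(numberOfShards))
}

func main() {
	const shards = 16
	a := workflowIDToShard("order-12345", shards)
	b := workflowIDToShard("order-12345", shards)
	// The mapping is deterministic and always lands in [0, shards).
	fmt.Println(a == b, a >= 0 && a < shards) // true true
}
```

Because the mapping depends on the shard count, changing `numberOfShards` generally moves existing workflow IDs to different shards.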

func WorkflowTypePtr

func WorkflowTypePtr(t s.WorkflowType) *s.WorkflowType

WorkflowTypePtr makes a copy and returns the pointer to a WorkflowType.

Types

type ClientCache added in v0.5.0

type ClientCache interface {
	GetClientForKey(key string) (interface{}, error)
	GetClientForClientKey(clientKey string) (interface{}, error)
}

ClientCache stores initialized clients

func NewClientCache added in v0.5.0

func NewClientCache(
	keyResolver keyResolver,
	clientProvider clientProvider,
) ClientCache

NewClientCache creates a new client cache based on membership

type Daemon

type Daemon interface {
	Start()
	Stop()
}

Daemon is the base interface implemented by background tasks within cherami

type EncodingType

type EncodingType string

EncodingType is an enum that represents various data encoding types

type EventTimeSource added in v0.3.14

type EventTimeSource struct {
	// contains filtered or unexported fields
}

EventTimeSource serves fake controlled time

func NewEventTimeSource added in v0.3.14

func NewEventTimeSource() *EventTimeSource

NewEventTimeSource returns a time source that serves fake controlled time

func (*EventTimeSource) Now added in v0.3.14

func (ts *EventTimeSource) Now() time.Time

Now returns the fake current time

func (*EventTimeSource) Update added in v0.3.14

func (ts *EventTimeSource) Update(now time.Time) *EventTimeSource

Update updates the fake current time

type PProfInitializer added in v0.3.5

type PProfInitializer interface {
	Start() error
}

PProfInitializer initializes pprof based on config

type PriorityTokenBucket added in v0.4.0

type PriorityTokenBucket interface {
	// GetToken attempts to take count tokens from the
	// bucket with that priority. Priority 0 is highest.
	// Returns true on success, false
	// otherwise along with the duration for the next refill
	GetToken(priority, count int) (bool, time.Duration)
}

PriorityTokenBucket is the interface for rate limiter with priority

func NewFullPriorityTokenBucket added in v0.4.0

func NewFullPriorityTokenBucket(numOfPriority, rps int, timeSource TimeSource) PriorityTokenBucket

NewFullPriorityTokenBucket creates and returns a new priority token bucket in which every bucket starts full. Because all buckets start full, early requests for low-priority tokens are not rejected, but this may cause an initial burst.

func NewPriorityTokenBucket added in v0.4.0

func NewPriorityTokenBucket(numOfPriority, rps int, timeSource TimeSource) PriorityTokenBucket

NewPriorityTokenBucket creates and returns a new token bucket rate limiter that supports priorities. There are n buckets for n priorities. It replenishes the top-priority bucket every 100 milliseconds, and unused tokens flow to the next bucket. The idea comes from Dual Token Bucket Algorithms. Thread safe.

@param numOfPriority: number of priorities

@param rps: desired rate per second
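The "unused tokens flow to the next bucket" idea can be sketched as below. This is a deliberately simplified, single-goroutine illustration with an explicit `refill` call instead of a 100 millisecond timer; the type `priorityBucket`, its methods, and the exact bookkeeping are assumptions for the example and may differ from the package's implementation.

```go
package main

import "fmt"

// priorityBucket is a hypothetical sketch: tokens[0] belongs to the
// highest priority, tokens[len-1] to the lowest.
type priorityBucket struct {
	tokens      []int
	perInterval int // tokens granted to the top bucket on each refill
}

func newPriorityBucket(numOfPriority, perInterval int) *priorityBucket {
	return &priorityBucket{tokens: make([]int, numOfPriority), perInterval: perInterval}
}

// refill gives the top bucket a fresh allowance. Whatever a bucket did not
// use in the previous interval moves one level down; leftovers from the
// lowest bucket are dropped.
func (b *priorityBucket) refill() {
	carry := b.perInterval
	for i := range b.tokens {
		b.tokens[i], carry = carry, b.tokens[i]
	}
}

// getToken takes count tokens from the bucket of the given priority.
func (b *priorityBucket) getToken(priority, count int) bool {
	if priority < 0 || priority >= len(b.tokens) || b.tokens[priority] < count {
		return false
	}
	b.tokens[priority] -= count
	return true
}

func main() {
	b := newPriorityBucket(2, 10)
	b.refill()                      // buckets: [10 0]
	fmt.Println(b.getToken(0, 4))   // true, buckets: [6 0]
	fmt.Println(b.getToken(1, 1))   // false, low priority has nothing yet
	b.refill()                      // 6 unused tokens flow down: [10 6]
	fmt.Println(b.getToken(1, 5))   // true, buckets: [10 1]
	fmt.Println(b.getToken(1, 2))   // false
}
```

In this sketch low-priority callers only get capacity that higher priorities left unused, which is the property the description above is after.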

type QueryTaskToken added in v0.3.2

type QueryTaskToken struct {
	DomainID string `json:"domainId"`
	TaskList string `json:"taskList"`
	TaskID   string `json:"taskId"`
}

QueryTaskToken identifies a query task

type RPCFactory added in v0.3.2

type RPCFactory interface {
	CreateDispatcher() *yarpc.Dispatcher
	CreateDispatcherForOutbound(callerName, serviceName, hostName string) *yarpc.Dispatcher
}

RPCFactory creates dispatchers that know how to transport requests.

type RealTimeSource added in v0.3.12

type RealTimeSource struct{}

RealTimeSource serves real wall-clock time

func NewRealTimeSource

func NewRealTimeSource() *RealTimeSource

NewRealTimeSource returns a time source that serves real wall-clock time

func (*RealTimeSource) Now added in v0.3.12

func (ts *RealTimeSource) Now() time.Time

Now returns the real current time

type TaskToken

type TaskToken struct {
	DomainID        string `json:"domainId"`
	WorkflowID      string `json:"workflowId"`
	RunID           string `json:"runId"`
	ScheduleID      int64  `json:"scheduleId"`
	ScheduleAttempt int64  `json:"scheduleAttempt"`
	ActivityID      string `json:"activityId"`
}

TaskToken identifies a task

type TaskTokenSerializer

type TaskTokenSerializer interface {
	Serialize(token *TaskToken) ([]byte, error)
	Deserialize(data []byte) (*TaskToken, error)
	SerializeQueryTaskToken(token *QueryTaskToken) ([]byte, error)
	DeserializeQueryTaskToken(data []byte) (*QueryTaskToken, error)
}

TaskTokenSerializer serializes task tokens

func NewJSONTaskTokenSerializer

func NewJSONTaskTokenSerializer() TaskTokenSerializer

NewJSONTaskTokenSerializer creates a new instance of TaskTokenSerializer
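A JSON-based serializer for the `TaskToken` struct can be sketched with `encoding/json`. The local `taskToken` type mirrors the fields and JSON tags shown above; `serialize` and `deserialize` are stand-ins written for this example, not the package's methods.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// taskToken mirrors the documented TaskToken fields and JSON tags.
type taskToken struct {
	DomainID        string `json:"domainId"`
	WorkflowID      string `json:"workflowId"`
	RunID           string `json:"runId"`
	ScheduleID      int64  `json:"scheduleId"`
	ScheduleAttempt int64  `json:"scheduleAttempt"`
	ActivityID      string `json:"activityId"`
}

// serialize is a hypothetical counterpart of TaskTokenSerializer.Serialize.
func serialize(token *taskToken) ([]byte, error) {
	return json.Marshal(token)
}

// deserialize is a hypothetical counterpart of TaskTokenSerializer.Deserialize.
func deserialize(data []byte) (*taskToken, error) {
	var token taskToken
	if err := json.Unmarshal(data, &token); err != nil {
		return nil, err
	}
	return &token, nil
}

func main() {
	data, err := serialize(&taskToken{DomainID: "d1", WorkflowID: "wf", RunID: "run", ScheduleID: 7})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))

	back, err := deserialize(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(back.WorkflowID, back.ScheduleID) // wf 7
}
```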

type TimeSource

type TimeSource interface {
	Now() time.Time
}

TimeSource is an interface for any entity that provides the current time. It is primarily used to mock out time sources in unit tests

type TokenBucket

type TokenBucket interface {
	// TryConsume attempts to take count tokens from the
	// bucket. Returns true on success, false
	// otherwise along with the duration for the next refill
	TryConsume(count int) (bool, time.Duration)
	// Consume waits up to timeout duration to take count
	// tokens from the bucket. Returns true if count
	// tokens were acquired before timeout, false
	// otherwise
	Consume(count int, timeout time.Duration) bool
}

TokenBucket is the interface for any implementation of a token bucket rate limiter

func NewTokenBucket

func NewTokenBucket(rps int, timeSource TimeSource) TokenBucket

NewTokenBucket creates and returns a new token bucket rate limiter that replenishes the bucket every 100 milliseconds. Thread safe.

@param rps: desired rate per second

Golang.org has an alternative implementation of the rate limiter. On benchmarking, golang's implementation was noticeably slower (roughly 1.6x to 4x in the runs below). In addition, it does a lot more than what we need. These are the benchmarks under different scenarios:

	BenchmarkTokenBucketParallel      50000000    40.7 ns/op
	BenchmarkGolangRateParallel       10000000    150 ns/op
	BenchmarkTokenBucketParallel-8    20000000    124 ns/op
	BenchmarkGolangRateParallel-8     10000000    208 ns/op

	BenchmarkTokenBucketParallel      50000000    37.8 ns/op
	BenchmarkGolangRateParallel       10000000    153 ns/op
	BenchmarkTokenBucketParallel-8    10000000    129 ns/op
	BenchmarkGolangRateParallel-8     10000000    208 ns/op

type TokenBucketFactory

type TokenBucketFactory interface {
	CreateTokenBucket(rps int, timeSource TimeSource) TokenBucket
}

TokenBucketFactory is an interface mainly used for injecting mock implementation of TokenBucket for unit testing

func NewTokenBucketFactory

func NewTokenBucketFactory() TokenBucketFactory

NewTokenBucketFactory creates an instance of factory used for creating TokenBucket instances
