Documentation ¶
Index ¶
- Constants
- Variables
- func ActivityTypePtr(v s.ActivityType) *s.ActivityType
- func AddSecondsToBaseTime(baseTimeInNanoSec int64, durationInSeconds int64) int64
- func AggregateYarpcOptions(ctx context.Context, opts ...yarpc.CallOption) []yarpc.CallOption
- func ArchivalStatusPtr(t s.ArchivalStatus) *s.ArchivalStatus
- func AwaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool
- func BoolDefault(v *bool) bool
- func BoolPtr(v bool) *bool
- func CancelExternalWorkflowExecutionFailedCausePtr(t s.CancelExternalWorkflowExecutionFailedCause) *s.CancelExternalWorkflowExecutionFailedCause
- func CheckEventBlobSizeLimit(actualSize, warnLimit, errorLimit int, domainID, workflowID, runID string, ...) error
- func ChildPolicyPtr(t s.ChildPolicy) *s.ChildPolicy
- func ChildWorkflowExecutionFailedCausePtr(t s.ChildWorkflowExecutionFailedCause) *s.ChildWorkflowExecutionFailedCause
- func CreateAdminServiceRetryPolicy() backoff.RetryPolicy
- func CreateBlobstoreClientRetryPolicy() backoff.RetryPolicy
- func CreateFrontendServiceRetryPolicy() backoff.RetryPolicy
- func CreateHistoryServiceRetryPolicy() backoff.RetryPolicy
- func CreateHistoryStartWorkflowRequest(domainID string, startRequest *workflow.StartWorkflowExecutionRequest) *h.StartWorkflowExecutionRequest
- func CreateKafkaOperationRetryPolicy() backoff.RetryPolicy
- func CreateMatchingPollForDecisionTaskResponse(historyResponse *h.RecordDecisionTaskStartedResponse, ...) *m.PollForDecisionTaskResponse
- func CreateMatchingServiceRetryPolicy() backoff.RetryPolicy
- func CreatePersistanceRetryPolicy() backoff.RetryPolicy
- func CreatePublicClientRetryPolicy() backoff.RetryPolicy
- func DecisionTaskFailedCausePtr(t s.DecisionTaskFailedCause) *s.DecisionTaskFailedCause
- func DecisionTypePtr(t s.DecisionType) *s.DecisionType
- func EventTypePtr(t s.EventType) *s.EventType
- func Float64Ptr(v float64) *float64
- func GenerateRandomString(n int) string
- func Int32Default(v *int32) int32
- func Int32Ptr(v int32) *int32
- func Int64Default(v *int64) int64
- func Int64Ptr(v int64) *int64
- func IntPtr(v int) *int
- func IsBlobstoreNonRetryableError(err error) bool
- func IsBlobstoreTransientError(err error) bool
- func IsKafkaTransientError(err error) bool
- func IsPersistenceTransientError(err error) bool
- func IsServiceNonRetryableError(err error) bool
- func IsServiceTransientError(err error) bool
- func IsValidContext(ctx context.Context) error
- func IsWhitelistServiceTransientError(err error) bool
- func MinInt32(a, b int32) int32
- func PrettyPrintHistory(history *workflow.History, logger bark.Logger)
- func SignalExternalWorkflowExecutionFailedCausePtr(t s.SignalExternalWorkflowExecutionFailedCause) *s.SignalExternalWorkflowExecutionFailedCause
- func StringDefault(v *string) string
- func StringPtr(v string) *string
- func TDeserialize(msg thrift.TStruct, b []byte) (err error)
- func TDeserializeString(msg thrift.TStruct, s string) (err error)
- func TListDeserialize(msgType reflect.Type, b []byte) (msgs []thrift.TStruct, err error)
- func TListSerialize(msgs []thrift.TStruct) (b []byte, err error)
- func TSerialize(msg thrift.TStruct) (b []byte, err error)
- func TSerializeString(msg thrift.TStruct) (s string, err error)
- func TaskListKindPtr(t s.TaskListKind) *s.TaskListKind
- func TaskListPtr(v s.TaskList) *s.TaskList
- func TaskListTypePtr(t s.TaskListType) *s.TaskListType
- func TimeoutTypePtr(t s.TimeoutType) *s.TimeoutType
- func Uint32Ptr(v uint32) *uint32
- func Uint64Ptr(v uint64) *uint64
- func ValidateCronSchedule(cronSchedule string) error
- func ValidateRetryPolicy(policy *workflow.RetryPolicy) error
- func WorkflowIDToHistoryShard(workflowID string, numberOfShards int) int
- func WorkflowTypePtr(t s.WorkflowType) *s.WorkflowType
- type ClientCache
- type Daemon
- type EncodingType
- type EventTimeSource
- type PProfInitializer
- type PriorityTokenBucket
- type QueryTaskToken
- type RPCFactory
- type RealTimeSource
- type TaskToken
- type TaskTokenSerializer
- type TimeSource
- type TokenBucket
- type TokenBucketFactory
Constants ¶
const ( // FirstEventID is the id of the first event in the history FirstEventID int64 = 1 // EmptyEventID is the id of the empty event EmptyEventID int64 = -23 // EmptyVersion is used as the default value for failover version when no value is provided EmptyVersion int64 = -24 // EndEventID is the id of the end event, here we use the int64 max EndEventID int64 = 1<<63 - 1 // BufferedEventID is the id of the buffered event BufferedEventID int64 = -123 // EmptyEventTaskID is uninitialized id of the task id within event EmptyEventTaskID int64 = -1234 // TransientEventID is the id of the transient event TransientEventID int64 = -124 // FirstBlobPageToken is the page token identifying the first blob for each history archival FirstBlobPageToken = 1 // LastBlobNextPageToken is the next page token on the last blob for each history archival LastBlobNextPageToken = -1 )
const ( // FrontendServiceName is the name of the frontend service FrontendServiceName = "cadence-frontend" // HistoryServiceName is the name of the history service HistoryServiceName = "cadence-history" // MatchingServiceName is the name of the matching service MatchingServiceName = "cadence-matching" // WorkerServiceName is the name of the worker service WorkerServiceName = "cadence-worker" )
const ( EncodingTypeJSON EncodingType = "json" EncodingTypeThriftRW = "thriftrw" EncodingTypeGob = "gob" EncodingTypeUnknown = "unknow" )
Data encoding types
const ( // GetHistoryWarnSizeLimit is the threshold for emitting warn log GetHistoryWarnSizeLimit = 500 * 1024 // Warn when size goes over 500KB // GetHistoryMaxPageSize is the max page size for get history GetHistoryMaxPageSize = 1000 )
const ( // DaemonStatusInitialized coroutine pool initialized DaemonStatusInitialized int32 = 0 // DaemonStatusStarted coroutine pool started DaemonStatusStarted int32 = 1 // DaemonStatusStopped coroutine pool stopped DaemonStatusStopped int32 = 2 )
const ( // LibraryVersionHeaderName refers to the name of the // tchannel / http header that contains the client // library version LibraryVersionHeaderName = "cadence-client-library-version" // FeatureVersionHeaderName refers to the name of the // tchannel / http header that contains the client // feature version // the feature version sent from client represents the // feature set of the cadence client library support. // This can be used for client capibility check, on // Cadence server, for backward compatibility FeatureVersionHeaderName = "cadence-client-feature-version" // ClientImplHeaderName refers to the name of the // header that contains the client implementation ClientImplHeaderName = "cadence-client-name" )
const ( // FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit FailureReasonCompleteResultExceedsLimit = "COMPLETE_RESULT_EXCEEDS_LIMIT" // FailureReasonFailureDetailsExceedsLimit is failureReason for failure details exceeds limit FailureReasonFailureDetailsExceedsLimit = "FAILURE_DETAILS_EXCEEDS_LIMIT" // FailureReasonCancelDetailsExceedsLimit is failureReason for cancel details exceeds limit FailureReasonCancelDetailsExceedsLimit = "CANCEL_DETAILS_EXCEEDS_LIMIT" //FailureReasonHeartbeatExceedsLimit is failureReason for heartbeat exceeds limit FailureReasonHeartbeatExceedsLimit = "HEARTBEAT_EXCEEDS_LIMIT" // FailureReasonDecisionBlobSizeExceedsLimit is the failureReason for decision blob exceeds size limit FailureReasonDecisionBlobSizeExceedsLimit = "DECISION_BLOB_SIZE_EXCEEDS_LIMIT" // TerminateReasonSizeExceedsLimit is reason to terminate workflow when history size exceed limit TerminateReasonSizeExceedsLimit = "HISTORY_SIZE_EXCEEDS_LIMIT" )
const MaxTaskTimeout = 31622400
MaxTaskTimeout is maximum task timeout allowed. 366 days in seconds
const NoRetryBackoff = time.Duration(-1)
NoRetryBackoff is used to represent backoff when no retry is needed
const (
// VisibilityAppName is used to find kafka topics and ES indexName for visibility
VisibilityAppName = "visibility"
)
Variables ¶
var ( // ErrBlobSizeExceedsLimit is error for event blob size exceeds limit ErrBlobSizeExceedsLimit = &workflow.BadRequestError{Message: "Blob data size exceeds limit."} )
Functions ¶
func ActivityTypePtr ¶
func ActivityTypePtr(v s.ActivityType) *s.ActivityType
ActivityTypePtr makes a copy and returns the pointer to a ActivityType.
func AddSecondsToBaseTime ¶
AddSecondsToBaseTime - Gets the UnixNano with given duration and base time.
func AggregateYarpcOptions ¶ added in v0.3.5
func AggregateYarpcOptions(ctx context.Context, opts ...yarpc.CallOption) []yarpc.CallOption
AggregateYarpcOptions aggregate the header information from context to existing yarpc call options
func ArchivalStatusPtr ¶ added in v0.5.0
func ArchivalStatusPtr(t s.ArchivalStatus) *s.ArchivalStatus
ArchivalStatusPtr makes a copy and returns the pointer to an ArchivalStatus.
func AwaitWaitGroup ¶
AwaitWaitGroup calls Wait on the given wait Returns true if the Wait() call succeeded before the timeout Returns false if the Wait() did not return before the timeout
func BoolDefault ¶ added in v0.3.2
BoolDefault returns value if bool pointer is set otherwise default value of bool
func CancelExternalWorkflowExecutionFailedCausePtr ¶ added in v0.3.2
func CancelExternalWorkflowExecutionFailedCausePtr(t s.CancelExternalWorkflowExecutionFailedCause) *s.CancelExternalWorkflowExecutionFailedCause
CancelExternalWorkflowExecutionFailedCausePtr makes a copy and returns the pointer to a CancelExternalWorkflowExecutionFailedCause.
func CheckEventBlobSizeLimit ¶ added in v0.5.0
func CheckEventBlobSizeLimit(actualSize, warnLimit, errorLimit int, domainID, workflowID, runID string, metricsClient metrics.Client, scope int, logger bark.Logger) error
CheckEventBlobSizeLimit checks if a blob data exceeds limits. It logs a warning if it exceeds warnLimit, and return ErrBlobSizeExceedsLimit if it exceeds errorLimit.
func ChildPolicyPtr ¶ added in v0.3.2
func ChildPolicyPtr(t s.ChildPolicy) *s.ChildPolicy
ChildPolicyPtr makes a copy and returns the pointer to a ChildPolicy.
func ChildWorkflowExecutionFailedCausePtr ¶ added in v0.3.2
func ChildWorkflowExecutionFailedCausePtr(t s.ChildWorkflowExecutionFailedCause) *s.ChildWorkflowExecutionFailedCause
ChildWorkflowExecutionFailedCausePtr makes a copy and returns the pointer to a ChildWorkflowExecutionFailedCause.
func CreateAdminServiceRetryPolicy ¶ added in v0.5.0
func CreateAdminServiceRetryPolicy() backoff.RetryPolicy
CreateAdminServiceRetryPolicy creates a retry policy for calls to matching service
func CreateBlobstoreClientRetryPolicy ¶ added in v0.5.2
func CreateBlobstoreClientRetryPolicy() backoff.RetryPolicy
CreateBlobstoreClientRetryPolicy creates a retry policy for blobstore client
func CreateFrontendServiceRetryPolicy ¶ added in v0.3.11
func CreateFrontendServiceRetryPolicy() backoff.RetryPolicy
CreateFrontendServiceRetryPolicy creates a retry policy for calls to frontend service
func CreateHistoryServiceRetryPolicy ¶
func CreateHistoryServiceRetryPolicy() backoff.RetryPolicy
CreateHistoryServiceRetryPolicy creates a retry policy for calls to history service
func CreateHistoryStartWorkflowRequest ¶ added in v0.4.0
func CreateHistoryStartWorkflowRequest(domainID string, startRequest *workflow.StartWorkflowExecutionRequest) *h.StartWorkflowExecutionRequest
CreateHistoryStartWorkflowRequest create a start workflow request for history
func CreateKafkaOperationRetryPolicy ¶ added in v0.5.2
func CreateKafkaOperationRetryPolicy() backoff.RetryPolicy
CreateKafkaOperationRetryPolicy creates a retry policy for kafka operation
func CreateMatchingPollForDecisionTaskResponse ¶ added in v0.3.12
func CreateMatchingPollForDecisionTaskResponse(historyResponse *h.RecordDecisionTaskStartedResponse, workflowExecution *workflow.WorkflowExecution, token []byte) *m.PollForDecisionTaskResponse
CreateMatchingPollForDecisionTaskResponse create response for matching's PollForDecisionTask
func CreateMatchingServiceRetryPolicy ¶ added in v0.5.0
func CreateMatchingServiceRetryPolicy() backoff.RetryPolicy
CreateMatchingServiceRetryPolicy creates a retry policy for calls to matching service
func CreatePersistanceRetryPolicy ¶
func CreatePersistanceRetryPolicy() backoff.RetryPolicy
CreatePersistanceRetryPolicy creates a retry policy for persistence layer operations
func CreatePublicClientRetryPolicy ¶ added in v0.5.2
func CreatePublicClientRetryPolicy() backoff.RetryPolicy
CreatePublicClientRetryPolicy creates a retry policy for calls to frontend service
func DecisionTaskFailedCausePtr ¶ added in v0.3.2
func DecisionTaskFailedCausePtr(t s.DecisionTaskFailedCause) *s.DecisionTaskFailedCause
DecisionTaskFailedCausePtr makes a copy and returns the pointer to a DecisionTaskFailedCause.
func DecisionTypePtr ¶
func DecisionTypePtr(t s.DecisionType) *s.DecisionType
DecisionTypePtr makes a copy and returns the pointer to a DecisionType.
func EventTypePtr ¶
EventTypePtr makes a copy and returns the pointer to a EventType.
func Float64Ptr ¶
Float64Ptr makes a copy and returns the pointer to an int64.
func GenerateRandomString ¶ added in v0.3.12
GenerateRandomString is used for generate test string
func Int32Default ¶ added in v0.3.2
Int32Default returns value if int32 pointer is set otherwise default value of int32
func Int64Default ¶ added in v0.3.2
Int64Default returns value if int64 pointer is set otherwise default value of int64
func IsBlobstoreNonRetryableError ¶ added in v0.5.2
IsBlobstoreNonRetryableError checks if the error is a non retryable error.
func IsBlobstoreTransientError ¶ added in v0.5.2
IsBlobstoreTransientError checks if the error is a retryable error.
func IsKafkaTransientError ¶ added in v0.5.2
IsKafkaTransientError check if the error is a transient kafka error
func IsPersistenceTransientError ¶
IsPersistenceTransientError checks if the error is a transient persistence error
func IsServiceNonRetryableError ¶
IsServiceNonRetryableError checks if the error is a non retryable error.
func IsServiceTransientError ¶ added in v0.3.11
IsServiceTransientError checks if the error is a retryable error.
func IsValidContext ¶
IsValidContext checks that the thrift context is not expired on cancelled. Returns nil if the context is still valid. Otherwise, returns the result of ctx.Err()
func IsWhitelistServiceTransientError ¶ added in v0.3.14
IsWhitelistServiceTransientError checks if the error is a transient error.
func PrettyPrintHistory ¶
PrettyPrintHistory prints history in human readable format
func SignalExternalWorkflowExecutionFailedCausePtr ¶ added in v0.3.6
func SignalExternalWorkflowExecutionFailedCausePtr(t s.SignalExternalWorkflowExecutionFailedCause) *s.SignalExternalWorkflowExecutionFailedCause
SignalExternalWorkflowExecutionFailedCausePtr makes a copy and returns the pointer to a SignalExternalWorkflowExecutionFailedCause.
func StringDefault ¶ added in v0.3.2
StringDefault returns value if string pointer is set otherwise default value of string
func TDeserialize ¶
TDeserialize is used to deserialize []byte to thrift TStruct
func TDeserializeString ¶
TDeserializeString is used to deserialize string to thrift TStruct
func TListDeserialize ¶
TListDeserialize is used to deserialize []byte to list of thrift TStruct
func TListSerialize ¶
TListSerialize is used to serialize list of thrift TStruct to []byte
func TSerialize ¶
TSerialize is used to serialize thrift TStruct to []byte
func TSerializeString ¶
TSerializeString is used to serialize thrift TStruct to string
func TaskListKindPtr ¶ added in v0.3.6
func TaskListKindPtr(t s.TaskListKind) *s.TaskListKind
TaskListKindPtr makes a copy and returns the pointer to a TaskListKind.
func TaskListPtr ¶
TaskListPtr makes a copy and returns the pointer to a TaskList.
func TaskListTypePtr ¶ added in v0.4.0
func TaskListTypePtr(t s.TaskListType) *s.TaskListType
TaskListTypePtr makes a copy and returns the pointer to a TaskListKind.
func TimeoutTypePtr ¶ added in v0.3.2
func TimeoutTypePtr(t s.TimeoutType) *s.TimeoutType
TimeoutTypePtr makes a copy and returns the pointer to a TimeoutType.
func ValidateCronSchedule ¶ added in v0.5.0
ValidateCronSchedule validates a cron schedule spec
func ValidateRetryPolicy ¶ added in v0.4.0
func ValidateRetryPolicy(policy *workflow.RetryPolicy) error
ValidateRetryPolicy validates a retry policy
func WorkflowIDToHistoryShard ¶
WorkflowIDToHistoryShard is used to map workflowID to a shardID
func WorkflowTypePtr ¶
func WorkflowTypePtr(t s.WorkflowType) *s.WorkflowType
WorkflowTypePtr makes a copy and returns the pointer to a WorkflowType.
Types ¶
type ClientCache ¶ added in v0.5.0
type ClientCache interface { GetClientForKey(key string) (interface{}, error) GetClientForClientKey(clientKey string) (interface{}, error) }
ClientCache store initialized clients
func NewClientCache ¶ added in v0.5.0
func NewClientCache( keyResolver keyResolver, clientProvider clientProvider, ) ClientCache
NewClientCache creates a new client cache based on membership
type Daemon ¶
type Daemon interface { Start() Stop() }
Daemon is the base interfaces implemented by background tasks within cherami
type EncodingType ¶
type EncodingType string
EncodingType is an enum that represents various data encoding types
type EventTimeSource ¶ added in v0.3.14
type EventTimeSource struct {
// contains filtered or unexported fields
}
EventTimeSource serves fake controlled time
func NewEventTimeSource ¶ added in v0.3.14
func NewEventTimeSource() *EventTimeSource
NewEventTimeSource returns a time source that servers fake controlled time
func (*EventTimeSource) Now ¶ added in v0.3.14
func (ts *EventTimeSource) Now() time.Time
Now return the fake current time
func (*EventTimeSource) Update ¶ added in v0.3.14
func (ts *EventTimeSource) Update(now time.Time) *EventTimeSource
Update update the fake current time
type PProfInitializer ¶ added in v0.3.5
type PProfInitializer interface {
Start() error
}
PProfInitializer initialize the pprof based on config
type PriorityTokenBucket ¶ added in v0.4.0
type PriorityTokenBucket interface { // GetToken attempts to take count tokens from the // bucket with that priority. Priority 0 is highest. // Returns true on success, false // otherwise along with the duration for the next refill GetToken(priority, count int) (bool, time.Duration) }
PriorityTokenBucket is the interface for rate limiter with priority
func NewFullPriorityTokenBucket ¶ added in v0.4.0
func NewFullPriorityTokenBucket(numOfPriority, rps int, timeSource TimeSource) PriorityTokenBucket
NewFullPriorityTokenBucket creates and returns a new priority token bucket with all bucket init with full tokens. With all buckets full, get tokens from low priority buckets won't be missed initially, but may caused bursts.
func NewPriorityTokenBucket ¶ added in v0.4.0
func NewPriorityTokenBucket(numOfPriority, rps int, timeSource TimeSource) PriorityTokenBucket
NewPriorityTokenBucket creates and returns a new token bucket rate limiter support priority. There are n buckets for n priorities. It replenishes the top priority bucket every 100 milliseconds, unused tokens flows to next bucket. The idea comes from Dual Token Bucket Algorithms. Thread safe.
@param numOfPriority
Number of priorities
@param rps
Desired rate per second
type QueryTaskToken ¶ added in v0.3.2
type QueryTaskToken struct { DomainID string `json:"domainId"` TaskList string `json:"taskList"` TaskID string `json:"taskId"` }
QueryTaskToken identifies a query task
type RPCFactory ¶ added in v0.3.2
type RPCFactory interface { CreateDispatcher() *yarpc.Dispatcher CreateDispatcherForOutbound(callerName, serviceName, hostName string) *yarpc.Dispatcher }
RPCFactory Creates a dispatcher that knows how to transport requests.
type RealTimeSource ¶ added in v0.3.12
type RealTimeSource struct{}
RealTimeSource serves real wall-clock time
func NewRealTimeSource ¶
func NewRealTimeSource() *RealTimeSource
NewRealTimeSource returns a time source that servers real wall clock time
func (*RealTimeSource) Now ¶ added in v0.3.12
func (ts *RealTimeSource) Now() time.Time
Now return the real current time
type TaskToken ¶
type TaskToken struct { DomainID string `json:"domainId"` WorkflowID string `json:"workflowId"` RunID string `json:"runId"` ScheduleID int64 `json:"scheduleId"` ScheduleAttempt int64 `json:"scheduleAttempt"` ActivityID string `json:"activityId"` }
TaskToken identifies a task
type TaskTokenSerializer ¶
type TaskTokenSerializer interface { Serialize(token *TaskToken) ([]byte, error) Deserialize(data []byte) (*TaskToken, error) SerializeQueryTaskToken(token *QueryTaskToken) ([]byte, error) DeserializeQueryTaskToken(data []byte) (*QueryTaskToken, error) }
TaskTokenSerializer serializes task tokens
func NewJSONTaskTokenSerializer ¶
func NewJSONTaskTokenSerializer() TaskTokenSerializer
NewJSONTaskTokenSerializer creates a new instance of TaskTokenSerializer
type TimeSource ¶
TimeSource is an interface for any entity that provides the current time. Its primarily used to mock out timesources in unit test
type TokenBucket ¶
type TokenBucket interface { // TryConsume attempts to take count tokens from the // bucket. Returns true on success, false // otherwise along with the duration for the next refill TryConsume(count int) (bool, time.Duration) // Consume waits up to timeout duration to take count // tokens from the bucket. Returns true if count // tokens were acquired before timeout, false // otherwise Consume(count int, timeout time.Duration) bool }
TokenBucket is the interface for any implementation of a token bucket rate limiter
func NewTokenBucket ¶
func NewTokenBucket(rps int, timeSource TimeSource) TokenBucket
NewTokenBucket creates and returns a new token bucket rate limiter that replenishes the bucket every 100 milliseconds. Thread safe.
@param rps
Desired rate per second
Golang.org has an alternative implementation of the rate limiter. On benchmarking, golang's implementation was order of magnitude slower. In addition, it does a lot more than what we need. These are the benchmarks under different scenarios
BenchmarkTokenBucketParallel 50000000 40.7 ns/op BenchmarkGolangRateParallel 10000000 150 ns/op BenchmarkTokenBucketParallel-8 20000000 124 ns/op BenchmarkGolangRateParallel-8 10000000 208 ns/op BenchmarkTokenBucketParallel 50000000 37.8 ns/op BenchmarkGolangRateParallel 10000000 153 ns/op BenchmarkTokenBucketParallel-8 10000000 129 ns/op BenchmarkGolangRateParallel-8 10000000 208 ns/op
type TokenBucketFactory ¶
type TokenBucketFactory interface {
CreateTokenBucket(rps int, timeSource TimeSource) TokenBucket
}
TokenBucketFactory is an interface mainly used for injecting mock implementation of TokenBucket for unit testing
func NewTokenBucketFactory ¶
func NewTokenBucketFactory() TokenBucketFactory
NewTokenBucketFactory creates an instance of factory used for creating TokenBucket instances