Documentation ¶
Index ¶
- Constants
- Variables
- func ActivityTypePtr(v s.ActivityType) *s.ActivityType
- func AddSecondsToBaseTime(baseTimeInNanoSec int64, durationInSeconds int64) int64
- func AggregateYarpcOptions(ctx context.Context, opts ...yarpc.CallOption) []yarpc.CallOption
- func ArchivalStatusPtr(t s.ArchivalStatus) *s.ArchivalStatus
- func AwaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool
- func BoolDefault(v *bool) bool
- func BoolPtr(v bool) *bool
- func CancelExternalWorkflowExecutionFailedCausePtr(t s.CancelExternalWorkflowExecutionFailedCause) *s.CancelExternalWorkflowExecutionFailedCause
- func CheckEventBlobSizeLimit(actualSize, warnLimit, errorLimit int, domainID, workflowID, runID string, ...) error
- func ChildPolicyPtr(t s.ChildPolicy) *s.ChildPolicy
- func ChildWorkflowExecutionFailedCausePtr(t s.ChildWorkflowExecutionFailedCause) *s.ChildWorkflowExecutionFailedCause
- func ClientArchivalStatusPtr(t shared.ArchivalStatus) *shared.ArchivalStatus
- func CreateAdminServiceRetryPolicy() backoff.RetryPolicy
- func CreateFrontendServiceRetryPolicy() backoff.RetryPolicy
- func CreateHistoryServiceRetryPolicy() backoff.RetryPolicy
- func CreateHistoryStartWorkflowRequest(domainID string, startRequest *workflow.StartWorkflowExecutionRequest) *h.StartWorkflowExecutionRequest
- func CreateKafkaOperationRetryPolicy() backoff.RetryPolicy
- func CreateMatchingPollForDecisionTaskResponse(historyResponse *h.RecordDecisionTaskStartedResponse, ...) *m.PollForDecisionTaskResponse
- func CreateMatchingServiceRetryPolicy() backoff.RetryPolicy
- func CreatePersistanceRetryPolicy() backoff.RetryPolicy
- func DecisionTaskFailedCausePtr(t s.DecisionTaskFailedCause) *s.DecisionTaskFailedCause
- func DecisionTypePtr(t s.DecisionType) *s.DecisionType
- func EventTypePtr(t s.EventType) *s.EventType
- func Float64Ptr(v float64) *float64
- func GenerateRandomString(n int) string
- func GetSizeOfMapStringToByteArray(input map[string][]byte) int
- func Int16Ptr(v int16) *int16
- func Int32Default(v *int32) int32
- func Int32Ptr(v int32) *int32
- func Int64Default(v *int64) int64
- func Int64Ptr(v int64) *int64
- func IntPtr(v int) *int
- func IsKafkaTransientError(err error) bool
- func IsPersistenceTransientError(err error) bool
- func IsServiceNonRetryableError(err error) bool
- func IsServiceTransientError(err error) bool
- func IsValidContext(ctx context.Context) error
- func IsWhitelistServiceTransientError(err error) bool
- func MinInt(a, b int) int
- func MinInt32(a, b int32) int32
- func PrettyPrintHistory(history *workflow.History, logger log.Logger)
- func SignalExternalWorkflowExecutionFailedCausePtr(t s.SignalExternalWorkflowExecutionFailedCause) *s.SignalExternalWorkflowExecutionFailedCause
- func StringDefault(v *string) string
- func StringPtr(v string) *string
- func TDeserialize(msg thrift.TStruct, b []byte) (err error)
- func TDeserializeString(msg thrift.TStruct, s string) (err error)
- func TListDeserialize(msgType reflect.Type, b []byte) (msgs []thrift.TStruct, err error)
- func TListSerialize(msgs []thrift.TStruct) (b []byte, err error)
- func TSerialize(msg thrift.TStruct) (b []byte, err error)
- func TSerializeString(msg thrift.TStruct) (s string, err error)
- func TaskListKindPtr(t s.TaskListKind) *s.TaskListKind
- func TaskListPtr(v s.TaskList) *s.TaskList
- func TaskListTypePtr(t s.TaskListType) *s.TaskListType
- func TimeNowNanosPtr() *int64
- func TimeoutTypePtr(t s.TimeoutType) *s.TimeoutType
- func Uint32Ptr(v uint32) *uint32
- func Uint64Ptr(v uint64) *uint64
- func ValidateLongPollContextTimeout(ctx context.Context, handlerName string, logger log.Logger) error
- func ValidateRetryPolicy(policy *workflow.RetryPolicy) error
- func WorkflowIDToHistoryShard(workflowID string, numberOfShards int) int
- func WorkflowTypePtr(t s.WorkflowType) *s.WorkflowType
- type ClientCache
- type Daemon
- type EncodingType
- type PProfInitializer
- type QueryTaskToken
- type RPCFactory
- type TaskToken
- type TaskTokenSerializer
Constants ¶
const ( // FirstEventID is the id of the first event in the history FirstEventID int64 = 1 // EmptyEventID is the id of the empty event EmptyEventID int64 = -23 // EmptyVersion is used as the default value for failover version when no value is provided EmptyVersion int64 = -24 // EndEventID is the id of the end event, here we use the int64 max EndEventID int64 = 1<<63 - 1 // BufferedEventID is the id of the buffered event BufferedEventID int64 = -123 // EmptyEventTaskID is uninitialized id of the task id within event EmptyEventTaskID int64 = -1234 // TransientEventID is the id of the transient event TransientEventID int64 = -124 // FirstBlobPageToken is the page token identifying the first blob for each history archival FirstBlobPageToken = 1 // LastBlobNextPageToken is the next page token on the last blob for each history archival LastBlobNextPageToken = -1 )
const ( // FrontendServiceName is the name of the frontend service FrontendServiceName = "cadence-frontend" // HistoryServiceName is the name of the history service HistoryServiceName = "cadence-history" // MatchingServiceName is the name of the matching service MatchingServiceName = "cadence-matching" // WorkerServiceName is the name of the worker service WorkerServiceName = "cadence-worker" )
const ( EncodingTypeJSON EncodingType = "json" EncodingTypeThriftRW = "thriftrw" EncodingTypeGob = "gob" EncodingTypeUnknown = "unknow" EncodingTypeEmpty = "" )
Data encoding types
const ( // GetHistoryWarnSizeLimit is the threshold for emitting warn log GetHistoryWarnSizeLimit = 500 * 1024 // Warn when size goes over 500KB // GetHistoryMaxPageSize is the max page size for get history GetHistoryMaxPageSize = 1000 )
const ( // SystemDomainName is domain name for all cadence system workflows SystemDomainName = "cadence-system" SystemDomainID = "32049b68-7872-4094-8e63-d0dd59896a83" SystemDomainRetentionDays = 7 )
const ( // MinLongPollTimeout is the minimum context timeout for long poll API, below which // the request won't be processed MinLongPollTimeout = time.Second * 2 // CriticalLongPollTimeout is a threshold for the context timeout passed into long poll API, // below which a warning will be logged CriticalLongPollTimeout = time.Second * 20 )
const ( // DaemonStatusInitialized coroutine pool initialized DaemonStatusInitialized int32 = 0 // DaemonStatusStarted coroutine pool started DaemonStatusStarted int32 = 1 // DaemonStatusStopped coroutine pool stopped DaemonStatusStopped int32 = 2 )
const ( // LibraryVersionHeaderName refers to the name of the // tchannel / http header that contains the client // library version LibraryVersionHeaderName = "cadence-client-library-version" // FeatureVersionHeaderName refers to the name of the // tchannel / http header that contains the client // feature version // the feature version sent from client represents the // feature set of the cadence client library support. // This can be used for client capability check, on // Cadence server, for backward compatibility FeatureVersionHeaderName = "cadence-client-feature-version" // ClientImplHeaderName refers to the name of the // header that contains the client implementation ClientImplHeaderName = "cadence-client-name" )
const ( // FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit FailureReasonCompleteResultExceedsLimit = "COMPLETE_RESULT_EXCEEDS_LIMIT" // FailureReasonFailureDetailsExceedsLimit is failureReason for failure details exceeds limit FailureReasonFailureDetailsExceedsLimit = "FAILURE_DETAILS_EXCEEDS_LIMIT" // FailureReasonCancelDetailsExceedsLimit is failureReason for cancel details exceeds limit FailureReasonCancelDetailsExceedsLimit = "CANCEL_DETAILS_EXCEEDS_LIMIT" //FailureReasonHeartbeatExceedsLimit is failureReason for heartbeat exceeds limit FailureReasonHeartbeatExceedsLimit = "HEARTBEAT_EXCEEDS_LIMIT" // FailureReasonDecisionBlobSizeExceedsLimit is the failureReason for decision blob exceeds size limit FailureReasonDecisionBlobSizeExceedsLimit = "DECISION_BLOB_SIZE_EXCEEDS_LIMIT" // TerminateReasonSizeExceedsLimit is reason to terminate workflow when history size or count exceed limit TerminateReasonSizeExceedsLimit = "HISTORY_EXCEEDS_LIMIT" )
const MaxTaskTimeout = 31622400
MaxTaskTimeout is maximum task timeout allowed. 366 days in seconds
const NoRetryBackoff = cron.NoBackoff
NoRetryBackoff is used to represent backoff when no retry is needed
const (
// VisibilityAppName is used to find kafka topics and ES indexName for visibility
VisibilityAppName = "visibility"
)
Variables ¶
var ( // ErrBlobSizeExceedsLimit is error for event blob size exceeds limit ErrBlobSizeExceedsLimit = &workflow.BadRequestError{Message: "Blob data size exceeds limit."} // ErrContextTimeoutTooShort is error for setting a very short context timeout when calling a long poll API ErrContextTimeoutTooShort = &workflow.BadRequestError{Message: "Context timeout is too short."} // ErrContextTimeoutNotSet is error for not setting a context timeout when calling a long poll API ErrContextTimeoutNotSet = &workflow.BadRequestError{Message: "Context timeout is not set."} )
Functions ¶
func ActivityTypePtr ¶
func ActivityTypePtr(v s.ActivityType) *s.ActivityType
ActivityTypePtr makes a copy and returns the pointer to an ActivityType.
func AddSecondsToBaseTime ¶
AddSecondsToBaseTime - Gets the UnixNano with given duration and base time.
func AggregateYarpcOptions ¶ added in v0.3.5
func AggregateYarpcOptions(ctx context.Context, opts ...yarpc.CallOption) []yarpc.CallOption
AggregateYarpcOptions aggregates the header information from context to existing yarpc call options
func ArchivalStatusPtr ¶ added in v0.5.0
func ArchivalStatusPtr(t s.ArchivalStatus) *s.ArchivalStatus
ArchivalStatusPtr makes a copy and returns the pointer to an ArchivalStatus.
func AwaitWaitGroup ¶
AwaitWaitGroup calls Wait on the given wait group. Returns true if the Wait() call succeeded before the timeout. Returns false if the Wait() did not return before the timeout.
func BoolDefault ¶ added in v0.3.2
BoolDefault returns value if bool pointer is set otherwise default value of bool
func CancelExternalWorkflowExecutionFailedCausePtr ¶ added in v0.3.2
func CancelExternalWorkflowExecutionFailedCausePtr(t s.CancelExternalWorkflowExecutionFailedCause) *s.CancelExternalWorkflowExecutionFailedCause
CancelExternalWorkflowExecutionFailedCausePtr makes a copy and returns the pointer to a CancelExternalWorkflowExecutionFailedCause.
func CheckEventBlobSizeLimit ¶ added in v0.5.0
func CheckEventBlobSizeLimit(actualSize, warnLimit, errorLimit int, domainID, workflowID, runID string, scope metrics.Scope, logger log.Logger) error
CheckEventBlobSizeLimit checks if a blob data exceeds limits. It logs a warning if it exceeds warnLimit, and returns ErrBlobSizeExceedsLimit if it exceeds errorLimit.
func ChildPolicyPtr ¶ added in v0.3.2
func ChildPolicyPtr(t s.ChildPolicy) *s.ChildPolicy
ChildPolicyPtr makes a copy and returns the pointer to a ChildPolicy.
func ChildWorkflowExecutionFailedCausePtr ¶ added in v0.3.2
func ChildWorkflowExecutionFailedCausePtr(t s.ChildWorkflowExecutionFailedCause) *s.ChildWorkflowExecutionFailedCause
ChildWorkflowExecutionFailedCausePtr makes a copy and returns the pointer to a ChildWorkflowExecutionFailedCause.
func ClientArchivalStatusPtr ¶ added in v0.5.7
func ClientArchivalStatusPtr(t shared.ArchivalStatus) *shared.ArchivalStatus
ClientArchivalStatusPtr makes a copy and returns the pointer to a client ArchivalStatus.
func CreateAdminServiceRetryPolicy ¶ added in v0.5.0
func CreateAdminServiceRetryPolicy() backoff.RetryPolicy
CreateAdminServiceRetryPolicy creates a retry policy for calls to admin service
func CreateFrontendServiceRetryPolicy ¶ added in v0.3.11
func CreateFrontendServiceRetryPolicy() backoff.RetryPolicy
CreateFrontendServiceRetryPolicy creates a retry policy for calls to frontend service
func CreateHistoryServiceRetryPolicy ¶
func CreateHistoryServiceRetryPolicy() backoff.RetryPolicy
CreateHistoryServiceRetryPolicy creates a retry policy for calls to history service
func CreateHistoryStartWorkflowRequest ¶ added in v0.4.0
func CreateHistoryStartWorkflowRequest(domainID string, startRequest *workflow.StartWorkflowExecutionRequest) *h.StartWorkflowExecutionRequest
CreateHistoryStartWorkflowRequest creates a start workflow request for history
func CreateKafkaOperationRetryPolicy ¶ added in v0.5.2
func CreateKafkaOperationRetryPolicy() backoff.RetryPolicy
CreateKafkaOperationRetryPolicy creates a retry policy for kafka operation
func CreateMatchingPollForDecisionTaskResponse ¶ added in v0.3.12
func CreateMatchingPollForDecisionTaskResponse(historyResponse *h.RecordDecisionTaskStartedResponse, workflowExecution *workflow.WorkflowExecution, token []byte) *m.PollForDecisionTaskResponse
CreateMatchingPollForDecisionTaskResponse creates a response for matching's PollForDecisionTask
func CreateMatchingServiceRetryPolicy ¶ added in v0.5.0
func CreateMatchingServiceRetryPolicy() backoff.RetryPolicy
CreateMatchingServiceRetryPolicy creates a retry policy for calls to matching service
func CreatePersistanceRetryPolicy ¶
func CreatePersistanceRetryPolicy() backoff.RetryPolicy
CreatePersistanceRetryPolicy creates a retry policy for persistence layer operations
func DecisionTaskFailedCausePtr ¶ added in v0.3.2
func DecisionTaskFailedCausePtr(t s.DecisionTaskFailedCause) *s.DecisionTaskFailedCause
DecisionTaskFailedCausePtr makes a copy and returns the pointer to a DecisionTaskFailedCause.
func DecisionTypePtr ¶
func DecisionTypePtr(t s.DecisionType) *s.DecisionType
DecisionTypePtr makes a copy and returns the pointer to a DecisionType.
func EventTypePtr ¶
EventTypePtr makes a copy and returns the pointer to an EventType.
func Float64Ptr ¶
Float64Ptr makes a copy and returns the pointer to a float64.
func GenerateRandomString ¶ added in v0.3.12
GenerateRandomString is used to generate a test string
func GetSizeOfMapStringToByteArray ¶ added in v0.5.7
GetSizeOfMapStringToByteArray gets the size of a map[string][]byte
func Int32Default ¶ added in v0.3.2
Int32Default returns value if int32 pointer is set otherwise default value of int32
func Int64Default ¶ added in v0.3.2
Int64Default returns value if int64 pointer is set otherwise default value of int64
func IsKafkaTransientError ¶ added in v0.5.2
IsKafkaTransientError checks if the error is a transient kafka error
func IsPersistenceTransientError ¶
IsPersistenceTransientError checks if the error is a transient persistence error
func IsServiceNonRetryableError ¶
IsServiceNonRetryableError checks if the error is a non retryable error.
func IsServiceTransientError ¶ added in v0.3.11
IsServiceTransientError checks if the error is a retryable error.
func IsValidContext ¶
IsValidContext checks that the thrift context is not expired or cancelled. Returns nil if the context is still valid. Otherwise, returns the result of ctx.Err().
func IsWhitelistServiceTransientError ¶ added in v0.3.14
IsWhitelistServiceTransientError checks if the error is a transient error.
func PrettyPrintHistory ¶
PrettyPrintHistory prints history in human readable format
func SignalExternalWorkflowExecutionFailedCausePtr ¶ added in v0.3.6
func SignalExternalWorkflowExecutionFailedCausePtr(t s.SignalExternalWorkflowExecutionFailedCause) *s.SignalExternalWorkflowExecutionFailedCause
SignalExternalWorkflowExecutionFailedCausePtr makes a copy and returns the pointer to a SignalExternalWorkflowExecutionFailedCause.
func StringDefault ¶ added in v0.3.2
StringDefault returns value if string pointer is set otherwise default value of string
func TDeserialize ¶
TDeserialize is used to deserialize []byte to thrift TStruct
func TDeserializeString ¶
TDeserializeString is used to deserialize string to thrift TStruct
func TListDeserialize ¶
TListDeserialize is used to deserialize []byte to list of thrift TStruct
func TListSerialize ¶
TListSerialize is used to serialize list of thrift TStruct to []byte
func TSerialize ¶
TSerialize is used to serialize thrift TStruct to []byte
func TSerializeString ¶
TSerializeString is used to serialize thrift TStruct to string
func TaskListKindPtr ¶ added in v0.3.6
func TaskListKindPtr(t s.TaskListKind) *s.TaskListKind
TaskListKindPtr makes a copy and returns the pointer to a TaskListKind.
func TaskListPtr ¶
TaskListPtr makes a copy and returns the pointer to a TaskList.
func TaskListTypePtr ¶ added in v0.4.0
func TaskListTypePtr(t s.TaskListType) *s.TaskListType
TaskListTypePtr makes a copy and returns the pointer to a TaskListType.
func TimeNowNanosPtr ¶ added in v0.5.7
func TimeNowNanosPtr() *int64
TimeNowNanosPtr returns an int64 ptr to current time in unix nanos
func TimeoutTypePtr ¶ added in v0.3.2
func TimeoutTypePtr(t s.TimeoutType) *s.TimeoutType
TimeoutTypePtr makes a copy and returns the pointer to a TimeoutType.
func ValidateLongPollContextTimeout ¶ added in v0.5.7
func ValidateLongPollContextTimeout(ctx context.Context, handlerName string, logger log.Logger) error
ValidateLongPollContextTimeout checks if the context timeout for a long poll handler is too short or below a normal value. If the timeout is not set or too short, it logs an error and returns ErrContextTimeoutNotSet or ErrContextTimeoutTooShort accordingly. If the timeout is only below a normal value, it just logs an info message and returns nil.
func ValidateRetryPolicy ¶ added in v0.4.0
func ValidateRetryPolicy(policy *workflow.RetryPolicy) error
ValidateRetryPolicy validates a retry policy
func WorkflowIDToHistoryShard ¶
WorkflowIDToHistoryShard is used to map workflowID to a shardID
func WorkflowTypePtr ¶
func WorkflowTypePtr(t s.WorkflowType) *s.WorkflowType
WorkflowTypePtr makes a copy and returns the pointer to a WorkflowType.
Types ¶
type ClientCache ¶ added in v0.5.0
type ClientCache interface { GetClientForKey(key string) (interface{}, error) GetClientForClientKey(clientKey string) (interface{}, error) }
ClientCache stores initialized clients
func NewClientCache ¶ added in v0.5.0
func NewClientCache( keyResolver keyResolver, clientProvider clientProvider, ) ClientCache
NewClientCache creates a new client cache based on membership
type Daemon ¶
type Daemon interface { Start() Stop() }
Daemon is the base interface implemented by background tasks within cherami
type EncodingType ¶
type EncodingType string
EncodingType is an enum that represents various data encoding types
type PProfInitializer ¶ added in v0.3.5
type PProfInitializer interface {
Start() error
}
PProfInitializer initializes pprof based on config
type QueryTaskToken ¶ added in v0.3.2
type QueryTaskToken struct { DomainID string `json:"domainId"` TaskList string `json:"taskList"` TaskID string `json:"taskId"` }
QueryTaskToken identifies a query task
type RPCFactory ¶ added in v0.3.2
type RPCFactory interface { CreateDispatcher() *yarpc.Dispatcher CreateDispatcherForOutbound(callerName, serviceName, hostName string) *yarpc.Dispatcher }
RPCFactory creates a dispatcher that knows how to transport requests.
type TaskToken ¶
type TaskToken struct { DomainID string `json:"domainId"` WorkflowID string `json:"workflowId"` RunID string `json:"runId"` ScheduleID int64 `json:"scheduleId"` ScheduleAttempt int64 `json:"scheduleAttempt"` ActivityID string `json:"activityId"` }
TaskToken identifies a task
type TaskTokenSerializer ¶
type TaskTokenSerializer interface { Serialize(token *TaskToken) ([]byte, error) Deserialize(data []byte) (*TaskToken, error) SerializeQueryTaskToken(token *QueryTaskToken) ([]byte, error) DeserializeQueryTaskToken(data []byte) (*QueryTaskToken, error) }
TaskTokenSerializer serializes task tokens
func NewJSONTaskTokenSerializer ¶
func NewJSONTaskTokenSerializer() TaskTokenSerializer
NewJSONTaskTokenSerializer creates a new instance of TaskTokenSerializer