Documentation ¶
Index ¶
- Constants
- func ApecsTxnCurrentStep(txn *rkcypb.ApecsTxn) *rkcypb.ApecsTxn_Step
- func BuildFullTopicName(platformName string, environment string, concernName string, ...) string
- func BuildTopicName(topicNamePrefix string, name string, generation int32) string
- func BuildTopicNamePrefix(platformName string, environment string, concernName string, ...) string
- func BytesToInt(arr []byte) int
- func ComplexConfigUnmarshal(msgType string, b []byte) (proto.Message, error)
- func ComplexConfigUnmarshalJson(msgType string, b []byte) (proto.Message, error)
- func ConfigToJson(conf *Config) (string, error)
- func ConfigTopic(platformName string, environment string) string
- func ConsumersTopic(platformName string, environment string) string
- func Contains(slice []string, item string) bool
- func CreatePlatformTopics(ctx context.Context, strmprov StreamProvider, platform string, ...) error
- func DecodeInstance(cncHdlrs ConcernHandlers, concern string, payload64 string)
- func GetDirective(msg *kafka.Message) rkcypb.Directive
- func GetTraceId(msg *kafka.Message) string
- func GetTraceParent(msg *kafka.Message) string
- func IntToBytes(num int) []byte
- func IsACETopic(topic string) bool
- func IsInstanceStoreStep(step *rkcypb.ApecsTxn_Step) bool
- func IsKeylessStep(step *rkcypb.ApecsTxn_Step) bool
- func IsPackedPayload(payload []byte) bool
- func IsPlatformCommand(cmd string) bool
- func IsReservedCommand(cmd string) bool
- func IsStorageSystem(system rkcypb.System) bool
- func IsTxnProhibitedCommand(cmd string) bool
- func IsValidName(name string) bool
- func LogProto(msg proto.Message)
- func LogResult(rslt *rkcypb.ApecsTxn_Step_Result, sev rkcypb.Severity, format string, ...)
- func LogResultDebug(rslt *rkcypb.ApecsTxn_Step_Result, format string, args ...interface{})
- func LogResultError(rslt *rkcypb.ApecsTxn_Step_Result, format string, args ...interface{})
- func LogResultInfo(rslt *rkcypb.ApecsTxn_Step_Result, format string, args ...interface{})
- func LogResultWarn(rslt *rkcypb.ApecsTxn_Step_Result, format string, args ...interface{})
- func Maxi(x, y int) int
- func Maxi32(x, y int32) int32
- func Maxi64(x, y int64) int64
- func Mini(x, y int) int
- func Mini32(x, y int32) int32
- func Mini64(x, y int64) int64
- func NewApecsTxn(txnId string, assocTxn *rkcypb.AssocTxn, respTarget *rkcypb.TopicTarget, ...) (*rkcypb.ApecsTxn, error)
- func NewKafkaMessage(topic *string, partition int32, value proto.Message, ...) (*kafka.Message, error)
- func NewSpanId() string
- func NewTraceId() string
- func OffsetGT(lhs *rkcypb.CompoundOffset, rhs *rkcypb.CompoundOffset) bool
- func OffsetGTE(lhs *rkcypb.CompoundOffset, rhs *rkcypb.CompoundOffset) bool
- func PackPayloads(payload0 []byte, payload1 []byte) []byte
- func ParsePayload(payload []byte) ([]byte, []byte, error)
- func PlatformTopic(platformName string, environment string) string
- func PrepLogging()
- func PrintKafkaLogs(ctx context.Context, kafkaLogCh <-chan kafka.LogEvent)
- func ProducersTopic(platformName string, environment string) string
- func ProgKey(prog *rkcypb.Program) string
- func RegisterComplexConfigHandler(msgType string, handler ComplexConfigHandler)
- func RegisterGlobalConcernHandlerNewFunc(newCncHdlr func() ConcernHandler)
- func RegisterHashFunc(name string, hashFunc HashFunc)
- func StandardHeaders(directive rkcypb.Directive, traceParent string) []kafka.Header
- func StepSystemName(step *rkcypb.ApecsTxn_Step) string
- func StorageInitNoop(ctx context.Context, wg *sync.WaitGroup, config map[string]string) error
- func TraceIdFromTraceParent(traceParent string) string
- func TraceParentIsValid(traceParent string) bool
- func TraceParentParts(traceParent string) []string
- func TxnDirectionName(txn *rkcypb.ApecsTxn) string
- func UncommittedGroupName(topic string, partition int) string
- func UncommittedGroupNameAllPartitions(topic string) string
- func UnpackPayloads(packed []byte) ([]byte, []byte, error)
- func UpdateTopics(ctx context.Context, strmprov StreamProvider, platDef *rkcypb.PlatformDef) error
- func ValidateTxn(txn *Txn) error
- type AdminClient
- type ClientCode
- func (clientCode *ClientCode) AddCrudHandler(concern string, storageType string, handler interface{})
- func (clientCode *ClientCode) AddLogicHandler(concern string, handler interface{})
- func (clientCode *ClientCode) AddStorageInit(storageType string, storageInit StorageInit)
- func (clientCode *ClientCode) UpdateStorageTargets(storageTargets map[string]*rkcypb.StorageTarget)
- type ComplexConfigHandler
- type ConcernHandler
- type ConcernHandlers
- func (concernHandlers ConcernHandlers) DecodeArgPayload(ctx context.Context, concern string, system rkcypb.System, command string, ...) (*ResultProto, ConcernHandler, error)
- func (concernHandlers ConcernHandlers) DecodeArgPayload64(ctx context.Context, concern string, system rkcypb.System, command string, ...) (*ResultProto, ConcernHandler, error)
- func (concernHandlers ConcernHandlers) DecodeArgPayload64Json(ctx context.Context, concern string, system rkcypb.System, command string, ...) ([]byte, error)
- func (concernHandlers ConcernHandlers) DecodeArgPayloadJson(ctx context.Context, concern string, system rkcypb.System, command string, ...) ([]byte, error)
- func (concernHandlers ConcernHandlers) DecodeInstance(ctx context.Context, concern string, buffer []byte) (*ResultProto, error)
- func (concernHandlers ConcernHandlers) DecodeInstance64(ctx context.Context, concern string, buffer64 string) (*ResultProto, error)
- func (concernHandlers ConcernHandlers) DecodeInstance64Json(ctx context.Context, concern string, buffer64 string) ([]byte, error)
- func (concernHandlers ConcernHandlers) DecodeInstanceJson(ctx context.Context, concern string, buffer []byte) ([]byte, error)
- func (concernHandlers ConcernHandlers) DecodeResultPayload(ctx context.Context, concern string, system rkcypb.System, command string, ...) (*ResultProto, ConcernHandler, error)
- func (concernHandlers ConcernHandlers) DecodeResultPayload64(ctx context.Context, concern string, system rkcypb.System, command string, ...) (*ResultProto, ConcernHandler, error)
- func (concernHandlers ConcernHandlers) DecodeResultPayload64Json(ctx context.Context, concern string, system rkcypb.System, command string, ...) ([]byte, error)
- func (concernHandlers ConcernHandlers) DecodeResultPayloadJson(ctx context.Context, concern string, system rkcypb.System, command string, ...) ([]byte, error)
- func (concernHandlers ConcernHandlers) RegisterCrudHandler(concern string, storageType string, handler interface{})
- func (concernHandlers ConcernHandlers) RegisterLogicHandler(concern string, handler interface{})
- func (concernHandlers ConcernHandlers) ValidateConcernHandlers() bool
- type Config
- func (conf *Config) GetBool(key string) (bool, bool)
- func (conf *Config) GetComplexBytes(msgType string, key string) ([]byte, bool)
- func (conf *Config) GetComplexMsg(msgType string, key string) (proto.Message, bool)
- func (conf *Config) GetFloat64(key string) (float64, bool)
- func (conf *Config) GetString(key string) (string, bool)
- func (conf *Config) SetBool(key string, val bool)
- func (conf *Config) SetComplexBytes(msgType string, key string, val []byte)
- func (conf *Config) SetComplexMsg(msgType string, key string, msg proto.Message)
- func (conf *Config) SetFloat64(key string, val float64)
- func (conf *Config) SetString(key string, val string)
- type ConfigRdr
- type Consumer
- type Error
- type Fnv64HashFunc
- type HashFunc
- type InstanceRecord
- type InstanceStore
- func (instStore *InstanceStore) Get(key string) *InstanceRecord
- func (instStore *InstanceStore) GetInstance(key string) []byte
- func (instStore *InstanceStore) GetPacked(key string) []byte
- func (instStore *InstanceStore) GetRelated(key string) []byte
- func (instStore *InstanceStore) Remove(key string)
- func (instStore *InstanceStore) Set(key string, instance []byte, related []byte, cmpdOffset *rkcypb.CompoundOffset)
- func (instStore *InstanceStore) SetInstance(key string, instance []byte, cmpdOffset *rkcypb.CompoundOffset)
- func (instStore *InstanceStore) SetRelated(key string, related []byte, cmpdOffset *rkcypb.CompoundOffset) error
- type PlatformArgs
- type PlatformDiff
- type Producer
- type ProducerCh
- type ResultProto
- type RevertType
- type RtApecsTxn
- func (rtxn *RtApecsTxn) AdvanceStepIdx() bool
- func (rtxn *RtApecsTxn) CanAdvance() bool
- func (rtxn *RtApecsTxn) CurrentStep() *rkcypb.ApecsTxn_Step
- func (rtxn *RtApecsTxn) FirstForwardStep() *rkcypb.ApecsTxn_Step
- func (rtxn *RtApecsTxn) InsertSteps(idx int32, steps ...*rkcypb.ApecsTxn_Step) error
- func (rtxn *RtApecsTxn) PreviousStep() *rkcypb.ApecsTxn_Step
- func (rtxn *RtApecsTxn) ReplaceStep(idx int32, step *rkcypb.ApecsTxn_Step) error
- func (rtxn *RtApecsTxn) Validate() error
- type RtConcern
- type RtPlatformDef
- type RtTopics
- type StandardTopicName
- type Step
- type StepArgs
- type StorageInit
- type StorageTargetInit
- type StreamProvider
- type TopicParts
- type Txn
Constants ¶
View Source
const (
	CREATE           = "Create"
	READ             = "Read"
	UPDATE           = "Update"
	UPDATE_ASYNC     = "UpdateAsync"
	DELETE           = "Delete"
	VALIDATE_CREATE  = "ValidateCreate"
	VALIDATE_UPDATE  = "ValidateUpdate"
	REFRESH_INSTANCE = "RefreshInstance"
	FLUSH_INSTANCE   = "FlushInstance"
	REQUEST_RELATED  = "RequestRelated"
	REFRESH_RELATED  = "RefreshRelated"
)
View Source
const (
	RKCY                           string = "rkcy"
	DIRECTIVE_HEADER               string = "rkcy-directive"
	TRACE_PARENT_HEADER            string = "traceparent"
	RKCY_TOPIC                     string = "rkcy.topic"
	RKCY_PARTITION                 string = "rkcy.partition"
	MAX_PARTITION                  int32  = 1024
	PLATFORM_TOPIC_RETENTION_BYTES int32  = 10 * 1024 * 1024
)
View Source
const (
	ADMIN        StandardTopicName = "admin"
	PROCESS                        = "process"
	ERROR                          = "error"
	COMPLETE                       = "complete"
	STORAGE                        = "storage"
	STORAGE_SCND                   = "storage-scnd"
)
Variables ¶
This section is empty.
Functions ¶
func ApecsTxnCurrentStep ¶
func ApecsTxnCurrentStep(txn *rkcypb.ApecsTxn) *rkcypb.ApecsTxn_Step
func BuildFullTopicName ¶
func BuildTopicName ¶
func BuildTopicNamePrefix ¶
func BytesToInt ¶
func ComplexConfigUnmarshal ¶
func ConfigToJson ¶
func ConfigTopic ¶
func ConsumersTopic ¶
func CreatePlatformTopics ¶
func DecodeInstance ¶
func DecodeInstance(cncHdlrs ConcernHandlers, concern string, payload64 string)
func GetTraceId ¶
func GetTraceParent ¶
func IntToBytes ¶
func IsACETopic ¶
func IsInstanceStoreStep ¶
func IsInstanceStoreStep(step *rkcypb.ApecsTxn_Step) bool
func IsKeylessStep ¶
func IsKeylessStep(step *rkcypb.ApecsTxn_Step) bool
func IsPackedPayload ¶
func IsPlatformCommand ¶
func IsReservedCommand ¶
func IsStorageSystem ¶
func IsTxnProhibitedCommand ¶
func IsValidName ¶
func LogResult ¶
func LogResult(rslt *rkcypb.ApecsTxn_Step_Result, sev rkcypb.Severity, format string, args ...interface{})
func LogResultDebug ¶
func LogResultDebug(rslt *rkcypb.ApecsTxn_Step_Result, format string, args ...interface{})
func LogResultError ¶
func LogResultError(rslt *rkcypb.ApecsTxn_Step_Result, format string, args ...interface{})
func LogResultInfo ¶
func LogResultInfo(rslt *rkcypb.ApecsTxn_Step_Result, format string, args ...interface{})
func LogResultWarn ¶
func LogResultWarn(rslt *rkcypb.ApecsTxn_Step_Result, format string, args ...interface{})
func NewApecsTxn ¶
func NewKafkaMessage ¶
func NewTraceId ¶
func NewTraceId() string
func OffsetGT ¶
func OffsetGT(lhs *rkcypb.CompoundOffset, rhs *rkcypb.CompoundOffset) bool
func OffsetGTE ¶
func OffsetGTE(lhs *rkcypb.CompoundOffset, rhs *rkcypb.CompoundOffset) bool
func PackPayloads ¶
func PlatformTopic ¶
func PrepLogging ¶
func PrepLogging()
func ProducersTopic ¶
func RegisterComplexConfigHandler ¶
func RegisterComplexConfigHandler(msgType string, handler ComplexConfigHandler)
func RegisterGlobalConcernHandlerNewFunc ¶
func RegisterGlobalConcernHandlerNewFunc(newCncHdlr func() ConcernHandler)
func RegisterHashFunc ¶
func StandardHeaders ¶
func StepSystemName ¶
func StepSystemName(step *rkcypb.ApecsTxn_Step) string
func StorageInitNoop ¶
func TraceIdFromTraceParent ¶
func TraceParentIsValid ¶
func TraceParentParts ¶
func TxnDirectionName ¶
func UncommittedGroupName ¶
func UpdateTopics ¶
func UpdateTopics(
	ctx context.Context,
	strmprov StreamProvider,
	platDef *rkcypb.PlatformDef,
) error
func ValidateTxn ¶
Types ¶
type AdminClient ¶
type AdminClient interface {
	GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
	CreateTopics(
		ctx context.Context,
		topics []kafka.TopicSpecification,
		options ...kafka.CreateTopicsAdminOption,
	) ([]kafka.TopicResult, error)
	Close()
}
type ClientCode ¶
type ClientCode struct {
	StorageInits    map[string]StorageInit
	ConcernHandlers ConcernHandlers
}
func NewClientCode ¶
func NewClientCode() *ClientCode
func (*ClientCode) AddCrudHandler ¶
func (clientCode *ClientCode) AddCrudHandler(concern string, storageType string, handler interface{})
func (*ClientCode) AddLogicHandler ¶
func (clientCode *ClientCode) AddLogicHandler(concern string, handler interface{})
func (*ClientCode) AddStorageInit ¶
func (clientCode *ClientCode) AddStorageInit(storageType string, storageInit StorageInit)
func (*ClientCode) UpdateStorageTargets ¶
func (clientCode *ClientCode) UpdateStorageTargets(storageTargets map[string]*rkcypb.StorageTarget)
type ComplexConfigHandler ¶
type ConcernHandler ¶
type ConcernHandler interface {
	ConcernName() string
	HandleLogicCommand(
		ctx context.Context,
		system rkcypb.System,
		command string,
		direction rkcypb.Direction,
		args *StepArgs,
		instanceStore *InstanceStore,
		confRdr ConfigRdr,
	) (*rkcypb.ApecsTxn_Step_Result, []*rkcypb.ApecsTxn_Step)
	HandleCrudCommand(
		ctx context.Context,
		wg *sync.WaitGroup,
		system rkcypb.System,
		command string,
		direction rkcypb.Direction,
		args *StepArgs,
		storageType string,
	) (*rkcypb.ApecsTxn_Step_Result, []*rkcypb.ApecsTxn_Step)
	DecodeInstance(ctx context.Context, buffer []byte) (*ResultProto, error)
	DecodeArg(ctx context.Context, system rkcypb.System, command string, buffer []byte) (*ResultProto, error)
	DecodeResult(ctx context.Context, system rkcypb.System, command string, buffer []byte) (*ResultProto, error)
	DecodeRelatedRequest(ctx context.Context, relReq *rkcypb.RelatedRequest) (*ResultProto, error)
	SetLogicHandler(commands interface{}) error
	SetCrudHandler(storageType string, commands interface{}) error
	ValidateHandlers() bool
	SetStorageTargets(storageTargets map[string]*StorageTargetInit)
}
type ConcernHandlers ¶
type ConcernHandlers map[string]ConcernHandler
func NewConcernHandlers ¶
func NewConcernHandlers() ConcernHandlers
func NewGlobalConcernHandlerRegistry ¶
func NewGlobalConcernHandlerRegistry() ConcernHandlers
func (ConcernHandlers) DecodeArgPayload ¶
func (concernHandlers ConcernHandlers) DecodeArgPayload(
	ctx context.Context,
	concern string,
	system rkcypb.System,
	command string,
	buffer []byte,
) (*ResultProto, ConcernHandler, error)
func (ConcernHandlers) DecodeArgPayload64 ¶
func (concernHandlers ConcernHandlers) DecodeArgPayload64(
	ctx context.Context,
	concern string,
	system rkcypb.System,
	command string,
	buffer64 string,
) (*ResultProto, ConcernHandler, error)
func (ConcernHandlers) DecodeArgPayload64Json ¶
func (ConcernHandlers) DecodeArgPayloadJson ¶
func (ConcernHandlers) DecodeInstance ¶
func (concernHandlers ConcernHandlers) DecodeInstance(ctx context.Context, concern string, buffer []byte) (*ResultProto, error)
func (ConcernHandlers) DecodeInstance64 ¶
func (concernHandlers ConcernHandlers) DecodeInstance64(ctx context.Context, concern string, buffer64 string) (*ResultProto, error)
func (ConcernHandlers) DecodeInstance64Json ¶
func (ConcernHandlers) DecodeInstanceJson ¶
func (ConcernHandlers) DecodeResultPayload ¶
func (concernHandlers ConcernHandlers) DecodeResultPayload(
	ctx context.Context,
	concern string,
	system rkcypb.System,
	command string,
	buffer []byte,
) (*ResultProto, ConcernHandler, error)
func (ConcernHandlers) DecodeResultPayload64 ¶
func (concernHandlers ConcernHandlers) DecodeResultPayload64(
	ctx context.Context,
	concern string,
	system rkcypb.System,
	command string,
	buffer64 string,
) (*ResultProto, ConcernHandler, error)
func (ConcernHandlers) DecodeResultPayload64Json ¶
func (ConcernHandlers) DecodeResultPayloadJson ¶
func (ConcernHandlers) RegisterCrudHandler ¶
func (concernHandlers ConcernHandlers) RegisterCrudHandler(concern string, storageType string, handler interface{})
func (ConcernHandlers) RegisterLogicHandler ¶
func (concernHandlers ConcernHandlers) RegisterLogicHandler(concern string, handler interface{})
func (ConcernHandlers) ValidateConcernHandlers ¶
func (concernHandlers ConcernHandlers) ValidateConcernHandlers() bool
type Config ¶
func JsonToConfig ¶
func (*Config) GetComplexBytes ¶
func (*Config) GetComplexMsg ¶
func (*Config) SetComplexBytes ¶
func (*Config) SetComplexMsg ¶
func (*Config) SetFloat64 ¶
type ConfigRdr ¶
type ConfigRdr interface {
	GetString(key string) (string, bool)
	GetBool(key string) (bool, bool)
	GetFloat64(key string) (float64, bool)
	GetComplexMsg(msgType string, key string) (proto.Message, bool)
	GetComplexBytes(msgType string, key string) ([]byte, bool)
	BuildConfigResponse() *rkcypb.ConfigReadResponse
}
type Consumer ¶
type Consumer interface {
	Assign(partitions []kafka.TopicPartition) error
	Close() error
	Commit() ([]kafka.TopicPartition, error)
	CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)
	QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (int64, int64, error)
	ReadMessage(timeout time.Duration) (*kafka.Message, error)
	StoreOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)
}
type Fnv64HashFunc ¶
type Fnv64HashFunc struct {
// contains filtered or unexported fields
}
func NewFnv64HashFunc ¶
func NewFnv64HashFunc() *Fnv64HashFunc
type HashFunc ¶
func GetHashFunc ¶
type InstanceRecord ¶
type InstanceStore ¶
type InstanceStore struct {
// contains filtered or unexported fields
}
func NewInstanceStore ¶
func NewInstanceStore() *InstanceStore
func (*InstanceStore) Get ¶
func (instStore *InstanceStore) Get(key string) *InstanceRecord
func (*InstanceStore) GetInstance ¶
func (instStore *InstanceStore) GetInstance(key string) []byte
func (*InstanceStore) GetPacked ¶
func (instStore *InstanceStore) GetPacked(key string) []byte
func (*InstanceStore) GetRelated ¶
func (instStore *InstanceStore) GetRelated(key string) []byte
func (*InstanceStore) Remove ¶
func (instStore *InstanceStore) Remove(key string)
func (*InstanceStore) Set ¶
func (instStore *InstanceStore) Set(key string, instance []byte, related []byte, cmpdOffset *rkcypb.CompoundOffset)
func (*InstanceStore) SetInstance ¶
func (instStore *InstanceStore) SetInstance(key string, instance []byte, cmpdOffset *rkcypb.CompoundOffset)
func (*InstanceStore) SetRelated ¶
func (instStore *InstanceStore) SetRelated(key string, related []byte, cmpdOffset *rkcypb.CompoundOffset) error
type PlatformArgs ¶
type PlatformDiff ¶
type ProducerCh ¶
type ResultProto ¶
func ApecsTxnResult ¶
func ApecsTxnResult(
	ctx context.Context,
	cncHdlrs ConcernHandlers,
	txn *rkcypb.ApecsTxn,
) (bool, *ResultProto, *rkcypb.ApecsTxn_Step_Result)
type RtApecsTxn ¶
func NewRtApecsTxn ¶
func NewRtApecsTxn(txn *rkcypb.ApecsTxn, traceParent string) (*RtApecsTxn, error)
func (*RtApecsTxn) AdvanceStepIdx ¶
func (rtxn *RtApecsTxn) AdvanceStepIdx() bool
func (*RtApecsTxn) CanAdvance ¶
func (rtxn *RtApecsTxn) CanAdvance() bool
func (*RtApecsTxn) CurrentStep ¶
func (rtxn *RtApecsTxn) CurrentStep() *rkcypb.ApecsTxn_Step
func (*RtApecsTxn) FirstForwardStep ¶
func (rtxn *RtApecsTxn) FirstForwardStep() *rkcypb.ApecsTxn_Step
func (*RtApecsTxn) InsertSteps ¶
func (rtxn *RtApecsTxn) InsertSteps(idx int32, steps ...*rkcypb.ApecsTxn_Step) error
func (*RtApecsTxn) PreviousStep ¶
func (rtxn *RtApecsTxn) PreviousStep() *rkcypb.ApecsTxn_Step
func (*RtApecsTxn) ReplaceStep ¶
func (rtxn *RtApecsTxn) ReplaceStep(idx int32, step *rkcypb.ApecsTxn_Step) error
func (*RtApecsTxn) Validate ¶
func (rtxn *RtApecsTxn) Validate() error
type RtPlatformDef ¶
type RtPlatformDef struct {
	PlatformDef          *rkcypb.PlatformDef
	Hash                 string
	Concerns             map[string]*RtConcern
	DefaultResponseTopic *RtTopics
	Clusters             map[string]*rkcypb.Cluster
	AdminCluster         *rkcypb.Cluster
	StorageTargets       map[string]*rkcypb.StorageTarget
	PrimaryStorageTarget string
}
func NewRtPlatformDef ¶
func NewRtPlatformDef(platDef *rkcypb.PlatformDef, platformName string, environment string) (*RtPlatformDef, error)
func NewRtPlatformDefFromJson ¶
func NewRtPlatformDefFromJson(platDefJson []byte) (*RtPlatformDef, error)
func (*RtPlatformDef) Diff ¶
func (lhs *RtPlatformDef) Diff(rhs *RtPlatformDef, streamType string, adminBrokers string, otelcolEndpoint string) *PlatformDiff
type StandardTopicName ¶
type StandardTopicName string
type StepArgs ¶
type StepArgs struct {
	TxnId         string
	Key           string
	Instance      []byte
	Payload       []byte
	EffectiveTime time.Time
	CmpdOffset    *rkcypb.CompoundOffset
	ForwardResult *rkcypb.ApecsTxn_Step_Result
}
type StorageInit ¶
type StorageTargetInit ¶
type StorageTargetInit struct {
	*rkcypb.StorageTarget
	Init StorageInit
}
type StreamProvider ¶
type TopicParts ¶
type TopicParts struct {
	FullTopic   string
	Platform    string
	Environment string
	Concern     string
	Topic       string
	System      rkcypb.System
	ConcernType rkcypb.Concern_Type
	Generation  int32
}
func ParseFullTopicName ¶
func ParseFullTopicName(fullTopic string) (*TopicParts, error)
type Txn ¶
type Txn struct {
	Revert RevertType
	Steps  []Step
}
Click to show internal directories.
Click to hide internal directories.