Documentation ¶
Index ¶
- Constants
- Variables
- func ProduceEvents(ctx context.Context, producer Producer, messages ...*Message) error
- func ShouldHandleMessage(ctx context.Context, handler EventHandler, msg *Message) bool
- type Consumer
- type ConsumerBackoffManager
- func (bm *ConsumerBackoffManager) GetBackoffDuration() time.Duration
- func (bm *ConsumerBackoffManager) GetMessage() *Message
- func (bm *ConsumerBackoffManager) IsMaxBackoffReached() bool
- func (bm *ConsumerBackoffManager) ResetBackoff()
- func (bm *ConsumerBackoffManager) TriggerBackoff()
- func (bm *ConsumerBackoffManager) TriggerBackoffWithMessage(msg *Message)
- type EventBrokerType
- type EventConsumer
- type EventHandler
- type EventPatchAnchorPlatformTransactionCompletionData
- type EventReceiverWalletSMSInvitationData
- type HandlerError
- type HandlerSuccess
- type KafkaConfig
- type KafkaConsumer
- type KafkaProducer
- type KafkaSecurityProtocol
- type Message
- type MockConsumer
- func (c *MockConsumer) BrokerType() EventBrokerType
- func (c *MockConsumer) Close() error
- func (c *MockConsumer) Handlers() []EventHandler
- func (c *MockConsumer) ReadMessage(ctx context.Context) (*Message, error)
- func (c *MockConsumer) RegisterEventHandler(ctx context.Context, eventHandlers ...EventHandler) error
- func (c *MockConsumer) Topic() string
- type MockEventHandler
- type MockProducer
- type NoopProducer
- type Producer
Constants ¶
const ( ReceiverWalletNewInvitationTopic = "events.receiver-wallets.new_invitation" PaymentCompletedTopic = "events.payment.payment_completed" PaymentReadyToPayTopic = "events.payment.ready_to_pay" CirclePaymentReadyToPayTopic = "events.payment.circle_ready_to_pay" )
Topic Names
Note: when adding a new topic here, please, add the new topic to `kafka-init` service command on dev/docker-compose-sdp-anchor.yml.
`kafka-topics.sh --create --if-not-exists --topic events.new-topic ...`
const ( RetryReceiverWalletInvitationType = "retry-receiver-wallet-sms-invitation" BatchReceiverWalletInvitationType = "batch-receiver-wallet-sms-invitation" PaymentCompletedSuccessType = "payment-completed-success" PaymentCompletedErrorType = "payment-completed-error" PaymentReadyToPayDisbursementStarted = "payment-ready-to-pay-disbursement-started" PaymentReadyToPayReceiverVerificationCompleted = "payment-ready-to-pay-receiver-verification-completed" PaymentReadyToPayRetryFailedPayment = "payment-ready-to-pay-retry-failed-payment" )
Type Names
const DefaultMaxBackoffExponent = 8
Variables ¶
var ( SASLProtocols = []KafkaSecurityProtocol{KafkaProtocolSASLPlaintext, KafkaProtocolSASLSSL} SSLProtocols = []KafkaSecurityProtocol{KafkaProtocolSASLSSL, KafkaProtocolSSL} )
Functions ¶
func ProduceEvents ¶
func ShouldHandleMessage ¶
func ShouldHandleMessage(ctx context.Context, handler EventHandler, msg *Message) bool
ShouldHandleMessage returns true if the message should be handled by the handler passed by parameter. A message should be handled by a handler if the handler can handle the message and the handler has not been executed before.
Types ¶
type Consumer ¶
type Consumer interface { ReadMessage(ctx context.Context) (*Message, error) Topic() string Handlers() []EventHandler Close() error BrokerType() EventBrokerType }
Consumer is an interface that defines the methods that a consumer should implement.
type ConsumerBackoffManager ¶
type ConsumerBackoffManager struct {
// contains filtered or unexported fields
}
func NewBackoffManager ¶
func NewBackoffManager(backoffChan chan<- struct{}, maxBackoff int) *ConsumerBackoffManager
func (*ConsumerBackoffManager) GetBackoffDuration ¶
func (bm *ConsumerBackoffManager) GetBackoffDuration() time.Duration
func (*ConsumerBackoffManager) GetMessage ¶
func (bm *ConsumerBackoffManager) GetMessage() *Message
func (*ConsumerBackoffManager) IsMaxBackoffReached ¶
func (bm *ConsumerBackoffManager) IsMaxBackoffReached() bool
func (*ConsumerBackoffManager) ResetBackoff ¶
func (bm *ConsumerBackoffManager) ResetBackoff()
func (*ConsumerBackoffManager) TriggerBackoff ¶
func (bm *ConsumerBackoffManager) TriggerBackoff()
func (*ConsumerBackoffManager) TriggerBackoffWithMessage ¶
func (bm *ConsumerBackoffManager) TriggerBackoffWithMessage(msg *Message)
type EventBrokerType ¶
type EventBrokerType string
const ( KafkaEventBrokerType EventBrokerType = "KAFKA" // NoneEventBrokerType means that no event broker was chosen. NoneEventBrokerType EventBrokerType = "NONE" )
func ParseEventBrokerType ¶
func ParseEventBrokerType(ebType string) (EventBrokerType, error)
type EventConsumer ¶
type EventConsumer struct {
// contains filtered or unexported fields
}
func NewEventConsumer ¶
func NewEventConsumer(consumer Consumer, producer Producer, crashTracker crashtracker.CrashTrackerClient) *EventConsumer
func (*EventConsumer) Consume ¶
func (ec *EventConsumer) Consume(ctx context.Context)
type EventHandler ¶
type EventPatchAnchorPlatformTransactionCompletionData ¶
type EventPatchAnchorPlatformTransactionCompletionData struct {
PaymentID string `json:"payment_id"`
}
type EventReceiverWalletSMSInvitationData ¶
type EventReceiverWalletSMSInvitationData struct {
ReceiverWalletID string `json:"id"`
}
type HandlerError ¶
type HandlerError struct { // FailedAt timestamp for the time of failure. FailedAt time.Time `json:"failed_at"` // ErrorMessage detailed error message. Used for displaying. ErrorMessage string `json:"error_message"` // HandlerName name of the handler where the error occurred. HandlerName string `json:"handler_name"` // Err full handler error. Err error `json:"-"` }
func NewHandlerError ¶
func NewHandlerError(hError error, handlerName string) HandlerError
type HandlerSuccess ¶
type HandlerSuccess struct { // ExecutedAt timestamp for the time of successful handling ExecutedAt time.Time `json:"executed_at"` // HandlerName name of the handler that succeeded HandlerName string `json:"handler_name"` }
HandlerSuccess represents a successful handling of a message
type KafkaConfig ¶
type KafkaConfig struct { Brokers []string SecurityProtocol KafkaSecurityProtocol SASLUsername string SASLPassword string SSLAccessKey string SSLAccessCertificate string }
func (*KafkaConfig) Validate ¶
func (kc *KafkaConfig) Validate() error
type KafkaConsumer ¶
type KafkaConsumer struct {
// contains filtered or unexported fields
}
func NewKafkaConsumer ¶
func NewKafkaConsumer(config KafkaConfig, topic string, consumerGroupID string, handlers ...EventHandler) (*KafkaConsumer, error)
func (*KafkaConsumer) BrokerType ¶
func (k *KafkaConsumer) BrokerType() EventBrokerType
BrokerType returns the type of the Kafka broker
func (*KafkaConsumer) Handlers ¶
func (k *KafkaConsumer) Handlers() []EventHandler
Handlers returns the event handlers of the Kafka consumer
func (*KafkaConsumer) ReadMessage ¶
func (k *KafkaConsumer) ReadMessage(ctx context.Context) (*Message, error)
ReadMessage reads a message from the Kafka topic of the consumer and commits the offset
func (*KafkaConsumer) Topic ¶
func (k *KafkaConsumer) Topic() string
Topic returns the topic of the Kafka consumer
type KafkaProducer ¶
type KafkaProducer struct {
// contains filtered or unexported fields
}
func NewKafkaProducer ¶
func NewKafkaProducer(config KafkaConfig) (*KafkaProducer, error)
func (*KafkaProducer) BrokerType ¶
func (k *KafkaProducer) BrokerType() EventBrokerType
BrokerType returns the type of the Kafka broker
func (*KafkaProducer) Close ¶
func (k *KafkaProducer) Close(ctx context.Context)
func (*KafkaProducer) Ping ¶
func (k *KafkaProducer) Ping(ctx context.Context) error
Ping pings the Kafka Broker
func (*KafkaProducer) WriteMessages ¶
func (k *KafkaProducer) WriteMessages(ctx context.Context, messages ...Message) error
type KafkaSecurityProtocol ¶
type KafkaSecurityProtocol string
const ( KafkaProtocolPlaintext KafkaSecurityProtocol = "PLAINTEXT" KafkaProtocolSASLPlaintext KafkaSecurityProtocol = "SASL_PLAINTEXT" KafkaProtocolSASLSSL KafkaSecurityProtocol = "SASL_SSL" KafkaProtocolSSL KafkaSecurityProtocol = "SSL" )
func ParseKafkaSecurityProtocol ¶
func ParseKafkaSecurityProtocol(protocol string) (KafkaSecurityProtocol, error)
type Message ¶
type Message struct { Topic string `json:"topic"` Key string `json:"key"` TenantID string `json:"tenant_id"` Type string `json:"type"` Data any `json:"data"` Errors []HandlerError `json:"errors,omitempty"` SuccessfulExecutions []HandlerSuccess `json:"successful_executions,omitempty"` }
func NewMessage ¶
NewMessage returns a new message with values passed by parameters. It also parses the `TenantID` from the context and inject it into the message. Returns error if the tenant is not found in the context.
func NewPaymentReadyToPayMessage ¶
func NewPaymentReadyToPayMessage(ctx context.Context, platform schema.Platform, key, messageType string) (*Message, error)
NewPaymentReadyToPayMessage returns a new message for the `PaymentReadyToPayTopic` topic or `CirclePaymentReadyToPayTopic` topic.
func (*Message) RecordError ¶
func (*Message) RecordSuccess ¶
type MockConsumer ¶
MockConsumer is a mock implementation of Consumer
func (*MockConsumer) BrokerType ¶
func (c *MockConsumer) BrokerType() EventBrokerType
func (*MockConsumer) Close ¶
func (c *MockConsumer) Close() error
func (*MockConsumer) Handlers ¶
func (c *MockConsumer) Handlers() []EventHandler
func (*MockConsumer) ReadMessage ¶
func (c *MockConsumer) ReadMessage(ctx context.Context) (*Message, error)
func (*MockConsumer) RegisterEventHandler ¶
func (c *MockConsumer) RegisterEventHandler(ctx context.Context, eventHandlers ...EventHandler) error
func (*MockConsumer) Topic ¶
func (c *MockConsumer) Topic() string
type MockEventHandler ¶
MockEventHandler is a mock implementation of EventHandler
func NewMockEventHandler ¶
func NewMockEventHandler(t testInterface) *MockEventHandler
func (*MockEventHandler) CanHandleMessage ¶
func (h *MockEventHandler) CanHandleMessage(ctx context.Context, msg *Message) bool
func (*MockEventHandler) Handle ¶
func (h *MockEventHandler) Handle(ctx context.Context, msg *Message) error
func (*MockEventHandler) Name ¶
func (h *MockEventHandler) Name() string
type MockProducer ¶
MockProducer is a mock implementation of Producer
func NewMockProducer ¶
func NewMockProducer(t testInterface) *MockProducer
NewMockProducer creates a new instance of MockProducer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func (*MockProducer) BrokerType ¶
func (c *MockProducer) BrokerType() EventBrokerType
func (*MockProducer) Close ¶
func (c *MockProducer) Close(ctx context.Context)
func (*MockProducer) WriteMessages ¶
func (c *MockProducer) WriteMessages(ctx context.Context, messages ...Message) error
type NoopProducer ¶
type NoopProducer struct{}
NoopProducer is a producer used to log messages instead of sending them to a real producer.
func (NoopProducer) BrokerType ¶
func (p NoopProducer) BrokerType() EventBrokerType
func (NoopProducer) Close ¶
func (p NoopProducer) Close(ctx context.Context)
func (NoopProducer) WriteMessages ¶
func (p NoopProducer) WriteMessages(ctx context.Context, messages ...Message) error