Documentation ¶
Index ¶
- Constants
- Variables
- type KafkaOpreations
- func Initialize() (*KafkaOpreations, error)
- func (ops *KafkaOpreations) Consume(data chan<- []byte, topic string, consumerIndex int)
- func (ops *KafkaOpreations) InitPaymentConsumer(groupId string, router func(string, []byte, *KafkaOpreations)) error
- func (ops *KafkaOpreations) InitProducer() error
- func (ops *KafkaOpreations) Produce(topic string, msg []byte) error
- func (op *KafkaOpreations) SendErrMsg(msgId, instructionId, standardType, reqMsgType, ofiId, rfiId string, errType int)
- func (op *KafkaOpreations) SendRequestToKafka(topic string, msg []byte) error
Constants ¶
View Source
const ( KAFKA_SSL = "ssl" KAFKA_SASL = "sasl_ssl" ENV_KEY_KAFKA_AUTH_MODE = "KAFKA_AUTH_MODE" ENV_KEY_KAFKA_CA_LOCATION = "KAFKA_CA_LOCATION" ENV_KEY_KAFKA_CERTIFICATE_LOCATION = "KAFKA_CERTIFICATE_LOCATION" ENV_KEY_KAFKA_KEY_LOCATION = "KAFKA_KEY_LOCATION" ENV_KEY_KAFKA_KEY_PASSWORD = "KAFKA_KEY_PASSWORD" ENV_KEY_KAFKA_KEY_SASL_USER = "KAFKA_KEY_SASL_USER" ENV_KEY_KAFKA_KEY_SASL_PASSWORD = "KAFKA_KEY_SASL_PASSWORD" ENV_KEY_KAFKA_KEY_SASL_MECHANISM = "KAFKA_KEY_SASL_MECHANISM" ENV_KEY_KAFKA_BROKER_URL = "KAFKA_BROKER_URL" ENV_KEY_KAFKA_GROUP_ID = "KAFKA_GROUP_ID" ENV_KEY_KAFKA_AUTO_OFF_RESET = "KAFKA_AUTO_OFF_RESET" PAYMENT_TOPIC = "PAYMENT" QUOTES_TOPIC = "QUOTES" FEE_TOPIC = "FEE" TRANSACTION_TOPIC = "TRANSACTIONS" REQUEST_TOPIC = "_req" RESPONSE_TOPIC = "_res" ANCHOR_REDEMPTION_TOPIC = "ANCHOR_REDEMPTION" + REQUEST_TOPIC )
Variables ¶
View Source
var LOGGER = logging.MustGetLogger("kafka")
View Source
var SUPPORT_MESSAGE_TYPES = []string{PAYMENT_TOPIC, QUOTES_TOPIC, FEE_TOPIC, TRANSACTION_TOPIC}
Functions ¶
This section is empty.
Types ¶
type KafkaOpreations ¶
type KafkaOpreations struct { BrokerURL string AutoOffReset string SecurityProtocol string SslCaLocation string SslCertificateLocation string SslKeyLocation string SslKeyPassword string SaslUsername string SaslPassword string SaslMechanism string Producer *kafka.Producer Consumers []*kafka.Consumer GroupId string //used only by send-service FundHandler transaction.CreateFundingOpereations SignHandler signing.CreateSignOperations WhitelistHandler whitelist_handler.ParticipantWhiteList DbClient *database.PostgreDatabaseClient ResponseHandler *parse.ResponseHandler }
func Initialize ¶
func Initialize() (*KafkaOpreations, error)
func (*KafkaOpreations) Consume ¶
func (ops *KafkaOpreations) Consume(data chan<- []byte, topic string, consumerIndex int)
func (*KafkaOpreations) InitPaymentConsumer ¶
func (ops *KafkaOpreations) InitPaymentConsumer(groupId string, router func(string, []byte, *KafkaOpreations)) error
func (*KafkaOpreations) InitProducer ¶
func (ops *KafkaOpreations) InitProducer() error
func (*KafkaOpreations) Produce ¶
func (ops *KafkaOpreations) Produce(topic string, msg []byte) error
func (*KafkaOpreations) SendErrMsg ¶
func (op *KafkaOpreations) SendErrMsg(msgId, instructionId, standardType, reqMsgType, ofiId, rfiId string, errType int)
SendErrMsg sends errors that occurred at the RFI site during request processing back to the OFI.
func (*KafkaOpreations) SendRequestToKafka ¶
func (op *KafkaOpreations) SendRequestToKafka(topic string, msg []byte) error
Click to show internal directories.
Click to hide internal directories.