Documentation ¶
Index ¶
- Variables
- func AddResponseToDB(ticketID Ticket, response []byte, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec)
- func CliCtxFromKafkaMsg(msg KafkaMsg, cliCtx context.CLIContext) context.CLIContext
- func GetResponseFromDB(ticketID Ticket, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec) []byte
- func KafkaAdmin(kafkaPorts []string) sarama.ClusterAdmin
- func KafkaProducerDeliverMessage(msg KafkaMsg, topic string, producer sarama.SyncProducer, cdc *codec.Codec) error
- func NewConsumer(kafkaPorts []string) sarama.Consumer
- func NewProducer(kafkaPorts []string) sarama.SyncProducer
- func PartitionConsumers(consumer sarama.Consumer, topic string) sarama.PartitionConsumer
- func QueryDB(cdc *codec.Codec, r *mux.Router, kafkaDB *dbm.GoLevelDB) http.HandlerFunc
- func RegisterCodec(cdc *codec.Codec)
- func SendToKafka(msg KafkaMsg, kafkaState KafkaState, cdc *codec.Codec) []byte
- func SetTicketIDtoDB(ticketID Ticket, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec, msg []byte)
- func TopicsInit(admin sarama.ClusterAdmin, topic string)
- type KafkaCliCtx
- type KafkaMsg
- type KafkaState
- type Ticket
- type TicketIDResponse
Constants ¶
This section is empty.
Variables ¶
var DefaultCLIHome = os.ExpandEnv("$HOME/.kafka")
DefaultCLIHome : is the home path
var ModuleCdc *codec.Codec
module codec
var SleepRoutine = time.Duration(2500000000)
SleepRoutine : the time the kafka msgs are to be taken in
var SleepTimer = time.Duration(1000000000)
SleepTimer : the time the kafka msgs are to be taken in
var TicketIDAtomicCounter int64
TicketIDAtomicCounter is a counter that adds when each time a function is called
var Topics = []string{
"Topic",
}
Topics : is list of topics
Functions ¶
func AddResponseToDB ¶
AddResponseToDB : Updates response to DB
func CliCtxFromKafkaMsg ¶
func CliCtxFromKafkaMsg(msg KafkaMsg, cliCtx context.CLIContext) context.CLIContext
CliCtxFromKafkaMsg : sets the txctx and clictx again to consume
func GetResponseFromDB ¶
GetResponseFromDB : gives the response from DB
func KafkaAdmin ¶
func KafkaAdmin(kafkaPorts []string) sarama.ClusterAdmin
KafkaAdmin : is admin to create topics
func KafkaProducerDeliverMessage ¶
func KafkaProducerDeliverMessage(msg KafkaMsg, topic string, producer sarama.SyncProducer, cdc *codec.Codec) error
KafkaProducerDeliverMessage : delivers messages to kafka
func NewConsumer ¶
NewConsumer : is a consumer which is needed to create child consumers to consume topics
func NewProducer ¶
func NewProducer(kafkaPorts []string) sarama.SyncProducer
NewProducer is a producer to send messages to kafka
func PartitionConsumers ¶
func PartitionConsumers(consumer sarama.Consumer, topic string) sarama.PartitionConsumer
PartitionConsumers : is a child consumer
func SendToKafka ¶
func SendToKafka(msg KafkaMsg, kafkaState KafkaState, cdc *codec.Codec) []byte
SendToKafka : handles sending message to kafka
func SetTicketIDtoDB ¶
SetTicketIDtoDB : initiates ticketid in Database
func TopicsInit ¶
func TopicsInit(admin sarama.ClusterAdmin, topic string)
TopicsInit : is needed to initialise topics
Types ¶
type KafkaCliCtx ¶
type KafkaCliCtx struct { OutputFormat string Height int64 NodeURI string From string TrustNode bool UseLedger bool BroadcastMode string VerifierHome string Simulate bool GenerateOnly bool FromAddress sdk.AccAddress FromName string Indent bool SkipConfirm bool }
KafkaCliCtx : client tx without codec
type KafkaMsg ¶
type KafkaMsg struct { Msg sdk.Msg `json:"msg"` TicketID Ticket `json:"ticketID"` BaseRequest rest.BaseReq `json:"base_req"` KafkaCli KafkaCliCtx `json:"kafkaCliCtx"` Password string `json:"password"` Mode string `json:"mode"` }
KafkaMsg : is a store that can be stored in kafka queues
func KafkaTopicConsumer ¶
func KafkaTopicConsumer(topic string, consumers map[string]sarama.PartitionConsumer, cdc *codec.Codec) KafkaMsg
KafkaTopicConsumer : Takes a consumer and makes it consume a topic message at a time
type KafkaState ¶
type KafkaState struct { KafkaDB *dbm.GoLevelDB Admin sarama.ClusterAdmin Consumer sarama.Consumer Consumers map[string]sarama.PartitionConsumer Producer sarama.SyncProducer Topics []string }
KafkaState : is a struct showing the state of kafka
func NewKafkaState ¶
func NewKafkaState(kafkaPorts []string) KafkaState
NewKafkaState : returns a kafka state
type Ticket ¶
type Ticket string
Ticket : is a type that implements string
func TicketIDGenerator ¶
TicketIDGenerator is a random unique ticket ID generator, output is a string
type TicketIDResponse ¶
type TicketIDResponse struct {
TicketID Ticket `json:"TicketID" valid:"required~TicketID is mandatory,length(20)~RelayerAddress length should be 20" `
}
TicketIDResponse : is a json structure to send TicketID to user