Documentation ¶
Overview ¶
Package datacommunicator ...
Index ¶
- Constants
- func Decode(d []byte, a interface{}) error
- func Encode(d interface{}) ([]byte, error)
- func KafkaConnect(kp *KafkaPacket, messageQueueConfigPath string) error
- func SetConfiguration(filePath string) error
- func TLS(cCert, cKey, caCert string) (*tls.Config, error)
- type KafkaF
- type KafkaPacket
- func (kp *KafkaPacket) Accept(pipe string, fn MsgProcess) error
- func (kp *KafkaPacket) Close()
- func (kp *KafkaPacket) Distribute(pipe string, d interface{}) error
- func (kp *KafkaPacket) Get(pipe string, d interface{}) interface{}
- func (kp *KafkaPacket) Read(p string, fn MsgProcess) error
- func (kp *KafkaPacket) Remove(pipe string) error
- type MQBus
- type MQF
- type MsgProcess
- type Packet
Constants ¶
const (
	KAFKA = iota // KAFKA as Messaging Platform; please use this ID
)
BrokerType defines the underlying MQ platform to be selected for the messages. KAFKA is the only platform supported as part of odimra phase 1.
Variables ¶
This section is empty.
Functions ¶
func Decode ¶
func Decode(d []byte, a interface{}) error

Decode converts the byte stream back into Data (DECODE). The data is masked as an interface{} before being sent to the Consumer or Requester.

func Encode ¶

func Encode(d interface{}) ([]byte, error)

Encode converts the given data into a byte stream (ENCODE); the counterpart of Decode.
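The documentation does not show the wire format; below is a minimal sketch of an Encode/Decode pair matching the signatures above, assuming JSON as the serialization format (the real package may use a different codec):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Encode converts the given data into a byte stream.
func Encode(d interface{}) ([]byte, error) {
	return json.Marshal(d)
}

// Decode converts the byte stream back into data. The second argument
// must be a pointer so the caller receives the decoded value.
func Decode(d []byte, a interface{}) error {
	return json.Unmarshal(d, a)
}

func main() {
	b, _ := Encode(map[string]string{"event": "ResourceAdded"})
	var out map[string]string
	_ = Decode(b, &out)
	fmt.Println(out["event"])
}
```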
func KafkaConnect ¶
func KafkaConnect(kp *KafkaPacket, messageQueueConfigPath string) error
KafkaConnect defines the connection procedure for the KAFKA server. For now, we take only one server as input. The TLS configuration for the client is built as a TLS object and passed to the server with the connection request. A common Dialer object is used for both Reader and Writer objects. These objects are updated when a request comes in for a specific Pipe: that Pipe name and its Connection object are stored as a map pair.
func SetConfiguration ¶
func SetConfiguration(filePath string) error

SetConfiguration reads the client-side configuration file. Any configuration data that needs to be provided by the MQ user is taken directly from the user, who is asked to fill a structure. These data details should be defined as part of the interface definition.
func TLS ¶
func TLS(cCert, cKey, caCert string) (*tls.Config, error)

TLS creates the TLS configuration object to be used by any Broker for authentication and encryption. The Certificate and Key files are created from Java Keytool-generated JKS format files; please look into the README for more information. In the case of Kafka, we generate the server certificate files in JKS format, do the same for clients, and then convert those files into PEM format.
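A function with this signature can be sketched with the standard library alone. This mirrors what the TLS helper is described to do once the JKS files have been converted to PEM; details such as the MinVersion floor are assumptions:

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
)

// TLS builds a *tls.Config from PEM-encoded client certificate, key and
// CA files (converted from the Java Keytool JKS originals).
func TLS(cCert, cKey, caCert string) (*tls.Config, error) {
	cert, err := tls.LoadX509KeyPair(cCert, cKey)
	if err != nil {
		return nil, err
	}
	ca, err := os.ReadFile(caCert)
	if err != nil {
		return nil, err
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(ca) {
		return nil, fmt.Errorf("failed to parse CA certificate %q", caCert)
	}
	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      pool,
		MinVersion:   tls.VersionTLS12, // assumption: enforce a modern TLS floor
	}, nil
}

func main() {
	if _, err := TLS("client.pem", "client.key", "rootCA.pem"); err != nil {
		fmt.Println("error:", err)
	}
}
```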
Types ¶
type KafkaF ¶
type KafkaF struct {
	// KServer defines the Kafka Server URI/Nodename. DEFAULT = localhost
	KServer string `toml:"KServers"`
	// KLport defines the Server listening port for Kafka. DEFAULT = 9092
	KLport int `toml:"KLPort"`
	// KTimeout defines the timeout for Kafka Server connection.
	// DEFAULT = 10 (in seconds)
	KTimeout int `toml:"KTimeout"`
	// KAFKACertFile defines the TLS Certificate File for KAFKA. No DEFAULT
	KAFKACertFile string `toml:"KAFKACertFile"`
	// KAFKAKeyFile defines the TLS Key File for KAFKA. No DEFAULT
	KAFKAKeyFile string `toml:"KAFKAKeyFile"`
	// KAFKACAFile defines the KAFKA Certification Authority. No DEFAULT
	KAFKACAFile string `toml:"KAFKACAFile"`
}
KafkaF defines the KAFKA server connection configurations. This structure will be extended once we add TLS authentication and message-encoding capability.
type KafkaPacket ¶
type KafkaPacket struct {
	// All common base function objects are defined in this object. This
	// object will support only Publishing and Subscriptions based on KAFKA
	// support. We use KAFKA 2.2.0 with Scala 2.12.
	Packet
	// Readers maintains a mapping between the Kafka Reader pointer
	// and the Topic which is handled in that reader.
	Readers map[string]*kafka.Reader
	// Writers defines the mapping between the KAFKA Writer pointer
	// reference and the Topic which is handled in that Writer.
	Writers map[string]*kafka.Writer
	// DialerConn defines the member which can be used for a single
	// connection towards KAFKA.
	DialerConn *kafka.Dialer
	// Server defines the KAFKA server with port.
	Server string
}
KafkaPacket defines the KAFKA message object. It contains all the KAFKA-GO related identifiers required to maintain connections with KAFKA servers. For publishing and consuming, two different connections towards Kafka are used, since the Reader and Writer IO streams are integrated with the RPC call. Because of the way Kafka communication works, these IO objects are stored as the values of a map, keyed by the Channel name for which they were created. Apart from the Reader and Writer maps, KafkaPacket also maintains the Dialer object for the initial Kafka connection, as well as the currently active server name.
func (*KafkaPacket) Accept ¶
func (kp *KafkaPacket) Accept(pipe string, fn MsgProcess) error
Accept defines the Consumer or Subscriber functionality for KAFKA. If a Reader object for the specified Pipe is not available, a new Reader object is created. From this function, the goroutine "Read" is invoked to handle the incoming messages.
func (*KafkaPacket) Close ¶
func (kp *KafkaPacket) Close()
Close will disconnect the KAFKA connection. This API should be called when the client is completely closing the Kafka connection, both Reader and Writer objects. We don't close just one channel subscription using this API; for that, a different API, "Remove", is defined.
func (*KafkaPacket) Distribute ¶
func (kp *KafkaPacket) Distribute(pipe string, d interface{}) error
Distribute defines the Producer / Publisher role and functionality. A Writer is created for each Pipe that comes in for communication. If a Writer already exists, that connection is used for the call. Before the message is published on the specified Pipe, it is converted into a byte stream using the "Encode" API. The message is encrypted in transit via TLS.
func (*KafkaPacket) Get ¶
func (kp *KafkaPacket) Get(pipe string, d interface{}) interface{}
Get - Not supported in Kafka for now from the Message Bus side, due to limitations in the quality of the Go library implementation. Will be taken up in the future.
func (*KafkaPacket) Read ¶
func (kp *KafkaPacket) Read(p string, fn MsgProcess) error
Read accesses the KAFKA messages in an infinite loop. Callback-based access exists only in the "goka" library; it is not available in "kafka-go".
func (*KafkaPacket) Remove ¶
func (kp *KafkaPacket) Remove(pipe string) error
Remove just removes an existing subscription. This API checks only the Reader map, since no subscription is needed to Distribute / Publish messages.
type MQBus ¶
type MQBus interface {
	Distribute(pipe string, data interface{}) error
	Accept(pipe string, fn MsgProcess) error
	Get(pipe string, d interface{}) interface{}
	Remove(pipe string) error
	Close()
}
MQBus defines the process interface functions (the only functions a user should call). These functions are implemented as part of the Packet struct.

Distribute - Publish messages into the specified Pipe (Topic / Subject)
Accept - Consume the incoming message if subscribed by that component
Get - Initiate a blocking call to a remote process to get a response
Remove - Remove an existing subscription
Close - Disconnect the connection with the Middleware
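To illustrate the contract, here is a hypothetical in-memory stand-in that satisfies MQBus. It is not part of the package (the real implementation is KafkaPacket); it only shows how a client drives the interface:

```go
package main

import "fmt"

// MsgProcess as declared by the package.
type MsgProcess func(d interface{})

// MQBus as declared by the package.
type MQBus interface {
	Distribute(pipe string, data interface{}) error
	Accept(pipe string, fn MsgProcess) error
	Get(pipe string, d interface{}) interface{}
	Remove(pipe string) error
	Close()
}

// memBus is a toy in-memory bus used only for illustration.
type memBus struct {
	subs map[string][]MsgProcess
}

func newMemBus() *memBus { return &memBus{subs: make(map[string][]MsgProcess)} }

func (m *memBus) Distribute(pipe string, data interface{}) error {
	for _, fn := range m.subs[pipe] {
		fn(data) // deliver synchronously to every subscriber
	}
	return nil
}

func (m *memBus) Accept(pipe string, fn MsgProcess) error {
	m.subs[pipe] = append(m.subs[pipe], fn)
	return nil
}

func (m *memBus) Get(pipe string, d interface{}) interface{} { return nil } // not supported, as in Kafka
func (m *memBus) Remove(pipe string) error                   { delete(m.subs, pipe); return nil }
func (m *memBus) Close()                                     { m.subs = nil }

func main() {
	var bus MQBus = newMemBus()
	bus.Accept("events", func(d interface{}) { fmt.Println("got:", d) })
	bus.Distribute("events", "alert")
	bus.Close()
}
```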
func Communicator ¶
Communicator selects the Broker platform Middleware and creates the corresponding communication object to send / receive the messages. The Broker type is stored as part of the Connection object "Packet". TODO: We will look into the Kafka synchronous communication API to support a Sync Communication Model in MessageBus.
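The selection described above amounts to a switch on the BrokerType constant. A sketch of that pattern (`newBroker` is a hypothetical stand-in; the real Communicator also takes the configuration file path and returns an MQBus):

```go
package main

import (
	"errors"
	"fmt"
)

// BrokerType constants, as declared by the package.
const (
	KAFKA = iota
)

// newBroker maps a BrokerType constant to a middleware selection,
// illustrating the Communicator-style factory.
func newBroker(brokerType int) (string, error) {
	switch brokerType {
	case KAFKA:
		return "kafka", nil // would create and connect a KafkaPacket
	default:
		return "", errors.New("unsupported broker type")
	}
}

func main() {
	b, err := newBroker(KAFKA)
	fmt.Println(b, err)
}
```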
type MQF ¶
type MQF struct {
	KafkaF `toml:"KAFKA"`
}
MQF defines the configuration file content for KAFKA in Golang structure format. These configurations are embedded into the MQF structure for direct access to the data.
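Given the toml tags on MQF and KafkaF, a matching configuration file might look like the following (all values, including the file paths, are illustrative):

```toml
[KAFKA]
KServers      = "localhost"
KLPort        = 9092
KTimeout      = 10
KAFKACertFile = "/etc/odimra/certs/kafka_client.pem"
KAFKAKeyFile  = "/etc/odimra/certs/kafka_client.key"
KAFKACAFile   = "/etc/odimra/certs/rootCA.pem"
```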
type MsgProcess ¶
type MsgProcess func(d interface{})
MsgProcess defines the function signature for processing accepted messages. Any client that wants to accept and handle events / notifications / messages should implement a function of this type as part of their procedure, and pass that function to MessageBus as the callback for handling incoming messages.
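A minimal client-side callback sketch; `handleEvent` is a hypothetical handler a client would pass to Accept:

```go
package main

import "fmt"

// MsgProcess as declared by the package.
type MsgProcess func(d interface{})

// handleEvent is a hypothetical client callback. The message bus would
// invoke it once per incoming message on the subscribed Pipe.
func handleEvent(d interface{}) {
	fmt.Println("received:", d)
}

func main() {
	var fn MsgProcess = handleEvent
	fn("ResourceAdded") // simulate the bus delivering one message
}
```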
type Packet ¶
type Packet struct {
	// BrokerType defines the underlying MQ platform
	BrokerType int
}
Packet defines all the message-related information that a Producer or Consumer should know for message transactions. Both Producer and Consumer use this same structure for message transactions. BrokerType - refer to the Constants defined above for possible values. DataResponder - refer to the HandleResponse type description.