Documentation ¶
Overview ¶
Package datacommunicator ...
Index ¶
- Constants
- func CloseAll()
- func Decode(d []byte, a interface{}) error
- func Encode(d interface{}) ([]byte, error)
- func SetConfiguration(filePath string) error
- func TLS(cCert, cKey, caCert string) (*tls.Config, error)
- type KafkaF
- type KafkaPacket
- type MQBus
- type MQF
- type MsgProcess
- type Packet
- type RedisStreams
- type RedisStreamsPacket
- func (rp *RedisStreamsPacket) Accept(fn MsgProcess) error
- func (rp *RedisStreamsPacket) Close() error
- func (rp *RedisStreamsPacket) Distribute(data interface{}) error
- func (rp *RedisStreamsPacket) Get(pipe string, d interface{}) interface{}
- func (rp *RedisStreamsPacket) Ping() bool
- func (rp *RedisStreamsPacket) Read(fn MsgProcess) error
- func (rp *RedisStreamsPacket) Remove() error
Constants ¶
const (
	KAFKA        = "Kafka"        // KAFKA as Messaging Platform, please use this ID
	REDISSTREAMS = "RedisStreams" // REDISSTREAMS as Messaging Platform

	EVENTREADERGROUPNAME = "eventreaders_grp"
)
BrokerType defines the underlying MQ platform to be selected for the messages. The KAFKA and RedisStreams platforms are supported.
const (
	// DefaultTLSMinVersion is the default minimum version for TLS
	DefaultTLSMinVersion = tls.VersionTLS12
	// TimeoutErrMsg is the connection timeout error message
	TimeoutErrMsg string = " connection timed out"
)
Variables ¶
This section is empty.
Functions ¶
func CloseAll ¶
func CloseAll()
CloseAll disconnects the KAFKA connection. This API should be called when the client is completely closing the Kafka connection, both the Reader and Writer objects. It does not close a single channel subscription; for that, a separate API, "Remove", is defined.
func Decode ¶
func Decode(d []byte, a interface{}) error

Decode converts the byte stream back into data (DECODE). The data is masked as interface{} before being sent to the Consumer or Requester.
func SetConfiguration ¶
func SetConfiguration(filePath string) error

SetConfiguration reads the client-side configuration file. Any configuration data that the MQ user needs or should provide is taken directly from the user by asking them to fill a structure. These data details should be defined as part of the interface definition.
func TLS ¶
func TLS(cCert, cKey, caCert string) (*tls.Config, error)

TLS creates the TLS configuration object to be used by any broker for authentication and encryption. The certificate and key files are created from Java Keytool-generated JKS format files; please refer to the README for more information. In the case of Kafka, we generate the server certificate files in JKS format, do the same for clients, and then convert those files into PEM format.
Types ¶
type KafkaF ¶
type KafkaF struct {
	// KServersInfo defines the list of Kafka Server URI/Nodename:port. DEFAULT = [localhost:9092]
	KServersInfo []string `toml:"KServersInfo"`
	// KTimeout defines the timeout for Kafka Server connection. DEFAULT = 10 (in seconds)
	KTimeout int `toml:"KTimeout"`
	// KAFKACertFile defines the TLS Certificate File for KAFKA. No DEFAULT
	KAFKACertFile string `toml:"KAFKACertFile"`
	// KAFKAKeyFile defines the TLS Key File for KAFKA. No DEFAULT
	KAFKAKeyFile string `toml:"KAFKAKeyFile"`
	// KAFKACAFile defines the KAFKA Certification Authority. No DEFAULT
	KAFKACAFile string `toml:"KAFKACAFile"`
}
KafkaF defines the KAFKA server connection configurations. This structure will be extended once TLS authentication and message-encoding capabilities are added.
type KafkaPacket ¶
type KafkaPacket struct {
	// All common base function objects are defined in this object. This
	// object will support only Publishing and Subscriptions based on KAFKA
	// support. We use KAFKA 2.2.0 with Scala 2.12.
	Packet
	// DialerConn defines the member which can be used for a single
	// connection towards KAFKA
	DialerConn *kafka.Dialer
	// ServersInfo defines the list of KAFKA servers with port
	ServersInfo []string
	// contains filtered or unexported fields
}
KafkaPacket defines the KAFKA Message Object. It contains all the required kafka-go identifiers needed to maintain connections with the KAFKA servers. For publishing and consuming, two different connections toward Kafka are used, as we integrate Reader and Writer IO streams with the RPC call. Because of the way Kafka communication works, these IO objects are stored as the values of a map keyed by the channel name for which they were created. Apart from the Reader and Writer maps, KafkaPacket also maintains the Dialer object for the initial Kafka connection, as well as the name of the currently active server.
func (*KafkaPacket) Accept ¶
func (kp *KafkaPacket) Accept(fn MsgProcess) error
Accept defines the Consumer or Subscriber functionality for KAFKA. If a Reader object for the specified Pipe is not available, a new Reader object is created. From this function, the goroutine "Read" is invoked to handle the incoming messages.
func (*KafkaPacket) Close ¶
func (kp *KafkaPacket) Close() error
Close will terminate the write connection created for the topic. This API would check just the Writer map for the connection object.
func (*KafkaPacket) Distribute ¶
func (kp *KafkaPacket) Distribute(d interface{}) error
Distribute defines the Producer / Publisher role and functionality. A Writer is created for each Pipe that comes in for communication; if a Writer already exists, that connection is reused for the call. Before the message is published to the specified Pipe, it is converted into a byte stream using the "Encode" API. The message is encrypted in transit via TLS.
func (*KafkaPacket) Get ¶
func (kp *KafkaPacket) Get(pipe string, d interface{}) interface{}
Get - Not supported for now in Kafka from the Message Bus side, due to limitations in the quality of the Go library implementation. Will be taken up in the future.
func (*KafkaPacket) Read ¶
func (kp *KafkaPacket) Read(fn MsgProcess) error
Read accesses the KAFKA messages in an infinite loop. Callback-style access exists only in the "goka" library; it is not available in "kafka-go".
func (*KafkaPacket) Remove ¶
func (kp *KafkaPacket) Remove() error
Remove just removes the existing subscription. This API checks only the Reader map, since Distribute / Publish does not require a subscription.
type MQBus ¶
type MQBus interface {
	Distribute(data interface{}) error
	Accept(fn MsgProcess) error
	Get(pipe string, d interface{}) interface{}
	Remove() error
	Close() error
}
The MQBus interface defines the process interface functions (the only functions a user should call). These functions are implemented as part of the Packet struct.

Distribute - publishes messages into the specified Pipe (Topic / Subject)
Accept - consumes the incoming message if the component has subscribed to it
Get - initiates a blocking call to a remote process to get a response
Remove - removes an existing subscription
Close - disconnects the connection with the middleware
func Communicator ¶
Communicator selects the broker platform middleware and creates the corresponding communication object to send / receive messages. The broker type is stored as part of the connection object "Packet". TODO: We will look into a Kafka synchronous communication API to support a synchronous communication model in MessageBus.
type MQF ¶
type MQF struct {
	KafkaF       *KafkaF       `toml:"KAFKA"`
	RedisStreams *RedisStreams `toml:"RedisStreams"`
}
MQF defines the configuration-file content for KAFKA and RedisStreams in Go structure format. These configurations are embedded into the MQF structure for direct access to the data.
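Based on the toml tags on MQF, KafkaF, and RedisStreams above, a configuration file would look roughly like the following. The file paths and values are purely illustrative; consult the project's sample configuration for the real defaults.

```toml
[KAFKA]
KServersInfo  = ["localhost:9092"]
KTimeout      = 10
KAFKACertFile = "/path/to/kafka.crt"
KAFKAKeyFile  = "/path/to/kafka.key"
KAFKACAFile   = "/path/to/rootCA.crt"

[RedisStreams]
RedisServerAddress = "localhost"
RedisServerPort    = "6379"
RedisCertFile      = "/path/to/redis.crt"
RedisKeyFile       = "/path/to/redis.key"
RedisCAFile        = "/path/to/rootCA.crt"
```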
var MQ MQF
MQ creates both the MQF and KafkaPacket objects. MQF stores all configuration information, including the server URL, port, user credentials, and other configuration data intended for future expansion.
type MsgProcess ¶
type MsgProcess func(d interface{})
MsgProcess defines the functions for processing accepted messages. Any client who wants to accept and handle the events / notifications / messages, should implement this function as part of their procedure. That same function should be sent to MessageBus as callback for handling the incoming messages.
type Packet ¶
type Packet struct {
	// BrokerType defines the underlying MQ platform
	BrokerType string
}
Packet defines all the message-related information that a Producer or Consumer should know for message transactions. Both Producer and Consumer use this same structure for message transactions. BrokerType - refer to the constants defined above for possible values. DataResponder - refer to the HandleResponse type description.
type RedisStreams ¶
type RedisStreams struct {
	RedisServerAddress             string `toml:"RedisServerAddress"`
	RedisServerPort                string `toml:"RedisServerPort"`
	SentinalAddress                string `toml:"SentinalAddress"`
	RedisCertFile                  string `toml:"RedisCertFile"`
	RedisKeyFile                   string `toml:"RedisKeyFile"`
	RedisCAFile                    string `toml:"RedisCAFile"`
	RSAPrivateKeyPath              string `toml:"RSAPrivateKeyPath"`
	RedisInMemoryEncryptedPassword string `toml:"RedisInMemoryEncryptedPassword"`
	RSAPrivateKey                  []byte
	RedisInMemoryPassword          []byte
}
RedisStreams defines the Redis connection configurations.
type RedisStreamsPacket ¶
type RedisStreamsPacket struct { Packet // contains filtered or unexported fields }
RedisStreamsPacket defines the RedisStreams Message Packet Object. Apart from the base Packet, it contains the Redis connection object.
func (*RedisStreamsPacket) Accept ¶
func (rp *RedisStreamsPacket) Accept(fn MsgProcess) error
Accept implementation is yet to be added.
func (*RedisStreamsPacket) Close ¶
func (rp *RedisStreamsPacket) Close() error
Close implementation is yet to be added.
func (*RedisStreamsPacket) Distribute ¶
func (rp *RedisStreamsPacket) Distribute(data interface{}) error
Distribute defines the Producer / Publisher role and functionality. A Writer is created for each Pipe that comes in for communication; if a Writer already exists, that connection is reused for the call. Before the message is published to the specified Pipe, it is converted into a byte stream using the "Encode" API.
func (*RedisStreamsPacket) Get ¶
func (rp *RedisStreamsPacket) Get(pipe string, d interface{}) interface{}
Get - Not supported for now in RedisStreams from the Message Bus side, due to limitations.
func (*RedisStreamsPacket) Ping ¶
func (rp *RedisStreamsPacket) Ping() bool
Ping tests the DB connection using the PING command.
func (*RedisStreamsPacket) Read ¶
func (rp *RedisStreamsPacket) Read(fn MsgProcess) error
Read implementation is yet to be added.
func (*RedisStreamsPacket) Remove ¶
func (rp *RedisStreamsPacket) Remove() error
Remove implementation is yet to be added.