Documentation ¶
Index ¶
- Constants
- Variables
- func AddStream(js nats.JetStreamContext, streamConfig *nats.StreamConfig, ...) error
- func GetStreamSubjects(streamName string) []string
- func ParseAndFillStreamWiseAndConsumerWiseConfigMaps()
- type ConfigJson
- type LoggerFunc
- type NatsClient
- type NatsClientConfig
- type NatsConsumerConfig
- type NatsStreamConfig
- type NatsTopic
- type PubSubClientService
- type PubSubClientServiceImpl
- func (impl PubSubClientServiceImpl) Log(loggerFunc LoggerFunc, topic string, subMsg model.PubSubMsg)
- func (impl PubSubClientServiceImpl) Publish(topic string, msg string) error
- func (impl PubSubClientServiceImpl) Subscribe(topic string, callback func(msg *model.PubSubMsg), loggerFunc LoggerFunc, ...) error
- func (impl PubSubClientServiceImpl) TryCatchCallBack(msg *nats.Msg, callback func(msg *model.PubSubMsg), loggerFunc LoggerFunc, ...)
- type StreamConfig
- type ValidateMsg
Constants ¶
View Source
const ( CI_RUNNER_STREAM string = "CI-RUNNER" ORCHESTRATOR_STREAM string = "ORCHESTRATOR" KUBEWATCH_STREAM string = "KUBEWATCH" GIT_SENSOR_STREAM string = "GIT-SENSOR" IMAGE_SCANNER_STREAM string = "IMAGE-SCANNER" BULK_APPSTORE_DEPLOY_TOPIC string = "APP-STORE.BULK-DEPLOY" BULK_APPSTORE_DEPLOY_GROUP string = "APP-STORE-BULK-DEPLOY-GROUP-1" BULK_APPSTORE_DEPLOY_DURABLE string = "APP-STORE-BULK-DEPLOY-DURABLE-1" CD_STAGE_COMPLETE_TOPIC string = "CD-STAGE-COMPLETE" CD_COMPLETE_GROUP string = "CD-COMPLETE_GROUP-1" CD_COMPLETE_DURABLE string = "CD-COMPLETE_DURABLE-1" BULK_DEPLOY_TOPIC string = "CD.BULK" BULK_HIBERNATE_TOPIC string = "CD.BULK-HIBERNATE" BULK_DEPLOY_GROUP string = "CD.BULK.GROUP-1" BULK_HIBERNATE_GROUP string = "CD.BULK-HIBERNATE.GROUP-1" BULK_DEPLOY_DURABLE string = "CD-BULK-DURABLE-1" BULK_HIBERNATE_DURABLE string = "CD-BULK-HIBERNATE-DURABLE-1" CI_COMPLETE_TOPIC string = "CI-COMPLETE" CI_COMPLETE_GROUP string = "CI-COMPLETE_GROUP-1" CI_COMPLETE_DURABLE string = "CI-COMPLETE_DURABLE-1" APPLICATION_STATUS_UPDATE_TOPIC string = "APPLICATION_STATUS_UPDATE" APPLICATION_STATUS_UPDATE_GROUP string = "APPLICATION_STATUS_UPDATE_GROUP-1" APPLICATION_STATUS_UPDATE_DURABLE string = "APPLICATION_STATUS_UPDATE_DURABLE-1" APPLICATION_STATUS_DELETE_TOPIC string = "APPLICATION_STATUS_DELETE" APPLICATION_STATUS_DELETE_GROUP string = "APPLICATION_STATUS_DELETE_GROUP-1" APPLICATION_STATUS_DELETE_DURABLE string = "APPLICATION_STATUS_DELETE_DURABLE-1" CRON_EVENTS string = "CRON_EVENTS" CRON_EVENTS_GROUP string = "CRON_EVENTS_GROUP-2" CRON_EVENTS_DURABLE string = "CRON_EVENTS_DURABLE-2" WORKFLOW_STATUS_UPDATE_TOPIC string = "WORKFLOW_STATUS_UPDATE" WORKFLOW_STATUS_UPDATE_GROUP string = "WORKFLOW_STATUS_UPDATE_GROUP-1" WORKFLOW_STATUS_UPDATE_DURABLE string = "WORKFLOW_STATUS_UPDATE_DURABLE-1" CD_WORKFLOW_STATUS_UPDATE string = "CD_WORKFLOW_STATUS_UPDATE" CD_WORKFLOW_STATUS_UPDATE_GROUP string = "CD_WORKFLOW_STATUS_UPDATE_GROUP-1" CD_WORKFLOW_STATUS_UPDATE_DURABLE string = "CD_WORKFLOW_STATUS_UPDATE_DURABLE-1" NEW_CI_MATERIAL_TOPIC string = "NEW-CI-MATERIAL" NEW_CI_MATERIAL_TOPIC_GROUP string = "NEW-CI-MATERIAL_GROUP-1" NEW_CI_MATERIAL_TOPIC_DURABLE string = "NEW-CI-MATERIAL_DURABLE-1" CD_SUCCESS string = "CD.TRIGGER" CD_TRIGGER_GROUP string = "CD.TRIGGER_GRP1" CD_TRIGGER_DURABLE string = "CD-TRIGGER-DURABLE1" WEBHOOK_EVENT_TOPIC string = "WEBHOOK_EVENT" WEBHOOK_EVENT_GROUP string = "WEBHOOK_EVENT_GRP" WEBHOOK_EVENT_DURABLE string = "WEBHOOK_EVENT_DURABLE" DEVTRON_TEST_TOPIC string = "Test_Topic" DEVTRON_TEST_STREAM string = "Devtron_Test_Stream" DEVTRON_TEST_QUEUE string = "Test_Topic_Queue" DEVTRON_TEST_CONSUMER string = "Test_Topic_Consumer" TOPIC_CI_SCAN string = "CI-SCAN" TOPIC_CI_SCAN_GRP string = "CI-SCAN-GRP-1" TOPIC_CI_SCAN_DURABLE string = "CI-SCAN-DURABLE-1" ARGO_PIPELINE_STATUS_UPDATE_TOPIC string = "ARGO_PIPELINE_STATUS_UPDATE" ARGO_PIPELINE_STATUS_UPDATE_GROUP string = "ARGO_PIPELINE_STATUS_UPDATE_GROUP-1" ARGO_PIPELINE_STATUS_UPDATE_DURABLE string = "ARGO_PIPELINE_STATUS_UPDATE_DURABLE-1" CD_BULK_DEPLOY_TRIGGER_TOPIC string = "CD-BULK-DEPLOY-TRIGGER" CD_BULK_DEPLOY_TRIGGER_GROUP string = "CD-BULK-DEPLOY-TRIGGER-GROUP-1" CD_BULK_DEPLOY_TRIGGER_DURABLE string = "CD-BULK-DEPLOY-TRIGGER-DURABLE-1" HELM_CHART_INSTALL_STATUS_TOPIC string = "HELM-CHART-INSTALL-STATUS-TOPIC" HELM_CHART_INSTALL_STATUS_GROUP string = "HELM-CHART-INSTALL-STATUS-GROUP" HELM_CHART_INSTALL_STATUS_DURABLE string = "HELM-CHART-INSTALL-STATUS-DURABLE" DEVTRON_CHART_INSTALL_TOPIC string = "DEVTRON-CHART-INSTALL-TOPIC" DEVTRON_CHART_INSTALL_GROUP string = "DEVTRON-CHART-INSTALL-GROUP" DEVTRON_CHART_INSTALL_DURABLE string = "DEVTRON-CHART-INSTALL-DURABLE" PANIC_ON_PROCESSING_TOPIC string = "PANIC-ON-PROCESSING-TOPIC" PANIC_ON_PROCESSING_GROUP string = "PANIC-ON-PROCESSING-GROUP" PANIC_ON_PROCESSING_DURABLE string = "PANIC-ON-PROCESSING-DURABLE" CD_STAGE_SUCCESS_EVENT_TOPIC string = "CD-STAGE-SUCCESS-EVENT" CD_STAGE_SUCCESS_EVENT_GROUP string = "CD-STAGE-SUCCESS-EVENT-GROUP" CD_STAGE_SUCCESS_EVENT_DURABLE string = "CD-STAGE-SUCCESS-EVENT-DURABLE" )
View Source
const NATS_MSG_LOG_PREFIX = "NATS_LOG"
Variables ¶
View Source
var NatsConsumerWiseConfigMapping = map[string]NatsConsumerConfig{ ARGO_PIPELINE_STATUS_UPDATE_DURABLE: {}, TOPIC_CI_SCAN_DURABLE: {}, NEW_CI_MATERIAL_TOPIC_DURABLE: {}, CD_WORKFLOW_STATUS_UPDATE_DURABLE: {}, WORKFLOW_STATUS_UPDATE_DURABLE: {}, CRON_EVENTS_DURABLE: {}, APPLICATION_STATUS_UPDATE_DURABLE: {}, APPLICATION_STATUS_DELETE_DURABLE: {}, CD_COMPLETE_DURABLE: {}, CI_COMPLETE_DURABLE: {}, WEBHOOK_EVENT_DURABLE: {}, CD_TRIGGER_DURABLE: {}, BULK_HIBERNATE_DURABLE: {}, BULK_DEPLOY_DURABLE: {}, BULK_APPSTORE_DEPLOY_DURABLE: {}, CD_BULK_DEPLOY_TRIGGER_DURABLE: {}, HELM_CHART_INSTALL_STATUS_DURABLE: {}, DEVTRON_CHART_INSTALL_DURABLE: {}, PANIC_ON_PROCESSING_DURABLE: {}, DEVTRON_TEST_CONSUMER: {}, }
View Source
var NatsStreamWiseConfigMapping = map[string]NatsStreamConfig{ ORCHESTRATOR_STREAM: {}, CI_RUNNER_STREAM: {}, KUBEWATCH_STREAM: {}, GIT_SENSOR_STREAM: {}, IMAGE_SCANNER_STREAM: {}, DEVTRON_TEST_STREAM: {}, }
Functions ¶
func GetStreamSubjects ¶
func ParseAndFillStreamWiseAndConsumerWiseConfigMaps ¶
func ParseAndFillStreamWiseAndConsumerWiseConfigMaps()
Types ¶
type ConfigJson ¶
type ConfigJson struct { // StreamConfigJson is a json string of map[string]NatsStreamConfig StreamConfigJson string `env:"STREAM_CONFIG_JSON"` // ConsumerConfigJson is a json string of map[string]NatsConsumerConfig // eg: "{\"ARGO_PIPELINE_STATUS_UPDATE_DURABLE-1\" : \"{\"natsMsgProcessingBatchSize\" : 3, \"natsMsgBufferSize\" : 3, \"ackWaitInSecs\": 300}\"}" ConsumerConfigJson string `env:"CONSUMER_CONFIG_JSON"` }
type LoggerFunc ¶ added in v0.0.10
LoggerFunc is used to log the message before passing to callback function. it expects logg message and key value pairs to be returned. if keysAndValues is empty, it will log whole model.PubSubMsg
type NatsClient ¶
type NatsClient struct { JetStrCtxt nats.JetStreamContext Conn *nats.Conn // contains filtered or unexported fields }
func NewNatsClient ¶
func NewNatsClient(logger *zap.SugaredLogger) (*NatsClient, error)
type NatsClientConfig ¶
type NatsClientConfig struct { NatsServerHost string `env:"NATS_SERVER_HOST" envDefault:"nats://devtron-nats.devtroncd:4222"` // consumer wise // NatsMsgProcessingBatchSize is the number of messages that will be processed in one go NatsMsgProcessingBatchSize int `env:"NATS_MSG_PROCESSING_BATCH_SIZE" envDefault:"1"` // NatsMsgBufferSize is the number of messages that will be buffered in memory (channel size) // it is recommended to set this value equal to NatsMsgProcessingBatchSize as we want to process maximum messages in the buffer in one go. // Note: if NatsMsgBufferSize is less than NatsMsgProcessingBatchSize // then the wait time for the unprocessed messages in the buffer will be high.(total process time = life-time in buffer + processing time) // NatsMsgBufferSize can be configured independently of NatsMsgProcessingBatchSize if needed by setting its value to positive value in env. // if NatsMsgBufferSize set to a non-positive value then it will take the value of NatsMsgProcessingBatchSize. // Note: always get this value by calling GetNatsMsgBufferSize method NatsMsgBufferSize int `env:"NATS_MSG_BUFFER_SIZE" envDefault:"-1"` NatsMsgMaxAge int `env:"NATS_MSG_MAX_AGE" envDefault:"86400"` NatsMsgAckWaitInSecs int `env:"NATS_MSG_ACK_WAIT_IN_SECS" envDefault:"120"` }
func (NatsClientConfig) GetDefaultNatsConsumerConfig ¶ added in v0.0.10
func (ncc NatsClientConfig) GetDefaultNatsConsumerConfig() NatsConsumerConfig
func (NatsClientConfig) GetDefaultNatsStreamConfig ¶ added in v0.0.10
func (ncc NatsClientConfig) GetDefaultNatsStreamConfig() NatsStreamConfig
func (NatsClientConfig) GetNatsMsgBufferSize ¶ added in v0.0.10
func (ncc NatsClientConfig) GetNatsMsgBufferSize() int
type NatsConsumerConfig ¶
type NatsConsumerConfig struct { // NatsMsgProcessingBatchSize is the number of messages that will be processed in one go NatsMsgProcessingBatchSize int `json:"natsMsgProcessingBatchSize"` // NatsMsgBufferSize is the number of messages that will be buffered in memory (channel size). // Note: always get this value by calling GetNatsMsgBufferSize method NatsMsgBufferSize int `json:"natsMsgBufferSize"` // AckWaitInSecs is the time in seconds for which the message can be in unacknowledged state AckWaitInSecs int `json:"ackWaitInSecs"` }
func (NatsConsumerConfig) GetNatsMsgBufferSize ¶ added in v0.0.10
func (consumerConf NatsConsumerConfig) GetNatsMsgBufferSize() int
type NatsStreamConfig ¶
type NatsStreamConfig struct {
StreamConfig StreamConfig `json:"streamConfig"`
}
type NatsTopic ¶
type NatsTopic struct {
// contains filtered or unexported fields
}
func GetNatsTopic ¶
type PubSubClientService ¶
type PubSubClientService interface { Publish(topic string, msg string) error Subscribe(topic string, callback func(msg *model.PubSubMsg), loggerFunc LoggerFunc, validations ...ValidateMsg) error }
type PubSubClientServiceImpl ¶
type PubSubClientServiceImpl struct { Logger *zap.SugaredLogger NatsClient *NatsClient // contains filtered or unexported fields }
func NewPubSubClientServiceImpl ¶
func NewPubSubClientServiceImpl(logger *zap.SugaredLogger) *PubSubClientServiceImpl
func (PubSubClientServiceImpl) Log ¶ added in v0.0.10
func (impl PubSubClientServiceImpl) Log(loggerFunc LoggerFunc, topic string, subMsg model.PubSubMsg)
func (PubSubClientServiceImpl) Publish ¶
func (impl PubSubClientServiceImpl) Publish(topic string, msg string) error
func (PubSubClientServiceImpl) Subscribe ¶
func (impl PubSubClientServiceImpl) Subscribe(topic string, callback func(msg *model.PubSubMsg), loggerFunc LoggerFunc, validations ...ValidateMsg) error
Subscribe method is used to subscribe to the given topic(+required), this creates blocking process to continuously fetch messages from nats server published on this topic. invokes callback(+required) func for each message received. loggerFunc(+optional) is invoked before passing the message to the callback function. validations(+optional) methods were called before passing the message to the callback func.
func (PubSubClientServiceImpl) TryCatchCallBack ¶ added in v0.0.8
func (impl PubSubClientServiceImpl) TryCatchCallBack(msg *nats.Msg, callback func(msg *model.PubSubMsg), loggerFunc LoggerFunc, validations ...ValidateMsg)
TryCatchCallBack is a fail-safe method to use callback function
type StreamConfig ¶
type ValidateMsg ¶ added in v0.0.10
Click to show internal directories.
Click to hide internal directories.