pubsub_lib

package
v0.13.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 9, 2024 License: Apache-2.0 Imports: 12 Imported by: 20

Documentation

Index

Constants

View Source
// Identifiers for streams, topics, consumer groups and durable consumers.
//
// NOTE(review): the role of each constant is inferred from its name suffix
// (_STREAM, _TOPIC, _GROUP/_GRP, _DURABLE) and from this package's use of
// NATS JetStream; the code that consumes these values is not visible here,
// so confirm before relying on it. Presumably the values are wire-level
// names, in which case changing one would change which stream, subject or
// consumer is addressed.
//
// Some names do not follow the suffix convention: CRON_EVENTS,
// CD_WORKFLOW_STATUS_UPDATE and CD_SUCCESS (whose value is "CD.TRIGGER")
// carry no _TOPIC suffix, and the TOPIC_CI_SCAN family uses TOPIC_ as a
// prefix. The values also mix '-', '_' and '.' as separators, and the
// DEVTRON_CHART_GITOPS_PRIORITY_INSTALL_* values spell the words in the order
// PRIORITY-GITOPS. These are left untouched because renaming or re-spelling
// them would not be backward-compatible.
const (
	CI_RUNNER_STREAM                              string = "CI-RUNNER"
	ORCHESTRATOR_STREAM                           string = "ORCHESTRATOR"
	KUBEWATCH_STREAM                              string = "KUBEWATCH"
	GIT_SENSOR_STREAM                             string = "GIT-SENSOR"
	IMAGE_SCANNER_STREAM                          string = "IMAGE-SCANNER"
	BULK_APPSTORE_DEPLOY_TOPIC                    string = "APP-STORE.BULK-DEPLOY"
	BULK_APPSTORE_DEPLOY_GROUP                    string = "APP-STORE-BULK-DEPLOY-GROUP-1"
	BULK_APPSTORE_DEPLOY_DURABLE                  string = "APP-STORE-BULK-DEPLOY-DURABLE-1"
	CD_STAGE_COMPLETE_TOPIC                       string = "CD-STAGE-COMPLETE"
	CD_COMPLETE_GROUP                             string = "CD-COMPLETE_GROUP-1"
	CD_COMPLETE_DURABLE                           string = "CD-COMPLETE_DURABLE-1"
	BULK_DEPLOY_TOPIC                             string = "CD.BULK"
	BULK_HIBERNATE_TOPIC                          string = "CD.BULK-HIBERNATE"
	BULK_DEPLOY_GROUP                             string = "CD.BULK.GROUP-1"
	BULK_HIBERNATE_GROUP                          string = "CD.BULK-HIBERNATE.GROUP-1"
	BULK_DEPLOY_DURABLE                           string = "CD-BULK-DURABLE-1"
	BULK_HIBERNATE_DURABLE                        string = "CD-BULK-HIBERNATE-DURABLE-1"
	CI_COMPLETE_TOPIC                             string = "CI-COMPLETE"
	CI_COMPLETE_GROUP                             string = "CI-COMPLETE_GROUP-1"
	CI_COMPLETE_DURABLE                           string = "CI-COMPLETE_DURABLE-1"
	IMAGE_SCANNING_SUCCESS_TOPIC                  string = "IMAGE-SCANNING-SUCCESS"
	IMAGE_SCANNING_SUCCESS_GROUP                  string = "IMAGE-SCANNING-SUCCESS-GROUP"
	IMAGE_SCANNING_SUCCESS_DURABLE                string = "IMAGE-SCANNING-SUCCESS-DURABLE"
	APPLICATION_STATUS_UPDATE_TOPIC               string = "APPLICATION_STATUS_UPDATE"
	APPLICATION_STATUS_UPDATE_GROUP               string = "APPLICATION_STATUS_UPDATE_GROUP-1"
	APPLICATION_STATUS_UPDATE_DURABLE             string = "APPLICATION_STATUS_UPDATE_DURABLE-1"
	APPLICATION_STATUS_DELETE_TOPIC               string = "APPLICATION_STATUS_DELETE"
	APPLICATION_STATUS_DELETE_GROUP               string = "APPLICATION_STATUS_DELETE_GROUP-1"
	APPLICATION_STATUS_DELETE_DURABLE             string = "APPLICATION_STATUS_DELETE_DURABLE-1"
	CRON_EVENTS                                   string = "CRON_EVENTS"
	CRON_EVENTS_GROUP                             string = "CRON_EVENTS_GROUP-2"
	CRON_EVENTS_DURABLE                           string = "CRON_EVENTS_DURABLE-2"
	WORKFLOW_STATUS_UPDATE_TOPIC                  string = "WORKFLOW_STATUS_UPDATE"
	WORKFLOW_STATUS_UPDATE_GROUP                  string = "WORKFLOW_STATUS_UPDATE_GROUP-1"
	WORKFLOW_STATUS_UPDATE_DURABLE                string = "WORKFLOW_STATUS_UPDATE_DURABLE-1"
	CD_WORKFLOW_STATUS_UPDATE                     string = "CD_WORKFLOW_STATUS_UPDATE"
	CD_WORKFLOW_STATUS_UPDATE_GROUP               string = "CD_WORKFLOW_STATUS_UPDATE_GROUP-1"
	CD_WORKFLOW_STATUS_UPDATE_DURABLE             string = "CD_WORKFLOW_STATUS_UPDATE_DURABLE-1"
	NEW_CI_MATERIAL_TOPIC                         string = "NEW-CI-MATERIAL"
	NEW_CI_MATERIAL_TOPIC_GROUP                   string = "NEW-CI-MATERIAL_GROUP-1"
	NEW_CI_MATERIAL_TOPIC_DURABLE                 string = "NEW-CI-MATERIAL_DURABLE-1"
	CD_SUCCESS                                    string = "CD.TRIGGER"
	CD_TRIGGER_GROUP                              string = "CD.TRIGGER_GRP1"
	CD_TRIGGER_DURABLE                            string = "CD-TRIGGER-DURABLE1"
	WEBHOOK_EVENT_TOPIC                           string = "WEBHOOK_EVENT"
	WEBHOOK_EVENT_GROUP                           string = "WEBHOOK_EVENT_GRP"
	WEBHOOK_EVENT_DURABLE                         string = "WEBHOOK_EVENT_DURABLE"
	DEVTRON_TEST_TOPIC                            string = "Test_Topic"
	DEVTRON_TEST_STREAM                           string = "Devtron_Test_Stream"
	DEVTRON_TEST_QUEUE                            string = "Test_Topic_Queue"
	DEVTRON_TEST_CONSUMER                         string = "Test_Topic_Consumer"
	TOPIC_CI_SCAN                                 string = "CI-SCAN"
	TOPIC_CI_SCAN_GRP                             string = "CI-SCAN-GRP-1"
	TOPIC_CI_SCAN_DURABLE                         string = "CI-SCAN-DURABLE-1"
	ARGO_PIPELINE_STATUS_UPDATE_TOPIC             string = "ARGO_PIPELINE_STATUS_UPDATE"
	ARGO_PIPELINE_STATUS_UPDATE_GROUP             string = "ARGO_PIPELINE_STATUS_UPDATE_GROUP-1"
	ARGO_PIPELINE_STATUS_UPDATE_DURABLE           string = "ARGO_PIPELINE_STATUS_UPDATE_DURABLE-1"
	CD_BULK_DEPLOY_TRIGGER_TOPIC                  string = "CD-BULK-DEPLOY-TRIGGER"
	CD_BULK_DEPLOY_TRIGGER_GROUP                  string = "CD-BULK-DEPLOY-TRIGGER-GROUP-1"
	CD_BULK_DEPLOY_TRIGGER_DURABLE                string = "CD-BULK-DEPLOY-TRIGGER-DURABLE-1"
	HELM_CHART_INSTALL_STATUS_TOPIC               string = "HELM-CHART-INSTALL-STATUS-TOPIC"
	HELM_CHART_INSTALL_STATUS_GROUP               string = "HELM-CHART-INSTALL-STATUS-GROUP"
	HELM_CHART_INSTALL_STATUS_DURABLE             string = "HELM-CHART-INSTALL-STATUS-DURABLE"
	DEVTRON_CHART_INSTALL_TOPIC                   string = "DEVTRON-CHART-INSTALL-TOPIC"
	DEVTRON_CHART_INSTALL_GROUP                   string = "DEVTRON-CHART-INSTALL-GROUP"
	DEVTRON_CHART_INSTALL_DURABLE                 string = "DEVTRON-CHART-INSTALL-DURABLE"
	DEVTRON_CHART_PRIORITY_INSTALL_TOPIC          string = "DEVTRON-CHART-PRIORITY-INSTALL-TOPIC"
	DEVTRON_CHART_PRIORITY_INSTALL_GROUP          string = "DEVTRON-CHART-PRIORITY-INSTALL-GROUP"
	DEVTRON_CHART_PRIORITY_INSTALL_DURABLE        string = "DEVTRON-CHART-PRIORITY-INSTALL-DURABLE"
	DEVTRON_CHART_GITOPS_INSTALL_TOPIC            string = "DEVTRON-CHART-GITOPS-INSTALL-TOPIC"
	DEVTRON_CHART_GITOPS_INSTALL_GROUP            string = "DEVTRON-CHART-GITOPS-INSTALL-GROUP"
	DEVTRON_CHART_GITOPS_INSTALL_DURABLE          string = "DEVTRON-CHART-GITOPS-INSTALL-DURABLE"
	DEVTRON_CHART_GITOPS_PRIORITY_INSTALL_TOPIC   string = "DEVTRON-CHART-PRIORITY-GITOPS-INSTALL-TOPIC"
	DEVTRON_CHART_GITOPS_PRIORITY_INSTALL_GROUP   string = "DEVTRON-CHART-PRIORITY-GITOPS-INSTALL-GROUP"
	DEVTRON_CHART_GITOPS_PRIORITY_INSTALL_DURABLE string = "DEVTRON-CHART-PRIORITY-GITOPS-INSTALL-DURABLE"
	PANIC_ON_PROCESSING_TOPIC                     string = "PANIC-ON-PROCESSING-TOPIC"
	PANIC_ON_PROCESSING_GROUP                     string = "PANIC-ON-PROCESSING-GROUP"
	PANIC_ON_PROCESSING_DURABLE                   string = "PANIC-ON-PROCESSING-DURABLE"
	CD_STAGE_SUCCESS_EVENT_TOPIC                  string = "CD-STAGE-SUCCESS-EVENT"
	CD_STAGE_SUCCESS_EVENT_GROUP                  string = "CD-STAGE-SUCCESS-EVENT-GROUP"
	CD_STAGE_SUCCESS_EVENT_DURABLE                string = "CD-STAGE-SUCCESS-EVENT-DURABLE"
	CD_PIPELINE_DELETE_EVENT_TOPIC                string = "CD-PIPELINE-DELETE-EVENT"
	CD_PIPELINE_DELETE_EVENT_GROUP                string = "CD-PIPELINE-DELETE-EVENT-GROUP"
	CD_PIPELINE_DELETE_EVENT_DURABLE              string = "CD-PIPELINE-DELETE-EVENT-DURABLE"
	CHART_SCAN_TOPIC                              string = "CHART-SCAN-TOPIC"
	CHART_SCAN_GROUP                              string = "CHART-SCAN-GROUP"
	CHART_SCAN_DURABLE                            string = "CHART-SCAN-DURABLE"
	NOTIFICATION_EVENT_TOPIC                      string = "NOTIFICATION_EVENT_TOPIC"
	NOTIFICATION_EVENT_GROUP                      string = "NOTIFICATION_EVENT_GROUP"
	NOTIFICATION_EVENT_DURABLE                    string = "NOTIFICATION_EVENT_DURABLE"
)
View Source
// NATS_MSG_LOG_PREFIX is the untyped string constant "NATS_LOG".
// NOTE(review): presumably prepended to log lines about NATS messages; its
// use is not visible here — confirm against the logging code.
const NATS_MSG_LOG_PREFIX = "NATS_LOG"
View Source
// NATS_PANIC_MSG_LOG_PREFIX is the untyped string constant "NATS_PANIC_LOG".
// NOTE(review): presumably prepended to log lines written when message
// processing panics; its use is not visible here — confirm.
const NATS_PANIC_MSG_LOG_PREFIX = "NATS_PANIC_LOG"

Variables

Functions

func AddStream

func AddStream(js nats.JetStreamContext, streamConfig *nats.StreamConfig, streamNames ...string) error

func GetStreamSubjects

func GetStreamSubjects(streamName string) []string

func ParseAndFillStreamWiseAndConsumerWiseConfigMaps

func ParseAndFillStreamWiseAndConsumerWiseConfigMaps() error

Types

type ConfigJson

// ConfigJson holds the raw JSON strings that carry stream and consumer
// configuration. The `env` tags name the environment variables
// STREAM_CONFIG_JSON and CONSUMER_CONFIG_JSON for the two fields.
type ConfigJson struct {
	// StreamConfigJson is a JSON string of map[string]NatsStreamConfig.
	StreamConfigJson string `env:"STREAM_CONFIG_JSON"`
	// ConsumerConfigJson is a JSON string of map[string]NatsConsumerConfig.
	// Each map value is a JSON object keyed by NatsConsumerConfig's json tags, e.g.:
	// {"ARGO_PIPELINE_STATUS_UPDATE_DURABLE-1": {"natsMsgProcessingBatchSize": 3, "natsMsgBufferSize": 3, "ackWaitInSecs": 300}}
	// NOTE(review): the map key is presumably the consumer's durable name — confirm against the parsing code.
	ConsumerConfigJson string `env:"CONSUMER_CONFIG_JSON"`
}

type LoggerFunc added in v0.0.10

// LoggerFunc is used to log a message before it is passed to the callback
// function. It returns the log message and the key/value pairs to log; if
// keysAndValues is empty, the whole model.PubSubMsg is logged.
type LoggerFunc func(msg model.PubSubMsg) (logMsg string, keysAndValues []interface{})

LoggerFunc is used to log the message before it is passed to the callback function. It is expected to return a log message and key/value pairs. If keysAndValues is empty, the whole model.PubSubMsg is logged.

type NatsClient

// NatsClient groups a NATS connection (Conn), a JetStream context
// (JetStrCtxt) and a wait group (ConnWg).
// NOTE(review): JetStrCtxt is presumably derived from Conn, and ConnWg is
// presumably used to wait for the connection to close during shutdown;
// neither is visible here — confirm against NewNatsClient.
type NatsClient struct {
	JetStrCtxt nats.JetStreamContext
	Conn       *nats.Conn
	ConnWg     *sync.WaitGroup
	// contains filtered or unexported fields
}

func NewNatsClient

func NewNatsClient(logger *zap.SugaredLogger) (*NatsClient, error)

type NatsClientConfig

// NatsClientConfig holds the NATS client settings. Each field's `env` tag
// names its environment variable and `envDefault` gives the value used when
// that variable is not set.
//
// NatsMsgMaxAge (default 86400) and NatsMsgAckWaitInSecs (default 120) carry
// no field-level documentation.
// NOTE(review): NatsMsgMaxAge is presumably a number of seconds (86400 s = 24 h)
// and NatsMsgAckWaitInSecs presumably bounds how long a message may stay
// unacknowledged — confirm against the code that reads them.
type NatsClientConfig struct {
	// NatsServerHost is the NATS server URL (default "nats://devtron-nats.devtroncd:4222").
	NatsServerHost string `env:"NATS_SERVER_HOST" envDefault:"nats://devtron-nats.devtroncd:4222"`

	// Consumer-wise settings.
	// NatsMsgProcessingBatchSize is the number of messages that will be processed in one go.
	NatsMsgProcessingBatchSize int `env:"NATS_MSG_PROCESSING_BATCH_SIZE" envDefault:"1"`

	// NatsMsgBufferSize is the number of messages that will be buffered in memory (the channel size).
	// It is recommended to set this value equal to NatsMsgProcessingBatchSize, so that as many of the
	// buffered messages as possible are processed in one go.
	// Note: if NatsMsgBufferSize is less than NatsMsgProcessingBatchSize, the wait time for the
	// unprocessed messages in the buffer will be high (total process time = lifetime in buffer + processing time).
	// NatsMsgBufferSize can be configured independently of NatsMsgProcessingBatchSize by setting it to a
	// positive value in the environment. If it is set to a non-positive value, it takes the value of
	// NatsMsgProcessingBatchSize.
	// Note: always read this value by calling the GetNatsMsgBufferSize method.
	NatsMsgBufferSize    int `env:"NATS_MSG_BUFFER_SIZE" envDefault:"-1"`
	NatsMsgMaxAge        int `env:"NATS_MSG_MAX_AGE" envDefault:"86400"`
	NatsMsgAckWaitInSecs int `env:"NATS_MSG_ACK_WAIT_IN_SECS" envDefault:"120"`
}

func (NatsClientConfig) GetDefaultNatsConsumerConfig added in v0.0.10

func (ncc NatsClientConfig) GetDefaultNatsConsumerConfig() NatsConsumerConfig

func (NatsClientConfig) GetDefaultNatsStreamConfig added in v0.0.10

func (ncc NatsClientConfig) GetDefaultNatsStreamConfig() NatsStreamConfig

func (NatsClientConfig) GetNatsMsgBufferSize added in v0.0.10

func (ncc NatsClientConfig) GetNatsMsgBufferSize() int

type NatsConsumerConfig

// NatsConsumerConfig holds the per-consumer settings. Its JSON keys are
// natsMsgProcessingBatchSize, natsMsgBufferSize and ackWaitInSecs.
type NatsConsumerConfig struct {
	// NatsMsgProcessingBatchSize is the number of messages that will be processed in one go.
	NatsMsgProcessingBatchSize int `json:"natsMsgProcessingBatchSize"`
	// NatsMsgBufferSize is the number of messages that will be buffered in memory (the channel size).
	// Note: always read this value by calling the GetNatsMsgBufferSize method.
	NatsMsgBufferSize int `json:"natsMsgBufferSize"`
	// AckWaitInSecs is the time, in seconds, for which a message can stay in the unacknowledged state.
	AckWaitInSecs int `json:"ackWaitInSecs"`
}

func (NatsConsumerConfig) GetNatsMsgBufferSize added in v0.0.10

func (consumerConf NatsConsumerConfig) GetNatsMsgBufferSize() int

type NatsStreamConfig

// NatsStreamConfig holds the per-stream settings. It wraps a single
// StreamConfig value, serialized under the JSON key "streamConfig".
type NatsStreamConfig struct {
	StreamConfig StreamConfig `json:"streamConfig"`
}

type NatsTopic

// NatsTopic is a struct whose fields are all unexported and not shown here.
// GetNatsTopic returns a NatsTopic for a given topic name.
// NOTE(review): presumably it carries the stream, queue-group and consumer
// names associated with a topic — confirm against the unexported fields.
type NatsTopic struct {
	// contains filtered or unexported fields
}

func GetNatsTopic

func GetNatsTopic(topicName string) NatsTopic

type PubSubClientService

// PubSubClientService is the publish/subscribe client interface of this package.
type PubSubClientService interface {
	// Publish takes a topic name and a message string and returns an error.
	// NOTE(review): presumably publishes msg on the given topic — confirm against the implementation.
	Publish(topic string, msg string) error
	// Subscribe registers callback for a topic. loggerFunc is the LoggerFunc used
	// for the subscription and validations is an optional list of ValidateMsg functions.
	// NOTE(review): how a failed validation affects delivery is not visible here — confirm.
	Subscribe(topic string, callback func(msg *model.PubSubMsg), loggerFunc LoggerFunc, validations ...ValidateMsg) error
	// ShutDown returns an error.
	// NOTE(review): presumably releases the client's resources — confirm.
	ShutDown() error
}

type PubSubClientServiceImpl

// PubSubClientServiceImpl provides value-receiver Publish, Subscribe and
// ShutDown methods with the signatures required by PubSubClientService.
// It exposes the logger (Logger) and the NATS client (NatsClient) it holds.
type PubSubClientServiceImpl struct {
	Logger     *zap.SugaredLogger
	NatsClient *NatsClient
	// contains filtered or unexported fields
}

func NewPubSubClientServiceImpl

func NewPubSubClientServiceImpl(logger *zap.SugaredLogger) (*PubSubClientServiceImpl, error)

func (PubSubClientServiceImpl) Log added in v0.0.10

func (impl PubSubClientServiceImpl) Log(loggerFunc LoggerFunc, topic string, subMsg model.PubSubMsg)

func (PubSubClientServiceImpl) Publish

func (impl PubSubClientServiceImpl) Publish(topic string, msg string) error

func (PubSubClientServiceImpl) ShutDown added in v0.0.19

func (impl PubSubClientServiceImpl) ShutDown() error

func (PubSubClientServiceImpl) Subscribe

func (impl PubSubClientServiceImpl) Subscribe(topic string, callback func(msg *model.PubSubMsg), loggerFunc LoggerFunc, validations ...ValidateMsg) error

func (PubSubClientServiceImpl) TryCatchCallBack added in v0.0.8

func (impl PubSubClientServiceImpl) TryCatchCallBack(msg *nats.Msg, callback func(msg *model.PubSubMsg), loggerFunc LoggerFunc, validations ...ValidateMsg)

TryCatchCallBack is a fail-safe way to invoke the callback function.

type StreamConfig

// StreamConfig holds stream-level settings; currently only MaxAge,
// serialized under the JSON key "max_age".
// NOTE(review): with encoding/json's default handling a time.Duration is
// read and written as an integer number of nanoseconds — confirm that no
// custom decoding or unit conversion is applied elsewhere.
type StreamConfig struct {
	MaxAge time.Duration `json:"max_age"`
}

type ValidateMsg added in v0.0.10

// ValidateMsg is a predicate over a model.PubSubMsg.
// NOTE(review): presumably a false result causes the message to be skipped
// before the subscriber callback runs — confirm against Subscribe/TryCatchCallBack.
type ValidateMsg func(msg model.PubSubMsg) bool

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL