pubsub_lib

package
v0.0.15-beta1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2024 License: Apache-2.0 Imports: 12 Imported by: 29

Documentation

Index

Constants

View Source
// Names used by this package when talking to NATS.
//
// The first five constants (suffix _STREAM) are stream names. The remaining
// constants mostly come in triples that share a prefix: a topic name, a
// *_GROUP name and a *_DURABLE name.
// NOTE(review): the *_GROUP / *_DURABLE values are presumably the queue-group
// and durable-consumer names used when subscribing to the matching topic —
// confirm against the subscribe path. The example key in ConfigJson's comment
// ("ARGO_PIPELINE_STATUS_UPDATE_DURABLE-1") matches a *_DURABLE value here.
//
// The string values are part of the wire contract; the mixed use of '-', '_'
// and '.' separators and the numeric suffixes (-1, -2, none) is preserved as-is.
const (
	CI_RUNNER_STREAM                    string = "CI-RUNNER"
	ORCHESTRATOR_STREAM                 string = "ORCHESTRATOR"
	KUBEWATCH_STREAM                    string = "KUBEWATCH"
	GIT_SENSOR_STREAM                   string = "GIT-SENSOR"
	IMAGE_SCANNER_STREAM                string = "IMAGE-SCANNER"
	BULK_APPSTORE_DEPLOY_TOPIC          string = "APP-STORE.BULK-DEPLOY"
	BULK_APPSTORE_DEPLOY_GROUP          string = "APP-STORE-BULK-DEPLOY-GROUP-1"
	BULK_APPSTORE_DEPLOY_DURABLE        string = "APP-STORE-BULK-DEPLOY-DURABLE-1"
	CD_STAGE_COMPLETE_TOPIC             string = "CD-STAGE-COMPLETE"
	CD_COMPLETE_GROUP                   string = "CD-COMPLETE_GROUP-1"
	CD_COMPLETE_DURABLE                 string = "CD-COMPLETE_DURABLE-1"
	BULK_DEPLOY_TOPIC                   string = "CD.BULK"
	BULK_HIBERNATE_TOPIC                string = "CD.BULK-HIBERNATE"
	BULK_DEPLOY_GROUP                   string = "CD.BULK.GROUP-1"
	BULK_HIBERNATE_GROUP                string = "CD.BULK-HIBERNATE.GROUP-1"
	BULK_DEPLOY_DURABLE                 string = "CD-BULK-DURABLE-1"
	BULK_HIBERNATE_DURABLE              string = "CD-BULK-HIBERNATE-DURABLE-1"
	CI_COMPLETE_TOPIC                   string = "CI-COMPLETE"
	CI_COMPLETE_GROUP                   string = "CI-COMPLETE_GROUP-1"
	CI_COMPLETE_DURABLE                 string = "CI-COMPLETE_DURABLE-1"
	APPLICATION_STATUS_UPDATE_TOPIC     string = "APPLICATION_STATUS_UPDATE"
	APPLICATION_STATUS_UPDATE_GROUP     string = "APPLICATION_STATUS_UPDATE_GROUP-1"
	APPLICATION_STATUS_UPDATE_DURABLE   string = "APPLICATION_STATUS_UPDATE_DURABLE-1"
	APPLICATION_STATUS_DELETE_TOPIC     string = "APPLICATION_STATUS_DELETE"
	APPLICATION_STATUS_DELETE_GROUP     string = "APPLICATION_STATUS_DELETE_GROUP-1"
	APPLICATION_STATUS_DELETE_DURABLE   string = "APPLICATION_STATUS_DELETE_DURABLE-1"
	CRON_EVENTS                         string = "CRON_EVENTS"
	CRON_EVENTS_GROUP                   string = "CRON_EVENTS_GROUP-2"
	CRON_EVENTS_DURABLE                 string = "CRON_EVENTS_DURABLE-2"
	WORKFLOW_STATUS_UPDATE_TOPIC        string = "WORKFLOW_STATUS_UPDATE"
	WORKFLOW_STATUS_UPDATE_GROUP        string = "WORKFLOW_STATUS_UPDATE_GROUP-1"
	WORKFLOW_STATUS_UPDATE_DURABLE      string = "WORKFLOW_STATUS_UPDATE_DURABLE-1"
	CD_WORKFLOW_STATUS_UPDATE           string = "CD_WORKFLOW_STATUS_UPDATE"
	CD_WORKFLOW_STATUS_UPDATE_GROUP     string = "CD_WORKFLOW_STATUS_UPDATE_GROUP-1"
	CD_WORKFLOW_STATUS_UPDATE_DURABLE   string = "CD_WORKFLOW_STATUS_UPDATE_DURABLE-1"
	NEW_CI_MATERIAL_TOPIC               string = "NEW-CI-MATERIAL"
	NEW_CI_MATERIAL_TOPIC_GROUP         string = "NEW-CI-MATERIAL_GROUP-1"
	NEW_CI_MATERIAL_TOPIC_DURABLE       string = "NEW-CI-MATERIAL_DURABLE-1"
	// Note the naming mismatch: the topic constant is CD_SUCCESS but its value
	// is "CD.TRIGGER", and the companion group/durable are named CD_TRIGGER_*.
	CD_SUCCESS                          string = "CD.TRIGGER"
	CD_TRIGGER_GROUP                    string = "CD.TRIGGER_GRP1"
	CD_TRIGGER_DURABLE                  string = "CD-TRIGGER-DURABLE1"
	WEBHOOK_EVENT_TOPIC                 string = "WEBHOOK_EVENT"
	WEBHOOK_EVENT_GROUP                 string = "WEBHOOK_EVENT_GRP"
	WEBHOOK_EVENT_DURABLE               string = "WEBHOOK_EVENT_DURABLE"
	// DEVTRON_TEST_* use QUEUE/CONSUMER suffixes instead of GROUP/DURABLE and
	// carry their own stream name.
	// NOTE(review): presumably reserved for tests — confirm before reusing.
	DEVTRON_TEST_TOPIC                  string = "Test_Topic"
	DEVTRON_TEST_STREAM                 string = "Devtron_Test_Stream"
	DEVTRON_TEST_QUEUE                  string = "Test_Topic_Queue"
	DEVTRON_TEST_CONSUMER               string = "Test_Topic_Consumer"
	TOPIC_CI_SCAN                       string = "CI-SCAN"
	TOPIC_CI_SCAN_GRP                   string = "CI-SCAN-GRP-1"
	TOPIC_CI_SCAN_DURABLE               string = "CI-SCAN-DURABLE-1"
	ARGO_PIPELINE_STATUS_UPDATE_TOPIC   string = "ARGO_PIPELINE_STATUS_UPDATE"
	ARGO_PIPELINE_STATUS_UPDATE_GROUP   string = "ARGO_PIPELINE_STATUS_UPDATE_GROUP-1"
	ARGO_PIPELINE_STATUS_UPDATE_DURABLE string = "ARGO_PIPELINE_STATUS_UPDATE_DURABLE-1"
	CD_BULK_DEPLOY_TRIGGER_TOPIC        string = "CD-BULK-DEPLOY-TRIGGER"
	CD_BULK_DEPLOY_TRIGGER_GROUP        string = "CD-BULK-DEPLOY-TRIGGER-GROUP-1"
	CD_BULK_DEPLOY_TRIGGER_DURABLE      string = "CD-BULK-DEPLOY-TRIGGER-DURABLE-1"
	HELM_CHART_INSTALL_STATUS_TOPIC     string = "HELM-CHART-INSTALL-STATUS-TOPIC"
	HELM_CHART_INSTALL_STATUS_GROUP     string = "HELM-CHART-INSTALL-STATUS-GROUP"
	HELM_CHART_INSTALL_STATUS_DURABLE   string = "HELM-CHART-INSTALL-STATUS-DURABLE"
	DEVTRON_CHART_INSTALL_TOPIC         string = "DEVTRON-CHART-INSTALL-TOPIC"
	DEVTRON_CHART_INSTALL_GROUP         string = "DEVTRON-CHART-INSTALL-GROUP"
	DEVTRON_CHART_INSTALL_DURABLE       string = "DEVTRON-CHART-INSTALL-DURABLE"
	PANIC_ON_PROCESSING_TOPIC           string = "PANIC-ON-PROCESSING-TOPIC"
	PANIC_ON_PROCESSING_GROUP           string = "PANIC-ON-PROCESSING-GROUP"
	PANIC_ON_PROCESSING_DURABLE         string = "PANIC-ON-PROCESSING-DURABLE"
	CD_STAGE_SUCCESS_EVENT_TOPIC        string = "CD-STAGE-SUCCESS-EVENT"
	CD_STAGE_SUCCESS_EVENT_GROUP        string = "CD-STAGE-SUCCESS-EVENT-GROUP"
	CD_STAGE_SUCCESS_EVENT_DURABLE      string = "CD-STAGE-SUCCESS-EVENT-DURABLE"
)
View Source
// NATS_MSG_LOG_PREFIX is the fixed marker string "NATS_LOG".
// NOTE(review): presumably prepended to log lines emitted for NATS messages so
// they can be filtered — confirm against the logging call sites.
const NATS_MSG_LOG_PREFIX = "NATS_LOG"

Variables

Functions

func AddStream

func AddStream(js nats.JetStreamContext, streamConfig *nats.StreamConfig, streamNames ...string) error

func GetStreamSubjects

func GetStreamSubjects(streamName string) []string

func ParseAndFillStreamWiseAndConsumerWiseConfigMaps

func ParseAndFillStreamWiseAndConsumerWiseConfigMaps()

Types

type ConfigJson

// ConfigJson carries raw JSON overrides for stream-level and consumer-level
// settings. Both fields are tagged for loading from environment variables
// (STREAM_CONFIG_JSON and CONSUMER_CONFIG_JSON respectively).
type ConfigJson struct {
	// StreamConfigJson is a JSON string encoding a map[string]NatsStreamConfig.
	// NOTE(review): keys are presumably stream names — confirm against the parser.
	StreamConfigJson string `env:"STREAM_CONFIG_JSON"`
	// ConsumerConfigJson is a JSON string encoding a map[string]NatsConsumerConfig.
	// The example key below matches a *_DURABLE constant value, so keys appear
	// to be durable consumer names.
	// eg: "{\"ARGO_PIPELINE_STATUS_UPDATE_DURABLE-1\" : \"{\"natsMsgProcessingBatchSize\" : 3, \"natsMsgBufferSize\" : 3, \"ackWaitInSecs\": 300}\"}"
	// NOTE(review): as written, the example wraps the inner object in quotes
	// without escaping its own quotes, which is not valid JSON. For a
	// map[string]NatsConsumerConfig the value would be a plain nested object:
	//   {"ARGO_PIPELINE_STATUS_UPDATE_DURABLE-1": {"natsMsgProcessingBatchSize": 3, "natsMsgBufferSize": 3, "ackWaitInSecs": 300}}
	// Confirm the accepted shape against the parsing code before relying on either form.
	ConsumerConfigJson string `env:"CONSUMER_CONFIG_JSON"`
}

type LoggerFunc added in v0.0.10

type LoggerFunc func(msg model.PubSubMsg) (logMsg string, keysAndValues []interface{})

LoggerFunc is used to log the message before it is passed to the callback function. It is expected to return a log message and key-value pairs. If keysAndValues is empty, the whole model.PubSubMsg is logged.

type NatsClient

type NatsClient struct {
	JetStrCtxt nats.JetStreamContext
	Conn       *nats.Conn
	// contains filtered or unexported fields
}

func NewNatsClient

func NewNatsClient(logger *zap.SugaredLogger) (*NatsClient, error)

type NatsClientConfig

// NatsClientConfig holds the client-wide NATS settings. Every field is tagged
// for loading from an environment variable, with the default shown in its
// envDefault tag.
type NatsClientConfig struct {
	// NatsServerHost is the NATS server URL (default "nats://devtron-nats.devtroncd:4222").
	NatsServerHost string `env:"NATS_SERVER_HOST" envDefault:"nats://devtron-nats.devtroncd:4222"`

	// The settings below apply per consumer.

	// NatsMsgProcessingBatchSize is the number of messages that will be processed in one go (default 1).
	NatsMsgProcessingBatchSize int `env:"NATS_MSG_PROCESSING_BATCH_SIZE" envDefault:"1"`

	// NatsMsgBufferSize is the number of messages that will be buffered in memory (channel size).
	// It is recommended to set this equal to NatsMsgProcessingBatchSize, so that as many
	// buffered messages as possible are processed in one go.
	// Note: if NatsMsgBufferSize is less than NatsMsgProcessingBatchSize, the wait time for
	// unprocessed messages in the buffer will be high
	// (total process time = life-time in buffer + processing time).
	// It can be configured independently of NatsMsgProcessingBatchSize by setting a positive
	// value in env. If it is set to a non-positive value (the default is -1), the value of
	// NatsMsgProcessingBatchSize is used instead.
	// Note: always read this value through the GetNatsMsgBufferSize method, not the field.
	NatsMsgBufferSize    int `env:"NATS_MSG_BUFFER_SIZE" envDefault:"-1"`
	// NatsMsgMaxAge defaults to 86400.
	// NOTE(review): presumably the maximum message age in seconds (86400 s = 1 day)
	// applied to streams — confirm the unit where this value is consumed.
	NatsMsgMaxAge        int `env:"NATS_MSG_MAX_AGE" envDefault:"86400"`
	// NatsMsgAckWaitInSecs is the ack wait expressed in seconds (default 120).
	// NOTE(review): presumably the default for NatsConsumerConfig.AckWaitInSecs — confirm.
	NatsMsgAckWaitInSecs int `env:"NATS_MSG_ACK_WAIT_IN_SECS" envDefault:"120"`
}

func (NatsClientConfig) GetDefaultNatsConsumerConfig added in v0.0.10

func (ncc NatsClientConfig) GetDefaultNatsConsumerConfig() NatsConsumerConfig

func (NatsClientConfig) GetDefaultNatsStreamConfig added in v0.0.10

func (ncc NatsClientConfig) GetDefaultNatsStreamConfig() NatsStreamConfig

func (NatsClientConfig) GetNatsMsgBufferSize added in v0.0.10

func (ncc NatsClientConfig) GetNatsMsgBufferSize() int

type NatsConsumerConfig

// NatsConsumerConfig holds the per-consumer settings. It is the value type of
// the map encoded in ConfigJson.ConsumerConfigJson; the json tags below are the
// keys expected in that JSON.
type NatsConsumerConfig struct {
	// NatsMsgProcessingBatchSize is the number of messages that will be processed in one go.
	NatsMsgProcessingBatchSize int `json:"natsMsgProcessingBatchSize"`
	// NatsMsgBufferSize is the number of messages that will be buffered in memory (channel size).
	// Note: always read this value through the GetNatsMsgBufferSize method, not the field.
	NatsMsgBufferSize int `json:"natsMsgBufferSize"`
	// AckWaitInSecs is the time, in seconds, for which a message may remain unacknowledged.
	AckWaitInSecs int `json:"ackWaitInSecs"`
}

func (NatsConsumerConfig) GetNatsMsgBufferSize added in v0.0.10

func (consumerConf NatsConsumerConfig) GetNatsMsgBufferSize() int

type NatsStreamConfig

// NatsStreamConfig holds the per-stream settings. It is the value type of the
// map encoded in ConfigJson.StreamConfigJson.
type NatsStreamConfig struct {
	// StreamConfig is read from / written to the "streamConfig" JSON key.
	StreamConfig StreamConfig `json:"streamConfig"`
}

type NatsTopic

type NatsTopic struct {
	// contains filtered or unexported fields
}

func GetNatsTopic

func GetNatsTopic(topicName string) NatsTopic

type PubSubClientService

// PubSubClientService is the publish/subscribe contract exposed by this
// package. PubSubClientServiceImpl declares methods with the same signatures.
type PubSubClientService interface {
	// Publish sends msg on the given topic and reports any failure as an error.
	Publish(topic string, msg string) error
	// Subscribe subscribes to topic (required) and invokes callback (required)
	// for each message received. loggerFunc (optional) is invoked before the
	// message is handed to callback; validations (optional) are called before
	// the message is handed to callback.
	// NOTE(review): presumably a validation returning false causes the message
	// to be skipped — confirm against the implementation.
	Subscribe(topic string, callback func(msg *model.PubSubMsg), loggerFunc LoggerFunc, validations ...ValidateMsg) error
}

type PubSubClientServiceImpl

type PubSubClientServiceImpl struct {
	Logger     *zap.SugaredLogger
	NatsClient *NatsClient
	// contains filtered or unexported fields
}

func NewPubSubClientServiceImpl

func NewPubSubClientServiceImpl(logger *zap.SugaredLogger) *PubSubClientServiceImpl

func (PubSubClientServiceImpl) Log added in v0.0.10

func (impl PubSubClientServiceImpl) Log(loggerFunc LoggerFunc, topic string, subMsg model.PubSubMsg)

func (PubSubClientServiceImpl) Publish

func (impl PubSubClientServiceImpl) Publish(topic string, msg string) error

func (PubSubClientServiceImpl) Subscribe

func (impl PubSubClientServiceImpl) Subscribe(topic string, callback func(msg *model.PubSubMsg), loggerFunc LoggerFunc, validations ...ValidateMsg) error

Subscribe subscribes to the given topic (required). It creates a blocking process that continuously fetches messages published on this topic from the NATS server and invokes the callback (required) for each message received. loggerFunc (optional) is invoked before the message is passed to the callback function. validations (optional) are called before the message is passed to the callback function.

func (PubSubClientServiceImpl) TryCatchCallBack added in v0.0.8

func (impl PubSubClientServiceImpl) TryCatchCallBack(msg *nats.Msg, callback func(msg *model.PubSubMsg), loggerFunc LoggerFunc, validations ...ValidateMsg)

TryCatchCallBack is a fail-safe way to invoke the callback function.

type StreamConfig

// StreamConfig is the stream-level configuration embedded in NatsStreamConfig.
type StreamConfig struct {
	// MaxAge is read from / written to the "max_age" JSON key. Because
	// time.Duration is an int64 with no custom JSON encoding, the JSON value is
	// an integer number of nanoseconds, not a string such as "24h".
	// NOTE(review): presumably the maximum age of messages kept in the stream — confirm.
	MaxAge time.Duration `json:"max_age"`
}

type ValidateMsg added in v0.0.10

// ValidateMsg is a predicate over a received model.PubSubMsg. Subscribe and
// TryCatchCallBack accept zero or more of these as their variadic validations
// argument.
// NOTE(review): presumably returning false rejects the message before the
// callback runs — confirm against the implementation.
type ValidateMsg func(msg model.PubSubMsg) bool

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL