pubsub_lib

package
Version: v0.0.3-beta3
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 19, 2023 License: Apache-2.0 Imports: 8 Imported by: 29

Documentation

Index

Constants

View Source
// Names used by the pub/sub layer. Every entry is a typed string constant.
// The block lists stream names first, then topic names; most topics are
// followed by a companion *_GROUP and *_DURABLE constant.
// NOTE(review): the *_GROUP / *_DURABLE values are presumably the NATS queue
// group and durable consumer names used when subscribing — confirm against
// the Subscribe implementation, which is not shown here.
const (
	// Stream names.
	CI_RUNNER_STREAM                    string = "CI-RUNNER"
	ORCHESTRATOR_STREAM                 string = "ORCHESTRATOR"
	KUBEWATCH_STREAM                    string = "KUBEWATCH"
	GIT_SENSOR_STREAM                   string = "GIT-SENSOR"
	IMAGE_SCANNER_STREAM                string = "IMAGE-SCANNER"
	// Topic names with their group and durable companions.
	BULK_APPSTORE_DEPLOY_TOPIC          string = "APP-STORE.BULK-DEPLOY"
	BULK_APPSTORE_DEPLOY_GROUP          string = "APP-STORE-BULK-DEPLOY-GROUP-1"
	BULK_APPSTORE_DEPLOY_DURABLE        string = "APP-STORE-BULK-DEPLOY-DURABLE-1"
	// The topic is named CD_STAGE_COMPLETE while its companions are named
	// CD_COMPLETE_*.
	CD_STAGE_COMPLETE_TOPIC             string = "CD-STAGE-COMPLETE"
	CD_COMPLETE_GROUP                   string = "CD-COMPLETE_GROUP-1"
	CD_COMPLETE_DURABLE                 string = "CD-COMPLETE_DURABLE-1"
	// Bulk deploy and bulk hibernate are interleaved: topics, then groups,
	// then durables. The group values are dot-separated; the durable values
	// are dash-separated.
	BULK_DEPLOY_TOPIC                   string = "CD.BULK"
	BULK_HIBERNATE_TOPIC                string = "CD.BULK-HIBERNATE"
	BULK_DEPLOY_GROUP                   string = "CD.BULK.GROUP-1"
	BULK_HIBERNATE_GROUP                string = "CD.BULK-HIBERNATE.GROUP-1"
	BULK_DEPLOY_DURABLE                 string = "CD-BULK-DURABLE-1"
	BULK_HIBERNATE_DURABLE              string = "CD-BULK-HIBERNATE-DURABLE-1"
	CI_COMPLETE_TOPIC                   string = "CI-COMPLETE"
	CI_COMPLETE_GROUP                   string = "CI-COMPLETE_GROUP-1"
	CI_COMPLETE_DURABLE                 string = "CI-COMPLETE_DURABLE-1"
	APPLICATION_STATUS_UPDATE_TOPIC     string = "APPLICATION_STATUS_UPDATE"
	APPLICATION_STATUS_UPDATE_GROUP     string = "APPLICATION_STATUS_UPDATE_GROUP-1"
	APPLICATION_STATUS_UPDATE_DURABLE   string = "APPLICATION_STATUS_UPDATE_DURABLE-1"
	APPLICATION_STATUS_DELETE_TOPIC     string = "APPLICATION_STATUS_DELETE"
	APPLICATION_STATUS_DELETE_GROUP     string = "APPLICATION_STATUS_DELETE_GROUP-1"
	APPLICATION_STATUS_DELETE_DURABLE   string = "APPLICATION_STATUS_DELETE_DURABLE-1"
	// The CRON_EVENTS group and durable values end in "-2", unlike the "-1"
	// suffix used by most other entries.
	CRON_EVENTS                         string = "CRON_EVENTS"
	CRON_EVENTS_GROUP                   string = "CRON_EVENTS_GROUP-2"
	CRON_EVENTS_DURABLE                 string = "CRON_EVENTS_DURABLE-2"
	WORKFLOW_STATUS_UPDATE_TOPIC        string = "WORKFLOW_STATUS_UPDATE"
	WORKFLOW_STATUS_UPDATE_GROUP        string = "WORKFLOW_STATUS_UPDATE_GROUP-1"
	WORKFLOW_STATUS_UPDATE_DURABLE      string = "WORKFLOW_STATUS_UPDATE_DURABLE-1"
	CD_WORKFLOW_STATUS_UPDATE           string = "CD_WORKFLOW_STATUS_UPDATE"
	CD_WORKFLOW_STATUS_UPDATE_GROUP     string = "CD_WORKFLOW_STATUS_UPDATE_GROUP-1"
	CD_WORKFLOW_STATUS_UPDATE_DURABLE   string = "CD_WORKFLOW_STATUS_UPDATE_DURABLE-1"
	NEW_CI_MATERIAL_TOPIC               string = "NEW-CI-MATERIAL"
	NEW_CI_MATERIAL_TOPIC_GROUP         string = "NEW-CI-MATERIAL_GROUP-1"
	NEW_CI_MATERIAL_TOPIC_DURABLE       string = "NEW-CI-MATERIAL_DURABLE-1"
	// CD_SUCCESS carries the value "CD.TRIGGER"; the constants that follow it
	// are named CD_TRIGGER_*.
	CD_SUCCESS                          string = "CD.TRIGGER"
	CD_TRIGGER_GROUP                    string = "CD.TRIGGER_GRP1"
	CD_TRIGGER_DURABLE                  string = "CD-TRIGGER-DURABLE1"
	// The WEBHOOK_EVENT group and durable values carry no numeric suffix.
	WEBHOOK_EVENT_TOPIC                 string = "WEBHOOK_EVENT"
	WEBHOOK_EVENT_GROUP                 string = "WEBHOOK_EVENT_GRP"
	WEBHOOK_EVENT_DURABLE               string = "WEBHOOK_EVENT_DURABLE"
	// DEVTRON_TEST_* names: a topic, a stream, a queue and a consumer.
	DEVTRON_TEST_TOPIC                  string = "Test_Topic"
	DEVTRON_TEST_STREAM                 string = "Devtron_Test_Stream"
	DEVTRON_TEST_QUEUE                  string = "Test_Topic_Queue"
	DEVTRON_TEST_CONSUMER               string = "Test_Topic_Consumer"
	TOPIC_CI_SCAN                       string = "CI-SCAN"
	TOPIC_CI_SCAN_GRP                   string = "CI-SCAN-GRP-1"
	TOPIC_CI_SCAN_DURABLE               string = "CI-SCAN-DURABLE-1"
	ARGO_PIPELINE_STATUS_UPDATE_TOPIC   string = "ARGO_PIPELINE_STATUS_UPDATE"
	ARGO_PIPELINE_STATUS_UPDATE_GROUP   string = "ARGO_PIPELINE_STATUS_UPDATE_GROUP-1"
	ARGO_PIPELINE_STATUS_UPDATE_DURABLE string = "ARGO_PIPELINE_STATUS_UPDATE_DURABLE-1"
	CD_BULK_DEPLOY_TRIGGER_TOPIC        string = "CD-BULK-DEPLOY-TRIGGER"
	CD_BULK_DEPLOY_TRIGGER_GROUP        string = "CD-BULK-DEPLOY-TRIGGER-GROUP-1"
	CD_BULK_DEPLOY_TRIGGER_DURABLE      string = "CD-BULK-DEPLOY-TRIGGER-DURABLE-1"
	// The HELM_CHART_INSTALL_STATUS group and durable values carry no numeric
	// suffix.
	HELM_CHART_INSTALL_STATUS_TOPIC     string = "HELM-CHART-INSTALL-STATUS-TOPIC"
	HELM_CHART_INSTALL_STATUS_GROUP     string = "HELM-CHART-INSTALL-STATUS-GROUP"
	HELM_CHART_INSTALL_STATUS_DURABLE   string = "HELM-CHART-INSTALL-STATUS-DURABLE"
)
View Source
// DefaultMaxAge is a duration of 24 hours (86400000000000 nanoseconds).
// NOTE(review): the name and the matching max_age default in
// NatsClientConfig suggest it is the fallback stream max age — confirm at
// the call sites.
const DefaultMaxAge time.Duration = 24 * time.Hour

Variables

Functions

func AddStream

func AddStream(js nats.JetStreamContext, streamConfig *nats.StreamConfig, streamNames ...string) error

func GetStreamSubjects

func GetStreamSubjects(streamName string) []string

func ParseAndFillStreamWiseAndConsumerWiseConfigMaps

func ParseAndFillStreamWiseAndConsumerWiseConfigMaps()

Types

type ConfigJson

// ConfigJson holds two string settings whose env tags name the
// STREAM_CONFIG_JSON and CONSUMER_CONFIG_JSON environment variables. Neither
// field declares an envDefault.
// NOTE(review): the names suggest each string is a JSON document that is
// parsed elsewhere — confirm in ParseAndFillStreamWiseAndConsumerWiseConfigMaps.
type ConfigJson struct {
	// StreamConfigJson is tagged with STREAM_CONFIG_JSON.
	StreamConfigJson   string `env:"STREAM_CONFIG_JSON"`
	// ConsumerConfigJson is tagged with CONSUMER_CONFIG_JSON.
	ConsumerConfigJson string `env:"CONSUMER_CONFIG_JSON"`
}

type LogsConfig

// LogsConfig holds a single logging-related setting taken from the
// environment.
type LogsConfig struct {
	// DefaultLogTimeLimit is tagged with DEFAULT_LOG_TIME_LIMIT and defaults
	// to 1. NOTE(review): the unit is not stated here — confirm where the
	// value is consumed.
	DefaultLogTimeLimit int64 `env:"DEFAULT_LOG_TIME_LIMIT" envDefault:"1"`
}

type NatsClient

// NatsClient groups a JetStream context and a NATS connection with two
// integer message-handling settings. It also has unexported fields that this
// listing does not show. Instances are returned by NewNatsClient.
type NatsClient struct {
	// JetStrCtxt is the JetStream context held by the client.
	JetStrCtxt nats.JetStreamContext

	// NatsMsgProcessingBatchSize and NatsMsgBufferSize share their names with
	// the NatsClientConfig settings; presumably they are copied from that
	// configuration — confirm in NewNatsClient.
	NatsMsgProcessingBatchSize int
	NatsMsgBufferSize          int
	// NOTE(review): Conn is declared as a nats.Conn value rather than a
	// *nats.Conn, so copying a NatsClient also copies the connection struct —
	// confirm this is intended.
	Conn                       nats.Conn
	// contains filtered or unexported fields
}

func NewNatsClient

func NewNatsClient(logger *zap.SugaredLogger) (*NatsClient, error)

#nosec

type NatsClientConfig

// NatsClientConfig is the environment-driven configuration for the NATS
// client. Each field's env tag names its environment variable and its
// envDefault tag gives the value used when that variable is not set.
type NatsClientConfig struct {
	// NatsServerHost is tagged with NATS_SERVER_HOST; the default is
	// nats://devtron-nats.devtroncd:4222.
	NatsServerHost string `env:"NATS_SERVER_HOST" envDefault:"nats://devtron-nats.devtroncd:4222"`

	// Consumer-wise settings: processing batch size (default 1) and message
	// buffer size (default 64).
	NatsMsgProcessingBatchSize int `env:"NATS_MSG_PROCESSING_BATCH_SIZE" envDefault:"1"`
	NatsMsgBufferSize          int `env:"NATS_MSG_BUFFER_SIZE" envDefault:"64"`

	// Stream-wise settings, held as a JSON string. The default document sets
	// "max_age" to 86400000000000; that key matches the json tag on
	// StreamConfig.MaxAge.
	NatsStreamConfig string `env:"NATS_STREAM_CONFIG" envDefault:"{\"max_age\":86400000000000}"`

	// Consumer settings, held as a JSON string. The default document sets
	// "ackWaitInSecs" to 3600; that key matches the json tag on
	// NatsConsumerConfig.AckWaitInSecs.
	NatsConsumerConfig string `env:"NATS_CONSUMER_CONFIG" envDefault:"{\"ackWaitInSecs\":3600}"`
}

type NatsConsumerConfig

// NatsConsumerConfig is the JSON shape of the consumer settings: two message
// sizing values and an acknowledgement wait.
type NatsConsumerConfig struct {
	// NatsMsgProcessingBatchSize is read from the "natsMsgProcessingBatchSize" key.
	NatsMsgProcessingBatchSize int `json:"natsMsgProcessingBatchSize"`
	// NatsMsgBufferSize is read from the "natsMsgBufferSize" key.
	NatsMsgBufferSize          int `json:"natsMsgBufferSize"`
	// AckWaitInSecs is read from the "ackWaitInSecs" key; per the field name
	// the value is in seconds.
	AckWaitInSecs              int `json:"ackWaitInSecs"`
}

type NatsStreamConfig

// NatsStreamConfig wraps a StreamConfig under the "streamConfig" JSON key.
type NatsStreamConfig struct {
	// StreamConfig holds the stream settings themselves.
	StreamConfig StreamConfig `json:"streamConfig"`
}

type NatsTopic

// NatsTopic has no exported fields in this listing; values are returned by
// GetNatsTopic, which takes a topic name.
type NatsTopic struct {
	// contains filtered or unexported fields
}

func GetNatsTopic

func GetNatsTopic(topicName string) NatsTopic

type PubSubClientService

// PubSubClientService is the publish/subscribe contract of this package.
type PubSubClientService interface {
	// Publish takes a topic name and a string message and returns an error.
	Publish(topic string, msg string) error
	// Subscribe takes a topic name and a callback that receives a *PubSubMsg,
	// and returns an error.
	Subscribe(topic string, callback func(msg *PubSubMsg)) error
}

type PubSubClientServiceImpl

// PubSubClientServiceImpl declares Publish and Subscribe methods with the
// same signatures as PubSubClientService. It is created by
// NewPubSubClientServiceImpl and also has unexported fields that this listing
// does not show.
type PubSubClientServiceImpl struct {
	// Logger is the zap sugared logger held by the service.
	Logger     *zap.SugaredLogger
	// NatsClient is the NATS client held by the service.
	NatsClient *NatsClient
	// contains filtered or unexported fields
}

func NewPubSubClientServiceImpl

func NewPubSubClientServiceImpl(logger *zap.SugaredLogger) *PubSubClientServiceImpl

func (PubSubClientServiceImpl) Publish

func (impl PubSubClientServiceImpl) Publish(topic string, msg string) error

func (PubSubClientServiceImpl) Subscribe

func (impl PubSubClientServiceImpl) Subscribe(topic string, callback func(msg *PubSubMsg)) error

type PubSubMsg

// PubSubMsg is the message type passed to Subscribe callbacks.
type PubSubMsg struct {
	// Data is the message content as a string.
	Data string
}

type StreamConfig

// StreamConfig holds the stream settings that are read from JSON.
type StreamConfig struct {
	// MaxAge is read from the "max_age" key. Because time.Duration is an
	// int64 nanosecond count, the JSON value is a plain integer number of
	// nanoseconds (for example 86400000000000 for 24 hours).
	MaxAge time.Duration `json:"max_age"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL