Documentation ¶
Index ¶
- Constants
- Variables
- func Exit(signal os.Signal)
- func GetKafkaHeader(message *kafka.Message, key string) string
- func GetKafkaIntHeader(message *kafka.Message, name string) (int, error)
- func GetKafkaTimeHeader(message *kafka.Message, name string) (time.Time, error)
- func MakeTopicId(destinationId, mode, tableName string, checkLength bool) (string, error)
- func ParseTopicId(topic string) (destinationId, mode, tableName string, err error)
- func ProducerErrorLabels(topicId string, errText string) (topic, destinationId, mode, tableName, err string)
- func ProducerMessageLabels(topicId string, status, errText string) (topic, destinationId, mode, tableName, st string, err string)
- func RedisError(err error) string
- func RetryBackOffTime(config *AppConfig, attempt int) time.Time
- func Run()
- type AbstractBatchConsumer
- func (bc *AbstractBatchConsumer) BatchPeriodSec() int
- func (bc *AbstractBatchConsumer) ConsumeAll() (counters BatchCounters, err error)
- func (bc *AbstractBatchConsumer) Retire()
- func (bc *AbstractBatchConsumer) RunJob()
- func (bc *AbstractBatchConsumer) TopicId() string
- func (bc *AbstractBatchConsumer) UpdateBatchPeriod(batchPeriodSec int)
- type ApiKey
- type ApiKeyBinding
- type AppConfig
- type AppContext
- type BatchConsumer
- type BatchConsumerImpl
- type BatchCounters
- type BatchFunction
- type BatchState
- type ConfigurationSource
- type Cron
- type DataLayout
- type Destination
- type DestinationConfig
- type DummyEventsLogService
- func (d *DummyEventsLogService) Close() error
- func (d *DummyEventsLogService) GetEvents(eventType EventType, actorId string, filter *EventsLogFilter, limit int) ([]EventsLogRecord, error)
- func (d *DummyEventsLogService) PostEvent(eventType EventType, actorId string, event any) (id EventsLogRecordId, err error)
- type EnvConfigurationSource
- func (ecs *EnvConfigurationSource) ChangesChannel() <-chan bool
- func (ecs *EnvConfigurationSource) Close() error
- func (ecs *EnvConfigurationSource) GetDestinationConfig(id string) *DestinationConfig
- func (ecs *EnvConfigurationSource) GetDestinationConfigs() []*DestinationConfig
- func (ecs *EnvConfigurationSource) GetValue(key string) any
- type EventStatus
- type EventType
- type EventsLogFilter
- type EventsLogRecord
- type EventsLogRecordId
- type EventsLogService
- type FastStore
- type IngestMessage
- type IngestMessageOrigin
- type JobRunner
- type MetricsRelay
- type MetricsServer
- type MultiConfigurationSource
- type Producer
- type RedisConfigurationSource
- func (rcs *RedisConfigurationSource) ChangesChannel() <-chan bool
- func (rcs *RedisConfigurationSource) Close() error
- func (rcs *RedisConfigurationSource) GetDestinationConfig(id string) *DestinationConfig
- func (rcs *RedisConfigurationSource) GetDestinationConfigs() []*DestinationConfig
- func (rcs *RedisConfigurationSource) GetValue(key string) any
- type RedisEventsLog
- type Repository
- type RepositoryChange
- type RetryConsumer
- type Router
- func (r *Router) AuthMiddleware(c *gin.Context)
- func (r *Router) BulkHandler(c *gin.Context)
- func (r *Router) EventsHandler(c *gin.Context)
- func (r *Router) EventsLogHandler(c *gin.Context)
- func (r *Router) FailedHandler(c *gin.Context)
- func (r *Router) GetEngine() *gin.Engine
- func (r *Router) IngestHandler(c *gin.Context)
- func (r *Router) ResponseError(c *gin.Context, code int, errorType string, maskError bool, err error, ...) RouterError
- func (r *Router) TestConnectionHandler(c *gin.Context)
- type RouterError
- type ShortDestinationConfig
- type StreamConfig
- type StreamConsumer
- type StreamWithDestinations
- type TagDestinationConfig
- type TopicManager
- type YamlConfigurationSource
- func (ycp *YamlConfigurationSource) ChangesChannel() <-chan bool
- func (ycp *YamlConfigurationSource) Close() error
- func (ycp *YamlConfigurationSource) GetDestinationConfig(id string) *DestinationConfig
- func (ycp *YamlConfigurationSource) GetDestinationConfigs() []*DestinationConfig
- func (ycp *YamlConfigurationSource) GetValue(key string) any
Constants ¶
const ( DataLayoutSegmentCompatible = "segment-compatible" DataLayoutSegmentSingleTable = "segment-single-table" DataLayoutJitsuLegacy = "jitsu-legacy" )
const MessageIdHeader = "message_id"
Variables ¶
var TimestampPattern = regexp.MustCompile(`^\d{13}$`)
Functions ¶
func GetKafkaTimeHeader ¶
func MakeTopicId ¶
func ParseTopicId ¶
func ProducerErrorLabels ¶
func ProducerMessageLabels ¶
func RedisError ¶
Types ¶
type AbstractBatchConsumer ¶
type AbstractBatchConsumer struct { sync.Mutex objects.ServiceBase // contains filtered or unexported fields }
func NewAbstractBatchConsumer ¶
func NewAbstractBatchConsumer(repository *Repository, destinationId string, batchPeriodSec int, topicId, mode string, config *AppConfig, kafkaConfig *kafka.ConfigMap) (*AbstractBatchConsumer, error)
func (*AbstractBatchConsumer) BatchPeriodSec ¶
func (bc *AbstractBatchConsumer) BatchPeriodSec() int
func (*AbstractBatchConsumer) ConsumeAll ¶
func (bc *AbstractBatchConsumer) ConsumeAll() (counters BatchCounters, err error)
func (*AbstractBatchConsumer) Retire ¶
func (bc *AbstractBatchConsumer) Retire()
Retire Mark consumer as retired Consumer will close itself when com
func (*AbstractBatchConsumer) RunJob ¶
func (bc *AbstractBatchConsumer) RunJob()
func (*AbstractBatchConsumer) TopicId ¶
func (bc *AbstractBatchConsumer) TopicId() string
func (*AbstractBatchConsumer) UpdateBatchPeriod ¶
func (bc *AbstractBatchConsumer) UpdateBatchPeriod(batchPeriodSec int)
type ApiKeyBinding ¶
type AppConfig ¶
type AppConfig struct { // InstanceId ID of bulker instance. It is used for identifying Kafka consumers. // If is not set, instance id will be generated and persisted to disk (~/.bulkerapp/instance_id) and reused on next restart. // Default: random uuid InstanceId string `mapstructure:"INSTANCE_ID"` // HTTPPort port for bulker http server. Default: 3042 HTTPPort int `mapstructure:"HTTP_PORT"` // AuthTokens A list of auth tokens that authorizes user in HTTP interface separated by comma. Each token can be either: // - `${token}` un-encrypted token value // - `${salt}.${hash}` hashed token.` ${salt}` should be random string. Hash is `base64(sha512($token + $salt + TokenSecrets)`. // - Token is `[0-9a-zA-Z_\-]` (only letters, digits, underscore and dash) AuthTokens string `mapstructure:"AUTH_TOKENS"` // See AuthTokens TokenSecrets string `mapstructure:"TOKEN_SECRET"` // For ingest endpoint only GlobalHashSecret string `mapstructure:"GLOBAL_HASH_SECRET" default:"dea42a58-acf4-45af-85bb-e77e94bd5025"` // For ingest endpoint only GlobalHashSecrets []string // LogFormat log format. Can be `text` or `json`. Default: `text` LogFormat string `mapstructure:"LOG_FORMAT"` // ConfigSource source of destinations configs. Can be: // - `file://...` for destinations config in yaml format // - `redis` or `redis://redis_url` to load configs from redis `enrichedConnections` key // - `env://PREFIX` to load each destination environment variables with like `PREFIX_ID` where ID is destination id // // Default: `env://BULKER_DESTINATION` ConfigSource string `mapstructure:"CONFIG_SOURCE"` // RedisURL that will be used by default by all services that need Redis RedisURL string `mapstructure:"REDIS_URL"` RedisTLSCA string `mapstructure:"REDIS_TLS_CA"` // KubernetesNamespace namespace of bulker app. Default: `default` KubernetesNamespace string `mapstructure:"KUBERNETES_NAMESPACE" default:"default"` KubernetesClientConfig string `mapstructure:"KUBERNETES_CLIENT_CONFIG"` // KafkaBootstrapServers List of Kafka brokers separated by comma. Each broker should be in format host:port. KafkaBootstrapServers string `mapstructure:"KAFKA_BOOTSTRAP_SERVERS"` KafkaSSL bool `mapstructure:"KAFKA_SSL" default:"false"` KafkaSSLSkipVerify bool `mapstructure:"KAFKA_SSL_SKIP_VERIFY" default:"false"` //Kafka authorization as JSON object {"mechanism": "SCRAM-SHA-256|PLAIN", "username": "user", "password": "password"} KafkaSASL string `mapstructure:"KAFKA_SASL"` KafkaTopicCompression string `mapstructure:"KAFKA_TOPIC_COMPRESSION" default:"snappy"` KafkaTopicRetentionHours int `mapstructure:"KAFKA_TOPIC_RETENTION_HOURS" default:"168"` KafkaRetryTopicRetentionHours int `mapstructure:"KAFKA_RETRY_TOPIC_RETENTION_HOURS" default:"168"` KafkaRetryTopicSegmentBytes int `mapstructure:"KAFKA_RETRY_TOPIC_SEGMENT_BYTES" default:"104857600"` KafkaDeadTopicRetentionHours int `mapstructure:"KAFKA_DEAD_TOPIC_RETENTION_HOURS" default:"168"` KafkaTopicReplicationFactor int `mapstructure:"KAFKA_TOPIC_REPLICATION_FACTOR"` KafkaAdminMetadataTimeoutMs int `mapstructure:"KAFKA_ADMIN_METADATA_TIMEOUT_MS" default:"1000"` KafkaConsumerPartitionsAssigmentStrategy string `mapstructure:"KAFKA_CONSUMER_PARTITIONS_ASSIGMENT_STRATEGY" default:"cooperative-sticky"` // KafkaDestinationsTopicName destination topic for /ingest endpoint KafkaDestinationsTopicName string `mapstructure:"KAFKA_DESTINATIONS_TOPIC_NAME" default:"destination-messages"` KafkaDestinationsTopicPartitions int `mapstructure:"KAFKA_DESTINATIONS_TOPIC_PARTITIONS" default:"4"` KafkaDestinationsDeadLetterTopicName string `mapstructure:"KAFKA_DESTINATIONS_DEAD_LETTER_TOPIC_NAME" default:"destination-messages-dead-letter"` // TopicManagerRefreshPeriodSec how often topic manager will check for new topics TopicManagerRefreshPeriodSec int `mapstructure:"TOPIC_MANAGER_REFRESH_PERIOD_SEC" default:"5"` // ProducerWaitForDeliveryMs For ProduceSync only is a timeout for producer to wait for delivery report. ProducerWaitForDeliveryMs int `mapstructure:"PRODUCER_WAIT_FOR_DELIVERY_MS" default:"1000"` BatchRunnerPeriodSec int `mapstructure:"BATCH_RUNNER_DEFAULT_PERIOD_SEC" default:"300"` BatchRunnerDefaultBatchSize int `mapstructure:"BATCH_RUNNER_DEFAULT_BATCH_SIZE" default:"10000"` // BatchRunnerWaitForMessagesSec when there are no more messages in the topic BatchRunner will wait for BatchRunnerWaitForMessagesSec seconds before sending a batch BatchRunnerWaitForMessagesSec int `mapstructure:"BATCH_RUNNER_WAIT_FOR_MESSAGES_SEC" default:"1"` BatchRunnerRetryPeriodSec int `mapstructure:"BATCH_RUNNER_DEFAULT_RETRY_PERIOD_SEC" default:"300"` BatchRunnerDefaultRetryBatchFraction float64 `mapstructure:"BATCH_RUNNER_DEFAULT_RETRY_BATCH_FRACTION" default:"0.1"` MessagesRetryCount int `mapstructure:"MESSAGES_RETRY_COUNT" default:"5"` // MessagesRetryBackoffBase defines base for exponential backoff in minutes. // For example, if retry count is 3 and base is 5, then retry delays will be 5, 25, 125 minutes. // Default: 5 MessagesRetryBackoffBase float64 `mapstructure:"MESSAGES_RETRY_BACKOFF_BASE" default:"5"` // MessagesRetryBackoffMaxDelay defines maximum possible retry delay in minutes. Default: 1440 minutes = 24 hours MessagesRetryBackoffMaxDelay float64 `mapstructure:"MESSAGES_RETRY_BACKOFF_MAX_DELAY" default:"1440"` EventsLogRedisURL string `mapstructure:"EVENTS_LOG_REDIS_URL"` EventsLogMaxSize int `mapstructure:"EVENTS_LOG_MAX_SIZE" default:"1000"` MetricsPort int `mapstructure:"METRICS_PORT" default:"9091"` MetricsRelayDestination string `mapstructure:"METRICS_RELAY_DESTINATION"` MetricsRelayPeriodSec int `mapstructure:"METRICS_RELAY_PERIOD_SEC" default:"60"` //Timeout that give running batch tasks time to finish during shutdown. ShutdownTimeoutSec int `mapstructure:"SHUTDOWN_TIMEOUT_SEC" default:"10"` //Extra delay may be needed. E.g. for metric scrapper to scrape final metrics. So http server will stay active for an extra period. ShutdownExtraDelay int `mapstructure:"SHUTDOWN_EXTRA_DELAY_SEC"` }
AppConfig is a struct for bulker app configuration It is loaded from `bulker.env` config file or environment variables.
Environment variables requires prefix `BULKER_`
func InitAppConfig ¶
func (*AppConfig) GetKafkaConfig ¶
GetKafkaConfig returns kafka config
type AppContext ¶
type AppContext struct {
// contains filtered or unexported fields
}
func InitAppContext ¶
func InitAppContext() *AppContext
func (*AppContext) Shutdown ¶
func (a *AppContext) Shutdown()
type BatchConsumer ¶
type BatchConsumer interface { RunJob() ConsumeAll() (consumed BatchCounters, err error) Retire() BatchPeriodSec() int UpdateBatchPeriod(batchPeriodSec int) TopicId() string }
type BatchConsumerImpl ¶
type BatchConsumerImpl struct { *AbstractBatchConsumer // contains filtered or unexported fields }
func NewBatchConsumer ¶
func NewBatchConsumer(repository *Repository, destinationId string, batchPeriodSec int, topicId string, config *AppConfig, kafkaConfig *kafka.ConfigMap, eventsLogService EventsLogService) (*BatchConsumerImpl, error)
type BatchCounters ¶
type BatchCounters struct {
// contains filtered or unexported fields
}
type BatchFunction ¶
type BatchFunction func(destination *Destination, batchNum, batchSize, retryBatchSize int) (counters BatchCounters, nextBatch bool, err error)
type BatchState ¶
type ConfigurationSource ¶
type ConfigurationSource interface { io.Closer GetDestinationConfigs() []*DestinationConfig GetDestinationConfig(id string) *DestinationConfig ChangesChannel() <-chan bool }
func InitConfigurationSource ¶
func InitConfigurationSource(config *AppConfig) (ConfigurationSource, error)
type Cron ¶
type Cron struct { objects.ServiceBase // contains filtered or unexported fields }
func (*Cron) AddBatchConsumer ¶
func (c *Cron) AddBatchConsumer(batchConsumer BatchConsumer) (*gocron.Job, error)
func (*Cron) RemoveBatchConsumer ¶
func (c *Cron) RemoveBatchConsumer(batchConsumer BatchConsumer) error
func (*Cron) ReplaceBatchConsumer ¶
func (c *Cron) ReplaceBatchConsumer(batchConsumer BatchConsumer) (*gocron.Job, error)
type DataLayout ¶
type DataLayout string
type Destination ¶
func (*Destination) Lease ¶
func (d *Destination) Lease()
Lease destination. destination cannot be closed while at lease one service is using it (e.g. batch consumer)
func (*Destination) Mode ¶
func (d *Destination) Mode() bulker.BulkMode
Mode returns destination mode
type DestinationConfig ¶
type DestinationConfig struct { UpdatedAt time.Time `mapstructure:"updatedAt" json:"updatedAt"` UsesBulker bool `mapstructure:"usesBulker" json:"usesBulker"` bulker.Config `mapstructure:",squash"` bulker.StreamConfig `mapstructure:",squash"` }
func (*DestinationConfig) Id ¶
func (dc *DestinationConfig) Id() string
type DummyEventsLogService ¶
type DummyEventsLogService struct{}
func (*DummyEventsLogService) Close ¶
func (d *DummyEventsLogService) Close() error
func (*DummyEventsLogService) GetEvents ¶
func (d *DummyEventsLogService) GetEvents(eventType EventType, actorId string, filter *EventsLogFilter, limit int) ([]EventsLogRecord, error)
func (*DummyEventsLogService) PostEvent ¶
func (d *DummyEventsLogService) PostEvent(eventType EventType, actorId string, event any) (id EventsLogRecordId, err error)
type EnvConfigurationSource ¶
type EnvConfigurationSource struct { objects.ServiceBase // contains filtered or unexported fields }
func NewEnvConfigurationSource ¶
func NewEnvConfigurationSource(prefix string) *EnvConfigurationSource
func (*EnvConfigurationSource) ChangesChannel ¶
func (ecs *EnvConfigurationSource) ChangesChannel() <-chan bool
func (*EnvConfigurationSource) Close ¶
func (ecs *EnvConfigurationSource) Close() error
func (*EnvConfigurationSource) GetDestinationConfig ¶
func (ecs *EnvConfigurationSource) GetDestinationConfig(id string) *DestinationConfig
func (*EnvConfigurationSource) GetDestinationConfigs ¶
func (ecs *EnvConfigurationSource) GetDestinationConfigs() []*DestinationConfig
func (*EnvConfigurationSource) GetValue ¶
func (ecs *EnvConfigurationSource) GetValue(key string) any
type EventStatus ¶
type EventStatus string
type EventType ¶
type EventType string
const ( EventTypeIncomingAll EventType = "incoming.all" EventTypeIncomingError EventType = "incoming.error" EventTypeProcessedAll EventType = "bulker_stream.all" EventTypeProcessedError EventType = "bulker_stream.error" EventTypeBatchAll EventType = "bulker_batch.all" EventTypeBatchError EventType = "bulker_batch.error" )
type EventsLogFilter ¶
type EventsLogFilter struct { Start time.Time End time.Time BeforeId EventsLogRecordId Filter func(event any) bool }
func (*EventsLogFilter) GetStartAndEndIds ¶
func (f *EventsLogFilter) GetStartAndEndIds() (start, end string, err error)
GetStartAndEndIds returns end and start ids for the stream
type EventsLogRecord ¶
type EventsLogRecord struct { Id EventsLogRecordId `json:"id"` Date time.Time `json:"date"` Content any `json:"content"` }
type EventsLogRecordId ¶
type EventsLogRecordId string
type EventsLogService ¶
type EventsLogService interface { io.Closer // PostEvent posts event to the events log // actorId – id of entity of event origin. E.g. for 'incoming' event - id of site, for 'processed' event - id of destination PostEvent(eventType EventType, actorId string, event any) (id EventsLogRecordId, err error) GetEvents(eventType EventType, actorId string, filter *EventsLogFilter, limit int) ([]EventsLogRecord, error) }
type FastStore ¶
type FastStore struct { objects.ServiceBase // contains filtered or unexported fields }
func NewFastStore ¶
func (*FastStore) GetStreamById ¶
func (fs *FastStore) GetStreamById(slug string) (*StreamWithDestinations, error)
func (*FastStore) GetStreamsByDomain ¶
func (fs *FastStore) GetStreamsByDomain(domain string) ([]StreamWithDestinations, error)
type IngestMessage ¶
type IngestMessage struct { IngestType string `json:"ingestType"` ConnectionId string `json:"connectionId"` MessageCreated time.Time `json:"messageCreated"` WriteKey string `json:"writeKey"` MessageId string `json:"messageId"` Type string `json:"type"` Origin IngestMessageOrigin `json:"origin"` HttpHeaders map[string]string `json:"httpHeaders"` HttpPayload map[string]any `json:"httpPayload"` }
type IngestMessageOrigin ¶
type JobRunner ¶
type JobRunner struct { objects.ServiceBase // contains filtered or unexported fields }
func NewJobRunner ¶
func NewJobRunner(appContext *AppContext) (*JobRunner, error)
func (*JobRunner) SpecHandler ¶
type MetricsRelay ¶
type MetricsRelay struct { objects.ServiceBase // contains filtered or unexported fields }
func NewMetricsRelay ¶
func NewMetricsRelay(appConfig *AppConfig) (*MetricsRelay, error)
type MetricsServer ¶
type MetricsServer struct { objects.ServiceBase // contains filtered or unexported fields }
func NewMetricsServer ¶
func NewMetricsServer(appconfig *AppConfig) *MetricsServer
func (*MetricsServer) Stop ¶
func (s *MetricsServer) Stop() error
type MultiConfigurationSource ¶
type MultiConfigurationSource struct {
// contains filtered or unexported fields
}
func NewMultiConfigurationSource ¶
func NewMultiConfigurationSource(configurationSources []ConfigurationSource) *MultiConfigurationSource
func (*MultiConfigurationSource) ChangesChannel ¶
func (mcs *MultiConfigurationSource) ChangesChannel() <-chan bool
func (*MultiConfigurationSource) Close ¶
func (mcs *MultiConfigurationSource) Close() error
func (*MultiConfigurationSource) GetDestinationConfig ¶
func (mcs *MultiConfigurationSource) GetDestinationConfig(id string) *DestinationConfig
func (*MultiConfigurationSource) GetDestinationConfigs ¶
func (mcs *MultiConfigurationSource) GetDestinationConfigs() []*DestinationConfig
type Producer ¶
type Producer struct { objects.ServiceBase // contains filtered or unexported fields }
func NewProducer ¶
NewProducer creates new Producer
func (*Producer) ProduceAsync ¶
ProduceAsync TODO: transactional delivery? produces messages to kafka
func (*Producer) ProduceSync ¶
ProduceSync TODO: transactional delivery? produces messages to kafka
type RedisConfigurationSource ¶
type RedisConfigurationSource struct { objects.ServiceBase sync.Mutex // contains filtered or unexported fields }
func NewRedisConfigurationSource ¶
func NewRedisConfigurationSource(appconfig *AppConfig) (*RedisConfigurationSource, error)
func (*RedisConfigurationSource) ChangesChannel ¶
func (rcs *RedisConfigurationSource) ChangesChannel() <-chan bool
func (*RedisConfigurationSource) Close ¶
func (rcs *RedisConfigurationSource) Close() error
func (*RedisConfigurationSource) GetDestinationConfig ¶
func (rcs *RedisConfigurationSource) GetDestinationConfig(id string) *DestinationConfig
func (*RedisConfigurationSource) GetDestinationConfigs ¶
func (rcs *RedisConfigurationSource) GetDestinationConfigs() []*DestinationConfig
func (*RedisConfigurationSource) GetValue ¶
func (rcs *RedisConfigurationSource) GetValue(key string) any
type RedisEventsLog ¶
type RedisEventsLog struct { objects.ServiceBase // contains filtered or unexported fields }
func NewRedisEventsLog ¶
func NewRedisEventsLog(config *AppConfig, redisUrl string) (*RedisEventsLog, error)
func (*RedisEventsLog) Close ¶
func (r *RedisEventsLog) Close() error
func (*RedisEventsLog) GetEvents ¶
func (r *RedisEventsLog) GetEvents(eventType EventType, actorId string, filter *EventsLogFilter, limit int) ([]EventsLogRecord, error)
func (*RedisEventsLog) PostEvent ¶
func (r *RedisEventsLog) PostEvent(eventType EventType, actorId string, event any) (id EventsLogRecordId, err error)
type Repository ¶
type Repository struct { objects.ServiceBase sync.Mutex // contains filtered or unexported fields }
func NewRepository ¶
func NewRepository(config *AppConfig, configurationSource ConfigurationSource) (*Repository, error)
func (*Repository) ChangesChannel ¶
func (r *Repository) ChangesChannel() <-chan RepositoryChange
func (*Repository) GetDestination ¶
func (r *Repository) GetDestination(id string) *Destination
func (*Repository) GetDestinations ¶
func (r *Repository) GetDestinations() []*Destination
func (*Repository) LeaseDestination ¶
func (r *Repository) LeaseDestination(id string) *Destination
LeaseDestination destination. destination cannot be closed while at lease one service is using it (e.g. batch consumer)
type RepositoryChange ¶
type RepositoryChange struct { AddedDestinations []*Destination ChangedDestinations []*Destination RemovedDestinationIds []string }
type RetryConsumer ¶
type RetryConsumer struct {
*AbstractBatchConsumer
}
func NewRetryConsumer ¶
func NewRetryConsumer(repository *Repository, destinationId string, batchPeriodSec int, topicId string, config *AppConfig, kafkaConfig *kafka.ConfigMap) (*RetryConsumer, error)
type Router ¶
type Router struct { objects.ServiceBase // contains filtered or unexported fields }
func NewRouter ¶
func NewRouter(appContext *AppContext, jobRunner *JobRunner) *Router
func (*Router) AuthMiddleware ¶
func (*Router) BulkHandler ¶
func (*Router) EventsHandler ¶
func (*Router) EventsLogHandler ¶
EventsLogHandler - gets events log by EventType, actor id. Filtered by date range and cursorId
func (*Router) FailedHandler ¶
func (*Router) IngestHandler ¶
func (*Router) ResponseError ¶
func (*Router) TestConnectionHandler ¶
type RouterError ¶
type ShortDestinationConfig ¶
type ShortDestinationConfig struct { TagDestinationConfig Id string `json:"id"` ConnectionId string `json:"connectionId"` DestinationType string `json:"destinationType"` }
type StreamConfig ¶
type StreamConfig struct { Id string `json:"id"` Type string `json:"type"` WorkspaceId string `json:"workspaceId"` Slug string `json:"slug"` Name string `json:"name"` Domains []string `json:"domains"` AuthorizedJavaScriptDomains string `json:"authorizedJavaScriptDomains"` PublicKeys []ApiKey `json:"publicKeys"` PrivateKeys []ApiKey `json:"privateKeys""` DataLayout string `json:"dataLayout"` }
type StreamConsumer ¶
type StreamConsumer struct { objects.ServiceBase // contains filtered or unexported fields }
func NewStreamConsumer ¶
func NewStreamConsumer(repository *Repository, destination *Destination, topicId string, config *AppConfig, kafkaConfig *kafka.ConfigMap, bulkerProducer *Producer, eventsLogService EventsLogService) (*StreamConsumer, error)
func (*StreamConsumer) UpdateDestination ¶
func (sc *StreamConsumer) UpdateDestination(destination *Destination) error
UpdateDestination
type StreamWithDestinations ¶
type StreamWithDestinations struct { Stream StreamConfig `json:"stream"` SynchronousDestinations []ShortDestinationConfig `json:"synchronousDestinations"` AsynchronousDestinations []ShortDestinationConfig `json:"asynchronousDestinations"` }
type TagDestinationConfig ¶
type TopicManager ¶
type TopicManager struct { objects.ServiceBase sync.Mutex // contains filtered or unexported fields }
func NewTopicManager ¶
func NewTopicManager(appContext *AppContext) (*TopicManager, error)
NewTopicManager returns TopicManager
func (*TopicManager) Close ¶
func (tm *TopicManager) Close() error
func (*TopicManager) EnsureDestinationTopic ¶
func (tm *TopicManager) EnsureDestinationTopic(destination *Destination, topicId string) error
EnsureDestinationTopic creates destination topic if it doesn't exist
func (*TopicManager) IsReady ¶
func (tm *TopicManager) IsReady() bool
IsReady returns true if topic manager is ready to serve requests
func (*TopicManager) LoadMetadata ¶
func (tm *TopicManager) LoadMetadata()
func (*TopicManager) Refresh ¶
func (tm *TopicManager) Refresh()
type YamlConfigurationSource ¶
type YamlConfigurationSource struct { objects.ServiceBase // contains filtered or unexported fields }
func NewYamlConfigurationSource ¶
func NewYamlConfigurationSource(data []byte) (*YamlConfigurationSource, error)
func (*YamlConfigurationSource) ChangesChannel ¶
func (ycp *YamlConfigurationSource) ChangesChannel() <-chan bool
func (*YamlConfigurationSource) Close ¶
func (ycp *YamlConfigurationSource) Close() error
func (*YamlConfigurationSource) GetDestinationConfig ¶
func (ycp *YamlConfigurationSource) GetDestinationConfig(id string) *DestinationConfig
func (*YamlConfigurationSource) GetDestinationConfigs ¶
func (ycp *YamlConfigurationSource) GetDestinationConfigs() []*DestinationConfig
func (*YamlConfigurationSource) GetValue ¶
func (ycp *YamlConfigurationSource) GetValue(key string) any
Source Files ¶
- abstract_batch_consumer.go
- app.go
- app_config.go
- batch_consumer.go
- configuration_source.go
- cron.go
- events_log.go
- fast_store.go
- job_runner.go
- metrics_relay.go
- metrics_server.go
- multi_configuration_source.go
- producer.go
- redis_configuration_source.go
- repository.go
- retry_consumer.go
- router.go
- stream_consumer.go
- topic_manager.go
- utils.go