target

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2022 License: AGPL-3.0 Imports: 43 Imported by: 0

Documentation

Index

Constants

View Source
// AMQP target constants: configuration key names first, then the
// MINIO_NOTIFY_AMQP_* names (presumably environment variables, per the Env prefix).
const (
	// Keys for the event queue directory and limit.
	AmqpQueueDir   = "queue_dir"
	AmqpQueueLimit = "queue_limit"

	// Keys for the broker URL, exchange and publishing options.
	AmqpURL               = "url"
	AmqpExchange          = "exchange"
	AmqpRoutingKey        = "routing_key"
	AmqpExchangeType      = "exchange_type"
	AmqpDeliveryMode      = "delivery_mode"
	AmqpMandatory         = "mandatory"
	AmqpImmediate         = "immediate"
	AmqpDurable           = "durable"
	AmqpInternal          = "internal"
	AmqpNoWait            = "no_wait"
	AmqpAutoDeleted       = "auto_deleted"
	AmqpArguments         = "arguments"
	AmqpPublisherConfirms = "publisher_confirms"

	// Env* names; EnvAMQPEnable is the only one without a matching key above.
	EnvAMQPEnable            = "MINIO_NOTIFY_AMQP_ENABLE"
	EnvAMQPURL               = "MINIO_NOTIFY_AMQP_URL"
	EnvAMQPExchange          = "MINIO_NOTIFY_AMQP_EXCHANGE"
	EnvAMQPRoutingKey        = "MINIO_NOTIFY_AMQP_ROUTING_KEY"
	EnvAMQPExchangeType      = "MINIO_NOTIFY_AMQP_EXCHANGE_TYPE"
	EnvAMQPDeliveryMode      = "MINIO_NOTIFY_AMQP_DELIVERY_MODE"
	EnvAMQPMandatory         = "MINIO_NOTIFY_AMQP_MANDATORY"
	EnvAMQPImmediate         = "MINIO_NOTIFY_AMQP_IMMEDIATE"
	EnvAMQPDurable           = "MINIO_NOTIFY_AMQP_DURABLE"
	EnvAMQPInternal          = "MINIO_NOTIFY_AMQP_INTERNAL"
	EnvAMQPNoWait            = "MINIO_NOTIFY_AMQP_NO_WAIT"
	EnvAMQPAutoDeleted       = "MINIO_NOTIFY_AMQP_AUTO_DELETED"
	EnvAMQPArguments         = "MINIO_NOTIFY_AMQP_ARGUMENTS"
	// NOTE(review): the value reads PUBLISHING_CONFIRMS while the key is
	// "publisher_confirms". It is a runtime string, so confirm the mismatch is
	// intentional before renaming anything.
	EnvAMQPPublisherConfirms = "MINIO_NOTIFY_AMQP_PUBLISHING_CONFIRMS"
	EnvAMQPQueueDir          = "MINIO_NOTIFY_AMQP_QUEUE_DIR"
	EnvAMQPQueueLimit        = "MINIO_NOTIFY_AMQP_QUEUE_LIMIT"
)

AMQP input constants.

View Source
// Elasticsearch target constants: configuration key names first, then the
// MINIO_NOTIFY_ELASTICSEARCH_* names (presumably environment variables, per the Env prefix).
const (
	// Configuration key names.
	ElasticFormat     = "format"
	ElasticURL        = "url"
	ElasticIndex      = "index"
	ElasticQueueDir   = "queue_dir"
	ElasticQueueLimit = "queue_limit"
	ElasticUsername   = "username"
	ElasticPassword   = "password"

	// Env* names; EnvElasticEnable is the only one without a matching key above.
	EnvElasticEnable     = "MINIO_NOTIFY_ELASTICSEARCH_ENABLE"
	EnvElasticFormat     = "MINIO_NOTIFY_ELASTICSEARCH_FORMAT"
	EnvElasticURL        = "MINIO_NOTIFY_ELASTICSEARCH_URL"
	EnvElasticIndex      = "MINIO_NOTIFY_ELASTICSEARCH_INDEX"
	EnvElasticQueueDir   = "MINIO_NOTIFY_ELASTICSEARCH_QUEUE_DIR"
	EnvElasticQueueLimit = "MINIO_NOTIFY_ELASTICSEARCH_QUEUE_LIMIT"
	EnvElasticUsername   = "MINIO_NOTIFY_ELASTICSEARCH_USERNAME"
	EnvElasticPassword   = "MINIO_NOTIFY_ELASTICSEARCH_PASSWORD"
)

Elastic constants

View Source
// Kafka target constants: configuration key names first, then the
// MINIO_NOTIFY_KAFKA_* names (presumably environment variables, per the Env prefix).
const (
	// Configuration key names.
	KafkaBrokers       = "brokers"
	KafkaTopic         = "topic"
	KafkaQueueDir      = "queue_dir"
	KafkaQueueLimit    = "queue_limit"
	KafkaTLS           = "tls"
	KafkaTLSSkipVerify = "tls_skip_verify"
	KafkaTLSClientAuth = "tls_client_auth"
	KafkaSASL          = "sasl"
	KafkaSASLUsername  = "sasl_username"
	KafkaSASLPassword  = "sasl_password"
	KafkaSASLMechanism = "sasl_mechanism"
	KafkaClientTLSCert = "client_tls_cert"
	KafkaClientTLSKey  = "client_tls_key"
	KafkaVersion       = "version"

	// Env* names; EnvKafkaEnable is the only one without a matching key above.
	EnvKafkaEnable        = "MINIO_NOTIFY_KAFKA_ENABLE"
	EnvKafkaBrokers       = "MINIO_NOTIFY_KAFKA_BROKERS"
	EnvKafkaTopic         = "MINIO_NOTIFY_KAFKA_TOPIC"
	EnvKafkaQueueDir      = "MINIO_NOTIFY_KAFKA_QUEUE_DIR"
	EnvKafkaQueueLimit    = "MINIO_NOTIFY_KAFKA_QUEUE_LIMIT"
	EnvKafkaTLS           = "MINIO_NOTIFY_KAFKA_TLS"
	EnvKafkaTLSSkipVerify = "MINIO_NOTIFY_KAFKA_TLS_SKIP_VERIFY"
	EnvKafkaTLSClientAuth = "MINIO_NOTIFY_KAFKA_TLS_CLIENT_AUTH"
	// Named ...SASLEnable although its value ends in _SASL; by value it lines
	// up with the KafkaSASL key.
	EnvKafkaSASLEnable    = "MINIO_NOTIFY_KAFKA_SASL"
	EnvKafkaSASLUsername  = "MINIO_NOTIFY_KAFKA_SASL_USERNAME"
	EnvKafkaSASLPassword  = "MINIO_NOTIFY_KAFKA_SASL_PASSWORD"
	EnvKafkaSASLMechanism = "MINIO_NOTIFY_KAFKA_SASL_MECHANISM"
	EnvKafkaClientTLSCert = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_CERT"
	EnvKafkaClientTLSKey  = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY"
	EnvKafkaVersion       = "MINIO_NOTIFY_KAFKA_VERSION"
)

Kafka input constants

View Source
// MQTT target constants: configuration key names first, then the
// MINIO_NOTIFY_MQTT_* names (presumably environment variables, per the Env prefix).
const (
	// Configuration key names.
	MqttBroker            = "broker"
	MqttTopic             = "topic"
	MqttQoS               = "qos"
	MqttUsername          = "username"
	MqttPassword          = "password"
	MqttReconnectInterval = "reconnect_interval"
	MqttKeepAliveInterval = "keep_alive_interval"
	MqttQueueDir          = "queue_dir"
	MqttQueueLimit        = "queue_limit"

	// Env* names; EnvMQTTEnable is the only one without a matching key above.
	EnvMQTTEnable            = "MINIO_NOTIFY_MQTT_ENABLE"
	EnvMQTTBroker            = "MINIO_NOTIFY_MQTT_BROKER"
	EnvMQTTTopic             = "MINIO_NOTIFY_MQTT_TOPIC"
	EnvMQTTQoS               = "MINIO_NOTIFY_MQTT_QOS"
	EnvMQTTUsername          = "MINIO_NOTIFY_MQTT_USERNAME"
	EnvMQTTPassword          = "MINIO_NOTIFY_MQTT_PASSWORD"
	EnvMQTTReconnectInterval = "MINIO_NOTIFY_MQTT_RECONNECT_INTERVAL"
	EnvMQTTKeepAliveInterval = "MINIO_NOTIFY_MQTT_KEEP_ALIVE_INTERVAL"
	EnvMQTTQueueDir          = "MINIO_NOTIFY_MQTT_QUEUE_DIR"
	EnvMQTTQueueLimit        = "MINIO_NOTIFY_MQTT_QUEUE_LIMIT"
)

MQTT input constants

View Source
// MySQL target constants: configuration key names first, then the
// MINIO_NOTIFY_MYSQL_* names (presumably environment variables, per the Env prefix).
const (
	// Configuration key names.
	MySQLFormat             = "format"
	MySQLDSNString          = "dsn_string"
	MySQLTable              = "table"
	MySQLHost               = "host"
	MySQLPort               = "port"
	MySQLUsername           = "username"
	MySQLPassword           = "password"
	MySQLDatabase           = "database"
	MySQLQueueLimit         = "queue_limit"
	MySQLQueueDir           = "queue_dir"
	MySQLMaxOpenConnections = "max_open_connections"

	// Env* names; EnvMySQLEnable is the only one without a matching key above.
	EnvMySQLEnable             = "MINIO_NOTIFY_MYSQL_ENABLE"
	EnvMySQLFormat             = "MINIO_NOTIFY_MYSQL_FORMAT"
	EnvMySQLDSNString          = "MINIO_NOTIFY_MYSQL_DSN_STRING"
	EnvMySQLTable              = "MINIO_NOTIFY_MYSQL_TABLE"
	EnvMySQLHost               = "MINIO_NOTIFY_MYSQL_HOST"
	EnvMySQLPort               = "MINIO_NOTIFY_MYSQL_PORT"
	EnvMySQLUsername           = "MINIO_NOTIFY_MYSQL_USERNAME"
	EnvMySQLPassword           = "MINIO_NOTIFY_MYSQL_PASSWORD"
	EnvMySQLDatabase           = "MINIO_NOTIFY_MYSQL_DATABASE"
	EnvMySQLQueueLimit         = "MINIO_NOTIFY_MYSQL_QUEUE_LIMIT"
	EnvMySQLQueueDir           = "MINIO_NOTIFY_MYSQL_QUEUE_DIR"
	EnvMySQLMaxOpenConnections = "MINIO_NOTIFY_MYSQL_MAX_OPEN_CONNECTIONS"
)

MySQL related constants

View Source
// NATS target constants: configuration key names first, then the
// MINIO_NOTIFY_NATS_* names (presumably environment variables, per the Env prefix).
// Each half ends with a separate group of streaming-related entries.
const (
	// Configuration key names.
	NATSAddress       = "address"
	NATSSubject       = "subject"
	NATSUsername      = "username"
	NATSPassword      = "password"
	NATSToken         = "token"
	NATSTLS           = "tls"
	NATSTLSSkipVerify = "tls_skip_verify"
	NATSPingInterval  = "ping_interval"
	NATSQueueDir      = "queue_dir"
	NATSQueueLimit    = "queue_limit"
	NATSCertAuthority = "cert_authority"
	NATSClientCert    = "client_cert"
	NATSClientKey     = "client_key"

	// Streaming constants
	NATSStreaming                   = "streaming"
	NATSStreamingClusterID          = "streaming_cluster_id"
	NATSStreamingAsync              = "streaming_async"
	NATSStreamingMaxPubAcksInFlight = "streaming_max_pub_acks_in_flight"

	// Env* names; EnvNATSEnable is the only one without a matching key above.
	EnvNATSEnable        = "MINIO_NOTIFY_NATS_ENABLE"
	EnvNATSAddress       = "MINIO_NOTIFY_NATS_ADDRESS"
	EnvNATSSubject       = "MINIO_NOTIFY_NATS_SUBJECT"
	EnvNATSUsername      = "MINIO_NOTIFY_NATS_USERNAME"
	EnvNATSPassword      = "MINIO_NOTIFY_NATS_PASSWORD"
	EnvNATSToken         = "MINIO_NOTIFY_NATS_TOKEN"
	EnvNATSTLS           = "MINIO_NOTIFY_NATS_TLS"
	EnvNATSTLSSkipVerify = "MINIO_NOTIFY_NATS_TLS_SKIP_VERIFY"
	EnvNATSPingInterval  = "MINIO_NOTIFY_NATS_PING_INTERVAL"
	EnvNATSQueueDir      = "MINIO_NOTIFY_NATS_QUEUE_DIR"
	EnvNATSQueueLimit    = "MINIO_NOTIFY_NATS_QUEUE_LIMIT"
	EnvNATSCertAuthority = "MINIO_NOTIFY_NATS_CERT_AUTHORITY"
	EnvNATSClientCert    = "MINIO_NOTIFY_NATS_CLIENT_CERT"
	EnvNATSClientKey     = "MINIO_NOTIFY_NATS_CLIENT_KEY"

	// Streaming constants
	EnvNATSStreaming                   = "MINIO_NOTIFY_NATS_STREAMING"
	EnvNATSStreamingClusterID          = "MINIO_NOTIFY_NATS_STREAMING_CLUSTER_ID"
	EnvNATSStreamingAsync              = "MINIO_NOTIFY_NATS_STREAMING_ASYNC"
	EnvNATSStreamingMaxPubAcksInFlight = "MINIO_NOTIFY_NATS_STREAMING_MAX_PUB_ACKS_IN_FLIGHT"
)

NATS related constants

View Source
// NSQ target constants: configuration key names first, then the
// MINIO_NOTIFY_NSQ_* names (presumably environment variables, per the Env prefix).
const (
	// Configuration key names.
	NSQAddress       = "nsqd_address"
	NSQTopic         = "topic"
	NSQTLS           = "tls"
	NSQTLSSkipVerify = "tls_skip_verify"
	NSQQueueDir      = "queue_dir"
	NSQQueueLimit    = "queue_limit"

	// Env* names; EnvNSQEnable is the only one without a matching key above.
	EnvNSQEnable        = "MINIO_NOTIFY_NSQ_ENABLE"
	EnvNSQAddress       = "MINIO_NOTIFY_NSQ_NSQD_ADDRESS"
	EnvNSQTopic         = "MINIO_NOTIFY_NSQ_TOPIC"
	EnvNSQTLS           = "MINIO_NOTIFY_NSQ_TLS"
	EnvNSQTLSSkipVerify = "MINIO_NOTIFY_NSQ_TLS_SKIP_VERIFY"
	EnvNSQQueueDir      = "MINIO_NOTIFY_NSQ_QUEUE_DIR"
	EnvNSQQueueLimit    = "MINIO_NOTIFY_NSQ_QUEUE_LIMIT"
)

NSQ constants

View Source
// PostgreSQL target constants: configuration key names first, then the
// MINIO_NOTIFY_POSTGRES_* names (presumably environment variables, per the Env prefix).
const (
	// Configuration key names.
	PostgresFormat             = "format"
	PostgresConnectionString   = "connection_string"
	PostgresTable              = "table"
	PostgresHost               = "host"
	PostgresPort               = "port"
	PostgresUsername           = "username"
	PostgresPassword           = "password"
	PostgresDatabase           = "database"
	PostgresQueueDir           = "queue_dir"
	PostgresQueueLimit         = "queue_limit"
	PostgresMaxOpenConnections = "max_open_connections"

	// Env* names; EnvPostgresEnable is the only one without a matching key above.
	EnvPostgresEnable             = "MINIO_NOTIFY_POSTGRES_ENABLE"
	EnvPostgresFormat             = "MINIO_NOTIFY_POSTGRES_FORMAT"
	EnvPostgresConnectionString   = "MINIO_NOTIFY_POSTGRES_CONNECTION_STRING"
	EnvPostgresTable              = "MINIO_NOTIFY_POSTGRES_TABLE"
	EnvPostgresHost               = "MINIO_NOTIFY_POSTGRES_HOST"
	EnvPostgresPort               = "MINIO_NOTIFY_POSTGRES_PORT"
	EnvPostgresUsername           = "MINIO_NOTIFY_POSTGRES_USERNAME"
	EnvPostgresPassword           = "MINIO_NOTIFY_POSTGRES_PASSWORD"
	EnvPostgresDatabase           = "MINIO_NOTIFY_POSTGRES_DATABASE"
	EnvPostgresQueueDir           = "MINIO_NOTIFY_POSTGRES_QUEUE_DIR"
	EnvPostgresQueueLimit         = "MINIO_NOTIFY_POSTGRES_QUEUE_LIMIT"
	EnvPostgresMaxOpenConnections = "MINIO_NOTIFY_POSTGRES_MAX_OPEN_CONNECTIONS"
)

Postgres constants

View Source
// Redis target constants: configuration key names first, then the
// MINIO_NOTIFY_REDIS_* names (presumably environment variables, per the Env prefix).
const (
	// Configuration key names.
	RedisFormat     = "format"
	RedisAddress    = "address"
	RedisPassword   = "password"
	RedisKey        = "key"
	RedisQueueDir   = "queue_dir"
	RedisQueueLimit = "queue_limit"

	// Env* names; EnvRedisEnable is the only one without a matching key above.
	EnvRedisEnable     = "MINIO_NOTIFY_REDIS_ENABLE"
	EnvRedisFormat     = "MINIO_NOTIFY_REDIS_FORMAT"
	EnvRedisAddress    = "MINIO_NOTIFY_REDIS_ADDRESS"
	EnvRedisPassword   = "MINIO_NOTIFY_REDIS_PASSWORD"
	EnvRedisKey        = "MINIO_NOTIFY_REDIS_KEY"
	EnvRedisQueueDir   = "MINIO_NOTIFY_REDIS_QUEUE_DIR"
	EnvRedisQueueLimit = "MINIO_NOTIFY_REDIS_QUEUE_LIMIT"
)

Redis constants

View Source
// Webhook target constants: configuration key names first, then the
// MINIO_NOTIFY_WEBHOOK_* names (presumably environment variables, per the Env prefix).
const (
	// Configuration key names.
	WebhookEndpoint   = "endpoint"
	WebhookAuthToken  = "auth_token"
	WebhookQueueDir   = "queue_dir"
	WebhookQueueLimit = "queue_limit"
	WebhookClientCert = "client_cert"
	WebhookClientKey  = "client_key"

	// Env* names; EnvWebhookEnable is the only one without a matching key above.
	EnvWebhookEnable     = "MINIO_NOTIFY_WEBHOOK_ENABLE"
	EnvWebhookEndpoint   = "MINIO_NOTIFY_WEBHOOK_ENDPOINT"
	EnvWebhookAuthToken  = "MINIO_NOTIFY_WEBHOOK_AUTH_TOKEN"
	EnvWebhookQueueDir   = "MINIO_NOTIFY_WEBHOOK_QUEUE_DIR"
	EnvWebhookQueueLimit = "MINIO_NOTIFY_WEBHOOK_QUEUE_LIMIT"
	EnvWebhookClientCert = "MINIO_NOTIFY_WEBHOOK_CLIENT_CERT"
	EnvWebhookClientKey  = "MINIO_NOTIFY_WEBHOOK_CLIENT_KEY"
)

Webhook constants

Variables

KafkaSHA256 is a function that returns a crypto/sha256 hasher and should be used to create Client objects configured for SHA-256 hashing.

KafkaSHA512 is a function that returns a crypto/sha512 hasher and should be used to create Client objects configured for SHA-512 hashing.

Functions

func IsConnErr

func IsConnErr(err error) bool

IsConnErr - To detect a connection error.

func IsConnRefusedErr

func IsConnRefusedErr(err error) bool

IsConnRefusedErr - To check for a "connection refused" error.

func IsConnResetErr

func IsConnResetErr(err error) bool

IsConnResetErr - Checks for connection reset errors.

Types

type AMQPArgs

// AMQPArgs holds the AMQP target arguments. Every field is serialized to
// JSON under the camelCase name given in its tag.
//
// NOTE(review): the AMQP constants define an "arguments" key (AmqpArguments /
// EnvAMQPArguments), but no field here corresponds to it — confirm whether
// that setting is handled elsewhere or is unused.
type AMQPArgs struct {
	Enable            bool     `json:"enable"`
	URL               xnet.URL `json:"url"`
	Exchange          string   `json:"exchange"`
	RoutingKey        string   `json:"routingKey"`
	ExchangeType      string   `json:"exchangeType"`
	DeliveryMode      uint8    `json:"deliveryMode"`
	Mandatory         bool     `json:"mandatory"`
	Immediate         bool     `json:"immediate"`
	Durable           bool     `json:"durable"`
	Internal          bool     `json:"internal"`
	NoWait            bool     `json:"noWait"`
	AutoDeleted       bool     `json:"autoDeleted"`
	PublisherConfirms bool     `json:"publisherConfirms"`
	QueueDir          string   `json:"queueDir"`
	QueueLimit        uint64   `json:"queueLimit"`
}

AMQPArgs - AMQP target arguments.

func (*AMQPArgs) Validate

func (a *AMQPArgs) Validate() error

Validate AMQP arguments

type AMQPTarget

type AMQPTarget struct {
	// contains filtered or unexported fields
}

AMQPTarget - AMQP target

func NewAMQPTarget

func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}), test bool) (*AMQPTarget, error)

NewAMQPTarget - creates new AMQP target.

func (*AMQPTarget) Close

func (target *AMQPTarget) Close() error

Close - does nothing; it is available for interface compatibility.

func (*AMQPTarget) HasQueueStore

func (target *AMQPTarget) HasQueueStore() bool

HasQueueStore - Checks if the queueStore has been configured for the target

func (*AMQPTarget) ID

func (target *AMQPTarget) ID() event.TargetID

ID - returns TargetID.

func (*AMQPTarget) IsActive

func (target *AMQPTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*AMQPTarget) Save

func (target *AMQPTarget) Save(eventData event.Event) error

Save - saves the events to the store which will be replayed when the amqp connection is active.

func (*AMQPTarget) Send

func (target *AMQPTarget) Send(eventKey string) error

Send - sends event to AMQP.

type ESSupportStatus

type ESSupportStatus string

ESSupportStatus is a typed string representing the support status for Elasticsearch

// Possible ESSupportStatus values. Each constant's string value is its own name.
const (
	// ESSUnknown is the default value.
	ESSUnknown ESSupportStatus = "ESSUnknown"
	// ESSDeprecated -> support will be removed in the future.
	ESSDeprecated ESSupportStatus = "ESSDeprecated"
	// ESSUnsupported -> we won't work with this ES server.
	ESSUnsupported ESSupportStatus = "ESSUnsupported"
	// ESSSupported -> all good!
	ESSSupported ESSupportStatus = "ESSSupported"
)

type ElasticsearchArgs

// ElasticsearchArgs holds the Elasticsearch target arguments.
type ElasticsearchArgs struct {
	Enable     bool            `json:"enable"`
	Format     string          `json:"format"`
	URL        xnet.URL        `json:"url"`
	Index      string          `json:"index"`
	QueueDir   string          `json:"queueDir"`
	QueueLimit uint64          `json:"queueLimit"`
	// Transport is tagged json:"-", so it is never part of the JSON form.
	Transport  *http.Transport `json:"-"`
	Username   string          `json:"username"`
	Password   string          `json:"password"`
}

ElasticsearchArgs - Elasticsearch target arguments.

func (ElasticsearchArgs) Validate

func (a ElasticsearchArgs) Validate() error

Validate ElasticsearchArgs fields

type ElasticsearchTarget

type ElasticsearchTarget struct {
	// contains filtered or unexported fields
}

ElasticsearchTarget - Elasticsearch target.

func NewElasticsearchTarget

func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*ElasticsearchTarget, error)

NewElasticsearchTarget - creates new Elasticsearch target.

func (*ElasticsearchTarget) Close

func (target *ElasticsearchTarget) Close() error

Close - does nothing; it is available for interface compatibility.

func (*ElasticsearchTarget) HasQueueStore

func (target *ElasticsearchTarget) HasQueueStore() bool

HasQueueStore - Checks if the queueStore has been configured for the target

func (*ElasticsearchTarget) ID

func (target *ElasticsearchTarget) ID() event.TargetID

ID - returns target ID.

func (*ElasticsearchTarget) IsActive

func (target *ElasticsearchTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*ElasticsearchTarget) Save

func (target *ElasticsearchTarget) Save(eventData event.Event) error

Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active.

func (*ElasticsearchTarget) Send

func (target *ElasticsearchTarget) Send(eventKey string) error

Send - reads an event from store and sends it to Elasticsearch.

type KafkaArgs

// KafkaArgs holds the Kafka target arguments. TLS and SASL options are
// grouped in anonymous nested structs, serialized under "tls" and "sasl".
type KafkaArgs struct {
	Enable     bool        `json:"enable"`
	Brokers    []xnet.Host `json:"brokers"`
	Topic      string      `json:"topic"`
	QueueDir   string      `json:"queueDir"`
	QueueLimit uint64      `json:"queueLimit"`
	Version    string      `json:"version"`
	// TLS options.
	TLS        struct {
		Enable        bool               `json:"enable"`
		// RootCAs is tagged json:"-", so it is never part of the JSON form.
		RootCAs       *x509.CertPool     `json:"-"`
		SkipVerify    bool               `json:"skipVerify"`
		ClientAuth    tls.ClientAuthType `json:"clientAuth"`
		ClientTLSCert string             `json:"clientTLSCert"`
		ClientTLSKey  string             `json:"clientTLSKey"`
	} `json:"tls"`
	// SASL options; note that User is serialized as "username".
	SASL struct {
		Enable    bool   `json:"enable"`
		User      string `json:"username"`
		Password  string `json:"password"`
		Mechanism string `json:"mechanism"`
	} `json:"sasl"`
}

KafkaArgs - Kafka target arguments.

func (KafkaArgs) Validate

func (k KafkaArgs) Validate() error

Validate KafkaArgs fields

type KafkaTarget

type KafkaTarget struct {
	// contains filtered or unexported fields
}

KafkaTarget - Kafka target.

func NewKafkaTarget

func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*KafkaTarget, error)

NewKafkaTarget - creates new Kafka target with auth credentials.

func (*KafkaTarget) Close

func (target *KafkaTarget) Close() error

Close - closes the underlying Kafka connection.

func (*KafkaTarget) HasQueueStore

func (target *KafkaTarget) HasQueueStore() bool

HasQueueStore - Checks if the queueStore has been configured for the target

func (*KafkaTarget) ID

func (target *KafkaTarget) ID() event.TargetID

ID - returns target ID.

func (*KafkaTarget) IsActive

func (target *KafkaTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*KafkaTarget) Save

func (target *KafkaTarget) Save(eventData event.Event) error

Save - saves the events to the store which will be replayed when the Kafka connection is active.

func (*KafkaTarget) Send

func (target *KafkaTarget) Send(eventKey string) error

Send - reads an event from store and sends it to Kafka.

type MQTTArgs

// MQTTArgs holds the MQTT target arguments.
type MQTTArgs struct {
	Enable               bool           `json:"enable"`
	Broker               xnet.URL       `json:"broker"`
	Topic                string         `json:"topic"`
	QoS                  byte           `json:"qos"`
	// User is serialized as "username".
	User                 string         `json:"username"`
	Password             string         `json:"password"`
	// The two durations below are serialized under names that differ from
	// the field names: "reconnectInterval" and "keepAliveInterval".
	MaxReconnectInterval time.Duration  `json:"reconnectInterval"`
	KeepAlive            time.Duration  `json:"keepAliveInterval"`
	// RootCAs is tagged json:"-", so it is never part of the JSON form.
	RootCAs              *x509.CertPool `json:"-"`
	QueueDir             string         `json:"queueDir"`
	QueueLimit           uint64         `json:"queueLimit"`
}

MQTTArgs - MQTT target arguments.

func (MQTTArgs) Validate

func (m MQTTArgs) Validate() error

Validate MQTTArgs fields

type MQTTTarget

type MQTTTarget struct {
	// contains filtered or unexported fields
}

MQTTTarget - MQTT target.

func NewMQTTTarget

func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*MQTTTarget, error)

NewMQTTTarget - creates new MQTT target.

func (*MQTTTarget) Close

func (target *MQTTTarget) Close() error

Close - does nothing; it is available for interface compatibility.

func (*MQTTTarget) HasQueueStore

func (target *MQTTTarget) HasQueueStore() bool

HasQueueStore - Checks if the queueStore has been configured for the target

func (*MQTTTarget) ID

func (target *MQTTTarget) ID() event.TargetID

ID - returns target ID.

func (*MQTTTarget) IsActive

func (target *MQTTTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*MQTTTarget) Save

func (target *MQTTTarget) Save(eventData event.Event) error

Save - saves the events to the store if queuestore is configured, which will be replayed when the mqtt connection is active.

func (*MQTTTarget) Send

func (target *MQTTTarget) Send(eventKey string) error

Send - reads an event from store and sends it to MQTT.

type MySQLArgs

// MySQLArgs holds the MySQL target arguments.
type MySQLArgs struct {
	Enable             bool     `json:"enable"`
	Format             string   `json:"format"`
	DSN                string   `json:"dsnString"`
	Table              string   `json:"table"`
	// NOTE(review): Host is an xnet.URL here, whereas PostgreSQLArgs.Host is
	// an xnet.Host — confirm the difference is intended.
	Host               xnet.URL `json:"host"`
	Port               string   `json:"port"`
	// NOTE(review): serialized as "user", while the MySQL config key is
	// "username" and the other targets tag this field "username". The tag is
	// part of the JSON contract, so confirm before changing it.
	User               string   `json:"user"`
	Password           string   `json:"password"`
	Database           string   `json:"database"`
	QueueDir           string   `json:"queueDir"`
	QueueLimit         uint64   `json:"queueLimit"`
	MaxOpenConnections int      `json:"maxOpenConnections"`
}

MySQLArgs - MySQL target arguments.

func (MySQLArgs) Validate

func (m MySQLArgs) Validate() error

Validate MySQLArgs fields

type MySQLTarget

type MySQLTarget struct {
	// contains filtered or unexported fields
}

MySQLTarget - MySQL target.

func NewMySQLTarget

func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*MySQLTarget, error)

NewMySQLTarget - creates new MySQL target.

func (*MySQLTarget) Close

func (target *MySQLTarget) Close() error

Close - closes the underlying connections to the MySQL database.

func (*MySQLTarget) HasQueueStore

func (target *MySQLTarget) HasQueueStore() bool

HasQueueStore - Checks if the queueStore has been configured for the target

func (*MySQLTarget) ID

func (target *MySQLTarget) ID() event.TargetID

ID - returns target ID.

func (*MySQLTarget) IsActive

func (target *MySQLTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*MySQLTarget) Save

func (target *MySQLTarget) Save(eventData event.Event) error

Save - saves the events to the store which will be replayed when the SQL connection is active.

func (*MySQLTarget) Send

func (target *MySQLTarget) Send(eventKey string) error

Send - reads an event from store and sends it to MySQL.

type NATSArgs

// NATSArgs holds the NATS target arguments. Streaming options are grouped in
// an anonymous nested struct serialized under "streaming".
type NATSArgs struct {
	Enable        bool      `json:"enable"`
	Address       xnet.Host `json:"address"`
	Subject       string    `json:"subject"`
	Username      string    `json:"username"`
	Password      string    `json:"password"`
	Token         string    `json:"token"`
	TLS           bool      `json:"tls"`
	TLSSkipVerify bool      `json:"tlsSkipVerify"`
	// NOTE(review): no NATS key constant corresponds to Secure in the
	// constants shown for this package — confirm how it relates to TLS.
	Secure        bool      `json:"secure"`
	CertAuthority string    `json:"certAuthority"`
	ClientCert    string    `json:"clientCert"`
	ClientKey     string    `json:"clientKey"`
	// PingInterval is a plain int64; its unit is not stated here — TODO confirm.
	PingInterval  int64     `json:"pingInterval"`
	QueueDir      string    `json:"queueDir"`
	QueueLimit    uint64    `json:"queueLimit"`
	// Streaming options.
	Streaming     struct {
		Enable             bool   `json:"enable"`
		ClusterID          string `json:"clusterID"`
		Async              bool   `json:"async"`
		MaxPubAcksInflight int    `json:"maxPubAcksInflight"`
	} `json:"streaming"`

	// RootCAs is tagged json:"-", so it is never part of the JSON form.
	RootCAs *x509.CertPool `json:"-"`
}

NATSArgs - NATS target arguments.

func (NATSArgs) Validate

func (n NATSArgs) Validate() error

Validate NATSArgs fields

type NATSTarget

type NATSTarget struct {
	// contains filtered or unexported fields
}

NATSTarget - NATS target.

func NewNATSTarget

func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*NATSTarget, error)

NewNATSTarget - creates new NATS target.

func (*NATSTarget) Close

func (target *NATSTarget) Close() (err error)

Close - closes the underlying connections to the NATS server.

func (*NATSTarget) HasQueueStore

func (target *NATSTarget) HasQueueStore() bool

HasQueueStore - Checks if the queueStore has been configured for the target

func (*NATSTarget) ID

func (target *NATSTarget) ID() event.TargetID

ID - returns target ID.

func (*NATSTarget) IsActive

func (target *NATSTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*NATSTarget) Save

func (target *NATSTarget) Save(eventData event.Event) error

Save - saves the events to the store which will be replayed when the NATS connection is active.

func (*NATSTarget) Send

func (target *NATSTarget) Send(eventKey string) error

Send - sends an event to NATS.

type NSQArgs

// NSQArgs holds the NSQ target arguments. TLS options are grouped in an
// anonymous nested struct serialized under "tls".
type NSQArgs struct {
	Enable      bool      `json:"enable"`
	NSQDAddress xnet.Host `json:"nsqdAddress"`
	Topic       string    `json:"topic"`
	// TLS options.
	TLS         struct {
		Enable     bool `json:"enable"`
		SkipVerify bool `json:"skipVerify"`
	} `json:"tls"`
	QueueDir   string `json:"queueDir"`
	QueueLimit uint64 `json:"queueLimit"`
}

NSQArgs - NSQ target arguments.

func (NSQArgs) Validate

func (n NSQArgs) Validate() error

Validate NSQArgs fields

type NSQTarget

type NSQTarget struct {
	// contains filtered or unexported fields
}

NSQTarget - NSQ target.

func NewNSQTarget

func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*NSQTarget, error)

NewNSQTarget - creates new NSQ target.

func (*NSQTarget) Close

func (target *NSQTarget) Close() (err error)

Close - closes the underlying connections to the NSQD server.

func (*NSQTarget) HasQueueStore

func (target *NSQTarget) HasQueueStore() bool

HasQueueStore - Checks if the queueStore has been configured for the target

func (*NSQTarget) ID

func (target *NSQTarget) ID() event.TargetID

ID - returns target ID.

func (*NSQTarget) IsActive

func (target *NSQTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*NSQTarget) Save

func (target *NSQTarget) Save(eventData event.Event) error

Save - saves the events to the store which will be replayed when the nsq connection is active.

func (*NSQTarget) Send

func (target *NSQTarget) Send(eventKey string) error

Send - reads an event from store and sends it to NSQ.

type PostgreSQLArgs

// PostgreSQLArgs holds the PostgreSQL target arguments. The trailing
// "default:" comments state the value assumed when a field is left unset.
type PostgreSQLArgs struct {
	Enable             bool      `json:"enable"`
	Format             string    `json:"format"`
	ConnectionString   string    `json:"connectionString"`
	Table              string    `json:"table"`
	Host               xnet.Host `json:"host"`     // default: localhost
	Port               string    `json:"port"`     // default: 5432
	Username           string    `json:"username"` // default: user running minio
	Password           string    `json:"password"` // default: no password
	Database           string    `json:"database"` // default: same as user
	QueueDir           string    `json:"queueDir"`
	QueueLimit         uint64    `json:"queueLimit"`
	MaxOpenConnections int       `json:"maxOpenConnections"`
}

PostgreSQLArgs - PostgreSQL target arguments.

func (PostgreSQLArgs) Validate

func (p PostgreSQLArgs) Validate() error

Validate PostgreSQLArgs fields

type PostgreSQLTarget

type PostgreSQLTarget struct {
	// contains filtered or unexported fields
}

PostgreSQLTarget - PostgreSQL target.

func NewPostgreSQLTarget

func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*PostgreSQLTarget, error)

NewPostgreSQLTarget - creates new PostgreSQL target.

func (*PostgreSQLTarget) Close

func (target *PostgreSQLTarget) Close() error

Close - closes the underlying connections to the PostgreSQL database.

func (*PostgreSQLTarget) HasQueueStore

func (target *PostgreSQLTarget) HasQueueStore() bool

HasQueueStore - Checks if the queueStore has been configured for the target

func (*PostgreSQLTarget) ID

func (target *PostgreSQLTarget) ID() event.TargetID

ID - returns target ID.

func (*PostgreSQLTarget) IsActive

func (target *PostgreSQLTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*PostgreSQLTarget) Save

func (target *PostgreSQLTarget) Save(eventData event.Event) error

Save - saves the events to the store if queuestore is configured, which will be replayed when the PostgreSQL connection is active.

func (*PostgreSQLTarget) Send

func (target *PostgreSQLTarget) Send(eventKey string) error

Send - reads an event from store and sends it to PostgreSQL.

type QueueStore

// QueueStore is a file store for persisting events.
type QueueStore struct {
	// Embedded lock: its Lock/Unlock/RLock/RUnlock methods are promoted onto
	// QueueStore. Because of it, a QueueStore must not be copied; use *QueueStore.
	sync.RWMutex
	// contains filtered or unexported fields
}

QueueStore - Filestore for persisting events.

func (*QueueStore) Del

func (store *QueueStore) Del(key string) error

Del - Deletes an entry from the store.

func (*QueueStore) Get

func (store *QueueStore) Get(key string) (event event.Event, err error)

Get - gets an event from the store.

func (*QueueStore) List

func (store *QueueStore) List() ([]string, error)

List - lists all files from the directory.

func (*QueueStore) Open

func (store *QueueStore) Open() error

Open - Creates the directory if not present.

func (*QueueStore) Put

func (store *QueueStore) Put(e event.Event) error

Put - puts an event to the store.

type RedisAccessEvent

// RedisAccessEvent holds event log data and a timestamp.
type RedisAccessEvent struct {
	// Event is the event log data, as a slice of events.
	Event     []event.Event
	// EventTime is the timestamp, kept as a string; its format is not
	// specified here — TODO confirm.
	EventTime string
}

RedisAccessEvent holds event log data and timestamp

type RedisArgs

// RedisArgs holds the Redis target arguments.
type RedisArgs struct {
	Enable     bool      `json:"enable"`
	Format     string    `json:"format"`
	// Addr is serialized as "address".
	Addr       xnet.Host `json:"address"`
	Password   string    `json:"password"`
	Key        string    `json:"key"`
	QueueDir   string    `json:"queueDir"`
	QueueLimit uint64    `json:"queueLimit"`
}

RedisArgs - Redis target arguments.

func (RedisArgs) Validate

func (r RedisArgs) Validate() error

Validate RedisArgs fields

type RedisTarget

type RedisTarget struct {
	// contains filtered or unexported fields
}

RedisTarget - Redis target.

func NewRedisTarget

func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}), test bool) (*RedisTarget, error)

NewRedisTarget - creates new Redis target.

func (*RedisTarget) Close

func (target *RedisTarget) Close() error

Close - releases the resources used by the pool.

func (*RedisTarget) HasQueueStore

func (target *RedisTarget) HasQueueStore() bool

HasQueueStore - Checks if the queueStore has been configured for the target

func (*RedisTarget) ID

func (target *RedisTarget) ID() event.TargetID

ID - returns target ID.

func (*RedisTarget) IsActive

func (target *RedisTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*RedisTarget) Save

func (target *RedisTarget) Save(eventData event.Event) error

Save - saves the events to the store if queuestore is configured, which will be replayed when the redis connection is active.

func (*RedisTarget) Send

func (target *RedisTarget) Send(eventKey string) error

Send - reads an event from store and sends it to redis.

type Store

// Store persists events. NewQueueStore returns the QueueStore implementation
// of this interface.
type Store interface {
	// Put adds an event to the store.
	Put(event event.Event) error
	// Get returns the event stored under key.
	Get(key string) (event.Event, error)
	// List returns the stored entries as strings (QueueStore lists the files
	// in its directory; presumably these are the keys accepted by Get and
	// Del — verify).
	List() ([]string, error)
	// Del deletes the entry stored under key.
	Del(key string) error
	// Open prepares the store for use (QueueStore creates its directory if
	// it is not present).
	Open() error
}

Store - To persist the events.

func NewQueueStore

func NewQueueStore(directory string, limit uint64) Store

NewQueueStore - Creates an instance for QueueStore.

type WebhookArgs

// WebhookArgs holds the Webhook target arguments.
type WebhookArgs struct {
	Enable     bool            `json:"enable"`
	Endpoint   xnet.URL        `json:"endpoint"`
	AuthToken  string          `json:"authToken"`
	// Transport is tagged json:"-", so it is never part of the JSON form.
	// NOTE(review): NewWebhookTarget also takes a separate transport
	// argument; which of the two is used is not visible here — confirm.
	Transport  *http.Transport `json:"-"`
	QueueDir   string          `json:"queueDir"`
	QueueLimit uint64          `json:"queueLimit"`
	ClientCert string          `json:"clientCert"`
	ClientKey  string          `json:"clientKey"`
}

WebhookArgs - Webhook target arguments.

func (WebhookArgs) Validate

func (w WebhookArgs) Validate() error

Validate WebhookArgs fields

type WebhookTarget

type WebhookTarget struct {
	// contains filtered or unexported fields
}

WebhookTarget - Webhook target.

func NewWebhookTarget

func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), transport *http.Transport, test bool) (*WebhookTarget, error)

NewWebhookTarget - creates new Webhook target.

func (*WebhookTarget) Close

func (target *WebhookTarget) Close() error

Close - does nothing; it is available for interface compatibility.

func (*WebhookTarget) HasQueueStore

func (target *WebhookTarget) HasQueueStore() bool

HasQueueStore - Checks if the queueStore has been configured for the target

func (WebhookTarget) ID

func (target WebhookTarget) ID() event.TargetID

ID - returns target ID.

func (*WebhookTarget) IsActive

func (target *WebhookTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*WebhookTarget) Save

func (target *WebhookTarget) Save(eventData event.Event) error

Save - saves the events to the store if queuestore is configured, which will be replayed when the webhook connection is active.

func (*WebhookTarget) Send

func (target *WebhookTarget) Send(eventKey string) error

Send - reads an event from store and sends it to webhook.

type XDGSCRAMClient

// XDGSCRAMClient implements the client side of a SCRAM authentication
// conversation with a server. A new conversation must be created for each
// authentication attempt. All three fields are embedded, so their exported
// methods and fields are promoted onto XDGSCRAMClient unless shadowed by its
// own Begin, Step and Done methods.
type XDGSCRAMClient struct {
	// SCRAM client; presumably constructed by Begin — verify.
	*scram.Client
	// Current conversation; presumably what Step advances and Done queries — verify.
	*scram.ClientConversation
	// Hash factory the client is built from (e.g. KafkaSHA256 or KafkaSHA512).
	scram.HashGeneratorFcn
}

XDGSCRAMClient implements the client-side of an authentication conversation with a server. A new conversation must be created for each authentication attempt.

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

Begin constructs a SCRAM client component based on a given hash.Hash factory receiver. This constructor will normalize the username, password and authzID via the SASLprep algorithm, as recommended by RFC-5802. If SASLprep fails, the method returns an error.

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

Done returns true if the conversation is completed or has errored.

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Step takes a string provided from a server (or just an empty string for the very first conversation step) and attempts to move the authentication conversation forward. It returns a string to be sent to the server or an error if the server message is invalid. Calling Step after a conversation completes is also an error.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL