Documentation ¶
Index ¶
- Constants
- func IsConnErr(err error) bool
- func IsConnRefusedErr(err error) bool
- func IsConnResetErr(err error) bool
- type AMQPArgs
- type AMQPTarget
- type ElasticsearchArgs
- type ElasticsearchTarget
- type HTTPClientTarget
- type KafkaArgs
- type KafkaTarget
- type MQTTArgs
- type MQTTTarget
- type MySQLArgs
- type MySQLTarget
- type NATSArgs
- type NATSTarget
- type NSQArgs
- type NSQTarget
- type PostgreSQLArgs
- type PostgreSQLTarget
- type QueueStore
- type RedisArgs
- type RedisTarget
- type Store
- type WebhookArgs
- type WebhookTarget
Constants ¶
const ( AmqpQueueDir = "queue_dir" AmqpQueueLimit = "queue_limit" AmqpURL = "url" AmqpExchange = "exchange" AmqpRoutingKey = "routing_key" AmqpExchangeType = "exchange_type" AmqpDeliveryMode = "delivery_mode" AmqpMandatory = "mandatory" AmqpImmediate = "immediate" AmqpDurable = "durable" AmqpInternal = "internal" AmqpNoWait = "no_wait" AmqpAutoDeleted = "auto_deleted" AmqpArguments = "arguments" AmqpPublishingHeaders = "publishing_headers" EnvAMQPEnable = "MINIO_NOTIFY_AMQP_ENABLE" EnvAMQPURL = "MINIO_NOTIFY_AMQP_URL" EnvAMQPExchange = "MINIO_NOTIFY_AMQP_EXCHANGE" EnvAMQPRoutingKey = "MINIO_NOTIFY_AMQP_ROUTING_KEY" EnvAMQPExchangeType = "MINIO_NOTIFY_AMQP_EXCHANGE_TYPE" EnvAMQPDeliveryMode = "MINIO_NOTIFY_AMQP_DELIVERY_MODE" EnvAMQPMandatory = "MINIO_NOTIFY_AMQP_MANDATORY" EnvAMQPImmediate = "MINIO_NOTIFY_AMQP_IMMEDIATE" EnvAMQPDurable = "MINIO_NOTIFY_AMQP_DURABLE" EnvAMQPInternal = "MINIO_NOTIFY_AMQP_INTERNAL" EnvAMQPNoWait = "MINIO_NOTIFY_AMQP_NO_WAIT" EnvAMQPAutoDeleted = "MINIO_NOTIFY_AMQP_AUTO_DELETED" EnvAMQPArguments = "MINIO_NOTIFY_AMQP_ARGUMENTS" EnvAMQPPublishingHeaders = "MINIO_NOTIFY_AMQP_PUBLISHING_HEADERS" EnvAMQPQueueDir = "MINIO_NOTIFY_AMQP_QUEUE_DIR" EnvAMQPQueueLimit = "MINIO_NOTIFY_AMQP_QUEUE_LIMIT" )
AMQP input constants.
const ( ElasticFormat = "format" ElasticURL = "url" ElasticIndex = "index" ElasticQueueDir = "queue_dir" ElasticQueueLimit = "queue_limit" EnvElasticEnable = "MINIO_NOTIFY_ELASTICSEARCH_ENABLE" EnvElasticFormat = "MINIO_NOTIFY_ELASTICSEARCH_FORMAT" EnvElasticURL = "MINIO_NOTIFY_ELASTICSEARCH_URL" EnvElasticIndex = "MINIO_NOTIFY_ELASTICSEARCH_INDEX" EnvElasticQueueDir = "MINIO_NOTIFY_ELASTICSEARCH_QUEUE_DIR" EnvElasticQueueLimit = "MINIO_NOTIFY_ELASTICSEARCH_QUEUE_LIMIT" )
Elastic constants
const ( KafkaBrokers = "brokers" KafkaTopic = "topic" KafkaQueueDir = "queue_dir" KafkaQueueLimit = "queue_limit" KafkaTLS = "tls" KafkaTLSSkipVerify = "tls_skip_verify" KafkaTLSClientAuth = "tls_client_auth" KafkaSASL = "sasl" KafkaSASLUsername = "sasl_username" KafkaSASLPassword = "sasl_password" KafkaClientTLSCert = "client_tls_cert" KafkaClientTLSKey = "client_tls_key" EnvKafkaEnable = "MINIO_NOTIFY_KAFKA_ENABLE" EnvKafkaBrokers = "MINIO_NOTIFY_KAFKA_BROKERS" EnvKafkaTopic = "MINIO_NOTIFY_KAFKA_TOPIC" EnvKafkaQueueDir = "MINIO_NOTIFY_KAFKA_QUEUE_DIR" EnvKafkaQueueLimit = "MINIO_NOTIFY_KAFKA_QUEUE_LIMIT" EnvKafkaTLS = "MINIO_NOTIFY_KAFKA_TLS" EnvKafkaTLSSkipVerify = "MINIO_NOTIFY_KAFKA_TLS_SKIP_VERIFY" EnvKafkaTLSClientAuth = "MINIO_NOTIFY_KAFKA_TLS_CLIENT_AUTH" EnvKafkaSASLEnable = "MINIO_NOTIFY_KAFKA_SASL" EnvKafkaSASLUsername = "MINIO_NOTIFY_KAFKA_SASL_USERNAME" EnvKafkaSASLPassword = "MINIO_NOTIFY_KAFKA_SASL_PASSWORD" EnvKafkaClientTLSCert = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_CERT" EnvKafkaClientTLSKey = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY" )
Kafka input constants
const ( MqttBroker = "broker" MqttTopic = "topic" MqttQoS = "qos" MqttUsername = "username" MqttPassword = "password" MqttReconnectInterval = "reconnect_interval" MqttKeepAliveInterval = "keep_alive_interval" MqttQueueDir = "queue_dir" MqttQueueLimit = "queue_limit" EnvMQTTEnable = "MINIO_NOTIFY_MQTT_ENABLE" EnvMQTTBroker = "MINIO_NOTIFY_MQTT_BROKER" EnvMQTTTopic = "MINIO_NOTIFY_MQTT_TOPIC" EnvMQTTQoS = "MINIO_NOTIFY_MQTT_QOS" EnvMQTTUsername = "MINIO_NOTIFY_MQTT_USERNAME" EnvMQTTPassword = "MINIO_NOTIFY_MQTT_PASSWORD" EnvMQTTReconnectInterval = "MINIO_NOTIFY_MQTT_RECONNECT_INTERVAL" EnvMQTTKeepAliveInterval = "MINIO_NOTIFY_MQTT_KEEP_ALIVE_INTERVAL" EnvMQTTQueueDir = "MINIO_NOTIFY_MQTT_QUEUE_DIR" EnvMQTTQueueLimit = "MINIO_NOTIFY_MQTT_QUEUE_LIMIT" )
MQTT input constants
const ( MySQLFormat = "format" MySQLDSNString = "dsn_string" MySQLTable = "table" MySQLHost = "host" MySQLPort = "port" MySQLUsername = "username" MySQLPassword = "password" MySQLDatabase = "database" MySQLQueueLimit = "queue_limit" MySQLQueueDir = "queue_dir" EnvMySQLEnable = "MINIO_NOTIFY_MYSQL_ENABLE" EnvMySQLFormat = "MINIO_NOTIFY_MYSQL_FORMAT" EnvMySQLDSNString = "MINIO_NOTIFY_MYSQL_DSN_STRING" EnvMySQLTable = "MINIO_NOTIFY_MYSQL_TABLE" EnvMySQLHost = "MINIO_NOTIFY_MYSQL_HOST" EnvMySQLPort = "MINIO_NOTIFY_MYSQL_PORT" EnvMySQLUsername = "MINIO_NOTIFY_MYSQL_USERNAME" EnvMySQLPassword = "MINIO_NOTIFY_MYSQL_PASSWORD" EnvMySQLDatabase = "MINIO_NOTIFY_MYSQL_DATABASE" EnvMySQLQueueLimit = "MINIO_NOTIFY_MYSQL_QUEUE_LIMIT" EnvMySQLQueueDir = "MINIO_NOTIFY_MYSQL_QUEUE_DIR" )
MySQL related constants
const ( NATSAddress = "address" NATSSubject = "subject" NATSUsername = "username" NATSPassword = "password" NATSToken = "token" NATSTLS = "tls" NATSTLSSkipVerify = "tls_skip_verify" NATSPingInterval = "ping_interval" NATSQueueDir = "queue_dir" NATSQueueLimit = "queue_limit" NATSCertAuthority = "cert_authority" NATSClientCert = "client_cert" NATSClientKey = "client_key" // Streaming constants NATSStreaming = "streaming" NATSStreamingClusterID = "streaming_cluster_id" NATSStreamingAsync = "streaming_async" NATSStreamingMaxPubAcksInFlight = "streaming_max_pub_acks_in_flight" EnvNATSEnable = "MINIO_NOTIFY_NATS_ENABLE" EnvNATSAddress = "MINIO_NOTIFY_NATS_ADDRESS" EnvNATSSubject = "MINIO_NOTIFY_NATS_SUBJECT" EnvNATSUsername = "MINIO_NOTIFY_NATS_USERNAME" EnvNATSPassword = "MINIO_NOTIFY_NATS_PASSWORD" EnvNATSToken = "MINIO_NOTIFY_NATS_TOKEN" EnvNATSTLS = "MINIO_NOTIFY_NATS_TLS" EnvNATSTLSSkipVerify = "MINIO_NOTIFY_NATS_TLS_SKIP_VERIFY" EnvNATSPingInterval = "MINIO_NOTIFY_NATS_PING_INTERVAL" EnvNATSQueueDir = "MINIO_NOTIFY_NATS_QUEUE_DIR" EnvNATSQueueLimit = "MINIO_NOTIFY_NATS_QUEUE_LIMIT" EnvNATSCertAuthority = "MINIO_NOTIFY_NATS_CERT_AUTHORITY" EnvNATSClientCert = "MINIO_NOTIFY_NATS_CLIENT_CERT" EnvNATSClientKey = "MINIO_NOTIFY_NATS_CLIENT_KEY" // Streaming constants EnvNATSStreaming = "MINIO_NOTIFY_NATS_STREAMING" EnvNATSStreamingClusterID = "MINIO_NOTIFY_NATS_STREAMING_CLUSTER_ID" EnvNATSStreamingAsync = "MINIO_NOTIFY_NATS_STREAMING_ASYNC" EnvNATSStreamingMaxPubAcksInFlight = "MINIO_NOTIFY_NATS_STREAMING_MAX_PUB_ACKS_IN_FLIGHT" )
NATS related constants
const ( NSQAddress = "nsqd_address" NSQTopic = "topic" NSQTLS = "tls" NSQTLSSkipVerify = "tls_skip_verify" NSQQueueDir = "queue_dir" NSQQueueLimit = "queue_limit" EnvNSQEnable = "MINIO_NOTIFY_NSQ" EnvNSQAddress = "MINIO_NOTIFY_NSQ_NSQD_ADDRESS" EnvNSQTopic = "MINIO_NOTIFY_NSQ_TOPIC" EnvNSQTLS = "MINIO_NOTIFY_NSQ_TLS" EnvNSQTLSSkipVerify = "MINIO_NOTIFY_NSQ_TLS_SKIP_VERIFY" EnvNSQQueueDir = "MINIO_NOTIFY_NSQ_QUEUE_DIR" EnvNSQQueueLimit = "MINIO_NOTIFY_NSQ_QUEUE_LIMIT" )
NSQ constants
const ( PostgresFormat = "format" PostgresConnectionString = "connection_string" PostgresTable = "table" PostgresHost = "host" PostgresPort = "port" PostgresUsername = "username" PostgresPassword = "password" PostgresDatabase = "database" PostgresQueueDir = "queue_dir" PostgresQueueLimit = "queue_limit" EnvPostgresEnable = "MINIO_NOTIFY_POSTGRES_ENABLE" EnvPostgresFormat = "MINIO_NOTIFY_POSTGRES_FORMAT" EnvPostgresConnectionString = "MINIO_NOTIFY_POSTGRES_CONNECTION_STRING" EnvPostgresTable = "MINIO_NOTIFY_POSTGRES_TABLE" EnvPostgresHost = "MINIO_NOTIFY_POSTGRES_HOST" EnvPostgresPort = "MINIO_NOTIFY_POSTGRES_PORT" EnvPostgresUsername = "MINIO_NOTIFY_POSTGRES_USERNAME" EnvPostgresPassword = "MINIO_NOTIFY_POSTGRES_PASSWORD" EnvPostgresDatabase = "MINIO_NOTIFY_POSTGRES_DATABASE" EnvPostgresQueueDir = "MINIO_NOTIFY_POSTGRES_QUEUE_DIR" EnvPostgresQueueLimit = "MINIO_NOTIFY_POSTGRES_QUEUE_LIMIT" )
Postgres constants
const ( RedisFormat = "format" RedisAddress = "address" RedisPassword = "password" RedisKey = "key" RedisQueueDir = "queue_dir" RedisQueueLimit = "queue_limit" EnvRedisEnable = "MINIO_NOTIFY_REDIS_ENABLE" EnvRedisFormat = "MINIO_NOTIFY_REDIS_FORMAT" EnvRedisAddress = "MINIO_NOTIFY_REDIS_ADDRESS" EnvRedisPassword = "MINIO_NOTIFY_REDIS_PASSWORD" EnvRedisKey = "MINIO_NOTIFY_REDIS_KEY" EnvRedisQueueDir = "MINIO_NOTIFY_REDIS_QUEUE_DIR" EnvRedisQueueLimit = "MINIO_NOTIFY_REDIS_QUEUE_LIMIT" )
Redis constants
const ( WebhookEndpoint = "endpoint" WebhookAuthToken = "auth_token" WebhookQueueDir = "queue_dir" WebhookQueueLimit = "queue_limit" EnvWebhookEnable = "MINIO_NOTIFY_WEBHOOK_ENABLE" EnvWebhookEndpoint = "MINIO_NOTIFY_WEBHOOK_ENDPOINT" EnvWebhookAuthToken = "MINIO_NOTIFY_WEBHOOK_AUTH_TOKEN" EnvWebhookQueueDir = "MINIO_NOTIFY_WEBHOOK_QUEUE_DIR" EnvWebhookQueueLimit = "MINIO_NOTIFY_WEBHOOK_QUEUE_LIMIT" )
Webhook constants
Variables ¶
This section is empty.
Functions ¶
func IsConnRefusedErr ¶
IsConnRefusedErr - Checks for "connection refused" errors.
func IsConnResetErr ¶
IsConnResetErr - Checks for connection reset errors.
Types ¶
type AMQPArgs ¶
type AMQPArgs struct { Enable bool `json:"enable"` URL xnet.URL `json:"url"` Exchange string `json:"exchange"` RoutingKey string `json:"routingKey"` ExchangeType string `json:"exchangeType"` DeliveryMode uint8 `json:"deliveryMode"` Mandatory bool `json:"mandatory"` Immediate bool `json:"immediate"` Durable bool `json:"durable"` Internal bool `json:"internal"` NoWait bool `json:"noWait"` AutoDeleted bool `json:"autoDeleted"` QueueDir string `json:"queueDir"` QueueLimit uint64 `json:"queueLimit"` }
AMQPArgs - AMQP target arguments.
type AMQPTarget ¶
type AMQPTarget struct {
// contains filtered or unexported fields
}
AMQPTarget - AMQP target
func NewAMQPTarget ¶
func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}), test bool) (*AMQPTarget, error)
NewAMQPTarget - creates new AMQP target.
func (*AMQPTarget) Close ¶
func (target *AMQPTarget) Close() error
Close - does nothing and is available for interface compatibility.
func (*AMQPTarget) IsActive ¶
func (target *AMQPTarget) IsActive() (bool, error)
IsActive - Return true if target is up and active
func (*AMQPTarget) Save ¶
func (target *AMQPTarget) Save(eventData event.Event) error
Save - saves the events to the store which will be replayed when the amqp connection is active.
func (*AMQPTarget) Send ¶
func (target *AMQPTarget) Send(eventKey string) error
Send - sends event to AMQP.
type ElasticsearchArgs ¶
type ElasticsearchArgs struct { Enable bool `json:"enable"` Format string `json:"format"` URL xnet.URL `json:"url"` Index string `json:"index"` QueueDir string `json:"queueDir"` QueueLimit uint64 `json:"queueLimit"` }
ElasticsearchArgs - Elasticsearch target arguments.
func (ElasticsearchArgs) Validate ¶
func (a ElasticsearchArgs) Validate() error
Validate ElasticsearchArgs fields
type ElasticsearchTarget ¶
type ElasticsearchTarget struct {
// contains filtered or unexported fields
}
ElasticsearchTarget - Elasticsearch target.
func NewElasticsearchTarget ¶
func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*ElasticsearchTarget, error)
NewElasticsearchTarget - creates new Elasticsearch target.
func (*ElasticsearchTarget) Close ¶
func (target *ElasticsearchTarget) Close() error
Close - does nothing and is available for interface compatibility.
func (*ElasticsearchTarget) ID ¶
func (target *ElasticsearchTarget) ID() event.TargetID
ID - returns target ID.
func (*ElasticsearchTarget) IsActive ¶
func (target *ElasticsearchTarget) IsActive() (bool, error)
IsActive - Return true if target is up and active
func (*ElasticsearchTarget) Save ¶
func (target *ElasticsearchTarget) Save(eventData event.Event) error
Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active.
func (*ElasticsearchTarget) Send ¶
func (target *ElasticsearchTarget) Send(eventKey string) error
Send - reads an event from store and sends it to Elasticsearch.
type HTTPClientTarget ¶
type HTTPClientTarget struct { DoneCh chan struct{} // contains filtered or unexported fields }
HTTPClientTarget - HTTP client target.
func NewHTTPClientTarget ¶
func NewHTTPClientTarget(host xnet.Host, w http.ResponseWriter) (*HTTPClientTarget, error)
NewHTTPClientTarget - creates new HTTP client target.
func (*HTTPClientTarget) Close ¶
func (target *HTTPClientTarget) Close() error
Close - closes the underlying goroutine.
func (HTTPClientTarget) ID ¶
func (target HTTPClientTarget) ID() event.TargetID
ID - returns target ID.
func (*HTTPClientTarget) IsActive ¶
func (target *HTTPClientTarget) IsActive() (bool, error)
IsActive - does nothing and is available for interface compatibility.
func (*HTTPClientTarget) Save ¶
func (target *HTTPClientTarget) Save(eventData event.Event) error
Save - sends event to HTTP client.
func (*HTTPClientTarget) Send ¶
func (target *HTTPClientTarget) Send(eventKey string) error
Send - no-op method provided for interface compatibility.
type KafkaArgs ¶
type KafkaArgs struct { Enable bool `json:"enable"` Brokers []xnet.Host `json:"brokers"` Topic string `json:"topic"` QueueDir string `json:"queueDir"` QueueLimit uint64 `json:"queueLimit"` TLS struct { Enable bool `json:"enable"` RootCAs *x509.CertPool `json:"-"` SkipVerify bool `json:"skipVerify"` ClientAuth tls.ClientAuthType `json:"clientAuth"` ClientTLSCert string `json:"clientTLSCert"` ClientTLSKey string `json:"clientTLSKey"` } `json:"tls"` SASL struct { Enable bool `json:"enable"` User string `json:"username"` Password string `json:"password"` } `json:"sasl"` }
KafkaArgs - Kafka target arguments.
type KafkaTarget ¶
type KafkaTarget struct {
// contains filtered or unexported fields
}
KafkaTarget - Kafka target.
func NewKafkaTarget ¶
func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*KafkaTarget, error)
NewKafkaTarget - creates new Kafka target with auth credentials.
func (*KafkaTarget) Close ¶
func (target *KafkaTarget) Close() error
Close - closes the underlying Kafka connection.
func (*KafkaTarget) IsActive ¶
func (target *KafkaTarget) IsActive() (bool, error)
IsActive - Return true if target is up and active
func (*KafkaTarget) Save ¶
func (target *KafkaTarget) Save(eventData event.Event) error
Save - saves the events to the store which will be replayed when the Kafka connection is active.
func (*KafkaTarget) Send ¶
func (target *KafkaTarget) Send(eventKey string) error
Send - reads an event from store and sends it to Kafka.
type MQTTArgs ¶
type MQTTArgs struct { Enable bool `json:"enable"` Broker xnet.URL `json:"broker"` Topic string `json:"topic"` QoS byte `json:"qos"` User string `json:"username"` Password string `json:"password"` MaxReconnectInterval time.Duration `json:"reconnectInterval"` KeepAlive time.Duration `json:"keepAliveInterval"` RootCAs *x509.CertPool `json:"-"` QueueDir string `json:"queueDir"` QueueLimit uint64 `json:"queueLimit"` }
MQTTArgs - MQTT target arguments.
type MQTTTarget ¶
type MQTTTarget struct {
// contains filtered or unexported fields
}
MQTTTarget - MQTT target.
func NewMQTTTarget ¶
func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*MQTTTarget, error)
NewMQTTTarget - creates new MQTT target.
func (*MQTTTarget) Close ¶
func (target *MQTTTarget) Close() error
Close - does nothing and is available for interface compatibility.
func (*MQTTTarget) IsActive ¶
func (target *MQTTTarget) IsActive() (bool, error)
IsActive - Return true if target is up and active
func (*MQTTTarget) Save ¶
func (target *MQTTTarget) Save(eventData event.Event) error
Save - saves the events to the store if queuestore is configured, which will be replayed when the mqtt connection is active.
func (*MQTTTarget) Send ¶
func (target *MQTTTarget) Send(eventKey string) error
Send - reads an event from store and sends it to MQTT.
type MySQLArgs ¶
type MySQLArgs struct { Enable bool `json:"enable"` Format string `json:"format"` DSN string `json:"dsnString"` Table string `json:"table"` Host xnet.URL `json:"host"` Port string `json:"port"` User string `json:"user"` Password string `json:"password"` Database string `json:"database"` QueueDir string `json:"queueDir"` QueueLimit uint64 `json:"queueLimit"` }
MySQLArgs - MySQL target arguments.
type MySQLTarget ¶
type MySQLTarget struct {
// contains filtered or unexported fields
}
MySQLTarget - MySQL target.
func NewMySQLTarget ¶
func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*MySQLTarget, error)
NewMySQLTarget - creates new MySQL target.
func (*MySQLTarget) Close ¶
func (target *MySQLTarget) Close() error
Close - closes the underlying connections to the MySQL database.
func (*MySQLTarget) IsActive ¶
func (target *MySQLTarget) IsActive() (bool, error)
IsActive - Return true if target is up and active
func (*MySQLTarget) Save ¶
func (target *MySQLTarget) Save(eventData event.Event) error
Save - saves the events to the store which will be replayed when the SQL connection is active.
func (*MySQLTarget) Send ¶
func (target *MySQLTarget) Send(eventKey string) error
Send - reads an event from store and sends it to MySQL.
type NATSArgs ¶
type NATSArgs struct { Enable bool `json:"enable"` Address xnet.Host `json:"address"` Subject string `json:"subject"` Username string `json:"username"` Password string `json:"password"` Token string `json:"token"` TLS bool `json:"tls"` TLSSkipVerify bool `json:"tlsSkipVerify"` Secure bool `json:"secure"` CertAuthority string `json:"certAuthority"` ClientCert string `json:"clientCert"` ClientKey string `json:"clientKey"` PingInterval int64 `json:"pingInterval"` QueueDir string `json:"queueDir"` QueueLimit uint64 `json:"queueLimit"` Streaming struct { Enable bool `json:"enable"` ClusterID string `json:"clusterID"` Async bool `json:"async"` MaxPubAcksInflight int `json:"maxPubAcksInflight"` } `json:"streaming"` RootCAs *x509.CertPool `json:"-"` }
NATSArgs - NATS target arguments.
type NATSTarget ¶
type NATSTarget struct {
// contains filtered or unexported fields
}
NATSTarget - NATS target.
func NewNATSTarget ¶
func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*NATSTarget, error)
NewNATSTarget - creates new NATS target.
func (*NATSTarget) Close ¶
func (target *NATSTarget) Close() (err error)
Close - closes the underlying connections to the NATS server.
func (*NATSTarget) IsActive ¶
func (target *NATSTarget) IsActive() (bool, error)
IsActive - Return true if target is up and active
func (*NATSTarget) Save ¶
func (target *NATSTarget) Save(eventData event.Event) error
Save - saves the events to the store which will be replayed when the NATS connection is active.
func (*NATSTarget) Send ¶
func (target *NATSTarget) Send(eventKey string) error
Send - sends event to NATS.
type NSQArgs ¶
type NSQArgs struct { Enable bool `json:"enable"` NSQDAddress xnet.Host `json:"nsqdAddress"` Topic string `json:"topic"` TLS struct { Enable bool `json:"enable"` SkipVerify bool `json:"skipVerify"` } `json:"tls"` QueueDir string `json:"queueDir"` QueueLimit uint64 `json:"queueLimit"` }
NSQArgs - NSQ target arguments.
type NSQTarget ¶
type NSQTarget struct {
// contains filtered or unexported fields
}
NSQTarget - NSQ target.
func NewNSQTarget ¶
func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*NSQTarget, error)
NewNSQTarget - creates new NSQ target.
type PostgreSQLArgs ¶
type PostgreSQLArgs struct { Enable bool `json:"enable"` Format string `json:"format"` ConnectionString string `json:"connectionString"` Table string `json:"table"` Host xnet.Host `json:"host"` // default: localhost Port string `json:"port"` // default: 5432 User string `json:"user"` // default: user running minio Password string `json:"password"` // default: no password Database string `json:"database"` // default: same as user QueueDir string `json:"queueDir"` QueueLimit uint64 `json:"queueLimit"` }
PostgreSQLArgs - PostgreSQL target arguments.
func (PostgreSQLArgs) Validate ¶
func (p PostgreSQLArgs) Validate() error
Validate PostgreSQLArgs fields
type PostgreSQLTarget ¶
type PostgreSQLTarget struct {
// contains filtered or unexported fields
}
PostgreSQLTarget - PostgreSQL target.
func NewPostgreSQLTarget ¶
func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*PostgreSQLTarget, error)
NewPostgreSQLTarget - creates new PostgreSQL target.
func (*PostgreSQLTarget) Close ¶
func (target *PostgreSQLTarget) Close() error
Close - closes the underlying connections to the PostgreSQL database.
func (*PostgreSQLTarget) ID ¶
func (target *PostgreSQLTarget) ID() event.TargetID
ID - returns target ID.
func (*PostgreSQLTarget) IsActive ¶
func (target *PostgreSQLTarget) IsActive() (bool, error)
IsActive - Return true if target is up and active
func (*PostgreSQLTarget) Save ¶
func (target *PostgreSQLTarget) Save(eventData event.Event) error
Save - saves the events to the store if queuestore is configured, which will be replayed when the PostgreSQL connection is active.
func (*PostgreSQLTarget) Send ¶
func (target *PostgreSQLTarget) Send(eventKey string) error
Send - reads an event from store and sends it to PostgreSQL.
type QueueStore ¶
QueueStore - Filestore for persisting events.
func (*QueueStore) Del ¶
func (store *QueueStore) Del(key string) error
Del - Deletes an entry from the store.
func (*QueueStore) Get ¶
func (store *QueueStore) Get(key string) (event event.Event, err error)
Get - gets an event from the store.
func (*QueueStore) List ¶
func (store *QueueStore) List() ([]string, error)
List - lists all files from the directory.
func (*QueueStore) Open ¶
func (store *QueueStore) Open() error
Open - Creates the directory if not present.
type RedisArgs ¶
type RedisArgs struct { Enable bool `json:"enable"` Format string `json:"format"` Addr xnet.Host `json:"address"` Password string `json:"password"` Key string `json:"key"` QueueDir string `json:"queueDir"` QueueLimit uint64 `json:"queueLimit"` }
RedisArgs - Redis target arguments.
type RedisTarget ¶
type RedisTarget struct {
// contains filtered or unexported fields
}
RedisTarget - Redis target.
func NewRedisTarget ¶
func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}), test bool) (*RedisTarget, error)
NewRedisTarget - creates new Redis target.
func (*RedisTarget) Close ¶
func (target *RedisTarget) Close() error
Close - releases the resources used by the pool.
func (*RedisTarget) IsActive ¶
func (target *RedisTarget) IsActive() (bool, error)
IsActive - Return true if target is up and active
func (*RedisTarget) Save ¶
func (target *RedisTarget) Save(eventData event.Event) error
Save - saves the events to the store if queuestore is configured, which will be replayed when the redis connection is active.
func (*RedisTarget) Send ¶
func (target *RedisTarget) Send(eventKey string) error
Send - reads an event from store and sends it to redis.
type Store ¶
type Store interface { Put(event event.Event) error Get(key string) (event.Event, error) List() ([]string, error) Del(key string) error Open() error }
Store - To persist the events.
func NewQueueStore ¶
NewQueueStore - Creates an instance for QueueStore.
type WebhookArgs ¶
type WebhookArgs struct { Enable bool `json:"enable"` Endpoint xnet.URL `json:"endpoint"` AuthToken string `json:"authToken"` Transport *http.Transport `json:"-"` QueueDir string `json:"queueDir"` QueueLimit uint64 `json:"queueLimit"` }
WebhookArgs - Webhook target arguments.
type WebhookTarget ¶
type WebhookTarget struct {
// contains filtered or unexported fields
}
WebhookTarget - Webhook target.
func NewWebhookTarget ¶
func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), transport *http.Transport, test bool) (*WebhookTarget, error)
NewWebhookTarget - creates new Webhook target.
func (*WebhookTarget) Close ¶
func (target *WebhookTarget) Close() error
Close - does nothing and is available for interface compatibility.
func (*WebhookTarget) IsActive ¶
func (target *WebhookTarget) IsActive() (bool, error)
IsActive - Return true if target is up and active
func (*WebhookTarget) Save ¶
func (target *WebhookTarget) Save(eventData event.Event) error
Save - saves the events to the store if queuestore is configured, which will be replayed when the webhook connection is active.
func (*WebhookTarget) Send ¶
func (target *WebhookTarget) Send(eventKey string) error
Send - reads an event from store and sends it to webhook.