sink

package
v0.11.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 5, 2022 License: AGPL-3.0 Imports: 37 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AMPLITUDE_STANDARD_ENDPOINT string = "https://api2.amplitude.com/2/httpapi"
	AMPLITUDE_EU_ENDPOINT       string = "https://api.eu.amplitude.com/2/httpapi"
	AMPLITUDE_STANDARD          string = "standard"
	AMPLITUDE_EU                string = "eu"
)
View Source
const (
	DEFAULT_PARTITIONS         int32 = 3
	DEFAULT_REPLICATION_FACTOR int16 = 1 // NOTE! Really not a good default.
)
View Source
const (
	PUBNUB_PUBLISH_URL   string = "ps.pndsn.com/publish"
	PUBNUB_HTTP_PROTOCOL string = "https"
)
View Source
const (
	PUBSUB           string = "pubsub"
	REDPANDA         string = "redpanda"
	KAFKA            string = "kafka"
	KINESIS          string = "kinesis"
	KINESIS_FIREHOSE string = "kinesis-firehose"
	STDOUT           string = "stdout"
	HTTP             string = "http"
	HTTPS            string = "https"
	BLACKHOLE        string = "blackhole"
	FILE             string = "file"
	PUBNUB           string = "pubnub"
	NATS             string = "nats"
	NATS_JETSTREAM   string = "nats-jetstream"
	INDICATIVE       string = "indicative"
	AMPLITUDE        string = "amplitude"
)
View Source
const INDICATIVE_BATCH_ENDPOINT string = "https://api.indicative.com/service/event/batch"
View Source
const INIT_TIMEOUT_SECONDS = 10

Variables

View Source
var (
	Info = Teal
	Warn = Yellow
	Fata = Red
)
View Source
var (
	Black   = Colorize("\033[1;30m%s\033[0m")
	Red     = Colorize("\033[1;31m%s\033[0m")
	Green   = Colorize("\033[1;32m%s\033[0m")
	Yellow  = Colorize("\033[1;33m%s\033[0m")
	Purple  = Colorize("\033[1;34m%s\033[0m")
	Magenta = Colorize("\033[1;35m%s\033[0m")
	Teal    = Colorize("\033[1;36m%s\033[0m")
	White   = Colorize("\033[1;37m%s\033[0m")
)

Functions

func Colorize

func Colorize(colorString string) func(...interface{}) string

func InitializeSink

func InitializeSink(conf config.Sink, s Sink) error

Types

type AmplitudeSink

type AmplitudeSink struct {
	// contains filtered or unexported fields
}

func (*AmplitudeSink) BatchPublishInvalid

func (s *AmplitudeSink) BatchPublishInvalid(ctx context.Context, envelopes []envelope.Envelope) error

func (*AmplitudeSink) BatchPublishValid

func (s *AmplitudeSink) BatchPublishValid(ctx context.Context, envelopes []envelope.Envelope) error

func (*AmplitudeSink) Close

func (s *AmplitudeSink) Close()

func (*AmplitudeSink) DeliveryRequired

func (s *AmplitudeSink) DeliveryRequired() bool

func (*AmplitudeSink) Id

func (s *AmplitudeSink) Id() *uuid.UUID

func (*AmplitudeSink) Initialize

func (s *AmplitudeSink) Initialize(conf config.Sink) error

func (*AmplitudeSink) Name

func (s *AmplitudeSink) Name() string

func (*AmplitudeSink) Type

func (s *AmplitudeSink) Type() string

type BlackholeSink

type BlackholeSink struct {
	// contains filtered or unexported fields
}

func (*BlackholeSink) BatchPublishInvalid

func (s *BlackholeSink) BatchPublishInvalid(ctx context.Context, invalidEnvelopes []envelope.Envelope) error

func (*BlackholeSink) BatchPublishValid

func (s *BlackholeSink) BatchPublishValid(ctx context.Context, validEnvelopes []envelope.Envelope) error

func (*BlackholeSink) Close

func (s *BlackholeSink) Close()

func (*BlackholeSink) DeliveryRequired

func (s *BlackholeSink) DeliveryRequired() bool

func (*BlackholeSink) Id

func (s *BlackholeSink) Id() *uuid.UUID

func (*BlackholeSink) Initialize

func (s *BlackholeSink) Initialize(conf config.Sink) error

func (*BlackholeSink) Name

func (s *BlackholeSink) Name() string

func (*BlackholeSink) Type

func (s *BlackholeSink) Type() string

type ClickhouseSink

type ClickhouseSink struct {
	// contains filtered or unexported fields
}

func (*ClickhouseSink) BatchPublishInvalid

func (s *ClickhouseSink) BatchPublishInvalid(ctx context.Context, envelopes []envelope.Envelope) error

func (*ClickhouseSink) BatchPublishValid

func (s *ClickhouseSink) BatchPublishValid(ctx context.Context, envelopes []envelope.Envelope) error

func (*ClickhouseSink) Close

func (s *ClickhouseSink) Close()

func (*ClickhouseSink) DeliveryRequired

func (s *ClickhouseSink) DeliveryRequired() bool

func (*ClickhouseSink) Id

func (s *ClickhouseSink) Id() *uuid.UUID

func (*ClickhouseSink) Initialize

func (s *ClickhouseSink) Initialize(conf config.Sink) error

func (*ClickhouseSink) Name

func (s *ClickhouseSink) Name() string

func (*ClickhouseSink) Type

func (s *ClickhouseSink) Type() string

type ElasticsearchSink

type ElasticsearchSink struct {
	// contains filtered or unexported fields
}

func (*ElasticsearchSink) BatchPublishInvalid

func (s *ElasticsearchSink) BatchPublishInvalid(ctx context.Context, invalidEnvelopes []envelope.Envelope) error

func (*ElasticsearchSink) BatchPublishValid

func (s *ElasticsearchSink) BatchPublishValid(ctx context.Context, validEnvelopes []envelope.Envelope) error

func (*ElasticsearchSink) Close

func (s *ElasticsearchSink) Close()

func (*ElasticsearchSink) DeliveryRequired

func (s *ElasticsearchSink) DeliveryRequired() bool

func (*ElasticsearchSink) Id

func (s *ElasticsearchSink) Id() *uuid.UUID

func (*ElasticsearchSink) Initialize

func (s *ElasticsearchSink) Initialize(conf config.Sink) error

func (*ElasticsearchSink) Name

func (s *ElasticsearchSink) Name() string

func (*ElasticsearchSink) Type

func (s *ElasticsearchSink) Type() string

type FileSink

type FileSink struct {
	// contains filtered or unexported fields
}

func (*FileSink) BatchPublishInvalid

func (s *FileSink) BatchPublishInvalid(ctx context.Context, envelopes []envelope.Envelope) error

func (*FileSink) BatchPublishValid

func (s *FileSink) BatchPublishValid(ctx context.Context, envelopes []envelope.Envelope) error

func (*FileSink) Close

func (s *FileSink) Close()

func (*FileSink) DeliveryRequired

func (s *FileSink) DeliveryRequired() bool

func (*FileSink) Id

func (s *FileSink) Id() *uuid.UUID

func (*FileSink) Initialize

func (s *FileSink) Initialize(conf config.Sink) error

func (*FileSink) Name

func (s *FileSink) Name() string

func (*FileSink) Type

func (s *FileSink) Type() string

type HttpSink

type HttpSink struct {
	// contains filtered or unexported fields
}

func (*HttpSink) BatchPublishInvalid

func (s *HttpSink) BatchPublishInvalid(ctx context.Context, invalidEnvelopes []envelope.Envelope) error

func (*HttpSink) BatchPublishValid

func (s *HttpSink) BatchPublishValid(ctx context.Context, validEnvelopes []envelope.Envelope) error

func (*HttpSink) Close

func (s *HttpSink) Close()

func (*HttpSink) DeliveryRequired

func (s *HttpSink) DeliveryRequired() bool

func (*HttpSink) Id

func (s *HttpSink) Id() *uuid.UUID

func (*HttpSink) Initialize

func (s *HttpSink) Initialize(conf config.Sink) error

func (*HttpSink) Name

func (s *HttpSink) Name() string

func (*HttpSink) Type

func (s *HttpSink) Type() string

type IndicativeSink

type IndicativeSink struct {
	// contains filtered or unexported fields
}

func (*IndicativeSink) BatchPublishInvalid

func (s *IndicativeSink) BatchPublishInvalid(ctx context.Context, envelopes []envelope.Envelope) error

func (*IndicativeSink) BatchPublishValid

func (s *IndicativeSink) BatchPublishValid(ctx context.Context, envelopes []envelope.Envelope) error

func (*IndicativeSink) Close

func (s *IndicativeSink) Close()

func (*IndicativeSink) DeliveryRequired

func (s *IndicativeSink) DeliveryRequired() bool

func (*IndicativeSink) Id

func (s *IndicativeSink) Id() *uuid.UUID

func (*IndicativeSink) Initialize

func (s *IndicativeSink) Initialize(conf config.Sink) error

func (*IndicativeSink) Name

func (s *IndicativeSink) Name() string

func (*IndicativeSink) Type

func (s *IndicativeSink) Type() string

type KafkaSink

type KafkaSink struct {
	// contains filtered or unexported fields
}

func (*KafkaSink) BatchPublishInvalid

func (s *KafkaSink) BatchPublishInvalid(ctx context.Context, envelopes []envelope.Envelope) error

func (*KafkaSink) BatchPublishValid

func (s *KafkaSink) BatchPublishValid(ctx context.Context, envelopes []envelope.Envelope) error

func (*KafkaSink) Close

func (s *KafkaSink) Close()

func (*KafkaSink) DeliveryRequired

func (s *KafkaSink) DeliveryRequired() bool

func (*KafkaSink) Id

func (s *KafkaSink) Id() *uuid.UUID

func (*KafkaSink) Initialize

func (s *KafkaSink) Initialize(conf config.Sink) error

func (*KafkaSink) Name

func (s *KafkaSink) Name() string

func (*KafkaSink) Type

func (s *KafkaSink) Type() string

type KinesisFirehoseSink

type KinesisFirehoseSink struct {
	// contains filtered or unexported fields
}

func (*KinesisFirehoseSink) BatchPublishInvalid

func (s *KinesisFirehoseSink) BatchPublishInvalid(ctx context.Context, envelopes []envelope.Envelope) error

func (*KinesisFirehoseSink) BatchPublishValid

func (s *KinesisFirehoseSink) BatchPublishValid(ctx context.Context, envelopes []envelope.Envelope) error

func (*KinesisFirehoseSink) Close

func (s *KinesisFirehoseSink) Close()

func (*KinesisFirehoseSink) DeliveryRequired

func (s *KinesisFirehoseSink) DeliveryRequired() bool

func (*KinesisFirehoseSink) Id

func (s *KinesisFirehoseSink) Id() *uuid.UUID

func (*KinesisFirehoseSink) Initialize

func (s *KinesisFirehoseSink) Initialize(conf config.Sink) error

func (*KinesisFirehoseSink) Name

func (s *KinesisFirehoseSink) Name() string

func (*KinesisFirehoseSink) Type

func (s *KinesisFirehoseSink) Type() string

type KinesisSink

type KinesisSink struct {
	// contains filtered or unexported fields
}

func (*KinesisSink) BatchPublishInvalid

func (s *KinesisSink) BatchPublishInvalid(ctx context.Context, envelopes []envelope.Envelope) error

func (*KinesisSink) BatchPublishValid

func (s *KinesisSink) BatchPublishValid(ctx context.Context, envelopes []envelope.Envelope) error

func (*KinesisSink) Close

func (s *KinesisSink) Close()

func (*KinesisSink) DeliveryRequired

func (s *KinesisSink) DeliveryRequired() bool

func (*KinesisSink) Id

func (s *KinesisSink) Id() *uuid.UUID

func (*KinesisSink) Initialize

func (s *KinesisSink) Initialize(conf config.Sink) error

func (*KinesisSink) Name

func (s *KinesisSink) Name() string

func (*KinesisSink) Type

func (s *KinesisSink) Type() string

type MaterializeSink

type MaterializeSink struct {
	// contains filtered or unexported fields
}

func (*MaterializeSink) BatchPublishInvalid

func (s *MaterializeSink) BatchPublishInvalid(ctx context.Context, envelopes []envelope.Envelope) error

func (*MaterializeSink) BatchPublishValid

func (s *MaterializeSink) BatchPublishValid(ctx context.Context, envelopes []envelope.Envelope) error

func (*MaterializeSink) Close

func (s *MaterializeSink) Close()

func (*MaterializeSink) DeliveryRequired

func (s *MaterializeSink) DeliveryRequired() bool

func (*MaterializeSink) Id

func (s *MaterializeSink) Id() *uuid.UUID

func (*MaterializeSink) Initialize

func (s *MaterializeSink) Initialize(conf config.Sink) error

func (*MaterializeSink) Name

func (s *MaterializeSink) Name() string

func (*MaterializeSink) Type

func (s *MaterializeSink) Type() string

type MongodbSink

type MongodbSink struct {
	// contains filtered or unexported fields
}

func (*MongodbSink) BatchPublishInvalid

func (s *MongodbSink) BatchPublishInvalid(ctx context.Context, envelopes []envelope.Envelope) error

func (*MongodbSink) BatchPublishValid

func (s *MongodbSink) BatchPublishValid(ctx context.Context, envelopes []envelope.Envelope) error

func (*MongodbSink) Close

func (s *MongodbSink) Close()

func (*MongodbSink) DeliveryRequired

func (s *MongodbSink) DeliveryRequired() bool

func (*MongodbSink) Id

func (s *MongodbSink) Id() *uuid.UUID

func (*MongodbSink) Initialize

func (s *MongodbSink) Initialize(conf config.Sink) error

func (*MongodbSink) Name

func (s *MongodbSink) Name() string

func (*MongodbSink) Type

func (s *MongodbSink) Type() string

type MysqlSink

type MysqlSink struct {
	// contains filtered or unexported fields
}

func (*MysqlSink) BatchPublishInvalid

func (s *MysqlSink) BatchPublishInvalid(ctx context.Context, envelopes []envelope.Envelope) error

func (*MysqlSink) BatchPublishValid

func (s *MysqlSink) BatchPublishValid(ctx context.Context, envelopes []envelope.Envelope) error

func (*MysqlSink) Close

func (s *MysqlSink) Close()

func (*MysqlSink) DeliveryRequired

func (s *MysqlSink) DeliveryRequired() bool

func (*MysqlSink) Id

func (s *MysqlSink) Id() *uuid.UUID

func (*MysqlSink) Initialize

func (s *MysqlSink) Initialize(conf config.Sink) error

func (*MysqlSink) Name

func (s *MysqlSink) Name() string

func (*MysqlSink) Type

func (s *MysqlSink) Type() string

type NatsJetstreamSink

type NatsJetstreamSink struct {
	// contains filtered or unexported fields
}

func (*NatsJetstreamSink) BatchPublishInvalid

func (s *NatsJetstreamSink) BatchPublishInvalid(ctx context.Context, envelopes []envelope.Envelope) error

func (*NatsJetstreamSink) BatchPublishValid

func (s *NatsJetstreamSink) BatchPublishValid(ctx context.Context, envelopes []envelope.Envelope) error

func (*NatsJetstreamSink) Close

func (s *NatsJetstreamSink) Close()

func (*NatsJetstreamSink) DeliveryRequired

func (s *NatsJetstreamSink) DeliveryRequired() bool

func (*NatsJetstreamSink) Id

func (s *NatsJetstreamSink) Id() *uuid.UUID

func (*NatsJetstreamSink) Initialize

func (s *NatsJetstreamSink) Initialize(conf config.Sink) error

func (*NatsJetstreamSink) Name

func (s *NatsJetstreamSink) Name() string

func (*NatsJetstreamSink) Type

func (s *NatsJetstreamSink) Type() string

type NatsSink

type NatsSink struct {
	// contains filtered or unexported fields
}

func (*NatsSink) BatchPublishInvalid

func (s *NatsSink) BatchPublishInvalid(ctx context.Context, envelopes []envelope.Envelope) error

func (*NatsSink) BatchPublishValid

func (s *NatsSink) BatchPublishValid(ctx context.Context, envelopes []envelope.Envelope) error

func (*NatsSink) Close

func (s *NatsSink) Close()

func (*NatsSink) DeliveryRequired

func (s *NatsSink) DeliveryRequired() bool

func (*NatsSink) Id

func (s *NatsSink) Id() *uuid.UUID

func (*NatsSink) Initialize

func (s *NatsSink) Initialize(conf config.Sink) error

func (*NatsSink) Name

func (s *NatsSink) Name() string

func (*NatsSink) Type

func (s *NatsSink) Type() string

type PostgresSink

type PostgresSink struct {
	// contains filtered or unexported fields
}

func (*PostgresSink) BatchPublishInvalid

func (s *PostgresSink) BatchPublishInvalid(ctx context.Context, envelopes []envelope.Envelope) error

func (*PostgresSink) BatchPublishValid

func (s *PostgresSink) BatchPublishValid(ctx context.Context, envelopes []envelope.Envelope) error

func (*PostgresSink) Close

func (s *PostgresSink) Close()

func (*PostgresSink) DeliveryRequired

func (s *PostgresSink) DeliveryRequired() bool

func (*PostgresSink) Id

func (s *PostgresSink) Id() *uuid.UUID

func (*PostgresSink) Initialize

func (s *PostgresSink) Initialize(conf config.Sink) error

func (*PostgresSink) Name

func (s *PostgresSink) Name() string

func (*PostgresSink) Type

func (s *PostgresSink) Type() string

type PubnubSink

type PubnubSink struct {
	// contains filtered or unexported fields
}

func (*PubnubSink) BatchPublishInvalid

func (s *PubnubSink) BatchPublishInvalid(ctx context.Context, envelopes []envelope.Envelope) error

func (*PubnubSink) BatchPublishValid

func (s *PubnubSink) BatchPublishValid(ctx context.Context, envelopes []envelope.Envelope) error

func (*PubnubSink) Close

func (s *PubnubSink) Close()

func (*PubnubSink) DeliveryRequired

func (s *PubnubSink) DeliveryRequired() bool

func (*PubnubSink) Id

func (s *PubnubSink) Id() *uuid.UUID

func (*PubnubSink) Initialize

func (s *PubnubSink) Initialize(conf config.Sink) error

func (*PubnubSink) Name

func (s *PubnubSink) Name() string

func (*PubnubSink) Type

func (s *PubnubSink) Type() string

type PubsubSink

type PubsubSink struct {
	// contains filtered or unexported fields
}

func (*PubsubSink) BatchPublishInvalid

func (s *PubsubSink) BatchPublishInvalid(ctx context.Context, envelopes []envelope.Envelope) error

func (*PubsubSink) BatchPublishValid

func (s *PubsubSink) BatchPublishValid(ctx context.Context, envelopes []envelope.Envelope) error

func (*PubsubSink) Close

func (s *PubsubSink) Close()

func (*PubsubSink) DeliveryRequired

func (s *PubsubSink) DeliveryRequired() bool

func (*PubsubSink) Id

func (s *PubsubSink) Id() *uuid.UUID

func (*PubsubSink) Initialize

func (s *PubsubSink) Initialize(conf config.Sink) error

func (*PubsubSink) Name

func (s *PubsubSink) Name() string

func (*PubsubSink) Type

func (s *PubsubSink) Type() string

type Sink

type Sink interface {
	Id() *uuid.UUID
	Name() string
	Type() string
	DeliveryRequired() bool
	Initialize(conf config.Sink) error
	BatchPublishValid(ctx context.Context, envelopes []envelope.Envelope) error
	BatchPublishInvalid(ctx context.Context, envelopes []envelope.Envelope) error
	Close()
}

func BuildAndInitializeSinks

func BuildAndInitializeSinks(conf []config.Sink) ([]Sink, error)

func BuildSink

func BuildSink(conf config.Sink) (sink Sink, err error)

type StdoutSink

type StdoutSink struct {
	// contains filtered or unexported fields
}

func (*StdoutSink) BatchPublishInvalid

func (s *StdoutSink) BatchPublishInvalid(ctx context.Context, invalidEnvelopes []envelope.Envelope) error

func (*StdoutSink) BatchPublishValid

func (s *StdoutSink) BatchPublishValid(ctx context.Context, validEnvelopes []envelope.Envelope) error

func (*StdoutSink) Close

func (s *StdoutSink) Close()

func (*StdoutSink) DeliveryRequired

func (s *StdoutSink) DeliveryRequired() bool

func (*StdoutSink) Id

func (s *StdoutSink) Id() *uuid.UUID

func (*StdoutSink) Initialize

func (s *StdoutSink) Initialize(conf config.Sink) error

func (*StdoutSink) Name

func (s *StdoutSink) Name() string

func (*StdoutSink) Type

func (s *StdoutSink) Type() string

type TimescaleSink

type TimescaleSink struct {
	// contains filtered or unexported fields
}

func (*TimescaleSink) BatchPublishInvalid

func (s *TimescaleSink) BatchPublishInvalid(ctx context.Context, envelopes []envelope.Envelope) error

func (*TimescaleSink) BatchPublishValid

func (s *TimescaleSink) BatchPublishValid(ctx context.Context, envelopes []envelope.Envelope) error

func (*TimescaleSink) Close

func (s *TimescaleSink) Close()

func (*TimescaleSink) DeliveryRequired

func (s *TimescaleSink) DeliveryRequired() bool

func (*TimescaleSink) Id

func (s *TimescaleSink) Id() *uuid.UUID

func (*TimescaleSink) Initialize

func (s *TimescaleSink) Initialize(conf config.Sink) error

func (*TimescaleSink) Name

func (s *TimescaleSink) Name() string

func (*TimescaleSink) Type

func (s *TimescaleSink) Type() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL