sink

package
v0.0.0-...-161b3a9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2023 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SinkFactory

func SinkFactory(job *model.Job) model.MessageProcessor

Types

type KafkaMessageSink

type KafkaMessageSink struct {
	// contains filtered or unexported fields
}

func NewKafkaMessageSink

func NewKafkaMessageSink(context context.Context, config aws.Config, topicName string, brokers []string) *KafkaMessageSink

func (*KafkaMessageSink) Process

func (k *KafkaMessageSink) Process(message kafka.Message) error

type MessagePrinter

type MessagePrinter struct{}

func (*MessagePrinter) Process

func (p *MessagePrinter) Process(message kafka.Message) error

type S3MessageSink

type S3MessageSink struct {
	Context    context.Context
	S3Client   S3ObjectPutter
	BucketName string
}

func NewS3MessageSink

func NewS3MessageSink(ctx context.Context, config aws.Config, bucketName string) *S3MessageSink

func (*S3MessageSink) Process

func (s *S3MessageSink) Process(message kafka.Message) error

func (*S3MessageSink) Upload

func (s *S3MessageSink) Upload(message kafka.Message) error

type S3ObjectPutter

type S3ObjectPutter interface {
	PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL