source

package
v0.0.0-...-161b3a9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2023 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SourceFactory

func SourceFactory(job *model.Job) model.MessageSource

Types

type KafkaMessageSource

type KafkaMessageSource struct {
	// contains filtered or unexported fields
}

func NewKafkaMessageSource

func NewKafkaMessageSource(ctx context.Context, config aws.Config, topicName string, brokers []string, groupId string) *KafkaMessageSource

func (*KafkaMessageSource) Fetch

func (k *KafkaMessageSource) Fetch(processor model.MessageProcessor)

Fetch — FIXME: Fetch currently runs forever. Idea: create a kafka.Client to list the topic's offsets and check whether the message with that offset has already been read. See:
- https://github.com/segmentio/kafka-go/blob/4da3b721ca38db775a5024089cdc4ba14f84e698/client_test.go#L85
- https://github.com/segmentio/kafka-go/blob/4da3b721ca38db775a5024089cdc4ba14f84e698/listoffset.go#L79

type S3ObjectGetter

type S3ObjectGetter interface {
	ListObjectsV2(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error)
	GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
}

type S3Source

type S3Source struct {
	Context    context.Context
	S3Client   S3ObjectGetter
	BucketName string
	TopicName  string
}

func NewS3Source

func NewS3Source(ctx context.Context, config aws.Config, bucketName string, topicName string) *S3Source

func (*S3Source) CreateMessage

func (s *S3Source) CreateMessage(fileName string, content []byte) kafka.Message

func (*S3Source) Fetch

func (s *S3Source) Fetch(processor model.MessageProcessor)

func (*S3Source) GetFileContent

func (s *S3Source) GetFileContent(fileName string) []byte

func (*S3Source) ListFiles

func (s *S3Source) ListFiles(continuationToken *string) []string

ListFiles — Note: continuationToken is used for recursion; pass `nil` on the first call.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL