Documentation ¶
Index ¶
- type InsertHook
- type Option
- func WithClientID(clientID string) Option
- func WithConsumerSaramaBuilder(cgb goka.SaramaConsumerBuilder) Option
- func WithInitialLoad(loadFromPast time.Duration) Option
- func WithInsertHook(hook InsertHook) Option
- func WithPrimaryKey(fieldName string) Option
- func WithRetention(retention time.Duration, updatedFieldName string) Option
- func WithTester(tt *tester.Tester) Option
- func WithTopicManagerBuilder(tmb goka.TopicManagerBuilder) Option
- type Telly
Types ¶
type InsertHook ¶
type InsertHook func(key []byte, value interface{}) interface{}
type Option ¶
type Option func(*options)
Option defines an option that modifies the behavior of telly.
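The Option type follows Go's functional options pattern. The following is a minimal, self-contained sketch of that pattern, not telly's implementation: the fields of `options`, the lower-case constructors, the helper `applyOptions`, and the default client ID "telly" are all hypothetical, since the documentation does not show the unexported struct.

```go
package main

import "fmt"

// options is a hypothetical stand-in for telly's unexported options struct.
// The real field names are not shown in the documentation.
type options struct {
	clientID   string
	primaryKey string
}

// Option mirrors the documented type: a function that mutates *options.
type Option func(*options)

// withClientID and withPrimaryKey are illustrative look-alikes of the
// documented WithClientID and WithPrimaryKey constructors.
func withClientID(clientID string) Option {
	return func(o *options) { o.clientID = clientID }
}

func withPrimaryKey(fieldName string) Option {
	return func(o *options) { o.primaryKey = fieldName }
}

// applyOptions starts from assumed defaults and applies each option in
// order, so a later option overrides an earlier one.
func applyOptions(opts ...Option) options {
	o := options{clientID: "telly", primaryKey: "id"}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyOptions(withClientID("my-importer"), withPrimaryKey("user_id"))
	fmt.Println(o.clientID, o.primaryKey)
}
```

Because every option is just a function, callers pass only the settings they want to change and the constructor signature stays stable as new options are added.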
func WithClientID ¶
func WithClientID(clientID string) Option
WithClientID overrides the default client ID used when creating sarama consumers.
func WithConsumerSaramaBuilder ¶
func WithConsumerSaramaBuilder(cgb goka.SaramaConsumerBuilder) Option
WithConsumerSaramaBuilder replaces the default consumer group builder.
func WithInitialLoad ¶
func WithInitialLoad(loadFromPast time.Duration) Option
WithInitialLoad enables an initial load of existing data from the topic into rethinkdb. If loadFromPast == -1, loading starts at the beginning of the topic. If loadFromPast == 0, loading starts at the end of the topic.
func WithInsertHook ¶
func WithInsertHook(hook InsertHook) Option
WithInsertHook adds a hook that is called for every new message added to the database.
func WithPrimaryKey ¶
func WithPrimaryKey(fieldName string) Option
WithPrimaryKey specifies the field name used as the primary key when creating the table. If the table already exists with a different key, Telly returns an error, so the key cannot be changed without deleting the table first. By default, rethinkdb uses the field name 'id'. If your data type does not provide this field and no primary key is set, rethinkdb generates a new id for each entry, which makes it impossible to overwrite entries. Note that the message key provided by Kafka cannot be used directly. If you need the key, add an InsertHook that returns a new data structure containing it.
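An InsertHook that carries the Kafka key into the stored value might look like the sketch below. The InsertHook type is redeclared locally with the documented signature so the example is self-contained; the `user` type, the hook `keyAsID`, and the map layout are hypothetical, and whether your codec yields a value or a pointer depends on your setup.

```go
package main

import "fmt"

// InsertHook has the signature shown in the package documentation.
type InsertHook func(key []byte, value interface{}) interface{}

// user is a hypothetical decoded message value that has no id field.
type user struct {
	Name string
}

// keyAsID returns a new structure that stores the Kafka message key under
// "id", the field name rethinkdb uses as primary key by default.
func keyAsID(key []byte, value interface{}) interface{} {
	u, ok := value.(user)
	if !ok {
		// Leave values of unexpected types untouched.
		return value
	}
	return map[string]interface{}{
		"id":   string(key),
		"name": u.Name,
	}
}

// Compile-time check that keyAsID satisfies the InsertHook signature.
var _ InsertHook = keyAsID

func main() {
	row := keyAsID([]byte("user-1"), user{Name: "Ada"})
	fmt.Println(row)
}
```

With the real package, such a hook would be passed via WithInsertHook, so that two messages with the same Kafka key map to the same row.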
func WithRetention ¶
func WithRetention(retention time.Duration, updatedFieldName string) Option
WithRetention runs a cleaner goroutine that removes entries older than the passed retention. updatedFieldName specifies the rethinkdb field holding the timestamp (for nested fields, use 'nested.field.timestamp'). Note: telly does not check whether this field exists or whether it actually contains a valid timestamp. It blindly deletes every row whose value is "older" than the one provided.
func WithTopicManagerBuilder ¶
func WithTopicManagerBuilder(tmb goka.TopicManagerBuilder) Option
WithTopicManagerBuilder replaces the default topic manager.
type Telly ¶
type Telly struct {
// contains filtered or unexported fields
}
Telly imports data from kafka into rethinkdb.
func NewTelly ¶
func NewTelly(ctx context.Context, executor rdb.QueryExecutor, dbName string, table string, inputTopic string, codec goka.Codec, options ...Option) (*Telly, error)
NewTelly creates a new telly importer.
func (*Telly) DropOffsets ¶
DropOffsets deletes the offsets document so that telly starts from scratch.
func (*Telly) Executor ¶
func (t *Telly) Executor() rdb.QueryExecutor