gstream

package module
v0.1.0
Published: Dec 21, 2022 License: MIT Imports: 9 Imported by: 0

README

GStream

A stream processing library with a Kafka Streams-like DSL that abstracts the pipeline pattern using generics.

Remaining Tasks

  • api
    • Windowing
    • Table-Table join
  • stores
    • BoltDB for read intensive
      • ⚠️ unstable
    • Pebble for write intensive

Getting Started

GStream State Machine

GStream's DSL is similar to that of Kafka Streams, and the relationship between streams and tables is exactly the same. One difference is that GStream also has a stream without keys. KeyValueGStream has the same meaning as KStream in Kafka Streams.

Map operations with side effects branch into a success stream and a failure stream. The success stream carries the results of the Map operation. The failure stream carries the arguments and errors of the failed operations.
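The success/failure split can be illustrated without the library. The following is a minimal sketch over slices rather than channels; `mapErr` and `parseInt` are hypothetical names used only for illustration, and `Fail` mirrors the shape of the `Fail` type listed in the package documentation.

```go
package main

import (
	"fmt"
	"strconv"
)

// Fail pairs the argument of a failed operation with its error.
type Fail[T any] struct {
	Arg T
	Err error
}

// mapErr is a hypothetical, slice-based stand-in for a side-effecting map:
// results go to the success slice, failed inputs go to the failure slice.
func mapErr[T, TR any](in []T, mapper func(T) (TR, error)) ([]TR, []Fail[T]) {
	var ok []TR
	var failed []Fail[T]
	for _, v := range in {
		r, err := mapper(v)
		if err != nil {
			failed = append(failed, Fail[T]{Arg: v, Err: err})
			continue
		}
		ok = append(ok, r)
	}
	return ok, failed
}

// parseInt is an example mapper that can fail.
func parseInt(s string) (int, error) {
	return strconv.Atoi(s)
}

func main() {
	ok, failed := mapErr([]string{"1", "x", "3"}, parseInt)
	fmt.Println(ok)
	for _, f := range failed {
		fmt.Println(f.Arg, f.Err)
	}
}
```

Keeping the failed argument next to its error is what makes it possible to filter failures or feed them back into a regular stream later.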

graph TD
    A(GStream) -->|'SelectKey| B(KeyValueGStream)
    A -->|Filter, ''Map, Merge, Pipe| A
    B -->|Filter, ''Map, Merge, Pipe, Join, GroupBy| B
    B -->|ToValueStream| A
    B -->|ToTable, 'Aggregate, 'Count| C(GTable)
    C -->|ToStream| B
    C -->|ToValueStream| A
    A -->|''MapErr| E(FailedGStream)
    E -->|Filter| E
    E -->|ToStream| A
    B -->|''MapErr| F(FailedKeyValueGStream)
    F -->|Filter| F
    F -->|ToStream| B
    subgraph GStream
    A
    subgraph SideEffect
    E
    end
    end
    subgraph KeyValueGStream
    B
    subgraph SideEffect'
    F
    end
    end
    subgraph GTable
    C
    end

Stateless Example

Suppose we are processing a stream of tweets for sentiment analysis.

  1. Tweets are a mix of Korean and English, so Korean tweets should be translated into English.
  2. Tweets should be enriched with a sentiment score.
  3. Enriched data is printed and sent to an output channel.

The stream graph is as follows:

stateDiagram-v2
    Tweets --> English : filter
    Tweets --> Korean : filter
    Korean --> Translate : map
    English --> Merge : merge
    Translate --> Merge
    Merge --> Sentiment : flatMap
    Sentiment --> Printed : foreach
    Sentiment --> outputChannel : to

First, define the structs used in the stream.

type Tweet struct {
	ID   int
	Lang string
	Text string
}

type Sentiment struct {
	ID    int
	Text  string
	Score float64
}

Then, create the source stream from an input channel.

tweetCh := make(chan Tweet)
// emit Tweet to tweetCh

builder := gstream.NewBuilder()
tweets := gstream.Stream[Tweet](builder).From(tweetCh)

Then, build the stream according to the graph above.

// branch into english
english := tweets.Filter(func(t Tweet) bool {
	return t.Lang == "en"
})

// branch into korean and translate
translate := tweets.Filter(func(t Tweet) bool {
	return t.Lang == "kr"
}).Map(func(ctx context.Context, t Tweet) Tweet {
	// translate t.Text to English (placeholder: the text is passed through unchanged)
	translated := Tweet{ID: t.ID, Lang: "en", Text: t.Text}
	return translated
})

// merge english and translate branch
merged := english.Merge(translate)

// enrich tweet
sentiment := gstream.FlatMap(merged, func(ctx context.Context, t Tweet) []Sentiment {
	// calculate sentiment score of t.Text and enrich tweet (placeholder score)
	enriched := []Sentiment{{ID: t.ID, Text: t.Text, Score: 0.5}}
	return enriched
})

// print sentiment
sentiment.Foreach(func(_ context.Context, s Sentiment) {
	fmt.Println(s)
})
// get chan of sentiment
outputCh := sentiment.To()

Finally, start the stream to process tweets.

builder.BuildAndStart(ctx)

The stream terminates when all input channels are closed or the context is cancelled. BuildAndStart blocks until the stream ends.
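The two termination conditions can be sketched with the standard library alone. This is not the library's implementation; `drain`, `runUntilClosed`, and `runCancelled` are hypothetical helpers that show a consumer that blocks until its input is closed or its context is cancelled.

```go
package main

import (
	"context"
	"fmt"
)

// drain is a hypothetical stand-in for a running pipeline: it consumes
// records until the input channel is closed or the context is cancelled,
// and blocks the caller until then.
func drain(ctx context.Context, in <-chan int) (processed int, err error) {
	for {
		select {
		case <-ctx.Done():
			return processed, ctx.Err()
		case _, ok := <-in:
			if !ok {
				return processed, nil // input closed: normal termination
			}
			processed++
		}
	}
}

// runUntilClosed feeds n records, closes the input, and reports what drain saw.
func runUntilClosed(n int) (int, error) {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 0; i < n; i++ {
			in <- i
		}
	}()
	return drain(context.Background(), in)
}

// runCancelled starts drain with an already cancelled context and an idle input.
func runCancelled() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return drain(ctx, make(chan int))
}

func main() {
	fmt.Println(runUntilClosed(3))
	fmt.Println(runCancelled())
}
```

In practice this means a producer should close its channels when it is done, or the caller should pass a cancellable context, otherwise the blocking call never returns.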

Stateful Example

Suppose we are processing a video game leaderboard.

  1. ScoreEvent should be enriched with the player.
  2. Score-with-player records are keyed by playerID. They should be regrouped by productID to join with the product.
  3. Score-with-player records should be enriched with the product.
  4. Calculate the top three high scores for each game by aggregating the enriched records.

The stream graph is as follows:

stateDiagram-v2
    Scores(Stream) --> WithPlayers : join
    Players(Table) --> WithPlayers
    WithPlayers --> groupByProduct : groupBy
    groupByProduct --> WithProducts : join
    products(Table) --> WithProducts
    WithProducts --> HighScores : aggregate
    HighScores --> Printed : ToValueStream, Foreach

Define the Player, Product, and ScoreEvent structs, and implement HighScores, which keeps the top three high scores.

type Player struct {
	ID   int
	Name string
}

type Product struct {
	ID   int
	Name string
}

type ScoreEvent struct {
	PlayerID  int
	ProductID int
	Score     float64
}

type ScoreWithPlayer struct {
	Player Player
	Score  ScoreEvent
}

type Enriched struct {
	PlayerID    int
	PlayerName  string
	ProductID   int
	ProductName string
	Score       float64
}

type HighScores struct {
	highScores []Enriched
}

func (s *HighScores) Add(e Enriched) {
	added := append(s.highScores, e)
	sort.SliceStable(added, func(i, j int) bool {
		return added[i].Score > added[j].Score
	})
	if len(added) > 3 {
		added = added[:3]
	}
	s.highScores = added
}
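To see how `Add` behaves on its own, here is a small standalone check. The types are repeated so the snippet compiles by itself; `scoresAfter` is a hypothetical helper that is not part of the README.

```go
package main

import (
	"fmt"
	"sort"
)

type Enriched struct {
	PlayerID    int
	PlayerName  string
	ProductID   int
	ProductName string
	Score       float64
}

type HighScores struct {
	highScores []Enriched
}

// Add inserts e, sorts by score in descending order, and keeps the top three.
func (s *HighScores) Add(e Enriched) {
	added := append(s.highScores, e)
	sort.SliceStable(added, func(i, j int) bool {
		return added[i].Score > added[j].Score
	})
	if len(added) > 3 {
		added = added[:3]
	}
	s.highScores = added
}

// scoresAfter adds the given scores in order and returns the scores kept.
func scoresAfter(scores ...float64) []float64 {
	hs := &HighScores{}
	for i, sc := range scores {
		hs.Add(Enriched{PlayerID: i + 1, Score: sc})
	}
	kept := make([]float64, 0, len(hs.highScores))
	for _, e := range hs.highScores {
		kept = append(kept, e.Score)
	}
	return kept
}

func main() {
	fmt.Println(scoresAfter(0.6, 0.5, 0.7, 0.8)) // prints [0.8 0.7 0.6]
}
```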

Build the stream according to the graph above.

b := NewBuilder()

players := Table[int, Player](b).From(
	playerCh,
	func(p Player) int { return p.ID }, // keySelector
	state.NewOptions[int, Player](), // using default state options(in-memory store)
)
products := Table[int, Product](b).From(
	productCh,
	func(p Product) int { return p.ID },
	state.NewOptions[int, Product](),
)
scores := SelectKey[int, ScoreEvent](
	Stream[ScoreEvent](b).From(scoreCh),
	func(s ScoreEvent) int { return s.PlayerID },
)
	
// ScoreEvent join with player by playerID
withPlayers := JoinStreamTable(
	scores, // left stream
	players, // right table
	func(id int, score ScoreEvent, player Player) ScoreWithPlayer { // joiner
		return ScoreWithPlayer{
			Player: player,
			Score:  score,
		}
	},
)

// Group by productID
groupByProduct := GroupBy(
	withPlayers,
	func(playerID int, withPlayer ScoreWithPlayer) int { // key mapper
		return withPlayer.Score.ProductID
	},
)

// withPlayer join with product by productID
withProducts := JoinStreamTable(
	groupByProduct,
	products,
	func(playerID int, withPlayer ScoreWithPlayer, product Product) Enriched {
		return Enriched{
			PlayerID:    withPlayer.Player.ID,
			PlayerName:  withPlayer.Player.Name,
			ProductID:   product.ID,
			ProductName: product.Name,
			Score:       withPlayer.Score.Score,
		}
	},
)

// Perform the aggregation
Aggregate[int, Enriched, *HighScores](
	withProducts,
	func() *HighScores { // initializer
		return &HighScores{highScores: []Enriched{}}
	},
	func(kv KeyValue[int, Enriched], aggregate *HighScores) *HighScores { // aggregator
		aggregate.Add(kv.Value)
		return aggregate
	},
	state.NewOptions[int, *HighScores](), // state options
).
	ToValueStream().
	Foreach(func(_ context.Context, hs *HighScores) {
		fmt.Println(hs.highScores)
	})

State Options

Operations that create a GTable receive state.Options as a parameter.

func (tb *tableBuilder[K, V]) From(pipe <-chan V, selectKey func(V) K, sopt state.Options[K, V], opts ...source.Option) GTable[K, V]

type KeyValueGStream[K, V any] interface {
	// ...
    ToTable(state.Options[K, V]) GTable[K, V]
	// ...
}

func Aggregate[K, V, VR any](kvs KeyValueGStream[K, V], initializer func() VR, aggregator func(KeyValue[K, V], VR) VR, sopt state.Options[K, VR]) GTable[K, VR]

func Count[K, V any](kvs KeyValueGStream[K, V], sopt state.Options[K, int]) GTable[K, int]

The state options are as follows:

// constructor using option pattern.
func NewOptions[K, V any](opts ...Option[K, V]) Options[K, V]

// set custom key serializer/deserializer.
// default is json serde.
func WithKeySerde[K, V any](keySerde Serde[K]) Option[K, V]
// set custom value serializer/deserializer.
// default is json serde.
func WithValueSerde[K, V any](valueSerde Serde[V]) Option[K, V]
// use in-memory store.
// default store type.
func WithInMemory[K, V any]() Option[K, V]
// use boltDB persistent store.
func WithBoltDB[K, V any](name string) Option[K, V]
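
The signatures above follow the functional options pattern. The sketch below shows the general shape of that pattern with generic types. All names and fields here (`options`, `option`, `newOptions`, `withInMemory`, `withBoltDB`, `storeType`, `name`) are illustrative assumptions, not the library's internals.

```go
package main

import "fmt"

// options holds store settings; the fields are assumptions for illustration.
type options[K, V any] struct {
	storeType string // "memory" or "boltdb"
	name      string // database name for persistent stores
}

// option mutates options; the constructor applies options in order.
type option[K, V any] func(*options[K, V])

func withInMemory[K, V any]() option[K, V] {
	return func(o *options[K, V]) { o.storeType = "memory" }
}

func withBoltDB[K, V any](name string) option[K, V] {
	return func(o *options[K, V]) {
		o.storeType = "boltdb"
		o.name = name
	}
}

// newOptions starts from the default (in-memory) and applies each option.
func newOptions[K, V any](opts ...option[K, V]) options[K, V] {
	o := options[K, V]{storeType: "memory"}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	def := newOptions[int, string]()
	bolt := newOptions(withBoltDB[int, string]("players"))
	fmt.Println(def.storeType, bolt.storeType, bolt.name)
}
```

Because every option carries the key and value type parameters, a call with no options has to spell them out, which matches calls such as `state.NewOptions[int, Player]()` in the examples.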

Backpressure

By default, a pipeline with a single source stream processes records with a single goroutine. To handle backpressure, we can specify the number of concurrent workers of a pipeline.

gstream.Stream[string](b).From(ch, source.WithWorkerPool(3))
gstream.Table[int, string](b).From(ch, keySelector, stateOpt, source.WithWorkerPool(3))

stream.Pipe(pipe.WithWorkerPool(3))
stream.Merge(otherStream, pipe.WithWorkerPool(3))
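
What a worker pool buys can be shown with a standard-library sketch. It assumes nothing about how the library implements `WithWorkerPool`; `runPool` and `sumWithPool` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool is a hypothetical stand-in for a pipeline with a worker pool:
// several goroutines read from the same input channel, so one slow
// record does not stall the producer on its own.
func runPool[T any](in <-chan T, workers int, handle func(T)) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				handle(v)
			}
		}()
	}
	wg.Wait() // returns once the input is closed and drained
}

// sumWithPool feeds 1..n through a pool and returns the sum of handled values.
func sumWithPool(n, workers int) int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= n; i++ {
			in <- i
		}
	}()
	var mu sync.Mutex
	sum := 0
	runPool[int](in, workers, func(v int) {
		mu.Lock()
		defer mu.Unlock()
		sum += v
	})
	return sum
}

func main() {
	fmt.Println(sumWithPool(100, 3)) // prints 5050
}
```

With more than one worker, records are no longer handled in arrival order, so handlers that share state need their own synchronization, as the mutex above shows.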

Since the capacity of a pipe channel is 0 by default, a send blocks when the consumer is not ready to receive. We can configure the capacity of the pipe or sink channel.

Additionally, we can set a timeout on the sink output channel so that records are dropped instead of blocking indefinitely.

stream.Pipe(
	pipe.WithWorkerPool(3),
	pipe.WithBufferedChan(100),
)

stream.To(
	sink.WithBufferedChan(100),
	sink.WithTimeout(time.Second * 2),
)
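
The effect of a buffer plus a send timeout can be sketched with `select` and a timer. This is an illustration of the idea only, not the library's sink; `sendOrDrop` and `deliver` are hypothetical names, and the 10 ms timeout is an arbitrary value.

```go
package main

import (
	"fmt"
	"time"
)

// sendOrDrop tries to deliver v and reports false if no consumer or
// buffer slot became available before the timeout, in which case the
// record is dropped.
func sendOrDrop[T any](out chan<- T, v T, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case out <- v:
		return true
	case <-timer.C:
		return false
	}
}

// deliver sends n records to a channel with the given capacity that
// nobody reads from, and counts how many were accepted.
func deliver(n, capacity int) int {
	out := make(chan int, capacity)
	accepted := 0
	for i := 0; i < n; i++ {
		if sendOrDrop[int](out, i, 10*time.Millisecond) {
			accepted++
		}
	}
	return accepted
}

func main() {
	fmt.Println(deliver(3, 0)) // unbuffered, no consumer: every record is dropped
	fmt.Println(deliver(3, 2)) // two records fit in the buffer, the third is dropped
}
```

A buffer absorbs short bursts, while the timeout bounds how long a slow consumer can hold up the pipeline at the cost of losing records.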

Documentation

Overview

Package gstream is a stream processing library that abstracts the pipeline pattern using generics.

Example (Stateful)

ExampleStateful demonstrates stateful processing using table, join, and aggregate.

package main

import (
	"context"
	"fmt"
	"github.com/KumKeeHyun/gstream/state"
	"sort"
)

type Player struct {
	ID   int
	Name string
}

type Product struct {
	ID   int
	Name string
}

type ScoreEvent struct {
	PlayerID  int
	ProductID int
	Score     float64
}

type ScoreWithPlayer struct {
	Player Player
	Score  ScoreEvent
}

type Enriched struct {
	PlayerID    int
	PlayerName  string
	ProductID   int
	ProductName string
	Score       float64
}

type HighScores struct {
	highScores []Enriched
}

func (s *HighScores) Add(e Enriched) {
	added := append(s.highScores, e)
	sort.SliceStable(added, func(i, j int) bool {
		return added[i].Score > added[j].Score
	})
	if len(added) > 3 {
		added = added[:3]
	}
	s.highScores = added
}

// ExampleStateful demonstrates a stateful processing using table, join, aggregate.
func main() {
	playerCh := make(chan Player)
	productCh := make(chan Product)
	scoreCh := make(chan ScoreEvent)

	// Create producer emitting players, products, scores
	go func() {
		defer func() {
			close(playerCh)
			close(productCh)
			close(scoreCh)
		}()

		for i := 1; i <= 3; i++ {
			playerCh <- Player{
				ID:   i,
				Name: fmt.Sprintf("player-%d", i),
			}
		}
		for i := 1; i <= 2; i++ {
			productCh <- Product{
				ID:   i,
				Name: fmt.Sprintf("product-%d", i),
			}
		}

		scoreCh <- ScoreEvent{
			PlayerID:  1,
			ProductID: 1,
			Score:     0.6,
		}
		scoreCh <- ScoreEvent{
			PlayerID:  2,
			ProductID: 1,
			Score:     0.5,
		}
		scoreCh <- ScoreEvent{
			PlayerID:  3,
			ProductID: 1,
			Score:     0.7,
		}
		scoreCh <- ScoreEvent{
			PlayerID:  2,
			ProductID: 2,
			Score:     0.8,
		}
		scoreCh <- ScoreEvent{
			PlayerID:  1,
			ProductID: 1,
			Score:     0.8,
		}
		scoreCh <- ScoreEvent{
			PlayerID:  3,
			ProductID: 2,
			Score:     0.4,
		}
		scoreCh <- ScoreEvent{
			PlayerID:  2,
			ProductID: 1,
			Score:     0.9,
		}
	}()

	b := NewBuilder()

	players := Table[int, Player](b).From(
		playerCh,
		func(p Player) int { return p.ID },
		state.NewOptions[int, Player](),
	)
	products := Table[int, Product](b).From(
		productCh,
		func(p Product) int { return p.ID },
		state.NewOptions[int, Product](),
	)
	scores := SelectKey[int, ScoreEvent](
		Stream[ScoreEvent](b).From(scoreCh),
		func(s ScoreEvent) int { return s.PlayerID },
	)

	// ScoreEvent join with player by playerID
	withPlayers := JoinStreamTable(
		scores,
		players,
		func(id int, score ScoreEvent, player Player) ScoreWithPlayer {
			return ScoreWithPlayer{
				Player: player,
				Score:  score,
			}
		},
	)
	// Group by productID
	groupByProduct := GroupBy(
		withPlayers,
		func(playerID int, withPlayer ScoreWithPlayer) int {
			return withPlayer.Score.ProductID
		},
	)
	// withPlayer join with product by productID
	withProducts := JoinStreamTable(
		groupByProduct,
		products,
		func(playerID int, withPlayer ScoreWithPlayer, product Product) Enriched {
			return Enriched{
				PlayerID:    withPlayer.Player.ID,
				PlayerName:  withPlayer.Player.Name,
				ProductID:   product.ID,
				ProductName: product.Name,
				Score:       withPlayer.Score.Score,
			}
		},
	)

	// Perform the aggregation
	Aggregate[int, Enriched, *HighScores](
		withProducts,
		func() *HighScores {
			return &HighScores{highScores: []Enriched{}}
		},
		func(kv KeyValue[int, Enriched], aggregate *HighScores) *HighScores {
			aggregate.Add(kv.Value)
			return aggregate
		},
		state.NewOptions[int, *HighScores](),
	).
		ToValueStream().
		Foreach(func(_ context.Context, hs *HighScores) {
			fmt.Println(hs.highScores)
		})

	b.BuildAndStart(context.Background())

}

Output:

[{1 player-1 1 product-1 0.6}]
[{1 player-1 1 product-1 0.6} {2 player-2 1 product-1 0.5}]
[{3 player-3 1 product-1 0.7} {1 player-1 1 product-1 0.6} {2 player-2 1 product-1 0.5}]
[{2 player-2 2 product-2 0.8}]
[{1 player-1 1 product-1 0.8} {3 player-3 1 product-1 0.7} {1 player-1 1 product-1 0.6}]
[{2 player-2 2 product-2 0.8} {3 player-3 2 product-2 0.4}]
[{2 player-2 1 product-1 0.9} {1 player-1 1 product-1 0.8} {3 player-3 1 product-1 0.7}]

Example (Stateless)

This example demonstrates stateless processing using stream, filter, map, and merge.

package main

import (
	"context"
	"fmt"
)

type Tweet struct {
	ID   int
	Lang string
	Text string
}

type Sentiment struct {
	ID    int
	Text  string
	Score float64
}

// This example demonstrates stateless processing using stream, filter, map, and merge.
func main() {
	tweetCh := make(chan Tweet)

	// Create producer emitting tweets
	go func() {
		defer close(tweetCh)
		for i := 0; i < 3; i++ {
			tweetCh <- Tweet{
				ID:   i,
				Lang: "en",
				Text: fmt.Sprintf("some text %d", i),
			}
			tweetCh <- Tweet{
				ID:   i + 10,
				Lang: "kr",
				Text: fmt.Sprintf("썸 텍스트 %d", i),
			}
		}
	}()

	b := NewBuilder()
	tweets := Stream[Tweet](b).From(tweetCh)

	// Branch into english
	english := tweets.Filter(func(t Tweet) bool {
		return t.Lang == "en"
	})

	// Branch into korean and translate
	translate := tweets.Filter(func(t Tweet) bool {
		return t.Lang == "kr"
	}).Map(func(ctx context.Context, t Tweet) Tweet {
		// Translate t.Text to English
		return Tweet{
			ID:   t.ID,
			Lang: "en",
			Text: fmt.Sprintf("translated text %d", t.ID),
		}
	})

	// Merge english and translate branch
	merged := english.Merge(translate)

	// Enrich tweet
	sentiment := FlatMap(merged, func(ctx context.Context, t Tweet) []Sentiment {
		// Calculate sentiment score of t.Text and enrich tweet
		return []Sentiment{
			{
				ID:    t.ID,
				Text:  t.Text,
				Score: 0.5,
			},
		}
	})

	// Print sentiment
	sentiment.Foreach(func(_ context.Context, s Sentiment) {
		fmt.Println(s)
	})

	b.BuildAndStart(context.Background())

}

Output:

{0 some text 0 0.5}
{10 translated text 10 0.5}
{1 some text 1 0.5}
{11 translated text 11 0.5}
{2 some text 2 0.5}
{12 translated text 12 0.5}

Functions

func FlatMapErr

func FlatMapErr[T, TR any](s GStream[T], flatMapper func(context.Context, T) ([]TR, error)) (ss GStream[TR], fs FailedGStream[T])

FlatMapErr transforms each record into zero or more records, with side effects.

func JoinStreamTableErr

func JoinStreamTableErr[K, V, VO, VR any](s KeyValueGStream[K, V], t GTable[K, VO], joiner func(K, V, VO) (VR, error)) (rs KeyValueGStream[K, VR], fs FailedKeyValueGStream[K, V])

JoinStreamTableErr joins records of the stream with the table's records using an inner join, with side effects.

func KVFlatMapErr

func KVFlatMapErr[K, V, KR, VR any](kvs KeyValueGStream[K, V], flatMapper func(context.Context, KeyValue[K, V]) ([]KeyValue[KR, VR], error)) (KeyValueGStream[KR, VR], FailedKeyValueGStream[K, V])

KVFlatMapErr transforms each record into zero or more records, with side effects. It is the KeyValue version of FlatMapErr.

func KVFlatMapValuesErr

func KVFlatMapValuesErr[K, V, VR any](kvs KeyValueGStream[K, V], flatMapper func(context.Context, V) ([]VR, error)) (KeyValueGStream[K, VR], FailedKeyValueGStream[K, V])

KVFlatMapValuesErr transforms the value of each record into zero or more values, with side effects.

func KVMapErr

func KVMapErr[K, V, KR, VR any](kvs KeyValueGStream[K, V], mapper func(context.Context, KeyValue[K, V]) (KeyValue[KR, VR], error)) (KeyValueGStream[KR, VR], FailedKeyValueGStream[K, V])

KVMapErr transforms each record into a new record, with side effects. It is the KeyValue version of MapErr.

func KVMapValuesErr

func KVMapValuesErr[K, V, VR any](kvs KeyValueGStream[K, V], mapper func(context.Context, V) (VR, error)) (KeyValueGStream[K, VR], FailedKeyValueGStream[K, V])

KVMapValuesErr transforms the value of each record into a new value, with side effects.

func MapErr

func MapErr[T, TR any](s GStream[T], mapper func(context.Context, T) (TR, error)) (ss GStream[TR], fs FailedGStream[T])

MapErr transforms each record into a new record, with side effects.

The first return value is a stream of successfully mapped values. The second return value is a stream of the records that failed to be mapped, together with their errors.

func NewBuilder

func NewBuilder() *builder

NewBuilder creates a new builder. The builder provides an entry point for the DSL.

b := gstream.NewBuilder()
gstream.Stream[int](b)
gstream.Table[int, string](b)

func Stream

func Stream[T any](b *builder) *streamBuilder[T]

Stream is a generics facilitator for creating a source stream.

func Table

func Table[K, V any](b *builder) *tableBuilder[K, V]

Table is a generics facilitator for creating a source table.

Types

type Change

type Change[T any] struct {
	OldValue, NewValue T
}

func NewChange

func NewChange[T any](ov, nv T) Change[T]

type Closer

type Closer interface {
	Close() error
}

type Fail

type Fail[T any] struct {
	Arg T
	Err error
}

func NewFail

func NewFail[T any](v T, err error) Fail[T]

type FailedGStream

type FailedGStream[T any] interface {
	Filter(func(T, error) bool) FailedGStream[T]
	Foreach(func(context.Context, T, error))
	ToStream() GStream[T]
}

type FailedKeyValueGStream

type FailedKeyValueGStream[K, V any] interface {
	Filter(func(KeyValue[K, V], error) bool) FailedKeyValueGStream[K, V]
	Foreach(func(context.Context, KeyValue[K, V], error))
	ToStream() KeyValueGStream[K, V]
}

type GStream

type GStream[T any] interface {
	Filter(func(T) bool) GStream[T]
	Foreach(func(context.Context, T))
	Map(func(context.Context, T) T) GStream[T]
	MapErr(func(context.Context, T) (T, error)) (GStream[T], FailedGStream[T])
	FlatMap(func(context.Context, T) []T) GStream[T]
	FlatMapErr(func(context.Context, T) ([]T, error)) (GStream[T], FailedGStream[T])
	// Merge merges two streams into one.
	// If the two streams are in different pipelines, a new pipeline is created.
	Merge(GStream[T], ...pipe.Option) GStream[T]
	// Pipe creates a new pipeline.
	// Downstream nodes are processed in the new pipeline.
	Pipe(...pipe.Option) GStream[T]
	// To emits records to returned sink channel.
	To(...sink.Option) <-chan T
}

GStream is the value stream interface of the DSL. It can be converted to a KeyValueGStream with the SelectKey function.

func FlatMap

func FlatMap[T, TR any](s GStream[T], flatMapper func(context.Context, T) []TR) GStream[TR]

FlatMap transforms each record into zero or more records.
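
As an illustration of "zero or more records", here is a slice-based analogue. `flatMap` and `words` are hypothetical names, and the real operator works on streams and takes a context.

```go
package main

import (
	"fmt"
	"strings"
)

// flatMap is a hypothetical slice-based analogue of a flat-map operator:
// each input may yield zero, one, or many outputs.
func flatMap[T, TR any](in []T, f func(T) []TR) []TR {
	var out []TR
	for _, v := range in {
		out = append(out, f(v)...)
	}
	return out
}

// words splits a line into words; an empty line yields no records.
func words(line string) []string {
	return strings.Fields(line)
}

func main() {
	fmt.Println(flatMap([]string{"a b", "", "c"}, words)) // prints [a b c]
}
```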

func Map

func Map[T, TR any](s GStream[T], mapper func(context.Context, T) TR) GStream[TR]

Map transforms each record into a new record. The provided mapper must not have any side effects.

If you need a mapper with side effects, see MapErr.

type GTable

type GTable[K, V any] interface {
	// ToValueStream converts this table to a GStream.
	// GTable[K, V] -> GStream[V]
	ToValueStream() GStream[V]
	// ToStream converts this table to a KeyValueGStream.
	// GTable[K, V] -> KeyValueGStream[K, V]
	ToStream() KeyValueGStream[K, V]
}

GTable is the table interface of the DSL.

func Aggregate

func Aggregate[K, V, VR any](kvs KeyValueGStream[K, V], initializer func() VR, aggregator func(KeyValue[K, V], VR) VR, stateOpt state.Options[K, VR]) GTable[K, VR]

Aggregate aggregates records by key.

func Count

func Count[K, V any](kvs KeyValueGStream[K, V], stateOpt state.Options[K, int]) GTable[K, int]

Count counts the number of records by key.
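
The relationship between the two operations can be sketched in memory: counting is an aggregation that starts at zero and adds one per record. In this sketch a plain map stands in for the state store, which is why the key type is constrained to `comparable`; that constraint and the names `aggregate` and `count` are assumptions of the sketch, not the library's signatures.

```go
package main

import "fmt"

type KeyValue[K, V any] struct {
	Key   K
	Value V
}

// aggregate is a hypothetical in-memory analogue of a keyed aggregation:
// a map plays the role of the state store.
func aggregate[K comparable, V, VR any](
	records []KeyValue[K, V],
	initializer func() VR,
	aggregator func(KeyValue[K, V], VR) VR,
) map[K]VR {
	state := make(map[K]VR)
	for _, kv := range records {
		cur, ok := state[kv.Key]
		if !ok {
			cur = initializer() // first record for this key
		}
		state[kv.Key] = aggregator(kv, cur)
	}
	return state
}

// count expresses counting as an aggregation that adds one per record.
func count[K comparable, V any](records []KeyValue[K, V]) map[K]int {
	return aggregate(records,
		func() int { return 0 },
		func(_ KeyValue[K, V], n int) int { return n + 1 },
	)
}

func main() {
	records := []KeyValue[string, float64]{{"a", 0.6}, {"b", 0.8}, {"a", 0.7}}
	fmt.Println(count(records)) // prints map[a:2 b:1]
}
```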

type KeyValue

type KeyValue[K, V any] struct {
	Key   K
	Value V
}

func NewKeyValue

func NewKeyValue[K, V any](k K, v V) KeyValue[K, V]

type KeyValueGStream

type KeyValueGStream[K, V any] interface {
	Filter(func(KeyValue[K, V]) bool) KeyValueGStream[K, V]
	Foreach(func(context.Context, KeyValue[K, V]))
	Map(func(context.Context, KeyValue[K, V]) KeyValue[K, V]) KeyValueGStream[K, V]
	MapErr(func(context.Context, KeyValue[K, V]) (KeyValue[K, V], error)) (KeyValueGStream[K, V], FailedKeyValueGStream[K, V])
	MapValues(func(context.Context, V) V) KeyValueGStream[K, V]
	MapValuesErr(func(context.Context, V) (V, error)) (KeyValueGStream[K, V], FailedKeyValueGStream[K, V])
	FlatMap(func(context.Context, KeyValue[K, V]) []KeyValue[K, V]) KeyValueGStream[K, V]
	FlatMapErr(func(context.Context, KeyValue[K, V]) ([]KeyValue[K, V], error)) (KeyValueGStream[K, V], FailedKeyValueGStream[K, V])
	FlatMapValues(func(context.Context, V) []V) KeyValueGStream[K, V]
	FlatMapValuesErr(func(context.Context, V) ([]V, error)) (KeyValueGStream[K, V], FailedKeyValueGStream[K, V])
	// Merge merges two streams into one.
	// If the two streams are in different pipelines, a new pipeline is created.
	Merge(KeyValueGStream[K, V], ...pipe.Option) KeyValueGStream[K, V]
	// Pipe creates a new pipeline.
	// Downstream nodes are processed in the new pipeline.
	Pipe(...pipe.Option) KeyValueGStream[K, V]
	// To emits records to returned sink channel.
	To(...sink.Option) <-chan KeyValue[K, V]

	// ToValueStream converts this stream to a value stream.
	// KeyValueGStream[K, V] -> GStream[V]
	ToValueStream() GStream[V]
	// ToTable converts this stream to a table.
	ToTable(state.Options[K, V]) GTable[K, V]
}

KeyValueGStream is the key-value stream interface of the DSL. It can be converted to a GStream with the ToValueStream method or to a GTable with the ToTable method.

func GroupBy

func GroupBy[K, V, KR any](kvs KeyValueGStream[K, V], keyMapper func(K, V) KR) KeyValueGStream[KR, V]

GroupBy groups records by a new key.
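
Judging by the signature, grouping here means re-keying: the value is kept and the key is replaced by the result of the key mapper. A slice-based sketch of that reading follows; `groupBy`, `Score`, and `byProduct` are hypothetical names.

```go
package main

import "fmt"

type KeyValue[K, V any] struct {
	Key   K
	Value V
}

// groupBy is a hypothetical slice-based analogue: it keeps each value
// and replaces the key with the one computed by keyMapper.
func groupBy[K, V, KR any](records []KeyValue[K, V], keyMapper func(K, V) KR) []KeyValue[KR, V] {
	out := make([]KeyValue[KR, V], 0, len(records))
	for _, kv := range records {
		out = append(out, KeyValue[KR, V]{Key: keyMapper(kv.Key, kv.Value), Value: kv.Value})
	}
	return out
}

type Score struct {
	PlayerID  int
	ProductID int
}

// byProduct ignores the old key (player ID) and keys by product ID.
func byProduct(_ int, s Score) int {
	return s.ProductID
}

func main() {
	byPlayer := []KeyValue[int, Score]{{1, Score{1, 10}}, {2, Score{2, 10}}}
	fmt.Println(groupBy(byPlayer, byProduct))
}
```

Re-keying is what makes a later join or aggregation on the new key possible, as in the leaderboard example where records keyed by player are regrouped by product.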

func JoinStreamTable

func JoinStreamTable[K, V, VO, VR any](s KeyValueGStream[K, V], t GTable[K, VO], joiner func(K, V, VO) VR) KeyValueGStream[K, VR]

JoinStreamTable joins records of the stream with the table's records using an inner join.
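
Inner-join semantics can be sketched with a map standing in for the table: a stream record produces output only if its key has a table entry. This is an in-memory illustration under that assumption, with hypothetical names `joinStreamTable` and `label`. In a running stream the table changes over time, which a static map does not capture.

```go
package main

import "fmt"

type KeyValue[K, V any] struct {
	Key   K
	Value V
}

// joinStreamTable is a hypothetical in-memory analogue of a stream-table
// inner join: stream records whose key has no table entry are dropped.
func joinStreamTable[K comparable, V, VO, VR any](
	stream []KeyValue[K, V],
	table map[K]VO,
	joiner func(K, V, VO) VR,
) []KeyValue[K, VR] {
	var out []KeyValue[K, VR]
	for _, kv := range stream {
		other, ok := table[kv.Key]
		if !ok {
			continue // inner join: no match, no output
		}
		out = append(out, KeyValue[K, VR]{Key: kv.Key, Value: joiner(kv.Key, kv.Value, other)})
	}
	return out
}

// label is an example joiner combining a score with a player name.
func label(_ int, score float64, name string) string {
	return fmt.Sprintf("%s:%.1f", name, score)
}

func main() {
	players := map[int]string{1: "player-1", 2: "player-2"}
	scores := []KeyValue[int, float64]{{1, 0.6}, {3, 0.9}, {2, 0.5}}
	fmt.Println(joinStreamTable(scores, players, label)) // prints [{1 player-1:0.6} {2 player-2:0.5}]
}
```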

func KVFlatMap

func KVFlatMap[K, V, KR, VR any](kvs KeyValueGStream[K, V], flatMapper func(context.Context, KeyValue[K, V]) []KeyValue[KR, VR]) KeyValueGStream[KR, VR]

KVFlatMap transforms each record into zero or more records. It is the KeyValue version of FlatMap.

func KVFlatMapValues

func KVFlatMapValues[K, V, VR any](kvs KeyValueGStream[K, V], flatMapper func(context.Context, V) []VR) KeyValueGStream[K, VR]

KVFlatMapValues transforms the value of each record into zero or more values.

func KVMap

func KVMap[K, V, KR, VR any](kvs KeyValueGStream[K, V], mapper func(context.Context, KeyValue[K, V]) KeyValue[KR, VR]) KeyValueGStream[KR, VR]

KVMap transforms each record into a new record. It is the KeyValue version of Map.

func KVMapValues

func KVMapValues[K, V, VR any](kvs KeyValueGStream[K, V], mapper func(context.Context, V) VR) KeyValueGStream[K, VR]

KVMapValues transforms the value of each record into a new value.

func SelectKey

func SelectKey[K, V any](s GStream[V], keySelecter func(V) K) KeyValueGStream[K, V]

SelectKey sets a new key for each record in a GStream.

type Processor

type Processor[T any] func(ctx context.Context, v T)

type ProcessorSupplier

type ProcessorSupplier[T, TR any] interface {
	Processor(forwards ...Processor[TR]) Processor[T]
}
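
The two types above carry no description, so the following is only one plausible reading: a supplier builds a processor for its node and is given the processors of the downstream nodes to forward results to. `Processor` and `ProcessorSupplier` are copied from the listing; `mapSupplier` and `doubleAll` are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

type Processor[T any] func(ctx context.Context, v T)

type ProcessorSupplier[T, TR any] interface {
	Processor(forwards ...Processor[TR]) Processor[T]
}

// mapSupplier is a hypothetical supplier: its processor applies mapper
// and hands the result to every downstream processor.
type mapSupplier[T, TR any] struct {
	mapper func(T) TR
}

func (m mapSupplier[T, TR]) Processor(forwards ...Processor[TR]) Processor[T] {
	return func(ctx context.Context, v T) {
		r := m.mapper(v)
		for _, forward := range forwards {
			forward(ctx, r)
		}
	}
}

// doubleAll wires a doubling map node to a collecting sink and runs it.
func doubleAll(in []int) []int {
	var out []int
	sink := Processor[int](func(_ context.Context, v int) { out = append(out, v) })
	var supplier ProcessorSupplier[int, int] = mapSupplier[int, int]{
		mapper: func(v int) int { return v * 2 },
	}
	process := supplier.Processor(sink)
	for _, v := range in {
		process(context.Background(), v)
	}
	return out
}

func main() {
	fmt.Println(doubleAll([]int{1, 2, 3})) // prints [2 4 6]
}
```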

Directories

Path Synopsis
examples
options