streamconsumergroup

package
v0.3.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 19, 2025 License: Apache-2.0 Imports: 16 Imported by: 4

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrShardNotFound = errors.New("Shard not found")
View Source
var ErrShardSequenceNumberAttributeNotFound = errors.New("Shard sequenceNumber attribute")

Functions

This section is empty.

Types

type Claim

type Claim interface {
	GetStreamPath() string
	GetShardID() int
	GetCurrentLocation() string
	GetRecordBatchChan() <-chan *RecordBatch
	// contains filtered or unexported methods
}

type Config

type Config struct {
	Session struct {
		Timeout           time.Duration `json:"timeout,omitempty"`
		HeartbeatInterval time.Duration
	} `json:"session,omitempty"`
	State struct {
		ModifyRetry struct {
			Attempts int            `json:"attempts,omitempty"`
			Backoff  common.Backoff `json:"backoff,omitempty"`
		} `json:"modifyRetry,omitempty"`
	} `json:"state,omitempty"`
	SequenceNumber struct {
		CommitInterval    time.Duration `json:"commitInterval,omitempty"`
		ShardWaitInterval time.Duration `json:"shardWaitInterval,omitempty"`
	}
	Claim struct {
		RecordBatchChanSize int `json:"recordBatchChanSize,omitempty"`
		RecordBatchFetch    struct {
			Interval          time.Duration           `json:"interval,omitempty"`
			NumRecordsInBatch int                     `json:"numRecordsInBatch,omitempty"`
			InitialLocation   v3io.SeekShardInputType `json:"initialLocation,omitempty"`
		} `json:"recordBatchFetch,omitempty"`
		GetShardLocationRetry struct {
			Attempts int            `json:"attempts,omitempty"`
			Backoff  common.Backoff `json:"backoff,omitempty"`
		} `json:"getShardLocationRetry,omitempty"`
	} `json:"claim,omitempty"`
}

func NewConfig

func NewConfig() *Config

NewConfig returns a new configuration instance with sane defaults.

type Handler

type Handler interface {

	// Setup is run at the beginning of a new session, before ConsumeClaim.
	Setup(Session) error

	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the locations are committed for the very last time.
	Cleanup(Session) error

	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(Session, Claim) error

	// Abort signals the handler to start abort procedure
	Abort(Session) error
}

type Member

type Member interface {
	Consume(Handler) error
	Close() error
	Start() error
	GetID() string
	GetRetainShardFlag() bool
	GetShardsToRetain() []int
}

func NewMember

func NewMember(streamConsumerGroupInterface StreamConsumerGroup, name string) (Member, error)

type RecordBatch

type RecordBatch struct {
	Records      []v3io.StreamRecord
	Location     string
	NextLocation string
	ShardID      int
}

type Session

type Session interface {
	GetClaims() []Claim
	GetMemberID() string
	MarkRecord(*v3io.StreamRecord) error
	// contains filtered or unexported methods
}

type SessionState

type SessionState struct {
	MemberID      string    `json:"member_id"`
	LastHeartbeat time.Time `json:"last_heartbeat_time"`
	Shards        []int     `json:"shards"`
}

type State

type State struct {
	SchemasVersion string          `json:"schema_version"`
	SessionStates  []*SessionState `json:"session_states"`
}

func (*State) String

func (s *State) String() string

type StreamConsumerGroup

type StreamConsumerGroup interface {
	GetState() (*State, error)
	GetShardSequenceNumber(int) (uint64, error)
	GetNumShards() (int, error)
}

func NewStreamConsumerGroup

func NewStreamConsumerGroup(parentLogger logger.Logger,
	name string,
	config *Config,
	container v3io.Container,
	streamPath string,
	maxReplicas int) (StreamConsumerGroup, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL