observer

package
v0.11.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 4, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

README

Observer: observing blocks and events for a particular channel or peer

Main features:

  • Stream of channel blocks from peer
  • Stream of all channels blocks from peer
  • Automatic reconnection when a block or event stream is interrupted

Every feature can be used with common blocks as well as with parsed blocks derived from them.

Documentation

Index

Constants

View Source
const DefaultChannelsBLocksPeerRefreshPeriod = 10 * time.Second
View Source
const DefaultPeerChannelsRefreshPeriod = 30 * time.Second
View Source
const MatchAnyPattern = `*`

Variables

View Source
var DefaultChannelBlocksOpts = &ChannelBlocksOpts{
	Opts:               DefaultOpts,
	stopRecreateStream: false,
}
View Source
var DefaultChannelsBlocksPeerOpts = &ChannelsBlocksPeerOpts{
	refreshPeriod: DefaultChannelsBLocksPeerRefreshPeriod,
	logger:        zap.NewNop(),
}
View Source
var DefaultOpts = &Opts{
	identity: nil,
	logger:   zap.NewNop(),
}
View Source
var DefaultPeerChannelsOpts = &PeerChannelsOpts{
	channels:      MatchAllChannels,
	refreshPeriod: DefaultPeerChannelsRefreshPeriod,
	logger:        zap.NewNop(),
}
View Source
var ErrChannelObserverAlreadyStarted = errors.New(`channel observer already started`)
View Source
var MatchAllChannels = []ChannelToMatch{{
	MatchPattern: MatchAnyPattern,
}}

Functions

func ChannelsInfoToStrings

func ChannelsInfoToStrings(channelsInfo []*peer.ChannelInfo) []string

Types

type Block

type Block[T any] struct {
	Channel string
	Block   T
}

type BlocksStream

type BlocksStream[T any] struct {
	// contains filtered or unexported fields
}

func NewBlocksStream

func NewBlocksStream[T any]() *BlocksStream[T]

func (*BlocksStream[T]) Observe

func (b *BlocksStream[T]) Observe(ctx context.Context, blocks <-chan *Block[T])

func (*BlocksStream[T]) Stop

func (b *BlocksStream[T]) Stop()

func (*BlocksStream[T]) Subscribe

func (b *BlocksStream[T]) Subscribe() (<-chan *Block[T], func())

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

func (*Channel) GetLastError added in v0.10.0

func (c *Channel) GetLastError() error

func (*Channel) GetStatus added in v0.10.0

func (c *Channel) GetStatus() ChannelObserverStatus

type ChannelBlocks

type ChannelBlocks[T any] struct {
	// pointer is to use Channel's data, which can be changed
	*Channel
	// contains filtered or unexported fields
}

func NewChannelBlocks added in v0.10.6

func NewChannelBlocks[T any](
	channel string,
	deliverer func(context.Context, string, msp.SigningIdentity, ...int64) (<-chan T, func() error, error),
	createStreamWithRetry CreateBlockStreamWithRetry[T],
	seekFromFetcher SeekFromFetcher,
	opts ...ChannelBlocksOpt,
) *ChannelBlocks[T]

func (*ChannelBlocks[T]) Observe added in v0.10.6

func (cb *ChannelBlocks[T]) Observe(ctx context.Context) (<-chan *Block[T], error)

func (*ChannelBlocks[T]) Stop added in v0.10.6

func (cb *ChannelBlocks[T]) Stop() error

type ChannelBlocksCommon added in v0.10.6

type ChannelBlocksCommon struct {
	*ChannelBlocks[*common.Block]
}

func NewChannelBlocksCommon added in v0.10.6

func NewChannelBlocksCommon(channel string, blocksDeliver api.BlocksDeliverer, seekFromFetcher SeekFromFetcher, opts ...ChannelBlocksOpt) *ChannelBlocksCommon

type ChannelBlocksOpt added in v0.10.6

type ChannelBlocksOpt func(*ChannelBlocksOpts)

func WithChannelBlockLogger

func WithChannelBlockLogger(logger *zap.Logger) ChannelBlocksOpt

func WithChannelStopRecreateStream

func WithChannelStopRecreateStream(stop bool) ChannelBlocksOpt

type ChannelBlocksOpts added in v0.10.6

type ChannelBlocksOpts struct {
	*Opts
	// contains filtered or unexported fields
}

type ChannelBlocksParsed added in v0.10.6

type ChannelBlocksParsed struct {
	*ChannelBlocks[*hlfproto.Block]
}

func NewChannelBlocksParsed added in v0.10.6

func NewChannelBlocksParsed(channel string, blocksDeliver api.ParsedBlocksDeliverer, seekFromFetcher SeekFromFetcher, opts ...ChannelBlocksOpt) *ChannelBlocksParsed

type ChannelBlocksWithName added in v0.10.6

type ChannelBlocksWithName[T any] struct {
	Name   string
	Blocks <-chan *Block[T]
}

type ChannelInfo

type ChannelInfo struct {
	Channel   string
	Height    uint64
	UpdatedAt *timestamppb.Timestamp
}

type ChannelMatched

type ChannelMatched struct {
	Name string
	// name from settings that lead to this subscription
	MatchPattern    string
	NotMatchPattern string
}

type ChannelObserverStatus

type ChannelObserverStatus int
const (
	ChannelObserverCreated ChannelObserverStatus = iota
	ChannelObserverConnecting
	ChannelObserverConnected
	ChannelObserverStopped
	ChannelObserverErrored

	DefaultConnectRetryDelay = 5 * time.Second
)

func (ChannelObserverStatus) String

func (s ChannelObserverStatus) String() string

type ChannelToMatch

type ChannelToMatch struct {
	Name            string `json:"name" yaml:"name"`
	MatchPattern    string `json:"match_pattern" yaml:"matchPattern"`
	NotMatchPattern string `json:"not_match_pattern" yaml:"notMatchPattern"`
}

type ChannelWithChannels added in v0.10.6

type ChannelWithChannels[T any] struct {
	// contains filtered or unexported fields
}

func (*ChannelWithChannels[T]) Observe added in v0.10.6

func (cwc *ChannelWithChannels[T]) Observe() <-chan *ChannelBlocksWithName[T]

type ChannelsBlocksPeer added in v0.10.6

type ChannelsBlocksPeer[T any] struct {
	// contains filtered or unexported fields
}

func NewChannelsBlocksPeer added in v0.10.6

func NewChannelsBlocksPeer[T any](
	peerChannelsGetter PeerChannelsGetter,
	deliverer func(context.Context, string, msp.SigningIdentity, ...int64) (<-chan T, func() error, error),
	createStreamWithRetry CreateBlockStreamWithRetry[T],
	opts ...ChannelsBlocksPeerOpt,
) *ChannelsBlocksPeer[T]

func (*ChannelsBlocksPeer[T]) Channels added in v0.10.6

func (acb *ChannelsBlocksPeer[T]) Channels() map[string]*Channel

func (*ChannelsBlocksPeer[T]) Observe added in v0.10.6

func (acb *ChannelsBlocksPeer[T]) Observe(ctx context.Context) <-chan *Block[T]

func (*ChannelsBlocksPeer[T]) ObserveByChannels added in v0.10.6

func (acb *ChannelsBlocksPeer[T]) ObserveByChannels(ctx context.Context) *ChannelWithChannels[T]

func (*ChannelsBlocksPeer[T]) Stop added in v0.10.6

func (acb *ChannelsBlocksPeer[T]) Stop()

type ChannelsBlocksPeerCommon added in v0.10.6

type ChannelsBlocksPeerCommon struct {
	*ChannelsBlocksPeer[*common.Block]
}

func NewChannelsBlocksPeerCommon added in v0.10.6

func NewChannelsBlocksPeerCommon(peerChannels PeerChannelsGetter, blocksDeliver api.BlocksDeliverer, opts ...ChannelsBlocksPeerOpt) *ChannelsBlocksPeerCommon

type ChannelsBlocksPeerOpt added in v0.10.6

type ChannelsBlocksPeerOpt func(*ChannelsBlocksPeerOpts)

func WithBlockStopRecreateStream

func WithBlockStopRecreateStream(stop bool) ChannelsBlocksPeerOpt

func WithChannelsBlocksPeerLogger added in v0.10.6

func WithChannelsBlocksPeerLogger(logger *zap.Logger) ChannelsBlocksPeerOpt

func WithChannelsBlocksPeerRefreshPeriod added in v0.10.6

func WithChannelsBlocksPeerRefreshPeriod(refreshPeriod time.Duration) ChannelsBlocksPeerOpt

func WithSeekFrom

func WithSeekFrom(seekFrom map[string]uint64) ChannelsBlocksPeerOpt

func WithSeekFromFetcher added in v0.10.2

func WithSeekFromFetcher(seekFromFetcher SeekFromFetcher) ChannelsBlocksPeerOpt

type ChannelsBlocksPeerOpts added in v0.10.6

type ChannelsBlocksPeerOpts struct {
	// contains filtered or unexported fields
}

type ChannelsBlocksPeerParsed added in v0.10.6

type ChannelsBlocksPeerParsed struct {
	*ChannelsBlocksPeer[*hlfproto.Block]
}

func NewChannelsBlocksPeerParsed added in v0.10.6

func NewChannelsBlocksPeerParsed(peerChannels PeerChannelsGetter, blocksDeliver api.ParsedBlocksDeliverer, opts ...ChannelsBlocksPeerOpt) *ChannelsBlocksPeerParsed

type ChannelsMatcher

type ChannelsMatcher struct {
	// contains filtered or unexported fields
}

func NewChannelsMatcher

func NewChannelsMatcher(channelsToMatch []ChannelToMatch) (*ChannelsMatcher, error)

func (*ChannelsMatcher) Match

func (cm *ChannelsMatcher) Match(channels []string) ([]*ChannelMatched, error)

type CreateBlockStream

type CreateBlockStream[T any] func(context.Context) (<-chan T, error)

type CreateBlockStreamWithRetry

type CreateBlockStreamWithRetry[T any] func(context.Context, CreateBlockStream[T]) (<-chan T, error)

func CreateBlockStreamWithRetryDelay

func CreateBlockStreamWithRetryDelay[T any](delay time.Duration) CreateBlockStreamWithRetry[T]

type Opts

type Opts struct {
	// contains filtered or unexported fields
}

type PeerChannels

type PeerChannels struct {
	// contains filtered or unexported fields
}

PeerChannels observes the channels of a peer.

func NewPeerChannels added in v0.10.6

func NewPeerChannels(peerChannelsFetcher PeerChannelsFetcher, opts ...PeerChannelsOpt) (*PeerChannels, error)

func (*PeerChannels) Channels

func (pc *PeerChannels) Channels() map[string]*ChannelInfo

func (*PeerChannels) Observe added in v0.10.6

func (pc *PeerChannels) Observe(ctx context.Context)

func (*PeerChannels) Stop added in v0.10.6

func (pc *PeerChannels) Stop()

func (*PeerChannels) URI added in v0.10.6

func (pc *PeerChannels) URI() string

type PeerChannelsFetcher

type PeerChannelsFetcher interface {
	URI() string
	api.ChannelListGetter
	api.ChainInfoGetter
}

type PeerChannelsFetcherMock added in v0.10.6

type PeerChannelsFetcherMock struct {
	// contains filtered or unexported fields
}

func NewPeerChannelsFetcherMock added in v0.10.6

func NewPeerChannelsFetcherMock(channels map[string]uint64) *PeerChannelsFetcherMock

func (*PeerChannelsFetcherMock) GetChainInfo added in v0.10.6

func (p *PeerChannelsFetcherMock) GetChainInfo(_ context.Context, channel string) (*common.BlockchainInfo, error)

func (*PeerChannelsFetcherMock) GetChannels added in v0.10.6

func (*PeerChannelsFetcherMock) URI added in v0.10.6

type PeerChannelsGetter added in v0.10.6

type PeerChannelsGetter interface {
	URI() string
	Channels() map[string]*ChannelInfo
}

type PeerChannelsMock added in v0.10.6

type PeerChannelsMock struct {
	// contains filtered or unexported fields
}

func NewPeerChannelsMock added in v0.10.6

func NewPeerChannelsMock(channelsInfo ...*ChannelInfo) *PeerChannelsMock

func (*PeerChannelsMock) Channels added in v0.10.6

func (p *PeerChannelsMock) Channels() map[string]*ChannelInfo

func (*PeerChannelsMock) URI added in v0.10.6

func (p *PeerChannelsMock) URI() string

func (*PeerChannelsMock) UpdateChannelInfo added in v0.10.6

func (p *PeerChannelsMock) UpdateChannelInfo(channelInfo *ChannelInfo)

type PeerChannelsOpt added in v0.10.6

type PeerChannelsOpt func(*PeerChannelsOpts)

func WithChannels

func WithChannels(channels []ChannelToMatch) PeerChannelsOpt

func WithPeerChannelsLogger added in v0.10.6

func WithPeerChannelsLogger(logger *zap.Logger) PeerChannelsOpt

type PeerChannelsOpts added in v0.10.6

type PeerChannelsOpts struct {
	// contains filtered or unexported fields
}

type PeerReader added in v0.10.8

PeerReader: implement this to make your own service act as a peer (for example, a message broker).

type SeekFromFetcher

type SeekFromFetcher func(ctx context.Context, channel string) (uint64, error)

func ChannelSeekFrom

func ChannelSeekFrom(seekFrom uint64) SeekFromFetcher

func ChannelSeekOldest

func ChannelSeekOldest() SeekFromFetcher

type Stream

type Stream[T any] interface {
	Subscribe() (ch <-chan *Block[T], closer func())
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL