observer

package
v0.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 24, 2023 License: Apache-2.0 Imports: 16 Imported by: 0

README

Observer: Observing blocks and events for particular channel or peer

Main features:

  • Block parsing to components (transactions, events, states etc)
  • Auto reconnection when block or event stream interrupted
  • Block and event transformation if needed

Documentation

Index

Constants

View Source
const DefaultBlockPeerObservePeriod = 10 * time.Second
View Source
const DefaultChannelPeerObservePeriod = 30 * time.Second
View Source
const MatchAnyPattern = `*`

Variables

View Source
var DefaultBlockChannelOpts = &BlockChannelOpts{
	createStreamWithRetry: CreateBlockStreamWithRetryDelay(DefaultConnectRetryDelay),
	Opts:                  DefaultOpts,
}
View Source
var DefaultBlockPeerOpts = &BlockPeerOpts{
	observePeriod: DefaultBlockPeerObservePeriod,
	logger:        zap.NewNop(),
}
View Source
var DefaultChannelPeerOpts = &ChannelPeerOpts{
	channels:      MatchAllChannels,
	observePeriod: DefaultChannelPeerObservePeriod,
	logger:        zap.NewNop(),
}
View Source
var DefaultOpts = &Opts{
	identity: nil,
	logger:   zap.NewNop(),
}
View Source
var ErrChannelObserverAlreadyStarted = errors.New(`channel observer already started`)
View Source
var MatchAllChannels = []ChannelToMatch{{
	MatchPattern: MatchAnyPattern,
}}

Functions

func ChannelsInfoToStrings

func ChannelsInfoToStrings(channelsInfo []*peer.ChannelInfo) []string

Types

type Block

type Block struct {
	Block   *common.Block
	Channel string
}

type BlockChannel

type BlockChannel struct {
	*Channel
	// contains filtered or unexported fields
}

func NewBlockChannel

func NewBlockChannel(channel string, blocksDeliver api.BlocksDeliverer, seekFromFetcher SeekFromFetcher, opts ...BlockChannelOpt) *BlockChannel

func (*BlockChannel) Observe

func (c *BlockChannel) Observe(ctx context.Context) (<-chan *Block, error)

func (*BlockChannel) Stop

func (c *BlockChannel) Stop() error

type BlockChannelOpt

type BlockChannelOpt func(*BlockChannelOpts)

func WithChannelBlockLogger

func WithChannelBlockLogger(logger *zap.Logger) BlockChannelOpt

func WithChannelStopRecreateStream

func WithChannelStopRecreateStream(stop bool) BlockChannelOpt

type BlockChannelOpts

type BlockChannelOpts struct {
	*Opts
	// contains filtered or unexported fields
}

type BlockPeer

type BlockPeer struct {
	// contains filtered or unexported fields
}

func NewBlockPeer

func NewBlockPeer(peerChannels PeerChannels, blockDeliverer api.BlocksDeliverer, opts ...BlockPeerOpt) *BlockPeer

func (*BlockPeer) ChannelObservers

func (bp *BlockPeer) ChannelObservers() map[string]*BlockPeerChannel

func (*BlockPeer) Observe

func (bp *BlockPeer) Observe(ctx context.Context) <-chan *Block

func (*BlockPeer) ObserveByChannels

func (bp *BlockPeer) ObserveByChannels(ctx context.Context) *BlocksByChannels

func (*BlockPeer) Stop

func (bp *BlockPeer) Stop()

type BlockPeerChannel added in v0.10.0

type BlockPeerChannel struct {
	Observer *BlockChannel
	// contains filtered or unexported fields
}

type BlockPeerOpt

type BlockPeerOpt func(*BlockPeerOpts)

func WithBlockPeerLogger

func WithBlockPeerLogger(logger *zap.Logger) BlockPeerOpt

func WithBlockPeerObservePeriod

func WithBlockPeerObservePeriod(observePeriod time.Duration) BlockPeerOpt

func WithBlockStopRecreateStream

func WithBlockStopRecreateStream(stop bool) BlockPeerOpt

func WithSeekFrom

func WithSeekFrom(seekFrom map[string]uint64) BlockPeerOpt

type BlockPeerOpts

type BlockPeerOpts struct {
	// contains filtered or unexported fields
}

type BlockTransformer

type BlockTransformer interface {
	Transform(*ParsedBlock) error
}

BlockTransformer transforms parsed observer data. For example decrypt, or transformer protobuf state to json

type BlocksByChannels

type BlocksByChannels struct {
	// contains filtered or unexported fields
}

func (*BlocksByChannels) Observe

func (b *BlocksByChannels) Observe() chan *ChannelCommonBlocks

type BlocksStream

type BlocksStream struct {
	// contains filtered or unexported fields
}

func NewBlocksStream

func NewBlocksStream() *BlocksStream

func (*BlocksStream) Observe

func (b *BlocksStream) Observe(ctx context.Context, blocks <-chan *Block)

func (*BlocksStream) Stop

func (b *BlocksStream) Stop()

func (*BlocksStream) Subscribe

func (b *BlocksStream) Subscribe() (chan *Block, func())

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

func (*Channel) GetLastError added in v0.10.0

func (c *Channel) GetLastError() error

func (*Channel) GetStatus added in v0.10.0

func (c *Channel) GetStatus() ChannelObserverStatus

type ChannelCommonBlocks added in v0.8.1

type ChannelCommonBlocks struct {
	Name   string
	Blocks <-chan *Block
}

type ChannelInfo

type ChannelInfo struct {
	Channel   string
	Height    uint64
	UpdatedAt *timestamppb.Timestamp
}

type ChannelMatched

type ChannelMatched struct {
	Name string
	// name from settings that lead to this subscription
	MatchPattern    string
	NotMatchPattern string
}

type ChannelObserverStatus

type ChannelObserverStatus int
const (
	ChannelObserverCreated ChannelObserverStatus = iota
	ChannelObserverConnecting
	ChannelObserverConnected
	ChannelObserverStopped
	ChannelObserverErrored

	DefaultConnectRetryDelay = 5 * time.Second
)

func (ChannelObserverStatus) String

func (s ChannelObserverStatus) String() string

type ChannelParsedBlocks added in v0.8.1

type ChannelParsedBlocks struct {
	Name   string
	Blocks <-chan *ParsedBlock
}

type ChannelPeer

type ChannelPeer struct {
	// contains filtered or unexported fields
}

ChannelPeer observes for peer channels

func NewChannelPeer

func NewChannelPeer(peerChannelsFetcher PeerChannelsFetcher, opts ...ChannelPeerOpt) (*ChannelPeer, error)

func (*ChannelPeer) Channels

func (cp *ChannelPeer) Channels() map[string]*ChannelInfo

func (*ChannelPeer) Observe

func (cp *ChannelPeer) Observe(ctx context.Context)

func (*ChannelPeer) Stop

func (cp *ChannelPeer) Stop()

type ChannelPeerFetcherMock

type ChannelPeerFetcherMock struct {
	// contains filtered or unexported fields
}

func NewChannelPeerFetcherMock

func NewChannelPeerFetcherMock(channels map[string]uint64) *ChannelPeerFetcherMock

func (*ChannelPeerFetcherMock) GetChainInfo

func (c *ChannelPeerFetcherMock) GetChainInfo(_ context.Context, channel string) (*common.BlockchainInfo, error)

func (*ChannelPeerFetcherMock) GetChannels

type ChannelPeerMock

type ChannelPeerMock struct {
	// contains filtered or unexported fields
}

func NewChannelPeerMock

func NewChannelPeerMock(channelsInfo ...*ChannelInfo) *ChannelPeerMock

func (*ChannelPeerMock) Channels

func (m *ChannelPeerMock) Channels() map[string]*ChannelInfo

func (*ChannelPeerMock) UpdateChannelInfo

func (m *ChannelPeerMock) UpdateChannelInfo(channelInfo *ChannelInfo)

type ChannelPeerOpt

type ChannelPeerOpt func(*ChannelPeerOpts)

func WithChannelPeerLogger

func WithChannelPeerLogger(logger *zap.Logger) ChannelPeerOpt

func WithChannels

func WithChannels(channels []ChannelToMatch) ChannelPeerOpt

type ChannelPeerOpts

type ChannelPeerOpts struct {
	// contains filtered or unexported fields
}

type ChannelStatus

type ChannelStatus struct {
	Status ChannelObserverStatus
	Err    error
}

type ChannelToMatch

type ChannelToMatch struct {
	Name            string `json:"name" yaml:"name"`
	MatchPattern    string `json:"match_pattern" yaml:"matchPattern"`
	NotMatchPattern string `json:"not_match_pattern" yaml:"notMatchPattern"`
}

type ChannelsMatcher

type ChannelsMatcher struct {
	// contains filtered or unexported fields
}

func NewChannelsMatcher

func NewChannelsMatcher(channelsToMatch []ChannelToMatch) (*ChannelsMatcher, error)

func (*ChannelsMatcher) Match

func (cm *ChannelsMatcher) Match(channels []string) ([]*ChannelMatched, error)

type CreateBlockStream

type CreateBlockStream func(context.Context) (<-chan *common.Block, error)

type CreateBlockStreamWithRetry

type CreateBlockStreamWithRetry func(context.Context, CreateBlockStream) (<-chan *common.Block, error)

func CreateBlockStreamWithRetryDelay

func CreateBlockStreamWithRetryDelay(delay time.Duration) CreateBlockStreamWithRetry

type CreateEventStream

type CreateEventStream func(context.Context) (<-chan *peer.ChaincodeEvent, error)

type CreateEventStreamWithRetry

type CreateEventStreamWithRetry func(context.Context, CreateEventStream) (<-chan *peer.ChaincodeEvent, error)

type Event

type Event struct {
	Block   *peer.ChaincodeEvent
	Channel string
	Error   error
}

type EventTransformer

type EventTransformer interface {
	Transform(*Event)
}

type Opts

type Opts struct {
	// contains filtered or unexported fields
}

type ParsedBlock added in v0.8.1

type ParsedBlock struct {
	Block         *hlfproto.Block // parsed block
	BlockOriginal *hlfproto.Block // here is original block before transformation if it is, otherwise it's nil
	Channel       string
	Error         error
}

type ParsedBlockChannel added in v0.8.1

type ParsedBlockChannel struct {
	BlockChannel *BlockChannel
	// contains filtered or unexported fields
}

func NewParsedBlockChannel added in v0.8.1

func NewParsedBlockChannel(blockChannel *BlockChannel, opts ...ParsedBlockChannelOpt) *ParsedBlockChannel

func (*ParsedBlockChannel) Observe added in v0.8.1

func (p *ParsedBlockChannel) Observe(ctx context.Context) (<-chan *ParsedBlock, error)

func (*ParsedBlockChannel) Stop added in v0.8.1

func (p *ParsedBlockChannel) Stop() error

type ParsedBlockChannelOpt added in v0.8.1

type ParsedBlockChannelOpt func(*ParsedBlockChannel)

func WithParsedChannelBlockTransformers added in v0.8.1

func WithParsedChannelBlockTransformers(transformers []BlockTransformer) ParsedBlockChannelOpt

func WithParsedChannelConfigBlock added in v0.8.1

func WithParsedChannelConfigBlock(configBlock *common.Block) ParsedBlockChannelOpt

type ParsedBlockPeer added in v0.8.1

type ParsedBlockPeer struct {
	// contains filtered or unexported fields
}

func NewParsedBlockPeer added in v0.8.1

func NewParsedBlockPeer(blocksPeer *BlockPeer, opts ...ParsedBlockPeerOpt) *ParsedBlockPeer

func (*ParsedBlockPeer) ChannelObservers added in v0.9.1

func (pbp *ParsedBlockPeer) ChannelObservers() map[string]*ParsedBlockPeerChannel

func (*ParsedBlockPeer) Observe added in v0.8.1

func (pbp *ParsedBlockPeer) Observe(ctx context.Context) <-chan *ParsedBlock

func (*ParsedBlockPeer) ObserveByChannels added in v0.8.1

func (pbp *ParsedBlockPeer) ObserveByChannels(ctx context.Context) *ParsedBlocksByChannels

func (*ParsedBlockPeer) Stop added in v0.8.1

func (pbp *ParsedBlockPeer) Stop()

type ParsedBlockPeerChannel added in v0.10.0

type ParsedBlockPeerChannel struct {
	Observer *ParsedBlockChannel
	// contains filtered or unexported fields
}

type ParsedBlockPeerOpt added in v0.8.1

type ParsedBlockPeerOpt func(*ParsedBlockPeer)

func WithBlockPeerTransformer

func WithBlockPeerTransformer(transformers ...BlockTransformer) ParsedBlockPeerOpt

func WithConfigBlocks

func WithConfigBlocks(configBlocks map[string]*common.Block) ParsedBlockPeerOpt

WithConfigBlocks just for correct parsing of BFT at hlfproto.ParseBlock

type ParsedBlocksByChannels added in v0.8.1

type ParsedBlocksByChannels struct {
	// contains filtered or unexported fields
}

func (*ParsedBlocksByChannels) Observe added in v0.8.1

func (p *ParsedBlocksByChannels) Observe() chan *ChannelParsedBlocks

type ParsedBlocksStream added in v0.8.1

type ParsedBlocksStream struct {
	// contains filtered or unexported fields
}

func NewParsedBlocksStream added in v0.8.1

func NewParsedBlocksStream() *ParsedBlocksStream

func (*ParsedBlocksStream) Observe added in v0.8.1

func (b *ParsedBlocksStream) Observe(ctx context.Context, blocks <-chan *ParsedBlock)

func (*ParsedBlocksStream) Stop added in v0.8.1

func (b *ParsedBlocksStream) Stop()

func (*ParsedBlocksStream) Subscribe added in v0.8.1

func (b *ParsedBlocksStream) Subscribe() (chan *ParsedBlock, func())

func (*ParsedBlocksStream) SubscribeParsed added in v0.8.1

func (b *ParsedBlocksStream) SubscribeParsed() (chan *ParsedBlock, func())

type PeerChannels

type PeerChannels interface {
	Channels() map[string]*ChannelInfo
}

type PeerChannelsFetcher

type PeerChannelsFetcher interface {
	api.ChannelListGetter
	api.ChainInfoGetter
}

type SeekFromFetcher

type SeekFromFetcher func(ctx context.Context, channel string) (uint64, error)

func ChannelSeekFrom

func ChannelSeekFrom(seekFrom uint64) SeekFromFetcher

func ChannelSeekOldest

func ChannelSeekOldest() SeekFromFetcher

type Stream

type Stream interface {
	Subscribe() (ch chan *Block, closer func())
}

type StreamParsed added in v0.8.1

type StreamParsed interface {
	Subscribe() (ch chan *ParsedBlock, closer func())
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL