stream

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 27, 2020 | License: BSD-3-Clause | Imports: 17 | Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrUnknownVM = errors.New("unknown VM")

	ErrInvalidTopicName    = errors.New("invalid topic name")
	ErrWrongTopicEventType = errors.New("wrong topic event type")
	ErrWrongTopicNetworkID = errors.New("wrong topic networkID")
)
View Source
var (

	// ErrUnknownProcessorType is returned when encountering a processor type
	// with no known implementation.
	ErrUnknownProcessorType = errors.New("unknown processor type")
)

Functions

func GetTopicName

func GetTopicName(networkID uint32, chainID string, eventType EventType) string

Types

type EventType

type EventType string
const (
	EventTypeConsensus EventType = "consensus"
	EventTypeDecisions EventType = "decisions"
)

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message is a message on the event stream

func (*Message) Body

func (m *Message) Body() []byte

func (*Message) ChainID

func (m *Message) ChainID() string

func (*Message) ID

func (m *Message) ID() string

func (*Message) Timestamp

func (m *Message) Timestamp() int64

type Processor

type Processor interface {
	ProcessNextMessage(context.Context, logging.Logger) error
	Close() error
}

Processor handles writing and reading to/from the event stream

func NewConsensusProducerProcessor

func NewConsensusProducerProcessor(conf cfg.Config, chainVM string, chainID string) (Processor, error)

NewConsensusProducerProcessor creates a producer for consensus events

func NewDecisionsProducerProcessor

func NewDecisionsProducerProcessor(conf cfg.Config, chainVM string, chainID string) (Processor, error)

NewDecisionsProducerProcessor creates a producer for decision events

type ProcessorFactory

type ProcessorFactory func(cfg.Config, string, string) (Processor, error)

ProcessorFactory takes in configuration and returns a stream Processor

func NewConsumerFactory

func NewConsumerFactory(factory serviceConsumerFactory) ProcessorFactory

NewConsumerFactory returns a ProcessorFactory for the given service consumer.

type ProcessorManager

type ProcessorManager struct {
	// contains filtered or unexported fields
}

ProcessorManager supervises the Processor lifecycle; it will use the given configuration and ProcessorFactory to keep a Processor active

func NewProcessorManager

func NewProcessorManager(conf cfg.Config, factory ProcessorFactory) (*ProcessorManager, error)

NewProcessorManager creates a new *ProcessorManager ready for listening

func (*ProcessorManager) Close

func (c *ProcessorManager) Close() error

Close tells the workers to shut down and waits for all of them to stop.

func (*ProcessorManager) Listen

func (c *ProcessorManager) Listen() error

Listen sets a client to listen for and handle incoming messages

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer reads from the socket and writes to the event stream.

func NewProducer

func NewProducer(conf cfg.Config, _ string, chainID string, eventType EventType) (*Producer, error)

NewProducer creates a producer using the given config

func (*Producer) Close

func (p *Producer) Close() error

Close shuts down the producer

func (*Producer) ProcessNextMessage

func (p *Producer) ProcessNextMessage(_ context.Context, log logging.Logger) error

ProcessNextMessage takes in a Message from the IPC socket and writes it to Kafka

func (*Producer) Write

func (p *Producer) Write(msg []byte) (int, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL