Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Marshal ¶
func Marshal(insertedEvent InsertedEvent) ([]byte, error)
Marshal converts the given InsertedEvent to a []byte.
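A minimal sketch of marshalling an InsertedEvent; the import path and all field values are assumptions made for illustration:

package main

import (
    "fmt"
    "log"

    // Assumed import path for the package documented here.
    "github.com/ONSdigital/dp-observation-importer/observation"
)

func main() {
    event := observation.InsertedEvent{
        ObservationsInserted: 1000,
        InstanceID:           "instance-123",
    }

    // Marshal converts the event to a []byte, ready to be sent as a message.
    bytes, err := observation.Marshal(event)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("marshalled %d bytes\n", len(bytes))
}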
Types ¶
type DimensionHeaderCache ¶
type DimensionHeaderCache interface {
    GetOrder(ctx context.Context, instanceID string) ([]string, error)
}
DimensionHeaderCache provides an array of dimension names that defines the order of dimensions (v4 format).
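A minimal sketch of a DimensionHeaderCache implementation; the fixedHeaderCache type, the import path, and the dimension names are assumptions made for illustration:

package main

import (
    "context"
    "fmt"

    // Assumed import path for the package documented here.
    "github.com/ONSdigital/dp-observation-importer/observation"
)

// fixedHeaderCache is a hypothetical DimensionHeaderCache that returns a
// hard-coded dimension order rather than looking it up per instance.
type fixedHeaderCache struct {
    order []string
}

// GetOrder returns the same dimension order for every instance ID.
func (c *fixedHeaderCache) GetOrder(ctx context.Context, instanceID string) ([]string, error) {
    return c.order, nil
}

// Compile-time check that fixedHeaderCache satisfies DimensionHeaderCache.
var _ observation.DimensionHeaderCache = (*fixedHeaderCache)(nil)

func main() {
    cache := &fixedHeaderCache{order: []string{"V4_0", "Time", "Geography", "Aggregate"}}
    order, _ := cache.GetOrder(context.Background(), "instance-123")
    fmt.Println(order)
}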
type DimensionIDCache ¶
type DimensionIDCache interface {
    GetNodeIDs(ctx context.Context, instanceID string) (map[string]string, error)
}
DimensionIDCache provides the database IDs of dimensions when inserting observations.
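Similarly, a minimal sketch of a DimensionIDCache implementation; the mapDimensionIDCache type, the import path, and the node ID values are illustrative assumptions:

package main

import (
    "context"
    "fmt"

    // Assumed import path for the package documented here.
    "github.com/ONSdigital/dp-observation-importer/observation"
)

// mapDimensionIDCache is a hypothetical DimensionIDCache backed by an
// in-memory map of dimension option to database node ID.
type mapDimensionIDCache struct {
    nodeIDs map[string]string
}

// GetNodeIDs returns the cached dimension-to-node-ID map for any instance.
func (c *mapDimensionIDCache) GetNodeIDs(ctx context.Context, instanceID string) (map[string]string, error) {
    return c.nodeIDs, nil
}

// Compile-time check that mapDimensionIDCache satisfies DimensionIDCache.
var _ observation.DimensionIDCache = (*mapDimensionIDCache)(nil)

func main() {
    cache := &mapDimensionIDCache{nodeIDs: map[string]string{"Time_2024": "node-1"}}
    ids, _ := cache.GetNodeIDs(context.Background(), "instance-123")
    fmt.Println(ids)
}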
type InsertedEvent ¶
type InsertedEvent struct {
    ObservationsInserted int32  `avro:"observations_inserted"`
    InstanceID           string `avro:"instance_id"`
}
InsertedEvent is the data that is output for each observation batch inserted.
type Mapper ¶
type Mapper struct {
    // contains filtered or unexported fields
}
Mapper interprets a CSV line and returns an observation instance.
func NewMapper ¶
func NewMapper(dimensionOrderCache DimensionHeaderCache) *Mapper
NewMapper returns a new Mapper instance.
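A minimal sketch of constructing a Mapper; the staticOrderCache type mirrors the DimensionHeaderCache sketch above, and the import path and dimension names are assumptions:

package main

import (
    "context"

    // Assumed import path for the package documented here.
    "github.com/ONSdigital/dp-observation-importer/observation"
)

// staticOrderCache is a hypothetical DimensionHeaderCache returning a fixed
// dimension order.
type staticOrderCache struct {
    order []string
}

func (c *staticOrderCache) GetOrder(ctx context.Context, instanceID string) ([]string, error) {
    return c.order, nil
}

func main() {
    cache := &staticOrderCache{order: []string{"V4_0", "Time", "Geography"}}

    // NewMapper takes the cache that defines the dimension order (v4 format)
    // used when interpreting CSV lines.
    mapper := observation.NewMapper(cache)
    _ = mapper
}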
type MessageProducer ¶
type MessageProducer interface {
    Channels() *kafka.ProducerChannels
}
MessageProducer is a dependency that writes messages.
type MessageWriter ¶
type MessageWriter struct {
    MessageProducer MessageProducer
}
MessageWriter writes observations as messages.
func NewResultWriter ¶
func NewResultWriter(messageProducer MessageProducer) *MessageWriter
NewResultWriter returns a new observation message writer.
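A minimal sketch of wiring a MessageWriter; a nil MessageProducer stands in for a real Kafka producer (one that exposes Channels() *kafka.ProducerChannels), and the import path is an assumption:

package main

import (
    // Assumed import path for the package documented here.
    "github.com/ONSdigital/dp-observation-importer/observation"
)

func main() {
    // In practice this would be a Kafka producer exposing
    // Channels() *kafka.ProducerChannels; a nil value is used here only
    // to show the constructor wiring.
    var producer observation.MessageProducer

    writer := observation.NewResultWriter(producer)
    _ = writer
}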
type Store ¶
type Store struct {
    // contains filtered or unexported fields
}
Store provides persistence for observations.
func NewStore ¶
func NewStore(dimensionIDCache DimensionIDCache, db graph.Observation, errorReporter reporter.ErrorReporter, getGraphDimensionID bool) *Store
NewStore returns a new Observation store instance that uses the given dimension ID cache and db connection.
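A minimal sketch of wiring a Store; the import path is an assumption, and nil values stand in for the graph.Observation database connection and the reporter.ErrorReporter (assuming those parameters accept nil), purely to show the constructor signature:

package main

import (
    // Assumed import path for the package documented here.
    "github.com/ONSdigital/dp-observation-importer/observation"
)

func main() {
    // In practice these would be a populated DimensionIDCache, a
    // graph.Observation database connection and a reporter.ErrorReporter;
    // nil values are used here only to show the constructor wiring.
    var idCache observation.DimensionIDCache

    store := observation.NewStore(idCache, nil, nil, false)
    _ = store
}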