Documentation ¶
Index ¶
- Variables
- type Client
- type MessageClient
- func (client *MessageClient) All(datasource core.DataSource) ([]proto.Message, error)
- func (client *MessageClient) IsReady() bool
- func (client *MessageClient) Latest(datasource core.DataSource) (proto.Message, error)
- func (client *MessageClient) Metrics() []metrics.Measurement
- func (client *MessageClient) Observe(filter *[]core.DataSource) <-chan proto.Message
- func (client *MessageClient) Run(ctx context.Context, waitGroup *sync.WaitGroup) error
- func (client *MessageClient) String() string
Constants ¶
This section is empty.
Variables ¶
var ErrNoNewEvents = errors.New("No new events available")
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client interface {
	// Runable core interface. Used to run message fetching in the background.
	core.Runable
	// Latest returns the latest element for the given datasource.
	Latest(core.DataSource) (proto.Message, error)
	// All returns all available messages for the given datasource.
	All(core.DataSource) ([]proto.Message, error)
	// Observe returns a channel clients can subscribe to in order to get new messages.
	Observe(*[]core.DataSource) <-chan proto.Message
	// IsReady reports whether at least one event is in local storage. If datasource
	// filters are set, it reports whether there is at least one event for each datasource.
	IsReady() bool
	// Metrics returns stats of consumed topics and the local message count.
	Metrics() []metrics.Measurement
}
type MessageClient ¶
type MessageClient struct {
// contains filtered or unexported fields
}
func (*MessageClient) All ¶
func (client *MessageClient) All(datasource core.DataSource) ([]proto.Message, error)
All returns all available events for the given datasource.
func (*MessageClient) IsReady ¶ added in v1.1.14
func (client *MessageClient) IsReady() bool
IsReady reports whether at least one event is in local storage. If datasource filters are set, it reports whether there is at least one event for each datasource.
func (*MessageClient) Latest ¶
func (client *MessageClient) Latest(datasource core.DataSource) (proto.Message, error)
Latest returns the latest event in the local queue for the given datasource.
func (*MessageClient) Metrics ¶ added in v1.2.0
func (client *MessageClient) Metrics() []metrics.Measurement
Metrics returns the count of messages for each available datasource.
func (*MessageClient) Observe ¶ added in v1.1.0
func (client *MessageClient) Observe(filter *[]core.DataSource) <-chan proto.Message
Observe returns a channel you can use to listen for new messages. A filter can be passed if you are interested in a specific type of event. If the channel capacity is reached, new events are discarded. The channel capacity can be set with the config key "kafka.channel_size".
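The discard-when-full behaviour described for Observe can be illustrated with a buffered channel and a non-blocking send. This is a sketch of the general Go technique, not the package's actual implementation: `publish` and `channelSize` are hypothetical, and `string` stands in for `proto.Message`.

```go
package main

import "fmt"

// publish (hypothetical helper) tries to hand msg to the subscriber channel
// without blocking. It returns false when the buffer is full and the
// message is dropped.
func publish(ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	// Stand-in for the capacity configured via "kafka.channel_size".
	const channelSize = 2
	ch := make(chan string, channelSize)

	// Nobody is reading yet, so the third message finds the buffer full.
	for _, msg := range []string{"a", "b", "c"} {
		if !publish(ch, msg) {
			fmt.Println("dropped:", msg)
		}
	}
	close(ch)

	for msg := range ch {
		fmt.Println("received:", msg)
	}
}
```

A consumer that must not miss events would therefore need to read from the channel promptly or use a larger channel size.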
func (*MessageClient) Run ¶
func (client *MessageClient) Run(ctx context.Context, waitGroup *sync.WaitGroup) error
Run starts message observing. There is no continuous subscription for new messages. The message client listens until no new messages come in, then pauses message receiving for a configured duration. The pause duration can be set with the config key "kafka.poll_sleep".
func (*MessageClient) String ¶ added in v1.1.12
func (client *MessageClient) String() string
String returns log information for current datasource.