Documentation ¶
Index ¶
- Constants
- Variables
- type BeatPublisher
- func (publisher *BeatPublisher) Connect() Client
- func (publisher *BeatPublisher) GeoLite() *libgeo.GeoIP
- func (publisher *BeatPublisher) GetServerName(ip string) string
- func (publisher *BeatPublisher) IsPublisherIP(ip string) bool
- func (publisher *BeatPublisher) PublishTopology(params ...string) error
- func (publisher *BeatPublisher) Stop()
- func (publisher *BeatPublisher) UpdateTopologyPeriodically()
- type Client
- type ClientOption
- type Context
- type Publisher
- type ShipperConfig
- type Topology
- type TransactionalEventPublisher
Constants ¶
const ( DefaultQueueSize = 1000 DefaultBulkQueueSize = 0 )
Variables ¶
var (
ErrClientClosed = errors.New("client closed")
)
Functions ¶
This section is empty.
Types ¶
type BeatPublisher ¶
type BeatPublisher struct { IPAddrs []string Index string Output []*outputWorker TopologyOutput outputs.TopologyOutputer Processors *processors.Processors RefreshTopologyTimer <-chan time.Time // contains filtered or unexported fields }
func New ¶
func New( beatName string, beatVersion string, configs map[string]*common.Config, shipper ShipperConfig, processors *processors.Processors, ) (*BeatPublisher, error)
Create new PublisherType
func (*BeatPublisher) Connect ¶
func (publisher *BeatPublisher) Connect() Client
func (*BeatPublisher) GeoLite ¶
func (publisher *BeatPublisher) GeoLite() *libgeo.GeoIP
func (*BeatPublisher) GetServerName ¶
func (publisher *BeatPublisher) GetServerName(ip string) string
func (*BeatPublisher) IsPublisherIP ¶
func (publisher *BeatPublisher) IsPublisherIP(ip string) bool
func (*BeatPublisher) PublishTopology ¶
func (publisher *BeatPublisher) PublishTopology(params ...string) error
func (*BeatPublisher) Stop ¶
func (publisher *BeatPublisher) Stop()
func (*BeatPublisher) UpdateTopologyPeriodically ¶
func (publisher *BeatPublisher) UpdateTopologyPeriodically()
type Client ¶
type Client interface { // Close disconnects the Client from the publisher pipeline. Close() error // PublishEvent publishes one event with given options. If Sync option is set, // PublishEvent will block until output plugins report success or failure state // being returned by this method. PublishEvent(event common.MapStr, opts ...ClientOption) bool // PublishEvents publishes multiple events with given options. If Guaranteed // option is set, PublishEvent will block until output plugins report // success or failure state being returned by this method. PublishEvents(events []common.MapStr, opts ...ClientOption) bool }
Client is used by beats to publish new events.
The publish methods add fields that are common to all events. Both methods add the 'beat' field that contains name and hostname. Also they add 'tags' and 'fields'.
Event publishers can override the default index for an event by adding a 'beat' field whose value is a common.MapStr that contains an 'index' field specifying the destination index.
event := common.MapStr{ // Setting a custom index for a single event. "beat": common.MapStr{"index": "custom-index"}, }
Event publishers can add fields and tags to an event. The fields will take precedence over the global fields defined in the shipper configuration.
event := common.MapStr{ // Add custom fields to the root of the event. common.EventMetadataKey: common.EventMetadata{ UnderRoot: true, Fields: common.MapStr{"env": "production"} } }
type ClientOption ¶
ClientOption allows API users to set additional options when publishing events.
func Signal ¶
func Signal(signaler op.Signaler) ClientOption
type Context ¶
func Guaranteed ¶
Guaranteed option will retry publishing the event, until send attempt have been ACKed by output plugin.
func MakeContext ¶
func MakeContext(opts []ClientOption) Context
type ShipperConfig ¶
type ShipperConfig struct { common.EventMetadata `config:",inline"` // Fields and tags to add to each event. Name string `config:"name"` RefreshTopologyFreq time.Duration `config:"refresh_topology_freq"` TopologyExpire int `config:"topology_expire"` Geoip common.Geoip `config:"geoip"` // internal publisher queue sizes QueueSize *int `config:"queue_size"` BulkQueueSize *int `config:"bulk_queue_size"` MaxProcs *int `config:"max_procs"` }
func (*ShipperConfig) InitShipperConfig ¶
func (config *ShipperConfig) InitShipperConfig()