Documentation
¶
Overview ¶
Package fluent implements a client for the fluentd data logging daemon.
Example ¶
package main import ( "context" "log" "time" fluent "github.com/Edwardsj/fluent-client" ) func main() { // Connects to fluentd at 127.0.0.1:24224. If you want to connect to // a different host, use the following: // // client, err := fluent.New(fluent.WithAddress("fluent.example.com")) // client, err := fluent.New() if err != nil { // fluent.New may return an error if invalid values were // passed to the constructor log.Printf("failed to create client: %s", err) return } // do not forget to shutdown this client at the end. otherwise // we would not know if we were able to flush the pending // buffer or not. defer func() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := client.Shutdown(ctx); err != nil { log.Printf("Failed to shutdown properly. force-close it") client.Close() } log.Printf("shutdown complete") }() var payload = map[string]string{ "foo": "bar", } log.Printf("Posting message") if err := client.Post("debug.test", payload); err != nil { log.Printf("failed to post: %s", err) return } }
Output:
Index ¶
- func IsBufferFull(e error) bool
- func Ping(ctx context.Context, client Client, tag string, record interface{}, ...)
- type Buffered
- func (c *Buffered) Close() error
- func (c *Buffered) HttpPost(tag string, record interface{}, options ...Option) (err error)
- func (c *Buffered) Ping(tag string, record interface{}, options ...Option) (err error)
- func (c *Buffered) Post(tag string, v interface{}, options ...Option) (err error)
- func (c *Buffered) Shutdown(ctx context.Context) error
- type Client
- type EventTime
- type Message
- func (m *Message) DecodeMsgpack(d *msgpack.Decoder) error
- func (m *Message) EncodeMsgpack(e *msgpack.Encoder) error
- func (m *Message) MarshalJSON() ([]byte, error)
- func (m *Message) MarshalRawJSON() ([]byte, error)
- func (m *Message) UnmarshalJSON(buf []byte) error
- func (m *Message) UnmarshalRawJSON(buf []byte) error
- type Option
- func WithAddress(s string) Option
- func WithBufferLimit(v interface{}) Option
- func WithBuffered(b bool) Option
- func WithConnectOnStart(b bool) Option
- func WithContext(ctx context.Context) Option
- func WithDialTimeout(d time.Duration) Option
- func WithJSONMarshaler() Option
- func WithMaxConnAttempts(n uint64) Option
- func WithMethod(method string) Option
- func WithMsgpackMarshaler() Option
- func WithNetwork(s string) Option
- func WithPingInterval(t time.Duration) Option
- func WithPingResultChan(ch chan error) Option
- func WithRawJSONMarshaler() Option
- func WithSubsecond(b bool) Option
- func WithSyncAppend(b bool) Option
- func WithTLS(conf tls.Config) Option
- func WithTagPrefix(s string) Option
- func WithTimestamp(t time.Time) Option
- func WithWriteQueueSize(n int) Option
- func WithWriteThreshold(i int) Option
- type TLSConfig
- type Unbuffered
- func (c *Unbuffered) Close() error
- func (c *Unbuffered) HttpPost(tag string, v interface{}, options ...Option) (err error)
- func (c *Unbuffered) Ping(tag string, v interface{}, options ...Option) (err error)
- func (c *Unbuffered) Post(tag string, v interface{}, options ...Option) (err error)
- func (c *Unbuffered) Shutdown(_ context.Context) error
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsBufferFull ¶
IsBufferFull returns true if the error is a BufferFull error
func Ping ¶
Ping is a helper method that allows you to call client.Ping in a periodic manner.
By default a ping message will be sent every 5 minutes. You may change this using the WithPingInterval option.
If you need to capture ping failures, pass it a channel using WithPingResultChan
Example ¶
package main import ( "context" "log" fluent "github.com/Edwardsj/fluent-client" ) func main() { client, err := fluent.New() if err != nil { log.Printf("failed to create client: %s", err) return } ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Goroutine to wait for errors errorCh := make(chan error, 1) go func() { // This is just an example to stop pinging on errors for { select { case <-ctx.Done(): return case e := <-errorCh: log.Printf("got an error during ping: %s", e.Error()) cancel() return } } }() go fluent.Ping(ctx, client, "ping", "hostname", fluent.WithPingResultChan(errorCh)) // Do what you need with your main program... }
Output:
Types ¶
type Buffered ¶
type Buffered struct {
// contains filtered or unexported fields
}
Buffered is a Client that buffers incoming messages, and sends them asynchrnously when it can.
func NewBuffered ¶
NewBuffered creates a new Buffered client. Options may be one of the following:
- fluent.WithAddress
- fluent.WithBufferLimit
- fluent.WithDialTimeout
- fluent.WithJSONMarshaler
- fluent.WithMaxConnAttempts
- fluent.WithMsgpackMarshaler
- fluent.WithNetwork
- fluent.WithTagPrefix
- fluent.WithWriteThreshold
- fluent.WithWriteQueueSize
Please see their respective documentation for details.
func (*Buffered) Close ¶
Close closes the connection, but does not wait for the pending buffers to be flushed. If you want to make sure that background minion has properly exited, you should probably use the Shutdown() method
func (*Buffered) HttpPost ¶
Ping synchronously sends a ping message. This ping bypasses the underlying buffer of pending messages, and establishes a connection to the server entirely for this ping message.
func (*Buffered) Ping ¶
Ping synchronously sends a ping message. This ping bypasses the underlying buffer of pending messages, and establishes a connection to the server entirely for this ping message.
func (*Buffered) Post ¶
Post posts the given structure after encoding it along with the given tag.
An error is returned if the client has already been closed.
If you would like to specify options to `Post()`, you may pass them at the end of the method. Currently you can use the following:
fluent.WithContext: specify context.Context to use fluent.WithTimestamp: allows you to set arbitrary timestamp values fluent.WithSyncAppend: allows you to verify if the append was successful
If fluent.WithSyncAppend is provide and is true, the following errors may be returned:
- If the current underlying pending buffer is is not large enough to hold this new data, an error will be returned
- If the marshaling into msgpack/json failed, it is returned
type Client ¶
type Client interface { Post(string, interface{}, ...Option) error Ping(string, interface{}, ...Option) error Close() error Shutdown(context.Context) error }
Client represents a fluentd client. The client receives data as we go, and proxies it to a background minion. The background minion attempts to write to the server as soon as possible
func New ¶
New creates a new client. By default a buffered client is created. The `WithBufered` option switches which type of client is created. `WithBuffered(true)` (default) creates a buffered client, and `WithBuffered(false)` creates a unbuffered client. All options are delegates to `NewBuffered` and `NewUnbuffered` respectively.
type EventTime ¶
EventTime is used to represent the time in a msgpack Message
func (*EventTime) DecodeMsgpack ¶
DecodeMsgpack decodes from a msgpack stream and materializes a EventTime object
type Message ¶
type Message struct { Tag string `msgpack:"tag"` Time EventTime `msgpack:"time"` Record interface{} `msgpack:"record"` Option interface{} `msgpack:"option"` Next *Message //for Message chain End *Message //end of Message chain Len int //for Message chain // contains filtered or unexported fields }
Message is a fluentd's payload, which can be encoded in JSON or MessagePack format.
func (*Message) DecodeMsgpack ¶
DecodeMsgpack deserializes from a msgpack buffer and populates a Message struct appropriately
func (*Message) EncodeMsgpack ¶
EncodeMsgpack serializes a Message to msgpack format
func (*Message) MarshalJSON ¶
MarshalJSON serializes a Message to JSON format
func (*Message) MarshalRawJSON ¶
func (*Message) UnmarshalJSON ¶
UnmarshalJSON deserializes from a JSON buffer and populates a Message struct appropriately
func (*Message) UnmarshalRawJSON ¶
type Option ¶
type Option interface { Name() string Value() interface{} }
Option is an interface used for providing options to the various methods
func WithAddress ¶
WithAddress specifies the address to connect to for `fluent.New` A unix domain socket path, or a hostname/IP address.
func WithBufferLimit ¶
func WithBufferLimit(v interface{}) Option
WithBufferLimit specifies the buffer limit to be used for the underlying pending buffer. If a `Client.Post` operation would exceed this size, an error is returned (note: you must use `WithSyncAppend` in `Client.Post` if you want this error to be reported). The defalut value is 8MB
func WithBuffered ¶
WithBuffered specifies if we should create a buffered or unbuffered client
func WithConnectOnStart ¶
WithConnectOnStart is specified when you would like a buffered client to make sure that it can connect to the specified fluentd server on startup.
func WithContext ¶
WithContext specifies the context.Context object to be used by Post(). Possible blocking operations are (1) writing to the background buffer, and (2) waiting for a reply from when WithSyncAppend(true) is in use.
func WithDialTimeout ¶
WithDialTimeout specifies the amount of time allowed for the client to establish connection with the server. If we are forced to wait for a duration that exceeds the specified timeout, we deem the connection to have failed. The default value is 3 seconds
func WithJSONMarshaler ¶
func WithJSONMarshaler() Option
WithJSONMarshaler specifies JSON marshaling to be used when sending messages to fluentd. Used for `fluent.New`
func WithMaxConnAttempts ¶
WithMaxConnAttempts specifies the maximum number of attempts made by the client to connect to the fluentd server during final data flushing for buffered clients. For unbuffered clients, this controls the number of attempts made when calling `Post`.
For buffered clients: During normal operation, the client will indefinitely attempt to connect to the server (whilst being backed-off properly), as it should try as hard as possible to send the stored data.
This option controls the behavior when the client still has more data to send AFTER it has been told to Close() or Shutdown(). In this case we know the client wants to stop at some point, so we try to connect up to a finite number of attempts.
The default value is 64 for both buffered and unbuffered clients.
func WithMethod ¶
WithMethod specifies the sent type, i.e. "forward" or "http" for `fluent.New`
func WithMsgpackMarshaler ¶
func WithMsgpackMarshaler() Option
WithMsgpackMarshaler specifies msgpack marshaling to be used when sending messages to fluentd. Used in `fluent.New`
func WithNetwork ¶
WithNetwork specifies the network type, i.e. "tcp" or "unix" for `fluent.New`
func WithPingInterval ¶
WithPingInterval is used in the fluent.Ping method to specify the time between pings. The default value is 5 minutes
func WithPingResultChan ¶
WithPingResultChan specifies the channel where you will receive ping failures
func WithRawJSONMarshaler ¶
func WithRawJSONMarshaler() Option
WithRawJSONMarshaler raw JSON marshaling to be used when sending messages to fluentd. Used for `fluent.New`
func WithSubsecond ¶
WithSubsecond specifies if we should use EventTime for timestamps on fluentd messages. May be used on a per-client basis or per-call to Post(). By default this feature is turned OFF.
Note that this option will only work for fluentd v0.14 or above.
func WithSyncAppend ¶
WithSyncAppend specifies if we should synchronously check for success when appending to the underlying pending buffer. Used in `Client.Post`. If not specified, errors appending are not reported.
func WithTagPrefix ¶
WithTagPrefix specifies the prefix to be appended to tag names when sending messages to fluend. Used in `fluent.New`
func WithTimestamp ¶
WithTimestamp specifies the timestamp to be used for `Client.Post`
func WithWriteQueueSize ¶
WithWriteQueueSize specifies the channel buffer size for the queue used to pass messages from the Client to the background writer goroutines. The default value is 64.
func WithWriteThreshold ¶
WithWriteThreshold specifies the minimum number of bytes that we should have pending before starting to attempt to write to the server. The default value is 8KB
type Unbuffered ¶
type Unbuffered struct {
// contains filtered or unexported fields
}
Unbuffered is a Client that synchronously sends messages.
func NewUnbuffered ¶
func NewUnbuffered(options ...Option) (client *Unbuffered, err error)
NewUnbuffered creates an unbuffered client. Unlike the normal buffered client, an unbuffered client handles the Post() method synchronously, and does not attempt to buffer the payload.
- fluent.WithAddress
- fluent.WithDialTimeout
- fluent.WithMarshaler
- fluent.WithMaxConnAttempts
- fluent.WithNetwork
- fluent.WithSubSecond
- fluent.WithTagPrefix
Please see their respective documentation for details.
func (*Unbuffered) Close ¶
func (c *Unbuffered) Close() error
Close cloes the currenct cached connection, if any
func (*Unbuffered) HttpPost ¶
func (c *Unbuffered) HttpPost(tag string, v interface{}, options ...Option) (err error)
func (*Unbuffered) Ping ¶
func (c *Unbuffered) Ping(tag string, v interface{}, options ...Option) (err error)
Ping sends a ping message. A ping for an unbuffered client is completely analogous to sending a message with Post
func (*Unbuffered) Post ¶
func (c *Unbuffered) Post(tag string, v interface{}, options ...Option) (err error)
Post posts the given structure after encoding it along with the given tag.
If you would like to specify options to `Post()`, you may pass them at the end of the method. Currently you can use the following:
fluent.WithTimestamp: allows you to set arbitrary timestamp values
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
Package pdebug provides tools to produce debug logs the way the author (Daisuke Maki a.k.a.
|
Package pdebug provides tools to produce debug logs the way the author (Daisuke Maki a.k.a. |