Documentation ¶
Overview ¶
Package devosender implements the tools to send data to Devo in a different ways and scenarios
Interfaces to grant abstraction between implementations are defined and complex objects has associated their own builder
Index ¶
- Constants
- Variables
- type ClienBuilderDevoCentralRelay
- type Client
- func (dsc *Client) AddReplaceSequences(old, new string) error
- func (dsc *Client) AreAsyncOps() bool
- func (dsc *Client) AsyncError(id string) (bool, error)
- func (dsc *Client) AsyncErrors() map[string]error
- func (dsc *Client) AsyncErrorsIds() []string
- func (dsc *Client) AsyncErrorsNumber() int
- func (dsc *Client) AsyncIds() []string
- func (dsc *Client) AsyncsNumber() int
- func (dsc *Client) Close() error
- func (dsc *Client) GetEntryPoint() string
- func (dsc *Client) IsAsyncActive(id string) bool
- func (dsc *Client) IsConnWorking() (bool, error)
- func (dsc *Client) LastSendCallTimestamp() time.Time
- func (dsc *Client) PurgeAsyncErrors()
- func (dsc *Client) Send(m string) error
- func (dsc *Client) SendAsync(m string) string
- func (dsc *Client) SendWTag(t, m string) error
- func (dsc *Client) SendWTagAndCompressor(t, m string, c *compressor.Compressor) error
- func (dsc *Client) SendWTagAndCompressorAsync(t, m string, c *compressor.Compressor) string
- func (dsc *Client) SendWTagAsync(t, m string) string
- func (dsc *Client) SetDefaultTag(t string) error
- func (dsc *Client) SetSyslogHostName(host string)
- func (dsc *Client) String() string
- func (dsc *Client) WaitForPendingAsyncMessages() error
- func (dsc *Client) WaitForPendingAsyncMsgsOrTimeout(timeout time.Duration) error
- func (dsc *Client) Write(p []byte) (n int, err error)
- type ClientBuilder
- func (dsb *ClientBuilder) Build() (*Client, error)
- func (dsb *ClientBuilder) CompressorMinSize(s int) *ClientBuilder
- func (dsb *ClientBuilder) ConnectionExpiration(t time.Duration) *ClientBuilder
- func (dsb *ClientBuilder) DefaultCompressor(c compressor.CompressorAlgorithm) *ClientBuilder
- func (dsb *ClientBuilder) DefaultDevoTag(t string) *ClientBuilder
- func (dsb *ClientBuilder) DevoCentralEntryPoint(relay ClienBuilderDevoCentralRelay) *ClientBuilder
- func (dsb *ClientBuilder) EntryPoint(entrypoint string) *ClientBuilder
- func (dsb *ClientBuilder) IsConnWorkingCheckPayload(s string) *ClientBuilder
- func (dsb *ClientBuilder) TCPKeepAlive(t time.Duration) *ClientBuilder
- func (dsb *ClientBuilder) TCPTimeout(t time.Duration) *ClientBuilder
- func (dsb *ClientBuilder) TLSCerts(key []byte, cert []byte, chain []byte) *ClientBuilder
- func (dsb *ClientBuilder) TLSFiles(keyFileName string, certFileName string, chainFileName *string) *ClientBuilder
- func (dsb *ClientBuilder) TLSInsecureSkipVerify(insecureSkipVerify bool) *ClientBuilder
- func (dsb *ClientBuilder) TLSRenegotiation(renegotiation tls.RenegotiationSupport) *ClientBuilder
- type DevoSender
- type LazyClient
- func (lc *LazyClient) Close() error
- func (lc *LazyClient) Flush() error
- func (lc *LazyClient) IsLimitReachedLastFlush() bool
- func (lc *LazyClient) IsStandBy() bool
- func (lc *LazyClient) OnlyInMemory() bool
- func (lc *LazyClient) PendingEventsNoConn() int
- func (lc *LazyClient) SendAsync(m string) string
- func (lc *LazyClient) SendWTagAndCompressorAsync(t, m string, c *compressor.Compressor) string
- func (lc *LazyClient) SendWTagAsync(t, m string) string
- func (lc *LazyClient) StandBy() error
- func (lc *LazyClient) String() string
- func (lc *LazyClient) WakeUp() error
- type LazyClientBuilder
- func (lcb *LazyClientBuilder) AppLogger(log applogger.SimpleAppLogger) *LazyClientBuilder
- func (lcb *LazyClientBuilder) BufferSize(s uint32) *LazyClientBuilder
- func (lcb *LazyClientBuilder) Build() (*LazyClient, error)
- func (lcb *LazyClientBuilder) ClientBuilder(cb *ClientBuilder) *LazyClientBuilder
- func (lcb *LazyClientBuilder) EnableStandByModeTimeout(d time.Duration) *LazyClientBuilder
- func (lcb *LazyClientBuilder) FlushTimeout(d time.Duration) *LazyClientBuilder
- func (lcb *LazyClientBuilder) MaxRecordsResendByFlush(max int) *LazyClientBuilder
- type LazyClientStats
- type ReliableClient
- func (dsrc *ReliableClient) Close() error
- func (dsrc *ReliableClient) Flush() error
- func (dsrc *ReliableClient) IsConnWorking() (bool, error)
- func (dsrc *ReliableClient) IsLimitReachedLastFlush() bool
- func (dsrc *ReliableClient) IsStandBy() bool
- func (dsrc *ReliableClient) OnlyInMemory() bool
- func (dsrc *ReliableClient) PendingEventsNoConn() int
- func (dsrc *ReliableClient) SendAsync(m string) string
- func (dsrc *ReliableClient) SendWTagAndCompressorAsync(t string, m string, c *compressor.Compressor) string
- func (dsrc *ReliableClient) SendWTagAsync(t, m string) string
- func (dsrc *ReliableClient) StandBy() error
- func (dsrc *ReliableClient) Stats() status.Stats
- func (dsrc *ReliableClient) String() string
- func (dsrc *ReliableClient) WakeUp() error
- type ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) AppLogger(lg applogger.SimpleAppLogger) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) Build() (*ReliableClient, error)
- func (dsrcb *ReliableClientBuilder) ClientBuilder(cb *ClientBuilder) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) ClientReconnDaemonInitDelay(d time.Duration) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) ClientReconnDaemonWaitBtwChecks(d time.Duration) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) DaemonStopTimeout(d time.Duration) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) EnableStandByModeTimeout(d time.Duration) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) FlushTimeout(d time.Duration) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) HouseKeepingDaemonInitDelay(d time.Duration) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) HouseKeepingDaemonWaitBtwChecks(d time.Duration) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) MaxRecordsResendByFlush(max int) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) RetryDaemonInitDelay(d time.Duration) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) RetryDaemonWaitBtwChecks(d time.Duration) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) StatusBuilder(sb status.Builder) *ReliableClientBuilder
- type SwitchDevoSender
Examples ¶
- LazyClient
- LazyClient.IsLimitReachedLastFlush
- LazyClient.OnlyInMemory
- LazyClient.PendingEventsNoConn
- LazyClient.SendAsync
- LazyClient.StandBy
- LazyClientBuilder
- LazyClientBuilder (InitErrors)
- NewLazyClientBuilder
- ReliableClient
- ReliableClient (AppLoggerError)
- ReliableClient (As_SwitchDevoSender)
- ReliableClient (Dropped)
- ReliableClient (Evicted)
- ReliableClient (ExtendedStats)
- ReliableClient (NilInnerClient)
- ReliableClient (StandbyAndWakeUp)
- ReliableClient (WithoutConnection)
- ReliableClient.IsLimitReachedLastFlush
- ReliableClient.OnlyInMemory
- ReliableClient.PendingEventsNoConn
- ReliableClientBuilder (InitErrors)
Constants ¶
const ( // DevoCentralRelayUS is the public entrypoint of Devo central-relay on USA site DevoCentralRelayUS = "tcp://us.elb.relay.logtrust.net:443" // DevoCentralRelayEU is the public entrypoint of Devo central-relay on Europe site DevoCentralRelayEU = "tcp://eu.elb.relay.logtrust.net:443" // DefaultSyslogLevel is the code for facility and level used at raw syslog protocol. <14> = facility:user and level:info DefaultSyslogLevel = "<14>" // ClientBuilderRelayUS select DevoCentralRelayUS in builder ClientBuilderRelayUS ClienBuilderDevoCentralRelay = iota // ClientBuilderRelayEU select DevoCentralRelayEU in builder ClientBuilderRelayEU // ClientBuilderDefaultCompressorMinSize is the default min size of payload to apply compression // Following discussion in https://webmasters.stackexchange.com/questions/31750/what-is-recommended-minimum-object-size-for-gzip-performance-benefits // this value is set by CDN providers as Akamai, other services like goolge are more // aggrsive and set 150 bytes as fringe value. ClientBuilderDefaultCompressorMinSize = 860 )
const ( // LCBDefaultBufferEventsSize is the default BufferSize value set in LazyClientBuilder // when it is created with NewLazyClientBuilder LCBDefaultBufferEventsSize uint32 = 256000 // LCBDefaultFlushTimeout is the default FlushTimeout value set in LazyClientBuilder // when it is created with NewLazyClientBuilder LCBDefaultFlushTimeout = time.Second * 2 )
const ( // DefaultClientReconnDaemonWaitBtwChecks is the default time that client reconnect daemon // must wait between run checks or works DefaultClientReconnDaemonWaitBtwChecks = 10 * time.Second // DefaultClientReconnDaemonInitDelay is the default delay time that client reconnect daemon // must wait before start to work DefaultClientReconnDaemonInitDelay = time.Second * 30 // DefaultDaemonMicroWait is the default micro delay to check in the midle of daemon // sleep time if daemon was marked to be stopped, then interrup sleep operation DefaultDaemonMicroWait = time.Millisecond * 200 // DefaultEnableStandByModeTimeout is the Default timeout to wait for all pending // async messages managed byclient when StandBy func is called . If timeout is // reached then error will be send DefaultEnableStandByModeTimeout = time.Second // DefaultDaemonStopTimeout is the default timeout to wait when stopping (Close) each daemon DefaultDaemonStopTimeout = time.Second * 2 // DefaultFlushAsyncTimeout is the Default timeout to wait for all pending async // messages managed by client when Flush func is called. If timeout is reached // then error will be send DefaultFlushAsyncTimeout = time.Millisecond * 500 // DefaultHouseKeepingDaemonWaitBtwChecks is the default time that status housekeeping daemon must wait // between run checks or do any action DefaultHouseKeepingDaemonWaitBtwChecks = time.Minute * 5 // DefaultHouseKeepingDaemonInitDelay is the default delay time that status housekeeping daemon // must wait before start to work DefaultHouseKeepingDaemonInitDelay = time.Second * 5 // DefaultResendEventsDaemonWaitBtwChecks is the default time that resend pending events daemon must wait // between run checks or do any action DefaultResendEventsDaemonWaitBtwChecks = time.Second * 30 // DefaultResendEventsDaemonInitDelay is the default delay time that resend pending events daemon // must wait before start to work DefaultResendEventsDaemonInitDelay = time.Second * 20 // DefaultMaxRecordsResendByFlush is the default value for the max numbers of pending events to resend // when Flush operation is called DefaultMaxRecordsResendByFlush = 1000 )
Variables ¶
var ( // ErrBufferOverflow is the error returned when buffer is full and element was lost ErrBufferOverflow = errors.New("overwriting item(s) because buffer is full") //ErrBufferFull is the error returned when buffer is full and any other action taken ErrBufferFull = errors.New("buffer is full") )
var ErrNilPointerReceiver = errors.New("receiver func call with nil pointer")
ErrNilPointerReceiver is the error returned when received funcs are call over nil pointer
var ErrPayloadNoDefined = errors.New("payload to check connection is not defined")
ErrPayloadNoDefined is the error returned when payload is required by was not defined
var ErrWaitAsyncTimeout = errors.New("timeout while wait for pending items")
ErrWaitAsyncTimeout is the error returned while timeout is reached in "WaitFor" functions
var ErrorTagEmpty error = errors.New("tag can not be empty")
ErrorTagEmpty is returneed when Devo tag is empty string
Functions ¶
This section is empty.
Types ¶
type ClienBuilderDevoCentralRelay ¶
type ClienBuilderDevoCentralRelay int
ClienBuilderDevoCentralRelay is the type used to set Devo central relay as entrypoint
func ParseDevoCentralEntrySite ¶
func ParseDevoCentralEntrySite(s string) (ClienBuilderDevoCentralRelay, error)
ParseDevoCentralEntrySite returns ClientBuilderDevoCentralRelay based on site code. valid codes are 'US' and 'EU'
type Client ¶
Client is the engine that can send data to Devo throug central (tls) or in-house (clean) realy
func (*Client) AddReplaceSequences ¶
AddReplaceSequences is helper function to add elements to Client.ReplaceSequences old is the string to search in message and new is the replacement string. Replacement will be done using strings.ReplaceAll
func (*Client) AreAsyncOps ¶
AreAsyncOps returns true is there is any Async operation running
func (*Client) AsyncError ¶ added in v1.0.0
AsyncError return true and last error detected fo ID if error was captured or false, nil in other case One special case is when dsc Pointer is nil, that this method returns (false, ErrNilPointerReceiver) AsyncError is thread-safe mode
func (*Client) AsyncErrors ¶
AsyncErrors return errors from async calls collected until now. WARNING that map returned IS NOT thread safe.
func (*Client) AsyncErrorsIds ¶ added in v1.0.0
AsyncErrorsIds returns the request IDs with error registered. This method is different from AsyncErrors because is thread safe at cost of performance.
func (*Client) AsyncErrorsNumber ¶
AsyncErrorsNumber return then number of errors from async calls collected until now
func (*Client) AsyncsNumber ¶
AsyncsNumber return the number of async operations pending. This is more optimal that call len(dsc.AsyncIds())
func (*Client) Close ¶
Close is the method to close all interanl elements like connection that should be closed at end
func (*Client) GetEntryPoint ¶
GetEntryPoint return entrypoint used by client
func (*Client) IsAsyncActive ¶
IsAsyncActive returns true if id is present in AsyncIds(). This function is more optimal that look into result of AsyncIds
func (*Client) IsConnWorking ¶ added in v1.0.0
IsConnWorking check if connection is opened and make a test writing data to ensure that is working If payload to check is not defined (ClientBuilder.IsConnWorkingCheckPayload) then ErrPayloadNoDefined will be returned
func (*Client) LastSendCallTimestamp ¶
LastSendCallTimestamp returns the timestamp of last time that any of SendXXXX func was called with valid parameters If Client is nil default time.Time value will be returned
func (*Client) PurgeAsyncErrors ¶
func (dsc *Client) PurgeAsyncErrors()
PurgeAsyncErrors cleans internal AsyncErrors captured until now
func (*Client) Send ¶
Send func send message using default tag (SetDefaultTag). Meessage will be transformed before send, using ReplaceAll with values from Client.ReplaceSequences
func (*Client) SendAsync ¶
SendAsync is similar to Send but send events in async way (goroutine). Empty string is returned in Client is nil
func (*Client) SendWTagAndCompressor ¶
func (dsc *Client) SendWTagAndCompressor(t, m string, c *compressor.Compressor) error
SendWTagAndCompressor is similar to SendWTag but using a specific Compressor. This can be usefull, for example, to force disable compression for one message using Client.SendWTagAndCompressor(t, m, nil)
func (*Client) SendWTagAndCompressorAsync ¶
func (dsc *Client) SendWTagAndCompressorAsync(t, m string, c *compressor.Compressor) string
SendWTagAndCompressorAsync is similar to SendWTagAsync but send events with specific compressor. This can be usefull, for example, to force disable compression for one message using Client.SendWTagAndCompressorAsync(t, m, nil) Empty string is returned in Client is nil
func (*Client) SendWTagAsync ¶
SendWTagAsync is similar to SendWTag but send events in async way (goroutine). Empty string is returned in Client is nil
func (*Client) SetDefaultTag ¶
SetDefaultTag set tag used when call funcs to send messages without splicit tag
func (*Client) SetSyslogHostName ¶
SetSyslogHostName overwrite hostname send in raw Syslog payload
func (*Client) WaitForPendingAsyncMessages ¶
WaitForPendingAsyncMessages wait for all Async messages that are pending to send
func (*Client) WaitForPendingAsyncMsgsOrTimeout ¶
WaitForPendingAsyncMsgsOrTimeout is similar to WaitForPendingAsyncMessages but return ErrWaitAsyncTimeout error if timeout is reached
type ClientBuilder ¶
type ClientBuilder struct {
// contains filtered or unexported fields
}
ClientBuilder defines builder for easy DevoSender instantiation
func NewClientBuilder ¶
func NewClientBuilder() *ClientBuilder
NewClientBuilder returns new DevoSenderBuilder
func (*ClientBuilder) Build ¶
func (dsb *ClientBuilder) Build() (*Client, error)
Build implements build method of builder returning Client instance.
func (*ClientBuilder) CompressorMinSize ¶
func (dsb *ClientBuilder) CompressorMinSize(s int) *ClientBuilder
CompressorMinSize set the minium size to be applied when compress data.
func (*ClientBuilder) ConnectionExpiration ¶
func (dsb *ClientBuilder) ConnectionExpiration(t time.Duration) *ClientBuilder
ConnectionExpiration set expiration time used to recreate connection from last time was used
func (*ClientBuilder) DefaultCompressor ¶
func (dsb *ClientBuilder) DefaultCompressor(c compressor.CompressorAlgorithm) *ClientBuilder
DefaultCompressor set and enable ompression when send messages
func (*ClientBuilder) DefaultDevoTag ¶
func (dsb *ClientBuilder) DefaultDevoTag(t string) *ClientBuilder
DefaultDevoTag set the default tag to be used when send data to Devo in target client
func (*ClientBuilder) DevoCentralEntryPoint ¶
func (dsb *ClientBuilder) DevoCentralEntryPoint(relay ClienBuilderDevoCentralRelay) *ClientBuilder
DevoCentralEntryPoint Set One of the available Devo cental relays. This value overwrite (and is overwritten) by EntryPoint
func (*ClientBuilder) EntryPoint ¶
func (dsb *ClientBuilder) EntryPoint(entrypoint string) *ClientBuilder
EntryPoint sets entrypoint in builder used to create Client This value overwrite (and is overwritten) by DevoCentralEntryPoint
func (*ClientBuilder) IsConnWorkingCheckPayload ¶ added in v1.0.0
func (dsb *ClientBuilder) IsConnWorkingCheckPayload(s string) *ClientBuilder
IsConnWorkingCheckPayload sets the payload of the raw message that will be sent (Write) to check conection during IsConnWorking call Empty string implies that IsConnWorking will return an error. The payload size must be less that 4 characters Recommended value for this payload when you like to enable this check is zero-character: "\x00"
func (*ClientBuilder) TCPKeepAlive ¶
func (dsb *ClientBuilder) TCPKeepAlive(t time.Duration) *ClientBuilder
TCPKeepAlive allow to set KeepAlive value configured in net.Dialer
func (*ClientBuilder) TCPTimeout ¶
func (dsb *ClientBuilder) TCPTimeout(t time.Duration) *ClientBuilder
TCPTimeout allow to set Timeout value configured in net.Dialer
func (*ClientBuilder) TLSCerts ¶
func (dsb *ClientBuilder) TLSCerts(key []byte, cert []byte, chain []byte) *ClientBuilder
TLSCerts sets keys and certs used to make Client using TLS connection Call to this method overwrite TLSFiles
func (*ClientBuilder) TLSFiles ¶
func (dsb *ClientBuilder) TLSFiles(keyFileName string, certFileName string, chainFileName *string) *ClientBuilder
TLSFiles sets keys and certs from files used to make Client using TLS connection TLSCerts overwrites calls to this method
func (*ClientBuilder) TLSInsecureSkipVerify ¶
func (dsb *ClientBuilder) TLSInsecureSkipVerify(insecureSkipVerify bool) *ClientBuilder
TLSInsecureSkipVerify sets InsecureSkipFlag, this value is used only with TLS connetions
func (*ClientBuilder) TLSRenegotiation ¶
func (dsb *ClientBuilder) TLSRenegotiation(renegotiation tls.RenegotiationSupport) *ClientBuilder
TLSRenegotiation sets tlsRenegotiation support, this value is used only with TLS connections
type DevoSender ¶
type DevoSender interface { io.WriteCloser Send(m string) error SetDefaultTag(t string) error SendWTag(t, m string) error SendAsync(m string) string SendWTagAsync(t, m string) string WaitForPendingAsyncMessages() error AsyncErrors() map[string]error AsyncErrorsNumber() int PurgeAsyncErrors() GetEntryPoint() string AreAsyncOps() bool AsyncIds() []string IsAsyncActive(id string) bool AsyncsNumber() int LastSendCallTimestamp() time.Time String() string }
DevoSender interface define the minimum behaviour required for Send data to Devo
func NewDevoSender ¶
func NewDevoSender(entrypoint string) (DevoSender, error)
NewDevoSender Create new DevoSender with clean comunication using ClientBuilder entrypoint is the Devo entrypoint where send events with protocol://fqdn:port format. You can use DevoCentralRelayXX constants to easy assign these value
func NewDevoSenderTLS ¶
NewDevoSenderTLS create TLS connection using ClientBuiler with minimal configuration
func NewDevoSenderTLSFiles ¶
func NewDevoSenderTLSFiles(entrypoint string, keyFileName string, certFileName string, chainFileName *string) (DevoSender, error)
NewDevoSenderTLSFiles is similar to NewDevoSenderTLS but loading different certificates from files
type LazyClient ¶
type LazyClient struct { *Client Stats LazyClientStats // contains filtered or unexported fields }
LazyClient is a SwitchDevoSender that save events in a buffer when it is in "stand by" mode. Events are saved in a circular buffer, and when limit of buffer size is reached, new arrived events are saved at the begining of the buffer. when WakeUp is called, all events in buffer as send using Client.Async funcs in starting for first element in slide. This implies that arrived event order can not be maintained if buffer suffered an "oversize" situation.
Example ¶
lc, err := NewLazyClientBuilder(). ClientBuilder( NewClientBuilder().EntryPoint("udp://localhost:13000")). // udp protocol never return error Build() if err != nil { panic(err) } fmt.Println("LazyClient", lc.String()[:146]) // send messages in connected mode err = lc.SendWTag("test.keep.free", "message 1") // No error because client is connected if err != nil { panic(err) } for i := 2; i <= 10; i++ { id := lc.SendWTagAsync( "test.keep.free", fmt.Sprintf("message %d", i), ) fmt.Printf("ID of msg %d has non-conn- prefix: %v\n", i, strings.HasPrefix(id, "non-conn-")) } fmt.Println("Stats", lc.Stats) // WakeUp should not have any effect err = lc.WakeUp() if err != nil { panic(err) } fmt.Println("Stats (after WakeUp)", lc.Stats) fmt.Println("LazyClient (after WakeUp)", lc.String()[0:146]) fmt.Println("SwitchDevoSender.LastSendCallTimestamp (after WakeUp) is empty", lc.LastSendCallTimestamp() == time.Time{}) var sender SwitchDevoSender = lc sender.Close() fmt.Println("LazyClient as SwitchDevoSender closed", sender.String()) id := sender.SendWTagAsync("test.keep.free", "message after close is the same as after StandBy") fmt.Printf("ID has non-conn- prefix: %v\n", strings.HasPrefix(id, "non-conn-")) fmt.Println("SwitchDevoSender (pending events after close)", sender.String()) fmt.Println("Stats (pending events after close)", lc.Stats) fmt.Println("SwitchDevoSender.LastSendCallTimestamp (pending events after close) is empty", lc.LastSendCallTimestamp() == time.Time{}) sender.Close() fmt.Println("SwitchDevoSender (after last close)", sender.String()) fmt.Println("Stats (after last close)", lc.Stats)
Output: LazyClient bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:13000' ID of msg 2 has non-conn- prefix: false ID of msg 3 has non-conn- prefix: false ID of msg 4 has non-conn- prefix: false ID of msg 5 has non-conn- prefix: false ID of msg 6 has non-conn- prefix: false ID of msg 7 has non-conn- prefix: false ID of msg 8 has non-conn- prefix: false ID of msg 9 has non-conn- prefix: false ID of msg 10 has non-conn- prefix: false Stats AsyncEvents: 9, TotalBuffered: 0, BufferedLost: 0, SendFromBuffer: 0 BufferCount: 0 Stats (after WakeUp) AsyncEvents: 9, TotalBuffered: 0, BufferedLost: 0, SendFromBuffer: 0 BufferCount: 0 LazyClient (after WakeUp) bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:13000' SwitchDevoSender.LastSendCallTimestamp (after WakeUp) is empty false LazyClient as SwitchDevoSender closed bufferSize: 256000, standByMode: true, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {<nil>} ID has non-conn- prefix: true SwitchDevoSender (pending events after close) bufferSize: 256000, standByMode: true, #eventsInBuffer: 1, flushTimeout: 2s, standByModeTimeout: 0s, Client: {<nil>} Stats (pending events after close) AsyncEvents: 10, TotalBuffered: 1, BufferedLost: 0, SendFromBuffer: 0 BufferCount: 1 SwitchDevoSender.LastSendCallTimestamp (pending events after close) is empty true SwitchDevoSender (after last close) bufferSize: 256000, standByMode: true, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {<nil>} Stats (after last close) AsyncEvents: 10, TotalBuffered: 1, BufferedLost: 0, SendFromBuffer: 1 BufferCount: 0
func (*LazyClient) Close ¶
func (lc *LazyClient) Close() error
Close send all events forcing WakeUp if it is in stand-by mode and then close the client
func (*LazyClient) Flush ¶
func (lc *LazyClient) Flush() error
Flush send all events in buffer only if client is not in stand by mode (IsStandBy == false)
func (*LazyClient) IsLimitReachedLastFlush ¶ added in v1.0.0
func (lc *LazyClient) IsLimitReachedLastFlush() bool
IsLimitReachedLastFlush treturs true if there are pending events after flush because max limit was reached, or false in the other case.
Example ¶
lc, err := NewLazyClientBuilder(). ClientBuilder( NewClientBuilder(). EntryPoint("udp://localhost:13000"). // udp does not return error DefaultDevoTag("test.keep.free")). MaxRecordsResendByFlush(1). Build() if err != nil { panic(err) } // Pass to standby mode to queue events err = lc.StandBy() if err != nil { panic(err) } // Send events lc.SendWTagAsync("test.keep.free", "event 1") lc.SendWTagAsync("test.keep.free", "event 2") fmt.Println("IsLimitReachedLastFlush after send events", lc.IsLimitReachedLastFlush()) // Wake up and err = lc.WakeUp() if err != nil { panic(err) } fmt.Println("IsLimitReachedLastFlush after Wakeup (Flush implicit)", lc.IsLimitReachedLastFlush()) //Flush one remaining event => limit reached too err = lc.Flush() if err != nil { panic(err) } fmt.Println("IsLimitReachedLastFlush after flush 1", lc.IsLimitReachedLastFlush()) err = lc.Flush() if err != nil { panic(err) } fmt.Println("IsLimitReachedLastFlush after flush 2", lc.IsLimitReachedLastFlush())
Output: IsLimitReachedLastFlush after send events false IsLimitReachedLastFlush after Wakeup (Flush implicit) true IsLimitReachedLastFlush after flush 1 true IsLimitReachedLastFlush after flush 2 false
func (*LazyClient) IsStandBy ¶
func (lc *LazyClient) IsStandBy() bool
IsStandBy returns true if client is in STandBy mode or false in otherwise
func (*LazyClient) OnlyInMemory ¶ added in v1.0.0
func (lc *LazyClient) OnlyInMemory() bool
OnlyInMemory is the implementation of SwitchDevoSender.OnlyInMemory. Allways return true
Example ¶
lc, err := NewLazyClientBuilder(). ClientBuilder( NewClientBuilder(). EntryPoint("udp://localhost:13000")). // udp does not return error Build() if err != nil { panic(err) } fmt.Printf("OnlyInMemory: %v\n", lc.OnlyInMemory())
Output: OnlyInMemory: true
func (*LazyClient) PendingEventsNoConn ¶ added in v1.0.0
func (lc *LazyClient) PendingEventsNoConn() int
PendingEventsNoConn return the number of events that are pending to send and was created when the connection does not was available
Example ¶
lc, err := NewLazyClientBuilder(). ClientBuilder( NewClientBuilder(). EntryPoint("udp://localhost:13000"). // udp does not return error DefaultDevoTag("test.keep.free")). Build() if err != nil { panic(err) } // Pass to standby mode to queue events err = lc.StandBy() if err != nil { panic(err) } // Send events lc.SendWTagAsync("test.keep.free", "event 1") lc.SendWTagAsync("test.keep.free", "event 2") fmt.Println("PendingEventsNoConn after StandBy", lc.PendingEventsNoConn()) // Wake up and err = lc.WakeUp() if err != nil { panic(err) } fmt.Println("PendingEventsNoConn after Wakeup (Flush implicit)", lc.PendingEventsNoConn())
Output: PendingEventsNoConn after StandBy 2 PendingEventsNoConn after Wakeup (Flush implicit) 0
func (*LazyClient) SendAsync ¶
func (lc *LazyClient) SendAsync(m string) string
SendAsync is the same as Client.SendAsync but if the Lazy Client is in stand-by mode then the event is saved in buffer
Example ¶
lc, err := NewLazyClientBuilder(). ClientBuilder( NewClientBuilder().EntryPoint("udp://localhost:13000")). // udp protocol never return error Build() if err != nil { panic(err) } fmt.Println("LazyClient", lc.String()[0:143]) // send messages in connected mode id := lc.SendAsync("message 1") // Empty default tag implies error // Wait until id was processed for lc.IsAsyncActive(id) { time.Sleep(time.Millisecond * 100) } fmt.Printf("AsyncErrors associated with first id: %v\n", lc.AsyncErrors()[id]) lc.SetDefaultTag("test.keep.free") id2 := lc.SendAsync("message 2") fmt.Printf( "Second msg id is equal to first id: %v, len(AsyncErrors): %d, AsyncErrors associated with first id: %v\n", id == id2, len(lc.AsyncErrors()), lc.AsyncErrors()[id], ) fmt.Println("Stats", lc.Stats) fmt.Println("LazyClient (after events)", lc.String()[0:143]) err = lc.Close() if err != nil { panic(err) } fmt.Println("Stats (after close)", lc.Stats) fmt.Println("LazyClient (after close)", lc)
Output: LazyClient bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:130 AsyncErrors associated with first id: tag can not be empty Second msg id is equal to first id: false, len(AsyncErrors): 1, AsyncErrors associated with first id: tag can not be empty Stats AsyncEvents: 2, TotalBuffered: 0, BufferedLost: 0, SendFromBuffer: 0 BufferCount: 0 LazyClient (after events) bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:130 Stats (after close) AsyncEvents: 2, TotalBuffered: 0, BufferedLost: 0, SendFromBuffer: 0 BufferCount: 0 LazyClient (after close) bufferSize: 256000, standByMode: true, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {<nil>}
func (*LazyClient) SendWTagAndCompressorAsync ¶
func (lc *LazyClient) SendWTagAndCompressorAsync(t, m string, c *compressor.Compressor) string
SendWTagAndCompressorAsync is the same as Client.SendWTagAndCompressorAsync but if the Lazy Client is in stand-by mode then the event is saved in buffer
func (*LazyClient) SendWTagAsync ¶
func (lc *LazyClient) SendWTagAsync(t, m string) string
SendWTagAsync is the same as Client.SendWTagAsync but if the Lazy Client is in stand-by mode then the event is saved in buffer
func (*LazyClient) StandBy ¶
func (lc *LazyClient) StandBy() error
StandBy closes connection to Devo and pass the Client to admit Async send events calls only These events will be saved in a buffer. Client will wait for Pending events before to pass to StandBy, if timeout is triggered during this operation, error will be returned and stand by mode will not be active
Example ¶
lc, err := NewLazyClientBuilder(). ClientBuilder( // udp protocol never return error NewClientBuilder().EntryPoint("udp://localhost:13000"), ). BufferSize(2). // very small buffers of two events only EnableStandByModeTimeout(time.Second). // Set to 0 to wait for ever for async events when pass to stand by mode MaxRecordsResendByFlush(10). // Max records can be set too Build() if err != nil { panic(err) } fmt.Println("LazyClient", lc.String()[:141]) // Pass to stand by mode err = lc.StandBy() if err != nil { panic(err) } fmt.Println("IsStandBy", lc.IsStandBy()) // send messages in stand by mode err = lc.SendWTag("test.keep.free", "message 1") // Should return error because client is in standby mode fmt.Println("SendWTag error", err) id := lc.SendWTagAsync("test.keep.free", "message 2") fmt.Println("ID has non-conn- prefix", strings.HasPrefix(id, "non-conn-")) fmt.Println("Stats", lc.Stats) lc.SendWTagAsync("test.keep.free", "message 3") lc.SendWTagAsync("test.keep.free", "message 4") fmt.Println("Stats", lc.Stats) fmt.Println("LazyClient", lc) err = lc.WakeUp() if err != nil { panic(err) } fmt.Println("Stats after WakeUp", lc.Stats) fmt.Println("LazyClient after WakeUp", lc.String()[0:141]) var sender SwitchDevoSender = lc sender.Close() fmt.Println("LazyClient as SwitchDevoSender closed", sender.String())
Output: LazyClient bufferSize: 2, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 1s, Client: {entryPoint: 'udp://localhost:13000' IsStandBy true SendWTag error receiver func call with nil pointer ID has non-conn- prefix true Stats AsyncEvents: 1, TotalBuffered: 1, BufferedLost: 0, SendFromBuffer: 0 BufferCount: 1 Stats AsyncEvents: 3, TotalBuffered: 3, BufferedLost: 1, SendFromBuffer: 0 BufferCount: 2 LazyClient bufferSize: 2, standByMode: true, #eventsInBuffer: 2, flushTimeout: 2s, standByModeTimeout: 1s, Client: {<nil>} Stats after WakeUp AsyncEvents: 3, TotalBuffered: 3, BufferedLost: 1, SendFromBuffer: 2 BufferCount: 0 LazyClient after WakeUp bufferSize: 2, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 1s, Client: {entryPoint: 'udp://localhost:13000' LazyClient as SwitchDevoSender closed bufferSize: 2, standByMode: true, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 1s, Client: {<nil>}
func (*LazyClient) String ¶
func (lc *LazyClient) String() string
func (*LazyClient) WakeUp ¶
func (lc *LazyClient) WakeUp() error
WakeUp leave stand by mode creating new connection to Devo. Any action will be done if Client is not in stand-by mode of if connection is not working. ClientBuilder.IsConnWorkingCheckPayload must be set with a valid value ("\x00") as example to allow check if connection is working.
type LazyClientBuilder ¶
type LazyClientBuilder struct {
// contains filtered or unexported fields
}
LazyClientBuilder is the builder to build LazyClient
Example ¶
lcb := &LazyClientBuilder{} lc, err := lcb. ClientBuilder( NewClientBuilder().EntryPoint("udp://localhost:13000")). BufferSize(256). FlushTimeout(time.Second). Build() lc, err = lcb.Build() // Only print first n characters to easy test with output lcStr := lc.String()[:143] fmt.Println("error", err, "- LazyClient", lcStr, "...")
Output: error <nil> - LazyClient bufferSize: 256, standByMode: false, #eventsInBuffer: 0, flushTimeout: 1s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:13000' ...
Example (InitErrors) ¶
lcb := &LazyClientBuilder{} lc, err := lcb.Build() fmt.Println("1: error", err, "- LazyClient", lc) lcb.ClientBuilder(NewClientBuilder()) lc, err = lcb.Build() fmt.Println("2: error", err, "- LazyClient", lc) lcb.BufferSize(123) lc, err = lcb.Build() fmt.Println("3: error", err, "- LazyClient", lc) lcb.FlushTimeout(time.Second) lc, err = lcb.Build() fmt.Println("4: error", err, "- LazyClient", lc)
Output: 1: error undefined inner client builder - LazyClient <nil> 2: error buffer size less than 1 - LazyClient <nil> 3: error flush timeout empty or negative - LazyClient <nil> 4: error while initialize client: while create new DevoSender (Clear): entrypoint can not be empty - LazyClient <nil>
func NewLazyClientBuilder ¶
func NewLazyClientBuilder() *LazyClientBuilder
NewLazyClientBuilder is the factory method of LazyClientBuilder with defautl values set
Example ¶
lc, err := NewLazyClientBuilder().Build() fmt.Println("1: error", err, "- LazyClient", lc) lc, err = NewLazyClientBuilder(). ClientBuilder( NewClientBuilder().EntryPoint("udp://localhost:13000")). Build() // Only print first n characters to easy test with output lcStr := lc.String()[:146] fmt.Println("2: error", err, "- LazyClient", lcStr, "...")
Output: 1: error undefined inner client builder - LazyClient <nil> 2: error <nil> - LazyClient bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:13000' ...
func (*LazyClientBuilder) AppLogger ¶
func (lcb *LazyClientBuilder) AppLogger(log applogger.SimpleAppLogger) *LazyClientBuilder
AppLogger sets the applogger.SimpleAppLogger used to write log messages
func (*LazyClientBuilder) BufferSize ¶
func (lcb *LazyClientBuilder) BufferSize(s uint32) *LazyClientBuilder
BufferSize sets the buffer size if s is greater than 0
func (*LazyClientBuilder) Build ¶
func (lcb *LazyClientBuilder) Build() (*LazyClient, error)
Build creates new LazyClient instance
func (*LazyClientBuilder) ClientBuilder ¶
func (lcb *LazyClientBuilder) ClientBuilder(cb *ClientBuilder) *LazyClientBuilder
ClientBuilder sets the ClientBuilder used to re-create connections after StandyBy - WakeUp situation
func (*LazyClientBuilder) EnableStandByModeTimeout ¶
func (lcb *LazyClientBuilder) EnableStandByModeTimeout(d time.Duration) *LazyClientBuilder
EnableStandByModeTimeout sets and enable if value is greater than 0, the timeout to wait for pending async events in client when StandBy() func is called
func (*LazyClientBuilder) FlushTimeout ¶
func (lcb *LazyClientBuilder) FlushTimeout(d time.Duration) *LazyClientBuilder
FlushTimeout sets the timeout when wait for pending async envents in client when Flush() func is called. Timeout is set only if parameter is greater than 0
func (*LazyClientBuilder) MaxRecordsResendByFlush ¶ added in v1.0.0
func (lcb *LazyClientBuilder) MaxRecordsResendByFlush(max int) *LazyClientBuilder
MaxRecordsResendByFlush sets the max number of pending events to be resend when Flush events is called. Zero or negative values deactivate the functionallity
type LazyClientStats ¶
type LazyClientStats struct { AsyncEvents uint BufferedLost uint TotalBuffered uint SendFromBuffer uint BufferCount int }
LazyClientStats is the metrics storage for LazyClient
func (LazyClientStats) String ¶
func (lcs LazyClientStats) String() string
type ReliableClient ¶
type ReliableClient struct { *Client // contains filtered or unexported fields }
ReliableClient defines a Client with Reliable capatilities for Async operations only
Example ¶
// Ensure path is clean os.RemoveAll("/tmp/test") rc, err := NewReliableClientBuilder(). StatusBuilder( status.NewNutsDBStatusBuilder().DbPath("/tmp/test")). ClientBuilder( NewClientBuilder(). EntryPoint("udp://example.com:80"), ).Build() fmt.Println("error", err) fmt.Println("rc.GetEntryPoint", rc.GetEntryPoint()) fmt.Println("rc.IsStandBy", rc.IsStandBy()) rc.SendWTagAsync("tag", fmt.Sprintf("async msg at %s", time.Now())) fmt.Printf("rc.Stats %+v\n", rc.Stats()) err = rc.Flush() fmt.Printf("error flush: %+v\n", err) fmt.Printf("rc.Stats after flush %+v\n", rc.Stats()) err = rc.Close() fmt.Printf("error close: %+v\n", err)
Output: error <nil> rc.GetEntryPoint udp://example.com:80 rc.IsStandBy false rc.Stats {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:1 DbMaxFileID:0 DbDataEntries:-1} error flush: <nil> rc.Stats after flush {BufferCount:0 Updated:0 Finished:1 Dropped:0 Evicted:0 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:-1} error close: <nil>
Example (AppLoggerError) ¶
buf := &bytes.Buffer{} lg := &applogger.WriterAppLogger{Writer: buf, Level: applogger.ERROR} // Ensure path is clean os.RemoveAll("/tmp/test") rc, err := NewReliableClientBuilder(). StatusBuilder( status.NewNutsDBStatusBuilder().DbPath("/tmp/test")). ClientBuilder( NewClientBuilder(), ). AppLogger(lg). Build() fmt.Println("rc.IsStandBy()", rc.IsStandBy(), "err", err) // close err = rc.Close() fmt.Println("rc.Close", err) rc.SendAsync("test message") rc.SendWTagAsync("tag", "test message") rc.SendWTagAndCompressorAsync("tag", "test message", &compressor.Compressor{Algorithm: compressor.CompressorGzip}) // We hide the ID to easy check the output logString := buf.String() re := regexp.MustCompile(`-\w{8}-\w{4}-\w{4}-\w{4}-\w{12}`) logString = re.ReplaceAllString(logString, "-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX") fmt.Println("Log:") fmt.Println(logString)
Output: rc.IsStandBy() false err <nil> rc.Close <nil> Log: ERROR Uncontrolled error when create status record in SendAsync, ID: non-conn-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX: while update nutsdb: db is closed ERROR Uncontrolled error when create status record in SendWTagAsync, ID: non-conn-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX: while update nutsdb: db is closed ERROR Uncontrolled error when create status record in SendWTagAndCompressorAsync, ID: non-conn-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX: while update nutsdb: db is closed
Example (As_SwitchDevoSender) ¶
// Ensure path is clean os.RemoveAll("/tmp/test-reliable-client") var sender SwitchDevoSender var err error sender, err = NewReliableClientBuilder(). StatusBuilder( status.NewNutsDBStatusBuilder().DbPath("/tmp/test-reliable-client")). ClientBuilder( NewClientBuilder(). EntryPoint("udp://localhost:13000"), ).Build() fmt.Println("error", err) fmt.Println("SwitchDevoSender", sender.String()[0:61]) fmt.Println("rc.IsStandBy", sender.IsStandBy()) sender.SendWTagAsync("tag", fmt.Sprintf("async msg at %s", time.Now())) rc, ok := sender.(*ReliableClient) if ok { fmt.Printf("rc.Stats %+v\n", rc.Stats()) } else { panic(fmt.Sprintf("%v can not be casted to ReliableClient", sender)) } err = sender.Flush() fmt.Printf("error flush: %+v\n", err) fmt.Printf("rc.Stats after flush %+v\n", rc.Stats()) err = sender.Close() fmt.Printf("error close: %+v\n", err) // Ensure path is clean os.RemoveAll("/tmp/test-reliable-client")
Output: error <nil> SwitchDevoSender Client: {entryPoint: 'udp://localhost:13000', syslogHostname: rc.IsStandBy false rc.Stats {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:1 DbMaxFileID:0 DbDataEntries:-1} error flush: <nil> rc.Stats after flush {BufferCount:0 Updated:0 Finished:1 Dropped:0 Evicted:0 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:-1} error close: <nil>
Example (Dropped) ¶
// Ensure path is clean os.RemoveAll("/tmp/test") rc, err := NewReliableClientBuilder(). StatusBuilder( status.NewNutsDBStatusBuilder(). DbPath("/tmp/test"). BufferSize(2)). ClientBuilder( NewClientBuilder().EntryPoint("udp://example.com:80"), ). Build() if err != nil { panic(err) } // Pass to inactive err = rc.StandBy() fmt.Println("StandBy error", err) // Send data on stand by mode rc.SendWTagAsync("tag", fmt.Sprintf("async msg 1 at %s", time.Now())) rc.SendWTagAsync("tag", fmt.Sprintf("async msg 2 at %s", time.Now())) rc.SendWTagAsync("tag", fmt.Sprintf("async msg 3 at %s", time.Now())) rc.SendWTagAsync("tag", fmt.Sprintf("async msg 4 at %s", time.Now())) rc.SendWTagAsync("tag", fmt.Sprintf("async msg 5 at %s", time.Now())) fmt.Printf("rc.Stats %+v\n", rc.Stats()) err = rc.WakeUp() fmt.Println("WakeUp error", err) fmt.Printf("rc.Stats after Wakeup and wait %+v\n", rc.Stats()) // There should be Count: 2 after flush, because we do not have enough time // to daemon to check that records were propertly resend err = rc.Flush() fmt.Println("Flush error", err) fmt.Printf("rc.Stats after Flush and wait %+v\n", rc.Stats()) rc.Close() fmt.Printf("rc.Stats after closed %+v\n", rc.Stats())
Output: StandBy error <nil> rc.Stats {BufferCount:2 Updated:0 Finished:0 Dropped:3 Evicted:0 DbIdxSize:2 DbMaxFileID:0 DbDataEntries:-1} WakeUp error <nil> rc.Stats after Wakeup and wait {BufferCount:2 Updated:0 Finished:0 Dropped:3 Evicted:0 DbIdxSize:2 DbMaxFileID:0 DbDataEntries:-1} Flush error <nil> rc.Stats after Flush and wait {BufferCount:2 Updated:2 Finished:0 Dropped:3 Evicted:0 DbIdxSize:2 DbMaxFileID:0 DbDataEntries:-1} rc.Stats after closed {BufferCount:0 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:0}
Example (Evicted) ¶
// Ensure path is clean os.RemoveAll("/tmp/test") rc, err := NewReliableClientBuilder(). StatusBuilder( status.NewNutsDBStatusBuilder(). DbPath("/tmp/test"). EventsTTLSeconds(1)). ClientBuilder( NewClientBuilder().EntryPoint("udp://localhost:13000"), ). RetryDaemonInitDelay(time.Minute). // Prevents retry daemon to waste CPU RetryDaemonWaitBtwChecks(time.Millisecond * 100). Build() if err != nil { panic(err) } // Pass to inactive err = rc.StandBy() fmt.Println("StandBy error", err) // Send data on stand by mode rc.SendWTagAsync("tag", fmt.Sprintf("async msg at %s", time.Now())) fmt.Printf("rc.Stats %+v\n", rc.Stats()) // Waiting for ensure event was expired time.Sleep(time.Millisecond * 1100) err = rc.WakeUp() fmt.Println("WakeUp error", err) fmt.Printf("rc.Stats after Wakeup and wait %+v\n", rc.Stats()) err = rc.Flush() fmt.Println("Flush error", err) fmt.Printf("rc.Stats after Flush and wait %+v\n", rc.Stats()) rc.Close() fmt.Printf("rc.Stats after closed %+v\n", rc.Stats())
Output: StandBy error <nil> rc.Stats {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:1 DbMaxFileID:0 DbDataEntries:-1} WakeUp error <nil> rc.Stats after Wakeup and wait {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:1 DbMaxFileID:0 DbDataEntries:-1} Flush error <nil> rc.Stats after Flush and wait {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:1 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:-1} rc.Stats after closed {BufferCount:0 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:0}
Example (ExtendedStats) ¶
// Ensure path is clean os.RemoveAll("/tmp/test") // Extended stats are enabled using environment variable. We add here in code but // you can set in your shell session as well err := os.Setenv("DEVOGO_DEBUG_SENDER_STATS_COUNT_DATA", "yes") if err != nil { panic(err) } rc, err := NewReliableClientBuilder(). StatusBuilder( status.NewNutsDBStatusBuilder().DbPath("/tmp/test")). ClientBuilder( NewClientBuilder(). EntryPoint("udp://example.com:80"), ).Build() fmt.Println("error", err) fmt.Println("rc.GetEntryPoint", rc.GetEntryPoint()) fmt.Println("rc.IsStandBy", rc.IsStandBy()) rc.SendWTagAsync("tag", fmt.Sprintf("async msg at %s", time.Now())) fmt.Printf("rc.Stats %+v\n", rc.Stats()) err = rc.Flush() fmt.Printf("error flush: %+v\n", err) fmt.Printf("rc.Stats after flush %+v\n", rc.Stats()) err = rc.Close() fmt.Printf("error close: %+v\n", err)
Output: error <nil> rc.GetEntryPoint udp://example.com:80 rc.IsStandBy false rc.Stats {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:1 DbMaxFileID:0 DbDataEntries:1} error flush: <nil> rc.Stats after flush {BufferCount:0 Updated:0 Finished:1 Dropped:0 Evicted:0 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:0} error close: <nil>
Example (NilInnerClient) ¶
// Ensure path is clean os.RemoveAll("/tmp/test") rc, err := NewReliableClientBuilder(). StatusBuilder( status.NewNutsDBStatusBuilder().DbPath("/tmp/test")). ClientBuilder( NewClientBuilder(), ). Build() // Call all inherit funcs err = rc.AddReplaceSequences("old", "new") fmt.Println("AddReplaceSequences:", err) rc.SetSyslogHostName("hostname") fmt.Println("Nothing to do when call SetSyslogHostName") err = rc.SetDefaultTag("old") fmt.Println("SetDefaultTag:", err) err = rc.Send("msg") fmt.Println("Send:", err) err = rc.SendWTag("tag", "msg") fmt.Println("SendWTag:", err) s := rc.SendAsync("msg") fmt.Printf("SendAsync returns not empty id: '%v'\n", s != "") s = rc.SendWTagAsync("tag", "msg") fmt.Printf("SendWTagAsync returns not empty id: '%v'\n", s != "") s = rc.SendWTagAndCompressorAsync("tag", "msg", nil) fmt.Printf("SendWTagAndCompressorAsync returns not empty id: '%v'\n", s != "") err = rc.WaitForPendingAsyncMessages() fmt.Println("WaitForPendingAsyncMessages:", err) err = rc.WaitForPendingAsyncMsgsOrTimeout(0) fmt.Println("WaitForPendingAsyncMsgsOrTimeout:", err) m := rc.AsyncErrors() fmt.Println("AsyncErrors:", m) i := rc.AsyncErrorsNumber() fmt.Printf("AsyncErrorsNumber: '%d'\n", i) s = rc.GetEntryPoint() fmt.Printf("GetEntryPoint returns: '%s'\n", s) ss := rc.AsyncIds() fmt.Printf("AsyncIds returns nil: '%v'\n", ss == nil) b := rc.IsAsyncActive("") fmt.Printf("IsAsyncActive returns: %v\n", b) i = rc.AsyncsNumber() fmt.Printf("AsyncsNumber returns: '%d'\n", i) t := rc.LastSendCallTimestamp() fmt.Printf("LastSendCallTimestamp returns: '%v'\n", t) i, err = rc.Write([]byte{}) fmt.Println("Write i:", i, "err:", err) err = rc.Close() fmt.Println("Close:", err) fmt.Printf("rc.Stats after closed %+v\n", rc.Stats())
Output: AddReplaceSequences: receiver func call with nil pointer Nothing to do when call SetSyslogHostName SetDefaultTag: receiver func call with nil pointer Send: receiver func call with nil pointer SendWTag: receiver func call with nil pointer SendAsync returns not empty id: 'true' SendWTagAsync returns not empty id: 'true' SendWTagAndCompressorAsync returns not empty id: 'true' WaitForPendingAsyncMessages: receiver func call with nil pointer WaitForPendingAsyncMsgsOrTimeout: receiver func call with nil pointer AsyncErrors: map[:receiver func call with nil pointer] AsyncErrorsNumber: '0' GetEntryPoint returns: '' AsyncIds returns nil: 'true' IsAsyncActive returns: false AsyncsNumber returns: '0' LastSendCallTimestamp returns: '0001-01-01 00:00:00 +0000 UTC' Write i: 0 err: receiver func call with nil pointer Close: <nil> rc.Stats after closed {BufferCount:0 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:0}
Example (StandbyAndWakeUp) ¶
// Ensure path is clean os.RemoveAll("/tmp/test") rc, err := NewReliableClientBuilder(). StatusBuilder( status.NewNutsDBStatusBuilder().DbPath("/tmp/test")). ClientBuilder( NewClientBuilder().EntryPoint("udp://example.com:80"), ). RetryDaemonWaitBtwChecks(time.Millisecond * 100). RetryDaemonInitDelay(time.Millisecond * 50). Build() if err != nil { panic(err) } // Pass to inactive err = rc.StandBy() fmt.Println("StandBy error", err) // Send data on stand by mode rc.SendWTagAsync("tag", fmt.Sprintf("async msg at %s", time.Now())) fmt.Printf("rc.Stats %+v\n", rc.Stats()) rc.Flush() fmt.Printf("rc.Stats after flush %+v\n", rc.Stats()) err = rc.WakeUp() fmt.Println("WakeUp error", err) // Waiting for ensure damon is retrying time.Sleep(time.Millisecond * 300) fmt.Printf("rc.Stats after Wakeup and wait %+v\n", rc.Stats()) rc.Close() fmt.Printf("rc.Stats after closed %+v\n", rc.Stats())
Output: StandBy error <nil> rc.Stats {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:1 DbMaxFileID:0 DbDataEntries:-1} rc.Stats after flush {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:1 DbMaxFileID:0 DbDataEntries:-1} WakeUp error <nil> rc.Stats after Wakeup and wait {BufferCount:0 Updated:1 Finished:1 Dropped:0 Evicted:0 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:-1} rc.Stats after closed {BufferCount:0 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:0}
Example (WithoutConnection) ¶
// Ensure path is clean os.RemoveAll("/tmp/test") rc, err := NewReliableClientBuilder(). StatusBuilder( status.NewNutsDBStatusBuilder().DbPath("/tmp/test")). ClientBuilder(NewClientBuilder()). Build() fmt.Println("error", err) fmt.Println("rc.Client", rc.Client) fmt.Println("rc.IsStandBy", rc.IsStandBy()) rc.SendWTagAsync("tag", "async msg") fmt.Printf("rc.Stats %+v\n", rc.Stats()) err = rc.Flush() fmt.Printf("error flush: %+v\n", err) fmt.Printf("rc.Stats after flush %+v\n", rc.Stats()) err = rc.Close() fmt.Printf("error close: %+v\n", err) fmt.Printf("rc.Stats after close %+v\n", rc.Stats())
Output: error <nil> rc.Client <nil> rc.IsStandBy false rc.Stats {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:1 DbMaxFileID:0 DbDataEntries:-1} error flush: <nil> rc.Stats after flush {BufferCount:1 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:1 DbMaxFileID:0 DbDataEntries:-1} error close: <nil> rc.Stats after close {BufferCount:0 Updated:0 Finished:0 Dropped:0 Evicted:0 DbIdxSize:0 DbMaxFileID:0 DbDataEntries:0}
func (*ReliableClient) Close ¶
func (dsrc *ReliableClient) Close() error
Close closes current client. This implies operations like shutdown daemons, call Flush func, etc.
func (*ReliableClient) Flush ¶
func (dsrc *ReliableClient) Flush() error
Flush checks all pending messages (sent with Async funcs), waits for pending async messages and update status of all of them. This func can call on demand but it is called by internal retry send events daemon too
func (*ReliableClient) IsConnWorking ¶ added in v1.0.0
func (dsrc *ReliableClient) IsConnWorking() (bool, error)
IsConnWorking is the same as Client.IsConnWorking but check first if client is in stand by mode
func (*ReliableClient) IsLimitReachedLastFlush ¶ added in v1.0.0
func (dsrc *ReliableClient) IsLimitReachedLastFlush() bool
IsLimitReachedLastFlush treturs true if there are pending events after flush because max limit was reached, or false in the other case.
Example ¶
// Ensure path is clean os.RemoveAll("/tmp/test") rc, err := NewReliableClientBuilder(). StatusBuilder( status.NewNutsDBStatusBuilder().DbPath("/tmp/test")). ClientBuilder( NewClientBuilder(). EntryPoint("udp://localhost:13000"), ). MaxRecordsResendByFlush(1). Build() if err != nil { panic(err) } // Pass to standby mode to queue events err = rc.StandBy() if err != nil { panic(err) } // Send events rc.SendWTagAsync("test.keep.free", "event 1") rc.SendWTagAsync("test.keep.free", "event 2") fmt.Println("IsLimitReachedLastFlush after send events", rc.IsLimitReachedLastFlush()) // Wake up and err = rc.WakeUp() if err != nil { panic(err) } fmt.Println("IsLimitReachedLastFlush after Wakeup (Flush is not implicit)", rc.IsLimitReachedLastFlush()) //Flush err = rc.Flush() if err != nil { panic(err) } fmt.Println("IsLimitReachedLastFlush after flush", rc.IsLimitReachedLastFlush())
Output: IsLimitReachedLastFlush after send events false IsLimitReachedLastFlush after Wakeup (Flush is not implicit) false IsLimitReachedLastFlush after flush true
func (*ReliableClient) IsStandBy ¶
func (dsrc *ReliableClient) IsStandBy() bool
IsStandBy retursn true when client is in StandBy() mode
func (*ReliableClient) OnlyInMemory ¶ added in v1.0.0
func (dsrc *ReliableClient) OnlyInMemory() bool
OnlyInMemory is the implementation of SwitchDevoSender.OnlyInMemory. Allways return true
Example ¶
// Ensure path is clean os.RemoveAll("/tmp/test") rc, err := NewReliableClientBuilder(). StatusBuilder( status.NewNutsDBStatusBuilder().DbPath("/tmp/test")). ClientBuilder( NewClientBuilder(). EntryPoint("udp://localhost:13000"), ). Build() if err != nil { panic(err) } fmt.Printf("OnlyInMemory: %v\n", rc.OnlyInMemory()) // Ensure path is clean os.RemoveAll("/tmp/test")
Output: OnlyInMemory: false
func (*ReliableClient) PendingEventsNoConn ¶ added in v1.0.0
func (dsrc *ReliableClient) PendingEventsNoConn() int
PendingEventsNoConn return the number of events that are pending to send and was created when the connection does not was available. -1 is returned when value could not be solved
Example ¶
// Ensure path is clean os.RemoveAll("/tmp/test") rc, err := NewReliableClientBuilder(). StatusBuilder( status.NewNutsDBStatusBuilder().DbPath("/tmp/test")). ClientBuilder( NewClientBuilder(). EntryPoint("udp://localhost:13000"), ). RetryDaemonInitDelay(50 * time.Millisecond). RetryDaemonInitDelay(75 * time.Millisecond). // Prevents retry daemon send events Build() if err != nil { panic(err) } // Pass to standby mode to queue events err = rc.StandBy() if err != nil { panic(err) } // Send events rc.SendWTagAsync("test.keep.free", "event 1") rc.SendWTagAsync("test.keep.free", "event 2") fmt.Println("PendingEventsNoConn after StandBy", rc.PendingEventsNoConn()) // Wake up and err = rc.WakeUp() if err != nil { panic(err) } time.Sleep(time.Millisecond * 100) // wait time to get resend daemon works fmt.Println("PendingEventsNoConn after Wakeup (Resend daemon flushs data)", rc.PendingEventsNoConn()) err = rc.Close() if err != nil { panic(err) } fmt.Println("PendingEventsNoConn after Close (-1 == error)", rc.PendingEventsNoConn()) // Ensure path is clean os.RemoveAll("/tmp/test")
Output: PendingEventsNoConn after StandBy 2 PendingEventsNoConn after Wakeup (Resend daemon flushs data) 0 PendingEventsNoConn after Close (-1 == error) -1
func (*ReliableClient) SendAsync ¶
func (dsrc *ReliableClient) SendAsync(m string) string
SendAsync sends Async message in same way like Client.SendAsync but saving the message in status until can ensure, at certain level of confiance, that it was sent
func (*ReliableClient) SendWTagAndCompressorAsync ¶
func (dsrc *ReliableClient) SendWTagAndCompressorAsync(t string, m string, c *compressor.Compressor) string
SendWTagAndCompressorAsync sends Async message in same way like Client.SendWTagAndCompressorAsync but saving the message,tag and Compressor in status until can ensure, at certain level of confiance, that it was sent
func (*ReliableClient) SendWTagAsync ¶
func (dsrc *ReliableClient) SendWTagAsync(t, m string) string
SendWTagAsync sends Async message in same way like Client.SendWTagAsync but saving the message and tag in status until can ensure, at certain level of confiance, that it was sent
func (*ReliableClient) StandBy ¶
func (dsrc *ReliableClient) StandBy() error
StandBy put current client in stand by mode closing active connection and saving new incoming events from Async operations in status. Note that after call StandBy, Send Sync operations will return errors.
func (*ReliableClient) Stats ¶
func (dsrc *ReliableClient) Stats() status.Stats
Stats returns the curren stats (session + persisted). Erros when load stas are ignored DbDataEntries and DbKeysSize will be filled only if DEVOGO_DEBUG_SENDER_STATS_COUNT_DATA environment variable is set with "yes" value
func (*ReliableClient) String ¶
func (dsrc *ReliableClient) String() string
func (*ReliableClient) WakeUp ¶
func (dsrc *ReliableClient) WakeUp() error
WakeUp is the inverse oeration to call StandBy
type ReliableClientBuilder ¶
type ReliableClientBuilder struct {
// contains filtered or unexported fields
}
ReliableClientBuilder defines the Builder for build ReliableClient
Example (InitErrors) ¶
rc, err := NewReliableClientBuilder().Build() fmt.Println("1 error", err) fmt.Println("1 rc", rc) // Ensure path is clean os.RemoveAll("/tmp/test") rc, err = NewReliableClientBuilder(). StatusBuilder( status.NewNutsDBStatusBuilder().DbPath("/tmp/test")). Build() fmt.Println("2 error", err) fmt.Println("2 rc", rc) // No path permissions os.RemoveAll("/this-is-not-valid-path") rc, err = NewReliableClientBuilder(). StatusBuilder( status.NewNutsDBStatusBuilder().DbPath( "/this-is-not-valid-path")). ClientBuilder(NewClientBuilder()). Build() fmt.Println("3 error", errors.Unwrap(err)) // Unwrapped to decrese the verbose of the output fmt.Println("3 rc", rc)
Output: 1 error undefined status builder 1 rc <nil> 2 error undefined inner client builder 2 rc <nil> 3 error while open nutsdb: mkdir /this-is-not-valid-path: permission denied 3 rc <nil>
func NewReliableClientBuilder ¶
func NewReliableClientBuilder() *ReliableClientBuilder
NewReliableClientBuilder return ReliableClientBuilder with intialized to default values
func (*ReliableClientBuilder) AppLogger ¶
func (dsrcb *ReliableClientBuilder) AppLogger(lg applogger.SimpleAppLogger) *ReliableClientBuilder
AppLogger sets the AppLogger used to send logger messages in case that errors can not be returned. Debug traces can be saved usint this logger too.
func (*ReliableClientBuilder) Build ¶
func (dsrcb *ReliableClientBuilder) Build() (*ReliableClient, error)
Build builds the ReliableClient based in current parameters.
func (*ReliableClientBuilder) ClientBuilder ¶
func (dsrcb *ReliableClientBuilder) ClientBuilder(cb *ClientBuilder) *ReliableClientBuilder
ClientBuilder sets the ClientBuilder needed to build the underhood client. This is required to initial setup and it is used by reconnect daemon too. If IsConnWorkingCheckPayload is not defined in cb ClientBuilder then \x00 VALUE WILL BE fixed to ensure that ClientReconn daemon can recreate connection after any outage
func (*ReliableClientBuilder) ClientReconnDaemonInitDelay ¶
func (dsrcb *ReliableClientBuilder) ClientReconnDaemonInitDelay(d time.Duration) *ReliableClientBuilder
ClientReconnDaemonInitDelay sets the initial time delay when reconnect daemon is started value is set only if d value is greater than 0
func (*ReliableClientBuilder) ClientReconnDaemonWaitBtwChecks ¶
func (dsrcb *ReliableClientBuilder) ClientReconnDaemonWaitBtwChecks(d time.Duration) *ReliableClientBuilder
ClientReconnDaemonWaitBtwChecks sets the time wait interval between checks for reconnect daemon value is set only if d value is greater than 0
func (*ReliableClientBuilder) DaemonStopTimeout ¶
func (dsrcb *ReliableClientBuilder) DaemonStopTimeout(d time.Duration) *ReliableClientBuilder
DaemonStopTimeout sets the timeout to wait for each daemon when ReliableClient is closed value is set only if d value is greater than 0
func (*ReliableClientBuilder) EnableStandByModeTimeout ¶
func (dsrcb *ReliableClientBuilder) EnableStandByModeTimeout(d time.Duration) *ReliableClientBuilder
EnableStandByModeTimeout sets and enable if value is greter than 0, the timeout to wait for pending async events in client when StandBy() func is called
func (*ReliableClientBuilder) FlushTimeout ¶
func (dsrcb *ReliableClientBuilder) FlushTimeout(d time.Duration) *ReliableClientBuilder
FlushTimeout sets the timeout when wait for pending async envents in clien when Flush() func is called
func (*ReliableClientBuilder) HouseKeepingDaemonInitDelay ¶ added in v1.0.0
func (dsrcb *ReliableClientBuilder) HouseKeepingDaemonInitDelay(d time.Duration) *ReliableClientBuilder
HouseKeepingDaemonInitDelay sets the initial time delay to wait before HouseKeepingDaemon launchs the first execution. Value is set only if d value is greater than 0
func (*ReliableClientBuilder) HouseKeepingDaemonWaitBtwChecks ¶ added in v1.0.0
func (dsrcb *ReliableClientBuilder) HouseKeepingDaemonWaitBtwChecks(d time.Duration) *ReliableClientBuilder
HouseKeepingDaemonWaitBtwChecks sets the interval time waited by daemon between executions of stattus HouseKeeping operations. Value is set only if d value is greater than 0
func (*ReliableClientBuilder) MaxRecordsResendByFlush ¶ added in v1.0.0
func (dsrcb *ReliableClientBuilder) MaxRecordsResendByFlush(max int) *ReliableClientBuilder
MaxRecordsResendByFlush sets the max number of pending events to be resend when Flush events is called. Zero or negative values deactivate the functionallity
func (*ReliableClientBuilder) RetryDaemonInitDelay ¶
func (dsrcb *ReliableClientBuilder) RetryDaemonInitDelay(d time.Duration) *ReliableClientBuilder
RetryDaemonInitDelay sets the initial time delay when retry send events daemon is started value is set only if d value is greater than 0
func (*ReliableClientBuilder) RetryDaemonWaitBtwChecks ¶
func (dsrcb *ReliableClientBuilder) RetryDaemonWaitBtwChecks(d time.Duration) *ReliableClientBuilder
RetryDaemonWaitBtwChecks sets the time wait interval between checks for retry send events daemon value is set only if d value is greater than 0
func (*ReliableClientBuilder) StatusBuilder ¶ added in v1.0.0
func (dsrcb *ReliableClientBuilder) StatusBuilder(sb status.Builder) *ReliableClientBuilder
StatusBuilder sets the required status builder engine to save load ans manage events in a reliable way
type SwitchDevoSender ¶
type SwitchDevoSender interface { io.Closer String() string SendAsync(m string) string SendWTagAsync(t, m string) string SendWTagAndCompressorAsync(t string, m string, c *compressor.Compressor) string WaitForPendingAsyncMsgsOrTimeout(timeout time.Duration) error AsyncErrors() map[string]error AsyncErrorsNumber() int AsyncIds() []string AreAsyncOps() bool IsAsyncActive(id string) bool Flush() error IsLimitReachedLastFlush() bool PendingEventsNoConn() int StandBy() error WakeUp() error IsStandBy() bool LastSendCallTimestamp() time.Time OnlyInMemory() bool }
SwitchDevoSender represents a Client that can be paused. That is that can close connection to Devo and save in a buffer the events that are recieved. When client is waked up pending events are send to Devo.