Documentation ¶
Index ¶
- func GetSinkTarget(target, args string) (io.Writer, error)
- func JSONMarshaler(msg *SinkMessage) ([]byte, error)
- func RegisterMarshaler(marshaler string, factory func(string) (Marshaler, error))
- func RegisterSinkTarget(target string, factory func(string) (io.Writer, error))
- type Ack
- type CSVMarshaler
- type Config
- type Consumer
- type Marshaler
- type MarshalerFunc
- type Option
- type Sink
- type SinkAck
- type SinkConfig
- type SinkMessage
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func JSONMarshaler ¶
func JSONMarshaler(msg *SinkMessage) ([]byte, error)
JSONMarshaler marshal message to json data
func RegisterMarshaler ¶
Types ¶
type Ack ¶
type Ack struct { Session sarama.ConsumerGroupSession Topic string Partition int32 Offset int64 // contains filtered or unexported fields }
func (*Ack) MarkOffset ¶
func (ack *Ack) MarkOffset()
type CSVMarshaler ¶
type CSVMarshaler struct {
// contains filtered or unexported fields
}
CSVMarshaler marshal message to csv text
func NewCSVMarshaler ¶
func NewCSVMarshaler(originalHeaders []string) (*CSVMarshaler, error)
func (*CSVMarshaler) Marshal ¶
func (d *CSVMarshaler) Marshal(msg *SinkMessage) ([]byte, error)
type Config ¶
type Config struct { GroupID string `json:"group_id"` KafkaVersion string `json:"kafka_version"` Addrs string `json:"addrs"` Topics string `json:"topics"` Offset string `json:"offset"` Sink *SinkConfig `json:"sink"` }
var DefaultConfig *Config
func (*Config) GetKafkaVersion ¶
func (cfg *Config) GetKafkaVersion() sarama.KafkaVersion
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func New ¶
Example ¶
package main import ( "context" "fmt" "os" "os/signal" "syscall" "github.com/techxmind/logserver/consumer" ) func main() { ctx := context.Background() cfg := &consumer.Config{ Addrs: "localhost:9092", KafkaVersion: "2.6.0", GroupID: "test", Topics: "event_log", Sink: &consumer.SinkConfig{ Marshaler: "json", Target: "file", TargetArgs: "event_log.json:1024000:300", //rollingfile filename:maxsize:maxage }, } consumer, err := consumer.New(ctx, cfg) if err != nil { fmt.Printf("consumer.New err:%s\n", err) return } go func() { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) <-c consumer.Close() }() consumer.Start() }
Output:
func (*Consumer) Cleanup ¶
func (c *Consumer) Cleanup(s sarama.ConsumerGroupSession) error
Cleanupimplements sarama.ConsumerGroupHandler
func (*Consumer) ConsumeClaim ¶
func (c *Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim implements sarama.ConsumerGroupHandler
type Marshaler ¶
type Marshaler interface {
Marshal(*SinkMessage) ([]byte, error)
}
Marshaler marshal message
func GetMarshaler ¶
type MarshalerFunc ¶
type MarshalerFunc func(*SinkMessage) ([]byte, error)
func (MarshalerFunc) Marshal ¶
func (f MarshalerFunc) Marshal(msg *SinkMessage) ([]byte, error)
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
func WithInputBufferSize ¶
func WithOutputBufferSize ¶
type Sink ¶
type Sink interface { // Sink errors Errors() <-chan error // Channel to send *SinkMessage Input() chan<- *SinkMessage // Ack notify the last successfully processed message Ack() <-chan SinkAck // Must eventually be called to ensure // that any buffered data is written to the underlying io.Writer Close() error }
Sink defines where the EventLog data goes
type SinkConfig ¶
Click to show internal directories.
Click to hide internal directories.