consumer

package
v0.0.0-...-7b65136 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 23, 2020 License: MIT Imports: 19 Imported by: 0

README

EvnetLog Consumer

CMD tool

go get github.com/techxmind/logserver/consumer/cmd/logconsumer

logconsumer --help

Usage:
  -addrs string
    	Kafka broker addresses, multiple values are comma separated (default "127.0.0.1:9092")
  -group_id string
    	Kafka consumer group id (default "default")
  -kafka_version string
    	Kafka version
  -log-level value
    	minimum enabled logging level. debug|info|warn|error|dpanic|panic|fatal
  -sink.input_buffer_size int
    	Sink input buffer size (default 100)
  -sink.marshaler string
    	Output marshaler, json|csv (default "json")
  -sink.marshaler_args string
    	Output marshaler args, csv marshaler requires headers definitions. Header names are comma separated
  -sink.output_buffer_size int
    	Sink output buffer size, 0 means no output buffer (default 4096)
  -sink.target string
    	Output target, stdout|file (default "stdout")
  -sink.target_args string
    	Output target args, file target requires filename specified. RollingFile format: filename:maxsize[:maxage:suffix]
  -topics string
    	Topics to consume, multiple values are comma separated (default "event-log")

CUSTOM

import (
	"context"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/techxmind/logserver/consumer"
)

func main() {
	flag.Parse()

	ctx := context.Background()

    cfg := &consumer.Config{
		Addrs:        "localhost:9092",
		KafkaVersion: "2.6.0",
		GroupID:      "test",
		Topics:       "event_log",
		Sink: &consumer.SinkConfig{
			Marshaler:  "json",
			Target:     "file",
			TargetArgs: "event_log.json:1024000:300", //rollingfile filename:maxsize:maxage
		},
	}

	consumer, err := consumer.New(ctx, cfg)
	if err != nil {
		fmt.Printf("consumer.New err:%s\n", err)
		return
	}

	go func() {
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
		<-c
		consumer.Close()
	}()

	consumer.Start()
}

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetSinkTarget

func GetSinkTarget(target, args string) (io.Writer, error)

func JSONMarshaler

func JSONMarshaler(msg *SinkMessage) ([]byte, error)

JSONMarshaler marshal message to json data

func RegisterMarshaler

func RegisterMarshaler(marshaler string, factory func(string) (Marshaler, error))

func RegisterSinkTarget

func RegisterSinkTarget(target string, factory func(string) (io.Writer, error))

Types

type Ack

type Ack struct {
	Session   sarama.ConsumerGroupSession
	Topic     string
	Partition int32
	Offset    int64
	// contains filtered or unexported fields
}

func (*Ack) Chain

func (ack *Ack) Chain(dst SinkAck) SinkAck

Chain merge ack coming later It's not thread-safe

func (*Ack) MarkOffset

func (ack *Ack) MarkOffset()

type CSVMarshaler

type CSVMarshaler struct {
	// contains filtered or unexported fields
}

CSVMarshaler marshal message to csv text

func NewCSVMarshaler

func NewCSVMarshaler(originalHeaders []string) (*CSVMarshaler, error)

func (*CSVMarshaler) Marshal

func (d *CSVMarshaler) Marshal(msg *SinkMessage) ([]byte, error)

type Config

type Config struct {
	GroupID      string      `json:"group_id"`
	KafkaVersion string      `json:"kafka_version"`
	Addrs        string      `json:"addrs"`
	Topics       string      `json:"topics"`
	Offset       string      `json:"offset"`
	Sink         *SinkConfig `json:"sink"`
}
var DefaultConfig *Config

func (*Config) GetAddrs

func (cfg *Config) GetAddrs() []string

func (*Config) GetKafkaVersion

func (cfg *Config) GetKafkaVersion() sarama.KafkaVersion

func (*Config) GetTopics

func (cfg *Config) GetTopics() []string

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func New

func New(pctx context.Context, cfg *Config) (*Consumer, error)
Example
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/techxmind/logserver/consumer"
)

func main() {
	ctx := context.Background()
	cfg := &consumer.Config{
		Addrs:        "localhost:9092",
		KafkaVersion: "2.6.0",
		GroupID:      "test",
		Topics:       "event_log",
		Sink: &consumer.SinkConfig{
			Marshaler:  "json",
			Target:     "file",
			TargetArgs: "event_log.json:1024000:300", //rollingfile filename:maxsize:maxage
		},
	}
	consumer, err := consumer.New(ctx, cfg)
	if err != nil {
		fmt.Printf("consumer.New err:%s\n", err)
		return
	}

	go func() {
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
		<-c
		consumer.Close()
	}()

	consumer.Start()
}
Output:

func (*Consumer) Cleanup

func (c *Consumer) Cleanup(s sarama.ConsumerGroupSession) error

Cleanupimplements sarama.ConsumerGroupHandler

func (*Consumer) Close

func (c *Consumer) Close()

func (*Consumer) ConsumeClaim

func (c *Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim implements sarama.ConsumerGroupHandler

func (*Consumer) Setup

Setup implements sarama.ConsumerGroupHandler

func (*Consumer) Start

func (c *Consumer) Start()

type Marshaler

type Marshaler interface {
	Marshal(*SinkMessage) ([]byte, error)
}

Marshaler marshal message

func GetMarshaler

func GetMarshaler(marshaler, args string) (Marshaler, error)

type MarshalerFunc

type MarshalerFunc func(*SinkMessage) ([]byte, error)

func (MarshalerFunc) Marshal

func (f MarshalerFunc) Marshal(msg *SinkMessage) ([]byte, error)

type Option

type Option interface {
	// contains filtered or unexported methods
}

func WithInputBufferSize

func WithInputBufferSize(size int) Option

func WithOutputBufferSize

func WithOutputBufferSize(size int) Option

type Sink

type Sink interface {
	// Sink errors
	Errors() <-chan error

	// Channel to send *SinkMessage
	Input() chan<- *SinkMessage

	// Ack notify the last successfully processed message
	Ack() <-chan SinkAck

	// Must eventually be called to ensure
	// that any buffered data is written to the underlying io.Writer
	Close() error
}

Sink defines where the EventLog data goes

func NewSink

func NewSink(w io.Writer, m Marshaler, optList ...Option) Sink

NewSink returns new Sink

type SinkAck

type SinkAck interface {
	Chain(SinkAck) SinkAck
}

type SinkConfig

type SinkConfig struct {
	Marshaler        string `json:"marshaler"`
	MarshalerArgs    string `json:"marshaler_args"`
	Target           string `json:"target"`
	TargetArgs       string `json:"target_args"`
	OutputBufferSize int    `json:"output_buffer_size"`
	InputBufferSize  int    `json:"input_buffer_size"`
}

type SinkMessage

type SinkMessage struct {
	Ack   SinkAck
	Topic string
	Event *pb.EventLog
}

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL