qstreamer

v0.2.1 Latest
This package is not in the latest version of its module.

Published: Jul 25, 2024 License: MIT Imports: 6 Imported by: 0

README

Queue Streamer

Queue Streamer is a Go package that processes and transfers data between Kafka topics with exactly-once delivery guarantees. This package receives messages from Kafka brokers and transfers them to specified topics. This document explains how to install and use Queue Streamer.

Installation

To install Queue Streamer, use Go modules:

go get github.com/violetpay-org/queue-streamer

Usage

The following example shows how to use Queue Streamer.

Example
package main

import (
	"sync"

	qstreamer "github.com/violetpay-org/queue-streamer"
)

func main() {
	wg := &sync.WaitGroup{}

	brokers := []string{"localhost:9092"}

	// Topic name and partition
	origin := qstreamer.Topic("origin-topic", 3)

	// Create a topic streamer from the brokers, the origin topic,
	// and a consumer group id ("example-group" is a placeholder).
	streamer := qstreamer.NewTopicStreamer(brokers, origin, "example-group")

	// Serializer that converts a consumed message into the message to be produced.
	// Here the message is not converted, so a pass-through serializer is used.
	serializer := qstreamer.NewPassThroughSerializer()

	// Destination topic and partition
	destination1 := qstreamer.Topic("destination-topic-1", 5)

	cfg := qstreamer.NewStreamConfig(serializer, destination1)
	streamer.AddConfig(cfg)

	// Run the streamer in the background and keep main alive while it runs.
	wg.Add(1)
	go func() {
		defer wg.Done()
		streamer.Run()
	}()
	defer streamer.Stop()

	wg.Wait()
}
Explanation
  1. Set Topics: Use the Topic() function to define a topic by its name and partition. The example starts with the origin topic.

  2. Create Streamer: Create a new streamer with the NewTopicStreamer() function. This function takes the Kafka brokers, the origin topic, and a consumer group id as arguments.

  3. Set Serializer: Create a new serializer with the NewPassThroughSerializer() function. A serializer converts a consumed message into the message to be produced. In this case, the message is not converted, so it is a pass-through serializer.

  4. Set Destination Topic: Use the Topic() function to set the destination topic and partition.

  5. Set Configuration: Create a new configuration with the NewStreamConfig() function. This function takes the serializer and the destination topic as arguments. Add the configuration to the streamer with the AddConfig() function.

  6. Run Streamer: Run the streamer with the Run() function. This function starts the streamer and processes the messages.

Contribution

Contributions are welcome! You can contribute to the project by reporting bugs, requesting features, and submitting pull requests.

License

Queue Streamer is distributed under the MIT License. See the LICENSE file for more details.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Topic added in v0.1.1

func Topic(name string, partition int32) common.Topic

Types

type DirectStreamer added in v0.2.0

type DirectStreamer struct {
	// contains filtered or unexported fields
}

DirectStreamer is a streamer that streams messages from a topic to a topic.

func NewDirectStreamer added in v0.2.0

func NewDirectStreamer(brokers []string, src common.Topic, groupId string, args ...interface{}) *DirectStreamer

NewDirectStreamer creates a new direct streamer that streams messages from one topic to another topic. The streamer is configured with a list of brokers, a topic to stream from, and a consumer group id. To override the default configuration of the sarama consumer and producer, pass them as additional arguments:

  • ds := NewDirectStreamer(brokers, topic, groupId)
  • ds := NewDirectStreamer(brokers, topic, groupId, consumerConfig, producerConfig)
  • ds := NewDirectStreamer(brokers, topic, groupId, nil, producerConfig)
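
The three call forms above rely on optional positional arguments passed through `args ...interface{}`. The sketch below illustrates how such a calling convention can be interpreted in general; it is not the package's implementation. `Config` and `resolveConfigs` are hypothetical names, and the default values are placeholders (the real package works with sarama configuration values).

```go
package main

import "fmt"

// Config is a hypothetical stand-in for a client configuration.
type Config struct {
	Name string
}

// resolveConfigs interprets optional positional overrides:
// args[0] is the consumer config and args[1] is the producer config.
// A missing or nil entry keeps the default.
func resolveConfigs(args ...interface{}) (consumer, producer *Config) {
	consumer = &Config{Name: "default-consumer"}
	producer = &Config{Name: "default-producer"}

	if len(args) > 0 {
		// The comma-ok form is safe for an untyped nil argument.
		if c, ok := args[0].(*Config); ok && c != nil {
			consumer = c
		}
	}
	if len(args) > 1 {
		if p, ok := args[1].(*Config); ok && p != nil {
			producer = p
		}
	}
	return consumer, producer
}

func main() {
	custom := &Config{Name: "custom-producer"}

	c, p := resolveConfigs()
	fmt.Println(c.Name, p.Name)

	// Passing nil first keeps the default consumer config.
	c, p = resolveConfigs(nil, custom)
	fmt.Println(c.Name, p.Name)
}
```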

func (*DirectStreamer) Config added in v0.2.0

func (ds *DirectStreamer) Config() (bool, StreamConfig)

func (*DirectStreamer) Consumer added in v0.2.0

func (ds *DirectStreamer) Consumer() *internal.StreamConsumer

func (*DirectStreamer) GroupId added in v0.2.0

func (ds *DirectStreamer) GroupId() string

func (*DirectStreamer) Run added in v0.2.0

func (ds *DirectStreamer) Run()

func (*DirectStreamer) SetConfig added in v0.2.0

func (ds *DirectStreamer) SetConfig(config StreamConfig)

func (*DirectStreamer) Stop added in v0.2.0

func (ds *DirectStreamer) Stop() error

func (*DirectStreamer) Topic added in v0.2.0

func (ds *DirectStreamer) Topic() common.Topic

type PassThroughSerializer

type PassThroughSerializer struct {
}

func NewPassThroughSerializer

func NewPassThroughSerializer() *PassThroughSerializer

func (*PassThroughSerializer) MessageToProduceMessage

func (ts *PassThroughSerializer) MessageToProduceMessage(value string) string
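
As an illustration of what a serializer does, the sketch below places a pass-through serializer next to one that transforms the value. It is self-contained and does not import the package. The `messageSerializer` interface is a local, hypothetical stand-in: it assumes that common.MessageSerializer requires only the MessageToProduceMessage method shown above, which this page does not confirm. `passThrough` and `upperCase` are illustrative names.

```go
package main

import (
	"fmt"
	"strings"
)

// messageSerializer is a hypothetical local interface. It assumes the
// package's serializer contract consists of this single method.
type messageSerializer interface {
	MessageToProduceMessage(value string) string
}

// passThrough returns the consumed value unchanged.
type passThrough struct{}

func (passThrough) MessageToProduceMessage(value string) string {
	return value
}

// upperCase is an illustrative custom serializer that transforms the value.
type upperCase struct{}

func (upperCase) MessageToProduceMessage(value string) string {
	return strings.ToUpper(value)
}

func main() {
	serializers := []messageSerializer{passThrough{}, upperCase{}}
	for _, s := range serializers {
		fmt.Println(s.MessageToProduceMessage("order-created"))
	}
}
```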

type StreamConfig

type StreamConfig struct {
	// contains filtered or unexported fields
}

func NewStreamConfig

func NewStreamConfig(ms common.MessageSerializer, topic common.Topic) StreamConfig

func (StreamConfig) MessageSerializer

func (ss StreamConfig) MessageSerializer() common.MessageSerializer

func (StreamConfig) Topic

func (ss StreamConfig) Topic() common.Topic

type TopicStreamer

type TopicStreamer struct {
	// contains filtered or unexported fields
}

TopicStreamer is a streamer that streams messages from a topic to other topics.

func NewTopicStreamer

func NewTopicStreamer(brokers []string, topic common.Topic, groupId string, args ...interface{}) *TopicStreamer

NewTopicStreamer creates a new topic streamer that streams messages from a topic to other topics. The streamer is configured with a list of brokers, a topic to stream from, and a consumer group id. To override the default configuration of the sarama consumer and producer, pass them as additional arguments:

  • ts := NewTopicStreamer(brokers, topic, groupId)
  • ts := NewTopicStreamer(brokers, topic, groupId, consumerConfig, producerConfig)
  • ts := NewTopicStreamer(brokers, topic, groupId, nil, producerConfig)

func (*TopicStreamer) AddConfig

func (ts *TopicStreamer) AddConfig(config StreamConfig)

func (*TopicStreamer) Configs added in v0.1.1

func (ts *TopicStreamer) Configs() []StreamConfig
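
Each StreamConfig pairs a serializer with a destination topic, so a TopicStreamer holding several configs fans one consumed message out to several destinations. The sketch below is a conceptual model of that fan-out only; it says nothing about how the package implements it or about its transactional behavior. All types and the `fanOut` function are hypothetical stand-ins, and "destination-topic-2" is a made-up topic name.

```go
package main

import "fmt"

// topic is a hypothetical stand-in for the package's topic type.
type topic struct {
	name      string
	partition int32
}

// streamConfig is a hypothetical stand-in that pairs a serializer
// function with a destination topic.
type streamConfig struct {
	serialize func(string) string
	dest      topic
}

// produced records one message that would be sent to a destination.
type produced struct {
	topic string
	value string
}

// fanOut applies every config to one consumed value and returns
// the messages that would be produced, one per destination.
func fanOut(value string, configs []streamConfig) []produced {
	out := make([]produced, 0, len(configs))
	for _, cfg := range configs {
		out = append(out, produced{
			topic: cfg.dest.name,
			value: cfg.serialize(value),
		})
	}
	return out
}

func main() {
	identity := func(s string) string { return s }

	configs := []streamConfig{
		{serialize: identity, dest: topic{name: "destination-topic-1", partition: 5}},
		{serialize: identity, dest: topic{name: "destination-topic-2", partition: 3}},
	}

	for _, m := range fanOut("hello", configs) {
		fmt.Printf("%s <- %s\n", m.topic, m.value)
	}
}
```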

func (*TopicStreamer) Consumer added in v0.1.1

func (ts *TopicStreamer) Consumer() *internal.StreamConsumer

func (*TopicStreamer) GroupId added in v0.2.0

func (ts *TopicStreamer) GroupId() string

func (*TopicStreamer) Run

func (ts *TopicStreamer) Run()

func (*TopicStreamer) Stop

func (ts *TopicStreamer) Stop() error

func (*TopicStreamer) Topic added in v0.1.1

func (ts *TopicStreamer) Topic() common.Topic
