go-queue
Kafka, Beanstalkd, Pulsar Pub/Sub framework. Reference: https://github.com/zeromicro/go-queue
installation
go get -u github.com/chenquan/go-queue
beanstalkd
High available beanstalkd.
consumer example
config.yaml
Name: beanstalkd
Telemetry:
Name: beanstalkd
Endpoint: http://localhost:14268/api/traces
Sampler: 1.0
Natcher: jaeger
package main
import (
"context"
"github.com/chenquan/go-queue/beanstalkd"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/service"
"github.com/zeromicro/go-zero/core/stores/redis"
)
func main() {
var c service.ServiceConf
conf.MustLoad("config.yaml", &c)
c.MustSetUp()
consumer := beanstalkd.NewConsumer(beanstalkd.Conf{
Beanstalkd: beanstalkd.Beanstalkd{
Endpoints: []string{
"localhost:11300",
"localhost:11300",
},
Tube: "tube",
},
Redis: redis.RedisConf{
Host: "localhost:6379",
Type: redis.NodeType,
},
}, beanstalkd.WithHandle(func(ctx context.Context, body []byte) {
logx.WithContext(ctx).Info(string(body))
}))
defer consumer.Stop()
consumer.Start()
}
producer example
package main
import (
"context"
"fmt"
"github.com/chenquan/go-queue/beanstalkd"
"strconv"
"time"
)
func main() {
producer := beanstalkd.NewProducer(
beanstalkd.Beanstalkd{
Tube: "tube",
Endpoints: []string{
"localhost:11300",
"127.0.0.1:11300",
},
},
)
for i := 1; i < 1005; i++ {
//_, err := producer.Delay(context.Background(), []byte(strconv.Itoa(i)), time.Second*5)
//if err != nil {
// fmt.Println(err)
//}
_, err := producer.Push(context.Background(), nil, []byte(strconv.Itoa(i)), beanstalkd.WithDuration(time.Second*5))
if err != nil {
fmt.Println(err)
}
}
}
kafka
Kafka Pub/Sub framework
consumer example
config.yaml
Name: kafka
Brokers:
- 127.0.0.1:19092
- 127.0.0.1:19092
- 127.0.0.1:19092
Group: kafka
Topic: kafka
Offset: first
Consumers: 1
Telemetry:
Name: kq
Endpoint: http://localhost:14268/api/traces
Sampler: 1.0
Natcher: jaeger
example code
package main
import (
"context"
"fmt"
"github.com/chenquan/go-queue/kafka"
"github.com/chenquan/go-queue/queue"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/service"
"github.com/zeromicro/go-zero/core/conf"
)
func main() {
var c struct {
kafka.Conf
service.ServiceConf
}
conf.MustLoad("config.yaml", &c)
c.MustSetUp()
q := kafka.MustNewQueue(c.Conf, queue.WithHandle(func(ctx context.Context, k, v []byte) error {
logx.WithContext(ctx).Info(fmt.Sprintf("=> %s\n", v))
return nil
}))
defer q.Stop()
q.Start()
}
producer example
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/chenquan/go-queue/kafka"
"log"
"math/rand"
"strconv"
"time"
"github.com/zeromicro/go-zero/core/cmdline"
)
type message struct {
Key string `json:"key"`
Value string `json:"value"`
Payload string `json:"message"`
}
func main() {
pusher := kafka.NewPusher([]string{
"127.0.0.1:19092",
"127.0.0.1:19092",
"127.0.0.1:19092",
}, "kafka")
ticker := time.NewTicker(time.Millisecond)
for round := 0; round < 3; round++ {
<-ticker.C
count := rand.Intn(100)
m := message{
Key: strconv.FormatInt(time.Now().UnixNano(), 10),
Value: fmt.Sprintf("%d,%d", round, count),
Payload: fmt.Sprintf("%d,%d", round, count),
}
body, err := json.Marshal(m)
if err != nil {
log.Fatal(err)
}
fmt.Println(string(body))
if _, err := pusher.Push(context.Background(), []byte(strconv.FormatInt(time.Now().UnixNano(), 10)), body); err != nil {
log.Fatal(err)
}
}
cmdline.EnterToContinue()
}
pulsar
Pulsar Pub/Sub framework
consumer example
config.yaml
Name: pulsar
Brokers:
- 127.0.0.1:6650
Topic: pulsar
Conns: 2
Processors: 2
SubscriptionName: pulsar
Telemetry:
Name: pulsar
Endpoint: http://localhost:14268/api/traces
Sampler: 1.0
Natcher: jaeger
consumer code
package main
import (
"context"
"fmt"
"github.com/chenquan/go-queue/pulsar"
"github.com/chenquan/go-queue/queue"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/service"
"github.com/zeromicro/go-zero/core/conf"
)
func main() {
var c struct {
pulsar.Conf
service.ServiceConf
}
conf.MustLoad("config.yaml", &c)
c.MustSetUp()
q := pulsar.MustNewQueue(c.Conf, queue.WithHandle(func(ctx context.Context, k, v []byte) error {
logx.WithContext(ctx).Info(fmt.Sprintf("=> %s\n", v))
return nil
}))
defer q.Stop()
q.Start()
}
producer code
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/chenquan/go-queue/pulsar"
"log"
"math/rand"
"strconv"
"time"
"github.com/zeromicro/go-zero/core/cmdline"
)
type message struct {
Key string `json:"key"`
Value string `json:"value"`
Payload string `json:"message"`
}
func main() {
pusher := pulsar.NewPusher([]string{
"127.0.0.1:19092",
"127.0.0.1:19092",
"127.0.0.1:19092",
}, "pulsar")
ticker := time.NewTicker(time.Millisecond)
for round := 0; round < 3; round++ {
<-ticker.C
count := rand.Intn(100)
m := message{
Key: strconv.FormatInt(time.Now().UnixNano(), 10),
Value: fmt.Sprintf("%d,%d", round, count),
Payload: fmt.Sprintf("%d,%d", round, count),
}
body, err := json.Marshal(m)
if err != nil {
log.Fatal(err)
}
fmt.Println(string(body))
if _, err := pusher.Push(context.Background(), []byte(strconv.FormatInt(time.Now().UnixNano(), 10)), body); err != nil {
log.Fatal(err)
}
}
cmdline.EnterToContinue()
}