danube

package module
v0.2.2 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 13, 2025 License: Apache-2.0 Imports: 15 Imported by: 0

README

Danube-go client

The Go client library for interacting with the Danube Messaging Broker platform.

Danube is an open-source, distributed messaging platform written in Rust. Consult the documentation for the supported concepts and the platform architecture.

Example usage

Check out the example files.

Start the Danube server

Use the instructions from the documentation to run the Danube broker/cluster.

Create Producer

client := danube.NewClient().ServiceURL("127.0.0.1:6650").Build()

ctx := context.Background()
topic := "/default/test_topic"
producerName := "test_producer"

producer, err := client.NewProducer(ctx).
    WithName(producerName).
    WithTopic(topic).
    Build()
if err != nil {
    log.Fatalf("unable to initialize the producer: %v", err)
}

if err := producer.Create(ctx); err != nil {
    log.Fatalf("Failed to create producer: %v", err)
}
log.Printf("The Producer %s was created", producerName)

payload := fmt.Sprintln("Hello Danube")

// Convert the string to bytes
bytesPayload := []byte(payload)

// Send the payload along with optional user-defined attributes (nil in this case)
messageID, err := producer.Send(ctx, bytesPayload, nil)
if err != nil {
    log.Fatalf("Failed to send message: %v", err)
}
log.Printf("The Message with id %v was sent", messageID)

Create Consumer

client := danube.NewClient().ServiceURL("127.0.0.1:6650").Build()

ctx := context.Background()
topic := "/default/test_topic"
consumerName := "test_consumer"
subscriptionName := "test_subscription"
subType := danube.Exclusive

consumer, err := client.NewConsumer(ctx).
    WithConsumerName(consumerName).
    WithTopic(topic).
    WithSubscription(subscriptionName).
    WithSubscriptionType(subType).
    Build()
if err != nil {
    log.Fatalf("Failed to initialize the consumer: %v", err)
}

// Request to subscribe to the topic and create the resources on the Danube Broker
if err := consumer.Subscribe(ctx); err != nil {
    log.Fatalf("Failed to subscribe: %v", err)
}
log.Printf("The Consumer %s was created", consumerName)

// Request to receive messages
stream, err := consumer.Receive(ctx)
if err != nil {
    log.Fatalf("Failed to receive messages: %v", err)
}

// Consume the messages from the Go channel
for msg := range stream {
    fmt.Printf("Received message: %+v\n", string(msg.GetPayload()))

    // Acknowledge the message
    if _, err := consumer.Ack(ctx, msg); err != nil {
        log.Fatalf("Failed to acknowledge message: %v", err)
    }
}

Contribution

We are working on improving the client and adding new features. Please feel free to contribute or to report any issues you encounter.

Use latest DanubeApi.proto file

Make sure that proto/DanubeApi.proto is the latest version from the Danube project.

If it is not, replace the file and add the following line near the top of the file, right after the package danube; declaration:

option go_package = "github.com/danube-messaging/danube-go/proto";

To generate the Go gRPC code, you need the following packages installed:

go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

And generate the Go code from the proto file:

protoc --proto_path=./proto --go_out=./proto --go-grpc_out=./proto --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative proto/DanubeApi.proto

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewLookupService

func NewLookupService(cnxManager *connectionManager) *lookupService

NewLookupService creates a new instance of LookupService

func ProtoSchemaToJSON

func ProtoSchemaToJSON(protoSchema *proto.Schema) (string, error)

ProtoSchemaToJSON converts a Protobuf Schema to a JSON string.

Types

type BrokerAddress

type BrokerAddress struct {
	ConnectURL string
	BrokerURL  string
	Proxy      bool
}

type ConfigDispatchStrategy

type ConfigDispatchStrategy struct {
	// Using embedded struct to simulate enum-like behavior
	NonReliable     bool
	ReliableOptions *ReliableOptions
}

ConfigDispatchStrategy represents the dispatch strategy for a topic

func NewConfigDispatchStrategy

func NewConfigDispatchStrategy() *ConfigDispatchStrategy

NewConfigDispatchStrategy creates a new ConfigDispatchStrategy instance

func NewReliableDispatchStrategy

func NewReliableDispatchStrategy(options *ReliableOptions) *ConfigDispatchStrategy

NewReliableDispatchStrategy creates a new reliable ConfigDispatchStrategy instance

func (*ConfigDispatchStrategy) ToProtoDispatchStrategy

func (c *ConfigDispatchStrategy) ToProtoDispatchStrategy() *proto.TopicDispatchStrategy

ToProtoDispatchStrategy converts ConfigDispatchStrategy to proto.TopicDispatchStrategy
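
The struct above uses two fields to express a choice between two strategies. The sketch below shows one way such an enum-like struct can be checked for consistency. The types are local stand-ins that copy only the documented field names, and the validate function and its "exactly one must be set" rule are assumptions made for illustration; the library may resolve the two fields differently.

```go
package main

import (
	"errors"
	"fmt"
)

// reliableOptions is a local stand-in for ReliableOptions (subset of fields).
type reliableOptions struct {
	SegmentSize     int64
	RetentionPeriod uint64
}

// dispatchStrategy is a local stand-in for ConfigDispatchStrategy.
type dispatchStrategy struct {
	NonReliable     bool
	ReliableOptions *reliableOptions
}

// validate enforces the assumed rule that exactly one variant is selected.
func validate(s dispatchStrategy) error {
	reliable := s.ReliableOptions != nil
	switch {
	case s.NonReliable && reliable:
		return errors.New("both non-reliable and reliable options are set")
	case !s.NonReliable && !reliable:
		return errors.New("no dispatch strategy is set")
	default:
		return nil
	}
}

func main() {
	nonReliable := dispatchStrategy{NonReliable: true}
	reliable := dispatchStrategy{ReliableOptions: &reliableOptions{SegmentSize: 1}}
	ambiguous := dispatchStrategy{NonReliable: true, ReliableOptions: &reliableOptions{}}

	fmt.Println(validate(nonReliable))
	fmt.Println(validate(reliable))
	fmt.Println(validate(ambiguous))
}
```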

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a message consumer that subscribes to a topic and receives messages. It handles communication with the message broker and manages the consumer's state.

func (*Consumer) Ack

func (c *Consumer) Ack(ctx context.Context, message *proto.StreamMessage) (*proto.AckResponse, error)

Ack acknowledges a received message.

func (*Consumer) Receive

func (c *Consumer) Receive(ctx context.Context) (chan *proto.StreamMessage, error)

Receive starts receiving messages from the subscribed partitioned or non-partitioned topic. It continuously polls for new messages and handles them as long as the stopSignal has not been set to true.

Parameters:
- ctx: The context for managing the receive operation.

Returns:
- A StreamMessage channel for receiving messages from the broker.
- error: An error if the receive client cannot be created or if other issues occur.

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(ctx context.Context) error

Subscribe initializes the subscription to the non-partitioned or partitioned topic and starts the health check service. It establishes a gRPC connection with the brokers and requests to subscribe to the topic.

Parameters: - ctx: The context for managing the subscription lifecycle.

Returns: - error: An error if the subscription fails or if initialization encounters issues.

type ConsumerBuilder

type ConsumerBuilder struct {
	// contains filtered or unexported fields
}

ConsumerBuilder is a builder for creating a new Consumer instance. It allows setting various properties for the consumer such as topic, name, subscription, subscription type, and options.

func (*ConsumerBuilder) Build

func (b *ConsumerBuilder) Build() (*Consumer, error)

Build creates a new Consumer instance using the settings configured in the ConsumerBuilder. It performs validation to ensure that all required fields are set before creating the consumer.

Returns:
- *Consumer: A pointer to the newly created Consumer instance if successful.
- error: An error if required fields are missing or if consumer creation fails.

func (*ConsumerBuilder) WithConsumerName

func (b *ConsumerBuilder) WithConsumerName(name string) *ConsumerBuilder

WithConsumerName sets the name of the consumer. This is a required field.

Parameters: - name: The name assigned to the consumer instance.

func (*ConsumerBuilder) WithSubscription

func (b *ConsumerBuilder) WithSubscription(subscription string) *ConsumerBuilder

WithSubscription sets the name of the subscription for the consumer. This is a required field.

Parameters: - subscription: The name of the subscription for the consumer.

func (*ConsumerBuilder) WithSubscriptionType

func (b *ConsumerBuilder) WithSubscriptionType(subType SubType) *ConsumerBuilder

WithSubscriptionType sets the type of subscription for the consumer. This field is optional.

Parameters: - subType: The type of subscription (Exclusive, Shared, or FailOver).

func (*ConsumerBuilder) WithTopic

func (b *ConsumerBuilder) WithTopic(topic string) *ConsumerBuilder

WithTopic sets the topic name for the consumer. This is a required field.

Parameters: - topic: The name of the topic for the consumer.
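
The required-field validation described for Build can be sketched with a small local builder. All names here (consumerSettings, settingsBuilder) are hypothetical stand-ins that only imitate the chained With... style; the error messages and the order of the checks are assumptions, not the library's behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// consumerSettings is a hypothetical stand-in for the values a consumer needs.
type consumerSettings struct {
	Name         string
	Topic        string
	Subscription string
}

// settingsBuilder imitates the chained With... style of ConsumerBuilder.
type settingsBuilder struct {
	s consumerSettings
}

func (b *settingsBuilder) WithConsumerName(name string) *settingsBuilder {
	b.s.Name = name
	return b
}

func (b *settingsBuilder) WithTopic(topic string) *settingsBuilder {
	b.s.Topic = topic
	return b
}

func (b *settingsBuilder) WithSubscription(sub string) *settingsBuilder {
	b.s.Subscription = sub
	return b
}

// Build rejects the configuration if any required field is empty.
func (b *settingsBuilder) Build() (*consumerSettings, error) {
	switch {
	case b.s.Topic == "":
		return nil, errors.New("topic is required")
	case b.s.Name == "":
		return nil, errors.New("consumer name is required")
	case b.s.Subscription == "":
		return nil, errors.New("subscription is required")
	}
	out := b.s // copy so later builder changes do not affect the result
	return &out, nil
}

func main() {
	_, err := (&settingsBuilder{}).WithConsumerName("test_consumer").Build()
	fmt.Println("incomplete:", err)

	s, err := (&settingsBuilder{}).
		WithConsumerName("test_consumer").
		WithTopic("/default/test_topic").
		WithSubscription("test_subscription").
		Build()
	fmt.Println("complete:", s.Topic, err)
}
```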

type ConsumerOptions

type ConsumerOptions struct {
	Others string
}

type DanubeClient

type DanubeClient struct {
	URI string
	// contains filtered or unexported fields
}

DanubeClient is the main client for interacting with the Danube messaging system. It provides methods to create producers and consumers, perform topic lookups, and retrieve schema information.

func (*DanubeClient) GetSchema

func (dc *DanubeClient) GetSchema(ctx context.Context, topic string) (*Schema, error)

GetSchema retrieves the schema associated with a specified topic from the schema service.

Parameters:
- ctx: The context for managing the schema retrieval operation.
- topic: The name of the topic for which the schema is to be retrieved.

Returns:
- *Schema: The schema associated with the topic.
- error: An error if the schema retrieval fails or other issues occur.

func (*DanubeClient) LookupTopic

func (dc *DanubeClient) LookupTopic(ctx context.Context, addr string, topic string) (*LookupResult, error)

LookupTopic retrieves the address of the broker responsible for a specified topic.

Parameters:
- ctx: The context for managing the lookup operation.
- addr: The address of the lookup service.
- topic: The name of the topic to look up.

Returns:
- *LookupResult: The result of the lookup operation, containing the broker address and other details.
- error: An error if the lookup fails or other issues occur.

func (*DanubeClient) NewConsumer

func (dc *DanubeClient) NewConsumer(ctx context.Context) *ConsumerBuilder

NewConsumer returns a new ConsumerBuilder, which is used to configure and create a Consumer instance.

Parameters: - ctx: The context for managing the lifecycle of the ConsumerBuilder and any operations performed with it.

func (*DanubeClient) NewProducer

func (dc *DanubeClient) NewProducer(ctx context.Context) *ProducerBuilder

NewProducer returns a new ProducerBuilder, which is used to configure and create a Producer instance.

Parameters: - ctx: The context for managing the lifecycle of the ProducerBuilder and any operations performed with it.

type DanubeClientBuilder

type DanubeClientBuilder struct {
	URI               string
	ConnectionOptions []DialOption
}

DanubeClientBuilder is used for configuring and creating a DanubeClient instance. It provides methods for setting the service URL and the connection options.

Fields:
- URI: The base URI for the Danube service. This is required for constructing the client.
- ConnectionOptions: Optional connection settings for configuring how the client connects to the service.

func NewClient

func NewClient() *DanubeClientBuilder

NewClient initializes a new DanubeClientBuilder. The builder pattern allows for configuring and constructing a DanubeClient instance with optional settings and options.

Returns: - *DanubeClientBuilder: A new instance of DanubeClientBuilder for configuring and building a DanubeClient.

func (*DanubeClientBuilder) Build

func (b *DanubeClientBuilder) Build() *DanubeClient

Build constructs and returns a DanubeClient instance based on the configuration specified in the builder.

Returns: - *DanubeClient: A new instance of DanubeClient configured with the specified options.

func (*DanubeClientBuilder) ServiceURL

func (b *DanubeClientBuilder) ServiceURL(url string) *DanubeClientBuilder

ServiceURL sets the base URI for the Danube service in the builder.

Parameters: - url: The base URI to use for connecting to the Danube service.

Returns: - *DanubeClientBuilder: The updated builder instance with the new service URL.

func (*DanubeClientBuilder) WithConnectionOptions

func (b *DanubeClientBuilder) WithConnectionOptions(options []DialOption) *DanubeClientBuilder

WithConnectionOptions sets optional connection settings for the client in the builder.

Parameters: - options: A slice of DialOption used to configure the client's connection settings.

Returns: - *DanubeClientBuilder: The updated builder instance with the specified connection options.

type DialOption

type DialOption func(*[]grpc.DialOption)

DialOption is a function that configures gRPC dial options.

func WithConnectionTimeout

func WithConnectionTimeout(timeout time.Duration) DialOption

WithConnectionTimeout configures the connection timeout for the connection.

func WithKeepAliveInterval

func WithKeepAliveInterval(interval time.Duration) DialOption

WithKeepAliveInterval configures the keepalive interval for the connection.

type LookupResult

type LookupResult struct {
	ResponseType proto.TopicLookupResponse_LookupType
	Addr         string
}

LookupResult holds the result of a topic lookup

type MessageRouter

type MessageRouter struct {
	// contains filtered or unexported fields
}

func NewMessageRouter

func NewMessageRouter(partitions int32) *MessageRouter

func (*MessageRouter) RoundRobin

func (router *MessageRouter) RoundRobin() int32
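
MessageRouter and RoundRobin carry no description here. As a rough illustration of what round-robin partition selection usually means, the sketch below cycles through partition indexes with an atomic counter. The roundRobinRouter type and its pick method are hypothetical stand-ins; whether the library's router is implemented this way, or is safe for concurrent use, is an assumption.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobinRouter is a hypothetical stand-in for a partition router.
type roundRobinRouter struct {
	partitions uint32
	counter    uint32
}

// newRoundRobinRouter expects partitions to be greater than zero.
func newRoundRobinRouter(partitions int32) *roundRobinRouter {
	return &roundRobinRouter{partitions: uint32(partitions)}
}

// pick returns the next partition index, cycling 0, 1, ..., partitions-1.
// The atomic increment keeps the counter consistent across goroutines.
func (r *roundRobinRouter) pick() int32 {
	n := atomic.AddUint32(&r.counter, 1) - 1
	return int32(n % r.partitions)
}

func main() {
	r := newRoundRobinRouter(3)
	fmt.Println(r.pick(), r.pick(), r.pick(), r.pick()) // prints "0 1 2 0"
}
```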

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer represents a message producer that is responsible for sending messages to a specific partitioned or non-partitioned topic on a message broker. It handles producer creation, message sending, and maintains the producer's state.

func (*Producer) Create

func (p *Producer) Create(ctx context.Context) error

Create initializes the producer and registers it with the message brokers.

Parameters: - ctx: The context for managing request lifecycle and cancellation.

Returns: - error: An error if producer creation fails.

func (*Producer) Send

func (p *Producer) Send(ctx context.Context, data []byte, attributes map[string]string) (uint64, error)

Send sends a message to the topic associated with this producer.

It constructs a message request and sends it to the broker. The method handles payload and error reporting. It assumes that the producer has been successfully created and is ready to send messages.

Parameters:
- ctx: The context for managing request lifecycle and cancellation.
- data: The message payload to be sent.
- attributes: User-defined properties or attributes associated with the message.

Returns:
- uint64: The sequence ID of the sent message if successful.
- error: An error if message sending fails.

type ProducerBuilder

type ProducerBuilder struct {
	// contains filtered or unexported fields
}

ProducerBuilder is a builder for creating a new Producer instance. It allows setting various properties for the producer such as topic, name, schema, and options.

func (*ProducerBuilder) Build

func (pb *ProducerBuilder) Build() (*Producer, error)

Build creates a new Producer instance using the settings configured in the ProducerBuilder. It performs validation to ensure that required fields are set before creating the producer.

Returns:
- *Producer: A pointer to the newly created Producer instance if successful.
- error: An error if required fields are missing or if producer creation fails.

func (*ProducerBuilder) WithDispatchStrategy

func (pb *ProducerBuilder) WithDispatchStrategy(dispatch_strategy *ConfigDispatchStrategy) *ProducerBuilder

WithDispatchStrategy sets the dispatch strategy for the producer. This method configures the retention strategy for the producer, which determines how messages are stored and managed.

Parameters: - dispatch_strategy: The dispatch strategy for the producer.

func (*ProducerBuilder) WithName

func (pb *ProducerBuilder) WithName(producerName string) *ProducerBuilder

WithName sets the name of the producer. This is a required field.

Parameters: - producerName: The name assigned to the producer instance.

func (*ProducerBuilder) WithOptions

func (pb *ProducerBuilder) WithOptions(options ProducerOptions) *ProducerBuilder

WithOptions sets the configuration options for the producer. This allows for customization of producer behavior.

Parameters: - options: Configuration options for the producer.

func (*ProducerBuilder) WithPartitions

func (pb *ProducerBuilder) WithPartitions(partitions int32) *ProducerBuilder

WithPartitions sets the number of topic partitions.

Parameters: - partitions: The number of partitions for a new topic.

func (*ProducerBuilder) WithSchema

func (pb *ProducerBuilder) WithSchema(schemaName string, schemaType SchemaType, schemaData string) *ProducerBuilder

WithSchema sets the schema for the producer, defining the structure of the messages.

Parameters:
- schemaName: The name of the schema.
- schemaType: The type of the schema (e.g., SchemaType_BYTES, SchemaType_STRING, SchemaType_JSON).
- schemaData: The schema definition; used only when schemaType is SchemaType_JSON.

func (*ProducerBuilder) WithTopic

func (pb *ProducerBuilder) WithTopic(topic string) *ProducerBuilder

WithTopic sets the topic name for the producer. This is a required field.

Parameters: - topic: The name of the topic for the producer.

type ProducerOptions

type ProducerOptions struct {
}

type ReliableOptions

type ReliableOptions struct {
	SegmentSize     int64
	RetentionPolicy RetentionPolicy
	RetentionPeriod uint64
}

ReliableOptions represents configuration options for reliable dispatch strategy

func NewReliableOptions

func NewReliableOptions(segmentSize int64, retentionPolicy RetentionPolicy, retentionPeriod uint64) *ReliableOptions

NewReliableOptions creates a new ReliableOptions instance

type RetentionPolicy

type RetentionPolicy int

RetentionPolicy represents the retention policy for messages in the topic

const (
	RetainUntilAck RetentionPolicy = iota
	RetainUntilExpire
)

type Schema

type Schema struct {
	Name       string
	SchemaData []byte
	TypeSchema SchemaType
}

Schema represents the structure of data, including its type and associated schema data. It is used to define how data should be serialized, deserialized, and validated.

Fields:
- Name: The name of the schema. This is typically used for identification purposes.
- SchemaData: The schema data itself, which contains the schema's definition. Only used with the JSON TypeSchema.
- TypeSchema: The type of schema that determines the format of the data (e.g., JSON, STRING).

func FromProtoSchema

func FromProtoSchema(protoSchema *proto.Schema) (*Schema, error)

FromProtoSchema converts a Protobuf Schema to a Schema.

func NewSchema

func NewSchema(name string, schemaType SchemaType, jsonSchema string) *Schema

NewSchema creates a new Schema instance with the specified name, type, and optional JSON schema data. It initializes the Schema with appropriate schema data based on the type.

Parameters:
- name: The name assigned to the schema.
- schemaType: The type of schema that determines how data is structured (e.g., JSON, STRING).
- jsonSchema: The JSON schema data used if the schemaType is SchemaType_JSON. It is ignored for other schema types.

Returns:
- *Schema: A pointer to the newly created Schema instance.
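
The rule that jsonSchema is used only for the JSON type can be sketched with local stand-in types. The schemaType constants below reuse the numeric values documented for SchemaType, but the lowercase names and the newSchema body are assumptions written for illustration, not the library's source.

```go
package main

import "fmt"

// schemaType is a local stand-in for SchemaType with the same numeric values.
type schemaType int32

const (
	schemaBytes  schemaType = 0
	schemaString schemaType = 1
	schemaInt64  schemaType = 2
	schemaJSON   schemaType = 3
)

// schema is a local stand-in for Schema.
type schema struct {
	Name       string
	SchemaData []byte
	TypeSchema schemaType
}

// newSchema stores jsonSchema only for the JSON type and ignores it otherwise.
func newSchema(name string, t schemaType, jsonSchema string) *schema {
	s := &schema{Name: name, TypeSchema: t}
	if t == schemaJSON {
		s.SchemaData = []byte(jsonSchema)
	}
	return s
}

func main() {
	def := `{"type":"object"}`
	fmt.Println(len(newSchema("events", schemaJSON, def).SchemaData))
	fmt.Println(len(newSchema("lines", schemaString, def).SchemaData))
}
```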

func (*Schema) JSONSchema

func (s *Schema) JSONSchema() (string, error)

JSONSchema returns the JSON schema data as a Go string.

func (*Schema) ToProto

func (s *Schema) ToProto() *proto.Schema

ToProto converts the Schema to a Protobuf Schema.

type SchemaType

type SchemaType int32

SchemaType represents the type of schema used for data serialization and validation. It defines the possible types of schemas that can be applied to data.

Constants:
- SchemaType_BYTES: Represents a schema where data is in raw bytes format.
- SchemaType_STRING: Represents a schema where data is in string format.
- SchemaType_INT64: Represents a schema where data is in 64-bit integer format.
- SchemaType_JSON: Represents a schema where data is in JSON format.

const (
	SchemaType_BYTES  SchemaType = 0
	SchemaType_STRING SchemaType = 1
	SchemaType_INT64  SchemaType = 2
	SchemaType_JSON   SchemaType = 3
)

func FromProtoTypeSchema

func FromProtoTypeSchema(protoSchema proto.Schema_TypeSchema) SchemaType

FromProtoTypeSchema converts a Protobuf TypeSchema to a SchemaType.

func (SchemaType) ToProto

func (s SchemaType) ToProto() proto.Schema_TypeSchema

ToProto converts the SchemaType to its Protobuf representation.

type SubType

type SubType int

SubType is the type of subscription (Exclusive, Shared, or FailOver).

const (
	// Exclusive - only one consumer can subscribe to a specific subscription
	Exclusive SubType = iota
	// Shared - multiple consumers can subscribe, messages are delivered round-robin
	Shared
	// FailOver - similar to exclusive subscriptions, but multiple consumers can subscribe, and one actively receives messages
	FailOver
)
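
The listing above shows no String method for SubType, so a value would print as a bare integer in logs. The sketch below shows how readable names could be attached to an iota-based type like this one. The subType type and its constants are local stand-ins declared in the same order as above; the String method is a hypothetical addition, not part of the package.

```go
package main

import "fmt"

// subType is a local stand-in for SubType, declared in the same order.
type subType int

const (
	exclusive subType = iota
	shared
	failOver
)

// String makes subType satisfy fmt.Stringer so it prints by name.
func (s subType) String() string {
	switch s {
	case exclusive:
		return "Exclusive"
	case shared:
		return "Shared"
	case failOver:
		return "FailOver"
	default:
		return fmt.Sprintf("SubType(%d)", int(s))
	}
}

func main() {
	fmt.Println(exclusive, shared, failOver) // prints "Exclusive Shared FailOver"
}
```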
