Documentation ¶
Index ¶
- func NewLookupService(cnxManager *connectionManager) *lookupService
- func ProtoSchemaToJSON(protoSchema *proto.Schema) (string, error)
- type BrokerAddress
- type ConfigDispatchStrategy
- type Consumer
- type ConsumerBuilder
- func (b *ConsumerBuilder) Build() (*Consumer, error)
- func (b *ConsumerBuilder) WithConsumerName(name string) *ConsumerBuilder
- func (b *ConsumerBuilder) WithSubscription(subscription string) *ConsumerBuilder
- func (b *ConsumerBuilder) WithSubscriptionType(subType SubType) *ConsumerBuilder
- func (b *ConsumerBuilder) WithTopic(topic string) *ConsumerBuilder
- type ConsumerOptions
- type DanubeClient
- func (dc *DanubeClient) GetSchema(ctx context.Context, topic string) (*Schema, error)
- func (dc *DanubeClient) LookupTopic(ctx context.Context, addr string, topic string) (*LookupResult, error)
- func (dc *DanubeClient) NewConsumer(ctx context.Context) *ConsumerBuilder
- func (dc *DanubeClient) NewProducer(ctx context.Context) *ProducerBuilder
- type DanubeClientBuilder
- type DialOption
- type LookupResult
- type MessageRouter
- type Producer
- type ProducerBuilder
- func (pb *ProducerBuilder) Build() (*Producer, error)
- func (pb *ProducerBuilder) WithDispatchStrategy(dispatch_strategy *ConfigDispatchStrategy) *ProducerBuilder
- func (pb *ProducerBuilder) WithName(producerName string) *ProducerBuilder
- func (pb *ProducerBuilder) WithOptions(options ProducerOptions) *ProducerBuilder
- func (pb *ProducerBuilder) WithPartitions(partitions int32) *ProducerBuilder
- func (pb *ProducerBuilder) WithSchema(schemaName string, schemaType SchemaType, schemaData string) *ProducerBuilder
- func (pb *ProducerBuilder) WithTopic(topic string) *ProducerBuilder
- type ProducerOptions
- type ReliableOptions
- type RetentionPolicy
- type Schema
- type SchemaType
- type SubType
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewLookupService ¶
func NewLookupService(cnxManager *connectionManager) *lookupService
NewLookupService creates a new lookupService instance.
Types ¶
type BrokerAddress ¶
type ConfigDispatchStrategy ¶
type ConfigDispatchStrategy struct {
	// Using embedded struct to simulate enum-like behavior
	NonReliable     bool
	ReliableOptions *ReliableOptions
}
ConfigDispatchStrategy represents the dispatch strategy for a topic
func NewConfigDispatchStrategy ¶
func NewConfigDispatchStrategy() *ConfigDispatchStrategy
NewConfigDispatchStrategy creates a new ConfigDispatchStrategy instance
func NewReliableDispatchStrategy ¶
func NewReliableDispatchStrategy(options *ReliableOptions) *ConfigDispatchStrategy
NewReliableDispatchStrategy creates a new reliable ConfigDispatchStrategy instance
func (*ConfigDispatchStrategy) ToProtoDispatchStrategy ¶
func (c *ConfigDispatchStrategy) ToProtoDispatchStrategy() *proto.TopicDispatchStrategy
ToProtoDispatchStrategy converts ConfigDispatchStrategy to proto.TopicDispatchStrategy
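A minimal sketch of how an enum-like struct of this shape could be built and inspected. The types below are local stand-ins that copy the documented field layout; the constructor bodies (`newNonReliable`, `newReliable`) and the `describe` helper are assumptions made for illustration, not the library's implementation.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented fields (not the library types).
type RetentionPolicy int

const (
	RetainUntilAck RetentionPolicy = iota
	RetainUntilExpire
)

type ReliableOptions struct {
	SegmentSize     int64
	RetentionPolicy RetentionPolicy
	RetentionPeriod uint64
}

type ConfigDispatchStrategy struct {
	NonReliable     bool
	ReliableOptions *ReliableOptions
}

// newNonReliable is hypothetical: it assumes the default strategy is non-reliable.
func newNonReliable() *ConfigDispatchStrategy {
	return &ConfigDispatchStrategy{NonReliable: true}
}

// newReliable is hypothetical: it assumes a reliable strategy carries its options.
func newReliable(opts *ReliableOptions) *ConfigDispatchStrategy {
	return &ConfigDispatchStrategy{NonReliable: false, ReliableOptions: opts}
}

// describe treats the struct as a two-variant enum.
func describe(c *ConfigDispatchStrategy) string {
	if c.NonReliable || c.ReliableOptions == nil {
		return "non-reliable"
	}
	return fmt.Sprintf("reliable(segment=%d, period=%d)",
		c.ReliableOptions.SegmentSize, c.ReliableOptions.RetentionPeriod)
}

func main() {
	fmt.Println(describe(newNonReliable()))
	fmt.Println(describe(newReliable(&ReliableOptions{
		SegmentSize:     10,
		RetentionPolicy: RetainUntilExpire,
		RetentionPeriod: 3600,
	})))
}
```

Exactly one of the two fields is meaningful at a time, which is why a boolean plus an optional pointer can stand in for an enum with an attached payload.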
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer represents a message consumer that subscribes to a topic and receives messages. It handles communication with the message broker and manages the consumer's state.
func (*Consumer) Ack ¶
func (c *Consumer) Ack(ctx context.Context, message *proto.StreamMessage) (*proto.AckResponse, error)
Ack acknowledges a received message.
func (*Consumer) Receive ¶
Receive starts receiving messages from the subscribed partitioned or non-partitioned topic. It continuously polls for new messages and handles them as long as the stopSignal has not been set to true.
Parameters:
- ctx: The context for managing the receive operation.
Returns:
- StreamMessage channel for receiving messages from the broker.
- error: An error if the receive client cannot be created or if other issues occur.
func (*Consumer) Subscribe ¶
Subscribe initializes the subscription to the non-partitioned or partitioned topic and starts the health check service. It establishes a gRPC connection with the brokers and requests to subscribe to the topic.
Parameters:
- ctx: The context for managing the subscription lifecycle.
Returns:
- error: An error if the subscription fails or if initialization encounters issues.
type ConsumerBuilder ¶
type ConsumerBuilder struct {
// contains filtered or unexported fields
}
ConsumerBuilder is a builder for creating a new Consumer instance. It allows setting various properties for the consumer such as topic, name, subscription, subscription type, and options.
func (*ConsumerBuilder) Build ¶
func (b *ConsumerBuilder) Build() (*Consumer, error)
Build creates a new Consumer instance using the settings configured in the ConsumerBuilder. It performs validation to ensure that all required fields are set before creating the consumer.
Returns:
- *Consumer: A pointer to the newly created Consumer instance if successful.
- error: An error if required fields are missing or if consumer creation fails.
func (*ConsumerBuilder) WithConsumerName ¶
func (b *ConsumerBuilder) WithConsumerName(name string) *ConsumerBuilder
WithConsumerName sets the name of the consumer. This is a required field.
Parameters:
- name: The name assigned to the consumer instance.
func (*ConsumerBuilder) WithSubscription ¶
func (b *ConsumerBuilder) WithSubscription(subscription string) *ConsumerBuilder
WithSubscription sets the name of the subscription for the consumer. This is a required field.
Parameters:
- subscription: The name of the subscription for the consumer.
func (*ConsumerBuilder) WithSubscriptionType ¶
func (b *ConsumerBuilder) WithSubscriptionType(subType SubType) *ConsumerBuilder
WithSubscriptionType sets the type of subscription for the consumer. This field is optional.
Parameters:
- subType: The type of subscription (e.g., EXCLUSIVE, SHARED, FAILOVER).
func (*ConsumerBuilder) WithTopic ¶
func (b *ConsumerBuilder) WithTopic(topic string) *ConsumerBuilder
WithTopic sets the topic name for the consumer. This is a required field.
Parameters:
- topic: The name of the topic for the consumer.
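The builder methods above mark topic, consumer name, and subscription as required, and Build is documented to validate them. A minimal sketch of that validation, using hypothetical stand-in types (`consumer`, `consumerBuilder`) and an assumed error message rather than the library's own code:

```go
package main

import "fmt"

// consumer and consumerBuilder are hypothetical stand-ins, not the library types.
type consumer struct {
	topic, name, subscription string
}

type consumerBuilder struct {
	topic, name, subscription string
}

func (b *consumerBuilder) WithTopic(topic string) *consumerBuilder {
	b.topic = topic
	return b
}

func (b *consumerBuilder) WithConsumerName(name string) *consumerBuilder {
	b.name = name
	return b
}

func (b *consumerBuilder) WithSubscription(subscription string) *consumerBuilder {
	b.subscription = subscription
	return b
}

// Build rejects the first required field that is still empty.
func (b *consumerBuilder) Build() (*consumer, error) {
	required := []struct{ label, value string }{
		{"topic", b.topic},
		{"consumer name", b.name},
		{"subscription", b.subscription},
	}
	for _, f := range required {
		if f.value == "" {
			return nil, fmt.Errorf("missing required field: %s", f.label)
		}
	}
	return &consumer{topic: b.topic, name: b.name, subscription: b.subscription}, nil
}

func main() {
	// The topic and names below are made-up example values.
	if _, err := (&consumerBuilder{}).WithTopic("/default/test_topic").Build(); err != nil {
		fmt.Println("incomplete builder:", err)
	}

	c, err := (&consumerBuilder{}).
		WithTopic("/default/test_topic").
		WithConsumerName("test_consumer").
		WithSubscription("test_subscription").
		Build()
	if err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	fmt.Println("built consumer for topic", c.topic)
}
```

Each setter returns the builder itself, which is what makes the chained call style possible.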
type ConsumerOptions ¶
type ConsumerOptions struct {
Others string
}
type DanubeClient ¶
type DanubeClient struct {
	URI string
	// contains filtered or unexported fields
}
DanubeClient is the main client for interacting with the Danube messaging system. It provides methods to create producers and consumers, perform topic lookups, and retrieve schema information.
func (*DanubeClient) GetSchema ¶
GetSchema retrieves the schema associated with a specified topic from the schema service.
Parameters:
- ctx: The context for managing the schema retrieval operation.
- topic: The name of the topic for which the schema is to be retrieved.
Returns:
- *Schema: The schema associated with the topic.
- error: An error if the schema retrieval fails or other issues occur.
func (*DanubeClient) LookupTopic ¶
func (dc *DanubeClient) LookupTopic(ctx context.Context, addr string, topic string) (*LookupResult, error)
LookupTopic retrieves the address of the broker responsible for a specified topic.
Parameters:
- ctx: The context for managing the lookup operation.
- addr: The address of the lookup service.
- topic: The name of the topic to look up.
Returns:
- *LookupResult: The result of the lookup operation, containing broker address and other details.
- error: An error if the lookup fails or other issues occur.
func (*DanubeClient) NewConsumer ¶
func (dc *DanubeClient) NewConsumer(ctx context.Context) *ConsumerBuilder
NewConsumer returns a new ConsumerBuilder, which is used to configure and create a Consumer instance.
Parameters:
- ctx: The context for managing the lifecycle of the ConsumerBuilder and any operations performed with it.
func (*DanubeClient) NewProducer ¶
func (dc *DanubeClient) NewProducer(ctx context.Context) *ProducerBuilder
NewProducer returns a new ProducerBuilder, which is used to configure and create a Producer instance.
Parameters:
- ctx: The context for managing the lifecycle of the ProducerBuilder and any operations performed with it.
type DanubeClientBuilder ¶
type DanubeClientBuilder struct {
	URI               string
	ConnectionOptions []DialOption
}
DanubeClientBuilder is used for configuring and creating a DanubeClient instance. It provides methods for setting the service URL and connection options.
Fields:
- URI: The base URI for the Danube service. This is required for constructing the client.
- ConnectionOptions: Optional connection settings for configuring how the client connects to the service.
func NewClient ¶
func NewClient() *DanubeClientBuilder
NewClient initializes a new DanubeClientBuilder. The builder pattern allows for configuring and constructing a DanubeClient instance with optional settings and options.
Returns:
- *DanubeClientBuilder: A new instance of DanubeClientBuilder for configuring and building a DanubeClient.
func (*DanubeClientBuilder) Build ¶
func (b *DanubeClientBuilder) Build() *DanubeClient
Build constructs and returns a DanubeClient instance based on the configuration specified in the builder.
Returns:
- *DanubeClient: A new instance of DanubeClient configured with the specified options.
func (*DanubeClientBuilder) ServiceURL ¶
func (b *DanubeClientBuilder) ServiceURL(url string) *DanubeClientBuilder
ServiceURL sets the base URI for the Danube service in the builder.
Parameters:
- url: The base URI to use for connecting to the Danube service.
Returns:
- *DanubeClientBuilder: The updated builder instance with the new service URL.
func (*DanubeClientBuilder) WithConnectionOptions ¶
func (b *DanubeClientBuilder) WithConnectionOptions(options []DialOption) *DanubeClientBuilder
WithConnectionOptions sets optional connection settings for the client in the builder.
Parameters:
- options: A slice of DialOption used to configure the client's connection settings.
Returns:
- *DanubeClientBuilder: The updated builder instance with the specified connection options.
type DialOption ¶
type DialOption func(*[]grpc.DialOption)
DialOption is a function that configures gRPC dial options.
func WithConnectionTimeout ¶
func WithConnectionTimeout(timeout time.Duration) DialOption
WithConnectionTimeout configures the connection timeout.
func WithKeepAliveInterval ¶
func WithKeepAliveInterval(interval time.Duration) DialOption
WithKeepAliveInterval configures the keepalive interval for the connection.
type LookupResult ¶
type LookupResult struct {
	ResponseType proto.TopicLookupResponse_LookupType
	Addr         string
}
LookupResult holds the result of a topic lookup
type MessageRouter ¶
type MessageRouter struct {
// contains filtered or unexported fields
}
func NewMessageRouter ¶
func NewMessageRouter(partitions int32) *MessageRouter
func (*MessageRouter) RoundRobin ¶
func (router *MessageRouter) RoundRobin() int32
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer represents a message producer that is responsible for sending messages to a specific partitioned or non-partitioned topic on a message broker. It handles producer creation, message sending, and maintains the producer's state.
func (*Producer) Create ¶
Create initializes the producer and registers it with the message brokers.
Parameters:
- ctx: The context for managing request lifecycle and cancellation.
Returns:
- error: An error if producer creation fails.
func (*Producer) Send ¶
func (p *Producer) Send(ctx context.Context, data []byte, attributes map[string]string) (uint64, error)
Send sends a message to the topic associated with this producer.
It constructs a message request from the payload and attributes, sends it to the broker, and reports any error to the caller. It assumes that the producer has been successfully created and is ready to send messages.
Parameters:
- ctx: The context for managing request lifecycle and cancellation.
- data: The message payload to be sent.
- attributes: User-defined properties or attributes associated with the message.
Returns:
- uint64: The sequence ID of the sent message if successful.
- error: An error if message sending fails.
type ProducerBuilder ¶
type ProducerBuilder struct {
// contains filtered or unexported fields
}
ProducerBuilder is a builder for creating a new Producer instance. It allows setting various properties for the producer such as topic, name, schema, and options.
func (*ProducerBuilder) Build ¶
func (pb *ProducerBuilder) Build() (*Producer, error)
Build creates a new Producer instance using the settings configured in the ProducerBuilder. It performs validation to ensure that required fields are set before creating the producer.
Returns:
- *Producer: A pointer to the newly created Producer instance if successful.
- error: An error if required fields are missing or if producer creation fails.
func (*ProducerBuilder) WithDispatchStrategy ¶
func (pb *ProducerBuilder) WithDispatchStrategy(dispatch_strategy *ConfigDispatchStrategy) *ProducerBuilder
WithDispatchStrategy sets the dispatch strategy for the producer. The strategy (non-reliable, or reliable with its ReliableOptions) determines how messages are stored and managed.
Parameters:
- dispatch_strategy: The dispatch strategy for the producer.
func (*ProducerBuilder) WithName ¶
func (pb *ProducerBuilder) WithName(producerName string) *ProducerBuilder
WithName sets the name of the producer. This is a required field.
Parameters:
- producerName: The name assigned to the producer instance.
func (*ProducerBuilder) WithOptions ¶
func (pb *ProducerBuilder) WithOptions(options ProducerOptions) *ProducerBuilder
WithOptions sets the configuration options for the producer. This allows for customization of producer behavior.
Parameters:
- options: Configuration options for the producer.
func (*ProducerBuilder) WithPartitions ¶
func (pb *ProducerBuilder) WithPartitions(partitions int32) *ProducerBuilder
WithPartitions sets the number of topic partitions.
Parameters:
- partitions: The number of partitions for a new topic.
func (*ProducerBuilder) WithSchema ¶
func (pb *ProducerBuilder) WithSchema(schemaName string, schemaType SchemaType, schemaData string) *ProducerBuilder
WithSchema sets the schema for the producer, defining the structure of the messages.
Parameters:
- schemaName: The name of the schema.
- schemaType: The type of the schema (e.g., SchemaType_BYTES, SchemaType_STRING, SchemaType_JSON).
- schemaData: The schema definition. It is used only when schemaType is SchemaType_JSON.
func (*ProducerBuilder) WithTopic ¶
func (pb *ProducerBuilder) WithTopic(topic string) *ProducerBuilder
WithTopic sets the topic name for the producer. This is a required field.
Parameters:
- topic: The name of the topic for the producer.
type ProducerOptions ¶
type ProducerOptions struct { }
type ReliableOptions ¶
type ReliableOptions struct {
	SegmentSize     int64
	RetentionPolicy RetentionPolicy
	RetentionPeriod uint64
}
ReliableOptions represents configuration options for reliable dispatch strategy
func NewReliableOptions ¶
func NewReliableOptions(segmentSize int64, retentionPolicy RetentionPolicy, retentionPeriod uint64) *ReliableOptions
NewReliableOptions creates a new ReliableOptions instance
type RetentionPolicy ¶
type RetentionPolicy int
RetentionPolicy represents the retention policy for messages in the topic
const (
	RetainUntilAck RetentionPolicy = iota
	RetainUntilExpire
)
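Since the constants are declared with iota, RetainUntilAck has the value 0 and RetainUntilExpire the value 1. The short sketch below redeclares the type locally to show this; the `String` method is a hypothetical convenience that this reference does not document.

```go
package main

import "fmt"

// Local copy of the documented declaration.
type RetentionPolicy int

const (
	RetainUntilAck    RetentionPolicy = iota // 0
	RetainUntilExpire                        // 1
)

// String is a hypothetical helper for readable output.
func (p RetentionPolicy) String() string {
	switch p {
	case RetainUntilAck:
		return "RetainUntilAck"
	case RetainUntilExpire:
		return "RetainUntilExpire"
	default:
		return fmt.Sprintf("RetentionPolicy(%d)", int(p))
	}
}

func main() {
	fmt.Println(int(RetainUntilAck), RetainUntilAck)
	fmt.Println(int(RetainUntilExpire), RetainUntilExpire)
}
```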
type Schema ¶
type Schema struct {
	Name       string
	SchemaData []byte
	TypeSchema SchemaType
}
Schema represents the structure of data, including its type and associated schema data. It is used to define how data should be serialized, deserialized, and validated.
Fields:
- Name: The name of the schema. This is typically used for identification purposes.
- SchemaData: The schema data itself, which contains the schema's definition. It is used only when TypeSchema is SchemaType_JSON.
- TypeSchema: The type of schema that determines the format of the data (e.g., JSON, STRING).
func FromProtoSchema ¶
FromProtoSchema converts a Protobuf Schema to a Schema.
func NewSchema ¶
func NewSchema(name string, schemaType SchemaType, jsonSchema string) *Schema
NewSchema creates a new Schema instance with the specified name, type, and optional JSON schema data. It initializes the Schema with appropriate schema data based on the type.
Parameters:
- name: The name assigned to the schema.
- schemaType: The type of schema that determines how data is structured (e.g., JSON, STRING).
- jsonSchema: The JSON schema data used if the schemaType is SchemaType_JSON. It is ignored for other schema types.
Returns:
- *Schema: A pointer to the newly created Schema instance.
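The rule that jsonSchema only matters for SchemaType_JSON can be sketched as follows. The types are local copies of the documented declarations, and `newSchema` is an assumed implementation that leaves SchemaData empty for every other type; the library may represent the ignored case differently.

```go
package main

import "fmt"

// Local copies of the documented declarations.
type SchemaType int32

const (
	SchemaType_BYTES  SchemaType = 0
	SchemaType_STRING SchemaType = 1
	SchemaType_INT64  SchemaType = 2
	SchemaType_JSON   SchemaType = 3
)

type Schema struct {
	Name       string
	SchemaData []byte
	TypeSchema SchemaType
}

// newSchema is a hypothetical implementation: it keeps the JSON schema text
// only for SchemaType_JSON and ignores it otherwise.
func newSchema(name string, schemaType SchemaType, jsonSchema string) *Schema {
	var data []byte
	if schemaType == SchemaType_JSON {
		data = []byte(jsonSchema)
	}
	return &Schema{Name: name, SchemaData: data, TypeSchema: schemaType}
}

func main() {
	jsonDef := `{"type":"object"}`

	asJSON := newSchema("orders", SchemaType_JSON, jsonDef)
	asString := newSchema("orders", SchemaType_STRING, jsonDef)

	fmt.Println(len(asJSON.SchemaData) > 0)  // prints true
	fmt.Println(len(asString.SchemaData) > 0) // prints false
}
```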
func (*Schema) JSONSchema ¶
JSONSchema converts the JSON schema data to a Go string.
type SchemaType ¶
type SchemaType int32
SchemaType represents the type of schema used for data serialization and validation. It defines the possible types of schemas that can be applied to data.
Constants:
- SchemaType_BYTES: Represents a schema where data is in raw bytes format.
- SchemaType_STRING: Represents a schema where data is in string format.
- SchemaType_INT64: Represents a schema where data is in 64-bit integer format.
- SchemaType_JSON: Represents a schema where data is in JSON format.
const (
	SchemaType_BYTES  SchemaType = 0
	SchemaType_STRING SchemaType = 1
	SchemaType_INT64  SchemaType = 2
	SchemaType_JSON   SchemaType = 3
)
func FromProtoTypeSchema ¶
func FromProtoTypeSchema(protoSchema proto.Schema_TypeSchema) SchemaType
FromProtoTypeSchema converts a Protobuf TypeSchema to a SchemaType.
func (SchemaType) ToProto ¶
func (s SchemaType) ToProto() proto.Schema_TypeSchema
ToProto converts a SchemaType to its Protobuf representation.