com

package
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 11, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitKafkaTopic

func InitKafkaTopic(bootstrapUrl string, partitionNumber int, topics ...string) (err error)

func NewKafkaReader

func NewKafkaReader(config configuration.Config, topic model.Topic, handler ReadHandler) (reader *kafka.Reader, err error)

func NewKafkaWriter

func NewKafkaWriter(config configuration.Config, topic model.Topic) *kafka.Writer

Types

type Bypass

type Bypass struct {
	// contains filtered or unexported fields
}

func (*Bypass) Close

func (this *Bypass) Close() error

func (*Bypass) SendPermissions

func (this *Bypass) SendPermissions(ctx context.Context, topic model.Topic, id string, permissions model.ResourcePermissions) (err error)

type BypassProvider

type BypassProvider struct{}

func NewBypassProvider

func NewBypassProvider() *BypassProvider

func (*BypassProvider) Get

func (this *BypassProvider) Get(config configuration.Config, topic model.Topic, readHandler ReadHandler) (Com, error)

type Com

type Com interface {
	Close() error
	SendPermissions(ctx context.Context, topic model.Topic, id string, permissions model.ResourcePermissions) (err error)
}

type Command

type Command struct {
	Command string               `json:"command"`
	Id      string               `json:"id"`
	Rights  *ResourcePermissions `json:"rights"`
	Owner   string               `json:"owner,omitempty"`
}

type KafkaCom

type KafkaCom struct {
	// contains filtered or unexported fields
}

func (*KafkaCom) Close

func (this *KafkaCom) Close() (err error)

func (*KafkaCom) SendPermissions

func (this *KafkaCom) SendPermissions(ctx context.Context, topic model.Topic, id string, permissions model.ResourcePermissions) (err error)

type KafkaComProvider

type KafkaComProvider struct{}

func NewKafkaComProvider

func NewKafkaComProvider() *KafkaComProvider

func (*KafkaComProvider) Get

func (this *KafkaComProvider) Get(config configuration.Config, topic model.Topic, readHandler ReadHandler) (result Com, err error)

type KeySeparationBalancer

type KeySeparationBalancer struct {
	SubBalancer kafka.Balancer
	Seperator   string
}

func (*KeySeparationBalancer) Balance

func (this *KeySeparationBalancer) Balance(msg kafka.Message, partitions ...int) (partition int)

type Provider

type Provider interface {
	Get(config configuration.Config, topic model.Topic, readHandler ReadHandler) (Com, error)
}

type ReadHandler

type ReadHandler interface {
	HandleReceivedCommand(topic model.Topic, r model.Resource, t time.Time) error
	HandleReaderError(topic model.Topic, err error)
	HandleResourceUpdate(topic model.Topic, id string, owner string) error
	HandleResourceDelete(topic model.Topic, id string) error
}

type ResourcePermissions

type ResourcePermissions struct {
	UserRights  map[string]Right `json:"user_rights"`
	GroupRights map[string]Right `json:"group_rights"`
}
type Right struct {
	Read         bool `json:"read"`
	Write        bool `json:"write"`
	Execute      bool `json:"execute"`
	Administrate bool `json:"administrate"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL