watcher

package module
v1.1.0
Published: Jan 12, 2024 License: MIT Imports: 13 Imported by: 0

README

Casbin WatcherEX based on the Go Cloud Development Kit

Casbin WatcherEX built on top of gocloud.dev.

Installation

go get github.com/bartventer/casbin-go-cloud-watcher

Usage

Configuration differs slightly between providers, because each one reads different settings from the environment. You can read more about URLs and configuration here: https://gocloud.dev/concepts/urls/.

Supported providers: NATS, in-memory, GCP Cloud Pub/Sub, Amazon SNS, Amazon SQS, Azure Service Bus, and Kafka.

You can view provider configuration examples here: https://github.com/google/go-cloud/tree/master/pubsub.
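
As a rough illustration of how the URLs used throughout this README break down, the sketch below splits a watcher URL with the standard library: the scheme selects the driver, and the host plus path usually name the topic. The helper splitWatcherURL is hypothetical and not part of this package; the drivers do their own parsing.

```go
package main

import (
	"fmt"
	"net/url"
)

// splitWatcherURL is a hypothetical helper, not part of the watcher package.
// It returns the URL scheme (which selects the driver) and the host plus
// path (which most drivers treat as the topic name).
func splitWatcherURL(raw string) (scheme, topic string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", err
	}
	return u.Scheme, u.Host + u.Path, nil
}

func main() {
	for _, raw := range []string{
		"mem://topicA",
		"kafka://my-topic",
		"gcppubsub://projects/myproject/topics/mytopic",
	} {
		scheme, topic, err := splitWatcherURL(raw)
		if err != nil {
			fmt.Println("invalid URL:", err)
			continue
		}
		fmt.Printf("driver=%s topic=%s\n", scheme, topic)
	}
}
```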

NATS
package main

import (
    "context"
    "log"
    "os"

    cloudwatcher "github.com/bartventer/casbin-go-cloud-watcher"
    // Enable NATS driver
    _ "github.com/bartventer/casbin-go-cloud-watcher/drivers/natspubsub"

    "github.com/casbin/casbin/v2"
)

func main() {
    // This URL will dial the NATS server at the URL in the environment variable
    // NATS_SERVER_URL and send messages with subject "casbin-policy-updates".
    os.Setenv("NATS_SERVER_URL", "nats://localhost:4222")
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    watcher, err := cloudwatcher.New(ctx, "nats://casbin-policy-updates")
    if err != nil {
        log.Fatal(err)
    }

    enforcer, err := casbin.NewSyncedEnforcer("model.conf", "policy.csv")
    if err != nil {
        log.Fatal(err)
    }
    if err := enforcer.SetWatcher(watcher); err != nil {
        log.Fatal(err)
    }

    // Reload the policy whenever another instance reports a change.
    err = watcher.SetUpdateCallback(func(m string) {
        if err := enforcer.LoadPolicy(); err != nil {
            log.Printf("failed to reload policy: %v", err)
        }
    })
    if err != nil {
        log.Fatal(err)
    }
}
In Memory
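package main

import (
    "context"
    "log"

    cloudwatcher "github.com/bartventer/casbin-go-cloud-watcher"
    // Enable in-memory driver
    _ "github.com/bartventer/casbin-go-cloud-watcher/drivers/mempubsub"

    "github.com/casbin/casbin/v2"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    watcher, err := cloudwatcher.New(ctx, "mem://topicA")
    if err != nil {
        log.Fatal(err)
    }

    enforcer, err := casbin.NewSyncedEnforcer("model.conf", "policy.csv")
    if err != nil {
        log.Fatal(err)
    }
    if err := enforcer.SetWatcher(watcher); err != nil {
        log.Fatal(err)
    }

    err = watcher.SetUpdateCallback(func(m string) {
        if err := enforcer.LoadPolicy(); err != nil {
            log.Printf("failed to reload policy: %v", err)
        }
    })
    if err != nil {
        log.Fatal(err)
    }
}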
GCP Cloud Pub/Sub

URLs are gcppubsub://projects/myproject/topics/mytopic. The URLs use the project ID and the topic ID. See Application Default Credentials to learn about authentication alternatives, including using environment variables.

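package main

import (
    "context"
    "log"

    cloudwatcher "github.com/bartventer/casbin-go-cloud-watcher"
    // Enable GCP Pub/Sub driver
    _ "github.com/bartventer/casbin-go-cloud-watcher/drivers/gcppubsub"

    "github.com/casbin/casbin/v2"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    watcher, err := cloudwatcher.New(ctx, "gcppubsub://projects/myproject/topics/mytopic")
    if err != nil {
        log.Fatal(err)
    }

    enforcer, err := casbin.NewSyncedEnforcer("model.conf", "policy.csv")
    if err != nil {
        log.Fatal(err)
    }
    if err := enforcer.SetWatcher(watcher); err != nil {
        log.Fatal(err)
    }

    // Reload the policy whenever another instance reports a change.
    err = watcher.SetUpdateCallback(func(m string) {
        if err := enforcer.LoadPolicy(); err != nil {
            log.Printf("failed to reload policy: %v", err)
        }
    })
    if err != nil {
        log.Fatal(err)
    }
}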
Amazon Simple Notification Service (SNS)

Watcher can publish to an Amazon Simple Notification Service (SNS) topic. SNS URLs in the Go CDK use the Amazon Resource Name (ARN) to identify the topic. You should specify the region query parameter to ensure your application connects to the correct region.

Watcher will create a default AWS Session with the SharedConfigEnable option enabled; if you have authenticated with the AWS CLI, it will use those credentials. See AWS Session to learn about authentication alternatives, including using environment variables.

package main

import (
    "context"
    "log"

    cloudwatcher "github.com/bartventer/casbin-go-cloud-watcher"
    // Enable AWS SQS & SNS driver
    _ "github.com/bartventer/casbin-go-cloud-watcher/drivers/awssnssqs"

    "github.com/casbin/casbin/v2"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    const topicARN = "arn:aws:sns:us-east-2:123456789012:mytopic"
    // Note the 3 slashes; ARNs have multiple colons and therefore aren't valid
    // as hostnames in the URL.
    watcher, err := cloudwatcher.New(ctx, "awssns:///"+topicARN+"?region=us-east-2")
    if err != nil {
        log.Fatal(err)
    }

    enforcer, err := casbin.NewSyncedEnforcer("model.conf", "policy.csv")
    if err != nil {
        log.Fatal(err)
    }
    if err := enforcer.SetWatcher(watcher); err != nil {
        log.Fatal(err)
    }

    // Reload the policy whenever another instance reports a change.
    err = watcher.SetUpdateCallback(func(m string) {
        if err := enforcer.LoadPolicy(); err != nil {
            log.Printf("failed to reload policy: %v", err)
        }
    })
    if err != nil {
        log.Fatal(err)
    }
}
Amazon Simple Queue Service (SQS)

Watcher can publish to an Amazon Simple Queue Service (SQS) queue. SQS URLs closely resemble the queue URL, except the leading https:// is replaced with awssqs://. You can specify the region query parameter to ensure your application connects to the correct region; otherwise, the watcher uses the region found in the environment variables or your AWS CLI configuration.

package main

import (
    "context"
    "log"

    cloudwatcher "github.com/bartventer/casbin-go-cloud-watcher"
    // Enable AWS SQS & SNS driver
    _ "github.com/bartventer/casbin-go-cloud-watcher/drivers/awssnssqs"

    "github.com/casbin/casbin/v2"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // The queue URL without its leading https://, which awssqs:// replaces.
    // https://docs.aws.amazon.com/sdk-for-net/v2/developer-guide/QueueURL.html
    const queueURL = "sqs.us-east-2.amazonaws.com/123456789012/myqueue"
    watcher, err := cloudwatcher.New(ctx, "awssqs://"+queueURL+"?region=us-east-2")
    if err != nil {
        log.Fatal(err)
    }

    enforcer, err := casbin.NewSyncedEnforcer("model.conf", "policy.csv")
    if err != nil {
        log.Fatal(err)
    }
    if err := enforcer.SetWatcher(watcher); err != nil {
        log.Fatal(err)
    }

    err = watcher.SetUpdateCallback(func(m string) {
        if err := enforcer.LoadPolicy(); err != nil {
            log.Printf("failed to reload policy: %v", err)
        }
    })
    if err != nil {
        log.Fatal(err)
    }
}
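
If you start from a full https:// queue URL, as shown in the AWS console, the scheme swap described in this section can be done with a small helper. The sketch below is an assumption-labeled convenience: sqsWatcherURL is hypothetical and not part of this package.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// sqsWatcherURL is a hypothetical helper, not part of the watcher package.
// It replaces the leading https:// of an SQS queue URL with awssqs:// and
// appends the region query parameter when one is given.
func sqsWatcherURL(queueURL, region string) string {
	raw := "awssqs://" + strings.TrimPrefix(queueURL, "https://")
	if region != "" {
		raw += "?region=" + url.QueryEscape(region)
	}
	return raw
}

func main() {
	const queueURL = "https://sqs.us-east-2.amazonaws.com/123456789012/myqueue"
	fmt.Println(sqsWatcherURL(queueURL, "us-east-2"))
	// With no region, the URL carries no query string and the region is
	// left to the environment or AWS CLI configuration.
	fmt.Println(sqsWatcherURL(queueURL, ""))
}
```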
Azure Service Bus

Watcher can publish to an Azure Service Bus topic over AMQP 1.0. The URL for publishing is the topic name. pubsub.OpenTopic will use the environment variable SERVICEBUS_CONNECTION_STRING to obtain the Service Bus connection string. The connection string can be obtained from the Azure portal.

package main

import (
    "context"
    "log"

    cloudwatcher "github.com/bartventer/casbin-go-cloud-watcher"
    // Enable Azure Service Bus driver
    _ "github.com/bartventer/casbin-go-cloud-watcher/drivers/azuresb"

    "github.com/casbin/casbin/v2"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Watcher creates a *pubsub.Topic from a URL.
    // This URL will open the topic "mytopic" using a connection string
    // from the environment variable SERVICEBUS_CONNECTION_STRING.
    watcher, err := cloudwatcher.New(ctx, "azuresb://mytopic")
    if err != nil {
        log.Fatal(err)
    }

    enforcer, err := casbin.NewSyncedEnforcer("model.conf", "policy.csv")
    if err != nil {
        log.Fatal(err)
    }
    if err := enforcer.SetWatcher(watcher); err != nil {
        log.Fatal(err)
    }

    // Reload the policy whenever another instance reports a change.
    err = watcher.SetUpdateCallback(func(m string) {
        if err := enforcer.LoadPolicy(); err != nil {
            log.Printf("failed to reload policy: %v", err)
        }
    })
    if err != nil {
        log.Fatal(err)
    }
}
Kafka

Watcher can publish to a Kafka cluster. A Kafka URL only includes the topic name. The brokers in the Kafka cluster are discovered from the KAFKA_BROKERS environment variable (which is a comma-delimited list of hosts, something like 1.2.3.4:9092,5.6.7.8:9092).

package main

import (
    "context"
    "log"

    cloudwatcher "github.com/bartventer/casbin-go-cloud-watcher"
    // Enable Kafka driver
    _ "github.com/bartventer/casbin-go-cloud-watcher/drivers/kafkapubsub"

    "github.com/casbin/casbin/v2"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Watcher creates a *pubsub.Topic from a URL.
    // The host + path are the topic name to send to.
    // The set of brokers must be in an environment variable KAFKA_BROKERS.
    watcher, err := cloudwatcher.New(ctx, "kafka://my-topic")
    if err != nil {
        log.Fatal(err)
    }

    enforcer, err := casbin.NewSyncedEnforcer("model.conf", "policy.csv")
    if err != nil {
        log.Fatal(err)
    }
    if err := enforcer.SetWatcher(watcher); err != nil {
        log.Fatal(err)
    }

    // Reload the policy whenever another instance reports a change.
    err = watcher.SetUpdateCallback(func(m string) {
        if err := enforcer.LoadPolicy(); err != nil {
            log.Printf("failed to reload policy: %v", err)
        }
    })
    if err != nil {
        log.Fatal(err)
    }
}
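
Because the broker list comes from KAFKA_BROKERS rather than the URL, it can be useful to check the variable before creating the watcher. The sketch below is a hypothetical pre-flight check (parseBrokers is not part of this package, and the driver may parse the variable differently): it splits the comma-delimited value and drops empty entries.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// parseBrokers is a hypothetical helper, not part of the watcher package.
// It splits a comma-delimited broker list, trims surrounding whitespace,
// and skips empty entries.
func parseBrokers(env string) []string {
	var brokers []string
	for _, b := range strings.Split(env, ",") {
		if b = strings.TrimSpace(b); b != "" {
			brokers = append(brokers, b)
		}
	}
	return brokers
}

func main() {
	// Example value taken from the description above.
	os.Setenv("KAFKA_BROKERS", "1.2.3.4:9092,5.6.7.8:9092")

	brokers := parseBrokers(os.Getenv("KAFKA_BROKERS"))
	if len(brokers) == 0 {
		fmt.Println("KAFKA_BROKERS is empty; the Kafka driver has no brokers to contact")
		return
	}
	fmt.Println(len(brokers), "brokers:", brokers)
}
```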

About Go Cloud Dev

The Go CDK (gocloud.dev) provides portable Cloud APIs in Go. It strives to implement these APIs for the leading cloud providers (AWS, GCP, and Azure) and to provide local (on-prem) implementations such as Kafka and NATS.

Using the Go CDK you can write your application code once using these idiomatic APIs, test locally using the local versions, and then deploy to a cloud provider with only minimal setup-time changes.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgement

This project is based on a fork of casbin-go-cloud-watcher by rusenask.

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrNotConnected = errors.New("pubsub not connected, cannot dispatch update message")
)

Errors

Functions

func DefaultCallback

func DefaultCallback(e casbin.IEnforcer) func(string)

DefaultCallback is the default callback function that the watcher will call when the policy in DB has been changed by other instances.

Types

type MSG

type MSG struct {
	Method      UpdateType `json:"method"`                 // the update method
	ID          string     `json:"id"`                     // the unique ID of the watcher instance
	Sec         string     `json:"sec,omitempty"`          // the section of the policy
	Ptype       string     `json:"ptype,omitempty"`        // the policy type
	OldRules    [][]string `json:"old_rules,omitempty"`    // the old rules
	NewRules    [][]string `json:"new_rules,omitempty"`    // the new rules
	FieldIndex  int        `json:"field_index,omitempty"`  // the field index
	FieldValues []string   `json:"field_values,omitempty"` // the field values
}

MSG is the payload for the pubsub message

func (MSG) MarshalBinary

func (m MSG) MarshalBinary() ([]byte, error)

MarshalBinary implements the encoding.BinaryMarshaler interface.

func (*MSG) UnmarshalBinary

func (m *MSG) UnmarshalBinary(data []byte) error

UnmarshalBinary implements the encoding.BinaryUnmarshaler interface.
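
To make the payload shape concrete, the sketch below mirrors the MSG struct locally and round-trips it through MarshalBinary and UnmarshalBinary. It assumes the binary encoding is JSON, which the struct tags suggest but this page does not confirm; the method bodies here are illustrative, not the package's source.

```go
package main

import (
	"encoding"
	"encoding/json"
	"fmt"
)

// UpdateType and MSG mirror the documented types so the sketch is
// self-contained; they are local copies, not the package's own types.
type UpdateType string

type MSG struct {
	Method      UpdateType `json:"method"`
	ID          string     `json:"id"`
	Sec         string     `json:"sec,omitempty"`
	Ptype       string     `json:"ptype,omitempty"`
	OldRules    [][]string `json:"old_rules,omitempty"`
	NewRules    [][]string `json:"new_rules,omitempty"`
	FieldIndex  int        `json:"field_index,omitempty"`
	FieldValues []string   `json:"field_values,omitempty"`
}

// Assumption: the payload is encoded as JSON.
func (m MSG) MarshalBinary() ([]byte, error)     { return json.Marshal(m) }
func (m *MSG) UnmarshalBinary(data []byte) error { return json.Unmarshal(data, m) }

// Compile-time checks that the local copy satisfies both interfaces.
var (
	_ encoding.BinaryMarshaler   = MSG{}
	_ encoding.BinaryUnmarshaler = (*MSG)(nil)
)

func main() {
	out := MSG{
		Method:   "UpdateForAddPolicy",
		ID:       "node-1",
		Sec:      "p",
		Ptype:    "p",
		NewRules: [][]string{{"alice", "data1", "read"}},
	}
	data, err := out.MarshalBinary()
	if err != nil {
		fmt.Println("marshal:", err)
		return
	}
	fmt.Println(string(data))

	var in MSG
	if err := in.UnmarshalBinary(data); err != nil {
		fmt.Println("unmarshal:", err)
		return
	}
	fmt.Println(in.Method, in.ID, in.NewRules)
}
```

Fields tagged omitempty, such as old_rules and field_index, are left out of the encoded message when they hold their zero value.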

type Option

type Option struct {
	Verbose bool   // Verbose indicates the watcher should print verbose log.
	LocalID string // LocalID indicates the watcher's local ID, used to ignore self update event. Generates a random id if not specified.
}

Option is the option for the watcher.
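
The LocalID field exists so that an instance can ignore its own update events. The sketch below shows one way that behaviour could work, using a local copy of Option. Both resolveLocalID and shouldHandle are hypothetical helpers, and the random ID format is an assumption; the package's actual implementation may differ.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// Option mirrors the documented struct so the sketch is self-contained.
type Option struct {
	Verbose bool
	LocalID string
}

// resolveLocalID is hypothetical: it returns opt.LocalID when set and
// otherwise generates a random 16-character hex ID.
func resolveLocalID(opt Option) (string, error) {
	if opt.LocalID != "" {
		return opt.LocalID, nil
	}
	buf := make([]byte, 8)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

// shouldHandle is hypothetical: it reports whether a message from senderID
// should trigger the callback on the instance identified by localID.
func shouldHandle(localID, senderID string) bool {
	return senderID != localID
}

func main() {
	id, err := resolveLocalID(Option{LocalID: "node-1"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(shouldHandle(id, "node-1")) // own message: skipped
	fmt.Println(shouldHandle(id, "node-2")) // message from a peer: handled
}
```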

type UpdateType

type UpdateType string

UpdateType is the type of update

const (
	Update                        UpdateType = "Update"                        // Update is the default update type
	UpdateForAddPolicy            UpdateType = "UpdateForAddPolicy"            // UpdateForAddPolicy is the update type for AddPolicy
	UpdateForRemovePolicy         UpdateType = "UpdateForRemovePolicy"         // UpdateForRemovePolicy is the update type for RemovePolicy
	UpdateForRemoveFilteredPolicy UpdateType = "UpdateForRemoveFilteredPolicy" // UpdateForRemoveFilteredPolicy is the update type for RemoveFilteredPolicy
	UpdateForSavePolicy           UpdateType = "UpdateForSavePolicy"           // UpdateForSavePolicy is the update type for SavePolicy
	UpdateForAddPolicies          UpdateType = "UpdateForAddPolicies"          // UpdateForAddPolicies is the update type for AddPolicies
	UpdateForRemovePolicies       UpdateType = "UpdateForRemovePolicies"       // UpdateForRemovePolicies is the update type for RemovePolicies
	UpdateForUpdatePolicy         UpdateType = "UpdateForUpdatePolicy"         // UpdateForUpdatePolicy is the update type for UpdatePolicy
	UpdateForUpdatePolicies       UpdateType = "UpdateForUpdatePolicies"       // UpdateForUpdatePolicies is the update type for UpdatePolicies
)
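
A callback that inspects the update type can choose between applying a targeted change and reloading the whole policy. The sketch below copies the constants locally and classifies them; the split between incremental and full-reload types is an assumption made for illustration, and isIncremental is not part of this package.

```go
package main

import "fmt"

// UpdateType and its constants mirror the documented values.
type UpdateType string

const (
	Update                        UpdateType = "Update"
	UpdateForAddPolicy            UpdateType = "UpdateForAddPolicy"
	UpdateForRemovePolicy         UpdateType = "UpdateForRemovePolicy"
	UpdateForRemoveFilteredPolicy UpdateType = "UpdateForRemoveFilteredPolicy"
	UpdateForSavePolicy           UpdateType = "UpdateForSavePolicy"
	UpdateForAddPolicies          UpdateType = "UpdateForAddPolicies"
	UpdateForRemovePolicies       UpdateType = "UpdateForRemovePolicies"
	UpdateForUpdatePolicy         UpdateType = "UpdateForUpdatePolicy"
	UpdateForUpdatePolicies       UpdateType = "UpdateForUpdatePolicies"
)

// isIncremental is hypothetical: it reports whether an update describes a
// specific rule change, as opposed to one that calls for a full reload.
func isIncremental(t UpdateType) bool {
	switch t {
	case UpdateForAddPolicy, UpdateForRemovePolicy, UpdateForRemoveFilteredPolicy,
		UpdateForAddPolicies, UpdateForRemovePolicies,
		UpdateForUpdatePolicy, UpdateForUpdatePolicies:
		return true
	default:
		// Update, UpdateForSavePolicy, and unknown types: reload everything.
		return false
	}
}

func main() {
	for _, t := range []UpdateType{UpdateForAddPolicy, UpdateForSavePolicy, Update} {
		if isIncremental(t) {
			fmt.Println(t, "-> apply the specific rule change")
		} else {
			fmt.Println(t, "-> reload the full policy")
		}
	}
}
```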

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

Watcher implements Casbin updates watcher to synchronize policy changes between the nodes

func New

func New(ctx context.Context, url string) (*Watcher, error)

New creates a new watcher

Parameters:

  • ctx: the context for pubsub connections
  • url: the pubsub url (e.g. "kafka://my-topic")

Returns:

  • Watcher: the new watcher instance
  • error: the error if the watcher cannot be created

func NewWithOption

func NewWithOption(ctx context.Context, url string, opt Option) (*Watcher, error)

NewWithOption creates a new watcher with the option

Parameters:

  • ctx: the context for pubsub connections
  • url: the pubsub url (e.g. "kafka://my-topic")
  • opt: the watcher option

Returns:

  • Watcher: the new watcher instance
  • error: the error if the watcher cannot be created

func (*Watcher) Close

func (w *Watcher) Close()

Close stops and releases the watcher, the callback function will not be called any more.

func (*Watcher) GetLocalID

func (w *Watcher) GetLocalID() string

GetLocalID returns the local ID set in the watcher's option.

func (*Watcher) GetVerbose

func (w *Watcher) GetVerbose() bool

GetVerbose returns the verbose setting from the watcher's option.

func (*Watcher) SetUpdateCallback

func (w *Watcher) SetUpdateCallback(callbackFunc func(string)) error

SetUpdateCallback sets the callback function that the watcher will call when the policy in DB has been changed by other instances. A classic callback is Enforcer.LoadPolicy().

func (*Watcher) Update

func (w *Watcher) Update() error

Update calls the update callback of other instances to synchronize their policy. It is usually called after changing the policy in DB, like Enforcer.SavePolicy(), Enforcer.AddPolicy(), Enforcer.RemovePolicy(), etc.

func (*Watcher) UpdateForAddPolicies

func (w *Watcher) UpdateForAddPolicies(sec string, ptype string, rules ...[]string) error

UpdateForAddPolicies calls the update callback of other instances to synchronize their policy. It is called after Enforcer.AddPolicies(), Enforcer.AddNamedPolicies(), Enforcer.AddGroupingPolicies() and Enforcer.AddNamedGroupingPolicies().

func (*Watcher) UpdateForAddPolicy

func (w *Watcher) UpdateForAddPolicy(sec string, ptype string, params ...string) error

UpdateForAddPolicy calls the update callback of other instances to synchronize their policy. It is called after a policy is added via Enforcer.AddPolicy(), Enforcer.AddNamedPolicy(), Enforcer.AddGroupingPolicy() and Enforcer.AddNamedGroupingPolicy().

func (*Watcher) UpdateForRemoveFilteredPolicy

func (w *Watcher) UpdateForRemoveFilteredPolicy(sec string, ptype string, fieldIndex int, fieldValues ...string) error

UpdateForRemoveFilteredPolicy calls the update callback of other instances to synchronize their policy. It is called after Enforcer.RemoveFilteredPolicy(), Enforcer.RemoveFilteredNamedPolicy(), Enforcer.RemoveFilteredGroupingPolicy() and Enforcer.RemoveFilteredNamedGroupingPolicy().

func (*Watcher) UpdateForRemovePolicies

func (w *Watcher) UpdateForRemovePolicies(sec string, ptype string, rules ...[]string) error

UpdateForRemovePolicies calls the update callback of other instances to synchronize their policy. It is called after Enforcer.RemovePolicies(), Enforcer.RemoveNamedPolicies(), Enforcer.RemoveGroupingPolicies() and Enforcer.RemoveNamedGroupingPolicies().

func (*Watcher) UpdateForRemovePolicy

func (w *Watcher) UpdateForRemovePolicy(sec string, ptype string, params ...string) error

UpdateForRemovePolicy calls the update callback of other instances to synchronize their policy. It is called after a policy is removed by Enforcer.RemovePolicy(), Enforcer.RemoveNamedPolicy(), Enforcer.RemoveGroupingPolicy() and Enforcer.RemoveNamedGroupingPolicy().

func (*Watcher) UpdateForSavePolicy

func (w *Watcher) UpdateForSavePolicy(model model.Model) error

UpdateForSavePolicy calls the update callback of other instances to synchronize their policy. It is called after Enforcer.SavePolicy().

func (*Watcher) UpdateForUpdatePolicies

func (w *Watcher) UpdateForUpdatePolicies(sec string, ptype string, oldRules, newRules [][]string) error

UpdateForUpdatePolicies calls the update callback of other instances to synchronize their policy. It is called after Enforcer.UpdatePolicies().

func (*Watcher) UpdateForUpdatePolicy

func (w *Watcher) UpdateForUpdatePolicy(sec string, ptype string, oldRule, newRule []string) error

UpdateForUpdatePolicy calls the update callback of other instances to synchronize their policy. It is called after Enforcer.UpdatePolicy().

Directories

Path Synopsis
drivers
