templates

package
v0.18.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2021 License: Apache-2.0 Imports: 11 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

View Source
var Funcs = template.FuncMap{
	"resourceBelongsToProject": utils.IsProjectResource,
	"join":                     strings.Join,
	"lowercase":                strings.ToLower,
	"lower_camel":              strcase.ToLowerCamel,
	"upper_camel":              strcase.ToCamel,
	"snake":                    strcase.ToSnake,
	"p":                        gendoc.PFilter,
	"para":                     gendoc.ParaFilter,
	"nobr":                     gendoc.NoBrFilter,
	"fieldType":                fieldType,
	"yamlType":                 yamlType,
	"noescape":                 noEscape,
	"printfptr":                printPointer,
	"remove_magic_comments": func(in string) string {
		lines := strings.Split(in, "\n")
		var linesWithoutMagicComments []string
		for _, line := range lines {
			if magicCommentRegex.MatchString(line) {
				continue
			}
			linesWithoutMagicComments = append(linesWithoutMagicComments, line)
		}
		return strings.Join(linesWithoutMagicComments, "\n")
	},
	"new_str_slice": func() *[]string {
		var v []string
		return &v
	},
	"append_str_slice": func(to *[]string, str string) *[]string {
		*to = append(*to, str)
		return to
	},
	"join_str_slice": func(slc *[]string, sep string) string {
		return strings.Join(*slc, sep)
	},
	"new_bool": func() *bool {
		var v bool
		return &v
	},
	"set_bool": func(v *bool, val bool) *bool {
		*v = val
		return v
	},
	"unique": func(vals []string) []string {
		result := make([]string, 0, len(vals))
		for _, v := range vals {
			if !stringutils.ContainsString(v, result) {
				result = append(result, v)
			}
		}
		return result
	},
	"backtick": func() string {
		return "`"
	},
}
View Source
var ProjectTestSuiteTemplate = template.Must(template.New("project_template").Funcs(Funcs).Parse(`package {{ .ProjectConfig.Version }}

{{- $uniqueCrds := new_str_slice }}
{{- range .Resources}}
{{- if  ne .ProtoPackage ""}}
{{- $uniqueCrds := (append_str_slice $uniqueCrds  (printf "%v.%v"  .PluralName .ProtoPackage))}}
{{- end }}
{{- end }}
{{- $uniqueCrds := (unique $uniqueCrds)}}

import (
	"context"
	"testing"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
	
	"github.com/solo-io/k8s-utils/kubeutils"
	"github.com/solo-io/k8s-utils/testutils/clusterlock"
	"github.com/solo-io/solo-kit/test/testutils"
	apiexts "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func Test{{ upper_camel .ProjectConfig.Name }}(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "{{ upper_camel .ProjectConfig.Name }} Suite")
}


var (	
	lock      *clusterlock.TestClusterLocker
	cfg       *rest.Config

	_ = SynchronizedAfterSuite(func() {}, func() {
		if os.Getenv("RUN_KUBE_TESTS") != "1" {
			return
		}
		ctx := context.Background()
		var err error
		cfg, err = kubeutils.GetConfig("", "")
		Expect(err).NotTo(HaveOccurred())
		clientset, err := apiexts.NewForConfig(cfg)
		Expect(err).NotTo(HaveOccurred())
		
		{{- range $uniqueCrds}}
		err = clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Delete(ctx, "{{lowercase .}}", metav1.DeleteOptions{})
		testutils.ErrorNotOccuredOrNotFound(err)
		{{- end}}
		Expect(lock.ReleaseLock()).NotTo(HaveOccurred())
	})

	_ = SynchronizedBeforeSuite(func() []byte {
		if os.Getenv("RUN_KUBE_TESTS") != "1" {
			return nil
		}
		var err error
		cfg, err = kubeutils.GetConfig("", "")
		Expect(err).NotTo(HaveOccurred())
		clientset, err := kubernetes.NewForConfig(cfg)
		Expect(err).NotTo(HaveOccurred())
		lock, err = clusterlock.NewTestClusterLocker(clientset, clusterlock.Options{})
		Expect(err).NotTo(HaveOccurred())
		Expect(lock.AcquireLock()).NotTo(HaveOccurred())
		return nil
	}, func([]byte) {})

)


`))
View Source
var ResourceClientTemplate = template.Must(template.New("resource_reconciler").Funcs(Funcs).Parse(`package {{ .Project.ProjectConfig.Version }}

import (
	"context"

	"github.com/solo-io/solo-kit/pkg/api/v1/clients"
	"github.com/solo-io/solo-kit/pkg/api/v1/clients/factory"
	"github.com/solo-io/solo-kit/pkg/api/v1/resources"
	"github.com/solo-io/solo-kit/pkg/errors"
)

type {{ .Name }}Watcher interface {
{{- if .ClusterScoped }}
	// watch cluster-scoped {{ .PluralName }}
	Watch(opts clients.WatchOpts) (<-chan {{ .Name }}List, <-chan error, error)
{{- else }}
	// watch namespace-scoped {{ .PluralName }}
	Watch(namespace string, opts clients.WatchOpts) (<-chan {{ .Name }}List, <-chan error, error)
{{- end }}
}

type {{ .Name }}Client interface {
	BaseClient() clients.ResourceClient
	Register() error
{{- if .ClusterScoped }}
	Read(name string, opts clients.ReadOpts) (*{{ .Name }}, error)
{{- else }}
	Read(namespace, name string, opts clients.ReadOpts) (*{{ .Name }}, error)
{{- end }}
	Write(resource *{{ .Name }}, opts clients.WriteOpts) (*{{ .Name }}, error)
{{- if .ClusterScoped }}
	Delete(name string, opts clients.DeleteOpts) error
	List(opts clients.ListOpts) ({{ .Name }}List, error)
{{- else }}
	Delete(namespace, name string, opts clients.DeleteOpts) error
	List(namespace string, opts clients.ListOpts) ({{ .Name }}List, error)
{{- end }}
	{{ .Name }}Watcher
}

type {{ lower_camel .Name }}Client struct {
	rc clients.ResourceClient
}

func New{{ .Name }}Client(ctx context.Context, rcFactory factory.ResourceClientFactory) ({{ .Name }}Client, error) {
	return New{{ .Name }}ClientWithToken(ctx, rcFactory, "")
}

func New{{ .Name }}ClientWithToken(ctx context.Context, rcFactory factory.ResourceClientFactory, token string) ({{ .Name }}Client, error) {
	rc, err := rcFactory.NewResourceClient(ctx, factory.NewResourceClientParams{
		ResourceType: &{{ .Name }}{},
		Token: token,
	})
	if err != nil {
		return nil, errors.Wrapf(err, "creating base {{ .Name }} resource client")
	}
	return New{{ .Name }}ClientWithBase(rc), nil
}

func New{{ .Name }}ClientWithBase(rc clients.ResourceClient) {{ .Name }}Client {
	return &{{ lower_camel .Name }}Client{
		rc: rc,
	}
}

func (client *{{ lower_camel .Name }}Client) BaseClient() clients.ResourceClient {
	return client.rc
}

func (client *{{ lower_camel .Name }}Client) Register() error {
	return client.rc.Register()
}

{{ if .ClusterScoped }}
func (client *{{ lower_camel .Name }}Client) Read(name string, opts clients.ReadOpts) (*{{ .Name }}, error) {
{{- else }}
func (client *{{ lower_camel .Name }}Client) Read(namespace, name string, opts clients.ReadOpts) (*{{ .Name }}, error) {
{{- end }}
	opts = opts.WithDefaults()
{{ if .ClusterScoped }}
	resource, err := client.rc.Read("", name, opts)
{{- else }}
	resource, err := client.rc.Read(namespace, name, opts)
{{- end }}
	if err != nil {
		return nil, err
	}
	return resource.(*{{ .Name }}), nil
}

func (client *{{ lower_camel .Name }}Client) Write({{ lower_camel .Name }} *{{ .Name }}, opts clients.WriteOpts) (*{{ .Name }}, error) {
	opts = opts.WithDefaults()
	resource, err := client.rc.Write({{ lower_camel .Name }}, opts)
	if err != nil {
		return nil, err
	}
	return resource.(*{{ .Name }}), nil
}

{{ if .ClusterScoped }}
func (client *{{ lower_camel .Name }}Client) Delete(name string, opts clients.DeleteOpts) error {
{{- else }}
func (client *{{ lower_camel .Name }}Client) Delete(namespace, name string, opts clients.DeleteOpts) error {
{{- end }}
	opts = opts.WithDefaults()
{{ if .ClusterScoped }}
	return client.rc.Delete("", name, opts)
{{- else }}
	return client.rc.Delete(namespace, name, opts)
{{- end }}
}

{{ if .ClusterScoped }}
func (client *{{ lower_camel .Name }}Client) List(opts clients.ListOpts) ({{ .Name }}List, error) {
{{- else }}
func (client *{{ lower_camel .Name }}Client) List(namespace string, opts clients.ListOpts) ({{ .Name }}List, error) {
{{- end }}
	opts = opts.WithDefaults()
{{ if .ClusterScoped }}
	resourceList, err := client.rc.List("", opts)
{{- else }}
	resourceList, err := client.rc.List(namespace, opts)
{{- end }}
	if err != nil {
		return nil, err
	}
	return convertTo{{ .Name }}(resourceList), nil
}

{{ if .ClusterScoped }}
func (client *{{ lower_camel .Name }}Client) Watch(opts clients.WatchOpts) (<-chan {{ .Name }}List, <-chan error, error) {
{{- else }}
func (client *{{ lower_camel .Name }}Client) Watch(namespace string, opts clients.WatchOpts) (<-chan {{ .Name }}List, <-chan error, error) {
{{- end }}
	opts = opts.WithDefaults()
{{ if .ClusterScoped }}
	resourcesChan, errs, initErr := client.rc.Watch("", opts)
{{- else }}
	resourcesChan, errs, initErr := client.rc.Watch(namespace, opts)
{{- end }}
	if initErr != nil {
		return nil, nil, initErr
	}
	{{ lower_camel .PluralName }}Chan := make(chan {{ .Name }}List)
	go func() {
		for {
			select {
			case resourceList := <-resourcesChan:
				select {
					case {{ lower_camel .PluralName }}Chan <- convertTo{{ .Name }}(resourceList):
					case <-opts.Ctx.Done():
						close({{ lower_camel .PluralName }}Chan)
						return
				}
			case <-opts.Ctx.Done():
				close({{ lower_camel .PluralName }}Chan)
				return
			}
		}
	}()
	return {{ lower_camel .PluralName }}Chan, errs, nil
}

func convertTo{{ .Name }}(resources resources.ResourceList) {{ .Name }}List {
	var {{ lower_camel .Name }}List {{ .Name }}List
	for _, resource := range resources {
		{{ lower_camel .Name }}List = append({{ lower_camel .Name }}List, resource.(*{{ .Name }}))
	}
	return {{ lower_camel .Name }}List
}

`))
View Source
var ResourceClientTestTemplate = template.Must(template.New("resource_client_test").Funcs(Funcs).Parse(`// +build solokit

package {{ .Project.ProjectConfig.Version }}

import (
	"context"
	"time"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
	"github.com/solo-io/solo-kit/test/helpers"
	"github.com/solo-io/solo-kit/pkg/api/v1/clients"
	"github.com/solo-io/solo-kit/pkg/api/v1/resources"
	"github.com/solo-io/solo-kit/pkg/errors"
	"github.com/solo-io/solo-kit/pkg/api/v1/resources/core"
	"github.com/solo-io/solo-kit/test/helpers"
	"github.com/solo-io/solo-kit/test/tests/typed"
)

var _ = Describe("{{ .Name }}Client", func() {
	var ctx context.Context
{{- if (not .ClusterScoped) }}
	var (
		namespace string
	)
{{- end }}
	for _, test := range []typed.ResourceClientTester{
{{- if (not .IsCustom) }}
		&typed.KubeRcTester{Crd: {{ .Name }}Crd},
{{- end }}
{{- /* cluster-scoped resources are currently only supported by crd client */}}
{{- if (not .ClusterScoped) }}
		&typed.ConsulRcTester{},
		&typed.FileRcTester{},
		&typed.MemoryRcTester{},
		&typed.VaultRcTester{},
		&typed.KubeSecretRcTester{},
		&typed.KubeConfigMapRcTester{},
{{- end }}
	} {
		Context("resource client backed by "+test.Description(), func() {
			var (
				client {{ .Name }}Client
				err    error
				name1, name2, name3 = "foo"+helpers.RandString(3), "boo"+helpers.RandString(3), "goo"+helpers.RandString(3)
			)


{{- if .ClusterScoped }}
{{/* cluster-scoped resources get no namespace, must delete individual resources*/}}
			BeforeEach(func() {
				ctx = context.Background()
				factory := test.Setup(ctx, "")
				client, err = New{{ .Name }}Client(ctx, factory)
				Expect(err).NotTo(HaveOccurred())
			})
			AfterEach(func() {
				client.Delete(name1, clients.DeleteOpts{})
				client.Delete(name2, clients.DeleteOpts{})
				client.Delete(name3, clients.DeleteOpts{})
			})
			It("CRUDs {{ .Name }}s "+test.Description(), func() {
				{{ .Name }}ClientTest(client, name1, name2, name3)
			})
{{- else }}
{{/* non-cluster-scoped resources get a namespace and then the ns is deleted*/}}
			BeforeEach(func() {
				namespace = helpers.RandString(6)
				ctx = context.Background()
				factory := test.Setup(ctx, namespace)
				client, err = New{{ .Name }}Client(ctx, factory)
				Expect(err).NotTo(HaveOccurred())
			})
			AfterEach(func() {
				test.Teardown(ctx, namespace)
			})
			It("CRUDs {{ .Name }}s "+test.Description(), func() {
				{{ .Name }}ClientTest(namespace, client, name1, name2, name3)
			})
{{- end }}
		})
	}
})


{{- if .ClusterScoped }}
func {{ .Name }}ClientTest(client {{ .Name }}Client, name1, name2, name3 string) {
{{- else }}
func {{ .Name }}ClientTest(namespace string, client {{ .Name }}Client, name1, name2, name3 string) {
{{- end }}
	err := client.Register()
	Expect(err).NotTo(HaveOccurred())

	name := name1

{{- if .ClusterScoped }}
	input := New{{ .Name }}("", name)
{{- else }}
	input := New{{ .Name }}(namespace, name)
{{- end }}

	r1, err := client.Write(input, clients.WriteOpts{})
	Expect(err).NotTo(HaveOccurred())

	_, err = client.Write(input, clients.WriteOpts{})
	Expect(err).To(HaveOccurred())
	Expect(errors.IsExist(err)).To(BeTrue())

	Expect(r1).To(BeAssignableToTypeOf(&{{ .Name }}{}))
	Expect(r1.GetMetadata().Name).To(Equal(name))

{{- if (not .ClusterScoped) }}
	Expect(r1.GetMetadata().Namespace).To(Equal(namespace))
{{- end }}
	Expect(r1.GetMetadata().ResourceVersion).NotTo(Equal(input.GetMetadata().ResourceVersion))
	Expect(r1.GetMetadata().Ref()).To(Equal(input.GetMetadata().Ref()))
	{{- range .Fields }}
		{{- if and (not (eq .Name "metadata")) (not .IsOneof) }}
	Expect(r1.{{ upper_camel .Name }}).To(Equal(input.{{ upper_camel .Name }}))
		{{- end }}
	{{- end }}

	_, err = client.Write(input, clients.WriteOpts{
		OverwriteExisting: true,
	})
	Expect(err).To(HaveOccurred())

	resources.UpdateMetadata(input, func(meta *core.Metadata) {
		meta.ResourceVersion = r1.GetMetadata().ResourceVersion
	})
	r1, err = client.Write(input, clients.WriteOpts{
		OverwriteExisting: true,
	})
	Expect(err).NotTo(HaveOccurred())


{{- if .ClusterScoped }}
	read, err := client.Read(name, clients.ReadOpts{})
{{- else }}
	read, err := client.Read(namespace, name, clients.ReadOpts{})
{{- end }}
	Expect(err).NotTo(HaveOccurred())
	Expect(read).To(Equal(r1))


{{- if (not .ClusterScoped) }}
	_, err = client.Read("doesntexist", name, clients.ReadOpts{})
	Expect(err).To(HaveOccurred())
	Expect(errors.IsNotExist(err)).To(BeTrue())
{{- end }}

	name = name2
	input = &{{ .Name }}{}

	input.SetMetadata(&core.Metadata{
		Name:      name,
{{- if (not .ClusterScoped) }}
		Namespace: namespace,
{{- end }}
	})

	r2, err := client.Write(input, clients.WriteOpts{})
	Expect(err).NotTo(HaveOccurred())


{{- if .ClusterScoped }}
	list, err := client.List(clients.ListOpts{})
{{- else }}
	list, err := client.List(namespace, clients.ListOpts{})
{{- end }}
	Expect(err).NotTo(HaveOccurred())
	Expect(list).To(ContainElement(r1))
	Expect(list).To(ContainElement(r2))


{{- if .ClusterScoped }}
	err = client.Delete("adsfw", clients.DeleteOpts{})
{{- else }}
	err = client.Delete(namespace, "adsfw", clients.DeleteOpts{})
{{- end }}
	Expect(err).To(HaveOccurred())
	Expect(errors.IsNotExist(err)).To(BeTrue())


{{- if .ClusterScoped }}
	err = client.Delete("adsfw", clients.DeleteOpts{
{{- else }}
	err = client.Delete(namespace, "adsfw", clients.DeleteOpts{
{{- end }}
		IgnoreNotExist: true,
	})
	Expect(err).NotTo(HaveOccurred())


{{- if .ClusterScoped }}
	err = client.Delete(r2.GetMetadata().Name, clients.DeleteOpts{})
{{- else }}
	err = client.Delete(namespace, r2.GetMetadata().Name, clients.DeleteOpts{})
{{- end }}
	Expect(err).NotTo(HaveOccurred())

	Eventually(func() {{ .Name }}List {
{{- if .ClusterScoped }}
		list, err = client.List(clients.ListOpts{})
{{- else }}
		list, err = client.List(namespace, clients.ListOpts{})
{{- end }}
		Expect(err).NotTo(HaveOccurred())
		return list
	}, time.Second * 10).Should(ContainElement(r1))
	Eventually(func() {{ .Name }}List {
{{- if .ClusterScoped }}
		list, err = client.List(clients.ListOpts{})
{{- else }}
		list, err = client.List(namespace, clients.ListOpts{})
{{- end }}
		Expect(err).NotTo(HaveOccurred())
		return list
	}, time.Second * 10).ShouldNot(ContainElement(r2))

{{- if .ClusterScoped }}
	w, errs, err := client.Watch(clients.WatchOpts{
{{- else }}
	w, errs, err := client.Watch(namespace, clients.WatchOpts{
{{- end }}
		RefreshRate: time.Hour,
	})
	Expect(err).NotTo(HaveOccurred())

	var r3 resources.Resource
	wait := make(chan struct{})
	go func() {
		defer close(wait)
		defer GinkgoRecover()

		resources.UpdateMetadata(r2, func(meta *core.Metadata) {
			meta.ResourceVersion = ""
		})
		r2, err = client.Write(r2, clients.WriteOpts{})
		Expect(err).NotTo(HaveOccurred())

		name = name3
		input = &{{ .Name }}{}
		Expect(err).NotTo(HaveOccurred())
		input.SetMetadata(&core.Metadata{
			Name:      name,
{{- if (not .ClusterScoped) }}
			Namespace: namespace,
{{- end }}
		})

		r3, err = client.Write(input, clients.WriteOpts{})
		Expect(err).NotTo(HaveOccurred())
	}()
	<-wait

	select {
	case err := <-errs:
		Expect(err).NotTo(HaveOccurred())
	case list = <-w:
	case <-time.After(time.Millisecond * 5):
		Fail("expected a message in channel")
	}

	go func() {
		defer GinkgoRecover()
		for {
			select {
			case err := <-errs:
				Expect(err).NotTo(HaveOccurred())
			case <-time.After(time.Second / 4):
				return
			}
		}
	}()

	Eventually(w, time.Second*5, time.Second/10).Should(Receive(And(ContainElement(r1), ContainElement(r3), ContainElement(r3))))
}
`))
View Source
var ResourceGroupEmitterTemplate = template.Must(template.New("resource_group_emitter").Funcs(Funcs).Parse(
	`package {{ .Project.ProjectConfig.Version }}

{{- $client_declarations := new_str_slice }}
{{- $clients := new_str_slice }}
{{- range .Resources}}
{{- $client_declarations := (append_str_slice $client_declarations (printf "%vClient %v%vClient"  (lower_camel .Name) .ImportPrefix .Name)) }}
{{- $clients := (append_str_slice $clients (printf "%vClient"  (lower_camel .Name))) }}
{{- end}}
{{- $client_declarations := (join_str_slice $client_declarations ", ") }}
{{- $clients := (join_str_slice $clients ", ") }}

import (
	"fmt"
	"sync"
	"time"

	{{ .Imports }}
	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
	"go.uber.org/zap"


	"github.com/solo-io/solo-kit/pkg/api/v1/clients"
	"github.com/solo-io/solo-kit/pkg/errors"
	skstats "github.com/solo-io/solo-kit/pkg/stats"
	
	"github.com/solo-io/go-utils/errutils"
	"github.com/solo-io/go-utils/contextutils"
)

{{ $emitter_prefix := (print (snake .Name) "/emitter") }}
{{ $resource_group := upper_camel .GoName }}
var (
	// Deprecated. See m{{ $resource_group }}ResourcesIn
	m{{ $resource_group }}SnapshotIn  = stats.Int64("{{ $emitter_prefix }}/snap_in", "Deprecated. Use {{ $emitter_prefix }}/resources_in. The number of snapshots in", "1")
	
	// metrics for emitter
	m{{ $resource_group }}ResourcesIn = stats.Int64("{{ $emitter_prefix }}/resources_in", "The number of resource lists received on open watch channels", "1")
	m{{ $resource_group }}SnapshotOut = stats.Int64("{{ $emitter_prefix }}/snap_out", "The number of snapshots out", "1")
	m{{ $resource_group }}SnapshotMissed = stats.Int64("{{ $emitter_prefix }}/snap_missed", "The number of snapshots missed", "1")

	// views for emitter
	// deprecated: see {{ lower_camel .GoName }}ResourcesInView
	{{ lower_camel .GoName }}snapshotInView = &view.View{
		Name:        "{{ $emitter_prefix }}/snap_in",
		Measure:     m{{ $resource_group }}SnapshotIn,
		Description: "Deprecated. Use {{ $emitter_prefix }}/resources_in. The number of snapshots updates coming in.",
		Aggregation: view.Count(),
		TagKeys:     []tag.Key{
		},
	}

	{{ lower_camel .GoName }}ResourcesInView = &view.View{
			Name:        "{{ $emitter_prefix }}/resources_in",
			Measure:     m{{ $resource_group }}ResourcesIn,
			Description: "The number of resource lists received on open watch channels",
			Aggregation: view.Count(),
			TagKeys:     []tag.Key{
				skstats.NamespaceKey,
				skstats.ResourceKey,
			},
	}
	{{ lower_camel .GoName }}snapshotOutView = &view.View{
		Name:        "{{ $emitter_prefix }}/snap_out",
		Measure:     m{{ $resource_group }}SnapshotOut,
		Description: "The number of snapshots updates going out",
		Aggregation: view.Count(),
		TagKeys:     []tag.Key{
		},
	}
	{{ lower_camel .GoName }}snapshotMissedView = &view.View{
			Name:        "{{ $emitter_prefix }}/snap_missed",
			Measure:     m{{ $resource_group }}SnapshotMissed,
			Description: "The number of snapshots updates going missed. this can happen in heavy load. missed snapshot will be re-tried after a second.",
			Aggregation: view.Count(),
			TagKeys:     []tag.Key{
			},
	}


)

func init() {
	view.Register(
		{{ lower_camel .GoName }}snapshotInView, 
		{{ lower_camel .GoName }}snapshotOutView, 
		{{ lower_camel .GoName }}snapshotMissedView,
		{{ lower_camel .GoName }}ResourcesInView,
	)
}

type {{ .GoName }}SnapshotEmitter interface {
	Snapshots(watchNamespaces []string, opts clients.WatchOpts) (<-chan *{{ .GoName }}Snapshot, <-chan error, error)
}

type {{ .GoName }}Emitter interface {
	{{ .GoName }}SnapshotEmitter
	Register() error
{{- range .Resources}}
	{{ .Name }}() {{ .ImportPrefix }}{{ .Name }}Client
{{- end}}
}

func New{{ .GoName }}Emitter({{ $client_declarations }}) {{ .GoName }}Emitter {
	return New{{ .GoName }}EmitterWithEmit({{ $clients }}, make(chan struct{}))
}

func New{{ .GoName }}EmitterWithEmit({{ $client_declarations }}, emit <-chan struct{}) {{ .GoName }}Emitter {
	return &{{ lower_camel .GoName }}Emitter{
{{- range .Resources}}
		{{ lower_camel .Name }}:{{ lower_camel .Name }}Client,
{{- end}}
		forceEmit: emit,
	}
}

type {{ lower_camel .GoName }}Emitter struct {
	forceEmit <- chan struct{}
{{- range .Resources}}
	{{ lower_camel .Name }} {{ .ImportPrefix }}{{ .Name }}Client
{{- end}}
}

func (c *{{ lower_camel .GoName }}Emitter) Register() error {
{{- range .Resources}}
	if err := c.{{ lower_camel .Name }}.Register(); err != nil {
		return err
	}
{{- end}}
	return nil
}

{{- range .Resources}}

func (c *{{ lower_camel $.GoName }}Emitter) {{ .Name }}() {{ .ImportPrefix }}{{ .Name }}Client {
	return c.{{ lower_camel .Name }}
}
{{- end}}

func (c *{{ lower_camel .GoName }}Emitter) Snapshots(watchNamespaces []string, opts clients.WatchOpts) (<-chan *{{ .GoName }}Snapshot, <-chan error, error) {

	if len(watchNamespaces) == 0 {
		watchNamespaces = []string{""}
	}

	for _, ns := range watchNamespaces {
		if ns == "" && len(watchNamespaces) > 1 {
			return nil, nil, errors.Errorf("the \"\" namespace is used to watch all namespaces. Snapshots can either be tracked for "+
				"specific namespaces or \"\" AllNamespaces, but not both.")
		}
	}

	errs := make(chan error)
	var done sync.WaitGroup
	ctx := opts.Ctx


{{- range .Resources}}
	/* Create channel for {{ .Name }} */
{{- if (not .ClusterScoped) }}
	type {{ lower_camel .Name }}ListWithNamespace struct {
		list {{ .ImportPrefix }}{{ .Name }}List
		namespace string
	}
	{{ lower_camel .Name }}Chan := make(chan {{ lower_camel .Name }}ListWithNamespace)

	var initial{{ upper_camel .Name }}List {{ .ImportPrefix }}{{ .Name }}List{{- end }}

{{- end}}

	currentSnapshot := {{ .GoName }}Snapshot{}

	for _, namespace := range watchNamespaces {
{{- range .Resources}}
{{- if (not .ClusterScoped) }}
		/* Setup namespaced watch for {{ .Name }} */
		{
			{{ lower_camel .PluralName }}, err := c.{{ lower_camel .Name }}.List(namespace, clients.ListOpts{Ctx: opts.Ctx, Selector: opts.Selector})
			if err != nil {
				return nil, nil, errors.Wrapf(err, "initial {{ .Name }} list")
			}
			initial{{ upper_camel .Name }}List = append(initial{{ upper_camel .Name }}List, {{ lower_camel .PluralName }}...)
		}
		{{ lower_camel .Name }}NamespacesChan, {{ lower_camel .Name }}Errs, err := c.{{ lower_camel .Name }}.Watch(namespace, opts)
		if err != nil {
			return nil, nil, errors.Wrapf(err, "starting {{ .Name }} watch")
		}

		done.Add(1)
		go func(namespace string) {
			defer done.Done()
			errutils.AggregateErrs(ctx, errs, {{ lower_camel .Name }}Errs, namespace+"-{{ lower_camel .PluralName }}")
		}(namespace)

{{- end }}
{{- end}}

		/* Watch for changes and update snapshot */
		go func(namespace string) {
			for {
				select {
				case <-ctx.Done():
					return
{{- range .Resources}}
{{- if (not .ClusterScoped) }}
				case {{ lower_camel .Name }}List, ok := <- {{ lower_camel .Name }}NamespacesChan:
					if !ok {
						return
					}
					select {
					case <-ctx.Done():
						return
					case {{ lower_camel .Name }}Chan <- {{ lower_camel .Name }}ListWithNamespace{list:{{ lower_camel .Name }}List, namespace:namespace}:
					}
{{- end }}
{{- end}}
				}
			}
		}(namespace)
	}

{{- range .Resources}}
{{- if .ClusterScoped }}
	/* Setup cluster-wide watch for {{ .Name }} */
	var err error
	currentSnapshot.{{ upper_camel .PluralName }},err = c.{{ lower_camel .Name }}.List(clients.ListOpts{Ctx: opts.Ctx, Selector: opts.Selector})
	if err != nil {
		return nil, nil, errors.Wrapf(err, "initial {{ .Name }} list")
	}
	{{ lower_camel .Name }}Chan, {{ lower_camel .Name }}Errs, err := c.{{ lower_camel .Name }}.Watch(opts)
	if err != nil {
		return nil, nil, errors.Wrapf(err, "starting {{ .Name }} watch")
	}
	done.Add(1)
	go func() {
		defer done.Done()
		errutils.AggregateErrs(ctx, errs, {{ lower_camel .Name }}Errs, "{{ lower_camel .PluralName }}")
	}()

{{- else }}
	/* Initialize snapshot for {{ upper_camel .PluralName }} */
	currentSnapshot.{{ upper_camel .PluralName }} = initial{{ upper_camel .Name }}List.Sort()
{{- end }}
{{- end}}

	snapshots := make(chan *{{ .GoName }}Snapshot)
	go func() {
		// sent initial snapshot to kick off the watch
		initialSnapshot := currentSnapshot.Clone()
		snapshots <- &initialSnapshot

		timer := time.NewTicker(time.Second * 1)
		previousHash, err := currentSnapshot.Hash(nil)
		if err != nil {
			contextutils.LoggerFrom(ctx).Panicw("error while hashing, this should never happen", zap.Error(err))
		}
		sync := func() {
			currentHash, err := currentSnapshot.Hash(nil)
			// this should never happen, so panic if it does
			if err != nil {
				contextutils.LoggerFrom(ctx).Panicw("error while hashing, this should never happen", zap.Error(err))
			}
			if previousHash == currentHash {
				return
			}

			sentSnapshot := currentSnapshot.Clone()
			select {
			case snapshots <- &sentSnapshot:
				stats.Record(ctx, m{{ $resource_group }}SnapshotOut.M(1))
				previousHash = currentHash
			default:
				stats.Record(ctx, m{{ $resource_group }}SnapshotMissed.M(1))
			}
		}

		{{- range .Resources}}
		{{- if not .ClusterScoped }}
				{{ lower_camel .PluralName }}ByNamespace := make(map[string]{{ .ImportPrefix }}{{ .Name }}List)
		{{- end }}
		{{- end }}
		defer func() {
			close(snapshots)
			// we must wait for done before closing the error chan,
			// to avoid sending on close channel.
			done.Wait()
			close(errs)
		}()
		for {
			record := func(){stats.Record(ctx, m{{ $resource_group }}SnapshotIn.M(1))}
			
			select {
			case <-timer.C:
				sync()
			case <-ctx.Done():
				return
			case <-c.forceEmit:
				sentSnapshot := currentSnapshot.Clone()
				snapshots <- &sentSnapshot
{{- range .Resources}}
{{- if .ClusterScoped }}
			case {{ lower_camel .Name }}List, ok := <- {{ lower_camel .Name }}Chan:
				if !ok {
					return
				}
				record()

				skstats.IncrementResourceCount(
					ctx,
					"<all>",
					"{{ snake .Name }}",
					m{{ $resource_group }}ResourcesIn,
				)

				currentSnapshot.{{ upper_camel .PluralName }} = {{ lower_camel .Name }}List
{{- else }}
			case {{ lower_camel .Name }}NamespacedList, ok := <- {{ lower_camel .Name }}Chan:
				if !ok {
					return
				}
				record()

				namespace := {{ lower_camel .Name }}NamespacedList.namespace

				skstats.IncrementResourceCount(
					ctx,
					namespace,
					"{{ snake .Name }}",
					m{{ $resource_group }}ResourcesIn,
				)

				// merge lists by namespace
				{{ lower_camel .PluralName }}ByNamespace[namespace] = {{ lower_camel .Name }}NamespacedList.list
				var {{ lower_camel .Name }}List {{ .ImportPrefix }}{{ .Name }}List
				for _, {{ lower_camel .PluralName }} := range {{ lower_camel .PluralName }}ByNamespace {
					{{ lower_camel .Name }}List  = append({{ lower_camel .Name }}List, {{ lower_camel .PluralName }}...)
				}
				currentSnapshot.{{ upper_camel .PluralName }} = {{ lower_camel .Name }}List.Sort()
{{- end }}
{{- end}}
			}
		}
	}()
	return snapshots, errs, nil
}
`))
View Source
var ResourceGroupEmitterTestTemplate = template.Must(template.New("resource_group_emitter_test").Funcs(Funcs).Parse(`// +build solokit

package {{ .Project.ProjectConfig.Version }}

{{- /* we need to know if the tests require a crd client or a regular clientset */ -}}
{{- $clients := new_str_slice }}
{{- $need_kube_config := false }}
{{- range .Resources}}
{{- $clients := (append_str_slice $clients (printf "%vClient"  (lower_camel .Name))) }}
{{- if .HasStatus }}
{{- $need_kube_config = true }}
{{- end}}
{{- end}}
{{- $clients := (join_str_slice $clients ", ") }}

import (
	"context"
	"os"
	"time"

	{{ .Imports }}
	"k8s.io/client-go/kubernetes"
	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
	"github.com/solo-io/go-utils/log"
	"github.com/solo-io/solo-kit/pkg/api/v1/clients"
	"github.com/solo-io/solo-kit/pkg/api/v1/clients/factory"
	"github.com/solo-io/solo-kit/pkg/api/v1/clients/memory"
	"github.com/solo-io/solo-kit/test/helpers"
	"github.com/solo-io/solo-kit/test/setup"
	"github.com/solo-io/k8s-utils/kubeutils"
	"github.com/solo-io/solo-kit/test/util"
	kuberc "github.com/solo-io/solo-kit/pkg/api/v1/clients/kube"
	"k8s.io/client-go/rest"
	apiext "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"

	// Needed to run tests in GKE
	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

	// From https://github.com/kubernetes/client-go/blob/53c7adfd0294caa142d961e1f780f74081d5b15f/examples/out-of-cluster-client-configuration/main.go#L31
	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)

var _ = Describe("{{ upper_camel .Project.ProjectConfig.Version }}Emitter", func() {
	if os.Getenv("RUN_KUBE_TESTS") != "1" {
		log.Printf("This test creates kubernetes resources and is disabled by default. To enable, set RUN_KUBE_TESTS=1 in your env.")
		return
	}
	var (
		ctx 				context.Context
		namespace1          string
		namespace2          string
		name1, name2        = "angela"+helpers.RandString(3), "bob"+helpers.RandString(3)
{{- if $need_kube_config }}
		cfg                *rest.Config
		clientset		   *apiext.Clientset
{{- end}}
		kube                      kubernetes.Interface
		emitter            {{ .GoName }}Emitter
{{- range .Resources }}
		{{ lower_camel .Name }}Client {{ .ImportPrefix }}{{ .Name }}Client
{{- end}}
	)

	BeforeEach(func() {
		ctx = context.Background()
		namespace1 = helpers.RandString(8)
		namespace2 = helpers.RandString(8)
		kube = helpers.MustKubeClient()
		err := kubeutils.CreateNamespacesInParallel(ctx, kube, namespace1, namespace2)
		Expect(err).NotTo(HaveOccurred())
{{- if $need_kube_config }}
		cfg, err = kubeutils.GetConfig("", "")
		Expect(err).NotTo(HaveOccurred())

		clientset, err = apiext.NewForConfig(cfg)
		Expect(err).NotTo(HaveOccurred())
{{- end}}

{{- range .Resources }}
		// {{ .Name }} Constructor

{{- if .HasStatus }}
		{{ lower_camel .Name }}ClientFactory := &factory.KubeResourceClientFactory{
			Crd: {{ .ImportPrefix }}{{ .Name }}Crd,
			Cfg: cfg,
		    SharedCache: kuberc.NewKubeCache(context.TODO()),
		}

		err = helpers.AddAndRegisterCrd(ctx, {{ .ImportPrefix }}{{ .Name }}Crd, clientset)
		Expect(err).NotTo(HaveOccurred())

{{- else }}
		{{ lower_camel .Name }}ClientFactory := &factory.MemoryResourceClientFactory{
			Cache: memory.NewInMemoryResourceCache(),
		}
{{- end }}

		{{ lower_camel .Name }}Client, err = {{ .ImportPrefix }}New{{ .Name }}Client(ctx, {{ lower_camel .Name }}ClientFactory)
		Expect(err).NotTo(HaveOccurred())
{{- end}}
		emitter = New{{ .GoName }}Emitter({{ $clients }})
	})
	AfterEach(func() {
		err := kubeutils.DeleteNamespacesInParallelBlocking(ctx, kube, namespace1, namespace2)
		Expect(err).NotTo(HaveOccurred())
{{- range .Resources }}
{{- if .ClusterScoped }}
		{{ lower_camel .Name }}Client.Delete(name1, clients.DeleteOpts{})
		{{ lower_camel .Name }}Client.Delete(name2, clients.DeleteOpts{})
{{- end }}
{{- end }}
	})
	It("tracks snapshots on changes to any resource", func() {
		ctx := context.Background()
		err := emitter.Register()
		Expect(err).NotTo(HaveOccurred())

		snapshots, errs, err := emitter.Snapshots([]string{namespace1, namespace2}, clients.WatchOpts{
			Ctx: ctx,
			RefreshRate: time.Second,
		})
		Expect(err).NotTo(HaveOccurred())

		var snap *{{ .GoName }}Snapshot
{{- range .Resources }}

		/*
			{{ .Name }}
		*/
		
		assertSnapshot{{ .PluralName }} := func(expect{{ .PluralName }} {{ .ImportPrefix }}{{ .Name }}List, unexpect{{ .PluralName }} {{ .ImportPrefix }}{{ .Name }}List) {
		drain:
			for {
				select {
				case snap = <-snapshots:
					for _, expected := range expect{{ .PluralName }} {
						if _, err := snap.{{ upper_camel .PluralName }}.Find(expected.GetMetadata().Ref().Strings()); err != nil {
							continue drain
						}
					}
					for _, unexpected := range unexpect{{ .PluralName }} {
						if _, err := snap.{{ upper_camel .PluralName }}.Find(unexpected.GetMetadata().Ref().Strings()); err == nil {
							continue drain
						}
					}
					break drain
				case err := <-errs:
					Expect(err).NotTo(HaveOccurred())
				case <-time.After(time.Second * 10):
{{- if .ClusterScoped }}
					combined, _ := {{ lower_camel .Name }}Client.List(clients.ListOpts{})
{{- else }}
					nsList1, _ := {{ lower_camel .Name }}Client.List(namespace1, clients.ListOpts{})
					nsList2, _ := {{ lower_camel .Name }}Client.List(namespace2, clients.ListOpts{})
					combined := append(nsList1, nsList2...)
{{- end }}
					Fail("expected final snapshot before 10 seconds. expected " + log.Sprintf("%v", combined))
				}
			}
		}	

{{- if .ClusterScoped }}
		{{ lower_camel .Name }}1a, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace1, name1), clients.WriteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())

		assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a }, nil)
{{- else }}
		{{ lower_camel .Name }}1a, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace1, name1), clients.WriteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())
		{{ lower_camel .Name }}1b, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace2, name1), clients.WriteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())

		assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}1b }, nil)
{{- end }}

{{- if .ClusterScoped }}
		{{ lower_camel .Name }}2a, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace1, name2), clients.WriteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())

		assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}2a }, nil)
{{- else }}
		{{ lower_camel .Name }}2a, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace1, name2), clients.WriteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())
		{{ lower_camel .Name }}2b, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace2, name2), clients.WriteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())

		assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}1b,  {{ lower_camel .Name }}2a, {{ lower_camel .Name }}2b  }, nil)
{{- end }}

{{- if .ClusterScoped }}

		err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}2a.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())

		assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a }, {{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}2a })
{{- else }}

		err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}2a.GetMetadata().Namespace, {{ lower_camel .Name }}2a.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())
		err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}2b.GetMetadata().Namespace, {{ lower_camel .Name }}2b.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())

		assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}1b }, {{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}2a, {{ lower_camel .Name }}2b })
{{- end }}

{{- if .ClusterScoped }}

		err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}1a.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())

		assertSnapshot{{ .PluralName }}(nil, {{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}2a })
{{- else }}

		err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}1a.GetMetadata().Namespace, {{ lower_camel .Name }}1a.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())
		err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}1b.GetMetadata().Namespace, {{ lower_camel .Name }}1b.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())

		assertSnapshot{{ .PluralName }}(nil, {{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}1b, {{ lower_camel .Name }}2a, {{ lower_camel .Name }}2b })
{{- end }}
{{- end}}
	})
	It("tracks snapshots on changes to any resource using AllNamespace", func() {
		ctx := context.Background()
		err := emitter.Register()
		Expect(err).NotTo(HaveOccurred())

		snapshots, errs, err := emitter.Snapshots([]string{""}, clients.WatchOpts{
			Ctx: ctx,
			RefreshRate: time.Second,
		})
		Expect(err).NotTo(HaveOccurred())

		var snap *{{ .GoName }}Snapshot
{{- range .Resources }}

		/*
			{{ .Name }}
		*/
		
		assertSnapshot{{ .PluralName }} := func(expect{{ .PluralName }} {{ .ImportPrefix }}{{ .Name }}List, unexpect{{ .PluralName }} {{ .ImportPrefix }}{{ .Name }}List) {
		drain:
			for {
				select {
				case snap = <-snapshots:
					for _, expected := range expect{{ .PluralName }} {
						if _, err := snap.{{ upper_camel .PluralName }}.Find(expected.GetMetadata().Ref().Strings()); err != nil {
							continue drain
						}
					}
					for _, unexpected := range unexpect{{ .PluralName }} {
						if _, err := snap.{{ upper_camel .PluralName }}.Find(unexpected.GetMetadata().Ref().Strings()); err == nil {
							continue drain
						}
					}
					break drain
				case err := <-errs:
					Expect(err).NotTo(HaveOccurred())
				case <-time.After(time.Second * 10):
{{- if .ClusterScoped }}
					combined, _ := {{ lower_camel .Name }}Client.List(clients.ListOpts{})
{{- else }}
					nsList1, _ := {{ lower_camel .Name }}Client.List(namespace1, clients.ListOpts{})
					nsList2, _ := {{ lower_camel .Name }}Client.List(namespace2, clients.ListOpts{})
					combined := append(nsList1, nsList2...)
{{- end }}
					Fail("expected final snapshot before 10 seconds. expected " + log.Sprintf("%v", combined))
				}
			}
		}	

{{- if .ClusterScoped }}
		{{ lower_camel .Name }}1a, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace1, name1), clients.WriteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())

		assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a }, nil)
{{- else }}
		{{ lower_camel .Name }}1a, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace1, name1), clients.WriteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())
		{{ lower_camel .Name }}1b, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace2, name1), clients.WriteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())

		assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}1b }, nil)
{{- end }}

{{- if .ClusterScoped }}
		{{ lower_camel .Name }}2a, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace1, name2), clients.WriteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())

		assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}2a }, nil)
{{- else }}
		{{ lower_camel .Name }}2a, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace1, name2), clients.WriteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())
		{{ lower_camel .Name }}2b, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace2, name2), clients.WriteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())

		assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}1b,  {{ lower_camel .Name }}2a, {{ lower_camel .Name }}2b  }, nil)
{{- end }}

{{- if .ClusterScoped }}

		err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}2a.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())

		assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a }, {{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}2a })
{{- else }}

		err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}2a.GetMetadata().Namespace, {{ lower_camel .Name }}2a.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())
		err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}2b.GetMetadata().Namespace, {{ lower_camel .Name }}2b.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())

		assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}1b }, {{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}2a, {{ lower_camel .Name }}2b })
{{- end }}

{{- if .ClusterScoped }}

		err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}1a.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())

		assertSnapshot{{ .PluralName }}(nil, {{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}2a })
{{- else }}

		err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}1a.GetMetadata().Namespace, {{ lower_camel .Name }}1a.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())
		err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}1b.GetMetadata().Namespace, {{ lower_camel .Name }}1b.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
		Expect(err).NotTo(HaveOccurred())

		assertSnapshot{{ .PluralName }}(nil, {{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}1b, {{ lower_camel .Name }}2a, {{ lower_camel .Name }}2b })
{{- end }}
{{- end}}
	})
})

`))
View Source
var ResourceGroupEventLoopTemplate = template.Must(template.New("resource_group_event_loop").Funcs(Funcs).Parse(`package {{ .Project.ProjectConfig.Version }}

import (
	"context"

	"go.opencensus.io/trace"

	"github.com/hashicorp/go-multierror"

	"github.com/solo-io/solo-kit/pkg/api/v1/clients"
	"github.com/solo-io/solo-kit/pkg/api/v1/eventloop"
	"github.com/solo-io/solo-kit/pkg/errors"
	"github.com/solo-io/go-utils/contextutils"
	"github.com/solo-io/go-utils/errutils"
)

type {{ .GoName }}Syncer interface {
	Sync(context.Context, *{{ .GoName }}Snapshot) error
}

type {{ .GoName }}Syncers []{{ .GoName }}Syncer

func (s {{ .GoName }}Syncers) Sync(ctx context.Context, snapshot *{{ .GoName }}Snapshot) error {
	var multiErr *multierror.Error
	for _, syncer := range s {
		if err := syncer.Sync(ctx, snapshot); err != nil {
			multiErr = multierror.Append(multiErr, err)
		}
	}
	return multiErr.ErrorOrNil()
}

type {{ lower_camel .GoName }}EventLoop struct {
	emitter {{ .GoName }}SnapshotEmitter
	syncer  {{ .GoName }}Syncer
	ready chan struct{}
}

func New{{ .GoName }}EventLoop(emitter {{ .GoName }}SnapshotEmitter, syncer {{ .GoName }}Syncer) eventloop.EventLoop {
	return &{{ lower_camel .GoName }}EventLoop{
		emitter: emitter,
		syncer:  syncer,
		ready: make(chan struct{}),
	}
}


func (el *{{ lower_camel .GoName }}EventLoop) Ready() <-chan struct{} {
	return el.ready
}

func (el *{{ lower_camel .GoName }}EventLoop) Run(namespaces []string, opts clients.WatchOpts) (<-chan error, error) {
	opts = opts.WithDefaults()
	opts.Ctx = contextutils.WithLogger(opts.Ctx, "{{ .Project.ProjectConfig.Version }}.event_loop")
	logger := contextutils.LoggerFrom(opts.Ctx)
	logger.Infof("event loop started")

	errs := make(chan error)

	watch, emitterErrs, err := el.emitter.Snapshots(namespaces, opts)
	if err != nil {
		return nil, errors.Wrapf(err, "starting snapshot watch")
	}
	go errutils.AggregateErrs(opts.Ctx, errs, emitterErrs, "{{ .Project.ProjectConfig.Version }}.emitter errors")
	go func() {
		var channelClosed bool
		// create a new context for each loop, cancel it before each loop
		var cancel context.CancelFunc = func() {}
		// use closure to allow cancel function to be updated as context changes
		defer func() { cancel() }()
		for {
			select {
			case snapshot, ok := <-watch:
				if !ok {
					return
				}
				// cancel any open watches from previous loop
				cancel()

				ctx, span := trace.StartSpan(opts.Ctx, "{{ .Name }}.EventLoopSync")
				ctx, canc := context.WithCancel(ctx)
				cancel = canc
				err := el.syncer.Sync(ctx, snapshot)
				span.End()

				if err != nil {
					select {
					case errs <- err:
					default:
						logger.Errorf("write error channel is full! could not propagate err: %v", err)
					}
				} else if !channelClosed {
					channelClosed = true
					close(el.ready)
				}
			case <-opts.Ctx.Done():
				return
			}
		}
	}()
	return errs, nil
}
`))
View Source
var ResourceGroupEventLoopTestTemplate = template.Must(template.New("resource_group_event_loop_test").Funcs(Funcs).Parse(`// +build solokit

package {{ .Project.ProjectConfig.Version }}

{{- $clients := new_str_slice }}
{{- range .Resources}}
{{- $clients := (append_str_slice $clients (printf "%vClient" (lower_camel .Name))) }}
{{- end}}
{{- $clients := (join_str_slice $clients ", ") }}

import (
	"context"
	"time"
	"sync"

	{{ .Imports }}
	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
	"github.com/solo-io/solo-kit/pkg/api/v1/clients"
	"github.com/solo-io/solo-kit/pkg/api/v1/clients/factory"
	"github.com/solo-io/solo-kit/pkg/api/v1/clients/memory"
)

var _ = Describe("{{ .GoName }}EventLoop", func() {
	var (
		ctx context.Context
		namespace string
		emitter     {{ .GoName }}Emitter
		err       error
	)

	BeforeEach(func() {
		ctx = context.Background()
{{- range .Resources}}

		{{ lower_camel .Name }}ClientFactory := &factory.MemoryResourceClientFactory{
			Cache: memory.NewInMemoryResourceCache(),
		}
		{{ lower_camel .Name }}Client, err := {{ .ImportPrefix }}New{{ .Name }}Client(ctx, {{ lower_camel .Name }}ClientFactory)
		Expect(err).NotTo(HaveOccurred())
{{- end}}

		emitter = New{{ .GoName }}Emitter({{ $clients }})
	})
	It("runs sync function on a new snapshot", func() {
{{- range .Resources  }}
		_, err = emitter.{{ .Name }}().Write({{ .ImportPrefix }}New{{ .Name }}(namespace, "jerry"), clients.WriteOpts{})
		Expect(err).NotTo(HaveOccurred())
{{- end}}
		sync := &mock{{ .GoName }}Syncer{}
		el := New{{ .GoName }}EventLoop(emitter, sync)
		_, err := el.Run([]string{namespace}, clients.WatchOpts{})
		Expect(err).NotTo(HaveOccurred())
		Eventually(sync.Synced, 5*time.Second).Should(BeTrue())
	})
})

type mock{{ .GoName }}Syncer struct {
	synced bool
	mutex  sync.Mutex
}

func (s *mock{{ .GoName }}Syncer) Synced() bool {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.synced
}

func (s *mock{{ .GoName }}Syncer) Sync(ctx context.Context, snap *{{ .GoName }}Snapshot) error {
	s.mutex.Lock()
	s.synced = true
	s.mutex.Unlock()
	return nil
}
`))
View Source
var ResourceGroupSnapshotTemplate = template.Must(template.New("resource_group_snapshot").Funcs(Funcs).Parse(
	`package {{ .Project.ProjectConfig.Version }}

import (
	"encoding/binary"
	"fmt"
	"hash"
	"hash/fnv"
	"log"

	{{ .Imports }}
	"github.com/solo-io/solo-kit/pkg/api/v1/resources"
	"github.com/rotisserie/eris"
	"github.com/solo-io/go-utils/hashutils"
	"go.uber.org/zap"
)

type {{ .GoName }}Snapshot struct {
{{- range .Resources}}
	{{ upper_camel .PluralName }} {{ .ImportPrefix }}{{ .Name }}List
{{- end}}
}

func (s {{ .GoName }}Snapshot) Clone() {{ .GoName }}Snapshot {
	return {{ .GoName }}Snapshot{
{{- range .Resources}}
		{{ upper_camel .PluralName }}: s.{{ upper_camel .PluralName }}.Clone(),
{{- end}}
	}
}

func (s {{ .GoName }}Snapshot) Hash(hasher hash.Hash64) (uint64, error) {
	if hasher == nil {
		hasher = fnv.New64()
	}
{{- range .Resources}}
	if _, err := s.hash{{ upper_camel .PluralName }}(hasher); err != nil {
		return 0, err
	}
{{- end}}
	return hasher.Sum64(), nil
}

{{- $ResourceGroup := . }}
{{- range .Resources }}

func (s {{ $ResourceGroup.GoName }}Snapshot) hash{{ upper_camel .PluralName }}(hasher hash.Hash64) (uint64, error) {
	{{- if .SkipHashingAnnotations }}
	clonedList := s.{{ upper_camel .PluralName }}.Clone()
	for _, v := range clonedList {
		v.Metadata.Annotations = nil
	}
	return hashutils.HashAllSafe(hasher, clonedList.AsInterfaces()...)
	{{- else }}
	return hashutils.HashAllSafe(hasher, s.{{ upper_camel .PluralName }}.AsInterfaces()...)
	{{- end }}
}
{{- end}}

func (s {{ .GoName }}Snapshot) HashFields() []zap.Field {
	var fields []zap.Field
	hasher := fnv.New64()
{{- range .Resources}}
	{{ upper_camel .PluralName }}Hash, err := s.hash{{ upper_camel .PluralName }}(hasher)
	if err != nil {
		log.Println(eris.Wrapf(err, "error hashing, this should never happen"))
	}
	fields = append(fields, zap.Uint64("{{ lower_camel .PluralName }}", {{ upper_camel .PluralName }}Hash ))
{{- end}}
	snapshotHash, err := s.Hash(hasher)
	if err != nil {
		log.Println(eris.Wrapf(err, "error hashing, this should never happen"))
	}
	return append(fields, zap.Uint64("snapshotHash",  snapshotHash))
}

type {{ .GoName }}SnapshotStringer struct {
	Version              uint64
{{- range .Resources}}
	{{ upper_camel .PluralName }} []string
{{- end}}
}

func (ss {{ .GoName }}SnapshotStringer) String() string {
	s := fmt.Sprintf("{{ .GoName }}Snapshot %v\n", ss.Version)
{{- range .Resources}}

	s += fmt.Sprintf("  {{ upper_camel .PluralName }} %v\n", len(ss.{{ upper_camel .PluralName }}))
	for _, name := range ss.{{ upper_camel .PluralName }} {
		s += fmt.Sprintf("    %v\n", name)
	}
{{- end}}

	return s
}

func (s {{ .GoName }}Snapshot) Stringer() {{ .GoName }}SnapshotStringer {
	snapshotHash, err := s.Hash(nil)
	if err != nil {
		log.Println(eris.Wrapf(err, "error hashing, this should never happen"))
	}
	return {{ .GoName }}SnapshotStringer{
		Version: snapshotHash,
{{- range .Resources}}
{{- if .ClusterScoped }}
		{{ upper_camel .PluralName }}: s.{{ upper_camel .PluralName }}.Names(),
{{- else }}
		{{ upper_camel .PluralName }}: s.{{ upper_camel .PluralName }}.NamespacesDotNames(),
{{- end }}
{{- end}}
	}
}
`))
View Source
var ResourceReconcilerTemplate = template.Must(template.New("resource_client").Funcs(Funcs).Parse(`package {{ .Project.ProjectConfig.Version }}
import (
	"github.com/solo-io/solo-kit/pkg/api/v1/clients"
	"github.com/solo-io/solo-kit/pkg/api/v1/reconcile"
	"github.com/solo-io/solo-kit/pkg/api/v1/resources"
	"github.com/solo-io/go-utils/contextutils"
)

// Option to copy anything from the original to the desired before writing. Return value of false means don't update
type Transition{{ .Name }}Func func(original, desired *{{ .Name }}) (bool, error)

type {{ .Name }}Reconciler interface {
	Reconcile(namespace string, desiredResources {{ .Name }}List, transition Transition{{ .Name }}Func, opts clients.ListOpts) error
}

func {{ lower_camel .Name }}sToResources(list {{ .Name }}List) resources.ResourceList {
	var resourceList resources.ResourceList
	for _, {{ lower_camel .Name }} := range list {
		resourceList = append(resourceList, {{ lower_camel .Name }})
	}
	return resourceList
}

func New{{ .Name }}Reconciler(client {{ .Name }}Client) {{ .Name }}Reconciler {
	return &{{ lower_camel .Name }}Reconciler{
		base: reconcile.NewReconciler(client.BaseClient()),
	}
}

type {{ lower_camel .Name }}Reconciler struct {
	base reconcile.Reconciler
}

func (r *{{ lower_camel .Name }}Reconciler) Reconcile(namespace string, desiredResources {{ .Name }}List, transition Transition{{ .Name }}Func, opts clients.ListOpts) error {
	opts = opts.WithDefaults()
	opts.Ctx = contextutils.WithLogger(opts.Ctx, "{{ lower_camel .Name }}_reconciler")
	var transitionResources reconcile.TransitionResourcesFunc
	if transition != nil {
		transitionResources = func(original, desired resources.Resource) (bool, error) {
			return transition(original.(*{{ .Name }}), desired.(*{{ .Name }}))
		}
	}
	return r.base.Reconcile(namespace, {{ lower_camel .Name }}sToResources(desiredResources), transitionResources, opts)
}
`))
View Source
var ResourceTemplate = template.Must(template.New("resource").Funcs(Funcs).Parse(`package {{ .Project.ProjectConfig.Version }}

import (
	"encoding/binary"
	"hash"
	"hash/fnv"
	"log"
	"sort"

{{- if $.IsCustom }}
	{{ $.CustomImportPrefix }} "{{ $.CustomResource.Package }}"
{{- end }}

	"github.com/solo-io/solo-kit/pkg/api/v1/resources"
	"github.com/solo-io/solo-kit/pkg/api/v1/resources/core"
	"github.com/solo-io/solo-kit/pkg/errors"
	"github.com/solo-io/go-utils/hashutils"
{{- if not $.IsCustom }}
	"github.com/solo-io/solo-kit/pkg/api/v1/clients/kube/crd"
	"k8s.io/apimachinery/pkg/runtime"
{{- end }}
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func New{{ .Name }}(namespace, name string) *{{ .Name }} {
	{{ lowercase .Name }} := &{{ .Name }}{}
{{- if $.IsCustom }}
	{{ lowercase .Name }}.{{ $.Name }}.SetMetadata(&core.Metadata{
{{- else }}
	{{ lowercase .Name }}.SetMetadata(&core.Metadata{
{{- end }}
		Name:      name,
		Namespace: namespace,
	})
	return {{ lowercase .Name }}
}

{{- if $.IsCustom }}

// require custom resource to implement Clone() as well as resources.Resource interface

type Cloneable{{ $.Name }} interface {
	resources.Resource
	Clone() *{{ $.CustomImportPrefix}}.{{ $.Name }}
}

var _ Cloneable{{ $.Name }} = &{{ $.CustomImportPrefix}}.{{ $.Name }}{}

type {{ $.Name }} struct {
	{{ $.CustomImportPrefix}}.{{ $.Name }}
}

func (r *{{ .Name }}) Clone() resources.Resource {
	return &{{ .Name }}{ {{ .Name }}: *r.{{ .Name }}.Clone() }
}

func (r *{{ .Name }}) Hash(hasher hash.Hash64) (uint64, error) {
	if hasher == nil {
		hasher = fnv.New64()
	}
	clone := r.{{ .Name }}.Clone()
	resources.UpdateMetadata(clone, func(meta *core.Metadata) {
		meta.ResourceVersion = ""
		{{- if $.SkipHashingAnnotations }}
		meta.Annotations = nil
		{{- end }}
	})
	err := binary.Write(hasher, binary.LittleEndian, hashutils.HashAll(clone))
	if err != nil {
		return 0, err
	}
	return hasher.Sum64(), nil
}

{{- else }}

func (r *{{ .Name }}) SetMetadata(meta *core.Metadata) {
	r.Metadata = meta
}

{{- if $.HasStatus }}

func (r *{{ .Name }}) SetStatus(status *core.Status) {
	r.Status = status
}
{{- end }}

{{- end }}

func (r *{{ .Name }}) MustHash() uint64 {
	hashVal, err := r.Hash(nil)
	if err != nil {
		log.Panicf("error while hashing: (%s) this should never happen", err)
	}
	return hashVal
}

func (r *{{ .Name }}) GroupVersionKind() schema.GroupVersionKind {
	return {{ .Name }}GVK
}

type {{ .Name }}List []*{{ .Name }}

func (list {{ .Name }}List) Find(namespace, name string) (*{{ .Name }}, error) {
	for _, {{ lower_camel .Name }} := range list {
		if {{ lower_camel .Name }}.GetMetadata().Name == name && {{ lower_camel .Name }}.GetMetadata().Namespace == namespace {
			return {{ lower_camel .Name }}, nil
		}
	}
	return nil, errors.Errorf("list did not find {{ lower_camel .Name }} %v.%v", namespace, name)
}

func (list {{ .Name }}List) AsResources() resources.ResourceList {
	var ress resources.ResourceList 
	for _, {{ lower_camel .Name }} := range list {
		ress = append(ress, {{ lower_camel .Name }})
	}
	return ress
}

{{ if $.HasStatus -}}
func (list {{ .Name }}List) AsInputResources() resources.InputResourceList {
	var ress resources.InputResourceList
	for _, {{ lower_camel .Name }} := range list {
		ress = append(ress, {{ lower_camel .Name }})
	}
	return ress
}
{{- end}}

func (list {{ .Name }}List) Names() []string {
	var names []string
	for _, {{ lower_camel .Name }} := range list {
		names = append(names, {{ lower_camel .Name }}.GetMetadata().Name)
	}
	return names
}

func (list {{ .Name }}List) NamespacesDotNames() []string {
	var names []string
	for _, {{ lower_camel .Name }} := range list {
		names = append(names, {{ lower_camel .Name }}.GetMetadata().Namespace + "." + {{ lower_camel .Name }}.GetMetadata().Name)
	}
	return names
}

func (list {{ .Name }}List) Sort() {{ .Name }}List {
	sort.SliceStable(list, func(i, j int) bool {
		return list[i].GetMetadata().Less(list[j].GetMetadata())
	})
	return list
}

func (list {{ .Name }}List) Clone() {{ .Name }}List {
	var {{ lower_camel .Name }}List {{ .Name }}List
	for _, {{ lower_camel .Name }} := range list {
		{{ lower_camel .Name }}List = append({{ lower_camel .Name }}List, resources.Clone({{ lower_camel .Name }}).(*{{ .Name }}))
	}
	return {{ lower_camel .Name }}List 
}

func (list {{ .Name }}List) Each(f func(element *{{ .Name }})) {
	for _, {{ lower_camel .Name }} := range list {
		f({{ lower_camel .Name }})
	}
}

func (list {{ .Name }}List) EachResource(f func(element resources.Resource)) {
	for _, {{ lower_camel .Name }} := range list {
		f({{ lower_camel .Name }})
	}
}

func (list {{ .Name }}List) AsInterfaces() []interface{}{
	var asInterfaces []interface{}
	list.Each(func(element *{{ .Name }}) {
		asInterfaces = append(asInterfaces, element)
	})
	return asInterfaces
}

{{- $crdGroupName := .Project.ProtoPackage }}
{{- if ne .Project.ProjectConfig.CrdGroupOverride "" }}
{{- $crdGroupName = .Project.ProjectConfig.CrdGroupOverride }}
{{- end}}

{{- if not $.IsCustom }}

// Kubernetes Adapter for {{ .Name }}

func (o *{{ .Name }}) GetObjectKind() schema.ObjectKind {
	t := {{ .Name }}Crd.TypeMeta()
	return &t
}

func (o *{{ .Name }}) DeepCopyObject() runtime.Object {
	return resources.Clone(o).(*{{ .Name }})
}

func (o *{{ .Name }}) DeepCopyInto(out *{{ .Name }}) {
	clone := resources.Clone(o).(*{{ .Name }})
	*out = *clone
}

var (
	{{ .Name }}Crd = crd.NewCrd(
		"{{ lowercase (upper_camel .PluralName) }}",
		{{ .Name }}GVK.Group,
		{{ .Name }}GVK.Version,
		{{ .Name }}GVK.Kind,
		"{{ .ShortName }}",
		{{ .ClusterScoped }},
		&{{ .Name }}{})
)

{{- end}}

var (
	{{ .Name }}GVK = schema.GroupVersionKind{
		Version: "{{ .Project.ProjectConfig.Version }}",
		Group: "{{ $crdGroupName }}",
		Kind: "{{ .Name }}",
	}
)
`))
View Source
var SimpleEmitterTemplate = template.Must(template.New("resource_group_emitter").Funcs(Funcs).Parse(
	`package {{ .Project.ProjectConfig.Version }}

import (
	"context"
	"sync"
	"time"

	{{ .Imports }}
	"go.opencensus.io/stats"
	"go.uber.org/zap"

	"github.com/solo-io/solo-kit/pkg/api/v1/clients"
	"github.com/solo-io/solo-kit/pkg/api/v1/resources"
	"github.com/solo-io/solo-kit/pkg/errors"
	"github.com/solo-io/go-utils/errutils"
	"github.com/solo-io/go-utils/contextutils"
)


type {{ .GoName }}SimpleEmitter interface {
	Snapshots(ctx context.Context) (<-chan *{{ .GoName }}Snapshot, <-chan error, error)
}

func New{{ .GoName }}SimpleEmitter(aggregatedWatch clients.ResourceWatch) {{ .GoName }}SimpleEmitter {
	return New{{ .GoName }}SimpleEmitterWithEmit(aggregatedWatch, make(chan struct{}))
}

func New{{ .GoName }}SimpleEmitterWithEmit(aggregatedWatch clients.ResourceWatch, emit <-chan struct{}) {{ .GoName }}SimpleEmitter {
	return &{{ lower_camel .GoName }}SimpleEmitter{
		aggregatedWatch: aggregatedWatch,
		forceEmit: emit,
	}
}

type {{ lower_camel .GoName }}SimpleEmitter struct {
	forceEmit <- chan struct{}
	aggregatedWatch clients.ResourceWatch
}

func (c *{{ lower_camel .GoName }}SimpleEmitter) Snapshots(ctx context.Context) (<-chan *{{ .GoName }}Snapshot, <-chan error, error) {
	snapshots := make(chan *{{ .GoName }}Snapshot)
	errs := make(chan error)
	
	untyped, watchErrs, err := c.aggregatedWatch(ctx)
	if err != nil {
		return nil, nil, err
	}

	go errutils.AggregateErrs(ctx, errs, watchErrs, "{{ lower_camel .GoName }}-emitter")

	go func() {
		currentSnapshot := {{ .GoName }}Snapshot{}
		timer := time.NewTicker(time.Second * 1)
		var previousHash uint64
		sync := func() {
			currentHash, err := currentSnapshot.Hash(nil)
			if err != nil {
				contextutils.LoggerFrom(ctx).Panicw("error while hashing, this should never happen", zap.Error(err))
			}
			if previousHash == currentHash {
				return
			}

			previousHash = currentHash

			stats.Record(ctx, m{{ .GoName }}SnapshotOut.M(1))
			sentSnapshot := currentSnapshot.Clone()
			snapshots <- &sentSnapshot
		}

		defer func() {
			close(snapshots)
			close(errs)
		}()

		for {
			record := func() { stats.Record(ctx, m{{ .GoName }}SnapshotIn.M(1)) }

			select {
			case <-timer.C:
				sync()
			case <-ctx.Done():
				return
			case <-c.forceEmit:
				sentSnapshot := currentSnapshot.Clone()
				snapshots <- &sentSnapshot
			case untypedList := <-untyped:
				record()

				currentSnapshot = {{ .GoName }}Snapshot{}
				for _, res := range untypedList {
					switch typed := res.(type) {
{{- range .Resources}}
					case *{{ .ImportPrefix }}{{ .Name }}:
						currentSnapshot.{{ upper_camel .PluralName }} = append(currentSnapshot.{{ upper_camel .PluralName }}, typed)
{{- end}}
					default:
						select {
						case errs <- fmt.Errorf("{{ .GoName }}SnapshotEmitter "+
							"cannot process resource %v of type %T", res.GetMetadata().Ref(), res):
						case <-ctx.Done():
							return
						}
					}
				}

			}
		}
	}()
	return snapshots, errs, nil
}
`))
View Source
var SimpleEventLoopTemplate = template.Must(template.New("simple_event_loop").Funcs(Funcs).Parse(`package {{ .Project.ProjectConfig.Version }}

import (
	"context"
	"fmt"

	"go.opencensus.io/trace"

	"github.com/hashicorp/go-multierror"

	"github.com/solo-io/solo-kit/pkg/api/v1/eventloop"
	"github.com/solo-io/solo-kit/pkg/errors"
	"github.com/solo-io/go-utils/contextutils"
	"github.com/solo-io/go-utils/errutils"
)

// SyncDeciders Syncer which implements this interface 
// can make smarter decisions over whether 
// it should be restarted (including having its context cancelled)
// based on a diff of the previous and current snapshot

// Deprecated: use {{ .GoName }}SyncDeciderWithContext
type {{ .GoName }}SyncDecider interface {
	{{ .GoName }}Syncer
	ShouldSync(old, new *{{ .GoName }}Snapshot) bool
}

type {{ .GoName }}SyncDeciderWithContext interface {
	{{ .GoName }}Syncer
	ShouldSync(ctx context.Context, old, new *{{ .GoName }}Snapshot) bool
}

type {{ lower_camel .GoName }}SimpleEventLoop struct {
	emitter {{ .GoName }}SimpleEmitter
	syncers  []{{ .GoName }}Syncer
}

func New{{ .GoName }}SimpleEventLoop(emitter {{ .GoName }}SimpleEmitter, syncers ... {{ .GoName }}Syncer) eventloop.SimpleEventLoop {
	return &{{ lower_camel .GoName }}SimpleEventLoop{
		emitter: emitter,
		syncers: syncers,
	}
}

func (el *{{ lower_camel .GoName }}SimpleEventLoop) Run(ctx context.Context) (<-chan error, error) {
	ctx = contextutils.WithLogger(ctx, "{{ .Project.ProjectConfig.Version }}.event_loop")
	logger := contextutils.LoggerFrom(ctx)
	logger.Infof("event loop started")

	errs := make(chan error)

	watch, emitterErrs, err := el.emitter.Snapshots(ctx)
	if err != nil {
		return nil, errors.Wrapf(err, "starting snapshot watch")
	}


	go errutils.AggregateErrs(ctx, errs, emitterErrs, "{{ .Project.ProjectConfig.Version }}.emitter errors")
	go func() {
		// create a new context for each syncer for each loop, cancel each before each loop
		syncerCancels := make(map[{{ .GoName }}Syncer]context.CancelFunc)

		// use closure to allow cancel function to be updated as context changes
		defer func() {
			for _, cancel := range syncerCancels {
				cancel()
			}
		}()

		// cache the previous snapshot for comparison
		var previousSnapshot *{{ .GoName }}Snapshot

		for {
			select {
			case snapshot, ok := <-watch:
				if !ok {
					return
				}

				// cancel any open watches from previous loop
				for _, syncer := range el.syncers {
					// allow the syncer to decide if we should sync it + cancel its previous context
					if syncDecider, isDecider := syncer.({{ .GoName }}SyncDecider); isDecider {
						if shouldSync := syncDecider.ShouldSync(previousSnapshot, snapshot); !shouldSync {
							continue // skip syncing this syncer
						}
					} else if syncDeciderWithContext, isDecider := syncer.({{ .GoName }}SyncDeciderWithContext); isDecider {
						if shouldSync := syncDeciderWithContext.ShouldSync(ctx, previousSnapshot, snapshot); !shouldSync {
							continue // skip syncing this syncer
						}
					}  

					// if this syncer had a previous context, cancel it
					cancel, ok := syncerCancels[syncer]
					if ok {
						cancel()
					}
						
					ctx, span := trace.StartSpan(ctx, fmt.Sprintf("{{ .Name }}.SimpleEventLoopSync-%T", syncer))
					ctx, canc := context.WithCancel(ctx)
					err := syncer.Sync(ctx, snapshot)
					span.End()

					if err != nil {
						select {
						case errs <- err:
						default:
							logger.Errorf("write error channel is full! could not propagate err: %v", err)
						}
					}

					syncerCancels[syncer] = canc
				}

				previousSnapshot = snapshot

			case <-ctx.Done():
				return
			}
		}
	}()
	return errs, nil
}
`))
View Source
var XdsTemplate = template.Must(template.New("xds_template").Funcs(Funcs).Parse(`package {{ .Project.ProjectConfig.Version }}

import (
	"context"
	"errors"
	"fmt"

	core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
	discovery "github.com/envoyproxy/go-control-plane/envoy/api/v2"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/solo-io/solo-kit/pkg/api/v1/control-plane/cache"
	"github.com/solo-io/solo-kit/pkg/api/v1/control-plane/client"
	"github.com/solo-io/solo-kit/pkg/api/v1/control-plane/resource"
	"github.com/solo-io/solo-kit/pkg/api/v1/control-plane/server"
)

// Type Definitions:

const {{ upper_camel .MessageType }}Type = resource.TypePrefix + "/{{ .ProtoPackage }}.{{ upper_camel .MessageType }}"

/* Defined a resource - to be used by snapshot */
type {{ upper_camel .MessageType }}XdsResourceWrapper struct {
	// TODO(yuval-k): This is public for mitchellh hashstructure to work properly. consider better alternatives.
	Resource *{{ upper_camel .MessageType }}
}

// Make sure the Resource interface is implemented
var _ cache.Resource = &{{ upper_camel .MessageType }}XdsResourceWrapper{}

func New{{ upper_camel .MessageType }}XdsResourceWrapper(resourceProto *{{ upper_camel .MessageType }}) *{{ upper_camel .MessageType }}XdsResourceWrapper {
	return &{{ upper_camel .MessageType }}XdsResourceWrapper{
		Resource: resourceProto,
	}
}

func (e *{{ upper_camel .MessageType }}XdsResourceWrapper) Self() cache.XdsResourceReference {
	return cache.XdsResourceReference{Name: e.Resource.{{ upper_camel .NameField }}, Type: {{ upper_camel .MessageType }}Type}
}

func (e *{{ upper_camel .MessageType }}XdsResourceWrapper) ResourceProto() cache.ResourceProto {
	return e.Resource
}

{{- if .NoReferences }}
func (e *{{ upper_camel .MessageType }}XdsResourceWrapper) References() []cache.XdsResourceReference {
	return nil
}
{{- else }}
	// This method is not implemented as it requires domain knowledge and cannot be auto generated.
	// Please copy it, and implement it in a different file (so it doesn't get overwritten).
	// Alternativly, specify the annotation @solo-kit:resource.no_references in the comments for the 
	// {{ upper_camel .MessageType }} to indicate that there are no references.
	//	func (e *{{ upper_camel .MessageType }}XdsResourceWrapper) References() []cache.XdsResourceReference {
	//		panic("not implemented")
	//	}
{{- end }}

// Define a type record. This is used by the generic client library.
var {{ upper_camel .MessageType }}TypeRecord = client.NewTypeRecord(
	{{ upper_camel .MessageType }}Type,
	
	// Return an empty message, that can be used to deserialize bytes into it.
	func() cache.ResourceProto { return &{{ upper_camel .MessageType }}{} },
	
	// Covert the message to a resource suitable for use for protobuf's Any.
	func(r cache.ResourceProto) cache.Resource {
		return &{{ upper_camel .MessageType }}XdsResourceWrapper{Resource: r.(*{{ upper_camel .MessageType }})}
	},
)

// Server Implementation:

// Wrap the generic server and implement the type sepcific methods:
type {{ lower_camel .Name }}Server struct {
	server.Server
}

func New{{ upper_camel .Name }}Server(genericServer server.Server) {{ upper_camel .Name }}Server {
	return &{{ lower_camel .Name }}Server{Server: genericServer}
}

func (s *{{ lower_camel .Name }}Server) Stream{{ upper_camel .MessageType }}(stream {{ upper_camel .Name }}_Stream{{ upper_camel .MessageType }}Server) error {
	return s.Server.StreamV2(stream, {{ upper_camel .MessageType }}Type)
}

func (s *{{ lower_camel .Name }}Server) Fetch{{ upper_camel .MessageType }}(ctx context.Context, req *discovery.DiscoveryRequest) (*discovery.DiscoveryResponse, error) {
	if req == nil {
		return nil, status.Errorf(codes.Unavailable, "empty request")
	}
	req.TypeUrl = {{ upper_camel .MessageType }}Type
	return s.Server.FetchV2(ctx, req)
}

func (s *{{ lower_camel .Name }}Server) Delta{{ upper_camel .MessageType }}(_ {{ upper_camel .Name }}_Delta{{ upper_camel .MessageType }}Server) error {
	return errors.New("not implemented")
}


// Client Implementation: Generate a strongly typed client over the generic client

// The apply functions receives resources and returns an error if they were applied correctly.
// In theory the configuration can become valid in the future (i.e. eventually consistent), but I don't think we need to worry about that now
// As our current use cases only have one configuration resource, so no interactions are expected.
type Apply{{ upper_camel .MessageType }} func(version string, resources []*{{ upper_camel .MessageType }}) error

// Convert the strongly typed apply to a generic apply.
func apply{{ upper_camel .MessageType }}(typedApply Apply{{ upper_camel .MessageType }}) func(cache.Resources) error {
	return func(resources cache.Resources) error {

		var configs []*{{ upper_camel .MessageType }}
		for _, r := range resources.Items {
			if proto, ok := r.ResourceProto().(*{{ upper_camel .MessageType }}); !ok {
				return fmt.Errorf("resource %s of type %s incorrect", r.Self().Name, r.Self().Type)
			} else {
				configs = append(configs, proto)
			}
		}

		return typedApply(resources.Version, configs)
	}
}

func New{{ upper_camel .MessageType }}Client(nodeinfo *core.Node, typedApply Apply{{ upper_camel .MessageType }}) client.Client {
	return client.NewClient(nodeinfo, {{ upper_camel .MessageType }}TypeRecord, apply{{ upper_camel .MessageType }}(typedApply))
}

`))

Functions

This section is empty.

Types

This section is empty.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL