Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var Funcs = template.FuncMap{ "resourceBelongsToProject": utils.IsProjectResource, "join": strings.Join, "lowercase": strings.ToLower, "lower_camel": strcase.ToLowerCamel, "upper_camel": strcase.ToCamel, "snake": strcase.ToSnake, "p": gendoc.PFilter, "para": gendoc.ParaFilter, "nobr": gendoc.NoBrFilter, "fieldType": fieldType, "yamlType": yamlType, "noescape": noEscape, "printfptr": printPointer, "remove_magic_comments": func(in string) string { lines := strings.Split(in, "\n") var linesWithoutMagicComments []string for _, line := range lines { if magicCommentRegex.MatchString(line) { continue } linesWithoutMagicComments = append(linesWithoutMagicComments, line) } return strings.Join(linesWithoutMagicComments, "\n") }, "new_str_slice": func() *[]string { var v []string return &v }, "append_str_slice": func(to *[]string, str string) *[]string { *to = append(*to, str) return to }, "join_str_slice": func(slc *[]string, sep string) string { return strings.Join(*slc, sep) }, "new_bool": func() *bool { var v bool return &v }, "set_bool": func(v *bool, val bool) *bool { *v = val return v }, "unique": func(vals []string) []string { result := make([]string, 0, len(vals)) for _, v := range vals { if !stringutils.ContainsString(v, result) { result = append(result, v) } } return result }, "backtick": func() string { return "`" }, }
View Source
// ProjectTestSuiteTemplate renders the Ginkgo suite file for a project: the
// Test<ProjectName> entry point plus SynchronizedBeforeSuite /
// SynchronizedAfterSuite hooks. When RUN_KUBE_TESTS=1 the generated hooks
// acquire a test cluster lock before the suite and, afterwards, delete one CRD
// per unique "<PluralName>.<ProtoPackage>" pair (lower-cased) and release the
// lock.
//
// The generated import block explicitly lists "os", "k8s.io/client-go/kubernetes"
// and "k8s.io/client-go/rest" because the generated code references os.Getenv,
// kubernetes.NewForConfig and rest.Config; without them the output only
// compiles if an import-fixing tool is run over it afterwards.
//
// template.Must panics during package initialisation if the template text
// fails to parse.
var ProjectTestSuiteTemplate = template.Must(template.New("project_template").Funcs(Funcs).Parse(`package {{ .ProjectConfig.Version }}
{{- $uniqueCrds := new_str_slice }}
{{- range .Resources}}
{{- if ne .ProtoPackage ""}}
{{- $uniqueCrds := (append_str_slice $uniqueCrds (printf "%v.%v" .PluralName .ProtoPackage))}}
{{- end }}
{{- end }}
{{- $uniqueCrds := (unique $uniqueCrds)}}
import (
"context"
"os"
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/solo-io/k8s-utils/kubeutils"
"github.com/solo-io/k8s-utils/testutils/clusterlock"
"github.com/solo-io/solo-kit/test/testutils"
apiexts "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
func Test{{ upper_camel .ProjectConfig.Name }}(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "{{ upper_camel .ProjectConfig.Name }} Suite")
}
var (
lock *clusterlock.TestClusterLocker
cfg *rest.Config
_ = SynchronizedAfterSuite(func() {}, func() {
if os.Getenv("RUN_KUBE_TESTS") != "1" {
return
}
ctx := context.Background()
var err error
cfg, err = kubeutils.GetConfig("", "")
Expect(err).NotTo(HaveOccurred())
clientset, err := apiexts.NewForConfig(cfg)
Expect(err).NotTo(HaveOccurred())
{{- range $uniqueCrds}}
err = clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Delete(ctx, "{{lowercase .}}", metav1.DeleteOptions{})
testutils.ErrorNotOccuredOrNotFound(err)
{{- end}}
Expect(lock.ReleaseLock()).NotTo(HaveOccurred())
})
_ = SynchronizedBeforeSuite(func() []byte {
if os.Getenv("RUN_KUBE_TESTS") != "1" {
return nil
}
var err error
cfg, err = kubeutils.GetConfig("", "")
Expect(err).NotTo(HaveOccurred())
clientset, err := kubernetes.NewForConfig(cfg)
Expect(err).NotTo(HaveOccurred())
lock, err = clusterlock.NewTestClusterLocker(clientset, clusterlock.Options{})
Expect(err).NotTo(HaveOccurred())
Expect(lock.AcquireLock()).NotTo(HaveOccurred())
return nil
}, func([]byte) {})
)
`))
View Source
// ResourceClientTemplate renders the typed client for a single resource: the
// <Name>Watcher and <Name>Client interfaces, constructors
// (New<Name>Client, New<Name>ClientWithToken, New<Name>ClientWithBase) and a
// private implementation that wraps a clients.ResourceClient and converts its
// untyped results into <Name> / <Name>List values. Cluster-scoped resources
// get method signatures without a namespace parameter and pass "" to the
// underlying client.
//
// The generated Watch forwards lists from the base client's channel onto a
// typed channel in a goroutine. That goroutine:
//   - stops (closing the typed channel) when the base channel is closed,
//     instead of forwarding zero-value lists forever;
//   - selects on opts.Ctx.Done() while sending, so it cannot block forever on
//     a consumer that stopped reading after cancellation.
//
// template.Must panics during package initialisation if the template text
// fails to parse.
var ResourceClientTemplate = template.Must(template.New("resource_reconciler").Funcs(Funcs).Parse(`package {{ .Project.ProjectConfig.Version }}
import (
"context"
"github.com/solo-io/solo-kit/pkg/api/v1/clients"
"github.com/solo-io/solo-kit/pkg/api/v1/clients/factory"
"github.com/solo-io/solo-kit/pkg/api/v1/resources"
"github.com/solo-io/solo-kit/pkg/errors"
)
type {{ .Name }}Watcher interface {
{{- if .ClusterScoped }}
// watch cluster-scoped {{ .PluralName }}
Watch(opts clients.WatchOpts) (<-chan {{ .Name }}List, <-chan error, error)
{{- else }}
// watch namespace-scoped {{ .PluralName }}
Watch(namespace string, opts clients.WatchOpts) (<-chan {{ .Name }}List, <-chan error, error)
{{- end }}
}
type {{ .Name }}Client interface {
BaseClient() clients.ResourceClient
Register() error
{{- if .ClusterScoped }}
Read(name string, opts clients.ReadOpts) (*{{ .Name }}, error)
{{- else }}
Read(namespace, name string, opts clients.ReadOpts) (*{{ .Name }}, error)
{{- end }}
Write(resource *{{ .Name }}, opts clients.WriteOpts) (*{{ .Name }}, error)
{{- if .ClusterScoped }}
Delete(name string, opts clients.DeleteOpts) error
List(opts clients.ListOpts) ({{ .Name }}List, error)
{{- else }}
Delete(namespace, name string, opts clients.DeleteOpts) error
List(namespace string, opts clients.ListOpts) ({{ .Name }}List, error)
{{- end }}
{{ .Name }}Watcher
}
type {{ lower_camel .Name }}Client struct {
rc clients.ResourceClient
}
func New{{ .Name }}Client(ctx context.Context, rcFactory factory.ResourceClientFactory) ({{ .Name }}Client, error) {
return New{{ .Name }}ClientWithToken(ctx, rcFactory, "")
}
func New{{ .Name }}ClientWithToken(ctx context.Context, rcFactory factory.ResourceClientFactory, token string) ({{ .Name }}Client, error) {
rc, err := rcFactory.NewResourceClient(ctx, factory.NewResourceClientParams{
ResourceType: &{{ .Name }}{},
Token: token,
})
if err != nil {
return nil, errors.Wrapf(err, "creating base {{ .Name }} resource client")
}
return New{{ .Name }}ClientWithBase(rc), nil
}
func New{{ .Name }}ClientWithBase(rc clients.ResourceClient) {{ .Name }}Client {
return &{{ lower_camel .Name }}Client{
rc: rc,
}
}
func (client *{{ lower_camel .Name }}Client) BaseClient() clients.ResourceClient {
return client.rc
}
func (client *{{ lower_camel .Name }}Client) Register() error {
return client.rc.Register()
}
{{ if .ClusterScoped }}
func (client *{{ lower_camel .Name }}Client) Read(name string, opts clients.ReadOpts) (*{{ .Name }}, error) {
{{- else }}
func (client *{{ lower_camel .Name }}Client) Read(namespace, name string, opts clients.ReadOpts) (*{{ .Name }}, error) {
{{- end }}
opts = opts.WithDefaults()
{{ if .ClusterScoped }}
resource, err := client.rc.Read("", name, opts)
{{- else }}
resource, err := client.rc.Read(namespace, name, opts)
{{- end }}
if err != nil {
return nil, err
}
return resource.(*{{ .Name }}), nil
}
func (client *{{ lower_camel .Name }}Client) Write({{ lower_camel .Name }} *{{ .Name }}, opts clients.WriteOpts) (*{{ .Name }}, error) {
opts = opts.WithDefaults()
resource, err := client.rc.Write({{ lower_camel .Name }}, opts)
if err != nil {
return nil, err
}
return resource.(*{{ .Name }}), nil
}
{{ if .ClusterScoped }}
func (client *{{ lower_camel .Name }}Client) Delete(name string, opts clients.DeleteOpts) error {
{{- else }}
func (client *{{ lower_camel .Name }}Client) Delete(namespace, name string, opts clients.DeleteOpts) error {
{{- end }}
opts = opts.WithDefaults()
{{ if .ClusterScoped }}
return client.rc.Delete("", name, opts)
{{- else }}
return client.rc.Delete(namespace, name, opts)
{{- end }}
}
{{ if .ClusterScoped }}
func (client *{{ lower_camel .Name }}Client) List(opts clients.ListOpts) ({{ .Name }}List, error) {
{{- else }}
func (client *{{ lower_camel .Name }}Client) List(namespace string, opts clients.ListOpts) ({{ .Name }}List, error) {
{{- end }}
opts = opts.WithDefaults()
{{ if .ClusterScoped }}
resourceList, err := client.rc.List("", opts)
{{- else }}
resourceList, err := client.rc.List(namespace, opts)
{{- end }}
if err != nil {
return nil, err
}
return convertTo{{ .Name }}(resourceList), nil
}
{{ if .ClusterScoped }}
func (client *{{ lower_camel .Name }}Client) Watch(opts clients.WatchOpts) (<-chan {{ .Name }}List, <-chan error, error) {
{{- else }}
func (client *{{ lower_camel .Name }}Client) Watch(namespace string, opts clients.WatchOpts) (<-chan {{ .Name }}List, <-chan error, error) {
{{- end }}
opts = opts.WithDefaults()
{{ if .ClusterScoped }}
resourcesChan, errs, initErr := client.rc.Watch("", opts)
{{- else }}
resourcesChan, errs, initErr := client.rc.Watch(namespace, opts)
{{- end }}
if initErr != nil {
return nil, nil, initErr
}
{{ lower_camel .PluralName }}Chan := make(chan {{ .Name }}List)
go func() {
defer close({{ lower_camel .PluralName }}Chan)
for {
select {
case resourceList, ok := <-resourcesChan:
if !ok {
// the underlying watch ended; do not forward zero-value lists
return
}
select {
case {{ lower_camel .PluralName }}Chan <- convertTo{{ .Name }}(resourceList):
case <-opts.Ctx.Done():
return
}
case <-opts.Ctx.Done():
return
}
}
}()
return {{ lower_camel .PluralName }}Chan, errs, nil
}
func convertTo{{ .Name }}(resources resources.ResourceList) {{ .Name }}List {
var {{ lower_camel .Name }}List {{ .Name }}List
for _, resource := range resources {
{{ lower_camel .Name }}List = append({{ lower_camel .Name }}List, resource.(*{{ .Name }}))
}
return {{ lower_camel .Name }}List
}
`))
View Source
// ResourceClientTestTemplate renders the Ginkgo test file for a generated
// resource client. The generated spec runs the shared <Name>ClientTest helper
// against each typed.ResourceClientTester backend: the kube CRD tester unless
// the resource is custom, plus the consul/file/memory/vault/secret/configmap
// testers for namespaced resources. The helper exercises Register, Write
// (including conflict and overwrite handling), Read, List, Delete and Watch.
//
// Fixes relative to the previous template text:
//   - "github.com/solo-io/solo-kit/test/helpers" was imported twice in the
//     generated file; it is now listed once.
//   - a redundant Expect(err) directly after `input = &<Name>{}` in the watch
//     goroutine (err had not changed since the preceding check) was removed.
//   - the final watch assertion checked ContainElement(r3) twice and never
//     checked r2, which the goroutine re-creates; it now checks r1, r2 and r3.
//
// template.Must panics during package initialisation if the template text
// fails to parse.
var ResourceClientTestTemplate = template.Must(template.New("resource_client_test").Funcs(Funcs).Parse(`// +build solokit
package {{ .Project.ProjectConfig.Version }}
import (
"context"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/solo-io/solo-kit/test/helpers"
"github.com/solo-io/solo-kit/pkg/api/v1/clients"
"github.com/solo-io/solo-kit/pkg/api/v1/resources"
"github.com/solo-io/solo-kit/pkg/errors"
"github.com/solo-io/solo-kit/pkg/api/v1/resources/core"
"github.com/solo-io/solo-kit/test/tests/typed"
)
var _ = Describe("{{ .Name }}Client", func() {
var ctx context.Context
{{- if (not .ClusterScoped) }}
var (
namespace string
)
{{- end }}
for _, test := range []typed.ResourceClientTester{
{{- if (not .IsCustom) }}
&typed.KubeRcTester{Crd: {{ .Name }}Crd},
{{- end }}
{{- /* cluster-scoped resources are currently only supported by crd client */}}
{{- if (not .ClusterScoped) }}
&typed.ConsulRcTester{},
&typed.FileRcTester{},
&typed.MemoryRcTester{},
&typed.VaultRcTester{},
&typed.KubeSecretRcTester{},
&typed.KubeConfigMapRcTester{},
{{- end }}
} {
Context("resource client backed by "+test.Description(), func() {
var (
client {{ .Name }}Client
err error
name1, name2, name3 = "foo"+helpers.RandString(3), "boo"+helpers.RandString(3), "goo"+helpers.RandString(3)
)
{{- if .ClusterScoped }}
{{/* cluster-scoped resources get no namespace, must delete individual resources*/}}
BeforeEach(func() {
ctx = context.Background()
factory := test.Setup(ctx, "")
client, err = New{{ .Name }}Client(ctx, factory)
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
client.Delete(name1, clients.DeleteOpts{})
client.Delete(name2, clients.DeleteOpts{})
client.Delete(name3, clients.DeleteOpts{})
})
It("CRUDs {{ .Name }}s "+test.Description(), func() {
{{ .Name }}ClientTest(client, name1, name2, name3)
})
{{- else }}
{{/* non-cluster-scoped resources get a namespace and then the ns is deleted*/}}
BeforeEach(func() {
namespace = helpers.RandString(6)
ctx = context.Background()
factory := test.Setup(ctx, namespace)
client, err = New{{ .Name }}Client(ctx, factory)
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
test.Teardown(ctx, namespace)
})
It("CRUDs {{ .Name }}s "+test.Description(), func() {
{{ .Name }}ClientTest(namespace, client, name1, name2, name3)
})
{{- end }}
})
}
})
{{- if .ClusterScoped }}
func {{ .Name }}ClientTest(client {{ .Name }}Client, name1, name2, name3 string) {
{{- else }}
func {{ .Name }}ClientTest(namespace string, client {{ .Name }}Client, name1, name2, name3 string) {
{{- end }}
err := client.Register()
Expect(err).NotTo(HaveOccurred())
name := name1
{{- if .ClusterScoped }}
input := New{{ .Name }}("", name)
{{- else }}
input := New{{ .Name }}(namespace, name)
{{- end }}
r1, err := client.Write(input, clients.WriteOpts{})
Expect(err).NotTo(HaveOccurred())
_, err = client.Write(input, clients.WriteOpts{})
Expect(err).To(HaveOccurred())
Expect(errors.IsExist(err)).To(BeTrue())
Expect(r1).To(BeAssignableToTypeOf(&{{ .Name }}{}))
Expect(r1.GetMetadata().Name).To(Equal(name))
{{- if (not .ClusterScoped) }}
Expect(r1.GetMetadata().Namespace).To(Equal(namespace))
{{- end }}
Expect(r1.GetMetadata().ResourceVersion).NotTo(Equal(input.GetMetadata().ResourceVersion))
Expect(r1.GetMetadata().Ref()).To(Equal(input.GetMetadata().Ref()))
{{- range .Fields }}
{{- if and (not (eq .Name "metadata")) (not .IsOneof) }}
Expect(r1.{{ upper_camel .Name }}).To(Equal(input.{{ upper_camel .Name }}))
{{- end }}
{{- end }}
_, err = client.Write(input, clients.WriteOpts{
OverwriteExisting: true,
})
Expect(err).To(HaveOccurred())
resources.UpdateMetadata(input, func(meta *core.Metadata) {
meta.ResourceVersion = r1.GetMetadata().ResourceVersion
})
r1, err = client.Write(input, clients.WriteOpts{
OverwriteExisting: true,
})
Expect(err).NotTo(HaveOccurred())
{{- if .ClusterScoped }}
read, err := client.Read(name, clients.ReadOpts{})
{{- else }}
read, err := client.Read(namespace, name, clients.ReadOpts{})
{{- end }}
Expect(err).NotTo(HaveOccurred())
Expect(read).To(Equal(r1))
{{- if (not .ClusterScoped) }}
_, err = client.Read("doesntexist", name, clients.ReadOpts{})
Expect(err).To(HaveOccurred())
Expect(errors.IsNotExist(err)).To(BeTrue())
{{- end }}
name = name2
input = &{{ .Name }}{}
input.SetMetadata(&core.Metadata{
Name: name,
{{- if (not .ClusterScoped) }}
Namespace: namespace,
{{- end }}
})
r2, err := client.Write(input, clients.WriteOpts{})
Expect(err).NotTo(HaveOccurred())
{{- if .ClusterScoped }}
list, err := client.List(clients.ListOpts{})
{{- else }}
list, err := client.List(namespace, clients.ListOpts{})
{{- end }}
Expect(err).NotTo(HaveOccurred())
Expect(list).To(ContainElement(r1))
Expect(list).To(ContainElement(r2))
{{- if .ClusterScoped }}
err = client.Delete("adsfw", clients.DeleteOpts{})
{{- else }}
err = client.Delete(namespace, "adsfw", clients.DeleteOpts{})
{{- end }}
Expect(err).To(HaveOccurred())
Expect(errors.IsNotExist(err)).To(BeTrue())
{{- if .ClusterScoped }}
err = client.Delete("adsfw", clients.DeleteOpts{
{{- else }}
err = client.Delete(namespace, "adsfw", clients.DeleteOpts{
{{- end }}
IgnoreNotExist: true,
})
Expect(err).NotTo(HaveOccurred())
{{- if .ClusterScoped }}
err = client.Delete(r2.GetMetadata().Name, clients.DeleteOpts{})
{{- else }}
err = client.Delete(namespace, r2.GetMetadata().Name, clients.DeleteOpts{})
{{- end }}
Expect(err).NotTo(HaveOccurred())
Eventually(func() {{ .Name }}List {
{{- if .ClusterScoped }}
list, err = client.List(clients.ListOpts{})
{{- else }}
list, err = client.List(namespace, clients.ListOpts{})
{{- end }}
Expect(err).NotTo(HaveOccurred())
return list
}, time.Second * 10).Should(ContainElement(r1))
Eventually(func() {{ .Name }}List {
{{- if .ClusterScoped }}
list, err = client.List(clients.ListOpts{})
{{- else }}
list, err = client.List(namespace, clients.ListOpts{})
{{- end }}
Expect(err).NotTo(HaveOccurred())
return list
}, time.Second * 10).ShouldNot(ContainElement(r2))
{{- if .ClusterScoped }}
w, errs, err := client.Watch(clients.WatchOpts{
{{- else }}
w, errs, err := client.Watch(namespace, clients.WatchOpts{
{{- end }}
RefreshRate: time.Hour,
})
Expect(err).NotTo(HaveOccurred())
var r3 resources.Resource
wait := make(chan struct{})
go func() {
defer close(wait)
defer GinkgoRecover()
resources.UpdateMetadata(r2, func(meta *core.Metadata) {
meta.ResourceVersion = ""
})
r2, err = client.Write(r2, clients.WriteOpts{})
Expect(err).NotTo(HaveOccurred())
name = name3
input = &{{ .Name }}{}
input.SetMetadata(&core.Metadata{
Name: name,
{{- if (not .ClusterScoped) }}
Namespace: namespace,
{{- end }}
})
r3, err = client.Write(input, clients.WriteOpts{})
Expect(err).NotTo(HaveOccurred())
}()
<-wait
select {
case err := <-errs:
Expect(err).NotTo(HaveOccurred())
case list = <-w:
case <-time.After(time.Millisecond * 5):
Fail("expected a message in channel")
}
go func() {
defer GinkgoRecover()
for {
select {
case err := <-errs:
Expect(err).NotTo(HaveOccurred())
case <-time.After(time.Second / 4):
return
}
}
}()
Eventually(w, time.Second*5, time.Second/10).Should(Receive(And(ContainElement(r1), ContainElement(r2), ContainElement(r3))))
}
`))
View Source
// ResourceGroupEmitterTemplate renders the snapshot emitter for a resource
// group: opencensus measures and views for the emitter, the
// <GoName>SnapshotEmitter / <GoName>Emitter interfaces, constructors, and a
// Snapshots method.
//
// The generated Snapshots method:
//   - rejects mixing "" (all namespaces) with specific namespaces;
//   - lists and watches every resource type (per namespace for namespaced
//     resources, once for cluster-scoped ones), funnelling watch errors into a
//     single error channel via errutils.AggregateErrs;
//   - runs a goroutine that sends the initial snapshot, merges incoming lists
//     into the current snapshot, and once per second emits a new snapshot if
//     its hash changed (recording a "missed" stat when the receiver is not
//     ready); a message on forceEmit sends the current snapshot immediately.
//
// Fix relative to the previous template text: the one-second ticker created in
// the emitter goroutine is now stopped with `defer timer.Stop()` so it is
// released when the goroutine exits instead of being left running.
//
// template.Must panics during package initialisation if the template text
// fails to parse.
var ResourceGroupEmitterTemplate = template.Must(template.New("resource_group_emitter").Funcs(Funcs).Parse(
`package {{ .Project.ProjectConfig.Version }}
{{- $client_declarations := new_str_slice }}
{{- $clients := new_str_slice }}
{{- range .Resources}}
{{- $client_declarations := (append_str_slice $client_declarations (printf "%vClient %v%vClient" (lower_camel .Name) .ImportPrefix .Name)) }}
{{- $clients := (append_str_slice $clients (printf "%vClient" (lower_camel .Name))) }}
{{- end}}
{{- $client_declarations := (join_str_slice $client_declarations ", ") }}
{{- $clients := (join_str_slice $clients ", ") }}
import (
"fmt"
"sync"
"time"
{{ .Imports }}
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.uber.org/zap"
"github.com/solo-io/solo-kit/pkg/api/v1/clients"
"github.com/solo-io/solo-kit/pkg/errors"
skstats "github.com/solo-io/solo-kit/pkg/stats"
"github.com/solo-io/go-utils/errutils"
"github.com/solo-io/go-utils/contextutils"
)
{{ $emitter_prefix := (print (snake .Name) "/emitter") }}
{{ $resource_group := upper_camel .GoName }}
var (
// Deprecated. See m{{ $resource_group }}ResourcesIn
m{{ $resource_group }}SnapshotIn = stats.Int64("{{ $emitter_prefix }}/snap_in", "Deprecated. Use {{ $emitter_prefix }}/resources_in. The number of snapshots in", "1")
// metrics for emitter
m{{ $resource_group }}ResourcesIn = stats.Int64("{{ $emitter_prefix }}/resources_in", "The number of resource lists received on open watch channels", "1")
m{{ $resource_group }}SnapshotOut = stats.Int64("{{ $emitter_prefix }}/snap_out", "The number of snapshots out", "1")
m{{ $resource_group }}SnapshotMissed = stats.Int64("{{ $emitter_prefix }}/snap_missed", "The number of snapshots missed", "1")
// views for emitter
// deprecated: see {{ lower_camel .GoName }}ResourcesInView
{{ lower_camel .GoName }}snapshotInView = &view.View{
Name: "{{ $emitter_prefix }}/snap_in",
Measure: m{{ $resource_group }}SnapshotIn,
Description: "Deprecated. Use {{ $emitter_prefix }}/resources_in. The number of snapshots updates coming in.",
Aggregation: view.Count(),
TagKeys: []tag.Key{
},
}
{{ lower_camel .GoName }}ResourcesInView = &view.View{
Name: "{{ $emitter_prefix }}/resources_in",
Measure: m{{ $resource_group }}ResourcesIn,
Description: "The number of resource lists received on open watch channels",
Aggregation: view.Count(),
TagKeys: []tag.Key{
skstats.NamespaceKey,
skstats.ResourceKey,
},
}
{{ lower_camel .GoName }}snapshotOutView = &view.View{
Name: "{{ $emitter_prefix }}/snap_out",
Measure: m{{ $resource_group }}SnapshotOut,
Description: "The number of snapshots updates going out",
Aggregation: view.Count(),
TagKeys: []tag.Key{
},
}
{{ lower_camel .GoName }}snapshotMissedView = &view.View{
Name: "{{ $emitter_prefix }}/snap_missed",
Measure: m{{ $resource_group }}SnapshotMissed,
Description: "The number of snapshots updates going missed. this can happen in heavy load. missed snapshot will be re-tried after a second.",
Aggregation: view.Count(),
TagKeys: []tag.Key{
},
}
)
func init() {
view.Register(
{{ lower_camel .GoName }}snapshotInView,
{{ lower_camel .GoName }}snapshotOutView,
{{ lower_camel .GoName }}snapshotMissedView,
{{ lower_camel .GoName }}ResourcesInView,
)
}
type {{ .GoName }}SnapshotEmitter interface {
Snapshots(watchNamespaces []string, opts clients.WatchOpts) (<-chan *{{ .GoName }}Snapshot, <-chan error, error)
}
type {{ .GoName }}Emitter interface {
{{ .GoName }}SnapshotEmitter
Register() error
{{- range .Resources}}
{{ .Name }}() {{ .ImportPrefix }}{{ .Name }}Client
{{- end}}
}
func New{{ .GoName }}Emitter({{ $client_declarations }}) {{ .GoName }}Emitter {
return New{{ .GoName }}EmitterWithEmit({{ $clients }}, make(chan struct{}))
}
func New{{ .GoName }}EmitterWithEmit({{ $client_declarations }}, emit <-chan struct{}) {{ .GoName }}Emitter {
return &{{ lower_camel .GoName }}Emitter{
{{- range .Resources}}
{{ lower_camel .Name }}:{{ lower_camel .Name }}Client,
{{- end}}
forceEmit: emit,
}
}
type {{ lower_camel .GoName }}Emitter struct {
forceEmit <- chan struct{}
{{- range .Resources}}
{{ lower_camel .Name }} {{ .ImportPrefix }}{{ .Name }}Client
{{- end}}
}
func (c *{{ lower_camel .GoName }}Emitter) Register() error {
{{- range .Resources}}
if err := c.{{ lower_camel .Name }}.Register(); err != nil {
return err
}
{{- end}}
return nil
}
{{- range .Resources}}
func (c *{{ lower_camel $.GoName }}Emitter) {{ .Name }}() {{ .ImportPrefix }}{{ .Name }}Client {
return c.{{ lower_camel .Name }}
}
{{- end}}
func (c *{{ lower_camel .GoName }}Emitter) Snapshots(watchNamespaces []string, opts clients.WatchOpts) (<-chan *{{ .GoName }}Snapshot, <-chan error, error) {
if len(watchNamespaces) == 0 {
watchNamespaces = []string{""}
}
for _, ns := range watchNamespaces {
if ns == "" && len(watchNamespaces) > 1 {
return nil, nil, errors.Errorf("the \"\" namespace is used to watch all namespaces. Snapshots can either be tracked for "+
"specific namespaces or \"\" AllNamespaces, but not both.")
}
}
errs := make(chan error)
var done sync.WaitGroup
ctx := opts.Ctx
{{- range .Resources}}
/* Create channel for {{ .Name }} */
{{- if (not .ClusterScoped) }}
type {{ lower_camel .Name }}ListWithNamespace struct {
list {{ .ImportPrefix }}{{ .Name }}List
namespace string
}
{{ lower_camel .Name }}Chan := make(chan {{ lower_camel .Name }}ListWithNamespace)
var initial{{ upper_camel .Name }}List {{ .ImportPrefix }}{{ .Name }}List{{- end }}
{{- end}}
currentSnapshot := {{ .GoName }}Snapshot{}
for _, namespace := range watchNamespaces {
{{- range .Resources}}
{{- if (not .ClusterScoped) }}
/* Setup namespaced watch for {{ .Name }} */
{
{{ lower_camel .PluralName }}, err := c.{{ lower_camel .Name }}.List(namespace, clients.ListOpts{Ctx: opts.Ctx, Selector: opts.Selector})
if err != nil {
return nil, nil, errors.Wrapf(err, "initial {{ .Name }} list")
}
initial{{ upper_camel .Name }}List = append(initial{{ upper_camel .Name }}List, {{ lower_camel .PluralName }}...)
}
{{ lower_camel .Name }}NamespacesChan, {{ lower_camel .Name }}Errs, err := c.{{ lower_camel .Name }}.Watch(namespace, opts)
if err != nil {
return nil, nil, errors.Wrapf(err, "starting {{ .Name }} watch")
}
done.Add(1)
go func(namespace string) {
defer done.Done()
errutils.AggregateErrs(ctx, errs, {{ lower_camel .Name }}Errs, namespace+"-{{ lower_camel .PluralName }}")
}(namespace)
{{- end }}
{{- end}}
/* Watch for changes and update snapshot */
go func(namespace string) {
for {
select {
case <-ctx.Done():
return
{{- range .Resources}}
{{- if (not .ClusterScoped) }}
case {{ lower_camel .Name }}List, ok := <- {{ lower_camel .Name }}NamespacesChan:
if !ok {
return
}
select {
case <-ctx.Done():
return
case {{ lower_camel .Name }}Chan <- {{ lower_camel .Name }}ListWithNamespace{list:{{ lower_camel .Name }}List, namespace:namespace}:
}
{{- end }}
{{- end}}
}
}
}(namespace)
}
{{- range .Resources}}
{{- if .ClusterScoped }}
/* Setup cluster-wide watch for {{ .Name }} */
var err error
currentSnapshot.{{ upper_camel .PluralName }},err = c.{{ lower_camel .Name }}.List(clients.ListOpts{Ctx: opts.Ctx, Selector: opts.Selector})
if err != nil {
return nil, nil, errors.Wrapf(err, "initial {{ .Name }} list")
}
{{ lower_camel .Name }}Chan, {{ lower_camel .Name }}Errs, err := c.{{ lower_camel .Name }}.Watch(opts)
if err != nil {
return nil, nil, errors.Wrapf(err, "starting {{ .Name }} watch")
}
done.Add(1)
go func() {
defer done.Done()
errutils.AggregateErrs(ctx, errs, {{ lower_camel .Name }}Errs, "{{ lower_camel .PluralName }}")
}()
{{- else }}
/* Initialize snapshot for {{ upper_camel .PluralName }} */
currentSnapshot.{{ upper_camel .PluralName }} = initial{{ upper_camel .Name }}List.Sort()
{{- end }}
{{- end}}
snapshots := make(chan *{{ .GoName }}Snapshot)
go func() {
// sent initial snapshot to kick off the watch
initialSnapshot := currentSnapshot.Clone()
snapshots <- &initialSnapshot
timer := time.NewTicker(time.Second * 1)
// release the ticker when this goroutine exits
defer timer.Stop()
previousHash, err := currentSnapshot.Hash(nil)
if err != nil {
contextutils.LoggerFrom(ctx).Panicw("error while hashing, this should never happen", zap.Error(err))
}
sync := func() {
currentHash, err := currentSnapshot.Hash(nil)
// this should never happen, so panic if it does
if err != nil {
contextutils.LoggerFrom(ctx).Panicw("error while hashing, this should never happen", zap.Error(err))
}
if previousHash == currentHash {
return
}
sentSnapshot := currentSnapshot.Clone()
select {
case snapshots <- &sentSnapshot:
stats.Record(ctx, m{{ $resource_group }}SnapshotOut.M(1))
previousHash = currentHash
default:
stats.Record(ctx, m{{ $resource_group }}SnapshotMissed.M(1))
}
}
{{- range .Resources}}
{{- if not .ClusterScoped }}
{{ lower_camel .PluralName }}ByNamespace := make(map[string]{{ .ImportPrefix }}{{ .Name }}List)
{{- end }}
{{- end }}
defer func() {
close(snapshots)
// we must wait for done before closing the error chan,
// to avoid sending on close channel.
done.Wait()
close(errs)
}()
for {
record := func(){stats.Record(ctx, m{{ $resource_group }}SnapshotIn.M(1))}
select {
case <-timer.C:
sync()
case <-ctx.Done():
return
case <-c.forceEmit:
sentSnapshot := currentSnapshot.Clone()
snapshots <- &sentSnapshot
{{- range .Resources}}
{{- if .ClusterScoped }}
case {{ lower_camel .Name }}List, ok := <- {{ lower_camel .Name }}Chan:
if !ok {
return
}
record()
skstats.IncrementResourceCount(
ctx,
"<all>",
"{{ snake .Name }}",
m{{ $resource_group }}ResourcesIn,
)
currentSnapshot.{{ upper_camel .PluralName }} = {{ lower_camel .Name }}List
{{- else }}
case {{ lower_camel .Name }}NamespacedList, ok := <- {{ lower_camel .Name }}Chan:
if !ok {
return
}
record()
namespace := {{ lower_camel .Name }}NamespacedList.namespace
skstats.IncrementResourceCount(
ctx,
namespace,
"{{ snake .Name }}",
m{{ $resource_group }}ResourcesIn,
)
// merge lists by namespace
{{ lower_camel .PluralName }}ByNamespace[namespace] = {{ lower_camel .Name }}NamespacedList.list
var {{ lower_camel .Name }}List {{ .ImportPrefix }}{{ .Name }}List
for _, {{ lower_camel .PluralName }} := range {{ lower_camel .PluralName }}ByNamespace {
{{ lower_camel .Name }}List = append({{ lower_camel .Name }}List, {{ lower_camel .PluralName }}...)
}
currentSnapshot.{{ upper_camel .PluralName }} = {{ lower_camel .Name }}List.Sort()
{{- end }}
{{- end}}
}
}
}()
return snapshots, errs, nil
}
`))
View Source
var ResourceGroupEmitterTestTemplate = template.Must(template.New("resource_group_emitter_test").Funcs(Funcs).Parse(`// +build solokit
package {{ .Project.ProjectConfig.Version }}
{{- /* we need to know if the tests require a crd client or a regular clientset */ -}}
{{- $clients := new_str_slice }}
{{- $need_kube_config := false }}
{{- range .Resources}}
{{- $clients := (append_str_slice $clients (printf "%vClient" (lower_camel .Name))) }}
{{- if .HasStatus }}
{{- $need_kube_config = true }}
{{- end}}
{{- end}}
{{- $clients := (join_str_slice $clients ", ") }}
import (
"context"
"os"
"time"
{{ .Imports }}
"k8s.io/client-go/kubernetes"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/solo-io/go-utils/log"
"github.com/solo-io/solo-kit/pkg/api/v1/clients"
"github.com/solo-io/solo-kit/pkg/api/v1/clients/factory"
"github.com/solo-io/solo-kit/pkg/api/v1/clients/memory"
"github.com/solo-io/solo-kit/test/helpers"
"github.com/solo-io/solo-kit/test/setup"
"github.com/solo-io/k8s-utils/kubeutils"
"github.com/solo-io/solo-kit/test/util"
kuberc "github.com/solo-io/solo-kit/pkg/api/v1/clients/kube"
"k8s.io/client-go/rest"
// Needed to run tests in GKE
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
// From https://github.com/kubernetes/client-go/blob/53c7adfd0294caa142d961e1f780f74081d5b15f/examples/out-of-cluster-client-configuration/main.go#L31
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)
var _ = Describe("{{ upper_camel .Project.ProjectConfig.Version }}Emitter", func() {
if os.Getenv("RUN_KUBE_TESTS") != "1" {
log.Printf("This test creates kubernetes resources and is disabled by default. To enable, set RUN_KUBE_TESTS=1 in your env.")
return
}
var (
ctx context.Context
namespace1 string
namespace2 string
name1, name2 = "angela"+helpers.RandString(3), "bob"+helpers.RandString(3)
{{- if $need_kube_config }}
cfg *rest.Config
{{- end}}
kube kubernetes.Interface
emitter {{ .GoName }}Emitter
{{- range .Resources }}
{{ lower_camel .Name }}Client {{ .ImportPrefix }}{{ .Name }}Client
{{- end}}
)
BeforeEach(func() {
ctx = context.Background()
namespace1 = helpers.RandString(8)
namespace2 = helpers.RandString(8)
kube = helpers.MustKubeClient()
err := kubeutils.CreateNamespacesInParallel(ctx, kube, namespace1, namespace2)
Expect(err).NotTo(HaveOccurred())
{{- if $need_kube_config }}
cfg, err = kubeutils.GetConfig("", "")
Expect(err).NotTo(HaveOccurred())
{{- end}}
{{- range .Resources }}
// {{ .Name }} Constructor
{{- if .HasStatus }}
{{ lower_camel .Name }}ClientFactory := &factory.KubeResourceClientFactory{
Crd: {{ .ImportPrefix }}{{ .Name }}Crd,
Cfg: cfg,
SharedCache: kuberc.NewKubeCache(context.TODO()),
}
{{- else }}
{{ lower_camel .Name }}ClientFactory := &factory.MemoryResourceClientFactory{
Cache: memory.NewInMemoryResourceCache(),
}
{{- end }}
{{ lower_camel .Name }}Client, err = {{ .ImportPrefix }}New{{ .Name }}Client(ctx, {{ lower_camel .Name }}ClientFactory)
Expect(err).NotTo(HaveOccurred())
{{- end}}
emitter = New{{ .GoName }}Emitter({{ $clients }})
})
AfterEach(func() {
err := kubeutils.DeleteNamespacesInParallelBlocking(ctx, kube, namespace1, namespace2)
Expect(err).NotTo(HaveOccurred())
{{- range .Resources }}
{{- if .ClusterScoped }}
{{ lower_camel .Name }}Client.Delete(name1, clients.DeleteOpts{})
{{ lower_camel .Name }}Client.Delete(name2, clients.DeleteOpts{})
{{- end }}
{{- end }}
})
It("tracks snapshots on changes to any resource", func() {
ctx := context.Background()
err := emitter.Register()
Expect(err).NotTo(HaveOccurred())
snapshots, errs, err := emitter.Snapshots([]string{namespace1, namespace2}, clients.WatchOpts{
Ctx: ctx,
RefreshRate: time.Second,
})
Expect(err).NotTo(HaveOccurred())
var snap *{{ .GoName }}Snapshot
{{- range .Resources }}
/*
{{ .Name }}
*/
assertSnapshot{{ .PluralName }} := func(expect{{ .PluralName }} {{ .ImportPrefix }}{{ .Name }}List, unexpect{{ .PluralName }} {{ .ImportPrefix }}{{ .Name }}List) {
drain:
for {
select {
case snap = <-snapshots:
for _, expected := range expect{{ .PluralName }} {
if _, err := snap.{{ upper_camel .PluralName }}.Find(expected.GetMetadata().Ref().Strings()); err != nil {
continue drain
}
}
for _, unexpected := range unexpect{{ .PluralName }} {
if _, err := snap.{{ upper_camel .PluralName }}.Find(unexpected.GetMetadata().Ref().Strings()); err == nil {
continue drain
}
}
break drain
case err := <-errs:
Expect(err).NotTo(HaveOccurred())
case <-time.After(time.Second * 10):
{{- if .ClusterScoped }}
combined, _ := {{ lower_camel .Name }}Client.List(clients.ListOpts{})
{{- else }}
nsList1, _ := {{ lower_camel .Name }}Client.List(namespace1, clients.ListOpts{})
nsList2, _ := {{ lower_camel .Name }}Client.List(namespace2, clients.ListOpts{})
combined := append(nsList1, nsList2...)
{{- end }}
Fail("expected final snapshot before 10 seconds. expected " + log.Sprintf("%v", combined))
}
}
}
{{- if .ClusterScoped }}
{{ lower_camel .Name }}1a, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace1, name1), clients.WriteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a }, nil)
{{- else }}
{{ lower_camel .Name }}1a, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace1, name1), clients.WriteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
{{ lower_camel .Name }}1b, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace2, name1), clients.WriteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}1b }, nil)
{{- end }}
{{- if .ClusterScoped }}
{{ lower_camel .Name }}2a, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace1, name2), clients.WriteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}2a }, nil)
{{- else }}
{{ lower_camel .Name }}2a, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace1, name2), clients.WriteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
{{ lower_camel .Name }}2b, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace2, name2), clients.WriteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}1b, {{ lower_camel .Name }}2a, {{ lower_camel .Name }}2b }, nil)
{{- end }}
{{- if .ClusterScoped }}
err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}2a.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a }, {{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}2a })
{{- else }}
err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}2a.GetMetadata().Namespace, {{ lower_camel .Name }}2a.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}2b.GetMetadata().Namespace, {{ lower_camel .Name }}2b.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}1b }, {{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}2a, {{ lower_camel .Name }}2b })
{{- end }}
{{- if .ClusterScoped }}
err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}1a.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
assertSnapshot{{ .PluralName }}(nil, {{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}2a })
{{- else }}
err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}1a.GetMetadata().Namespace, {{ lower_camel .Name }}1a.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}1b.GetMetadata().Namespace, {{ lower_camel .Name }}1b.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
assertSnapshot{{ .PluralName }}(nil, {{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}1b, {{ lower_camel .Name }}2a, {{ lower_camel .Name }}2b })
{{- end }}
{{- end}}
})
It("tracks snapshots on changes to any resource using AllNamespace", func() {
ctx := context.Background()
err := emitter.Register()
Expect(err).NotTo(HaveOccurred())
snapshots, errs, err := emitter.Snapshots([]string{""}, clients.WatchOpts{
Ctx: ctx,
RefreshRate: time.Second,
})
Expect(err).NotTo(HaveOccurred())
var snap *{{ .GoName }}Snapshot
{{- range .Resources }}
/*
{{ .Name }}
*/
assertSnapshot{{ .PluralName }} := func(expect{{ .PluralName }} {{ .ImportPrefix }}{{ .Name }}List, unexpect{{ .PluralName }} {{ .ImportPrefix }}{{ .Name }}List) {
drain:
for {
select {
case snap = <-snapshots:
for _, expected := range expect{{ .PluralName }} {
if _, err := snap.{{ upper_camel .PluralName }}.Find(expected.GetMetadata().Ref().Strings()); err != nil {
continue drain
}
}
for _, unexpected := range unexpect{{ .PluralName }} {
if _, err := snap.{{ upper_camel .PluralName }}.Find(unexpected.GetMetadata().Ref().Strings()); err == nil {
continue drain
}
}
break drain
case err := <-errs:
Expect(err).NotTo(HaveOccurred())
case <-time.After(time.Second * 10):
{{- if .ClusterScoped }}
combined, _ := {{ lower_camel .Name }}Client.List(clients.ListOpts{})
{{- else }}
nsList1, _ := {{ lower_camel .Name }}Client.List(namespace1, clients.ListOpts{})
nsList2, _ := {{ lower_camel .Name }}Client.List(namespace2, clients.ListOpts{})
combined := append(nsList1, nsList2...)
{{- end }}
Fail("expected final snapshot before 10 seconds. expected " + log.Sprintf("%v", combined))
}
}
}
{{- if .ClusterScoped }}
{{ lower_camel .Name }}1a, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace1, name1), clients.WriteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a }, nil)
{{- else }}
{{ lower_camel .Name }}1a, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace1, name1), clients.WriteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
{{ lower_camel .Name }}1b, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace2, name1), clients.WriteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}1b }, nil)
{{- end }}
{{- if .ClusterScoped }}
{{ lower_camel .Name }}2a, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace1, name2), clients.WriteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}2a }, nil)
{{- else }}
{{ lower_camel .Name }}2a, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace1, name2), clients.WriteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
{{ lower_camel .Name }}2b, err := {{ lower_camel .Name }}Client.Write({{ .ImportPrefix }}New{{ .Name }}(namespace2, name2), clients.WriteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}1b, {{ lower_camel .Name }}2a, {{ lower_camel .Name }}2b }, nil)
{{- end }}
{{- if .ClusterScoped }}
err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}2a.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a }, {{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}2a })
{{- else }}
err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}2a.GetMetadata().Namespace, {{ lower_camel .Name }}2a.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}2b.GetMetadata().Namespace, {{ lower_camel .Name }}2b.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
assertSnapshot{{ .PluralName }}({{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}1b }, {{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}2a, {{ lower_camel .Name }}2b })
{{- end }}
{{- if .ClusterScoped }}
err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}1a.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
assertSnapshot{{ .PluralName }}(nil, {{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}2a })
{{- else }}
err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}1a.GetMetadata().Namespace, {{ lower_camel .Name }}1a.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
err = {{ lower_camel .Name }}Client.Delete({{ lower_camel .Name }}1b.GetMetadata().Namespace, {{ lower_camel .Name }}1b.GetMetadata().Name, clients.DeleteOpts{Ctx: ctx})
Expect(err).NotTo(HaveOccurred())
assertSnapshot{{ .PluralName }}(nil, {{ .ImportPrefix }}{{ .Name }}List{ {{ lower_camel .Name }}1a, {{ lower_camel .Name }}1b, {{ lower_camel .Name }}2a, {{ lower_camel .Name }}2b })
{{- end }}
{{- end}}
})
})
`))
View Source
// ResourceGroupEventLoopTemplate is the parsed text/template (named
// "resource_group_event_loop") that renders the event loop source file for a
// resource group. The rendered file declares the <GoName>Syncer interface, a
// <GoName>Syncers slice type whose Sync calls every member and aggregates their
// errors with multierror, and an unexported event loop type constructed through
// New<GoName>EventLoop.
//
// The rendered Run method starts a snapshot watch on the emitter and, for each
// snapshot received, cancels the context handed to the previous Sync call,
// opens a trace span, and calls Sync with a fresh cancellable context. A Sync
// error is forwarded on the returned channel using a non-blocking send; when
// no receiver is ready it is logged and dropped. The ready channel is closed
// after the first Sync call that returns nil.
//
// template.Must panics if the template text fails to parse.
//
// NOTE(review): channelClosed is local to the goroutine started by each Run
// call, so calling Run twice on one event loop looks like it could close
// el.ready twice — confirm Run is only ever invoked once per loop.
var ResourceGroupEventLoopTemplate = template.Must(template.New("resource_group_event_loop").Funcs(Funcs).Parse(`package {{ .Project.ProjectConfig.Version }}
import (
"context"
"go.opencensus.io/trace"
"github.com/hashicorp/go-multierror"
"github.com/solo-io/solo-kit/pkg/api/v1/clients"
"github.com/solo-io/solo-kit/pkg/api/v1/eventloop"
"github.com/solo-io/solo-kit/pkg/errors"
"github.com/solo-io/go-utils/contextutils"
"github.com/solo-io/go-utils/errutils"
)
type {{ .GoName }}Syncer interface {
Sync(context.Context, *{{ .GoName }}Snapshot) error
}
type {{ .GoName }}Syncers []{{ .GoName }}Syncer
func (s {{ .GoName }}Syncers) Sync(ctx context.Context, snapshot *{{ .GoName }}Snapshot) error {
var multiErr *multierror.Error
for _, syncer := range s {
if err := syncer.Sync(ctx, snapshot); err != nil {
multiErr = multierror.Append(multiErr, err)
}
}
return multiErr.ErrorOrNil()
}
type {{ lower_camel .GoName }}EventLoop struct {
emitter {{ .GoName }}SnapshotEmitter
syncer {{ .GoName }}Syncer
ready chan struct{}
}
func New{{ .GoName }}EventLoop(emitter {{ .GoName }}SnapshotEmitter, syncer {{ .GoName }}Syncer) eventloop.EventLoop {
return &{{ lower_camel .GoName }}EventLoop{
emitter: emitter,
syncer: syncer,
ready: make(chan struct{}),
}
}
func (el *{{ lower_camel .GoName }}EventLoop) Ready() <-chan struct{} {
return el.ready
}
func (el *{{ lower_camel .GoName }}EventLoop) Run(namespaces []string, opts clients.WatchOpts) (<-chan error, error) {
opts = opts.WithDefaults()
opts.Ctx = contextutils.WithLogger(opts.Ctx, "{{ .Project.ProjectConfig.Version }}.event_loop")
logger := contextutils.LoggerFrom(opts.Ctx)
logger.Infof("event loop started")
errs := make(chan error)
watch, emitterErrs, err := el.emitter.Snapshots(namespaces, opts)
if err != nil {
return nil, errors.Wrapf(err, "starting snapshot watch")
}
go errutils.AggregateErrs(opts.Ctx, errs, emitterErrs, "{{ .Project.ProjectConfig.Version }}.emitter errors")
go func() {
var channelClosed bool
// create a new context for each loop, cancel it before each loop
var cancel context.CancelFunc = func() {}
// use closure to allow cancel function to be updated as context changes
defer func() { cancel() }()
for {
select {
case snapshot, ok := <-watch:
if !ok {
return
}
// cancel any open watches from previous loop
cancel()
ctx, span := trace.StartSpan(opts.Ctx, "{{ .Name }}.EventLoopSync")
ctx, canc := context.WithCancel(ctx)
cancel = canc
err := el.syncer.Sync(ctx, snapshot)
span.End()
if err != nil {
select {
case errs <- err:
default:
logger.Errorf("write error channel is full! could not propagate err: %v", err)
}
} else if !channelClosed {
channelClosed = true
close(el.ready)
}
case <-opts.Ctx.Done():
return
}
}
}()
return errs, nil
}
`))
View Source
// ResourceGroupEventLoopTestTemplate is the parsed text/template (named
// "resource_group_event_loop_test") that renders a Ginkgo/Gomega test for the
// generated event loop of a resource group.
//
// The rendered BeforeEach builds one in-memory resource client per resource
// and passes them all, comma-joined, to New<GoName>Emitter. The single spec
// writes one resource named "jerry" per resource type, runs the event loop
// with a mutex-guarded mock syncer, and expects the mock to report Synced
// within five seconds.
//
// template.Must panics if the template text fails to parse.
//
// NOTE(review): the rendered test declares `namespace` but nothing in this
// template assigns it, so it appears to stay the empty string — confirm that
// is intended.
var ResourceGroupEventLoopTestTemplate = template.Must(template.New("resource_group_event_loop_test").Funcs(Funcs).Parse(`// +build solokit
package {{ .Project.ProjectConfig.Version }}
{{- $clients := new_str_slice }}
{{- range .Resources}}
{{- $clients := (append_str_slice $clients (printf "%vClient" (lower_camel .Name))) }}
{{- end}}
{{- $clients := (join_str_slice $clients ", ") }}
import (
"context"
"time"
"sync"
{{ .Imports }}
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/solo-io/solo-kit/pkg/api/v1/clients"
"github.com/solo-io/solo-kit/pkg/api/v1/clients/factory"
"github.com/solo-io/solo-kit/pkg/api/v1/clients/memory"
)
var _ = Describe("{{ .GoName }}EventLoop", func() {
var (
ctx context.Context
namespace string
emitter {{ .GoName }}Emitter
err error
)
BeforeEach(func() {
ctx = context.Background()
{{- range .Resources}}
{{ lower_camel .Name }}ClientFactory := &factory.MemoryResourceClientFactory{
Cache: memory.NewInMemoryResourceCache(),
}
{{ lower_camel .Name }}Client, err := {{ .ImportPrefix }}New{{ .Name }}Client(ctx, {{ lower_camel .Name }}ClientFactory)
Expect(err).NotTo(HaveOccurred())
{{- end}}
emitter = New{{ .GoName }}Emitter({{ $clients }})
})
It("runs sync function on a new snapshot", func() {
{{- range .Resources }}
_, err = emitter.{{ .Name }}().Write({{ .ImportPrefix }}New{{ .Name }}(namespace, "jerry"), clients.WriteOpts{})
Expect(err).NotTo(HaveOccurred())
{{- end}}
sync := &mock{{ .GoName }}Syncer{}
el := New{{ .GoName }}EventLoop(emitter, sync)
_, err := el.Run([]string{namespace}, clients.WatchOpts{})
Expect(err).NotTo(HaveOccurred())
Eventually(sync.Synced, 5*time.Second).Should(BeTrue())
})
})
type mock{{ .GoName }}Syncer struct {
synced bool
mutex sync.Mutex
}
func (s *mock{{ .GoName }}Syncer) Synced() bool {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.synced
}
func (s *mock{{ .GoName }}Syncer) Sync(ctx context.Context, snap *{{ .GoName }}Snapshot) error {
s.mutex.Lock()
s.synced = true
s.mutex.Unlock()
return nil
}
`))
View Source
// ResourceGroupSnapshotTemplate is the parsed text/template (named
// "resource_group_snapshot") that renders the <GoName>Snapshot type for a
// resource group: a struct holding one typed list per resource, together with
// Clone, Hash, one unexported hash<PluralName> helper per resource, HashFields
// (zap fields for logging), and a <GoName>SnapshotStringer for readable output.
//
// In the rendered code, Hash falls back to fnv.New64 when given a nil hasher
// and passes that single hasher to every per-resource helper in turn. For a
// resource with SkipHashingAnnotations set, the helper hashes a clone of the
// list whose Metadata.Annotations have been set to nil. Stringer lists
// cluster-scoped resources via Names() and all others via NamespacesDotNames().
// Hashing failures in HashFields and Stringer are logged, not returned.
//
// template.Must panics if the template text fails to parse.
//
// NOTE(review): the rendered HashFields hands one fnv hasher, never reset, to
// every per-resource helper and then to Hash. If hashutils.HashAllSafe
// accumulates into the hasher, each reported value includes the data hashed
// before it and "snapshotHash" will differ from Hash(nil) — confirm this is
// intended.
var ResourceGroupSnapshotTemplate = template.Must(template.New("resource_group_snapshot").Funcs(Funcs).Parse(
`package {{ .Project.ProjectConfig.Version }}
import (
"encoding/binary"
"fmt"
"hash"
"hash/fnv"
"log"
{{ .Imports }}
"github.com/solo-io/solo-kit/pkg/api/v1/resources"
"github.com/rotisserie/eris"
"github.com/solo-io/go-utils/hashutils"
"go.uber.org/zap"
)
type {{ .GoName }}Snapshot struct {
{{- range .Resources}}
{{ upper_camel .PluralName }} {{ .ImportPrefix }}{{ .Name }}List
{{- end}}
}
func (s {{ .GoName }}Snapshot) Clone() {{ .GoName }}Snapshot {
return {{ .GoName }}Snapshot{
{{- range .Resources}}
{{ upper_camel .PluralName }}: s.{{ upper_camel .PluralName }}.Clone(),
{{- end}}
}
}
func (s {{ .GoName }}Snapshot) Hash(hasher hash.Hash64) (uint64, error) {
if hasher == nil {
hasher = fnv.New64()
}
{{- range .Resources}}
if _, err := s.hash{{ upper_camel .PluralName }}(hasher); err != nil {
return 0, err
}
{{- end}}
return hasher.Sum64(), nil
}
{{- $ResourceGroup := . }}
{{- range .Resources }}
func (s {{ $ResourceGroup.GoName }}Snapshot) hash{{ upper_camel .PluralName }}(hasher hash.Hash64) (uint64, error) {
{{- if .SkipHashingAnnotations }}
clonedList := s.{{ upper_camel .PluralName }}.Clone()
for _, v := range clonedList {
v.Metadata.Annotations = nil
}
return hashutils.HashAllSafe(hasher, clonedList.AsInterfaces()...)
{{- else }}
return hashutils.HashAllSafe(hasher, s.{{ upper_camel .PluralName }}.AsInterfaces()...)
{{- end }}
}
{{- end}}
func (s {{ .GoName }}Snapshot) HashFields() []zap.Field {
var fields []zap.Field
hasher := fnv.New64()
{{- range .Resources}}
{{ upper_camel .PluralName }}Hash, err := s.hash{{ upper_camel .PluralName }}(hasher)
if err != nil {
log.Println(eris.Wrapf(err, "error hashing, this should never happen"))
}
fields = append(fields, zap.Uint64("{{ lower_camel .PluralName }}", {{ upper_camel .PluralName }}Hash ))
{{- end}}
snapshotHash, err := s.Hash(hasher)
if err != nil {
log.Println(eris.Wrapf(err, "error hashing, this should never happen"))
}
return append(fields, zap.Uint64("snapshotHash", snapshotHash))
}
type {{ .GoName }}SnapshotStringer struct {
Version uint64
{{- range .Resources}}
{{ upper_camel .PluralName }} []string
{{- end}}
}
func (ss {{ .GoName }}SnapshotStringer) String() string {
s := fmt.Sprintf("{{ .GoName }}Snapshot %v\n", ss.Version)
{{- range .Resources}}
s += fmt.Sprintf(" {{ upper_camel .PluralName }} %v\n", len(ss.{{ upper_camel .PluralName }}))
for _, name := range ss.{{ upper_camel .PluralName }} {
s += fmt.Sprintf(" %v\n", name)
}
{{- end}}
return s
}
func (s {{ .GoName }}Snapshot) Stringer() {{ .GoName }}SnapshotStringer {
snapshotHash, err := s.Hash(nil)
if err != nil {
log.Println(eris.Wrapf(err, "error hashing, this should never happen"))
}
return {{ .GoName }}SnapshotStringer{
Version: snapshotHash,
{{- range .Resources}}
{{- if .ClusterScoped }}
{{ upper_camel .PluralName }}: s.{{ upper_camel .PluralName }}.Names(),
{{- else }}
{{ upper_camel .PluralName }}: s.{{ upper_camel .PluralName }}.NamespacesDotNames(),
{{- end }}
{{- end}}
}
}
`))
View Source
// ResourceReconcilerTemplate is the parsed text/template that renders the
// typed reconciler for a single resource: the Transition<Name>Func callback
// type, the <Name>Reconciler interface, a list-to-ResourceList conversion
// helper, and an unexported implementation built by New<Name>Reconciler on top
// of reconcile.NewReconciler(client.BaseClient()).
//
// The rendered Reconcile applies opts.WithDefaults, attaches a logger named
// "<lowerCamelName>_reconciler" to the context, converts the desired list to a
// resources.ResourceList, and delegates to the base reconciler. A nil
// transition is passed through as a nil TransitionResourcesFunc; a non-nil one
// is wrapped in an adapter that type-asserts both arguments to *<Name>.
//
// template.Must panics if the template text fails to parse.
//
// NOTE(review): the template is registered under the name "resource_client"
// rather than a reconciler-specific name — looks like a copy-paste; confirm
// before relying on the name in template error messages.
var ResourceReconcilerTemplate = template.Must(template.New("resource_client").Funcs(Funcs).Parse(`package {{ .Project.ProjectConfig.Version }}
import (
"github.com/solo-io/solo-kit/pkg/api/v1/clients"
"github.com/solo-io/solo-kit/pkg/api/v1/reconcile"
"github.com/solo-io/solo-kit/pkg/api/v1/resources"
"github.com/solo-io/go-utils/contextutils"
)
// Option to copy anything from the original to the desired before writing. Return value of false means don't update
type Transition{{ .Name }}Func func(original, desired *{{ .Name }}) (bool, error)
type {{ .Name }}Reconciler interface {
Reconcile(namespace string, desiredResources {{ .Name }}List, transition Transition{{ .Name }}Func, opts clients.ListOpts) error
}
func {{ lower_camel .Name }}sToResources(list {{ .Name }}List) resources.ResourceList {
var resourceList resources.ResourceList
for _, {{ lower_camel .Name }} := range list {
resourceList = append(resourceList, {{ lower_camel .Name }})
}
return resourceList
}
func New{{ .Name }}Reconciler(client {{ .Name }}Client) {{ .Name }}Reconciler {
return &{{ lower_camel .Name }}Reconciler{
base: reconcile.NewReconciler(client.BaseClient()),
}
}
type {{ lower_camel .Name }}Reconciler struct {
base reconcile.Reconciler
}
func (r *{{ lower_camel .Name }}Reconciler) Reconcile(namespace string, desiredResources {{ .Name }}List, transition Transition{{ .Name }}Func, opts clients.ListOpts) error {
opts = opts.WithDefaults()
opts.Ctx = contextutils.WithLogger(opts.Ctx, "{{ lower_camel .Name }}_reconciler")
var transitionResources reconcile.TransitionResourcesFunc
if transition != nil {
transitionResources = func(original, desired resources.Resource) (bool, error) {
return transition(original.(*{{ .Name }}), desired.(*{{ .Name }}))
}
}
return r.base.Reconcile(namespace, {{ lower_camel .Name }}sToResources(desiredResources), transitionResources, opts)
}
`))
View Source
// ResourceTemplate is the parsed text/template (named "resource") that renders
// the Go helpers for a single resource type: the New<Name> constructor,
// MustHash, GroupVersionKind, the <Name>List slice type with its Find,
// AsResources, AsInputResources (when HasStatus), Names, NamespacesDotNames,
// Sort, Clone, Each, EachResource and AsInterfaces helpers, and the <Name>GVK
// variable.
//
// For custom resources (IsCustom) the rendered file wraps the imported type in
// a local struct and adds Clone and Hash; Hash clears ResourceVersion (and
// Annotations when SkipHashingAnnotations is set) on a clone before hashing.
// For all other resources it renders SetMetadata, SetStatus (when HasStatus),
// the Kubernetes object adapter methods, the <Name>Crd definition, and an init
// function that registers the CRD and aborts the process, reporting the
// underlying error, if registration fails.
//
// The CRD group is Project.ProtoPackage unless ProjectConfig.CrdGroupOverride
// is non-empty.
//
// template.Must panics if the template text fails to parse.
var ResourceTemplate = template.Must(template.New("resource").Funcs(Funcs).Parse(`package {{ .Project.ProjectConfig.Version }}
import (
"encoding/binary"
"hash"
"hash/fnv"
"log"
"sort"
{{- if $.IsCustom }}
{{ $.CustomImportPrefix }} "{{ $.CustomResource.Package }}"
{{- end }}
"github.com/solo-io/solo-kit/pkg/api/v1/resources"
"github.com/solo-io/solo-kit/pkg/api/v1/resources/core"
"github.com/solo-io/solo-kit/pkg/errors"
"github.com/solo-io/go-utils/hashutils"
{{- if not $.IsCustom }}
"github.com/solo-io/solo-kit/pkg/api/v1/clients/kube/crd"
"k8s.io/apimachinery/pkg/runtime"
{{- end }}
"k8s.io/apimachinery/pkg/runtime/schema"
)
func New{{ .Name }}(namespace, name string) *{{ .Name }} {
{{ lowercase .Name }} := &{{ .Name }}{}
{{- if $.IsCustom }}
{{ lowercase .Name }}.{{ $.Name }}.SetMetadata(&core.Metadata{
{{- else }}
{{ lowercase .Name }}.SetMetadata(&core.Metadata{
{{- end }}
Name: name,
Namespace: namespace,
})
return {{ lowercase .Name }}
}
{{- if $.IsCustom }}
// require custom resource to implement Clone() as well as resources.Resource interface
type Cloneable{{ $.Name }} interface {
resources.Resource
Clone() *{{ $.CustomImportPrefix}}.{{ $.Name }}
}
var _ Cloneable{{ $.Name }} = &{{ $.CustomImportPrefix}}.{{ $.Name }}{}
type {{ $.Name }} struct {
{{ $.CustomImportPrefix}}.{{ $.Name }}
}
func (r *{{ .Name }}) Clone() resources.Resource {
return &{{ .Name }}{ {{ .Name }}: *r.{{ .Name }}.Clone() }
}
func (r *{{ .Name }}) Hash(hasher hash.Hash64) (uint64, error) {
if hasher == nil {
hasher = fnv.New64()
}
clone := r.{{ .Name }}.Clone()
resources.UpdateMetadata(clone, func(meta *core.Metadata) {
meta.ResourceVersion = ""
{{- if $.SkipHashingAnnotations }}
meta.Annotations = nil
{{- end }}
})
err := binary.Write(hasher, binary.LittleEndian, hashutils.HashAll(clone))
if err != nil {
return 0, err
}
return hasher.Sum64(), nil
}
{{- else }}
func (r *{{ .Name }}) SetMetadata(meta *core.Metadata) {
r.Metadata = meta
}
{{- if $.HasStatus }}
func (r *{{ .Name }}) SetStatus(status *core.Status) {
r.Status = status
}
{{- end }}
{{- end }}
func (r *{{ .Name }}) MustHash() uint64 {
hashVal, err := r.Hash(nil)
if err != nil {
log.Panicf("error while hashing: (%s) this should never happen", err)
}
return hashVal
}
func (r *{{ .Name }}) GroupVersionKind() schema.GroupVersionKind {
return {{ .Name }}GVK
}
type {{ .Name }}List []*{{ .Name }}
func (list {{ .Name }}List) Find(namespace, name string) (*{{ .Name }}, error) {
for _, {{ lower_camel .Name }} := range list {
if {{ lower_camel .Name }}.GetMetadata().Name == name && {{ lower_camel .Name }}.GetMetadata().Namespace == namespace {
return {{ lower_camel .Name }}, nil
}
}
return nil, errors.Errorf("list did not find {{ lower_camel .Name }} %v.%v", namespace, name)
}
func (list {{ .Name }}List) AsResources() resources.ResourceList {
var ress resources.ResourceList
for _, {{ lower_camel .Name }} := range list {
ress = append(ress, {{ lower_camel .Name }})
}
return ress
}
{{ if $.HasStatus -}}
func (list {{ .Name }}List) AsInputResources() resources.InputResourceList {
var ress resources.InputResourceList
for _, {{ lower_camel .Name }} := range list {
ress = append(ress, {{ lower_camel .Name }})
}
return ress
}
{{- end}}
func (list {{ .Name }}List) Names() []string {
var names []string
for _, {{ lower_camel .Name }} := range list {
names = append(names, {{ lower_camel .Name }}.GetMetadata().Name)
}
return names
}
func (list {{ .Name }}List) NamespacesDotNames() []string {
var names []string
for _, {{ lower_camel .Name }} := range list {
names = append(names, {{ lower_camel .Name }}.GetMetadata().Namespace + "." + {{ lower_camel .Name }}.GetMetadata().Name)
}
return names
}
func (list {{ .Name }}List) Sort() {{ .Name }}List {
sort.SliceStable(list, func(i, j int) bool {
return list[i].GetMetadata().Less(list[j].GetMetadata())
})
return list
}
func (list {{ .Name }}List) Clone() {{ .Name }}List {
var {{ lower_camel .Name }}List {{ .Name }}List
for _, {{ lower_camel .Name }} := range list {
{{ lower_camel .Name }}List = append({{ lower_camel .Name }}List, resources.Clone({{ lower_camel .Name }}).(*{{ .Name }}))
}
return {{ lower_camel .Name }}List
}
func (list {{ .Name }}List) Each(f func(element *{{ .Name }})) {
for _, {{ lower_camel .Name }} := range list {
f({{ lower_camel .Name }})
}
}
func (list {{ .Name }}List) EachResource(f func(element resources.Resource)) {
for _, {{ lower_camel .Name }} := range list {
f({{ lower_camel .Name }})
}
}
func (list {{ .Name }}List) AsInterfaces() []interface{}{
var asInterfaces []interface{}
list.Each(func(element *{{ .Name }}) {
asInterfaces = append(asInterfaces, element)
})
return asInterfaces
}
{{- $crdGroupName := .Project.ProtoPackage }}
{{- if ne .Project.ProjectConfig.CrdGroupOverride "" }}
{{- $crdGroupName = .Project.ProjectConfig.CrdGroupOverride }}
{{- end}}
{{- if not $.IsCustom }}
// Kubernetes Adapter for {{ .Name }}
func (o *{{ .Name }}) GetObjectKind() schema.ObjectKind {
t := {{ .Name }}Crd.TypeMeta()
return &t
}
func (o *{{ .Name }}) DeepCopyObject() runtime.Object {
return resources.Clone(o).(*{{ .Name }})
}
func (o *{{ .Name }}) DeepCopyInto(out *{{ .Name }}) {
clone := resources.Clone(o).(*{{ .Name }})
*out = *clone
}
var (
{{ .Name }}Crd = crd.NewCrd(
"{{ lowercase (upper_camel .PluralName) }}",
{{ .Name }}GVK.Group,
{{ .Name }}GVK.Version,
{{ .Name }}GVK.Kind,
"{{ .ShortName }}",
{{ .ClusterScoped }},
&{{ .Name }}{})
)
func init() {
if err := crd.AddCrd({{ .Name }}Crd); err != nil {
log.Fatalf("could not add crd to global registry: %v", err)
}
}
{{- end}}
var (
{{ .Name }}GVK = schema.GroupVersionKind{
Version: "{{ .Project.ProjectConfig.Version }}",
Group: "{{ $crdGroupName }}",
Kind: "{{ .Name }}",
}
)
`))
View Source
// SimpleEmitterTemplate is the parsed text/template (named
// "resource_group_emitter") that renders the <GoName>SimpleEmitter for a
// resource group: an interface with a single Snapshots method, two
// constructors (with and without a force-emit channel), and an unexported
// implementation driven by one aggregated clients.ResourceWatch.
//
// The rendered Snapshots goroutine rebuilds the current snapshot from every
// untyped resource list received on the watch, sorting resources into the
// typed snapshot fields by a type switch and reporting unknown types on the
// error channel. Once a second it hashes the current snapshot and emits a
// clone only when the hash changed since the last emit; a receive on the
// force-emit channel emits a clone unconditionally. Both output channels are
// closed when the goroutine exits.
//
// Compared with a naive rendering, the generated code lists "fmt" in its
// imports (it calls fmt.Errorf), stops its ticker when the goroutine exits,
// and abandons a pending snapshot send once ctx is cancelled so the goroutine
// cannot stay blocked on a consumer that has stopped reading.
//
// template.Must panics if the template text fails to parse.
var SimpleEmitterTemplate = template.Must(template.New("resource_group_emitter").Funcs(Funcs).Parse(
`package {{ .Project.ProjectConfig.Version }}
import (
"context"
"fmt"
"sync"
"time"
{{ .Imports }}
"go.opencensus.io/stats"
"go.uber.org/zap"
"github.com/solo-io/solo-kit/pkg/api/v1/clients"
"github.com/solo-io/solo-kit/pkg/api/v1/resources"
"github.com/solo-io/solo-kit/pkg/errors"
"github.com/solo-io/go-utils/errutils"
"github.com/solo-io/go-utils/contextutils"
)
type {{ .GoName }}SimpleEmitter interface {
Snapshots(ctx context.Context) (<-chan *{{ .GoName }}Snapshot, <-chan error, error)
}
func New{{ .GoName }}SimpleEmitter(aggregatedWatch clients.ResourceWatch) {{ .GoName }}SimpleEmitter {
return New{{ .GoName }}SimpleEmitterWithEmit(aggregatedWatch, make(chan struct{}))
}
func New{{ .GoName }}SimpleEmitterWithEmit(aggregatedWatch clients.ResourceWatch, emit <-chan struct{}) {{ .GoName }}SimpleEmitter {
return &{{ lower_camel .GoName }}SimpleEmitter{
aggregatedWatch: aggregatedWatch,
forceEmit: emit,
}
}
type {{ lower_camel .GoName }}SimpleEmitter struct {
forceEmit <- chan struct{}
aggregatedWatch clients.ResourceWatch
}
func (c *{{ lower_camel .GoName }}SimpleEmitter) Snapshots(ctx context.Context) (<-chan *{{ .GoName }}Snapshot, <-chan error, error) {
snapshots := make(chan *{{ .GoName }}Snapshot)
errs := make(chan error)
untyped, watchErrs, err := c.aggregatedWatch(ctx)
if err != nil {
return nil, nil, err
}
go errutils.AggregateErrs(ctx, errs, watchErrs, "{{ lower_camel .GoName }}-emitter")
go func() {
currentSnapshot := {{ .GoName }}Snapshot{}
timer := time.NewTicker(time.Second * 1)
// release the ticker once this goroutine exits
defer timer.Stop()
var previousHash uint64
sync := func() {
currentHash, err := currentSnapshot.Hash(nil)
if err != nil {
contextutils.LoggerFrom(ctx).Panicw("error while hashing, this should never happen", zap.Error(err))
}
if previousHash == currentHash {
return
}
previousHash = currentHash
stats.Record(ctx, m{{ .GoName }}SnapshotOut.M(1))
sentSnapshot := currentSnapshot.Clone()
// do not block forever if the consumer stopped reading after cancellation
select {
case snapshots <- &sentSnapshot:
case <-ctx.Done():
}
}
defer func() {
close(snapshots)
close(errs)
}()
for {
record := func() { stats.Record(ctx, m{{ .GoName }}SnapshotIn.M(1)) }
select {
case <-timer.C:
sync()
case <-ctx.Done():
return
case <-c.forceEmit:
sentSnapshot := currentSnapshot.Clone()
select {
case snapshots <- &sentSnapshot:
case <-ctx.Done():
return
}
case untypedList := <-untyped:
record()
currentSnapshot = {{ .GoName }}Snapshot{}
for _, res := range untypedList {
switch typed := res.(type) {
{{- range .Resources}}
case *{{ .ImportPrefix }}{{ .Name }}:
currentSnapshot.{{ upper_camel .PluralName }} = append(currentSnapshot.{{ upper_camel .PluralName }}, typed)
{{- end}}
default:
select {
case errs <- fmt.Errorf("{{ .GoName }}SnapshotEmitter "+
"cannot process resource %v of type %T", res.GetMetadata().Ref(), res):
case <-ctx.Done():
return
}
}
}
}
}
}()
return snapshots, errs, nil
}
`))
View Source
// SimpleEventLoopTemplate is the parsed text/template (named
// "simple_event_loop") that renders the simple event loop for a resource
// group: the <GoName>SyncDecider (deprecated) and <GoName>SyncDeciderWithContext
// interfaces, and an unexported loop type built by New<GoName>SimpleEventLoop
// from an emitter and any number of syncers.
//
// The rendered Run method watches snapshots from the emitter. For each
// snapshot it visits every syncer in order: a syncer implementing one of the
// decider interfaces is skipped when its ShouldSync returns false; otherwise
// the context from that syncer's previous Sync is cancelled, a trace span is
// started, and Sync is called with a fresh cancellable context. Sync errors
// are forwarded on the returned channel with a non-blocking send (logged and
// dropped if no receiver is ready). All outstanding cancel functions run when
// the goroutine exits.
//
// Cancel functions are tracked by the syncer's index in el.syncers rather than
// by the syncer value: an interface value whose dynamic type is not comparable
// (for example a slice-based syncer collection) panics when used as a map key.
//
// template.Must panics if the template text fails to parse.
var SimpleEventLoopTemplate = template.Must(template.New("simple_event_loop").Funcs(Funcs).Parse(`package {{ .Project.ProjectConfig.Version }}
import (
"context"
"fmt"
"go.opencensus.io/trace"
"github.com/hashicorp/go-multierror"
"github.com/solo-io/solo-kit/pkg/api/v1/eventloop"
"github.com/solo-io/solo-kit/pkg/errors"
"github.com/solo-io/go-utils/contextutils"
"github.com/solo-io/go-utils/errutils"
)
// SyncDeciders Syncer which implements this interface
// can make smarter decisions over whether
// it should be restarted (including having its context cancelled)
// based on a diff of the previous and current snapshot
// Deprecated: use {{ .GoName }}SyncDeciderWithContext
type {{ .GoName }}SyncDecider interface {
{{ .GoName }}Syncer
ShouldSync(old, new *{{ .GoName }}Snapshot) bool
}
type {{ .GoName }}SyncDeciderWithContext interface {
{{ .GoName }}Syncer
ShouldSync(ctx context.Context, old, new *{{ .GoName }}Snapshot) bool
}
type {{ lower_camel .GoName }}SimpleEventLoop struct {
emitter {{ .GoName }}SimpleEmitter
syncers []{{ .GoName }}Syncer
}
func New{{ .GoName }}SimpleEventLoop(emitter {{ .GoName }}SimpleEmitter, syncers ... {{ .GoName }}Syncer) eventloop.SimpleEventLoop {
return &{{ lower_camel .GoName }}SimpleEventLoop{
emitter: emitter,
syncers: syncers,
}
}
func (el *{{ lower_camel .GoName }}SimpleEventLoop) Run(ctx context.Context) (<-chan error, error) {
ctx = contextutils.WithLogger(ctx, "{{ .Project.ProjectConfig.Version }}.event_loop")
logger := contextutils.LoggerFrom(ctx)
logger.Infof("event loop started")
errs := make(chan error)
watch, emitterErrs, err := el.emitter.Snapshots(ctx)
if err != nil {
return nil, errors.Wrapf(err, "starting snapshot watch")
}
go errutils.AggregateErrs(ctx, errs, emitterErrs, "{{ .Project.ProjectConfig.Version }}.emitter errors")
go func() {
// create a new context for each syncer for each loop, cancel each before each loop.
// cancel funcs are keyed by the syncer's index in el.syncers: keying by the syncer value
// would panic at runtime for a syncer whose dynamic type is not hashable (e.g. a slice type)
syncerCancels := make(map[int]context.CancelFunc)
// use closure to allow cancel function to be updated as context changes
defer func() {
for _, cancel := range syncerCancels {
cancel()
}
}()
// cache the previous snapshot for comparison
var previousSnapshot *{{ .GoName }}Snapshot
for {
select {
case snapshot, ok := <-watch:
if !ok {
return
}
// cancel any open watches from previous loop
for i, syncer := range el.syncers {
// allow the syncer to decide if we should sync it + cancel its previous context
if syncDecider, isDecider := syncer.({{ .GoName }}SyncDecider); isDecider {
if shouldSync := syncDecider.ShouldSync(previousSnapshot, snapshot); !shouldSync {
continue // skip syncing this syncer
}
} else if syncDeciderWithContext, isDecider := syncer.({{ .GoName }}SyncDeciderWithContext); isDecider {
if shouldSync := syncDeciderWithContext.ShouldSync(ctx, previousSnapshot, snapshot); !shouldSync {
continue // skip syncing this syncer
}
}
// if this syncer had a previous context, cancel it
cancel, ok := syncerCancels[i]
if ok {
cancel()
}
ctx, span := trace.StartSpan(ctx, fmt.Sprintf("{{ .Name }}.SimpleEventLoopSync-%T", syncer))
ctx, canc := context.WithCancel(ctx)
err := syncer.Sync(ctx, snapshot)
span.End()
if err != nil {
select {
case errs <- err:
default:
logger.Errorf("write error channel is full! could not propagate err: %v", err)
}
}
syncerCancels[i] = canc
}
previousSnapshot = snapshot
case <-ctx.Done():
return
}
}
}()
return errs, nil
}
`))
View Source
// XdsTemplate is the parsed text/template (named "xds_template") that renders
// the xDS glue for one message type:
//
//   - the <MessageType>Type type-URL constant and the <MessageType>XdsResourceWrapper,
//     which implements cache.Resource (References is rendered only when
//     NoReferences is set; otherwise a commented-out stub is emitted for the
//     user to implement in another file);
//   - <MessageType>TypeRecord for the generic client library;
//   - an unexported server wrapping server.Server, whose Stream and Fetch
//     methods delegate to StreamV2 and FetchV2 with the type URL and whose
//     Delta method returns a "not implemented" error;
//   - the Apply<MessageType> callback type and New<MessageType>Client, which
//     adapts the typed callback to the generic client and rejects any resource
//     whose proto is not *<MessageType>.
//
// The comments rendered into the generated file describe the apply callback as
// returning an error when resources could NOT be applied.
//
// template.Must panics if the template text fails to parse.
var XdsTemplate = template.Must(template.New("xds_template").Funcs(Funcs).Parse(`package {{ .Project.ProjectConfig.Version }}
import (
"context"
"errors"
"fmt"
core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
discovery "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/solo-io/solo-kit/pkg/api/v1/control-plane/cache"
"github.com/solo-io/solo-kit/pkg/api/v1/control-plane/client"
"github.com/solo-io/solo-kit/pkg/api/v1/control-plane/resource"
"github.com/solo-io/solo-kit/pkg/api/v1/control-plane/server"
)
// Type Definitions:
const {{ upper_camel .MessageType }}Type = resource.TypePrefix + "/{{ .ProtoPackage }}.{{ upper_camel .MessageType }}"
/* Define a resource - to be used by snapshot */
type {{ upper_camel .MessageType }}XdsResourceWrapper struct {
// TODO(yuval-k): This is public for mitchellh hashstructure to work properly. consider better alternatives.
Resource *{{ upper_camel .MessageType }}
}
// Make sure the Resource interface is implemented
var _ cache.Resource = &{{ upper_camel .MessageType }}XdsResourceWrapper{}
func New{{ upper_camel .MessageType }}XdsResourceWrapper(resourceProto *{{ upper_camel .MessageType }}) *{{ upper_camel .MessageType }}XdsResourceWrapper {
return &{{ upper_camel .MessageType }}XdsResourceWrapper{
Resource: resourceProto,
}
}
func (e *{{ upper_camel .MessageType }}XdsResourceWrapper) Self() cache.XdsResourceReference {
return cache.XdsResourceReference{Name: e.Resource.{{ upper_camel .NameField }}, Type: {{ upper_camel .MessageType }}Type}
}
func (e *{{ upper_camel .MessageType }}XdsResourceWrapper) ResourceProto() cache.ResourceProto {
return e.Resource
}
{{- if .NoReferences }}
func (e *{{ upper_camel .MessageType }}XdsResourceWrapper) References() []cache.XdsResourceReference {
return nil
}
{{- else }}
// This method is not implemented as it requires domain knowledge and cannot be auto generated.
// Please copy it, and implement it in a different file (so it doesn't get overwritten).
// Alternatively, specify the annotation @solo-kit:resource.no_references in the comments for the
// {{ upper_camel .MessageType }} to indicate that there are no references.
// func (e *{{ upper_camel .MessageType }}XdsResourceWrapper) References() []cache.XdsResourceReference {
// panic("not implemented")
// }
{{- end }}
// Define a type record. This is used by the generic client library.
var {{ upper_camel .MessageType }}TypeRecord = client.NewTypeRecord(
{{ upper_camel .MessageType }}Type,
// Return an empty message, that can be used to deserialize bytes into it.
func() cache.ResourceProto { return &{{ upper_camel .MessageType }}{} },
// Convert the message to a resource suitable for use for protobuf's Any.
func(r cache.ResourceProto) cache.Resource {
return &{{ upper_camel .MessageType }}XdsResourceWrapper{Resource: r.(*{{ upper_camel .MessageType }})}
},
)
// Server Implementation:
// Wrap the generic server and implement the type specific methods:
type {{ lower_camel .Name }}Server struct {
server.Server
}
func New{{ upper_camel .Name }}Server(genericServer server.Server) {{ upper_camel .Name }}Server {
return &{{ lower_camel .Name }}Server{Server: genericServer}
}
func (s *{{ lower_camel .Name }}Server) Stream{{ upper_camel .MessageType }}(stream {{ upper_camel .Name }}_Stream{{ upper_camel .MessageType }}Server) error {
return s.Server.StreamV2(stream, {{ upper_camel .MessageType }}Type)
}
func (s *{{ lower_camel .Name }}Server) Fetch{{ upper_camel .MessageType }}(ctx context.Context, req *discovery.DiscoveryRequest) (*discovery.DiscoveryResponse, error) {
if req == nil {
return nil, status.Errorf(codes.Unavailable, "empty request")
}
req.TypeUrl = {{ upper_camel .MessageType }}Type
return s.Server.FetchV2(ctx, req)
}
func (s *{{ lower_camel .Name }}Server) Delta{{ upper_camel .MessageType }}(_ {{ upper_camel .Name }}_Delta{{ upper_camel .MessageType }}Server) error {
return errors.New("not implemented")
}
// Client Implementation: Generate a strongly typed client over the generic client
// The apply function receives resources and returns an error if they were not applied correctly.
// In theory the configuration can become valid in the future (i.e. eventually consistent), but I don't think we need to worry about that now
// As our current use cases only have one configuration resource, so no interactions are expected.
type Apply{{ upper_camel .MessageType }} func(version string, resources []*{{ upper_camel .MessageType }}) error
// Convert the strongly typed apply to a generic apply.
func apply{{ upper_camel .MessageType }}(typedApply Apply{{ upper_camel .MessageType }}) func(cache.Resources) error {
return func(resources cache.Resources) error {
var configs []*{{ upper_camel .MessageType }}
for _, r := range resources.Items {
if proto, ok := r.ResourceProto().(*{{ upper_camel .MessageType }}); !ok {
return fmt.Errorf("resource %s of type %s incorrect", r.Self().Name, r.Self().Type)
} else {
configs = append(configs, proto)
}
}
return typedApply(resources.Version, configs)
}
}
func New{{ upper_camel .MessageType }}Client(nodeinfo *core.Node, typedApply Apply{{ upper_camel .MessageType }}) client.Client {
return client.NewClient(nodeinfo, {{ upper_camel .MessageType }}TypeRecord, apply{{ upper_camel .MessageType }}(typedApply))
}
`))
Functions ¶
This section is empty.
Types ¶
This section is empty.
Source Files ¶
- event_loop_template.go
- event_loop_test_template.go
- funcs.go
- resource_client_template.go
- resource_client_test_template.go
- resource_reconciler_template.go
- resource_template.go
- simple_event_loop_template.go
- snapshot_emitter_template.go
- snapshot_emitter_test_template.go
- snapshot_simple_emitter_template.go
- snapshot_template.go
- test_suite_template.go
- xds_template.go
Click to show internal directories.
Click to hide internal directories.