Mirror of https://github.com/1Password/onepassword-operator.git, synced 2025-10-23 07:58:04 +00:00
Upgrade the operator to use Operator SDK v1.33.0 (#182)
* Move controller package inside internal directory. Based on the go/v4 project structure, package `controllers` is now named `controller`, and package `controller` now lives inside the new `internal` directory.
* Move main.go into the cmd directory. Based on the new go/v4 project structure, `main.go` now lives in the `cmd` directory.
* Change the package import in main.go accordingly.
* Update go.mod dependencies to the versions obtained by creating a new operator project using `kubebuilder init --domain onepassword.com --plugins=go/v4`, following the migration steps for going from go/v3 to go/v4 (https://book.kubebuilder.io/migration/migration_guide_gov3_to_gov4).
* Update vendor.
* Adjust code for breaking changes from package updates. The sigs.k8s.io/controller-runtime package had breaking changes from v0.14.5 to v0.16.3; this commit brings the changes needed to achieve the same behavior using the new functionality available.
* Adjust paths to the Connect YAML files. Since `main.go` is now in the `cmd` directory, the paths to the files for deploying Connect have to be adjusted based on the new location `main.go` is executed from.
* Update files based on the new structure and scaffolding obtained when using the go/v4 project structure, per the migration guide above.
* Update config files to the Kustomize v4 syntax, as part of the upgrade to go/v4.
* Update dependencies and the Go version.
* Update vendor.
* Update Kubernetes tools versions.
* Update the operator version in the Makefile so that it matches the version of the operator.
* Update the Operator SDK version in version.go.
* Adjust generated deepcopy. The `+build` tag is no longer needed based on the latest generated scaffolding, so it is removed.
* Update the copyright year.
* Bring back missing changes from migration. Some Makefile customization was lost during the migration process, specifically the namespace customization for the `make deploy` command. We also push changes to kustomization.yaml to make the deploy process smoother.
* Add RBAC permissions for coordination.k8s.io. With the latest changes to Kubernetes and Kustomize, additional RBAC is needed so that the service account can properly access the `leases` resource (a sketch of the corresponding RBAC marker follows below).
* Optimize Dockerfile. The Dockerfile had a step for caching dependencies (`go mod download`), but this is already covered by the vendor directory, which we include. Removing that step makes the image build faster.
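For the leases permission mentioned above, go/v4 projects typically express manager RBAC with kubebuilder markers that `make manifests` (controller-gen) renders into the files under config/rbac. A minimal sketch in Go; the reconciler type is hypothetical and the verb list mirrors the usual leader-election rule rather than being copied from this commit:

package controller

import (
    "context"

    ctrl "sigs.k8s.io/controller-runtime"
)

// ExampleReconciler is a stand-in for the operator's real reconciler type.
type ExampleReconciler struct{}

// The marker below is collected by `make manifests` (controller-gen) and
// rendered into config/rbac/role.yaml. The coordination.k8s.io rule is what
// lets the manager's service account create and renew leader-election leases.
//+kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;watch;create;update;patch;delete

func (r *ExampleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    return ctrl.Result{}, nil
}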
vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go (51 changes, generated, vendored)
@@ -28,17 +28,15 @@ import (
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/uuid"
    "k8s.io/client-go/util/workqueue"

    "sigs.k8s.io/controller-runtime/pkg/handler"
    ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
    logf "sigs.k8s.io/controller-runtime/pkg/log"
    "sigs.k8s.io/controller-runtime/pkg/predicate"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
    "sigs.k8s.io/controller-runtime/pkg/runtime/inject"
    "sigs.k8s.io/controller-runtime/pkg/source"
)

var _ inject.Injector = &Controller{}

// Controller implements controller.Controller.
type Controller struct {
    // Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.

@@ -61,10 +59,6 @@ type Controller struct {
    // the Queue for processing
    Queue workqueue.RateLimitingInterface

    // SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates
    // Deprecated: the caller should handle injected fields itself.
    SetFields func(i interface{}) error

    // mu is used to synchronize Controller setup
    mu sync.Mutex

@@ -93,6 +87,9 @@ type Controller struct {
    // RecoverPanic indicates whether the panic caused by reconcile should be recovered.
    RecoverPanic *bool

    // LeaderElected indicates whether the controller is leader elected or always running.
    LeaderElected *bool
}

// watchDescription contains all the information necessary to start a watch.

@@ -127,19 +124,6 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
    c.mu.Lock()
    defer c.mu.Unlock()

    // Inject Cache into arguments
    if err := c.SetFields(src); err != nil {
        return err
    }
    if err := c.SetFields(evthdler); err != nil {
        return err
    }
    for _, pr := range prct {
        if err := c.SetFields(pr); err != nil {
            return err
        }
    }

    // Controller hasn't started yet, store the watches locally and return.
    //
    // These watches are going to be held on the controller struct until the manager or user calls Start(...).

@@ -152,6 +136,14 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
    return src.Start(c.ctx, evthdler, c.Queue, prct...)
}

// NeedLeaderElection implements the manager.LeaderElectionRunnable interface.
func (c *Controller) NeedLeaderElection() bool {
    if c.LeaderElected == nil {
        return true
    }
    return *c.LeaderElected
}

// Start implements controller.Controller.
func (c *Controller) Start(ctx context.Context) error {
    // use an IIFE to get proper lock handling

@@ -320,14 +312,23 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
    // RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the
    // resource to be synced.
    log.V(5).Info("Reconciling")
    result, err := c.Reconcile(ctx, req)
    switch {
    case err != nil:
        c.Queue.AddRateLimited(req)
        if errors.Is(err, reconcile.TerminalError(nil)) {
            ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()
        } else {
            c.Queue.AddRateLimited(req)
        }
        ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
        ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
        if !result.IsZero() {
            log.Info("Warning: Reconciler returned both a non-zero result and a non-nil error. The result will always be ignored if the error is non-nil and the non-nil error causes reqeueuing with exponential backoff. For more details, see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler")
        }
        log.Error(err, "Reconciler error")
    case result.RequeueAfter > 0:
        log.V(5).Info(fmt.Sprintf("Reconcile done, requeueing after %s", result.RequeueAfter))
        // The result.RequeueAfter request will be lost, if it is returned
        // along with a non-nil error. But this is intended as
        // We need to drive to stable reconcile loops before queuing due

@@ -336,9 +337,11 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
        c.Queue.AddAfter(req, result.RequeueAfter)
        ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
    case result.Requeue:
        log.V(5).Info("Reconcile done, requeueing")
        c.Queue.AddRateLimited(req)
        ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
    default:
        log.V(5).Info("Reconcile successful")
        // Finally, if no error occurs we Forget this item so it does not
        // get queued again until another change happens.
        c.Queue.Forget(obj)

@@ -351,12 +354,6 @@ func (c *Controller) GetLogger() logr.Logger {
    return c.LogConstructor(nil)
}

// InjectFunc implement SetFields.Injector.
func (c *Controller) InjectFunc(f inject.Func) error {
    c.SetFields = f
    return nil
}

// updateMetrics updates prometheus metrics within the controller.
func (c *Controller) updateMetrics(reconcileTime time.Duration) {
    ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds())
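The new `case err != nil` branch above treats errors wrapped with `reconcile.TerminalError` specially: they increment a dedicated metric and are not requeued. A minimal sketch of a reconciler opting into that behavior; the reconciler type, `validate` helper, and sentinel error are hypothetical:

package controller

import (
    "context"
    "errors"

    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// ErrUnrecoverable marks failures that retrying cannot fix (hypothetical sentinel).
var ErrUnrecoverable = errors.New("unrecoverable configuration")

type ItemReconciler struct{}

// validate stands in for whatever spec validation the reconciler performs.
func validate(req ctrl.Request) error {
    if req.Name == "" {
        return ErrUnrecoverable
    }
    return nil
}

func (r *ItemReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    if err := validate(req); errors.Is(err, ErrUnrecoverable) {
        // Wrapping with reconcile.TerminalError increments
        // controller_runtime_terminal_reconcile_errors_total instead of
        // requeueing the request with exponential backoff.
        return ctrl.Result{}, reconcile.TerminalError(err)
    }
    return ctrl.Result{}, nil
}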
vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics/metrics.go (8 changes, generated, vendored)
@@ -39,6 +39,13 @@ var (
        Help: "Total number of reconciliation errors per controller",
    }, []string{"controller"})

    // TerminalReconcileErrors is a prometheus counter metrics which holds the total
    // number of terminal errors from the Reconciler.
    TerminalReconcileErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "controller_runtime_terminal_reconcile_errors_total",
        Help: "Total number of terminal reconciliation errors per controller",
    }, []string{"controller"})

    // ReconcileTime is a prometheus metric which keeps track of the duration
    // of reconciliations.
    ReconcileTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{

@@ -67,6 +74,7 @@ func init() {
    metrics.Registry.MustRegister(
        ReconcileTotal,
        ReconcileErrors,
        TerminalReconcileErrors,
        ReconcileTime,
        WorkerCount,
        ActiveWorkers,
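The `init` hunk above registers the new counter on controller-runtime's shared registry. Operator code can expose its own metrics through the same registry so they appear on the manager's /metrics endpoint; a sketch with a hypothetical counter name:

package controller

import (
    "github.com/prometheus/client_golang/prometheus"
    "sigs.k8s.io/controller-runtime/pkg/metrics"
)

// itemSyncTotal is a hypothetical operator-level counter. Registering it on
// metrics.Registry, the same registry used in the init() above, makes it
// appear on the manager's /metrics endpoint next to the built-in metrics.
var itemSyncTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
    Name: "onepassword_item_sync_total",
    Help: "Total number of OnePasswordItem syncs, by outcome.",
}, []string{"outcome"})

func init() {
    metrics.Registry.MustRegister(itemSyncTotal)
}

// Usage inside a reconciler: itemSyncTotal.WithLabelValues("success").Inc()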
vendor/sigs.k8s.io/controller-runtime/pkg/internal/objectutil/objectutil.go (36 changes, generated, vendored)
@@ -17,14 +17,9 @@ limitations under the License.
package objectutil

import (
    "errors"
    "fmt"

    apimeta "k8s.io/apimachinery/pkg/api/meta"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

// FilterWithLabels returns a copy of the items in objs matching labelSel.

@@ -45,34 +40,3 @@ func FilterWithLabels(objs []runtime.Object, labelSel labels.Selector) ([]runtim
    }
    return outItems, nil
}

// IsAPINamespaced returns true if the object is namespace scoped.
// For unstructured objects the gvk is found from the object itself.
func IsAPINamespaced(obj runtime.Object, scheme *runtime.Scheme, restmapper apimeta.RESTMapper) (bool, error) {
    gvk, err := apiutil.GVKForObject(obj, scheme)
    if err != nil {
        return false, err
    }

    return IsAPINamespacedWithGVK(gvk, scheme, restmapper)
}

// IsAPINamespacedWithGVK returns true if the object having the provided
// GVK is namespace scoped.
func IsAPINamespacedWithGVK(gk schema.GroupVersionKind, scheme *runtime.Scheme, restmapper apimeta.RESTMapper) (bool, error) {
    restmapping, err := restmapper.RESTMapping(schema.GroupKind{Group: gk.Group, Kind: gk.Kind})
    if err != nil {
        return false, fmt.Errorf("failed to get restmapping: %w", err)
    }

    scope := restmapping.Scope.Name()

    if scope == "" {
        return false, errors.New("scope cannot be identified, empty scope returned")
    }

    if scope != apimeta.RESTScopeNameRoot {
        return true, nil
    }
    return false, nil
}
vendor/sigs.k8s.io/controller-runtime/pkg/internal/recorder/recorder.go (9 changes, generated, vendored)
@@ -19,6 +19,7 @@ package recorder
import (
    "context"
    "fmt"
    "net/http"
    "sync"

    "github.com/go-logr/logr"

@@ -110,8 +111,12 @@ func (p *Provider) getBroadcaster() record.EventBroadcaster {
}

// NewProvider create a new Provider instance.
func NewProvider(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster EventBroadcasterProducer) (*Provider, error) {
    corev1Client, err := corev1client.NewForConfig(config)
func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster EventBroadcasterProducer) (*Provider, error) {
    if httpClient == nil {
        panic("httpClient must not be nil")
    }

    corev1Client, err := corev1client.NewForConfigAndClient(config, httpClient)
    if err != nil {
        return nil, fmt.Errorf("failed to init client: %w", err)
    }
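`NewProvider` now requires the caller to supply an `*http.Client` alongside the `rest.Config`. The manager builds this internally, but code that constructs clients against the same config typically derives one with `rest.HTTPClientFor`; a minimal sketch, assuming the standard controller-runtime config lookup:

package main

import (
    "net/http"

    "k8s.io/client-go/rest"
    ctrl "sigs.k8s.io/controller-runtime"
)

// newHTTPClient derives the shared *http.Client that v0.16-era constructors
// such as NewProvider expect alongside the rest.Config.
func newHTTPClient() (*rest.Config, *http.Client, error) {
    cfg, err := ctrl.GetConfig() // kubeconfig or in-cluster config
    if err != nil {
        return nil, nil, err
    }
    // HTTPClientFor applies the config's transport options (TLS, auth,
    // timeouts), so every consumer shares one consistent client.
    httpClient, err := rest.HTTPClientFor(cfg)
    if err != nil {
        return nil, nil, err
    }
    return cfg, httpClient, nil
}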
vendor/sigs.k8s.io/controller-runtime/pkg/internal/source/event_handler.go (170 changes, generated, vendored, new file)
@@ -0,0 +1,170 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package internal

import (
    "context"
    "fmt"

    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/handler"
    logf "sigs.k8s.io/controller-runtime/pkg/internal/log"

    "sigs.k8s.io/controller-runtime/pkg/predicate"
)

var log = logf.RuntimeLog.WithName("source").WithName("EventHandler")

// NewEventHandler creates a new EventHandler.
func NewEventHandler(ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.EventHandler, predicates []predicate.Predicate) *EventHandler {
    return &EventHandler{
        ctx:        ctx,
        handler:    handler,
        queue:      queue,
        predicates: predicates,
    }
}

// EventHandler adapts a handler.EventHandler interface to a cache.ResourceEventHandler interface.
type EventHandler struct {
    // ctx stores the context that created the event handler
    // that is used to propagate cancellation signals to each handler function.
    ctx context.Context

    handler    handler.EventHandler
    queue      workqueue.RateLimitingInterface
    predicates []predicate.Predicate
}

// HandlerFuncs converts EventHandler to a ResourceEventHandlerFuncs
// TODO: switch to ResourceEventHandlerDetailedFuncs with client-go 1.27
func (e *EventHandler) HandlerFuncs() cache.ResourceEventHandlerFuncs {
    return cache.ResourceEventHandlerFuncs{
        AddFunc:    e.OnAdd,
        UpdateFunc: e.OnUpdate,
        DeleteFunc: e.OnDelete,
    }
}

// OnAdd creates CreateEvent and calls Create on EventHandler.
func (e *EventHandler) OnAdd(obj interface{}) {
    c := event.CreateEvent{}

    // Pull Object out of the object
    if o, ok := obj.(client.Object); ok {
        c.Object = o
    } else {
        log.Error(nil, "OnAdd missing Object",
            "object", obj, "type", fmt.Sprintf("%T", obj))
        return
    }

    for _, p := range e.predicates {
        if !p.Create(c) {
            return
        }
    }

    // Invoke create handler
    ctx, cancel := context.WithCancel(e.ctx)
    defer cancel()
    e.handler.Create(ctx, c, e.queue)
}

// OnUpdate creates UpdateEvent and calls Update on EventHandler.
func (e *EventHandler) OnUpdate(oldObj, newObj interface{}) {
    u := event.UpdateEvent{}

    if o, ok := oldObj.(client.Object); ok {
        u.ObjectOld = o
    } else {
        log.Error(nil, "OnUpdate missing ObjectOld",
            "object", oldObj, "type", fmt.Sprintf("%T", oldObj))
        return
    }

    // Pull Object out of the object
    if o, ok := newObj.(client.Object); ok {
        u.ObjectNew = o
    } else {
        log.Error(nil, "OnUpdate missing ObjectNew",
            "object", newObj, "type", fmt.Sprintf("%T", newObj))
        return
    }

    for _, p := range e.predicates {
        if !p.Update(u) {
            return
        }
    }

    // Invoke update handler
    ctx, cancel := context.WithCancel(e.ctx)
    defer cancel()
    e.handler.Update(ctx, u, e.queue)
}

// OnDelete creates DeleteEvent and calls Delete on EventHandler.
func (e *EventHandler) OnDelete(obj interface{}) {
    d := event.DeleteEvent{}

    // Deal with tombstone events by pulling the object out. Tombstone events wrap the object in a
    // DeleteFinalStateUnknown struct, so the object needs to be pulled out.
    // Copied from sample-controller
    // This should never happen if we aren't missing events, which we have concluded that we are not
    // and made decisions off of this belief. Maybe this shouldn't be here?
    var ok bool
    if _, ok = obj.(client.Object); !ok {
        // If the object doesn't have Metadata, assume it is a tombstone object of type DeletedFinalStateUnknown
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            log.Error(nil, "Error decoding objects. Expected cache.DeletedFinalStateUnknown",
                "type", fmt.Sprintf("%T", obj),
                "object", obj)
            return
        }

        // Set DeleteStateUnknown to true
        d.DeleteStateUnknown = true

        // Set obj to the tombstone obj
        obj = tombstone.Obj
    }

    // Pull Object out of the object
    if o, ok := obj.(client.Object); ok {
        d.Object = o
    } else {
        log.Error(nil, "OnDelete missing Object",
            "object", obj, "type", fmt.Sprintf("%T", obj))
        return
    }

    for _, p := range e.predicates {
        if !p.Delete(d) {
            return
        }
    }

    // Invoke delete handler
    ctx, cancel := context.WithCancel(e.ctx)
    defer cancel()
    e.handler.Delete(ctx, d, e.queue)
}
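This adapter is also where the context-first `handler.EventHandler` signatures introduced by the upgrade surface: every `Create`/`Update`/`Delete` call now receives a cancellable context derived from the one stored on the `EventHandler`. A sketch of a custom handler written against that API (the handler itself is illustrative):

package controller

import (
    "context"

    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/util/workqueue"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/handler"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// enqueueOnCreate sketches a custom handler under the context-aware API:
// each callback receives the per-event context created by EventHandler
// (see OnAdd above), so long-running handlers can honor cancellation.
var enqueueOnCreate = handler.Funcs{
    CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
        select {
        case <-ctx.Done():
            return // the source was shut down; drop the event
        default:
        }
        q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
            Namespace: e.Object.GetNamespace(),
            Name:      e.Object.GetName(),
        }})
    },
}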
vendor/sigs.k8s.io/controller-runtime/pkg/internal/source/kind.go (117 changes, generated, vendored, new file)
@@ -0,0 +1,117 @@
package internal

import (
    "context"
    "errors"
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/api/meta"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/util/workqueue"
    "sigs.k8s.io/controller-runtime/pkg/cache"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/handler"
    "sigs.k8s.io/controller-runtime/pkg/predicate"
)

// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
type Kind struct {
    // Type is the type of object to watch. e.g. &v1.Pod{}
    Type client.Object

    // Cache used to watch APIs
    Cache cache.Cache

    // started may contain an error if one was encountered during startup. If its closed and does not
    // contain an error, startup and syncing finished.
    started     chan error
    startCancel func()
}

// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
    prct ...predicate.Predicate) error {
    if ks.Type == nil {
        return fmt.Errorf("must create Kind with a non-nil object")
    }
    if ks.Cache == nil {
        return fmt.Errorf("must create Kind with a non-nil cache")
    }

    // cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
    // sync that informer (most commonly due to RBAC issues).
    ctx, ks.startCancel = context.WithCancel(ctx)
    ks.started = make(chan error)
    go func() {
        var (
            i       cache.Informer
            lastErr error
        )

        // Tries to get an informer until it returns true,
        // an error or the specified context is cancelled or expired.
        if err := wait.PollUntilContextCancel(ctx, 10*time.Second, true, func(ctx context.Context) (bool, error) {
            // Lookup the Informer from the Cache and add an EventHandler which populates the Queue
            i, lastErr = ks.Cache.GetInformer(ctx, ks.Type)
            if lastErr != nil {
                kindMatchErr := &meta.NoKindMatchError{}
                switch {
                case errors.As(lastErr, &kindMatchErr):
                    log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start",
                        "kind", kindMatchErr.GroupKind)
                case runtime.IsNotRegisteredError(lastErr):
                    log.Error(lastErr, "kind must be registered to the Scheme")
                default:
                    log.Error(lastErr, "failed to get informer from cache")
                }
                return false, nil // Retry.
            }
            return true, nil
        }); err != nil {
            if lastErr != nil {
                ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr)
                return
            }
            ks.started <- err
            return
        }

        _, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs())
        if err != nil {
            ks.started <- err
            return
        }
        if !ks.Cache.WaitForCacheSync(ctx) {
            // Would be great to return something more informative here
            ks.started <- errors.New("cache did not sync")
        }
        close(ks.started)
    }()

    return nil
}

func (ks *Kind) String() string {
    if ks.Type != nil {
        return fmt.Sprintf("kind source: %T", ks.Type)
    }
    return "kind source: unknown type"
}

// WaitForSync implements SyncingSource to allow controllers to wait with starting
// workers until the cache is synced.
func (ks *Kind) WaitForSync(ctx context.Context) error {
    select {
    case err := <-ks.started:
        return err
    case <-ctx.Done():
        ks.startCancel()
        if errors.Is(ctx.Err(), context.Canceled) {
            return nil
        }
        return fmt.Errorf("timed out waiting for cache to be synced for Kind %T", ks.Type)
    }
}
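User code reaches this internal type through the exported `source.Kind` constructor. A sketch of how a controller wires it up under controller-runtime v0.16: `source.Kind(cache, obj)` replaces the old `&source.Kind{Type: obj}` struct literal, and the builder accepts it through `WatchesRawSource`. The names below are illustrative, not taken from this repository:

package controller

import (
    corev1 "k8s.io/api/core/v1"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/handler"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
    "sigs.k8s.io/controller-runtime/pkg/source"
)

// buildController watches ConfigMaps as the primary type and additionally
// watches Secrets via a raw source backed by the manager's cache, which is
// what constructs the internal Kind shown above.
func buildController(mgr ctrl.Manager, r reconcile.Reconciler) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&corev1.ConfigMap{}).
        WatchesRawSource(
            source.Kind(mgr.GetCache(), &corev1.Secret{}),
            &handler.EnqueueRequestForObject{},
        ).
        Complete(r)
}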