Upgrade Operator SDK to v1.34.1 and update dependencies (#185)

This does the following updates:

* Upgrade to Operator SDK v1.34.1. This fixes building multi-arch images from Makefile. Check this MR from operator-framework for details.
* Update Go dependencies. This addresses Dependabot alert ["Golang protojson.Unmarshal function infinite loop when unmarshaling certain forms of invalid JSON"](https://github.com/1Password/onepassword-operator/security/dependabot/13).
* Update versions of the GitHub Actions used in the pipelines.
* Update Kubernetes related tools (such as controller-tools version, and operator-sdk for ci pipelines)

By updating dependencies, the pipelines no longer fail due to a panic error when running `make test`.
This commit is contained in:
Eduard Filip
2024-03-25 15:41:18 +01:00
committed by GitHub
parent 5f232b121a
commit eda5612827
503 changed files with 30557 additions and 7140 deletions

View File

@@ -59,9 +59,9 @@ linters-settings:
- pkg: sigs.k8s.io/controller-runtime
alias: ctrl
staticcheck:
go: "1.20"
go: "1.21"
stylecheck:
go: "1.20"
go: "1.21"
revive:
rules:
# The following rules are recommended https://github.com/mgechev/revive#recommended-configuration

View File

@@ -4,9 +4,9 @@ The Kubernetes controller-runtime Project is released on an as-needed basis. The
**Note:** Releases are done from the `release-MAJOR.MINOR` branches. For PATCH releases is not required
to create a new branch you will just need to ensure that all big fixes are cherry-picked into the respective
`release-MAJOR.MINOR` branch. To know more about versioning check https://semver.org/.
`release-MAJOR.MINOR` branch. To know more about versioning check https://semver.org/.
## How to do a release
## How to do a release
### Create the new branch and the release tag
@@ -15,7 +15,7 @@ to create a new branch you will just need to ensure that all big fixes are cherr
### Now, let's generate the changelog
1. Create the changelog from the new branch `release-<MAJOR.MINOR>` (`git checkout release-<MAJOR.MINOR>`).
1. Create the changelog from the new branch `release-<MAJOR.MINOR>` (`git checkout release-<MAJOR.MINOR>`).
You will need to use the [kubebuilder-release-tools][kubebuilder-release-tools] to generate the notes. See [here][release-notes-generation]
> **Note**
@@ -24,12 +24,12 @@ You will need to use the [kubebuilder-release-tools][kubebuilder-release-tools]
### Draft a new release from GitHub
1. Create a new tag with the correct version from the new `release-<MAJOR.MINOR>` branch
1. Create a new tag with the correct version from the new `release-<MAJOR.MINOR>` branch
2. Add the changelog on it and publish. Now, the code source is released !
### Add a new Prow test the for the new branch release
1. Create a new prow test under [github.com/kubernetes/test-infra/tree/master/config/jobs/kubernetes-sigs/controller-runtime](https://github.com/kubernetes/test-infra/tree/master/config/jobs/kubernetes-sigs/controller-runtime)
1. Create a new prow test under [github.com/kubernetes/test-infra/tree/master/config/jobs/kubernetes-sigs/controller-runtime](https://github.com/kubernetes/test-infra/tree/master/config/jobs/kubernetes-sigs/controller-runtime)
for the new `release-<MAJOR.MINOR>` branch. (i.e. for the `0.11.0` release see the PR: https://github.com/kubernetes/test-infra/pull/25205)
2. Ping the infra PR in the controller-runtime slack channel for reviews.
@@ -45,3 +45,7 @@ For more info, see the release page: https://github.com/kubernetes-sigs/controll
````
2. An announcement email is sent to `kubebuilder@googlegroups.com` with the subject `[ANNOUNCE] Controller-Runtime $VERSION is released`
[kubebuilder-release-tools]: https://github.com/kubernetes-sigs/kubebuilder-release-tools
[release-notes-generation]: https://github.com/kubernetes-sigs/kubebuilder-release-tools/blob/master/README.md#release-notes-generation
[release-process]: https://github.com/kubernetes-sigs/kubebuilder/blob/master/VERSIONING.md#releasing

View File

@@ -33,7 +33,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
toolscache "k8s.io/client-go/tools/cache"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/cache/internal"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -83,6 +83,9 @@ type Informers interface {
// of the underlying object.
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error)
// RemoveInformer removes an informer entry and stops it if it was running.
RemoveInformer(ctx context.Context, obj client.Object) error
// Start runs all the informers known to this cache until the context is closed.
// It blocks.
Start(ctx context.Context) error
@@ -121,6 +124,8 @@ type Informer interface {
// HasSynced return true if the informers underlying store has synced.
HasSynced() bool
// IsStopped returns true if the informer has been stopped.
IsStopped() bool
}
// AllNamespaces should be used as the map key to deliminate namespace settings
@@ -199,6 +204,12 @@ type Options struct {
// unless there is already one set in ByObject or DefaultNamespaces.
DefaultTransform toolscache.TransformFunc
// DefaultWatchErrorHandler will be used to the WatchErrorHandler which is called
// whenever ListAndWatch drops the connection with an error.
//
// After calling this handler, the informer will backoff and retry.
DefaultWatchErrorHandler toolscache.WatchErrorHandler
// DefaultUnsafeDisableDeepCopy is the default for UnsafeDisableDeepCopy
// for everything that doesn't specify this.
//
@@ -369,7 +380,8 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc {
Field: config.FieldSelector,
},
Transform: config.Transform,
UnsafeDisableDeepCopy: pointer.BoolDeref(config.UnsafeDisableDeepCopy, false),
WatchErrorHandler: opts.DefaultWatchErrorHandler,
UnsafeDisableDeepCopy: ptr.Deref(config.UnsafeDisableDeepCopy, false),
NewInformer: opts.newInformer,
}),
readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer,
@@ -400,7 +412,7 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
// Construct a new Mapper if unset
if opts.Mapper == nil {
var err error
opts.Mapper, err = apiutil.NewDiscoveryRESTMapper(config, opts.HTTPClient)
opts.Mapper, err = apiutil.NewDynamicRESTMapper(config, opts.HTTPClient)
if err != nil {
return Options{}, fmt.Errorf("could not create RESTMapper from config: %w", err)
}

View File

@@ -52,6 +52,14 @@ func (dbt *delegatingByGVKCache) List(ctx context.Context, list client.ObjectLis
return cache.List(ctx, list, opts...)
}
func (dbt *delegatingByGVKCache) RemoveInformer(ctx context.Context, obj client.Object) error {
cache, err := dbt.cacheForObject(obj)
if err != nil {
return err
}
return cache.RemoveInformer(ctx, obj)
}
func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
cache, err := dbt.cacheForObject(obj)
if err != nil {

View File

@@ -190,6 +190,17 @@ func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.Grou
return ic.Informers.Get(ctx, gvk, obj, &internal.GetOptions{})
}
// RemoveInformer deactivates and removes the informer from the cache.
func (ic *informerCache) RemoveInformer(_ context.Context, obj client.Object) error {
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
if err != nil {
return err
}
ic.Informers.Remove(gvk, obj)
return nil
}
// NeedLeaderElection implements the LeaderElectionRunnable interface
// to indicate that this can be started without requiring the leader lock.
func (ic *informerCache) NeedLeaderElection() bool {

View File

@@ -23,6 +23,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -117,16 +118,14 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli
switch {
case listOpts.FieldSelector != nil:
// TODO(directxman12): support more complicated field selectors by
// combining multiple indices, GetIndexers, etc
field, val, requiresExact := selector.RequiresExactMatch(listOpts.FieldSelector)
requiresExact := selector.RequiresExactMatch(listOpts.FieldSelector)
if !requiresExact {
return fmt.Errorf("non-exact field matches are not supported by the cache")
}
// list all objects by the field selector. If this is namespaced and we have one, ask for the
// namespaced index key. Otherwise, ask for the non-namespaced variant by using the fake "all namespaces"
// namespace.
objs, err = c.indexer.ByIndex(FieldIndexName(field), KeyToNamespacedKey(listOpts.Namespace, val))
objs, err = byIndexes(c.indexer, listOpts.FieldSelector.Requirements(), listOpts.Namespace)
case listOpts.Namespace != "":
objs, err = c.indexer.ByIndex(cache.NamespaceIndex, listOpts.Namespace)
default:
@@ -178,6 +177,54 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli
return apimeta.SetList(out, runtimeObjs)
}
func byIndexes(indexer cache.Indexer, requires fields.Requirements, namespace string) ([]interface{}, error) {
var (
err error
objs []interface{}
vals []string
)
indexers := indexer.GetIndexers()
for idx, req := range requires {
indexName := FieldIndexName(req.Field)
indexedValue := KeyToNamespacedKey(namespace, req.Value)
if idx == 0 {
// we use first require to get snapshot data
// TODO(halfcrazy): use complicated index when client-go provides byIndexes
// https://github.com/kubernetes/kubernetes/issues/109329
objs, err = indexer.ByIndex(indexName, indexedValue)
if err != nil {
return nil, err
}
if len(objs) == 0 {
return nil, nil
}
continue
}
fn, exist := indexers[indexName]
if !exist {
return nil, fmt.Errorf("index with name %s does not exist", indexName)
}
filteredObjects := make([]interface{}, 0, len(objs))
for _, obj := range objs {
vals, err = fn(obj)
if err != nil {
return nil, err
}
for _, val := range vals {
if val == indexedValue {
filteredObjects = append(filteredObjects, obj)
break
}
}
}
if len(filteredObjects) == 0 {
return nil, nil
}
objs = filteredObjects
}
return objs, nil
}
// objectKeyToStorageKey converts an object key to store key.
// It's akin to MetaNamespaceKeyFunc. It's separate from
// String to allow keeping the key format easily in sync with

View File

@@ -36,6 +36,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/internal/syncs"
)
// InformersOpts configures an InformerMap.
@@ -49,6 +50,7 @@ type InformersOpts struct {
Selector Selector
Transform cache.TransformFunc
UnsafeDisableDeepCopy bool
WatchErrorHandler cache.WatchErrorHandler
}
// NewInformers creates a new InformersMap that can create informers under the hood.
@@ -76,6 +78,7 @@ func NewInformers(config *rest.Config, options *InformersOpts) *Informers {
transform: options.Transform,
unsafeDisableDeepCopy: options.UnsafeDisableDeepCopy,
newInformer: newInformer,
watchErrorHandler: options.WatchErrorHandler,
}
}
@@ -86,6 +89,20 @@ type Cache struct {
// CacheReader wraps Informer and implements the CacheReader interface for a single type
Reader CacheReader
// Stop can be used to stop this individual informer.
stop chan struct{}
}
// Start starts the informer managed by a MapEntry.
// Blocks until the informer stops. The informer can be stopped
// either individually (via the entry's stop channel) or globally
// via the provided stop argument.
func (c *Cache) Start(stop <-chan struct{}) {
// Stop on either the whole map stopping or just this informer being removed.
internalStop, cancel := syncs.MergeChans(stop, c.stop)
defer cancel()
c.Informer.Run(internalStop)
}
type tracker struct {
@@ -159,6 +176,11 @@ type Informers struct {
// NewInformer allows overriding of the shared index informer constructor for testing.
newInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer
// WatchErrorHandler allows the shared index informer's
// watchErrorHandler to be set by overriding the options
// or to use the default watchErrorHandler
watchErrorHandler cache.WatchErrorHandler
}
// Start calls Run on each of the informers and sets started to true. Blocks on the context.
@@ -173,13 +195,13 @@ func (ip *Informers) Start(ctx context.Context) error {
// Start each informer
for _, i := range ip.tracker.Structured {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}
for _, i := range ip.tracker.Unstructured {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}
for _, i := range ip.tracker.Metadata {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}
// Set started to true so we immediately start any informers added later.
@@ -194,7 +216,7 @@ func (ip *Informers) Start(ctx context.Context) error {
return nil
}
func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) {
func (ip *Informers) startInformerLocked(cacheEntry *Cache) {
// Don't start the informer in case we are already waiting for the items in
// the waitGroup to finish, since waitGroups don't support waiting and adding
// at the same time.
@@ -205,7 +227,7 @@ func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) {
ip.waitGroup.Add(1)
go func() {
defer ip.waitGroup.Done()
informer.Run(ip.ctx.Done())
cacheEntry.Start(ip.ctx.Done())
}()
}
@@ -281,6 +303,21 @@ func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj r
return started, i, nil
}
// Remove removes an informer entry and stops it if it was running.
func (ip *Informers) Remove(gvk schema.GroupVersionKind, obj runtime.Object) {
ip.mu.Lock()
defer ip.mu.Unlock()
informerMap := ip.informersByType(obj)
entry, ok := informerMap[gvk]
if !ok {
return
}
close(entry.stop)
delete(informerMap, gvk)
}
func (ip *Informers) informersByType(obj runtime.Object) map[schema.GroupVersionKind]*Cache {
switch obj.(type) {
case runtime.Unstructured:
@@ -323,6 +360,13 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
})
// Set WatchErrorHandler on SharedIndexInformer if set
if ip.watchErrorHandler != nil {
if err := sharedIndexInformer.SetWatchErrorHandler(ip.watchErrorHandler); err != nil {
return nil, false, err
}
}
// Check to see if there is a transformer for this gvk
if err := sharedIndexInformer.SetTransform(ip.transform); err != nil {
return nil, false, err
@@ -342,13 +386,14 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O
scopeName: mapping.Scope.Name(),
disableDeepCopy: ip.unsafeDisableDeepCopy,
},
stop: make(chan struct{}),
}
ip.informersByType(obj)[gvk] = i
// Start the informer in case the InformersMap has started, otherwise it will be
// started when the InformersMap starts.
if ip.started {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}
return i, ip.started, nil
}

View File

@@ -109,6 +109,27 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object
return &multiNamespaceInformer{namespaceToInformer: namespaceToInformer}, nil
}
func (c *multiNamespaceCache) RemoveInformer(ctx context.Context, obj client.Object) error {
// If the object is clusterscoped, get the informer from clusterCache,
// if not use the namespaced caches.
isNamespaced, err := apiutil.IsObjectNamespaced(obj, c.Scheme, c.RESTMapper)
if err != nil {
return err
}
if !isNamespaced {
return c.clusterCache.RemoveInformer(ctx, obj)
}
for _, cache := range c.namespaceToCache {
err := cache.RemoveInformer(ctx, obj)
if err != nil {
return err
}
}
return nil
}
func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
// If the object is cluster scoped, get the informer from clusterCache,
// if not use the namespaced caches.
@@ -391,3 +412,13 @@ func (i *multiNamespaceInformer) HasSynced() bool {
}
return true
}
// IsStopped checks if each namespaced informer has stopped, returns false if any are still running.
func (i *multiNamespaceInformer) IsStopped() bool {
for _, informer := range i.namespaceToInformer {
if stopped := informer.IsStopped(); !stopped {
return false
}
}
return true
}

View File

@@ -31,11 +31,9 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
)
var (
@@ -60,25 +58,6 @@ func AddToProtobufScheme(addToScheme func(*runtime.Scheme) error) error {
return addToScheme(protobufScheme)
}
// NewDiscoveryRESTMapper constructs a new RESTMapper based on discovery
// information fetched by a new client with the given config.
func NewDiscoveryRESTMapper(c *rest.Config, httpClient *http.Client) (meta.RESTMapper, error) {
if httpClient == nil {
return nil, fmt.Errorf("httpClient must not be nil, consider using rest.HTTPClientFor(c) to create a client")
}
// Get a mapper
dc, err := discovery.NewDiscoveryClientForConfigAndClient(c, httpClient)
if err != nil {
return nil, err
}
gr, err := restmapper.GetAPIGroupResources(dc)
if err != nil {
return nil, err
}
return restmapper.NewDiscoveryRESTMapper(gr), nil
}
// IsObjectNamespaced returns true if the object is namespace scoped.
// For unstructured objects the gvk is found from the object itself.
func IsObjectNamespaced(obj runtime.Object, scheme *runtime.Scheme, restmapper meta.RESTMapper) (bool, error) {

View File

@@ -21,6 +21,7 @@ import (
"net/http"
"sync"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -52,7 +53,7 @@ func NewDynamicRESTMapper(cfg *rest.Config, httpClient *http.Client) (meta.RESTM
// client for discovery information to do REST mappings.
type mapper struct {
mapper meta.RESTMapper
client *discovery.DiscoveryClient
client discovery.DiscoveryInterface
knownGroups map[string]*restmapper.APIGroupResources
apiGroups map[string]*metav1.APIGroup
@@ -166,8 +167,10 @@ func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) er
if err != nil {
return err
}
for _, version := range apiGroup.Versions {
versions = append(versions, version.Version)
if apiGroup != nil {
for _, version := range apiGroup.Versions {
versions = append(versions, version.Version)
}
}
}
@@ -179,23 +182,28 @@ func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) er
Group: metav1.APIGroup{Name: groupName},
VersionedResources: make(map[string][]metav1.APIResource),
}
// Update information for group resources about versioned resources.
// The number of API calls is equal to the number of versions: /apis/<group>/<version>.
// If we encounter a missing API version (NotFound error), we will remove the group from
// the m.apiGroups and m.knownGroups caches.
// If this happens, in the next call the group will be added back to apiGroups
// and only the existing versions will be loaded in knownGroups.
groupVersionResources, err := m.fetchGroupVersionResourcesLocked(groupName, versions...)
if err != nil {
return fmt.Errorf("failed to get API group resources: %w", err)
}
if _, ok := m.knownGroups[groupName]; ok {
groupResources = m.knownGroups[groupName]
}
// Update information for group resources about versioned resources.
// The number of API calls is equal to the number of versions: /apis/<group>/<version>.
groupVersionResources, err := m.fetchGroupVersionResources(groupName, versions...)
if err != nil {
return fmt.Errorf("failed to get API group resources: %w", err)
}
for version, resources := range groupVersionResources {
groupResources.VersionedResources[version.Version] = resources.APIResources
}
// Update information for group resources about the API group by adding new versions.
// Ignore the versions that are already registered.
for _, version := range versions {
for groupVersion, resources := range groupVersionResources {
version := groupVersion.Version
groupResources.VersionedResources[version] = resources.APIResources
found := false
for _, v := range groupResources.Group.Versions {
if v.Version == version {
@@ -254,21 +262,17 @@ func (m *mapper) findAPIGroupByName(groupName string) (*metav1.APIGroup, error)
m.mu.Unlock()
// Looking in the cache again.
{
m.mu.RLock()
group, ok := m.apiGroups[groupName]
m.mu.RUnlock()
if ok {
return group, nil
}
}
m.mu.RLock()
defer m.mu.RUnlock()
// If there is still nothing, return an error.
return nil, fmt.Errorf("failed to find API group %q", groupName)
// Don't return an error here if the API group is not present.
// The reloaded RESTMapper will take care of returning a NoMatchError.
return m.apiGroups[groupName], nil
}
// fetchGroupVersionResources fetches the resources for the specified group and its versions.
func (m *mapper) fetchGroupVersionResources(groupName string, versions ...string) (map[schema.GroupVersion]*metav1.APIResourceList, error) {
// fetchGroupVersionResourcesLocked fetches the resources for the specified group and its versions.
// This method might modify the cache so it needs to be called under the lock.
func (m *mapper) fetchGroupVersionResourcesLocked(groupName string, versions ...string) (map[schema.GroupVersion]*metav1.APIResourceList, error) {
groupVersionResources := make(map[schema.GroupVersion]*metav1.APIResourceList)
failedGroups := make(map[schema.GroupVersion]error)
@@ -276,9 +280,20 @@ func (m *mapper) fetchGroupVersionResources(groupName string, versions ...string
groupVersion := schema.GroupVersion{Group: groupName, Version: version}
apiResourceList, err := m.client.ServerResourcesForGroupVersion(groupVersion.String())
if err != nil {
if apierrors.IsNotFound(err) {
// If the version is not found, we remove the group from the cache
// so it gets refreshed on the next call.
if m.isAPIGroupCached(groupVersion) {
delete(m.apiGroups, groupName)
}
if m.isGroupVersionCached(groupVersion) {
delete(m.knownGroups, groupName)
}
continue
} else if err != nil {
failedGroups[groupVersion] = err
}
if apiResourceList != nil {
// even in case of error, some fallback might have been returned.
groupVersionResources[groupVersion] = apiResourceList
@@ -292,3 +307,29 @@ func (m *mapper) fetchGroupVersionResources(groupName string, versions ...string
return groupVersionResources, nil
}
// isGroupVersionCached checks if a version for a group is cached in the known groups cache.
func (m *mapper) isGroupVersionCached(gv schema.GroupVersion) bool {
if cachedGroup, ok := m.knownGroups[gv.Group]; ok {
_, cached := cachedGroup.VersionedResources[gv.Version]
return cached
}
return false
}
// isAPIGroupCached checks if a version for a group is cached in the api groups cache.
func (m *mapper) isAPIGroupCached(gv schema.GroupVersion) bool {
cachedGroup, ok := m.apiGroups[gv.Group]
if !ok {
return false
}
for _, version := range cachedGroup.Versions {
if version.Version == gv.Version {
return true
}
}
return false
}

View File

@@ -90,11 +90,18 @@ type CacheOptions struct {
type NewClientFunc func(config *rest.Config, options Options) (Client, error)
// New returns a new Client using the provided config and Options.
// The returned client reads *and* writes directly from the server
// (it doesn't use object caches). It understands how to work with
// normal types (both custom resources and aggregated/built-in resources),
// as well as unstructured types.
//
// The client's read behavior is determined by Options.Cache.
// If either Options.Cache or Options.Cache.Reader is nil,
// the client reads directly from the API server.
// If both Options.Cache and Options.Cache.Reader are non-nil,
// the client reads from a local cache. However, specific
// resources can still be configured to bypass the cache based
// on Options.Cache.Unstructured and Options.Cache.DisableFor.
// Write operations are always performed directly on the API server.
//
// The client understands how to work with normal types (both custom resources
// and aggregated/built-in resources), as well as unstructured types.
// In the case of normal types, the scheme will be used to look up the
// corresponding group, version, and kind for the given type. In the
// case of unstructured types, the group, version, and kind will be extracted
@@ -210,7 +217,8 @@ func newClient(config *rest.Config, options Options) (*client, error) {
var _ Client = &client{}
// client is a client.Client that reads and writes directly from/to an API server.
// client is a client.Client configured to either read from a local cache or directly from the API server.
// Write operations are always performed directly on the API server.
// It lazily initializes new clients at the time they are used.
type client struct {
typedClient typedClient

View File

@@ -334,10 +334,12 @@ func (t versionedTracker) Create(gvr schema.GroupVersionResource, obj runtime.Ob
// tries to assign whatever it finds into a ListType it gets from schema.New() - Thus we have to ensure
// we save as the very same type, otherwise subsequent List requests will fail.
func convertFromUnstructuredIfNecessary(s *runtime.Scheme, o runtime.Object) (runtime.Object, error) {
gvk := o.GetObjectKind().GroupVersionKind()
u, isUnstructured := o.(runtime.Unstructured)
if !isUnstructured || !s.Recognizes(gvk) {
if !isUnstructured {
return o, nil
}
gvk := o.GetObjectKind().GroupVersionKind()
if !s.Recognizes(gvk) {
return o, nil
}
@@ -380,12 +382,9 @@ func (t versionedTracker) update(gvr schema.GroupVersionResource, obj runtime.Ob
field.ErrorList{field.Required(field.NewPath("metadata.name"), "name is required")})
}
gvk := obj.GetObjectKind().GroupVersionKind()
if gvk.Empty() {
gvk, err = apiutil.GVKForObject(obj, t.scheme)
if err != nil {
return err
}
gvk, err := apiutil.GVKForObject(obj, t.scheme)
if err != nil {
return err
}
oldObject, err := t.ObjectTracker.Get(gvr, ns, accessor.GetName())
@@ -464,25 +463,25 @@ func (c *fakeClient) Get(ctx context.Context, key client.ObjectKey, obj client.O
return err
}
gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
return err
if _, isUnstructured := obj.(runtime.Unstructured); isUnstructured {
gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
return err
}
ta, err := meta.TypeAccessor(o)
if err != nil {
return err
}
ta.SetKind(gvk.Kind)
ta.SetAPIVersion(gvk.GroupVersion().String())
}
ta, err := meta.TypeAccessor(o)
if err != nil {
return err
}
ta.SetKind(gvk.Kind)
ta.SetAPIVersion(gvk.GroupVersion().String())
j, err := json.Marshal(o)
if err != nil {
return err
}
decoder := scheme.Codecs.UniversalDecoder()
zero(obj)
_, _, err = decoder.Decode(j, nil, obj)
return err
return json.Unmarshal(j, obj)
}
func (c *fakeClient) Watch(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (watch.Interface, error) {
@@ -527,21 +526,21 @@ func (c *fakeClient) List(ctx context.Context, obj client.ObjectList, opts ...cl
return err
}
ta, err := meta.TypeAccessor(o)
if err != nil {
return err
if _, isUnstructured := obj.(runtime.Unstructured); isUnstructured {
ta, err := meta.TypeAccessor(o)
if err != nil {
return err
}
ta.SetKind(originalKind)
ta.SetAPIVersion(gvk.GroupVersion().String())
}
ta.SetKind(originalKind)
ta.SetAPIVersion(gvk.GroupVersion().String())
j, err := json.Marshal(o)
if err != nil {
return err
}
decoder := scheme.Codecs.UniversalDecoder()
zero(obj)
_, _, err = decoder.Decode(j, nil, obj)
if err != nil {
if err := json.Unmarshal(j, obj); err != nil {
return err
}
@@ -588,9 +587,7 @@ func (c *fakeClient) filterList(list []runtime.Object, gvk schema.GroupVersionKi
}
func (c *fakeClient) filterWithFields(list []runtime.Object, gvk schema.GroupVersionKind, fs fields.Selector) ([]runtime.Object, error) {
// We only allow filtering on the basis of a single field to ensure consistency with the
// behavior of the cache reader (which we're faking here).
fieldKey, fieldVal, requiresExact := selector.RequiresExactMatch(fs)
requiresExact := selector.RequiresExactMatch(fs)
if !requiresExact {
return nil, fmt.Errorf("field selector %s is not in one of the two supported forms \"key==val\" or \"key=val\"",
fs)
@@ -599,15 +596,24 @@ func (c *fakeClient) filterWithFields(list []runtime.Object, gvk schema.GroupVer
// Field selection is mimicked via indexes, so there's no sane answer this function can give
// if there are no indexes registered for the GroupVersionKind of the objects in the list.
indexes := c.indexes[gvk]
if len(indexes) == 0 || indexes[fieldKey] == nil {
return nil, fmt.Errorf("List on GroupVersionKind %v specifies selector on field %s, but no "+
"index with name %s has been registered for GroupVersionKind %v", gvk, fieldKey, fieldKey, gvk)
for _, req := range fs.Requirements() {
if len(indexes) == 0 || indexes[req.Field] == nil {
return nil, fmt.Errorf("List on GroupVersionKind %v specifies selector on field %s, but no "+
"index with name %s has been registered for GroupVersionKind %v", gvk, req.Field, req.Field, gvk)
}
}
indexExtractor := indexes[fieldKey]
filteredList := make([]runtime.Object, 0, len(list))
for _, obj := range list {
if c.objMatchesFieldSelector(obj, indexExtractor, fieldVal) {
matches := true
for _, req := range fs.Requirements() {
indexExtractor := indexes[req.Field]
if !c.objMatchesFieldSelector(obj, indexExtractor, req.Value) {
matches = false
break
}
}
if matches {
filteredList = append(filteredList, obj)
}
}
@@ -862,21 +868,22 @@ func (c *fakeClient) patch(obj client.Object, patch client.Patch, opts ...client
if !handled {
panic("tracker could not handle patch method")
}
ta, err := meta.TypeAccessor(o)
if err != nil {
return err
if _, isUnstructured := obj.(runtime.Unstructured); isUnstructured {
ta, err := meta.TypeAccessor(o)
if err != nil {
return err
}
ta.SetKind(gvk.Kind)
ta.SetAPIVersion(gvk.GroupVersion().String())
}
ta.SetKind(gvk.Kind)
ta.SetAPIVersion(gvk.GroupVersion().String())
j, err := json.Marshal(o)
if err != nil {
return err
}
decoder := scheme.Codecs.UniversalDecoder()
zero(obj)
_, _, err = decoder.Decode(j, nil, obj)
return err
return json.Unmarshal(j, obj)
}
// Applying a patch results in a deletionTimestamp that is truncated to the nearest second.
@@ -940,7 +947,7 @@ func dryPatch(action testing.PatchActionImpl, tracker testing.ObjectTracker) (ru
if err := json.Unmarshal(modified, obj); err != nil {
return nil, err
}
case types.StrategicMergePatchType, types.ApplyPatchType:
case types.StrategicMergePatchType:
mergedByte, err := strategicpatch.StrategicMergePatch(old, action.GetPatch(), obj)
if err != nil {
return nil, err
@@ -948,8 +955,10 @@ func dryPatch(action testing.PatchActionImpl, tracker testing.ObjectTracker) (ru
if err = json.Unmarshal(mergedByte, obj); err != nil {
return nil, err
}
case types.ApplyPatchType:
return nil, errors.New("apply patches are not supported in the fake client. Follow https://github.com/kubernetes/kubernetes/issues/115598 for the current status")
default:
return nil, fmt.Errorf("PatchType is not supported")
return nil, fmt.Errorf("%s PatchType is not supported", action.GetPatchType())
}
return obj, nil
}
@@ -1247,6 +1256,8 @@ func inTreeResourcesWithStatus() []schema.GroupVersionKind {
{Group: "flowcontrol.apiserver.k8s.io", Version: "v1beta2", Kind: "FlowSchema"},
{Group: "flowcontrol.apiserver.k8s.io", Version: "v1beta2", Kind: "PriorityLevelConfiguration"},
{Group: "flowcontrol.apiserver.k8s.io", Version: "v1", Kind: "FlowSchema"},
{Group: "flowcontrol.apiserver.k8s.io", Version: "v1", Kind: "PriorityLevelConfiguration"},
}
}

View File

@@ -419,7 +419,7 @@ type ListOptions struct {
LabelSelector labels.Selector
// FieldSelector filters results by a particular field. In order
// to use this with cache-based implementations, restrict usage to
// a single field-value pair that's been added to the indexers.
// exact match field-value pair that's been added to the indexers.
FieldSelector fields.Selector
// Namespace represents the namespace to list for, or empty for
@@ -514,7 +514,8 @@ type MatchingLabels map[string]string
func (m MatchingLabels) ApplyToList(opts *ListOptions) {
// TODO(directxman12): can we avoid reserializing this over and over?
if opts.LabelSelector == nil {
opts.LabelSelector = labels.NewSelector()
opts.LabelSelector = labels.SelectorFromValidatedSet(map[string]string(m))
return
}
// If there's already a selector, we need to AND the two together.
noValidSel := labels.SelectorFromValidatedSet(map[string]string(m))

View File

@@ -27,7 +27,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
@@ -77,8 +77,8 @@ func SetControllerReference(owner, controlled metav1.Object, scheme *runtime.Sch
Kind: gvk.Kind,
Name: owner.GetName(),
UID: owner.GetUID(),
BlockOwnerDeletion: pointer.Bool(true),
Controller: pointer.Bool(true),
BlockOwnerDeletion: ptr.To(true),
Controller: ptr.To(true),
}
// Return early with an error if the object is already controlled.
@@ -121,6 +121,84 @@ func SetOwnerReference(owner, object metav1.Object, scheme *runtime.Scheme) erro
return nil
}
// RemoveOwnerReference is a helper method to make sure the given object removes an owner reference to the object provided.
// This allows you to remove the owner to establish a new owner of the object in a subsequent call.
func RemoveOwnerReference(owner, object metav1.Object, scheme *runtime.Scheme) error {
owners := object.GetOwnerReferences()
length := len(owners)
if length < 1 {
return fmt.Errorf("%T does not have any owner references", object)
}
ro, ok := owner.(runtime.Object)
if !ok {
return fmt.Errorf("%T is not a runtime.Object, cannot call RemoveOwnerReference", owner)
}
gvk, err := apiutil.GVKForObject(ro, scheme)
if err != nil {
return err
}
index := indexOwnerRef(owners, metav1.OwnerReference{
APIVersion: gvk.GroupVersion().String(),
Name: owner.GetName(),
Kind: gvk.Kind,
})
if index == -1 {
return fmt.Errorf("%T does not have an owner reference for %T", object, owner)
}
owners = append(owners[:index], owners[index+1:]...)
object.SetOwnerReferences(owners)
return nil
}
// HasControllerReference returns true if the object
// has an owner ref with controller equal to true
func HasControllerReference(object metav1.Object) bool {
owners := object.GetOwnerReferences()
for _, owner := range owners {
isTrue := owner.Controller
if owner.Controller != nil && *isTrue {
return true
}
}
return false
}
// RemoveControllerReference removes an owner reference where the controller
// equals true
func RemoveControllerReference(owner, object metav1.Object, scheme *runtime.Scheme) error {
if ok := HasControllerReference(object); !ok {
return fmt.Errorf("%T does not have a owner reference with controller equals true", object)
}
ro, ok := owner.(runtime.Object)
if !ok {
return fmt.Errorf("%T is not a runtime.Object, cannot call RemoveControllerReference", owner)
}
gvk, err := apiutil.GVKForObject(ro, scheme)
if err != nil {
return err
}
ownerRefs := object.GetOwnerReferences()
index := indexOwnerRef(ownerRefs, metav1.OwnerReference{
APIVersion: gvk.GroupVersion().String(),
Name: owner.GetName(),
Kind: gvk.Kind,
})
if index == -1 {
return fmt.Errorf("%T does not have an controller reference for %T", object, owner)
}
if ownerRefs[index].Controller == nil || !*ownerRefs[index].Controller {
return fmt.Errorf("%T owner is not the controller reference for %T", owner, object)
}
ownerRefs = append(ownerRefs[:index], ownerRefs[index+1:]...)
object.SetOwnerReferences(ownerRefs)
return nil
}
func upsertOwnerRef(ref metav1.OwnerReference, object metav1.Object) {
owners := object.GetOwnerReferences()
if idx := indexOwnerRef(owners, ref); idx == -1 {
@@ -166,7 +244,6 @@ func referSameObject(a, b metav1.OwnerReference) bool {
if err != nil {
return false
}
return aGV.Group == bGV.Group && a.Kind == b.Kind && a.Name == b.Name
}
@@ -193,6 +270,9 @@ const ( // They should complete the sentence "Deployment default/foo has been ..
// The MutateFn is called regardless of creating or updating an object.
//
// It returns the executed operation and an error.
//
// Note: changes made by MutateFn to any sub-resource (status...), will be
// discarded.
func CreateOrUpdate(ctx context.Context, c client.Client, obj client.Object, f MutateFn) (OperationResult, error) {
key := client.ObjectKeyFromObject(obj)
if err := c.Get(ctx, key, obj); err != nil {
@@ -230,6 +310,12 @@ func CreateOrUpdate(ctx context.Context, c client.Client, obj client.Object, f M
// The MutateFn is called regardless of creating or updating an object.
//
// It returns the executed operation and an error.
//
// Note: changes to any sub-resource other than status will be ignored.
// Changes to the status sub-resource will only be applied if the object
// already exist. To change the status on object creation, the easiest
// way is to requeue the object in the controller if OperationResult is
// OperationResultCreated
func CreateOrPatch(ctx context.Context, c client.Client, obj client.Object, f MutateFn) (OperationResult, error) {
key := client.ObjectKeyFromObject(obj)
if err := c.Get(ctx, key, obj); err != nil {

View File

@@ -38,7 +38,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/retry"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
"sigs.k8s.io/yaml"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -364,19 +364,26 @@ func modifyConversionWebhooks(crds []*apiextensionsv1.CustomResourceDefinition,
if err != nil {
return err
}
url := pointer.String(fmt.Sprintf("https://%s/convert", hostPort))
url := ptr.To(fmt.Sprintf("https://%s/convert", hostPort))
for i := range crds {
// Continue if we're preserving unknown fields.
if crds[i].Spec.PreserveUnknownFields {
continue
}
// Continue if the GroupKind isn't registered as being convertible.
if _, ok := convertibles[schema.GroupKind{
Group: crds[i].Spec.Group,
Kind: crds[i].Spec.Names.Kind,
}]; !ok {
continue
if !webhookOptions.IgnoreSchemeConvertible {
// Continue if the GroupKind isn't registered as being convertible,
// and remove any existing conversion webhooks if they exist.
// This is to prevent the CRD from being rejected by the apiserver, usually
// manifests that are generated by controller-gen will have a conversion
// webhook set, but we don't want to enable it if the type isn't registered.
if _, ok := convertibles[schema.GroupKind{
Group: crds[i].Spec.Group,
Kind: crds[i].Spec.Names.Kind,
}]; !ok {
crds[i].Spec.Conversion = nil
continue
}
}
if crds[i].Spec.Conversion == nil {
crds[i].Spec.Conversion = &apiextensionsv1.CustomResourceConversion{

View File

@@ -49,6 +49,11 @@ type WebhookInstallOptions struct {
// ValidatingWebhooks is a list of ValidatingWebhookConfigurations to install
ValidatingWebhooks []*admissionv1.ValidatingWebhookConfiguration
// IgnoreSchemeConvertible, will modify any CRD conversion webhook to use the local serving host and port,
// bypassing the need to have the types registered in the Scheme. This is useful for testing CRD conversion webhooks
// with unregistered or unstructured types.
IgnoreSchemeConvertible bool
// IgnoreErrorIfPathMissing will ignore an error if a DirectoryPath does not exist when set to true
IgnoreErrorIfPathMissing bool
@@ -184,7 +189,8 @@ func defaultWebhookOptions(o *WebhookInstallOptions) {
func WaitForWebhooks(config *rest.Config,
mutatingWebhooks []*admissionv1.MutatingWebhookConfiguration,
validatingWebhooks []*admissionv1.ValidatingWebhookConfiguration,
options WebhookInstallOptions) error {
options WebhookInstallOptions,
) error {
waitingFor := map[schema.GroupVersionKind]*sets.Set[string]{}
for _, hook := range mutatingWebhooks {
@@ -242,7 +248,7 @@ func (p *webhookPoller) poll(ctx context.Context) (done bool, err error) {
continue
}
for _, name := range names.UnsortedList() {
var obj = &unstructured.Unstructured{}
obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(gvk)
err := c.Get(context.Background(), client.ObjectKey{
Namespace: "",

View File

@@ -42,7 +42,7 @@ import (
// Unless you are implementing your own EventHandler, you can ignore the functions on the EventHandler interface.
// Most users shouldn't need to implement their own EventHandler.
type EventHandler interface {
// Create is called in response to an create event - e.g. Pod Creation.
// Create is called in response to a create event - e.g. Pod Creation.
Create(context.Context, event.CreateEvent, workqueue.RateLimitingInterface)
// Update is called in response to an update event - e.g. Pod Updated.

View File

@@ -22,14 +22,16 @@ import (
)
// RequiresExactMatch checks if the given field selector is of the form `k=v` or `k==v`.
func RequiresExactMatch(sel fields.Selector) (field, val string, required bool) {
func RequiresExactMatch(sel fields.Selector) bool {
reqs := sel.Requirements()
if len(reqs) != 1 {
return "", "", false
if len(reqs) == 0 {
return false
}
req := reqs[0]
if req.Operator != selection.Equals && req.Operator != selection.DoubleEquals {
return "", "", false
for _, req := range reqs {
if req.Operator != selection.Equals && req.Operator != selection.DoubleEquals {
return false
}
}
return req.Field, req.Value, true
return true
}

View File

@@ -0,0 +1,38 @@
package syncs
import (
"context"
"reflect"
"sync"
)
// MergeChans returns a channel that is closed when any of the input channels are signaled.
// The caller must call the returned CancelFunc to ensure no resources are leaked.
func MergeChans[T any](chans ...<-chan T) (<-chan T, context.CancelFunc) {
var once sync.Once
out := make(chan T)
cancel := make(chan T)
cancelFunc := func() {
once.Do(func() {
close(cancel)
})
<-out
}
cases := make([]reflect.SelectCase, len(chans)+1)
for i := range chans {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(chans[i]),
}
}
cases[len(cases)-1] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(cancel),
}
go func() {
defer close(out)
_, _, _ = reflect.Select(cases)
}()
return out, cancelFunc
}

View File

@@ -112,6 +112,7 @@ func (k *KubeCtl) Run(args ...string) (stdout, stderr io.Reader, err error) {
cmd := exec.Command(k.Path, allArgs...)
cmd.Stdout = stdoutBuffer
cmd.Stderr = stderrBuffer
cmd.SysProcAttr = process.GetSysProcAttr()
err = cmd.Run()

View File

@@ -0,0 +1,28 @@
//go:build !aix && !darwin && !dragonfly && !freebsd && !linux && !netbsd && !openbsd && !solaris && !zos
// +build !aix,!darwin,!dragonfly,!freebsd,!linux,!netbsd,!openbsd,!solaris,!zos
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package process
import "syscall"
// GetSysProcAttr returns the SysProcAttr to use for the process,
// for non-unix systems this returns nil.
func GetSysProcAttr() *syscall.SysProcAttr {
return nil
}

View File

@@ -0,0 +1,33 @@
//go:build aix || darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris || zos
// +build aix darwin dragonfly freebsd linux netbsd openbsd solaris zos
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package process
import (
"golang.org/x/sys/unix"
)
// GetSysProcAttr returns the SysProcAttr to use for the process,
// for unix systems this returns a SysProcAttr with Setpgid set to true,
// which inherits the parent's process group id.
func GetSysProcAttr() *unix.SysProcAttr {
return &unix.SysProcAttr{
Setpgid: true,
}
}

View File

@@ -155,6 +155,7 @@ func (ps *State) Start(stdout, stderr io.Writer) (err error) {
ps.Cmd = exec.Command(ps.Path, ps.Args...)
ps.Cmd.Stdout = stdout
ps.Cmd.Stderr = stderr
ps.Cmd.SysProcAttr = GetSysProcAttr()
ready := make(chan bool)
timedOut := time.After(ps.StartTimeout)
@@ -265,6 +266,9 @@ func (ps *State) Stop() error {
case <-ps.waitDone:
break
case <-timedOut:
if err := ps.Cmd.Process.Signal(syscall.SIGKILL); err != nil {
return fmt.Errorf("unable to kill process %s: %w", ps.Path, err)
}
return fmt.Errorf("timeout waiting for process %s to stop", path.Base(ps.Path))
}
ps.ready = false

View File

@@ -34,7 +34,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/cache"
@@ -409,10 +409,10 @@ func New(config *rest.Config, options Options) (Manager, error) {
return nil, fmt.Errorf("failed to new pprof listener: %w", err)
}
errChan := make(chan error)
errChan := make(chan error, 1)
runnables := newRunnables(options.BaseContext, errChan)
return &controllerManager{
stopProcedureEngaged: pointer.Int64(0),
stopProcedureEngaged: ptr.To(int64(0)),
cluster: cluster,
runnables: runnables,
errChan: errChan,

View File

@@ -54,14 +54,14 @@ var (
Subsystem: WorkQueueSubsystem,
Name: QueueLatencyKey,
Help: "How long in seconds an item stays in workqueue before being requested",
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 10),
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12),
}, []string{"name"})
workDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: WorkQueueSubsystem,
Name: WorkDurationKey,
Help: "How long in seconds processing an item from workqueue takes.",
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 10),
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12),
}, []string{"name"})
unfinished = prometheus.NewGaugeVec(prometheus.GaugeOpts{

View File

@@ -19,9 +19,11 @@ package reconcile
import (
"context"
"errors"
"reflect"
"time"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// Result contains the result of a Reconciler invocation.
@@ -97,7 +99,7 @@ type Reconciler interface {
// If the error is nil and the returned Result has a non-zero result.RequeueAfter, the request
// will be requeued after the specified duration.
//
// If the error is nil and result.RequeueAfter is zero and result.Reque is true, the request
// If the error is nil and result.RequeueAfter is zero and result.Requeue is true, the request
// will be requeued using exponential backoff.
Reconcile(context.Context, Request) (Result, error)
}
@@ -110,6 +112,36 @@ var _ Reconciler = Func(nil)
// Reconcile implements Reconciler.
func (r Func) Reconcile(ctx context.Context, o Request) (Result, error) { return r(ctx, o) }
// ObjectReconciler is a specialized version of Reconciler that acts on instances of client.Object. Each reconciliation
// event gets the associated object from Kubernetes before passing it to Reconcile. An ObjectReconciler can be used in
// Builder.Complete by calling AsReconciler. See Reconciler for more details.
type ObjectReconciler[T client.Object] interface {
Reconcile(context.Context, T) (Result, error)
}
// AsReconciler creates a Reconciler based on the given ObjectReconciler.
func AsReconciler[T client.Object](client client.Client, rec ObjectReconciler[T]) Reconciler {
return &objectReconcilerAdapter[T]{
objReconciler: rec,
client: client,
}
}
type objectReconcilerAdapter[T client.Object] struct {
objReconciler ObjectReconciler[T]
client client.Client
}
// Reconcile implements Reconciler.
func (a *objectReconcilerAdapter[T]) Reconcile(ctx context.Context, req Request) (Result, error) {
o := reflect.New(reflect.TypeOf(*new(T)).Elem()).Interface().(T)
if err := a.client.Get(ctx, req.NamespacedName, o); err != nil {
return Result{}, client.IgnoreNotFound(err)
}
return a.objReconciler.Reconcile(ctx, o)
}
// TerminalError is an error that will not be retried but still be logged
// and recorded in metrics.
func TerminalError(wrapped error) error {

View File

@@ -27,12 +27,14 @@ import (
)
// Defaulter defines functions for setting defaults on resources.
// Deprecated: Ue CustomDefaulter instead.
type Defaulter interface {
runtime.Object
Default()
}
// DefaultingWebhookFor creates a new Webhook for Defaulting the provided type.
// Deprecated: Use WithCustomDefaulter instead.
func DefaultingWebhookFor(scheme *runtime.Scheme, defaulter Defaulter) *Webhook {
return &Webhook{
Handler: &mutatingHandler{defaulter: defaulter, decoder: NewDecoder(scheme)},

View File

@@ -34,6 +34,26 @@ import (
var admissionScheme = runtime.NewScheme()
var admissionCodecs = serializer.NewCodecFactory(admissionScheme)
// adapted from https://github.com/kubernetes/kubernetes/blob/c28c2009181fcc44c5f6b47e10e62dacf53e4da0/staging/src/k8s.io/pod-security-admission/cmd/webhook/server/server.go
//
// From https://github.com/kubernetes/apiserver/blob/d6876a0600de06fef75968c4641c64d7da499f25/pkg/server/config.go#L433-L442C5:
//
// 1.5MB is the recommended client request size in byte
// the etcd server should accept. See
// https://github.com/etcd-io/etcd/blob/release-3.4/embed/config.go#L56.
// A request body might be encoded in json, and is converted to
// proto when persisted in etcd, so we allow 2x as the largest request
// body size to be accepted and decoded in a write request.
//
// For the admission request, we can infer that it contains at most two objects
// (the old and new versions of the object being admitted), each of which can
// be at most 3MB in size. For the rest of the request, we can assume that
// it will be less than 1MB in size. Therefore, we can set the max request
// size to 7MB.
// If your use case requires larger max request sizes, please
// open an issue (https://github.com/kubernetes-sigs/controller-runtime/issues/new).
const maxRequestSize = int64(7 * 1024 * 1024)
func init() {
utilruntime.Must(v1.AddToScheme(admissionScheme))
utilruntime.Must(v1beta1.AddToScheme(admissionScheme))
@@ -42,27 +62,30 @@ func init() {
var _ http.Handler = &Webhook{}
func (wh *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var body []byte
var err error
ctx := r.Context()
if wh.WithContextFunc != nil {
ctx = wh.WithContextFunc(ctx, r)
}
var reviewResponse Response
if r.Body == nil {
err = errors.New("request body is empty")
if r.Body == nil || r.Body == http.NoBody {
err := errors.New("request body is empty")
wh.getLogger(nil).Error(err, "bad request")
reviewResponse = Errored(http.StatusBadRequest, err)
wh.writeResponse(w, reviewResponse)
wh.writeResponse(w, Errored(http.StatusBadRequest, err))
return
}
defer r.Body.Close()
if body, err = io.ReadAll(r.Body); err != nil {
limitedReader := &io.LimitedReader{R: r.Body, N: maxRequestSize}
body, err := io.ReadAll(limitedReader)
if err != nil {
wh.getLogger(nil).Error(err, "unable to read the body from the incoming request")
reviewResponse = Errored(http.StatusBadRequest, err)
wh.writeResponse(w, reviewResponse)
wh.writeResponse(w, Errored(http.StatusBadRequest, err))
return
}
if limitedReader.N <= 0 {
err := fmt.Errorf("request entity is too large; limit is %d bytes", maxRequestSize)
wh.getLogger(nil).Error(err, "unable to read the body from the incoming request; limit reached")
wh.writeResponse(w, Errored(http.StatusRequestEntityTooLarge, err))
return
}
@@ -70,8 +93,7 @@ func (wh *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if contentType := r.Header.Get("Content-Type"); contentType != "application/json" {
err = fmt.Errorf("contentType=%s, expected application/json", contentType)
wh.getLogger(nil).Error(err, "unable to process a request with unknown content type")
reviewResponse = Errored(http.StatusBadRequest, err)
wh.writeResponse(w, reviewResponse)
wh.writeResponse(w, Errored(http.StatusBadRequest, err))
return
}
@@ -89,14 +111,12 @@ func (wh *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) {
_, actualAdmRevGVK, err := admissionCodecs.UniversalDeserializer().Decode(body, nil, &ar)
if err != nil {
wh.getLogger(nil).Error(err, "unable to decode the request")
reviewResponse = Errored(http.StatusBadRequest, err)
wh.writeResponse(w, reviewResponse)
wh.writeResponse(w, Errored(http.StatusBadRequest, err))
return
}
wh.getLogger(&req).V(5).Info("received request")
reviewResponse = wh.Handle(ctx, req)
wh.writeResponseTyped(w, reviewResponse, actualAdmRevGVK)
wh.writeResponseTyped(w, wh.Handle(ctx, req), actualAdmRevGVK)
}
// writeResponse writes response to w generically, i.e. without encoding GVK information.

View File

@@ -33,6 +33,7 @@ type Warnings []string
// Validator defines functions for validating an operation.
// The custom resource kind which implements this interface can validate itself.
// To validate the custom resource with another specific struct, use CustomValidator instead.
// Deprecated: Use CustomValidator instead.
type Validator interface {
runtime.Object
@@ -53,6 +54,7 @@ type Validator interface {
}
// ValidatingWebhookFor creates a new Webhook for validating the provided type.
// Deprecated: Use WithCustomValidator instead.
func ValidatingWebhookFor(scheme *runtime.Scheme, validator Validator) *Webhook {
return &Webhook{
Handler: &validatingHandler{validator: validator, decoder: NewDecoder(scheme)},

View File

@@ -30,7 +30,6 @@ import (
// CustomValidator defines functions for validating an operation.
// The object to be validated is passed into methods as a parameter.
type CustomValidator interface {
// ValidateCreate validates the object on creation.
// The optional warnings will be added to the response as warning messages.
// Return an error if the object is invalid.

View File

@@ -24,9 +24,11 @@ import (
// define some aliases for common bits of the webhook functionality
// Defaulter defines functions for setting defaults on resources.
// Deprecated: Use CustomDefaulter instead.
type Defaulter = admission.Defaulter
// Validator defines functions for validating an operation.
// Deprecated: Use CustomValidator instead.
type Validator = admission.Validator
// CustomDefaulter defines functions for setting defaults on resources.