Update packages and add vendor directory

This commit is contained in:
Eddy Filip
2022-09-13 16:24:52 +03:00
parent 23b66f73af
commit 1d75f78891
3906 changed files with 1365387 additions and 465 deletions

View File

@@ -0,0 +1,360 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/go-logr/logr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/source"
)
var _ inject.Injector = &Controller{}
// Controller implements controller.Controller.
type Controller struct {
// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
Name string
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
MaxConcurrentReconciles int
// Reconciler is a function that can be called at any time with the Name / Namespace of an object and
// ensures that the state of the system matches the state specified in the object.
// Defaults to the DefaultReconcileFunc.
Do reconcile.Reconciler
// MakeQueue constructs the queue for this controller once the controller is ready to start.
// This exists because the standard Kubernetes workqueues start themselves immediately, which
// leads to goroutine leaks if something calls controller.New repeatedly.
MakeQueue func() workqueue.RateLimitingInterface
// Queue is an listeningQueue that listens for events from Informers and adds object keys to
// the Queue for processing
Queue workqueue.RateLimitingInterface
// SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates
// Deprecated: the caller should handle injected fields itself.
SetFields func(i interface{}) error
// mu is used to synchronize Controller setup
mu sync.Mutex
// Started is true if the Controller has been Started
Started bool
// ctx is the context that was passed to Start() and used when starting watches.
//
// According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context,
// while we usually always strive to follow best practices, we consider this a legacy case and it should
// undergo a major refactoring and redesign to allow for context to not be stored in a struct.
ctx context.Context
// CacheSyncTimeout refers to the time limit set on waiting for cache to sync
// Defaults to 2 minutes if not set.
CacheSyncTimeout time.Duration
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
startWatches []watchDescription
// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
// or for example when a watch is started.
// Note: LogConstructor has to be able to handle nil requests as we are also using it
// outside the context of a reconciliation.
LogConstructor func(request *reconcile.Request) logr.Logger
// RecoverPanic indicates whether the panic caused by reconcile should be recovered.
RecoverPanic bool
}
// watchDescription contains all the information necessary to start a watch.
type watchDescription struct {
src source.Source
handler handler.EventHandler
predicates []predicate.Predicate
}
// Reconcile implements reconcile.Reconciler.
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
defer func() {
if r := recover(); r != nil {
if c.RecoverPanic {
for _, fn := range utilruntime.PanicHandlers {
fn(r)
}
err = fmt.Errorf("panic: %v [recovered]", r)
return
}
log := logf.FromContext(ctx)
log.Info(fmt.Sprintf("Observed a panic in reconciler: %v", r))
panic(r)
}
}()
return c.Do.Reconcile(ctx, req)
}
// Watch implements controller.Controller.
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
c.mu.Lock()
defer c.mu.Unlock()
// Inject Cache into arguments
if err := c.SetFields(src); err != nil {
return err
}
if err := c.SetFields(evthdler); err != nil {
return err
}
for _, pr := range prct {
if err := c.SetFields(pr); err != nil {
return err
}
}
// Controller hasn't started yet, store the watches locally and return.
//
// These watches are going to be held on the controller struct until the manager or user calls Start(...).
if !c.Started {
c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
return nil
}
c.LogConstructor(nil).Info("Starting EventSource", "source", src)
return src.Start(c.ctx, evthdler, c.Queue, prct...)
}
// Start implements controller.Controller.
func (c *Controller) Start(ctx context.Context) error {
// use an IIFE to get proper lock handling
// but lock outside to get proper handling of the queue shutdown
c.mu.Lock()
if c.Started {
return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
}
c.initMetrics()
// Set the internal context.
c.ctx = ctx
c.Queue = c.MakeQueue()
go func() {
<-ctx.Done()
c.Queue.ShutDown()
}()
wg := &sync.WaitGroup{}
err := func() error {
defer c.mu.Unlock()
// TODO(pwittrock): Reconsider HandleCrash
defer utilruntime.HandleCrash()
// NB(directxman12): launch the sources *before* trying to wait for the
// caches to sync so that they have a chance to register their intendeded
// caches.
for _, watch := range c.startWatches {
c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
}
}
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
c.LogConstructor(nil).Info("Starting Controller")
for _, watch := range c.startWatches {
syncingSource, ok := watch.src.(source.SyncingSource)
if !ok {
continue
}
if err := func() error {
// use a context with timeout for launching sources and syncing caches.
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
defer cancel()
// WaitForSync waits for a definitive timeout, and returns if there
// is an error or a timeout
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync")
return err
}
return nil
}(); err != nil {
return err
}
}
// All the watches have been started, we can reset the local slice.
//
// We should never hold watches more than necessary, each watch source can hold a backing cache,
// which won't be garbage collected if we hold a reference to it.
c.startWatches = nil
// Launch workers to process resources
c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
wg.Add(c.MaxConcurrentReconciles)
for i := 0; i < c.MaxConcurrentReconciles; i++ {
go func() {
defer wg.Done()
// Run a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
for c.processNextWorkItem(ctx) {
}
}()
}
c.Started = true
return nil
}()
if err != nil {
return err
}
<-ctx.Done()
c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish")
wg.Wait()
c.LogConstructor(nil).Info("All workers finished")
return nil
}
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the reconcileHandler.
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.Queue.Get()
if shutdown {
// Stop working
return false
}
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.Queue.Done(obj)
ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
c.reconcileHandler(ctx, obj)
return true
}
const (
labelError = "error"
labelRequeueAfter = "requeue_after"
labelRequeue = "requeue"
labelSuccess = "success"
)
func (c *Controller) initMetrics() {
ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Set(0)
ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Add(0)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Add(0)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Add(0)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Add(0)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Add(0)
ctrlmetrics.WorkerCount.WithLabelValues(c.Name).Set(float64(c.MaxConcurrentReconciles))
}
func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
// Update metrics after processing each item
reconcileStartTS := time.Now()
defer func() {
c.updateMetrics(time.Since(reconcileStartTS))
}()
// Make sure that the object is a valid request.
req, ok := obj.(reconcile.Request)
if !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.Queue.Forget(obj)
c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
// Return true, don't take a break
return
}
log := c.LogConstructor(&req)
log = log.WithValues("reconcileID", uuid.NewUUID())
ctx = logf.IntoContext(ctx, log)
// RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the
// resource to be synced.
result, err := c.Reconcile(ctx, req)
switch {
case err != nil:
c.Queue.AddRateLimited(req)
ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
log.Error(err, "Reconciler error")
case result.RequeueAfter > 0:
// The result.RequeueAfter request will be lost, if it is returned
// along with a non-nil error. But this is intended as
// We need to drive to stable reconcile loops before queuing due
// to result.RequestAfter
c.Queue.Forget(obj)
c.Queue.AddAfter(req, result.RequeueAfter)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
case result.Requeue:
c.Queue.AddRateLimited(req)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
default:
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.Queue.Forget(obj)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()
}
}
// GetLogger returns this controller's logger.
func (c *Controller) GetLogger() logr.Logger {
return c.LogConstructor(nil)
}
// InjectFunc implement SetFields.Injector.
func (c *Controller) InjectFunc(f inject.Func) error {
c.SetFields = f
return nil
}
// updateMetrics updates prometheus metrics within the controller.
func (c *Controller) updateMetrics(reconcileTime time.Duration) {
ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds())
}

View File

@@ -0,0 +1,78 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)
var (
// ReconcileTotal is a prometheus counter metrics which holds the total
// number of reconciliations per controller. It has two labels. controller label refers
// to the controller name and result label refers to the reconcile result i.e
// success, error, requeue, requeue_after.
ReconcileTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "controller_runtime_reconcile_total",
Help: "Total number of reconciliations per controller",
}, []string{"controller", "result"})
// ReconcileErrors is a prometheus counter metrics which holds the total
// number of errors from the Reconciler.
ReconcileErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "controller_runtime_reconcile_errors_total",
Help: "Total number of reconciliation errors per controller",
}, []string{"controller"})
// ReconcileTime is a prometheus metric which keeps track of the duration
// of reconciliations.
ReconcileTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "controller_runtime_reconcile_time_seconds",
Help: "Length of time per reconciliation per controller",
Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0,
1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 40, 50, 60},
}, []string{"controller"})
// WorkerCount is a prometheus metric which holds the number of
// concurrent reconciles per controller.
WorkerCount = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "controller_runtime_max_concurrent_reconciles",
Help: "Maximum number of concurrent reconciles per controller",
}, []string{"controller"})
// ActiveWorkers is a prometheus metric which holds the number
// of active workers per controller.
ActiveWorkers = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "controller_runtime_active_workers",
Help: "Number of currently used workers per controller",
}, []string{"controller"})
)
func init() {
metrics.Registry.MustRegister(
ReconcileTotal,
ReconcileErrors,
ReconcileTime,
WorkerCount,
ActiveWorkers,
// expose process metrics like CPU, Memory, file descriptor usage etc.
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
// expose Go runtime metrics like GC stats, memory stats etc.
collectors.NewGoCollector(),
)
}

View File

@@ -0,0 +1,21 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package flock is copied from k8s.io/kubernetes/pkg/util/flock to avoid
// importing k8s.io/kubernetes as a dependency.
//
// Provides file locking functionalities on unix systems.
package flock

View File

@@ -0,0 +1,24 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flock
import "errors"
var (
// ErrAlreadyLocked is returned when the file is already locked.
ErrAlreadyLocked = errors.New("the file is already locked")
)

View File

@@ -0,0 +1,24 @@
// +build !linux,!darwin,!freebsd,!openbsd,!netbsd,!dragonfly
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flock
// Acquire is not implemented on non-unix systems.
func Acquire(path string) error {
return nil
}

View File

@@ -0,0 +1,48 @@
//go:build linux || darwin || freebsd || openbsd || netbsd || dragonfly
// +build linux darwin freebsd openbsd netbsd dragonfly
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flock
import (
"errors"
"fmt"
"os"
"golang.org/x/sys/unix"
)
// Acquire acquires a lock on a file for the duration of the process. This method
// is reentrant.
func Acquire(path string) error {
fd, err := unix.Open(path, unix.O_CREAT|unix.O_RDWR|unix.O_CLOEXEC, 0600)
if err != nil {
if errors.Is(err, os.ErrExist) {
return fmt.Errorf("cannot lock file %q: %w", path, ErrAlreadyLocked)
}
return err
}
// We don't need to close the fd since we should hold
// it until the process exits.
err = unix.Flock(fd, unix.LOCK_NB|unix.LOCK_EX)
if errors.Is(err, unix.EWOULDBLOCK) { // This condition requires LOCK_NB.
return fmt.Errorf("cannot lock file %q: %w", path, ErrAlreadyLocked)
}
return err
}

View File

@@ -0,0 +1,16 @@
package httpserver
import (
"net/http"
"time"
)
// New returns a new server with sane defaults.
func New(handler http.Handler) *http.Server {
return &http.Server{
Handler: handler,
MaxHeaderBytes: 1 << 20,
IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout
ReadHeaderTimeout: 32 * time.Second,
}
}

View File

@@ -0,0 +1,32 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package log
import (
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/log"
)
var (
// RuntimeLog is a base parent logger for use inside controller-runtime.
RuntimeLog logr.Logger
)
func init() {
RuntimeLog = log.Log.WithName("controller-runtime")
}

View File

@@ -0,0 +1,78 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package objectutil
import (
"errors"
"fmt"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)
// FilterWithLabels returns a copy of the items in objs matching labelSel.
func FilterWithLabels(objs []runtime.Object, labelSel labels.Selector) ([]runtime.Object, error) {
outItems := make([]runtime.Object, 0, len(objs))
for _, obj := range objs {
meta, err := apimeta.Accessor(obj)
if err != nil {
return nil, err
}
if labelSel != nil {
lbls := labels.Set(meta.GetLabels())
if !labelSel.Matches(lbls) {
continue
}
}
outItems = append(outItems, obj.DeepCopyObject())
}
return outItems, nil
}
// IsAPINamespaced returns true if the object is namespace scoped.
// For unstructured objects the gvk is found from the object itself.
func IsAPINamespaced(obj runtime.Object, scheme *runtime.Scheme, restmapper apimeta.RESTMapper) (bool, error) {
gvk, err := apiutil.GVKForObject(obj, scheme)
if err != nil {
return false, err
}
return IsAPINamespacedWithGVK(gvk, scheme, restmapper)
}
// IsAPINamespacedWithGVK returns true if the object having the provided
// GVK is namespace scoped.
func IsAPINamespacedWithGVK(gk schema.GroupVersionKind, scheme *runtime.Scheme, restmapper apimeta.RESTMapper) (bool, error) {
restmapping, err := restmapper.RESTMapping(schema.GroupKind{Group: gk.Group, Kind: gk.Kind})
if err != nil {
return false, fmt.Errorf("failed to get restmapping: %w", err)
}
scope := restmapping.Scope.Name()
if scope == "" {
return false, errors.New("scope cannot be identified, empty scope returned")
}
if scope != apimeta.RESTScopeNameRoot {
return true, nil
}
return false, nil
}

View File

@@ -0,0 +1,176 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package recorder
import (
"context"
"fmt"
"sync"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
)
// EventBroadcasterProducer makes an event broadcaster, returning
// whether or not the broadcaster should be stopped with the Provider,
// or not (e.g. if it's shared, it shouldn't be stopped with the Provider).
type EventBroadcasterProducer func() (caster record.EventBroadcaster, stopWithProvider bool)
// Provider is a recorder.Provider that records events to the k8s API server
// and to a logr Logger.
type Provider struct {
lock sync.RWMutex
stopped bool
// scheme to specify when creating a recorder
scheme *runtime.Scheme
// logger is the logger to use when logging diagnostic event info
logger logr.Logger
evtClient corev1client.EventInterface
makeBroadcaster EventBroadcasterProducer
broadcasterOnce sync.Once
broadcaster record.EventBroadcaster
stopBroadcaster bool
}
// NB(directxman12): this manually implements Stop instead of Being a runnable because we need to
// stop it *after* everything else shuts down, otherwise we'll cause panics as the leader election
// code finishes up and tries to continue emitting events.
// Stop attempts to stop this provider, stopping the underlying broadcaster
// if the broadcaster asked to be stopped. It kinda tries to honor the given
// context, but the underlying broadcaster has an indefinite wait that doesn't
// return until all queued events are flushed, so this may end up just returning
// before the underlying wait has finished instead of cancelling the wait.
// This is Very Frustrating™.
func (p *Provider) Stop(shutdownCtx context.Context) {
doneCh := make(chan struct{})
go func() {
// technically, this could start the broadcaster, but practically, it's
// almost certainly already been started (e.g. by leader election). We
// need to invoke this to ensure that we don't inadvertently race with
// an invocation of getBroadcaster.
broadcaster := p.getBroadcaster()
if p.stopBroadcaster {
p.lock.Lock()
broadcaster.Shutdown()
p.stopped = true
p.lock.Unlock()
}
close(doneCh)
}()
select {
case <-shutdownCtx.Done():
case <-doneCh:
}
}
// getBroadcaster ensures that a broadcaster is started for this
// provider, and returns it. It's threadsafe.
func (p *Provider) getBroadcaster() record.EventBroadcaster {
// NB(directxman12): this can technically still leak if something calls
// "getBroadcaster" (i.e. Emits an Event) but never calls Start, but if we
// create the broadcaster in start, we could race with other things that
// are started at the same time & want to emit events. The alternative is
// silently swallowing events and more locking, but that seems suboptimal.
p.broadcasterOnce.Do(func() {
broadcaster, stop := p.makeBroadcaster()
broadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient})
broadcaster.StartEventWatcher(
func(e *corev1.Event) {
p.logger.V(1).Info(e.Message, "type", e.Type, "object", e.InvolvedObject, "reason", e.Reason)
})
p.broadcaster = broadcaster
p.stopBroadcaster = stop
})
return p.broadcaster
}
// NewProvider create a new Provider instance.
func NewProvider(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster EventBroadcasterProducer) (*Provider, error) {
corev1Client, err := corev1client.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to init client: %w", err)
}
p := &Provider{scheme: scheme, logger: logger, makeBroadcaster: makeBroadcaster, evtClient: corev1Client.Events("")}
return p, nil
}
// GetEventRecorderFor returns an event recorder that broadcasts to this provider's
// broadcaster. All events will be associated with a component of the given name.
func (p *Provider) GetEventRecorderFor(name string) record.EventRecorder {
return &lazyRecorder{
prov: p,
name: name,
}
}
// lazyRecorder is a recorder that doesn't actually instantiate any underlying
// recorder until the first event is emitted.
type lazyRecorder struct {
prov *Provider
name string
recOnce sync.Once
rec record.EventRecorder
}
// ensureRecording ensures that a concrete recorder is populated for this recorder.
func (l *lazyRecorder) ensureRecording() {
l.recOnce.Do(func() {
broadcaster := l.prov.getBroadcaster()
l.rec = broadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name})
})
}
func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message string) {
l.ensureRecording()
l.prov.lock.RLock()
if !l.prov.stopped {
l.rec.Event(object, eventtype, reason, message)
}
l.prov.lock.RUnlock()
}
func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
l.ensureRecording()
l.prov.lock.RLock()
if !l.prov.stopped {
l.rec.Eventf(object, eventtype, reason, messageFmt, args...)
}
l.prov.lock.RUnlock()
}
func (l *lazyRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
l.ensureRecording()
l.prov.lock.RLock()
if !l.prov.stopped {
l.rec.AnnotatedEventf(object, annotations, eventtype, reason, messageFmt, args...)
}
l.prov.lock.RUnlock()
}

View File

@@ -0,0 +1,142 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package addr
import (
"errors"
"fmt"
"io/fs"
"net"
"os"
"path/filepath"
"strings"
"time"
"sigs.k8s.io/controller-runtime/pkg/internal/flock"
)
// TODO(directxman12): interface / release functionality for external port managers
const (
portReserveTime = 2 * time.Minute
portConflictRetry = 100
portFilePrefix = "port-"
)
var (
cacheDir string
)
func init() {
baseDir, err := os.UserCacheDir()
if err == nil {
cacheDir = filepath.Join(baseDir, "kubebuilder-envtest")
err = os.MkdirAll(cacheDir, 0o750)
}
if err != nil {
// Either we didn't get a cache directory, or we can't use it
baseDir = os.TempDir()
cacheDir = filepath.Join(baseDir, "kubebuilder-envtest")
err = os.MkdirAll(cacheDir, 0o750)
}
if err != nil {
panic(err)
}
}
type portCache struct{}
func (c *portCache) add(port int) (bool, error) {
// Remove outdated ports.
if err := fs.WalkDir(os.DirFS(cacheDir), ".", func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if d.IsDir() || !d.Type().IsRegular() || !strings.HasPrefix(path, portFilePrefix) {
return nil
}
info, err := d.Info()
if err != nil {
// No-op if file no longer exists; may have been deleted by another
// process/thread trying to allocate ports.
if errors.Is(err, fs.ErrNotExist) {
return nil
}
return err
}
if time.Since(info.ModTime()) > portReserveTime {
if err := os.Remove(filepath.Join(cacheDir, path)); err != nil {
// No-op if file no longer exists; may have been deleted by another
// process/thread trying to allocate ports.
if os.IsNotExist(err) {
return nil
}
return err
}
}
return nil
}); err != nil {
return false, err
}
// Try allocating new port, by acquiring a file.
path := fmt.Sprintf("%s/%s%d", cacheDir, portFilePrefix, port)
if err := flock.Acquire(path); errors.Is(err, flock.ErrAlreadyLocked) {
return false, nil
} else if err != nil {
return false, err
}
return true, nil
}
var cache = &portCache{}
func suggest(listenHost string) (*net.TCPListener, int, string, error) {
if listenHost == "" {
listenHost = "localhost"
}
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(listenHost, "0"))
if err != nil {
return nil, -1, "", err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, -1, "", err
}
return l, l.Addr().(*net.TCPAddr).Port,
addr.IP.String(),
nil
}
// Suggest suggests an address a process can listen on. It returns
// a tuple consisting of a free port and the hostname resolved to its IP.
// It makes sure that new port allocated does not conflict with old ports
// allocated within 1 minute.
func Suggest(listenHost string) (int, string, error) {
for i := 0; i < portConflictRetry; i++ {
listener, port, resolvedHost, err := suggest(listenHost)
if err != nil {
return -1, "", err
}
defer listener.Close()
if ok, err := cache.add(port); ok {
return port, resolvedHost, nil
} else if err != nil {
return -1, "", err
}
}
return -1, "", fmt.Errorf("no free ports found after %d retries", portConflictRetry)
}

View File

@@ -0,0 +1,224 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package certs
// NB(directxman12): nothing has verified that this has good settings. In fact,
// the setting generated here are probably terrible, but they're fine for integration
// tests. These ABSOLUTELY SHOULD NOT ever be exposed in the public API. They're
// ONLY for use with envtest's ability to configure webhook testing.
// If I didn't otherwise not want to add a dependency on cfssl, I'd just use that.
import (
"crypto"
"crypto/ecdsa"
"crypto/elliptic"
crand "crypto/rand"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"math/big"
"net"
"time"
certutil "k8s.io/client-go/util/cert"
)
var (
ellipticCurve = elliptic.P256()
bigOne = big.NewInt(1)
)
// CertPair is a private key and certificate for use for client auth, as a CA, or serving.
type CertPair struct {
Key crypto.Signer
Cert *x509.Certificate
}
// CertBytes returns the PEM-encoded version of the certificate for this pair.
func (k CertPair) CertBytes() []byte {
return pem.EncodeToMemory(&pem.Block{
Type: "CERTIFICATE",
Bytes: k.Cert.Raw,
})
}
// AsBytes encodes keypair in the appropriate formats for on-disk storage (PEM and
// PKCS8, respectively).
func (k CertPair) AsBytes() (cert []byte, key []byte, err error) {
cert = k.CertBytes()
rawKeyData, err := x509.MarshalPKCS8PrivateKey(k.Key)
if err != nil {
return nil, nil, fmt.Errorf("unable to encode private key: %w", err)
}
key = pem.EncodeToMemory(&pem.Block{
Type: "PRIVATE KEY",
Bytes: rawKeyData,
})
return cert, key, nil
}
// TinyCA supports signing serving certs and client-certs,
// and can be used as an auth mechanism with envtest.
type TinyCA struct {
CA CertPair
orgName string
nextSerial *big.Int
}
// newPrivateKey generates a new private key of a relatively sane size (see
// rsaKeySize).
func newPrivateKey() (crypto.Signer, error) {
return ecdsa.GenerateKey(ellipticCurve, crand.Reader)
}
// NewTinyCA creates a new a tiny CA utility for provisioning serving certs and client certs FOR TESTING ONLY.
// Don't use this for anything else!
func NewTinyCA() (*TinyCA, error) {
caPrivateKey, err := newPrivateKey()
if err != nil {
return nil, fmt.Errorf("unable to generate private key for CA: %w", err)
}
caCfg := certutil.Config{CommonName: "envtest-environment", Organization: []string{"envtest"}}
caCert, err := certutil.NewSelfSignedCACert(caCfg, caPrivateKey)
if err != nil {
return nil, fmt.Errorf("unable to generate certificate for CA: %w", err)
}
return &TinyCA{
CA: CertPair{Key: caPrivateKey, Cert: caCert},
orgName: "envtest",
nextSerial: big.NewInt(1),
}, nil
}
func (c *TinyCA) makeCert(cfg certutil.Config) (CertPair, error) {
now := time.Now()
key, err := newPrivateKey()
if err != nil {
return CertPair{}, fmt.Errorf("unable to create private key: %w", err)
}
serial := new(big.Int).Set(c.nextSerial)
c.nextSerial.Add(c.nextSerial, bigOne)
template := x509.Certificate{
Subject: pkix.Name{CommonName: cfg.CommonName, Organization: cfg.Organization},
DNSNames: cfg.AltNames.DNSNames,
IPAddresses: cfg.AltNames.IPs,
SerialNumber: serial,
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
ExtKeyUsage: cfg.Usages,
// technically not necessary for testing, but let's set anyway just in case.
NotBefore: now.UTC(),
// 1 week -- the default for cfssl, and just long enough for a
// long-term test, but not too long that anyone would try to use this
// seriously.
NotAfter: now.Add(168 * time.Hour).UTC(),
}
certRaw, err := x509.CreateCertificate(crand.Reader, &template, c.CA.Cert, key.Public(), c.CA.Key)
if err != nil {
return CertPair{}, fmt.Errorf("unable to create certificate: %w", err)
}
cert, err := x509.ParseCertificate(certRaw)
if err != nil {
return CertPair{}, fmt.Errorf("generated invalid certificate, could not parse: %w", err)
}
return CertPair{
Key: key,
Cert: cert,
}, nil
}
// NewServingCert returns a new CertPair for a serving HTTPS on localhost (or other specified names).
func (c *TinyCA) NewServingCert(names ...string) (CertPair, error) {
if len(names) == 0 {
names = []string{"localhost"}
}
dnsNames, ips, err := resolveNames(names)
if err != nil {
return CertPair{}, err
}
return c.makeCert(certutil.Config{
CommonName: "localhost",
Organization: []string{c.orgName},
AltNames: certutil.AltNames{
DNSNames: dnsNames,
IPs: ips,
},
Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
})
}
// ClientInfo describes some Kubernetes user for the purposes of creating
// client certificates.
type ClientInfo struct {
// Name is the user name (embedded as the cert's CommonName)
Name string
// Groups are the groups to which this user belongs (embedded as the cert's
// Organization)
Groups []string
}
// NewClientCert produces a new CertPair suitable for use with Kubernetes
// client cert auth with an API server validating based on this CA.
func (c *TinyCA) NewClientCert(user ClientInfo) (CertPair, error) {
return c.makeCert(certutil.Config{
CommonName: user.Name,
Organization: user.Groups,
Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
})
}
func resolveNames(names []string) ([]string, []net.IP, error) {
dnsNames := []string{}
ips := []net.IP{}
for _, name := range names {
if name == "" {
continue
}
ip := net.ParseIP(name)
if ip == nil {
dnsNames = append(dnsNames, name)
// Also resolve to IPs.
nameIPs, err := net.LookupHost(name)
if err != nil {
return nil, nil, err
}
for _, nameIP := range nameIPs {
ip = net.ParseIP(nameIP)
if ip != nil {
ips = append(ips, ip)
}
}
} else {
ips = append(ips, ip)
}
}
return dnsNames, ips, nil
}

View File

@@ -0,0 +1,468 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controlplane
import (
"fmt"
"io"
"net/url"
"os"
"path/filepath"
"strconv"
"time"
"sigs.k8s.io/controller-runtime/pkg/internal/testing/addr"
"sigs.k8s.io/controller-runtime/pkg/internal/testing/certs"
"sigs.k8s.io/controller-runtime/pkg/internal/testing/process"
)
const (
// saKeyFile is the name of the service account signing private key file.
saKeyFile = "sa-signer.key"
// saKeyFile is the name of the service account signing public key (cert) file.
saCertFile = "sa-signer.crt"
)
// SecureServing provides/configures how the API server serves on the secure port.
type SecureServing struct {
// ListenAddr contains the host & port to serve on.
//
// Configurable. If unset, it will be defaulted.
process.ListenAddr
// CA contains the CA that signed the API server's serving certificates.
//
// Read-only.
CA []byte
// Authn can be used to provision users, and override what type of
// authentication is used to provision users.
//
// Configurable. If unset, it will be defaulted.
Authn
}
// APIServer knows how to run a kubernetes apiserver.
type APIServer struct {
// URL is the address the ApiServer should listen on for client
// connections.
//
// If set, this will configure the *insecure* serving details.
// If unset, it will contain the insecure port if insecure serving is enabled,
// and otherwise will contain the secure port.
//
// If this is not specified, we default to a random free port on localhost.
//
// Deprecated: use InsecureServing (for the insecure URL) or SecureServing, ideally.
URL *url.URL
// SecurePort is the additional secure port that the APIServer should listen on.
//
// If set, this will override SecureServing.Port.
//
// Deprecated: use SecureServing.
SecurePort int
// SecureServing indicates how the API server will serve on the secure port.
//
// Some parts are configurable. Will be defaulted if unset.
SecureServing
// InsecureServing indicates how the API server will serve on the insecure port.
//
// If unset, the insecure port will be disabled. Set to an empty struct to get
// default values.
//
// Deprecated: does not work with Kubernetes versions 1.20 and above. Use secure
// serving instead.
InsecureServing *process.ListenAddr
// Path is the path to the apiserver binary.
//
// If this is left as the empty string, we will attempt to locate a binary,
// by checking for the TEST_ASSET_KUBE_APISERVER environment variable, and
// the default test assets directory. See the "Binaries" section above (in
// doc.go) for details.
Path string
// Args is a list of arguments which will passed to the APIServer binary.
// Before they are passed on, they will be evaluated as go-template strings.
// This means you can use fields which are defined and exported on this
// APIServer struct (e.g. "--cert-dir={{ .Dir }}").
// Those templates will be evaluated after the defaulting of the APIServer's
// fields has already happened and just before the binary actually gets
// started. Thus you have access to calculated fields like `URL` and others.
//
// If not specified, the minimal set of arguments to run the APIServer will
// be used.
//
// They will be loaded into the same argument set as Configure. Each flag
// will be Append-ed to the configured arguments just before launch.
//
// Deprecated: use Configure instead.
Args []string
// CertDir is a path to a directory containing whatever certificates the
// APIServer will need.
//
// If left unspecified, then the Start() method will create a fresh temporary
// directory, and the Stop() method will clean it up.
CertDir string
// EtcdURL is the URL of the Etcd the APIServer should use.
//
// If this is not specified, the Start() method will return an error.
EtcdURL *url.URL
// StartTimeout, StopTimeout specify the time the APIServer is allowed to
// take when starting and stoppping before an error is emitted.
//
// If not specified, these default to 20 seconds.
StartTimeout time.Duration
StopTimeout time.Duration
// Out, Err specify where APIServer should write its StdOut, StdErr to.
//
// If not specified, the output will be discarded.
Out io.Writer
Err io.Writer
processState *process.State
// args contains the structured arguments to use for running the API server
// Lazily initialized by .Configure(), Defaulted eventually with .defaultArgs()
args *process.Arguments
}
// Configure returns Arguments that may be used to customize the
// flags used to launch the API server. A set of defaults will
// be applied underneath.
func (s *APIServer) Configure() *process.Arguments {
if s.args == nil {
s.args = process.EmptyArguments()
}
return s.args
}
// Start starts the apiserver, waits for it to come up, and returns an error,
// if occurred.
func (s *APIServer) Start() error {
if err := s.prepare(); err != nil {
return err
}
return s.processState.Start(s.Out, s.Err)
}
func (s *APIServer) prepare() error {
if err := s.setProcessState(); err != nil {
return err
}
return s.Authn.Start()
}
// configurePorts configures the serving ports for this API server.
//
// Most of this method currently deals with making the deprecated fields
// take precedence over the new fields.
func (s *APIServer) configurePorts() error {
// prefer the old fields to the new fields if a user set one,
// otherwise, default the new fields and populate the old ones.
// Insecure: URL, InsecureServing
if s.URL != nil {
s.InsecureServing = &process.ListenAddr{
Address: s.URL.Hostname(),
Port: s.URL.Port(),
}
} else if insec := s.InsecureServing; insec != nil {
if insec.Port == "" || insec.Address == "" {
port, host, err := addr.Suggest("")
if err != nil {
return fmt.Errorf("unable to provision unused insecure port: %w", err)
}
s.InsecureServing.Port = strconv.Itoa(port)
s.InsecureServing.Address = host
}
s.URL = s.InsecureServing.URL("http", "")
}
// Secure: SecurePort, SecureServing
if s.SecurePort != 0 {
s.SecureServing.Port = strconv.Itoa(s.SecurePort)
// if we don't have an address, try the insecure address, and otherwise
// default to loopback.
if s.SecureServing.Address == "" {
if s.InsecureServing != nil {
s.SecureServing.Address = s.InsecureServing.Address
} else {
s.SecureServing.Address = "127.0.0.1"
}
}
} else if s.SecureServing.Port == "" || s.SecureServing.Address == "" {
port, host, err := addr.Suggest("")
if err != nil {
return fmt.Errorf("unable to provision unused secure port: %w", err)
}
s.SecureServing.Port = strconv.Itoa(port)
s.SecureServing.Address = host
s.SecurePort = port
}
return nil
}
func (s *APIServer) setProcessState() error {
if s.EtcdURL == nil {
return fmt.Errorf("expected EtcdURL to be configured")
}
var err error
// unconditionally re-set this so we can successfully restart
// TODO(directxman12): we supported this in the past, but do we actually
// want to support re-using an API server object to restart? The loss
// of provisioned users is surprising to say the least.
s.processState = &process.State{
Dir: s.CertDir,
Path: s.Path,
StartTimeout: s.StartTimeout,
StopTimeout: s.StopTimeout,
}
if err := s.processState.Init("kube-apiserver"); err != nil {
return err
}
if err := s.configurePorts(); err != nil {
return err
}
// the secure port will always be on, so use that
s.processState.HealthCheck.URL = *s.SecureServing.URL("https", "/healthz")
s.CertDir = s.processState.Dir
s.Path = s.processState.Path
s.StartTimeout = s.processState.StartTimeout
s.StopTimeout = s.processState.StopTimeout
if err := s.populateAPIServerCerts(); err != nil {
return err
}
if s.SecureServing.Authn == nil {
authn, err := NewCertAuthn()
if err != nil {
return err
}
s.SecureServing.Authn = authn
}
if err := s.Authn.Configure(s.CertDir, s.Configure()); err != nil {
return err
}
// NB(directxman12): insecure port is a mess:
// - 1.19 and below have the `--insecure-port` flag, and require it to be set to zero to
// disable it, otherwise the default will be used and we'll conflict.
// - 1.20 requires the flag to be unset or set to zero, and yells at you if you configure it
// - 1.24 won't have the flag at all...
//
// In an effort to automatically do the right thing during this mess, we do feature discovery
// on the flags, and hope that we've "parsed" them properly.
//
// TODO(directxman12): once we support 1.20 as the min version (might be when 1.24 comes out,
// might be around 1.25 or 1.26), remove this logic and the corresponding line in API server's
// default args.
if err := s.discoverFlags(); err != nil {
return err
}
s.processState.Args, s.Args, err = process.TemplateAndArguments(s.Args, s.Configure(), process.TemplateDefaults{ //nolint:staticcheck
Data: s,
Defaults: s.defaultArgs(),
MinimalDefaults: map[string][]string{
// as per kubernetes-sigs/controller-runtime#641, we need this (we
// probably need other stuff too, but this is the only thing that was
// previously considered a "minimal default")
"service-cluster-ip-range": {"10.0.0.0/24"},
// we need *some* authorization mode for health checks on the secure port,
// so default to RBAC unless the user set something else (in which case
// this'll be ignored due to SliceToArguments using AppendNoDefaults).
"authorization-mode": {"RBAC"},
},
})
if err != nil {
return err
}
return nil
}
// discoverFlags checks for certain flags that *must* be set in certain
// versions, and *must not* be set in others.
func (s *APIServer) discoverFlags() error {
// Present: <1.24, Absent: >= 1.24
present, err := s.processState.CheckFlag("insecure-port")
if err != nil {
return err
}
if !present {
s.Configure().Disable("insecure-port")
}
return nil
}
func (s *APIServer) defaultArgs() map[string][]string {
args := map[string][]string{
"service-cluster-ip-range": {"10.0.0.0/24"},
"allow-privileged": {"true"},
// we're keeping this disabled because if enabled, default SA is
// missing which would force all tests to create one in normal
// apiserver operation this SA is created by controller, but that is
// not run in integration environment
"disable-admission-plugins": {"ServiceAccount"},
"cert-dir": {s.CertDir},
"authorization-mode": {"RBAC"},
"secure-port": {s.SecureServing.Port},
// NB(directxman12): previously we didn't set the bind address for the secure
// port. It *shouldn't* make a difference unless people are doing something really
// funky, but if you start to get bug reports look here ;-)
"bind-address": {s.SecureServing.Address},
// required on 1.20+, fine to leave on for <1.20
"service-account-issuer": {s.SecureServing.URL("https", "/").String()},
"service-account-key-file": {filepath.Join(s.CertDir, saCertFile)},
"service-account-signing-key-file": {filepath.Join(s.CertDir, saKeyFile)},
}
if s.EtcdURL != nil {
args["etcd-servers"] = []string{s.EtcdURL.String()}
}
if s.URL != nil {
args["insecure-port"] = []string{s.URL.Port()}
args["insecure-bind-address"] = []string{s.URL.Hostname()}
} else {
// TODO(directxman12): remove this once 1.21 is the lowest version we support
// (this might be a while, but this line'll break as of 1.24, so see the comment
// in Start
args["insecure-port"] = []string{"0"}
}
return args
}
func (s *APIServer) populateAPIServerCerts() error {
_, statErr := os.Stat(filepath.Join(s.CertDir, "apiserver.crt"))
if !os.IsNotExist(statErr) {
return statErr
}
ca, err := certs.NewTinyCA()
if err != nil {
return err
}
servingCerts, err := ca.NewServingCert()
if err != nil {
return err
}
certData, keyData, err := servingCerts.AsBytes()
if err != nil {
return err
}
if err := os.WriteFile(filepath.Join(s.CertDir, "apiserver.crt"), certData, 0640); err != nil { //nolint:gosec
return err
}
if err := os.WriteFile(filepath.Join(s.CertDir, "apiserver.key"), keyData, 0640); err != nil { //nolint:gosec
return err
}
s.SecureServing.CA = ca.CA.CertBytes()
// service account signing files too
saCA, err := certs.NewTinyCA()
if err != nil {
return err
}
saCert, saKey, err := saCA.CA.AsBytes()
if err != nil {
return err
}
if err := os.WriteFile(filepath.Join(s.CertDir, saCertFile), saCert, 0640); err != nil { //nolint:gosec
return err
}
return os.WriteFile(filepath.Join(s.CertDir, saKeyFile), saKey, 0640) //nolint:gosec
}
// Stop stops this process gracefully, waits for its termination, and cleans up
// the CertDir if necessary.
func (s *APIServer) Stop() error {
if s.processState != nil {
if s.processState.DirNeedsCleaning {
s.CertDir = "" // reset the directory if it was randomly allocated, so that we can safely restart
}
if err := s.processState.Stop(); err != nil {
return err
}
}
return s.Authn.Stop()
}
// APIServerDefaultArgs exposes the default args for the APIServer so that you
// can use those to append your own additional arguments.
//
// Note that these arguments don't handle newer API servers well to due the more
// complex feature detection neeeded. It's recommended that you switch to .Configure
// as you upgrade API server versions.
//
// Deprecated: use APIServer.Configure().
var APIServerDefaultArgs = []string{
"--advertise-address=127.0.0.1",
"--etcd-servers={{ if .EtcdURL }}{{ .EtcdURL.String }}{{ end }}",
"--cert-dir={{ .CertDir }}",
"--insecure-port={{ if .URL }}{{ .URL.Port }}{{else}}0{{ end }}",
"{{ if .URL }}--insecure-bind-address={{ .URL.Hostname }}{{ end }}",
"--secure-port={{ if .SecurePort }}{{ .SecurePort }}{{ end }}",
// we're keeping this disabled because if enabled, default SA is missing which would force all tests to create one
// in normal apiserver operation this SA is created by controller, but that is not run in integration environment
"--disable-admission-plugins=ServiceAccount",
"--service-cluster-ip-range=10.0.0.0/24",
"--allow-privileged=true",
// NB(directxman12): we also enable RBAC if nothing else was enabled
}
// PrepareAPIServer is an internal-only (NEVER SHOULD BE EXPOSED)
// function that sets up the API server just before starting it,
// without actually starting it. This saves time on tests.
//
// NB(directxman12): do not expose this outside of internal -- it's unsafe to
// use, because things like port allocation could race even more than they
// currently do if you later call start!
func PrepareAPIServer(s *APIServer) error {
return s.prepare()
}
// APIServerArguments is an internal-only (NEVER SHOULD BE EXPOSED)
// function that sets up the API server just before starting it,
// without actually starting it. It's public to make testing easier.
//
// NB(directxman12): do not expose this outside of internal.
func APIServerArguments(s *APIServer) []string {
return s.processState.Args
}

View File

@@ -0,0 +1,142 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controlplane
import (
"fmt"
"os"
"path/filepath"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/internal/testing/certs"
"sigs.k8s.io/controller-runtime/pkg/internal/testing/process"
)
// User represents a Kubernetes user.
type User struct {
// Name is the user's Name.
Name string
// Groups are the groups to which the user belongs.
Groups []string
}
// Authn knows how to configure an API server for a particular type of authentication,
// and provision users under that authentication scheme.
//
// The methods must be called in the following order (as presented below in the interface
// for a mnemonic):
//
// 1. Configure
// 2. Start
// 3. AddUsers (0+ calls)
// 4. Stop.
type Authn interface {
// Configure provides the working directory to this authenticator,
// and configures the given API server arguments to make use of this authenticator.
//
// Should be called first.
Configure(workDir string, args *process.Arguments) error
// Start runs this authenticator. Will be called just before API server start.
//
// Must be called after Configure.
Start() error
// AddUser provisions a user, returning a copy of the given base rest.Config
// configured to authenticate as that users.
//
// May only be called while the authenticator is "running".
AddUser(user User, baseCfg *rest.Config) (*rest.Config, error)
// Stop shuts down this authenticator.
Stop() error
}
// CertAuthn is an authenticator (Authn) that makes use of client certificate authn.
type CertAuthn struct {
// ca is the CA used to sign the client certs
ca *certs.TinyCA
// certDir is the directory used to write the CA crt file
// so that the API server can read it.
certDir string
}
// NewCertAuthn creates a new client-cert-based Authn with a new CA.
func NewCertAuthn() (*CertAuthn, error) {
ca, err := certs.NewTinyCA()
if err != nil {
return nil, fmt.Errorf("unable to provision client certificate auth CA: %w", err)
}
return &CertAuthn{
ca: ca,
}, nil
}
// AddUser provisions a new user that's authenticated via certificates, with
// the given uesrname and groups embedded in the certificate as expected by the
// API server.
func (c *CertAuthn) AddUser(user User, baseCfg *rest.Config) (*rest.Config, error) {
certs, err := c.ca.NewClientCert(certs.ClientInfo{
Name: user.Name,
Groups: user.Groups,
})
if err != nil {
return nil, fmt.Errorf("unable to create client certificates for %s: %w", user.Name, err)
}
crt, key, err := certs.AsBytes()
if err != nil {
return nil, fmt.Errorf("unable to serialize client certificates for %s: %w", user.Name, err)
}
cfg := rest.CopyConfig(baseCfg)
cfg.CertData = crt
cfg.KeyData = key
return cfg, nil
}
// caCrtPath returns the path to the on-disk client-cert CA crt file.
func (c *CertAuthn) caCrtPath() string {
return filepath.Join(c.certDir, "client-cert-auth-ca.crt")
}
// Configure provides the working directory to this authenticator,
// and configures the given API server arguments to make use of this authenticator.
func (c *CertAuthn) Configure(workDir string, args *process.Arguments) error {
c.certDir = workDir
args.Set("client-ca-file", c.caCrtPath())
return nil
}
// Start runs this authenticator. Will be called just before API server start.
//
// Must be called after Configure.
func (c *CertAuthn) Start() error {
if len(c.certDir) == 0 {
return fmt.Errorf("start called before configure")
}
caCrt := c.ca.CA.CertBytes()
if err := os.WriteFile(c.caCrtPath(), caCrt, 0640); err != nil { //nolint:gosec
return fmt.Errorf("unable to save the client certificate CA to %s: %w", c.caCrtPath(), err)
}
return nil
}
// Stop shuts down this authenticator.
func (c *CertAuthn) Stop() error {
// no-op -- our workdir is cleaned up for us automatically
return nil
}

View File

@@ -0,0 +1,202 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controlplane
import (
"io"
"net"
"net/url"
"strconv"
"time"
"sigs.k8s.io/controller-runtime/pkg/internal/testing/addr"
"sigs.k8s.io/controller-runtime/pkg/internal/testing/process"
)
// Etcd knows how to run an etcd server.
type Etcd struct {
// URL is the address the Etcd should listen on for client connections.
//
// If this is not specified, we default to a random free port on localhost.
URL *url.URL
// Path is the path to the etcd binary.
//
// If this is left as the empty string, we will attempt to locate a binary,
// by checking for the TEST_ASSET_ETCD environment variable, and the default
// test assets directory. See the "Binaries" section above (in doc.go) for
// details.
Path string
// Args is a list of arguments which will passed to the Etcd binary. Before
// they are passed on, the`y will be evaluated as go-template strings. This
// means you can use fields which are defined and exported on this Etcd
// struct (e.g. "--data-dir={{ .Dir }}").
// Those templates will be evaluated after the defaulting of the Etcd's
// fields has already happened and just before the binary actually gets
// started. Thus you have access to calculated fields like `URL` and others.
//
// If not specified, the minimal set of arguments to run the Etcd will be
// used.
//
// They will be loaded into the same argument set as Configure. Each flag
// will be Append-ed to the configured arguments just before launch.
//
// Deprecated: use Configure instead.
Args []string
// DataDir is a path to a directory in which etcd can store its state.
//
// If left unspecified, then the Start() method will create a fresh temporary
// directory, and the Stop() method will clean it up.
DataDir string
// StartTimeout, StopTimeout specify the time the Etcd is allowed to
// take when starting and stopping before an error is emitted.
//
// If not specified, these default to 20 seconds.
StartTimeout time.Duration
StopTimeout time.Duration
// Out, Err specify where Etcd should write its StdOut, StdErr to.
//
// If not specified, the output will be discarded.
Out io.Writer
Err io.Writer
// processState contains the actual details about this running process
processState *process.State
// args contains the structured arguments to use for running etcd.
// Lazily initialized by .Configure(), Defaulted eventually with .defaultArgs()
args *process.Arguments
// listenPeerURL is the address the Etcd should listen on for peer connections.
// It's automatically generated and a random port is picked during execution.
listenPeerURL *url.URL
}
// Start starts the etcd, waits for it to come up, and returns an error, if one
// occurred.
func (e *Etcd) Start() error {
if err := e.setProcessState(); err != nil {
return err
}
return e.processState.Start(e.Out, e.Err)
}
func (e *Etcd) setProcessState() error {
e.processState = &process.State{
Dir: e.DataDir,
Path: e.Path,
StartTimeout: e.StartTimeout,
StopTimeout: e.StopTimeout,
}
// unconditionally re-set this so we can successfully restart
// TODO(directxman12): we supported this in the past, but do we actually
// want to support re-using an API server object to restart? The loss
// of provisioned users is surprising to say the least.
if err := e.processState.Init("etcd"); err != nil {
return err
}
// Set the listen url.
if e.URL == nil {
port, host, err := addr.Suggest("")
if err != nil {
return err
}
e.URL = &url.URL{
Scheme: "http",
Host: net.JoinHostPort(host, strconv.Itoa(port)),
}
}
// Set the listen peer URL.
{
port, host, err := addr.Suggest("")
if err != nil {
return err
}
e.listenPeerURL = &url.URL{
Scheme: "http",
Host: net.JoinHostPort(host, strconv.Itoa(port)),
}
}
// can use /health as of etcd 3.3.0
e.processState.HealthCheck.URL = *e.URL
e.processState.HealthCheck.Path = "/health"
e.DataDir = e.processState.Dir
e.Path = e.processState.Path
e.StartTimeout = e.processState.StartTimeout
e.StopTimeout = e.processState.StopTimeout
var err error
e.processState.Args, e.Args, err = process.TemplateAndArguments(e.Args, e.Configure(), process.TemplateDefaults{ //nolint:staticcheck
Data: e,
Defaults: e.defaultArgs(),
})
return err
}
// Stop stops this process gracefully, waits for its termination, and cleans up
// the DataDir if necessary.
func (e *Etcd) Stop() error {
if e.processState.DirNeedsCleaning {
e.DataDir = "" // reset the directory if it was randomly allocated, so that we can safely restart
}
return e.processState.Stop()
}
func (e *Etcd) defaultArgs() map[string][]string {
args := map[string][]string{
"listen-peer-urls": {e.listenPeerURL.String()},
"data-dir": {e.DataDir},
}
if e.URL != nil {
args["advertise-client-urls"] = []string{e.URL.String()}
args["listen-client-urls"] = []string{e.URL.String()}
}
// Add unsafe no fsync, available from etcd 3.5
if ok, _ := e.processState.CheckFlag("unsafe-no-fsync"); ok {
args["unsafe-no-fsync"] = []string{"true"}
}
return args
}
// Configure returns Arguments that may be used to customize the
// flags used to launch etcd. A set of defaults will
// be applied underneath.
func (e *Etcd) Configure() *process.Arguments {
if e.args == nil {
e.args = process.EmptyArguments()
}
return e.args
}
// EtcdDefaultArgs exposes the default args for Etcd so that you
// can use those to append your own additional arguments.
var EtcdDefaultArgs = []string{
"--listen-peer-urls=http://localhost:0",
"--advertise-client-urls={{ if .URL }}{{ .URL.String }}{{ end }}",
"--listen-client-urls={{ if .URL }}{{ .URL.String }}{{ end }}",
"--data-dir={{ .DataDir }}",
}

View File

@@ -0,0 +1,119 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controlplane
import (
"bytes"
"fmt"
"io"
"net/url"
"os/exec"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
kcapi "k8s.io/client-go/tools/clientcmd/api"
"sigs.k8s.io/controller-runtime/pkg/internal/testing/process"
)
const (
envtestName = "envtest"
)
// KubeConfigFromREST reverse-engineers a kubeconfig file from a rest.Config.
// The options are tailored towards the rest.Configs we generate, so they're
// not broadly applicable.
//
// This is not intended to be exposed beyond internal for the above reasons.
func KubeConfigFromREST(cfg *rest.Config) ([]byte, error) {
kubeConfig := kcapi.NewConfig()
protocol := "https"
if !rest.IsConfigTransportTLS(*cfg) {
protocol = "http"
}
// cfg.Host is a URL, so we need to parse it so we can properly append the API path
baseURL, err := url.Parse(cfg.Host)
if err != nil {
return nil, fmt.Errorf("unable to interpret config's host value as a URL: %w", err)
}
kubeConfig.Clusters[envtestName] = &kcapi.Cluster{
// TODO(directxman12): if client-go ever decides to expose defaultServerUrlFor(config),
// we can just use that. Note that this is not the same as the public DefaultServerURL,
// which requires us to pass a bunch of stuff in manually.
Server: (&url.URL{Scheme: protocol, Host: baseURL.Host, Path: cfg.APIPath}).String(),
CertificateAuthorityData: cfg.CAData,
}
kubeConfig.AuthInfos[envtestName] = &kcapi.AuthInfo{
// try to cover all auth strategies that aren't plugins
ClientCertificateData: cfg.CertData,
ClientKeyData: cfg.KeyData,
Token: cfg.BearerToken,
Username: cfg.Username,
Password: cfg.Password,
}
kcCtx := kcapi.NewContext()
kcCtx.Cluster = envtestName
kcCtx.AuthInfo = envtestName
kubeConfig.Contexts[envtestName] = kcCtx
kubeConfig.CurrentContext = envtestName
contents, err := clientcmd.Write(*kubeConfig)
if err != nil {
return nil, fmt.Errorf("unable to serialize kubeconfig file: %w", err)
}
return contents, nil
}
// KubeCtl is a wrapper around the kubectl binary.
type KubeCtl struct {
// Path where the kubectl binary can be found.
//
// If this is left empty, we will attempt to locate a binary, by checking for
// the TEST_ASSET_KUBECTL environment variable, and the default test assets
// directory. See the "Binaries" section above (in doc.go) for details.
Path string
// Opts can be used to configure additional flags which will be used each
// time the wrapped binary is called.
//
// For example, you might want to use this to set the URL of the APIServer to
// connect to.
Opts []string
}
// Run executes the wrapped binary with some preconfigured options and the
// arguments given to this method. It returns Readers for the stdout and
// stderr.
func (k *KubeCtl) Run(args ...string) (stdout, stderr io.Reader, err error) {
if k.Path == "" {
k.Path = process.BinPathFinder("kubectl", "")
}
stdoutBuffer := &bytes.Buffer{}
stderrBuffer := &bytes.Buffer{}
allArgs := append(k.Opts, args...)
cmd := exec.Command(k.Path, allArgs...)
cmd.Stdout = stdoutBuffer
cmd.Stderr = stderrBuffer
err = cmd.Run()
return stdoutBuffer, stderrBuffer, err
}

View File

@@ -0,0 +1,259 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controlplane
import (
"fmt"
"net/url"
"os"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/internal/testing/certs"
)
// NewTinyCA creates a new a tiny CA utility for provisioning serving certs and client certs FOR TESTING ONLY.
// Don't use this for anything else!
var NewTinyCA = certs.NewTinyCA
// ControlPlane is a struct that knows how to start your test control plane.
//
// Right now, that means Etcd and your APIServer. This is likely to increase in
// future.
type ControlPlane struct {
APIServer *APIServer
Etcd *Etcd
// Kubectl will override the default asset search path for kubectl
KubectlPath string
// for the deprecated methods (Kubectl, etc)
defaultUserCfg *rest.Config
defaultUserKubectl *KubeCtl
}
// Start will start your control plane processes. To stop them, call Stop().
func (f *ControlPlane) Start() (retErr error) {
if f.Etcd == nil {
f.Etcd = &Etcd{}
}
if err := f.Etcd.Start(); err != nil {
return err
}
defer func() {
if retErr != nil {
_ = f.Etcd.Stop()
}
}()
if f.APIServer == nil {
f.APIServer = &APIServer{}
}
f.APIServer.EtcdURL = f.Etcd.URL
if err := f.APIServer.Start(); err != nil {
return err
}
defer func() {
if retErr != nil {
_ = f.APIServer.Stop()
}
}()
// provision the default user -- can be removed when the related
// methods are removed. The default user has admin permissions to
// mimic legacy no-authz setups.
user, err := f.AddUser(User{Name: "default", Groups: []string{"system:masters"}}, &rest.Config{})
if err != nil {
return fmt.Errorf("unable to provision the default (legacy) user: %w", err)
}
kubectl, err := user.Kubectl()
if err != nil {
return fmt.Errorf("unable to provision the default (legacy) kubeconfig: %w", err)
}
f.defaultUserCfg = user.Config()
f.defaultUserKubectl = kubectl
return nil
}
// Stop will stop your control plane processes, and clean up their data.
func (f *ControlPlane) Stop() error {
var errList []error
if f.APIServer != nil {
if err := f.APIServer.Stop(); err != nil {
errList = append(errList, err)
}
}
if f.Etcd != nil {
if err := f.Etcd.Stop(); err != nil {
errList = append(errList, err)
}
}
return kerrors.NewAggregate(errList)
}
// APIURL returns the URL you should connect to to talk to your API server.
//
// If insecure serving is configured, this will contain the insecure port.
// Otherwise, it will contain the secure port.
//
// Deprecated: use AddUser instead, or APIServer.{Ins|S}ecureServing.URL if
// you really want just the URL.
func (f *ControlPlane) APIURL() *url.URL {
return f.APIServer.URL
}
// KubeCtl returns a pre-configured KubeCtl, ready to connect to this
// ControlPlane.
//
// Deprecated: use AddUser & AuthenticatedUser.Kubectl instead.
func (f *ControlPlane) KubeCtl() *KubeCtl {
return f.defaultUserKubectl
}
// RESTClientConfig returns a pre-configured restconfig, ready to connect to
// this ControlPlane.
//
// Deprecated: use AddUser & AuthenticatedUser.Config instead.
func (f *ControlPlane) RESTClientConfig() (*rest.Config, error) {
return f.defaultUserCfg, nil
}
// AuthenticatedUser contains access information for an provisioned user,
// including REST config, kubeconfig contents, and access to a KubeCtl instance.
//
// It's not "safe" to use the methods on this till after the API server has been
// started (due to certificate initialization and such). The various methods will
// panic if this is done.
type AuthenticatedUser struct {
// cfg is the rest.Config for connecting to the API server. It's lazily initialized.
cfg *rest.Config
// cfgIsComplete indicates the cfg has had late-initialized fields (e.g.
// API server CA data) initialized.
cfgIsComplete bool
// apiServer is a handle to the APIServer that's used when finalizing cfg
// and producing the kubectl instance.
plane *ControlPlane
// kubectl is our existing, provisioned kubectl. We don't provision one
// till someone actually asks for it.
kubectl *KubeCtl
}
// Config returns the REST config that can be used to connect to the API server
// as this user.
//
// Will panic if used before the API server is started.
func (u *AuthenticatedUser) Config() *rest.Config {
// NB(directxman12): we choose to panic here for ergonomics sake, and because there's
// not really much you can do to "handle" this error. This machinery is intended to be
// used in tests anyway, so panicing is not a particularly big deal.
if u.cfgIsComplete {
return u.cfg
}
if len(u.plane.APIServer.SecureServing.CA) == 0 {
panic("the API server has not yet been started, please do that before accessing connection details")
}
u.cfg.CAData = u.plane.APIServer.SecureServing.CA
u.cfg.Host = u.plane.APIServer.SecureServing.URL("https", "/").String()
u.cfgIsComplete = true
return u.cfg
}
// KubeConfig returns a KubeConfig that's roughly equivalent to this user's REST config.
//
// Will panic if used before the API server is started.
func (u AuthenticatedUser) KubeConfig() ([]byte, error) {
// NB(directxman12): we don't return the actual API object to avoid yet another
// piece of kubernetes API in our public API, and also because generally the thing
// you want to do with this is just write it out to a file for external debugging
// purposes, etc.
return KubeConfigFromREST(u.Config())
}
// Kubectl returns a KubeCtl instance for talking to the API server as this user. It uses
// a kubeconfig equivalent to that returned by .KubeConfig.
//
// Will panic if used before the API server is started.
func (u *AuthenticatedUser) Kubectl() (*KubeCtl, error) {
if u.kubectl != nil {
return u.kubectl, nil
}
if len(u.plane.APIServer.CertDir) == 0 {
panic("the API server has not yet been started, please do that before accessing connection details")
}
// cleaning this up is handled when our tmpDir is deleted
out, err := os.CreateTemp(u.plane.APIServer.CertDir, "*.kubecfg")
if err != nil {
return nil, fmt.Errorf("unable to create file for kubeconfig: %w", err)
}
defer out.Close()
contents, err := KubeConfigFromREST(u.Config())
if err != nil {
return nil, err
}
if _, err := out.Write(contents); err != nil {
return nil, fmt.Errorf("unable to write kubeconfig to disk at %s: %w", out.Name(), err)
}
k := &KubeCtl{
Path: u.plane.KubectlPath,
}
k.Opts = append(k.Opts, fmt.Sprintf("--kubeconfig=%s", out.Name()))
u.kubectl = k
return k, nil
}
// AddUser provisions a new user in the cluster. It uses the APIServer's authentication
// strategy -- see APIServer.SecureServing.Authn.
//
// Unlike AddUser, it's safe to pass a nil rest.Config here if you have no
// particular opinions about the config.
//
// The default authentication strategy is not guaranteed to any specific strategy, but it is
// guaranteed to be callable both before and after Start has been called (but, as noted in the
// AuthenticatedUser docs, the given user objects are only valid after Start has been called).
func (f *ControlPlane) AddUser(user User, baseConfig *rest.Config) (*AuthenticatedUser, error) {
if f.GetAPIServer().SecureServing.Authn == nil {
return nil, fmt.Errorf("no API server authentication is configured yet. The API server defaults one when Start is called, did you mean to use that?")
}
if baseConfig == nil {
baseConfig = &rest.Config{}
}
cfg, err := f.GetAPIServer().SecureServing.AddUser(user, baseConfig)
if err != nil {
return nil, err
}
return &AuthenticatedUser{
cfg: cfg,
plane: f,
}, nil
}
// GetAPIServer returns this ControlPlane's APIServer, initializing it if necessary.
func (f *ControlPlane) GetAPIServer() *APIServer {
if f.APIServer == nil {
f.APIServer = &APIServer{}
}
return f.APIServer
}

View File

@@ -0,0 +1,340 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package process
import (
"bytes"
"html/template"
"sort"
"strings"
)
// RenderTemplates returns an []string to render the templates
//
// Deprecated: will be removed in favor of Arguments.
func RenderTemplates(argTemplates []string, data interface{}) (args []string, err error) {
var t *template.Template
for _, arg := range argTemplates {
t, err = template.New(arg).Parse(arg)
if err != nil {
args = nil
return
}
buf := &bytes.Buffer{}
err = t.Execute(buf, data)
if err != nil {
args = nil
return
}
args = append(args, buf.String())
}
return
}
// SliceToArguments converts a slice of arguments to structured arguments,
// appending each argument that starts with `--` and contains an `=` to the
// argument set (ignoring defaults), returning the rest.
//
// Deprecated: will be removed when RenderTemplates is removed.
func SliceToArguments(sliceArgs []string, args *Arguments) []string {
var rest []string
for i, arg := range sliceArgs {
if arg == "--" {
rest = append(rest, sliceArgs[i:]...)
return rest
}
// skip non-flag arguments, skip arguments w/o equals because we
// can't tell if the next argument should take a value
if !strings.HasPrefix(arg, "--") || !strings.Contains(arg, "=") {
rest = append(rest, arg)
continue
}
parts := strings.SplitN(arg[2:], "=", 2)
name := parts[0]
val := parts[1]
args.AppendNoDefaults(name, val)
}
return rest
}
// TemplateDefaults specifies defaults to be used for joining structured arguments with templates.
//
// Deprecated: will be removed when RenderTemplates is removed.
type TemplateDefaults struct {
// Data will be used to render the template.
Data interface{}
// Defaults will be used to default structured arguments if no template is passed.
Defaults map[string][]string
// MinimalDefaults will be used to default structured arguments if a template is passed.
// Use this for flags which *must* be present.
MinimalDefaults map[string][]string // for api server service-cluster-ip-range
}
// TemplateAndArguments joins structured arguments and non-structured arguments, preserving existing
// behavior. Namely:
//
// 1. if templ has len > 0, it will be rendered against data
// 2. the rendered template values that look like `--foo=bar` will be split
// and appended to args, the rest will be kept around
// 3. the given args will be rendered as string form. If a template is given,
// no defaults will be used, otherwise defaults will be used
// 4. a result of [args..., rest...] will be returned
//
// It returns the resulting rendered arguments, plus the arguments that were
// not transferred to `args` during rendering.
//
// Deprecated: will be removed when RenderTemplates is removed.
func TemplateAndArguments(templ []string, args *Arguments, data TemplateDefaults) (allArgs []string, nonFlagishArgs []string, err error) {
if len(templ) == 0 { // 3 & 4 (no template case)
return args.AsStrings(data.Defaults), nil, nil
}
// 1: render the template
rendered, err := RenderTemplates(templ, data.Data)
if err != nil {
return nil, nil, err
}
// 2: filter out structured args and add them to args
rest := SliceToArguments(rendered, args)
// 3 (template case): render structured args, no defaults (matching the
// legacy case where if Args was specified, no defaults were used)
res := args.AsStrings(data.MinimalDefaults)
// 4: return the rendered structured args + all non-structured args
return append(res, rest...), rest, nil
}
// EmptyArguments constructs an empty set of flags with no defaults.
func EmptyArguments() *Arguments {
return &Arguments{
values: make(map[string]Arg),
}
}
// Arguments are structured, overridable arguments.
// Each Arguments object contains some set of default arguments, which may
// be appended to, or overridden.
//
// When ready, you can serialize them to pass to exec.Command and friends using
// AsStrings.
//
// All flag-setting methods return the *same* instance of Arguments so that you
// can chain calls.
type Arguments struct {
// values contains the user-set values for the arguments.
// `values[key] = dontPass` means "don't pass this flag"
// `values[key] = passAsName` means "pass this flag without args like --key`
// `values[key] = []string{a, b, c}` means "--key=a --key=b --key=c`
// any values not explicitly set here will be copied from defaults on final rendering.
values map[string]Arg
}
// Arg is an argument that has one or more values,
// and optionally falls back to default values.
type Arg interface {
// Append adds new values to this argument, returning
// a new instance contain the new value. The intermediate
// argument should generally be assumed to be consumed.
Append(vals ...string) Arg
// Get returns the full set of values, optionally including
// the passed in defaults. If it returns nil, this will be
// skipped. If it returns a non-nil empty slice, it'll be
// assumed that the argument should be passed as name-only.
Get(defaults []string) []string
}
type userArg []string
func (a userArg) Append(vals ...string) Arg {
return userArg(append(a, vals...)) //nolint:unconvert
}
func (a userArg) Get(_ []string) []string {
return []string(a)
}
type defaultedArg []string
func (a defaultedArg) Append(vals ...string) Arg {
return defaultedArg(append(a, vals...)) //nolint:unconvert
}
func (a defaultedArg) Get(defaults []string) []string {
res := append([]string(nil), defaults...)
return append(res, a...)
}
type dontPassArg struct{}
func (a dontPassArg) Append(vals ...string) Arg {
return userArg(vals)
}
func (dontPassArg) Get(_ []string) []string {
return nil
}
type passAsNameArg struct{}
func (a passAsNameArg) Append(_ ...string) Arg {
return passAsNameArg{}
}
func (passAsNameArg) Get(_ []string) []string {
return []string{}
}
var (
// DontPass indicates that the given argument will not actually be
// rendered.
DontPass Arg = dontPassArg{}
// PassAsName indicates that the given flag will be passed as `--key`
// without any value.
PassAsName Arg = passAsNameArg{}
)
// AsStrings serializes this set of arguments to a slice of strings appropriate
// for passing to exec.Command and friends, making use of the given defaults
// as indicated for each particular argument.
//
// - Any flag in defaults that's not in Arguments will be present in the output
// - Any flag that's present in Arguments will be passed the corresponding
// defaults to do with as it will (ignore, append-to, suppress, etc).
func (a *Arguments) AsStrings(defaults map[string][]string) []string {
// sort for deterministic ordering
keysInOrder := make([]string, 0, len(defaults)+len(a.values))
for key := range defaults {
if _, userSet := a.values[key]; userSet {
continue
}
keysInOrder = append(keysInOrder, key)
}
for key := range a.values {
keysInOrder = append(keysInOrder, key)
}
sort.Strings(keysInOrder)
var res []string
for _, key := range keysInOrder {
vals := a.Get(key).Get(defaults[key])
switch {
case vals == nil: // don't pass
continue
case len(vals) == 0: // pass as name
res = append(res, "--"+key)
default:
for _, val := range vals {
res = append(res, "--"+key+"="+val)
}
}
}
return res
}
// Get returns the value of the given flag. If nil,
// it will not be passed in AsString, otherwise:
//
// len == 0 --> `--key`, len > 0 --> `--key=val1 --key=val2 ...`.
func (a *Arguments) Get(key string) Arg {
if vals, ok := a.values[key]; ok {
return vals
}
return defaultedArg(nil)
}
// Enable configures the given key to be passed as a "name-only" flag,
// like, `--key`.
func (a *Arguments) Enable(key string) *Arguments {
a.values[key] = PassAsName
return a
}
// Disable prevents this flag from be passed.
func (a *Arguments) Disable(key string) *Arguments {
a.values[key] = DontPass
return a
}
// Append adds additional values to this flag. If this flag has
// yet to be set, initial values will include defaults. If you want
// to intentionally ignore defaults/start from scratch, call AppendNoDefaults.
//
// Multiple values will look like `--key=value1 --key=value2 ...`.
func (a *Arguments) Append(key string, values ...string) *Arguments {
vals, present := a.values[key]
if !present {
vals = defaultedArg{}
}
a.values[key] = vals.Append(values...)
return a
}
// AppendNoDefaults adds additional values to this flag. However,
// unlike Append, it will *not* copy values from defaults.
func (a *Arguments) AppendNoDefaults(key string, values ...string) *Arguments {
vals, present := a.values[key]
if !present {
vals = userArg{}
}
a.values[key] = vals.Append(values...)
return a
}
// Set resets the given flag to the specified values, ignoring any existing
// values or defaults.
func (a *Arguments) Set(key string, values ...string) *Arguments {
a.values[key] = userArg(values)
return a
}
// SetRaw sets the given flag to the given Arg value directly. Use this if
// you need to do some complicated deferred logic or something.
//
// Otherwise behaves like Set.
func (a *Arguments) SetRaw(key string, val Arg) *Arguments {
a.values[key] = val
return a
}
// FuncArg is a basic implementation of Arg that can be used for custom argument logic,
// like pulling values out of APIServer, or dynamically calculating values just before
// launch.
//
// The given function will be mapped directly to Arg#Get, and will generally be
// used in conjunction with SetRaw. For example, to set `--some-flag` to the
// API server's CertDir, you could do:
//
// server.Configure().SetRaw("--some-flag", FuncArg(func(defaults []string) []string {
// return []string{server.CertDir}
// }))
//
// FuncArg ignores Appends; if you need to support appending values too, consider implementing
// Arg directly.
type FuncArg func([]string) []string
// Append is a no-op for FuncArg, and just returns itself.
func (a FuncArg) Append(vals ...string) Arg { return a }
// Get delegates functionality to the FuncArg function itself.
func (a FuncArg) Get(defaults []string) []string {
return a(defaults)
}

View File

@@ -0,0 +1,70 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package process
import (
"os"
"path/filepath"
"regexp"
"strings"
)
const (
// EnvAssetsPath is the environment variable that stores the global test
// binary location override.
EnvAssetsPath = "KUBEBUILDER_ASSETS"
// EnvAssetOverridePrefix is the environment variable prefix for per-binary
// location overrides.
EnvAssetOverridePrefix = "TEST_ASSET_"
// AssetsDefaultPath is the default location to look for test binaries in,
// if no override was provided.
AssetsDefaultPath = "/usr/local/kubebuilder/bin"
)
// BinPathFinder finds the path to the given named binary, using the following locations
// in order of precedence (highest first). Notice that the various env vars only need
// to be set -- the asset is not checked for existence on the filesystem.
//
// 1. TEST_ASSET_{tr/a-z-/A-Z_/} (if set; asset overrides -- EnvAssetOverridePrefix)
// 1. KUBEBUILDER_ASSETS (if set; global asset path -- EnvAssetsPath)
// 3. assetDirectory (if set; per-config asset directory)
// 4. /usr/local/kubebuilder/bin (AssetsDefaultPath).
func BinPathFinder(symbolicName, assetDirectory string) (binPath string) {
punctuationPattern := regexp.MustCompile("[^A-Z0-9]+")
sanitizedName := punctuationPattern.ReplaceAllString(strings.ToUpper(symbolicName), "_")
leadingNumberPattern := regexp.MustCompile("^[0-9]+")
sanitizedName = leadingNumberPattern.ReplaceAllString(sanitizedName, "")
envVar := EnvAssetOverridePrefix + sanitizedName
// TEST_ASSET_XYZ
if val, ok := os.LookupEnv(envVar); ok {
return val
}
// KUBEBUILDER_ASSETS
if val, ok := os.LookupEnv(EnvAssetsPath); ok {
return filepath.Join(val, symbolicName)
}
// assetDirectory
if assetDirectory != "" {
return filepath.Join(assetDirectory, symbolicName)
}
// default path
return filepath.Join(AssetsDefaultPath, symbolicName)
}

View File

@@ -0,0 +1,272 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package process
import (
"crypto/tls"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"os/exec"
"path"
"regexp"
"sync"
"syscall"
"time"
)
// ListenAddr represents some listening address and port.
type ListenAddr struct {
Address string
Port string
}
// URL returns a URL for this address with the given scheme and subpath.
func (l *ListenAddr) URL(scheme string, path string) *url.URL {
return &url.URL{
Scheme: scheme,
Host: l.HostPort(),
Path: path,
}
}
// HostPort returns the joined host-port pair for this address.
func (l *ListenAddr) HostPort() string {
return net.JoinHostPort(l.Address, l.Port)
}
// HealthCheck describes the information needed to health-check a process via
// some health-check URL.
type HealthCheck struct {
url.URL
// HealthCheckPollInterval is the interval which will be used for polling the
// endpoint described by Host, Port, and Path.
//
// If left empty it will default to 100 Milliseconds.
PollInterval time.Duration
}
// State define the state of the process.
type State struct {
Cmd *exec.Cmd
// HealthCheck describes how to check if this process is up. If we get an http.StatusOK,
// we assume the process is ready to operate.
//
// For example, the /healthz endpoint of the k8s API server, or the /health endpoint of etcd.
HealthCheck HealthCheck
Args []string
StopTimeout time.Duration
StartTimeout time.Duration
Dir string
DirNeedsCleaning bool
Path string
// ready holds whether the process is currently in ready state (hit the ready condition) or not.
// It will be set to true on a successful `Start()` and set to false on a successful `Stop()`
ready bool
// waitDone is closed when our call to wait finishes up, and indicates that
// our process has terminated.
waitDone chan struct{}
errMu sync.Mutex
exitErr error
exited bool
}
// Init sets up this process, configuring binary paths if missing, initializing
// temporary directories, etc.
//
// This defaults all defaultable fields.
func (ps *State) Init(name string) error {
if ps.Path == "" {
if name == "" {
return fmt.Errorf("must have at least one of name or path")
}
ps.Path = BinPathFinder(name, "")
}
if ps.Dir == "" {
newDir, err := os.MkdirTemp("", "k8s_test_framework_")
if err != nil {
return err
}
ps.Dir = newDir
ps.DirNeedsCleaning = true
}
if ps.StartTimeout == 0 {
ps.StartTimeout = 20 * time.Second
}
if ps.StopTimeout == 0 {
ps.StopTimeout = 20 * time.Second
}
return nil
}
type stopChannel chan struct{}
// CheckFlag checks the help output of this command for the presence of the given flag, specified
// without the leading `--` (e.g. `CheckFlag("insecure-port")` checks for `--insecure-port`),
// returning true if the flag is present.
func (ps *State) CheckFlag(flag string) (bool, error) {
cmd := exec.Command(ps.Path, "--help")
outContents, err := cmd.CombinedOutput()
if err != nil {
return false, fmt.Errorf("unable to run command %q to check for flag %q: %w", ps.Path, flag, err)
}
pat := `(?m)^\s*--` + flag + `\b` // (m --> multi-line --> ^ matches start of line)
matched, err := regexp.Match(pat, outContents)
if err != nil {
return false, fmt.Errorf("unable to check command %q for flag %q in help output: %w", ps.Path, flag, err)
}
return matched, nil
}
// Start starts the apiserver, waits for it to come up, and returns an error,
// if occurred.
func (ps *State) Start(stdout, stderr io.Writer) (err error) {
if ps.ready {
return nil
}
ps.Cmd = exec.Command(ps.Path, ps.Args...)
ps.Cmd.Stdout = stdout
ps.Cmd.Stderr = stderr
ready := make(chan bool)
timedOut := time.After(ps.StartTimeout)
pollerStopCh := make(stopChannel)
go pollURLUntilOK(ps.HealthCheck.URL, ps.HealthCheck.PollInterval, ready, pollerStopCh)
ps.waitDone = make(chan struct{})
if err := ps.Cmd.Start(); err != nil {
ps.errMu.Lock()
defer ps.errMu.Unlock()
ps.exited = true
return err
}
go func() {
defer close(ps.waitDone)
err := ps.Cmd.Wait()
ps.errMu.Lock()
defer ps.errMu.Unlock()
ps.exitErr = err
ps.exited = true
}()
select {
case <-ready:
ps.ready = true
return nil
case <-ps.waitDone:
close(pollerStopCh)
return fmt.Errorf("timeout waiting for process %s to start successfully "+
"(it may have failed to start, or stopped unexpectedly before becoming ready)",
path.Base(ps.Path))
case <-timedOut:
close(pollerStopCh)
if ps.Cmd != nil {
// intentionally ignore this -- we might've crashed, failed to start, etc
ps.Cmd.Process.Signal(syscall.SIGTERM) //nolint:errcheck
}
return fmt.Errorf("timeout waiting for process %s to start", path.Base(ps.Path))
}
}
// Exited returns true if the process exited, and may also
// return an error (as per Cmd.Wait) if the process did not
// exit with error code 0.
func (ps *State) Exited() (bool, error) {
ps.errMu.Lock()
defer ps.errMu.Unlock()
return ps.exited, ps.exitErr
}
func pollURLUntilOK(url url.URL, interval time.Duration, ready chan bool, stopCh stopChannel) {
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
// there's probably certs *somewhere*,
// but it's fine to just skip validating
// them for health checks during testing
InsecureSkipVerify: true, //nolint:gosec
},
},
}
if interval <= 0 {
interval = 100 * time.Millisecond
}
for {
res, err := client.Get(url.String())
if err == nil {
res.Body.Close()
if res.StatusCode == http.StatusOK {
ready <- true
return
}
}
select {
case <-stopCh:
return
default:
time.Sleep(interval)
}
}
}
// Stop stops this process gracefully, waits for its termination, and cleans up
// the CertDir if necessary.
func (ps *State) Stop() error {
// Always clear the directory if we need to.
defer func() {
if ps.DirNeedsCleaning {
_ = os.RemoveAll(ps.Dir)
}
}()
if ps.Cmd == nil {
return nil
}
if done, _ := ps.Exited(); done {
return nil
}
if err := ps.Cmd.Process.Signal(syscall.SIGTERM); err != nil {
return fmt.Errorf("unable to signal for process %s to stop: %w", ps.Path, err)
}
timedOut := time.After(ps.StopTimeout)
select {
case <-ps.waitDone:
break
case <-timedOut:
return fmt.Errorf("timeout waiting for process %s to stop", path.Base(ps.Path))
}
ps.ready = false
return nil
}