mirror of
https://github.com/1Password/onepassword-operator.git
synced 2025-10-23 16:00:46 +00:00
Revert "Add option to cosume connect events rather than polling to restart deployments"
This reverts commit a5f4a7a0c1
.
This commit is contained in:
@@ -13,11 +13,6 @@ import (
|
||||
|
||||
"github.com/1Password/onepassword-operator/operator/pkg/controller"
|
||||
op "github.com/1Password/onepassword-operator/operator/pkg/onepassword"
|
||||
"github.com/1Password/onepassword-operator/operator/pkg/onepassword/message"
|
||||
"github.com/suborbital/grav/discovery/local"
|
||||
"github.com/suborbital/grav/grav"
|
||||
"github.com/suborbital/grav/transport/websocket"
|
||||
"github.com/suborbital/vektor/vlog"
|
||||
|
||||
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
|
||||
|
||||
@@ -45,7 +40,6 @@ import (
|
||||
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
|
||||
)
|
||||
|
||||
const envHostVariable = "OP_CONNECT_HOST"
|
||||
const envPollingIntervalVariable = "POLLING_INTERVAL"
|
||||
const manageConnect = "MANAGE_CONNECT"
|
||||
const restartDeploymentsEnvVariable = "AUTO_RESTART"
|
||||
@@ -173,28 +167,21 @@ func main() {
|
||||
// Add the Metrics Service
|
||||
addMetrics(ctx, cfg)
|
||||
|
||||
_, connectSet := os.LookupEnv(envHostVariable)
|
||||
done := make(chan bool)
|
||||
updateSecretsHandler := op.NewManager(mgr.GetClient(), opConnectClient, shouldAutoRestartDeployments())
|
||||
// Setup update secrets task
|
||||
if connectSet {
|
||||
consumeConnectEvents(*updateSecretsHandler)
|
||||
} else {
|
||||
// If not using connect then we will use polling to get secret updates
|
||||
// TODO implement 1Password events-api
|
||||
ticker := time.NewTicker(getPollingIntervalForUpdatingSecrets())
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
ticker.Stop()
|
||||
return
|
||||
case <-ticker.C:
|
||||
updateSecretsHandler.UpdateKubernetesSecretsTask("", "")
|
||||
}
|
||||
updatedSecretsPoller := op.NewManager(mgr.GetClient(), opConnectClient, shouldAutoRestartDeployments())
|
||||
done := make(chan bool)
|
||||
ticker := time.NewTicker(getPollingIntervalForUpdatingSecrets())
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
ticker.Stop()
|
||||
return
|
||||
case <-ticker.C:
|
||||
updatedSecretsPoller.UpdateKubernetesSecretsTask()
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Start the Cmd
|
||||
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
|
||||
@@ -313,43 +300,3 @@ func shouldAutoRestartDeployments() bool {
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func consumeConnectEvents(updateSecretsHandler op.SecretUpdateHandler) {
|
||||
log.Info(fmt.Sprintf("Operator Version: %s", version.Version))
|
||||
log.Info("Testing stuff")
|
||||
logger := vlog.Default(vlog.Level(vlog.LogLevelDebug))
|
||||
gwss := websocket.New()
|
||||
locald := local.New()
|
||||
|
||||
port := "42829"
|
||||
if port, err := strconv.Atoi(os.Getenv("OP_BUS_PORT")); err == nil {
|
||||
port = port
|
||||
}
|
||||
|
||||
g := grav.New(
|
||||
grav.UseLogger(logger),
|
||||
grav.UseEndpoint(port, "http://onepassword-connect/meta/message"),
|
||||
grav.UseTransport(gwss),
|
||||
grav.UseDiscovery(locald),
|
||||
)
|
||||
|
||||
pod := g.Connect()
|
||||
pod.OnType(message.TypeItemUpdate, ItemUpdate(updateSecretsHandler))
|
||||
}
|
||||
|
||||
// B5ItemUsage Grav message handler for activity.event messages. On READ
|
||||
// events an update will be sent to the b5 api
|
||||
func ItemUpdate(updateSecretsHandler op.SecretUpdateHandler) grav.MsgFunc {
|
||||
return func(msg grav.Message) error {
|
||||
e := message.ItemUpdateEvent{}
|
||||
if err := msg.UnmarshalData(&e); err != nil {
|
||||
log.Error(err, "failed to UnmarshalData into Event")
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Info(fmt.Sprintf("Detected update for item %s at vault %s", e.ItemId, e.VaultId))
|
||||
updateSecretsHandler.UpdateKubernetesSecretsTask("", "")
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@@ -1,27 +0,0 @@
|
||||
package message
|
||||
|
||||
import "encoding/json"
|
||||
|
||||
// TypeItemUpdate and others are sync message types
|
||||
const (
|
||||
TypeItemUpdate = "item.update"
|
||||
)
|
||||
|
||||
// ItemUpdateEvent is the data for a sync status message
|
||||
type ItemUpdateEvent struct {
|
||||
VaultId string `json:"vaultId"`
|
||||
ItemId string `json:"itemId"`
|
||||
Version string `json:"version"`
|
||||
}
|
||||
|
||||
// Type returns a the syns status data type
|
||||
func (s *ItemUpdateEvent) Type() string {
|
||||
return TypeItemUpdate
|
||||
}
|
||||
|
||||
// Bytes returns Bytes
|
||||
func (s *ItemUpdateEvent) Bytes() []byte {
|
||||
bytes, _ := json.Marshal(s)
|
||||
|
||||
return bytes
|
||||
}
|
@@ -36,13 +36,13 @@ type SecretUpdateHandler struct {
|
||||
shouldAutoRestartDeploymentsGlobal bool
|
||||
}
|
||||
|
||||
func (h *SecretUpdateHandler) UpdateKubernetesSecretsTask(vaultId, itemId string) error {
|
||||
updatedKubernetesSecrets, err := h.updateKubernetesSecrets(vaultId, itemId)
|
||||
func (h *SecretUpdateHandler) UpdateKubernetesSecretsTask() error {
|
||||
updatedKubernetesSecrets, err := h.updateKubernetesSecrets()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
updatedInjectedSecrets, err := h.updateInjectedSecrets(vaultId, itemId)
|
||||
updatedInjectedSecrets, err := h.updateInjectedSecrets()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -113,7 +113,7 @@ func (h *SecretUpdateHandler) restartDeployment(deployment *appsv1.Deployment) {
|
||||
}
|
||||
}
|
||||
|
||||
func (h *SecretUpdateHandler) updateKubernetesSecrets(vaultId, itemId string) (map[string]map[string]*corev1.Secret, error) {
|
||||
func (h *SecretUpdateHandler) updateKubernetesSecrets() (map[string]map[string]*corev1.Secret, error) {
|
||||
secrets := &corev1.SecretList{}
|
||||
err := h.client.List(context.Background(), secrets)
|
||||
if err != nil {
|
||||
@@ -126,16 +126,11 @@ func (h *SecretUpdateHandler) updateKubernetesSecrets(vaultId, itemId string) (m
|
||||
secret := secrets.Items[i]
|
||||
|
||||
itemPath := secret.Annotations[ItemPathAnnotation]
|
||||
|
||||
currentVersion := secret.Annotations[VersionAnnotation]
|
||||
if len(itemPath) == 0 || len(currentVersion) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if vaultId != "" && itemId != "" && itemPath != fmt.Sprintf("vaults/%s/items%s", vaultId, itemId) {
|
||||
continue
|
||||
}
|
||||
|
||||
item, err := GetOnePasswordItemByPath(h.opConnectClient, secret.Annotations[ItemPathAnnotation])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to retrieve item: %v", err)
|
||||
@@ -162,7 +157,7 @@ func (h *SecretUpdateHandler) updateKubernetesSecrets(vaultId, itemId string) (m
|
||||
return updatedSecrets, nil
|
||||
}
|
||||
|
||||
func (h *SecretUpdateHandler) updateInjectedSecrets(vaultId, itemId string) (map[string]map[string]*onepasswordv1.OnePasswordItem, error) {
|
||||
func (h *SecretUpdateHandler) updateInjectedSecrets() (map[string]map[string]*onepasswordv1.OnePasswordItem, error) {
|
||||
// fetch all onepassworditems
|
||||
onepasswordItems := &onepasswordv1.OnePasswordItemList{}
|
||||
err := h.client.List(context.Background(), onepasswordItems)
|
||||
@@ -185,9 +180,6 @@ func (h *SecretUpdateHandler) updateInjectedSecrets(vaultId, itemId string) (map
|
||||
if len(itemPath) == 0 || len(currentVersion) == 0 {
|
||||
continue
|
||||
}
|
||||
if vaultId != "" && itemId != "" && itemPath != fmt.Sprintf("vaults/%s/items%s", vaultId, itemId) {
|
||||
continue
|
||||
}
|
||||
|
||||
storedItem, err := GetOnePasswordItemByPath(h.opConnectClient, itemPath)
|
||||
if err != nil {
|
||||
|
@@ -918,7 +918,7 @@ func TestUpdateSecretHandler(t *testing.T) {
|
||||
shouldAutoRestartDeploymentsGlobal: testData.globalAutoRestartEnabled,
|
||||
}
|
||||
|
||||
err := h.UpdateKubernetesSecretsTask("", "")
|
||||
err := h.UpdateKubernetesSecretsTask()
|
||||
|
||||
assert.Equal(t, testData.expectedError, err)
|
||||
|
||||
|
Reference in New Issue
Block a user