diff --git a/.changelog/4255.txt b/.changelog/4255.txt deleted file mode 100644 index 1960697afa..0000000000 --- a/.changelog/4255.txt +++ /dev/null @@ -1,3 +0,0 @@ -```release-note:bug -sync-catalog: Enable the user to purge the registered services by passing parent node and necessary filters. -``` \ No newline at end of file diff --git a/control-plane/subcommand/sync-catalog/command.go b/control-plane/subcommand/sync-catalog/command.go index c3965165ed..b5d2c9d5af 100644 --- a/control-plane/subcommand/sync-catalog/command.go +++ b/control-plane/subcommand/sync-catalog/command.go @@ -12,20 +12,16 @@ import ( "os" "os/signal" "regexp" - "strings" "sync" "syscall" "time" "github.com/armon/go-metrics/prometheus" - "github.com/cenkalti/backoff" mapset "github.com/deckarep/golang-set" "github.com/hashicorp/consul-server-connection-manager/discovery" - "github.com/hashicorp/consul/api" "github.com/hashicorp/go-hclog" "github.com/mitchellh/cli" "github.com/prometheus/client_golang/prometheus/promhttp" - "golang.org/x/sync/errgroup" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" @@ -46,29 +42,27 @@ import ( type Command struct { UI cli.Ui - flags *flag.FlagSet - consul *flags.ConsulFlags - k8s *flags.K8SFlags - flagListen string - flagToConsul bool - flagToK8S bool - flagConsulDomain string - flagConsulK8STag string - flagConsulNodeName string - flagK8SDefault bool - flagK8SServicePrefix string - flagConsulServicePrefix string - flagK8SSourceNamespace string - flagK8SWriteNamespace string - flagConsulWritePeriod time.Duration - flagSyncClusterIPServices bool - flagSyncLBEndpoints bool - flagNodePortSyncType string - flagAddK8SNamespaceSuffix bool - flagLogLevel string - flagLogJSON bool - flagPurgeK8SServicesFromNode string - flagFilter string + flags *flag.FlagSet + consul *flags.ConsulFlags + k8s *flags.K8SFlags + flagListen string + flagToConsul bool + flagToK8S bool + flagConsulDomain string + flagConsulK8STag string + flagConsulNodeName string + flagK8SDefault bool + flagK8SServicePrefix string + flagConsulServicePrefix string + flagK8SSourceNamespace string + flagK8SWriteNamespace string + flagConsulWritePeriod time.Duration + flagSyncClusterIPServices bool + flagSyncLBEndpoints bool + flagNodePortSyncType string + flagAddK8SNamespaceSuffix bool + flagLogLevel string + flagLogJSON bool // Flags to support namespaces flagEnableNamespaces bool // Use namespacing on all components @@ -156,11 +150,6 @@ func (c *Command) init() { "\"debug\", \"info\", \"warn\", and \"error\".") c.flags.BoolVar(&c.flagLogJSON, "log-json", false, "Enable or disable JSON output format for logging.") - c.flags.StringVar(&c.flagPurgeK8SServicesFromNode, "purge-k8s-services-from-node", "", - "Specifies the name of the Consul node for which to deregister synced Kubernetes services.") - c.flags.StringVar(&c.flagFilter, "filter", "", - "Specifies the expression used to filter the services on the Consul node that will be deregistered. "+ - "The syntax for this filter is the same as the syntax used in the List Services for Node API in the Consul catalog.") c.flags.Var((*flags.AppendSliceValue)(&c.flagAllowK8sNamespacesList), "allow-k8s-namespace", "K8s namespaces to explicitly allow. May be specified multiple times.") @@ -279,19 +268,6 @@ func (c *Command) Run(args []string) int { } c.ready = true - if c.flagPurgeK8SServicesFromNode != "" { - consulClient, err := consul.NewClientFromConnMgr(consulConfig, c.connMgr) - if err != nil { - c.UI.Error(fmt.Sprintf("unable to instantiate consul client: %s", err)) - return 1 - } - if err := c.removeAllK8SServicesFromConsulNode(consulClient); err != nil { - c.UI.Error(fmt.Sprintf("unable to remove all K8S services: %s", err)) - return 1 - } - return 0 - } - // Convert allow/deny lists to sets allowSet := flags.ToSet(c.flagAllowK8sNamespacesList) denySet := flags.ToSet(c.flagDenyK8sNamespacesList) @@ -460,70 +436,6 @@ func (c *Command) Run(args []string) int { } } -// remove all k8s services from Consul. -func (c *Command) removeAllK8SServicesFromConsulNode(consulClient *api.Client) error { - node, _, err := consulClient.Catalog().NodeServiceList(c.flagPurgeK8SServicesFromNode, &api.QueryOptions{Filter: c.flagFilter}) - if err != nil { - return err - } - - var firstErr error - services := node.Services - batchSize := 300 - maxRetries := 2 - retryDelay := 200 * time.Millisecond - - // Ask for user confirmation before purging services - for { - c.UI.Info(fmt.Sprintf("Are you sure you want to delete %v K8S services from %v? (y/n): ", len(services), c.flagPurgeK8SServicesFromNode)) - var input string - fmt.Scanln(&input) - if input = strings.ToLower(input); input == "y" { - break - } else if input == "n" { - return nil - } else { - c.UI.Info("Invalid input. Please enter 'y' or 'n'.") - } - } - - for i := 0; i < len(services); i += batchSize { - end := i + batchSize - if end > len(services) { - end = len(services) - } - - var eg errgroup.Group - for _, service := range services[i:end] { - s := service - eg.Go(func() error { - var b backoff.BackOff = backoff.NewConstantBackOff(retryDelay) - b = backoff.WithMaxRetries(b, uint64(maxRetries)) - return backoff.Retry(func() error { - _, err := consulClient.Catalog().Deregister(&api.CatalogDeregistration{ - Node: c.flagPurgeK8SServicesFromNode, - ServiceID: s.ID, - }, nil) - return err - }, b) - }) - } - if err := eg.Wait(); err != nil { - if firstErr == nil { - c.UI.Info("Some K8S services were not deregistered from Consul") - firstErr = err - } - } - c.UI.Info(fmt.Sprintf("Processed %v K8S services from %v", end-i, c.flagPurgeK8SServicesFromNode)) - } - - if firstErr != nil { - return firstErr - } - c.UI.Info("All K8S services were deregistered from Consul") - return nil -} - func (c *Command) handleReady(rw http.ResponseWriter, _ *http.Request) { if !c.ready { c.UI.Error("[GET /health/ready] sync catalog controller is not yet ready") @@ -553,7 +465,7 @@ func (c *Command) validateFlags() error { // For the Consul node name to be discoverable via DNS, it must contain only // dashes and alphanumeric characters. Length is also constrained. // These restrictions match those defined in Consul's agent definition. - var invalidDnsRe = regexp.MustCompile(`[^A-Za-z0-9\\-]+`) + invalidDnsRe := regexp.MustCompile(`[^A-Za-z0-9\\-]+`) const maxDNSLabelLength = 63 if invalidDnsRe.MatchString(c.flagConsulNodeName) { @@ -586,7 +498,7 @@ func (c *Command) recordMetrics() (*prometheus.PrometheusSink, error) { return &prometheus.PrometheusSink{}, err } - var counters = [][]prometheus.CounterDefinition{ + counters := [][]prometheus.CounterDefinition{ catalogtoconsul.SyncToConsulCounters, catalogtok8s.SyncToK8sCounters, } @@ -621,8 +533,9 @@ func (c *Command) authorizeMiddleware() func(http.Handler) http.Handler { } } -const synopsis = "Sync Kubernetes services and Consul services." -const help = ` +const ( + synopsis = "Sync Kubernetes services and Consul services." + help = ` Usage: consul-k8s-control-plane sync-catalog [options] Sync K8S pods, services, and more with the Consul service catalog. @@ -631,3 +544,4 @@ Usage: consul-k8s-control-plane sync-catalog [options] K8S services. ` +) diff --git a/control-plane/subcommand/sync-catalog/command_test.go b/control-plane/subcommand/sync-catalog/command_test.go index b6aa63a903..ca2aca4e37 100644 --- a/control-plane/subcommand/sync-catalog/command_test.go +++ b/control-plane/subcommand/sync-catalog/command_test.go @@ -13,7 +13,6 @@ import ( "time" "github.com/armon/go-metrics/prometheus" - "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/go-hclog" "github.com/mitchellh/cli" @@ -558,164 +557,6 @@ func TestRun_ToConsulChangingFlags(t *testing.T) { } } -// Test services could be de-registered from Consul. -func TestRemoveAllK8SServicesFromConsul(t *testing.T) { - t.Parallel() - - k8s, testClient := completeSetup(t) - consulClient := testClient.APIClient - - // Create a mock reader to simulate user input - input := "y\n" - reader, writer, err := os.Pipe() - require.NoError(t, err) - oldStdin := os.Stdin - os.Stdin = reader - defer func() { os.Stdin = oldStdin }() - - // Write the simulated user input to the mock reader - go func() { - defer writer.Close() - _, err := writer.WriteString(input) - require.NoError(t, err) - }() - - // Run the command. - ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - clientset: k8s, - logger: hclog.New(&hclog.LoggerOptions{ - Name: t.Name(), - Level: hclog.Debug, - }), - flagAllowK8sNamespacesList: []string{"*"}, - connMgr: testClient.Watcher, - } - - // create two services in k8s - _, err = k8s.CoreV1().Services("bar").Create(context.Background(), lbService("foo", "1.1.1.1"), metav1.CreateOptions{}) - require.NoError(t, err) - - _, err = k8s.CoreV1().Services("baz").Create(context.Background(), lbService("foo", "2.2.2.2"), metav1.CreateOptions{}) - require.NoError(t, err) - - longRunningChan := runCommandAsynchronously(&cmd, []string{ - "-addresses", "127.0.0.1", - "-http-port", strconv.Itoa(testClient.Cfg.HTTPPort), - "-consul-write-interval", "100ms", - "-add-k8s-namespace-suffix", - }) - defer stopCommand(t, &cmd, longRunningChan) - - // check that the two K8s services have been synced into Consul - retry.Run(t, func(r *retry.R) { - svc, _, err := consulClient.Catalog().Service("foo-bar", "k8s", nil) - require.NoError(r, err) - require.Len(r, svc, 1) - require.Equal(r, "1.1.1.1", svc[0].ServiceAddress) - svc, _, err = consulClient.Catalog().Service("foo-baz", "k8s", nil) - require.NoError(r, err) - require.Len(r, svc, 1) - require.Equal(r, "2.2.2.2", svc[0].ServiceAddress) - }) - - exitChan := runCommandAsynchronously(&cmd, []string{ - "-addresses", "127.0.0.1", - "-http-port", strconv.Itoa(testClient.Cfg.HTTPPort), - "-purge-k8s-services-from-node=k8s-sync", - }) - stopCommand(t, &cmd, exitChan) - - retry.Run(t, func(r *retry.R) { - serviceList, _, err := consulClient.Catalog().NodeServiceList("k8s-sync", &api.QueryOptions{AllowStale: false}) - require.NoError(r, err) - require.Len(r, serviceList.Services, 0) - }) -} - -// Test services could be de-registered from Consul with filter. -func TestRemoveAllK8SServicesFromConsulWithFilter(t *testing.T) { - t.Parallel() - - k8s, testClient := completeSetup(t) - consulClient := testClient.APIClient - - // Create a mock reader to simulate user input - input := "y\n" - reader, writer, err := os.Pipe() - require.NoError(t, err) - oldStdin := os.Stdin - os.Stdin = reader - defer func() { os.Stdin = oldStdin }() - - // Write the simulated user input to the mock reader - go func() { - defer writer.Close() - _, err := writer.WriteString(input) - require.NoError(t, err) - }() - - // Run the command. - ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - clientset: k8s, - logger: hclog.New(&hclog.LoggerOptions{ - Name: t.Name(), - Level: hclog.Debug, - }), - flagAllowK8sNamespacesList: []string{"*"}, - connMgr: testClient.Watcher, - } - - // create two services in k8s - _, err = k8s.CoreV1().Services("bar").Create(context.Background(), lbService("foo", "1.1.1.1"), metav1.CreateOptions{}) - require.NoError(t, err) - _, err = k8s.CoreV1().Services("baz").Create(context.Background(), lbService("foo", "2.2.2.2"), metav1.CreateOptions{}) - require.NoError(t, err) - _, err = k8s.CoreV1().Services("bat").Create(context.Background(), lbService("foo", "3.3.3.3"), metav1.CreateOptions{}) - require.NoError(t, err) - - longRunningChan := runCommandAsynchronously(&cmd, []string{ - "-addresses", "127.0.0.1", - "-http-port", strconv.Itoa(testClient.Cfg.HTTPPort), - "-consul-write-interval", "100ms", - "-add-k8s-namespace-suffix", - }) - defer stopCommand(t, &cmd, longRunningChan) - - // check that the name of the service is namespaced - retry.Run(t, func(r *retry.R) { - svc, _, err := consulClient.Catalog().Service("foo-bar", "k8s", nil) - require.NoError(r, err) - require.Len(r, svc, 1) - require.Equal(r, "1.1.1.1", svc[0].ServiceAddress) - svc, _, err = consulClient.Catalog().Service("foo-baz", "k8s", nil) - require.NoError(r, err) - require.Len(r, svc, 1) - require.Equal(r, "2.2.2.2", svc[0].ServiceAddress) - svc, _, err = consulClient.Catalog().Service("foo-bat", "k8s", nil) - require.NoError(r, err) - require.Len(r, svc, 1) - require.Equal(r, "3.3.3.3", svc[0].ServiceAddress) - }) - - exitChan := runCommandAsynchronously(&cmd, []string{ - "-addresses", "127.0.0.1", - "-http-port", strconv.Itoa(testClient.Cfg.HTTPPort), - "-purge-k8s-services-from-node=k8s-sync", - "-filter=baz in ID", - }) - stopCommand(t, &cmd, exitChan) - - retry.Run(t, func(r *retry.R) { - serviceList, _, err := consulClient.Catalog().NodeServiceList("k8s-sync", &api.QueryOptions{AllowStale: false}) - require.NoError(r, err) - require.Len(r, serviceList.Services, 2) - }) -} - // Set up test consul agent and fake kubernetes cluster client. func completeSetup(t *testing.T) (*fake.Clientset, *test.TestServerClient) { k8s := fake.NewSimpleClientset()