Skip to content

Commit

Permalink
CBG-4407 synchronously stop xdcr from CBS
Browse files Browse the repository at this point in the history
  • Loading branch information
torcolvin committed Jan 7, 2025
1 parent e1a3ecb commit b893746
Showing 1 changed file with 80 additions and 1 deletion.
81 changes: 80 additions & 1 deletion xdcr/cbs_xdcr.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
"fmt"
"net/http"
"net/url"
"os/exec"
"runtime"
"slices"
"strings"

"github.com/couchbase/sync_gateway/base"
Expand Down Expand Up @@ -94,6 +97,13 @@ func createCluster(ctx context.Context, bucket *base.GocbV2Bucket) error {

// newCouchbaseServerManager creates an instance of XDCR backed by Couchbase Server. This is not started until Start is called.
func newCouchbaseServerManager(ctx context.Context, fromBucket *base.GocbV2Bucket, toBucket *base.GocbV2Bucket, opts XDCROptions) (*couchbaseServerManager, error) {
nodes, err := kvNodes(ctx, fromBucket)
if err != nil {
return nil, err
}
if len(nodes) != 1 {
return nil, fmt.Errorf("To run xdcr tests, exactly one kv node is needed to grep the goxdcr.log file. To extend this to multiple nodes (found %s), all nodes would need to be grepped", nodes)
}
// there needs to be a global cluster present, this is a hostname + username + password. There can be only one per hostname, so create it lazily.
isPresent, err := isClusterPresent(ctx, fromBucket)
if err != nil {
Expand All @@ -114,6 +124,38 @@ func newCouchbaseServerManager(ctx context.Context, fromBucket *base.GocbV2Bucke
}, nil
}

// kvNodes returns the hostnames of KV nodes in the cluster.
func kvNodes(ctx context.Context, bucket *base.GocbV2Bucket) ([]string, error) {
url := "/pools/default/"
method := http.MethodGet
output, statusCode, err := bucket.MgmtRequest(ctx, method, url, "application/x-www-form-urlencoded", nil)
if err != nil {
return nil, err
}
if statusCode != http.StatusOK {
return nil, fmt.Errorf("Could not get the number bucket metadata: %s. %s %s -> (%d) %s", xdcrClusterName, method, url, statusCode, output)
}
type nodesOutput struct {
Nodes []struct {
Hostname string `json:"hostname"`
Services []string `json:"services"`
} `json:"nodes"`
}
fmt.Println(string(output))
nodes := nodesOutput{}
err = base.JSONUnmarshal(output, &nodes)
if err != nil {
return nil, err
}
var hostnames []string
for _, node := range nodes.Nodes {
if slices.Contains(node.Services, "kv") {
hostnames = append(hostnames, node.Hostname)
}
}
return hostnames, nil
}

// Start starts the XDCR replication.
func (x *couchbaseServerManager) Start(ctx context.Context) error {
if x.replicationID != "" {
Expand Down Expand Up @@ -170,8 +212,12 @@ func (x *couchbaseServerManager) Stop(ctx context.Context) error {
if statusCode != http.StatusOK {
return fmt.Errorf("Could not cancel XDCR replication: %s. %s %s -> (%d) %s", x.replicationID, method, url, statusCode, output)
}
err = x.waitForStoppedInLogFile(ctx)
if err != nil {
return err
}
x.replicationID = ""
return nil
return err
}

// Stats returns the stats of the XDCR replication.
Expand Down Expand Up @@ -230,4 +276,37 @@ outer:
return 0, errNoXDCRMetrics
}

// waitForStoppedInLogFile waits for the replication to stop by checking the log file.
func (x *couchbaseServerManager) waitForStoppedInLogFile(ctx context.Context) error {
// magic string to indicate that the replication has stopped
grepStr := fmt.Sprintf("%s status is finished shutting down", x.replicationID)
usingDocker, dockerName := base.TestUseCouchbaseServerDockerName()
cmdLine := ""
logFile := "/opt/couchbase/var/lib/couchbase/logs/goxdcr.log"
if usingDocker {
cmdLine = fmt.Sprintf(`docker exec -t %s cat "%s" | grep "%s"`, dockerName, logFile, grepStr)
} else {
if runtime.GOOS == "darwin" {
logFile = "$HOME/Library/Application Support/Couchbase/var/lib/couchbase/logs/goxdcr.log"
}
cmdLine = fmt.Sprintf(`cat "%s" | grep "%s"`, logFile, grepStr)
}
err, _ := base.RetryLoop(ctx, "ReadLogFileUntilStopped", func() (shouldRetry bool, err error, value any) {
cmd := exec.Command("bash", "-c", cmdLine)
output, err := cmd.CombinedOutput()
if err != nil {
return true, fmt.Errorf("Failed to run %s (%w) Output: %s", cmd, err, output), nil
}
return false, nil, nil
}, base.CreateMaxDoublingSleeperFunc(10, 10, 1000))
if err != nil {
suffix := ""
if !usingDocker {
suffix = fmt.Sprintf(". If you are running in docker, you may need to set the environment variable %s=<name of the container>", base.TestEnvCouchbaseServerDockerName)
}
return fmt.Errorf("Could not find %s in %s. %w%s", grepStr, logFile, err, suffix)
}
return nil
}

var _ Manager = &couchbaseServerManager{}

0 comments on commit b893746

Please sign in to comment.