Skip to content

Commit

Permalink
Create a v2 snapshot when running etcdutl migrate command
Browse files Browse the repository at this point in the history
Also added test to cover the etcdutl migrate command

Signed-off-by: Benjamin Wang <benjamin.ahrtr@gmail.com>
  • Loading branch information
ahrtr committed Jan 13, 2025
1 parent ad11af7 commit df49a51
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 29 deletions.
83 changes: 83 additions & 0 deletions etcdutl/etcdutl/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@ package etcdutl

import (
"errors"
"fmt"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/raft/v3/raftpb"
Expand Down Expand Up @@ -68,3 +73,81 @@ func getLatestV2Snapshot(lg *zap.Logger, dataDir string) (*raftpb.Snapshot, erro

return snapshot, nil
}

func createV2SnapshotFromV3Store(dataDir string, be backend.Backend) error {
var (
lg = GetLogger()

Check warning on line 79 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L77-L79

Added lines #L77 - L79 were not covered by tests

snapDir = datadir.ToSnapDir(dataDir)
walDir = datadir.ToWALDir(dataDir)
)

Check warning on line 83 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L81-L83

Added lines #L81 - L83 were not covered by tests

ci, term := schema.ReadConsistentIndex(be.ReadTx())

Check warning on line 85 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L85

Added line #L85 was not covered by tests

cl := membership.NewCluster(lg)
cl.SetBackend(schema.NewMembershipBackend(lg, be))
cl.UnsafeLoad()

Check warning on line 89 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L87-L89

Added lines #L87 - L89 were not covered by tests

latestWALSnap, err := getLatestWALSnap(lg, dataDir)
if err != nil {
return err

Check warning on line 93 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L91-L93

Added lines #L91 - L93 were not covered by tests
}

// Each time before creating the v2 snapshot, etcdserve always flush
// the backend storage (bbolt db), so the consistent index should never
// less than the Index or term of the latest snapshot.
if ci < latestWALSnap.Index || term < latestWALSnap.Term {

Check warning on line 99 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L99

Added line #L99 was not covered by tests
// This should never happen
return fmt.Errorf("consistent_index [Index: %d, Term: %d] is less than the latest snapshot [Index: %d, Term: %d]", ci, term, latestWALSnap.Index, latestWALSnap.Term)

Check warning on line 101 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L101

Added line #L101 was not covered by tests
}

voters, learners := getVotersAndLearners(cl)
confState := raftpb.ConfState{
Voters: voters,
Learners: learners,

Check warning on line 107 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L104-L107

Added lines #L104 - L107 were not covered by tests
}

// create the v2 snaspshot file
raftSnap := raftpb.Snapshot{
Data: etcdserver.GetMembershipInfoInV2Format(lg, cl),
Metadata: raftpb.SnapshotMetadata{
Index: ci,
Term: term,
ConfState: confState,
},

Check warning on line 117 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L111-L117

Added lines #L111 - L117 were not covered by tests
}
sn := snap.New(lg, snapDir)
if err = sn.SaveSnap(raftSnap); err != nil {
return err

Check warning on line 121 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L119-L121

Added lines #L119 - L121 were not covered by tests
}

// save WAL snapshot record
w, err := wal.Open(lg, walDir, latestWALSnap)
if err != nil {
return err

Check warning on line 127 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L125-L127

Added lines #L125 - L127 were not covered by tests
}
defer w.Close()

Check warning on line 129 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L129

Added line #L129 was not covered by tests
// We must read all records to locate the tail of the last valid WAL file.
if _, _, _, err = w.ReadAll(); err != nil {
return err

Check warning on line 132 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L131-L132

Added lines #L131 - L132 were not covered by tests
}

return w.SaveSnapshot(walpb.Snapshot{Index: ci, Term: term, ConfState: &confState})

Check warning on line 135 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L135

Added line #L135 was not covered by tests
}

func getVotersAndLearners(cl *membership.RaftCluster) ([]uint64, []uint64) {
var (
voters []uint64
learners []uint64
)
for _, m := range cl.Members() {
if m.IsLearner {
learners = append(learners, uint64(m.ID))
continue

Check warning on line 146 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L138-L146

Added lines #L138 - L146 were not covered by tests
}

voters = append(voters, uint64(m.ID))

Check warning on line 149 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L149

Added line #L149 was not covered by tests
}

return voters, learners

Check warning on line 152 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L152

Added line #L152 was not covered by tests
}
67 changes: 48 additions & 19 deletions etcdutl/etcdutl/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ func (o *migrateOptions) AddFlags(cmd *cobra.Command) {

func (o *migrateOptions) Config() (*migrateConfig, error) {
c := &migrateConfig{
force: o.force,
lg: GetLogger(),
force: o.force,
dataDir: o.dataDir,
lg: GetLogger(),

Check warning on line 79 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L77-L79

Added lines #L77 - L79 were not covered by tests
}
var err error
dotCount := strings.Count(o.targetVersion, ".")
Expand All @@ -90,47 +91,73 @@ func (o *migrateOptions) Config() (*migrateConfig, error) {
return nil, fmt.Errorf(`target version %q not supported. Minimal "3.5"`, storageVersionToString(c.targetVersion))
}

dbPath := datadir.ToBackendFileName(o.dataDir)
c.be = backend.NewDefaultBackend(GetLogger(), dbPath)
return c, nil

Check warning on line 94 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L94

Added line #L94 was not covered by tests
}

type migrateConfig struct {
lg *zap.Logger
be backend.Backend
targetVersion *semver.Version
walVersion schema.WALVersion
dataDir string
force bool
}

walPath := datadir.ToWALDir(o.dataDir)
walSnap, err := getLatestWALSnap(c.lg, o.dataDir)
func (c *migrateConfig) finalize() error {
walPath := datadir.ToWALDir(c.dataDir)
walSnap, err := getLatestWALSnap(c.lg, c.dataDir)

Check warning on line 108 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L106-L108

Added lines #L106 - L108 were not covered by tests
if err != nil {
return nil, fmt.Errorf("failed to get the lastest snapshot: %w", err)
return fmt.Errorf("failed to get the lastest snapshot: %w", err)

Check warning on line 110 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L110

Added line #L110 was not covered by tests
}
w, err := wal.OpenForRead(c.lg, walPath, walSnap)
if err != nil {
return nil, fmt.Errorf(`failed to open wal: %w`, err)
return fmt.Errorf(`failed to open wal: %w`, err)

Check warning on line 114 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L114

Added line #L114 was not covered by tests
}
defer w.Close()
c.walVersion, err = wal.ReadWALVersion(w)
if err != nil {
return nil, fmt.Errorf(`failed to read wal: %w`, err)
return fmt.Errorf(`failed to read wal: %w`, err)

Check warning on line 119 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L119

Added line #L119 was not covered by tests
}

return c, nil
}

type migrateConfig struct {
lg *zap.Logger
be backend.Backend
targetVersion *semver.Version
walVersion schema.WALVersion
force bool
return nil

Check warning on line 122 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L122

Added line #L122 was not covered by tests
}

func migrateCommandFunc(c *migrateConfig) error {
dbPath := datadir.ToBackendFileName(c.dataDir)
c.be = backend.NewDefaultBackend(GetLogger(), dbPath)

Check warning on line 127 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L126-L127

Added lines #L126 - L127 were not covered by tests
defer c.be.Close()

tx := c.be.BatchTx()
current, err := schema.DetectSchemaVersion(c.lg, c.be.ReadTx())
if err != nil {
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older")
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older", zap.Error(err))

Check warning on line 133 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L133

Added line #L133 was not covered by tests
return err
}
if current == *c.targetVersion {
c.lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(&current)))
return nil
}

// only generate a v2 snapshot file for downgrade case
if c.targetVersion.LessThan(current) {

Check warning on line 142 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L142

Added line #L142 was not covered by tests
// Update cluster version
be := schema.NewMembershipBackend(c.lg, c.be)
be.MustSaveClusterVersionToBackend(c.targetVersion)

Check warning on line 145 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L144-L145

Added lines #L144 - L145 were not covered by tests

// forcibly create a v2 snapshot file
// TODO: remove in 3.8
if err = createV2SnapshotFromV3Store(c.dataDir, c.be); err != nil {
c.lg.Error("Failed to create v2 snapshot file", zap.Error(err))
return err

Check warning on line 151 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L149-L151

Added lines #L149 - L151 were not covered by tests
}
c.lg.Info("Generated a v2 snapshot file")

Check warning on line 153 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L153

Added line #L153 was not covered by tests
}

if err = c.finalize(); err != nil {
c.lg.Error("Failed to finalize config", zap.Error(err))
return err

Check warning on line 158 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L156-L158

Added lines #L156 - L158 were not covered by tests
}

err = schema.Migrate(c.lg, tx, c.walVersion, *c.targetVersion)
if err != nil {
if !c.force {
Expand All @@ -139,7 +166,9 @@ func migrateCommandFunc(c *migrateConfig) error {
c.lg.Info("normal migrate failed, trying with force", zap.Error(err))
migrateForce(c.lg, tx, c.targetVersion)
}

c.be.ForceCommit()

return nil
}

Expand Down
16 changes: 11 additions & 5 deletions server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,22 +256,28 @@ func (c *RaftCluster) SetVersionChangedNotifier(n *notify.Notifier) {
c.versionChanged = n
}

func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.Lock()
defer c.Unlock()

func (c *RaftCluster) UnsafeLoad() {
if c.be != nil {
c.version = c.be.ClusterVersionFromBackend()
c.members, c.removed = c.be.MustReadMembersFromBackend()
} else {
c.version = clusterVersionFromStore(c.lg, c.v2store)
c.members, c.removed = membersFromStore(c.lg, c.v2store)
}
c.buildMembershipMetric()

if c.be != nil {
c.downgradeInfo = c.be.DowngradeInfoFromBackend()
}
}

func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.Lock()
defer c.Unlock()

c.UnsafeLoad()

c.buildMembershipMetric()

sv := semver.Must(semver.NewVersion(version.Version))
if c.downgradeInfo != nil && c.downgradeInfo.Enabled {
c.lg.Info(
Expand Down
55 changes: 50 additions & 5 deletions tests/e2e/utl_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func TestEtctlutlMigrate(t *testing.T) {

expectLogsSubString string
expectStorageVersion *semver.Version
expectTargetBinary string
}{
{
name: "Invalid target version string",
Expand Down Expand Up @@ -81,23 +82,25 @@ func TestEtctlutlMigrate(t *testing.T) {
{
name: "Migrate v3.5 to v3.5 is no-op",
clusterVersion: e2e.LastVersion,
clusterSize: 1,
targetVersion: "3.5",
clusterSize: 1,
expectLogsSubString: "storage version up-to-date\t" + `{"storage-version": "3.5"}`,
},
{
name: "Upgrade 1 member cluster from v3.5 to v3.6 should work",
clusterVersion: e2e.LastVersion,
clusterSize: 1,
targetVersion: "3.6",
clusterSize: 1,
expectStorageVersion: &version.V3_6,
expectTargetBinary: e2e.BinPath.Etcd,
},
{
name: "Upgrade 3 member cluster from v3.5 to v3.6 should work",
clusterVersion: e2e.LastVersion,
clusterSize: 3,
targetVersion: "3.6",
clusterSize: 3,
expectStorageVersion: &version.V3_6,
expectTargetBinary: e2e.BinPath.Etcd,
},
{
name: "Migrate v3.6 to v3.6 is no-op",
Expand All @@ -112,13 +115,15 @@ func TestEtctlutlMigrate(t *testing.T) {
clusterSize: 1,
expectLogsSubString: "updated storage version",
expectStorageVersion: nil, // 3.5 doesn't have the field `storageVersion`, so it returns nil.
expectTargetBinary: e2e.BinPath.EtcdLastRelease,
},
{
name: "Downgrade 3 member cluster from v3.6 to v3.5 should work",
targetVersion: "3.5",
clusterSize: 3,
expectLogsSubString: "updated storage version",
expectStorageVersion: nil, // 3.5 doesn't have the field `storageVersion`, so it returns nil.
expectTargetBinary: e2e.BinPath.EtcdLastRelease,
},
{
name: "Upgrade v3.6 to v3.7 with force should work",
Expand All @@ -141,7 +146,7 @@ func TestEtctlutlMigrate(t *testing.T) {
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithVersion(tc.clusterVersion),
e2e.WithDataDirPath(dataDirPath),
e2e.WithClusterSize(1),
e2e.WithClusterSize(tc.clusterSize),
e2e.WithKeepDataDir(true),
// Set low SnapshotCount to ensure wal snapshot is done
e2e.WithSnapshotCount(1),
Expand All @@ -163,7 +168,7 @@ func TestEtctlutlMigrate(t *testing.T) {
require.NoError(t, e2e.SpawnWithExpect(append(prefixArgs, "put", fmt.Sprintf("%d", i), "value"), expect.ExpectedResponse{Value: "OK"}))
}

t.Log("Stopping the the members")
t.Log("Stopping all the servers")
for i := 0; i < len(epc.Procs); i++ {
t.Logf("Stopping server %d: %v", i, epc.Procs[i].EndpointsGRPC())
err = epc.Procs[i].Stop()
Expand All @@ -190,6 +195,46 @@ func TestEtctlutlMigrate(t *testing.T) {
assert.Equal(t, tc.expectStorageVersion, ver)
be.Close()
}

if len(tc.expectTargetBinary) == 0 || !fileutil.Exist(tc.expectTargetBinary) {
return
}

t.Log("Start all members with new binary")
for i := 0; i < len(epc.Procs); i++ {
t.Logf("Replace binary for member %d: %v", i, epc.Procs[i].EndpointsGRPC())
member := epc.Procs[i]
member.Config().ExecPath = tc.expectTargetBinary
}
require.NoError(t, epc.Start(context.TODO()))

t.Log("Verify the versions of all members")
for i := 0; i < len(epc.Procs); i++ {
t.Logf("Verify the version of member %d: %v", i, epc.Procs[i].EndpointsGRPC())
expectedVersion := tc.expectStorageVersion
if expectedVersion == nil {
expectedVersion = &version.V3_5
}

verifyVersion(t, epc, epc.Procs[i], expectedVersion, expectedVersion)
}
})
}
}

func verifyVersion(t *testing.T, clus *e2e.EtcdProcessCluster, member e2e.EtcdProcess, expectedServerVersion, expectedClusterVersion *semver.Version) error {
var err error
expected := fmt.Sprintf(`"etcdserver":"%d.%d\..*"etcdcluster":"%d\.%d\.`, expectedServerVersion.Major, expectedServerVersion.Minor, expectedClusterVersion.Major, expectedClusterVersion.Minor)
for i := 0; i < 35; i++ {
if err = e2e.CURLGetFromMember(clus, member, e2e.CURLReq{Endpoint: "/version", Expected: expect.ExpectedResponse{Value: expected, IsRegularExpr: true}}); err != nil {
t.Logf("#%d: v3 is not ready yet (%v)", i, err)
time.Sleep(200 * time.Millisecond)
continue
}
break
}
if err != nil {
return fmt.Errorf("failed to verify version, expected %v got (%w)", expected, err)
}
return nil
}

0 comments on commit df49a51

Please sign in to comment.