Skip to content

Commit

Permalink
Merge pull request #329 from moio/configurable_leader_election_2.1
Browse files Browse the repository at this point in the history
[Backport]: Configurable leader election 2.1
  • Loading branch information
rmweir authored Oct 23, 2023
2 parents e168394 + 63b9a5f commit 060ae2b
Show file tree
Hide file tree
Showing 2 changed files with 237 additions and 35 deletions.
116 changes: 81 additions & 35 deletions pkg/leader/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package leader

import (
"context"
"fmt"
"os"
"time"

Expand All @@ -13,6 +14,18 @@ import (

type Callback func(cb context.Context)

const devModeEnvKey = "CATTLE_DEV_MODE"
const leaseDurationEnvKey = "CATTLE_ELECTION_LEASE_DURATION"
const renewDeadlineEnvKey = "CATTLE_ELECTION_RENEW_DEADLINE"
const retryPeriodEnvKey = "CATTLE_ELECTION_RETRY_PERIOD"

const defaultLeaseDuration = 45 * time.Second
const defaultRenewDeadline = 30 * time.Second
const defaultRetryPeriod = 2 * time.Second

const developmentLeaseDuration = 45 * time.Hour
const developmentRenewDeadline = 30 * time.Hour

func RunOrDie(ctx context.Context, namespace, name string, client kubernetes.Interface, cb Callback) {
if namespace == "" {
namespace = "kube-system"
Expand Down Expand Up @@ -43,42 +56,75 @@ func run(ctx context.Context, namespace, name string, client kubernetes.Interfac
logrus.Fatalf("error creating leader lock for %s: %v", name, err)
}

t := time.Second
if dl := os.Getenv("CATTLE_DEV_MODE"); dl != "" {
t = time.Hour
cbs := leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
go cb(ctx)
},
OnStoppedLeading: func() {
select {
case <-ctx.Done():
// The context has been canceled or is otherwise complete.
// This is a request to terminate. Exit 0.
// Exiting cleanly is useful when the context is canceled
// so that Kubernetes doesn't record it exiting in error
// when the exit was requested. For example, the wrangler-cli
// package sets up a context that cancels when SIGTERM is
// sent in. If a node is shut down this is the type of signal
// sent. In that case you want the 0 exit code to mark it as
// complete so that everything comes back up correctly after
// a restart.
// The pattern found here can be found inside the kube-scheduler.
logrus.Info("requested to terminate, exiting")
os.Exit(0)
default:
logrus.Fatalf("leaderelection lost for %s", name)
}
},
}

leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: 45 * t,
RenewDeadline: 30 * t,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
go cb(ctx)
},
OnStoppedLeading: func() {
select {
case <-ctx.Done():
// The context has been canceled or is otherwise complete.
// This is a request to terminate. Exit 0.
// Exiting cleanly is useful when the context is canceled
// so that Kubernetes doesn't record it exiting in error
// when the exit was requested. For example, the wrangler-cli
// package sets up a context that cancels when SIGTERM is
// sent in. If a node is shut down this is the type of signal
// sent. In that case you want the 0 exit code to mark it as
// complete so that everything comes back up correctly after
// a restart.
// The pattern found here can be found inside the kube-scheduler.
logrus.Info("requested to terminate, exiting")
os.Exit(0)
default:
logrus.Fatalf("leaderelection lost for %s", name)
}
},
},
ReleaseOnCancel: true,
})
config, err := computeConfig(rl, cbs)
if err != nil {
return err
}

leaderelection.RunOrDie(ctx, *config)
panic("unreachable")
}

func computeConfig(rl resourcelock.Interface, cbs leaderelection.LeaderCallbacks) (*leaderelection.LeaderElectionConfig, error) {
leaseDuration := defaultLeaseDuration
renewDeadline := defaultRenewDeadline
retryPeriod := defaultRetryPeriod
var err error
if d := os.Getenv(devModeEnvKey); d != "" {
leaseDuration = developmentLeaseDuration
renewDeadline = developmentRenewDeadline
}
if d := os.Getenv(leaseDurationEnvKey); d != "" {
leaseDuration, err = time.ParseDuration(d)
if err != nil {
return nil, fmt.Errorf("%s value [%s] is not a valid duration: %w", leaseDurationEnvKey, d, err)
}
}
if d := os.Getenv(renewDeadlineEnvKey); d != "" {
renewDeadline, err = time.ParseDuration(d)
if err != nil {
return nil, fmt.Errorf("%s value [%s] is not a valid duration: %w", renewDeadlineEnvKey, d, err)
}
}
if d := os.Getenv(retryPeriodEnvKey); d != "" {
retryPeriod, err = time.ParseDuration(d)
if err != nil {
return nil, fmt.Errorf("%s value [%s] is not a valid duration: %w", retryPeriodEnvKey, d, err)
}
}

return &leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
Callbacks: cbs,
ReleaseOnCancel: true,
}, nil
}
156 changes: 156 additions & 0 deletions pkg/leader/leader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package leader

import (
"os"
"reflect"
"testing"
"time"

"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func Test_computeConfig(t *testing.T) {
type args struct {
rl resourcelock.Interface
cbs leaderelection.LeaderCallbacks
}
type env struct {
key string
value string
}
tests := []struct {
name string
args args
envs []env
want *leaderelection.LeaderElectionConfig
wantErr bool
}{
{
name: "all defaults",
args: args{
rl: nil,
cbs: leaderelection.LeaderCallbacks{},
},
envs: []env{},
want: &leaderelection.LeaderElectionConfig{
Lock: nil,
LeaseDuration: defaultLeaseDuration,
RenewDeadline: defaultRenewDeadline,
RetryPeriod: defaultRetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{},
ReleaseOnCancel: true,
},
wantErr: false,
},
{
name: "dev mode",
args: args{
rl: nil,
cbs: leaderelection.LeaderCallbacks{},
},
envs: []env{
{key: devModeEnvKey, value: "true"},
},
want: &leaderelection.LeaderElectionConfig{
Lock: nil,
LeaseDuration: developmentLeaseDuration,
RenewDeadline: developmentRenewDeadline,
RetryPeriod: defaultRetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{},
ReleaseOnCancel: true,
},
wantErr: false,
},
{
name: "all overridden",
args: args{
rl: nil,
cbs: leaderelection.LeaderCallbacks{},
},
envs: []env{
{key: devModeEnvKey, value: "true"},
{key: leaseDurationEnvKey, value: "1s"},
{key: renewDeadlineEnvKey, value: "2s"},
{key: retryPeriodEnvKey, value: "3m"},
},
want: &leaderelection.LeaderElectionConfig{
Lock: nil,
LeaseDuration: time.Second,
RenewDeadline: 2 * time.Second,
RetryPeriod: 3 * time.Minute,
Callbacks: leaderelection.LeaderCallbacks{},
ReleaseOnCancel: true,
},
wantErr: false,
},
{
name: "unparseable lease duration",
args: args{
rl: nil,
cbs: leaderelection.LeaderCallbacks{},
},
envs: []env{
{key: leaseDurationEnvKey, value: "bomb"},
{key: renewDeadlineEnvKey, value: "2s"},
{key: retryPeriodEnvKey, value: "3m"},
},
want: nil,
wantErr: true,
},
{
name: "unparseable renew deadline",
args: args{
rl: nil,
cbs: leaderelection.LeaderCallbacks{},
},
envs: []env{
{key: leaseDurationEnvKey, value: "1s"},
{key: renewDeadlineEnvKey, value: "bomb"},
{key: retryPeriodEnvKey, value: "3m"},
},
want: nil,
wantErr: true,
},
{
name: "unparseable retry period",
args: args{
rl: nil,
cbs: leaderelection.LeaderCallbacks{},
},
envs: []env{
{key: leaseDurationEnvKey, value: "1s"},
{key: renewDeadlineEnvKey, value: "2s"},
{key: retryPeriodEnvKey, value: "bomb"},
},
want: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
for _, e := range []string{leaseDurationEnvKey, renewDeadlineEnvKey, retryPeriodEnvKey} {
err := os.Unsetenv(e)
if err != nil {
t.Errorf("could not Unsetenv: %v", err)
return
}
}
for _, e := range tt.envs {
err := os.Setenv(e.key, e.value)
if err != nil {
t.Errorf("could not SetEnv: %v", err)
return
}
}
got, err := computeConfig(tt.args.rl, tt.args.cbs)
if (err != nil) != tt.wantErr {
t.Errorf("computeConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("computeConfig() got = %v, want %v", got, tt.want)
}
})
}
}

0 comments on commit 060ae2b

Please sign in to comment.