Skip to content

Commit

Permalink
keepalive
Browse files Browse the repository at this point in the history
  • Loading branch information
Lack30 committed Mar 25, 2024
1 parent 71dd77a commit 74e4796
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 24 deletions.
24 changes: 14 additions & 10 deletions bee_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,31 @@ import (

const hostText = `
host1 bee_host=192.168.2.141 bee_user=root bee_ssh_passwd=xxx
localhost bee_connect=grpc bee_host=127.0.0.1 bee_port=15250 bee_platform=darwin bee_arch=arm64 bee_home=/tmp/bee
localhost bee_connect=grpc bee_host=127.0.0.1 bee_port=15250 bee_platform=linux bee_arch=amd64 bee_home=/tmp/bee
host2 bee_host=192.168.2.164 bee_connect=winrm bee_platform=windows bee_user=Administrator bee_home=C:\\Windows\\Temp\\bee
`

func startGRPCServer(t *testing.T) {
port := 15250
addr := fmt.Sprintf("localhost:%d", port)

ep := keepalive.EnforcementPolicy{
MinTime: time.Hour * 2,
PermitWithoutStream: true,
}

kp := keepalive.ServerParameters{
Time: 5 * time.Minute,
Timeout: 1 * time.Minute,
//MaxConnectionIdle: 30 * time.Second,
//MaxConnectionAge: 45 * time.Second,
//MaxConnectionAgeGrace: 15 * time.Second,
Time: 15 * time.Second,
Timeout: 5 * time.Second,
}

impl := bs.NewServer()
server := grpc.NewServer(grpc.KeepaliveParams(kp))
server := grpc.NewServer(
grpc.KeepaliveParams(kp), grpc.KeepaliveEnforcementPolicy(ep),
)
pb.RegisterRemoteRPCServer(server, impl)
ln, err := net.Listen("tcp", addr)
if err != nil {
Expand Down Expand Up @@ -120,12 +130,6 @@ func Test_Runtime(t *testing.T) {
t.Fatal(err)
}
t.Log(string(data))

//data, err = rt.Execute(ctx, "localhost", "ping", options...)
//if err != nil {
// t.Fatal(err)
//}
//t.Log(string(data))
}

func Test_Copy(t *testing.T) {
Expand Down
20 changes: 9 additions & 11 deletions executor/client/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,11 @@ import (
"time"

"github.com/cockroachdb/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"

pb "github.com/olive-io/bee/api/rpc"
"github.com/olive-io/bee/api/rpctype"
"github.com/olive-io/bee/executor/client"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
Expand Down Expand Up @@ -74,17 +72,17 @@ func (c *Client) dial(ctx context.Context) (*grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(ctx, cfg.Timeout)
defer cancel()

ckp := keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 5 * time.Second,
PermitWithoutStream: true,
}

opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(ckp),
}

//ckp := keepalive.ClientParameters{
// Time: 10 * time.Second,
// Timeout: 20 * time.Second,
// PermitWithoutStream: true,
//}
//opts = append(opts, grpc.WithKeepaliveParams(ckp))

conn, err := grpc.DialContext(ctx, cfg.Address, opts...)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion module/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,11 @@ var DefaultRunCommand RunE = func(ctx *RunContext, opts ...client.ExecOption) ([
shell := fmt.Sprintf("%s -import %s %s %s", repl, resolve, script, strings.Join(args, " "))
start := time.Now()
cmd, err := conn.Execute(ctx, repl, options...)
lg.Debug("remote execute", zap.String("command", shell), zap.Duration("took", time.Now().Sub(start)))
if err != nil {
return nil, err
}
data, err := cmd.CombinedOutput()
lg.Debug("remote execute", zap.String("command", shell), zap.Duration("took", time.Now().Sub(start)))
if err != nil {
return nil, &CommandErr{Err: err, Stderr: beautify(data)}
}
Expand Down
6 changes: 4 additions & 2 deletions process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,14 @@ func TestRuntime_PlayWithService(t *testing.T) {
}

func TestRuntime_PlayWithTracer(t *testing.T) {
sources := []string{"host1"}
sources := []string{"host1", "localhost"}
rt, inventory, cancel := newRuntime(t)
defer cancel()

ctx := context.TODO()
options := make([]bee.RunOption, 0)
tracer := make(chan tracing.ITrace, 10)
options = append(options, bee.WithRunTracer(tracer))
options = append(options, bee.WithRunTracer(tracer), bee.WithRunSync(false))
inventory.AddSources(sources...)

pr := &process.Process{
Expand All @@ -148,6 +148,8 @@ func TestRuntime_PlayWithTracer(t *testing.T) {
Name: "first task",
Id: "t1",
Action: "ping",
Hosts: []string{"localhost"},
Args: map[string]any{"data": "timeout"},
},
},
}
Expand Down

0 comments on commit 74e4796

Please sign in to comment.