diff --git a/bee_test.go b/bee_test.go index 638249d..baa3f61 100644 --- a/bee_test.go +++ b/bee_test.go @@ -37,7 +37,7 @@ import ( const hostText = ` host1 bee_host=192.168.2.141 bee_user=root bee_ssh_passwd=xxx -localhost bee_connect=grpc bee_host=127.0.0.1 bee_port=15250 bee_platform=darwin bee_arch=arm64 bee_home=/tmp/bee +localhost bee_connect=grpc bee_host=127.0.0.1 bee_port=15250 bee_platform=linux bee_arch=amd64 bee_home=/tmp/bee host2 bee_host=192.168.2.164 bee_connect=winrm bee_platform=windows bee_user=Administrator bee_home=C:\\Windows\\Temp\\bee ` @@ -45,13 +45,23 @@ func startGRPCServer(t *testing.T) { port := 15250 addr := fmt.Sprintf("localhost:%d", port) + ep := keepalive.EnforcementPolicy{ + MinTime: time.Hour * 2, + PermitWithoutStream: true, + } + kp := keepalive.ServerParameters{ - Time: 5 * time.Minute, - Timeout: 1 * time.Minute, + //MaxConnectionIdle: 30 * time.Second, + //MaxConnectionAge: 45 * time.Second, + //MaxConnectionAgeGrace: 15 * time.Second, + Time: 15 * time.Second, + Timeout: 5 * time.Second, } impl := bs.NewServer() - server := grpc.NewServer(grpc.KeepaliveParams(kp)) + server := grpc.NewServer( + grpc.KeepaliveParams(kp), grpc.KeepaliveEnforcementPolicy(ep), + ) pb.RegisterRemoteRPCServer(server, impl) ln, err := net.Listen("tcp", addr) if err != nil { @@ -120,12 +130,6 @@ func Test_Runtime(t *testing.T) { t.Fatal(err) } t.Log(string(data)) - - //data, err = rt.Execute(ctx, "localhost", "ping", options...) - //if err != nil { - // t.Fatal(err) - //} - //t.Log(string(data)) } func Test_Copy(t *testing.T) { diff --git a/executor/client/grpc/grpc.go b/executor/client/grpc/grpc.go index f0d2108..d94636e 100644 --- a/executor/client/grpc/grpc.go +++ b/executor/client/grpc/grpc.go @@ -26,13 +26,11 @@ import ( "time" "github.com/cockroachdb/errors" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/keepalive" - pb "github.com/olive-io/bee/api/rpc" "github.com/olive-io/bee/api/rpctype" "github.com/olive-io/bee/executor/client" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) const ( @@ -74,17 +72,17 @@ func (c *Client) dial(ctx context.Context) (*grpc.ClientConn, error) { ctx, cancel := context.WithTimeout(ctx, cfg.Timeout) defer cancel() - ckp := keepalive.ClientParameters{ - Time: 10 * time.Second, - Timeout: 5 * time.Second, - PermitWithoutStream: true, - } - opts := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithKeepaliveParams(ckp), } + //ckp := keepalive.ClientParameters{ + // Time: 10 * time.Second, + // Timeout: 20 * time.Second, + // PermitWithoutStream: true, + //} + //opts = append(opts, grpc.WithKeepaliveParams(ckp)) + conn, err := grpc.DialContext(ctx, cfg.Address, opts...) if err != nil { return nil, err diff --git a/module/command.go b/module/command.go index e7423b0..e89c9fb 100644 --- a/module/command.go +++ b/module/command.go @@ -211,11 +211,11 @@ var DefaultRunCommand RunE = func(ctx *RunContext, opts ...client.ExecOption) ([ shell := fmt.Sprintf("%s -import %s %s %s", repl, resolve, script, strings.Join(args, " ")) start := time.Now() cmd, err := conn.Execute(ctx, repl, options...) - lg.Debug("remote execute", zap.String("command", shell), zap.Duration("took", time.Now().Sub(start))) if err != nil { return nil, err } data, err := cmd.CombinedOutput() + lg.Debug("remote execute", zap.String("command", shell), zap.Duration("took", time.Now().Sub(start))) if err != nil { return nil, &CommandErr{Err: err, Stderr: beautify(data)} } diff --git a/process_test.go b/process_test.go index a7c76fe..6807336 100644 --- a/process_test.go +++ b/process_test.go @@ -129,14 +129,14 @@ func TestRuntime_PlayWithService(t *testing.T) { } func TestRuntime_PlayWithTracer(t *testing.T) { - sources := []string{"host1"} + sources := []string{"host1", "localhost"} rt, inventory, cancel := newRuntime(t) defer cancel() ctx := context.TODO() options := make([]bee.RunOption, 0) tracer := make(chan tracing.ITrace, 10) - options = append(options, bee.WithRunTracer(tracer)) + options = append(options, bee.WithRunTracer(tracer), bee.WithRunSync(false)) inventory.AddSources(sources...) pr := &process.Process{ @@ -148,6 +148,8 @@ func TestRuntime_PlayWithTracer(t *testing.T) { Name: "first task", Id: "t1", Action: "ping", + Hosts: []string{"localhost"}, + Args: map[string]any{"data": "timeout"}, }, }, }