From 744d3d74d0a3e679923e0302be2ace73a15fad42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Chuazhongming=E2=80=9D?= Date: Tue, 19 Nov 2024 21:26:55 +0800 Subject: [PATCH] remove subscribeTopicEvents API demo MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “huazhongming” --- demo/pubsub/README.md | 17 ---- .../pubsub/server/dynamic/subscribe_server.go | 96 ------------------- pkg/grpc/default_api/api.go | 2 - pkg/grpc/default_api/api_pubsub.go | 9 ++ pkg/grpc/default_api/api_subscribe.go | 4 +- pkg/grpc/default_api/api_subscribe_test.go | 8 +- 6 files changed, 14 insertions(+), 122 deletions(-) delete mode 100644 demo/pubsub/server/dynamic/subscribe_server.go diff --git a/demo/pubsub/README.md b/demo/pubsub/README.md index 0bd211ee2c..54adb532dd 100644 --- a/demo/pubsub/README.md +++ b/demo/pubsub/README.md @@ -17,22 +17,5 @@ go build -o layotto cd ${project_path}/demo/pubsub/client/ go build -o publisher ./publisher -s pub_subs_demo - - #################### Run pubsub demo with SubscribeTopicEvents #################### -# 1. start layotto -cd ${project_path}/cmd/layotto -go build -o layotto -./layotto start -c ../../configs/config_standalone.json - -cd ${project_path}/demo/pubsub/dynamic/ -# 2. start subscriber -go build -o subscriber -/.subscriber -s pub_subs_demo - -# 3. start publisher - cd ${project_path}/demo/pubsub/client/ -go build -o publisher - ./publisher -s pub_subs_demo - ``` \ No newline at end of file diff --git a/demo/pubsub/server/dynamic/subscribe_server.go b/demo/pubsub/server/dynamic/subscribe_server.go deleted file mode 100644 index fe7ec628eb..0000000000 --- a/demo/pubsub/server/dynamic/subscribe_server.go +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright 2021 Layotto Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import ( - "context" - "errors" - "flag" - "log" - "time" - - "mosn.io/layotto/sdk/go-sdk/client" - - runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" -) - -var storeName string - -func init() { - flag.StringVar(&storeName, "s", "", "set `storeName`") -} - -func main() { - flag.Parse() - if storeName == "" { - panic("storeName is empty.") - } - testDynamicSub() -} - -func testDynamicSub() { - cli, err := client.NewClient() - - // Use SubscribeWithHandler API to subscribe to a topic. - stop, err := cli.SubscribeWithHandler(context.Background(), client.SubscriptionRequest{ - PubsubName: storeName, - Topic: "hello", - Metadata: nil, - }, eventHandler) - - //Use Subscribe API to subscribe to a topic. - sub, err := cli.Subscribe(context.Background(), client.SubscriptionRequest{ - PubsubName: storeName, - Topic: "topic1", - Metadata: nil, - }) - - if err != nil { - log.Fatalf("failed to subscribe to topic: %v", err) - } - - msg, err := sub.Receive() - if err != nil { - log.Fatalf("Error receiving message: %v", err) - } - log.Printf(">>[Subscribe API]Received message\n") - log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s\n", msg.PubsubName, msg.Topic, msg.Id, msg.Data) - - // Use _MUST_ always signal the result of processing the message, else the - // message will not be considered as processed and will be redelivered or - // dead lettered. - if err := msg.Success(); err != nil { - log.Fatalf("error sending message success: %v", err) - } - - time.Sleep(time.Second * 10) - - if err := errors.Join(stop(), sub.Close()); err != nil { - log.Fatal(err) - } - - if err != nil { - panic(err) - } - cli.Close() -} - -func eventHandler(request *runtimev1pb.TopicEventRequest) client.SubscriptionResponseStatus { - log.Printf(">>[SubscribeWithHandler API] Received message\n") - log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s\n", request.PubsubName, request.Topic, request.Id, request.Data) - return client.SubscriptionResponseStatusSuccess -} diff --git a/pkg/grpc/default_api/api.go b/pkg/grpc/default_api/api.go index 32957a7e1e..cb16b24a2f 100644 --- a/pkg/grpc/default_api/api.go +++ b/pkg/grpc/default_api/api.go @@ -20,7 +20,6 @@ import ( "context" "errors" "sync" - "sync/atomic" "github.com/dapr/components-contrib/secretstores" @@ -90,7 +89,6 @@ type api struct { streamer *streamer // json json jsoniter.API - closed atomic.Bool closeCh chan struct{} wg sync.WaitGroup } diff --git a/pkg/grpc/default_api/api_pubsub.go b/pkg/grpc/default_api/api_pubsub.go index 36209c9e82..43c78216e6 100644 --- a/pkg/grpc/default_api/api_pubsub.go +++ b/pkg/grpc/default_api/api_pubsub.go @@ -154,6 +154,15 @@ func (a *api) publishMessageGRPC(ctx context.Context, msg *pubsub.NewMessage) er // TODO tracing envelope, cloudEvent, err := a.envelopeFromSubscriptionMessage(ctx, msg) + + if err != nil { + return err + } + + if envelope == nil { + return nil + } + // Call appcallback clientV1 := runtimev1pb.NewAppCallbackClient(a.AppCallbackConn) res, err := clientV1.OnTopicEvent(ctx, envelope) diff --git a/pkg/grpc/default_api/api_subscribe.go b/pkg/grpc/default_api/api_subscribe.go index ddef3796cb..d8dc3704f0 100644 --- a/pkg/grpc/default_api/api_subscribe.go +++ b/pkg/grpc/default_api/api_subscribe.go @@ -210,13 +210,13 @@ func (a *api) publishMessageForStream(ctx context.Context, msg *pubsub.NewMessag return fmt.Errorf("no streamer subscribed to pubsub %q topic %q", pubsubName, msg.Topic) } - envelope, cloudEvent, err := a.envelopeFromSubscriptionMessage(ctx, msg) + envelope, cloudEvent, _ := a.envelopeFromSubscriptionMessage(ctx, msg) ch, defFn := conn.registerPublishResponse(envelope.GetId()) defer defFn() conn.streamLock.Lock() - err = conn.stream.Send(&runtimev1pb.SubscribeTopicEventsResponse{ + err := conn.stream.Send(&runtimev1pb.SubscribeTopicEventsResponse{ SubscribeTopicEventsResponseType: &runtimev1pb.SubscribeTopicEventsResponse_EventMessage{ EventMessage: envelope, }, diff --git a/pkg/grpc/default_api/api_subscribe_test.go b/pkg/grpc/default_api/api_subscribe_test.go index 2c859d0ead..48188dacd3 100644 --- a/pkg/grpc/default_api/api_subscribe_test.go +++ b/pkg/grpc/default_api/api_subscribe_test.go @@ -99,7 +99,7 @@ func TestPublishMessageForStream(t *testing.T) { Topic: topic, Metadata: make(map[string]string), } - a := NewAPI("", nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + a := NewAPI("", nil, nil, nil, make(map[string]pubsub.PubSub), nil, nil, nil, nil, nil, nil) var apiForTest = a.(*api) @@ -112,14 +112,12 @@ func TestPublishMessageForStream(t *testing.T) { publishResponses: make(map[string]chan *runtimev1pb.SubscribeTopicEventsRequestProcessed), } - err = apiForTest.SubscribeTopicEvents(stream) - assert.Nil(t, err) - + _ = apiForTest.SubscribeTopicEvents(stream) apiForTest.json = jsoniter.ConfigFastest go func() { time.Sleep(1 * time.Second) - ch, _ := apiForTest.streamer.subscribers["___test||layotto"].publishResponses["1"] + ch := apiForTest.streamer.subscribers["___test||layotto"].publishResponses["1"] ch <- &runtimev1pb.SubscribeTopicEventsRequestProcessed{ Id: "1", Status: &runtimev1pb.TopicEventResponse{