Skip to content

Commit

Permalink
remove subscribeTopicEvents API demo
Browse files Browse the repository at this point in the history
Signed-off-by: “huazhongming” <crazyhzm@apache.org>
  • Loading branch information
CrazyHZM committed Nov 19, 2024
1 parent 530e88d commit 744d3d7
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 122 deletions.
17 changes: 0 additions & 17 deletions demo/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,5 @@ go build -o layotto
cd ${project_path}/demo/pubsub/client/
go build -o publisher
./publisher -s pub_subs_demo

#################### Run pubsub demo with SubscribeTopicEvents ####################
# 1. start layotto
cd ${project_path}/cmd/layotto
go build -o layotto
./layotto start -c ../../configs/config_standalone.json

cd ${project_path}/demo/pubsub/dynamic/
# 2. start subscriber
go build -o subscriber
/.subscriber -s pub_subs_demo

# 3. start publisher
cd ${project_path}/demo/pubsub/client/
go build -o publisher
./publisher -s pub_subs_demo


```
96 changes: 0 additions & 96 deletions demo/pubsub/server/dynamic/subscribe_server.go

This file was deleted.

2 changes: 0 additions & 2 deletions pkg/grpc/default_api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"errors"
"sync"
"sync/atomic"

"github.com/dapr/components-contrib/secretstores"

Expand Down Expand Up @@ -90,7 +89,6 @@ type api struct {
streamer *streamer
// json
json jsoniter.API
closed atomic.Bool
closeCh chan struct{}
wg sync.WaitGroup
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/grpc/default_api/api_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,15 @@ func (a *api) publishMessageGRPC(ctx context.Context, msg *pubsub.NewMessage) er

// TODO tracing
envelope, cloudEvent, err := a.envelopeFromSubscriptionMessage(ctx, msg)

if err != nil {
return err
}

if envelope == nil {
return nil
}

// Call appcallback
clientV1 := runtimev1pb.NewAppCallbackClient(a.AppCallbackConn)
res, err := clientV1.OnTopicEvent(ctx, envelope)
Expand Down
4 changes: 2 additions & 2 deletions pkg/grpc/default_api/api_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,13 @@ func (a *api) publishMessageForStream(ctx context.Context, msg *pubsub.NewMessag
return fmt.Errorf("no streamer subscribed to pubsub %q topic %q", pubsubName, msg.Topic)
}

envelope, cloudEvent, err := a.envelopeFromSubscriptionMessage(ctx, msg)
envelope, cloudEvent, _ := a.envelopeFromSubscriptionMessage(ctx, msg)

ch, defFn := conn.registerPublishResponse(envelope.GetId())
defer defFn()

conn.streamLock.Lock()
err = conn.stream.Send(&runtimev1pb.SubscribeTopicEventsResponse{
err := conn.stream.Send(&runtimev1pb.SubscribeTopicEventsResponse{
SubscribeTopicEventsResponseType: &runtimev1pb.SubscribeTopicEventsResponse_EventMessage{
EventMessage: envelope,
},
Expand Down
8 changes: 3 additions & 5 deletions pkg/grpc/default_api/api_subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestPublishMessageForStream(t *testing.T) {
Topic: topic,
Metadata: make(map[string]string),
}
a := NewAPI("", nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
a := NewAPI("", nil, nil, nil, make(map[string]pubsub.PubSub), nil, nil, nil, nil, nil, nil)

var apiForTest = a.(*api)

Expand All @@ -112,14 +112,12 @@ func TestPublishMessageForStream(t *testing.T) {
publishResponses: make(map[string]chan *runtimev1pb.SubscribeTopicEventsRequestProcessed),
}

err = apiForTest.SubscribeTopicEvents(stream)
assert.Nil(t, err)

_ = apiForTest.SubscribeTopicEvents(stream)
apiForTest.json = jsoniter.ConfigFastest

go func() {
time.Sleep(1 * time.Second)
ch, _ := apiForTest.streamer.subscribers["___test||layotto"].publishResponses["1"]
ch := apiForTest.streamer.subscribers["___test||layotto"].publishResponses["1"]
ch <- &runtimev1pb.SubscribeTopicEventsRequestProcessed{
Id: "1",
Status: &runtimev1pb.TopicEventResponse{
Expand Down

0 comments on commit 744d3d7

Please sign in to comment.