Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support SubscribeTopicEvents API #1088

Merged
merged 5 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion demo/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ You can run server/client demo with different component names.
It is worth noting that both server and client demo should set the same store name by param `-s`.
For example:
```shell
cd ${project_path}/demo/pubsub/server/
#################### Run pubsub demo with appcallback ####################
cd ${project_path}/demo/pubsub/appcallback/
# 1. start subscriber
go build -o subscriber
/.subscriber -s pub_subs_demo
Expand Down
2 changes: 2 additions & 0 deletions pkg/grpc/default_api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type api struct {
// app callback
AppCallbackConn *grpc.ClientConn
topicPerComponent map[string]TopicSubscriptions
streamer *streamer
// json
json jsoniter.API
}
Expand Down Expand Up @@ -148,6 +149,7 @@ func NewAPI(
secretStores: secretStores,
json: jsoniter.ConfigFastest,
}

}

func (a *api) SayHello(ctx context.Context, in *runtimev1pb.SayHelloRequest) (*runtimev1pb.SayHelloResponse, error) {
Expand Down
94 changes: 54 additions & 40 deletions pkg/grpc/default_api/api_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,56 +151,23 @@ func (a *api) getInterestedTopics() (map[string]TopicSubscriptions, error) {
}

func (a *api) publishMessageGRPC(ctx context.Context, msg *pubsub.NewMessage) error {
// 1. Unmarshal to cloudEvent model
var cloudEvent map[string]interface{}
err := a.json.Unmarshal(msg.Data, &cloudEvent)

// TODO tracing
envelope, cloudEvent, err := a.envelopeFromSubscriptionMessage(ctx, msg)

if err != nil {
log.DefaultLogger.Debugf("[runtime]error deserializing cloud events proto: %s", err)
return err
}

// 2. Drop msg if the current cloud event has expired
if pubsub.HasExpired(cloudEvent) {
log.DefaultLogger.Warnf("[runtime]dropping expired pub/sub event %v as of %v", cloudEvent[pubsub.IDField].(string), cloudEvent[pubsub.ExpirationField].(string))
if envelope == nil {
return nil
}

// 3. Convert to proto domain struct
envelope := &runtimev1pb.TopicEventRequest{
Id: cloudEvent[pubsub.IDField].(string),
Source: cloudEvent[pubsub.SourceField].(string),
DataContentType: cloudEvent[pubsub.DataContentTypeField].(string),
Type: cloudEvent[pubsub.TypeField].(string),
SpecVersion: cloudEvent[pubsub.SpecVersionField].(string),
Topic: msg.Topic,
PubsubName: msg.Metadata[Metadata_key_pubsubName],
}

// set data field
if data, ok := cloudEvent[pubsub.DataBase64Field]; ok && data != nil {
decoded, decodeErr := base64.StdEncoding.DecodeString(data.(string))
if decodeErr != nil {
log.DefaultLogger.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr)
return err
}

envelope.Data = decoded
} else if data, ok := cloudEvent[pubsub.DataField]; ok && data != nil {
envelope.Data = nil

if contenttype.IsStringContentType(envelope.DataContentType) {
envelope.Data = []byte(data.(string))
} else if contenttype.IsJSONContentType(envelope.DataContentType) {
envelope.Data, _ = a.json.Marshal(data)
}
}
// TODO tracing

// 4. Call appcallback
// Call appcallback
clientV1 := runtimev1pb.NewAppCallbackClient(a.AppCallbackConn)
res, err := clientV1.OnTopicEvent(ctx, envelope)

// 5. Check result
// Check result
return retryStrategy(err, res, cloudEvent)
}

Expand Down Expand Up @@ -246,3 +213,50 @@ func listTopicSubscriptions(client runtimev1pb.AppCallbackClient, log log.ErrorL
}
return make([]*runtimev1pb.TopicSubscription, 0)
}

func (a *api) envelopeFromSubscriptionMessage(ctx context.Context, msg *pubsub.NewMessage) (*runtimev1pb.TopicEventRequest, map[string]interface{}, error) {
// 1. Unmarshal to cloudEvent model
var cloudEvent map[string]interface{}
err := a.json.Unmarshal(msg.Data, &cloudEvent)
if err != nil {
log.DefaultLogger.Debugf("[runtime]error deserializing cloud events proto: %s", err)
return nil, cloudEvent, err
}

// 2. Drop msg if the current cloud event has expired
if pubsub.HasExpired(cloudEvent) {
log.DefaultLogger.Warnf("[runtime]dropping expired pub/sub event %v as of %v", cloudEvent[pubsub.IDField].(string), cloudEvent[pubsub.ExpirationField].(string))
return nil, cloudEvent, nil
}

// 3. Convert to proto domain struct
envelope := &runtimev1pb.TopicEventRequest{
Id: cloudEvent[pubsub.IDField].(string),
Source: cloudEvent[pubsub.SourceField].(string),
DataContentType: cloudEvent[pubsub.DataContentTypeField].(string),
Type: cloudEvent[pubsub.TypeField].(string),
SpecVersion: cloudEvent[pubsub.SpecVersionField].(string),
Topic: msg.Topic,
PubsubName: msg.Metadata[Metadata_key_pubsubName],
}

// set data field
if data, ok := cloudEvent[pubsub.DataBase64Field]; ok && data != nil {
decoded, decodeErr := base64.StdEncoding.DecodeString(data.(string))
if decodeErr != nil {
log.DefaultLogger.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr)
return nil, cloudEvent, err
}

envelope.Data = decoded
} else if data, ok := cloudEvent[pubsub.DataField]; ok && data != nil {
envelope.Data = nil

if contenttype.IsStringContentType(envelope.DataContentType) {
envelope.Data = []byte(data.(string))
} else if contenttype.IsJSONContentType(envelope.DataContentType) {
envelope.Data, _ = a.json.Marshal(data)
}
}
return envelope, cloudEvent, nil
}
Loading
Loading