From 51f105c8fad9b4a1cb3c796898b389a78369dc7a Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Thu, 27 Jun 2024 09:01:23 +0200 Subject: [PATCH 01/17] feat(SPV-848): webhook subscribe --- examples/Taskfile.yml | 5 +++ examples/webhooks/webhooks.go | 41 +++++++++++++++++++++++ http.go | 18 ++++++++++ webhook.go | 62 +++++++++++++++++++++++++++++++++++ 4 files changed, 126 insertions(+) create mode 100644 examples/webhooks/webhooks.go create mode 100644 webhook.go diff --git a/examples/Taskfile.yml b/examples/Taskfile.yml index b67c760..f6116bd 100644 --- a/examples/Taskfile.yml +++ b/examples/Taskfile.yml @@ -66,3 +66,8 @@ tasks: cmds: - echo "running generate_totp..." - go run ./generate_totp/generate_totp.go + webhooks: + desc: "running webhooks..." + cmds: + - echo "running webhooks..." + - go run ./webhooks/webhooks.go diff --git a/examples/webhooks/webhooks.go b/examples/webhooks/webhooks.go new file mode 100644 index 0000000..de3a041 --- /dev/null +++ b/examples/webhooks/webhooks.go @@ -0,0 +1,41 @@ +/* +Package main - send_op_return example +*/ +package main + +import ( + "context" + "fmt" + "net/http" + "time" + + walletclient "github.com/bitcoin-sv/spv-wallet-go-client" + "github.com/bitcoin-sv/spv-wallet-go-client/examples" +) + +func main() { + defer examples.HandlePanic() + + client := walletclient.NewWithAdminKey("http://localhost:3003/v1", examples.ExampleAdminKey) + wh := walletclient.NewWebhook(client, "http://localhost:5005/notification", "", "") + err := wh.Subscribe(context.Background()) + if err != nil { + panic(err) + } + + http.Handle("/notification", wh.HTTPHandler()) + + go func() { + for { + select { + case event := <-wh.Channel: + time.Sleep(100 * time.Millisecond) // simulate processing time + fmt.Println(event) + case <-context.Background().Done(): + return + } + } + }() + + http.ListenAndServe(":5005", nil) +} diff --git a/http.go b/http.go index 6c66be6..5632024 100644 --- a/http.go +++ b/http.go @@ -1128,3 +1128,21 @@ func (wc *WalletClient) SendToRecipients(ctx context.Context, recipients []*Reci return wc.RecordTransaction(ctx, hex, draft.ID, metadata) } + +func (wc *WalletClient) AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) ResponseError { + requestModel := struct { + URL string `json:"url"` + TokenHeader string `json:"tokenHeader"` + TokenValue string `json:"tokenValue"` + }{ + URL: webhookURL, + TokenHeader: tokenHeader, + TokenValue: tokenValue, + } + rawJSON, err := json.Marshal(requestModel) + if err != nil { + return WrapError(nil) + } + err = wc.doHTTPRequest(ctx, http.MethodPost, "/admin/webhooks/subscribe", rawJSON, wc.adminXPriv, true, nil) + return WrapError(err) +} diff --git a/webhook.go b/webhook.go new file mode 100644 index 0000000..4db3b4d --- /dev/null +++ b/webhook.go @@ -0,0 +1,62 @@ +package walletclient + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "time" +) + +const ( + eventBufferLength = 100 +) + +type Event any + +type Webhook struct { + URL string + TokenHeader string + TokenValue string + Channel chan Event + + client *WalletClient +} + +func NewWebhook(client *WalletClient, url, tokenHeader, tokenValue string) *Webhook { + return &Webhook{ + URL: url, + TokenHeader: tokenHeader, + TokenValue: tokenValue, + Channel: make(chan Event, eventBufferLength), + client: client, + } +} + +func (w *Webhook) Subscribe(ctx context.Context) ResponseError { + return w.client.AdminSubscribeWebhook(ctx, w.URL, w.TokenHeader, w.TokenValue) +} + +func (w *Webhook) HTTPHandler() http.Handler { + return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + var events []Event + if err := json.NewDecoder(r.Body).Decode(&events); err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + fmt.Printf("Received: %v\n", events) + for _, event := range events { + select { + case w.Channel <- event: + // event sent + case <-r.Context().Done(): + // context cancelled + return + case <-time.After(1 * time.Second): + // timeout, most probably the channel is full + // TODO: log this + } + } + rw.WriteHeader(http.StatusOK) + }) +} From 34c6be3360c4b6f07d7ba7ff7bd51b9f9079aa2b Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Thu, 27 Jun 2024 11:51:24 +0200 Subject: [PATCH 02/17] feat(SPV-848): unsubscribe --- examples/webhooks/webhooks.go | 22 +++++++++++++++++----- http.go | 14 ++++++++++++++ webhook.go | 8 ++++++++ 3 files changed, 39 insertions(+), 5 deletions(-) diff --git a/examples/webhooks/webhooks.go b/examples/webhooks/webhooks.go index de3a041..89596d9 100644 --- a/examples/webhooks/webhooks.go +++ b/examples/webhooks/webhooks.go @@ -16,8 +16,8 @@ import ( func main() { defer examples.HandlePanic() - client := walletclient.NewWithAdminKey("http://localhost:3003/v1", examples.ExampleAdminKey) - wh := walletclient.NewWebhook(client, "http://localhost:5005/notification", "", "") + client := walletclient.NewWithAdminKey("http://localhost:3003/v1", "xprv9s21ZrQH143K2pmNeAHBzU4JHNDaFaPTbzKbBCw55ErhMDLsxDwKqcaDVV3PwmEmRZa9qUaU261iJaUx8eBiBF77zrPxTH8JGXC7LZQnsgA") + wh := walletclient.NewWebhook(client, "http://localhost:5005/notification", "Authorization", "this-is-the-token") err := wh.Subscribe(context.Background()) if err != nil { panic(err) @@ -29,13 +29,25 @@ func main() { for { select { case event := <-wh.Channel: - time.Sleep(100 * time.Millisecond) // simulate processing time - fmt.Println(event) + time.Sleep(50 * time.Millisecond) // simulate processing time + fmt.Println("Processing event:", event) case <-context.Background().Done(): return } } }() - http.ListenAndServe(":5005", nil) + go func() { + _ = http.ListenAndServe(":5005", nil) + }() + + <-time.After(30 * time.Second) + + fmt.Printf("Unsubscribing...\n") + err = wh.Unsubscribe(context.Background()) + if err != nil { + panic(err) + } + + fmt.Printf("Shutting down...\n") } diff --git a/http.go b/http.go index 5632024..fd833a3 100644 --- a/http.go +++ b/http.go @@ -1146,3 +1146,17 @@ func (wc *WalletClient) AdminSubscribeWebhook(ctx context.Context, webhookURL, t err = wc.doHTTPRequest(ctx, http.MethodPost, "/admin/webhooks/subscribe", rawJSON, wc.adminXPriv, true, nil) return WrapError(err) } + +func (wc *WalletClient) AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) ResponseError { + requestModel := struct { + URL string `json:"url"` + }{ + URL: webhookURL, + } + rawJSON, err := json.Marshal(requestModel) + if err != nil { + return WrapError(nil) + } + err = wc.doHTTPRequest(ctx, http.MethodPost, "/admin/webhooks/unsubscribe", rawJSON, wc.adminXPriv, true, nil) + return WrapError(err) +} diff --git a/webhook.go b/webhook.go index 4db3b4d..e35d8ae 100644 --- a/webhook.go +++ b/webhook.go @@ -37,8 +37,16 @@ func (w *Webhook) Subscribe(ctx context.Context) ResponseError { return w.client.AdminSubscribeWebhook(ctx, w.URL, w.TokenHeader, w.TokenValue) } +func (w *Webhook) Unsubscribe(ctx context.Context) ResponseError { + return w.client.AdminUnsubscribeWebhook(ctx, w.URL) +} + func (w *Webhook) HTTPHandler() http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + if w.TokenHeader != "" && r.Header.Get(w.TokenHeader) != w.TokenValue { + http.Error(rw, "Unauthorized", http.StatusUnauthorized) + return + } var events []Event if err := json.NewDecoder(r.Body).Decode(&events); err != nil { http.Error(rw, err.Error(), http.StatusBadRequest) From 2268fe4922c9dc0e66026172ca10a41eef1e3baf Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Thu, 27 Jun 2024 13:30:04 +0200 Subject: [PATCH 03/17] feat(SPV-848): event type --- notifications_dispatcher.go | 89 +++++++++++++++++++++++++++++++++++++ webhook.go | 8 ++-- 2 files changed, 92 insertions(+), 5 deletions(-) create mode 100644 notifications_dispatcher.go diff --git a/notifications_dispatcher.go b/notifications_dispatcher.go new file mode 100644 index 0000000..22cd0e1 --- /dev/null +++ b/notifications_dispatcher.go @@ -0,0 +1,89 @@ +package walletclient + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/pkg/errors" +) + +type Handlers struct { + GeneralPurposeEvent []func(*GeneralPurposeEvent) +} + +type NotificationsDispatcher struct { + ctx context.Context + input chan *RawEvent + handlers Handlers +} + +func NewNotificationsDispatcher(ctx context.Context, inputChannel chan *RawEvent) *NotificationsDispatcher { + obj := &NotificationsDispatcher{ + ctx: ctx, + input: inputChannel, + } + + return obj +} + +func (nd *NotificationsDispatcher) process() { + for { + select { + case event := <-nd.input: + switch event.Type { + case "general-purpose-event": + content, err := GetEventContent[GeneralPurposeEvent](event) + if err != nil { + fmt.Println("Error getting event content") + continue + } + for _, handler := range nd.handlers.GeneralPurposeEvent { + handler(content) + } + } + case <-nd.ctx.Done(): + return + } + } +} + +////////////////////// BELOW it should be imported from spv-wallet models + +// RawEvent - event type +type RawEvent struct { + Type string `json:"type"` + Content json.RawMessage `json:"content"` +} + +type EventContent interface { + GetType() string +} + +type GeneralPurposeEvent struct { + Value string +} + +func (GeneralPurposeEvent) GetType() string { + return "general-purpose-event" +} + +func GetEventContent[modelType EventContent](raw *RawEvent) (*modelType, error) { + model := *new(modelType) + if raw.Type != model.GetType() { + return nil, fmt.Errorf("Wrong type") + } + + if err := json.Unmarshal(raw.Content, &model); err != nil { + return nil, errors.Wrap(err, "Cannot unmarshall the content json") + } + return &model, nil +} + +func NewRawEvent(namedEvent EventContent) *RawEvent { + asJson, _ := json.Marshal(namedEvent) + return &RawEvent{ + Type: namedEvent.GetType(), + Content: asJson, + } +} diff --git a/webhook.go b/webhook.go index e35d8ae..95c5f46 100644 --- a/webhook.go +++ b/webhook.go @@ -12,13 +12,11 @@ const ( eventBufferLength = 100 ) -type Event any - type Webhook struct { URL string TokenHeader string TokenValue string - Channel chan Event + Channel chan *RawEvent client *WalletClient } @@ -28,7 +26,7 @@ func NewWebhook(client *WalletClient, url, tokenHeader, tokenValue string) *Webh URL: url, TokenHeader: tokenHeader, TokenValue: tokenValue, - Channel: make(chan Event, eventBufferLength), + Channel: make(chan *RawEvent, eventBufferLength), client: client, } } @@ -47,7 +45,7 @@ func (w *Webhook) HTTPHandler() http.Handler { http.Error(rw, "Unauthorized", http.StatusUnauthorized) return } - var events []Event + var events []*RawEvent if err := json.NewDecoder(r.Body).Decode(&events); err != nil { http.Error(rw, err.Error(), http.StatusBadRequest) return From b76f919c17ca12a1397b1ccad33b825028842d03 Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Thu, 27 Jun 2024 13:56:38 +0200 Subject: [PATCH 04/17] feat(SPV-848): notifications dispatcher --- examples/webhooks/webhooks.go | 29 ++++++++----- notifications_dispatcher.go | 77 ++++++++++++++++++----------------- 2 files changed, 57 insertions(+), 49 deletions(-) diff --git a/examples/webhooks/webhooks.go b/examples/webhooks/webhooks.go index 89596d9..bc17a60 100644 --- a/examples/webhooks/webhooks.go +++ b/examples/webhooks/webhooks.go @@ -25,17 +25,24 @@ func main() { http.Handle("/notification", wh.HTTPHandler()) - go func() { - for { - select { - case event := <-wh.Channel: - time.Sleep(50 * time.Millisecond) // simulate processing time - fmt.Println("Processing event:", event) - case <-context.Background().Done(): - return - } - } - }() + _ = walletclient.NewNotificationsDispatcher(context.Background(), wh.Channel, []walletclient.Handler{ + {Model: &walletclient.GeneralPurposeEvent{}, HandlerFunc: func(gpe *walletclient.GeneralPurposeEvent) { + time.Sleep(50 * time.Millisecond) // simulate processing time + fmt.Printf("Processing event: %s\n", gpe.Value) + }}, + }) + + // go func() { + // for { + // select { + // case event := <-wh.Channel: + // time.Sleep(50 * time.Millisecond) // simulate processing time + // fmt.Println("Processing event:", event) + // case <-context.Background().Done(): + // return + // } + // } + // }() go func() { _ = http.ListenAndServe(":5005", nil) diff --git a/notifications_dispatcher.go b/notifications_dispatcher.go index 22cd0e1..d705086 100644 --- a/notifications_dispatcher.go +++ b/notifications_dispatcher.go @@ -4,43 +4,64 @@ import ( "context" "encoding/json" "fmt" - - "github.com/pkg/errors" + "reflect" ) -type Handlers struct { - GeneralPurposeEvent []func(*GeneralPurposeEvent) +type Handler struct { + HandlerFunc any // any as function to handle the event + Model EventContent } type NotificationsDispatcher struct { ctx context.Context input chan *RawEvent - handlers Handlers + handlers map[string][]Handler } -func NewNotificationsDispatcher(ctx context.Context, inputChannel chan *RawEvent) *NotificationsDispatcher { - obj := &NotificationsDispatcher{ - ctx: ctx, - input: inputChannel, +func NewNotificationsDispatcher(ctx context.Context, inputChannel chan *RawEvent, providedHandlers []Handler) *NotificationsDispatcher { + dispatcher := &NotificationsDispatcher{ + ctx: ctx, + input: inputChannel, + handlers: make(map[string][]Handler, len(providedHandlers)), + } + + for _, handler := range providedHandlers { + dispatcher.handlers[handler.Model.GetType()] = append(dispatcher.handlers[handler.Model.GetType()], handler) } - return obj + go dispatcher.process() + + return dispatcher } func (nd *NotificationsDispatcher) process() { for { select { case event := <-nd.input: - switch event.Type { - case "general-purpose-event": - content, err := GetEventContent[GeneralPurposeEvent](event) - if err != nil { - fmt.Println("Error getting event content") + handlers, ok := nd.handlers[event.Type] + if !ok { + fmt.Printf("No handlers for %s event type", event.Type) + continue + } + for _, handler := range handlers { + modelSource := handler.Model + // copy the event to the model, use reflection + model := reflect.New(reflect.TypeOf(modelSource).Elem()).Interface() + if err := json.Unmarshal(event.Content, model); err != nil { + fmt.Println("Cannot unmarshall the content json") + continue + } + // use reflect + handlerValue := reflect.ValueOf(handler.HandlerFunc) + if handlerValue.Kind() != reflect.Func { + fmt.Println("Not a function") continue } - for _, handler := range nd.handlers.GeneralPurposeEvent { - handler(content) + if handlerValue.Type().NumIn() != 1 { + fmt.Println("Wrong number of arguments") + continue } + handlerValue.Call([]reflect.Value{reflect.ValueOf(model)}) } case <-nd.ctx.Done(): return @@ -64,26 +85,6 @@ type GeneralPurposeEvent struct { Value string } -func (GeneralPurposeEvent) GetType() string { +func (*GeneralPurposeEvent) GetType() string { return "general-purpose-event" } - -func GetEventContent[modelType EventContent](raw *RawEvent) (*modelType, error) { - model := *new(modelType) - if raw.Type != model.GetType() { - return nil, fmt.Errorf("Wrong type") - } - - if err := json.Unmarshal(raw.Content, &model); err != nil { - return nil, errors.Wrap(err, "Cannot unmarshall the content json") - } - return &model, nil -} - -func NewRawEvent(namedEvent EventContent) *RawEvent { - asJson, _ := json.Marshal(namedEvent) - return &RawEvent{ - Type: namedEvent.GetType(), - Content: asJson, - } -} From 37b0517c066646b05eae66b8325e334763efca5c Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Fri, 28 Jun 2024 13:38:21 +0200 Subject: [PATCH 05/17] feat(SPV-848): event names by reflect::Name --- .../list_transactions/list_transactions.go | 4 +- examples/webhooks/webhooks.go | 21 ++++-- notifications_dispatcher.go | 75 ++++++++++--------- 3 files changed, 57 insertions(+), 43 deletions(-) diff --git a/examples/list_transactions/list_transactions.go b/examples/list_transactions/list_transactions.go index d1d0c6f..b83e190 100644 --- a/examples/list_transactions/list_transactions.go +++ b/examples/list_transactions/list_transactions.go @@ -23,7 +23,9 @@ func main() { client := walletclient.NewWithXPriv(server, examples.ExampleXPriv) ctx := context.Background() - metadata := map[string]any{} + metadata := map[string]any{ + "note": "user-id-123", + } conditions := filter.TransactionFilter{} queryParams := filter.QueryParams{} diff --git a/examples/webhooks/webhooks.go b/examples/webhooks/webhooks.go index bc17a60..9d5101e 100644 --- a/examples/webhooks/webhooks.go +++ b/examples/webhooks/webhooks.go @@ -25,12 +25,21 @@ func main() { http.Handle("/notification", wh.HTTPHandler()) - _ = walletclient.NewNotificationsDispatcher(context.Background(), wh.Channel, []walletclient.Handler{ - {Model: &walletclient.GeneralPurposeEvent{}, HandlerFunc: func(gpe *walletclient.GeneralPurposeEvent) { - time.Sleep(50 * time.Millisecond) // simulate processing time - fmt.Printf("Processing event: %s\n", gpe.Value) - }}, - }) + d := walletclient.NewNotificationsDispatcher(context.Background(), wh.Channel) + + if err := walletclient.RegisterHandler(d, func(gpe *walletclient.NumericEvent) { + time.Sleep(50 * time.Millisecond) // simulate processing time + fmt.Printf("Processing event-numeric: %d\n", gpe.Numeric) + }); err != nil { + panic(err) + } + + if err := walletclient.RegisterHandler(d, func(gpe *walletclient.StringEvent) { + time.Sleep(50 * time.Millisecond) // simulate processing time + fmt.Printf("Processing event-string: %s\n", gpe.Value) + }); err != nil { + panic(err) + } // go func() { // for { diff --git a/notifications_dispatcher.go b/notifications_dispatcher.go index d705086..af5c233 100644 --- a/notifications_dispatcher.go +++ b/notifications_dispatcher.go @@ -5,28 +5,25 @@ import ( "encoding/json" "fmt" "reflect" + "sync" ) type Handler struct { - HandlerFunc any // any as function to handle the event - Model EventContent + Caller reflect.Value + ModelType reflect.Type } type NotificationsDispatcher struct { ctx context.Context input chan *RawEvent - handlers map[string][]Handler + handlers *sync.Map } -func NewNotificationsDispatcher(ctx context.Context, inputChannel chan *RawEvent, providedHandlers []Handler) *NotificationsDispatcher { +func NewNotificationsDispatcher(ctx context.Context, inputChannel chan *RawEvent) *NotificationsDispatcher { dispatcher := &NotificationsDispatcher{ ctx: ctx, input: inputChannel, - handlers: make(map[string][]Handler, len(providedHandlers)), - } - - for _, handler := range providedHandlers { - dispatcher.handlers[handler.Model.GetType()] = append(dispatcher.handlers[handler.Model.GetType()], handler) + handlers: &sync.Map{}, } go dispatcher.process() @@ -34,35 +31,42 @@ func NewNotificationsDispatcher(ctx context.Context, inputChannel chan *RawEvent return dispatcher } +func RegisterHandler[EventType Events](nd *NotificationsDispatcher, handlerFunction func(event *EventType)) error { + handlerValue := reflect.ValueOf(handlerFunction) + if handlerValue.Kind() != reflect.Func { + return fmt.Errorf("Not a function") + } + + modelType := handlerValue.Type().In(0) + if modelType.Kind() == reflect.Ptr { + modelType = modelType.Elem() + } + name := modelType.Name() + + nd.handlers.Store(name, Handler{ + Caller: handlerValue, + ModelType: modelType, + }) + return nil +} + func (nd *NotificationsDispatcher) process() { for { select { case event := <-nd.input: - handlers, ok := nd.handlers[event.Type] + h, ok := nd.handlers.Load(event.Type) if !ok { fmt.Printf("No handlers for %s event type", event.Type) continue } - for _, handler := range handlers { - modelSource := handler.Model - // copy the event to the model, use reflection - model := reflect.New(reflect.TypeOf(modelSource).Elem()).Interface() - if err := json.Unmarshal(event.Content, model); err != nil { - fmt.Println("Cannot unmarshall the content json") - continue - } - // use reflect - handlerValue := reflect.ValueOf(handler.HandlerFunc) - if handlerValue.Kind() != reflect.Func { - fmt.Println("Not a function") - continue - } - if handlerValue.Type().NumIn() != 1 { - fmt.Println("Wrong number of arguments") - continue - } - handlerValue.Call([]reflect.Value{reflect.ValueOf(model)}) + handler := h.(Handler) + + model := reflect.New(handler.ModelType).Interface() + if err := json.Unmarshal(event.Content, model); err != nil { + fmt.Println("Cannot unmarshall the content json") + continue } + handler.Caller.Call([]reflect.Value{reflect.ValueOf(model)}) case <-nd.ctx.Done(): return } @@ -71,20 +75,19 @@ func (nd *NotificationsDispatcher) process() { ////////////////////// BELOW it should be imported from spv-wallet models -// RawEvent - event type type RawEvent struct { Type string `json:"type"` Content json.RawMessage `json:"content"` } -type EventContent interface { - GetType() string +type StringEvent struct { + Value string } -type GeneralPurposeEvent struct { - Value string +type NumericEvent struct { + Numeric int } -func (*GeneralPurposeEvent) GetType() string { - return "general-purpose-event" +type Events interface { + StringEvent | NumericEvent } From 3538458ac6b441fe52cfa2304f741c47465cf3ab Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Fri, 28 Jun 2024 14:00:15 +0200 Subject: [PATCH 06/17] feat(SPV-848): webhook handles event dispatching on itself --- examples/webhooks/webhooks.go | 20 +----- notifications_dispatcher.go | 93 --------------------------- webhook.go | 114 +++++++++++++++++++++++++++++++--- 3 files changed, 110 insertions(+), 117 deletions(-) delete mode 100644 notifications_dispatcher.go diff --git a/examples/webhooks/webhooks.go b/examples/webhooks/webhooks.go index 9d5101e..f10f6a2 100644 --- a/examples/webhooks/webhooks.go +++ b/examples/webhooks/webhooks.go @@ -17,7 +17,7 @@ func main() { defer examples.HandlePanic() client := walletclient.NewWithAdminKey("http://localhost:3003/v1", "xprv9s21ZrQH143K2pmNeAHBzU4JHNDaFaPTbzKbBCw55ErhMDLsxDwKqcaDVV3PwmEmRZa9qUaU261iJaUx8eBiBF77zrPxTH8JGXC7LZQnsgA") - wh := walletclient.NewWebhook(client, "http://localhost:5005/notification", "Authorization", "this-is-the-token") + wh := walletclient.NewWebhook(context.Background(), client, "http://localhost:5005/notification", "Authorization", "this-is-the-token", 3) err := wh.Subscribe(context.Background()) if err != nil { panic(err) @@ -25,34 +25,20 @@ func main() { http.Handle("/notification", wh.HTTPHandler()) - d := walletclient.NewNotificationsDispatcher(context.Background(), wh.Channel) - - if err := walletclient.RegisterHandler(d, func(gpe *walletclient.NumericEvent) { + if err := walletclient.RegisterHandler(wh, func(gpe *walletclient.NumericEvent) { time.Sleep(50 * time.Millisecond) // simulate processing time fmt.Printf("Processing event-numeric: %d\n", gpe.Numeric) }); err != nil { panic(err) } - if err := walletclient.RegisterHandler(d, func(gpe *walletclient.StringEvent) { + if err := walletclient.RegisterHandler(wh, func(gpe *walletclient.StringEvent) { time.Sleep(50 * time.Millisecond) // simulate processing time fmt.Printf("Processing event-string: %s\n", gpe.Value) }); err != nil { panic(err) } - // go func() { - // for { - // select { - // case event := <-wh.Channel: - // time.Sleep(50 * time.Millisecond) // simulate processing time - // fmt.Println("Processing event:", event) - // case <-context.Background().Done(): - // return - // } - // } - // }() - go func() { _ = http.ListenAndServe(":5005", nil) }() diff --git a/notifications_dispatcher.go b/notifications_dispatcher.go deleted file mode 100644 index af5c233..0000000 --- a/notifications_dispatcher.go +++ /dev/null @@ -1,93 +0,0 @@ -package walletclient - -import ( - "context" - "encoding/json" - "fmt" - "reflect" - "sync" -) - -type Handler struct { - Caller reflect.Value - ModelType reflect.Type -} - -type NotificationsDispatcher struct { - ctx context.Context - input chan *RawEvent - handlers *sync.Map -} - -func NewNotificationsDispatcher(ctx context.Context, inputChannel chan *RawEvent) *NotificationsDispatcher { - dispatcher := &NotificationsDispatcher{ - ctx: ctx, - input: inputChannel, - handlers: &sync.Map{}, - } - - go dispatcher.process() - - return dispatcher -} - -func RegisterHandler[EventType Events](nd *NotificationsDispatcher, handlerFunction func(event *EventType)) error { - handlerValue := reflect.ValueOf(handlerFunction) - if handlerValue.Kind() != reflect.Func { - return fmt.Errorf("Not a function") - } - - modelType := handlerValue.Type().In(0) - if modelType.Kind() == reflect.Ptr { - modelType = modelType.Elem() - } - name := modelType.Name() - - nd.handlers.Store(name, Handler{ - Caller: handlerValue, - ModelType: modelType, - }) - return nil -} - -func (nd *NotificationsDispatcher) process() { - for { - select { - case event := <-nd.input: - h, ok := nd.handlers.Load(event.Type) - if !ok { - fmt.Printf("No handlers for %s event type", event.Type) - continue - } - handler := h.(Handler) - - model := reflect.New(handler.ModelType).Interface() - if err := json.Unmarshal(event.Content, model); err != nil { - fmt.Println("Cannot unmarshall the content json") - continue - } - handler.Caller.Call([]reflect.Value{reflect.ValueOf(model)}) - case <-nd.ctx.Done(): - return - } - } -} - -////////////////////// BELOW it should be imported from spv-wallet models - -type RawEvent struct { - Type string `json:"type"` - Content json.RawMessage `json:"content"` -} - -type StringEvent struct { - Value string -} - -type NumericEvent struct { - Numeric int -} - -type Events interface { - StringEvent | NumericEvent -} diff --git a/webhook.go b/webhook.go index 95c5f46..0ba7fda 100644 --- a/webhook.go +++ b/webhook.go @@ -5,6 +5,8 @@ import ( "encoding/json" "fmt" "net/http" + "reflect" + "sync" "time" ) @@ -16,19 +18,27 @@ type Webhook struct { URL string TokenHeader string TokenValue string - Channel chan *RawEvent + buffer chan *RawEvent - client *WalletClient + client *WalletClient + rootCtx context.Context + handlers *eventsMap } -func NewWebhook(client *WalletClient, url, tokenHeader, tokenValue string) *Webhook { - return &Webhook{ +func NewWebhook(ctx context.Context, client *WalletClient, url, tokenHeader, tokenValue string, processors int) *Webhook { + wh := &Webhook{ URL: url, TokenHeader: tokenHeader, TokenValue: tokenValue, - Channel: make(chan *RawEvent, eventBufferLength), + buffer: make(chan *RawEvent, eventBufferLength), client: client, + rootCtx: ctx, + handlers: newEventsMap(), } + for i := 0; i < processors; i++ { + go wh.process() + } + return wh } func (w *Webhook) Subscribe(ctx context.Context) ResponseError { @@ -53,10 +63,13 @@ func (w *Webhook) HTTPHandler() http.Handler { fmt.Printf("Received: %v\n", events) for _, event := range events { select { - case w.Channel <- event: + case w.buffer <- event: // event sent case <-r.Context().Done(): - // context cancelled + // request context cancelled + return + case <-w.rootCtx.Done(): + // root context cancelled - the whole event processing has been stopped return case <-time.After(1 * time.Second): // timeout, most probably the channel is full @@ -66,3 +79,90 @@ func (w *Webhook) HTTPHandler() http.Handler { rw.WriteHeader(http.StatusOK) }) } + +func RegisterHandler[EventType Events](nd *Webhook, handlerFunction func(event *EventType)) error { + handlerValue := reflect.ValueOf(handlerFunction) + if handlerValue.Kind() != reflect.Func { + return fmt.Errorf("Not a function") + } + + modelType := handlerValue.Type().In(0) + if modelType.Kind() == reflect.Ptr { + modelType = modelType.Elem() + } + name := modelType.Name() + + nd.handlers.store(name, &eventHandler{ + Caller: handlerValue, + ModelType: modelType, + }) + + return nil +} + +type eventHandler struct { + Caller reflect.Value + ModelType reflect.Type +} + +type eventsMap struct { + registered *sync.Map +} + +func newEventsMap() *eventsMap { + return &eventsMap{ + registered: &sync.Map{}, + } +} + +func (em *eventsMap) store(name string, handler *eventHandler) { + em.registered.Store(name, handler) +} + +func (em *eventsMap) load(name string) (*eventHandler, bool) { + h, ok := em.registered.Load(name) + if !ok { + return nil, false + } + return h.(*eventHandler), true +} + +func (nd *Webhook) process() { + for { + select { + case event := <-nd.buffer: + handler, ok := nd.handlers.load(event.Type) + if !ok { + fmt.Printf("No handlers for %s event type", event.Type) + continue + } + model := reflect.New(handler.ModelType).Interface() + if err := json.Unmarshal(event.Content, model); err != nil { + fmt.Println("Cannot unmarshall the content json") + continue + } + handler.Caller.Call([]reflect.Value{reflect.ValueOf(model)}) + case <-nd.rootCtx.Done(): + return + } + } +} + +////////////////////// BELOW it should be imported from spv-wallet models + +type RawEvent struct { + Type string `json:"type"` + Content json.RawMessage `json:"content"` +} + +type StringEvent struct { + Value string +} + +type NumericEvent struct { + Numeric int +} + +type Events interface { + StringEvent | NumericEvent +} From 10760ad40bcbe4c9d0b02169b167b002a4aa0a03 Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Fri, 28 Jun 2024 14:27:15 +0200 Subject: [PATCH 07/17] feat(SPV-848): webhook features to a separate package --- examples/webhooks/webhooks.go | 16 +++- http.go | 6 +- notifications/eventsMap.go | 25 +++++ notifications/interface.go | 8 ++ notifications/options.go | 56 ++++++++++++ notifications/registerer.go | 31 +++++++ notifications/webhook.go | 108 ++++++++++++++++++++++ webhook.go | 168 ---------------------------------- 8 files changed, 243 insertions(+), 175 deletions(-) create mode 100644 notifications/eventsMap.go create mode 100644 notifications/interface.go create mode 100644 notifications/options.go create mode 100644 notifications/registerer.go create mode 100644 notifications/webhook.go delete mode 100644 webhook.go diff --git a/examples/webhooks/webhooks.go b/examples/webhooks/webhooks.go index f10f6a2..aee9dab 100644 --- a/examples/webhooks/webhooks.go +++ b/examples/webhooks/webhooks.go @@ -11,13 +11,21 @@ import ( walletclient "github.com/bitcoin-sv/spv-wallet-go-client" "github.com/bitcoin-sv/spv-wallet-go-client/examples" + "github.com/bitcoin-sv/spv-wallet-go-client/notifications" ) func main() { defer examples.HandlePanic() - client := walletclient.NewWithAdminKey("http://localhost:3003/v1", "xprv9s21ZrQH143K2pmNeAHBzU4JHNDaFaPTbzKbBCw55ErhMDLsxDwKqcaDVV3PwmEmRZa9qUaU261iJaUx8eBiBF77zrPxTH8JGXC7LZQnsgA") - wh := walletclient.NewWebhook(context.Background(), client, "http://localhost:5005/notification", "Authorization", "this-is-the-token", 3) + client := walletclient.NewWithAdminKey("http://localhost:3003/v1", examples.ExampleAdminKey) + //"Authorization", "this-is-the-token", 3 + wh := notifications.NewWebhook( + context.Background(), + client, + "http://localhost:5005/notification", + notifications.WithToken("Authorization", "this-is-the-token"), + notifications.WithProcessors(3), + ) err := wh.Subscribe(context.Background()) if err != nil { panic(err) @@ -25,14 +33,14 @@ func main() { http.Handle("/notification", wh.HTTPHandler()) - if err := walletclient.RegisterHandler(wh, func(gpe *walletclient.NumericEvent) { + if err = notifications.RegisterHandler(wh, func(gpe *notifications.NumericEvent) { time.Sleep(50 * time.Millisecond) // simulate processing time fmt.Printf("Processing event-numeric: %d\n", gpe.Numeric) }); err != nil { panic(err) } - if err := walletclient.RegisterHandler(wh, func(gpe *walletclient.StringEvent) { + if err = notifications.RegisterHandler(wh, func(gpe *notifications.StringEvent) { time.Sleep(50 * time.Millisecond) // simulate processing time fmt.Printf("Processing event-string: %s\n", gpe.Value) }); err != nil { diff --git a/http.go b/http.go index fd833a3..6299512 100644 --- a/http.go +++ b/http.go @@ -1129,7 +1129,7 @@ func (wc *WalletClient) SendToRecipients(ctx context.Context, recipients []*Reci return wc.RecordTransaction(ctx, hex, draft.ID, metadata) } -func (wc *WalletClient) AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) ResponseError { +func (wc *WalletClient) AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) error { requestModel := struct { URL string `json:"url"` TokenHeader string `json:"tokenHeader"` @@ -1147,7 +1147,7 @@ func (wc *WalletClient) AdminSubscribeWebhook(ctx context.Context, webhookURL, t return WrapError(err) } -func (wc *WalletClient) AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) ResponseError { +func (wc *WalletClient) AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) error { requestModel := struct { URL string `json:"url"` }{ @@ -1158,5 +1158,5 @@ func (wc *WalletClient) AdminUnsubscribeWebhook(ctx context.Context, webhookURL return WrapError(nil) } err = wc.doHTTPRequest(ctx, http.MethodPost, "/admin/webhooks/unsubscribe", rawJSON, wc.adminXPriv, true, nil) - return WrapError(err) + return err } diff --git a/notifications/eventsMap.go b/notifications/eventsMap.go new file mode 100644 index 0000000..00e0b52 --- /dev/null +++ b/notifications/eventsMap.go @@ -0,0 +1,25 @@ +package notifications + +import "sync" + +type eventsMap struct { + registered *sync.Map +} + +func newEventsMap() *eventsMap { + return &eventsMap{ + registered: &sync.Map{}, + } +} + +func (em *eventsMap) store(name string, handler *eventHandler) { + em.registered.Store(name, handler) +} + +func (em *eventsMap) load(name string) (*eventHandler, bool) { + h, ok := em.registered.Load(name) + if !ok { + return nil, false + } + return h.(*eventHandler), true +} diff --git a/notifications/interface.go b/notifications/interface.go new file mode 100644 index 0000000..0741568 --- /dev/null +++ b/notifications/interface.go @@ -0,0 +1,8 @@ +package notifications + +import "context" + +type WebhookSubscriber interface { + AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) error + AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) error +} diff --git a/notifications/options.go b/notifications/options.go new file mode 100644 index 0000000..9f09f27 --- /dev/null +++ b/notifications/options.go @@ -0,0 +1,56 @@ +package notifications + +import "context" + +type WebhookOptions struct { + TokenHeader string + TokenValue string + BufferSize int + RootContext context.Context + Processors int +} + +func NewWebhookOptions() *WebhookOptions { + return &WebhookOptions{ + TokenHeader: "", + TokenValue: "", + BufferSize: 100, + Processors: 1, + RootContext: context.Background(), + } +} + +type WebhookOpts = func(*WebhookOptions) + +func WithToken(tokenHeader, tokenValue string) WebhookOpts { + return func(w *WebhookOptions) { + w.TokenHeader = tokenHeader + w.TokenValue = tokenValue + } +} + +func WithBufferSize(size int) WebhookOpts { + return func(w *WebhookOptions) { + w.BufferSize = size + } +} + +func WithRootContext(ctx context.Context) WebhookOpts { + return func(w *WebhookOptions) { + w.RootContext = ctx + } +} + +func WithProcessors(count int) WebhookOpts { + return func(w *WebhookOptions) { + w.Processors = count + } +} + +type Webhook struct { + URL string + options *WebhookOptions + buffer chan *RawEvent + subscriber WebhookSubscriber + handlers *eventsMap +} diff --git a/notifications/registerer.go b/notifications/registerer.go new file mode 100644 index 0000000..2c40c44 --- /dev/null +++ b/notifications/registerer.go @@ -0,0 +1,31 @@ +package notifications + +import ( + "fmt" + "reflect" +) + +type eventHandler struct { + Caller reflect.Value + ModelType reflect.Type +} + +func RegisterHandler[EventType Events](nd *Webhook, handlerFunction func(event *EventType)) error { + handlerValue := reflect.ValueOf(handlerFunction) + if handlerValue.Kind() != reflect.Func { + return fmt.Errorf("Not a function") + } + + modelType := handlerValue.Type().In(0) + if modelType.Kind() == reflect.Ptr { + modelType = modelType.Elem() + } + name := modelType.Name() + + nd.handlers.store(name, &eventHandler{ + Caller: handlerValue, + ModelType: modelType, + }) + + return nil +} diff --git a/notifications/webhook.go b/notifications/webhook.go new file mode 100644 index 0000000..85cc38b --- /dev/null +++ b/notifications/webhook.go @@ -0,0 +1,108 @@ +package notifications + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "reflect" + "time" +) + +func NewWebhook(ctx context.Context, subscriber WebhookSubscriber, url string, opts ...WebhookOpts) *Webhook { + options := NewWebhookOptions() + for _, opt := range opts { + opt(options) + } + + wh := &Webhook{ + URL: url, + options: options, + buffer: make(chan *RawEvent, options.BufferSize), + subscriber: subscriber, + handlers: newEventsMap(), + } + for i := 0; i < options.Processors; i++ { + go wh.process() + } + return wh +} + +func (w *Webhook) Subscribe(ctx context.Context) error { + return w.subscriber.AdminSubscribeWebhook(ctx, w.URL, w.options.TokenHeader, w.options.TokenValue) +} + +func (w *Webhook) Unsubscribe(ctx context.Context) error { + return w.subscriber.AdminUnsubscribeWebhook(ctx, w.URL) +} + +func (w *Webhook) HTTPHandler() http.Handler { + return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + if w.options.TokenHeader != "" && r.Header.Get(w.options.TokenHeader) != w.options.TokenValue { + http.Error(rw, "Unauthorized", http.StatusUnauthorized) + return + } + var events []*RawEvent + if err := json.NewDecoder(r.Body).Decode(&events); err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + fmt.Printf("Received: %v\n", events) + for _, event := range events { + select { + case w.buffer <- event: + // event sent + case <-r.Context().Done(): + // request context cancelled + return + case <-w.options.RootContext.Done(): + // root context cancelled - the whole event processing has been stopped + return + case <-time.After(1 * time.Second): + // timeout, most probably the channel is full + // TODO: log this + } + } + rw.WriteHeader(http.StatusOK) + }) +} + +func (nd *Webhook) process() { + for { + select { + case event := <-nd.buffer: + handler, ok := nd.handlers.load(event.Type) + if !ok { + fmt.Printf("No handlers for %s event type", event.Type) + continue + } + model := reflect.New(handler.ModelType).Interface() + if err := json.Unmarshal(event.Content, model); err != nil { + fmt.Println("Cannot unmarshall the content json") + continue + } + handler.Caller.Call([]reflect.Value{reflect.ValueOf(model)}) + case <-nd.options.RootContext.Done(): + return + } + } +} + +////////////////////// BELOW it should be imported from spv-wallet models + +type RawEvent struct { + Type string `json:"type"` + Content json.RawMessage `json:"content"` +} + +type StringEvent struct { + Value string +} + +type NumericEvent struct { + Numeric int +} + +type Events interface { + StringEvent | NumericEvent +} diff --git a/webhook.go b/webhook.go deleted file mode 100644 index 0ba7fda..0000000 --- a/webhook.go +++ /dev/null @@ -1,168 +0,0 @@ -package walletclient - -import ( - "context" - "encoding/json" - "fmt" - "net/http" - "reflect" - "sync" - "time" -) - -const ( - eventBufferLength = 100 -) - -type Webhook struct { - URL string - TokenHeader string - TokenValue string - buffer chan *RawEvent - - client *WalletClient - rootCtx context.Context - handlers *eventsMap -} - -func NewWebhook(ctx context.Context, client *WalletClient, url, tokenHeader, tokenValue string, processors int) *Webhook { - wh := &Webhook{ - URL: url, - TokenHeader: tokenHeader, - TokenValue: tokenValue, - buffer: make(chan *RawEvent, eventBufferLength), - client: client, - rootCtx: ctx, - handlers: newEventsMap(), - } - for i := 0; i < processors; i++ { - go wh.process() - } - return wh -} - -func (w *Webhook) Subscribe(ctx context.Context) ResponseError { - return w.client.AdminSubscribeWebhook(ctx, w.URL, w.TokenHeader, w.TokenValue) -} - -func (w *Webhook) Unsubscribe(ctx context.Context) ResponseError { - return w.client.AdminUnsubscribeWebhook(ctx, w.URL) -} - -func (w *Webhook) HTTPHandler() http.Handler { - return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - if w.TokenHeader != "" && r.Header.Get(w.TokenHeader) != w.TokenValue { - http.Error(rw, "Unauthorized", http.StatusUnauthorized) - return - } - var events []*RawEvent - if err := json.NewDecoder(r.Body).Decode(&events); err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - fmt.Printf("Received: %v\n", events) - for _, event := range events { - select { - case w.buffer <- event: - // event sent - case <-r.Context().Done(): - // request context cancelled - return - case <-w.rootCtx.Done(): - // root context cancelled - the whole event processing has been stopped - return - case <-time.After(1 * time.Second): - // timeout, most probably the channel is full - // TODO: log this - } - } - rw.WriteHeader(http.StatusOK) - }) -} - -func RegisterHandler[EventType Events](nd *Webhook, handlerFunction func(event *EventType)) error { - handlerValue := reflect.ValueOf(handlerFunction) - if handlerValue.Kind() != reflect.Func { - return fmt.Errorf("Not a function") - } - - modelType := handlerValue.Type().In(0) - if modelType.Kind() == reflect.Ptr { - modelType = modelType.Elem() - } - name := modelType.Name() - - nd.handlers.store(name, &eventHandler{ - Caller: handlerValue, - ModelType: modelType, - }) - - return nil -} - -type eventHandler struct { - Caller reflect.Value - ModelType reflect.Type -} - -type eventsMap struct { - registered *sync.Map -} - -func newEventsMap() *eventsMap { - return &eventsMap{ - registered: &sync.Map{}, - } -} - -func (em *eventsMap) store(name string, handler *eventHandler) { - em.registered.Store(name, handler) -} - -func (em *eventsMap) load(name string) (*eventHandler, bool) { - h, ok := em.registered.Load(name) - if !ok { - return nil, false - } - return h.(*eventHandler), true -} - -func (nd *Webhook) process() { - for { - select { - case event := <-nd.buffer: - handler, ok := nd.handlers.load(event.Type) - if !ok { - fmt.Printf("No handlers for %s event type", event.Type) - continue - } - model := reflect.New(handler.ModelType).Interface() - if err := json.Unmarshal(event.Content, model); err != nil { - fmt.Println("Cannot unmarshall the content json") - continue - } - handler.Caller.Call([]reflect.Value{reflect.ValueOf(model)}) - case <-nd.rootCtx.Done(): - return - } - } -} - -////////////////////// BELOW it should be imported from spv-wallet models - -type RawEvent struct { - Type string `json:"type"` - Content json.RawMessage `json:"content"` -} - -type StringEvent struct { - Value string -} - -type NumericEvent struct { - Numeric int -} - -type Events interface { - StringEvent | NumericEvent -} From 59c6c7204140ddc49f398bbcd0eebb82ddabbd1d Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Fri, 28 Jun 2024 14:56:15 +0200 Subject: [PATCH 08/17] feat(SPV-848): adjust example to new event models --- examples/webhooks/webhooks.go | 10 +++++----- notifications/webhook.go | 17 +++++++++++++---- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/examples/webhooks/webhooks.go b/examples/webhooks/webhooks.go index aee9dab..7f12df8 100644 --- a/examples/webhooks/webhooks.go +++ b/examples/webhooks/webhooks.go @@ -17,7 +17,7 @@ import ( func main() { defer examples.HandlePanic() - client := walletclient.NewWithAdminKey("http://localhost:3003/v1", examples.ExampleAdminKey) + client := walletclient.NewWithAdminKey("http://localhost:3003/v1", "xprv9s21ZrQH143K2pmNeAHBzU4JHNDaFaPTbzKbBCw55ErhMDLsxDwKqcaDVV3PwmEmRZa9qUaU261iJaUx8eBiBF77zrPxTH8JGXC7LZQnsgA") //"Authorization", "this-is-the-token", 3 wh := notifications.NewWebhook( context.Background(), @@ -33,16 +33,16 @@ func main() { http.Handle("/notification", wh.HTTPHandler()) - if err = notifications.RegisterHandler(wh, func(gpe *notifications.NumericEvent) { + if err = notifications.RegisterHandler(wh, func(gpe *notifications.StringEvent) { time.Sleep(50 * time.Millisecond) // simulate processing time - fmt.Printf("Processing event-numeric: %d\n", gpe.Numeric) + fmt.Printf("Processing event-string: %s\n", gpe.Value) }); err != nil { panic(err) } - if err = notifications.RegisterHandler(wh, func(gpe *notifications.StringEvent) { + if err = notifications.RegisterHandler(wh, func(gpe *notifications.TransactionEvent) { time.Sleep(50 * time.Millisecond) // simulate processing time - fmt.Printf("Processing event-string: %s\n", gpe.Value) + fmt.Printf("Processing event-transaction: XPubID: %s, TxID: %s\n", gpe.XPubID, gpe.TransactionID) }); err != nil { panic(err) } diff --git a/notifications/webhook.go b/notifications/webhook.go index 85cc38b..3574b07 100644 --- a/notifications/webhook.go +++ b/notifications/webhook.go @@ -90,19 +90,28 @@ func (nd *Webhook) process() { ////////////////////// BELOW it should be imported from spv-wallet models +// RawEvent - the base event type type RawEvent struct { Type string `json:"type"` Content json.RawMessage `json:"content"` } +// StringEvent - event with string value; can be used for generic messages and it's used for testing type StringEvent struct { - Value string + Value string `json:"value"` } -type NumericEvent struct { - Numeric int +type UserEvent struct { + XPubID string `json:"xpubId"` } +type TransactionEvent struct { + UserEvent `json:",inline"` + + TransactionID string `json:"transactionId"` +} + +// Events - interface for all supported events type Events interface { - StringEvent | NumericEvent + StringEvent | TransactionEvent } From a26b75609e24cea91ac27bb458f441c6702baa4f Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Tue, 2 Jul 2024 12:03:46 +0200 Subject: [PATCH 09/17] feat(SPV-848): use models from spv-wallet --- examples/go.mod | 4 ++-- examples/go.sum | 4 ++-- examples/webhooks/webhooks.go | 12 +++++++----- go.mod | 4 ++-- go.sum | 4 ++-- notifications/options.go | 8 ++++++-- notifications/registerer.go | 4 +++- notifications/webhook.go | 34 ++++------------------------------ 8 files changed, 28 insertions(+), 46 deletions(-) diff --git a/examples/go.mod b/examples/go.mod index 458eab3..34dd12d 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -1,12 +1,12 @@ module github.com/bitcoin-sv/spv-wallet-go-client/examples -go 1.21 +go 1.22.4 replace github.com/bitcoin-sv/spv-wallet-go-client => ../ require ( github.com/bitcoin-sv/spv-wallet-go-client v0.0.0-00010101000000-000000000000 - github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.13 + github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.14.0.20240702094836-4955eed44573 ) require ( diff --git a/examples/go.sum b/examples/go.sum index fdd302c..be29a03 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -1,5 +1,5 @@ -github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.13 h1:rBscs3Gbz0RWY03eI3Z9AwD7/MxajdJF54oy3xMqKRQ= -github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.13/go.mod h1:i3txysriHpprqYd3u97wEQsC4/jn+KHcyFOmuFYMw8M= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.14.0.20240702094836-4955eed44573 h1:lWWSZefre67rhyUistJjTH2BHZyNpczRjp2MHYibt7c= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.14.0.20240702094836-4955eed44573/go.mod h1:VsUb0ZRA6Emr8+VDEq5SbOyzwvfKb+32Lkb9dM+n20o= github.com/bitcoinschema/go-bitcoin/v2 v2.0.5 h1:Sgh5Eb746Zck/46rFDrZZEXZWyO53fMuWYhNoZa1tck= github.com/bitcoinschema/go-bitcoin/v2 v2.0.5/go.mod h1:JjO1ivfZv6vhK0uAXzyH08AAHlzNMAfnyK1Fiv9r4ZA= github.com/bitcoinsv/bsvd v0.0.0-20190609155523-4c29707f7173 h1:2yTIV9u7H0BhRDGXH5xrAwAz7XibWJtX2dNezMeNsUo= diff --git a/examples/webhooks/webhooks.go b/examples/webhooks/webhooks.go index 7f12df8..c10f17c 100644 --- a/examples/webhooks/webhooks.go +++ b/examples/webhooks/webhooks.go @@ -12,13 +12,15 @@ import ( walletclient "github.com/bitcoin-sv/spv-wallet-go-client" "github.com/bitcoin-sv/spv-wallet-go-client/examples" "github.com/bitcoin-sv/spv-wallet-go-client/notifications" + "github.com/bitcoin-sv/spv-wallet/models" ) func main() { defer examples.HandlePanic() - client := walletclient.NewWithAdminKey("http://localhost:3003/v1", "xprv9s21ZrQH143K2pmNeAHBzU4JHNDaFaPTbzKbBCw55ErhMDLsxDwKqcaDVV3PwmEmRZa9qUaU261iJaUx8eBiBF77zrPxTH8JGXC7LZQnsgA") - //"Authorization", "this-is-the-token", 3 + examples.CheckIfAdminKeyExists() + + client := walletclient.NewWithAdminKey("http://localhost:3003/v1", examples.ExampleAdminKey) wh := notifications.NewWebhook( context.Background(), client, @@ -33,16 +35,16 @@ func main() { http.Handle("/notification", wh.HTTPHandler()) - if err = notifications.RegisterHandler(wh, func(gpe *notifications.StringEvent) { + if err = notifications.RegisterHandler(wh, func(gpe *models.StringEvent) { time.Sleep(50 * time.Millisecond) // simulate processing time fmt.Printf("Processing event-string: %s\n", gpe.Value) }); err != nil { panic(err) } - if err = notifications.RegisterHandler(wh, func(gpe *notifications.TransactionEvent) { + if err = notifications.RegisterHandler(wh, func(gpe *models.TransactionEvent) { time.Sleep(50 * time.Millisecond) // simulate processing time - fmt.Printf("Processing event-transaction: XPubID: %s, TxID: %s\n", gpe.XPubID, gpe.TransactionID) + fmt.Printf("Processing event-transaction: XPubID: %s, TxID: %s, Status: %s\n", gpe.XPubID, gpe.TransactionID, gpe.Status) }); err != nil { panic(err) } diff --git a/go.mod b/go.mod index 5e1d077..f44665f 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,9 @@ module github.com/bitcoin-sv/spv-wallet-go-client -go 1.21 +go 1.22.4 require ( - github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.13 + github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.14.0.20240702094836-4955eed44573 github.com/bitcoinschema/go-bitcoin/v2 v2.0.5 github.com/libsv/go-bk v0.1.6 github.com/libsv/go-bt/v2 v2.2.5 diff --git a/go.sum b/go.sum index 5413a3d..d201127 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.13 h1:rBscs3Gbz0RWY03eI3Z9AwD7/MxajdJF54oy3xMqKRQ= -github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.13/go.mod h1:i3txysriHpprqYd3u97wEQsC4/jn+KHcyFOmuFYMw8M= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.14.0.20240702094836-4955eed44573 h1:lWWSZefre67rhyUistJjTH2BHZyNpczRjp2MHYibt7c= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.14.0.20240702094836-4955eed44573/go.mod h1:VsUb0ZRA6Emr8+VDEq5SbOyzwvfKb+32Lkb9dM+n20o= github.com/bitcoinschema/go-bitcoin/v2 v2.0.5 h1:Sgh5Eb746Zck/46rFDrZZEXZWyO53fMuWYhNoZa1tck= github.com/bitcoinschema/go-bitcoin/v2 v2.0.5/go.mod h1:JjO1ivfZv6vhK0uAXzyH08AAHlzNMAfnyK1Fiv9r4ZA= github.com/bitcoinsv/bsvd v0.0.0-20190609155523-4c29707f7173 h1:2yTIV9u7H0BhRDGXH5xrAwAz7XibWJtX2dNezMeNsUo= diff --git a/notifications/options.go b/notifications/options.go index 9f09f27..626f80d 100644 --- a/notifications/options.go +++ b/notifications/options.go @@ -1,6 +1,10 @@ package notifications -import "context" +import ( + "context" + + "github.com/bitcoin-sv/spv-wallet/models" +) type WebhookOptions struct { TokenHeader string @@ -50,7 +54,7 @@ func WithProcessors(count int) WebhookOpts { type Webhook struct { URL string options *WebhookOptions - buffer chan *RawEvent + buffer chan *models.RawEvent subscriber WebhookSubscriber handlers *eventsMap } diff --git a/notifications/registerer.go b/notifications/registerer.go index 2c40c44..8022838 100644 --- a/notifications/registerer.go +++ b/notifications/registerer.go @@ -3,6 +3,8 @@ package notifications import ( "fmt" "reflect" + + "github.com/bitcoin-sv/spv-wallet/models" ) type eventHandler struct { @@ -10,7 +12,7 @@ type eventHandler struct { ModelType reflect.Type } -func RegisterHandler[EventType Events](nd *Webhook, handlerFunction func(event *EventType)) error { +func RegisterHandler[EventType models.Events](nd *Webhook, handlerFunction func(event *EventType)) error { handlerValue := reflect.ValueOf(handlerFunction) if handlerValue.Kind() != reflect.Func { return fmt.Errorf("Not a function") diff --git a/notifications/webhook.go b/notifications/webhook.go index 3574b07..7d002fe 100644 --- a/notifications/webhook.go +++ b/notifications/webhook.go @@ -7,6 +7,8 @@ import ( "net/http" "reflect" "time" + + "github.com/bitcoin-sv/spv-wallet/models" ) func NewWebhook(ctx context.Context, subscriber WebhookSubscriber, url string, opts ...WebhookOpts) *Webhook { @@ -18,7 +20,7 @@ func NewWebhook(ctx context.Context, subscriber WebhookSubscriber, url string, o wh := &Webhook{ URL: url, options: options, - buffer: make(chan *RawEvent, options.BufferSize), + buffer: make(chan *models.RawEvent, options.BufferSize), subscriber: subscriber, handlers: newEventsMap(), } @@ -42,7 +44,7 @@ func (w *Webhook) HTTPHandler() http.Handler { http.Error(rw, "Unauthorized", http.StatusUnauthorized) return } - var events []*RawEvent + var events []*models.RawEvent if err := json.NewDecoder(r.Body).Decode(&events); err != nil { http.Error(rw, err.Error(), http.StatusBadRequest) return @@ -87,31 +89,3 @@ func (nd *Webhook) process() { } } } - -////////////////////// BELOW it should be imported from spv-wallet models - -// RawEvent - the base event type -type RawEvent struct { - Type string `json:"type"` - Content json.RawMessage `json:"content"` -} - -// StringEvent - event with string value; can be used for generic messages and it's used for testing -type StringEvent struct { - Value string `json:"value"` -} - -type UserEvent struct { - XPubID string `json:"xpubId"` -} - -type TransactionEvent struct { - UserEvent `json:",inline"` - - TransactionID string `json:"transactionId"` -} - -// Events - interface for all supported events -type Events interface { - StringEvent | TransactionEvent -} From 13386738b07182daa781b78361b501dccc02b263 Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Tue, 2 Jul 2024 13:48:57 +0200 Subject: [PATCH 10/17] feat(SPV-848): fix lint errors --- examples/webhooks/webhooks.go | 1 - notifications/interface.go | 1 + notifications/options.go | 20 +++++++++----------- notifications/registerer.go | 5 +++-- notifications/webhook.go | 33 +++++++++++++++++++++------------ 5 files changed, 34 insertions(+), 26 deletions(-) diff --git a/examples/webhooks/webhooks.go b/examples/webhooks/webhooks.go index c10f17c..2b612ce 100644 --- a/examples/webhooks/webhooks.go +++ b/examples/webhooks/webhooks.go @@ -22,7 +22,6 @@ func main() { client := walletclient.NewWithAdminKey("http://localhost:3003/v1", examples.ExampleAdminKey) wh := notifications.NewWebhook( - context.Background(), client, "http://localhost:5005/notification", notifications.WithToken("Authorization", "this-is-the-token"), diff --git a/notifications/interface.go b/notifications/interface.go index 0741568..610ea2b 100644 --- a/notifications/interface.go +++ b/notifications/interface.go @@ -2,6 +2,7 @@ package notifications import "context" +// WebhookSubscriber - interface for subscribing and unsubscribing to webhooks type WebhookSubscriber interface { AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) error AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) error diff --git a/notifications/options.go b/notifications/options.go index 626f80d..436a3ed 100644 --- a/notifications/options.go +++ b/notifications/options.go @@ -2,10 +2,10 @@ package notifications import ( "context" - - "github.com/bitcoin-sv/spv-wallet/models" + "runtime" ) +// WebhookOptions - options for the webhook type WebhookOptions struct { TokenHeader string TokenValue string @@ -14,18 +14,21 @@ type WebhookOptions struct { Processors int } +// NewWebhookOptions - creates a new webhook options func NewWebhookOptions() *WebhookOptions { return &WebhookOptions{ TokenHeader: "", TokenValue: "", BufferSize: 100, - Processors: 1, + Processors: runtime.NumCPU(), RootContext: context.Background(), } } +// WebhookOpts - functional options for the webhook type WebhookOpts = func(*WebhookOptions) +// WithToken - sets the token header and value func WithToken(tokenHeader, tokenValue string) WebhookOpts { return func(w *WebhookOptions) { w.TokenHeader = tokenHeader @@ -33,28 +36,23 @@ func WithToken(tokenHeader, tokenValue string) WebhookOpts { } } +// WithBufferSize - sets the buffer size func WithBufferSize(size int) WebhookOpts { return func(w *WebhookOptions) { w.BufferSize = size } } +// WithRootContext - sets the root context func WithRootContext(ctx context.Context) WebhookOpts { return func(w *WebhookOptions) { w.RootContext = ctx } } +// WithProcessors - sets the number of concurrent loops which will process the events func WithProcessors(count int) WebhookOpts { return func(w *WebhookOptions) { w.Processors = count } } - -type Webhook struct { - URL string - options *WebhookOptions - buffer chan *models.RawEvent - subscriber WebhookSubscriber - handlers *eventsMap -} diff --git a/notifications/registerer.go b/notifications/registerer.go index 8022838..eb6bc31 100644 --- a/notifications/registerer.go +++ b/notifications/registerer.go @@ -1,7 +1,7 @@ package notifications import ( - "fmt" + "errors" "reflect" "github.com/bitcoin-sv/spv-wallet/models" @@ -12,10 +12,11 @@ type eventHandler struct { ModelType reflect.Type } +// RegisterHandler - registers a handler for a specific event type func RegisterHandler[EventType models.Events](nd *Webhook, handlerFunction func(event *EventType)) error { handlerValue := reflect.ValueOf(handlerFunction) if handlerValue.Kind() != reflect.Func { - return fmt.Errorf("Not a function") + return errors.New("handlerFunction must be a function") } modelType := handlerValue.Type().In(0) diff --git a/notifications/webhook.go b/notifications/webhook.go index 7d002fe..0dd488d 100644 --- a/notifications/webhook.go +++ b/notifications/webhook.go @@ -3,7 +3,6 @@ package notifications import ( "context" "encoding/json" - "fmt" "net/http" "reflect" "time" @@ -11,7 +10,17 @@ import ( "github.com/bitcoin-sv/spv-wallet/models" ) -func NewWebhook(ctx context.Context, subscriber WebhookSubscriber, url string, opts ...WebhookOpts) *Webhook { +// Webhook - the webhook event receiver +type Webhook struct { + URL string + options *WebhookOptions + buffer chan *models.RawEvent + subscriber WebhookSubscriber + handlers *eventsMap +} + +// NewWebhook - creates a new webhook +func NewWebhook(subscriber WebhookSubscriber, url string, opts ...WebhookOpts) *Webhook { options := NewWebhookOptions() for _, opt := range opts { opt(options) @@ -30,14 +39,17 @@ func NewWebhook(ctx context.Context, subscriber WebhookSubscriber, url string, o return wh } +// Subscribe - sends a subscription request to the spv-wallet func (w *Webhook) Subscribe(ctx context.Context) error { return w.subscriber.AdminSubscribeWebhook(ctx, w.URL, w.options.TokenHeader, w.options.TokenValue) } +// Unsubscribe - sends an unsubscription request to the spv-wallet func (w *Webhook) Unsubscribe(ctx context.Context) error { return w.subscriber.AdminUnsubscribeWebhook(ctx, w.URL) } +// HTTPHandler - returns an http handler for the webhook; it should be registered with the http server func (w *Webhook) HTTPHandler() http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { if w.options.TokenHeader != "" && r.Header.Get(w.options.TokenHeader) != w.options.TokenValue { @@ -49,42 +61,39 @@ func (w *Webhook) HTTPHandler() http.Handler { http.Error(rw, err.Error(), http.StatusBadRequest) return } - fmt.Printf("Received: %v\n", events) + for _, event := range events { select { case w.buffer <- event: // event sent case <-r.Context().Done(): - // request context cancelled + // request context canceled return case <-w.options.RootContext.Done(): - // root context cancelled - the whole event processing has been stopped + // root context canceled - the whole event processing has been stopped return case <-time.After(1 * time.Second): // timeout, most probably the channel is full - // TODO: log this } } rw.WriteHeader(http.StatusOK) }) } -func (nd *Webhook) process() { +func (w *Webhook) process() { for { select { - case event := <-nd.buffer: - handler, ok := nd.handlers.load(event.Type) + case event := <-w.buffer: + handler, ok := w.handlers.load(event.Type) if !ok { - fmt.Printf("No handlers for %s event type", event.Type) continue } model := reflect.New(handler.ModelType).Interface() if err := json.Unmarshal(event.Content, model); err != nil { - fmt.Println("Cannot unmarshall the content json") continue } handler.Caller.Call([]reflect.Value{reflect.ValueOf(model)}) - case <-nd.options.RootContext.Done(): + case <-w.options.RootContext.Done(): return } } From c675400c43b6e2b6c585183c48c922e0705794e9 Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Thu, 4 Jul 2024 15:12:34 +0200 Subject: [PATCH 11/17] feat(SPV-848): temporary pseudoversion of models --- examples/go.mod | 2 +- examples/go.sum | 2 ++ go.mod | 2 +- go.sum | 2 ++ 4 files changed, 6 insertions(+), 2 deletions(-) diff --git a/examples/go.mod b/examples/go.mod index 7b7b0bd..c197ac0 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -6,7 +6,7 @@ replace github.com/bitcoin-sv/spv-wallet-go-client => ../ require ( github.com/bitcoin-sv/spv-wallet-go-client v0.0.0-00010101000000-000000000000 - github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15 + github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e ) require ( diff --git a/examples/go.sum b/examples/go.sum index 99f6fce..6a437d9 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -4,6 +4,8 @@ github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.14.0.20240626082725-2c073c53 github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.14.0.20240626082725-2c073c5330a6/go.mod h1:u3gnRDS3uHWZNM2qbYATTpN+mAphyozCJrYIKGwBX7k= github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15 h1:Qjp9gSe1XlBwADgDlkaIGuzqNoQwktu1DuB6tzurdQI= github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e h1:Bw8bq7YUvMSNwRNQUm8gFKakICyNk8ScBqhJ9LFr54o= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40= github.com/bitcoinschema/go-bitcoin/v2 v2.0.5 h1:Sgh5Eb746Zck/46rFDrZZEXZWyO53fMuWYhNoZa1tck= github.com/bitcoinschema/go-bitcoin/v2 v2.0.5/go.mod h1:JjO1ivfZv6vhK0uAXzyH08AAHlzNMAfnyK1Fiv9r4ZA= github.com/bitcoinsv/bsvd v0.0.0-20190609155523-4c29707f7173 h1:2yTIV9u7H0BhRDGXH5xrAwAz7XibWJtX2dNezMeNsUo= diff --git a/go.mod b/go.mod index 523c5d7..33600ec 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/bitcoin-sv/spv-wallet-go-client go 1.22.4 require ( - github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15 + github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e github.com/bitcoinschema/go-bitcoin/v2 v2.0.5 github.com/libsv/go-bk v0.1.6 github.com/libsv/go-bt/v2 v2.2.5 diff --git a/go.sum b/go.sum index bc4ae1f..787a667 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.14.0.20240626082725-2c073c53 github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.14.0.20240626082725-2c073c5330a6/go.mod h1:u3gnRDS3uHWZNM2qbYATTpN+mAphyozCJrYIKGwBX7k= github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15 h1:Qjp9gSe1XlBwADgDlkaIGuzqNoQwktu1DuB6tzurdQI= github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e h1:Bw8bq7YUvMSNwRNQUm8gFKakICyNk8ScBqhJ9LFr54o= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40= github.com/bitcoinschema/go-bitcoin/v2 v2.0.5 h1:Sgh5Eb746Zck/46rFDrZZEXZWyO53fMuWYhNoZa1tck= github.com/bitcoinschema/go-bitcoin/v2 v2.0.5/go.mod h1:JjO1ivfZv6vhK0uAXzyH08AAHlzNMAfnyK1Fiv9r4ZA= github.com/bitcoinsv/bsvd v0.0.0-20190609155523-4c29707f7173 h1:2yTIV9u7H0BhRDGXH5xrAwAz7XibWJtX2dNezMeNsUo= From 042514b00900ee61cc541921251c5f22ac8c24b5 Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Thu, 4 Jul 2024 15:25:55 +0200 Subject: [PATCH 12/17] feat(SPV-848): gracafully shutdown in webhooks example --- examples/webhooks/webhooks.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/examples/webhooks/webhooks.go b/examples/webhooks/webhooks.go index 2b612ce..b71e7b1 100644 --- a/examples/webhooks/webhooks.go +++ b/examples/webhooks/webhooks.go @@ -7,6 +7,9 @@ import ( "context" "fmt" "net/http" + "os" + "os/signal" + "syscall" "time" walletclient "github.com/bitcoin-sv/spv-wallet-go-client" @@ -48,17 +51,27 @@ func main() { panic(err) } + server := http.Server{ + Addr: ":5005", + Handler: nil, + ReadHeaderTimeout: time.Second * 10, + } go func() { - _ = http.ListenAndServe(":5005", nil) + _ = server.ListenAndServe() }() - <-time.After(30 * time.Second) + // wait for signal to shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan fmt.Printf("Unsubscribing...\n") - err = wh.Unsubscribe(context.Background()) - if err != nil { + if err = wh.Unsubscribe(context.Background()); err != nil { panic(err) } fmt.Printf("Shutting down...\n") + if err = server.Shutdown(context.Background()); err != nil { + panic(err) + } } From 87c454e43686b882ac90143e8e0298e55a4edd9c Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Thu, 4 Jul 2024 15:30:55 +0200 Subject: [PATCH 13/17] feat(SPV-848): use actual models for subscribe & unsubscribe --- http.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/http.go b/http.go index a107f65..a66add4 100644 --- a/http.go +++ b/http.go @@ -1133,33 +1133,29 @@ func (wc *WalletClient) SendToRecipients(ctx context.Context, recipients []*Reci return wc.RecordTransaction(ctx, hex, draft.ID, metadata) } +// AdminSubscribeWebhook subscribes to a webhook to receive notifications from spv-wallet func (wc *WalletClient) AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) error { - requestModel := struct { - URL string `json:"url"` - TokenHeader string `json:"tokenHeader"` - TokenValue string `json:"tokenValue"` - }{ + requestModel := models.SubscribeRequestBody{ URL: webhookURL, TokenHeader: tokenHeader, TokenValue: tokenValue, } rawJSON, err := json.Marshal(requestModel) if err != nil { - return WrapError(nil) + return WrapError(err) } err = wc.doHTTPRequest(ctx, http.MethodPost, "/admin/webhooks/subscribe", rawJSON, wc.adminXPriv, true, nil) return WrapError(err) } +// AdminUnsubscribeWebhook unsubscribes from a webhook func (wc *WalletClient) AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) error { - requestModel := struct { - URL string `json:"url"` - }{ + requestModel := models.UnsubscribeRequestBody{ URL: webhookURL, } rawJSON, err := json.Marshal(requestModel) if err != nil { - return WrapError(nil) + return WrapError(err) } err = wc.doHTTPRequest(ctx, http.MethodPost, "/admin/webhooks/unsubscribe", rawJSON, wc.adminXPriv, true, nil) return err From 216e65cb2042cbf985ba5906942dfcefd1ffc5b8 Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Tue, 9 Jul 2024 15:16:26 +0200 Subject: [PATCH 14/17] feat(SPV-848): adjust to the review --- examples/list_transactions/list_transactions.go | 4 +--- http.go | 4 ++-- notifications/registerer.go | 9 +-------- 3 files changed, 4 insertions(+), 13 deletions(-) diff --git a/examples/list_transactions/list_transactions.go b/examples/list_transactions/list_transactions.go index b83e190..d1d0c6f 100644 --- a/examples/list_transactions/list_transactions.go +++ b/examples/list_transactions/list_transactions.go @@ -23,9 +23,7 @@ func main() { client := walletclient.NewWithXPriv(server, examples.ExampleXPriv) ctx := context.Background() - metadata := map[string]any{ - "note": "user-id-123", - } + metadata := map[string]any{} conditions := filter.TransactionFilter{} queryParams := filter.QueryParams{} diff --git a/http.go b/http.go index a66add4..9b95d6f 100644 --- a/http.go +++ b/http.go @@ -1144,7 +1144,7 @@ func (wc *WalletClient) AdminSubscribeWebhook(ctx context.Context, webhookURL, t if err != nil { return WrapError(err) } - err = wc.doHTTPRequest(ctx, http.MethodPost, "/admin/webhooks/subscribe", rawJSON, wc.adminXPriv, true, nil) + err = wc.doHTTPRequest(ctx, http.MethodPost, "/admin/webhooks/subscribtion", rawJSON, wc.adminXPriv, true, nil) return WrapError(err) } @@ -1157,6 +1157,6 @@ func (wc *WalletClient) AdminUnsubscribeWebhook(ctx context.Context, webhookURL if err != nil { return WrapError(err) } - err = wc.doHTTPRequest(ctx, http.MethodPost, "/admin/webhooks/unsubscribe", rawJSON, wc.adminXPriv, true, nil) + err = wc.doHTTPRequest(ctx, http.MethodDelete, "/admin/webhooks/subscribtion", rawJSON, wc.adminXPriv, true, nil) return err } diff --git a/notifications/registerer.go b/notifications/registerer.go index eb6bc31..edaf2e9 100644 --- a/notifications/registerer.go +++ b/notifications/registerer.go @@ -1,7 +1,6 @@ package notifications import ( - "errors" "reflect" "github.com/bitcoin-sv/spv-wallet/models" @@ -15,14 +14,8 @@ type eventHandler struct { // RegisterHandler - registers a handler for a specific event type func RegisterHandler[EventType models.Events](nd *Webhook, handlerFunction func(event *EventType)) error { handlerValue := reflect.ValueOf(handlerFunction) - if handlerValue.Kind() != reflect.Func { - return errors.New("handlerFunction must be a function") - } - modelType := handlerValue.Type().In(0) - if modelType.Kind() == reflect.Ptr { - modelType = modelType.Elem() - } + modelType := handlerValue.Type().In(0).Elem() name := modelType.Name() nd.handlers.store(name, &eventHandler{ From 4f2f54bad12679d786e51982b437a1d53790f3f1 Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Tue, 9 Jul 2024 15:49:23 +0200 Subject: [PATCH 15/17] feat(SPV-848): typo --- http.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/http.go b/http.go index 9b95d6f..456a0e4 100644 --- a/http.go +++ b/http.go @@ -1144,7 +1144,7 @@ func (wc *WalletClient) AdminSubscribeWebhook(ctx context.Context, webhookURL, t if err != nil { return WrapError(err) } - err = wc.doHTTPRequest(ctx, http.MethodPost, "/admin/webhooks/subscribtion", rawJSON, wc.adminXPriv, true, nil) + err = wc.doHTTPRequest(ctx, http.MethodPost, "/admin/webhooks/subscription", rawJSON, wc.adminXPriv, true, nil) return WrapError(err) } @@ -1157,6 +1157,6 @@ func (wc *WalletClient) AdminUnsubscribeWebhook(ctx context.Context, webhookURL if err != nil { return WrapError(err) } - err = wc.doHTTPRequest(ctx, http.MethodDelete, "/admin/webhooks/subscribtion", rawJSON, wc.adminXPriv, true, nil) + err = wc.doHTTPRequest(ctx, http.MethodDelete, "/admin/webhooks/subscription", rawJSON, wc.adminXPriv, true, nil) return err } From ec1a7593675fa7f8fc6690fe8b9dc002c590d393 Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Thu, 11 Jul 2024 12:47:51 +0200 Subject: [PATCH 16/17] feat(SPV-848): webhook subscriptions (with s) --- http.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/http.go b/http.go index 456a0e4..8bf6144 100644 --- a/http.go +++ b/http.go @@ -1144,7 +1144,7 @@ func (wc *WalletClient) AdminSubscribeWebhook(ctx context.Context, webhookURL, t if err != nil { return WrapError(err) } - err = wc.doHTTPRequest(ctx, http.MethodPost, "/admin/webhooks/subscription", rawJSON, wc.adminXPriv, true, nil) + err = wc.doHTTPRequest(ctx, http.MethodPost, "/admin/webhooks/subscriptions", rawJSON, wc.adminXPriv, true, nil) return WrapError(err) } @@ -1157,6 +1157,6 @@ func (wc *WalletClient) AdminUnsubscribeWebhook(ctx context.Context, webhookURL if err != nil { return WrapError(err) } - err = wc.doHTTPRequest(ctx, http.MethodDelete, "/admin/webhooks/subscription", rawJSON, wc.adminXPriv, true, nil) + err = wc.doHTTPRequest(ctx, http.MethodDelete, "/admin/webhooks/subscriptions", rawJSON, wc.adminXPriv, true, nil) return err } From 68be00f467a7c6b632b4e290ce5841bbc2e146d1 Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Fri, 12 Jul 2024 11:01:37 +0200 Subject: [PATCH 17/17] feat(SPV-848): update go mods --- examples/go.mod | 2 +- examples/go.sum | 2 ++ go.mod | 2 +- go.sum | 2 ++ 4 files changed, 6 insertions(+), 2 deletions(-) diff --git a/examples/go.mod b/examples/go.mod index c197ac0..a012b72 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -6,7 +6,7 @@ replace github.com/bitcoin-sv/spv-wallet-go-client => ../ require ( github.com/bitcoin-sv/spv-wallet-go-client v0.0.0-00010101000000-000000000000 - github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e + github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.16 ) require ( diff --git a/examples/go.sum b/examples/go.sum index 6a437d9..da1cca2 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -6,6 +6,8 @@ github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15 h1:Qjp9gSe1XlBwADgDlkaIGu github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40= github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e h1:Bw8bq7YUvMSNwRNQUm8gFKakICyNk8ScBqhJ9LFr54o= github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.16 h1:iHLUofGb40sQ31KpVwtdjuKVs3W/vW1w8exF8Vidvfc= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.16/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40= github.com/bitcoinschema/go-bitcoin/v2 v2.0.5 h1:Sgh5Eb746Zck/46rFDrZZEXZWyO53fMuWYhNoZa1tck= github.com/bitcoinschema/go-bitcoin/v2 v2.0.5/go.mod h1:JjO1ivfZv6vhK0uAXzyH08AAHlzNMAfnyK1Fiv9r4ZA= github.com/bitcoinsv/bsvd v0.0.0-20190609155523-4c29707f7173 h1:2yTIV9u7H0BhRDGXH5xrAwAz7XibWJtX2dNezMeNsUo= diff --git a/go.mod b/go.mod index 33600ec..b7ae30a 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/bitcoin-sv/spv-wallet-go-client go 1.22.4 require ( - github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e + github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.16 github.com/bitcoinschema/go-bitcoin/v2 v2.0.5 github.com/libsv/go-bk v0.1.6 github.com/libsv/go-bt/v2 v2.2.5 diff --git a/go.sum b/go.sum index 787a667..cfe05a0 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15 h1:Qjp9gSe1XlBwADgDlkaIGu github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40= github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e h1:Bw8bq7YUvMSNwRNQUm8gFKakICyNk8ScBqhJ9LFr54o= github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.16 h1:iHLUofGb40sQ31KpVwtdjuKVs3W/vW1w8exF8Vidvfc= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.16/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40= github.com/bitcoinschema/go-bitcoin/v2 v2.0.5 h1:Sgh5Eb746Zck/46rFDrZZEXZWyO53fMuWYhNoZa1tck= github.com/bitcoinschema/go-bitcoin/v2 v2.0.5/go.mod h1:JjO1ivfZv6vhK0uAXzyH08AAHlzNMAfnyK1Fiv9r4ZA= github.com/bitcoinsv/bsvd v0.0.0-20190609155523-4c29707f7173 h1:2yTIV9u7H0BhRDGXH5xrAwAz7XibWJtX2dNezMeNsUo=