diff --git a/examples/Taskfile.yml b/examples/Taskfile.yml index b67c760..f6116bd 100644 --- a/examples/Taskfile.yml +++ b/examples/Taskfile.yml @@ -66,3 +66,8 @@ tasks: cmds: - echo "running generate_totp..." - go run ./generate_totp/generate_totp.go + webhooks: + desc: "running webhooks..." + cmds: + - echo "running webhooks..." + - go run ./webhooks/webhooks.go diff --git a/examples/go.mod b/examples/go.mod index 7b7b0bd..a012b72 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -6,7 +6,7 @@ replace github.com/bitcoin-sv/spv-wallet-go-client => ../ require ( github.com/bitcoin-sv/spv-wallet-go-client v0.0.0-00010101000000-000000000000 - github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15 + github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.16 ) require ( diff --git a/examples/go.sum b/examples/go.sum index 99f6fce..da1cca2 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -4,6 +4,10 @@ github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.14.0.20240626082725-2c073c53 github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.14.0.20240626082725-2c073c5330a6/go.mod h1:u3gnRDS3uHWZNM2qbYATTpN+mAphyozCJrYIKGwBX7k= github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15 h1:Qjp9gSe1XlBwADgDlkaIGuzqNoQwktu1DuB6tzurdQI= github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e h1:Bw8bq7YUvMSNwRNQUm8gFKakICyNk8ScBqhJ9LFr54o= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.16 h1:iHLUofGb40sQ31KpVwtdjuKVs3W/vW1w8exF8Vidvfc= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.16/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40= github.com/bitcoinschema/go-bitcoin/v2 v2.0.5 h1:Sgh5Eb746Zck/46rFDrZZEXZWyO53fMuWYhNoZa1tck= github.com/bitcoinschema/go-bitcoin/v2 v2.0.5/go.mod h1:JjO1ivfZv6vhK0uAXzyH08AAHlzNMAfnyK1Fiv9r4ZA= github.com/bitcoinsv/bsvd v0.0.0-20190609155523-4c29707f7173 h1:2yTIV9u7H0BhRDGXH5xrAwAz7XibWJtX2dNezMeNsUo= diff --git a/examples/webhooks/webhooks.go b/examples/webhooks/webhooks.go new file mode 100644 index 0000000..b71e7b1 --- /dev/null +++ b/examples/webhooks/webhooks.go @@ -0,0 +1,77 @@ +/* +Package main - send_op_return example +*/ +package main + +import ( + "context" + "fmt" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + walletclient "github.com/bitcoin-sv/spv-wallet-go-client" + "github.com/bitcoin-sv/spv-wallet-go-client/examples" + "github.com/bitcoin-sv/spv-wallet-go-client/notifications" + "github.com/bitcoin-sv/spv-wallet/models" +) + +func main() { + defer examples.HandlePanic() + + examples.CheckIfAdminKeyExists() + + client := walletclient.NewWithAdminKey("http://localhost:3003/v1", examples.ExampleAdminKey) + wh := notifications.NewWebhook( + client, + "http://localhost:5005/notification", + notifications.WithToken("Authorization", "this-is-the-token"), + notifications.WithProcessors(3), + ) + err := wh.Subscribe(context.Background()) + if err != nil { + panic(err) + } + + http.Handle("/notification", wh.HTTPHandler()) + + if err = notifications.RegisterHandler(wh, func(gpe *models.StringEvent) { + time.Sleep(50 * time.Millisecond) // simulate processing time + fmt.Printf("Processing event-string: %s\n", gpe.Value) + }); err != nil { + panic(err) + } + + if err = notifications.RegisterHandler(wh, func(gpe *models.TransactionEvent) { + time.Sleep(50 * time.Millisecond) // simulate processing time + fmt.Printf("Processing event-transaction: XPubID: %s, TxID: %s, Status: %s\n", gpe.XPubID, gpe.TransactionID, gpe.Status) + }); err != nil { + panic(err) + } + + server := http.Server{ + Addr: ":5005", + Handler: nil, + ReadHeaderTimeout: time.Second * 10, + } + go func() { + _ = server.ListenAndServe() + }() + + // wait for signal to shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan + + fmt.Printf("Unsubscribing...\n") + if err = wh.Unsubscribe(context.Background()); err != nil { + panic(err) + } + + fmt.Printf("Shutting down...\n") + if err = server.Shutdown(context.Background()); err != nil { + panic(err) + } +} diff --git a/go.mod b/go.mod index 523c5d7..b7ae30a 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/bitcoin-sv/spv-wallet-go-client go 1.22.4 require ( - github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15 + github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.16 github.com/bitcoinschema/go-bitcoin/v2 v2.0.5 github.com/libsv/go-bk v0.1.6 github.com/libsv/go-bt/v2 v2.2.5 diff --git a/go.sum b/go.sum index bc4ae1f..cfe05a0 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,10 @@ github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.14.0.20240626082725-2c073c53 github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.14.0.20240626082725-2c073c5330a6/go.mod h1:u3gnRDS3uHWZNM2qbYATTpN+mAphyozCJrYIKGwBX7k= github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15 h1:Qjp9gSe1XlBwADgDlkaIGuzqNoQwktu1DuB6tzurdQI= github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e h1:Bw8bq7YUvMSNwRNQUm8gFKakICyNk8ScBqhJ9LFr54o= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.15.0.20240704130751-f3156fd52a0e/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.16 h1:iHLUofGb40sQ31KpVwtdjuKVs3W/vW1w8exF8Vidvfc= +github.com/bitcoin-sv/spv-wallet/models v1.0.0-beta.16/go.mod h1:Ni6SFkmMjV39Bg4FtlgPAsnsiJUfRDVEPlbzTZa8z40= github.com/bitcoinschema/go-bitcoin/v2 v2.0.5 h1:Sgh5Eb746Zck/46rFDrZZEXZWyO53fMuWYhNoZa1tck= github.com/bitcoinschema/go-bitcoin/v2 v2.0.5/go.mod h1:JjO1ivfZv6vhK0uAXzyH08AAHlzNMAfnyK1Fiv9r4ZA= github.com/bitcoinsv/bsvd v0.0.0-20190609155523-4c29707f7173 h1:2yTIV9u7H0BhRDGXH5xrAwAz7XibWJtX2dNezMeNsUo= diff --git a/http.go b/http.go index 4b0b66d..8bf6144 100644 --- a/http.go +++ b/http.go @@ -1132,3 +1132,31 @@ func (wc *WalletClient) SendToRecipients(ctx context.Context, recipients []*Reci return wc.RecordTransaction(ctx, hex, draft.ID, metadata) } + +// AdminSubscribeWebhook subscribes to a webhook to receive notifications from spv-wallet +func (wc *WalletClient) AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) error { + requestModel := models.SubscribeRequestBody{ + URL: webhookURL, + TokenHeader: tokenHeader, + TokenValue: tokenValue, + } + rawJSON, err := json.Marshal(requestModel) + if err != nil { + return WrapError(err) + } + err = wc.doHTTPRequest(ctx, http.MethodPost, "/admin/webhooks/subscriptions", rawJSON, wc.adminXPriv, true, nil) + return WrapError(err) +} + +// AdminUnsubscribeWebhook unsubscribes from a webhook +func (wc *WalletClient) AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) error { + requestModel := models.UnsubscribeRequestBody{ + URL: webhookURL, + } + rawJSON, err := json.Marshal(requestModel) + if err != nil { + return WrapError(err) + } + err = wc.doHTTPRequest(ctx, http.MethodDelete, "/admin/webhooks/subscriptions", rawJSON, wc.adminXPriv, true, nil) + return err +} diff --git a/notifications/eventsMap.go b/notifications/eventsMap.go new file mode 100644 index 0000000..00e0b52 --- /dev/null +++ b/notifications/eventsMap.go @@ -0,0 +1,25 @@ +package notifications + +import "sync" + +type eventsMap struct { + registered *sync.Map +} + +func newEventsMap() *eventsMap { + return &eventsMap{ + registered: &sync.Map{}, + } +} + +func (em *eventsMap) store(name string, handler *eventHandler) { + em.registered.Store(name, handler) +} + +func (em *eventsMap) load(name string) (*eventHandler, bool) { + h, ok := em.registered.Load(name) + if !ok { + return nil, false + } + return h.(*eventHandler), true +} diff --git a/notifications/interface.go b/notifications/interface.go new file mode 100644 index 0000000..610ea2b --- /dev/null +++ b/notifications/interface.go @@ -0,0 +1,9 @@ +package notifications + +import "context" + +// WebhookSubscriber - interface for subscribing and unsubscribing to webhooks +type WebhookSubscriber interface { + AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) error + AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) error +} diff --git a/notifications/options.go b/notifications/options.go new file mode 100644 index 0000000..436a3ed --- /dev/null +++ b/notifications/options.go @@ -0,0 +1,58 @@ +package notifications + +import ( + "context" + "runtime" +) + +// WebhookOptions - options for the webhook +type WebhookOptions struct { + TokenHeader string + TokenValue string + BufferSize int + RootContext context.Context + Processors int +} + +// NewWebhookOptions - creates a new webhook options +func NewWebhookOptions() *WebhookOptions { + return &WebhookOptions{ + TokenHeader: "", + TokenValue: "", + BufferSize: 100, + Processors: runtime.NumCPU(), + RootContext: context.Background(), + } +} + +// WebhookOpts - functional options for the webhook +type WebhookOpts = func(*WebhookOptions) + +// WithToken - sets the token header and value +func WithToken(tokenHeader, tokenValue string) WebhookOpts { + return func(w *WebhookOptions) { + w.TokenHeader = tokenHeader + w.TokenValue = tokenValue + } +} + +// WithBufferSize - sets the buffer size +func WithBufferSize(size int) WebhookOpts { + return func(w *WebhookOptions) { + w.BufferSize = size + } +} + +// WithRootContext - sets the root context +func WithRootContext(ctx context.Context) WebhookOpts { + return func(w *WebhookOptions) { + w.RootContext = ctx + } +} + +// WithProcessors - sets the number of concurrent loops which will process the events +func WithProcessors(count int) WebhookOpts { + return func(w *WebhookOptions) { + w.Processors = count + } +} diff --git a/notifications/registerer.go b/notifications/registerer.go new file mode 100644 index 0000000..edaf2e9 --- /dev/null +++ b/notifications/registerer.go @@ -0,0 +1,27 @@ +package notifications + +import ( + "reflect" + + "github.com/bitcoin-sv/spv-wallet/models" +) + +type eventHandler struct { + Caller reflect.Value + ModelType reflect.Type +} + +// RegisterHandler - registers a handler for a specific event type +func RegisterHandler[EventType models.Events](nd *Webhook, handlerFunction func(event *EventType)) error { + handlerValue := reflect.ValueOf(handlerFunction) + + modelType := handlerValue.Type().In(0).Elem() + name := modelType.Name() + + nd.handlers.store(name, &eventHandler{ + Caller: handlerValue, + ModelType: modelType, + }) + + return nil +} diff --git a/notifications/webhook.go b/notifications/webhook.go new file mode 100644 index 0000000..0dd488d --- /dev/null +++ b/notifications/webhook.go @@ -0,0 +1,100 @@ +package notifications + +import ( + "context" + "encoding/json" + "net/http" + "reflect" + "time" + + "github.com/bitcoin-sv/spv-wallet/models" +) + +// Webhook - the webhook event receiver +type Webhook struct { + URL string + options *WebhookOptions + buffer chan *models.RawEvent + subscriber WebhookSubscriber + handlers *eventsMap +} + +// NewWebhook - creates a new webhook +func NewWebhook(subscriber WebhookSubscriber, url string, opts ...WebhookOpts) *Webhook { + options := NewWebhookOptions() + for _, opt := range opts { + opt(options) + } + + wh := &Webhook{ + URL: url, + options: options, + buffer: make(chan *models.RawEvent, options.BufferSize), + subscriber: subscriber, + handlers: newEventsMap(), + } + for i := 0; i < options.Processors; i++ { + go wh.process() + } + return wh +} + +// Subscribe - sends a subscription request to the spv-wallet +func (w *Webhook) Subscribe(ctx context.Context) error { + return w.subscriber.AdminSubscribeWebhook(ctx, w.URL, w.options.TokenHeader, w.options.TokenValue) +} + +// Unsubscribe - sends an unsubscription request to the spv-wallet +func (w *Webhook) Unsubscribe(ctx context.Context) error { + return w.subscriber.AdminUnsubscribeWebhook(ctx, w.URL) +} + +// HTTPHandler - returns an http handler for the webhook; it should be registered with the http server +func (w *Webhook) HTTPHandler() http.Handler { + return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + if w.options.TokenHeader != "" && r.Header.Get(w.options.TokenHeader) != w.options.TokenValue { + http.Error(rw, "Unauthorized", http.StatusUnauthorized) + return + } + var events []*models.RawEvent + if err := json.NewDecoder(r.Body).Decode(&events); err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + for _, event := range events { + select { + case w.buffer <- event: + // event sent + case <-r.Context().Done(): + // request context canceled + return + case <-w.options.RootContext.Done(): + // root context canceled - the whole event processing has been stopped + return + case <-time.After(1 * time.Second): + // timeout, most probably the channel is full + } + } + rw.WriteHeader(http.StatusOK) + }) +} + +func (w *Webhook) process() { + for { + select { + case event := <-w.buffer: + handler, ok := w.handlers.load(event.Type) + if !ok { + continue + } + model := reflect.New(handler.ModelType).Interface() + if err := json.Unmarshal(event.Content, model); err != nil { + continue + } + handler.Caller.Call([]reflect.Value{reflect.ValueOf(model)}) + case <-w.options.RootContext.Done(): + return + } + } +}