Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(SPV-848) notifications #246

Merged
merged 18 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions examples/Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,8 @@ tasks:
cmds:
- echo "running generate_totp..."
- go run ./generate_totp/generate_totp.go
webhooks:
desc: "running webhooks..."
cmds:
- echo "running webhooks..."
- go run ./webhooks/webhooks.go
2 changes: 1 addition & 1 deletion examples/go.mod

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions examples/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 77 additions & 0 deletions examples/webhooks/webhooks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Package main - send_op_return example
*/
package main

import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"

walletclient "github.com/bitcoin-sv/spv-wallet-go-client"
"github.com/bitcoin-sv/spv-wallet-go-client/examples"
"github.com/bitcoin-sv/spv-wallet-go-client/notifications"
"github.com/bitcoin-sv/spv-wallet/models"
)

func main() {
defer examples.HandlePanic()
wregulski marked this conversation as resolved.
Show resolved Hide resolved

examples.CheckIfAdminKeyExists()

client := walletclient.NewWithAdminKey("http://localhost:3003/v1", examples.ExampleAdminKey)
wh := notifications.NewWebhook(
chris-4chain marked this conversation as resolved.
Show resolved Hide resolved
client,
"http://localhost:5005/notification",
notifications.WithToken("Authorization", "this-is-the-token"),
notifications.WithProcessors(3),
)
err := wh.Subscribe(context.Background())
if err != nil {
panic(err)
}

http.Handle("/notification", wh.HTTPHandler())

if err = notifications.RegisterHandler(wh, func(gpe *models.StringEvent) {
time.Sleep(50 * time.Millisecond) // simulate processing time
fmt.Printf("Processing event-string: %s\n", gpe.Value)
}); err != nil {
panic(err)
}

if err = notifications.RegisterHandler(wh, func(gpe *models.TransactionEvent) {
time.Sleep(50 * time.Millisecond) // simulate processing time
fmt.Printf("Processing event-transaction: XPubID: %s, TxID: %s, Status: %s\n", gpe.XPubID, gpe.TransactionID, gpe.Status)
}); err != nil {
panic(err)
}

server := http.Server{
Addr: ":5005",
Handler: nil,
ReadHeaderTimeout: time.Second * 10,
}
go func() {
_ = server.ListenAndServe()
}()

// wait for signal to shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan

fmt.Printf("Unsubscribing...\n")
if err = wh.Unsubscribe(context.Background()); err != nil {
panic(err)
}

fmt.Printf("Shutting down...\n")
if err = server.Shutdown(context.Background()); err != nil {
panic(err)
}
}
2 changes: 1 addition & 1 deletion go.mod

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -1132,3 +1132,31 @@ func (wc *WalletClient) SendToRecipients(ctx context.Context, recipients []*Reci

return wc.RecordTransaction(ctx, hex, draft.ID, metadata)
}

// AdminSubscribeWebhook subscribes to a webhook to receive notifications from spv-wallet
func (wc *WalletClient) AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) error {
requestModel := models.SubscribeRequestBody{
URL: webhookURL,
TokenHeader: tokenHeader,
TokenValue: tokenValue,
}
rawJSON, err := json.Marshal(requestModel)
if err != nil {
return WrapError(err)
}
err = wc.doHTTPRequest(ctx, http.MethodPost, "/admin/webhooks/subscriptions", rawJSON, wc.adminXPriv, true, nil)
return WrapError(err)
}

// AdminUnsubscribeWebhook unsubscribes from a webhook
func (wc *WalletClient) AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) error {
requestModel := models.UnsubscribeRequestBody{
URL: webhookURL,
}
rawJSON, err := json.Marshal(requestModel)
if err != nil {
return WrapError(err)
}
err = wc.doHTTPRequest(ctx, http.MethodDelete, "/admin/webhooks/subscriptions", rawJSON, wc.adminXPriv, true, nil)
return err
}
25 changes: 25 additions & 0 deletions notifications/eventsMap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package notifications

import "sync"

type eventsMap struct {
registered *sync.Map
}

func newEventsMap() *eventsMap {
return &eventsMap{
registered: &sync.Map{},
}
}

func (em *eventsMap) store(name string, handler *eventHandler) {
em.registered.Store(name, handler)
}

func (em *eventsMap) load(name string) (*eventHandler, bool) {
h, ok := em.registered.Load(name)
if !ok {
return nil, false
}
return h.(*eventHandler), true
}
9 changes: 9 additions & 0 deletions notifications/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package notifications

import "context"

// WebhookSubscriber - interface for subscribing and unsubscribing to webhooks
type WebhookSubscriber interface {
AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) error
AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) error
}
58 changes: 58 additions & 0 deletions notifications/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package notifications

import (
"context"
"runtime"
)

// WebhookOptions - options for the webhook
type WebhookOptions struct {
TokenHeader string
TokenValue string
BufferSize int
RootContext context.Context
Processors int
}

// NewWebhookOptions - creates a new webhook options
func NewWebhookOptions() *WebhookOptions {
return &WebhookOptions{
TokenHeader: "",
TokenValue: "",
BufferSize: 100,
Processors: runtime.NumCPU(),
RootContext: context.Background(),
}
}

// WebhookOpts - functional options for the webhook
type WebhookOpts = func(*WebhookOptions)

// WithToken - sets the token header and value
func WithToken(tokenHeader, tokenValue string) WebhookOpts {
return func(w *WebhookOptions) {
w.TokenHeader = tokenHeader
w.TokenValue = tokenValue
}
}

// WithBufferSize - sets the buffer size
func WithBufferSize(size int) WebhookOpts {
return func(w *WebhookOptions) {
w.BufferSize = size
}
}

// WithRootContext - sets the root context
func WithRootContext(ctx context.Context) WebhookOpts {
return func(w *WebhookOptions) {
w.RootContext = ctx
}
}

// WithProcessors - sets the number of concurrent loops which will process the events
func WithProcessors(count int) WebhookOpts {
return func(w *WebhookOptions) {
w.Processors = count
}
}
27 changes: 27 additions & 0 deletions notifications/registerer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package notifications

import (
"reflect"

"github.com/bitcoin-sv/spv-wallet/models"
)

type eventHandler struct {
Caller reflect.Value
ModelType reflect.Type
}

// RegisterHandler - registers a handler for a specific event type
func RegisterHandler[EventType models.Events](nd *Webhook, handlerFunction func(event *EventType)) error {
handlerValue := reflect.ValueOf(handlerFunction)

modelType := handlerValue.Type().In(0).Elem()
name := modelType.Name()

nd.handlers.store(name, &eventHandler{
Caller: handlerValue,
ModelType: modelType,
})

return nil
}
100 changes: 100 additions & 0 deletions notifications/webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package notifications

import (
"context"
"encoding/json"
"net/http"
"reflect"
"time"

"github.com/bitcoin-sv/spv-wallet/models"
)

// Webhook - the webhook event receiver
type Webhook struct {
URL string
options *WebhookOptions
buffer chan *models.RawEvent
subscriber WebhookSubscriber
handlers *eventsMap
}

// NewWebhook - creates a new webhook
func NewWebhook(subscriber WebhookSubscriber, url string, opts ...WebhookOpts) *Webhook {
options := NewWebhookOptions()
for _, opt := range opts {
opt(options)
}

wh := &Webhook{
URL: url,
options: options,
buffer: make(chan *models.RawEvent, options.BufferSize),
subscriber: subscriber,
handlers: newEventsMap(),
}
for i := 0; i < options.Processors; i++ {
go wh.process()
}
return wh
}

// Subscribe - sends a subscription request to the spv-wallet
func (w *Webhook) Subscribe(ctx context.Context) error {
return w.subscriber.AdminSubscribeWebhook(ctx, w.URL, w.options.TokenHeader, w.options.TokenValue)
}

// Unsubscribe - sends an unsubscription request to the spv-wallet
func (w *Webhook) Unsubscribe(ctx context.Context) error {
return w.subscriber.AdminUnsubscribeWebhook(ctx, w.URL)
}

// HTTPHandler - returns an http handler for the webhook; it should be registered with the http server
func (w *Webhook) HTTPHandler() http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
if w.options.TokenHeader != "" && r.Header.Get(w.options.TokenHeader) != w.options.TokenValue {
http.Error(rw, "Unauthorized", http.StatusUnauthorized)
return
}
var events []*models.RawEvent
if err := json.NewDecoder(r.Body).Decode(&events); err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}

for _, event := range events {
select {
case w.buffer <- event:
// event sent
case <-r.Context().Done():
// request context canceled
return
case <-w.options.RootContext.Done():
// root context canceled - the whole event processing has been stopped
return
case <-time.After(1 * time.Second):
// timeout, most probably the channel is full
}
}
rw.WriteHeader(http.StatusOK)
})
}

func (w *Webhook) process() {
for {
select {
case event := <-w.buffer:
handler, ok := w.handlers.load(event.Type)
if !ok {
continue
}
model := reflect.New(handler.ModelType).Interface()
if err := json.Unmarshal(event.Content, model); err != nil {
continue
}
handler.Caller.Call([]reflect.Value{reflect.ValueOf(model)})
case <-w.options.RootContext.Done():
return
}
}
}
Loading