From 65b972e3b4a0d874512b8431becc262a17acd56c Mon Sep 17 00:00:00 2001 From: Dylan Murray Date: Tue, 30 Jan 2024 14:45:38 -0500 Subject: [PATCH] Changes to support confiscation --- app/api/base/alerts.go | 43 +++++++++++++++ app/api/base/routes.go | 3 ++ app/models/alert_message.go | 29 +++++++++++ app/models/alert_message_confiscate_utxo.go | 58 +++++++++++++-------- app/p2p/server.go | 24 +++++---- 5 files changed, 126 insertions(+), 31 deletions(-) create mode 100644 app/api/base/alerts.go diff --git a/app/api/base/alerts.go b/app/api/base/alerts.go new file mode 100644 index 0000000..dc867ed --- /dev/null +++ b/app/api/base/alerts.go @@ -0,0 +1,43 @@ +package base + +import ( + "encoding/json" + "errors" + "net/http" + + "github.com/bitcoin-sv/alert-system/app" + "github.com/bitcoin-sv/alert-system/app/models" + "github.com/bitcoin-sv/alert-system/app/models/model" + "github.com/julienschmidt/httprouter" + apirouter "github.com/mrz1836/go-api-router" +) + +// HealthResponse is the response for the health endpoint +type AlertsResponse struct { + Alerts []*models.AlertMessage `json:"alerts"` + LatestSequence uint32 `json:"latest_sequence"` +} + +// health will return the health of the API and the current alert +func (a *Action) alerts(w http.ResponseWriter, req *http.Request, _ httprouter.Params) { + + // Get the latest alert + alerts, err := models.GetAllAlerts(req.Context(), nil, model.WithAllDependencies(a.Config)) + if err != nil { + app.APIErrorResponse(w, req, http.StatusBadRequest, err) + return + } else if alerts == nil { + app.APIErrorResponse(w, req, http.StatusNotFound, errors.New("alert not found")) + return + } + + // Return the response + _ = apirouter.ReturnJSONEncode( + w, + http.StatusOK, + json.NewEncoder(w), + AlertsResponse{ + Alerts: alerts, + LatestSequence: alerts[len(alerts)-1].SequenceNumber, + }, []string{"alerts", "latest_sequence"}) +} diff --git a/app/api/base/routes.go b/app/api/base/routes.go index 5ba4901..3ac6fc4 100644 --- a/app/api/base/routes.go +++ b/app/api/base/routes.go @@ -36,4 +36,7 @@ func RegisterRoutes(router *apirouter.Router, conf *config.Config) { // Set the health request router.HTTPRouter.GET("/health", action.Request(router, action.health)) + + // Set the get alerts request + router.HTTPRouter.GET("/alerts", action.Request(router, action.alerts)) } diff --git a/app/models/alert_message.go b/app/models/alert_message.go index de835f4..8d4a215 100644 --- a/app/models/alert_message.go +++ b/app/models/alert_message.go @@ -376,6 +376,35 @@ func GetLatestAlert(ctx context.Context, metadata *model.Metadata, opts ...model return modelItems[0], nil } +// GetAllAlerts +func GetAllAlerts(ctx context.Context, metadata *model.Metadata, opts ...model.Options) ([]*AlertMessage, error) { + // Set the conditions + conditions := &map[string]interface{}{ + utils.FieldDeletedAt: map[string]interface{}{ // IS NULL + utils.ExistsCondition: false, + }, + } + + // Set the query params + queryParams := &datastore.QueryParams{ + OrderByField: utils.FieldSequenceNumber, + SortDirection: utils.SortAscending, + } + + // Get the record + modelItems := make([]*AlertMessage, 0) + if err := model.GetModelsByConditions( + ctx, model.NameAlertMessage, &modelItems, metadata, conditions, queryParams, opts..., + ); err != nil { + return nil, err + } else if len(modelItems) == 0 { + return nil, nil + } + + // Return the first item (only item) + return modelItems, nil +} + // GetAllUnprocessedAlerts will get all alerts that weren't successfully processed func GetAllUnprocessedAlerts(ctx context.Context, metadata *model.Metadata, opts ...model.Options) ([]*AlertMessage, error) { diff --git a/app/models/alert_message_confiscate_utxo.go b/app/models/alert_message_confiscate_utxo.go index 7737aa7..3d418ee 100644 --- a/app/models/alert_message_confiscate_utxo.go +++ b/app/models/alert_message_confiscate_utxo.go @@ -1,12 +1,14 @@ package models import ( + "bytes" "context" "encoding/binary" "encoding/hex" + "errors" "fmt" - "github.com/libsv/go-bn/models" + "github.com/libsv/go-p2p/wire" ) // AlertMessageConfiscateTransaction is a confiscate utxo alert @@ -17,36 +19,50 @@ type AlertMessageConfiscateTransaction struct { // ConfiscateTransaction defines the parameters for the confiscation transaction type ConfiscateTransaction struct { - EnforceAtHeight [8]byte - ID [32]byte + EnforceAtHeight uint64 + Hex []byte } // Read reads the alert func (a *AlertMessageConfiscateTransaction) Read(raw []byte) error { a.Config().Services.Log.Infof("%x", raw) - if len(raw) < 40 { - return fmt.Errorf("confiscation alert is less than 41 bytes") - } - if len(raw)%40 != 0 { - return fmt.Errorf("confiscation alert is not a multiple of 41 bytes") + if len(raw) < 9 { + return fmt.Errorf("confiscation alert is less than 9 bytes") } - txCount := len(raw) / 40 + // TODO: assume for now only 1 confiscation tx in the alert for simplicity details := []models.ConfiscationTransactionDetails{} - for i := 0; i < txCount; i++ { - tx := ConfiscateTransaction{ - EnforceAtHeight: [8]byte(raw[:8]), - ID: [32]byte(raw[8:40]), - } - detail := models.ConfiscationTransactionDetails{ - ConfiscationTransaction: models.ConfiscationTransaction{ - EnforceAtHeight: int64(binary.LittleEndian.Uint64(tx.EnforceAtHeight[:])), - Hex: hex.EncodeToString(tx.ID[:]), - }, + enforceAtHeight := binary.LittleEndian.Uint64(raw[0:8]) + buf := bytes.NewReader(raw[8:]) + + length, err := wire.ReadVarInt(buf, 0) + if err != nil { + return err + } + if length > uint64(buf.Len()) { + return errors.New("tx hex length is longer than the remaining buffer") + } + + // read the tx hex + var rawHex []byte + for i := uint64(0); i < length; i++ { + var b byte + if b, err = buf.ReadByte(); err != nil { + return fmt.Errorf("failed to read tx hex: %s", err.Error()) } - details = append(details, detail) - raw = raw[40:] + rawHex = append(rawHex, b) + } + + detail := models.ConfiscationTransactionDetails{ + ConfiscationTransaction: models.ConfiscationTransaction{ + EnforceAtHeight: int64(enforceAtHeight), + Hex: hex.EncodeToString(rawHex), + }, } + details = append(details, detail) + a.Transactions = details + a.Config().Services.Log.Infof("ConfiscateTransaction alert; enforceAt [%d]; hex [%s]", enforceAtHeight, hex.EncodeToString(rawHex)) + return nil } diff --git a/app/p2p/server.go b/app/p2p/server.go index 26159fe..36e463f 100644 --- a/app/p2p/server.go +++ b/app/p2p/server.go @@ -42,14 +42,16 @@ type ServerOptions struct { // Server is the P2P server type Server struct { // alertKeyTopicName string - connected bool - config *config.Config - host host.Host - privateKey *crypto.PrivKey - subscriptions map[string]*pubsub.Subscription - topicNames []string - topics map[string]*pubsub.Topic - dht *dht.IpfsDHT + connected bool + config *config.Config + host host.Host + privateKey *crypto.PrivKey + subscriptions map[string]*pubsub.Subscription + topicNames []string + topics map[string]*pubsub.Topic + dht *dht.IpfsDHT + quitAlertProcessingChannel chan bool + quitPeerDiscoveryChannel chan bool //peers []peer.AddrInfo } @@ -113,8 +115,8 @@ func (s *Server) Start(ctx context.Context) error { dutil.Advertise(ctx, routingDiscovery, topicName) } - _ = s.RunPeerDiscovery(ctx, routingDiscovery) - _ = s.RunAlertProcessingCron(ctx) + s.quitPeerDiscoveryChannel = s.RunPeerDiscovery(ctx, routingDiscovery) + s.quitAlertProcessingChannel = s.RunAlertProcessingCron(ctx) ps, err := pubsub.NewGossipSub(ctx, s.host, pubsub.WithDiscovery(routingDiscovery)) if err != nil { @@ -185,6 +187,8 @@ func (s *Server) Connected() bool { func (s *Server) Stop(_ context.Context) error { // todo there needs to be a way to stop the server s.config.Services.Log.Info("stopping P2P service") + s.quitPeerDiscoveryChannel <- true + s.quitAlertProcessingChannel <- true return nil }