Skip to content

Commit

Permalink
Add alert processing retry cron
Browse files Browse the repository at this point in the history
  • Loading branch information
galt-tr committed Jan 26, 2024
1 parent fd66602 commit 7e2f56c
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 43 deletions.
44 changes: 24 additions & 20 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,30 @@ var (

// Application configuration constants
var (
ApplicationName = "alert_system" // Application name used in places where we need an application name space
DatabasePrefix = "alert_system" // Default database prefix
DefaultAlertSystemProtocolID = "/bitcoin/alert-system/0.0.1" // Default alert system protocol for libp2p syncing
DefaultTopicName = "alert_system" // Default alert system topic name for libp2p subscription
DefaultServerShutdown = 5 * time.Second // Default server shutdown delay time (to finish any requests or internal processes)
LocalPrivateKeyDefault = "alert_system_private_key" // Default local private key
LocalPrivateKeyDirectory = ".bitcoin" // Default local private key directory
ApplicationName = "alert_system" // Application name used in places where we need an application name space
DatabasePrefix = "alert_system" // Default database prefix
DefaultAlertSystemProtocolID = "/bitcoin/alert-system/0.0.1" // Default alert system protocol for libp2p syncing
DefaultTopicName = "alert_system" // Default alert system topic name for libp2p subscription
DefaultServerShutdown = 5 * time.Second // Default server shutdown delay time (to finish any requests or internal processes)
DefaultPeerDiscoveryInterval = 10 * time.Minute // Default peer discovery refresh interval
DefaultAlertProcessingInterval = 5 * time.Minute // Default alert processing retry interval
LocalPrivateKeyDefault = "alert_system_private_key" // Default local private key
LocalPrivateKeyDirectory = ".bitcoin" // Default local private key directory
)

// The global configuration settings
type (

// Config is the global configuration settings
Config struct {
AlertWebhookURL string `json:"alert_webhook_url" mapstructure:"alert_webhook_url"` // AlertWebhookURL is the URL for the alert webhook
Datastore DatastoreConfig `json:"datastore" mapstructure:"datastore"` // Datastore's configuration
P2P P2PConfig `json:"p2p" mapstructure:"p2p"` // P2P is the configuration for the P2P server
RPCConnections []RPCConfig `json:"rpc_connections" mapstructure:"rpc_connections"` // RPCConnections is a list of RPC connections
RequestLogging bool `json:"request_logging" mapstructure:"request_logging"` // Toggle for verbose request logging (API requests)
Services Services `json:"-" mapstructure:"services"` // Services is the global services
WebServer WebServerConfig `json:"web_server" mapstructure:"web_server"` // WebServer is the configuration for the web HTTP Server
AlertWebhookURL string `json:"alert_webhook_url" mapstructure:"alert_webhook_url"` // AlertWebhookURL is the URL for the alert webhook
Datastore DatastoreConfig `json:"datastore" mapstructure:"datastore"` // Datastore's configuration
P2P P2PConfig `json:"p2p" mapstructure:"p2p"` // P2P is the configuration for the P2P server
RPCConnections []RPCConfig `json:"rpc_connections" mapstructure:"rpc_connections"` // RPCConnections is a list of RPC connections
RequestLogging bool `json:"request_logging" mapstructure:"request_logging"` // Toggle for verbose request logging (API requests)
Services Services `json:"-" mapstructure:"services"` // Services is the global services
WebServer WebServerConfig `json:"web_server" mapstructure:"web_server"` // WebServer is the configuration for the web HTTP Server
AlertProcessingInterval time.Duration `json:"alert_processing_interval" mapstructure:"alert_processing_interval"` // AlertProcessingInterval is the interval in which the system will go through all of the saved alerts and attempt to retry any unprocessed alerts
}

// DatastoreConfig is the configuration for the datastore
Expand Down Expand Up @@ -84,12 +87,13 @@ type (

// P2PConfig is the configuration for the P2P server and connection
P2PConfig struct {
AlertSystemProtocolID string `json:"alert_system_protocol_id" mapstructure:"alert_system_protocol_id"` // AlertSystemProtocolID is the protocol ID to use on the libp2p network for alert system communication
BootstrapPeer string `json:"bootstrap_peer" mapstructure:"bootstrap_peer"` // BootstrapPeer is the bootstrap peer for the libp2p network
IP string `json:"ip" mapstructure:"ip"` // IP is the IP address for the P2P server
Port string `json:"port" mapstructure:"port"` // Port is the port for the P2P server
PrivateKeyPath string `json:"private_key_path" mapstructure:"private_key_path"` // PrivateKeyPath is the path to the private key
TopicName string `json:"topic_name" mapstructure:"topic_name"` // TopicName is the name of the topic to subscribe to
AlertSystemProtocolID string `json:"alert_system_protocol_id" mapstructure:"alert_system_protocol_id"` // AlertSystemProtocolID is the protocol ID to use on the libp2p network for alert system communication
BootstrapPeer string `json:"bootstrap_peer" mapstructure:"bootstrap_peer"` // BootstrapPeer is the bootstrap peer for the libp2p network
IP string `json:"ip" mapstructure:"ip"` // IP is the IP address for the P2P server
Port string `json:"port" mapstructure:"port"` // Port is the port for the P2P server
PrivateKeyPath string `json:"private_key_path" mapstructure:"private_key_path"` // PrivateKeyPath is the path to the private key
TopicName string `json:"topic_name" mapstructure:"topic_name"` // TopicName is the name of the topic to subscribe to
PeerDiscoveryInterval time.Duration `json:"peer_discovery_interval" mapstructure:"peer_discovery_interval"` // PeerDiscoveryInterval is the interval in which we will refresh the peer table and check peers for missing messages
}

// RPCConfig is the configuration for the RPC client
Expand Down
2 changes: 2 additions & 0 deletions app/config/envs/local.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"alert_webhook_url": "",
"request_logging": true,
"alert_processing_interval": "5m",
"web_server": {
"idle_timeout": "60s",
"port": "3000",
Expand Down Expand Up @@ -57,6 +58,7 @@
"alert_system_protocol_id": "/bitcoin/alert-system/0.0.1",
"bootstrap_peer": "",
"private_key_path": "",
"peer_discovery_interval": "10m",
"topic_name": "alert_system"
},
"rpc_connections": [
Expand Down
1 change: 1 addition & 0 deletions app/config/envs/stn.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"alert_webhook_url": "",
"request_logging": true,
"alert_processing_interval": "5m",
"web_server": {
"idle_timeout": "60s",
"port": "3000",
Expand Down
10 changes: 10 additions & 0 deletions app/config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ func requireP2P(_appConfig *Config) error {
}
}

// Load the peer discovery interval
if _appConfig.P2P.PeerDiscoveryInterval <= 0 {
_appConfig.P2P.PeerDiscoveryInterval = DefaultPeerDiscoveryInterval
}

// Load the p2p ip (local, ip address or domain name)
// todo better validation of what is a valid IP, domain name or local address
if len(_appConfig.P2P.IP) < 5 {
Expand Down Expand Up @@ -205,6 +210,11 @@ func LoadConfigFile() (_appConfig *Config, err error) {
Logger: gocore.Log(ApplicationName),
}

// Set default alert processing interval if it doesn't exist
if _appConfig.AlertProcessingInterval <= 0 {
_appConfig.AlertProcessingInterval = DefaultAlertProcessingInterval
}

// Log the configuration that was detected and where it was loaded from
_appConfig.Services.Log.Debug("loaded configuration from: " + viper.ConfigFileUsed())

Expand Down
2 changes: 2 additions & 0 deletions app/config/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func TestLoadConfig_Success(t *testing.T) {
assert.Equal(t, "/path/to/private/key", c.P2P.PrivateKeyPath)
assert.Equal(t, "", c.P2P.BootstrapPeer)
assert.Equal(t, DefaultAlertSystemProtocolID, c.P2P.AlertSystemProtocolID)
assert.Equal(t, DefaultPeerDiscoveryInterval, c.P2P.PeerDiscoveryInterval)
assert.Equal(t, DefaultAlertProcessingInterval, c.AlertProcessingInterval)
assert.Equal(t, "192.168.1.1", c.P2P.IP)
assert.Equal(t, "8000", c.P2P.Port)
assert.Equal(t, "https://webhook.url", c.AlertWebhookURL)
Expand Down
77 changes: 60 additions & 17 deletions app/models/alert_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type AlertMessage struct {
Hash string `json:"hash" toml:"hash" yaml:"hash" bson:"hash" gorm:"<-;type:char(64);index;comment:This is the hash"`
SequenceNumber uint32 `json:"sequence_number" toml:"sequence_number" yaml:"sequence_number" bson:"sequence_number" gorm:"<-;type:int8;index;comment:This is the alert sequence number"`
Raw string `json:"raw" toml:"raw" yaml:"raw" bson:"raw" gorm:"<-;type:text;comment:This is the raw alert message"`
Processed bool `json:"processed" toml:"processed" yaml:"processed" bson:"processed"`
Processed bool `json:"processed" toml:"processed" yaml:"processed" bson:"processed" gorm:"<-;type:boolean;comment:This determine if the alert was processed"`

// Private fields (never to be exported)
alertType AlertType
Expand Down Expand Up @@ -247,13 +247,16 @@ func (m *AlertMessage) Timestamp() uint64 {
return m.timestamp
}

// NewAlertFromBytes creates a new alert from bytes
func NewAlertFromBytes(ak []byte, opts ...model.Options) (*AlertMessage, error) {

// Check if the alert is valid
if len(ak) < 16 {
// ReadRaw sets the model fields based on the raw message
func (m *AlertMessage) ReadRaw() error {
ak, err := hex.DecodeString(m.Raw)
if err != nil {
return err
}
m.SetRawMessage(ak)
if len(m.GetRawMessage()) < 16 {
// todo DETERMINE ACTUAL PROPER LENGTH
return nil, fmt.Errorf("alert needs to be at least 16")
return fmt.Errorf("alert needs to be at least 16 bytes")
}
version := binary.LittleEndian.Uint32(ak[:4])
sequenceNumber := binary.LittleEndian.Uint32(ak[4:8])
Expand All @@ -274,7 +277,7 @@ func NewAlertFromBytes(ak []byte, opts ...model.Options) (*AlertMessage, error)
// but possible. Regardless let's just error out now if this length is lower. At least
// allows us to grab the expected signature.
if len(alertAndSignature) < sigLen+2 {
return nil, fmt.Errorf("alert message is invalid - too short length")
return fmt.Errorf("alert message is invalid - too short length")
}

// Get alert message bytes
Expand All @@ -292,17 +295,26 @@ func NewAlertFromBytes(ak []byte, opts ...model.Options) (*AlertMessage, error)

dataLen := 20 + len(alert)

// Create the new alert
m.SetAlertType(AlertType(alertType))
m.message = alert
m.SequenceNumber = sequenceNumber
m.timestamp = timestamp
m.version = version
m.data = ak[:dataLen]
m.signatures = sigs
_ = m.Serialize()
return nil
}

// NewAlertFromBytes creates a new alert from bytes
func NewAlertFromBytes(ak []byte, opts ...model.Options) (*AlertMessage, error) {
opts = append(opts, model.New())
newAlert := NewAlertMessage(opts...)
newAlert.SetAlertType(AlertType(alertType))
newAlert.message = alert
newAlert.SequenceNumber = sequenceNumber
newAlert.timestamp = timestamp
newAlert.version = version
newAlert.data = ak[:dataLen]
newAlert.signatures = sigs
_ = newAlert.Serialize()
newAlert.SetRawMessage(ak)
err := newAlert.ReadRaw()
if err != nil {
return nil, err
}

// Return alert
return newAlert, nil
Expand Down Expand Up @@ -359,3 +371,34 @@ func GetLatestAlert(ctx context.Context, metadata *model.Metadata, opts ...model
// Return the first item (only item)
return modelItems[0], nil
}

// GetAllUnprocessedAlerts loads every alert whose "processed" flag is false and
// whose deleted-at field is unset, ordered by sequence number ascending.
//
// It returns (nil, nil) when no alert matches, and (nil, err) when the lookup
// itself fails.
func GetAllUnprocessedAlerts(ctx context.Context, metadata *model.Metadata, opts ...model.Options) ([]*AlertMessage, error) {

	// Match alerts that are not deleted (IS NULL) and not yet processed
	notDeleted := map[string]interface{}{utils.ExistsCondition: false}
	conditions := map[string]interface{}{
		utils.FieldDeletedAt: notDeleted,
		"processed":          false,
	}

	// Lowest sequence number first
	ordering := datastore.QueryParams{
		OrderByField:  utils.FieldSequenceNumber,
		SortDirection: utils.SortAscending,
	}

	// Load the matching records
	unprocessed := make([]*AlertMessage, 0)
	err := model.GetModelsByConditions(
		ctx, model.NameAlertMessage, &unprocessed, metadata, &conditions, &ordering, opts...,
	)
	if err != nil {
		return nil, err
	}
	if len(unprocessed) == 0 {
		return nil, nil
	}

	// Return every matching alert
	return unprocessed, nil
}
1 change: 1 addition & 0 deletions app/models/genesis_alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func CreateGenesisAlert(ctx context.Context, opts ...model.Options) error {
newAlert.SequenceNumber = 0
newAlert.timestamp = uint64(time.Date(2923, time.November, 1, 1, 1, 1, 1, time.UTC).Unix())
newAlert.version = 1
newAlert.Processed = true

// Serialize the data
newAlert.SerializeData()
Expand Down
76 changes: 70 additions & 6 deletions app/p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (s *Server) Start(ctx context.Context) error {
}

_ = s.RunPeerDiscovery(ctx, routingDiscovery)
_ = s.RunAlertProcessingCron(ctx)
for !s.connected {
time.Sleep(5 * time.Second)
}
Expand Down Expand Up @@ -187,19 +188,82 @@ func (s *Server) Stop(_ context.Context) error {
return nil
}

// RunPeerDiscovery starts a 5 min cron job to resync peers and update routable peers
// RunAlertProcessingCron starts a background goroutine that retries unprocessed
// alerts once every s.config.AlertProcessingInterval.
//
// The interval must be positive; time.NewTicker panics otherwise.
//
// The goroutine exits, and its ticker is stopped, when a value is sent on (or
// the caller closes) the returned channel, or when ctx is cancelled. An error
// from a retry pass is logged and does not stop the loop.
func (s *Server) RunAlertProcessingCron(ctx context.Context) chan bool {
	ticker := time.NewTicker(s.config.AlertProcessingInterval)
	quit := make(chan bool, 1)
	go func() {
		// Release the ticker whichever way the loop ends
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if err := s.processAlerts(ctx); err != nil {
					s.config.Services.Log.Errorf("error processing alerts: %v", err.Error())
				}
			case <-quit:
				// Returning is required: only stopping the ticker would leave
				// this goroutine parked in select forever (or spinning if the
				// channel was closed)
				return
			case <-ctx.Done():
				// The same ctx is handed to every retry pass, so stop
				// scheduling passes once it has been cancelled
				return
			}
		}
	}()
	return quit
}

// processAlerts runs one retry pass over every stored alert that is not yet
// marked as processed.
//
// For each alert returned by models.GetAllUnprocessedAlerts it rebuilds the
// parsed fields from the stored raw message, obtains the alert's processor,
// runs it, and on success saves the alert with Processed set to true.
//
// A failure that belongs to a single alert (ReadRaw, Read or Do error, or no
// processor returned) skips that alert and the pass continues, so one bad alert
// cannot block the retry of the alerts behind it. These failures are logged,
// except the missing-processor case, which is skipped silently.
//
// It returns an error when the alerts cannot be loaded or when a successfully
// processed alert cannot be saved; in the latter case the pass stops there.
func (s *Server) processAlerts(ctx context.Context) error {
	alerts, err := models.GetAllUnprocessedAlerts(ctx, nil, model.WithAllDependencies(s.config))
	if err != nil {
		return err
	}
	s.config.Services.Log.Infof("Attempting to process %d failed alerts", len(alerts))
	success := 0
	for _, alert := range alerts {
		alert.SetOptions(model.WithAllDependencies(s.config))

		// Rebuild the parsed fields from the stored raw message
		if err = alert.ReadRaw(); err != nil {
			s.config.Services.Log.Errorf("failed to read raw alert %d; err: %v", alert.SequenceNumber, err.Error())
			continue
		}
		alert.SerializeData()

		// Get the processor for this alert; skip the alert when none is returned
		ak := alert.ProcessAlertMessage()
		if ak == nil {
			continue
		}
		if err = ak.Read(alert.GetRawMessage()); err != nil {
			// Specific to this alert: log and move on instead of aborting the pass
			s.config.Services.Log.Errorf("failed to read alert %d; err: %v", alert.SequenceNumber, err.Error())
			continue
		}

		s.config.Services.Log.Debugf("attempting to process alert %d of type %d", alert.SequenceNumber, alert.GetAlertType())
		alert.Processed = true
		if err = ak.Do(ctx); err != nil {
			s.config.Services.Log.Errorf("failed to process alert %d; err: %v", alert.SequenceNumber, err.Error())
			alert.Processed = false
			continue
		}

		// Persist the processed flag so the alert is not retried again
		success++
		if err = alert.Save(ctx); err != nil {
			return err
		}
	}
	s.config.Services.Log.Infof("Processed %d failed alerts", success)
	return nil
}

// RunPeerDiscovery starts a cron job to resync peers and update routable peers
func (s *Server) RunPeerDiscovery(ctx context.Context, routingDiscovery *drouting.RoutingDiscovery) chan bool {
ticker := time.NewTicker(5 * time.Minute)
ticker := time.NewTicker(s.config.P2P.PeerDiscoveryInterval)
quit := make(chan bool, 1)
go func() {
err := s.discoverPeers(ctx, s.topicNames, routingDiscovery)
err := s.discoverPeers(ctx, routingDiscovery)
if err != nil {
s.config.Services.Log.Errorf("error discovering peers: %v", err.Error())
}
for {
select {
case <-ticker.C:
err := s.discoverPeers(ctx, s.topicNames, routingDiscovery)
err := s.discoverPeers(ctx, routingDiscovery)
if err != nil {
s.config.Services.Log.Errorf("error discovering peers: %v", err.Error())
}
Expand Down Expand Up @@ -262,13 +326,13 @@ func (s *Server) Topics() map[string]*pubsub.Topic {
}

// discoverPeers will discover peers
func (s *Server) discoverPeers(ctx context.Context, tn []string, routingDiscovery *drouting.RoutingDiscovery) error {
func (s *Server) discoverPeers(ctx context.Context, routingDiscovery *drouting.RoutingDiscovery) error {
s.config.Services.Log.Infof("Running peer discovery at %s", time.Now().String())

// Look for others who have announced and attempt to connect to them
connected := 0
for connected < 2 {
for _, topicName := range tn {
for _, topicName := range s.topicNames {
s.config.Services.Log.Debugf("searching for peers for topic %s..\n", topicName)

var peerChan <-chan peer.AddrInfo
Expand Down
2 changes: 2 additions & 0 deletions app/p2p/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,10 @@ func (s *StreamThread) ProcessGotSequenceNumber(msg *SyncMessage) error {
if err = ak.Read(a.GetRawMessage()); err != nil {
return err
}
a.Processed = true
if err = ak.Do(s.ctx); err != nil {
s.config.Services.Log.Errorf("failed to process alert %d; err: %v", a.SequenceNumber, err.Error())
a.Processed = false
}

// Save the alert
Expand Down

0 comments on commit 7e2f56c

Please sign in to comment.