diff --git a/protocol/activity_center.go b/protocol/activity_center.go index 6c7d32600c1..0ed9fe1e07e 100644 --- a/protocol/activity_center.go +++ b/protocol/activity_center.go @@ -42,6 +42,10 @@ const ( ActivityCenterNotificationTypeCommunityUnbanned ActivityCenterNotificationTypeNewInstallationReceived ActivityCenterNotificationTypeNewInstallationCreated + ActivityCenterNotificationTypeBackupSyncingFetching + ActivityCenterNotificationTypeBackupSyncingSuccess + ActivityCenterNotificationTypeBackupSyncingPartialFailure + ActivityCenterNotificationTypeBackupSyncingFailure ) type ActivityCenterMembershipStatus int diff --git a/protocol/messenger.go b/protocol/messenger.go index 2402b0c6ead..48796a4e1d4 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -59,6 +59,7 @@ import ( "github.com/status-im/status-go/protocol/transport" v1protocol "github.com/status-im/status-go/protocol/v1" "github.com/status-im/status-go/protocol/verification" + "github.com/status-im/status-go/protocol/wakusync" "github.com/status-im/status-go/server" "github.com/status-im/status-go/services/browsers" ensservice "github.com/status-im/status-go/services/ens" @@ -84,6 +85,8 @@ const ( privateChat ChatContext = "private-chat" ) +const backupSyncingNotificationID = "BACKUP_SYNCING" + // errors var ( ErrChatNotFoundError = errors.New("Chat not found") @@ -195,6 +198,10 @@ type Messenger struct { peersyncingRequests map[string]uint64 mvdsStatusChangeEvent chan datasyncnode.PeerStatusChangeEvent + + fetchingBackedUpDataProgress map[string]wakusync.FetchingBackedUpDataTracking + lastKnownBackedUpMsgClock uint64 + fetchingBackedUpDataCompleted bool } type EnvelopeEventsInterceptor struct { @@ -917,9 +924,68 @@ func (m *Messenger) Start() (*MessengerResponse, error) { } } + if m.processBackedupMessages { + err = m.startBackupFetchingTracking(response) + if err != nil { + return nil, err + } + } + return response, nil } +func (m *Messenger) startBackupFetchingTracking(response *MessengerResponse) error { + // Add an acivity center notification to show that we are fetching back up messages + notification := &ActivityCenterNotification{ + ID: types.FromHex(backupSyncingNotificationID), // TODO put this in a constant + Type: ActivityCenterNotificationTypeBackupSyncingFetching, + Timestamp: m.getTimesource().GetCurrentTime(), + Read: false, + Deleted: false, + UpdatedAt: m.GetCurrentTimeInMillis(), + } + err := m.addActivityCenterNotification(response, notification, nil) + + if err != nil { + return err + } + + // Add a timeout to mark the backup syncing as failed after 1 minute and 30 seconds + time.AfterFunc(1*time.Minute+30*time.Second, func() { + if m.fetchingBackedUpDataCompleted { + // Nothing to do, the fetching has completed successfully + return + } + // Update the AC notification to the failure state + hexBytesIds := []types.HexBytes{} + hexBytesIds = append(hexBytesIds, types.FromHex(backupSyncingNotificationID)) + notifications, err := m.persistence.GetActivityCenterNotificationsByID(hexBytesIds) + if err != nil { + m.logger.Error("failed to get activity center notification", zap.Error(err)) + } else if len(notifications) == 1 { + notification := notifications[0] + notification.UpdatedAt = m.GetCurrentTimeInMillis() + if m.fetchingBackedUpDataProgress == nil || len(m.fetchingBackedUpDataProgress) == 0 { + notification.Type = ActivityCenterNotificationTypeBackupSyncingFailure + } else { + notification.Type = ActivityCenterNotificationTypeBackupSyncingPartialFailure + } + _, err = m.persistence.SaveActivityCenterNotification(notification, true) + if err != nil { + m.logger.Error("failed to save activity center notification", zap.Error(err)) + } else { + if m.config.messengerSignalsHandler != nil { + resp := &MessengerResponse{} + resp.AddActivityCenterNotification(notification) + m.config.messengerSignalsHandler.MessengerResponse(resp) + } + } + } + }) + + return nil +} + func (m *Messenger) startHistoryArchivesImportLoop() { defer gocommon.LogOnPanic() joinedCommunities, err := m.communitiesManager.Joined() diff --git a/protocol/messenger_backup_handler.go b/protocol/messenger_backup_handler.go index 009e6d83d0c..211146615fa 100644 --- a/protocol/messenger_backup_handler.go +++ b/protocol/messenger_backup_handler.go @@ -8,6 +8,7 @@ import ( "github.com/golang/protobuf/proto" utils "github.com/status-im/status-go/common" + "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/images" "github.com/status-im/status-go/multiaccounts/errors" "github.com/status-im/status-go/multiaccounts/settings" @@ -96,6 +97,64 @@ func (m *Messenger) handleBackup(state *ReceivedMessageState, message *protobuf. response.AddFetchingBackedUpDataDetails(SyncWakuSectionKeyKeypairs, message.KeypairDetails) response.AddFetchingBackedUpDataDetails(SyncWakuSectionKeyWatchOnlyAccounts, message.WatchOnlyAccountDetails) + if !m.fetchingBackedUpDataCompleted { + evaluate := true + if m.lastKnownBackedUpMsgClock > message.Clock { + evaluate = false + } else if m.lastKnownBackedUpMsgClock < message.Clock { + // Reset the progress tracker because we have access to a more recent copy of the backup + m.lastKnownBackedUpMsgClock = message.Clock + m.fetchingBackedUpDataProgress = make(map[string]wakusync.FetchingBackedUpDataTracking) + for backupName, details := range response.FetchingBackedUpDataDetails() { + m.fetchingBackedUpDataProgress[backupName] = wakusync.FetchingBackedUpDataTracking{ + LoadedItems: make(map[uint32]bool), + TotalNumber: details.TotalNumber, + } + } + if len(m.fetchingBackedUpDataProgress) == 0 { + evaluate = false + } + } + + // Evaluate the progress of the backup + if evaluate { + // Set the new items before evaluating + for backupName, details := range response.FetchingBackedUpDataDetails() { + m.fetchingBackedUpDataProgress[backupName].LoadedItems[details.DataNumber] = true + } + + receivedEverything := true + for _, tracker := range m.fetchingBackedUpDataProgress { + if len(tracker.LoadedItems)-1 < int(tracker.TotalNumber) { + receivedEverything = false + break + } + } + + if receivedEverything { + m.fetchingBackedUpDataCompleted = true + + // Update the AC notification and add it to the response + hexBytesIds := []types.HexBytes{} + hexBytesIds = append(hexBytesIds, types.FromHex(backupSyncingNotificationID)) + notifications, err := m.persistence.GetActivityCenterNotificationsByID(hexBytesIds) + if err != nil { + errors = append(errors, err) + } else if len(notifications) == 1 { + notification := notifications[0] + notification.UpdatedAt = m.GetCurrentTimeInMillis() + notification.Type = ActivityCenterNotificationTypeBackupSyncingSuccess + _, err = m.persistence.SaveActivityCenterNotification(notification, true) + if err != nil { + errors = append(errors, err) + } else { + state.Response.AddActivityCenterNotification(notification) + } + } + } + } + } + m.config.messengerSignalsHandler.SendWakuFetchingBackupProgress(&response) } diff --git a/protocol/messenger_backup_test.go b/protocol/messenger_backup_test.go index 5139a2c525d..46038dcfd34 100644 --- a/protocol/messenger_backup_test.go +++ b/protocol/messenger_backup_test.go @@ -260,6 +260,159 @@ func (s *MessengerBackupSuite) TestBackupProfileWithInvalidDisplayName() { s.Require().Equal("", storedBob1DisplayName) } +func (s *MessengerBackupSuite) TestFetchingDuringBackup() { + bob1 := s.m + bob1.config.messengerSignalsHandler = &MessengerSignalsHandlerMock{} + + state := ReceivedMessageState{ + Response: &MessengerResponse{}, + } + + backup := &protobuf.Backup{ + Clock: 1, + ContactsDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(1), + }, + CommunitiesDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(1), + }, + ProfileDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(1), + }, + } + + err := bob1.HandleBackup( + &state, + backup, + &v1protocol.StatusMessage{}, + ) + s.Require().NoError(err) + // The backup is not done, so no signal should be sent + s.Require().Len(state.Response.ActivityCenterNotifications(), 0) + s.Require().Len(bob1.fetchingBackedUpDataProgress, 3) + s.Require().Equal(uint32(1), bob1.fetchingBackedUpDataProgress[SyncWakuSectionKeyContacts].TotalNumber) + + // Parse a backup with a higher clock so reset the fetching + backup = &protobuf.Backup{ + Clock: 2, + ContactsDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(2), + }, + CommunitiesDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(1), + }, + ProfileDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(1), + }, + SettingsDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(1), + }, + KeypairDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(1), + }, + WatchOnlyAccountDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(1), + }, + } + err = bob1.HandleBackup( + &state, + backup, + &v1protocol.StatusMessage{}, + ) + s.Require().NoError(err) + // The backup is not done, so no signal should be sent + s.Require().Len(state.Response.ActivityCenterNotifications(), 0) + s.Require().Len(bob1.fetchingBackedUpDataProgress, 6) + s.Require().Equal(uint32(2), bob1.fetchingBackedUpDataProgress[SyncWakuSectionKeyContacts].TotalNumber) + + // Backup with a smaller clock is ignored + backup = &protobuf.Backup{ + Clock: 2, + ContactsDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(5), + }, + CommunitiesDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(1), + }, + } + err = bob1.HandleBackup( + &state, + backup, + &v1protocol.StatusMessage{}, + ) + s.Require().NoError(err) + // The backup is not done, so no signal should be sent + s.Require().Len(state.Response.ActivityCenterNotifications(), 0) + // The values are gonna be the same as before as the backup was ignored + s.Require().Len(bob1.fetchingBackedUpDataProgress, 6) + s.Require().Equal(uint32(2), bob1.fetchingBackedUpDataProgress[SyncWakuSectionKeyContacts].TotalNumber) + + // Parse the backup with almost all the correct data numbers + backup = &protobuf.Backup{ + Clock: 2, + ContactsDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(1), + TotalNumber: uint32(2), + }, + CommunitiesDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(1), + TotalNumber: uint32(1), + }, + ProfileDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(1), + TotalNumber: uint32(1), + }, + SettingsDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(1), + TotalNumber: uint32(1), + }, + KeypairDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(1), + TotalNumber: uint32(1), + }, + WatchOnlyAccountDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(1), + TotalNumber: uint32(1), + }, + } + err = bob1.HandleBackup( + &state, + backup, + &v1protocol.StatusMessage{}, + ) + s.Require().NoError(err) + // The backup is not done, so no signal should be sent + s.Require().Len(state.Response.ActivityCenterNotifications(), 0) + + // Parse the remaining backup so the notification should be sent now + backup = &protobuf.Backup{ + Clock: 2, + ContactsDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(2), + TotalNumber: uint32(2), + }, + } + err = bob1.HandleBackup( + &state, + backup, + &v1protocol.StatusMessage{}, + ) + s.Require().NoError(err) + // The backup is done, so the signal should be sent + s.Require().Len(state.Response.ActivityCenterNotifications(), 1) +} + func (s *MessengerBackupSuite) TestBackupSettings() { s.T().Skip("flaky test") const ( diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index 1679baaabd9..42d8854d3f0 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -322,7 +322,13 @@ func (m *Messenger) RequestAllHistoricMessages(forceFetchingBackup, withRetries return nil, nil } + allResponses := &MessengerResponse{} if forceFetchingBackup || !backupFetched { + err = m.startBackupFetchingTracking(allResponses) + if err != nil { + return nil, err + } + m.logger.Info("fetching backup") err := m.syncBackup() if err != nil { @@ -336,7 +342,6 @@ func (m *Messenger) RequestAllHistoricMessages(forceFetchingBackup, withRetries defer m.resetFiltersPriority(filters) filtersByMs := m.SplitFiltersByStoreNode(filters) - allResponses := &MessengerResponse{} for communityID, filtersForMs := range filtersByMs { peerID := m.getCommunityStorenode(communityID) if withRetries { diff --git a/protocol/messenger_testing_utils.go b/protocol/messenger_testing_utils.go index 96d2f0d3a17..246725955c9 100644 --- a/protocol/messenger_testing_utils.go +++ b/protocol/messenger_testing_utils.go @@ -63,7 +63,10 @@ type MessengerSignalsHandlerMock struct { } func (m *MessengerSignalsHandlerMock) SendWakuFetchingBackupProgress(response *wakusync.WakuBackedUpDataResponse) { - m.wakuBackedUpDataResponseChan <- response + select { + case m.wakuBackedUpDataResponseChan <- response: + default: + } } func (m *MessengerSignalsHandlerMock) SendWakuBackedUpProfile(*wakusync.WakuBackedUpDataResponse) {} func (m *MessengerSignalsHandlerMock) SendWakuBackedUpSettings(*wakusync.WakuBackedUpDataResponse) {} diff --git a/protocol/wakusync/progress_response.go b/protocol/wakusync/progress_response.go index b15846c18b2..c4ce596a9b7 100644 --- a/protocol/wakusync/progress_response.go +++ b/protocol/wakusync/progress_response.go @@ -9,6 +9,11 @@ type FetchingBackupedDataDetails struct { TotalNumber uint32 `json:"totalNumber,omitempty"` } +type FetchingBackedUpDataTracking struct { + LoadedItems map[uint32]bool + TotalNumber uint32 +} + func (sfwr *WakuBackedUpDataResponse) AddFetchingBackedUpDataDetails(section string, details *protobuf.FetchingBackedUpDataDetails) { if details == nil { return