Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Support MSC3814: Dehydrated Devices #15929

Merged
merged 15 commits into from
Jul 24, 2023
3 changes: 3 additions & 0 deletions changelog.d/15929.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Implement [MSC3814](https://github.com/matrix-org/matrix-spec-proposals/pull/3814),
dehydrated devices v2/shrivelled sessions and move [MSC2697](https://github.com/matrix-org/matrix-spec-proposals/pull/2697)
behind a config flag. Contributed by Nico from Famedly and H-Shay.
H-Shay marked this conversation as resolved.
Show resolved Hide resolved
12 changes: 12 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,18 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
# MSC3026 (busy presence state)
self.msc3026_enabled: bool = experimental.get("msc3026_enabled", False)

# MSC2697 (device dehydration)
# Enabled by default since this option was added after adding the feature.
# It is not recommended that both MSC2697 and MSC3814 both be enabled at
# once.
self.msc2697_enabled: bool = experimental.get("msc2697_enabled", True)

# MSC3814 (dehydrated devices with SSSS)
# This is an alternative method to achieve the same goals as MSC2697.
# It is not recommended that both MSC2697 and MSC3814 both be enabled at
# once.
H-Shay marked this conversation as resolved.
Show resolved Hide resolved
self.msc3814_enabled: bool = experimental.get("msc3814_enabled", False)

# MSC3244 (room version capabilities)
self.msc3244_enabled: bool = experimental.get("msc3244_enabled", True)

Expand Down
98 changes: 96 additions & 2 deletions synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Any, Dict
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Dict, Optional

from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes
from synapse.api.errors import SynapseError
from synapse.api.errors import Codes, SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import (
Expand Down Expand Up @@ -48,6 +49,9 @@ def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.notifier = hs.get_notifier()
self.is_mine = hs.is_mine
if hs.config.experimental.msc3814_enabled:
self.event_sources = hs.get_event_sources()
self.device_handler = hs.get_device_handler()

# We only need to poke the federation sender explicitly if its on the
# same instance. Other federation sender instances will get notified by
Expand Down Expand Up @@ -303,3 +307,93 @@ async def send_device_message(
# Enqueue a new federation transaction to send the new
# device messages to each remote destination.
self.federation_sender.send_device_messages(destination)

async def get_events_for_dehydrated_device(
self,
requester: Requester,
device_id: str,
since_token: Optional[str],
limit: int,
) -> JsonDict:
"""Fetches up to `limit` events sent to `device_id` starting from `since_token`
and returns the new since token. If there are no more messages, returns an empty
array and deletes the dehydrated device associated with the user/device_id.

Args:
requester: the user requesting the messages
device_id: ID of the dehydrated device
since_token: stream id to start from when fetching messages
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a /sync next_batch stream token or a stream id? (I'd expect stream ids to be integers)

limit: the number of messages to fetch
"""
H-Shay marked this conversation as resolved.
Show resolved Hide resolved

user_id = requester.user.to_string()

# only allow fetching messages for the dehydrated device id currently associated
# with the user
dehydrated_device = await self.device_handler.get_dehydrated_device(user_id)
if dehydrated_device is None or device_id != dehydrated_device[0]:
raise SynapseError(
HTTPStatus.FORBIDDEN,
"You may only fetch messages for your dehydrated device",
Codes.FORBIDDEN,
)
H-Shay marked this conversation as resolved.
Show resolved Hide resolved

since_stream_id = 0
if since_token:
if not since_token.startswith("d"):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"from parameter %r has an invalid format" % (since_token,),
errcode=Codes.INVALID_PARAM,
)

try:
since_stream_id = int(since_token[1:])
except Exception:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"from parameter %r has an invalid format" % (since_token,),
errcode=Codes.INVALID_PARAM,
)
H-Shay marked this conversation as resolved.
Show resolved Hide resolved

# if we have a since token, delete any to-device messages before that token
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned already, I think we should not delete any delivered to-device messages for dehydrated devices.

We can do this in a later PR, but I think that this will be crucial to ease the resumption of rehydration and ensure that room keys don't get lost because a device aborted the rehydration step.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can open a follow-up PR to stop deleting the delivered to-device messages and address the TODOs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The follow-up PR is here: #16010

# (since we now know that the device has received them)
deleted = await self.store.delete_messages_for_device(
user_id, device_id, since_stream_id
)
logger.debug(
"Deleted %d to-device messages up to %d", deleted, since_stream_id
)
H-Shay marked this conversation as resolved.
Show resolved Hide resolved

to_token = self.event_sources.get_current_token().to_device_key

messages, stream_id = await self.store.get_messages_for_device(
user_id, device_id, since_stream_id, to_token, limit
)

for message in messages:
# We pop here as we shouldn't be sending the message ID down
# `/sync`
H-Shay marked this conversation as resolved.
Show resolved Hide resolved
message_id = message.pop("message_id", None)
if message_id:
set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id)

logger.debug(
"Returning %d to-device messages between %d and %d (current token: %d) for dehydrated device %s",
len(messages),
since_stream_id,
stream_id,
to_token,
device_id,
)
H-Shay marked this conversation as resolved.
Show resolved Hide resolved

if messages == []:
# we've fetched all the messages, delete the dehydrated device
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not align with the MSC, the MSC tells us this:

Once a client calls POST /dehydrated_device/{device_id}/events, the server can delete the device (though not necessarily its to-device messages).

So, per MSC, we should delete the device the first time the client makes a request to this endpoint, no matter how many messages we may have.

That being said, I think that we should leave the deletion of the device up to the client, see my rationale here: matrix-org/matrix-spec-proposals#3814 (comment).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is one of the things I asked for clarification about on the MSC, see matrix-org/matrix-spec-proposals#3814 (comment)

await self.store.remove_dehydrated_device(
requester.user.to_string(), device_id
)

return {
"events": messages,
"next_batch": f"d{stream_id}",
}
60 changes: 55 additions & 5 deletions synapse/rest/client/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
from synapse.http.servlet import (
RestServlet,
parse_and_validate_json_object_from_request,
parse_integer,
)
from synapse.http.site import SynapseRequest
from synapse.rest.client._base import client_patterns, interactive_auth_handler
from synapse.rest.client.models import AuthenticationData
from synapse.rest.models import RequestBodyModel
from synapse.types import JsonDict
from synapse.util.cancellation import cancellable

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -229,6 +231,8 @@ class Config:
class DehydratedDeviceServlet(RestServlet):
"""Retrieve or store a dehydrated device.

Implements both MSC2697 and MSC3814.
H-Shay marked this conversation as resolved.
Show resolved Hide resolved

GET /org.matrix.msc2697.v2/dehydrated_device

HTTP/1.1 200 OK
Expand Down Expand Up @@ -261,16 +265,21 @@ class DehydratedDeviceServlet(RestServlet):

"""

PATTERNS = client_patterns("/org.matrix.msc2697.v2/dehydrated_device$", releases=())

def __init__(self, hs: "HomeServer"):
def __init__(self, hs: "HomeServer", msc2697: bool = True):
super().__init__()
self.hs = hs
self.auth = hs.get_auth()
handler = hs.get_device_handler()
assert isinstance(handler, DeviceHandler)
self.device_handler = handler

self.PATTERNS = client_patterns(
"/org.matrix.msc2697.v2/dehydrated_device$"
if msc2697
else "/org.matrix.msc3814.v1/dehydrated_device$",
releases=(),
)

async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
dehydrated_device = await self.device_handler.get_dehydrated_device(
Expand Down Expand Up @@ -347,14 +356,55 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
return 200, result


class DehydratedDeviceEventsServlet(RestServlet):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible for multiple clients to be hitting this endpoint at once? What solves that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be something like a scenario where (let's say) two devices have called GET /dehydrated_device and received the dehydrated device id, and are now both calling this endpoint with the id?
I don't think there is anything currently preventing this - iirc from the MSC proposal, this sort of scenario was the rationale for deleting the dehydrated device the first time this endpoint is called, as that functions as "claiming" the dehydrated device so it can't be used by other clients. This behavior was contentious on the MSC though - I implemented deleting the device after all the messages are delivered, but that won't help in the case where two devices are calling this endpoint at the same time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah probably a conversation for the MSC, but seems quite likely now.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what will help, and will be necessary for perfect resumption, is that we shouldn't delete the events untill the device is fully deleted.

This way any device can attempt to rehydrated the device at any point in time, even if another one is already rehydrating it, until one device succeeds and deletes the dehydrated device.

PATTERNS = client_patterns(
"/org.matrix.msc3814.v1/dehydrated_device/(?P<device_id>[^/]*)/events$",
releases=(),
)

def __init__(self, hs: "HomeServer"):
super().__init__()
self.message_handler = hs.get_device_message_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastores().main

class PostBody(RequestBodyModel):
next_batch: Optional[StrictStr]

@cancellable
clokep marked this conversation as resolved.
Show resolved Hide resolved
async def on_POST(
self, request: SynapseRequest, device_id: str
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)

next_batch = parse_and_validate_json_object_from_request(
request, self.PostBody
).next_batch
limit = parse_integer(request, "limit", 100)

msgs = await self.message_handler.get_events_for_dehydrated_device(
requester=requester,
device_id=device_id,
since_token=next_batch,
limit=limit,
)

return 200, msgs


def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
if (
hs.config.worker.worker_app is None
and not hs.config.experimental.msc3861.enabled
):
DeleteDevicesRestServlet(hs).register(http_server)
DevicesRestServlet(hs).register(http_server)

if hs.config.worker.worker_app is None:
DeviceRestServlet(hs).register(http_server)
DehydratedDeviceServlet(hs).register(http_server)
ClaimDehydratedDeviceServlet(hs).register(http_server)
if hs.config.experimental.msc2697_enabled:
DehydratedDeviceServlet(hs, msc2697=True).register(http_server)
ClaimDehydratedDeviceServlet(hs).register(http_server)
if hs.config.experimental.msc3814_enabled:
DehydratedDeviceServlet(hs, msc2697=False).register(http_server)
DehydratedDeviceEventsServlet(hs).register(http_server)
114 changes: 113 additions & 1 deletion tests/handlers/test_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@
from typing import Optional
from unittest import mock

from twisted.internet.defer import ensureDeferred
from twisted.test.proto_helpers import MemoryReactor

from synapse.api.constants import RoomEncryptionAlgorithms
from synapse.api.errors import NotFoundError, SynapseError
from synapse.appservice import ApplicationService
from synapse.handlers.device import MAX_DEVICE_DISPLAY_NAME_LEN, DeviceHandler
from synapse.rest import admin
from synapse.rest.client import devices, login, register
from synapse.server import HomeServer
from synapse.storage.databases.main.appservice import _make_exclusive_regex
from synapse.types import JsonDict
from synapse.types import JsonDict, create_requester
from synapse.util import Clock

from tests import unittest
Expand Down Expand Up @@ -400,11 +403,19 @@ def test_on_federation_query_user_devices_appservice(self) -> None:


class DehydrationTestCase(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets_for_client_rest_resource,
login.register_servlets,
register.register_servlets,
devices.register_servlets,
]

def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
hs = self.setup_test_homeserver("server", federation_http_client=None)
handler = hs.get_device_handler()
assert isinstance(handler, DeviceHandler)
self.handler = handler
self.message_handler = hs.get_device_message_handler()
self.registration = hs.get_registration_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
Expand Down Expand Up @@ -482,3 +493,104 @@ def test_dehydrate_and_rehydrate_device(self) -> None:
ret = self.get_success(self.handler.get_dehydrated_device(user_id=user_id))

self.assertIsNone(ret)

@unittest.override_config(
{"experimental_features": {"msc2697_enabled": False, "msc3814_enabled": True}}
)
def test_dehydrate_v2_and_fetch_events(self) -> None:
user_id = "@boris:server"

self.get_success(self.store.register_user(user_id, "foobar"))

# First check if we can store and fetch a dehydrated device
stored_dehydrated_device_id = self.get_success(
self.handler.store_dehydrated_device(
user_id=user_id,
device_data={"device_data": {"foo": "bar"}},
initial_device_display_name="dehydrated device",
)
)

device_info = self.get_success(
self.handler.get_dehydrated_device(user_id=user_id)
)
assert device_info is not None
retrieved_device_id, device_data = device_info
self.assertEqual(retrieved_device_id, stored_dehydrated_device_id)
self.assertEqual(device_data, {"device_data": {"foo": "bar"}})

# Create a new login for the user
device_id, access_token, _expiration_time, _refresh_token = self.get_success(
self.registration.register_device(
user_id=user_id,
device_id=None,
initial_display_name="new device",
)
)

requester = create_requester(user_id, device_id=device_id)

# Fetching messages for a non existing device should return an error
self.get_failure(
self.message_handler.get_events_for_dehydrated_device(
requester=requester,
device_id="not the right device ID",
since_token=None,
limit=10,
),
SynapseError,
)

# Send a message to the dehydrated device
ensureDeferred(
self.message_handler.send_device_message(
requester=requester,
message_type="test.message",
messages={user_id: {stored_dehydrated_device_id: {"body": "foo"}}},
)
)
self.pump()

# Fetch the message of the dehydrated device
res = self.get_success(
self.message_handler.get_events_for_dehydrated_device(
requester=requester,
device_id=stored_dehydrated_device_id,
since_token=None,
limit=10,
)
)

self.assertTrue(len(res["next_batch"]) > 1)
self.assertEqual(len(res["events"]), 1)
self.assertEqual(res["events"][0]["content"]["body"], "foo")

# Fetch the message of the dehydrated device again, which should return nothing and delete the old messages
res = self.get_success(
self.message_handler.get_events_for_dehydrated_device(
requester=requester,
device_id=stored_dehydrated_device_id,
since_token=res["next_batch"],
limit=10,
)
)
self.assertTrue(len(res["next_batch"]) > 1)
self.assertEqual(len(res["events"]), 0)

# Fetching messages again should fail, since the messages and dehydrated device
# were deleted
self.get_failure(
self.message_handler.get_events_for_dehydrated_device(
requester=requester,
device_id=stored_dehydrated_device_id,
since_token=None,
limit=10,
),
SynapseError,
)

# make sure that the dehydrated device ID is deleted after fetching messages
res2 = self.get_success(
self.handler.get_dehydrated_device(requester.user.to_string()),
)
self.assertEqual(res2, None)
Loading