Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RLink fixes and enhancements #2095

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 32 additions & 12 deletions crossbar/router/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def detach(self, session):

for subscription in self._session_to_subscriptions[session]:

was_subscribed, was_last_subscriber = self._subscription_map.drop_observer(session, subscription)
was_subscribed, was_last_subscriber, was_last_local_subscriber = self._subscription_map.drop_observer(session, subscription)
was_deleted = False

# delete it if there are no subscribers and no retained events
Expand All @@ -133,6 +133,10 @@ def detach(self, session):
was_deleted = True
self._subscription_map.delete_observation(subscription)

is_rlink_session = (session._authrole == 'rlink')

exclude_authid = session._authid

# publish WAMP meta events, if we have a service session, but
# not for the meta API itself!
#
Expand All @@ -143,10 +147,16 @@ def detach(self, session):
def _publish(subscription):
service_session = self._router._realm.session

# FIXME: what about exclude_authid as colleced from forward_for? like we do elsewhere in this file!
options = types.PublishOptions(correlation_id=None,
correlation_is_anchor=True,
correlation_is_last=False)
# FIXME: what about exclude_authid as collected from forward_for? like we do elsewhere in this file!
options = types.PublishOptions(
correlation_id=None,
correlation_is_anchor=True,
correlation_is_last=False,
exclude_authid=exclude_authid,
exclude_authrole=['rlink'] if is_rlink_session else None,
eligible_authrole=['rlink'] if was_last_local_subscriber and
not was_last_subscriber else None,
)

if was_subscribed:
service_session.publish(
Expand All @@ -156,7 +166,7 @@ def _publish(subscription):
options=options,
)

if was_deleted:
if was_deleted or was_last_local_subscriber:
options.correlation_is_last = True
service_session.publish(
'wamp.subscription.on_delete',
Expand Down Expand Up @@ -830,12 +840,15 @@ def on_authorize_success(authorization):

# ok, session authorized to subscribe. now get the subscription
#
subscription, was_already_subscribed, is_first_subscriber = self._subscription_map.add_observer(
session, subscribe.topic, subscribe.match, extra=SubscriptionExtra())
subscription, was_already_subscribed, is_first_subscriber, is_first_local_subscriber \
= self._subscription_map.add_observer(session, subscribe.topic,
subscribe.match, extra=SubscriptionExtra())

if not was_already_subscribed:
self._session_to_subscriptions[session].add(subscription)

is_rlink_session = (session._authrole == 'rlink')

# publish WAMP meta events, if we have a service session, but
# not for the meta API itself!
#
Expand All @@ -853,17 +866,20 @@ def on_authorize_success(authorization):
def _publish():
service_session = self._router._realm.session

if exclude_authid or self._router.is_traced:
if exclude_authid or self._router.is_traced or \
is_first_local_subscriber or is_rlink_session:
options = types.PublishOptions(
correlation_id=subscribe.correlation_id,
correlation_is_anchor=False,
correlation_is_last=False,
exclude_authid=exclude_authid,
exclude_authrole=['rlink'] if is_rlink_session else None,
eligible_authrole=['rlink'] if is_first_local_subscriber and not is_first_subscriber else None,
)
else:
options = None

if is_first_subscriber:
if is_first_subscriber or is_first_local_subscriber:
subscription_details = {
'id': subscription.id,
'created': subscription.created,
Expand Down Expand Up @@ -1037,7 +1053,7 @@ def _unsubscribe(self, subscription, session, unsubscribe=None):

# drop session from subscription observers
#
was_subscribed, was_last_subscriber = self._subscription_map.drop_observer(session, subscription)
was_subscribed, was_last_subscriber, was_last_local_subscriber = self._subscription_map.drop_observer(session, subscription)
was_deleted = False

if was_subscribed and was_last_subscriber and not subscription.extra.retained_events:
Expand All @@ -1049,6 +1065,8 @@ def _unsubscribe(self, subscription, session, unsubscribe=None):
if was_subscribed:
self._session_to_subscriptions[session].discard(subscription)

is_rlink_session = (session._authrole == 'rlink')

# publish WAMP meta events, if we have a service session, but
# not for the meta API itself!
#
Expand All @@ -1072,6 +1090,8 @@ def _publish():
correlation_is_anchor=False,
correlation_is_last=False,
exclude_authid=exclude_authid,
exclude_authrole=['rlink'] if is_rlink_session else None,
eligible_authrole=['rlink'] if was_last_local_subscriber and not was_last_subscriber else None,
)
else:
options = None
Expand All @@ -1084,7 +1104,7 @@ def _publish():
options=options,
)

if was_deleted:
if was_deleted or was_last_local_subscriber:
if options:
options.correlation_is_last = True

Expand Down
63 changes: 37 additions & 26 deletions crossbar/router/dealer.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def detach(self, session):
invoke.caller._transport.send(reply)

for registration in self._session_to_registrations[session]:
was_registered, was_last_callee = self._registration_map.drop_observer(session, registration)
was_registered, was_last_callee, was_last_local_callee = self._registration_map.drop_observer(session, registration)

if was_registered and was_last_callee:
self._registration_map.delete_observation(registration)
Expand All @@ -250,7 +250,12 @@ def _publish(registration):
service_session = self._router._realm.session

# FIXME: what about exclude_authid as collected from forward_for? like we do elsewhere in this file!
options = types.PublishOptions(correlation_id=None)
options = types.PublishOptions(
correlation_id=None,
exclude_authrole=['rlink'] if is_rlink_session else None,
eligible_authrole=['rlink'] if was_last_local_callee and
not was_last_callee else None,
)

if was_registered:
service_session.publish(
Expand All @@ -260,14 +265,13 @@ def _publish(registration):
options=options,
)

if was_last_callee:
if not is_rlink_session:
service_session.publish(
'wamp.registration.on_delete',
session._session_id,
registration.id,
options=options,
)
if was_last_callee or was_last_local_callee:
service_session.publish(
'wamp.registration.on_delete',
session._session_id,
registration.id,
options=options,
)

# we postpone actual sending of meta events until we return to this client session
self._reactor.callLater(0, _publish, registration)
Expand All @@ -281,7 +285,7 @@ def processRegister(self, session, register):
"""
Implements :func:`crossbar.router.interfaces.IDealer.processRegister`
"""
# check topic URI: for SUBSCRIBE, must be valid URI (either strict or loose), and all
# check topic URI: for REGISTER, must be valid URI (either strict or loose), and all
# URI components must be non-empty other than for wildcard subscriptions
#
is_rlink_session = (session._authrole == "rlink")
Expand Down Expand Up @@ -437,6 +441,7 @@ def on_authorize_success(authorization):
if authorization['allow']:
registration = self._registration_map.get_observation(register.procedure, register.match)
if register.force_reregister and registration:
# TODO handle Unregistered in RLink
for obs in registration.observers:
self._registration_map.drop_observer(obs, registration)
kicked = message.Unregistered(
Expand All @@ -455,7 +460,7 @@ def on_authorize_success(authorization):
#
registration_extra = RegistrationExtra(register.invoke)
registration_callee_extra = RegistrationCalleeExtra(register.concurrency)
registration, was_already_registered, is_first_callee = self._registration_map.add_observer(
registration, was_already_registered, is_first_callee, is_first_local_callee = self._registration_map.add_observer(
session, register.procedure, register.match, registration_extra, registration_callee_extra)

if not was_already_registered:
Expand Down Expand Up @@ -489,29 +494,34 @@ def on_authorize_success(authorization):
def _publish():
service_session = self._router._realm.session

if exclude_authid or self._router.is_traced:
if exclude_authid or self._router.is_traced or \
is_rlink_session or is_first_local_callee:
options = types.PublishOptions(
correlation_id=register.correlation_id,
correlation_is_anchor=False,
correlation_is_last=False,
exclude_authid=exclude_authid,
exclude_authrole=['rlink'] if is_rlink_session else None,
eligible_authrole=['rlink'] if is_first_local_callee and
not is_first_callee else None,

)
else:
options = None

if is_first_callee:
if is_first_callee or is_first_local_callee:
registration_details = {
'id': registration.id,
'created': registration.created,
'uri': registration.uri,
'match': registration.match,
'invoke': registration.extra.invoke,
'forced_reregister': register.force_reregister
}
if not is_rlink_session:
service_session.publish('wamp.registration.on_create',
session._session_id,
registration_details,
options=options)
service_session.publish('wamp.registration.on_create',
session._session_id,
registration_details,
options=options)

if not was_already_registered:
if options:
Expand Down Expand Up @@ -612,7 +622,7 @@ def _unregister(self, registration, session, unregister=None):

# drop session from registration observers
#
was_registered, was_last_callee = self._registration_map.drop_observer(session, registration)
was_registered, was_last_callee, was_last_local_callee = self._registration_map.drop_observer(session, registration)
was_deleted = False
is_rlink_session = (session._authrole == "rlink")

Expand Down Expand Up @@ -652,6 +662,8 @@ def _publish():
correlation_is_anchor=False,
correlation_is_last=False,
exclude_authid=exclude_authid,
exclude_authrole=['rlink'] if is_rlink_session else None,
eligible_authrole=['rlink'] if was_last_local_callee and not was_last_callee else None,
)
else:
options = None
Expand All @@ -662,15 +674,14 @@ def _publish():
registration.id,
options=options)

if was_deleted:
if was_deleted or was_last_local_callee:
if options:
options.correlation_is_last = True

if not is_rlink_session:
service_session.publish('wamp.registration.on_delete',
session._session_id,
registration.id,
options=options)
service_session.publish('wamp.registration.on_delete',
session._session_id,
registration.id,
options=options)

# we postpone actual sending of meta events until we return to this client session
self._reactor.callLater(0, _publish)
Expand Down
22 changes: 19 additions & 3 deletions crossbar/router/observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def add_observer(self, observer, uri, match="exact", extra=None, observer_extra=
raise Exception("'uri' should be unicode, not {}".format(type(uri).__name__))

is_first_observer = False
is_first_local_observer = False

if match == "exact":

Expand Down Expand Up @@ -218,6 +219,13 @@ def add_observer(self, observer, uri, match="exact", extra=None, observer_extra=
else:
raise Exception("invalid match strategy '{}'".format(match))

is_rlink_observer = observer.authrole == 'rlink'
if is_first_observer:
is_first_local_observer = not is_rlink_observer
else:
is_first_local_observer = not is_rlink_observer and \
next(filter(lambda o: o.authrole != 'rlink', observation.observers), None) is None

# add observer if not already in observation
#
if observer not in observation.observers:
Expand All @@ -232,7 +240,7 @@ def add_observer(self, observer, uri, match="exact", extra=None, observer_extra=
else:
was_already_observed = True

return observation, was_already_observed, is_first_observer
return observation, was_already_observed, is_first_observer, is_first_local_observer

def get_observation(self, uri, match="exact"):
"""
Expand Down Expand Up @@ -383,10 +391,13 @@ def drop_observer(self, observer, observation):
:rtype: tuple
"""
was_last_observer = False
was_last_local_observer = False

if observer in observation.observers:
was_observed = True

is_rlink_observer = observer.authrole == 'rlink'

# remove observer from observation
#
observation.observers.discard(observer)
Expand All @@ -400,12 +411,17 @@ def drop_observer(self, observer, observation):
#
if not observation.observers:
was_last_observer = True

was_last_local_observer = True
else:
was_last_observer = False
was_last_local_observer = not is_rlink_observer and \
next(filter(lambda o: o.authrole != 'rlink', observation.observers),
None) is None
else:
# observer wasn't on this observation
was_observed = False

return was_observed, was_last_observer
return was_observed, was_last_observer, was_last_local_observer

def delete_observation(self, observation):
"""
Expand Down
7 changes: 4 additions & 3 deletions crossbar/router/realmstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,10 @@ def attach_subscription_map(self, subscription_map: UriObservationMap):
for sub in self._config.get('event-history', []):
uri = sub['uri']
match = sub.get('match', 'exact')
observation, was_already_observed, was_first_observer = subscription_map.add_observer(self,
uri=uri,
match=match)
observation, was_already_observed, was_first_observer, was_first_local_observer = \
subscription_map.add_observer(self,
uri=uri,
match=match)
subscription_id = observation.id

# for in-memory history, we just use a double-ended queue
Expand Down
Loading