Add pause/resume feature to MBassador #122

Open
bgroenks96 opened this issue Jul 28, 2015 · 15 comments

Comments

@bgroenks96

Hi,
I've been working on a pull request to add a 'pause' and 'resume' feature to the message bus. Here are the methods from the new interface 'PubSubPauseSupport' with Javadoc comments to explain the concept:

/**
 * Pauses event publishing. All messages submitted via {@link #publish(Object)} will be stored in a queue until
 * {@link #resume()} is called. Any subsequent calls to this method before a call to {@link #resume()} will have no
 * effect.
 */
void pause();

/**
 * Resumes event publishing. All messages enqueued since the first call to {@link #pause()} will be subsequently
 * flushed and published in the order that they arrived. Does nothing if the runtime is not currently in a paused
 * state from a call to {@link #pause()}.
 */
void resume();

/**
 * @return true if this PubSubPauseSupport is currently paused, false otherwise.
 */
boolean isPaused();

This is especially useful in event-based applications that go through "transitions", i.e. short periods of time in which no Handler is able to receive events, yet it would be a problem for inbound events to be lost entirely. Being able to 'pause' event publishing, queueing published events until it is resumed, solves this problem. There shouldn't be any meaningful performance cost on normal operation either, since the only addition is a check of the current pause state.
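To make the contract in the Javadoc above concrete, here is a minimal sketch of the intended behaviour. It assumes a single delivery callback in place of real handlers, and the class name PausablePublisher is hypothetical; it is not the code from the pull request and uses none of MBassador's classes.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical stand-in for a message bus; illustrates the contract only.
class PausablePublisher<T> {
    private final Consumer<T> delivery;              // receives messages while not paused
    private final Queue<T> pending = new ArrayDeque<>();
    private boolean paused;                          // guarded by "this"

    PausablePublisher(Consumer<T> delivery) {
        this.delivery = delivery;
    }

    // Repeated calls before resume() have no further effect.
    synchronized void pause() {
        paused = true;
    }

    // Flushes everything queued since pause(), in arrival order.
    synchronized void resume() {
        if (!paused) {
            return;                                  // not paused: nothing to do
        }
        paused = false;
        T message;
        while ((message = pending.poll()) != null) {
            delivery.accept(message);
        }
    }

    synchronized boolean isPaused() {
        return paused;
    }

    synchronized void publish(T message) {
        if (paused) {
            pending.add(message);                    // hold until resume()
        } else {
            delivery.accept(message);
        }
    }
}
```

A single coarse lock keeps the sketch easy to reason about; a real bus would likely want something cheaper on the publish path.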

The only thing that's tripping me up a bit is how to support publishAsync calls. Should AbstractSyncAsyncMessageBus keep another queue separate from the normal BlockingQueue, or should that be the implementation's job?

I would appreciate any feedback, and let me know if you want me to go ahead and push the pull request so you can see the rest of the changes.

@nikoliazekter

I want this too!

@bgroenks96
Author

👍 Can we get some input from collaborators....?

@bennidi
Owner

bennidi commented Oct 5, 2015

The interface looks good. But I am quite unsure about the implications for the existing implementation. I have the feeling that this feature would require a lot of changes in the core. Currently there is no awareness of running iterators. Can you outline the necessary changes that you see fit to support that feature?

@bgroenks96
Author

It's been a while since I've looked at this... but I just pushed the changes I had made to my remote fork. So you can take a look here:

https://github.com/bgroenks96/mbassador/tree/add-event-bus-pause-resume

@bennidi
Owner

bennidi commented Oct 25, 2015

Hi, I just took a quick look at the code you used to support the pause/resume feature. I don't have the time to make more specific comments, but wouldn't it be easier to just swap in a queue that has no workers attached (pause()) and then switch back to the old queue on resume()? You could drain the intermediate queue into the working queue while checking the isPaused flag. With your current implementation you will have problems if one thread resumes the processing while another pauses it. I am also seeing other race conditions as you don't use compareAndSwap; I am not sure, though, whether those represent synchronization bugs.
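In Java, the compare-and-swap mentioned here is available as AtomicBoolean.compareAndSet. The sketch below shows one way the pause state could be guarded so that, when several threads race, exactly one of them wins each transition and becomes responsible for the follow-up work such as draining the queue. The class and method names (PauseFlag, tryPause, tryResume) are assumptions made for illustration and do not come from the fork.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper; not part of MBassador or the linked branch.
class PauseFlag {
    private final AtomicBoolean paused = new AtomicBoolean(false);

    // Returns true only for the single caller that moved the state
    // from running to paused; concurrent or repeated callers get false.
    boolean tryPause() {
        return paused.compareAndSet(false, true);
    }

    // Returns true only for the single caller that moved the state
    // from paused to running; that caller should drain the held queue.
    boolean tryResume() {
        return paused.compareAndSet(true, false);
    }

    boolean isPaused() {
        return paused.get();
    }
}
```

A plain check followed by a write (if paused, then set false) lets two threads both believe they performed the resume; the atomic transition rules that out.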

@bgroenks96
Author

Are you referring to the 'pauseAsync' stuff or the code in AbstractPubSubPauseSupport?

I haven't fully implemented and/or reviewed how this will work with async. I actually was hoping for your input on that xD

@bennidi
Owner

bennidi commented Oct 30, 2015

I would suggest you have a look at the mbassador-spring repository. The entire extension is based on a decorator-like pattern. Instead of extending the core, the transactional bus wraps the sync-async bus, adds additional methods and delegates existing ones (adding extra code where necessary). I would advise you to take the same approach. In your method delegates to publish / publishAsync you just check the AtomicBoolean and either route messages to your intermediate queue or to the underlying bus' method. Whenever resume is triggered you drain the queue (until paused again or the queue is empty). Depending on your use case you might need separate queues for sync and async publication. But maybe you can go with one and drain its content just into the async queue of MBassador.

I hope I could make myself clear.
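The decorator approach described in this comment could look roughly like the following. SimpleBus is a hypothetical two-method interface standing in for the real bus type, and PausableBusDecorator is an invented name; this is a sketch of the idea under those assumptions, not the mbassador-spring code.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical minimal bus interface; the real bus has a larger API.
interface SimpleBus<T> {
    void publish(T message);
    void publishAsync(T message);
}

// Wraps any SimpleBus and adds pause/resume without modifying it.
class PausableBusDecorator<T> implements SimpleBus<T> {
    private final SimpleBus<T> delegate;
    private final AtomicBoolean paused = new AtomicBoolean(false);
    private final Queue<T> held = new ConcurrentLinkedQueue<>();

    PausableBusDecorator(SimpleBus<T> delegate) {
        this.delegate = delegate;
    }

    void pause() {
        paused.set(true);
    }

    // Drains held messages into the delegate's async path until the
    // queue is empty or the bus is paused again by another thread.
    void resume() {
        if (!paused.compareAndSet(true, false)) {
            return;                                  // was not paused
        }
        T message;
        while (!paused.get() && (message = held.poll()) != null) {
            delegate.publishAsync(message);
        }
    }

    @Override
    public void publish(T message) {
        if (paused.get()) {
            held.add(message);
        } else {
            delegate.publish(message);
        }
    }

    @Override
    public void publishAsync(T message) {
        if (paused.get()) {
            held.add(message);
        } else {
            delegate.publishAsync(message);
        }
    }
}
```

Note that the check of the flag and the enqueue are two separate steps, so a message published at the moment of resume can end up held until the next resume or be delivered out of order relative to drained messages. A production version would need to close that gap.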

@bgroenks96
Author

How does this look?

@bennidi
Owner

bennidi commented Nov 2, 2015

At first glance it looks fine. Good job! But I would change/add the constructor to consume a bus instance as parameter. This way it would be possible to write adapters that mix in various flavors. E.g. you could have a ResumableAndTransactionalBus(-Adapter) -- welcome to Spring Framework naming patterns :)

I am also not sure about draining the queue using synchronous publish. This would make resume() a blocking call. Maybe it could be extended to have a boolean param to decide which publication method to use.

Regarding unit tests: They should cover concurrent scenarios (use the helpers to spin up concurrent workloads) and then check invariants. One approach would be to have handlers that synchronize on the same monitor (another atomic boolean) that is set atomically together with the pause()/resume() of the bus. The handlers throw an exception in case they receive messages while being in paused state.
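The invariant-checking handler suggested for the unit tests might be sketched as follows. The class name PauseAwareHandler is hypothetical, and the sketch assumes the test flips the shared flag together with every pause()/resume() call on the bus under test; in a real MBassador test the handle method would additionally carry the library's handler annotation and be subscribed to the bus.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical test helper: fails loudly if a message arrives while
// the test believes the bus is paused.
class PauseAwareHandler {
    private final AtomicBoolean busPaused;           // shared with the test driver
    private final AtomicInteger received = new AtomicInteger();

    PauseAwareHandler(AtomicBoolean busPaused) {
        this.busPaused = busPaused;
    }

    void handle(Object message) {
        if (busPaused.get()) {
            throw new IllegalStateException("received " + message + " while paused");
        }
        received.incrementAndGet();
    }

    int receivedCount() {
        return received.get();
    }
}
```

For the check to be meaningful, the flag and the bus state have to change as one unit from the handler's point of view, for example by performing both under the same lock in the test driver.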

@bgroenks96
Author

Ok, I added another constructor and a new 'resumeAsync' method. How does that look?

Now you just need unit tests?

@bennidi
Owner

bennidi commented Nov 2, 2015

The implementation of publish() and publishAsync() violates the DRY principle and needs to be refactored. Have a go with unit tests and we will see how the code works, then.

@bgroenks96
Author

The implementation of publish() and publishAsync() violates the DRY principle and needs to be refactored.

So would you prefer to just have something like resume(boolean async)?

I think I actually find having a separate resumeAsync method slightly more clear, but I won't argue the case. I'll change it to whatever you want.

@bgroenks96
Author

@bennidi I think I have discovered several issues with taking the spring-like wrapper pattern you suggested:

  1. It makes it impossible to extend MBassador and treat the new pause/resume bus polymorphically as such. The reason is that the constructor of AbstractSyncAsyncMessageBus makes a polymorphic call to the method getRuntime(), which has to be overridden by the wrapper subclass in order to provide the internal bus runtime to callers (see code here). This causes a NullPointerException to be thrown, since the superclass constructor has not yet finished executing and thus the internal bus reference has not yet been set by the child constructor.

  2. This leaves us with the option of creating a standalone class that extends nothing but implements the appropriate interfaces and has an MBassador instance injected, much like those in your mbassador-spring repository. While I think this is a good idea in theory, it has some pitfalls in practice:
    a. It's utterly impractical for present users of the MBassador library to go through and change every MBassador reference to SuspendableMBassador just to use this feature, which they would have to do. Now you might ask, why not wrap their current MBassador instances in SuspendableMBassadors whenever they need it? That brings us to the next issues with this approach:
    b. The injection creates a deceptive relationship between SuspendableMBassador and its injected internal message bus. When the user of SuspendableMBassador calls pause() or resume(), they will expect all message publication to halt for Handlers subscribed to that message bus. However, this can only happen for messages published through the SuspendableMBassador wrapper. That is, every other event being published via references to the injected MBassador (so all of their existing code) will still be delivered, and there is no way for SuspendableMBassador to do anything about that. This pretty much renders the feature useless.
    c. The rest of the API is not designed with this spring-like pattern in mind. It follows much more of a top-down, inheritance-based pattern built heavily around the MBassador concrete implementation, and I don't see any good way to mix the two, especially as far as practical API use goes.
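The constructor problem from point 1 can be reproduced in isolation. The classes below are hypothetical and only mimic the shape of the situation (a superclass constructor calling an overridable method, and a subclass override that depends on a field assigned later); they are not MBassador's actual classes.

```java
// Hypothetical base class: its constructor calls an overridable method.
class BaseBus {
    final String runtimeName;

    BaseBus() {
        // Polymorphic call: for a subclass instance this runs the
        // subclass override before the subclass fields are assigned.
        runtimeName = getRuntime().toString();
    }

    Object getRuntime() {
        return "base-runtime";
    }
}

// Hypothetical wrapper that extends the base and delegates to an inner bus.
class WrappingBus extends BaseBus {
    private final BaseBus inner;

    WrappingBus(BaseBus inner) {
        super();             // getRuntime() is invoked here, while inner is still null
        this.inner = inner;  // never reached when the call above fails
    }

    @Override
    Object getRuntime() {
        return inner.getRuntime();   // throws NullPointerException during super()
    }
}
```

Constructing new WrappingBus(new BaseBus()) fails before the wrapper's own constructor body gets the chance to store the delegate, which is the failure mode described in the list above.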

In conclusion, I just don't think the wrapper/injection approach is going to work. Feel free to correct me if I have missed something, though.

In the meantime, I am going to return to my original approach of injecting the PubSubPauseSupport functionality into the existing MBassador type via an AbstractPubSubPauseSupport implementation between AbstractPubSubSupport and AbstractSyncAsyncMessageBus. This seems to fall the most in line with the existing API design and will allow users to take advantage of the new pause/resume features without making any changes to their code. It also shouldn't incur any performance costs on people not using pause/resume since publish and publishAsync will more or less behave the same minus the addition of making a very cheap conditional check for the current pause state.

Let me know what you think @bennidi. I'll keep working on unit tests in the meantime.

bgroenks96 added a commit to bgroenks96/mbassador that referenced this issue Dec 29, 2015
Notes on resume() methods:
PubSubPauseSupport.resume() takes a single parameter, an enum type FlushMode that determines the behavior of subsequent calls to pause while the queue flush is in progress. The default setting is FlushMode.ATOMIC, which will engage a lock that causes calls to 'pause()' to block until resume returns. FlushMode.NONATOMIC will cause 'resume()' to flush until either the queue is empty OR 'pause()' is called again.

AsyncPubSubPauseSupport defines another enum PublishMode which differentiates between synchronous and asynchronous publishing of events on resume. The class is implemented by AbstractPauseSyncAsyncMessageBus (subclass of pre-existing AbstractSyncAsyncMessageBus) from which MBassador now inherits.

Closes bennidi#122
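The difference between the two flush modes named in the commit message can be sketched as below. Only the enum name FlushMode and its two constants come from the commit message; the class FlushingPauser, its fields, and the locking strategy are assumptions made for illustration and may differ from the actual commit.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

enum FlushMode { ATOMIC, NONATOMIC }

// Hypothetical class; illustrates the two flush behaviours only.
class FlushingPauser<T> {
    private final ReentrantLock stateLock = new ReentrantLock();
    private final Queue<T> held = new ConcurrentLinkedQueue<>();
    private final Consumer<T> delivery;
    private volatile boolean paused;

    FlushingPauser(Consumer<T> delivery) {
        this.delivery = delivery;
    }

    void pause() {
        stateLock.lock();    // blocks while another thread runs an ATOMIC flush
        try {
            paused = true;
        } finally {
            stateLock.unlock();
        }
    }

    void publish(T message) {
        if (paused) {
            held.add(message);
        } else {
            delivery.accept(message);
        }
    }

    void resume(FlushMode mode) {
        stateLock.lock();
        try {
            if (!paused) {
                return;
            }
            paused = false;
            if (mode == FlushMode.ATOMIC) {
                // Flush everything while holding the lock.
                T message;
                while ((message = held.poll()) != null) {
                    delivery.accept(message);
                }
                return;
            }
        } finally {
            stateLock.unlock();
        }
        // NONATOMIC: the lock is released, so pause() can cut the flush short.
        T message;
        while (!paused && (message = held.poll()) != null) {
            delivery.accept(message);
        }
    }
}
```

With NONATOMIC, anything still held when pause() interrupts the flush stays queued until the next resume.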
@bgroenks96
Author

@bennidi It's been a while, but I am still interested to know what you think about this and about #131

@bennidi
Owner

bennidi commented Oct 2, 2016

@bgroenks96 Sorry for this long pause in communication. I had my priorities shifted to other projects. Looking at your commit, I believe that you did a good job. Can you rebase that onto the current master branch and make a new PR? (I made small API changes that will also affect your code.) It would be great if you could add a test that uses the different types of flush modes.

I will profile the performance impact of your changes and have a look at the complete code base in my IDE. I won't promise that it makes it to the next release because I am very careful with additions to core but it looks promising and would be a great addition.

bgroenks96 added a commit to bgroenks96/mbassador that referenced this issue Oct 23, 2016
3 participants