Skip to content

Commit

Permalink
Merge pull request #20 from tbressler/fix-polling
Browse files Browse the repository at this point in the history
Polling fixed and subscription improved.
  • Loading branch information
tbressler authored Jan 20, 2021
2 parents 6ba2aac + 89fb1e3 commit 9bc41a1
Show file tree
Hide file tree
Showing 66 changed files with 1,376 additions and 349 deletions.
42 changes: 22 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ discovery.stop();

If the connection to the WaterRower gets lost, the auto-discovery tries to reconnect automatically.

Please note, you can use the interface ```IDiscoveryStore``` in order to improve the performance when searching for serial ports.

### Subscribe to values

The core of the WaterRower library are the subscription. You can subscribe to the different values of the WaterRower Performance Monitor. There is a subscription for every signal available.
Expand All @@ -56,8 +54,8 @@ A simple example:
```Java

// Subscribe to events:
waterRower.subscribe(new StrokeSubscription() {
public void onStroke(StrokeType strokeType) {
waterRower.subscribe(new DisplayedDistanceSubscription() {
public void onDistanceUpdated(int distance) {
// ... do your stuff here!
}
}
Expand All @@ -70,15 +68,17 @@ The following subscriptions are available:

| Subscription | Description |
|---|---|
| ```DisplayedDistanceSubscription``` | A subscription for the *displayed distance* (in meters) on the distance window of the Performance Monitor. The value is set to zero if the Performance Monitor was reset. |
| ```DisplayedDurationSubscription``` | A subscription for the *displayed duration* on the duration window of the Performance Monitor. The duration window displays the time covered (or time to be covered in a duration workout). |
| ```AverageVelocitySubscription``` | A subscription for the *displayed average velocity* (in meters per second) on the intensity window of the Performance Monitor. |
| ```StrokeSubscription```* | A subscription for *stroke events*. The values will be send immediately by the Performance Monitor and will not be polled by the library. |
| ```StrokeCountSubscription``` | A subscription for the *stroke count* value (number of strokes). |
| ```AverageStrokeRateSubscription``` | A subscription for the *displayed stroke rate* (strokes/min) of a whole stroke which is displayed in the stroke rate window of the Performance Monitor. |
| ```TankVolumeSubscription``` | A subscription for the *tank volume* value (in liters). This is the value the user has set in the Performance Monitor (see manual). |
| ```ClockCountDownSubscription``` | A subscription for *clock count down* values. This value is transmitted if a count down is programmed in the Performance Monitor. |
| ```TotalDistanceSubscription``` | A subscription for the *total distance* values of the Performance Monitor. The value represents the total distance meter counter - this value will be reset to zero when the Performance Monitor is switched off. |
| ```DisplayedDistanceSubscription``` | Subscribes to the displayed *distance* (in meters), which is displayed in the distance window of the Performance Monitor. The value is set to zero when the Performance Monitor was reset. |
| ```DisplayedDurationSubscription``` | Subscribes to the displayed *duration*, which is the time covered (or the time to be covered in a duration workout) and shown in the duration window of the Performance Monitor. |
| ```AverageVelocitySubscription``` | Subscribes to the displayed *average velocity* (in meters per second) on the intensity window of the Performance Monitor. |
| ```StrokeSubscription```* | Subscribes to the *stroke events* (start of stroke or end of stroke). The values will be send immediately by the Performance Monitor and will not be polled by the library. |
| ```StrokeCountSubscription``` | Subscribes to the *stroke count* value (the number of strokes). |
| ```AverageStrokeRateSubscription``` | Subscribes to the displayed *stroke rate* (strokes/min) of a whole stroke which is displayed in the stroke rate window of the Performance Monitor. |
| ```TankVolumeSubscription``` | Subscribes to the *tank volume* (in liters). This is the value the user has set in the Performance Monitor (see manual). |
| ```ClockCountDownSubscription``` | Subscribes to the *clock count down*. This value is transmitted if a count down is programmed in the Performance Monitor. |
| ```TotalDistanceSubscription``` | Subscribes to the *total distance*. The value represents the total distance meter counter - this value will be reset to zero when the Performance Monitor is switched off. |
| ```WattsSubscription``` | Subscribes to the current *power value* (in watt). |
| ```TotalCaloriesSubscription``` | Subscribes to the *total calories* (in cal) burned. |
| **Advanced subscriptions:** | |
| ```PulseCountSubscription```* | A subscription for *pulse count* events. Will be called, when pulse count was updated. The value is representing the number of pulse’s counted during the last 25mS period; this value can range from 1 to 50 typically. (Zero values will not be transmitted). |
| ```TotalVelocitySubscription``` | A subscription for the *total velocity* (in meters per second). |
Expand Down Expand Up @@ -122,16 +122,18 @@ For workouts the following subscriptions are available:
| Subscription | Description |
|---|---|
| ```WorkoutFlagsSubscription``` | A subscription to observe active workouts. The returned object has flags for each workout mode. |
| ```WorkoutIntervalsSubscription``` | Subscription for the number of configured workout intervals at the Performance Monitor. |
| ```TotalWorkoutTimeSubscription``` | A subscription for values of the total workout total times. The time is updated by the WaterRower after each workout interval. |
| ```TotalWorkoutDistanceSubscription``` | A subscription for values of the total workout total distance. The distance is updated by the WaterRower after each workout interval. |
| ```TotalWorkoutStrokesSubscription``` | A subscription for values of the total workout total strokes. The stroke value is updated by the WaterRower after each workout interval. |
| ```TotalWorkoutLimitSubscription``` | A subscription for values of the total workout total limit. |
| ```WorkoutIntervalValueSubscription``` | A subscription to observe the values of the configured workout and workout intervals at the Performance Monitor. |
| ```WorkoutFlagsSubscription``` | Subscribes to the *workout flags*. The returned object has flags for the different workout modes. |
| ```WorkoutIntervalsSubscription``` | Subscribes to the *number of configured workout intervals* at the Performance Monitor. |
| ```TotalWorkoutTimeSubscription``` | Subscribes to the *total workout time*. The time is updated by the Performance Monitor after each workout interval and at the end of the workout. |
| ```TotalWorkoutDistanceSubscription``` | Subscribes to the *total workout distance*. The distance is updated by the Performance Monitor after each workout interval and at the end of the workout. |
| ```TotalWorkoutStrokesSubscription``` | Subscribes to the *total workout strokes* (number of strokes). The stroke value is updated by the Performance Monitor after each workout interval and at the end of the workout. |
| ```TotalWorkoutLimitSubscription``` | Subscribes to the *total workout limit*. |
| ```WorkoutIntervalValueSubscription``` | Subscribes to the values of the *configured workout* and *workout intervals*. |
### Find available serial ports (manually)
Not recommended, but for the sake of completeness:
```Java
// Get all available serial ports:
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/de/tbressler/waterrower/WaterRower.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import de.tbressler.waterrower.model.ErrorCode;
import de.tbressler.waterrower.model.ModelInformation;
import de.tbressler.waterrower.subscriptions.ISubscription;
import de.tbressler.waterrower.subscriptions.SubscriptionPollingService;
import de.tbressler.waterrower.subscriptions.ISubscriptionPollingService;
import de.tbressler.waterrower.watchdog.DeviceVerificationWatchdog;
import de.tbressler.waterrower.watchdog.ITimeoutListener;
import de.tbressler.waterrower.watchdog.PingWatchdog;
Expand Down Expand Up @@ -47,7 +47,7 @@ public class WaterRower {
private final WaterRowerConnector connector;

/* Polls and handles subscriptions. */
private final SubscriptionPollingService subscriptionPollingService;
private final ISubscriptionPollingService subscriptionPollingService;

/* Watchdog that checks if a ping is received periodically. */
private final PingWatchdog pingWatchdog;
Expand Down Expand Up @@ -105,7 +105,7 @@ public void onError() {
* WaterRower monitor.
*/
public WaterRower() {
this(new WaterRowerInitializer(Duration.ofSeconds(1), Duration.ofSeconds(5), 5));
this(new WaterRowerInitializer(Duration.ofSeconds(5), 5));
}

/**
Expand Down Expand Up @@ -141,7 +141,7 @@ public WaterRower(WaterRowerInitializer initializer) {
WaterRower(WaterRowerConnector connector,
PingWatchdog pingWatchdog,
DeviceVerificationWatchdog deviceVerificationWatchdog,
SubscriptionPollingService subscriptionPollingService) {
ISubscriptionPollingService subscriptionPollingService) {

this.connector = requireNonNull(connector);
this.connector.addConnectionListener(connectionListener);
Expand Down
21 changes: 6 additions & 15 deletions src/main/java/de/tbressler/waterrower/WaterRowerInitializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import de.tbressler.waterrower.io.ChannelInitializer;
import de.tbressler.waterrower.io.CommunicationService;
import de.tbressler.waterrower.io.WaterRowerConnector;
import de.tbressler.waterrower.log.Log;
import de.tbressler.waterrower.subscriptions.ISubscriptionPollingService;
import de.tbressler.waterrower.subscriptions.SubscriptionPollingService;
import de.tbressler.waterrower.watchdog.DeviceVerificationWatchdog;
import de.tbressler.waterrower.watchdog.PingWatchdog;
Expand All @@ -27,7 +27,7 @@ public class WaterRowerInitializer {
private final WaterRowerConnector connector;

/* Polls and handles subscriptions. */
private final SubscriptionPollingService subscriptionPolling;
private final ISubscriptionPollingService subscriptionPolling;

/* Watchdog that checks if a ping is received periodically. */
private final PingWatchdog pingWatchdog;
Expand All @@ -40,33 +40,24 @@ public class WaterRowerInitializer {
/**
* Initializes the dependencies of the WaterRower class based on the given parameters.
*
* @param pollingInterval The polling interval for the subscriptions, must not be null.
* Recommended = 1 second.
* @param timeoutInterval The timeout interval for messages, if a message was not received from the WaterRower
* during this interval a timeout error will get fired, must not be null.
* Recommended = 5 second.
* @param threadPoolSize The number of threads to keep in the pool, which should be used by the WaterRower
* service even if they are idle.
* Recommended = 5.
*/
public WaterRowerInitializer(Duration pollingInterval, Duration timeoutInterval, int threadPoolSize) {
requireNonNull(pollingInterval);
public WaterRowerInitializer(Duration timeoutInterval, int threadPoolSize) {
requireNonNull(timeoutInterval);
if (threadPoolSize < 1)
throw new IllegalArgumentException("The number of thread must be at least 1!");

// Log a warning if the polling interval is below 1 second.
if (pollingInterval.toMillis() < 1000)
Log.warn("Recommendation: The polling interval should be greater or equal to 1 second in order to avoid performance issues.");
throw new IllegalArgumentException("The number of threads must be at least 1!");

Bootstrap bootstrap = new Bootstrap();

CommunicationService communicationService = new CommunicationService(bootstrap, new ChannelInitializer());

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(threadPoolSize);

connector = new WaterRowerConnector(communicationService);
subscriptionPolling = new SubscriptionPollingService(pollingInterval, connector, executorService);
subscriptionPolling = new SubscriptionPollingService(connector, executorService);
pingWatchdog = new PingWatchdog(timeoutInterval, executorService);
deviceVerificationWatchdog = new DeviceVerificationWatchdog(timeoutInterval, executorService);
}
Expand Down Expand Up @@ -105,7 +96,7 @@ DeviceVerificationWatchdog getDeviceVerificationWatchdog() {
*
* @return The subscription polling service, never null.
*/
SubscriptionPollingService getSubscriptionPollingService() {
ISubscriptionPollingService getSubscriptionPollingService() {
return subscriptionPolling;
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.time.Duration.ofSeconds;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;

/**
* Handles the auto-discovery of the WaterRower.
Expand All @@ -40,9 +41,6 @@ public class WaterRowerAutoDiscovery {
/* The WaterRower. */
private final WaterRower waterRower;

/* The discovery store. */
private final IDiscoveryStore discoveryStore;

/* The executor service. */
private final ScheduledExecutorService executorService;

Expand All @@ -68,12 +66,7 @@ public class WaterRowerAutoDiscovery {

@Override
public void onConnected(ModelInformation modelInformation) {

Log.debug("WaterRower successfully connected.");

// Remember the last successful serial port, in order
// to speed up connection next time.
discoveryStore.setLastSuccessfulSerialPort(currentSerialPort);
}

@Override
Expand All @@ -88,49 +81,27 @@ public void onError(ErrorCode errorCode) {}
};


/**
* Handles the auto-discovery of the WaterRower.
*
* @param waterRower The WaterRower, must not be null.
* @param discoveryStore The store for successful serial ports.
* @param executorService The executor service, must not be null.
*/
public WaterRowerAutoDiscovery(WaterRower waterRower, IDiscoveryStore discoveryStore, ScheduledExecutorService executorService) {
this(waterRower, discoveryStore, executorService, new SerialPortWrapper());
}

/**
* Handles the auto-discovery of the WaterRower.
*
* @param waterRower The WaterRower, must not be null.
* @param executorService The executor service, must not be null.
*/
public WaterRowerAutoDiscovery(WaterRower waterRower, ScheduledExecutorService executorService) {
this(waterRower, new IDiscoveryStore() {

@Override
public void setLastSuccessfulSerialPort(String serialPort) {}

@Override
public String getLastSuccessfulSerialPort() {
return null;
}

}, executorService);
this(waterRower, executorService, new SerialPortWrapper());
}


/**
* Handles the auto-discovery of the WaterRower.
*
* @param waterRower The WaterRower, must not be null.
* @param discoveryStore The store for successful serial ports.
* @param executorService The executor service, must not be null.
* @param serialPortWrapper The serial port wrapper, must not be null.
*/
WaterRowerAutoDiscovery(WaterRower waterRower, IDiscoveryStore discoveryStore, ScheduledExecutorService executorService, SerialPortWrapper serialPortWrapper) {
WaterRowerAutoDiscovery(WaterRower waterRower, ScheduledExecutorService executorService, SerialPortWrapper serialPortWrapper) {
this.waterRower = requireNonNull(waterRower);
this.waterRower.addConnectionListener(connectionListener);
this.discoveryStore = requireNonNull(discoveryStore);
this.executorService = requireNonNull(executorService);
this.serialPortWrapper = requireNonNull(serialPortWrapper);
}
Expand All @@ -145,6 +116,7 @@ public void start() {
executorService.submit(() -> tryNextConnectionAttempt());
}


/* Try the next connection attempt. */
private void tryNextConnectionAttempt() {
lock.lock();
Expand Down Expand Up @@ -185,53 +157,28 @@ private void updateAvailablePorts() {

Log.debug("Updating list of available serial ports.");

// Get all available serial ports:
List<AvailablePort> availablePorts = serialPortWrapper.getAvailablePorts();

for(AvailablePort port : availablePorts) {

// Get all available serial ports. Additionally filter out every useless port and
// sort promising ports to the top of the list. Thus the performance of the
// auto-discovery is increased.
List<AvailablePort> availablePorts = serialPortWrapper.getAvailablePorts().stream().filter((port) -> {
String portName = port.getSystemPortName();

// Ignore /dev/cu ports.
if (portName.startsWith("/dev/cu."))
continue;

// Ignore cu ports.
if (portName.startsWith("cu."))
continue;

// Ignore bluetooth ports on macOS.
if (portName.contains("Bluetooth") || portName.contains("BT"))
continue;

if (port.isOpen()) {
Log.warn("Skipping serial port '"+portName+"', because it is already open.");
continue;
}

this.availablePorts.push(new SerialDeviceAddress(portName));

Log.info("Serial port found: " + portName);
}

putLastSuccessfulPortFirst();
}

/* Add the last successful serial port, in order to speed up connection. */
private void putLastSuccessfulPortFirst() {

String lastSuccessfulPort = discoveryStore.getLastSuccessfulSerialPort();

if (lastSuccessfulPort == null)
return;

SerialDeviceAddress lastAddress = new SerialDeviceAddress(lastSuccessfulPort);

boolean wasRemoved = availablePorts.removeIf(address -> address.value().equals(lastSuccessfulPort));
if (!wasRemoved)
return;

availablePorts.push(lastAddress);
return (!portName.startsWith("/dev/cu.") && !portName.startsWith("cu.")
&& !portName.contains("Bluetooth") && !portName.contains("BT")
&& !port.isOpen());
}).sorted((port1, port2) -> {
boolean description1 = port1.getDescription().contains("WR-S") || port1.getDescription().contains("Microchip Technology");
boolean description2 = port2.getDescription().contains("WR-S") || port2.getDescription().contains("Microchip Technology");
if (description1 && description2) return 0;
if (description1) return 1;
if (description2) return -1;
return 0;
}).collect(toList());

// Add the new ports to the top of the available port stack (only these ports
// are used for auto-discovery).
this.availablePorts.addAll(availablePorts.stream()
.map((port) -> new SerialDeviceAddress(port.getSystemPortName()))
.collect(toList()));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void send(AbstractMessage msg) throws IOException {

Log.debug("Sending message '" + msg.toString() + "'.");

currentChannel.write(msg);
currentChannel.writeAndFlush(msg);

} catch (Exception e) {
throw new IOException("Can not send message '"+msg+"'!", e);
Expand Down
Loading

0 comments on commit 9bc41a1

Please sign in to comment.