Skip to content

Commit

Permalink
Enable ZooKeeper client to establish connection in read-only mode
Browse files Browse the repository at this point in the history
  • Loading branch information
massakam committed Mar 25, 2024
1 parent 15171e1 commit 55dee7e
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,9 @@ public ZooKeeperClient build() throws IOException, KeeperException, InterruptedE

// Create a watcher manager
StatsLogger watcherStatsLogger = statsLogger.scope("watcher");
ZooKeeperWatcherBase watcherManager =
null == watchers ? new ZooKeeperWatcherBase(sessionTimeoutMs, watcherStatsLogger) :
new ZooKeeperWatcherBase(sessionTimeoutMs, watchers, watcherStatsLogger);
ZooKeeperWatcherBase watcherManager = (null == watchers)
? new ZooKeeperWatcherBase(sessionTimeoutMs, allowReadOnlyMode, watcherStatsLogger)
: new ZooKeeperWatcherBase(sessionTimeoutMs, allowReadOnlyMode, watchers, watcherStatsLogger);
ZooKeeperClient client = new ZooKeeperClient(
connectString,
sessionTimeoutMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class ZooKeeperWatcherBase implements Watcher {
.getLogger(ZooKeeperWatcherBase.class);

private final int zkSessionTimeOut;
private final boolean allowReadOnlyMode;
private volatile CountDownLatch clientConnectLatch = new CountDownLatch(1);
private final CopyOnWriteArraySet<Watcher> childWatchers =
new CopyOnWriteArraySet<Watcher>();
Expand All @@ -53,18 +54,20 @@ public class ZooKeeperWatcherBase implements Watcher {
private final ConcurrentHashMap<EventType, Counter> eventCounters =
new ConcurrentHashMap<EventType, Counter>();

public ZooKeeperWatcherBase(int zkSessionTimeOut) {
this(zkSessionTimeOut, NullStatsLogger.INSTANCE);
public ZooKeeperWatcherBase(int zkSessionTimeOut, boolean allowReadOnlyMode) {
this(zkSessionTimeOut, allowReadOnlyMode, NullStatsLogger.INSTANCE);
}

public ZooKeeperWatcherBase(int zkSessionTimeOut, StatsLogger statsLogger) {
this(zkSessionTimeOut, new HashSet<Watcher>(), statsLogger);
public ZooKeeperWatcherBase(int zkSessionTimeOut, boolean allowReadOnlyMode, StatsLogger statsLogger) {
this(zkSessionTimeOut, allowReadOnlyMode, new HashSet<Watcher>(), statsLogger);
}

public ZooKeeperWatcherBase(int zkSessionTimeOut,
boolean allowReadOnlyMode,
Set<Watcher> childWatchers,
StatsLogger statsLogger) {
this.zkSessionTimeOut = zkSessionTimeOut;
this.allowReadOnlyMode = allowReadOnlyMode;
this.childWatchers.addAll(childWatchers);
this.statsLogger = statsLogger;
}
Expand Down Expand Up @@ -130,6 +133,14 @@ public void process(WatchedEvent event) {
LOG.info("ZooKeeper client is connected now.");
clientConnectLatch.countDown();
break;
case ConnectedReadOnly:
if (allowReadOnlyMode) {
LOG.info("ZooKeeper client is connected in read-only mode now.");
clientConnectLatch.countDown();
} else {
LOG.warn("ZooKeeper client is connected in read-only mode, which is not allowed.");
}
break;
case Disconnected:
LOG.info("ZooKeeper client is disconnected from zookeeper now,"
+ " but it is OK unless we received EXPIRED event.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void run() {
byte[] sessionPasswd = bkc.getZkHandle().getSessionPasswd();

try {
ZooKeeperWatcherBase watcher = new ZooKeeperWatcherBase(10000);
ZooKeeperWatcherBase watcher = new ZooKeeperWatcherBase(10000, false);
ZooKeeper zk = new ZooKeeper(zkUtil.getZooKeeperConnectString(), 10000,
watcher, sessionId, sessionPasswd);
zk.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,7 @@ protected ZooKeeper createZooKeeper() throws IOException {
public void testZKConnectionLossForLedgerCreation() throws Exception {
int zkSessionTimeOut = 10000;
AtomicLong ledgerIdToInjectFailure = new AtomicLong(INVALID_LEDGERID);
ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut,
ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut, false,
NullStatsLogger.INSTANCE);
MockZooKeeperClient zkFaultInjectionWrapper = new MockZooKeeperClient(zkUtil.getZooKeeperConnectString(),
zkSessionTimeOut, zooKeeperWatcherBase, ledgerIdToInjectFailure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,7 @@ public void testRWShutDownInTheCaseOfZKOperationFailures() throws Exception {
* create MockZooKeeperClient instance and wait for it to be connected.
*/
int zkSessionTimeOut = 10000;
ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut,
ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut, false,
NullStatsLogger.INSTANCE);
MockZooKeeperClient zkFaultInjectionWrapper = new MockZooKeeperClient(zkUtil.getZooKeeperConnectString(),
zkSessionTimeOut, zooKeeperWatcherBase);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void sleepCluster(int time, TimeUnit timeUnit, CountDownLatch l)
default void expireSession(ZooKeeper zk) throws Exception {
long id = zk.getSessionId();
byte[] password = zk.getSessionPasswd();
ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000);
ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000, false);
ZooKeeper zk2 = new ZooKeeper(getZooKeeperConnectString(), zk.getSessionTimeout(), w, id, password);
w.waitForConnection();
zk2.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,12 @@ public void killCluster() throws Exception {
public void sleepCluster(int time, TimeUnit timeUnit, CountDownLatch l) throws InterruptedException, IOException {
throw new UnsupportedOperationException("sleepServer operation is not supported for ZooKeeperClusterUtil");
}

public void stopPeer(int id) throws Exception {
quorumUtil.shutdown(id);
}

public void enableLocalSession(boolean localSessionEnabled) {
quorumUtil.enableLocalSession(localSessionEnabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void process(WatchedEvent event) {
};
final int timeout = 2000;
ZooKeeperWatcherBase watcherManager =
new ZooKeeperWatcherBase(timeout).addChildWatcher(testWatcher);
new ZooKeeperWatcherBase(timeout, false).addChildWatcher(testWatcher);
List<Watcher> watchers = new ArrayList<Watcher>(1);
watchers.add(testWatcher);
ZooKeeperClient client = new ShutdownZkServerClient(
Expand Down Expand Up @@ -895,4 +895,32 @@ public void processResult(int rc, String path, Object ctx) {
logger.info("Delete children from znode " + path);
}

@Test
public void testAllowReadOnlyMode() throws Exception {
if (zkUtil instanceof ZooKeeperClusterUtil) {
System.setProperty("readonlymode.enabled", "true");
((ZooKeeperClusterUtil) zkUtil).enableLocalSession(true);
zkUtil.restartCluster();
Thread.sleep(2000);
((ZooKeeperClusterUtil) zkUtil).stopPeer(2);
((ZooKeeperClusterUtil) zkUtil).stopPeer(3);
}

try (ZooKeeperClient client = ZooKeeperClient.newBuilder()
.connectString(zkUtil.getZooKeeperConnectString())
.sessionTimeoutMs(100000)
.watchers(new HashSet<Watcher>())
.operationRetryPolicy(retryPolicy)
.allowReadOnlyMode(true)
.build()) {
Assert.assertTrue("Client failed to connect a ZooKeeper in read-only mode.",
client.getState().isConnected());
} finally {
if (zkUtil instanceof ZooKeeperClusterUtil) {
System.setProperty("readonlymode.enabled", "false");
((ZooKeeperClusterUtil) zkUtil).enableLocalSession(false);
}
}
}

}

0 comments on commit 55dee7e

Please sign in to comment.