Skip to content

Package: ZKClient$RegionWatcher

ZKClient$RegionWatcher

nameinstructionbranchcomplexitylinemethod
ZKClient.RegionWatcher(ZKClient, String)
M: 36 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 10 C: 0
0%
M: 1 C: 0
0%
clearSynchronization()
M: 31 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 11 C: 0
0%
M: 1 C: 0
0%
process(WatchedEvent)
M: 619 C: 0
0%
M: 76 C: 0
0%
M: 39 C: 0
0%
M: 97 C: 0
0%
M: 1 C: 0
0%
scheduleCommit(WatchedEvent, String, String, byte[], Stat)
M: 141 C: 0
0%
M: 16 C: 0
0%
M: 9 C: 0
0%
M: 17 C: 0
0%
M: 1 C: 0
0%
toString()
M: 14 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*
2: * Copyright (c) 2012, 2017 Oracle and/or its affiliates. All rights reserved.
3: *
4: * This program and the accompanying materials are made available under the
5: * terms of the Eclipse Public License v. 2.0, which is available at
6: * http://www.eclipse.org/legal/epl-2.0.
7: *
8: * This Source Code may also be made available under the following Secondary
9: * Licenses when the conditions for such availability set forth in the
10: * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
11: * version 2 with the GNU Classpath Exception, which is available at
12: * https://www.gnu.org/software/classpath/license.html.
13: *
14: * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
15: */
16:
17: package org.glassfish.grizzly.thrift.client.zookeeper;
18:
19: import java.io.IOException;
20: import java.lang.management.ManagementFactory;
21: import java.util.ArrayList;
22: import java.util.Arrays;
23: import java.util.Date;
24: import java.util.HashSet;
25: import java.util.List;
26: import java.util.Map;
27: import java.util.Set;
28: import java.util.concurrent.Callable;
29: import java.util.concurrent.ConcurrentHashMap;
30: import java.util.concurrent.Executors;
31: import java.util.concurrent.ScheduledExecutorService;
32: import java.util.concurrent.ScheduledFuture;
33: import java.util.concurrent.TimeUnit;
34: import java.util.concurrent.atomic.AtomicBoolean;
35: import java.util.concurrent.locks.Condition;
36: import java.util.concurrent.locks.Lock;
37: import java.util.concurrent.locks.ReentrantLock;
38: import java.util.logging.Level;
39: import java.util.logging.Logger;
40:
41: import org.apache.zookeeper.CreateMode;
42: import org.apache.zookeeper.KeeperException;
43: import org.apache.zookeeper.WatchedEvent;
44: import org.apache.zookeeper.Watcher;
45: import org.apache.zookeeper.ZooDefs;
46: import org.apache.zookeeper.ZooKeeper;
47: import org.apache.zookeeper.data.ACL;
48: import org.apache.zookeeper.data.Stat;
49: import org.glassfish.grizzly.Grizzly;
50:
51: /**
52: * Zookeeper client implementation for barrier and recoverable operation
53: * <p>
54: * All operations will be executed on the valid connection because the failed
55: * connection will be reconnected automatically. This has Barrier function.
56: * {@link BarrierListener} can be registered with a specific region and initial
57: * data by the {@link #registerBarrier} method and unregistered by the
58: * {@link #unregisterBarrier} method. If the zookeeper server doesn't have the
59: * data node, the given initial data will be set in the server when the
60: * {@link #registerBarrier} method is called. If the specific data will be
61: * changed in remote zookeeper server, all clients which have joined will
62: * receive changes simultaneously. If all clients receive changes successfully,
63: * {@link BarrierListener#onCommit} will be called simultaneously at the
64: * scheduled time(data modification time + {@code commitDelayTimeInSecs}).
65: * <p>
66: * This also supports some safe APIs which is similar to original
67: * {@link ZooKeeper}'s APIs like create, delete, exists, getChildren, getData
68: * and setData.
69: * <p>
70: * Examples of barrier's use: {@code
71: * // initial
72: * final ZKClient.Builder builder = new ZKClient.Builder("myZookeeperClient", "localhost:2181");
73: * builder.rootPath(ROOT).connectTimeoutInMillis(3000).sessionTimeoutInMillis(30000).commitDelayTimeInSecs(60);
74: * final ZKClient zkClient = builder.build();
75: * zkClient.connect()
76: * final String registeredPath = zkClient.registerBarrier( "user", myListener, initData );
77: * // ...
78: * // cleanup
79: * zkClient.unregisterBarrier( "user" );
80: * zkClient.shutdown();
81: * }
82: * <p>
83: * [NOTE] Zookeeper already guides some simple barrier examples:
84: * http://zookeeper.apache.org/doc/r3.3.4/zookeeperTutorial.html
85: * http://code.google.com/p/good-samples/source/browse/trunk/zookeeper-3.x/src/main/java/com/googlecode/goodsamples/zookeeper/barrier/Barrier.java
86: * <p>
87: * But, their examples have a race condition issue:
88: * https://issues.apache.org/jira/browse/ZOOKEEPER-1011
89: *
90: * @author Bongjae Chang
91: */
92: public class ZKClient {
93:
// Shared logger for this client.
private static final Logger logger = Grizzly.logger(ZKClient.class);

// Runtime MXBean name of this JVM; combined with the client name to build this client's unique id.
private static final String JVM_AND_HOST_UNIQUE_ID = ManagementFactory.getRuntimeMXBean().getName();
// Maximum number of attempts retryUntilConnected() makes before giving up.
private static final int RETRY_COUNT_UNTIL_CONNECTED = 3;
/**
 * Path information:
 * <ul>
 * <li>/root/thrift/region_name/current/(client1, client2, ...)</li>
 * <li>/root/thrift/region_name/data</li>
 * <li>/root/thrift/region_name/participants/(client1, client2, ...)</li>
 * </ul>
 */
private static final String BASE_PATH = "/thrift";
private static final String CURRENT_PATH = "/current";
private static final String DATA_PATH = "/data";
private static final String PARTICIPANTS_PATH = "/participants";
// Payload used for nodes that carry no data.
private static final byte[] NO_DATA = new byte[0];

// Held by connect()/close()/reconnect() while they change the connection state below.
private final Lock lock = new ReentrantLock();
// ensureConnected() waits on this condition until currentState becomes SyncConnected.
private final Condition lockCondition = lock.newCondition();
// Flipped by reconnect() with compareAndSet to drop duplicated reconnect trials.
private final AtomicBoolean reconnectingFlag = new AtomicBoolean(false);
// Written by connect()/close() while holding the lock.
private boolean connected;
private Watcher.Event.KeeperState currentState;
// Set to false by shutdown(); InternalWatcher ignores events once it is false.
private AtomicBoolean running = new AtomicBoolean(true);
// Region name -> listener registered through registerBarrier().
private final Map<String, BarrierListener> listenerMap = new ConcurrentHashMap<String, BarrierListener>();
// Runs the rollback tasks scheduled by RegionWatcher; shut down in shutdown().
private final ScheduledExecutorService scheduledExecutor;
// Current ZooKeeper handle; null until connect() has been called.
private ZooKeeper zooKeeper;

// JVM_AND_HOST_UNIQUE_ID + "_" + name, plus its normalized path form.
private final String uniqueId;
private final String uniqueIdPath;
// rootPath + BASE_PATH.
private final String basePath;

// Settings copied from the Builder.
private final String name;
private final String zooKeeperServerList;
private final long connectTimeoutInMillis;
private final long sessionTimeoutInMillis;
private final String rootPath;
private final long commitDelayTimeInSecs;
129:
130: private ZKClient(final Builder builder) {
131: this.name = builder.name;
132: this.uniqueId = JVM_AND_HOST_UNIQUE_ID + "_" + this.name;
133: this.uniqueIdPath = normalizePath(this.uniqueId);
134: this.rootPath = normalizePath(builder.rootPath);
135: this.basePath = this.rootPath + BASE_PATH;
136:
137: this.zooKeeperServerList = builder.zooKeeperServerList;
138: this.connectTimeoutInMillis = builder.connectTimeoutInMillis;
139: this.sessionTimeoutInMillis = builder.sessionTimeoutInMillis;
140: this.commitDelayTimeInSecs = builder.commitDelayTimeInSecs;
141:
142: this.scheduledExecutor = Executors.newScheduledThreadPool(7);
143: }
144:
145: /**
146: * Connect this client to the zookeeper server
147: * <p>
148: * this method will wait for {@link Watcher.Event.KeeperState#SyncConnected}
149: * from the zookeeper server.
150: *
151: * @throws IOException the io exception of internal ZooKeeper
152: * @throws InterruptedException the interrupted exception of internal ZooKeeper
153: * @return true if the client is connected. Otherwise false
154: */
155: public boolean connect() throws IOException, InterruptedException {
156: lock.lock();
157: try {
158: if (connected) {
159: return true;
160: }
161: zooKeeper = new ZooKeeper(zooKeeperServerList, (int) sessionTimeoutInMillis, new InternalWatcher(new Watcher() {
162:
163: @Override
164: public void process(WatchedEvent event) {
165: }
166: }));
167: if (!ensureConnected(connectTimeoutInMillis)) {
168: zooKeeper.close();
169: currentState = Watcher.Event.KeeperState.Disconnected;
170: connected = false;
171: } else {
172: connected = true;
173: if (logger.isLoggable(Level.FINE)) {
174: logger.log(Level.FINE, "connected the zookeeper server successfully");
175: }
176: }
177: return connected;
178: } finally {
179: lock.unlock();
180: }
181: }
182:
183: private void close() {
184: lock.lock();
185: try {
186: if (!connected) {
187: return;
188: }
189: if (zooKeeper != null) {
190: try {
191: zooKeeper.close();
192: } catch (InterruptedException ignore) {
193: }
194: }
195: currentState = Watcher.Event.KeeperState.Disconnected;
196: connected = false;
197: } finally {
198: lock.unlock();
199: }
200: if (logger.isLoggable(Level.FINE)) {
201: logger.log(Level.FINE, "closed successfully");
202: }
203: }
204:
205: private void reconnect() throws IOException, InterruptedException {
206: if (logger.isLoggable(Level.FINE)) {
207: logger.log(Level.FINE, "trying to reconnect the zookeeper server");
208: }
209: final boolean localReconnectingFlag = reconnectingFlag.get();
210: lock.lock();
211: try {
212: if (!reconnectingFlag.compareAndSet(localReconnectingFlag, !localReconnectingFlag)) {
213: // prevent duplicated trials
214: return;
215: }
216: close();
217: if (connect()) {
218: // register ephemeral node and watcher again
219: for (final String regionName : listenerMap.keySet()) {
220: registerEphemeralNodeAndWatcher(regionName);
221: }
222: }
223: } finally {
224: lock.unlock();
225: }
226: if (logger.isLoggable(Level.INFO)) {
227: logger.log(Level.INFO, "reconnected the zookeeper server successfully");
228: }
229: }
230:
231: /**
232: * Close this client
233: */
234: public void shutdown() {
235: if (!running.compareAndSet(true, false)) {
236: if (logger.isLoggable(Level.FINE)) {
237: logger.log(Level.FINE, "shutting down or already shutted down");
238: }
239: return;
240: }
241: listenerMap.clear();
242: close();
243: if (scheduledExecutor != null) {
244: scheduledExecutor.shutdown();
245: }
246: if (logger.isLoggable(Level.FINE)) {
247: logger.log(Level.FINE, "shutted down successfully");
248: }
249: }
250:
251: /**
252: * Register the specific barrier
253: *
254: * @param regionName specific region name
255: * @param listener {@link BarrierListener} implementations
256: * @param initialData initial data. if the zookeeper server doesn't have any
257: * data, this will be set. "null" means {@code NO_DATA} which is byte[0].
258: * @return the registered data path of the zookeeper server
259: */
260: public String registerBarrier(final String regionName, final BarrierListener listener, final byte[] initialData) {
261: if (regionName == null) {
262: throw new IllegalArgumentException("region name must not be null");
263: }
264: if (listener == null) {
265: throw new IllegalArgumentException("listener must not be null");
266: }
267: listenerMap.put(regionName, listener);
268:
269: // ensure all paths exist
270: createWhenThereIsNoNode(rootPath, NO_DATA, CreateMode.PERSISTENT); // ensure root path
271: createWhenThereIsNoNode(basePath, NO_DATA, CreateMode.PERSISTENT); // ensure base path
272: final String currentRegionPath = basePath + normalizePath(regionName);
273: createWhenThereIsNoNode(currentRegionPath, NO_DATA, CreateMode.PERSISTENT); // ensure my region path
274: createWhenThereIsNoNode(currentRegionPath + CURRENT_PATH, NO_DATA, CreateMode.PERSISTENT); // ensure nodes path
275: createWhenThereIsNoNode(currentRegionPath + PARTICIPANTS_PATH, NO_DATA, CreateMode.PERSISTENT); // ensure participants path
276:
277: final String currentDataPath = currentRegionPath + DATA_PATH;
278: final boolean dataCreated = createWhenThereIsNoNode(currentDataPath, initialData == null ? NO_DATA : initialData,
279: CreateMode.PERSISTENT); // ensure data path
280: if (!dataCreated) { // if the remote data already exists
281: if (logger.isLoggable(Level.INFO)) {
282: logger.log(Level.INFO, "the central data exists in the zookeeper server");
283: }
284: final byte[] remoteDataBytes = getData(currentDataPath, false, null);
285: try {
286: listener.onInit(regionName, currentDataPath, remoteDataBytes);
287: } catch (Exception e) {
288: if (logger.isLoggable(Level.WARNING)) {
289: logger.log(Level.WARNING, "failed to onInit. name=" + name + ", regionName=" + regionName + ", listener=" + listener,
290: e);
291: }
292: }
293: } else {
294: if (logger.isLoggable(Level.INFO)) {
295: logger.log(Level.INFO, "initial data was set because there was no remote data in the zookeeper server. initialData={0}",
296: initialData);
297: }
298: try {
299: listener.onInit(regionName, currentDataPath, null);
300: } catch (Exception e) {
301: if (logger.isLoggable(Level.WARNING)) {
302: logger.log(Level.WARNING, "failed to onInit. name=" + name + ", regionName=" + regionName + ", listener=" + listener,
303: e);
304: }
305: }
306: }
307: registerEphemeralNodeAndWatcher(regionName);
308: if (logger.isLoggable(Level.FINE)) {
309: logger.log(Level.FINE, "the path \"{0}\" will be watched. name={1}, regionName={2}",
310: new Object[] { name, currentDataPath, regionName });
311: }
312: return currentDataPath;
313: }
314:
315: private void registerEphemeralNodeAndWatcher(final String regionName) {
316: if (regionName == null) {
317: return;
318: }
319: final String currentRegionPath = basePath + normalizePath(regionName);
320: final String currentDataPath = currentRegionPath + DATA_PATH;
321: createWhenThereIsNoNode(currentRegionPath + CURRENT_PATH + uniqueIdPath, NO_DATA, CreateMode.EPHEMERAL); // register own node path
322: // register the watcher for detecting the data's changes
323: exists(currentDataPath, new RegionWatcher(regionName));
324: }
325:
326: private boolean createWhenThereIsNoNode(final String path, final byte[] data, final CreateMode createMode) {
327: if (exists(path, false) != null) {
328: return false;
329: }
330: create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
331: return true;
332: }
333:
334: /**
335: * Unregister the listener which was registered by {@link #registerBarrier}
336: *
337: * @param regionName specific region name
338: */
339: public void unregisterBarrier(final String regionName) {
340: if (regionName == null) {
341: return;
342: }
343: final BarrierListener listener = listenerMap.remove(regionName);
344: if (listener != null) {
345: try {
346: listener.onDestroy(regionName);
347: } catch (Exception e) {
348: if (logger.isLoggable(Level.WARNING)) {
349: logger.log(Level.WARNING, "failed to onDestroy. name=" + name + ", regionName=" + regionName + ", listener=" + listener,
350: e);
351: }
352: }
353: }
354: }
355:
356: public String create(final String path, final byte[] data, final List<ACL> acl, final CreateMode createMode) {
357: if (zooKeeper == null) {
358: if (logger.isLoggable(Level.WARNING)) {
359: logger.log(Level.WARNING,
360: "this client has not been connected. please call ZKClient#connect() method before calling this create()");
361: }
362: return null;
363: }
364: try {
365: return retryUntilConnected(new Callable<String>() {
366: @Override
367: public String call() throws Exception {
368: return zooKeeper.create(path, data, acl, createMode);
369: }
370: });
371: } catch (Exception e) {
372: if (logger.isLoggable(Level.SEVERE)) {
373: logger.log(Level.SEVERE, "failed to do \"create\". path=" + path + ", data=" + Arrays.toString(data), e);
374: }
375: return null;
376: }
377: }
378:
379: public Stat exists(final String path, final boolean watch) {
380: if (zooKeeper == null) {
381: if (logger.isLoggable(Level.WARNING)) {
382: logger.log(Level.WARNING,
383: "this client has not been connected. please call ZKClient#connect() method before calling this exists()");
384: }
385: return null;
386: }
387: try {
388: return retryUntilConnected(new Callable<Stat>() {
389: @Override
390: public Stat call() throws Exception {
391: return zooKeeper.exists(path, watch);
392: }
393: });
394: } catch (Exception e) {
395: if (logger.isLoggable(Level.SEVERE)) {
396: logger.log(Level.SEVERE, "failed to do \"exists\". path=" + path, e);
397: }
398: return null;
399: }
400: }
401:
402: public Stat exists(final String path, final Watcher watch) {
403: if (zooKeeper == null) {
404: if (logger.isLoggable(Level.WARNING)) {
405: logger.log(Level.WARNING,
406: "this client has not been connected. please call ZKClient#connect() method before calling this exists()");
407: }
408: return null;
409: }
410: try {
411: return retryUntilConnected(new Callable<Stat>() {
412: @Override
413: public Stat call() throws Exception {
414: return zooKeeper.exists(path, new InternalWatcher(watch));
415: }
416: });
417: } catch (Exception e) {
418: if (logger.isLoggable(Level.SEVERE)) {
419: logger.log(Level.SEVERE, "failed to do \"exists\". path=" + path, e);
420: }
421: return null;
422: }
423: }
424:
425: public List<String> getChildren(final String path, final boolean watch) {
426: if (zooKeeper == null) {
427: if (logger.isLoggable(Level.WARNING)) {
428: logger.log(Level.WARNING,
429: "this client has not been connected. please call ZKClient#connect() method before calling this getChildren()");
430: }
431: return null;
432: }
433: try {
434: return retryUntilConnected(new Callable<List<String>>() {
435: @Override
436: public List<String> call() throws Exception {
437: return zooKeeper.getChildren(path, watch);
438: }
439: });
440: } catch (Exception e) {
441: if (logger.isLoggable(Level.SEVERE)) {
442: logger.log(Level.SEVERE, "failed to do \"getChildren\". path=" + path, e);
443: }
444: return null;
445: }
446: }
447:
448: public List<String> getChildren(final String path, final Watcher watcher) {
449: if (zooKeeper == null) {
450: if (logger.isLoggable(Level.WARNING)) {
451: logger.log(Level.WARNING,
452: "this client has not been connected. please call ZKClient#connect() method before calling this getChildren()");
453: }
454: return null;
455: }
456: try {
457: return retryUntilConnected(new Callable<List<String>>() {
458: @Override
459: public List<String> call() throws Exception {
460: return zooKeeper.getChildren(path, new InternalWatcher(watcher));
461: }
462: });
463: } catch (Exception e) {
464: if (logger.isLoggable(Level.SEVERE)) {
465: logger.log(Level.SEVERE, "failed to do \"getChildren\". path=" + path, e);
466: }
467: return null;
468: }
469: }
470:
471: public byte[] getData(final String path, final boolean watch, final Stat stat) {
472: if (zooKeeper == null) {
473: if (logger.isLoggable(Level.WARNING)) {
474: logger.log(Level.WARNING,
475: "this client has not been connected. please call ZKClient#connect() method before calling this getData()");
476: }
477: return null;
478: }
479: try {
480: return retryUntilConnected(new Callable<byte[]>() {
481: @Override
482: public byte[] call() throws Exception {
483: return zooKeeper.getData(path, watch, stat);
484: }
485: });
486: } catch (Exception e) {
487: if (logger.isLoggable(Level.SEVERE)) {
488: logger.log(Level.SEVERE, "failed to do \"getData\". path=" + path, e);
489: }
490: return null;
491: }
492: }
493:
494: public byte[] getData(final String path, final Watcher watcher) {
495: if (zooKeeper == null) {
496: if (logger.isLoggable(Level.WARNING)) {
497: logger.log(Level.WARNING,
498: "this client has not been connected. please call ZKClient#connect() method before calling this getData()");
499: }
500: return null;
501: }
502: try {
503: return retryUntilConnected(new Callable<byte[]>() {
504: @Override
505: public byte[] call() throws Exception {
506: return zooKeeper.getData(path, new InternalWatcher(watcher), null);
507: }
508: });
509: } catch (Exception e) {
510: if (logger.isLoggable(Level.SEVERE)) {
511: logger.log(Level.SEVERE, "failed to do \"getData\". path=" + path, e);
512: }
513: return null;
514: }
515: }
516:
517: public Stat setData(final String path, final byte[] data, final int version) {
518: if (zooKeeper == null) {
519: if (logger.isLoggable(Level.WARNING)) {
520: logger.log(Level.WARNING,
521: "this client has not been connected. please call ZKClient#connect() method before calling this setData()");
522: }
523: return null;
524: }
525: try {
526: return retryUntilConnected(new Callable<Stat>() {
527: @Override
528: public Stat call() throws Exception {
529: return zooKeeper.setData(path, data, version);
530: }
531: });
532: } catch (Exception e) {
533: if (logger.isLoggable(Level.SEVERE)) {
534: logger.log(Level.SEVERE,
535: "failed to do \"setData\". path=" + path + ", data=" + Arrays.toString(data) + ", version=" + version, e);
536: }
537: return null;
538: }
539: }
540:
541: public boolean delete(final String path, final int version) {
542: if (zooKeeper == null) {
543: if (logger.isLoggable(Level.WARNING)) {
544: logger.log(Level.WARNING,
545: "this client has not been connected. please call ZKClient#connect() method before calling this delete()");
546: }
547: return false;
548: }
549: try {
550: retryUntilConnected(new Callable<Boolean>() {
551: @Override
552: public Boolean call() throws Exception {
553: zooKeeper.delete(path, version);
554: return true;
555: }
556: });
557: } catch (Exception e) {
558: if (logger.isLoggable(Level.SEVERE)) {
559: logger.log(Level.SEVERE, "failed to do \"delete\". path=" + path + ", version=" + version, e);
560: }
561: return false;
562: }
563: return false;
564: }
565:
566: private <T> T retryUntilConnected(final Callable<T> callable) throws Exception {
567: for (int i = 0; i < RETRY_COUNT_UNTIL_CONNECTED; i++) {
568: try {
569: return callable.call();
570: } catch (KeeperException.ConnectionLossException cle) {
571: if (logger.isLoggable(Level.INFO)) {
572: logger.log(Level.INFO, "the callable will be retried because of ConnectionLossException");
573: } else if (logger.isLoggable(Level.FINE)) {
574: logger.log(Level.FINE, "the callable will be retried because of ConnectionLossException", cle);
575: }
576: reconnect();
577: } catch (KeeperException.SessionExpiredException see) {
578: if (logger.isLoggable(Level.INFO)) {
579: logger.log(Level.INFO, "the callable will be retried because of SessionExpiredException");
580: } else if (logger.isLoggable(Level.FINE)) {
581: logger.log(Level.FINE, "the callable will be retried because of SessionExpiredException", see);
582: }
583: reconnect();
584: }
585: }
586: if (logger.isLoggable(Level.SEVERE)) {
587: logger.log(Level.SEVERE, "failed to retry. retryCount={0}", RETRY_COUNT_UNTIL_CONNECTED);
588: }
589: return null;
590: }
591:
592: // should be guided by the lock
593: private boolean ensureConnected(final long timeoutInMillis) {
594: final Date timeoutDate;
595: if (timeoutInMillis < 0) {
596: timeoutDate = null;
597: } else {
598: timeoutDate = new Date(System.currentTimeMillis() + timeoutInMillis);
599: }
600: boolean stillWaiting = true;
601: while (currentState != Watcher.Event.KeeperState.SyncConnected) {
602: if (!stillWaiting) {
603: return false;
604: }
605: try {
606: if (timeoutDate == null) {
607: lockCondition.await();
608: } else {
609: stillWaiting = lockCondition.awaitUntil(timeoutDate);
610: }
611: } catch (InterruptedException ie) {
612: Thread.currentThread().interrupt();
613: return false;
614: }
615: }
616: return true;
617: }
618:
619: /**
620: * Internal watcher wrapper for tracking state and reconnecting
621: */
622: private class InternalWatcher implements Watcher {
623:
624: private final Watcher inner;
625:
626: private InternalWatcher(final Watcher inner) {
627: this.inner = inner;
628: }
629:
630: @Override
631: public void process(final WatchedEvent event) {
632: if (event != null && logger.isLoggable(Level.FINER)) {
633: logger.log(Level.FINER, "received event. eventState={0}, eventType={1}, eventPath={2}, watcher={3}",
634: new Object[] { event.getState(), event.getType(), event.getPath(), this });
635: }
636: if (!running.get()) {
637: if (event != null && logger.isLoggable(Level.INFO)) {
638: logger.log(Level.INFO,
639: "this event will be ignored because this client is shutting down or already has shutted down. name={0}, eventState={1}, eventType={2}, eventPath={3}, watcher={4}",
640: new Object[] { name, event.getState(), event.getType(), event.getPath(), this });
641: }
642: return;
643: }
644: if (processStateChanged(event)) {
645: return;
646: }
647: if (inner != null) {
648: inner.process(event);
649: }
650: }
651:
652: @Override
653: public String toString() {
654: return "InternalWatcher{" + "inner=" + inner + '}';
655: }
656: }
657:
658: /**
659: * Watcher implementation for a region
660: */
661: private class RegionWatcher implements Watcher {
662:
663: private final String regionName;
664: private final List<String> aliveNodesExceptMyself = new ArrayList<String>();
665: private final Set<String> toBeCompleted = new HashSet<String>();
666: private final Lock regionLock = new ReentrantLock();
667: private volatile boolean isSynchronizing = false;
668:
669: private byte[] remoteDataBytes = null;
670: private Stat remoteDataStat = null;
671: private ScheduledFuture rollbackFuture = null;
672:
/**
 * Creates a watcher bound to one region.
 *
 * @param regionName name of the region whose events this watcher processes
 */
private RegionWatcher(final String regionName) {
    this.regionName = regionName;
}
676:
677: @Override
678: public void process(final WatchedEvent event) {
679:• if (event == null) {
680: return;
681: }
682: // check if current region is already unregistered
683:• if (listenerMap.get(regionName) == null) {
684:• if (logger.isLoggable(Level.INFO)) {
685: logger.log(Level.INFO,
686: "this event will be ignored because this region already has unregistered. name={0}, regionName={1}, eventState={2}, eventType={3}, eventPath={4}, watcher={5}",
687: new Object[] { name, regionName, event.getState(), event.getType(), event.getPath(), this });
688: }
689: return;
690: }
691: final Event.KeeperState eventState = event.getState();
692: final String eventPath = event.getPath();
693: final Watcher.Event.EventType eventType = event.getType();
694:
695: final String currentRegionPath = basePath + normalizePath(regionName);
696: final String currentNodesPath = currentRegionPath + CURRENT_PATH;
697: final String currentParticipantPath = currentRegionPath + PARTICIPANTS_PATH;
698: final String currentDataPath = currentRegionPath + DATA_PATH;
699:• if ((eventType == Event.EventType.NodeDataChanged || eventType == Event.EventType.NodeCreated)
700:• && currentDataPath.equals(eventPath)) { // data changed
701:• if (logger.isLoggable(Level.INFO)) {
702: logger.log(Level.INFO,
703: "the central data has been changed in the remote zookeeper server. name={0}, regionName={1}, eventType={2}, eventPath={3}",
704: new Object[] { name, regionName, eventType, eventPath });
705: }
706: final byte[] currentDataBytes;
707: final Stat currentDataStat = new Stat();
708: // we should watch nodes' changes(watch1) while syncronizing nodes
709: final List<String> currentNodes = getChildren(currentNodesPath, this);
710: // get and store the remote changes at the preparing phase
711: currentDataBytes = getData(currentDataPath, false, currentDataStat);
712:• if (currentDataBytes == null) {
713:• if (logger.isLoggable(Level.WARNING)) {
714: logger.log(Level.WARNING,
715: "failed to get the remote changes. name={0}, regionName={1}, eventType={2}, eventPath={3}",
716: new Object[] { name, regionName, eventType, eventPath });
717: }
718: }
719: final String myParticipantPath = currentParticipantPath + uniqueIdPath;
720: regionLock.lock();
721: try {
722: isSynchronizing = true;
723: aliveNodesExceptMyself.clear();
724: toBeCompleted.clear();
725: aliveNodesExceptMyself.addAll(currentNodes);
726: // remove own node
727: aliveNodesExceptMyself.remove(uniqueId);
728:• for (final String node : currentNodes) {
729: final String participant = currentParticipantPath + "/" + node;
730: // we should watch the creation or deletion event(watch2)
731:• if (exists(participant, this) == null) {
732: toBeCompleted.add(participant);
733: } else {
734: toBeCompleted.remove(participant);
735: }
736: }
737: remoteDataBytes = currentDataBytes;
738: remoteDataStat = currentDataStat;
739:• if (currentDataBytes != null && exists(myParticipantPath, false) == null
740:• && create(myParticipantPath, NO_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) == null) {
741:• if (logger.isLoggable(Level.WARNING)) {
742: logger.log(Level.WARNING, "failed to create myParticipantPath. path={0}", myParticipantPath);
743: }
744: }
745: // prepared to roll back
746: final Long scheduled = currentDataStat.getMtime() + TimeUnit.SECONDS.toMillis(commitDelayTimeInSecs * 2);
747: final long remaining = scheduled - System.currentTimeMillis();
748: rollbackFuture = scheduledExecutor.schedule(new Runnable() {
749: @Override
750: public void run() {
751: regionLock.lock();
752: try {
753: if (logger.isLoggable(Level.WARNING)) {
754: final Long expected = currentDataStat.getMtime() + TimeUnit.SECONDS.toMillis(commitDelayTimeInSecs);
755: final Date expectedDate = new Date(expected);
756: logger.log(Level.WARNING,
757: "commit's schedule has been timed out so synchronization will be rolled back. name={0}, regionName={1}, expectedDate={2}, toBeComplete={3}",
758: new Object[] { name, regionName, expectedDate, toBeCompleted });
759: }
760: clearSynchronization();
761: // delete own barrier path
762: if (!delete(myParticipantPath, -1)) {
763: if (logger.isLoggable(Level.FINE)) {
764: logger.log(Level.FINE,
765: "there is no the participant path to be deleted in rolling back because it may already has been closed. name={0}, regionName={1}, path={2}",
766: new Object[] { name, regionName, myParticipantPath });
767: }
768: }
769: } finally {
770: regionLock.unlock();
771: }
772: }
773: }, remaining, TimeUnit.MILLISECONDS);
774: } finally {
775: regionLock.unlock();
776: }
777: // register the watcher for detecting next data's changes again
778: exists(currentDataPath, this);
779:• } else if (isSynchronizing && eventType == Event.EventType.NodeDeleted && currentDataPath.equals(eventPath)) { // data deleted
780: regionLock.lock();
781: try {
782:• if (isSynchronizing) {
783:• if (!toBeCompleted.isEmpty()) {
784:• if (logger.isLoggable(Level.WARNING)) {
785: logger.log(Level.WARNING,
786: "the central data deleted in the remote zookeeper server while preparing to synchronize the data. name={0}, regionName={1}, eventPath={2}, toBeCompleted={3}",
787: new Object[] { name, regionName, eventPath, toBeCompleted });
788: }
789: }
790: clearSynchronization();
791: }
792: } finally {
793: regionLock.unlock();
794: }
795: // register the watcher for detecting next data's changes again
796: exists(currentDataPath, this);
797:• } else if (isSynchronizing
798: && (eventType == Watcher.Event.EventType.NodeCreated || eventType == Watcher.Event.EventType.NodeDeleted)
799:• && eventPath != null && eventPath.startsWith(currentParticipantPath)) { // a participant joined from (watch2)
800: regionLock.lock();
801: try {
802:• if (isSynchronizing) {
803: toBeCompleted.remove(eventPath);
804:• if (toBeCompleted.isEmpty()) {
805: scheduleCommit(event, currentDataPath, currentParticipantPath, remoteDataBytes, remoteDataStat);
806: clearSynchronization();
807: }
808: }
809: } finally {
810: regionLock.unlock();
811: }
812:• } else if (isSynchronizing && eventType == Event.EventType.NodeChildrenChanged && currentNodesPath.equals(eventPath)) { // nodes
813: // changed
814: // from
815: // (watch1)
816:• if (logger.isLoggable(Level.INFO)) {
817: logger.log(Level.INFO,
818: "some clients are failed or added while preparing to synchronize the data. name={0}, regionName={1}, eventPath={2}",
819: new Object[] { name, regionName, eventPath });
820: }
821: // we should watch nodes' changes again(watch1)
822: final List<String> currentNodes = getChildren(currentNodesPath, this);
823: regionLock.lock();
824: try {
825:• if (isSynchronizing) {
826: // remove own node
827: currentNodes.remove(uniqueId);
828: final List<String> failureNodes = new ArrayList<String>(aliveNodesExceptMyself);
829: failureNodes.removeAll(currentNodes);
830:• if (!failureNodes.isEmpty()) {
831:• if (logger.isLoggable(Level.WARNING)) {
832: logger.log(Level.WARNING,
833: "some clients are failed while preparing to synchronize the data. name={0}, regionName={1}, eventPath={2}, failureNodes={3}",
834: new Object[] { name, regionName, eventPath, failureNodes });
835: }
836:• for (final String node : failureNodes) {
837: final String participant = currentParticipantPath + "/" + node;
838: toBeCompleted.remove(participant);
839: }
840:• if (toBeCompleted.isEmpty()) {
841: scheduleCommit(event, currentDataPath, currentParticipantPath, remoteDataBytes, remoteDataStat);
842: clearSynchronization();
843: }
844: }
845: }
846: } finally {
847: regionLock.unlock();
848: }
849: } else {
850:• if (logger.isLoggable(Level.FINE)) {
851: logger.log(Level.FINE,
852: "not interested. name={0}, regionName={1}, eventState={2}, eventType={3}, eventPath={4}, watcher={5}",
853: new Object[] { name, regionName, eventState, eventType, eventPath, this });
854: }
855: }
856: }
857:
858: private void scheduleCommit(final WatchedEvent event, final String currentDataPath, final String currentParticipantPath,
859: final byte[] currentDataBytes, final Stat currentDataStat) {
860:• if (event == null || currentDataPath == null || currentDataBytes == null || currentDataStat == null) {
861: return;
862: }
863:• if (logger.isLoggable(Level.INFO)) {
864: logger.log(Level.INFO, "all clients are prepared. name={0}, regionName={1}, commitDelayTimeInSecs={2}",
865: new Object[] { name, regionName, commitDelayTimeInSecs });
866: }
867: // all nodes are prepared
868: final Long scheduled = currentDataStat.getMtime() + TimeUnit.SECONDS.toMillis(commitDelayTimeInSecs);
869: final long remaining = scheduled - System.currentTimeMillis();
870:• if (remaining < 0) {
871:• if (logger.isLoggable(Level.WARNING)) {
872: logger.log(Level.WARNING,
873: "commitDelayTimeInSecs may be too small. so we will commit immediately. name={0}, regionName={1}, scheduledTime=before {2}ms",
874: new Object[] { name, regionName, -remaining });
875: }
876: } else {
877: final Date scheduledDate = new Date(scheduled);
878:• if (logger.isLoggable(Level.INFO)) {
879: logger.log(Level.INFO,
880: "the changes of the central data will be applied. name={0}, regionName={1}, scheduledDate={2}, data={3}, dataStat={4}",
881: new Object[] { name, regionName, scheduledDate.toString(), currentDataBytes, currentDataStat });
882: }
883: }
884: scheduledExecutor.schedule(new Runnable() {
885: @Override
886: public void run() {
887: final BarrierListener listener = listenerMap.get(regionName);
888: if (listener == null) {
889: if (logger.isLoggable(Level.INFO)) {
890: logger.log(Level.INFO,
891: "this commit will be ignored because this region already has unregistered. eventState={0}, eventType={1}, eventPath={2}, watcher={3}",
892: new Object[] { event.getState(), event.getType(), event.getPath(), this });
893: }
894: return;
895: }
896: try {
897: if (logger.isLoggable(Level.FINE)) {
898: logger.log(Level.FINE, "name={0}, regionName={1}, scheduledTime={2}ms, commit time={3}ms",
899: new Object[] { name, regionName, scheduled, System.currentTimeMillis() });
900: }
901: listener.onCommit(regionName, currentDataPath, currentDataBytes);
902: if (logger.isLoggable(Level.INFO)) {
903: logger.log(Level.INFO, "committed successfully. name={0}, regionName={1}, listener={2}",
904: new Object[] { name, regionName, listener });
905: }
906: } catch (Exception e) {
907: if (logger.isLoggable(Level.WARNING)) {
908: logger.log(Level.WARNING,
909: "failed to onCommit. name=" + name + ", regionName=" + regionName + ", listener=" + listener, e);
910: }
911: }
912: // delete own barrier path
913: final String path = currentParticipantPath + uniqueIdPath;
914: if (!delete(path, -1)) {
915: if (logger.isLoggable(Level.FINE)) {
916: logger.log(Level.FINE,
917: "there is no the participant path to be deleted because it may already has been closed. name={0}, regionName={1}, path={2}",
918: new Object[] { name, regionName, path });
919: }
920: }
921: }
922: }, remaining, TimeUnit.MILLISECONDS);
923: }
924:
925: private void clearSynchronization() {
926:• if (!isSynchronizing) {
927: return;
928: }
929: isSynchronizing = false;
930: aliveNodesExceptMyself.clear();
931: toBeCompleted.clear();
932: remoteDataBytes = null;
933: remoteDataStat = null;
934:• if (rollbackFuture != null) {
935: rollbackFuture.cancel(false);
936: rollbackFuture = null;
937: }
938: }
939:
940: @Override
941: public String toString() {
942: return "RegionWatcher{" + "regionName='" + regionName + '\'' + '}';
943: }
944: }
945:
946: private boolean processStateChanged(final WatchedEvent event) {
947: if (event == null) {
948: throw new IllegalArgumentException("event must not be null");
949: }
950: final Watcher.Event.KeeperState eventState = event.getState();
951: final String eventPath = event.getPath();
952: final boolean isStateChangedEvent;
953: // state changed
954: if (eventPath == null) {
955: lock.lock();
956: try {
957: currentState = eventState;
958: lockCondition.signalAll();
959: } finally {
960: lock.unlock();
961: }
962: isStateChangedEvent = true;
963: } else {
964: isStateChangedEvent = false;
965: }
966: if (eventState == Watcher.Event.KeeperState.Expired) {
967: try {
968: reconnect();
969: } catch (Exception e) {
970: if (logger.isLoggable(Level.SEVERE)) {
971: logger.log(Level.SEVERE, "failed to reconnect the zookeeper server", e);
972: }
973: }
974: }
975: return isStateChangedEvent;
976: }
977:
978: @Override
979: public String toString() {
980: return "ZKClient{" + "connected=" + connected + ", running=" + running + ", currentState=" + currentState + ", listenerMap="
981: + listenerMap + ", name='" + name + '\'' + ", uniqueId='" + uniqueId + '\'' + ", uniqueIdPath='" + uniqueIdPath + '\''
982: + ", rootPath='" + rootPath + '\'' + ", basePath='" + basePath + '\'' + ", zooKeeperServerList='" + zooKeeperServerList
983: + '\'' + ", connectTimeoutInMillis=" + connectTimeoutInMillis + ", sessionTimeoutInMillis=" + sessionTimeoutInMillis
984: + ", commitDelayTimeInSecs=" + commitDelayTimeInSecs + '}';
985: }
986:
987: /**
988: * Normalize the given path
989: *
990: * @param path path for the zookeeper
991: * @return normalized path
992: */
993: private static String normalizePath(final String path) {
994: if (path == null) {
995: return "/";
996: }
997: String temp = path.trim();
998: while (temp.length() > 1 && temp.endsWith("/")) {
999: temp = temp.substring(0, temp.length() - 1);
1000: }
1001: final StringBuilder builder = new StringBuilder(64);
1002: if (!temp.startsWith("/")) {
1003: builder.append('/');
1004: }
1005: builder.append(temp);
1006: return builder.toString();
1007: }
1008:
1009: /**
1010: * Builder for ZKClient
1011: */
1012: public static class Builder {
1013: private static final String DEFAULT_ROOT_PATH = "/";
1014: private static final long DEFAULT_CONNECT_TIMEOUT_IN_MILLIS = 5000; // 5secs
1015: private static final long DEFAULT_SESSION_TIMEOUT_IN_MILLIS = 30000; // 30secs
1016: private static final long DEFAULT_COMMIT_DELAY_TIME_IN_SECS = 60; // 60secs
1017:
1018: private final String name;
1019: private final String zooKeeperServerList;
1020:
1021: private String rootPath = DEFAULT_ROOT_PATH;
1022: private long connectTimeoutInMillis = DEFAULT_CONNECT_TIMEOUT_IN_MILLIS;
1023: private long sessionTimeoutInMillis = DEFAULT_SESSION_TIMEOUT_IN_MILLIS;
1024: private long commitDelayTimeInSecs = DEFAULT_COMMIT_DELAY_TIME_IN_SECS;
1025:
1026: /**
1027: * The specific name or Id for ZKClient
1028: *
1029: * @param name name or id
1030: * @param zooKeeperServerList comma separated host:port pairs, each
1031: * corresponding to a zookeeper server. e.g.
1032: * "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
1033: */
1034: public Builder(final String name, final String zooKeeperServerList) {
1035: this.name = name;
1036: this.zooKeeperServerList = zooKeeperServerList;
1037: }
1038:
1039: /**
1040: * Root path for ZKClient
1041: *
1042: * @param rootPath root path of the zookeeper. default is "/".
1043: * @return this builder
1044: */
1045: public Builder rootPath(final String rootPath) {
1046: this.rootPath = rootPath;
1047: return this;
1048: }
1049:
1050: /**
1051: * Connect timeout in milli-seconds
1052: *
1053: * @param connectTimeoutInMillis connect timeout. negative value means "never
1054: * timed out". default is 5000(5 secs).
1055: * @return this builder
1056: */
1057: public Builder connectTimeoutInMillis(final long connectTimeoutInMillis) {
1058: this.connectTimeoutInMillis = connectTimeoutInMillis;
1059: return this;
1060: }
1061:
1062: /**
1063: * Session timeout in milli-seconds
1064: *
1065: * @param sessionTimeoutInMillis Zookeeper connection's timeout. default is
1066: * 30000(30 secs).
1067: * @return this builder
1068: */
1069: public Builder sessionTimeoutInMillis(final long sessionTimeoutInMillis) {
1070: this.sessionTimeoutInMillis = sessionTimeoutInMillis;
1071: return this;
1072: }
1073:
1074: /**
1075: * Delay time in seconds for committing
1076: *
1077: * @param commitDelayTimeInSecs delay time before committing. default is
1078: * 60(60secs).
1079: * @return this builder
1080: */
1081: public Builder commitDelayTimeInSecs(final long commitDelayTimeInSecs) {
1082: this.commitDelayTimeInSecs = commitDelayTimeInSecs;
1083: return this;
1084: }
1085:
1086: /**
1087: * Build a ZKClient
1088: *
1089: * @return an instance of ZKClient
1090: */
1091: public ZKClient build() {
1092: return new ZKClient(this);
1093: }
1094: }
1095: }