Skip to content

Package: ZKClient$2

ZKClient$2

nameinstructionbranchcomplexitylinemethod
call()
M: 13 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
{...}
M: 18 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*
2: * Copyright (c) 2012, 2017 Oracle and/or its affiliates. All rights reserved.
3: *
4: * This program and the accompanying materials are made available under the
5: * terms of the Eclipse Public License v. 2.0, which is available at
6: * http://www.eclipse.org/legal/epl-2.0.
7: *
8: * This Source Code may also be made available under the following Secondary
9: * Licenses when the conditions for such availability set forth in the
10: * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
11: * version 2 with the GNU Classpath Exception, which is available at
12: * https://www.gnu.org/software/classpath/license.html.
13: *
14: * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
15: */
16:
17: package org.glassfish.grizzly.memcached.zookeeper;
18:
19: import org.apache.zookeeper.CreateMode;
20: import org.apache.zookeeper.KeeperException;
21: import org.apache.zookeeper.WatchedEvent;
22: import org.apache.zookeeper.Watcher;
23: import org.apache.zookeeper.ZooDefs;
24: import org.apache.zookeeper.ZooKeeper;
25: import org.apache.zookeeper.data.ACL;
26: import org.apache.zookeeper.data.Stat;
27: import org.glassfish.grizzly.Grizzly;
28:
29: import java.io.IOException;
30: import java.lang.management.ManagementFactory;
31: import java.util.ArrayList;
32: import java.util.Arrays;
33: import java.util.Date;
34: import java.util.HashSet;
35: import java.util.List;
36: import java.util.Map;
37: import java.util.Set;
38: import java.util.concurrent.Callable;
39: import java.util.concurrent.ConcurrentHashMap;
40: import java.util.concurrent.Executors;
41: import java.util.concurrent.ScheduledExecutorService;
42: import java.util.concurrent.ScheduledFuture;
43: import java.util.concurrent.TimeUnit;
44: import java.util.concurrent.atomic.AtomicBoolean;
45: import java.util.concurrent.locks.Condition;
46: import java.util.concurrent.locks.Lock;
47: import java.util.concurrent.locks.ReentrantLock;
48: import java.util.logging.Level;
49: import java.util.logging.Logger;
50:
51: /**
52: * Zookeeper client implementation for barrier and recoverable operation
53: * <p>
54: * All operations will be executed on the valid connection because the failed connection will be reconnected automatically.
55: * This has Barrier function.
56: * {@link BarrierListener} can be registered with a specific region and initial data by the {@link #registerBarrier} method and unregistered by the {@link #unregisterBarrier} method.
57: * If the zookeeper server doesn't have the data node, the given initial data will be set in the server when the {@link #registerBarrier} method is called.
58: * If the specific data will be changed in remote zookeeper server, all clients which have joined will receive changes simultaneously.
59: * If all clients receive changes successfully,
60: * {@link BarrierListener#onCommit} will be called simultaneously at the scheduled time(data modification time + {@code commitDelayTimeInSecs}).
61: * <p>
62: * This also supports some safe APIs which are similar to the original {@link ZooKeeper}'s APIs
63: * like create, delete, exists, getChildren, getData and setData.
64: * <p>
65: * Examples of barrier's use:
66: * {@code
67: * // initial
68: * <p>
69: * final ZKClient.Builder builder = new ZKClient.Builder("myZookeeperClient", "localhost:2181");
70: * builder.rootPath(ROOT).connectTimeoutInMillis(3000).sessionTimeoutInMillis(30000).commitDelayTimeInSecs(60);
71: * final ZKClient zkClient = builder.build();
72: * zkClient.connect();
73: * final String registeredPath = zkClient.registerBarrier( "user", myListener, initData );
74: * // ...
75: * // cleanup
76: * zkClient.unregisterBarrier( "user" );
77: * zkClient.shutdown();
78: * }
79: * <p>
80: * [NOTE]
81: * Zookeeper already guides some simple barrier examples:
82: * http://zookeeper.apache.org/doc/r3.3.4/zookeeperTutorial.html
83: * http://code.google.com/p/good-samples/source/browse/trunk/zookeeper-3.x/src/main/java/com/googlecode/goodsamples/zookeeper/barrier/Barrier.java
84: * <p>
85: * But, their examples have a race condition issue:
86: * https://issues.apache.org/jira/browse/ZOOKEEPER-1011
87: *
88: * @author Bongjae Chang
89: */
90: public class ZKClient {
91:
92: private static final Logger logger = Grizzly.logger(ZKClient.class); // logger shared by this class and its inner watchers
93:
94: private static final String JVM_AND_HOST_UNIQUE_ID = ManagementFactory.getRuntimeMXBean().getName(); // runtime MXBean name; prefix of every client's uniqueId
95: private static final int RETRY_COUNT_UNTIL_CONNECTED = 3; // number of attempts made by retryUntilConnected()
96: /**
97: * Path information:
98: * /root/barrier/region_name/current/(client1, client2, ...)
99: * /root/barrier/region_name/data
100: * /root/barrier/region_name/participants/(client1, client2, ...)
101: */
102: private static final String BASE_PATH = "/barrier"; // appended to the root path to form basePath
103: private static final String CURRENT_PATH = "/current"; // per-region parent of the nodes registered by registerEphemeralNodeAndWatcher()
104: private static final String DATA_PATH = "/data"; // per-region node holding the shared data
105: private static final String PARTICIPANTS_PATH = "/participants"; // per-region parent of the nodes created while a data change is synchronized
106: private static final byte[] NO_DATA = new byte[0]; // empty payload for nodes that carry no data
107:
108: private final Lock lock = new ReentrantLock(); // held by connect(), close() and reconnect()
109: private final Condition lockCondition = lock.newCondition(); // awaited by ensureConnected()
110: private final AtomicBoolean reconnectingFlag = new AtomicBoolean(false); // flipped by reconnect() to drop duplicated attempts
111: private boolean connected; // written under lock by connect() and close()
112: private Watcher.Event.KeeperState currentState; // compared against SyncConnected in ensureConnected()
113: private AtomicBoolean running = new AtomicBoolean(true); // set to false by shutdown(); InternalWatcher ignores events afterwards
114: private final Map<String, BarrierListener> listenerMap = new ConcurrentHashMap<>(); // region name -> registered listener
115: private final ScheduledExecutorService scheduledExecutor; // runs the rollback and commit tasks scheduled by RegionWatcher
116: private ZooKeeper zooKeeper; // current handle; assigned in connect()
117:
118: private final String uniqueId; // JVM_AND_HOST_UNIQUE_ID + "_" + name
119: private final String uniqueIdPath; // normalizePath(uniqueId)
120: private final String basePath; // rootPath + BASE_PATH
121:
122: private final String name; // client name taken from the builder
123: private final String zooKeeperServerList; // connect string handed to the ZooKeeper constructor
124: private final long connectTimeoutInMillis; // how long connect() waits for SyncConnected; a negative value waits without a deadline
125: private final long sessionTimeoutInMillis; // session timeout handed to the ZooKeeper constructor (narrowed to int there)
126: private final String rootPath; // normalizePath(builder.rootPath)
127: private final long commitDelayTimeInSecs; // commit is scheduled this long after the data's mtime; rollback at twice this delay
128:
129: private ZKClient(final Builder builder) {
130: this.name = builder.name;
131: this.uniqueId = JVM_AND_HOST_UNIQUE_ID + "_" + this.name;
132: this.uniqueIdPath = normalizePath(this.uniqueId);
133: this.rootPath = normalizePath(builder.rootPath);
134: this.basePath = this.rootPath + BASE_PATH;
135:
136: this.zooKeeperServerList = builder.zooKeeperServerList;
137: this.connectTimeoutInMillis = builder.connectTimeoutInMillis;
138: this.sessionTimeoutInMillis = builder.sessionTimeoutInMillis;
139: this.commitDelayTimeInSecs = builder.commitDelayTimeInSecs;
140:
141: this.scheduledExecutor = Executors.newScheduledThreadPool(7);
142: }
143:
144: /**
145: * Connect this client to the zookeeper server
146: * <p>
147: * this method will wait for {@link Watcher.Event.KeeperState#SyncConnected} from the zookeeper server.
148: *
149: * @throws IOException the io exception of internal ZooKeeper
150: * @throws InterruptedException the interrupted exception of internal ZooKeeper
151: * @return true if the client is connected. Otherwise false
152: */
153: public boolean connect() throws IOException, InterruptedException {
154: lock.lock();
155: try {
156: if (connected) {
157: return true;
158: }
159: zooKeeper = new ZooKeeper(zooKeeperServerList,
160: (int) sessionTimeoutInMillis,
161: new InternalWatcher(new Watcher() {
162:
163: @Override
164: public void process(WatchedEvent event) {
165: }
166: }));
167: if (!ensureConnected(connectTimeoutInMillis)) {
168: zooKeeper.close();
169: currentState = Watcher.Event.KeeperState.Disconnected;
170: connected = false;
171: } else {
172: connected = true;
173: if (logger.isLoggable(Level.FINE)) {
174: logger.log(Level.FINE, "connected the zookeeper server successfully");
175: }
176: }
177: return connected;
178: } finally {
179: lock.unlock();
180: }
181: }
182:
183: private void close() {
184: lock.lock();
185: try {
186: if (!connected) {
187: return;
188: }
189: if (zooKeeper != null) {
190: try {
191: zooKeeper.close();
192: } catch (InterruptedException ignore) {
193: }
194: }
195: currentState = Watcher.Event.KeeperState.Disconnected;
196: connected = false;
197: } finally {
198: lock.unlock();
199: }
200: if (logger.isLoggable(Level.FINE)) {
201: logger.log(Level.FINE, "closed successfully");
202: }
203: }
204:
205: private void reconnect() throws IOException, InterruptedException {
206: if (logger.isLoggable(Level.FINE)) {
207: logger.log(Level.FINE, "trying to reconnect the zookeeper server");
208: }
209: final boolean localReconnectingFlag = reconnectingFlag.get();
210: lock.lock();
211: try {
212: if (!reconnectingFlag.compareAndSet(localReconnectingFlag, !localReconnectingFlag)) {
213: // prevent duplicated trials
214: return;
215: }
216: close();
217: if (connect()) {
218: // register ephemeral node and watcher again
219: for (final String regionName : listenerMap.keySet()) {
220: registerEphemeralNodeAndWatcher(regionName);
221: }
222: }
223: } finally {
224: lock.unlock();
225: }
226: if (logger.isLoggable(Level.INFO)) {
227: logger.log(Level.INFO, "reconnected the zookeeper server successfully");
228: }
229: }
230:
231: /**
232: * Close this client
233: */
234: public void shutdown() {
235: if (!running.compareAndSet(true, false)) {
236: if (logger.isLoggable(Level.FINE)) {
237: logger.log(Level.FINE, "shutting down or already shutted down");
238: }
239: return;
240: }
241: listenerMap.clear();
242: close();
243: if (scheduledExecutor != null) {
244: scheduledExecutor.shutdown();
245: }
246: if (logger.isLoggable(Level.FINE)) {
247: logger.log(Level.FINE, "shutted down successfully");
248: }
249: }
250:
251: /**
252: * Register the specific barrier
253: *
254: * @param regionName specific region name
255: * @param listener {@link BarrierListener} implementations
256: * @param initialData initial data. if the zookeeper server doesn't have any data, this will be set. "null" means {@code NO_DATA} which is byte[0].
257: * @return the registered data path of the zookeeper server
258: */
259: public String registerBarrier(final String regionName, final BarrierListener listener, final byte[] initialData) {
260: if (regionName == null) {
261: throw new IllegalArgumentException("region name must not be null");
262: }
263: if (listener == null) {
264: throw new IllegalArgumentException("listener must not be null");
265: }
266: listenerMap.put(regionName, listener);
267:
268: // ensure all paths exist
269: createWhenThereIsNoNode(rootPath, NO_DATA, CreateMode.PERSISTENT); // ensure root path
270: createWhenThereIsNoNode(basePath, NO_DATA, CreateMode.PERSISTENT); // ensure base path
271: final String currentRegionPath = basePath + normalizePath(regionName);
272: createWhenThereIsNoNode(currentRegionPath, NO_DATA, CreateMode.PERSISTENT); // ensure my region path
273: createWhenThereIsNoNode(currentRegionPath + CURRENT_PATH, NO_DATA, CreateMode.PERSISTENT); // ensure nodes path
274: createWhenThereIsNoNode(currentRegionPath + PARTICIPANTS_PATH, NO_DATA, CreateMode.PERSISTENT); // ensure participants path
275:
276: final String currentDataPath = currentRegionPath + DATA_PATH;
277: final boolean dataCreated = createWhenThereIsNoNode(currentDataPath,
278: initialData == null ? NO_DATA : initialData,
279: CreateMode.PERSISTENT); // ensure data path
280: if (!dataCreated) { // if the remote data already exists
281: if (logger.isLoggable(Level.INFO)) {
282: logger.log(Level.INFO, "the central data exists in the zookeeper server");
283: }
284: final byte[] remoteDataBytes = getData(currentDataPath, false, null);
285: try {
286: listener.onInit(regionName, currentDataPath, remoteDataBytes);
287: } catch (Exception e) {
288: if (logger.isLoggable(Level.WARNING)) {
289: logger.log(Level.WARNING, "failed to onInit. name=" + name + ", regionName=" + regionName + ", listener=" + listener, e);
290: }
291: }
292: } else {
293: if (logger.isLoggable(Level.INFO)) {
294: logger.log(Level.INFO,
295: "initial data was set because there was no remote data in the zookeeper server. initialData={0}",
296: initialData);
297: }
298: try {
299: listener.onInit(regionName, currentDataPath, null);
300: } catch (Exception e) {
301: if (logger.isLoggable(Level.WARNING)) {
302: logger.log(Level.WARNING, "failed to onInit. name=" + name + ", regionName=" + regionName + ", listener=" + listener, e);
303: }
304: }
305: }
306: registerEphemeralNodeAndWatcher(regionName);
307: if (logger.isLoggable(Level.FINE)) {
308: logger.log(Level.FINE, "the path \"{0}\" will be watched. name={1}, regionName={2}", new Object[]{name, currentDataPath, regionName});
309: }
310: return currentDataPath;
311: }
312:
313: private void registerEphemeralNodeAndWatcher(final String regionName) {
314: if (regionName == null) {
315: return;
316: }
317: final String currentRegionPath = basePath + normalizePath(regionName);
318: final String currentDataPath = currentRegionPath + DATA_PATH;
319: createWhenThereIsNoNode(currentRegionPath + CURRENT_PATH + uniqueIdPath, NO_DATA, CreateMode.EPHEMERAL); // register own node path
320: // register the watcher for detecting the data's changes
321: exists(currentDataPath, new RegionWatcher(regionName));
322: }
323:
324: private boolean createWhenThereIsNoNode(final String path, final byte[] data, final CreateMode createMode) {
325: if (exists(path, false) != null) {
326: return false;
327: }
328: create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
329: return true;
330: }
331:
332: /**
333: * Unregister the listener which was registered by {@link #registerBarrier}
334: *
335: * @param regionName specific region name
336: */
337: public void unregisterBarrier(final String regionName) {
338: if (regionName == null) {
339: return;
340: }
341: final BarrierListener listener = listenerMap.remove(regionName);
342: if (listener != null) {
343: try {
344: listener.onDestroy(regionName);
345: } catch (Exception e) {
346: if (logger.isLoggable(Level.WARNING)) {
347: logger.log(Level.WARNING, "failed to onDestroy. name=" + name + ", regionName=" + regionName + ", listener=" + listener, e);
348: }
349: }
350: }
351: }
352:
353: public String create(final String path, final byte[] data, final List<ACL> acl, final CreateMode createMode) {
354: if (zooKeeper == null) {
355: if (logger.isLoggable(Level.WARNING)) {
356: logger.log(Level.WARNING, "this client has not been connected. please call ZKClient#connect() method before calling this create()");
357: }
358: return null;
359: }
360: try {
361: return retryUntilConnected(new Callable<String>() {
362: @Override
363: public String call() throws Exception {
364: return zooKeeper.create(path, data, acl, createMode);
365: }
366: });
367: } catch (Exception e) {
368: if (logger.isLoggable(Level.SEVERE)) {
369: logger.log(Level.SEVERE, "failed to do \"create\". path=" + path + ", data=" + Arrays.toString(data), e);
370: }
371: return null;
372: }
373: }
374:
375: public Stat exists(final String path, final boolean watch) {
376: if (zooKeeper == null) {
377: if (logger.isLoggable(Level.WARNING)) {
378: logger.log(Level.WARNING, "this client has not been connected. please call ZKClient#connect() method before calling this exists()");
379: }
380: return null;
381: }
382: try {
383: return retryUntilConnected(new Callable<Stat>() {
384: @Override
385: public Stat call() throws Exception {
386: return zooKeeper.exists(path, watch);
387: }
388: });
389: } catch (Exception e) {
390: if (logger.isLoggable(Level.SEVERE)) {
391: logger.log(Level.SEVERE, "failed to do \"exists\". path=" + path, e);
392: }
393: return null;
394: }
395: }
396:
397: public Stat exists(final String path, final Watcher watch) {
398: if (zooKeeper == null) {
399: if (logger.isLoggable(Level.WARNING)) {
400: logger.log(Level.WARNING, "this client has not been connected. please call ZKClient#connect() method before calling this exists()");
401: }
402: return null;
403: }
404: try {
405: return retryUntilConnected(new Callable<Stat>() {
406: @Override
407: public Stat call() throws Exception {
408: return zooKeeper.exists(path, new InternalWatcher(watch));
409: }
410: });
411: } catch (Exception e) {
412: if (logger.isLoggable(Level.SEVERE)) {
413: logger.log(Level.SEVERE, "failed to do \"exists\". path=" + path, e);
414: }
415: return null;
416: }
417: }
418:
419: public List<String> getChildren(final String path, final boolean watch) {
420: if (zooKeeper == null) {
421: if (logger.isLoggable(Level.WARNING)) {
422: logger.log(Level.WARNING, "this client has not been connected. please call ZKClient#connect() method before calling this getChildren()");
423: }
424: return null;
425: }
426: try {
427: return retryUntilConnected(new Callable<List<String>>() {
428: @Override
429: public List<String> call() throws Exception {
430: return zooKeeper.getChildren(path, watch);
431: }
432: });
433: } catch (Exception e) {
434: if (logger.isLoggable(Level.SEVERE)) {
435: logger.log(Level.SEVERE, "failed to do \"getChildren\". path=" + path, e);
436: }
437: return null;
438: }
439: }
440:
441: public List<String> getChildren(final String path, final Watcher watcher) {
442: if (zooKeeper == null) {
443: if (logger.isLoggable(Level.WARNING)) {
444: logger.log(Level.WARNING, "this client has not been connected. please call ZKClient#connect() method before calling this getChildren()");
445: }
446: return null;
447: }
448: try {
449: return retryUntilConnected(new Callable<List<String>>() {
450: @Override
451: public List<String> call() throws Exception {
452: return zooKeeper.getChildren(path, new InternalWatcher(watcher));
453: }
454: });
455: } catch (Exception e) {
456: if (logger.isLoggable(Level.SEVERE)) {
457: logger.log(Level.SEVERE, "failed to do \"getChildren\". path=" + path, e);
458: }
459: return null;
460: }
461: }
462:
463: public byte[] getData(final String path, final boolean watch, final Stat stat) {
464: if (zooKeeper == null) {
465: if (logger.isLoggable(Level.WARNING)) {
466: logger.log(Level.WARNING, "this client has not been connected. please call ZKClient#connect() method before calling this getData()");
467: }
468: return null;
469: }
470: try {
471: return retryUntilConnected(new Callable<byte[]>() {
472: @Override
473: public byte[] call() throws Exception {
474: return zooKeeper.getData(path, watch, stat);
475: }
476: });
477: } catch (Exception e) {
478: if (logger.isLoggable(Level.SEVERE)) {
479: logger.log(Level.SEVERE, "failed to do \"getData\". path=" + path, e);
480: }
481: return null;
482: }
483: }
484:
485: public byte[] getData(final String path, final Watcher watcher) {
486: if (zooKeeper == null) {
487: if (logger.isLoggable(Level.WARNING)) {
488: logger.log(Level.WARNING, "this client has not been connected. please call ZKClient#connect() method before calling this getData()");
489: }
490: return null;
491: }
492: try {
493: return retryUntilConnected(new Callable<byte[]>() {
494: @Override
495: public byte[] call() throws Exception {
496: return zooKeeper.getData(path, new InternalWatcher(watcher), null);
497: }
498: });
499: } catch (Exception e) {
500: if (logger.isLoggable(Level.SEVERE)) {
501: logger.log(Level.SEVERE, "failed to do \"getData\". path=" + path, e);
502: }
503: return null;
504: }
505: }
506:
507: public Stat setData(final String path, final byte[] data, final int version) {
508: if (zooKeeper == null) {
509: if (logger.isLoggable(Level.WARNING)) {
510: logger.log(Level.WARNING, "this client has not been connected. please call ZKClient#connect() method before calling this setData()");
511: }
512: return null;
513: }
514: try {
515: return retryUntilConnected(new Callable<Stat>() {
516: @Override
517: public Stat call() throws Exception {
518: return zooKeeper.setData(path, data, version);
519: }
520: });
521: } catch (Exception e) {
522: if (logger.isLoggable(Level.SEVERE)) {
523: logger.log(Level.SEVERE, "failed to do \"setData\". path=" + path + ", data=" + Arrays.toString(data) + ", version=" + version, e);
524: }
525: return null;
526: }
527: }
528:
529: public boolean delete(final String path, final int version) {
530: if (zooKeeper == null) {
531: if (logger.isLoggable(Level.WARNING)) {
532: logger.log(Level.WARNING, "this client has not been connected. please call ZKClient#connect() method before calling this delete()");
533: }
534: return false;
535: }
536: try {
537: retryUntilConnected(new Callable<Boolean>() {
538: @Override
539: public Boolean call() throws Exception {
540: zooKeeper.delete(path, version);
541: return true;
542: }
543: });
544: } catch (Exception e) {
545: if (logger.isLoggable(Level.SEVERE)) {
546: logger.log(Level.SEVERE, "failed to do \"delete\". path=" + path + ", version=" + version, e);
547: }
548: return false;
549: }
550: return false;
551: }
552:
553: private <T> T retryUntilConnected(final Callable<T> callable) throws Exception {
554: for (int i = 0; i < RETRY_COUNT_UNTIL_CONNECTED; i++) {
555: try {
556: return callable.call();
557: } catch (KeeperException.ConnectionLossException cle) {
558: if (logger.isLoggable(Level.INFO)) {
559: logger.log(Level.INFO, "the callable will be retried because of ConnectionLossException");
560: } else if (logger.isLoggable(Level.FINE)) {
561: logger.log(Level.FINE, "the callable will be retried because of ConnectionLossException", cle);
562: }
563: reconnect();
564: } catch (KeeperException.SessionExpiredException see) {
565: if (logger.isLoggable(Level.INFO)) {
566: logger.log(Level.INFO, "the callable will be retried because of SessionExpiredException");
567: } else if (logger.isLoggable(Level.FINE)) {
568: logger.log(Level.FINE, "the callable will be retried because of SessionExpiredException", see);
569: }
570: reconnect();
571: }
572: }
573: if (logger.isLoggable(Level.SEVERE)) {
574: logger.log(Level.SEVERE, "failed to retry. retryCount={0}", RETRY_COUNT_UNTIL_CONNECTED);
575: }
576: return null;
577: }
578:
579: // should be guided by the lock
580: private boolean ensureConnected(final long timeoutInMillis) {
581: final Date timeoutDate;
582: if (timeoutInMillis < 0) {
583: timeoutDate = null;
584: } else {
585: timeoutDate = new Date(System.currentTimeMillis() + timeoutInMillis);
586: }
587: boolean stillWaiting = true;
588: while (currentState != Watcher.Event.KeeperState.SyncConnected) {
589: if (!stillWaiting) {
590: return false;
591: }
592: try {
593: if (timeoutDate == null) {
594: lockCondition.await();
595: } else {
596: stillWaiting = lockCondition.awaitUntil(timeoutDate);
597: }
598: } catch (InterruptedException ie) {
599: Thread.currentThread().interrupt();
600: return false;
601: }
602: }
603: return true;
604: }
605:
606: /**
607: * Internal watcher wrapper for tracking state and reconnecting
608: */
609: private class InternalWatcher implements Watcher {
610:
611: private final Watcher inner;
612:
613: private InternalWatcher(final Watcher inner) {
614: this.inner = inner;
615: }
616:
617: @Override
618: public void process(final WatchedEvent event) {
619: if (event != null && logger.isLoggable(Level.FINER)) {
620: logger.log(Level.FINER,
621: "received event. eventState={0}, eventType={1}, eventPath={2}, watcher={3}",
622: new Object[]{event.getState(), event.getType(), event.getPath(), this});
623: }
624: if (!running.get()) {
625: if (event != null && logger.isLoggable(Level.INFO)) {
626: logger.log(Level.INFO,
627: "this event will be ignored because this client is shutting down or already has shutted down. name={0}, eventState={1}, eventType={2}, eventPath={3}, watcher={4}",
628: new Object[]{name, event.getState(), event.getType(), event.getPath(), this});
629: }
630: return;
631: }
632: if (processStateChanged(event)) {
633: return;
634: }
635: if (inner != null) {
636: inner.process(event);
637: }
638: }
639:
640: @Override
641: public String toString() {
642: return "InternalWatcher{" +
643: "inner=" + inner +
644: '}';
645: }
646: }
647:
648: /**
649: * Watcher implementation for a region
650: */
651: private class RegionWatcher implements Watcher {
652:
653: private final String regionName; // region this watcher serves
654: private final List<String> aliveNodesExceptMyself = new ArrayList<String>(); // children of the "current" path, minus this client, captured when a data change was seen
655: private final Set<String> toBeCompleted = new HashSet<String>(); // participant paths that did not exist yet when the data change was seen
656: private final Lock regionLock = new ReentrantLock(); // held while process() and the rollback task touch the synchronization state
657: private volatile boolean isSynchronizing = false; // set to true when a data change starts being processed
658:
659: private byte[] remoteDataBytes = null; // data read for the change being synchronized
660: private Stat remoteDataStat = null; // stat filled in together with remoteDataBytes
661: private ScheduledFuture rollbackFuture = null; // handle of the most recently scheduled rollback task
662:
663: private RegionWatcher(final String regionName) { // binds the watcher to one region
664: this.regionName = regionName;
665: }
666:
667: @Override
668: public void process(final WatchedEvent event) {
669: if (event == null) {
670: return;
671: }
672: // check if current region is already unregistered
673: if (listenerMap.get(regionName) == null) {
674: if (logger.isLoggable(Level.INFO)) {
675: logger.log(Level.INFO,
676: "this event will be ignored because this region already has unregistered. name={0}, regionName={1}, eventState={2}, eventType={3}, eventPath={4}, watcher={5}",
677: new Object[]{name, regionName, event.getState(), event.getType(), event.getPath(), this});
678: }
679: return;
680: }
681: final Event.KeeperState eventState = event.getState();
682: final String eventPath = event.getPath();
683: final Watcher.Event.EventType eventType = event.getType();
684:
685: final String currentRegionPath = basePath + normalizePath(regionName);
686: final String currentNodesPath = currentRegionPath + CURRENT_PATH;
687: final String currentParticipantPath = currentRegionPath + PARTICIPANTS_PATH;
688: final String currentDataPath = currentRegionPath + DATA_PATH;
689: if ((eventType == Event.EventType.NodeDataChanged || eventType == Event.EventType.NodeCreated) &&
690: currentDataPath.equals(eventPath)) { // data changed
691: if (logger.isLoggable(Level.INFO)) {
692: logger.log(Level.INFO,
693: "the central data has been changed in the remote zookeeper server. name={0}, regionName={1}, eventType={2}, eventPath={3}",
694: new Object[]{name, regionName, eventType, eventPath});
695: }
696: final byte[] currentDataBytes;
697: final Stat currentDataStat = new Stat();
698: // we should watch nodes' changes(watch1) while syncronizing nodes
699: final List<String> currentNodes = getChildren(currentNodesPath, this);
700: // get and store the remote changes at the preparing phase
701: currentDataBytes = getData(currentDataPath, false, currentDataStat);
702: if (currentDataBytes == null) {
703: if (logger.isLoggable(Level.WARNING)) {
704: logger.log(Level.WARNING,
705: "failed to get the remote changes. name={0}, regionName={1}, eventType={2}, eventPath={3}",
706: new Object[]{name, regionName, eventType, eventPath});
707: }
708: }
709: final String myParticipantPath = currentParticipantPath + uniqueIdPath;
710: regionLock.lock();
711: try {
712: isSynchronizing = true;
713: aliveNodesExceptMyself.clear();
714: toBeCompleted.clear();
715: aliveNodesExceptMyself.addAll(currentNodes);
716: // remove own node
717: aliveNodesExceptMyself.remove(uniqueId);
718: for (final String node : currentNodes) {
719: final String participant = currentParticipantPath + "/" + node;
720: // we should watch the creation or deletion event(watch2)
721: if (exists(participant, this) == null) {
722: toBeCompleted.add(participant);
723: } else {
724: toBeCompleted.remove(participant);
725: }
726: }
727: remoteDataBytes = currentDataBytes;
728: remoteDataStat = currentDataStat;
729: if (currentDataBytes != null && exists(myParticipantPath, false) == null &&
730: create(myParticipantPath, NO_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) == null) {
731: if (logger.isLoggable(Level.WARNING)) {
732: logger.log(Level.WARNING, "failed to create myParticipantPath. path={0}", myParticipantPath);
733: }
734: }
735: // prepared to roll back
736: final Long scheduled =
737: currentDataStat.getMtime() + TimeUnit.SECONDS.toMillis(commitDelayTimeInSecs * 2);
738: final long remaining = scheduled - System.currentTimeMillis();
739: rollbackFuture = scheduledExecutor.schedule(new Runnable() {
740: @Override
741: public void run() {
742: regionLock.lock();
743: try {
744: if (logger.isLoggable(Level.WARNING)) {
745: final Long expected = currentDataStat.getMtime() +
746: TimeUnit.SECONDS.toMillis(commitDelayTimeInSecs);
747: final Date expectedDate = new Date(expected);
748: logger.log(Level.WARNING,
749: "commit's schedule has been timed out so synchronization will be rolled back. name={0}, regionName={1}, expectedDate={2}, toBeComplete={3}",
750: new Object[]{name, regionName, expectedDate, toBeCompleted});
751: }
752: clearSynchronization();
753: // delete own barrier path
754: if (!delete(myParticipantPath, -1)) {
755: if (logger.isLoggable(Level.FINE)) {
756: logger.log(Level.FINE,
757: "there is no the participant path to be deleted in rolling back because it may already has been closed. name={0}, regionName={1}, path={2}",
758: new Object[]{name, regionName, myParticipantPath});
759: }
760: }
761: } finally {
762: regionLock.unlock();
763: }
764: }
765: }, remaining, TimeUnit.MILLISECONDS);
766: } finally {
767: regionLock.unlock();
768: }
769: // register the watcher for detecting next data's changes again
770: exists(currentDataPath, this);
771: } else if (isSynchronizing &&
772: eventType == Event.EventType.NodeDeleted &&
773: currentDataPath.equals(eventPath)) { // data deleted
774: regionLock.lock();
775: try {
776: if (isSynchronizing) {
777: if (!toBeCompleted.isEmpty()) {
778: if (logger.isLoggable(Level.WARNING)) {
779: logger.log(Level.WARNING,
780: "the central data deleted in the remote zookeeper server while preparing to synchronize the data. name={0}, regionName={1}, eventPath={2}, toBeCompleted={3}",
781: new Object[]{name, regionName, eventPath, toBeCompleted});
782: }
783: }
784: clearSynchronization();
785: }
786: } finally {
787: regionLock.unlock();
788: }
789: // register the watcher for detecting next data's changes again
790: exists(currentDataPath, this);
791: } else if (isSynchronizing &&
792: (eventType == Watcher.Event.EventType.NodeCreated || eventType == Watcher.Event.EventType.NodeDeleted) &&
793: eventPath != null && eventPath.startsWith(currentParticipantPath)) { // a participant joined from (watch2)
794: regionLock.lock();
795: try {
796: if (isSynchronizing) {
797: toBeCompleted.remove(eventPath);
798: if (toBeCompleted.isEmpty()) {
799: scheduleCommit(event, currentDataPath, currentParticipantPath, remoteDataBytes,
800: remoteDataStat);
801: clearSynchronization();
802: }
803: }
804: } finally {
805: regionLock.unlock();
806: }
807: } else if (isSynchronizing &&
808: eventType == Event.EventType.NodeChildrenChanged &&
809: currentNodesPath.equals(eventPath)) { // nodes changed from (watch1)
810: if (logger.isLoggable(Level.INFO)) {
811: logger.log(Level.INFO,
812: "some clients are failed or added while preparing to synchronize the data. name={0}, regionName={1}, eventPath={2}",
813: new Object[]{name, regionName, eventPath});
814: }
815: // we should watch nodes' changes again(watch1)
816: final List<String> currentNodes = getChildren(currentNodesPath, this);
817: regionLock.lock();
818: try {
819: if (isSynchronizing) {
820: // remove own node
821: currentNodes.remove(uniqueId);
822: final List<String> failureNodes = new ArrayList<String>(aliveNodesExceptMyself);
823: failureNodes.removeAll(currentNodes);
824: if (!failureNodes.isEmpty()) {
825: if (logger.isLoggable(Level.WARNING)) {
826: logger.log(Level.WARNING,
827: "some clients are failed while preparing to synchronize the data. name={0}, regionName={1}, eventPath={2}, failureNodes={3}",
828: new Object[]{name, regionName, eventPath, failureNodes});
829: }
830: for (final String node : failureNodes) {
831: final String participant = currentParticipantPath + "/" + node;
832: toBeCompleted.remove(participant);
833: }
834: if (toBeCompleted.isEmpty()) {
835: scheduleCommit(event, currentDataPath, currentParticipantPath, remoteDataBytes,
836: remoteDataStat);
837: clearSynchronization();
838: }
839: }
840: }
841: } finally {
842: regionLock.unlock();
843: }
844: } else {
845: if (logger.isLoggable(Level.FINE)) {
846: logger.log(Level.FINE,
847: "not interested. name={0}, regionName={1}, eventState={2}, eventType={3}, eventPath={4}, watcher={5}",
848: new Object[]{name, regionName, eventState, eventType, eventPath, this});
849: }
850: }
851: }
852:
853: private void scheduleCommit(final WatchedEvent event,
854: final String currentDataPath,
855: final String currentParticipantPath,
856: final byte[] currentDataBytes,
857: final Stat currentDataStat) {
858: if (event == null || currentDataPath == null || currentDataBytes == null || currentDataStat == null) {
859: return;
860: }
861: if (logger.isLoggable(Level.INFO)) {
862: logger.log(Level.INFO,
863: "all clients are prepared. name={0}, regionName={1}, commitDelayTimeInSecs={2}",
864: new Object[]{name, regionName, commitDelayTimeInSecs});
865: }
866: // all nodes are prepared
867: final Long scheduled = currentDataStat.getMtime() + TimeUnit.SECONDS.toMillis(commitDelayTimeInSecs);
868: final long remaining = scheduled - System.currentTimeMillis();
869: if (remaining < 0) {
870: if (logger.isLoggable(Level.WARNING)) {
871: logger.log(Level.WARNING,
872: "commitDelayTimeInSecs may be too small. so we will commit immediately. name={0}, regionName={1}, scheduledTime=before {2}ms",
873: new Object[]{name, regionName, -remaining});
874: }
875: } else {
876: final Date scheduledDate = new Date(scheduled);
877: if (logger.isLoggable(Level.INFO)) {
878: logger.log(Level.INFO,
879: "the changes of the central data will be applied. name={0}, regionName={1}, scheduledDate={2}, data={3}, dataStat={4}",
880: new Object[]{name, regionName, scheduledDate.toString(), currentDataBytes,
881: currentDataStat});
882: }
883: }
884: scheduledExecutor.schedule(new Runnable() {
885: @Override
886: public void run() {
887: final BarrierListener listener = listenerMap.get(regionName);
888: if (listener == null) {
889: if (logger.isLoggable(Level.INFO)) {
890: logger.log(Level.INFO,
891: "this commit will be ignored because this region already has unregistered. eventState={0}, eventType={1}, eventPath={2}, watcher={3}",
892: new Object[]{event.getState(), event.getType(), event.getPath(), this});
893: }
894: return;
895: }
896: try {
897: if (logger.isLoggable(Level.FINE)) {
898: logger.log(Level.FINE,
899: "name={0}, regionName={1}, scheduledTime={2}ms, commit time={3}ms",
900: new Object[]{name, regionName, scheduled, System.currentTimeMillis()});
901: }
902: listener.onCommit(regionName, currentDataPath, currentDataBytes);
903: if (logger.isLoggable(Level.INFO)) {
904: logger.log(Level.INFO,
905: "committed successfully. name={0}, regionName={1}, listener={2}",
906: new Object[]{name, regionName, listener});
907: }
908: } catch (Exception e) {
909: if (logger.isLoggable(Level.WARNING)) {
910: logger.log(Level.WARNING,
911: "failed to onCommit. name=" + name + ", regionName=" + regionName + ", listener=" + listener,
912: e);
913: }
914: }
915: // delete own barrier path
916: final String path = currentParticipantPath + uniqueIdPath;
917: if (!delete(path, -1)) {
918: if (logger.isLoggable(Level.FINE)) {
919: logger.log(Level.FINE,
920: "there is no the participant path to be deleted because it may already has been closed. name={0}, regionName={1}, path={2}",
921: new Object[]{name, regionName, path});
922: }
923: }
924: }
925: }, remaining, TimeUnit.MILLISECONDS);
926: }
927:
928: private void clearSynchronization() {
929: if (!isSynchronizing) {
930: return;
931: }
932: isSynchronizing = false;
933: aliveNodesExceptMyself.clear();
934: toBeCompleted.clear();
935: remoteDataBytes = null;
936: remoteDataStat = null;
937: if (rollbackFuture != null) {
938: rollbackFuture.cancel(false);
939: rollbackFuture = null;
940: }
941: }
942:
943: @Override
944: public String toString() {
945: return "RegionWatcher{" +
946: "regionName='" + regionName + '\'' +
947: '}';
948: }
949: }
950:
951: private boolean processStateChanged(final WatchedEvent event) {
952: if (event == null) {
953: throw new IllegalArgumentException("event must not be null");
954: }
955: final Watcher.Event.KeeperState eventState = event.getState();
956: final String eventPath = event.getPath();
957: final boolean isStateChangedEvent;
958: // state changed
959: if (eventPath == null) {
960: lock.lock();
961: try {
962: currentState = eventState;
963: lockCondition.signalAll();
964: } finally {
965: lock.unlock();
966: }
967: isStateChangedEvent = true;
968: } else {
969: isStateChangedEvent = false;
970: }
971: if (eventState == Watcher.Event.KeeperState.Expired) {
972: try {
973: reconnect();
974: } catch (Exception e) {
975: if (logger.isLoggable(Level.SEVERE)) {
976: logger.log(Level.SEVERE, "failed to reconnect the zookeeper server", e);
977: }
978: }
979: }
980: return isStateChangedEvent;
981: }
982:
983: @Override
984: public String toString() {
985: return "ZKClient{" +
986: "connected=" + connected +
987: ", running=" + running +
988: ", currentState=" + currentState +
989: ", listenerMap=" + listenerMap +
990: ", name='" + name + '\'' +
991: ", uniqueId='" + uniqueId + '\'' +
992: ", uniqueIdPath='" + uniqueIdPath + '\'' +
993: ", rootPath='" + rootPath + '\'' +
994: ", basePath='" + basePath + '\'' +
995: ", zooKeeperServerList='" + zooKeeperServerList + '\'' +
996: ", connectTimeoutInMillis=" + connectTimeoutInMillis +
997: ", sessionTimeoutInMillis=" + sessionTimeoutInMillis +
998: ", commitDelayTimeInSecs=" + commitDelayTimeInSecs +
999: '}';
1000: }
1001:
1002: /**
1003: * Normalize the given path
1004: *
1005: * @param path path for the zookeeper
1006: * @return normalized path
1007: */
1008: private static String normalizePath(final String path) {
1009: if (path == null) {
1010: return "/";
1011: }
1012: String temp = path.trim();
1013: while (temp.length() > 1 && temp.endsWith("/")) {
1014: temp = temp.substring(0, temp.length() - 1);
1015: }
1016: final StringBuilder builder = new StringBuilder(64);
1017: if (!temp.startsWith("/")) {
1018: builder.append('/');
1019: }
1020: builder.append(temp);
1021: return builder.toString();
1022: }
1023:
1024: /**
1025: * Builder for ZKClient
1026: */
1027: public static class Builder {
1028: private static final String DEFAULT_ROOT_PATH = "/";
1029: private static final long DEFAULT_CONNECT_TIMEOUT_IN_MILLIS = 5000; // 5secs
1030: private static final long DEFAULT_SESSION_TIMEOUT_IN_MILLIS = 30000; // 30secs
1031: private static final long DEFAULT_COMMIT_DELAY_TIME_IN_SECS = 60; // 60secs
1032:
1033: private final String name;
1034: private final String zooKeeperServerList;
1035:
1036: private String rootPath = DEFAULT_ROOT_PATH;
1037: private long connectTimeoutInMillis = DEFAULT_CONNECT_TIMEOUT_IN_MILLIS;
1038: private long sessionTimeoutInMillis = DEFAULT_SESSION_TIMEOUT_IN_MILLIS;
1039: private long commitDelayTimeInSecs = DEFAULT_COMMIT_DELAY_TIME_IN_SECS;
1040:
1041: /**
1042: * The specific name or Id for ZKClient
1043: *
1044: * @param name name or id
1045: * @param zooKeeperServerList comma separated host:port pairs, each corresponding to a zookeeper server.
1046: * e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
1047: */
1048: public Builder(final String name, final String zooKeeperServerList) {
1049: this.name = name;
1050: this.zooKeeperServerList = zooKeeperServerList;
1051: }
1052:
1053: /**
1054: * Root path for ZKClient
1055: *
1056: * @param rootPath root path of the zookeeper. default is "/".
1057: * @return this builder
1058: */
1059: public Builder rootPath(final String rootPath) {
1060: this.rootPath = rootPath;
1061: return this;
1062: }
1063:
1064: /**
1065: * Connect timeout in milli-seconds
1066: *
1067: * @param connectTimeoutInMillis connect timeout. negative value means "never timed out". default is 5000(5 secs).
1068: * @return this builder
1069: */
1070: public Builder connectTimeoutInMillis(final long connectTimeoutInMillis) {
1071: this.connectTimeoutInMillis = connectTimeoutInMillis;
1072: return this;
1073: }
1074:
1075: /**
1076: * Session timeout in milli-seconds
1077: *
1078: * @param sessionTimeoutInMillis Zookeeper connection's timeout. default is 30000(30 secs).
1079: * @return this builder
1080: */
1081: public Builder sessionTimeoutInMillis(final long sessionTimeoutInMillis) {
1082: this.sessionTimeoutInMillis = sessionTimeoutInMillis;
1083: return this;
1084: }
1085:
1086: /**
1087: * Delay time in seconds for committing
1088: *
1089: * @param commitDelayTimeInSecs delay time before committing. default is 60(60secs).
1090: * @return this builder
1091: */
1092: public Builder commitDelayTimeInSecs(final long commitDelayTimeInSecs) {
1093: this.commitDelayTimeInSecs = commitDelayTimeInSecs;
1094: return this;
1095: }
1096:
1097: /**
1098: * Build a ZKClient
1099: *
1100: * @return an instance of ZKClient
1101: */
1102: public ZKClient build() {
1103: return new ZKClient(this);
1104: }
1105: }
1106: }