Skip to content

Package: ServerListBarrierListener

ServerListBarrierListener

nameinstructionbranchcomplexitylinemethod
ServerListBarrierListener(ThriftClient, Set)
M: 35 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 10 C: 0
0%
M: 1 C: 0
0%
addCustomListener(BarrierListener)
M: 9 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
getAddressesFromStringList(String)
M: 97 C: 0
0%
M: 10 C: 0
0%
M: 6 C: 0
0%
M: 15 C: 0
0%
M: 1 C: 0
0%
getStringListFromAddressSet(Set)
M: 68 C: 0
0%
M: 10 C: 0
0%
M: 6 C: 0
0%
M: 14 C: 0
0%
M: 1 C: 0
0%
onCommit(String, String, byte[])
M: 173 C: 0
0%
M: 24 C: 0
0%
M: 13 C: 0
0%
M: 34 C: 0
0%
M: 1 C: 0
0%
onDestroy(String)
M: 39 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 8 C: 0
0%
M: 1 C: 0
0%
onInit(String, String, byte[])
M: 148 C: 0
0%
M: 24 C: 0
0%
M: 13 C: 0
0%
M: 28 C: 0
0%
M: 1 C: 0
0%
removeCustomListener(BarrierListener)
M: 9 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
toString()
M: 29 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*
2: * Copyright (c) 2012, 2017 Oracle and/or its affiliates. All rights reserved.
3: *
4: * This program and the accompanying materials are made available under the
5: * terms of the Eclipse Public License v. 2.0, which is available at
6: * http://www.eclipse.org/legal/epl-2.0.
7: *
8: * This Source Code may also be made available under the following Secondary
9: * Licenses when the conditions for such availability set forth in the
10: * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
11: * version 2 with the GNU Classpath Exception, which is available at
12: * https://www.gnu.org/software/classpath/license.html.
13: *
14: * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
15: */
16:
17: package org.glassfish.grizzly.thrift.client.zookeeper;
18:
19: import java.io.UnsupportedEncodingException;
20: import java.net.InetSocketAddress;
21: import java.net.SocketAddress;
22: import java.util.HashSet;
23: import java.util.List;
24: import java.util.Set;
25: import java.util.concurrent.CopyOnWriteArrayList;
26: import java.util.concurrent.CopyOnWriteArraySet;
27: import java.util.logging.Level;
28: import java.util.logging.Logger;
29:
30: import org.glassfish.grizzly.Grizzly;
31: import org.glassfish.grizzly.thrift.client.ThriftClient;
32:
33: /**
34: * The {@link BarrierListener} implementation for synchronizing the thrift
35: * server list among all clients which have joined the same zookeeper server
36: *
37: * @author Bongjae Chang
38: */
39: public class ServerListBarrierListener implements BarrierListener {
40:
41: private static final Logger logger = Grizzly.logger(ServerListBarrierListener.class);
42: public static final String DEFAULT_SERVER_LIST_CHARSET = "UTF-8";
43:
44: private final String thriftClientName;
45: private final ThriftClient thriftClient;
46: private final Set<SocketAddress> localServerSet = new CopyOnWriteArraySet<SocketAddress>();
47: private final List<BarrierListener> customListenerList = new CopyOnWriteArrayList<BarrierListener>();
48:
49: public ServerListBarrierListener(final ThriftClient thriftClient, final Set<SocketAddress> serverSet) {
50: this.thriftClient = thriftClient;
51:• if (this.thriftClient != null) {
52: thriftClientName = this.thriftClient.getName();
53: } else {
54: thriftClientName = null;
55: }
56:• if (serverSet != null) {
57: this.localServerSet.addAll(serverSet);
58: }
59: }
60:
61: @Override
62: public void onInit(final String regionName, final String path, final byte[] remoteBytes) {
63:• if (remoteBytes == null || remoteBytes.length == 0) {
64: return;
65: }
66: // check the remote thrift server list of the zookeeper server is equal to local
67: // if the server has pre-defined server list
68: try {
69: final String remoteServerList = new String(remoteBytes, DEFAULT_SERVER_LIST_CHARSET);
70: final Set<SocketAddress> remoteServers = getAddressesFromStringList(remoteServerList);
71: boolean checked = true;
72:• for (final SocketAddress local : localServerSet) {
73:• if (!remoteServers.remove(local)) {
74: checked = false;
75: break;
76: }
77: }
78:• if (checked && !remoteServers.isEmpty()) {
79: checked = false;
80: }
81:• if (!checked) {
82:• if (logger.isLoggable(Level.WARNING)) {
83: logger.log(Level.WARNING,
84: "failed to check the thrift server list from the remote. thriftClientName={0}, local={1}, remote={2}",
85: new Object[] { thriftClientName, localServerSet, remoteServers });
86: }
87: } else {
88:• if (logger.isLoggable(Level.INFO)) {
89: logger.log(Level.INFO, "thrift server list confirmed. thriftClientName={0}, list=[{1}]",
90: new Object[] { thriftClientName, remoteServerList });
91: }
92: }
93: } catch (UnsupportedEncodingException uee) {
94:• if (logger.isLoggable(Level.WARNING)) {
95: logger.log(Level.WARNING, "failed to check the thrift server list from the remote. thriftClientName=" + thriftClientName,
96: uee);
97: }
98: } finally {
99:• for (final BarrierListener listener : customListenerList) {
100: try {
101: listener.onInit(regionName, path, remoteBytes);
102: } catch (Exception e) {
103:• if (logger.isLoggable(Level.WARNING)) {
104: logger.log(Level.WARNING,
105: "failed to call onInit(). thriftClientName=" + thriftClientName + ", listener=" + listener, e);
106: }
107: }
108: }
109: }
110: }
111:
112: @Override
113: public void onCommit(final String regionName, final String path, byte[] remoteBytes) {
114:• if (remoteBytes == null || remoteBytes.length == 0) {
115:• if (logger.isLoggable(Level.WARNING)) {
116: logger.log(Level.WARNING, "remote bytes is null or NO_DATA(byte[0]). regionName={0}, path={1}",
117: new Object[] { regionName, path });
118: }
119: return;
120: }
121: try {
122: final String remoteDataString = new String(remoteBytes, DEFAULT_SERVER_LIST_CHARSET);
123: final Set<SocketAddress> remoteServers = getAddressesFromStringList(remoteDataString);
124:• if (!remoteServers.isEmpty()) {
125:• if (thriftClient != null) {
126: final Set<SocketAddress> shouldBeAdded = new HashSet<SocketAddress>();
127: final Set<SocketAddress> shouldBeRemoved = new HashSet<SocketAddress>();
128:• for (final SocketAddress remoteServer : remoteServers) {
129:• if (!localServerSet.remove(remoteServer)) {
130: shouldBeAdded.add(remoteServer);
131: }
132: }
133: shouldBeRemoved.addAll(localServerSet);
134:• for (final SocketAddress address : shouldBeAdded) {
135: thriftClient.addServer(address);
136: }
137:• for (final SocketAddress address : shouldBeRemoved) {
138: thriftClient.removeServer(address);
139: }
140: // refresh local
141: localServerSet.clear();
142: localServerSet.addAll(remoteServers);
143: }
144: }
145: } catch (UnsupportedEncodingException uee) {
146:• if (logger.isLoggable(Level.WARNING)) {
147: logger.log(Level.WARNING, "failed to apply the changed server list of the remote zookeeper server. regionName=" + regionName
148: + ", path=" + path, uee);
149: }
150: } finally {
151:• for (final BarrierListener listener : customListenerList) {
152: try {
153: listener.onCommit(regionName, path, remoteBytes);
154: } catch (Exception e) {
155:• if (logger.isLoggable(Level.WARNING)) {
156: logger.log(Level.WARNING,
157: "failed to call onCommit(). thriftClientName=" + thriftClientName + ", listener=" + listener, e);
158: }
159: }
160: }
161: }
162: }
163:
164: @Override
165: public void onDestroy(final String regionName) {
166:• for (final BarrierListener listener : customListenerList) {
167: try {
168: listener.onDestroy(regionName);
169: } catch (Exception e) {
170:• if (logger.isLoggable(Level.WARNING)) {
171: logger.log(Level.WARNING, "failed to call onDestroy(). thriftClientName=" + thriftClientName + ", listener=" + listener,
172: e);
173: }
174: }
175: }
176: }
177:
178: public void addCustomListener(final BarrierListener listener) {
179:• if (listener == null) {
180: return;
181: }
182: customListenerList.add(listener);
183: }
184:
185: public void removeCustomListener(final BarrierListener listener) {
186:• if (listener == null) {
187: return;
188: }
189: customListenerList.remove(listener);
190: }
191:
192: @Override
193: public String toString() {
194: return "ServerListBarrierListener{" + "thriftClientName='" + thriftClientName + '\'' + ", thriftClient=" + thriftClient
195: + ", localServerSet=" + localServerSet + ", customListenerList=" + customListenerList + '}';
196: }
197:
198: /**
199: * Split a string in the form of "host:port, host2:port" into a Set of
200: * {@link java.net.SocketAddress} instances.
201: * <p>
202: * Note that colon-delimited IPv6 is also supported. For example: ::1:11211
203: *
204: * @param serverList server list in the form of "host:port,host2:port"
205: * @return server set
206: */
207: public static Set<SocketAddress> getAddressesFromStringList(final String serverList) {
208:• if (serverList == null) {
209: throw new IllegalArgumentException("null host list");
210: }
211:• if (serverList.trim().equals("")) {
212: throw new IllegalArgumentException("no hosts in list: ``" + serverList + "''");
213: }
214: final HashSet<SocketAddress> addrs = new HashSet<SocketAddress>();
215:• for (final String hoststuff : serverList.split("(,| )")) {
216:• if (hoststuff.length() == 0) {
217: continue;
218: }
219: int finalColon = hoststuff.lastIndexOf(':');
220:• if (finalColon < 1) {
221: throw new IllegalArgumentException("Invalid server ``" + hoststuff + "'' in list: " + serverList);
222: }
223: final String hostPart = hoststuff.substring(0, finalColon);
224: final String portNum = hoststuff.substring(finalColon + 1);
225: addrs.add(new InetSocketAddress(hostPart, Integer.parseInt(portNum)));
226: }
227: return addrs;
228: }
229:
230: /**
231: * Convert server set into server list like "host:port,host2:port"
232: *
233: * @param servers {@link java.net.InetSocketAddress} set
234: * @return server list in the form of "host:port,host2:port"
235: */
236: public static String getStringListFromAddressSet(final Set<SocketAddress> servers) {
237:• if (servers == null || servers.isEmpty()) {
238: throw new IllegalArgumentException("Null servers");
239: }
240: final StringBuilder builder = new StringBuilder(256);
241:• for (final SocketAddress server : servers) {
242:• if (server instanceof InetSocketAddress) {
243: final InetSocketAddress inetSocketAddress = (InetSocketAddress) server;
244: builder.append(inetSocketAddress.getHostName()).append(':').append(inetSocketAddress.getPort());
245: builder.append(',');
246: }
247: }
248: final String result = builder.toString();
249: final int resultLength = result.length();
250:• if (resultLength < 1) {
251: throw new IllegalArgumentException("there is no InetSocketAddress in the server set");
252: } else {
253: // remove the last comma
254: return result.substring(0, result.length() - 1);
255: }
256: }
257: }