Skip to content

Package: GrizzlyThriftClient$1

GrizzlyThriftClient$1

nameinstructionbranchcomplexitylinemethod
closeTTransport(TTransport)
M: 29 C: 0
0%
M: 6 C: 0
0%
M: 4 C: 0
0%
M: 10 C: 0
0%
M: 1 C: 0
0%
createObject(SocketAddress)
M: 213 C: 0
0%
M: 38 C: 0
0%
M: 20 C: 0
0%
M: 44 C: 0
0%
M: 1 C: 0
0%
destroyObject(SocketAddress, TServiceClient)
M: 25 C: 0
0%
M: 6 C: 0
0%
M: 4 C: 0
0%
M: 10 C: 0
0%
M: 1 C: 0
0%
validateObject(SocketAddress, TServiceClient)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
{...}
M: 6 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*
2: * Copyright (c) 2012, 2017 Oracle and/or its affiliates. All rights reserved.
3: *
4: * This program and the accompanying materials are made available under the
5: * terms of the Eclipse Public License v. 2.0, which is available at
6: * http://www.eclipse.org/legal/epl-2.0.
7: *
8: * This Source Code may also be made available under the following Secondary
9: * Licenses when the conditions for such availability set forth in the
10: * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
11: * version 2 with the GNU Classpath Exception, which is available at
12: * https://www.gnu.org/software/classpath/license.html.
13: *
14: * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
15: */
16:
17: package org.glassfish.grizzly.thrift.client;
18:
19: import java.io.IOException;
20: import java.io.UnsupportedEncodingException;
21: import java.lang.reflect.Method;
22: import java.net.SocketAddress;
23: import java.util.HashMap;
24: import java.util.HashSet;
25: import java.util.Map;
26: import java.util.Set;
27: import java.util.concurrent.ConcurrentHashMap;
28: import java.util.concurrent.ExecutionException;
29: import java.util.concurrent.Executors;
30: import java.util.concurrent.Future;
31: import java.util.concurrent.ScheduledExecutorService;
32: import java.util.concurrent.ScheduledFuture;
33: import java.util.concurrent.TimeUnit;
34: import java.util.concurrent.TimeoutException;
35: import java.util.concurrent.atomic.AtomicBoolean;
36: import java.util.logging.Level;
37: import java.util.logging.Logger;
38:
39: import org.apache.thrift.TServiceClient;
40: import org.apache.thrift.TServiceClientFactory;
41: import org.apache.thrift.protocol.TBinaryProtocol;
42: import org.apache.thrift.protocol.TCompactProtocol;
43: import org.apache.thrift.protocol.TProtocol;
44: import org.apache.thrift.protocol.TProtocolException;
45: import org.apache.thrift.transport.TTransport;
46: import org.apache.thrift.transport.TTransportException;
47: import org.glassfish.grizzly.Connection;
48: import org.glassfish.grizzly.ConnectorHandler;
49: import org.glassfish.grizzly.Grizzly;
50: import org.glassfish.grizzly.Processor;
51: import org.glassfish.grizzly.attributes.Attribute;
52: import org.glassfish.grizzly.filterchain.FilterChain;
53: import org.glassfish.grizzly.filterchain.FilterChainBuilder;
54: import org.glassfish.grizzly.filterchain.TransportFilter;
55: import org.glassfish.grizzly.http.HttpClientFilter;
56: import org.glassfish.grizzly.http.util.Header;
57: import org.glassfish.grizzly.nio.transport.TCPNIOConnectorHandler;
58: import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
59: import org.glassfish.grizzly.thrift.TGrizzlyClientTransport;
60: import org.glassfish.grizzly.thrift.TTimedoutException;
61: import org.glassfish.grizzly.thrift.ThriftClientFilter;
62: import org.glassfish.grizzly.thrift.ThriftFrameFilter;
63: import org.glassfish.grizzly.thrift.client.pool.BaseObjectPool;
64: import org.glassfish.grizzly.thrift.client.pool.NoValidObjectException;
65: import org.glassfish.grizzly.thrift.client.pool.ObjectPool;
66: import org.glassfish.grizzly.thrift.client.pool.PoolExhaustedException;
67: import org.glassfish.grizzly.thrift.client.pool.PoolableObjectFactory;
68: import org.glassfish.grizzly.thrift.client.zookeeper.BarrierListener;
69: import org.glassfish.grizzly.thrift.client.zookeeper.ServerListBarrierListener;
70: import org.glassfish.grizzly.thrift.client.zookeeper.ZKClient;
71: import org.glassfish.grizzly.thrift.client.zookeeper.ZooKeeperSupportThriftClient;
72: import org.glassfish.grizzly.thrift.http.ThriftHttpClientFilter;
73:
74: /**
75: * The implementation of the {@link ThriftClient} based on Grizzly
76: * <p>
77: * Basically, this class uses {@link BaseObjectPool} for pooling connections of
78: * the thrift server and {@link RoundRobinStore} for selecting the thrift
79: * server.
80: * <p>
81: * When a thrift operation is called, 1. finding the correct server by
82: * round-robin 2. borrowing the connection from the connection pool 3. returning
83: * the connection to the pool
84: * <p>
85: * For the failback of the thrift server, {@link HealthMonitorTask} will be
86: * scheduled by {@code healthMonitorIntervalInSecs}. If connecting or writing
87: * fails, this thrift client retries the failed operation up to
88: * {@code retryCount} times. A retry is not sent to the failed server but to another
89: * thrift server. And this client provides {@code failover} flag which can turn
90: * off the failover/failback.
91: * <p>
92: * Example of use: {@code
93: * // creates a ThriftClientManager
94: * final GrizzlyThriftClientManager manager = new GrizzlyThriftClientManager.Builder().build();
95: *
96: * // creates a ThriftClientBuilder
97: * final GrizzlyThriftClient.Builder<Calculator.Client> builder = manager.createThriftClientBuilder("Calculator", new Calculator.Client.Factory());
98: * // sets initial servers
99: * builder.servers(initServerSet);
100: * // creates the specific thrift client
101: * final ThriftClient<Calculator.Client> calculatorThriftClient = builder.build();
102: *
103: * // if you need to add another server
104: * calculatorThriftClient.addServer(anotherServerAddress);
105: *
106: * // custom thrift operations
107: * Integer result = calculatorThriftClient.execute(new ThriftClientCallback<Calculator.Client, Integer>() {
108: *
109: * public Integer call(Calculator.Client client) throws TException {
110: * return client.add(1, 2); } }); // ...
111: *
112: * // shuts down manager.shutdown(); }
113: *
114: * @author Bongjae Chang
115: */
116: public class GrizzlyThriftClient<T extends TServiceClient> implements ThriftClient<T>, ZooKeeperSupportThriftClient {
117:
118: private static final Logger logger = Grizzly.logger(GrizzlyThriftClient.class);
119:
120: private final String thriftClientName;
121: private final TCPNIOTransport transport;
122: private final long connectTimeoutInMillis;
123: private final long writeTimeoutInMillis;
124: private final long responseTimeoutInMillis;
125: private final String validationCheckMethodName;
126:
127: public static final String CONNECTION_POOL_ATTRIBUTE_NAME = "GrizzlyThriftClient.ConnectionPool";
128: public static final String CLIENT_ATTRIBUTE_NAME = "GrizzlyThriftClient.Client";
129: public static final String INPUT_BUFFERS_QUEUE_ATTRIBUTE_NAME = "GrizzlyThriftClient.inputBuffersQueue";
130:
131: private final Attribute<ObjectPool<SocketAddress, T>> connectionPoolAttribute = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
132: .createAttribute(CONNECTION_POOL_ATTRIBUTE_NAME);
133: private final Attribute<T> clientAttribute = Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(CLIENT_ATTRIBUTE_NAME);
134:
135: private final ObjectPool<SocketAddress, T> connectionPool;
136:
137: private final Set<SocketAddress> initialServers;
138:
139: private final long healthMonitorIntervalInSecs;
140: private final ScheduledFuture<?> scheduledFuture;
141: private final HealthMonitorTask healthMonitorTask;
142: private final ScheduledExecutorService scheduledExecutor;
143:
144: private final boolean retainLastServer;
145: private final boolean failover;
146: private final int retryCount;
147:
148: private final ThriftProtocols thriftProtocol;
149: private final TServiceClientFactory<T> clientFactory;
150:
151: private final RoundRobinStore<SocketAddress> roundRobinStore = new RoundRobinStore<SocketAddress>();
152:
153: private final ZKClient zkClient;
154: private final ServerListBarrierListener zkListener;
155: private String zooKeeperServerListPath;
156:
157: private enum TransferProtocols {
158: BASIC, HTTP
159: }
160:
161: private final TransferProtocols transferProtocol;
162: private final int maxThriftFrameLength;
163: private final String httpUriPath;
164: private Map<String, String> httpHeaders;
165: private final Processor processor;
166:
/**
 * Creates the client from the settings collected by the given builder.
 * <p>
 * The constructor copies the builder's settings, builds the client filter chain for the configured
 * transfer protocol, creates the connection pool (whose factory opens a Grizzly connection and wraps it
 * in a thrift client), schedules the health monitor when failover is enabled and the interval is
 * positive, and finally creates the ZooKeeper server-list listener.
 *
 * @param builder the builder holding the configuration of this client
 */
private GrizzlyThriftClient(Builder<T> builder) {
    this.thriftClientName = builder.thriftClientName;
    this.transport = builder.transport;
    this.clientFactory = builder.clientFactory;
    this.thriftProtocol = builder.thriftProtocol;
    this.connectTimeoutInMillis = builder.connectTimeoutInMillis;
    this.writeTimeoutInMillis = builder.writeTimeoutInMillis;
    this.responseTimeoutInMillis = builder.responseTimeoutInMillis;
    this.healthMonitorIntervalInSecs = builder.healthMonitorIntervalInSecs;
    this.validationCheckMethodName = builder.validationCheckMethodName;
    this.retainLastServer = builder.retainLastServer;

    this.maxThriftFrameLength = builder.maxThriftFrameLength;
    this.transferProtocol = builder.transferProtocol;
    this.httpUriPath = builder.httpUriPath;
    this.httpHeaders = builder.httpHeaders;

    final FilterChainBuilder clientFilterChainBuilder = FilterChainBuilder.stateless();
    switch (transferProtocol) {
    case HTTP:
        clientFilterChainBuilder.add(new TransportFilter()).add(new HttpClientFilter())
                .add(new ThriftHttpClientFilter(httpUriPath, httpHeaders)).add(new ThriftClientFilter());
        break;
    case BASIC:
    default:
        clientFilterChainBuilder.add(new TransportFilter()).add(new ThriftFrameFilter(maxThriftFrameLength))
                .add(new ThriftClientFilter());
        break;
    }
    this.processor = clientFilterChainBuilder.build();

    @SuppressWarnings("unchecked")
    final BaseObjectPool.Builder<SocketAddress, T> connectionPoolBuilder = new BaseObjectPool.Builder<SocketAddress, T>(
            new PoolableObjectFactory<SocketAddress, T>() {
                /**
                 * Connects to the given address and wraps the connection in a thrift client.
                 */
                @Override
                public T createObject(final SocketAddress key) throws Exception {
                    final ConnectorHandler<SocketAddress> connectorHandler = TCPNIOConnectorHandler.builder(transport)
                            .processor(processor).setReuseAddress(true).build();
                    final Future<Connection> future = connectorHandler.connect(key);
                    final Connection<SocketAddress> connection;
                    try {
                        // a negative timeout means "wait without a limit"
                        if (connectTimeoutInMillis < 0) {
                            connection = future.get();
                        } else {
                            connection = future.get(connectTimeoutInMillis, TimeUnit.MILLISECONDS);
                        }
                    } catch (InterruptedException ie) {
                        abandonConnectAttempt(future);
                        if (logger.isLoggable(Level.FINER)) {
                            logger.log(Level.FINER, "failed to get the connection. address=" + key, ie);
                        }
                        throw ie;
                    } catch (ExecutionException ee) {
                        abandonConnectAttempt(future);
                        if (logger.isLoggable(Level.FINER)) {
                            logger.log(Level.FINER, "failed to get the connection. address=" + key, ee);
                        }
                        throw ee;
                    } catch (TimeoutException te) {
                        abandonConnectAttempt(future);
                        if (logger.isLoggable(Level.FINER)) {
                            logger.log(Level.FINER, "failed to get the connection. address=" + key, te);
                        }
                        throw te;
                    }
                    if (connection == null) {
                        throw new IllegalStateException("connection must not be null");
                    }
                    connectionPoolAttribute.set(connection, connectionPool);
                    final TGrizzlyClientTransport ttransport = TGrizzlyClientTransport.create(connection, responseTimeoutInMillis,
                            writeTimeoutInMillis);
                    // every protocol other than COMPACT falls back to the binary protocol
                    final TProtocol protocol;
                    if (thriftProtocol == ThriftProtocols.COMPACT) {
                        protocol = new TCompactProtocol(ttransport);
                    } else {
                        protocol = new TBinaryProtocol(ttransport);
                    }
                    final T result = clientFactory.getClient(protocol);
                    clientAttribute.set(connection, result);
                    return result;
                }

                /**
                 * Closes the input and output transports of the given client.
                 */
                @Override
                public void destroyObject(final SocketAddress key, final T value) throws Exception {
                    if (value != null) {
                        final TProtocol inputTProtocol = value.getInputProtocol();
                        if (inputTProtocol != null) {
                            final TTransport inputTTransport = inputTProtocol.getTransport();
                            closeTTransport(inputTTransport);
                        }
                        final TProtocol outputTProtocol = value.getOutputProtocol();
                        if (outputTProtocol != null) {
                            final TTransport outputTTransport = outputTProtocol.getTransport();
                            closeTTransport(outputTTransport);
                        }
                    }
                }

                /**
                 * Removes this client's attributes from the underlying Grizzly connection (if any) and closes
                 * the transport.
                 */
                private void closeTTransport(final TTransport tTransport) {
                    if (tTransport == null) {
                        return;
                    }
                    if (tTransport instanceof TGrizzlyClientTransport) {
                        final TGrizzlyClientTransport tGrizzlyClientTransport = (TGrizzlyClientTransport) tTransport;
                        final Connection connection = tGrizzlyClientTransport.getGrizzlyConnection();
                        if (connection != null) {
                            connectionPoolAttribute.remove(connection);
                            clientAttribute.remove(connection);
                        }
                    }
                    tTransport.close();
                }

                @Override
                public boolean validateObject(final SocketAddress key, final T value) throws Exception {
                    return GrizzlyThriftClient.this.validateClient(value);
                }
            });
    connectionPoolBuilder.min(builder.minConnectionPerServer);
    connectionPoolBuilder.max(builder.maxConnectionPerServer);
    connectionPoolBuilder.keepAliveTimeoutInSecs(builder.keepAliveTimeoutInSecs);
    connectionPoolBuilder.disposable(builder.allowDisposableConnection);
    connectionPoolBuilder.borrowValidation(builder.borrowValidation);
    connectionPoolBuilder.returnValidation(builder.returnValidation);
    connectionPool = connectionPoolBuilder.build();

    this.failover = builder.failover;
    this.retryCount = builder.retryCount;

    this.initialServers = builder.servers;

    if (failover && healthMonitorIntervalInSecs > 0) {
        healthMonitorTask = new HealthMonitorTask();
        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        scheduledFuture = scheduledExecutor.scheduleWithFixedDelay(healthMonitorTask, healthMonitorIntervalInSecs,
                healthMonitorIntervalInSecs, TimeUnit.SECONDS);
    } else {
        healthMonitorTask = null;
        scheduledExecutor = null;
        scheduledFuture = null;
    }

    this.zkClient = builder.zkClient;
    this.zkListener = new ServerListBarrierListener(this, initialServers);
}

/**
 * Cleans up after a connect attempt whose result is no longer wanted.
 * <p>
 * The future is cancelled; if it could not be cancelled because it already completed, a connection that
 * was established in the meantime is closed. Any failure while fetching that late result is swallowed on
 * purpose so that the caller can still log and rethrow the original connect failure.
 *
 * @param future the pending or completed connect future
 */
private static void abandonConnectAttempt(final Future<Connection> future) {
    if (future.cancel(false) || !future.isDone()) {
        return;
    }
    try {
        final Connection late = future.get();
        if (late != null && late.isOpen()) {
            late.closeSilently();
        }
    } catch (InterruptedException interrupted) {
        Thread.currentThread().interrupt();
    } catch (ExecutionException ignored) {
        // the attempt itself failed, so there is no connection to close
    } catch (RuntimeException ignored) {
        // e.g. the future was cancelled by another party; nothing to close
    }
}
330:
331: /**
332: * {@inheritDoc}
333: */
334: @Override
335: public void start() {
336: final Processor processor = transport.getProcessor();
337: if (!(processor instanceof FilterChain)) {
338: throw new IllegalStateException("transport's processor has to be a FilterChain");
339: }
340: if (initialServers != null) {
341: for (SocketAddress address : initialServers) {
342: addServer(address);
343: }
344: roundRobinStore.shuffle();
345: }
346: if (zkClient != null) {
347: // need to initialize the remote server with local initalServers if the remote
348: // server data is empty?
349: // currently, do nothing
350: zooKeeperServerListPath = zkClient.registerBarrier(thriftClientName, zkListener, null);
351: }
352: }
353:
354: /**
355: * {@inheritDoc}
356: */
357: @Override
358: public void stop() {
359: if (scheduledFuture != null) {
360: scheduledFuture.cancel(true);
361: }
362: if (scheduledExecutor != null) {
363: scheduledExecutor.shutdown();
364: }
365: if (initialServers != null) {
366: initialServers.clear();
367: }
368: roundRobinStore.clear();
369: if (connectionPool != null) {
370: connectionPool.destroy();
371: }
372: if (zkClient != null) {
373: zkClient.unregisterBarrier(thriftClientName);
374: }
375: }
376:
377: /**
378: * {@inheritDoc}
379: */
380: @SuppressWarnings("unchecked")
381: @Override
382: public boolean addServer(final SocketAddress serverAddress) {
383: return addServer(serverAddress, true);
384: }
385:
386: @SuppressWarnings("unchecked")
387: private boolean addServer(final SocketAddress serverAddress, boolean initial) {
388: if (serverAddress == null) {
389: return true;
390: }
391: if (connectionPool != null) {
392: try {
393: connectionPool.createAllMinObjects(serverAddress);
394: } catch (Exception e) {
395: if (logger.isLoggable(Level.SEVERE)) {
396: logger.log(Level.SEVERE, "failed to create min connections in the pool. address=" + serverAddress, e);
397: }
398: try {
399: connectionPool.destroy(serverAddress);
400: } catch (Exception ignore) {
401: }
402: if (!initial) {
403: return false;
404: }
405: }
406: }
407: roundRobinStore.add(serverAddress);
408: if (logger.isLoggable(Level.INFO)) {
409: logger.log(Level.INFO, "added the server successfully. address={0}", serverAddress);
410: }
411: return true;
412: }
413:
414: /**
415: * {@inheritDoc}
416: */
417: @Override
418: public void removeServer(final SocketAddress serverAddress) {
419: removeServer(serverAddress, true);
420: }
421:
422: private void removeServer(final SocketAddress serverAddress, final boolean forcibly) {
423: if (serverAddress == null) {
424: return;
425: }
426: if (!forcibly) {
427: if (healthMonitorTask != null && healthMonitorTask.failure(serverAddress)
428: && !(retainLastServer && roundRobinStore.hasOnly(serverAddress))) {
429: roundRobinStore.remove(serverAddress);
430: if (logger.isLoggable(Level.INFO)) {
431: logger.log(Level.INFO, "removed the server successfully. address={0}", serverAddress);
432: }
433: }
434: } else {
435: roundRobinStore.remove(serverAddress);
436: if (logger.isLoggable(Level.INFO)) {
437: logger.log(Level.INFO, "removed the server successfully. address={0}", serverAddress);
438: }
439: }
440: if (connectionPool != null) {
441: try {
442: connectionPool.destroy(serverAddress);
443: if (logger.isLoggable(Level.INFO)) {
444: logger.log(Level.INFO, "removed the server in the pool successfully. address={0}", serverAddress);
445: }
446: } catch (Exception e) {
447: if (logger.isLoggable(Level.WARNING)) {
448: logger.log(Level.WARNING, "failed to remove connections in the pool", e);
449: }
450: }
451: }
452: }
453:
454: /**
455: * {@inheritDoc}
456: */
457: @Override
458: public boolean isInServerList(final SocketAddress serverAddress) {
459: return roundRobinStore.hasValue(serverAddress);
460: }
461:
462: /**
463: * {@inheritDoc}
464: */
465: @Override
466: public boolean isZooKeeperSupported() {
467: return zkClient != null;
468: }
469:
470: /**
471: * {@inheritDoc}
472: */
473: @Override
474: public String getZooKeeperServerListPath() {
475: if (!isZooKeeperSupported()) {
476: return null;
477: }
478: return zooKeeperServerListPath;
479: }
480:
481: /**
482: * {@inheritDoc}
483: */
484: @Override
485: public String getCurrentServerListFromZooKeeper() {
486: if (!isZooKeeperSupported()) {
487: return null;
488: }
489: final byte[] serverListBytes = zkClient.getData(zooKeeperServerListPath, null);
490: if (serverListBytes == null) {
491: return null;
492: }
493: final String serverListString;
494: try {
495: serverListString = new String(serverListBytes, ServerListBarrierListener.DEFAULT_SERVER_LIST_CHARSET);
496: } catch (UnsupportedEncodingException e) {
497: if (logger.isLoggable(Level.WARNING)) {
498: logger.log(Level.WARNING, "failed to decode the server list bytes");
499: }
500: return null;
501: }
502: return serverListString;
503: }
504:
505: /**
506: * {@inheritDoc}
507: */
508: @Override
509: public boolean setCurrentServerListOfZooKeeper(final String thriftServerList) {
510: if (!isZooKeeperSupported()) {
511: return false;
512: }
513: if (thriftServerList == null) {
514: return false;
515: }
516: final byte[] serverListBytes;
517: try {
518: serverListBytes = thriftServerList.getBytes(ServerListBarrierListener.DEFAULT_SERVER_LIST_CHARSET);
519: } catch (UnsupportedEncodingException e) {
520: if (logger.isLoggable(Level.WARNING)) {
521: logger.log(Level.WARNING, "failed to eecode the server list");
522: }
523: return false;
524: }
525: return zkClient.setData(zooKeeperServerListPath, serverListBytes, -1) != null;
526: }
527:
528: /**
529: * {@inheritDoc}
530: */
531: @Override
532: public void addZooKeeperListener(final BarrierListener listener) {
533: if (!isZooKeeperSupported()) {
534: return;
535: }
536: zkListener.addCustomListener(listener);
537: }
538:
539: /**
540: * {@inheritDoc}
541: */
542: @Override
543: public void removeZooKeeperListener(final BarrierListener listener) {
544: if (!isZooKeeperSupported()) {
545: return;
546: }
547: zkListener.removeCustomListener(listener);
548: }
549:
550: /**
551: * {@inheritDoc}
552: */
553: @Override
554: public String getName() {
555: return thriftClientName;
556: }
557:
558: @Override
559: public <U> U execute(final ThriftClientCallback<T, U> callback) throws Exception {
560: for (int i = 0; i <= retryCount; i++) {
561: T client;
562: final SocketAddress address = roundRobinStore.get();
563: try {
564: client = connectionPool.borrowObject(address, connectTimeoutInMillis);
565: } catch (PoolExhaustedException pee) {
566: if (logger.isLoggable(Level.FINER)) {
567: logger.log(Level.FINER, "failed to get the client. address=" + address + ", timeout=" + connectTimeoutInMillis + "ms",
568: pee);
569: }
570: continue;
571: } catch (NoValidObjectException nvoe) {
572: if (logger.isLoggable(Level.FINER)) {
573: logger.log(Level.FINER, "failed to get the client. address=" + address + ", timeout=" + connectTimeoutInMillis + "ms",
574: nvoe);
575: }
576: removeServer(address, false);
577: continue;
578: } catch (TimeoutException te) {
579: if (logger.isLoggable(Level.FINER)) {
580: logger.log(Level.FINER, "failed to get the client. address=" + address + ", timeout=" + connectTimeoutInMillis + "ms",
581: te);
582: }
583: continue;
584: } catch (InterruptedException ie) {
585: if (logger.isLoggable(Level.FINER)) {
586: logger.log(Level.FINER, "failed to get the client. address=" + address + ", timeout=" + connectTimeoutInMillis + "ms",
587: ie);
588: }
589: throw ie;
590: }
591: if (client == null) {
592: continue;
593: }
594: final boolean isMaxRetryCountReached = (i == retryCount);
595: final Level logLevel;
596: if (isMaxRetryCountReached) {
597: logLevel = Level.INFO;
598: } else {
599: logLevel = Level.FINER;
600: }
601: boolean systemException = false;
602: try {
603: return callback.call(client);
604: } catch (TTimedoutException tte) {
605: systemException = true;
606: if (logger.isLoggable(logLevel)) {
607: logger.log(logLevel, "timed out. address=" + address + ", client=" + client + ", callback" + callback, tte);
608: }
609: try {
610: connectionPool.removeObject(address, client);
611: } catch (Exception e) {
612: if (logger.isLoggable(logLevel)) {
613: logger.log(logLevel, "failed to remove the client. address=" + address + ", client=" + client, e);
614: }
615: }
616: } catch (TProtocolException tpe) {
617: systemException = true;
618: if (logger.isLoggable(logLevel)) {
619: logger.log(logLevel,
620: "occurred a thrift protocol error. address=" + address + ", client=" + client + ", callback" + callback, tpe);
621: }
622: try {
623: connectionPool.removeObject(address, client);
624: } catch (Exception e) {
625: if (logger.isLoggable(logLevel)) {
626: logger.log(logLevel, "failed to remove the client. address=" + address + ", client=" + client, e);
627: }
628: }
629: } catch (TTransportException tte) {
630: systemException = true;
631: if (logger.isLoggable(logLevel)) {
632: logger.log(logLevel,
633: "occurred a thrift trasport error. address=" + address + ", client=" + client + ", callback" + callback, tte);
634: }
635: try {
636: connectionPool.removeObject(address, client);
637: } catch (Exception e) {
638: if (logger.isLoggable(logLevel)) {
639: logger.log(logLevel, "failed to remove the client. address=" + address + ", client=" + client, e);
640: }
641: }
642: } finally {
643: if (!systemException) {
644: try {
645: connectionPool.returnObject(address, client);
646: } catch (Exception e) {
647: if (logger.isLoggable(logLevel)) {
648: logger.log(logLevel, "failed to return the client. address=" + address + ", client=" + client, e);
649: }
650: }
651: }
652: }
653: }
654: throw new IOException("failed to get the valid client");
655: }
656:
657: private boolean validateClient(final T client) {
658: if (validationCheckMethodName == null) {
659: return true;
660: }
661: if (client == null) {
662: return false;
663: }
664: try {
665: final Method m = client.getClass().getMethod(validationCheckMethodName);
666: try {
667: m.invoke(client);
668: } catch (Throwable t) {
669: if (logger.isLoggable(Level.WARNING)) {
670: logger.log(Level.WARNING, "the client is not valid. client=" + client, t);
671: }
672: return false;
673: }
674: } catch (Throwable ignore) {
675: if (logger.isLoggable(Level.FINER)) {
676: logger.log(Level.FINER, "the '" + validationCheckMethodName + "()' method has not been implemented.", ignore);
677: }
678: }
679: return true;
680: }
681:
682: private boolean validateConnection(final Connection connection) {
683: if (connection == null) {
684: return false;
685: }
686: final TGrizzlyClientTransport ttransport = TGrizzlyClientTransport.create(connection, responseTimeoutInMillis,
687: writeTimeoutInMillis);
688: final TProtocol protocol;
689: if (thriftProtocol == ThriftProtocols.BINARY) {
690: protocol = new TBinaryProtocol(ttransport);
691: } else if (thriftProtocol == ThriftProtocols.COMPACT) {
692: protocol = new TCompactProtocol(ttransport);
693: } else {
694: protocol = new TBinaryProtocol(ttransport);
695: }
696: final T client = clientFactory.getClient(protocol);
697: final boolean result = validateClient(client);
698: ttransport.close();
699: return result;
700: }
701:
702: private class HealthMonitorTask implements Runnable {
703:
704: private final Map<SocketAddress, Boolean> failures = new ConcurrentHashMap<SocketAddress, Boolean>();
705: private final Map<SocketAddress, Boolean> revivals = new ConcurrentHashMap<SocketAddress, Boolean>();
706: private final AtomicBoolean running = new AtomicBoolean();
707:
708: public boolean failure(final SocketAddress address) {
709: if (address == null) {
710: return true;
711: }
712: if (failures.get(address) == null && revivals.get(address) == null) {
713: failures.put(address, Boolean.TRUE);
714: return true;
715: } else {
716: return false;
717: }
718: }
719:
720: @SuppressWarnings("unchecked")
721: @Override
722: public void run() {
723: if (transport == null) {
724: throw new IllegalStateException("transport must not be null");
725: }
726: if (!running.compareAndSet(false, true)) {
727: return;
728: }
729: try {
730: revivals.clear();
731: final Set<SocketAddress> failuresSet = failures.keySet();
732: if (logger.isLoggable(Level.FINE)) {
733: logger.log(Level.FINE, "try to check the failures in health monitor. failed list hint={0}, interval={1}secs",
734: new Object[] { failuresSet, healthMonitorIntervalInSecs });
735: } else if (logger.isLoggable(Level.INFO) && !failuresSet.isEmpty()) {
736: logger.log(Level.INFO, "try to check the failures in health monitor. failed list hint={0}, interval={1}secs",
737: new Object[] { failuresSet, healthMonitorIntervalInSecs });
738: }
739: for (SocketAddress failure : failuresSet) {
740: try {
741: // get the temporary connection
742: final ConnectorHandler<SocketAddress> connectorHandler = TCPNIOConnectorHandler.builder(transport)
743: .setReuseAddress(true).build();
744: Future<Connection> future = connectorHandler.connect(failure);
745: final Connection<SocketAddress> connection;
746: try {
747: if (connectTimeoutInMillis < 0) {
748: connection = future.get();
749: } else {
750: connection = future.get(connectTimeoutInMillis, TimeUnit.MILLISECONDS);
751: }
752: } catch (InterruptedException ie) {
753: if (!future.cancel(false) && future.isDone()) {
754: final Connection c = future.get();
755: if (c != null && c.isOpen()) {
756: c.closeSilently();
757: }
758: }
759: if (logger.isLoggable(Level.SEVERE)) {
760: logger.log(Level.SEVERE, "failed to get the connection in health monitor. address=" + failure, ie);
761: }
762: continue;
763: } catch (ExecutionException ee) {
764: if (!future.cancel(false) && future.isDone()) {
765: final Connection c = future.get();
766: if (c != null && c.isOpen()) {
767: c.closeSilently();
768: }
769: }
770: if (logger.isLoggable(Level.SEVERE)) {
771: logger.log(Level.SEVERE, "failed to get the connection in health monitor. address=" + failure, ee);
772: }
773: continue;
774: } catch (TimeoutException te) {
775: if (!future.cancel(false) && future.isDone()) {
776: final Connection c = future.get();
777: if (c != null && c.isOpen()) {
778: c.closeSilently();
779: }
780: }
781: if (logger.isLoggable(Level.SEVERE)) {
782: logger.log(Level.SEVERE, "failed to get the connection in health monitor. address=" + failure, te);
783: }
784: continue;
785: }
786: if (validateConnection(connection)) {
787: failures.remove(failure);
788: revivals.put(failure, Boolean.TRUE);
789: }
790: connection.closeSilently();
791: } catch (Throwable t) {
792: if (logger.isLoggable(Level.SEVERE)) {
793: logger.log(Level.SEVERE, "unexpected exception thrown", t);
794: }
795: }
796: }
797: final Set<SocketAddress> revivalsSet = revivals.keySet();
798: if (logger.isLoggable(Level.FINE)) {
799: logger.log(Level.FINE, "try to restore revivals in health monitor. revival list hint={0}, interval={1}secs",
800: new Object[] { revivalsSet, healthMonitorIntervalInSecs });
801: } else if (logger.isLoggable(Level.INFO) && !revivalsSet.isEmpty()) {
802: logger.log(Level.INFO, "try to restore revivals in health monitor. revival list hint={0}, interval={1}secs",
803: new Object[] { revivalsSet, healthMonitorIntervalInSecs });
804: }
805: for (SocketAddress revival : revivalsSet) {
806: if (!addServer(revival, false)) {
807: if (logger.isLoggable(Level.WARNING)) {
808: logger.log(Level.WARNING, "the revival was failed again in health monitor. revival={0}", revival);
809: }
810: failures.put(revival, Boolean.TRUE);
811: }
812: }
813: } finally {
814: running.set(false);
815: }
816: }
817: }
818:
819: public static class Builder<T extends TServiceClient> implements ThriftClientBuilder {
820:
821: private final String thriftClientName;
822: private final GrizzlyThriftClientManager manager;
823: private final TCPNIOTransport transport;
824: private final TServiceClientFactory<T> clientFactory;
825:
826: private Set<SocketAddress> servers;
827: private long connectTimeoutInMillis = 5000; // 5secs
828: private long writeTimeoutInMillis = 5000; // 5secs
829: private long responseTimeoutInMillis = 10000; // 10secs
830:
831: private long healthMonitorIntervalInSecs = 60; // 1 min
832: private boolean failover = true;
833: private int retryCount = 1;
834: private ThriftProtocols thriftProtocol = ThriftProtocols.BINARY;
835: private TransferProtocols transferProtocol = TransferProtocols.BASIC;
836: private String validationCheckMethodName = null;
837:
838: // connection pool config
839: private int minConnectionPerServer = 5;
840: private int maxConnectionPerServer = Integer.MAX_VALUE;
841: private long keepAliveTimeoutInSecs = 30 * 60; // 30 min
842: private boolean allowDisposableConnection = false;
843: private boolean borrowValidation = false;
844: private boolean returnValidation = false;
845: private boolean retainLastServer = false;
846:
847: private final ZKClient zkClient;
848: private int maxThriftFrameLength;
849:
850: private String httpUriPath = "/";
851: private Map<String, String> httpHeaders;
852:
853: public Builder(final String thriftClientName, final GrizzlyThriftClientManager manager, final TCPNIOTransport transport,
854: final TServiceClientFactory<T> clientFactory) {
855: this.thriftClientName = thriftClientName;
856: this.manager = manager;
857: this.transport = transport;
858: this.clientFactory = clientFactory;
859: this.zkClient = manager.getZkClient();
860: this.maxThriftFrameLength = manager.getMaxThriftFrameLength();
861: }
862:
863: /**
864: * {@inheritDoc}
865: */
866: @Override
867: public GrizzlyThriftClient<T> build() {
868: final GrizzlyThriftClient<T> thriftClient = new GrizzlyThriftClient<T>(this);
869: thriftClient.start();
870: if (!manager.addThriftClient(thriftClient)) {
871: thriftClient.stop();
872: throw new IllegalStateException(
873: "failed to add the thrift client because the ThriftClientManager already stopped or the same thrift client name existed");
874: }
875: return thriftClient;
876: }
877:
878: /**
879: * Set global connect-timeout
880: * <p>
881: * If the given param is negative, the timeout is infite. Default is 5000.
882: *
883: * @param connectTimeoutInMillis connect-timeout in milli-seconds
884: * @return this builder
885: */
886: public Builder<T> connectTimeoutInMillis(final long connectTimeoutInMillis) {
887: this.connectTimeoutInMillis = connectTimeoutInMillis;
888: return this;
889: }
890:
891: /**
892: * Set global write-timeout
893: * <p>
894: * If the given param is negative, the timeout is infite. Default is 5000.
895: *
896: * @param writeTimeoutInMillis write-timeout in milli-seconds
897: * @return this builder
898: */
899: public Builder<T> writeTimeoutInMillis(final long writeTimeoutInMillis) {
900: this.writeTimeoutInMillis = writeTimeoutInMillis;
901: return this;
902: }
903:
904: /**
905: * Set global response-timeout
906: * <p>
907: * If the given param is negative, the timeout is infite. Default is 10000.
908: *
909: * @param responseTimeoutInMillis response-timeout in milli-seconds
910: * @return this builder
911: */
912: public Builder<T> responseTimeoutInMillis(final long responseTimeoutInMillis) {
913: this.responseTimeoutInMillis = responseTimeoutInMillis;
914: return this;
915: }
916:
917: /**
918: * Set connection pool's min
919: * <p>
920: * Default is 5.
921: *
922: * @param minConnectionPerServer connection pool's min
923: * @return this builder
924: * @see BaseObjectPool.Builder#min(int)
925: */
926: public Builder<T> minConnectionPerServer(final int minConnectionPerServer) {
927: this.minConnectionPerServer = minConnectionPerServer;
928: return this;
929: }
930:
931: /**
932: * Set connection pool's max
933: * <p>
934: * Default is {@link Integer#MAX_VALUE}
935: *
936: * @param maxConnectionPerServer connection pool's max
937: * @return this builder
938: * @see BaseObjectPool.Builder#max(int)
939: */
940: public Builder<T> maxConnectionPerServer(final int maxConnectionPerServer) {
941: this.maxConnectionPerServer = maxConnectionPerServer;
942: return this;
943: }
944:
945: /**
946: * Set connection pool's KeepAliveTimeout
947: * <p>
948: * Default is 1800.
949: *
950: * @param keepAliveTimeoutInSecs connection pool's KeepAliveTimeout in seconds
951: * @return this builder
952: * @see BaseObjectPool.Builder#keepAliveTimeoutInSecs(long)
953: */
954: public Builder<T> keepAliveTimeoutInSecs(final long keepAliveTimeoutInSecs) {
955: this.keepAliveTimeoutInSecs = keepAliveTimeoutInSecs;
956: return this;
957: }
958:
959: /**
960: * Set health monitor's interval
961: * <p>
962: * This thrift client will schedule HealthMonitorTask with this interval.
963: * HealthMonitorTask will check the failure servers periodically and detect the
964: * revived server. If the given parameter is negative, this thrift client never
965: * schedules HealthMonitorTask so this behavior is similar to seting
966: * {@code failover} to be false. Default is 60.
967: *
968: * @param healthMonitorIntervalInSecs interval in seconds
969: * @return this builder
970: */
971: public Builder<T> healthMonitorIntervalInSecs(final long healthMonitorIntervalInSecs) {
972: this.healthMonitorIntervalInSecs = healthMonitorIntervalInSecs;
973: return this;
974: }
975:
976: /**
977: * Allow or disallow disposable connections
978: * <p>
979: * Default is false.
980: *
981: * @param allowDisposableConnection true if this thrift client allows disposable
982: * connections
983: * @return this builder
984: */
985: public Builder<T> allowDisposableConnection(final boolean allowDisposableConnection) {
986: this.allowDisposableConnection = allowDisposableConnection;
987: return this;
988: }
989:
990: /**
991: * Enable or disable the connection validation when the connection is borrowed
992: * from the connection pool
993: * <p>
994: * Default is false.
995: *
996: * @param borrowValidation true if this thrift client should make sure the
997: * borrowed connection is valid
998: * @return this builder
999: */
1000: public Builder<T> borrowValidation(final boolean borrowValidation) {
1001: this.borrowValidation = borrowValidation;
1002: return this;
1003: }
1004:
1005: /**
1006: * Enable or disable the connection validation when the connection is returned
1007: * to the connection pool
1008: * <p>
1009: * Default is false.
1010: *
1011: * @param returnValidation true if this thrift client should make sure the
1012: * returned connection is valid
1013: * @return this builder
1014: */
1015: public Builder<T> returnValidation(final boolean returnValidation) {
1016: this.returnValidation = returnValidation;
1017: return this;
1018: }
1019:
1020: /**
1021: * Enable or disable the keeping a server in the the round-robin list when the
1022: * only one server is remained in the list.
1023: * <p>
1024: * Default is false
1025: *
1026: * @param retainLastServer true if this thrift client should make sure the
1027: * retaining one server in the round-robin list.
1028: * @return this builder
1029: */
1030: public Builder<T> retainLastServer(final boolean retainLastServer) {
1031: this.retainLastServer = retainLastServer;
1032: return this;
1033: }
1034:
1035: /**
1036: * Set initial servers
1037: *
1038: * @param servers server set
1039: * @return this builder
1040: */
1041: public Builder<T> servers(final Set<SocketAddress> servers) {
1042: this.servers = new HashSet<SocketAddress>(servers);
1043: return this;
1044: }
1045:
1046: /**
1047: * Enable or disable failover/failback
1048: * <p>
1049: * Default is true.
1050: *
1051: * @param failover true if this thrift client should support failover/failback
1052: * when the server is failed or revived
1053: * @return this builder
1054: */
1055: public Builder<T> failover(final boolean failover) {
1056: this.failover = failover;
1057: return this;
1058: }
1059:
1060: /**
1061: * Set retry count for connection or sending
1062: * <p>
1063: * Default is 1.
1064: *
1065: * @param retryCount the count for retrials
1066: * @return this builder
1067: */
1068: public Builder<T> retryCount(final int retryCount) {
1069: this.retryCount = retryCount;
1070: return this;
1071: }
1072:
1073: public Builder<T> thriftProtocol(final ThriftProtocols thriftProtocol) {
1074: this.thriftProtocol = thriftProtocol;
1075: return this;
1076: }
1077:
1078: /**
1079: * Set the max length of thrift frame
1080: *
1081: * @param maxThriftFrameLength max frame length
1082: * @return this builder
1083: */
1084: public Builder maxThriftFrameLength(final int maxThriftFrameLength) {
1085: this.maxThriftFrameLength = maxThriftFrameLength;
1086: return this;
1087: }
1088:
1089: public Builder<T> httpUriPath(final String httpUriPath) {
1090: if (httpUriPath == null || httpUriPath.isEmpty()) {
1091: return this;
1092: }
1093: this.transferProtocol = TransferProtocols.HTTP;
1094: this.httpUriPath = httpUriPath;
1095: return this;
1096: }
1097:
1098: public Builder<T> httpHeader(final Header header, final String value) {
1099: return httpHeader(header.toString(), value);
1100: }
1101:
1102: public Builder<T> httpHeader(final String name, final String value) {
1103: if (httpHeaders == null) {
1104: httpHeaders = new HashMap<>();
1105: }
1106: httpHeaders.put(name, value);
1107: return this;
1108: }
1109:
1110: public Builder<T> validationCheckMethodName(final String validationCheckMethodName) {
1111: if (validationCheckMethodName != null) {
1112: this.validationCheckMethodName = validationCheckMethodName;
1113: }
1114: return this;
1115: }
1116: }
1117:
1118: @Override
1119: public String toString() {
1120: final StringBuilder sb = new StringBuilder(256);
1121: sb.append("GrizzlyThriftClient{");
1122: sb.append("thriftClientName='").append(thriftClientName).append('\'');
1123: sb.append(", transport=").append(transport);
1124: sb.append(", connectTimeoutInMillis=").append(connectTimeoutInMillis);
1125: sb.append(", writeTimeoutInMillis=").append(writeTimeoutInMillis);
1126: sb.append(", responseTimeoutInMillis=").append(responseTimeoutInMillis);
1127: sb.append(", validationCheckMethodName='").append(validationCheckMethodName).append('\'');
1128: sb.append(", connectionPool=").append(connectionPool);
1129: sb.append(", initialServers=").append(initialServers);
1130: sb.append(", healthMonitorIntervalInSecs=").append(healthMonitorIntervalInSecs);
1131: sb.append(", healthMonitorTask=").append(healthMonitorTask);
1132: sb.append(", retainLastServer=").append(retainLastServer);
1133: sb.append(", failover=").append(failover);
1134: sb.append(", retryCount=").append(retryCount);
1135: sb.append(", thriftProtocol=").append(thriftProtocol);
1136: sb.append(", clientFactory=").append(clientFactory);
1137: sb.append(", zooKeeperServerListPath='").append(zooKeeperServerListPath).append('\'');
1138: sb.append(", transferProtocol=").append(transferProtocol);
1139: sb.append(", maxThriftFrameLength=").append(maxThriftFrameLength);
1140: sb.append(", httpUriPath='").append(httpUriPath).append('\'');
1141: sb.append('}');
1142: return sb.toString();
1143: }
1144: }