Skip to content

Package: GrizzlyThriftClientManager

GrizzlyThriftClientManager

name | instruction | branch | complexity | line | method
GrizzlyThriftClientManager(GrizzlyThriftClientManager.Builder)
M: 187 C: 0
0%
M: 12 C: 0
0%
M: 7 C: 0
0%
M: 48 C: 0
0%
M: 1 C: 0
0%
addThriftClient(GrizzlyThriftClient)
M: 28 C: 0
0%
M: 10 C: 0
0%
M: 6 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
createThriftClientBuilder(String, TServiceClientFactory)
M: 9 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
getMaxThriftFrameLength()
M: 3 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
getThriftClient(String)
M: 16 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
getZkClient()
M: 3 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
removeThriftClient(String)
M: 24 C: 0
0%
M: 6 C: 0
0%
M: 4 C: 0
0%
M: 9 C: 0
0%
M: 1 C: 0
0%
shutdown()
M: 52 C: 0
0%
M: 12 C: 0
0%
M: 7 C: 0
0%
M: 15 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*
2: * Copyright (c) 2012, 2017 Oracle and/or its affiliates. All rights reserved.
3: *
4: * This program and the accompanying materials are made available under the
5: * terms of the Eclipse Public License v. 2.0, which is available at
6: * http://www.eclipse.org/legal/epl-2.0.
7: *
8: * This Source Code may also be made available under the following Secondary
9: * Licenses when the conditions for such availability set forth in the
10: * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
11: * version 2 with the GNU Classpath Exception, which is available at
12: * https://www.gnu.org/software/classpath/license.html.
13: *
14: * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
15: */
16:
17: package org.glassfish.grizzly.thrift.client;
18:
19: import java.io.IOException;
20: import java.util.concurrent.ConcurrentHashMap;
21: import java.util.concurrent.ExecutorService;
22: import java.util.concurrent.atomic.AtomicBoolean;
23: import java.util.logging.Level;
24: import java.util.logging.Logger;
25:
26: import org.apache.thrift.TServiceClient;
27: import org.apache.thrift.TServiceClientFactory;
28: import org.glassfish.grizzly.Grizzly;
29: import org.glassfish.grizzly.IOStrategy;
30: import org.glassfish.grizzly.filterchain.FilterChainBuilder;
31: import org.glassfish.grizzly.filterchain.TransportFilter;
32: import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
33: import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder;
34: import org.glassfish.grizzly.strategies.SameThreadIOStrategy;
35: import org.glassfish.grizzly.thrift.ThriftClientFilter;
36: import org.glassfish.grizzly.thrift.ThriftFrameFilter;
37: import org.glassfish.grizzly.thrift.client.zookeeper.ZKClient;
38: import org.glassfish.grizzly.thrift.client.zookeeper.ZooKeeperConfig;
39:
40: /**
41: * The implementation of the {@link ThriftClientManager} based on Grizzly
42: * <p>
43: * This thrift client manager has a key (String thrift client
44: * name)/value ({@link GrizzlyThriftClient}) map for retrieving thrift clients. If
45: * the specific {@link org.glassfish.grizzly.nio.transport.TCPNIOTransport
46: * GrizzlyTransport} is not set at creation time, this will create a main
47: * GrizzlyTransport. The
48: * {@link org.glassfish.grizzly.nio.transport.TCPNIOTransport GrizzlyTransport}
49: * must contain {@link org.glassfish.grizzly.thrift.ThriftFrameFilter} and
50: * {@link org.glassfish.grizzly.thrift.ThriftClientFilter}.
51: *
52: * @author Bongjae Chang
53: */
54: public class GrizzlyThriftClientManager implements ThriftClientManager {
55:
// Logger for this class, obtained through Grizzly's logger factory.
56: private static final Logger logger = Grizzly.logger(GrizzlyThriftClientManager.class);
57:
// Registered thrift clients, keyed by thrift client name.
58: private final ConcurrentHashMap<String, GrizzlyThriftClient<?>> thriftClients = new ConcurrentHashMap<String, GrizzlyThriftClient<?>>();
// Transport handed to every client builder created by this manager; either
// supplied through the Builder or created and started by the constructor.
59: private final TCPNIOTransport transport;
// True when the transport came from the Builder; shutdown() only stops the
// transport when this is false.
60: private final boolean isExternalTransport;
// Flipped to true exactly once by shutdown(); the lookup, removal and
// registration methods consult it before touching the client map.
61: private final AtomicBoolean shutdown = new AtomicBoolean(false);
62:
// ZooKeeper client; null when no ZooKeeperConfig was given or when the
// connection attempt in the constructor failed.
63: private ZKClient zkClient;
// Max thrift frame length taken from the Builder; also passed to the
// ThriftFrameFilter of the default transport.
64: private final int maxThriftFrameLength;
65:
/**
 * Creates a manager from the given builder's settings.
 * <p>
 * When the builder carries no transport, a default {@link TCPNIOTransport} is
 * built, configured and started here, and {@link #shutdown()} stops it later;
 * otherwise the supplied transport is used as it is. When a
 * {@link ZooKeeperConfig} is present, a {@link ZKClient} is built and
 * connected; if connecting fails, the failure is logged and the manager keeps
 * no ZooKeeper client.
 *
 * @param builder source of the transport, Grizzly and ZooKeeper settings
 */
private GrizzlyThriftClientManager(final Builder builder) {
    this.maxThriftFrameLength = builder.maxThriftFrameLength;

    final boolean suppliedByCaller = builder.transport != null;
    this.isExternalTransport = suppliedByCaller;
    this.transport = suppliedByCaller ? builder.transport : startDefaultTransport(builder);

    final ZooKeeperConfig zkConfig = builder.zooKeeperConfig;
    this.zkClient = zkConfig == null ? null : connectZooKeeper(zkConfig);
}

/**
 * Builds, configures and starts the transport used when the caller supplied none.
 * <p>
 * A failure to start is logged at SEVERE and the (unstarted) transport is still
 * returned.
 *
 * @param builder source of the Grizzly settings
 * @return the newly created transport
 */
private static TCPNIOTransport startDefaultTransport(final Builder builder) {
    final FilterChainBuilder filterChain = FilterChainBuilder.stateless();
    filterChain.add(new TransportFilter()).add(new ThriftFrameFilter(builder.maxThriftFrameLength))
            .add(new ThriftClientFilter());

    final TCPNIOTransport created = TCPNIOTransportBuilder.newInstance().build();
    created.setProcessor(filterChain.build());
    created.setSelectorRunnersCount(builder.selectorRunnersCount);
    created.setIOStrategy(builder.ioStrategy);
    created.configureBlocking(builder.blocking);
    final ExecutorService workers = builder.workerThreadPool;
    if (workers != null) {
        created.setWorkerThreadPool(workers);
    }

    try {
        created.start();
    } catch (IOException ie) {
        if (logger.isLoggable(Level.SEVERE)) {
            logger.log(Level.SEVERE, "failed to start the transport", ie);
        }
    }
    return created;
}

/**
 * Builds a {@link ZKClient} from the given configuration and connects it.
 *
 * @param config the ZooKeeper settings, never null
 * @return the connected client, or null when connecting failed or was
 *         interrupted (the interrupt status is restored in the latter case)
 */
private static ZKClient connectZooKeeper(final ZooKeeperConfig config) {
    final ZKClient.Builder zkBuilder = new ZKClient.Builder(config.getName(), config.getZooKeeperServerList());
    zkBuilder.rootPath(config.getRootPath());
    zkBuilder.connectTimeoutInMillis(config.getConnectTimeoutInMillis());
    zkBuilder.sessionTimeoutInMillis(config.getSessionTimeoutInMillis());
    zkBuilder.commitDelayTimeInSecs(config.getCommitDelayTimeInSecs());

    final ZKClient client = zkBuilder.build();
    try {
        client.connect();
        return client;
    } catch (IOException ie) {
        if (logger.isLoggable(Level.SEVERE)) {
            logger.log(Level.SEVERE, "failed to connect the zookeeper server. zkClient=" + client, ie);
        }
    } catch (InterruptedException ie) {
        if (logger.isLoggable(Level.SEVERE)) {
            logger.log(Level.SEVERE, "failed to connect the zookeeper server. zkClient=" + client, ie);
        }
        Thread.currentThread().interrupt();
    }
    return null;
}
120:
121: /**
122: * {@inheritDoc}
123: */
124: @Override
125: public <T extends TServiceClient> GrizzlyThriftClient.Builder<T> createThriftClientBuilder(final String thriftClientName,
126: final TServiceClientFactory<T> thriftClientFactory) {
127: return new GrizzlyThriftClient.Builder<T>(thriftClientName, this, transport, thriftClientFactory);
128: }
129:
130: /**
131: * {@inheritDoc}
132: */
133: @SuppressWarnings("unchecked")
134: @Override
135: public <T extends TServiceClient> GrizzlyThriftClient<T> getThriftClient(final String thriftClientName) {
136:• if (shutdown.get()) {
137: return null;
138: }
139:• return thriftClientName != null ? (GrizzlyThriftClient<T>) thriftClients.get(thriftClientName) : null;
140: }
141:
142: /**
143: * {@inheritDoc}
144: */
145: @Override
146: public boolean removeThriftClient(final String thriftClientName) {
147:• if (shutdown.get()) {
148: return false;
149: }
150:• if (thriftClientName == null)
151: return false;
152: final GrizzlyThriftClient thriftClient = thriftClients.remove(thriftClientName);
153:• if (thriftClient == null) {
154: return false;
155: }
156: thriftClient.stop();
157: return true;
158: }
159:
160: /**
161: * {@inheritDoc}
162: */
163: @Override
164: public void shutdown() {
165:• if (!shutdown.compareAndSet(false, true)) {
166: return;
167: }
168:• for (ThriftClient thriftClient : thriftClients.values()) {
169: thriftClient.stop();
170: }
171: thriftClients.clear();
172:• if (!isExternalTransport && transport != null) {
173: try {
174: transport.shutdownNow();
175: } catch (IOException ie) {
176:• if (logger.isLoggable(Level.INFO)) {
177: logger.log(Level.INFO, "failed to stop the transport", ie);
178: }
179: }
180: }
181:• if (zkClient != null) {
182: zkClient.shutdown();
183: }
184: }
185:
186: /**
187: * Add the given {@code thriftClient} to this thrift client manager
188: * <p/>
189: * If this returns false, the given {@code thriftClient} should be stopped by
190: * caller. Currently, this method is called by only
191: * {@link org.glassfish.grizzly.thrift.client.GrizzlyThriftClient.Builder#build()}.
192: *
193: * @param thriftClient a thrift client instance
194: * @return true if the thrift client was added
195: */
196: <T extends TServiceClient> boolean addThriftClient(final GrizzlyThriftClient<T> thriftClient) {
197:• return !shutdown.get() && thriftClient != null && thriftClients.putIfAbsent(thriftClient.getName(), thriftClient) == null
198:• && !(shutdown.get() && thriftClients.remove(thriftClient.getName()) == thriftClient);
199: }
200:
201: ZKClient getZkClient() {
202: return zkClient;
203: }
204:
205: int getMaxThriftFrameLength() {
206: return maxThriftFrameLength;
207: }
208:
209: public static class Builder {
210:
211: private TCPNIOTransport transport;
212:
213: // grizzly config
214: private int selectorRunnersCount = Runtime.getRuntime().availableProcessors() * 2;
215: private IOStrategy ioStrategy = SameThreadIOStrategy.getInstance();
216: private boolean blocking = false;
217: private ExecutorService workerThreadPool;
218: private int maxThriftFrameLength = 1024 * 1024; // 1M
219:
220: // zookeeper config
221: private ZooKeeperConfig zooKeeperConfig;
222:
223: /**
224: * Set the specific {@link org.glassfish.grizzly.nio.transport.TCPNIOTransport
225: * GrizzlyTransport}
226: * <p>
227: * If this is not set or set to be null,
228: * {@link org.glassfish.grizzly.thrift.client.GrizzlyThriftClientManager} will
229: * create a default transport. The given {@code transport} must be always
230: * started state if it is not null. Default is null.
231: *
232: * @param transport the specific Grizzly's
233: * {@link org.glassfish.grizzly.nio.transport.TCPNIOTransport}
234: * @return this builder
235: */
236: public Builder transport(final TCPNIOTransport transport) {
237: this.transport = transport;
238: return this;
239: }
240:
241: /**
242: * Set selector threads' count
243: * <p>
244: * If this thrift client manager will create a default transport, the given
245: * selector counts will be passed to
246: * {@link org.glassfish.grizzly.nio.transport.TCPNIOTransport}. Default is
247: * processors' count * 2.
248: *
249: * @param selectorRunnersCount selector threads' count
250: * @return this builder
251: */
252: public Builder selectorRunnersCount(final int selectorRunnersCount) {
253: this.selectorRunnersCount = selectorRunnersCount;
254: return this;
255: }
256:
257: /**
258: * Set the specific IO Strategy of Grizzly
259: * <p>
260: * If this thrift client manager will create a default transport, the given
261: * {@link org.glassfish.grizzly.IOStrategy} will be passed to
262: * {@link org.glassfish.grizzly.nio.transport.TCPNIOTransport}. Default is
263: * {@link org.glassfish.grizzly.strategies.SameThreadIOStrategy}.
264: *
265: * @param ioStrategy the specific IO Strategy
266: * @return this builder
267: */
268: public Builder ioStrategy(final IOStrategy ioStrategy) {
269: this.ioStrategy = ioStrategy;
270: return this;
271: }
272:
273: /**
274: * Enable or disable the blocking mode
275: * <p>
276: * If this thrift client manager will create a default transport, the given mode
277: * will be passed to
278: * {@link org.glassfish.grizzly.nio.transport.TCPNIOTransport}. Default is
279: * false.
280: *
281: * @param blocking true means the blocking mode
282: * @return this builder
283: */
284: public Builder blocking(final boolean blocking) {
285: this.blocking = blocking;
286: return this;
287: }
288:
289: /**
290: * Set the specific worker thread pool
291: * <p>
292: * If this thrift client manager will create a default transport, the given
293: * {@link java.util.concurrent.ExecutorService} will be passed to
294: * {@link org.glassfish.grizzly.nio.transport.TCPNIOTransport}. This is only
295: * effective if {@link org.glassfish.grizzly.IOStrategy} is not
296: * {@link org.glassfish.grizzly.strategies.SameThreadIOStrategy}. Default is
297: * null.
298: *
299: * @param workerThreadPool worker thread pool
300: * @return this builder
301: */
302: public Builder workerThreadPool(final ExecutorService workerThreadPool) {
303: this.workerThreadPool = workerThreadPool;
304: return this;
305: }
306:
307: /**
308: * Set the {@link ZooKeeperConfig} for synchronizing thrift server list among
309: * thrift clients
310: *
311: * @param zooKeeperConfig zookeeper config. if {@code zooKeeperConfig} is null,
312: * the zookeeper is never used.
313: * @return this builder
314: */
315: public Builder zooKeeperConfig(final ZooKeeperConfig zooKeeperConfig) {
316: this.zooKeeperConfig = zooKeeperConfig;
317: return this;
318: }
319:
320: /**
321: * Set the max length of thrift frame
322: *
323: * @param maxThriftFrameLength max frame length
324: * @return this builder
325: */
326: public Builder maxThriftFrameLength(final int maxThriftFrameLength) {
327: this.maxThriftFrameLength = maxThriftFrameLength;
328: return this;
329: }
330:
331: /**
332: * Create a
333: * {@link org.glassfish.grizzly.thrift.client.GrizzlyThriftClientManager}
334: * instance with this builder's properties
335: *
336: * @return a thrift client manager
337: */
338: public GrizzlyThriftClientManager build() {
339: return new GrizzlyThriftClientManager(this);
340: }
341: }
342: }