Skip to content

Package: ThriftClientFilter

ThriftClientFilter

name instruction branch complexity line method
ThriftClientFilter()
M: 18 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 7 C: 0
0%
M: 1 C: 0
0%
handleClose(FilterChainContext)
M: 57 C: 0
0%
M: 12 C: 0
0%
M: 7 C: 0
0%
M: 15 C: 0
0%
M: 1 C: 0
0%
handleRead(FilterChainContext)
M: 47 C: 0
0%
M: 8 C: 0
0%
M: 5 C: 0
0%
M: 13 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 6 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*
2: * Copyright (c) 2011, 2017 Oracle and/or its affiliates. All rights reserved.
3: *
4: * This program and the accompanying materials are made available under the
5: * terms of the Eclipse Public License v. 2.0, which is available at
6: * http://www.eclipse.org/legal/epl-2.0.
7: *
8: * This Source Code may also be made available under the following Secondary
9: * Licenses when the conditions for such availability set forth in the
10: * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
11: * version 2 with the GNU Classpath Exception, which is available at
12: * https://www.gnu.org/software/classpath/license.html.
13: *
14: * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
15: */
16:
17: package org.glassfish.grizzly.thrift;
18:
19: import java.io.IOException;
20: import java.net.SocketAddress;
21: import java.util.concurrent.BlockingQueue;
22: import java.util.concurrent.TransferQueue;
23: import java.util.logging.Level;
24: import java.util.logging.Logger;
25:
26: import org.apache.thrift.TServiceClient;
27: import org.glassfish.grizzly.Buffer;
28: import org.glassfish.grizzly.Connection;
29: import org.glassfish.grizzly.Grizzly;
30: import org.glassfish.grizzly.attributes.Attribute;
31: import org.glassfish.grizzly.filterchain.BaseFilter;
32: import org.glassfish.grizzly.filterchain.FilterChainContext;
33: import org.glassfish.grizzly.filterchain.NextAction;
34: import org.glassfish.grizzly.memory.Buffers;
35: import org.glassfish.grizzly.thrift.client.GrizzlyThriftClient;
36: import org.glassfish.grizzly.thrift.client.pool.ObjectPool;
37:
38: /**
39: * ThriftClientFilter is a client-side filter for Thrift RPC processors.
40: * <p>
41: * Read messages are queued in the connection's input {@code BlockingQueue}, from which
42: * TGrizzlyClientTransport will read them.
43: * <p>
44: * Usages:
45: *
46: * <pre>
47: * {@code
48: * final FilterChainBuilder clientFilterChainBuilder = FilterChainBuilder.stateless();
49: * clientFilterChainBuilder.add(new TransportFilter()).add(new ThriftFrameFilter()).add(new ThriftClientFilter());
50: * <p>
51: * final TCPNIOTransport transport = TCPNIOTransportBuilder.newInstance().build();
52: * transport.setProcessor(clientFilterChainBuilder.build());
53: * transport.start();
54: * Future<Connection> future = transport.connect(ip, port);
55: * final Connection connection = future.get(10, TimeUnit.SECONDS);
56: * <p>
57: * final TTransport ttransport = TGrizzlyClientTransport.create(connection);
58: * final TProtocol tprotocol = new TBinaryProtocol(ttransport);
59: * user-generated.thrift.Client client = new user-generated.thrift.Client(tprotocol);
60: * client.ping();
61: * // do more work
62: * // ...
63: * // release
64: * ttransport.close();
65: * connection.close();
66: * transport.shutdownNow();
67: * }
68: * </pre>
69: *
70: * @author Bongjae Chang
71: */
72: public class ThriftClientFilter<T extends TServiceClient> extends BaseFilter {
73:
74: private static final Logger logger = Grizzly.logger(ThriftClientFilter.class);
75:
76: private final Attribute<ObjectPool<SocketAddress, T>> connectionPoolAttribute = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
77: .createAttribute(GrizzlyThriftClient.CONNECTION_POOL_ATTRIBUTE_NAME);
78: private final Attribute<T> connectionClientAttribute = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
79: .createAttribute(GrizzlyThriftClient.CLIENT_ATTRIBUTE_NAME);
80: private final Attribute<BlockingQueue<Buffer>> inputBuffersQueueAttribute = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
81: .createAttribute(GrizzlyThriftClient.INPUT_BUFFERS_QUEUE_ATTRIBUTE_NAME);
82:
83: static final Buffer POISON = Buffers.EMPTY_BUFFER;
84:
85: @Override
86: public NextAction handleRead(FilterChainContext ctx) throws IOException {
87: final Buffer input = ctx.getMessage();
88:• if (input == null) {
89: throw new IOException("input message could not be null");
90: }
91:• if (!input.hasRemaining()) {
92: return ctx.getStopAction();
93: }
94: final Connection connection = ctx.getConnection();
95:• if (connection == null) {
96: throw new IOException("connection must not be null");
97: }
98: final BlockingQueue<Buffer> inputBuffersQueue = inputBuffersQueueAttribute.get(connection);
99:• if (inputBuffersQueue == null) {
100: throw new IOException("inputBuffersQueue must not be null");
101: }
102: inputBuffersQueue.offer(input);
103: return ctx.getStopAction();
104: }
105:
106: @SuppressWarnings("unchecked")
107: @Override
108: public NextAction handleClose(FilterChainContext ctx) throws IOException {
109: final Connection<SocketAddress> connection = ctx.getConnection();
110:• if (connection != null) {
111: boolean hasWaitingConsumer = false;
112: final TransferQueue<Buffer> inputBuffersQueue = (TransferQueue<Buffer>) inputBuffersQueueAttribute.remove(connection);
113:• if (inputBuffersQueue != null) {
114: hasWaitingConsumer = inputBuffersQueue.tryTransfer(POISON);
115: }
116:
117: final ObjectPool<SocketAddress, T> connectionPool = connectionPoolAttribute.remove(connection);
118: final T client = connectionClientAttribute.remove(connection);
119:• if (connectionPool != null && client != null && !hasWaitingConsumer) {
120: try {
121: connectionPool.removeObject(connection.getPeerAddress(), client);
122:• if (logger.isLoggable(Level.FINE)) {
123: logger.log(Level.FINE, "the connection has been removed in pool. connection={0}", connection);
124: }
125: } catch (Exception ignore) {
126: }
127: }
128: }
129:
130: return ctx.getInvokeAction();
131: }
132: }