Skip to content

Package: TGrizzlyClientTransport

TGrizzlyClientTransport

name | instruction | branch | complexity | line | method
TGrizzlyClientTransport(Connection, long, long)
M: 45 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 12 C: 0
0%
M: 1 C: 0
0%
checkConnectionOpen()
M: 9 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
close()
M: 43 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 15 C: 0
0%
M: 1 C: 0
0%
create(Connection)
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
create(Connection, long)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
create(Connection, long, long)
M: 52 C: 0
0%
M: 8 C: 0
0%
M: 5 C: 0
0%
M: 13 C: 0
0%
M: 1 C: 0
0%
flush()
M: 48 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 16 C: 0
0%
M: 1 C: 0
0%
getGrizzlyConnection()
M: 3 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
getInputBuffer()
M: 41 C: 0
0%
M: 8 C: 0
0%
M: 5 C: 0
0%
M: 12 C: 0
0%
M: 1 C: 0
0%
getLocalInput(long)
M: 28 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 8 C: 0
0%
M: 1 C: 0
0%
getOutputStream()
M: 3 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
isOpen()
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*
2: * Copyright (c) 2011, 2017 Oracle and/or its affiliates. All rights reserved.
3: *
4: * This program and the accompanying materials are made available under the
5: * terms of the Eclipse Public License v. 2.0, which is available at
6: * http://www.eclipse.org/legal/epl-2.0.
7: *
8: * This Source Code may also be made available under the following Secondary
9: * Licenses when the conditions for such availability set forth in the
10: * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
11: * version 2 with the GNU Classpath Exception, which is available at
12: * https://www.gnu.org/software/classpath/license.html.
13: *
14: * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
15: */
16:
17: package org.glassfish.grizzly.thrift;
18:
19: import java.io.IOException;
20: import java.util.concurrent.BlockingQueue;
21: import java.util.concurrent.ExecutionException;
22: import java.util.concurrent.LinkedTransferQueue;
23: import java.util.concurrent.TimeUnit;
24: import java.util.concurrent.TimeoutException;
25: import java.util.concurrent.atomic.AtomicBoolean;
26:
27: import org.apache.thrift.transport.TTransportException;
28: import org.glassfish.grizzly.Buffer;
29: import org.glassfish.grizzly.Connection;
30: import org.glassfish.grizzly.Grizzly;
31: import org.glassfish.grizzly.GrizzlyFuture;
32: import org.glassfish.grizzly.Processor;
33: import org.glassfish.grizzly.attributes.Attribute;
34: import org.glassfish.grizzly.filterchain.FilterChain;
35: import org.glassfish.grizzly.memory.MemoryManager;
36: import org.glassfish.grizzly.thrift.client.GrizzlyThriftClient;
37: import org.glassfish.grizzly.utils.BufferOutputStream;
38:
39: /**
40: * TGrizzlyClientTransport is the client-side TTransport.
41: * <p>
42: * BlockingQueue which belongs to ThriftClientFilter has input messages when
43: * server's response are arrived. Only TTransport#flush() will be called, output
44: * messages will be written. Before flush(), output messages will be stored in
45: * buffer.
46: *
47: * @author Bongjae Chang
48: */
49: public class TGrizzlyClientTransport extends AbstractTGrizzlyTransport {
50:
51: private static final long DEFAULT_READ_TIMEOUT_MILLIS = -1L; // never timed out
52: private static final long DEFAULT_WRITE_TIMEOUT_MILLIS = -1L; // never timed out
53:
54: private Buffer input = null;
55: private final Connection connection;
56: private final BlockingQueue<Buffer> inputBuffersQueue;
57: private final BufferOutputStream outputStream;
58: private final long readTimeoutMillis;
59: private final long writeTimeoutMillis;
60:
61: private final Attribute<BlockingQueue<Buffer>> inputBuffersQueueAttribute = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
62: .createAttribute(GrizzlyThriftClient.INPUT_BUFFERS_QUEUE_ATTRIBUTE_NAME);
63:
64: private final AtomicBoolean running = new AtomicBoolean();
65:
66: public static TGrizzlyClientTransport create(final Connection connection) {
67: return create(connection, DEFAULT_READ_TIMEOUT_MILLIS);
68: }
69:
70: public static TGrizzlyClientTransport create(final Connection connection, final long readTimeoutMillis) {
71: return create(connection, readTimeoutMillis, DEFAULT_WRITE_TIMEOUT_MILLIS);
72: }
73:
74: public static TGrizzlyClientTransport create(final Connection connection, final long readTimeoutMillis, final long writeTimeoutMillis) {
75:• if (connection == null) {
76: throw new IllegalStateException("connection should not be null.");
77: }
78:
79: final Processor processor = connection.getProcessor();
80:• if (!(processor instanceof FilterChain)) {
81: throw new IllegalStateException("connection's processor has to be a FilterChain.");
82: }
83: final FilterChain connectionFilterChain = (FilterChain) processor;
84: final int idx = connectionFilterChain.indexOfType(ThriftClientFilter.class);
85:• if (idx == -1) {
86: throw new IllegalStateException("connection has to have ThriftClientFilter in the FilterChain.");
87: }
88: final ThriftClientFilter thriftClientFilter = (ThriftClientFilter) connectionFilterChain.get(idx);
89:• if (thriftClientFilter == null) {
90: throw new IllegalStateException("thriftClientFilter should not be null.");
91: }
92: return new TGrizzlyClientTransport(connection, readTimeoutMillis, writeTimeoutMillis);
93: }
94:
95: private TGrizzlyClientTransport(final Connection connection, final long readTimeoutMillis, final long writeTimeoutMillis) {
96: this.connection = connection;
97: this.inputBuffersQueue = new LinkedTransferQueue<>();
98: inputBuffersQueueAttribute.set(connection, this.inputBuffersQueue);
99: this.outputStream = new BufferOutputStream(connection.getTransport().getMemoryManager()) {
100:
101: @Override
102: protected Buffer allocateNewBuffer(final MemoryManager memoryManager, final int size) {
103: final Buffer b = memoryManager.allocate(size);
104: b.allowBufferDispose(true);
105: return b;
106: }
107: };
108: this.readTimeoutMillis = readTimeoutMillis;
109: this.writeTimeoutMillis = writeTimeoutMillis;
110: }
111:
112: @Override
113: public boolean isOpen() {
114: return connection.isOpen();
115: }
116:
117: @Override
118: public void close() {
119:• if (!running.compareAndSet(false, true)) {
120: return;
121: }
122: final Buffer output = outputStream.getBuffer();
123: output.dispose();
124: try {
125: outputStream.close();
126: } catch (IOException ignore) {
127: }
128: inputBuffersQueueAttribute.remove(connection);
129: inputBuffersQueue.clear();
130: try {
131: final GrizzlyFuture closeFuture = connection.close();
132: closeFuture.get(3, TimeUnit.SECONDS);
133: } catch (Exception ignore) {
134: }
135: running.set(false);
136: }
137:
138: @Override
139: @SuppressWarnings("unchecked")
140: public void flush() throws TTransportException {
141: checkConnectionOpen();
142: final Buffer output = outputStream.getBuffer();
143: output.trim();
144: outputStream.reset();
145: try {
146: final GrizzlyFuture future = connection.write(output);
147:• if (writeTimeoutMillis > 0) {
148: future.get(writeTimeoutMillis, TimeUnit.MILLISECONDS);
149: } else {
150: future.get();
151: }
152: } catch (TimeoutException te) {
153: throw new TTimedoutException(te);
154: } catch (ExecutionException ee) {
155: throw new TTransportException(ee);
156: } catch (InterruptedException ie) {
157: Thread.currentThread().interrupt();
158: }
159: }
160:
161: public Connection getGrizzlyConnection() {
162: return connection;
163: }
164:
165: @Override
166: protected Buffer getInputBuffer() throws TTransportException {
167: Buffer localInput = this.input;
168:• if (localInput == null) {
169: localInput = getLocalInput(readTimeoutMillis);
170:• } else if (localInput.remaining() <= 0) {
171: localInput.dispose();
172: localInput = getLocalInput(readTimeoutMillis);
173: }
174:• if (localInput == null) {
175: throw new TTimedoutException("timed out while reading the input buffer.");
176:• } else if (localInput == ThriftClientFilter.POISON) {
177: throw new TTransportException("client connection was already closed.");
178: }
179: this.input = localInput;
180: return localInput;
181: }
182:
183: private Buffer getLocalInput(final long readTimeoutMillis) throws TTransportException {
184: final Buffer localInput;
185: try {
186:• if (readTimeoutMillis < 0) {
187: localInput = inputBuffersQueue.take();
188: } else {
189: localInput = inputBuffersQueue.poll(readTimeoutMillis, TimeUnit.MILLISECONDS);
190: }
191: } catch (InterruptedException ie) {
192: Thread.currentThread().interrupt();
193: throw new TTransportException(ie);
194: }
195: return localInput;
196: }
197:
198: @Override
199: protected BufferOutputStream getOutputStream() {
200: return outputStream;
201: }
202:
203: private void checkConnectionOpen() throws TTransportException {
204:• if (!isOpen()) {
205: throw new TTransportException("client connection is closed.");
206: }
207: }
208: }