Skip to content

Package: MemcachedClientFilter

MemcachedClientFilter

nameinstructionbranchcomplexitylinemethod
MemcachedClientFilter()
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
MemcachedClientFilter(boolean, boolean)
M: 37 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 11 C: 0
0%
M: 1 C: 0
0%
calculateTotalPacketSize(MemcachedRequest[])
M: 42 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 8 C: 0
0%
M: 1 C: 0
0%
getCorrelatedResponse(Connection, MemcachedRequest, long)
M: 89 C: 0
0%
M: 18 C: 0
0%
M: 10 C: 0
0%
M: 21 C: 0
0%
M: 1 C: 0
0%
getMultiResponse(Connection, MemcachedRequest[], long, Map)
M: 182 C: 0
0%
M: 30 C: 0
0%
M: 16 C: 0
0%
M: 34 C: 0
0%
M: 1 C: 0
0%
handleClose(FilterChainContext)
M: 57 C: 0
0%
M: 8 C: 0
0%
M: 5 C: 0
0%
M: 16 C: 0
0%
M: 1 C: 0
0%
handleRead(FilterChainContext)
M: 590 C: 0
0%
M: 72 C: 0
0%
M: 40 C: 0
0%
M: 154 C: 0
0%
M: 1 C: 0
0%
handleWrite(FilterChainContext)
M: 100 C: 0
0%
M: 16 C: 0
0%
M: 9 C: 0
0%
M: 25 C: 0
0%
M: 1 C: 0
0%
makePackets(MemoryManager, Connection, MemcachedRequest[], BlockingQueue)
M: 173 C: 0
0%
M: 20 C: 0
0%
M: 11 C: 0
0%
M: 43 C: 0
0%
M: 1 C: 0
0%
makePacketsByOnceAllocation(MemoryManager, Connection, MemcachedRequest[], BlockingQueue, int)
M: 162 C: 0
0%
M: 20 C: 0
0%
M: 11 C: 0
0%
M: 40 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*
2: * Copyright (c) 2012, 2017 Oracle and/or its affiliates. All rights reserved.
3: *
4: * This program and the accompanying materials are made available under the
5: * terms of the Eclipse Public License v. 2.0, which is available at
6: * http://www.eclipse.org/legal/epl-2.0.
7: *
8: * This Source Code may also be made available under the following Secondary
9: * Licenses when the conditions for such availability set forth in the
10: * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
11: * version 2 with the GNU Classpath Exception, which is available at
12: * https://www.gnu.org/software/classpath/license.html.
13: *
14: * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
15: */
16:
17: package org.glassfish.grizzly.memcached;
18:
19: import org.glassfish.grizzly.Buffer;
20: import org.glassfish.grizzly.Connection;
21: import org.glassfish.grizzly.Grizzly;
22: import org.glassfish.grizzly.attributes.Attribute;
23: import org.glassfish.grizzly.filterchain.BaseFilter;
24: import org.glassfish.grizzly.filterchain.FilterChainContext;
25: import org.glassfish.grizzly.filterchain.NextAction;
26: import org.glassfish.grizzly.memcached.pool.ObjectPool;
27: import org.glassfish.grizzly.memory.Buffers;
28: import org.glassfish.grizzly.memory.CompositeBuffer;
29: import org.glassfish.grizzly.memory.MemoryManager;
30: import org.glassfish.grizzly.utils.NullaryFunction;
31:
32: import java.io.IOException;
33: import java.net.SocketAddress;
34: import java.util.Map;
35: import java.util.concurrent.BlockingQueue;
36: import java.util.concurrent.LinkedTransferQueue;
37: import java.util.concurrent.TimeUnit;
38: import java.util.concurrent.TimeoutException;
39: import java.util.logging.Level;
40: import java.util.logging.Logger;
41:
42: /**
43: * The {@link org.glassfish.grizzly.filterchain.Filter} implementation for memcached
44: * <p>
45: * This filter has an unbounded {@link BlockingQueue} per a connection for storing user's request.
46: * When the response will be received, the corresponding request will be removed in queue
47: * and the filter will pass the complete result to original request and notify the waiting sender.
48: * <p>
49: * If the memcached's command is a kind of quiet's, it is possible for the server not to send the response to client.
50: * Then the filter skips the quiet request with {@link ParsingStatus#NO_REPLY} status and processes next request.
51: * <p>
52: * This filter has two options.
53: * 1) {@code localParsingOptimizing}:
54: * the input buffer has more than 1 complete memcached message,
55: * the filter will parse the input buffer continuously in the same thread and local loop
56: * without going through filter chains and spliting up the input buffer if this flag is true.
57: * <p>
58: * 2) {@code onceAllocationOptimizing}:
59: * Before multi-command(bulk-command) like getMulti and setMulti will be sent to the server, individual packets should be allocated.
60: * If this flag is true, the filter will calculate the total buffer size of individual requests in advance
61: * and will allocate only a {@link Buffer} once.
62: *
63: * @author Bongjae Chang
64: */
65: public class MemcachedClientFilter extends BaseFilter {
66:
67: private static final Logger logger = Grizzly.logger(MemcachedClientFilter.class);
68:
69: private static final int MAX_WRITE_BUFFER_SIZE_FOR_OPTIMIZING = 1024 * 1024; // 1m
70:
71: private static final int HEADER_LENGTH = 24;
72: private static final byte REQUEST_MAGIC_NUMBER = (byte) (0x80 & 0xFF);
73: private static final byte RESPONSE_MAGIC_NUMBER = (byte) (0x81 & 0xFF);
74:
75: public enum ParsingStatus {
76: NONE, READ_HEADER, READ_EXTRAS, READ_KEY, READ_VALUE, DONE, NO_REPLY
77: }
78:
79: private final Attribute<ParsingStatus> statusAttribute = Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("MemcachedClientFilter.Status");
80: private final Attribute<MemcachedResponse> responseAttribute =
81: Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("MemcachedClientFilter.Response",
82: new NullaryFunction<MemcachedResponse>() {
83: public MemcachedResponse evaluate() {
84: return MemcachedResponse.create();
85: }
86: });
87:
88: private final Attribute<BlockingQueue<MemcachedRequest>> requestQueueAttribute =
89: Grizzly.DEFAULT_ATTRIBUTE_BUILDER.<BlockingQueue<MemcachedRequest>>createAttribute("MemcachedClientFilter.RequestQueue",
90: new NullaryFunction<BlockingQueue<MemcachedRequest>>() {
91: public BlockingQueue<MemcachedRequest> evaluate() {
92: return new LinkedTransferQueue<>();
93: }
94: });
95:
96: private final Attribute<ObjectPool<SocketAddress, Connection<SocketAddress>>> connectionPoolAttribute =
97: Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(GrizzlyMemcachedCache.CONNECTION_POOL_ATTRIBUTE_NAME);
98:
99: private final boolean localParsingOptimizing;
100: private final boolean onceAllocationOptimizing;
101:
102: public MemcachedClientFilter() {
103: this(false, true);
104: }
105:
106: public MemcachedClientFilter(final boolean localParsingOptimizing, final boolean onceAllocationOptimizing) {
107: this.localParsingOptimizing = localParsingOptimizing;
108: this.onceAllocationOptimizing = onceAllocationOptimizing;
109: }
110:
111: @Override
112: public NextAction handleRead(FilterChainContext ctx) throws IOException {
113: final Buffer input = ctx.getMessage();
114:• if (input == null) {
115: throw new IOException("input message could not be null");
116: }
117:• if (!input.hasRemaining()) {
118: return ctx.getStopAction();
119: }
120: final Connection connection = ctx.getConnection();
121:• if (connection == null) {
122: throw new IOException("connection could not be null");
123: }
124: MemoryManager memoryManager = ctx.getMemoryManager();
125:• if (memoryManager == null) {
126: memoryManager = MemoryManager.DEFAULT_MEMORY_MANAGER;
127: }
128:
129: ParsingStatus status = statusAttribute.get(connection);
130:• if (status == null) {
131: status = ParsingStatus.NONE;
132: statusAttribute.set(connection, status);
133: }
134:
135: final BlockingQueue<MemcachedRequest> requestQueue = requestQueueAttribute.get(connection);
136:• if (requestQueue == null) {
137: throw new IOException("request queue must not be null");
138: }
139:
140: short keyLength;
141: byte extraLength;
142: int totalBodyLength;
143: int valueLength;
144: MemcachedRequest sentRequest;
145: MemcachedResponse response = responseAttribute.get(connection);
146: while (true) {
147:• switch (status) {
148: case NONE:
149:• if (input.remaining() < HEADER_LENGTH) {
150: return ctx.getStopAction(input);
151: }
152:
153: status = ParsingStatus.READ_HEADER;
154: statusAttribute.set(connection, status);
155: break;
156: case READ_HEADER:
157: /*
158: |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
159: +---------------+---------------+---------------+---------------+
160: 0| Magic | Opcode | Key Length |
161: +---------------+---------------+---------------+---------------+
162: 4| Extras length | Data type | Status |
163: +---------------+---------------+---------------+---------------+
164: 8| Total body length |
165: +---------------+---------------+---------------+---------------+
166: 12| Opaque |
167: +---------------+---------------+---------------+---------------+
168: 16| CAS |
169: | |
170: +---------------+---------------+---------------+---------------+
171: Total 24 bytes
172: */
173: input.mark(); // for processing the request again if there is no-reply
174:
175: final byte magic = input.get();
176:• if (magic != RESPONSE_MAGIC_NUMBER) {
177: throw new IOException("invalid magic");
178: }
179: final byte op = input.get();
180: sentRequest = requestQueue.peek();
181:• if (sentRequest == null) {
182: throw new IOException("invalid response");
183: }
184: final CommandOpcodes commandOpcode = sentRequest.getOp();
185: response.setOp(commandOpcode);
186:• if (op != commandOpcode.opcode()) {
187:• if (sentRequest.isNoReply()) {
188: status = ParsingStatus.NO_REPLY;
189: statusAttribute.set(connection, status);
190: break;
191: } else {
192: throw new IOException("invalid op: " + op);
193: }
194: }
195: keyLength = input.getShort();
196:• if (keyLength < 0) {
197: throw new IOException("invalid key length: " + keyLength);
198: }
199: response.setKeyLength(keyLength);
200: extraLength = input.get();
201:• if (extraLength < 0) {
202: throw new IOException("invalid extra length: " + extraLength);
203: }
204: response.setExtraLength(extraLength);
205: response.setDataType(input.get());
206: response.setStatus(ResponseStatus.getResponseStatus(input.getShort()));
207: totalBodyLength = input.getInt();
208:• if (totalBodyLength < 0) {
209: throw new IOException("invalid total body length: " + totalBodyLength);
210: }
211: response.setTotalBodyLength(totalBodyLength);
212: final int opaque = input.getInt();
213:• if (sentRequest.isNoReply() && opaque != sentRequest.getOpaque()) {
214: status = ParsingStatus.NO_REPLY;
215: statusAttribute.set(connection, status);
216: break;
217: } else {
218: response.setOpaque(opaque);
219: }
220: response.setCas(input.getLong());
221:
222: status = ParsingStatus.READ_EXTRAS;
223: statusAttribute.set(connection, status);
224: break;
225: case READ_EXTRAS:
226: extraLength = response.getExtraLength();
227:• if (input.remaining() < extraLength) {
228: return ctx.getStopAction(input);
229: }
230:• if (extraLength == 4) {
231: response.setFlags(input.getInt());
232: } else {
233: input.position(input.position() + extraLength); // skip
234: }
235:
236: status = ParsingStatus.READ_KEY;
237: statusAttribute.set(connection, status);
238: break;
239: case READ_KEY:
240: keyLength = response.getKeyLength();
241:• if (input.remaining() < keyLength) {
242: return ctx.getStopAction(input);
243: }
244:• if (keyLength > 0) {
245: final int currentPosition = input.position();
246: final int limit = currentPosition + keyLength;
247: response.setDecodedKey(input, currentPosition, limit, memoryManager);
248: input.position(limit);
249: } else {
250: response.setDecodedKey(null);
251: }
252:
253: status = ParsingStatus.READ_VALUE;
254: statusAttribute.set(connection, status);
255: break;
256: case READ_VALUE:
257: totalBodyLength = response.getTotalBodyLength();
258: keyLength = response.getKeyLength();
259: extraLength = response.getExtraLength();
260: valueLength = totalBodyLength - keyLength - extraLength;
261:• if (valueLength < 0) {
262: throw new IOException("invalid length fields: "
263: + "total body length=" + totalBodyLength
264: + ", key length = " + keyLength
265: + ", extra length = " + extraLength);
266: }
267:• if (input.remaining() < valueLength) {
268: return ctx.getStopAction(input);
269: }
270:
271: final int currentPosition = input.position();
272: final int limit = currentPosition + valueLength;
273:• if (response.getStatus() == ResponseStatus.No_Error) {
274:• if (valueLength > 0) {
275: sentRequest = requestQueue.peek();
276:• if (sentRequest == null) {
277: throw new IOException("invalid response");
278: }
279: response.setDecodedValue(input, currentPosition, limit, memoryManager);
280: input.position(limit);
281: } else {
282: response.setDecodedValue(null);
283: }
284: } else {
285: response.setDecodedValue(null);
286: input.position(limit);
287: }
288:
289: status = ParsingStatus.DONE;
290: statusAttribute.set(connection, status);
291: break;
292: case DONE:
293: final boolean complete = response.complete();
294:• if (complete) {
295: sentRequest = requestQueue.remove();
296: response.setResult(sentRequest.getOriginKey(), ParsingStatus.DONE);
297:• if (sentRequest.disposed.compareAndSet(false, true)) {
298: sentRequest.response = response.getResult();
299: sentRequest.isError = response.isError();
300: sentRequest.notify.countDown();
301: }
302: } else {
303: sentRequest = requestQueue.peek();
304: response.setResult(sentRequest.getOriginKey(), ParsingStatus.DONE);
305:• if (!sentRequest.disposed.get()) {
306: sentRequest.response = response.getResult();
307: sentRequest.isError = response.isError();
308: sentRequest.notify.countDown();
309: }
310: }
311:
312:• if (localParsingOptimizing) {
313:• if (input.remaining() > 0) {
314: status = ParsingStatus.NONE;
315: statusAttribute.set(connection, status);
316: response.clear();
317: break;
318: } else {
319: input.tryDispose();
320: statusAttribute.remove(connection);
321: responseAttribute.remove(connection);
322: response.recycle();
323: return ctx.getStopAction();
324: }
325: } else {
326: // Check if the input buffer has more than 1 complete memcached message
327: // If yes - split up the first message and the remainder
328:• final Buffer remainder = input.remaining() > 0 ? input.split(input.position()) : null;
329: input.tryDispose();
330: statusAttribute.remove(connection);
331: responseAttribute.remove(connection);
332: response.recycle();
333:• if (remainder == null) {
334: return ctx.getStopAction();
335: } else {
336: // Instruct FilterChain to store the remainder (if any) and continue execution
337: return ctx.getInvokeAction(remainder);
338: }
339: }
340: case NO_REPLY:
341: // processing next internal memcached request
342: sentRequest = requestQueue.remove();
343: response.setResult(sentRequest.getOriginKey(), ParsingStatus.NO_REPLY);
344: sentRequest.response = response.getResult();
345: sentRequest.isError = Boolean.FALSE;
346: sentRequest.notify.countDown();
347: input.reset();
348:
349: status = ParsingStatus.READ_HEADER;
350: statusAttribute.set(connection, status);
351: response.clear();
352: break;
353: default:
354: throw new IllegalStateException("invalid internal status");
355: }
356: }
357: }
358:
359: @Override
360: public NextAction handleWrite(FilterChainContext ctx) throws IOException {
361: final MemcachedRequest[] requests = ctx.getMessage();
362:• if (requests == null) {
363: throw new IOException("Input message could not be null");
364: }
365: final Connection connection = ctx.getConnection();
366:• if (connection == null) {
367: throw new IOException("connection must not be null. this connection was already closed or not opened");
368: }
369:
370: final BlockingQueue<MemcachedRequest> requestQueue = requestQueueAttribute.get(connection);
371:• if (requestQueue == null) {
372: throw new IOException("request queue must not be null. this connection was already closed or not opened. connection=" + connection);
373: }
374: MemoryManager memoryManager = ctx.getMemoryManager();
375:• if (memoryManager == null) {
376: memoryManager = MemoryManager.DEFAULT_MEMORY_MANAGER;
377: }
378:
379: final Buffer resultBuffer;
380:
381:• if (onceAllocationOptimizing) {
382: final int totalSize = calculateTotalPacketSize(requests);
383:• if (totalSize <= MAX_WRITE_BUFFER_SIZE_FOR_OPTIMIZING) {
384: resultBuffer = makePacketsByOnceAllocation(memoryManager, connection, requests, requestQueue, totalSize);
385: } else {
386: resultBuffer = makePackets(memoryManager, connection, requests, requestQueue);
387: }
388: } else {
389: resultBuffer = makePackets(memoryManager, connection, requests, requestQueue);
390: }
391:• if (resultBuffer != null) {
392: resultBuffer.allowBufferDispose(true);
393:• if (resultBuffer.isComposite()) {
394: ((CompositeBuffer) resultBuffer).allowInternalBuffersDispose(true);
395: }
396: ctx.setMessage(resultBuffer);
397: }
398: return ctx.getInvokeAction();
399: }
400:
401: private int calculateTotalPacketSize(final MemcachedRequest[] requests) {
402:• if (requests == null) {
403: return 0;
404: }
405: int totalSize = requests.length * HEADER_LENGTH;
406:• for (MemcachedRequest request : requests) {
407: totalSize += request.getExtrasLength();
408: totalSize += request.getKeyLength();
409: totalSize += request.getValueLength();
410: }
411: return totalSize;
412: }
413:
414: private Buffer makePacketsByOnceAllocation(final MemoryManager memoryManager,
415: final Connection connection,
416: final MemcachedRequest[] requests,
417: final BlockingQueue<MemcachedRequest> requestQueue,
418: final int totalSize) throws IOException {
419:• if (memoryManager == null) {
420: throw new IllegalArgumentException("memory manager must not be null");
421: }
422:• if (connection == null) {
423: throw new IllegalArgumentException("connection must not be null");
424: }
425:• if (requests == null) {
426: throw new IllegalArgumentException("requests must not be null");
427: }
428:• if (requestQueue == null) {
429: throw new IllegalArgumentException("request queue must not be null");
430: }
431:• if (totalSize < HEADER_LENGTH) {
432: throw new IllegalArgumentException("invalid packet size");
433: }
434:
435: final Buffer buffer = memoryManager.allocate(totalSize);
436:• for (MemcachedRequest request : requests) {
437: // header
438: final byte extrasLength = request.getExtrasLength();
439: buffer.put(REQUEST_MAGIC_NUMBER);
440: buffer.put(request.getOp().opcode());
441: final short keyLength = request.getKeyLength();
442: buffer.putShort(keyLength);
443: buffer.put(extrasLength);
444: buffer.put(request.getDataType());
445: buffer.putShort(request.getvBucketId());
446: final int totalLength = keyLength + request.getValueLength() + extrasLength;
447: buffer.putInt(totalLength);
448: buffer.putInt(request.getOpaque());
449: buffer.putLong(request.getCas());
450:
451: // extras
452: request.fillExtras(buffer);
453:
454: // key
455: final Buffer keyBuffer = request.getKey();
456:• if (request.hasKey() && keyBuffer != null) {
457: buffer.put(keyBuffer);
458: keyBuffer.tryDispose();
459: }
460:
461: // value
462: final Buffer valueBuffer = request.getValue();
463:• if (request.hasValue() && valueBuffer != null) {
464: buffer.put(valueBuffer);
465: valueBuffer.tryDispose();
466: }
467: // store request
468: try {
469: requestQueue.put(request);
470: } catch (InterruptedException ie) {
471: Thread.currentThread().interrupt();
472: throw new IOException("failed to put the request", ie);
473: }
474: }
475: buffer.flip();
476: return buffer;
477: }
478:
479: private Buffer makePackets(final MemoryManager memoryManager,
480: final Connection connection,
481: final MemcachedRequest[] requests,
482: final BlockingQueue<MemcachedRequest> requestQueue) throws IOException {
483:• if (memoryManager == null) {
484: throw new IllegalArgumentException("memory manager must not be null");
485: }
486:• if (connection == null) {
487: throw new IllegalArgumentException("connection must not be null");
488: }
489:• if (requests == null) {
490: throw new IllegalArgumentException("requests must not be null");
491: }
492:• if (requestQueue == null) {
493: throw new IllegalArgumentException("request queue must not be null");
494: }
495: Buffer resultBuffer = null;
496:• for (MemcachedRequest request : requests) {
497: // header
498: final byte extrasLength = request.getExtrasLength();
499: final Buffer buffer = memoryManager.allocate(HEADER_LENGTH + extrasLength);
500: buffer.put(REQUEST_MAGIC_NUMBER);
501: buffer.put(request.getOp().opcode());
502:
503: final short keyLength = request.getKeyLength();
504: buffer.putShort(keyLength);
505: buffer.put(extrasLength);
506: buffer.put(request.getDataType());
507: buffer.putShort(request.getvBucketId());
508: final int totalLength = keyLength + request.getValueLength() + extrasLength;
509: buffer.putInt(totalLength);
510: buffer.putInt(request.getOpaque());
511: buffer.putLong(request.getCas());
512:
513: // extras
514: request.fillExtras(buffer);
515:
516: buffer.flip();
517: buffer.allowBufferDispose(true);
518:• if (resultBuffer == null) {
519: resultBuffer = buffer;
520: } else {
521: resultBuffer = Buffers.appendBuffers(memoryManager, resultBuffer, buffer);
522: }
523:
524: // key
525: final Buffer keyBuffer = request.getKey();
526:• if (request.hasKey() && keyBuffer != null) {
527: keyBuffer.allowBufferDispose(true);
528: resultBuffer = Buffers.appendBuffers(memoryManager, resultBuffer, keyBuffer);
529: }
530:
531: // value
532: final Buffer valueBuffer = request.getValue();
533:• if (request.hasValue() && valueBuffer != null) {
534: valueBuffer.allowBufferDispose(true);
535: resultBuffer = Buffers.appendBuffers(memoryManager, resultBuffer, valueBuffer);
536: }
537:
538: // store request
539: try {
540: requestQueue.put(request);
541: } catch (InterruptedException ie) {
542: Thread.currentThread().interrupt();
543: throw new IOException("failed to put the request", ie);
544: }
545: }
546: return resultBuffer;
547: }
548:
549: @SuppressWarnings("unchecked")
550: @Override
551: public NextAction handleClose(FilterChainContext ctx) throws IOException {
552: final Connection connection = ctx.getConnection();
553:• if (connection != null) {
554: final BlockingQueue<MemcachedRequest> requestQueue = requestQueueAttribute.get(connection);
555:• if (requestQueue != null) {
556: requestQueue.clear();
557: requestQueueAttribute.remove(connection);
558: }
559: responseAttribute.remove(connection);
560: statusAttribute.remove(connection);
561:
562: final ObjectPool connectionPool = connectionPoolAttribute.remove(connection);
563:• if (connectionPool != null) {
564: try {
565: connectionPool.removeObject(connection.getPeerAddress(), connection);
566:• if (logger.isLoggable(Level.FINE)) {
567: logger.log(Level.FINE, "the connection has been removed in pool. connection={0}", connection);
568: }
569: } catch (Exception ignore) {
570: }
571: }
572: }
573: return ctx.getInvokeAction();
574: }
575:
576: @SuppressWarnings("unchecked")
577: public <K, V> Map<K, V> getMultiResponse(final Connection connection,
578: final MemcachedRequest[] requests,
579: final long timeoutInMillis,
580: final Map<K, V> result) throws InterruptedException, TimeoutException {
581:• if (connection == null) {
582: throw new IllegalArgumentException("connection must not be null");
583: }
584:• if (requests == null) {
585: throw new IllegalArgumentException("requests must not be null");
586: }
587: final int requestLen = requests.length;
588:• if (requestLen < 1) {
589: throw new IllegalArgumentException("requests must include at least one request");
590: }
591:• if (result == null) {
592: throw new IllegalArgumentException("result must not be null");
593: }
594:
595: Object response;
596: Boolean isError;
597: final int lastIndex = requestLen - 1;
598: // wait for receiving last packet
599:• if (timeoutInMillis < 0) {
600: requests[lastIndex].notify.await();
601: response = requests[lastIndex].response;
602: isError = requests[lastIndex].isError;
603: } else {
604: requests[lastIndex].notify.await(timeoutInMillis, TimeUnit.MILLISECONDS);
605: response = requests[lastIndex].response;
606: isError = requests[lastIndex].isError;
607: }
608:• if (response == null && isError == null) {
609: throw new TimeoutException("timed out while getting the response");
610: }
611:• if (isError != null && !isError) {
612: result.put((K) requests[lastIndex].getOriginKey(), (V) response);
613: } else {
614:• if (logger.isLoggable(Level.FINE)) {
615: logger.log(Level.FINE, "error status op={0}, key={1}",
616: new Object[]{requests[lastIndex].getOp(), requests[lastIndex].getOriginKey()});
617: }
618: }
619: // collect previous packets
620:• for (int i = 0; i < requestLen - 1; i++) {
621: response = requests[i].response;
622: isError = requests[i].isError;
623:• if (response != null) {
624:• if (isError != null && !isError) {
625: result.put((K) requests[i].getOriginKey(), (V) response);
626: } else {
627:• if (logger.isLoggable(Level.FINE)) {
628: logger.log(Level.FINE, "error status op={0}, key={1}",
629: new Object[]{requests[i].getOp(), requests[i].getOriginKey()});
630: }
631: }
632: }
633: }
634: return result;
635: }
636:
637: @SuppressWarnings("unchecked")
638: public <V> V getCorrelatedResponse(final Connection connection,
639: final MemcachedRequest request,
640: final long timeoutInMillis) throws InterruptedException, TimeoutException {
641:• if (connection == null) {
642: throw new IllegalArgumentException("connection must not be null");
643: }
644:• if (request == null) {
645: throw new IllegalArgumentException("request must not be null");
646: }
647:• if (request.isNoReply()) {
648: throw new IllegalArgumentException("request type is no reply");
649: }
650:
651: final Object response;
652: final Boolean isError;
653:• if (timeoutInMillis < 0) {
654: request.notify.await();
655: response = request.response;
656: isError = request.isError;
657: } else {
658: request.notify.await(timeoutInMillis, TimeUnit.MILLISECONDS);
659: response = request.response;
660: isError = request.isError;
661: }
662:
663:• if (response == null && isError == null) {
664: throw new TimeoutException("timed out while getting the response");
665: }
666: final V result;
667:• if (isError != null && !isError) {
668: result = (V) response;
669: } else {
670: result = null;
671:• if (logger.isLoggable(Level.FINE)) {
672: logger.log(Level.FINE, "error status op={0}, key={1}", new Object[]{request.getOp(), request.getOriginKey()});
673: }
674: }
675: return result;
676: }
677: }