Skip to content

Package: GrizzlyMemcachedCache$Builder

GrizzlyMemcachedCache$Builder

name | instruction | branch | complexity | line | method
GrizzlyMemcachedCache.Builder(String, GrizzlyMemcachedCacheManager, TCPNIOTransport)
M: 58 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 19 C: 0
0%
M: 1 C: 0
0%
allowDisposableConnection(boolean)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
borrowValidation(boolean)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
build()
M: 22 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 6 C: 0
0%
M: 1 C: 0
0%
connectTimeoutInMillis(long)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
failover(boolean)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
healthMonitorIntervalInSecs(long)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
keepAliveTimeoutInSecs(long)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
maxConnectionPerServer(int)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
minConnectionPerServer(int)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
preferRemoteConfig(boolean)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
responseTimeoutInMillis(long)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
retryCount(int)
M: 2 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
returnValidation(boolean)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
servers(Set)
M: 12 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
writeTimeoutInMillis(long)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*
2: * Copyright (c) 2012, 2017 Oracle and/or its affiliates. All rights reserved.
3: *
4: * This program and the accompanying materials are made available under the
5: * terms of the Eclipse Public License v. 2.0, which is available at
6: * http://www.eclipse.org/legal/epl-2.0.
7: *
8: * This Source Code may also be made available under the following Secondary
9: * Licenses when the conditions for such availability set forth in the
10: * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
11: * version 2 with the GNU Classpath Exception, which is available at
12: * https://www.gnu.org/software/classpath/license.html.
13: *
14: * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
15: */
16:
17: package org.glassfish.grizzly.memcached;
18:
19: import org.glassfish.grizzly.Buffer;
20: import org.glassfish.grizzly.CompletionHandler;
21: import org.glassfish.grizzly.Connection;
22: import org.glassfish.grizzly.ConnectorHandler;
23: import org.glassfish.grizzly.Grizzly;
24: import org.glassfish.grizzly.GrizzlyFuture;
25: import org.glassfish.grizzly.Processor;
26: import org.glassfish.grizzly.WriteResult;
27: import org.glassfish.grizzly.attributes.Attribute;
28: import org.glassfish.grizzly.attributes.AttributeHolder;
29: import org.glassfish.grizzly.filterchain.FilterChain;
30: import org.glassfish.grizzly.memcached.pool.BaseObjectPool;
31: import org.glassfish.grizzly.memcached.pool.NoValidObjectException;
32: import org.glassfish.grizzly.memcached.pool.ObjectPool;
33: import org.glassfish.grizzly.memcached.pool.PoolExhaustedException;
34: import org.glassfish.grizzly.memcached.pool.PoolableObjectFactory;
35: import org.glassfish.grizzly.memcached.zookeeper.BarrierListener;
36: import org.glassfish.grizzly.memcached.zookeeper.CacheServerListBarrierListener;
37: import org.glassfish.grizzly.memcached.zookeeper.PreferRemoteConfigBarrierListener;
38: import org.glassfish.grizzly.memcached.zookeeper.ZKClient;
39: import org.glassfish.grizzly.memcached.zookeeper.ZooKeeperSupportCache;
40: import org.glassfish.grizzly.nio.transport.TCPNIOConnectorHandler;
41: import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
42:
43: import java.io.UnsupportedEncodingException;
44: import java.net.SocketAddress;
45: import java.util.ArrayList;
46: import java.util.Collections;
47: import java.util.HashMap;
48: import java.util.HashSet;
49: import java.util.List;
50: import java.util.Map;
51: import java.util.Set;
52: import java.util.concurrent.ConcurrentHashMap;
53: import java.util.concurrent.ExecutionException;
54: import java.util.concurrent.Executors;
55: import java.util.concurrent.Future;
56: import java.util.concurrent.ScheduledExecutorService;
57: import java.util.concurrent.ScheduledFuture;
58: import java.util.concurrent.TimeUnit;
59: import java.util.concurrent.TimeoutException;
60: import java.util.concurrent.atomic.AtomicBoolean;
61: import java.util.concurrent.atomic.AtomicInteger;
62: import java.util.logging.Level;
63: import java.util.logging.Logger;
64:
65: /**
66: * The implementation of the {@link MemcachedCache} based on Grizzly
67: * <p>
68: * Basically, this class uses {@link BaseObjectPool} for pooling connections to the memcached server
69: * and {@link ConsistentHashStore} for selecting the memcached server corresponding to the given key.
70: * <p>
71: * When a Cache operation is called,
72: * 1. finding the correct server by consistent hashing
73: * 2. borrowing the connection from the connection pool
74: * 3. queueing request and sending packets to the memcached server and waiting for notification
75: * 4. being woken by the Grizzly filter when the response has arrived
76: * 5. returning the connection to the pool
77: * <p>
78: * For the failback of the memcached server, {@link HealthMonitorTask} will be scheduled by {@code healthMonitorIntervalInSecs}.
79: * If connecting and writing are failed, this cache retries failure operations by {@code retryCount}.
80: * If retrials also failed, the server will be regarded as not valid and removed in {@link ConsistentHashStore}.
81: * Sometimes, automatic changes of the server list can cause stale cache data at runtime.
82: * So this cache provides {@code failover} flag which can turn off the failover/failback.
83: * <p>
84: * This cache also supports bulk operations such as {@link #setMulti} as well as {@link #getMulti}.
85: * <p>
86: * Example of use:
87: * {@code
88: * // creates a CacheManager
89: * final GrizzlyMemcachedCacheManager manager = new GrizzlyMemcachedCacheManager.Builder().build();
90: *
91: * // creates a CacheBuilder
92: * final GrizzlyMemcachedCache.Builder<String, String> builder = manager.createCacheBuilder("USER");
93: * // sets initial servers
94: * builder.servers(initServerSet);
95: * // creates the specific Cache
96: * final MemcachedCache<String, String> userCache = builder.build();
97: *
98: * // if you need to add another server
99: * userCache.addServer(anotherServerAddress);
100: *
101: * // cache operations
102: * boolean result = userCache.set("name", "foo", expirationTimeoutInSec, false);
103: * String value = userCache.get("name", false);
104: * // ...
105: *
106: * // shuts down
107: * manager.shutdown();
108: * }
109: *
110: * @author Bongjae Chang
111: */
112: public class GrizzlyMemcachedCache<K, V> implements MemcachedCache<K, V>, ZooKeeperSupportCache {
113:
114: private static final Logger logger = Grizzly.logger(GrizzlyMemcachedCache.class);
115:
116: private static final AtomicInteger opaqueIndex = new AtomicInteger();
117:
118: private final String cacheName;
119: private final TCPNIOTransport transport;
120: private final long connectTimeoutInMillis;
121: private final long writeTimeoutInMillis;
122: private final long responseTimeoutInMillis;
123:
124: public static final String CONNECTION_POOL_ATTRIBUTE_NAME = "GrizzlyMemcachedCache.ConnectionPool";
125: private final Attribute<ObjectPool<SocketAddress, Connection<SocketAddress>>> connectionPoolAttribute =
126: Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(CONNECTION_POOL_ATTRIBUTE_NAME);
127: private final ObjectPool<SocketAddress, Connection<SocketAddress>> connectionPool;
128:
129: private final Set<SocketAddress> servers;
130:
131: private final long healthMonitorIntervalInSecs;
132: private final ScheduledFuture<?> scheduledFuture;
133: private final HealthMonitorTask healthMonitorTask;
134: private final ScheduledExecutorService scheduledExecutor;
135:
136: private final boolean failover;
137:
138: private final boolean preferRemoteConfig;
139:
140: private final ConsistentHashStore<SocketAddress> consistentHash = new ConsistentHashStore<SocketAddress>();
141:
142: private final ZKClient zkClient;
143: private final CacheServerListBarrierListener zkListener;
144: private String zooKeeperServerListPath;
145:
146: private MemcachedClientFilter clientFilter;
147:
/**
 * Creates the cache from the settings collected by the given builder.
 * <p>
 * The constructor copies the timeouts and flags from the builder, builds the connection pool
 * (whose factory opens, validates and destroys connections to the memcached servers),
 * schedules the {@link HealthMonitorTask} when failover is enabled and the monitoring interval
 * is positive, and creates the ZooKeeper barrier listener matching {@code preferRemoteConfig}.
 *
 * @param builder the builder holding the configuration of this cache
 */
private GrizzlyMemcachedCache(Builder<K, V> builder) {
    this.cacheName = builder.cacheName;
    this.transport = builder.transport;
    this.connectTimeoutInMillis = builder.connectTimeoutInMillis;
    this.writeTimeoutInMillis = builder.writeTimeoutInMillis;
    this.responseTimeoutInMillis = builder.responseTimeoutInMillis;
    this.healthMonitorIntervalInSecs = builder.healthMonitorIntervalInSecs;

    @SuppressWarnings("unchecked")
    final BaseObjectPool.Builder<SocketAddress, Connection<SocketAddress>> connectionPoolBuilder =
            new BaseObjectPool.Builder<SocketAddress, Connection<SocketAddress>>(new PoolableObjectFactory<SocketAddress, Connection<SocketAddress>>() {
                @Override
                public Connection<SocketAddress> createObject(final SocketAddress key) throws Exception {
                    final ConnectorHandler<SocketAddress> connectorHandler =
                            TCPNIOConnectorHandler.builder(transport).setReuseAddress(true).build();
                    final Future<Connection> future = connectorHandler.connect(key);
                    final Connection<SocketAddress> connection;
                    try {
                        // a negative timeout means "wait until the connect attempt completes"
                        if (connectTimeoutInMillis < 0) {
                            connection = future.get();
                        } else {
                            connection = future.get(connectTimeoutInMillis, TimeUnit.MILLISECONDS);
                        }
                    } catch (InterruptedException ie) {
                        closeFailedConnect(future, key, ie);
                        throw ie;
                    } catch (ExecutionException ee) {
                        closeFailedConnect(future, key, ee);
                        throw ee;
                    } catch (TimeoutException te) {
                        closeFailedConnect(future, key, te);
                        throw te;
                    }
                    if (connection != null) {
                        // remember the owning pool on the connection itself
                        connectionPoolAttribute.set(connection, connectionPool);
                        return connection;
                    } else {
                        throw new IllegalStateException("connection must not be null");
                    }
                }

                @Override
                public void destroyObject(final SocketAddress key, final Connection<SocketAddress> value) throws Exception {
                    if (value != null) {
                        if (value.isOpen()) {
                            value.closeSilently();
                        }
                        final AttributeHolder attributeHolder = value.getAttributes();
                        if (attributeHolder != null) {
                            attributeHolder.removeAttribute(CONNECTION_POOL_ATTRIBUTE_NAME);
                        }
                        if (logger.isLoggable(Level.FINEST)) {
                            logger.log(Level.FINEST, "the connection has been destroyed. key={0}, value={1}", new Object[]{key, value});
                        }
                    }
                }

                @Override
                public boolean validateObject(final SocketAddress key, final Connection<SocketAddress> value) throws Exception {
                    return GrizzlyMemcachedCache.this.validateConnectionWithNoopCommand(value);
                    // or return GrizzlyMemcachedCache.this.validateConnectionWithVersionCommand(value);
                }
            });
    connectionPoolBuilder.min(builder.minConnectionPerServer);
    connectionPoolBuilder.max(builder.maxConnectionPerServer);
    connectionPoolBuilder.keepAliveTimeoutInSecs(builder.keepAliveTimeoutInSecs);
    connectionPoolBuilder.disposable(builder.allowDisposableConnection);
    connectionPoolBuilder.borrowValidation(builder.borrowValidation);
    connectionPoolBuilder.returnValidation(builder.returnValidation);
    connectionPool = connectionPoolBuilder.build();

    this.failover = builder.failover;

    this.servers = builder.servers;

    // the health monitor only runs when failover is on and a positive interval is configured
    if (failover && healthMonitorIntervalInSecs > 0) {
        healthMonitorTask = new HealthMonitorTask();
        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        scheduledFuture = scheduledExecutor.scheduleWithFixedDelay(healthMonitorTask, healthMonitorIntervalInSecs, healthMonitorIntervalInSecs, TimeUnit.SECONDS);
    } else {
        healthMonitorTask = null;
        scheduledExecutor = null;
        scheduledFuture = null;
    }

    this.preferRemoteConfig = builder.preferRemoteConfig;
    if (this.preferRemoteConfig) {
        this.zkListener = new PreferRemoteConfigBarrierListener(this, servers);
    } else {
        this.zkListener = new CacheServerListBarrierListener(this, servers);
    }
    this.zkClient = builder.zkClient;
}

/**
 * Cleans up after a connect attempt that ended with an exception.
 * <p>
 * The pending attempt is cancelled; if it could not be cancelled because it had already
 * completed, a connection it may have produced is closed. Any failure while fetching that
 * connection is ignored so that the caller can always log and rethrow the original exception.
 *
 * @param future the future returned by the connector handler
 * @param key    the address that was being connected to (used for logging only)
 * @param cause  the exception that made the connect attempt fail
 */
private static void closeFailedConnect(final Future<Connection> future, final SocketAddress key, final Exception cause) {
    if (!future.cancel(false) && future.isDone()) {
        try {
            final Connection c = future.get();
            if (c != null && c.isOpen()) {
                c.closeSilently();
            }
        } catch (InterruptedException ignored) {
            // keep the interrupt visible to the caller; there is nothing to close
            Thread.currentThread().interrupt();
        } catch (Exception ignored) {
            // the attempt completed exceptionally, so there is no connection to close
        }
    }
    if (logger.isLoggable(Level.FINER)) {
        logger.log(Level.FINER, "failed to get the connection. address=" + key, cause);
    }
}
265:
266: /**
267: * {@inheritDoc}
268: */
269: @Override
270: public void start() {
271: final Processor processor = transport.getProcessor();
272: if (!(processor instanceof FilterChain)) {
273: throw new IllegalStateException("transport's processor has to be a FilterChain");
274: }
275: final FilterChain filterChain = (FilterChain) processor;
276: final int idx = filterChain.indexOfType(MemcachedClientFilter.class);
277: if (idx == -1) {
278: throw new IllegalStateException("transport has to have MemcachedClientFilter in the FilterChain");
279: }
280: clientFilter = (MemcachedClientFilter) filterChain.get(idx);
281: if (clientFilter == null) {
282: throw new IllegalStateException("MemcachedClientFilter should not be null");
283: }
284: if (zkClient != null) {
285: if (!preferRemoteConfig) {
286: for (final SocketAddress address : servers) {
287: addServer(address);
288: }
289: } else {
290: if (logger.isLoggable(Level.INFO)) {
291: logger.log(Level.INFO, "local config has been ignored because preferRemoteConfig is true. servers={0}", servers);
292: }
293: }
294: // need to initialize the remote server with local initalServers if the remote server data is empty?
295: // currently, do nothing
296: zooKeeperServerListPath = zkClient.registerBarrier(cacheName, zkListener, null);
297: } else {
298: for (final SocketAddress address : servers) {
299: addServer(address);
300: }
301: }
302: }
303:
304: /**
305: * {@inheritDoc}
306: */
307: @Override
308: public void stop() {
309: if (scheduledFuture != null) {
310: scheduledFuture.cancel(true);
311: }
312: if (scheduledExecutor != null) {
313: scheduledExecutor.shutdown();
314: }
315: servers.clear();
316: consistentHash.clear();
317: if (connectionPool != null) {
318: connectionPool.destroy();
319: }
320: if (zkClient != null) {
321: zkClient.unregisterBarrier(cacheName);
322: }
323: }
324:
325: /**
326: * {@inheritDoc}
327: */
328: @SuppressWarnings("unchecked")
329: @Override
330: public boolean addServer(final SocketAddress serverAddress) {
331: return addServer(serverAddress, true);
332: }
333:
334: @SuppressWarnings("unchecked")
335: private boolean addServer(final SocketAddress serverAddress, boolean initial) {
336: if (serverAddress == null) {
337: return true;
338: }
339: if (connectionPool != null) {
340: try {
341: connectionPool.createAllMinObjects(serverAddress);
342: } catch (Exception e) {
343: if (logger.isLoggable(Level.SEVERE)) {
344: logger.log(Level.SEVERE, "failed to create min connections in the pool. address=" + serverAddress, e);
345: }
346: try {
347: connectionPool.destroy(serverAddress);
348: } catch (Exception ignore) {
349: }
350: if (!initial) {
351: return false;
352: }
353: }
354: }
355: consistentHash.add(serverAddress);
356: servers.add(serverAddress);
357:
358: if (logger.isLoggable(Level.INFO)) {
359: logger.log(Level.INFO, "added the server to the consistent hash successfully. address={0}", serverAddress);
360: }
361: return true;
362: }
363:
364: /**
365: * {@inheritDoc}
366: */
367: @Override
368: public void removeServer(final SocketAddress serverAddress) {
369: removeServer(serverAddress, true);
370: }
371:
372: private void removeServer(final SocketAddress serverAddress, final boolean forcibly) {
373: if (serverAddress == null) {
374: return;
375: }
376: if (!forcibly) {
377: if (healthMonitorTask != null && healthMonitorTask.failure(serverAddress)) {
378: consistentHash.remove(serverAddress);
379: servers.remove(serverAddress);
380: if (logger.isLoggable(Level.INFO)) {
381: logger.log(Level.INFO, "removed the server from the consistent hash successfully. address={0}", serverAddress);
382: }
383: }
384: } else {
385: consistentHash.remove(serverAddress);
386: servers.remove(serverAddress);
387: if (logger.isLoggable(Level.INFO)) {
388: logger.log(Level.INFO, "removed the server from the consistent hash successfully. address={0}", serverAddress);
389: }
390: }
391: if (connectionPool != null) {
392: try {
393: connectionPool.destroy(serverAddress);
394: if (logger.isLoggable(Level.INFO)) {
395: logger.log(Level.INFO, "removed the server in the pool successfully. address={0}", serverAddress);
396: }
397: } catch (Exception e) {
398: if (logger.isLoggable(Level.WARNING)) {
399: logger.log(Level.WARNING, "failed to remove connections in the pool", e);
400: }
401: }
402: }
403: }
404:
405: /**
406: * {@inheritDoc}
407: */
408: @Override
409: public boolean isInServerList(final SocketAddress serverAddress) {
410: return consistentHash.hasValue(serverAddress);
411: }
412:
413: /**
414: * {@inheritDoc}
415: */
416: @Override
417: public List<SocketAddress> getCurrentServerList() {
418: if (!servers.isEmpty()) {
419: return new ArrayList<SocketAddress>(servers);
420: } else {
421: return Collections.emptyList();
422: }
423: }
424:
425: /**
426: * {@inheritDoc}
427: */
428: @Override
429: public boolean isZooKeeperSupported() {
430: return zkClient != null;
431: }
432:
433: /**
434: * {@inheritDoc}
435: */
436: @Override
437: public String getZooKeeperServerListPath() {
438: if (!isZooKeeperSupported()) {
439: return null;
440: }
441: return zooKeeperServerListPath;
442: }
443:
444: /**
445: * {@inheritDoc}
446: */
447: @Override
448: public String getCurrentServerListFromZooKeeper() {
449: if (!isZooKeeperSupported()) {
450: return null;
451: }
452: final byte[] serverListBytes = zkClient.getData(zooKeeperServerListPath, null);
453: if (serverListBytes == null) {
454: return null;
455: }
456: final String serverListString;
457: try {
458: serverListString = new String(serverListBytes, CacheServerListBarrierListener.DEFAULT_SERVER_LIST_CHARSET);
459: } catch (UnsupportedEncodingException e) {
460: if (logger.isLoggable(Level.WARNING)) {
461: logger.log(Level.WARNING, "failed to decode the server list bytes");
462: }
463: return null;
464: }
465: return serverListString;
466: }
467:
468: /**
469: * {@inheritDoc}
470: */
471: @Override
472: public boolean setCurrentServerListOfZooKeeper(final String cacheServerList) {
473: if (!isZooKeeperSupported()) {
474: return false;
475: }
476: if (cacheServerList == null) {
477: return false;
478: }
479: final byte[] serverListBytes;
480: try {
481: serverListBytes = cacheServerList.getBytes(CacheServerListBarrierListener.DEFAULT_SERVER_LIST_CHARSET);
482: } catch (UnsupportedEncodingException e) {
483: if (logger.isLoggable(Level.WARNING)) {
484: logger.log(Level.WARNING, "failed to eecode the server list");
485: }
486: return false;
487: }
488: return zkClient.setData(zooKeeperServerListPath, serverListBytes, -1) != null;
489: }
490:
491: /**
492: * {@inheritDoc}
493: */
494: @Override
495: public void addZooKeeperListener(final BarrierListener listener) {
496: if (!isZooKeeperSupported()) {
497: return;
498: }
499: zkListener.addCustomListener(listener);
500: }
501:
502: /**
503: * {@inheritDoc}
504: */
505: @Override
506: public void removeZooKeeperListener(final BarrierListener listener) {
507: if (!isZooKeeperSupported()) {
508: return;
509: }
510: zkListener.removeCustomListener(listener);
511: }
512:
513: /**
514: * {@inheritDoc}
515: */
516: @Override
517: public String getName() {
518: return cacheName;
519: }
520:
521: @Override
522: public boolean set(final K key, final V value, final int expirationInSecs, final boolean noReply) {
523: return set(key, value, expirationInSecs, noReply, writeTimeoutInMillis, responseTimeoutInMillis);
524:
525: }
526:
527: @Override
528: public boolean set(final K key, final V value, final int expirationInSecs, final boolean noReply, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
529: if (key == null || value == null) {
530: return false;
531: }
532: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(true, true, true);
533: builder.op(noReply ? CommandOpcodes.SetQ : CommandOpcodes.Set);
534: builder.noReply(noReply);
535: builder.opaque(noReply ? generateOpaque() : 0);
536: builder.originKey(key);
537: final BufferWrapper<K> keyWrapper = BufferWrapper.wrap(key, transport.getMemoryManager());
538: final Buffer keyBuffer = keyWrapper.getBuffer();
539: builder.key(keyBuffer);
540: keyWrapper.recycle();
541: final BufferWrapper valueWrapper = BufferWrapper.wrap(value, transport.getMemoryManager());
542: builder.value(valueWrapper.getBuffer());
543: builder.flags(valueWrapper.getType().flags);
544: valueWrapper.recycle();
545: builder.expirationInSecs(expirationInSecs);
546: final MemcachedRequest request = builder.build();
547:
548: final SocketAddress address = consistentHash.get(keyBuffer.toByteBuffer());
549: if (address == null) {
550: builder.recycle();
551: return false;
552: }
553: try {
554: if (noReply) {
555: sendNoReply(address, request);
556: return true;
557: } else {
558: final Object result = send(address, request, writeTimeoutInMillis, responseTimeoutInMillis);
559: if (result instanceof Boolean) {
560: return (Boolean) result;
561: } else {
562: return false;
563: }
564: }
565: } catch (InterruptedException ie) {
566: Thread.currentThread().interrupt();
567: if (logger.isLoggable(Level.SEVERE)) {
568: logger.log(Level.SEVERE, "failed to set. address=" + address + ", request=" + request, ie);
569: }
570: return false;
571: } catch (Exception e) {
572: if (logger.isLoggable(Level.SEVERE)) {
573: logger.log(Level.SEVERE, "failed to set. address=" + address + ", request=" + request, e);
574: }
575: return false;
576: } finally {
577: builder.recycle();
578: }
579: }
580:
581: @Override
582: public Map<K, Boolean> setMulti(final Map<K, V> map, final int expirationInSecs) {
583: return setMulti(map, expirationInSecs, writeTimeoutInMillis, responseTimeoutInMillis);
584: }
585:
586: @Override
587: public Map<K, Boolean> setMulti(final Map<K, V> map, final int expirationInSecs, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
588: final Map<K, Boolean> result = new HashMap<K, Boolean>();
589: if (map == null || map.isEmpty()) {
590: return result;
591: }
592:
593: // categorize keys by address
594: final Map<SocketAddress, List<BufferWrapper<K>>> categorizedMap = new HashMap<SocketAddress, List<BufferWrapper<K>>>();
595: for (K key : map.keySet()) {
596: final BufferWrapper<K> keyWrapper = BufferWrapper.wrap(key, transport.getMemoryManager());
597: final Buffer keyBuffer = keyWrapper.getBuffer();
598: final SocketAddress address = consistentHash.get(keyBuffer.toByteBuffer());
599: if (address == null) {
600: if (logger.isLoggable(Level.WARNING)) {
601: logger.log(Level.WARNING, "failed to get the address from the consistent hash in setMulti(). key buffer={0}", keyBuffer);
602: }
603: keyWrapper.recycle();
604: continue;
605: }
606: List<BufferWrapper<K>> keyList = categorizedMap.get(address);
607: if (keyList == null) {
608: keyList = new ArrayList<BufferWrapper<K>>();
609: categorizedMap.put(address, keyList);
610: }
611: keyList.add(keyWrapper);
612: }
613:
614: // set multi from server
615: for (Map.Entry<SocketAddress, List<BufferWrapper<K>>> entry : categorizedMap.entrySet()) {
616: final SocketAddress address = entry.getKey();
617: final List<BufferWrapper<K>> keyList = entry.getValue();
618: try {
619: sendSetMulti(entry.getKey(), keyList, map, expirationInSecs, writeTimeoutInMillis, responseTimeoutInMillis, result);
620: } catch (InterruptedException ie) {
621: Thread.currentThread().interrupt();
622: if (logger.isLoggable(Level.SEVERE)) {
623: logger.log(Level.SEVERE, "failed to execute setMulti(). address=" + address + ", keySize=" + keyList.size(), ie);
624: } else if (logger.isLoggable(Level.FINER)) {
625: logger.log(Level.FINER, "failed to execute setMulti(). address=" + address + ", keyList=" + keyList, ie);
626: }
627: } catch (Exception e) {
628: if (logger.isLoggable(Level.SEVERE)) {
629: logger.log(Level.SEVERE, "failed to execute setMulti(). address=" + address + ", keySize=" + keyList.size(), e);
630: } else if (logger.isLoggable(Level.FINER)) {
631: logger.log(Level.FINER, "failed to execute setMulti(). address=" + address + ", keyList=" + keyList, e);
632: }
633: } finally {
634: recycleBufferWrappers(keyList);
635: }
636: }
637: return result;
638: }
639:
640: @Override
641: public boolean add(final K key, final V value, final int expirationInSecs, final boolean noReply) {
642: return add(key, value, expirationInSecs, noReply, writeTimeoutInMillis, responseTimeoutInMillis);
643: }
644:
645: @Override
646: public boolean add(final K key, final V value, final int expirationInSecs, final boolean noReply, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
647: if (key == null || value == null) {
648: return false;
649: }
650: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(true, true, true);
651: builder.op(noReply ? CommandOpcodes.AddQ : CommandOpcodes.Add);
652: builder.noReply(noReply);
653: builder.opaque(noReply ? generateOpaque() : 0);
654: builder.originKey(key);
655: final BufferWrapper<K> keyWrapper = BufferWrapper.wrap(key, transport.getMemoryManager());
656: final Buffer keyBuffer = keyWrapper.getBuffer();
657: builder.key(keyBuffer);
658: keyWrapper.recycle();
659: final BufferWrapper valueWrapper = BufferWrapper.wrap(value, transport.getMemoryManager());
660: builder.value(valueWrapper.getBuffer());
661: builder.flags(valueWrapper.getType().flags);
662: valueWrapper.recycle();
663: builder.expirationInSecs(expirationInSecs);
664: final MemcachedRequest request = builder.build();
665:
666: final SocketAddress address = consistentHash.get(keyBuffer.toByteBuffer());
667: if (address == null) {
668: builder.recycle();
669: return false;
670: }
671: try {
672: if (noReply) {
673: sendNoReply(address, request);
674: return true;
675: } else {
676: final Object result = send(address, request, writeTimeoutInMillis, responseTimeoutInMillis);
677: if (result instanceof Boolean) {
678: return (Boolean) result;
679: } else {
680: return false;
681: }
682: }
683: } catch (InterruptedException ie) {
684: Thread.currentThread().interrupt();
685: if (logger.isLoggable(Level.SEVERE)) {
686: logger.log(Level.SEVERE, "failed to add. address=" + address + ", request=" + request, ie);
687: }
688: return false;
689: } catch (Exception e) {
690: if (logger.isLoggable(Level.SEVERE)) {
691: logger.log(Level.SEVERE, "failed to add. address=" + address + ", request=" + request, e);
692: }
693: return false;
694: } finally {
695: builder.recycle();
696: }
697: }
698:
699: @Override
700: public boolean replace(final K key, final V value, final int expirationInSecs, final boolean noReply) {
701: return replace(key, value, expirationInSecs, noReply, writeTimeoutInMillis, responseTimeoutInMillis);
702: }
703:
704: @Override
705: public boolean replace(final K key, final V value, final int expirationInSecs, final boolean noReply, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
706: if (key == null || value == null) {
707: return false;
708: }
709: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(true, true, true);
710: builder.op(noReply ? CommandOpcodes.ReplaceQ : CommandOpcodes.Replace);
711: builder.noReply(noReply);
712: builder.opaque(noReply ? generateOpaque() : 0);
713: builder.originKey(key);
714: final BufferWrapper<K> keyWrapper = BufferWrapper.wrap(key, transport.getMemoryManager());
715: final Buffer keyBuffer = keyWrapper.getBuffer();
716: builder.key(keyBuffer);
717: keyWrapper.recycle();
718: final BufferWrapper valueWrapper = BufferWrapper.wrap(value, transport.getMemoryManager());
719: builder.value(valueWrapper.getBuffer());
720: builder.flags(valueWrapper.getType().flags);
721: valueWrapper.recycle();
722: builder.expirationInSecs(expirationInSecs);
723: final MemcachedRequest request = builder.build();
724:
725: final SocketAddress address = consistentHash.get(keyBuffer.toByteBuffer());
726: if (address == null) {
727: builder.recycle();
728: return false;
729: }
730: try {
731: if (noReply) {
732: sendNoReply(address, request);
733: return true;
734: } else {
735: final Object result = send(address, request, writeTimeoutInMillis, responseTimeoutInMillis);
736: if (result instanceof Boolean) {
737: return (Boolean) result;
738: } else {
739: return false;
740: }
741: }
742: } catch (InterruptedException ie) {
743: Thread.currentThread().interrupt();
744: if (logger.isLoggable(Level.SEVERE)) {
745: logger.log(Level.SEVERE, "failed to replace. address=" + address + ", request=" + request, ie);
746: }
747: return false;
748: } catch (Exception e) {
749: if (logger.isLoggable(Level.SEVERE)) {
750: logger.log(Level.SEVERE, "failed to replace. address=" + address + ", request=" + request, e);
751: }
752: return false;
753: } finally {
754: builder.recycle();
755: }
756: }
757:
758: @Override
759: public boolean cas(final K key, final V value, final int expirationInSecs, final long cas, final boolean noReply) {
760: return cas(key, value, expirationInSecs, cas, noReply, writeTimeoutInMillis, responseTimeoutInMillis);
761: }
762:
763: @Override
764: public boolean cas(final K key, final V value, final int expirationInSecs, final long cas, final boolean noReply, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
765: if (key == null || value == null) {
766: return false;
767: }
768: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(true, true, true);
769: builder.op(noReply ? CommandOpcodes.SetQ : CommandOpcodes.Set);
770: builder.noReply(noReply);
771: builder.opaque(noReply ? generateOpaque() : 0);
772: builder.cas(cas);
773: builder.originKey(key);
774: final BufferWrapper<K> keyWrapper = BufferWrapper.wrap(key, transport.getMemoryManager());
775: final Buffer keyBuffer = keyWrapper.getBuffer();
776: builder.key(keyBuffer);
777: keyWrapper.recycle();
778: final BufferWrapper valueWrapper = BufferWrapper.wrap(value, transport.getMemoryManager());
779: builder.value(valueWrapper.getBuffer());
780: builder.flags(valueWrapper.getType().flags);
781: valueWrapper.recycle();
782: builder.expirationInSecs(expirationInSecs);
783: final MemcachedRequest request = builder.build();
784:
785: final SocketAddress address = consistentHash.get(keyBuffer.toByteBuffer());
786: if (address == null) {
787: builder.recycle();
788: return false;
789: }
790: try {
791: if (noReply) {
792: sendNoReply(address, request);
793: return true;
794: } else {
795: final Object result = send(address, request, writeTimeoutInMillis, responseTimeoutInMillis);
796: if (result instanceof Boolean) {
797: return (Boolean) result;
798: } else {
799: return false;
800: }
801: }
802: } catch (InterruptedException ie) {
803: Thread.currentThread().interrupt();
804: if (logger.isLoggable(Level.SEVERE)) {
805: logger.log(Level.SEVERE, "failed to set with cas. address=" + address + ", request=" + request, ie);
806: }
807: return false;
808: } catch (Exception e) {
809: if (logger.isLoggable(Level.SEVERE)) {
810: logger.log(Level.SEVERE, "failed to set with cas. address=" + address + ", request=" + request, e);
811: }
812: return false;
813: } finally {
814: builder.recycle();
815: }
816: }
817:
818: @Override
819: public Map<K, Boolean> casMulti(final Map<K, ValueWithCas<V>> map, final int expirationInSecs) {
820: return casMulti(map, expirationInSecs, writeTimeoutInMillis, responseTimeoutInMillis);
821: }
822:
823: @Override
824: public Map<K, Boolean> casMulti(final Map<K, ValueWithCas<V>> map, final int expirationInSecs, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
825: final Map<K, Boolean> result = new HashMap<K, Boolean>();
826: if (map == null || map.isEmpty()) {
827: return result;
828: }
829:
830: // categorize keys by address
831: final Map<SocketAddress, List<BufferWrapper<K>>> categorizedMap = new HashMap<SocketAddress, List<BufferWrapper<K>>>();
832: for (K key : map.keySet()) {
833: final BufferWrapper<K> keyWrapper = BufferWrapper.wrap(key, transport.getMemoryManager());
834: final Buffer keyBuffer = keyWrapper.getBuffer();
835: final SocketAddress address = consistentHash.get(keyBuffer.toByteBuffer());
836: if (address == null) {
837: if (logger.isLoggable(Level.WARNING)) {
838: logger.log(Level.WARNING, "failed to get the address from the consistent hash in casMulti(). key buffer={0}", keyBuffer);
839: }
840: keyWrapper.recycle();
841: continue;
842: }
843: List<BufferWrapper<K>> keyList = categorizedMap.get(address);
844: if (keyList == null) {
845: keyList = new ArrayList<BufferWrapper<K>>();
846: categorizedMap.put(address, keyList);
847: }
848: keyList.add(keyWrapper);
849: }
850:
851: // cas multi from server
852: for (Map.Entry<SocketAddress, List<BufferWrapper<K>>> entry : categorizedMap.entrySet()) {
853: final SocketAddress address = entry.getKey();
854: final List<BufferWrapper<K>> keyList = entry.getValue();
855: try {
856: sendCasMulti(entry.getKey(), keyList, map, expirationInSecs, writeTimeoutInMillis, responseTimeoutInMillis, result);
857: } catch (InterruptedException ie) {
858: Thread.currentThread().interrupt();
859: if (logger.isLoggable(Level.SEVERE)) {
860: logger.log(Level.SEVERE, "failed to execute casMulti(). address=" + address + ", keySize=" + keyList.size(), ie);
861: } else if (logger.isLoggable(Level.FINER)) {
862: logger.log(Level.FINER, "failed to execute casMulti(). address=" + address + ", keyList=" + keyList, ie);
863: }
864: } catch (Exception e) {
865: if (logger.isLoggable(Level.SEVERE)) {
866: logger.log(Level.SEVERE, "failed to execute casMulti(). address=" + address + ", keySize=" + keyList.size(), e);
867: } else if (logger.isLoggable(Level.FINER)) {
868: logger.log(Level.FINER, "failed to execute casMulti(). address=" + address + ", keyList=" + keyList, e);
869: }
870: } finally {
871: recycleBufferWrappers(keyList);
872: }
873: }
874: return result;
875: }
876:
877: @Override
878: public boolean append(final K key, final V value, final boolean noReply) {
879: return append(key, value, noReply, writeTimeoutInMillis, responseTimeoutInMillis);
880: }
881:
882: @Override
883: public boolean append(final K key, final V value, final boolean noReply, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
884: if (key == null || value == null) {
885: return false;
886: }
887: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(false, true, true);
888: builder.op(noReply ? CommandOpcodes.AppendQ : CommandOpcodes.Append);
889: builder.noReply(noReply);
890: builder.opaque(noReply ? generateOpaque() : 0);
891: builder.originKey(key);
892: final BufferWrapper<K> keyWrapper = BufferWrapper.wrap(key, transport.getMemoryManager());
893: final Buffer keyBuffer = keyWrapper.getBuffer();
894: builder.key(keyBuffer);
895: keyWrapper.recycle();
896: final BufferWrapper valueWrapper = BufferWrapper.wrap(value, transport.getMemoryManager());
897: builder.value(valueWrapper.getBuffer());
898: valueWrapper.recycle();
899: final MemcachedRequest request = builder.build();
900:
901: final SocketAddress address = consistentHash.get(keyBuffer.toByteBuffer());
902: if (address == null) {
903: builder.recycle();
904: return false;
905: }
906: try {
907: if (noReply) {
908: sendNoReply(address, request);
909: return true;
910: } else {
911: final Object result = send(address, request, writeTimeoutInMillis, responseTimeoutInMillis);
912: if (result instanceof Boolean) {
913: return (Boolean) result;
914: } else {
915: return false;
916: }
917: }
918: } catch (InterruptedException ie) {
919: Thread.currentThread().interrupt();
920: if (logger.isLoggable(Level.SEVERE)) {
921: logger.log(Level.SEVERE, "failed to append. address=" + address + ", request=" + request, ie);
922: }
923: return false;
924: } catch (Exception e) {
925: if (logger.isLoggable(Level.SEVERE)) {
926: logger.log(Level.SEVERE, "failed to append. address=" + address + ", request=" + request, e);
927: }
928: return false;
929: } finally {
930: builder.recycle();
931: }
932: }
933:
934: @Override
935: public boolean prepend(final K key, final V value, final boolean noReply) {
936: return prepend(key, value, noReply, writeTimeoutInMillis, responseTimeoutInMillis);
937: }
938:
939: @Override
940: public boolean prepend(final K key, final V value, final boolean noReply, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
941: if (key == null || value == null) {
942: return false;
943: }
944: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(false, true, true);
945: builder.op(noReply ? CommandOpcodes.PrependQ : CommandOpcodes.Prepend);
946: builder.noReply(noReply);
947: builder.opaque(noReply ? generateOpaque() : 0);
948: builder.originKey(key);
949: final BufferWrapper<K> keyWrapper = BufferWrapper.wrap(key, transport.getMemoryManager());
950: final Buffer keyBuffer = keyWrapper.getBuffer();
951: builder.key(keyBuffer);
952: keyWrapper.recycle();
953: final BufferWrapper valueWrapper = BufferWrapper.wrap(value, transport.getMemoryManager());
954: builder.value(valueWrapper.getBuffer());
955: valueWrapper.recycle();
956: final MemcachedRequest request = builder.build();
957:
958: final SocketAddress address = consistentHash.get(keyBuffer.toByteBuffer());
959: if (address == null) {
960: builder.recycle();
961: return false;
962: }
963: try {
964: if (noReply) {
965: sendNoReply(address, request);
966: return true;
967: } else {
968: final Object result = send(address, request, writeTimeoutInMillis, responseTimeoutInMillis);
969: if (result instanceof Boolean) {
970: return (Boolean) result;
971: } else {
972: return false;
973: }
974: }
975: } catch (InterruptedException ie) {
976: Thread.currentThread().interrupt();
977: if (logger.isLoggable(Level.SEVERE)) {
978: logger.log(Level.SEVERE, "failed to prepend. address=" + address + ", request=" + request, ie);
979: }
980: return false;
981: } catch (Exception e) {
982: if (logger.isLoggable(Level.SEVERE)) {
983: logger.log(Level.SEVERE, "failed to prepend. address=" + address + ", request=" + request, e);
984: }
985: return false;
986: } finally {
987: builder.recycle();
988: }
989: }
990:
991: @Override
992: public Map<K, V> getMulti(final Set<K> keys) {
993: return getMulti(keys, writeTimeoutInMillis, responseTimeoutInMillis);
994: }
995:
996: @Override
997: public Map<K, V> getMulti(final Set<K> keys, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
998: final Map<K, V> result = new HashMap<K, V>();
999: if (keys == null || keys.isEmpty()) {
1000: return result;
1001: }
1002:
1003: // categorize keys by address
1004: final Map<SocketAddress, List<BufferWrapper<K>>> categorizedMap = new HashMap<SocketAddress, List<BufferWrapper<K>>>();
1005: for (K key : keys) {
1006: final BufferWrapper<K> keyWrapper = BufferWrapper.wrap(key, transport.getMemoryManager());
1007: final Buffer keyBuffer = keyWrapper.getBuffer();
1008: final SocketAddress address = consistentHash.get(keyBuffer.toByteBuffer());
1009: if (address == null) {
1010: if (logger.isLoggable(Level.WARNING)) {
1011: logger.log(Level.WARNING, "failed to get the address from the consistent hash in getMulti(). key buffer={0}", keyBuffer);
1012: }
1013: keyWrapper.recycle();
1014: continue;
1015: }
1016: List<BufferWrapper<K>> keyList = categorizedMap.get(address);
1017: if (keyList == null) {
1018: keyList = new ArrayList<BufferWrapper<K>>();
1019: categorizedMap.put(address, keyList);
1020: }
1021: keyList.add(keyWrapper);
1022: }
1023:
1024: // get multi from server
1025: for (Map.Entry<SocketAddress, List<BufferWrapper<K>>> entry : categorizedMap.entrySet()) {
1026: final SocketAddress address = entry.getKey();
1027: final List<BufferWrapper<K>> keyList = entry.getValue();
1028: try {
1029: sendGetMulti(entry.getKey(), keyList, writeTimeoutInMillis, responseTimeoutInMillis, result);
1030: } catch (InterruptedException ie) {
1031: Thread.currentThread().interrupt();
1032: if (logger.isLoggable(Level.SEVERE)) {
1033: logger.log(Level.SEVERE, "failed to execute getMulti(). address=" + address + ", keySize=" + keyList.size(), ie);
1034: } else if (logger.isLoggable(Level.FINER)) {
1035: logger.log(Level.FINER, "failed to execute getMulti(). address=" + address + ", keyList=" + keyList, ie);
1036: }
1037: } catch (Exception e) {
1038: if (logger.isLoggable(Level.SEVERE)) {
1039: logger.log(Level.SEVERE, "failed to execute getMulti(). address=" + address + ", keySize=" + keyList.size(), e);
1040: } else if (logger.isLoggable(Level.FINER)) {
1041: logger.log(Level.FINER, "failed to execute getMulti(). address=" + address + ", keyList=" + keyList, e);
1042: }
1043: } finally {
1044: recycleBufferWrappers(keyList);
1045: }
1046: }
1047: return result;
1048: }
1049:
1050: @Override
1051: public V get(final K key, final boolean noReply) {
1052: return get(key, noReply, writeTimeoutInMillis, responseTimeoutInMillis);
1053: }
1054:
1055: @SuppressWarnings("unchecked")
1056: @Override
1057: public V get(final K key, final boolean noReply, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1058: if (key == null) {
1059: return null;
1060: }
1061: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(false, true, false);
1062: builder.op(noReply ? CommandOpcodes.GetQ : CommandOpcodes.Get);
1063: builder.noReply(false);
1064: builder.opaque(noReply ? generateOpaque() : 0);
1065: builder.originKey(key);
1066: final BufferWrapper<K> keyWrapper = BufferWrapper.wrap(key, transport.getMemoryManager());
1067: final Buffer keyBuffer = keyWrapper.getBuffer();
1068: builder.key(keyBuffer);
1069: keyWrapper.recycle();
1070: final MemcachedRequest request = builder.build();
1071:
1072: final SocketAddress address = consistentHash.get(keyBuffer.toByteBuffer());
1073: if (address == null) {
1074: builder.recycle();
1075: return null;
1076: }
1077: try {
1078: final Object result = send(address, request, writeTimeoutInMillis, responseTimeoutInMillis);
1079: if (result != null) {
1080: return (V) result;
1081: } else {
1082: return null;
1083: }
1084: } catch (InterruptedException ie) {
1085: Thread.currentThread().interrupt();
1086: if (logger.isLoggable(Level.SEVERE)) {
1087: logger.log(Level.SEVERE, "failed to get. address=" + address + ", request=" + request, ie);
1088: }
1089: return null;
1090: } catch (Exception e) {
1091: if (logger.isLoggable(Level.SEVERE)) {
1092: logger.log(Level.SEVERE, "failed to get. address=" + address + ", request=" + request, e);
1093: }
1094: return null;
1095: } finally {
1096: builder.recycle();
1097: }
1098: }
1099:
1100: @Override
1101: public ValueWithKey<K, V> getKey(final K key, final boolean noReply) {
1102: return getKey(key, noReply, writeTimeoutInMillis, responseTimeoutInMillis);
1103: }
1104:
1105: @SuppressWarnings("unchecked")
1106: @Override
1107: public ValueWithKey<K, V> getKey(final K key, final boolean noReply, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1108: if (key == null) {
1109: return null;
1110: }
1111: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(false, true, false);
1112: builder.op(noReply ? CommandOpcodes.GetKQ : CommandOpcodes.GetK);
1113: builder.noReply(false);
1114: builder.opaque(noReply ? generateOpaque() : 0);
1115: builder.originKey(key);
1116: final BufferWrapper<K> keyWrapper = BufferWrapper.wrap(key, transport.getMemoryManager());
1117: final Buffer keyBuffer = keyWrapper.getBuffer();
1118: builder.key(keyBuffer);
1119: keyWrapper.recycle();
1120: final MemcachedRequest request = builder.build();
1121:
1122: final SocketAddress address = consistentHash.get(keyBuffer.toByteBuffer());
1123: if (address == null) {
1124: builder.recycle();
1125: return null;
1126: }
1127: try {
1128: final Object result = send(address, request, writeTimeoutInMillis, responseTimeoutInMillis);
1129: if (result != null) {
1130: return (ValueWithKey<K, V>) result;
1131: } else {
1132: return null;
1133: }
1134: } catch (InterruptedException ie) {
1135: Thread.currentThread().interrupt();
1136: if (logger.isLoggable(Level.SEVERE)) {
1137: logger.log(Level.SEVERE, "failed to get with key. address=" + address + ", request=" + request, ie);
1138: }
1139: return null;
1140: } catch (Exception e) {
1141: if (logger.isLoggable(Level.SEVERE)) {
1142: logger.log(Level.SEVERE, "failed to get with key. address=" + address + ", request=" + request, e);
1143: }
1144: return null;
1145: } finally {
1146: builder.recycle();
1147: }
1148: }
1149:
1150: @Override
1151: public ValueWithCas<V> gets(final K key, final boolean noReply) {
1152: return gets(key, noReply, writeTimeoutInMillis, responseTimeoutInMillis);
1153: }
1154:
1155: @SuppressWarnings("unchecked")
1156: @Override
1157: public ValueWithCas<V> gets(final K key, final boolean noReply, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1158: if (key == null) {
1159: return null;
1160: }
1161: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(false, true, false);
1162: builder.op(noReply ? CommandOpcodes.GetsQ : CommandOpcodes.Gets);
1163: builder.noReply(false);
1164: builder.opaque(noReply ? generateOpaque() : 0);
1165: builder.originKey(key);
1166: final BufferWrapper<K> keyWrapper = BufferWrapper.wrap(key, transport.getMemoryManager());
1167: final Buffer keyBuffer = keyWrapper.getBuffer();
1168: builder.key(keyBuffer);
1169: keyWrapper.recycle();
1170: final MemcachedRequest request = builder.build();
1171:
1172: final SocketAddress address = consistentHash.get(keyBuffer.toByteBuffer());
1173: if (address == null) {
1174: builder.recycle();
1175: return null;
1176: }
1177: try {
1178: final Object result = send(address, request, writeTimeoutInMillis, responseTimeoutInMillis);
1179: if (result != null) {
1180: return (ValueWithCas<V>) result;
1181: } else {
1182: return null;
1183: }
1184: } catch (InterruptedException ie) {
1185: Thread.currentThread().interrupt();
1186: if (logger.isLoggable(Level.SEVERE)) {
1187: logger.log(Level.SEVERE, "failed to get with cas. address=" + address + ", request=" + request, ie);
1188: }
1189: return null;
1190: } catch (Exception e) {
1191: if (logger.isLoggable(Level.SEVERE)) {
1192: logger.log(Level.SEVERE, "failed to get with cas. address=" + address + ", request=" + request, e);
1193: }
1194: return null;
1195: } finally {
1196: builder.recycle();
1197: }
1198: }
1199:
1200: @Override
1201: public Map<K, ValueWithCas<V>> getsMulti(final Set<K> keys) {
1202: return getsMulti(keys, writeTimeoutInMillis, responseTimeoutInMillis);
1203: }
1204:
1205: @Override
1206: public Map<K, ValueWithCas<V>> getsMulti(final Set<K> keys, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1207: final Map<K, ValueWithCas<V>> result = new HashMap<K, ValueWithCas<V>>();
1208: if (keys == null || keys.isEmpty()) {
1209: return result;
1210: }
1211:
1212: // categorize keys by address
1213: final Map<SocketAddress, List<BufferWrapper<K>>> categorizedMap = new HashMap<SocketAddress, List<BufferWrapper<K>>>();
1214: for (K key : keys) {
1215: final BufferWrapper<K> keyWrapper = BufferWrapper.wrap(key, transport.getMemoryManager());
1216: final Buffer keyBuffer = keyWrapper.getBuffer();
1217: final SocketAddress address = consistentHash.get(keyBuffer.toByteBuffer());
1218: if (address == null) {
1219: if (logger.isLoggable(Level.WARNING)) {
1220: logger.log(Level.WARNING, "failed to get the address from the consistent hash in getsMulti(). key buffer={0}", keyBuffer);
1221: }
1222: keyWrapper.recycle();
1223: continue;
1224: }
1225: List<BufferWrapper<K>> keyList = categorizedMap.get(address);
1226: if (keyList == null) {
1227: keyList = new ArrayList<BufferWrapper<K>>();
1228: categorizedMap.put(address, keyList);
1229: }
1230: keyList.add(keyWrapper);
1231: }
1232:
1233: // get multi from server
1234: for (Map.Entry<SocketAddress, List<BufferWrapper<K>>> entry : categorizedMap.entrySet()) {
1235: final SocketAddress address = entry.getKey();
1236: final List<BufferWrapper<K>> keyList = entry.getValue();
1237: try {
1238: sendGetsMulti(entry.getKey(), keyList, writeTimeoutInMillis, responseTimeoutInMillis, result);
1239: } catch (InterruptedException ie) {
1240: Thread.currentThread().interrupt();
1241: if (logger.isLoggable(Level.SEVERE)) {
1242: logger.log(Level.SEVERE, "failed to execute getsMulti(). address=" + address + ", keySize=" + keyList.size(), ie);
1243: } else if (logger.isLoggable(Level.FINER)) {
1244: logger.log(Level.FINER, "failed to execute getsMulti(). address=" + address + ", keyList=" + keyList, ie);
1245: }
1246: } catch (Exception e) {
1247: if (logger.isLoggable(Level.SEVERE)) {
1248: logger.log(Level.SEVERE, "failed to execute getsMulti(). address=" + address + ", keySize=" + keyList.size(), e);
1249: } else if (logger.isLoggable(Level.FINER)) {
1250: logger.log(Level.FINER, "failed to execute getsMulti(). address=" + address + ", keyList=" + keyList, e);
1251: }
1252: } finally {
1253: recycleBufferWrappers(keyList);
1254: }
1255: }
1256: return result;
1257: }
1258:
1259: @Override
1260: public V gat(final K key, final int expirationInSecs, final boolean noReply) {
1261: return gat(key, expirationInSecs, noReply, writeTimeoutInMillis, responseTimeoutInMillis);
1262: }
1263:
1264: @SuppressWarnings("unchecked")
1265: @Override
1266: public V gat(final K key, final int expirationInSecs, final boolean noReply, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1267: if (key == null) {
1268: return null;
1269: }
1270: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(true, true, false);
1271: builder.op(noReply ? CommandOpcodes.GATQ : CommandOpcodes.GAT);
1272: builder.noReply(false);
1273: builder.opaque(noReply ? generateOpaque() : 0);
1274: builder.originKey(key);
1275: final BufferWrapper<K> keyWrapper = BufferWrapper.wrap(key, transport.getMemoryManager());
1276: final Buffer keyBuffer = keyWrapper.getBuffer();
1277: builder.key(keyBuffer);
1278: keyWrapper.recycle();
1279: builder.expirationInSecs(expirationInSecs);
1280: final MemcachedRequest request = builder.build();
1281:
1282: final SocketAddress address = consistentHash.get(keyBuffer.toByteBuffer());
1283: if (address == null) {
1284: builder.recycle();
1285: return null;
1286: }
1287: try {
1288: final Object result = send(address, request, writeTimeoutInMillis, responseTimeoutInMillis);
1289: if (result != null) {
1290: return (V) result;
1291: } else {
1292: return null;
1293: }
1294: } catch (InterruptedException ie) {
1295: Thread.currentThread().interrupt();
1296: if (logger.isLoggable(Level.SEVERE)) {
1297: logger.log(Level.SEVERE, "failed to get and touch. address=" + address + ", request=" + request, ie);
1298: }
1299: return null;
1300: } catch (Exception e) {
1301: if (logger.isLoggable(Level.SEVERE)) {
1302: logger.log(Level.SEVERE, "failed to get and touch. address=" + address + ", request=" + request, e);
1303: }
1304: return null;
1305: } finally {
1306: builder.recycle();
1307: }
1308: }
1309:
1310: @Override
1311: public boolean delete(final K key, final boolean noReply) {
1312: return delete(key, noReply, writeTimeoutInMillis, responseTimeoutInMillis);
1313: }
1314:
1315: @Override
1316: public boolean delete(final K key, final boolean noReply, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1317: if (key == null) {
1318: return false;
1319: }
1320: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(false, true, false);
1321: builder.op(noReply ? CommandOpcodes.DeleteQ : CommandOpcodes.Delete);
1322: builder.noReply(noReply);
1323: builder.opaque(noReply ? generateOpaque() : 0);
1324: builder.originKey(key);
1325: final BufferWrapper<K> keyWrapper = BufferWrapper.wrap(key, transport.getMemoryManager());
1326: final Buffer keyBuffer = keyWrapper.getBuffer();
1327: builder.key(keyBuffer);
1328: keyWrapper.recycle();
1329: final MemcachedRequest request = builder.build();
1330:
1331: final SocketAddress address = consistentHash.get(keyBuffer.toByteBuffer());
1332: if (address == null) {
1333: builder.recycle();
1334: return false;
1335: }
1336: try {
1337: if (noReply) {
1338: sendNoReply(address, request);
1339: return true;
1340: } else {
1341: final Object result = send(address, request, writeTimeoutInMillis, responseTimeoutInMillis);
1342: if (result instanceof Boolean) {
1343: return (Boolean) result;
1344: } else {
1345: return false;
1346: }
1347: }
1348: } catch (InterruptedException ie) {
1349: Thread.currentThread().interrupt();
1350: if (logger.isLoggable(Level.SEVERE)) {
1351: logger.log(Level.SEVERE, "failed to delete. address=" + address + ", request=" + request, ie);
1352: }
1353: return false;
1354: } catch (Exception e) {
1355: if (logger.isLoggable(Level.SEVERE)) {
1356: logger.log(Level.SEVERE, "failed to delete. address=" + address + ", request=" + request, e);
1357: }
1358: return false;
1359: } finally {
1360: builder.recycle();
1361: }
1362: }
1363:
1364: @Override
1365: public Map<K, Boolean> deleteMulti(final Set<K> keys) {
1366: return deleteMulti(keys, writeTimeoutInMillis, responseTimeoutInMillis);
1367: }
1368:
1369: @Override
1370: public Map<K, Boolean> deleteMulti(final Set<K> keys, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1371: final Map<K, Boolean> result = new HashMap<K, Boolean>();
1372: if (keys == null || keys.isEmpty()) {
1373: return result;
1374: }
1375:
1376: // categorize keys by address
1377: final Map<SocketAddress, List<BufferWrapper<K>>> categorizedMap = new HashMap<SocketAddress, List<BufferWrapper<K>>>();
1378: for (K key : keys) {
1379: final BufferWrapper<K> keyWrapper = BufferWrapper.wrap(key, transport.getMemoryManager());
1380: final Buffer keyBuffer = keyWrapper.getBuffer();
1381: final SocketAddress address = consistentHash.get(keyBuffer.toByteBuffer());
1382: if (address == null) {
1383: if (logger.isLoggable(Level.WARNING)) {
1384: logger.log(Level.WARNING, "failed to get the address from the consistent hash in deleteMulti(). key buffer={0}", keyBuffer);
1385: }
1386: keyWrapper.recycle();
1387: continue;
1388: }
1389: List<BufferWrapper<K>> keyList = categorizedMap.get(address);
1390: if (keyList == null) {
1391: keyList = new ArrayList<BufferWrapper<K>>();
1392: categorizedMap.put(address, keyList);
1393: }
1394: keyList.add(keyWrapper);
1395: }
1396:
1397: // delete multi from server
1398: for (Map.Entry<SocketAddress, List<BufferWrapper<K>>> entry : categorizedMap.entrySet()) {
1399: final SocketAddress address = entry.getKey();
1400: final List<BufferWrapper<K>> keyList = entry.getValue();
1401: try {
1402: sendDeleteMulti(entry.getKey(), keyList, writeTimeoutInMillis, responseTimeoutInMillis, result);
1403: } catch (InterruptedException ie) {
1404: Thread.currentThread().interrupt();
1405: if (logger.isLoggable(Level.SEVERE)) {
1406: logger.log(Level.SEVERE, "failed to execute deleteMulti(). address=" + address + ", keySize=" + keyList.size(), ie);
1407: } else if (logger.isLoggable(Level.FINER)) {
1408: logger.log(Level.FINER, "failed to execute deleteMulti(). address=" + address + ", keyList=" + keyList, ie);
1409: }
1410: } catch (Exception e) {
1411: if (logger.isLoggable(Level.SEVERE)) {
1412: logger.log(Level.SEVERE, "failed to execute deleteMulti(). address=" + address + ", keySize=" + keyList.size(), e);
1413: } else if (logger.isLoggable(Level.FINER)) {
1414: logger.log(Level.FINER, "failed to execute deleteMulti(). address=" + address + ", keyList=" + keyList, e);
1415: }
1416: } finally {
1417: recycleBufferWrappers(keyList);
1418: }
1419: }
1420: return result;
1421: }
1422:
1423: @Override
1424: public long incr(final K key, final long delta, final long initial, final int expirationInSecs, final boolean noReply) {
1425: return incr(key, delta, initial, expirationInSecs, noReply, writeTimeoutInMillis, responseTimeoutInMillis);
1426: }
1427:
1428: @Override
1429: public long incr(final K key, final long delta, final long initial, final int expirationInSecs, final boolean noReply, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1430: if (key == null) {
1431: return -1;
1432: }
1433: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(true, true, false);
1434: builder.op(noReply ? CommandOpcodes.IncrementQ : CommandOpcodes.Increment);
1435: builder.noReply(noReply);
1436: builder.opaque(noReply ? generateOpaque() : 0);
1437: builder.originKey(key);
1438: final BufferWrapper<K> keyWrapper = BufferWrapper.wrap(key, transport.getMemoryManager());
1439: final Buffer keyBuffer = keyWrapper.getBuffer();
1440: builder.key(keyBuffer);
1441: keyWrapper.recycle();
1442: builder.delta(delta);
1443: builder.initial(initial);
1444: builder.expirationInSecs(expirationInSecs);
1445: final MemcachedRequest request = builder.build();
1446:
1447: final SocketAddress address = consistentHash.get(keyBuffer.toByteBuffer());
1448: if (address == null) {
1449: builder.recycle();
1450: return -1;
1451: }
1452: try {
1453: if (noReply) {
1454: sendNoReply(address, request);
1455: return -1;
1456: } else {
1457: final Object result = send(address, request, writeTimeoutInMillis, responseTimeoutInMillis);
1458: if (result != null) {
1459: return (Long) result;
1460: } else {
1461: return -1;
1462: }
1463: }
1464: } catch (InterruptedException ie) {
1465: Thread.currentThread().interrupt();
1466: if (logger.isLoggable(Level.SEVERE)) {
1467: logger.log(Level.SEVERE, "failed to increase. address=" + address + ", request=" + request, ie);
1468: }
1469: return -1;
1470: } catch (Exception e) {
1471: if (logger.isLoggable(Level.SEVERE)) {
1472: logger.log(Level.SEVERE, "failed to increase. address=" + address + ", request=" + request, e);
1473: }
1474: return -1;
1475: } finally {
1476: builder.recycle();
1477: }
1478: }
1479:
1480: @Override
1481: public long decr(final K key, final long delta, final long initial, final int expirationInSecs, final boolean noReply) {
1482: return decr(key, delta, initial, expirationInSecs, noReply, writeTimeoutInMillis, responseTimeoutInMillis);
1483: }
1484:
1485: @Override
1486: public long decr(final K key, final long delta, final long initial, final int expirationInSecs, final boolean noReply, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1487: if (key == null) {
1488: return -1;
1489: }
1490: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(true, true, false);
1491: builder.op(noReply ? CommandOpcodes.DecrementQ : CommandOpcodes.Decrement);
1492: builder.noReply(noReply);
1493: builder.opaque(noReply ? generateOpaque() : 0);
1494: builder.originKey(key);
1495: final BufferWrapper<K> keyWrapper = BufferWrapper.wrap(key, transport.getMemoryManager());
1496: final Buffer keyBuffer = keyWrapper.getBuffer();
1497: builder.key(keyBuffer);
1498: keyWrapper.recycle();
1499: builder.delta(delta);
1500: builder.initial(initial);
1501: builder.expirationInSecs(expirationInSecs);
1502: final MemcachedRequest request = builder.build();
1503:
1504: final SocketAddress address = consistentHash.get(keyBuffer.toByteBuffer());
1505: if (address == null) {
1506: builder.recycle();
1507: return -1;
1508: }
1509: try {
1510: if (noReply) {
1511: sendNoReply(address, request);
1512: return -1;
1513: } else {
1514: final Object result = send(address, request, writeTimeoutInMillis, responseTimeoutInMillis);
1515: if (result != null) {
1516: return (Long) result;
1517: } else {
1518: return -1;
1519: }
1520: }
1521: } catch (InterruptedException ie) {
1522: Thread.currentThread().interrupt();
1523: if (logger.isLoggable(Level.SEVERE)) {
1524: logger.log(Level.SEVERE, "failed to decrease. address=" + address + ", request=" + request, ie);
1525: }
1526: return -1;
1527: } catch (Exception e) {
1528: if (logger.isLoggable(Level.SEVERE)) {
1529: logger.log(Level.SEVERE, "failed to decrease. address=" + address + ", request=" + request, e);
1530: }
1531: return -1;
1532: } finally {
1533: builder.recycle();
1534: }
1535: }
1536:
1537: @Override
1538: public String saslAuth(final SocketAddress address, final String mechanism, final byte[] data) {
1539: return saslAuth(address, mechanism, data, writeTimeoutInMillis, responseTimeoutInMillis);
1540: }
1541:
1542: @Override
1543: public String saslAuth(final SocketAddress address, final String mechanism, final byte[] data, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1544: throw new UnsupportedOperationException();
1545: }
1546:
1547: @Override
1548: public String saslStep(final SocketAddress address, final String mechanism, final byte[] data) {
1549: return saslStep(address, mechanism, data, writeTimeoutInMillis, responseTimeoutInMillis);
1550: }
1551:
1552: @Override
1553: public String saslStep(final SocketAddress address, final String mechanism, final byte[] data, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1554: throw new UnsupportedOperationException();
1555: }
1556:
1557: @Override
1558: public String saslList(final SocketAddress address) {
1559: return saslList(address, writeTimeoutInMillis, responseTimeoutInMillis);
1560: }
1561:
1562: @Override
1563: public String saslList(final SocketAddress address, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1564: throw new UnsupportedOperationException();
1565: }
1566:
1567: @Override
1568: public Map<String, String> stats(final SocketAddress address) {
1569: return stats(address, writeTimeoutInMillis, responseTimeoutInMillis);
1570: }
1571:
1572: @Override
1573: public Map<String, String> stats(final SocketAddress address, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1574: return statsItems(address, null, writeTimeoutInMillis, responseTimeoutInMillis);
1575: }
1576:
1577: @Override
1578: public Map<String, String> statsItems(final SocketAddress address, final String item) {
1579: return statsItems(address, item, writeTimeoutInMillis, responseTimeoutInMillis);
1580: }
1581:
1582: @SuppressWarnings("unchecked")
1583: @Override
1584: public Map<String, String> statsItems(final SocketAddress address, final String item, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1585: if (address == null) {
1586: return null;
1587: }
1588: if (connectionPool == null) {
1589: throw new IllegalStateException("connection pool must not be null");
1590: }
1591: if (clientFilter == null) {
1592: throw new IllegalStateException("client filter must not be null");
1593: }
1594:
1595: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(false, item != null, false);
1596: builder.op(CommandOpcodes.Stat);
1597: builder.noReply(false);
1598: if (item != null) {
1599: builder.originKey(item);
1600: final BufferWrapper<String> keyWrapper = BufferWrapper.wrap(item, transport.getMemoryManager());
1601: final Buffer keyBuffer = keyWrapper.getBuffer();
1602: builder.key(keyBuffer);
1603: keyWrapper.recycle();
1604: }
1605: final MemcachedRequest request = builder.build();
1606:
1607: final Connection<SocketAddress> connection;
1608: try {
1609: connection = connectionPool.borrowObject(address, connectTimeoutInMillis);
1610: } catch (PoolExhaustedException pee) {
1611: if (logger.isLoggable(Level.SEVERE)) {
1612: logger.log(Level.SEVERE, "failed to get the stats. address=" + address + ", timeout=" + connectTimeoutInMillis + "ms", pee);
1613: }
1614: return null;
1615: } catch (NoValidObjectException nvoe) {
1616: if (logger.isLoggable(Level.SEVERE)) {
1617: logger.log(Level.SEVERE, "failed to get the stats. address=" + address + ", timeout=" + connectTimeoutInMillis + "ms", nvoe);
1618: }
1619: return null;
1620: } catch (TimeoutException te) {
1621: if (logger.isLoggable(Level.SEVERE)) {
1622: logger.log(Level.SEVERE, "failed to get the stats. address=" + address + ", timeout=" + connectTimeoutInMillis + "ms", te);
1623: }
1624: return null;
1625: } catch (InterruptedException ie) {
1626: if (logger.isLoggable(Level.SEVERE)) {
1627: logger.log(Level.SEVERE, "failed to get the stats. address=" + address + ", timeout=" + connectTimeoutInMillis + "ms", ie);
1628: }
1629: return null;
1630: }
1631: try {
1632: final GrizzlyFuture<WriteResult<MemcachedRequest[], SocketAddress>> future = connection.write(new MemcachedRequest[]{request});
1633: try {
1634: if (writeTimeoutInMillis > 0) {
1635: future.get(writeTimeoutInMillis, TimeUnit.MILLISECONDS);
1636: } else {
1637: future.get();
1638: }
1639: } catch (ExecutionException ee) {
1640: if (logger.isLoggable(Level.SEVERE)) {
1641: logger.log(Level.SEVERE, "failed to get the stats. address=" + address + ", request=" + request, ee);
1642: }
1643: return null;
1644: } catch (TimeoutException te) {
1645: if (logger.isLoggable(Level.SEVERE)) {
1646: logger.log(Level.SEVERE, "failed to get the stats. address=" + address + ", request=" + request, te);
1647: }
1648: return null;
1649: } catch (InterruptedException ie) {
1650: Thread.currentThread().interrupt();
1651: if (logger.isLoggable(Level.SEVERE)) {
1652: logger.log(Level.SEVERE, "failed to get the stats. address=" + address + ", request=" + request, ie);
1653: }
1654: return null;
1655: } finally {
1656: builder.recycle();
1657: }
1658:
1659: final Map<String, String> stats = new HashMap<String, String>();
1660: while (true) {
1661: final ValueWithKey<String, String> value;
1662: try {
1663: value = clientFilter.getCorrelatedResponse(connection, request, responseTimeoutInMillis);
1664: } catch (TimeoutException te) {
1665: if (logger.isLoggable(Level.SEVERE)) {
1666: logger.log(Level.SEVERE, "failed to get the stats. timeout=" + responseTimeoutInMillis + "ms", te);
1667: }
1668: break;
1669: } catch (InterruptedException ie) {
1670: if (logger.isLoggable(Level.SEVERE)) {
1671: logger.log(Level.SEVERE, "failed to get the stats. timeout=" + responseTimeoutInMillis + "ms", ie);
1672: }
1673: break;
1674: }
1675: if (value != null) {
1676: final String statKey = value.getKey();
1677: final String statValue = value.getValue();
1678: if (statKey != null && statValue != null) {
1679: stats.put(statKey, statValue);
1680: } else {
1681: break;
1682: }
1683: } else {
1684: break;
1685: }
1686: }
1687: return stats;
1688: } finally {
1689: returnConnectionSafely(address, connection);
1690: }
1691: }
1692:
1693: @Override
1694: public boolean quit(final SocketAddress address, final boolean noReply) {
1695: return quit(address, noReply, writeTimeoutInMillis, responseTimeoutInMillis);
1696: }
1697:
1698: @Override
1699: public boolean quit(final SocketAddress address, final boolean noReply, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1700: if (address == null) {
1701: return false;
1702: }
1703: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(false, false, false);
1704: builder.op(noReply ? CommandOpcodes.QuitQ : CommandOpcodes.Quit);
1705: builder.noReply(noReply);
1706: final MemcachedRequest request = builder.build();
1707:
1708: try {
1709: if (noReply) {
1710: sendNoReply(address, request);
1711: return true;
1712: } else {
1713: final Object result = send(address, request, writeTimeoutInMillis, responseTimeoutInMillis);
1714: if (result instanceof Boolean) {
1715: return (Boolean) result;
1716: } else {
1717: return false;
1718: }
1719: }
1720: } catch (InterruptedException ie) {
1721: Thread.currentThread().interrupt();
1722: if (logger.isLoggable(Level.SEVERE)) {
1723: logger.log(Level.SEVERE, "failed to quit. address=" + address + ", request=" + request, ie);
1724: }
1725: return false;
1726: } catch (Exception e) {
1727: if (logger.isLoggable(Level.SEVERE)) {
1728: logger.log(Level.SEVERE, "failed to quit. address=" + address + ", request=" + request, e);
1729: }
1730: return false;
1731: } finally {
1732: builder.recycle();
1733: }
1734: }
1735:
1736: @Override
1737: public boolean flushAll(final SocketAddress address, final int expirationInSecs, final boolean noReply) {
1738: return flushAll(address, expirationInSecs, noReply, writeTimeoutInMillis, responseTimeoutInMillis);
1739: }
1740:
1741: @Override
1742: public boolean flushAll(final SocketAddress address, final int expirationInSecs, final boolean noReply, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1743: if (address == null) {
1744: return false;
1745: }
1746: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(expirationInSecs > 0, false, false);
1747: builder.op(noReply ? CommandOpcodes.FlushQ : CommandOpcodes.Flush);
1748: builder.noReply(noReply);
1749: builder.expirationInSecs(expirationInSecs);
1750: final MemcachedRequest request = builder.build();
1751:
1752: try {
1753: if (noReply) {
1754: sendNoReply(address, request);
1755: return true;
1756: } else {
1757: final Object result = send(address, request, writeTimeoutInMillis, responseTimeoutInMillis);
1758: if (result instanceof Boolean) {
1759: return (Boolean) result;
1760: } else {
1761: return false;
1762: }
1763: }
1764: } catch (InterruptedException ie) {
1765: Thread.currentThread().interrupt();
1766: if (logger.isLoggable(Level.SEVERE)) {
1767: logger.log(Level.SEVERE, "failed to flush. address=" + address + ", request=" + request, ie);
1768: }
1769: return false;
1770: } catch (Exception e) {
1771: if (logger.isLoggable(Level.SEVERE)) {
1772: logger.log(Level.SEVERE, "failed to flush. address=" + address + ", request=" + request, e);
1773: }
1774: return false;
1775: } finally {
1776: builder.recycle();
1777: }
1778: }
1779:
1780: @Override
1781: public boolean touch(final K key, final int expirationInSecs) {
1782: return touch(key, expirationInSecs, writeTimeoutInMillis, responseTimeoutInMillis);
1783: }
1784:
1785: @Override
1786: public boolean touch(final K key, final int expirationInSecs, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1787: if (key == null) {
1788: return false;
1789: }
1790: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(true, true, false);
1791: builder.op(CommandOpcodes.Touch);
1792: builder.noReply(false);
1793: builder.originKey(key);
1794: final BufferWrapper<K> keyWrapper = BufferWrapper.wrap(key, transport.getMemoryManager());
1795: final Buffer keyBuffer = keyWrapper.getBuffer();
1796: builder.key(keyBuffer);
1797: keyWrapper.recycle();
1798: builder.expirationInSecs(expirationInSecs);
1799: final MemcachedRequest request = builder.build();
1800:
1801: final SocketAddress address = consistentHash.get(keyBuffer.toByteBuffer());
1802: if (address == null) {
1803: builder.recycle();
1804: return false;
1805: }
1806: try {
1807: final Object result = send(address, request, writeTimeoutInMillis, responseTimeoutInMillis);
1808: if (result instanceof Boolean) {
1809: return (Boolean) result;
1810: } else {
1811: return false;
1812: }
1813: } catch (InterruptedException ie) {
1814: Thread.currentThread().interrupt();
1815: if (logger.isLoggable(Level.SEVERE)) {
1816: logger.log(Level.SEVERE, "failed to touch. address=" + address + ", request=" + request, ie);
1817: }
1818: return false;
1819: } catch (Exception e) {
1820: if (logger.isLoggable(Level.SEVERE)) {
1821: logger.log(Level.SEVERE, "failed to touch. address=" + address + ", request=" + request, e);
1822: }
1823: return false;
1824: } finally {
1825: builder.recycle();
1826: }
1827: }
1828:
1829: @Override
1830: public boolean noop(final SocketAddress address) {
1831: return noop(address, writeTimeoutInMillis, responseTimeoutInMillis);
1832: }
1833:
1834: @Override
1835: public boolean noop(final SocketAddress address, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1836: if (address == null) {
1837: return false;
1838: }
1839: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(false, false, false);
1840: builder.op(CommandOpcodes.Noop);
1841: builder.opaque(generateOpaque());
1842: builder.noReply(false);
1843: final MemcachedRequest request = builder.build();
1844:
1845: try {
1846: final Object result = send(address, request, writeTimeoutInMillis, responseTimeoutInMillis);
1847: if (result instanceof Boolean) {
1848: return (Boolean) result;
1849: } else {
1850: return false;
1851: }
1852: } catch (InterruptedException ie) {
1853: Thread.currentThread().interrupt();
1854: if (logger.isLoggable(Level.SEVERE)) {
1855: logger.log(Level.SEVERE, "failed to execute the noop operation. address=" + address + ", request=" + request, ie);
1856: }
1857: return false;
1858: } catch (Exception e) {
1859: if (logger.isLoggable(Level.SEVERE)) {
1860: logger.log(Level.SEVERE, "failed to execute the noop operation. address=" + address + ", request=" + request, e);
1861: }
1862: return false;
1863: } finally {
1864: builder.recycle();
1865: }
1866: }
1867:
1868: @Override
1869: public boolean verbosity(final SocketAddress address, final int verbosity) {
1870: return verbosity(address, verbosity, writeTimeoutInMillis, responseTimeoutInMillis);
1871: }
1872:
1873: @Override
1874: public boolean verbosity(final SocketAddress address, final int verbosity, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1875: if (address == null) {
1876: return false;
1877: }
1878: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(true, false, false);
1879: builder.op(CommandOpcodes.Verbosity);
1880: builder.noReply(false);
1881: builder.verbosity(verbosity);
1882: final MemcachedRequest request = builder.build();
1883:
1884: try {
1885: final Object result = send(address, request, writeTimeoutInMillis, responseTimeoutInMillis);
1886: if (result instanceof Boolean) {
1887: return (Boolean) result;
1888: } else {
1889: return false;
1890: }
1891: } catch (InterruptedException ie) {
1892: Thread.currentThread().interrupt();
1893: if (logger.isLoggable(Level.SEVERE)) {
1894: logger.log(Level.SEVERE, "failed to execute the vebosity operation. address=" + address + ", request=" + request, ie);
1895: }
1896: return false;
1897: } catch (Exception e) {
1898: if (logger.isLoggable(Level.SEVERE)) {
1899: logger.log(Level.SEVERE, "failed to execute the vebosity operation. address=" + address + ", request=" + request, e);
1900: }
1901: return false;
1902: } finally {
1903: builder.recycle();
1904: }
1905: }
1906:
1907: @Override
1908: public String version(final SocketAddress address) {
1909: return version(address, writeTimeoutInMillis, responseTimeoutInMillis);
1910: }
1911:
1912: @Override
1913: public String version(final SocketAddress address, final long writeTimeoutInMillis, final long responseTimeoutInMillis) {
1914: if (address == null) {
1915: return null;
1916: }
1917: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(false, false, false);
1918: builder.op(CommandOpcodes.Version);
1919: builder.noReply(false);
1920: final MemcachedRequest request = builder.build();
1921:
1922: try {
1923: final Object result = send(address, request, writeTimeoutInMillis, responseTimeoutInMillis);
1924: if (result instanceof String) {
1925: return (String) result;
1926: } else {
1927: return null;
1928: }
1929: } catch (InterruptedException ie) {
1930: Thread.currentThread().interrupt();
1931: if (logger.isLoggable(Level.SEVERE)) {
1932: logger.log(Level.SEVERE, "failed to execute the version operation. address=" + address + ", request=" + request, ie);
1933: }
1934: return null;
1935: } catch (Exception e) {
1936: if (logger.isLoggable(Level.SEVERE)) {
1937: logger.log(Level.SEVERE, "failed to execute the version operation. address=" + address + ", request=" + request, e);
1938: }
1939: return null;
1940: } finally {
1941: builder.recycle();
1942: }
1943: }
1944:
1945: private boolean validateConnectionWithNoopCommand(final Connection<SocketAddress> connection) {
1946: if (connection == null) {
1947: return false;
1948: }
1949: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(false, false, false);
1950: builder.op(CommandOpcodes.Noop);
1951: builder.opaque(generateOpaque());
1952: builder.noReply(false);
1953: final MemcachedRequest request = builder.build();
1954:
1955: try {
1956: final GrizzlyFuture<WriteResult<MemcachedRequest[], SocketAddress>> future = connection.write(new MemcachedRequest[]{request});
1957: if (writeTimeoutInMillis > 0) {
1958: future.get(writeTimeoutInMillis, TimeUnit.MILLISECONDS);
1959: } else {
1960: future.get();
1961: }
1962: final Object result = clientFilter.getCorrelatedResponse(connection, request, responseTimeoutInMillis);
1963: if (result instanceof Boolean) {
1964: return (Boolean) result;
1965: } else {
1966: return false;
1967: }
1968: } catch (InterruptedException ie) {
1969: Thread.currentThread().interrupt();
1970: if (logger.isLoggable(Level.SEVERE)) {
1971: logger.log(Level.SEVERE, "failed to execute the noop operation. connection=" + connection + ", request=" + request, ie);
1972: }
1973: return false;
1974: } catch (Exception e) {
1975: if (logger.isLoggable(Level.SEVERE)) {
1976: logger.log(Level.SEVERE, "failed to execute the noop operation. connection=" + connection + ", request=" + request, e);
1977: }
1978: return false;
1979: } finally {
1980: builder.recycle();
1981: }
1982: }
1983:
1984: private boolean validateConnectionWithVersionCommand(final Connection<SocketAddress> connection) {
1985: if (connection == null) {
1986: return false;
1987: }
1988: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(false, false, false);
1989: builder.op(CommandOpcodes.Version);
1990: builder.noReply(false);
1991: final MemcachedRequest request = builder.build();
1992:
1993: try {
1994: final GrizzlyFuture<WriteResult<MemcachedRequest[], SocketAddress>> future = connection.write(new MemcachedRequest[]{request});
1995: if (writeTimeoutInMillis > 0) {
1996: future.get(writeTimeoutInMillis, TimeUnit.MILLISECONDS);
1997: } else {
1998: future.get();
1999: }
2000: if (clientFilter == null) {
2001: throw new IllegalStateException("client filter must not be null");
2002: }
2003: final Object result = clientFilter.getCorrelatedResponse(connection, request, responseTimeoutInMillis);
2004: return result instanceof String;
2005: } catch (TimeoutException te) {
2006: if (logger.isLoggable(Level.SEVERE)) {
2007: logger.log(Level.SEVERE, "failed to check the connection. connection=" + connection, te);
2008: }
2009: return false;
2010: } catch (ExecutionException ee) {
2011: if (logger.isLoggable(Level.SEVERE)) {
2012: logger.log(Level.SEVERE, "failed to check the connection. connection=" + connection, ee);
2013: }
2014: return false;
2015: } catch (InterruptedException ie) {
2016: if (logger.isLoggable(Level.SEVERE)) {
2017: logger.log(Level.SEVERE, "failed to check the connection. connection=" + connection, ie);
2018: }
2019: Thread.currentThread().interrupt();
2020: return false;
2021: } finally {
2022: builder.recycle();
2023: }
2024: }
2025:
2026: private void sendNoReply(final SocketAddress address, final MemcachedRequest request)
2027: throws PoolExhaustedException, NoValidObjectException, TimeoutException, InterruptedException {
2028: if (address == null) {
2029: throw new IllegalArgumentException("address must not be null");
2030: }
2031: if (request == null) {
2032: throw new IllegalArgumentException("request must not be null");
2033: }
2034: if (connectionPool == null) {
2035: throw new IllegalStateException("connection pool must not be null");
2036: }
2037:
2038: final Connection<SocketAddress> connection;
2039: try {
2040: connection = connectionPool.borrowObject(address, connectTimeoutInMillis);
2041: } catch (PoolExhaustedException pee) {
2042: if (logger.isLoggable(Level.FINER)) {
2043: logger.log(Level.FINER, "failed to get the connection. address=" + address + ", timeout=" + connectTimeoutInMillis + "ms", pee);
2044: }
2045: throw pee;
2046: } catch (NoValidObjectException nvoe) {
2047: if (logger.isLoggable(Level.FINER)) {
2048: logger.log(Level.FINER, "failed to get the connection. address=" + address + ", timeout=" + connectTimeoutInMillis + "ms", nvoe);
2049: }
2050: removeServer(address, false);
2051: throw nvoe;
2052: } catch (TimeoutException te) {
2053: if (logger.isLoggable(Level.FINER)) {
2054: logger.log(Level.FINER, "failed to get the connection. address=" + address + ", timeout=" + connectTimeoutInMillis + "ms", te);
2055: }
2056: throw te;
2057: } catch (InterruptedException ie) {
2058: if (logger.isLoggable(Level.FINER)) {
2059: logger.log(Level.FINER, "failed to get the connection. address=" + address + ", timeout=" + connectTimeoutInMillis + "ms", ie);
2060: }
2061: throw ie;
2062: }
2063: if (request.isNoReply()) {
2064: connection.write(new MemcachedRequest[]{request}, new CompletionHandler<WriteResult<MemcachedRequest[], SocketAddress>>() {
2065: @Override
2066: public void cancelled() {
2067: returnConnectionSafely(address, connection);
2068: if (logger.isLoggable(Level.SEVERE)) {
2069: logger.log(Level.SEVERE, "failed to send the request. request={0}, connection={1}", new Object[]{request, connection});
2070: }
2071: }
2072:
2073: @Override
2074: public void failed(Throwable t) {
2075: try {
2076: connectionPool.removeObject(address, connection);
2077: } catch (Exception e) {
2078: if (logger.isLoggable(Level.SEVERE)) {
2079: logger.log(Level.SEVERE, "failed to remove the connection. address=" + address + ", connection=" + connection, e);
2080: }
2081: }
2082: if (logger.isLoggable(Level.SEVERE)) {
2083: logger.log(Level.SEVERE, "failed to send the request. request=" + request + ", connection=" + connection, t);
2084: }
2085: }
2086:
2087: @Override
2088: public void completed(WriteResult<MemcachedRequest[], SocketAddress> result) {
2089: returnConnectionSafely(address, connection);
2090: }
2091:
2092: @Override
2093: public void updated(WriteResult<MemcachedRequest[], SocketAddress> result) {
2094: }
2095: });
2096: }
2097: }
2098:
2099: private boolean sendNoReplySafely(final Connection<SocketAddress> connection, final MemcachedRequest request) {
2100: if (connection == null) {
2101: throw new IllegalArgumentException("connection must not be null");
2102: }
2103: if (request == null) {
2104: throw new IllegalArgumentException("request must not be null");
2105: }
2106: if (request.isNoReply()) {
2107: GrizzlyFuture future = connection.write(new MemcachedRequest[]{request});
2108: try {
2109: future.get(writeTimeoutInMillis, TimeUnit.MILLISECONDS);
2110: } catch (InterruptedException ie) {
2111: if (logger.isLoggable(Level.SEVERE)) {
2112: logger.log(Level.SEVERE, "failed to check the connection. connection=" + connection, ie);
2113: }
2114: Thread.currentThread().interrupt();
2115: return false;
2116: } catch (ExecutionException ee) {
2117: if (logger.isLoggable(Level.SEVERE)) {
2118: logger.log(Level.SEVERE, "failed to check the connection. connection=" + connection, ee);
2119: }
2120: return false;
2121: } catch (TimeoutException te) {
2122: if (logger.isLoggable(Level.SEVERE)) {
2123: logger.log(Level.SEVERE, "failed to check the connection. connection=" + connection, te);
2124: }
2125: return false;
2126: }
2127: }
2128: return true;
2129: }
2130:
2131: private Object send(final SocketAddress address,
2132: final MemcachedRequest request,
2133: final long writeTimeoutInMillis,
2134: final long responseTimeoutInMillis) throws TimeoutException, InterruptedException, PoolExhaustedException, NoValidObjectException, ExecutionException {
2135: if (address == null) {
2136: throw new IllegalArgumentException("address must not be null");
2137: }
2138: if (request == null) {
2139: throw new IllegalArgumentException("request must not be null");
2140: }
2141: if (request.isNoReply()) {
2142: throw new IllegalStateException("request type is no reply");
2143: }
2144: return sendInternal(address, new MemcachedRequest[]{request}, writeTimeoutInMillis, responseTimeoutInMillis, null);
2145: }
2146:
2147: private void sendGetMulti(final SocketAddress address,
2148: final List<BufferWrapper<K>> keyList,
2149: final long writeTimeoutInMillis,
2150: final long responseTimeoutInMillis,
2151: final Map<K, V> result) throws ExecutionException, TimeoutException, InterruptedException, PoolExhaustedException, NoValidObjectException {
2152: if (address == null || keyList == null || keyList.isEmpty() || result == null) {
2153: return;
2154: }
2155:
2156: // make multi requests based on key list
2157: final MemcachedRequest[] requests = new MemcachedRequest[keyList.size()];
2158: final BufferWrapper.BufferType keyType = !keyList.isEmpty() ? keyList.get(0).getType() : null;
2159: for (int i = 0; i < keyList.size(); i++) {
2160: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(false, true, false);
2161: builder.originKeyType(keyType);
2162: builder.originKey(keyList.get(i).getOrigin());
2163: builder.key(keyList.get(i).getBuffer());
2164: if (i == keyList.size() - 1) {
2165: builder.noReply(false);
2166: builder.op(CommandOpcodes.Get);
2167: } else {
2168: builder.noReply(true);
2169: builder.op(CommandOpcodes.GetQ);
2170: builder.opaque(generateOpaque());
2171: }
2172: requests[i] = builder.build();
2173: builder.recycle();
2174: }
2175: sendInternal(address, requests, writeTimeoutInMillis, responseTimeoutInMillis, result);
2176: }
2177:
2178: private void sendGetsMulti(final SocketAddress address,
2179: final List<BufferWrapper<K>> keyList,
2180: final long writeTimeoutInMillis,
2181: final long responseTimeoutInMillis,
2182: final Map<K, ValueWithCas<V>> result) throws ExecutionException, TimeoutException, InterruptedException, PoolExhaustedException, NoValidObjectException {
2183: if (address == null || keyList == null || keyList.isEmpty() || result == null) {
2184: return;
2185: }
2186:
2187: // make multi requests based on key list
2188: final MemcachedRequest[] requests = new MemcachedRequest[keyList.size()];
2189: final BufferWrapper.BufferType keyType = !keyList.isEmpty() ? keyList.get(0).getType() : null;
2190: for (int i = 0; i < keyList.size(); i++) {
2191: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(false, true, false);
2192: builder.originKeyType(keyType);
2193: builder.originKey(keyList.get(i).getOrigin());
2194: builder.key(keyList.get(i).getBuffer());
2195: if (i == keyList.size() - 1) {
2196: builder.noReply(false);
2197: builder.op(CommandOpcodes.Gets);
2198: } else {
2199: builder.noReply(true);
2200: builder.op(CommandOpcodes.GetsQ);
2201: builder.opaque(generateOpaque());
2202: }
2203: requests[i] = builder.build();
2204: builder.recycle();
2205: }
2206: sendInternal(address, requests, writeTimeoutInMillis, responseTimeoutInMillis, result);
2207: }
2208:
2209: private void sendSetMulti(final SocketAddress address,
2210: final List<BufferWrapper<K>> keyList,
2211: final Map<K, V> map,
2212: final int expirationInSecs,
2213: final long writeTimeoutInMillis,
2214: final long responseTimeoutInMillis,
2215: final Map<K, Boolean> result) throws ExecutionException, TimeoutException, InterruptedException, PoolExhaustedException, NoValidObjectException {
2216: if (address == null || keyList == null || keyList.isEmpty() || result == null) {
2217: return;
2218: }
2219:
2220: // make multi requests based on key list
2221: final MemcachedRequest[] requests = new MemcachedRequest[keyList.size()];
2222: final BufferWrapper.BufferType keyType = !keyList.isEmpty() ? keyList.get(0).getType() : null;
2223: for (int i = 0; i < keyList.size(); i++) {
2224: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(true, true, true);
2225: if (i == keyList.size() - 1) {
2226: builder.op(CommandOpcodes.Set);
2227: builder.noReply(false);
2228: builder.opaque(0);
2229: } else {
2230: builder.op(CommandOpcodes.SetQ);
2231: builder.noReply(true);
2232: builder.opaque(generateOpaque());
2233: }
2234: final K originKey = keyList.get(i).getOrigin();
2235: builder.originKeyType(keyType);
2236: builder.originKey(originKey);
2237: builder.key(keyList.get(i).getBuffer());
2238: final BufferWrapper valueWrapper = BufferWrapper.wrap(map.get(originKey), transport.getMemoryManager());
2239: builder.value(valueWrapper.getBuffer());
2240: builder.flags(valueWrapper.getType().flags);
2241: valueWrapper.recycle();
2242: builder.expirationInSecs(expirationInSecs);
2243: requests[i] = builder.build();
2244: builder.recycle();
2245: }
2246: sendInternal(address, requests, writeTimeoutInMillis, responseTimeoutInMillis, result);
2247: }
2248:
2249: private void sendCasMulti(final SocketAddress address,
2250: final List<BufferWrapper<K>> keyList,
2251: final Map<K, ValueWithCas<V>> map,
2252: final int expirationInSecs,
2253: final long writeTimeoutInMillis,
2254: final long responseTimeoutInMillis,
2255: final Map<K, Boolean> result) throws ExecutionException, TimeoutException, InterruptedException, PoolExhaustedException, NoValidObjectException {
2256: if (address == null || keyList == null || keyList.isEmpty() || result == null) {
2257: return;
2258: }
2259:
2260: // make multi requests based on key list
2261: final MemcachedRequest[] requests = new MemcachedRequest[keyList.size()];
2262: final BufferWrapper.BufferType keyType = !keyList.isEmpty() ? keyList.get(0).getType() : null;
2263: for (int i = 0; i < keyList.size(); i++) {
2264: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(true, true, true);
2265: if (i == keyList.size() - 1) {
2266: builder.op(CommandOpcodes.Set);
2267: builder.noReply(false);
2268: builder.opaque(0);
2269: } else {
2270: builder.op(CommandOpcodes.SetQ);
2271: builder.noReply(true);
2272: builder.opaque(generateOpaque());
2273: }
2274: final K originKey = keyList.get(i).getOrigin();
2275: builder.originKeyType(keyType);
2276: builder.originKey(originKey);
2277: builder.key(keyList.get(i).getBuffer());
2278: final ValueWithCas<V> vwc = map.get(originKey);
2279: if (vwc != null) {
2280: final BufferWrapper valueWrapper = BufferWrapper.wrap(vwc.getValue(), transport.getMemoryManager());
2281: builder.value(valueWrapper.getBuffer());
2282: builder.flags(valueWrapper.getType().flags);
2283: valueWrapper.recycle();
2284: builder.cas(vwc.getCas());
2285: }
2286: builder.expirationInSecs(expirationInSecs);
2287: requests[i] = builder.build();
2288: builder.recycle();
2289: }
2290: sendInternal(address, requests, writeTimeoutInMillis, responseTimeoutInMillis, result);
2291: }
2292:
2293: private void sendDeleteMulti(final SocketAddress address,
2294: final List<BufferWrapper<K>> keyList,
2295: final long writeTimeoutInMillis,
2296: final long responseTimeoutInMillis,
2297: final Map<K, Boolean> result) throws ExecutionException, TimeoutException, InterruptedException, PoolExhaustedException, NoValidObjectException {
2298: if (address == null || keyList == null || keyList.isEmpty() || result == null) {
2299: return;
2300: }
2301:
2302: // make multi requests based on key list
2303: final MemcachedRequest[] requests = new MemcachedRequest[keyList.size()];
2304: final BufferWrapper.BufferType keyType = !keyList.isEmpty() ? keyList.get(0).getType() : null;
2305: for (int i = 0; i < keyList.size(); i++) {
2306: final MemcachedRequest.Builder builder = MemcachedRequest.Builder.create(false, true, false);
2307: if (i == keyList.size() - 1) {
2308: builder.op(CommandOpcodes.Delete);
2309: builder.noReply(false);
2310: builder.opaque(0);
2311: } else {
2312: builder.op(CommandOpcodes.DeleteQ);
2313: builder.noReply(true);
2314: builder.opaque(generateOpaque());
2315: }
2316: final K originKey = keyList.get(i).getOrigin();
2317: builder.originKeyType(keyType);
2318: builder.originKey(originKey);
2319: builder.key(keyList.get(i).getBuffer());
2320: requests[i] = builder.build();
2321: builder.recycle();
2322: }
2323: sendInternal(address, requests, writeTimeoutInMillis, responseTimeoutInMillis, result);
2324: }
2325:
2326: private Object sendInternal(final SocketAddress address,
2327: final MemcachedRequest[] requests,
2328: final long writeTimeoutInMillis,
2329: final long responseTimeoutInMillis,
2330: final Map<K, ?> result) throws PoolExhaustedException, NoValidObjectException, InterruptedException, TimeoutException, ExecutionException {
2331: if (address == null || requests == null || requests.length == 0) {
2332: return null;
2333: }
2334: if (connectionPool == null) {
2335: throw new IllegalStateException("connection pool must not be null");
2336: }
2337: if (clientFilter == null) {
2338: throw new IllegalStateException("client filter must not be null");
2339: }
2340:
2341: final boolean isMulti = (result != null);
2342: final Connection<SocketAddress> connection;
2343: try {
2344: connection = connectionPool.borrowObject(address, connectTimeoutInMillis);
2345: } catch (PoolExhaustedException pee) {
2346: if (logger.isLoggable(Level.FINER)) {
2347: logger.log(Level.FINER, "failed to get the connection. address=" + address + ", timeout=" + connectTimeoutInMillis + "ms", pee);
2348: }
2349: throw pee;
2350: } catch (NoValidObjectException nvoe) {
2351: if (logger.isLoggable(Level.FINER)) {
2352: logger.log(Level.FINER, "failed to get the connection. address=" + address + ", timeout=" + connectTimeoutInMillis + "ms", nvoe);
2353: }
2354: removeServer(address, false);
2355: throw nvoe;
2356: } catch (TimeoutException te) {
2357: if (logger.isLoggable(Level.FINER)) {
2358: logger.log(Level.FINER, "failed to get the connection. address=" + address + ", timeout=" + connectTimeoutInMillis + "ms", te);
2359: }
2360: throw te;
2361: } catch (InterruptedException ie) {
2362: if (logger.isLoggable(Level.FINER)) {
2363: logger.log(Level.FINER, "failed to get the connection. address=" + address + ", timeout=" + connectTimeoutInMillis + "ms", ie);
2364: }
2365: throw ie;
2366: }
2367:
2368: try {
2369: final GrizzlyFuture<WriteResult<MemcachedRequest[], SocketAddress>> future = connection.write(requests);
2370: if (writeTimeoutInMillis > 0) {
2371: future.get(writeTimeoutInMillis, TimeUnit.MILLISECONDS);
2372: } else {
2373: future.get();
2374: }
2375: } catch (ExecutionException ee) {
2376: // invalid connection
2377: try {
2378: connectionPool.removeObject(address, connection);
2379: } catch (Exception e) {
2380: if (logger.isLoggable(Level.SEVERE)) {
2381: logger.log(Level.SEVERE, "failed to remove the connection. address=" + address + ", connection=" + connection, e);
2382: }
2383: }
2384: throw ee;
2385: } catch (TimeoutException te) {
2386: try {
2387: connectionPool.removeObject(address, connection);
2388: } catch (Exception e) {
2389: if (logger.isLoggable(Level.SEVERE)) {
2390: logger.log(Level.SEVERE, "failed to remove the connection. address=" + address + ", connection=" + connection, e);
2391: }
2392: }
2393: throw te;
2394: } catch (InterruptedException ie) {
2395: try {
2396: connectionPool.removeObject(address, connection);
2397: } catch (Exception e) {
2398: if (logger.isLoggable(Level.SEVERE)) {
2399: logger.log(Level.SEVERE, "failed to remove the connection. address=" + address + ", connection=" + connection, e);
2400: }
2401: }
2402: throw ie;
2403: } catch (Exception unexpected) {
2404: try {
2405: connectionPool.removeObject(address, connection);
2406: } catch (Exception e) {
2407: if (logger.isLoggable(Level.SEVERE)) {
2408: logger.log(Level.SEVERE, "failed to remove the connection. address=" + address + ", connection=" + connection, e);
2409: }
2410: }
2411: throw new ExecutionException(unexpected);
2412: }
2413:
2414: final Object response;
2415: try {
2416: if (!isMulti) {
2417: response = clientFilter.getCorrelatedResponse(connection, requests[0], responseTimeoutInMillis);
2418: } else {
2419: response = clientFilter.getMultiResponse(connection, requests, responseTimeoutInMillis, result);
2420: }
2421:
2422: } catch (TimeoutException te) {
2423: returnConnectionSafely(address, connection);
2424: throw te;
2425: } catch (InterruptedException ie) {
2426: try {
2427: connectionPool.removeObject(address, connection);
2428: } catch (Exception e) {
2429: if (logger.isLoggable(Level.SEVERE)) {
2430: logger.log(Level.SEVERE, "failed to remove the connection. address=" + address + ", connection=" + connection, e);
2431: }
2432: }
2433: throw ie;
2434: } catch (Exception unexpected) {
2435: try {
2436: connectionPool.removeObject(address, connection);
2437: } catch (Exception e) {
2438: if (logger.isLoggable(Level.SEVERE)) {
2439: logger.log(Level.SEVERE, "failed to remove the connection. address=" + address + ", connection=" + connection, e);
2440: }
2441: }
2442: throw new ExecutionException(unexpected);
2443: }
2444: returnConnectionSafely(address, connection);
2445: return response;
2446: }
2447:
2448: private void returnConnectionSafely(final SocketAddress address, final Connection<SocketAddress> connection) {
2449: if (address == null || connection == null) {
2450: return;
2451: }
2452: if (connection.isOpen()) {
2453: try {
2454: connectionPool.returnObject(address, connection);
2455: } catch (Exception e) {
2456: if (logger.isLoggable(Level.SEVERE)) {
2457: logger.log(Level.SEVERE, "failed to return the connection. address=" + address + ", connection=" + connection, e);
2458: }
2459: }
2460: } else {
2461: try {
2462: connectionPool.removeObject(address, connection);
2463: } catch (Exception e) {
2464: if (logger.isLoggable(Level.SEVERE)) {
2465: logger.log(Level.SEVERE, "failed to remove the connection. address=" + address + ", connection=" + connection, e);
2466: }
2467: }
2468: }
2469: }
2470:
2471: private static <K> void recycleBufferWrappers(List<BufferWrapper<K>> bufferWrapperList) {
2472: if (bufferWrapperList == null) {
2473: return;
2474: }
2475: for (BufferWrapper<K> wrapper : bufferWrapperList) {
2476: wrapper.recycle();
2477: }
2478: }
2479:
2480: private static int generateOpaque() {
2481: return opaqueIndex.getAndIncrement() & 0x7fffffff;
2482: }
2483:
2484: private class HealthMonitorTask implements Runnable {
2485:
2486: private final Map<SocketAddress, Boolean> failures = new ConcurrentHashMap<>();
2487: private final Map<SocketAddress, Boolean> revivals = new ConcurrentHashMap<>();
2488: private final AtomicBoolean running = new AtomicBoolean();
2489:
2490: public boolean failure(final SocketAddress address) {
2491: if (address == null) {
2492: return true;
2493: }
2494: if (failures.get(address) == null && revivals.get(address) == null) {
2495: failures.put(address, Boolean.TRUE);
2496: return true;
2497: } else {
2498: return false;
2499: }
2500: }
2501:
2502: @SuppressWarnings("unchecked")
2503: @Override
2504: public void run() {
2505: if (transport == null) {
2506: throw new IllegalStateException("transport must not be null");
2507: }
2508: if (!running.compareAndSet(false, true)) {
2509: return;
2510: }
2511: try {
2512: revivals.clear();
2513: final Set<SocketAddress> failuresSet = failures.keySet();
2514: if (logger.isLoggable(Level.FINE)) {
2515: logger.log(Level.FINE, "try to check the failures in health monitor. failed list hint={0}, interval={1}secs", new Object[]{failuresSet, healthMonitorIntervalInSecs});
2516: } else if (logger.isLoggable(Level.INFO) && !failuresSet.isEmpty()) {
2517: logger.log(Level.INFO, "try to check the failures in health monitor. failed list hint={0}, interval={1}secs", new Object[]{failuresSet, healthMonitorIntervalInSecs});
2518: }
2519: for (SocketAddress failure : failuresSet) {
2520: try {
2521: // get the temporary connection
2522: final ConnectorHandler<SocketAddress> connectorHandler = TCPNIOConnectorHandler.builder(transport).setReuseAddress(true).build();
2523: Future<Connection> future = connectorHandler.connect(failure);
2524: final Connection<SocketAddress> connection;
2525: try {
2526: if (connectTimeoutInMillis < 0) {
2527: connection = future.get();
2528: } else {
2529: connection = future.get(connectTimeoutInMillis, TimeUnit.MILLISECONDS);
2530: }
2531: } catch (InterruptedException ie) {
2532: if (!future.cancel(false) && future.isDone()) {
2533: final Connection c = future.get();
2534: if (c != null && c.isOpen()) {
2535: c.closeSilently();
2536: }
2537: }
2538: if (logger.isLoggable(Level.SEVERE)) {
2539: logger.log(Level.SEVERE, "failed to get the connection in health monitor. address=" + failure, ie);
2540: }
2541: continue;
2542: } catch (ExecutionException ee) {
2543: if (!future.cancel(false) && future.isDone()) {
2544: final Connection c = future.get();
2545: if (c != null && c.isOpen()) {
2546: c.closeSilently();
2547: }
2548: }
2549: if (logger.isLoggable(Level.SEVERE)) {
2550: logger.log(Level.SEVERE, "failed to get the connection in health monitor. address=" + failure, ee);
2551: }
2552: continue;
2553: } catch (TimeoutException te) {
2554: if (!future.cancel(false) && future.isDone()) {
2555: final Connection c = future.get();
2556: if (c != null && c.isOpen()) {
2557: c.closeSilently();
2558: }
2559: }
2560: if (logger.isLoggable(Level.SEVERE)) {
2561: logger.log(Level.SEVERE, "failed to get the connection in health monitor. address=" + failure, te);
2562: }
2563: continue;
2564: }
2565: if (validateConnectionWithVersionCommand(connection)) {
2566: failures.remove(failure);
2567: revivals.put(failure, Boolean.TRUE);
2568: }
2569: if (connection.isOpen()) {
2570: connection.closeSilently();
2571: }
2572: } catch (Throwable t) {
2573: if (logger.isLoggable(Level.SEVERE)) {
2574: logger.log(Level.SEVERE, "unexpected exception thrown", t);
2575: }
2576: }
2577: }
2578: final Set<SocketAddress> revivalsSet = revivals.keySet();
2579: if (logger.isLoggable(Level.FINE)) {
2580: logger.log(Level.FINE, "try to restore revivals in health monitor. revival list hint={0}, interval={1}secs", new Object[]{revivalsSet, healthMonitorIntervalInSecs});
2581: } else if (logger.isLoggable(Level.INFO) && !revivalsSet.isEmpty()) {
2582: logger.log(Level.INFO, "try to restore revivals in health monitor. revival list hint={0}, interval={1}secs", new Object[]{revivalsSet, healthMonitorIntervalInSecs});
2583: }
2584: for (SocketAddress revival : revivalsSet) {
2585: if (!addServer(revival, false)) {
2586: if (logger.isLoggable(Level.WARNING)) {
2587: logger.log(Level.WARNING, "the revival was failed again in health monitor. revival={0}", revival);
2588: }
2589: failures.put(revival, Boolean.TRUE);
2590: }
2591: }
2592: } finally {
2593: running.set(false);
2594: }
2595: }
2596: }
2597:
2598: public static class Builder<K, V> implements CacheBuilder<K, V> {
2599:
2600: private final String cacheName;
2601: private final GrizzlyMemcachedCacheManager manager;
2602: private final TCPNIOTransport transport;
2603: private Set<SocketAddress> servers = Collections.synchronizedSet(new HashSet<SocketAddress>());
2604: private long connectTimeoutInMillis = 5000; // 5secs
2605: private long writeTimeoutInMillis = 5000; // 5secs
2606: private long responseTimeoutInMillis = 10000; // 10secs
2607:
2608: private long healthMonitorIntervalInSecs = 60; // 1 min
2609: private boolean failover = true;
2610: private boolean preferRemoteConfig = false;
2611:
2612: // connection pool config
2613: private int minConnectionPerServer = 5;
2614: private int maxConnectionPerServer = Integer.MAX_VALUE;
2615: private long keepAliveTimeoutInSecs = 30 * 60; // 30 min
2616: private boolean allowDisposableConnection = false;
2617: private boolean borrowValidation = false;
2618: private boolean returnValidation = false;
2619:
2620: private final ZKClient zkClient;
2621:
2622: public Builder(final String cacheName, final GrizzlyMemcachedCacheManager manager, final TCPNIOTransport transport) {
2623: this.cacheName = cacheName;
2624: this.manager = manager;
2625: this.transport = transport;
2626: this.zkClient = manager.getZkClient();
2627: }
2628:
2629: /**
2630: * {@inheritDoc}
2631: */
2632: @Override
2633: public GrizzlyMemcachedCache<K, V> build() {
2634: final GrizzlyMemcachedCache<K, V> cache = new GrizzlyMemcachedCache<K, V>(this);
2635: cache.start();
2636:• if (!manager.addCache(cache)) {
2637: cache.stop();
2638: throw new IllegalStateException("failed to add the cache because the CacheManager already stopped or the same cache name existed");
2639: }
2640: return cache;
2641: }
2642:
2643: /**
2644: * Set global connect-timeout
2645: * <p>
2646: * If the given param is negative, the timeout is infite.
2647: * Default is 5000.
2648: *
2649: * @param connectTimeoutInMillis connect-timeout in milli-seconds
2650: * @return this builder
2651: */
2652: public Builder<K, V> connectTimeoutInMillis(final long connectTimeoutInMillis) {
2653: this.connectTimeoutInMillis = connectTimeoutInMillis;
2654: return this;
2655: }
2656:
2657: /**
2658: * Set global write-timeout
2659: * <p>
2660: * If the given param is negative, the timeout is infite.
2661: * Default is 5000.
2662: *
2663: * @param writeTimeoutInMillis write-timeout in milli-seconds
2664: * @return this builder
2665: */
2666: public Builder<K, V> writeTimeoutInMillis(final long writeTimeoutInMillis) {
2667: this.writeTimeoutInMillis = writeTimeoutInMillis;
2668: return this;
2669: }
2670:
2671: /**
2672: * Set global response-timeout
2673: * <p>
2674: * If the given param is negative, the timeout is infite.
2675: * Default is 10000.
2676: *
2677: * @param responseTimeoutInMillis response-timeout in milli-seconds
2678: * @return this builder
2679: */
2680: public Builder<K, V> responseTimeoutInMillis(final long responseTimeoutInMillis) {
2681: this.responseTimeoutInMillis = responseTimeoutInMillis;
2682: return this;
2683: }
2684:
2685: /**
2686: * Set connection pool's min
2687: * <p>
2688: * Default is 5.
2689: *
2690: * @param minConnectionPerServer connection pool's min
2691: * @return this builder
2692: * @see BaseObjectPool.Builder#min(int)
2693: */
2694: public Builder<K, V> minConnectionPerServer(final int minConnectionPerServer) {
2695: this.minConnectionPerServer = minConnectionPerServer;
2696: return this;
2697: }
2698:
2699: /**
2700: * Set connection pool's max
2701: * <p>
2702: * Default is {@link Integer#MAX_VALUE}
2703: *
2704: * @param maxConnectionPerServer connection pool's max
2705: * @return this builder
2706: * @see BaseObjectPool.Builder#max(int)
2707: */
2708: public Builder<K, V> maxConnectionPerServer(final int maxConnectionPerServer) {
2709: this.maxConnectionPerServer = maxConnectionPerServer;
2710: return this;
2711: }
2712:
2713: /**
2714: * Set connection pool's KeepAliveTimeout
2715: * <p>
2716: * Default is 1800.
2717: *
2718: * @param keepAliveTimeoutInSecs connection pool's KeepAliveTimeout in seconds
2719: * @return this builder
2720: * @see BaseObjectPool.Builder#keepAliveTimeoutInSecs(long)
2721: */
2722: public Builder<K, V> keepAliveTimeoutInSecs(final long keepAliveTimeoutInSecs) {
2723: this.keepAliveTimeoutInSecs = keepAliveTimeoutInSecs;
2724: return this;
2725: }
2726:
2727: /**
2728: * Set health monitor's interval
2729: * <p>
2730: * This cache will schedule HealthMonitorTask with this interval.
2731: * HealthMonitorTask will check the failure servers periodically and detect the revived server.
2732: * If the given parameter is negative, this cache never schedules HealthMonitorTask
2733: * so this behavior is similar to seting {@code failover} to be false.
2734: * Default is 60.
2735: *
2736: * @param healthMonitorIntervalInSecs interval in seconds
2737: * @return this builder
2738: */
2739: public Builder<K, V> healthMonitorIntervalInSecs(final long healthMonitorIntervalInSecs) {
2740: this.healthMonitorIntervalInSecs = healthMonitorIntervalInSecs;
2741: return this;
2742: }
2743:
2744: /**
2745: * Allow or disallow disposable connections
2746: * <p>
2747: * Default is false.
2748: *
2749: * @param allowDisposableConnection true if this cache allows disposable connections
2750: * @return this builder
2751: */
2752: public Builder<K, V> allowDisposableConnection(final boolean allowDisposableConnection) {
2753: this.allowDisposableConnection = allowDisposableConnection;
2754: return this;
2755: }
2756:
2757: /**
2758: * Enable or disable the connection validation when the connection is borrowed from the connection pool
2759: * <p>
2760: * Default is false.
2761: *
2762: * @param borrowValidation true if this cache should make sure the borrowed connection is valid
2763: * @return this builder
2764: */
2765: public Builder<K, V> borrowValidation(final boolean borrowValidation) {
2766: this.borrowValidation = borrowValidation;
2767: return this;
2768: }
2769:
2770: /**
2771: * Enable or disable the connection validation when the connection is returned to the connection pool
2772: * <p>
2773: * Default is false.
2774: *
2775: * @param returnValidation true if this cache should make sure the returned connection is valid
2776: * @return this builder
2777: */
2778: public Builder<K, V> returnValidation(final boolean returnValidation) {
2779: this.returnValidation = returnValidation;
2780: return this;
2781: }
2782:
2783: /**
2784: * Set initial servers
2785: *
2786: * @param servers server set
2787: * @return this builder
2788: */
2789: public Builder<K, V> servers(final Set<SocketAddress> servers) {
2790:• if (servers != null && !servers.isEmpty()) {
2791: this.servers.addAll(servers);
2792: }
2793: return this;
2794: }
2795:
2796: /**
2797: * Enable or disable failover/failback
2798: * <p>
2799: * Default is true.
2800: *
2801: * @param failover true if this cache should support failover/failback when the server is failed or revived
2802: * @return this builder
2803: */
2804: public Builder<K, V> failover(final boolean failover) {
2805: this.failover = failover;
2806: return this;
2807: }
2808:
2809: /**
2810: * @deprecated not supported anymore
2811: */
2812: public Builder<K, V> retryCount(final int retryCount) {
2813: return this;
2814: }
2815:
2816: public Builder<K, V> preferRemoteConfig(final boolean preferRemoteConfig) {
2817: this.preferRemoteConfig = preferRemoteConfig;
2818: return this;
2819: }
2820: }
2821:
2822: @Override
2823: public String toString() {
2824: final StringBuilder sb = new StringBuilder("GrizzlyMemcachedCache{");
2825: sb.append("cacheName='").append(cacheName).append('\'');
2826: sb.append(", transport=").append(transport);
2827: sb.append(", connectTimeoutInMillis=").append(connectTimeoutInMillis);
2828: sb.append(", writeTimeoutInMillis=").append(writeTimeoutInMillis);
2829: sb.append(", responseTimeoutInMillis=").append(responseTimeoutInMillis);
2830: sb.append(", connectionPool=").append(connectionPool);
2831: sb.append(", servers=").append(servers);
2832: sb.append(", healthMonitorIntervalInSecs=").append(healthMonitorIntervalInSecs);
2833: sb.append(", failover=").append(failover);
2834: sb.append(", preferRemoteConfig=").append(preferRemoteConfig);
2835: sb.append(", zkListener=").append(zkListener);
2836: sb.append(", zooKeeperServerListPath='").append(zooKeeperServerListPath).append('\'');
2837: sb.append('}');
2838: return sb.toString();
2839: }
2840: }