Skip to content

Package: PooledMemoryManager

PooledMemoryManager

nameinstructionbranchcomplexitylinemethod
PooledMemoryManager()
M: 11 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
PooledMemoryManager(boolean)
M: 11 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
PooledMemoryManager(int, int, int, int, float, float, boolean)
M: 137 C: 0
0%
M: 26 C: 0
0%
M: 14 C: 0
0%
M: 25 C: 0
0%
M: 1 C: 0
0%
allocate(int)
M: 13 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
allocateAtLeast(int)
M: 26 C: 0
0%
M: 6 C: 0
0%
M: 4 C: 0
0%
M: 5 C: 0
0%
M: 1 C: 0
0%
allocateToCompositeBuffer(CompositeBuffer, int)
M: 64 C: 0
0%
M: 10 C: 0
0%
M: 6 C: 0
0%
M: 13 C: 0
0%
M: 1 C: 0
0%
createJmxManagementObject()
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
fillHighestOneBitRight(int)
M: 32 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 6 C: 0
0%
M: 1 C: 0
0%
getMonitoringConfig()
M: 3 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
getPoolFor(int)
M: 26 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 5 C: 0
0%
M: 1 C: 0
0%
getPools()
M: 8 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
isPowerOfTwo(int)
M: 10 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
newCompositeBuffer()
M: 11 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
reallocate(Buffer, int)
M: 146 C: 0
0%
M: 14 C: 0
0%
M: 8 C: 0
0%
M: 40 C: 0
0%
M: 1 C: 0
0%
release(Buffer)
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 11 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
willAllocateDirect(int)
M: 2 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
wrap(ByteBuffer)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
wrap(String)
M: 6 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
wrap(String, Charset)
M: 6 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
wrap(byte[])
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
wrap(byte[], int, int)
M: 7 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*
2: * Copyright (c) 2013, 2020 Oracle and/or its affiliates. All rights reserved.
3: *
4: * This program and the accompanying materials are made available under the
5: * terms of the Eclipse Public License v. 2.0, which is available at
6: * http://www.eclipse.org/legal/epl-2.0.
7: *
8: * This Source Code may also be made available under the following Secondary
9: * Licenses when the conditions for such availability set forth in the
10: * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
11: * version 2 with the GNU Classpath Exception, which is available at
12: * https://www.gnu.org/software/classpath/license.html.
13: *
14: * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
15: */
16:
17: package org.glassfish.grizzly.memory;
18:
19: import java.nio.ByteBuffer;
20: import java.nio.charset.Charset;
21: import java.util.Arrays;
22: import java.util.concurrent.ThreadLocalRandom;
23: import java.util.concurrent.atomic.AtomicInteger;
24: import java.util.concurrent.atomic.AtomicReferenceArray;
25: import java.util.concurrent.locks.LockSupport;
26:
27: import org.glassfish.grizzly.Buffer;
28: import org.glassfish.grizzly.monitoring.DefaultMonitoringConfig;
29: import org.glassfish.grizzly.monitoring.MonitoringConfig;
30: import org.glassfish.grizzly.monitoring.MonitoringUtils;
31:
32: /**
33: * A {@link MemoryManager} implementation based on a series of shared memory pools. Each pool contains multiple buffers
34: * of the fixed length specific for this pool.
35: *
36: * There are several tuning options for this {@link MemoryManager} implementation.
37: * <ul>
38: * <li>The base size of the buffer for the 1st pool, every next pool n will have buffer size equal to bufferSize(n-1) *
39: * 2^growthFactor</li>
40: * <li>The number of pools, responsible for allocation of buffers of a pool-specific size</li>
41: * <li>The buffer size growth factor, that defines 2^x multiplier, used to calculate buffer size for next allocated
42: * pool</li>
43: * <li>The number of pool slices that every pool will stripe allocation requests across</li>
44: * <li>The percentage of the heap that this manager will use when populating the pools</li>
45: * <li>The percentage of buffers to be pre-allocated during MemoryManager initialization</li>
46: * <li>The flag indicating whether direct or heap based {@link Buffer}s will be allocated</li>
47: * </ul>
48: *
49: * If no explicit configuration is provided, the following defaults will be used:
50: * <ul>
51: * <li>Base buffer size: 4 KiB ({@link #DEFAULT_BASE_BUFFER_SIZE})</li>
52: * <li>Number of pools: 3 ({@link #DEFAULT_NUMBER_OF_POOLS})</li>
53: * <li>Growth factor: 2 ({@link #DEFAULT_GROWTH_FACTOR}), which means the first buffer pool will contains buffer of size
54: * 4 KiB, the seconds one buffer of size 16KiB, the third one buffer of size 64KiB</li>
55: * <li>Number of pool slices: Based on the return value of <code>Runtime.getRuntime().availableProcessors()</code></li>
56: * <li>Percentage of heap: 3% ({@link #DEFAULT_HEAP_USAGE_PERCENTAGE})</li>
57: * <li>Percentage of buffers to be pre-allocated: 100% ({@link #DEFAULT_PREALLOCATED_BUFFERS_PERCENTAGE})</li>
58: * <li>Heap based {@link Buffer}s will be allocated</li>
59: * </ul>
60: *
61: * The main advantage of this manager over {@link org.glassfish.grizzly.memory.HeapMemoryManager} or
62: * {@link org.glassfish.grizzly.memory.ByteBufferManager} is that this implementation doesn't use ThreadLocal pools and
63: * as such, doesn't suffer from the memory fragmentation/reallocation cycle that can impact the ThreadLocal versions.
64: *
65: * @since 2.3.11
66: */
67: public class PooledMemoryManager implements MemoryManager<Buffer>, WrapperAware {
68:
69: public static final int DEFAULT_BASE_BUFFER_SIZE = 4 * 1024;
70: public static final int DEFAULT_NUMBER_OF_POOLS = 3;
71: public static final int DEFAULT_GROWTH_FACTOR = 2;
72:
73: public static final float DEFAULT_HEAP_USAGE_PERCENTAGE = 0.03f;
74: public static final float DEFAULT_PREALLOCATED_BUFFERS_PERCENTAGE = 1.0f;
75:
76: private static final boolean FORCE_BYTE_BUFFER_BASED_BUFFERS = Boolean.getBoolean(PooledMemoryManager.class + ".force-byte-buffer-based-buffers");
77:
78: private static final long BACK_OFF_DELAY = Long.getLong(PooledMemoryManager.class + ".back-off-delay", 0L);
79: /**
80: * Basic monitoring support. Concrete implementations of this class need only to implement the
81: * {@link #createJmxManagementObject()} method to plug into the Grizzly 2.0 JMX framework.
82: */
83: protected final DefaultMonitoringConfig<MemoryProbe> monitoringConfig = new DefaultMonitoringConfig<MemoryProbe>(MemoryProbe.class) {
84:
85: @Override
86: public Object createManagementObject() {
87: return createJmxManagementObject();
88: }
89:
90: };
91:
92: // number of pools with different buffer sizes
93: private final Pool[] pools;
94:
95: // the max buffer size pooled by this memory manager
96: private final int maxPooledBufferSize;
97:
98: // ------------------------------------------------------------ Constructors
99:
100: /**
101: * Creates a new <code>PooledMemoryManager</code> using the following defaults:
102: * <ul>
103: * <li>4 KiB base buffer size</li>
104: * <li>3 pools</li>
105: * <li>2 growth factor, which means 1st pool will contain buffers of size 4KiB, the 2nd - 16KiB, the 3rd - 64KiB</li>
106: * <li>Number of pool slices based on <code>Runtime.getRuntime().availableProcessors()</code></li>
107: * <li>The initial allocation will use 3% of the heap</li>
108: * <li>The percentage of buffers to be pre-allocated during MemoryManager initialization</li>
109: * </ul>
110: */
111: public PooledMemoryManager() {
112: this(DEFAULT_BASE_BUFFER_SIZE, DEFAULT_NUMBER_OF_POOLS, DEFAULT_GROWTH_FACTOR, Runtime.getRuntime().availableProcessors(),
113: DEFAULT_HEAP_USAGE_PERCENTAGE, DEFAULT_PREALLOCATED_BUFFERS_PERCENTAGE, false);
114: }
115:
116: /**
117: * Creates a new <code>PooledMemoryManager</code> using the specified parameters for configuration.
118: *
119: * @param isDirect flag, indicating whether direct or heap based {@link Buffer}s will be allocated
120: */
121: @SuppressWarnings("unused")
122: public PooledMemoryManager(final boolean isDirect) {
123: this(DEFAULT_BASE_BUFFER_SIZE, DEFAULT_NUMBER_OF_POOLS, DEFAULT_GROWTH_FACTOR, Runtime.getRuntime().availableProcessors(),
124: DEFAULT_HEAP_USAGE_PERCENTAGE, DEFAULT_PREALLOCATED_BUFFERS_PERCENTAGE, isDirect);
125: }
126:
127: /**
128: * Creates a new <code>PooledMemoryManager</code> using the specified parameters for configuration.
129: *
130: * @param baseBufferSize the base size of the buffer for the 1st pool, every next pool n will have buffer size equal to
131: * bufferSize(n-1) * 2^growthFactor
132: * @param numberOfPools the number of pools, responsible for allocation of buffers of a pool-specific size
133: * @param growthFactor the buffer size growth factor, that defines 2^x multiplier, used to calculate buffer size for
134: * next allocated pool
135: * @param numberOfPoolSlices the number of pool slices that every pool will stripe allocation requests across
136: * @param percentOfHeap percentage of the heap that will be used when populating the pools
137: * @param percentPreallocated percentage of buffers to be pre-allocated during MemoryManager initialization
138: * @param isDirect flag, indicating whether direct or heap based {@link Buffer}s will be allocated
139: */
140: public PooledMemoryManager(final int baseBufferSize, final int numberOfPools, final int growthFactor, final int numberOfPoolSlices,
141: final float percentOfHeap, final float percentPreallocated, final boolean isDirect) {
142:• if (baseBufferSize <= 0) {
143: throw new IllegalArgumentException("baseBufferSize must be greater than zero");
144: }
145:• if (numberOfPools <= 0) {
146: throw new IllegalArgumentException("numberOfPools must be greater than zero");
147: }
148:• if (growthFactor == 0 && numberOfPools > 1) {
149: throw new IllegalArgumentException("if numberOfPools is greater than 0 - growthFactor must be greater than zero");
150: }
151:• if (growthFactor < 0) {
152: throw new IllegalArgumentException("growthFactor must be greater or equal to zero");
153: }
154:• if (numberOfPoolSlices <= 0) {
155: throw new IllegalArgumentException("numberOfPoolSlices must be greater than zero");
156: }
157:
158:• if (!isPowerOfTwo(baseBufferSize) || !isPowerOfTwo(growthFactor)) {
159: throw new IllegalArgumentException("minBufferSize and growthFactor must be a power of two");
160: }
161:
162:• if (percentOfHeap <= 0.0f || percentOfHeap >= 1.0f) {
163: throw new IllegalArgumentException("percentOfHeap must be greater than zero and less than 1");
164: }
165:
166:• if (percentPreallocated < 0.0f || percentPreallocated > 1.0f) {
167: throw new IllegalArgumentException("percentPreallocated must be greater or equal to zero and less or equal to 1");
168: }
169:
170: final long heapSize = Runtime.getRuntime().maxMemory();
171: final long memoryPerSubPool = (long) (heapSize * percentOfHeap / numberOfPools);
172:
173: pools = new Pool[numberOfPools];
174:• for (int i = 0, bufferSize = baseBufferSize; i < numberOfPools; i++, bufferSize <<= growthFactor) {
175: pools[i] = new Pool(bufferSize, memoryPerSubPool, numberOfPoolSlices, percentPreallocated, isDirect, monitoringConfig);
176: }
177: maxPooledBufferSize = pools[numberOfPools - 1].bufferSize;
178: }
179:
180: // ---------------------------------------------- Methods from MemoryManager
181:
182: /**
183: * For this implementation, this method simply calls through to {@link #allocateAtLeast(int)};
184: */
185: @Override
186: public Buffer allocate(final int size) {
187:• if (size < 0) {
188: throw new IllegalArgumentException("Requested allocation size must be greater than or equal to zero.");
189: }
190: return allocateAtLeast(size).limit(size);
191: }
192:
193: /**
194: * Allocates a buffer of at least the size requested.
195: * <p/>
196: * Keep in mind that the capacity of the buffer may be greater than the allocation request. The limit however, will be
197: * set to the specified size. The memory beyond the limit, is available for use.
198: *
199: * @param size the min {@link Buffer} size to be allocated.
200: * @return a buffer with a limit of the specified <tt>size</tt>.
201: */
202: @Override
203: public Buffer allocateAtLeast(int size) {
204:• if (size < 0) {
205: throw new IllegalArgumentException("Requested allocation size must be greater than or equal to zero.");
206: }
207:
208:• if (size == 0) {
209: return Buffers.EMPTY_BUFFER;
210: }
211:
212:• return size <= maxPooledBufferSize ? getPoolFor(size).allocate() : allocateToCompositeBuffer(newCompositeBuffer(), size);
213: }
214:
215: /**
216: * Reallocates an existing buffer to at least the specified size.
217: *
218: * @param oldBuffer old {@link Buffer} to be reallocated.
219: * @param newSize new {@link Buffer} required size.
220: *
221: * @return potentially a new buffer of at least the specified size.
222: */
223: @Override
224: public Buffer reallocate(final Buffer oldBuffer, final int newSize) {
225:• if (newSize == 0) {
226: oldBuffer.tryDispose();
227: return Buffers.EMPTY_BUFFER;
228: }
229:
230: final int curBufSize = oldBuffer.capacity();
231:
232:• if (oldBuffer instanceof PoolBuffer) {
233:• if (curBufSize >= newSize) {
234: final PoolBuffer oldPoolBuffer = (PoolBuffer) oldBuffer;
235:
236: final Pool newPool = getPoolFor(newSize);
237:• if (newPool != oldPoolBuffer.owner().owner) {
238: final int pos = Math.min(oldPoolBuffer.position(), newSize);
239:
240: final Buffer newPoolBuffer = newPool.allocate();
241: Buffers.setPositionLimit(oldPoolBuffer, 0, newSize);
242: newPoolBuffer.put(oldPoolBuffer);
243: Buffers.setPositionLimit(newPoolBuffer, pos, newSize);
244:
245: oldPoolBuffer.tryDispose();
246:
247: return newPoolBuffer;
248: }
249:
250: return oldPoolBuffer.limit(newSize);
251: } else {
252: final int pos = oldBuffer.position();
253: Buffers.setPositionLimit(oldBuffer, 0, curBufSize);
254:
255:• if (newSize <= maxPooledBufferSize) {
256:
257: final Pool newPool = getPoolFor(newSize);
258:
259: final Buffer newPoolBuffer = newPool.allocate();
260: newPoolBuffer.put(oldBuffer);
261: Buffers.setPositionLimit(newPoolBuffer, pos, newSize);
262:
263: oldBuffer.tryDispose();
264:
265: return newPoolBuffer;
266: } else {
267: final CompositeBuffer cb = newCompositeBuffer();
268: cb.append(oldBuffer);
269: allocateToCompositeBuffer(cb, newSize - curBufSize);
270: Buffers.setPositionLimit(cb, pos, newSize);
271: return cb;
272: }
273: }
274: } else {
275:• assert oldBuffer.isComposite();
276: final CompositeBuffer oldCompositeBuffer = (CompositeBuffer) oldBuffer;
277:• if (curBufSize > newSize) {
278: final int oldPos = oldCompositeBuffer.position();
279: Buffers.setPositionLimit(oldBuffer, newSize, newSize);
280: oldCompositeBuffer.trim();
281: oldCompositeBuffer.position(Math.min(oldPos, newSize));
282:
283: return oldCompositeBuffer;
284: } else {
285: return allocateToCompositeBuffer(oldCompositeBuffer, newSize - curBufSize);
286: }
287: }
288: }
289:
290: /**
291: * {@inheritDoc}
292: */
293: @Override
294: public void release(final Buffer buffer) {
295: buffer.tryDispose();
296: }
297:
298: /**
299: * {@inheritDoc}
300: */
301: @Override
302: public boolean willAllocateDirect(final int size) {
303: return false;
304: }
305:
306: /**
307: * {@inheritDoc}
308: */
309: @Override
310: public MonitoringConfig<MemoryProbe> getMonitoringConfig() {
311: return monitoringConfig;
312: }
313:
314: // ----------------------------------------------- Methods from WrapperAware
315:
316: @Override
317: public Buffer wrap(final byte[] data) {
318: return wrap(ByteBuffer.wrap(data));
319: }
320:
321: @Override
322: public Buffer wrap(byte[] data, int offset, int length) {
323: return wrap(ByteBuffer.wrap(data, offset, length));
324: }
325:
326: @Override
327: public Buffer wrap(final String s) {
328: return wrap(s.getBytes(Charset.defaultCharset()));
329: }
330:
331: @Override
332: public Buffer wrap(final String s, final Charset charset) {
333: return wrap(s.getBytes(charset));
334: }
335:
336: @Override
337: public Buffer wrap(final ByteBuffer byteBuffer) {
338: return new ByteBufferWrapper(byteBuffer);
339: }
340:
341: // ------------------------------------------------------- Protected Methods
342:
343: protected Object createJmxManagementObject() {
344:
345: return MonitoringUtils.loadJmxObject("org.glassfish.grizzly.memory.jmx.PooledMemoryManager", this, PooledMemoryManager.class);
346: }
347:
348: Pool[] getPools() {
349: return Arrays.copyOf(pools, pools.length);
350: }
351:
352: // --------------------------------------------------------- Private Methods
353:
354: private Pool getPoolFor(final int size) {
355:• for (int i = 0; i < pools.length; i++) {
356: final Pool pool = pools[i];
357:• if (pool.bufferSize >= size) {
358: return pool;
359: }
360: }
361:
362: throw new IllegalStateException("There is no pool big enough to allocate " + size + " bytes");
363: }
364:
365: private CompositeBuffer allocateToCompositeBuffer(final CompositeBuffer cb, int size) {
366:
367:• assert size >= 0;
368:
369:• if (size >= maxPooledBufferSize) {
370: final Pool maxBufferSizePool = pools[pools.length - 1];
371:
372: do {
373: cb.append(maxBufferSizePool.allocate());
374: size -= maxPooledBufferSize;
375:• } while (size >= maxPooledBufferSize);
376: }
377:
378:• for (int i = 0; i < pools.length; i++) {
379: final Pool pool = pools[i];
380:• if (pool.bufferSize >= size) {
381: final Buffer b = pool.allocate();
382: cb.append(b.limit(size));
383: break;
384: }
385: }
386:
387: return cb;
388: }
389:
390: private CompositeBuffer newCompositeBuffer() {
391: final CompositeBuffer cb = CompositeBuffer.newBuffer(this);
392: cb.allowInternalBuffersDispose(true);
393: cb.allowBufferDispose(true);
394: return cb;
395: }
396:
397: private static boolean isPowerOfTwo(final int valueToCheck) {
398:• return (valueToCheck & valueToCheck - 1) == 0;
399: }
400:
401: /*
402: * Propagates right-most one bit to the right. Each shift right will set all of the bits between the original and new
403: * position to one.
404: *
405: * Ex. If the value is 16, i.e.: 0x0000 0000 0000 0000 0000 0000 0001 0000 the result of this call will be: 0x0000 0000
406: * 0000 0000 0000 0000 0001 1111 or 31.
407: *
408: * In our case, we're using the result of this method as a mask.
409: *
410: * Part of this algorithm came from HD Figure 15-5.
411: */
412: private static int fillHighestOneBitRight(int value) {
413: value |= value >> 1;
414: value |= value >> 2;
415: value |= value >> 4;
416: value |= value >> 8;
417: value |= value >> 16;
418: return value;
419: }
420:
421: static final class Pool {
422: private final PoolSlice[] slices;
423: private final int bufferSize;
424:
425: public Pool(final int bufferSize, final long memoryPerSubPool, final int numberOfPoolSlices, final float percentPreallocated, final boolean isDirect,
426: final DefaultMonitoringConfig<MemoryProbe> monitoringConfig) {
427: this.bufferSize = bufferSize;
428: slices = new PoolSlice[numberOfPoolSlices];
429: final long memoryPerSlice = memoryPerSubPool / numberOfPoolSlices;
430:
431: for (int i = 0; i < numberOfPoolSlices; i++) {
432: slices[i] = new PoolSlice(this, memoryPerSlice, bufferSize, percentPreallocated, isDirect, monitoringConfig);
433: }
434: }
435:
436: public int elementsCount() {
437: int sum = 0;
438: for (int i = 0; i < slices.length; i++) {
439: sum += slices[i].elementsCount();
440: }
441:
442: return sum;
443: }
444:
445: public long size() {
446: return (long) elementsCount() * (long) bufferSize;
447: }
448:
449: public int getBufferSize() {
450: return bufferSize;
451: }
452:
453: public PoolSlice[] getSlices() {
454: return Arrays.copyOf(slices, slices.length);
455: }
456:
457: public Buffer allocate() {
458: final PoolSlice slice = getSlice();
459: PoolBuffer b = slice.poll();
460: if (b == null) {
461: b = slice.allocate();
462: }
463:
464: return b.prepare();
465: }
466:
467: @Override
468: public String toString() {
469: final StringBuilder sb = new StringBuilder(
470: "Pool[" + Integer.toHexString(hashCode()) + "] {" + "buffer size=" + bufferSize + ", slices count=" + slices.length);
471:
472: for (int i = 0; i < slices.length; i++) {
473: if (i == 0) {
474: sb.append("\n");
475: }
476:
477: sb.append("\t[").append(i).append("] ").append(slices[i].toString()).append('\n');
478: }
479:
480: sb.append('}');
481: return sb.toString();
482: }
483:
484: @SuppressWarnings("unchecked")
485: private PoolSlice getSlice() {
486: return slices[ThreadLocalRandom.current().nextInt(slices.length)];
487: }
488: }
489:
490: /*
491: * This array backed by this pool can only support 2^30-1 elements instead of the usual 2^32-1. This is because we use
492: * bit 30 to store information about the read and write pointer 'wrapping' status. Without these bits, it's difficult to
493: * tell if the array is full or empty when both read and write pointers are equal. The pool is considered full when read
494: * and write pointers refer to the same index and the aforementioned bits are equal. The same logic is applied to
495: * determine if the pool is empty, except the bits are not equal.
496: */
497: static final class PoolSlice {
498:
499: // Stride is calculate as 2^LOG2_STRIDE
500: private static final int LOG2_STRIDE = 4;
501:
502: // Array index stride.
503: private static final int STRIDE = 1 << LOG2_STRIDE;
504:
505: // Apply this mask to obtain the first 30 bits of an integer
506: // less the bits for wrap and offset.
507: private static final int MASK = 0x3FFFFFFF;
508:
509: // Apply this mask to get/set the wrap status bit.
510: private static final int WRAP_BIT_MASK = 0x40000000;
511:
512: // Using an AtomicReferenceArray to ensure proper visibility of items
513: // within the pool which will be shared across threads.
514: private final PaddedAtomicReferenceArray<PoolBuffer> pool1, pool2;
515:
516: // Maintain two different pointers for reading/writing to reduce
517: // contention.
518: private final PaddedAtomicInteger pollIdx;
519: private final PaddedAtomicInteger offerIdx;
520:
521: // The Pool this slice belongs to
522: private final Pool owner;
523:
524: // The max size of the pool.
525: private final int maxPoolSize;
526:
527: // Strides in pool
528: private final int stridesInPool;
529:
530: // individual buffer size.
531: private final int bufferSize;
532:
533: // flag, indicating if heap or direct Buffers will be allocated
534: private final boolean isDirect;
535:
536: // MemoryProbe configuration.
537: private final DefaultMonitoringConfig<MemoryProbe> monitoringConfig;
538:
539: // -------------------------------------------------------- Constructors
540:
541: PoolSlice(final Pool owner, final long totalPoolSize, final int bufferSize, final float percentPreallocated, final boolean isDirect,
542: final DefaultMonitoringConfig<MemoryProbe> monitoringConfig) {
543:
544: this.owner = owner;
545: this.bufferSize = bufferSize;
546: this.isDirect = isDirect;
547: this.monitoringConfig = monitoringConfig;
548: int initialSize = (int) (totalPoolSize / bufferSize);
549:
550: // Round up to the nearest multiple of 16 (STRIDE). This is
551: // done as elements will be accessed at (offset + index + STRIDE).
552: // Offset is calculated each time we overflow the array.
553: // This access scheme should help us avoid false sharing.
554: maxPoolSize = initialSize + STRIDE - 1 & ~(STRIDE - 1);
555: stridesInPool = maxPoolSize >> LOG2_STRIDE; // maxPoolSize / STRIDE
556:
557: // poolSize must be less than or equal to 2^30 - 1.
558: if (maxPoolSize >= WRAP_BIT_MASK) {
559: throw new IllegalStateException("Cannot manage a pool larger than 2^30-1");
560: }
561:
562: pool1 = new PaddedAtomicReferenceArray<>(maxPoolSize);
563:
564: final int preallocatedBufs = Math.min(maxPoolSize, (int) (percentPreallocated * maxPoolSize));
565:
566: int idx = 0;
567:
568: for (int i = 0; i < preallocatedBufs; i++, idx = nextIndex(idx)) {
569: pool1.lazySet(idx, allocate().free(true));
570: }
571: pool2 = new PaddedAtomicReferenceArray<>(maxPoolSize);
572:
573: pollIdx = new PaddedAtomicInteger(0);
574: offerIdx = new PaddedAtomicInteger(idx);
575: }
576:
577: // ------------------------------------------------------ Public Methods
578:
579: public PoolBuffer poll() {
580: int pollIdx;
581: for (;;) {
582: pollIdx = this.pollIdx.get();
583: final int offerIdx = this.offerIdx.get();
584:
585: // weak isEmpty check, might return false positives
586: if (isEmpty(pollIdx, offerIdx)) {
587: return null;
588: }
589:
590: final int nextPollIdx = nextIndex(pollIdx);
591: if (this.pollIdx.compareAndSet(pollIdx, nextPollIdx)) {
592: break;
593: }
594:
595: LockSupport.parkNanos(BACK_OFF_DELAY);
596: }
597:
598: final int unmaskedPollIdx = unmask(pollIdx);
599: final AtomicReferenceArray<PoolBuffer> pool = pool(pollIdx);
600: for (;;) {
601: // unmask the current read value to the actual array index.
602: final PoolBuffer pb = pool.getAndSet(unmaskedPollIdx, null);
603: if (pb != null) {
604: ProbeNotifier.notifyBufferAllocatedFromPool(monitoringConfig, bufferSize);
605: return pb;
606: }
607:
608: // give offer at this index time to complete...
609: Thread.yield();
610: }
611: }
612:
613: public boolean offer(final PoolBuffer b) {
614: int offerIdx;
615: for (;;) {
616: offerIdx = this.offerIdx.get();
617: final int pollIdx = this.pollIdx.get();
618:
619: // weak isFull check, might return false positives
620: if (isFull(pollIdx, offerIdx)) {
621: return false;
622: }
623: final int nextOfferIndex = nextIndex(offerIdx);
624: if (this.offerIdx.compareAndSet(offerIdx, nextOfferIndex)) {
625: break;
626: }
627:
628: LockSupport.parkNanos(BACK_OFF_DELAY);
629: }
630:
631: final int unmaskedOfferIdx = unmask(offerIdx);
632: final AtomicReferenceArray<PoolBuffer> pool = pool(offerIdx);
633: for (;;) {
634: // unmask the current write value to the actual array index.
635: if (pool.compareAndSet(unmaskedOfferIdx, null, b)) {
636: ProbeNotifier.notifyBufferReleasedToPool(monitoringConfig, bufferSize);
637:
638: return true;
639: }
640: // give poll at this index time to complete...
641: Thread.yield();
642: }
643: }
644:
645: public int elementsCount() {
646: return elementsCount(pollIdx.get(), offerIdx.get());
647: }
648:
649: /*
650: * There are two cases to consider. 1) When both indexes are on the same array. 2) When index are on different arrays
651: * (i.e., the wrap bit is set on the index value).
652: *
653: * When both indexes are on the same array, then to calculate the number of elements, we have to 'de-virtualize' the
654: * indexes and simply subtract the result.
655: *
656: * When the indexes are on different arrays, the result of subtracting the 'de-virtualized' indexes will be negative. We
657: * then have to add the result of and-ing the maxPoolSize with a mask consisting of the first 31 bits being all ones.
658: */
659: private int elementsCount(final int ridx, final int widx) {
660: return unstride(unmask(widx)) - unstride(unmask(ridx)) + (maxPoolSize & fillHighestOneBitRight((ridx ^ widx) & WRAP_BIT_MASK));
661: }
662:
663: /**
664: * @return the max number of {@link Buffer}s, that could be pooled in this <tt>PoolSlice</tt>
665: */
666: public int getMaxElementsCount() {
667: return maxPoolSize;
668: }
669:
670: public long size() {
671: return (long) elementsCount() * (long) bufferSize;
672: }
673:
674: public void clear() {
675: // noinspection StatementWithEmptyBody
676: while (poll() != null) {
677: ;
678: }
679: }
680:
681: public PoolBuffer allocate() {
682: final PoolBuffer buffer = isDirect || FORCE_BYTE_BUFFER_BASED_BUFFERS ?
683:
684: // if isDirect || FORCE_BYTE_BUFFER - allocate ByteBufferWrapper
685: new PoolByteBufferWrapper(isDirect ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize), this) :
686:
687: // otherwise use HeapBuffer
688: new PoolHeapBuffer(new byte[bufferSize], this);
689:
690: ProbeNotifier.notifyBufferAllocated(monitoringConfig, bufferSize);
691: return buffer;
692: }
693:
694: // ----------------------------------------------------- Private Methods
695:
696: private static boolean isFull(final int pollIdx, final int offerIdx) {
697: return (pollIdx ^ offerIdx) == WRAP_BIT_MASK;
698: }
699:
700: private static boolean isEmpty(final int pollIdx, final int offerIdx) {
701: return pollIdx == offerIdx;
702: }
703:
704: private AtomicReferenceArray<PoolBuffer> pool(final int idx) {
705: return (idx & WRAP_BIT_MASK) == 0 ? pool1 : pool2;
706: }
707:
708: private int nextIndex(final int currentIdx) {
709: final int arrayIndex = unmask(currentIdx);
710: if (arrayIndex + STRIDE < maxPoolSize) {
711: // add stride and return
712: return currentIdx + STRIDE;
713: } else {
714: final int offset = arrayIndex - maxPoolSize + STRIDE + 1;
715:
716: return offset == STRIDE ?
717: // we reached the end on the current array,
718: // set lower 30 bits to zero and flip the wrap bit.
719: WRAP_BIT_MASK ^ currentIdx & WRAP_BIT_MASK :
720: // otherwise we stay on the same array, just flip the index
721: // considering the current offset
722: offset | currentIdx & WRAP_BIT_MASK;
723: }
724: }
725:
726: /*
727: * Return lower 30 bits, i.e., the actual array index.
728: */
729: private static int unmask(final int val) {
730: return val & MASK;
731: }
732:
733: /*
734: * Return only the wrapping bit.
735: */
736: private static int getWrappingBit(final int val) {
737: return val & WRAP_BIT_MASK;
738: }
739:
740: /*
741: * Calculate the index value without stride and offset.
742: */
743: private int unstride(final int idx) {
744: return (idx >> LOG2_STRIDE) + (idx & STRIDE - 1) * stridesInPool;
745: }
746:
747: @Override
748: public String toString() {
749: return toString(pollIdx.get(), offerIdx.get());
750: }
751:
752: private String toString(final int ridx, final int widx) {
753: return "BufferSlice[" + Integer.toHexString(hashCode()) + "] {" + "buffer size=" + bufferSize + ", elements in pool=" + elementsCount(ridx, widx)
754: + ", poll index=" + unmask(ridx) + ", poll wrap bit=" + (fillHighestOneBitRight(getWrappingBit(ridx)) & 1) + ", offer index=" + unmask(widx)
755: + ", offer wrap bit=" + (fillHighestOneBitRight(getWrappingBit(widx)) & 1) + ", maxPoolSize=" + maxPoolSize + '}';
756: }
757:
758: /*
759: * We pad the default AtomicInteger implementation as the offer/poll pointers will be highly contended. The padding
760: * ensures that each AtomicInteger is within it's own cacheline thus reducing false sharing.
761: */
762: @SuppressWarnings("UnusedDeclaration")
763: static final class PaddedAtomicInteger extends AtomicInteger {
764: private long p0, p1, p2, p3, p4, p5, p6, p7 = 7L;
765:
766: PaddedAtomicInteger(int initialValue) {
767: super(initialValue);
768: }
769:
770: } // END PaddedAtomicInteger
771:
772: /*
773: * Padded in order to avoid false sharing when the arrays used by AtomicReferenceArray are laid out end-to-end (pointer
774: * in array one is at end of the array and pointer two in array two is at the beginning - both elements could be loaded
775: * onto the same cacheline).
776: */
777: @SuppressWarnings("UnusedDeclaration")
778: static final class PaddedAtomicReferenceArray<E> extends AtomicReferenceArray<E> {
779: private long p0, p1, p2, p3, p4, p5, p6, p7 = 7L;
780:
781: PaddedAtomicReferenceArray(int length) {
782: super(length);
783: }
784:
785: } // END PaddedAtomicReferenceArray
786:
787: } // END BufferPool
788:
789: interface PoolBuffer extends Buffer {
790: PoolBuffer prepare();
791:
792: boolean free();
793:
794: PoolBuffer free(boolean free);
795:
796: PoolSlice owner();
797: }
798:
799: private static final class PoolHeapBuffer extends HeapBuffer implements PoolBuffer {
800:
801: // The pool slice to which this Buffer instance will be returned.
802: private final PoolSlice owner;
803:
804: // When this Buffer instance resides in the pool, this flag will
805: // be true.
806: boolean free;
807:
808: // represents the number of 'child' buffers that have been created using
809: // this as the foundation. This source buffer can't be returned
810: // to the pool unless this value is zero.
811: protected final AtomicInteger shareCount;
812:
813: // represents the original buffer from the pool. This value will be
814: // non-null in any 'child' buffers created from the original.
815: protected final PoolHeapBuffer source;
816:
817: // ------------------------------------------------------------ Constructors
818:
819: /**
820: * Creates a new PoolBuffer instance wrapping the specified {@link java.nio.ByteBuffer}.
821: *
822: * @param heap the {@code byte[]} instance to wrap.
823: * @param owner the {@link org.glassfish.grizzly.memory.PooledMemoryManager.PoolSlice} that owns this
824: * <tt>PoolBuffer</tt> instance.
825: */
826: private PoolHeapBuffer(final byte[] heap, final PoolSlice owner) {
827: this(heap, 0, heap.length, owner, null, new AtomicInteger());
828: }
829:
830: /**
831: * Creates a new PoolBuffer instance wrapping the specified {@link java.nio.ByteBuffer}.
832: *
833: * @param heap the {@code byte[]} instance to wrap.
834: * @param owner the {@link org.glassfish.grizzly.memory.PooledMemoryManager.PoolSlice} that owns this
835: * <tt>PoolBuffer</tt> instance. May be <tt>null</tt>.
836: * @param source the <tt>PoolBuffer</tt> that is the 'parent' of this new buffer instance. May be <tt>null</tt>.
837: * @param shareCount shared reference to an {@link java.util.concurrent.atomic.AtomicInteger} that enables shared buffer
838: * book-keeping.
839: *
840: * @throws IllegalArgumentException if <tt>underlyingByteBuffer</tt> or <tt>shareCount</tt> are <tt>null</tt>.
841: */
842: private PoolHeapBuffer(final byte[] heap, final int offs, final int cap, final PoolSlice owner, final PoolHeapBuffer source,
843: final AtomicInteger shareCount) {
844: super(heap, offs, cap);
845: if (heap == null) {
846: throw new IllegalArgumentException("heap cannot be null.");
847: }
848: if (shareCount == null) {
849: throw new IllegalArgumentException("shareCount cannot be null");
850: }
851:
852: this.owner = owner;
853: this.shareCount = shareCount;
854: this.source = source != null ? source : this;
855: }
856:
857: @Override
858: public PoolBuffer prepare() {
859: allowBufferDispose = true;
860: free = false;
861:
862: return this;
863: }
864:
865: @Override
866: public PoolSlice owner() {
867: return owner;
868: }
869:
870: @Override
871: public boolean free() {
872: return free;
873: }
874:
875: @Override
876: public PoolBuffer free(final boolean free) {
877: this.free = free;
878: return this;
879: }
880:
881: // ------------------------------------------ Methods from HeapBuffer
882:
883: @Override
884: public HeapBuffer asReadOnlyBuffer() {
885: final HeapBuffer b = asReadOnlyBuffer(offset, cap);
886:
887: b.pos = pos;
888: b.lim = lim;
889: return b;
890: }
891:
892: private HeapBuffer asReadOnlyBuffer(final int offset, final int cap) {
893: checkDispose();
894:
895: onShareHeap();
896: final HeapBuffer b = new ReadOnlyHeapBuffer(heap, offset, cap) {
897:
898: @Override
899: public void dispose() {
900: super.dispose();
901: PoolHeapBuffer.this.dispose0();
902: }
903:
904: @Override
905: protected void onShareHeap() {
906: PoolHeapBuffer.this.onShareHeap();
907: }
908:
909: @Override
910: protected HeapBuffer createHeapBuffer(final int offset, final int capacity) {
911: return PoolHeapBuffer.this.asReadOnlyBuffer(offset, capacity);
912: }
913: };
914:
915: b.allowBufferDispose(true);
916:
917: return b;
918: }
919:
920: @Override
921: public void dispose() {
922: if (free) {
923: return;
924: }
925: free = true;
926:
927: dispose0();
928: }
929:
930: private void dispose0() {
931: // check shared counter optimistically
932: boolean isNotShared = shareCount.get() == 0;
933: if (!isNotShared) {
934: // try pessimistic check using CAS loop
935: isNotShared = shareCount.getAndDecrement() == 0;
936: if (isNotShared) {
937: // if the former check is true - the shared counter is negative,
938: // so we have to reset it
939: shareCount.set(0);
940: }
941: }
942:
943: if (isNotShared) {
944: // we can now safely return source back to the queue
945: source.returnToPool();
946: }
947: }
948:
949: private void returnToPool() {
950: // restore capacity
951: cap = heap.length;
952: // clear
953: clear();
954:
955: owner.offer(this);
956: }
957:
958: // ----------------------------------------------------- Protected Methods
959:
960: /**
961: * Override the default implementation to check the <tt>free</tt> status of this buffer (i.e., once released, operations
962: * on the buffer will no longer succeed).
963: */
964: @Override
965: protected void checkDispose() {
966: if (free) {
967: throw new IllegalStateException("PoolBuffer has already been disposed", disposeStackTrace);
968: }
969: }
970:
971: /**
972: * Create a new {@link HeapBuffer} based on the current heap.
973: *
974: * @param offs relative offset, the absolute value will calculated as (this.offset + offs)
975: * @param capacity the capacity of this {@link HeapBuffer}.
976: *
977: * @return a new {@link HeapBuffer} based on the the method arguments.
978: */
979: @Override
980: protected HeapBuffer createHeapBuffer(final int offs, final int capacity) {
981: onShareHeap();
982:
983: final PoolHeapBuffer b = new PoolHeapBuffer(heap, offs + offset, capacity, null, // don't keep track of the owner for child buffers
984: source, // pass the 'parent' buffer along
985: shareCount); // pass the shareCount
986: b.allowBufferDispose(true);
987:
988: return b;
989: }
990:
991: @Override
992: protected void onShareHeap() {
993: super.onShareHeap();
994:
995: shareCount.incrementAndGet();
996: }
997: } // END PoolBuffer
998:
999: private static final class PoolByteBufferWrapper extends ByteBufferWrapper implements PoolBuffer {
1000:
1001: // The pool slice to which this Buffer instance will be returned.
1002: private final PoolSlice owner;
1003:
1004: // When this Buffer instance resides in the pool, this flag will
1005: // be true.
1006: boolean free;
1007:
1008: // represents the number of 'child' buffers that have been created using
1009: // this as the foundation. This source buffer can't be returned
1010: // to the pool unless this value is zero.
1011: protected final AtomicInteger shareCount;
1012:
1013: // represents the original buffer from the pool. This value will be
1014: // non-null in any 'child' buffers created from the original.
1015: protected final PoolByteBufferWrapper source;
1016:
1017: // Used for the special case of the split() method. This maintains
1018: // the original wrapper from the pool which must ultimately be returned.
1019: private final ByteBuffer origVisible;
1020:
1021: // ------------------------------------------------------------ Constructors
1022:
1023: /**
1024: * Creates a new PoolBuffer instance wrapping the specified {@link java.nio.ByteBuffer}.
1025: *
1026: * @param underlyingByteBuffer the {@link java.nio.ByteBuffer} instance to wrap.
1027: * @param owner the {@link org.glassfish.grizzly.memory.PooledMemoryManager.PoolSlice} that owns this
1028: * <tt>PoolBuffer</tt> instance.
1029: */
1030: private PoolByteBufferWrapper(final ByteBuffer underlyingByteBuffer, final PoolSlice owner) {
1031: this(underlyingByteBuffer, owner, null, new AtomicInteger());
1032: }
1033:
1034: /**
1035: * Creates a new PoolBuffer instance wrapping the specified {@link java.nio.ByteBuffer}.
1036: *
1037: * @param underlyingByteBuffer the {@link java.nio.ByteBuffer} instance to wrap.
1038: * @param owner the {@link org.glassfish.grizzly.memory.PooledMemoryManager.PoolSlice} that owns this
1039: * <tt>PoolBuffer</tt> instance. May be <tt>null</tt>.
1040: * @param source the <tt>PoolBuffer</tt> that is the 'parent' of this new buffer instance. May be <tt>null</tt>.
1041: * @param shareCount shared reference to an {@link java.util.concurrent.atomic.AtomicInteger} that enables shared buffer
1042: * book-keeping.
1043: *
1044: * @throws IllegalArgumentException if <tt>underlyingByteBuffer</tt> or <tt>shareCount</tt> are <tt>null</tt>.
1045: */
1046: private PoolByteBufferWrapper(final ByteBuffer underlyingByteBuffer, final PoolSlice owner, final PoolByteBufferWrapper source,
1047: final AtomicInteger shareCount) {
1048: super(underlyingByteBuffer);
1049: if (underlyingByteBuffer == null) {
1050: throw new IllegalArgumentException("underlyingByteBuffer cannot be null.");
1051: }
1052: if (shareCount == null) {
1053: throw new IllegalArgumentException("shareCount cannot be null");
1054: }
1055:
1056: this.owner = owner;
1057: this.shareCount = shareCount;
1058: this.source = source != null ? source : this;
1059:
1060: this.origVisible = this.source.visible;
1061: }
1062:
1063: @Override
1064: public PoolBuffer prepare() {
1065: allowBufferDispose = true;
1066: free = false;
1067: return this;
1068: }
1069:
1070: @Override
1071: public PoolSlice owner() {
1072: return owner;
1073: }
1074:
1075: @Override
1076: public boolean free() {
1077: return free;
1078: }
1079:
1080: @Override
1081: public PoolBuffer free(final boolean free) {
1082: this.free = free;
1083: return this;
1084: }
1085:
1086: // ------------------------------------------ Methods from ByteBufferWrapper
1087:
1088: @Override
1089: public void dispose() {
1090: if (free) {
1091: return;
1092: }
1093: free = true;
1094:
1095: dispose0();
1096: }
1097:
1098: private void dispose0() {
1099: // check shared counter optimistically
1100: boolean isNotShared = shareCount.get() == 0;
1101: if (!isNotShared) {
1102: // try pessimistic check using CAS loop
1103: isNotShared = shareCount.getAndDecrement() == 0;
1104: if (isNotShared) {
1105: // if the former check is true - the shared counter is negative,
1106: // so we have to reset it
1107: shareCount.set(0);
1108: }
1109: }
1110:
1111: if (isNotShared) {
1112: // we can now safely return source back to the queue
1113: source.returnToPool();
1114: }
1115: }
1116:
1117: // ----------------------------------------------------- Protected Methods
1118:
1119: @Override
1120: protected ByteBufferWrapper wrapByteBuffer(final ByteBuffer buffer) {
1121: final PoolByteBufferWrapper b = new PoolByteBufferWrapper(buffer, null, // don't keep track of the owner for child buffers
1122: source, // pass the 'parent' buffer along
1123: shareCount); // pass the shareCount
1124: b.allowBufferDispose(true);
1125: shareCount.incrementAndGet();
1126:
1127: return b;
1128: }
1129:
1130: /**
1131: * Override the default implementation to check the <tt>free</tt> status of this buffer (i.e., once released, operations
1132: * on the buffer will no longer succeed).
1133: */
1134: @Override
1135: protected void checkDispose() {
1136: if (free) {
1137: throw new IllegalStateException("PoolBuffer has already been disposed", disposeStackTrace);
1138: }
1139: }
1140:
1141: // ----------------------------------------------------- Private Methods
1142:
1143: private void returnToPool() {
1144: // should be called on "source" only
1145: visible = origVisible;
1146: visible.clear();
1147: owner.offer(this);
1148: }
1149: } // END PoolBuffer
1150: }