Netty内存管理
在Netty中,ByteBuf随处可见,作为读写数据的字节容器,ByteBuf的内存分配十分复杂且值得仔细研究。本文从ByteBuf和ByteBufAllocator两大抽象开始,详细分析Netty内存分配与回收的细节。
本文内容:
- 内存与内存分配器的抽象
- 不同规格大小和不同类别的内存的分配策略
- 内存的回收过程
一、ByteBuf
1.结构
上图显示了ByteBuf的结构,主要由废弃字节、可读字节、可写字节三部分组成,使用readerIndex
与writerIndex
分隔,三部分加起来称为容量capacity。readerIndex
表示可读字节的起始位置,writerIndex
表示可写字节的起始位置。
2.重要API
public abstract byte readByte(); // 读取一个字节,readerIndex右移一位
public abstract ByteBuf writeByte(int value);// 写出一个字节,writerIndex右移一位
public abstract ByteBuf setByte(int index, int value);// 设置数据,readerIndex、writerIndex不变
public abstract ByteBuf markReaderIndex(); // 标记读写指针,可以重置
public abstract ByteBuf resetReaderIndex();
public abstract ByteBuf markWriterIndex();
public abstract ByteBuf resetWriterIndex();
API详细介绍见闪电侠文章数据传输载体 ByteBuf 介绍
3.分类
从上图ByteBuf的类继承体系可以看出,ByteBuf可以从三个角度进行分类:
-
Heap和Direct
表示ByteBuf底层使用的是堆内存字节数组,或者堆外内存DirectByteBuffer。
-
Pooled和Unpooled
Pooled表示池化的内存,即分配内存时可以从预先分配好的内存中取出一块内存;Unpooled表示非池化的内存,即直接调用API向操作系统申请一块内存。
-
Unsafe和非Unsafe
表示ByteBuf读写数据时是否依赖JDK Unsafe对象,Unsafe表示依赖。
一般情况下,用户在分配ByteBuf内存时只需根据Heap和Direct、Pooled和Unpooled两个角度进行内存分配,Unsafe和非Unsafe是由Netty根据运行环境自动识别的。
4.抽象实现AbstractByteBuf
AbstractByteBuf是ByteBuf的抽象实现,实现了ByteBuf公共的方法,如readableBytes()、writableBytes()等。
public int readableBytes() {
return writerIndex - readerIndex;
}
public int writableBytes() {
return capacity() - writerIndex;
}
对于读写数据的方法,如readByte()、writeByte(),实现了基本逻辑,并提供抽象方法由不同子类去具体实现:
public byte readByte() {
checkReadableBytes0(1);
int i = readerIndex;
byte b = _getByte(i);
readerIndex = i + 1;
return b;
}
public ByteBuf writeByte(int value) {
ensureAccessible();
ensureWritable0(1); // 确保写空间足够,必要时扩容
_setByte(writerIndex++, value);
return this;
}
对于readByte()、writeByte()方法来说,提供了_getByte()、_setByte()
两个抽象方法:
protected abstract byte _getByte(int index);
protected abstract void _setByte(int index, int value);
其他的读写数据方法类似的,也提供了抽象方法让子类去实现:
二、ByteBufAllocator
1.功能
ByteBufAllocator作为内存分配器的抽象,具有如下的功能:
// 1.分配ByteBuf,是否是堆外内存依赖于具体的实现
ByteBuf buffer(int initialCapacity, int maxCapacity);
// 2.分配适合于IO的ByteBuf,期待是堆外内存
ByteBuf ioBuffer(int initialCapacity, int maxCapacity);
// 3.分配堆内存ByteBuf
ByteBuf heapBuffer(int initialCapacity, int maxCapacity);
// 4.分配堆外内存DirectByteBuffer
ByteBuf directBuffer(int initialCapacity, int maxCapacity);
2.抽象实现AbstractByteBufAllocator
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf; // 返回空ByteBuf
}
validate(initialCapacity, maxCapacity); // 校验参数有效性
return newDirectBuffer(initialCapacity, maxCapacity);
}
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
validate(initialCapacity, maxCapacity);
return newHeapBuffer(initialCapacity, maxCapacity);
}
protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);
protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);
AbstractByteBufAllocator实际上是ByteBufAllocator的骨架实现,暴露了newHeapBuffer()、newDirectBuffer()两个抽象方法给具体实现子类UnpooledByteBufAllocator、PooledByteBufAllocator,用于这两个子类实现具体的分配动作。
三、UnpooledByteBufAllocator内存分配
1.Heap内存的分配
直接看UnpooledByteBufAllocator的newHeapBuffer()方法:
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
return PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity)
: new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
newHeapBuffer()方法根据PlatformDependent.hasUnsafe()结果分别创建了UnpooledHeapByteBuf和UnpooledUnsafeHeapByteBuf。
UnpooledHeapByteBuf
protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);
}
private UnpooledHeapByteBuf(
ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) {
super(maxCapacity);
this.alloc = alloc;
setArray(initialArray);
setIndex(readerIndex, writerIndex); // 设置readerIndex,writerIndex
}
private void setArray(byte[] initialArray) {
array = initialArray;
tmpNioBuf = null;
}
public ByteBuf setIndex(int readerIndex, int writerIndex) {
setIndex0(readerIndex, writerIndex);
return this;
}
可见创建UnpooledHeapByteBuf并初始化时,直接new了一个字节数组并传入初始化大小(申请堆内存),然后保存字节数组引用、设置读写指针。
下面看下UnpooledHeapByteBuf读写数据的方法实现:
- 读数据
// 读取数据
public byte getByte(int index) {
ensureAccessible();
return _getByte(index);
}
protected byte _getByte(int index) {
return HeapByteBufUtil.getByte(array, index);
}
static byte getByte(byte[] memory, int index) {
return memory[index]; // 数组索引
}
从代码中可以看出,UnpooledHeapByteBuf读取字节数据时,是直接从字节数组中取出对应索引位置的数据。
- 写数据
public ByteBuf setByte(int index, int value) {
ensureAccessible();
_setByte(index, value);
return this;
}
protected void _setByte(int index, int value) {
HeapByteBufUtil.setByte(array, index, value);
}
static void setByte(byte[] memory, int index, int value) {
memory[index] = (byte) value; // 高24位忽略
}
从代码中可以看出,UnpooledHeapByteBuf写字节数据时,是直接将字节数组中对应索引位置的数据置为传入的值。
UnpooledUnsafeHeapByteBuf
UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
UnpooledUnsafeHeapByteBuf继承UnpooledHeapByteBuf类,在初始化时直接调用了父类UnpooledHeapByteBuf的构造器,同样通过new字节数组的方式申请堆内存。
下面看下UnpooledUnsafeHeapByteBuf读写数据的方法实现:
- 读数据
public byte getByte(int index) {
checkIndex(index);
return _getByte(index);
}
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(array, index);
}
static byte getByte(byte[] array, int index) {
return PlatformDependent.getByte(array, index);
}
public static byte getByte(byte[] data, int index) {
return PlatformDependent0.getByte(data, index);
}
static byte getByte(byte[] data, int index) {
return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
}
可以看出,UnpooledUnsafeHeapByteBuf在读取字节数据时,是使用了JDK Unsafe对象,直接通过内存地址读取字节数组数据。
- 写数据
public ByteBuf setByte(int index, int value) {
checkIndex(index);
_setByte(index, value);
return this;
}
protected void _setByte(int index, int value) {
UnsafeByteBufUtil.setByte(array, index, value);
}
static void setByte(byte[] array, int index, int value) {
PlatformDependent.putByte(array, index, (byte) value);
}
public static void putByte(byte[] data, int index, byte value) {
PlatformDependent0.putByte(data, index, value);
}
static void putByte(byte[] data, int index, byte value) {
UNSAFE.putByte(data, BYTE_ARRAY_BASE_OFFSET + index, value);
}
可以看出,UnpooledUnsafeHeapByteBuf在写字节数据时,是使用了JDK Unsafe对象,直接通过内存地址写字节数据,相对于UnpooledHeapByteBuf通过数组索引的方式写数据的性能更好。
2.direct堆外内存的分配
直接看UnpooledByteBufAllocator的newDirectBuffer()方法:
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
ByteBuf buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
newDirectBuffer()方法同样根据PlatformDependent.hasUnsafe()结果分别创建了UnpooledDirectByteBuf和UnpooledUnsafeDirectByteBuf。
UnpooledDirectByteBuf
protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
this.alloc = alloc;
setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
}
private void setByteBuffer(ByteBuffer buffer) {
ByteBuffer oldBuffer = this.buffer;
if (oldBuffer != null) {
if (doNotFree) {
doNotFree = false;
} else {
freeDirect(oldBuffer);
}
}
this.buffer = buffer;
tmpNioBuf = null;
capacity = buffer.remaining();
}
可见创建UnpooledDirectByteBuf并初始化时,通过ByteBuffer API申请了DirectByteBuffer堆外内存,然后保存DirectByteBuffer引用。
下面看下UnpooledDirectByteBuf读写数据的方法实现:
- 读数据
public byte getByte(int index) {
ensureAccessible();
return _getByte(index);
}
protected byte _getByte(int index) {
return buffer.get(index);
}
可以看出,UnpooledDirectByteBuf读取字节数据时,是直接通过DirectByteBuffer API获取字节数据。
- 写数据
public ByteBuf setByte(int index, int value) {
ensureAccessible();
_setByte(index, value);
return this;
}
protected void _setByte(int index, int value) {
buffer.put(index, (byte) value);
}
可以看出,UnpooledDirectByteBuf写字节数据时,也是通过DirectByteBuffer API写出字节数据。
UnpooledUnsafeDirectByteBuf
protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
this.alloc = alloc;
setByteBuffer(allocateDirect(initialCapacity), false);
}
protected ByteBuffer allocateDirect(int initialCapacity) {
return ByteBuffer.allocateDirect(initialCapacity);
}
final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
if (tryFree) {
ByteBuffer oldBuffer = this.buffer;
if (oldBuffer != null) {
if (doNotFree) {
doNotFree = false;
} else {
freeDirect(oldBuffer);
}
}
}
this.buffer = buffer;
// 获取DirectByteBuffer address变量的内存地址
memoryAddress = PlatformDependent.directBufferAddress(buffer);
tmpNioBuf = null;
capacity = buffer.remaining();
}
可见创建UnpooledUnsafeDirectByteBuf并初始化时,也是通过ByteBuffer API申请了DirectByteBuffer堆外内存,然后保存DirectByteBuffer引用。
需要注意的是,UnpooledUnsafeDirectByteBuf在申请到DirectByteBuffer时,会通过PlatformDependent.directBufferAddress(buffer);
代码计算DirectByteBuffer内存块起始地址,即memoryAddress
,后续读写数据的时候会用到。
memoryAddress = PlatformDependent.directBufferAddress(buffer);
public static long directBufferAddress(ByteBuffer buffer) {
return PlatformDependent0.directBufferAddress(buffer);
}
static long directBufferAddress(ByteBuffer buffer) {
// ADDRESS_FIELD_OFFSET: long address字段内存偏移
return getLong(buffer, ADDRESS_FIELD_OFFSET);
}
private static long getLong(Object object, long fieldOffset) {
return UNSAFE.getLong(object, fieldOffset);
}
下面看下UnpooledUnsafeDirectByteBuf读写数据的方法实现:
- 读数据
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(addr(index));
}
long addr(int index) {
return memoryAddress + index; // address内存地址+字节index
}
static byte getByte(long address) {
return PlatformDependent.getByte(address);
}
public static byte getByte(long address) {
return PlatformDependent0.getByte(address);
}
static byte getByte(long address) {
return UNSAFE.getByte(address);
}
读取字节数据时,首先调用addr(index)
获取实际的内存偏移,即memoryAddress + index
,然后通过JDK Unsafe对象,通过内存地址获取字节数据,相对于UnpooledDirectByteBuf通过DirectByteBuffer API获取字节数据的方式性能更好。
- 写数据
protected void _setByte(int index, int value) {
UnsafeByteBufUtil.setByte(addr(index), value);
}
static void setByte(long address, int value) {
PlatformDependent.putByte(address, (byte) value);
}
public static void putByte(long address, byte value) {
PlatformDependent0.putByte(address, value);
}
static void putByte(long address, byte value) {
UNSAFE.putByte(address, value);
}
可以看到,写数据时同样先获取index索引实际的内存地址,然后通过JDK Unsafe对象,根据内存地址直接完成数据设置。
四、PooledByteBufAllocator内存分配
下面以堆外内存的分配为例,分析PooledByteBufAllocator的内存分配流程。
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity); // 分配池化的堆外内存
} else {
if (PlatformDependent.hasUnsafe()) {
buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
}
return toLeakAwareBuffer(buf);
}
首先通过threadCache(PoolThreadLocalCache)
拿到PoolThreadCache对象。下面看PoolThreadLocalCache的实现:
// PooledByteBufAllocator字段
private final PoolArena<byte[]>[] heapArenas;
private final PoolArena<ByteBuffer>[] directArenas;
private final int tinyCacheSize;
private final int smallCacheSize;
private final int normalCacheSize;
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
@Override
protected synchronized PoolThreadCache initialValue() {
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); // 选择被NIO线程使用最少的PoolArena
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
@Override
protected void onRemoval(PoolThreadCache threadCache) {
threadCache.free();
}
private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
if (arenas == null || arenas.length == 0) {
return null;
}
PoolArena<T> minArena = arenas[0];
for (int i = 1; i < arenas.length; i++) {
PoolArena<T> arena = arenas[i];
if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
minArena = arena;
}
}
return minArena;
}
}
可知PoolThreadLocalCache为FastThreadLocal实现,因此调用threadCache.get()
获取PoolThreadCache时,不同NIO线程拿到的是各自的PoolThreadCache,这样使得多线程内存分配减少了竞争。
然后调用PoolThreadCache.directArena;
获取堆外内存相关的PoolArena,再调用PoolArena.allocate()
方法进行内存分配。从PoolThreadLocalCache的initialValue()方法获取初始化的PoolThreadCache可以看出,PoolArena实际上由PooledByteBufAllocator进行管理,因此NIO线程最终拿到的也是PooledByteBufAllocator管理的PoolArena。
1.PoolThreadCache结构
上图为PoolThreadCache的主要结构,PoolThreadCache主要由PoolArena和MemoryRegionCache两部分组成,PoolArena表示分配内存时需要进行实际分配的内存,MemoryRegionCache表示已缓存的内存,可以直接拿来用。
2.分配堆外内存的流程
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity); // 从Recycler对象池中获取PooledByteBuf
allocate(cache, buf, reqCapacity); // 分配内存
return buf;
}
newByteBuf(maxCapacity)
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
if (HAS_UNSAFE) {
return PooledUnsafeDirectByteBuf.newInstance(maxCapacity); // 这里!!
} else {
return PooledDirectByteBuf.newInstance(maxCapacity);
}
}
// 对象池
private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
@Override
protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
return new PooledUnsafeDirectByteBuf(handle, 0); // 新建PooledUnsafeDirectByteBuf对象
}
};
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
PooledUnsafeDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity); // 重用PooledUnsafeDirectByteBuf之前,初始化设置maxCapacity等参数
return buf;
}
首先获取PooledUnsafeDirectByteBuf实例。可知获取PooledUnsafeDirectByteBuf时,是通过Recycler对象池进行获取的。如果对象池中PooledUnsafeDirectByteBuf可以直接拿来复用,则直接返回。否则Recycler对象池需要调用newObject()新建一个PooledUnsafeDirectByteBuf实例返回。使用对象池技术,是为了减少对象的创建、减少GC。
allocate(cache, buf, reqCapacity)
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);
if (isTinyOrSmall(normCapacity)) { // normCapacity < pageSize 8kB
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity); // 是否小于512B
if (tiny) { // < 512B
// 1.首先在已存在的缓存buffer(MemoryRegionCache)中分配
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on 能够从缓存中进行分配
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
// 1.首先在已存在的缓存buffer(MemoryRegionCache)中分配
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
// 2.否则尝试在内存堆subpage上分配, head表示头部
final PoolSubpage<T> head = table[tableIdx];
// Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)}
// and {@link PoolChunk#free(long)} may modify the doubly linked list as well.
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) { // 指向自己,表示subpage链表为空
assert s.doNotDestroy && s.elemSize == normCapacity; // 断言大小匹配
long handle = s.allocate(); // 分配内存,返回handle: bitmap索引 + memoryMapIdx
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, handle, reqCapacity); // ByteBuf初始化
if (tiny) { // 统计
allocationsTiny.increment();
} else {
allocationsSmall.increment();
}
return;
}
}
allocateNormal(buf, reqCapacity, normCapacity); // subpage级别的内存分配
return;
}
if (normCapacity <= chunkSize) { // <=16MB
// 1.首先在已存在的缓存buffer中分配
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
// 2.否则在内存堆上分配
allocateNormal(buf, reqCapacity, normCapacity); // page级别的内存分配
} else { // >16MB
// Huge allocations are never served via the cache so just call allocateHuge
// 在内存堆上分配
allocateHuge(buf, reqCapacity);
}
}
以上代码便是在PoolThreadCache上进行内存分配的主要流程。
对于申请的reqCapacity
内存大小,首先进行规格化:
final int normCapacity = normalizeCapacity(reqCapacity);
// 内存大小规格化
int normalizeCapacity(int reqCapacity) {
if (reqCapacity < 0) {
throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
}
if (reqCapacity >= chunkSize) { // >16MB,表示为huge,不作处理,直接返回
return reqCapacity;
}
if (!isTiny(reqCapacity)) { // >= 512,small/normal
// Doubled
int normalizedCapacity = reqCapacity;
normalizedCapacity --;
normalizedCapacity |= normalizedCapacity >>> 1;
normalizedCapacity |= normalizedCapacity >>> 2;
normalizedCapacity |= normalizedCapacity >>> 4;
normalizedCapacity |= normalizedCapacity >>> 8;
normalizedCapacity |= normalizedCapacity >>> 16;
normalizedCapacity ++;
// 到这里normalizedCapacity已经为大于等于reqCapacity的2的幂次
if (normalizedCapacity < 0) {
normalizedCapacity >>>= 1; // 无符号右移1位
}
return normalizedCapacity;
}
// tiny
// Quantum-spaced
if ((reqCapacity & 15) == 0) { // reqCapacity低4位为0,直接返回,因为已经为16的倍数。
return reqCapacity;
}
return (reqCapacity & ~15) + 16; // reqCapacity去除低4位,加16字节,确保返回值是16字节的倍数
}
从allocate()方法和normalizeCapacity()方法的逻辑可以看出,请求分配的内存分为tiny、small、normal、huge四种:tiny:0-512B;small:512B-8K;normal:8k-16M;huge:>=16MB。经过normalizeCapacity()方法规格化请求分配的内存后,得到normCapacity
:tiny:16B的整数倍,小于512B;small:512B、1K、2K、4K;normal:8K、16K…到16MB;huge:>=16MB。
其次根据规格化后的内存大小(tiny、small、normal、huge四种),完成内存分配。
-
tiny、small
首先在已存在的缓存MemoryRegionCache中分配,否则尝试在内存堆subpage上分配,如果subpage链表为空,则调用allocateNormal()方法完成实际的内存分配。
-
normal
首先在已存在的缓存MemoryRegionCache中分配,否则调用allocateNormal()方法完成实际的内存分配。
-
huge
调用allocateHuge()方法在内存堆上进行实际的内存分配。
3.内存规格大小
从上面的分析可以看出,经过规格化之后的内存大小可以分为tiny、small、normal、huge四种:tiny:0-512B;small:512B-8K;normal:8k-16M;huge:>=16MB。
Netty在首次分配内存时,先申请PoolChunk,大小为16MB的内存。PoolChunK细分为2048个page,每个page大小为8kB。如果申请的内存小于8kB,则page将细分为PoolSubpage,如上图所示。
下面将按照上面所述的流程,分①命中缓存MemoryRegionCache的内存分配、②page级别的内存分配、③subPage级别的内存分配、④PoolByteBuf的回收这四部分内容详细分析PooledByteBufAllocator堆外内存的分配与回收。
五、命中缓存MemoryRegionCache的内存分配
1.MemoryRegionCache缓存结构
// PoolThreadCache
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches; // 32
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches; // 4
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches; // 3
private abstract static class MemoryRegionCache<T> {
private final int size;
private final Queue<Entry<T>> queue;
private final SizeClass sizeClass;
private int allocations;
MemoryRegionCache(int size, SizeClass sizeClass) {
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size); // 找到下一个大于等于size的power of 2
queue = PlatformDependent.newFixedMpscQueue(this.size); // 队列
this.sizeClass = sizeClass;
}
// ...
}
MemoryRegionCache表示缓存的内存,即当ByteBuf调用release()方法回收内存时,会根据内存的大小放到对应的MemoryRegionCache中,以便下次申请相同大小的内存时可以快速分配。
MemoryRegionCache分为三种类型:tinySubPageDirectCaches、smallSubPageDirectCaches和normalDirectCaches,数组大小分别为32(第0个位置不使用)、4、3。从上面两幅图可以看出,tinySubPageDirectCaches对应tiny的内存缓存,大小范围为16B到496B,共31种;smallSubPageDirectCaches对应small的内存缓存,大小范围为512B、1k、2k、4k,共4种;normalDirectCaches对应normal的内存缓存,大小范围为8k、16k、32k,共3种。
每个MemoryRegionCache中有一个队列Queue<Entry<T>> queue
,以及字段SizeClass sizeClass
标识缓存的内存类型,如tiny、small、normal。队列用于缓存内存的地址,用Entry标识:
static final class Entry<T> {
final Handle<Entry<?>> recyclerHandle;
PoolChunk<T> chunk;
long handle = -1; // 指向chunk的唯一一段连续内存
Entry(Handle<Entry<?>> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
void recycle() {
chunk = null;
handle = -1;
recyclerHandle.recycle(this);
}
}
Entry中chunk标识缓存的内存来自于哪个chunk,handle指向chunk的唯一一段连续内存,这样就能确定唯一一块内存。
2.命中缓存的内存分配流程
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);
if (isTinyOrSmall(normCapacity)) { // normCapacity < pageSize 8kB
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity); // 是否小于512B
if (tiny) { // < 512B
// 1.首先在已存在的缓存buffer(MemoryRegionCache)中分配
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on 能够从缓存中进行分配
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
// ...
}
}
//...
}
// 从MemoryRegionCache缓存中进行分配内存
cache.allocateTiny(this, buf, reqCapacity, normCapacity)
下面以cache.allocateTiny()方法为例,分析命中缓存的内存分配流程。
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}
找到对应size的MemoryRegionCache
cacheForTiny(area, normCapacity):
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity); // 获取tinySubPageDirectCaches数组索引
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
首先调用PoolArena.tinyIdx(normCapacity)
,根据normCapacity获取tinySubPageDirectCaches中对应MemoryRegionCache的索引,然后再调用cache(tinySubPageDirectCaches, idx)
获取对应normCapacity的MemoryRegionCache:
private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
if (cache == null || idx > cache.length - 1) { // 超出数组索引,返回null
return null;
}
return cache[idx];
}
从MemoryRegionCache的queue中弹出一个Entry给PooledByteBuf初始化(PoolChunk、handle)
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
if (cache == null) {
// no cache found so just return false here 没有找到cache,返回false
return false;
}
boolean allocated = cache.allocate(buf, reqCapacity); // 分配已缓存的内存
if (++ allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim();
}
return allocated;
}
看下 cache.allocate(buf, reqCapacity)
的逻辑:
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
Entry<T> entry = queue.poll(); // 从队列中取出Entry,拿到该Entry对应的chunk和handle
if (entry == null) {
return false;
}
initBuf(entry.chunk, entry.handle, buf, reqCapacity); // ByteBuf初始化
entry.recycle(); // 将弹出(用完)的Entry扔到对象池中复用,置chunk=null,handle=-1
++ allocations;
return true;
}
通过queue.poll()
从队列中取出Entry对象,然后初始化PooledByteBuf。
initBuf(entry.chunk, entry.handle, buf, reqCapacity); // ByteBuf初始化
private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
super(size, sizeClass);
}
@Override
protected void initBuf(
PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBufWithSubpage(buf, handle, reqCapacity);
}
}
// PoolChunk
private final PoolSubpage<T>[] subpages; // 大小:2048
void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int reqCapacity) {
initBufWithSubpage(buf, handle, bitmapIdx(handle), reqCapacity);
}
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
assert bitmapIdx != 0;
int memoryMapIdx = memoryMapIdx(handle); // page 索引
// subpageIdx(memoryMapIdx):获取page相对于最左端的偏移,如2048,为0;2049,则为1(在深度11这一层)
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)]; // 获取对应的subpage
assert subpage.doNotDestroy;
assert reqCapacity <= subpage.elemSize;
// runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize: 计算分配到的内存相对于当前PoolChunk的偏移字节数
// - runOffset(memoryMapIdx): 获取当前分配到的内存对应的page,相对于当前PoolChunk的偏移字节数,
// - (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize:获取当前page中,分配到的内存相对于page的偏移字节数
buf.init( // ByteBuf初始化
this, handle,
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize,
arena.parent.threadCache());
}
初始化PooledByteBuf时,首先通过handle获取PoolChunk中对应page的memoryMapIdx:
private static int memoryMapIdx(long handle) {
return (int) handle; // 低32位为memoryMap索引
}
在下面subPage级别的内存分配将看到,该long类型handle由高32位bitmapIdx和低32位memoryMapIdx组成。memoryMapIdx表示该PoolChunk中对应page的索引,bitmapIdx表示该page中子subpage的内存区域的索引。
然后调用subpages[subpageIdx(memoryMapIdx)]
获取PoolSubpage,即对应page细分之后(比如细分为1kB)创建的对象。最后初始化PooledByteBuf。
// PooledByteBuf
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
assert handle >= 0;
assert chunk != null;
this.chunk = chunk; // 内存块
this.handle = handle; // 地址
memory = chunk.memory; // DirectByteBuf/byte[]
this.offset = offset; // 内存偏移(相对于chunk内存块)
this.length = length; // 请求分配的大小
this.maxLength = maxLength; // handle对应的内存的最大字节大小
tmpNioBuf = null;
this.cache = cache; // PoolThreadCache
}
// PooledUnsafeDirectByteBuf
void init(PoolChunk<ByteBuffer> chunk, long handle, int offset, int length, int maxLength,
PoolThreadCache cache) {
// 调用PooledByteBuf构造器
super.init(chunk, handle, offset, length, maxLength, cache);
initMemoryAddress(); // 计算内存实际起始位置
}
private void initMemoryAddress() {
// 一个DirectByteBuf(memory)对应一个PoolChunk,offset为相对于PoolChunk的偏移字节数,
// PlatformDependent.directBufferAddress(memory)得到DirectByteBuf实际内存起始地址
memoryAddress = PlatformDependent.directBufferAddress(memory) + offset; // 得到该ByteBuf实际内存地址
}
可以看到,PooledUnsafeDirectByteBuf在初始化调用父类PooledByteBuf构造器保存各种参数之后,还调用initMemoryAddress()
计算该内存的实际起始位置,便于后续直接通过JDK Unsafe对象和内存地址读写数据。
将弹出的Entry扔到对象池Recycler中复用
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
Entry<T> entry = queue.poll(); // 从队列中取出Entry,拿到该Entry对应的chunk和handle
if (entry == null) {
return false;
}
initBuf(entry.chunk, entry.handle, buf, reqCapacity); // ByteBuf初始化
entry.recycle(); // 将弹出(用完)的Entry扔到对象池中复用,置chunk=null,handle=-1
++ allocations;
return true;
}
在初始化PooledByteBuf之后,需要回收Entry对象到对象池中,以便后续复用。
void recycle() {
chunk = null;
handle = -1;
recyclerHandle.recycle(this);
}
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
stack.push(this); // 对象回收时将DefaultHandle放入栈中
}
recycle()方法中主要重置了chunk、handle字段的值,然后将当前Entry放到了对象池中。该对象会在ByteBuf回收时取出并给字段重新赋值,便于后续分配相同大小的ByteBuf时,直接从缓存的内存中取出使用。
private static Entry newEntry(PoolChunk<?> chunk, long handle) {
Entry entry = RECYCLER.get(); // 从对象池中获取
entry.chunk = chunk;
entry.handle = handle;
return entry;
}
六、page级别的内存分配:allocateNormal
1.PoolArena、PoolChunk、page、PoolSubpage的概念和结构
PoolArena
enum SizeClass {
Tiny,
Small,
Normal
}
private final PoolSubpage<T>[] tinySubpagePools; // 大小:32
private final PoolSubpage<T>[] smallSubpagePools; // 大小:4
private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;
从上面可以看到,PoolArena主要由PoolChunkList和PoolSubpage两部分组成,PoolSubpage用于subpage的内存分配,PoolChunkList用于page和subpage的内存分配。在PoolArena里面涉及到PoolChunkList和PoolSubpage对应的结构有PoolChunk和PoolSubpage两个。
PoolChunkList之间使用双向链表连接,单个PoolChunkList由PoolChunk双向链表构成。tinySubpagePools、smallSubpagePools中的PoolSubpage引用,同样指向PoolSubpage构成的双向链表。
PoolChunk
第一次申请内存的时候,PoolChunkList、PoolSubpage都是默认值(为空),需要创建一个PoolChunk,默认一个PoolChunk是16MB。内部结构是完全二叉树,一共有4096个节点,有2048个叶子节点(每个叶子节点大小为一个page,就是8k),非叶子节点的内存大小等于左子树内存大小加上右子树内存大小。
完全二叉树结构如下:
上图中page0到page2047的叶子节点,每个大小8kB。
这颗完全二叉树在PoolChunk中是使用数组来进行表示的。
// Generate the memory map. (重点)
memoryMap = new byte[maxSubpageAllocs << 1]; // maxSubpageAllocs: 2048, memoryMap.length: 4096
depthMap = new byte[memoryMap.length];
int memoryMapIndex = 1;
for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
int depth = 1 << d;
for (int p = 0; p < depth; ++ p) {
// in each level traverse left to right and set value to the depth of subtree
memoryMap[memoryMapIndex] = (byte) d;
depthMap[memoryMapIndex] = (byte) d;
memoryMapIndex ++;
}
}
depthMap
的值初始化后不再改变,表示这4096个节点各自的深度;memoryMap
的值则随着节点分配而改变,初始值为树的深度,从0开始。初始化时,深度为0的节点表示可以分配16MB,深度为1的节点可以分配8MB,深度为11的节点可以分配8KB。如果该节点已经分配完成,就设置为12即可。
page、PoolSubpage
从PoolChunk的结构可知,page表示完全二叉树的叶子节点,大小为8kB。如果申请的内存小于8KB,则page会被细分为subPage。比如申请的内存是2KB,则page被细分为4个2KB的subPage,对应的对象就是PoolSubpage。
page细分时创建的PoolSubpage也会加入到PoolArena维护的tinySubpagePools、smallSubpagePools的双向链表结构中,便于subpage级别的内存分配时快速索引:
2.page级别的内存分配:allocateNormal
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);
// ...省略
if (normCapacity <= chunkSize) { // <=16MB
// 1.首先在已存在的缓存buffer中分配
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
// 2.否则在内存堆上分配(这里!!!)
allocateNormal(buf, reqCapacity, normCapacity); // page级别的内存分配
}
}
下面看allocateNormal()进行page级别的内存分配的逻辑。
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
// 1.先在PoolChunkList中已存在的PoolChunk上分配(normal/subpage)
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
q075.allocate(buf, reqCapacity, normCapacity)) {
++allocationsNormal; // 分配成功,返回
return;
}
// Add a new chunk. 2.否则创建一个PoolChunk进行分配
// pageSize: 8kB, maxOrder:11, pageShifts: 13, chunkSize: 16MB
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
long handle = c.allocate(normCapacity); // 分配内存,返回id(normal: memoryMapIdx, subpage: bitmapIdx+memoryMapIdx)
++allocationsNormal;
assert handle > 0;
c.initBuf(buf, handle, reqCapacity); // 初始化ByteBuf
qInit.add(c); // 添加到PoolChunkList
}
尝试在现有的chunk上分配
首先在PoolChunkList中现有的PoolChunk中进行内存分配:
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
// PoolChunkList为空,或者请求的capacity太大,PoolChunkList中的PoolChunk无法分配
if (head == null || normCapacity > maxCapacity) {
return false;
}
for (PoolChunk<T> cur = head;;) {
long handle = cur.allocate(normCapacity);
if (handle < 0) { // cur无法分配
cur = cur.next;
if (cur == null) { // 该PoolChunkList中的所有PoolChunk都无法分配,返回false
return false;
}
} else { // cur可以分配
cur.initBuf(buf, handle, reqCapacity); // ByteBuf初始化
if (cur.usage() >= maxUsage) { // 判断使用率
remove(cur); // 从当前chink list移除
nextList.add(cur); // 加入到下一个chunk list
}
return true; // 分配成功
}
}
}
对于某个PoolChunkList,如果PoolChunkList为空(head=null),或者请求的capacity太大,则PoolChunkList中的PoolChunk无法分配,直接返回false。否则遍历该PoolChunkList中的PoolChunk,只要有一个分配内存成功,就返回true。
下面看下内存分配过程:
long allocate(int normCapacity) {
if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize 8kB
return allocateRun(normCapacity); // normal分配
} else {
return allocateSubpage(normCapacity); // subpage分配
}
}
allocate()方法根据normCapacity的不同,完成normal内存和subpage内存的分配。由于这里主要分析page级别的内存分配,因此进入allocateRun()方法:
private long allocateRun(int normCapacity) {
int d = maxOrder - (log2(normCapacity) - pageShifts); // 计算平衡二叉树深度
int id = allocateNode(d);
if (id < 0) { // 不可用,返回
return id;
}
freeBytes -= runLength(id); // 计算剩余可用字节数,runLength(id): 计算id对应节点的字节数
return id;
}
allocateRun()方法中首先根据normCapacity计算出完全二叉树节点所在的树深度,然后根据树深度调用allocateNode(d)
分配内存:
private int allocateNode(int d) {
int id = 1;
int initial = - (1 << d); // has last d bits = 0 and rest all = 1
byte val = value(id);
if (val > d) { // unusable 不可用
return -1;
}
while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
id <<= 1; // 向下层遍历
val = value(id);
if (val > d) {
id ^= 1; // 计算兄弟节点id
val = value(id);
}
}
byte value = value(id); // 得到满足分配要求的id
assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
value, id & initial, d);
setValue(id, unusable); // mark as unusable id指向的内存段标记为不可用
updateParentsAlloc(id); // 更新
return id;
}
allocateNode()方法是page级别内存分配时,PoolChunk的完全二叉树节点查找算法的实现,读者需要仔细debug来理解其节点查找流程。找到了对应的节点id,即表示分配了对应的内存。id小于0,表示分配失败。
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
if (head == null || normCapacity > maxCapacity) { // PoolChunkList为空,或者请求的capacity太大,PoolChunkList中的PoolChunk无法分配
return false;
}
for (PoolChunk<T> cur = head;;) {
long handle = cur.allocate(normCapacity);
if (handle < 0) { // cur无法分配
cur = cur.next;
if (cur == null) { // 该PoolChunkList中的所有PoolChunk都无法分配,返回false
return false;
}
} else { // cur可以分配
cur.initBuf(buf, handle, reqCapacity); // ByteBuf初始化
if (cur.usage() >= maxUsage) { // 判断使用率
remove(cur); // 从当前chink list移除
nextList.add(cur); // 加入到下一个chunk list
}
return true; // 分配成功
}
}
}
PoolChunkList.allocate()方法在调用cur.allocate(normCapacity)
代码分配内存之后,得到handle,即PoolChunk节点id。
初始化PooledByteBuf
// PoolChunk
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
int memoryMapIdx = memoryMapIdx(handle);
int bitmapIdx = bitmapIdx(handle); // 该变量只在subpage分配时不为0
if (bitmapIdx == 0) { // normal
byte val = value(memoryMapIdx);
assert val == unusable : String.valueOf(val);
// runOffset(memoryMapIdx): chunk内存的字节偏移, runLength(memoryMapIdx): memoryMapIdx对应那块内存的最大值
buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx),
arena.parent.threadCache());
} else {
initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity); // 根据subPage初始化ByteBuf
}
}
PooledByteBuf初始化时,首先通过memoryMapIdx(handle)
获取handle对应的PoolChunk完全二叉树节点在memoryMap数组中的索引,此时bitmapIdx(handle)
为0。下面进行PooledByteBuf真正的初始化:
// runOffset(memoryMapIdx): chunk内存的字节偏移, runLength(memoryMapIdx): memoryMapIdx对应那块内存的最大值
buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx),
arena.parent.threadCache());
PooledByteBuf(PooledUnsafeByteBuf)的初始化过程在前面命中缓存的分配流程中已经详细分析,这里不再介绍。
创建一个chunk进行内存分配
如果通过PoolChunkList在现有的chunk上无法完成内存分配,比如第一次分配内存时,此时需要创建一个chunk进行内存分配。
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
// 1.先在PoolChunkList中已存在的PoolChunk上分配(normal/subpage)
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
q075.allocate(buf, reqCapacity, normCapacity)) {
++allocationsNormal; // 分配成功,返回
return;
}
// Add a new chunk. 2.否则创建一个PoolChunk进行分配
// pageSize: 8kB, maxOrder:11, pageShifts: 13, chunkSize: 16MB
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
long handle = c.allocate(normCapacity); // 分配内存,返回id(normal: memoryMapIdx, subpage: bitmapIdx+memoryMapIdx)
++allocationsNormal;
assert handle > 0;
c.initBuf(buf, handle, reqCapacity); // 初始化ByteBuf
qInit.add(c); // 添加到PoolChunkList
}
创建PoolChunk:
// DirectArena
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
return new PoolChunk<ByteBuffer>(
this, allocateDirect(chunkSize), // 向操作系统申请DirectByteBuffer内存
pageSize, maxOrder, pageShifts, chunkSize);
}
调用newChunk创建PoolChunk时,首先调用 allocateDirect(chunkSize)
向操作系统申请DirectByteBuffer内存,然后初始化PoolChunk并返回。
private static ByteBuffer allocateDirect(int capacity) {
return PlatformDependent.useDirectBufferNoCleaner() ? // true
PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity);
}
初始化PoolChunk:
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
unpooled = false;
this.arena = arena;
this.memory = memory; // DirectByteBuffer/byte[]
this.pageSize = pageSize; // 8kB
this.pageShifts = pageShifts; // 13
this.maxOrder = maxOrder; // 11
this.chunkSize = chunkSize; // 16MB
unusable = (byte) (maxOrder + 1); // 12表示不可用
log2ChunkSize = log2(chunkSize); // 24
subpageOverflowMask = ~(pageSize - 1); // -pageSize: 8192, 8kB
freeBytes = chunkSize; // 16MB
assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;
maxSubpageAllocs = 1 << maxOrder; // maxOrder: 11
// Generate the memory map. (重点)
memoryMap = new byte[maxSubpageAllocs << 1]; // maxSubpageAllocs: 2048, memoryMap.length: 4096
depthMap = new byte[memoryMap.length];
int memoryMapIndex = 1;
for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
int depth = 1 << d;
for (int p = 0; p < depth; ++ p) {
// in each level traverse left to right and set value to the depth of subtree
memoryMap[memoryMapIndex] = (byte) d;
depthMap[memoryMapIndex] = (byte) d;
memoryMapIndex ++;
}
}
subpages = newSubpageArray(maxSubpageAllocs); // 2048个subpage对象
}
在创建PoolChunk完毕后,调用PoolChunk.allocate(normCapacity)
分配内存并初始化PooledByteBuf,该过程与通过PoolChunkLIst在现有的chunk上分配内存的逻辑相同,这里不再说明。
七、subPage级别的内存分配
// PoolChunk
long allocate(int normCapacity) {
if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize 8kB
return allocateRun(normCapacity); // normal分配
} else {
return allocateSubpage(normCapacity); // subpage分配
}
}
假设申请的内存小于page 8KB,则此时将进入allocateSubpage(normCapacity)
逻辑。
// PoolChunk
private long allocateSubpage(int normCapacity) {
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
synchronized (head) {
int d = maxOrder; // subpage内存分配,只在leaves节点 --> 11
int id = allocateNode(d); // 分配page
if (id < 0) { // 无法分配
return id;
}
final PoolSubpage<T>[] subpages = this.subpages; // subpages.length: 2048
final int pageSize = this.pageSize; // 8kB
freeBytes -= pageSize; // 更新可用字节数
int subpageIdx = subpageIdx(id); // subpage索引,当前page索引对应的subpage对象
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
// 创建PoolSubpage, 会添加到head链表
// runOffset(id): 计算id对应page相对于该PoolChunk的字节偏移
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;
} else {
subpage.init(head, normCapacity);
}
return subpage.allocate(); // subpage内存分配
}
}
subPage级别的内存分配分为定位到SubPage、初始化SubPage、初始化PoolByteBuf三部分。在定位到PoolSubpage之前,执行arena.findSubpagePoolHead(normCapacity)
获取该PoolSubpage将会插入到的双向链表的head,该head由PoolArena进行管理:
// PoolArena
PoolSubpage<T> findSubpagePoolHead(int elemSize) {
int tableIdx;
PoolSubpage<T>[] table;
if (isTiny(elemSize)) { // < 512byte
tableIdx = elemSize >>> 4; // elemSize/16
table = tinySubpagePools;
} else {
tableIdx = 0;
elemSize >>>= 10;
while (elemSize != 0) {
elemSize >>>= 1;
tableIdx ++;
}
table = smallSubpagePools;
}
return table[tableIdx];
}
findSubpagePoolHead()方法根据normCapacity找到对应双向链表的head,双向链表的head已在PoolArena初始化时构造完成:
// PoolArena
private final PoolSubpage<T>[] tinySubpagePools;
private final PoolSubpage<T>[] smallSubpagePools;
protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
// ...省略
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools); // 数目 32
for (int i = 0; i < tinySubpagePools.length; i ++) {
tinySubpagePools[i] = newSubpagePoolHead(pageSize); // 创建head PoolSubpage
}
numSmallSubpagePools = pageShifts - 9; // 4
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
for (int i = 0; i < smallSubpagePools.length; i ++) {
smallSubpagePools[i] = newSubpagePoolHead(pageSize);
}
// ...省略
}
private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
head.prev = head;
head.next = head;
return head;
}
1.定位到SubPage
int d = maxOrder; // subpage内存分配,只在leaves节点 --> 11
int id = allocateNode(d); // 分配page
if (id < 0) { // 无法分配
return id;
}
final PoolSubpage<T>[] subpages = this.subpages; // subpages.length: 2048
final int pageSize = this.pageSize; // 8kB
freeBytes -= pageSize; // 更新可用字节数
int subpageIdx = subpageIdx(id); // subpage索引,当前page索引对应的subpage对象
PoolSubpage<T> subpage = subpages[subpageIdx];
由于subpage级别的内存分配只在PoolChunk完全二叉树的叶子节点上进行分配,因此这里直接调用了allocateNode(d)
得到page叶子节点的id(编号)。然后根据id调用subpageIdx(id)
获取PoolSubpage对象在subpages数组中的索引:
private int subpageIdx(int memoryMapIdx) {
return memoryMapIdx ^ maxSubpageAllocs; // remove highest set bit, to get offset
}
然后取出PoolSubpage对象,第一次分配内存时,PoolSubpage对象为null。
2.初始化SubPage
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
// 创建PoolSubpage, 会添加到head链表
// runOffset(id): 计算id对应page相对于该PoolChunk的字节偏移
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;
} else {
subpage.init(head, normCapacity);
}
由于subpage
为null,则创建一个新的PoolSubpage,并设置到subpages
数组。
PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
this.chunk = chunk; // 所属chunk
this.memoryMapIdx = memoryMapIdx; // page id
this.runOffset = runOffset; // id对应page相对于chunk的内存偏移
this.pageSize = pageSize; // 8kB
// 这里bitmap长度8已足够,因为elemSize最小为16B,一个page 8kB,最多可以分为512份,这512份可以用8个long类型数表示,
// 一个long类型数表示64份,每一个bit位表示每一份的分配情况
bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64 = 8
init(head, elemSize);
}
void init(PoolSubpage<T> head, int elemSize) {
doNotDestroy = true;
this.elemSize = elemSize;
if (elemSize != 0) {
maxNumElems = numAvail = pageSize / elemSize; // page划分的份数
nextAvail = 0;
bitmapLength = maxNumElems >>> 6; // 计算需要使用的bitmap long类型数个数,即bitmapLength
if ((maxNumElems & 63) != 0) {
bitmapLength ++;
}
for (int i = 0; i < bitmapLength; i ++) {
bitmap[i] = 0; // 初始化,bitmap一个long类型元素可表示64个内存段的使用情况(一个long 64位)
}
}
addToPool(head); // 添加到subpage链表
}
PoolSubpage初始化时,会保存chunk、memoryMapIdx等字段的值,同时创建了一个long[] bitmap数组,该数组长度为8,因为每个long型元素一共有64位,每一位可表示大小为elemSize的内存段的使用情况,因此bitmap数组一共最大可表示512个大小为elemSize的内存段的使用情况。这是通过elemSize为16B(最小)时计算出来的,8KB/16B=512, 512/64=8。
PoolSubpage初始化时同时调用了init(head, elemSize);
,在该方法中计算了maxNumElems、numAvail、bitmapLength等值,bitmapLength表示实际bitmap数组中用到的long型元素个数,并根据bitmapLength初始化bitmap。在init方法的最后,调用addToPool(head)
将当前PoolSubpage对象添加到head开头的PoolSubpage双向链表中。
private void addToPool(PoolSubpage<T> head) {
assert prev == null && next == null;
prev = head; // 头插法插入
next = head.next;
next.prev = this;
head.next = this;
}
初始化PoolSubpage之后,同时会将该对象设置到PoolChunk管理的PoolSubpage<T>[] subpages
数组对应索引位置处:
subpages[subpageIdx] = subpage;
3.分配内存
subpage.allocate();
在获取到PoolSubpage之后,调用其allocate()方法分配内存。
long allocate() {
final int bitmapIdx = getNextAvail(); // 内存段位置,如67=64+3
int q = bitmapIdx >>> 6; // bitmap数组索引,如1
int r = bitmapIdx & 63; // bitmap某个数组元素64位中第几位,如3
assert ((bitmap[q] >>> r) & 1) == 0; // 断言该内存段空闲
bitmap[q] |= 1L << r; // 或操作,赋值
if (-- numAvail == 0) { // 可用内存段减1
removeFromPool(); // 将自己从链表移除
}
return toHandle(bitmapIdx);
}
allocate()方法获取下一个可用的细分内存段位置bitmapIdx,同时根据该bitmapIdx值更新bitmap数组中的元素值,主要是修改某个long型元素的某一个bit位值,标记该内存段已被使用。在标记之后,调用toHandle(bitmapIdx)
将bitmapIdx转换成handle返回:
private long toHandle(int bitmapIdx) { // bitmapIdx + memoryMapIdx
return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
}
可以看到,返回的long型 handle值主要由两部分构成,高32位bitmapIdx,低32位memoryMapIdx。memoryMapIdx表示当前PoolSubpage对应的PoolChunk完全二叉树叶子节点page在memoryMap数组中的位置,即编号id;bitmapIdx表示该page细分成更小的内存段后,当前分配给PooledByteBuf的内存段在所有细分内存段中的位置。
4.初始化PoolByteBuf
在subpage级别的内存分配完毕之后,下面进入PooledByteBuf的初始化:
// PoolArena
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
// ...省略
// Add a new chunk. 否则创建一个PoolChunk进行分配
// pageSize: 8kB, maxOrder:11, pageShifts: 13, chunkSize: 16MB
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
long handle = c.allocate(normCapacity); // 分配内存,返回id(normal: memoryMapIdx, subpage: bitmapIdx+memoryMapIdx)
++allocationsNormal;
assert handle > 0;
c.initBuf(buf, handle, reqCapacity); // 初始化ByteBuf
qInit.add(c); // 添加到PoolChunkList
}
c.initBuf(buf, handle, reqCapacity); // 初始化ByteBuf
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
int memoryMapIdx = memoryMapIdx(handle);
int bitmapIdx = bitmapIdx(handle); // 该变量只在subpage分配时不为0
if (bitmapIdx == 0) { // normal
byte val = value(memoryMapIdx);
assert val == unusable : String.valueOf(val);
// runOffset(memoryMapIdx): chunk内存的字节偏移, runLength(memoryMapIdx): memoryMapIdx对应那块内存的最大值
buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx),
arena.parent.threadCache());
} else {
initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity); // 根据subPage初始化ByteBuf
}
}
initBuf()方法中由于handle值主要由高32位bitmapIdx、低32位memoryMapIdx两部分构成,因此bitmapIdx(handle)
返回值bitmapIdx不为0:
private static int memoryMapIdx(long handle) {
return (int) handle; // 低32位为memoryMap索引,得到page id
}
private static int bitmapIdx(long handle) { // 得到subpage细分内存段偏移
return (int) (handle >>> Integer.SIZE);
此时进入initBufWithSubpage()方法:
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
assert bitmapIdx != 0;
int memoryMapIdx = memoryMapIdx(handle); // page 索引
// subpageIdx(memoryMapIdx):获取page相对于最左端的偏移,如2048,为0;2049,则为1(在深度11这一层)
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)]; // 获取对应的subpage
assert subpage.doNotDestroy;
assert reqCapacity <= subpage.elemSize;
// runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize: 计算分配到的内存相对于当前PoolChunk的偏移字节数
// - runOffset(memoryMapIdx): 获取当前分配到的内存对应的page,相对于当前PoolChunk的偏移字节数,
// - (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize:获取当前page中,分配到的内存相对于page的偏移字节数
buf.init( // ByteBuf初始化
this, handle,
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize,
arena.parent.threadCache());
}
initBufWithSubpage()方法中首先根据handle获取page memoryMapIdx,然后根据memoryMapIdx得到PoolSubpage。然后计算分配到的内存相对于当前PoolChunk的偏移字节数:
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize
计算时runOffset(memoryMapIdx)
获取当前分配到的内存对应的page,相对于当前PoolChunk的偏移字节数;(bitmapIdx & 0x3FFFFFFF) * subpage.elemSize
获取当前page中,分配到的内存相对于page的偏移字节数,最终得到的是分配到的内存相对于当前PoolChunk的偏移字节数。然后初始化PooledByteBuf:
// PooledByteBuf
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
assert handle >= 0;
assert chunk != null;
this.chunk = chunk; // 内存块
this.handle = handle; // bitmapIdx+momorymapIdx
memory = chunk.memory; // DirectByteBuf
this.offset = offset; // 内存偏移(相对于chunk内存块)
this.length = length; // 请求分配的大小
this.maxLength = maxLength; // handle对应的内存的最大字节大小
tmpNioBuf = null;
this.cache = cache; // PoolThreadCache
}
// PooledUnsafeDirectByteBuf
void init(PoolChunk<ByteBuffer> chunk, long handle, int offset, int length, int maxLength,
PoolThreadCache cache) {
super.init(chunk, handle, offset, length, maxLength, cache);
initMemoryAddress(); // 计算内存实际起始位置
}
详细的初始化过程前面已分析,不再介绍。
八、PooledByteBuf的回收
ByteBuf.release();
public boolean release() {
return release0(1);
}
private boolean release0(int decrement) {
for (;;) {
int refCnt = this.refCnt;
if (refCnt < decrement) {
throw new IllegalReferenceCountException(refCnt, -decrement);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) { // CAS更新,减少引用计数
if (refCnt == decrement) {
deallocate(); // 回收分配的ByteBuf内存
return true; // 引用计数到达0,回收内存,返回true
}
return false;
}
}
}
当调用ByteBuf.release()方法减少引用计数时,如果引用计数到达0,则调用deallocate()方法回收分配的ByteBuf内存。
// PooledByteBuf
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
this.handle = -1;
memory = null;
chunk.arena.free(chunk, handle, maxLength, cache);
recycle(); // PoolByteBuf加到对象池
}
}
deallocate()逻辑可以分为两部分:ByteBuf对应的连续内存区段加到缓存MemoryRegionCache中、PooledByteBuf加到对象池Recycler以复用。
1.连续的内存区段加到缓存MemoryRegionCache
void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
if (chunk.unpooled) {
int size = chunk.chunkSize();
destroyChunk(chunk);
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else { // 走else分支
SizeClass sizeClass = sizeClass(normCapacity);
if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
// cached so not free it.
return;
}
freeChunk(chunk, handle, sizeClass); // 缓存队列满了
}
}
free()方法将调用PoolThreadCache.add(this, chunk, handle, normCapacity, sizeClass)
方法将该连续的内存区段加到MemoryRegionCache缓存中:
boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
if (cache == null) {
return false;
}
return cache.add(chunk, handle);
}
private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
switch (sizeClass) {
case Normal:
return cacheForNormal(area, normCapacity);
case Small:
return cacheForSmall(area, normCapacity);
case Tiny:
return cacheForTiny(area, normCapacity);
default:
throw new Error();
}
}
add()方法添加内存区段时,首先找到对应的MemoryRegionCache,然后调用MemoryRegionCache.add()方法:
public final boolean add(PoolChunk<T> chunk, long handle) {
Entry<T> entry = newEntry(chunk, handle);
boolean queued = queue.offer(entry);
if (!queued) {
// If it was not possible to cache the chunk, immediately recycle the entry
entry.recycle(); // 回收Entry对象到对象池
}
return queued;
}
private static Entry newEntry(PoolChunk<?> chunk, long handle) {
Entry entry = RECYCLER.get(); // 从对象池中获取
entry.chunk = chunk;
entry.handle = handle;
return entry;
}
MemoryRegionCache.add()方法中首先从Entry对象池中拿出Entry对象进行复用,并设置chunk、handle字段的值,然后将该Entry放入MemoryRegionCache对应的queue中缓存,以便下次申请相同大小的内存时可以直接使用。如果添加到queue失败,则Entry对象会被再次回收到其对象池,以便别的ByteBuf释放内存时再使用。
再来看free()方法:
void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
if (chunk.unpooled) {
int size = chunk.chunkSize();
destroyChunk(chunk);
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else { // 走else分支
SizeClass sizeClass = sizeClass(normCapacity);
if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
// cached so not free it.
return;
}
freeChunk(chunk, handle, sizeClass); // 缓存队列满了
}
}
如果连续的内存区段加到缓存MemoryRegionCache失败,可能是因为MemoryRegionCache对应的queue满了,则调用freeChunk()方法,标记连续的内存区段为未使用:
// PoolArena
void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
final boolean destroyChunk;
synchronized (this) {
switch (sizeClass) {
case Normal:
++deallocationsNormal;
break;
case Small:
++deallocationsSmall;
break;
case Tiny:
++deallocationsTiny;
break;
default:
throw new Error();
}
destroyChunk = !chunk.parent.free(chunk, handle); // 这里!!!
}
if (destroyChunk) {
destroyChunk(chunk);
}
}
// PoolChunkList
boolean free(PoolChunk<T> chunk, long handle) {
chunk.free(handle); // 标记该段内存未使用
if (chunk.usage() < minUsage) {
remove(chunk);
// Move the PoolChunk down the PoolChunkList linked-list.
return move0(chunk);
}
return true;
}
freeChunk()方法最终调用到chunk.free(handle);
:
// PoolChunk
void free(long handle) {
int memoryMapIdx = memoryMapIdx(handle);
int bitmapIdx = bitmapIdx(handle); // sub page
if (bitmapIdx != 0) { // free a subpage
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage != null && subpage.doNotDestroy;
PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
synchronized (head) {
if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) { // 标记为可用
return;
}
}
}
freeBytes += runLength(memoryMapIdx); // 可用字节数更新
setValue(memoryMapIdx, depth(memoryMapIdx)); // memoryMapIdx节点标记为可用
updateParentsFree(memoryMapIdx); // 更新父节点
}
PoolChunk.free()方法主要根据handle判断回收的内存是subpage级别的内存还是normal级别的内存,bitmapIdx != 0
表示回收的内存是subpage级别的内存,然后标记该段内存为可用。
2.PooledByteBuf加到对象池Recycler
在连续的内存区段加到缓存MemoryRegionCache之后,将PooledByteBuf加到对象池Recycler以复用。
// PooledByteBuf
recycle(); // PoolByteBuf加到对象池
private void recycle() {
recyclerHandle.recycle(this);
}
// DefaultHandle
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
stack.push(this);
}
###九、池化内存分配的总结
池化内存分配的过程中,用到了对象池(PooledByteBuf、Entry…)、缓存(MemoryRegionCache)、完全二叉树(page)、位图(PoolSubpage)等技术。
十、面试问题
1.ByteBuf内存的类别有哪些
-
Heap和Direct
表示ByteBuf底层使用的是堆内存字节数组,或者堆外内存DirectByteBuffer。
-
Pooled和Unpooled
Pooled表示池化的内存,即分配内存时可以从预先分配好的内存中取出一块内存;Unpooled表示非池化的内存,即直接调用API向操作系统申请一块内存。
-
Unsafe和非Unsafe
表示ByteBuf读写数据时是否依赖JDK Unsafe对象,Unsafe表示依赖。
一般情况下,用户在分配ByteBuf内存时只需根据Heap和Direct、Pooled和Unpooled两个角度进行内存分配,Unsafe和非Unsafe是由Netty根据运行环境自动识别的。
2.如何减少多线程内存分配之间的竞争
// PooledByteBufAllocator#newDirectBuffer
PoolThreadCache cache = threadCache.get(); // threadCache: PoolThreadLocalCache
PoolArena<ByteBuffer> directArena = cache.directArena;
通过PoolThreadLocalCache实现,PoolThreadLocalCache为FastThreadLocal实现,因此调用threadCache.get()
获取PoolThreadCache时,不同NIO线程拿到的是各自的PoolThreadCache,这样使得多线程内存分配减少了竞争。
3.不同大小的内存是如何进行分配的
根据上面分析的内容,page级别的内存是通过PoolChunk的完全二叉树根据一定算法分配的;subpage级别的内存是通过subpage对应的bitmap位图完成分配的。