Netty FastThreadLocal分析
Netty中FastThreadLocal用来代替ThreadLocal存放线程本地变量,从FastThreadLocalThread类型的线程中访问本地变量时,比使用ThreadLocal会有更好的性能。
FastThreadLocal使用InternalThreadLocalMap存放实际的数据。和ThreadLocal实现方式类似,FastThreadLocalThread中有一个InternalThreadLocalMap类型的字段threadLocalMap,这样一个线程对应一个InternalThreadLocalMap实例,该线程下所有的线程本地变量都会放在threadLocalMap中的数组indexedVariables中。下面详细分析FastThreadLocal的实现机制。
一、FastThreadLocal的创建
public class FastThreadLocalTest {
private static FastThreadLocal<Object> threadLocal = new FastThreadLocal<Object>() {
@Override
protected Object initialValue() throws Exception {
return new Object();
}
};
public static void main(String[] args) {
new Thread(() -> {
Object o = threadLocal.get();
System.out.println(o);
}).start();
new Thread(() -> {
Object o = threadLocal.get();
System.out.println(o);
}).start();
}
}
如上所示是使用FastThreadLocal的例子,通过声明静态的FastThreadLocal实例,然后在两个线程中分别通过threadLocal.get()
获取各自的线程本地变量。下面分析FastThreadLocal的创建过程。
private final int index; // 每个FastThreadLocal实例对应一个唯一的index
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}
从FastThreadLocal的构造器可以看出,FastThreadLocal初始化时得到一个index变量,其值通过InternalThreadLocalMap.nextVariableIndex()
获取:
// UnpaddedInternalThreadLocalMap
static final AtomicInteger nextIndex = new AtomicInteger();
// InternalThreadLocalMap
public static int nextVariableIndex() {
int index = nextIndex.getAndIncrement();
if (index < 0) {
nextIndex.decrementAndGet();
throw new IllegalStateException("too many thread-local indexed variables");
}
return index;
}
可知,每当FastThreadLocal初始化时都会初始化index变量,并且在JVM进程中每个FastThreadLocal实例对应一个唯一的index。因此,下面将看到,每当一个FastThreadLocalThread线程通过FastThreadLocal获取线程本地变量时,都是通过FastThreadLocal对应的index变量值,在FastThreadLocalThread的threadLocalMap中的数组indexedVariables中以该index为索引得到的。
二、FastThreadLocal.get()实现
1.获取InternalThreadLocalMap
// FastThreadLocal
public final V get() {
return get(InternalThreadLocalMap.get());
}
// InternalThreadLocalMap
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
} else {
return slowGet();
}
}
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
private static InternalThreadLocalMap slowGet() {
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret);
}
return ret;
}
// UnpaddedInternalThreadLocalMap
static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
可以看到,调用FastThreadLocal.get()
时,首先获取当前线程实例对应的InternalThreadLocalMap。如果当前线程是FastThreadLocalThread实例,则通过fastGet()方法获取,否则通过slowGet()方法获取。fastGet()方法中直接获取FastThreadLocalThread的threadLocalMap变量,如果为null,则初始化。如果调用的是slowGet(),说明当前线程是普通的Thread,则通过JDK ThreadLocal slowThreadLocalMap
变量获取与线程绑定的InternalThreadLocalMap。
2.通过index索引获取值
在获取到线程的InternalThreadLocalMap实例之后,调用get(InternalThreadLocalMap threadLocalMap)
方法获取线程本地变量。
public final V get(InternalThreadLocalMap threadLocalMap) {
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
return initialize(threadLocalMap);
}
get(InternalThreadLocalMap)方法中通过InternalThreadLocalMap.indexedVariable()方法获取线程本地变量:
public Object indexedVariable(int index) {
Object[] lookup = indexedVariables;
return index < lookup.length? lookup[index] : UNSET;
}
indexedVariables
是线程的InternalThreadLocalMap实例中的一个线程本地变量数组,数组索引是FastThreadLocal的index变量值。因此,通过indexedVariable()方法就可以获取到线程的InternalThreadLocalMap中某个FastThreadLocal对应的线程本地变量值。如下图所示:
如果indexedVariable()方法返回的值不为UNSET,则直接返回该值。如果indexedVariable()方法返回UNSET,表明InternalThreadLocalMap中该index位置的线程本地变量未初始化,则调用initialize()方法进行初始化。
3.初始化
public final V get(InternalThreadLocalMap threadLocalMap) {
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
return initialize(threadLocalMap);
}
private V initialize(InternalThreadLocalMap threadLocalMap) {
V v = null;
try {
v = initialValue(); // 获取初始值
} catch (Exception e) {
PlatformDependent.throwException(e);
}
threadLocalMap.setIndexedVariable(index, v);
addToVariablesToRemove(threadLocalMap, this);
return v;
}
protected V initialValue() throws Exception {
return null;
}
initialize()方法中调用initialValue()方法获取初始化值,该方法可以在实例化FastThreadLocal时重写。在得到初始值v之后,将该值绑定到线程InternalThreadLocalMap的indexedVariables
数组中。
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET;
} else {
expandIndexedVariableTableAndSet(index, value); // 扩容
return true;
}
}
// 扩容
private void expandIndexedVariableTableAndSet(int index, Object value) {
Object[] oldArray = indexedVariables;
final int oldCapacity = oldArray.length;
// 找到大于等于index的2的幂次
int newCapacity = index;
newCapacity |= newCapacity >>> 1;
newCapacity |= newCapacity >>> 2;
newCapacity |= newCapacity >>> 4;
newCapacity |= newCapacity >>> 8;
newCapacity |= newCapacity >>> 16;
newCapacity ++;
Object[] newArray = Arrays.copyOf(oldArray, newCapacity); // copy
Arrays.fill(newArray, oldCapacity, newArray.length, UNSET); // 填充其余部分元素为UNSET
newArray[index] = value;
indexedVariables = newArray;
}
通过setIndexedVariable()方法可以看出,如果该数组容量不够,还会进行扩容操作。
在initialize()方法的最后,调用了addToVariablesToRemove(threadLocalMap, this);
代码:
private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
Set<FastThreadLocal<?>> variablesToRemove;
if (v == InternalThreadLocalMap.UNSET || v == null) {
variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
} else {
variablesToRemove = (Set<FastThreadLocal<?>>) v;
}
variablesToRemove.add(variable);
}
addToVariablesToRemove()方法中,通过variablesToRemoveIndex索引获取当前线程InternalThreadLocalMap线程本地变量数组中对应位置的值,该值是一个Set<FastThreadLocal>
变量,并将当前FastThreadLocal实例放入该Set中。这个Set<FastThreadLocal>
变量主要是用来FastThreadLocalThread在清理线程本地变量缓存的时候用的,后面会分析。
到此,FastThreadLocal.get()实现分析完毕。
三、FastThreadLocal.set()实现
public final void set(V value) {
if (value != InternalThreadLocalMap.UNSET) {
set(InternalThreadLocalMap.get(), value);
} else {
remove();
}
}
1.获取InternalThreadLocalMap
假设设置的值不是UNSET,则先获取当前线程的InternalThreadLocalMap实例,上面已经分析过,这里不再详述。
InternalThreadLocalMap.get()
2.通过index索引设置值
public final void set(InternalThreadLocalMap threadLocalMap, V value) {
if (value != InternalThreadLocalMap.UNSET) {
if (threadLocalMap.setIndexedVariable(index, value)) { // 返回true,表示新创建了一个线程本地变量
addToVariablesToRemove(threadLocalMap, this);
}
} else {
remove(threadLocalMap);
}
}
首先调用threadLocalMap.setIndexedVariable(index, value)
设置线程本地变量:
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET;
} else {
expandIndexedVariableTableAndSet(index, value); // 扩容
return true;
}
}
如果该方法返回true,表示设置线程本地变量成功,并且之前未设置过该index位置处的值。然后再调用addToVariablesToRemove(threadLocalMap, this)
代码,该方法上面已经分析过,这里不再说明。
3.remove对象
如果设置线程本地变量时传入的是UNSET,则调用remove()方法删除线程本地变量。
public final void remove() {
// InternalThreadLocalMap.getIfSet(): 可能返回null
remove(InternalThreadLocalMap.getIfSet());
}
public final void remove(InternalThreadLocalMap threadLocalMap) {
if (threadLocalMap == null) {
return;
}
Object v = threadLocalMap.removeIndexedVariable(index); // 删除线程本地变量
removeFromVariablesToRemove(threadLocalMap, this);
if (v != InternalThreadLocalMap.UNSET) {
try {
onRemoval((V) v);
} catch (Exception e) {
PlatformDependent.throwException(e);
}
}
}
remove(InternalThreadLocalMap)方法中调用threadLocalMap.removeIndexedVariable(index)
删除线程本地变量:
// InternalThreadLocalMap
public Object removeIndexedVariable(int index) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object v = lookup[index];
lookup[index] = UNSET;
return v;
} else {
return UNSET;
}
}
如果removeIndexedVariable()方法返回的值不是UNSET,表示删除的线程本地变量之前已设置了有效值,则调用removeFromVariablesToRemove(threadLocalMap, this)
:
private static void removeFromVariablesToRemove(
InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v == InternalThreadLocalMap.UNSET || v == null) {
return;
}
@SuppressWarnings("unchecked")
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
variablesToRemove.remove(variable);
}
在当前线程的InternalThreadLocalMap的indexedVariables
数组中,取出variablesToRemoveIndex索引对应的Set<FastThreadLocal>
集合,并删除当前的FastThreadLocal variable引用。这个Set<FastThreadLocal>
变量主要是用来FastThreadLocalThread在清理线程本地变量缓存的时候用的,后面会分析。
在删除线程本地变量之后,将调用onRemove()方法:
if (v != InternalThreadLocalMap.UNSET) {
try {
onRemoval((V) v);
} catch (Exception e) {
PlatformDependent.throwException(e);
}
}
protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception { }
用户可以重写此方法,在该方法调用时做一些资源清理工作。
四、FastThreadLocal.removeAll()实现
在通过DefaultThreadFactory创建FastThreadLocalThread线程时,Runnbale任务被包装为了DefaultRunnableDecorator实例。
// DefaultThreadFactory
public Thread newThread(Runnable r) {
// name: nioEventLoopGroup-poolId-nextId
// 创建的线程实体: FastThreadLocalThread
Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon()) {
if (!daemon) {
t.setDaemon(false);
}
} else {
if (daemon) {
t.setDaemon(true);
}
}
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
private static final class DefaultRunnableDecorator implements Runnable {
private final Runnable r;
DefaultRunnableDecorator(Runnable r) {
this.r = r;
}
@Override
public void run() {
try {
r.run();
} finally {
FastThreadLocal.removeAll(); // 移除NIO线程的所有本地变量副本
}
}
}
可以看到,在Runnable任务运行结束后,将调用FastThreadLocal.removeAll()
方法,移除FastThreadLocalThread线程的所有线程本地变量副本。
public static void removeAll() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
if (threadLocalMap == null) {
return;
}
try {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v != null && v != InternalThreadLocalMap.UNSET) {
@SuppressWarnings("unchecked")
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
FastThreadLocal<?>[] variablesToRemoveArray =
variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]);
for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
tlv.remove(threadLocalMap); // 删除线程threadLocalMap上的所有缓存
}
}
} finally {
InternalThreadLocalMap.remove();
}
}
从FastThreadLocal.removeAll()方法可以看出,这个时候variablesToRemoveIndex索引起到了作用。通过variablesToRemoveIndex拿到了当前线程InternalThreadLocalMap中的indexedVariables
数组中该索引处的Set<FastThreadLocal>
集合。这个集合中的FastThreadLocal引用,与当前线程的InternalThreadLocalMap中的各个本地变量相对应,因此可以通过这些FastThreadLocal引用,删除线程InternalThreadLocalMap中的所有线程本地变量。最后再将线程的threadLocalMap变量设置为null:
InternalThreadLocalMap.remove();
public static void remove() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
((FastThreadLocalThread) thread).setThreadLocalMap(null);
} else {
slowThreadLocalMap.remove();
}
}
至此,有关FastThreadLocal的内容分析完毕。