前言
FutureTask 是 Java 的并发工具类,它继承了 Future 和 Runnable 的特性。它允许在一个单独的线程中执行一个任务,并可以在另一个线程中通过FutureTask.get()获取该任务的结果。类图如下:
一、FutureTask 的底层实现
FutureTask 的底层实现主要包括以下几个部分:
任务执行:FutureTask 内部维护了一个 Runnable 任务,当调用 FutureTask 的 run 方法时,这个任务会被执行。这个任务是在 FutureTask 实例被创建时传入的。
结果存储:任务执行的结果被存储在内部的一个 volatile 变量中。由于 volatile 保证了可见性,因此其他线程可以立即看到最新的结果。
状态管理:FutureTask 维护了一个内部状态,包括 NOT_RUNNING、RUNNING 和 DONE。这个状态用于跟踪任务是否正在运行或者已经完成。
阻塞等待:当其他线程调用 FutureTask 的 get 方法时,如果任务还没有完成,这个线程会被阻塞,直到任务完成并且结果可用。
异常处理:如果任务在执行过程中抛出了异常,那么这个异常会被捕获并存储在 FutureTask 中。调用 get 方法时,如果任务完成并且结果异常,那么这个异常会被重新抛出。
二、FutureTask核心方法run
代码如下:
public void run() {
// state != NEW说明该任务已经执行过或被cancel,非NEW状态,就不处理了
// compareAndSwapObject CAS原子比较this对象指定位置字段,若是期望值null,则更新该字段为当前线程,失败就返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行任务逻辑
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
compareAndSwapObject将当前线程赋值给成员变量 Thread runner;
runnerOffset 为FutureTask.class对象的runner的偏移量
compareAndSwapObject 方法其实比较的就是两个 Java Object 的地址,如果相等则将新的地址(Java Object)赋给该字段。
二、相关c++实现源码
C++代码如下:
virtual jboolean compareAndSwapObject(::java::lang::Object *, jlong, ::java::lang::Object *, ::java::lang::Object *);
// natUnsafe.cc
static inline bool
compareAndSwap (volatile jobject *addr, jobject old, jobject new_val)
{
jboolean result = false;
spinlock lock;
// 如果字段的地址与期望的地址相等则将字段的地址更新
if ((result = (*addr == old)))
*addr = new_val;
return result;
}
// natUnsafe.cc
jboolean
sun::misc::Unsafe::compareAndSwapObject (jobject obj, jlong offset,
jobject expect, jobject update)
{
// 获取字段地址并转换为字符串
jobject *addr = (jobject*)((char *) obj + offset);
// 调用 compareAndSwap 方法进行比较
return compareAndSwap (addr, expect, update);
}
底层的实现总结来说:
通过我们传入的字段在对象中的偏移量来获取到字段的地址(对象首地址 + 字段在对象中的偏移量);然后调用 CompareAndSwap 方法比较字段的地址是否与我们期望的地址相等,如果相等则使用我们传入的新地址更新字段的地址;