前言

FutureTask 是 Java 的并发工具类,它继承了 Future 和 Runnable 的特性。它允许在一个单独的线程中执行一个任务,并可以在另一个线程中通过FutureTask.get()获取该任务的结果。类图如下:
FutureTask底层实现分析-LMLPHP


一、FutureTask 的底层实现

FutureTask 的底层实现主要包括以下几个部分:
任务执行:FutureTask 内部维护了一个 Runnable 任务,当调用 FutureTask 的 run 方法时,这个任务会被执行。这个任务是在 FutureTask 实例被创建时传入的。
结果存储:任务执行的结果被存储在内部的一个 volatile 变量中。由于 volatile 保证了可见性,因此其他线程可以立即看到最新的结果。
状态管理:FutureTask 维护了一个内部状态,包括 NOT_RUNNING、RUNNING 和 DONE。这个状态用于跟踪任务是否正在运行或者已经完成。
阻塞等待:当其他线程调用 FutureTask 的 get 方法时,如果任务还没有完成,这个线程会被阻塞,直到任务完成并且结果可用。
异常处理:如果任务在执行过程中抛出了异常,那么这个异常会被捕获并存储在 FutureTask 中。调用 get 方法时,如果任务完成并且结果异常,那么这个异常会被重新抛出。

二、FutureTask核心方法run

代码如下:

public void run() {
// state != NEW说明该任务已经执行过或被cancel,非NEW状态,就不处理了
// compareAndSwapObject CAS原子比较this对象指定位置字段,若是期望值null,则更新该字段为当前线程,失败就返回
if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
		// 执行任务逻辑
            result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
compareAndSwapObject将当前线程赋值给成员变量 Thread runner;
runnerOffset 为FutureTask.class对象的runner的偏移量
compareAndSwapObject 方法其实比较的就是两个 Java Object 的地址,如果相等则将新的地址(Java Object)赋给该字段。

二、相关c++实现源码

C++代码如下:

virtual jboolean compareAndSwapObject(::java::lang::Object *, jlong, ::java::lang::Object *, ::java::lang::Object *);
// natUnsafe.cc
static inline bool
compareAndSwap (volatile jobject *addr, jobject old, jobject new_val)
{
	jboolean result = false;
	spinlock lock;
  
  	// 如果字段的地址与期望的地址相等则将字段的地址更新
	if ((result = (*addr == old)))
    	*addr = new_val;
	return result;
}

// natUnsafe.cc
jboolean
sun::misc::Unsafe::compareAndSwapObject (jobject obj, jlong offset,
                     jobject expect, jobject update)
{
	// 获取字段地址并转换为字符串
	jobject *addr = (jobject*)((char *) obj + offset);
	// 调用 compareAndSwap 方法进行比较
    return compareAndSwap (addr, expect, update);
}

底层的实现总结来说:
通过我们传入的字段在对象中的偏移量来获取到字段的地址(对象首地址 + 字段在对象中的偏移量);然后调用 CompareAndSwap 方法比较字段的地址是否与我们期望的地址相等,如果相等则使用我们传入的新地址更新字段的地址;


01-31 13:26