并发编程面试题
(1)LongAddr的结构是怎样的?
(6)如何保证线程操作被分配的Cell元素的原子性?
下面看下Cell的构造
/** * Padded variant of AtomicLong supporting only raw accesses plus CAS. * * JVM intrinsics note: It would be possible to use a release-only * form of CAS here, if it were provided. */ @sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }
(2)当前线程应该访问Cell数组里面的哪一个Cell元素?
/** * Adds the given value. * * @param x the value to add */ public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { //——(1) boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || //——(2) (a = as[getProbe() & m]) == null || //——(3) !(uncontended = a.cas(v = a.value, v + x))) //——(4) longAccumulate(x, null, uncontended); //——(5) } }
(3)如何初始化Cell数组?
/** * Handles cases of updates involving initialization, resizing, * creating new Cells, and/or contention. See above for * explanation. This method suffers the usual non-modularity * problems of optimistic retry code, relying on rechecked sets of * reads. * * @param x the value * @param fn the update function, or null for add (this convention * avoids the need for an extra field or function in LongAdder). * @param wasUncontended false if CAS failed before call */ final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { ....省略无关代码.... //*******************************初始化Cell数组******************************************// else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table if (cells == as) { //——(1) Cell[] rs = new Cell[2]; //——(2) rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { //——(3) cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
(4)Cell数组如何扩容?
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { ...... else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; else if (n >= NCPU || cells != as) //——(1) collide = false; // At max size or stale else if (!collide) //——(2) collide = true; else if (cellsBusy == 0 && casCellsBusy()) { //——(3) try { if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; //——(4) for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; //——(5) } collide = false; //——(6) continue; // Retry with expanded table } h = advanceProbe(h); //——(7) } ..... }
(5)线程访问分配的Cell元素有冲突后如何处理?
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { ....... boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { //——(1) if ((a = as[(n - 1) & h]) == null) { //——(2) if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } ..... h = advanceProbe(h); //——(3) } ..... }
代码(3)实现
/** * Pseudo-randomly advances and records the given probe value for the * given thread. * Duplicated from ThreadLocalRandom because of packaging restrictions. */ static final int advanceProbe(int probe) { probe ^= probe << 13; // xorshift probe ^= probe >>> 17; probe ^= probe << 5; UNSAFE.putInt(Thread.currentThread(), PROBE, probe); return probe; }
附录
总结
参考书籍
Java并发编程之美