ObservableRunnable

package com.dwz.concurrency2.chapter5;


public abstract class ObservableRunnable implements Runnable {
    protected final LifeCycleListener listener;

    public ObservableRunnable(final LifeCycleListener listener) {
        this.listener = listener;
    }

    protected void notifyChange(final RunnableEvent event) {
        listener.onEvent(event);
    }

    public enum RunnableState {
        RUNNING, ERROR, DONE;
    }

    public static class RunnableEvent {
        private final RunnableState state;
        private final Thread thread;
        private final Throwable cause;
        public RunnableEvent(RunnableState state, Thread thread, Throwable cause) {
            super();
            this.state = state;
            this.thread = thread;
            this.cause = cause;
        }
        public RunnableState getState() {
            return state;
        }
        public Thread getThread() {
            return thread;
        }
        public Throwable getCause() {
            return cause;
        }
    }
}

LifeCycleListener

package com.dwz.concurrency2.chapter5;

import com.dwz.concurrency2.chapter5.ObservableRunnable.RunnableEvent;

public interface LifeCycleListener {

    public void onEvent(RunnableEvent event);

}

ThreadLifeCycleObserver

package com.dwz.concurrency2.chapter5;

import java.util.List;

import com.dwz.concurrency2.chapter5.ObservableRunnable.RunnableEvent;

public class ThreadLifeCycleObserver implements LifeCycleListener {
    private final Object LOCK = new Object();

    public void concurrentQuery(List<String> ids) {
        if(ids == null || ids.isEmpty()) {
            return;
        }

        ids.stream().forEach(id -> new Thread(new ObservableRunnable(this) {
            @Override
            public void run() {
                try    {
                    notifyChange(new RunnableEvent(RunnableState.RUNNING, Thread.currentThread(), null));
                    System.out.println("query for the id " + id);
                    Thread.sleep(1000L);
                    int x = 1/0;
                    notifyChange(new RunnableEvent(RunnableState.DONE, Thread.currentThread(), null));
                } catch(Exception e) {
                    notifyChange(new RunnableEvent(RunnableState.ERROR, Thread.currentThread(), e));
                }
            }
        }, id).start());
    }

    @Override
    public void onEvent(RunnableEvent event) {
        synchronized (LOCK) {
            System.out.println("The runnable [" + event.getThread().getName() + "] data changed and state is [" + event.getState() + "]");
        }

        if(event.getCause() != null) {
            System.out.println("The runnable [" + event.getThread().getName() + "] process failed.");
            event.getCause().printStackTrace();
        }
    }
}

测试代码:

package com.dwz.concurrency2.chapter5;

import java.util.Arrays;

public class ThreadLifeCycleClient {
    public static void main(String[] args) {
        new ThreadLifeCycleObserver().concurrentQuery(Arrays.asList("1", "2"));
    }
}
12-22 09:32