本文介绍了Java并发:并发添加&清除列表条目的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

考虑以下方法:

public void add(final List<ReportingSTG> message) {
        if(stopRequested.get()) {
            synchronized (this) {
                if(stopRequested.get()) {
                    retryQueue.put(message);
                }
            }
        }
        messages.add(message);
        if(messages.size() >= batchSize && waitingThreads.get() == 0) {
            synchronized (this) {
                if(messages.size() >= batchSize && waitingThreads.get() == 0) {
                    final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
                    messages.clear();
                    if(processors.size()>=numOfProcessors) {
                        waitingThreads.incrementAndGet();
                        waitForProcessor();
                        waitingThreads.decrementAndGet();
                    }
                    startProcessor(clone);
                }
            }

        }
    }

特别是这两行:

 1:   final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
 2:   messages.clear();

如果线程A进入了synchronized block并获得了当前对象的锁,这是否意味着这个对象的实例属性的状态不能被synchronized块外的其他线程改变(而线程A在synchronized块中)?

If thread A enters synchronized block and acquires a lock on current object, does this means that state of instance properties of this object can't be changed by other threads outside synchronized block (while thread A is in synchronized block)?

例如,线程A执行了第1行->线程B进入方法并添加了新的列表条目(messages.add(message))->线程a执行了第2行->线程B添加的条目被删除了(一起与其他条目).这种情况可能吗?否则线程 B 将等待线程 A 释放锁,然后才会删除 List 条目

For example, thread A executed line 1 -> thread B entered the method and added new list entry (messages.add(message)) -> Thread a executed line 2 -> entry what was added by thread B removed (together with other entries). Is this scenario possible? Or thread B will wait while lock is released by thread A and only then will remove the List entry

messages 是一个非静态的 synchronizedList

messages is a non-static synchronizedList

UPD:更新方法,可能的解决方案:

UPD: updated method, possible solution:

public void add(final List<ReportingSTG> message) {
    if(stopRequested.get()) {
        synchronized (this) {
            if(stopRequested.get()) {
                retryQueue.put(message);
            }
        }
    }
    while (addLock.get()){
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {}
    }

    messages.add(message);

    if(messages.size() >= batchSize && waitingThreads.get() == 0) {
        synchronized (this) {
            if(messages.size() >= batchSize && waitingThreads.get() == 0) {

                addLock.set(true);
                final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
                messages.clear();
                addLock.set(false);

                if(processors.size()>=numOfProcessors) {
                    waitingThreads.incrementAndGet();
                    waitForProcessor();
                    waitingThreads.decrementAndGet();
                }
                startProcessor(clone);
            }
        }

    }
}

addLock - AtomicBoolean,默认为 false

addLock - AtomicBoolean, false by default

推荐答案

所描述的场景是可能的.即您可能会丢失消息.

The described scenario is possible. i.e. you may loose messages.

synchronized 关键字确保您永远不会有 2 个线程同时运行 synchronized 部分.它不会阻止另一个线程修改在 synchronized 块内操作的对象(只要其他线程可以访问它们).

The synchronized keyword ensure that you never have 2 threads running the synchronized section simultaneously. It doesn't prevent modification by another thread of objects that are manipulated inside the synchronized block (as soon as this other thread have access to them).

这是一个可能的解决方案,因为它同步添加和清除.

This is a possible solution since it synchronize the add and the clear.

private Object lock = new Object();

public void add(final List<ReportingSTG> message) {
    if(stopRequested.get()) {
        synchronized (this) {
            if(stopRequested.get()) {
                retryQueue.put(message);
            }
        }
    }
    synchronized(lock){
        messages.add(message);
        if(messages.size() >= batchSize && waitingThreads.get() == 0) {
                final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
                messages.clear();
                if(processors.size()>=numOfProcessors) {
                    waitingThreads.incrementAndGet();
                    waitForProcessor();
                    waitingThreads.decrementAndGet();
                }
                startProcessor(clone);
            }
    }
}

这篇关于Java并发:并发添加&amp;清除列表条目的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-20 18:12