我面临着一个奇怪的问题,无法解决该问题。
我正在尝试实现服务器推送通知。通知应广播给所有已连接的客户端(最多10个,这是一个Intranet Web应用程序),其中包括ANDROID 4.2浏览器。

通知运行良好,所有东西都被推送,但是Atmosphere会创建大量的tomcat线程,最终在请求大约3-4k页面后导致线程泄漏。
Tomcat 7.0.40(配置了NIO连接器),最大线程数为150,超时为60000

web.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Servlet 3.0 deployment descriptor for the cielo-cp web application.
  Registers the Spring context listeners, the request filters and an
  Atmosphere MeteorServlet that delegates to Spring's DispatcherServlet.
-->
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://java.sun.com/xml/ns/javaee"
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
    id="WebApp_ID" version="3.0">

    <context-param>
        <param-name>contextClass</param-name>
        <param-value>org.springframework.web.context.support.XmlWebApplicationContext</param-value>
    </context-param>
    <listener>
        <listener-class>org.springframework.security.web.session.HttpSessionEventPublisher</listener-class>
    </listener>
    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>
    <listener>
        <listener-class>org.springframework.web.context.request.RequestContextListener</listener-class>
    </listener>

    <!--
      Filters are applied in the order of their filter-mapping elements.
      The character encoding filter is mapped first so that the request
      encoding is forced before any other filter reads request parameters.
    -->

    <!-- char encoding -->
    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <async-supported>true</async-supported>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <!-- Hibernate -->
    <filter>
        <filter-name>hibernateFilter</filter-name>
        <filter-class>org.springframework.orm.hibernate4.support.OpenSessionInViewFilter</filter-class>
        <async-supported>true</async-supported>
        <!-- NOTE(review): singleSession looks like a property of the hibernate3
             variant of this filter; verify that the hibernate4 class honours it. -->
        <init-param>
            <param-name>singleSession</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>hibernateFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <!-- SECURITY -->
    <filter>
        <filter-name>springSecurityFilterChain</filter-name>
        <filter-class>org.springframework.web.filter.DelegatingFilterProxy</filter-class>
        <async-supported>true</async-supported>
    </filter>
    <filter-mapping>
        <filter-name>springSecurityFilterChain</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <!-- Atmosphere MeteorServlet wrapping the Spring DispatcherServlet -->
    <servlet>
        <servlet-name>dispatcher</servlet-name>
        <servlet-class>org.atmosphere.cpr.MeteorServlet</servlet-class>
        <!-- Servlet that MeteorServlet delegates to -->
        <init-param>
            <param-name>org.atmosphere.servlet</param-name>
            <param-value>org.springframework.web.servlet.DispatcherServlet</param-value>
        </init-param>
        <!-- NOTE(review): this pins the blocking-I/O (BIO) async support. If the
             container actually runs the NIO connector, confirm this is intended:
             with BIO, client disconnects may go undetected, which would keep the
             EMPTY_DESTROY life cycle policy below from releasing broadcasters. -->
        <init-param>
            <param-name>org.atmosphere.cpr.asyncSupport</param-name>
            <param-value>org.atmosphere.container.Tomcat7BIOSupportWithWebSocket</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.broadcasterClass</param-name>
            <param-value>org.atmosphere.cpr.DefaultBroadcaster</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.broadcasterCacheClass</param-name>
            <param-value>org.atmosphere.cache.UUIDBroadcasterCache</param-value>
        </init-param>
        <!-- Disabled:
        <init-param>
            <param-name>org.atmosphere.cpr.sessionSupport</param-name>
            <param-value>true</param-value>
        </init-param>
        -->
        <init-param>
            <param-name>org.atmosphere.resumeOnBroadcast</param-name>
            <param-value>true</param-value>
        </init-param>
        <!-- Thread pool shared between broadcasters, capped at 20 threads each
             for message processing and for asynchronous writes -->
        <init-param>
            <param-name>org.atmosphere.cpr.broadcaster.shareableThreadPool</param-name>
            <param-value>true</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.broadcaster.maxProcessingThreads</param-name>
            <param-value>20</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.broadcaster.maxAsyncWriteThreads</param-name>
            <param-value>20</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.useNative</param-name>
            <param-value>true</param-value>
        </init-param>
        <!-- Disabled:
        <init-param>
            <param-name>org.atmosphere.useBlocking</param-name>
            <param-value>false</param-value>
        </init-param>
        -->
        <init-param>
            <param-name>org.atmosphere.useStream</param-name>
            <param-value>true</param-value>
        </init-param>
        <!-- Heartbeat interceptor, frequency 60 seconds -->
        <init-param>
            <param-name>org.atmosphere.cpr.AtmosphereInterceptor</param-name>
            <param-value>org.atmosphere.interceptor.HeartbeatInterceptor</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.interceptor.HeartbeatInterceptor.heartbeatFrequencyInSeconds</param-name>
            <param-value>60</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.broadcasterLifeCyclePolicy</param-name>
            <param-value>EMPTY_DESTROY</param-value>
        </init-param>
        <!-- Spring MVC context handed to the wrapped DispatcherServlet -->
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>/WEB-INF/dispatcher-servlet.xml</param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
        <async-supported>true</async-supported>
    </servlet>

    <servlet-mapping>
        <servlet-name>dispatcher</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>

    <display-name>cielo-cp</display-name>

    <!-- Default page to serve -->
    <welcome-file-list>
        <welcome-file>/</welcome-file>
    </welcome-file-list>

    <!-- Session timeout: 1440 -->
    <session-config>
        <session-timeout>1440</session-timeout>
    </session-config>

    <error-page>
        <error-code>404</error-code>
        <location>/404</location>
    </error-page>
    <error-page>
        <error-code>403</error-code>
        <location>/accessDenied</location>
    </error-page>
</web-app>


Spring MVC控制器:

/**
 * Spring MVC controller exposing the endpoint that push clients connect to.
 */
@Controller
public class PushController extends AbstractController {

    /**
     * Handles requests to {@code /push} by passing the incoming
     * {@link AtmosphereResource} to {@code AtmosphereUtils.suspend}.
     * The method itself produces no response body.
     *
     * @param atmosphereResource the Atmosphere resource bound to the current request
     */
    @RequestMapping("/push")
    @ResponseBody
    public void pushAsync(AtmosphereResource atmosphereResource) {
        AtmosphereUtils.suspend(atmosphereResource);
    }
}


BroadcasterService:

/**
 * Listens for {@link PushEvent}s and broadcasts the pending messages for the
 * event's type, serialized as JSON, through the Atmosphere broadcaster.
 */
@Service
public class BroadcasterService {

    @Autowired
    private PushNotificationService service;

    private final ObjectMapper mapper = new ObjectMapper();

    /**
     * Polls the messages queued for the event's type and broadcasts them as a
     * single JSON string. Delivery is best-effort: failures are logged and
     * never propagated to the caller.
     *
     * @param e the push event that was fired; its type selects the messages to poll
     */
    public void receiveMessage(@Observes PushEvent e) {
        List<PushableMessage> messages = service.pollMessages(e.getType());
        try {
            // Looked up without creating one, so the result may be null when no
            // broadcaster is currently registered.
            Broadcaster b = AtmosphereUtils.lookupBroadcaster(false);
            if (b == null) {
                // Nothing to deliver to; skip instead of failing with a NullPointerException.
                logger.debug("No broadcaster available, skipping push");
                return;
            }
            try {
                b.broadcast(mapper.writeValueAsString(messages));
            } catch (Exception ex) {
                logger.error(ex.getMessage(), ex);
            }
        } catch (Throwable t) {
            logger.debug(t.getMessage(), t);
        }
    }

}




/**
 * Queues pushable messages and fires the event that announces them.
 * Parts of the implementation are elided in this excerpt.
 */
@Service
public class PushNotificationService {

    private static Logger logger = Logger.getLogger(PushNotificationService.class);

    // Used to fire a PushEvent after a message has been queued.
    @Autowired
    private Event<PushEvent> event;


    /**
     * Returns the messages for the given key (body elided in this excerpt).
     *
     * @param key selects which messages to return
     * @return the resulting list of messages
     */
    public List<PushableMessage> pollMessages(TipologiaPush key) {
            ....
        return result;
    }


    /**
     * Puts the message on the queue and then fires the event obtained from
     * {@code message.getEvent()}.
     *
     * @param message the message to enqueue and announce
     */
    public void pushMessage(PushableMessage message){
        ...
        queue.put(message);
        event.fire(message.getEvent());
    }


}

和实用程序:

/**
 * Static helpers for obtaining, suspending and broadcasting to Atmosphere resources.
 */
public final class AtmosphereUtils {

    /**
     * Returns the {@link AtmosphereResource} of the {@link Meteor} built for the request.
     *
     * @param request the current HTTP request
     * @return the Atmosphere resource associated with the request
     */
    public static AtmosphereResource getAtmosphereResource(HttpServletRequest request) {
        return getMeteor(request).getAtmosphereResource();
    }

    /**
     * Builds a {@link Meteor} for the request.
     *
     * @param request the current HTTP request
     * @return the Meteor built from the request
     */
    public static Meteor getMeteor(HttpServletRequest request) {
        return Meteor.build(request);
    }

    /**
     * Suspends the resource, waits for its {@code onSuspend} callback and then
     * registers it with the shared broadcaster, creating the broadcaster if needed.
     * Long-polling resources are set to resume on broadcast before being suspended.
     *
     * @param resource the resource to suspend and register
     */
    public static void suspend(final AtmosphereResource resource) {

        final CountDownLatch countDownLatch = new CountDownLatch(1);
        resource.addEventListener(new AtmosphereResourceEventListenerAdapter() {
            @Override
            public void onSuspend(AtmosphereResourceEvent event) {
                countDownLatch.countDown();
                logger.debug("Suspending Client..." + resource.uuid() + " with transport " + resource.transport());
                // One-shot listener: it only exists to release the latch.
                resource.removeEventListener(this);
            }

            @Override
            public void onDisconnect(AtmosphereResourceEvent event) {
                logger.debug("Disconnecting Client..." + resource.uuid());
                super.onDisconnect(event);
            }
        });

        if (AtmosphereResource.TRANSPORT.LONG_POLLING.equals(resource.transport())) {
            resource.resumeOnBroadcast(true).suspend();
        } else {
            resource.suspend();
        }

        // Block until the onSuspend callback has fired.
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Re-assert the interrupt so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while trying to suspend resource {}", resource);
        }

        AtmosphereUtils.lookupBroadcaster(true).addAtmosphereResource(resource);
    }

    /**
     * Looks up the broadcaster registered under the id {@code "/*"}.
     *
     * @param create passed through to the factory lookup as its create flag
     * @return the result of the factory lookup
     */
    public static Broadcaster lookupBroadcaster(boolean create) {
        return BroadcasterFactory.getDefault().lookup("/*", create);
    }

}


其他服务调用:notificationService.pushMessage(....);
JavaScript部分是:

// Shared handle to the jQuery Atmosphere plugin, used to open subscriptions.
var socket = $.atmosphere;

/**
 * Builds an Atmosphere request for the given URL and subscribes to it.
 * The request uses the websocket transport with long-polling as fallback
 * and a JSON content type.
 *
 * @param url          endpoint assigned to the request
 * @param handleResult not referenced in the visible code (the onMessage body
 *                     is elided in this excerpt)
 */
function handleAtmosphere(url, handleResult) {

    var request = new $.atmosphere.AtmosphereRequest();
    request.transport = "websocket"; // "streaming is even worse";
    request.url = url;
    request.contentType = "application/json";
    request.fallbackTransport = "long-polling"; //for android 4.2, default browser don't support websocket

    // Invoked with the response for each incoming message (body elided).
    request.onMessage = function(response){
        ....
    };


    var subSocket = socket.subscribe(request);

}


话虽这么说,我已经尝试了许多配置,但是仍然有一些线程保持活动状态。
我正在使用Spring 3.2.5,SpringSec 3.2 RC2和Atmosphere 2.0.4

最佳答案

检查一下:https://github.com/Atmosphere/atmosphere/issues/717

关于该讨论帖,有几点需要注意:
使用共享线程池,您必须自己关闭关联的执行程序。那是一个框架限制。但是为什么BroadcastConfig会停止而不是AsyncWrite需要调查



运行中的线程与ExecutorServices相关联。由ExecutorService决定何时杀死这些线程,而不是Atmosphere。由于您正在使用CachedThreadPool,因此这是可以预期的。我已经使用Tomcat 7.0.27,共享线程池和NIO连接器进行了测试。如果使用BIO连接器,则EMPTY_DESTROY将无法工作,因为将不会检测到断开连接。

因此,我建议您使用有限的线程池,以免导致线程运行过多。

看看这是否就是您遇到的问题。

10-06 16:11