How to get the response ByteBuf in the same handler where the request is written to the outboundChannel of a Netty proxy server

Problem Description

I am implementing a netty proxy server as follows: an HTTP request comes in,

  • if the data is in the local cache, write it to the channel and flush
  • if not, fetch the data from the remote server, add it to the cache, and flush

I am having difficulty extracting the byteBuf from the response in the same handler where I write to the client.

In the example below, if you look at the channelRead method of HexDumpProxyFrontendHandler, you will see how I fetch from the cache and write. I have added comments in that method where I am facing difficulty.

This code works end to end, so it can be copied and tested locally.

I can see the FullHttpResponse object in HexDumpProxyBackendHandler#channelRead, but inside that method I have no reference to the cache, nor the id I want to add to the cache.

There are two ways I think this can be solved, though I am not clear on how.

1) Get the cache reference and the id in HexDumpProxyBackendHandler; then it becomes easy. But HexDumpProxyBackendHandler is instantiated in channelActive of HexDumpProxyFrontendHandler, at which point I have not yet parsed the incoming request.

2) Get the response ByteBuf extracted in HexDumpProxyFrontendHandler#channelRead, in which case it is just a cache insertion.

HexDumpProxy.java

public final class HexDumpProxy {

static final int LOCAL_PORT = Integer.parseInt(System.getProperty("localPort", "8082"));
static final String REMOTE_HOST = System.getProperty("remoteHost", "api.icndb.com");
static final int REMOTE_PORT = Integer.parseInt(System.getProperty("remotePort", "80"));
static Map<Long,String> localCache = new HashMap<>();
public static void main(String[] args) throws Exception {
    System.err.println("Proxying *:" + LOCAL_PORT + " to " + REMOTE_HOST + ':' + REMOTE_PORT + " ...");
    localCache.put(123L, "profile1");
    localCache.put(234L, "profile2");
    // Configure the bootstrap.
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new HexDumpProxyInitializer(localCache, REMOTE_HOST, REMOTE_PORT))
         .childOption(ChannelOption.AUTO_READ, false)
         .bind(LOCAL_PORT).sync().channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

}

HexDumpProxyInitializer.java

public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> {

private final String remoteHost;
private final int remotePort;
private Map<Long, String> cache;

public HexDumpProxyInitializer(Map<Long,String> cache, String remoteHost, int remotePort) {
    this.remoteHost = remoteHost;
    this.remotePort = remotePort;
    this.cache=cache;
}

@Override
public void initChannel(SocketChannel ch) {
    ch.pipeline().addLast(
            new LoggingHandler(LogLevel.INFO),
            new HttpServerCodec(),
            new HttpObjectAggregator(8*1024, true),
            new HexDumpProxyFrontendHandler(cache, remoteHost, remotePort));
}

}

HexDumpProxyFrontendHandler.java

 public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {
private final String remoteHost;
private final int remotePort;
private Channel outboundChannel;
private Map<Long, String> cache;

public HexDumpProxyFrontendHandler(Map<Long, String> cache, String remoteHost, int remotePort) {
    this.remoteHost = remoteHost;
    this.remotePort = remotePort;
    this.cache = cache;
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
    final Channel inboundChannel = ctx.channel();

    // Start the connection attempt.
    Bootstrap b = new Bootstrap();
    b.group(inboundChannel.eventLoop())
     .channel(ctx.channel().getClass())
     .handler((new ChannelInitializer() {
         protected void initChannel(Channel ch) {
             ChannelPipeline var2 = ch.pipeline();
             var2.addLast((new HttpClientCodec()));
             var2.addLast(new HttpObjectAggregator(8192, true));
             var2.addLast(new HexDumpProxyBackendHandler(inboundChannel));
         }
     }))
     .option(ChannelOption.AUTO_READ, false);
    ChannelFuture f = b.connect(remoteHost, remotePort);
    outboundChannel = f.channel();
    f.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                // connection complete start to read first data
                inboundChannel.read();
            } else {
                // Close the connection if the connection attempt has failed.
                inboundChannel.close();
            }
        }
    });
}

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof HttpRequest) {
        System.out.println("msg is instanceof httpRequest");
        HttpRequest req = (HttpRequest)msg;
        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
        String userId = queryStringDecoder.parameters().get("id").get(0);
        Long id = Long.valueOf(userId);
        if (cache.containsKey(id)){
            StringBuilder buf = new StringBuilder();
            buf.append(cache.get(id));
            writeResponse(req, ctx, buf);
            closeOnFlush(ctx.channel());
            return;
        }
    }
    if (outboundChannel.isActive()) {
        outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // was able to flush out data, start to read the next chunk
                    ctx.channel().read();
                } else {
                    future.channel().close();
                }
            }
        });
    }

    //get response back from HexDumpProxyBackendHander and write to cache
    //basically I need to do cache.put(id, parse(response));
    //how to get response buf from inboundChannel here is the question I am trying to solve
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
    if (outboundChannel != null) {
        closeOnFlush(outboundChannel);
    }

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    closeOnFlush(ctx.channel());
}

/**
 * Closes the specified channel after all queued write requests are flushed.
 */
static void closeOnFlush(Channel ch) {
    if (ch.isActive()) {
        ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }
}

//borrowed from HttpSnoopServerHandler.java in snoop example
private boolean writeResponse(HttpRequest request, ChannelHandlerContext ctx, StringBuilder buf) {
    // Decide whether to close the connection or not.
    boolean keepAlive = HttpUtil.isKeepAlive(request);
    // Build the response object.
    FullHttpResponse response = new DefaultFullHttpResponse(
            HTTP_1_1, request.decoderResult().isSuccess()? OK : BAD_REQUEST,
            Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8));

    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");

    if (keepAlive) {
        // Add 'Content-Length' header only for a keep-alive connection.
        response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        // Add keep alive header as per:
        // - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    }

    // Encode the cookie.
    String cookieString = request.headers().get(HttpHeaderNames.COOKIE);
    if (cookieString != null) {
        Set<Cookie> cookies = ServerCookieDecoder.STRICT.decode(cookieString);
        if (!cookies.isEmpty()) {
            // Reset the cookies if necessary.
            for (io.netty.handler.codec.http.cookie.Cookie cookie: cookies) {
                response.headers().add(HttpHeaderNames.SET_COOKIE, io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT.encode(cookie));
            }
        }
    } else {
        // Browser sent no cookie.  Add some.
        response.headers().add(HttpHeaderNames.SET_COOKIE, io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT.encode("key1", "value1"));
        response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode("key2", "value2"));
    }

    // Write the response.
    ctx.write(response);

    return keepAlive;
}

}

HexDumpProxyBackendHandler.java

public class HexDumpProxyBackendHandler extends ChannelInboundHandlerAdapter {

private final Channel inboundChannel;

public HexDumpProxyBackendHandler(Channel inboundChannel) {
    this.inboundChannel = inboundChannel;
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ctx.read();
}

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof FullHttpResponse) {
        System.out.println("this is fullHttpResponse");
    }
    inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                ctx.channel().read();
            } else {
                future.channel().close();
            }
        }
    });
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
    HexDumpProxyFrontendHandler.closeOnFlush(inboundChannel);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    HexDumpProxyFrontendHandler.closeOnFlush(ctx.channel());
}

}

P.S.: I have taken most of the code from the netty-example project and customized it.

EDIT

Per Ferrygig's suggestion, I changed HexDumpProxyFrontendHandler#channelRead as follows. I have removed channelActive and implemented the write method.

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {

if (msg instanceof HttpRequest) {
    System.out.println("msg is instanceof httpRequest");
    HttpRequest req = (HttpRequest)msg;
    QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
    String userId = queryStringDecoder.parameters().get("id").get(0);
    id = Long.valueOf(userId);
    if (cache.containsKey(id)){
        StringBuilder buf = new StringBuilder();
        buf.append(cache.get(id));
        writeResponse(req, ctx, buf);
        closeOnFlush(ctx.channel());
        return;
    }

    final Channel inboundChannel = ctx.channel();

    //copied from channelActive method

    // Start the connection attempt.
    Bootstrap b = new Bootstrap();
    b.group(inboundChannel.eventLoop())
            .channel(ctx.channel().getClass())
            .handler((new ChannelInitializer() {
                protected void initChannel(Channel ch) {
                    ChannelPipeline var2 = ch.pipeline();
                    var2.addLast((new HttpClientCodec()));
                    var2.addLast(new HttpObjectAggregator(8192, true));
                    var2.addLast(new HexDumpProxyBackendHandler(inboundChannel, cache));
                }
            }));
            //.option(ChannelOption.AUTO_READ, false);
    ChannelFuture f = b.connect(remoteHost, remotePort);
    outboundChannel = f.channel();
    f.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                // connection complete start to read first data
                inboundChannel.read();
            } else {
                // Close the connection if the connection attempt has failed.
                inboundChannel.close();
            }
        }
    });
}
if (outboundChannel.isActive()) {
    outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                // was able to flush out data, start to read the next chunk
                ctx.channel().read();
            } else {
                future.channel().close();
            }
        }
    });
}
}

Recommended Answer

There are multiple ways to approach this problem, and the way to go differs depending on your ultimate end goal.

At the moment you are using a topology of one connection inbound to one connection outbound. This makes the system design slightly easier, as you don't have to worry about syncing multiple requests to the same outbound stream.

At the moment your frontend handler extends ChannelInboundHandlerAdapter, which only intercepts "packets" coming into your application. If we make it extend ChannelDuplexHandler, we can also handle "packets" going out of the application.

To go down this path, we need to update the HexDumpProxyFrontendHandler class to extend ChannelDuplexHandler (let's call it CDH for now).

The next step in the process is to override the write method of the CDH, so we can intercept when the backend sends the response back to us.

After we have created the write method, we need to update our (non-thread-safe) map by calling its put method.

public class HexDumpProxyFrontendHandler extends ChannelDuplexHandler {
    Long lastId;
    // ...
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpRequest) {
            System.out.println("msg is instanceof httpRequest");
            HttpRequest req = (HttpRequest)msg;
            QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
            String userId = queryStringDecoder.parameters().get("id").get(0);
            Long id = Long.valueOf(userId);
            lastId = id; // Store ID of last request
            // ...
        }
        // ...
    }
    // ...
    public void write(
        ChannelHandlerContext ctx,
        java.lang.Object msg,
        ChannelPromise promise
    ) throws java.lang.Exception {

        if (msg instanceof FullHttpResponse) {
            System.out.println("this is fullHttpResponse");
            FullHttpResponse full = (FullHttpResponse)msg;
            cache.put(lastId, parse(full)); // TODO: Include a system here to convert the request to a string
        }
        super.write(ctx, msg, promise);
    }
    // ...
}

We are not done yet: while we have the code in place, we still need to fix a few bugs elsewhere in the code.

Non-thread-safe map (severe bug)

One of those bugs is that you are using a plain HashMap to handle your cache. The problem is that this is not thread-safe: if multiple clients connect to your app at the same time, weird things may happen, including full corruption of the map as its internal structure is updated.

To counter this issue, we "upgrade" the map to a ConcurrentHashMap. This map has special structures in place to deal with multiple threads requesting and storing data at the same time, without a huge loss in performance. (If performance is a main concern, you might get higher performance by using a per-thread hash map instead of a global cache, but this means every resource can be cached up to as many times as there are threads.)
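As a minimal sketch of that upgrade (the class name here is illustrative), swapping the cache declaration is a one-line change, and putIfAbsent closes the check-then-put race that containsKey followed by put would leave open:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CacheExample {
    // Drop-in replacement for the plain HashMap used as localCache.
    // ConcurrentHashMap tolerates concurrent put/get from multiple
    // event-loop threads without corrupting its internal structure.
    static final Map<Long, String> localCache = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        localCache.put(123L, "profile1");
        localCache.put(234L, "profile2");
        // putIfAbsent is atomic: it will not overwrite a value another
        // thread inserted between a containsKey check and a put.
        localCache.putIfAbsent(123L, "other");
        System.out.println(localCache.get(123L)); // still "profile1"
    }
}
```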

No cache removal rules (major bug)

At the moment there is no code in place to remove outdated resources. This means the cache will keep filling up until the program runs out of memory, and it will then crash spectacularly.

This can be solved either by using a map implementation that provides both thread-safe access and so-called removal rules, or by using a ready-made caching solution such as Guava caches.
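A Guava cache with maximumSize and expireAfterWrite is the usual ready-made route; as a dependency-free sketch of the same idea (not the answer's exact suggestion), a size-bounded LRU map can be built from the JDK's LinkedHashMap:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class BoundedCache {
    // Builds a synchronized LRU map capped at maxEntries: once the cap
    // is exceeded, the least recently accessed entry is evicted.
    static <K, V> Map<K, V> newLruCache(final int maxEntries) {
        // accessOrder=true makes iteration order reflect recency of access
        return Collections.synchronizedMap(new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries;
            }
        });
    }

    public static void main(String[] args) {
        Map<Long, String> cache = newLruCache(2);
        cache.put(1L, "profile1");
        cache.put(2L, "profile2");
        cache.put(3L, "profile3"); // evicts key 1, the least recently used
        System.out.println(cache.containsKey(1L)); // false
    }
}
```

Note that synchronizedMap only gives coarse locking; a real deployment would still prefer Guava's cache or Caffeine for concurrency plus time-based expiry.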

HTTP pipelining is not handled correctly (major bug)

One of the lesser-known features of HTTP is pipelining: the client can send another request to the server without waiting for the response to the previous request. Bugs of this type include servers that swap the contents of the two requests around, or even mangle them completely.

While pipelined requests are rare these days, with more and more HTTP/2 support and the knowledge that there are broken servers out there, it still happens with certain CLI tools that use it.

To solve this issue, ONLY read a request AFTER you have sent the previous response. One way to do this is to keep a list of pending requests, or to go for a more advanced pre-made solution.
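The "list of pending requests" idea can be sketched with a plain FIFO queue (a hypothetical helper, not code from the answer): HTTP/1.1 requires responses to come back in request order, so enqueueing each forwarded request's id and dequeueing on each response pairs every response with the cache key it should fill:

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class PipelineTracker {
    // HTTP/1.1 responses arrive in the same order as their requests,
    // so a FIFO queue of request ids matches each response to its request.
    private final Queue<Long> inFlight = new ArrayDeque<>();

    // Call from channelRead when a request is forwarded upstream.
    public void requestForwarded(long id) {
        inFlight.add(id);
    }

    // Call from write when a response comes back; returns the id of
    // the request this response answers (i.e. the cache key to fill).
    public long responseReceived() {
        Long id = inFlight.poll();
        if (id == null) {
            throw new IllegalStateException("response without a pending request");
        }
        return id;
    }

    public static void main(String[] args) {
        PipelineTracker t = new PipelineTracker();
        t.requestForwarded(123L);
        t.requestForwarded(234L); // pipelined second request
        System.out.println(t.responseReceived()); // 123
        System.out.println(t.responseReceived()); // 234
    }
}
```

Since all calls happen on the one channel's event loop, no extra locking is needed; a plain ArrayDeque suffices.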
