Netty IllegalReferenceCountException

Question

This is a follow-up to my answer here. While my business logic has no problem, it turned out that I wasn't using Netty ByteBuf. Once I updated my test code to use ByteBuf, I ran into an endless loop of IllegalReferenceCountException. I admit to being new to Netty, but that doesn't justify going back to the days of manual resource allocation and release. GC was created to avoid this mess. Disco, anyone? How about bell-bottoms then?

public class StringDecoder extends AbstractDecoder<String> {
    private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r';

    @Override
    public Flux<String> decode(Publisher<DataBuffer> publisher, ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) {
        return Flux.from(publisher)
            .scan(Tuples.<Flux<DataBuffer>, Optional<DataBuffer>>of(Flux.empty(), Optional.empty()),
                    (acc, buffer) -> {
                        List<DataBuffer> results = new ArrayList<>();
                        int startIdx = 0, endIdx = 0, limit = buffer.readableByteCount();
                        Optional<DataBuffer> incomplete = acc.getT2();

                        while (startIdx < limit && endIdx != -1) {
                            endIdx = buffer.indexOf(NEWLINE_DELIMITER, startIdx);
                            int length = (endIdx == -1 ? limit : endIdx) - startIdx;
                            DataBuffer slice = buffer.slice(startIdx, length);
                            DataBuffer tmp = incomplete.map(b -> b.write(slice))
                                    .orElse(buffer.factory().allocateBuffer(length).write(slice));
                            tmp = DataBufferUtils.retain(tmp);

                            if (endIdx != -1) {
                                startIdx = endIdx + 1;
                                results.add(tmp);
                                incomplete = Optional.empty();
                            } else {
                                incomplete = Optional.of(tmp);
                            }
                        }
                        releaseBuffer(buffer);

                        return Tuples.of(Flux.fromIterable(results), incomplete);
                    })
            .flatMap(t -> {
                t.getT2().ifPresent(this::releaseBuffer);
                return t.getT1();
            })
            .map(buffer -> {
                // charset resolution should in general use supplied mimeType
                String s = UTF_8.decode(buffer.asByteBuffer()).toString();
                releaseBuffer(buffer);

                return s;
            })
            .log();
    }

    private void releaseBuffer(DataBuffer buffer) {
        boolean release = DataBufferUtils.release(buffer);
        if (release) {
            System.out.println("Buffer was released.");
        }
    }
}

public class StringDecoderTest {
    private StringDecoder stringDecoder = new StringDecoder();
    DataBufferFactory dataBufferFactory = new NettyDataBufferFactory(UnpooledByteBufAllocator.DEFAULT);

    @Test
    public void testDecode() {
        Flux<DataBuffer> pub = Flux.just("abc\n", "abc", "def\n", "abc", "def\nxyz\n", "abc", "def", "xyz\n")
                .map(s -> dataBufferFactory.wrap(s.getBytes(UTF_8)));

        StepVerifier.create(stringDecoder.decode(pub, null, null, null))
                .expectNext("abc", "abcdef", "abcdef", "xyz", "abcdefxyz")
                .verifyComplete();
    }
}

I keep getting:

[ERROR] (main) onError(io.netty.util.IllegalReferenceCountException: refCnt: 0)
[ERROR] (main)  - io.netty.util.IllegalReferenceCountException: refCnt: 0
io.netty.util.IllegalReferenceCountException: refCnt: 0
    at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1415)
    at io.netty.buffer.UnpooledHeapByteBuf.nioBuffer(UnpooledHeapByteBuf.java:314)
    at io.netty.buffer.AbstractUnpooledSlicedByteBuf.nioBuffer(AbstractUnpooledSlicedByteBuf.java:434)
    at io.netty.buffer.CompositeByteBuf.nioBuffers(CompositeByteBuf.java:1496)
    at io.netty.buffer.CompositeByteBuf.nioBuffer(CompositeByteBuf.java:1468)
    at io.netty.buffer.AbstractByteBuf.nioBuffer(AbstractByteBuf.java:1205)
    at org.springframework.core.io.buffer.NettyDataBuffer.asByteBuffer(NettyDataBuffer.java:234)
    at org.abhijitsarkar.java.StringDecoder.lambda$decode$4(StringDecoder.java:61)


Recommended answer

Working code:

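// Note: retain(...) and release(...) below are presumably static imports from
// org.springframework.core.io.buffer.DataBufferUtils, and UTF_8 from
// java.nio.charset.StandardCharsets; the original post omits its import section.
public class StringDecoder extends AbstractDecoder<String> {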
    private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r';

    @Override
    public Flux<String> decode(Publisher<DataBuffer> publisher, ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) {
        DataBuffer incomplete = new NettyDataBufferFactory(UnpooledByteBufAllocator.DEFAULT).allocateBuffer(0);

        return Flux.from(publisher)
                .scan(Tuples.<Flux<DataBuffer>, DataBuffer>of(Flux.empty(), retain(incomplete)),
                      (acc, buffer) -> {
                          List<DataBuffer> results = new ArrayList<>();
                          int startIdx = 0, endIdx = 0, limit = buffer.readableByteCount();

                          while (startIdx < limit && endIdx != -1) {
                              endIdx = buffer.indexOf(NEWLINE_DELIMITER, startIdx);
                              int length = (endIdx == -1 ? limit : endIdx) - startIdx;

                              DataBuffer slice = buffer.slice(startIdx, length);
                              byte[] slice1 = new byte[length];
                              slice.read(slice1, 0, slice1.length);

                              if (endIdx != -1) {
                                  byte[] slice2 = new byte[incomplete.readableByteCount()];
                                  incomplete.read(slice2, 0, slice2.length);
                                  // call retain to match release during decoding to string later
                                  results.add(retain(
                                          incomplete.factory().allocateBuffer()
                                                  .write(slice2)
                                                  .write(slice1)
                                  ));
                                  startIdx = endIdx + 1;
                              } else {
                                  incomplete.write(slice1);
                              }
                          }

                          return Tuples.of(Flux.fromIterable(results), incomplete);
                      })
                .flatMap(Tuple2::getT1)
                .map(buffer -> {
                    // charset resolution should in general use supplied mimeType
                    String s = UTF_8.decode(buffer.asByteBuffer()).toString();

                    return s;
                })
                .doOnTerminate(() -> release(incomplete))
                .log();
    }
}

The code could be a little cleaner, but for Spring bug SPR-16351.
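
One possible reading of the stack trace in the question, offered as a sketch rather than a diagnosis: the failing read goes through a sliced buffer inside a CompositeByteBuf and ends at an UnpooledHeapByteBuf whose refCnt is already 0. In Netty, a buffer returned by slice() shares the reference count of its parent, so once the parent is released the slice can no longer be read. The working code above avoids that by copying each slice into a byte[] instead of writing the slice itself. The toy model below uses only the JDK to illustrate the difference. CountedBuffer, SliceView and RefCountDemo are hypothetical names, not Netty or Spring classes, and the model throws IllegalStateException where Netty would throw IllegalReferenceCountException.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class RefCountDemo {

    /** Toy stand-in for a reference-counted buffer (assumption: not Netty's implementation). */
    static class CountedBuffer {
        private final byte[] data;
        private int refCnt = 1; // a freshly created buffer starts with one reference

        CountedBuffer(byte[] data) {
            this.data = data;
        }

        boolean release() {
            ensureAccessible();
            return --refCnt == 0; // true once the last reference is gone
        }

        void ensureAccessible() {
            if (refCnt == 0) {
                throw new IllegalStateException("refCnt: 0");
            }
        }

        /** Returns a view over the same bytes; the view has no count of its own. */
        SliceView slice(int index, int length) {
            return new SliceView(this, index, length);
        }
    }

    /** A view that delegates every access check to its parent buffer. */
    static class SliceView {
        private final CountedBuffer parent;
        private final int index;
        private final int length;

        SliceView(CountedBuffer parent, int index, int length) {
            this.parent = parent;
            this.index = index;
            this.length = length;
        }

        String readString() {
            parent.ensureAccessible(); // fails if the parent was already released
            return new String(parent.data, index, length, StandardCharsets.UTF_8);
        }

        byte[] copy() {
            parent.ensureAccessible();
            return Arrays.copyOfRange(parent.data, index, index + length);
        }
    }

    /** Keeps only the view, releases the parent, then tries to read the view. */
    public static String readSliceAfterRelease() {
        CountedBuffer buffer = new CountedBuffer("abc\n".getBytes(StandardCharsets.UTF_8));
        SliceView slice = buffer.slice(0, 3);
        buffer.release();
        try {
            return slice.readString();
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    /** Copies the bytes out first, so releasing the parent no longer matters. */
    public static String readCopyAfterRelease() {
        CountedBuffer buffer = new CountedBuffer("abc\n".getBytes(StandardCharsets.UTF_8));
        byte[] copy = buffer.slice(0, 3).copy();
        buffer.release();
        return new String(copy, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(readSliceAfterRelease()); // prints "refCnt: 0"
        System.out.println(readCopyAfterRelease());  // prints "abc"
    }
}
```

Copying costs an extra allocation per line, which is the trade-off the working code accepts. With real Netty buffers the alternative would be to retain the parent for as long as any slice of it is still in use.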
