相由心生,命由己造

相由心生,命由己造

版本信息:

  • Flink 1.17.1
  • Doris 1.2.3
  • Flink Doris Connector 1.4.0

写入方式

采用 String 数据流,依照社区网站的样例代码,在sink之前将数据转换为DataStream

运行异常

通过Stream Load返回结果json中的ErrorUrl可以看到如题的异常

Reason: actual column number in csv file is less than schema column number. actual number: 10, ..., schema column number: 11; src line: [...]

数据库表明明只有10个字段,提示schema column number却是11个。是自己眼花数错字段了吗?经过反复确认及同事确认,没有错,目标表就是10个字段,我写入的也是10个字段,是Flink Doris Connector 的bug吗?

分析过程

既然怀疑是bug,那就去扒代码。
实际数据写入逻辑封装在org.apache.doris.flink.sink.writer.DorisWriter,该类实现了org.apache.flink.api.connector.sink.SinkWriter接口。查看该类发现,写入Doris的过程实际是使用微批写入的。

    @Override
    public void write(IN in, Context context) throws IOException {
        checkLoadException();
        byte[] serialize = serializer.serialize(in);
        if(Objects.isNull(serialize)){
            //ddl record
            return;
        }
        if(!loading) {
            //Start streamload only when there has data
            dorisStreamLoad.startLoad(currentLabel);
            loading = true;
        }
        dorisStreamLoad.writeRecord(serialize);
    }
    @Override
    public List<DorisCommittable> prepareCommit(boolean flush) throws IOException {
        if(!loading){
            //There is no data during the entire checkpoint period
            return Collections.emptyList();
        }
        // disable exception checker before stop load.
        loading = false;
        Preconditions.checkState(dorisStreamLoad != null);
        RespContent respContent = dorisStreamLoad.stopLoad(currentLabel);
        if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
            String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL());
            throw new DorisRuntimeException(errMsg);
        }
        if (!executionOptions.enabled2PC()) {
            return Collections.emptyList();
        }
        long txnId = respContent.getTxnId();
        return ImmutableList.of(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
    }

每一条记录都会触发write操作,从上述代码可以看到根据boolean变量loading的值,程序将会触发dorisStreamLoad.startLoad(currentLabel);,而loading的状态在preCommit方法中进行修改,而preCommit是在checkpoint时触发,所以数据提交动作是通过checkpoint触发的。查看startLoad源代码


    /**
     * start write data for new checkpoint.
     * @param label
     * @throws IOException
     */
    public void startLoad(String label) throws IOException{
        loadBatchFirstRecord = true;
        HttpPutBuilder putBuilder = new HttpPutBuilder();
        recordStream.startInput();
        LOG.info("stream load started for {} on host {}", label, hostPort);
        try {
            InputStreamEntity entity = new InputStreamEntity(recordStream);
            putBuilder.setUrl(loadUrlStr)
                    .baseAuth(user, passwd)
                    .addCommonHeader()
                    .addHiddenColumns(enableDelete)
                    .setLabel(label)
                    .setEntity(entity)
                    .addProperties(streamLoadProp);
            if (enable2PC) {
               putBuilder.enable2PC();
            }
            pendingLoadFuture = executorService.submit(() -> {
                LOG.info("start execute load");
                return httpClient.execute(putBuilder.build());
            });
        } catch (Exception e) {
            String err = "failed to stream load data with label: " + label;
            LOG.warn(err, e);
            throw e;
        }
    }

DorisStreamLoad类负责将数据实际写入Doris,在上面的代码中我看到了一个陌生的词汇HiddenColumns,“隐藏列”,什么是隐藏列?.addHiddenColumns(enableDelete)的参数enableDelete 是一个boolean值,继续扒代码发现,默认值enableDelete = true;,addHiddenColumn(true)?是否意味着我的put操作数据中必须包含隐藏列?继续扒

    public HttpPutBuilder addHiddenColumns(boolean add) {
        if(add){
            header.put("hidden_columns", LoadConstants.DORIS_DELETE_SIGN);
        }
        return this;
    }

在http请求header中添加了一个配置,似乎是指明了"hidden_columns"="DORIS_DELETE_SIGN",看着好像是一个列名称,使用IDEA的跟踪调用功能,查看下哪里用到了这个变量。
Doris写入数据异常提示actual column number in csv file is less than schema column number-LMLPHP
跟踪这些代码更确信,这是一个列名称。我的10列加上这一列就是11列啊,设置enableDelete = false,是否意味着我的put操作不再包含这一隐含列?

解决方案

修改构造DorisSink的代码添加.setDeletable(false);

        DorisExecutionOptions.Builder  executionBuilder = DorisExecutionOptions.builder();
        executionBuilder.setLabelPrefix(labelPrefix) //streamload label prefix
                .setDeletable(false);

重新运行代码,写入成功,问题解决。

总结

出现该异常是因为,Flink Doris Connector 在构造Sink时默认用户写入数据中包含了隐藏列__DORIS_DELETE_SIGN__
尽管问题解决了,但是还是有很多疑问,什么是隐藏列,__DORIS_DELETE_SIGN__这个隐藏列是什么意思,从前面的代码中可以看出其取值为0或1,导入数据时为什么默认需要传递该列,该列在最前面还是在最后面?不传递该列是否会有问题?

07-14 18:57