Problem description
I'm going through the following samples using Scala / sbt:
flink/elasticsearch/kibana
flink tutorial
My build.sbt includes the following versions:
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.3.1" % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % "1.3.1" % "provided",
  "org.apache.flink" %% "flink-clients" % "1.3.1" % "provided",
  "joda-time" % "joda-time" % "2.9.9",
  "com.google.guava" % "guava" % "22.0",
  "com.typesafe" % "config" % "1.3.0",
  "org.apache.flink" % "flink-connector-kafka-0.10_2.10" % "1.2.0",
  "org.elasticsearch" % "elasticsearch" % "5.5.1",
  "org.elasticsearch.client" % "transport" % "5.5.1",
  "org.apache.flink" % "flink-connector-elasticsearch5_2.10" % "1.3.1"
)
The program fails with the following exception (excerpt):
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
at org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:52)
at org.aj.flink.stream.TaxiDashboard$MyElasticsearchInserter.process(TaxiDashboard.scala:157)
at org.aj.flink.stream.TaxiDashboard$MyElasticsearchInserter.process(TaxiDashboard.scala:137)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:282)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
Here are my relevant imports:
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
Here is the code for MyElasticsearchInserter:
final class MyElasticsearchInserter(index: String, mappingType: String)
  extends ElasticsearchSinkFunction[(Int, Long, (Float, Float), Short)] {

  def process(element: (Int, Long, (Float, Float), Short),
              ctx: RuntimeContext,
              indexer: RequestIndexer): Unit = {
    // construct JSON document to index
    val json = Map(
      "time" -> element._2.toString,
      "location" -> (element._3._1 + "," + element._3._2),
      "cnt" -> element._4.toString
    )
    val rqst: IndexRequest = Requests.indexRequest
      .index(index)
      .`type`(mappingType)
      .source(json.asJava)
    indexer.add(rqst)
  }
}
Here is the code for adding the sink:
if (writeToElasticsearch) {
  // write to Elasticsearch
  nycRides.addSink(new ElasticsearchSink(esConfig,
    esTransport, new MyElasticsearchInserter("nyc-idx", "popular-locations")))
}
It looks like the question is rather whether flink 1.3.1 can be used with elasticsearch 5.5.1, given that the following method is not found:
Caused by: java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
at org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:52)
at org.aj.flink.stream.TaxiDashboard$MyElasticsearchInserter.process(TaxiDashboard.scala:157)
Many thanks
Recommended answer
After a quick look at the ES code base, I think they removed that method (BulkProcessor#add(ActionRequest)) in 5.2.0.
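The descriptor in the error, (Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;, names a method that takes an ActionRequest and returns a BulkProcessor. One way to confirm whether the Elasticsearch jar on your classpath still has that overload is a small reflection check. This is only a rough sketch: MethodCheck and hasMethod are hypothetical names introduced for illustration, not part of Flink or Elasticsearch.

```scala
import scala.util.Try

// Hypothetical helper (not part of Flink or Elasticsearch): reports whether a
// public method with the given name and parameter types exists on the classpath.
object MethodCheck {

  def hasMethod(className: String, methodName: String, paramClassNames: String*): Boolean = {
    val loader = getClass.getClassLoader
    // initialize = false: load the class without running its static initializers
    def load(name: String): Class[_] = Class.forName(name, false, loader)
    Try {
      val params = paramClassNames.map(load)
      // throws NoSuchMethodException if no public method has exactly this signature
      load(className).getMethod(methodName, params: _*)
    }.isSuccess
  }

  def main(args: Array[String]): Unit = {
    val present = hasMethod(
      "org.elasticsearch.action.bulk.BulkProcessor",
      "add",
      "org.elasticsearch.action.ActionRequest")
    println(s"BulkProcessor.add(ActionRequest) present: $present")
  }
}
```

Run it with the same dependencies as the job. If it reports false, the connector's call in BulkProcessorIndexer cannot be resolved against that client version, which matches the NoSuchMethodError above.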
Flink uses that method internally. Unfortunately, I fear the Flink connector won't work with the ES 5.3+ client right now.
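If that is the cause, one possible workaround is to pin the Elasticsearch client in build.sbt to a release from before the method was removed. The fragment below is only a sketch: the esVersion value is an assumption that does not come from the question, so check that the version you pick still has BulkProcessor#add(ActionRequest) and can talk to your cluster.

```scala
// build.sbt (sketch): keep the Elasticsearch client on a release that still has
// BulkProcessor#add(ActionRequest). The version below is an assumption; verify it.
val esVersion = "5.1.2" // assumed value, not taken from the original post

libraryDependencies ++= Seq(
  "org.elasticsearch" % "elasticsearch" % esVersion,
  "org.elasticsearch.client" % "transport" % esVersion,
  "org.apache.flink" % "flink-connector-elasticsearch5_2.10" % "1.3.1"
)
```

The point of using a single esVersion value is that the elasticsearch and transport artifacts stay on the same release, so the connector and the client it calls are compiled against matching APIs.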