在一个流中处理来自不同连接的

在一个流中处理来自不同连接的

本文介绍了Akka-http 在一个流中处理来自不同连接的 HttpRequests的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Akka-http 文档 说:

Akka-http documentation says:

除了将绑定在服务器端的套接字视为Source[IncomingConnection] 和每个连接作为一个Source[HttpRequest] 和 Sink[HttpResponse]

假设我们得到了包含来自许多 Source[IncomingConnection] 的传入连接的合并源.

Assume we get the merged source containing incoming connections from many Source[IncomingConnection].

然后,假设我们从 Source[IncomingConnection] 得到 Source[HttpRequest](见下面的代码).

Then, assume, we get Source[HttpRequest] from Source[IncomingConnection] (see the code below).

那么,没问题,我们可以提供一个流程,把HttpRequest转换成HttpResponse.

Then, no problem, we can provide a flow to convert HttpRequest to HttpResponse.

这里是问题 - 我们如何正确接收响应?我们如何加入对连接的响应?

And here is the problem - how can we properly Sink the responses ? How can we join the responses to connections?

用例背后的整个想法是可以优先处理来自不同连接的传入请求.我想在很多情况下应该很有用...

The whole idea behind the use case is the possibility to prioritize the processing of incoming requests from different connections. Should be useful in many cases I guess...

提前致谢!

基于@RamonJRomeroyVigil 的回答的解决方案:

Solution based on the answer from @RamonJRomeroyVigil:

服务器代码:

val in1 = Http().bind(interface = "localhost", port = 8200)
val in2 = Http().bind(interface = "localhost", port = 8201)

val connSrc = Source.fromGraph(FlowGraph.create() { implicit b =>
  import FlowGraph.Implicits._

  val merge = b.add(Merge[IncomingConnection](2))

  in1 ~> print("in1") ~> merge.in(0)
  in2 ~> print("in2") ~> merge.in(1)

  SourceShape(merge.out)
})

val reqSrc : Source[(HttpRequest, IncomingConnection), _] =
  connSrc.flatMapConcat { conn =>
    Source.empty[HttpResponse]
      .via(conn.flow)
      .map(request => (request, conn))
  }

val flow: Flow[(HttpRequest, IncomingConnection), (HttpResponse, IncomingConnection), _] =
  Flow[(HttpRequest, IncomingConnection)].map{
      case (HttpRequest(HttpMethods.GET, Uri.Path("/ping"), _, entity, _), conn: IncomingConnection) =>
        println(s"${System.currentTimeMillis()}: " +
          s"process request from ${conn.remoteAddress.getHostName}:${conn.remoteAddress.getPort}")
        (HttpResponse(entity = "pong"), conn)
    }

reqSrc.via(flow).to(Sink.foreach { case (resp, conn) =>
  Source.single(resp).via(conn.flow).runWith(Sink.ignore)
}).run()

def print(prefix: String) = Flow[IncomingConnection].map { s =>
  println(s"$prefix [ ${System.currentTimeMillis()} ]: ${s.remoteAddress}"); s
}

所以,我从控制台使用 curl 并看到以下内容:

So, I am using curl from console and see the following:

% curl http://localhost:8200/ping
curl: (52) Empty reply from server

第二个 curl 请求失败:

Second curl request fails:

% curl http://localhost:8200/ping
curl: (7) Failed to connect to localhost port 8200: Connection refused

在服务器控制台上,我在发送第一个请求时看到以下内容:

On the server console I see the following when sending 1st request:

in1 [ 1450287301512 ]: /127.0.0.1:52461
1450287301626: process request from localhost:52461
[INFO] [12/16/2015 20:35:01.641] [default-akka.actor.default-dispatcher-6] [akka://default/system/IO-TCP-STREAM/server-1-localhost%2F127.0.0.1%3A8200] Message [akka.io.Tcp$Unbound$] from Actor[akka://default/system/IO-TCP/selectors/$a/0#119537130] to Actor[akka://default/system/IO-TCP-STREAM/server-1-localhost%2F127.0.0.1%3A8200#-1438663077] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [12/16/2015 20:35:01.641] [default-akka.actor.default-dispatcher-6] [akka://default/system/IO-TCP-STREAM/server-2-localhost%2F127.0.0.1%3A8201] Message [akka.io.Tcp$Unbound$] from Actor[akka://default/system/IO-TCP/selectors/$a/1#679898594] to Actor[akka://default/system/IO-TCP-STREAM/server-2-localhost%2F127.0.0.1%3A8201#1414174163] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

发送第二个请求时什么也没有.

And nothing when sending 2nd request.

因此,内部连接流(如@RamonJRomeroyVigil 所述)或其他问题似乎存在一些问题...

So, it looks like there is some problem with the internal connection flow (as stated @RamonJRomeroyVigil) or with something else...

基本上代码不起作用.

仍在调查问题.

推荐答案

以下解决方案基于问题评论中提供的更多信息.

The below solution is based on further information provided in the question comments.

给定

val connSrc : Source[IncomingConnection,_] = ???

flatMapConcat 方法解决了所述的具体问题:

The flatMapConcat method solves the specific question stated:

val reqSrc : Source[(HttpRequest, IncomingConnection), _] =
  connSrc.flatMapConcat { conn =>
    Source.empty[HttpResponse]
          .via(conn.flow)
          .map(request => (request, conn))
  }

这提供了 (HttpRequest, IncomingConnection) 元组的来源.

This provides a Source of (HttpRequest, IncomingConnection) tuples.

假设您有一个将请求转换为响应的处理步骤

Assuming you have a processing step that converts requests to respones

val flow : Flow[(HttpRequest, IncomingConnection), (HttpResponse, IncomingConnection), _] = ???

您可以将响应发送回客户端:

You can send a response back to the client:

reqSrc.via(flow).to(Sink.foreach { case (resp, conn) =>
  Source.single(resp).via(conn.flow).runWith(Sink.ignore)
})

警告:此解决方案调用 conn.flow 两次:一次创建一个生成请求的流,另一次创建一个发送响应的流.我不知道这种类型的用例是否会破坏 IncomingConnection 逻辑中的某些内容.

Warning: This solution calls conn.flow twice: once to create a flow that generates requests and again to create a flow to send responses to. I do not know if this type of use-case will break something within the IncomingConnection logic.

这篇关于Akka-http 在一个流中处理来自不同连接的 HttpRequests的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-06 10:25