我正在尝试编写一个调用HTTP REST API的actor。其余的API需要一个查询参数,该参数将从调用Actor传递。 official documentation的示例使用preStart方法通过管道将消息传递到自身来实现上述目的:

import akka.actor.{ Actor, ActorLogging }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.util.ByteString

class Myself extends Actor
  with ActorLogging {

  import akka.pattern.pipe
  import context.dispatcher

  final implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(context.system))

  val http = Http(context.system)

  override def preStart() = {
    http.singleRequest(HttpRequest(uri = "http://akka.io"))
      .pipeTo(self)
  }

  def receive = {
    case HttpResponse(StatusCodes.OK, headers, entity, _) =>
      entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
        log.info("Got response, body: " + body.utf8String)
      }
    case resp @ HttpResponse(code, _, _, _) =>
      log.info("Request failed, response code: " + code)
      resp.discardEntityBytes()
  }

}

上面的方法有效,但是URL是硬编码的。我想要实现的是一个REST客户端参与者,我可以将参数作为消息发送给该对象并获取调用结果。我修改了上面的代码以将参数作为消息接收(伪代码):
      def receive = {

case param: RESTAPIParameter => {
            http.singleRequest(HttpRequest(URI("http://my-rest-url").withQuery("name", "value"))
                .pipeTo(self)
          }

        case HttpResponse(StatusCodes.OK, headers, entity, _) =>
          entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
            log.info("Got response, body: " + body.utf8String)
            sender! body.utf8String    //Will not work
          }
        case resp @ HttpResponse(code, _, _, _) =>
          log.info("Request failed, response code: " + code)
          resp.discardEntityBytes()
      }

上面的方法应该可以工作,但是不能真正用于将响应发送回客户端,因为当REST调用的结果通过管道返回到sender时,self参考丢失了。

我想我可以尝试将发件人本地存储在一个变量中,然后使用它将响应传递回去,但是我认为这不是一个好主意。

那么,处理这种情况的正确方法是什么?

编辑:@ PH88下面建议的解决方案有效,但我想在外循环中保持HttpResponse上的模式匹配。

编辑2 :之所以要将响应通过管道传递回self是因为我想实现一种状态机。状态根据参与者收到的消息类型而改变。举个例子:
  • 第一种状态可能是从调用方收到查询字符串。 actor调用REST api和becomes awaitingResult。数据通过管道传输到自身以进行进一步处理。
  • 收到带有成功代码的HTTPResponse时,状态为becomes dataRecevied。数据再次通过管道传输到self进行更多处理。
  • 然后将接收到的数据转换为内部供应商中性格式,并将结果最终发送回给调用方。
  • 如果上述1中的响应代码不成功,则可以将状态更改为HttpError并进行相应的处理。

  • 希望可以澄清意图。任何其他建议/设计,以实现简洁/简单的设计,欢迎:-)

    最佳答案

    您可以将HttpResponse与您自己的case类包装在一起,并将发送方与其 bundle 在一起:

    case class ServerResponse(requester: ActorRef, resp: HttpResponse)
    

    然后:
      def receive = {
    
        case param: RESTAPIParameter => {
            val requester = sender
            http.singleRequest(HttpRequest(URI("http://my-rest-url").withQuery("name", "value"))
                .map(httpResp =>
                   // This will execute in some other thread, thus
                   // it's important to NOT use sender directly
                   ServerResponse(requester, httpResp)
                )
                .pipeTo(self)
          }
    
        case ServerResponse(requester, HttpResponse(...)) =>
          val result = ...
          requester ! result
    
        ...
      }
    

    关于scala - 如何在Actor中包装REST API客户端,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/43739767/

    10-16 17:08