无法在Akka Stream中使用GraphStage类运行SourceShape

诺里奥

我正在尝试使用GraphStage构造创建Redis Akka流源。这个想法是,只要我从subscription方法获得更新,就将其推送到下一个组件。同样,如果没有拉动信号,则组件应背压。这是代码:

class SubscriberSourceShape(channel: String, subscriber: Subscriber) 
    extends GraphStage[SourceShape[String]] {

  private val outlet: Outlet[String] = Outlet("SubscriberSource.Out")

  override def shape: SourceShape[String] = SourceShape(outlet)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      val callback = getAsyncCallback((msg: String) => push(outlet, msg)
      val handler = (msg: String) => callback.invoke(msg)

      override def preStart(): Unit = subscriber.subscribe(channel)(handler)
    }
  }
}

但是,当我用一个简单的接收器运行它时,出现此错误:

Error in stage [akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@58344854]: No handler defined in stage [...SubscriberSourceShape@6a9193dd] for out port [RedisSubscriberSource.Out(1414886352). All inlets and outlets must be assigned a handler with setHandler in the constructor of your graph stage logic.

怎么了?

Locorecto

由于未为Source设置任何输出处理程序,因此出现此错误,因此,当下游组件(Flow或Sink)向该Source发送拉动信号时,将没有处理程序处理该拉动信号。

您可以添加OutputHandler来消除该错误。将onPull方法留空,因为您是在asyncCallback上生成元素的。只需将其添加到GraphStageLogic主体中:

      setHandler(outlet, new OutHandler {
        override def onPull(): Unit = {}
      })

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

无法在Akka Stream中使用GraphStage类运行SourceShape

来自分类Dev

无法使用SBT运行简单的Akka示例

来自分类Dev

Akka actor无法使用actorSelection解决

来自分类Dev

在Akka中使用SSL连接-无法进行配置

来自分类Dev

无法使用 Akka 流获得正确的分块响应

来自分类Dev

无法使用 scala 2.12.6 sbt Akka 下载 ZeroMQ 依赖项

来自分类Dev

Akka(2.3.0)无法使用java.lang.ClassNotFoundException加载Slf4jEventHandler类

来自分类Dev

Akka(2.3.0)无法使用java.lang.ClassNotFoundException加载Slf4jEventHandler类

来自分类Dev

无法在可运行类的kotlin / java中使用applicationContext

来自分类Dev

从GraphStage内部关闭Akka流(Akka 2.4.2)

来自分类Dev

无法使用 scala akka-http 将文件保存到 s3

来自分类Dev

无法使用 AKKA-Http 和 Spray-Json 将 JSON Marshal B 定义为 List[A]

来自分类Dev

在Akka中使用logback

来自分类Dev

无法在类中使用$ this赋值

来自分类Dev

在Akka 2.XX的演员类定义中使用@Transactional?

来自分类Dev

Akka OneForOneStrategy无法正常工作

来自分类Dev

在akka流中使用mapFuture

来自分类Dev

在Eclipse(Java)中使用Akka

来自分类Dev

无法在Eclipse中使用FileEditorInput类

来自分类Dev

无法在QML中使用C ++类

来自分类Dev

无法在C ++中使用类实例

来自分类Dev

无法在Python中使用OpenCV GeneralizedHoughTransform类

来自分类Dev

使用可运行的jar运行Akka

来自分类Dev

Akka 远程共享类

来自分类Dev

如何在akka actor中使用akka-http cachedHostConnectionPool?

来自分类Dev

无法在Razor Pages代码隐藏类中使用注入的类

来自分类Dev

无法在Razor Pages代码隐藏类中使用注入的类

来自分类Dev

Ruby,在类外部定义的Socket实例,无法在类中使用

来自分类Dev

如何使用键入的akka解决akka.stream.Graph接收器错误akka流

Related 相关文章

  1. 1

    无法在Akka Stream中使用GraphStage类运行SourceShape

  2. 2

    无法使用SBT运行简单的Akka示例

  3. 3

    Akka actor无法使用actorSelection解决

  4. 4

    在Akka中使用SSL连接-无法进行配置

  5. 5

    无法使用 Akka 流获得正确的分块响应

  6. 6

    无法使用 scala 2.12.6 sbt Akka 下载 ZeroMQ 依赖项

  7. 7

    Akka(2.3.0)无法使用java.lang.ClassNotFoundException加载Slf4jEventHandler类

  8. 8

    Akka(2.3.0)无法使用java.lang.ClassNotFoundException加载Slf4jEventHandler类

  9. 9

    无法在可运行类的kotlin / java中使用applicationContext

  10. 10

    从GraphStage内部关闭Akka流(Akka 2.4.2)

  11. 11

    无法使用 scala akka-http 将文件保存到 s3

  12. 12

    无法使用 AKKA-Http 和 Spray-Json 将 JSON Marshal B 定义为 List[A]

  13. 13

    在Akka中使用logback

  14. 14

    无法在类中使用$ this赋值

  15. 15

    在Akka 2.XX的演员类定义中使用@Transactional?

  16. 16

    Akka OneForOneStrategy无法正常工作

  17. 17

    在akka流中使用mapFuture

  18. 18

    在Eclipse(Java)中使用Akka

  19. 19

    无法在Eclipse中使用FileEditorInput类

  20. 20

    无法在QML中使用C ++类

  21. 21

    无法在C ++中使用类实例

  22. 22

    无法在Python中使用OpenCV GeneralizedHoughTransform类

  23. 23

    使用可运行的jar运行Akka

  24. 24

    Akka 远程共享类

  25. 25

    如何在akka actor中使用akka-http cachedHostConnectionPool?

  26. 26

    无法在Razor Pages代码隐藏类中使用注入的类

  27. 27

    无法在Razor Pages代码隐藏类中使用注入的类

  28. 28

    Ruby,在类外部定义的Socket实例,无法在类中使用

  29. 29

    如何使用键入的akka解决akka.stream.Graph接收器错误akka流

热门标签

归档