Websphere MQ作为Apache Spark流的数据源

汤姆

我正在研究将Websphere MQ用作火花流的数据源的可能性,因为在我们的一个用例中需要它。我知道MQTT是支持从MQ数据结构进行通信的协议,但是由于我是引发流传输的新手,因此我需要一些适用的示例。是否有人尝试将MQ与Spark Streaming连接起来。请设计最佳方法。

汤姆

所以,我在这里发布CustomMQReceiver的工作代码,该代码连接Websphere MQ并读取数据:

public class CustomMQReciever extends Receiver<String> { String host = null;
int port = -1;
String qm=null;
String qn=null;
String channel=null;
transient Gson gson=new Gson();
transient MQQueueConnection qCon= null;

Enumeration enumeration =null;

public CustomMQReciever(String host , int port, String qm, String channel, String qn) {
    super(StorageLevel.MEMORY_ONLY_2());
    this.host = host;
    this.port = port;
    this.qm=qm;
    this.qn=qn;
    this.channel=channel;

}

public void onStart() {
    // Start the thread that receives data over a connection
    new Thread()  {
        @Override public void run() {
            try {
                initConnection();
                receive();
            }
            catch (JMSException ex)
            {
                ex.printStackTrace();
            }
        }
    }.start();
}
public void onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself isStopped() returns false
}

 /** Create a MQ connection and receive data until receiver is stopped */
private void receive() {
  System.out.print("Started receiving messages from MQ");

    try {

    JMSMessage receivedMessage= null;

        while (!isStopped() && enumeration.hasMoreElements() )
        {

            receivedMessage= (JMSMessage) enumeration.nextElement();
            String userInput = convertStreamToString(receivedMessage);
            //System.out.println("Received data :'" + userInput + "'");
            store(userInput);
        }

        // Restart in an attempt to connect again when server is active again
        //restart("Trying to connect again");

        stop("No More Messages To read !");
        qCon.close();
        System.out.println("Queue Connection is Closed");

    }
    catch(Exception e)
    {
        e.printStackTrace();
        restart("Trying to connect again");
    }
    catch(Throwable t) {
        // restart if there is any other error
        restart("Error receiving data", t);
    }
    }

  public void initConnection() throws JMSException
{
    MQQueueConnectionFactory conFactory= new MQQueueConnectionFactory();
    conFactory.setHostName(host);
    conFactory.setPort(port);
    conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
    conFactory.setQueueManager(qm);
    conFactory.setChannel(channel);


    qCon= (MQQueueConnection) conFactory.createQueueConnection();
    MQQueueSession qSession=(MQQueueSession) qCon.createQueueSession(false, 1);
    MQQueue queue=(MQQueue) qSession.createQueue(qn);
    MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
    qCon.start();

    enumeration= browser.getEnumeration();
   }

 @Override
public StorageLevel storageLevel() {
    return StorageLevel.MEMORY_ONLY_2();
}
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Spark流:自定义接收器:数据源:Websphere Message Queue

来自分类Dev

Jython脚本在Websphere中创建oracle数据源

来自分类Dev

Jython脚本在Websphere中创建oracle数据源

来自分类Dev

WebSphere Liberty概要文件数据源和CDI

来自分类Dev

使用 websphere 8.5.5.0 和 Spring 配置数据源?

来自分类Dev

在 IBM WebSphere 中创建数据源时出现 NullPointerException

来自分类Dev

对于Websphere MQ教程

来自分类Dev

使用SqlBulkCopy时,将流作为二进制列的数据源

来自分类Dev

如何将Apache Ignite用作Apache Spark的外部数据源

来自分类Dev

如何将Apache Ignite用作Apache Spark的外部数据源

来自分类Dev

当使用hbase作为数据源时,spark是否利用hbase键的排序顺序

来自分类Dev

Java的Websphere MQ类与JMS的Websphere MQ类之间的区别

来自分类Dev

Apache Spark日志记录重定向WebSphere日志输出

来自分类Dev

在WebSphere服务器上使用JNDI的Spring自定义数据源连接

来自分类Dev

如何将Websphere服务器配置(数据源,jms等)分配到多个实例?

来自分类Dev

如何在WebSphere Application Server 7上逐步设置Oracle JDBC数据源

来自分类Dev

从外部独立Java客户端访问Websphere Liberty Profile 8.5中定义的数据源

来自分类Dev

有什么方法可以像传统那样在Websphere Liberty中测试数据源?

来自分类Dev

有什么方法可以像传统那样在Websphere Liberty中测试数据源?

来自分类Dev

我应该在哪里找到数据源与Websphere中部署的战争绑定

来自分类Dev

Websphere MQ持久主题超时

来自分类Dev

MQ Websphere-传输文件

来自分类Dev

将Websphere MQ与boomi集成

来自分类Dev

从Websphere MQ删除动态主题

来自分类Dev

Websphere MQ回退队列配置

来自分类Dev

MQ Websphere-传输文件

来自分类Dev

NServiceBus和WebSphere mq配置

来自分类Dev

Websphere MQ-发布/订阅

来自分类Dev

字典作为ComboBox的数据源

Related 相关文章

  1. 1

    Spark流:自定义接收器:数据源:Websphere Message Queue

  2. 2

    Jython脚本在Websphere中创建oracle数据源

  3. 3

    Jython脚本在Websphere中创建oracle数据源

  4. 4

    WebSphere Liberty概要文件数据源和CDI

  5. 5

    使用 websphere 8.5.5.0 和 Spring 配置数据源?

  6. 6

    在 IBM WebSphere 中创建数据源时出现 NullPointerException

  7. 7

    对于Websphere MQ教程

  8. 8

    使用SqlBulkCopy时,将流作为二进制列的数据源

  9. 9

    如何将Apache Ignite用作Apache Spark的外部数据源

  10. 10

    如何将Apache Ignite用作Apache Spark的外部数据源

  11. 11

    当使用hbase作为数据源时,spark是否利用hbase键的排序顺序

  12. 12

    Java的Websphere MQ类与JMS的Websphere MQ类之间的区别

  13. 13

    Apache Spark日志记录重定向WebSphere日志输出

  14. 14

    在WebSphere服务器上使用JNDI的Spring自定义数据源连接

  15. 15

    如何将Websphere服务器配置(数据源,jms等)分配到多个实例?

  16. 16

    如何在WebSphere Application Server 7上逐步设置Oracle JDBC数据源

  17. 17

    从外部独立Java客户端访问Websphere Liberty Profile 8.5中定义的数据源

  18. 18

    有什么方法可以像传统那样在Websphere Liberty中测试数据源?

  19. 19

    有什么方法可以像传统那样在Websphere Liberty中测试数据源?

  20. 20

    我应该在哪里找到数据源与Websphere中部署的战争绑定

  21. 21

    Websphere MQ持久主题超时

  22. 22

    MQ Websphere-传输文件

  23. 23

    将Websphere MQ与boomi集成

  24. 24

    从Websphere MQ删除动态主题

  25. 25

    Websphere MQ回退队列配置

  26. 26

    MQ Websphere-传输文件

  27. 27

    NServiceBus和WebSphere mq配置

  28. 28

    Websphere MQ-发布/订阅

  29. 29

    字典作为ComboBox的数据源

热门标签

归档