当使用hbase作为数据源时,spark是否利用hbase键的排序顺序

怀疑

我将time-series资料储存在中HBase行键由user_id组成timestamp,如下所示:

{
    "userid1-1428364800" : {
        "columnFamily1" : {
            "val" : "1"
            }
        }
    }
    "userid1-1428364803" : {
        "columnFamily1" : {
            "val" : "2"
            }
        }
    }

    "userid2-1428364812" : {
        "columnFamily1" : {
            "val" : "abc"
            }
        }
    }

}

现在,我需要执行每个用户的分析。这是hbase_rdd(从这里开始的初始化

sc = SparkContext(appName="HBaseInputFormat")

conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

hbase_rdd = sc.newAPIHadoopRDD(
        "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "org.apache.hadoop.hbase.client.Result",
        keyConverter=keyConv,
        valueConverter=valueConv,
        conf=conf)

类似于mapreduce的自然处理方式为:

hbase_rdd
   .map(lambda row: (row[0].split('-')[0], (row[0].split('-')[1], row[1])))  # shift timestamp from key to value
   .groupByKey()
   .map(processUserData)  # process user's data

在执行第一个映射(将时间戳从键更改为值)时,至关重要的是要知道当前用户的时间序列数据何时完成,因此可以启动groupByKey转换。因此,我们不需要映射所有表并存储所有临时数据。这是可能的,因为hbase以排序的顺序存储行键。

使用hadoop流,可以通过以下方式完成:

import sys

current_user_data = []
last_userid = None
for line in sys.stdin:
    k, v = line.split('\t')
    userid, timestamp = k.split('-')
    if userid != last_userid and current_user_data:
        print processUserData(last_userid, current_user_data)
        last_userid = userid
        current_user_data = [(timestamp, v)]
    else:
        current_user_data.append((timestamp, v))

问题是:如何在Spark中利用hbase键的排序顺序?

伊姆兰·拉希德(Imran Rashid)

我对从HBase提取数据的方式所获得的保证并不十分熟悉,但是如果我理解正确的话,我只能用普通的Spark来回答。

你有一些RDD[X]据Spark所知,其中的XsRDD是完全无序的。但是您具有一些外部知识,可以保证数据实际上是按某个字段分组的X(甚至可能按另一个字段排序的)。

在这种情况下,您mapPartitions实际上可以使用与hadoop流相同的功能。这样一来,您就可以遍历一个分区中的所有记录,因此可以查找具有相同键的记录块。

val myRDD: RDD[X] = ...
val groupedData: RDD[Seq[X]] = myRdd.mapPartitions { itr =>
  var currentUserData = new scala.collection.mutable.ArrayBuffer[X]()
  var currentUser: X = null
  //itr is an iterator over *all* the records in one partition
  itr.flatMap { x => 
    if (currentUser != null && x.userId == currentUser.userId) {
      // same user as before -- add the data to our list
      currentUserData += x
      None
    } else {
      // its a new user -- return all the data for the old user, and make
      // another buffer for the new user
      val userDataGrouped = currentUserData
      currentUserData = new scala.collection.mutable.ArrayBuffer[X]()
      currentUserData += x
      currentUser = x
      Some(userDataGrouped)
    }
  }
}
// now groupedRDD has all the data for one user grouped together, and we didn't
// need to do an expensive shuffle.  Also, the above transformation is lazy, so
// we don't necessarily even store all that data in memory -- we could still
// do more filtering on the fly, eg:
val usersWithLotsOfData = groupedRDD.filter{ userData => userData.size > 10 }

我意识到您想使用python-对不起,我想如果我使用Scala编写示例,我更有可能得到正确的示例。而且我认为类型注释使含义更清晰,但它可能是Scala偏见... :)。无论如何,希望您能理解正在发生的事情并进行翻译。(不要过分担心flatMapSomeNone,也许不重要,如果你理解的想法...)

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Hbase使用整数作为行键和字典顺序

来自分类Dev

使用静态类作为数据源

来自分类Dev

使用Spark从HBase读取特定的列数据

来自分类Dev

Websphere MQ作为Apache Spark流的数据源

来自分类Dev

使用SqlBulkCopy时,将流作为二进制列的数据源

来自分类Dev

使用servlet作为数据源时,iggrid的空值失败

来自分类Dev

Angular - 使用 FormControlArray 作为数据源时向 mat-table 添加新行

来自分类Dev

使用熊猫从在线数据源读取时是否可以指定数据频率?

来自分类Dev

Kendo UI 使用外键从数据源获取数据

来自分类Dev

Spring&JDBCTemplate:使用基础数据源时资源是否会自动关闭?

来自分类Dev

Spring&JDBCTemplate:使用基础数据源时资源是否会自动关闭?

来自分类Dev

调用 cellForItem(at:) 时,UICollectionView 是否在后台使用其数据源?

来自分类Dev

无法使用对象数据源进行排序

来自分类Dev

使用字典作为组合框数据源

来自分类Dev

直接使用领域对象(即作为CollectionView数据源)

来自分类Dev

使用json作为Assemble.io的数据源

来自分类Dev

如何使用DataSet作为数据源过滤DataGridView

来自分类Dev

使用存储桶作为droplink / tree字段的数据源

来自分类Dev

使用Observable作为数据源的角度拖放

来自分类Dev

使用在线.txt文件作为数据源

来自分类Dev

使用flask变量作为高图的数据源

来自分类Dev

使用json作为Assemble.io的数据源

来自分类Dev

使用Node.js作为图表的数据源

来自分类Dev

Emberjs:使用Websocket后端作为数据源

来自分类Dev

使用HTML类作为Highcharts表数据源

来自分类Dev

直接使用领域对象(即作为CollectionView数据源)

来自分类Dev

使用sframe作为数据源绘制箱线图

来自分类Dev

使用GeoDataFrame作为osgeo.ogr数据源

来自分类Dev

使用 unbouded 数据源的 fixedWindow 作为 parDo 的侧输入?

Related 相关文章

  1. 1

    Hbase使用整数作为行键和字典顺序

  2. 2

    使用静态类作为数据源

  3. 3

    使用Spark从HBase读取特定的列数据

  4. 4

    Websphere MQ作为Apache Spark流的数据源

  5. 5

    使用SqlBulkCopy时,将流作为二进制列的数据源

  6. 6

    使用servlet作为数据源时,iggrid的空值失败

  7. 7

    Angular - 使用 FormControlArray 作为数据源时向 mat-table 添加新行

  8. 8

    使用熊猫从在线数据源读取时是否可以指定数据频率?

  9. 9

    Kendo UI 使用外键从数据源获取数据

  10. 10

    Spring&JDBCTemplate:使用基础数据源时资源是否会自动关闭?

  11. 11

    Spring&JDBCTemplate:使用基础数据源时资源是否会自动关闭?

  12. 12

    调用 cellForItem(at:) 时,UICollectionView 是否在后台使用其数据源?

  13. 13

    无法使用对象数据源进行排序

  14. 14

    使用字典作为组合框数据源

  15. 15

    直接使用领域对象(即作为CollectionView数据源)

  16. 16

    使用json作为Assemble.io的数据源

  17. 17

    如何使用DataSet作为数据源过滤DataGridView

  18. 18

    使用存储桶作为droplink / tree字段的数据源

  19. 19

    使用Observable作为数据源的角度拖放

  20. 20

    使用在线.txt文件作为数据源

  21. 21

    使用flask变量作为高图的数据源

  22. 22

    使用json作为Assemble.io的数据源

  23. 23

    使用Node.js作为图表的数据源

  24. 24

    Emberjs:使用Websocket后端作为数据源

  25. 25

    使用HTML类作为Highcharts表数据源

  26. 26

    直接使用领域对象(即作为CollectionView数据源)

  27. 27

    使用sframe作为数据源绘制箱线图

  28. 28

    使用GeoDataFrame作为osgeo.ogr数据源

  29. 29

    使用 unbouded 数据源的 fixedWindow 作为 parDo 的侧输入?

热门标签

归档