我将time-series
资料储存在中HBase
。行键由user_id
和组成timestamp
,如下所示:
{
"userid1-1428364800" : {
"columnFamily1" : {
"val" : "1"
}
}
}
"userid1-1428364803" : {
"columnFamily1" : {
"val" : "2"
}
}
}
"userid2-1428364812" : {
"columnFamily1" : {
"val" : "abc"
}
}
}
}
现在,我需要执行每个用户的分析。这是hbase_rdd
(从这里开始)的初始化
sc = SparkContext(appName="HBaseInputFormat")
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD(
"org.apache.hadoop.hbase.mapreduce.TableInputFormat",
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"org.apache.hadoop.hbase.client.Result",
keyConverter=keyConv,
valueConverter=valueConv,
conf=conf)
类似于mapreduce的自然处理方式为:
hbase_rdd
.map(lambda row: (row[0].split('-')[0], (row[0].split('-')[1], row[1]))) # shift timestamp from key to value
.groupByKey()
.map(processUserData) # process user's data
在执行第一个映射(将时间戳从键更改为值)时,至关重要的是要知道当前用户的时间序列数据何时完成,因此可以启动groupByKey转换。因此,我们不需要映射所有表并存储所有临时数据。这是可能的,因为hbase以排序的顺序存储行键。
使用hadoop流,可以通过以下方式完成:
import sys
current_user_data = []
last_userid = None
for line in sys.stdin:
k, v = line.split('\t')
userid, timestamp = k.split('-')
if userid != last_userid and current_user_data:
print processUserData(last_userid, current_user_data)
last_userid = userid
current_user_data = [(timestamp, v)]
else:
current_user_data.append((timestamp, v))
问题是:如何在Spark中利用hbase键的排序顺序?
我对从HBase提取数据的方式所获得的保证并不十分熟悉,但是如果我理解正确的话,我只能用普通的Spark来回答。
你有一些RDD[X]
。据Spark所知,其中的X
sRDD
是完全无序的。但是您具有一些外部知识,可以保证数据实际上是按某个字段分组的X
(甚至可能按另一个字段排序的)。
在这种情况下,您mapPartitions
实际上可以使用与hadoop流相同的功能。这样一来,您就可以遍历一个分区中的所有记录,因此可以查找具有相同键的记录块。
val myRDD: RDD[X] = ...
val groupedData: RDD[Seq[X]] = myRdd.mapPartitions { itr =>
var currentUserData = new scala.collection.mutable.ArrayBuffer[X]()
var currentUser: X = null
//itr is an iterator over *all* the records in one partition
itr.flatMap { x =>
if (currentUser != null && x.userId == currentUser.userId) {
// same user as before -- add the data to our list
currentUserData += x
None
} else {
// its a new user -- return all the data for the old user, and make
// another buffer for the new user
val userDataGrouped = currentUserData
currentUserData = new scala.collection.mutable.ArrayBuffer[X]()
currentUserData += x
currentUser = x
Some(userDataGrouped)
}
}
}
// now groupedRDD has all the data for one user grouped together, and we didn't
// need to do an expensive shuffle. Also, the above transformation is lazy, so
// we don't necessarily even store all that data in memory -- we could still
// do more filtering on the fly, eg:
val usersWithLotsOfData = groupedRDD.filter{ userData => userData.size > 10 }
我意识到您想使用python-对不起,我想如果我使用Scala编写示例,我更有可能得到正确的示例。而且我认为类型注释使含义更清晰,但它可能是Scala偏见... :)。无论如何,希望您能理解正在发生的事情并进行翻译。(不要过分担心flatMap
和Some
和None
,也许不重要,如果你理解的想法...)
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句