How do you implement an aggregation in Spark? Many people without hands-on experience have no idea where to start, so this article summarizes where the problem comes from and how to solve it; hopefully it helps you work through it.
An interview question from an internet company:
As an example, suppose we want to compute each user's total number of visits, plus the visit count after de-duplicating visits to the same URL (i.e. the number of distinct URLs). Here are a few hand-made sample rows (four fields: id, name, vtm, url; the vtm field is not used in this example, so ignore it):
id1,user1,2,http://www.hupu.com
id1,user1,2,http://www.hupu.com
id1,user1,3,http://www.hupu.com
id1,user1,100,http://www.hupu.com
id2,user2,2,http://www.hupu.com
id2,user2,1,http://www.hupu.com
id2,user2,50,http://www.hupu.com
id2,user2,2,http://touzhu.hupu.com
Given this data set, the aggregation is easy to express in HQL:
select id,name, count(0) as ct,count(distinct url) as urlcount
from table
group by id,name;
The expected result is:
id1,user1,4,1
id2,user2,4,2
Now let's implement the same aggregation in Spark (which turns out to be trickier than expected).
A quick sketch of how MapReduce would handle it:
Map phase: combine id and name as the key, and use url as the value.
Reduce phase: len(urls) gives the total visit count, and len(set(urls)) gives the distinct-URL count.
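The two phases above translate almost literally into Spark. Below is a minimal sketch of that MR-style version (the object name MrStyleDemo is made up here, the file path reuses the one from the main example further down, and groupByKey ships every URL of a key to one place, which is fine for this toy data set but not for heavily skewed keys):

import org.apache.spark.{SparkConf, SparkContext}

object MrStyleDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("mr-style").setMaster("local"))

    sc.textFile("/Users/jiangzl/Desktop/test.txt")
      .map { line =>
        val f = line.split(",")
        // map phase: key = (id, name), value = url
        ((f(0), f(1)), f(3))
      }
      .groupByKey()
      // reduce phase: total visits and distinct-URL count per key
      .map { case (key, urls) => (key, urls.size, urls.toSet.size) }
      .collect()
      .foreach(println)

    sc.stop()
  }
}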
Since I don't write MapReduce myself, the interview got pretty awkward.
I wanted to show off with a Spark version instead, and found it genuinely hard, simply because I wasn't familiar with many of the functions.
The code is as follows:
import org.apache.spark.SparkContext._
import org.apache.spark._

object SparkDemo2 {
  def main(args: Array[String]) {
    case class User(id: String, name: String, vtm: String, url: String)
    //val rowkey = (new RowKey).evaluate(_)
    // val HADOOP_USER = "hdfs"
    // User name for accessing Spark
    // System.setProperty("user.name", HADOOP_USER);
    // User name for accessing Hadoop
    // System.setProperty("HADOOP_USER_NAME", HADOOP_USER);
    val conf = new SparkConf().setAppName("wordcount").setMaster("local") //.setExecutorEnv("HADOOP_USER_NAME", HADOOP_USER)
    val sc = new SparkContext(conf)

    val data = sc.textFile("/Users/jiangzl/Desktop/test.txt")
    // Parse each CSV line into a User record
    val rdd1 = data.map(line => {
      val r = line.split(",")
      User(r(0), r(1), r(2), r(3))
    })
    // Key by (id, name), matching the GROUP BY columns in the HQL
    val rdd2 = rdd1.map(r => ((r.id, r.name), r))

    // seqOp: fold one User into the per-partition accumulator (visit count, list of urls)
    val seqOp = (a: (Int, List[String]), b: User) => a match {
      case (0, List()) => (1, List(b.url))
      case _           => (a._1 + 1, b.url :: a._2)
    }
    // combOp: merge two partial accumulators from different partitions
    val combOp = (a: (Int, List[String]), b: (Int, List[String])) => {
      (a._1 + b._1, a._2 ::: b._2)
    }

    println("-----------------------------------------")
    // Aggregate per key, then emit (key, total visits, distinct-URL count)
    val rdd3 = rdd2.aggregateByKey((0, List[String]()))(seqOp, combOp).map(a => {
      (a._1, a._2._1, a._2._2.distinct.length)
    })
    rdd3.collect.foreach(println)
    println("-----------------------------------------")

    sc.stop()
  }
}
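For reference, the same result can be obtained without aggregateByKey. A minimal alternative sketch (rdd3b is just a made-up name; it reuses rdd2 from the code above and folds each key's URLs into a Set while counting):

    // Alternative to aggregateByKey: accumulate (visit count, Set of urls) per key
    val rdd3b = rdd2
      .mapValues(u => (1, Set(u.url)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 ++ b._2))
      .map { case (key, (ct, urls)) => (key, ct, urls.size) }
    rdd3b.collect.foreach(println)

Keeping a Set instead of a List means duplicate URLs are dropped as they are merged, at the cost of holding all distinct URLs for a key in memory.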
Solution for the error (a Scala version mismatch): Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.VolatileObjectRef.zero()Lscala/runtime/VolatileObjectRef;
Change the Scala version from 2.11.7 to 2.10.4, since the prebuilt spark-1.4.1-bin-hadoop2.6 distribution used here is compiled against Scala 2.10.
simple.sbt
name := "SparkDemo Project"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"
——————————————————————————— change to: ——————————————————————————
name := "SparkDemo Project"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"
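With the corrected simple.sbt in place, rebuild the jar from the project root (assuming sbt is installed; the artifact name is the one sbt derives from the project name, and it matches the jar used in the successful run below):
sbt package
The jar then lands at target/scala-2.10/sparkdemo-project_2.10-1.0.jar.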
Run log:
jiangzhongliandeMacBook-Pro:spark-1.4.1-bin-hadoop2.6 jiangzl$ ./bin/spark-submit --class "SparkDemo2" ~/Desktop/tmp/target/scala-2.11/simple-project_2.11-1.0.jar
Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.VolatileObjectRef.zero()Lscala/runtime/VolatileObjectRef;
at SparkDemo2$.main(tmp_spark.scala)
at SparkDemo2.main(tmp_spark.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
——————————————————————————— change to: ——————————————————————————
jiangzhongliandeMacBook-Pro:spark-1.4.1-bin-hadoop2.6 jiangzl$ ./bin/spark-submit --class "SparkDemo2" ~/Desktop/tmp/target/scala-2.10/sparkdemo-project_2.10-1.0.jar
16/04/29 12:40:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-----------------------------------------
((id1,user1),4,1)
((id2,user2),4,2)
-----------------------------------------
Having read the above, have you got the hang of implementing aggregation in Spark? If you want to pick up more skills or read more related content, follow the 天达云 industry news channel. Thanks for reading!