Spark Practice Notes

Handling nested JSON

The JSON looks like {"ope":12,"user":[{"apple":13,"pear":{"color":"yellow"}},{"apple":15,"pear":{"color":"red"}}]}

i.e., one level nested inside another.

I found two approaches online.

1. Parse it into a Map/List structure

List(Map(name -> "sdf", dict -> List(…)))

import scala.util.parsing.json.JSON

// repp is the JSON string
def regJson(json: Option[Any]): Map[String, Any] = json match {
  case Some(map: Map[String, Any] @unchecked) => map
  case other => throw new IllegalArgumentException("Unknown data structure: " + other)
}

val jsonS = JSON.parseFull(repp)
val first = regJson(jsonS)
// note: first.values has no guaranteed order; here the nested list happens to come first
val rddData = sc.parallelize(first.values.toList.head.asInstanceOf[List[Object]]) // convert to an RDD

import org.json4s._
import org.json4s.jackson.JsonMethods._
val z = rddData.map(x => compact(render(Extraction.decompose(x)(DefaultFormats)))).collect() // the RDD elements rendered back to JSON strings

2. Use Spark SQL

When you hit an array, flatten it with explode; treat a dict as a parent-child relationship, e.g. pear.color, or explode(user).

{"ope":12,"user":[{"apple":13,"pear":{"color":"yellow"}},{"apple":15,"pear":{"color":"red"}}]}

val conf = new SparkConf().setMaster("local").setAppName("JSONApp")
// create the SparkContext and SparkSession from conf
val sc = new SparkContext(conf)
val ss = SparkSession.builder().config(conf).getOrCreate()
ss.read.json("tmp.json").createOrReplaceTempView("users") // load the nested JSON file as a view
val df1 = ss.sql("select explode(user) from users")
df1.printSchema()
df1.show()
// after explode, each array element lands in a single column named "col"
val dft = df1.select(df1("col.id"), df1("col.info"), df1("col.hits")).toDF("id", "info", "hits")

dft.toJSON.collectAsList().toString() // the DataFrame rendered as JSON strings

Common DataFrame operations

Iteration

val ids = dft.select(dft("id")).collect() // collect first
val len = ids.length // then take the length
var q = 0
while (q < len) {
  val table = dft.where("id=%s".format(ids(q)(0))) // sub-DataFrame for this id
  println(ids(q)(0)) // ids(q) is a Row
  q += 1
}

But use collect() with care: if the DataFrame is large, pulling all of it to the driver can cause memory problems.
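A sketch of two safer alternatives, assuming the `dft` DataFrame from the example above: `take(n)` bounds how many rows reach the driver, and `toLocalIterator` streams rows one partition at a time instead of materializing everything at once.

```scala
// Option 1: only pull a bounded sample to the driver
val someIds = dft.select("id").take(100) // at most 100 Rows

// Option 2: stream rows to the driver one partition at a time
val it = dft.select("id").toLocalIterator()
while (it.hasNext) {
  val row = it.next()
  println(row.get(0))
}
```

Note that `toLocalIterator` still triggers a job per partition, but only one partition's worth of data is held in driver memory at a time.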

Constructing a DataFrame from JSON (an interesting point)

https://blog.csdn.net/liangrui1988/article/details/97665409

You need to construct the schema first.

import org.apache.spark.sql.types._

// flatten the list, but keep a schema describing its structure (so it can be restored later)
val arrayStruct = ArrayType(StructType(Seq(
  StructField("date", StringType, true),
  StructField("name", StringType, true),
  StructField("rid", StringType, true),
  StructField("value", LongType, true),
  StructField("days", StringType, true))), true)
val ashema = StructType(List(StructField("oid", StringType, true), StructField("recs", arrayStruct, true)))
val bshema = StructType(List(StructField("oid", StringType, true), StructField("reps", arrayStruct, true)))
val schema = StructType(List(StructField("recs", StringType, true)))
val schema2 = StructType(List(StructField("reps", StringType, true)))
// true means the field is nullable (may contain null)
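A schema like `arrayStruct` above can then be handed to `from_json` to turn a JSON-string column back into the nested structure. A minimal sketch, assuming a DataFrame `df` with a string column `recs` (both names are illustrative):

```scala
import org.apache.spark.sql.functions.from_json

// Assume df has a string column "recs" holding a JSON array like
// [{"date":"2020-01-01","name":"a","rid":"1","value":3,"days":"5"}]
val restored = df.withColumn("recs", from_json(df("recs"), arrayStruct))
restored.printSchema() // "recs" is now array<struct<...>> instead of string
```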

UDF

You can register custom functions and use them both in SQL statements and on columns.

Well-written blog posts on this:

https://www.cnblogs.com/cc11001100/p/9463909.html

https://www.jianshu.com/p/b1e9d5cc6193
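A minimal sketch of both usages, assuming a local SparkSession; the function `addPrefix` and the sample data are made up for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object UdfSketch {
  // the plain Scala function behind the UDF
  def addPrefix(s: String): String = "user_" + s

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("UdfSketch").getOrCreate()
    import spark.implicits._

    val df = Seq("alice", "bob").toDF("name")

    // usage 1: register it for SQL statements
    spark.udf.register("addPrefix", addPrefix _)
    df.createOrReplaceTempView("t")
    spark.sql("select addPrefix(name) as pname from t").show()

    // usage 2: wrap it for use directly on columns
    val addPrefixUdf = udf(addPrefix _)
    df.withColumn("pname", addPrefixUdf($"name")).show()

    spark.stop()
  }
}
```

Keeping the logic in a plain Scala function (here `addPrefix`) makes it easy to unit-test without spinning up Spark.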

Miscellaneous methods

To check a value's runtime type in Scala: getClass()

To force a cast in Scala: .asInstanceOf[List[Object]] (note the square brackets)
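A tiny self-contained sketch of both, mirroring the JSON-parsing case above where a parsed result comes back as `Any` (the object and helper names are illustrative):

```scala
object CastSketch {
  // cast an Any (e.g. the result of JSON.parseFull) back to typed collections
  def userCount(parsed: Any): Int = {
    val m = parsed.asInstanceOf[Map[String, Any]]  // unchecked cast; type goes in square brackets
    m("user").asInstanceOf[List[Any]].length
  }

  def main(args: Array[String]): Unit = {
    val parsed: Any = Map("ope" -> 12, "user" -> List(Map("apple" -> 13), Map("apple" -> 15)))
    println(parsed.getClass.getName) // the runtime class of the value
    println(userCount(parsed))       // 2
  }
}
```

Unlike pattern matching, `asInstanceOf` throws a ClassCastException at runtime if the value is not actually of the target type, so it is best kept behind a function you control.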