开心飞艇官网好程序员-千锋教育旗下高端IT职业教育品牌

400-811-9990
我的账户
好程序员

专注高端IT职业培训

亲爱的猿猿,欢迎!

已有账号,请

如尚未注册?

  • 客服QQ
  • 官方微信

    好程序员

    专注高端IT职业培训

[BigData] 好程序员大数据培训分享SparkSQl

[复制链接]
805 0
叶子老师 发表于 2019-8-14 17:41:32 | 只看该作者 |只看大图 |阅读模式 打印 上一主题 下一主题
  好程序员大数据培训分享SparkSQl,Spark SQLSpark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。SparkSql中返回的数据类型是DataFrame
1.1.1.   为什么要学习Spark SQL
我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!
HIVE:简化编写MapReduce的程序的复杂性
Spark SQL转换成RDD:替代MapReduce,提高效率
Spark1.0版本开始就推出了SparkSQL,最早是叫Shark
1、内存列存储--可以大大优化内存使用效率,减少了内存消耗,避免了gc对大量数据的性能开销
2、字节码生成技术(byte-code generation--可以使用动态字节码生成技术来优化性能
3Scala代码的优化
  结构化数据是指任何有结构信息的数据。所谓结构信息,就是每条记录共用的已知的字段集合。当数据符合 这样的条件时,Spark SQL 就会使得针对这些数据的读取和查询变得更加简单高效。具体 来说,Spark SQL 提供了以下三大功能(见图 9-1)。
(1) Spark SQL 可以从各种结构化数据源(例如 JSONHiveParquet 等)中读取数据。
(2) Spark SQL 不仅支持在 Spark 程序内使用 SQL 语句进行数据查询,也支持从类似商业 智能软件 Tableau 这样的外部工具中通过标准数据库连接器(JDBC/ODBC)连接 Spark SQL 进行查询。
(3) 当在 Spark 程序内使用 Spark SQL 时,Spark SQL 支持 SQL 与常规的 Python/Java/Scala 代码高度整合,包括连接 RDD SQL 表、公开的自定义 SQL 函数接口等。这样一来, 许多工作都更容易实现了。
为了实现这些功能,Spark SQL 提供了一种特殊的 RDD,叫作 SchemaRDDSchemaRDD 是存放 Row 对象的 RDD,每个 Row 对象代表一行记录。SchemaRDD 还包含记录的结构信 息(即数据字段)。SchemaRDD 看起来和普通的 RDD 很像,但是在内部,SchemaRDD 可 以利用结构信息更加高效地存储数据。此外,SchemaRDD 还支持 RDD 上所没有的一些新 操作,比如运行 SQL 查询。SchemaRDD 可以从外部数据源创建,也可以从查询结果或普 通 RDD 中创建。
什么是DataFrames
(SparkSql中返回的数据类型: 它在概念上等同于关系数据库中的表,但在查询上进行了优化)
RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema
1.1.1.   创建DataFrames
Spark SQLSQLContext是创建DataFrames和执行SQL的入口,在spark-1.6.1中已经内置了一个sqlContext
1.在本地创建一个文件,有三列,分别是idnameage,用空格分隔,然后上传到hdfs
hdfs dfs -put person.txt /
2.spark shell执行下面命令,读取数据,将每一行的数据使用列分隔符分割
val lineRDD = sc.textFile("hdfs://node01:9000/person.txt").map(_.split(" "))
3.定义case class(相当于表的schema
case class Person(id:Int, name:String, age:Int)
4.RDDcase class关联
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
  (里面的数据是在Array)
5.RDD转换成DataFrame
val personDF = personRDD.toDF
6.DataFrame进行处理
personDF.show
val seq1 = Seq(("1","bingbing",35),("2","yuanyuan",34),("3","mimi",33))
val rdd1 =sc.parallelize(seq1)
val df = rdd1.toDF("id","name","age")
df.show
DSL:领域特定语言
////查看DataFrame中的内容
//查看DataFrame部分列中的内容
1.
2.
3.
//打印DataFrameSchema信息
//查询所有的nameage,并将age+1
1.df.select(col("id"),col("name"),col("age")+1).show
2.df.select(df("id"), df("name"), df("age") + 1).show
//过滤age大于等于18
df.filter(col("age") >= 35).show
//按年龄进行分组并统计相同年龄的人数
df.groupBy("age").count().show()
SQL风格语法
//查询年龄最大的前两名
1.如果想使用SQL风格的语法,需要将DataFrame注册成表
df.registerTempTable("t_person")
2.sqlContext.sql("select * from t_person order by age desc limit 2").show
//显示表的Schema信息
以编程方式执行Spark SQL查询
1.编写Spark SQL查询程序
1.通过反射推断Schema
=======================================================
package com.qf.gp1708.day06
//通过反射获取用户信息
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object InferSchema {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("inferschema")
    val sc = new SparkContext(conf)
    val sqlContext:SQLContext = new SQLContext(sc)
  1.  //获取数据并切分
    val line = sc.textFile("C://Users/Song/Desktop/person.txt").map(_.split(","))
   3 //将获取的数据和Person样例类进行关联
    val personRdd: RDD[Godness] = line.map(arr=>Godness(arr(0).toLong,arr(1),arr(2).toInt,arr(3).toInt))
   
    //引入隐式转换函数,这样才可以调用到toDF方法
    import sqlContext.implicits._
   
   4 //将personRDD转换成DataFrame
    val dF: DataFrame = personRdd.toDF
  5.  //注册一张临时表
    dF.registerTempTable("t_person")
    val sql = "select * from t_person where fv > 70 order by age"
    //查询
    val res: DataFrame = sqlContext.sql(sql)
    res.show()
    sc.stop()
  }
}
2//创建样例类
case class Godness(id:Long,name:String,age:Int,fv:Int)
=========================================================
2.通过StructType直接指定Schema
===========================================
package com.qf.gp1708.day06
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
/**
  * 通过StructType类型直接指定Schema
  */
object StructTypeSchema {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("str")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    //获取数据并切分
    val lines = sc.textFile("hdfs://...").map(_.split(","))
    //指定schema信息
    StructType{
      List(
        StructField("id",IntegerType,false),
        StructField("name",StringType,true),
        StructField("age",IntegerType,true),
        StructField("fv",IntegerType,true),
      )
    }
    //开始映射
    val rowRDD: RDD[Row] = lines.map(arr =>Row(arr(0).toInt,arr(1),arr(2).toInt,arr(3).toInt))
    //把RDD转换为DataFrame
    val personDF: DataFrame = sqlContext.createDataFrame(rowRDD,schema)
    //生成临时表
    personDF.registerTempTable("t_person")
    val sql = "select name,age,fv from t_person where age >30 order by age desc"
    val res = sqlContext.sql(sql)
   
    res.write.mode("append").json("c://out-20180903-1")
   
    sc.stop()
  }
}
=================================================================
1.   数据源
1.1. JDBC
Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。
1.1.1.   MySQL中加载数据(Spark Shell方式)
1.启动Spark Shell,必须指定mysql连接驱动jar
/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-shell \
--master spark://node01:7077 \
--jars /usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \
  (指定MySQL包)
--driver-class-path /usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar        (指定驱动类)
2.mysql中加载数据
val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://node03:3306/bigdata", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "person", "user" -> "root", "password" -> "root")).load()
3.执行查询
jdbcDF.show()
1.1.2.   将数据写入到MySQL中(打jar包方式)
package com.qf.gp1708.day06
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
/**
  * 写入数据到MySQL
  */
object InsertData2MySQLDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val lines= sc.textFile("").map(_.split(","))
    //生成Schema
    val schema = StructType {
      Array(
        StructField("name", StringType, true),
        StructField("age", IntegerType, true),
        StructField("fv", StringType, true),
      )
    }
    //映射
    val personRDD = lines.map(arr =>Row(arr(1).toString,arr(2).toInt,arr(3).toInt))
    //生成DataFrame
    val personDF = sqlContext.createDataFrame(personRDD,schema)
    //生成用于写入MySQL的配置信息
    val prop = new Properties()
    prop.put("user","root")
    prop.put("password","root")
    prop.put("driver","com.mysql.jdbc.Driver")
    val jdbcUrl="jdbc:mysql://hadoop03:3306/bigdata"
    val table="person"
    //把数据写入MySQL
    personDF.write.mode("append").jdbc(jdbcUrl,table,prop)
    sc.stop()
  }
}
/usr/local/spark-1.6.3-bin-hadoop2.6/spark-submit \
--class com.qf..... \
--master spark://hadoop01:7077 \
--executor-memory 512m \
--total-executor-cores 2 \
--jars /usr/.../mysql-connector-java-5.1.35-bin.jar \
--driver-class-path /usr/.../mysql-connector-java-5.1.35-bin.jar \
/root/1.jar
=======================================================
kafka:消息中间件(缓存数据)---解耦
  为处理实时数据提供一个统一、高吞吐量、低等待的平台
  3、为什么需要消息队列(重要、了解)
  消息系统的核心作用就是三点:解耦,异步和并行
  Kafka对消息保存时根据Topic进行归类
  Topic:底层就是队列,将不同的消息放在不同的队列中进行分类
  
  发布/订阅模式:1对多
  
  JMS:

好程序员大数据培训分享:http://cdxfjz.com/


精彩内容,一键分享给更多人!
收藏
收藏0
转播
转播
分享
淘帖0
支持
支持0
反对
反对0
回复

使用道具 举报

您需要登录后才可以回帖

本版积分规则

关注我们
千锋好程序员

开心飞艇官网北京校区(总部):北京市海淀区宝盛北里西区28号中关村智诚科创大厦

深圳西部硅谷校区:深圳市宝安区宝安大道5010号深圳西部硅谷B座A区605-619

杭州龙驰智慧谷校区:浙江省杭州市下沙经济技术开发区元成路199号龙驰智慧谷B座7层

郑州校区:郑州市二七区航海中路60号海为科技园C区10层、12层

开心飞艇官网Copyright 2007-2019 北京千锋互联科技有限公司 .All Right

京ICP备12003911号-5 京公安网11010802011455号

开心飞艇官网请您保持通讯畅通1对1咨询马上开启