Project author: bhavik9243

Project description:
MySQL | Sqoop | Spark | Hive workflow
Language: Scala
Project URL: git://github.com/bhavik9243/sqoop-spark-hive.git
Created: 2018-03-21T17:33:09Z
Project community: https://github.com/bhavik9243/sqoop-spark-hive


MySQL, Sqoop, Spark, Hive workflow pipeline

The pipeline computes daily revenue from two tables, orders and order_items.

Sqoop

  • Import the orders and order_items data from MySQL into HDFS in Parquet format
  • Sqoop jobs are created for each table so that incremental imports can be re-run as new data arrives
commands
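The jobs below read the MySQL password from a file on HDFS via --password-file. A minimal sketch of creating that file; the password value shown is an assumption:

    # create the password file referenced by --password-file (password value is an assumption)
    $ echo -n "cloudera" | hdfs dfs -put - /project/sqoop/sqoop.password
    $ hdfs dfs -chmod 400 /project/sqoop/sqoop.password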

Sqoop job for the orders table

    # sqoop job to load data into hdfs from the mysql "retail_db" database
    sqoop job --create loadorders \
      -- import \
      --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
      --username retail_dba \
      --password-file /project/sqoop/sqoop.password \
      --table orders \
      --target-dir /project/data/orders \
      --as-parquetfile \
      --check-column order_id \
      --incremental append \
      --last-value 0

Sqoop job for the order_items table

    # sqoop job to load the order_items table into hdfs
    sqoop job --create loadorderitems \
      -- import \
      --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
      --username retail_dba \
      --password-file /project/sqoop/sqoop.password \
      --table order_items \
      --target-dir /project/data/order_items \
      --as-parquetfile \
      --check-column order_item_id \
      --incremental append \
      --last-value 0
List and execute the jobs:

    $ sqoop job --list
    $ sqoop job --exec loadorders
    $ sqoop job --exec loadorderitems
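
To confirm the imports, list the target directories on HDFS (paths taken from the job definitions above):

    # verify the parquet files written by the sqoop jobs
    $ hdfs dfs -ls /project/data/orders
    $ hdfs dfs -ls /project/data/order_items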

Spark

  • Spark reads the Parquet data into DataFrames and runs a SQL query to compute daily_revenue
  • The daily_revenue result is stored as a Parquet file in HDFS

code

    // package matches the --class used in spark-submit below
    package com.bhavik

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object myApp {
      def main(args: Array[String]): Unit = {
        // create spark conf and context
        val conf = new SparkConf().setAppName("daily_revenue_app")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        // reduce the number of shuffle partitions to 2 for this small dataset
        sqlContext.setConf("spark.sql.shuffle.partitions", "2")

        // read the parquet data imported by sqoop; input paths come from the arguments
        val orderDF = sqlContext.read.parquet(args(0))
        val orderItemsDF = sqlContext.read.parquet(args(1))

        // register the dataframes as temp tables so they can be queried with SQL
        orderDF.registerTempTable("orders")
        orderItemsDF.registerTempTable("order_items")

        // order_date is stored as epoch milliseconds, hence the division by 1000
        val dailyRevenue = sqlContext.sql(
          """
            |SELECT from_unixtime(o.order_date/1000) AS order_date,
            |       sum(oi.order_item_subtotal) AS daily_revenue
            |FROM orders o JOIN order_items oi ON o.order_id = oi.order_item_order_id
            |WHERE o.order_status IN ('COMPLETE')
            |GROUP BY from_unixtime(o.order_date/1000)
            |ORDER BY daily_revenue DESC
          """.stripMargin)

        // save the result to hdfs as a parquet file
        dailyRevenue.write.mode("overwrite").parquet(args(2))

        sc.stop()
      }
    }

execution

We need to package the application into a jar file before submitting it.
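
A minimal build.sbt sketch for producing that jar; the project name, Scala version and Spark version are assumptions inferred from the jar name used below (myapp_2.10-1.0.jar):

    // build.sbt - minimal sketch; project name and versions are assumptions
    name := "myapp"

    version := "1.0"

    scalaVersion := "2.10.6"

    // marked "provided" because the cluster supplies Spark at runtime
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.6.0" % "provided",
      "org.apache.spark" %% "spark-sql"  % "1.6.0" % "provided"
    )

Running sbt package should then produce target/scala-2.10/myapp_2.10-1.0.jar.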

    spark-submit --master yarn \
      --class com.bhavik.myApp \
      --num-executors 2 \
      --executor-memory 512M \
      myapp_2.10-1.0.jar \
      /project/data/orders /project/data/order_items /project/output/daily_revenue
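
As an optional sanity check, the output can be read back from spark-shell (which pre-defines sqlContext in Spark 1.x):

    // read back the result written by the job and inspect a few rows
    val result = sqlContext.read.parquet("/project/output/daily_revenue")
    result.show(10)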

Hive

  • Create a Hive database and an external table over the daily_revenue Parquet data computed by Spark

query

Open the Hive shell and execute the queries below:

    hive> CREATE DATABASE project;
    hive> USE project;
    hive> CREATE EXTERNAL TABLE daily_revenue(
            order_date string,
            daily_revenue double)
          STORED AS parquet
          LOCATION '/project/output/daily_revenue';

Query data from the table:

    SELECT * FROM daily_revenue LIMIT 20;
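
A further example query on the same table, ranking the highest-revenue days; to_date assumes the order_date strings carry a time portion, as produced by from_unixtime in the Spark job:

    -- top 10 revenue days, with the time portion trimmed off
    SELECT to_date(order_date) AS order_day,
           round(daily_revenue, 2) AS daily_revenue
    FROM daily_revenue
    ORDER BY daily_revenue DESC
    LIMIT 10;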