from mysql #

val df_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://dev-db/finance").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "(select a.*,b.productBaseDetailSeq,b.optionSrl,b.paymentSrl,b.amount,b.salesSupplyFee,b.salesSupplyFeeVat,b.paymentSupplyFee,b.paymentSupplyFeeVat,b.count from product_base a join product_base_detail b on a.productBaseSeq=b.productBaseSeq) as prbase").option("user", "ididid").option("password", "xxxxx").load()

df_mysql.registerTempTable("prbase")
val amountByOption = sqlContext.sql("select optionSrl, amountType, sum(amount) amount from prbase group by optionSrl, amountType")
amountByOption.show()
amountByOption.write.parquet("prbase.amountByOption.par")
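
The same aggregation can also be written with the DataFrame API instead of SQL; a rough equivalent, assuming the columns above:

import org.apache.spark.sql.functions.sum
// group by option and amount type, summing the amount column
val amountByOptionDf = df_mysql.groupBy("optionSrl", "amountType").agg(sum("amount").as("amount"))
amountByOptionDf.show()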


loading parquet #
val amountByOption = sqlContext.parquetFile("prbase.amountByOption.par")

amountByOption.registerTempTable("amounts")
val amtByType = sqlContext.sql("select amountType, sum(amount) from amounts group by amountType")
amtByType.show()
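
parquetFile and registerTempTable are the Spark 1.x forms; on Spark 2.x and later the same steps would look roughly like this (sketch, assuming the spark-shell SparkSession named spark and the same file):

val amountByOption = spark.read.parquet("prbase.amountByOption.par")
amountByOption.createOrReplaceTempView("amounts")
val amtByType = spark.sql("select amountType, sum(amount) from amounts group by amountType")
amtByType.show()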


load csv #

spark-shell --packages com.databricks:spark-csv_2.11:1.4.0
val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("export.csv")
df.registerTempTable("base")
val amtByOp = sqlContext.sql("select optionSrl, amountType, sum(amount) amount from base group by optionSrl, amountType order by amount desc")
amtByOp.show()
amtByOp.write.parquet("base.amtByOp.par")
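
As a quick sanity check, the parquet output can be read back in the same session, e.g.:

val check = sqlContext.read.parquet("base.amtByOp.par")
check.printSchema()
check.show()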
