现在Spark支撑四种办法从数据库中读取数据,这里以Mysql为例进行介绍。
一、不指定查询条件
这个办法连接MySql的函数原型是:
def jdbc(url: String, table: String, properties: Properties): DataFrame
咱们只需求供给Driver的url,需求查询的表名,以及连接表有关特点properties。下面是详细比如:
val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog"
val prop = new Properties()
val df = sqlContext.read.jdbc(url, "iteblog", prop )
println(df.count())
println(df.rdd.partitions.size)
咱们运转上面的程序,能够看到df.rdd.partitions.size输出成果是1,这个成果的意义是iteblog表的所有数据都是由RDD的一个分区处理的,所以说,假如你这个表很大,很可能会呈现OOM
WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 14, spark047219):
java.lang.OutOfMemoryError: GC overhead limit exceeded at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3380)
这种办法在数据量大的时分不主张运用。
二、指定数据库字段的规模
这种办法即是经过指定数据库中某个字段的规模,可是惋惜的是,这个字段有必要是数字,来看看这个函数的函数原型:
def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
connectionProperties: Properties): DataFrame
前两个字段的意义和办法一相似。columnName即是需求分区的字段,这个字段在数据库中的类型有必要是数字;lowerBound即是分区的下界;upperBound即是分区的上界;numPartitions是分区的个数。同样,咱们也来看看怎么运用:
val lowerBound = 1
val upperBound = 100000
val numPartitions = 5
val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog"
val prop = new Properties()
val df = sqlContext.read.jdbc(url, "iteblog", "id", lowerBound, upperBound, numPartitions, prop)
这个办法能够将iteblog表的数据散布到RDD的几个分区中,分区的数量由numPartitions参数决议,在抱负情况下,每个分区处理一样数量的数据,咱们在运用的时分不主张将这个值设置的比较大,由于这可能致使数据库挂掉!可是根据前面介绍,这个函数的缺陷即是只能运用整形数据字段作为分区关键词。
这个函数在极端情况下,也即是设置将numPartitions设置为1,其意义和第一种办法共同。
三、根据恣意字段进行分区
根据前面两种办法的约束,Spark还供给了根据恣意字段进行分区的办法,函数原型如下:
def jdbc(
url: String,
table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame
这个函数相比第一种办法多了predicates参数,咱们能够经过这个参数设置分区的根据,来看看比如:
val predicates = Array[String]("reportDate <= '2014-12-31'",
"reportDate > '2014-12-31' and reportDate <= '2015-12-31'")
val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog"
val prop = new Properties()
val df = sqlContext.read.jdbc(url, "iteblog", predicates, prop)
最终rdd的分区数量就等于predicates.length。
四、经过load获取
Spark还供给经过load的办法来读取数据。
sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog",
"dbtable" -> "iteblog")).load()
options函数支撑url、driver、dbtable、partitionColumn、lowerBound、upperBound以及numPartitions选项,仔细的同学必定发现这个和办法二的参数共同。是的,其内部完成原理部分和办法二大体共同。一起load办法还支撑json、orc等数据源的读取
(责任编辑:最模板) |