本文实现了Spark与Habse之间的简单整合,通过一些入门的案例,有助于理解他们之间的API操作
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object HBaseRDD{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("APP").setMaster("local[*]")
val sc = new SparkContext(conf)
//HBase的配置,为了连接HBase
val configuration = HBaseConfiguration.create()
1.查询数据
//设置与Hbase连接的表名
configuration.set(TableInputFormat.INPUT_TABLE,"student")
val hbaseRDD:RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(configuration,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])
hbaseRDD.foreach{
case(rowKey,result)=>{
result.rawCells().foreach(cell=>{
println(Bytes.toString(rowKey.get())+
"="+Bytes.toString(cellUtil.cloneValue(cell)))
})
}
}
2.插入数据
//准备数据
val dataRDD:RDD[(String,String)]=sc.makeRDD(Array(("1","zhangsan"),("2","lisa")))
//转换成Hbase能识别的数据格式(rowKey,put)
//主要方法有rowkey,put,get,scan
val mapRDD = data.map{
case(id,name)=>{
val ids = Bytes.toBytes(id)
val rowkey = new ImmutableBytesWritable(ids)
val put = new Put(ids)
put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(name))
//将rowKey,put封装成元组返回
(rowKey,put)
}
}
//保存数据
//这一步要关联Hbase的configuration,告诉客户端去哪里连接Hbase
val jobConf = new JobConf(configuration)
//设置输出格式
jobConf.setoutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE,"student")
mapRDD.saveAsHadoopDataSet(jobConf)
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。