1. Write data to HBase
First, make sure that Hadoop, ZooKeeper, HBase, Flume, and Kafka are all running.
(1) Go to Hadoop's sbin directory and start HDFS:
./start-dfs.sh
Check with jps:
(2) Go to HBase's bin directory and start HBase:
start-hbase.sh
Check with jps:
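The prerequisite above also lists ZooKeeper, Kafka, and Flume; their startup is not shown in this post. For reference, they can usually be brought up with commands along the following lines (installation paths, and in particular the Flume agent name and config file, are placeholders that depend on your environment):
zkServer.sh start
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
flume-ng agent --name <agent-name> --conf $FLUME_HOME/conf --conf-file <your-agent>.conf -Dflume.root.logger=INFO,console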
2. Create the HBase table
Enter the HBase shell:
hbase shell
Use list to show the existing tables:
list
A table has been designed for this project: 'imooc_course_clickcount'
# Create the table
create 'imooc_course_clickcount','info'
# Show the table description
desc 'imooc_course_clickcount'
# Scan the data in the table
scan 'imooc_course_clickcount'
3. Accessing HBase from Scala
Create a new POJO case class: CourseClickCount (sketched together with the related domain classes below).
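The domain (POJO) classes themselves are not shown in this post. Based on how they are used in the code further down, a minimal sketch could look like the following; the field names of CourseClickCount and ClickLog are inferred and may differ from the original project:
package com.qianliu.domain
// Click count of a hands-on course; day_course is the HBase rowkey, e.g. "20171111_88" (field name assumed)
case class CourseClickCount(day_course: String, click_count: Long)
// Click count of a course reached from a search engine; rowkey e.g. "20171111_www.baidu.com_8"
case class CourseSearchClickCount(day_search_course: String, click_count: Long)
// One cleaned access-log record used in step 4: ip, time, course id, HTTP status code, referer (field names partly assumed)
case class ClickLog(ip: String, time: String, courseId: Int, statusCode: Int, referer: String)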
A Java utility class is used here to access HBase:
package com.qianliu.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
/** HBase utility class: Java utility classes are best encapsulated as a singleton. */
public class HBaseUtils {
HBaseAdmin admin = null;
Configuration configuration = null;
/** Private constructor */
private HBaseUtils(){
configuration = new Configuration();
configuration.set("hbase.zookeeper.quorum", "192.168.48.138:2181");
configuration.set("hbase.rootdir", "hdfs://192.168.48.138:8020/hbase");
try {
admin = new HBaseAdmin(configuration);
} catch (IOException e) {
e.printStackTrace();
}
}
private static HBaseUtils instance = null;
public static synchronized HBaseUtils getInstance() {
if(null == instance) {
instance = new HBaseUtils();
}
return instance;
}
/** Get an HTable instance by table name. */
public HTable getTable(String tableName) {
HTable table = null;
try {
table = new HTable(configuration, tableName);
} catch (IOException e) {
e.printStackTrace();
}
return table;
}
/**
 * Add one record to an HBase table.
 * @param tableName HBase table name
 * @param rowkey rowkey of the HBase table
 * @param cf column family of the HBase table
 * @param column column of the HBase table
 * @param value value to write into the HBase table
 */
public void put(String tableName, String rowkey, String cf, String column, String value) {
HTable table = getTable(tableName);
Put put = new Put(Bytes.toBytes(rowkey));
put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
try {
table.put(put);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
//HTable table = HBaseUtils.getInstance().getTable("imooc_course_clickcount");
//System.out.println(table.getName().getNameAsString());
String tableName = "imooc_course_clickcount" ;
String rowkey = "20171111_88";
String cf = "info" ;
String column = "click_count";
String value = "2";
HBaseUtils.getInstance().put(tableName, rowkey, cf, column, value);
}
}
Scala can use this Java HBaseUtils class to access HBase:
package com.qianliu.dao
import com.qianliu.domain.{CourseClickCount, CourseSearchClickCount}
import com.qianliu.utils.HBaseUtils
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.mutable.ListBuffer
/** Data access layer for click counts of hands-on courses coming from a search engine. */
object CourseSearchClickCountDAO {
val tableName = "imooc_course_search_clickcount"
val cf = "info"
val qualifier = "click_count"
/**
 * Save data to HBase.
 * @param list collection of CourseSearchClickCount
 */
def save(list: ListBuffer[CourseSearchClickCount]): Unit = {
val table = HBaseUtils.getInstance().getTable(tableName)
for(ele <- list) {
table.incrementColumnValue(Bytes.toBytes(ele.day_search_course),
Bytes.toBytes(cf),
Bytes.toBytes(qualifier),
ele.click_count)
}
}
/** Query the value for a given rowkey. */
def count(day_search_course: String):Long = {
val table = HBaseUtils.getInstance().getTable(tableName)
val get = new Get(Bytes.toBytes(day_search_course))
val value = table.get(get).getValue(cf.getBytes, qualifier.getBytes)
if(value == null) {
0L
}else{
Bytes.toLong(value)
}
}
def main(args: Array[String]): Unit = {
val list = new ListBuffer[CourseSearchClickCount]
list.append(CourseSearchClickCount("20171111_www.baidu.com_8",8))
list.append(CourseSearchClickCount("20171111_cn.bing.com_9",9))
save(list)
println(count("20171111_www.baidu.com_8") + " : " + count("20171111_cn.bing.com_9"))
}
}
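Note that this DAO writes to a second table, imooc_course_search_clickcount. Assuming it has not been created yet, it can be set up in the HBase shell the same way as the first table:
create 'imooc_course_search_clickcount','info'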
Test whether the HBaseUtils.java access class works properly.
Run scan 'imooc_course_clickcount' before and after:
This shows that the data was inserted into HBase successfully.
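The streaming job in step 4 below saves its results through CourseClickCountDAO, which the post does not show. Assuming it mirrors the CourseSearchClickCountDAO above (same HBaseUtils singleton, table 'imooc_course_clickcount', column family 'info', column 'click_count', and a day_course rowkey field), a minimal sketch might look like this:
package com.qianliu.dao
import com.qianliu.domain.CourseClickCount
import com.qianliu.utils.HBaseUtils
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.mutable.ListBuffer
/** Data access layer for hands-on course click counts (sketch; field and table names assumed). */
object CourseClickCountDAO {
val tableName = "imooc_course_clickcount"
val cf = "info"
val qualifier = "click_count"
/** Save a batch of CourseClickCount records to HBase via atomic column increments. */
def save(list: ListBuffer[CourseClickCount]): Unit = {
val table = HBaseUtils.getInstance().getTable(tableName)
for(ele <- list) {
table.incrementColumnValue(Bytes.toBytes(ele.day_course),
Bytes.toBytes(cf),
Bytes.toBytes(qualifier),
ele.click_count)
}
}
/** Read the accumulated count for a given rowkey. */
def count(day_course: String): Long = {
val table = HBaseUtils.getInstance().getTable(tableName)
val get = new Get(Bytes.toBytes(day_course))
val value = table.get(get).getValue(cf.getBytes, qualifier.getBytes)
if(value == null) 0L else Bytes.toLong(value)
}
}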
4. Write the data cleaned by Spark Streaming to HBase
package com.qianliu
import com.qianliu.dao.CourseClickCountDAO
import com.qianliu.domain.{ClickLog, CourseClickCount}
import com.qianliu.utils.DateUtils
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.ListBuffer
/** Spark Streaming integration with Kafka. */
object KafkaStreamingApp {
def main(args: Array[String]): Unit = {
//Check that the expected number of arguments was passed in; exit otherwise
if(args.length != 4) {
System.err.println("Usage: KafkaStreamingApp <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
//Unpack the input arguments into an Array
val Array(zkQuorum, group, topics, numThreads) = args
//Initialize the SparkConf
val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount")
.setMaster("local[2]")
sparkConf.set("spark.testing.memory", "512000000")
//Create the StreamingContext with a 60-second batch interval
val ssc = new StreamingContext(sparkConf, Seconds(60))
//Allow multiple topics as input
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
// TODO... how Spark Streaming connects to Kafka
val messages = KafkaUtils.createStream(ssc, zkQuorum, group,topicMap)
// TODO... test for yourself why the second element of the tuple is taken
//messages.map(_._2).count().print()
//Step 1: data cleaning: each cleaned record is (ip, timestamp, course id, HTTP status code, url)
val logs = messages.map(_._2)
val cleanData = logs.map(line => {
val infos = line.split("\t")
// infos(2) = "GET /class/130.html HTTP/1.1"
// url = /class/130.html
val url = infos(2).split(" ")(1)
var courseId = 0
// Extract the course id of the hands-on course
if (url.startsWith("/class")) {
val courseIdHTML = url.split("/")(2)
courseId = courseIdHTML.substring(0, courseIdHTML.lastIndexOf(".")).toInt
}
ClickLog(infos(0), DateUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4))
}).filter(clicklog => clicklog.courseId != 0)
//Print the cleaned logs
//cleanData.print()
//Step 2: assemble the cleaned data into the database record type
//Count the hands-on course visits accumulated so far today
cleanData.map(x => {
// HBase rowkey design: 20171111_88
(x.time.substring(0, 8) + "_" + x.courseId, 1)
}).reduceByKey(_ + _).foreachRDD(rdd => {
rdd.foreachPartition(partitionRecords => {
val list = new ListBuffer[CourseClickCount]
partitionRecords.foreach(pair => {
list.append(CourseClickCount(pair._1, pair._2))
})
CourseClickCountDAO.save(list)
})
})
//Start the Spark Streaming application
ssc.start()
ssc.awaitTermination()
}
}
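To run KafkaStreamingApp, pass the four arguments from the usage string as program arguments. The values below are only an illustration: the ZooKeeper address matches the one used in HBaseUtils, while the consumer group and topic name are placeholders that must match your own Kafka setup:
192.168.48.138:2181 test streamingtopic 1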
The data in HBase before and after the run:
This shows the data has been written to HBase; there are 6 rows because our mock data contains only six courses.