Pick a mirror and download the tarball:
http://mirrors.hust.edu.cn/apache/hadoop/core/hadoop-3.2.1/
After extracting, edit the configuration files under etc/hadoop; set the JDK path and the hostname according to your environment.
Edit the following configuration files:
hadoop-env.sh
JAVA_HOME=/home/tangsong.math/jdk1.8.0_251
HADOOP_HOME=/home/tangsong.math/repos/hadoop-3.2.1
core-site.xml
<property><name>fs.defaultFS</name><value>hdfs://n227-026-077:9000</value></property>
<property><name>hadoop.tmp.dir</name><value>/home/tangsong.math/repos/hadoop-3.2.1/data</value></property>
(Hadoop 3.2.1 ships mapred-site.xml directly, while 2.10.1 only has mapred-site.xml.template, in which case copy it first:)
cp mapred-site.xml.template mapred-site.xml
mapred-site.xml
<property><name>mapreduce.framework.name</name><value>yarn</value></property>
<property><name>mapreduce.jobhistory.address</name><value>n227-026-077:10020</value></property>
<property><name>mapreduce.jobhistory.webapp.address</name><value>n227-026-077:19888</value></property>
<property><name>yarn.app.mapreduce.am.env</name><value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value></property>
<property><name>mapreduce.map.env</name><value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value></property>
<property><name>mapreduce.reduce.env</name><value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value></property>
yarn-site.xml
<property><name>yarn.resourcemanager.hostname</name><value>n227-026-077</value></property>
<property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>
# Set up passwordless SSH to the local machine
cd ~/.ssh/
ssh-keygen -t rsa    # just press ENTER at every prompt
cat ./id_rsa.pub >> ./authorized_keys    # add the key to the authorized list
# Start Hadoop
In Hadoop's bin directory, run:
sudo ./hdfs namenode -format
Then in the sbin directory:
./start-all.sh
Check with jps; the processes below should be running. If something did not start correctly, look at the logs under hadoop/logs; the cause is usually a permissions problem or similar.
DataNode, ResourceManager, SecondaryNameNode, NameNode, NodeManager
View YARN applications (port 8088 is the default in Hadoop 3):
http://10.227.26.77:8088/cluster
View the HDFS cluster status:
http://10.227.26.77:9870/dfshealth.html#tab-overview
Maven project configuration
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>3.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.2.1</version>
</dependency>
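With these dependencies on the classpath, a quick sanity check can confirm the client reaches the NameNode. This is only a sketch assuming the fs.defaultFS value from core-site.xml above; the class name is illustrative.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class HdfsSanityCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same address as fs.defaultFS in core-site.xml; adjust to your host.
        conf.set("fs.defaultFS", "hdfs://n227-026-077:9000");
        try (FileSystem fs = FileSystem.get(conf)) {
            // Listing the HDFS root is enough to prove connectivity.
            for (FileStatus status : fs.listStatus(new Path("/"))) {
                System.out.println(status.getPath() + " " + status.getLen());
            }
        }
    }
}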
Create a log4j.properties under resources with the following content:
log4j.rootLogger=DEBUG, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n
WordCount
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
public class Foo {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf=new Configuration();
conf.set("fs.defaultFS","hdfs://10.227.26.77:9000");
conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
System.getProperties().setProperty("HADOOP_USER_NAME","root");
FileSystem fs=FileSystem.get(conf);
String inputPath="/hdfsapi/input";
String outputPath="/hdfsapi/output";
fs.mkdirs(new Path(inputPath));
fs.copyFromLocalFile(new Path("/Users/bytedance/repos/test.py"),new Path(inputPath));
fs.close();
Job job= Job.getInstance(conf,"word count");
job.setJarByClass(Foo.class);
job.setMapperClass(WMapper.class);
job.setReducerClass(WReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job,new Path(inputPath));
FileOutputFormat.setOutputPath(job,new Path(outputPath));
job.waitForCompletion(true);
}
}
class WMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
private LongWritable one = new LongWritable(1);
private Text word = new Text();
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] tokens = value.toString().split("\\s+");
for (String token : tokens) {
word.set(token);
context.write(word, one);
}
}
}
class WReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for (LongWritable val : values) count++;
context.write(key, new LongWritable(count));
}
}
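After the job finishes, the result lands in part files under the output directory. Below is a small sketch for dumping it from the same client, assuming the paths used above and the default single reducer (part-r-00000).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class PrintWordCountOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://10.227.26.77:9000");
        try (FileSystem fs = FileSystem.get(conf);
             FSDataInputStream in = fs.open(new Path("/hdfsapi/output/part-r-00000"))) {
            // Stream the reducer output straight to stdout; 4096 is just the buffer size.
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
    }
}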
Matrix multiplication
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
public class Foo {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://10.227.26.77:9000");
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
conf.set("ARowCount", "4");
conf.set("AColCount", "3");
conf.set("BRowCount", "3");
conf.set("BColCount", "2");
System.getProperties().setProperty("HADOOP_USER_NAME", "root");
FileSystem fs = FileSystem.get(conf);
String inputPath = "/hdfsapi/input";
String outputPath = "/hdfsapi/output";
fs.mkdirs(new Path(inputPath));
fs.copyFromLocalFile(new Path("/Users/bytedance/repos/a.json"), new Path(inputPath));
fs.close();
Job job = Job.getInstance(conf, "matrix multiply");
job.setJarByClass(Foo.class);
job.setMapperClass(WMapper.class);
job.setReducerClass(WReducer.class);
job.setMapOutputKeyClass(MatrixKey.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
job.waitForCompletion(true);
}
}
class MatrixKey implements WritableComparable<MatrixKey> {
private int row;
public MatrixKey() {
}
public MatrixKey(int row, int col) {
this.row = row;
this.col = col;
}
private int col;
@Override
public int compareTo(MatrixKey o) {
if (this.row > o.row) return 1;
else if (this.row < o.row) return -1;
if (this.col > o.col) return 1;
else if (this.col < o.col) return -1;
return 0;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MatrixKey matrixKey = (MatrixKey) o;
return row == matrixKey.row &&
col == matrixKey.col;
}
@Override
public int hashCode() {
return Objects.hash(row, col);
}
public void setRow(int row) {
this.row = row;
}
public void setCol(int col) {
this.col = col;
}
public int getRow() {
return row;
}
public int getCol() {
return col;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(row);
dataOutput.writeInt(col);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
row = dataInput.readInt();
col = dataInput.readInt();
}
}
// A is 4x3, B is 3x2
class WMapper extends Mapper<LongWritable, Text, MatrixKey, Text> {
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] tokens = value.toString().split("\\s+");
if (tokens.length != 4) return;
int row = Integer.parseInt(tokens[1]);
int col = Integer.parseInt(tokens[2]);
if (tokens[0].equals("A")) {
//C is ARowCount x BColCount. Each a[i][j] contributes to c[i][k] for 1<=k<=BColCount.
int count = Integer.parseInt(context.getConfiguration().get("BColCount"));
for (int k = 1; k <= count; k++)
context.write(new MatrixKey(row, k), value);
} else if (tokens[0].equals("B")) {
//C is ARowCount x BColCount. Each b[i][j] contributes to c[k][j] for 1<=k<=ARowCount.
int count = Integer.parseInt(context.getConfiguration().get("ARowCount"));
for (int k = 1; k <= count; k++)
context.write(new MatrixKey(k, col), value);
}
}
}
class WReducer extends Reducer<MatrixKey, Text, Text, LongWritable> {
@Override
public void reduce(MatrixKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int row = key.getRow();
int col = key.getCol();
//The contributing entries are a[row,k] and b[k,col], 1<=k<=AColCount.
int count = Integer.parseInt(context.getConfiguration().get("AColCount"));
int[] buf1 = new int[count + 1];
int[] buf2 = new int[count + 1];
for (Text value : values) {
String[] tokens = value.toString().split("\\s+");
if (tokens.length != 4) return;
if (tokens[0].equals("A")) {
int k = Integer.parseInt(tokens[2]);
int val = Integer.parseInt(tokens[3]);
buf1[k]+=val;
} else if (tokens[0].equals("B")) {
int k = Integer.parseInt(tokens[1]);
int val = Integer.parseInt(tokens[3]);
buf2[k]+=val;
}
}
int sum=0;
for(int i=1;i<=count;i++)sum+=buf1[i]*buf2[i];
context.write(new Text("C "+row+" "+col),new LongWritable(sum));
}
}
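For reference, the mapper above expects one matrix entry per line in the form "A i j value" or "B i j value" (1-based indices, whitespace separated). The throwaway sketch below writes such an input locally; the file name and values are illustrative, and the file can be uploaded with copyFromLocalFile as in main.
import java.io.IOException;
import java.io.PrintWriter;
public class MatrixInputGenerator {
    public static void main(String[] args) throws IOException {
        try (PrintWriter out = new PrintWriter("matrix.txt")) {
            // A is 4x3 and B is 3x2, matching ARowCount/AColCount/BRowCount/BColCount above.
            for (int i = 1; i <= 4; i++)
                for (int j = 1; j <= 3; j++) out.println("A " + i + " " + j + " " + (i + j));
            for (int i = 1; i <= 3; i++)
                for (int j = 1; j <= 2; j++) out.println("B " + i + " " + j + " " + (i * j));
        }
    }
}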
Submitting the job to the YARN queue
In this case you must build the jar locally first and call Job.setJar instead of setJarByClass.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Foo {
public static void main(String[] args)
throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://10.227.26.77:9000");
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "n227-026-077");
conf.set("yarn.resourcemanager.resource-tracker.address", "n227-026-077:8031");
conf.set("yarn.resourcemanager.address", "n227-026-077:8032");
conf.set("yarn.resourcemanager.scheduler.address", "n227-026-077:8030");
conf.set("yarn.resourcemanager.admin.address", "n227-026-077:8033");
conf.set(
"yarn.application.classpath",
"/home/tangsong.math/repos/hadoop-3.2.1/etc/hadoop:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/common/lib/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/common/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/hdfs:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/hdfs/lib/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/hdfs/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/mapreduce/lib/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/mapreduce/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/yarn:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/yarn/lib/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/yarn/*");
conf.set("yarn.app.mapreduce.am.env", "HADOOP_MAPRED_HOME=${HADOOP_HOME}");
conf.set("mapreduce.map.env", "HADOOP_MAPRED_HOME=${HADOOP_HOME}");
conf.set("mapreduce.reduce.env", "HADOOP_MAPRED_HOME=${HADOOP_HOME}");
conf.set("HADOOP_HOME", "/home/tangsong.math/repos/hadoop-3.2.1");
conf.set("hadoop.home.dir", "/home/tangsong.math/repos/hadoop-3.2.1");
conf.set(
"mapreduce.application.classpath",
"/home/tangsong.math/repos/hadoop-3.2.1/etc/hadoop:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*:$HADOOP_MAPRED_HOME/share/hadoop/common/*:$HADOOP_MAPRED_HOME/share/hadoop/common/lib/*:$HADOOP_MAPRED_HOME/share/hadoop/yarn/*:$HADOOP_MAPRED_HOME/share/hadoop/yarn/lib/*:$HADOOP_MAPRED_HOME/share/hadoop/hdfs/*:$HADOOP_MAPRED_HOME/share/hadoop/hdfs/lib/*");
conf.set("ARowCount", "4");
conf.set("AColCount", "3");
conf.set("BRowCount", "3");
conf.set("BColCount", "2");
System.getProperties().setProperty("HADOOP_USER_NAME", "root");
FileSystem fs = FileSystem.get(conf);
String inputPath = "/hdfsapi/input";
String outputPath = "/hdfsapi/output";
fs.mkdirs(new Path(inputPath));
fs.copyFromLocalFile(new Path("/Users/bytedance/repos/a.json"), new Path(inputPath));
fs.close();
Job job = Job.getInstance(conf, "matrix multiply");
// job.setJarByClass(Foo.class);
job.setJar("/Users/bytedance/repos/untitled3/target/untitled3-1.0-SNAPSHOT.jar");
job.setMapperClass(WMapper.class);
job.setReducerClass(WReducer.class);
job.setMapOutputKeyClass(MatrixKey.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
int ret = job.waitForCompletion(true) ? 0 : 1;
System.out.println("ret= " + ret);
org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
org.apache.hadoop.mapreduce.Counter counter =
counters.findCounter(IncorrectFormatReason.NOT_A_OR_B);
System.out.println("IncorrectFormatReason.NOT_A_OR_B:" + counter.getValue());
}
}
class MatrixKey implements WritableComparable<MatrixKey> {
private int row;
public MatrixKey() {}
public MatrixKey(int row, int col) {
this.row = row;
this.col = col;
}
private int col;
@Override
public int compareTo(MatrixKey o) {
if (this.row > o.row) return 1;
else if (this.row < o.row) return -1;
if (this.col > o.col) return 1;
else if (this.col < o.col) return -1;
return 0;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MatrixKey matrixKey = (MatrixKey) o;
return row == matrixKey.row && col == matrixKey.col;
}
@Override
public int hashCode() {
return Objects.hash(row, col);
}
public void setRow(int row) {
this.row = row;
}
public void setCol(int col) {
this.col = col;
}
public int getRow() {
return row;
}
public int getCol() {
return col;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(row);
dataOutput.writeInt(col);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
row = dataInput.readInt();
col = dataInput.readInt();
}
}
enum IncorrectFormatReason {
NOT_A_OR_B
}
// A is 4x3, B is 3x2
class WMapper extends Mapper<LongWritable, Text, MatrixKey, Text> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] tokens = value.toString().split("\\s+");
if (tokens.length != 4) {
context.getCounter(IncorrectFormatReason.NOT_A_OR_B).increment(1);
return;
}
int row = Integer.parseInt(tokens[1]);
int col = Integer.parseInt(tokens[2]);
if (tokens[0].equals("A")) {
// C is ARowCount x BColCount. Each a[i][j] contributes to c[i][k] for 1<=k<=BColCount.
int count = Integer.parseInt(context.getConfiguration().get("BColCount"));
for (int k = 1; k <= count; k++) context.write(new MatrixKey(row, k), value);
} else if (tokens[0].equals("B")) {
// C is ARowCount x BColCount. Each b[i][j] contributes to c[k][j] for 1<=k<=ARowCount.
int count = Integer.parseInt(context.getConfiguration().get("ARowCount"));
for (int k = 1; k <= count; k++) context.write(new MatrixKey(k, col), value);
}
}
}
class WReducer extends Reducer<MatrixKey, Text, Text, LongWritable> {
@Override
public void reduce(MatrixKey key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
int row = key.getRow();
int col = key.getCol();
// The contributing entries are a[row,k] and b[k,col], 1<=k<=AColCount.
int count = Integer.parseInt(context.getConfiguration().get("AColCount"));
int[] buf1 = new int[count + 1];
int[] buf2 = new int[count + 1];
for (Text value : values) {
String[] tokens = value.toString().split("\\s+");
if (tokens.length != 4) return;
if (tokens[0].equals("A")) {
int k = Integer.parseInt(tokens[2]);
int val = Integer.parseInt(tokens[3]);
buf1[k] += val;
} else if (tokens[0].equals("B")) {
int k = Integer.parseInt(tokens[1]);
int val = Integer.parseInt(tokens[3]);
buf2[k] += val;
}
}
int sum = 0;
for (int i = 1; i <= count; i++) sum += buf1[i] * buf2[i];
context.write(new Text("C " + row + " " + col), new LongWritable(sum));
}
}
Spark installation and WordCount
The Scala version must match the JDK version or nothing runs: in this setup Scala 2.12 or later wanted JDK 9 or higher and failed on JDK 8,
so with JDK 8 use Scala 2.11.7.
The pom is as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>untitled4</artifactId>
<version>1.0-SNAPSHOT</version>
<inceptionYear>2008</inceptionYear>
<properties>
<scala.version>2.11.7</scala.version>
</properties>
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.7</version>
</dependency>
<dependency>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-eclipse-plugin -->
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.specs</groupId>
<artifactId>specs</artifactId>
<version>1.2.5</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-1.8</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<configuration>
<downloadSources>true</downloadSources>
<buildcommands>
<buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<additionalProjectnatures>
<projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
</additionalProjectnatures>
<classpathContainers>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
<classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
</classpathContainers>
</configuration>
</plugin>
</plugins>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</reporting>
</project>
Below are the Scala and the Java versions of WordCount.
package org.example
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object App {
def main(args: Array[String]): Unit = {
val sparkConf=new SparkConf().setMaster("local").setAppName("WordCount")
val sc=new SparkContext(sparkConf)
val lines:RDD[String]=sc.textFile("/Users/bytedance/Downloads/a.cpp")
val words=lines.flatMap(_.split(" "))
val wordGroup:RDD[(String,Iterable[String])]=words.groupBy(word=>word)
val array=wordGroup.map({
case (word,list)=>(word,list.size)
}).collect()
array.foreach(println)
sc.stop()
}
}
package com.zkdx;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
public class WordCountSpark {
public static void main(String[]args){
SparkConf conf = new SparkConf().setAppName("testWordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("/Users/bytedance/Downloads/a.cpp");
JavaRDD<String> words = lines.flatMap((FlatMapFunction<String, String>) s -> Arrays.asList(s.split(" ")).iterator());
JavaPairRDD<String, Integer> pairs = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<String,Integer>(s,1));
JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey((Function2<Integer, Integer, Integer>) (v1, v2) -> v1+v2);
wordCounts.foreach((VoidFunction<Tuple2<String, Integer>>) wordAndCount -> System.out.println(wordAndCount._1+wordAndCount._2));
sc.close();
}
}
Spark 2.4.7 installation
https://mirrors.ustc.edu.cn/apache/spark/spark-2.4.7/
Download the 2.4.7 tgz built for Hadoop 2.7 (a tgz can also be extracted with tar -zxvf)
and extract it to /home/tangsong.math/repos/spark-2.4.7-bin-hadoop2.7/
cd conf
cp slaves.template slaves
By default this slaves file contains only localhost.
cp spark-env.sh.template spark-env.sh
Configure it with:
export JAVA_HOME=/home/tangsong.math/jdk1.8.0_251
SPARK_MASTER_HOST=localhost
SPARK_MASTER_PORT=7077
Create a test file (words.txt, used below) in the data directory,
then run spark-shell from the bin directory.
You can see it uses Scala 2.11.12.
A SparkContext is already prepared; the console shows:
Spark context available as 'sc' (master = local[*], app id = local-1616228222799).
Spark session available as 'spark'.
Note that the working directory at this point is the data directory.
Run:
sc.textFile("words.txt").flatMap(o=>o.split(" ")).map(o=>(o,1)).reduceByKey((a,b)=>a+b).collect
To leave the shell, type :quit (note the leading colon).
Spark also has a web monitoring page (it is no longer reachable once you quit the shell):
http://10.227.26.77:4040/jobs/
Jobs can also be submitted from the command line. Below is the bundled Pi example:
bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[2] ./examples/jars/spark-examples_2.11-2.4.7.jar 10
To keep the history visible after leaving the shell, some configuration is needed. First start Hadoop.
The HDFS URL is
hdfs://n227-026-077:9000
Create a directory named /directory:
bin/hadoop fs -mkdir /directory
Copy the template file in Spark's conf directory:
cp spark-defaults.conf.template spark-defaults.conf
Add the following settings:
spark.eventLog.enabled true
spark.eventLog.dir hdfs://n227-026-077:9000/directory
Then add to spark-env.sh:
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://n227-026-077:9000/directory
-Dspark.history.retainedApplications=30"
Start Spark:
sbin/start-all.sh
sbin/start-history-server.sh
Run a job:
bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://localhost:7077 ./examples/jars/spark-examples_2.11-2.4.7.jar 10
Open
http://10.227.26.77:18080/
to see the history.
To submit jobs to a YARN cluster, add to spark-env.sh:
export HADOOP_CONF_DIR=/home/tangsong.math/repos/hadoop-3.2.1/etc/hadoop
After starting Hadoop, run from Spark's bin directory:
./spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.4.7.jar 100
To see the result, go to the Hadoop log directory:
/home/tangsong.math/repos/hadoop-3.2.1/logs/userlogs/application_1618850623125_0002/container_1618850623125_0002_01_000001
and cat stdout. You can also view this task's logs in the YARN web UI.
To submit Spark jobs remotely, change in spark-env.sh
SPARK_MASTER_HOST=localhost
to
SPARK_MASTER_HOST=n227-026-077
Now we can use:
SparkConf conf = new SparkConf().setAppName("testWordCount").setMaster("spark://10.227.26.77:7077");
There is also a default web UI (not the job history server):
http://10.227.26.77:8080/
where you can see the running applications and kill them.
If the following WARN appears
WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
and the job stays in RUNNING forever, open the web UI and check the worker's log:
http://n227-026-077:8080/app/?appId=app-20210423224539-0000
The real cause is that the worker cannot connect back to the driver (the driver is the process running on the Mac).
Spark on YARN has a client mode and a cluster mode. In client mode the driver runs on the machine that submits the job; in cluster mode the driver is part of the ApplicationMaster (AM).
In the Mac IDEA console you can see something like:
21/04/23 23:23:20 INFO NettyBlockTransferService: Server created on 192.168.1.113:51414
21/04/23 23:23:20 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 51414.
21/04/23 23:23:20 INFO NettyBlockTransferService: Server created on 192.168.1.113:51414
Running ifconfig | grep inet also shows the local IP.
Caused by: java.io.IOException: Failed to connect to /192.168.1.113:51412
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
at org.apache.spark.rpc.netty.Outbox$1.call(Outbox.scala:194)
at org.apache.spark.rpc.netty.Outbox$1.call(Outbox.scala:190)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
The fix is to set driver.host and the jars manually:
String[] jars={"/Users/bytedance/repos/untitled4/target/untitled4-1.0-SNAPSHOT.jar"};
SparkConf conf = new SparkConf().setAppName("testWordCount")
.setMaster("spark://10.227.26.77:7077")
.set("spark.executor.memory", "2g")
.set("spark.driver.host","10.254.5.116")
.setJars(jars);
The following error is discussed at https://stackoverflow.com/questions/28186607/java-lang-classcastexception-using-lambda-expressions-in-spark-job-on-remote-ser/28367602#28367602
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, 10.227.26.77, executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.f$1
A FileNotFound error usually means the worker nodes cannot read the local file, so it is best to use HDFS paths.
The error java.lang.NumberFormatException: For input string: "30s" usually means the Spark and Hadoop versions do not match (many pitfalls here).
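When chasing that kind of mismatch it helps to print the versions actually on the classpath. A small sketch (class name illustrative):
import org.apache.hadoop.util.VersionInfo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
public class VersionCheck {
    public static void main(String[] args) {
        // Hadoop version pulled in through the Maven dependencies.
        System.out.println("Hadoop on classpath: " + VersionInfo.getVersion());
        JavaSparkContext sc =
                new JavaSparkContext(new SparkConf().setAppName("versionCheck").setMaster("local"));
        System.out.println("Spark: " + sc.version());
        sc.close();
    }
}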
Run ./spark-shell and execute the code below; HDFS can be accessed directly.
sc.textFile("hdfs://10.227.26.77:9000/input/query_result.tsv").flatMap(o=>o.split(" ")).map(o=>(o,1)).reduceByKey((a,b)=>a+b).collect().map(a=>(a._2,a._1)).sortBy(_._1).reverse.map(a=>(a._2,a._1)).collect({
case o:(String,Int) => o
}
).foreach(println)
Eventually I found a dependency set that works ("Cannot resolve org.apache.hadoop:hadoop-hdfs-client:2.7.7" still appears, but it seems harmless; you can also simply delete that entry).
The HDFS driver class org.apache.hadoop.hdfs.DistributedFileSystem lives in
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.7</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-eclipse-plugin -->
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.specs</groupId>
<artifactId>specs</artifactId>
<version>1.2.5</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-1.8</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<configuration>
<downloadSources>true</downloadSources>
<buildcommands>
<buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<additionalProjectnatures>
<projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
</additionalProjectnatures>
<classpathContainers>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
<classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
</classpathContainers>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>${mainClass}</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</reporting>
</project>
Test results: setJars must be called (it ships the jar to the master), and concrete implementation classes must be used instead of lambdas, otherwise you get the SerializedLambda problem (the exact error may differ across Hadoop versions).
The output printed to the console can be found in the stdout of the corresponding application's log on Spark. After changing the code, repackage the jar, otherwise the master keeps running the old version.
http://n227-026-077:8080/
package com.zkdx;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
public class WordCountSpark {
public static void main(String[]args) throws ClassNotFoundException {
Class.forName("org.apache.hadoop.hdfs.DistributedFileSystem");
String[] jars={"/Users/bytedance/repos/untitled4/target/untitled4-1.0-SNAPSHOT-jar-with-dependencies.jar"};
SparkConf conf = new SparkConf().setAppName("testWordCount")
.setMaster("spark://10.227.26.77:7077")
.set("spark.executor.memory", "2g")
.set("spark.driver.host","10.254.35.95")
.setJars(jars);
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("hdfs://10.227.26.77:9000/input/query_result.tsv");
JavaRDD<String> words = lines.flatMap(new Helper1());
JavaPairRDD<String, Integer> pairs = words.mapToPair(new Helper2());
JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Helper3());
wordCounts.foreach(new Helper4());
sc.close();
}
}
class Helper1 implements FlatMapFunction<String, String>{
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}}
class Helper2 implements PairFunction<String, String, Integer>{
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String,Integer>(s,1);
}
}
class Helper3 implements Function2<Integer, Integer, Integer>{
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}}
class Helper4 implements VoidFunction<Tuple2<String, Integer>>{
@Override
public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
System.out.println(stringIntegerTuple2._1+stringIntegerTuple2._2);
}}
SparkSQL
Copy Hive's hive-site.xml into Spark's conf directory, then run spark-sql from Spark's bin directory to enter the SQL shell.
A problematic case:
java.sql.SQLException: Cannot convert column 1 to integer
java.lang.NumberFormatException: For input string: "key_date1.id"
The returned result looks like a CSV whose first row holds the column names:
key_date1.id key_date1.name key_date1.start_date
The column types are int, string, string, so both jd.show() and jd.collectAsList() fail because the first column name in that header row cannot be converted to int.
public class WordCountSpark {
public static void main(String[] args) throws ClassNotFoundException {
SparkConf conf = new SparkConf().setAppName("testSparkSQL").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
Class.forName("org.apache.hadoop.hdfs.DistributedFileSystem");
SQLContext sqlContext = new SQLContext(sc);
String url = "jdbc:hive2://n227-026-077:10000/analysis";
Properties connectionProperties = new Properties();
connectionProperties.put("user", "root");
connectionProperties.put("password", "");
connectionProperties.put("driver", "org.apache.hive.jdbc.HiveDriver");
Dataset<Row> jd = sqlContext.read().jdbc(url, "key_date1", connectionProperties);
System.out.println(jd.count());
List<Row> rows = jd.collectAsList();
for (int i = 0; i < rows.size(); i++)
if (i == 0)
System.out.println(
String.format(
"%s %s %s",
rows.get(0).getString(1), rows.get(0).getString(1), rows.get(0).getString(2)));
else
System.out.println(
new KeyDate(rows.get(i).getInt(1), rows.get(i).getString(2), rows.get(i).getString(3)));
}
public static class KeyDate implements Serializable {
private Integer id;
private String name;
private String startDate;
public KeyDate() {}
public void setId(Integer id) {
this.id = id;
}
public void setName(String name) {
this.name = name;
}
public void setStartDate(String startDate) {
this.startDate = startDate;
}
public KeyDate(Integer id, String name, String startDate) {
this.id = id;
this.name = name;
this.startDate = startDate;
}
@Override
public String toString() {
return "KeyDate{"
+ "id="
+ id
+ ", name='"
+ name
+ '\''
+ ", startDate='"
+ startDate
+ '\''
+ '}';
} }
}
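One call that does work on this Dataset is printSchema(), because it only uses the JDBC metadata and never triggers the row conversion that makes show() and collectAsList() fail here. Adding it right after the read().jdbc(...) call in main is a debugging aid, not a fix:
// Schema as reported through hive-jdbc metadata; no rows are fetched,
// so this does not hit the "Cannot convert column 1 to integer" error.
jd.printSchema();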
Switching to SparkSession does not work either:
SparkSession sparkSession =
SparkSession.builder().appName("HiveMySQLApp").master("local").getOrCreate();
String url = "jdbc:hive2://n227-026-077:10000/analysis";
Properties connectionProperties = new Properties();
connectionProperties.put("user", "root");
connectionProperties.put("password", "");
connectionProperties.put("driver", "org.apache.hive.jdbc.HiveDriver");
Dataset<Row> jd = sparkSession.read().jdbc(url, "key_date1", connectionProperties);
jd.collectAsList();
On the HiveServer2 page we can see that the generated SQL is
SELECT "key_date1.id","key_date1.name","key_date1.start_date" FROM key_date1
Note the double quotes: they turn the column references into string literals, so the query returns as many rows as key_date1 has, each holding the same three strings.
Switching to JavaRDD does not help either:
SparkSession sparkSession =
SparkSession.builder().appName("HiveMySQLApp").master("local").getOrCreate();
String url = "jdbc:hive2://n227-026-077:10000";
Properties connectionProperties = new Properties();
connectionProperties.put("user", "root");
connectionProperties.put("password", "");
connectionProperties.put("driver", "org.apache.hive.jdbc.HiveDriver");
sparkSession
.read()
.jdbc(url, "analysis.key_date1", connectionProperties)
.createOrReplaceTempView("view");
JavaRDD<Row> javaRDD = sparkSession.table("view").javaRDD();
javaRDD.foreach(
row -> System.out.println(new KeyDate(row.getInt(1), row.getString(2), row.getString(3))));
It looked like conflicting code inside the packages (after decompiling the Scala spark-sql jar I found the method that throws the exception), but then why does running spark-sql from the command line not have this problem?
Overwrite the jars in the local m2 repository with the jars from the spark-2.4.7 directory on the server:
/Users/bytedance/.m2/repository/org/apache/spark/spark-core_2.11/2.4.7
scp tangsong.math@10.227.26.77:/repos/spark-2.4.7-bin-hadoop2.7/jars/spark-sql_2.11-2.4.7.jar ./
/Users/bytedance/.m2/repository/org/spark-project/hive/hive-jdbc/1.2.1.spark2
scp tangsong.math@10.227.26.77:/repos/spark-2.4.7-bin-hadoop2.7/jars/hive-jdbc-1.2.1.spark2.jar ./
In the end it turned out to be a configuration problem: the hive-site.xml under resources was not written correctly.
I also switched to Hive 1.2.2 and Hadoop 2.7.7.
Note that the paths in the hive-site.xml given to Spark are local paths. Add the following settings. (If you run against the Spark deployed on the dev machine itself, the MySQL ip:port can be written as localhost:3306; the hive.downloaded.resources.dir at the bottom must be adjusted accordingly.)
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://10.227.26.77:3306/hive?createDatabaseIfNotExist=true&amp;serverTimezone=Asia/Shanghai</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.cj.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
<property>
<name>hive.querylog.location</name>
<value>/data/hive_repo/querylog</value>
</property>
<property>
<name>hive.exec.local.scratchdir</name>
<value>/Users/bytedance/repos/scratchdir</value>
</property>
<property>
<name>hive.downloaded.resources.dir</name>
<value>/Users/bytedance/repos/resources</value>
</property>
public static void main(String[] args) {
String url = "jdbc:hive2://n227-026-077:10000";
SparkSession sparkSession =
SparkSession.builder()
.appName("HiveMySQLApp")
.config("url", url)
.enableHiveSupport()
.master("local")
.getOrCreate();
sparkSession.sql("select* from analysis.key_date").show();}
The row indices read by Spark SQL start from 0:
List<Row> rows = sparkSession.sql("select * from analysis.key_date").collectAsList();
for(Row row:rows){
System.out.println(new KeyDate(row.getInt(0),row.getString(1),row.getString(2)));
}
Final configuration
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>untitled4</artifactId>
<version>1.0-SNAPSHOT</version>
<inceptionYear>2008</inceptionYear>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<mainClass>com.zkdx.WordCountSpark</mainClass>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<hadoop.version>2.7.3</hadoop.version>
<spark.version>2.3.2</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1</version>
</dependency>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs-client -->
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-eclipse-plugin -->
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.specs</groupId>
<artifactId>specs</artifactId>
<version>1.2.5</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<compilerArgument>-proc:none</compilerArgument>
</configuration>
</plugin>
<plugin>
<groupId>com.coveo</groupId>
<artifactId>fmt-maven-plugin</artifactId>
<version>2.5.1</version>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<!-- prevent the generated files from overwriting our customized ones -->
<addMavenDescriptor>false</addMavenDescriptor>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"></transformer>
</transformers><createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
You can also create tables; in that case the user name must be specified, otherwise you run into permission problems (HDFS is accessed directly, hence the Hadoop user name).
A CREATE TABLE ... AS SELECT statement cannot create an external table; an external table must be given an explicit path (see the sketch below; the managed-table CTAS example follows it).
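A sketch of the external-table form, using the same sparkSession as the CTAS example that follows; the column list and HDFS location are illustrative:
// External tables need an explicit LOCATION; CREATE TABLE ... AS SELECT cannot be used here.
sparkSession.sql(
    "create external table analysis.key_date_ext (id int, name string, start_date string) "
        + "row format delimited fields terminated by '\\t' "
        + "location 'hdfs://n227-026-077:9000/warehouse/key_date_ext'");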
String url = "jdbc:hive2://n227-026-077:10000";
System.getProperties().setProperty("HADOOP_USER_NAME", "root");
SparkSession sparkSession =
SparkSession.builder()
.appName("HiveMySQLApp")
.config("url", url)
.enableHiveSupport()
.master("local")
.getOrCreate();
sparkSession.sql(
"create table analysis.key_date1 as select* from analysis.key_date where id>10");运行pyspark
prints something like
Using Python version 2.7.13 (default, Sep 26 2018 18:42:22)
SparkSession available as 'spark'.
Enter:
spark.sql('select * from analysis.key_date1').show()
