选择镜像下载压缩包

http://mirrors.hust.edu.cn/apache/hadoop/core/hadoop-3.2.1/
解压后配置etc/hadoop下的各个配置文件.jdk的地址以及主机名根据实际情况决定

修改配置文件

hadoop-env.sh
JAVA_HOME=/home/tangsong.math/jdk1.8.0_251
HADOOP_HOME=/home/tangsong.math/repos/hadoop-3.2.1
core-site.xml
<property><name>fs.defaultFS</name><value>hdfs://n227-026-077:9000</value></property>
<property><name>hadoop.tmp.dir</name><value>/home/tangsong.math/repos/hadoop-3.2.1/data</value></property>

(3.2.1版本只有mapred-site.xml但是2.10.1版本只有mapred-site.xml.template)
cp mapred-site.xml.template mapred-site.xml
mapred-site.xml
<property><name>mapreduce.framework.name</name><value>yarn</value></property>
<property><name>mapreduce.jobhistory.address</name><value>n227-026-077:10020</value></property>
<property><name>mapreduce.jobhistory.webapp.address</name><value>n227-026-077:19888</value></property>
<property><name>yarn.app.mapreduce.am.env</name><value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value></property>
<property><name>mapreduce.map.env</name><value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value></property>
<property><name>mapreduce.reduce.env</name><value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value></property>

yarn-site.xml
<property><name>yarn.resourcemanager.hostname</name><value>n227-026-077</value></property>
<property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>
#配置本机ssh免密登陆 
cd ~/.ssh/
ssh-keygen -t rsa     //提示按ENTER就行
cat./id_rsa.pub>>./authorized_key         //加入授权
#启动hadoop
在hadoop的bin目录执行
sudo ./hdfs namenode -format  
sbin目录执行 
./start-all.sh
用jps命令看一下,应该有下面几个进程.如果没有正确启动,到hadoop/logs下看看日志,通常是因为没有权限等问题.
 DataNode ResourceManager SecondaryNameNode NameNode,NodeManager
查看yarn的历史任务(hadoop3才是默认8088端口)
http://10.227.26.77:8088/cluster
查看集群情况
http://10.227.26.77:9870/dfshealth.html#tab-overview

maven工程配置

 <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs-client</artifactId>
            <version>3.2.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.1</version>
        </dependency>


        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.2.1</version>
        </dependency>

在resource下新建一个log4j.properties内容如下
log4j.rootLogger=DEBUG, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

WordCount

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


import java.io.IOException;

public class Foo {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf=new Configuration();
        conf.set("fs.defaultFS","hdfs://10.227.26.77:9000");
        conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
        System.getProperties().setProperty("HADOOP_USER_NAME","root");

        FileSystem fs=FileSystem.get(conf);
        String inputPath="/hdfsapi/input";
        String outputPath="/hdfsapi/output";
        fs.mkdirs(new Path(inputPath));
        fs.copyFromLocalFile(new Path("/Users/bytedance/repos/test.py"),new Path(inputPath));
        fs.close();

        Job job= Job.getInstance(conf,"word count");

        job.setJarByClass(Foo.class);
        job.setMapperClass(WMapper.class);
        job.setReducerClass(WReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job,new Path(inputPath));
        FileOutputFormat.setOutputPath(job,new Path(outputPath));

        job.waitForCompletion(true);

    }

}

class WMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private LongWritable one = new LongWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] tokens = value.toString().split("\\s+");
        for (String token : tokens) {
            word.set(token);
            context.write(word, one);
        }

    }
}

class WReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (LongWritable val : values) count++;
        context.write(key, new LongWritable(count));
    }
}

矩阵乘法

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

public class Foo {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://10.227.26.77:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");

        conf.set("ARowCount", "4");
        conf.set("AColCount", "3");
        conf.set("BRowCount", "3");
        conf.set("BColCount", "2");

        System.getProperties().setProperty("HADOOP_USER_NAME", "root");

        FileSystem fs = FileSystem.get(conf);
        String inputPath = "/hdfsapi/input";
        String outputPath = "/hdfsapi/output";
        fs.mkdirs(new Path(inputPath));
        fs.copyFromLocalFile(new Path("/Users/bytedance/repos/a.json"), new Path(inputPath));
        fs.close();
        Job job = Job.getInstance(conf, "matrix multiply");

        job.setJarByClass(Foo.class);
        job.setMapperClass(WMapper.class);
        job.setReducerClass(WReducer.class);

        job.setMapOutputKeyClass(MatrixKey.class);
        job.setMapOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        job.waitForCompletion(true);

    }

}

class MatrixKey implements WritableComparable<MatrixKey> {

    private int row;

    public MatrixKey() {
    }

    public MatrixKey(int row, int col) {
        this.row = row;
        this.col = col;
    }

    private int col;

    @Override
    public int compareTo(MatrixKey o) {
        if (this.row > o.row) return 1;
        else if (this.row < o.row) return -1;
        if (this.col > o.col) return 1;
        else if (this.col < o.col) return -1;
        return 0;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        MatrixKey matrixKey = (MatrixKey) o;
        return row == matrixKey.row &&
                col == matrixKey.col;
    }

    @Override
    public int hashCode() {
        return Objects.hash(row, col);
    }

    public void setRow(int row) {
        this.row = row;
    }

    public void setCol(int col) {
        this.col = col;
    }

    public int getRow() {
        return row;
    }

    public int getCol() {
        return col;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(row);
        dataOutput.writeInt(col);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        row = dataInput.readInt();
        col = dataInput.readInt();
    }
}

// A 4*3 B3*2
class WMapper extends Mapper<LongWritable, Text, MatrixKey, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] tokens = value.toString().split("\\s+");
        if (tokens.length != 4) return;
        int row = Integer.parseInt(tokens[1]);
        int col = Integer.parseInt(tokens[2]);
        if (tokens[0].equals("A")) {
            //矩阵C是ARowCount*BColCount的.一个aij会对所有的cik,1<=k<=BColCount有效

            int count = Integer.parseInt(context.getConfiguration().get("BColCount"));
            for (int k = 1; k <= count; k++)
                context.write(new MatrixKey(row, k), value);
        } else if (tokens[0].equals("B")) {
            //矩阵C是RowsAndCols.ARowCount*RowsAndCols.BColCount的.一个bij会对所有的ckj,1<=ARowCount<=k
            int count = Integer.parseInt(context.getConfiguration().get("ARowCount"));
            for (int k = 1; k <= count; k++)
                context.write(new MatrixKey(k, col), value);
        }


    }
}

class WReducer extends Reducer<MatrixKey, Text, Text, LongWritable> {
    @Override
    public void reduce(MatrixKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int row = key.getRow();
        int col = key.getCol();
        //有贡献的是a[row,k]以及b[k,col],1<=k<=
        int count = Integer.parseInt(context.getConfiguration().get("AColCount"));
        int[] buf1 = new int[count + 1];
        int[] buf2 = new int[count + 1];
        for (Text value : values) {
            String[] tokens = value.toString().split("\\s+");
            if (tokens.length != 4) return;
            if (tokens[0].equals("A")) {

                int k = Integer.parseInt(tokens[2]);
                int val = Integer.parseInt(tokens[3]);
                buf1[k]+=val;
            } else if (tokens[0].equals("B")) {
                int k = Integer.parseInt(tokens[1]);
                int val = Integer.parseInt(tokens[3]);
                buf2[k]+=val;
            }
        }
        int sum=0;
        for(int i=1;i<=count;i++)sum+=buf1[i]*buf2[i];
        context.write(new Text("C "+row+" "+col),new LongWritable(sum));
    }
}

http://mirrors.hust.edu.cn/apache/hadoop/core/hadoop-3.2.1/
解压后配置etc/hadoop下的各个配置文件.jdk的地址以及主机名根据实际情况决定

任务提交到yarn队列运行

此时必须先在本地打好jar然后调用Job的setJar而不是setJarByClass

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Foo {
  public static void main(String[] args)
      throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://10.227.26.77:9000");
    conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");

    conf.set("mapreduce.framework.name", "yarn");
    conf.set("yarn.resourcemanager.hostname", "n227-026-077");
    conf.set("yarn.resourcemanager.resource-tracker.address", "n227-026-077:8031");
    conf.set("yarn.resourcemanager.address", "n227-026-077:8032");
    conf.set("yarn.resourcemanager.scheduler.address", "n227-026-077:8030");
    conf.set("yarn.resourcemanager.admin.address", "n227-026-077:8033");
    conf.set(
        "yarn.application.classpath",
        "/home/tangsong.math/repos/hadoop-3.2.1/etc/hadoop:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/common/lib/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/common/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/hdfs:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/hdfs/lib/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/hdfs/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/mapreduce/lib/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/mapreduce/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/yarn:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/yarn/lib/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/yarn/*");
    conf.set("yarn.app.mapreduce.am.env", "HADOOP_MAPRED_HOME=${HADOOP_HOME}");
    conf.set("mapreduce.map.env", "HADOOP_MAPRED_HOME=${HADOOP_HOME}");
    conf.set("mapreduce.reduce.env", "HADOOP_MAPRED_HOME=${HADOOP_HOME}");
    conf.set("HADOOP_HOME", "/home/tangsong.math/repos/hadoop-3.2.1");
    conf.set("hadoop.home.dir", "/home/tangsong.math/repos/hadoop-3.2.1");
    conf.set(
        "mapreduce.application.classpath",
        "/home/tangsong.math/repos/hadoop-3.2.1/etc/hadoop:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*:$HADOOP_MAPRED_HOME/share/hadoop/common/*:$HADOOP_MAPRED_HOME/share/hadoop/common/lib/*:$HADOOP_MAPRED_HOME/share/hadoop/yarn/*:$HADOOP_MAPRED_HOME/share/hadoop/yarn/lib/*:$HADOOP_MAPRED_HOME/share/hadoop/hdfs/*:$HADOOP_MAPRED_HOME/share/hadoop/hdfs/lib/*");

    conf.set("ARowCount", "4");
    conf.set("AColCount", "3");
    conf.set("BRowCount", "3");
    conf.set("BColCount", "2");

    System.getProperties().setProperty("HADOOP_USER_NAME", "root");

    FileSystem fs = FileSystem.get(conf);
    String inputPath = "/hdfsapi/input";
    String outputPath = "/hdfsapi/output";
    fs.mkdirs(new Path(inputPath));
    fs.copyFromLocalFile(new Path("/Users/bytedance/repos/a.json"), new Path(inputPath));
    fs.close();
    Job job = Job.getInstance(conf, "matrix multiply");
    // job.setJarByClass(Foo.class);
    job.setJar("/Users/bytedance/repos/untitled3/target/untitled3-1.0-SNAPSHOT.jar");
    job.setMapperClass(WMapper.class);
    job.setReducerClass(WReducer.class);

    job.setMapOutputKeyClass(MatrixKey.class);
    job.setMapOutputValueClass(Text.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));

    int ret = job.waitForCompletion(true) ? 0 : 1;
    System.out.println("ret= " + ret);
    org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
    org.apache.hadoop.mapreduce.Counter counter =
        counters.findCounter(IncorrectFormatReason.NOT_A_OR_B);
    System.out.println("IncorrectFormatReason.NOT_A_OR_B:" + counter.getValue());
  }
}

class MatrixKey implements WritableComparable<MatrixKey> {

  private int row;

  public MatrixKey() {}

  public MatrixKey(int row, int col) {
    this.row = row;
    this.col = col;
  }

  private int col;

  @Override
  public int compareTo(MatrixKey o) {
    if (this.row > o.row) return 1;
    else if (this.row < o.row) return -1;
    if (this.col > o.col) return 1;
    else if (this.col < o.col) return -1;
    return 0;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    MatrixKey matrixKey = (MatrixKey) o;
    return row == matrixKey.row && col == matrixKey.col;
  }

  @Override
  public int hashCode() {
    return Objects.hash(row, col);
  }

  public void setRow(int row) {
    this.row = row;
  }

  public void setCol(int col) {
    this.col = col;
  }

  public int getRow() {
    return row;
  }

  public int getCol() {
    return col;
  }

  @Override
  public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeInt(row);
    dataOutput.writeInt(col);
  }

  @Override
  public void readFields(DataInput dataInput) throws IOException {
    row = dataInput.readInt();
    col = dataInput.readInt();
  }
}

enum IncorrectFormatReason {
  NOT_A_OR_B
}
// A 4*3 B3*2
class WMapper extends Mapper<LongWritable, Text, MatrixKey, Text> {

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String[] tokens = value.toString().split("\\s+");
    if (tokens.length != 4) {
      context.getCounter(IncorrectFormatReason.NOT_A_OR_B).increment(1);
      return;
    }
    int row = Integer.parseInt(tokens[1]);
    int col = Integer.parseInt(tokens[2]);
    if (tokens[0].equals("A")) {
      // 矩阵C是ARowCount*BColCount的.一个aij会对所有的cik,1<=k<=BColCount有效

      int count = Integer.parseInt(context.getConfiguration().get("BColCount"));
      for (int k = 1; k <= count; k++) context.write(new MatrixKey(row, k), value);
    } else if (tokens[0].equals("B")) {
      // 矩阵C是RowsAndCols.ARowCount*RowsAndCols.BColCount的.一个bij会对所有的ckj,1<=ARowCount<=k
      int count = Integer.parseInt(context.getConfiguration().get("ARowCount"));
      for (int k = 1; k <= count; k++) context.write(new MatrixKey(k, col), value);
    }
  }
}

class WReducer extends Reducer<MatrixKey, Text, Text, LongWritable> {
  @Override
  public void reduce(MatrixKey key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    int row = key.getRow();
    int col = key.getCol();
    // 有贡献的是a[row,k]以及b[k,col],1<=k<=
    int count = Integer.parseInt(context.getConfiguration().get("AColCount"));
    int[] buf1 = new int[count + 1];
    int[] buf2 = new int[count + 1];
    for (Text value : values) {
      String[] tokens = value.toString().split("\\s+");
      if (tokens.length != 4) return;
      if (tokens[0].equals("A")) {

        int k = Integer.parseInt(tokens[2]);
        int val = Integer.parseInt(tokens[3]);
        buf1[k] += val;
      } else if (tokens[0].equals("B")) {
        int k = Integer.parseInt(tokens[1]);
        int val = Integer.parseInt(tokens[3]);
        buf2[k] += val;
      }
    }
    int sum = 0;
    for (int i = 1; i <= count; i++) sum += buf1[i] * buf2[i];
    context.write(new Text("C " + row + " " + col), new LongWritable(sum));
  }
}

spark安装与WordCount

scala版本与jdk版本一定要匹配否则无法运行。如果用scala 2.12版本或以上需要jdk9或更高,用jdk8报错
jdk8可以用scala 2.11.7

pom如下

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>untitled4</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.7</scala.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.4.7</version>
    </dependency>
    <dependency>
      <groupId>org.scala-tools</groupId>
      <artifactId>maven-scala-plugin</artifactId>
      <version>2.11</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-eclipse-plugin -->
    <dependency>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-eclipse-plugin</artifactId>
      <version>2.5.1</version>
    </dependency>

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.8</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>           <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>           <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>

下面分别给出scala版和java版的wordcount

package org.example

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object App {

  def main(args: Array[String]): Unit = {
    val sparkConf=new SparkConf().setMaster("local").setAppName("WordCount")
    val sc=new SparkContext(sparkConf)
    val lines:RDD[String]=sc.textFile("/Users/bytedance/Downloads/a.cpp")
    val words=lines.flatMap(_.split(" "))
    val wordGroup:RDD[(String,Iterable[String])]=words.groupBy(word=>word)
    val array=wordGroup.map({
      case (word,list)=>(word,list.size)
    }).collect()
    array.foreach(println)
    sc.stop()
  }
}
package com.zkdx;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;

public class WordCountSpark {
    public static void main(String[]args){

        SparkConf conf = new SparkConf().setAppName("testWordCount").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("/Users/bytedance/Downloads/a.cpp");
        JavaRDD<String> words = lines.flatMap((FlatMapFunction<String, String>) s -> Arrays.asList(s.split(" ")).iterator());
        JavaPairRDD<String, Integer> pairs = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<String,Integer>(s,1));
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey((Function2<Integer, Integer, Integer>) (v1, v2) -> v1+v2);
        wordCounts.foreach((VoidFunction<Tuple2<String, Integer>>) wordAndCount -> System.out.println(wordAndCount._1+wordAndCount._2));
        sc.close();

    }

}

spark 2.4.7安装

https://mirrors.ustc.edu.cn/apache/spark/spark-2.4.7/
下载2.4.7 on hadoop2.7 版本的tgz文件. tgz的解压也可以用tar -zxvf
解压到/home/tangsong.math/repos/spark-2.4.7-bin-hadoop2.7/
cd conf
cp slaves.template slaves
默认这个slaves只有一个localhost

cp spark-env.sh.template spark-env.sh
配置为
export JAVA_HOME=/home/tangsong.math/jdk1.8.0_251
SPARK_MASTER_HOST=localhost
SPARK_MASTER_PORT=7077

我们在data目录新建一个文件
然后执行bin目录下的spark-shell
可以看到使用的是scala 2.11.12
已经预先准备好一个SparkContext.控制台会显示
Spark context available as 'sc' (master = local[*], app id = local-1616228222799).
Spark session available as 'spark'.
注意,此时的目录就是data目录.
执行
sc.textFile("words.txt").flatMap(o=>o.split(" ")).map(o=>(o,1)).reduceByKey((a,b)=>a+b).collect
要退出命令行环境可以输入:quit 注意有个冒号
spark自然也有一个web监控页面(退出命令行的话这个页面就打不开了)
http://10.227.26.77:4040/jobs/

提交任务,也可以用命令行.下面是一个自带的计算pi的例子
bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[2] ./examples/jars/spark-examples_2.11-2.4.7.jar 10

为了使得退出命令行后仍然可以看历史记录,需要进行一些配置.首先启动hadoop.
访问hdfs的url为
hdfs://n227-026-077:9000
新建一个directory目录
bin/hadoop fs -mkdir /directory
在spark的conf目录下复制一下文件
cp spark-defaults.conf.template spark-defaults.conf
新增配置项

spark.eventLog.enabled true
spark.eventLog.dir hdfs://n227-026-077:9000/directory
再在spark-env.sh中新增配置
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://n227-026-077:9000/directory
-Dspark.history.retainedApplication=30"

启动spark
sbin/start-all.sh
sbin/start-history-server.sh

执行一个任务
bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://localhost:7077 ./examples/jars/spark-examples_2.11-2.4.7.jar 10
打开
http://10.227.26.77:18080/
即可看到历史记录.

为了向yarn cluster提交任务,需要修改spark-env.sh
export HADOOP_CONF_DIR=/home/tangsong.math/repos/hadoop-3.2.1/etc/hadoop
启动Hadoop后在spark的bin目录下执行

./spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.4.7.jar 100

要查看结果需要到Hadoop的日志目录中
/home/tangsong.math/repos/hadoop-3.2.1/logs/userlogs/application_1618850623125_0002/container_1618850623125_0002_01_000001
cat stdout即可查看.当然也可以在yarn webui中查看这个task的日志

如果我们希望远程提交spark任务需要将spark-env.sh中的
SPARK_MASTER_HOST=localhost
改为
SPARK_MASTER_HOST=n227-026-077
现在我们可以
SparkConf conf = new SparkConf().setAppName("testWordCount").setMaster("spark://10.227.26.77:7077");

另外有一个默认的webui(不是jobhistory)
http://10.227.26.77:8080/
这里可以看正在运行的任务,也可以kill掉他们
如果出现下面的WARN
WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
然后任务一直running,打开webui查看worker的log
http://n227-026-077:8080/app/?appId=app-20210423224539-0000
实际上是因为worker连不上driver(driver就是在mac上运行那个)
spark on yarn有client和cluster两种模式.在client模式下,driver运行在提交任务的机器上.在cluster模式下driver是AM(ApplicationMaster)的一部分
在我们的mac idea控制台上可以看到类似于
21/04/23 23:23:20 INFO NettyBlockTransferService: Server created on 192.168.1.113:51414
21/04/23 23:23:20 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 51414.
21/04/23 23:23:20 INFO NettyBlockTransferService: Server created on 192.168.1.113:51414
如果ifconfig | grep inet也可以看到本机ip.
Caused by: java.io.IOException: Failed to connect to /192.168.1.113:51412
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
at org.apache.spark.rpc.netty.Outbox$1.call(Outbox.scala:194)
at org.apache.spark.rpc.netty.Outbox$1.call(Outbox.scala:190)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
解决是手动设置driver.host以及jar
String[] jars={"/Users/bytedance/repos/untitled4/target/untitled4-1.0-SNAPSHOT.jar"};
SparkConf conf = new SparkConf().setAppName("testWordCount")
.setMaster("spark://10.227.26.77:7077")
.set("spark.executor.memory", "2g")
.set("spark.driver.host","10.254.5.116")
.setJars(jars);

下面报错可以参考https://stackoverflow.com/questions/28186607/java-lang-classcastexception-using-lambda-expressions-in-spark-job-on-remote-ser/28367602#28367602
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, 10.227.26.77, executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDDLike$fn1.ffn1

报错FileNotFound则可能是由于worker节点没有读到本地文件,所以最好用hdfs
报错java.lang.NumberFormatException: For input string: "30s"一般是spark与hadoop版本不匹配(很多坑)
./spark-shell 执行下面的代码,我们发现可以直接访问hdfs.
sc.textFile("hdfs://10.227.26.77:9000/input/query_result.tsv").flatMap(o =>o.split(" ")).map(o=>(o,1)).reduceByKey((a,b)=>a+b).collect().map(a=>(a.2,a._1)).sortBy(._1).reverse.map(a=>(a._2,a._1)).collect({
case o:(String,Int) => o
}
).foreach(println)

最终找到一个可以运行的版本(Cannot resolve org.apache.hadoop:hadoop-hdfs-client:2.7.7不过好像没影响?或者直接把这一项删掉)
HDFS的驱动类org.apache.hadoop.hdfs.DistributedFileSystem在
<groupid>org.apache.hadoop</groupid>
<artifactid>hadoop-hdfs</artifactid>
中.

<project xmlns="http&#58;&#47;&#47;maven&#46;apache&#46;org&#47;POM&#47;4&#46;0&#46;0" xmlns&#58;xsi="http&#58;&#47;&#47;www&#46;w3&#46;org&#47;2001&#47;XMLSchema&#45;instance" xsi&#58;schemalocation="http&#58;&#47;&#47;maven&#46;apache&#46;org&#47;POM&#47;4&#46;0&#46;0 http&#58;&#47;&#47;maven&#46;apache&#46;org&#47;maven&#45;v4&#95;0&#95;0&#46;xsd"> <modelversion>4.0.0</modelversion> <groupid>org.example</groupid> <artifactid>untitled4</artifactid> <version>1.0-SNAPSHOT</version> <inceptionyear>2008</inceptionyear> <properties> <scala.version>2.11.7</scala.version> <project.build.sourceencoding>UTF-8</project.build.sourceencoding> <mainclass>com.zkdx.WordCountSpark</mainclass> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <hadoop.version>2.7.7</hadoop.version> </properties> <repositories> <repository> <id>scala-tools.org</id> <name>Scala-Tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </repository> </repositories> <pluginrepositories> <pluginrepository> <id>scala-tools.org</id> <name>Scala-Tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </pluginrepository> </pluginrepositories> <dependencies>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.4.7</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs-client</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.scala-tools</groupId>
  <artifactId>maven-scala-plugin</artifactId>
  <version>2.11</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>${hadoop.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs-client -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs-client</artifactId>
  <version>${hadoop.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>${hadoop.version}</version>
</dependency>


<!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-eclipse-plugin -->
<dependency>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-eclipse-plugin</artifactId>
  <version>2.5.1</version>
</dependency>

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>${scala.version}</version>
</dependency>
<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.4</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.specs</groupId>
  <artifactId>specs</artifactId>
  <version>1.2.5</version>
  <scope>test</scope>
</dependency>
</dependencies> <build> <sourcedirectory>src/main/java</sourcedirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
  <plugin>
    <groupId>org.scala-tools</groupId>
    <artifactId>maven-scala-plugin</artifactId>
    <executions>
      <execution>
        <goals>
          <goal>compile</goal>
          <goal>testCompile</goal>
        </goals>
      </execution>
    </executions>
    <configuration>
      <scalaVersion>${scala.version}</scalaVersion>
      <args>
        <arg>-target:jvm-1.8</arg>
      </args>
    </configuration>
  </plugin>
  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-eclipse-plugin</artifactId>
    <configuration>
      <downloadSources>true</downloadSources>
      <buildcommands>
        <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
      </buildcommands>
      <additionalProjectnatures>
        <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
      </additionalProjectnatures>
      <classpathContainers>
        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
        <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
      </classpathContainers>
    </configuration>
  </plugin>

  <plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
      <archive>
        <manifest>
          <mainClass>${mainClass}</mainClass>
        </manifest>
      </archive>
      <descriptorRefs>
        <descriptorRef>jar-with-dependencies</descriptorRef>
      </descriptorRefs>
    </configuration>

    <executions>
      <execution>
        <id>make-assembly</id>
        <phase>package</phase>
        <goals>
          <goal>assembly</goal>
        </goals>
      </execution>
    </executions>
  </plugin>

</plugins>
</build> <reporting> <plugins> <plugin> <groupid>org.scala-tools</groupid> <artifactid>maven-scala-plugin</artifactid> <configuration> <scalaversion>${scala.version}</scalaversion> </configuration> </plugin> </plugins> </reporting> </project>

测试结果:必须调用setJars(将jar上传到master)必须用实际的实现类而不是lambda否则会有SerializedLambda的问题(不同hadoop版本报的错可能还不同)
运行结果可以在spark对应application的日志的stdout看到打印到控制台的内容.改了代码的话要重新打包才会使得master上运行的是新版代码.
http://n227-026-077:8080/
package com.zkdx;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;

public class WordCountSpark {
public static void main(String[]args) throws ClassNotFoundException {
Class.forName("org.apache.hadoop.hdfs.DistributedFileSystem");
String[] jars={"/Users/bytedance/repos/untitled4/target/untitled4-1.0-SNAPSHOT-jar-with-dependencies.jar"};
SparkConf conf = new SparkConf().setAppName("testWordCount")
.setMaster("spark://10.227.26.77:7077")
.set("spark.executor.memory", "2g")
.set("spark.driver.host","10.254.35.95")
.setJars(jars);
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<string> lines = sc.textFile("hdfs://10.227.26.77:9000/input/query_result.tsv");
JavaRDD<string> words = lines.flatMap(new Helper1());
JavaPairRDD<String, Integer> pairs = words.mapToPair(new Helper2());
JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Helper3());
wordCounts.foreach(new Helper4());
sc.close();</string></string>

}

}

class Helper1 implements FlatMapFunction<String, String>{

@Override
public Iterator<String> call(String s) throws Exception {
    return Arrays.asList(s.split(" ")).iterator();
}

}
class Helper2 implements PairFunction<String, String, Integer>{
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String,Integer>(s,1);
}
}
class Helper3 implements Function2<Integer, Integer, Integer>{

@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
    return integer+integer2;
}

}

class Helper4 implements VoidFunction<Tuple2<String, Integer>>{

@Override
public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
    System.out.println(stringIntegerTuple2._1+stringIntegerTuple2._2);
}

}

SparkSQL
将hive的hive-site.xml拷到spark的conf目录下从spark的bin目录执行spark-sql即可进入.

有问题的case.
java.sql.SQLException: Cannot convert column 1 to integerjava.lang.NumberFormatException: For input string: "key_date1.id"
看起来返回结果类似于一个csv结构,最上面一行是字段名
key_date1.id key_date1.name key_date1.start_date
字段类型分别为int,string,string所以调用jd.show()或者jd.collectAsList()都会因为最上面一行的字段名中首个字段名不能转为int报错.

public class WordCountSpark {
public static void main(String[] args) throws ClassNotFoundException {

SparkConf conf = new SparkConf().setAppName("testSparkSQL").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
Class.forName("org.apache.hadoop.hdfs.DistributedFileSystem");

SQLContext sqlContext = new SQLContext(sc);

String url = "jdbc:hive2://n227-026-077:10000/analysis";

Properties connectionProperties = new Properties();
connectionProperties.put("user", "root");
connectionProperties.put("password", "");
connectionProperties.put("driver", "org.apache.hive.jdbc.HiveDriver");

Dataset<Row> jd = sqlContext.read().jdbc(url, "key_date1", connectionProperties);
System.out.println(jd.count());
List<Row> rows = jd.collectAsList();
for (int i = 0; i < rows.size(); i++)
  if (i == 0)
    System.out.println(
        String.format(
            "%s %s %s",
            rows.get(0).getString(1), rows.get(0).getString(1), rows.get(0).getString(2)));
  else
    System.out.println(
        new KeyDate(rows.get(i).getInt(1), rows.get(i).getString(2), rows.get(i).getString(3)));

}

public static class KeyDate implements Serializable {
private Integer id;
private String name;
private String startDate;

public KeyDate() {}

public void setId(Integer id) {
  this.id = id;
}

public void setName(String name) {
  this.name = name;
}

public void setStartDate(String startDate) {
  this.startDate = startDate;
}

public KeyDate(Integer id, String name, String startDate) {
  this.id = id;
  this.name = name;
  this.startDate = startDate;
}

@Override
public String toString() {
  return "KeyDate{"
      + "id="
      + id
      + ", name='"
      + name
      + '\''
      + ", startDate='"
      + startDate
      + '\''
      + '}';
}

}
}

改用SparkSession也不行
SparkSession sparkSession =
SparkSession.builder().appName("HiveMySQLApp").master("local").getOrCreate();

String url = "jdbc:hive2://n227-026-077:10000/analysis";

Properties connectionProperties = new Properties();
connectionProperties.put("user", "root");
connectionProperties.put("password", "");
connectionProperties.put("driver", "org.apache.hive.jdbc.HiveDriver");

Dataset<Row> jd = sparkSession.read().jdbc(url, "key_date1", connectionProperties);
jd.collectAsList();

在hive server2的页面上我们看到生成的sql是
SELECT "key_date1.id","key_date1.name","key_date1.start_date" FROM key_date1
注意这里的双引号,这会导致生成key_date表行数那么多的相同的3个字符串.

换成javaRDD也没用
SparkSession sparkSession =
SparkSession.builder().appName("HiveMySQLApp").master("local").getOrCreate();

String url = "jdbc:hive2://n227-026-077:10000";

Properties connectionProperties = new Properties();
connectionProperties.put("user", "root");
connectionProperties.put("password", "");
connectionProperties.put("driver", "org.apache.hive.jdbc.HiveDriver");

sparkSession
    .read()
    .jdbc(url, "analysis.key_date1", connectionProperties)
    .createOrReplaceTempView("view");
JavaRDD<Row> javaRDD = sparkSession.table("view").javaRDD();
javaRDD.foreach(
    row -> System.out.println(new KeyDate(row.getInt(1), row.getString(2), row.getString(3))));

似乎是包中代码冲突(反编译scala的spark-sql包后发现调用的会抛出异常的方法).但是为什么直接运行命令行的spark-sql没有这个问题
用spark2.4.7目录下的jar覆盖本地m2目录中的jar
/Users/bytedance/.m2/repository/org/apache/spark/spark-core_2.11/2.4.7
scp tangsong.math@10.227.26.77:/repos/spark-2.4.7-bin-hadoop2.7/jars/spark-sql_2.11-2.4.7.jar ./
/Users/bytedance/.m2/repository/org/spark-project/hive/hive-jdbc/1.2.1.spark2
scp tangsong.math@10.227.26.77:
/repos/spark-2.4.7-bin-hadoop2.7/jars/hive-jdbc-1.2.1.spark2.jar ./

事后证明是配置文件不对.resources目录下的hive-site.xml文件没写对.
更换了hive1.2.2以及hadoop2.7.7版本
注意给spark用的hive-site.xml中路径为本地路径.文件新增如下配置.(如果在开发机上部署的远程spark运行,则mysql的ip:port可以写成localhost:3306.最下面的hive.downloaded.resources.dir也要相应修改.
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://10.227.26.77:3306/hive?createDatabaseIfNotExist=true&serverTimezone=Asia/Shanghai</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.cj.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>

<property>
    <name>hive.querylog.location</name>
    <value>/data/hive_repo/querylog</value>
</property>
<property>
    <name>hive.exec.local.scratchdir</name>
    <value>/Users/bytedance/repos/scratchdir</value>
</property>
<property>
    <name>hive.downloaded.resources.dir</name>
    <value>/Users/bytedance/repos/resources</value>
</property>

public static void main(String[] args) {
String url = "jdbc:hive2://n227-026-077:10000";

SparkSession sparkSession =
    SparkSession.builder()
        .appName("HiveMySQLApp")
        .config("url", url)
        .enableHiveSupport()
        .master("local")
        .getOrCreate();
sparkSession.sql("select* from analysis.key_date").show();

}

spark sql读取到的下标从0开始
List<row> rows = sparkSession.sql("select* from analysis.key_date").collectAsList();
for(Row row:rows){
System.out.println(new KeyDate(row.getInt(0),row.getString(1),row.getString(2)));
}</row>

最终配置
<project xmlns="http&#58;&#47;&#47;maven&#46;apache&#46;org&#47;POM&#47;4&#46;0&#46;0" xmlns&#58;xsi="http&#58;&#47;&#47;www&#46;w3&#46;org&#47;2001&#47;XMLSchema&#45;instance" xsi&#58;schemalocation="http&#58;&#47;&#47;maven&#46;apache&#46;org&#47;POM&#47;4&#46;0&#46;0 http&#58;&#47;&#47;maven&#46;apache&#46;org&#47;maven&#45;v4&#95;0&#95;0&#46;xsd">
<modelversion>4.0.0</modelversion>
<groupid>org.example</groupid>
<artifactid>untitled4</artifactid>
<version>1.0-SNAPSHOT</version>
<inceptionyear>2008</inceptionyear>
</project>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<mainClass>com.zkdx.WordCountSpark</mainClass>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<hadoop.version>2.7.3</hadoop.version>
<spark.version>2.3.2</spark.version>
<dependencies> <dependency> <groupid>mysql</groupid> <artifactid>mysql-connector-java</artifactid> <version>8.0.16</version> </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>1.2.1</version>
</dependency>






<dependency> <!-- Spark dependency -->
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>

<dependency>

  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>${spark.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs-client</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
    </exclusion>
  </exclusions>
</dependency>


<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>${hadoop.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs-client -->


<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>${hadoop.version}</version>
</dependency>


<!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-eclipse-plugin -->
<dependency>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-eclipse-plugin</artifactId>
  <version>2.5.1</version>
</dependency>


<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.4</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.specs</groupId>
  <artifactId>specs</artifactId>
  <version>1.2.5</version>
  <scope>test</scope>
</dependency>
</dependencies> <build> <plugins> <plugin> <artifactid>maven-compiler-plugin</artifactid> <version>3.7.0</version> <configuration> <source>1.8</source> <target>1.8</target> <compilerargument>-proc:none</compilerargument>
    </configuration>
  </plugin>
  <plugin>
    <groupId>com.coveo</groupId>
    <artifactId>fmt-maven-plugin</artifactId>
    <version>2.5.1</version>
    <executions>
      <execution>
        <goals>
          <goal>check</goal>
        </goals>
      </execution>
    </executions>
  </plugin>
  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <configuration>
      <archive>
        <!--放置生成的文件覆盖我们自定义的文件-->
        <addMavenDescriptor>false</addMavenDescriptor>
      </archive>
    </configuration>
  </plugin>
  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.1</version>
    <executions>
      <execution>
        <phase>package</phase>
        <goals>
          <goal>shade</goal>
        </goals>
        <configuration>
          <transformers>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"></transformer>
          </transformers><createDependencyReducedPom>false</createDependencyReducedPom>
        </configuration>
      </execution>
    </executions>
  </plugin>

</plugins>
</configuration> </plugin>

也可以创建表,此时必须指定用户名密码否则有权限问题(直接访问HDFD因此是指定hadoop用户名)
当使用create as select语句的时候不能创建external table.创建external talel必须指定path.
String url = "jdbc:hive2://n227-026-077:10000";
System.getProperties().setProperty("HADOOP_USER_NAME", "root");
SparkSession sparkSession =
SparkSession.builder()
.appName("HiveMySQLApp")
.config("url", url)
.enableHiveSupport()
.master("local")
.getOrCreate();

sparkSession.sql(
    "create table analysis.key_date1 as select* from analysis.key_date where id>10");

运行pyspark
会提示
Using Python version 2.7.13 (default, Sep 26 2018 18:42:22)
SparkSession available as 'spark'.
输入
spark.sql('select* from analysis.key_date1').show()

</plugins></build>