CentOS 7 搭建Spark2.3.1分布式集群

下载安装包

  1. 官方下载

点击这里下载,官方提供几种构建方式。为了节省时间,选择预先编译版本的hadoop。

  1. 安装前提
  • JDK8
  • ZooKeeper,安装参考这里
  • Hadoop,安装参考这里
  • Scala

注意:从Spark2.0版开始,默认使用Scala 2.11构建。Scala 2.10用户应该下载Spark源包并使用Scala2.10支持构建。

  1. 集群规划
节点名称 IP ZooKeeper Master Worker
spark-node1 192.168.50.200 ZooKeeper 主Master
spark-node2 192.168.50.201 ZooKeeper 备Master Worker
spark-node3 192.168.50.202 ZooKeeper Worker

集群安装

  1. 解压缩
1
2
[xxx@spark-node1 ~]$ tar zxvf spark-2.3.1-bin-hadoop2.7.tgz -C /opt/
[xxx@spark-node1 ~]$ sudo ln -s /opt/spark-2.31-bin-hadoop2.7 /opt/spark
  1. 修改配置文件
  1. 进入配置文件所在目录
1
2
3
4
5
6
7
8
9
10
[xxx@spark-node1 ~]$ cd /opt/spark/conf/
[xxx@spark-node1 ~]$ ll
total 36K
-rw-rw-r--. 1 galudisu galudisu 996 Jun 1 16:49 docker.properties.template
-rw-rw-r--. 1 galudisu galudisu 1.1K Jun 1 16:49 fairscheduler.xml.template
-rw-rw-r--. 1 galudisu galudisu 2.0K Jun 1 16:49 log4j.properties.template
-rw-rw-r--. 1 galudisu galudisu 7.7K Jun 1 16:49 metrics.properties.template
-rw-rw-r--. 1 galudisu galudisu 865 Jun 1 16:49 slaves.template
-rw-rw-r--. 1 galudisu galudisu 1.3K Jun 1 16:49 spark-defaults.conf.template
-rwxrwxr-x. 1 galudisu galudisu 4.2K Jun 1 16:49 spark-env.sh.template
  1. 复制spark-env.sh.template并重命名为spark-env.sh
1
2
[xxx@spark-node1 conf]$ cp spark-env.sh.template spark-env.sh
[xxx@spark-node1 conf]$ vim spark-env.sh

编辑并在文件末尾添加如下配置内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#指定默认master的ip或主机名
export SPARK_MASTER_HOST=node21
#指定maaster提交任务的默认端口为7077
export SPARK_MASTER_PORT=7077
#指定masster节点的webui端口
export SPARK_MASTER_WEBUI_PORT=8080
#每个worker从节点能够支配的内存数
export SPARK_WORKER_MEMORY=1g
#允许Spark应用程序在计算机上使用的核心总数(默认值:所有可用核心)
export SPARK_WORKER_CORES=1
#每个worker从节点的实例(可选配置)
export SPARK_WORKER_INSTANCES=1
#指向包含Hadoop集群的(客户端)配置文件的目录,运行在Yarn上配置此项
export HADOOP_CONF_DIR=/opt/module/hadoop-2.7.6/etc/hadoop
#指定整个集群状态是通过zookeeper来维护的,包括集群恢复
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=node21:2181,node22:2181,node23:2181
-Dspark.deploy.zookeeper.dir=/spark"
  1. 复制slaves.templateslaves,并修改配置内容
1
2
[xxx@spark-node1 conf]$ cp slaves.template slaves
[xxx@spark-node1 conf]$ vim slaves

修改从节点

1
2
spark-node2
spark-node3
  1. 将安装包分发给其它节点
1
2
[xxx@spark-node1 opt]$ scp -r spark-2.31-bin-hadoop2.7 xxx@spark-node2:/opt/
[xxx@spark-node1 opt]$ scp -r spark-2.31-bin-hadoop2.7 xxx@spark-node3:/opt/

修改spark-node2节点上conf/spark-env.sh配置的MasterIP为SPARK_MASTER_IP=spark-node2

  1. 配置环境变量

所有节点均要配置

1
2
3
4
5
[xxx@spark-node1 spark]$ sudo vim /etc/profile

export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
[xxx@spark-node1 spark]$ source /etc/profile

启动集群

  1. 启动ZooKeeper集群

所有ZooKeeper节点均要执行

1
[xxx@spark-node1 ~]$ zkServer.sh start
  1. 启动Hadoop集群
1
2
3
[xxx@spark-node1 ~]$ start-dfs.sh
[xxx@spark-node2 ~]$ start-yarn.sh
[xxx@spark-node3 ~]$ yarn-daemon.sh start resourcemanager
  1. 启动Spark集群

启动Spark:启动master节点:sbin/start-master.sh 启动worker节点:sbin/start-slaves.sh
或者:sbin/start-all.sh

1
[xxx@spark-node1 spark]$ sbin/start-all.sh

注意:备用master节点需要手动启动

1
[xxx@spark-node2 spark]$ sbin/start-master.sh
  1. 查看进程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
[xxx@spark-node1 spark]$ jps
2368 JobHistoryServer
2928 Jps
2227 DFSZKFailoverController
2533 Master
2006 JournalNode
1512 NodeManager
1610 NameNode
1820 DataNode

[xxx@spark-node2 spark]$ jps
2256 DFSZKFailoverController
1651 NameNode
2435 Worker
2101 JournalNode
2854 Jps
1576 NodeManager
1209 ResourceManager
1835 DataNode

[xxx@spark-node3 spark]$ jps
1796 DataNode
2436 Jps
1900 JournalNode
1613 NodeManager
2095 Worker

验证集群HA

  1. 看Web页面Master状态

spark-node1是ALIVE状态,spark-node2为STANDBY状态,WebUI查看:http://spark-node1:8080/

spark-master1
spark-master2

从节点连接地址:http://spark-node2:8081/

spark-worker1
spark-worker2

  1. 验证HA的高可用

手动干掉spark-node1上面的Master进程,spark-node2:8080将自动切换为ALIVE状态。

spark-alive

  1. HA注意点
  • 主备切换过程中不能提交Application。
  • 主备切换过程中不影响已经在集群中运行的Application。因为Spark是粗粒度资源调度。

spark-ha

集群提交命令方式

  1. Standalone模式

Standalone-client

  1. 提交命令
1
2
3
4
5
[xxx@spark-node1 spark]$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master spark://spark-node1:7077 \
--executor-memory 500m \
--total-executor-cores 1 \
examples/jars/spark-examples_2.11-2.3.1.jar 10

或者

1
2
3
4
5
6
[xxx@spark-node2 spark]$ ./bin/spark-submit --class org.apache.spark.exmaples.SparkPi \
--master spark://spark-node1:7077 \
--deploy-mode client \
--executor-memory 500m \
--total-executor-cores 1 \
examples/jars/spark-examples_2.11-2.3.1.jar 10
  1. 提交原理图解

spark-standalone

  1. 执行流程
  • client模式提交任务后,会在客户端启动Driver进程。
  • Driver会向Master申请启动Application启动的资源。
  • 资源申请成功,Driver端将task发送到worker端执行。
  • worker将task执行结果返回到Driver端。
  1. 总结

client模式使用于测试调试程序。Driver进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。在Driver端可以看到task执行的情况。生产环境下不能使用client模式,是因为:假设要提交100个Application到集群运行,Driver每次都会在client端启动,那么就会导致客户端100次网卡流量暴增的问题。

Standalone-cluster

  1. 提交命令
1
2
3
4
[xxx@spark-node1 spark]$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master spark:spark-node1:7077 \
--deploy-mode cluster \
examples/jars/spark-examples_2.11-2.3.1.jar 10
  1. 提交原理图解

spark-cluster

  1. 执行流程
  • cluster模式提交应用程序后,会向Master请求启动Driver。
  • Master接受请求,随机在集群一台节点启动Driver进程。
  • Driver启动后为当前的应用程序申请资源。
  • Driver端发送task到worker节点上执行。
  • worker将执行情况和执行结果返回给Driver端。
  1. 总结

Driver进程是在集群某一台Worker上启动的,在客户端无法查看task的执行情况的。假设要提交100个application到集群运行,每次Driver会随机在集群中某一台Worker上启动,那么这100次网卡流量暴增的问题就散步在集群上。

  1. Yarn模式

yarn-client

  1. 提交命令

以client模式启动Spark应用程序:

1
./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode client [options] <app jar> [app options]

例如

1
2
3
4
[xxx@spark-node1 spark]$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
examples/jars/spark-examples_2.11-2.3.1.jar 10
  1. 提交原理图解

spark-yarn

  1. 执行流程
  • 客户单提交一个Application,在客户端启动一个Driver进程。
  • 应用程序启动后会向RS(ResourceManager)发送请求,启动AM(ApplicationMaster)的资源。
  • RS收到请求,随机选择一台NM(NodeManager)启动AM。这里的NM相当于Standalone中的Worker节点。
  • AM启动后,会向RS请求一批container资源,用于启动Executor。
  • RS会找到一批NM返回给AM,用于启动Executor。
  • AM会向NM发送命令启动Executor。
  • Executor启动后,会反向注册给Driver,Driver发送task到Executor,执行情况和结果返回给Driver端。
  1. 总结

Yarn-client模式同样是适用于测试,因为Driver运行在本地,Driver会与yarn集群中的Executor进行大量的通信,会造成客户机网卡流量的大量增加。

ApplicationMaster的作用:

  • 为当前的Application申请资源
  • 给NodeManager发送消息启动Executor

注意:ApplicationMaster有launchExecutor和申请资源的功能,并没有作业调度的功能。

yarn-cluster

  1. 提交命令

以cluster模式启动Spark应用程序:

1
./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]

例如:

1
2
3
4
[xxx@spark-node1 spark]$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
examples/jars/sprk-examples_2.11-2.3.1.jar 10
  1. 提交原理图解

spark-yarn

  1. 执行流程
  • 客户机提交Application应用程序,发送请求到RS(ResourceManager),请求启动AM(ApplicationMaster)
  • RS收到请求后随机在一台NM(NodeManager)上启动AM(相当于Driver端)
  • AM启动,AM发送请求到RS,请求一批container用于启动Executor
  • RS返回一批NM节点给AM
  • AM连接到NM,发送请求到NM启动Executor
  • Executor反向注册到AM所在的节点的Driver,Driver发送task到Executor
  1. 总结

Yarn-Cluster主要用于生产环境中,因为Driver运行在Yarn集群中某一台nodeManager中,每次提交任务的Driver所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象,缺点是任务提交后不能看到日志。只能通过yarn查看日志。

ApplicationMaster的作用:

  • 为当前的Application申请资源
  • 给NodeManager发送消息启动Executor
  • 任务调度

停止集群任务命令: yarn application -kill applicationID

配置历史服务器

  1. 临时配置

对本次提交的应用程序起作用

1
2
3
4
./spark-shell --master spark://node21:7077 
--name myapp1
--conf spark.eventLog.enabled=true
--conf spark.eventLog.dir=hdfs://spark-node1:8020/spark/test

停止程序,在Web UI中Completed Applications对应的ApplicationID中能查看history.

  1. 永久配置

spark-default.conf配置文件中配置History Server,对所有提交的Application都起作用

在客户端节点,进入../spark/conf/spark-defaults.conf最后加入

1
2
3
4
5
6
7
8
//开启记录事件日志的功能
spark.eventLog.enabled true
//设置事件日志存储的目录
spark.eventLog.dir hdfs://node21:8020/spark/test
//设置HistoryServer加载事件日志的位置
spark.history.fs.logDirectory hdfs://node21:8020/spark/test
//日志优化选项,压缩日志
spark.eventLog.compress true

启动HistorySever:

1
./start-history-server.sh

访问HistoryServer: spark-node1:18080,之后所有提交的应用程序运行状况都会被记录。

加入Systemd

和前面ZooKeeper、HA的配置一样,将Spark的启动加入Systemd,让系统自动维护。在/usr/lib/systemd/system/spark.serivce加入

因为用了ZooKeeper集群做统一化管理,只需要master节点加入service即可。

spark-node1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[Unit]
Description=Spark, Lightning-fast unified analytics engine
After=network.target remote-fs.target nss-lookup.target network-online.target
Requires=network-online.target
Wants=hadoop.target

[Service]
User=galudisu
Group=galudisu
Type=forking
ExecStart=/opt/spark/sbin/start-all.sh &
ExecStop=/opt/spark/sbin/stop-all.sh &
RemainAfterExit=yes
Environment=SPARK_HOME=/opt/spark

[Install]
WantedBy=multi-user.target

spark-node2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[Unit]
Description=Spark, Lightning-fast unified analytics engine
After=network.target remote-fs.target nss-lookup.target network-online.target
Requires=network-online.target
Wants=hadoop.target

[Service]
User=galudisu
Group=galudisu
Type=forking
ExecStart=/opt/spark/sbin/start-master.sh &
ExecStop=/opt/spark/sbin/stop-master.sh &
RemainAfterExit=yes
Environment=SPARK_HOME=/opt/spark

[Install]
WantedBy=multi-user.target

又或者单独编写一个脚本执行。