四、Zookeeper源码解析之server启动过程

QuorumPeerMain和ZooKeeperServerMain

我们在源码中可以找到这两个类,主要区别是一个是集群模式一个是独立模式。我们在后台启动并使用jps -l的时候会发现
kafka对应的进程主类是: org.apache.zookeeper.server.quorum.QuorumPeerMain
所以我们从QuorumPeerMain开始看

启动

直接看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
try {
main.initializeAndRun(args); // 这是主要逻辑
} catch (Exception e) {
LOG.error("Unexpected exception, exiting abnormally", e);
}
LOG.info("Exiting normally");
ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue());
}
// 初始化并启动
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]); // 加载参数
}

// Start and schedule the the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
config.getDataDir(),
config.getDataLogDir(),
config.getSnapRetainCount(),
config.getPurgeInterval());
purgeMgr.start();

if (args.length == 1 && config.isDistributed()) {
runFromConfig(config); // 集群模式
} else {
LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args); // 单机模式
}
}

从上面的代码中可以看到启动的主要步骤有三个:

  1. 加载配置QuorumPeerConfig
  2. 启动DatadirCleanupManager线程
  3. 根据配置来判断是启动集群环境和单机环境。

加载配置我们略过,主要是从文件中读取properties,我们重点看后面两个,因为我们环境中主要是集群环境

DatadirCleanupManager启动过程

这个类通过使用指定的’autopurge.purgeInterval’来调度自动清除任务,从而管理快照和相应事务日志的清理。—官方说明
这个怎么理解呢?

因为Zookeeper的修改操作(增删改)都会记录一个日志,另外Zookeeper每隔一段时间会生成一次快照,这些会持续化到硬盘的文件中,
所以当运行一段时间后,文件数量和快照会非常多,而我们这个类主要任务就是定时清理这些文件。我们接着看源码。

类结构

1
2
3
4
5
6
private PurgeTaskStatus purgeTaskStatus = PurgeTaskStatus.NOT_STARTED;  // 状态: 未启动、已启动、完成三个状态
private final File snapDir; // 快照文件夹
private final File dataLogDir; // 数据日志文件夹
private final int snapRetainCount; // 快照保存数量 默认是3
private final int purgeInterval; // 清理间隔,也就是多久触发一次清理 默认是0
private Timer timer; // 定时器,负责单独起一个线程去清理

清理过程

主要是创建一个TimeTask的任务,复写run 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Override
public void run() {
LOG.info("Purge task started.");
try {
PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
} catch (Exception e) {
LOG.error("Error occurred while purging.", e);
}
LOG.info("Purge task completed.");
}
// 主要的清理逻辑
public static void purge(File dataDir, File snapDir, int num) throws IOException {
if (num < 3) {
throw new IllegalArgumentException(COUNT_ERR_MSG);
}
FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
List<File> snaps = txnLog.findNValidSnapshots(num); // 找到需要保留的文件
int numSnaps = snaps.size();
if (numSnaps > 0) {
purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1)); // 删除旧的文件
}
}
// 删除逻辑 过滤器
class MyFileFilter implements FileFilter {
private final String prefix;
MyFileFilter(String prefix) {
this.prefix = prefix;
}
public boolean accept(File f) {
if (!f.getName().startsWith(prefix + ".")) {
return false;
}
if (retainedTxnLogs.contains(f)) {
return false;
}
long fZxid = Util.getZxidFromName(f.getName(), prefix);
return fZxid < leastZxidToBeRetain; // 重点代码,删除fZxid 比最后一个有效文件小的
}

}
File[] logs = txnLog.getDataDir().listFiles(new MyFileFilter(PREFIX_LOG));
// 删除文件
for (File f : files) {
if (!f.delete()) {
....
}
}

这里说一下Zookeeper的文件结构,两个重要的文件夹,一个是dataDir 一个是dataLogDir(如果没有配置的话,默认是dataDir),分别是快照(也就是整个内存中的树形节点数据)和事物日志。
所以上面的的过滤器,会匹配对应的前缀log.和snapshot.

手动清理 ./zkCleanup.sh 其实主要原理就是调用这个类 org.apache.zookeeper.server.PurgeTxnLog “$ZOODATALOGDIR” “$ZOODATADIR” $*

单机启动过程

参考Zookeeper单机模式启动
单机启动过程中main函数与QuorumPeerMain一样,都是调用initializeAndRun–>初始化配置ServerConfig(内部使用QuorumPeerConfig,再复制属性)再启动
看主要启动逻辑runFromConfig:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
LOG.info("Starting server");
FileTxnSnapLog txnLog = null;
try {
try {
metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
config.getMetricsProviderClassName(),
config.getMetricsProviderConfiguration()); // 启动度量程序,可以参考metrics
} catch (MetricsProviderLifeCycleException error) {
throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
}
ServerMetrics.metricsProviderInitialized(metricsProvider);
txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir); // 初始化事务日志和快照管理
JvmPauseMonitor jvmPauseMonitor = null;
if (config.jvmPauseMonitorToRun) {
jvmPauseMonitor = new JvmPauseMonitor(config);
}
// 创建一个 server
final ZooKeeperServer zkServer = new ZooKeeperServer(jvmPauseMonitor, txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, config.listenBacklog, null, config.initialConfig);
// 设置状态
txnLog.setServerStats(zkServer.serverStats());

// 设置服务结束钩子 也就是ZooKeeperServerShutdownHandler ,处理结束任务会触发
final CountDownLatch shutdownLatch = new CountDownLatch(1);
zkServer.registerServerShutdownHandler(new ZooKeeperServerShutdownHandler(shutdownLatch));

// 启动AdminServer ,默认会加载JettyAdminServer,如果没有就加载一个空的DummyAdminServer
adminServer = AdminServerFactory.createAdminServer();
adminServer.setZooKeeperServer(zkServer);
adminServer.start();

boolean needStartZKServer = true;
// 配置连接的服务框架并启动
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
cnxnFactory.startup(zkServer);
// zkServer has been started. So we don't need to start it again in secureCnxnFactory.
needStartZKServer = false;
}
// 配置安全
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
secureCnxnFactory.startup(zkServer, needStartZKServer);
}
// 定时清除容器节点和临时节点
containerManager = new ContainerManager(
zkServer.getZKDatabase(),
zkServer.firstProcessor,
Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
Integer.getInteger("znode.container.maxPerMinute", 10000),
Long.getLong("znode.container.maxNeverUsedIntervalMs", 0)
);
containerManager.start();
ZKAuditProvider.addZKStartStopAuditLog();

// Watch status of ZooKeeper server. It will do a graceful shutdown
// if the server is not running or hits an internal error.
shutdownLatch.await();

shutdown();
}

步骤解析:

  1. 启动度量程序,使用反射的方式根据配置中的类名创建一个实例,监控指标,默认是DefaultMetricsProvider
  2. 创建结束钩子
  3. 创建一个AdminServer 用来管理ZooKeeperServer。AdminServer是3.5.0版本中新增特性,是一个内置的Jettry服务,它提供了一个HTTP接口为四字母单词命令。默认的,服务被启动在8080端口,并且命令被发起通过URL “/commands/[command name]”,例如,http://localhost:8080/commands/stat
  4. 配置连接的服务框架ServerCnxnFactory并启动,这是核心服务,之前是NIO实现的,后面的版本增加Netty框架实现,后面继续分析其启动过程
  5. 配置安全相关
  6. 配置定时清除容器节点和临时节点
  7. 等待结束

    集群启动过程

    集群模式的入口是 runFromConfig(config); 其实原理和单机模式类似,前面也是执行一些度量监控子进程,我们来看核心代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @Override
    public synchronized void start() {
    if (!getView().containsKey(myid)) {
    throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    loadDataBase(); // 加载数据
    startServerCnxnFactory(); // 启动网络服务
    try {
    adminServer.start();
    } catch (AdminServerException e) {
    LOG.warn("Problem starting AdminServer", e);
    System.out.println(e);
    }
    startLeaderElection();
    startJvmPauseMonitor();
    super.start();
    }
  8. 加载数据,也就是持久化在文件里面的数据
  9. 启动io服务
  10. 启动AdminServer
  11. 进行主从选举
  12. 启动一个简单的线程,健康程序是否异常

这个几个过程和独立模式有点类似,主要不同的地方就是,多了主从选举,后续我们队各个过程惊醒梳理