21、客户端启动
21.1、客户端启动源码解析流程图
21.1.1、创建 ZookeeperMain
在 ZkCli.sh 启动 Zookeeper 时,会调用 ZooKeeperMain.java Ctrl + n 查找 ZooKeeperMain,找到程序的入口 main()方法
public static void main(String args[]) throws CliException, IOException, InterruptedException
{
ZooKeeperMain main = new ZooKeeperMain(args);
main.run();
}
ZooKeeperMain
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
cl.parseOptions(args);
System.out.println("Connecting to " + cl.getOption("server"));
connectToZK(cl.getOption("server"));
}
connectToZK(cl.getOption(“server”));
protected void connectToZK(String newHost) throws InterruptedException, IOException {
if (zk != null && zk.getState().isAlive()) {
zk.close();
}
host = newHost;
boolean readOnly = cl.getOption("readonly") != null;
if (cl.getOption("secure") != null) {
System.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
System.out.println("Secure connection is enabled");
}
zk = new ZooKeeperAdmin(host, Integer.parseInt(cl.getOption("timeout")), new MyWatcher(), readOnly);
}
ZooKeeperAdmin
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly) throws IOException {
this(connectString, sessionTimeout, watcher, canBeReadOnly,
createDefaultHostProvider(connectString));
}
this(connectString, sessionTimeout, watcher, canBeReadOnly, createDefaultHostProvider(connectString))
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly, HostProvider aHostProvider)
throws IOException {
this(connectString, sessionTimeout, watcher, canBeReadOnly,aHostProvider, null);
}
21.1.2、初始化监听器
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly, HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
hostProvider = aHostProvider;
cnxn = createConnection(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}
21.1.3、解析连接地址
ConnectStringParser
public ConnectStringParser(String connectString) {
int off = connectString.indexOf('/');
if (off >= 0) {
String chrootPath = connectString.substring(off);
if (chrootPath.length() == 1) {
this.chrootPath = null;
} else {
PathUtils.validatePath(chrootPath);
this.chrootPath = chrootPath;
}
connectString = connectString.substring(0, off);
} else {
this.chrootPath = null;
}
List<String> hostsList = split(connectString,",");
for (String host : hostsList) {
int port = DEFAULT_PORT;
int pidx = host.lastIndexOf(':');
if (pidx >= 0) {
if (pidx < host.length() - 1) {
port = Integer.parseInt(host.substring(pidx + 1));
}
host = host.substring(0, pidx);
}
serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
}
}
InetSocketAddress.createUnresolved(host, port)
private static class InetSocketAddressHolder {
private String hostname;
private InetAddress addr;
private int port;
private InetSocketAddressHolder(String hostname, InetAddress addr, int port) {
this.hostname = hostname;
this.addr = addr;
this.port = port;
}
......
21.1.4、创建通信
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly, HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
hostProvider = aHostProvider;
cnxn = createConnection(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}
getClientCnxnSocket()
private ClientCnxnSocket getClientCnxnSocket() throws IOException {
String clientCnxnSocketName = getClientConfig().getProperty(
ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
try {
Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class);
ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
return clientCxnSocket;
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ clientCnxnSocketName);
ioe.initCause(e);
throw ioe;
}
}
createConnection
protected ClientCnxn createConnection(String chrootPath,
HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
boolean canBeReadOnly) throws IOException {
return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this,
watchManager, clientCnxnSocket, canBeReadOnly);
}
new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this, watchManager, clientCnxnSocket, canBeReadOnly);
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
this.clientConfig=zooKeeper.getClientConfig();
initRequestTimeout();
}
SendThread(clientCnxnSocket);
SendThread(ClientCnxnSocket clientCnxnSocket) {
super(makeThreadName("-SendThread()"));
state = States.CONNECTING;
this.clientCnxnSocket = clientCnxnSocket;
setDaemon(true);
}
super(makeThreadName("-SendThread()"));
public ZooKeeperThread(String threadName) {
super(threadName);
setUncaughtExceptionHandler(uncaughtExceptionalHandler);
}
// ZooKeeperThread 是一个线程,执行它的 run()方法
public void run() {
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000;
InetSocketAddress serverAddress = null;
while (state.isAlive()) {
try {
if (!clientCnxnSocket.isConnected()) {
if (closing) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
serverAddress = hostProvider.next(1000);
}
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
if (state.isConnected()) {
......
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
......
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
......
}
}
synchronized (state) {
cleanup();
}
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Closed, null));
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x"
+ Long.toHexString(getSessionId()));
}
startConnect(serverAddress);
private void startConnect(InetSocketAddress addr) throws IOException {
saslLoginFailed = false;
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
state = States.CONNECTING;
String hostPort = addr.getHostString() + ":" + addr.getPort();
MDC.put("myid", hostPort);
setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));
......
logStartConnect(addr);
clientCnxnSocket.connect(addr);
}
connect(addr);ctrl + alt +B 查找 connect 实现类,ClientCnxnSocketNIO.java
void connect(InetSocketAddress addr) throws IOException {
SocketChannel sock = createSock();
try {
registerAndConnect(sock, addr);
} catch (IOException e) {
LOG.error("Unable to open socket to " + addr);
sock.close();
throw e;
}
initialized = false;
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
registerAndConnect
void registerAndConnect(SocketChannel sock, InetSocketAddress addr)
throws IOException {
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
boolean immediateConnect = sock.connect(addr);
if (immediateConnect) {
sendThread.primeConnection();
}
}
sendThread.primeConnection();
void primeConnection() throws IOException {
LOG.info("Socket connection established, initiating session, client: {}, server: {}",
clientCnxnSocket.getLocalSocketAddress(),
clientCnxnSocket.getRemoteSocketAddress());
isFirstConnect = false;
... ...
}
ctrl + alt +B 查找 doTransport 实现类,ClientCnxnSocketNIO.java
void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
throws IOException, InterruptedException {
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
updateSocketAddresses();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
doIO(pendingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
if (findSendablePacket(outgoingQueue,
sendThread.tunnelAuthInProgress()) != null) {
enableWrite();
}
}
selected.clear();
}
doIO(pendingQueue, cnxn);
void doIO(List<Packet> pendingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
......
}
21.1.5、执行 run()
public static void main(String args[]) throws CliException, IOException, InterruptedException
{
ZooKeeperMain main = new ZooKeeperMain(args);
main.run();
}
main.run();
void run() throws CliException, IOException, InterruptedException {
if (cl.getCommand() == null) {
System.out.println("Welcome to ZooKeeper!");
boolean jlinemissing = false;
try {
Class<?> consoleC = Class.forName("jline.console.ConsoleReader");
Class<?> completorC =
Class.forName("org.apache.zookeeper.JLineZNodeCompleter");
System.out.println("JLine support is enabled");
Object console =
consoleC.getConstructor().newInstance();
Object completor =
completorC.getConstructor(ZooKeeper.class).newInstance(zk);
Method addCompletor = consoleC.getMethod("addCompleter",
Class.forName("jline.console.completer.Completer"));
addCompletor.invoke(console, completor);
String line;
Method readLine = consoleC.getMethod("readLine", String.class);
while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
executeLine(line);
}
} catch (ClassNotFoundException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (NoSuchMethodException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (InvocationTargetException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (IllegalAccessException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (InstantiationException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
}
if (jlinemissing) {
System.out.println("JLine support is disabled");
BufferedReader br =
new BufferedReader(new InputStreamReader(System.in));
String line;
while ((line = br.readLine()) != null) {
executeLine(line);
}
}
} else {
processCmd(cl);
}
System.exit(exitCode);
}
executeLine(line);
public void executeLine(String line) throws CliException, InterruptedException, IOException {
if (!line.equals("")) {
cl.parseCommand(line);
addToHistory(commandCount,line);
processCmd(cl);
commandCount++;
}
}
processCmd(cl);
protected boolean processCmd(MyCommandOptions co) throws CliException, IOException, InterruptedException {
boolean watch = false;
try {
watch = processZKCmd(co);
exitCode = 0;
} catch (CliException ex) {
exitCode = ex.getExitCode();
System.err.println(ex.getMessage());
}
return watch;
}
processZKCmd(co);
protected boolean processZKCmd(MyCommandOptions co) throws CliException, IOException, InterruptedException {
String[] args = co.getArgArray();
String cmd = co.getCommand();
if (args.length < 1) {
usage();
throw new MalformedCommandException("No command entered");
}
if (!commandMap.containsKey(cmd)) {
usage();
throw new CommandNotFoundException("Command not found " + cmd);
}
boolean watch = false;
LOG.debug("Processing " + cmd);
if (cmd.equals("quit")) {
zk.close();
System.exit(exitCode);
} else if (cmd.equals("redo") && args.length >= 2) {
Integer i = Integer.decode(args[1]);
if (commandCount <= i || i < 0) {
throw new MalformedCommandException("Command index out of range");
}
cl.parseCommand(history.get(i));
if (cl.getCommand().equals("redo")) {
throw new MalformedCommandException("No redoing redos");
}
history.put(commandCount, history.get(i));
processCmd(cl);
} else if (cmd.equals("history")) {
for (int i = commandCount - 10; i <= commandCount; ++i) {
if (i < 0) continue;
System.out.println(i + " - " + history.get(i));
}
} else if (cmd.equals("printwatches")) {
if (args.length == 1) {
System.out.println("printwatches is " + (printWatches ? "on" : "off"));
} else {
printWatches = args[1].equals("on");
}
} else if (cmd.equals("connect")) {
if (args.length >= 2) {
connectToZK(args[1]);
} else {
connectToZK(host);
}
}
if (zk == null || !zk.getState().isAlive()) {
System.out.println("Not connected");
return false;
}
CliCommand cliCmd = commandMapCli.get(cmd);
if(cliCmd != null) {
cliCmd.setZk(zk);
watch = cliCmd.parse(args).exec();
} else if (!commandMap.containsKey(cmd)) {
usage();
}
return watch;
}
|