思路图示分析 Server.java
package com.lagou.zk.onoffline;
import org.I0Itec.zkclient.ZkClient;
public class Server {
ZkClient zkClient = null;
private void connectZk(){
zkClient = new ZkClient("centos7-1:2181,centos7-2:2181");
if (!zkClient.exists("/servers")){
zkClient.createPersistent("/servers");
}
}
private void saveServerInfo(String ip,String port){
String sequencePath = zkClient.createEphemeralSequential("/servers/server", ip + ":" + port);
System.out.println("服务器:"+ip+":"+port+"向zk保存信息成功,成功上线可以接受client查询");
}
public static void main(String[] args) {
Server server = new Server();
server.connectZk();
server.saveServerInfo(args[0],args[1]);
new TimeService(Integer.parseInt(args[1])).start();
}
}
TimeService.java
package com.lagou.zk.onoffline;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Date;
public class TimeService extends Thread {
private int port = 0;
public TimeService(int port) {
this.port = port;
}
@Override
public void run() {
try {
ServerSocket serverSocket = new ServerSocket(port);
while (true){
Socket accept = serverSocket.accept();
OutputStream out = accept.getOutputStream();
out.write(new Date().toString().getBytes());
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
如下图所示,两个服务器已经起来 后台看一下是/servers否已经创建成功,可以看到有两个节点 Client.java
package com.lagou.zk.onoffline;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class Client {
ZkClient zkClient = null;
ArrayList<String> infos = new ArrayList<>();
private void connectZk(){
zkClient = new ZkClient("centos7-1:2181,centos7-2:2181");
List<String> childs = zkClient.getChildren("/servers");
for (String child : childs) {
Object o = zkClient.readData("/servers/" + child);
infos.add(String.valueOf(o));
}
zkClient.subscribeChildChanges("/servers", new IZkChildListener() {
@Override
public void handleChildChange(String s, List<String> child) throws Exception {
ArrayList<String> list = new ArrayList<>();
for (String path : child) {
Object o = zkClient.readData("/servers/" + path);
list.add(String.valueOf(o));
}
infos = list;
System.out.println("接收到通知,最新服务器信息为:"+infos);
}
});
}
public void sendRequest() throws IOException {
Random random = new Random();
int i = random.nextInt(infos.size());
String[] arr = infos.get(i).split(":");
Socket socket = new Socket(arr[0], Integer.parseInt(arr[1]));
OutputStream out = socket.getOutputStream();
InputStream in = socket.getInputStream();
out.write("query time".getBytes());
byte[] b = new byte[1024];
in.read(b);
System.out.println("client端接受到server"+infos.get(i)+"返回结果"+new String(b));
in.close();
out.close();
socket.close();
}
public static void main(String[] args) throws IOException, InterruptedException {
Client client = new Client();
client.connectZk();
while (true){
client.sendRequest();
Thread.sleep(2000);
}
}
}
接下来启动客户端 效果如下 已成功完成监听!
|