百度搜出来的都是什么辣鸡方案,嵌入个页面你TM叫集成?好意思么!好在努力的人都比较幸运,在一个python大神的帮助下实现了集成方案。废话不多少,直接上方案。本文不会贴太多代码,只会描述关键点,有问题可以私信交流。
既然是集成jupyter,那么最简单的办法就是自己请求jupyter sever接口。jupyter本身就是一个前后端项目,其实集成jupyter核心逻辑就是用自己的前端页面请求jupyter server。jupyter server api 可以从The REST API — Jupyter Server documentation?获取,也可以自己启动一个jupyter项目,通过浏览器查看页面请求获取。这个方案是可行,只是获取token 比较麻烦,而且会话还要基于文档创建,感觉比较费事,有兴趣的自行研究吧。我采用的是另外一种方案,就是jupyter kernelgateway ,这个也有文档,不过需要翻墙。其实这个感觉就是启动一个纯净的jupyter server 服务,没有乱七八糟的权限控制,建议用这个。启动命令很简单:
jupyter?kernelgateway?--JupyterWebsocketPersonality.list_kernels=True
那么如何交互进行命令发送和结果获取呢,继续往下看。因为项目原因, 我采用的是三方通信模式,就是前端命令先发送到java服务端,然后java服务端再转发给jupyter server。
编辑器我用的是微软的monaco编辑器,其它编辑器也行,如果后面要做python动态代码提示的话建议用微软的monaco编辑器,毕竟lsp 语言服务协议都是人家定,monaco有天生优势。
编辑器初始化也很简单,我简单贴下我的代码,网上一大堆编辑器初始化方案。
var el =document.getElementById("scriptTt"+id) ;
// create Monaco editor
var scriptCodeMirror= monaco.editor.create(document.getElementById("scriptTt"+id), {
model: monaco.editor.createModel(nowScript, "python"),
glyphMargin: true,
lightbulb: {
enabled: true
},
scrollBeyondLastLine: false,
automaticLayout: true,
autoIndent:true,//自动布局
minimap: { // 关闭代码缩略图
enabled: true // 是否启用预览图
},
fontSize: 10,
wordWrap: "bounded",
wrappingIndent: 'indent'
});
然后就是和java服务端建立websocket 连接。我用的是xterm.js +webssh.js ,代码贴出来参考一下,用什么都行,只要能建立websocket就行。
//初始化客户端
client = new WSSHClient('${applicationScope["webSshServer"]}');
//建立websocket 连接
openJupyterTerminal({
operate : 'connect',
jupyterKerneUrl:$("#kernelUrl").val()
});
function openJupyterTerminal(options) {
//执行连接操作
client.connect({
onError : function(error) {
//连接失败回调
term.write('Error: ' + error + '\r\n');
},
onConnect : function() {
//连接成功回调
client.sendInitData(options);
},
onClose : function() {
//连接关闭回调
$.messager.alert('信息提示', '连接已关闭!', 'info');
},
onData : function(data) {
console.log("curtRunner-----------------");
console.log(curtRunner);
console.log(data);
if(curtRunner=="python"){
data=$.parseJSON( data );
if(data.status==0){
runflag=true;
$("#jupyterConnect").css("display","none");
$("#jupyterOffConnect").css("display","inline");
$.messager.alert("提示", "交互式环境加载完成!", "info");
};
if(data.status==1){
msgData=$.parseJSON( data.msgData );
console.log(msgData);
if(msgData.msg_type=="execute_result"){
if(selectLogCodeMirror != null){
selectLogCodeMirror.setValue(selectLogCodeMirror.getValue()+msgData.content.data["text/plain"]);
}
};
if(msgData.msg_type=="display_data"){
if(msgData.content.data["image/png"]){
$("#figureImg").attr("src", "data:image/jpg;base64,"+msgData.content.data["image/png"]);
$('#wz').window('open');
}
}
if(msgData.msg_type=="stream"){
if(selectLogCodeMirror != null){
selectLogCodeMirror.setValue(selectLogCodeMirror.getValue()+msgData.content.text);
}
};
if(msgData.msg_type=="status"){
if(msgData.content.execution_state=="idle"){
pythonRuning=false;
// $($(runCellId).find(".runCell")).css("display","inline");
// $($(runCellId).find(".stopCell")).css("display","none");
$($(runCellId).find(".outSpan")).html("执行结果");
}else{
// $($(runCellId).find(".runCell")).css("display","none");
// $($(runCellId).find(".stopCell")).css("display","inline");
pythonRuning=true;
};
};
if(msgData.msg_type=="error"){
$.messager.alert("提示", "代码执行异常。<br/>"+msgData.content.evalue , "error");
};
if(msgData.msg_type=="shutdown_reply"){
if(msgData.content.status="ok"){
$("#jupyterConnect").css("display","inline");
$("#jupyterOffConnect").css("display","none");
$.messager.alert("提示", "jupyter server 连接已断开。<br/>", "error");
}
}
};
if(data.status==2||data.status==3){
runflag=false;
$("#jupyterConnect").css("display","inline");
$("#jupyterOffConnect").css("display","none");
msgData=data.msgData;
$.messager.alert("提示", "连接异常,请检查jupyer服务。<br/>"+msgData,"error" );
}
}
}
});
}
这个代码无非就是建立连接,然后解析接收到的数据,没什么好说的。
然后来说java服务端。java服务端的职责就是接收前端命令,然后转发给jupyter。初次接收命令会进行初始化操作,初始化的时候会连接jupyter server。关键代码如下:
private void connectToJupyerServer(JupyterClientInfo jupyterClientInfo, JupyterClientData jupyterClientData, WebSocketSession webSocketSession) throws Exception {
logger.info("connectToJupyerServer");
String baseUrl="";
String[] baseUrlArr=jupyterClientData.getJupyterKerneUrl().split("://");
if(baseUrlArr.length==2){
baseUrl=baseUrlArr[1];
}else{
baseUrl= baseUrlArr[0];
};
jupyterClientInfo.setJupyerUrl(baseUrl);
String result=HttpUtils.httpRequestToString("http://"+baseUrl+"/api/kernels","GET",null);
System.out.println("-----kernels------------");
System.out.println(result);
JSONArray kernelsJson= JSONArray.parseArray(result);
JSONObject kernelobj=new JSONObject();
if(kernelsJson.size()>0){
kernelobj=(JSONObject) kernelsJson.get(0);
}else{
//获取 kernel
result = HttpUtils.httpRequestToString("http://"+baseUrl+"/api/kernelspecs","GET",null);
System.out.println("-----kernelspecs------------");
System.out.println(result);
JSONObject kernelspecs= JSONObject.parseObject(result);
String kernelsNmae=kernelspecs.getString("default");
JSONObject newkernelsJson=new JSONObject();
newkernelsJson.put("name", kernelsNmae);
newkernelsJson.put("path", "/opendata");
//创建内核
result=HttpUtils.httpRequestToString("http://"+baseUrl+"/api/kernels","POST", JSONObject.toJSONString(newkernelsJson) );
kernelobj=JSONObject.parseObject(result);
};
if (kernelobj!=null){
System.out.println("server url---------");
System.out.println("ws://"+baseUrl+String.format("/api/kernels/%s/channels",kernelobj.get("id")));
jupyterClientInfo.setKernelId(kernelobj.get("id").toString());
WebSocketClient webSocketClient = new WebSocketClient(new URI("ws://"+baseUrl+String.format("/api/kernels/%s/channels",kernelobj.get("id"))),new Draft_6455()) {
@SneakyThrows
@Override
public void onOpen(ServerHandshake serverHandshake) {
logger.info("[连接 jupyter server] 连接成功");
SocketMsgData socketMsgData=new SocketMsgData();
socketMsgData.setStatus(SocketMsgData.StatusEnum.ONOPEN.ordinal());
socketMsgData.setMsgData("[连接 jupyter server] 连接成功");
if(webSocketSession.isOpen()){
sendMessage(webSocketSession, JSONObject.toJSONString(socketMsgData).getBytes(StandardCharsets.UTF_8));
}
}
@SneakyThrows
@Override
public void onMessage(String message) {
logger.info("[jupyter client] 收到消息={}",message);
SocketMsgData socketMsgData=new SocketMsgData();
socketMsgData.setStatus(SocketMsgData.StatusEnum.ONMESSAGE.ordinal());
socketMsgData.setMsgData(message);
if(webSocketSession.isOpen()){
sendMessage(webSocketSession, JSONObject.toJSONString(socketMsgData).getBytes(StandardCharsets.UTF_8));
}
}
@SneakyThrows
@Override
public void onClose(int code, String reason, boolean remote) {
logger.info("[jupyter client] 退出连接");
SocketMsgData socketMsgData=new SocketMsgData();
socketMsgData.setStatus(SocketMsgData.StatusEnum.ONCLOSE.ordinal());
socketMsgData.setMsgData("[jupyter client] 退出连接");
if(webSocketSession.isOpen()){
sendMessage(webSocketSession, JSONObject.toJSONString(socketMsgData).getBytes(StandardCharsets.UTF_8));
}
}
@SneakyThrows
@Override
public void onError(Exception ex) {
logger.info("[jupyter client] 连接错误={}",ex.getMessage());
SocketMsgData socketMsgData=new SocketMsgData();
socketMsgData.setStatus(SocketMsgData.StatusEnum.ONCLOSE.ordinal());
socketMsgData.setMsgData("[jupyter client] 连接错误={}"+ex.getMessage());
if(webSocketSession.isOpen()){
sendMessage(webSocketSession, JSONObject.toJSONString(socketMsgData).getBytes(StandardCharsets.UTF_8));
}
}
};
webSocketClient.connect();
jupyterClientInfo.setWebSocketClient(webSocketClient);
}else{
throw new Exception("找不到kernel信息,请确认jupyter kernelgateway服务是否启动");
}
}
说一下java服务端大概逻辑,由于服务端是用来做转发的,所以它是websocket服务端也是websocket的客户端。初始化的时候,会把服务端session 和客户端实例保存到全局map对象里面,用的时候就从全局对象取出来就行。
package cn.objectspace.jupyter.form;
import com.alibaba.fastjson.JSONObject;
import com.jcraft.jsch.Channel;
import com.jcraft.jsch.JSch;
import lombok.Data;
import org.java_websocket.client.WebSocketClient;
import org.springframework.web.socket.WebSocketSession;
import java.util.UUID;
/**
* @Description: jupyter 连接信息
* @Author: zxf
* @Date: 2021/3/23
*/
@Data
public class JupyterClientInfo {
public JupyterClientInfo(){
//组装ws 信息
JSONObject contentJson=new JSONObject();
contentJson.put("code", "1==1");
contentJson.put("silent", false);
contentJson.put("store_history", false);
contentJson.put("user_expressions", new JSONObject());
contentJson.put("allow_stdin", false);
JSONObject hdrJson=new JSONObject();
hdrJson.put("msg_id", UUID.randomUUID().toString().replaceAll("-",""));
hdrJson.put("username", "dataOpen");
hdrJson.put("session", UUID.randomUUID().toString().replaceAll("-",""));
hdrJson.put("msg_type", "execute_request");
hdrJson.put("version","5.0");
JSONObject msgJson=new JSONObject();
msgJson.put("header", hdrJson);
msgJson.put("metadata", new JSONObject());
msgJson.put("parent_header", new JSONObject());
msgJson.put("buffers", new JSONObject());
msgJson.put("content", contentJson);
msgJson.put("channel","shell");
this.jupyerConent=contentJson;
this.jupyerMsg=msgJson;
}
//客户端连接
private WebSocketSession webSocketSession;
// 转发jupytersever 的客户端
private WebSocketClient webSocketClient;
//jupyer 消息体
private JSONObject jupyerMsg;
//jupyer 消息内容
private JSONObject jupyerConent;
//jupyter url
private String jupyerUrl;
//jupyter url
private String kernelId;
}
转发给jupyter的时候,需要消息组装。
private void transToJupyerServer(JupyterClientInfo jupyterClientInfo, String command) throws IOException {
WebSocketClient webSocketClient=jupyterClientInfo.getWebSocketClient();
if (webSocketClient != null) {
JSONObject msg=jupyterClientInfo.getJupyerMsg();
JSONObject content=jupyterClientInfo.getJupyerConent();
content.put("code",command);
msg.put("content",content);
JSONObject header=msg.getJSONObject("header");
header.put("msg_id",UUID.randomUUID().toString().replaceAll("-",""));
header.put("date",getISO8601Timestamp(new Date()));
msg.put("header",header);
logger.info("[消息发送] "+msg);
webSocketClient.send(JSONObject.toJSONString(msg));
}
}
再说一下注意事项,jupyter创建的内核断开连接的时候要注意主动回收,不回收的话python进程会一直存在。如果是多用户访问,那么每个用户都应该去单独创建维护内核 。这也是为什么采用三方通信的初衷,js去维护实在不靠谱!
整体方案大概就这样,如果有什么不明白的,留言吧。
最后展示一下效果。
|