add data-manager-backend

This commit is contained in:
CaiHQ 2021-12-23 00:10:28 +08:00
parent e94dfe3998
commit 6b8e2320be
4 changed files with 49 additions and 429 deletions

View File

@ -12,9 +12,7 @@ application {
dependencies {
implementation project(":front-base")
implementation project(":test-tool")
implementation project(":sdk-java")
// https://mvnrepository.com/artifact/com.jianggujin/IKAnalyzer-lucene
implementation 'com.jianggujin:IKAnalyzer-lucene:8.0.0'
// https://mvnrepository.com/artifact/org.apache.lucene/lucene-analyzers-common
@ -26,9 +24,7 @@ dependencies {
implementation 'org.apache.lucene:lucene-core:8.10.1'
// https://mvnrepository.com/artifact/org.apache.lucene/lucene-queryparser
implementation 'org.apache.lucene:lucene-queryparser:8.10.1'
testImplementation 'junit:junit:4.13.2'
implementation 'com.jcraft:jsch:0.1.55'
}

View File

@ -30,6 +30,9 @@ import org.bdware.server.nodecenter.*;
import org.bdware.server.ws.DelimiterCodec;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -46,6 +49,7 @@ public class NodeCenterServer {
public static ScheduledExecutorService scheduledThreadPool =
Executors.newScheduledThreadPool(8);
static SslContext sslContext = null;
public static URLClassLoader pluginLoader;
// static byte[] delimiter = "wonbifoodie".getBytes();
static {
@ -99,6 +103,14 @@ public class NodeCenterServer {
if (cmdConf.overwrite) {
cmdConf.write(CONFIG_PATH);
}
NodeCenterWSFrameHandler.wsPluginActions = parseStrAsList(cmdConf.wsPluginActions);
}
private static String[] parseStrAsList(String str) {
if (str == null) {
return new String[]{};
}
return str.split(",");
}
public static void main(String[] args) throws Exception {
@ -156,7 +168,25 @@ public class NodeCenterServer {
}
public static void startHttp(int port) {
File[] pluginJar = new File("./pluginLib/")
.listFiles(pathname -> pathname.getName().endsWith(".jar"));
URL[] urls;
if (pluginJar != null && pluginJar.length > 0) {
urls = new URL[pluginJar.length];
for (int i = 0; i < pluginJar.length; i++) {
try {
urls[i] = pluginJar[i].toURI().toURL();
LOGGER.info("add plugin:" + pluginJar[i].getName());
} catch (MalformedURLException e) {
e.printStackTrace();
}
}
} else {
urls = new URL[]{};
}
pluginLoader = new URLClassLoader(urls, NodeCenterServer.class.getClassLoader());
LOGGER.info("start at: " + port);
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {

View File

@ -22,7 +22,8 @@ import org.bdware.sc.http.HttpUtil;
import org.bdware.sc.util.FileUtil;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.action.Action;
import org.bdware.server.autoinstall.NodeConfig;
import org.bdware.server.http.HttpMethod;
import org.bdware.server.http.URIPath;
import org.bdware.server.permission.Role;
@ -30,7 +31,6 @@ import org.bouncycastle.crypto.CryptoException;
import org.bouncycastle.pqc.math.linearalgebra.ByteUtils;
import org.zz.gmhelper.SM2KeyPair;
import org.zz.gmhelper.SM2Util;
import org.bdware.server.SSHTool;
import org.bdware.client.SmartContractClient;
@ -49,8 +49,7 @@ public class FileActions {
static NodeCenterWSFrameHandler handler;
SmartContractClient clusterClient;
SmartContractClient nodeClient;
public FileActions(NodeCenterWSFrameHandler nodeCenterWSFrameHandler) {
handler = nodeCenterWSFrameHandler;
@ -479,423 +478,4 @@ public class FileActions {
List<FileItem> subFiles;
}
@Action(userPermission = 0)
public void initBDServer(JsonObject args, ResultCallback resultCallback) {
Response response = new Response();
JsonObject result = new JsonObject();
response.action = "onInitBDServer";
response.responseID = args.get("requestID").getAsString();
response.data = result;
// 登录参数
String host = args.get("host").getAsString();
int sshPort = 22;
String username = args.get("username").getAsString();
String password = args.get("password").getAsString();
String name = args.get("name").getAsString();
// 域参数
String sm2Key = args.get("sm2Key").getAsString();
String clusterHost = args.get("clusterHost").getAsString();
// bdserverzip的打开路径和目标服务器的解压路径
String defaultAgentPath = "/data/agent-datanet/";
String defaultBdserverPath = defaultAgentPath + "bdserver/";
String bdserverzip = "bdserver.zip";
String localPath = "./NodeCenterDB/";
String bdserverZipPath = localPath + bdserverzip;
// String bdserverZipPath = defaultAgentPath + bdserverzip;
try {
Session session = SSHTool.getSession(host, sshPort, username, password);
if (session != null) {
// 1.检查JDK version
String jdk = SSHTool.checkJDK(session);
result.addProperty("jdk", jdk);
LOGGER.info("[checkJDK done] " + jdk);
if (!jdk.isEmpty()) {
// 2.创建defaultAgentPath设置所有者为当前用户关闭可能正在运行的bdserver删除bdserver目录
String mkdirCmd = "mkdir -p " + defaultAgentPath + "; ";
String chownCmd = "chown -R " + session.getUserName() + " " + defaultAgentPath + "; ";
String cdCmd = "cd " + defaultBdserverPath + "; ";
String stopCmd = "bash ./cmstop.sh; ";
String rmCmd = "rm -rf " + defaultBdserverPath;
SSHTool.executeSh(session, mkdirCmd + chownCmd + cdCmd + stopCmd + rmCmd, true);
LOGGER.info("[mkdir done] " + defaultAgentPath);
// 3.检查zip相同则不传输
if (!SSHTool.testISTheSame(session, new File(bdserverZipPath).getAbsolutePath(), defaultAgentPath + bdserverzip)) {
SSHTool.copyLocalToRemote(session, new File(bdserverZipPath).getAbsolutePath(), defaultAgentPath, new SSHTool.CopyProgress() {
@Override
public void onProgress(long len, long total) {
Map<String, Object> progress = new HashMap<>();
progress.put("responseID", response.responseID);
if (total > 0)
progress.put("progress", String.format("%.2f", len * 100F / total));
else
progress.put("progress", -1);
progress.put("action", "onInitBDServer");
resultCallback.onResult(progress);
}
@Override
public void onFinish() {
onProgress(100, 100);
}
});
LOGGER.info("[copyLocalToRemote done] " + host);
}
// 4.解压配置ip启动
String unzipCmd = "unzip -q -d " + defaultBdserverPath + " -o " + defaultAgentPath + bdserverzip;
SSHTool.executeSh(session, unzipCmd);
LOGGER.info("[unzip done] " + host);
String startCmd = "echo $?; cd " + defaultBdserverPath + "; ";
startCmd += "cp cmconfig.json.template cmconfig.json; ";
startCmd += ("sed -i 's/\"ip\": \"127.0.0.1\"/\"ip\": \"" + host + "\"/g' cmconfig.json; ");
startCmd += "sh cmstart.sh";
SSHTool.executeSh(session, startCmd, false);
LOGGER.info("[cmstart.sh done] " + host);
// 5.cmstart后检测server是否已启动启动后用manager的Key去登录为节点配置参数
for (int count = 0; count < 10; ++count) {
try {
Thread.sleep(2000);
LOGGER.info("[Connecting agent... ] " + host);
Map<String, Object> httpResult = HttpUtil.httpGet("http://" + host + ":" + NodeConfig.nodePort);
if (httpResult.containsKey("responseCode") && Integer.valueOf(httpResult.get("responseCode").toString()) == 200) {
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
String nodeAddress = "ws://" + host + ":" + NodeConfig.nodePort + "/SCIDE/SCExecutor";
SM2KeyPair nodeKeyPair = SM2KeyPair.fromJson(sm2Key);
String clusterAddress = "ws://" + clusterHost + ":" + (NodeConfig.clusterPort + 1);
nodeClient =
new SmartContractClient(nodeAddress, nodeKeyPair) {
public void onLogin(JsonObject obj) {
LOGGER.info("[nodeClient onLogin] " + nodeKeyPair.toJson());
this.setName(name);
LOGGER.info("[setName] " + name);
String bdledgerAddr = "022.node.internetapi.cn:21121";
this.setBDLedger(bdledgerAddr);
this.setMasterAddress(host + ":" + (NodeConfig.nodePort + 1));
String apply =
"{\"action\":\"applyNodeRole\",\"role\":\"_ROLE_\",\"pubKey\":\"" + nodeKeyPair.getPublicKeyStr() + "\"}";
this.sendMsg(apply.replaceAll("_ROLE_", "ContractProvider"));
this.sendMsg(apply.replaceAll("_ROLE_", "ContractUser"));
this.sendMsg(apply.replaceAll("_ROLE_", "ContractInstanceManager"));
LOGGER.info("[applyNodeRole] " + apply);
this.setClusterAddress(clusterAddress);
LOGGER.info("[setClusterAddress] " + clusterAddress);
String yjsPath = defaultBdserverPath + "yjs.jar";
String setYjsPath = "{\"action\":\"changeYJSPath\",\"data\":\"" + yjsPath + "\"}";
this.sendMsg(setYjsPath);
LOGGER.info("[setYjsPath] " + yjsPath);
this.sendMsg("{\"action\":\"loadNodeConfig\"}");
}
@Action
public void onApplyRole(JsonObject obj) {
String data = obj.get("data").getAsString();
LOGGER.info("[onApplyRole] " + data);
if (data.equals("success")) {
String auth =
"{\"action\":\"authNodeRole\",\"pubKey\":\"" + nodeKeyPair.getPublicKeyStr() + "\",\"isAccept\":true}";
this.sendMsg(auth);
LOGGER.info("[authNodeRole] " + auth);
}
}
@Action
public void onLoadNodeConfig(JsonObject obj) {
JsonObject jsonObject = obj.get("data").getAsJsonObject();
String nodePubKey = jsonObject.get("nodePubKey").getAsString();
result.addProperty("nodePubKey", nodePubKey);
result.addProperty("status", "success");
resultCallback.onResult(JsonUtil.toJson(response));
LOGGER.info("[onLoadNodeConfig] " + jsonObject);
}
};
nodeClient.waitForConnect();
nodeClient.login();
result.addProperty("status", "success");
} else {
result.addProperty("status", "wrong jdk version");
resultCallback.onResult(JsonUtil.toJson(response));
}
} else {
result.addProperty("status", "Session failed");
resultCallback.onResult(JsonUtil.toJson(response));
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Action(userPermission = 0)
public void initBDCluster(JsonObject args, ResultCallback resultCallback) {
Response response = new Response();
JsonObject result = new JsonObject();
response.action = "onInitBDCluster";
response.responseID = args.get("requestID").getAsString();
// 登录参数
String host = args.get("host").getAsString();
int sshPort = 22;
String username = args.get("username").getAsString();
String password = args.get("password").getAsString();
String name = args.get("name").getAsString();
// 域参数
String sm2Key = args.get("sm2Key").getAsString();
JsonArray agents = args.get("agents").getAsJsonArray();
// bdclusterzip的打开路径和目标服务器的解压路径
String defaultManagerPath = "/data/manager-datanet/";
String defaultBDClusterPath = defaultManagerPath + "bdcluster/";
String bdclusterzip = "bdcluster.zip";
String localPath = "./NodeCenterDB/";
String bdclusterZipPath = localPath + bdclusterzip;
// String bdclusterZipPath = defaultManagerPath + bdclusterzip;
try {
Session session = SSHTool.getSession(host, sshPort, username, password);
if (session != null) {
// 1.检查JDK version
String jdk = SSHTool.checkJDK(session);
result.addProperty("jdk", jdk);
LOGGER.info("[checkJDK done] " + jdk);
if (!jdk.isEmpty()) {
// 2.创建defaultManagerPath设置所有者为当前用户关闭可能正在运行的bdcluster删除bdcluster目录
String mkdirCmd = "mkdir -p " + defaultManagerPath + "; ";
String chownCmd = "chown -R " + session.getUserName() + " " + defaultManagerPath + "; ";
String cdCmd = "cd " + defaultBDClusterPath + "; ";
String stopCmd = "bash ./ncstop.sh; ";
String rmCmd = "rm -rf " + defaultBDClusterPath;
SSHTool.executeSh(session, mkdirCmd + chownCmd + cdCmd + stopCmd + rmCmd, true);
LOGGER.info("[mkdir done] " + defaultManagerPath);
// 3.检查zip相同则不传输
if (!SSHTool.testISTheSame(session, new File(bdclusterZipPath).getAbsolutePath(), defaultManagerPath + bdclusterzip)) {
SSHTool.copyLocalToRemote(session, new File(bdclusterZipPath).getAbsolutePath(), defaultManagerPath, new SSHTool.CopyProgress() {
@Override
public void onProgress(long len, long total) {
Map<String, Object> progress = new HashMap<>();
progress.put("responseID", response.responseID);
if (total > 0)
progress.put("progress", String.format("%.2f", len * 100F / total));
else
progress.put("progress", -1);
progress.put("action", "onInitBDCluster");
resultCallback.onResult(progress);
}
@Override
public void onFinish() {
onProgress(100, 100);
}
});
LOGGER.info("[bdcluster.zip copyLocalToRemote done] " + host);
}
// 4.解压配置ip启动
String unzipCmd = "unzip -q -d " + defaultBDClusterPath + " -o " + defaultManagerPath + bdclusterzip;
SSHTool.executeSh(session, unzipCmd);
LOGGER.info("[unzip bdcluster.zip done] " + host);
String startCmd = "echo $?; cd " + defaultBDClusterPath + "; ";
startCmd += "cp ncconfig.json.template ncconfig.json; ";
startCmd += ("sed -i 's/\"ip\": \"127.0.0.1\"/\"ip\": \"" + host + "\"/g' ncconfig.json; ");
startCmd += "sh ncstart.sh";
SSHTool.executeSh(session, startCmd, false);
LOGGER.info("[ncstart.sh done] " + host);
// 5.部署成功后检测server是否已启动启动后登录为节点配置参数
for (int count = 0; count < 10; ++count) {
try {
Thread.sleep(2000);
LOGGER.info("[Connecting Manager... ] " + host);
Map<String, Object> httpResult = HttpUtil.httpGet("http://" + host + ":" + NodeConfig.clusterPort);
if (httpResult.containsKey("responseCode") && Integer.valueOf(httpResult.get("responseCode").toString()) == 200) {
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
// 6.启动cluster后发送YPK包并解压到NodeCenterDB
String ypkZip = "NC_YPKs.zip";
String ypkZipPath = localPath + ypkZip;
// String ypkZipPath = defaultManagerPath + ypkZip;
if (!SSHTool.testISTheSame(session, new File(ypkZipPath).getAbsolutePath(), defaultManagerPath + ypkZip)) {
SSHTool.copyLocalToRemote(session, new File(ypkZipPath).getAbsolutePath(), defaultManagerPath, new SSHTool.CopyProgress() {
@Override
public void onProgress(long len, long total) {
Map<String, Object> progress = new HashMap<>();
progress.put("responseID", response.responseID);
if (total > 0)
progress.put("progress", String.format("%.2f", len * 100F / total));
else
progress.put("progress", -1);
progress.put("action", "onInitBDCluster");
progress.put("operation", "copyYPK");
resultCallback.onResult(progress);
}
@Override
public void onFinish() {
onProgress(100, 100);
}
});
LOGGER.info("[NC_YPKs.zip copyLocalToRemote done] " + host);
}
String ypkUnzipPath = defaultBDClusterPath + "NodeCenterDB/NC_YPKs";
unzipCmd = "unzip -q -d " + ypkUnzipPath + " -o " + defaultManagerPath + ypkZip;
SSHTool.executeSh(session, unzipCmd);
LOGGER.info("[unzip NC_YPKs.zip done] " + host);
String clusterAddress = "ws://" + host + ":" + NodeConfig.clusterPort + "/NodeCenterWS";
SM2KeyPair clusterKeyPair = SM2KeyPair.fromJson(sm2Key);
clusterClient =
new SmartContractClient(clusterAddress, clusterKeyPair) {
int agentIndex = 0;
public void onLogin(JsonObject obj) {
LOGGER.info("[new clusterClient] " + clusterKeyPair.toJson());
String setClusterName = "{\"action\":\"setClusterName\",\"name\":\"" + name + "\"}";
sendMsg(setClusterName);
LOGGER.info("[setClusterName] " + name);
LOGGER.info("[initBDServer] start...");
if (agents.size() > 0) {
for (int i = 0; i < agents.size(); ++i) {
JsonObject agent = agents.get(i).getAsJsonObject();
agent.addProperty("action", "initBDServer");
agent.addProperty("requestID", response.responseID);
agent.addProperty("sm2Key", sm2Key);
agent.addProperty("clusterHost", host);
this.sendMsg(JsonUtil.toJson(agent));
LOGGER.info("[initBDServer] " + i + ": " + agent);
}
}
}
public void onInitBDServer(JsonObject obj) {
if (obj.has("progress")) {
String progress = obj.get("progress").getAsString();
LOGGER.info("[onInitBDServer agent] " + agentIndex + " " + progress);
}
obj.addProperty("agentIndex", agentIndex);
if (obj.has("data")) {
++agentIndex;
JsonObject data = obj.get("data").getAsJsonObject();
if (data.get("status").getAsString().equals("success")) {
// 根据节点的type分发ypk然后依次启动
LOGGER.info("[onInitBDServer] " + data);
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("action", "distributeYPK");
jsonObject.addProperty("requestID", response.responseID);
String projectName = "Router_1.0.ypk";
jsonObject.addProperty("projectName", projectName);
jsonObject.addProperty("pubKey", clusterKeyPair.getPublicKeyStr());
String content =
String.format("DistributeYPK|%s|%s", projectName, clusterKeyPair.getPublicKeyStr());
try {
String sig =
ByteUtils.toHexString(
SM2Util.sign(
clusterKeyPair.getPrivateKeyParameter(), content.getBytes()));
jsonObject.addProperty("signature", sig);
} catch (CryptoException e) {
e.printStackTrace();
}
String nodePubKey = data.get("nodePubKey").getAsString();
jsonObject.addProperty("nodeIDs", nodePubKey + ",");
try {
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
}
this.sendMsg(JsonUtil.toJson(jsonObject));
LOGGER.info("[DistributeYPK] " + jsonObject);
}
}
resultCallback.onResult(JsonUtil.toJson(obj));
}
public void onDistributeYPK(JsonObject obj) {
// 分发ypk成功后启动该合约
LOGGER.info("[onDistributeYPK]" + obj);
resultCallback.onResult(JsonUtil.toJson(obj));
if (obj.get("progress").getAsString().equals("100.00")) {
String nodeIP = obj.get("nodeIP").getAsString();
String nodeAddress = "ws://" + nodeIP + "/SCIDE/SCExecutor";
SmartContractClient nodeClient = new SmartContractClient(nodeAddress, clusterKeyPair) {
public void onLogin(JsonObject obj) {
LOGGER.info("[onDistributeYPK: nodeClient onLogin] " + nodeAddress);
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("action", "startContractByYPK");
jsonObject.addProperty("isPrivate", true);
jsonObject.addProperty("requestID", response.responseID);
jsonObject.addProperty("owner", clusterKeyPair.getPublicKeyStr());
String ypkName = "Router_1.0.ypk";
jsonObject.addProperty("path", "/" + ypkName);
String content =
String.format("Fixed|%s|%s", ypkName, getKeyPair().getPublicKeyStr());
try {
String sig =
ByteUtils.toHexString(
SM2Util.sign(
clusterKeyPair.getPrivateKeyParameter(), content.getBytes()));
jsonObject.addProperty("signature", sig);
} catch (CryptoException e) {
e.printStackTrace();
}
this.sendMsg(JsonUtil.toJson(jsonObject));
LOGGER.info("[startContractByYPK] " + jsonObject);
}
public void onStartContract(JsonObject obj) {
resultCallback.onResult(JsonUtil.toJson(obj));
String dataStr = obj.get("data").getAsString();
JsonObject data = JsonParser.parseString(dataStr).getAsJsonObject();
LOGGER.info("[onStartContract] " + data);
// 若router启动成功,调用合约set前缀接口
if (data.get("status").getAsString().equals("Success")) {
JsonObject jsonObject = new JsonObject();
String contractID = "id";
String operation = "set";
String arg = "prevCode";
jsonObject.addProperty("requestID", response.responseID);
jsonObject.addProperty("action", "executeContract");
jsonObject.addProperty("contractID", contractID);
jsonObject.addProperty("operation", operation);
jsonObject.addProperty("arg", arg);
jsonObject.addProperty("pubkey", "set");
String content = String.format("%s|%s|%s|%s", contractID, operation, arg, getKeyPair().getPublicKeyStr());
try {
String sig =
ByteUtils.toHexString(
SM2Util.sign(
clusterKeyPair.getPrivateKeyParameter(), content.getBytes()));
jsonObject.addProperty("signature", sig);
} catch (CryptoException e) {
e.printStackTrace();
}
this.sendMsg(JsonUtil.toJson(jsonObject));
LOGGER.info("[executeContract] " + jsonObject);
}
}
};
nodeClient.waitForConnect();
nodeClient.login();
}
}
}
;
clusterClient.waitForConnect();
clusterClient.login();
result.addProperty("status", "success");
} else {
result.addProperty("status", "wrong jdk version");
}
} else {
result.addProperty("status", "Session failed");
}
} catch (Exception e) {
e.printStackTrace();
}
response.data = result;
resultCallback.onResult(JsonUtil.toJson(response));
}
}
}

View File

@ -20,9 +20,12 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.bdware.server.NodeCenterServer.pluginLoader;
public class NodeCenterWSFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
private static final Logger LOGGER = LogManager.getLogger(NodeCenterWSFrameHandler.class);
public static String[] wsPluginActions;
StringBuilder dataCache = new StringBuilder();
ActionExecutor<ResultCallback, JsonObject> ae;
private ChannelHandlerContext ctx;
@ -65,8 +68,19 @@ public class NodeCenterWSFrameHandler extends SimpleChannelInboundHandler<WebSoc
} else return false;
}
};
for (String str : wsPluginActions) {
Object obj = createInstanceByClzName(str);
ae.appendHandler(obj);
}
}
private Object createInstanceByClzName(String clzName) {
try {
Class<?> clz = Class.forName(clzName, true, pluginLoader);
return clz.newInstance();
} catch (Exception e) {
return null;
}
}
public ActionExecutor getAE() {
return ae;
}