optimize file distribute

This commit is contained in:
CaiHQ 2022-09-02 21:28:31 +08:00
parent 0fd984d5fd
commit 6030bdf2ad
8 changed files with 154 additions and 97 deletions

View File

@ -77,8 +77,9 @@ public class DistributeCallback extends ResultCallback {
if (progress.equals("100.00")) index++;
Map<String, String> map = new HashMap<>();
map.put("action", "onDistributeYPK");
map.put("action", "onDistribute");
map.put("responseID", distributeID);
map.put("distributeID", distributeID);
map.put("content", JsonUtil.toJson(ret2));
map.put("nodeIP", nodeIP);
map.put("progress", progress);
@ -86,7 +87,7 @@ public class DistributeCallback extends ResultCallback {
}
public void onReceive(Map<String, String> map) {
LOGGER.debug("[DistributeCallback] onReceive : position----9");
LOGGER.info("[DistributeCallback] onReceive : position----9");
String progress = map.get("progress");
String nodeIP = map.get("ipPort");
if (progress.equals("100")) {
@ -95,15 +96,17 @@ public class DistributeCallback extends ResultCallback {
args.put("pubKey", pubKey);
args.put("signature", signature);
count++;
LOGGER.debug(count + "个节点已收完成" + " 总共有" + nodes.size() + " 个节点");
if (count == nodes.size()) {
LOGGER.info(count + "个节点已收完成" + " 总共有" + nodes.size() + " 个节点");
if (count >= nodes.size() - 1) {
// res返回给前端合约分发完成
LOGGER.info("个节点已收完成" + "send on finish!!!!!");
Map<String, String> ret2 = new HashMap<>();
ret2.put("action", "onDistributeFinish");
ret2.put("progress", "100%");
Map<String, String> map_send = new HashMap<>();
map_send.put("action", "onDistribute");
map_send.put("responseID", distributeID);
map_send.put("distributeID", distributeID);
map_send.put("over", "true");
map_send.put("nodeIP", nodeIP);
map_send.put("content", JsonUtil.toJson(ret2));
@ -118,7 +121,7 @@ public class DistributeCallback extends ResultCallback {
@Override
public void onResult(String str) {
LOGGER.debug("[DistributeCallback] str=" + str);
LOGGER.info("[DistributeCallback] str=" + str);
Map<String, String> map = JsonUtil.fromJson(str, new TypeToken<Map<String, String>>() {
}.getType());
@ -128,15 +131,19 @@ public class DistributeCallback extends ResultCallback {
String operation = map.get("operation");
switch (operation) {
//上传到nodecenter
case "NCreceive":
NCreceive(map.get("progress"));
break;
//开始分发给各个节点
case "distribute":
distributeContractProject(map.get("receiveFileName"), map.get("isPrivate"));
break;
//分发给各个节点
case "onDistribute":
onDistribute(map.get("progress"), map.get("nodeIP"));
break;
//各个节点返回收到
case "onReceive":
onReceive(map);
break;

View File

@ -130,7 +130,7 @@ public class LogActions {
@Action(userPermission = 1 << 9, async = true)
public void listNodes(JsonObject json, ResultCallback resultCallback) {
long start = System.currentTimeMillis();
final String pubKey = managerAction.pubKey;
final String pubKey = managerAction.getPubKey(json);
LOGGER.debug("[listNodes] managerAction.pubKey " + pubKey);
Map<String, CMNode> nodeinfos = NodeCenterActions.nodeInfos; // 所有在线节点?
final Map<String, CMNode> cinodeinfos = new HashMap<>();

View File

@ -28,11 +28,7 @@ import org.bdware.server.nodecenter.searchresult.ContractMeta;
import org.bdware.server.nodecenter.searchresult.ResultModel;
import org.wltea.analyzer.lucene.IKAnalyzer;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.io.InputStreamReader;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.ArrayList;
@ -105,9 +101,9 @@ public class MetaIndexAction { // public static IndexWriter indexWriter;
LOGGER.warn("getting index failed! " + e.getMessage());
}
}
req.addProperty("action", "requestReadMe");
req.addProperty("contractID", thisDesp.contractID);
rc.onResult(req);
// req.addProperty("action", "requestReadMe");
// req.addProperty("contractID", thisDesp.contractID);
// rc.onResult(req);
LOGGER.info("contract " + thisDesp.contractName + " --> actually to index");
}
if (null != indexReader) {

View File

@ -1,10 +1,8 @@
package org.bdware.server.nodecenter;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@ -22,13 +20,15 @@ import org.bdware.server.NodeCenterServer;
import org.bdware.server.action.Action;
import org.bdware.server.action.ActionExecutor;
import org.bdware.server.action.HttpResultCallback;
import org.bdware.server.http.HttpFileHandleAdapter;
import org.bdware.server.http.URIHandler;
import org.bdware.server.http.URIPath;
import org.bdware.server.http.HttpMethod;
import org.bdware.server.http.*;
import org.bdware.server.permission.Role;
import org.zz.gmhelper.SM2Util;
import java.io.*;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.PrintStream;
import java.net.URLDecoder;
import java.util.HashMap;
import java.util.List;
@ -43,9 +43,14 @@ public class NCHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
private static final String UNSUPPORTED_HTTP_METHOD = "{\"msg\":\"unsupported http method\"}";
private static final String UNSUPPORTED_ACTION = "{\"msg\":\"unsupported action\"}";
private static final Logger LOGGER = LogManager.getLogger(NCHttpHandler.class);
static NCManagerAction managerAction = new NCManagerAction(null);
static LogActions logActions = new LogActions(managerAction);
static UnitActions unitActions = new UnitActions(managerAction);
private static final ActionExecutor<ResultCallback, JsonObject> actionExecutor =
new ActionExecutor<ResultCallback, JsonObject>(
NodeCenterFrameHandler.executorService, new NodeCenterActions(null)) {
NodeCenterFrameHandler.executorService,
new NodeCenterActions(null),
managerAction, logActions, unitActions) {
@Override
public boolean checkPermission(Action a, JsonObject arg, long per) {
@ -119,42 +124,61 @@ public class NCHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
ctx.flush();
}
@URIPath({"/NodeCenter", "/SCIDE/SCExecutor", "/SCIDE/CMManager", "/SCIDE/SCManager"})
public void parseGetAndHandle(ChannelHandlerContext ctx, FullHttpRequest req) {
QueryStringDecoder decoderQuery = new QueryStringDecoder(req.uri());
Map<String, List<String>> params = decoderQuery.parameters();
JsonObject map = new JsonObject();
for (String key : params.keySet()) {
List<String> val = params.get(key);
if (val != null) map.addProperty(key, val.get(0));
public static class VerifiyCallback implements ArgParser.VerifiedCallback {
@Override
public void onResult(boolean verified, JsonObject transformedParam) {
LOGGER.info("verify signature: " + verified);
if (verified) {
// 查permission
String ret = getRole(transformedParam.get("pubKey").getAsString());
long permission;
if (ret != null && ret.length() > 0) {
permission = 0x86000d41L | Role.compoundValue(ret.split(","));
} else {
permission = Role.compoundValue(ret.split(","));
}
transformedParam.addProperty("permission", permission + "");
LOGGER.debug("[CMHttpHandler] http 请求查看用户权限为 : " + permission);
} else {
transformedParam.addProperty("permission", 0 + "");
}
String uri = URLDecoder.decode(req.uri()).split("\\?")[1];
int index = uri.lastIndexOf('&');
String str = uri;
if (index != -1) {
str = uri.substring(0, index);
}
injectPermission(map, str);
handleReq(map, ctx);
}
@URIPath(
method = org.bdware.server.http.HttpMethod.POST,
@URIPath(method = HttpMethod.OPTIONS)
public void crossOrigin(ChannelHandlerContext ctx, FullHttpRequest request) {
DefaultFullHttpResponse fullResponse =
new DefaultFullHttpResponse(
request.protocolVersion(),
OK,
Unpooled.wrappedBuffer("success".getBytes()));
fullResponse.headers().remove("Access-Control-Allow-Origin");
fullResponse.headers().remove("Access-Control-Allow-Headers");
fullResponse.headers().add("Access-Control-Allow-Origin", "*");
fullResponse.headers().add("Access-Control-Allow-Methods", "*");
fullResponse
.headers()
.add("Access-Control-Allow-Headers",
"Content-Type, Cookie, Accept-Encoding, User-Agent, Host, Referer, " +
"X-Requested-With, Accept, Accept-Language, Cache-Control, Connection");
ChannelFuture f = ctx.write(fullResponse);
f.addListener(ChannelFutureListener.CLOSE);
LOGGER.info("[Option] received!");
}
@URIPath({"/NodeCenter", "/SCIDE/SCExecutor", "/SCIDE/CMManager", "/SCIDE/SCManager"})
public void parseGetAndHandle(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
JsonObject transformedParam = ArgParser.parseGetAndVerify(req, new VerifiyCallback());
handleReq(transformedParam, ctx);
}
@URIPath(method = org.bdware.server.http.HttpMethod.POST,
value = {"/NodeCenter", "/SCIDE/SCExecutor", "/SCIDE/CMManager", "/SCIDE/SCManager"})
public void parsePostAndHandle(ChannelHandlerContext ctx, FullHttpRequest req)
throws UnsupportedEncodingException {
ByteBuf content = req.content();
JsonObject map =
JsonParser.parseReader(new InputStreamReader(new ByteBufInputStream(content)))
.getAsJsonObject();
// FIXME 感觉不太对劲
String uri = URLDecoder.decode(req.uri(), "utf-8").split("\\?")[1];
int index = uri.lastIndexOf('&');
String str = uri;
if (index != -1) {
str = uri.substring(0, index);
}
injectPermission(map, str);
throws Exception {
JsonObject map = ArgParser.parsePostAndVerify(req, new VerifiyCallback());
// injectPermission(map, str);
handleReq(map, ctx);
}
@ -189,6 +213,8 @@ public class NCHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
HttpVersion.HTTP_1_1,
OK,
Unpooled.wrappedBuffer(MetaIndexAction.search(map).getBytes()));
response.headers().add("Access-Control-Allow-Origin", "*");
response.headers().add("Access-Control-Allow-Methods", "*");
ChannelFuture f = ctx.write(response);
f.addListener(ChannelFutureListener.CLOSE);
@ -205,7 +231,7 @@ public class NCHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
}
}
public String getRole(String pubKey) {
public static String getRole(String pubKey) {
try {
String ret =
KeyValueDBUtil.instance.getValue(
@ -239,7 +265,6 @@ public class NCHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
String pubkey = map.get("pubKey").getAsString();
boolean verify = SM2Util.plainStrVerify(pubkey, str, map.get("sign").getAsString());
LOGGER.debug("[CMHttpHandler] HTTP POST 请求验签为 " + verify);
if (verify) {
// 查permission
String ret = getRole(pubkey);
@ -281,13 +306,15 @@ public class NCHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
cb = new HttpResultCallback(ctx, map.get("callback").getAsString());
cb.addHeader("charset", "utf-8");
cb.addHeader("Content-type", "text/plain");
cb.addHeader("Access-Control-Allow-Origin", "*");
cb.addHeader("Access-Control-Allow-Methods", "*");
} else {
if (map.has("callback"))
cb = new HttpResultCallback(ctx, map.get("callback").getAsString());
else cb = new HttpResultCallback(ctx, null);
cb.addHeader("Access-Control-Allow-Origin", "*");
cb.addHeader("Access-Control-Allow-Methods", "*");
cb.addHeader("charset", "utf-8");
cb.addHeader("Content-type", "application/json");
}
@ -296,6 +323,8 @@ public class NCHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, OK, Unpooled.wrappedBuffer(ret.getBytes()));
response.headers().add("Access-Control-Allow-Origin", "*");
response.headers().add("Access-Control-Allow-Methods", "*");
ChannelFuture f = ctx.write(response);
f.addListener(ChannelFutureListener.CLOSE);
}
@ -305,6 +334,8 @@ public class NCHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
HttpVersion.HTTP_1_1,
OK,
Unpooled.wrappedBuffer(e.getMessage().getBytes()));
response.headers().add("Access-Control-Allow-Origin", "*");
response.headers().add("Access-Control-Allow-Methods", "*");
ChannelFuture f = ctx.write(response);
f.addListener(ChannelFutureListener.CLOSE);
@ -318,6 +349,8 @@ public class NCHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
HttpVersion.HTTP_1_1,
OK,
Unpooled.wrappedBuffer(JsonUtil.toJson(ret).getBytes()));
response.headers().add("Access-Control-Allow-Origin", "*");
response.headers().add("Access-Control-Allow-Methods", "*");
ChannelFuture f = ctx.write(response);
f.addListener(ChannelFutureListener.CLOSE);
}

View File

@ -34,13 +34,22 @@ public class NCManagerAction {
SM2Util.DOMAIN_PARAMS);
private static final Logger LOGGER = LogManager.getLogger(NCManagerAction.class);
String sessionID = null;
String pubKey;
private String publicKey;
Random random = new Random();
private NodeCenterWSFrameHandler handler;
public NCManagerAction(NodeCenterWSFrameHandler nodeCenterWSFrameHandler) {
this.handler = nodeCenterWSFrameHandler;
pubKey = null;
publicKey = null;
}
public String getPubKey(JsonObject jo) {
try {
if (publicKey != null) return publicKey;
return jo.get("pubKey").getAsString();
} catch (Exception e) {
}
return null;
}
public static boolean isCenterManager(String pubkey) {
@ -104,6 +113,7 @@ public class NCManagerAction {
}
public void getRole(JsonObject json, ResultCallback resultCallback) {
String pubKey = getPubKey(json);
if (pubKey == null) {
simpleReply(resultCallback, "onLogin", Role.Anonymous.name());
return;
@ -156,7 +166,7 @@ public class NCManagerAction {
boolean result = SM2Util.plainStrVerify(pubKey, sessionID, signature);
LOGGER.debug("session:" + (sessionID));
if (result) {
this.pubKey = pubKey;
this.publicKey = pubKey;
LOGGER.debug("设置公钥" + pubKey);
getRole(json, resultCallback);
} else {
@ -168,6 +178,8 @@ public class NCManagerAction {
@Action(userPermission = 0)
public void applyRole(JsonObject json, ResultCallback resultCallback) {
String pubKey = getPubKey(json);
long start = System.currentTimeMillis();
if (pubKey == null) {
simpleReply(resultCallback, "onApplyRole", "missing pubKey");
@ -176,7 +188,7 @@ public class NCManagerAction {
if (json.has("role")) {
Role role = Role.parse(json.get("role").getAsString());
if (role != Role.Anonymous) {
applyAtDB(role, resultCallback, start);
applyAtDB(role, getPubKey(json), resultCallback, start);
return;
}
}
@ -201,7 +213,9 @@ public class NCManagerAction {
}
}
private void applyAtDB(Role role, ResultCallback resultCallback, long start) {
private void applyAtDB(Role role, String pubKey, ResultCallback resultCallback, long start) {
String str = KeyValueDBUtil.instance.getValue(NCTables.ApplyRole.toString(), pubKey);
String already = KeyValueDBUtil.instance.getValue(NCTables.NodeUser.toString(), pubKey);
if (already != null && already.contains(role.toString())) {
@ -241,6 +255,9 @@ public class NCManagerAction {
long start = System.currentTimeMillis();
String pubKey = json.get("pubKey").getAsString();
boolean isAccept = json.get("isAccept").getAsBoolean();
if (json.has("authorizedPubKey")) {
pubKey = json.get("authorizedPubKey").getAsString();
}
LOGGER.debug("[NCManagerAction] " + json.toString());
if (pubKey == null || pubKey.length() == 0) {
simpleReply(resultCallback, "onAuthNodeManager", "missing pubKey");

View File

@ -41,7 +41,7 @@ public class NodeCenterActions {
// TODO 定期清缓存
// public static Map<String, RequestCache> requestCache =
// new ConcurrentHashMap<>(); // key is contractID,only for requestAll type contract
TimerTask checkAliveTask;
// TimerTask checkAliveTask;
static Timer checkAliveTimer = new Timer();
public NodeCenterFrameHandler controller;
String nodeID; // node pubKey
@ -129,9 +129,9 @@ public class NodeCenterActions {
@Action
public void syncPing(JsonObject json, ResultCallback rc) {
// logger.debug("[NodeCenterAction] syncPing");
checkAliveTask.cancel();
checkAliveTask = new CheckAgentAliveTimer(nodeID);
checkAliveTimer.schedule(checkAliveTask, 20000L);
//checkAliveTask.cancel();
//checkAliveTask = new CheckAgentAliveTimer(nodeID);
//checkAliveTimer.schedule(checkAliveTask, 200000L);
if (json.has("data")) {
String data = json.get("data").getAsString();
LOGGER.debug("[syncPing] data" + data);
@ -229,8 +229,8 @@ public class NodeCenterActions {
r.data = "failed";
if (sessionID != null && SM2Util.plainStrVerify(pubkey, pubkey + sessionID, signature)) {
nodeID = pubkey;
checkAliveTask = new CheckAgentAliveTimer(nodeID);
checkAliveTimer.schedule(checkAliveTask, 20000L);
//checkAliveTask = new CheckAgentAliveTimer(nodeID);
//checkAliveTimer.schedule(checkAliveTask, 20000L);
controller.pubKey = nodeID;
@ -274,30 +274,30 @@ public class NodeCenterActions {
// TODO 如果是NC挂了其他节点重连之后需要另外考虑
// 向该合约发送请求令其查看自己崩溃前的集群合约
LOGGER.info("NodeCenter tells nodes new masters for contracts created before the crash!");
Map<String, String> request = new HashMap<>();
request.put("action", "queryUnitContractsID");
try {
CMNode conn = nodeInfos.get(pubkey);
NodeCenterActions connection = conn.connection;
NodeCenterFrameHandler connController = connection.controller;
connController.sendMsg(JsonUtil.toJson(request));
} catch (Exception e) {
e.printStackTrace();
}
Map<String, String> onNewNodeConnected = new HashMap<>();
onNewNodeConnected.put("action", "onNewNodeConnected");
onNewNodeConnected.put("pubKey", pubkey);
for (CMNode node : nodeInfos.values()) {
if (node.pubKey.equals(pubkey)) {
continue;
}
try {
node.connection.controller.sendMsg(JsonUtil.toJson(onNewNodeConnected));
} catch (Exception e) {
e.printStackTrace();
}
}
// LOGGER.info("NodeCenter tells nodes new masters for contracts created before the crash!");
// Map<String, String> request = new HashMap<>();
// request.put("action", "queryUnitContractsID");
// try {
//// CMNode conn = nodeInfos.get(pubkey);
//// NodeCenterActions connection = conn.connection;
//// NodeCenterFrameHandler connController = connection.controller;
//// connController.sendMsg(JsonUtil.toJson(request));
// } catch (Exception e) {
// e.printStackTrace();
// }
// Map<String, String> onNewNodeConnected = new HashMap<>();
// onNewNodeConnected.put("action", "onNewNodeConnected");
// onNewNodeConnected.put("pubKey", pubkey);
// for (CMNode node : nodeInfos.values()) {
// if (node.pubKey.equals(pubkey)) {
// continue;
// }
// try {
// // node.connection.controller.sendMsg(JsonUtil.toJson(onNewNodeConnected));
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
rc.onResult(JsonUtil.toJson(r));
}
@ -366,7 +366,7 @@ public class NodeCenterActions {
json.get("contracts"),
new TypeToken<List<ContractDesp>>() {
}.getType());
MetaIndexAction.updateContractsIndex(contracts, rc);
//MetaIndexAction.updateContractsIndex(contracts, rc);
LOGGER.debug("update contracts: " + json.get("contracts"));
int version = -1;
if (json.has("contractVersion")) {
@ -376,6 +376,8 @@ public class NodeCenterActions {
if (json.has("events")) {
node.events = json.get("events").getAsInt();
}
//just ignore recover
if (nodeInfos != null) return;
// 遍历所有节点
for (CMNode cmNode : nodeInfos.values()) {
if (cmNode.pubKey.equals(nodeID)) {
@ -1018,7 +1020,7 @@ public class NodeCenterActions {
if (project.isFile()) {
try {
FileInputStream fin = new FileInputStream(project);
byte[] buff = new byte[30 * 1024];
byte[] buff = new byte[500 * 1024];
long count = 0;
long preCount = 0;
long total = project.length();

View File

@ -47,7 +47,7 @@ public class NodeCenterWSFrameHandler extends SimpleChannelInboundHandler<WebSoc
// Permission userPermission = a.userPermission();
// long val=userPermission.getValue();
long val = a.userPermission();
String pubKey = managerAction.pubKey;
String pubKey = managerAction.getPubKey(null);
String action = arg.get("action").getAsString();
LOGGER.debug("permission" + permission);
LOGGER.debug("userPermission" + val);
@ -73,6 +73,7 @@ public class NodeCenterWSFrameHandler extends SimpleChannelInboundHandler<WebSoc
ae.appendHandler(obj);
}
}
private Object createInstanceByClzName(String clzName) {
try {
Class<?> clz = Class.forName(clzName, true, pluginLoader);
@ -81,12 +82,13 @@ public class NodeCenterWSFrameHandler extends SimpleChannelInboundHandler<WebSoc
return null;
}
}
public ActionExecutor getAE() {
return ae;
}
public String getPubKey() {
return managerAction.pubKey;
return managerAction.getPubKey(null);
}
@Override

View File

@ -45,7 +45,7 @@ public class UnitActions {
@Action(userPermission = 1 << 6, async = true)
public void listTrustUnits(JsonObject json, ResultCallback resultCallback) {
long start = System.currentTimeMillis();
final String pubKey = managerAction.pubKey;
final String pubKey = managerAction.getPubKey(json);
List<KV> allunits = KeyValueDBUtil.instance.getKeyValues(NCTables.TrustUnitsDB.toString());
if (pubKey != null
&& KeyValueDBUtil.instance
@ -73,7 +73,7 @@ public class UnitActions {
@Action(userPermission = 1 << 6, async = true)
public void createTrustUnit(JsonObject json, ResultCallback resultCallback) {
final String pubKey = managerAction.pubKey;
final String pubKey = managerAction.getPubKey(json);
LOGGER.debug("[createTrustUnit] " + json.get("data").toString());
KeyValueDBUtil.instance.setValue(
NCTables.TrustUnitsDB.toString(),