diff --git a/src/main/java/org/bdware/server/action/CMActions.java b/src/main/java/org/bdware/server/action/CMActions.java index 81d973d..06bda8d 100644 --- a/src/main/java/org/bdware/server/action/CMActions.java +++ b/src/main/java/org/bdware/server/action/CMActions.java @@ -43,11 +43,10 @@ public class CMActions implements OnHashCallback { "{\"action\":\"onExecuteResult\",\"executeTime\":-1," + "\"status\":\"Error\",\"result\":\"invalid contract doi\"}"); private static final Logger LOGGER = LogManager.getLogger(CMActions.class); - public static ContractManager manager = initManager(); public static FuncInvokeInfo FUNCINVOKEINFO = new FuncInvokeInfo(); // 合约调用时参数和结果 + public static ContractManager manager = initManager(); private static SecureRandom RANDOM; public ContractManagerFrameHandler handler; - public CMActions() { handler = null; } @@ -392,6 +391,28 @@ public class CMActions implements OnHashCallback { return str; } + public static void killContractByMaster( + String contractID, JsonObject request, ResultCallback rc) { + LOGGER.info("[MasterClientTCPAction] killContract : "); + try { + MasterClientTCPAction.killUnitContractMap.put( + request.get("requestID").getAsString(), + new KillUnitContractInfo(rc, System.currentTimeMillis())); + MultiContractMeta mcm = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); + NetworkManager.instance.sendToAgent(mcm.getMasterNode(), JsonUtil.toJson(request)); + } catch (Exception e) { + e.printStackTrace(); + } finally { + CMActions.manager.stopContractWithOwner( + request.get("verifiedPubKey").getAsString(), contractID); + ContractMeta meta = manager.statusRecorder.getContractMeta(contractID); + if (null != meta && meta.contractExecutor != null) { + //TODO why close? + // meta.contractExecutor.close(); + } + } + } + @Action(async = true) public void ping(JsonObject args, ResultCallback resultCallback) { ReplyUtil.simpleReply(resultCallback, "pong", null); @@ -1253,28 +1274,6 @@ public class CMActions implements OnHashCallback { } } - public static void killContractByMaster( - String contractID, JsonObject request, ResultCallback rc) { - LOGGER.info("[MasterClientTCPAction] killContract : "); - try { - MasterClientTCPAction.killUnitContractMap.put( - request.get("requestID").getAsString(), - new KillUnitContractInfo(rc, System.currentTimeMillis())); - MultiContractMeta mcm = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); - NetworkManager.instance.sendToAgent(mcm.getMasterNode(), JsonUtil.toJson(request)); - } catch (Exception e) { - e.printStackTrace(); - } finally { - CMActions.manager.stopContractWithOwner( - request.get("verifiedPubKey").getAsString(), contractID); - ContractMeta meta = manager.statusRecorder.getContractMeta(contractID); - if (null != meta && meta.contractExecutor != null) { - //TODO why close? - // meta.contractExecutor.close(); - } - } - } - @Action(userPermission = 1L << 26, async = true) public void queryContractInstanceDOI(JsonObject args, ResultCallback resultCallback) { long s = System.currentTimeMillis(); @@ -1295,6 +1294,58 @@ public class CMActions implements OnHashCallback { ExecutionManager.instance.updateLocalContractToNodeCenter(); } + @Action(userPermission = 1 << 16, async = true) + public void staticVerifyContract(JsonObject args, ResultCallback resultCallback) { + Contract c = new Contract(); + c.setID(args.get("contractid").getAsString()); + c.setType(ContractExecType.Sole); + c.setScript(args.get("script").getAsString()); + Map r = new HashMap<>(); + r.put("action", "onStaticVerifyResult"); + long start = System.currentTimeMillis(); + String path = args.get("path").getAsString(); + if (path.startsWith("/")) { + try { + String[] pp = path.split("/"); + StringBuilder parentPath = new StringBuilder(); + for (String s : pp) { + parentPath.append(s).append("/"); + } + String parPath = GlobalConf.instance.publicDir; + if (args.has("isPrivate") && args.get("isPrivate").getAsBoolean()) { + parPath = GlobalConf.instance.privateDir + "/" + handler.getPubKey(); + } + byte[] bb = + YJSPacker.pack(new File(parPath, parentPath.toString()).getAbsolutePath()); + File temp = File.createTempFile(pp[pp.length - 1], ".zip"); + FileOutputStream fout = new FileOutputStream(temp); + assert bb != null; + fout.write(bb); + fout.close(); + LOGGER.debug("StartContract, zipPath:" + temp.getAbsolutePath()); + // TODO script should encoded!! + c.setScript(temp.getAbsolutePath()); + LOGGER.debug(temp.getAbsolutePath()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + LOGGER.debug("verifyContract: " + JsonUtil.toJson(c)); + + r.put("data", staticVerify(c)); + + r.put("cid", c.getID()); + r.put("executeTime", System.currentTimeMillis() - start); + // GRPCPool.writeToChain(c.getOwner(), privKey, gson.toJson(r), + // json.getString("requestID")); + resultCallback.onResult(r); + } + + public String staticVerify(Contract c) { + return manager.staticVerify(c); + } + // @Action(userPermission = 1L << 26, async = true) // public void queryContractInstanceInfoByDOI(JsonObject args, ResultCallback resultCallback) // { @@ -1386,58 +1437,6 @@ public class CMActions implements OnHashCallback { * resultCallback.onResult(gson.toJson(r)); } */ - @Action(userPermission = 1 << 16, async = true) - public void staticVerifyContract(JsonObject args, ResultCallback resultCallback) { - Contract c = new Contract(); - c.setID(args.get("contractid").getAsString()); - c.setType(ContractExecType.Sole); - c.setScript(args.get("script").getAsString()); - Map r = new HashMap<>(); - r.put("action", "onStaticVerifyResult"); - long start = System.currentTimeMillis(); - String path = args.get("path").getAsString(); - if (path.startsWith("/")) { - try { - String[] pp = path.split("/"); - StringBuilder parentPath = new StringBuilder(); - for (String s : pp) { - parentPath.append(s).append("/"); - } - String parPath = GlobalConf.instance.publicDir; - if (args.has("isPrivate") && args.get("isPrivate").getAsBoolean()) { - parPath = GlobalConf.instance.privateDir + "/" + handler.getPubKey(); - } - byte[] bb = - YJSPacker.pack(new File(parPath, parentPath.toString()).getAbsolutePath()); - File temp = File.createTempFile(pp[pp.length - 1], ".zip"); - FileOutputStream fout = new FileOutputStream(temp); - assert bb != null; - fout.write(bb); - fout.close(); - LOGGER.debug("StartContract, zipPath:" + temp.getAbsolutePath()); - // TODO script should encoded!! - c.setScript(temp.getAbsolutePath()); - LOGGER.debug(temp.getAbsolutePath()); - } catch (Exception e) { - e.printStackTrace(); - } - } - - LOGGER.debug("verifyContract: " + JsonUtil.toJson(c)); - - r.put("data", staticVerify(c)); - - r.put("cid", c.getID()); - r.put("executeTime", System.currentTimeMillis() - start); - // GRPCPool.writeToChain(c.getOwner(), privKey, gson.toJson(r), - // json.getString("requestID")); - resultCallback.onResult(r); - } - - public String staticVerify(Contract c) { - return manager.staticVerify(c); - } - @Action(userPermission = 1L << 16, async = true) public void getControlFlowByFileName(JsonObject args, ResultCallback resultCallback) { String project = args.get("projectName").getAsString(); @@ -1933,4 +1932,8 @@ public class CMActions implements OnHashCallback { } } } + + + + } \ No newline at end of file diff --git a/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java b/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java index 3b0f011..722cb8f 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java @@ -44,7 +44,6 @@ public class MasterClientRecoverMechAction { private final Map transFileMap = new HashMap<>(); public MasterClientRecoverMechAction() { - } // 告知master自己需要恢复,携带之前的运行模式,如果是StableMode需要携带lastExeSeq信息 diff --git a/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java b/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java index b0ca462..df1c2f4 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java @@ -129,6 +129,142 @@ public class MasterClientTCPAction { return executor; } + public static void dealRequests(String contractID) { + LOGGER.info("dealRequests"); + + // If still in recovering state,don't dealRequests now + if (MasterClientRecoverMechAction.recoverSet.contains(contractID)) { + LOGGER.info("本地还没恢复完成,不执行请求!"); + return; + } + + MultiContractMeta cei; + + synchronized (cei = CMActions.manager.multiContractRecorder.createIfNotExist(contractID)) { + while (!cei.queue.isEmpty()) { + ContractRequest request = cei.queue.peek(); + // logger.info("此时队列为 "); + // cei.printSet(); + if (cei.isSequent(request.seq)) { + // request.setSeq(jo.get("seq").getAsString()); + // request.needSeq = jo.get("needSeq").getAsBoolean(); + long start = System.currentTimeMillis(); + cei.curExeSeq = request.seq; + LOGGER.info("调试位置 1 cei.curExeSeq=" + cei.curExeSeq); + boolean isMultiReq = request.getRequestID().endsWith("_mul"); + + if (isMultiReq) { + // 开始执行多点合约请求,需要将缓存的其他节点发来的同requestID的请求也触发调用 + LOGGER.info("开始执行多点合约请求,需要将缓存的其他节点发来的同requestID的请求也触发调用"); + MultiRequestInfo mri = + MultiRequestInfo.reqInfos.get(request.getRequestID()); + for (String uniID : mri.callbackMap.keySet()) { + LOGGER.info("触发 uniID=" + uniID); + MultiRequestInfo.exeMultiReq(request, uniID); + } + } + + String data2 = CMActions.manager.executeLocally(request, null); + + LOGGER.info("本地执行的结果为\n" + data2); + if (isMultiReq) { + MultiRequestInfo.reqInfos.get(request.getRequestID()).countIncrease(); + } + + Map ret = new HashMap<>(); + ret.put("action", "receiveTrustfullyResult"); + SM2KeyPair keyPair = GlobalConf.instance.keyPair; + ret.put("nodeID", keyPair.getPublicKeyStr()); + int seq = request.seq; + LOGGER.info("dealRequests 执行请求 " + seq); + + ret.put("responseID", cei.uniReqIDMap.get(seq)); + ret.put("executeTime", (System.currentTimeMillis() - start) + ""); + ret.put("data", data2); + LOGGER.info("返回 uniID=" + cei.uniReqIDMap.get(seq) + " 的结果"); + cei.resultMap.get(seq).onResult(JsonUtil.toJson(ret)); + cei.setLastExeSeq(seq); + if (KeyValueDBUtil.instance.containsKey( + CMTables.LastExeSeq.toString(), contractID)) { + KeyValueDBUtil.instance.setValue( + CMTables.LastExeSeq.toString(), contractID, seq + ""); + } + + // ledger检查点 public static final int LEDGER_PERIOD = 100; //账本检查点 + if (seq % 100 == 0) { + LOGGER.info("遇到ledger检查点 seq=" + cei.getLastExeSeq()); + boolean isMaster = CMActions.manager.getContractIsMaster(contractID); + if (isMaster) { + String lastHash; + if (KeyValueDBUtil.instance.containsKey( + CMTables.CheckPointLastHash.toString(), contractID)) { + lastHash = + KeyValueDBUtil.instance.getValue( + CMTables.CheckPointLastHash.toString(), contractID); + } else { + lastHash = "firstCheckPoint"; + } + Map map = new HashMap<>(); + String state = CMActions.manager.dumpContract(contractID, ""); + map.put("state", state); + map.put("lastHash", lastHash); + String data = JsonUtil.toJson(map); + String requestID = + contractID + "_" + "checkPoint_" + new Random().nextInt(); + CheckPointCallback cb = new CheckPointCallback(); + ContractManager.checkPointToLedger(cb, contractID, data, requestID); + } else { + LOGGER.info("遇到检查点,但非master!"); + } + } + + cei.queue.poll(); + cei.uniReqIDMap.remove(seq); + cei.resultMap.remove(seq); + } else if (cei.getLastExeSeq() >= request.seq) { + // delete already done requests + ContractRequest temp = cei.queue.peek(); + while (!cei.queue.isEmpty() && cei.getLastExeSeq() >= temp.seq) { + // logger.info("队列中弹出请求 " + cei.queue.peek().getSeq()); + int seq = temp.seq; + String responseID = cei.uniReqIDMap.get(seq); + ResultCallback cb = cei.resultMap.get(seq); + LOGGER.info("dealRequests 执行过期请求 " + seq); + if (responseID != null && cb != null) { + Map ret = new HashMap<>(); + ret.put("action", "receiveTrustfullyResult"); + SM2KeyPair keyPair = GlobalConf.instance.keyPair; + ret.put("nodeID", keyPair.getPublicKeyStr()); + ret.put("responseID", responseID); + ContractResult cr = + new ContractResult( + ContractResult.Status.Error, + new JsonPrimitive(ComponedContractResult.EXPIRED_REQ)); + ret.put("data", JsonUtil.toJson(cr)); + cb.onResult(JsonUtil.toJson(ret)); + } + cei.queue.poll(); + + cei.uniReqIDMap.remove(seq); + cei.resultMap.remove(seq); + temp = cei.queue.peek(); + } + } else { + // ask master to re-send requests + LOGGER.info("send request to get cachedRequests"); + Map ret = new HashMap<>(); + ret.put("action", "sendCachedRequests"); + ret.put("contractID", contractID); + SM2KeyPair keyPair = GlobalConf.instance.keyPair; + ret.put("nodeID", keyPair.getPublicKeyStr()); + ret.put("start", cei.getLastExeSeq() + ""); + ret.put("end", request.seq + ""); // 左开右开区间 + NetworkManager.instance.sendToAgent(cei.getMasterNode(), JsonUtil.toJson(ret)); + break; + } + } + } + } @Action(async = true) public void onKillContractProcess(JsonObject jo, ResultCallback result) { @@ -145,13 +281,15 @@ public class MasterClientTCPAction { } } - // kill 本地的该集群合约实例 @Action(async = true) public void killContractProcessAtSlave(JsonObject jo, ResultCallback result) { String id; - if (jo.has("id")) id = jo.get("id").getAsString(); - else id = jo.get("name").getAsString(); + if (jo.has("id")) { + id = jo.get("id").getAsString(); + } else { + id = jo.get("name").getAsString(); + } ContractClient cc = CMActions.manager.getClient(id); @@ -185,6 +323,7 @@ public class MasterClientTCPAction { map.put("requestID", jo.get("requestID").getAsString()); map.put("data", ret); result.onResult(JsonUtil.toJson(map)); + } @Action(async = true) @@ -441,144 +580,6 @@ public class MasterClientTCPAction { } } - public static void dealRequests(String contractID) { - LOGGER.info("dealRequests"); - - // If still in recovering state,don't dealRequests now - if (MasterClientRecoverMechAction.recoverSet.contains(contractID)) { - LOGGER.info("本地还没恢复完成,不执行请求!"); - return; - } - - MultiContractMeta cei; - - synchronized (cei = CMActions.manager.multiContractRecorder.createIfNotExist(contractID)) { - while (!cei.queue.isEmpty()) { - ContractRequest request = cei.queue.peek(); - // logger.info("此时队列为 "); - // cei.printSet(); - if (cei.isSequent(request.seq)) { - // request.setSeq(jo.get("seq").getAsString()); - // request.needSeq = jo.get("needSeq").getAsBoolean(); - long start = System.currentTimeMillis(); - cei.curExeSeq = request.seq; - LOGGER.info("调试位置 1 cei.curExeSeq=" + cei.curExeSeq); - boolean isMultiReq = request.getRequestID().endsWith("_mul"); - - if (isMultiReq) { - // 开始执行多点合约请求,需要将缓存的其他节点发来的同requestID的请求也触发调用 - LOGGER.info("开始执行多点合约请求,需要将缓存的其他节点发来的同requestID的请求也触发调用"); - MultiRequestInfo mri = - MultiRequestInfo.reqInfos.get(request.getRequestID()); - for (String uniID : mri.callbackMap.keySet()) { - LOGGER.info("触发 uniID=" + uniID); - MultiRequestInfo.exeMultiReq(request, uniID); - } - } - - String data2 = CMActions.manager.executeLocally(request, null); - - LOGGER.info("本地执行的结果为\n" + data2); - if (isMultiReq) { - MultiRequestInfo.reqInfos.get(request.getRequestID()).countIncrease(); - } - - Map ret = new HashMap<>(); - ret.put("action", "receiveTrustfullyResult"); - SM2KeyPair keyPair = GlobalConf.instance.keyPair; - ret.put("nodeID", keyPair.getPublicKeyStr()); - int seq = request.seq; - LOGGER.info("dealRequests 执行请求 " + seq); - - ret.put("responseID", cei.uniReqIDMap.get(seq)); - ret.put("executeTime", (System.currentTimeMillis() - start) + ""); - ret.put("data", data2); - LOGGER.info("返回 uniID=" + cei.uniReqIDMap.get(seq) + " 的结果"); - cei.resultMap.get(seq).onResult(JsonUtil.toJson(ret)); - cei.setLastExeSeq(seq); - if (KeyValueDBUtil.instance.containsKey( - CMTables.LastExeSeq.toString(), contractID)) { - KeyValueDBUtil.instance.setValue( - CMTables.LastExeSeq.toString(), contractID, seq + ""); - } - - // ledger检查点 public static final int LEDGER_PERIOD = 100; //账本检查点 - if (seq % 100 == 0) { - LOGGER.info("遇到ledger检查点 seq=" + cei.getLastExeSeq()); - boolean isMaster = CMActions.manager.getContractIsMaster(contractID); - if (isMaster) { - String lastHash; - if (KeyValueDBUtil.instance.containsKey( - CMTables.CheckPointLastHash.toString(), contractID)) { - lastHash = - KeyValueDBUtil.instance.getValue( - CMTables.CheckPointLastHash.toString(), contractID); - } else { - lastHash = "firstCheckPoint"; - } - Map map = new HashMap<>(); - String state = CMActions.manager.dumpContract(contractID, ""); - map.put("state", state); - map.put("lastHash", lastHash); - String data = JsonUtil.toJson(map); - String requestID = - contractID + "_" + "checkPoint_" + new Random().nextInt(); - CheckPointCallback cb = new CheckPointCallback(); - ContractManager.checkPointToLedger(cb, contractID, data, requestID); - } else { - LOGGER.info("遇到检查点,但非master!"); - } - } - - cei.queue.poll(); - cei.uniReqIDMap.remove(seq); - cei.resultMap.remove(seq); - } else if (cei.getLastExeSeq() >= request.seq) { - // delete already done requests - ContractRequest temp = cei.queue.peek(); - while (!cei.queue.isEmpty() && cei.getLastExeSeq() >= temp.seq) { - // logger.info("队列中弹出请求 " + cei.queue.peek().getSeq()); - int seq = temp.seq; - String responseID = cei.uniReqIDMap.get(seq); - ResultCallback cb = cei.resultMap.get(seq); - LOGGER.info("dealRequests 执行过期请求 " + seq); - if (responseID != null && cb != null) { - Map ret = new HashMap<>(); - ret.put("action", "receiveTrustfullyResult"); - SM2KeyPair keyPair = GlobalConf.instance.keyPair; - ret.put("nodeID", keyPair.getPublicKeyStr()); - ret.put("responseID", responseID); - ContractResult cr = - new ContractResult( - ContractResult.Status.Error, - new JsonPrimitive(ComponedContractResult.EXPIRED_REQ)); - ret.put("data", JsonUtil.toJson(cr)); - cb.onResult(JsonUtil.toJson(ret)); - } - cei.queue.poll(); - - cei.uniReqIDMap.remove(seq); - cei.resultMap.remove(seq); - temp = cei.queue.peek(); - } - } else { - // ask master to re-send requests - LOGGER.info("send request to get cachedRequests"); - Map ret = new HashMap<>(); - ret.put("action", "sendCachedRequests"); - ret.put("contractID", contractID); - SM2KeyPair keyPair = GlobalConf.instance.keyPair; - ret.put("nodeID", keyPair.getPublicKeyStr()); - ret.put("start", cei.getLastExeSeq() + ""); - ret.put("end", request.seq + ""); // 左开右开区间 - NetworkManager.instance.sendToAgent(cei.getMasterNode(), JsonUtil.toJson(ret)); - break; - } - } - } - } - - @Action(async = true) public void receiveContractExecutionServer(JsonObject jsonObject, ResultCallback resultCallback) { MasterServerTCPAction.sync.wakeUp( diff --git a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java index 1f8dff7..30a6d1e 100644 --- a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java +++ b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java @@ -164,7 +164,7 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { String reqStr = req.toString(); for (String node : nodes) { if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) - == RecoverFlag.Fine) { + == RecoverFlag.Fine) { NetworkManager.instance.sendToAgent(node, reqStr); } }