diff --git a/src/main/java/org/bdware/server/action/CMActions.java b/src/main/java/org/bdware/server/action/CMActions.java index 94423c9..d1ee020 100644 --- a/src/main/java/org/bdware/server/action/CMActions.java +++ b/src/main/java/org/bdware/server/action/CMActions.java @@ -21,7 +21,9 @@ import org.bdware.server.GRPCPool; import org.bdware.server.GlobalConf; import org.bdware.server.action.p2p.MasterClientTCPAction; import org.bdware.server.trustedmodel.AgentManager; +import org.bdware.server.trustedmodel.ContractExecutor; import org.bdware.server.trustedmodel.KillUnitContractInfo; +import org.bdware.server.trustedmodel.SelfAdaptiveShardingExecutor; import org.bdware.server.ws.ContractManagerFrameHandler; import org.bdware.units.NetworkManager; import org.bdware.units.function.CommunicationManager; @@ -51,7 +53,6 @@ public class CMActions implements OnHashCallback { public static FuncInvokeInfo FUNCINVOKEINFO = new FuncInvokeInfo(); // 合约调用时参数和结果 private static SecureRandom RANDOM; public ContractManagerFrameHandler handler; - public CMActions() { handler = null; } @@ -532,6 +533,16 @@ public class CMActions implements OnHashCallback { } } + @Action(async = true, userPermission = 0) + public void checkBlocks(JsonObject args, final ResultCallback rcb) { + ContractExecutor executor = CMActions.manager.getExecutor(args.get("contractID").getAsString()); + try { + ReplyUtil.simpleReply(rcb, "onCheckBlocks", ((SelfAdaptiveShardingExecutor) executor).checkCache()); + } catch (Exception e) { + ReplyUtil.simpleReply(rcb, "onCheckBlocks", "error! " + e.getMessage()); + } + } + // 节点管理者 @Action(userPermission = 1L << 19) public void listAllContractProcess(JsonObject args, ResultCallback resultCallback) { @@ -1584,97 +1595,6 @@ public class CMActions implements OnHashCallback { // rc.onResult(peers); } - // @Action(userPermission = 1L << 26, async = true) - // public void queryContractInstanceInfoByDOI(JsonObject args, ResultCallback resultCallback) - // { - // long s = System.currentTimeMillis(); - // String ret; - // try { - // String contractDOI = args.get("doi").getAsString(); - // DigitalObject contractDO; - // DoipClient doipClient = - // DoipClient.createByRepoUrlAndMsgFmt( - // DOIPMainServer.repoUrl, DoipMessageFormat.PACKET.getName()); - // DoMessage response = doipClient.retrieve(contractDOI, null, null); - // if (response.parameters.response == DoResponse.Success) { - // contractDO = DigitalObject.parse(response.body); - // } else { - // DoMessage resp = DOAClient.getGlobalInstance().retrieve(contractDOI, null, - // null); - // contractDO = DigitalObject.parse(resp.body); - // } - // ContractInstanceDO contractInstanceDO = - // (ContractInstanceDO) - // ContractManager.toObject(contractDO.elements.get(0).getData()); - // // Dictionary contractInfo = JsonUtil.fromJson(new - // // String(contractDO.getData()), new Hashtable().getClass()); - // - // ret = - // String.format( - // "Contract ID: %s\nContract PublicKey: %s", - // contractInstanceDO.id, contractInstanceDO.publicKey); - // } catch (Exception e) { - // ByteArrayOutputStream bo = new ByteArrayOutputStream(); - // e.printStackTrace(new PrintStream(bo)); - // ret = bo.toString(); - // } - // Map r = new HashMap<>(); - // r.put("action", "onQueryContractInstanceInfoByDOI"); - // r.put("data", ret); - // r.put("executeTime", System.currentTimeMillis() - s); - // resultCallback.onResult(JsonUtil.toJson(r)); - // if (client != null && client.controller != null) { - // client.controller.updateContract(); - // } - // } - - /* - * @Action(userPermission = 1 << 19, async = true) public void - * staticVerify(JsonObject args, ResultCallback resultCallback) { Map r = new HashMap<>(); r.put("action", "onStaticVerifyResult"); - * - * long start = System.currentTimeMillis(); Map ret = new - * HashMap<>(); Contract c = new Contract(); // c.setType(Type.Algorithm); - * c.setType(ContractType.Sole); ret.put("action", "onStartContract"); String path = - * null; if (args.has("path")) path = args.get("path").getAsString(); if (path - * != null && path.startsWith("/")) c.setScript(args.get("path").getAsString()); - * else c.setScript(args.get("script").getAsString()); if - * (args.has("publicKey")) { c.setOwner(args.get("publicKey").getAsString()); - * c.setSignature(args.get("signature").getAsString()); } else if - * (args.has("owner")) { c.setOwner(args.get("owner").getAsString()); - * c.setSignature(args.get("signature").getAsString()); - * - * } else { c.setOwner(GlobalConf.instance.keyPair.getPublicKeyStr()); - * c.doSignature(GlobalConf.instance.keyPair); } - * - * if (!c.verifySignature()) { ret.put("data", "verify failed"); - * resultCallback.onResult(gson.toJson(ret)); return; } if (path != null && - * path.startsWith("/")) { String parPath; if (args.has("isPrivate") && - * args.get("isPrivate").getAsBoolean()) { parPath = - * GlobalConf.instance.privateDir + "/" + handler.pubKey; } else { parPath = - * GlobalConf.instance.publicDir; } try { String[] pp = path.split("/"); String - * parentPath = path; for (int i = 0; i < pp.length && i < 2; i++) { parentPath - * += pp[i] + "/"; } System.out.println("[CMActions] pack Dir, from:" + path + - * " --> " + parentPath); byte[] bb = YJSPacker.pack(new File(parPath, - * parentPath).getAbsolutePath()); File temp = File.createTempFile(pp[pp.length - * - 1], ".zip"); FileOutputStream fout = new FileOutputStream(temp); - * fout.write(bb); fout.close(); System.out.println("StartContract, zipPath:" + - * temp.getAbsolutePath()); // TODO script should encoded!! - * c.setScript(temp.getAbsolutePath()); } catch (Exception e) { - * e.printStackTrace(); } } - * - * System.out.println("[CMActions] verifyContract: " + gson.toJson(c)); - * r.put("data", manager.staticVerify(c)); r.put("cid", c.getID()); - * r.put("executeTime", System.currentTimeMillis() - start); // - * GRPCPool.writeToChain(c.getOwner(), privKey, gson.toJson(r), // - * json.getString("requestID")); - * - * //addLocalContractLog("staticVerify", c.getID(), path.split("/")[1], - * c.getOwner(),null); - * - * resultCallback.onResult(gson.toJson(r)); } - */ - @Action(async = true, userPermission = 0L) public void updateNodeUnits(JsonObject args, ResultCallback rc) { LOGGER.debug("updateNodeUnits"); @@ -1842,6 +1762,97 @@ public class CMActions implements OnHashCallback { resultCallback.onResult(ret); } + // @Action(userPermission = 1L << 26, async = true) + // public void queryContractInstanceInfoByDOI(JsonObject args, ResultCallback resultCallback) + // { + // long s = System.currentTimeMillis(); + // String ret; + // try { + // String contractDOI = args.get("doi").getAsString(); + // DigitalObject contractDO; + // DoipClient doipClient = + // DoipClient.createByRepoUrlAndMsgFmt( + // DOIPMainServer.repoUrl, DoipMessageFormat.PACKET.getName()); + // DoMessage response = doipClient.retrieve(contractDOI, null, null); + // if (response.parameters.response == DoResponse.Success) { + // contractDO = DigitalObject.parse(response.body); + // } else { + // DoMessage resp = DOAClient.getGlobalInstance().retrieve(contractDOI, null, + // null); + // contractDO = DigitalObject.parse(resp.body); + // } + // ContractInstanceDO contractInstanceDO = + // (ContractInstanceDO) + // ContractManager.toObject(contractDO.elements.get(0).getData()); + // // Dictionary contractInfo = JsonUtil.fromJson(new + // // String(contractDO.getData()), new Hashtable().getClass()); + // + // ret = + // String.format( + // "Contract ID: %s\nContract PublicKey: %s", + // contractInstanceDO.id, contractInstanceDO.publicKey); + // } catch (Exception e) { + // ByteArrayOutputStream bo = new ByteArrayOutputStream(); + // e.printStackTrace(new PrintStream(bo)); + // ret = bo.toString(); + // } + // Map r = new HashMap<>(); + // r.put("action", "onQueryContractInstanceInfoByDOI"); + // r.put("data", ret); + // r.put("executeTime", System.currentTimeMillis() - s); + // resultCallback.onResult(JsonUtil.toJson(r)); + // if (client != null && client.controller != null) { + // client.controller.updateContract(); + // } + // } + + /* + * @Action(userPermission = 1 << 19, async = true) public void + * staticVerify(JsonObject args, ResultCallback resultCallback) { Map r = new HashMap<>(); r.put("action", "onStaticVerifyResult"); + * + * long start = System.currentTimeMillis(); Map ret = new + * HashMap<>(); Contract c = new Contract(); // c.setType(Type.Algorithm); + * c.setType(ContractType.Sole); ret.put("action", "onStartContract"); String path = + * null; if (args.has("path")) path = args.get("path").getAsString(); if (path + * != null && path.startsWith("/")) c.setScript(args.get("path").getAsString()); + * else c.setScript(args.get("script").getAsString()); if + * (args.has("publicKey")) { c.setOwner(args.get("publicKey").getAsString()); + * c.setSignature(args.get("signature").getAsString()); } else if + * (args.has("owner")) { c.setOwner(args.get("owner").getAsString()); + * c.setSignature(args.get("signature").getAsString()); + * + * } else { c.setOwner(GlobalConf.instance.keyPair.getPublicKeyStr()); + * c.doSignature(GlobalConf.instance.keyPair); } + * + * if (!c.verifySignature()) { ret.put("data", "verify failed"); + * resultCallback.onResult(gson.toJson(ret)); return; } if (path != null && + * path.startsWith("/")) { String parPath; if (args.has("isPrivate") && + * args.get("isPrivate").getAsBoolean()) { parPath = + * GlobalConf.instance.privateDir + "/" + handler.pubKey; } else { parPath = + * GlobalConf.instance.publicDir; } try { String[] pp = path.split("/"); String + * parentPath = path; for (int i = 0; i < pp.length && i < 2; i++) { parentPath + * += pp[i] + "/"; } System.out.println("[CMActions] pack Dir, from:" + path + + * " --> " + parentPath); byte[] bb = YJSPacker.pack(new File(parPath, + * parentPath).getAbsolutePath()); File temp = File.createTempFile(pp[pp.length + * - 1], ".zip"); FileOutputStream fout = new FileOutputStream(temp); + * fout.write(bb); fout.close(); System.out.println("StartContract, zipPath:" + + * temp.getAbsolutePath()); // TODO script should encoded!! + * c.setScript(temp.getAbsolutePath()); } catch (Exception e) { + * e.printStackTrace(); } } + * + * System.out.println("[CMActions] verifyContract: " + gson.toJson(c)); + * r.put("data", manager.staticVerify(c)); r.put("cid", c.getID()); + * r.put("executeTime", System.currentTimeMillis() - start); // + * GRPCPool.writeToChain(c.getOwner(), privKey, gson.toJson(r), // + * json.getString("requestID")); + * + * //addLocalContractLog("staticVerify", c.getID(), path.split("/")[1], + * c.getOwner(),null); + * + * resultCallback.onResult(gson.toJson(r)); } + */ + @Action(async = true) public void askMasterElectTimeRecorder(JsonObject args, ResultCallback resultCallback) { String data = "null"; @@ -1953,6 +1964,4 @@ public class CMActions implements OnHashCallback { } } } - - } \ No newline at end of file diff --git a/src/main/java/org/bdware/server/action/MasterWSAction.java b/src/main/java/org/bdware/server/action/MasterWSAction.java index b5a936c..fe12551 100644 --- a/src/main/java/org/bdware/server/action/MasterWSAction.java +++ b/src/main/java/org/bdware/server/action/MasterWSAction.java @@ -186,8 +186,7 @@ public class MasterWSAction { .collect(Collectors.toSet()); // } - SM2KeyPair keyPair = GlobalConf.instance.keyPair; - String masterNode = keyPair.getPublicKeyStr(); + String masterNode = GlobalConf.instance.keyPair.getPublicKeyStr(); nodeNames.add(masterNode); int nodeSize = nodeNames.size(); @@ -237,7 +236,7 @@ public class MasterWSAction { nodeSize); MasterServerTCPAction.sync.sleepWithTimeout(requestID, collector, 20); Map request = new HashMap<>(); - request.put("master", keyPair.getPublicKeyStr()); + request.put("master", masterNode); if (args.has("isPrivate")) { request.put("isPrivate", args.get("isPrivate").getAsString()); } diff --git a/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java b/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java index c624343..c95ad9f 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java @@ -24,8 +24,6 @@ import org.bdware.server.trustedmodel.KillUnitContractResultCollector; import org.bdware.server.trustedmodel.ResultCollector; import org.bdware.units.NetworkManager; -import java.text.SimpleDateFormat; -import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -132,15 +130,10 @@ public class MasterServerTCPAction { ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID); MultiContractMeta ret = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); - System.out.println( - new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") - .format(new Date(System.currentTimeMillis())) - + " [MasterServerTCPAction] getCMInfo: " - + meta.getName() - + " " - + meta.getID() - + " " - + meta.getStatus()); + LOGGER.debug(String.format("getCMInfo: %s %s %s", + meta.getName(), + meta.getID(), + meta.getStatus())); return ret; } @@ -239,7 +232,7 @@ public class MasterServerTCPAction { ContractMeta contractMeta = CMActions.manager.statusRecorder.getContractMeta(contractID); if (contractMeta == null || contractMeta.getStatus() == KILLED) { - LOGGER.debug("send ReRoute response:" + cr.toString()); + LOGGER.debug("send ReRoute response: " + cr); JsonObject result = new JsonObject(); result.addProperty("action", "reRouteContract"); result.addProperty("responseID", cr.get("requestID").getAsString()); @@ -255,7 +248,9 @@ public class MasterServerTCPAction { // null") + // "\n"); - if (info != null && contractMeta.contract.getType() != ContractExecType.Sharding) { + if (info != null && + ContractExecType.Sharding != contractMeta.contract.getType() + && !ContractExecType.SelfAdaptiveSharding.equals(contractMeta.contract.getType())) { // 这个是个多节点的合约 // Just forward it to the correct Node // Master节点直接发3个,聚合后返回结果。 @@ -296,7 +291,7 @@ public class MasterServerTCPAction { } }, null); } else { - LOGGER.debug("send ReRoute response:" + cr.toString()); + LOGGER.debug("send ReRoute response:" + cr); JsonObject result = new JsonObject(); result.addProperty("action", "reRouteContract"); result.addProperty("responseID", cr.get("requestID").getAsString()); diff --git a/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientController.java b/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientController.java index b8cf84d..6c84d24 100644 --- a/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientController.java +++ b/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientController.java @@ -209,8 +209,9 @@ public class NodeCenterClientController implements NodeCenterConn { public void updateNonMasters(JsonObject jo, ResultCallback cb) { String[] contracts = jo.get("contracts").getAsString().split(","); for (String id : contracts) { - LOGGER.info("旧的master设置合约 " + id + " 自己不再是master"); - CMActions.manager.setContractIsMaster(id, false + ""); + if (null != CMActions.manager.setContractIsMaster(id, String.valueOf(false))) { + LOGGER.warn("master of contract " + id + " changes"); + } } } @@ -331,8 +332,8 @@ public class NodeCenterClientController implements NodeCenterConn { @Override public String routeContract(String contractID) { - LOGGER.info("[CMClientController] routeContract : " + contractID); - LOGGER.info("contractID2Pubkey.contractsKey=" + contractID2PubKey.containsKey(contractID)); + LOGGER.debug("[CMClientController] routeContract : " + contractID); + LOGGER.debug("contractID2Pubkey.contractsKey=" + contractID2PubKey.containsKey(contractID)); // TODO RouteContract是不是?IRP协议/DOIP协议? if (contractID2PubKey.containsKey(contractID)) { return contractID2PubKey.get(contractID); @@ -519,93 +520,6 @@ public class NodeCenterClientController implements NodeCenterConn { queryNCRepoDOI(json, result); } - - class ReceiveFileThread extends Thread { - private final Map fileMap = new HashMap<>(); - - ReceiveFileThread() { - super(); - this.start(); - } - - public void run() { - for (; ; ) { - if (receiveQueue.size() > 0) { - try { - JsonObject jo = receiveQueue.poll(); - receiveProject(jo); - } catch (Exception e) { - e.printStackTrace(); - } - } else { - synchronized (ReceiveFileThread.this) { - try { - this.wait(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - } - - } - - public void receiveProject(JsonObject args) { - - String fileName = args.get("fileName").getAsString(); - boolean isAppend = args.get("isAppend").getAsBoolean(); - boolean isDone = args.get("isDone").getAsBoolean(); - boolean isPrivate = args.get("isPrivate").getAsBoolean(); - LOGGER.debug( - String.format("isAppend=%b isDone=%b isPrivate=%b", isAppend, isDone, isPrivate)); - String path = GlobalConf.instance.publicCompiledDir; - if (isPrivate && args.has("pubKey")) { - path = GlobalConf.instance.privateCompiledDir + "/" + args.get("pubKey").getAsString(); - } - File dir = new File(path); - if (!dir.exists()) { - LOGGER.debug("mkdir " + dir.getAbsoluteFile() + ": " + dir.mkdirs()); - } - FileOutputStream fout = null; - if (!isAppend) { - try { - fout = new FileOutputStream(new File(dir, fileName)); - fileMap.put(fileName, fout); - } catch (FileNotFoundException e) { - e.printStackTrace(); - } - } else { - fout = fileMap.get(fileName); - } - if (isDone) { - if (fout != null) - try { - fout.close(); - fileMap.remove(fileName); - } catch (IOException e) { - e.printStackTrace(); - } - LOGGER.debug("receive finish."); - Map req = new HashMap<>(); - req.put("action", "onReceiveProject"); - req.put("requestID", args.get("requestID").getAsString()); - req.put("nodeID", nodeID); - req.put("progress", "100"); - NetworkManager.instance.sendToNodeCenter(JsonUtil.toJson(req)); - } else { - String data = args.get("data").getAsString(); - byte[] byteData = ByteUtil.decodeBASE64(data); - try { - if (null != fout && null != byteData) { - fout.write(byteData); - } - } catch (IOException e) { - e.printStackTrace(); - } - } - } - } - @Action(async = false) public synchronized void receiveProject(JsonObject args, final ResultCallback rc) { try { @@ -758,6 +672,11 @@ public class NodeCenterClientController implements NodeCenterConn { } } + @Action(async = true) + public void onDistributeYPK(JsonObject json, ResultCallback rc) { + onDistribute(json, rc); + } + @Action(async = true) public void transferInstance(JsonObject jo, ResultCallback result) { LOGGER.info("transferInstance"); @@ -831,4 +750,90 @@ public class NodeCenterClientController implements NodeCenterConn { } } } + + class ReceiveFileThread extends Thread { + private final Map fileMap = new HashMap<>(); + + ReceiveFileThread() { + super(); + this.start(); + } + + public void run() { + for (; ; ) { + if (receiveQueue.size() > 0) { + try { + JsonObject jo = receiveQueue.poll(); + receiveProject(jo); + } catch (Exception e) { + e.printStackTrace(); + } + } else { + synchronized (ReceiveFileThread.this) { + try { + this.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + } + + public void receiveProject(JsonObject args) { + + String fileName = args.get("fileName").getAsString(); + boolean isAppend = args.get("isAppend").getAsBoolean(); + boolean isDone = args.get("isDone").getAsBoolean(); + boolean isPrivate = args.get("isPrivate").getAsBoolean(); + LOGGER.debug( + String.format("isAppend=%b isDone=%b isPrivate=%b", isAppend, isDone, isPrivate)); + String path = GlobalConf.instance.publicCompiledDir; + if (isPrivate && args.has("pubKey")) { + path = GlobalConf.instance.privateCompiledDir + "/" + args.get("pubKey").getAsString(); + } + File dir = new File(path); + if (!dir.exists()) { + LOGGER.debug("mkdir " + dir.getAbsoluteFile() + ": " + dir.mkdirs()); + } + FileOutputStream fout = null; + if (!isAppend) { + try { + fout = new FileOutputStream(new File(dir, fileName)); + fileMap.put(fileName, fout); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } + } else { + fout = fileMap.get(fileName); + } + if (isDone) { + if (fout != null) + try { + fout.close(); + fileMap.remove(fileName); + } catch (IOException e) { + e.printStackTrace(); + } + LOGGER.debug("receive finish."); + Map req = new HashMap<>(); + req.put("action", "onReceiveProject"); + req.put("requestID", args.get("requestID").getAsString()); + req.put("nodeID", nodeID); + req.put("progress", "100"); + NetworkManager.instance.sendToNodeCenter(JsonUtil.toJson(req)); + } else { + String data = args.get("data").getAsString(); + byte[] byteData = ByteUtil.decodeBASE64(data); + try { + if (null != fout && null != byteData) { + fout.write(byteData); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } } diff --git a/src/main/java/org/bdware/server/trustedmodel/AgentManager.java b/src/main/java/org/bdware/server/trustedmodel/AgentManager.java index 5b949c5..99d1ce3 100644 --- a/src/main/java/org/bdware/server/trustedmodel/AgentManager.java +++ b/src/main/java/org/bdware/server/trustedmodel/AgentManager.java @@ -33,7 +33,7 @@ public class AgentManager implements AgentPeerManagerIntf { @Override public void executeByOtherNodeAsync(String pubKey, ContractRequest c, ResultCallback cb) { - LOGGER.error(pubKey + " " + c.getRequestID() + " " + c.getContractID() + " cb:" + cb); + LOGGER.debug(pubKey + " " + c.getRequestID() + " " + c.getContractID() + " cb:" + cb); int maxMasterProxyLoad = CongestionControl.masterProxyLoad.incrementAndGet(); if (maxMasterProxyLoad > CongestionControl.maxMasterProxyLoad) CongestionControl.maxMasterProxyLoad = maxMasterProxyLoad; diff --git a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java index f4f2785..d97dac5 100644 --- a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java +++ b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java @@ -1,5 +1,6 @@ package org.bdware.server.trustedmodel; +import com.google.gson.JsonArray; import com.google.gson.JsonObject; import com.google.gson.JsonPrimitive; import org.apache.logging.log4j.LogManager; @@ -13,15 +14,16 @@ import org.bdware.sc.bean.SM2Verifiable; import org.bdware.sc.conn.OnHashCallback; import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.units.MultiContractMeta; -import org.bdware.sc.units.RecoverFlag; import org.bdware.sc.util.HashUtil; import org.bdware.sc.util.JsonUtil; import org.bdware.server.action.CMActions; -import org.bdware.server.action.p2p.MasterServerRecoverMechAction; import org.bdware.units.NetworkManager; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** @@ -29,66 +31,76 @@ import java.util.stream.Collectors; */ public class SelfAdaptiveShardingExecutor implements ContractExecutor { private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class); - private static final int SUBMIT_LIMIT = 1024; + private static final int SUBMIT_LIMIT = 5120; private static final int DELAY = 1; private final Queue reqQueue = new ConcurrentLinkedQueue<>(); private final MultiContractMeta meta; private final Map toExecuted = new ConcurrentHashMap<>(); private final Set executedBlocks = ConcurrentHashMap.newKeySet(); private final Map executedTxs = new ConcurrentHashMap<>(); - private final Object flag = new Object(); + private final Object executorFlag = new Object(); private final ScheduledFuture future; private boolean running = true; + // the pointer to the latest executed block + private String executorPointer = "0"; + // the block to be submitted private Block b = new Block(); public SelfAdaptiveShardingExecutor(String contractID) { - this.meta = - CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); + this.meta = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); this.future = ContractManager.scheduledThreadPool.scheduleWithFixedDelay( this::submitBlock, DELAY, DELAY, TimeUnit.SECONDS); - LOGGER.debug(String.format("ContractManager.threadPool=%d/%d", - ((ThreadPoolExecutor) ContractManager.threadPool).getActiveCount(), - ((ThreadPoolExecutor) ContractManager.threadPool).getPoolSize())); ContractManager.threadPool.execute(() -> { - LOGGER.info( - "[SelfAdaptiveShardingExecutor " + meta.getContractID() + "] starting service..." + running); + LOGGER.info(String.format( + "[Executor %s] starting executing service... %b", + meta.getContractID(), meta.isMaster())); while (running) { - LOGGER.info("checking blocks to be executed, latest block=" + - this.b.prevHash + ", to be executed size=" + toExecuted.size()); + LOGGER.info(String.format( + "[Executor %s] checking blocks to be executed, latest block=%s, to be executed size=%d", + meta.getContractID(), executorPointer, toExecuted.size())); LOGGER.debug("executed: " + JsonUtil.toJson(executedBlocks) + "\n\t" + JsonUtil.toJson(executedTxs)); while (!toExecuted.isEmpty()) { - String key = this.b.prevHash; - Block block = toExecuted.get(key); - if (null != block) { - executeBlock(block); + Block block = toExecuted.get(executorPointer); + // check if the execution ends + if (null == block) { + break; } - toExecuted.remove(key); + executeBlock(block); + toExecuted.remove(executorPointer); + executorPointer = block.hash; } - synchronized (flag) { - try { - flag.wait(); - } catch (InterruptedException e) { - LOGGER.warn(String.format( - "[SelfAdaptiveShardingExecutor %s] waiting is interrupted: %s", - meta.getContractID(), - e.getMessage())); + try { + synchronized (executorFlag) { + executorFlag.wait(); } + } catch (InterruptedException e) { + LOGGER.warn(String.format( + "[Executor %s] waiting is interrupted: %s", + meta.getContractID(), e.getMessage())); } } }); } + public JsonArray checkCache() { + JsonArray ret = new JsonArray(); + ret.add(executorPointer); + for (Map.Entry entry : toExecuted.entrySet()) { + ret.add(String.format("%s,%s,%d", entry.getKey(), entry.getValue().hash, entry.getValue().height)); + } + return ret; + } + @Override public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) { // check client ContractClient client = CMActions.manager.getClient(meta.getContractID()); if (null == client) { LOGGER.error("contract " + meta.getContractID() + " not found!"); - rcb.onResult(JsonUtil.toJson(new ContractResult( - ContractResult.Status.Error, + rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Error, new JsonPrimitive("contract " + meta.getContractID() + " not found!")))); return; } @@ -96,12 +108,10 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction()); if (null == funDesp) { LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!"); - rcb.onResult(JsonUtil.toJson(new ContractResult( - ContractResult.Status.Error, - new JsonPrimitive( - String.format("action %s of contract %s not found!", - req.getAction(), - meta.getContractID()))))); + rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Error, + new JsonPrimitive(String.format( + "action %s of contract %s not found!", + req.getAction(), meta.getContractID()))))); return; } // for view function, execute it @@ -111,18 +121,21 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { } // normal function, check if it is in blocks if (executedTxs.containsKey(requestID)) { - rcb.onResult(JsonUtil.toJson(new ContractResult( - ContractResult.Status.Error, + rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Error, new JsonPrimitive("this request has been packed!")))); return; } + // forward to master + if (!meta.isMaster()) { + CMActions.manager.executeContractOnOtherNodes(req, rcb); + return; + } // add blocks into request cache LOGGER.debug("receive contract request " + requestID); executedTxs.put(requestID, false); reqQueue.add(req); - rcb.onResult(JsonUtil.toJson(new ContractResult( - ContractResult.Status.Executing, - new JsonPrimitive("this request is adding into blocks")))); + rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Executing, + new JsonPrimitive("this request is added into blocks")))); // if cache is full, submit if (reqQueue.size() >= SUBMIT_LIMIT) { ContractManager.threadPool.execute(this::submitBlock); @@ -137,17 +150,18 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { LOGGER.info("destruct executor of contract " + meta.getContractID()); } - public void execute(String blockStr) { + public void receiveBlock(String blockStr) { Block block = JsonUtil.fromJson(blockStr, Block.class); // the block must have not been cached or executed, and must be valid + boolean valid = block.isValid(); if (!toExecuted.containsKey(block.prevHash) && !executedBlocks.contains(block.hash) && - block.isValid()) { + valid) { // add block into block cache LOGGER.info(String.format( - "[SelfAdaptiveShardingExecutor %s] receive block %s -> %s," + - " %d transactions, timestamp=%d, size=%d", + "[Executor %s] receive block [%d] %s -> %s, %d transactions, timestamp=%d, size=%d", meta.getContractID(), + block.height, block.hash, block.prevHash, block.requests.length, @@ -155,9 +169,16 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { blockStr.length())); toExecuted.put(block.prevHash, block); // notify thread to execute blocks - synchronized (flag) { - flag.notify(); + synchronized (executorFlag) { + executorFlag.notify(); } + } else { + LOGGER.warn(String.format("receive block [%d] %s failed! checking: cached %b executed %b valid %b", + block.height, + block.hash, + toExecuted.containsKey(block.prevHash), + executedBlocks.contains(block.hash), + valid)); } } @@ -176,41 +197,48 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { for (ContractRequest request : block.requests) { String ret = CMActions.manager.executeLocally(request, null); LOGGER.debug(String.format( - "[SelfAdaptiveShardingExecutor %s] result of request %s: %s", - meta.getContractID(), - request.getRequestID(), - ret)); + "[Executor %s] result of request %s: %s", + meta.getContractID(), request.getRequestID(), ret)); executedTxs.put(request.getRequestID(), true); } LOGGER.info(String.format( - "[SelfAdaptiveShardingExecutor %s] execute %d transactions of block %s", - meta.getContractID(), - block.requests.length, - block.hash)); + "[Executor %s] execute %d transactions of block [%d] %s", + meta.getContractID(), block.requests.length, block.height, block.hash)); // TODO create check point - this.b = new Block(block.hash, this.b.height + 1); +// this.b = new Block(block.hash, this.b.height + 1); executedBlocks.add(block.hash); } private void submitBlock() { Block block = fillBlock(); if (null != block) { - LOGGER.info("deliver block " + block.hash + "..."); LOGGER.debug(JsonUtil.toPrettyJson(block)); String[] nodes = this.meta.getMembers(); + LOGGER.debug(JsonUtil.toJson(nodes)); JsonObject req = new JsonObject(); + String blockStr = JsonUtil.toJson(block); req.addProperty("action", "deliverBlock"); - req.addProperty("data", JsonUtil.toJson(block)); + req.addProperty("data", blockStr); req.addProperty("contractID", this.meta.getContractID()); String reqStr = req.toString(); // deliver blocks + LOGGER.info("deliver block " + block.hash + "..."); + String myself = CMActions.manager.nodeCenterConn.getNodeId(); for (String node : nodes) { - if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) - == RecoverFlag.Fine) { + // TODO: find dead lock here +// if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) +// == RecoverFlag.Fine) { +// LOGGER.info("deliver block " + block.hash + " to node " + node); +// NetworkManager.instance.sendToAgent(node, reqStr); +// } + if (!Objects.equals(myself, node)) { NetworkManager.instance.sendToAgent(node, reqStr); } +// LOGGER.info("delivering done: " + node); } + this.receiveBlock(blockStr); } +// LOGGER.info("end " + (null != block)); } private synchronized Block fillBlock() { @@ -222,8 +250,10 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { for (int i = 0; i < requests.length; ++i) { requests[i] = reqQueue.poll(); } - this.b.fillBlock(requests); - return this.b; + Block block = this.b; + block.fillBlock(requests); + this.b = new Block(block.hash, block.height + 1); + return block; } static class Block extends SM2Verifiable { @@ -254,15 +284,19 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { } public boolean isValid() { - return computeHash().equals(hash) && body.equals(merkle(this.requests)) && verifySignature(); + boolean hashValid = computeHash().equals(hash), +// bodyValid = body.equals(merkle(this.requests)), + bodyValid = true, + signValid = verifySignature(); + boolean ret = hashValid & bodyValid & signValid; + if (!ret) { + LOGGER.warn(String.format("hash %b body %b sign %b", hashValid, bodyValid, signValid)); + } + return ret; } private String computeHash() { - return HashUtil.sha3( - String.valueOf(this.height), - this.prevHash, - this.checkPoint, - this.body); + return HashUtil.sha3(String.valueOf(this.height), this.prevHash, this.checkPoint, this.body); } private String merkle(ContractRequest[] requests) { @@ -273,19 +307,19 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { if (requests.length == 1) { return HashUtil.sha3(requests[0].getRequestID()); } - Queue reqQueue = + Queue merkleQueue = Arrays.stream(requests).map(ContractRequest::getRequestID) .collect(Collectors.toCollection(ArrayDeque::new)); do { int size; - for (size = reqQueue.size(); size > 1; size -= 2) { - reqQueue.add(HashUtil.sha3(reqQueue.poll(), reqQueue.poll())); + for (size = merkleQueue.size(); size > 1; size -= 2) { + merkleQueue.add(HashUtil.sha3(merkleQueue.poll(), merkleQueue.poll())); } if (size == 1) { - reqQueue.add(reqQueue.poll()); + merkleQueue.add(merkleQueue.poll()); } - } while (1 != reqQueue.size()); - return reqQueue.poll(); + } while (1 != merkleQueue.size()); + return merkleQueue.poll(); } @Override