diff --git a/build.gradle b/build.gradle index 5f93783..1f3b7b5 100644 --- a/build.gradle +++ b/build.gradle @@ -34,6 +34,7 @@ dependencies { implementation project(":cm") implementation project(":mockjava") implementation project(":front-base") + implementation project(":consistency-sdk") implementation 'io.prometheus:simpleclient_httpserver:0.12.0' implementation 'org.knowhowlab.osgi:sigar:1.6.5_01' implementation 'io.grpc:grpc-all:1.43.1' diff --git a/src/main/java/org/bdware/server/CMHttpServer.java b/src/main/java/org/bdware/server/CMHttpServer.java index 9251d88..4f7369d 100644 --- a/src/main/java/org/bdware/server/CMHttpServer.java +++ b/src/main/java/org/bdware/server/CMHttpServer.java @@ -26,6 +26,7 @@ import org.bdware.sc.db.CMTables; import org.bdware.sc.db.KeyValueDBUtil; import org.bdware.sc.db.MultiIndexTimeRocksDBUtil; import org.bdware.sc.util.ExceptionUtil; +import org.bdware.sdk.consistency.ConsistencyPluginManager; import org.bdware.server.action.FileActions; import org.bdware.server.doip.ContractRepositoryMain; import org.bdware.server.http.CMHttpHandler; @@ -251,6 +252,7 @@ public class CMHttpServer { * port: http & websocket port port+1: tcp port port+2: doip port port+3: prometheus */ private void start() { + ConsistencyPluginManager.setContext(new SDKContext()); // EpollEventLoopGroup // EpollServerSocketChannel // ContractManager.reconnectPort = (port - 18000) * 30 + 1630; diff --git a/src/main/java/org/bdware/server/SDKContext.java b/src/main/java/org/bdware/server/SDKContext.java new file mode 100644 index 0000000..e741176 --- /dev/null +++ b/src/main/java/org/bdware/server/SDKContext.java @@ -0,0 +1,113 @@ +package org.bdware.server; + +import org.bdware.sc.ContractManager; +import org.bdware.sc.conn.ResultCallback; +import org.bdware.sc.units.RecoverFlag; +import org.bdware.sc.units.RequestCache; +import org.bdware.sdk.consistency.api.context.*; +import org.bdware.server.action.CMActions; +import org.bdware.server.action.SyncResult; +import org.bdware.server.action.p2p.MasterServerRecoverMechAction; +import org.bdware.server.action.p2p.MasterServerTCPAction; +import org.bdware.server.trustedmodel.ResultCollector; +import org.bdware.units.NetworkManager; +import org.zz.gmhelper.SM2KeyPair; + +import java.util.Map; + +public class SDKContext implements ISDKContext { + IMasterServerTCPAction masterServerTCPActionProxy = new MasterServerTCPActionProxy(); + + INetworkManager networkManagerProxy = new NetworkManagerProxy(); + + ICMActions cmActionsProxy = new CMActionsProxy(); + + IMasterServerRecoverMechAction masterServerRecoverMechActionProxy = new MasterServerRecoverMechActionProxy(); + + IGlobalConf globalConfProxy = new GlobalProxy(); + + @Override + public IMasterServerTCPAction getMasterServerTCPAction() { + return masterServerTCPActionProxy; + } + + @Override + public INetworkManager getNetworkManager() { + return networkManagerProxy; + } + + @Override + public ICMActions getCMActions() { + return cmActionsProxy; + } + + @Override + public IMasterServerRecoverMechAction getMasterServerRecoverMechAction() { + return masterServerRecoverMechActionProxy; + } + + @Override + public IGlobalConf getGlobalConf() { + return globalConfProxy; + } + + public static class MasterServerTCPActionProxy implements IMasterServerTCPAction { + @Override + public SyncResult getSync() { + return MasterServerTCPAction.sync; + } + + @Override + public Map getReqCache() { + return MasterServerTCPAction.requestCache; + } + } + + public static class NetworkManagerProxy implements INetworkManager { + @Override + public void sendToAgent(String pubkey, String content) { + NetworkManager.instance.sendToAgent(pubkey, content); + } + + @Override + public boolean hasAgentConnection(String pubkey) { + return NetworkManager.instance.hasAgentConnection(pubkey); + } + + @Override + public ResultCallback createResultCallback(String requestID, ResultCallback rc, int count) { + return new ResultCollector(requestID, rc, count); + } + } + + public static class CMActionsProxy implements ICMActions { + @Override + public ContractManager getManager() { + return CMActions.manager; + } + } + + public static class MasterServerRecoverMechActionProxy implements IMasterServerRecoverMechAction { + @Override + public Map> getRecoverStatusMap() { + return MasterServerRecoverMechAction.recoverStatus; + } + + @Override + public void restartContractFromCommonMode(String nodeID, String contractID) { + MasterServerRecoverMechAction.restartContractFromCommonMode(nodeID, contractID); + } + } + + public static class GlobalProxy implements IGlobalConf { + @Override + public String getNodeID() { + return GlobalConf.getNodeID(); + } + + @Override + public SM2KeyPair getKeyPair() { + return GlobalConf.instance.keyPair; + } + } +} diff --git a/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java b/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java index d245e13..322aac2 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java @@ -19,8 +19,6 @@ import org.bdware.server.GlobalConf; import org.bdware.server.action.Action; import org.bdware.server.action.CMActions; import org.bdware.server.action.RequestToMaster; -import org.bdware.server.executor.consistency.PBFTExecutor; -import org.bdware.server.executor.consistency.RequestAllExecutor; import org.bdware.server.tcp.TCPClientFrameHandler; import org.bdware.server.trustedmodel.AgentManager; import org.bdware.server.trustedmodel.ContractUnitStatus; @@ -344,20 +342,9 @@ public class MasterClientRecoverMechAction { ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(cei.getContractID()); meta.setContractExecutor( MasterClientTCPAction.createContractExecutor(meta.contract, contractID, cei.getMasterNode(), cei.getMembers())); - switch (meta.contract.getType()) { - case RequestAllResponseFirst: - case RequestAllResponseHalf: - case RequestAllResponseAll: - ((RequestAllExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1); - break; - case PBFT: - ((PBFTExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1); - break; - case Sharding: - break; - default: - break; - } + Map args = new HashMap<>(); + args.put("ceiLastExeSeq", cei.getLastExeSeq()); + meta.contractExecutor.onRecover(args); // 认为contractID不会重 if (client != null && client.isProcessAlive() @@ -619,20 +606,9 @@ public class MasterClientRecoverMechAction { } CMActions.manager.multiContractRecorder.updateValue(cei); ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID); - switch (meta.contract.getType()) { - case RequestAllResponseFirst: - case RequestAllResponseHalf: - case RequestAllResponseAll: - ((RequestAllExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1); - break; - case PBFT: - ((PBFTExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1); - break; - case Sharding: - break; - default: - break; - } + Map args = new HashMap<>(); + args.put("ceiLastExeSeq", cei.getLastExeSeq()); + meta.contractExecutor.onRecover(args); boolean flag = checkAndRestart(cei); // if need,restart the contract process if (!flag) { diff --git a/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java b/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java index c7d370e..3742b2d 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java @@ -15,18 +15,12 @@ import org.bdware.sc.db.KeyValueDBUtil; import org.bdware.sc.units.MultiContractMeta; import org.bdware.sc.units.PubKeyNode; import org.bdware.sc.util.JsonUtil; +import org.bdware.sdk.consistency.ConsistencyPluginManager; import org.bdware.server.GlobalConf; import org.bdware.server.action.*; -import org.bdware.server.executor.consistency.PBFTExecutor; -import org.bdware.server.executor.consistency.RequestAllExecutor; -import org.bdware.server.executor.unconsistency.MultiPointCooperationExecutor; -import org.bdware.server.executor.unconsistency.RequestOnceExecutor; -import org.bdware.server.executor.unconsistency.ResponseOnceExecutor; import org.bdware.server.tcp.PubkeyResultCallback; import org.bdware.server.trustedmodel.ContractExecutor; import org.bdware.server.trustedmodel.KillUnitContractInfo; -import org.bdware.server.trustedmodel.SelfAdaptiveShardingExecutor; -import org.bdware.server.trustedmodel.SingleNodeExecutor; import org.bdware.units.NetworkManager; import org.bdware.units.function.ExecutionManager; import org.zz.gmhelper.SM2KeyPair; @@ -85,46 +79,12 @@ public class MasterClientTCPAction { } public static ContractExecutor createContractExecutor(Contract contract, String contractID, String masterPubkey, String[] members) { - ContractExecutor executor = null; - int nodeSize = contract.getNumOfCopies(); - switch (contract.getType()) { - case Sole: - LOGGER.info("Sole contract is not supported in multi-point mode"); - return SingleNodeExecutor.instance; - case RequestOnce: - executor = new RequestOnceExecutor(contractID); - break; - case ResponseOnce: - executor = new ResponseOnceExecutor(contractID); - break; - case RequestAllResponseFirst: - executor = - new RequestAllExecutor( - ContractExecType.RequestAllResponseFirst, 1, contractID); - break; - case RequestAllResponseHalf: - executor = - new RequestAllExecutor( - ContractExecType.RequestAllResponseHalf, - nodeSize / 2 + 1, - contractID); - break; - case RequestAllResponseAll: - executor = - new RequestAllExecutor( - ContractExecType.RequestAllResponseAll, nodeSize, contractID); - break; - case Sharding: - executor = new MultiPointCooperationExecutor(ContractExecType.Sharding, nodeSize, contractID); - break; - case SelfAdaptiveSharding: - executor = new SelfAdaptiveShardingExecutor(contractID); - break; - case PBFT: - executor = new PBFTExecutor(nodeSize, contractID, masterPubkey, members); - break; - } - return executor; + Map args = new HashMap<>(); + args.put("contractID", contractID); + args.put("nodeSize", contract.getNumOfCopies()); + args.put("masterPubkey", masterPubkey); + args.put("members", members); + return ConsistencyPluginManager.getInstance().createContractExecutor(contract.getType(), args); } public static void dealRequests(String contractID) { @@ -460,8 +420,8 @@ public class MasterClientTCPAction { if (jo.has("contractID") && jo.has("data")) { String contractID = jo.get("contractID").getAsString(); ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID); - if (null != meta && meta.contractExecutor instanceof SelfAdaptiveShardingExecutor) { - ((SelfAdaptiveShardingExecutor) meta.contractExecutor).execute(jo.get("data").getAsString()); + if (null != meta && meta.contractExecutor != null) { + meta.contractExecutor.onDeliverBlock(jo.get("data").getAsString()); } } } diff --git a/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java b/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java index c6007c7..24fe0b6 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java @@ -16,8 +16,6 @@ import org.bdware.sc.util.JsonUtil; import org.bdware.server.GlobalConf; import org.bdware.server.action.Action; import org.bdware.server.action.CMActions; -import org.bdware.server.executor.consistency.PBFTExecutor; -import org.bdware.server.executor.consistency.RequestAllExecutor; import org.bdware.server.trustedmodel.ContractUnitStatus; import org.bdware.units.NetworkManager; import org.zz.gmhelper.SM2KeyPair; @@ -296,20 +294,9 @@ public class MasterServerRecoverMechAction { meta.setContractExecutor( MasterClientTCPAction.createContractExecutor(meta.contract, contractID, cei.getMasterNode(), cei.getMembers())); - switch (meta.contract.getType()) { - case RequestAllResponseFirst: - case RequestAllResponseHalf: - case RequestAllResponseAll: - ((RequestAllExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1); - break; - case PBFT: - ((PBFTExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1); - break; - case Sharding: - break; - default: - break; - } + Map args = new HashMap<>(); + args.put("ceiLastExeSeq", cei.getLastExeSeq()); + meta.contractExecutor.onRecover(args); CMActions.manager.multiContractRecorder.updateValue(cei); LOGGER.info("[master recover] contractID2Members put 合约 " + contractID); LOGGER.info("新master恢复 MultiPointContractInfo 完成!"); diff --git a/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java b/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java index 9afa5cd..c624343 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java @@ -14,6 +14,7 @@ import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.units.MultiContractMeta; import org.bdware.sc.units.RequestCache; import org.bdware.sc.util.JsonUtil; +import org.bdware.sdk.consistency.api.NotifiableResultMerger; import org.bdware.server.CongestionControl; import org.bdware.server.action.Action; import org.bdware.server.action.CMActions; @@ -68,9 +69,9 @@ public class MasterServerTCPAction { ResultCallback cb = sync.waitObj.get(requestID); if (cb instanceof ResultCollector) { ResultCollector rc = (ResultCollector) cb; - if (rc.getCommitter() instanceof RequestAllExecutor.ResultMerger) { - RequestAllExecutor.ResultMerger merger = - (RequestAllExecutor.ResultMerger) rc.getCommitter(); + if (rc.getCommitter() instanceof NotifiableResultMerger) { + NotifiableResultMerger merger = + (NotifiableResultMerger) rc.getCommitter(); if (merger.getContractID().equals(contractID)) { LOGGER.info("node " + nodeID + " offline! in the cluster of contract " + contractID); LOGGER.debug("nodeID=" + nodeID + " contractID=" + contractID + diff --git a/src/main/java/org/bdware/server/executor/consistency/PBFTExecutor.java b/src/main/java/org/bdware/server/executor/consistency/PBFTExecutor.java index b8ac817..76d254c 100644 --- a/src/main/java/org/bdware/server/executor/consistency/PBFTExecutor.java +++ b/src/main/java/org/bdware/server/executor/consistency/PBFTExecutor.java @@ -1,375 +1,375 @@ -package org.bdware.server.executor.consistency; - -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonPrimitive; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.bdware.sc.ComponedContractResult; -import org.bdware.sc.ContractResult; -import org.bdware.sc.bean.ContractRequest; -import org.bdware.sc.conn.Node; -import org.bdware.sc.conn.OnHashCallback; -import org.bdware.sc.conn.ResultCallback; -import org.bdware.sc.consistency.Committer; -import org.bdware.sc.consistency.pbft.PBFTAlgorithm; -import org.bdware.sc.consistency.pbft.PBFTMember; -import org.bdware.sc.consistency.pbft.PBFTMessage; -import org.bdware.sc.consistency.pbft.PBFTType; -import org.bdware.sc.units.*; -import org.bdware.sc.util.JsonUtil; -import org.bdware.server.GlobalConf; -import org.bdware.server.action.CMActions; -import org.bdware.server.action.p2p.MasterServerRecoverMechAction; -import org.bdware.server.action.p2p.MasterServerTCPAction; -import org.bdware.server.trustedmodel.ContractCluster; -import org.bdware.server.trustedmodel.ContractExecutor; -import org.bdware.server.trustedmodel.MultiReqSeq; -import org.bdware.server.trustedmodel.ResultCollector; -import org.bdware.units.NetworkManager; -import org.zz.gmhelper.SM2KeyPair; - -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -//TODO 追赶差下的调用 -public class PBFTExecutor implements ContractExecutor { - private static final Logger LOGGER = LogManager.getLogger(PBFTExecutor.class); - final Object lock = new Object(); - private final List members; - int resultCount; - - AtomicInteger request_index = new AtomicInteger(0); - // key为requestID,value为其seq - Map seqMap = new ConcurrentHashMap<>(); - Map resultCache = new ConcurrentHashMap<>(); - // MultiPointContractInfo info; - String contractID; - PBFTAlgorithm pbft; - ContractCluster contractCluster; - boolean isMaster; - - public PBFTExecutor( - int c, String con_id, final String masterPubkey, String[] members) { - resultCount = c; - contractID = con_id; - this.members = new ArrayList<>(); - isMaster = GlobalConf.getNodeID().equals(masterPubkey); - pbft = new PBFTAlgorithm(isMaster); - int count = 0; - for (String mem : members) { - PubKeyNode pubkeyNode = new PubKeyNode(); - pubkeyNode.pubkey = mem; - PBFTMember pbftMember = new PBFTMember(); - pbftMember.isMaster = mem.equals(masterPubkey); - pbft.addMember(pubkeyNode, pbftMember); - this.members.add(pubkeyNode); - if (GlobalConf.getNodeID().equals(mem)) { - pbft.setSendID(count); - } - count++; - } - contractCluster = new ContractCluster(contractID, this.members); - pbft.setConnection(contractCluster); - final MultiContractMeta cei = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); - pbft.setCommitter(new Committer() { - @Override - public void onCommit(ContractRequest data) { - ResultCallback ret = null; - final long startTime = System.currentTimeMillis(); - ret = new ResultCallback() { - @Override - public void onResult(String str) { - Map ret = new HashMap<>(); - ret.put("action", "receiveTrustfullyResult"); - SM2KeyPair keyPair = GlobalConf.instance.keyPair; - ret.put("nodeID", keyPair.getPublicKeyStr()); - ret.put("responseID", data.getRequestID()); - ret.put("executeTime", (System.currentTimeMillis() - startTime) + ""); - ret.put("data", str); - cei.setLastExeSeq(data.seq); - NetworkManager.instance.sendToAgent(masterPubkey, JsonUtil.toJson(ret)); - } - }; - CMActions.manager.executeLocallyAsync(data, ret, null); - } - }); - } - - public void onSyncMessage(Node node, byte[] data) { - - pbft.onMessage(node, data); - } - - public void setSeq(int seq) { - request_index = new AtomicInteger(seq); - pbft.setAtomSeq(request_index.get()); - } - - public ResultCallback createResultCallback( - final String requestID, - final ResultCallback originalCb, - final int count, - final int request_seq, - final String contractID) { - ComponedContractResult componedContractResult = new ComponedContractResult(count); - // TODO 加对应的超时? - return new ResultCollector( - requestID, new ResultMerger(originalCb, count, request_seq, contractID), count); - } - - public void sendRequest(String id, ContractRequest req, ResultCallback collector) { -// Map reqStr = new HashMap<>(); -// reqStr.put("uniReqID", id); -// reqStr.put("data", req); -// reqStr.put("action", "executeContractLocally"); - ContractRequest cr2 = ContractRequest.parse(req.toByte()); - cr2.setRequestID(id); - PBFTMessage request = new PBFTMessage(); - request.setOrder(req.seq); - request.setType(PBFTType.Request); - request.setContent(cr2.toByte()); - for (PubKeyNode node : members) { - if (!NetworkManager.instance.hasAgentConnection(node.pubkey)) { - LOGGER.warn("cmNode " + node.pubkey.substring(0, 5) + " is null"); - collector.onResult( - "{\"status\":\"Error\",\"result\":\"node offline\"," - + "\"nodeID\":\"" - + node - + "\"," - + "\"action\":\"onExecuteContractTrustfully\"}"); -// } else if (MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID) -// != RecoverFlag.Fine) { +//package org.bdware.server.executor.consistency; +// +//import com.google.gson.JsonObject; +//import com.google.gson.JsonParser; +//import com.google.gson.JsonPrimitive; +//import org.apache.logging.log4j.LogManager; +//import org.apache.logging.log4j.Logger; +//import org.bdware.sc.ComponedContractResult; +//import org.bdware.sc.ContractResult; +//import org.bdware.sc.bean.ContractRequest; +//import org.bdware.sc.conn.Node; +//import org.bdware.sc.conn.OnHashCallback; +//import org.bdware.sc.conn.ResultCallback; +//import org.bdware.sc.consistency.Committer; +//import org.bdware.sc.consistency.pbft.PBFTAlgorithm; +//import org.bdware.sc.consistency.pbft.PBFTMember; +//import org.bdware.sc.consistency.pbft.PBFTMessage; +//import org.bdware.sc.consistency.pbft.PBFTType; +//import org.bdware.sc.units.*; +//import org.bdware.sc.util.JsonUtil; +//import org.bdware.server.GlobalConf; +//import org.bdware.server.action.CMActions; +//import org.bdware.server.action.p2p.MasterServerRecoverMechAction; +//import org.bdware.server.action.p2p.MasterServerTCPAction; +//import org.bdware.server.trustedmodel.ContractCluster; +//import org.bdware.server.trustedmodel.ContractExecutor; +//import org.bdware.server.trustedmodel.MultiReqSeq; +//import org.bdware.server.trustedmodel.ResultCollector; +//import org.bdware.units.NetworkManager; +//import org.zz.gmhelper.SM2KeyPair; +// +//import java.util.*; +//import java.util.concurrent.ConcurrentHashMap; +//import java.util.concurrent.atomic.AtomicInteger; +// +////TODO 追赶差下的调用 +//public class PBFTExecutor implements ContractExecutor { +// private static final Logger LOGGER = LogManager.getLogger(PBFTExecutor.class); +// final Object lock = new Object(); +// private final List members; +// int resultCount; +// +// AtomicInteger request_index = new AtomicInteger(0); +// // key为requestID,value为其seq +// Map seqMap = new ConcurrentHashMap<>(); +// Map resultCache = new ConcurrentHashMap<>(); +// // MultiPointContractInfo info; +// String contractID; +// PBFTAlgorithm pbft; +// ContractCluster contractCluster; +// boolean isMaster; +// +// public PBFTExecutor( +// int c, String con_id, final String masterPubkey, String[] members) { +// resultCount = c; +// contractID = con_id; +// this.members = new ArrayList<>(); +// isMaster = GlobalConf.getNodeID().equals(masterPubkey); +// pbft = new PBFTAlgorithm(isMaster); +// int count = 0; +// for (String mem : members) { +// PubKeyNode pubkeyNode = new PubKeyNode(); +// pubkeyNode.pubkey = mem; +// PBFTMember pbftMember = new PBFTMember(); +// pbftMember.isMaster = mem.equals(masterPubkey); +// pbft.addMember(pubkeyNode, pbftMember); +// this.members.add(pubkeyNode); +// if (GlobalConf.getNodeID().equals(mem)) { +// pbft.setSendID(count); +// } +// count++; +// } +// contractCluster = new ContractCluster(contractID, this.members); +// pbft.setConnection(contractCluster); +// final MultiContractMeta cei = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); +// pbft.setCommitter(new Committer() { +// @Override +// public void onCommit(ContractRequest data) { +// ResultCallback ret = null; +// final long startTime = System.currentTimeMillis(); +// ret = new ResultCallback() { +// @Override +// public void onResult(String str) { +// Map ret = new HashMap<>(); +// ret.put("action", "receiveTrustfullyResult"); +// SM2KeyPair keyPair = GlobalConf.instance.keyPair; +// ret.put("nodeID", keyPair.getPublicKeyStr()); +// ret.put("responseID", data.getRequestID()); +// ret.put("executeTime", (System.currentTimeMillis() - startTime) + ""); +// ret.put("data", str); +// cei.setLastExeSeq(data.seq); +// NetworkManager.instance.sendToAgent(masterPubkey, JsonUtil.toJson(ret)); +// } +// }; +// CMActions.manager.executeLocallyAsync(data, ret, null); +// } +// }); +// } +// +// public void onSyncMessage(Node node, byte[] data) { +// +// pbft.onMessage(node, data); +// } +// +// public void setSeq(int seq) { +// request_index = new AtomicInteger(seq); +// pbft.setAtomSeq(request_index.get()); +// } +// +// public ResultCallback createResultCallback( +// final String requestID, +// final ResultCallback originalCb, +// final int count, +// final int request_seq, +// final String contractID) { +// ComponedContractResult componedContractResult = new ComponedContractResult(count); +// // TODO 加对应的超时? +// return new ResultCollector( +// requestID, new ResultMerger(originalCb, count, request_seq, contractID), count); +// } +// +// public void sendRequest(String id, ContractRequest req, ResultCallback collector) { +//// Map reqStr = new HashMap<>(); +//// reqStr.put("uniReqID", id); +//// reqStr.put("data", req); +//// reqStr.put("action", "executeContractLocally"); +// ContractRequest cr2 = ContractRequest.parse(req.toByte()); +// cr2.setRequestID(id); +// PBFTMessage request = new PBFTMessage(); +// request.setOrder(req.seq); +// request.setType(PBFTType.Request); +// request.setContent(cr2.toByte()); +// for (PubKeyNode node : members) { +// if (!NetworkManager.instance.hasAgentConnection(node.pubkey)) { +// LOGGER.warn("cmNode " + node.pubkey.substring(0, 5) + " is null"); // collector.onResult( -// "{\"status\":\"Error\",\"result\":\"node recovering\"," +// "{\"status\":\"Error\",\"result\":\"node offline\"," // + "\"nodeID\":\"" // + node // + "\"," // + "\"action\":\"onExecuteContractTrustfully\"}"); +//// } else if (MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID) +//// != RecoverFlag.Fine) { +//// collector.onResult( +//// "{\"status\":\"Error\",\"result\":\"node recovering\"," +//// + "\"nodeID\":\"" +//// + node +//// + "\"," +//// + "\"action\":\"onExecuteContractTrustfully\"}"); +//// contractCluster.sendMessage(node, request.getBytes()); +// } else { // contractCluster.sendMessage(node, request.getBytes()); - } else { - contractCluster.sendMessage(node, request.getBytes()); - } - } - // master负责缓存请求 - if (!MasterServerTCPAction.requestCache.containsKey(contractID)) { - MasterServerTCPAction.requestCache.put(contractID, new RequestCache()); - } - // TODO 多调多统一个seq的有多个请求,这个需要改 - String[] nodes = - CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers(); - LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID()); - LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes)); - - } - - - public boolean checkCurNodeNumValid() { - return true; - } - - @Override - public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { - LOGGER.debug(JsonUtil.toJson(req)); - MultiContractMeta meta = CMActions.manager.multiContractRecorder.getMultiContractMeta(req.getContractID()); - if (meta == null || !meta.isMaster()) { - CMActions.manager.executeContractOnOtherNodes(req, rc); - return; - } - req.setContractID(CMActions.manager.getContractIDByName(req.getContractID())); - - // 三个相同requestID进来的时候,会有冲突。 - // 仅在此处有冲突么? - // 这里是从MasterServer->MasterClient,请求的是"executeContractLocally"。 - - // 如果是多点合约的请求,A1、A2、A3的序号应该一致,不能分配一个新的seq,根据requestID判断是否不需要重新分配一个序号 - //TODO seqMap memory leak - //TODO - //TODO - if (null != requestID && requestID.endsWith("_mul")) { - synchronized (lock) { - if (seqMap.containsKey(requestID)) { - req.seq = seqMap.get(requestID).seq; - } else { - req.seq = request_index.getAndIncrement(); - seqMap.put(requestID, new MultiReqSeq(req.seq)); - } - } - } else { - req.seq = request_index.getAndIncrement(); - } - req.needSeq = true; - String id = - System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq; - LOGGER.info("execute receive requestID=" + requestID + " msgID=" + id); - - if (checkCurNodeNumValid()) { - LOGGER.debug("checkCurNodeNumValid=true"); - ResultCallback collector = - createResultCallback(id, rc, resultCount, req.seq, req.getContractID()); - MasterServerTCPAction.sync.sleep(id, collector); - LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq); - sendRequest(id, req, collector); - } else { - LOGGER.debug("invalidNodeNumOnResult"); - request_index.getAndDecrement(); - ContractResult finalResult = - new ContractResult( - ContractResult.Status.Error, - new JsonPrimitive("node number unavailable, request refused.")); - rc.onResult(JsonUtil.toJson(finalResult)); - } - - // } - - /* // 三个相同requestID进来的时候,会有冲突。 - // 仅在此处有冲突么? - // 这里是从MasterServer->MasterClient,请求的是"executeContractLocally"。 - req.seq = request_index.getAndIncrement(); - req.needSeq = true; - ResultCallback collector = createResultCallback(id, rc, resultCount, req.getContractID()); - MasterServerTCPAction.sync.sleep(id, collector); - sendRequest(id, req, collector);*/ - } - - // 清理缓存的多点合约请求序号 - public void clearCache() { - final long time = System.currentTimeMillis() - 30000L; - seqMap.entrySet() - .removeIf( - entry -> { - MultiReqSeq cache = entry.getValue(); - if (null == cache) { - return true; - } - return cache.startTime < time; - }); - } - - public static class ResultMerger extends ResultCallback { - ComponedContractResult componedContractResult; - AtomicInteger order; - String contractID; - int count; - int request_seq; - ResultCallback originalCallback; - Set nodeIDs = new HashSet<>(); // 已收到返回结果的节点 - - ResultMerger( - final ResultCallback originalCb, - final int count, - final int request_seq, - final String contractID) { - originalCallback = originalCb; - this.count = count; - this.request_seq = request_seq; - this.contractID = contractID; - componedContractResult = new ComponedContractResult(count); - order = new AtomicInteger(0); - } - - public String getContractID() { - return contractID; - } - - public String getInfo() { - return "contractID=" - + contractID - + " 收到第 " - + order - + " 个节点回复 : " - + " order=" - + order - + " count=" - + count - + " "; - } - - @Override - public void onResult(String str) { - // TODO 必须在这里聚合。 - // str的data是个ContractResult - // 在这儿也是返回个ContractResult - try { - LOGGER.debug("a result of contract" + contractID + ": " + str); - JsonObject obj = JsonParser.parseString(str).getAsJsonObject(); - if (obj.has("nodeID")) { - String id = obj.get("nodeID").getAsString(); - if (nodeIDs.contains(id)) { - LOGGER.debug( - "ignored result because the result of node " - + id.substring(0, 5) - + " has been received"); - return; - } - nodeIDs.add(id); - } - - LOGGER.debug( - String.format( - "contractID=%s received=%s order=%d count=%d", - contractID, str, order.get(), count)); - componedContractResult.add(obj); - // 收集到所有结果 - if (order.incrementAndGet() == count) { - ContractResult finalResult = componedContractResult.figureFinalResult(); - finalResult.needSeq = true; - finalResult.seq = request_seq; - - // if (null == finalResult) { - // finalResult = - // new ContractResult( - // ContractResult.Status.Exception, - // new JsonPrimitive( - // "no nore than half of the - // consistent result")); - // originalCallback.onResult(new - // Gson().toJson(finalResult)); - // } else { - originalCallback.onResult(JsonUtil.toJson(finalResult)); - // } - LOGGER.debug( - String.format( - "%d results are the same: %s", - finalResult.size, finalResult.result)); - - // 集群中事务序号+1 - CMActions.manager.multiContractRecorder - .getMultiContractMeta(contractID) - .nextSeqAtMaster(); - - // recover,其中无状态合约CP出错无需恢复 - Set nodesID = componedContractResult.getProblemNodes(); - if (null == nodesID || nodesID.isEmpty()) { - return; - } - for (String nodeID : nodesID) { - LOGGER.warn("node fails! " + nodeID); - if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID) - == RecoverFlag.Fine) { - MasterServerRecoverMechAction.recoverStatus - .get(nodeID) - .put(contractID, RecoverFlag.ToRecover); - } - } - for (String nodeID : nodesID) { - if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID) - == RecoverFlag.ToRecover) { - LOGGER.warn("node in recover " + nodeID); - - // 因为该节点结果有误,所以即时是stableMode也认为trans记录不可信 - // 直接通过load别的节点来恢复 - MasterServerRecoverMechAction.restartContractFromCommonMode( - nodeID, contractID); - } - } - } - // clearCache(); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn("result exception!"); - } - } - } -} +// } +// } +// // master负责缓存请求 +// if (!MasterServerTCPAction.requestCache.containsKey(contractID)) { +// MasterServerTCPAction.requestCache.put(contractID, new RequestCache()); +// } +// // TODO 多调多统一个seq的有多个请求,这个需要改 +// String[] nodes = +// CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers(); +// LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID()); +// LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes)); +// +// } +// +// +// public boolean checkCurNodeNumValid() { +// return true; +// } +// +// @Override +// public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { +// LOGGER.debug(JsonUtil.toJson(req)); +// MultiContractMeta meta = CMActions.manager.multiContractRecorder.getMultiContractMeta(req.getContractID()); +// if (meta == null || !meta.isMaster()) { +// CMActions.manager.executeContractOnOtherNodes(req, rc); +// return; +// } +// req.setContractID(CMActions.manager.getContractIDByName(req.getContractID())); +// +// // 三个相同requestID进来的时候,会有冲突。 +// // 仅在此处有冲突么? +// // 这里是从MasterServer->MasterClient,请求的是"executeContractLocally"。 +// +// // 如果是多点合约的请求,A1、A2、A3的序号应该一致,不能分配一个新的seq,根据requestID判断是否不需要重新分配一个序号 +// //TODO seqMap memory leak +// //TODO +// //TODO +// if (null != requestID && requestID.endsWith("_mul")) { +// synchronized (lock) { +// if (seqMap.containsKey(requestID)) { +// req.seq = seqMap.get(requestID).seq; +// } else { +// req.seq = request_index.getAndIncrement(); +// seqMap.put(requestID, new MultiReqSeq(req.seq)); +// } +// } +// } else { +// req.seq = request_index.getAndIncrement(); +// } +// req.needSeq = true; +// String id = +// System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq; +// LOGGER.info("execute receive requestID=" + requestID + " msgID=" + id); +// +// if (checkCurNodeNumValid()) { +// LOGGER.debug("checkCurNodeNumValid=true"); +// ResultCallback collector = +// createResultCallback(id, rc, resultCount, req.seq, req.getContractID()); +// MasterServerTCPAction.sync.sleep(id, collector); +// LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq); +// sendRequest(id, req, collector); +// } else { +// LOGGER.debug("invalidNodeNumOnResult"); +// request_index.getAndDecrement(); +// ContractResult finalResult = +// new ContractResult( +// ContractResult.Status.Error, +// new JsonPrimitive("node number unavailable, request refused.")); +// rc.onResult(JsonUtil.toJson(finalResult)); +// } +// +// // } +// +// /* // 三个相同requestID进来的时候,会有冲突。 +// // 仅在此处有冲突么? +// // 这里是从MasterServer->MasterClient,请求的是"executeContractLocally"。 +// req.seq = request_index.getAndIncrement(); +// req.needSeq = true; +// ResultCallback collector = createResultCallback(id, rc, resultCount, req.getContractID()); +// MasterServerTCPAction.sync.sleep(id, collector); +// sendRequest(id, req, collector);*/ +// } +// +// // 清理缓存的多点合约请求序号 +// public void clearCache() { +// final long time = System.currentTimeMillis() - 30000L; +// seqMap.entrySet() +// .removeIf( +// entry -> { +// MultiReqSeq cache = entry.getValue(); +// if (null == cache) { +// return true; +// } +// return cache.startTime < time; +// }); +// } +// +// public static class ResultMerger extends ResultCallback { +// ComponedContractResult componedContractResult; +// AtomicInteger order; +// String contractID; +// int count; +// int request_seq; +// ResultCallback originalCallback; +// Set nodeIDs = new HashSet<>(); // 已收到返回结果的节点 +// +// ResultMerger( +// final ResultCallback originalCb, +// final int count, +// final int request_seq, +// final String contractID) { +// originalCallback = originalCb; +// this.count = count; +// this.request_seq = request_seq; +// this.contractID = contractID; +// componedContractResult = new ComponedContractResult(count); +// order = new AtomicInteger(0); +// } +// +// public String getContractID() { +// return contractID; +// } +// +// public String getInfo() { +// return "contractID=" +// + contractID +// + " 收到第 " +// + order +// + " 个节点回复 : " +// + " order=" +// + order +// + " count=" +// + count +// + " "; +// } +// +// @Override +// public void onResult(String str) { +// // TODO 必须在这里聚合。 +// // str的data是个ContractResult +// // 在这儿也是返回个ContractResult +// try { +// LOGGER.debug("a result of contract" + contractID + ": " + str); +// JsonObject obj = JsonParser.parseString(str).getAsJsonObject(); +// if (obj.has("nodeID")) { +// String id = obj.get("nodeID").getAsString(); +// if (nodeIDs.contains(id)) { +// LOGGER.debug( +// "ignored result because the result of node " +// + id.substring(0, 5) +// + " has been received"); +// return; +// } +// nodeIDs.add(id); +// } +// +// LOGGER.debug( +// String.format( +// "contractID=%s received=%s order=%d count=%d", +// contractID, str, order.get(), count)); +// componedContractResult.add(obj); +// // 收集到所有结果 +// if (order.incrementAndGet() == count) { +// ContractResult finalResult = componedContractResult.figureFinalResult(); +// finalResult.needSeq = true; +// finalResult.seq = request_seq; +// +// // if (null == finalResult) { +// // finalResult = +// // new ContractResult( +// // ContractResult.Status.Exception, +// // new JsonPrimitive( +// // "no nore than half of the +// // consistent result")); +// // originalCallback.onResult(new +// // Gson().toJson(finalResult)); +// // } else { +// originalCallback.onResult(JsonUtil.toJson(finalResult)); +// // } +// LOGGER.debug( +// String.format( +// "%d results are the same: %s", +// finalResult.size, finalResult.result)); +// +// // 集群中事务序号+1 +// CMActions.manager.multiContractRecorder +// .getMultiContractMeta(contractID) +// .nextSeqAtMaster(); +// +// // recover,其中无状态合约CP出错无需恢复 +// Set nodesID = componedContractResult.getProblemNodes(); +// if (null == nodesID || nodesID.isEmpty()) { +// return; +// } +// for (String nodeID : nodesID) { +// LOGGER.warn("node fails! " + nodeID); +// if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID) +// == RecoverFlag.Fine) { +// MasterServerRecoverMechAction.recoverStatus +// .get(nodeID) +// .put(contractID, RecoverFlag.ToRecover); +// } +// } +// for (String nodeID : nodesID) { +// if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID) +// == RecoverFlag.ToRecover) { +// LOGGER.warn("node in recover " + nodeID); +// +// // 因为该节点结果有误,所以即时是stableMode也认为trans记录不可信 +// // 直接通过load别的节点来恢复 +// MasterServerRecoverMechAction.restartContractFromCommonMode( +// nodeID, contractID); +// } +// } +// } +// // clearCache(); +// } catch (Exception e) { +// e.printStackTrace(); +// LOGGER.warn("result exception!"); +// } +// } +// } +//} diff --git a/src/main/java/org/bdware/server/executor/unconsistency/MultiPointCooperationExecutor.java b/src/main/java/org/bdware/server/executor/unconsistency/MultiPointCooperationExecutor.java index 6d7668e..e487116 100644 --- a/src/main/java/org/bdware/server/executor/unconsistency/MultiPointCooperationExecutor.java +++ b/src/main/java/org/bdware/server/executor/unconsistency/MultiPointCooperationExecutor.java @@ -1,400 +1,400 @@ -package org.bdware.server.executor.unconsistency; - -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonPrimitive; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.bdware.sc.ComponedContractResult; -import org.bdware.sc.ContractMeta; -import org.bdware.sc.ContractResult; -import org.bdware.sc.bean.*; -import org.bdware.sc.conn.OnHashCallback; -import org.bdware.sc.conn.ResultCallback; -import org.bdware.sc.units.MultiContractMeta; -import org.bdware.sc.units.RecoverFlag; -import org.bdware.sc.units.RequestCache; -import org.bdware.sc.units.ResultCache; -import org.bdware.sc.util.JsonUtil; -import org.bdware.server.action.CMActions; -import org.bdware.server.action.p2p.MasterServerRecoverMechAction; -import org.bdware.server.action.p2p.MasterServerTCPAction; -import org.bdware.server.trustedmodel.ContractExecutor; -import org.bdware.server.trustedmodel.MultiReqSeq; -import org.bdware.server.trustedmodel.ResultCollector; -import org.bdware.units.NetworkManager; - -import java.math.BigInteger; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -// 改为MultiPointCooperationExecutor -public class MultiPointCooperationExecutor implements ContractExecutor { - private static final Logger LOGGER = LogManager.getLogger(MultiPointCooperationExecutor.class); - final Object lock = new Object(); - int resultCount; - AtomicInteger request_index = new AtomicInteger(0); - ContractExecType type; - // key为requestID,value为其seq - Map seqMap = new ConcurrentHashMap<>(); - Map resultCache = new ConcurrentHashMap<>(); - // MultiPointContractInfo info; - MultiContractMeta multiMeta; - String contractID; - - public MultiPointCooperationExecutor(ContractExecType t, int c, String con_id) { - LOGGER.info("-- sharding executor---"); - type = t; - resultCount = c; - contractID = con_id; - multiMeta = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); - } - - public void setSeq(int seq) { - request_index = new AtomicInteger(seq); - } - - public ResultCallback createResultCallback( - final String requestID, - final ResultCallback originalCb, - final int count, - final int request_seq, - final String contractID, JoinInfo joinInfo) { - // TODO 加对应的超时? - return new ResultCollector( - requestID, - new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID, joinInfo), - count); // 把count改成了1,设置成获得1个响应就行 - } - - public void sendRequest(String id, ContractRequest req, String[] nodes) { - Map reqStr = new HashMap<>(); - reqStr.put("uniReqID", id); - reqStr.put("data", req); - req.needSeq = false; - reqStr.put("action", "executeContractLocally"); - String sendStr = JsonUtil.toJson(reqStr); - // master负责缓存请求 - if (!MasterServerTCPAction.requestCache.containsKey(contractID)) { - MasterServerTCPAction.requestCache.put(contractID, new RequestCache()); - } - // TODO 多调多统一个seq的有多个请求,这个需要改 - MasterServerTCPAction.requestCache.get(contractID).put(req.seq, sendStr); - LOGGER.debug(JsonUtil.toJson(req)); - LOGGER.info("node size = " + nodes.length); - LOGGER.debug("nodes:" + JsonUtil.toJson(nodes)); - for (String node : nodes) { - LOGGER.info( - "[sendRequests] get cmNode " - + node.substring(0, 5) - + " not null " - + "RequestAllExecutor 发送请求给 " - + node.substring(0, 5)); - NetworkManager.instance.sendToAgent(node, sendStr); - } - } - - private String[] getAccordingToRouteInfo(RouteInfo routeInfo, ContractRequest req, String[] members) { - try { - int val; - switch (routeInfo.useDefault) { - case byRequester: - val = - new BigInteger(req.getRequester(), 16) - .mod(new BigInteger("" + members.length)) - .intValue(); - while (val < 0) { - val = val + members.length; - } - return new String[]{members[val]}; - case byArgHash: - val = req.getArg().hashCode(); - val = val % members.length; - while (val < 0) { - val += members.length; - } - return new String[]{members[val]}; - case byTarget: - JsonObject jo = req.getArg().getAsJsonObject(); - val = - new BigInteger(jo.get("target").getAsString(), 16) - .mod(new BigInteger("" + members.length)) - .intValue(); - while (val < 0) { - val = val + members.length; - } - return new String[]{members[val]}; - default: - return members; - } - } catch (Exception e) { - return members; - } - } - - public boolean checkCurNodeNumValid() { - LOGGER.info("checkCurNodeNumValid"); - String[] nodes = multiMeta.getMembers(); - // List nodes = info.members; - int validNode = 0; - for (String node : nodes) { - if (NetworkManager.instance.hasAgentConnection(node) - && MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID) - == RecoverFlag.Fine) { - validNode++; - } - } - int c = resultCount; - if (type == ContractExecType.Sharding) c = (int) Math.ceil((double) c / 2); - LOGGER.info("c=" + c + " validNode=" + validNode); - return validNode >= c; - } - - @Override - public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { - LOGGER.info("[MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req)); - // 获得action 函数名 - LOGGER.info("action is : " + req.getAction()); - req.setContractID(CMActions.manager.getContractIDByName(req.getContractID())); - if (requestID != null && requestID.endsWith("_mul")) { - synchronized (lock) { - if (seqMap.containsKey(requestID)) { - req.seq = seqMap.get(requestID).seq; - } else { - req.seq = request_index.getAndIncrement(); - seqMap.put(requestID, new MultiReqSeq(req.seq)); - } - } - } else { - req.seq = request_index.getAndIncrement(); - } - req.needSeq = true; - String id = - System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq; - LOGGER.info("execute receive requestID= " + requestID + " msgID=" + id); - if (checkCurNodeNumValid()) { // 校验成功 current node num 合法 - LOGGER.info("checkCurNodeNumValid true"); - ContractMeta meta = - CMActions.manager.statusRecorder.getContractMeta(req.getContractID()); - FunctionDesp fun = meta.getExportedFunction(req.getAction()); - ResultCallback collector; - // TODO @fanbo 下面的count 1要改,应该是根据route的规则来。 - //Count 根据join规则来。 - //nodes 根据route规则来。 - JoinInfo joinInfo = fun.joinInfo; - RouteInfo routeInfo = fun.routeInfo; - int count = getJoinCount(joinInfo, contractID); - LOGGER.info("requestID=" + requestID + " join Count: " + count); - - String[] members = multiMeta.getMembers(); - String[] nodes = getAccordingToRouteInfo(routeInfo, req, members); - if (nodes.length < count) { - count = nodes.length; - } - collector = - createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器 - MasterServerTCPAction.sync.sleep(id, collector); - LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq); - sendRequest(id, req, nodes); // 发送请求 - } else { - LOGGER.info("invalidNodeNumOnResult"); - request_index.getAndDecrement(); - ContractResult finalResult = - new ContractResult( - ContractResult.Status.Error, - new JsonPrimitive("node number unavailbale,request refused.")); - rc.onResult(JsonUtil.toJson(finalResult)); - } - } - - private int getJoinCount(JoinInfo joinInfo, String contractID) { - if (joinInfo == null) return resultCount; - if (joinInfo.joinCount.isJsonPrimitive() && joinInfo.joinCount.getAsJsonPrimitive().isNumber()) { - return joinInfo.joinCount.getAsJsonPrimitive().getAsInt(); - } - try { - ContractRequest cr = new ContractRequest(); - cr.setContractID(contractID); - cr.setAction(joinInfo.joinCount.getAsString()); - //TODO Arg需要好好设计一下。 - //TODO 又好用又简单的那种设计 - //TODO - cr.setArg(""); - String result = CMActions.manager.executeLocally(cr, null); - return JsonUtil.parseString(result).getAsJsonObject().get("result").getAsInt(); - } catch (Exception e) { - e.printStackTrace(); - return 1; - } - } - - // 清理缓存的多点合约请求序号 - public void clearCache() { - final long time = System.currentTimeMillis() - 30000L; - seqMap.entrySet() - .removeIf( - entry -> { - MultiReqSeq cache = entry.getValue(); - if (null == cache) { - return true; - } - return cache.startTime < time; - }); - } - - public static class ResultMerger extends ResultCallback { - ComponedContractResult componedContractResult; - AtomicInteger order; - String contractID; - int count; // 记录有多少个节点 - int request_seq; - ResultCallback originalCallback; - Set nodeIDs = new HashSet<>(); // 已收到返回结果的节点 - JoinInfo joinInfo; - - ResultMerger( - final ResultCallback originalCb, - final int count, - final int request_seq, - final String contractID, - final JoinInfo joinInfo) { - originalCallback = originalCb; - this.count = count; - this.request_seq = request_seq; - this.contractID = contractID; - componedContractResult = new ComponedContractResult(count); - order = new AtomicInteger(0); - this.joinInfo = joinInfo; - } - - public String getInfo() { - return "contractID=" - + contractID - + " 收到第 " - + order - + " 个节点回复 : " - + " order=" - + order - + " count=" - + count - + " "; - } - - @Override - public void onResult(String str) { - // TODO 必须在这里聚合。 - // str的data是个ContractResult - // 在这儿也是返回个ContractResult - try { - LOGGER.info(str); - JsonObject obj = JsonParser.parseString(str).getAsJsonObject(); - String id = obj.get("nodeID").getAsString(); - if (nodeIDs.contains(id)) { - LOGGER.info("已经收到节点 " + id.substring(0, 5) + " 的结果,该结果被忽略"); - return; - } - nodeIDs.add(id); - LOGGER.info( - "contractID=" - + contractID - + " 收到第 " - + order - + " 个节点回复 : " - + str - + " order=" - + order - + " count=" - + count); - componedContractResult.add(obj); - // 收集到所有结果 - if (order.incrementAndGet() == count) { - ContractResult finalResult = componedContractResult.mergeFinalResult(); - - finalResult.needSeq = true; - finalResult.seq = request_seq; - - // if (null == finalResult) { - // finalResult = - // new ContractResult( - // ContractResult.Status.Exception, - // new JsonPrimitive( - // "no nore than half of the - // consistent result")); - // originalCallback.onResult(new - // Gson().toJson(finalResult)); - // } else { - if (joinInfo != null) { - handleJoinInfo(finalResult, joinInfo); - } - originalCallback.onResult(JsonUtil.toJson(finalResult)); - // } - LOGGER.info( - "本次执行最终结果为 " + finalResult.size + "个节点合并的,结果为 " + finalResult.result); - - // 集群中事务序号+1 - // MasterServerTCPAction.contractID2Members.get(contractID).nextSeq(); - CMActions.manager - .multiContractRecorder - .getMultiContractMeta(contractID) - .nextSeqAtMaster(); - // recover,其中无状态合约CP出错无需恢复 - Set nodesID = componedContractResult.getProblemNodes(); - if (null == nodesID || nodesID.isEmpty()) { - return; - } - for (String nodeID : nodesID) { - LOGGER.info("结果出现问题的节点有:" + nodeID); - if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID) - == RecoverFlag.Fine) { - MasterServerRecoverMechAction.recoverStatus - .get(nodeID) - .put(contractID, RecoverFlag.ToRecover); - } - } - for (String nodeID : nodesID) { - if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID) - == RecoverFlag.ToRecover) { - LOGGER.info("问题节点开始恢复:" + nodeID); - - // 因为该节点结果有误,所以即时是stableMode也认为trans记录不可信 - // 直接通过load别的节点来恢复 - MasterServerRecoverMechAction.restartContractFromCommonMode( - nodeID, contractID); - } - } - } - // clearCache(); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.info("本次执行最终结果为有异常"); - } - } - - private void handleJoinInfo(ContractResult finalResult, JoinInfo joinInfo) { - JsonObject jo = finalResult.result.getAsJsonObject(); - if (joinInfo != null && joinInfo.joinRule != null) { - //TODO 不应该是double 类型 - switch (joinInfo.joinRule) { - case "add": - double val = 0; - for (String key : jo.keySet()) { - val += jo.get(key).getAsDouble(); - } - finalResult.result = new JsonPrimitive(val); - break; - case "multiply": - val = 1; - for (String key : jo.keySet()) { - val *= jo.get(key).getAsDouble(); - } - finalResult.result = new JsonPrimitive(val); - break; - } - } - } - } -} +//package org.bdware.server.executor.unconsistency; +// +//import com.google.gson.JsonObject; +//import com.google.gson.JsonParser; +//import com.google.gson.JsonPrimitive; +//import org.apache.logging.log4j.LogManager; +//import org.apache.logging.log4j.Logger; +//import org.bdware.sc.ComponedContractResult; +//import org.bdware.sc.ContractMeta; +//import org.bdware.sc.ContractResult; +//import org.bdware.sc.bean.*; +//import org.bdware.sc.conn.OnHashCallback; +//import org.bdware.sc.conn.ResultCallback; +//import org.bdware.sc.units.MultiContractMeta; +//import org.bdware.sc.units.RecoverFlag; +//import org.bdware.sc.units.RequestCache; +//import org.bdware.sc.units.ResultCache; +//import org.bdware.sc.util.JsonUtil; +//import org.bdware.server.action.CMActions; +//import org.bdware.server.action.p2p.MasterServerRecoverMechAction; +//import org.bdware.server.action.p2p.MasterServerTCPAction; +//import org.bdware.server.trustedmodel.ContractExecutor; +//import org.bdware.server.trustedmodel.MultiReqSeq; +//import org.bdware.server.trustedmodel.ResultCollector; +//import org.bdware.units.NetworkManager; +// +//import java.math.BigInteger; +//import java.util.HashMap; +//import java.util.HashSet; +//import java.util.Map; +//import java.util.Set; +//import java.util.concurrent.ConcurrentHashMap; +//import java.util.concurrent.atomic.AtomicInteger; +// +//// 改为MultiPointCooperationExecutor +//public class MultiPointCooperationExecutor implements ContractExecutor { +// private static final Logger LOGGER = LogManager.getLogger(MultiPointCooperationExecutor.class); +// final Object lock = new Object(); +// int resultCount; +// AtomicInteger request_index = new AtomicInteger(0); +// ContractExecType type; +// // key为requestID,value为其seq +// Map seqMap = new ConcurrentHashMap<>(); +// Map resultCache = new ConcurrentHashMap<>(); +// // MultiPointContractInfo info; +// MultiContractMeta multiMeta; +// String contractID; +// +// public MultiPointCooperationExecutor(ContractExecType t, int c, String con_id) { +// LOGGER.info("-- sharding executor---"); +// type = t; +// resultCount = c; +// contractID = con_id; +// multiMeta = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); +// } +// +// public void setSeq(int seq) { +// request_index = new AtomicInteger(seq); +// } +// +// public ResultCallback createResultCallback( +// final String requestID, +// final ResultCallback originalCb, +// final int count, +// final int request_seq, +// final String contractID, JoinInfo joinInfo) { +// // TODO 加对应的超时? +// return new ResultCollector( +// requestID, +// new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID, joinInfo), +// count); // 把count改成了1,设置成获得1个响应就行 +// } +// +// public void sendRequest(String id, ContractRequest req, String[] nodes) { +// Map reqStr = new HashMap<>(); +// reqStr.put("uniReqID", id); +// reqStr.put("data", req); +// req.needSeq = false; +// reqStr.put("action", "executeContractLocally"); +// String sendStr = JsonUtil.toJson(reqStr); +// // master负责缓存请求 +// if (!MasterServerTCPAction.requestCache.containsKey(contractID)) { +// MasterServerTCPAction.requestCache.put(contractID, new RequestCache()); +// } +// // TODO 多调多统一个seq的有多个请求,这个需要改 +// MasterServerTCPAction.requestCache.get(contractID).put(req.seq, sendStr); +// LOGGER.debug(JsonUtil.toJson(req)); +// LOGGER.info("node size = " + nodes.length); +// LOGGER.debug("nodes:" + JsonUtil.toJson(nodes)); +// for (String node : nodes) { +// LOGGER.info( +// "[sendRequests] get cmNode " +// + node.substring(0, 5) +// + " not null " +// + "RequestAllExecutor 发送请求给 " +// + node.substring(0, 5)); +// NetworkManager.instance.sendToAgent(node, sendStr); +// } +// } +// +// private String[] getAccordingToRouteInfo(RouteInfo routeInfo, ContractRequest req, String[] members) { +// try { +// int val; +// switch (routeInfo.useDefault) { +// case byRequester: +// val = +// new BigInteger(req.getRequester(), 16) +// .mod(new BigInteger("" + members.length)) +// .intValue(); +// while (val < 0) { +// val = val + members.length; +// } +// return new String[]{members[val]}; +// case byArgHash: +// val = req.getArg().hashCode(); +// val = val % members.length; +// while (val < 0) { +// val += members.length; +// } +// return new String[]{members[val]}; +// case byTarget: +// JsonObject jo = req.getArg().getAsJsonObject(); +// val = +// new BigInteger(jo.get("target").getAsString(), 16) +// .mod(new BigInteger("" + members.length)) +// .intValue(); +// while (val < 0) { +// val = val + members.length; +// } +// return new String[]{members[val]}; +// default: +// return members; +// } +// } catch (Exception e) { +// return members; +// } +// } +// +// public boolean checkCurNodeNumValid() { +// LOGGER.info("checkCurNodeNumValid"); +// String[] nodes = multiMeta.getMembers(); +// // List nodes = info.members; +// int validNode = 0; +// for (String node : nodes) { +// if (NetworkManager.instance.hasAgentConnection(node) +// && MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID) +// == RecoverFlag.Fine) { +// validNode++; +// } +// } +// int c = resultCount; +// if (type == ContractExecType.Sharding) c = (int) Math.ceil((double) c / 2); +// LOGGER.info("c=" + c + " validNode=" + validNode); +// return validNode >= c; +// } +// +// @Override +// public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { +// LOGGER.info("[MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req)); +// // 获得action 函数名 +// LOGGER.info("action is : " + req.getAction()); +// req.setContractID(CMActions.manager.getContractIDByName(req.getContractID())); +// if (requestID != null && requestID.endsWith("_mul")) { +// synchronized (lock) { +// if (seqMap.containsKey(requestID)) { +// req.seq = seqMap.get(requestID).seq; +// } else { +// req.seq = request_index.getAndIncrement(); +// seqMap.put(requestID, new MultiReqSeq(req.seq)); +// } +// } +// } else { +// req.seq = request_index.getAndIncrement(); +// } +// req.needSeq = true; +// String id = +// System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq; +// LOGGER.info("execute receive requestID= " + requestID + " msgID=" + id); +// if (checkCurNodeNumValid()) { // 校验成功 current node num 合法 +// LOGGER.info("checkCurNodeNumValid true"); +// ContractMeta meta = +// CMActions.manager.statusRecorder.getContractMeta(req.getContractID()); +// FunctionDesp fun = meta.getExportedFunction(req.getAction()); +// ResultCallback collector; +// // TODO @fanbo 下面的count 1要改,应该是根据route的规则来。 +// //Count 根据join规则来。 +// //nodes 根据route规则来。 +// JoinInfo joinInfo = fun.joinInfo; +// RouteInfo routeInfo = fun.routeInfo; +// int count = getJoinCount(joinInfo, contractID); +// LOGGER.info("requestID=" + requestID + " join Count: " + count); +// +// String[] members = multiMeta.getMembers(); +// String[] nodes = getAccordingToRouteInfo(routeInfo, req, members); +// if (nodes.length < count) { +// count = nodes.length; +// } +// collector = +// createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器 +// MasterServerTCPAction.sync.sleep(id, collector); +// LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq); +// sendRequest(id, req, nodes); // 发送请求 +// } else { +// LOGGER.info("invalidNodeNumOnResult"); +// request_index.getAndDecrement(); +// ContractResult finalResult = +// new ContractResult( +// ContractResult.Status.Error, +// new JsonPrimitive("node number unavailbale,request refused.")); +// rc.onResult(JsonUtil.toJson(finalResult)); +// } +// } +// +// private int getJoinCount(JoinInfo joinInfo, String contractID) { +// if (joinInfo == null) return resultCount; +// if (joinInfo.joinCount.isJsonPrimitive() && joinInfo.joinCount.getAsJsonPrimitive().isNumber()) { +// return joinInfo.joinCount.getAsJsonPrimitive().getAsInt(); +// } +// try { +// ContractRequest cr = new ContractRequest(); +// cr.setContractID(contractID); +// cr.setAction(joinInfo.joinCount.getAsString()); +// //TODO Arg需要好好设计一下。 +// //TODO 又好用又简单的那种设计 +// //TODO +// cr.setArg(""); +// String result = CMActions.manager.executeLocally(cr, null); +// return JsonUtil.parseString(result).getAsJsonObject().get("result").getAsInt(); +// } catch (Exception e) { +// e.printStackTrace(); +// return 1; +// } +// } +// +// // 清理缓存的多点合约请求序号 +// public void clearCache() { +// final long time = System.currentTimeMillis() - 30000L; +// seqMap.entrySet() +// .removeIf( +// entry -> { +// MultiReqSeq cache = entry.getValue(); +// if (null == cache) { +// return true; +// } +// return cache.startTime < time; +// }); +// } +// +// public static class ResultMerger extends ResultCallback { +// ComponedContractResult componedContractResult; +// AtomicInteger order; +// String contractID; +// int count; // 记录有多少个节点 +// int request_seq; +// ResultCallback originalCallback; +// Set nodeIDs = new HashSet<>(); // 已收到返回结果的节点 +// JoinInfo joinInfo; +// +// ResultMerger( +// final ResultCallback originalCb, +// final int count, +// final int request_seq, +// final String contractID, +// final JoinInfo joinInfo) { +// originalCallback = originalCb; +// this.count = count; +// this.request_seq = request_seq; +// this.contractID = contractID; +// componedContractResult = new ComponedContractResult(count); +// order = new AtomicInteger(0); +// this.joinInfo = joinInfo; +// } +// +// public String getInfo() { +// return "contractID=" +// + contractID +// + " 收到第 " +// + order +// + " 个节点回复 : " +// + " order=" +// + order +// + " count=" +// + count +// + " "; +// } +// +// @Override +// public void onResult(String str) { +// // TODO 必须在这里聚合。 +// // str的data是个ContractResult +// // 在这儿也是返回个ContractResult +// try { +// LOGGER.info(str); +// JsonObject obj = JsonParser.parseString(str).getAsJsonObject(); +// String id = obj.get("nodeID").getAsString(); +// if (nodeIDs.contains(id)) { +// LOGGER.info("已经收到节点 " + id.substring(0, 5) + " 的结果,该结果被忽略"); +// return; +// } +// nodeIDs.add(id); +// LOGGER.info( +// "contractID=" +// + contractID +// + " 收到第 " +// + order +// + " 个节点回复 : " +// + str +// + " order=" +// + order +// + " count=" +// + count); +// componedContractResult.add(obj); +// // 收集到所有结果 +// if (order.incrementAndGet() == count) { +// ContractResult finalResult = componedContractResult.mergeFinalResult(); +// +// finalResult.needSeq = true; +// finalResult.seq = request_seq; +// +// // if (null == finalResult) { +// // finalResult = +// // new ContractResult( +// // ContractResult.Status.Exception, +// // new JsonPrimitive( +// // "no nore than half of the +// // consistent result")); +// // originalCallback.onResult(new +// // Gson().toJson(finalResult)); +// // } else { +// if (joinInfo != null) { +// handleJoinInfo(finalResult, joinInfo); +// } +// originalCallback.onResult(JsonUtil.toJson(finalResult)); +// // } +// LOGGER.info( +// "本次执行最终结果为 " + finalResult.size + "个节点合并的,结果为 " + finalResult.result); +// +// // 集群中事务序号+1 +// // MasterServerTCPAction.contractID2Members.get(contractID).nextSeq(); +// CMActions.manager +// .multiContractRecorder +// .getMultiContractMeta(contractID) +// .nextSeqAtMaster(); +// // recover,其中无状态合约CP出错无需恢复 +// Set nodesID = componedContractResult.getProblemNodes(); +// if (null == nodesID || nodesID.isEmpty()) { +// return; +// } +// for (String nodeID : nodesID) { +// LOGGER.info("结果出现问题的节点有:" + nodeID); +// if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID) +// == RecoverFlag.Fine) { +// MasterServerRecoverMechAction.recoverStatus +// .get(nodeID) +// .put(contractID, RecoverFlag.ToRecover); +// } +// } +// for (String nodeID : nodesID) { +// if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID) +// == RecoverFlag.ToRecover) { +// LOGGER.info("问题节点开始恢复:" + nodeID); +// +// // 因为该节点结果有误,所以即时是stableMode也认为trans记录不可信 +// // 直接通过load别的节点来恢复 +// MasterServerRecoverMechAction.restartContractFromCommonMode( +// nodeID, contractID); +// } +// } +// } +// // clearCache(); +// } catch (Exception e) { +// e.printStackTrace(); +// LOGGER.info("本次执行最终结果为有异常"); +// } +// } +// +// private void handleJoinInfo(ContractResult finalResult, JoinInfo joinInfo) { +// JsonObject jo = finalResult.result.getAsJsonObject(); +// if (joinInfo != null && joinInfo.joinRule != null) { +// //TODO 不应该是double 类型 +// switch (joinInfo.joinRule) { +// case "add": +// double val = 0; +// for (String key : jo.keySet()) { +// val += jo.get(key).getAsDouble(); +// } +// finalResult.result = new JsonPrimitive(val); +// break; +// case "multiply": +// val = 1; +// for (String key : jo.keySet()) { +// val *= jo.get(key).getAsDouble(); +// } +// finalResult.result = new JsonPrimitive(val); +// break; +// } +// } +// } +// } +//} diff --git a/src/main/java/org/bdware/server/executor/unconsistency/RequestOnceExecutor.java b/src/main/java/org/bdware/server/executor/unconsistency/RequestOnceExecutor.java index f8a7f95..f1d3470 100644 --- a/src/main/java/org/bdware/server/executor/unconsistency/RequestOnceExecutor.java +++ b/src/main/java/org/bdware/server/executor/unconsistency/RequestOnceExecutor.java @@ -1,68 +1,68 @@ -package org.bdware.server.executor.unconsistency; - -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.bdware.sc.bean.ContractRequest; -import org.bdware.sc.conn.OnHashCallback; -import org.bdware.sc.conn.ResultCallback; -import org.bdware.sc.util.JsonUtil; -import org.bdware.server.ControllerManager; -import org.bdware.server.action.CMActions; -import org.bdware.server.action.p2p.MasterServerTCPAction; -import org.bdware.server.trustedmodel.ContractExecutor; -import org.bdware.server.trustedmodel.AgentManager; -import org.bdware.units.NetworkManager; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -public class RequestOnceExecutor implements ContractExecutor { - private static final Logger LOGGER = LogManager.getLogger(RequestOnceExecutor.class); - String contractID; - AtomicInteger order = new AtomicInteger(0); - - public RequestOnceExecutor(String contractID) { - this.contractID = contractID; - } - - @Override - public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { - ResultCallback cb = - new ResultCallback() { - @Override - public void onResult(String str) { - LOGGER.debug(str); - JsonObject jo = JsonParser.parseString(str).getAsJsonObject(); - JsonObject result = - JsonParser.parseString(jo.get("data").getAsString()) - .getAsJsonObject(); - for (String key : result.keySet()) jo.add(key, result.get(key)); - jo.remove("action"); - jo.addProperty("action", "onExecuteResult"); - LOGGER.debug(jo.toString()); - rc.onResult(jo.toString()); - } - }; - MasterServerTCPAction.sync.sleep(requestID, cb); - String[] members = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers(); - for (int i = 0; i < members.length; i++) { - LOGGER.info("[members]:" + members.length); - int size = members.length; - String nodeID = members[order.incrementAndGet() % size]; - //ADD Connect - Map obj = new HashMap<>(); - obj.put("action", "executeContractLocally"); - obj.put("requestID",requestID); - obj.put("data", req); - obj.put("uniReqID", requestID); - NetworkManager.instance.sendToAgent(nodeID,JsonUtil.toJson(obj)); - return; - } - rc.onResult( - "{\"status\":\"Error\",\"result\":\"all nodes " - + " offline\",\"action\":\"onExecuteContract\"}"); - } -} +//package org.bdware.server.executor.unconsistency; +// +//import com.google.gson.JsonObject; +//import com.google.gson.JsonParser; +//import org.apache.logging.log4j.LogManager; +//import org.apache.logging.log4j.Logger; +//import org.bdware.sc.bean.ContractRequest; +//import org.bdware.sc.conn.OnHashCallback; +//import org.bdware.sc.conn.ResultCallback; +//import org.bdware.sc.util.JsonUtil; +//import org.bdware.server.ControllerManager; +//import org.bdware.server.action.CMActions; +//import org.bdware.server.action.p2p.MasterServerTCPAction; +//import org.bdware.server.trustedmodel.ContractExecutor; +//import org.bdware.server.trustedmodel.AgentManager; +//import org.bdware.units.NetworkManager; +// +//import java.util.HashMap; +//import java.util.Map; +//import java.util.concurrent.atomic.AtomicInteger; +// +//public class RequestOnceExecutor implements ContractExecutor { +// private static final Logger LOGGER = LogManager.getLogger(RequestOnceExecutor.class); +// String contractID; +// AtomicInteger order = new AtomicInteger(0); +// +// public RequestOnceExecutor(String contractID) { +// this.contractID = contractID; +// } +// +// @Override +// public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { +// ResultCallback cb = +// new ResultCallback() { +// @Override +// public void onResult(String str) { +// LOGGER.debug(str); +// JsonObject jo = JsonParser.parseString(str).getAsJsonObject(); +// JsonObject result = +// JsonParser.parseString(jo.get("data").getAsString()) +// .getAsJsonObject(); +// for (String key : result.keySet()) jo.add(key, result.get(key)); +// jo.remove("action"); +// jo.addProperty("action", "onExecuteResult"); +// LOGGER.debug(jo.toString()); +// rc.onResult(jo.toString()); +// } +// }; +// MasterServerTCPAction.sync.sleep(requestID, cb); +// String[] members = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers(); +// for (int i = 0; i < members.length; i++) { +// LOGGER.info("[members]:" + members.length); +// int size = members.length; +// String nodeID = members[order.incrementAndGet() % size]; +// //ADD Connect +// Map obj = new HashMap<>(); +// obj.put("action", "executeContractLocally"); +// obj.put("requestID",requestID); +// obj.put("data", req); +// obj.put("uniReqID", requestID); +// NetworkManager.instance.sendToAgent(nodeID,JsonUtil.toJson(obj)); +// return; +// } +// rc.onResult( +// "{\"status\":\"Error\",\"result\":\"all nodes " +// + " offline\",\"action\":\"onExecuteContract\"}"); +// } +//} diff --git a/src/main/java/org/bdware/server/executor/unconsistency/ResponseOnceExecutor.java b/src/main/java/org/bdware/server/executor/unconsistency/ResponseOnceExecutor.java index 485e708..6921560 100644 --- a/src/main/java/org/bdware/server/executor/unconsistency/ResponseOnceExecutor.java +++ b/src/main/java/org/bdware/server/executor/unconsistency/ResponseOnceExecutor.java @@ -1,83 +1,83 @@ -package org.bdware.server.executor.unconsistency; - -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.bdware.sc.ContractResult; -import org.bdware.sc.bean.ContractRequest; -import org.bdware.sc.conn.OnHashCallback; -import org.bdware.sc.conn.ResultCallback; -import org.bdware.sc.util.JsonUtil; -import org.bdware.server.action.CMActions; -import org.bdware.server.action.p2p.MasterServerTCPAction; -import org.bdware.server.trustedmodel.ContractExecutor; -import org.bdware.units.NetworkManager; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -public class ResponseOnceExecutor implements ContractExecutor { - private static final Logger LOGGER = LogManager.getLogger(ResponseOnceExecutor.class); - private final String contractID; - AtomicInteger order = new AtomicInteger(0); - - public ResponseOnceExecutor(String contractID) { - this.contractID = contractID; - } - - @Override - public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { - executeInternal(requestID, rc, req, 2); - } - - private void executeInternal( - String requestID, ResultCallback rc, ContractRequest req, int count) { - // String contractID = req.getContractID(); - // TODO 标注失效节点,是否选择重新迁移?? - ResultCallback cb = - new ResultCallback() { - @Override - public void onResult(String str) { - LOGGER.debug(str); - JsonObject jo = JsonParser.parseString(str).getAsJsonObject(); - jo.remove("action"); - jo.addProperty("action", "onExecuteResult"); - LOGGER.debug(jo.toString()); - if (jo.has("data")) { - String data = jo.get("data").getAsString(); - ContractResult cr = JsonUtil.fromJson(data, ContractResult.class); - if (cr.status != ContractResult.Status.Success && count > 0) { - executeInternal(requestID, rc, req, count - 1); - } else rc.onResult(jo.toString()); - } else { - JsonObject jo2 = new JsonObject(); - jo2.addProperty("action", "onExecuteResult"); - jo.remove("action"); - jo2.addProperty("data", jo.toString()); - rc.onResult(jo2.toString()); - } - } - }; - MasterServerTCPAction.sync.sleepWithTimeout(requestID, cb, 5); - if (!sendOnce(requestID, req)) - rc.onResult( - "{\"status\":\"Error\",\"data\":\"{\\\"status\\\":\\\"Error\\\",\\\"result\\\":\\\"all nodes offline\\\"}\",\"action\":\"onExecuteContract\"}"); - } - - private boolean sendOnce(String requestID, ContractRequest req) { - String[] members = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers(); - for (int i = 0; i < members.length; i++) { - int size = members.length; - String nodeID = members[order.incrementAndGet() % size]; - Map obj = new HashMap<>(); - obj.put("action", "executeContractLocally"); - obj.put("data", req); - obj.put("uniReqID", requestID); - NetworkManager.instance.sendToAgent(nodeID, JsonUtil.toJson(obj)); - return true; - } - return false; - } -} +//package org.bdware.server.executor.unconsistency; +// +//import com.google.gson.JsonObject; +//import com.google.gson.JsonParser; +//import org.apache.logging.log4j.LogManager; +//import org.apache.logging.log4j.Logger; +//import org.bdware.sc.ContractResult; +//import org.bdware.sc.bean.ContractRequest; +//import org.bdware.sc.conn.OnHashCallback; +//import org.bdware.sc.conn.ResultCallback; +//import org.bdware.sc.util.JsonUtil; +//import org.bdware.server.action.CMActions; +//import org.bdware.server.action.p2p.MasterServerTCPAction; +//import org.bdware.server.trustedmodel.ContractExecutor; +//import org.bdware.units.NetworkManager; +// +//import java.util.HashMap; +//import java.util.Map; +//import java.util.concurrent.atomic.AtomicInteger; +// +//public class ResponseOnceExecutor implements ContractExecutor { +// private static final Logger LOGGER = LogManager.getLogger(ResponseOnceExecutor.class); +// private final String contractID; +// AtomicInteger order = new AtomicInteger(0); +// +// public ResponseOnceExecutor(String contractID) { +// this.contractID = contractID; +// } +// +// @Override +// public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { +// executeInternal(requestID, rc, req, 2); +// } +// +// private void executeInternal( +// String requestID, ResultCallback rc, ContractRequest req, int count) { +// // String contractID = req.getContractID(); +// // TODO 标注失效节点,是否选择重新迁移?? +// ResultCallback cb = +// new ResultCallback() { +// @Override +// public void onResult(String str) { +// LOGGER.debug(str); +// JsonObject jo = JsonParser.parseString(str).getAsJsonObject(); +// jo.remove("action"); +// jo.addProperty("action", "onExecuteResult"); +// LOGGER.debug(jo.toString()); +// if (jo.has("data")) { +// String data = jo.get("data").getAsString(); +// ContractResult cr = JsonUtil.fromJson(data, ContractResult.class); +// if (cr.status != ContractResult.Status.Success && count > 0) { +// executeInternal(requestID, rc, req, count - 1); +// } else rc.onResult(jo.toString()); +// } else { +// JsonObject jo2 = new JsonObject(); +// jo2.addProperty("action", "onExecuteResult"); +// jo.remove("action"); +// jo2.addProperty("data", jo.toString()); +// rc.onResult(jo2.toString()); +// } +// } +// }; +// MasterServerTCPAction.sync.sleepWithTimeout(requestID, cb, 5); +// if (!sendOnce(requestID, req)) +// rc.onResult( +// "{\"status\":\"Error\",\"data\":\"{\\\"status\\\":\\\"Error\\\",\\\"result\\\":\\\"all nodes offline\\\"}\",\"action\":\"onExecuteContract\"}"); +// } +// +// private boolean sendOnce(String requestID, ContractRequest req) { +// String[] members = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers(); +// for (int i = 0; i < members.length; i++) { +// int size = members.length; +// String nodeID = members[order.incrementAndGet() % size]; +// Map obj = new HashMap<>(); +// obj.put("action", "executeContractLocally"); +// obj.put("data", req); +// obj.put("uniReqID", requestID); +// NetworkManager.instance.sendToAgent(nodeID, JsonUtil.toJson(obj)); +// return true; +// } +// return false; +// } +//} diff --git a/src/main/java/org/bdware/server/trustedmodel/MultiReqSeq.java b/src/main/java/org/bdware/server/trustedmodel/MultiReqSeq.java deleted file mode 100644 index 9a08e59..0000000 --- a/src/main/java/org/bdware/server/trustedmodel/MultiReqSeq.java +++ /dev/null @@ -1,11 +0,0 @@ -package org.bdware.server.trustedmodel; - -public class MultiReqSeq { - public final int seq; - public final long startTime; - - public MultiReqSeq(int s){ - seq = s; - startTime = System.currentTimeMillis(); - } -} diff --git a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java index c3770fc..ceb86ac 100644 --- a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java +++ b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java @@ -1,305 +1,305 @@ -package org.bdware.server.trustedmodel; - -import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.bdware.sc.ContractClient; -import org.bdware.sc.ContractManager; -import org.bdware.sc.ContractResult; -import org.bdware.sc.bean.ContractRequest; -import org.bdware.sc.bean.FunctionDesp; -import org.bdware.sc.bean.SM2Verifiable; -import org.bdware.sc.conn.OnHashCallback; -import org.bdware.sc.conn.ResultCallback; -import org.bdware.sc.units.MultiContractMeta; -import org.bdware.sc.units.RecoverFlag; -import org.bdware.sc.util.HashUtil; -import org.bdware.sc.util.JsonUtil; -import org.bdware.server.action.CMActions; -import org.bdware.server.action.p2p.MasterServerRecoverMechAction; -import org.bdware.units.NetworkManager; - -import java.util.*; -import java.util.concurrent.*; -import java.util.stream.Collectors; - -/** - * @author Kaidong Wu - */ -public class SelfAdaptiveShardingExecutor implements ContractExecutor { - private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class); - private static final int SUBMIT_LIMIT = 1024; - private final Queue reqQueue = new ConcurrentLinkedQueue<>(); - private final MultiContractMeta meta; - private final Map toExecuted = new ConcurrentHashMap<>(); - private final Set executedBlocks = ConcurrentHashMap.newKeySet(); - private final Map executedTxs = new ConcurrentHashMap<>(); - private final Object flag = new Object(); - private final ScheduledFuture future; - private boolean running = true; - private Block b = new Block(); - - public SelfAdaptiveShardingExecutor(String contractID) { - this.meta = - CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); - this.future = ContractManager.scheduledThreadPool.scheduleWithFixedDelay( - this::submitBlock, - 2, - 2, - TimeUnit.SECONDS); - LOGGER.debug(String.format("ContractManager.threadPool=%d/%d", - ((ThreadPoolExecutor) ContractManager.threadPool).getActiveCount(), - ((ThreadPoolExecutor) ContractManager.threadPool).getPoolSize())); - ContractManager.threadPool.execute(() -> { - LOGGER.info( - "[SelfAdaptiveShardingExecutor " + meta.getContractID() + "] starting service..." + running); - while (running) { - LOGGER.info("checking blocks to be executed, latest block=" + - this.b.prevHash + ", to be executed size=" + toExecuted.size()); - LOGGER.debug("executed: " + JsonUtil.toJson(executedBlocks) + "\n\t" + JsonUtil.toJson(executedTxs)); - while (!toExecuted.isEmpty()) { - String key = this.b.prevHash; - Block block = toExecuted.get(key); - if (null != block) { - executeBlock(block); - } - toExecuted.remove(key); - } - synchronized (flag) { - try { - flag.wait(); - } catch (InterruptedException e) { - LOGGER.warn(String.format( - "[SelfAdaptiveShardingExecutor %s] waiting is interrupted: %s", - meta.getContractID(), - e.getMessage())); - } - } - } - }); - } - - @Override - public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) { - // check client - ContractClient client = CMActions.manager.getClient(meta.getContractID()); - if (null == client) { - LOGGER.error("contract " + meta.getContractID() + " not found!"); - rcb.onResult(JsonUtil.toJson(new ContractResult( - ContractResult.Status.Error, - new JsonPrimitive("contract " + meta.getContractID() + " not found!")))); - return; - } - // check function - FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction()); - if (null == funDesp) { - LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!"); - rcb.onResult(JsonUtil.toJson(new ContractResult( - ContractResult.Status.Error, - new JsonPrimitive( - String.format("action %s of contract %s not found!", - req.getAction(), - meta.getContractID()))))); - return; - } - // for view function, execute it - if (funDesp.isView) { - CMActions.manager.executeLocallyAsync(req, rcb, hcb); - return; - } - // normal function, check if it is in blocks - if (executedTxs.containsKey(requestID)) { - rcb.onResult(JsonUtil.toJson(new ContractResult( - ContractResult.Status.Error, - new JsonPrimitive("this request has been packed!")))); - return; - } - // add blocks into request cache - LOGGER.debug("receive contract request " + requestID); - executedTxs.put(requestID, false); - reqQueue.add(req); - rcb.onResult(JsonUtil.toJson(new ContractResult( - ContractResult.Status.Executing, - new JsonPrimitive("this request is adding into blocks")))); - // if cache is full, submit - if (reqQueue.size() >= SUBMIT_LIMIT) { - ContractManager.threadPool.execute(this::submitBlock); - } - } - - @Override - public void close() { - // stop threads - this.future.cancel(true); - this.running = false; - LOGGER.info("destruct executor of contract " + meta.getContractID()); - } - - public void execute(String blockStr) { - Block block = JsonUtil.fromJson(blockStr, Block.class); - // the block must have not been cached or executed, and must be valid - if (!toExecuted.containsKey(block.prevHash) && - !executedBlocks.contains(block.hash) && - block.isValid()) { - // add block into block cache - LOGGER.info(String.format( - "[SelfAdaptiveShardingExecutor %s] receive block %s -> %s," + - " %d transactions, timestamp=%d, size=%d", - meta.getContractID(), - block.hash, - block.prevHash, - block.requests.length, - block.timestamp, - blockStr.length())); - toExecuted.put(block.prevHash, block); - // notify thread to execute blocks - synchronized (flag) { - flag.notify(); - } - } - } - - private synchronized void executeBlock(Block block) { - // used for the thread to execute blocks - LOGGER.debug("start"); - // check contract requests, requests must have not been executed - for (ContractRequest request : block.requests) { - if (executedTxs.containsKey(request.getRequestID()) && executedTxs.get(request.getRequestID())) { - LOGGER.debug("find request " + request.getRequestID() + " has been executed!"); - return; - } - } - // TODO check status - // executed requests - for (ContractRequest request : block.requests) { - String ret = CMActions.manager.executeLocally(request, null); - LOGGER.debug(String.format( - "[SelfAdaptiveShardingExecutor %s] result of request %s: %s", - meta.getContractID(), - request.getRequestID(), - ret)); - executedTxs.put(request.getRequestID(), true); - } - LOGGER.info(String.format( - "[SelfAdaptiveShardingExecutor %s] execute %d transactions of block %s", - meta.getContractID(), - block.requests.length, - block.hash)); - // TODO create check point - this.b = new Block(block.hash, this.b.height + 1); - executedBlocks.add(block.hash); - } - - private void submitBlock() { - Block block = fillBlock(); - if (null != block) { - LOGGER.info("deliver block " + block.hash + "..."); - LOGGER.debug(JsonUtil.toPrettyJson(block)); - String[] nodes = this.meta.getMembers(); - JsonObject req = new JsonObject(); - req.addProperty("action", "deliverBlock"); - req.addProperty("data", JsonUtil.toJson(block)); - req.addProperty("contractID", this.meta.getContractID()); - String reqStr = req.toString(); - // deliver blocks - for (String node : nodes) { - if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) - == RecoverFlag.Fine) { - NetworkManager.instance.sendToAgent(node, reqStr); - } - } - } - } - - private synchronized Block fillBlock() { - // pack contract requests into a block - ContractRequest[] requests = new ContractRequest[Math.min(reqQueue.size(), SUBMIT_LIMIT)]; - if (requests.length == 0) { - return null; - } - for (int i = 0; i < requests.length; ++i) { - requests[i] = reqQueue.poll(); - } - this.b.fillBlock(requests); - return this.b; - } - - static class Block extends SM2Verifiable { - String prevHash = "0"; - String hash; - int height; - String checkPoint; - String body; - String nodePubKey; - ContractRequest[] requests; - long timestamp; - - public Block() { - this.height = 0; - } - - public Block(String prev, int height) { - this.prevHash = prev; - this.height = height; - } - - public void fillBlock(ContractRequest[] requests) { - this.requests = requests; - this.timestamp = System.currentTimeMillis(); - this.body = merkle(requests); - this.hash = computeHash(); - doSignature(CMActions.manager.nodeCenterConn.getNodeKeyPair()); - } - - public boolean isValid() { - return computeHash().equals(hash) && body.equals(merkle(this.requests)) && verifySignature(); - } - - private String computeHash() { - return HashUtil.sha3( - String.valueOf(this.height), - this.prevHash, - this.checkPoint, - this.body); - } - - private String merkle(ContractRequest[] requests) { - // manage requests as a merkle tree - if (requests.length == 0) { - return null; - } - if (requests.length == 1) { - return HashUtil.sha3(requests[0].getRequestID()); - } - Queue reqQueue = - Arrays.stream(requests).map(ContractRequest::getRequestID) - .collect(Collectors.toCollection(ArrayDeque::new)); - do { - int size; - for (size = reqQueue.size(); size > 1; size -= 2) { - reqQueue.add(HashUtil.sha3(reqQueue.poll(), reqQueue.poll())); - } - if (size == 1) { - reqQueue.add(reqQueue.poll()); - } - } while (1 != reqQueue.size()); - return reqQueue.poll(); - } - - @Override - public String getPublicKey() { - return nodePubKey; - } - - @Override - public void setPublicKey(String pubkey) { - this.nodePubKey = pubkey; - } - - @Override - public String getContentStr() { - return this.hash; - } - } -} +//package org.bdware.server.trustedmodel; +// +//import com.google.gson.JsonObject; +//import com.google.gson.JsonPrimitive; +//import org.apache.logging.log4j.LogManager; +//import org.apache.logging.log4j.Logger; +//import org.bdware.sc.ContractClient; +//import org.bdware.sc.ContractManager; +//import org.bdware.sc.ContractResult; +//import org.bdware.sc.bean.ContractRequest; +//import org.bdware.sc.bean.FunctionDesp; +//import org.bdware.sc.bean.SM2Verifiable; +//import org.bdware.sc.conn.OnHashCallback; +//import org.bdware.sc.conn.ResultCallback; +//import org.bdware.sc.units.MultiContractMeta; +//import org.bdware.sc.units.RecoverFlag; +//import org.bdware.sc.util.HashUtil; +//import org.bdware.sc.util.JsonUtil; +//import org.bdware.server.action.CMActions; +//import org.bdware.server.action.p2p.MasterServerRecoverMechAction; +//import org.bdware.units.NetworkManager; +// +//import java.util.*; +//import java.util.concurrent.*; +//import java.util.stream.Collectors; +// +///** +// * @author Kaidong Wu +// */ +//public class SelfAdaptiveShardingExecutor implements ContractExecutor { +// private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class); +// private static final int SUBMIT_LIMIT = 1024; +// private final Queue reqQueue = new ConcurrentLinkedQueue<>(); +// private final MultiContractMeta meta; +// private final Map toExecuted = new ConcurrentHashMap<>(); +// private final Set executedBlocks = ConcurrentHashMap.newKeySet(); +// private final Map executedTxs = new ConcurrentHashMap<>(); +// private final Object flag = new Object(); +// private final ScheduledFuture future; +// private boolean running = true; +// private Block b = new Block(); +// +// public SelfAdaptiveShardingExecutor(String contractID) { +// this.meta = +// CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); +// this.future = ContractManager.scheduledThreadPool.scheduleWithFixedDelay( +// this::submitBlock, +// 2, +// 2, +// TimeUnit.SECONDS); +// LOGGER.debug(String.format("ContractManager.threadPool=%d/%d", +// ((ThreadPoolExecutor) ContractManager.threadPool).getActiveCount(), +// ((ThreadPoolExecutor) ContractManager.threadPool).getPoolSize())); +// ContractManager.threadPool.execute(() -> { +// LOGGER.info( +// "[SelfAdaptiveShardingExecutor " + meta.getContractID() + "] starting service..." + running); +// while (running) { +// LOGGER.info("checking blocks to be executed, latest block=" + +// this.b.prevHash + ", to be executed size=" + toExecuted.size()); +// LOGGER.debug("executed: " + JsonUtil.toJson(executedBlocks) + "\n\t" + JsonUtil.toJson(executedTxs)); +// while (!toExecuted.isEmpty()) { +// String key = this.b.prevHash; +// Block block = toExecuted.get(key); +// if (null != block) { +// executeBlock(block); +// } +// toExecuted.remove(key); +// } +// synchronized (flag) { +// try { +// flag.wait(); +// } catch (InterruptedException e) { +// LOGGER.warn(String.format( +// "[SelfAdaptiveShardingExecutor %s] waiting is interrupted: %s", +// meta.getContractID(), +// e.getMessage())); +// } +// } +// } +// }); +// } +// +// @Override +// public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) { +// // check client +// ContractClient client = CMActions.manager.getClient(meta.getContractID()); +// if (null == client) { +// LOGGER.error("contract " + meta.getContractID() + " not found!"); +// rcb.onResult(JsonUtil.toJson(new ContractResult( +// ContractResult.Status.Error, +// new JsonPrimitive("contract " + meta.getContractID() + " not found!")))); +// return; +// } +// // check function +// FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction()); +// if (null == funDesp) { +// LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!"); +// rcb.onResult(JsonUtil.toJson(new ContractResult( +// ContractResult.Status.Error, +// new JsonPrimitive( +// String.format("action %s of contract %s not found!", +// req.getAction(), +// meta.getContractID()))))); +// return; +// } +// // for view function, execute it +// if (funDesp.isView) { +// CMActions.manager.executeLocallyAsync(req, rcb, hcb); +// return; +// } +// // normal function, check if it is in blocks +// if (executedTxs.containsKey(requestID)) { +// rcb.onResult(JsonUtil.toJson(new ContractResult( +// ContractResult.Status.Error, +// new JsonPrimitive("this request has been packed!")))); +// return; +// } +// // add blocks into request cache +// LOGGER.debug("receive contract request " + requestID); +// executedTxs.put(requestID, false); +// reqQueue.add(req); +// rcb.onResult(JsonUtil.toJson(new ContractResult( +// ContractResult.Status.Executing, +// new JsonPrimitive("this request is adding into blocks")))); +// // if cache is full, submit +// if (reqQueue.size() >= SUBMIT_LIMIT) { +// ContractManager.threadPool.execute(this::submitBlock); +// } +// } +// +// @Override +// public void close() { +// // stop threads +// this.future.cancel(true); +// this.running = false; +// LOGGER.info("destruct executor of contract " + meta.getContractID()); +// } +// +// public void execute(String blockStr) { +// Block block = JsonUtil.fromJson(blockStr, Block.class); +// // the block must have not been cached or executed, and must be valid +// if (!toExecuted.containsKey(block.prevHash) && +// !executedBlocks.contains(block.hash) && +// block.isValid()) { +// // add block into block cache +// LOGGER.info(String.format( +// "[SelfAdaptiveShardingExecutor %s] receive block %s -> %s," + +// " %d transactions, timestamp=%d, size=%d", +// meta.getContractID(), +// block.hash, +// block.prevHash, +// block.requests.length, +// block.timestamp, +// blockStr.length())); +// toExecuted.put(block.prevHash, block); +// // notify thread to execute blocks +// synchronized (flag) { +// flag.notify(); +// } +// } +// } +// +// private synchronized void executeBlock(Block block) { +// // used for the thread to execute blocks +// LOGGER.debug("start"); +// // check contract requests, requests must have not been executed +// for (ContractRequest request : block.requests) { +// if (executedTxs.containsKey(request.getRequestID()) && executedTxs.get(request.getRequestID())) { +// LOGGER.debug("find request " + request.getRequestID() + " has been executed!"); +// return; +// } +// } +// // TODO check status +// // executed requests +// for (ContractRequest request : block.requests) { +// String ret = CMActions.manager.executeLocally(request, null); +// LOGGER.debug(String.format( +// "[SelfAdaptiveShardingExecutor %s] result of request %s: %s", +// meta.getContractID(), +// request.getRequestID(), +// ret)); +// executedTxs.put(request.getRequestID(), true); +// } +// LOGGER.info(String.format( +// "[SelfAdaptiveShardingExecutor %s] execute %d transactions of block %s", +// meta.getContractID(), +// block.requests.length, +// block.hash)); +// // TODO create check point +// this.b = new Block(block.hash, this.b.height + 1); +// executedBlocks.add(block.hash); +// } +// +// private void submitBlock() { +// Block block = fillBlock(); +// if (null != block) { +// LOGGER.info("deliver block " + block.hash + "..."); +// LOGGER.debug(JsonUtil.toPrettyJson(block)); +// String[] nodes = this.meta.getMembers(); +// JsonObject req = new JsonObject(); +// req.addProperty("action", "deliverBlock"); +// req.addProperty("data", JsonUtil.toJson(block)); +// req.addProperty("contractID", this.meta.getContractID()); +// String reqStr = req.toString(); +// // deliver blocks +// for (String node : nodes) { +// if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) +// == RecoverFlag.Fine) { +// NetworkManager.instance.sendToAgent(node, reqStr); +// } +// } +// } +// } +// +// private synchronized Block fillBlock() { +// // pack contract requests into a block +// ContractRequest[] requests = new ContractRequest[Math.min(reqQueue.size(), SUBMIT_LIMIT)]; +// if (requests.length == 0) { +// return null; +// } +// for (int i = 0; i < requests.length; ++i) { +// requests[i] = reqQueue.poll(); +// } +// this.b.fillBlock(requests); +// return this.b; +// } +// +// static class Block extends SM2Verifiable { +// String prevHash = "0"; +// String hash; +// int height; +// String checkPoint; +// String body; +// String nodePubKey; +// ContractRequest[] requests; +// long timestamp; +// +// public Block() { +// this.height = 0; +// } +// +// public Block(String prev, int height) { +// this.prevHash = prev; +// this.height = height; +// } +// +// public void fillBlock(ContractRequest[] requests) { +// this.requests = requests; +// this.timestamp = System.currentTimeMillis(); +// this.body = merkle(requests); +// this.hash = computeHash(); +// doSignature(CMActions.manager.nodeCenterConn.getNodeKeyPair()); +// } +// +// public boolean isValid() { +// return computeHash().equals(hash) && body.equals(merkle(this.requests)) && verifySignature(); +// } +// +// private String computeHash() { +// return HashUtil.sha3( +// String.valueOf(this.height), +// this.prevHash, +// this.checkPoint, +// this.body); +// } +// +// private String merkle(ContractRequest[] requests) { +// // manage requests as a merkle tree +// if (requests.length == 0) { +// return null; +// } +// if (requests.length == 1) { +// return HashUtil.sha3(requests[0].getRequestID()); +// } +// Queue reqQueue = +// Arrays.stream(requests).map(ContractRequest::getRequestID) +// .collect(Collectors.toCollection(ArrayDeque::new)); +// do { +// int size; +// for (size = reqQueue.size(); size > 1; size -= 2) { +// reqQueue.add(HashUtil.sha3(reqQueue.poll(), reqQueue.poll())); +// } +// if (size == 1) { +// reqQueue.add(reqQueue.poll()); +// } +// } while (1 != reqQueue.size()); +// return reqQueue.poll(); +// } +// +// @Override +// public String getPublicKey() { +// return nodePubKey; +// } +// +// @Override +// public void setPublicKey(String pubkey) { +// this.nodePubKey = pubkey; +// } +// +// @Override +// public String getContentStr() { +// return this.hash; +// } +// } +//} diff --git a/src/test/java/org/bdware/server/DOIPTest.java b/src/test/java/org/bdware/server/DOIPTest.java index 79ca8ee..e2b079a 100644 --- a/src/test/java/org/bdware/server/DOIPTest.java +++ b/src/test/java/org/bdware/server/DOIPTest.java @@ -1,63 +1,63 @@ -package org.bdware.server; - -import org.bdware.doip.core.doipMessage.DoipMessage; -import org.bdware.doip.core.doipMessage.DoipMessageFactory; -import org.bdware.doip.core.model.operations.BasicOperations; -import org.bdware.doip.endpoint.doipClient.DoipClientImpl; -import org.bdware.doip.endpoint.doipClient.DoipMessageCallback; -import org.junit.Before; -import org.junit.Test; - -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; - -public class DOIPTest { - public static String repoID = "86.5000.470/doip.localContractRepo"; - - - @Before - public void init() { - } - - @Test - public void registryTempOD() { - - } - - @Test - public void retrieve() { - try { - DoipClientImpl doipClient = new DoipClientImpl(); - doipClient.connect("tcp://127.0.0.1:18002"); - //86.5000.470/do.Yie0yPsjt4_bdw - List ret = new ArrayList<>(); - DoipMessage msg = (new DoipMessageFactory.DoipMessageBuilder()).createRequest("86.5000.470/Counter", BasicOperations.Retrieve.getName()) - .setBody("{\"operation\":\"count\",\"arg\":\"\"}".getBytes(StandardCharsets.UTF_8)).create(); - doipClient.sendMessage(msg, new DoipMessageCallback() { - @Override - public void onResult(DoipMessage doipMessage) { - ret.add(doipMessage); - synchronized (ret) { - ret.notify(); - } - } - }); - synchronized (ret) { - ret.wait(5000); - } - DoipMessage result = null; - if (ret.size() > 0) - result = ret.get(0); - System.out.println(result.body.getDataAsJsonString()); - // GlobalIrpClient.getGlobalClient().reRegister(doHandleRecord); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Test - public void call() { - - } -} +//package org.bdware.server; +// +//import org.bdware.doip.core.doipMessage.DoipMessage; +//import org.bdware.doip.core.doipMessage.DoipMessageFactory; +//import org.bdware.doip.core.model.operations.BasicOperations; +//import org.bdware.doip.endpoint.doipClient.DoipClientImpl; +//import org.bdware.doip.endpoint.doipClient.DoipMessageCallback; +//import org.junit.Before; +//import org.junit.Test; +// +//import java.nio.charset.StandardCharsets; +//import java.util.ArrayList; +//import java.util.List; +// +//public class DOIPTest { +// public static String repoID = "86.5000.470/doip.localContractRepo"; +// +// +// @Before +// public void init() { +// } +// +// @Test +// public void registryTempOD() { +// +// } +// +// @Test +// public void retrieve() { +// try { +// DoipClientImpl doipClient = new DoipClientImpl(); +// doipClient.connect("tcp://127.0.0.1:18002"); +// //86.5000.470/do.Yie0yPsjt4_bdw +// List ret = new ArrayList<>(); +// DoipMessage msg = (new DoipMessageFactory.DoipMessageBuilder()).createRequest("86.5000.470/Counter", BasicOperations.Retrieve.getName()) +// .setBody("{\"operation\":\"count\",\"arg\":\"\"}".getBytes(StandardCharsets.UTF_8)).create(); +// doipClient.sendMessage(msg, new DoipMessageCallback() { +// @Override +// public void onResult(DoipMessage doipMessage) { +// ret.add(doipMessage); +// synchronized (ret) { +// ret.notify(); +// } +// } +// }); +// synchronized (ret) { +// ret.wait(5000); +// } +// DoipMessage result = null; +// if (ret.size() > 0) +// result = ret.get(0); +// System.out.println(result.body.getDataAsJsonString()); +// // GlobalIrpClient.getGlobalClient().reRegister(doHandleRecord); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } +// +// @Test +// public void call() { +// +// } +//}