From 5f283513e23f5d41f794ed4189139c13149078d2 Mon Sep 17 00:00:00 2001 From: CaiHQ Date: Sun, 21 Nov 2021 23:20:44 +0800 Subject: [PATCH] merge dengshuang-feature refactor: NetworkManager --- .../org/bdware/server/ControllerManager.java | 28 ++ .../org/bdware/server/action/CMActions.java | 81 ++-- .../server/action/CheckPointCallback.java | 16 +- .../org/bdware/server/action/FileActions.java | 4 +- .../bdware/server/action/ManagerActions.java | 7 +- .../bdware/server/action/MasterWSAction.java | 78 ++-- .../action/p2p/AliveCheckClientAction.java | 164 +++++++++ .../action/p2p/AliveCheckServerAction.java | 110 ++++++ .../p2p/MasterClientRecoverMechAction.java | 59 ++- .../action/p2p/MasterClientTCPAction.java | 347 ++++-------------- .../p2p/MasterClientTransferAction.java | 54 ++- .../p2p/MasterServerRecoverMechAction.java | 103 ++---- .../action/p2p/MasterServerTCPAction.java | 201 ++-------- .../p2p/MasterServerTransferAction.java | 12 +- .../server/action/p2p/RecoveryAction.java | 8 - .../server/action/p2p/UnitsInfoAction.java | 4 +- ...tion.java => _UNUSED_ExecutionAction.java} | 6 +- .../server/executor/RequestAllExecutor.java | 23 +- .../MultiPointCooperationExecutor.java | 16 +- .../unconsistency/RequestOnceExecutor.java | 23 +- .../unconsistency/ResponseOnceExecutor.java | 17 +- .../_UNUSED_RouteEnabledExecutor.java | 17 +- .../client/NodeCenterClientController.java | 113 +++--- .../client/NodeCenterClientHandler.java | 7 +- .../server/tcp/TCPClientFrameHandler.java | 57 ++- .../server/tcp/TCPServerFrameHandler.java | 30 +- .../server/trustedmodel/AgentManager.java | 79 ++++ .../server/trustedmodel/MasterProxy.java | 200 ---------- .../SelfAdaptiveShardingExecutor.java | 19 +- .../bdware/server/trustedmodel/SlaveNode.java | 33 -- .../java/org/bdware/units/NetworkManager.java | 279 +++++++++----- .../units/function/CommunicationManager.java | 4 +- .../units/function/ExecutionManager.java | 8 +- 33 files changed, 984 insertions(+), 1223 deletions(-) create mode 100644 src/main/java/org/bdware/server/ControllerManager.java create mode 100644 src/main/java/org/bdware/server/action/p2p/AliveCheckClientAction.java create mode 100644 src/main/java/org/bdware/server/action/p2p/AliveCheckServerAction.java delete mode 100644 src/main/java/org/bdware/server/action/p2p/RecoveryAction.java rename src/main/java/org/bdware/server/action/p2p/{ExecutionAction.java => _UNUSED_ExecutionAction.java} (98%) create mode 100644 src/main/java/org/bdware/server/trustedmodel/AgentManager.java delete mode 100644 src/main/java/org/bdware/server/trustedmodel/MasterProxy.java diff --git a/src/main/java/org/bdware/server/ControllerManager.java b/src/main/java/org/bdware/server/ControllerManager.java new file mode 100644 index 0000000..9bc96e6 --- /dev/null +++ b/src/main/java/org/bdware/server/ControllerManager.java @@ -0,0 +1,28 @@ +package org.bdware.server; + +import org.bdware.server.action.CMActions; +import org.bdware.server.nodecenter.client.NodeCenterClientController; +import org.bdware.server.nodecenter.client.NodeCenterClientHandler; + +public class ControllerManager { + + private static NodeCenterClientController nodeCenterClientController; + private static NodeCenterClientHandler nodeCenterClientHandler; + + public static NodeCenterClientHandler createNodeCenterClientHandler() { + assert nodeCenterClientHandler == null; + nodeCenterClientHandler = new NodeCenterClientHandler(); + nodeCenterClientController = nodeCenterClientHandler.getController(); + CMActions.manager.nodeCenterConn = getNodeCenterController(); + return nodeCenterClientHandler; + } + + public static NodeCenterClientController getNodeCenterController() { + return nodeCenterClientController; + + } + + public static NodeCenterClientHandler getNodeCenterHandler() { + return nodeCenterClientHandler; + } +} diff --git a/src/main/java/org/bdware/server/action/CMActions.java b/src/main/java/org/bdware/server/action/CMActions.java index 66159bd..b5d8f66 100644 --- a/src/main/java/org/bdware/server/action/CMActions.java +++ b/src/main/java/org/bdware/server/action/CMActions.java @@ -11,11 +11,14 @@ import org.bdware.sc.conn.ByteUtil; import org.bdware.sc.conn.OnHashCallback; import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.node.AnnotationNode; +import org.bdware.sc.units.MultiContractMeta; import org.bdware.sc.util.JsonUtil; +import org.bdware.server.ControllerManager; import org.bdware.server.GRPCPool; import org.bdware.server.GlobalConf; import org.bdware.server.action.p2p.MasterClientTCPAction; -import org.bdware.server.trustedmodel.MasterProxy; +import org.bdware.server.trustedmodel.AgentManager; +import org.bdware.server.trustedmodel.KillUnitContractInfo; import org.bdware.server.ws.ContractManagerFrameHandler; import org.bdware.units.NetworkManager; import org.bdware.units.function.CommunicationManager; @@ -34,9 +37,9 @@ import java.util.*; public class CMActions implements OnHashCallback { private static final String PARAM_ACTION = "action"; - private static final JsonObject MISSING_ARGUMENT = JsonUtil.parseString("{\"action\":\"onExecuteResult\",\"executeTime\":-1," + private static final JsonObject MISSING_ARGUMENT = JsonUtil.parseStringAsJsonObject("{\"action\":\"onExecuteResult\",\"executeTime\":-1," + "\"status\":\"Error\",\"result\":\"missing arguments\"}"); - private static final JsonObject INVALID_DOI = JsonUtil.parseString( + private static final JsonObject INVALID_DOI = JsonUtil.parseStringAsJsonObject( "{\"action\":\"onExecuteResult\",\"executeTime\":-1," + "\"status\":\"Error\",\"result\":\"invalid contract doi\"}"); private static final Logger LOGGER = LogManager.getLogger(CMActions.class); @@ -58,7 +61,7 @@ public class CMActions implements OnHashCallback { ContractManager.yjsPath = GlobalConf.instance.yjsPath; ContractManager.dbPath = GlobalConf.getDBPath(); final ContractManager contractManager = new ContractManager(); - contractManager.masterStub = new MasterProxy(); + contractManager.masterStub = new AgentManager(); contractManager.chainOpener = GRPCPool.instance; GRPCPool.logsDB = ContractManager.logsDB; // expiredDate = Long.parseLong(GlobalConf.instance.licence); @@ -72,11 +75,11 @@ public class CMActions implements OnHashCallback { for (ContractMeta meta : contractManager.statusRecorder.getStatus().values()) { if (meta.getStatus() == ContractStatusEnum.RUNNING) { contractIDS.add(meta.getID()); + if (meta.contract.getType().needSeq()) + contractManager.setContractIsMaster(meta.getID(), "false"); } } - for (String id : contractIDS) { - contractManager.setContractIsMaster(id, "false"); - } + LOGGER.info( "reconnectDone! " + (System.currentTimeMillis() - start) @@ -776,7 +779,7 @@ public class CMActions implements OnHashCallback { LOGGER.debug("[CMActions]启动后设置dump周期" + args.get("dumpPeriod").getAsString()); manager.changeDumpPeriod(c.getID(), args.get("dumpPeriod").getAsString()); } - ExecutionManager.instance.updateLocalContractToNodeCenter(); + ControllerManager.getNodeCenterController().updateContract(); } @Action(userPermission = 1L << 26, async = true) @@ -1158,17 +1161,17 @@ public class CMActions implements OnHashCallback { public void connectTo(JsonObject args, ResultCallback resultCallback) { String data; if (!args.has("id")) { - ReplyUtil.simpleReply(resultCallback,"onConnectTo","missing contract id"); + ReplyUtil.simpleReply(resultCallback, "onConnectTo", "missing contract id"); return; } String contractID = args.get("id").getAsString(); LOGGER.info("connectTo:" + contractID); if (contractID == null) { - ReplyUtil.simpleReply(resultCallback,"onConnectTo","can't find contract id"); + ReplyUtil.simpleReply(resultCallback, "onConnectTo", "can't find contract id"); return; } manager.redirect(contractID, createPS(), ""); - ReplyUtil.simpleReply(resultCallback,"onConnectTo","success"); + ReplyUtil.simpleReply(resultCallback, "onConnectTo", "success"); } private PrintStream createPS() { @@ -1187,10 +1190,6 @@ public class CMActions implements OnHashCallback { if (args.has("verifiedPubKey") && (args.has("id") || args.has("name"))) { ContractRequest rc = new ContractRequest(); long s = System.currentTimeMillis(); - // if (args.has("id")) - // rc.setContractID(args.get("id").getAsString()); - // else - //System.out.println(args); if (args.has("id")) { // stop unit contract using contract name rc.setContractID(args.get("id").getAsString()); @@ -1220,10 +1219,10 @@ public class CMActions implements OnHashCallback { return; } - // TODO TOMERGE if (cl.isUnit()) { LOGGER.debug("killContractProcess : 集群合约 kill"); - MasterClientTCPAction.killContract(s, cl, args, resultCallback); + + killContractByMaster(rc.getContractID(), args, resultCallback); } else { String ret = manager.stopContractWithOwner( @@ -1243,7 +1242,29 @@ public class CMActions implements OnHashCallback { ExecutionManager.instance.updateLocalContractToNodeCenter(); } } else { - ReplyUtil.simpleReply(resultCallback,"onKillContractProcess","Failed: Illegal parameters"); + ReplyUtil.simpleReply(resultCallback, "onKillContractProcess", "Failed: Illegal parameters"); + } + } + + public static void killContractByMaster( + String contractID, JsonObject request, ResultCallback rc) { + LOGGER.info("[MasterClientTCPAction] killContract : "); + try { + MasterClientTCPAction.killUnitContractMap.put( + request.get("requestID").getAsString(), + new KillUnitContractInfo(rc, System.currentTimeMillis())); + MultiContractMeta mcm = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); + NetworkManager.instance.sendToAgent(mcm.getMasterNode(), JsonUtil.toJson(request)); + } catch (Exception e) { + e.printStackTrace(); + } finally { + CMActions.manager.stopContractWithOwner( + request.get("verifiedPubKey").getAsString(), contractID); + ContractMeta meta = manager.statusRecorder.getContractMeta(contractID); + if (null != meta && meta.contractExecutor != null) { + //TODO why close? + // meta.contractExecutor.close(); + } } } @@ -1503,14 +1524,14 @@ public class CMActions implements OnHashCallback { e.printStackTrace(); } ExecutionManager.instance.updateLocalContractToNodeCenter(); - ReplyUtil.simpleReply(resultCallback,"onKillAllContract",sb.toString()); + ReplyUtil.simpleReply(resultCallback, "onKillAllContract", sb.toString()); manager.stopAllContracts(); } else { manager.stopAllContractsWithOwner(args.get(("verifiedPubKey")).getAsString()); - ReplyUtil.simpleReply(resultCallback,"onKillAllContract","Success"); + ReplyUtil.simpleReply(resultCallback, "onKillAllContract", "Success"); } } else { - ReplyUtil.simpleReply(resultCallback,"onKillAllContract","Failed: Illegal user"); + ReplyUtil.simpleReply(resultCallback, "onKillAllContract", "Failed: Illegal user"); } } @@ -1646,15 +1667,15 @@ public class CMActions implements OnHashCallback { peerID = jsonPeer.get("peerID").getAsString(); ipPort = jsonPeer.get("ipPort").getAsString(); // NetworkManager.instance.peerID2TCPAddress.put(peer, ipPort); - try { - NetworkManager.instance.createTCPClient(peerID, ipPort); - } catch (InterruptedException e) { - e.printStackTrace(); - JsonObject ret = new JsonObject(); - ret.addProperty("status", "Error"); - ret.addProperty("result", "Cannot form TCP connection to " + nodeName); - rc.onResult(ret); - } +// try { +// NetworkManager.instance.createTCPClient(peerID, ipPort); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// JsonObject ret = new JsonObject(); +// ret.addProperty("status", "Error"); +// ret.addProperty("result", "Cannot form TCP connection to " + nodeName); +// rc.onResult(ret); +// } } } } diff --git a/src/main/java/org/bdware/server/action/CheckPointCallback.java b/src/main/java/org/bdware/server/action/CheckPointCallback.java index 807e034..28c102a 100644 --- a/src/main/java/org/bdware/server/action/CheckPointCallback.java +++ b/src/main/java/org/bdware/server/action/CheckPointCallback.java @@ -1,17 +1,16 @@ package org.bdware.server.action; -import com.google.gson.Gson; import org.bdware.sc.conn.OnHashCallback; import org.bdware.sc.units.MultiContractMeta; import org.bdware.sc.util.JsonUtil; -import org.bdware.server.action.p2p.MasterClientTCPAction; -import org.bdware.server.action.p2p.MasterServerTCPAction; -import org.bdware.server.trustedmodel.SlaveNode; +import org.bdware.units.NetworkManager; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +//UNUSED +//TO Merge public class CheckPointCallback implements OnHashCallback { public static Map lastHash = new ConcurrentHashMap<>(); // contractID,hash @@ -26,12 +25,9 @@ public class CheckPointCallback implements OnHashCallback { String sendStr = JsonUtil.toJson(reqStr); MultiContractMeta info = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); for (String node : info.getMembers()) { - SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(node); - if (cmNode != null) { - System.out.println( - "发送请求给 " + cmNode.pubKey.substring(0, 5) + " 更新ledger中最新检查点的hash!"); - cmNode.connection.sendMsg(sendStr); - } + NetworkManager.instance.sendToAgent(node, sendStr); + System.out.println( + "发送请求给 " + node.substring(0, 5) + " 更新ledger中最新检查点的hash!"); } } } diff --git a/src/main/java/org/bdware/server/action/FileActions.java b/src/main/java/org/bdware/server/action/FileActions.java index c2b602e..8cdf785 100644 --- a/src/main/java/org/bdware/server/action/FileActions.java +++ b/src/main/java/org/bdware/server/action/FileActions.java @@ -28,6 +28,7 @@ import org.bdware.sc.db.TimeDBUtil; import org.bdware.sc.node.ContractManifest; import org.bdware.sc.util.FileUtil; import org.bdware.sc.util.JsonUtil; +import org.bdware.server.ControllerManager; import org.bdware.server.GlobalConf; import org.bdware.server.doip.ContractRepositoryMain; import org.bdware.server.http.URIPath; @@ -1631,8 +1632,7 @@ public class FileActions { json.addProperty("distributeID", reqID); LOGGER.debug("[FileActions] distributeContract : "); LOGGER.debug(JsonUtil.toJson(json)); - NetworkManager.instance.getNCClientHandler().distributeReqMap.put(reqID, resultCallback); - NetworkManager.instance.sendToNodeCenter(JsonUtil.toJson(json)); + ControllerManager.getNodeCenterController().distributeContract(reqID,resultCallback,json); } static class ListProjectResp { diff --git a/src/main/java/org/bdware/server/action/ManagerActions.java b/src/main/java/org/bdware/server/action/ManagerActions.java index d9a95b6..8b0bf19 100644 --- a/src/main/java/org/bdware/server/action/ManagerActions.java +++ b/src/main/java/org/bdware/server/action/ManagerActions.java @@ -10,6 +10,7 @@ import org.bdware.sc.db.KeyValueDBUtil; import org.bdware.sc.encrypt.HardwareInfo; import org.bdware.sc.util.JsonUtil; import org.bdware.server.CongestionControl; +import org.bdware.server.ControllerManager; import org.bdware.server.GRPCPool; import org.bdware.server.GlobalConf; import org.bdware.server.permission.Role; @@ -123,7 +124,7 @@ public class ManagerActions { @Action(userPermission = 1L << 11) public void listNodeInfos(JsonObject args, ResultCallback resultCallback) { - NetworkManager.instance.getNCClientHandler().controller.getNodeInfos( + ControllerManager.getNodeCenterController().getNodeInfos( new ResultCallback() { @Override public void onResult(String str) { @@ -134,7 +135,7 @@ public class ManagerActions { @Action(userPermission = 1L << 26) public void updateContract(JsonObject args, ResultCallback resultCallback) { - NetworkManager.instance.getNCClientHandler().controller.listCMInfo(); + ControllerManager.getNodeCenterController().listCMInfo(); resultCallback.onResult("{\"data\":\"" + System.currentTimeMillis() + "\"}"); } @@ -151,7 +152,7 @@ public class ManagerActions { data.put("isLAN", String.valueOf(GlobalConf.isLAN())); data.put("peerID", GlobalConf.instance.peerID); data.put("bdledger", GlobalConf.instance.datachainConf.replace("\n", " ")); - data.put("clusterConnected", String.valueOf(NetworkManager.instance.getNCClientHandler().isConnected())); + data.put("clusterConnected", String.valueOf(NetworkManager.instance.isConnectedToNodeCenter())); data.put("nodePubKey", GlobalConf.instance.keyPair.getPublicKeyStr()); data.put("masterAddress", GlobalConf.instance.masterAddress); ReplyUtil.replyWithStatus(resultCallback, "onLoadNodeConfig", true, data); diff --git a/src/main/java/org/bdware/server/action/MasterWSAction.java b/src/main/java/org/bdware/server/action/MasterWSAction.java index 4fd73ec..2545f9c 100644 --- a/src/main/java/org/bdware/server/action/MasterWSAction.java +++ b/src/main/java/org/bdware/server/action/MasterWSAction.java @@ -21,7 +21,6 @@ import org.bdware.server.action.p2p.MasterServerTCPAction; import org.bdware.server.trustedmodel.MultiPointContractInfo; import org.bdware.server.trustedmodel.MultiPointCooperateContractInfo; import org.bdware.server.trustedmodel.ResultCollector; -import org.bdware.server.trustedmodel.SlaveNode; import org.bdware.units.NetworkManager; import org.zz.gmhelper.SM2KeyPair; import org.zz.gmhelper.SM2Util; @@ -42,7 +41,31 @@ public class MasterWSAction { public static boolean hostMaster(String contractID) { return CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).isMaster(); } - + private boolean waitForConnection(Set nodeNames) { + LOGGER.info("waitForAllNodes:" + JsonUtil.toJson(nodeNames)); + for (int i = 0; i < 5; i++) { + boolean all = true; + for (String str : nodeNames) { + if (!NetworkManager.instance.hasAgentConnection(str)) { + all = false; + break; + } + } + if (!all) { + synchronized (MasterServerTCPAction.sync) { // TODO why? + try { + MasterServerTCPAction.sync.wait(2000L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } else { + return true; + } + } + LOGGER.info("waitForAllNodes return false;"); + return false; + } @Action(async = true, userPermission = 1L << 26) // zyx modified public void startContractMultiPoint(JsonObject args, final ResultCallback rc) { @@ -53,7 +76,6 @@ public class MasterWSAction { contract.setType(ContractExecType.getContractTypeByInt(args.get("type").getAsInt())); } - // this multiPointContractInfo problem // this multiPointContractInfo problem MultiPointContractInfo multiPointContractInfo = new MultiPointContractInfo(); if (contract.getType() == ContractExecType.Sharding) { @@ -166,6 +188,7 @@ public class MasterWSAction { int nodeSize = nodeNames.size(); // 方式一向NodeCenter发,要求Slave节点主动连接到Master节点. + Map requestConnect = new HashMap<>(); requestConnect.put("action", "requestConnectToMaster"); LOGGER.debug(multiPointContractInfo.masterNode); @@ -175,12 +198,8 @@ public class MasterWSAction { requestConnect.put("connectAll", true); } NetworkManager.instance.sendToNodeCenter(JsonUtil.toJson(requestConnect)); // 向NC发 + waitForConnection(nodeNames); LOGGER.debug(JsonUtil.toPrettyJson(requestConnect)); - boolean isSuccess = waitForConnection(nodeNames); - if (!isSuccess) { - return; - } - ContractManager.threadPool.execute( () -> { // 多点合约更新repository信息 @@ -265,23 +284,13 @@ public class MasterWSAction { startReq = JsonUtil.toJson(request); LOGGER.debug("start contract: " + startReq); } - SlaveNode node; - node = MasterServerTCPAction.id2Slaves.get(nodeID); // slave node信息 - node.connection.sendMsg(startReq); + NetworkManager.instance.sendToAgent(nodeID, startReq); if (!MasterServerRecoverMechAction.recoverStatus.containsKey(nodeID)) { MasterServerRecoverMechAction.recoverStatus.put(nodeID, new ConcurrentHashMap<>()); } MasterServerRecoverMechAction.recoverStatus .get(nodeID) .put(contractID, RecoverFlag.Fine); - - // if (!MasterServerTCPAction.recoverMap.containsKey(nodeID)) { - // MasterServerTCPAction.recoverMap.put(nodeID, new - // ConcurrentHashMap<>()); - // } - // ContractRecord record = new ContractRecord(contract.getID()); - // MasterServerTCPAction.recoverMap.get(nodeID).put(contract.getID(), - // record); } rc.onResult( "{\"status\":\"Success\",\"result\":\"" @@ -291,35 +300,4 @@ public class MasterWSAction { LOGGER.info("success!"); } - private boolean waitForConnection(Collection nodeNames) { - LOGGER.info("waitForAllNodes:" + JsonUtil.toJson(nodeNames)); - for (int i = 0; i < 5; i++) { - boolean all = true; - for (String str : nodeNames) { - if (!MasterServerTCPAction.id2Slaves.containsKey(str)) { - all = false; - break; - } - } - if (!all) { - synchronized (MasterServerTCPAction.sync) { // TODO why? - try { - MasterServerTCPAction.sync.wait(2000L); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } else { - return true; - } - } - LOGGER.info("waitForAllNodes return false;"); - - return false; - } - - @Action(async = true, userPermission = 1L << 22) - public void listSlaves(JsonObject jo, ResultCallback cb) { - cb.onResult(JsonUtil.toJson(MasterServerTCPAction.id2Slaves)); - } } diff --git a/src/main/java/org/bdware/server/action/p2p/AliveCheckClientAction.java b/src/main/java/org/bdware/server/action/p2p/AliveCheckClientAction.java new file mode 100644 index 0000000..0f7efb4 --- /dev/null +++ b/src/main/java/org/bdware/server/action/p2p/AliveCheckClientAction.java @@ -0,0 +1,164 @@ +package org.bdware.server.action.p2p; + +import com.google.gson.JsonObject; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.bdware.heartbeat.HeartBeatUtil; +import org.bdware.sc.MasterElectTimeRecorder; +import org.bdware.sc.conn.ResultCallback; +import org.bdware.sc.units.MultiContractMeta; +import org.bdware.sc.util.JsonUtil; +import org.bdware.server.GlobalConf; +import org.bdware.server.action.Action; +import org.bdware.server.action.CMActions; +import org.bdware.units.NetworkManager; +import org.zz.gmhelper.SM2KeyPair; + +import java.text.SimpleDateFormat; +import java.util.HashMap; +import java.util.Map; +import java.util.TimerTask; + +public class AliveCheckClientAction { + private static final Logger LOGGER = LogManager.getLogger(AliveCheckClientAction.class); + protected boolean waitForSetNode = false; + + public static final int sendDelay = 2000; + public static final int checkDelay = 5000; + private final String masterPubkey; + TimerTask sendPingTask; + TimerTask checkAliveTask; + private long lastMasterPongTime = System.currentTimeMillis(); + + public AliveCheckClientAction(String masterPubkey) { + this.masterPubkey = masterPubkey; + } + + @Action(async = true) + public void checkMasterAlive(JsonObject jo, ResultCallback result) { + if (masterPubkey != null) { + LOGGER.info("start heartbeat to " + masterPubkey.substring(0, 5)); + waitForSetNode = true; + checkMasterAlive(result); + } + } + + // 关闭和master的连接 + public void closeMaster() { + if (checkAliveTask != null) { + HeartBeatUtil.getInstance().cancel(checkAliveTask); + checkAliveTask = null; + } + if (sendPingTask != null) { + HeartBeatUtil.getInstance().cancel(sendPingTask); + sendPingTask = null; + } + NetworkManager.instance.closeAgent(masterPubkey); + } + + public void checkMasterAlive(ResultCallback rc) { + if (sendPingTask == null) + sendPingTask = + new TimerTask() { + @Override + public void run() { + try { + LOGGER.debug( + String.format( + "f %s", + new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss") + .format(System.currentTimeMillis()))); + Map ping = new HashMap<>(); + ping.put("action", "masterPing"); + rc.onResult(JsonUtil.toJson(ping)); + } catch (Throwable t) { + t.printStackTrace(); + } + } + }; + if (null == checkAliveTask) + checkAliveTask = + new TimerTask() { + @Override + public void run() { + try { + run1(); + } catch (Throwable t) { + t.printStackTrace(); + HeartBeatUtil.getInstance().cancel(this); + } + } + + public void run1() { + long cur = System.currentTimeMillis(); + if (cur - lastMasterPongTime >= (2 * sendDelay)) { + LOGGER.info( + "lastMasterPongTime=" + + new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss") + .format(lastMasterPongTime) + + " 认为master崩溃!"); + + // 向NC发通知重新选举master,如果NC没有收到所有节点的重选请求,就认为是这个节点和master连接断开 + // 这个节点需要重连master + Map request = new HashMap<>(); + request.put("action", "electMaster"); + SM2KeyPair keyPair = GlobalConf.instance.keyPair; + request.put("nodeID", keyPair.getPublicKeyStr()); + for (MultiContractMeta meta : CMActions.manager.multiContractRecorder.getStatus().values()) { + if (meta.getMasterNode().equals(masterPubkey)) { + LOGGER.info( + "认为合约 " + + meta.getContractID() + + " 的master崩溃 master=" + + masterPubkey); + request.put("contractID", meta.getContractID()); + int lastSeq = meta.getLastExeSeq(); + request.put("lastExe", lastSeq + ""); + request.put("master", masterPubkey); + String[] members = + meta.getMembers(); + request.put("members", StringUtils.join(members, ",")); + NetworkManager.instance.sendToNodeCenter( + JsonUtil.toJson(request)); + MasterElectTimeRecorder.findMasterCrash = + System.currentTimeMillis(); + } + } + closeMaster(); + } // if + } + }; + lastMasterPongTime = System.currentTimeMillis(); + LOGGER.info( + "设置 lastMasterPongTime=" + + new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss").format(lastMasterPongTime)); + HeartBeatUtil.getInstance().schedule(sendPingTask, sendDelay / 2, checkDelay); + HeartBeatUtil.getInstance().schedule(checkAliveTask, sendDelay, checkDelay); + } + + @Action(async = true) + public void masterPong(JsonObject jo, ResultCallback result) { + lastMasterPongTime = System.currentTimeMillis(); + } + + public void waitForSetNodeID() { + for (int i = 0; i < 100; i++) { + if (waitForSetNode) return; + try { + Thread.sleep(30); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public void init() { + LOGGER.info("init nodeID:"); + assert masterPubkey != null; + NetworkManager.instance.sendToAgent(masterPubkey, "{\"action\":\"setNodeInfo\",\"pubKey\":\"" + + GlobalConf.instance.keyPair.getPublicKeyStr() + + "\"}"); + + } +} diff --git a/src/main/java/org/bdware/server/action/p2p/AliveCheckServerAction.java b/src/main/java/org/bdware/server/action/p2p/AliveCheckServerAction.java new file mode 100644 index 0000000..6c163fd --- /dev/null +++ b/src/main/java/org/bdware/server/action/p2p/AliveCheckServerAction.java @@ -0,0 +1,110 @@ +package org.bdware.server.action.p2p; + +import com.google.gson.JsonObject; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.bdware.heartbeat.HeartBeatUtil; +import org.bdware.sc.conn.ResultCallback; +import org.bdware.sc.units.RecoverFlag; +import org.bdware.sc.util.JsonUtil; +import org.bdware.server.action.Action; +import org.bdware.server.tcp.TCPServerFrameHandler; +import org.bdware.units.NetworkManager; + +import java.text.SimpleDateFormat; +import java.util.*; + +public class AliveCheckServerAction { + private static final Logger LOGGER = LogManager.getLogger(AliveCheckServerAction.class); + private final TCPServerFrameHandler handler; + + TimerTask checkAliveTask; + private long lastSlavePingTime; + String pubKey; + + public AliveCheckServerAction(TCPServerFrameHandler handler) { + lastSlavePingTime = + System.currentTimeMillis() + + 5000L + + AliveCheckClientAction.sendDelay + + AliveCheckClientAction.checkDelay; + + checkAliveTask = new HeartBeatTask(AliveCheckClientAction.sendDelay); + HeartBeatUtil.getInstance() + .schedule( + checkAliveTask, + AliveCheckClientAction.sendDelay, + AliveCheckClientAction.checkDelay); + this.handler = handler; + } + + private class HeartBeatTask extends TimerTask { + int delay; + HeartBeatTask(int delay) { + this.delay = delay; + } + @Override + public void run() { + try { + long cur = System.currentTimeMillis(); + if (cur - lastSlavePingTime >= (2L * delay)) { + LOGGER.info( + new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss").format(lastSlavePingTime) + + " " + + delay); + Set contracts = new HashSet<>(); + String nodeID = pubKey; + if (nodeID == null) { + LOGGER.info("nodeID == null " + this); + HeartBeatUtil.getInstance().cancel(this); + return; + } + LOGGER.info( + "Master心跳机制发现节点 " + + nodeID.substring(0, 5) + + " 下线! " + + this.toString()); + HeartBeatUtil.getInstance().cancel(this); + for (String contractID : + MasterServerRecoverMechAction.recoverStatus.get(nodeID).keySet()) { + // RecoverFlag flag = + // + // MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID); + // if (flag == RecoverFlag.Fine) + MasterServerRecoverMechAction.recoverStatus + .get(nodeID) + .put(contractID, RecoverFlag.ToRecover); + contracts.add(contractID); + MasterServerTCPAction.notifyNodeOffline(contractID, nodeID); + } + for (String contractID : contracts) { + MasterServerRecoverMechAction.unitModeCheck(contractID); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + @Action(async = true) + public void setNodeInfo(JsonObject jo, ResultCallback cb) { + LOGGER.info("[MasterServerTCPAction] setNodeInfo : "); + String nodeID = jo.get("pubKey").getAsString(); + pubKey = nodeID; + NetworkManager.instance.registerConnection(nodeID, handler); + LOGGER.info("[MasterServerTCPAction] setNodeInfo id2Slaves put " + nodeID.substring(0, 5)); + Map request = new HashMap<>(); + request.put("action", "checkMasterAlive"); + cb.onResult(JsonUtil.toJson(request)); + } + + @Action(async = true) + public void masterPing(JsonObject args, ResultCallback cb) { + lastSlavePingTime = System.currentTimeMillis(); + Map request = new HashMap<>(); + request.put("action", "masterPong"); + cb.onResult(JsonUtil.toJson(request)); + + } +} diff --git a/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java b/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java index ad00cb3..f3151e4 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java @@ -1,7 +1,6 @@ package org.bdware.server.action.p2p; import com.google.gson.JsonObject; -import io.netty.util.internal.ConcurrentSet; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bdware.sc.ContractClient; @@ -20,32 +19,29 @@ import org.bdware.server.action.Action; import org.bdware.server.action.CMActions; import org.bdware.server.action.RequestToMaster; import org.bdware.server.tcp.TCPClientFrameHandler; +import org.bdware.server.trustedmodel.AgentManager; import org.bdware.server.trustedmodel.ContractUnitStatus; -import org.bdware.server.trustedmodel.MasterProxy; import org.bdware.units.function.ExecutionManager; import org.zz.gmhelper.SM2KeyPair; import java.io.*; import java.text.SimpleDateFormat; import java.util.*; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; public class MasterClientRecoverMechAction { private static final Logger LOGGER = LogManager.getLogger(MasterClientRecoverMechAction.class); - public static ConcurrentSet recoverSet = - new ConcurrentSet<>(); // contracts which don't finish recoverRestart + public static Set recoverSet = + new ConcurrentSkipListSet<>(); // contracts which don't finish recoverRestart public static Map> requestsToMaster; // when master is re-electing,client node cache ites received requests - private final TCPClientFrameHandler handler; - private final MasterClientTCPAction clientAction; private final Map stateFileMap = new HashMap<>(); private final Map transFileMap = new HashMap<>(); - public MasterClientRecoverMechAction( - TCPClientFrameHandler handler, MasterClientTCPAction action) { - this.handler = handler; - clientAction = action; + public MasterClientRecoverMechAction() { + } // 告知master自己需要恢复,携带之前的运行模式,如果是StableMode需要携带lastExeSeq信息 @@ -58,7 +54,7 @@ public class MasterClientRecoverMechAction { + contractID + " masterID=" + masterID.substring(0, 5)); - TCPClientFrameHandler master = MasterProxy.getHandler(masterID); + TCPClientFrameHandler master = AgentManager.getHandler(masterID); Map ret = new HashMap(); ret.put("action", "askForRecover"); @@ -191,7 +187,7 @@ public class MasterClientRecoverMechAction { e.printStackTrace(); } LOGGER.info("slave receive last state package,isDone=true"); - recoverFromCommonMode(GlobalConf.instance.projectDir + "/stateFiles/" + fileName); + recoverFromCommonMode(GlobalConf.instance.projectDir + "/stateFiles/" + fileName, rc); } } @@ -199,14 +195,14 @@ public class MasterClientRecoverMechAction { public void recoverFromCommonMode(JsonObject args, final ResultCallback rc) { String path = args.get("filePath").getAsString(); if (args.has("isMaster")) { - recoverFromCommonMode(path, args.get("isMaster").getAsBoolean()); + recoverFromCommonMode(path, args.get("isMaster").getAsBoolean(), rc); } else { - recoverFromCommonMode(path); + recoverFromCommonMode(path, rc); } } - public void recoverFromCommonMode(String path) { - recoverFromCommonMode(path, false); + public void recoverFromCommonMode(String path, ResultCallback rc) { + recoverFromCommonMode(path, false, rc); } // //无状态合约同步lastExeSeq之后发送恢复完成 @@ -232,7 +228,7 @@ public class MasterClientRecoverMechAction { // } // 普通节点从CommonMode恢复 - public void recoverFromCommonMode(String path, boolean isMaster) { + public void recoverFromCommonMode(String path, boolean isMaster, ResultCallback rc) { LOGGER.info("recoverFromCommonMode"); RecoverMechTimeRecorder.startCommonRecover = System.currentTimeMillis(); @@ -286,10 +282,10 @@ public class MasterClientRecoverMechAction { // CMActions.manager.dumpContract(cei.getContractID(), "")); // not in the recovering state, send msd to NC,turn to fine state - sendRecoverFinish(cei.getContractID(), "common"); + sendRecoverFinish(cei.getContractID(), "common", rc); LOGGER.info("恢复之后处理请求队列"); - clientAction.dealRequests(cei.getContractID()); + MasterClientTCPAction.dealRequests(cei.getContractID()); } // 检查是否有该合约进行并进行恢复 @@ -355,7 +351,7 @@ public class MasterClientRecoverMechAction { CMTables.UnitContracts.toString(), cei.getContractID(), "exist"); } CMActions.manager.multiContractRecorder.updateValue(cei); - MasterClientTCPAction.contractID2MasterInfo.put(contractID, clientAction); + //MasterClientTCPAction.contractID2MasterInfo.put(contractID, clientAction); CMActions.manager.setContractIsMaster(contractID, "false"); } else { LOGGER.info("[MasterClientRecoverMechAction] 恢复节点没有这个合约进程 : "); @@ -377,7 +373,7 @@ public class MasterClientRecoverMechAction { CMTables.UnitContracts.toString(), cei.getContractID(), "exist"); } CMActions.manager.multiContractRecorder.updateValue(cei); - MasterClientTCPAction.contractID2MasterInfo.put(contractID, clientAction); + //MasterClientTCPAction.contractID2MasterInfo.put(contractID, clientAction); parPath = GlobalConf.instance.publicCompiledDir + "/" + cei.getYpkName(); if (cei.isPrivate()) { @@ -419,10 +415,10 @@ public class MasterClientRecoverMechAction { @Action(async = true) public void sendRecoverFinish(JsonObject args, final ResultCallback rc) { - sendRecoverFinish(args.get("contractID").getAsString(), null); + sendRecoverFinish(args.get("contractID").getAsString(), null, rc); } - public void sendRecoverFinish(String contractID, String method) { + public void sendRecoverFinish(String contractID, String method, ResultCallback rc) { LOGGER.info("恢复步骤-----------6 发送恢复完成"); MasterClientRecoverMechAction.recoverSet.remove(contractID); // 本地认为恢复结束 Map ret = new HashMap(); @@ -433,8 +429,7 @@ public class MasterClientRecoverMechAction { if (method != null) { ret.put("method", method); } - handler.sendMsg(JsonUtil.toJson(ret)); - + rc.onResult(JsonUtil.toJson(ret)); // 更新 ExecutionManager.instance.updateLocalContractToNodeCenter(); } @@ -501,7 +496,7 @@ public class MasterClientRecoverMechAction { req.put("isDone", true + ""); } LOGGER.info("len=" + len + " index=" + index); - handler.sendMsg(JsonUtil.toJson(req)); + rc.onResult(JsonUtil.toJson(req)); index++; req.put("isAppend", "true"); @@ -565,7 +560,7 @@ public class MasterClientRecoverMechAction { SM2KeyPair keyPair = GlobalConf.instance.keyPair; ret.put("nodeID", keyPair.getPublicKeyStr()); ret.put("contractID", contractID); - handler.sendMsg(JsonUtil.toJson(ret)); + result.onResult(JsonUtil.toJson(ret)); return; } @@ -657,13 +652,13 @@ public class MasterClientRecoverMechAction { request.put("contractID", contractID); SM2KeyPair keyPair = GlobalConf.instance.keyPair; request.put("nodeID", keyPair.getPublicKeyStr()); - handler.sendMsg(JsonUtil.toJson(request)); + result.onResult(JsonUtil.toJson(request)); } else { - sendRecoverFinish(contractID, "stable"); + sendRecoverFinish(contractID, "stable", result); } } - public void dealTransRecords(String contractID, String path, int lastSeq) { + public void dealTransRecords(String contractID, String path, int lastSeq, ResultCallback rc) { LOGGER.info("[MasterClientRecoverMechAction] dealTransRecords : lastSeq= " + lastSeq); if (!MasterClientRecoverMechAction.recoverSet.contains(contractID)) { @@ -686,7 +681,7 @@ public class MasterClientRecoverMechAction { // String m2 = CMActions.manager.dumpContract(contractID, ""); // logger.info("通过master的transRecods恢复之后状态为 \n" + m2); - sendRecoverFinish(contractID, "stable"); + sendRecoverFinish(contractID, "stable", rc); if (file.isFile() && file.exists()) { // file.delete(); } @@ -785,7 +780,7 @@ public class MasterClientRecoverMechAction { } String contractID = args.get("contractID").getAsString(); int last = args.get("last").getAsInt(); - dealTransRecords(contractID, dir.getAbsolutePath(), last); + dealTransRecords(contractID, dir.getAbsolutePath(), last, rc); } } catch (IOException e) { e.printStackTrace(); diff --git a/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java b/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java index b72a2d2..174e4b9 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java @@ -5,7 +5,6 @@ import com.google.gson.JsonPrimitive; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.bdware.heartbeat.HeartBeatUtil; import org.bdware.sc.*; import org.bdware.sc.bean.Contract; import org.bdware.sc.bean.ContractExecType; @@ -21,77 +20,27 @@ import org.bdware.server.executor.RequestAllExecutor; import org.bdware.server.executor.unconsistency.MultiPointCooperationExecutor; import org.bdware.server.executor.unconsistency.RequestOnceExecutor; import org.bdware.server.executor.unconsistency.ResponseOnceExecutor; -import org.bdware.server.tcp.TCPClientFrameHandler; import org.bdware.server.trustedmodel.ContractExecutor; import org.bdware.server.trustedmodel.KillUnitContractInfo; -import org.bdware.server.trustedmodel.MasterProxy; +import org.bdware.server.trustedmodel.SelfAdaptiveShardingExecutor; import org.bdware.server.trustedmodel.SingleNodeExecutor; import org.bdware.units.NetworkManager; import org.bdware.units.function.ExecutionManager; import org.zz.gmhelper.SM2KeyPair; -import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; -import java.io.PrintStream; -import java.text.SimpleDateFormat; import java.util.HashMap; import java.util.Map; import java.util.Random; -import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; public class MasterClientTCPAction { - public static final int sendDelay = 2000; - public static final int checkDelay = 5000; + private static final Logger LOGGER = LogManager.getLogger(MasterClientTCPAction.class); - public static Map contractID2MasterInfo = - new ConcurrentHashMap<>(); - static Map killUnitContractMap = new ConcurrentHashMap<>(); - private final TCPClientFrameHandler handler; - private final String master; - TimerTask sendPingTask; - TimerTask checkAliveTask; - TCPClientFrameHandler controller; - private long lastMasterPongTime = System.currentTimeMillis(); - private boolean waitForSetNode = false; - - public MasterClientTCPAction(TCPClientFrameHandler handler, String master) { - this.handler = handler; - this.master = master; - } - - public static void killContract( - long startTime, ContractClient client, JsonObject request, ResultCallback rc) { - LOGGER.info("[MasterClientTCPAction] killContract : "); - - killUnitContractMap.put( - request.get("requestID").getAsString(), - new KillUnitContractInfo(rc, System.currentTimeMillis())); - - // 向master发送信息停止集群中所有该合约实例 - MasterClientTCPAction mcta = contractID2MasterInfo.get(client.getContractID()); - if (mcta == null) mcta = contractID2MasterInfo.get(client.getContractName()); - // FIXME - // 重连后,kill报错。Caused by: java.lang.NullPointerException - // at - // org.bdware.server.action.MasterClientTCPAction.killContract(MasterClientTCPAction.java:60) - try { - mcta.handler.sendMsg(JsonUtil.toJson(request)); - } catch (Exception e) { - e.printStackTrace(); - } finally { - CMActions.manager.stopContractWithOwner( - request.get("verifiedPubKey").getAsString(), client.getContractID()); - MultiContractMeta cei = - CMActions.manager.multiContractRecorder.getMultiContractMeta(client.getContractID()); - if (null != cei) { - cei.contractExecutor.close(); - } - } - } + public static Map killUnitContractMap = new ConcurrentHashMap<>(); // NC发现master为null,让各个节点重选 public static void NCStartElect(ContractClient cc, String uniNumber) { @@ -103,14 +52,14 @@ public class MasterClientTCPAction { SM2KeyPair keyPair = GlobalConf.instance.keyPair; request.put("nodeID", keyPair.getPublicKeyStr()); - if (!contractID2MasterInfo.containsKey(contractID)) { - contractID = cc.getContractID(); - } - - MasterClientTCPAction mcta = contractID2MasterInfo.get(contractID); - if (mcta != null) { - mcta.closeMaster(); - } +// if (!contractID2MasterInfo.containsKey(contractID)) { +// contractID = cc.getContractID(); +// } +// +// MasterClientTCPAction mcta = contractID2MasterInfo.get(contractID); +// if (mcta != null) { +// mcta.closeMaster(); +// } request.put("contractID", contractID); int lastSeq = @@ -173,164 +122,23 @@ public class MasterClientTCPAction { return executor; } - @Action(async = true) - public void checkMasterAlive(JsonObject jo, ResultCallback result) { - LOGGER.info("start heartbeat to " + master.substring(0, 5)); - waitForSetNode = true; - checkMasterAlive(); - } - - // 关闭和master的连接 - public void closeMaster() { - if (checkAliveTask != null) { - HeartBeatUtil.getInstance().cancel(checkAliveTask); - checkAliveTask = null; - } - if (sendPingTask != null) { - HeartBeatUtil.getInstance().cancel(sendPingTask); - sendPingTask = null; - } - if (handler != null) { - try { - handler.close(); - } catch (Exception e) { - e.printStackTrace(); - } - } - MasterProxy.CONNECTORS.remove(master); - } - - public void checkMasterAlive() { - if (sendPingTask == null) - sendPingTask = - new TimerTask() { - @Override - public void run() { - try { - run1(); - } catch (Throwable t) { - t.printStackTrace(); - // this.cancel(); - } - } - - public void run1() { - LOGGER.debug( - String.format( - "f %s", - new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss") - .format(System.currentTimeMillis()))); - Map ping = new HashMap<>(); - ping.put("action", "masterPing"); - handler.sendMsg(JsonUtil.toJson(ping)); - } - }; - if (null == checkAliveTask) - checkAliveTask = - new TimerTask() { - @Override - public void run() { - try { - run1(); - } catch (Throwable t) { - t.printStackTrace(); - HeartBeatUtil.getInstance().cancel(this); - } - } - - public void run1() { - long cur = System.currentTimeMillis(); - - if (cur - lastMasterPongTime >= (2 * sendDelay)) { - LOGGER.info( - "lastMasterPongTime=" - + new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss") - .format(lastMasterPongTime) - + " 认为master崩溃!"); - - // 向NC发通知重新选举master,如果NC没有收到所有节点的重选请求,就认为是这个节点和master连接断开 - // 这个节点需要重连master - Map request = new HashMap<>(); - request.put("action", "electMaster"); - SM2KeyPair keyPair = GlobalConf.instance.keyPair; - request.put("nodeID", keyPair.getPublicKeyStr()); - for (String contractID : contractID2MasterInfo.keySet()) { - if (contractID2MasterInfo.get(contractID).master.equals(master) - && KeyValueDBUtil.instance.containsKey( - CMTables.UnitContracts.toString(), contractID)) { - LOGGER.info( - "认为合约 " - + contractID - + " 的master崩溃 master1=" - + contractID2MasterInfo.get(contractID) - .master - + " master2=" - + master); - request.put("contractID", contractID); - int lastSeq = - CMActions.manager - .multiContractRecorder - .getMultiContractMeta(contractID) - .getLastExeSeq(); - request.put("lastExe", lastSeq + ""); - request.put("master", master); - String[] members = - CMActions.manager - .multiContractRecorder - .getMultiContractMeta(contractID) - .getMembers(); - request.put("members", StringUtils.join(members, ",")); - NetworkManager.instance.sendToNodeCenter( - JsonUtil.toJson(request)); - LOGGER.info( - "认为合约 " - + contractID - + " 的master崩溃 当前master为 " - + master.substring(0, 5) - + " 向NC发送重选信息"); - MasterElectTimeRecorder.findMasterCrash = - System.currentTimeMillis(); - } - } - closeMaster(); - } // if - } - }; - - lastMasterPongTime = System.currentTimeMillis(); - LOGGER.info( - "设置 lastMasterPongTime=" - + new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss").format(lastMasterPongTime)); - - HeartBeatUtil.getInstance().schedule(sendPingTask, 1000, checkDelay); - HeartBeatUtil.getInstance().schedule(checkAliveTask, sendDelay, checkDelay); - // HeartBeatUtil.getInstance().schedule(checkAliveTask, 10000, checkDealy); - } - - // @Action(async = true) - // public void onKillContractProcess(JsonObject jo, ResultCallback result) { - // logger.info("[MasterClientTCPAction] : onKillContractProcess"); - // if (killUnitContractMap.containsKey(jo.get("requestID").getAsString())) { - // KillUnitContractInfo info = - // killUnitContractMap.get(jo.get("requestID").getAsString()); - // Map r = new HashMap<>(); - // r.put("action", "onKillContractProcess"); - // r.put("data", jo.get("data")); - // r.put("executeTime", System.currentTimeMillis() - info.startTime); - // logger.info("[MasterClientTCPAction] onKillContractProcess : " + jo.get("data")); - // info.rc.onResult(JsonUtil.toJson(r)); - // } - // } @Action(async = true) - public void masterPong(JsonObject jo, ResultCallback result) { - lastMasterPongTime = System.currentTimeMillis(); - // logger.info( - // "设置 lastMasterPongTime=" - // + new - // SimpleDateFormat("yyyy-MM-dd.HH:mm:ss").format(lastMasterPongTime)); + public void onKillContractProcess(JsonObject jo, ResultCallback result) { + LOGGER.info("[MasterClientTCPAction] : onKillContractProcess"); + if (killUnitContractMap.containsKey(jo.get("requestID").getAsString())) { + KillUnitContractInfo info = + killUnitContractMap.get(jo.get("requestID").getAsString()); + Map r = new HashMap<>(); + r.put("action", "onKillContractProcess"); + r.put("data", jo.get("data")); + r.put("executeTime", System.currentTimeMillis() - info.startTime); + LOGGER.info("[MasterClientTCPAction] onKillContractProcess : " + jo.get("data")); + info.rc.onResult(JsonUtil.toJson(r)); + } } + // kill 本地的该集群合约实例 @Action(async = true) public void killContractProcessAtSlave(JsonObject jo, ResultCallback result) { @@ -339,10 +147,7 @@ public class MasterClientTCPAction { else id = jo.get("name").getAsString(); ContractClient cc = CMActions.manager.getClient(id); - // 清理 - if (contractID2MasterInfo.containsKey(cc.getContractID())) { - contractID2MasterInfo.remove(cc.getContractID()); - } + if (KeyValueDBUtil.instance.containsKey( CMTables.LastExeSeq.toString(), cc.getContractID())) { KeyValueDBUtil.instance.delete(CMTables.LastExeSeq.toString(), cc.getContractID()); @@ -387,10 +192,10 @@ public class MasterClientTCPAction { // 需要计算出自己的ShardingID?路由规则(id/requester/arg-->shardingId), // 也在MultiPointCooperationExecutor中实现 } - contractID2MasterInfo.put(contractID, this); // 记录contractID 和 master之间的对应关系 + //TOODO master连接 + // contractID2MasterInfo.put(contractID, this); // 记录contractID 和 master之间的对应关系 MultiContractMeta cei = CMActions.manager.multiContractRecorder.createIfNotExist(contractID); - cei.setLastExeSeq(-1); if (!contract.getScriptStr().startsWith("/")) { contract.setScript(dumpToDisk(contract, jo)); @@ -434,7 +239,7 @@ public class MasterClientTCPAction { } // 这个地方判定,从参数中的master数据 和 globalConf中的数据 进行对比,如果一致的话,说明该节点为master cei.setMaster(jo.get("master").getAsString()); - if (contract.getType() != ContractExecType.Sharding) + if (contract.getType().needSeq()) cei.setIsMaster(GlobalConf.getNodeID().equals(jo.get("master").getAsString())); else { cei.setIsMaster(true); @@ -469,11 +274,11 @@ public class MasterClientTCPAction { resultMap.put("pubKey", GlobalConf.instance.keyPair.getPublicKeyStr()); result.onResult(JsonUtil.toJson(resultMap)); } - if (contract.getType() == ContractExecType.Sharding) { - for (String str : cei.getMembers()) { - NetworkManager.instance.getNCClientHandler().controller.connectToMaster(str, null); - } - } +// if (contract.getType() == ContractExecType.Sharding) { +// for (String str : cei.getMembers()) { +// NetworkManager.instance.getNCClientHandler().controller.connectToMaster(str, null); +// } +// } } private String dumpToDisk(Contract contract, JsonObject jo) { @@ -506,13 +311,14 @@ public class MasterClientTCPAction { return "/" + scriptName + ".ypk"; } + //TODO 这个奇怪的action可以放到这个SelfAdaptiveSharding的相关类里 @Action(async = true) public void deliverBlock(JsonObject jo, ResultCallback result) { if (jo.has("contractID") && jo.has("data")) { String contractID = jo.get("contractID").getAsString(); - MultiContractMeta cei = ContractManager.instance.multiContractRecorder.getMultiContractMeta(contractID); - if (null != cei && cei.contractExecutor instanceof SelfAdaptiveShardingExecutor) { - ((SelfAdaptiveShardingExecutor) cei.contractExecutor).execute(jo.get("data").getAsString()); + ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID); + if (null != meta && meta.contractExecutor instanceof SelfAdaptiveShardingExecutor) { + ((SelfAdaptiveShardingExecutor) meta.contractExecutor).execute(jo.get("data").getAsString()); } } } @@ -629,7 +435,7 @@ public class MasterClientTCPAction { } } - public void dealRequests(String contractID) { + public static void dealRequests(String contractID) { LOGGER.info("dealRequests"); // If still in recovering state,don't dealRequests now @@ -637,12 +443,9 @@ public class MasterClientTCPAction { LOGGER.info("本地还没恢复完成,不执行请求!"); return; } - // try { - // Thread.sleep(25000); - // } catch (InterruptedException e) { - // e.printStackTrace(); - // } + MultiContractMeta cei; + synchronized (cei = CMActions.manager.multiContractRecorder.createIfNotExist(contractID)) { while (!cei.queue.isEmpty()) { ContractRequest request = cei.queue.peek(); @@ -762,29 +565,16 @@ public class MasterClientTCPAction { ret.put("nodeID", keyPair.getPublicKeyStr()); ret.put("start", cei.getLastExeSeq() + ""); ret.put("end", request.seq + ""); // 左开右开区间 - handler.sendMsg(JsonUtil.toJson(ret)); - + NetworkManager.instance.sendToAgent(cei.getMasterNode(), JsonUtil.toJson(ret)); break; } } } } - public void init(TCPClientFrameHandler masterClientFrameHandler) { - LOGGER.info("[MasterClientTCPAction] init : "); - controller = masterClientFrameHandler; - controller.sendMsg( - "{\"action\":\"setNodeInfo\",\"pubKey\":\"" - + GlobalConf.instance.keyPair.getPublicKeyStr() - + "\"}"); - } - - public void callMaster(ContractRequest request, ResultCallback resultCallback) { - LOGGER.info("[TODO CallMaster]"); - } @Action(async = true) - public void receiveContractExecution(JsonObject jsonObject, ResultCallback resultCallback) { + public void receiveContractExecutionServer(JsonObject jsonObject, ResultCallback resultCallback) { MasterServerTCPAction.sync.wakeUp( jsonObject.get("responseID").getAsString(), jsonObject.get("data").getAsString()); } @@ -844,36 +634,27 @@ public class MasterClientTCPAction { jo); } - public String requestContractExecution(ContractRequest c) { - try { - LOGGER.info("[MasterClientTCPAction] requestContractExecution " + JsonUtil.toJson(c)); - Map req = new HashMap<>(); - req.put("action", "requestContractExecution"); - req.put("requestID", c.getRequestID()); - req.put("data", c); - handler.sendMsg(JsonUtil.toJson(req)); - // 这里可能出错。是不是在这里校验CollectResult? - ContractResult str = MasterServerTCPAction.sync.syncSleep(c.getRequestID()); - LOGGER.info("[RequestContractGet]" + JsonUtil.toJson(str)); - return JsonUtil.toJson(str); - } catch (Exception e) { - ByteArrayOutputStream bo = new ByteArrayOutputStream(); - e.printStackTrace(new PrintStream(bo)); - ContractResult cr = - new ContractResult( - ContractResult.Status.Exception, new JsonPrimitive(bo.toString())); - return JsonUtil.toJson(cr); - } - } +// public String requestContractExecution(ContractRequest c) { +// try { +// LOGGER.info("[MasterClientTCPAction] requestContractExecution " + JsonUtil.toJson(c)); +// Map req = new HashMap<>(); +// req.put("action", "requestContractExecution"); +// req.put("requestID", c.getRequestID()); +// req.put("data", c); +// handler.sendMsg(JsonUtil.toJson(req)); +// // 这里可能出错。是不是在这里校验CollectResult? +// ContractResult str = MasterServerTCPAction.sync.syncSleep(c.getRequestID()); +// LOGGER.info("[RequestContractGet]" + JsonUtil.toJson(str)); +// return JsonUtil.toJson(str); +// } catch (Exception e) { +// ByteArrayOutputStream bo = new ByteArrayOutputStream(); +// e.printStackTrace(new PrintStream(bo)); +// ContractResult cr = +// new ContractResult( +// ContractResult.Status.Exception, new JsonPrimitive(bo.toString())); +// return JsonUtil.toJson(cr); +// } +// } + - public void waitForSetNodeID() { - for (int i = 0; i < 100; i++) { - if (waitForSetNode) return; - try { - Thread.sleep(30); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } } diff --git a/src/main/java/org/bdware/server/action/p2p/MasterClientTransferAction.java b/src/main/java/org/bdware/server/action/p2p/MasterClientTransferAction.java index 90e8832..b22fd1b 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterClientTransferAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterClientTransferAction.java @@ -11,8 +11,7 @@ import org.bdware.sc.util.JsonUtil; import org.bdware.server.GlobalConf; import org.bdware.server.action.Action; import org.bdware.server.action.CMActions; -import org.bdware.server.tcp.TCPClientFrameHandler; -import org.bdware.server.trustedmodel.MasterProxy; +import org.bdware.units.NetworkManager; import java.io.*; import java.util.HashMap; @@ -22,29 +21,23 @@ import java.util.zip.GZIPOutputStream; public class MasterClientTransferAction { private static final Logger LOGGER = LogManager.getLogger(MasterClientTransferAction.class); - private final String master; - TCPClientFrameHandler handler; - MasterClientTCPAction action; + public static MasterClientTransferAction instance = new MasterClientTransferAction(); + private final Map id2Memory = new HashMap<>(); - public MasterClientTransferAction(TCPClientFrameHandler h, String pubKey, MasterClientTCPAction a) { - handler = h; - master = pubKey; - action = a; + private MasterClientTransferAction() { } - public void transferInstance(String contractID) { + public void transferInstance(String agentPubkey, String contractID) { LOGGER.info("transferInstance contractID=" + contractID); - //step2 save state String mem = CMActions.manager.dumpContract(contractID, ""); id2Memory.put(contractID, mem); - //step3 send ypk or script and start other,kill local - sendAndStart(contractID); + sendAndStart(agentPubkey, contractID); } - public void sendAndStart(String contractID) { + public void sendAndStart(String agentPubkey, String contractID) { LOGGER.info("sendAndStart contractID=" + contractID); ContractClient cc = CMActions.manager.getClient(contractID); @@ -67,7 +60,7 @@ public class MasterClientTransferAction { req.put("script", script); req.put("contractID", contractID); LOGGER.info("transferByScript send last"); - handler.sendMsg(JsonUtil.toJson(req)); + NetworkManager.instance.sendToAgent(agentPubkey, JsonUtil.toJson(req)); } else { req.put("action", "transferByYPK"); File file; @@ -101,7 +94,7 @@ public class MasterClientTransferAction { String data = ByteUtil.encodeBASE64(buff, len); req.put("data", data); count += len; - handler.sendMsg(JsonUtil.toJson(req)); + NetworkManager.instance.sendToAgent(agentPubkey, JsonUtil.toJson(req)); req.put("isAppend", "true"); Thread.sleep(300); @@ -112,7 +105,7 @@ public class MasterClientTransferAction { req.remove("data"); req.put("contractID", contractID); LOGGER.info("transferByYPK send last"); - handler.sendMsg(JsonUtil.toJson(req)); + NetworkManager.instance.sendToAgent(agentPubkey, JsonUtil.toJson(req)); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { @@ -128,13 +121,13 @@ public class MasterClientTransferAction { LOGGER.info("onSendAndStart contractID=" + contractID); String mem = id2Memory.get(contractID); id2Memory.remove(contractID); - boolean res1 = sendMemory(contractID, mem); + boolean res1 = sendMemory(contractID, mem, result); if (!res1) { LOGGER.info(res1); } } - public boolean sendMemory(String contractID, String memory) { + public boolean sendMemory(String contractID, String memory, ResultCallback cb) { LOGGER.info("sendMemory contractID=" + contractID); String path = contractID + "_temp_stateFile_" + new Random().nextLong() + "_" + System.currentTimeMillis(); @@ -171,7 +164,7 @@ public class MasterClientTransferAction { String data = ByteUtil.encodeBASE64(buff, len); req.put("data", data); count += len; - handler.sendMsg(JsonUtil.toJson(req)); + cb.onResult(JsonUtil.toJson(req)); req.put("isAppend", "true"); Thread.sleep(300); @@ -181,9 +174,7 @@ public class MasterClientTransferAction { req.put("isDone", "true"); req.put("contractID", contractID); req.remove("data"); - - handler.sendMsg(JsonUtil.toJson(req)); - + cb.onResult(JsonUtil.toJson(req)); //delete state file if (file.isFile() && file.exists()) { file.delete(); @@ -200,14 +191,15 @@ public class MasterClientTransferAction { @Action(async = true) public void onSendMemory(JsonObject jo, ResultCallback result) { //step5 close connect - LOGGER.info("onSendMemory"); - if (handler != null) { - LOGGER.info("handler close"); - action.closeMaster(); - } - if (MasterProxy.CONNECTORS.containsKey(master)) { - MasterProxy.CONNECTORS.remove(master); - } + LOGGER.info("onSendMemory done"); +// if (handler != null) { +// LOGGER.info("handler close"); +// NetworkManager.instance.closeAgent(master); +// +// } +// if (NetworkManager.CONNECTORS.containsKey(master)) { +// NetworkManager.CONNECTORS.remove(master); +// } LOGGER.info("transfer contract instance finished."); } } diff --git a/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java b/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java index b6e2556..38f6025 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java @@ -16,10 +16,8 @@ import org.bdware.sc.util.JsonUtil; import org.bdware.server.GlobalConf; import org.bdware.server.action.Action; import org.bdware.server.action.CMActions; -import org.bdware.server.tcp.TCPServerFrameHandler; -import org.bdware.server.trustedmodel.ContractUnitStatus; import org.bdware.server.executor.RequestAllExecutor; -import org.bdware.server.trustedmodel.SlaveNode; +import org.bdware.server.trustedmodel.ContractUnitStatus; import org.bdware.units.NetworkManager; import org.zz.gmhelper.SM2KeyPair; @@ -31,11 +29,9 @@ import java.util.zip.GZIPOutputStream; public class MasterServerRecoverMechAction { private static final Logger LOGGER = LogManager.getLogger(MasterServerRecoverMechAction.class); public static Map> recoverStatus = new ConcurrentHashMap<>(); - private final TCPServerFrameHandler handler; private Map stateFileMap = new HashMap<>(); - public MasterServerRecoverMechAction(TCPServerFrameHandler masterFrameHandler) { - handler = masterFrameHandler; + public MasterServerRecoverMechAction() { } // 从disk-durable恢复 @@ -54,8 +50,7 @@ public class MasterServerRecoverMechAction { int temp = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getCurSeqAtMaster(); request.put("unitLastExeSeq", temp + ""); - SlaveNode node = MasterServerTCPAction.id2Slaves.get(nodeID); - node.connection.sendMsg(JsonUtil.toJson(request)); + NetworkManager.instance.sendToAgent(nodeID, JsonUtil.toJson(request)); } // 通过从别的节点loadMemory来恢复 @@ -126,7 +121,7 @@ public class MasterServerRecoverMechAction { number = (file.length() / unit) + 1; } byte[] buff = new byte[unit]; - SlaveNode node = MasterServerTCPAction.id2Slaves.get(nodeID); + long times = file.length() / unit; times++; LOGGER.info("times=" + times); @@ -142,7 +137,7 @@ public class MasterServerRecoverMechAction { req.put("isDone", true + ""); } LOGGER.info("len=" + len + " index=" + index); - node.connection.sendMsg(JsonUtil.toJson(req)); + NetworkManager.instance.sendToAgent(nodeID, JsonUtil.toJson(req)); index++; req.put("isAppend", "true"); @@ -177,7 +172,7 @@ public class MasterServerRecoverMechAction { recoverStatus.get(masterID).put(contractID, RecoverFlag.Recovering); // 找一个普通节点来dump - SlaveNode relyNode = null; + String relyNode = null; for (String nodeID : CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers()) { if (nodeID.equals(masterID)) { @@ -186,9 +181,9 @@ public class MasterServerRecoverMechAction { if (recoverStatus.get(nodeID).get(contractID) != RecoverFlag.Fine) { continue; } - - if (MasterServerTCPAction.id2Slaves.containsKey(nodeID)) { - relyNode = MasterServerTCPAction.id2Slaves.get(nodeID); + //TODO????似乎人重连接? + if (NetworkManager.instance.hasAgentConnection(nodeID)) { + relyNode = nodeID; break; } } @@ -198,12 +193,12 @@ public class MasterServerRecoverMechAction { return; } - LOGGER.info("master向 " + relyNode.pubKey.substring(0, 5) + " 请求恢复!"); + LOGGER.info("master向 " + relyNode.substring(0, 5) + " 请求恢复!"); Map req = new HashMap<>(); req.put("action", "requestState"); req.put("contractID", contractID); - relyNode.connection.sendMsg(JsonUtil.toJson(req)); + NetworkManager.instance.sendToAgent(relyNode, JsonUtil.toJson(req)); } public static void unitModeCheck(String contractID) { @@ -217,7 +212,7 @@ public class MasterServerRecoverMechAction { synchronized (mpci) { int total = 0, fineNum = 0; for (String nodeId : mpci.getMembers()) { - if (MasterServerTCPAction.id2Slaves.containsKey(nodeId) + if (NetworkManager.instance.hasAgentConnection(nodeId) && recoverStatus.containsKey(nodeId) && recoverStatus.get(nodeId).containsKey(contractID) && recoverStatus.get(nodeId).get(contractID) == RecoverFlag.Fine) { @@ -240,50 +235,31 @@ public class MasterServerRecoverMechAction { + (fineNum > Math.ceil(total / 2)) + " mpci.unitStatus=" + mpci.unitStatus); - if (fineNum > Math.ceil((double) total / 2)) { - LOGGER.info( - "合约" + contractID + "的集群更改模式为" + ContractUnitStatus.CommonMode.toString()); + ContractUnitStatus unitStatus = ContractUnitStatus.CommonMode; - Map req = new HashMap(); - req.put("action", "changeUnitStatus"); - req.put("contractID", contractID); - req.put("mode", ContractUnitStatus.CommonMode.toString()); - for (String nodeId : mpci.getMembers()) { - if (MasterServerTCPAction.id2Slaves.containsKey(nodeId)) { - SlaveNode sNode = MasterServerTCPAction.id2Slaves.get(nodeId); - LOGGER.info( - "发消息给节点 " - + sNode.pubKey.substring(0, 5) - + " 设置合约" - + contractID - + "的集群模式为CommonMode"); - sNode.connection.sendMsg(JsonUtil.toJson(req)); - } - } - mpci.unitStatus = ContractUnitStatus.CommonMode; - - } else if (fineNum <= Math.ceil((double) total / 2)) { - LOGGER.info( - "合约" + contractID + "的集群更改模式为" + ContractUnitStatus.StableMode.toString()); - - Map req = new HashMap(); - req.put("action", "changeUnitStatus"); - req.put("contractID", contractID); - req.put("mode", ContractUnitStatus.StableMode.toString()); - for (String nodeId : mpci.getMembers()) { - if (MasterServerTCPAction.id2Slaves.containsKey(nodeId)) { - SlaveNode sNode = MasterServerTCPAction.id2Slaves.get(nodeId); - LOGGER.info( - "发消息给节点 " - + sNode.pubKey.substring(0, 5) - + " 设置合约" - + contractID - + "的集群模式为StableMode"); - sNode.connection.sendMsg(JsonUtil.toJson(req)); - } - } - mpci.unitStatus = ContractUnitStatus.StableMode; + if (fineNum <= Math.ceil((double) total / 2)) { + unitStatus = ContractUnitStatus.StableMode; } + + LOGGER.info( + "合约" + contractID + "的集群更改模式为" + unitStatus); + + Map req = new HashMap(); + req.put("action", "changeUnitStatus"); + req.put("contractID", contractID); + req.put("mode", unitStatus.toString()); + for (String nodeId : mpci.getMembers()) { + NetworkManager.instance.sendToAgentIfConnected(nodeId, JsonUtil.toJson(req)); + LOGGER.info( + "发消息给节点 " + + nodeId.substring(0, 5) + + " 设置合约" + + contractID + + "的集群模式为StableMode"); + + + } + mpci.unitStatus = unitStatus; } } @@ -470,7 +446,7 @@ public class MasterServerRecoverMechAction { Map request = new HashMap<>(); request.put("action", "sendRecoverFinish"); request.put("contractID", contractID); - handler.sendMsg(JsonUtil.toJson(request)); + cb.onResult(JsonUtil.toJson(request)); return; } } @@ -503,7 +479,7 @@ public class MasterServerRecoverMechAction { String[] str = records.split("==>>"); String last = str[1]; records = str[0]; - SlaveNode node = MasterServerTCPAction.id2Slaves.get(nodeId); + String path = contractID + "_temp_TransFile_" + new Random().nextLong(); File file = new File(GlobalConf.instance.projectDir + "/stateFiles/" + path); File parent = file.getParentFile(); @@ -559,7 +535,7 @@ public class MasterServerRecoverMechAction { req.put("last", last); } LOGGER.info("len=" + len + " index=" + index); - node.connection.sendMsg(JsonUtil.toJson(req)); + NetworkManager.instance.sendToAgent(nodeId, JsonUtil.toJson(req)); index++; req.put("isAppend", "true"); @@ -650,8 +626,7 @@ public class MasterServerRecoverMechAction { req.put("isMaster", "true"); req.put("filePath", dir.getAbsolutePath()); SM2KeyPair keyPair = GlobalConf.instance.keyPair; - SlaveNode selfNode = MasterServerTCPAction.id2Slaves.get(keyPair.getPublicKeyStr()); - selfNode.connection.sendMsg(JsonUtil.toJson(req)); + NetworkManager.instance.sendToAgent(keyPair.getPublicKeyStr(), JsonUtil.toJson(req)); } } catch (IOException e) { e.printStackTrace(); diff --git a/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java b/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java index 9107850..890c1ea 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java @@ -4,7 +4,6 @@ import com.google.gson.JsonObject; import com.google.gson.JsonPrimitive; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.bdware.heartbeat.HeartBeatUtil; import org.bdware.sc.ContractClient; import org.bdware.sc.ContractManager; import org.bdware.sc.ContractMeta; @@ -13,7 +12,6 @@ import org.bdware.sc.bean.ContractExecType; import org.bdware.sc.bean.ContractRequest; import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.units.MultiContractMeta; -import org.bdware.sc.units.RecoverFlag; import org.bdware.sc.units.RequestCache; import org.bdware.sc.util.JsonUtil; import org.bdware.server.CongestionControl; @@ -21,13 +19,14 @@ import org.bdware.server.action.Action; import org.bdware.server.action.CMActions; import org.bdware.server.action.SyncResult; import org.bdware.server.executor.RequestAllExecutor; -import org.bdware.server.tcp.TCPServerFrameHandler; import org.bdware.server.trustedmodel.KillUnitContractResultCollector; import org.bdware.server.trustedmodel.ResultCollector; -import org.bdware.server.trustedmodel.SlaveNode; +import org.bdware.units.NetworkManager; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -38,7 +37,6 @@ public class MasterServerTCPAction { public static final org.bdware.server.action.SyncResult sync = new SyncResult(); private static final Logger LOGGER = LogManager.getLogger(MasterServerTCPAction.class); // key is nodes' pubKey - public static Map id2Slaves = new ConcurrentHashMap<>(); public static Map requestCache = new ConcurrentHashMap<>(); static { @@ -61,30 +59,10 @@ public class MasterServerTCPAction { TimeUnit.SECONDS); } - public final TCPServerFrameHandler handler; - TimerTask checkAliveTask; - private long lastSlavePingTime; - - public MasterServerTCPAction(TCPServerFrameHandler masterFrameHandler) { - handler = masterFrameHandler; - // 心跳 - // int delay = 200; - // int delay = 50; - lastSlavePingTime = - System.currentTimeMillis() - + 5000L - + MasterClientTCPAction.sendDelay - + MasterClientTCPAction.checkDelay; - checkAliveTask = new HeartBeatTask(MasterClientTCPAction.sendDelay); - HeartBeatUtil.getInstance() - .schedule( - checkAliveTask, - MasterClientTCPAction.sendDelay, - MasterClientTCPAction.checkDelay); - + public MasterServerTCPAction() { } - private static void notifyNodeOffline(String contractID, String nodeID) { + public static void notifyNodeOffline(String contractID, String nodeID) { synchronized (sync) { for (String requestID : sync.waitObj.keySet()) { ResultCallback cb = sync.waitObj.get(requestID); @@ -93,21 +71,23 @@ public class MasterServerTCPAction { if (rc.getCommitter() instanceof RequestAllExecutor.ResultMerger) { RequestAllExecutor.ResultMerger merger = (RequestAllExecutor.ResultMerger) rc.getCommitter(); - LOGGER.info("node " + nodeID + " offline! in the cluster of contract " + contractID); - LOGGER.debug("nodeID=" + nodeID + " contractID=" + contractID + - ": " + merger.getInfo()); - ContractResult cr = - new ContractResult( - ContractResult.Status.Exception, - new JsonPrimitive("node offline")); - JsonObject jo = new JsonObject(); - jo.addProperty("data", JsonUtil.toJson(cr)); - jo.addProperty("responseID", requestID); - jo.addProperty("action", "receiveTrustfullyResult"); - jo.addProperty("nodeID", nodeID); - LOGGER.debug("[cb] nodeID=" + nodeID + " contractID=" + contractID + - ": " + jo); - sync.wakeUp(requestID, jo.toString()); + if (merger.getContractID().equals(contractID)) { + LOGGER.info("node " + nodeID + " offline! in the cluster of contract " + contractID); + LOGGER.debug("nodeID=" + nodeID + " contractID=" + contractID + + ": " + merger.getInfo()); + ContractResult cr = + new ContractResult( + ContractResult.Status.Exception, + new JsonPrimitive("node offline")); + JsonObject jo = new JsonObject(); + jo.addProperty("data", JsonUtil.toJson(cr)); + jo.addProperty("responseID", requestID); + jo.addProperty("action", "receiveTrustfullyResult"); + jo.addProperty("nodeID", nodeID); + LOGGER.debug("[cb] nodeID=" + nodeID + " contractID=" + contractID + + ": " + jo); + sync.wakeUp(requestID, jo.toString()); + } } } } @@ -147,32 +127,6 @@ public class MasterServerTCPAction { return false; } - public static void killContract( - ContractClient client, JsonObject request, ResultCallback resultCallback) { - LOGGER.info("[MasterServerTCPAction] killContract : "); - MultiContractMeta contractMeta = - CMActions.manager.multiContractRecorder.getMultiContractMeta( - client.getContractID()); - if (null != contractMeta && contractMeta.isMaster()) { // 如果是master - ResultCollector collector = - new ResultCollector( - request.get("requestID").getAsString(), - resultCallback, - contractMeta.getMembers().length); - sync.sleep(request.get("requestID").getAsString(), collector); - for (String member : contractMeta.getMembers()) { - SlaveNode node = id2Slaves.get(member); - node.connection.sendMsg(request.toString()); - } - } else { - // TODO 如果不是master 应该返回出错,这里暂时只kill本地!用于调试 - // logger.info( - // CMActions.manager.stopContractWithOwner( - // request.get("verifiedPubKey").getAsString(), - // request.get("id").getAsString())); - } - } - private static MultiContractMeta getMPCInfo(String contractID) { ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID); MultiContractMeta ret = @@ -189,28 +143,18 @@ public class MasterServerTCPAction { return ret; } - @Action(async = true) - public void masterPing(JsonObject args, ResultCallback cb) { - lastSlavePingTime = System.currentTimeMillis(); - Map request = new HashMap<>(); - request.put("action", "masterPong"); - sendMsg(JsonUtil.toJson(request)); - } - @Action(async = true) public void sendCachedRequests(JsonObject args, ResultCallback cb) { String contractID = args.get("contractID").getAsString(); int start = args.get("start").getAsInt(); int end = args.get("end").getAsInt(); String nodeID = args.get("nodeID").getAsString(); - SlaveNode node = id2Slaves.get(nodeID); - RequestCache cache = getCache(contractID); for (int i = start + 1; i < end; i++) { if (cache != null && cache.containsKey(i)) { String jo = cache.get(i); LOGGER.info("Master发送缓存的请求\n" + jo); - node.connection.sendMsg(jo); + NetworkManager.instance.sendToAgent(nodeID, jo); } } } @@ -239,8 +183,7 @@ public class MasterServerTCPAction { jo.get("requestID").getAsString(), cb, contractMeta.getMembers().length); sync.sleep(jo.get("requestID").getAsString(), collector); for (String member : contractMeta.getMembers()) { - SlaveNode node = id2Slaves.get(member); - node.connection.sendMsg(jo.toString()); + NetworkManager.instance.sendToAgent(member, jo.toString()); } } @@ -269,33 +212,15 @@ public class MasterServerTCPAction { // cb.onResult(JsonUtil.toJson(ret)); // } - @Action(async = true) - public void setNodeInfo(JsonObject jo, ResultCallback cb) { - LOGGER.info("[MasterServerTCPAction] setNodeInfo : "); - - String nodeID = jo.get("pubKey").getAsString(); - handler.pubKey = nodeID; - SlaveNode node = new SlaveNode(nodeID, this); - id2Slaves.put(nodeID, node); - LOGGER.info("[MasterServerTCPAction] setNodeInfo id2Slaves put " + nodeID.substring(0, 5)); - - // 通知node可以检测master - Map request = new HashMap<>(); - request.put("action", "checkMasterAlive"); - sendMsg(JsonUtil.toJson(request)); - } @Action(async = true) public void onStartContractTrustfully(JsonObject jo, ResultCallback cb) { sync.wakeUp(jo.get("requestID").getAsString(), jo.toString()); } - public void sendMsg(String req) { - handler.sendMsg(req); - } @Action(async = true) - public void requestContractExecution(JsonObject jo, ResultCallback cb) { + public void requestContractExecutionServer(JsonObject jo, ResultCallback cb) { try { int maxMasterServerLoad = CongestionControl.masterServerLoad.incrementAndGet(); if (maxMasterServerLoad > CongestionControl.maxMasterServerLoad) @@ -340,7 +265,7 @@ public class MasterServerTCPAction { @Override public void onResult(String str) { Map result = new HashMap<>(); - result.put("action", "receiveContractExecution"); + result.put("action", "receiveContractExecutionServer"); result.put("responseID", requestID); result.put("data", str); cb.onResult(JsonUtil.toJson(result)); @@ -362,12 +287,10 @@ public class MasterServerTCPAction { @Override public void onResult(String str) { Map result = new HashMap<>(); - result.put("action", "receiveContractExecution"); + result.put("action", "receiveContractExecutionServer"); result.put("responseID", cr.get("requestID").getAsString()); result.put("data", str); cb.onResult(JsonUtil.toJson(result)); - LOGGER.debug("Return requestContractExecution:"+JsonUtil.toJson(result)); - CongestionControl.masterServerLoad.decrementAndGet(); } }, null); @@ -386,73 +309,5 @@ public class MasterServerTCPAction { } } - private class HeartBeatTask extends TimerTask { - int delay; - HeartBeatTask(int delay) { - this.delay = delay; - } - - @Override - public void run() { - try { - long cur = System.currentTimeMillis(); - // LOGGER.info( - // "Master 心跳检测 " + handler.pubKey.substring(0,5) + " " - // + new - // SimpleDateFormat("yyyy-MM-dd.HH:mm:ss").format(cur) - // + " " - // + new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss") - // .format(lastSlavePingTime)); - if (cur - lastSlavePingTime >= (2L * delay)) { - LOGGER.info( - new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss").format(lastSlavePingTime) - + " " - + delay); - Set contracts = new HashSet<>(); - String nodeID = handler.pubKey; - if (nodeID == null) { - LOGGER.info("nodeID == null " + this); - HeartBeatUtil.getInstance().cancel(this); - return; - } - - SlaveNode info = id2Slaves.get(nodeID); - LOGGER.info( - "Master心跳机制发现节点 " - + nodeID.substring(0, 5) - + " 下线! " - + this.toString()); - HeartBeatUtil.getInstance().cancel(this); - - id2Slaves.remove(nodeID); - try { - info.connection.handler.ctx.close(); - } catch (Exception e) { - e.printStackTrace(); - } - - for (String contractID : - MasterServerRecoverMechAction.recoverStatus.get(nodeID).keySet()) { - // RecoverFlag flag = - // - // MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID); - // if (flag == RecoverFlag.Fine) - MasterServerRecoverMechAction.recoverStatus - .get(nodeID) - .put(contractID, RecoverFlag.ToRecover); - contracts.add(contractID); - notifyNodeOffline(contractID, nodeID); - } - - // 某个节点下线,需要检测所处集群是否需要状态切换 - for (String contractID : contracts) { - MasterServerRecoverMechAction.unitModeCheck(contractID); - } - } // if - } catch (Exception e) { - e.printStackTrace(); - } - } // run - } } diff --git a/src/main/java/org/bdware/server/action/p2p/MasterServerTransferAction.java b/src/main/java/org/bdware/server/action/p2p/MasterServerTransferAction.java index da9ea0c..5562f26 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterServerTransferAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterServerTransferAction.java @@ -9,11 +9,9 @@ import org.bdware.sc.bean.Contract; import org.bdware.sc.bean.ContractExecType; import org.bdware.sc.conn.ByteUtil; import org.bdware.sc.conn.ResultCallback; -import org.bdware.sc.util.JsonUtil; import org.bdware.server.GlobalConf; import org.bdware.server.action.Action; import org.bdware.server.action.CMActions; -import org.bdware.server.tcp.TCPServerFrameHandler; import java.io.*; import java.util.HashMap; @@ -22,11 +20,9 @@ import java.util.zip.GZIPInputStream; public class MasterServerTransferAction { private static final Logger LOGGER = LogManager.getLogger(MasterServerTransferAction.class); - private final TCPServerFrameHandler handler; private Map fileMap = new HashMap<>(); - public MasterServerTransferAction(TCPServerFrameHandler h) { - handler = h; + public MasterServerTransferAction() { } @Action(async = true) @@ -73,7 +69,7 @@ public class MasterServerTransferAction { Map request = new HashMap<>(); request.put("action", "onSendAndStart"); request.put("contractID", contractID); - handler.sendMsg(JsonUtil.toJson(request)); + cb.onResult(request); } else { String data = args.get("data").getAsString(); try { @@ -112,7 +108,7 @@ public class MasterServerTransferAction { Map request = new HashMap<>(); request.put("action", "onSendAndStart"); request.put("contractID", contractID); - handler.sendMsg(JsonUtil.toJson(request)); + cb.onResult(request); } public String startByScript(String script, String contractID) { @@ -163,7 +159,7 @@ public class MasterServerTransferAction { load(contractID, dir.getAbsolutePath()); Map request = new HashMap<>(); request.put("action", "onSendMemory"); - handler.sendMsg(JsonUtil.toJson(request)); + rc.onResult(request); } else { String data = args.get("data").getAsString(); try { diff --git a/src/main/java/org/bdware/server/action/p2p/RecoveryAction.java b/src/main/java/org/bdware/server/action/p2p/RecoveryAction.java deleted file mode 100644 index b616b0b..0000000 --- a/src/main/java/org/bdware/server/action/p2p/RecoveryAction.java +++ /dev/null @@ -1,8 +0,0 @@ -package org.bdware.server.action.p2p; - -/** - * 原MasterServerTCPAction,MasterServerTransferAction,MasterServerRecoverMechAction, - * MasterClientTCPAction,MasterClientTransferAction,MasterClientRecoverMechAction合并 - */ -public class RecoveryAction { -} diff --git a/src/main/java/org/bdware/server/action/p2p/UnitsInfoAction.java b/src/main/java/org/bdware/server/action/p2p/UnitsInfoAction.java index 3abea09..4aeb182 100644 --- a/src/main/java/org/bdware/server/action/p2p/UnitsInfoAction.java +++ b/src/main/java/org/bdware/server/action/p2p/UnitsInfoAction.java @@ -29,7 +29,7 @@ public class UnitsInfoAction { JsonObject jsonObject = new JsonObject(); jsonObject.addProperty("action", "getContractUnit"); jsonObject.addProperty("contractId", contractId); - ContractUnitManager.instance.send(jsonObject.toString(), new String[]{NetworkManager.instance.getTcpNodeCenter()}, callback); + ContractUnitManager.instance.send(jsonObject.toString(), new String[]{"NetworkManager.instance.getTcpNodeCenter()"}, callback); } } @@ -43,7 +43,7 @@ public class UnitsInfoAction { JsonObject jsonObject = new JsonObject(); jsonObject.addProperty("action", "getContractUnitMaster"); jsonObject.addProperty("content", contractId); - ContractUnitManager.instance.send(jsonObject.toString(), new String[]{NetworkManager.instance.getTcpNodeCenter()}, callback); + ContractUnitManager.instance.send(jsonObject.toString(), new String[]{"NetworkManager.instance.getTcpNodeCenter()"}, callback); } } diff --git a/src/main/java/org/bdware/server/action/p2p/ExecutionAction.java b/src/main/java/org/bdware/server/action/p2p/_UNUSED_ExecutionAction.java similarity index 98% rename from src/main/java/org/bdware/server/action/p2p/ExecutionAction.java rename to src/main/java/org/bdware/server/action/p2p/_UNUSED_ExecutionAction.java index be4bcd3..7a12a7f 100644 --- a/src/main/java/org/bdware/server/action/p2p/ExecutionAction.java +++ b/src/main/java/org/bdware/server/action/p2p/_UNUSED_ExecutionAction.java @@ -31,10 +31,10 @@ import org.bdware.units.msghandler.ResponseCenter; import java.io.File; -public class ExecutionAction implements OnHashCallback { - private static final Logger LOGGER = LogManager.getLogger(ExecutionAction.class); +public class _UNUSED_ExecutionAction implements OnHashCallback { + private static final Logger LOGGER = LogManager.getLogger(_UNUSED_ExecutionAction.class); - public ExecutionAction() { + public _UNUSED_ExecutionAction() { } // TODO TOMerge diff --git a/src/main/java/org/bdware/server/executor/RequestAllExecutor.java b/src/main/java/org/bdware/server/executor/RequestAllExecutor.java index 22faf72..ca6bd88 100644 --- a/src/main/java/org/bdware/server/executor/RequestAllExecutor.java +++ b/src/main/java/org/bdware/server/executor/RequestAllExecutor.java @@ -22,7 +22,7 @@ import org.bdware.server.action.p2p.MasterServerTCPAction; import org.bdware.server.trustedmodel.ContractExecutor; import org.bdware.server.trustedmodel.MultiReqSeq; import org.bdware.server.trustedmodel.ResultCollector; -import org.bdware.server.trustedmodel.SlaveNode; +import org.bdware.units.NetworkManager; import java.util.HashMap; import java.util.HashSet; @@ -87,9 +87,8 @@ public class RequestAllExecutor implements ContractExecutor { LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID()); LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes)); for (String node : nodes) { - SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(node); LOGGER.debug("get cmNode " + node.substring(0, 5)); - if (null == cmNode) { + if (!NetworkManager.instance.hasAgentConnection(node)) { LOGGER.warn("cmNode " + node.substring(0, 5) + " is null"); collector.onResult( "{\"status\":\"Error\",\"result\":\"node offline\"," @@ -105,10 +104,10 @@ public class RequestAllExecutor implements ContractExecutor { + node + "\"," + "\"action\":\"onExecuteContractTrustfully\"}"); - cmNode.connection.sendMsg(sendStr); + NetworkManager.instance.sendToAgent(node, sendStr); } else { LOGGER.info("send request to cmNode " + node.substring(0, 5)); - cmNode.connection.sendMsg(sendStr); + NetworkManager.instance.sendToAgent(node, sendStr); } } } @@ -116,19 +115,19 @@ public class RequestAllExecutor implements ContractExecutor { public boolean checkCurNodeNumValid() { String[] nodes = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers(); - int validNode = 0; + Map mapResult = new HashMap<>(); for (String node : nodes) { - SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(node); - if (null != cmNode + mapResult.put(node.substring(0, 5), String.format("%s %s", NetworkManager.instance.hasAgentConnection(node) + "", + MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID))); + if (NetworkManager.instance.hasAgentConnection(node) && MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID) == RecoverFlag.Fine) { validNode++; } } - + LOGGER.info(JsonUtil.toPrettyJson(mapResult)); int c = resultCount; - // TODO if (ContractExecType.RequestAllResponseAll.equals(type)) { c = (int) Math.ceil((double) c / 2); } @@ -236,6 +235,10 @@ public class RequestAllExecutor implements ContractExecutor { order = new AtomicInteger(0); } + public String getContractID() { + return contractID; + } + public String getInfo() { return "contractID=" + contractID diff --git a/src/main/java/org/bdware/server/executor/unconsistency/MultiPointCooperationExecutor.java b/src/main/java/org/bdware/server/executor/unconsistency/MultiPointCooperationExecutor.java index fe08596..dd8ac64 100644 --- a/src/main/java/org/bdware/server/executor/unconsistency/MultiPointCooperationExecutor.java +++ b/src/main/java/org/bdware/server/executor/unconsistency/MultiPointCooperationExecutor.java @@ -25,7 +25,7 @@ import org.bdware.server.action.p2p.MasterServerTCPAction; import org.bdware.server.trustedmodel.ContractExecutor; import org.bdware.server.trustedmodel.MultiReqSeq; import org.bdware.server.trustedmodel.ResultCollector; -import org.bdware.server.trustedmodel.SlaveNode; +import org.bdware.units.NetworkManager; import java.math.BigInteger; import java.util.HashMap; @@ -95,11 +95,8 @@ public class MultiPointCooperationExecutor implements ContractExecutor { LOGGER.debug("nodes:" + JsonUtil.toJson(nodes)); for (String node : nodes) { LOGGER.info(node); - // String node = nodes.get(n);//根据下标随机获得一个 - SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(node); LOGGER.info("[sendRequests] get cmNode " + node.substring(0, 5)); - // 向slave发送数据 获得结果调用onResult函数 - if (cmNode == null) { + if (!NetworkManager.instance.hasAgentConnection(node)) { LOGGER.info("[sendRequests] get cmNode " + node.substring(0, 5) + " is null"); collector.onResult( "{\"status\":\"Error\",\"result\":\"node offline\"," @@ -115,7 +112,7 @@ public class MultiPointCooperationExecutor implements ContractExecutor { + node + "\"," + "\"action\":\"onExecuteContractTrustfully\"}"); - cmNode.connection.sendMsg(sendStr); + NetworkManager.instance.sendToAgent(node, sendStr); } else { LOGGER.info( "[sendRequests] get cmNode " @@ -123,7 +120,7 @@ public class MultiPointCooperationExecutor implements ContractExecutor { + " not null " + "RequestAllExecutor 发送请求给 " + node.substring(0, 5)); - cmNode.connection.sendMsg(sendStr); + NetworkManager.instance.sendToAgent(node, sendStr); } } } @@ -184,16 +181,13 @@ public class MultiPointCooperationExecutor implements ContractExecutor { // List nodes = info.members; int validNode = 0; for (String node : nodes) { - SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(node); - if (cmNode != null + if (NetworkManager.instance.hasAgentConnection(node) && MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID) == RecoverFlag.Fine) { validNode++; } } - int c = resultCount; - // TODO if (type == ContractExecType.Sharding) c = (int) Math.ceil((double) c / 2); LOGGER.info("c=" + c + " validNode=" + validNode); return validNode >= c; diff --git a/src/main/java/org/bdware/server/executor/unconsistency/RequestOnceExecutor.java b/src/main/java/org/bdware/server/executor/unconsistency/RequestOnceExecutor.java index 9bcc541..f8a7f95 100644 --- a/src/main/java/org/bdware/server/executor/unconsistency/RequestOnceExecutor.java +++ b/src/main/java/org/bdware/server/executor/unconsistency/RequestOnceExecutor.java @@ -8,10 +8,12 @@ import org.bdware.sc.bean.ContractRequest; import org.bdware.sc.conn.OnHashCallback; import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.util.JsonUtil; +import org.bdware.server.ControllerManager; import org.bdware.server.action.CMActions; import org.bdware.server.action.p2p.MasterServerTCPAction; import org.bdware.server.trustedmodel.ContractExecutor; -import org.bdware.server.trustedmodel.SlaveNode; +import org.bdware.server.trustedmodel.AgentManager; +import org.bdware.units.NetworkManager; import java.util.HashMap; import java.util.Map; @@ -25,6 +27,7 @@ public class RequestOnceExecutor implements ContractExecutor { public RequestOnceExecutor(String contractID) { this.contractID = contractID; } + @Override public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { ResultCallback cb = @@ -46,17 +49,17 @@ public class RequestOnceExecutor implements ContractExecutor { MasterServerTCPAction.sync.sleep(requestID, cb); String[] members = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers(); for (int i = 0; i < members.length; i++) { + LOGGER.info("[members]:" + members.length); int size = members.length; String nodeID = members[order.incrementAndGet() % size]; - SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(nodeID); - if (cmNode != null) { - Map obj = new HashMap<>(); - obj.put("action", "executeContractLocally"); - obj.put("data", req); - obj.put("uniReqID", requestID); - cmNode.connection.sendMsg(JsonUtil.toJson(obj)); - return; - } + //ADD Connect + Map obj = new HashMap<>(); + obj.put("action", "executeContractLocally"); + obj.put("requestID",requestID); + obj.put("data", req); + obj.put("uniReqID", requestID); + NetworkManager.instance.sendToAgent(nodeID,JsonUtil.toJson(obj)); + return; } rc.onResult( "{\"status\":\"Error\",\"result\":\"all nodes " diff --git a/src/main/java/org/bdware/server/executor/unconsistency/ResponseOnceExecutor.java b/src/main/java/org/bdware/server/executor/unconsistency/ResponseOnceExecutor.java index ea16df3..485e708 100644 --- a/src/main/java/org/bdware/server/executor/unconsistency/ResponseOnceExecutor.java +++ b/src/main/java/org/bdware/server/executor/unconsistency/ResponseOnceExecutor.java @@ -12,7 +12,7 @@ import org.bdware.sc.util.JsonUtil; import org.bdware.server.action.CMActions; import org.bdware.server.action.p2p.MasterServerTCPAction; import org.bdware.server.trustedmodel.ContractExecutor; -import org.bdware.server.trustedmodel.SlaveNode; +import org.bdware.units.NetworkManager; import java.util.HashMap; import java.util.Map; @@ -71,15 +71,12 @@ public class ResponseOnceExecutor implements ContractExecutor { for (int i = 0; i < members.length; i++) { int size = members.length; String nodeID = members[order.incrementAndGet() % size]; - SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(nodeID); - if (cmNode != null) { - Map obj = new HashMap<>(); - obj.put("action", "executeContractLocally"); - obj.put("data", req); - obj.put("uniReqID", requestID); - cmNode.connection.sendMsg(JsonUtil.toJson(obj)); - return true; - } + Map obj = new HashMap<>(); + obj.put("action", "executeContractLocally"); + obj.put("data", req); + obj.put("uniReqID", requestID); + NetworkManager.instance.sendToAgent(nodeID, JsonUtil.toJson(obj)); + return true; } return false; } diff --git a/src/main/java/org/bdware/server/executor/unconsistency/_UNUSED_RouteEnabledExecutor.java b/src/main/java/org/bdware/server/executor/unconsistency/_UNUSED_RouteEnabledExecutor.java index 640c0d2..e962aa6 100644 --- a/src/main/java/org/bdware/server/executor/unconsistency/_UNUSED_RouteEnabledExecutor.java +++ b/src/main/java/org/bdware/server/executor/unconsistency/_UNUSED_RouteEnabledExecutor.java @@ -16,7 +16,7 @@ import org.bdware.sc.util.JsonUtil; import org.bdware.server.action.CMActions; import org.bdware.server.action.p2p.MasterServerTCPAction; import org.bdware.server.trustedmodel.ContractExecutor; -import org.bdware.server.trustedmodel.SlaveNode; +import org.bdware.units.NetworkManager; import java.math.BigInteger; import java.util.HashMap; @@ -95,15 +95,12 @@ public class _UNUSED_RouteEnabledExecutor implements ContractExecutor { int size = members.length; if (hash != -1) nodeID = members[hash % size]; else nodeID = members[order.incrementAndGet() % size]; - SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(nodeID); - if (cmNode != null) { - Map obj = new HashMap<>(); - obj.put("action", "executeContractLocally"); - obj.put("data", req); - obj.put("uniReqID", requestID); - cmNode.connection.sendMsg(JsonUtil.toJson(obj)); - return true; - } + Map obj = new HashMap<>(); + obj.put("action", "executeContractLocally"); + obj.put("data", req); + obj.put("uniReqID", requestID); + NetworkManager.instance.sendToAgent(nodeID, JsonUtil.toJson(obj)); + return true; } return false; } diff --git a/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientController.java b/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientController.java index 23a2093..6c134ce 100644 --- a/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientController.java +++ b/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientController.java @@ -2,12 +2,6 @@ package org.bdware.server.nodecenter.client; import com.google.gson.JsonNull; import com.google.gson.JsonObject; -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bdware.sc.*; @@ -22,7 +16,6 @@ import org.bdware.sc.event.REvent; import org.bdware.sc.units.MultiContractMeta; import org.bdware.sc.util.HashUtil; import org.bdware.sc.util.JsonUtil; -import org.bdware.server.CMHttpServer; import org.bdware.server.GlobalConf; import org.bdware.server.action.Action; import org.bdware.server.action.CMActions; @@ -33,8 +26,8 @@ import org.bdware.server.action.p2p.MasterClientTCPAction; import org.bdware.server.action.p2p.MasterServerRecoverMechAction; import org.bdware.server.action.p2p.MasterServerTCPAction; import org.bdware.server.tcp.TCPClientFrameHandler; -import org.bdware.server.trustedmodel.MasterProxy; -import org.bdware.server.ws.DelimiterCodec; +import org.bdware.server.trustedmodel.AgentManager; +import org.bdware.units.NetworkManager; import org.bouncycastle.pqc.math.linearalgebra.ByteUtils; import org.zz.gmhelper.SM2KeyPair; import org.zz.gmhelper.SM2Util; @@ -51,6 +44,8 @@ public class NodeCenterClientController implements NodeCenterConn { public static SyncResult sync = new SyncResult(); private static boolean startCheck = false; private final Map fileMap; + public Map distributeReqMap = new ConcurrentHashMap<>(); + private final NetNeighbors neighbors; // public NodeCenterClientController cmClientController; String nodeID; @@ -93,28 +88,33 @@ public class NodeCenterClientController implements NodeCenterConn { return cr.result.getAsJsonObject(); } + public void distributeContract(String reqID, ResultCallback resultCallback, JsonObject jo) { + distributeReqMap.put(reqID, resultCallback); + sendMsg(JsonUtil.toJson(jo)); + } + // ==========Handler @Action(async = true) public void requestConnectToMaster(JsonObject jo, ResultCallback cb) { JsonObject keys = jo.get("key2address").getAsJsonObject(); for (String str : keys.keySet()) { - MasterProxy.slaverRouter.put(str, keys.get(str).getAsString()); + NetworkManager.instance.updateAgentRouter(str, keys.get(str).getAsString()); } String master = jo.get("master").getAsString(); LOGGER.info(" request connect to " + master.substring(0, 5)); if (jo.has("connectAll") && jo.get("connectAll").getAsBoolean()) { for (String str : keys.keySet()) { if (jo.has("contractID")) { - connectToMaster(str, jo.get("contractID").getAsString()); + NetworkManager.instance.connectToAgent(str, jo.get("contractID").getAsString()); } else { - connectToMaster(str, null); + NetworkManager.instance.connectToAgent(str, null); } } } else { if (jo.has("contractID")) { - connectToMaster(master, jo.get("contractID").getAsString()); + NetworkManager.instance.connectToAgent(master, jo.get("contractID").getAsString()); } else { - connectToMaster(master, null); + NetworkManager.instance.connectToAgent(master, null); } } } @@ -352,6 +352,12 @@ public class NodeCenterClientController implements NodeCenterConn { sync.wakeUp(jo.get("responseID").getAsString(), jo.toString()); } + @Action(async = true) + public void onQueryNodeAddress(JsonObject jo, ResultCallback cb) { + LOGGER.info("onQueryNodeAddress:" + jo.toString()); + sync.wakeUp(jo.get("responseID").getAsString(), jo.toString()); + } + @Override public String routeContract(String contractID) { LOGGER.info("[CMClientController] routeContract : " + contractID); @@ -364,6 +370,30 @@ public class NodeCenterClientController implements NodeCenterConn { } } + public String connectToNode(String pubKey) { + Map req = new HashMap<>(); + req.put("action", "queryNodeAddress"); + req.put("pubKey", pubKey); + String requestID = System.currentTimeMillis() + "_" + (int) (Math.random() * 100000); + req.put("requestID", requestID); + sendMsg(JsonUtil.toJson(req)); + //TODO use async instead? + ContractResult cr = sync.syncSleep(requestID); + LOGGER.info("connectToNode result:" + JsonUtil.toJson(cr)); + if (!cr.result.equals(JsonNull.INSTANCE)) { + try { + JsonObject jo = cr.result.getAsJsonObject(); + NetworkManager.instance.updateAgentRouter( + jo.get("pubKey").getAsString(), jo.get("masterAddress").getAsString()); + NetworkManager.instance.connectToAgent(jo.get("pubKey").getAsString(), null); + return "success"; + } catch (Exception e) { + e.printStackTrace(); + } + } + return "failed"; + } + @Override public String reRouteContract(String contractID) { try { @@ -378,9 +408,9 @@ public class NodeCenterClientController implements NodeCenterConn { if (!cr.result.equals(JsonNull.INSTANCE)) { try { JsonObject jo = cr.result.getAsJsonObject(); - MasterProxy.slaverRouter.put( + NetworkManager.instance.updateAgentRouter( jo.get("pubKey").getAsString(), jo.get("masterAddress").getAsString()); - connectToMaster(jo.get("pubKey").getAsString(), null); + NetworkManager.instance.connectToAgent(jo.get("pubKey").getAsString(), null); LOGGER.info( "[CMClientController] " + contractID @@ -447,49 +477,14 @@ public class NodeCenterClientController implements NodeCenterConn { } public void queryUnitContractsID2(String contractID, String master) { - connectToMaster(master, null); // TODO 等待确保连接成功? - TCPClientFrameHandler clientHandler = MasterProxy.getHandler(master); + NetworkManager.instance.connectToAgent(master, null); // TODO 等待确保连接成功? + TCPClientFrameHandler clientHandler = AgentManager.getHandler(master); clientHandler.waitForSetNodeID(); RecoverMechTimeRecorder.connectMasterFinish = System.currentTimeMillis(); MasterClientRecoverMechAction.askForRecover(contractID, master, nodeID); } // TODO add syncPing - public void connectToMaster(String master, String contractID) { - LOGGER.info("[CMClientController] connectToMaster master= " + master); - - // logger.debug("ConnectToMaster:" + master + "\nMasterRoute:" + slaverRouter.get(master)); - Bootstrap b; - MasterProxy.MasterConnector connector = null; - synchronized (MasterProxy.CONNECTORS) { - if (!MasterProxy.CONNECTORS.containsKey(master)) { - connector = new MasterProxy.MasterConnector(); - MasterProxy.CONNECTORS.put(master, connector); - } - } - if (connector != null) { - b = new Bootstrap(); - TCPClientFrameHandler handler = new TCPClientFrameHandler(master); - connector.bootstrap = b; - connector.handler = handler; - b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000); - b.group(CMHttpServer.workerGroup); - if (contractID != null) { - handler.updateContractID2Client(contractID); - } - b.channel(NioSocketChannel.class) - .handler( - new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel ch) { - ChannelPipeline p = ch.pipeline(); - p.addLast(new DelimiterCodec()).addLast(handler); - } - }); - } - MasterProxy.reconnect(master); - } - public void getNodeInfos(ResultCallback resultCallback) { Map request = new HashMap<>(); request.put("action", "listCMInfo"); @@ -715,7 +710,7 @@ public class NodeCenterClientController implements NodeCenterConn { for (int i = 0; i < 20; ++i) { boolean all = true; for (String str : nodeNames) { - if (!MasterServerTCPAction.id2Slaves.containsKey(str)) { + if (NetworkManager.instance.hasAgentConnection(str)) { all = false; break; } @@ -742,12 +737,12 @@ public class NodeCenterClientController implements NodeCenterConn { public void onDistribute(JsonObject json, ResultCallback rc) { if (json.has("over")) { String distributeID = json.get("distributeID").getAsString(); - ResultCallback to = handler.distributeReqMap.get(distributeID); - handler.distributeReqMap.remove(distributeID); + ResultCallback to = distributeReqMap.get(distributeID); + distributeReqMap.remove(distributeID); to.onResult(json.get("content").getAsString()); } else { String distributeID = json.get("distributeID").getAsString(); - handler.distributeReqMap.get(distributeID).onResult(json.get("content").getAsString()); + distributeReqMap.get(distributeID).onResult(json.get("content").getAsString()); } } @@ -758,7 +753,7 @@ public class NodeCenterClientController implements NodeCenterConn { String pubKey = jo.get("nodeID").getAsString(); String contractID = jo.get("contractID").getAsString(); String address = jo.get("address").getAsString(); - MasterProxy.slaverRouter.put(pubKey, address); + NetworkManager.instance.updateAgentRouter(pubKey, address); ContractClient cc = CMActions.manager.getClient(contractID); if (cc == null) { @@ -769,7 +764,7 @@ public class NodeCenterClientController implements NodeCenterConn { if (null != pubKey) { // step1 connect to node LOGGER.info("pubKey=" + pubKey); - connectToMaster(pubKey, null); + NetworkManager.instance.connectToAgent(pubKey, null); CMActions.manager.masterStub.transferToOtherNode(pubKey, contractID); } else { LOGGER.info("pubKey is null."); diff --git a/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientHandler.java b/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientHandler.java index 837de19..14d7921 100644 --- a/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientHandler.java +++ b/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientHandler.java @@ -29,8 +29,7 @@ public class NodeCenterClientHandler extends SimpleChannelInboundHandler private static final Logger LOGGER = LogManager.getLogger(NodeCenterClientHandler.class); public static String[] clientToClusterPlugins; public boolean hasPermission; - public NodeCenterClientController controller; - public Map distributeReqMap = new ConcurrentHashMap<>(); + private NodeCenterClientController controller; Channel channel; // UDPTrustfulExecutor udpExecutor; // RecoverMechExecutor recoverMechExecutor; @@ -144,4 +143,8 @@ public class NodeCenterClientHandler extends SimpleChannelInboundHandler channel = null; } } + + public NodeCenterClientController getController() { + return controller; + } } diff --git a/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java b/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java index fbc0205..a05bb13 100644 --- a/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java +++ b/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java @@ -14,10 +14,8 @@ import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.util.JsonUtil; import org.bdware.server.CongestionControl; import org.bdware.server.action.ActionExecutor; -import org.bdware.server.action.p2p.MasterClientRecoverMechAction; -import org.bdware.server.action.p2p.MasterClientTCPAction; -import org.bdware.server.action.p2p.MasterClientTransferAction; -import org.bdware.server.trustedmodel.MasterProxy; +import org.bdware.server.action.p2p.*; +import org.bdware.units.NetworkManager; import java.io.ByteArrayOutputStream; import java.io.InputStreamReader; @@ -32,23 +30,19 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler { private static final Logger LOGGER = LogManager.getLogger(TCPClientFrameHandler.class); private static final ExecutorService executorService = Executors.newFixedThreadPool(10); public static String[] clientToAgentPlugins; - public String pubKey; + private final AliveCheckClientAction aliveCheckClientAction; public ActionExecutor ae; - MasterClientTCPAction actions; - MasterClientRecoverMechAction recoverActions; - MasterClientTransferAction transferActions; ChannelHandlerContext ctx; private Channel channel; - private boolean isConnected; private String master; // master node pubKey public TCPClientFrameHandler(String masterPubkey) { master = masterPubkey; - actions = new MasterClientTCPAction(this, master); - recoverActions = new MasterClientRecoverMechAction(this, actions); - transferActions = new MasterClientTransferAction(this, master, actions); + aliveCheckClientAction = new AliveCheckClientAction(masterPubkey); ae = new ActionExecutor<>( - executorService, actions, recoverActions, transferActions); + executorService, aliveCheckClientAction, + new MasterClientTCPAction(), new MasterClientRecoverMechAction(), MasterClientTransferAction.instance, + new MasterServerRecoverMechAction(), new MasterServerTransferAction(), new MasterServerTCPAction()); for (String str : clientToAgentPlugins) { Object obj = createInstanceByClzName(str); ae.appendHandler(obj); @@ -65,20 +59,19 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler { } - public void updateContractID2Client(String contract) { - // 如果该合约正在和旧的master保持连接 - if (MasterClientTCPAction.contractID2MasterInfo.containsKey(contract)) { - MasterClientTCPAction former = - MasterClientTCPAction.contractID2MasterInfo.get(contract); - former.closeMaster(); - } - - MasterClientTCPAction.contractID2MasterInfo.put(contract, actions); - } +// public void updateContractID2Client(String contract) { +// // 如果该合约正在和旧的master保持连接 +// if (MasterClientTCPAction.contractID2MasterInfo.containsKey(contract)) { +// MasterClientTCPAction former = +// MasterClientTCPAction.contractID2MasterInfo.get(contract); +// former.closeMaster(); +// } +// +// MasterClientTCPAction.contractID2MasterInfo.put(contract, actions); +// } public void close() { try { - isConnected = false; if (channel != null) channel.close(); } catch (Exception e) { e.printStackTrace(); @@ -87,17 +80,12 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler { } } - public void transferInstance(String contractID) { - LOGGER.info("[MasterCLientFrameHandler] transferInstance contractID=" + contractID); - transferActions.transferInstance(contractID); - } @Override public void channelActive(ChannelHandlerContext ctx) { channel = ctx.channel(); - isConnected = true; this.ctx = ctx; - actions.init(this); + aliveCheckClientAction.init(); CongestionControl.slaveCounter.getAndIncrement(); } @@ -112,7 +100,7 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler { ByteBuf bb = (ByteBuf) frame; JsonObject arg; try { - arg = JsonUtil.parseObject(new InputStreamReader(new ByteBufInputStream(bb))); + arg = JsonUtil.parseReaderAsJsonObject(new InputStreamReader(new ByteBufInputStream(bb))); } catch (Exception e) { e.printStackTrace(); Response response = new Response(); @@ -123,9 +111,8 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler { } Response response; try { - + LOGGER.info("====== TCPClientFrameHandler:" + arg.toString()); final String action = arg.get("action").getAsString(); - // logger.info("[MasterClientFrameHandler] handle:" + arg.toString()); ae.handle( action, arg, @@ -172,7 +159,7 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler { public void sendMsg(String json) { for (int i = 0; i < 3 && !isOpen(); i++) { try { - MasterProxy.reconnect(master); + NetworkManager.reconnectAgent(master); Thread.sleep(250); } catch (InterruptedException e) { e.printStackTrace(); @@ -195,7 +182,7 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler { } public void waitForSetNodeID() { - actions.waitForSetNodeID(); + aliveCheckClientAction.waitForSetNodeID(); } static class Response { diff --git a/src/main/java/org/bdware/server/tcp/TCPServerFrameHandler.java b/src/main/java/org/bdware/server/tcp/TCPServerFrameHandler.java index f3b0fc7..e0da824 100644 --- a/src/main/java/org/bdware/server/tcp/TCPServerFrameHandler.java +++ b/src/main/java/org/bdware/server/tcp/TCPServerFrameHandler.java @@ -14,10 +14,7 @@ import org.bdware.sc.util.JsonUtil; import org.bdware.server.CongestionControl; import org.bdware.server.action.Action; import org.bdware.server.action.ActionExecutor; -import org.bdware.server.action.p2p.ExecutionAction; -import org.bdware.server.action.p2p.MasterServerRecoverMechAction; -import org.bdware.server.action.p2p.MasterServerTCPAction; -import org.bdware.server.action.p2p.MasterServerTransferAction; +import org.bdware.server.action.p2p.*; import java.io.ByteArrayOutputStream; import java.io.InputStreamReader; @@ -28,22 +25,14 @@ import java.util.concurrent.Executors; public class TCPServerFrameHandler extends SimpleChannelInboundHandler { private static final Logger LOGGER = LogManager.getLogger(TCPServerFrameHandler.class); static ExecutorService executorService = Executors.newFixedThreadPool(10); - private final ExecutionAction executionActions; - public String pubKey; //slave's pubKey public ChannelHandlerContext ctx; public ActionExecutor ae; - MasterServerTCPAction actions; - MasterServerRecoverMechAction recoverActions; - MasterServerTransferAction transferAction; - public TCPServerFrameHandler() { - actions = new MasterServerTCPAction(this); - recoverActions = new MasterServerRecoverMechAction(this); - transferAction = new MasterServerTransferAction(this); - executionActions = new ExecutionAction(); ae = new ActionExecutor( - executorService, actions, recoverActions, executionActions, transferAction) { + executorService, new AliveCheckServerAction(this), + new MasterClientTCPAction(), new MasterClientRecoverMechAction(), MasterClientTransferAction.instance, + new MasterServerRecoverMechAction(), new MasterServerTransferAction(), new MasterServerTCPAction() ) { @Override public boolean checkPermission( Action a, final JsonObject args, long permission) { @@ -81,9 +70,6 @@ public class TCPServerFrameHandler extends SimpleChannelInboundHandler { }; } - public void setPermission(long l) { - ae.permission = l; - } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { @@ -175,14 +161,6 @@ public class TCPServerFrameHandler extends SimpleChannelInboundHandler { } } - public boolean isOpen() { - return ctx.channel().isOpen(); - } - - static class Log { - String pubKey; - String action; - } static class Response { public String cid; diff --git a/src/main/java/org/bdware/server/trustedmodel/AgentManager.java b/src/main/java/org/bdware/server/trustedmodel/AgentManager.java new file mode 100644 index 0000000..8fb97c1 --- /dev/null +++ b/src/main/java/org/bdware/server/trustedmodel/AgentManager.java @@ -0,0 +1,79 @@ +package org.bdware.server.trustedmodel; + +import com.google.gson.JsonPrimitive; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.bdware.sc.AgentPeerManagerIntf; +import org.bdware.sc.ContractMeta; +import org.bdware.sc.ContractResult; +import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.conn.ResultCallback; +import org.bdware.sc.util.JsonUtil; +import org.bdware.server.CongestionControl; +import org.bdware.server.action.CMActions; +import org.bdware.server.action.p2p.MasterClientTransferAction; +import org.bdware.server.action.p2p.MasterServerTCPAction; +import org.bdware.server.tcp.TCPClientFrameHandler; +import org.bdware.units.NetworkManager; + +import java.util.HashMap; +import java.util.Map; + +public class AgentManager implements AgentPeerManagerIntf { + // key is masters' pubKey + private static final Logger LOGGER = LogManager.getLogger(AgentManager.class); + // nodePubkey,该node作为master的地址 + + public static TCPClientFrameHandler getHandler(String masterID) { + return NetworkManager.CONNECTORS.get(masterID).handler; + } + + + @Override + public void executeByOtherNodeAsync(String pubKey, ContractRequest c, ResultCallback cb) { + LOGGER.error(pubKey + " " + c.getRequestID() + " " + c.getContractID() + " cb:" + cb); + int maxMasterProxyLoad = CongestionControl.masterProxyLoad.incrementAndGet(); + if (maxMasterProxyLoad > CongestionControl.maxMasterProxyLoad) + CongestionControl.maxMasterProxyLoad = maxMasterProxyLoad; + if (CongestionControl.slaveControl()) { + ContractResult cr = + new ContractResult( + ContractResult.Status.Error, + new JsonPrimitive("canceled because of queue too long")); + cb.onResult(JsonUtil.parseObjectAsJsonObject(cr)); + CongestionControl.masterProxyLoad.decrementAndGet(); + return; + } + MasterServerTCPAction.sync.sleep( + c.getRequestID(), + new ResultCallback() { + @Override + public void onResult(String str) { + cb.onResult(JsonUtil.parseStringAsJsonObject(str)); + CongestionControl.masterProxyLoad.decrementAndGet(); + } + }); + Map req = new HashMap<>(); + req.put("action", "requestContractExecutionServer"); + req.put("requestID", c.getRequestID()); + req.put("data", c); + NetworkManager.instance.sendToAgent(pubKey, JsonUtil.toJson(req)); + } + + @Override + public boolean hasAgentConnection(String pubKey) { + return NetworkManager.instance.hasAgentConnection(pubKey); + } + + + @Override + public void transferToOtherNode(String pubKey, String contractID) { + // TODO 问题1,合约的ypk或者script怎么获取?ypk如果是包含私有路径的可以在Contract中设置一个字段,启动的时候写入吗 目前认为这个从前端传入 + // TODO 问题2 转移过程中进度返回给哪个前端显示?需要显示吗? + LOGGER.info("transferToOtherNode : pubKey=" + pubKey + " contractID=" + contractID); + ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID); + MasterClientTransferAction.instance.transferInstance(pubKey, meta.getID()); + } + + +} diff --git a/src/main/java/org/bdware/server/trustedmodel/MasterProxy.java b/src/main/java/org/bdware/server/trustedmodel/MasterProxy.java deleted file mode 100644 index d0a15fa..0000000 --- a/src/main/java/org/bdware/server/trustedmodel/MasterProxy.java +++ /dev/null @@ -1,200 +0,0 @@ -package org.bdware.server.trustedmodel; - -import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; -import io.netty.bootstrap.Bootstrap; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.bdware.sc.ContractClient; -import org.bdware.sc.ContractManager; -import org.bdware.sc.ContractResult; -import org.bdware.sc.MasterStub; -import org.bdware.sc.bean.ContractRequest; -import org.bdware.sc.conn.ResultCallback; -import org.bdware.sc.units.MultiContractMeta; -import org.bdware.sc.util.JsonUtil; -import org.bdware.server.CongestionControl; -import org.bdware.server.action.CMActions; -import org.bdware.server.action.p2p.MasterServerTCPAction; -import org.bdware.server.tcp.TCPClientFrameHandler; - -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class MasterProxy implements MasterStub { - // key is masters' pubKey - public static final Map CONNECTORS = new ConcurrentHashMap<>(); - private static final Logger LOGGER = LogManager.getLogger(MasterProxy.class); - // nodePubkey,该node作为master的地址 - public static Map slaverRouter = new HashMap<>(); - - public static TCPClientFrameHandler getHandler(String masterID) { - return CONNECTORS.get(masterID).handler; - } - - public static void reconnect(String master) { - LOGGER.debug( - "[MasterProxy] reconnect:" - + JsonUtil.toJson(slaverRouter) - + "\nmaster=" - + master); - try { - MasterConnector conn; - synchronized (conn = CONNECTORS.get(master)) { - if (!conn.handler.isOpen()) { - String[] ipAndPort = slaverRouter.get(master).split(":"); - conn.bootstrap - .connect(ipAndPort[0], Integer.parseInt(ipAndPort[1])) - .sync() - .channel(); - } - } - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Override - public void executeByOtherNodeAsync(String pubKey, ContractRequest c, ResultCallback cb) { - MasterConnector handler = CONNECTORS.get(pubKey); - LOGGER.error(pubKey + " " + c.getRequestID() + " " + c.getContractID() + " cb:" + cb); - int maxMasterProxyLoad = CongestionControl.masterProxyLoad.incrementAndGet(); - if (maxMasterProxyLoad > CongestionControl.maxMasterProxyLoad) - CongestionControl.maxMasterProxyLoad = maxMasterProxyLoad; - if (CongestionControl.slaveControl()) { - ContractResult cr = - new ContractResult( - ContractResult.Status.Error, - new JsonPrimitive("canceled because of queue too long")); - cb.onResult(JsonUtil.parseObjectAsJsonObject(cr)); - CongestionControl.masterProxyLoad.decrementAndGet(); - return; - } - MasterServerTCPAction.sync.sleep( - c.getRequestID(), - new ResultCallback() { - @Override - public void onResult(String str) { - cb.onResult(JsonUtil.parseStringAsJsonObject(str)); - CongestionControl.masterProxyLoad.decrementAndGet(); - } - }); - Map req = new HashMap<>(); - req.put("action", "requestContractExecution"); - req.put("requestID", c.getRequestID()); - req.put("data", c); - handler.handler.sendMsg(JsonUtil.toJson(req)); - } - - @Override - public boolean hasConnection(String pubKey) { - if (pubKey != null) return CONNECTORS.containsKey(pubKey); - return true; - } - -// @Override -// public ContractResult executeByOtherNode(String pubKey, ContractRequest c) { -// // get contract's master node -// try { -// int maxMasterProxyLoad = CongestionControl.masterProxyLoad.addAndGet(700); -// if (maxMasterProxyLoad > CongestionControl.maxMasterProxyLoad) -// CongestionControl.maxMasterProxyLoad = maxMasterProxyLoad; -// MasterConnector handler = connectors.get(pubKey); -// if (CongestionControl.slaveControl()) { -// ContractResult cr = -// new ContractResult( -// ContractResult.Status.Error, -// new JsonPrimitive("sync call canceled because of queue too long")); -// -// return cr; -// } -// // TODO 如果handler是空,需要先去连接这个master -// String ret = handler.handler.requestContractExecution(c); -// LOGGER.info("[MasterProxy] executeByOtherNodes 结果是 " + ret); -// // TODO -// // 这回来可能带了nodeIDs和executeTimes -// // 在这里是否需要保留nodeIDs和executeTimes?? -// return JsonUtil.fromJson(ret, ContractResult.class); -// } finally { -// CongestionControl.masterProxyLoad.addAndGet(-700); -// } -// } - - // @Override - // public String executeGlobally(ContractRequest c, OnHashCallback cb) { - // StrCollector resultCallback = new StrCollector(); - // CMActions.manager.executeContractInternal(c, resultCallback, cb); - // resultCallback.waitForResult(); - // return resultCallback.strRet; - // } - - // transfer contract instance - @Override - public void transferToOtherNode(String pubKey, String contractID) { - // TODO 问题1,合约的ypk或者script怎么获取?ypk如果是包含私有路径的可以在Contract中设置一个字段,启动的时候写入吗 目前认为这个从前端传入 - // TODO 问题2 转移过程中进度返回给哪个前端显示?需要显示吗? - - LOGGER.info("transferToOtherNode : pubKey=" + pubKey + " contractID=" + contractID); - MasterConnector handler = CONNECTORS.get(pubKey); - if (handler == null) { - LOGGER.info("connect failed."); - return; - } - contractID = CMActions.manager.getContractIDByName(contractID); - handler.handler.transferInstance(contractID); - } - - public static class MasterConnector { - public Bootstrap bootstrap; - public TCPClientFrameHandler handler; - } - - static class StrCollector extends ResultCallback { - String strRet = "{\"data\":\"Timeout\"}"; - boolean hasResult = false; - long start = System.currentTimeMillis(); - - @Override - public void onResult(String str) { - synchronized (this) { - strRet = str; - hasResult = true; - LOGGER.debug( - "[StrCollector] onResultTakes:" + (System.currentTimeMillis() - start)); - this.notifyAll(); - } - LOGGER.debug( - "[StrCollector] onResultTakesNotifyDone:" - + (System.currentTimeMillis() - start)); - } - - public void waitForResult() { - synchronized (this) { - try { - if (!hasResult) { - this.wait(5000); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - LOGGER.debug("[StrCollector] waitTakes:" + (System.currentTimeMillis() - start)); - } - - public void waitForResultCounter(int count) { - synchronized (this) { - try { - if (!hasResult) { - this.wait(20000L * count); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - LOGGER.debug("[StrCollector] waitTakes:" + (System.currentTimeMillis() - start)); - } - } -} diff --git a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java index f2d0f48..91e9215 100644 --- a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java +++ b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java @@ -7,13 +7,14 @@ import org.apache.logging.log4j.Logger; import org.bdware.sc.ContractManager; import org.bdware.sc.ContractResult; import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.conn.OnHashCallback; import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.units.MultiContractMeta; import org.bdware.sc.units.RecoverFlag; import org.bdware.sc.util.HashUtil; import org.bdware.sc.util.JsonUtil; import org.bdware.server.action.p2p.MasterServerRecoverMechAction; -import org.bdware.server.action.p2p.MasterServerTCPAction; +import org.bdware.units.NetworkManager; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -76,7 +77,7 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { } @Override - public void execute(String requestID, ResultCallback rc, ContractRequest req) { + public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hrc) { if (executedTxs.containsKey(requestID)) { rc.onResult(JsonUtil.toJson(new ContractResult( ContractResult.Status.Error, @@ -94,12 +95,6 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { } } - @Override - public void close() { - this.future.cancel(false); - this.running = false; - LOGGER.info("destruct executor of contract " + meta.getContractID()); - } public void execute(String blockStr) { Block block = JsonUtil.fromJson(blockStr, Block.class); @@ -158,11 +153,9 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { req.addProperty("contractID", this.meta.getContractID()); String reqStr = req.toString(); for (String node : nodes) { - SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(node); - if (cmNode != null && - MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) - == RecoverFlag.Fine) { - cmNode.connection.sendMsg(reqStr); + if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) + == RecoverFlag.Fine) { + NetworkManager.instance.sendToAgent(node, reqStr); } } } diff --git a/src/main/java/org/bdware/server/trustedmodel/SlaveNode.java b/src/main/java/org/bdware/server/trustedmodel/SlaveNode.java index b75982d..51950ec 100644 --- a/src/main/java/org/bdware/server/trustedmodel/SlaveNode.java +++ b/src/main/java/org/bdware/server/trustedmodel/SlaveNode.java @@ -14,38 +14,5 @@ public class SlaveNode { this.pubKey = pubKey; } - public boolean isAlive() { - if (connection == null) { - LOGGER.info(pubKey.substring(0, 5) + " is not alive, connection is null"); - return false; - } - try { - if (!connection.handler.isOpen()) { - LOGGER.info(pubKey.substring(0, 5) + " is not open, try is 3seconds later"); - return isAliveAfter3(); - } else { - return true; - } - } catch (Exception e) { - e.printStackTrace(); - } - return false; - } - private boolean isAliveAfter3() { - try { - Thread.sleep(3000L); - - if (connection.handler.isOpen()) { - LOGGER.info(pubKey.substring(0, 5) + " is open, after 3seconds later"); - - return true; - } - LOGGER.info(pubKey.substring(0, 5) + " is not open, after 3seconds later"); - - } catch (Exception e) { - e.printStackTrace(); - } - return false; - } } diff --git a/src/main/java/org/bdware/units/NetworkManager.java b/src/main/java/org/bdware/units/NetworkManager.java index ed1d3a0..b3e22d5 100644 --- a/src/main/java/org/bdware/units/NetworkManager.java +++ b/src/main/java/org/bdware/units/NetworkManager.java @@ -12,17 +12,19 @@ import io.netty.handler.timeout.IdleStateHandler; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bdware.sc.ContractManager; +import org.bdware.sc.util.JsonUtil; +import org.bdware.server.CMHttpServer; +import org.bdware.server.ControllerManager; import org.bdware.server.GlobalConf; -import org.bdware.server.action.CMActions; import org.bdware.server.nodecenter.client.NodeCenterClientHandler; import org.bdware.server.tcp.TCPServerFrameHandler; +import org.bdware.server.trustedmodel.SlaveNode; import org.bdware.server.ws.DelimiterCodec; import org.bdware.units.enums.NetworkType; import org.bdware.units.grpc.BDLedgerContract.UnitMessage; import org.bdware.units.grpc.JavaContractServiceGrpcServer; import org.bdware.units.msghandler.UnitMessageHandler; import org.bdware.units.tcp.TCPClientFrameHandler; -import org.bdware.units.tcp.TCPUtils; import java.net.InetSocketAddress; import java.net.URI; @@ -30,6 +32,7 @@ import java.net.URISyntaxException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** @@ -38,40 +41,45 @@ import java.util.concurrent.TimeUnit; * @author OliveDS (Shuang Deng) */ public class NetworkManager { + + + public static class AgentConnector { + public Bootstrap bootstrap; + public org.bdware.server.tcp.TCPClientFrameHandler handler; + } + + //Manage server->client connection; + public static final Map CONNECTORS = new ConcurrentHashMap<>(); + //Manage client->server connection; + public static final Map SERVERCONNECTORS = new ConcurrentHashMap<>(); + private static Map slaverRouter = new HashMap<>(); + public static Map id2Slaves = new ConcurrentHashMap<>(); + public static final String NODE_CENTER_CLIENT = "NODE_CENTER_CLIENT"; public static final String P2P_GRPC_CLIENT = "P2P_GRPC_CLIENT"; private static final Logger LOGGER = LogManager.getLogger(NetworkManager.class); public static NetworkManager instance = new NetworkManager(); - private final Map peerID2TCPAddress; - private final Map tcpClientMap; - private String tcpNodeCenter; - private String tcpSelf; - // connection to node center private NodeCenterClientHandler nodeCenterClientHandler; + private final Map peerID2TCPAddress; public NetworkManager() { peerID2TCPAddress = new HashMap<>(); - tcpClientMap = new HashMap<>(); } - public void initP2P(int port) { - JavaContractServiceGrpcServer.init(port); - } public void initTCP(int tcpPort, EventLoopGroup workerGroup) { createTCPServer(tcpPort, workerGroup); connectToTCPNodeCenter(); } - public boolean containsTcpClient(String peer) { - return tcpClientMap.containsKey(peer); + public void initP2P(int port) { + JavaContractServiceGrpcServer.init(port); } private void connectToTCPNodeCenter() { final Bootstrap b = new Bootstrap(); b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000); - nodeCenterClientHandler = new NodeCenterClientHandler(); - CMActions.manager.nodeCenterConn = nodeCenterClientHandler.controller; + nodeCenterClientHandler = ControllerManager.createNodeCenterClientHandler(); EventLoopGroup group = new NioEventLoopGroup(); b.group(group); b.channel(NioSocketChannel.class) @@ -97,7 +105,7 @@ public class NetworkManager { LOGGER.error("creating uri failed! " + e1.getMessage()); } if (!nodeCenterClientHandler.isConnected() - || !nodeCenterClientHandler.controller.syncPing()) { + || !ControllerManager.getNodeCenterController().syncPing()) { nodeCenterClientHandler.close(); assert null != uri; b.connect(uri.getHost(), uri.getPort()).sync().channel(); @@ -108,6 +116,8 @@ public class NetworkManager { + uri.getPort()); } } catch (Exception e) { + + e.printStackTrace(); LOGGER.error("connecting to node center failed! " + e.getMessage()); } }, @@ -143,6 +153,154 @@ public class NetworkManager { } } + + //TODO Remove in future + public void sendToNodeCenter(String msg) { + nodeCenterClientHandler.sendMsg(msg); + } + + public boolean isConnectedToNodeCenter() { + return null != nodeCenterClientHandler && nodeCenterClientHandler.isConnected(); + } + + public void waitForNodeCenterConnected() { + for (int i = 0; + i < 10 && null != nodeCenterClientHandler && !nodeCenterClientHandler.isConnected(); + i++) { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public void reInitNodeCenter() { + if (null != nodeCenterClientHandler) { + nodeCenterClientHandler.close(); + } + } + + //----------AgentNetworkManagement + + + public void updateAgentRouter(String nodeID, String address) { + slaverRouter.put(nodeID, address); + } + + public void registerConnection(String nodeID, TCPServerFrameHandler handler) { + LOGGER.info("nodeID:"+nodeID+" connected!!"); + SERVERCONNECTORS.put(nodeID, handler); + } + + public void closeAgent(String agentPubkey) { + //TODO +// if (handler != null) { +// try { +// handler.close(); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } + //AliveCheckAction.closeMaster(); + NetworkManager.CONNECTORS.remove(agentPubkey); + } + + public void connectToAgent(String master, String contractID) { + LOGGER.info("[CMClientController] connectToMaster master= " + master); + // logger.debug("ConnectToMaster:" + master + "\nMasterRoute:" + slaverRouter.get(master)); + Bootstrap b; + AgentConnector connector = null; + synchronized (CONNECTORS) { + if (!CONNECTORS.containsKey(master)) { + connector = new AgentConnector(); + CONNECTORS.put(master, connector); + } + } + if (connector != null) { + b = new Bootstrap(); + org.bdware.server.tcp.TCPClientFrameHandler handler = new org.bdware.server.tcp.TCPClientFrameHandler(master); + connector.bootstrap = b; + connector.handler = handler; + b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000); + b.group(CMHttpServer.workerGroup); + b.channel(NioSocketChannel.class) + .handler( + new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline p = ch.pipeline(); + p.addLast(new DelimiterCodec()).addLast(handler); + } + }); + } + reconnectAgent(master); + } + + public static void reconnectAgent(String master) { + LOGGER.debug( + "[MasterProxy] reconnect:" + + JsonUtil.toJson(slaverRouter) + + "\nmaster=" + + master); + try { + NetworkManager.AgentConnector conn; + synchronized (conn = NetworkManager.CONNECTORS.get(master)) { + if (!conn.handler.isOpen()) { + String[] ipAndPort = slaverRouter.get(master).split(":"); + conn.bootstrap + .connect(ipAndPort[0], Integer.parseInt(ipAndPort[1])) + .sync() + .channel(); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void sendToAgent(String pubkey, String content) { + if (sendToAgentByServer(pubkey, content)) { + return; + } + if (!hasAgentConnection(pubkey)) { + nodeCenterClientHandler.getController().connectToNode(pubkey); + } + CONNECTORS.get(pubkey).handler.sendMsg(content); + } + + private boolean sendToAgentByServer(String pubkey, String content) { + TCPServerFrameHandler handler = SERVERCONNECTORS.get(pubkey); + if (handler == null) return false; + try { + handler.sendMsg(content); + return true; + } catch (Exception e) { + e.printStackTrace(); + LOGGER.info("Remove ServerConnection,client pubkey:" + pubkey); + } + return false; + } + + public void sendToAgentIfConnected(String pubkey, String content) { + try { + if (hasAgentConnection(pubkey)) + CONNECTORS.get(pubkey).handler.sendMsg(content); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public boolean hasAgentConnection(String pubKey) { + if (pubKey != null) { + if (SERVERCONNECTORS.containsKey(pubKey)) + return true; + } + return CONNECTORS.containsKey(pubKey); + } + + //-------UNUSED TOMerge------------ + //UNUSED public TCPClientFrameHandler createTCPClient(String peer, String ipPort) throws InterruptedException { if (peer.equals(GlobalConf.instance.peerID)) { @@ -155,7 +313,7 @@ public class NetworkManager { NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); final TCPClientFrameHandler handler = new TCPClientFrameHandler(peer); - tcpClientMap.put(peer, handler); + //tcpClientMap.put(peer, handler); b.group(group) .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host, port)) @@ -182,15 +340,8 @@ public class NetworkManager { return handler; } - public NodeCenterClientHandler getNodeCenterClientHandler() { - return nodeCenterClientHandler; - } - public void sendToNodeCenter(String msg) { - nodeCenterClientHandler.sendMsg(msg); - } - /** - * send to all kinds including special receivers + * UNUSED send to all kinds including special receivers * * @param unitMessage unit message * @param networkType type of network @@ -209,7 +360,7 @@ public class NetworkManager { /** - * send to TCP nodes, if fail send by p2p + * UNUSED send to TCP nodes, if fail send by p2p * * @param msg unit message */ @@ -225,10 +376,10 @@ public class NetworkManager { LOGGER.info("send msg to itself " + msg); continue; } - tcpClientFrameHandler = NetworkManager.instance.tcpClientMap.getOrDefault(peer, null); - if (null == tcpClientFrameHandler) { + // tcpClientFrameHandler = NetworkManager.instance.tcpClientMap.getOrDefault(peer, null); + if (peer != null) { if (peerID2TCPAddress.containsKey(peer)) { - recreateTCPClient(peer); + //recreateTCPClient(peer); // instance.tcpClientMap.put(peer, tcpClientFrameHandler); UnitMessage unitMessage = msg.toBuilder().clearReceiver().addReceiver(peer).build(); @@ -243,79 +394,11 @@ public class NetworkManager { continue; } LOGGER.info("send msg by tcp to " + peer); - tcpClientFrameHandler.sendMsg(msg); + // tcpClientFrameHandler.sendMsg(msg); } } - private TCPClientFrameHandler recreateTCPClient(String peer) { - TCPClientFrameHandler tcpClientFrameHandler; - try { - tcpClientFrameHandler = instance.createTCPClient(peer, peerID2TCPAddress.get(peer)); - return tcpClientFrameHandler; - } catch (Exception e) { - LOGGER.error("[NetworkManager] createTCPClient error"); - e.printStackTrace(); - return null; - } - } - - public void closeConnection(String peer) { - if (TCPUtils.isTCPAddress(peer)) { - NetworkManager.instance.tcpClientMap.remove(peer); - } else { - LOGGER.debug("[NetworkManager] closeConnection p2p " + peer); - } - } - - public void updateLocalContractToNodeCenter() { - if (null != nodeCenterClientHandler) { - nodeCenterClientHandler.controller.updateContract(); - } - } - - public String getTcpNodeCenter() { - return instance.tcpNodeCenter; - } - - public void setTcpNodeCenter(String tcpNodeCenter) { - instance.tcpNodeCenter = tcpNodeCenter; - } - - public String getTcpSelf() { - return instance.tcpSelf; - } - - public void setTcpSelf(String tcpSelf) { - instance.tcpSelf = tcpSelf; - } - - public boolean isConnectedToNodeCenter() { - return null != nodeCenterClientHandler; - } - - public NodeCenterClientHandler getNCClientHandler() { - return nodeCenterClientHandler; - } - public void sendTo(UnitMessage unitMessage, String symbol) { // } - - public void waitForNodeCenterConnected() { - for (int i = 0; - i < 10 && null != nodeCenterClientHandler && !nodeCenterClientHandler.isConnected(); - i++) { - try { - Thread.sleep(200); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - public void reInitNodeCenter() { - if (null != nodeCenterClientHandler) { - nodeCenterClientHandler.close(); - } - } } diff --git a/src/main/java/org/bdware/units/function/CommunicationManager.java b/src/main/java/org/bdware/units/function/CommunicationManager.java index c2a5fe9..9e73fda 100644 --- a/src/main/java/org/bdware/units/function/CommunicationManager.java +++ b/src/main/java/org/bdware/units/function/CommunicationManager.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.util.JsonUtil; +import org.bdware.server.ControllerManager; import org.bdware.server.GlobalConf; import org.bdware.server.action.Action; import org.bdware.server.action.ActionExecutor; @@ -46,8 +47,7 @@ public class CommunicationManager extends BaseFunctionManager { return instance; } instance = new CommunicationManager(); - instance.nodeCenterClientController = new NodeCenterClientController(GlobalConf.getNodeID()); - instance.nodeCenterClientController.init(NetworkManager.instance.getNCClientHandler()); + instance.nodeCenterClientController = ControllerManager.getNodeCenterController(); instance.communicationAction = new CommunicationAction(); instance.ae = new ActionExecutor( executorService, diff --git a/src/main/java/org/bdware/units/function/ExecutionManager.java b/src/main/java/org/bdware/units/function/ExecutionManager.java index ffbc574..69671d4 100644 --- a/src/main/java/org/bdware/units/function/ExecutionManager.java +++ b/src/main/java/org/bdware/units/function/ExecutionManager.java @@ -11,10 +11,10 @@ import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.units.RequestCache; import org.bdware.sc.units.RespCache; import org.bdware.sc.util.JsonUtil; +import org.bdware.server.ControllerManager; import org.bdware.server.GlobalConf; import org.bdware.server.action.Action; import org.bdware.server.action.ActionExecutor; -import org.bdware.server.action.p2p.ExecutionAction; import org.bdware.units.NetworkManager; import org.bdware.units.beans.MultiPointContractInfo; import org.bdware.units.beans.UnitContractMessage; @@ -46,7 +46,6 @@ public class ExecutionManager extends BaseFunctionManager { private final BDLedgerContract.UnitMessageType UNIT_MESSAGE_TYPE = BDLedgerContract.UnitMessageType.UnitContractMessage; public Map contractID2Sequencing = new HashMap<>(); - protected ExecutionAction executionAction; // protected MasterServerTCPAction masterActions; // protected MasterClientTCPAction clientActions; protected ActionExecutor actionExecutor; @@ -65,10 +64,9 @@ public class ExecutionManager extends BaseFunctionManager { instance = new ExecutionManager(); // instance.masterActions = new MasterServerTCPAction(); // instance.clientActions = new MasterClientTCPAction(); - instance.executionAction = new ExecutionAction(); instance.actionExecutor = new ActionExecutor( - executorService, instance.executionAction) { // , instance.masterActions, + executorService) { // , instance.masterActions, // instance.clientActions) { @Override public boolean checkPermission( @@ -144,7 +142,7 @@ public class ExecutionManager extends BaseFunctionManager { * 需要最后调用,在动作结束后,否则调用的方法后面不会执行 */ public void updateLocalContractToNodeCenter() { - NetworkManager.instance.updateLocalContractToNodeCenter(); + ControllerManager.getNodeCenterController().updateContract(); // List info = CMActions.manager.getContractDespList(); // JsonObject jo = new JsonObject(); // jo.addProperty("action", "updateContract");