From 0e85394612ea4a4e618082f81709c3cd848ca310 Mon Sep 17 00:00:00 2001 From: "Frank.R.Wu" Date: Mon, 6 Dec 2021 18:02:47 +0800 Subject: [PATCH] feat: update event mechanism rename EventActions to EventWSActions; add EventActions to handle event messages, and register it in TCP*FrameHandler; update AgentManager to support event mechanism, update NetworkManager and NodeCenterClientController to provide node list --- .../org/bdware/server/ControllerManager.java | 1 - .../java/org/bdware/server/GlobalConf.java | 2 +- .../org/bdware/server/action/CMActions.java | 102 ++++++------- .../bdware/server/action/EventActions.java | 37 +---- .../bdware/server/action/EventWSActions.java | 39 +++++ .../p2p/MasterServerRecoverMechAction.java | 2 +- .../action/p2p/_UNUSED_ExecutionAction.java | 1 + .../client/NodeCenterClientController.java | 138 +++--------------- .../client/NodeCenterClientHandler.java | 2 +- .../server/tcp/TCPClientFrameHandler.java | 15 +- .../server/tcp/TCPServerFrameHandler.java | 20 +-- .../server/trustedmodel/AgentManager.java | 28 +++- .../ws/ContractManagerFrameHandler.java | 2 +- .../java/org/bdware/units/NetworkManager.java | 129 ++++++++-------- 14 files changed, 227 insertions(+), 291 deletions(-) create mode 100644 src/main/java/org/bdware/server/action/EventWSActions.java diff --git a/src/main/java/org/bdware/server/ControllerManager.java b/src/main/java/org/bdware/server/ControllerManager.java index 9bc96e6..c57d46f 100644 --- a/src/main/java/org/bdware/server/ControllerManager.java +++ b/src/main/java/org/bdware/server/ControllerManager.java @@ -5,7 +5,6 @@ import org.bdware.server.nodecenter.client.NodeCenterClientController; import org.bdware.server.nodecenter.client.NodeCenterClientHandler; public class ControllerManager { - private static NodeCenterClientController nodeCenterClientController; private static NodeCenterClientHandler nodeCenterClientHandler; diff --git a/src/main/java/org/bdware/server/GlobalConf.java b/src/main/java/org/bdware/server/GlobalConf.java index 7605fd0..88eed30 100644 --- a/src/main/java/org/bdware/server/GlobalConf.java +++ b/src/main/java/org/bdware/server/GlobalConf.java @@ -91,7 +91,7 @@ public class GlobalConf { if (f.exists()) { KeyValueDBUtil.instance.setValue( dbName, "yjsPath", new File("./yjs.jar").getAbsolutePath()); - }else { + } else { KeyValueDBUtil.instance.setValue( dbName, "yjsPath", new File("./cp/yjs.jar").getAbsolutePath()); } diff --git a/src/main/java/org/bdware/server/action/CMActions.java b/src/main/java/org/bdware/server/action/CMActions.java index ab45649..bca6703 100644 --- a/src/main/java/org/bdware/server/action/CMActions.java +++ b/src/main/java/org/bdware/server/action/CMActions.java @@ -1340,6 +1340,57 @@ public class CMActions implements OnHashCallback { return manager.staticVerify(c); } + @Action(userPermission = 1L << 16, async = true) + public void getControlFlowByFileName(JsonObject args, ResultCallback resultCallback) { + String project = args.get("projectName").getAsString(); + String dirPath; + if (args.has("isPrivate") && args.get("isPrivate").getAsBoolean()) + dirPath = GlobalConf.instance.privateDir; + else dirPath = GlobalConf.instance.publicDir; + String ypkPath = FileActions.autoCompile(dirPath, project); + Contract c = new Contract(); + c.setScript(ypkPath); + // c.setType(Type.Algorithm); + c.setType(ContractExecType.Sole); + Map r = new HashMap<>(); + r.put("action", "onGetControlFlow"); + r.put("result", manager.getControlFlow(c)); + resultCallback.onResult(r); + } + + @Action(async = true, userPermission = 1L << 26) + public void startContractInTempZips(JsonObject args, ResultCallback resultCallback) { + long start = System.currentTimeMillis(); + Contract c = new Contract(); + // c.setType(Type.Algorithm); + c.setType(ContractExecType.Sole); + String pubkey = args.get("owner").getAsString(); + // SCManagerServlet.registerCCCallback(json.getString("requestID"), this); + Map r = new HashMap<>(); + r.put("action", "onStartContract"); + String path = args.get("path").getAsString(); + path = + new File(new File(GlobalConf.instance.projectDir, "tempZips"), path) + .getAbsolutePath(); + c.setScript(path); + c.setSignature(args.get("signature").getAsString()); + c.setOwner(pubkey); + if (!c.verifySignature()) { + r.put("data", "verify failed"); + resultCallback.onResult(r); + return; + } + r.put("data", manager.startContractAndRedirect(c, System.out)); + r.put("cid", c.getID()); + r.put("executeTime", System.currentTimeMillis() - start); + resultCallback.onResult(r); + + if (args.has("dumpPeriod")) { + LOGGER.debug("[CMActions]启动后设置dump周期" + args.get("dumpPeriod").getAsString()); + manager.changeDumpPeriod(c.getID(), args.get("dumpPeriod").getAsString()); + } + } + // @Action(userPermission = 1L << 26, async = true) // public void queryContractInstanceInfoByDOI(JsonObject args, ResultCallback resultCallback) // { @@ -1431,57 +1482,6 @@ public class CMActions implements OnHashCallback { * resultCallback.onResult(gson.toJson(r)); } */ - @Action(userPermission = 1L << 16, async = true) - public void getControlFlowByFileName(JsonObject args, ResultCallback resultCallback) { - String project = args.get("projectName").getAsString(); - String dirPath; - if (args.has("isPrivate") && args.get("isPrivate").getAsBoolean()) - dirPath = GlobalConf.instance.privateDir; - else dirPath = GlobalConf.instance.publicDir; - String ypkPath = FileActions.autoCompile(dirPath, project); - Contract c = new Contract(); - c.setScript(ypkPath); - // c.setType(Type.Algorithm); - c.setType(ContractExecType.Sole); - Map r = new HashMap<>(); - r.put("action", "onGetControlFlow"); - r.put("result", manager.getControlFlow(c)); - resultCallback.onResult(r); - } - - @Action(async = true, userPermission = 1L << 26) - public void startContractInTempZips(JsonObject args, ResultCallback resultCallback) { - long start = System.currentTimeMillis(); - Contract c = new Contract(); - // c.setType(Type.Algorithm); - c.setType(ContractExecType.Sole); - String pubkey = args.get("owner").getAsString(); - // SCManagerServlet.registerCCCallback(json.getString("requestID"), this); - Map r = new HashMap<>(); - r.put("action", "onStartContract"); - String path = args.get("path").getAsString(); - path = - new File(new File(GlobalConf.instance.projectDir, "tempZips"), path) - .getAbsolutePath(); - c.setScript(path); - c.setSignature(args.get("signature").getAsString()); - c.setOwner(pubkey); - if (!c.verifySignature()) { - r.put("data", "verify failed"); - resultCallback.onResult(r); - return; - } - r.put("data", manager.startContractAndRedirect(c, System.out)); - r.put("cid", c.getID()); - r.put("executeTime", System.currentTimeMillis() - start); - resultCallback.onResult(r); - - if (args.has("dumpPeriod")) { - LOGGER.debug("[CMActions]启动后设置dump周期" + args.get("dumpPeriod").getAsString()); - manager.changeDumpPeriod(c.getID(), args.get("dumpPeriod").getAsString()); - } - } - @Action(userPermission = 1L << 26, async = true, httpAccess = false) public void killAllContract(JsonObject args, ResultCallback resultCallback) { if (args.has("verifiedPubKey")) { diff --git a/src/main/java/org/bdware/server/action/EventActions.java b/src/main/java/org/bdware/server/action/EventActions.java index c03a8eb..b13ad9b 100644 --- a/src/main/java/org/bdware/server/action/EventActions.java +++ b/src/main/java/org/bdware/server/action/EventActions.java @@ -1,41 +1,14 @@ package org.bdware.server.action; import com.google.gson.JsonObject; -import org.bdware.sc.ContractClient; import org.bdware.sc.conn.ResultCallback; -import org.bdware.sc.util.HashUtil; -import org.bdware.server.action.Action; -import org.bdware.server.action.CMActions; +import org.bdware.sc.event.REvent; +import org.bdware.sc.util.JsonUtil; public class EventActions { @Action(async = true, userPermission = 0) - public void subEvent(JsonObject args, final ResultCallback rcb) { - JsonObject ret = new JsonObject(); - ret.addProperty("action", "onSubEvent"); - ret.addProperty("status", "Error"); - if (args.has("requestID")) { - ret.addProperty("responseID", args.get("requestID").getAsString()); - } - if (!args.has("topic")) { - ret.addProperty("data", "no topic arg!"); - rcb.onResult(ret.toString()); - return; - } - String topic = args.get("topic").getAsString(); - if (args.has("contractID")) { - String argCID = args.get("contractID").getAsString(); - ContractClient client = CMActions.manager.getClient(argCID); - if (null == client) { - ret.addProperty("data", "invalid contract ID or Name!"); - rcb.onResult(ret.toString()); - return; - } - String contractID = client.getContractID(); - topic = HashUtil.sha3(contractID, topic); - } - CMActions.manager.subEventByClient(topic, rcb.getChannel()); - ret.addProperty("status", "Success"); - ret.addProperty("data", topic); - rcb.onResult(ret); + public void deliverEvent(JsonObject args, final ResultCallback rcb) { + CMActions.manager.deliverEvent( + JsonUtil.fromJson(args.get("data").getAsString(), REvent.class)); } } diff --git a/src/main/java/org/bdware/server/action/EventWSActions.java b/src/main/java/org/bdware/server/action/EventWSActions.java new file mode 100644 index 0000000..5737b52 --- /dev/null +++ b/src/main/java/org/bdware/server/action/EventWSActions.java @@ -0,0 +1,39 @@ +package org.bdware.server.action; + +import com.google.gson.JsonObject; +import org.bdware.sc.ContractClient; +import org.bdware.sc.conn.ResultCallback; +import org.bdware.sc.util.HashUtil; + +public class EventWSActions { + @Action(async = true, userPermission = 0) + public void subEvent(JsonObject args, final ResultCallback rcb) { + JsonObject ret = new JsonObject(); + ret.addProperty("action", "onSubEvent"); + ret.addProperty("status", "Error"); + if (args.has("requestID")) { + ret.addProperty("responseID", args.get("requestID").getAsString()); + } + if (!args.has("topic")) { + ret.addProperty("data", "no topic arg!"); + rcb.onResult(ret.toString()); + return; + } + String topic = args.get("topic").getAsString(); + if (args.has("contractID")) { + String argCID = args.get("contractID").getAsString(); + ContractClient client = CMActions.manager.getClient(argCID); + if (null == client) { + ret.addProperty("data", "invalid contract ID or Name!"); + rcb.onResult(ret.toString()); + return; + } + String contractID = client.getContractID(); + topic = HashUtil.sha3(contractID, topic); + } + CMActions.manager.subEventByClient(topic, rcb.getChannel()); + ret.addProperty("status", "Success"); + ret.addProperty("data", topic); + rcb.onResult(ret); + } +} diff --git a/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java b/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java index fdbdb2e..c6007c7 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java @@ -30,7 +30,7 @@ import java.util.zip.GZIPOutputStream; public class MasterServerRecoverMechAction { private static final Logger LOGGER = LogManager.getLogger(MasterServerRecoverMechAction.class); public static Map> recoverStatus = new ConcurrentHashMap<>(); - private Map stateFileMap = new HashMap<>(); + private final Map stateFileMap = new HashMap<>(); public MasterServerRecoverMechAction() { } diff --git a/src/main/java/org/bdware/server/action/p2p/_UNUSED_ExecutionAction.java b/src/main/java/org/bdware/server/action/p2p/_UNUSED_ExecutionAction.java index 7a12a7f..167ffa4 100644 --- a/src/main/java/org/bdware/server/action/p2p/_UNUSED_ExecutionAction.java +++ b/src/main/java/org/bdware/server/action/p2p/_UNUSED_ExecutionAction.java @@ -31,6 +31,7 @@ import org.bdware.units.msghandler.ResponseCenter; import java.io.File; +@SuppressWarnings("unused") public class _UNUSED_ExecutionAction implements OnHashCallback { private static final Logger LOGGER = LogManager.getLogger(_UNUSED_ExecutionAction.class); diff --git a/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientController.java b/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientController.java index a635ac3..79a2cbf 100644 --- a/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientController.java +++ b/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientController.java @@ -12,9 +12,7 @@ import org.bdware.sc.conn.ByteUtil; import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.db.CMTables; import org.bdware.sc.db.KeyValueDBUtil; -import org.bdware.sc.event.REvent; import org.bdware.sc.units.MultiContractMeta; -import org.bdware.sc.util.HashUtil; import org.bdware.sc.util.JsonUtil; import org.bdware.server.GlobalConf; import org.bdware.server.action.Action; @@ -33,7 +31,6 @@ import org.zz.gmhelper.SM2KeyPair; import org.zz.gmhelper.SM2Util; import java.io.*; -import java.math.BigInteger; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.zip.ZipEntry; @@ -44,21 +41,18 @@ public class NodeCenterClientController implements NodeCenterConn { public static SyncResult sync = new SyncResult(); private static boolean startCheck = false; private final Map fileMap; - public Map distributeReqMap = new ConcurrentHashMap<>(); - private final NetNeighbors neighbors; + public Map distributeReqMap = new ConcurrentHashMap<>(); // public NodeCenterClientController cmClientController; String nodeID; - Map> subscribedInfo; NodeCenterClientHandler handler; // 合约contractID,master的公钥 Map contractID2PubKey = new ConcurrentHashMap<>(); public NodeCenterClientController(String nodeID) { - subscribedInfo = new HashMap<>(); - fileMap = new HashMap<>(); + this.fileMap = new HashMap<>(); this.nodeID = nodeID; - neighbors = new NetNeighbors(); + this.neighbors = new NetNeighbors(); } public void init(NodeCenterClientHandler webSocketClientHandler) { @@ -270,12 +264,6 @@ public class NodeCenterClientController implements NodeCenterConn { updateContract(); } - @Action(async = true) - public void deliverEvent(JsonObject jo, ResultCallback cb) { - CMActions.manager.deliverEvent( - JsonUtil.fromJson(jo.get("data").getAsString(), REvent.class)); - } - @Action(async = true) public void executeContractLocally(JsonObject jo, ResultCallback resultCallback) { String requestID = jo.get("requestID").getAsString(); @@ -299,45 +287,27 @@ public class NodeCenterClientController implements NodeCenterConn { sync.wakeUp(requestID, data); } - // deliver REvent @Override - public boolean deliverEvent(String event, String target) { - if (null != target && !nodeID.equals(target)) { - LOGGER.info("Deliver event message in node " + target.substring(0, 6)); - JsonObject jo = new JsonObject(); - jo.addProperty("msg", event); - jo.addProperty("target", target); - jo.addProperty("action", "deliverEvent"); - sendMsg(JsonUtil.toJson(jo)); - return true; - } - return false; - } - - @Override - public NodeKey[] listNodes() { - NodeKey[] ret; + public String[] listNodes() { + String[] ret; synchronized (neighbors) { - if (neighbors.arr.length != neighbors.map.size()) { - Set set = new TreeSet<>(neighbors.map.keySet()); - neighbors.arr = new NodeKey[set.size()]; + if (neighbors.arr.length != neighbors.set.size()) { + Set set = new TreeSet<>(neighbors.set); + neighbors.arr = new String[set.size()]; int i = 0; for (String id : set) { - neighbors.arr[i++] = new NodeKey(id); + neighbors.arr[i++] = id; } } ret = neighbors.arr; - LOGGER.debug(neighbors.map.size() + " nodes registered"); + LOGGER.debug(neighbors.set.size() + " nodes registered"); } return ret; } @Override - public String getNodeId(String str) { - if (null == str) { - return nodeID; - } - return neighbors.map.get(str); + public String getNodeId() { + return nodeID; } @Action(async = true) @@ -803,89 +773,19 @@ public class NodeCenterClientController implements NodeCenterConn { sync.wakeUp(jo.get("responseID").getAsString(), jo.toString()); } - @Override - public String[] getClusterByKey(String key, int k) { - NodeKey[] nodes = this.listNodes(); - if (nodes.length == 0) { - return null; - } - if (nodes.length == 1) { - return new String[]{this.getNodeId(nodes[0].id)}; - } - - // get hash with enough length - String hash = HashUtil.sha3ToFixedLen(key, nodes[0].id.length()); - BigInteger biH = new BigInteger(hash, 16); - - // binary search, to find the nearest node with hash - int l = 0, r = nodes.length - 1, m = 0, - comL = biH.compareTo(nodes[l].biId), comR = nodes[r].biId.compareTo(biH), - comM; - String selected; - do { - if (comL < 1) { - selected = nodes[l].id; - break; - } - if (comR < 1) { - selected = nodes[r].id; - break; - } - if (l + 1 == r) { - if (biH.subtract(nodes[l].biId).compareTo(nodes[r].biId.subtract(biH)) < 1) { - selected = nodes[l].id; - } else { - selected = nodes[r].id; - } - break; - } - m = (l + r) >> 1; - comM = biH.compareTo(nodes[m].biId); - if (comM < 1) { - r = m; - comR = -comM; - } else { - l = m; - comL = comM; - } - } while (true); - List ret = new ArrayList<>(); - ret.add(this.getNodeId(selected)); - if (k > 1) { - l = m - 1; - r = m + 1; - while (ret.size() < k && (l >= 0 || r < nodes.length)) { - if (l < 0) { - ret.add(this.getNodeId(nodes[r++].id)); - } else if (r >= nodes.length) { - ret.add(this.getNodeId(nodes[l--].id)); - } else { - if (biH.subtract(nodes[l].biId).compareTo(nodes[r].biId.subtract(biH)) < 1) { - ret.add(this.getNodeId(nodes[l--].id)); - } else { - ret.add(this.getNodeId(nodes[r++].id)); - } - } - } - } - - return ret.toArray(new String[0]); - } - static class NetNeighbors { - NodeKey[] arr; - Map map; + String[] arr; + Set set; NetNeighbors() { - arr = new NodeKey[0]; - map = new HashMap<>(); + arr = new String[0]; + set = new HashSet<>(); } synchronized void addNewNode(String id) { - String key = id.substring(2); - if (!map.containsKey(key)) { - map.put(key, id); - LOGGER.info(map.size() + " nodes registered"); + if (!set.contains(id)) { + set.add(id); + LOGGER.info(set.size() + " nodes registered"); } } } diff --git a/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientHandler.java b/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientHandler.java index 14d7921..29973ce 100644 --- a/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientHandler.java +++ b/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientHandler.java @@ -29,7 +29,7 @@ public class NodeCenterClientHandler extends SimpleChannelInboundHandler private static final Logger LOGGER = LogManager.getLogger(NodeCenterClientHandler.class); public static String[] clientToClusterPlugins; public boolean hasPermission; - private NodeCenterClientController controller; + private final NodeCenterClientController controller; Channel channel; // UDPTrustfulExecutor udpExecutor; // RecoverMechExecutor recoverMechExecutor; diff --git a/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java b/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java index ceff348..247f126 100644 --- a/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java +++ b/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java @@ -14,6 +14,7 @@ import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.util.JsonUtil; import org.bdware.server.CongestionControl; import org.bdware.server.action.ActionExecutor; +import org.bdware.server.action.EventActions; import org.bdware.server.action.p2p.*; import org.bdware.units.NetworkManager; @@ -34,15 +35,21 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler { public ActionExecutor ae; ChannelHandlerContext ctx; private Channel channel; - private String master; // master node pubKey + private final String master; // master node pubKey public TCPClientFrameHandler(String masterPubkey) { master = masterPubkey; aliveCheckClientAction = new AliveCheckClientAction(masterPubkey); ae = new ActionExecutor<>( - executorService, aliveCheckClientAction, - new MasterClientTCPAction(), new MasterClientRecoverMechAction(), MasterClientTransferAction.instance, - new MasterServerRecoverMechAction(), new MasterServerTransferAction(), new MasterServerTCPAction()); + executorService, + aliveCheckClientAction, + new MasterClientTCPAction(), + new MasterClientRecoverMechAction(), + MasterClientTransferAction.instance, + new MasterServerRecoverMechAction(), + new MasterServerTransferAction(), + new MasterServerTCPAction(), + new EventActions()); for (String str : clientToAgentPlugins) { Object obj = createInstanceByClzName(str); ae.appendHandler(obj); diff --git a/src/main/java/org/bdware/server/tcp/TCPServerFrameHandler.java b/src/main/java/org/bdware/server/tcp/TCPServerFrameHandler.java index ea0114b..ffcaa13 100644 --- a/src/main/java/org/bdware/server/tcp/TCPServerFrameHandler.java +++ b/src/main/java/org/bdware/server/tcp/TCPServerFrameHandler.java @@ -1,7 +1,6 @@ package org.bdware.server.tcp; import com.google.gson.JsonObject; -import com.google.gson.JsonParser; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.Unpooled; @@ -14,6 +13,7 @@ import org.bdware.sc.util.JsonUtil; import org.bdware.server.CongestionControl; import org.bdware.server.action.Action; import org.bdware.server.action.ActionExecutor; +import org.bdware.server.action.EventActions; import org.bdware.server.action.p2p.*; import java.io.ByteArrayOutputStream; @@ -33,9 +33,15 @@ public class TCPServerFrameHandler extends SimpleChannelInboundHandler { checkAction = new AliveCheckServerAction(this); ae = new ActionExecutor( - executorService, checkAction, - new MasterClientTCPAction(), new MasterClientRecoverMechAction(), MasterClientTransferAction.instance, - new MasterServerRecoverMechAction(), new MasterServerTransferAction(), new MasterServerTCPAction()) { + executorService, + checkAction, + new MasterClientTCPAction(), + new MasterClientRecoverMechAction(), + MasterClientTransferAction.instance, + new MasterServerRecoverMechAction(), + new MasterServerTransferAction(), + new MasterServerTCPAction(), + new EventActions()) { @Override public boolean checkPermission( Action a, final JsonObject args, long permission) { @@ -92,11 +98,7 @@ public class TCPServerFrameHandler extends SimpleChannelInboundHandler { ByteBuf bb = (ByteBuf) frame; JsonObject arg; try { - - arg = - new JsonParser() - .parse(new InputStreamReader(new ByteBufInputStream(bb))) - .getAsJsonObject(); + arg = JsonUtil.parseReaderAsJsonObject(new InputStreamReader(new ByteBufInputStream(bb))); // logger.info("[MasterServer] receive:" + arg.toString()); } catch (Exception e) { diff --git a/src/main/java/org/bdware/server/trustedmodel/AgentManager.java b/src/main/java/org/bdware/server/trustedmodel/AgentManager.java index 8fb97c1..a03b741 100644 --- a/src/main/java/org/bdware/server/trustedmodel/AgentManager.java +++ b/src/main/java/org/bdware/server/trustedmodel/AgentManager.java @@ -1,5 +1,6 @@ package org.bdware.server.trustedmodel; +import com.google.gson.JsonObject; import com.google.gson.JsonPrimitive; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -18,6 +19,8 @@ import org.bdware.units.NetworkManager; import java.util.HashMap; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; public class AgentManager implements AgentPeerManagerIntf { // key is masters' pubKey @@ -28,7 +31,6 @@ public class AgentManager implements AgentPeerManagerIntf { return NetworkManager.CONNECTORS.get(masterID).handler; } - @Override public void executeByOtherNodeAsync(String pubKey, ContractRequest c, ResultCallback cb) { LOGGER.error(pubKey + " " + c.getRequestID() + " " + c.getContractID() + " cb:" + cb); @@ -65,6 +67,29 @@ public class AgentManager implements AgentPeerManagerIntf { return NetworkManager.instance.hasAgentConnection(pubKey); } + @Override + public boolean deliverEvent(String pubKey, String event) { + if (null != pubKey && !CMActions.manager.nodeCenterConn.getNodeId().equals(pubKey)) { + LOGGER.info("Deliver event message in node " + pubKey.substring(0, 6)); + JsonObject msg = new JsonObject(); + msg.addProperty("action", "deliverEvent"); + msg.addProperty("data", event); + NetworkManager.instance.sendToAgent(pubKey, JsonUtil.toJson(msg)); + return true; + } + return false; + } + + @Override + public String[] listNodes() { + String[] ret = CMActions.manager.nodeCenterConn.listNodes(); + if (ret.length == 0) { + Set set = new TreeSet<>(NetworkManager.CONNECTORS.keySet()); + set.addAll(NetworkManager.SERVER_CONNECTORS.keySet()); + ret = set.toArray(new String[0]); + } + return ret; + } @Override public void transferToOtherNode(String pubKey, String contractID) { @@ -75,5 +100,4 @@ public class AgentManager implements AgentPeerManagerIntf { MasterClientTransferAction.instance.transferInstance(pubKey, meta.getID()); } - } diff --git a/src/main/java/org/bdware/server/ws/ContractManagerFrameHandler.java b/src/main/java/org/bdware/server/ws/ContractManagerFrameHandler.java index 67be079..5238cda 100644 --- a/src/main/java/org/bdware/server/ws/ContractManagerFrameHandler.java +++ b/src/main/java/org/bdware/server/ws/ContractManagerFrameHandler.java @@ -62,7 +62,7 @@ public class ContractManagerFrameHandler extends SimpleChannelInboundHandlerclient connection; public static final Map CONNECTORS = new ConcurrentHashMap<>(); //Manage client->server connection; - public static final Map SERVERCONNECTORS = new ConcurrentHashMap<>(); - private static Map slaverRouter = new HashMap<>(); - public static Map id2Slaves = new ConcurrentHashMap<>(); - + public static final Map SERVER_CONNECTORS = new ConcurrentHashMap<>(); public static final String NODE_CENTER_CLIENT = "NODE_CENTER_CLIENT"; public static final String P2P_GRPC_CLIENT = "P2P_GRPC_CLIENT"; + private static final Map slaverRouter = new HashMap<>(); private static final Logger LOGGER = LogManager.getLogger(NetworkManager.class); + public static Map id2Slaves = new ConcurrentHashMap<>(); public static NetworkManager instance = new NetworkManager(); - private NodeCenterClientHandler nodeCenterClientHandler; private final Map peerID2TCPAddress; + private NodeCenterClientHandler nodeCenterClientHandler; public NetworkManager() { peerID2TCPAddress = new HashMap<>(); } + public static void reconnectAgent(String master) { + LOGGER.debug( + String.format("master=%s\t%s", + master, + JsonUtil.toJson(slaverRouter))); + try { + NetworkManager.AgentConnector conn; + synchronized (conn = NetworkManager.CONNECTORS.get(master)) { + if (!conn.handler.isOpen()) { + String[] ipAndPort = slaverRouter.get(master).split(":"); + conn.bootstrap + .connect(ipAndPort[0], Integer.parseInt(ipAndPort[1])) + .sync() + .channel(); + } + } + } catch (Exception e) { + LOGGER.error("reconnect failed: " + e.getMessage()); + } + } public void initTCP(int tcpPort, EventLoopGroup workerGroup) { createTCPServer(tcpPort, workerGroup); @@ -96,13 +108,12 @@ public class NetworkManager { try { // manager.clearCache(); String URL = GlobalConf.getNodeCenterUrl(); - // System.out.println("GlobalConf.getNodeCenterUrl() - // URL" + URL); +// LOGGER.debug("GlobalConf.getNodeCenterUrl() -> URL=" + URL); URI uri = null; try { uri = new URI(URL); - } catch (URISyntaxException e1) { - LOGGER.error("creating uri failed! " + e1.getMessage()); + } catch (URISyntaxException e) { + LOGGER.error("creating uri failed! " + e.getMessage()); } if (!nodeCenterClientHandler.isConnected() || !ControllerManager.getNodeCenterController().syncPing()) { @@ -116,8 +127,7 @@ public class NetworkManager { + uri.getPort()); } } catch (Exception e) { - - e.printStackTrace(); +// e.printStackTrace(); LOGGER.error("connecting to node center failed! " + e.getMessage()); } }, @@ -190,13 +200,13 @@ public class NetworkManager { public void registerConnection(String nodeID, TCPServerFrameHandler handler) { LOGGER.info("nodeID:" + nodeID + " connected!!"); - SERVERCONNECTORS.put(nodeID, handler); + SERVER_CONNECTORS.put(nodeID, handler); } public void closeAgent(String agentPubkey) { - if (NetworkManager.SERVERCONNECTORS.containsKey(agentPubkey)) { - NetworkManager.SERVERCONNECTORS.get(agentPubkey).close(); - NetworkManager.SERVERCONNECTORS.remove(agentPubkey); + if (NetworkManager.SERVER_CONNECTORS.containsKey(agentPubkey)) { + NetworkManager.SERVER_CONNECTORS.get(agentPubkey).close(); + NetworkManager.SERVER_CONNECTORS.remove(agentPubkey); } if (NetworkManager.CONNECTORS.containsKey(agentPubkey)) { NetworkManager.CONNECTORS.get(agentPubkey).handler.close(); @@ -206,8 +216,8 @@ public class NetworkManager { } public void connectToAgent(String master, String contractID) { - LOGGER.info("[CMClientController] connectToMaster master= " + master); - // logger.debug("ConnectToMaster:" + master + "\nMasterRoute:" + slaverRouter.get(master)); + LOGGER.info("master=" + master); +// LOGGER.debug("master=" + master + "\n\trouter=" + slaverRouter.get(master)); Bootstrap b; AgentConnector connector = null; synchronized (CONNECTORS) { @@ -216,9 +226,10 @@ public class NetworkManager { CONNECTORS.put(master, connector); } } - if (connector != null) { + if (null != connector) { b = new Bootstrap(); - org.bdware.server.tcp.TCPClientFrameHandler handler = new org.bdware.server.tcp.TCPClientFrameHandler(master); + org.bdware.server.tcp.TCPClientFrameHandler handler = + new org.bdware.server.tcp.TCPClientFrameHandler(master); connector.bootstrap = b; connector.handler = handler; b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000); @@ -236,28 +247,6 @@ public class NetworkManager { reconnectAgent(master); } - public static void reconnectAgent(String master) { - LOGGER.debug( - "[MasterProxy] reconnect:" - + JsonUtil.toJson(slaverRouter) - + "\nmaster=" - + master); - try { - NetworkManager.AgentConnector conn; - synchronized (conn = NetworkManager.CONNECTORS.get(master)) { - if (!conn.handler.isOpen()) { - String[] ipAndPort = slaverRouter.get(master).split(":"); - conn.bootstrap - .connect(ipAndPort[0], Integer.parseInt(ipAndPort[1])) - .sync() - .channel(); - } - } - } catch (Exception e) { - e.printStackTrace(); - } - } - public void sendToAgent(String pubkey, String content) { try { if (sendToAgentByServer(pubkey, content)) { @@ -273,8 +262,10 @@ public class NetworkManager { } private boolean sendToAgentByServer(String pubkey, String content) { - TCPServerFrameHandler handler = SERVERCONNECTORS.get(pubkey); - if (handler == null) return false; + TCPServerFrameHandler handler = SERVER_CONNECTORS.get(pubkey); + if (null == handler) { + return false; + } try { handler.sendMsg(content); return true; @@ -296,7 +287,7 @@ public class NetworkManager { public boolean hasAgentConnection(String pubKey) { if (pubKey != null) { - if (SERVERCONNECTORS.containsKey(pubKey)) + if (SERVER_CONNECTORS.containsKey(pubKey)) return true; } return CONNECTORS.containsKey(pubKey); @@ -354,14 +345,13 @@ public class NetworkManager { send(unitMessage); return; } - if (NetworkType.P2P == networkType) { + if (NetworkType.P2P.equals(networkType)) { JavaContractServiceGrpcServer.sendMsg(unitMessage); return; } send(unitMessage); } - /** * UNUSED send to TCP nodes, if fail send by p2p * @@ -380,23 +370,19 @@ public class NetworkManager { continue; } // tcpClientFrameHandler = NetworkManager.instance.tcpClientMap.getOrDefault(peer, null); - if (peer != null) { - if (peerID2TCPAddress.containsKey(peer)) { - //recreateTCPClient(peer); - // instance.tcpClientMap.put(peer, tcpClientFrameHandler); - UnitMessage unitMessage = - msg.toBuilder().clearReceiver().addReceiver(peer).build(); - LOGGER.info("send msg by p2p to " + peer); - JavaContractServiceGrpcServer.sendMsg(unitMessage); - } else { - UnitMessage unitMessage = - msg.toBuilder().clearReceiver().addReceiver(peer).build(); - LOGGER.info("send msg by p2p to " + peer); - JavaContractServiceGrpcServer.sendMsg(unitMessage); - } - continue; + if (peerID2TCPAddress.containsKey(peer)) { + //recreateTCPClient(peer); + // instance.tcpClientMap.put(peer, tcpClientFrameHandler); + UnitMessage unitMessage = + msg.toBuilder().clearReceiver().addReceiver(peer).build(); + LOGGER.info("send msg by p2p to " + peer); + JavaContractServiceGrpcServer.sendMsg(unitMessage); + } else { + UnitMessage unitMessage = + msg.toBuilder().clearReceiver().addReceiver(peer).build(); + LOGGER.info("send msg by p2p to " + peer); + JavaContractServiceGrpcServer.sendMsg(unitMessage); } - LOGGER.info("send msg by tcp to " + peer); // tcpClientFrameHandler.sendMsg(msg); } } @@ -404,4 +390,9 @@ public class NetworkManager { public void sendTo(UnitMessage unitMessage, String symbol) { // } + + public static class AgentConnector { + public Bootstrap bootstrap; + public org.bdware.server.tcp.TCPClientFrameHandler handler; + } }