feat: update event mechanism

rename EventActions to EventWSActions; add EventActions to handle event messages, and register it in TCP*FrameHandler; update AgentManager to support event mechanism, update NetworkManager and NodeCenterClientController to provide node list
This commit is contained in:
Frank.R.Wu 2021-12-06 18:02:47 +08:00
parent 5ffc5fb813
commit 0e85394612
14 changed files with 227 additions and 291 deletions

View File

@ -5,7 +5,6 @@ import org.bdware.server.nodecenter.client.NodeCenterClientController;
import org.bdware.server.nodecenter.client.NodeCenterClientHandler; import org.bdware.server.nodecenter.client.NodeCenterClientHandler;
public class ControllerManager { public class ControllerManager {
private static NodeCenterClientController nodeCenterClientController; private static NodeCenterClientController nodeCenterClientController;
private static NodeCenterClientHandler nodeCenterClientHandler; private static NodeCenterClientHandler nodeCenterClientHandler;

View File

@ -91,7 +91,7 @@ public class GlobalConf {
if (f.exists()) { if (f.exists()) {
KeyValueDBUtil.instance.setValue( KeyValueDBUtil.instance.setValue(
dbName, "yjsPath", new File("./yjs.jar").getAbsolutePath()); dbName, "yjsPath", new File("./yjs.jar").getAbsolutePath());
}else { } else {
KeyValueDBUtil.instance.setValue( KeyValueDBUtil.instance.setValue(
dbName, "yjsPath", new File("./cp/yjs.jar").getAbsolutePath()); dbName, "yjsPath", new File("./cp/yjs.jar").getAbsolutePath());
} }

View File

@ -1340,6 +1340,57 @@ public class CMActions implements OnHashCallback {
return manager.staticVerify(c); return manager.staticVerify(c);
} }
@Action(userPermission = 1L << 16, async = true)
public void getControlFlowByFileName(JsonObject args, ResultCallback resultCallback) {
String project = args.get("projectName").getAsString();
String dirPath;
if (args.has("isPrivate") && args.get("isPrivate").getAsBoolean())
dirPath = GlobalConf.instance.privateDir;
else dirPath = GlobalConf.instance.publicDir;
String ypkPath = FileActions.autoCompile(dirPath, project);
Contract c = new Contract();
c.setScript(ypkPath);
// c.setType(Type.Algorithm);
c.setType(ContractExecType.Sole);
Map<String, String> r = new HashMap<>();
r.put("action", "onGetControlFlow");
r.put("result", manager.getControlFlow(c));
resultCallback.onResult(r);
}
@Action(async = true, userPermission = 1L << 26)
public void startContractInTempZips(JsonObject args, ResultCallback resultCallback) {
long start = System.currentTimeMillis();
Contract c = new Contract();
// c.setType(Type.Algorithm);
c.setType(ContractExecType.Sole);
String pubkey = args.get("owner").getAsString();
// SCManagerServlet.registerCCCallback(json.getString("requestID"), this);
Map<String, Object> r = new HashMap<>();
r.put("action", "onStartContract");
String path = args.get("path").getAsString();
path =
new File(new File(GlobalConf.instance.projectDir, "tempZips"), path)
.getAbsolutePath();
c.setScript(path);
c.setSignature(args.get("signature").getAsString());
c.setOwner(pubkey);
if (!c.verifySignature()) {
r.put("data", "verify failed");
resultCallback.onResult(r);
return;
}
r.put("data", manager.startContractAndRedirect(c, System.out));
r.put("cid", c.getID());
r.put("executeTime", System.currentTimeMillis() - start);
resultCallback.onResult(r);
if (args.has("dumpPeriod")) {
LOGGER.debug("[CMActions]启动后设置dump周期" + args.get("dumpPeriod").getAsString());
manager.changeDumpPeriod(c.getID(), args.get("dumpPeriod").getAsString());
}
}
// @Action(userPermission = 1L << 26, async = true) // @Action(userPermission = 1L << 26, async = true)
// public void queryContractInstanceInfoByDOI(JsonObject args, ResultCallback resultCallback) // public void queryContractInstanceInfoByDOI(JsonObject args, ResultCallback resultCallback)
// { // {
@ -1431,57 +1482,6 @@ public class CMActions implements OnHashCallback {
* resultCallback.onResult(gson.toJson(r)); } * resultCallback.onResult(gson.toJson(r)); }
*/ */
@Action(userPermission = 1L << 16, async = true)
public void getControlFlowByFileName(JsonObject args, ResultCallback resultCallback) {
String project = args.get("projectName").getAsString();
String dirPath;
if (args.has("isPrivate") && args.get("isPrivate").getAsBoolean())
dirPath = GlobalConf.instance.privateDir;
else dirPath = GlobalConf.instance.publicDir;
String ypkPath = FileActions.autoCompile(dirPath, project);
Contract c = new Contract();
c.setScript(ypkPath);
// c.setType(Type.Algorithm);
c.setType(ContractExecType.Sole);
Map<String, String> r = new HashMap<>();
r.put("action", "onGetControlFlow");
r.put("result", manager.getControlFlow(c));
resultCallback.onResult(r);
}
@Action(async = true, userPermission = 1L << 26)
public void startContractInTempZips(JsonObject args, ResultCallback resultCallback) {
long start = System.currentTimeMillis();
Contract c = new Contract();
// c.setType(Type.Algorithm);
c.setType(ContractExecType.Sole);
String pubkey = args.get("owner").getAsString();
// SCManagerServlet.registerCCCallback(json.getString("requestID"), this);
Map<String, Object> r = new HashMap<>();
r.put("action", "onStartContract");
String path = args.get("path").getAsString();
path =
new File(new File(GlobalConf.instance.projectDir, "tempZips"), path)
.getAbsolutePath();
c.setScript(path);
c.setSignature(args.get("signature").getAsString());
c.setOwner(pubkey);
if (!c.verifySignature()) {
r.put("data", "verify failed");
resultCallback.onResult(r);
return;
}
r.put("data", manager.startContractAndRedirect(c, System.out));
r.put("cid", c.getID());
r.put("executeTime", System.currentTimeMillis() - start);
resultCallback.onResult(r);
if (args.has("dumpPeriod")) {
LOGGER.debug("[CMActions]启动后设置dump周期" + args.get("dumpPeriod").getAsString());
manager.changeDumpPeriod(c.getID(), args.get("dumpPeriod").getAsString());
}
}
@Action(userPermission = 1L << 26, async = true, httpAccess = false) @Action(userPermission = 1L << 26, async = true, httpAccess = false)
public void killAllContract(JsonObject args, ResultCallback resultCallback) { public void killAllContract(JsonObject args, ResultCallback resultCallback) {
if (args.has("verifiedPubKey")) { if (args.has("verifiedPubKey")) {

View File

@ -1,41 +1,14 @@
package org.bdware.server.action; package org.bdware.server.action;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import org.bdware.sc.ContractClient;
import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.util.HashUtil; import org.bdware.sc.event.REvent;
import org.bdware.server.action.Action; import org.bdware.sc.util.JsonUtil;
import org.bdware.server.action.CMActions;
public class EventActions { public class EventActions {
@Action(async = true, userPermission = 0) @Action(async = true, userPermission = 0)
public void subEvent(JsonObject args, final ResultCallback rcb) { public void deliverEvent(JsonObject args, final ResultCallback rcb) {
JsonObject ret = new JsonObject(); CMActions.manager.deliverEvent(
ret.addProperty("action", "onSubEvent"); JsonUtil.fromJson(args.get("data").getAsString(), REvent.class));
ret.addProperty("status", "Error");
if (args.has("requestID")) {
ret.addProperty("responseID", args.get("requestID").getAsString());
}
if (!args.has("topic")) {
ret.addProperty("data", "no topic arg!");
rcb.onResult(ret.toString());
return;
}
String topic = args.get("topic").getAsString();
if (args.has("contractID")) {
String argCID = args.get("contractID").getAsString();
ContractClient client = CMActions.manager.getClient(argCID);
if (null == client) {
ret.addProperty("data", "invalid contract ID or Name!");
rcb.onResult(ret.toString());
return;
}
String contractID = client.getContractID();
topic = HashUtil.sha3(contractID, topic);
}
CMActions.manager.subEventByClient(topic, rcb.getChannel());
ret.addProperty("status", "Success");
ret.addProperty("data", topic);
rcb.onResult(ret);
} }
} }

View File

@ -0,0 +1,39 @@
package org.bdware.server.action;
import com.google.gson.JsonObject;
import org.bdware.sc.ContractClient;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.util.HashUtil;
public class EventWSActions {
@Action(async = true, userPermission = 0)
public void subEvent(JsonObject args, final ResultCallback rcb) {
JsonObject ret = new JsonObject();
ret.addProperty("action", "onSubEvent");
ret.addProperty("status", "Error");
if (args.has("requestID")) {
ret.addProperty("responseID", args.get("requestID").getAsString());
}
if (!args.has("topic")) {
ret.addProperty("data", "no topic arg!");
rcb.onResult(ret.toString());
return;
}
String topic = args.get("topic").getAsString();
if (args.has("contractID")) {
String argCID = args.get("contractID").getAsString();
ContractClient client = CMActions.manager.getClient(argCID);
if (null == client) {
ret.addProperty("data", "invalid contract ID or Name!");
rcb.onResult(ret.toString());
return;
}
String contractID = client.getContractID();
topic = HashUtil.sha3(contractID, topic);
}
CMActions.manager.subEventByClient(topic, rcb.getChannel());
ret.addProperty("status", "Success");
ret.addProperty("data", topic);
rcb.onResult(ret);
}
}

View File

@ -30,7 +30,7 @@ import java.util.zip.GZIPOutputStream;
public class MasterServerRecoverMechAction { public class MasterServerRecoverMechAction {
private static final Logger LOGGER = LogManager.getLogger(MasterServerRecoverMechAction.class); private static final Logger LOGGER = LogManager.getLogger(MasterServerRecoverMechAction.class);
public static Map<String, Map<String, RecoverFlag>> recoverStatus = new ConcurrentHashMap<>(); public static Map<String, Map<String, RecoverFlag>> recoverStatus = new ConcurrentHashMap<>();
private Map<String, OutputStream> stateFileMap = new HashMap<>(); private final Map<String, OutputStream> stateFileMap = new HashMap<>();
public MasterServerRecoverMechAction() { public MasterServerRecoverMechAction() {
} }

View File

@ -31,6 +31,7 @@ import org.bdware.units.msghandler.ResponseCenter;
import java.io.File; import java.io.File;
@SuppressWarnings("unused")
public class _UNUSED_ExecutionAction implements OnHashCallback { public class _UNUSED_ExecutionAction implements OnHashCallback {
private static final Logger LOGGER = LogManager.getLogger(_UNUSED_ExecutionAction.class); private static final Logger LOGGER = LogManager.getLogger(_UNUSED_ExecutionAction.class);

View File

@ -12,9 +12,7 @@ import org.bdware.sc.conn.ByteUtil;
import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.db.CMTables; import org.bdware.sc.db.CMTables;
import org.bdware.sc.db.KeyValueDBUtil; import org.bdware.sc.db.KeyValueDBUtil;
import org.bdware.sc.event.REvent;
import org.bdware.sc.units.MultiContractMeta; import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.util.HashUtil;
import org.bdware.sc.util.JsonUtil; import org.bdware.sc.util.JsonUtil;
import org.bdware.server.GlobalConf; import org.bdware.server.GlobalConf;
import org.bdware.server.action.Action; import org.bdware.server.action.Action;
@ -33,7 +31,6 @@ import org.zz.gmhelper.SM2KeyPair;
import org.zz.gmhelper.SM2Util; import org.zz.gmhelper.SM2Util;
import java.io.*; import java.io.*;
import java.math.BigInteger;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
@ -44,21 +41,18 @@ public class NodeCenterClientController implements NodeCenterConn {
public static SyncResult sync = new SyncResult(); public static SyncResult sync = new SyncResult();
private static boolean startCheck = false; private static boolean startCheck = false;
private final Map<String, FileOutputStream> fileMap; private final Map<String, FileOutputStream> fileMap;
public Map<String, ResultCallback> distributeReqMap = new ConcurrentHashMap<>();
private final NetNeighbors neighbors; private final NetNeighbors neighbors;
public Map<String, ResultCallback> distributeReqMap = new ConcurrentHashMap<>();
// public NodeCenterClientController cmClientController; // public NodeCenterClientController cmClientController;
String nodeID; String nodeID;
Map<String, Set<String>> subscribedInfo;
NodeCenterClientHandler handler; NodeCenterClientHandler handler;
// 合约contractIDmaster的公钥 // 合约contractIDmaster的公钥
Map<String, String> contractID2PubKey = new ConcurrentHashMap<>(); Map<String, String> contractID2PubKey = new ConcurrentHashMap<>();
public NodeCenterClientController(String nodeID) { public NodeCenterClientController(String nodeID) {
subscribedInfo = new HashMap<>(); this.fileMap = new HashMap<>();
fileMap = new HashMap<>();
this.nodeID = nodeID; this.nodeID = nodeID;
neighbors = new NetNeighbors(); this.neighbors = new NetNeighbors();
} }
public void init(NodeCenterClientHandler webSocketClientHandler) { public void init(NodeCenterClientHandler webSocketClientHandler) {
@ -270,12 +264,6 @@ public class NodeCenterClientController implements NodeCenterConn {
updateContract(); updateContract();
} }
@Action(async = true)
public void deliverEvent(JsonObject jo, ResultCallback cb) {
CMActions.manager.deliverEvent(
JsonUtil.fromJson(jo.get("data").getAsString(), REvent.class));
}
@Action(async = true) @Action(async = true)
public void executeContractLocally(JsonObject jo, ResultCallback resultCallback) { public void executeContractLocally(JsonObject jo, ResultCallback resultCallback) {
String requestID = jo.get("requestID").getAsString(); String requestID = jo.get("requestID").getAsString();
@ -299,46 +287,28 @@ public class NodeCenterClientController implements NodeCenterConn {
sync.wakeUp(requestID, data); sync.wakeUp(requestID, data);
} }
// deliver REvent
@Override @Override
public boolean deliverEvent(String event, String target) { public String[] listNodes() {
if (null != target && !nodeID.equals(target)) { String[] ret;
LOGGER.info("Deliver event message in node " + target.substring(0, 6));
JsonObject jo = new JsonObject();
jo.addProperty("msg", event);
jo.addProperty("target", target);
jo.addProperty("action", "deliverEvent");
sendMsg(JsonUtil.toJson(jo));
return true;
}
return false;
}
@Override
public NodeKey[] listNodes() {
NodeKey[] ret;
synchronized (neighbors) { synchronized (neighbors) {
if (neighbors.arr.length != neighbors.map.size()) { if (neighbors.arr.length != neighbors.set.size()) {
Set<String> set = new TreeSet<>(neighbors.map.keySet()); Set<String> set = new TreeSet<>(neighbors.set);
neighbors.arr = new NodeKey[set.size()]; neighbors.arr = new String[set.size()];
int i = 0; int i = 0;
for (String id : set) { for (String id : set) {
neighbors.arr[i++] = new NodeKey(id); neighbors.arr[i++] = id;
} }
} }
ret = neighbors.arr; ret = neighbors.arr;
LOGGER.debug(neighbors.map.size() + " nodes registered"); LOGGER.debug(neighbors.set.size() + " nodes registered");
} }
return ret; return ret;
} }
@Override @Override
public String getNodeId(String str) { public String getNodeId() {
if (null == str) {
return nodeID; return nodeID;
} }
return neighbors.map.get(str);
}
@Action(async = true) @Action(async = true)
public void onQueryAllRouteInfo(JsonObject jo, ResultCallback cb) { public void onQueryAllRouteInfo(JsonObject jo, ResultCallback cb) {
@ -803,89 +773,19 @@ public class NodeCenterClientController implements NodeCenterConn {
sync.wakeUp(jo.get("responseID").getAsString(), jo.toString()); sync.wakeUp(jo.get("responseID").getAsString(), jo.toString());
} }
@Override
public String[] getClusterByKey(String key, int k) {
NodeKey[] nodes = this.listNodes();
if (nodes.length == 0) {
return null;
}
if (nodes.length == 1) {
return new String[]{this.getNodeId(nodes[0].id)};
}
// get hash with enough length
String hash = HashUtil.sha3ToFixedLen(key, nodes[0].id.length());
BigInteger biH = new BigInteger(hash, 16);
// binary search, to find the nearest node with hash
int l = 0, r = nodes.length - 1, m = 0,
comL = biH.compareTo(nodes[l].biId), comR = nodes[r].biId.compareTo(biH),
comM;
String selected;
do {
if (comL < 1) {
selected = nodes[l].id;
break;
}
if (comR < 1) {
selected = nodes[r].id;
break;
}
if (l + 1 == r) {
if (biH.subtract(nodes[l].biId).compareTo(nodes[r].biId.subtract(biH)) < 1) {
selected = nodes[l].id;
} else {
selected = nodes[r].id;
}
break;
}
m = (l + r) >> 1;
comM = biH.compareTo(nodes[m].biId);
if (comM < 1) {
r = m;
comR = -comM;
} else {
l = m;
comL = comM;
}
} while (true);
List<String> ret = new ArrayList<>();
ret.add(this.getNodeId(selected));
if (k > 1) {
l = m - 1;
r = m + 1;
while (ret.size() < k && (l >= 0 || r < nodes.length)) {
if (l < 0) {
ret.add(this.getNodeId(nodes[r++].id));
} else if (r >= nodes.length) {
ret.add(this.getNodeId(nodes[l--].id));
} else {
if (biH.subtract(nodes[l].biId).compareTo(nodes[r].biId.subtract(biH)) < 1) {
ret.add(this.getNodeId(nodes[l--].id));
} else {
ret.add(this.getNodeId(nodes[r++].id));
}
}
}
}
return ret.toArray(new String[0]);
}
static class NetNeighbors { static class NetNeighbors {
NodeKey[] arr; String[] arr;
Map<String, String> map; Set<String> set;
NetNeighbors() { NetNeighbors() {
arr = new NodeKey[0]; arr = new String[0];
map = new HashMap<>(); set = new HashSet<>();
} }
synchronized void addNewNode(String id) { synchronized void addNewNode(String id) {
String key = id.substring(2); if (!set.contains(id)) {
if (!map.containsKey(key)) { set.add(id);
map.put(key, id); LOGGER.info(set.size() + " nodes registered");
LOGGER.info(map.size() + " nodes registered");
} }
} }
} }

View File

@ -29,7 +29,7 @@ public class NodeCenterClientHandler extends SimpleChannelInboundHandler<Object>
private static final Logger LOGGER = LogManager.getLogger(NodeCenterClientHandler.class); private static final Logger LOGGER = LogManager.getLogger(NodeCenterClientHandler.class);
public static String[] clientToClusterPlugins; public static String[] clientToClusterPlugins;
public boolean hasPermission; public boolean hasPermission;
private NodeCenterClientController controller; private final NodeCenterClientController controller;
Channel channel; Channel channel;
// UDPTrustfulExecutor udpExecutor; // UDPTrustfulExecutor udpExecutor;
// RecoverMechExecutor recoverMechExecutor; // RecoverMechExecutor recoverMechExecutor;

View File

@ -14,6 +14,7 @@ import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.util.JsonUtil; import org.bdware.sc.util.JsonUtil;
import org.bdware.server.CongestionControl; import org.bdware.server.CongestionControl;
import org.bdware.server.action.ActionExecutor; import org.bdware.server.action.ActionExecutor;
import org.bdware.server.action.EventActions;
import org.bdware.server.action.p2p.*; import org.bdware.server.action.p2p.*;
import org.bdware.units.NetworkManager; import org.bdware.units.NetworkManager;
@ -34,15 +35,21 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler<Object> {
public ActionExecutor<ResultCallback, JsonObject> ae; public ActionExecutor<ResultCallback, JsonObject> ae;
ChannelHandlerContext ctx; ChannelHandlerContext ctx;
private Channel channel; private Channel channel;
private String master; // master node pubKey private final String master; // master node pubKey
public TCPClientFrameHandler(String masterPubkey) { public TCPClientFrameHandler(String masterPubkey) {
master = masterPubkey; master = masterPubkey;
aliveCheckClientAction = new AliveCheckClientAction(masterPubkey); aliveCheckClientAction = new AliveCheckClientAction(masterPubkey);
ae = new ActionExecutor<>( ae = new ActionExecutor<>(
executorService, aliveCheckClientAction, executorService,
new MasterClientTCPAction(), new MasterClientRecoverMechAction(), MasterClientTransferAction.instance, aliveCheckClientAction,
new MasterServerRecoverMechAction(), new MasterServerTransferAction(), new MasterServerTCPAction()); new MasterClientTCPAction(),
new MasterClientRecoverMechAction(),
MasterClientTransferAction.instance,
new MasterServerRecoverMechAction(),
new MasterServerTransferAction(),
new MasterServerTCPAction(),
new EventActions());
for (String str : clientToAgentPlugins) { for (String str : clientToAgentPlugins) {
Object obj = createInstanceByClzName(str); Object obj = createInstanceByClzName(str);
ae.appendHandler(obj); ae.appendHandler(obj);

View File

@ -1,7 +1,6 @@
package org.bdware.server.tcp; package org.bdware.server.tcp;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
@ -14,6 +13,7 @@ import org.bdware.sc.util.JsonUtil;
import org.bdware.server.CongestionControl; import org.bdware.server.CongestionControl;
import org.bdware.server.action.Action; import org.bdware.server.action.Action;
import org.bdware.server.action.ActionExecutor; import org.bdware.server.action.ActionExecutor;
import org.bdware.server.action.EventActions;
import org.bdware.server.action.p2p.*; import org.bdware.server.action.p2p.*;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
@ -33,9 +33,15 @@ public class TCPServerFrameHandler extends SimpleChannelInboundHandler<Object> {
checkAction = new AliveCheckServerAction(this); checkAction = new AliveCheckServerAction(this);
ae = ae =
new ActionExecutor<ResultCallback, JsonObject>( new ActionExecutor<ResultCallback, JsonObject>(
executorService, checkAction, executorService,
new MasterClientTCPAction(), new MasterClientRecoverMechAction(), MasterClientTransferAction.instance, checkAction,
new MasterServerRecoverMechAction(), new MasterServerTransferAction(), new MasterServerTCPAction()) { new MasterClientTCPAction(),
new MasterClientRecoverMechAction(),
MasterClientTransferAction.instance,
new MasterServerRecoverMechAction(),
new MasterServerTransferAction(),
new MasterServerTCPAction(),
new EventActions()) {
@Override @Override
public boolean checkPermission( public boolean checkPermission(
Action a, final JsonObject args, long permission) { Action a, final JsonObject args, long permission) {
@ -92,11 +98,7 @@ public class TCPServerFrameHandler extends SimpleChannelInboundHandler<Object> {
ByteBuf bb = (ByteBuf) frame; ByteBuf bb = (ByteBuf) frame;
JsonObject arg; JsonObject arg;
try { try {
arg = JsonUtil.parseReaderAsJsonObject(new InputStreamReader(new ByteBufInputStream(bb)));
arg =
new JsonParser()
.parse(new InputStreamReader(new ByteBufInputStream(bb)))
.getAsJsonObject();
// logger.info("[MasterServer] receive:" + arg.toString()); // logger.info("[MasterServer] receive:" + arg.toString());
} catch (Exception e) { } catch (Exception e) {

View File

@ -1,5 +1,6 @@
package org.bdware.server.trustedmodel; package org.bdware.server.trustedmodel;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive; import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -18,6 +19,8 @@ import org.bdware.units.NetworkManager;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
public class AgentManager implements AgentPeerManagerIntf { public class AgentManager implements AgentPeerManagerIntf {
// key is masters' pubKey // key is masters' pubKey
@ -28,7 +31,6 @@ public class AgentManager implements AgentPeerManagerIntf {
return NetworkManager.CONNECTORS.get(masterID).handler; return NetworkManager.CONNECTORS.get(masterID).handler;
} }
@Override @Override
public void executeByOtherNodeAsync(String pubKey, ContractRequest c, ResultCallback cb) { public void executeByOtherNodeAsync(String pubKey, ContractRequest c, ResultCallback cb) {
LOGGER.error(pubKey + " " + c.getRequestID() + " " + c.getContractID() + " cb:" + cb); LOGGER.error(pubKey + " " + c.getRequestID() + " " + c.getContractID() + " cb:" + cb);
@ -65,6 +67,29 @@ public class AgentManager implements AgentPeerManagerIntf {
return NetworkManager.instance.hasAgentConnection(pubKey); return NetworkManager.instance.hasAgentConnection(pubKey);
} }
@Override
public boolean deliverEvent(String pubKey, String event) {
if (null != pubKey && !CMActions.manager.nodeCenterConn.getNodeId().equals(pubKey)) {
LOGGER.info("Deliver event message in node " + pubKey.substring(0, 6));
JsonObject msg = new JsonObject();
msg.addProperty("action", "deliverEvent");
msg.addProperty("data", event);
NetworkManager.instance.sendToAgent(pubKey, JsonUtil.toJson(msg));
return true;
}
return false;
}
@Override
public String[] listNodes() {
String[] ret = CMActions.manager.nodeCenterConn.listNodes();
if (ret.length == 0) {
Set<String> set = new TreeSet<>(NetworkManager.CONNECTORS.keySet());
set.addAll(NetworkManager.SERVER_CONNECTORS.keySet());
ret = set.toArray(new String[0]);
}
return ret;
}
@Override @Override
public void transferToOtherNode(String pubKey, String contractID) { public void transferToOtherNode(String pubKey, String contractID) {
@ -75,5 +100,4 @@ public class AgentManager implements AgentPeerManagerIntf {
MasterClientTransferAction.instance.transferInstance(pubKey, meta.getID()); MasterClientTransferAction.instance.transferInstance(pubKey, meta.getID());
} }
} }

View File

@ -62,7 +62,7 @@ public class ContractManagerFrameHandler extends SimpleChannelInboundHandler<Web
new CMLogAction(), new CMLogAction(),
new ProcessAction(), new ProcessAction(),
GRPCPool.instance, GRPCPool.instance,
new EventActions()) { new EventWSActions()) {
@Override @Override
public boolean checkPermission(Action a, JsonObject arg, long permission) { public boolean checkPermission(Action a, JsonObject arg, long permission) {
long val = a.userPermission(); long val = a.userPermission();

View File

@ -41,31 +41,43 @@ import java.util.concurrent.TimeUnit;
* @author OliveDS (Shuang Deng) * @author OliveDS (Shuang Deng)
*/ */
public class NetworkManager { public class NetworkManager {
public static class AgentConnector {
public Bootstrap bootstrap;
public org.bdware.server.tcp.TCPClientFrameHandler handler;
}
//Manage server->client connection; //Manage server->client connection;
public static final Map<String, AgentConnector> CONNECTORS = new ConcurrentHashMap<>(); public static final Map<String, AgentConnector> CONNECTORS = new ConcurrentHashMap<>();
//Manage client->server connection; //Manage client->server connection;
public static final Map<String, TCPServerFrameHandler> SERVERCONNECTORS = new ConcurrentHashMap<>(); public static final Map<String, TCPServerFrameHandler> SERVER_CONNECTORS = new ConcurrentHashMap<>();
private static Map<String, String> slaverRouter = new HashMap<>();
public static Map<String, SlaveNode> id2Slaves = new ConcurrentHashMap<>();
public static final String NODE_CENTER_CLIENT = "NODE_CENTER_CLIENT"; public static final String NODE_CENTER_CLIENT = "NODE_CENTER_CLIENT";
public static final String P2P_GRPC_CLIENT = "P2P_GRPC_CLIENT"; public static final String P2P_GRPC_CLIENT = "P2P_GRPC_CLIENT";
private static final Map<String, String> slaverRouter = new HashMap<>();
private static final Logger LOGGER = LogManager.getLogger(NetworkManager.class); private static final Logger LOGGER = LogManager.getLogger(NetworkManager.class);
public static Map<String, SlaveNode> id2Slaves = new ConcurrentHashMap<>();
public static NetworkManager instance = new NetworkManager(); public static NetworkManager instance = new NetworkManager();
private NodeCenterClientHandler nodeCenterClientHandler;
private final Map<String, String> peerID2TCPAddress; private final Map<String, String> peerID2TCPAddress;
private NodeCenterClientHandler nodeCenterClientHandler;
public NetworkManager() { public NetworkManager() {
peerID2TCPAddress = new HashMap<>(); peerID2TCPAddress = new HashMap<>();
} }
public static void reconnectAgent(String master) {
LOGGER.debug(
String.format("master=%s\t%s",
master,
JsonUtil.toJson(slaverRouter)));
try {
NetworkManager.AgentConnector conn;
synchronized (conn = NetworkManager.CONNECTORS.get(master)) {
if (!conn.handler.isOpen()) {
String[] ipAndPort = slaverRouter.get(master).split(":");
conn.bootstrap
.connect(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
.sync()
.channel();
}
}
} catch (Exception e) {
LOGGER.error("reconnect failed: " + e.getMessage());
}
}
public void initTCP(int tcpPort, EventLoopGroup workerGroup) { public void initTCP(int tcpPort, EventLoopGroup workerGroup) {
createTCPServer(tcpPort, workerGroup); createTCPServer(tcpPort, workerGroup);
@ -96,13 +108,12 @@ public class NetworkManager {
try { try {
// manager.clearCache(); // manager.clearCache();
String URL = GlobalConf.getNodeCenterUrl(); String URL = GlobalConf.getNodeCenterUrl();
// System.out.println("GlobalConf.getNodeCenterUrl() // LOGGER.debug("GlobalConf.getNodeCenterUrl() -> URL=" + URL);
// URL" + URL);
URI uri = null; URI uri = null;
try { try {
uri = new URI(URL); uri = new URI(URL);
} catch (URISyntaxException e1) { } catch (URISyntaxException e) {
LOGGER.error("creating uri failed! " + e1.getMessage()); LOGGER.error("creating uri failed! " + e.getMessage());
} }
if (!nodeCenterClientHandler.isConnected() if (!nodeCenterClientHandler.isConnected()
|| !ControllerManager.getNodeCenterController().syncPing()) { || !ControllerManager.getNodeCenterController().syncPing()) {
@ -116,8 +127,7 @@ public class NetworkManager {
+ uri.getPort()); + uri.getPort());
} }
} catch (Exception e) { } catch (Exception e) {
// e.printStackTrace();
e.printStackTrace();
LOGGER.error("connecting to node center failed! " + e.getMessage()); LOGGER.error("connecting to node center failed! " + e.getMessage());
} }
}, },
@ -190,13 +200,13 @@ public class NetworkManager {
public void registerConnection(String nodeID, TCPServerFrameHandler handler) { public void registerConnection(String nodeID, TCPServerFrameHandler handler) {
LOGGER.info("nodeID:" + nodeID + " connected!!"); LOGGER.info("nodeID:" + nodeID + " connected!!");
SERVERCONNECTORS.put(nodeID, handler); SERVER_CONNECTORS.put(nodeID, handler);
} }
public void closeAgent(String agentPubkey) { public void closeAgent(String agentPubkey) {
if (NetworkManager.SERVERCONNECTORS.containsKey(agentPubkey)) { if (NetworkManager.SERVER_CONNECTORS.containsKey(agentPubkey)) {
NetworkManager.SERVERCONNECTORS.get(agentPubkey).close(); NetworkManager.SERVER_CONNECTORS.get(agentPubkey).close();
NetworkManager.SERVERCONNECTORS.remove(agentPubkey); NetworkManager.SERVER_CONNECTORS.remove(agentPubkey);
} }
if (NetworkManager.CONNECTORS.containsKey(agentPubkey)) { if (NetworkManager.CONNECTORS.containsKey(agentPubkey)) {
NetworkManager.CONNECTORS.get(agentPubkey).handler.close(); NetworkManager.CONNECTORS.get(agentPubkey).handler.close();
@ -206,8 +216,8 @@ public class NetworkManager {
} }
public void connectToAgent(String master, String contractID) { public void connectToAgent(String master, String contractID) {
LOGGER.info("[CMClientController] connectToMaster master= " + master); LOGGER.info("master=" + master);
// logger.debug("ConnectToMaster:" + master + "\nMasterRoute:" + slaverRouter.get(master)); // LOGGER.debug("master=" + master + "\n\trouter=" + slaverRouter.get(master));
Bootstrap b; Bootstrap b;
AgentConnector connector = null; AgentConnector connector = null;
synchronized (CONNECTORS) { synchronized (CONNECTORS) {
@ -216,9 +226,10 @@ public class NetworkManager {
CONNECTORS.put(master, connector); CONNECTORS.put(master, connector);
} }
} }
if (connector != null) { if (null != connector) {
b = new Bootstrap(); b = new Bootstrap();
org.bdware.server.tcp.TCPClientFrameHandler handler = new org.bdware.server.tcp.TCPClientFrameHandler(master); org.bdware.server.tcp.TCPClientFrameHandler handler =
new org.bdware.server.tcp.TCPClientFrameHandler(master);
connector.bootstrap = b; connector.bootstrap = b;
connector.handler = handler; connector.handler = handler;
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000); b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
@ -236,28 +247,6 @@ public class NetworkManager {
reconnectAgent(master); reconnectAgent(master);
} }
public static void reconnectAgent(String master) {
LOGGER.debug(
"[MasterProxy] reconnect:"
+ JsonUtil.toJson(slaverRouter)
+ "\nmaster="
+ master);
try {
NetworkManager.AgentConnector conn;
synchronized (conn = NetworkManager.CONNECTORS.get(master)) {
if (!conn.handler.isOpen()) {
String[] ipAndPort = slaverRouter.get(master).split(":");
conn.bootstrap
.connect(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
.sync()
.channel();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void sendToAgent(String pubkey, String content) { public void sendToAgent(String pubkey, String content) {
try { try {
if (sendToAgentByServer(pubkey, content)) { if (sendToAgentByServer(pubkey, content)) {
@ -273,8 +262,10 @@ public class NetworkManager {
} }
private boolean sendToAgentByServer(String pubkey, String content) { private boolean sendToAgentByServer(String pubkey, String content) {
TCPServerFrameHandler handler = SERVERCONNECTORS.get(pubkey); TCPServerFrameHandler handler = SERVER_CONNECTORS.get(pubkey);
if (handler == null) return false; if (null == handler) {
return false;
}
try { try {
handler.sendMsg(content); handler.sendMsg(content);
return true; return true;
@ -296,7 +287,7 @@ public class NetworkManager {
public boolean hasAgentConnection(String pubKey) { public boolean hasAgentConnection(String pubKey) {
if (pubKey != null) { if (pubKey != null) {
if (SERVERCONNECTORS.containsKey(pubKey)) if (SERVER_CONNECTORS.containsKey(pubKey))
return true; return true;
} }
return CONNECTORS.containsKey(pubKey); return CONNECTORS.containsKey(pubKey);
@ -354,14 +345,13 @@ public class NetworkManager {
send(unitMessage); send(unitMessage);
return; return;
} }
if (NetworkType.P2P == networkType) { if (NetworkType.P2P.equals(networkType)) {
JavaContractServiceGrpcServer.sendMsg(unitMessage); JavaContractServiceGrpcServer.sendMsg(unitMessage);
return; return;
} }
send(unitMessage); send(unitMessage);
} }
/** /**
* UNUSED send to TCP nodes, if fail send by p2p * UNUSED send to TCP nodes, if fail send by p2p
* *
@ -380,7 +370,6 @@ public class NetworkManager {
continue; continue;
} }
// tcpClientFrameHandler = NetworkManager.instance.tcpClientMap.getOrDefault(peer, null); // tcpClientFrameHandler = NetworkManager.instance.tcpClientMap.getOrDefault(peer, null);
if (peer != null) {
if (peerID2TCPAddress.containsKey(peer)) { if (peerID2TCPAddress.containsKey(peer)) {
//recreateTCPClient(peer); //recreateTCPClient(peer);
// instance.tcpClientMap.put(peer, tcpClientFrameHandler); // instance.tcpClientMap.put(peer, tcpClientFrameHandler);
@ -394,9 +383,6 @@ public class NetworkManager {
LOGGER.info("send msg by p2p to " + peer); LOGGER.info("send msg by p2p to " + peer);
JavaContractServiceGrpcServer.sendMsg(unitMessage); JavaContractServiceGrpcServer.sendMsg(unitMessage);
} }
continue;
}
LOGGER.info("send msg by tcp to " + peer);
// tcpClientFrameHandler.sendMsg(msg); // tcpClientFrameHandler.sendMsg(msg);
} }
} }
@ -404,4 +390,9 @@ public class NetworkManager {
public void sendTo(UnitMessage unitMessage, String symbol) { public void sendTo(UnitMessage unitMessage, String symbol) {
// //
} }
public static class AgentConnector {
public Bootstrap bootstrap;
public org.bdware.server.tcp.TCPClientFrameHandler handler;
}
} }