merge dengshuang-feature

refactor: NetworkManager
This commit is contained in:
CaiHQ 2021-11-21 23:20:44 +08:00
parent 098c167a95
commit 5f283513e2
33 changed files with 984 additions and 1223 deletions

View File

@ -0,0 +1,28 @@
package org.bdware.server;
import org.bdware.server.action.CMActions;
import org.bdware.server.nodecenter.client.NodeCenterClientController;
import org.bdware.server.nodecenter.client.NodeCenterClientHandler;
public class ControllerManager {
private static NodeCenterClientController nodeCenterClientController;
private static NodeCenterClientHandler nodeCenterClientHandler;
public static NodeCenterClientHandler createNodeCenterClientHandler() {
assert nodeCenterClientHandler == null;
nodeCenterClientHandler = new NodeCenterClientHandler();
nodeCenterClientController = nodeCenterClientHandler.getController();
CMActions.manager.nodeCenterConn = getNodeCenterController();
return nodeCenterClientHandler;
}
public static NodeCenterClientController getNodeCenterController() {
return nodeCenterClientController;
}
public static NodeCenterClientHandler getNodeCenterHandler() {
return nodeCenterClientHandler;
}
}

View File

@ -11,11 +11,14 @@ import org.bdware.sc.conn.ByteUtil;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.node.AnnotationNode;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.ControllerManager;
import org.bdware.server.GRPCPool;
import org.bdware.server.GlobalConf;
import org.bdware.server.action.p2p.MasterClientTCPAction;
import org.bdware.server.trustedmodel.MasterProxy;
import org.bdware.server.trustedmodel.AgentManager;
import org.bdware.server.trustedmodel.KillUnitContractInfo;
import org.bdware.server.ws.ContractManagerFrameHandler;
import org.bdware.units.NetworkManager;
import org.bdware.units.function.CommunicationManager;
@ -34,9 +37,9 @@ import java.util.*;
public class CMActions implements OnHashCallback {
private static final String PARAM_ACTION = "action";
private static final JsonObject MISSING_ARGUMENT = JsonUtil.parseString("{\"action\":\"onExecuteResult\",\"executeTime\":-1,"
private static final JsonObject MISSING_ARGUMENT = JsonUtil.parseStringAsJsonObject("{\"action\":\"onExecuteResult\",\"executeTime\":-1,"
+ "\"status\":\"Error\",\"result\":\"missing arguments\"}");
private static final JsonObject INVALID_DOI = JsonUtil.parseString(
private static final JsonObject INVALID_DOI = JsonUtil.parseStringAsJsonObject(
"{\"action\":\"onExecuteResult\",\"executeTime\":-1,"
+ "\"status\":\"Error\",\"result\":\"invalid contract doi\"}");
private static final Logger LOGGER = LogManager.getLogger(CMActions.class);
@ -58,7 +61,7 @@ public class CMActions implements OnHashCallback {
ContractManager.yjsPath = GlobalConf.instance.yjsPath;
ContractManager.dbPath = GlobalConf.getDBPath();
final ContractManager contractManager = new ContractManager();
contractManager.masterStub = new MasterProxy();
contractManager.masterStub = new AgentManager();
contractManager.chainOpener = GRPCPool.instance;
GRPCPool.logsDB = ContractManager.logsDB;
// expiredDate = Long.parseLong(GlobalConf.instance.licence);
@ -72,11 +75,11 @@ public class CMActions implements OnHashCallback {
for (ContractMeta meta : contractManager.statusRecorder.getStatus().values()) {
if (meta.getStatus() == ContractStatusEnum.RUNNING) {
contractIDS.add(meta.getID());
if (meta.contract.getType().needSeq())
contractManager.setContractIsMaster(meta.getID(), "false");
}
}
for (String id : contractIDS) {
contractManager.setContractIsMaster(id, "false");
}
LOGGER.info(
"reconnectDone! "
+ (System.currentTimeMillis() - start)
@ -776,7 +779,7 @@ public class CMActions implements OnHashCallback {
LOGGER.debug("[CMActions]启动后设置dump周期" + args.get("dumpPeriod").getAsString());
manager.changeDumpPeriod(c.getID(), args.get("dumpPeriod").getAsString());
}
ExecutionManager.instance.updateLocalContractToNodeCenter();
ControllerManager.getNodeCenterController().updateContract();
}
@Action(userPermission = 1L << 26, async = true)
@ -1158,17 +1161,17 @@ public class CMActions implements OnHashCallback {
public void connectTo(JsonObject args, ResultCallback resultCallback) {
String data;
if (!args.has("id")) {
ReplyUtil.simpleReply(resultCallback,"onConnectTo","missing contract id");
ReplyUtil.simpleReply(resultCallback, "onConnectTo", "missing contract id");
return;
}
String contractID = args.get("id").getAsString();
LOGGER.info("connectTo:" + contractID);
if (contractID == null) {
ReplyUtil.simpleReply(resultCallback,"onConnectTo","can't find contract id");
ReplyUtil.simpleReply(resultCallback, "onConnectTo", "can't find contract id");
return;
}
manager.redirect(contractID, createPS(), "");
ReplyUtil.simpleReply(resultCallback,"onConnectTo","success");
ReplyUtil.simpleReply(resultCallback, "onConnectTo", "success");
}
private PrintStream createPS() {
@ -1187,10 +1190,6 @@ public class CMActions implements OnHashCallback {
if (args.has("verifiedPubKey") && (args.has("id") || args.has("name"))) {
ContractRequest rc = new ContractRequest();
long s = System.currentTimeMillis();
// if (args.has("id"))
// rc.setContractID(args.get("id").getAsString());
// else
//System.out.println(args);
if (args.has("id")) {
// stop unit contract using contract name
rc.setContractID(args.get("id").getAsString());
@ -1220,10 +1219,10 @@ public class CMActions implements OnHashCallback {
return;
}
// TODO TOMERGE
if (cl.isUnit()) {
LOGGER.debug("killContractProcess : 集群合约 kill");
MasterClientTCPAction.killContract(s, cl, args, resultCallback);
killContractByMaster(rc.getContractID(), args, resultCallback);
} else {
String ret =
manager.stopContractWithOwner(
@ -1243,7 +1242,29 @@ public class CMActions implements OnHashCallback {
ExecutionManager.instance.updateLocalContractToNodeCenter();
}
} else {
ReplyUtil.simpleReply(resultCallback,"onKillContractProcess","Failed: Illegal parameters");
ReplyUtil.simpleReply(resultCallback, "onKillContractProcess", "Failed: Illegal parameters");
}
}
public static void killContractByMaster(
String contractID, JsonObject request, ResultCallback rc) {
LOGGER.info("[MasterClientTCPAction] killContract : ");
try {
MasterClientTCPAction.killUnitContractMap.put(
request.get("requestID").getAsString(),
new KillUnitContractInfo(rc, System.currentTimeMillis()));
MultiContractMeta mcm = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
NetworkManager.instance.sendToAgent(mcm.getMasterNode(), JsonUtil.toJson(request));
} catch (Exception e) {
e.printStackTrace();
} finally {
CMActions.manager.stopContractWithOwner(
request.get("verifiedPubKey").getAsString(), contractID);
ContractMeta meta = manager.statusRecorder.getContractMeta(contractID);
if (null != meta && meta.contractExecutor != null) {
//TODO why close?
// meta.contractExecutor.close();
}
}
}
@ -1503,14 +1524,14 @@ public class CMActions implements OnHashCallback {
e.printStackTrace();
}
ExecutionManager.instance.updateLocalContractToNodeCenter();
ReplyUtil.simpleReply(resultCallback,"onKillAllContract",sb.toString());
ReplyUtil.simpleReply(resultCallback, "onKillAllContract", sb.toString());
manager.stopAllContracts();
} else {
manager.stopAllContractsWithOwner(args.get(("verifiedPubKey")).getAsString());
ReplyUtil.simpleReply(resultCallback,"onKillAllContract","Success");
ReplyUtil.simpleReply(resultCallback, "onKillAllContract", "Success");
}
} else {
ReplyUtil.simpleReply(resultCallback,"onKillAllContract","Failed: Illegal user");
ReplyUtil.simpleReply(resultCallback, "onKillAllContract", "Failed: Illegal user");
}
}
@ -1646,15 +1667,15 @@ public class CMActions implements OnHashCallback {
peerID = jsonPeer.get("peerID").getAsString();
ipPort = jsonPeer.get("ipPort").getAsString();
// NetworkManager.instance.peerID2TCPAddress.put(peer, ipPort);
try {
NetworkManager.instance.createTCPClient(peerID, ipPort);
} catch (InterruptedException e) {
e.printStackTrace();
JsonObject ret = new JsonObject();
ret.addProperty("status", "Error");
ret.addProperty("result", "Cannot form TCP connection to " + nodeName);
rc.onResult(ret);
}
// try {
// NetworkManager.instance.createTCPClient(peerID, ipPort);
// } catch (InterruptedException e) {
// e.printStackTrace();
// JsonObject ret = new JsonObject();
// ret.addProperty("status", "Error");
// ret.addProperty("result", "Cannot form TCP connection to " + nodeName);
// rc.onResult(ret);
// }
}
}
}

View File

@ -1,17 +1,16 @@
package org.bdware.server.action;
import com.google.gson.Gson;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.action.p2p.MasterClientTCPAction;
import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.trustedmodel.SlaveNode;
import org.bdware.units.NetworkManager;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
//UNUSED
//TO Merge
public class CheckPointCallback implements OnHashCallback {
public static Map<String, String> lastHash = new ConcurrentHashMap<>(); // contractID,hash
@ -26,12 +25,9 @@ public class CheckPointCallback implements OnHashCallback {
String sendStr = JsonUtil.toJson(reqStr);
MultiContractMeta info = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
for (String node : info.getMembers()) {
SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(node);
if (cmNode != null) {
System.out.println(
"发送请求给 " + cmNode.pubKey.substring(0, 5) + " 更新ledger中最新检查点的hash!");
cmNode.connection.sendMsg(sendStr);
}
NetworkManager.instance.sendToAgent(node, sendStr);
System.out.println(
"发送请求给 " + node.substring(0, 5) + " 更新ledger中最新检查点的hash!");
}
}
}

View File

@ -28,6 +28,7 @@ import org.bdware.sc.db.TimeDBUtil;
import org.bdware.sc.node.ContractManifest;
import org.bdware.sc.util.FileUtil;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.ControllerManager;
import org.bdware.server.GlobalConf;
import org.bdware.server.doip.ContractRepositoryMain;
import org.bdware.server.http.URIPath;
@ -1631,8 +1632,7 @@ public class FileActions {
json.addProperty("distributeID", reqID);
LOGGER.debug("[FileActions] distributeContract : ");
LOGGER.debug(JsonUtil.toJson(json));
NetworkManager.instance.getNCClientHandler().distributeReqMap.put(reqID, resultCallback);
NetworkManager.instance.sendToNodeCenter(JsonUtil.toJson(json));
ControllerManager.getNodeCenterController().distributeContract(reqID,resultCallback,json);
}
static class ListProjectResp {

View File

@ -10,6 +10,7 @@ import org.bdware.sc.db.KeyValueDBUtil;
import org.bdware.sc.encrypt.HardwareInfo;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.CongestionControl;
import org.bdware.server.ControllerManager;
import org.bdware.server.GRPCPool;
import org.bdware.server.GlobalConf;
import org.bdware.server.permission.Role;
@ -123,7 +124,7 @@ public class ManagerActions {
@Action(userPermission = 1L << 11)
public void listNodeInfos(JsonObject args, ResultCallback resultCallback) {
NetworkManager.instance.getNCClientHandler().controller.getNodeInfos(
ControllerManager.getNodeCenterController().getNodeInfos(
new ResultCallback() {
@Override
public void onResult(String str) {
@ -134,7 +135,7 @@ public class ManagerActions {
@Action(userPermission = 1L << 26)
public void updateContract(JsonObject args, ResultCallback resultCallback) {
NetworkManager.instance.getNCClientHandler().controller.listCMInfo();
ControllerManager.getNodeCenterController().listCMInfo();
resultCallback.onResult("{\"data\":\"" + System.currentTimeMillis() + "\"}");
}
@ -151,7 +152,7 @@ public class ManagerActions {
data.put("isLAN", String.valueOf(GlobalConf.isLAN()));
data.put("peerID", GlobalConf.instance.peerID);
data.put("bdledger", GlobalConf.instance.datachainConf.replace("\n", " "));
data.put("clusterConnected", String.valueOf(NetworkManager.instance.getNCClientHandler().isConnected()));
data.put("clusterConnected", String.valueOf(NetworkManager.instance.isConnectedToNodeCenter()));
data.put("nodePubKey", GlobalConf.instance.keyPair.getPublicKeyStr());
data.put("masterAddress", GlobalConf.instance.masterAddress);
ReplyUtil.replyWithStatus(resultCallback, "onLoadNodeConfig", true, data);

View File

@ -21,7 +21,6 @@ import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.trustedmodel.MultiPointContractInfo;
import org.bdware.server.trustedmodel.MultiPointCooperateContractInfo;
import org.bdware.server.trustedmodel.ResultCollector;
import org.bdware.server.trustedmodel.SlaveNode;
import org.bdware.units.NetworkManager;
import org.zz.gmhelper.SM2KeyPair;
import org.zz.gmhelper.SM2Util;
@ -42,7 +41,31 @@ public class MasterWSAction {
public static boolean hostMaster(String contractID) {
return CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).isMaster();
}
private boolean waitForConnection(Set<String> nodeNames) {
LOGGER.info("waitForAllNodes:" + JsonUtil.toJson(nodeNames));
for (int i = 0; i < 5; i++) {
boolean all = true;
for (String str : nodeNames) {
if (!NetworkManager.instance.hasAgentConnection(str)) {
all = false;
break;
}
}
if (!all) {
synchronized (MasterServerTCPAction.sync) { // TODO why?
try {
MasterServerTCPAction.sync.wait(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} else {
return true;
}
}
LOGGER.info("waitForAllNodes return false;");
return false;
}
@Action(async = true, userPermission = 1L << 26)
// zyx modified
public void startContractMultiPoint(JsonObject args, final ResultCallback rc) {
@ -53,7 +76,6 @@ public class MasterWSAction {
contract.setType(ContractExecType.getContractTypeByInt(args.get("type").getAsInt()));
}
// this multiPointContractInfo problem
// this multiPointContractInfo problem
MultiPointContractInfo multiPointContractInfo = new MultiPointContractInfo();
if (contract.getType() == ContractExecType.Sharding) {
@ -166,6 +188,7 @@ public class MasterWSAction {
int nodeSize = nodeNames.size();
// 方式一向NodeCenter发要求Slave节点主动连接到Master节点.
Map<String, Object> requestConnect = new HashMap<>();
requestConnect.put("action", "requestConnectToMaster");
LOGGER.debug(multiPointContractInfo.masterNode);
@ -175,12 +198,8 @@ public class MasterWSAction {
requestConnect.put("connectAll", true);
}
NetworkManager.instance.sendToNodeCenter(JsonUtil.toJson(requestConnect)); // 向NC发
waitForConnection(nodeNames);
LOGGER.debug(JsonUtil.toPrettyJson(requestConnect));
boolean isSuccess = waitForConnection(nodeNames);
if (!isSuccess) {
return;
}
ContractManager.threadPool.execute(
() -> {
// 多点合约更新repository信息
@ -265,23 +284,13 @@ public class MasterWSAction {
startReq = JsonUtil.toJson(request);
LOGGER.debug("start contract: " + startReq);
}
SlaveNode node;
node = MasterServerTCPAction.id2Slaves.get(nodeID); // slave node信息
node.connection.sendMsg(startReq);
NetworkManager.instance.sendToAgent(nodeID, startReq);
if (!MasterServerRecoverMechAction.recoverStatus.containsKey(nodeID)) {
MasterServerRecoverMechAction.recoverStatus.put(nodeID, new ConcurrentHashMap<>());
}
MasterServerRecoverMechAction.recoverStatus
.get(nodeID)
.put(contractID, RecoverFlag.Fine);
// if (!MasterServerTCPAction.recoverMap.containsKey(nodeID)) {
// MasterServerTCPAction.recoverMap.put(nodeID, new
// ConcurrentHashMap<>());
// }
// ContractRecord record = new ContractRecord(contract.getID());
// MasterServerTCPAction.recoverMap.get(nodeID).put(contract.getID(),
// record);
}
rc.onResult(
"{\"status\":\"Success\",\"result\":\""
@ -291,35 +300,4 @@ public class MasterWSAction {
LOGGER.info("success!");
}
private boolean waitForConnection(Collection<String> nodeNames) {
LOGGER.info("waitForAllNodes:" + JsonUtil.toJson(nodeNames));
for (int i = 0; i < 5; i++) {
boolean all = true;
for (String str : nodeNames) {
if (!MasterServerTCPAction.id2Slaves.containsKey(str)) {
all = false;
break;
}
}
if (!all) {
synchronized (MasterServerTCPAction.sync) { // TODO why?
try {
MasterServerTCPAction.sync.wait(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} else {
return true;
}
}
LOGGER.info("waitForAllNodes return false;");
return false;
}
@Action(async = true, userPermission = 1L << 22)
public void listSlaves(JsonObject jo, ResultCallback cb) {
cb.onResult(JsonUtil.toJson(MasterServerTCPAction.id2Slaves));
}
}

View File

@ -0,0 +1,164 @@
package org.bdware.server.action.p2p;
import com.google.gson.JsonObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.heartbeat.HeartBeatUtil;
import org.bdware.sc.MasterElectTimeRecorder;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.GlobalConf;
import org.bdware.server.action.Action;
import org.bdware.server.action.CMActions;
import org.bdware.units.NetworkManager;
import org.zz.gmhelper.SM2KeyPair;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.TimerTask;
public class AliveCheckClientAction {
private static final Logger LOGGER = LogManager.getLogger(AliveCheckClientAction.class);
protected boolean waitForSetNode = false;
public static final int sendDelay = 2000;
public static final int checkDelay = 5000;
private final String masterPubkey;
TimerTask sendPingTask;
TimerTask checkAliveTask;
private long lastMasterPongTime = System.currentTimeMillis();
public AliveCheckClientAction(String masterPubkey) {
this.masterPubkey = masterPubkey;
}
@Action(async = true)
public void checkMasterAlive(JsonObject jo, ResultCallback result) {
if (masterPubkey != null) {
LOGGER.info("start heartbeat to " + masterPubkey.substring(0, 5));
waitForSetNode = true;
checkMasterAlive(result);
}
}
// 关闭和master的连接
public void closeMaster() {
if (checkAliveTask != null) {
HeartBeatUtil.getInstance().cancel(checkAliveTask);
checkAliveTask = null;
}
if (sendPingTask != null) {
HeartBeatUtil.getInstance().cancel(sendPingTask);
sendPingTask = null;
}
NetworkManager.instance.closeAgent(masterPubkey);
}
public void checkMasterAlive(ResultCallback rc) {
if (sendPingTask == null)
sendPingTask =
new TimerTask() {
@Override
public void run() {
try {
LOGGER.debug(
String.format(
"f %s",
new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss")
.format(System.currentTimeMillis())));
Map<String, String> ping = new HashMap<>();
ping.put("action", "masterPing");
rc.onResult(JsonUtil.toJson(ping));
} catch (Throwable t) {
t.printStackTrace();
}
}
};
if (null == checkAliveTask)
checkAliveTask =
new TimerTask() {
@Override
public void run() {
try {
run1();
} catch (Throwable t) {
t.printStackTrace();
HeartBeatUtil.getInstance().cancel(this);
}
}
public void run1() {
long cur = System.currentTimeMillis();
if (cur - lastMasterPongTime >= (2 * sendDelay)) {
LOGGER.info(
"lastMasterPongTime="
+ new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss")
.format(lastMasterPongTime)
+ " 认为master崩溃!");
// 向NC发通知重新选举master如果NC没有收到所有节点的重选请求就认为是这个节点和master连接断开
// 这个节点需要重连master
Map<String, String> request = new HashMap<>();
request.put("action", "electMaster");
SM2KeyPair keyPair = GlobalConf.instance.keyPair;
request.put("nodeID", keyPair.getPublicKeyStr());
for (MultiContractMeta meta : CMActions.manager.multiContractRecorder.getStatus().values()) {
if (meta.getMasterNode().equals(masterPubkey)) {
LOGGER.info(
"认为合约 "
+ meta.getContractID()
+ " 的master崩溃 master="
+ masterPubkey);
request.put("contractID", meta.getContractID());
int lastSeq = meta.getLastExeSeq();
request.put("lastExe", lastSeq + "");
request.put("master", masterPubkey);
String[] members =
meta.getMembers();
request.put("members", StringUtils.join(members, ","));
NetworkManager.instance.sendToNodeCenter(
JsonUtil.toJson(request));
MasterElectTimeRecorder.findMasterCrash =
System.currentTimeMillis();
}
}
closeMaster();
} // if
}
};
lastMasterPongTime = System.currentTimeMillis();
LOGGER.info(
"设置 lastMasterPongTime="
+ new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss").format(lastMasterPongTime));
HeartBeatUtil.getInstance().schedule(sendPingTask, sendDelay / 2, checkDelay);
HeartBeatUtil.getInstance().schedule(checkAliveTask, sendDelay, checkDelay);
}
@Action(async = true)
public void masterPong(JsonObject jo, ResultCallback result) {
lastMasterPongTime = System.currentTimeMillis();
}
public void waitForSetNodeID() {
for (int i = 0; i < 100; i++) {
if (waitForSetNode) return;
try {
Thread.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void init() {
LOGGER.info("init nodeID:");
assert masterPubkey != null;
NetworkManager.instance.sendToAgent(masterPubkey, "{\"action\":\"setNodeInfo\",\"pubKey\":\""
+ GlobalConf.instance.keyPair.getPublicKeyStr()
+ "\"}");
}
}

View File

@ -0,0 +1,110 @@
package org.bdware.server.action.p2p;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.heartbeat.HeartBeatUtil;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.RecoverFlag;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.action.Action;
import org.bdware.server.tcp.TCPServerFrameHandler;
import org.bdware.units.NetworkManager;
import java.text.SimpleDateFormat;
import java.util.*;
public class AliveCheckServerAction {
private static final Logger LOGGER = LogManager.getLogger(AliveCheckServerAction.class);
private final TCPServerFrameHandler handler;
TimerTask checkAliveTask;
private long lastSlavePingTime;
String pubKey;
public AliveCheckServerAction(TCPServerFrameHandler handler) {
lastSlavePingTime =
System.currentTimeMillis()
+ 5000L
+ AliveCheckClientAction.sendDelay
+ AliveCheckClientAction.checkDelay;
checkAliveTask = new HeartBeatTask(AliveCheckClientAction.sendDelay);
HeartBeatUtil.getInstance()
.schedule(
checkAliveTask,
AliveCheckClientAction.sendDelay,
AliveCheckClientAction.checkDelay);
this.handler = handler;
}
private class HeartBeatTask extends TimerTask {
int delay;
HeartBeatTask(int delay) {
this.delay = delay;
}
@Override
public void run() {
try {
long cur = System.currentTimeMillis();
if (cur - lastSlavePingTime >= (2L * delay)) {
LOGGER.info(
new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss").format(lastSlavePingTime)
+ " "
+ delay);
Set<String> contracts = new HashSet<>();
String nodeID = pubKey;
if (nodeID == null) {
LOGGER.info("nodeID == null " + this);
HeartBeatUtil.getInstance().cancel(this);
return;
}
LOGGER.info(
"Master心跳机制发现节点 "
+ nodeID.substring(0, 5)
+ " 下线! "
+ this.toString());
HeartBeatUtil.getInstance().cancel(this);
for (String contractID :
MasterServerRecoverMechAction.recoverStatus.get(nodeID).keySet()) {
// RecoverFlag flag =
//
// MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID);
// if (flag == RecoverFlag.Fine)
MasterServerRecoverMechAction.recoverStatus
.get(nodeID)
.put(contractID, RecoverFlag.ToRecover);
contracts.add(contractID);
MasterServerTCPAction.notifyNodeOffline(contractID, nodeID);
}
for (String contractID : contracts) {
MasterServerRecoverMechAction.unitModeCheck(contractID);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Action(async = true)
public void setNodeInfo(JsonObject jo, ResultCallback cb) {
LOGGER.info("[MasterServerTCPAction] setNodeInfo : ");
String nodeID = jo.get("pubKey").getAsString();
pubKey = nodeID;
NetworkManager.instance.registerConnection(nodeID, handler);
LOGGER.info("[MasterServerTCPAction] setNodeInfo id2Slaves put " + nodeID.substring(0, 5));
Map<String, String> request = new HashMap<>();
request.put("action", "checkMasterAlive");
cb.onResult(JsonUtil.toJson(request));
}
@Action(async = true)
public void masterPing(JsonObject args, ResultCallback cb) {
lastSlavePingTime = System.currentTimeMillis();
Map<String, String> request = new HashMap<>();
request.put("action", "masterPong");
cb.onResult(JsonUtil.toJson(request));
}
}

View File

@ -1,7 +1,6 @@
package org.bdware.server.action.p2p;
import com.google.gson.JsonObject;
import io.netty.util.internal.ConcurrentSet;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.ContractClient;
@ -20,32 +19,29 @@ import org.bdware.server.action.Action;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.RequestToMaster;
import org.bdware.server.tcp.TCPClientFrameHandler;
import org.bdware.server.trustedmodel.AgentManager;
import org.bdware.server.trustedmodel.ContractUnitStatus;
import org.bdware.server.trustedmodel.MasterProxy;
import org.bdware.units.function.ExecutionManager;
import org.zz.gmhelper.SM2KeyPair;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
public class MasterClientRecoverMechAction {
private static final Logger LOGGER = LogManager.getLogger(MasterClientRecoverMechAction.class);
public static ConcurrentSet<String> recoverSet =
new ConcurrentSet<>(); // contracts which don't finish recoverRestart
public static Set<String> recoverSet =
new ConcurrentSkipListSet<>(); // contracts which don't finish recoverRestart
public static Map<String, Queue<RequestToMaster>>
requestsToMaster; // when master is re-electing,client node cache ites received requests
private final TCPClientFrameHandler handler;
private final MasterClientTCPAction clientAction;
private final Map<String, OutputStream> stateFileMap = new HashMap<>();
private final Map<String, OutputStream> transFileMap = new HashMap<>();
public MasterClientRecoverMechAction(
TCPClientFrameHandler handler, MasterClientTCPAction action) {
this.handler = handler;
clientAction = action;
public MasterClientRecoverMechAction() {
}
// 告知master自己需要恢复携带之前的运行模式如果是StableMode需要携带lastExeSeq信息
@ -58,7 +54,7 @@ public class MasterClientRecoverMechAction {
+ contractID
+ " masterID="
+ masterID.substring(0, 5));
TCPClientFrameHandler master = MasterProxy.getHandler(masterID);
TCPClientFrameHandler master = AgentManager.getHandler(masterID);
Map<String, String> ret = new HashMap<String, String>();
ret.put("action", "askForRecover");
@ -191,7 +187,7 @@ public class MasterClientRecoverMechAction {
e.printStackTrace();
}
LOGGER.info("slave receive last state package,isDone=true");
recoverFromCommonMode(GlobalConf.instance.projectDir + "/stateFiles/" + fileName);
recoverFromCommonMode(GlobalConf.instance.projectDir + "/stateFiles/" + fileName, rc);
}
}
@ -199,14 +195,14 @@ public class MasterClientRecoverMechAction {
public void recoverFromCommonMode(JsonObject args, final ResultCallback rc) {
String path = args.get("filePath").getAsString();
if (args.has("isMaster")) {
recoverFromCommonMode(path, args.get("isMaster").getAsBoolean());
recoverFromCommonMode(path, args.get("isMaster").getAsBoolean(), rc);
} else {
recoverFromCommonMode(path);
recoverFromCommonMode(path, rc);
}
}
public void recoverFromCommonMode(String path) {
recoverFromCommonMode(path, false);
public void recoverFromCommonMode(String path, ResultCallback rc) {
recoverFromCommonMode(path, false, rc);
}
// //无状态合约同步lastExeSeq之后发送恢复完成
@ -232,7 +228,7 @@ public class MasterClientRecoverMechAction {
// }
// 普通节点从CommonMode恢复
public void recoverFromCommonMode(String path, boolean isMaster) {
public void recoverFromCommonMode(String path, boolean isMaster, ResultCallback rc) {
LOGGER.info("recoverFromCommonMode");
RecoverMechTimeRecorder.startCommonRecover = System.currentTimeMillis();
@ -286,10 +282,10 @@ public class MasterClientRecoverMechAction {
// CMActions.manager.dumpContract(cei.getContractID(), ""));
// not in the recovering state, send msd to NC,turn to fine state
sendRecoverFinish(cei.getContractID(), "common");
sendRecoverFinish(cei.getContractID(), "common", rc);
LOGGER.info("恢复之后处理请求队列");
clientAction.dealRequests(cei.getContractID());
MasterClientTCPAction.dealRequests(cei.getContractID());
}
// 检查是否有该合约进行并进行恢复
@ -355,7 +351,7 @@ public class MasterClientRecoverMechAction {
CMTables.UnitContracts.toString(), cei.getContractID(), "exist");
}
CMActions.manager.multiContractRecorder.updateValue(cei);
MasterClientTCPAction.contractID2MasterInfo.put(contractID, clientAction);
//MasterClientTCPAction.contractID2MasterInfo.put(contractID, clientAction);
CMActions.manager.setContractIsMaster(contractID, "false");
} else {
LOGGER.info("[MasterClientRecoverMechAction] 恢复节点没有这个合约进程 : ");
@ -377,7 +373,7 @@ public class MasterClientRecoverMechAction {
CMTables.UnitContracts.toString(), cei.getContractID(), "exist");
}
CMActions.manager.multiContractRecorder.updateValue(cei);
MasterClientTCPAction.contractID2MasterInfo.put(contractID, clientAction);
//MasterClientTCPAction.contractID2MasterInfo.put(contractID, clientAction);
parPath = GlobalConf.instance.publicCompiledDir + "/" + cei.getYpkName();
if (cei.isPrivate()) {
@ -419,10 +415,10 @@ public class MasterClientRecoverMechAction {
@Action(async = true)
public void sendRecoverFinish(JsonObject args, final ResultCallback rc) {
sendRecoverFinish(args.get("contractID").getAsString(), null);
sendRecoverFinish(args.get("contractID").getAsString(), null, rc);
}
public void sendRecoverFinish(String contractID, String method) {
public void sendRecoverFinish(String contractID, String method, ResultCallback rc) {
LOGGER.info("恢复步骤-----------6 发送恢复完成");
MasterClientRecoverMechAction.recoverSet.remove(contractID); // 本地认为恢复结束
Map<String, String> ret = new HashMap<String, String>();
@ -433,8 +429,7 @@ public class MasterClientRecoverMechAction {
if (method != null) {
ret.put("method", method);
}
handler.sendMsg(JsonUtil.toJson(ret));
rc.onResult(JsonUtil.toJson(ret));
// 更新
ExecutionManager.instance.updateLocalContractToNodeCenter();
}
@ -501,7 +496,7 @@ public class MasterClientRecoverMechAction {
req.put("isDone", true + "");
}
LOGGER.info("len=" + len + " index=" + index);
handler.sendMsg(JsonUtil.toJson(req));
rc.onResult(JsonUtil.toJson(req));
index++;
req.put("isAppend", "true");
@ -565,7 +560,7 @@ public class MasterClientRecoverMechAction {
SM2KeyPair keyPair = GlobalConf.instance.keyPair;
ret.put("nodeID", keyPair.getPublicKeyStr());
ret.put("contractID", contractID);
handler.sendMsg(JsonUtil.toJson(ret));
result.onResult(JsonUtil.toJson(ret));
return;
}
@ -657,13 +652,13 @@ public class MasterClientRecoverMechAction {
request.put("contractID", contractID);
SM2KeyPair keyPair = GlobalConf.instance.keyPair;
request.put("nodeID", keyPair.getPublicKeyStr());
handler.sendMsg(JsonUtil.toJson(request));
result.onResult(JsonUtil.toJson(request));
} else {
sendRecoverFinish(contractID, "stable");
sendRecoverFinish(contractID, "stable", result);
}
}
public void dealTransRecords(String contractID, String path, int lastSeq) {
public void dealTransRecords(String contractID, String path, int lastSeq, ResultCallback rc) {
LOGGER.info("[MasterClientRecoverMechAction] dealTransRecords : lastSeq= " + lastSeq);
if (!MasterClientRecoverMechAction.recoverSet.contains(contractID)) {
@ -686,7 +681,7 @@ public class MasterClientRecoverMechAction {
// String m2 = CMActions.manager.dumpContract(contractID, "");
// logger.info("通过master的transRecods恢复之后状态为 \n" + m2);
sendRecoverFinish(contractID, "stable");
sendRecoverFinish(contractID, "stable", rc);
if (file.isFile() && file.exists()) {
// file.delete();
}
@ -785,7 +780,7 @@ public class MasterClientRecoverMechAction {
}
String contractID = args.get("contractID").getAsString();
int last = args.get("last").getAsInt();
dealTransRecords(contractID, dir.getAbsolutePath(), last);
dealTransRecords(contractID, dir.getAbsolutePath(), last, rc);
}
} catch (IOException e) {
e.printStackTrace();

View File

@ -5,7 +5,6 @@ import com.google.gson.JsonPrimitive;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.heartbeat.HeartBeatUtil;
import org.bdware.sc.*;
import org.bdware.sc.bean.Contract;
import org.bdware.sc.bean.ContractExecType;
@ -21,77 +20,27 @@ import org.bdware.server.executor.RequestAllExecutor;
import org.bdware.server.executor.unconsistency.MultiPointCooperationExecutor;
import org.bdware.server.executor.unconsistency.RequestOnceExecutor;
import org.bdware.server.executor.unconsistency.ResponseOnceExecutor;
import org.bdware.server.tcp.TCPClientFrameHandler;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.KillUnitContractInfo;
import org.bdware.server.trustedmodel.MasterProxy;
import org.bdware.server.trustedmodel.SelfAdaptiveShardingExecutor;
import org.bdware.server.trustedmodel.SingleNodeExecutor;
import org.bdware.units.NetworkManager;
import org.bdware.units.function.ExecutionManager;
import org.zz.gmhelper.SM2KeyPair;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
public class MasterClientTCPAction {
public static final int sendDelay = 2000;
public static final int checkDelay = 5000;
private static final Logger LOGGER = LogManager.getLogger(MasterClientTCPAction.class);
public static Map<String, MasterClientTCPAction> contractID2MasterInfo =
new ConcurrentHashMap<>();
static Map<String, KillUnitContractInfo> killUnitContractMap = new ConcurrentHashMap<>();
private final TCPClientFrameHandler handler;
private final String master;
TimerTask sendPingTask;
TimerTask checkAliveTask;
TCPClientFrameHandler controller;
private long lastMasterPongTime = System.currentTimeMillis();
private boolean waitForSetNode = false;
public MasterClientTCPAction(TCPClientFrameHandler handler, String master) {
this.handler = handler;
this.master = master;
}
public static void killContract(
long startTime, ContractClient client, JsonObject request, ResultCallback rc) {
LOGGER.info("[MasterClientTCPAction] killContract : ");
killUnitContractMap.put(
request.get("requestID").getAsString(),
new KillUnitContractInfo(rc, System.currentTimeMillis()));
// 向master发送信息停止集群中所有该合约实例
MasterClientTCPAction mcta = contractID2MasterInfo.get(client.getContractID());
if (mcta == null) mcta = contractID2MasterInfo.get(client.getContractName());
// FIXME
// 重连后kill报错Caused by: java.lang.NullPointerException
// at
// org.bdware.server.action.MasterClientTCPAction.killContract(MasterClientTCPAction.java:60)
try {
mcta.handler.sendMsg(JsonUtil.toJson(request));
} catch (Exception e) {
e.printStackTrace();
} finally {
CMActions.manager.stopContractWithOwner(
request.get("verifiedPubKey").getAsString(), client.getContractID());
MultiContractMeta cei =
CMActions.manager.multiContractRecorder.getMultiContractMeta(client.getContractID());
if (null != cei) {
cei.contractExecutor.close();
}
}
}
public static Map<String, KillUnitContractInfo> killUnitContractMap = new ConcurrentHashMap<>();
// NC发现master为null让各个节点重选
public static void NCStartElect(ContractClient cc, String uniNumber) {
@ -103,14 +52,14 @@ public class MasterClientTCPAction {
SM2KeyPair keyPair = GlobalConf.instance.keyPair;
request.put("nodeID", keyPair.getPublicKeyStr());
if (!contractID2MasterInfo.containsKey(contractID)) {
contractID = cc.getContractID();
}
MasterClientTCPAction mcta = contractID2MasterInfo.get(contractID);
if (mcta != null) {
mcta.closeMaster();
}
// if (!contractID2MasterInfo.containsKey(contractID)) {
// contractID = cc.getContractID();
// }
//
// MasterClientTCPAction mcta = contractID2MasterInfo.get(contractID);
// if (mcta != null) {
// mcta.closeMaster();
// }
request.put("contractID", contractID);
int lastSeq =
@ -173,164 +122,23 @@ public class MasterClientTCPAction {
return executor;
}
@Action(async = true)
public void checkMasterAlive(JsonObject jo, ResultCallback result) {
LOGGER.info("start heartbeat to " + master.substring(0, 5));
waitForSetNode = true;
checkMasterAlive();
}
// 关闭和master的连接
public void closeMaster() {
if (checkAliveTask != null) {
HeartBeatUtil.getInstance().cancel(checkAliveTask);
checkAliveTask = null;
}
if (sendPingTask != null) {
HeartBeatUtil.getInstance().cancel(sendPingTask);
sendPingTask = null;
}
if (handler != null) {
try {
handler.close();
} catch (Exception e) {
e.printStackTrace();
}
}
MasterProxy.CONNECTORS.remove(master);
}
public void checkMasterAlive() {
if (sendPingTask == null)
sendPingTask =
new TimerTask() {
@Override
public void run() {
try {
run1();
} catch (Throwable t) {
t.printStackTrace();
// this.cancel();
}
}
public void run1() {
LOGGER.debug(
String.format(
"f %s",
new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss")
.format(System.currentTimeMillis())));
Map<String, String> ping = new HashMap<>();
ping.put("action", "masterPing");
handler.sendMsg(JsonUtil.toJson(ping));
}
};
if (null == checkAliveTask)
checkAliveTask =
new TimerTask() {
@Override
public void run() {
try {
run1();
} catch (Throwable t) {
t.printStackTrace();
HeartBeatUtil.getInstance().cancel(this);
}
}
public void run1() {
long cur = System.currentTimeMillis();
if (cur - lastMasterPongTime >= (2 * sendDelay)) {
LOGGER.info(
"lastMasterPongTime="
+ new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss")
.format(lastMasterPongTime)
+ " 认为master崩溃!");
// 向NC发通知重新选举master如果NC没有收到所有节点的重选请求就认为是这个节点和master连接断开
// 这个节点需要重连master
Map<String, String> request = new HashMap<>();
request.put("action", "electMaster");
SM2KeyPair keyPair = GlobalConf.instance.keyPair;
request.put("nodeID", keyPair.getPublicKeyStr());
for (String contractID : contractID2MasterInfo.keySet()) {
if (contractID2MasterInfo.get(contractID).master.equals(master)
&& KeyValueDBUtil.instance.containsKey(
CMTables.UnitContracts.toString(), contractID)) {
LOGGER.info(
"认为合约 "
+ contractID
+ " 的master崩溃 master1="
+ contractID2MasterInfo.get(contractID)
.master
+ " master2="
+ master);
request.put("contractID", contractID);
int lastSeq =
CMActions.manager
.multiContractRecorder
.getMultiContractMeta(contractID)
.getLastExeSeq();
request.put("lastExe", lastSeq + "");
request.put("master", master);
String[] members =
CMActions.manager
.multiContractRecorder
.getMultiContractMeta(contractID)
.getMembers();
request.put("members", StringUtils.join(members, ","));
NetworkManager.instance.sendToNodeCenter(
JsonUtil.toJson(request));
LOGGER.info(
"认为合约 "
+ contractID
+ " 的master崩溃 当前master为 "
+ master.substring(0, 5)
+ " 向NC发送重选信息");
MasterElectTimeRecorder.findMasterCrash =
System.currentTimeMillis();
}
}
closeMaster();
} // if
}
};
lastMasterPongTime = System.currentTimeMillis();
LOGGER.info(
"设置 lastMasterPongTime="
+ new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss").format(lastMasterPongTime));
HeartBeatUtil.getInstance().schedule(sendPingTask, 1000, checkDelay);
HeartBeatUtil.getInstance().schedule(checkAliveTask, sendDelay, checkDelay);
// HeartBeatUtil.getInstance().schedule(checkAliveTask, 10000, checkDealy);
}
// @Action(async = true)
// public void onKillContractProcess(JsonObject jo, ResultCallback result) {
// logger.info("[MasterClientTCPAction] : onKillContractProcess");
// if (killUnitContractMap.containsKey(jo.get("requestID").getAsString())) {
// KillUnitContractInfo info =
// killUnitContractMap.get(jo.get("requestID").getAsString());
// Map<String, Object> r = new HashMap<>();
// r.put("action", "onKillContractProcess");
// r.put("data", jo.get("data"));
// r.put("executeTime", System.currentTimeMillis() - info.startTime);
// logger.info("[MasterClientTCPAction] onKillContractProcess : " + jo.get("data"));
// info.rc.onResult(JsonUtil.toJson(r));
// }
// }
@Action(async = true)
public void masterPong(JsonObject jo, ResultCallback result) {
lastMasterPongTime = System.currentTimeMillis();
// logger.info(
// "设置 lastMasterPongTime="
// + new
// SimpleDateFormat("yyyy-MM-dd.HH:mm:ss").format(lastMasterPongTime));
public void onKillContractProcess(JsonObject jo, ResultCallback result) {
LOGGER.info("[MasterClientTCPAction] : onKillContractProcess");
if (killUnitContractMap.containsKey(jo.get("requestID").getAsString())) {
KillUnitContractInfo info =
killUnitContractMap.get(jo.get("requestID").getAsString());
Map<String, Object> r = new HashMap<>();
r.put("action", "onKillContractProcess");
r.put("data", jo.get("data"));
r.put("executeTime", System.currentTimeMillis() - info.startTime);
LOGGER.info("[MasterClientTCPAction] onKillContractProcess : " + jo.get("data"));
info.rc.onResult(JsonUtil.toJson(r));
}
}
// kill 本地的该集群合约实例
@Action(async = true)
public void killContractProcessAtSlave(JsonObject jo, ResultCallback result) {
@ -339,10 +147,7 @@ public class MasterClientTCPAction {
else id = jo.get("name").getAsString();
ContractClient cc = CMActions.manager.getClient(id);
// 清理
if (contractID2MasterInfo.containsKey(cc.getContractID())) {
contractID2MasterInfo.remove(cc.getContractID());
}
if (KeyValueDBUtil.instance.containsKey(
CMTables.LastExeSeq.toString(), cc.getContractID())) {
KeyValueDBUtil.instance.delete(CMTables.LastExeSeq.toString(), cc.getContractID());
@ -387,10 +192,10 @@ public class MasterClientTCPAction {
// 需要计算出自己的ShardingID路由规则id/requester/arg-->shardingId)
// 也在MultiPointCooperationExecutor中实现
}
contractID2MasterInfo.put(contractID, this); // 记录contractID master之间的对应关系
//TOODO master连接
// contractID2MasterInfo.put(contractID, this); // 记录contractID master之间的对应关系
MultiContractMeta cei =
CMActions.manager.multiContractRecorder.createIfNotExist(contractID);
cei.setLastExeSeq(-1);
if (!contract.getScriptStr().startsWith("/")) {
contract.setScript(dumpToDisk(contract, jo));
@ -434,7 +239,7 @@ public class MasterClientTCPAction {
}
// 这个地方判定从参数中的master数据 globalConf中的数据 进行对比如果一致的话说明该节点为master
cei.setMaster(jo.get("master").getAsString());
if (contract.getType() != ContractExecType.Sharding)
if (contract.getType().needSeq())
cei.setIsMaster(GlobalConf.getNodeID().equals(jo.get("master").getAsString()));
else {
cei.setIsMaster(true);
@ -469,11 +274,11 @@ public class MasterClientTCPAction {
resultMap.put("pubKey", GlobalConf.instance.keyPair.getPublicKeyStr());
result.onResult(JsonUtil.toJson(resultMap));
}
if (contract.getType() == ContractExecType.Sharding) {
for (String str : cei.getMembers()) {
NetworkManager.instance.getNCClientHandler().controller.connectToMaster(str, null);
}
}
// if (contract.getType() == ContractExecType.Sharding) {
// for (String str : cei.getMembers()) {
// NetworkManager.instance.getNCClientHandler().controller.connectToMaster(str, null);
// }
// }
}
private String dumpToDisk(Contract contract, JsonObject jo) {
@ -506,13 +311,14 @@ public class MasterClientTCPAction {
return "/" + scriptName + ".ypk";
}
//TODO 这个奇怪的action可以放到这个SelfAdaptiveSharding的相关类里
@Action(async = true)
public void deliverBlock(JsonObject jo, ResultCallback result) {
if (jo.has("contractID") && jo.has("data")) {
String contractID = jo.get("contractID").getAsString();
MultiContractMeta cei = ContractManager.instance.multiContractRecorder.getMultiContractMeta(contractID);
if (null != cei && cei.contractExecutor instanceof SelfAdaptiveShardingExecutor) {
((SelfAdaptiveShardingExecutor) cei.contractExecutor).execute(jo.get("data").getAsString());
ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID);
if (null != meta && meta.contractExecutor instanceof SelfAdaptiveShardingExecutor) {
((SelfAdaptiveShardingExecutor) meta.contractExecutor).execute(jo.get("data").getAsString());
}
}
}
@ -629,7 +435,7 @@ public class MasterClientTCPAction {
}
}
public void dealRequests(String contractID) {
public static void dealRequests(String contractID) {
LOGGER.info("dealRequests");
// If still in recovering state,don't dealRequests now
@ -637,12 +443,9 @@ public class MasterClientTCPAction {
LOGGER.info("本地还没恢复完成,不执行请求!");
return;
}
// try {
// Thread.sleep(25000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
MultiContractMeta cei;
synchronized (cei = CMActions.manager.multiContractRecorder.createIfNotExist(contractID)) {
while (!cei.queue.isEmpty()) {
ContractRequest request = cei.queue.peek();
@ -762,29 +565,16 @@ public class MasterClientTCPAction {
ret.put("nodeID", keyPair.getPublicKeyStr());
ret.put("start", cei.getLastExeSeq() + "");
ret.put("end", request.seq + ""); // 左开右开区间
handler.sendMsg(JsonUtil.toJson(ret));
NetworkManager.instance.sendToAgent(cei.getMasterNode(), JsonUtil.toJson(ret));
break;
}
}
}
}
public void init(TCPClientFrameHandler masterClientFrameHandler) {
LOGGER.info("[MasterClientTCPAction] init : ");
controller = masterClientFrameHandler;
controller.sendMsg(
"{\"action\":\"setNodeInfo\",\"pubKey\":\""
+ GlobalConf.instance.keyPair.getPublicKeyStr()
+ "\"}");
}
public void callMaster(ContractRequest request, ResultCallback resultCallback) {
LOGGER.info("[TODO CallMaster]");
}
@Action(async = true)
public void receiveContractExecution(JsonObject jsonObject, ResultCallback resultCallback) {
public void receiveContractExecutionServer(JsonObject jsonObject, ResultCallback resultCallback) {
MasterServerTCPAction.sync.wakeUp(
jsonObject.get("responseID").getAsString(), jsonObject.get("data").getAsString());
}
@ -844,36 +634,27 @@ public class MasterClientTCPAction {
jo);
}
public String requestContractExecution(ContractRequest c) {
try {
LOGGER.info("[MasterClientTCPAction] requestContractExecution " + JsonUtil.toJson(c));
Map<String, Object> req = new HashMap<>();
req.put("action", "requestContractExecution");
req.put("requestID", c.getRequestID());
req.put("data", c);
handler.sendMsg(JsonUtil.toJson(req));
// 这里可能出错是不是在这里校验CollectResult?
ContractResult str = MasterServerTCPAction.sync.syncSleep(c.getRequestID());
LOGGER.info("[RequestContractGet]" + JsonUtil.toJson(str));
return JsonUtil.toJson(str);
} catch (Exception e) {
ByteArrayOutputStream bo = new ByteArrayOutputStream();
e.printStackTrace(new PrintStream(bo));
ContractResult cr =
new ContractResult(
ContractResult.Status.Exception, new JsonPrimitive(bo.toString()));
return JsonUtil.toJson(cr);
}
}
// public String requestContractExecution(ContractRequest c) {
// try {
// LOGGER.info("[MasterClientTCPAction] requestContractExecution " + JsonUtil.toJson(c));
// Map<String, Object> req = new HashMap<>();
// req.put("action", "requestContractExecution");
// req.put("requestID", c.getRequestID());
// req.put("data", c);
// handler.sendMsg(JsonUtil.toJson(req));
// // 这里可能出错是不是在这里校验CollectResult?
// ContractResult str = MasterServerTCPAction.sync.syncSleep(c.getRequestID());
// LOGGER.info("[RequestContractGet]" + JsonUtil.toJson(str));
// return JsonUtil.toJson(str);
// } catch (Exception e) {
// ByteArrayOutputStream bo = new ByteArrayOutputStream();
// e.printStackTrace(new PrintStream(bo));
// ContractResult cr =
// new ContractResult(
// ContractResult.Status.Exception, new JsonPrimitive(bo.toString()));
// return JsonUtil.toJson(cr);
// }
// }
public void waitForSetNodeID() {
for (int i = 0; i < 100; i++) {
if (waitForSetNode) return;
try {
Thread.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

View File

@ -11,8 +11,7 @@ import org.bdware.sc.util.JsonUtil;
import org.bdware.server.GlobalConf;
import org.bdware.server.action.Action;
import org.bdware.server.action.CMActions;
import org.bdware.server.tcp.TCPClientFrameHandler;
import org.bdware.server.trustedmodel.MasterProxy;
import org.bdware.units.NetworkManager;
import java.io.*;
import java.util.HashMap;
@ -22,29 +21,23 @@ import java.util.zip.GZIPOutputStream;
public class MasterClientTransferAction {
private static final Logger LOGGER = LogManager.getLogger(MasterClientTransferAction.class);
private final String master;
TCPClientFrameHandler handler;
MasterClientTCPAction action;
public static MasterClientTransferAction instance = new MasterClientTransferAction();
private final Map<String, String> id2Memory = new HashMap<>();
public MasterClientTransferAction(TCPClientFrameHandler h, String pubKey, MasterClientTCPAction a) {
handler = h;
master = pubKey;
action = a;
private MasterClientTransferAction() {
}
public void transferInstance(String contractID) {
public void transferInstance(String agentPubkey, String contractID) {
LOGGER.info("transferInstance contractID=" + contractID);
//step2 save state
String mem = CMActions.manager.dumpContract(contractID, "");
id2Memory.put(contractID, mem);
//step3 send ypk or script and start other,kill local
sendAndStart(contractID);
sendAndStart(agentPubkey, contractID);
}
public void sendAndStart(String contractID) {
public void sendAndStart(String agentPubkey, String contractID) {
LOGGER.info("sendAndStart contractID=" + contractID);
ContractClient cc = CMActions.manager.getClient(contractID);
@ -67,7 +60,7 @@ public class MasterClientTransferAction {
req.put("script", script);
req.put("contractID", contractID);
LOGGER.info("transferByScript send last");
handler.sendMsg(JsonUtil.toJson(req));
NetworkManager.instance.sendToAgent(agentPubkey, JsonUtil.toJson(req));
} else {
req.put("action", "transferByYPK");
File file;
@ -101,7 +94,7 @@ public class MasterClientTransferAction {
String data = ByteUtil.encodeBASE64(buff, len);
req.put("data", data);
count += len;
handler.sendMsg(JsonUtil.toJson(req));
NetworkManager.instance.sendToAgent(agentPubkey, JsonUtil.toJson(req));
req.put("isAppend", "true");
Thread.sleep(300);
@ -112,7 +105,7 @@ public class MasterClientTransferAction {
req.remove("data");
req.put("contractID", contractID);
LOGGER.info("transferByYPK send last");
handler.sendMsg(JsonUtil.toJson(req));
NetworkManager.instance.sendToAgent(agentPubkey, JsonUtil.toJson(req));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
@ -128,13 +121,13 @@ public class MasterClientTransferAction {
LOGGER.info("onSendAndStart contractID=" + contractID);
String mem = id2Memory.get(contractID);
id2Memory.remove(contractID);
boolean res1 = sendMemory(contractID, mem);
boolean res1 = sendMemory(contractID, mem, result);
if (!res1) {
LOGGER.info(res1);
}
}
public boolean sendMemory(String contractID, String memory) {
public boolean sendMemory(String contractID, String memory, ResultCallback cb) {
LOGGER.info("sendMemory contractID=" + contractID);
String path = contractID + "_temp_stateFile_" + new Random().nextLong() + "_" + System.currentTimeMillis();
@ -171,7 +164,7 @@ public class MasterClientTransferAction {
String data = ByteUtil.encodeBASE64(buff, len);
req.put("data", data);
count += len;
handler.sendMsg(JsonUtil.toJson(req));
cb.onResult(JsonUtil.toJson(req));
req.put("isAppend", "true");
Thread.sleep(300);
@ -181,9 +174,7 @@ public class MasterClientTransferAction {
req.put("isDone", "true");
req.put("contractID", contractID);
req.remove("data");
handler.sendMsg(JsonUtil.toJson(req));
cb.onResult(JsonUtil.toJson(req));
//delete state file
if (file.isFile() && file.exists()) {
file.delete();
@ -200,14 +191,15 @@ public class MasterClientTransferAction {
@Action(async = true)
public void onSendMemory(JsonObject jo, ResultCallback result) {
//step5 close connect
LOGGER.info("onSendMemory");
if (handler != null) {
LOGGER.info("handler close");
action.closeMaster();
}
if (MasterProxy.CONNECTORS.containsKey(master)) {
MasterProxy.CONNECTORS.remove(master);
}
LOGGER.info("onSendMemory done");
// if (handler != null) {
// LOGGER.info("handler close");
// NetworkManager.instance.closeAgent(master);
//
// }
// if (NetworkManager.CONNECTORS.containsKey(master)) {
// NetworkManager.CONNECTORS.remove(master);
// }
LOGGER.info("transfer contract instance finished.");
}
}

View File

@ -16,10 +16,8 @@ import org.bdware.sc.util.JsonUtil;
import org.bdware.server.GlobalConf;
import org.bdware.server.action.Action;
import org.bdware.server.action.CMActions;
import org.bdware.server.tcp.TCPServerFrameHandler;
import org.bdware.server.trustedmodel.ContractUnitStatus;
import org.bdware.server.executor.RequestAllExecutor;
import org.bdware.server.trustedmodel.SlaveNode;
import org.bdware.server.trustedmodel.ContractUnitStatus;
import org.bdware.units.NetworkManager;
import org.zz.gmhelper.SM2KeyPair;
@ -31,11 +29,9 @@ import java.util.zip.GZIPOutputStream;
public class MasterServerRecoverMechAction {
private static final Logger LOGGER = LogManager.getLogger(MasterServerRecoverMechAction.class);
public static Map<String, Map<String, RecoverFlag>> recoverStatus = new ConcurrentHashMap<>();
private final TCPServerFrameHandler handler;
private Map<String, OutputStream> stateFileMap = new HashMap<>();
public MasterServerRecoverMechAction(TCPServerFrameHandler masterFrameHandler) {
handler = masterFrameHandler;
public MasterServerRecoverMechAction() {
}
// 从disk-durable恢复
@ -54,8 +50,7 @@ public class MasterServerRecoverMechAction {
int temp =
CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getCurSeqAtMaster();
request.put("unitLastExeSeq", temp + "");
SlaveNode node = MasterServerTCPAction.id2Slaves.get(nodeID);
node.connection.sendMsg(JsonUtil.toJson(request));
NetworkManager.instance.sendToAgent(nodeID, JsonUtil.toJson(request));
}
// 通过从别的节点loadMemory来恢复
@ -126,7 +121,7 @@ public class MasterServerRecoverMechAction {
number = (file.length() / unit) + 1;
}
byte[] buff = new byte[unit];
SlaveNode node = MasterServerTCPAction.id2Slaves.get(nodeID);
long times = file.length() / unit;
times++;
LOGGER.info("times=" + times);
@ -142,7 +137,7 @@ public class MasterServerRecoverMechAction {
req.put("isDone", true + "");
}
LOGGER.info("len=" + len + " index=" + index);
node.connection.sendMsg(JsonUtil.toJson(req));
NetworkManager.instance.sendToAgent(nodeID, JsonUtil.toJson(req));
index++;
req.put("isAppend", "true");
@ -177,7 +172,7 @@ public class MasterServerRecoverMechAction {
recoverStatus.get(masterID).put(contractID, RecoverFlag.Recovering);
// 找一个普通节点来dump
SlaveNode relyNode = null;
String relyNode = null;
for (String nodeID :
CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers()) {
if (nodeID.equals(masterID)) {
@ -186,9 +181,9 @@ public class MasterServerRecoverMechAction {
if (recoverStatus.get(nodeID).get(contractID) != RecoverFlag.Fine) {
continue;
}
if (MasterServerTCPAction.id2Slaves.containsKey(nodeID)) {
relyNode = MasterServerTCPAction.id2Slaves.get(nodeID);
//TODO????似乎人重连接
if (NetworkManager.instance.hasAgentConnection(nodeID)) {
relyNode = nodeID;
break;
}
}
@ -198,12 +193,12 @@ public class MasterServerRecoverMechAction {
return;
}
LOGGER.info("master向 " + relyNode.pubKey.substring(0, 5) + " 请求恢复!");
LOGGER.info("master向 " + relyNode.substring(0, 5) + " 请求恢复!");
Map<String, String> req = new HashMap<>();
req.put("action", "requestState");
req.put("contractID", contractID);
relyNode.connection.sendMsg(JsonUtil.toJson(req));
NetworkManager.instance.sendToAgent(relyNode, JsonUtil.toJson(req));
}
public static void unitModeCheck(String contractID) {
@ -217,7 +212,7 @@ public class MasterServerRecoverMechAction {
synchronized (mpci) {
int total = 0, fineNum = 0;
for (String nodeId : mpci.getMembers()) {
if (MasterServerTCPAction.id2Slaves.containsKey(nodeId)
if (NetworkManager.instance.hasAgentConnection(nodeId)
&& recoverStatus.containsKey(nodeId)
&& recoverStatus.get(nodeId).containsKey(contractID)
&& recoverStatus.get(nodeId).get(contractID) == RecoverFlag.Fine) {
@ -240,50 +235,31 @@ public class MasterServerRecoverMechAction {
+ (fineNum > Math.ceil(total / 2))
+ " mpci.unitStatus="
+ mpci.unitStatus);
if (fineNum > Math.ceil((double) total / 2)) {
LOGGER.info(
"合约" + contractID + "的集群更改模式为" + ContractUnitStatus.CommonMode.toString());
ContractUnitStatus unitStatus = ContractUnitStatus.CommonMode;
Map<String, String> req = new HashMap<String, String>();
req.put("action", "changeUnitStatus");
req.put("contractID", contractID);
req.put("mode", ContractUnitStatus.CommonMode.toString());
for (String nodeId : mpci.getMembers()) {
if (MasterServerTCPAction.id2Slaves.containsKey(nodeId)) {
SlaveNode sNode = MasterServerTCPAction.id2Slaves.get(nodeId);
LOGGER.info(
"发消息给节点 "
+ sNode.pubKey.substring(0, 5)
+ " 设置合约"
+ contractID
+ "的集群模式为CommonMode");
sNode.connection.sendMsg(JsonUtil.toJson(req));
}
}
mpci.unitStatus = ContractUnitStatus.CommonMode;
} else if (fineNum <= Math.ceil((double) total / 2)) {
LOGGER.info(
"合约" + contractID + "的集群更改模式为" + ContractUnitStatus.StableMode.toString());
Map<String, String> req = new HashMap<String, String>();
req.put("action", "changeUnitStatus");
req.put("contractID", contractID);
req.put("mode", ContractUnitStatus.StableMode.toString());
for (String nodeId : mpci.getMembers()) {
if (MasterServerTCPAction.id2Slaves.containsKey(nodeId)) {
SlaveNode sNode = MasterServerTCPAction.id2Slaves.get(nodeId);
LOGGER.info(
"发消息给节点 "
+ sNode.pubKey.substring(0, 5)
+ " 设置合约"
+ contractID
+ "的集群模式为StableMode");
sNode.connection.sendMsg(JsonUtil.toJson(req));
}
}
mpci.unitStatus = ContractUnitStatus.StableMode;
if (fineNum <= Math.ceil((double) total / 2)) {
unitStatus = ContractUnitStatus.StableMode;
}
LOGGER.info(
"合约" + contractID + "的集群更改模式为" + unitStatus);
Map<String, String> req = new HashMap<String, String>();
req.put("action", "changeUnitStatus");
req.put("contractID", contractID);
req.put("mode", unitStatus.toString());
for (String nodeId : mpci.getMembers()) {
NetworkManager.instance.sendToAgentIfConnected(nodeId, JsonUtil.toJson(req));
LOGGER.info(
"发消息给节点 "
+ nodeId.substring(0, 5)
+ " 设置合约"
+ contractID
+ "的集群模式为StableMode");
}
mpci.unitStatus = unitStatus;
}
}
@ -470,7 +446,7 @@ public class MasterServerRecoverMechAction {
Map<String, String> request = new HashMap<>();
request.put("action", "sendRecoverFinish");
request.put("contractID", contractID);
handler.sendMsg(JsonUtil.toJson(request));
cb.onResult(JsonUtil.toJson(request));
return;
}
}
@ -503,7 +479,7 @@ public class MasterServerRecoverMechAction {
String[] str = records.split("==>>");
String last = str[1];
records = str[0];
SlaveNode node = MasterServerTCPAction.id2Slaves.get(nodeId);
String path = contractID + "_temp_TransFile_" + new Random().nextLong();
File file = new File(GlobalConf.instance.projectDir + "/stateFiles/" + path);
File parent = file.getParentFile();
@ -559,7 +535,7 @@ public class MasterServerRecoverMechAction {
req.put("last", last);
}
LOGGER.info("len=" + len + " index=" + index);
node.connection.sendMsg(JsonUtil.toJson(req));
NetworkManager.instance.sendToAgent(nodeId, JsonUtil.toJson(req));
index++;
req.put("isAppend", "true");
@ -650,8 +626,7 @@ public class MasterServerRecoverMechAction {
req.put("isMaster", "true");
req.put("filePath", dir.getAbsolutePath());
SM2KeyPair keyPair = GlobalConf.instance.keyPair;
SlaveNode selfNode = MasterServerTCPAction.id2Slaves.get(keyPair.getPublicKeyStr());
selfNode.connection.sendMsg(JsonUtil.toJson(req));
NetworkManager.instance.sendToAgent(keyPair.getPublicKeyStr(), JsonUtil.toJson(req));
}
} catch (IOException e) {
e.printStackTrace();

View File

@ -4,7 +4,6 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.heartbeat.HeartBeatUtil;
import org.bdware.sc.ContractClient;
import org.bdware.sc.ContractManager;
import org.bdware.sc.ContractMeta;
@ -13,7 +12,6 @@ import org.bdware.sc.bean.ContractExecType;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.RecoverFlag;
import org.bdware.sc.units.RequestCache;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.CongestionControl;
@ -21,13 +19,14 @@ import org.bdware.server.action.Action;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.SyncResult;
import org.bdware.server.executor.RequestAllExecutor;
import org.bdware.server.tcp.TCPServerFrameHandler;
import org.bdware.server.trustedmodel.KillUnitContractResultCollector;
import org.bdware.server.trustedmodel.ResultCollector;
import org.bdware.server.trustedmodel.SlaveNode;
import org.bdware.units.NetworkManager;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@ -38,7 +37,6 @@ public class MasterServerTCPAction {
public static final org.bdware.server.action.SyncResult sync = new SyncResult();
private static final Logger LOGGER = LogManager.getLogger(MasterServerTCPAction.class);
// key is nodes' pubKey
public static Map<String, SlaveNode> id2Slaves = new ConcurrentHashMap<>();
public static Map<String, RequestCache> requestCache = new ConcurrentHashMap<>();
static {
@ -61,30 +59,10 @@ public class MasterServerTCPAction {
TimeUnit.SECONDS);
}
public final TCPServerFrameHandler handler;
TimerTask checkAliveTask;
private long lastSlavePingTime;
public MasterServerTCPAction(TCPServerFrameHandler masterFrameHandler) {
handler = masterFrameHandler;
// 心跳
// int delay = 200;
// int delay = 50;
lastSlavePingTime =
System.currentTimeMillis()
+ 5000L
+ MasterClientTCPAction.sendDelay
+ MasterClientTCPAction.checkDelay;
checkAliveTask = new HeartBeatTask(MasterClientTCPAction.sendDelay);
HeartBeatUtil.getInstance()
.schedule(
checkAliveTask,
MasterClientTCPAction.sendDelay,
MasterClientTCPAction.checkDelay);
public MasterServerTCPAction() {
}
private static void notifyNodeOffline(String contractID, String nodeID) {
public static void notifyNodeOffline(String contractID, String nodeID) {
synchronized (sync) {
for (String requestID : sync.waitObj.keySet()) {
ResultCallback cb = sync.waitObj.get(requestID);
@ -93,21 +71,23 @@ public class MasterServerTCPAction {
if (rc.getCommitter() instanceof RequestAllExecutor.ResultMerger) {
RequestAllExecutor.ResultMerger merger =
(RequestAllExecutor.ResultMerger) rc.getCommitter();
LOGGER.info("node " + nodeID + " offline! in the cluster of contract " + contractID);
LOGGER.debug("nodeID=" + nodeID + " contractID=" + contractID +
": " + merger.getInfo());
ContractResult cr =
new ContractResult(
ContractResult.Status.Exception,
new JsonPrimitive("node offline"));
JsonObject jo = new JsonObject();
jo.addProperty("data", JsonUtil.toJson(cr));
jo.addProperty("responseID", requestID);
jo.addProperty("action", "receiveTrustfullyResult");
jo.addProperty("nodeID", nodeID);
LOGGER.debug("[cb] nodeID=" + nodeID + " contractID=" + contractID +
": " + jo);
sync.wakeUp(requestID, jo.toString());
if (merger.getContractID().equals(contractID)) {
LOGGER.info("node " + nodeID + " offline! in the cluster of contract " + contractID);
LOGGER.debug("nodeID=" + nodeID + " contractID=" + contractID +
": " + merger.getInfo());
ContractResult cr =
new ContractResult(
ContractResult.Status.Exception,
new JsonPrimitive("node offline"));
JsonObject jo = new JsonObject();
jo.addProperty("data", JsonUtil.toJson(cr));
jo.addProperty("responseID", requestID);
jo.addProperty("action", "receiveTrustfullyResult");
jo.addProperty("nodeID", nodeID);
LOGGER.debug("[cb] nodeID=" + nodeID + " contractID=" + contractID +
": " + jo);
sync.wakeUp(requestID, jo.toString());
}
}
}
}
@ -147,32 +127,6 @@ public class MasterServerTCPAction {
return false;
}
public static void killContract(
ContractClient client, JsonObject request, ResultCallback resultCallback) {
LOGGER.info("[MasterServerTCPAction] killContract : ");
MultiContractMeta contractMeta =
CMActions.manager.multiContractRecorder.getMultiContractMeta(
client.getContractID());
if (null != contractMeta && contractMeta.isMaster()) { // 如果是master
ResultCollector collector =
new ResultCollector(
request.get("requestID").getAsString(),
resultCallback,
contractMeta.getMembers().length);
sync.sleep(request.get("requestID").getAsString(), collector);
for (String member : contractMeta.getMembers()) {
SlaveNode node = id2Slaves.get(member);
node.connection.sendMsg(request.toString());
}
} else {
// TODO 如果不是master 应该返回出错这里暂时只kill本地用于调试
// logger.info(
// CMActions.manager.stopContractWithOwner(
// request.get("verifiedPubKey").getAsString(),
// request.get("id").getAsString()));
}
}
private static MultiContractMeta getMPCInfo(String contractID) {
ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID);
MultiContractMeta ret =
@ -189,28 +143,18 @@ public class MasterServerTCPAction {
return ret;
}
@Action(async = true)
public void masterPing(JsonObject args, ResultCallback cb) {
lastSlavePingTime = System.currentTimeMillis();
Map<String, String> request = new HashMap<>();
request.put("action", "masterPong");
sendMsg(JsonUtil.toJson(request));
}
@Action(async = true)
public void sendCachedRequests(JsonObject args, ResultCallback cb) {
String contractID = args.get("contractID").getAsString();
int start = args.get("start").getAsInt();
int end = args.get("end").getAsInt();
String nodeID = args.get("nodeID").getAsString();
SlaveNode node = id2Slaves.get(nodeID);
RequestCache cache = getCache(contractID);
for (int i = start + 1; i < end; i++) {
if (cache != null && cache.containsKey(i)) {
String jo = cache.get(i);
LOGGER.info("Master发送缓存的请求\n" + jo);
node.connection.sendMsg(jo);
NetworkManager.instance.sendToAgent(nodeID, jo);
}
}
}
@ -239,8 +183,7 @@ public class MasterServerTCPAction {
jo.get("requestID").getAsString(), cb, contractMeta.getMembers().length);
sync.sleep(jo.get("requestID").getAsString(), collector);
for (String member : contractMeta.getMembers()) {
SlaveNode node = id2Slaves.get(member);
node.connection.sendMsg(jo.toString());
NetworkManager.instance.sendToAgent(member, jo.toString());
}
}
@ -269,33 +212,15 @@ public class MasterServerTCPAction {
// cb.onResult(JsonUtil.toJson(ret));
// }
@Action(async = true)
public void setNodeInfo(JsonObject jo, ResultCallback cb) {
LOGGER.info("[MasterServerTCPAction] setNodeInfo : ");
String nodeID = jo.get("pubKey").getAsString();
handler.pubKey = nodeID;
SlaveNode node = new SlaveNode(nodeID, this);
id2Slaves.put(nodeID, node);
LOGGER.info("[MasterServerTCPAction] setNodeInfo id2Slaves put " + nodeID.substring(0, 5));
// 通知node可以检测master
Map<String, String> request = new HashMap<>();
request.put("action", "checkMasterAlive");
sendMsg(JsonUtil.toJson(request));
}
@Action(async = true)
public void onStartContractTrustfully(JsonObject jo, ResultCallback cb) {
sync.wakeUp(jo.get("requestID").getAsString(), jo.toString());
}
public void sendMsg(String req) {
handler.sendMsg(req);
}
@Action(async = true)
public void requestContractExecution(JsonObject jo, ResultCallback cb) {
public void requestContractExecutionServer(JsonObject jo, ResultCallback cb) {
try {
int maxMasterServerLoad = CongestionControl.masterServerLoad.incrementAndGet();
if (maxMasterServerLoad > CongestionControl.maxMasterServerLoad)
@ -340,7 +265,7 @@ public class MasterServerTCPAction {
@Override
public void onResult(String str) {
Map<String, String> result = new HashMap<>();
result.put("action", "receiveContractExecution");
result.put("action", "receiveContractExecutionServer");
result.put("responseID", requestID);
result.put("data", str);
cb.onResult(JsonUtil.toJson(result));
@ -362,12 +287,10 @@ public class MasterServerTCPAction {
@Override
public void onResult(String str) {
Map<String, String> result = new HashMap<>();
result.put("action", "receiveContractExecution");
result.put("action", "receiveContractExecutionServer");
result.put("responseID", cr.get("requestID").getAsString());
result.put("data", str);
cb.onResult(JsonUtil.toJson(result));
LOGGER.debug("Return requestContractExecution:"+JsonUtil.toJson(result));
CongestionControl.masterServerLoad.decrementAndGet();
}
}, null);
@ -386,73 +309,5 @@ public class MasterServerTCPAction {
}
}
private class HeartBeatTask extends TimerTask {
int delay;
HeartBeatTask(int delay) {
this.delay = delay;
}
@Override
public void run() {
try {
long cur = System.currentTimeMillis();
// LOGGER.info(
// "Master 心跳检测 " + handler.pubKey.substring(0,5) + " "
// + new
// SimpleDateFormat("yyyy-MM-dd.HH:mm:ss").format(cur)
// + " "
// + new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss")
// .format(lastSlavePingTime));
if (cur - lastSlavePingTime >= (2L * delay)) {
LOGGER.info(
new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss").format(lastSlavePingTime)
+ " "
+ delay);
Set<String> contracts = new HashSet<>();
String nodeID = handler.pubKey;
if (nodeID == null) {
LOGGER.info("nodeID == null " + this);
HeartBeatUtil.getInstance().cancel(this);
return;
}
SlaveNode info = id2Slaves.get(nodeID);
LOGGER.info(
"Master心跳机制发现节点 "
+ nodeID.substring(0, 5)
+ " 下线! "
+ this.toString());
HeartBeatUtil.getInstance().cancel(this);
id2Slaves.remove(nodeID);
try {
info.connection.handler.ctx.close();
} catch (Exception e) {
e.printStackTrace();
}
for (String contractID :
MasterServerRecoverMechAction.recoverStatus.get(nodeID).keySet()) {
// RecoverFlag flag =
//
// MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID);
// if (flag == RecoverFlag.Fine)
MasterServerRecoverMechAction.recoverStatus
.get(nodeID)
.put(contractID, RecoverFlag.ToRecover);
contracts.add(contractID);
notifyNodeOffline(contractID, nodeID);
}
// 某个节点下线需要检测所处集群是否需要状态切换
for (String contractID : contracts) {
MasterServerRecoverMechAction.unitModeCheck(contractID);
}
} // if
} catch (Exception e) {
e.printStackTrace();
}
} // run
}
}

View File

@ -9,11 +9,9 @@ import org.bdware.sc.bean.Contract;
import org.bdware.sc.bean.ContractExecType;
import org.bdware.sc.conn.ByteUtil;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.GlobalConf;
import org.bdware.server.action.Action;
import org.bdware.server.action.CMActions;
import org.bdware.server.tcp.TCPServerFrameHandler;
import java.io.*;
import java.util.HashMap;
@ -22,11 +20,9 @@ import java.util.zip.GZIPInputStream;
public class MasterServerTransferAction {
private static final Logger LOGGER = LogManager.getLogger(MasterServerTransferAction.class);
private final TCPServerFrameHandler handler;
private Map<String, OutputStream> fileMap = new HashMap<>();
public MasterServerTransferAction(TCPServerFrameHandler h) {
handler = h;
public MasterServerTransferAction() {
}
@Action(async = true)
@ -73,7 +69,7 @@ public class MasterServerTransferAction {
Map<String, String> request = new HashMap<>();
request.put("action", "onSendAndStart");
request.put("contractID", contractID);
handler.sendMsg(JsonUtil.toJson(request));
cb.onResult(request);
} else {
String data = args.get("data").getAsString();
try {
@ -112,7 +108,7 @@ public class MasterServerTransferAction {
Map<String, String> request = new HashMap<>();
request.put("action", "onSendAndStart");
request.put("contractID", contractID);
handler.sendMsg(JsonUtil.toJson(request));
cb.onResult(request);
}
public String startByScript(String script, String contractID) {
@ -163,7 +159,7 @@ public class MasterServerTransferAction {
load(contractID, dir.getAbsolutePath());
Map<String, String> request = new HashMap<>();
request.put("action", "onSendMemory");
handler.sendMsg(JsonUtil.toJson(request));
rc.onResult(request);
} else {
String data = args.get("data").getAsString();
try {

View File

@ -1,8 +0,0 @@
package org.bdware.server.action.p2p;
/**
* 原MasterServerTCPActionMasterServerTransferActionMasterServerRecoverMechAction
* MasterClientTCPActionMasterClientTransferActionMasterClientRecoverMechAction合并
*/
public class RecoveryAction {
}

View File

@ -29,7 +29,7 @@ public class UnitsInfoAction {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("action", "getContractUnit");
jsonObject.addProperty("contractId", contractId);
ContractUnitManager.instance.send(jsonObject.toString(), new String[]{NetworkManager.instance.getTcpNodeCenter()}, callback);
ContractUnitManager.instance.send(jsonObject.toString(), new String[]{"NetworkManager.instance.getTcpNodeCenter()"}, callback);
}
}
@ -43,7 +43,7 @@ public class UnitsInfoAction {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("action", "getContractUnitMaster");
jsonObject.addProperty("content", contractId);
ContractUnitManager.instance.send(jsonObject.toString(), new String[]{NetworkManager.instance.getTcpNodeCenter()}, callback);
ContractUnitManager.instance.send(jsonObject.toString(), new String[]{"NetworkManager.instance.getTcpNodeCenter()"}, callback);
}
}

View File

@ -31,10 +31,10 @@ import org.bdware.units.msghandler.ResponseCenter;
import java.io.File;
public class ExecutionAction implements OnHashCallback {
private static final Logger LOGGER = LogManager.getLogger(ExecutionAction.class);
public class _UNUSED_ExecutionAction implements OnHashCallback {
private static final Logger LOGGER = LogManager.getLogger(_UNUSED_ExecutionAction.class);
public ExecutionAction() {
public _UNUSED_ExecutionAction() {
}
// TODO TOMerge

View File

@ -22,7 +22,7 @@ import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.MultiReqSeq;
import org.bdware.server.trustedmodel.ResultCollector;
import org.bdware.server.trustedmodel.SlaveNode;
import org.bdware.units.NetworkManager;
import java.util.HashMap;
import java.util.HashSet;
@ -87,9 +87,8 @@ public class RequestAllExecutor implements ContractExecutor {
LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID());
LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes));
for (String node : nodes) {
SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(node);
LOGGER.debug("get cmNode " + node.substring(0, 5));
if (null == cmNode) {
if (!NetworkManager.instance.hasAgentConnection(node)) {
LOGGER.warn("cmNode " + node.substring(0, 5) + " is null");
collector.onResult(
"{\"status\":\"Error\",\"result\":\"node offline\","
@ -105,10 +104,10 @@ public class RequestAllExecutor implements ContractExecutor {
+ node
+ "\","
+ "\"action\":\"onExecuteContractTrustfully\"}");
cmNode.connection.sendMsg(sendStr);
NetworkManager.instance.sendToAgent(node, sendStr);
} else {
LOGGER.info("send request to cmNode " + node.substring(0, 5));
cmNode.connection.sendMsg(sendStr);
NetworkManager.instance.sendToAgent(node, sendStr);
}
}
}
@ -116,19 +115,19 @@ public class RequestAllExecutor implements ContractExecutor {
public boolean checkCurNodeNumValid() {
String[] nodes =
CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers();
int validNode = 0;
Map<String, String> mapResult = new HashMap<>();
for (String node : nodes) {
SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(node);
if (null != cmNode
mapResult.put(node.substring(0, 5), String.format("%s %s", NetworkManager.instance.hasAgentConnection(node) + "",
MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID)));
if (NetworkManager.instance.hasAgentConnection(node)
&& MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID)
== RecoverFlag.Fine) {
validNode++;
}
}
LOGGER.info(JsonUtil.toPrettyJson(mapResult));
int c = resultCount;
// TODO
if (ContractExecType.RequestAllResponseAll.equals(type)) {
c = (int) Math.ceil((double) c / 2);
}
@ -236,6 +235,10 @@ public class RequestAllExecutor implements ContractExecutor {
order = new AtomicInteger(0);
}
public String getContractID() {
return contractID;
}
public String getInfo() {
return "contractID="
+ contractID

View File

@ -25,7 +25,7 @@ import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.MultiReqSeq;
import org.bdware.server.trustedmodel.ResultCollector;
import org.bdware.server.trustedmodel.SlaveNode;
import org.bdware.units.NetworkManager;
import java.math.BigInteger;
import java.util.HashMap;
@ -95,11 +95,8 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
LOGGER.debug("nodes:" + JsonUtil.toJson(nodes));
for (String node : nodes) {
LOGGER.info(node);
// String node = nodes.get(n);//根据下标随机获得一个
SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(node);
LOGGER.info("[sendRequests] get cmNode " + node.substring(0, 5));
// 向slave发送数据 获得结果调用onResult函数
if (cmNode == null) {
if (!NetworkManager.instance.hasAgentConnection(node)) {
LOGGER.info("[sendRequests] get cmNode " + node.substring(0, 5) + " is null");
collector.onResult(
"{\"status\":\"Error\",\"result\":\"node offline\","
@ -115,7 +112,7 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
+ node
+ "\","
+ "\"action\":\"onExecuteContractTrustfully\"}");
cmNode.connection.sendMsg(sendStr);
NetworkManager.instance.sendToAgent(node, sendStr);
} else {
LOGGER.info(
"[sendRequests] get cmNode "
@ -123,7 +120,7 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
+ " not null "
+ "RequestAllExecutor 发送请求给 "
+ node.substring(0, 5));
cmNode.connection.sendMsg(sendStr);
NetworkManager.instance.sendToAgent(node, sendStr);
}
}
}
@ -184,16 +181,13 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
// List<String> nodes = info.members;
int validNode = 0;
for (String node : nodes) {
SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(node);
if (cmNode != null
if (NetworkManager.instance.hasAgentConnection(node)
&& MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID)
== RecoverFlag.Fine) {
validNode++;
}
}
int c = resultCount;
// TODO
if (type == ContractExecType.Sharding) c = (int) Math.ceil((double) c / 2);
LOGGER.info("c=" + c + " validNode=" + validNode);
return validNode >= c;

View File

@ -8,10 +8,12 @@ import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.ControllerManager;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.SlaveNode;
import org.bdware.server.trustedmodel.AgentManager;
import org.bdware.units.NetworkManager;
import java.util.HashMap;
import java.util.Map;
@ -25,6 +27,7 @@ public class RequestOnceExecutor implements ContractExecutor {
public RequestOnceExecutor(String contractID) {
this.contractID = contractID;
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
ResultCallback cb =
@ -46,17 +49,17 @@ public class RequestOnceExecutor implements ContractExecutor {
MasterServerTCPAction.sync.sleep(requestID, cb);
String[] members = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers();
for (int i = 0; i < members.length; i++) {
LOGGER.info("[members]:" + members.length);
int size = members.length;
String nodeID = members[order.incrementAndGet() % size];
SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(nodeID);
if (cmNode != null) {
Map<String, Object> obj = new HashMap<>();
obj.put("action", "executeContractLocally");
obj.put("data", req);
obj.put("uniReqID", requestID);
cmNode.connection.sendMsg(JsonUtil.toJson(obj));
return;
}
//ADD Connect
Map<String, Object> obj = new HashMap<>();
obj.put("action", "executeContractLocally");
obj.put("requestID",requestID);
obj.put("data", req);
obj.put("uniReqID", requestID);
NetworkManager.instance.sendToAgent(nodeID,JsonUtil.toJson(obj));
return;
}
rc.onResult(
"{\"status\":\"Error\",\"result\":\"all nodes "

View File

@ -12,7 +12,7 @@ import org.bdware.sc.util.JsonUtil;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.SlaveNode;
import org.bdware.units.NetworkManager;
import java.util.HashMap;
import java.util.Map;
@ -71,15 +71,12 @@ public class ResponseOnceExecutor implements ContractExecutor {
for (int i = 0; i < members.length; i++) {
int size = members.length;
String nodeID = members[order.incrementAndGet() % size];
SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(nodeID);
if (cmNode != null) {
Map<String, Object> obj = new HashMap<>();
obj.put("action", "executeContractLocally");
obj.put("data", req);
obj.put("uniReqID", requestID);
cmNode.connection.sendMsg(JsonUtil.toJson(obj));
return true;
}
Map<String, Object> obj = new HashMap<>();
obj.put("action", "executeContractLocally");
obj.put("data", req);
obj.put("uniReqID", requestID);
NetworkManager.instance.sendToAgent(nodeID, JsonUtil.toJson(obj));
return true;
}
return false;
}

View File

@ -16,7 +16,7 @@ import org.bdware.sc.util.JsonUtil;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.SlaveNode;
import org.bdware.units.NetworkManager;
import java.math.BigInteger;
import java.util.HashMap;
@ -95,15 +95,12 @@ public class _UNUSED_RouteEnabledExecutor implements ContractExecutor {
int size = members.length;
if (hash != -1) nodeID = members[hash % size];
else nodeID = members[order.incrementAndGet() % size];
SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(nodeID);
if (cmNode != null) {
Map<String, Object> obj = new HashMap<>();
obj.put("action", "executeContractLocally");
obj.put("data", req);
obj.put("uniReqID", requestID);
cmNode.connection.sendMsg(JsonUtil.toJson(obj));
return true;
}
Map<String, Object> obj = new HashMap<>();
obj.put("action", "executeContractLocally");
obj.put("data", req);
obj.put("uniReqID", requestID);
NetworkManager.instance.sendToAgent(nodeID, JsonUtil.toJson(obj));
return true;
}
return false;
}

View File

@ -2,12 +2,6 @@ package org.bdware.server.nodecenter.client;
import com.google.gson.JsonNull;
import com.google.gson.JsonObject;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.*;
@ -22,7 +16,6 @@ import org.bdware.sc.event.REvent;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.util.HashUtil;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.CMHttpServer;
import org.bdware.server.GlobalConf;
import org.bdware.server.action.Action;
import org.bdware.server.action.CMActions;
@ -33,8 +26,8 @@ import org.bdware.server.action.p2p.MasterClientTCPAction;
import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.tcp.TCPClientFrameHandler;
import org.bdware.server.trustedmodel.MasterProxy;
import org.bdware.server.ws.DelimiterCodec;
import org.bdware.server.trustedmodel.AgentManager;
import org.bdware.units.NetworkManager;
import org.bouncycastle.pqc.math.linearalgebra.ByteUtils;
import org.zz.gmhelper.SM2KeyPair;
import org.zz.gmhelper.SM2Util;
@ -51,6 +44,8 @@ public class NodeCenterClientController implements NodeCenterConn {
public static SyncResult sync = new SyncResult();
private static boolean startCheck = false;
private final Map<String, FileOutputStream> fileMap;
public Map<String, ResultCallback> distributeReqMap = new ConcurrentHashMap<>();
private final NetNeighbors neighbors;
// public NodeCenterClientController cmClientController;
String nodeID;
@ -93,28 +88,33 @@ public class NodeCenterClientController implements NodeCenterConn {
return cr.result.getAsJsonObject();
}
public void distributeContract(String reqID, ResultCallback resultCallback, JsonObject jo) {
distributeReqMap.put(reqID, resultCallback);
sendMsg(JsonUtil.toJson(jo));
}
// ==========Handler
@Action(async = true)
public void requestConnectToMaster(JsonObject jo, ResultCallback cb) {
JsonObject keys = jo.get("key2address").getAsJsonObject();
for (String str : keys.keySet()) {
MasterProxy.slaverRouter.put(str, keys.get(str).getAsString());
NetworkManager.instance.updateAgentRouter(str, keys.get(str).getAsString());
}
String master = jo.get("master").getAsString();
LOGGER.info(" request connect to " + master.substring(0, 5));
if (jo.has("connectAll") && jo.get("connectAll").getAsBoolean()) {
for (String str : keys.keySet()) {
if (jo.has("contractID")) {
connectToMaster(str, jo.get("contractID").getAsString());
NetworkManager.instance.connectToAgent(str, jo.get("contractID").getAsString());
} else {
connectToMaster(str, null);
NetworkManager.instance.connectToAgent(str, null);
}
}
} else {
if (jo.has("contractID")) {
connectToMaster(master, jo.get("contractID").getAsString());
NetworkManager.instance.connectToAgent(master, jo.get("contractID").getAsString());
} else {
connectToMaster(master, null);
NetworkManager.instance.connectToAgent(master, null);
}
}
}
@ -352,6 +352,12 @@ public class NodeCenterClientController implements NodeCenterConn {
sync.wakeUp(jo.get("responseID").getAsString(), jo.toString());
}
@Action(async = true)
public void onQueryNodeAddress(JsonObject jo, ResultCallback cb) {
LOGGER.info("onQueryNodeAddress:" + jo.toString());
sync.wakeUp(jo.get("responseID").getAsString(), jo.toString());
}
@Override
public String routeContract(String contractID) {
LOGGER.info("[CMClientController] routeContract : " + contractID);
@ -364,6 +370,30 @@ public class NodeCenterClientController implements NodeCenterConn {
}
}
public String connectToNode(String pubKey) {
Map<String, String> req = new HashMap<>();
req.put("action", "queryNodeAddress");
req.put("pubKey", pubKey);
String requestID = System.currentTimeMillis() + "_" + (int) (Math.random() * 100000);
req.put("requestID", requestID);
sendMsg(JsonUtil.toJson(req));
//TODO use async instead?
ContractResult cr = sync.syncSleep(requestID);
LOGGER.info("connectToNode result:" + JsonUtil.toJson(cr));
if (!cr.result.equals(JsonNull.INSTANCE)) {
try {
JsonObject jo = cr.result.getAsJsonObject();
NetworkManager.instance.updateAgentRouter(
jo.get("pubKey").getAsString(), jo.get("masterAddress").getAsString());
NetworkManager.instance.connectToAgent(jo.get("pubKey").getAsString(), null);
return "success";
} catch (Exception e) {
e.printStackTrace();
}
}
return "failed";
}
@Override
public String reRouteContract(String contractID) {
try {
@ -378,9 +408,9 @@ public class NodeCenterClientController implements NodeCenterConn {
if (!cr.result.equals(JsonNull.INSTANCE)) {
try {
JsonObject jo = cr.result.getAsJsonObject();
MasterProxy.slaverRouter.put(
NetworkManager.instance.updateAgentRouter(
jo.get("pubKey").getAsString(), jo.get("masterAddress").getAsString());
connectToMaster(jo.get("pubKey").getAsString(), null);
NetworkManager.instance.connectToAgent(jo.get("pubKey").getAsString(), null);
LOGGER.info(
"[CMClientController] "
+ contractID
@ -447,49 +477,14 @@ public class NodeCenterClientController implements NodeCenterConn {
}
public void queryUnitContractsID2(String contractID, String master) {
connectToMaster(master, null); // TODO 等待确保连接成功
TCPClientFrameHandler clientHandler = MasterProxy.getHandler(master);
NetworkManager.instance.connectToAgent(master, null); // TODO 等待确保连接成功
TCPClientFrameHandler clientHandler = AgentManager.getHandler(master);
clientHandler.waitForSetNodeID();
RecoverMechTimeRecorder.connectMasterFinish = System.currentTimeMillis();
MasterClientRecoverMechAction.askForRecover(contractID, master, nodeID);
}
// TODO add syncPing
public void connectToMaster(String master, String contractID) {
LOGGER.info("[CMClientController] connectToMaster master= " + master);
// logger.debug("ConnectToMaster:" + master + "\nMasterRoute:" + slaverRouter.get(master));
Bootstrap b;
MasterProxy.MasterConnector connector = null;
synchronized (MasterProxy.CONNECTORS) {
if (!MasterProxy.CONNECTORS.containsKey(master)) {
connector = new MasterProxy.MasterConnector();
MasterProxy.CONNECTORS.put(master, connector);
}
}
if (connector != null) {
b = new Bootstrap();
TCPClientFrameHandler handler = new TCPClientFrameHandler(master);
connector.bootstrap = b;
connector.handler = handler;
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
b.group(CMHttpServer.workerGroup);
if (contractID != null) {
handler.updateContractID2Client(contractID);
}
b.channel(NioSocketChannel.class)
.handler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new DelimiterCodec()).addLast(handler);
}
});
}
MasterProxy.reconnect(master);
}
public void getNodeInfos(ResultCallback resultCallback) {
Map<String, String> request = new HashMap<>();
request.put("action", "listCMInfo");
@ -715,7 +710,7 @@ public class NodeCenterClientController implements NodeCenterConn {
for (int i = 0; i < 20; ++i) {
boolean all = true;
for (String str : nodeNames) {
if (!MasterServerTCPAction.id2Slaves.containsKey(str)) {
if (NetworkManager.instance.hasAgentConnection(str)) {
all = false;
break;
}
@ -742,12 +737,12 @@ public class NodeCenterClientController implements NodeCenterConn {
public void onDistribute(JsonObject json, ResultCallback rc) {
if (json.has("over")) {
String distributeID = json.get("distributeID").getAsString();
ResultCallback to = handler.distributeReqMap.get(distributeID);
handler.distributeReqMap.remove(distributeID);
ResultCallback to = distributeReqMap.get(distributeID);
distributeReqMap.remove(distributeID);
to.onResult(json.get("content").getAsString());
} else {
String distributeID = json.get("distributeID").getAsString();
handler.distributeReqMap.get(distributeID).onResult(json.get("content").getAsString());
distributeReqMap.get(distributeID).onResult(json.get("content").getAsString());
}
}
@ -758,7 +753,7 @@ public class NodeCenterClientController implements NodeCenterConn {
String pubKey = jo.get("nodeID").getAsString();
String contractID = jo.get("contractID").getAsString();
String address = jo.get("address").getAsString();
MasterProxy.slaverRouter.put(pubKey, address);
NetworkManager.instance.updateAgentRouter(pubKey, address);
ContractClient cc = CMActions.manager.getClient(contractID);
if (cc == null) {
@ -769,7 +764,7 @@ public class NodeCenterClientController implements NodeCenterConn {
if (null != pubKey) {
// step1 connect to node
LOGGER.info("pubKey=" + pubKey);
connectToMaster(pubKey, null);
NetworkManager.instance.connectToAgent(pubKey, null);
CMActions.manager.masterStub.transferToOtherNode(pubKey, contractID);
} else {
LOGGER.info("pubKey is null.");

View File

@ -29,8 +29,7 @@ public class NodeCenterClientHandler extends SimpleChannelInboundHandler<Object>
private static final Logger LOGGER = LogManager.getLogger(NodeCenterClientHandler.class);
public static String[] clientToClusterPlugins;
public boolean hasPermission;
public NodeCenterClientController controller;
public Map<String, ResultCallback> distributeReqMap = new ConcurrentHashMap<>();
private NodeCenterClientController controller;
Channel channel;
// UDPTrustfulExecutor udpExecutor;
// RecoverMechExecutor recoverMechExecutor;
@ -144,4 +143,8 @@ public class NodeCenterClientHandler extends SimpleChannelInboundHandler<Object>
channel = null;
}
}
public NodeCenterClientController getController() {
return controller;
}
}

View File

@ -14,10 +14,8 @@ import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.CongestionControl;
import org.bdware.server.action.ActionExecutor;
import org.bdware.server.action.p2p.MasterClientRecoverMechAction;
import org.bdware.server.action.p2p.MasterClientTCPAction;
import org.bdware.server.action.p2p.MasterClientTransferAction;
import org.bdware.server.trustedmodel.MasterProxy;
import org.bdware.server.action.p2p.*;
import org.bdware.units.NetworkManager;
import java.io.ByteArrayOutputStream;
import java.io.InputStreamReader;
@ -32,23 +30,19 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger LOGGER = LogManager.getLogger(TCPClientFrameHandler.class);
private static final ExecutorService executorService = Executors.newFixedThreadPool(10);
public static String[] clientToAgentPlugins;
public String pubKey;
private final AliveCheckClientAction aliveCheckClientAction;
public ActionExecutor<ResultCallback, JsonObject> ae;
MasterClientTCPAction actions;
MasterClientRecoverMechAction recoverActions;
MasterClientTransferAction transferActions;
ChannelHandlerContext ctx;
private Channel channel;
private boolean isConnected;
private String master; // master node pubKey
public TCPClientFrameHandler(String masterPubkey) {
master = masterPubkey;
actions = new MasterClientTCPAction(this, master);
recoverActions = new MasterClientRecoverMechAction(this, actions);
transferActions = new MasterClientTransferAction(this, master, actions);
aliveCheckClientAction = new AliveCheckClientAction(masterPubkey);
ae = new ActionExecutor<>(
executorService, actions, recoverActions, transferActions);
executorService, aliveCheckClientAction,
new MasterClientTCPAction(), new MasterClientRecoverMechAction(), MasterClientTransferAction.instance,
new MasterServerRecoverMechAction(), new MasterServerTransferAction(), new MasterServerTCPAction());
for (String str : clientToAgentPlugins) {
Object obj = createInstanceByClzName(str);
ae.appendHandler(obj);
@ -65,20 +59,19 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler<Object> {
}
public void updateContractID2Client(String contract) {
// 如果该合约正在和旧的master保持连接
if (MasterClientTCPAction.contractID2MasterInfo.containsKey(contract)) {
MasterClientTCPAction former =
MasterClientTCPAction.contractID2MasterInfo.get(contract);
former.closeMaster();
}
MasterClientTCPAction.contractID2MasterInfo.put(contract, actions);
}
// public void updateContractID2Client(String contract) {
// // 如果该合约正在和旧的master保持连接
// if (MasterClientTCPAction.contractID2MasterInfo.containsKey(contract)) {
// MasterClientTCPAction former =
// MasterClientTCPAction.contractID2MasterInfo.get(contract);
// former.closeMaster();
// }
//
// MasterClientTCPAction.contractID2MasterInfo.put(contract, actions);
// }
public void close() {
try {
isConnected = false;
if (channel != null) channel.close();
} catch (Exception e) {
e.printStackTrace();
@ -87,17 +80,12 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler<Object> {
}
}
public void transferInstance(String contractID) {
LOGGER.info("[MasterCLientFrameHandler] transferInstance contractID=" + contractID);
transferActions.transferInstance(contractID);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
channel = ctx.channel();
isConnected = true;
this.ctx = ctx;
actions.init(this);
aliveCheckClientAction.init();
CongestionControl.slaveCounter.getAndIncrement();
}
@ -112,7 +100,7 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler<Object> {
ByteBuf bb = (ByteBuf) frame;
JsonObject arg;
try {
arg = JsonUtil.parseObject(new InputStreamReader(new ByteBufInputStream(bb)));
arg = JsonUtil.parseReaderAsJsonObject(new InputStreamReader(new ByteBufInputStream(bb)));
} catch (Exception e) {
e.printStackTrace();
Response response = new Response();
@ -123,9 +111,8 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler<Object> {
}
Response response;
try {
LOGGER.info("====== TCPClientFrameHandler:" + arg.toString());
final String action = arg.get("action").getAsString();
// logger.info("[MasterClientFrameHandler] handle:" + arg.toString());
ae.handle(
action,
arg,
@ -172,7 +159,7 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler<Object> {
public void sendMsg(String json) {
for (int i = 0; i < 3 && !isOpen(); i++) {
try {
MasterProxy.reconnect(master);
NetworkManager.reconnectAgent(master);
Thread.sleep(250);
} catch (InterruptedException e) {
e.printStackTrace();
@ -195,7 +182,7 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler<Object> {
}
public void waitForSetNodeID() {
actions.waitForSetNodeID();
aliveCheckClientAction.waitForSetNodeID();
}
static class Response {

View File

@ -14,10 +14,7 @@ import org.bdware.sc.util.JsonUtil;
import org.bdware.server.CongestionControl;
import org.bdware.server.action.Action;
import org.bdware.server.action.ActionExecutor;
import org.bdware.server.action.p2p.ExecutionAction;
import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.action.p2p.MasterServerTransferAction;
import org.bdware.server.action.p2p.*;
import java.io.ByteArrayOutputStream;
import java.io.InputStreamReader;
@ -28,22 +25,14 @@ import java.util.concurrent.Executors;
public class TCPServerFrameHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger LOGGER = LogManager.getLogger(TCPServerFrameHandler.class);
static ExecutorService executorService = Executors.newFixedThreadPool(10);
private final ExecutionAction executionActions;
public String pubKey; //slave's pubKey
public ChannelHandlerContext ctx;
public ActionExecutor<ResultCallback, JsonObject> ae;
MasterServerTCPAction actions;
MasterServerRecoverMechAction recoverActions;
MasterServerTransferAction transferAction;
public TCPServerFrameHandler() {
actions = new MasterServerTCPAction(this);
recoverActions = new MasterServerRecoverMechAction(this);
transferAction = new MasterServerTransferAction(this);
executionActions = new ExecutionAction();
ae =
new ActionExecutor<ResultCallback, JsonObject>(
executorService, actions, recoverActions, executionActions, transferAction) {
executorService, new AliveCheckServerAction(this),
new MasterClientTCPAction(), new MasterClientRecoverMechAction(), MasterClientTransferAction.instance,
new MasterServerRecoverMechAction(), new MasterServerTransferAction(), new MasterServerTCPAction() ) {
@Override
public boolean checkPermission(
Action a, final JsonObject args, long permission) {
@ -81,9 +70,6 @@ public class TCPServerFrameHandler extends SimpleChannelInboundHandler<Object> {
};
}
public void setPermission(long l) {
ae.permission = l;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
@ -175,14 +161,6 @@ public class TCPServerFrameHandler extends SimpleChannelInboundHandler<Object> {
}
}
public boolean isOpen() {
return ctx.channel().isOpen();
}
static class Log {
String pubKey;
String action;
}
static class Response {
public String cid;

View File

@ -0,0 +1,79 @@
package org.bdware.server.trustedmodel;
import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.AgentPeerManagerIntf;
import org.bdware.sc.ContractMeta;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.CongestionControl;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterClientTransferAction;
import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.tcp.TCPClientFrameHandler;
import org.bdware.units.NetworkManager;
import java.util.HashMap;
import java.util.Map;
public class AgentManager implements AgentPeerManagerIntf {
// key is masters' pubKey
private static final Logger LOGGER = LogManager.getLogger(AgentManager.class);
// nodePubkey该node作为master的地址
public static TCPClientFrameHandler getHandler(String masterID) {
return NetworkManager.CONNECTORS.get(masterID).handler;
}
@Override
public void executeByOtherNodeAsync(String pubKey, ContractRequest c, ResultCallback cb) {
LOGGER.error(pubKey + " " + c.getRequestID() + " " + c.getContractID() + " cb:" + cb);
int maxMasterProxyLoad = CongestionControl.masterProxyLoad.incrementAndGet();
if (maxMasterProxyLoad > CongestionControl.maxMasterProxyLoad)
CongestionControl.maxMasterProxyLoad = maxMasterProxyLoad;
if (CongestionControl.slaveControl()) {
ContractResult cr =
new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("canceled because of queue too long"));
cb.onResult(JsonUtil.parseObjectAsJsonObject(cr));
CongestionControl.masterProxyLoad.decrementAndGet();
return;
}
MasterServerTCPAction.sync.sleep(
c.getRequestID(),
new ResultCallback() {
@Override
public void onResult(String str) {
cb.onResult(JsonUtil.parseStringAsJsonObject(str));
CongestionControl.masterProxyLoad.decrementAndGet();
}
});
Map<String, Object> req = new HashMap<>();
req.put("action", "requestContractExecutionServer");
req.put("requestID", c.getRequestID());
req.put("data", c);
NetworkManager.instance.sendToAgent(pubKey, JsonUtil.toJson(req));
}
@Override
public boolean hasAgentConnection(String pubKey) {
return NetworkManager.instance.hasAgentConnection(pubKey);
}
@Override
public void transferToOtherNode(String pubKey, String contractID) {
// TODO 问题1合约的ypk或者script怎么获取ypk如果是包含私有路径的可以在Contract中设置一个字段启动的时候写入吗 目前认为这个从前端传入
// TODO 问题2 转移过程中进度返回给哪个前端显示需要显示吗
LOGGER.info("transferToOtherNode : pubKey=" + pubKey + " contractID=" + contractID);
ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID);
MasterClientTransferAction.instance.transferInstance(pubKey, meta.getID());
}
}

View File

@ -1,200 +0,0 @@
package org.bdware.server.trustedmodel;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import io.netty.bootstrap.Bootstrap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.ContractClient;
import org.bdware.sc.ContractManager;
import org.bdware.sc.ContractResult;
import org.bdware.sc.MasterStub;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.CongestionControl;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.tcp.TCPClientFrameHandler;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class MasterProxy implements MasterStub {
// key is masters' pubKey
public static final Map<String, MasterConnector> CONNECTORS = new ConcurrentHashMap<>();
private static final Logger LOGGER = LogManager.getLogger(MasterProxy.class);
// nodePubkey该node作为master的地址
public static Map<String, String> slaverRouter = new HashMap<>();
public static TCPClientFrameHandler getHandler(String masterID) {
return CONNECTORS.get(masterID).handler;
}
public static void reconnect(String master) {
LOGGER.debug(
"[MasterProxy] reconnect:"
+ JsonUtil.toJson(slaverRouter)
+ "\nmaster="
+ master);
try {
MasterConnector conn;
synchronized (conn = CONNECTORS.get(master)) {
if (!conn.handler.isOpen()) {
String[] ipAndPort = slaverRouter.get(master).split(":");
conn.bootstrap
.connect(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
.sync()
.channel();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void executeByOtherNodeAsync(String pubKey, ContractRequest c, ResultCallback cb) {
MasterConnector handler = CONNECTORS.get(pubKey);
LOGGER.error(pubKey + " " + c.getRequestID() + " " + c.getContractID() + " cb:" + cb);
int maxMasterProxyLoad = CongestionControl.masterProxyLoad.incrementAndGet();
if (maxMasterProxyLoad > CongestionControl.maxMasterProxyLoad)
CongestionControl.maxMasterProxyLoad = maxMasterProxyLoad;
if (CongestionControl.slaveControl()) {
ContractResult cr =
new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("canceled because of queue too long"));
cb.onResult(JsonUtil.parseObjectAsJsonObject(cr));
CongestionControl.masterProxyLoad.decrementAndGet();
return;
}
MasterServerTCPAction.sync.sleep(
c.getRequestID(),
new ResultCallback() {
@Override
public void onResult(String str) {
cb.onResult(JsonUtil.parseStringAsJsonObject(str));
CongestionControl.masterProxyLoad.decrementAndGet();
}
});
Map<String, Object> req = new HashMap<>();
req.put("action", "requestContractExecution");
req.put("requestID", c.getRequestID());
req.put("data", c);
handler.handler.sendMsg(JsonUtil.toJson(req));
}
@Override
public boolean hasConnection(String pubKey) {
if (pubKey != null) return CONNECTORS.containsKey(pubKey);
return true;
}
// @Override
// public ContractResult executeByOtherNode(String pubKey, ContractRequest c) {
// // get contract's master node
// try {
// int maxMasterProxyLoad = CongestionControl.masterProxyLoad.addAndGet(700);
// if (maxMasterProxyLoad > CongestionControl.maxMasterProxyLoad)
// CongestionControl.maxMasterProxyLoad = maxMasterProxyLoad;
// MasterConnector handler = connectors.get(pubKey);
// if (CongestionControl.slaveControl()) {
// ContractResult cr =
// new ContractResult(
// ContractResult.Status.Error,
// new JsonPrimitive("sync call canceled because of queue too long"));
//
// return cr;
// }
// // TODO 如果handler是空需要先去连接这个master
// String ret = handler.handler.requestContractExecution(c);
// LOGGER.info("[MasterProxy] executeByOtherNodes 结果是 " + ret);
// // TODO
// // 这回来可能带了nodeIDs和executeTimes
// // 在这里是否需要保留nodeIDs和executeTimes??
// return JsonUtil.fromJson(ret, ContractResult.class);
// } finally {
// CongestionControl.masterProxyLoad.addAndGet(-700);
// }
// }
// @Override
// public String executeGlobally(ContractRequest c, OnHashCallback cb) {
// StrCollector resultCallback = new StrCollector();
// CMActions.manager.executeContractInternal(c, resultCallback, cb);
// resultCallback.waitForResult();
// return resultCallback.strRet;
// }
// transfer contract instance
@Override
public void transferToOtherNode(String pubKey, String contractID) {
// TODO 问题1合约的ypk或者script怎么获取ypk如果是包含私有路径的可以在Contract中设置一个字段启动的时候写入吗 目前认为这个从前端传入
// TODO 问题2 转移过程中进度返回给哪个前端显示需要显示吗
LOGGER.info("transferToOtherNode : pubKey=" + pubKey + " contractID=" + contractID);
MasterConnector handler = CONNECTORS.get(pubKey);
if (handler == null) {
LOGGER.info("connect failed.");
return;
}
contractID = CMActions.manager.getContractIDByName(contractID);
handler.handler.transferInstance(contractID);
}
public static class MasterConnector {
public Bootstrap bootstrap;
public TCPClientFrameHandler handler;
}
static class StrCollector extends ResultCallback {
String strRet = "{\"data\":\"Timeout\"}";
boolean hasResult = false;
long start = System.currentTimeMillis();
@Override
public void onResult(String str) {
synchronized (this) {
strRet = str;
hasResult = true;
LOGGER.debug(
"[StrCollector] onResultTakes:" + (System.currentTimeMillis() - start));
this.notifyAll();
}
LOGGER.debug(
"[StrCollector] onResultTakesNotifyDone:"
+ (System.currentTimeMillis() - start));
}
public void waitForResult() {
synchronized (this) {
try {
if (!hasResult) {
this.wait(5000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
LOGGER.debug("[StrCollector] waitTakes:" + (System.currentTimeMillis() - start));
}
public void waitForResultCounter(int count) {
synchronized (this) {
try {
if (!hasResult) {
this.wait(20000L * count);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
LOGGER.debug("[StrCollector] waitTakes:" + (System.currentTimeMillis() - start));
}
}
}

View File

@ -7,13 +7,14 @@ import org.apache.logging.log4j.Logger;
import org.bdware.sc.ContractManager;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.RecoverFlag;
import org.bdware.sc.util.HashUtil;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.units.NetworkManager;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@ -76,7 +77,7 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
}
@Override
public void execute(String requestID, ResultCallback rc, ContractRequest req) {
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hrc) {
if (executedTxs.containsKey(requestID)) {
rc.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
@ -94,12 +95,6 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
}
}
@Override
public void close() {
this.future.cancel(false);
this.running = false;
LOGGER.info("destruct executor of contract " + meta.getContractID());
}
public void execute(String blockStr) {
Block block = JsonUtil.fromJson(blockStr, Block.class);
@ -158,11 +153,9 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
req.addProperty("contractID", this.meta.getContractID());
String reqStr = req.toString();
for (String node : nodes) {
SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(node);
if (cmNode != null &&
MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID())
== RecoverFlag.Fine) {
cmNode.connection.sendMsg(reqStr);
if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID())
== RecoverFlag.Fine) {
NetworkManager.instance.sendToAgent(node, reqStr);
}
}
}

View File

@ -14,38 +14,5 @@ public class SlaveNode {
this.pubKey = pubKey;
}
public boolean isAlive() {
if (connection == null) {
LOGGER.info(pubKey.substring(0, 5) + " is not alive, connection is null");
return false;
}
try {
if (!connection.handler.isOpen()) {
LOGGER.info(pubKey.substring(0, 5) + " is not open, try is 3seconds later");
return isAliveAfter3();
} else {
return true;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
private boolean isAliveAfter3() {
try {
Thread.sleep(3000L);
if (connection.handler.isOpen()) {
LOGGER.info(pubKey.substring(0, 5) + " is open, after 3seconds later");
return true;
}
LOGGER.info(pubKey.substring(0, 5) + " is not open, after 3seconds later");
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
}

View File

@ -12,17 +12,19 @@ import io.netty.handler.timeout.IdleStateHandler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.ContractManager;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.CMHttpServer;
import org.bdware.server.ControllerManager;
import org.bdware.server.GlobalConf;
import org.bdware.server.action.CMActions;
import org.bdware.server.nodecenter.client.NodeCenterClientHandler;
import org.bdware.server.tcp.TCPServerFrameHandler;
import org.bdware.server.trustedmodel.SlaveNode;
import org.bdware.server.ws.DelimiterCodec;
import org.bdware.units.enums.NetworkType;
import org.bdware.units.grpc.BDLedgerContract.UnitMessage;
import org.bdware.units.grpc.JavaContractServiceGrpcServer;
import org.bdware.units.msghandler.UnitMessageHandler;
import org.bdware.units.tcp.TCPClientFrameHandler;
import org.bdware.units.tcp.TCPUtils;
import java.net.InetSocketAddress;
import java.net.URI;
@ -30,6 +32,7 @@ import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
@ -38,40 +41,45 @@ import java.util.concurrent.TimeUnit;
* @author OliveDS (Shuang Deng)
*/
public class NetworkManager {
public static class AgentConnector {
public Bootstrap bootstrap;
public org.bdware.server.tcp.TCPClientFrameHandler handler;
}
//Manage server->client connection;
public static final Map<String, AgentConnector> CONNECTORS = new ConcurrentHashMap<>();
//Manage client->server connection;
public static final Map<String, TCPServerFrameHandler> SERVERCONNECTORS = new ConcurrentHashMap<>();
private static Map<String, String> slaverRouter = new HashMap<>();
public static Map<String, SlaveNode> id2Slaves = new ConcurrentHashMap<>();
public static final String NODE_CENTER_CLIENT = "NODE_CENTER_CLIENT";
public static final String P2P_GRPC_CLIENT = "P2P_GRPC_CLIENT";
private static final Logger LOGGER = LogManager.getLogger(NetworkManager.class);
public static NetworkManager instance = new NetworkManager();
private final Map<String, String> peerID2TCPAddress;
private final Map<String, TCPClientFrameHandler> tcpClientMap;
private String tcpNodeCenter;
private String tcpSelf;
// connection to node center
private NodeCenterClientHandler nodeCenterClientHandler;
private final Map<String, String> peerID2TCPAddress;
public NetworkManager() {
peerID2TCPAddress = new HashMap<>();
tcpClientMap = new HashMap<>();
}
public void initP2P(int port) {
JavaContractServiceGrpcServer.init(port);
}
public void initTCP(int tcpPort, EventLoopGroup workerGroup) {
createTCPServer(tcpPort, workerGroup);
connectToTCPNodeCenter();
}
public boolean containsTcpClient(String peer) {
return tcpClientMap.containsKey(peer);
public void initP2P(int port) {
JavaContractServiceGrpcServer.init(port);
}
private void connectToTCPNodeCenter() {
final Bootstrap b = new Bootstrap();
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
nodeCenterClientHandler = new NodeCenterClientHandler();
CMActions.manager.nodeCenterConn = nodeCenterClientHandler.controller;
nodeCenterClientHandler = ControllerManager.createNodeCenterClientHandler();
EventLoopGroup group = new NioEventLoopGroup();
b.group(group);
b.channel(NioSocketChannel.class)
@ -97,7 +105,7 @@ public class NetworkManager {
LOGGER.error("creating uri failed! " + e1.getMessage());
}
if (!nodeCenterClientHandler.isConnected()
|| !nodeCenterClientHandler.controller.syncPing()) {
|| !ControllerManager.getNodeCenterController().syncPing()) {
nodeCenterClientHandler.close();
assert null != uri;
b.connect(uri.getHost(), uri.getPort()).sync().channel();
@ -108,6 +116,8 @@ public class NetworkManager {
+ uri.getPort());
}
} catch (Exception e) {
e.printStackTrace();
LOGGER.error("connecting to node center failed! " + e.getMessage());
}
},
@ -143,6 +153,154 @@ public class NetworkManager {
}
}
//TODO Remove in future
public void sendToNodeCenter(String msg) {
nodeCenterClientHandler.sendMsg(msg);
}
public boolean isConnectedToNodeCenter() {
return null != nodeCenterClientHandler && nodeCenterClientHandler.isConnected();
}
public void waitForNodeCenterConnected() {
for (int i = 0;
i < 10 && null != nodeCenterClientHandler && !nodeCenterClientHandler.isConnected();
i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void reInitNodeCenter() {
if (null != nodeCenterClientHandler) {
nodeCenterClientHandler.close();
}
}
//----------AgentNetworkManagement
public void updateAgentRouter(String nodeID, String address) {
slaverRouter.put(nodeID, address);
}
public void registerConnection(String nodeID, TCPServerFrameHandler handler) {
LOGGER.info("nodeID:"+nodeID+" connected!!");
SERVERCONNECTORS.put(nodeID, handler);
}
public void closeAgent(String agentPubkey) {
//TODO
// if (handler != null) {
// try {
// handler.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
//AliveCheckAction.closeMaster();
NetworkManager.CONNECTORS.remove(agentPubkey);
}
public void connectToAgent(String master, String contractID) {
LOGGER.info("[CMClientController] connectToMaster master= " + master);
// logger.debug("ConnectToMaster:" + master + "\nMasterRoute:" + slaverRouter.get(master));
Bootstrap b;
AgentConnector connector = null;
synchronized (CONNECTORS) {
if (!CONNECTORS.containsKey(master)) {
connector = new AgentConnector();
CONNECTORS.put(master, connector);
}
}
if (connector != null) {
b = new Bootstrap();
org.bdware.server.tcp.TCPClientFrameHandler handler = new org.bdware.server.tcp.TCPClientFrameHandler(master);
connector.bootstrap = b;
connector.handler = handler;
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
b.group(CMHttpServer.workerGroup);
b.channel(NioSocketChannel.class)
.handler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new DelimiterCodec()).addLast(handler);
}
});
}
reconnectAgent(master);
}
public static void reconnectAgent(String master) {
LOGGER.debug(
"[MasterProxy] reconnect:"
+ JsonUtil.toJson(slaverRouter)
+ "\nmaster="
+ master);
try {
NetworkManager.AgentConnector conn;
synchronized (conn = NetworkManager.CONNECTORS.get(master)) {
if (!conn.handler.isOpen()) {
String[] ipAndPort = slaverRouter.get(master).split(":");
conn.bootstrap
.connect(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
.sync()
.channel();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void sendToAgent(String pubkey, String content) {
if (sendToAgentByServer(pubkey, content)) {
return;
}
if (!hasAgentConnection(pubkey)) {
nodeCenterClientHandler.getController().connectToNode(pubkey);
}
CONNECTORS.get(pubkey).handler.sendMsg(content);
}
private boolean sendToAgentByServer(String pubkey, String content) {
TCPServerFrameHandler handler = SERVERCONNECTORS.get(pubkey);
if (handler == null) return false;
try {
handler.sendMsg(content);
return true;
} catch (Exception e) {
e.printStackTrace();
LOGGER.info("Remove ServerConnection,client pubkey:" + pubkey);
}
return false;
}
public void sendToAgentIfConnected(String pubkey, String content) {
try {
if (hasAgentConnection(pubkey))
CONNECTORS.get(pubkey).handler.sendMsg(content);
} catch (Exception e) {
e.printStackTrace();
}
}
public boolean hasAgentConnection(String pubKey) {
if (pubKey != null) {
if (SERVERCONNECTORS.containsKey(pubKey))
return true;
}
return CONNECTORS.containsKey(pubKey);
}
//-------UNUSED TOMerge------------
//UNUSED
public TCPClientFrameHandler createTCPClient(String peer, String ipPort)
throws InterruptedException {
if (peer.equals(GlobalConf.instance.peerID)) {
@ -155,7 +313,7 @@ public class NetworkManager {
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
final TCPClientFrameHandler handler = new TCPClientFrameHandler(peer);
tcpClientMap.put(peer, handler);
//tcpClientMap.put(peer, handler);
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
@ -182,15 +340,8 @@ public class NetworkManager {
return handler;
}
public NodeCenterClientHandler getNodeCenterClientHandler() {
return nodeCenterClientHandler;
}
public void sendToNodeCenter(String msg) {
nodeCenterClientHandler.sendMsg(msg);
}
/**
* send to all kinds including special receivers
* UNUSED send to all kinds including special receivers
*
* @param unitMessage unit message
* @param networkType type of network
@ -209,7 +360,7 @@ public class NetworkManager {
/**
* send to TCP nodes, if fail send by p2p
* UNUSED send to TCP nodes, if fail send by p2p
*
* @param msg unit message
*/
@ -225,10 +376,10 @@ public class NetworkManager {
LOGGER.info("send msg to itself " + msg);
continue;
}
tcpClientFrameHandler = NetworkManager.instance.tcpClientMap.getOrDefault(peer, null);
if (null == tcpClientFrameHandler) {
// tcpClientFrameHandler = NetworkManager.instance.tcpClientMap.getOrDefault(peer, null);
if (peer != null) {
if (peerID2TCPAddress.containsKey(peer)) {
recreateTCPClient(peer);
//recreateTCPClient(peer);
// instance.tcpClientMap.put(peer, tcpClientFrameHandler);
UnitMessage unitMessage =
msg.toBuilder().clearReceiver().addReceiver(peer).build();
@ -243,79 +394,11 @@ public class NetworkManager {
continue;
}
LOGGER.info("send msg by tcp to " + peer);
tcpClientFrameHandler.sendMsg(msg);
// tcpClientFrameHandler.sendMsg(msg);
}
}
private TCPClientFrameHandler recreateTCPClient(String peer) {
TCPClientFrameHandler tcpClientFrameHandler;
try {
tcpClientFrameHandler = instance.createTCPClient(peer, peerID2TCPAddress.get(peer));
return tcpClientFrameHandler;
} catch (Exception e) {
LOGGER.error("[NetworkManager] createTCPClient error");
e.printStackTrace();
return null;
}
}
public void closeConnection(String peer) {
if (TCPUtils.isTCPAddress(peer)) {
NetworkManager.instance.tcpClientMap.remove(peer);
} else {
LOGGER.debug("[NetworkManager] closeConnection p2p " + peer);
}
}
public void updateLocalContractToNodeCenter() {
if (null != nodeCenterClientHandler) {
nodeCenterClientHandler.controller.updateContract();
}
}
public String getTcpNodeCenter() {
return instance.tcpNodeCenter;
}
public void setTcpNodeCenter(String tcpNodeCenter) {
instance.tcpNodeCenter = tcpNodeCenter;
}
public String getTcpSelf() {
return instance.tcpSelf;
}
public void setTcpSelf(String tcpSelf) {
instance.tcpSelf = tcpSelf;
}
public boolean isConnectedToNodeCenter() {
return null != nodeCenterClientHandler;
}
public NodeCenterClientHandler getNCClientHandler() {
return nodeCenterClientHandler;
}
public void sendTo(UnitMessage unitMessage, String symbol) {
//
}
public void waitForNodeCenterConnected() {
for (int i = 0;
i < 10 && null != nodeCenterClientHandler && !nodeCenterClientHandler.isConnected();
i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void reInitNodeCenter() {
if (null != nodeCenterClientHandler) {
nodeCenterClientHandler.close();
}
}
}

View File

@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.ControllerManager;
import org.bdware.server.GlobalConf;
import org.bdware.server.action.Action;
import org.bdware.server.action.ActionExecutor;
@ -46,8 +47,7 @@ public class CommunicationManager extends BaseFunctionManager {
return instance;
}
instance = new CommunicationManager();
instance.nodeCenterClientController = new NodeCenterClientController(GlobalConf.getNodeID());
instance.nodeCenterClientController.init(NetworkManager.instance.getNCClientHandler());
instance.nodeCenterClientController = ControllerManager.getNodeCenterController();
instance.communicationAction = new CommunicationAction();
instance.ae = new ActionExecutor<ResultCallback, JsonObject>(
executorService,

View File

@ -11,10 +11,10 @@ import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.RequestCache;
import org.bdware.sc.units.RespCache;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.ControllerManager;
import org.bdware.server.GlobalConf;
import org.bdware.server.action.Action;
import org.bdware.server.action.ActionExecutor;
import org.bdware.server.action.p2p.ExecutionAction;
import org.bdware.units.NetworkManager;
import org.bdware.units.beans.MultiPointContractInfo;
import org.bdware.units.beans.UnitContractMessage;
@ -46,7 +46,6 @@ public class ExecutionManager extends BaseFunctionManager {
private final BDLedgerContract.UnitMessageType UNIT_MESSAGE_TYPE =
BDLedgerContract.UnitMessageType.UnitContractMessage;
public Map<String, String> contractID2Sequencing = new HashMap<>();
protected ExecutionAction executionAction;
// protected MasterServerTCPAction masterActions;
// protected MasterClientTCPAction clientActions;
protected ActionExecutor<ResultCallback, JsonObject> actionExecutor;
@ -65,10 +64,9 @@ public class ExecutionManager extends BaseFunctionManager {
instance = new ExecutionManager();
// instance.masterActions = new MasterServerTCPAction();
// instance.clientActions = new MasterClientTCPAction();
instance.executionAction = new ExecutionAction();
instance.actionExecutor =
new ActionExecutor<ResultCallback, JsonObject>(
executorService, instance.executionAction) { // , instance.masterActions,
executorService) { // , instance.masterActions,
// instance.clientActions) {
@Override
public boolean checkPermission(
@ -144,7 +142,7 @@ public class ExecutionManager extends BaseFunctionManager {
* 需要最后调用在动作结束后否则调用的方法后面不会执行
*/
public void updateLocalContractToNodeCenter() {
NetworkManager.instance.updateLocalContractToNodeCenter();
ControllerManager.getNodeCenterController().updateContract();
// List<?> info = CMActions.manager.getContractDespList();
// JsonObject jo = new JsonObject();
// jo.addProperty("action", "updateContract");