merge pbft algorithm

This commit is contained in:
CaiHQ 2021-11-22 11:50:39 +08:00
parent 18f3fe7f5c
commit 969e456687
15 changed files with 570 additions and 48 deletions

View File

@ -1219,7 +1219,7 @@ public class CMActions implements OnHashCallback {
return;
}
if (cl.isUnit()) {
if (cl.contractMeta.contract.getType().isUnit()) {
LOGGER.debug("killContractProcess : 集群合约 kill");
killContractByMaster(rc.getContractID(), args, resultCallback);

View File

@ -25,7 +25,10 @@ import org.bdware.units.NetworkManager;
import org.zz.gmhelper.SM2KeyPair;
import org.zz.gmhelper.SM2Util;
import java.util.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@ -41,6 +44,7 @@ public class MasterWSAction {
public static boolean hostMaster(String contractID) {
return CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).isMaster();
}
private boolean waitForConnection(Set<String> nodeNames) {
LOGGER.info("waitForAllNodes:" + JsonUtil.toJson(nodeNames));
for (int i = 0; i < 5; i++) {
@ -66,6 +70,7 @@ public class MasterWSAction {
LOGGER.info("waitForAllNodes return false;");
return false;
}
@Action(async = true, userPermission = 1L << 26)
// zyx modified
public void startContractMultiPoint(JsonObject args, final ResultCallback rc) {
@ -262,6 +267,7 @@ public class MasterWSAction {
case RequestAllResponseAll:
case Sharding:
case SelfAdaptiveSharding:
case PBFT:
contract.setNumOfCopies(nodeSize);
break;
default:

View File

@ -54,7 +54,6 @@ public class AliveCheckClientAction {
HeartBeatUtil.getInstance().cancel(sendPingTask);
sendPingTask = null;
}
NetworkManager.instance.closeAgent(masterPubkey);
}
public void checkMasterAlive(ResultCallback rc) {
@ -125,7 +124,7 @@ public class AliveCheckClientAction {
System.currentTimeMillis();
}
}
closeMaster();
NetworkManager.instance.closeAgent(masterPubkey);
} // if
}
};

View File

@ -20,7 +20,7 @@ public class AliveCheckServerAction {
TimerTask checkAliveTask;
private long lastSlavePingTime;
String pubKey;
public String pubKey;
public AliveCheckServerAction(TCPServerFrameHandler handler) {
lastSlavePingTime =
@ -38,11 +38,20 @@ public class AliveCheckServerAction {
this.handler = handler;
}
public void close() {
if (checkAliveTask != null) {
HeartBeatUtil.getInstance().cancel(checkAliveTask);
checkAliveTask = null;
}
}
private class HeartBeatTask extends TimerTask {
int delay;
HeartBeatTask(int delay) {
this.delay = delay;
}
@Override
public void run() {
try {
@ -80,6 +89,7 @@ public class AliveCheckServerAction {
for (String contractID : contracts) {
MasterServerRecoverMechAction.unitModeCheck(contractID);
}
NetworkManager.instance.closeAgent(pubKey);
}
} catch (Exception e) {
e.printStackTrace();

View File

@ -5,6 +5,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.ContractClient;
import org.bdware.sc.ContractManager;
import org.bdware.sc.ContractMeta;
import org.bdware.sc.RecoverMechTimeRecorder;
import org.bdware.sc.bean.Contract;
import org.bdware.sc.conn.ByteUtil;
@ -18,6 +19,8 @@ import org.bdware.server.GlobalConf;
import org.bdware.server.action.Action;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.RequestToMaster;
import org.bdware.server.executor.consistency.PBFTExecutor;
import org.bdware.server.executor.consistency.RequestAllExecutor;
import org.bdware.server.tcp.TCPClientFrameHandler;
import org.bdware.server.trustedmodel.AgentManager;
import org.bdware.server.trustedmodel.ContractUnitStatus;
@ -324,6 +327,7 @@ public class MasterClientRecoverMechAction {
}
LOGGER.info("恢复步骤-----------5 检查是否有合约进程");
// cei.printContent();
ContractClient client = CMActions.manager.getClient(cei.getContractID());
@ -338,7 +342,23 @@ public class MasterClientRecoverMechAction {
+ client.getContractType()
+ "\n");
}
ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(cei.getContractID());
meta.setContractExecutor(
MasterClientTCPAction.createContractExecutor(meta.contract, contractID, cei.getMasterNode(), cei.getMembers()));
switch (meta.contract.getType()) {
case RequestAllResponseFirst:
case RequestAllResponseHalf:
case RequestAllResponseAll:
((RequestAllExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1);
break;
case PBFT:
((PBFTExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1);
break;
case Sharding:
break;
default:
break;
}
// 认为contractID不会重
if (client != null
&& client.isProcessAlive()
@ -599,7 +619,23 @@ public class MasterClientRecoverMechAction {
cei.getContractID()));
}
CMActions.manager.multiContractRecorder.updateValue(cei);
ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID);
switch (meta.contract.getType()) {
case RequestAllResponseFirst:
case RequestAllResponseHalf:
case RequestAllResponseAll:
((RequestAllExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1);
break;
case PBFT:
((PBFTExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1);
break;
case Sharding:
break;
default:
break;
}
boolean flag = checkAndRestart(cei); // if need,restart the contract process
if (!flag) {
return;
}
@ -638,6 +674,8 @@ public class MasterClientRecoverMechAction {
cei.setLastExeSeq(lastExeSeq); // 通过数据库中值设置lastExeSeq和本地记录的ContractRecord是同步的
LOGGER.info("本地节点恢复完成之后 lastExeSeq = " + cei.getLastExeSeq());
}
// String m1 = CMActions.manager.dumpContract(contractID, "");
// System.out.println("从本地transRecods恢复之后状态为 \n" + m1);

View File

@ -9,17 +9,21 @@ import org.bdware.sc.*;
import org.bdware.sc.bean.Contract;
import org.bdware.sc.bean.ContractExecType;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.ByteUtil;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.db.CMTables;
import org.bdware.sc.db.KeyValueDBUtil;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.PubKeyNode;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.GlobalConf;
import org.bdware.server.action.*;
import org.bdware.server.executor.RequestAllExecutor;
import org.bdware.server.executor.consistency.PBFTExecutor;
import org.bdware.server.executor.consistency.RequestAllExecutor;
import org.bdware.server.executor.unconsistency.MultiPointCooperationExecutor;
import org.bdware.server.executor.unconsistency.RequestOnceExecutor;
import org.bdware.server.executor.unconsistency.ResponseOnceExecutor;
import org.bdware.server.tcp.PubkeyResultCallback;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.KillUnitContractInfo;
import org.bdware.server.trustedmodel.SelfAdaptiveShardingExecutor;
@ -82,7 +86,7 @@ public class MasterClientTCPAction {
LOGGER.info("认为合约 " + contractID + " 的master崩溃 当前master为null 向NC发送重选信息");
}
public static ContractExecutor createContractExecutor(Contract contract, String contractID) {
public static ContractExecutor createContractExecutor(Contract contract, String contractID, String masterPubkey, String[] members) {
ContractExecutor executor = null;
int nodeSize = contract.getNumOfCopies();
switch (contract.getType()) {
@ -118,6 +122,9 @@ public class MasterClientTCPAction {
case SelfAdaptiveSharding:
executor = new SelfAdaptiveShardingExecutor(contractID);
break;
case PBFT:
executor = new PBFTExecutor(nodeSize, contractID, masterPubkey, members);
break;
}
return executor;
}
@ -253,7 +260,8 @@ public class MasterClientTCPAction {
LOGGER.info("启动结果为 " + ret);
CMActions.manager.multiContractRecorder.updateValue(cei);
ContractMeta meta = CMActions.manager.statusRecorder.createIfNotExist(contractID);
meta.setContractExecutor(createContractExecutor(contract, contractID)); // 分配不同的Executor
meta.setContractExecutor(createContractExecutor(contract, contractID, jo.get("master").getAsString(), cei.getMembers())); // 分配不同的Executor
// TODO 合约终止后从数据库中移除但是为了测试可以人为制造合约终止但不从数据库中移除异常停止
KeyValueDBUtil.instance.setValue(CMTables.UnitContracts.toString(), contractID, "exist");
@ -579,6 +587,20 @@ public class MasterClientTCPAction {
jsonObject.get("responseID").getAsString(), jsonObject.get("data").getAsString());
}
@Action(async = true)
public void contractSyncMessage(JsonObject jsonObject, ResultCallback resultCallback) {
String contractID = jsonObject.get("contractID").getAsString();
MultiContractMeta mcm = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID);
byte[] data = ByteUtil.decodeBASE64(jsonObject.get("data").getAsString());
PubKeyNode node = new PubKeyNode();
if (resultCallback instanceof PubkeyResultCallback) {
PubkeyResultCallback p = (PubkeyResultCallback) resultCallback;
node.pubkey = p.getPubkey();
}
meta.contractExecutor.onSyncMessage(node, data);
}
@Action(async = true)
public void reRouteContract(JsonObject jo, ResultCallback result) {
LOGGER.info("Receive Reroute Info:" + jo.toString());

View File

@ -16,7 +16,8 @@ import org.bdware.sc.util.JsonUtil;
import org.bdware.server.GlobalConf;
import org.bdware.server.action.Action;
import org.bdware.server.action.CMActions;
import org.bdware.server.executor.RequestAllExecutor;
import org.bdware.server.executor.consistency.PBFTExecutor;
import org.bdware.server.executor.consistency.RequestAllExecutor;
import org.bdware.server.trustedmodel.ContractUnitStatus;
import org.bdware.units.NetworkManager;
import org.zz.gmhelper.SM2KeyPair;
@ -294,14 +295,18 @@ public class MasterServerRecoverMechAction {
ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID);
meta.setContractExecutor(
MasterClientTCPAction.createContractExecutor(meta.contract, contractID));
MasterClientTCPAction.createContractExecutor(meta.contract, contractID, cei.getMasterNode(), cei.getMembers()));
switch (meta.contract.getType()) {
case RequestAllResponseFirst:
case RequestAllResponseHalf:
case RequestAllResponseAll:
case Sharding:
((RequestAllExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1);
break;
case PBFT:
((PBFTExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1);
break;
case Sharding:
break;
default:
break;
}

View File

@ -18,7 +18,7 @@ import org.bdware.server.CongestionControl;
import org.bdware.server.action.Action;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.SyncResult;
import org.bdware.server.executor.RequestAllExecutor;
import org.bdware.server.executor.consistency.RequestAllExecutor;
import org.bdware.server.trustedmodel.KillUnitContractResultCollector;
import org.bdware.server.trustedmodel.ResultCollector;
import org.bdware.units.NetworkManager;

View File

@ -0,0 +1,371 @@
package org.bdware.server.executor.consistency;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.ComponedContractResult;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.Node;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.sequencing.*;
import org.bdware.sc.units.*;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.GlobalConf;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.trustedmodel.ContractCluster;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.MultiReqSeq;
import org.bdware.server.trustedmodel.ResultCollector;
import org.bdware.units.NetworkManager;
import org.zz.gmhelper.SM2KeyPair;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
//TODO 追赶差下的调用
public class PBFTExecutor implements ContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(PBFTExecutor.class);
final Object lock = new Object();
private final List<PubKeyNode> members;
int resultCount;
AtomicInteger request_index = new AtomicInteger(0);
// key为requestIDvalue为其seq
Map<String, MultiReqSeq> seqMap = new ConcurrentHashMap<>();
Map<String, ResultCache> resultCache = new ConcurrentHashMap<>();
// MultiPointContractInfo info;
String contractID;
PBFTAlgorithm pbft;
ContractCluster contractCluster;
boolean isMaster;
public PBFTExecutor(
int c, String con_id, final String masterPubkey, String[] members) {
resultCount = c;
contractID = con_id;
this.members = new ArrayList<>();
isMaster = GlobalConf.getNodeID().equals(masterPubkey);
pbft = new PBFTAlgorithm(isMaster);
int count = 0;
for (String mem : members) {
PubKeyNode pubkeyNode = new PubKeyNode();
pubkeyNode.pubkey = mem;
PBFTMember pbftMember = new PBFTMember();
pbftMember.isMaster = mem.equals(masterPubkey);
pbft.addMember(pubkeyNode, pbftMember);
this.members.add(pubkeyNode);
if (GlobalConf.getNodeID().equals(mem)) {
pbft.setSendID(count);
}
count++;
}
contractCluster = new ContractCluster(contractID, this.members);
pbft.setConnection(contractCluster);
final MultiContractMeta cei = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
pbft.setCommitter(new Committer() {
@Override
public void onCommit(ContractRequest data) {
ResultCallback ret = null;
final long startTime = System.currentTimeMillis();
ret = new ResultCallback() {
@Override
public void onResult(String str) {
Map<String, String> ret = new HashMap<>();
ret.put("action", "receiveTrustfullyResult");
SM2KeyPair keyPair = GlobalConf.instance.keyPair;
ret.put("nodeID", keyPair.getPublicKeyStr());
ret.put("responseID", data.getRequestID());
ret.put("executeTime", (System.currentTimeMillis() - startTime) + "");
ret.put("data", str);
cei.setLastExeSeq(data.seq);
NetworkManager.instance.sendToAgent(masterPubkey, JsonUtil.toJson(ret));
}
};
CMActions.manager.executeLocallyAsync(data, ret, null);
}
});
}
public void onSyncMessage(Node node, byte[] data) {
pbft.onMessage(node, data);
}
public void setSeq(int seq) {
request_index = new AtomicInteger(seq);
pbft.setAtomSeq(request_index.get());
}
public ResultCallback createResultCallback(
final String requestID,
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID) {
ComponedContractResult componedContractResult = new ComponedContractResult(count);
// TODO 加对应的超时
return new ResultCollector(
requestID, new ResultMerger(originalCb, count, request_seq, contractID), count);
}
public void sendRequest(String id, ContractRequest req, ResultCallback collector) {
// Map<String, Object> reqStr = new HashMap<>();
// reqStr.put("uniReqID", id);
// reqStr.put("data", req);
// reqStr.put("action", "executeContractLocally");
ContractRequest cr2 = ContractRequest.parse(req.toByte());
cr2.setRequestID(id);
PBFTMessage request = new PBFTMessage();
request.setOrder(req.seq);
request.setType(PBFTType.Request);
request.setContent(cr2.toByte());
for (PubKeyNode node : members) {
if (!NetworkManager.instance.hasAgentConnection(node.pubkey)) {
LOGGER.warn("cmNode " + node.pubkey.substring(0, 5) + " is null");
collector.onResult(
"{\"status\":\"Error\",\"result\":\"node offline\","
+ "\"nodeID\":\""
+ node
+ "\","
+ "\"action\":\"onExecuteContractTrustfully\"}");
// } else if (MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID)
// != RecoverFlag.Fine) {
// collector.onResult(
// "{\"status\":\"Error\",\"result\":\"node recovering\","
// + "\"nodeID\":\""
// + node
// + "\","
// + "\"action\":\"onExecuteContractTrustfully\"}");
// contractCluster.sendMessage(node, request.getBytes());
} else {
contractCluster.sendMessage(node, request.getBytes());
}
}
// master负责缓存请求
if (!MasterServerTCPAction.requestCache.containsKey(contractID)) {
MasterServerTCPAction.requestCache.put(contractID, new RequestCache());
}
// TODO 多调多统一个seq的有多个请求这个需要改
String[] nodes =
CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers();
LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID());
LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes));
}
public boolean checkCurNodeNumValid() {
return true;
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
LOGGER.debug(JsonUtil.toJson(req));
MultiContractMeta meta = CMActions.manager.multiContractRecorder.getMultiContractMeta(req.getContractID());
if (meta == null || !meta.isMaster()) {
CMActions.manager.executeContractOnOtherNodes(req, rc);
return;
}
req.setContractID(CMActions.manager.getContractIDByName(req.getContractID()));
// 三个相同requestID进来的时候会有冲突
// 仅在此处有冲突么
// 这里是从MasterServer->MasterClient请求的是"executeContractLocally"
// 如果是多点合约的请求A1A2A3的序号应该一致不能分配一个新的seq根据requestID判断是否不需要重新分配一个序号
//TODO seqMap memory leak
//TODO
//TODO
if (null != requestID && requestID.endsWith("_mul")) {
synchronized (lock) {
if (seqMap.containsKey(requestID)) {
req.seq = seqMap.get(requestID).seq;
} else {
req.seq = request_index.getAndIncrement();
seqMap.put(requestID, new MultiReqSeq(req.seq));
}
}
} else {
req.seq = request_index.getAndIncrement();
}
req.needSeq = true;
String id =
System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq;
LOGGER.info("execute receive requestID=" + requestID + " msgID=" + id);
if (checkCurNodeNumValid()) {
LOGGER.debug("checkCurNodeNumValid=true");
ResultCallback collector =
createResultCallback(id, rc, resultCount, req.seq, req.getContractID());
MasterServerTCPAction.sync.sleep(id, collector);
LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq);
sendRequest(id, req, collector);
} else {
LOGGER.debug("invalidNodeNumOnResult");
request_index.getAndDecrement();
ContractResult finalResult =
new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("node number unavailable, request refused."));
rc.onResult(JsonUtil.toJson(finalResult));
}
// }
/* // 三个相同requestID进来的时候会有冲突
// 仅在此处有冲突么
// 这里是从MasterServer->MasterClient请求的是"executeContractLocally"
req.seq = request_index.getAndIncrement();
req.needSeq = true;
ResultCallback collector = createResultCallback(id, rc, resultCount, req.getContractID());
MasterServerTCPAction.sync.sleep(id, collector);
sendRequest(id, req, collector);*/
}
// 清理缓存的多点合约请求序号
public void clearCache() {
final long time = System.currentTimeMillis() - 30000L;
seqMap.entrySet()
.removeIf(
entry -> {
MultiReqSeq cache = entry.getValue();
if (null == cache) {
return true;
}
return cache.startTime < time;
});
}
public static class ResultMerger extends ResultCallback {
ComponedContractResult componedContractResult;
AtomicInteger order;
String contractID;
int count;
int request_seq;
ResultCallback originalCallback;
Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点
ResultMerger(
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID) {
originalCallback = originalCb;
this.count = count;
this.request_seq = request_seq;
this.contractID = contractID;
componedContractResult = new ComponedContractResult(count);
order = new AtomicInteger(0);
}
public String getContractID() {
return contractID;
}
public String getInfo() {
return "contractID="
+ contractID
+ " 收到第 "
+ order
+ " 个节点回复 : "
+ " order="
+ order
+ " count="
+ count
+ " ";
}
@Override
public void onResult(String str) {
// TODO 必须在这里聚合
// str的data是个ContractResult
// 在这儿也是返回个ContractResult
try {
LOGGER.debug("a result of contract" + contractID + ": " + str);
JsonObject obj = JsonParser.parseString(str).getAsJsonObject();
if (obj.has("nodeID")) {
String id = obj.get("nodeID").getAsString();
if (nodeIDs.contains(id)) {
LOGGER.debug(
"ignored result because the result of node "
+ id.substring(0, 5)
+ " has been received");
return;
}
nodeIDs.add(id);
}
LOGGER.debug(
String.format(
"contractID=%s received=%s order=%d count=%d",
contractID, str, order.get(), count));
componedContractResult.add(obj);
// 收集到所有结果
if (order.incrementAndGet() == count) {
ContractResult finalResult = componedContractResult.figureFinalResult();
finalResult.needSeq = true;
finalResult.seq = request_seq;
// if (null == finalResult) {
// finalResult =
// new ContractResult(
// ContractResult.Status.Exception,
// new JsonPrimitive(
// "no nore than half of the
// consistent result"));
// originalCallback.onResult(new
// Gson().toJson(finalResult));
// } else {
originalCallback.onResult(JsonUtil.toJson(finalResult));
// }
LOGGER.debug(
String.format(
"%d results are the same: %s",
finalResult.size, finalResult.result));
// 集群中事务序号+1
CMActions.manager.multiContractRecorder
.getMultiContractMeta(contractID)
.nextSeqAtMaster();
// recover其中无状态合约CP出错无需恢复
Set<String> nodesID = componedContractResult.getProblemNodes();
if (null == nodesID || nodesID.isEmpty()) {
return;
}
for (String nodeID : nodesID) {
LOGGER.warn("node fails! " + nodeID);
if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID)
== RecoverFlag.Fine) {
MasterServerRecoverMechAction.recoverStatus
.get(nodeID)
.put(contractID, RecoverFlag.ToRecover);
}
}
for (String nodeID : nodesID) {
if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID)
== RecoverFlag.ToRecover) {
LOGGER.warn("node in recover " + nodeID);
// 因为该节点结果有误所以即时是stableMode也认为trans记录不可信
// 直接通过load别的节点来恢复
MasterServerRecoverMechAction.restartContractFromCommonMode(
nodeID, contractID);
}
}
}
// clearCache();
} catch (Exception e) {
e.printStackTrace();
LOGGER.warn("result exception!");
}
}
}
}

View File

@ -1,4 +1,4 @@
package org.bdware.server.executor;
package org.bdware.server.executor.consistency;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

View File

@ -0,0 +1,22 @@
package org.bdware.server.tcp;
import org.bdware.sc.conn.ResultCallback;
public class PubkeyResultCallback extends ResultCallback {
String pubkey;
ResultCallback rc;
PubkeyResultCallback(String pubkey, ResultCallback rc) {
this.pubkey = pubkey;
this.rc = rc;
}
public String getPubkey() {
return pubkey;
}
@Override
public void onResult(String str) {
rc.onResult(str);
}
}

View File

@ -72,6 +72,7 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler<Object> {
public void close() {
try {
aliveCheckClientAction.closeMaster();
if (channel != null) channel.close();
} catch (Exception e) {
e.printStackTrace();
@ -111,17 +112,18 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler<Object> {
}
Response response;
try {
LOGGER.info("====== TCPClientFrameHandler:" + arg.toString());
//LOGGER.info("====== TCPClientFrameHandler:" + arg.toString());
final String action = arg.get("action").getAsString();
PubkeyResultCallback pc = new PubkeyResultCallback(master, new ResultCallback() {
@Override
public void onResult(String ret) {
sendMsg(ret);
}
});
ae.handle(
action,
arg,
new ResultCallback() {
@Override
public void onResult(String ret) {
sendMsg(ret);
}
});
arg, pc
);
} catch (IllegalArgumentException e) {
response = new Response();
response.action = "onException";

View File

@ -25,14 +25,17 @@ import java.util.concurrent.Executors;
public class TCPServerFrameHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger LOGGER = LogManager.getLogger(TCPServerFrameHandler.class);
static ExecutorService executorService = Executors.newFixedThreadPool(10);
private final AliveCheckServerAction checkAction;
public ChannelHandlerContext ctx;
public ActionExecutor<ResultCallback, JsonObject> ae;
public TCPServerFrameHandler() {
checkAction = new AliveCheckServerAction(this);
ae =
new ActionExecutor<ResultCallback, JsonObject>(
executorService, new AliveCheckServerAction(this),
executorService, checkAction,
new MasterClientTCPAction(), new MasterClientRecoverMechAction(), MasterClientTransferAction.instance,
new MasterServerRecoverMechAction(), new MasterServerTransferAction(), new MasterServerTCPAction() ) {
new MasterServerRecoverMechAction(), new MasterServerTransferAction(), new MasterServerTCPAction()) {
@Override
public boolean checkPermission(
Action a, final JsonObject args, long permission) {
@ -108,15 +111,17 @@ public class TCPServerFrameHandler extends SimpleChannelInboundHandler<Object> {
try {
final String action = arg.get("action").getAsString();
PubkeyResultCallback pubkeyResultCallback = new PubkeyResultCallback(checkAction.pubKey, new ResultCallback() {
@Override
public void onResult(String ret) {
sendMsg(ret);
}
});
ae.handle(
action,
arg,
new ResultCallback() {
@Override
public void onResult(String ret) {
sendMsg(ret);
}
});
arg, pubkeyResultCallback
);
} catch (IllegalArgumentException e) {
response = new Response();
@ -161,6 +166,10 @@ public class TCPServerFrameHandler extends SimpleChannelInboundHandler<Object> {
}
}
public void close() {
checkAction.close();
}
static class Response {
public String cid;

View File

@ -0,0 +1,35 @@
package org.bdware.server.trustedmodel;
import com.google.gson.JsonObject;
import org.bdware.sc.conn.ByteUtil;
import org.bdware.sc.units.PubKeyNode;
import org.bdware.sc.units.TrustfulExecutorConnection;
import org.bdware.units.NetworkManager;
import java.util.ArrayList;
import java.util.List;
public class ContractCluster implements TrustfulExecutorConnection<PubKeyNode> {
private final List<PubKeyNode> members;
String contractID;
public ContractCluster(String contractID, List<PubKeyNode> members) {
this.members = new ArrayList<>();
this.members.addAll(members);
this.contractID = contractID;
}
@Override
public void sendMessage(PubKeyNode node, byte[] msg) {
JsonObject jo = new JsonObject();
jo.addProperty("action", "contractSyncMessage");
jo.addProperty("contractID", contractID);
jo.addProperty("data", ByteUtil.encodeBASE64(msg));
NetworkManager.instance.sendToAgent(node.pubkey, jo.toString());
}
@Override
public List<PubKeyNode> getNodes() {
return members;
}
}

View File

@ -189,21 +189,20 @@ public class NetworkManager {
}
public void registerConnection(String nodeID, TCPServerFrameHandler handler) {
LOGGER.info("nodeID:"+nodeID+" connected!!");
LOGGER.info("nodeID:" + nodeID + " connected!!");
SERVERCONNECTORS.put(nodeID, handler);
}
public void closeAgent(String agentPubkey) {
//TODO
// if (handler != null) {
// try {
// handler.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
//AliveCheckAction.closeMaster();
NetworkManager.CONNECTORS.remove(agentPubkey);
if (NetworkManager.SERVERCONNECTORS.containsKey(agentPubkey)) {
NetworkManager.SERVERCONNECTORS.get(agentPubkey).close();
NetworkManager.SERVERCONNECTORS.remove(agentPubkey);
}
if (NetworkManager.CONNECTORS.containsKey(agentPubkey)) {
NetworkManager.CONNECTORS.get(agentPubkey).handler.close();
NetworkManager.CONNECTORS.remove(agentPubkey);
}
}
public void connectToAgent(String master, String contractID) {
@ -260,13 +259,17 @@ public class NetworkManager {
}
public void sendToAgent(String pubkey, String content) {
if (sendToAgentByServer(pubkey, content)) {
return;
try {
if (sendToAgentByServer(pubkey, content)) {
return;
}
if (!hasAgentConnection(pubkey)) {
nodeCenterClientHandler.getController().connectToNode(pubkey);
}
CONNECTORS.get(pubkey).handler.sendMsg(content);
} catch (Exception e) {
e.printStackTrace();
}
if (!hasAgentConnection(pubkey)) {
nodeCenterClientHandler.getController().connectToNode(pubkey);
}
CONNECTORS.get(pubkey).handler.sendMsg(content);
}
private boolean sendToAgentByServer(String pubkey, String content) {