refactor: sdk for consensus algorithm

This commit is contained in:
汪旭鑫 2022-02-15 14:44:15 +08:00
parent fc7512f50f
commit e2df294d65
14 changed files with 1426 additions and 1397 deletions

View File

@ -34,6 +34,7 @@ dependencies {
implementation project(":cm")
implementation project(":mockjava")
implementation project(":front-base")
implementation project(":consistency-sdk")
implementation 'io.prometheus:simpleclient_httpserver:0.12.0'
implementation 'org.knowhowlab.osgi:sigar:1.6.5_01'
implementation 'io.grpc:grpc-all:1.43.1'

View File

@ -26,6 +26,7 @@ import org.bdware.sc.db.CMTables;
import org.bdware.sc.db.KeyValueDBUtil;
import org.bdware.sc.db.MultiIndexTimeRocksDBUtil;
import org.bdware.sc.util.ExceptionUtil;
import org.bdware.sdk.consistency.ConsistencyPluginManager;
import org.bdware.server.action.FileActions;
import org.bdware.server.doip.ContractRepositoryMain;
import org.bdware.server.http.CMHttpHandler;
@ -251,6 +252,7 @@ public class CMHttpServer {
* port: http & websocket port port+1: tcp port port+2: doip port port+3: prometheus
*/
private void start() {
ConsistencyPluginManager.setContext(new SDKContext());
// EpollEventLoopGroup
// EpollServerSocketChannel
// ContractManager.reconnectPort = (port - 18000) * 30 + 1630;

View File

@ -0,0 +1,113 @@
package org.bdware.server;
import org.bdware.sc.ContractManager;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.RecoverFlag;
import org.bdware.sc.units.RequestCache;
import org.bdware.sdk.consistency.api.context.*;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.SyncResult;
import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.trustedmodel.ResultCollector;
import org.bdware.units.NetworkManager;
import org.zz.gmhelper.SM2KeyPair;
import java.util.Map;
public class SDKContext implements ISDKContext {
IMasterServerTCPAction masterServerTCPActionProxy = new MasterServerTCPActionProxy();
INetworkManager networkManagerProxy = new NetworkManagerProxy();
ICMActions cmActionsProxy = new CMActionsProxy();
IMasterServerRecoverMechAction masterServerRecoverMechActionProxy = new MasterServerRecoverMechActionProxy();
IGlobalConf globalConfProxy = new GlobalProxy();
@Override
public IMasterServerTCPAction getMasterServerTCPAction() {
return masterServerTCPActionProxy;
}
@Override
public INetworkManager getNetworkManager() {
return networkManagerProxy;
}
@Override
public ICMActions getCMActions() {
return cmActionsProxy;
}
@Override
public IMasterServerRecoverMechAction getMasterServerRecoverMechAction() {
return masterServerRecoverMechActionProxy;
}
@Override
public IGlobalConf getGlobalConf() {
return globalConfProxy;
}
public static class MasterServerTCPActionProxy implements IMasterServerTCPAction {
@Override
public SyncResult getSync() {
return MasterServerTCPAction.sync;
}
@Override
public Map<String, RequestCache> getReqCache() {
return MasterServerTCPAction.requestCache;
}
}
public static class NetworkManagerProxy implements INetworkManager {
@Override
public void sendToAgent(String pubkey, String content) {
NetworkManager.instance.sendToAgent(pubkey, content);
}
@Override
public boolean hasAgentConnection(String pubkey) {
return NetworkManager.instance.hasAgentConnection(pubkey);
}
@Override
public ResultCallback createResultCallback(String requestID, ResultCallback rc, int count) {
return new ResultCollector(requestID, rc, count);
}
}
public static class CMActionsProxy implements ICMActions {
@Override
public ContractManager getManager() {
return CMActions.manager;
}
}
public static class MasterServerRecoverMechActionProxy implements IMasterServerRecoverMechAction {
@Override
public Map<String, Map<String, RecoverFlag>> getRecoverStatusMap() {
return MasterServerRecoverMechAction.recoverStatus;
}
@Override
public void restartContractFromCommonMode(String nodeID, String contractID) {
MasterServerRecoverMechAction.restartContractFromCommonMode(nodeID, contractID);
}
}
public static class GlobalProxy implements IGlobalConf {
@Override
public String getNodeID() {
return GlobalConf.getNodeID();
}
@Override
public SM2KeyPair getKeyPair() {
return GlobalConf.instance.keyPair;
}
}
}

View File

@ -19,8 +19,6 @@ import org.bdware.server.GlobalConf;
import org.bdware.server.action.Action;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.RequestToMaster;
import org.bdware.server.executor.consistency.PBFTExecutor;
import org.bdware.server.executor.consistency.RequestAllExecutor;
import org.bdware.server.tcp.TCPClientFrameHandler;
import org.bdware.server.trustedmodel.AgentManager;
import org.bdware.server.trustedmodel.ContractUnitStatus;
@ -344,20 +342,9 @@ public class MasterClientRecoverMechAction {
ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(cei.getContractID());
meta.setContractExecutor(
MasterClientTCPAction.createContractExecutor(meta.contract, contractID, cei.getMasterNode(), cei.getMembers()));
switch (meta.contract.getType()) {
case RequestAllResponseFirst:
case RequestAllResponseHalf:
case RequestAllResponseAll:
((RequestAllExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1);
break;
case PBFT:
((PBFTExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1);
break;
case Sharding:
break;
default:
break;
}
Map<String, Object> args = new HashMap<>();
args.put("ceiLastExeSeq", cei.getLastExeSeq());
meta.contractExecutor.onRecover(args);
// 认为contractID不会重
if (client != null
&& client.isProcessAlive()
@ -619,20 +606,9 @@ public class MasterClientRecoverMechAction {
}
CMActions.manager.multiContractRecorder.updateValue(cei);
ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID);
switch (meta.contract.getType()) {
case RequestAllResponseFirst:
case RequestAllResponseHalf:
case RequestAllResponseAll:
((RequestAllExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1);
break;
case PBFT:
((PBFTExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1);
break;
case Sharding:
break;
default:
break;
}
Map<String, Object> args = new HashMap<>();
args.put("ceiLastExeSeq", cei.getLastExeSeq());
meta.contractExecutor.onRecover(args);
boolean flag = checkAndRestart(cei); // if need,restart the contract process
if (!flag) {

View File

@ -15,18 +15,12 @@ import org.bdware.sc.db.KeyValueDBUtil;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.PubKeyNode;
import org.bdware.sc.util.JsonUtil;
import org.bdware.sdk.consistency.ConsistencyPluginManager;
import org.bdware.server.GlobalConf;
import org.bdware.server.action.*;
import org.bdware.server.executor.consistency.PBFTExecutor;
import org.bdware.server.executor.consistency.RequestAllExecutor;
import org.bdware.server.executor.unconsistency.MultiPointCooperationExecutor;
import org.bdware.server.executor.unconsistency.RequestOnceExecutor;
import org.bdware.server.executor.unconsistency.ResponseOnceExecutor;
import org.bdware.server.tcp.PubkeyResultCallback;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.KillUnitContractInfo;
import org.bdware.server.trustedmodel.SelfAdaptiveShardingExecutor;
import org.bdware.server.trustedmodel.SingleNodeExecutor;
import org.bdware.units.NetworkManager;
import org.bdware.units.function.ExecutionManager;
import org.zz.gmhelper.SM2KeyPair;
@ -85,46 +79,12 @@ public class MasterClientTCPAction {
}
public static ContractExecutor createContractExecutor(Contract contract, String contractID, String masterPubkey, String[] members) {
ContractExecutor executor = null;
int nodeSize = contract.getNumOfCopies();
switch (contract.getType()) {
case Sole:
LOGGER.info("Sole contract is not supported in multi-point mode");
return SingleNodeExecutor.instance;
case RequestOnce:
executor = new RequestOnceExecutor(contractID);
break;
case ResponseOnce:
executor = new ResponseOnceExecutor(contractID);
break;
case RequestAllResponseFirst:
executor =
new RequestAllExecutor(
ContractExecType.RequestAllResponseFirst, 1, contractID);
break;
case RequestAllResponseHalf:
executor =
new RequestAllExecutor(
ContractExecType.RequestAllResponseHalf,
nodeSize / 2 + 1,
contractID);
break;
case RequestAllResponseAll:
executor =
new RequestAllExecutor(
ContractExecType.RequestAllResponseAll, nodeSize, contractID);
break;
case Sharding:
executor = new MultiPointCooperationExecutor(ContractExecType.Sharding, nodeSize, contractID);
break;
case SelfAdaptiveSharding:
executor = new SelfAdaptiveShardingExecutor(contractID);
break;
case PBFT:
executor = new PBFTExecutor(nodeSize, contractID, masterPubkey, members);
break;
}
return executor;
Map<String, Object> args = new HashMap<>();
args.put("contractID", contractID);
args.put("nodeSize", contract.getNumOfCopies());
args.put("masterPubkey", masterPubkey);
args.put("members", members);
return ConsistencyPluginManager.getInstance().createContractExecutor(contract.getType(), args);
}
public static void dealRequests(String contractID) {
@ -460,8 +420,8 @@ public class MasterClientTCPAction {
if (jo.has("contractID") && jo.has("data")) {
String contractID = jo.get("contractID").getAsString();
ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID);
if (null != meta && meta.contractExecutor instanceof SelfAdaptiveShardingExecutor) {
((SelfAdaptiveShardingExecutor) meta.contractExecutor).execute(jo.get("data").getAsString());
if (null != meta && meta.contractExecutor != null) {
meta.contractExecutor.onDeliverBlock(jo.get("data").getAsString());
}
}
}

View File

@ -16,8 +16,6 @@ import org.bdware.sc.util.JsonUtil;
import org.bdware.server.GlobalConf;
import org.bdware.server.action.Action;
import org.bdware.server.action.CMActions;
import org.bdware.server.executor.consistency.PBFTExecutor;
import org.bdware.server.executor.consistency.RequestAllExecutor;
import org.bdware.server.trustedmodel.ContractUnitStatus;
import org.bdware.units.NetworkManager;
import org.zz.gmhelper.SM2KeyPair;
@ -296,20 +294,9 @@ public class MasterServerRecoverMechAction {
meta.setContractExecutor(
MasterClientTCPAction.createContractExecutor(meta.contract, contractID, cei.getMasterNode(), cei.getMembers()));
switch (meta.contract.getType()) {
case RequestAllResponseFirst:
case RequestAllResponseHalf:
case RequestAllResponseAll:
((RequestAllExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1);
break;
case PBFT:
((PBFTExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1);
break;
case Sharding:
break;
default:
break;
}
Map<String, Object> args = new HashMap<>();
args.put("ceiLastExeSeq", cei.getLastExeSeq());
meta.contractExecutor.onRecover(args);
CMActions.manager.multiContractRecorder.updateValue(cei);
LOGGER.info("[master recover] contractID2Members put 合约 " + contractID);
LOGGER.info("新master恢复 MultiPointContractInfo 完成!");

View File

@ -14,6 +14,7 @@ import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.RequestCache;
import org.bdware.sc.util.JsonUtil;
import org.bdware.sdk.consistency.api.NotifiableResultMerger;
import org.bdware.server.CongestionControl;
import org.bdware.server.action.Action;
import org.bdware.server.action.CMActions;
@ -68,9 +69,9 @@ public class MasterServerTCPAction {
ResultCallback cb = sync.waitObj.get(requestID);
if (cb instanceof ResultCollector) {
ResultCollector rc = (ResultCollector) cb;
if (rc.getCommitter() instanceof RequestAllExecutor.ResultMerger) {
RequestAllExecutor.ResultMerger merger =
(RequestAllExecutor.ResultMerger) rc.getCommitter();
if (rc.getCommitter() instanceof NotifiableResultMerger) {
NotifiableResultMerger merger =
(NotifiableResultMerger) rc.getCommitter();
if (merger.getContractID().equals(contractID)) {
LOGGER.info("node " + nodeID + " offline! in the cluster of contract " + contractID);
LOGGER.debug("nodeID=" + nodeID + " contractID=" + contractID +

View File

@ -1,375 +1,375 @@
package org.bdware.server.executor.consistency;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.ComponedContractResult;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.Node;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.consistency.Committer;
import org.bdware.sc.consistency.pbft.PBFTAlgorithm;
import org.bdware.sc.consistency.pbft.PBFTMember;
import org.bdware.sc.consistency.pbft.PBFTMessage;
import org.bdware.sc.consistency.pbft.PBFTType;
import org.bdware.sc.units.*;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.GlobalConf;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.trustedmodel.ContractCluster;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.MultiReqSeq;
import org.bdware.server.trustedmodel.ResultCollector;
import org.bdware.units.NetworkManager;
import org.zz.gmhelper.SM2KeyPair;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
//TODO 追赶差下的调用
public class PBFTExecutor implements ContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(PBFTExecutor.class);
final Object lock = new Object();
private final List<PubKeyNode> members;
int resultCount;
AtomicInteger request_index = new AtomicInteger(0);
// key为requestIDvalue为其seq
Map<String, MultiReqSeq> seqMap = new ConcurrentHashMap<>();
Map<String, ResultCache> resultCache = new ConcurrentHashMap<>();
// MultiPointContractInfo info;
String contractID;
PBFTAlgorithm pbft;
ContractCluster contractCluster;
boolean isMaster;
public PBFTExecutor(
int c, String con_id, final String masterPubkey, String[] members) {
resultCount = c;
contractID = con_id;
this.members = new ArrayList<>();
isMaster = GlobalConf.getNodeID().equals(masterPubkey);
pbft = new PBFTAlgorithm(isMaster);
int count = 0;
for (String mem : members) {
PubKeyNode pubkeyNode = new PubKeyNode();
pubkeyNode.pubkey = mem;
PBFTMember pbftMember = new PBFTMember();
pbftMember.isMaster = mem.equals(masterPubkey);
pbft.addMember(pubkeyNode, pbftMember);
this.members.add(pubkeyNode);
if (GlobalConf.getNodeID().equals(mem)) {
pbft.setSendID(count);
}
count++;
}
contractCluster = new ContractCluster(contractID, this.members);
pbft.setConnection(contractCluster);
final MultiContractMeta cei = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
pbft.setCommitter(new Committer() {
@Override
public void onCommit(ContractRequest data) {
ResultCallback ret = null;
final long startTime = System.currentTimeMillis();
ret = new ResultCallback() {
@Override
public void onResult(String str) {
Map<String, String> ret = new HashMap<>();
ret.put("action", "receiveTrustfullyResult");
SM2KeyPair keyPair = GlobalConf.instance.keyPair;
ret.put("nodeID", keyPair.getPublicKeyStr());
ret.put("responseID", data.getRequestID());
ret.put("executeTime", (System.currentTimeMillis() - startTime) + "");
ret.put("data", str);
cei.setLastExeSeq(data.seq);
NetworkManager.instance.sendToAgent(masterPubkey, JsonUtil.toJson(ret));
}
};
CMActions.manager.executeLocallyAsync(data, ret, null);
}
});
}
public void onSyncMessage(Node node, byte[] data) {
pbft.onMessage(node, data);
}
public void setSeq(int seq) {
request_index = new AtomicInteger(seq);
pbft.setAtomSeq(request_index.get());
}
public ResultCallback createResultCallback(
final String requestID,
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID) {
ComponedContractResult componedContractResult = new ComponedContractResult(count);
// TODO 加对应的超时
return new ResultCollector(
requestID, new ResultMerger(originalCb, count, request_seq, contractID), count);
}
public void sendRequest(String id, ContractRequest req, ResultCallback collector) {
// Map<String, Object> reqStr = new HashMap<>();
// reqStr.put("uniReqID", id);
// reqStr.put("data", req);
// reqStr.put("action", "executeContractLocally");
ContractRequest cr2 = ContractRequest.parse(req.toByte());
cr2.setRequestID(id);
PBFTMessage request = new PBFTMessage();
request.setOrder(req.seq);
request.setType(PBFTType.Request);
request.setContent(cr2.toByte());
for (PubKeyNode node : members) {
if (!NetworkManager.instance.hasAgentConnection(node.pubkey)) {
LOGGER.warn("cmNode " + node.pubkey.substring(0, 5) + " is null");
collector.onResult(
"{\"status\":\"Error\",\"result\":\"node offline\","
+ "\"nodeID\":\""
+ node
+ "\","
+ "\"action\":\"onExecuteContractTrustfully\"}");
// } else if (MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID)
// != RecoverFlag.Fine) {
//package org.bdware.server.executor.consistency;
//
//import com.google.gson.JsonObject;
//import com.google.gson.JsonParser;
//import com.google.gson.JsonPrimitive;
//import org.apache.logging.log4j.LogManager;
//import org.apache.logging.log4j.Logger;
//import org.bdware.sc.ComponedContractResult;
//import org.bdware.sc.ContractResult;
//import org.bdware.sc.bean.ContractRequest;
//import org.bdware.sc.conn.Node;
//import org.bdware.sc.conn.OnHashCallback;
//import org.bdware.sc.conn.ResultCallback;
//import org.bdware.sc.consistency.Committer;
//import org.bdware.sc.consistency.pbft.PBFTAlgorithm;
//import org.bdware.sc.consistency.pbft.PBFTMember;
//import org.bdware.sc.consistency.pbft.PBFTMessage;
//import org.bdware.sc.consistency.pbft.PBFTType;
//import org.bdware.sc.units.*;
//import org.bdware.sc.util.JsonUtil;
//import org.bdware.server.GlobalConf;
//import org.bdware.server.action.CMActions;
//import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
//import org.bdware.server.action.p2p.MasterServerTCPAction;
//import org.bdware.server.trustedmodel.ContractCluster;
//import org.bdware.server.trustedmodel.ContractExecutor;
//import org.bdware.server.trustedmodel.MultiReqSeq;
//import org.bdware.server.trustedmodel.ResultCollector;
//import org.bdware.units.NetworkManager;
//import org.zz.gmhelper.SM2KeyPair;
//
//import java.util.*;
//import java.util.concurrent.ConcurrentHashMap;
//import java.util.concurrent.atomic.AtomicInteger;
//
////TODO 追赶差下的调用
//public class PBFTExecutor implements ContractExecutor {
// private static final Logger LOGGER = LogManager.getLogger(PBFTExecutor.class);
// final Object lock = new Object();
// private final List<PubKeyNode> members;
// int resultCount;
//
// AtomicInteger request_index = new AtomicInteger(0);
// // key为requestIDvalue为其seq
// Map<String, MultiReqSeq> seqMap = new ConcurrentHashMap<>();
// Map<String, ResultCache> resultCache = new ConcurrentHashMap<>();
// // MultiPointContractInfo info;
// String contractID;
// PBFTAlgorithm pbft;
// ContractCluster contractCluster;
// boolean isMaster;
//
// public PBFTExecutor(
// int c, String con_id, final String masterPubkey, String[] members) {
// resultCount = c;
// contractID = con_id;
// this.members = new ArrayList<>();
// isMaster = GlobalConf.getNodeID().equals(masterPubkey);
// pbft = new PBFTAlgorithm(isMaster);
// int count = 0;
// for (String mem : members) {
// PubKeyNode pubkeyNode = new PubKeyNode();
// pubkeyNode.pubkey = mem;
// PBFTMember pbftMember = new PBFTMember();
// pbftMember.isMaster = mem.equals(masterPubkey);
// pbft.addMember(pubkeyNode, pbftMember);
// this.members.add(pubkeyNode);
// if (GlobalConf.getNodeID().equals(mem)) {
// pbft.setSendID(count);
// }
// count++;
// }
// contractCluster = new ContractCluster(contractID, this.members);
// pbft.setConnection(contractCluster);
// final MultiContractMeta cei = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
// pbft.setCommitter(new Committer() {
// @Override
// public void onCommit(ContractRequest data) {
// ResultCallback ret = null;
// final long startTime = System.currentTimeMillis();
// ret = new ResultCallback() {
// @Override
// public void onResult(String str) {
// Map<String, String> ret = new HashMap<>();
// ret.put("action", "receiveTrustfullyResult");
// SM2KeyPair keyPair = GlobalConf.instance.keyPair;
// ret.put("nodeID", keyPair.getPublicKeyStr());
// ret.put("responseID", data.getRequestID());
// ret.put("executeTime", (System.currentTimeMillis() - startTime) + "");
// ret.put("data", str);
// cei.setLastExeSeq(data.seq);
// NetworkManager.instance.sendToAgent(masterPubkey, JsonUtil.toJson(ret));
// }
// };
// CMActions.manager.executeLocallyAsync(data, ret, null);
// }
// });
// }
//
// public void onSyncMessage(Node node, byte[] data) {
//
// pbft.onMessage(node, data);
// }
//
// public void setSeq(int seq) {
// request_index = new AtomicInteger(seq);
// pbft.setAtomSeq(request_index.get());
// }
//
// public ResultCallback createResultCallback(
// final String requestID,
// final ResultCallback originalCb,
// final int count,
// final int request_seq,
// final String contractID) {
// ComponedContractResult componedContractResult = new ComponedContractResult(count);
// // TODO 加对应的超时
// return new ResultCollector(
// requestID, new ResultMerger(originalCb, count, request_seq, contractID), count);
// }
//
// public void sendRequest(String id, ContractRequest req, ResultCallback collector) {
//// Map<String, Object> reqStr = new HashMap<>();
//// reqStr.put("uniReqID", id);
//// reqStr.put("data", req);
//// reqStr.put("action", "executeContractLocally");
// ContractRequest cr2 = ContractRequest.parse(req.toByte());
// cr2.setRequestID(id);
// PBFTMessage request = new PBFTMessage();
// request.setOrder(req.seq);
// request.setType(PBFTType.Request);
// request.setContent(cr2.toByte());
// for (PubKeyNode node : members) {
// if (!NetworkManager.instance.hasAgentConnection(node.pubkey)) {
// LOGGER.warn("cmNode " + node.pubkey.substring(0, 5) + " is null");
// collector.onResult(
// "{\"status\":\"Error\",\"result\":\"node recovering\","
// "{\"status\":\"Error\",\"result\":\"node offline\","
// + "\"nodeID\":\""
// + node
// + "\","
// + "\"action\":\"onExecuteContractTrustfully\"}");
//// } else if (MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID)
//// != RecoverFlag.Fine) {
//// collector.onResult(
//// "{\"status\":\"Error\",\"result\":\"node recovering\","
//// + "\"nodeID\":\""
//// + node
//// + "\","
//// + "\"action\":\"onExecuteContractTrustfully\"}");
//// contractCluster.sendMessage(node, request.getBytes());
// } else {
// contractCluster.sendMessage(node, request.getBytes());
} else {
contractCluster.sendMessage(node, request.getBytes());
}
}
// master负责缓存请求
if (!MasterServerTCPAction.requestCache.containsKey(contractID)) {
MasterServerTCPAction.requestCache.put(contractID, new RequestCache());
}
// TODO 多调多统一个seq的有多个请求这个需要改
String[] nodes =
CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers();
LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID());
LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes));
}
public boolean checkCurNodeNumValid() {
return true;
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
LOGGER.debug(JsonUtil.toJson(req));
MultiContractMeta meta = CMActions.manager.multiContractRecorder.getMultiContractMeta(req.getContractID());
if (meta == null || !meta.isMaster()) {
CMActions.manager.executeContractOnOtherNodes(req, rc);
return;
}
req.setContractID(CMActions.manager.getContractIDByName(req.getContractID()));
// 三个相同requestID进来的时候会有冲突
// 仅在此处有冲突么
// 这里是从MasterServer->MasterClient请求的是"executeContractLocally"
// 如果是多点合约的请求A1A2A3的序号应该一致不能分配一个新的seq根据requestID判断是否不需要重新分配一个序号
//TODO seqMap memory leak
//TODO
//TODO
if (null != requestID && requestID.endsWith("_mul")) {
synchronized (lock) {
if (seqMap.containsKey(requestID)) {
req.seq = seqMap.get(requestID).seq;
} else {
req.seq = request_index.getAndIncrement();
seqMap.put(requestID, new MultiReqSeq(req.seq));
}
}
} else {
req.seq = request_index.getAndIncrement();
}
req.needSeq = true;
String id =
System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq;
LOGGER.info("execute receive requestID=" + requestID + " msgID=" + id);
if (checkCurNodeNumValid()) {
LOGGER.debug("checkCurNodeNumValid=true");
ResultCallback collector =
createResultCallback(id, rc, resultCount, req.seq, req.getContractID());
MasterServerTCPAction.sync.sleep(id, collector);
LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq);
sendRequest(id, req, collector);
} else {
LOGGER.debug("invalidNodeNumOnResult");
request_index.getAndDecrement();
ContractResult finalResult =
new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("node number unavailable, request refused."));
rc.onResult(JsonUtil.toJson(finalResult));
}
// }
/* // 三个相同requestID进来的时候会有冲突
// 仅在此处有冲突么
// 这里是从MasterServer->MasterClient请求的是"executeContractLocally"
req.seq = request_index.getAndIncrement();
req.needSeq = true;
ResultCallback collector = createResultCallback(id, rc, resultCount, req.getContractID());
MasterServerTCPAction.sync.sleep(id, collector);
sendRequest(id, req, collector);*/
}
// 清理缓存的多点合约请求序号
public void clearCache() {
final long time = System.currentTimeMillis() - 30000L;
seqMap.entrySet()
.removeIf(
entry -> {
MultiReqSeq cache = entry.getValue();
if (null == cache) {
return true;
}
return cache.startTime < time;
});
}
public static class ResultMerger extends ResultCallback {
ComponedContractResult componedContractResult;
AtomicInteger order;
String contractID;
int count;
int request_seq;
ResultCallback originalCallback;
Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点
ResultMerger(
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID) {
originalCallback = originalCb;
this.count = count;
this.request_seq = request_seq;
this.contractID = contractID;
componedContractResult = new ComponedContractResult(count);
order = new AtomicInteger(0);
}
public String getContractID() {
return contractID;
}
public String getInfo() {
return "contractID="
+ contractID
+ " 收到第 "
+ order
+ " 个节点回复 : "
+ " order="
+ order
+ " count="
+ count
+ " ";
}
@Override
public void onResult(String str) {
// TODO 必须在这里聚合
// str的data是个ContractResult
// 在这儿也是返回个ContractResult
try {
LOGGER.debug("a result of contract" + contractID + ": " + str);
JsonObject obj = JsonParser.parseString(str).getAsJsonObject();
if (obj.has("nodeID")) {
String id = obj.get("nodeID").getAsString();
if (nodeIDs.contains(id)) {
LOGGER.debug(
"ignored result because the result of node "
+ id.substring(0, 5)
+ " has been received");
return;
}
nodeIDs.add(id);
}
LOGGER.debug(
String.format(
"contractID=%s received=%s order=%d count=%d",
contractID, str, order.get(), count));
componedContractResult.add(obj);
// 收集到所有结果
if (order.incrementAndGet() == count) {
ContractResult finalResult = componedContractResult.figureFinalResult();
finalResult.needSeq = true;
finalResult.seq = request_seq;
// if (null == finalResult) {
// finalResult =
// new ContractResult(
// ContractResult.Status.Exception,
// new JsonPrimitive(
// "no nore than half of the
// consistent result"));
// originalCallback.onResult(new
// Gson().toJson(finalResult));
// } else {
originalCallback.onResult(JsonUtil.toJson(finalResult));
// }
LOGGER.debug(
String.format(
"%d results are the same: %s",
finalResult.size, finalResult.result));
// 集群中事务序号+1
CMActions.manager.multiContractRecorder
.getMultiContractMeta(contractID)
.nextSeqAtMaster();
// recover其中无状态合约CP出错无需恢复
Set<String> nodesID = componedContractResult.getProblemNodes();
if (null == nodesID || nodesID.isEmpty()) {
return;
}
for (String nodeID : nodesID) {
LOGGER.warn("node fails! " + nodeID);
if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID)
== RecoverFlag.Fine) {
MasterServerRecoverMechAction.recoverStatus
.get(nodeID)
.put(contractID, RecoverFlag.ToRecover);
}
}
for (String nodeID : nodesID) {
if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID)
== RecoverFlag.ToRecover) {
LOGGER.warn("node in recover " + nodeID);
// 因为该节点结果有误所以即时是stableMode也认为trans记录不可信
// 直接通过load别的节点来恢复
MasterServerRecoverMechAction.restartContractFromCommonMode(
nodeID, contractID);
}
}
}
// clearCache();
} catch (Exception e) {
e.printStackTrace();
LOGGER.warn("result exception!");
}
}
}
}
// }
// }
// // master负责缓存请求
// if (!MasterServerTCPAction.requestCache.containsKey(contractID)) {
// MasterServerTCPAction.requestCache.put(contractID, new RequestCache());
// }
// // TODO 多调多统一个seq的有多个请求这个需要改
// String[] nodes =
// CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers();
// LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID());
// LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes));
//
// }
//
//
// public boolean checkCurNodeNumValid() {
// return true;
// }
//
// @Override
// public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
// LOGGER.debug(JsonUtil.toJson(req));
// MultiContractMeta meta = CMActions.manager.multiContractRecorder.getMultiContractMeta(req.getContractID());
// if (meta == null || !meta.isMaster()) {
// CMActions.manager.executeContractOnOtherNodes(req, rc);
// return;
// }
// req.setContractID(CMActions.manager.getContractIDByName(req.getContractID()));
//
// // 三个相同requestID进来的时候会有冲突
// // 仅在此处有冲突么
// // 这里是从MasterServer->MasterClient请求的是"executeContractLocally"
//
// // 如果是多点合约的请求A1A2A3的序号应该一致不能分配一个新的seq根据requestID判断是否不需要重新分配一个序号
// //TODO seqMap memory leak
// //TODO
// //TODO
// if (null != requestID && requestID.endsWith("_mul")) {
// synchronized (lock) {
// if (seqMap.containsKey(requestID)) {
// req.seq = seqMap.get(requestID).seq;
// } else {
// req.seq = request_index.getAndIncrement();
// seqMap.put(requestID, new MultiReqSeq(req.seq));
// }
// }
// } else {
// req.seq = request_index.getAndIncrement();
// }
// req.needSeq = true;
// String id =
// System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq;
// LOGGER.info("execute receive requestID=" + requestID + " msgID=" + id);
//
// if (checkCurNodeNumValid()) {
// LOGGER.debug("checkCurNodeNumValid=true");
// ResultCallback collector =
// createResultCallback(id, rc, resultCount, req.seq, req.getContractID());
// MasterServerTCPAction.sync.sleep(id, collector);
// LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq);
// sendRequest(id, req, collector);
// } else {
// LOGGER.debug("invalidNodeNumOnResult");
// request_index.getAndDecrement();
// ContractResult finalResult =
// new ContractResult(
// ContractResult.Status.Error,
// new JsonPrimitive("node number unavailable, request refused."));
// rc.onResult(JsonUtil.toJson(finalResult));
// }
//
// // }
//
// /* // 三个相同requestID进来的时候会有冲突
// // 仅在此处有冲突么
// // 这里是从MasterServer->MasterClient请求的是"executeContractLocally"
// req.seq = request_index.getAndIncrement();
// req.needSeq = true;
// ResultCallback collector = createResultCallback(id, rc, resultCount, req.getContractID());
// MasterServerTCPAction.sync.sleep(id, collector);
// sendRequest(id, req, collector);*/
// }
//
// // 清理缓存的多点合约请求序号
// public void clearCache() {
// final long time = System.currentTimeMillis() - 30000L;
// seqMap.entrySet()
// .removeIf(
// entry -> {
// MultiReqSeq cache = entry.getValue();
// if (null == cache) {
// return true;
// }
// return cache.startTime < time;
// });
// }
//
// public static class ResultMerger extends ResultCallback {
// ComponedContractResult componedContractResult;
// AtomicInteger order;
// String contractID;
// int count;
// int request_seq;
// ResultCallback originalCallback;
// Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点
//
// ResultMerger(
// final ResultCallback originalCb,
// final int count,
// final int request_seq,
// final String contractID) {
// originalCallback = originalCb;
// this.count = count;
// this.request_seq = request_seq;
// this.contractID = contractID;
// componedContractResult = new ComponedContractResult(count);
// order = new AtomicInteger(0);
// }
//
// public String getContractID() {
// return contractID;
// }
//
// public String getInfo() {
// return "contractID="
// + contractID
// + " 收到第 "
// + order
// + " 个节点回复 : "
// + " order="
// + order
// + " count="
// + count
// + " ";
// }
//
// @Override
// public void onResult(String str) {
// // TODO 必须在这里聚合
// // str的data是个ContractResult
// // 在这儿也是返回个ContractResult
// try {
// LOGGER.debug("a result of contract" + contractID + ": " + str);
// JsonObject obj = JsonParser.parseString(str).getAsJsonObject();
// if (obj.has("nodeID")) {
// String id = obj.get("nodeID").getAsString();
// if (nodeIDs.contains(id)) {
// LOGGER.debug(
// "ignored result because the result of node "
// + id.substring(0, 5)
// + " has been received");
// return;
// }
// nodeIDs.add(id);
// }
//
// LOGGER.debug(
// String.format(
// "contractID=%s received=%s order=%d count=%d",
// contractID, str, order.get(), count));
// componedContractResult.add(obj);
// // 收集到所有结果
// if (order.incrementAndGet() == count) {
// ContractResult finalResult = componedContractResult.figureFinalResult();
// finalResult.needSeq = true;
// finalResult.seq = request_seq;
//
// // if (null == finalResult) {
// // finalResult =
// // new ContractResult(
// // ContractResult.Status.Exception,
// // new JsonPrimitive(
// // "no nore than half of the
// // consistent result"));
// // originalCallback.onResult(new
// // Gson().toJson(finalResult));
// // } else {
// originalCallback.onResult(JsonUtil.toJson(finalResult));
// // }
// LOGGER.debug(
// String.format(
// "%d results are the same: %s",
// finalResult.size, finalResult.result));
//
// // 集群中事务序号+1
// CMActions.manager.multiContractRecorder
// .getMultiContractMeta(contractID)
// .nextSeqAtMaster();
//
// // recover其中无状态合约CP出错无需恢复
// Set<String> nodesID = componedContractResult.getProblemNodes();
// if (null == nodesID || nodesID.isEmpty()) {
// return;
// }
// for (String nodeID : nodesID) {
// LOGGER.warn("node fails! " + nodeID);
// if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID)
// == RecoverFlag.Fine) {
// MasterServerRecoverMechAction.recoverStatus
// .get(nodeID)
// .put(contractID, RecoverFlag.ToRecover);
// }
// }
// for (String nodeID : nodesID) {
// if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID)
// == RecoverFlag.ToRecover) {
// LOGGER.warn("node in recover " + nodeID);
//
// // 因为该节点结果有误所以即时是stableMode也认为trans记录不可信
// // 直接通过load别的节点来恢复
// MasterServerRecoverMechAction.restartContractFromCommonMode(
// nodeID, contractID);
// }
// }
// }
// // clearCache();
// } catch (Exception e) {
// e.printStackTrace();
// LOGGER.warn("result exception!");
// }
// }
// }
//}

View File

@ -1,400 +1,400 @@
package org.bdware.server.executor.unconsistency;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.ComponedContractResult;
import org.bdware.sc.ContractMeta;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.*;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.RecoverFlag;
import org.bdware.sc.units.RequestCache;
import org.bdware.sc.units.ResultCache;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.MultiReqSeq;
import org.bdware.server.trustedmodel.ResultCollector;
import org.bdware.units.NetworkManager;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
// 改为MultiPointCooperationExecutor
public class MultiPointCooperationExecutor implements ContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(MultiPointCooperationExecutor.class);
final Object lock = new Object();
int resultCount;
AtomicInteger request_index = new AtomicInteger(0);
ContractExecType type;
// key为requestIDvalue为其seq
Map<String, MultiReqSeq> seqMap = new ConcurrentHashMap<>();
Map<String, ResultCache> resultCache = new ConcurrentHashMap<>();
// MultiPointContractInfo info;
MultiContractMeta multiMeta;
String contractID;
public MultiPointCooperationExecutor(ContractExecType t, int c, String con_id) {
LOGGER.info("-- sharding executor---");
type = t;
resultCount = c;
contractID = con_id;
multiMeta = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
}
public void setSeq(int seq) {
request_index = new AtomicInteger(seq);
}
public ResultCallback createResultCallback(
final String requestID,
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID, JoinInfo joinInfo) {
// TODO 加对应的超时
return new ResultCollector(
requestID,
new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID, joinInfo),
count); // 把count改成了1设置成获得1个响应就行
}
public void sendRequest(String id, ContractRequest req, String[] nodes) {
Map<String, Object> reqStr = new HashMap<>();
reqStr.put("uniReqID", id);
reqStr.put("data", req);
req.needSeq = false;
reqStr.put("action", "executeContractLocally");
String sendStr = JsonUtil.toJson(reqStr);
// master负责缓存请求
if (!MasterServerTCPAction.requestCache.containsKey(contractID)) {
MasterServerTCPAction.requestCache.put(contractID, new RequestCache());
}
// TODO 多调多统一个seq的有多个请求这个需要改
MasterServerTCPAction.requestCache.get(contractID).put(req.seq, sendStr);
LOGGER.debug(JsonUtil.toJson(req));
LOGGER.info("node size = " + nodes.length);
LOGGER.debug("nodes:" + JsonUtil.toJson(nodes));
for (String node : nodes) {
LOGGER.info(
"[sendRequests] get cmNode "
+ node.substring(0, 5)
+ " not null "
+ "RequestAllExecutor 发送请求给 "
+ node.substring(0, 5));
NetworkManager.instance.sendToAgent(node, sendStr);
}
}
private String[] getAccordingToRouteInfo(RouteInfo routeInfo, ContractRequest req, String[] members) {
try {
int val;
switch (routeInfo.useDefault) {
case byRequester:
val =
new BigInteger(req.getRequester(), 16)
.mod(new BigInteger("" + members.length))
.intValue();
while (val < 0) {
val = val + members.length;
}
return new String[]{members[val]};
case byArgHash:
val = req.getArg().hashCode();
val = val % members.length;
while (val < 0) {
val += members.length;
}
return new String[]{members[val]};
case byTarget:
JsonObject jo = req.getArg().getAsJsonObject();
val =
new BigInteger(jo.get("target").getAsString(), 16)
.mod(new BigInteger("" + members.length))
.intValue();
while (val < 0) {
val = val + members.length;
}
return new String[]{members[val]};
default:
return members;
}
} catch (Exception e) {
return members;
}
}
public boolean checkCurNodeNumValid() {
LOGGER.info("checkCurNodeNumValid");
String[] nodes = multiMeta.getMembers();
// List<String> nodes = info.members;
int validNode = 0;
for (String node : nodes) {
if (NetworkManager.instance.hasAgentConnection(node)
&& MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID)
== RecoverFlag.Fine) {
validNode++;
}
}
int c = resultCount;
if (type == ContractExecType.Sharding) c = (int) Math.ceil((double) c / 2);
LOGGER.info("c=" + c + " validNode=" + validNode);
return validNode >= c;
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
LOGGER.info("[MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req));
// 获得action 函数名
LOGGER.info("action is : " + req.getAction());
req.setContractID(CMActions.manager.getContractIDByName(req.getContractID()));
if (requestID != null && requestID.endsWith("_mul")) {
synchronized (lock) {
if (seqMap.containsKey(requestID)) {
req.seq = seqMap.get(requestID).seq;
} else {
req.seq = request_index.getAndIncrement();
seqMap.put(requestID, new MultiReqSeq(req.seq));
}
}
} else {
req.seq = request_index.getAndIncrement();
}
req.needSeq = true;
String id =
System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq;
LOGGER.info("execute receive requestID= " + requestID + " msgID=" + id);
if (checkCurNodeNumValid()) { // 校验成功 current node num 合法
LOGGER.info("checkCurNodeNumValid true");
ContractMeta meta =
CMActions.manager.statusRecorder.getContractMeta(req.getContractID());
FunctionDesp fun = meta.getExportedFunction(req.getAction());
ResultCallback collector;
// TODO @fanbo 下面的count 1要改应该是根据route的规则来
//Count 根据join规则来
//nodes 根据route规则来
JoinInfo joinInfo = fun.joinInfo;
RouteInfo routeInfo = fun.routeInfo;
int count = getJoinCount(joinInfo, contractID);
LOGGER.info("requestID=" + requestID + " join Count: " + count);
String[] members = multiMeta.getMembers();
String[] nodes = getAccordingToRouteInfo(routeInfo, req, members);
if (nodes.length < count) {
count = nodes.length;
}
collector =
createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器
MasterServerTCPAction.sync.sleep(id, collector);
LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq);
sendRequest(id, req, nodes); // 发送请求
} else {
LOGGER.info("invalidNodeNumOnResult");
request_index.getAndDecrement();
ContractResult finalResult =
new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("node number unavailbale,request refused."));
rc.onResult(JsonUtil.toJson(finalResult));
}
}
private int getJoinCount(JoinInfo joinInfo, String contractID) {
if (joinInfo == null) return resultCount;
if (joinInfo.joinCount.isJsonPrimitive() && joinInfo.joinCount.getAsJsonPrimitive().isNumber()) {
return joinInfo.joinCount.getAsJsonPrimitive().getAsInt();
}
try {
ContractRequest cr = new ContractRequest();
cr.setContractID(contractID);
cr.setAction(joinInfo.joinCount.getAsString());
//TODO Arg需要好好设计一下
//TODO 又好用又简单的那种设计
//TODO
cr.setArg("");
String result = CMActions.manager.executeLocally(cr, null);
return JsonUtil.parseString(result).getAsJsonObject().get("result").getAsInt();
} catch (Exception e) {
e.printStackTrace();
return 1;
}
}
// 清理缓存的多点合约请求序号
public void clearCache() {
final long time = System.currentTimeMillis() - 30000L;
seqMap.entrySet()
.removeIf(
entry -> {
MultiReqSeq cache = entry.getValue();
if (null == cache) {
return true;
}
return cache.startTime < time;
});
}
public static class ResultMerger extends ResultCallback {
ComponedContractResult componedContractResult;
AtomicInteger order;
String contractID;
int count; // 记录有多少个节点
int request_seq;
ResultCallback originalCallback;
Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点
JoinInfo joinInfo;
ResultMerger(
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID,
final JoinInfo joinInfo) {
originalCallback = originalCb;
this.count = count;
this.request_seq = request_seq;
this.contractID = contractID;
componedContractResult = new ComponedContractResult(count);
order = new AtomicInteger(0);
this.joinInfo = joinInfo;
}
public String getInfo() {
return "contractID="
+ contractID
+ " 收到第 "
+ order
+ " 个节点回复 : "
+ " order="
+ order
+ " count="
+ count
+ " ";
}
@Override
public void onResult(String str) {
// TODO 必须在这里聚合
// str的data是个ContractResult
// 在这儿也是返回个ContractResult
try {
LOGGER.info(str);
JsonObject obj = JsonParser.parseString(str).getAsJsonObject();
String id = obj.get("nodeID").getAsString();
if (nodeIDs.contains(id)) {
LOGGER.info("已经收到节点 " + id.substring(0, 5) + " 的结果,该结果被忽略");
return;
}
nodeIDs.add(id);
LOGGER.info(
"contractID="
+ contractID
+ " 收到第 "
+ order
+ " 个节点回复 : "
+ str
+ " order="
+ order
+ " count="
+ count);
componedContractResult.add(obj);
// 收集到所有结果
if (order.incrementAndGet() == count) {
ContractResult finalResult = componedContractResult.mergeFinalResult();
finalResult.needSeq = true;
finalResult.seq = request_seq;
// if (null == finalResult) {
// finalResult =
// new ContractResult(
// ContractResult.Status.Exception,
// new JsonPrimitive(
// "no nore than half of the
// consistent result"));
// originalCallback.onResult(new
// Gson().toJson(finalResult));
// } else {
if (joinInfo != null) {
handleJoinInfo(finalResult, joinInfo);
}
originalCallback.onResult(JsonUtil.toJson(finalResult));
// }
LOGGER.info(
"本次执行最终结果为 " + finalResult.size + "个节点合并的,结果为 " + finalResult.result);
// 集群中事务序号+1
// MasterServerTCPAction.contractID2Members.get(contractID).nextSeq();
CMActions.manager
.multiContractRecorder
.getMultiContractMeta(contractID)
.nextSeqAtMaster();
// recover其中无状态合约CP出错无需恢复
Set<String> nodesID = componedContractResult.getProblemNodes();
if (null == nodesID || nodesID.isEmpty()) {
return;
}
for (String nodeID : nodesID) {
LOGGER.info("结果出现问题的节点有:" + nodeID);
if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID)
== RecoverFlag.Fine) {
MasterServerRecoverMechAction.recoverStatus
.get(nodeID)
.put(contractID, RecoverFlag.ToRecover);
}
}
for (String nodeID : nodesID) {
if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID)
== RecoverFlag.ToRecover) {
LOGGER.info("问题节点开始恢复:" + nodeID);
// 因为该节点结果有误所以即时是stableMode也认为trans记录不可信
// 直接通过load别的节点来恢复
MasterServerRecoverMechAction.restartContractFromCommonMode(
nodeID, contractID);
}
}
}
// clearCache();
} catch (Exception e) {
e.printStackTrace();
LOGGER.info("本次执行最终结果为有异常");
}
}
private void handleJoinInfo(ContractResult finalResult, JoinInfo joinInfo) {
JsonObject jo = finalResult.result.getAsJsonObject();
if (joinInfo != null && joinInfo.joinRule != null) {
//TODO 不应该是double 类型
switch (joinInfo.joinRule) {
case "add":
double val = 0;
for (String key : jo.keySet()) {
val += jo.get(key).getAsDouble();
}
finalResult.result = new JsonPrimitive(val);
break;
case "multiply":
val = 1;
for (String key : jo.keySet()) {
val *= jo.get(key).getAsDouble();
}
finalResult.result = new JsonPrimitive(val);
break;
}
}
}
}
}
//package org.bdware.server.executor.unconsistency;
//
//import com.google.gson.JsonObject;
//import com.google.gson.JsonParser;
//import com.google.gson.JsonPrimitive;
//import org.apache.logging.log4j.LogManager;
//import org.apache.logging.log4j.Logger;
//import org.bdware.sc.ComponedContractResult;
//import org.bdware.sc.ContractMeta;
//import org.bdware.sc.ContractResult;
//import org.bdware.sc.bean.*;
//import org.bdware.sc.conn.OnHashCallback;
//import org.bdware.sc.conn.ResultCallback;
//import org.bdware.sc.units.MultiContractMeta;
//import org.bdware.sc.units.RecoverFlag;
//import org.bdware.sc.units.RequestCache;
//import org.bdware.sc.units.ResultCache;
//import org.bdware.sc.util.JsonUtil;
//import org.bdware.server.action.CMActions;
//import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
//import org.bdware.server.action.p2p.MasterServerTCPAction;
//import org.bdware.server.trustedmodel.ContractExecutor;
//import org.bdware.server.trustedmodel.MultiReqSeq;
//import org.bdware.server.trustedmodel.ResultCollector;
//import org.bdware.units.NetworkManager;
//
//import java.math.BigInteger;
//import java.util.HashMap;
//import java.util.HashSet;
//import java.util.Map;
//import java.util.Set;
//import java.util.concurrent.ConcurrentHashMap;
//import java.util.concurrent.atomic.AtomicInteger;
//
//// 改为MultiPointCooperationExecutor
//public class MultiPointCooperationExecutor implements ContractExecutor {
// private static final Logger LOGGER = LogManager.getLogger(MultiPointCooperationExecutor.class);
// final Object lock = new Object();
// int resultCount;
// AtomicInteger request_index = new AtomicInteger(0);
// ContractExecType type;
// // key为requestIDvalue为其seq
// Map<String, MultiReqSeq> seqMap = new ConcurrentHashMap<>();
// Map<String, ResultCache> resultCache = new ConcurrentHashMap<>();
// // MultiPointContractInfo info;
// MultiContractMeta multiMeta;
// String contractID;
//
// public MultiPointCooperationExecutor(ContractExecType t, int c, String con_id) {
// LOGGER.info("-- sharding executor---");
// type = t;
// resultCount = c;
// contractID = con_id;
// multiMeta = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
// }
//
// public void setSeq(int seq) {
// request_index = new AtomicInteger(seq);
// }
//
// public ResultCallback createResultCallback(
// final String requestID,
// final ResultCallback originalCb,
// final int count,
// final int request_seq,
// final String contractID, JoinInfo joinInfo) {
// // TODO 加对应的超时
// return new ResultCollector(
// requestID,
// new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID, joinInfo),
// count); // 把count改成了1设置成获得1个响应就行
// }
//
// public void sendRequest(String id, ContractRequest req, String[] nodes) {
// Map<String, Object> reqStr = new HashMap<>();
// reqStr.put("uniReqID", id);
// reqStr.put("data", req);
// req.needSeq = false;
// reqStr.put("action", "executeContractLocally");
// String sendStr = JsonUtil.toJson(reqStr);
// // master负责缓存请求
// if (!MasterServerTCPAction.requestCache.containsKey(contractID)) {
// MasterServerTCPAction.requestCache.put(contractID, new RequestCache());
// }
// // TODO 多调多统一个seq的有多个请求这个需要改
// MasterServerTCPAction.requestCache.get(contractID).put(req.seq, sendStr);
// LOGGER.debug(JsonUtil.toJson(req));
// LOGGER.info("node size = " + nodes.length);
// LOGGER.debug("nodes:" + JsonUtil.toJson(nodes));
// for (String node : nodes) {
// LOGGER.info(
// "[sendRequests] get cmNode "
// + node.substring(0, 5)
// + " not null "
// + "RequestAllExecutor 发送请求给 "
// + node.substring(0, 5));
// NetworkManager.instance.sendToAgent(node, sendStr);
// }
// }
//
// private String[] getAccordingToRouteInfo(RouteInfo routeInfo, ContractRequest req, String[] members) {
// try {
// int val;
// switch (routeInfo.useDefault) {
// case byRequester:
// val =
// new BigInteger(req.getRequester(), 16)
// .mod(new BigInteger("" + members.length))
// .intValue();
// while (val < 0) {
// val = val + members.length;
// }
// return new String[]{members[val]};
// case byArgHash:
// val = req.getArg().hashCode();
// val = val % members.length;
// while (val < 0) {
// val += members.length;
// }
// return new String[]{members[val]};
// case byTarget:
// JsonObject jo = req.getArg().getAsJsonObject();
// val =
// new BigInteger(jo.get("target").getAsString(), 16)
// .mod(new BigInteger("" + members.length))
// .intValue();
// while (val < 0) {
// val = val + members.length;
// }
// return new String[]{members[val]};
// default:
// return members;
// }
// } catch (Exception e) {
// return members;
// }
// }
//
// public boolean checkCurNodeNumValid() {
// LOGGER.info("checkCurNodeNumValid");
// String[] nodes = multiMeta.getMembers();
// // List<String> nodes = info.members;
// int validNode = 0;
// for (String node : nodes) {
// if (NetworkManager.instance.hasAgentConnection(node)
// && MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID)
// == RecoverFlag.Fine) {
// validNode++;
// }
// }
// int c = resultCount;
// if (type == ContractExecType.Sharding) c = (int) Math.ceil((double) c / 2);
// LOGGER.info("c=" + c + " validNode=" + validNode);
// return validNode >= c;
// }
//
// @Override
// public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
// LOGGER.info("[MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req));
// // 获得action 函数名
// LOGGER.info("action is : " + req.getAction());
// req.setContractID(CMActions.manager.getContractIDByName(req.getContractID()));
// if (requestID != null && requestID.endsWith("_mul")) {
// synchronized (lock) {
// if (seqMap.containsKey(requestID)) {
// req.seq = seqMap.get(requestID).seq;
// } else {
// req.seq = request_index.getAndIncrement();
// seqMap.put(requestID, new MultiReqSeq(req.seq));
// }
// }
// } else {
// req.seq = request_index.getAndIncrement();
// }
// req.needSeq = true;
// String id =
// System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq;
// LOGGER.info("execute receive requestID= " + requestID + " msgID=" + id);
// if (checkCurNodeNumValid()) { // 校验成功 current node num 合法
// LOGGER.info("checkCurNodeNumValid true");
// ContractMeta meta =
// CMActions.manager.statusRecorder.getContractMeta(req.getContractID());
// FunctionDesp fun = meta.getExportedFunction(req.getAction());
// ResultCallback collector;
// // TODO @fanbo 下面的count 1要改应该是根据route的规则来
// //Count 根据join规则来
// //nodes 根据route规则来
// JoinInfo joinInfo = fun.joinInfo;
// RouteInfo routeInfo = fun.routeInfo;
// int count = getJoinCount(joinInfo, contractID);
// LOGGER.info("requestID=" + requestID + " join Count: " + count);
//
// String[] members = multiMeta.getMembers();
// String[] nodes = getAccordingToRouteInfo(routeInfo, req, members);
// if (nodes.length < count) {
// count = nodes.length;
// }
// collector =
// createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器
// MasterServerTCPAction.sync.sleep(id, collector);
// LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq);
// sendRequest(id, req, nodes); // 发送请求
// } else {
// LOGGER.info("invalidNodeNumOnResult");
// request_index.getAndDecrement();
// ContractResult finalResult =
// new ContractResult(
// ContractResult.Status.Error,
// new JsonPrimitive("node number unavailbale,request refused."));
// rc.onResult(JsonUtil.toJson(finalResult));
// }
// }
//
// private int getJoinCount(JoinInfo joinInfo, String contractID) {
// if (joinInfo == null) return resultCount;
// if (joinInfo.joinCount.isJsonPrimitive() && joinInfo.joinCount.getAsJsonPrimitive().isNumber()) {
// return joinInfo.joinCount.getAsJsonPrimitive().getAsInt();
// }
// try {
// ContractRequest cr = new ContractRequest();
// cr.setContractID(contractID);
// cr.setAction(joinInfo.joinCount.getAsString());
// //TODO Arg需要好好设计一下
// //TODO 又好用又简单的那种设计
// //TODO
// cr.setArg("");
// String result = CMActions.manager.executeLocally(cr, null);
// return JsonUtil.parseString(result).getAsJsonObject().get("result").getAsInt();
// } catch (Exception e) {
// e.printStackTrace();
// return 1;
// }
// }
//
// // 清理缓存的多点合约请求序号
// public void clearCache() {
// final long time = System.currentTimeMillis() - 30000L;
// seqMap.entrySet()
// .removeIf(
// entry -> {
// MultiReqSeq cache = entry.getValue();
// if (null == cache) {
// return true;
// }
// return cache.startTime < time;
// });
// }
//
// public static class ResultMerger extends ResultCallback {
// ComponedContractResult componedContractResult;
// AtomicInteger order;
// String contractID;
// int count; // 记录有多少个节点
// int request_seq;
// ResultCallback originalCallback;
// Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点
// JoinInfo joinInfo;
//
// ResultMerger(
// final ResultCallback originalCb,
// final int count,
// final int request_seq,
// final String contractID,
// final JoinInfo joinInfo) {
// originalCallback = originalCb;
// this.count = count;
// this.request_seq = request_seq;
// this.contractID = contractID;
// componedContractResult = new ComponedContractResult(count);
// order = new AtomicInteger(0);
// this.joinInfo = joinInfo;
// }
//
// public String getInfo() {
// return "contractID="
// + contractID
// + " 收到第 "
// + order
// + " 个节点回复 : "
// + " order="
// + order
// + " count="
// + count
// + " ";
// }
//
// @Override
// public void onResult(String str) {
// // TODO 必须在这里聚合
// // str的data是个ContractResult
// // 在这儿也是返回个ContractResult
// try {
// LOGGER.info(str);
// JsonObject obj = JsonParser.parseString(str).getAsJsonObject();
// String id = obj.get("nodeID").getAsString();
// if (nodeIDs.contains(id)) {
// LOGGER.info("已经收到节点 " + id.substring(0, 5) + " 的结果,该结果被忽略");
// return;
// }
// nodeIDs.add(id);
// LOGGER.info(
// "contractID="
// + contractID
// + " 收到第 "
// + order
// + " 个节点回复 : "
// + str
// + " order="
// + order
// + " count="
// + count);
// componedContractResult.add(obj);
// // 收集到所有结果
// if (order.incrementAndGet() == count) {
// ContractResult finalResult = componedContractResult.mergeFinalResult();
//
// finalResult.needSeq = true;
// finalResult.seq = request_seq;
//
// // if (null == finalResult) {
// // finalResult =
// // new ContractResult(
// // ContractResult.Status.Exception,
// // new JsonPrimitive(
// // "no nore than half of the
// // consistent result"));
// // originalCallback.onResult(new
// // Gson().toJson(finalResult));
// // } else {
// if (joinInfo != null) {
// handleJoinInfo(finalResult, joinInfo);
// }
// originalCallback.onResult(JsonUtil.toJson(finalResult));
// // }
// LOGGER.info(
// "本次执行最终结果为 " + finalResult.size + "个节点合并的,结果为 " + finalResult.result);
//
// // 集群中事务序号+1
// // MasterServerTCPAction.contractID2Members.get(contractID).nextSeq();
// CMActions.manager
// .multiContractRecorder
// .getMultiContractMeta(contractID)
// .nextSeqAtMaster();
// // recover其中无状态合约CP出错无需恢复
// Set<String> nodesID = componedContractResult.getProblemNodes();
// if (null == nodesID || nodesID.isEmpty()) {
// return;
// }
// for (String nodeID : nodesID) {
// LOGGER.info("结果出现问题的节点有:" + nodeID);
// if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID)
// == RecoverFlag.Fine) {
// MasterServerRecoverMechAction.recoverStatus
// .get(nodeID)
// .put(contractID, RecoverFlag.ToRecover);
// }
// }
// for (String nodeID : nodesID) {
// if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID)
// == RecoverFlag.ToRecover) {
// LOGGER.info("问题节点开始恢复:" + nodeID);
//
// // 因为该节点结果有误所以即时是stableMode也认为trans记录不可信
// // 直接通过load别的节点来恢复
// MasterServerRecoverMechAction.restartContractFromCommonMode(
// nodeID, contractID);
// }
// }
// }
// // clearCache();
// } catch (Exception e) {
// e.printStackTrace();
// LOGGER.info("本次执行最终结果为有异常");
// }
// }
//
// private void handleJoinInfo(ContractResult finalResult, JoinInfo joinInfo) {
// JsonObject jo = finalResult.result.getAsJsonObject();
// if (joinInfo != null && joinInfo.joinRule != null) {
// //TODO 不应该是double 类型
// switch (joinInfo.joinRule) {
// case "add":
// double val = 0;
// for (String key : jo.keySet()) {
// val += jo.get(key).getAsDouble();
// }
// finalResult.result = new JsonPrimitive(val);
// break;
// case "multiply":
// val = 1;
// for (String key : jo.keySet()) {
// val *= jo.get(key).getAsDouble();
// }
// finalResult.result = new JsonPrimitive(val);
// break;
// }
// }
// }
// }
//}

View File

@ -1,68 +1,68 @@
package org.bdware.server.executor.unconsistency;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.ControllerManager;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.AgentManager;
import org.bdware.units.NetworkManager;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class RequestOnceExecutor implements ContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(RequestOnceExecutor.class);
String contractID;
AtomicInteger order = new AtomicInteger(0);
public RequestOnceExecutor(String contractID) {
this.contractID = contractID;
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
ResultCallback cb =
new ResultCallback() {
@Override
public void onResult(String str) {
LOGGER.debug(str);
JsonObject jo = JsonParser.parseString(str).getAsJsonObject();
JsonObject result =
JsonParser.parseString(jo.get("data").getAsString())
.getAsJsonObject();
for (String key : result.keySet()) jo.add(key, result.get(key));
jo.remove("action");
jo.addProperty("action", "onExecuteResult");
LOGGER.debug(jo.toString());
rc.onResult(jo.toString());
}
};
MasterServerTCPAction.sync.sleep(requestID, cb);
String[] members = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers();
for (int i = 0; i < members.length; i++) {
LOGGER.info("[members]:" + members.length);
int size = members.length;
String nodeID = members[order.incrementAndGet() % size];
//ADD Connect
Map<String, Object> obj = new HashMap<>();
obj.put("action", "executeContractLocally");
obj.put("requestID",requestID);
obj.put("data", req);
obj.put("uniReqID", requestID);
NetworkManager.instance.sendToAgent(nodeID,JsonUtil.toJson(obj));
return;
}
rc.onResult(
"{\"status\":\"Error\",\"result\":\"all nodes "
+ " offline\",\"action\":\"onExecuteContract\"}");
}
}
//package org.bdware.server.executor.unconsistency;
//
//import com.google.gson.JsonObject;
//import com.google.gson.JsonParser;
//import org.apache.logging.log4j.LogManager;
//import org.apache.logging.log4j.Logger;
//import org.bdware.sc.bean.ContractRequest;
//import org.bdware.sc.conn.OnHashCallback;
//import org.bdware.sc.conn.ResultCallback;
//import org.bdware.sc.util.JsonUtil;
//import org.bdware.server.ControllerManager;
//import org.bdware.server.action.CMActions;
//import org.bdware.server.action.p2p.MasterServerTCPAction;
//import org.bdware.server.trustedmodel.ContractExecutor;
//import org.bdware.server.trustedmodel.AgentManager;
//import org.bdware.units.NetworkManager;
//
//import java.util.HashMap;
//import java.util.Map;
//import java.util.concurrent.atomic.AtomicInteger;
//
//public class RequestOnceExecutor implements ContractExecutor {
// private static final Logger LOGGER = LogManager.getLogger(RequestOnceExecutor.class);
// String contractID;
// AtomicInteger order = new AtomicInteger(0);
//
// public RequestOnceExecutor(String contractID) {
// this.contractID = contractID;
// }
//
// @Override
// public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
// ResultCallback cb =
// new ResultCallback() {
// @Override
// public void onResult(String str) {
// LOGGER.debug(str);
// JsonObject jo = JsonParser.parseString(str).getAsJsonObject();
// JsonObject result =
// JsonParser.parseString(jo.get("data").getAsString())
// .getAsJsonObject();
// for (String key : result.keySet()) jo.add(key, result.get(key));
// jo.remove("action");
// jo.addProperty("action", "onExecuteResult");
// LOGGER.debug(jo.toString());
// rc.onResult(jo.toString());
// }
// };
// MasterServerTCPAction.sync.sleep(requestID, cb);
// String[] members = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers();
// for (int i = 0; i < members.length; i++) {
// LOGGER.info("[members]:" + members.length);
// int size = members.length;
// String nodeID = members[order.incrementAndGet() % size];
// //ADD Connect
// Map<String, Object> obj = new HashMap<>();
// obj.put("action", "executeContractLocally");
// obj.put("requestID",requestID);
// obj.put("data", req);
// obj.put("uniReqID", requestID);
// NetworkManager.instance.sendToAgent(nodeID,JsonUtil.toJson(obj));
// return;
// }
// rc.onResult(
// "{\"status\":\"Error\",\"result\":\"all nodes "
// + " offline\",\"action\":\"onExecuteContract\"}");
// }
//}

View File

@ -1,83 +1,83 @@
package org.bdware.server.executor.unconsistency;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.units.NetworkManager;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class ResponseOnceExecutor implements ContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(ResponseOnceExecutor.class);
private final String contractID;
AtomicInteger order = new AtomicInteger(0);
public ResponseOnceExecutor(String contractID) {
this.contractID = contractID;
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
executeInternal(requestID, rc, req, 2);
}
private void executeInternal(
String requestID, ResultCallback rc, ContractRequest req, int count) {
// String contractID = req.getContractID();
// TODO 标注失效节点是否选择重新迁移
ResultCallback cb =
new ResultCallback() {
@Override
public void onResult(String str) {
LOGGER.debug(str);
JsonObject jo = JsonParser.parseString(str).getAsJsonObject();
jo.remove("action");
jo.addProperty("action", "onExecuteResult");
LOGGER.debug(jo.toString());
if (jo.has("data")) {
String data = jo.get("data").getAsString();
ContractResult cr = JsonUtil.fromJson(data, ContractResult.class);
if (cr.status != ContractResult.Status.Success && count > 0) {
executeInternal(requestID, rc, req, count - 1);
} else rc.onResult(jo.toString());
} else {
JsonObject jo2 = new JsonObject();
jo2.addProperty("action", "onExecuteResult");
jo.remove("action");
jo2.addProperty("data", jo.toString());
rc.onResult(jo2.toString());
}
}
};
MasterServerTCPAction.sync.sleepWithTimeout(requestID, cb, 5);
if (!sendOnce(requestID, req))
rc.onResult(
"{\"status\":\"Error\",\"data\":\"{\\\"status\\\":\\\"Error\\\",\\\"result\\\":\\\"all nodes offline\\\"}\",\"action\":\"onExecuteContract\"}");
}
private boolean sendOnce(String requestID, ContractRequest req) {
String[] members = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers();
for (int i = 0; i < members.length; i++) {
int size = members.length;
String nodeID = members[order.incrementAndGet() % size];
Map<String, Object> obj = new HashMap<>();
obj.put("action", "executeContractLocally");
obj.put("data", req);
obj.put("uniReqID", requestID);
NetworkManager.instance.sendToAgent(nodeID, JsonUtil.toJson(obj));
return true;
}
return false;
}
}
//package org.bdware.server.executor.unconsistency;
//
//import com.google.gson.JsonObject;
//import com.google.gson.JsonParser;
//import org.apache.logging.log4j.LogManager;
//import org.apache.logging.log4j.Logger;
//import org.bdware.sc.ContractResult;
//import org.bdware.sc.bean.ContractRequest;
//import org.bdware.sc.conn.OnHashCallback;
//import org.bdware.sc.conn.ResultCallback;
//import org.bdware.sc.util.JsonUtil;
//import org.bdware.server.action.CMActions;
//import org.bdware.server.action.p2p.MasterServerTCPAction;
//import org.bdware.server.trustedmodel.ContractExecutor;
//import org.bdware.units.NetworkManager;
//
//import java.util.HashMap;
//import java.util.Map;
//import java.util.concurrent.atomic.AtomicInteger;
//
//public class ResponseOnceExecutor implements ContractExecutor {
// private static final Logger LOGGER = LogManager.getLogger(ResponseOnceExecutor.class);
// private final String contractID;
// AtomicInteger order = new AtomicInteger(0);
//
// public ResponseOnceExecutor(String contractID) {
// this.contractID = contractID;
// }
//
// @Override
// public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
// executeInternal(requestID, rc, req, 2);
// }
//
// private void executeInternal(
// String requestID, ResultCallback rc, ContractRequest req, int count) {
// // String contractID = req.getContractID();
// // TODO 标注失效节点是否选择重新迁移
// ResultCallback cb =
// new ResultCallback() {
// @Override
// public void onResult(String str) {
// LOGGER.debug(str);
// JsonObject jo = JsonParser.parseString(str).getAsJsonObject();
// jo.remove("action");
// jo.addProperty("action", "onExecuteResult");
// LOGGER.debug(jo.toString());
// if (jo.has("data")) {
// String data = jo.get("data").getAsString();
// ContractResult cr = JsonUtil.fromJson(data, ContractResult.class);
// if (cr.status != ContractResult.Status.Success && count > 0) {
// executeInternal(requestID, rc, req, count - 1);
// } else rc.onResult(jo.toString());
// } else {
// JsonObject jo2 = new JsonObject();
// jo2.addProperty("action", "onExecuteResult");
// jo.remove("action");
// jo2.addProperty("data", jo.toString());
// rc.onResult(jo2.toString());
// }
// }
// };
// MasterServerTCPAction.sync.sleepWithTimeout(requestID, cb, 5);
// if (!sendOnce(requestID, req))
// rc.onResult(
// "{\"status\":\"Error\",\"data\":\"{\\\"status\\\":\\\"Error\\\",\\\"result\\\":\\\"all nodes offline\\\"}\",\"action\":\"onExecuteContract\"}");
// }
//
// private boolean sendOnce(String requestID, ContractRequest req) {
// String[] members = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers();
// for (int i = 0; i < members.length; i++) {
// int size = members.length;
// String nodeID = members[order.incrementAndGet() % size];
// Map<String, Object> obj = new HashMap<>();
// obj.put("action", "executeContractLocally");
// obj.put("data", req);
// obj.put("uniReqID", requestID);
// NetworkManager.instance.sendToAgent(nodeID, JsonUtil.toJson(obj));
// return true;
// }
// return false;
// }
//}

View File

@ -1,11 +0,0 @@
package org.bdware.server.trustedmodel;
public class MultiReqSeq {
public final int seq;
public final long startTime;
public MultiReqSeq(int s){
seq = s;
startTime = System.currentTimeMillis();
}
}

View File

@ -1,305 +1,305 @@
package org.bdware.server.trustedmodel;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.ContractClient;
import org.bdware.sc.ContractManager;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.bean.FunctionDesp;
import org.bdware.sc.bean.SM2Verifiable;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.RecoverFlag;
import org.bdware.sc.util.HashUtil;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
import org.bdware.units.NetworkManager;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* @author Kaidong Wu
*/
public class SelfAdaptiveShardingExecutor implements ContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class);
private static final int SUBMIT_LIMIT = 1024;
private final Queue<ContractRequest> reqQueue = new ConcurrentLinkedQueue<>();
private final MultiContractMeta meta;
private final Map<String, Block> toExecuted = new ConcurrentHashMap<>();
private final Set<String> executedBlocks = ConcurrentHashMap.newKeySet();
private final Map<String, Boolean> executedTxs = new ConcurrentHashMap<>();
private final Object flag = new Object();
private final ScheduledFuture<?> future;
private boolean running = true;
private Block b = new Block();
public SelfAdaptiveShardingExecutor(String contractID) {
this.meta =
CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
this.future = ContractManager.scheduledThreadPool.scheduleWithFixedDelay(
this::submitBlock,
2,
2,
TimeUnit.SECONDS);
LOGGER.debug(String.format("ContractManager.threadPool=%d/%d",
((ThreadPoolExecutor) ContractManager.threadPool).getActiveCount(),
((ThreadPoolExecutor) ContractManager.threadPool).getPoolSize()));
ContractManager.threadPool.execute(() -> {
LOGGER.info(
"[SelfAdaptiveShardingExecutor " + meta.getContractID() + "] starting service..." + running);
while (running) {
LOGGER.info("checking blocks to be executed, latest block=" +
this.b.prevHash + ", to be executed size=" + toExecuted.size());
LOGGER.debug("executed: " + JsonUtil.toJson(executedBlocks) + "\n\t" + JsonUtil.toJson(executedTxs));
while (!toExecuted.isEmpty()) {
String key = this.b.prevHash;
Block block = toExecuted.get(key);
if (null != block) {
executeBlock(block);
}
toExecuted.remove(key);
}
synchronized (flag) {
try {
flag.wait();
} catch (InterruptedException e) {
LOGGER.warn(String.format(
"[SelfAdaptiveShardingExecutor %s] waiting is interrupted: %s",
meta.getContractID(),
e.getMessage()));
}
}
}
});
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) {
// check client
ContractClient client = CMActions.manager.getClient(meta.getContractID());
if (null == client) {
LOGGER.error("contract " + meta.getContractID() + " not found!");
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("contract " + meta.getContractID() + " not found!"))));
return;
}
// check function
FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction());
if (null == funDesp) {
LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!");
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive(
String.format("action %s of contract %s not found!",
req.getAction(),
meta.getContractID())))));
return;
}
// for view function, execute it
if (funDesp.isView) {
CMActions.manager.executeLocallyAsync(req, rcb, hcb);
return;
}
// normal function, check if it is in blocks
if (executedTxs.containsKey(requestID)) {
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("this request has been packed!"))));
return;
}
// add blocks into request cache
LOGGER.debug("receive contract request " + requestID);
executedTxs.put(requestID, false);
reqQueue.add(req);
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Executing,
new JsonPrimitive("this request is adding into blocks"))));
// if cache is full, submit
if (reqQueue.size() >= SUBMIT_LIMIT) {
ContractManager.threadPool.execute(this::submitBlock);
}
}
@Override
public void close() {
// stop threads
this.future.cancel(true);
this.running = false;
LOGGER.info("destruct executor of contract " + meta.getContractID());
}
public void execute(String blockStr) {
Block block = JsonUtil.fromJson(blockStr, Block.class);
// the block must have not been cached or executed, and must be valid
if (!toExecuted.containsKey(block.prevHash) &&
!executedBlocks.contains(block.hash) &&
block.isValid()) {
// add block into block cache
LOGGER.info(String.format(
"[SelfAdaptiveShardingExecutor %s] receive block %s -> %s," +
" %d transactions, timestamp=%d, size=%d",
meta.getContractID(),
block.hash,
block.prevHash,
block.requests.length,
block.timestamp,
blockStr.length()));
toExecuted.put(block.prevHash, block);
// notify thread to execute blocks
synchronized (flag) {
flag.notify();
}
}
}
private synchronized void executeBlock(Block block) {
// used for the thread to execute blocks
LOGGER.debug("start");
// check contract requests, requests must have not been executed
for (ContractRequest request : block.requests) {
if (executedTxs.containsKey(request.getRequestID()) && executedTxs.get(request.getRequestID())) {
LOGGER.debug("find request " + request.getRequestID() + " has been executed!");
return;
}
}
// TODO check status
// executed requests
for (ContractRequest request : block.requests) {
String ret = CMActions.manager.executeLocally(request, null);
LOGGER.debug(String.format(
"[SelfAdaptiveShardingExecutor %s] result of request %s: %s",
meta.getContractID(),
request.getRequestID(),
ret));
executedTxs.put(request.getRequestID(), true);
}
LOGGER.info(String.format(
"[SelfAdaptiveShardingExecutor %s] execute %d transactions of block %s",
meta.getContractID(),
block.requests.length,
block.hash));
// TODO create check point
this.b = new Block(block.hash, this.b.height + 1);
executedBlocks.add(block.hash);
}
private void submitBlock() {
Block block = fillBlock();
if (null != block) {
LOGGER.info("deliver block " + block.hash + "...");
LOGGER.debug(JsonUtil.toPrettyJson(block));
String[] nodes = this.meta.getMembers();
JsonObject req = new JsonObject();
req.addProperty("action", "deliverBlock");
req.addProperty("data", JsonUtil.toJson(block));
req.addProperty("contractID", this.meta.getContractID());
String reqStr = req.toString();
// deliver blocks
for (String node : nodes) {
if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID())
== RecoverFlag.Fine) {
NetworkManager.instance.sendToAgent(node, reqStr);
}
}
}
}
private synchronized Block fillBlock() {
// pack contract requests into a block
ContractRequest[] requests = new ContractRequest[Math.min(reqQueue.size(), SUBMIT_LIMIT)];
if (requests.length == 0) {
return null;
}
for (int i = 0; i < requests.length; ++i) {
requests[i] = reqQueue.poll();
}
this.b.fillBlock(requests);
return this.b;
}
static class Block extends SM2Verifiable {
String prevHash = "0";
String hash;
int height;
String checkPoint;
String body;
String nodePubKey;
ContractRequest[] requests;
long timestamp;
public Block() {
this.height = 0;
}
public Block(String prev, int height) {
this.prevHash = prev;
this.height = height;
}
public void fillBlock(ContractRequest[] requests) {
this.requests = requests;
this.timestamp = System.currentTimeMillis();
this.body = merkle(requests);
this.hash = computeHash();
doSignature(CMActions.manager.nodeCenterConn.getNodeKeyPair());
}
public boolean isValid() {
return computeHash().equals(hash) && body.equals(merkle(this.requests)) && verifySignature();
}
private String computeHash() {
return HashUtil.sha3(
String.valueOf(this.height),
this.prevHash,
this.checkPoint,
this.body);
}
private String merkle(ContractRequest[] requests) {
// manage requests as a merkle tree
if (requests.length == 0) {
return null;
}
if (requests.length == 1) {
return HashUtil.sha3(requests[0].getRequestID());
}
Queue<String> reqQueue =
Arrays.stream(requests).map(ContractRequest::getRequestID)
.collect(Collectors.toCollection(ArrayDeque::new));
do {
int size;
for (size = reqQueue.size(); size > 1; size -= 2) {
reqQueue.add(HashUtil.sha3(reqQueue.poll(), reqQueue.poll()));
}
if (size == 1) {
reqQueue.add(reqQueue.poll());
}
} while (1 != reqQueue.size());
return reqQueue.poll();
}
@Override
public String getPublicKey() {
return nodePubKey;
}
@Override
public void setPublicKey(String pubkey) {
this.nodePubKey = pubkey;
}
@Override
public String getContentStr() {
return this.hash;
}
}
}
//package org.bdware.server.trustedmodel;
//
//import com.google.gson.JsonObject;
//import com.google.gson.JsonPrimitive;
//import org.apache.logging.log4j.LogManager;
//import org.apache.logging.log4j.Logger;
//import org.bdware.sc.ContractClient;
//import org.bdware.sc.ContractManager;
//import org.bdware.sc.ContractResult;
//import org.bdware.sc.bean.ContractRequest;
//import org.bdware.sc.bean.FunctionDesp;
//import org.bdware.sc.bean.SM2Verifiable;
//import org.bdware.sc.conn.OnHashCallback;
//import org.bdware.sc.conn.ResultCallback;
//import org.bdware.sc.units.MultiContractMeta;
//import org.bdware.sc.units.RecoverFlag;
//import org.bdware.sc.util.HashUtil;
//import org.bdware.sc.util.JsonUtil;
//import org.bdware.server.action.CMActions;
//import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
//import org.bdware.units.NetworkManager;
//
//import java.util.*;
//import java.util.concurrent.*;
//import java.util.stream.Collectors;
//
///**
// * @author Kaidong Wu
// */
//public class SelfAdaptiveShardingExecutor implements ContractExecutor {
// private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class);
// private static final int SUBMIT_LIMIT = 1024;
// private final Queue<ContractRequest> reqQueue = new ConcurrentLinkedQueue<>();
// private final MultiContractMeta meta;
// private final Map<String, Block> toExecuted = new ConcurrentHashMap<>();
// private final Set<String> executedBlocks = ConcurrentHashMap.newKeySet();
// private final Map<String, Boolean> executedTxs = new ConcurrentHashMap<>();
// private final Object flag = new Object();
// private final ScheduledFuture<?> future;
// private boolean running = true;
// private Block b = new Block();
//
// public SelfAdaptiveShardingExecutor(String contractID) {
// this.meta =
// CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
// this.future = ContractManager.scheduledThreadPool.scheduleWithFixedDelay(
// this::submitBlock,
// 2,
// 2,
// TimeUnit.SECONDS);
// LOGGER.debug(String.format("ContractManager.threadPool=%d/%d",
// ((ThreadPoolExecutor) ContractManager.threadPool).getActiveCount(),
// ((ThreadPoolExecutor) ContractManager.threadPool).getPoolSize()));
// ContractManager.threadPool.execute(() -> {
// LOGGER.info(
// "[SelfAdaptiveShardingExecutor " + meta.getContractID() + "] starting service..." + running);
// while (running) {
// LOGGER.info("checking blocks to be executed, latest block=" +
// this.b.prevHash + ", to be executed size=" + toExecuted.size());
// LOGGER.debug("executed: " + JsonUtil.toJson(executedBlocks) + "\n\t" + JsonUtil.toJson(executedTxs));
// while (!toExecuted.isEmpty()) {
// String key = this.b.prevHash;
// Block block = toExecuted.get(key);
// if (null != block) {
// executeBlock(block);
// }
// toExecuted.remove(key);
// }
// synchronized (flag) {
// try {
// flag.wait();
// } catch (InterruptedException e) {
// LOGGER.warn(String.format(
// "[SelfAdaptiveShardingExecutor %s] waiting is interrupted: %s",
// meta.getContractID(),
// e.getMessage()));
// }
// }
// }
// });
// }
//
// @Override
// public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) {
// // check client
// ContractClient client = CMActions.manager.getClient(meta.getContractID());
// if (null == client) {
// LOGGER.error("contract " + meta.getContractID() + " not found!");
// rcb.onResult(JsonUtil.toJson(new ContractResult(
// ContractResult.Status.Error,
// new JsonPrimitive("contract " + meta.getContractID() + " not found!"))));
// return;
// }
// // check function
// FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction());
// if (null == funDesp) {
// LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!");
// rcb.onResult(JsonUtil.toJson(new ContractResult(
// ContractResult.Status.Error,
// new JsonPrimitive(
// String.format("action %s of contract %s not found!",
// req.getAction(),
// meta.getContractID())))));
// return;
// }
// // for view function, execute it
// if (funDesp.isView) {
// CMActions.manager.executeLocallyAsync(req, rcb, hcb);
// return;
// }
// // normal function, check if it is in blocks
// if (executedTxs.containsKey(requestID)) {
// rcb.onResult(JsonUtil.toJson(new ContractResult(
// ContractResult.Status.Error,
// new JsonPrimitive("this request has been packed!"))));
// return;
// }
// // add blocks into request cache
// LOGGER.debug("receive contract request " + requestID);
// executedTxs.put(requestID, false);
// reqQueue.add(req);
// rcb.onResult(JsonUtil.toJson(new ContractResult(
// ContractResult.Status.Executing,
// new JsonPrimitive("this request is adding into blocks"))));
// // if cache is full, submit
// if (reqQueue.size() >= SUBMIT_LIMIT) {
// ContractManager.threadPool.execute(this::submitBlock);
// }
// }
//
// @Override
// public void close() {
// // stop threads
// this.future.cancel(true);
// this.running = false;
// LOGGER.info("destruct executor of contract " + meta.getContractID());
// }
//
// public void execute(String blockStr) {
// Block block = JsonUtil.fromJson(blockStr, Block.class);
// // the block must have not been cached or executed, and must be valid
// if (!toExecuted.containsKey(block.prevHash) &&
// !executedBlocks.contains(block.hash) &&
// block.isValid()) {
// // add block into block cache
// LOGGER.info(String.format(
// "[SelfAdaptiveShardingExecutor %s] receive block %s -> %s," +
// " %d transactions, timestamp=%d, size=%d",
// meta.getContractID(),
// block.hash,
// block.prevHash,
// block.requests.length,
// block.timestamp,
// blockStr.length()));
// toExecuted.put(block.prevHash, block);
// // notify thread to execute blocks
// synchronized (flag) {
// flag.notify();
// }
// }
// }
//
// private synchronized void executeBlock(Block block) {
// // used for the thread to execute blocks
// LOGGER.debug("start");
// // check contract requests, requests must have not been executed
// for (ContractRequest request : block.requests) {
// if (executedTxs.containsKey(request.getRequestID()) && executedTxs.get(request.getRequestID())) {
// LOGGER.debug("find request " + request.getRequestID() + " has been executed!");
// return;
// }
// }
// // TODO check status
// // executed requests
// for (ContractRequest request : block.requests) {
// String ret = CMActions.manager.executeLocally(request, null);
// LOGGER.debug(String.format(
// "[SelfAdaptiveShardingExecutor %s] result of request %s: %s",
// meta.getContractID(),
// request.getRequestID(),
// ret));
// executedTxs.put(request.getRequestID(), true);
// }
// LOGGER.info(String.format(
// "[SelfAdaptiveShardingExecutor %s] execute %d transactions of block %s",
// meta.getContractID(),
// block.requests.length,
// block.hash));
// // TODO create check point
// this.b = new Block(block.hash, this.b.height + 1);
// executedBlocks.add(block.hash);
// }
//
// private void submitBlock() {
// Block block = fillBlock();
// if (null != block) {
// LOGGER.info("deliver block " + block.hash + "...");
// LOGGER.debug(JsonUtil.toPrettyJson(block));
// String[] nodes = this.meta.getMembers();
// JsonObject req = new JsonObject();
// req.addProperty("action", "deliverBlock");
// req.addProperty("data", JsonUtil.toJson(block));
// req.addProperty("contractID", this.meta.getContractID());
// String reqStr = req.toString();
// // deliver blocks
// for (String node : nodes) {
// if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID())
// == RecoverFlag.Fine) {
// NetworkManager.instance.sendToAgent(node, reqStr);
// }
// }
// }
// }
//
// private synchronized Block fillBlock() {
// // pack contract requests into a block
// ContractRequest[] requests = new ContractRequest[Math.min(reqQueue.size(), SUBMIT_LIMIT)];
// if (requests.length == 0) {
// return null;
// }
// for (int i = 0; i < requests.length; ++i) {
// requests[i] = reqQueue.poll();
// }
// this.b.fillBlock(requests);
// return this.b;
// }
//
// static class Block extends SM2Verifiable {
// String prevHash = "0";
// String hash;
// int height;
// String checkPoint;
// String body;
// String nodePubKey;
// ContractRequest[] requests;
// long timestamp;
//
// public Block() {
// this.height = 0;
// }
//
// public Block(String prev, int height) {
// this.prevHash = prev;
// this.height = height;
// }
//
// public void fillBlock(ContractRequest[] requests) {
// this.requests = requests;
// this.timestamp = System.currentTimeMillis();
// this.body = merkle(requests);
// this.hash = computeHash();
// doSignature(CMActions.manager.nodeCenterConn.getNodeKeyPair());
// }
//
// public boolean isValid() {
// return computeHash().equals(hash) && body.equals(merkle(this.requests)) && verifySignature();
// }
//
// private String computeHash() {
// return HashUtil.sha3(
// String.valueOf(this.height),
// this.prevHash,
// this.checkPoint,
// this.body);
// }
//
// private String merkle(ContractRequest[] requests) {
// // manage requests as a merkle tree
// if (requests.length == 0) {
// return null;
// }
// if (requests.length == 1) {
// return HashUtil.sha3(requests[0].getRequestID());
// }
// Queue<String> reqQueue =
// Arrays.stream(requests).map(ContractRequest::getRequestID)
// .collect(Collectors.toCollection(ArrayDeque::new));
// do {
// int size;
// for (size = reqQueue.size(); size > 1; size -= 2) {
// reqQueue.add(HashUtil.sha3(reqQueue.poll(), reqQueue.poll()));
// }
// if (size == 1) {
// reqQueue.add(reqQueue.poll());
// }
// } while (1 != reqQueue.size());
// return reqQueue.poll();
// }
//
// @Override
// public String getPublicKey() {
// return nodePubKey;
// }
//
// @Override
// public void setPublicKey(String pubkey) {
// this.nodePubKey = pubkey;
// }
//
// @Override
// public String getContentStr() {
// return this.hash;
// }
// }
//}

View File

@ -1,63 +1,63 @@
package org.bdware.server;
import org.bdware.doip.core.doipMessage.DoipMessage;
import org.bdware.doip.core.doipMessage.DoipMessageFactory;
import org.bdware.doip.core.model.operations.BasicOperations;
import org.bdware.doip.endpoint.doipClient.DoipClientImpl;
import org.bdware.doip.endpoint.doipClient.DoipMessageCallback;
import org.junit.Before;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class DOIPTest {
public static String repoID = "86.5000.470/doip.localContractRepo";
@Before
public void init() {
}
@Test
public void registryTempOD() {
}
@Test
public void retrieve() {
try {
DoipClientImpl doipClient = new DoipClientImpl();
doipClient.connect("tcp://127.0.0.1:18002");
//86.5000.470/do.Yie0yPsjt4_bdw
List<DoipMessage> ret = new ArrayList<>();
DoipMessage msg = (new DoipMessageFactory.DoipMessageBuilder()).createRequest("86.5000.470/Counter", BasicOperations.Retrieve.getName())
.setBody("{\"operation\":\"count\",\"arg\":\"\"}".getBytes(StandardCharsets.UTF_8)).create();
doipClient.sendMessage(msg, new DoipMessageCallback() {
@Override
public void onResult(DoipMessage doipMessage) {
ret.add(doipMessage);
synchronized (ret) {
ret.notify();
}
}
});
synchronized (ret) {
ret.wait(5000);
}
DoipMessage result = null;
if (ret.size() > 0)
result = ret.get(0);
System.out.println(result.body.getDataAsJsonString());
// GlobalIrpClient.getGlobalClient().reRegister(doHandleRecord);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void call() {
}
}
//package org.bdware.server;
//
//import org.bdware.doip.core.doipMessage.DoipMessage;
//import org.bdware.doip.core.doipMessage.DoipMessageFactory;
//import org.bdware.doip.core.model.operations.BasicOperations;
//import org.bdware.doip.endpoint.doipClient.DoipClientImpl;
//import org.bdware.doip.endpoint.doipClient.DoipMessageCallback;
//import org.junit.Before;
//import org.junit.Test;
//
//import java.nio.charset.StandardCharsets;
//import java.util.ArrayList;
//import java.util.List;
//
//public class DOIPTest {
// public static String repoID = "86.5000.470/doip.localContractRepo";
//
//
// @Before
// public void init() {
// }
//
// @Test
// public void registryTempOD() {
//
// }
//
// @Test
// public void retrieve() {
// try {
// DoipClientImpl doipClient = new DoipClientImpl();
// doipClient.connect("tcp://127.0.0.1:18002");
// //86.5000.470/do.Yie0yPsjt4_bdw
// List<DoipMessage> ret = new ArrayList<>();
// DoipMessage msg = (new DoipMessageFactory.DoipMessageBuilder()).createRequest("86.5000.470/Counter", BasicOperations.Retrieve.getName())
// .setBody("{\"operation\":\"count\",\"arg\":\"\"}".getBytes(StandardCharsets.UTF_8)).create();
// doipClient.sendMessage(msg, new DoipMessageCallback() {
// @Override
// public void onResult(DoipMessage doipMessage) {
// ret.add(doipMessage);
// synchronized (ret) {
// ret.notify();
// }
// }
// });
// synchronized (ret) {
// ret.wait(5000);
// }
// DoipMessage result = null;
// if (ret.size() > 0)
// result = ret.get(0);
// System.out.println(result.body.getDataAsJsonString());
// // GlobalIrpClient.getGlobalClient().reRegister(doHandleRecord);
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
//
// @Test
// public void call() {
//
// }
//}