merge feature-dengshuang

step1. prune ContractExecutor
This commit is contained in:
CaiHQ 2021-11-18 19:45:52 +08:00
parent 801c1a93f5
commit 098c167a95
16 changed files with 151 additions and 166 deletions

View File

@ -0,0 +1,4 @@
# 相关类
ContractExecutor
sendRequest给其他节点MasterClientTCPAction."executeContractLocally")
receiveTrustfullyResult

View File

@ -1,4 +1,4 @@
#项说明 #项说明
在每个bundle 在每个bundle
git clone后 git clone后
都执行 都执行

View File

@ -34,9 +34,9 @@ import java.util.*;
public class CMActions implements OnHashCallback { public class CMActions implements OnHashCallback {
private static final String PARAM_ACTION = "action"; private static final String PARAM_ACTION = "action";
private static final JsonObject MISSING_ARGUMENT = JsonUtil.parseStringAsJsonObject("{\"action\":\"onExecuteResult\",\"executeTime\":-1," private static final JsonObject MISSING_ARGUMENT = JsonUtil.parseString("{\"action\":\"onExecuteResult\",\"executeTime\":-1,"
+ "\"status\":\"Error\",\"result\":\"missing arguments\"}"); + "\"status\":\"Error\",\"result\":\"missing arguments\"}");
private static final JsonObject INVALID_DOI = JsonUtil.parseStringAsJsonObject( private static final JsonObject INVALID_DOI = JsonUtil.parseString(
"{\"action\":\"onExecuteResult\",\"executeTime\":-1," "{\"action\":\"onExecuteResult\",\"executeTime\":-1,"
+ "\"status\":\"Error\",\"result\":\"invalid contract doi\"}"); + "\"status\":\"Error\",\"result\":\"invalid contract doi\"}");
private static final Logger LOGGER = LogManager.getLogger(CMActions.class); private static final Logger LOGGER = LogManager.getLogger(CMActions.class);
@ -117,17 +117,15 @@ public class CMActions implements OnHashCallback {
final JsonObject args, final JsonObject args,
final ResultCallback resultCallback, final ResultCallback resultCallback,
final OnHashCallback hashcb) { final OnHashCallback hashcb) {
final ContractRequest cReq = new ContractRequest(); final ContractRequest c = new ContractRequest();
if (!args.has("contractName") && if (!args.has("contractName") && !args.has("contractID") && !args.has("contractDOI")) {
!args.has("contractID") &&
!args.has("contractDOI")) {
resultCallback.onResult(MISSING_ARGUMENT); resultCallback.onResult(MISSING_ARGUMENT);
return; return;
} }
if (args.has("contractDOI") && !args.has("contractID")) { if (args.has("contractDOI") && !args.has("contractID")) {
LOGGER.debug("contractDOI: " + args.get("contractDOI").getAsString()); LOGGER.debug("contractDOI: " + args.get("contractDOI").getAsString());
try { try {
cReq.setContractDOI(args.get("contractDOI").getAsString()); c.setContractDOI(args.get("contractDOI").getAsString());
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
resultCallback.onResult(INVALID_DOI); resultCallback.onResult(INVALID_DOI);
@ -135,91 +133,86 @@ public class CMActions implements OnHashCallback {
} }
} else { } else {
if (args.has("contractName")) { if (args.has("contractName")) {
cReq.setContractID(args.get("contractName").getAsString()); c.setContractID(args.get("contractName").getAsString());
} }
if (args.has("contractID")) { if (args.has("contractID")) {
cReq.setContractID(args.get("contractID").getAsString()); c.setContractID(args.get("contractID").getAsString());
} }
} }
if (args.has("isDebug")) { if (args.has("isDebug")) c.setFromDebug(args.get("isDebug").getAsBoolean());
cReq.setFromDebug(args.get("isDebug").getAsBoolean());
}
if (args.has("withDynamicAnalysis")) if (args.has("withDynamicAnalysis"))
cReq.withDynamicAnalysis = args.get("withDynamicAnalysis").getAsBoolean(); c.withDynamicAnalysis = args.get("withDynamicAnalysis").getAsBoolean();
if (args.has("withEvaluatesAnalysis")) if (args.has("withEvaluatesAnalysis"))
cReq.withEvaluatesAnalysis = args.get("withEvaluatesAnalysis").getAsBoolean(); c.withEvaluatesAnalysis = args.get("withEvaluatesAnalysis").getAsBoolean();
if (!args.has("arg")) { if (!args.has("arg")) {
resultCallback.onResult(MISSING_ARGUMENT); resultCallback.onResult(MISSING_ARGUMENT);
return; return;
} }
if (args.has("operation")) { if (args.has("operation")) {
cReq.setAction(args.get("operation").getAsString()); c.setAction(args.get("operation").getAsString());
cReq.setArg(args.get("arg").getAsString()); c.setArg(args.get("arg").getAsString());
} else { } else {
JsonObject jo = JsonParser.parseString(args.get("arg").getAsString()).getAsJsonObject(); JsonObject jo = JsonParser.parseString(args.get("arg").getAsString()).getAsJsonObject();
if (!jo.has("action") || !jo.has("arg")) { if (!jo.has("action") || !jo.has("arg")) {
resultCallback.onResult(MISSING_ARGUMENT); resultCallback.onResult(MISSING_ARGUMENT);
return; return;
} }
cReq.setAction(jo.get("action").getAsString()); c.setAction(jo.get("action").getAsString());
cReq.setArg(jo.get("arg").getAsString()); c.setArg(jo.get("arg").getAsString());
if (cReq.withEvaluatesAnalysis) { if (c.withEvaluatesAnalysis) {
cReq.setValue(jo.get("hasValue").getAsLong()); c.setValue(jo.get("hasValue").getAsLong());
} }
} }
if (args.has("gasLimit")) { if (args.has("gasLimit")) c.setGasLimit(args.get("gasLimit").getAsLong());
cReq.setGasLimit(args.get("gasLimit").getAsLong());
}
if (args.has("requester")) { if (args.has("requester")) {
cReq.setPublicKey(args.get("requester").getAsString()); c.setPublicKey(args.get("requester").getAsString());
byte[] sign = ByteUtils.fromHexString(args.get("signature").getAsString()); byte[] sign = ByteUtils.fromHexString(args.get("signature").getAsString());
cReq.setSignature(ByteUtils.toHexString(sign)); c.setSignature(ByteUtils.toHexString(sign));
} }
if (args.has("requesterDOI")) { if (args.has("requesterDOI")) {
cReq.setRequesterDOI(args.get("requesterDOI").getAsString()); c.setRequesterDOI(args.get("requesterDOI").getAsString());
} else { } else {
cReq.setRequesterDOI("empty"); c.setRequesterDOI("empty");
} }
if (args.has("pubkey")) { if (args.has("pubkey")) {
cReq.setPublicKey(args.get("pubkey").getAsString()); c.setPublicKey(args.get("pubkey").getAsString());
byte[] sign = ByteUtils.fromHexString(args.get("signature").getAsString()); byte[] sign = ByteUtils.fromHexString(args.get("signature").getAsString());
cReq.setSignature(ByteUtils.toHexString(sign)); c.setSignature(ByteUtils.toHexString(sign));
} }
if (cReq.getPublicKey() != null) { if (c.getPublicKey() != null) {
if (!cReq.verifySignature()) { if (!c.verifySignature()) {
cReq.setPublicKey(null); c.setPublicKey(null);
cReq.setRequester(null); c.setRequester(null);
} }
} }
String reqID; String reqID;
if (args.has("requestID")) { if (args.has("requestID")) reqID = args.get("requestID").getAsString();
reqID = args.get("requestID").getAsString(); else {
} else {
reqID = System.currentTimeMillis() + "_" + (int) (Math.random() * 1000); reqID = System.currentTimeMillis() + "_" + (int) (Math.random() * 1000);
args.addProperty("requestID", reqID); args.addProperty("requestID", reqID);
} }
cReq.setRequestID(reqID); c.setRequestID(reqID);
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
manager.executeContractInternal( manager.executeContractInternal(
cReq, c,
new ResultCallback() { new ResultCallback() {
@Override @Override
public void onResult(JsonObject ret) { public void onResult(JsonObject ret) {
ret.addProperty("responseID", cReq.getRequestID()); ret.addProperty("responseID", c.getRequestID());
ret.addProperty("action", "onExecuteResult"); ret.addProperty("action", "onExecuteResult");
String costTime = (System.currentTimeMillis() - start) + ""; String costTime = (System.currentTimeMillis() - start) + "";
ret.addProperty("executeTime", costTime); ret.addProperty("executeTime", costTime);
ContractMeta meta = ContractMeta meta =
manager.statusRecorder.getContractMeta(cReq.getContractID()); manager.statusRecorder.getContractMeta(c.getContractID());
if (meta != null && meta.getIsDebug()) { if (meta != null && meta.getIsDebug()) {
FUNCINVOKEINFO.putOneInvoke( FUNCINVOKEINFO.putOneInvoke(
cReq.getContractID(), c.getContractID(),
cReq.getAction(), c.getAction(),
cReq.getRequestID(), c.getRequestID(),
cReq.getArg(), c.getArg(),
ret.has("result") ? ret.get("result").toString() : ""); ret.has("result") ? ret.get("result").toString() : "");
} }
System.out.println(ret);
resultCallback.onResult(ret.toString()); resultCallback.onResult(ret.toString());
} }
@ -247,7 +240,7 @@ public class CMActions implements OnHashCallback {
} }
// JsonObject jo = // JsonObject jo =
// JsonParser.parseStringAsJsonObject(args.get("arg").getAsString()).getAsJsonObject(); // JsonParser.parseString(args.get("arg").getAsString()).getAsJsonObject();
// if (!jo.has("action")) { // if (!jo.has("action")) {
// resultCallback.onResult(MISSING_ARGUMENT); // resultCallback.onResult(MISSING_ARGUMENT);
// return; // return;
@ -475,7 +468,7 @@ public class CMActions implements OnHashCallback {
String data = "failed"; String data = "failed";
String contractID = ""; String contractID = "";
String operation = ""; String operation = "";
//JsonElement mask = JsonParser.parseStringAsJsonObject(""); //JsonElement mask = JsonParser.parseString("");
if (args.has("contractID") && args.has("operation") && args.has("arg")) { if (args.has("contractID") && args.has("operation") && args.has("arg")) {
contractID = args.get("contractID").getAsString(); contractID = args.get("contractID").getAsString();
System.out.println(contractID); System.out.println(contractID);
@ -1165,17 +1158,17 @@ public class CMActions implements OnHashCallback {
public void connectTo(JsonObject args, ResultCallback resultCallback) { public void connectTo(JsonObject args, ResultCallback resultCallback) {
String data; String data;
if (!args.has("id")) { if (!args.has("id")) {
ReplyUtil.simpleReply(resultCallback, "onConnectTo", "missing contract id"); ReplyUtil.simpleReply(resultCallback,"onConnectTo","missing contract id");
return; return;
} }
String contractID = args.get("id").getAsString(); String contractID = args.get("id").getAsString();
LOGGER.info("connectTo:" + contractID); LOGGER.info("connectTo:" + contractID);
if (contractID == null) { if (contractID == null) {
ReplyUtil.simpleReply(resultCallback, "onConnectTo", "can't find contract id"); ReplyUtil.simpleReply(resultCallback,"onConnectTo","can't find contract id");
return; return;
} }
manager.redirect(contractID, createPS(), ""); manager.redirect(contractID, createPS(), "");
ReplyUtil.simpleReply(resultCallback, "onConnectTo", "success"); ReplyUtil.simpleReply(resultCallback,"onConnectTo","success");
} }
private PrintStream createPS() { private PrintStream createPS() {
@ -1250,7 +1243,7 @@ public class CMActions implements OnHashCallback {
ExecutionManager.instance.updateLocalContractToNodeCenter(); ExecutionManager.instance.updateLocalContractToNodeCenter();
} }
} else { } else {
ReplyUtil.simpleReply(resultCallback, "onKillContractProcess", "Failed: Illegal parameters"); ReplyUtil.simpleReply(resultCallback,"onKillContractProcess","Failed: Illegal parameters");
} }
} }
@ -1510,14 +1503,14 @@ public class CMActions implements OnHashCallback {
e.printStackTrace(); e.printStackTrace();
} }
ExecutionManager.instance.updateLocalContractToNodeCenter(); ExecutionManager.instance.updateLocalContractToNodeCenter();
ReplyUtil.simpleReply(resultCallback, "onKillAllContract", sb.toString()); ReplyUtil.simpleReply(resultCallback,"onKillAllContract",sb.toString());
manager.stopAllContracts(); manager.stopAllContracts();
} else { } else {
manager.stopAllContractsWithOwner(args.get(("verifiedPubKey")).getAsString()); manager.stopAllContractsWithOwner(args.get(("verifiedPubKey")).getAsString());
ReplyUtil.simpleReply(resultCallback, "onKillAllContract", "Success"); ReplyUtil.simpleReply(resultCallback,"onKillAllContract","Success");
} }
} else { } else {
ReplyUtil.simpleReply(resultCallback, "onKillAllContract", "Failed: Illegal user"); ReplyUtil.simpleReply(resultCallback,"onKillAllContract","Failed: Illegal user");
} }
} }
@ -1852,7 +1845,7 @@ public class CMActions implements OnHashCallback {
LOGGER.debug("startContractConfig"); LOGGER.debug("startContractConfig");
// TODO private contract // TODO private contract
// if (args.has("isPrivate") && args.get("isPrivate").getAsBoolean()) { // if (args.has("isPrivate") && args.get("isPrivate").getAsBoolean()) {
// args.add("pubkey", JsonParser.parseStringAsJsonObject(handler.getPubKey())); // args.add("pubkey", JsonParser.parseString(handler.getPubKey()));
// } // }
boolean type = args.get("type").getAsString().equalsIgnoreCase("TCP"); boolean type = args.get("type").getAsString().equalsIgnoreCase("TCP");
String contract = args.get("contractName").getAsString(); String contract = args.get("contractName").getAsString();

View File

@ -17,8 +17,15 @@ import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.util.JsonUtil; import org.bdware.sc.util.JsonUtil;
import org.bdware.server.GlobalConf; import org.bdware.server.GlobalConf;
import org.bdware.server.action.*; import org.bdware.server.action.*;
import org.bdware.server.executor.RequestAllExecutor;
import org.bdware.server.executor.unconsistency.MultiPointCooperationExecutor;
import org.bdware.server.executor.unconsistency.RequestOnceExecutor;
import org.bdware.server.executor.unconsistency.ResponseOnceExecutor;
import org.bdware.server.tcp.TCPClientFrameHandler; import org.bdware.server.tcp.TCPClientFrameHandler;
import org.bdware.server.trustedmodel.*; import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.KillUnitContractInfo;
import org.bdware.server.trustedmodel.MasterProxy;
import org.bdware.server.trustedmodel.SingleNodeExecutor;
import org.bdware.units.NetworkManager; import org.bdware.units.NetworkManager;
import org.bdware.units.function.ExecutionManager; import org.bdware.units.function.ExecutionManager;
import org.zz.gmhelper.SM2KeyPair; import org.zz.gmhelper.SM2KeyPair;
@ -132,7 +139,7 @@ public class MasterClientTCPAction {
switch (contract.getType()) { switch (contract.getType()) {
case Sole: case Sole:
LOGGER.info("Sole contract is not supported in multi-point mode"); LOGGER.info("Sole contract is not supported in multi-point mode");
return null; return SingleNodeExecutor.instance;
case RequestOnce: case RequestOnce:
executor = new RequestOnceExecutor(contractID); executor = new RequestOnceExecutor(contractID);
break; break;
@ -383,6 +390,7 @@ public class MasterClientTCPAction {
contractID2MasterInfo.put(contractID, this); // 记录contractID master之间的对应关系 contractID2MasterInfo.put(contractID, this); // 记录contractID master之间的对应关系
MultiContractMeta cei = MultiContractMeta cei =
CMActions.manager.multiContractRecorder.createIfNotExist(contractID); CMActions.manager.multiContractRecorder.createIfNotExist(contractID);
cei.setLastExeSeq(-1); cei.setLastExeSeq(-1);
if (!contract.getScriptStr().startsWith("/")) { if (!contract.getScriptStr().startsWith("/")) {
contract.setScript(dumpToDisk(contract, jo)); contract.setScript(dumpToDisk(contract, jo));
@ -433,11 +441,14 @@ public class MasterClientTCPAction {
} }
LOGGER.info("启动参数: " + JsonUtil.toJson(contract)); LOGGER.info("启动参数: " + JsonUtil.toJson(contract));
cei.setContractExecutor(createContractExecutor(contract, contractID)); // 分配不同的Executor
String ret = CMActions.manager.startContract(contract); // 调用CMActions 里的启动合约的方法,启动结果 String ret = CMActions.manager.startContract(contract); // 调用CMActions 里的启动合约的方法,启动结果
LOGGER.info("启动结果为 " + ret); LOGGER.info("启动结果为 " + ret);
CMActions.manager.multiContractRecorder.updateValue(cei); CMActions.manager.multiContractRecorder.updateValue(cei);
ContractMeta meta = CMActions.manager.statusRecorder.createIfNotExist(contractID);
meta.setContractExecutor(createContractExecutor(contract, contractID)); // 分配不同的Executor
// TODO 合约终止后从数据库中移除但是为了测试可以人为制造合约终止但不从数据库中移除异常停止 // TODO 合约终止后从数据库中移除但是为了测试可以人为制造合约终止但不从数据库中移除异常停止
KeyValueDBUtil.instance.setValue(CMTables.UnitContracts.toString(), contractID, "exist"); KeyValueDBUtil.instance.setValue(CMTables.UnitContracts.toString(), contractID, "exist");
@ -536,15 +547,12 @@ public class MasterClientTCPAction {
result.onResult(JsonUtil.toJson(ret)); result.onResult(JsonUtil.toJson(ret));
} else { // reqeust all response all need seq } else { // reqeust all response all need seq
String contractID = request.getContractID(); String contractID = request.getContractID();
MultiContractMeta cei = MultiContractMeta cei =
CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
boolean putToQueue = false; boolean putToQueue = false;
// 对于多点合约的请求 // 对于多点合约的请求
if (request.getRequestID().endsWith("_mul")) { if (request.getRequestID().endsWith("_mul")) {
// 收到的多点请求不能同时处理 // 收到的多点请求不能同时处理
boolean isFirst = false; boolean isFirst = false;
synchronized (MultiRequestInfo.lock) { // 加锁防止同时放入多个同requestID的请求到cei的队列中 synchronized (MultiRequestInfo.lock) { // 加锁防止同时放入多个同requestID的请求到cei的队列中
if (MultiRequestInfo.reqInfos.containsKey(request.getRequestID())) { if (MultiRequestInfo.reqInfos.containsKey(request.getRequestID())) {

View File

@ -18,7 +18,7 @@ import org.bdware.server.action.Action;
import org.bdware.server.action.CMActions; import org.bdware.server.action.CMActions;
import org.bdware.server.tcp.TCPServerFrameHandler; import org.bdware.server.tcp.TCPServerFrameHandler;
import org.bdware.server.trustedmodel.ContractUnitStatus; import org.bdware.server.trustedmodel.ContractUnitStatus;
import org.bdware.server.trustedmodel.RequestAllExecutor; import org.bdware.server.executor.RequestAllExecutor;
import org.bdware.server.trustedmodel.SlaveNode; import org.bdware.server.trustedmodel.SlaveNode;
import org.bdware.units.NetworkManager; import org.bdware.units.NetworkManager;
import org.zz.gmhelper.SM2KeyPair; import org.zz.gmhelper.SM2KeyPair;
@ -317,14 +317,14 @@ public class MasterServerRecoverMechAction {
} }
ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID); ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID);
cei.setContractExecutor( meta.setContractExecutor(
MasterClientTCPAction.createContractExecutor(meta.contract, contractID)); MasterClientTCPAction.createContractExecutor(meta.contract, contractID));
switch (meta.contract.getType()) { switch (meta.contract.getType()) {
case RequestAllResponseFirst: case RequestAllResponseFirst:
case RequestAllResponseHalf: case RequestAllResponseHalf:
case RequestAllResponseAll: case RequestAllResponseAll:
case Sharding: case Sharding:
((RequestAllExecutor) cei.contractExecutor).setSeq(cei.getLastExeSeq() + 1); ((RequestAllExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1);
break; break;
default: default:
break; break;

View File

@ -17,16 +17,14 @@ import org.bdware.sc.units.RecoverFlag;
import org.bdware.sc.units.RequestCache; import org.bdware.sc.units.RequestCache;
import org.bdware.sc.util.JsonUtil; import org.bdware.sc.util.JsonUtil;
import org.bdware.server.CongestionControl; import org.bdware.server.CongestionControl;
import org.bdware.server.GlobalConf;
import org.bdware.server.action.Action; import org.bdware.server.action.Action;
import org.bdware.server.action.CMActions; import org.bdware.server.action.CMActions;
import org.bdware.server.action.SyncResult; import org.bdware.server.action.SyncResult;
import org.bdware.server.executor.RequestAllExecutor;
import org.bdware.server.tcp.TCPServerFrameHandler; import org.bdware.server.tcp.TCPServerFrameHandler;
import org.bdware.server.trustedmodel.KillUnitContractResultCollector; import org.bdware.server.trustedmodel.KillUnitContractResultCollector;
import org.bdware.server.trustedmodel.RequestAllExecutor;
import org.bdware.server.trustedmodel.ResultCollector; import org.bdware.server.trustedmodel.ResultCollector;
import org.bdware.server.trustedmodel.SlaveNode; import org.bdware.server.trustedmodel.SlaveNode;
import org.zz.gmhelper.SM2KeyPair;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.*; import java.util.*;
@ -253,22 +251,23 @@ public class MasterServerTCPAction {
MasterServerTCPAction.sync.wakeUp(responseID, jo.toString()); MasterServerTCPAction.sync.wakeUp(responseID, jo.toString());
} }
@Action(async = true) // TODO to Remove
// 假设该结点有运行这个合约调用的合约 // @Action(async = true)
public void executeContractLocallyServer(JsonObject jo, ResultCallback cb) { // // 假设该结点有运行这个合约调用的合约
final ContractRequest request = // public void executeContractLocallyServer(JsonObject jo, ResultCallback cb) {
JsonUtil.fromJson(jo.get("data").toString(), ContractRequest.class); // final ContractRequest request =
long start = System.currentTimeMillis(); // JsonUtil.fromJson(jo.get("data").toString(), ContractRequest.class);
String data2 = CMActions.manager.executeLocally(request, null); // long start = System.currentTimeMillis();
Map<String, String> ret = new HashMap<>(); // String data2 = CMActions.manager.executeLocally(request, null);
ret.put("action", "receiveTrustfullyResultServer"); // Map<String, String> ret = new HashMap<>();
SM2KeyPair keyPair = GlobalConf.instance.keyPair; // ret.put("action", "receiveTrustfullyResultServer");
ret.put("nodeID", keyPair.getPublicKeyStr()); // SM2KeyPair keyPair = GlobalConf.instance.keyPair;
ret.put("responseID", jo.get("uniReqID").getAsString()); // ret.put("nodeID", keyPair.getPublicKeyStr());
ret.put("executeTime", (System.currentTimeMillis() - start) + ""); // ret.put("responseID", jo.get("uniReqID").getAsString());
ret.put("data", data2); // ret.put("executeTime", (System.currentTimeMillis() - start) + "");
cb.onResult(JsonUtil.toJson(ret)); // ret.put("data", data2);
} // cb.onResult(JsonUtil.toJson(ret));
// }
@Action(async = true) @Action(async = true)
public void setNodeInfo(JsonObject jo, ResultCallback cb) { public void setNodeInfo(JsonObject jo, ResultCallback cb) {
@ -334,8 +333,9 @@ public class MasterServerTCPAction {
// 这个是个多节点的合约 // 这个是个多节点的合约
// Just forward it to the correct Node // Just forward it to the correct Node
// Master节点直接发3个聚合后返回结果 // Master节点直接发3个聚合后返回结果
info.contractExecutor.execute( contractMeta.contractExecutor.execute(
requestID, requestID,
JsonUtil.fromJson(cr, ContractRequest.class),
new ResultCallback() { new ResultCallback() {
@Override @Override
public void onResult(String str) { public void onResult(String str) {
@ -346,8 +346,7 @@ public class MasterServerTCPAction {
cb.onResult(JsonUtil.toJson(result)); cb.onResult(JsonUtil.toJson(result));
CongestionControl.masterServerLoad.decrementAndGet(); CongestionControl.masterServerLoad.decrementAndGet();
} }
}, }, null);
JsonUtil.fromJson(cr, ContractRequest.class));
} else { } else {
ContractRequest contractRequest = JsonUtil.fromJson(cr, ContractRequest.class); ContractRequest contractRequest = JsonUtil.fromJson(cr, ContractRequest.class);
if (contractMeta.getStatus() == HANGED) { if (contractMeta.getStatus() == HANGED) {
@ -359,20 +358,19 @@ public class MasterServerTCPAction {
// 这个是个单节点的合约 // 这个是个单节点的合约
// executeContract(CacheTest应该要有多个进来 // executeContract(CacheTest应该要有多个进来
if (null != client && !client.getContractType().needSeq()) { if (null != client && !client.getContractType().needSeq()) {
CMActions.manager.executeLocallyAsync( contractMeta.contractExecutor.execute(requestID, contractRequest, new ResultCallback() {
contractRequest, @Override
new ResultCallback() { public void onResult(String str) {
@Override Map<String, String> result = new HashMap<>();
public void onResult(String str) { result.put("action", "receiveContractExecution");
Map<String, String> result = new HashMap<>(); result.put("responseID", cr.get("requestID").getAsString());
result.put("action", "receiveContractExecution"); result.put("data", str);
result.put("responseID", cr.get("requestID").getAsString()); cb.onResult(JsonUtil.toJson(result));
result.put("data", str); LOGGER.debug("Return requestContractExecution:"+JsonUtil.toJson(result));
cb.onResult(JsonUtil.toJson(result));
CongestionControl.masterServerLoad.decrementAndGet(); CongestionControl.masterServerLoad.decrementAndGet();
} }
}, }, null);
null);
} else { } else {
LOGGER.debug("send ReRoute response:" + cr.toString()); LOGGER.debug("send ReRoute response:" + cr.toString());
JsonObject result = new JsonObject(); JsonObject result = new JsonObject();

View File

@ -1,4 +1,4 @@
package org.bdware.server.trustedmodel; package org.bdware.server.executor;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import com.google.gson.JsonParser; import com.google.gson.JsonParser;
@ -9,7 +9,9 @@ import org.bdware.sc.ComponedContractResult;
import org.bdware.sc.ContractResult; import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractExecType; import org.bdware.sc.bean.ContractExecType;
import org.bdware.sc.bean.ContractRequest; import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.RecoverFlag; import org.bdware.sc.units.RecoverFlag;
import org.bdware.sc.units.RequestCache; import org.bdware.sc.units.RequestCache;
import org.bdware.sc.units.ResultCache; import org.bdware.sc.units.ResultCache;
@ -17,6 +19,10 @@ import org.bdware.sc.util.JsonUtil;
import org.bdware.server.action.CMActions; import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterServerRecoverMechAction; import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
import org.bdware.server.action.p2p.MasterServerTCPAction; import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.MultiReqSeq;
import org.bdware.server.trustedmodel.ResultCollector;
import org.bdware.server.trustedmodel.SlaveNode;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -131,9 +137,13 @@ public class RequestAllExecutor implements ContractExecutor {
} }
@Override @Override
public void execute(String requestID, ResultCallback rc, ContractRequest req) { public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
LOGGER.debug(JsonUtil.toJson(req)); LOGGER.debug(JsonUtil.toJson(req));
MultiContractMeta meta = CMActions.manager.multiContractRecorder.getMultiContractMeta(req.getContractID());
if (meta == null || !meta.isMaster()) {
CMActions.manager.executeContractOnOtherNodes(req, rc);
return;
}
req.setContractID(CMActions.manager.getContractIDByName(req.getContractID())); req.setContractID(CMActions.manager.getContractIDByName(req.getContractID()));
// 三个相同requestID进来的时候会有冲突 // 三个相同requestID进来的时候会有冲突
@ -141,6 +151,9 @@ public class RequestAllExecutor implements ContractExecutor {
// 这里是从MasterServer->MasterClient请求的是"executeContractLocally" // 这里是从MasterServer->MasterClient请求的是"executeContractLocally"
// 如果是多点合约的请求A1A2A3的序号应该一致不能分配一个新的seq根据requestID判断是否不需要重新分配一个序号 // 如果是多点合约的请求A1A2A3的序号应该一致不能分配一个新的seq根据requestID判断是否不需要重新分配一个序号
//TODO seqMap memory leak
//TODO
//TODO
if (null != requestID && requestID.endsWith("_mul")) { if (null != requestID && requestID.endsWith("_mul")) {
synchronized (lock) { synchronized (lock) {
if (seqMap.containsKey(requestID)) { if (seqMap.containsKey(requestID)) {

View File

@ -1,4 +1,4 @@
package org.bdware.server.trustedmodel; package org.bdware.server.executor.unconsistency;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import com.google.gson.JsonParser; import com.google.gson.JsonParser;
@ -12,6 +12,7 @@ import org.bdware.sc.bean.ContractExecType;
import org.bdware.sc.bean.ContractRequest; import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.bean.FunctionDesp; import org.bdware.sc.bean.FunctionDesp;
import org.bdware.sc.bean.RouteInfo; import org.bdware.sc.bean.RouteInfo;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta; import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.RecoverFlag; import org.bdware.sc.units.RecoverFlag;
@ -21,6 +22,10 @@ import org.bdware.sc.util.JsonUtil;
import org.bdware.server.action.CMActions; import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterServerRecoverMechAction; import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
import org.bdware.server.action.p2p.MasterServerTCPAction; import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.MultiReqSeq;
import org.bdware.server.trustedmodel.ResultCollector;
import org.bdware.server.trustedmodel.SlaveNode;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.HashMap; import java.util.HashMap;
@ -195,7 +200,7 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
} }
@Override @Override
public void execute(String requestID, ResultCallback rc, ContractRequest req) { public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
LOGGER.info("[MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req)); LOGGER.info("[MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req));
// 获得action 函数名 // 获得action 函数名
LOGGER.info("action is : " + req.getAction()); LOGGER.info("action is : " + req.getAction());

View File

@ -1,14 +1,17 @@
package org.bdware.server.trustedmodel; package org.bdware.server.executor.unconsistency;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import com.google.gson.JsonParser; import com.google.gson.JsonParser;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.bdware.sc.bean.ContractRequest; import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.util.JsonUtil; import org.bdware.sc.util.JsonUtil;
import org.bdware.server.action.CMActions; import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterServerTCPAction; import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.SlaveNode;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -22,10 +25,8 @@ public class RequestOnceExecutor implements ContractExecutor {
public RequestOnceExecutor(String contractID) { public RequestOnceExecutor(String contractID) {
this.contractID = contractID; this.contractID = contractID;
} }
@Override @Override
public void execute(String requestID, ResultCallback rc, ContractRequest req) { public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
// String contractID = req.getContractID();
ResultCallback cb = ResultCallback cb =
new ResultCallback() { new ResultCallback() {
@Override @Override

View File

@ -1,4 +1,4 @@
package org.bdware.server.trustedmodel; package org.bdware.server.executor.unconsistency;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import com.google.gson.JsonParser; import com.google.gson.JsonParser;
@ -6,10 +6,13 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.bdware.sc.ContractResult; import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractRequest; import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.util.JsonUtil; import org.bdware.sc.util.JsonUtil;
import org.bdware.server.action.CMActions; import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterServerTCPAction; import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.SlaveNode;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -25,7 +28,7 @@ public class ResponseOnceExecutor implements ContractExecutor {
} }
@Override @Override
public void execute(String requestID, ResultCallback rc, ContractRequest req) { public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
executeInternal(requestID, rc, req, 2); executeInternal(requestID, rc, req, 2);
} }

View File

@ -1,4 +1,4 @@
package org.bdware.server.trustedmodel; package org.bdware.server.executor.unconsistency;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import com.google.gson.JsonParser; import com.google.gson.JsonParser;
@ -9,28 +9,31 @@ import org.bdware.sc.ContractMeta;
import org.bdware.sc.bean.ContractRequest; import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.bean.FunctionDesp; import org.bdware.sc.bean.FunctionDesp;
import org.bdware.sc.bean.RouteInfo; import org.bdware.sc.bean.RouteInfo;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta; import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.util.JsonUtil; import org.bdware.sc.util.JsonUtil;
import org.bdware.server.action.CMActions; import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterServerTCPAction; import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.SlaveNode;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
public class RouteEnabledExecutor implements ContractExecutor { public class _UNUSED_RouteEnabledExecutor implements ContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(RouteEnabledExecutor.class); private static final Logger LOGGER = LogManager.getLogger(_UNUSED_RouteEnabledExecutor.class);
private final String contractID; private final String contractID;
AtomicInteger order = new AtomicInteger(0); AtomicInteger order = new AtomicInteger(0);
public RouteEnabledExecutor(String contractID) { public _UNUSED_RouteEnabledExecutor(String contractID) {
this.contractID = contractID; this.contractID = contractID;
} }
@Override @Override
public void execute(String requestID, ResultCallback rc, ContractRequest req) { public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hashCallback) {
executeInternal(requestID, rc, req, 2); executeInternal(requestID, rc, req, 2);
} }

View File

@ -112,7 +112,7 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler<Object> {
ByteBuf bb = (ByteBuf) frame; ByteBuf bb = (ByteBuf) frame;
JsonObject arg; JsonObject arg;
try { try {
arg = JsonUtil.parseReaderAsJsonObject(new InputStreamReader(new ByteBufInputStream(bb))); arg = JsonUtil.parseObject(new InputStreamReader(new ByteBufInputStream(bb)));
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
Response response = new Response(); Response response = new Response();
@ -183,7 +183,7 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler<Object> {
JsonObject jo = JsonUtil.parseStringAsJsonObject(json); JsonObject jo = JsonUtil.parseStringAsJsonObject(json);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
LOGGER.warn("JsonParse Error: " + e.getMessage() + "\n\t" + json); System.out.println("============[MasterClientFrameHandler]JsonParse Error:" + json);
} }
ByteBuf buf = Unpooled.wrappedBuffer(json.getBytes()); ByteBuf buf = Unpooled.wrappedBuffer(json.getBytes());
ctx.channel().writeAndFlush(buf); ctx.channel().writeAndFlush(buf);

View File

@ -57,42 +57,6 @@ public class MasterProxy implements MasterStub {
} }
} }
@Override
public void executeByMaster(
ContractClient client, ResultCallback rcb, ContractRequest request) {
assert client.isUnit();
// ********** hyy ********** //
// 修改这个地方的执行逻辑,判断路由
LOGGER.debug(client.getContractType());//sharding
// ********** hyy ********** //
// assert client.isMaster();
LOGGER.debug("test location");
// TODO 这儿有问题
// 进来了三个相同的ID的东西
MultiContractMeta contractInfo =
CMActions.manager.multiContractRecorder.getMultiContractMeta(client.getContractID());
long start = System.currentTimeMillis();
contractInfo.contractExecutor.execute(
request.getRequestID(),
new ResultCallback() {
@Override
public void onResult(String ret) {
JsonObject result = JsonUtil.parseStringAsJsonObject(ret);
ContractManager.instance.extractEventsFromContractResult(
null, result, client, request, start);
LOGGER.debug(
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(new Date(System.currentTimeMillis()))
+ " [MasterProxy] executeByMaster 结果是 "
+ ret
+ "\n");
rcb.onResult(result);
}
},
request);
}
@Override @Override
public void executeByOtherNodeAsync(String pubKey, ContractRequest c, ResultCallback cb) { public void executeByOtherNodeAsync(String pubKey, ContractRequest c, ResultCallback cb) {
MasterConnector handler = CONNECTORS.get(pubKey); MasterConnector handler = CONNECTORS.get(pubKey);

View File

@ -185,6 +185,9 @@ public class NetworkManager {
public NodeCenterClientHandler getNodeCenterClientHandler() { public NodeCenterClientHandler getNodeCenterClientHandler() {
return nodeCenterClientHandler; return nodeCenterClientHandler;
} }
public void sendToNodeCenter(String msg) {
nodeCenterClientHandler.sendMsg(msg);
}
/** /**
* send to all kinds including special receivers * send to all kinds including special receivers
@ -204,9 +207,6 @@ public class NetworkManager {
send(unitMessage); send(unitMessage);
} }
public void sendToNodeCenter(String msg) {
nodeCenterClientHandler.sendMsg(msg);
}
/** /**
* send to TCP nodes, if fail send by p2p * send to TCP nodes, if fail send by p2p

View File

@ -1,7 +0,0 @@
### 设置###
log4j.rootLogger=info,stdout
### 输出信息到控制台 ###
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%-5p] %d{HH:mm:ss.SSS} %m (%F:%L)[%M]%n

View File

@ -10,6 +10,6 @@ appender.rolling.append=true
appender.rolling.fileName=./log/cm.log appender.rolling.fileName=./log/cm.log
appender.rolling.layout.type=PatternLayout appender.rolling.layout.type=PatternLayout
appender.rolling.layout.pattern=%d-%m%n appender.rolling.layout.pattern=%d-%m%n
rootLogger.level=info rootLogger.level=debug
rootLogger.appenderRef.stdout.ref=STDOUT rootLogger.appenderRef.stdout.ref=STDOUT
rootLogger.appenderRef.log.ref=log rootLogger.appenderRef.log.ref=log