This commit is contained in:
Frank.R.Wu 2021-11-21 23:48:32 +08:00
parent 9fb5cd6eb4
commit 691045326e
4 changed files with 222 additions and 219 deletions

View File

@ -43,11 +43,10 @@ public class CMActions implements OnHashCallback {
"{\"action\":\"onExecuteResult\",\"executeTime\":-1,"
+ "\"status\":\"Error\",\"result\":\"invalid contract doi\"}");
private static final Logger LOGGER = LogManager.getLogger(CMActions.class);
public static ContractManager manager = initManager();
public static FuncInvokeInfo FUNCINVOKEINFO = new FuncInvokeInfo(); // 合约调用时参数和结果
public static ContractManager manager = initManager();
private static SecureRandom RANDOM;
public ContractManagerFrameHandler handler;
public CMActions() {
handler = null;
}
@ -392,6 +391,28 @@ public class CMActions implements OnHashCallback {
return str;
}
public static void killContractByMaster(
String contractID, JsonObject request, ResultCallback rc) {
LOGGER.info("[MasterClientTCPAction] killContract : ");
try {
MasterClientTCPAction.killUnitContractMap.put(
request.get("requestID").getAsString(),
new KillUnitContractInfo(rc, System.currentTimeMillis()));
MultiContractMeta mcm = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
NetworkManager.instance.sendToAgent(mcm.getMasterNode(), JsonUtil.toJson(request));
} catch (Exception e) {
e.printStackTrace();
} finally {
CMActions.manager.stopContractWithOwner(
request.get("verifiedPubKey").getAsString(), contractID);
ContractMeta meta = manager.statusRecorder.getContractMeta(contractID);
if (null != meta && meta.contractExecutor != null) {
//TODO why close?
// meta.contractExecutor.close();
}
}
}
@Action(async = true)
public void ping(JsonObject args, ResultCallback resultCallback) {
ReplyUtil.simpleReply(resultCallback, "pong", null);
@ -1253,28 +1274,6 @@ public class CMActions implements OnHashCallback {
}
}
public static void killContractByMaster(
String contractID, JsonObject request, ResultCallback rc) {
LOGGER.info("[MasterClientTCPAction] killContract : ");
try {
MasterClientTCPAction.killUnitContractMap.put(
request.get("requestID").getAsString(),
new KillUnitContractInfo(rc, System.currentTimeMillis()));
MultiContractMeta mcm = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
NetworkManager.instance.sendToAgent(mcm.getMasterNode(), JsonUtil.toJson(request));
} catch (Exception e) {
e.printStackTrace();
} finally {
CMActions.manager.stopContractWithOwner(
request.get("verifiedPubKey").getAsString(), contractID);
ContractMeta meta = manager.statusRecorder.getContractMeta(contractID);
if (null != meta && meta.contractExecutor != null) {
//TODO why close?
// meta.contractExecutor.close();
}
}
}
@Action(userPermission = 1L << 26, async = true)
public void queryContractInstanceDOI(JsonObject args, ResultCallback resultCallback) {
long s = System.currentTimeMillis();
@ -1295,6 +1294,58 @@ public class CMActions implements OnHashCallback {
ExecutionManager.instance.updateLocalContractToNodeCenter();
}
@Action(userPermission = 1 << 16, async = true)
public void staticVerifyContract(JsonObject args, ResultCallback resultCallback) {
Contract c = new Contract();
c.setID(args.get("contractid").getAsString());
c.setType(ContractExecType.Sole);
c.setScript(args.get("script").getAsString());
Map<String, Object> r = new HashMap<>();
r.put("action", "onStaticVerifyResult");
long start = System.currentTimeMillis();
String path = args.get("path").getAsString();
if (path.startsWith("/")) {
try {
String[] pp = path.split("/");
StringBuilder parentPath = new StringBuilder();
for (String s : pp) {
parentPath.append(s).append("/");
}
String parPath = GlobalConf.instance.publicDir;
if (args.has("isPrivate") && args.get("isPrivate").getAsBoolean()) {
parPath = GlobalConf.instance.privateDir + "/" + handler.getPubKey();
}
byte[] bb =
YJSPacker.pack(new File(parPath, parentPath.toString()).getAbsolutePath());
File temp = File.createTempFile(pp[pp.length - 1], ".zip");
FileOutputStream fout = new FileOutputStream(temp);
assert bb != null;
fout.write(bb);
fout.close();
LOGGER.debug("StartContract, zipPath:" + temp.getAbsolutePath());
// TODO script should encoded!!
c.setScript(temp.getAbsolutePath());
LOGGER.debug(temp.getAbsolutePath());
} catch (Exception e) {
e.printStackTrace();
}
}
LOGGER.debug("verifyContract: " + JsonUtil.toJson(c));
r.put("data", staticVerify(c));
r.put("cid", c.getID());
r.put("executeTime", System.currentTimeMillis() - start);
// GRPCPool.writeToChain(c.getOwner(), privKey, gson.toJson(r),
// json.getString("requestID"));
resultCallback.onResult(r);
}
public String staticVerify(Contract c) {
return manager.staticVerify(c);
}
// @Action(userPermission = 1L << 26, async = true)
// public void queryContractInstanceInfoByDOI(JsonObject args, ResultCallback resultCallback)
// {
@ -1386,58 +1437,6 @@ public class CMActions implements OnHashCallback {
* resultCallback.onResult(gson.toJson(r)); }
*/
@Action(userPermission = 1 << 16, async = true)
public void staticVerifyContract(JsonObject args, ResultCallback resultCallback) {
Contract c = new Contract();
c.setID(args.get("contractid").getAsString());
c.setType(ContractExecType.Sole);
c.setScript(args.get("script").getAsString());
Map<String, Object> r = new HashMap<>();
r.put("action", "onStaticVerifyResult");
long start = System.currentTimeMillis();
String path = args.get("path").getAsString();
if (path.startsWith("/")) {
try {
String[] pp = path.split("/");
StringBuilder parentPath = new StringBuilder();
for (String s : pp) {
parentPath.append(s).append("/");
}
String parPath = GlobalConf.instance.publicDir;
if (args.has("isPrivate") && args.get("isPrivate").getAsBoolean()) {
parPath = GlobalConf.instance.privateDir + "/" + handler.getPubKey();
}
byte[] bb =
YJSPacker.pack(new File(parPath, parentPath.toString()).getAbsolutePath());
File temp = File.createTempFile(pp[pp.length - 1], ".zip");
FileOutputStream fout = new FileOutputStream(temp);
assert bb != null;
fout.write(bb);
fout.close();
LOGGER.debug("StartContract, zipPath:" + temp.getAbsolutePath());
// TODO script should encoded!!
c.setScript(temp.getAbsolutePath());
LOGGER.debug(temp.getAbsolutePath());
} catch (Exception e) {
e.printStackTrace();
}
}
LOGGER.debug("verifyContract: " + JsonUtil.toJson(c));
r.put("data", staticVerify(c));
r.put("cid", c.getID());
r.put("executeTime", System.currentTimeMillis() - start);
// GRPCPool.writeToChain(c.getOwner(), privKey, gson.toJson(r),
// json.getString("requestID"));
resultCallback.onResult(r);
}
public String staticVerify(Contract c) {
return manager.staticVerify(c);
}
@Action(userPermission = 1L << 16, async = true)
public void getControlFlowByFileName(JsonObject args, ResultCallback resultCallback) {
String project = args.get("projectName").getAsString();
@ -1933,4 +1932,8 @@ public class CMActions implements OnHashCallback {
}
}
}
}

View File

@ -44,7 +44,6 @@ public class MasterClientRecoverMechAction {
private final Map<String, OutputStream> transFileMap = new HashMap<>();
public MasterClientRecoverMechAction() {
}
// 告知master自己需要恢复携带之前的运行模式如果是StableMode需要携带lastExeSeq信息

View File

@ -129,6 +129,142 @@ public class MasterClientTCPAction {
return executor;
}
public static void dealRequests(String contractID) {
LOGGER.info("dealRequests");
// If still in recovering state,don't dealRequests now
if (MasterClientRecoverMechAction.recoverSet.contains(contractID)) {
LOGGER.info("本地还没恢复完成,不执行请求!");
return;
}
MultiContractMeta cei;
synchronized (cei = CMActions.manager.multiContractRecorder.createIfNotExist(contractID)) {
while (!cei.queue.isEmpty()) {
ContractRequest request = cei.queue.peek();
// logger.info("此时队列为 ");
// cei.printSet();
if (cei.isSequent(request.seq)) {
// request.setSeq(jo.get("seq").getAsString());
// request.needSeq = jo.get("needSeq").getAsBoolean();
long start = System.currentTimeMillis();
cei.curExeSeq = request.seq;
LOGGER.info("调试位置 1 cei.curExeSeq=" + cei.curExeSeq);
boolean isMultiReq = request.getRequestID().endsWith("_mul");
if (isMultiReq) {
// 开始执行多点合约请求需要将缓存的其他节点发来的同requestID的请求也触发调用
LOGGER.info("开始执行多点合约请求需要将缓存的其他节点发来的同requestID的请求也触发调用");
MultiRequestInfo mri =
MultiRequestInfo.reqInfos.get(request.getRequestID());
for (String uniID : mri.callbackMap.keySet()) {
LOGGER.info("触发 uniID=" + uniID);
MultiRequestInfo.exeMultiReq(request, uniID);
}
}
String data2 = CMActions.manager.executeLocally(request, null);
LOGGER.info("本地执行的结果为\n" + data2);
if (isMultiReq) {
MultiRequestInfo.reqInfos.get(request.getRequestID()).countIncrease();
}
Map<String, String> ret = new HashMap<>();
ret.put("action", "receiveTrustfullyResult");
SM2KeyPair keyPair = GlobalConf.instance.keyPair;
ret.put("nodeID", keyPair.getPublicKeyStr());
int seq = request.seq;
LOGGER.info("dealRequests 执行请求 " + seq);
ret.put("responseID", cei.uniReqIDMap.get(seq));
ret.put("executeTime", (System.currentTimeMillis() - start) + "");
ret.put("data", data2);
LOGGER.info("返回 uniID=" + cei.uniReqIDMap.get(seq) + " 的结果");
cei.resultMap.get(seq).onResult(JsonUtil.toJson(ret));
cei.setLastExeSeq(seq);
if (KeyValueDBUtil.instance.containsKey(
CMTables.LastExeSeq.toString(), contractID)) {
KeyValueDBUtil.instance.setValue(
CMTables.LastExeSeq.toString(), contractID, seq + "");
}
// ledger检查点 public static final int LEDGER_PERIOD = 100; //账本检查点
if (seq % 100 == 0) {
LOGGER.info("遇到ledger检查点 seq=" + cei.getLastExeSeq());
boolean isMaster = CMActions.manager.getContractIsMaster(contractID);
if (isMaster) {
String lastHash;
if (KeyValueDBUtil.instance.containsKey(
CMTables.CheckPointLastHash.toString(), contractID)) {
lastHash =
KeyValueDBUtil.instance.getValue(
CMTables.CheckPointLastHash.toString(), contractID);
} else {
lastHash = "firstCheckPoint";
}
Map<String, String> map = new HashMap<>();
String state = CMActions.manager.dumpContract(contractID, "");
map.put("state", state);
map.put("lastHash", lastHash);
String data = JsonUtil.toJson(map);
String requestID =
contractID + "_" + "checkPoint_" + new Random().nextInt();
CheckPointCallback cb = new CheckPointCallback();
ContractManager.checkPointToLedger(cb, contractID, data, requestID);
} else {
LOGGER.info("遇到检查点,但非master!");
}
}
cei.queue.poll();
cei.uniReqIDMap.remove(seq);
cei.resultMap.remove(seq);
} else if (cei.getLastExeSeq() >= request.seq) {
// delete already done requests
ContractRequest temp = cei.queue.peek();
while (!cei.queue.isEmpty() && cei.getLastExeSeq() >= temp.seq) {
// logger.info("队列中弹出请求 " + cei.queue.peek().getSeq());
int seq = temp.seq;
String responseID = cei.uniReqIDMap.get(seq);
ResultCallback cb = cei.resultMap.get(seq);
LOGGER.info("dealRequests 执行过期请求 " + seq);
if (responseID != null && cb != null) {
Map<String, String> ret = new HashMap<>();
ret.put("action", "receiveTrustfullyResult");
SM2KeyPair keyPair = GlobalConf.instance.keyPair;
ret.put("nodeID", keyPair.getPublicKeyStr());
ret.put("responseID", responseID);
ContractResult cr =
new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive(ComponedContractResult.EXPIRED_REQ));
ret.put("data", JsonUtil.toJson(cr));
cb.onResult(JsonUtil.toJson(ret));
}
cei.queue.poll();
cei.uniReqIDMap.remove(seq);
cei.resultMap.remove(seq);
temp = cei.queue.peek();
}
} else {
// ask master to re-send requests
LOGGER.info("send request to get cachedRequests");
Map<String, String> ret = new HashMap<>();
ret.put("action", "sendCachedRequests");
ret.put("contractID", contractID);
SM2KeyPair keyPair = GlobalConf.instance.keyPair;
ret.put("nodeID", keyPair.getPublicKeyStr());
ret.put("start", cei.getLastExeSeq() + "");
ret.put("end", request.seq + ""); // 左开右开区间
NetworkManager.instance.sendToAgent(cei.getMasterNode(), JsonUtil.toJson(ret));
break;
}
}
}
}
@Action(async = true)
public void onKillContractProcess(JsonObject jo, ResultCallback result) {
@ -145,13 +281,15 @@ public class MasterClientTCPAction {
}
}
// kill 本地的该集群合约实例
@Action(async = true)
public void killContractProcessAtSlave(JsonObject jo, ResultCallback result) {
String id;
if (jo.has("id")) id = jo.get("id").getAsString();
else id = jo.get("name").getAsString();
if (jo.has("id")) {
id = jo.get("id").getAsString();
} else {
id = jo.get("name").getAsString();
}
ContractClient cc = CMActions.manager.getClient(id);
@ -185,6 +323,7 @@ public class MasterClientTCPAction {
map.put("requestID", jo.get("requestID").getAsString());
map.put("data", ret);
result.onResult(JsonUtil.toJson(map));
}
@Action(async = true)
@ -441,144 +580,6 @@ public class MasterClientTCPAction {
}
}
public static void dealRequests(String contractID) {
LOGGER.info("dealRequests");
// If still in recovering state,don't dealRequests now
if (MasterClientRecoverMechAction.recoverSet.contains(contractID)) {
LOGGER.info("本地还没恢复完成,不执行请求!");
return;
}
MultiContractMeta cei;
synchronized (cei = CMActions.manager.multiContractRecorder.createIfNotExist(contractID)) {
while (!cei.queue.isEmpty()) {
ContractRequest request = cei.queue.peek();
// logger.info("此时队列为 ");
// cei.printSet();
if (cei.isSequent(request.seq)) {
// request.setSeq(jo.get("seq").getAsString());
// request.needSeq = jo.get("needSeq").getAsBoolean();
long start = System.currentTimeMillis();
cei.curExeSeq = request.seq;
LOGGER.info("调试位置 1 cei.curExeSeq=" + cei.curExeSeq);
boolean isMultiReq = request.getRequestID().endsWith("_mul");
if (isMultiReq) {
// 开始执行多点合约请求需要将缓存的其他节点发来的同requestID的请求也触发调用
LOGGER.info("开始执行多点合约请求需要将缓存的其他节点发来的同requestID的请求也触发调用");
MultiRequestInfo mri =
MultiRequestInfo.reqInfos.get(request.getRequestID());
for (String uniID : mri.callbackMap.keySet()) {
LOGGER.info("触发 uniID=" + uniID);
MultiRequestInfo.exeMultiReq(request, uniID);
}
}
String data2 = CMActions.manager.executeLocally(request, null);
LOGGER.info("本地执行的结果为\n" + data2);
if (isMultiReq) {
MultiRequestInfo.reqInfos.get(request.getRequestID()).countIncrease();
}
Map<String, String> ret = new HashMap<>();
ret.put("action", "receiveTrustfullyResult");
SM2KeyPair keyPair = GlobalConf.instance.keyPair;
ret.put("nodeID", keyPair.getPublicKeyStr());
int seq = request.seq;
LOGGER.info("dealRequests 执行请求 " + seq);
ret.put("responseID", cei.uniReqIDMap.get(seq));
ret.put("executeTime", (System.currentTimeMillis() - start) + "");
ret.put("data", data2);
LOGGER.info("返回 uniID=" + cei.uniReqIDMap.get(seq) + " 的结果");
cei.resultMap.get(seq).onResult(JsonUtil.toJson(ret));
cei.setLastExeSeq(seq);
if (KeyValueDBUtil.instance.containsKey(
CMTables.LastExeSeq.toString(), contractID)) {
KeyValueDBUtil.instance.setValue(
CMTables.LastExeSeq.toString(), contractID, seq + "");
}
// ledger检查点 public static final int LEDGER_PERIOD = 100; //账本检查点
if (seq % 100 == 0) {
LOGGER.info("遇到ledger检查点 seq=" + cei.getLastExeSeq());
boolean isMaster = CMActions.manager.getContractIsMaster(contractID);
if (isMaster) {
String lastHash;
if (KeyValueDBUtil.instance.containsKey(
CMTables.CheckPointLastHash.toString(), contractID)) {
lastHash =
KeyValueDBUtil.instance.getValue(
CMTables.CheckPointLastHash.toString(), contractID);
} else {
lastHash = "firstCheckPoint";
}
Map<String, String> map = new HashMap<>();
String state = CMActions.manager.dumpContract(contractID, "");
map.put("state", state);
map.put("lastHash", lastHash);
String data = JsonUtil.toJson(map);
String requestID =
contractID + "_" + "checkPoint_" + new Random().nextInt();
CheckPointCallback cb = new CheckPointCallback();
ContractManager.checkPointToLedger(cb, contractID, data, requestID);
} else {
LOGGER.info("遇到检查点,但非master!");
}
}
cei.queue.poll();
cei.uniReqIDMap.remove(seq);
cei.resultMap.remove(seq);
} else if (cei.getLastExeSeq() >= request.seq) {
// delete already done requests
ContractRequest temp = cei.queue.peek();
while (!cei.queue.isEmpty() && cei.getLastExeSeq() >= temp.seq) {
// logger.info("队列中弹出请求 " + cei.queue.peek().getSeq());
int seq = temp.seq;
String responseID = cei.uniReqIDMap.get(seq);
ResultCallback cb = cei.resultMap.get(seq);
LOGGER.info("dealRequests 执行过期请求 " + seq);
if (responseID != null && cb != null) {
Map<String, String> ret = new HashMap<>();
ret.put("action", "receiveTrustfullyResult");
SM2KeyPair keyPair = GlobalConf.instance.keyPair;
ret.put("nodeID", keyPair.getPublicKeyStr());
ret.put("responseID", responseID);
ContractResult cr =
new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive(ComponedContractResult.EXPIRED_REQ));
ret.put("data", JsonUtil.toJson(cr));
cb.onResult(JsonUtil.toJson(ret));
}
cei.queue.poll();
cei.uniReqIDMap.remove(seq);
cei.resultMap.remove(seq);
temp = cei.queue.peek();
}
} else {
// ask master to re-send requests
LOGGER.info("send request to get cachedRequests");
Map<String, String> ret = new HashMap<>();
ret.put("action", "sendCachedRequests");
ret.put("contractID", contractID);
SM2KeyPair keyPair = GlobalConf.instance.keyPair;
ret.put("nodeID", keyPair.getPublicKeyStr());
ret.put("start", cei.getLastExeSeq() + "");
ret.put("end", request.seq + ""); // 左开右开区间
NetworkManager.instance.sendToAgent(cei.getMasterNode(), JsonUtil.toJson(ret));
break;
}
}
}
}
@Action(async = true)
public void receiveContractExecutionServer(JsonObject jsonObject, ResultCallback resultCallback) {
MasterServerTCPAction.sync.wakeUp(