fix: fix bugs in SelfAdaptiveShardingExecutor

remove body check in SelfAdaptiveShardingExecutor.Block.isValid
This commit is contained in:
Frank.R.Wu 2022-02-08 22:51:46 +08:00
parent 78366fcb55
commit ed507b14b4
6 changed files with 318 additions and 276 deletions

View File

@ -21,7 +21,9 @@ import org.bdware.server.GRPCPool;
import org.bdware.server.GlobalConf;
import org.bdware.server.action.p2p.MasterClientTCPAction;
import org.bdware.server.trustedmodel.AgentManager;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.KillUnitContractInfo;
import org.bdware.server.trustedmodel.SelfAdaptiveShardingExecutor;
import org.bdware.server.ws.ContractManagerFrameHandler;
import org.bdware.units.NetworkManager;
import org.bdware.units.function.CommunicationManager;
@ -51,7 +53,6 @@ public class CMActions implements OnHashCallback {
public static FuncInvokeInfo FUNCINVOKEINFO = new FuncInvokeInfo(); // 合约调用时参数和结果
private static SecureRandom RANDOM;
public ContractManagerFrameHandler handler;
public CMActions() {
handler = null;
}
@ -532,6 +533,16 @@ public class CMActions implements OnHashCallback {
}
}
@Action(async = true, userPermission = 0)
public void checkBlocks(JsonObject args, final ResultCallback rcb) {
ContractExecutor executor = CMActions.manager.getExecutor(args.get("contractID").getAsString());
try {
ReplyUtil.simpleReply(rcb, "onCheckBlocks", ((SelfAdaptiveShardingExecutor) executor).checkCache());
} catch (Exception e) {
ReplyUtil.simpleReply(rcb, "onCheckBlocks", "error! " + e.getMessage());
}
}
// 节点管理者
@Action(userPermission = 1L << 19)
public void listAllContractProcess(JsonObject args, ResultCallback resultCallback) {
@ -1584,97 +1595,6 @@ public class CMActions implements OnHashCallback {
// rc.onResult(peers);
}
// @Action(userPermission = 1L << 26, async = true)
// public void queryContractInstanceInfoByDOI(JsonObject args, ResultCallback resultCallback)
// {
// long s = System.currentTimeMillis();
// String ret;
// try {
// String contractDOI = args.get("doi").getAsString();
// DigitalObject contractDO;
// DoipClient doipClient =
// DoipClient.createByRepoUrlAndMsgFmt(
// DOIPMainServer.repoUrl, DoipMessageFormat.PACKET.getName());
// DoMessage response = doipClient.retrieve(contractDOI, null, null);
// if (response.parameters.response == DoResponse.Success) {
// contractDO = DigitalObject.parse(response.body);
// } else {
// DoMessage resp = DOAClient.getGlobalInstance().retrieve(contractDOI, null,
// null);
// contractDO = DigitalObject.parse(resp.body);
// }
// ContractInstanceDO contractInstanceDO =
// (ContractInstanceDO)
// ContractManager.toObject(contractDO.elements.get(0).getData());
// // Dictionary<String, String> contractInfo = JsonUtil.fromJson(new
// // String(contractDO.getData()), new Hashtable<String, String>().getClass());
//
// ret =
// String.format(
// "Contract ID: %s\nContract PublicKey: %s",
// contractInstanceDO.id, contractInstanceDO.publicKey);
// } catch (Exception e) {
// ByteArrayOutputStream bo = new ByteArrayOutputStream();
// e.printStackTrace(new PrintStream(bo));
// ret = bo.toString();
// }
// Map<String, Object> r = new HashMap<>();
// r.put("action", "onQueryContractInstanceInfoByDOI");
// r.put("data", ret);
// r.put("executeTime", System.currentTimeMillis() - s);
// resultCallback.onResult(JsonUtil.toJson(r));
// if (client != null && client.controller != null) {
// client.controller.updateContract();
// }
// }
/*
* @Action(userPermission = 1 << 19, async = true) public void
* staticVerify(JsonObject args, ResultCallback resultCallback) { Map<String,
* Object> r = new HashMap<>(); r.put("action", "onStaticVerifyResult");
*
* long start = System.currentTimeMillis(); Map<String, Object> ret = new
* HashMap<>(); Contract c = new Contract(); // c.setType(Type.Algorithm);
* c.setType(ContractType.Sole); ret.put("action", "onStartContract"); String path =
* null; if (args.has("path")) path = args.get("path").getAsString(); if (path
* != null && path.startsWith("/")) c.setScript(args.get("path").getAsString());
* else c.setScript(args.get("script").getAsString()); if
* (args.has("publicKey")) { c.setOwner(args.get("publicKey").getAsString());
* c.setSignature(args.get("signature").getAsString()); } else if
* (args.has("owner")) { c.setOwner(args.get("owner").getAsString());
* c.setSignature(args.get("signature").getAsString());
*
* } else { c.setOwner(GlobalConf.instance.keyPair.getPublicKeyStr());
* c.doSignature(GlobalConf.instance.keyPair); }
*
* if (!c.verifySignature()) { ret.put("data", "verify failed");
* resultCallback.onResult(gson.toJson(ret)); return; } if (path != null &&
* path.startsWith("/")) { String parPath; if (args.has("isPrivate") &&
* args.get("isPrivate").getAsBoolean()) { parPath =
* GlobalConf.instance.privateDir + "/" + handler.pubKey; } else { parPath =
* GlobalConf.instance.publicDir; } try { String[] pp = path.split("/"); String
* parentPath = path; for (int i = 0; i < pp.length && i < 2; i++) { parentPath
* += pp[i] + "/"; } System.out.println("[CMActions] pack Dir, from:" + path +
* " --> " + parentPath); byte[] bb = YJSPacker.pack(new File(parPath,
* parentPath).getAbsolutePath()); File temp = File.createTempFile(pp[pp.length
* - 1], ".zip"); FileOutputStream fout = new FileOutputStream(temp);
* fout.write(bb); fout.close(); System.out.println("StartContract, zipPath:" +
* temp.getAbsolutePath()); // TODO script should encoded!!
* c.setScript(temp.getAbsolutePath()); } catch (Exception e) {
* e.printStackTrace(); } }
*
* System.out.println("[CMActions] verifyContract: " + gson.toJson(c));
* r.put("data", manager.staticVerify(c)); r.put("cid", c.getID());
* r.put("executeTime", System.currentTimeMillis() - start); //
* GRPCPool.writeToChain(c.getOwner(), privKey, gson.toJson(r), //
* json.getString("requestID"));
*
* //addLocalContractLog("staticVerify", c.getID(), path.split("/")[1],
* c.getOwner(),null);
*
* resultCallback.onResult(gson.toJson(r)); }
*/
@Action(async = true, userPermission = 0L)
public void updateNodeUnits(JsonObject args, ResultCallback rc) {
LOGGER.debug("updateNodeUnits");
@ -1842,6 +1762,97 @@ public class CMActions implements OnHashCallback {
resultCallback.onResult(ret);
}
// @Action(userPermission = 1L << 26, async = true)
// public void queryContractInstanceInfoByDOI(JsonObject args, ResultCallback resultCallback)
// {
// long s = System.currentTimeMillis();
// String ret;
// try {
// String contractDOI = args.get("doi").getAsString();
// DigitalObject contractDO;
// DoipClient doipClient =
// DoipClient.createByRepoUrlAndMsgFmt(
// DOIPMainServer.repoUrl, DoipMessageFormat.PACKET.getName());
// DoMessage response = doipClient.retrieve(contractDOI, null, null);
// if (response.parameters.response == DoResponse.Success) {
// contractDO = DigitalObject.parse(response.body);
// } else {
// DoMessage resp = DOAClient.getGlobalInstance().retrieve(contractDOI, null,
// null);
// contractDO = DigitalObject.parse(resp.body);
// }
// ContractInstanceDO contractInstanceDO =
// (ContractInstanceDO)
// ContractManager.toObject(contractDO.elements.get(0).getData());
// // Dictionary<String, String> contractInfo = JsonUtil.fromJson(new
// // String(contractDO.getData()), new Hashtable<String, String>().getClass());
//
// ret =
// String.format(
// "Contract ID: %s\nContract PublicKey: %s",
// contractInstanceDO.id, contractInstanceDO.publicKey);
// } catch (Exception e) {
// ByteArrayOutputStream bo = new ByteArrayOutputStream();
// e.printStackTrace(new PrintStream(bo));
// ret = bo.toString();
// }
// Map<String, Object> r = new HashMap<>();
// r.put("action", "onQueryContractInstanceInfoByDOI");
// r.put("data", ret);
// r.put("executeTime", System.currentTimeMillis() - s);
// resultCallback.onResult(JsonUtil.toJson(r));
// if (client != null && client.controller != null) {
// client.controller.updateContract();
// }
// }
/*
* @Action(userPermission = 1 << 19, async = true) public void
* staticVerify(JsonObject args, ResultCallback resultCallback) { Map<String,
* Object> r = new HashMap<>(); r.put("action", "onStaticVerifyResult");
*
* long start = System.currentTimeMillis(); Map<String, Object> ret = new
* HashMap<>(); Contract c = new Contract(); // c.setType(Type.Algorithm);
* c.setType(ContractType.Sole); ret.put("action", "onStartContract"); String path =
* null; if (args.has("path")) path = args.get("path").getAsString(); if (path
* != null && path.startsWith("/")) c.setScript(args.get("path").getAsString());
* else c.setScript(args.get("script").getAsString()); if
* (args.has("publicKey")) { c.setOwner(args.get("publicKey").getAsString());
* c.setSignature(args.get("signature").getAsString()); } else if
* (args.has("owner")) { c.setOwner(args.get("owner").getAsString());
* c.setSignature(args.get("signature").getAsString());
*
* } else { c.setOwner(GlobalConf.instance.keyPair.getPublicKeyStr());
* c.doSignature(GlobalConf.instance.keyPair); }
*
* if (!c.verifySignature()) { ret.put("data", "verify failed");
* resultCallback.onResult(gson.toJson(ret)); return; } if (path != null &&
* path.startsWith("/")) { String parPath; if (args.has("isPrivate") &&
* args.get("isPrivate").getAsBoolean()) { parPath =
* GlobalConf.instance.privateDir + "/" + handler.pubKey; } else { parPath =
* GlobalConf.instance.publicDir; } try { String[] pp = path.split("/"); String
* parentPath = path; for (int i = 0; i < pp.length && i < 2; i++) { parentPath
* += pp[i] + "/"; } System.out.println("[CMActions] pack Dir, from:" + path +
* " --> " + parentPath); byte[] bb = YJSPacker.pack(new File(parPath,
* parentPath).getAbsolutePath()); File temp = File.createTempFile(pp[pp.length
* - 1], ".zip"); FileOutputStream fout = new FileOutputStream(temp);
* fout.write(bb); fout.close(); System.out.println("StartContract, zipPath:" +
* temp.getAbsolutePath()); // TODO script should encoded!!
* c.setScript(temp.getAbsolutePath()); } catch (Exception e) {
* e.printStackTrace(); } }
*
* System.out.println("[CMActions] verifyContract: " + gson.toJson(c));
* r.put("data", manager.staticVerify(c)); r.put("cid", c.getID());
* r.put("executeTime", System.currentTimeMillis() - start); //
* GRPCPool.writeToChain(c.getOwner(), privKey, gson.toJson(r), //
* json.getString("requestID"));
*
* //addLocalContractLog("staticVerify", c.getID(), path.split("/")[1],
* c.getOwner(),null);
*
* resultCallback.onResult(gson.toJson(r)); }
*/
@Action(async = true)
public void askMasterElectTimeRecorder(JsonObject args, ResultCallback resultCallback) {
String data = "null";
@ -1953,6 +1964,4 @@ public class CMActions implements OnHashCallback {
}
}
}
}

View File

@ -186,8 +186,7 @@ public class MasterWSAction {
.collect(Collectors.toSet());
// }
SM2KeyPair keyPair = GlobalConf.instance.keyPair;
String masterNode = keyPair.getPublicKeyStr();
String masterNode = GlobalConf.instance.keyPair.getPublicKeyStr();
nodeNames.add(masterNode);
int nodeSize = nodeNames.size();
@ -237,7 +236,7 @@ public class MasterWSAction {
nodeSize);
MasterServerTCPAction.sync.sleepWithTimeout(requestID, collector, 20);
Map<String, Object> request = new HashMap<>();
request.put("master", keyPair.getPublicKeyStr());
request.put("master", masterNode);
if (args.has("isPrivate")) {
request.put("isPrivate", args.get("isPrivate").getAsString());
}

View File

@ -24,8 +24,6 @@ import org.bdware.server.trustedmodel.KillUnitContractResultCollector;
import org.bdware.server.trustedmodel.ResultCollector;
import org.bdware.units.NetworkManager;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -132,15 +130,10 @@ public class MasterServerTCPAction {
ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID);
MultiContractMeta ret =
CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
System.out.println(
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(new Date(System.currentTimeMillis()))
+ " [MasterServerTCPAction] getCMInfo: "
+ meta.getName()
+ " "
+ meta.getID()
+ " "
+ meta.getStatus());
LOGGER.debug(String.format("getCMInfo: %s %s %s",
meta.getName(),
meta.getID(),
meta.getStatus()));
return ret;
}
@ -239,7 +232,7 @@ public class MasterServerTCPAction {
ContractMeta contractMeta =
CMActions.manager.statusRecorder.getContractMeta(contractID);
if (contractMeta == null || contractMeta.getStatus() == KILLED) {
LOGGER.debug("send ReRoute response:" + cr.toString());
LOGGER.debug("send ReRoute response: " + cr);
JsonObject result = new JsonObject();
result.addProperty("action", "reRouteContract");
result.addProperty("responseID", cr.get("requestID").getAsString());
@ -255,7 +248,9 @@ public class MasterServerTCPAction {
// null") +
// "\n");
if (info != null && contractMeta.contract.getType() != ContractExecType.Sharding) {
if (info != null &&
ContractExecType.Sharding != contractMeta.contract.getType()
&& !ContractExecType.SelfAdaptiveSharding.equals(contractMeta.contract.getType())) {
// 这个是个多节点的合约
// Just forward it to the correct Node
// Master节点直接发3个聚合后返回结果
@ -296,7 +291,7 @@ public class MasterServerTCPAction {
}
}, null);
} else {
LOGGER.debug("send ReRoute response:" + cr.toString());
LOGGER.debug("send ReRoute response:" + cr);
JsonObject result = new JsonObject();
result.addProperty("action", "reRouteContract");
result.addProperty("responseID", cr.get("requestID").getAsString());

View File

@ -209,8 +209,9 @@ public class NodeCenterClientController implements NodeCenterConn {
public void updateNonMasters(JsonObject jo, ResultCallback cb) {
String[] contracts = jo.get("contracts").getAsString().split(",");
for (String id : contracts) {
LOGGER.info("旧的master设置合约 " + id + " 自己不再是master");
CMActions.manager.setContractIsMaster(id, false + "");
if (null != CMActions.manager.setContractIsMaster(id, String.valueOf(false))) {
LOGGER.warn("master of contract " + id + " changes");
}
}
}
@ -331,8 +332,8 @@ public class NodeCenterClientController implements NodeCenterConn {
@Override
public String routeContract(String contractID) {
LOGGER.info("[CMClientController] routeContract : " + contractID);
LOGGER.info("contractID2Pubkey.contractsKey=" + contractID2PubKey.containsKey(contractID));
LOGGER.debug("[CMClientController] routeContract : " + contractID);
LOGGER.debug("contractID2Pubkey.contractsKey=" + contractID2PubKey.containsKey(contractID));
// TODO RouteContract是不是IRP协议/DOIP协议
if (contractID2PubKey.containsKey(contractID)) {
return contractID2PubKey.get(contractID);
@ -519,93 +520,6 @@ public class NodeCenterClientController implements NodeCenterConn {
queryNCRepoDOI(json, result);
}
class ReceiveFileThread extends Thread {
private final Map<String, FileOutputStream> fileMap = new HashMap<>();
ReceiveFileThread() {
super();
this.start();
}
public void run() {
for (; ; ) {
if (receiveQueue.size() > 0) {
try {
JsonObject jo = receiveQueue.poll();
receiveProject(jo);
} catch (Exception e) {
e.printStackTrace();
}
} else {
synchronized (ReceiveFileThread.this) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public void receiveProject(JsonObject args) {
String fileName = args.get("fileName").getAsString();
boolean isAppend = args.get("isAppend").getAsBoolean();
boolean isDone = args.get("isDone").getAsBoolean();
boolean isPrivate = args.get("isPrivate").getAsBoolean();
LOGGER.debug(
String.format("isAppend=%b isDone=%b isPrivate=%b", isAppend, isDone, isPrivate));
String path = GlobalConf.instance.publicCompiledDir;
if (isPrivate && args.has("pubKey")) {
path = GlobalConf.instance.privateCompiledDir + "/" + args.get("pubKey").getAsString();
}
File dir = new File(path);
if (!dir.exists()) {
LOGGER.debug("mkdir " + dir.getAbsoluteFile() + ": " + dir.mkdirs());
}
FileOutputStream fout = null;
if (!isAppend) {
try {
fout = new FileOutputStream(new File(dir, fileName));
fileMap.put(fileName, fout);
} catch (FileNotFoundException e) {
e.printStackTrace();
}
} else {
fout = fileMap.get(fileName);
}
if (isDone) {
if (fout != null)
try {
fout.close();
fileMap.remove(fileName);
} catch (IOException e) {
e.printStackTrace();
}
LOGGER.debug("receive finish.");
Map<String, String> req = new HashMap<>();
req.put("action", "onReceiveProject");
req.put("requestID", args.get("requestID").getAsString());
req.put("nodeID", nodeID);
req.put("progress", "100");
NetworkManager.instance.sendToNodeCenter(JsonUtil.toJson(req));
} else {
String data = args.get("data").getAsString();
byte[] byteData = ByteUtil.decodeBASE64(data);
try {
if (null != fout && null != byteData) {
fout.write(byteData);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
@Action(async = false)
public synchronized void receiveProject(JsonObject args, final ResultCallback rc) {
try {
@ -758,6 +672,11 @@ public class NodeCenterClientController implements NodeCenterConn {
}
}
@Action(async = true)
public void onDistributeYPK(JsonObject json, ResultCallback rc) {
onDistribute(json, rc);
}
@Action(async = true)
public void transferInstance(JsonObject jo, ResultCallback result) {
LOGGER.info("transferInstance");
@ -831,4 +750,90 @@ public class NodeCenterClientController implements NodeCenterConn {
}
}
}
class ReceiveFileThread extends Thread {
private final Map<String, FileOutputStream> fileMap = new HashMap<>();
ReceiveFileThread() {
super();
this.start();
}
public void run() {
for (; ; ) {
if (receiveQueue.size() > 0) {
try {
JsonObject jo = receiveQueue.poll();
receiveProject(jo);
} catch (Exception e) {
e.printStackTrace();
}
} else {
synchronized (ReceiveFileThread.this) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public void receiveProject(JsonObject args) {
String fileName = args.get("fileName").getAsString();
boolean isAppend = args.get("isAppend").getAsBoolean();
boolean isDone = args.get("isDone").getAsBoolean();
boolean isPrivate = args.get("isPrivate").getAsBoolean();
LOGGER.debug(
String.format("isAppend=%b isDone=%b isPrivate=%b", isAppend, isDone, isPrivate));
String path = GlobalConf.instance.publicCompiledDir;
if (isPrivate && args.has("pubKey")) {
path = GlobalConf.instance.privateCompiledDir + "/" + args.get("pubKey").getAsString();
}
File dir = new File(path);
if (!dir.exists()) {
LOGGER.debug("mkdir " + dir.getAbsoluteFile() + ": " + dir.mkdirs());
}
FileOutputStream fout = null;
if (!isAppend) {
try {
fout = new FileOutputStream(new File(dir, fileName));
fileMap.put(fileName, fout);
} catch (FileNotFoundException e) {
e.printStackTrace();
}
} else {
fout = fileMap.get(fileName);
}
if (isDone) {
if (fout != null)
try {
fout.close();
fileMap.remove(fileName);
} catch (IOException e) {
e.printStackTrace();
}
LOGGER.debug("receive finish.");
Map<String, String> req = new HashMap<>();
req.put("action", "onReceiveProject");
req.put("requestID", args.get("requestID").getAsString());
req.put("nodeID", nodeID);
req.put("progress", "100");
NetworkManager.instance.sendToNodeCenter(JsonUtil.toJson(req));
} else {
String data = args.get("data").getAsString();
byte[] byteData = ByteUtil.decodeBASE64(data);
try {
if (null != fout && null != byteData) {
fout.write(byteData);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

View File

@ -33,7 +33,7 @@ public class AgentManager implements AgentPeerManagerIntf {
@Override
public void executeByOtherNodeAsync(String pubKey, ContractRequest c, ResultCallback cb) {
LOGGER.error(pubKey + " " + c.getRequestID() + " " + c.getContractID() + " cb:" + cb);
LOGGER.debug(pubKey + " " + c.getRequestID() + " " + c.getContractID() + " cb:" + cb);
int maxMasterProxyLoad = CongestionControl.masterProxyLoad.incrementAndGet();
if (maxMasterProxyLoad > CongestionControl.maxMasterProxyLoad)
CongestionControl.maxMasterProxyLoad = maxMasterProxyLoad;

View File

@ -1,5 +1,6 @@
package org.bdware.server.trustedmodel;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager;
@ -13,15 +14,16 @@ import org.bdware.sc.bean.SM2Verifiable;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.RecoverFlag;
import org.bdware.sc.util.HashUtil;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
import org.bdware.units.NetworkManager;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@ -29,66 +31,76 @@ import java.util.stream.Collectors;
*/
public class SelfAdaptiveShardingExecutor implements ContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class);
private static final int SUBMIT_LIMIT = 1024;
private static final int SUBMIT_LIMIT = 5120;
private static final int DELAY = 1;
private final Queue<ContractRequest> reqQueue = new ConcurrentLinkedQueue<>();
private final MultiContractMeta meta;
private final Map<String, Block> toExecuted = new ConcurrentHashMap<>();
private final Set<String> executedBlocks = ConcurrentHashMap.newKeySet();
private final Map<String, Boolean> executedTxs = new ConcurrentHashMap<>();
private final Object flag = new Object();
private final Object executorFlag = new Object();
private final ScheduledFuture<?> future;
private boolean running = true;
// the pointer to the latest executed block
private String executorPointer = "0";
// the block to be submitted
private Block b = new Block();
public SelfAdaptiveShardingExecutor(String contractID) {
this.meta =
CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
this.meta = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
this.future = ContractManager.scheduledThreadPool.scheduleWithFixedDelay(
this::submitBlock,
DELAY,
DELAY,
TimeUnit.SECONDS);
LOGGER.debug(String.format("ContractManager.threadPool=%d/%d",
((ThreadPoolExecutor) ContractManager.threadPool).getActiveCount(),
((ThreadPoolExecutor) ContractManager.threadPool).getPoolSize()));
ContractManager.threadPool.execute(() -> {
LOGGER.info(
"[SelfAdaptiveShardingExecutor " + meta.getContractID() + "] starting service..." + running);
LOGGER.info(String.format(
"[Executor %s] starting executing service... %b",
meta.getContractID(), meta.isMaster()));
while (running) {
LOGGER.info("checking blocks to be executed, latest block=" +
this.b.prevHash + ", to be executed size=" + toExecuted.size());
LOGGER.info(String.format(
"[Executor %s] checking blocks to be executed, latest block=%s, to be executed size=%d",
meta.getContractID(), executorPointer, toExecuted.size()));
LOGGER.debug("executed: " + JsonUtil.toJson(executedBlocks) + "\n\t" + JsonUtil.toJson(executedTxs));
while (!toExecuted.isEmpty()) {
String key = this.b.prevHash;
Block block = toExecuted.get(key);
if (null != block) {
executeBlock(block);
Block block = toExecuted.get(executorPointer);
// check if the execution ends
if (null == block) {
break;
}
toExecuted.remove(key);
executeBlock(block);
toExecuted.remove(executorPointer);
executorPointer = block.hash;
}
synchronized (flag) {
try {
flag.wait();
} catch (InterruptedException e) {
LOGGER.warn(String.format(
"[SelfAdaptiveShardingExecutor %s] waiting is interrupted: %s",
meta.getContractID(),
e.getMessage()));
try {
synchronized (executorFlag) {
executorFlag.wait();
}
} catch (InterruptedException e) {
LOGGER.warn(String.format(
"[Executor %s] waiting is interrupted: %s",
meta.getContractID(), e.getMessage()));
}
}
});
}
public JsonArray checkCache() {
JsonArray ret = new JsonArray();
ret.add(executorPointer);
for (Map.Entry<String, Block> entry : toExecuted.entrySet()) {
ret.add(String.format("%s,%s,%d", entry.getKey(), entry.getValue().hash, entry.getValue().height));
}
return ret;
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) {
// check client
ContractClient client = CMActions.manager.getClient(meta.getContractID());
if (null == client) {
LOGGER.error("contract " + meta.getContractID() + " not found!");
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Error,
new JsonPrimitive("contract " + meta.getContractID() + " not found!"))));
return;
}
@ -96,12 +108,10 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction());
if (null == funDesp) {
LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!");
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive(
String.format("action %s of contract %s not found!",
req.getAction(),
meta.getContractID())))));
rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Error,
new JsonPrimitive(String.format(
"action %s of contract %s not found!",
req.getAction(), meta.getContractID())))));
return;
}
// for view function, execute it
@ -111,18 +121,21 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
}
// normal function, check if it is in blocks
if (executedTxs.containsKey(requestID)) {
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Error,
new JsonPrimitive("this request has been packed!"))));
return;
}
// forward to master
if (!meta.isMaster()) {
CMActions.manager.executeContractOnOtherNodes(req, rcb);
return;
}
// add blocks into request cache
LOGGER.debug("receive contract request " + requestID);
executedTxs.put(requestID, false);
reqQueue.add(req);
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Executing,
new JsonPrimitive("this request is adding into blocks"))));
rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Executing,
new JsonPrimitive("this request is added into blocks"))));
// if cache is full, submit
if (reqQueue.size() >= SUBMIT_LIMIT) {
ContractManager.threadPool.execute(this::submitBlock);
@ -137,17 +150,18 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
LOGGER.info("destruct executor of contract " + meta.getContractID());
}
public void execute(String blockStr) {
public void receiveBlock(String blockStr) {
Block block = JsonUtil.fromJson(blockStr, Block.class);
// the block must have not been cached or executed, and must be valid
boolean valid = block.isValid();
if (!toExecuted.containsKey(block.prevHash) &&
!executedBlocks.contains(block.hash) &&
block.isValid()) {
valid) {
// add block into block cache
LOGGER.info(String.format(
"[SelfAdaptiveShardingExecutor %s] receive block %s -> %s," +
" %d transactions, timestamp=%d, size=%d",
"[Executor %s] receive block [%d] %s -> %s, %d transactions, timestamp=%d, size=%d",
meta.getContractID(),
block.height,
block.hash,
block.prevHash,
block.requests.length,
@ -155,9 +169,16 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
blockStr.length()));
toExecuted.put(block.prevHash, block);
// notify thread to execute blocks
synchronized (flag) {
flag.notify();
synchronized (executorFlag) {
executorFlag.notify();
}
} else {
LOGGER.warn(String.format("receive block [%d] %s failed! checking: cached %b executed %b valid %b",
block.height,
block.hash,
toExecuted.containsKey(block.prevHash),
executedBlocks.contains(block.hash),
valid));
}
}
@ -176,41 +197,48 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
for (ContractRequest request : block.requests) {
String ret = CMActions.manager.executeLocally(request, null);
LOGGER.debug(String.format(
"[SelfAdaptiveShardingExecutor %s] result of request %s: %s",
meta.getContractID(),
request.getRequestID(),
ret));
"[Executor %s] result of request %s: %s",
meta.getContractID(), request.getRequestID(), ret));
executedTxs.put(request.getRequestID(), true);
}
LOGGER.info(String.format(
"[SelfAdaptiveShardingExecutor %s] execute %d transactions of block %s",
meta.getContractID(),
block.requests.length,
block.hash));
"[Executor %s] execute %d transactions of block [%d] %s",
meta.getContractID(), block.requests.length, block.height, block.hash));
// TODO create check point
this.b = new Block(block.hash, this.b.height + 1);
// this.b = new Block(block.hash, this.b.height + 1);
executedBlocks.add(block.hash);
}
private void submitBlock() {
Block block = fillBlock();
if (null != block) {
LOGGER.info("deliver block " + block.hash + "...");
LOGGER.debug(JsonUtil.toPrettyJson(block));
String[] nodes = this.meta.getMembers();
LOGGER.debug(JsonUtil.toJson(nodes));
JsonObject req = new JsonObject();
String blockStr = JsonUtil.toJson(block);
req.addProperty("action", "deliverBlock");
req.addProperty("data", JsonUtil.toJson(block));
req.addProperty("data", blockStr);
req.addProperty("contractID", this.meta.getContractID());
String reqStr = req.toString();
// deliver blocks
LOGGER.info("deliver block " + block.hash + "...");
String myself = CMActions.manager.nodeCenterConn.getNodeId();
for (String node : nodes) {
if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID())
== RecoverFlag.Fine) {
// TODO: find dead lock here
// if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID())
// == RecoverFlag.Fine) {
// LOGGER.info("deliver block " + block.hash + " to node " + node);
// NetworkManager.instance.sendToAgent(node, reqStr);
// }
if (!Objects.equals(myself, node)) {
NetworkManager.instance.sendToAgent(node, reqStr);
}
// LOGGER.info("delivering done: " + node);
}
this.receiveBlock(blockStr);
}
// LOGGER.info("end " + (null != block));
}
private synchronized Block fillBlock() {
@ -222,8 +250,10 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
for (int i = 0; i < requests.length; ++i) {
requests[i] = reqQueue.poll();
}
this.b.fillBlock(requests);
return this.b;
Block block = this.b;
block.fillBlock(requests);
this.b = new Block(block.hash, block.height + 1);
return block;
}
static class Block extends SM2Verifiable {
@ -254,15 +284,19 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
}
public boolean isValid() {
return computeHash().equals(hash) && body.equals(merkle(this.requests)) && verifySignature();
boolean hashValid = computeHash().equals(hash),
// bodyValid = body.equals(merkle(this.requests)),
bodyValid = true,
signValid = verifySignature();
boolean ret = hashValid & bodyValid & signValid;
if (!ret) {
LOGGER.warn(String.format("hash %b body %b sign %b", hashValid, bodyValid, signValid));
}
return ret;
}
private String computeHash() {
return HashUtil.sha3(
String.valueOf(this.height),
this.prevHash,
this.checkPoint,
this.body);
return HashUtil.sha3(String.valueOf(this.height), this.prevHash, this.checkPoint, this.body);
}
private String merkle(ContractRequest[] requests) {
@ -273,19 +307,19 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
if (requests.length == 1) {
return HashUtil.sha3(requests[0].getRequestID());
}
Queue<String> reqQueue =
Queue<String> merkleQueue =
Arrays.stream(requests).map(ContractRequest::getRequestID)
.collect(Collectors.toCollection(ArrayDeque::new));
do {
int size;
for (size = reqQueue.size(); size > 1; size -= 2) {
reqQueue.add(HashUtil.sha3(reqQueue.poll(), reqQueue.poll()));
for (size = merkleQueue.size(); size > 1; size -= 2) {
merkleQueue.add(HashUtil.sha3(merkleQueue.poll(), merkleQueue.poll()));
}
if (size == 1) {
reqQueue.add(reqQueue.poll());
merkleQueue.add(merkleQueue.poll());
}
} while (1 != reqQueue.size());
return reqQueue.poll();
} while (1 != merkleQueue.size());
return merkleQueue.poll();
}
@Override