feat: update SelfAdaptiveShardingExecutor

update SelfAdaptiveShardingExecutor.submitBlock to avoid creating too many threads; add logs
This commit is contained in:
Frank.R.Wu 2021-11-19 14:51:25 +08:00
parent 969e456687
commit 9fb5cd6eb4
4 changed files with 80 additions and 66 deletions

View File

@ -120,15 +120,17 @@ public class CMActions implements OnHashCallback {
final JsonObject args,
final ResultCallback resultCallback,
final OnHashCallback hashcb) {
final ContractRequest c = new ContractRequest();
if (!args.has("contractName") && !args.has("contractID") && !args.has("contractDOI")) {
final ContractRequest cReq = new ContractRequest();
if (!args.has("contractName") &&
!args.has("contractID") &&
!args.has("contractDOI")) {
resultCallback.onResult(MISSING_ARGUMENT);
return;
}
if (args.has("contractDOI") && !args.has("contractID")) {
LOGGER.debug("contractDOI: " + args.get("contractDOI").getAsString());
try {
c.setContractDOI(args.get("contractDOI").getAsString());
cReq.setContractDOI(args.get("contractDOI").getAsString());
} catch (Exception e) {
e.printStackTrace();
resultCallback.onResult(INVALID_DOI);
@ -136,86 +138,91 @@ public class CMActions implements OnHashCallback {
}
} else {
if (args.has("contractName")) {
c.setContractID(args.get("contractName").getAsString());
cReq.setContractID(args.get("contractName").getAsString());
}
if (args.has("contractID")) {
c.setContractID(args.get("contractID").getAsString());
cReq.setContractID(args.get("contractID").getAsString());
}
}
if (args.has("isDebug")) c.setFromDebug(args.get("isDebug").getAsBoolean());
if (args.has("isDebug")) {
cReq.setFromDebug(args.get("isDebug").getAsBoolean());
}
if (args.has("withDynamicAnalysis"))
c.withDynamicAnalysis = args.get("withDynamicAnalysis").getAsBoolean();
cReq.withDynamicAnalysis = args.get("withDynamicAnalysis").getAsBoolean();
if (args.has("withEvaluatesAnalysis"))
c.withEvaluatesAnalysis = args.get("withEvaluatesAnalysis").getAsBoolean();
cReq.withEvaluatesAnalysis = args.get("withEvaluatesAnalysis").getAsBoolean();
if (!args.has("arg")) {
resultCallback.onResult(MISSING_ARGUMENT);
return;
}
if (args.has("operation")) {
c.setAction(args.get("operation").getAsString());
c.setArg(args.get("arg").getAsString());
cReq.setAction(args.get("operation").getAsString());
cReq.setArg(args.get("arg").getAsString());
} else {
JsonObject jo = JsonParser.parseString(args.get("arg").getAsString()).getAsJsonObject();
if (!jo.has("action") || !jo.has("arg")) {
resultCallback.onResult(MISSING_ARGUMENT);
return;
}
c.setAction(jo.get("action").getAsString());
c.setArg(jo.get("arg").getAsString());
if (c.withEvaluatesAnalysis) {
c.setValue(jo.get("hasValue").getAsLong());
cReq.setAction(jo.get("action").getAsString());
cReq.setArg(jo.get("arg").getAsString());
if (cReq.withEvaluatesAnalysis) {
cReq.setValue(jo.get("hasValue").getAsLong());
}
}
if (args.has("gasLimit")) c.setGasLimit(args.get("gasLimit").getAsLong());
if (args.has("gasLimit")) {
cReq.setGasLimit(args.get("gasLimit").getAsLong());
}
if (args.has("requester")) {
c.setPublicKey(args.get("requester").getAsString());
cReq.setPublicKey(args.get("requester").getAsString());
byte[] sign = ByteUtils.fromHexString(args.get("signature").getAsString());
c.setSignature(ByteUtils.toHexString(sign));
cReq.setSignature(ByteUtils.toHexString(sign));
}
if (args.has("requesterDOI")) {
c.setRequesterDOI(args.get("requesterDOI").getAsString());
cReq.setRequesterDOI(args.get("requesterDOI").getAsString());
} else {
c.setRequesterDOI("empty");
cReq.setRequesterDOI("empty");
}
if (args.has("pubkey")) {
c.setPublicKey(args.get("pubkey").getAsString());
cReq.setPublicKey(args.get("pubkey").getAsString());
byte[] sign = ByteUtils.fromHexString(args.get("signature").getAsString());
c.setSignature(ByteUtils.toHexString(sign));
cReq.setSignature(ByteUtils.toHexString(sign));
}
if (c.getPublicKey() != null) {
if (!c.verifySignature()) {
c.setPublicKey(null);
c.setRequester(null);
if (cReq.getPublicKey() != null) {
if (!cReq.verifySignature()) {
cReq.setPublicKey(null);
cReq.setRequester(null);
}
}
String reqID;
if (args.has("requestID")) reqID = args.get("requestID").getAsString();
else {
if (args.has("requestID")) {
reqID = args.get("requestID").getAsString();
} else {
reqID = System.currentTimeMillis() + "_" + (int) (Math.random() * 1000);
args.addProperty("requestID", reqID);
}
c.setRequestID(reqID);
cReq.setRequestID(reqID);
long start = System.currentTimeMillis();
manager.executeContractInternal(
c,
cReq,
new ResultCallback() {
@Override
public void onResult(JsonObject ret) {
ret.addProperty("responseID", c.getRequestID());
ret.addProperty("responseID", cReq.getRequestID());
ret.addProperty("action", "onExecuteResult");
String costTime = (System.currentTimeMillis() - start) + "";
ret.addProperty("executeTime", costTime);
ContractMeta meta =
manager.statusRecorder.getContractMeta(c.getContractID());
manager.statusRecorder.getContractMeta(cReq.getContractID());
if (meta != null && meta.getIsDebug()) {
FUNCINVOKEINFO.putOneInvoke(
c.getContractID(),
c.getAction(),
c.getRequestID(),
c.getArg(),
cReq.getContractID(),
cReq.getAction(),
cReq.getRequestID(),
cReq.getArg(),
ret.has("result") ? ret.get("result").toString() : "");
}
LOGGER.debug("result of request " + cReq.getRequestID() + ": " + ret);
resultCallback.onResult(ret.toString());
}

View File

@ -303,7 +303,7 @@ public class MasterWSAction {
+ contract.getID()
+ "\","
+ "\"action\":\"onStartTrustfulContract\"}");
LOGGER.info("success!");
LOGGER.info("startContractMultiPoint succeed!");
}
}

View File

@ -193,13 +193,13 @@ public class MasterClientTCPAction {
Contract contract = JsonUtil.fromJson(jo.get("contractStr").getAsString(), Contract.class);
String contractID = contract.getID();
// 获取contract type
LOGGER.info(contract.getType());
LOGGER.debug(contract.getType());
if (contract.getType() == ContractExecType.Sharding) {
// 每节点都是master且MPCI中需要实例化出MultiPointCooperationExecutor
// 需要计算出自己的ShardingID路由规则id/requester/arg-->shardingId)
// 也在MultiPointCooperationExecutor中实现
}
//TOODO master连接
//TODO master连接
// contractID2MasterInfo.put(contractID, this); // 记录contractID master之间的对应关系
MultiContractMeta cei =
CMActions.manager.multiContractRecorder.createIfNotExist(contractID);
@ -252,12 +252,10 @@ public class MasterClientTCPAction {
cei.setIsMaster(true);
}
LOGGER.info("启动参数: " + JsonUtil.toJson(contract));
LOGGER.debug("startup arguments: " + JsonUtil.toJson(contract));
String ret = CMActions.manager.startContract(contract); // 调用CMActions 里的启动合约的方法,启动结果
LOGGER.info("启动结果为 " + ret);
LOGGER.debug("startup result: " + ret);
CMActions.manager.multiContractRecorder.updateValue(cei);
ContractMeta meta = CMActions.manager.statusRecorder.createIfNotExist(contractID);

View File

@ -17,10 +17,7 @@ import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
import org.bdware.units.NetworkManager;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
@ -47,7 +44,10 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
2,
2,
TimeUnit.SECONDS);
ContractManager.threadPool.submit(() -> {
LOGGER.debug(String.format("ContractManager.threadPool=%d/%d",
((ThreadPoolExecutor) ContractManager.threadPool).getActiveCount(),
((ThreadPoolExecutor) ContractManager.threadPool).getPoolSize()));
ContractManager.threadPool.execute(() -> {
LOGGER.info(
"[SelfAdaptiveShardingExecutor " + meta.getContractID() + "] starting service..." + running);
while (running) {
@ -91,10 +91,16 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
ContractResult.Status.Executing,
new JsonPrimitive("this request is adding into blocks"))));
if (reqQueue.size() >= SUBMIT_LIMIT) {
submitBlock();
ContractManager.threadPool.execute(this::submitBlock);
}
}
@Override
public void close() {
this.future.cancel(false);
this.running = false;
LOGGER.info("destruct executor of contract " + meta.getContractID());
}
public void execute(String blockStr) {
Block block = JsonUtil.fromJson(blockStr, Block.class);
@ -128,7 +134,11 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
// TODO check status
for (ContractRequest request : block.requests) {
String ret = ContractManager.instance.executeLocally(request, null);
LOGGER.debug("result of request " + request.getRequestID() + ": " + ret);
LOGGER.debug(String.format(
"[SelfAdaptiveShardingExecutor %s] result of request %s: %s",
meta.getContractID(),
request.getRequestID(),
ret));
executedTxs.put(request.getRequestID(), true);
}
LOGGER.info(String.format(
@ -142,24 +152,23 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
}
private void submitBlock() {
ContractManager.threadPool.execute(() -> {
Block block = fillBlock();
if (null != block) {
LOGGER.debug(JsonUtil.toPrettyJson(block));
String[] nodes = this.meta.getMembers();
JsonObject req = new JsonObject();
req.addProperty("action", "deliverBlock");
req.addProperty("data", JsonUtil.toJson(block));
req.addProperty("contractID", this.meta.getContractID());
String reqStr = req.toString();
for (String node : nodes) {
if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID())
== RecoverFlag.Fine) {
NetworkManager.instance.sendToAgent(node, reqStr);
}
Block block = fillBlock();
if (null != block) {
LOGGER.info("deliver block " + block.hash + "...");
LOGGER.debug(JsonUtil.toPrettyJson(block));
String[] nodes = this.meta.getMembers();
JsonObject req = new JsonObject();
req.addProperty("action", "deliverBlock");
req.addProperty("data", JsonUtil.toJson(block));
req.addProperty("contractID", this.meta.getContractID());
String reqStr = req.toString();
for (String node : nodes) {
if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID())
== RecoverFlag.Fine) {
NetworkManager.instance.sendToAgent(node, reqStr);
}
}
});
}
}
private synchronized Block fillBlock() {