feat: add SelfAdaptiveShardingExecutor

add new type of multi-point contract "SelfAdaptiveSharding"
This commit is contained in:
Frank.R.Wu 2021-11-11 12:52:39 +08:00
parent 40816e8a45
commit 837cf8ba70
6 changed files with 302 additions and 69 deletions

View File

@ -137,6 +137,7 @@ task copyWebContent(type: Copy) {
exclude 'client/README.md' exclude 'client/README.md'
exclude 'client/.idea/' exclude 'client/.idea/'
exclude 'client/.gitignore' exclude 'client/.gitignore'
exclude '.keep'
} }
into "./build/output/WebContent" into "./build/output/WebContent"
} }
@ -167,9 +168,10 @@ task buildBDServerZip(type: Zip, dependsOn: [":agent-backend:copyWebContent",
task buildBDServerZipLite(type: Zip, dependsOn: [":agent-backend:copyWebContent", task buildBDServerZipLite(type: Zip, dependsOn: [":agent-backend:copyWebContent",
":agent-backend:copyBDWareProjectDir", ":agent-backend:copyScript", ":agent-backend:copyBDWareProjectDir", ":agent-backend:copyScript",
":agent-backend:copyJar", ":agent-backend:copyLibs", ":agent-backend:copyKeys"]) { ":agent-backend:copyJar", ":agent-backend:copyLibs",
":agent-backend:copyKeys"]) {
from('./build/output/') { from('./build/output/') {
exclude 'BDWareProjectDir/public/**' exclude 'BDWareProjectDir/public/TF**/'
exclude 'WebContent/doc/' exclude 'WebContent/doc/'
} }
duplicatesStrategy = DuplicatesStrategy.INCLUDE duplicatesStrategy = DuplicatesStrategy.INCLUDE
@ -177,9 +179,23 @@ task buildBDServerZipLite(type: Zip, dependsOn: [":agent-backend:copyWebContent"
destinationDirectory = file('build/') destinationDirectory = file('build/')
} }
task buildBDServerZipMin(type: Zip, dependsOn: [":agent-backend:copyWebContent",
":agent-backend:copyBDWareProjectDir", ":agent-backend:copyScript",
":agent-backend:copyJar", ":agent-backend:copyLibs",
":agent-backend:copyKeys"]) {
from('./build/output/') {
exclude 'BDWareProjectDir/public/**'
exclude 'WebContent/doc/'
}
duplicatesStrategy = DuplicatesStrategy.INCLUDE
archiveFileName = 'bdserver-min.zip'
destinationDirectory = file('build/')
}
task buildBDServerZipUpdate(type: Zip, dependsOn: [":agent-backend:copyCP", ":agent-backend:copyWebContent", task buildBDServerZipUpdate(type: Zip, dependsOn: [":agent-backend:copyCP", ":agent-backend:copyWebContent",
":agent-backend:copyBDWareProjectDir", ":agent-backend:copyScript", ":agent-backend:copyBDWareProjectDir", ":agent-backend:copyScript",
":agent-backend:copyJar", ":agent-backend:copyLibs", ":agent-backend:copyKeys"]) { ":agent-backend:copyJar", ":agent-backend:copyLibs",
":agent-backend:copyKeys"]) {
from('./build/output/') { from('./build/output/') {
include '*.jar' include '*.jar'
include 'libs/' include 'libs/'

View File

@ -117,15 +117,17 @@ public class CMActions implements OnHashCallback {
final JsonObject args, final JsonObject args,
final ResultCallback resultCallback, final ResultCallback resultCallback,
final OnHashCallback hashcb) { final OnHashCallback hashcb) {
final ContractRequest c = new ContractRequest(); final ContractRequest cReq = new ContractRequest();
if (!args.has("contractName") && !args.has("contractID") && !args.has("contractDOI")) { if (!args.has("contractName") &&
!args.has("contractID") &&
!args.has("contractDOI")) {
resultCallback.onResult(MISSING_ARGUMENT); resultCallback.onResult(MISSING_ARGUMENT);
return; return;
} }
if (args.has("contractDOI") && !args.has("contractID")) { if (args.has("contractDOI") && !args.has("contractID")) {
LOGGER.debug("contractDOI: " + args.get("contractDOI").getAsString()); LOGGER.debug("contractDOI: " + args.get("contractDOI").getAsString());
try { try {
c.setContractDOI(args.get("contractDOI").getAsString()); cReq.setContractDOI(args.get("contractDOI").getAsString());
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
resultCallback.onResult(INVALID_DOI); resultCallback.onResult(INVALID_DOI);
@ -133,87 +135,91 @@ public class CMActions implements OnHashCallback {
} }
} else { } else {
if (args.has("contractName")) { if (args.has("contractName")) {
c.setContractID(args.get("contractName").getAsString()); cReq.setContractID(args.get("contractName").getAsString());
} }
if (args.has("contractID")) { if (args.has("contractID")) {
c.setContractID(args.get("contractID").getAsString()); cReq.setContractID(args.get("contractID").getAsString());
} }
} }
if (args.has("isDebug")) c.setFromDebug(args.get("isDebug").getAsBoolean()); if (args.has("isDebug")) {
cReq.setFromDebug(args.get("isDebug").getAsBoolean());
}
if (args.has("withDynamicAnalysis")) if (args.has("withDynamicAnalysis"))
c.withDynamicAnalysis = args.get("withDynamicAnalysis").getAsBoolean(); cReq.withDynamicAnalysis = args.get("withDynamicAnalysis").getAsBoolean();
if (args.has("withEvaluatesAnalysis")) if (args.has("withEvaluatesAnalysis"))
c.withEvaluatesAnalysis = args.get("withEvaluatesAnalysis").getAsBoolean(); cReq.withEvaluatesAnalysis = args.get("withEvaluatesAnalysis").getAsBoolean();
if (!args.has("arg")) { if (!args.has("arg")) {
resultCallback.onResult(MISSING_ARGUMENT); resultCallback.onResult(MISSING_ARGUMENT);
return; return;
} }
if (args.has("operation")) { if (args.has("operation")) {
c.setAction(args.get("operation").getAsString()); cReq.setAction(args.get("operation").getAsString());
c.setArg(args.get("arg").getAsString()); cReq.setArg(args.get("arg").getAsString());
} else { } else {
JsonObject jo = JsonParser.parseString(args.get("arg").getAsString()).getAsJsonObject(); JsonObject jo = JsonParser.parseString(args.get("arg").getAsString()).getAsJsonObject();
if (!jo.has("action") || !jo.has("arg")) { if (!jo.has("action") || !jo.has("arg")) {
resultCallback.onResult(MISSING_ARGUMENT); resultCallback.onResult(MISSING_ARGUMENT);
return; return;
} }
c.setAction(jo.get("action").getAsString()); cReq.setAction(jo.get("action").getAsString());
c.setArg(jo.get("arg").getAsString()); cReq.setArg(jo.get("arg").getAsString());
if (c.withEvaluatesAnalysis) { if (cReq.withEvaluatesAnalysis) {
c.setValue(jo.get("hasValue").getAsLong()); cReq.setValue(jo.get("hasValue").getAsLong());
} }
} }
if (args.has("gasLimit")) c.setGasLimit(args.get("gasLimit").getAsLong()); if (args.has("gasLimit")) {
cReq.setGasLimit(args.get("gasLimit").getAsLong());
}
if (args.has("requester")) { if (args.has("requester")) {
c.setPublicKey(args.get("requester").getAsString()); cReq.setPublicKey(args.get("requester").getAsString());
byte[] sign = ByteUtils.fromHexString(args.get("signature").getAsString()); byte[] sign = ByteUtils.fromHexString(args.get("signature").getAsString());
c.setSignature(ByteUtils.toHexString(sign)); cReq.setSignature(ByteUtils.toHexString(sign));
} }
if (args.has("requesterDOI")) { if (args.has("requesterDOI")) {
c.setRequesterDOI(args.get("requesterDOI").getAsString()); cReq.setRequesterDOI(args.get("requesterDOI").getAsString());
} else { } else {
c.setRequesterDOI("empty"); cReq.setRequesterDOI("empty");
} }
if (args.has("pubkey")) { if (args.has("pubkey")) {
c.setPublicKey(args.get("pubkey").getAsString()); cReq.setPublicKey(args.get("pubkey").getAsString());
byte[] sign = ByteUtils.fromHexString(args.get("signature").getAsString()); byte[] sign = ByteUtils.fromHexString(args.get("signature").getAsString());
c.setSignature(ByteUtils.toHexString(sign)); cReq.setSignature(ByteUtils.toHexString(sign));
} }
if (c.getPublicKey() != null) { if (cReq.getPublicKey() != null) {
if (!c.verifySignature()) { if (!cReq.verifySignature()) {
c.setPublicKey(null); cReq.setPublicKey(null);
c.setRequester(null); cReq.setRequester(null);
} }
} }
String reqID; String reqID;
if (args.has("requestID")) reqID = args.get("requestID").getAsString(); if (args.has("requestID")) {
else { reqID = args.get("requestID").getAsString();
} else {
reqID = System.currentTimeMillis() + "_" + (int) (Math.random() * 1000); reqID = System.currentTimeMillis() + "_" + (int) (Math.random() * 1000);
args.addProperty("requestID", reqID); args.addProperty("requestID", reqID);
} }
c.setRequestID(reqID); cReq.setRequestID(reqID);
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
manager.executeContractInternal( manager.executeContractInternal(
c, cReq,
new ResultCallback() { new ResultCallback() {
@Override @Override
public void onResult(JsonObject ret) { public void onResult(JsonObject ret) {
ret.addProperty("responseID", c.getRequestID()); ret.addProperty("responseID", cReq.getRequestID());
ret.addProperty("action", "onExecuteResult"); ret.addProperty("action", "onExecuteResult");
String costTime = (System.currentTimeMillis() - start) + ""; String costTime = (System.currentTimeMillis() - start) + "";
ret.addProperty("executeTime", costTime); ret.addProperty("executeTime", costTime);
ContractMeta meta = ContractMeta meta =
manager.statusRecorder.getContractMeta(c.getContractID()); manager.statusRecorder.getContractMeta(cReq.getContractID());
if (meta != null && meta.getIsDebug()) { if (meta != null && meta.getIsDebug()) {
FUNCINVOKEINFO.putOneInvoke( FUNCINVOKEINFO.putOneInvoke(
c.getContractID(), cReq.getContractID(),
c.getAction(), cReq.getAction(),
c.getRequestID(), cReq.getRequestID(),
c.getArg(), cReq.getArg(),
ret.has("result") ? ret.get("result").toString() : ""); ret.has("result") ? ret.get("result").toString() : "");
} }
System.out.println(ret.toString()); System.out.println(ret);
resultCallback.onResult(ret.toString()); resultCallback.onResult(ret.toString());
} }
@ -1159,17 +1165,17 @@ public class CMActions implements OnHashCallback {
public void connectTo(JsonObject args, ResultCallback resultCallback) { public void connectTo(JsonObject args, ResultCallback resultCallback) {
String data; String data;
if (!args.has("id")) { if (!args.has("id")) {
ReplyUtil.simpleReply(resultCallback,"onConnectTo","missing contract id"); ReplyUtil.simpleReply(resultCallback, "onConnectTo", "missing contract id");
return; return;
} }
String contractID = args.get("id").getAsString(); String contractID = args.get("id").getAsString();
LOGGER.info("connectTo:" + contractID); LOGGER.info("connectTo:" + contractID);
if (contractID == null) { if (contractID == null) {
ReplyUtil.simpleReply(resultCallback,"onConnectTo","can't find contract id"); ReplyUtil.simpleReply(resultCallback, "onConnectTo", "can't find contract id");
return; return;
} }
manager.redirect(contractID, createPS(), ""); manager.redirect(contractID, createPS(), "");
ReplyUtil.simpleReply(resultCallback,"onConnectTo","success"); ReplyUtil.simpleReply(resultCallback, "onConnectTo", "success");
} }
private PrintStream createPS() { private PrintStream createPS() {
@ -1244,7 +1250,7 @@ public class CMActions implements OnHashCallback {
ExecutionManager.instance.updateLocalContractToNodeCenter(); ExecutionManager.instance.updateLocalContractToNodeCenter();
} }
} else { } else {
ReplyUtil.simpleReply(resultCallback,"onKillContractProcess","Failed: Illegal parameters"); ReplyUtil.simpleReply(resultCallback, "onKillContractProcess", "Failed: Illegal parameters");
} }
} }
@ -1504,14 +1510,14 @@ public class CMActions implements OnHashCallback {
e.printStackTrace(); e.printStackTrace();
} }
ExecutionManager.instance.updateLocalContractToNodeCenter(); ExecutionManager.instance.updateLocalContractToNodeCenter();
ReplyUtil.simpleReply(resultCallback,"onKillAllContract",sb.toString()); ReplyUtil.simpleReply(resultCallback, "onKillAllContract", sb.toString());
manager.stopAllContracts(); manager.stopAllContracts();
} else { } else {
manager.stopAllContractsWithOwner(args.get(("verifiedPubKey")).getAsString()); manager.stopAllContractsWithOwner(args.get(("verifiedPubKey")).getAsString());
ReplyUtil.simpleReply(resultCallback,"onKillAllContract","Success"); ReplyUtil.simpleReply(resultCallback, "onKillAllContract", "Success");
} }
} else { } else {
ReplyUtil.simpleReply(resultCallback,"onKillAllContract","Failed: Illegal user"); ReplyUtil.simpleReply(resultCallback, "onKillAllContract", "Failed: Illegal user");
} }
} }

View File

@ -19,8 +19,8 @@ import org.bdware.server.GlobalConf;
import org.bdware.server.action.p2p.MasterServerRecoverMechAction; import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
import org.bdware.server.action.p2p.MasterServerTCPAction; import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.trustedmodel.MultiPointContractInfo; import org.bdware.server.trustedmodel.MultiPointContractInfo;
import org.bdware.server.trustedmodel.ResultCollector;
import org.bdware.server.trustedmodel.MultiPointCooperateContractInfo; import org.bdware.server.trustedmodel.MultiPointCooperateContractInfo;
import org.bdware.server.trustedmodel.ResultCollector;
import org.bdware.server.trustedmodel.SlaveNode; import org.bdware.server.trustedmodel.SlaveNode;
import org.bdware.units.NetworkManager; import org.bdware.units.NetworkManager;
import org.zz.gmhelper.SM2KeyPair; import org.zz.gmhelper.SM2KeyPair;
@ -140,14 +140,27 @@ public class MasterWSAction {
} }
*/ */
List<String> nodeNames; // nodes' peerID List<String> nodeNames; // nodes' peerID
// all nodes' peerID in the unit if (contract.getType().equals(ContractExecType.SelfAdaptiveSharding)) {
String[] nodeNamesStr = if (ContractManager.instance.nodeCenterConn.listNodes().length < 3) {
args.get("peersID").getAsString().split(","); rc.onResult(
// record information of these nodes "{\"status\":\"Error\",\"result\":\"No enough nodes!\","
nodeNames = + "\"action\":\"onStartTrustfulContract\"}");
Arrays.stream(nodeNamesStr) return;
.filter(x -> null != x && !x.isEmpty()) }
.collect(Collectors.toList()); int unitSize = args.get("selectUnitNum").getAsInt();
nodeNames =
Arrays.asList(ContractManager.instance.nodeCenterConn.getClusterByKey(
contractID,
Math.max(unitSize, 3)));
} else {// all nodes' peerID in the unit
String[] nodeNamesStr =
args.get("peersID").getAsString().split(",");
// record information of these nodes
nodeNames =
Arrays.stream(nodeNamesStr)
.filter(x -> null != x && !x.isEmpty())
.collect(Collectors.toList());
}
int nodeSize = nodeNames.size(); int nodeSize = nodeNames.size();
// 方式一向NodeCenter发要求Slave节点主动连接到Master节点. // 方式一向NodeCenter发要求Slave节点主动连接到Master节点.
@ -229,6 +242,7 @@ public class MasterWSAction {
case RequestAllResponseHalf: case RequestAllResponseHalf:
case RequestAllResponseAll: case RequestAllResponseAll:
case Sharding: case Sharding:
case SelfAdaptiveSharding:
contract.setNumOfCopies(nodeSize); contract.setNumOfCopies(nodeSize);
break; break;
default: default:
@ -274,7 +288,7 @@ public class MasterWSAction {
+ contract.getID() + contract.getID()
+ "\"," + "\","
+ "\"action\":\"onStartTrustfulContract\"}"); + "\"action\":\"onStartTrustfulContract\"}");
LOGGER.info("启动多点合约完成"); LOGGER.info("success!");
} }
private boolean waitForConnection(List<String> nodeNames) { private boolean waitForConnection(List<String> nodeNames) {

View File

@ -154,6 +154,9 @@ public class MasterClientTCPAction {
case Sharding: case Sharding:
executor = new MultiPointCooperationExecutor(ContractExecType.Sharding, nodeSize, contractID); executor = new MultiPointCooperationExecutor(ContractExecType.Sharding, nodeSize, contractID);
break; break;
case SelfAdaptiveSharding:
executor = new SelfAdaptiveShardingExecutor(contractID);
break;
} }
return executor; return executor;
} }
@ -234,7 +237,8 @@ public class MasterClientTCPAction {
.format(lastMasterPongTime) .format(lastMasterPongTime)
+ " 认为master崩溃!"); + " 认为master崩溃!");
// 向NC发通知重新选举master如果NC没有收到所有节点的重选请求就认为是这个节点和master连接断开这个节点需要重连master // 向NC发通知重新选举master如果NC没有收到所有节点的重选请求就认为是这个节点和master连接断开
// 这个节点需要重连master
Map<String, String> request = new HashMap<>(); Map<String, String> request = new HashMap<>();
request.put("action", "electMaster"); request.put("action", "electMaster");
SM2KeyPair keyPair = GlobalConf.instance.keyPair; SM2KeyPair keyPair = GlobalConf.instance.keyPair;
@ -405,11 +409,11 @@ public class MasterClientTCPAction {
File temp = new File(parPath, pp[pp.length - 1]); File temp = new File(parPath, pp[pp.length - 1]);
if (!temp.exists()) { if (!temp.exists()) {
result.onResult( result.onResult(
"{\"action\":\"onStartContractTrustfully\",\"result\":\"missing contract files\",\"requestID\":\"" String.format(
+ jo.get("requestID").getAsString() "{\"action\":\"onStartContractTrustfully\",\"result\":\"missing contract files\"," +
+ "\",\"pubKey\":\"" "\"requestID\":\"%s\",\"pubKey\":\"%s\"}",
+ GlobalConf.instance.keyPair.getPublicKeyStr() jo.get("requestID").getAsString(),
+ "\"}"); GlobalConf.instance.keyPair.getPublicKeyStr()));
return; return;
} }
contract.setScript(temp.getAbsolutePath()); contract.setScript(temp.getAbsolutePath());
@ -420,7 +424,9 @@ public class MasterClientTCPAction {
cei.setMaster(jo.get("master").getAsString()); cei.setMaster(jo.get("master").getAsString());
if (contract.getType() != ContractExecType.Sharding) if (contract.getType() != ContractExecType.Sharding)
cei.setIsMaster(GlobalConf.getNodeID().equals(jo.get("master").getAsString())); cei.setIsMaster(GlobalConf.getNodeID().equals(jo.get("master").getAsString()));
else cei.setIsMaster(true); else {
cei.setIsMaster(true);
}
LOGGER.info("启动参数: " + JsonUtil.toJson(contract)); LOGGER.info("启动参数: " + JsonUtil.toJson(contract));
cei.setContractExecutor(createContractExecutor(contract, contractID)); // 分配不同的Executor cei.setContractExecutor(createContractExecutor(contract, contractID)); // 分配不同的Executor
@ -485,6 +491,17 @@ public class MasterClientTCPAction {
return "/" + scriptName + ".ypk"; return "/" + scriptName + ".ypk";
} }
@Action(async = true)
public void deliverBlock(JsonObject jo, ResultCallback result) {
if (jo.has("contractID") && jo.has("data")) {
String contractID = jo.get("contractID").getAsString();
MultiContractMeta cei = ContractManager.instance.multiContractRecorder.getMultiContractMeta(contractID);
if (null != cei && cei.contractExecutor instanceof SelfAdaptiveShardingExecutor) {
((SelfAdaptiveShardingExecutor) cei.contractExecutor).execute(jo.get("data").getAsString());
}
}
}
@Action(async = true) @Action(async = true)
public void executeContractLocally(JsonObject jo, ResultCallback result) { public void executeContractLocally(JsonObject jo, ResultCallback result) {
final ContractRequest request = final ContractRequest request =
@ -533,8 +550,8 @@ public class MasterClientTCPAction {
MultiRequestInfo mri = MultiRequestInfo mri =
MultiRequestInfo.reqInfos.get(request.getRequestID()); MultiRequestInfo.reqInfos.get(request.getRequestID());
if (request.seq if (request.seq == cei.curExeSeq) {
== cei.curExeSeq) { // 正在执行多点请求时收到多点请求说明这个请求不是第一个到的同requestID的请求s // 正在执行多点请求时收到多点请求说明这个请求不是第一个到的同requestID的请求s
mri.callbackMap.put(jo.get("uniReqID").getAsString(), result); mri.callbackMap.put(jo.get("uniReqID").getAsString(), result);
mri.putFlag(jo.get("uniReqID").getAsString(), false); mri.putFlag(jo.get("uniReqID").getAsString(), false);
} else if (request.seq > cei.curExeSeq } else if (request.seq > cei.curExeSeq

View File

@ -78,9 +78,9 @@ public class MasterProxy implements MasterStub {
new ResultCallback() { new ResultCallback() {
@Override @Override
public void onResult(String ret) { public void onResult(String ret) {
JsonObject result =JsonUtil.parseString(ret); JsonObject result = JsonUtil.parseString(ret);
ContractManager.instance.extractEventsFromContractResult( ContractManager.instance.extractEventsFromContractResult(
null, result, client, request, start); null, result, client, request, start);
LOGGER.debug( LOGGER.debug(
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(new Date(System.currentTimeMillis())) .format(new Date(System.currentTimeMillis()))

View File

@ -0,0 +1,180 @@
package org.bdware.server.trustedmodel;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.ContractManager;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.RecoverFlag;
import org.bdware.sc.util.HashUtil;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
import org.bdware.server.action.p2p.MasterServerTCPAction;
import java.util.Arrays;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author Kaidong Wu
*/
public class SelfAdaptiveShardingExecutor implements ContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class);
private static final int SUBMIT_LIMIT = 1024;
private final Queue<ContractRequest> reqQueue = new ConcurrentLinkedQueue<>();
private final MultiContractMeta meta;
private final Map<String, Block> toExecuted = new ConcurrentHashMap<>();
private final Set<String> executedBlocks = ConcurrentHashMap.newKeySet();
private final Map<String, Boolean> executedTxs = new ConcurrentHashMap<>();
private final Object flag = new Object();
private final ScheduledFuture<?> future;
private boolean running = true;
private Block b = new Block();
public SelfAdaptiveShardingExecutor(String contractID) {
this.meta =
ContractManager.instance.multiContractRecorder.getMultiContractMeta(contractID);
this.future = ContractManager.scheduledThreadPool.scheduleWithFixedDelay(
this::submitBlock,
2,
2,
TimeUnit.SECONDS);
ContractManager.threadPool.submit(() -> {
while (running) {
while (!toExecuted.isEmpty()) {
String key = this.b.prevHash;
Block block = toExecuted.get(key);
if (null != block) {
executeBlock(block);
}
toExecuted.remove(key);
}
synchronized (flag) {
try {
flag.wait();
} catch (InterruptedException e) {
LOGGER.warn(String.format(
"[SelfAdaptiveShardingExecutor %s] waiting is interrupted: %s",
meta.getContractID(),
e.getMessage()));
}
}
}
});
}
public void close() {
this.future.cancel(false);
this.running = false;
}
@Override
public void execute(String requestID, ResultCallback rc, ContractRequest req) {
if (executedTxs.containsKey(requestID)) {
return;
}
executedTxs.put(requestID, false);
reqQueue.add(req);
if (reqQueue.size() >= SUBMIT_LIMIT) {
submitBlock();
}
}
public void execute(String blockStr) {
Block block = JsonUtil.fromJson(blockStr, Block.class);
LOGGER.info(String.format(
"[SelfAdaptiveShardingExecutor %s] receive block %s -> %s, %d transactions, timestamp %d",
meta.getContractID(),
block.hash,
block.prevHash,
block.requests.length,
block.timestamp));
if (!executedBlocks.contains(block.hash)) {
toExecuted.put(block.prevHash, block);
synchronized (flag) {
flag.notify();
}
}
}
private synchronized void executeBlock(Block block) {
for (ContractRequest request : block.requests) {
if (executedTxs.containsKey(request.getContractID()) && executedTxs.get(request.getContractID())) {
return;
}
}
for (ContractRequest request : block.requests) {
ContractManager.instance.executeLocally(request, null);
executedTxs.put(request.getContractID(), true);
}
this.b = new Block(block.hash);
executedBlocks.add(block.hash);
}
private void submitBlock() {
ContractManager.threadPool.execute(() -> {
Block block = fillBlock();
if (null != block) {
String[] nodes = this.meta.getMembers();
JsonObject req = new JsonObject();
req.addProperty("action", "deliverBlock");
req.addProperty("data", JsonUtil.toJson(block));
req.addProperty("contractID", this.meta.getContractID());
String reqStr = req.toString();
for (String node : nodes) {
SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(node);
if (cmNode != null &&
MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID())
== RecoverFlag.Fine) {
cmNode.connection.sendMsg(reqStr);
}
}
}
});
}
private synchronized Block fillBlock() {
ContractRequest[] requests = new ContractRequest[Math.min(reqQueue.size(), SUBMIT_LIMIT)];
if (requests.length == 0) {
return null;
}
for (int i = 0; i < requests.length; ++i) {
requests[i] = reqQueue.poll();
}
this.b.fillBlock(null, requests);
return this.b;
}
static class Block {
String prevHash = "0";
String hash;
String checkPoint;
ContractRequest[] requests;
long timestamp;
public Block() {
}
public Block(String prev) {
this.prevHash = prev;
}
public void fillBlock(String cp, ContractRequest[] requests) {
this.checkPoint = cp;
this.requests = requests;
this.timestamp = System.currentTimeMillis();
hash = HashUtil.sha3(
prevHash,
cp,
Arrays.stream(requests).map(ContractRequest::getRequestID).collect(Collectors.joining()));
}
}
}