This commit is contained in:
Frank.R.Wu 2021-11-22 14:38:28 +08:00
parent 691045326e
commit 6029666603
3 changed files with 51 additions and 9 deletions

View File

@ -171,8 +171,8 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler<Object> {
try {
JsonObject jo = JsonUtil.parseStringAsJsonObject(json);
} catch (Exception e) {
e.printStackTrace();
System.out.println("============[MasterClientFrameHandler]JsonParse Error:" + json);
LOGGER.warn("JsonParse Error! " + e.getMessage());
LOGGER.debug("JsonParse Error! " + json);
}
ByteBuf buf = Unpooled.wrappedBuffer(json.getBytes());
ctx.channel().writeAndFlush(buf);

View File

@ -4,15 +4,18 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.ContractClient;
import org.bdware.sc.ContractManager;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.bean.FunctionDesp;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.RecoverFlag;
import org.bdware.sc.util.HashUtil;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
import org.bdware.units.NetworkManager;
@ -38,7 +41,7 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
public SelfAdaptiveShardingExecutor(String contractID) {
this.meta =
ContractManager.instance.multiContractRecorder.getMultiContractMeta(contractID);
CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
this.future = ContractManager.scheduledThreadPool.scheduleWithFixedDelay(
this::submitBlock,
2,
@ -77,19 +80,48 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hrc) {
public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) {
// check client
ContractClient client = CMActions.manager.getClient(meta.getContractID());
if (null == client) {
LOGGER.error("contract " + meta.getContractID() + " not found!");
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("contract " + meta.getContractID() + " not found!"))));
return;
}
// check function
FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction());
if (null == funDesp) {
LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!");
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive(
String.format("action %s of contract %s not found!",
req.getAction(),
meta.getContractID())))));
return;
}
// for view function, execute it
if (funDesp.isView) {
CMActions.manager.executeLocallyAsync(req, rcb, hcb);
return;
}
// normal function, check if it is in blocks
if (executedTxs.containsKey(requestID)) {
rc.onResult(JsonUtil.toJson(new ContractResult(
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("this request has been packed!"))));
return;
}
// add blocks into request cache
LOGGER.debug("receive contract request " + requestID);
executedTxs.put(requestID, false);
reqQueue.add(req);
rc.onResult(JsonUtil.toJson(new ContractResult(
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Executing,
new JsonPrimitive("this request is adding into blocks"))));
// if cache is full, submit
if (reqQueue.size() >= SUBMIT_LIMIT) {
ContractManager.threadPool.execute(this::submitBlock);
}
@ -97,16 +129,19 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
@Override
public void close() {
this.future.cancel(false);
// stop threads
this.future.cancel(true);
this.running = false;
LOGGER.info("destruct executor of contract " + meta.getContractID());
}
public void execute(String blockStr) {
Block block = JsonUtil.fromJson(blockStr, Block.class);
// the block must have not been cached or executed, and must be valid
if (!toExecuted.containsKey(block.prevHash) &&
!executedBlocks.contains(block.hash) &&
block.isValid()) {
// add block into block cache
LOGGER.info(String.format(
"[SelfAdaptiveShardingExecutor %s] receive block %s -> %s," +
" %d transactions, timestamp=%d, size=%d",
@ -117,6 +152,7 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
block.timestamp,
blockStr.length()));
toExecuted.put(block.prevHash, block);
// notify thread to execute blocks
synchronized (flag) {
flag.notify();
}
@ -124,7 +160,9 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
}
private synchronized void executeBlock(Block block) {
// used for the thread to execute blocks
LOGGER.debug("start");
// check contract requests, requests must have not been executed
for (ContractRequest request : block.requests) {
if (executedTxs.containsKey(request.getRequestID()) && executedTxs.get(request.getRequestID())) {
LOGGER.debug("find request " + request.getRequestID() + " has been executed!");
@ -132,8 +170,9 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
}
}
// TODO check status
// executed requests
for (ContractRequest request : block.requests) {
String ret = ContractManager.instance.executeLocally(request, null);
String ret = CMActions.manager.executeLocally(request, null);
LOGGER.debug(String.format(
"[SelfAdaptiveShardingExecutor %s] result of request %s: %s",
meta.getContractID(),
@ -162,6 +201,7 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
req.addProperty("data", JsonUtil.toJson(block));
req.addProperty("contractID", this.meta.getContractID());
String reqStr = req.toString();
// deliver blocks
for (String node : nodes) {
if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID())
== RecoverFlag.Fine) {
@ -172,6 +212,7 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
}
private synchronized Block fillBlock() {
// pack contract requests into a block
ContractRequest[] requests = new ContractRequest[Math.min(reqQueue.size(), SUBMIT_LIMIT)];
if (requests.length == 0) {
return null;
@ -217,6 +258,7 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
}
private String merkle(ContractRequest[] requests) {
// manage requests as a merkle tree
if (requests.length == 0) {
return null;
}

View File

@ -10,6 +10,6 @@ appender.rolling.append=true
appender.rolling.fileName=./log/cm.log
appender.rolling.layout.type=PatternLayout
appender.rolling.layout.pattern=%d-%m%n
rootLogger.level=debug
rootLogger.level=info
rootLogger.appenderRef.stdout.ref=STDOUT
rootLogger.appenderRef.log.ref=log