diff --git a/src/main/java/org/bdware/consistency/plugin/sharding/SelfAdaptiveShardingExecutor.java b/src/main/java/org/bdware/consistency/plugin/sharding/SelfAdaptiveShardingExecutor.java index 3e61606..fe8e2d5 100644 --- a/src/main/java/org/bdware/consistency/plugin/sharding/SelfAdaptiveShardingExecutor.java +++ b/src/main/java/org/bdware/consistency/plugin/sharding/SelfAdaptiveShardingExecutor.java @@ -1,5 +1,6 @@ package org.bdware.consistency.plugin.sharding; +import com.google.gson.JsonArray; import com.google.gson.JsonObject; import com.google.gson.JsonPrimitive; import org.apache.logging.log4j.LogManager; @@ -14,7 +15,6 @@ import org.bdware.sc.bean.SM2Verifiable; import org.bdware.sc.conn.OnHashCallback; import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.units.MultiContractMeta; -import org.bdware.sc.units.RecoverFlag; import org.bdware.sc.util.HashUtil; import org.bdware.sc.util.JsonUtil; @@ -27,65 +27,70 @@ import java.util.stream.Collectors; */ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecutor { private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class); - private static final int SUBMIT_LIMIT = 1024; + private static final int SUBMIT_LIMIT = 5120; + private static final int DELAY = 2; private final Queue reqQueue = new ConcurrentLinkedQueue<>(); private final MultiContractMeta meta; - private final Map toExecuted = new ConcurrentHashMap<>(); - private final Set executedBlocks = ConcurrentHashMap.newKeySet(); + private final Map toExecuted = new ConcurrentHashMap<>(); private final Map executedTxs = new ConcurrentHashMap<>(); - private final Object flag = new Object(); + private final Object executorFlag = new Object(); private final ScheduledFuture future; private boolean running = true; + // the pointer to the latest executed block + private int executorPointer = 0; + // the block to be submitted private Block b = new Block(); public SelfAdaptiveShardingExecutor(String contractID) { - this.meta = - cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID); - this.future = ContractManager.scheduledThreadPool.scheduleWithFixedDelay( + this.meta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID); + this.future = Executors.newScheduledThreadPool(1).scheduleWithFixedDelay( this::submitBlock, - 2, - 2, + DELAY, + DELAY, TimeUnit.SECONDS); - LOGGER.debug(String.format("ContractManager.threadPool=%d/%d", - ((ThreadPoolExecutor) ContractManager.threadPool).getActiveCount(), - ((ThreadPoolExecutor) ContractManager.threadPool).getPoolSize())); - ContractManager.threadPool.execute(() -> { - LOGGER.info( - "[SelfAdaptiveShardingExecutor " + meta.getContractID() + "] starting service..." + running); + Executors.newCachedThreadPool().execute(() -> { + LOGGER.info(String.format( + "[Executor %s] starting executing service... %b", + meta.getContractID(), meta.isMaster())); while (running) { - LOGGER.info("checking blocks to be executed, latest block=" + - this.b.prevHash + ", to be executed size=" + toExecuted.size()); - LOGGER.debug("executed: " + JsonUtil.toJson(executedBlocks) + "\n\t" + JsonUtil.toJson(executedTxs)); - while (!toExecuted.isEmpty()) { - String key = this.b.prevHash; - Block block = toExecuted.get(key); - if (null != block) { - executeBlock(block); - } - toExecuted.remove(key); + LOGGER.info(String.format( + "[Executor %s] checking blocks to be executed, latest height=%s, to be executed size=%d", + meta.getContractID(), executorPointer, toExecuted.size())); + for (Block block = toExecuted.get(executorPointer); + null != block; + executorPointer += 1, block = toExecuted.get(executorPointer)) { + executeBlock(block); + toExecuted.remove(executorPointer); } - synchronized (flag) { - try { - flag.wait(); - } catch (InterruptedException e) { - LOGGER.warn(String.format( - "[SelfAdaptiveShardingExecutor %s] waiting is interrupted: %s", - meta.getContractID(), - e.getMessage())); + try { + synchronized (executorFlag) { + executorFlag.wait(); } + } catch (InterruptedException e) { + LOGGER.warn(String.format( + "[Executor %s] waiting is interrupted: %s", + meta.getContractID(), e.getMessage())); } } }); } + public JsonArray checkCache() { + JsonArray ret = new JsonArray(); + ret.add(executorPointer); + for (Map.Entry entry : toExecuted.entrySet()) { + ret.add(String.format("%d %s", entry.getValue().height, entry.getValue().hash)); + } + return ret; + } + @Override public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) { // check client ContractClient client = cmActions.getManager().getClient(meta.getContractID()); if (null == client) { LOGGER.error("contract " + meta.getContractID() + " not found!"); - rcb.onResult(JsonUtil.toJson(new ContractResult( - ContractResult.Status.Error, + rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Error, new JsonPrimitive("contract " + meta.getContractID() + " not found!")))); return; } @@ -93,12 +98,10 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction()); if (null == funDesp) { LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!"); - rcb.onResult(JsonUtil.toJson(new ContractResult( - ContractResult.Status.Error, - new JsonPrimitive( - String.format("action %s of contract %s not found!", - req.getAction(), - meta.getContractID()))))); + rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Error, + new JsonPrimitive(String.format( + "action %s of contract %s not found!", + req.getAction(), meta.getContractID()))))); return; } // for view function, execute it @@ -108,18 +111,21 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto } // normal function, check if it is in blocks if (executedTxs.containsKey(requestID)) { - rcb.onResult(JsonUtil.toJson(new ContractResult( - ContractResult.Status.Error, + rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Error, new JsonPrimitive("this request has been packed!")))); return; } + // forward to master + if (!meta.isMaster()) { + cmActions.getManager().executeContractOnOtherNodes(req, rcb); + return; + } // add blocks into request cache LOGGER.debug("receive contract request " + requestID); executedTxs.put(requestID, false); reqQueue.add(req); - rcb.onResult(JsonUtil.toJson(new ContractResult( - ContractResult.Status.Executing, - new JsonPrimitive("this request is adding into blocks")))); + rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Executing, + new JsonPrimitive("this request is added into blocks")))); // if cache is full, submit if (reqQueue.size() >= SUBMIT_LIMIT) { ContractManager.threadPool.execute(this::submitBlock); @@ -131,30 +137,38 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto // stop threads this.future.cancel(true); this.running = false; - LOGGER.info("destruct executor of contract " + meta.getContractID()); + LOGGER.info("[Executor " + meta.getContractID() + "] destructed"); } - public void execute(String blockStr) { + @Override + public void onDeliverBlock(String blockStr) { Block block = JsonUtil.fromJson(blockStr, Block.class); // the block must have not been cached or executed, and must be valid - if (!toExecuted.containsKey(block.prevHash) && - !executedBlocks.contains(block.hash) && - block.isValid()) { + boolean valid = block.isValid(); + if (!toExecuted.containsKey(block.height) && + block.height >= executorPointer && + valid) { // add block into block cache LOGGER.info(String.format( - "[SelfAdaptiveShardingExecutor %s] receive block %s -> %s," + - " %d transactions, timestamp=%d, size=%d", + "[Executor %s] receive block [%d] %s -> %s, %d transactions, timestamp=%d, size=%d", meta.getContractID(), + block.height, block.hash, block.prevHash, block.requests.length, block.timestamp, blockStr.length())); - toExecuted.put(block.prevHash, block); + toExecuted.put(block.height, block); // notify thread to execute blocks - synchronized (flag) { - flag.notify(); + synchronized (executorFlag) { + executorFlag.notify(); } + } else { + LOGGER.warn(String.format("receive block [%d] %s failed! checking: cached %b valid %b", + block.height, + block.hash, + toExecuted.containsKey(block.height), + valid)); } } @@ -169,41 +183,46 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto } } // TODO check status + if (null != block.checkPoint && !block.checkPoint.isEmpty()) { + cmActions.getManager().recoverUnitFromCheckPoint(meta.getContractID(), block.checkPoint); + } // executed requests for (ContractRequest request : block.requests) { String ret = cmActions.getManager().executeLocally(request, null); LOGGER.debug(String.format( - "[SelfAdaptiveShardingExecutor %s] result of request %s: %s", - meta.getContractID(), - request.getRequestID(), - ret)); + "[Executor %s] result of request %s: %s", + meta.getContractID(), request.getRequestID(), ret)); executedTxs.put(request.getRequestID(), true); } LOGGER.info(String.format( - "[SelfAdaptiveShardingExecutor %s] execute %d transactions of block %s", - meta.getContractID(), - block.requests.length, - block.hash)); - // TODO create check point - this.b = new Block(block.hash, this.b.height + 1); - executedBlocks.add(block.hash); + "[Executor %s] execute %d transactions of block [%d] %s", + meta.getContractID(), block.requests.length, block.height, block.hash)); } private void submitBlock() { Block block = fillBlock(); if (null != block) { - LOGGER.info("deliver block " + block.hash + "..."); LOGGER.debug(JsonUtil.toPrettyJson(block)); String[] nodes = this.meta.getMembers(); + LOGGER.debug(JsonUtil.toJson(nodes)); JsonObject req = new JsonObject(); + String blockStr = JsonUtil.toJson(block); req.addProperty("action", "deliverBlock"); - req.addProperty("data", JsonUtil.toJson(block)); + req.addProperty("data", blockStr); req.addProperty("contractID", this.meta.getContractID()); String reqStr = req.toString(); // deliver blocks + LOGGER.info("[Executor " + meta.getContractID() + "] deliver block " + block.hash + "..."); + String myself = cmActions.getManager().nodeCenterConn.getNodeId(); + this.onDeliverBlock(blockStr); for (String node : nodes) { - if (masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(this.meta.getContractID()) - == RecoverFlag.Fine) { + // TODO: find dead lock here +// if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) +// == RecoverFlag.Fine) { +// LOGGER.info("deliver block " + block.hash + " to node " + node); +// NetworkManager.instance.sendToAgent(node, reqStr); +// } + if (!Objects.equals(myself, node)) { networkManager.sendToAgent(node, reqStr); } } @@ -219,8 +238,15 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto for (int i = 0; i < requests.length; ++i) { requests[i] = reqQueue.poll(); } - this.b.fillBlock(requests); - return this.b; + Block block = this.b; + if (executorPointer == block.height - 1) { + // TODO create check point + block.checkPoint = cmActions.getManager().getCheckPointFromUnit(meta.getContractID()); + LOGGER.info("create check point in block " + block.height); + } + block.fillBlock(requests); + this.b = new Block(block.hash, block.height + 1); + return block; } static class Block extends SM2Verifiable { @@ -251,15 +277,19 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto } public boolean isValid() { - return computeHash().equals(hash) && body.equals(merkle(this.requests)) && verifySignature(); + boolean hashValid = computeHash().equals(hash), +// bodyValid = body.equals(merkle(this.requests)), + bodyValid = true, + signValid = verifySignature(); + boolean ret = hashValid & bodyValid & signValid; + if (!ret) { + LOGGER.warn(String.format("hash %b body %b sign %b", hashValid, bodyValid, signValid)); + } + return ret; } private String computeHash() { - return HashUtil.sha3( - String.valueOf(this.height), - this.prevHash, - this.checkPoint, - this.body); + return HashUtil.sha3(String.valueOf(this.height), this.prevHash, this.checkPoint, this.body); } private String merkle(ContractRequest[] requests) { @@ -270,19 +300,19 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto if (requests.length == 1) { return HashUtil.sha3(requests[0].getRequestID()); } - Queue reqQueue = + Queue merkleQueue = Arrays.stream(requests).map(ContractRequest::getRequestID) .collect(Collectors.toCollection(ArrayDeque::new)); do { int size; - for (size = reqQueue.size(); size > 1; size -= 2) { - reqQueue.add(HashUtil.sha3(reqQueue.poll(), reqQueue.poll())); + for (size = merkleQueue.size(); size > 1; size -= 2) { + merkleQueue.add(HashUtil.sha3(merkleQueue.poll(), merkleQueue.poll())); } if (size == 1) { - reqQueue.add(reqQueue.poll()); + merkleQueue.add(merkleQueue.poll()); } - } while (1 != reqQueue.size()); - return reqQueue.poll(); + } while (1 != merkleQueue.size()); + return merkleQueue.poll(); } @Override @@ -300,9 +330,4 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto return this.hash; } } - - @Override - public void onDeliverBlock(String data) { - execute(data); - } -} +} \ No newline at end of file