diff --git a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java index 226ad60..d4e1eba 100644 --- a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java +++ b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java @@ -26,20 +26,20 @@ import java.util.stream.Collectors; /** * @author Kaidong Wu */ +@Deprecated public class SelfAdaptiveShardingExecutor implements ContractExecutor { private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class); private static final int SUBMIT_LIMIT = 5120; - private static final int DELAY = 1; + private static final int DELAY = 2; private final Queue reqQueue = new ConcurrentLinkedQueue<>(); private final MultiContractMeta meta; - private final Map toExecuted = new ConcurrentHashMap<>(); - private final Set executedBlocks = ConcurrentHashMap.newKeySet(); + private final Map toExecuted = new ConcurrentHashMap<>(); private final Map executedTxs = new ConcurrentHashMap<>(); private final Object executorFlag = new Object(); private final ScheduledFuture future; private boolean running = true; // the pointer to the latest executed block - private String executorPointer = "0"; + private int executorPointer = 0; // the block to be submitted private Block b = new Block(); @@ -56,18 +56,13 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { meta.getContractID(), meta.isMaster())); while (running) { LOGGER.info(String.format( - "[Executor %s] checking blocks to be executed, latest block=%s, to be executed size=%d", + "[Executor %s] checking blocks to be executed, latest height=%s, to be executed size=%d", meta.getContractID(), executorPointer, toExecuted.size())); - LOGGER.debug("executed: " + JsonUtil.toJson(executedBlocks) + "\n\t" + JsonUtil.toJson(executedTxs)); - while (!toExecuted.isEmpty()) { - Block block = toExecuted.get(executorPointer); - // check if the execution ends - if (null == block) { - break; - } + for (Block block = toExecuted.get(executorPointer); + null != block; + executorPointer += 1, block = toExecuted.get(executorPointer)) { executeBlock(block); toExecuted.remove(executorPointer); - executorPointer = block.hash; } try { synchronized (executorFlag) { @@ -85,8 +80,8 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { public JsonArray checkCache() { JsonArray ret = new JsonArray(); ret.add(executorPointer); - for (Map.Entry entry : toExecuted.entrySet()) { - ret.add(String.format("%s,%s,%d", entry.getKey(), entry.getValue().hash, entry.getValue().height)); + for (Map.Entry entry : toExecuted.entrySet()) { + ret.add(String.format("%d %s", entry.getValue().height, entry.getValue().hash)); } return ret; } @@ -144,15 +139,15 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { // stop threads this.future.cancel(true); this.running = false; - LOGGER.info("destruct executor of contract " + meta.getContractID()); + LOGGER.info("[Executor " + meta.getContractID() + "] destructed"); } public void receiveBlock(String blockStr) { Block block = JsonUtil.fromJson(blockStr, Block.class); // the block must have not been cached or executed, and must be valid boolean valid = block.isValid(); - if (!toExecuted.containsKey(block.prevHash) && - !executedBlocks.contains(block.hash) && + if (!toExecuted.containsKey(block.height) && + block.height >= executorPointer && valid) { // add block into block cache LOGGER.info(String.format( @@ -164,17 +159,16 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { block.requests.length, block.timestamp, blockStr.length())); - toExecuted.put(block.prevHash, block); + toExecuted.put(block.height, block); // notify thread to execute blocks synchronized (executorFlag) { executorFlag.notify(); } } else { - LOGGER.warn(String.format("receive block [%d] %s failed! checking: cached %b executed %b valid %b", + LOGGER.warn(String.format("receive block [%d] %s failed! checking: cached %b valid %b", block.height, block.hash, - toExecuted.containsKey(block.prevHash), - executedBlocks.contains(block.hash), + toExecuted.containsKey(block.height), valid)); } } @@ -190,6 +184,9 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { } } // TODO check status + if (null != block.checkPoint && !block.checkPoint.isEmpty()) { + CMActions.manager.recoverUnitFromCheckPoint(meta.getContractID(), block.checkPoint); + } // executed requests for (ContractRequest request : block.requests) { String ret = CMActions.manager.executeLocally(request, null); @@ -201,9 +198,6 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { LOGGER.info(String.format( "[Executor %s] execute %d transactions of block [%d] %s", meta.getContractID(), block.requests.length, block.height, block.hash)); - // TODO create check point -// this.b = new Block(block.hash, this.b.height + 1); - executedBlocks.add(block.hash); } private void submitBlock() { @@ -219,8 +213,9 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { req.addProperty("contractID", this.meta.getContractID()); String reqStr = req.toString(); // deliver blocks - LOGGER.info("deliver block " + block.hash + "..."); + LOGGER.info("[Executor " + meta.getContractID() + "] deliver block " + block.hash + "..."); String myself = CMActions.manager.nodeCenterConn.getNodeId(); + this.receiveBlock(blockStr); for (String node : nodes) { // TODO: find dead lock here // if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) @@ -231,11 +226,8 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { if (!Objects.equals(myself, node)) { NetworkManager.instance.sendToAgent(node, reqStr); } -// LOGGER.info("delivering done: " + node); } - this.receiveBlock(blockStr); } -// LOGGER.info("end " + (null != block)); } private synchronized Block fillBlock() { @@ -248,6 +240,11 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { requests[i] = reqQueue.poll(); } Block block = this.b; + if (executorPointer == block.height - 1) { + // TODO create check point + block.checkPoint = CMActions.manager.getCheckPointFromUnit(meta.getContractID()); + LOGGER.info("create check point in block " + block.height); + } block.fillBlock(requests); this.b = new Block(block.hash, block.height + 1); return block;