diff --git a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java deleted file mode 100644 index d4e1eba..0000000 --- a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java +++ /dev/null @@ -1,334 +0,0 @@ -package org.bdware.server.trustedmodel; - -import com.google.gson.JsonArray; -import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.bdware.sc.ContractClient; -import org.bdware.sc.ContractManager; -import org.bdware.sc.ContractResult; -import org.bdware.sc.bean.ContractRequest; -import org.bdware.sc.bean.FunctionDesp; -import org.bdware.sc.bean.SM2Verifiable; -import org.bdware.sc.conn.OnHashCallback; -import org.bdware.sc.conn.ResultCallback; -import org.bdware.sc.units.MultiContractMeta; -import org.bdware.sc.util.HashUtil; -import org.bdware.sc.util.JsonUtil; -import org.bdware.server.action.CMActions; -import org.bdware.units.NetworkManager; - -import java.util.*; -import java.util.concurrent.*; -import java.util.stream.Collectors; - -/** - * @author Kaidong Wu - */ -@Deprecated -public class SelfAdaptiveShardingExecutor implements ContractExecutor { - private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class); - private static final int SUBMIT_LIMIT = 5120; - private static final int DELAY = 2; - private final Queue reqQueue = new ConcurrentLinkedQueue<>(); - private final MultiContractMeta meta; - private final Map toExecuted = new ConcurrentHashMap<>(); - private final Map executedTxs = new ConcurrentHashMap<>(); - private final Object executorFlag = new Object(); - private final ScheduledFuture future; - private boolean running = true; - // the pointer to the latest executed block - private int executorPointer = 0; - // the block to be submitted - private Block b = new Block(); - - public SelfAdaptiveShardingExecutor(String contractID) { - this.meta = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); - this.future = Executors.newScheduledThreadPool(1).scheduleWithFixedDelay( - this::submitBlock, - DELAY, - DELAY, - TimeUnit.SECONDS); - Executors.newCachedThreadPool().execute(() -> { - LOGGER.info(String.format( - "[Executor %s] starting executing service... %b", - meta.getContractID(), meta.isMaster())); - while (running) { - LOGGER.info(String.format( - "[Executor %s] checking blocks to be executed, latest height=%s, to be executed size=%d", - meta.getContractID(), executorPointer, toExecuted.size())); - for (Block block = toExecuted.get(executorPointer); - null != block; - executorPointer += 1, block = toExecuted.get(executorPointer)) { - executeBlock(block); - toExecuted.remove(executorPointer); - } - try { - synchronized (executorFlag) { - executorFlag.wait(); - } - } catch (InterruptedException e) { - LOGGER.warn(String.format( - "[Executor %s] waiting is interrupted: %s", - meta.getContractID(), e.getMessage())); - } - } - }); - } - - public JsonArray checkCache() { - JsonArray ret = new JsonArray(); - ret.add(executorPointer); - for (Map.Entry entry : toExecuted.entrySet()) { - ret.add(String.format("%d %s", entry.getValue().height, entry.getValue().hash)); - } - return ret; - } - - @Override - public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) { - // check client - ContractClient client = CMActions.manager.getClient(meta.getContractID()); - if (null == client) { - LOGGER.error("contract " + meta.getContractID() + " not found!"); - rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Error, - new JsonPrimitive("contract " + meta.getContractID() + " not found!")))); - return; - } - // check function - FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction()); - if (null == funDesp) { - LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!"); - rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Error, - new JsonPrimitive(String.format( - "action %s of contract %s not found!", - req.getAction(), meta.getContractID()))))); - return; - } - // for view function, execute it - if (funDesp.isView) { - CMActions.manager.executeLocallyAsync(req, rcb, hcb); - return; - } - // normal function, check if it is in blocks - if (executedTxs.containsKey(requestID)) { - rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Error, - new JsonPrimitive("this request has been packed!")))); - return; - } - // forward to master - if (!meta.isMaster()) { - CMActions.manager.executeContractOnOtherNodes(req, rcb); - return; - } - // add blocks into request cache - LOGGER.debug("receive contract request " + requestID); - executedTxs.put(requestID, false); - reqQueue.add(req); - rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Executing, - new JsonPrimitive("this request is added into blocks")))); - // if cache is full, submit - if (reqQueue.size() >= SUBMIT_LIMIT) { - ContractManager.threadPool.execute(this::submitBlock); - } - } - - @Override - public void close() { - // stop threads - this.future.cancel(true); - this.running = false; - LOGGER.info("[Executor " + meta.getContractID() + "] destructed"); - } - - public void receiveBlock(String blockStr) { - Block block = JsonUtil.fromJson(blockStr, Block.class); - // the block must have not been cached or executed, and must be valid - boolean valid = block.isValid(); - if (!toExecuted.containsKey(block.height) && - block.height >= executorPointer && - valid) { - // add block into block cache - LOGGER.info(String.format( - "[Executor %s] receive block [%d] %s -> %s, %d transactions, timestamp=%d, size=%d", - meta.getContractID(), - block.height, - block.hash, - block.prevHash, - block.requests.length, - block.timestamp, - blockStr.length())); - toExecuted.put(block.height, block); - // notify thread to execute blocks - synchronized (executorFlag) { - executorFlag.notify(); - } - } else { - LOGGER.warn(String.format("receive block [%d] %s failed! checking: cached %b valid %b", - block.height, - block.hash, - toExecuted.containsKey(block.height), - valid)); - } - } - - private synchronized void executeBlock(Block block) { - // used for the thread to execute blocks - LOGGER.debug("start"); - // check contract requests, requests must have not been executed - for (ContractRequest request : block.requests) { - if (executedTxs.containsKey(request.getRequestID()) && executedTxs.get(request.getRequestID())) { - LOGGER.debug("find request " + request.getRequestID() + " has been executed!"); - return; - } - } - // TODO check status - if (null != block.checkPoint && !block.checkPoint.isEmpty()) { - CMActions.manager.recoverUnitFromCheckPoint(meta.getContractID(), block.checkPoint); - } - // executed requests - for (ContractRequest request : block.requests) { - String ret = CMActions.manager.executeLocally(request, null); - LOGGER.debug(String.format( - "[Executor %s] result of request %s: %s", - meta.getContractID(), request.getRequestID(), ret)); - executedTxs.put(request.getRequestID(), true); - } - LOGGER.info(String.format( - "[Executor %s] execute %d transactions of block [%d] %s", - meta.getContractID(), block.requests.length, block.height, block.hash)); - } - - private void submitBlock() { - Block block = fillBlock(); - if (null != block) { - LOGGER.debug(JsonUtil.toPrettyJson(block)); - String[] nodes = this.meta.getMembers(); - LOGGER.debug(JsonUtil.toJson(nodes)); - JsonObject req = new JsonObject(); - String blockStr = JsonUtil.toJson(block); - req.addProperty("action", "deliverBlock"); - req.addProperty("data", blockStr); - req.addProperty("contractID", this.meta.getContractID()); - String reqStr = req.toString(); - // deliver blocks - LOGGER.info("[Executor " + meta.getContractID() + "] deliver block " + block.hash + "..."); - String myself = CMActions.manager.nodeCenterConn.getNodeId(); - this.receiveBlock(blockStr); - for (String node : nodes) { - // TODO: find dead lock here -// if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) -// == RecoverFlag.Fine) { -// LOGGER.info("deliver block " + block.hash + " to node " + node); -// NetworkManager.instance.sendToAgent(node, reqStr); -// } - if (!Objects.equals(myself, node)) { - NetworkManager.instance.sendToAgent(node, reqStr); - } - } - } - } - - private synchronized Block fillBlock() { - // pack contract requests into a block - ContractRequest[] requests = new ContractRequest[Math.min(reqQueue.size(), SUBMIT_LIMIT)]; - if (requests.length == 0) { - return null; - } - for (int i = 0; i < requests.length; ++i) { - requests[i] = reqQueue.poll(); - } - Block block = this.b; - if (executorPointer == block.height - 1) { - // TODO create check point - block.checkPoint = CMActions.manager.getCheckPointFromUnit(meta.getContractID()); - LOGGER.info("create check point in block " + block.height); - } - block.fillBlock(requests); - this.b = new Block(block.hash, block.height + 1); - return block; - } - - static class Block extends SM2Verifiable { - String prevHash = "0"; - String hash; - int height; - String checkPoint; - String body; - String nodePubKey; - ContractRequest[] requests; - long timestamp; - - public Block() { - this.height = 0; - } - - public Block(String prev, int height) { - this.prevHash = prev; - this.height = height; - } - - public void fillBlock(ContractRequest[] requests) { - this.requests = requests; - this.timestamp = System.currentTimeMillis(); - this.body = merkle(requests); - this.hash = computeHash(); - doSignature(CMActions.manager.nodeCenterConn.getNodeKeyPair()); - } - - public boolean isValid() { - boolean hashValid = computeHash().equals(hash), -// bodyValid = body.equals(merkle(this.requests)), - bodyValid = true, - signValid = verifySignature(); - boolean ret = hashValid & bodyValid & signValid; - if (!ret) { - LOGGER.warn(String.format("hash %b body %b sign %b", hashValid, bodyValid, signValid)); - } - return ret; - } - - private String computeHash() { - return HashUtil.sha3(String.valueOf(this.height), this.prevHash, this.checkPoint, this.body); - } - - private String merkle(ContractRequest[] requests) { - // manage requests as a merkle tree - if (requests.length == 0) { - return null; - } - if (requests.length == 1) { - return HashUtil.sha3(requests[0].getRequestID()); - } - Queue merkleQueue = - Arrays.stream(requests).map(ContractRequest::getRequestID) - .collect(Collectors.toCollection(ArrayDeque::new)); - do { - int size; - for (size = merkleQueue.size(); size > 1; size -= 2) { - merkleQueue.add(HashUtil.sha3(merkleQueue.poll(), merkleQueue.poll())); - } - if (size == 1) { - merkleQueue.add(merkleQueue.poll()); - } - } while (1 != merkleQueue.size()); - return merkleQueue.poll(); - } - - @Override - public String getPublicKey() { - return nodePubKey; - } - - @Override - public void setPublicKey(String pubkey) { - this.nodePubKey = pubkey; - } - - @Override - public String getContentStr() { - return this.hash; - } - } -}