diff --git a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java index ceb86ac..f4f2785 100644 --- a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java +++ b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java @@ -1,305 +1,306 @@ -//package org.bdware.server.trustedmodel; -// -//import com.google.gson.JsonObject; -//import com.google.gson.JsonPrimitive; -//import org.apache.logging.log4j.LogManager; -//import org.apache.logging.log4j.Logger; -//import org.bdware.sc.ContractClient; -//import org.bdware.sc.ContractManager; -//import org.bdware.sc.ContractResult; -//import org.bdware.sc.bean.ContractRequest; -//import org.bdware.sc.bean.FunctionDesp; -//import org.bdware.sc.bean.SM2Verifiable; -//import org.bdware.sc.conn.OnHashCallback; -//import org.bdware.sc.conn.ResultCallback; -//import org.bdware.sc.units.MultiContractMeta; -//import org.bdware.sc.units.RecoverFlag; -//import org.bdware.sc.util.HashUtil; -//import org.bdware.sc.util.JsonUtil; -//import org.bdware.server.action.CMActions; -//import org.bdware.server.action.p2p.MasterServerRecoverMechAction; -//import org.bdware.units.NetworkManager; -// -//import java.util.*; -//import java.util.concurrent.*; -//import java.util.stream.Collectors; -// -///** -// * @author Kaidong Wu -// */ -//public class SelfAdaptiveShardingExecutor implements ContractExecutor { -// private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class); -// private static final int SUBMIT_LIMIT = 1024; -// private final Queue reqQueue = new ConcurrentLinkedQueue<>(); -// private final MultiContractMeta meta; -// private final Map toExecuted = new ConcurrentHashMap<>(); -// private final Set executedBlocks = ConcurrentHashMap.newKeySet(); -// private final Map executedTxs = new ConcurrentHashMap<>(); -// private final Object flag = new Object(); -// private final ScheduledFuture future; -// private boolean running = true; -// private Block b = new Block(); -// -// public SelfAdaptiveShardingExecutor(String contractID) { -// this.meta = -// CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); -// this.future = ContractManager.scheduledThreadPool.scheduleWithFixedDelay( -// this::submitBlock, -// 2, -// 2, -// TimeUnit.SECONDS); -// LOGGER.debug(String.format("ContractManager.threadPool=%d/%d", -// ((ThreadPoolExecutor) ContractManager.threadPool).getActiveCount(), -// ((ThreadPoolExecutor) ContractManager.threadPool).getPoolSize())); -// ContractManager.threadPool.execute(() -> { -// LOGGER.info( -// "[SelfAdaptiveShardingExecutor " + meta.getContractID() + "] starting service..." + running); -// while (running) { -// LOGGER.info("checking blocks to be executed, latest block=" + -// this.b.prevHash + ", to be executed size=" + toExecuted.size()); -// LOGGER.debug("executed: " + JsonUtil.toJson(executedBlocks) + "\n\t" + JsonUtil.toJson(executedTxs)); -// while (!toExecuted.isEmpty()) { -// String key = this.b.prevHash; -// Block block = toExecuted.get(key); -// if (null != block) { -// executeBlock(block); -// } -// toExecuted.remove(key); -// } -// synchronized (flag) { -// try { -// flag.wait(); -// } catch (InterruptedException e) { -// LOGGER.warn(String.format( -// "[SelfAdaptiveShardingExecutor %s] waiting is interrupted: %s", -// meta.getContractID(), -// e.getMessage())); -// } -// } -// } -// }); -// } -// -// @Override -// public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) { -// // check client -// ContractClient client = CMActions.manager.getClient(meta.getContractID()); -// if (null == client) { -// LOGGER.error("contract " + meta.getContractID() + " not found!"); -// rcb.onResult(JsonUtil.toJson(new ContractResult( -// ContractResult.Status.Error, -// new JsonPrimitive("contract " + meta.getContractID() + " not found!")))); -// return; -// } -// // check function -// FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction()); -// if (null == funDesp) { -// LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!"); -// rcb.onResult(JsonUtil.toJson(new ContractResult( -// ContractResult.Status.Error, -// new JsonPrimitive( -// String.format("action %s of contract %s not found!", -// req.getAction(), -// meta.getContractID()))))); -// return; -// } -// // for view function, execute it -// if (funDesp.isView) { -// CMActions.manager.executeLocallyAsync(req, rcb, hcb); -// return; -// } -// // normal function, check if it is in blocks -// if (executedTxs.containsKey(requestID)) { -// rcb.onResult(JsonUtil.toJson(new ContractResult( -// ContractResult.Status.Error, -// new JsonPrimitive("this request has been packed!")))); -// return; -// } -// // add blocks into request cache -// LOGGER.debug("receive contract request " + requestID); -// executedTxs.put(requestID, false); -// reqQueue.add(req); -// rcb.onResult(JsonUtil.toJson(new ContractResult( -// ContractResult.Status.Executing, -// new JsonPrimitive("this request is adding into blocks")))); -// // if cache is full, submit -// if (reqQueue.size() >= SUBMIT_LIMIT) { -// ContractManager.threadPool.execute(this::submitBlock); -// } -// } -// -// @Override -// public void close() { -// // stop threads -// this.future.cancel(true); -// this.running = false; -// LOGGER.info("destruct executor of contract " + meta.getContractID()); -// } -// -// public void execute(String blockStr) { -// Block block = JsonUtil.fromJson(blockStr, Block.class); -// // the block must have not been cached or executed, and must be valid -// if (!toExecuted.containsKey(block.prevHash) && -// !executedBlocks.contains(block.hash) && -// block.isValid()) { -// // add block into block cache -// LOGGER.info(String.format( -// "[SelfAdaptiveShardingExecutor %s] receive block %s -> %s," + -// " %d transactions, timestamp=%d, size=%d", -// meta.getContractID(), -// block.hash, -// block.prevHash, -// block.requests.length, -// block.timestamp, -// blockStr.length())); -// toExecuted.put(block.prevHash, block); -// // notify thread to execute blocks -// synchronized (flag) { -// flag.notify(); -// } -// } -// } -// -// private synchronized void executeBlock(Block block) { -// // used for the thread to execute blocks -// LOGGER.debug("start"); -// // check contract requests, requests must have not been executed -// for (ContractRequest request : block.requests) { -// if (executedTxs.containsKey(request.getRequestID()) && executedTxs.get(request.getRequestID())) { -// LOGGER.debug("find request " + request.getRequestID() + " has been executed!"); -// return; -// } -// } -// // TODO check status -// // executed requests -// for (ContractRequest request : block.requests) { -// String ret = CMActions.manager.executeLocally(request, null); -// LOGGER.debug(String.format( -// "[SelfAdaptiveShardingExecutor %s] result of request %s: %s", -// meta.getContractID(), -// request.getRequestID(), -// ret)); -// executedTxs.put(request.getRequestID(), true); -// } -// LOGGER.info(String.format( -// "[SelfAdaptiveShardingExecutor %s] execute %d transactions of block %s", -// meta.getContractID(), -// block.requests.length, -// block.hash)); -// // TODO create check point -// this.b = new Block(block.hash, this.b.height + 1); -// executedBlocks.add(block.hash); -// } -// -// private void submitBlock() { -// Block block = fillBlock(); -// if (null != block) { -// LOGGER.info("deliver block " + block.hash + "..."); -// LOGGER.debug(JsonUtil.toPrettyJson(block)); -// String[] nodes = this.meta.getMembers(); -// JsonObject req = new JsonObject(); -// req.addProperty("action", "deliverBlock"); -// req.addProperty("data", JsonUtil.toJson(block)); -// req.addProperty("contractID", this.meta.getContractID()); -// String reqStr = req.toString(); -// // deliver blocks -// for (String node : nodes) { -// if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) -// == RecoverFlag.Fine) { -// NetworkManager.instance.sendToAgent(node, reqStr); -// } -// } -// } -// } -// -// private synchronized Block fillBlock() { -// // pack contract requests into a block -// ContractRequest[] requests = new ContractRequest[Math.min(reqQueue.size(), SUBMIT_LIMIT)]; -// if (requests.length == 0) { -// return null; -// } -// for (int i = 0; i < requests.length; ++i) { -// requests[i] = reqQueue.poll(); -// } -// this.b.fillBlock(requests); -// return this.b; -// } -// -// static class Block extends SM2Verifiable { -// String prevHash = "0"; -// String hash; -// int height; -// String checkPoint; -// String body; -// String nodePubKey; -// ContractRequest[] requests; -// long timestamp; -// -// public Block() { -// this.height = 0; -// } -// -// public Block(String prev, int height) { -// this.prevHash = prev; -// this.height = height; -// } -// -// public void fillBlock(ContractRequest[] requests) { -// this.requests = requests; -// this.timestamp = System.currentTimeMillis(); -// this.body = merkle(requests); -// this.hash = computeHash(); -// doSignature(CMActions.manager.nodeCenterConn.getNodeKeyPair()); -// } -// -// public boolean isValid() { -// return computeHash().equals(hash) && body.equals(merkle(this.requests)) && verifySignature(); -// } -// -// private String computeHash() { -// return HashUtil.sha3( -// String.valueOf(this.height), -// this.prevHash, -// this.checkPoint, -// this.body); -// } -// -// private String merkle(ContractRequest[] requests) { -// // manage requests as a merkle tree -// if (requests.length == 0) { -// return null; -// } -// if (requests.length == 1) { -// return HashUtil.sha3(requests[0].getRequestID()); -// } -// Queue reqQueue = -// Arrays.stream(requests).map(ContractRequest::getRequestID) -// .collect(Collectors.toCollection(ArrayDeque::new)); -// do { -// int size; -// for (size = reqQueue.size(); size > 1; size -= 2) { -// reqQueue.add(HashUtil.sha3(reqQueue.poll(), reqQueue.poll())); -// } -// if (size == 1) { -// reqQueue.add(reqQueue.poll()); -// } -// } while (1 != reqQueue.size()); -// return reqQueue.poll(); -// } -// -// @Override -// public String getPublicKey() { -// return nodePubKey; -// } -// -// @Override -// public void setPublicKey(String pubkey) { -// this.nodePubKey = pubkey; -// } -// -// @Override -// public String getContentStr() { -// return this.hash; -// } -// } -//} +package org.bdware.server.trustedmodel; + +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.bdware.sc.ContractClient; +import org.bdware.sc.ContractManager; +import org.bdware.sc.ContractResult; +import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.bean.FunctionDesp; +import org.bdware.sc.bean.SM2Verifiable; +import org.bdware.sc.conn.OnHashCallback; +import org.bdware.sc.conn.ResultCallback; +import org.bdware.sc.units.MultiContractMeta; +import org.bdware.sc.units.RecoverFlag; +import org.bdware.sc.util.HashUtil; +import org.bdware.sc.util.JsonUtil; +import org.bdware.server.action.CMActions; +import org.bdware.server.action.p2p.MasterServerRecoverMechAction; +import org.bdware.units.NetworkManager; + +import java.util.*; +import java.util.concurrent.*; +import java.util.stream.Collectors; + +/** + * @author Kaidong Wu + */ +public class SelfAdaptiveShardingExecutor implements ContractExecutor { + private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class); + private static final int SUBMIT_LIMIT = 1024; + private static final int DELAY = 1; + private final Queue reqQueue = new ConcurrentLinkedQueue<>(); + private final MultiContractMeta meta; + private final Map toExecuted = new ConcurrentHashMap<>(); + private final Set executedBlocks = ConcurrentHashMap.newKeySet(); + private final Map executedTxs = new ConcurrentHashMap<>(); + private final Object flag = new Object(); + private final ScheduledFuture future; + private boolean running = true; + private Block b = new Block(); + + public SelfAdaptiveShardingExecutor(String contractID) { + this.meta = + CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); + this.future = ContractManager.scheduledThreadPool.scheduleWithFixedDelay( + this::submitBlock, + DELAY, + DELAY, + TimeUnit.SECONDS); + LOGGER.debug(String.format("ContractManager.threadPool=%d/%d", + ((ThreadPoolExecutor) ContractManager.threadPool).getActiveCount(), + ((ThreadPoolExecutor) ContractManager.threadPool).getPoolSize())); + ContractManager.threadPool.execute(() -> { + LOGGER.info( + "[SelfAdaptiveShardingExecutor " + meta.getContractID() + "] starting service..." + running); + while (running) { + LOGGER.info("checking blocks to be executed, latest block=" + + this.b.prevHash + ", to be executed size=" + toExecuted.size()); + LOGGER.debug("executed: " + JsonUtil.toJson(executedBlocks) + "\n\t" + JsonUtil.toJson(executedTxs)); + while (!toExecuted.isEmpty()) { + String key = this.b.prevHash; + Block block = toExecuted.get(key); + if (null != block) { + executeBlock(block); + } + toExecuted.remove(key); + } + synchronized (flag) { + try { + flag.wait(); + } catch (InterruptedException e) { + LOGGER.warn(String.format( + "[SelfAdaptiveShardingExecutor %s] waiting is interrupted: %s", + meta.getContractID(), + e.getMessage())); + } + } + } + }); + } + + @Override + public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) { + // check client + ContractClient client = CMActions.manager.getClient(meta.getContractID()); + if (null == client) { + LOGGER.error("contract " + meta.getContractID() + " not found!"); + rcb.onResult(JsonUtil.toJson(new ContractResult( + ContractResult.Status.Error, + new JsonPrimitive("contract " + meta.getContractID() + " not found!")))); + return; + } + // check function + FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction()); + if (null == funDesp) { + LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!"); + rcb.onResult(JsonUtil.toJson(new ContractResult( + ContractResult.Status.Error, + new JsonPrimitive( + String.format("action %s of contract %s not found!", + req.getAction(), + meta.getContractID()))))); + return; + } + // for view function, execute it + if (funDesp.isView) { + CMActions.manager.executeLocallyAsync(req, rcb, hcb); + return; + } + // normal function, check if it is in blocks + if (executedTxs.containsKey(requestID)) { + rcb.onResult(JsonUtil.toJson(new ContractResult( + ContractResult.Status.Error, + new JsonPrimitive("this request has been packed!")))); + return; + } + // add blocks into request cache + LOGGER.debug("receive contract request " + requestID); + executedTxs.put(requestID, false); + reqQueue.add(req); + rcb.onResult(JsonUtil.toJson(new ContractResult( + ContractResult.Status.Executing, + new JsonPrimitive("this request is adding into blocks")))); + // if cache is full, submit + if (reqQueue.size() >= SUBMIT_LIMIT) { + ContractManager.threadPool.execute(this::submitBlock); + } + } + + @Override + public void close() { + // stop threads + this.future.cancel(true); + this.running = false; + LOGGER.info("destruct executor of contract " + meta.getContractID()); + } + + public void execute(String blockStr) { + Block block = JsonUtil.fromJson(blockStr, Block.class); + // the block must have not been cached or executed, and must be valid + if (!toExecuted.containsKey(block.prevHash) && + !executedBlocks.contains(block.hash) && + block.isValid()) { + // add block into block cache + LOGGER.info(String.format( + "[SelfAdaptiveShardingExecutor %s] receive block %s -> %s," + + " %d transactions, timestamp=%d, size=%d", + meta.getContractID(), + block.hash, + block.prevHash, + block.requests.length, + block.timestamp, + blockStr.length())); + toExecuted.put(block.prevHash, block); + // notify thread to execute blocks + synchronized (flag) { + flag.notify(); + } + } + } + + private synchronized void executeBlock(Block block) { + // used for the thread to execute blocks + LOGGER.debug("start"); + // check contract requests, requests must have not been executed + for (ContractRequest request : block.requests) { + if (executedTxs.containsKey(request.getRequestID()) && executedTxs.get(request.getRequestID())) { + LOGGER.debug("find request " + request.getRequestID() + " has been executed!"); + return; + } + } + // TODO check status + // executed requests + for (ContractRequest request : block.requests) { + String ret = CMActions.manager.executeLocally(request, null); + LOGGER.debug(String.format( + "[SelfAdaptiveShardingExecutor %s] result of request %s: %s", + meta.getContractID(), + request.getRequestID(), + ret)); + executedTxs.put(request.getRequestID(), true); + } + LOGGER.info(String.format( + "[SelfAdaptiveShardingExecutor %s] execute %d transactions of block %s", + meta.getContractID(), + block.requests.length, + block.hash)); + // TODO create check point + this.b = new Block(block.hash, this.b.height + 1); + executedBlocks.add(block.hash); + } + + private void submitBlock() { + Block block = fillBlock(); + if (null != block) { + LOGGER.info("deliver block " + block.hash + "..."); + LOGGER.debug(JsonUtil.toPrettyJson(block)); + String[] nodes = this.meta.getMembers(); + JsonObject req = new JsonObject(); + req.addProperty("action", "deliverBlock"); + req.addProperty("data", JsonUtil.toJson(block)); + req.addProperty("contractID", this.meta.getContractID()); + String reqStr = req.toString(); + // deliver blocks + for (String node : nodes) { + if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) + == RecoverFlag.Fine) { + NetworkManager.instance.sendToAgent(node, reqStr); + } + } + } + } + + private synchronized Block fillBlock() { + // pack contract requests into a block + ContractRequest[] requests = new ContractRequest[Math.min(reqQueue.size(), SUBMIT_LIMIT)]; + if (requests.length == 0) { + return null; + } + for (int i = 0; i < requests.length; ++i) { + requests[i] = reqQueue.poll(); + } + this.b.fillBlock(requests); + return this.b; + } + + static class Block extends SM2Verifiable { + String prevHash = "0"; + String hash; + int height; + String checkPoint; + String body; + String nodePubKey; + ContractRequest[] requests; + long timestamp; + + public Block() { + this.height = 0; + } + + public Block(String prev, int height) { + this.prevHash = prev; + this.height = height; + } + + public void fillBlock(ContractRequest[] requests) { + this.requests = requests; + this.timestamp = System.currentTimeMillis(); + this.body = merkle(requests); + this.hash = computeHash(); + doSignature(CMActions.manager.nodeCenterConn.getNodeKeyPair()); + } + + public boolean isValid() { + return computeHash().equals(hash) && body.equals(merkle(this.requests)) && verifySignature(); + } + + private String computeHash() { + return HashUtil.sha3( + String.valueOf(this.height), + this.prevHash, + this.checkPoint, + this.body); + } + + private String merkle(ContractRequest[] requests) { + // manage requests as a merkle tree + if (requests.length == 0) { + return null; + } + if (requests.length == 1) { + return HashUtil.sha3(requests[0].getRequestID()); + } + Queue reqQueue = + Arrays.stream(requests).map(ContractRequest::getRequestID) + .collect(Collectors.toCollection(ArrayDeque::new)); + do { + int size; + for (size = reqQueue.size(); size > 1; size -= 2) { + reqQueue.add(HashUtil.sha3(reqQueue.poll(), reqQueue.poll())); + } + if (size == 1) { + reqQueue.add(reqQueue.poll()); + } + } while (1 != reqQueue.size()); + return reqQueue.poll(); + } + + @Override + public String getPublicKey() { + return nodePubKey; + } + + @Override + public void setPublicKey(String pubkey) { + this.nodePubKey = pubkey; + } + + @Override + public String getContentStr() { + return this.hash; + } + } +}