feat: update SelfAdaptiveSharding

set delay of the periodic block-submission task to 1s
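
The periodic task in question is the executor's submitBlock job, scheduled in the constructor via ScheduledExecutorService.scheduleWithFixedDelay (see the diff below, where the hard-coded 2-second delay becomes DELAY = 1). As a minimal, standalone sketch of that scheduling pattern with the new 1-second delay (the class name and the printed message are illustrative only, not part of the repository):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class FixedDelayDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        int delaySeconds = 1; // was 2 before this commit
        // First run starts 1 s after scheduling; each subsequent run starts
        // 1 s after the previous run finishes (fixed delay, not fixed rate).
        ScheduledFuture<?> future = pool.scheduleWithFixedDelay(
                () -> System.out.println("submit block at " + System.currentTimeMillis()),
                delaySeconds,
                delaySeconds,
                TimeUnit.SECONDS);
        Thread.sleep(5_000);
        future.cancel(true); // mirrors close() cancelling the periodic task
        pool.shutdown();
    }
}
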
Frank.R.Wu 2022-01-25 20:24:39 +08:00
parent ea71e8a358
commit 78366fcb55


@@ -1,305 +1,306 @@
//package org.bdware.server.trustedmodel;
//
//import com.google.gson.JsonObject;
//import com.google.gson.JsonPrimitive;
//import org.apache.logging.log4j.LogManager;
//import org.apache.logging.log4j.Logger;
//import org.bdware.sc.ContractClient;
//import org.bdware.sc.ContractManager;
//import org.bdware.sc.ContractResult;
//import org.bdware.sc.bean.ContractRequest;
//import org.bdware.sc.bean.FunctionDesp;
//import org.bdware.sc.bean.SM2Verifiable;
//import org.bdware.sc.conn.OnHashCallback;
//import org.bdware.sc.conn.ResultCallback;
//import org.bdware.sc.units.MultiContractMeta;
//import org.bdware.sc.units.RecoverFlag;
//import org.bdware.sc.util.HashUtil;
//import org.bdware.sc.util.JsonUtil;
//import org.bdware.server.action.CMActions;
//import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
//import org.bdware.units.NetworkManager;
//
//import java.util.*;
//import java.util.concurrent.*;
//import java.util.stream.Collectors;
//
///**
// * @author Kaidong Wu
// */
//public class SelfAdaptiveShardingExecutor implements ContractExecutor {
// private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class);
// private static final int SUBMIT_LIMIT = 1024;
// private final Queue<ContractRequest> reqQueue = new ConcurrentLinkedQueue<>();
// private final MultiContractMeta meta;
// private final Map<String, Block> toExecuted = new ConcurrentHashMap<>();
// private final Set<String> executedBlocks = ConcurrentHashMap.newKeySet();
// private final Map<String, Boolean> executedTxs = new ConcurrentHashMap<>();
// private final Object flag = new Object();
// private final ScheduledFuture<?> future;
// private boolean running = true;
// private Block b = new Block();
//
// public SelfAdaptiveShardingExecutor(String contractID) {
// this.meta =
// CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
// this.future = ContractManager.scheduledThreadPool.scheduleWithFixedDelay(
// this::submitBlock,
// 2,
// 2,
// TimeUnit.SECONDS);
// LOGGER.debug(String.format("ContractManager.threadPool=%d/%d",
// ((ThreadPoolExecutor) ContractManager.threadPool).getActiveCount(),
// ((ThreadPoolExecutor) ContractManager.threadPool).getPoolSize()));
// ContractManager.threadPool.execute(() -> {
// LOGGER.info(
// "[SelfAdaptiveShardingExecutor " + meta.getContractID() + "] starting service..." + running);
// while (running) {
// LOGGER.info("checking blocks to be executed, latest block=" +
// this.b.prevHash + ", to be executed size=" + toExecuted.size());
// LOGGER.debug("executed: " + JsonUtil.toJson(executedBlocks) + "\n\t" + JsonUtil.toJson(executedTxs));
// while (!toExecuted.isEmpty()) {
// String key = this.b.prevHash;
// Block block = toExecuted.get(key);
// if (null != block) {
// executeBlock(block);
// }
// toExecuted.remove(key);
// }
// synchronized (flag) {
// try {
// flag.wait();
// } catch (InterruptedException e) {
// LOGGER.warn(String.format(
// "[SelfAdaptiveShardingExecutor %s] waiting is interrupted: %s",
// meta.getContractID(),
// e.getMessage()));
// }
// }
// }
// });
// }
//
// @Override
// public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) {
// // check client
// ContractClient client = CMActions.manager.getClient(meta.getContractID());
// if (null == client) {
// LOGGER.error("contract " + meta.getContractID() + " not found!");
// rcb.onResult(JsonUtil.toJson(new ContractResult(
// ContractResult.Status.Error,
// new JsonPrimitive("contract " + meta.getContractID() + " not found!"))));
// return;
// }
// // check function
// FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction());
// if (null == funDesp) {
// LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!");
// rcb.onResult(JsonUtil.toJson(new ContractResult(
// ContractResult.Status.Error,
// new JsonPrimitive(
// String.format("action %s of contract %s not found!",
// req.getAction(),
// meta.getContractID())))));
// return;
// }
// // for view function, execute it
// if (funDesp.isView) {
// CMActions.manager.executeLocallyAsync(req, rcb, hcb);
// return;
// }
// // normal function, check if it is in blocks
// if (executedTxs.containsKey(requestID)) {
// rcb.onResult(JsonUtil.toJson(new ContractResult(
// ContractResult.Status.Error,
// new JsonPrimitive("this request has been packed!"))));
// return;
// }
// // add blocks into request cache
// LOGGER.debug("receive contract request " + requestID);
// executedTxs.put(requestID, false);
// reqQueue.add(req);
// rcb.onResult(JsonUtil.toJson(new ContractResult(
// ContractResult.Status.Executing,
// new JsonPrimitive("this request is adding into blocks"))));
// // if cache is full, submit
// if (reqQueue.size() >= SUBMIT_LIMIT) {
// ContractManager.threadPool.execute(this::submitBlock);
// }
// }
//
// @Override
// public void close() {
// // stop threads
// this.future.cancel(true);
// this.running = false;
// LOGGER.info("destruct executor of contract " + meta.getContractID());
// }
//
// public void execute(String blockStr) {
// Block block = JsonUtil.fromJson(blockStr, Block.class);
// // the block must have not been cached or executed, and must be valid
// if (!toExecuted.containsKey(block.prevHash) &&
// !executedBlocks.contains(block.hash) &&
// block.isValid()) {
// // add block into block cache
// LOGGER.info(String.format(
// "[SelfAdaptiveShardingExecutor %s] receive block %s -> %s," +
// " %d transactions, timestamp=%d, size=%d",
// meta.getContractID(),
// block.hash,
// block.prevHash,
// block.requests.length,
// block.timestamp,
// blockStr.length()));
// toExecuted.put(block.prevHash, block);
// // notify thread to execute blocks
// synchronized (flag) {
// flag.notify();
// }
// }
// }
//
// private synchronized void executeBlock(Block block) {
// // used for the thread to execute blocks
// LOGGER.debug("start");
// // check contract requests, requests must have not been executed
// for (ContractRequest request : block.requests) {
// if (executedTxs.containsKey(request.getRequestID()) && executedTxs.get(request.getRequestID())) {
// LOGGER.debug("find request " + request.getRequestID() + " has been executed!");
// return;
// }
// }
// // TODO check status
// // executed requests
// for (ContractRequest request : block.requests) {
// String ret = CMActions.manager.executeLocally(request, null);
// LOGGER.debug(String.format(
// "[SelfAdaptiveShardingExecutor %s] result of request %s: %s",
// meta.getContractID(),
// request.getRequestID(),
// ret));
// executedTxs.put(request.getRequestID(), true);
// }
// LOGGER.info(String.format(
// "[SelfAdaptiveShardingExecutor %s] execute %d transactions of block %s",
// meta.getContractID(),
// block.requests.length,
// block.hash));
// // TODO create check point
// this.b = new Block(block.hash, this.b.height + 1);
// executedBlocks.add(block.hash);
// }
//
// private void submitBlock() {
// Block block = fillBlock();
// if (null != block) {
// LOGGER.info("deliver block " + block.hash + "...");
// LOGGER.debug(JsonUtil.toPrettyJson(block));
// String[] nodes = this.meta.getMembers();
// JsonObject req = new JsonObject();
// req.addProperty("action", "deliverBlock");
// req.addProperty("data", JsonUtil.toJson(block));
// req.addProperty("contractID", this.meta.getContractID());
// String reqStr = req.toString();
// // deliver blocks
// for (String node : nodes) {
// if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID())
// == RecoverFlag.Fine) {
// NetworkManager.instance.sendToAgent(node, reqStr);
// }
// }
// }
// }
//
// private synchronized Block fillBlock() {
// // pack contract requests into a block
// ContractRequest[] requests = new ContractRequest[Math.min(reqQueue.size(), SUBMIT_LIMIT)];
// if (requests.length == 0) {
// return null;
// }
// for (int i = 0; i < requests.length; ++i) {
// requests[i] = reqQueue.poll();
// }
// this.b.fillBlock(requests);
// return this.b;
// }
//
// static class Block extends SM2Verifiable {
// String prevHash = "0";
// String hash;
// int height;
// String checkPoint;
// String body;
// String nodePubKey;
// ContractRequest[] requests;
// long timestamp;
//
// public Block() {
// this.height = 0;
// }
//
// public Block(String prev, int height) {
// this.prevHash = prev;
// this.height = height;
// }
//
// public void fillBlock(ContractRequest[] requests) {
// this.requests = requests;
// this.timestamp = System.currentTimeMillis();
// this.body = merkle(requests);
// this.hash = computeHash();
// doSignature(CMActions.manager.nodeCenterConn.getNodeKeyPair());
// }
//
// public boolean isValid() {
// return computeHash().equals(hash) && body.equals(merkle(this.requests)) && verifySignature();
// }
//
// private String computeHash() {
// return HashUtil.sha3(
// String.valueOf(this.height),
// this.prevHash,
// this.checkPoint,
// this.body);
// }
//
// private String merkle(ContractRequest[] requests) {
// // manage requests as a merkle tree
// if (requests.length == 0) {
// return null;
// }
// if (requests.length == 1) {
// return HashUtil.sha3(requests[0].getRequestID());
// }
// Queue<String> reqQueue =
// Arrays.stream(requests).map(ContractRequest::getRequestID)
// .collect(Collectors.toCollection(ArrayDeque::new));
// do {
// int size;
// for (size = reqQueue.size(); size > 1; size -= 2) {
// reqQueue.add(HashUtil.sha3(reqQueue.poll(), reqQueue.poll()));
// }
// if (size == 1) {
// reqQueue.add(reqQueue.poll());
// }
// } while (1 != reqQueue.size());
// return reqQueue.poll();
// }
//
// @Override
// public String getPublicKey() {
// return nodePubKey;
// }
//
// @Override
// public void setPublicKey(String pubkey) {
// this.nodePubKey = pubkey;
// }
//
// @Override
// public String getContentStr() {
// return this.hash;
// }
// }
//}
package org.bdware.server.trustedmodel;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.ContractClient;
import org.bdware.sc.ContractManager;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.bean.FunctionDesp;
import org.bdware.sc.bean.SM2Verifiable;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.RecoverFlag;
import org.bdware.sc.util.HashUtil;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
import org.bdware.units.NetworkManager;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* @author Kaidong Wu
*/
public class SelfAdaptiveShardingExecutor implements ContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class);
private static final int SUBMIT_LIMIT = 1024;
private static final int DELAY = 1;
private final Queue<ContractRequest> reqQueue = new ConcurrentLinkedQueue<>();
private final MultiContractMeta meta;
private final Map<String, Block> toExecuted = new ConcurrentHashMap<>();
private final Set<String> executedBlocks = ConcurrentHashMap.newKeySet();
private final Map<String, Boolean> executedTxs = new ConcurrentHashMap<>();
private final Object flag = new Object();
private final ScheduledFuture<?> future;
private boolean running = true;
private Block b = new Block();
public SelfAdaptiveShardingExecutor(String contractID) {
this.meta =
CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
this.future = ContractManager.scheduledThreadPool.scheduleWithFixedDelay(
this::submitBlock,
DELAY,
DELAY,
TimeUnit.SECONDS);
LOGGER.debug(String.format("ContractManager.threadPool=%d/%d",
((ThreadPoolExecutor) ContractManager.threadPool).getActiveCount(),
((ThreadPoolExecutor) ContractManager.threadPool).getPoolSize()));
ContractManager.threadPool.execute(() -> {
LOGGER.info(
"[SelfAdaptiveShardingExecutor " + meta.getContractID() + "] starting service..." + running);
while (running) {
LOGGER.info("checking blocks to be executed, latest block=" +
this.b.prevHash + ", to be executed size=" + toExecuted.size());
LOGGER.debug("executed: " + JsonUtil.toJson(executedBlocks) + "\n\t" + JsonUtil.toJson(executedTxs));
while (!toExecuted.isEmpty()) {
String key = this.b.prevHash;
Block block = toExecuted.get(key);
if (null != block) {
executeBlock(block);
}
toExecuted.remove(key);
}
synchronized (flag) {
try {
flag.wait();
} catch (InterruptedException e) {
LOGGER.warn(String.format(
"[SelfAdaptiveShardingExecutor %s] waiting is interrupted: %s",
meta.getContractID(),
e.getMessage()));
}
}
}
});
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) {
// check client
ContractClient client = CMActions.manager.getClient(meta.getContractID());
if (null == client) {
LOGGER.error("contract " + meta.getContractID() + " not found!");
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("contract " + meta.getContractID() + " not found!"))));
return;
}
// check function
FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction());
if (null == funDesp) {
LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!");
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive(
String.format("action %s of contract %s not found!",
req.getAction(),
meta.getContractID())))));
return;
}
// for view function, execute it
if (funDesp.isView) {
CMActions.manager.executeLocallyAsync(req, rcb, hcb);
return;
}
// normal function, check if it is in blocks
if (executedTxs.containsKey(requestID)) {
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("this request has been packed!"))));
return;
}
// add blocks into request cache
LOGGER.debug("receive contract request " + requestID);
executedTxs.put(requestID, false);
reqQueue.add(req);
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Executing,
new JsonPrimitive("this request is adding into blocks"))));
// if cache is full, submit
if (reqQueue.size() >= SUBMIT_LIMIT) {
ContractManager.threadPool.execute(this::submitBlock);
}
}
@Override
public void close() {
// stop threads
this.future.cancel(true);
this.running = false;
LOGGER.info("destruct executor of contract " + meta.getContractID());
}
public void execute(String blockStr) {
Block block = JsonUtil.fromJson(blockStr, Block.class);
// the block must have not been cached or executed, and must be valid
if (!toExecuted.containsKey(block.prevHash) &&
!executedBlocks.contains(block.hash) &&
block.isValid()) {
// add block into block cache
LOGGER.info(String.format(
"[SelfAdaptiveShardingExecutor %s] receive block %s -> %s," +
" %d transactions, timestamp=%d, size=%d",
meta.getContractID(),
block.hash,
block.prevHash,
block.requests.length,
block.timestamp,
blockStr.length()));
toExecuted.put(block.prevHash, block);
// notify thread to execute blocks
synchronized (flag) {
flag.notify();
}
}
}
private synchronized void executeBlock(Block block) {
// used for the thread to execute blocks
LOGGER.debug("start");
// check contract requests, requests must have not been executed
for (ContractRequest request : block.requests) {
if (executedTxs.containsKey(request.getRequestID()) && executedTxs.get(request.getRequestID())) {
LOGGER.debug("find request " + request.getRequestID() + " has been executed!");
return;
}
}
// TODO check status
// executed requests
for (ContractRequest request : block.requests) {
String ret = CMActions.manager.executeLocally(request, null);
LOGGER.debug(String.format(
"[SelfAdaptiveShardingExecutor %s] result of request %s: %s",
meta.getContractID(),
request.getRequestID(),
ret));
executedTxs.put(request.getRequestID(), true);
}
LOGGER.info(String.format(
"[SelfAdaptiveShardingExecutor %s] execute %d transactions of block %s",
meta.getContractID(),
block.requests.length,
block.hash));
// TODO create check point
this.b = new Block(block.hash, this.b.height + 1);
executedBlocks.add(block.hash);
}
private void submitBlock() {
Block block = fillBlock();
if (null != block) {
LOGGER.info("deliver block " + block.hash + "...");
LOGGER.debug(JsonUtil.toPrettyJson(block));
String[] nodes = this.meta.getMembers();
JsonObject req = new JsonObject();
req.addProperty("action", "deliverBlock");
req.addProperty("data", JsonUtil.toJson(block));
req.addProperty("contractID", this.meta.getContractID());
String reqStr = req.toString();
// deliver blocks
for (String node : nodes) {
if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID())
== RecoverFlag.Fine) {
NetworkManager.instance.sendToAgent(node, reqStr);
}
}
}
}
private synchronized Block fillBlock() {
// pack contract requests into a block
ContractRequest[] requests = new ContractRequest[Math.min(reqQueue.size(), SUBMIT_LIMIT)];
if (requests.length == 0) {
return null;
}
for (int i = 0; i < requests.length; ++i) {
requests[i] = reqQueue.poll();
}
this.b.fillBlock(requests);
return this.b;
}
static class Block extends SM2Verifiable {
String prevHash = "0";
String hash;
int height;
String checkPoint;
String body;
String nodePubKey;
ContractRequest[] requests;
long timestamp;
public Block() {
this.height = 0;
}
public Block(String prev, int height) {
this.prevHash = prev;
this.height = height;
}
public void fillBlock(ContractRequest[] requests) {
this.requests = requests;
this.timestamp = System.currentTimeMillis();
this.body = merkle(requests);
this.hash = computeHash();
doSignature(CMActions.manager.nodeCenterConn.getNodeKeyPair());
}
public boolean isValid() {
return computeHash().equals(hash) && body.equals(merkle(this.requests)) && verifySignature();
}
private String computeHash() {
return HashUtil.sha3(
String.valueOf(this.height),
this.prevHash,
this.checkPoint,
this.body);
}
private String merkle(ContractRequest[] requests) {
// manage requests as a merkle tree
if (requests.length == 0) {
return null;
}
if (requests.length == 1) {
return HashUtil.sha3(requests[0].getRequestID());
}
Queue<String> reqQueue =
Arrays.stream(requests).map(ContractRequest::getRequestID)
.collect(Collectors.toCollection(ArrayDeque::new));
do {
int size;
for (size = reqQueue.size(); size > 1; size -= 2) {
reqQueue.add(HashUtil.sha3(reqQueue.poll(), reqQueue.poll()));
}
if (size == 1) {
reqQueue.add(reqQueue.poll());
}
} while (1 != reqQueue.size());
return reqQueue.poll();
}
@Override
public String getPublicKey() {
return nodePubKey;
}
@Override
public void setPublicKey(String pubkey) {
this.nodePubKey = pubkey;
}
@Override
public String getContentStr() {
return this.hash;
}
}
}
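
For reference, Block.body above is a Merkle-style root over the packed request IDs: IDs are hashed pairwise, level by level, until a single digest remains. The following self-contained sketch reproduces the same queue-based folding, using SHA-256 as a stand-in for HashUtil.sha3 (the class name and hash choice are illustrative only):

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayDeque;
import java.util.Queue;

public class MerkleSketch {
    // Stand-in for org.bdware.sc.util.HashUtil.sha3: concatenate parts and hash with SHA-256.
    static String hash(String... parts) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            for (String p : parts) md.update(p.getBytes());
            StringBuilder sb = new StringBuilder();
            for (byte b : md.digest()) sb.append(String.format("%02x", b & 0xff));
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Same pairwise folding as Block.merkle: hash adjacent IDs within a level,
    // carry an odd leftover to the back so it pairs in the next level.
    static String merkleRoot(String[] requestIds) {
        if (requestIds.length == 0) return null;
        if (requestIds.length == 1) return hash(requestIds[0]);
        Queue<String> queue = new ArrayDeque<>();
        for (String id : requestIds) queue.add(id);
        do {
            int size;
            for (size = queue.size(); size > 1; size -= 2) {
                queue.add(hash(queue.poll(), queue.poll()));
            }
            if (size == 1) {
                queue.add(queue.poll());
            }
        } while (queue.size() != 1);
        return queue.poll();
    }

    public static void main(String[] args) {
        System.out.println(merkleRoot(new String[]{"req-1", "req-2", "req-3"}));
    }
}

An odd element at the end of a level is simply moved to the back of the queue and combined at the next level, which matches the loop in Block.merkle.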