feat: update SelfAdaptiveShardingExecutor

This commit is contained in:
Frank.R.Wu 2022-07-30 16:26:43 +08:00
parent 5e45904ceb
commit 1356c1b56e

View File

@ -1,5 +1,6 @@
package org.bdware.consistency.plugin.sharding; package org.bdware.consistency.plugin.sharding;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive; import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
@ -14,7 +15,6 @@ import org.bdware.sc.bean.SM2Verifiable;
import org.bdware.sc.conn.OnHashCallback; import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta; import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.RecoverFlag;
import org.bdware.sc.util.HashUtil; import org.bdware.sc.util.HashUtil;
import org.bdware.sc.util.JsonUtil; import org.bdware.sc.util.JsonUtil;
@ -27,65 +27,70 @@ import java.util.stream.Collectors;
*/ */
public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecutor { public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class); private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class);
private static final int SUBMIT_LIMIT = 1024; private static final int SUBMIT_LIMIT = 5120;
private static final int DELAY = 2;
private final Queue<ContractRequest> reqQueue = new ConcurrentLinkedQueue<>(); private final Queue<ContractRequest> reqQueue = new ConcurrentLinkedQueue<>();
private final MultiContractMeta meta; private final MultiContractMeta meta;
private final Map<String, Block> toExecuted = new ConcurrentHashMap<>(); private final Map<Integer, Block> toExecuted = new ConcurrentHashMap<>();
private final Set<String> executedBlocks = ConcurrentHashMap.newKeySet();
private final Map<String, Boolean> executedTxs = new ConcurrentHashMap<>(); private final Map<String, Boolean> executedTxs = new ConcurrentHashMap<>();
private final Object flag = new Object(); private final Object executorFlag = new Object();
private final ScheduledFuture<?> future; private final ScheduledFuture<?> future;
private boolean running = true; private boolean running = true;
// the pointer to the latest executed block
private int executorPointer = 0;
// the block to be submitted
private Block b = new Block(); private Block b = new Block();
public SelfAdaptiveShardingExecutor(String contractID) { public SelfAdaptiveShardingExecutor(String contractID) {
this.meta = this.meta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID);
cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID); this.future = Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(
this.future = ContractManager.scheduledThreadPool.scheduleWithFixedDelay(
this::submitBlock, this::submitBlock,
2, DELAY,
2, DELAY,
TimeUnit.SECONDS); TimeUnit.SECONDS);
LOGGER.debug(String.format("ContractManager.threadPool=%d/%d", Executors.newCachedThreadPool().execute(() -> {
((ThreadPoolExecutor) ContractManager.threadPool).getActiveCount(), LOGGER.info(String.format(
((ThreadPoolExecutor) ContractManager.threadPool).getPoolSize())); "[Executor %s] starting executing service... %b",
ContractManager.threadPool.execute(() -> { meta.getContractID(), meta.isMaster()));
LOGGER.info(
"[SelfAdaptiveShardingExecutor " + meta.getContractID() + "] starting service..." + running);
while (running) { while (running) {
LOGGER.info("checking blocks to be executed, latest block=" + LOGGER.info(String.format(
this.b.prevHash + ", to be executed size=" + toExecuted.size()); "[Executor %s] checking blocks to be executed, latest height=%s, to be executed size=%d",
LOGGER.debug("executed: " + JsonUtil.toJson(executedBlocks) + "\n\t" + JsonUtil.toJson(executedTxs)); meta.getContractID(), executorPointer, toExecuted.size()));
while (!toExecuted.isEmpty()) { for (Block block = toExecuted.get(executorPointer);
String key = this.b.prevHash; null != block;
Block block = toExecuted.get(key); executorPointer += 1, block = toExecuted.get(executorPointer)) {
if (null != block) { executeBlock(block);
executeBlock(block); toExecuted.remove(executorPointer);
}
toExecuted.remove(key);
} }
synchronized (flag) { try {
try { synchronized (executorFlag) {
flag.wait(); executorFlag.wait();
} catch (InterruptedException e) {
LOGGER.warn(String.format(
"[SelfAdaptiveShardingExecutor %s] waiting is interrupted: %s",
meta.getContractID(),
e.getMessage()));
} }
} catch (InterruptedException e) {
LOGGER.warn(String.format(
"[Executor %s] waiting is interrupted: %s",
meta.getContractID(), e.getMessage()));
} }
} }
}); });
} }
public JsonArray checkCache() {
JsonArray ret = new JsonArray();
ret.add(executorPointer);
for (Map.Entry<Integer, Block> entry : toExecuted.entrySet()) {
ret.add(String.format("%d %s", entry.getValue().height, entry.getValue().hash));
}
return ret;
}
@Override @Override
public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) { public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) {
// check client // check client
ContractClient client = cmActions.getManager().getClient(meta.getContractID()); ContractClient client = cmActions.getManager().getClient(meta.getContractID());
if (null == client) { if (null == client) {
LOGGER.error("contract " + meta.getContractID() + " not found!"); LOGGER.error("contract " + meta.getContractID() + " not found!");
rcb.onResult(JsonUtil.toJson(new ContractResult( rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Error,
ContractResult.Status.Error,
new JsonPrimitive("contract " + meta.getContractID() + " not found!")))); new JsonPrimitive("contract " + meta.getContractID() + " not found!"))));
return; return;
} }
@ -93,12 +98,10 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto
FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction()); FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction());
if (null == funDesp) { if (null == funDesp) {
LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!"); LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!");
rcb.onResult(JsonUtil.toJson(new ContractResult( rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Error,
ContractResult.Status.Error, new JsonPrimitive(String.format(
new JsonPrimitive( "action %s of contract %s not found!",
String.format("action %s of contract %s not found!", req.getAction(), meta.getContractID())))));
req.getAction(),
meta.getContractID())))));
return; return;
} }
// for view function, execute it // for view function, execute it
@ -108,18 +111,21 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto
} }
// normal function, check if it is in blocks // normal function, check if it is in blocks
if (executedTxs.containsKey(requestID)) { if (executedTxs.containsKey(requestID)) {
rcb.onResult(JsonUtil.toJson(new ContractResult( rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Error,
ContractResult.Status.Error,
new JsonPrimitive("this request has been packed!")))); new JsonPrimitive("this request has been packed!"))));
return; return;
} }
// forward to master
if (!meta.isMaster()) {
cmActions.getManager().executeContractOnOtherNodes(req, rcb);
return;
}
// add blocks into request cache // add blocks into request cache
LOGGER.debug("receive contract request " + requestID); LOGGER.debug("receive contract request " + requestID);
executedTxs.put(requestID, false); executedTxs.put(requestID, false);
reqQueue.add(req); reqQueue.add(req);
rcb.onResult(JsonUtil.toJson(new ContractResult( rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Executing,
ContractResult.Status.Executing, new JsonPrimitive("this request is added into blocks"))));
new JsonPrimitive("this request is adding into blocks"))));
// if cache is full, submit // if cache is full, submit
if (reqQueue.size() >= SUBMIT_LIMIT) { if (reqQueue.size() >= SUBMIT_LIMIT) {
ContractManager.threadPool.execute(this::submitBlock); ContractManager.threadPool.execute(this::submitBlock);
@ -131,30 +137,38 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto
// stop threads // stop threads
this.future.cancel(true); this.future.cancel(true);
this.running = false; this.running = false;
LOGGER.info("destruct executor of contract " + meta.getContractID()); LOGGER.info("[Executor " + meta.getContractID() + "] destructed");
} }
public void execute(String blockStr) { @Override
public void onDeliverBlock(String blockStr) {
Block block = JsonUtil.fromJson(blockStr, Block.class); Block block = JsonUtil.fromJson(blockStr, Block.class);
// the block must have not been cached or executed, and must be valid // the block must have not been cached or executed, and must be valid
if (!toExecuted.containsKey(block.prevHash) && boolean valid = block.isValid();
!executedBlocks.contains(block.hash) && if (!toExecuted.containsKey(block.height) &&
block.isValid()) { block.height >= executorPointer &&
valid) {
// add block into block cache // add block into block cache
LOGGER.info(String.format( LOGGER.info(String.format(
"[SelfAdaptiveShardingExecutor %s] receive block %s -> %s," + "[Executor %s] receive block [%d] %s -> %s, %d transactions, timestamp=%d, size=%d",
" %d transactions, timestamp=%d, size=%d",
meta.getContractID(), meta.getContractID(),
block.height,
block.hash, block.hash,
block.prevHash, block.prevHash,
block.requests.length, block.requests.length,
block.timestamp, block.timestamp,
blockStr.length())); blockStr.length()));
toExecuted.put(block.prevHash, block); toExecuted.put(block.height, block);
// notify thread to execute blocks // notify thread to execute blocks
synchronized (flag) { synchronized (executorFlag) {
flag.notify(); executorFlag.notify();
} }
} else {
LOGGER.warn(String.format("receive block [%d] %s failed! checking: cached %b valid %b",
block.height,
block.hash,
toExecuted.containsKey(block.height),
valid));
} }
} }
@ -169,41 +183,46 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto
} }
} }
// TODO check status // TODO check status
if (null != block.checkPoint && !block.checkPoint.isEmpty()) {
cmActions.getManager().recoverUnitFromCheckPoint(meta.getContractID(), block.checkPoint);
}
// executed requests // executed requests
for (ContractRequest request : block.requests) { for (ContractRequest request : block.requests) {
String ret = cmActions.getManager().executeLocally(request, null); String ret = cmActions.getManager().executeLocally(request, null);
LOGGER.debug(String.format( LOGGER.debug(String.format(
"[SelfAdaptiveShardingExecutor %s] result of request %s: %s", "[Executor %s] result of request %s: %s",
meta.getContractID(), meta.getContractID(), request.getRequestID(), ret));
request.getRequestID(),
ret));
executedTxs.put(request.getRequestID(), true); executedTxs.put(request.getRequestID(), true);
} }
LOGGER.info(String.format( LOGGER.info(String.format(
"[SelfAdaptiveShardingExecutor %s] execute %d transactions of block %s", "[Executor %s] execute %d transactions of block [%d] %s",
meta.getContractID(), meta.getContractID(), block.requests.length, block.height, block.hash));
block.requests.length,
block.hash));
// TODO create check point
this.b = new Block(block.hash, this.b.height + 1);
executedBlocks.add(block.hash);
} }
private void submitBlock() { private void submitBlock() {
Block block = fillBlock(); Block block = fillBlock();
if (null != block) { if (null != block) {
LOGGER.info("deliver block " + block.hash + "...");
LOGGER.debug(JsonUtil.toPrettyJson(block)); LOGGER.debug(JsonUtil.toPrettyJson(block));
String[] nodes = this.meta.getMembers(); String[] nodes = this.meta.getMembers();
LOGGER.debug(JsonUtil.toJson(nodes));
JsonObject req = new JsonObject(); JsonObject req = new JsonObject();
String blockStr = JsonUtil.toJson(block);
req.addProperty("action", "deliverBlock"); req.addProperty("action", "deliverBlock");
req.addProperty("data", JsonUtil.toJson(block)); req.addProperty("data", blockStr);
req.addProperty("contractID", this.meta.getContractID()); req.addProperty("contractID", this.meta.getContractID());
String reqStr = req.toString(); String reqStr = req.toString();
// deliver blocks // deliver blocks
LOGGER.info("[Executor " + meta.getContractID() + "] deliver block " + block.hash + "...");
String myself = cmActions.getManager().nodeCenterConn.getNodeId();
this.onDeliverBlock(blockStr);
for (String node : nodes) { for (String node : nodes) {
if (masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(this.meta.getContractID()) // TODO: find dead lock here
== RecoverFlag.Fine) { // if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID())
// == RecoverFlag.Fine) {
// LOGGER.info("deliver block " + block.hash + " to node " + node);
// NetworkManager.instance.sendToAgent(node, reqStr);
// }
if (!Objects.equals(myself, node)) {
networkManager.sendToAgent(node, reqStr); networkManager.sendToAgent(node, reqStr);
} }
} }
@ -219,8 +238,15 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto
for (int i = 0; i < requests.length; ++i) { for (int i = 0; i < requests.length; ++i) {
requests[i] = reqQueue.poll(); requests[i] = reqQueue.poll();
} }
this.b.fillBlock(requests); Block block = this.b;
return this.b; if (executorPointer == block.height - 1) {
// TODO create check point
block.checkPoint = cmActions.getManager().getCheckPointFromUnit(meta.getContractID());
LOGGER.info("create check point in block " + block.height);
}
block.fillBlock(requests);
this.b = new Block(block.hash, block.height + 1);
return block;
} }
static class Block extends SM2Verifiable { static class Block extends SM2Verifiable {
@ -251,15 +277,19 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto
} }
public boolean isValid() { public boolean isValid() {
return computeHash().equals(hash) && body.equals(merkle(this.requests)) && verifySignature(); boolean hashValid = computeHash().equals(hash),
// bodyValid = body.equals(merkle(this.requests)),
bodyValid = true,
signValid = verifySignature();
boolean ret = hashValid & bodyValid & signValid;
if (!ret) {
LOGGER.warn(String.format("hash %b body %b sign %b", hashValid, bodyValid, signValid));
}
return ret;
} }
private String computeHash() { private String computeHash() {
return HashUtil.sha3( return HashUtil.sha3(String.valueOf(this.height), this.prevHash, this.checkPoint, this.body);
String.valueOf(this.height),
this.prevHash,
this.checkPoint,
this.body);
} }
private String merkle(ContractRequest[] requests) { private String merkle(ContractRequest[] requests) {
@ -270,19 +300,19 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto
if (requests.length == 1) { if (requests.length == 1) {
return HashUtil.sha3(requests[0].getRequestID()); return HashUtil.sha3(requests[0].getRequestID());
} }
Queue<String> reqQueue = Queue<String> merkleQueue =
Arrays.stream(requests).map(ContractRequest::getRequestID) Arrays.stream(requests).map(ContractRequest::getRequestID)
.collect(Collectors.toCollection(ArrayDeque::new)); .collect(Collectors.toCollection(ArrayDeque::new));
do { do {
int size; int size;
for (size = reqQueue.size(); size > 1; size -= 2) { for (size = merkleQueue.size(); size > 1; size -= 2) {
reqQueue.add(HashUtil.sha3(reqQueue.poll(), reqQueue.poll())); merkleQueue.add(HashUtil.sha3(merkleQueue.poll(), merkleQueue.poll()));
} }
if (size == 1) { if (size == 1) {
reqQueue.add(reqQueue.poll()); merkleQueue.add(merkleQueue.poll());
} }
} while (1 != reqQueue.size()); } while (1 != merkleQueue.size());
return reqQueue.poll(); return merkleQueue.poll();
} }
@Override @Override
@ -300,9 +330,4 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto
return this.hash; return this.hash;
} }
} }
}
@Override
public void onDeliverBlock(String data) {
execute(data);
}
}