feat(agent-backend)

deprecate SelfAdaptiveShardingExecutor
This commit is contained in:
Frank.R.Wu 2022-03-22 16:32:08 +08:00
parent 87d32ab220
commit 90cdabfa92

View File

@ -26,20 +26,20 @@ import java.util.stream.Collectors;
/** /**
* @author Kaidong Wu * @author Kaidong Wu
*/ */
@Deprecated
public class SelfAdaptiveShardingExecutor implements ContractExecutor { public class SelfAdaptiveShardingExecutor implements ContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class); private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class);
private static final int SUBMIT_LIMIT = 5120; private static final int SUBMIT_LIMIT = 5120;
private static final int DELAY = 1; private static final int DELAY = 2;
private final Queue<ContractRequest> reqQueue = new ConcurrentLinkedQueue<>(); private final Queue<ContractRequest> reqQueue = new ConcurrentLinkedQueue<>();
private final MultiContractMeta meta; private final MultiContractMeta meta;
private final Map<String, Block> toExecuted = new ConcurrentHashMap<>(); private final Map<Integer, Block> toExecuted = new ConcurrentHashMap<>();
private final Set<String> executedBlocks = ConcurrentHashMap.newKeySet();
private final Map<String, Boolean> executedTxs = new ConcurrentHashMap<>(); private final Map<String, Boolean> executedTxs = new ConcurrentHashMap<>();
private final Object executorFlag = new Object(); private final Object executorFlag = new Object();
private final ScheduledFuture<?> future; private final ScheduledFuture<?> future;
private boolean running = true; private boolean running = true;
// the pointer to the latest executed block // the pointer to the latest executed block
private String executorPointer = "0"; private int executorPointer = 0;
// the block to be submitted // the block to be submitted
private Block b = new Block(); private Block b = new Block();
@ -56,18 +56,13 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
meta.getContractID(), meta.isMaster())); meta.getContractID(), meta.isMaster()));
while (running) { while (running) {
LOGGER.info(String.format( LOGGER.info(String.format(
"[Executor %s] checking blocks to be executed, latest block=%s, to be executed size=%d", "[Executor %s] checking blocks to be executed, latest height=%s, to be executed size=%d",
meta.getContractID(), executorPointer, toExecuted.size())); meta.getContractID(), executorPointer, toExecuted.size()));
LOGGER.debug("executed: " + JsonUtil.toJson(executedBlocks) + "\n\t" + JsonUtil.toJson(executedTxs)); for (Block block = toExecuted.get(executorPointer);
while (!toExecuted.isEmpty()) { null != block;
Block block = toExecuted.get(executorPointer); executorPointer += 1, block = toExecuted.get(executorPointer)) {
// check if the execution ends
if (null == block) {
break;
}
executeBlock(block); executeBlock(block);
toExecuted.remove(executorPointer); toExecuted.remove(executorPointer);
executorPointer = block.hash;
} }
try { try {
synchronized (executorFlag) { synchronized (executorFlag) {
@ -85,8 +80,8 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
public JsonArray checkCache() { public JsonArray checkCache() {
JsonArray ret = new JsonArray(); JsonArray ret = new JsonArray();
ret.add(executorPointer); ret.add(executorPointer);
for (Map.Entry<String, Block> entry : toExecuted.entrySet()) { for (Map.Entry<Integer, Block> entry : toExecuted.entrySet()) {
ret.add(String.format("%s,%s,%d", entry.getKey(), entry.getValue().hash, entry.getValue().height)); ret.add(String.format("%d %s", entry.getValue().height, entry.getValue().hash));
} }
return ret; return ret;
} }
@ -144,15 +139,15 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
// stop threads // stop threads
this.future.cancel(true); this.future.cancel(true);
this.running = false; this.running = false;
LOGGER.info("destruct executor of contract " + meta.getContractID()); LOGGER.info("[Executor " + meta.getContractID() + "] destructed");
} }
public void receiveBlock(String blockStr) { public void receiveBlock(String blockStr) {
Block block = JsonUtil.fromJson(blockStr, Block.class); Block block = JsonUtil.fromJson(blockStr, Block.class);
// the block must have not been cached or executed, and must be valid // the block must have not been cached or executed, and must be valid
boolean valid = block.isValid(); boolean valid = block.isValid();
if (!toExecuted.containsKey(block.prevHash) && if (!toExecuted.containsKey(block.height) &&
!executedBlocks.contains(block.hash) && block.height >= executorPointer &&
valid) { valid) {
// add block into block cache // add block into block cache
LOGGER.info(String.format( LOGGER.info(String.format(
@ -164,17 +159,16 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
block.requests.length, block.requests.length,
block.timestamp, block.timestamp,
blockStr.length())); blockStr.length()));
toExecuted.put(block.prevHash, block); toExecuted.put(block.height, block);
// notify thread to execute blocks // notify thread to execute blocks
synchronized (executorFlag) { synchronized (executorFlag) {
executorFlag.notify(); executorFlag.notify();
} }
} else { } else {
LOGGER.warn(String.format("receive block [%d] %s failed! checking: cached %b executed %b valid %b", LOGGER.warn(String.format("receive block [%d] %s failed! checking: cached %b valid %b",
block.height, block.height,
block.hash, block.hash,
toExecuted.containsKey(block.prevHash), toExecuted.containsKey(block.height),
executedBlocks.contains(block.hash),
valid)); valid));
} }
} }
@ -190,6 +184,9 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
} }
} }
// TODO check status // TODO check status
if (null != block.checkPoint && !block.checkPoint.isEmpty()) {
CMActions.manager.recoverUnitFromCheckPoint(meta.getContractID(), block.checkPoint);
}
// executed requests // executed requests
for (ContractRequest request : block.requests) { for (ContractRequest request : block.requests) {
String ret = CMActions.manager.executeLocally(request, null); String ret = CMActions.manager.executeLocally(request, null);
@ -201,9 +198,6 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
LOGGER.info(String.format( LOGGER.info(String.format(
"[Executor %s] execute %d transactions of block [%d] %s", "[Executor %s] execute %d transactions of block [%d] %s",
meta.getContractID(), block.requests.length, block.height, block.hash)); meta.getContractID(), block.requests.length, block.height, block.hash));
// TODO create check point
// this.b = new Block(block.hash, this.b.height + 1);
executedBlocks.add(block.hash);
} }
private void submitBlock() { private void submitBlock() {
@ -219,8 +213,9 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
req.addProperty("contractID", this.meta.getContractID()); req.addProperty("contractID", this.meta.getContractID());
String reqStr = req.toString(); String reqStr = req.toString();
// deliver blocks // deliver blocks
LOGGER.info("deliver block " + block.hash + "..."); LOGGER.info("[Executor " + meta.getContractID() + "] deliver block " + block.hash + "...");
String myself = CMActions.manager.nodeCenterConn.getNodeId(); String myself = CMActions.manager.nodeCenterConn.getNodeId();
this.receiveBlock(blockStr);
for (String node : nodes) { for (String node : nodes) {
// TODO: find dead lock here // TODO: find dead lock here
// if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) // if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID())
@ -231,11 +226,8 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
if (!Objects.equals(myself, node)) { if (!Objects.equals(myself, node)) {
NetworkManager.instance.sendToAgent(node, reqStr); NetworkManager.instance.sendToAgent(node, reqStr);
} }
// LOGGER.info("delivering done: " + node);
} }
this.receiveBlock(blockStr);
} }
// LOGGER.info("end " + (null != block));
} }
private synchronized Block fillBlock() { private synchronized Block fillBlock() {
@ -248,6 +240,11 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
requests[i] = reqQueue.poll(); requests[i] = reqQueue.poll();
} }
Block block = this.b; Block block = this.b;
if (executorPointer == block.height - 1) {
// TODO create check point
block.checkPoint = CMActions.manager.getCheckPointFromUnit(meta.getContractID());
LOGGER.info("create check point in block " + block.height);
}
block.fillBlock(requests); block.fillBlock(requests);
this.b = new Block(block.hash, block.height + 1); this.b = new Block(block.hash, block.height + 1);
return block; return block;