initial commit

This commit is contained in:
CaiHQ 2022-02-17 17:07:01 +08:00
commit 90a1145578
19 changed files with 1852 additions and 0 deletions

26
.gitignore vendored Normal file
View File

@ -0,0 +1,26 @@
/build/
/testoutput/
*/build/*
# Compiled class file
*.class
.DS_Store
# Log file
*.log
# BlueJ files
*.ctxt
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# Package Files #
*.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

25
build.gradle Normal file
View File

@ -0,0 +1,25 @@
plugins {
id 'java'
}
repositories {
mavenCentral()
}
dependencies {
implementation project(":consistency-sdk")
testImplementation 'junit:junit:4.13.2'
}
jar {
String libs = ''
configurations.runtimeClasspath.each {
libs = libs + " libs/" + it.name
}
manifest {
attributes 'Manifest-Version': archiveVersion
attributes 'Class-Path': libs
}
}

View File

@ -0,0 +1,13 @@
package org.bdware.consistency.plugin.common;
import org.bdware.sdk.consistency.ConsistencyPluginManager;
import org.bdware.sdk.consistency.api.context.*;
import org.bdware.server.trustedmodel.ContractExecutor;
public abstract class AbstractContextContractExecutor implements ContractExecutor {
static protected IGlobalConf globalConf = ConsistencyPluginManager.getContext().getGlobalConf();
static protected ICMActions cmActions = ConsistencyPluginManager.getContext().getCMActions();
static protected INetworkManager networkManager = ConsistencyPluginManager.getContext().getNetworkManager();
static protected IMasterServerTCPAction masterServerTCPAction = ConsistencyPluginManager.getContext().getMasterServerTCPAction();
static protected IMasterServerRecoverMechAction masterServerRecoverMechAction = ConsistencyPluginManager.getContext().getMasterServerRecoverMechAction();
}

View File

@ -0,0 +1,35 @@
package org.bdware.consistency.plugin.pbft;
import com.google.gson.JsonObject;
import org.bdware.sc.conn.ByteUtil;
import org.bdware.sc.units.PubKeyNode;
import org.bdware.sc.units.TrustfulExecutorConnection;
import org.bdware.sdk.consistency.ConsistencyPluginManager;
import java.util.ArrayList;
import java.util.List;
public class ContractCluster implements TrustfulExecutorConnection<PubKeyNode> {
private final List<PubKeyNode> members;
String contractID;
public ContractCluster(String contractID, List<PubKeyNode> members) {
this.members = new ArrayList<>();
this.members.addAll(members);
this.contractID = contractID;
}
@Override
public void sendMessage(PubKeyNode node, byte[] msg) {
JsonObject jo = new JsonObject();
jo.addProperty("action", "contractSyncMessage");
jo.addProperty("contractID", contractID);
jo.addProperty("data", ByteUtil.encodeBASE64(msg));
ConsistencyPluginManager.getInstance().getContext().getNetworkManager().sendToAgent(node.pubkey, jo.toString());
}
@Override
public List<PubKeyNode> getNodes() {
return members;
}
}

View File

@ -0,0 +1,374 @@
package org.bdware.consistency.plugin.pbft;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.common.AbstractContextContractExecutor;
import org.bdware.sc.ComponedContractResult;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.Node;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.consistency.Committer;
import org.bdware.sc.consistency.pbft.PBFTAlgorithm;
import org.bdware.sc.consistency.pbft.PBFTMember;
import org.bdware.sc.consistency.pbft.PBFTMessage;
import org.bdware.sc.consistency.pbft.PBFTType;
import org.bdware.sc.units.*;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.trustedmodel.MultiReqSeq;
import org.zz.gmhelper.SM2KeyPair;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
//TODO 追赶差下的调用
public class PBFTExecutor extends AbstractContextContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(PBFTExecutor.class);
final Object lock = new Object();
private final List<PubKeyNode> members;
int resultCount;
AtomicInteger request_index = new AtomicInteger(0);
// key为requestIDvalue为其seq
Map<String, MultiReqSeq> seqMap = new ConcurrentHashMap<>();
Map<String, ResultCache> resultCache = new ConcurrentHashMap<>();
// MultiPointContractInfo info;
String contractID;
PBFTAlgorithm pbft;
ContractCluster contractCluster;
boolean isMaster;
public PBFTExecutor(
int c, String con_id, final String masterPubkey, String[] members) {
resultCount = c;
contractID = con_id;
this.members = new ArrayList<>();
isMaster = globalConf.getNodeID().equals(masterPubkey);
pbft = new PBFTAlgorithm(isMaster);
int count = 0;
for (String mem : members) {
PubKeyNode pubkeyNode = new PubKeyNode();
pubkeyNode.pubkey = mem;
PBFTMember pbftMember = new PBFTMember();
pbftMember.isMaster = mem.equals(masterPubkey);
pbft.addMember(pubkeyNode, pbftMember);
this.members.add(pubkeyNode);
if (globalConf.getNodeID().equals(mem)) {
pbft.setSendID(count);
}
count++;
}
contractCluster = new ContractCluster(contractID, this.members);
pbft.setConnection(contractCluster);
final MultiContractMeta cei = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID);
pbft.setCommitter(new Committer() {
@Override
public void onCommit(ContractRequest data) {
ResultCallback ret = null;
final long startTime = System.currentTimeMillis();
ret = new ResultCallback() {
@Override
public void onResult(String str) {
Map<String, String> ret = new HashMap<>();
ret.put("action", "receiveTrustfullyResult");
SM2KeyPair keyPair = globalConf.getKeyPair();
ret.put("nodeID", keyPair.getPublicKeyStr());
ret.put("responseID", data.getRequestID());
ret.put("executeTime", (System.currentTimeMillis() - startTime) + "");
ret.put("data", str);
cei.setLastExeSeq(data.seq);
networkManager.sendToAgent(masterPubkey, JsonUtil.toJson(ret));
}
};
cmActions.getManager().executeLocallyAsync(data, ret, null);
}
});
}
public void onSyncMessage(Node node, byte[] data) {
pbft.onMessage(node, data);
}
public void setSeq(int seq) {
request_index = new AtomicInteger(seq);
pbft.setAtomSeq(request_index.get());
}
public ResultCallback createResultCallback(
final String requestID,
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID) {
ComponedContractResult componedContractResult = new ComponedContractResult(count);
// TODO 加对应的超时
return networkManager.createResultCallback(
requestID, new ResultMerger(originalCb, count, request_seq, contractID), count);
}
public void sendRequest(String id, ContractRequest req, ResultCallback collector) {
// Map<String, Object> reqStr = new HashMap<>();
// reqStr.put("uniReqID", id);
// reqStr.put("data", req);
// reqStr.put("action", "executeContractLocally");
ContractRequest cr2 = ContractRequest.parse(req.toByte());
cr2.setRequestID(id);
PBFTMessage request = new PBFTMessage();
request.setOrder(req.seq);
request.setType(PBFTType.Request);
request.setContent(cr2.toByte());
for (PubKeyNode node : members) {
if (!networkManager.hasAgentConnection(node.pubkey)) {
LOGGER.warn("cmNode " + node.pubkey.substring(0, 5) + " is null");
collector.onResult(
"{\"status\":\"Error\",\"result\":\"node offline\","
+ "\"nodeID\":\""
+ node
+ "\","
+ "\"action\":\"onExecuteContractTrustfully\"}");
// } else if (MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID)
// != RecoverFlag.Fine) {
// collector.onResult(
// "{\"status\":\"Error\",\"result\":\"node recovering\","
// + "\"nodeID\":\""
// + node
// + "\","
// + "\"action\":\"onExecuteContractTrustfully\"}");
// contractCluster.sendMessage(node, request.getBytes());
} else {
contractCluster.sendMessage(node, request.getBytes());
}
}
// master负责缓存请求
if (!masterServerTCPAction.getReqCache().containsKey(contractID)) {
masterServerTCPAction.getReqCache().put(contractID, new RequestCache());
}
// TODO 多调多统一个seq的有多个请求这个需要改
String[] nodes =
cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers();
LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID());
LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes));
}
public boolean checkCurNodeNumValid() {
return true;
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
LOGGER.debug(JsonUtil.toJson(req));
MultiContractMeta meta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(req.getContractID());
if (meta == null || !meta.isMaster()) {
cmActions.getManager().executeContractOnOtherNodes(req, rc);
return;
}
req.setContractID(cmActions.getManager().getContractIDByName(req.getContractID()));
// 三个相同requestID进来的时候会有冲突
// 仅在此处有冲突么
// 这里是从MasterServer->MasterClient请求的是"executeContractLocally"
// 如果是多点合约的请求A1A2A3的序号应该一致不能分配一个新的seq根据requestID判断是否不需要重新分配一个序号
//TODO seqMap memory leak
//TODO
//TODO
if (null != requestID && requestID.endsWith("_mul")) {
synchronized (lock) {
if (seqMap.containsKey(requestID)) {
req.seq = seqMap.get(requestID).seq;
} else {
req.seq = request_index.getAndIncrement();
seqMap.put(requestID, new MultiReqSeq(req.seq));
}
}
} else {
req.seq = request_index.getAndIncrement();
}
req.needSeq = true;
String id =
System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq;
LOGGER.info("execute receive requestID=" + requestID + " msgID=" + id);
if (checkCurNodeNumValid()) {
LOGGER.debug("checkCurNodeNumValid=true");
ResultCallback collector =
createResultCallback(id, rc, resultCount, req.seq, req.getContractID());
masterServerTCPAction.getSync().sleep(id, collector);
LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq);
sendRequest(id, req, collector);
} else {
LOGGER.debug("invalidNodeNumOnResult");
request_index.getAndDecrement();
ContractResult finalResult =
new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("node number unavailable, request refused."));
rc.onResult(JsonUtil.toJson(finalResult));
}
// }
/* // 三个相同requestID进来的时候会有冲突
// 仅在此处有冲突么
// 这里是从MasterServer->MasterClient请求的是"executeContractLocally"
req.seq = request_index.getAndIncrement();
req.needSeq = true;
ResultCallback collector = createResultCallback(id, rc, resultCount, req.getContractID());
MasterServerTCPAction.sync.sleep(id, collector);
sendRequest(id, req, collector);*/
}
// 清理缓存的多点合约请求序号
public void clearCache() {
final long time = System.currentTimeMillis() - 30000L;
seqMap.entrySet()
.removeIf(
entry -> {
MultiReqSeq cache = entry.getValue();
if (null == cache) {
return true;
}
return cache.startTime < time;
});
}
public static class ResultMerger extends ResultCallback {
ComponedContractResult componedContractResult;
AtomicInteger order;
String contractID;
int count;
int request_seq;
ResultCallback originalCallback;
Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点
ResultMerger(
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID) {
originalCallback = originalCb;
this.count = count;
this.request_seq = request_seq;
this.contractID = contractID;
componedContractResult = new ComponedContractResult(count);
order = new AtomicInteger(0);
}
public String getContractID() {
return contractID;
}
public String getInfo() {
return "contractID="
+ contractID
+ " 收到第 "
+ order
+ " 个节点回复 : "
+ " order="
+ order
+ " count="
+ count
+ " ";
}
@Override
public void onResult(String str) {
// TODO 必须在这里聚合
// str的data是个ContractResult
// 在这儿也是返回个ContractResult
try {
LOGGER.debug("a result of contract" + contractID + ": " + str);
JsonObject obj = JsonParser.parseString(str).getAsJsonObject();
if (obj.has("nodeID")) {
String id = obj.get("nodeID").getAsString();
if (nodeIDs.contains(id)) {
LOGGER.debug(
"ignored result because the result of node "
+ id.substring(0, 5)
+ " has been received");
return;
}
nodeIDs.add(id);
}
LOGGER.debug(
String.format(
"contractID=%s received=%s order=%d count=%d",
contractID, str, order.get(), count));
componedContractResult.add(obj);
// 收集到所有结果
if (order.incrementAndGet() == count) {
ContractResult finalResult = componedContractResult.figureFinalResult();
finalResult.needSeq = true;
finalResult.seq = request_seq;
// if (null == finalResult) {
// finalResult =
// new ContractResult(
// ContractResult.Status.Exception,
// new JsonPrimitive(
// "no nore than half of the
// consistent result"));
// originalCallback.onResult(new
// Gson().toJson(finalResult));
// } else {
originalCallback.onResult(JsonUtil.toJson(finalResult));
// }
LOGGER.debug(
String.format(
"%d results are the same: %s",
finalResult.size, finalResult.result));
// 集群中事务序号+1
cmActions.getManager().multiContractRecorder
.getMultiContractMeta(contractID)
.nextSeqAtMaster();
// recover其中无状态合约CP出错无需恢复
Set<String> nodesID = componedContractResult.getProblemNodes();
if (null == nodesID || nodesID.isEmpty()) {
return;
}
for (String nodeID : nodesID) {
LOGGER.warn("node fails! " + nodeID);
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID)
== RecoverFlag.Fine) {
masterServerRecoverMechAction.getRecoverStatusMap()
.get(nodeID)
.put(contractID, RecoverFlag.ToRecover);
}
}
for (String nodeID : nodesID) {
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID)
== RecoverFlag.ToRecover) {
LOGGER.warn("node in recover " + nodeID);
// 因为该节点结果有误所以即时是stableMode也认为trans记录不可信
// 直接通过load别的节点来恢复
masterServerRecoverMechAction.restartContractFromCommonMode(
nodeID, contractID);
}
}
}
// clearCache();
} catch (Exception e) {
e.printStackTrace();
LOGGER.warn("result exception!");
}
}
}
@Override
public void onRecover(Map<String, Object> args) {
int ceiLastExeSeq = (int) args.get("ceiLastExeSeq");
this.setSeq(ceiLastExeSeq + 1);
}
}

View File

@ -0,0 +1,22 @@
package org.bdware.consistency.plugin.pbft;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor;
import java.util.Map;
public class PBFTExecutorFactory implements ContractExecutorFactory {
@Override
public String getExecutorName() {
return "PBFT";
}
@Override
public ContractExecutor getInstance(Map<String, Object> args) {
int nodeSize = (int) args.get("nodeSize");
String contractID = (String) args.get("contractID");
String masterPubkey = (String) args.get("masterPubkey");
String[] members = (String[]) args.get("members");
return new PBFTExecutor(nodeSize, contractID, masterPubkey, members);
}
}

View File

@ -0,0 +1,343 @@
package org.bdware.consistency.plugin.ra;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.common.AbstractContextContractExecutor;
import org.bdware.sc.ComponedContractResult;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractExecType;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.RecoverFlag;
import org.bdware.sc.units.RequestCache;
import org.bdware.sc.units.ResultCache;
import org.bdware.sc.util.JsonUtil;
import org.bdware.sdk.consistency.api.NotifiableResultMerger;
import org.bdware.server.trustedmodel.MultiReqSeq;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class RequestAllExecutor extends AbstractContextContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(RequestAllExecutor.class);
final Object lock = new Object();
int resultCount;
AtomicInteger request_index = new AtomicInteger(0);
ContractExecType type;
// key为requestIDvalue为其seq
Map<String, MultiReqSeq> seqMap = new ConcurrentHashMap<>();
Map<String, ResultCache> resultCache = new ConcurrentHashMap<>();
// MultiPointContractInfo info;
String contractID;
public RequestAllExecutor(
ContractExecType t, int c, String con_id) {
type = t;
resultCount = c;
contractID = con_id;
}
public void setSeq(int seq) {
request_index = new AtomicInteger(seq);
}
public ResultCallback createResultCallback(
final String requestID,
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID) {
ComponedContractResult componedContractResult = new ComponedContractResult(count);
// TODO 加对应的超时
return networkManager.createResultCallback(
requestID, new ResultMerger(originalCb, count, request_seq, contractID), count);
}
public void sendRequest(String id, ContractRequest req, ResultCallback collector) {
Map<String, Object> reqStr = new HashMap<>();
reqStr.put("uniReqID", id);
reqStr.put("data", req);
reqStr.put("action", "executeContractLocally");
String sendStr = JsonUtil.toJson(reqStr);
// master负责缓存请求
if (!masterServerTCPAction.getReqCache().containsKey(contractID)) {
masterServerTCPAction.getReqCache().put(contractID, new RequestCache());
}
// TODO 多调多统一个seq的有多个请求这个需要改
masterServerTCPAction.getReqCache().get(contractID).put(req.seq, sendStr);
LOGGER.debug(JsonUtil.toJson(req));
String[] nodes =
cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers();
LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID());
LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes));
for (String node : nodes) {
LOGGER.debug("get cmNode " + node.substring(0, 5));
if (!networkManager.hasAgentConnection(node)) {
LOGGER.warn("cmNode " + node.substring(0, 5) + " is null");
collector.onResult(
"{\"status\":\"Error\",\"result\":\"node offline\","
+ "\"nodeID\":\""
+ node
+ "\","
+ "\"action\":\"onExecuteContractTrustfully\"}");
} else if (masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID)
!= RecoverFlag.Fine) {
collector.onResult(
"{\"status\":\"Error\",\"result\":\"node recovering\","
+ "\"nodeID\":\""
+ node
+ "\","
+ "\"action\":\"onExecuteContractTrustfully\"}");
networkManager.sendToAgent(node, sendStr);
} else {
LOGGER.info("send request to cmNode " + node.substring(0, 5));
networkManager.sendToAgent(node, sendStr);
}
}
}
public boolean checkCurNodeNumValid() {
String[] nodes =
cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers();
int validNode = 0;
Map<String, String> mapResult = new HashMap<>();
for (String node : nodes) {
mapResult.put(node.substring(0, 5), String.format("%s %s", networkManager.hasAgentConnection(node) + "",
masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID)));
if (networkManager.hasAgentConnection(node)
&& masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID)
== RecoverFlag.Fine) {
validNode++;
}
}
LOGGER.info(JsonUtil.toPrettyJson(mapResult));
int c = resultCount;
if (ContractExecType.RequestAllResponseAll.equals(type)) {
c = (int) Math.ceil((double) c / 2);
}
LOGGER.debug("c=" + c + " validNode=" + validNode);
return validNode >= c;
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
LOGGER.debug(JsonUtil.toJson(req));
MultiContractMeta meta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(req.getContractID());
if (meta == null || !meta.isMaster()) {
cmActions.getManager().executeContractOnOtherNodes(req, rc);
return;
}
req.setContractID(cmActions.getManager().getContractIDByName(req.getContractID()));
// 三个相同requestID进来的时候会有冲突
// 仅在此处有冲突么
// 这里是从MasterServer->MasterClient请求的是"executeContractLocally"
// 如果是多点合约的请求A1A2A3的序号应该一致不能分配一个新的seq根据requestID判断是否不需要重新分配一个序号
//TODO seqMap memory leak
//TODO
//TODO
if (null != requestID && requestID.endsWith("_mul")) {
synchronized (lock) {
if (seqMap.containsKey(requestID)) {
req.seq = seqMap.get(requestID).seq;
} else {
req.seq = request_index.getAndIncrement();
seqMap.put(requestID, new MultiReqSeq(req.seq));
}
}
} else {
req.seq = request_index.getAndIncrement();
}
req.needSeq = true;
String id =
System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq;
LOGGER.info("execute receive requestID=" + requestID + " msgID=" + id);
if (checkCurNodeNumValid()) {
LOGGER.debug("checkCurNodeNumValid=true");
ResultCallback collector =
createResultCallback(id, rc, resultCount, req.seq, req.getContractID());
masterServerTCPAction.getSync().sleep(id, collector);
LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq);
sendRequest(id, req, collector);
} else {
LOGGER.debug("invalidNodeNumOnResult");
request_index.getAndDecrement();
ContractResult finalResult =
new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("node number unavailable, request refused."));
rc.onResult(JsonUtil.toJson(finalResult));
}
// }
/* // 三个相同requestID进来的时候会有冲突
// 仅在此处有冲突么
// 这里是从MasterServer->MasterClient请求的是"executeContractLocally"
req.seq = request_index.getAndIncrement();
req.needSeq = true;
ResultCallback collector = createResultCallback(id, rc, resultCount, req.getContractID());
MasterServerTCPAction.sync.sleep(id, collector);
sendRequest(id, req, collector);*/
}
// 清理缓存的多点合约请求序号
public void clearCache() {
final long time = System.currentTimeMillis() - 30000L;
seqMap.entrySet()
.removeIf(
entry -> {
MultiReqSeq cache = entry.getValue();
if (null == cache) {
return true;
}
return cache.startTime < time;
});
}
public static class ResultMerger extends ResultCallback implements NotifiableResultMerger {
ComponedContractResult componedContractResult;
AtomicInteger order;
String contractID;
int count;
int request_seq;
ResultCallback originalCallback;
Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点
ResultMerger(
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID) {
originalCallback = originalCb;
this.count = count;
this.request_seq = request_seq;
this.contractID = contractID;
componedContractResult = new ComponedContractResult(count);
order = new AtomicInteger(0);
}
public String getContractID() {
return contractID;
}
public String getInfo() {
return "contractID="
+ contractID
+ " 收到第 "
+ order
+ " 个节点回复 : "
+ " order="
+ order
+ " count="
+ count
+ " ";
}
@Override
public void onResult(String str) {
// TODO 必须在这里聚合
// str的data是个ContractResult
// 在这儿也是返回个ContractResult
try {
LOGGER.debug("a result of contract" + contractID + ": " + str);
JsonObject obj = JsonParser.parseString(str).getAsJsonObject();
if (obj.has("nodeID")) {
String id = obj.get("nodeID").getAsString();
if (nodeIDs.contains(id)) {
LOGGER.debug(
"ignored result because the result of node "
+ id.substring(0, 5)
+ " has been received");
return;
}
nodeIDs.add(id);
}
LOGGER.debug(
String.format(
"contractID=%s received=%s order=%d count=%d",
contractID, str, order.get(), count));
componedContractResult.add(obj);
// 收集到所有结果
if (order.incrementAndGet() == count) {
ContractResult finalResult = componedContractResult.figureFinalResult();
finalResult.needSeq = true;
finalResult.seq = request_seq;
// if (null == finalResult) {
// finalResult =
// new ContractResult(
// ContractResult.Status.Exception,
// new JsonPrimitive(
// "no nore than half of the
// consistent result"));
// originalCallback.onResult(new
// Gson().toJson(finalResult));
// } else {
originalCallback.onResult(JsonUtil.toJson(finalResult));
// }
LOGGER.debug(
String.format(
"%d results are the same: %s",
finalResult.size, finalResult.result));
// 集群中事务序号+1
cmActions.getManager().multiContractRecorder
.getMultiContractMeta(contractID)
.nextSeqAtMaster();
// recover其中无状态合约CP出错无需恢复
Set<String> nodesID = componedContractResult.getProblemNodes();
if (null == nodesID || nodesID.isEmpty()) {
return;
}
for (String nodeID : nodesID) {
LOGGER.warn("node fails! " + nodeID);
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID)
== RecoverFlag.Fine) {
masterServerRecoverMechAction.getRecoverStatusMap()
.get(nodeID)
.put(contractID, RecoverFlag.ToRecover);
}
}
for (String nodeID : nodesID) {
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID)
== RecoverFlag.ToRecover) {
LOGGER.warn("node in recover " + nodeID);
// 因为该节点结果有误所以即时是stableMode也认为trans记录不可信
// 直接通过load别的节点来恢复
masterServerRecoverMechAction.restartContractFromCommonMode(
nodeID, contractID);
}
}
}
// clearCache();
} catch (Exception e) {
e.printStackTrace();
LOGGER.warn("result exception!");
}
}
}
@Override
public void onRecover(Map<String, Object> args) {
int ceiLastExeSeq = (int) args.get("ceiLastExeSeq");
this.setSeq(ceiLastExeSeq + 1);
}
}

View File

@ -0,0 +1,21 @@
package org.bdware.consistency.plugin.ra;
import org.bdware.sc.bean.ContractExecType;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor;
import java.util.Map;
public class RequestAllResponseAllFactory implements ContractExecutorFactory {
@Override
public String getExecutorName() {
return "RARA";
}
@Override
public ContractExecutor getInstance(Map<String, Object> args) {
int nodeSize = (int) args.get("nodeSize");
String contractID = (String) args.get("contractID");
return new RequestAllExecutor(ContractExecType.RequestAllResponseAll, nodeSize, contractID);
}
}

View File

@ -0,0 +1,21 @@
package org.bdware.consistency.plugin.ra;
import org.bdware.sc.bean.ContractExecType;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor;
import java.util.Map;
public class RequestAllResponseFirstFactory implements ContractExecutorFactory {
@Override
public String getExecutorName() {
return "RARF";
}
@Override
public ContractExecutor getInstance(Map<String, Object> args) {
String contractID = (String) args.get("contractID");
return new RequestAllExecutor(
ContractExecType.RequestAllResponseFirst, 1, contractID);
}
}

View File

@ -0,0 +1,24 @@
package org.bdware.consistency.plugin.ra;
import org.bdware.sc.bean.ContractExecType;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor;
import java.util.Map;
public class RequestAllResponseHalfFactory implements ContractExecutorFactory {
@Override
public String getExecutorName() {
return "RARH";
}
@Override
public ContractExecutor getInstance(Map<String, Object> args) {
int nodeSize = (int) args.get("nodeSize");
String contractID = (String) args.get("contractID");
return new RequestAllExecutor(
ContractExecType.RequestAllResponseHalf,
nodeSize / 2 + 1,
contractID);
}
}

View File

@ -0,0 +1,63 @@
package org.bdware.consistency.plugin.ro;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.common.AbstractContextContractExecutor;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.util.JsonUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class RequestOnceExecutor extends AbstractContextContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(RequestOnceExecutor.class);
String contractID;
AtomicInteger order = new AtomicInteger(0);
public RequestOnceExecutor(String contractID) {
this.contractID = contractID;
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
ResultCallback cb =
new ResultCallback() {
@Override
public void onResult(String str) {
LOGGER.debug(str);
JsonObject jo = JsonParser.parseString(str).getAsJsonObject();
JsonObject result =
JsonParser.parseString(jo.get("data").getAsString())
.getAsJsonObject();
for (String key : result.keySet()) jo.add(key, result.get(key));
jo.remove("action");
jo.addProperty("action", "onExecuteResult");
LOGGER.debug(jo.toString());
rc.onResult(jo.toString());
}
};
masterServerTCPAction.getSync().sleep(requestID, cb);
String[] members = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers();
for (int i = 0; i < members.length; i++) {
LOGGER.info("[members]:" + members.length);
int size = members.length;
String nodeID = members[order.incrementAndGet() % size];
//ADD Connect
Map<String, Object> obj = new HashMap<>();
obj.put("action", "executeContractLocally");
obj.put("requestID", requestID);
obj.put("data", req);
obj.put("uniReqID", requestID);
networkManager.sendToAgent(nodeID, JsonUtil.toJson(obj));
return;
}
rc.onResult(
"{\"status\":\"Error\",\"result\":\"all nodes "
+ " offline\",\"action\":\"onExecuteContract\"}");
}
}

View File

@ -0,0 +1,19 @@
package org.bdware.consistency.plugin.ro;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor;
import java.util.Map;
public class RequestOnceExecutorFactory implements ContractExecutorFactory {
@Override
public String getExecutorName() {
return "RequestOnce";
}
@Override
public ContractExecutor getInstance(Map<String, Object> args) {
String contractID = (String) args.get("contractID");
return new RequestOnceExecutor(contractID);
}
}

View File

@ -0,0 +1,80 @@
package org.bdware.consistency.plugin.ro;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.common.AbstractContextContractExecutor;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.util.JsonUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class ResponseOnceExecutor extends AbstractContextContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(ResponseOnceExecutor.class);
private final String contractID;
AtomicInteger order = new AtomicInteger(0);
public ResponseOnceExecutor(String contractID) {
this.contractID = contractID;
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
executeInternal(requestID, rc, req, 2);
}
private void executeInternal(
String requestID, ResultCallback rc, ContractRequest req, int count) {
// String contractID = req.getContractID();
// TODO 标注失效节点是否选择重新迁移
ResultCallback cb =
new ResultCallback() {
@Override
public void onResult(String str) {
LOGGER.debug(str);
JsonObject jo = JsonParser.parseString(str).getAsJsonObject();
jo.remove("action");
jo.addProperty("action", "onExecuteResult");
LOGGER.debug(jo.toString());
if (jo.has("data")) {
String data = jo.get("data").getAsString();
ContractResult cr = JsonUtil.fromJson(data, ContractResult.class);
if (cr.status != ContractResult.Status.Success && count > 0) {
executeInternal(requestID, rc, req, count - 1);
} else rc.onResult(jo.toString());
} else {
JsonObject jo2 = new JsonObject();
jo2.addProperty("action", "onExecuteResult");
jo.remove("action");
jo2.addProperty("data", jo.toString());
rc.onResult(jo2.toString());
}
}
};
masterServerTCPAction.getSync().sleepWithTimeout(requestID, cb, 5);
if (!sendOnce(requestID, req))
rc.onResult(
"{\"status\":\"Error\",\"data\":\"{\\\"status\\\":\\\"Error\\\",\\\"result\\\":\\\"all nodes offline\\\"}\",\"action\":\"onExecuteContract\"}");
}
private boolean sendOnce(String requestID, ContractRequest req) {
String[] members = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers();
for (int i = 0; i < members.length; i++) {
int size = members.length;
String nodeID = members[order.incrementAndGet() % size];
Map<String, Object> obj = new HashMap<>();
obj.put("action", "executeContractLocally");
obj.put("data", req);
obj.put("uniReqID", requestID);
networkManager.sendToAgent(nodeID, JsonUtil.toJson(obj));
return true;
}
return false;
}
}

View File

@ -0,0 +1,19 @@
package org.bdware.consistency.plugin.ro;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor;
import java.util.Map;
public class ResponseOnceExecutorFactory implements ContractExecutorFactory {
@Override
public String getExecutorName() {
return "ResponseOnce";
}
@Override
public ContractExecutor getInstance(Map<String, Object> args) {
String contractID = (String) args.get("contractID");
return new ResponseOnceExecutor(contractID);
}
}

View File

@ -0,0 +1,395 @@
package org.bdware.consistency.plugin.sharding;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.common.AbstractContextContractExecutor;
import org.bdware.sc.ComponedContractResult;
import org.bdware.sc.ContractMeta;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.*;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.RecoverFlag;
import org.bdware.sc.units.RequestCache;
import org.bdware.sc.units.ResultCache;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.trustedmodel.MultiReqSeq;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
// 改为MultiPointCooperationExecutor
public class MultiPointCooperationExecutor extends AbstractContextContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(MultiPointCooperationExecutor.class);
final Object lock = new Object();
int resultCount;
AtomicInteger request_index = new AtomicInteger(0);
ContractExecType type;
// key为requestIDvalue为其seq
Map<String, MultiReqSeq> seqMap = new ConcurrentHashMap<>();
Map<String, ResultCache> resultCache = new ConcurrentHashMap<>();
// MultiPointContractInfo info;
MultiContractMeta multiMeta;
String contractID;
public MultiPointCooperationExecutor(ContractExecType t, int c, String con_id) {
LOGGER.info("-- sharding executor---");
type = t;
resultCount = c;
contractID = con_id;
multiMeta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID);
}
public void setSeq(int seq) {
request_index = new AtomicInteger(seq);
}
public ResultCallback createResultCallback(
final String requestID,
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID, JoinInfo joinInfo) {
// TODO 加对应的超时
return networkManager.createResultCallback(
requestID,
new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID, joinInfo),
count); // 把count改成了1设置成获得1个响应就行
}
public void sendRequest(String id, ContractRequest req, String[] nodes) {
Map<String, Object> reqStr = new HashMap<>();
reqStr.put("uniReqID", id);
reqStr.put("data", req);
req.needSeq = false;
reqStr.put("action", "executeContractLocally");
String sendStr = JsonUtil.toJson(reqStr);
// master负责缓存请求
if (!masterServerTCPAction.getReqCache().containsKey(contractID)) {
masterServerTCPAction.getReqCache().put(contractID, new RequestCache());
}
// TODO 多调多统一个seq的有多个请求这个需要改
masterServerTCPAction.getReqCache().get(contractID).put(req.seq, sendStr);
LOGGER.debug(JsonUtil.toJson(req));
LOGGER.info("node size = " + nodes.length);
LOGGER.debug("nodes:" + JsonUtil.toJson(nodes));
for (String node : nodes) {
LOGGER.info(
"[sendRequests] get cmNode "
+ node.substring(0, 5)
+ " not null "
+ "org.bdware.consistency.plugin.ra.RequestAllExecutor 发送请求给 "
+ node.substring(0, 5));
networkManager.sendToAgent(node, sendStr);
}
}
private String[] getAccordingToRouteInfo(RouteInfo routeInfo, ContractRequest req, String[] members) {
try {
int val;
switch (routeInfo.useDefault) {
case byRequester:
val =
new BigInteger(req.getRequester(), 16)
.mod(new BigInteger("" + members.length))
.intValue();
while (val < 0) {
val = val + members.length;
}
return new String[]{members[val]};
case byArgHash:
val = req.getArg().hashCode();
val = val % members.length;
while (val < 0) {
val += members.length;
}
return new String[]{members[val]};
case byTarget:
JsonObject jo = req.getArg().getAsJsonObject();
val =
new BigInteger(jo.get("target").getAsString(), 16)
.mod(new BigInteger("" + members.length))
.intValue();
while (val < 0) {
val = val + members.length;
}
return new String[]{members[val]};
default:
return members;
}
} catch (Exception e) {
return members;
}
}
public boolean checkCurNodeNumValid() {
LOGGER.info("checkCurNodeNumValid");
String[] nodes = multiMeta.getMembers();
// List<String> nodes = info.members;
int validNode = 0;
for (String node : nodes) {
if (networkManager.hasAgentConnection(node)
&& masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID)
== RecoverFlag.Fine) {
validNode++;
}
}
int c = resultCount;
if (type == ContractExecType.Sharding) c = (int) Math.ceil((double) c / 2);
LOGGER.info("c=" + c + " validNode=" + validNode);
return validNode >= c;
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
LOGGER.info("[org.bdware.consistency.plugin.sharding.MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req));
// 获得action 函数名
LOGGER.info("action is : " + req.getAction());
req.setContractID(cmActions.getManager().getContractIDByName(req.getContractID()));
if (requestID != null && requestID.endsWith("_mul")) {
synchronized (lock) {
if (seqMap.containsKey(requestID)) {
req.seq = seqMap.get(requestID).seq;
} else {
req.seq = request_index.getAndIncrement();
seqMap.put(requestID, new MultiReqSeq(req.seq));
}
}
} else {
req.seq = request_index.getAndIncrement();
}
req.needSeq = true;
String id =
System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq;
LOGGER.info("execute receive requestID= " + requestID + " msgID=" + id);
if (checkCurNodeNumValid()) { // 校验成功 current node num 合法
LOGGER.info("checkCurNodeNumValid true");
ContractMeta meta =
cmActions.getManager().statusRecorder.getContractMeta(req.getContractID());
FunctionDesp fun = meta.getExportedFunction(req.getAction());
ResultCallback collector;
// TODO @fanbo 下面的count 1要改应该是根据route的规则来
//Count 根据join规则来
//nodes 根据route规则来
JoinInfo joinInfo = fun.joinInfo;
RouteInfo routeInfo = fun.routeInfo;
int count = getJoinCount(joinInfo, contractID);
LOGGER.info("requestID=" + requestID + " join Count: " + count);
String[] members = multiMeta.getMembers();
String[] nodes = getAccordingToRouteInfo(routeInfo, req, members);
if (nodes.length < count) {
count = nodes.length;
}
collector =
createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器
masterServerTCPAction.getSync().sleep(id, collector);
LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq);
sendRequest(id, req, nodes); // 发送请求
} else {
LOGGER.info("invalidNodeNumOnResult");
request_index.getAndDecrement();
ContractResult finalResult =
new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("node number unavailbale,request refused."));
rc.onResult(JsonUtil.toJson(finalResult));
}
}
private int getJoinCount(JoinInfo joinInfo, String contractID) {
if (joinInfo == null) return resultCount;
if (joinInfo.joinCount.isJsonPrimitive() && joinInfo.joinCount.getAsJsonPrimitive().isNumber()) {
return joinInfo.joinCount.getAsJsonPrimitive().getAsInt();
}
try {
ContractRequest cr = new ContractRequest();
cr.setContractID(contractID);
cr.setAction(joinInfo.joinCount.getAsString());
//TODO Arg需要好好设计一下
//TODO 又好用又简单的那种设计
//TODO
cr.setArg("");
String result = cmActions.getManager().executeLocally(cr, null);
return JsonUtil.parseString(result).getAsJsonObject().get("result").getAsInt();
} catch (Exception e) {
e.printStackTrace();
return 1;
}
}
// 清理缓存的多点合约请求序号
public void clearCache() {
final long time = System.currentTimeMillis() - 30000L;
seqMap.entrySet()
.removeIf(
entry -> {
MultiReqSeq cache = entry.getValue();
if (null == cache) {
return true;
}
return cache.startTime < time;
});
}
public static class ResultMerger extends ResultCallback {
ComponedContractResult componedContractResult;
AtomicInteger order;
String contractID;
int count; // 记录有多少个节点
int request_seq;
ResultCallback originalCallback;
Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点
JoinInfo joinInfo;
ResultMerger(
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID,
final JoinInfo joinInfo) {
originalCallback = originalCb;
this.count = count;
this.request_seq = request_seq;
this.contractID = contractID;
componedContractResult = new ComponedContractResult(count);
order = new AtomicInteger(0);
this.joinInfo = joinInfo;
}
public String getInfo() {
return "contractID="
+ contractID
+ " 收到第 "
+ order
+ " 个节点回复 : "
+ " order="
+ order
+ " count="
+ count
+ " ";
}
@Override
public void onResult(String str) {
// TODO 必须在这里聚合
// str的data是个ContractResult
// 在这儿也是返回个ContractResult
try {
LOGGER.info(str);
JsonObject obj = JsonParser.parseString(str).getAsJsonObject();
String id = obj.get("nodeID").getAsString();
if (nodeIDs.contains(id)) {
LOGGER.info("已经收到节点 " + id.substring(0, 5) + " 的结果,该结果被忽略");
return;
}
nodeIDs.add(id);
LOGGER.info(
"contractID="
+ contractID
+ " 收到第 "
+ order
+ " 个节点回复 : "
+ str
+ " order="
+ order
+ " count="
+ count);
componedContractResult.add(obj);
// 收集到所有结果
if (order.incrementAndGet() == count) {
ContractResult finalResult = componedContractResult.mergeFinalResult();
finalResult.needSeq = true;
finalResult.seq = request_seq;
// if (null == finalResult) {
// finalResult =
// new ContractResult(
// ContractResult.Status.Exception,
// new JsonPrimitive(
// "no nore than half of the
// consistent result"));
// originalCallback.onResult(new
// Gson().toJson(finalResult));
// } else {
if (joinInfo != null) {
handleJoinInfo(finalResult, joinInfo);
}
originalCallback.onResult(JsonUtil.toJson(finalResult));
// }
LOGGER.info(
"本次执行最终结果为 " + finalResult.size + "个节点合并的,结果为 " + finalResult.result);
// 集群中事务序号+1
// MasterServerTCPAction.contractID2Members.get(contractID).nextSeq();
cmActions.getManager()
.multiContractRecorder
.getMultiContractMeta(contractID)
.nextSeqAtMaster();
// recover其中无状态合约CP出错无需恢复
Set<String> nodesID = componedContractResult.getProblemNodes();
if (null == nodesID || nodesID.isEmpty()) {
return;
}
for (String nodeID : nodesID) {
LOGGER.info("结果出现问题的节点有:" + nodeID);
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID)
== RecoverFlag.Fine) {
masterServerRecoverMechAction.getRecoverStatusMap()
.get(nodeID)
.put(contractID, RecoverFlag.ToRecover);
}
}
for (String nodeID : nodesID) {
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID)
== RecoverFlag.ToRecover) {
LOGGER.info("问题节点开始恢复:" + nodeID);
// 因为该节点结果有误所以即时是stableMode也认为trans记录不可信
// 直接通过load别的节点来恢复
masterServerRecoverMechAction.restartContractFromCommonMode(
nodeID, contractID);
}
}
}
// clearCache();
} catch (Exception e) {
e.printStackTrace();
LOGGER.info("本次执行最终结果为有异常");
}
}
private void handleJoinInfo(ContractResult finalResult, JoinInfo joinInfo) {
JsonObject jo = finalResult.result.getAsJsonObject();
if (joinInfo != null && joinInfo.joinRule != null) {
//TODO 不应该是double 类型
switch (joinInfo.joinRule) {
case "add":
double val = 0;
for (String key : jo.keySet()) {
val += jo.get(key).getAsDouble();
}
finalResult.result = new JsonPrimitive(val);
break;
case "multiply":
val = 1;
for (String key : jo.keySet()) {
val *= jo.get(key).getAsDouble();
}
finalResult.result = new JsonPrimitive(val);
break;
}
}
}
}
}

View File

@ -0,0 +1,21 @@
package org.bdware.consistency.plugin.sharding;
import org.bdware.sc.bean.ContractExecType;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor;
import java.util.Map;
public class MultiPointCooperationExecutorFactory implements ContractExecutorFactory {
@Override
public String getExecutorName() {
return "Sharding";
}
@Override
public ContractExecutor getInstance(Map<String, Object> args) {
int nodeSize = (int) args.get("nodeSize");
String contractID = (String) args.get("contractID");
return new MultiPointCooperationExecutor(ContractExecType.Sharding, nodeSize, contractID);
}
}

View File

@ -0,0 +1,308 @@
package org.bdware.consistency.plugin.sharding;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.common.AbstractContextContractExecutor;
import org.bdware.sc.ContractClient;
import org.bdware.sc.ContractManager;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.bean.FunctionDesp;
import org.bdware.sc.bean.SM2Verifiable;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.RecoverFlag;
import org.bdware.sc.util.HashUtil;
import org.bdware.sc.util.JsonUtil;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* @author Kaidong Wu
*/
public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class);
private static final int SUBMIT_LIMIT = 1024;
private final Queue<ContractRequest> reqQueue = new ConcurrentLinkedQueue<>();
private final MultiContractMeta meta;
private final Map<String, Block> toExecuted = new ConcurrentHashMap<>();
private final Set<String> executedBlocks = ConcurrentHashMap.newKeySet();
private final Map<String, Boolean> executedTxs = new ConcurrentHashMap<>();
private final Object flag = new Object();
private final ScheduledFuture<?> future;
private boolean running = true;
private Block b = new Block();
public SelfAdaptiveShardingExecutor(String contractID) {
this.meta =
cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID);
this.future = ContractManager.scheduledThreadPool.scheduleWithFixedDelay(
this::submitBlock,
2,
2,
TimeUnit.SECONDS);
LOGGER.debug(String.format("ContractManager.threadPool=%d/%d",
((ThreadPoolExecutor) ContractManager.threadPool).getActiveCount(),
((ThreadPoolExecutor) ContractManager.threadPool).getPoolSize()));
ContractManager.threadPool.execute(() -> {
LOGGER.info(
"[SelfAdaptiveShardingExecutor " + meta.getContractID() + "] starting service..." + running);
while (running) {
LOGGER.info("checking blocks to be executed, latest block=" +
this.b.prevHash + ", to be executed size=" + toExecuted.size());
LOGGER.debug("executed: " + JsonUtil.toJson(executedBlocks) + "\n\t" + JsonUtil.toJson(executedTxs));
while (!toExecuted.isEmpty()) {
String key = this.b.prevHash;
Block block = toExecuted.get(key);
if (null != block) {
executeBlock(block);
}
toExecuted.remove(key);
}
synchronized (flag) {
try {
flag.wait();
} catch (InterruptedException e) {
LOGGER.warn(String.format(
"[SelfAdaptiveShardingExecutor %s] waiting is interrupted: %s",
meta.getContractID(),
e.getMessage()));
}
}
}
});
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) {
// check client
ContractClient client = cmActions.getManager().getClient(meta.getContractID());
if (null == client) {
LOGGER.error("contract " + meta.getContractID() + " not found!");
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("contract " + meta.getContractID() + " not found!"))));
return;
}
// check function
FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction());
if (null == funDesp) {
LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!");
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive(
String.format("action %s of contract %s not found!",
req.getAction(),
meta.getContractID())))));
return;
}
// for view function, execute it
if (funDesp.isView) {
cmActions.getManager().executeLocallyAsync(req, rcb, hcb);
return;
}
// normal function, check if it is in blocks
if (executedTxs.containsKey(requestID)) {
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("this request has been packed!"))));
return;
}
// add blocks into request cache
LOGGER.debug("receive contract request " + requestID);
executedTxs.put(requestID, false);
reqQueue.add(req);
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Executing,
new JsonPrimitive("this request is adding into blocks"))));
// if cache is full, submit
if (reqQueue.size() >= SUBMIT_LIMIT) {
ContractManager.threadPool.execute(this::submitBlock);
}
}
@Override
public void close() {
// stop threads
this.future.cancel(true);
this.running = false;
LOGGER.info("destruct executor of contract " + meta.getContractID());
}
public void execute(String blockStr) {
Block block = JsonUtil.fromJson(blockStr, Block.class);
// the block must have not been cached or executed, and must be valid
if (!toExecuted.containsKey(block.prevHash) &&
!executedBlocks.contains(block.hash) &&
block.isValid()) {
// add block into block cache
LOGGER.info(String.format(
"[SelfAdaptiveShardingExecutor %s] receive block %s -> %s," +
" %d transactions, timestamp=%d, size=%d",
meta.getContractID(),
block.hash,
block.prevHash,
block.requests.length,
block.timestamp,
blockStr.length()));
toExecuted.put(block.prevHash, block);
// notify thread to execute blocks
synchronized (flag) {
flag.notify();
}
}
}
private synchronized void executeBlock(Block block) {
// used for the thread to execute blocks
LOGGER.debug("start");
// check contract requests, requests must have not been executed
for (ContractRequest request : block.requests) {
if (executedTxs.containsKey(request.getRequestID()) && executedTxs.get(request.getRequestID())) {
LOGGER.debug("find request " + request.getRequestID() + " has been executed!");
return;
}
}
// TODO check status
// executed requests
for (ContractRequest request : block.requests) {
String ret = cmActions.getManager().executeLocally(request, null);
LOGGER.debug(String.format(
"[SelfAdaptiveShardingExecutor %s] result of request %s: %s",
meta.getContractID(),
request.getRequestID(),
ret));
executedTxs.put(request.getRequestID(), true);
}
LOGGER.info(String.format(
"[SelfAdaptiveShardingExecutor %s] execute %d transactions of block %s",
meta.getContractID(),
block.requests.length,
block.hash));
// TODO create check point
this.b = new Block(block.hash, this.b.height + 1);
executedBlocks.add(block.hash);
}
private void submitBlock() {
Block block = fillBlock();
if (null != block) {
LOGGER.info("deliver block " + block.hash + "...");
LOGGER.debug(JsonUtil.toPrettyJson(block));
String[] nodes = this.meta.getMembers();
JsonObject req = new JsonObject();
req.addProperty("action", "deliverBlock");
req.addProperty("data", JsonUtil.toJson(block));
req.addProperty("contractID", this.meta.getContractID());
String reqStr = req.toString();
// deliver blocks
for (String node : nodes) {
if (masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(this.meta.getContractID())
== RecoverFlag.Fine) {
networkManager.sendToAgent(node, reqStr);
}
}
}
}
private synchronized Block fillBlock() {
// pack contract requests into a block
ContractRequest[] requests = new ContractRequest[Math.min(reqQueue.size(), SUBMIT_LIMIT)];
if (requests.length == 0) {
return null;
}
for (int i = 0; i < requests.length; ++i) {
requests[i] = reqQueue.poll();
}
this.b.fillBlock(requests);
return this.b;
}
static class Block extends SM2Verifiable {
String prevHash = "0";
String hash;
int height;
String checkPoint;
String body;
String nodePubKey;
ContractRequest[] requests;
long timestamp;
public Block() {
this.height = 0;
}
public Block(String prev, int height) {
this.prevHash = prev;
this.height = height;
}
public void fillBlock(ContractRequest[] requests) {
this.requests = requests;
this.timestamp = System.currentTimeMillis();
this.body = merkle(requests);
this.hash = computeHash();
doSignature(cmActions.getManager().nodeCenterConn.getNodeKeyPair());
}
public boolean isValid() {
return computeHash().equals(hash) && body.equals(merkle(this.requests)) && verifySignature();
}
private String computeHash() {
return HashUtil.sha3(
String.valueOf(this.height),
this.prevHash,
this.checkPoint,
this.body);
}
private String merkle(ContractRequest[] requests) {
// manage requests as a merkle tree
if (requests.length == 0) {
return null;
}
if (requests.length == 1) {
return HashUtil.sha3(requests[0].getRequestID());
}
Queue<String> reqQueue =
Arrays.stream(requests).map(ContractRequest::getRequestID)
.collect(Collectors.toCollection(ArrayDeque::new));
do {
int size;
for (size = reqQueue.size(); size > 1; size -= 2) {
reqQueue.add(HashUtil.sha3(reqQueue.poll(), reqQueue.poll()));
}
if (size == 1) {
reqQueue.add(reqQueue.poll());
}
} while (1 != reqQueue.size());
return reqQueue.poll();
}
@Override
public String getPublicKey() {
return nodePubKey;
}
@Override
public void setPublicKey(String pubkey) {
this.nodePubKey = pubkey;
}
@Override
public String getContentStr() {
return this.hash;
}
}
@Override
public void onDeliverBlock(String data) {
execute(data);
}
}

View File

@ -0,0 +1,19 @@
package org.bdware.consistency.plugin.sharding;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor;
import java.util.Map;
public class SelfAdaptiveShardingExecutorFactory implements ContractExecutorFactory {
@Override
public String getExecutorName() {
return "SASharding";
}
@Override
public ContractExecutor getInstance(Map<String, Object> args) {
String contractID = (String) args.get("contractID");
return new SelfAdaptiveShardingExecutor(contractID);
}
}

View File

@ -0,0 +1,24 @@
package org.bdware.consistency.plugin.single;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.SingleNodeExecutor;
import java.util.Map;
public class SingleNodeExecutorFactory implements ContractExecutorFactory {
private static final Logger LOGGER = LogManager.getLogger(SingleNodeExecutorFactory.class);
@Override
public String getExecutorName() {
return "Sole";
}
@Override
public ContractExecutor getInstance(Map<String, Object> args) {
LOGGER.info("Sole contract is not supported in multi-point mode");
return SingleNodeExecutor.instance;
}
}