refactor: prune unused code

This commit is contained in:
Frank.R.Wu 2021-11-01 15:04:56 +08:00
parent 9f3563334d
commit bb33d5b0e7
11 changed files with 185 additions and 142 deletions

View File

@ -30,14 +30,13 @@ import org.bdware.sc.util.ExceptionUtil;
import org.bdware.server.action.FileActions; import org.bdware.server.action.FileActions;
import org.bdware.server.doip.ContractRepositoryMain; import org.bdware.server.doip.ContractRepositoryMain;
import org.bdware.server.http.CMHttpHandler; import org.bdware.server.http.CMHttpHandler;
import org.bdware.server.runnable.BDLedgerContractServerTask;
import org.bdware.server.runnable.BDLedgerTask;
import org.bdware.server.ws.ContractManagerFrameHandler; import org.bdware.server.ws.ContractManagerFrameHandler;
import org.bdware.units.NetworkManager; import org.bdware.units.NetworkManager;
import java.io.*; import java.io.*;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.Date; import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
public class CMHttpServer { public class CMHttpServer {
@ -73,11 +72,22 @@ public class CMHttpServer {
} }
if (cmdConf.withBdledgerServer) { if (cmdConf.withBdledgerServer) {
ContractManager.threadPool.execute( ContractManager.threadPool.execute(
new BDLedgerContractServerTask(cmdConf.servicePort + 4)); () -> NetworkManager.instance.initP2P(cmdConf.servicePort + 4));
} }
// 可自动运行bdledger可执行文件也可在shell脚步中运行和停止 // 可自动运行bdledger可执行文件也可在shell脚步中运行和停止
if (!cmdConf.withBdledgerClient.isEmpty()) { if (!cmdConf.withBdledgerClient.isEmpty()) {
ContractManager.threadPool.execute(new BDLedgerTask(cmdConf.withBdledgerClient)); ContractManager.scheduledThreadPool.schedule(
() -> {
File ledgerClient = new File(cmdConf.withBdledgerClient);
LOGGER.debug("canRead=" + ledgerClient.canRead() +
" path=" + ledgerClient.getAbsolutePath());
try {
Runtime.getRuntime().exec(ledgerClient.getAbsolutePath());
} catch (IOException e) {
LOGGER.warn("start bdledger client failed: " + e.getMessage());
}
},
1, TimeUnit.SECONDS);
} }
if (cmdConf.enableEventPersistence) { if (cmdConf.enableEventPersistence) {
ContractManager.eventPersistenceEnabled = true; ContractManager.eventPersistenceEnabled = true;

View File

@ -26,11 +26,12 @@ import org.bdware.units.NetworkManager;
import org.zz.gmhelper.SM2KeyPair; import org.zz.gmhelper.SM2KeyPair;
import org.zz.gmhelper.SM2Util; import org.zz.gmhelper.SM2Util;
import java.util.ArrayList; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
// WebSocket接口 // WebSocket接口
public class MasterWSAction { public class MasterWSAction {
@ -138,13 +139,16 @@ public class MasterWSAction {
} }
} }
*/ */
List<String> nodeNames; // nodes' peerID
// all nodes' peerID in the unit
String[] nodeNamesStr = String[] nodeNamesStr =
args.get("peersID").getAsString().split(","); // all nodes' peerID in the unit args.get("peersID").getAsString().split(",");
// record information of these nodes
List<String> nodeNames = new ArrayList<>(); // nodes' peerID nodeNames =
for (String str : nodeNamesStr) { Arrays.stream(nodeNamesStr)
if (null != str && str.length() > 0) nodeNames.add(str); .filter(x -> null != x && !x.isEmpty())
} // 记录这些节点信息 .collect(Collectors.toList());
int nodeSize = nodeNames.size();
// 方式一向NodeCenter发要求Slave节点主动连接到Master节点. // 方式一向NodeCenter发要求Slave节点主动连接到Master节点.
Map<String, Object> requestConnect = new HashMap<>(); Map<String, Object> requestConnect = new HashMap<>();
@ -207,7 +211,7 @@ public class MasterWSAction {
rc.onResult(JsonUtil.toJson(ret)); rc.onResult(JsonUtil.toJson(ret));
} }
}, },
nodeNames.size()); nodeSize);
MasterServerTCPAction.sync.sleepWithTimeout(requestID, collector, 20); MasterServerTCPAction.sync.sleepWithTimeout(requestID, collector, 20);
Map<String, Object> request = new HashMap<>(); Map<String, Object> request = new HashMap<>();
request.put("master", keyPair.getPublicKeyStr()); request.put("master", keyPair.getPublicKeyStr());
@ -219,8 +223,6 @@ public class MasterWSAction {
request.put("requestID", requestID); request.put("requestID", requestID);
request.put("members", nodeNames); // 执行这个合约的所有节点的pubKey request.put("members", nodeNames); // 执行这个合约的所有节点的pubKey
int nodeSize = nodeNames.size();
contract.setNumOfCopies(1); // TODO copies是1
contract.setShadingId(0); // 默认 contract.setShadingId(0); // 默认
switch (contract.getType()) { switch (contract.getType()) {
case RequestAllResponseFirst: case RequestAllResponseFirst:
@ -230,6 +232,7 @@ public class MasterWSAction {
contract.setNumOfCopies(nodeSize); contract.setNumOfCopies(nodeSize);
break; break;
default: default:
contract.setNumOfCopies(1); // TODO copies是1
break; break;
} }
LOGGER.debug("contract " + contract.getID() + " has been added into contractID2Members"); LOGGER.debug("contract " + contract.getID() + " has been added into contractID2Members");
@ -238,7 +241,7 @@ public class MasterWSAction {
String startReq = JsonUtil.toJson(request); String startReq = JsonUtil.toJson(request);
LOGGER.debug("start contract " + startReq); LOGGER.debug("start contract " + startReq);
for (int i = 0; i < nodeNames.size(); ++i) { for (int i = 0; i < nodeSize; ++i) {
String nodeID = nodeNames.get(i); // 根据i获得nodeID String nodeID = nodeNames.get(i); // 根据i获得nodeID
// for (String nodeID : nodeNames) { // for (String nodeID : nodeNames) {
// 设置字段 // 设置字段
@ -246,7 +249,7 @@ public class MasterWSAction {
contract.setShadingId(i); contract.setShadingId(i);
request.put("contractStr", JsonUtil.toJson(contract)); request.put("contractStr", JsonUtil.toJson(contract));
startReq = JsonUtil.toJson(request); startReq = JsonUtil.toJson(request);
LOGGER.debug("启动合约:" + startReq); LOGGER.debug("start contract: " + startReq);
} }
SlaveNode node; SlaveNode node;
node = MasterServerTCPAction.id2Slaves.get(nodeID); // slave node信息 node = MasterServerTCPAction.id2Slaves.get(nodeID); // slave node信息

View File

@ -10,7 +10,6 @@ import org.bdware.sc.*;
import org.bdware.sc.bean.Contract; import org.bdware.sc.bean.Contract;
import org.bdware.sc.bean.ContractExecType; import org.bdware.sc.bean.ContractExecType;
import org.bdware.sc.bean.ContractRequest; import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.InstrumentedResultCallback;
import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.db.CMTables; import org.bdware.sc.db.CMTables;
import org.bdware.sc.db.KeyValueDBUtil; import org.bdware.sc.db.KeyValueDBUtil;
@ -127,7 +126,7 @@ public class MasterClientTCPAction {
int nodeSize = contract.getNumOfCopies(); int nodeSize = contract.getNumOfCopies();
switch (contract.getType()) { switch (contract.getType()) {
case Sole: case Sole:
LOGGER.info("can't support Solo in multi-point mode"); LOGGER.info("Sole contract is not supported in multi-point mode");
return null; return null;
case RequestOnce: case RequestOnce:
executor = new RequestOnceExecutor(contractID); executor = new RequestOnceExecutor(contractID);
@ -207,7 +206,7 @@ public class MasterClientTCPAction {
// SimpleDateFormat("yyyy-MM-dd.HH:mm:ss") // SimpleDateFormat("yyyy-MM-dd.HH:mm:ss")
// //
// .format(System.currentTimeMillis())); // .format(System.currentTimeMillis()));
Map<String, String> ping = new HashMap<String, String>(); Map<String, String> ping = new HashMap<>();
ping.put("action", "masterPing"); ping.put("action", "masterPing");
handler.sendMsg(JsonUtil.toJson(ping)); handler.sendMsg(JsonUtil.toJson(ping));
} }
@ -229,10 +228,10 @@ public class MasterClientTCPAction {
long cur = System.currentTimeMillis(); long cur = System.currentTimeMillis();
if (cur - lastMasterPongTime >= (2 * sendDelay)) { if (cur - lastMasterPongTime >= (2 * sendDelay)) {
LOGGER.info( LOGGER.info(
"lastMasterPongTime=" "lastMasterPongTime="
+ new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss") + new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss")
.format(lastMasterPongTime) .format(lastMasterPongTime)
+ " 认为master崩溃!"); + " 认为master崩溃!");
// 向NC发通知重新选举master如果NC没有收到所有节点的重选请求就认为是这个节点和master连接断开这个节点需要重连master // 向NC发通知重新选举master如果NC没有收到所有节点的重选请求就认为是这个节点和master连接断开这个节点需要重连master
@ -243,13 +242,13 @@ public class MasterClientTCPAction {
for (String contractID : contractID2MasterInfo.keySet()) { for (String contractID : contractID2MasterInfo.keySet()) {
if (contractID2MasterInfo.get(contractID).master.equals(master) if (contractID2MasterInfo.get(contractID).master.equals(master)
&& KeyValueDBUtil.instance.containsKey( && KeyValueDBUtil.instance.containsKey(
CMTables.UnitContracts.toString(), contractID)) { CMTables.UnitContracts.toString(), contractID)) {
LOGGER.info( LOGGER.info(
"认为合约 " "认为合约 "
+ contractID + contractID
+ " 的master崩溃 master1=" + " 的master崩溃 master1="
+ contractID2MasterInfo.get(contractID) + contractID2MasterInfo.get(contractID)
.master .master
+ " master2=" + " master2="
+ master); + master);
request.put("contractID", contractID); request.put("contractID", contractID);
@ -268,7 +267,7 @@ public class MasterClientTCPAction {
request.put("members", StringUtils.join(members, ",")); request.put("members", StringUtils.join(members, ","));
NetworkManager.instance.sendToNodeCenter( NetworkManager.instance.sendToNodeCenter(
JsonUtil.toJson(request)); JsonUtil.toJson(request));
LOGGER.info( LOGGER.info(
"认为合约 " "认为合约 "
+ contractID + contractID
+ " 的master崩溃 当前master为 " + " 的master崩溃 当前master为 "
@ -614,9 +613,8 @@ public class MasterClientTCPAction {
// } catch (InterruptedException e) { // } catch (InterruptedException e) {
// e.printStackTrace(); // e.printStackTrace();
// } // }
MultiContractMeta cei = MultiContractMeta cei;
CMActions.manager.multiContractRecorder.createIfNotExist(contractID); synchronized (cei = CMActions.manager.multiContractRecorder.createIfNotExist(contractID)) {
synchronized (cei) {
while (!cei.queue.isEmpty()) { while (!cei.queue.isEmpty()) {
ContractRequest request = cei.queue.peek(); ContractRequest request = cei.queue.peek();
// logger.info("此时队列为 "); // logger.info("此时队列为 ");
@ -627,11 +625,8 @@ public class MasterClientTCPAction {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
cei.curExeSeq = request.seq; cei.curExeSeq = request.seq;
LOGGER.info("调试位置 1 cei.curExeSeq=" + cei.curExeSeq); LOGGER.info("调试位置 1 cei.curExeSeq=" + cei.curExeSeq);
boolean isMultiReq = false; boolean isMultiReq = request.getRequestID().endsWith("_mul");
if (request.getRequestID().endsWith("_mul")) {
isMultiReq = true;
}
if (isMultiReq) { if (isMultiReq) {
// 开始执行多点合约请求需要将缓存的其他节点发来的同requestID的请求也触发调用 // 开始执行多点合约请求需要将缓存的其他节点发来的同requestID的请求也触发调用
LOGGER.info("开始执行多点合约请求需要将缓存的其他节点发来的同requestID的请求也触发调用"); LOGGER.info("开始执行多点合约请求需要将缓存的其他节点发来的同requestID的请求也触发调用");
@ -731,7 +726,7 @@ public class MasterClientTCPAction {
} else { } else {
// ask master to re-send requests // ask master to re-send requests
LOGGER.info("send request to get cachedRequests"); LOGGER.info("send request to get cachedRequests");
Map<String, String> ret = new HashMap<String, String>(); Map<String, String> ret = new HashMap<>();
ret.put("action", "sendCachedRequests"); ret.put("action", "sendCachedRequests");
ret.put("contractID", contractID); ret.put("contractID", contractID);
SM2KeyPair keyPair = GlobalConf.instance.keyPair; SM2KeyPair keyPair = GlobalConf.instance.keyPair;
@ -770,55 +765,52 @@ public class MasterClientTCPAction {
LOGGER.info("Receive Reroute Info:" + jo.toString()); LOGGER.info("Receive Reroute Info:" + jo.toString());
MasterServerTCPAction.sync.instrumentWakeUp( MasterServerTCPAction.sync.instrumentWakeUp(
jo.get("responseID").getAsString(), jo.get("responseID").getAsString(),
new InstrumentedResultCallback() { (resultCallback, result1) -> {
@Override resultCallback.cancelTimeOut();
public void onResult(ResultCallback resultCallback, JsonObject result) { LOGGER.info("try To reRoute");
resultCallback.cancelTimeOut();
LOGGER.info("try To reRoute");
JsonObject cr = result.get("data").getAsJsonObject(); JsonObject cr = result1.get("data").getAsJsonObject();
if (resultCallback instanceof SyncResult.ContractResultCallback) { if (resultCallback instanceof SyncResult.ContractResultCallback) {
SyncResult.ContractResultCallback cb = SyncResult.ContractResultCallback cb =
(SyncResult.ContractResultCallback) resultCallback; (SyncResult.ContractResultCallback) resultCallback;
if (cb.getReRouteCount() > 0) { if (cb.getReRouteCount() > 0) {
ContractResult contractResult =
new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive(
"Contract "
+ cr.get("contractID").getAsString()
+ "can't be located in router"));
resultCallback.onResult(JsonUtil.toJson(contractResult));
return;
} else cb.incReRouteCount();
LOGGER.info("inc reRoute:" + cb.getReRouteCount());
}
String pubkey =
CMActions.manager.nodeCenterConn.reRouteContract(
cr.get("contractID").getAsString());
LOGGER.info(
"ReRoute Result:"
+ cr.get("contractID").getAsString()
+ " pubKey:"
+ pubkey);
if (pubkey == null) {
ContractResult contractResult = ContractResult contractResult =
new ContractResult( new ContractResult(
ContractResult.Status.Error, ContractResult.Status.Error,
new JsonPrimitive( new JsonPrimitive(
"Contract " "Contract "
+ cr.get("contractID").getAsString() + cr.get("contractID").getAsString()
+ "can't be located in router using reroute")); + "can't be located in router"));
resultCallback.onResult(JsonUtil.toJson(contractResult)); resultCallback.onResult(JsonUtil.toJson(contractResult));
return; return;
} } else cb.incReRouteCount();
LOGGER.debug("Receive Reroute Result:" + pubkey); LOGGER.info("inc reRoute:" + cb.getReRouteCount());
ContractRequest contractRequest =
JsonUtil.fromJson(cr, ContractRequest.class);
CMActions.manager.masterStub.executeByOtherNodeAsync(
pubkey, contractRequest, resultCallback);
} }
String pubkey =
CMActions.manager.nodeCenterConn.reRouteContract(
cr.get("contractID").getAsString());
LOGGER.info(
"ReRoute Result:"
+ cr.get("contractID").getAsString()
+ " pubKey:"
+ pubkey);
if (pubkey == null) {
ContractResult contractResult =
new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive(
"Contract "
+ cr.get("contractID").getAsString()
+ "can't be located in router using reroute"));
resultCallback.onResult(JsonUtil.toJson(contractResult));
return;
}
LOGGER.debug("Receive Reroute Result:" + pubkey);
ContractRequest contractRequest =
JsonUtil.fromJson(cr, ContractRequest.class);
CMActions.manager.masterStub.executeByOtherNodeAsync(
pubkey, contractRequest, resultCallback);
}, },
jo); jo);
} }

View File

@ -5,7 +5,10 @@ import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.bdware.heartbeat.HeartBeatUtil; import org.bdware.heartbeat.HeartBeatUtil;
import org.bdware.sc.*; import org.bdware.sc.ContractClient;
import org.bdware.sc.ContractManager;
import org.bdware.sc.ContractMeta;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractExecType; import org.bdware.sc.bean.ContractExecType;
import org.bdware.sc.bean.ContractRequest; import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.conn.ResultCallback;
@ -30,6 +33,9 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.bdware.sc.ContractStatusEnum.HANGED;
import static org.bdware.sc.ContractStatusEnum.KILLED;
public class MasterServerTCPAction { public class MasterServerTCPAction {
public static final org.bdware.server.action.SyncResult sync = new SyncResult(); public static final org.bdware.server.action.SyncResult sync = new SyncResult();
private static final Logger LOGGER = LogManager.getLogger(MasterServerTCPAction.class); private static final Logger LOGGER = LogManager.getLogger(MasterServerTCPAction.class);
@ -148,7 +154,7 @@ public class MasterServerTCPAction {
MultiContractMeta contractMeta = MultiContractMeta contractMeta =
CMActions.manager.multiContractRecorder.getMultiContractMeta( CMActions.manager.multiContractRecorder.getMultiContractMeta(
client.getContractID()); client.getContractID());
if (contractMeta != null && contractMeta.isMaster()) { // 如果是master if (null != contractMeta && contractMeta.isMaster()) { // 如果是master
ResultCollector collector = ResultCollector collector =
new ResultCollector( new ResultCollector(
request.get("requestID").getAsString(), request.get("requestID").getAsString(),
@ -241,9 +247,9 @@ public class MasterServerTCPAction {
@Action(async = true) @Action(async = true)
public void receiveTrustfullyResult(JsonObject jo, ResultCallback cb) { public void receiveTrustfullyResult(JsonObject jo, ResultCallback cb) {
String resonseID = jo.get("responseID").getAsString(); String responseID = jo.get("responseID").getAsString();
LOGGER.info("========== ExecuteContractLocally wakeUp:" + resonseID); LOGGER.info("========== ExecuteContractLocally wakeUp:" + responseID);
MasterServerTCPAction.sync.wakeUp(resonseID, jo.toString()); MasterServerTCPAction.sync.wakeUp(responseID, jo.toString());
} }
@Action(async = true) @Action(async = true)
@ -306,7 +312,7 @@ public class MasterServerTCPAction {
String contractID = cr.get("contractID").getAsString(); String contractID = cr.get("contractID").getAsString();
ContractMeta contractMeta = ContractMeta contractMeta =
CMActions.manager.statusRecorder.getContractMeta(contractID); CMActions.manager.statusRecorder.getContractMeta(contractID);
if (contractMeta == null || contractMeta.getStatus() == ContractStatusEnum.KILLED) { if (contractMeta == null || contractMeta.getStatus() == KILLED) {
LOGGER.debug("send ReRoute response:" + cr.toString()); LOGGER.debug("send ReRoute response:" + cr.toString());
JsonObject result = new JsonObject(); JsonObject result = new JsonObject();
result.addProperty("action", "reRouteContract"); result.addProperty("action", "reRouteContract");
@ -341,10 +347,9 @@ public class MasterServerTCPAction {
} }
}, },
JsonUtil.fromJson(cr, ContractRequest.class)); JsonUtil.fromJson(cr, ContractRequest.class));
return;
} else { } else {
ContractRequest contractRequest = JsonUtil.fromJson(cr, ContractRequest.class); ContractRequest contractRequest = JsonUtil.fromJson(cr, ContractRequest.class);
if ( contractMeta.getStatus()==ContractStatusEnum.HANGED){ if (contractMeta.getStatus() == HANGED) {
CMActions.manager.statusRecorder.ensureRunning(contractRequest); CMActions.manager.statusRecorder.ensureRunning(contractRequest);
} }
ContractClient client = ContractClient client =
@ -419,7 +424,7 @@ public class MasterServerTCPAction {
+ nodeID.substring(0, 5) + nodeID.substring(0, 5)
+ " 下线! " + " 下线! "
+ this.toString()); + this.toString());
HeartBeatUtil.getInstance().cancel(this); HeartBeatUtil.getInstance().cancel(this);
id2Slaves.remove(nodeID); id2Slaves.remove(nodeID);
try { try {

View File

@ -20,6 +20,7 @@ import org.bdware.sc.db.CMTables;
import org.bdware.sc.db.KeyValueDBUtil; import org.bdware.sc.db.KeyValueDBUtil;
import org.bdware.sc.event.REvent; import org.bdware.sc.event.REvent;
import org.bdware.sc.units.MultiContractMeta; import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.util.HashUtil;
import org.bdware.sc.util.JsonUtil; import org.bdware.sc.util.JsonUtil;
import org.bdware.server.CMHttpServer; import org.bdware.server.CMHttpServer;
import org.bdware.server.GlobalConf; import org.bdware.server.GlobalConf;
@ -39,6 +40,7 @@ import org.zz.gmhelper.SM2KeyPair;
import org.zz.gmhelper.SM2Util; import org.zz.gmhelper.SM2Util;
import java.io.*; import java.io.*;
import java.math.BigInteger;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
@ -438,7 +440,7 @@ public class NodeCenterClientController implements NodeCenterConn {
} }
MasterClientRecoverMechAction.recoverSet.add(contractID); MasterClientRecoverMechAction.recoverSet.add(contractID);
LOGGER.info("queryUnitContractsID master为" + master.substring(0, 5)+" -> 合约id:"+contractID); LOGGER.info("queryUnitContractsID master为" + master.substring(0, 5) + " -> 合约id:" + contractID);
RecoverMechTimeRecorder.queryMasterFinish = System.currentTimeMillis(); RecoverMechTimeRecorder.queryMasterFinish = System.currentTimeMillis();
queryUnitContractsID2(contractID, master); queryUnitContractsID2(contractID, master);
} }
@ -805,6 +807,75 @@ public class NodeCenterClientController implements NodeCenterConn {
sync.wakeUp(jo.get("responseID").getAsString(), jo.toString()); sync.wakeUp(jo.get("responseID").getAsString(), jo.toString());
} }
@Override
public String[] getClusterByKey(String key, int k) {
NodeKey[] nodes = this.listNodes();
if (nodes.length == 0) {
return null;
}
if (nodes.length == 1) {
return new String[]{this.getNodeId(nodes[0].id)};
}
// get hash with enough length
String hash = HashUtil.sha3ToFixedLen(key, nodes[0].id.length());
BigInteger biH = new BigInteger(hash, 16);
// binary search, to find the nearest node with hash
int l = 0, r = nodes.length - 1, m = 0,
comL = biH.compareTo(nodes[l].biId), comR = nodes[r].biId.compareTo(biH),
comM;
String selected;
do {
if (comL < 1) {
selected = nodes[l].id;
break;
}
if (comR < 1) {
selected = nodes[r].id;
break;
}
if (l + 1 == r) {
if (biH.subtract(nodes[l].biId).compareTo(nodes[r].biId.subtract(biH)) < 1) {
selected = nodes[l].id;
} else {
selected = nodes[r].id;
}
break;
}
m = (l + r) >> 1;
comM = biH.compareTo(nodes[m].biId);
if (comM < 1) {
r = m;
comR = -comM;
} else {
l = m;
comL = comM;
}
} while (true);
List<String> ret = new ArrayList<>();
ret.add(this.getNodeId(selected));
if (k > 1) {
l = m - 1;
r = m + 1;
while (ret.size() < k && (l >= 0 || r < nodes.length)) {
if (l < 0) {
ret.add(this.getNodeId(nodes[r++].id));
} else if (r >= nodes.length) {
ret.add(this.getNodeId(nodes[l--].id));
} else {
if (biH.subtract(nodes[l].biId).compareTo(nodes[r].biId.subtract(biH)) < 1) {
ret.add(this.getNodeId(nodes[l--].id));
} else {
ret.add(this.getNodeId(nodes[r++].id));
}
}
}
}
return ret.toArray(new String[0]);
}
static class NetNeighbors { static class NetNeighbors {
NodeKey[] arr; NodeKey[] arr;
Map<String, String> map; Map<String, String> map;

View File

@ -1,20 +0,0 @@
package org.bdware.server.runnable;
import org.bdware.units.NetworkManager;
public class BDLedgerContractServerTask implements Runnable {
private int port;
public BDLedgerContractServerTask(int port) {
this.port = port;
}
@Override
public void run() {
try {
NetworkManager.instance.initP2P(port);
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -1,24 +0,0 @@
package org.bdware.server.runnable;
import java.io.File;
public class BDLedgerTask implements Runnable{
private String path;
public BDLedgerTask(String path) {
this.path = path;
}
@Override
public void run() {
try {
Thread.sleep(1000);
File file = new File(path);
System.out.println(file.canRead() + file.getAbsolutePath());
Runtime.getRuntime().exec(file.getAbsolutePath());
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -66,7 +66,7 @@ public class MasterProxy implements MasterStub {
assert client.isUnit(); assert client.isUnit();
// ********** hyy ********** // // ********** hyy ********** //
// 修改这个地方的执行逻辑,判断路由 // 修改这个地方的执行逻辑,判断路由
LOGGER.info(client.getContractType());//shading LOGGER.debug(client.getContractType());//shading
// ********** hyy ********** // // ********** hyy ********** //
// assert client.isMaster(); // assert client.isMaster();
@ -99,7 +99,7 @@ public class MasterProxy implements MasterStub {
@Override @Override
public void executeByOtherNodeAsync(String pubKey, ContractRequest c, ResultCallback cb) { public void executeByOtherNodeAsync(String pubKey, ContractRequest c, ResultCallback cb) {
MasterConnector handler = CONNECTORS.get(pubKey); MasterConnector handler = CONNECTORS.get(pubKey);
LOGGER.error(pubKey+" "+c.getRequestID()+" "+c.getContractID()+" cb:"+cb); LOGGER.error(pubKey + " " + c.getRequestID() + " " + c.getContractID() + " cb:" + cb);
int maxMasterProxyLoad = CongestionControl.masterProxyLoad.incrementAndGet(); int maxMasterProxyLoad = CongestionControl.masterProxyLoad.incrementAndGet();
if (maxMasterProxyLoad > CongestionControl.maxMasterProxyLoad) if (maxMasterProxyLoad > CongestionControl.maxMasterProxyLoad)
CongestionControl.maxMasterProxyLoad = maxMasterProxyLoad; CongestionControl.maxMasterProxyLoad = maxMasterProxyLoad;

View File

@ -83,10 +83,11 @@ public class ShadingExecutor implements ContractExecutor {
// TODO 多调多统一个seq的有多个请求这个需要改 // TODO 多调多统一个seq的有多个请求这个需要改
MasterServerTCPAction.requestCache.get(contractID).put(req.seq, sendStr); MasterServerTCPAction.requestCache.get(contractID).put(req.seq, sendStr);
LOGGER.info("[ShadingExecutor] sendRequests " + JsonUtil.toJson(req)); LOGGER.debug(JsonUtil.toJson(req));
String[] nodes = getAccordingToRouteInfo(req); String[] nodes = getAccordingToRouteInfo(req);
LOGGER.info("node.size = " + nodes.length + " " + JsonUtil.toJson(nodes.toString())); LOGGER.info("node size = " + nodes.length);
LOGGER.debug("nodes:" + JsonUtil.toJson(nodes));
for (String node : nodes) { for (String node : nodes) {
LOGGER.info(node); LOGGER.info(node);
// String node = nodes.get(n);//根据下标随机获得一个 // String node = nodes.get(n);//根据下标随机获得一个
@ -132,19 +133,23 @@ public class ShadingExecutor implements ContractExecutor {
ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID); ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID);
FunctionDesp fun = meta.getExportedFunction(req.getAction()); FunctionDesp fun = meta.getExportedFunction(req.getAction());
RouteInfo routeInfo = fun.getRoute(); RouteInfo routeInfo = fun.getRoute();
int val = 0; int val;
switch (routeInfo.useDefault) { switch (routeInfo.useDefault) {
case byRequester: case byRequester:
val = val =
new BigInteger(req.getRequester(), 16) new BigInteger(req.getRequester(), 16)
.mod(new BigInteger("" + members.length)) .mod(new BigInteger("" + members.length))
.intValue(); .intValue();
for (; val < 0; ) val = val + members.length; while (val < 0) {
val = val + members.length;
}
return new String[]{members[val]}; return new String[]{members[val]};
case byArgHash: case byArgHash:
val = req.getArg().hashCode(); val = req.getArg().hashCode();
val = val % members.length; val = val % members.length;
for (; val < 0; ) val += members.length; while (val < 0) {
val += members.length;
}
return new String[]{members[val]}; return new String[]{members[val]};
case byTarget: case byTarget:
JsonObject jo = JsonParser.parseString(req.getArg()).getAsJsonObject(); JsonObject jo = JsonParser.parseString(req.getArg()).getAsJsonObject();
@ -152,7 +157,9 @@ public class ShadingExecutor implements ContractExecutor {
new BigInteger(jo.get("target").getAsString(), 16) new BigInteger(jo.get("target").getAsString(), 16)
.mod(new BigInteger("" + members.length)) .mod(new BigInteger("" + members.length))
.intValue(); .intValue();
for (; val < 0; ) val = val + members.length; while (val < 0) {
val = val + members.length;
}
return new String[]{members[val]}; return new String[]{members[val]};
default: default:
return members; return members;

View File

@ -4,27 +4,26 @@ import org.bdware.sc.bean.ContractExecType;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
//改名 MultiPointCooperateContractInfo //改名 MultiPointCooperateContractInfo
public class ShadingMultiPointContractInfo extends MultiPointContractInfo{ public class ShadingMultiPointContractInfo extends MultiPointContractInfo {
public List<String> members; //pubKey public List<String> members; //pubKey
public List<Integer> shadingId; //shading id 初始化时被分配 public List<Integer> shadingId; //shading id 初始化时被分配
public String masterNode; public String masterNode;
public ContractExecType type; public ContractExecType type;
public ContractUnitStatus unitStatus = ContractUnitStatus.CommonMode; public ContractUnitStatus unitStatus = ContractUnitStatus.CommonMode;
public transient ContractExecutor rcf;
AtomicInteger ai = new AtomicInteger(-1); AtomicInteger ai = new AtomicInteger(-1);
public transient ContractExecutor rcf; public int getCurSeq() {
public int getCurSeq(){
return ai.get(); return ai.get();
} }
public void nextSeq(){ public void nextSeq() {
ai.getAndIncrement(); ai.getAndIncrement();
} }
public void setSeq(int seq){ public void setSeq(int seq) {
ai = new AtomicInteger(seq); ai = new AtomicInteger(seq);
} }
} }

View File

@ -31,7 +31,7 @@ public class EventActions {
return; return;
} }
String contractID = client.getContractID(); String contractID = client.getContractID();
topic = HashUtil.sha3(contractID + topic); topic = HashUtil.sha3(contractID, topic);
} }
CMActions.manager.subEventByClient(topic, rcb.getChannel()); CMActions.manager.subEventByClient(topic, rcb.getChannel());
ret.addProperty("status", "Success"); ret.addProperty("status", "Success");