build: config spotless plugin and reformat code

This commit is contained in:
Frank.R.Wu 2023-06-15 11:07:56 +08:00
parent b2124820ca
commit 16350d3a78
36 changed files with 1775 additions and 2094 deletions

View File

@ -3,6 +3,8 @@ plugins {
id 'application'
}
apply from: '../spotless.gradle'
mainClassName = 'org.bdware.server.NodeCenterServer'
application {

View File

@ -18,8 +18,7 @@ public class UDPMessage implements Serializable {
int requestID;
byte[] content;
public UDPMessage() {
}
public UDPMessage() {}
public void setIsPart(boolean isPart) {
this.isPart = isPart ? 1 : 0;

View File

@ -59,11 +59,9 @@ public class NodeCenterServer {
KeyValueDBUtil.setupNC();
// TimeDBUtil.setupNC();
TimeDBUtil.setupNC();
Configurator.setLevel(
"io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder",
Configurator.setLevel("io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder",
Level.OFF);
Configurator.setLevel(
"io.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder",
Configurator.setLevel("io.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder",
Level.OFF);
}
@ -125,18 +123,18 @@ public class NodeCenterServer {
try {
BufferedReader br = new BufferedReader(new FileReader(keyFile));
String pubKey = br.readLine();
String nowManager =
KeyValueDBUtil.instance.getValue(
NCTables.ConfigDB.toString(), NCManagerAction.centerManger);
String nowManager = KeyValueDBUtil.instance.getValue(NCTables.ConfigDB.toString(),
NCManagerAction.centerManger);
// manager.key is used when node manager isn' set
if (null == nowManager || nowManager.isEmpty()) {
KeyValueDBUtil.instance.setValue(
NCTables.ConfigDB.toString(), NCManagerAction.centerManger, pubKey);
KeyValueDBUtil.instance.setValue(
NCTables.ConfigDB.toString(), NCManagerAction.clusterName, "clusterName_" + pubKey.substring(0, 5));
KeyValueDBUtil.instance.setValue(NCTables.NodeUser.toString(), pubKey, Role.CenterManager.toString());
KeyValueDBUtil.instance.setValue(
NCTables.NodeTime.toString(), pubKey, Long.toString(new Date().getTime()));
KeyValueDBUtil.instance.setValue(NCTables.ConfigDB.toString(),
NCManagerAction.centerManger, pubKey);
KeyValueDBUtil.instance.setValue(NCTables.ConfigDB.toString(),
NCManagerAction.clusterName, "clusterName_" + pubKey.substring(0, 5));
KeyValueDBUtil.instance.setValue(NCTables.NodeUser.toString(), pubKey,
Role.CenterManager.toString());
KeyValueDBUtil.instance.setValue(NCTables.NodeTime.toString(), pubKey,
Long.toString(new Date().getTime()));
LOGGER.info("set node manager from manager.key");
}
} catch (IOException ignored) {
@ -157,16 +155,12 @@ public class NodeCenterServer {
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.localAddress(port)
.childHandler(
new ChannelInitializer<SocketChannel>() {
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100).localAddress(port)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel arg0) {
arg0.pipeline()
.addLast(new DelimiterCodec())
arg0.pipeline().addLast(new DelimiterCodec())
.addLast(new NodeCenterFrameHandler());
}
});
@ -174,8 +168,8 @@ public class NodeCenterServer {
}
public static void startHttp(int port) {
File[] pluginJar = new File("./pluginLib/")
.listFiles(pathname -> pathname.getName().endsWith(".jar"));
File[] pluginJar =
new File("./pluginLib/").listFiles(pathname -> pathname.getName().endsWith(".jar"));
URL[] urls;
if (pluginJar != null && pluginJar.length > 0) {
urls = new URL[pluginJar.length];
@ -199,9 +193,7 @@ public class NodeCenterServer {
ServerBootstrap b = new ServerBootstrap();
ControlledChannelInitializer initializer = new ControlledChannelInitializer();
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(port)
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).localAddress(port)
.childHandler(initializer);
final Channel ch = b.bind(port).sync().channel();
ch.closeFuture().sync();
@ -233,12 +225,9 @@ public class NodeCenterServer {
} else {
LOGGER.warn("disable ssl");
}
arg0.pipeline()
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(65536))
arg0.pipeline().addLast(new HttpServerCodec()).addLast(new HttpObjectAggregator(65536))
.addLast(new WebSocketServerProtocolHandler(PATH, null, true))
.addLast(new ChunkedWriteHandler())
.addLast(handler)
.addLast(new ChunkedWriteHandler()).addLast(handler)
.addLast(new NodeCenterWSFrameHandler());
}
}

View File

@ -24,7 +24,8 @@ public class DistributeCallback extends ResultCallback {
int index;
String distributeID; // 发起节点的分发id
public DistributeCallback(String ss, String sponID, Map<String, String> n, ResultCallback re, String p, String s) {
public DistributeCallback(String ss, String sponID, Map<String, String> n, ResultCallback re,
String p, String s) {
distributeID = ss;
count = 0;
index = 1;
@ -67,14 +68,10 @@ public class DistributeCallback extends ResultCallback {
public void onDistribute(String progress, String nodeIP) {
Map<String, String> ret2 = new HashMap<>();
ret2.put("action", "onDistributeContract");
ret2.put(
"progress",
"NC is sending ypk to NO."
+ index
+ " node in the units,progress is "
+ progress
+ "%.");
if (progress.equals("100.00")) index++;
ret2.put("progress", "NC is sending ypk to NO." + index + " node in the units,progress is "
+ progress + "%.");
if (progress.equals("100.00"))
index++;
Map<String, String> map = new HashMap<>();
map.put("action", "onDistribute");
@ -123,8 +120,8 @@ public class DistributeCallback extends ResultCallback {
public void onResult(String str) {
LOGGER.info("[DistributeCallback] str=" + str);
Map<String, String> map = JsonUtil.fromJson(str, new TypeToken<Map<String, String>>() {
}.getType());
Map<String, String> map =
JsonUtil.fromJson(str, new TypeToken<Map<String, String>>() {}.getType());
NodeCenterActions.sync.sleepWithTimeout(map.get("requestID"), this, 60);

View File

@ -25,14 +25,15 @@ public class RequestAllExecutor implements ContractExecutor {
resultCount = count;
}
public ResultCallback createResultCallback(
final String requestID, final ResultCallback originalCb, int count, String contractID) {
public ResultCallback createResultCallback(final String requestID,
final ResultCallback originalCb, int count, String contractID) {
return new Collector(requestID, originalCb, count, contractID);
}
// only for test ADSP
// public ResultCallback createResultCallback(
// final String requestID, final ResultCallback originalCb, int count,String contractID,String seq) {
// final String requestID, final ResultCallback originalCb, int count,String contractID,String
// seq) {
// ResultCallback collector = new Collector(requestID, originalCb, count,contractID,seq);
// return collector;
// }
@ -42,9 +43,7 @@ public class RequestAllExecutor implements ContractExecutor {
for (String node : nodes) {
CMNode cmNode = NodeCenterActions.nodeInfos.get(node);
if (cmNode == null) {
collector.onResult(
"{\"status\":\"Error\",\"result\":\"node "
+ node
collector.onResult("{\"status\":\"Error\",\"result\":\"node " + node
+ " offline\",\"action\":\"onExecuteContractTrustfully\"}");
} else {
cmNode.connection.controller.sendMsg(req);
@ -58,7 +57,8 @@ public class RequestAllExecutor implements ContractExecutor {
String id = jo2.get("contractID").getAsString();
// only for test ADSP
// ResultCallback collector = createResultCallback(requestID, rc, resultCount,id,jo2.get("seq").getAsString());
// ResultCallback collector = createResultCallback(requestID, rc,
// resultCount,id,jo2.get("seq").getAsString());
ResultCallback collector = createResultCallback(requestID, rc, resultCount, id);
@ -156,16 +156,14 @@ public class RequestAllExecutor implements ContractExecutor {
// }
/* if (cr.status == ContractResult.Status.Error) { //TODO 规范Status的使用 改成 ==Statuc.Error才恢复
String nodePubKey = obj.get("nodeID").getAsString();
if(NodeCenterActions.recoverMap.containsKey(nodePubKey)){
ContractRecord record = NodeCenterActions.recoverMap.get(nodePubKey).get(contractID);
if(record.recoverFlag == RecoverFlag.Fine){
record.recoverFlag = RecoverFlag.ToRecover;
MasterActions.restartContracts(nodePubKey);
}
}
}*/
/*
* if (cr.status == ContractResult.Status.Error) { //TODO 规范Status的使用 改成
* ==Statuc.Error才恢复 String nodePubKey = obj.get("nodeID").getAsString();
* if(NodeCenterActions.recoverMap.containsKey(nodePubKey)){ ContractRecord record =
* NodeCenterActions.recoverMap.get(nodePubKey).get(contractID);
* if(record.recoverFlag == RecoverFlag.Fine){ record.recoverFlag =
* RecoverFlag.ToRecover; MasterActions.restartContracts(nodePubKey); } } }
*/
}
}
}

View File

@ -26,8 +26,7 @@ public class RequestOnceExecutor implements ContractExecutor {
JsonObject jo2 = (new JsonParser()).parse(req).getAsJsonObject();
String contractID = jo2.get("contractID").getAsString();
ResultCallback cb =
new ResultCallback() {
ResultCallback cb = new ResultCallback() {
@Override
public void onResult(String str) {
LOGGER.debug(str);
@ -49,8 +48,7 @@ public class RequestOnceExecutor implements ContractExecutor {
return;
}
}
rc.onResult(
"{\"status\":\"Error\",\"result\":\"all nodes "
rc.onResult("{\"status\":\"Error\",\"result\":\"all nodes "
+ " offline\",\"action\":\"onExecuteContract\"}");
}
}

View File

@ -32,8 +32,7 @@ public class ResponseOnceExecutor implements ContractExecutor {
JsonObject jo2 = (new JsonParser()).parse(req).getAsJsonObject();
String contractID = jo2.get("contractID").getAsString();
// TODO 标注失效节点是否选择重新迁移
ResultCallback cb =
new ResultCallback() {
ResultCallback cb = new ResultCallback() {
@Override
public void onResult(String str) {
LOGGER.debug(str);

View File

@ -32,8 +32,8 @@ public class CMNode {
contracts = cons;
contractVersion = version;
for (ContractDesp desp : cons) {
KeyValueDBUtil.instance.setValue(
NCTables.ContractMeta.toString(), desp.contractID, desp.contractName);
KeyValueDBUtil.instance.setValue(NCTables.ContractMeta.toString(), desp.contractID,
desp.contractName);
}
}
@ -53,7 +53,8 @@ public class CMNode {
if (null != contracts) {
for (ContractDesp desp : contracts) {
if (desp.contractID.equals(contractIDOrName)
|| desp.contractName.equals(contractIDOrName)) return true;
|| desp.contractName.equals(contractIDOrName))
return true;
}
}
return false;
@ -79,7 +80,8 @@ public class CMNode {
}
try {
if (!connection.controller.isOpen()) {
LOGGER.info("node " + nodeName + "(" + pubKey.substring(0, 5) + ") may be offline!");
LOGGER.info(
"node " + nodeName + "(" + pubKey.substring(0, 5) + ") may be offline!");
// System.out.println(
// new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
// .format(new Date(System.currentTimeMillis()))
@ -116,9 +118,7 @@ public class CMNode {
LOGGER.debug("removeCIManager" + string);
int start = this.cimanager.indexOf(string);
if (start > 0) {
this.cimanager =
this.cimanager
.substring(0, start)
this.cimanager = this.cimanager.substring(0, start)
.concat(this.cimanager.substring(start + 130));
}
}

View File

@ -35,13 +35,8 @@ public class CheckAgentAliveTimer extends TimerTask {
for (ContractDesp cd : info.contracts) {
cd.setIsMaster(false);
// 注意 选举不通过NC的机制触发
LOGGER.info(
"checkAlive---- 设置节点 "
+ info.pubKey.substring(0, 5)
+ " 的合约 "
+ cd.contractID
+ " isMaster="
+ false);
LOGGER.info("checkAlive---- 设置节点 " + info.pubKey.substring(0, 5) + " 的合约 "
+ cd.contractID + " isMaster=" + false);
}
}
}

View File

@ -49,9 +49,7 @@ public class FileActions {
return ret != null && ret.equals("locked");
}
@URIPath(
method = org.bdware.server.http.HttpMethod.POST,
value = {"/upload"})
@URIPath(method = org.bdware.server.http.HttpMethod.POST, value = {"/upload"})
public static void handleUploadRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
LOGGER.info("[FileActions] handleUploadRequest : ");
// Upload method is POST
@ -65,18 +63,12 @@ public class FileActions {
}
}
if (!transformedParam.containsKey("fileName")
|| !transformedParam.containsKey("order")
|| !transformedParam.containsKey("count")
|| !transformedParam.containsKey("pubKey")
if (!transformedParam.containsKey("fileName") || !transformedParam.containsKey("order")
|| !transformedParam.containsKey("count") || !transformedParam.containsKey("pubKey")
|| !transformedParam.containsKey("sign")) {
DefaultFullHttpResponse fullResponse =
new DefaultFullHttpResponse(
request.protocolVersion(),
OK,
Unpooled.wrappedBuffer(
"{\"status\":\"false\",\"data\":\"Missing argument!\"}"
.getBytes()));
DefaultFullHttpResponse fullResponse = new DefaultFullHttpResponse(
request.protocolVersion(), OK, Unpooled.wrappedBuffer(
"{\"status\":\"false\",\"data\":\"Missing argument!\"}".getBytes()));
ChannelFuture f = ctx.write(fullResponse);
f.addListener(ChannelFutureListener.CLOSE);
@ -85,9 +77,7 @@ public class FileActions {
// 验签
// HttpMethod method = request.method();
String uri =
URLDecoder.decode(request.uri())
.split("\\?")[1]; // http请求中规定签名必须是最后一个且公钥名必须为pubKey否则验签失败
String uri = URLDecoder.decode(request.uri()).split("\\?")[1]; // http请求中规定签名必须是最后一个且公钥名必须为pubKey否则验签失败
int index = uri.lastIndexOf('&');
String str = uri.substring(0, index);
@ -115,9 +105,8 @@ public class FileActions {
Method mm;
Action a = null;
try {
mm =
FileActions.class.getDeclaredMethod(
"uploadFile", JsonObject.class, ResultCallback.class);
mm = FileActions.class.getDeclaredMethod("uploadFile", JsonObject.class,
ResultCallback.class);
a = mm.getAnnotation(Action.class);
} catch (SecurityException | NoSuchMethodException e1) {
// TODO Auto-generated catch block
@ -147,28 +136,17 @@ public class FileActions {
status = "accept";
}
TimeDBUtil.instance.put(
NCTables.NodeHttpLog.toString(),
"{\"action\":\""
+ action
+ "\",\"pubKey\":\""
+ transformedParam.get("pubKey")
+ "\",\"status\":\""
+ status
+ "\",\"date\":"
+ System.currentTimeMillis()
TimeDBUtil.instance.put(NCTables.NodeHttpLog.toString(),
"{\"action\":\"" + action + "\",\"pubKey\":\"" + transformedParam.get("pubKey")
+ "\",\"status\":\"" + status + "\",\"date\":" + System.currentTimeMillis()
+ "}");
// logger.info("[CMHttpHandler] flag = " + flag + " flag2 = " + flag2);
if (!flag || !flag2) {
DefaultFullHttpResponse fullResponse =
new DefaultFullHttpResponse(
request.protocolVersion(),
OK,
Unpooled.wrappedBuffer(
"{\"status\":\"false\",\"data\":\"Permission denied!\"}"
.getBytes()));
DefaultFullHttpResponse fullResponse = new DefaultFullHttpResponse(
request.protocolVersion(), OK, Unpooled.wrappedBuffer(
"{\"status\":\"false\",\"data\":\"Permission denied!\"}".getBytes()));
ChannelFuture f = ctx.write(fullResponse);
f.addListener(ChannelFutureListener.CLOSE);
@ -191,13 +169,9 @@ public class FileActions {
}
File target = new File(dir, fileName);
if (fileName.contains("..")) {
DefaultFullHttpResponse fullResponse =
new DefaultFullHttpResponse(
request.protocolVersion(),
OK,
Unpooled.wrappedBuffer(
"{\"status\":\"false\",\"data\":\"FileName illegal!\"}"
.getBytes()));
DefaultFullHttpResponse fullResponse = new DefaultFullHttpResponse(
request.protocolVersion(), OK, Unpooled.wrappedBuffer(
"{\"status\":\"false\",\"data\":\"FileName illegal!\"}".getBytes()));
ChannelFuture f = ctx.write(fullResponse);
f.addListener(ChannelFutureListener.CLOSE);
@ -211,7 +185,8 @@ public class FileActions {
if (order == 0) {
try {
LOGGER.debug("Path:" + target.getAbsolutePath());
if (!target.getParentFile().exists()) target.getParentFile().mkdirs();
if (!target.getParentFile().exists())
target.getParentFile().mkdirs();
fout = new FileOutputStream(target, false);
fileMap.put(target.getAbsolutePath(), fout);
} catch (FileNotFoundException e) {
@ -238,16 +213,15 @@ public class FileActions {
retStr = retStr.replaceFirst("null", doi);
}
}
DefaultFullHttpResponse fullResponse =
new DefaultFullHttpResponse(
DefaultFullHttpResponse fullResponse = new DefaultFullHttpResponse(
request.protocolVersion(), OK, Unpooled.wrappedBuffer(retStr.getBytes()));
fullResponse.headers().add("Access-Control-Allow-Origin", "*");
ChannelFuture f = ctx.write(fullResponse);
f.addListener(ChannelFutureListener.CLOSE);
}
private static void writeChunk(
ChannelHandlerContext ctx, HttpPostRequestDecoder httpDecoder, File file) {
private static void writeChunk(ChannelHandlerContext ctx, HttpPostRequestDecoder httpDecoder,
File file) {
try {
if (!file.exists()) {
file.createNewFile();
@ -274,19 +248,14 @@ public class FileActions {
@URIPath(method = HttpMethod.OPTIONS)
public static void crossOrigin(ChannelHandlerContext ctx, FullHttpRequest request) {
DefaultFullHttpResponse fullResponse =
new DefaultFullHttpResponse(
request.protocolVersion(),
OK,
Unpooled.wrappedBuffer("success".getBytes()));
DefaultFullHttpResponse fullResponse = new DefaultFullHttpResponse(
request.protocolVersion(), OK, Unpooled.wrappedBuffer("success".getBytes()));
fullResponse.headers().remove("Access-Control-Allow-Origin");
fullResponse.headers().remove("Access-Control-Allow-Headers");
fullResponse.headers().add("Access-Control-Allow-Origin", "*");
fullResponse
.headers()
.add("Access-Control-Allow-Headers",
"Content-Type, Cookie, Accept-Encoding, User-Agent, Host, Referer, " +
"X-Requested-With, Accept, Accept-Language, Cache-Control, Connection");
fullResponse.headers().add("Access-Control-Allow-Headers",
"Content-Type, Cookie, Accept-Encoding, User-Agent, Host, Referer, "
+ "X-Requested-With, Accept, Accept-Language, Cache-Control, Connection");
ChannelFuture f = ctx.write(fullResponse);
f.addListener(ChannelFutureListener.CLOSE);
LOGGER.info("[OOOOOOOOption] received!");
@ -338,10 +307,8 @@ public class FileActions {
// 文本文件
if (!path.contains("..") && isTextFile(path)) {
LOGGER.debug("[FileActions] 上传文本文件类型 : ");
BufferedWriter bw =
new BufferedWriter(
new FileWriter(
target.getAbsolutePath() + "/" + fileName, isAppend));
BufferedWriter bw = new BufferedWriter(
new FileWriter(target.getAbsolutePath() + "/" + fileName, isAppend));
bw.write(content);
bw.close();
} else { // 其他类型文件
@ -426,10 +393,7 @@ public class FileActions {
File f = new File(parPath + "/" + oldFile);
if (!oldFile.contains("..") && f.exists()) {
LOGGER.info(
"[FileController] delete:"
+ f.getAbsolutePath()
+ " exists:"
+ f.exists());
"[FileController] delete:" + f.getAbsolutePath() + " exists:" + f.exists());
f.delete();
}
response.data = "success";

View File

@ -7,7 +7,6 @@ import org.apache.logging.log4j.Logger;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.db.KeyValueDBUtil;
import org.bdware.sc.db.MultiIndexTimeDBUtilIntf;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.NodeCenterServer;
import org.bdware.server.action.Action;
@ -33,10 +32,7 @@ public class LogActions {
resultCallback.onResult(jsonResult);
}
private void queryInternal(
String actResp,
MultiIndexTimeDBUtilIntf db,
JsonObject json,
private void queryInternal(String actResp, MultiIndexTimeDBUtilIntf db, JsonObject json,
ResultCallback resultCallback) {
long start = System.currentTimeMillis();
if (!json.has("start")) {
@ -55,19 +51,18 @@ public class LogActions {
Map<String, Object> data = new HashMap<>();
for (String str : category) {
List<JsonObject> array = db.queryByDateAsJson(str, startTime, endTime);
if (str == null) data.put("primary", array);
else data.put(str, array);
if (str == null)
data.put("primary", array);
else
data.put(str, array);
}
simpleReply(resultCallback, actResp, data);
long end = System.currentTimeMillis();
LOGGER.debug("[queryInternal:time]" + (end - start));
}
private void countLogByCategoryInternal(
String actResp,
MultiIndexTimeDBUtilIntf db,
JsonObject json,
ResultCallback resultCallback) {
private void countLogByCategoryInternal(String actResp, MultiIndexTimeDBUtilIntf db,
JsonObject json, ResultCallback resultCallback) {
if (!json.has("start")) {
simpleReply(resultCallback, actResp, new ArrayList<>());
return;
@ -85,8 +80,10 @@ public class LogActions {
JsonObject data = new JsonObject();
for (String str : category) {
JsonArray array = db.countInInterval(str, startTime, interval, endTime);
if (str == null) data.add("primary", array);
else data.add(str, array);
if (str == null)
data.add("primary", array);
else
data.add(str, array);
}
simpleReply(resultCallback, actResp, data);
}
@ -98,8 +95,8 @@ public class LogActions {
@Action(userPermission = 1 << 8, async = true)
public void countActionLogByCategory(JsonObject json, ResultCallback resultCallback) {
countLogByCategoryInternal(
"onCountActionLogByCategory", NodeCenterServer.nodeHttpLogDB, json, resultCallback);
countLogByCategoryInternal("onCountActionLogByCategory", NodeCenterServer.nodeHttpLogDB,
json, resultCallback);
}
@Action(userPermission = 1 << 8, async = true)
@ -109,8 +106,8 @@ public class LogActions {
@Action(userPermission = 1 << 8, async = true)
public void countCMLogByCategory(JsonObject json, ResultCallback resultCallback) {
countLogByCategoryInternal(
"onCountCMLogByCategory", NodeCenterServer.nodeTcpLogDB, json, resultCallback);
countLogByCategoryInternal("onCountCMLogByCategory", NodeCenterServer.nodeTcpLogDB, json,
resultCallback);
}
@Action(userPermission = 1 << 9)
@ -138,8 +135,7 @@ public class LogActions {
List<CMNode> onlineNodes = new ArrayList<>();
Map<String, Object> ret = new HashMap<>();
if (KeyValueDBUtil.instance
.getValue(NCTables.ConfigDB.toString(), "__CenterManager__")
if (KeyValueDBUtil.instance.getValue(NCTables.ConfigDB.toString(), "__CenterManager__")
.contains(pubKey)) {
LOGGER.debug("is center manager");
LOGGER.debug("dbnodes " + dbnodes.toString());

View File

@ -12,12 +12,15 @@ import java.util.concurrent.TimeUnit;
public class MasterActions {
private static final Logger LOGGER = LogManager.getLogger(MasterActions.class);
// TODO 定期清缓存
public static Map<String, RequestCache> requestCache =
new ConcurrentHashMap<>(); // key is contractID,only for requestAll type contract
public static Map<String, RequestCache> requestCache = new ConcurrentHashMap<>(); // key is
// contractID,only
// for
// requestAll
// type
// contract
static {
NodeCenterServer.scheduledThreadPool.scheduleWithFixedDelay(
() -> {
NodeCenterServer.scheduledThreadPool.scheduleWithFixedDelay(() -> {
boolean flag = clearCache();
if (flag) {
try {
@ -26,10 +29,7 @@ public class MasterActions {
LOGGER.error("sleeping is interrupted! " + e.getMessage());
}
}
},
0,
0,
TimeUnit.SECONDS);
}, 0, 0, TimeUnit.SECONDS);
}
public NodeCenterFrameHandler controller;
@ -39,185 +39,156 @@ public class MasterActions {
}
// judge the just online node whwther need to restart contracts,send the contracts'info
/* public static void restartContracts(String nodePubKey){
if(!NodeCenterActions.recoverMap.containsKey(nodePubKey)){
return;
}
System.out.println("开始恢复节点" + NodeCenterActions.nodeinfos.get(nodePubKey).nodeName);
// 恢复该节点的每一个集群运行的合约
for (ContractRecord record : NodeCenterActions.recoverMap.get(nodePubKey).values()) {
String contractID = record.contractID;
if (record.recoverFlag != RecoverFlag.ToRecover)
continue;
Map<String, String> req = new HashMap<String, String>();
req.put("action", "queryUnitStatus");
req.put("contractID", contractID);
CMNode cmNode = NodeCenterActions.nodeinfos.get(nodePubKey);
cmNode.connection.controller.sendMsg(gson.toJson(req));
}
}*/
/* @Action(async = true)
public void onQueryUnitStatus(Map<String, String> args, final ResultCallback rc) {
String mode = args.get("mode");
String nodeID = args.get("nodeID");
String contractID = args.get("contractID");
ContractRecord record = NodeCenterActions.recoverMap.get(nodeID).get(contractID);
logger.debug("节点" + NodeCenterActions.nodeinfos.get(args.get("nodeID")).nodeName + "崩溃前模式为" + mode);
if(mode.equals(ContractUnitStatus.CommonMode.toString())){
restartFromCommonMode(nodeID,record);
}else if(mode.equals(ContractUnitStatus.StableMode.toString())){
restartFromStableMode(nodeID,record);
}
}*/
// 当StableMode的节点的lastExeSeq和集群相差超过10时通过CommonNode的恢复方式恢复
/* @Action(async = true)
public void restartByCommonNode(Map<String, String> args, final ResultCallback rc){
String nodeID = args.get("nodeID");
String contractID = args.get("contractID");
ContractRecord record = NodeCenterActions.recoverMap.get(nodeID).get(contractID);
restartFromCommonMode(nodeID,record);
}*/
/*
* public static void restartContracts(String nodePubKey){
* if(!NodeCenterActions.recoverMap.containsKey(nodePubKey)){ return; }
*
* System.out.println("开始恢复节点" + NodeCenterActions.nodeinfos.get(nodePubKey).nodeName);
*
*
* // 恢复该节点的每一个集群运行的合约 for (ContractRecord record :
* NodeCenterActions.recoverMap.get(nodePubKey).values()) { String contractID =
* record.contractID; if (record.recoverFlag != RecoverFlag.ToRecover) continue;
*
* Map<String, String> req = new HashMap<String, String>(); req.put("action",
* "queryUnitStatus"); req.put("contractID", contractID); CMNode cmNode =
* NodeCenterActions.nodeinfos.get(nodePubKey);
* cmNode.connection.controller.sendMsg(gson.toJson(req)); } }
*/
/*
public static void restartFromCommonMode(String nodePubKey,ContractRecord record){
logger.debug("从CommonMode中恢复:");
* @Action(async = true) public void onQueryUnitStatus(Map<String, String> args, final
* ResultCallback rc) { String mode = args.get("mode"); String nodeID = args.get("nodeID");
* String contractID = args.get("contractID"); ContractRecord record =
* NodeCenterActions.recoverMap.get(nodeID).get(contractID); logger.debug("节点" +
* NodeCenterActions.nodeinfos.get(args.get("nodeID")).nodeName + "崩溃前模式为" + mode);
* if(mode.equals(ContractUnitStatus.CommonMode.toString())){
* restartFromCommonMode(nodeID,record); }else
* if(mode.equals(ContractUnitStatus.StableMode.toString())){
* restartFromStableMode(nodeID,record); } }
*/
String contractID = record.contractID;
// 当StableMode的节点的lastExeSeq和集群相差超过10时通过CommonNode的恢复方式恢复
/*
* @Action(async = true) public void restartByCommonNode(Map<String, String> args, final
* ResultCallback rc){ String nodeID = args.get("nodeID"); String contractID =
* args.get("contractID"); ContractRecord record =
* NodeCenterActions.recoverMap.get(nodeID).get(contractID);
* restartFromCommonMode(nodeID,record); }
*/
record.recoverFlag = RecoverFlag.Recovering;
//先发消息让恢复节点的该合约收到消息后只加入队列不dealRequests
Map<String, String> request = new HashMap<>();
request.put("action", "setRecovering");
request.put("contractID",contractID);
CMNode node = NodeCenterActions.nodeinfos.get(nodePubKey);
node.connection.controller.sendMsg(gson.toJson(request));
//System.out.println("第一步 : [NodeCeterActions] restartContracts 开始处理合约 contractID=" + contractID);
if (NodeCenterActions.contractID2Members.containsKey(contractID)) {
// 在nodeinfos中找节点该节点的pubKey在pubKeys中
MultiPointContractInfo info = NodeCenterActions.contractID2Members.get(contractID);
CMNode cmNode = null;
for (int i = 0; i < info.members.size(); i++) {
int size = info.members.size();
String tempNodeID = info.members.get(record.order.incrementAndGet() % size);
if(NodeCenterActions.nodeinfos.containsKey(tempNodeID))
cmNode = NodeCenterActions.nodeinfos.get(tempNodeID);
else continue;
//System.out.println("查询节点 " + cmNode.nodeName);
if (cmNode != null && !cmNode.pubKey.equals(nodePubKey)) {
//System.out.println("第二步 : [NodeCenterActions] 找到一个依赖恢复节点,其节点名为 " + cmNode.nodeName);
Map<String, String> req = new HashMap<String, String>();
req.put("action", "dumpCurrentState");
req.put("contractID", contractID);
req.put("targetNodePubkey", nodePubKey);
// NC向该节点发送请求让其存储自身当前状态并发给NC
cmNode.connection.controller.sendMsg(gson.toJson(req));
return;
}
}
if(cmNode == null){
logger.debug("[NodeCenterActions] Can't find a recover rely node!");
}
}
}
/*
* public static void restartFromCommonMode(String nodePubKey,ContractRecord record){
* logger.debug("从CommonMode中恢复:");
*
* String contractID = record.contractID;
*
* record.recoverFlag = RecoverFlag.Recovering; //先发消息让恢复节点的该合约收到消息后只加入队列不dealRequests
* Map<String, String> request = new HashMap<>(); request.put("action", "setRecovering");
* request.put("contractID",contractID); CMNode node =
* NodeCenterActions.nodeinfos.get(nodePubKey);
* node.connection.controller.sendMsg(gson.toJson(request));
*
* //System.out.println("第一步 : [NodeCeterActions] restartContracts 开始处理合约 contractID=" +
* contractID);
*
* if (NodeCenterActions.contractID2Members.containsKey(contractID)) {
*
* // 在nodeinfos中找节点该节点的pubKey在pubKeys中 MultiPointContractInfo info =
* NodeCenterActions.contractID2Members.get(contractID); CMNode cmNode = null; for (int i = 0; i
* < info.members.size(); i++) { int size = info.members.size(); String tempNodeID =
* info.members.get(record.order.incrementAndGet() % size);
*
* if(NodeCenterActions.nodeinfos.containsKey(tempNodeID)) cmNode =
* NodeCenterActions.nodeinfos.get(tempNodeID); else continue;
*
* //System.out.println("查询节点 " + cmNode.nodeName);
*
* if (cmNode != null && !cmNode.pubKey.equals(nodePubKey)) {
* //System.out.println("第二步 : [NodeCenterActions] 找到一个依赖恢复节点,其节点名为 " + cmNode.nodeName);
*
* Map<String, String> req = new HashMap<String, String>(); req.put("action",
* "dumpCurrentState"); req.put("contractID", contractID); req.put("targetNodePubkey",
* nodePubKey);
*
* // NC向该节点发送请求让其存储自身当前状态并发给NC cmNode.connection.controller.sendMsg(gson.toJson(req));
*
* return; } }
*
* if(cmNode == null){ logger.debug("[NodeCenterActions] Can't find a recover rely node!"); } }
* }
*/
// TODO
/* public static void restartFromStableMode(String nodePubkey,ContractRecord record){
logger.debug("从StableMode中恢复:");
/*
* public static void restartFromStableMode(String nodePubkey,ContractRecord record){
* logger.debug("从StableMode中恢复:");
*
* String contractID = record.contractID; record.recoverFlag = RecoverFlag.Recovering;
* //先发消息让恢复节点的该合约收到消息后只加入队列不dealRequests Map<String, String> request = new HashMap<>();
* request.put("action", "setRecovering"); request.put("contractID",contractID);
* request.put("mode", ContractUnitStatus.StableMode.toString()); int lastExeSeq =
* NodeCenterActions.contractID2Members.get(contractID).ai.get() - 1;
* request.put("lastExeSeq",lastExeSeq + ""); CMNode node =
* NodeCenterActions.nodeinfos.get(nodePubkey);
* node.connection.controller.sendMsg(gson.toJson(request)); }
*/
String contractID = record.contractID;
record.recoverFlag = RecoverFlag.Recovering;
//先发消息让恢复节点的该合约收到消息后只加入队列不dealRequests
Map<String, String> request = new HashMap<>();
request.put("action", "setRecovering");
request.put("contractID",contractID);
request.put("mode", ContractUnitStatus.StableMode.toString());
int lastExeSeq = NodeCenterActions.contractID2Members.get(contractID).ai.get() - 1;
request.put("lastExeSeq",lastExeSeq + "");
CMNode node = NodeCenterActions.nodeinfos.get(nodePubkey);
node.connection.controller.sendMsg(gson.toJson(request));
}*/
/*
* public static void unitModeCheck(String contractID){ MultiPointContractInfo mpci =
* NodeCenterActions.contractID2Members.get(contractID); int total = 0,online = 0; for(String
* nodeId : mpci.members){ if(NodeCenterActions.nodeinfos.containsKey(nodeId)){ online++; }
* total++; }
*
* logger.debug("合约" + contractID + "的集群,上线节点有" + online + "个,总共节点有" + total +
* "个. Math.ceil(total / 2)=" + Math.ceil((double)total / 2) +
* " online > Math.ceil(total / 2)" + (online > Math.ceil(total / 2)) + " mpci.unitStatus=" +
* mpci.unitStatus); if(online > Math.ceil((double)total / 2) && mpci.unitStatus ==
* ContractUnitStatus.StableMode){ logger.debug("合约" + contractID + "的集群更改模式为" +
* ContractUnitStatus.CommonMode.toString());
*
* Map<String, String> req = new HashMap<String, String>(); req.put("action",
* "changeUnitStatus"); req.put("contractID", contractID);
* req.put("mode",ContractUnitStatus.CommonMode.toString()); for(String nodeId : mpci.members){
* if(NodeCenterActions.nodeinfos.containsKey(nodeId)){ CMNode cmNode =
* NodeCenterActions.nodeinfos.get(nodeId); logger.debug("发消息给节点 " + cmNode.nodeName + " 设置合约" +
* contractID + "的集群模式为CommonMode"); cmNode.connection.controller.sendMsg(gson.toJson(req)); } }
*
* mpci.unitStatus = ContractUnitStatus.CommonMode; }else if(online <= Math.ceil((double)total /
* 2) && mpci.unitStatus == ContractUnitStatus.CommonMode){ logger.debug("合约" + contractID +
* "的集群更改模式为" + ContractUnitStatus.StableMode.toString());
*
* Map<String, String> req = new HashMap<String, String>(); req.put("action",
* "changeUnitStatus"); req.put("contractID", contractID);
* req.put("mode",ContractUnitStatus.StableMode.toString()); for(String nodeId : mpci.members){
* if(NodeCenterActions.nodeinfos.containsKey(nodeId)){ CMNode cmNode =
* NodeCenterActions.nodeinfos.get(nodeId); Map<String,String> map =
* NodeCenterActions.recoverMap.get(nodeId).get(contractID).members;
* req.put("membersStr",JsonUtil.toJson(map)); //ContractRecord's members logger.debug("发消息给节点 "
* + cmNode.nodeName + " 设置合约" + contractID + "的集群模式为StableMode");
* cmNode.connection.controller.sendMsg(gson.toJson(req)); } }
*
* mpci.unitStatus = ContractUnitStatus.StableMode;
*
* //将ContractRecord中members发给集群中节点
*
* } }
*/
/* public static void unitModeCheck(String contractID){
MultiPointContractInfo mpci = NodeCenterActions.contractID2Members.get(contractID);
int total = 0,online = 0;
for(String nodeId : mpci.members){
if(NodeCenterActions.nodeinfos.containsKey(nodeId)){
online++;
}
total++;
}
logger.debug("合约" + contractID + "的集群,上线节点有" + online + "个,总共节点有" + total + "个. Math.ceil(total / 2)=" + Math.ceil((double)total / 2) + " online > Math.ceil(total / 2)" + (online > Math.ceil(total / 2)) + " mpci.unitStatus=" + mpci.unitStatus);
if(online > Math.ceil((double)total / 2) && mpci.unitStatus == ContractUnitStatus.StableMode){
logger.debug("合约" + contractID + "的集群更改模式为" + ContractUnitStatus.CommonMode.toString());
Map<String, String> req = new HashMap<String, String>();
req.put("action", "changeUnitStatus");
req.put("contractID", contractID);
req.put("mode",ContractUnitStatus.CommonMode.toString());
for(String nodeId : mpci.members){
if(NodeCenterActions.nodeinfos.containsKey(nodeId)){
CMNode cmNode = NodeCenterActions.nodeinfos.get(nodeId);
logger.debug("发消息给节点 " + cmNode.nodeName + " 设置合约" + contractID + "的集群模式为CommonMode");
cmNode.connection.controller.sendMsg(gson.toJson(req));
}
}
mpci.unitStatus = ContractUnitStatus.CommonMode;
}else if(online <= Math.ceil((double)total / 2) && mpci.unitStatus == ContractUnitStatus.CommonMode){
logger.debug("合约" + contractID + "的集群更改模式为" + ContractUnitStatus.StableMode.toString());
Map<String, String> req = new HashMap<String, String>();
req.put("action", "changeUnitStatus");
req.put("contractID", contractID);
req.put("mode",ContractUnitStatus.StableMode.toString());
for(String nodeId : mpci.members){
if(NodeCenterActions.nodeinfos.containsKey(nodeId)){
CMNode cmNode = NodeCenterActions.nodeinfos.get(nodeId);
Map<String,String> map = NodeCenterActions.recoverMap.get(nodeId).get(contractID).members;
req.put("membersStr",JsonUtil.toJson(map)); //ContractRecord's members
logger.debug("发消息给节点 " + cmNode.nodeName + " 设置合约" + contractID + "的集群模式为StableMode");
cmNode.connection.controller.sendMsg(gson.toJson(req));
}
}
mpci.unitStatus = ContractUnitStatus.StableMode;
//将ContractRecord中members发给集群中节点
}
}*/
/* @Action(async = true)
public void recoverFinish(Map<String, String> args, final ResultCallback rc) {
ContractRecord cr = NodeCenterActions.recoverMap.get(args.get("nodeID")).get(args.get("contractID"));
logger.debug("节点" + NodeCenterActions.nodeinfos.get(args.get("nodeID")).nodeName + "恢复完成!");
if(cr.recoverFlag == RecoverFlag.Recovering)
cr.recoverFlag = RecoverFlag.Fine;
logger.debug("恢复完成,需要检查合约" + args.get("contractID") + "的集群运行模式");
unitModeCheck(args.get("contractID"));
}*/
/*
* @Action(async = true) public void recoverFinish(Map<String, String> args, final
* ResultCallback rc) { ContractRecord cr =
* NodeCenterActions.recoverMap.get(args.get("nodeID")).get(args.get("contractID"));
* logger.debug("节点" + NodeCenterActions.nodeinfos.get(args.get("nodeID")).nodeName + "恢复完成!");
* if(cr.recoverFlag == RecoverFlag.Recovering) cr.recoverFlag = RecoverFlag.Fine;
*
* logger.debug("恢复完成,需要检查合约" + args.get("contractID") + "的集群运行模式");
* unitModeCheck(args.get("contractID")); }
*/
static boolean clearCache() {
if (requestCache.isEmpty()) return true;
if (requestCache.isEmpty())
return true;
// final long time = System.currentTimeMillis() - 60000L; //60s
// requestCache.entrySet()
@ -253,7 +224,8 @@ public class MasterActions {
static RequestCache getCache(String contractID) {
if (contractID != null) {
RequestCache cache = requestCache.get(contractID);
if (cache != null) return cache;
if (cache != null)
return cache;
else {
LOGGER.debug("[NodeCenterActions] create requestcache:" + contractID);
RequestCache reqc = new RequestCache();
@ -264,26 +236,17 @@ public class MasterActions {
return null;
}
/* @Action(async = true)
public void sendCachedRequests(Map<String, String> args, final ResultCallback rc) {
String contractID = args.get("contractID");
int start = Integer.parseInt(args.get("start"));
int end = Integer.parseInt(args.get("end"));
RequestCache cache = getCache(contractID);
for(int i = start + 1;i < end;i++){
if(cache == null || !cache.containsKey(i)){
//this node crash
String nodeID = args.get("nodeID");
MasterActions.restartContracts(nodeID);
}
JsonObject jo = cache.get(i);
if(jo == null){
logger.debug("在NC中第 " + i + "个请求已经被清,无法发给恢复节点!");
}else
controller.sendMsg(JsonUtil.toJson(jo));
logger.debug("NC发送第 " + i + " " + jo.get("seq").getAsString() + " 个请求给Node");
}
}*/
/*
* @Action(async = true) public void sendCachedRequests(Map<String, String> args, final
* ResultCallback rc) { String contractID = args.get("contractID"); int start =
* Integer.parseInt(args.get("start")); int end = Integer.parseInt(args.get("end"));
*
* RequestCache cache = getCache(contractID); for(int i = start + 1;i < end;i++){ if(cache ==
* null || !cache.containsKey(i)){ //this node crash String nodeID = args.get("nodeID");
* MasterActions.restartContracts(nodeID); } JsonObject jo = cache.get(i); if(jo == null){
* logger.debug("在NC中第 " + i + "个请求已经被清,无法发给恢复节点!"); }else
* controller.sendMsg(JsonUtil.toJson(jo));
*
* logger.debug("NC发送第 " + i + " " + jo.get("seq").getAsString() + " 个请求给Node"); } }
*/
}

View File

@ -90,7 +90,8 @@ public class MetaIndexAction { // public static IndexWriter indexWriter;
Query query = new TermQuery(new Term("contractID", thisDesp.contractID));
TopDocs docs = indexSearcher.search(query, 10);
LOGGER.debug(docs.scoreDocs);
if (null != thisDesp.contractName && (docs.scoreDocs == null || docs.scoreDocs.length == 0)) {
if (null != thisDesp.contractName
&& (docs.scoreDocs == null || docs.scoreDocs.length == 0)) {
req.addProperty("action", "requestReadMe");
req.addProperty("contractID", thisDesp.contractID);
rc.onResult(req);
@ -128,7 +129,8 @@ public class MetaIndexAction { // public static IndexWriter indexWriter;
public static void delIndexbyCID(JsonObject jo) {
try {
if (!jo.has("contractID")) return;
if (!jo.has("contractID"))
return;
String contractID = jo.get("contractID").getAsString();
LOGGER.info("contractID:" + contractID + "-->actually to delete");
indexWriter.deleteDocuments(new Term("contractID", contractID));
@ -140,7 +142,8 @@ public class MetaIndexAction { // public static IndexWriter indexWriter;
public static void delIndexbyOwner(JsonObject jo) {
try {
if (!jo.has("owner")) return;
if (!jo.has("owner"))
return;
String owner = jo.get("owner").getAsString();
LOGGER.info("owner:" + owner + "-->actually to delete");
indexWriter.deleteDocuments(new Term("owner", owner));
@ -188,12 +191,17 @@ public class MetaIndexAction { // public static IndexWriter indexWriter;
public static String getMetabyReadme(JsonObject jo) {
try {
if (!jo.has("keyword")) return "missing arguments: keyword";
if (!jo.has("keyword"))
return "missing arguments: keyword";
if (!jo.has("page")) page = 1;
else page = jo.get("page").getAsInt();
if (page <= 0) page = 1;
if (jo.has("pageSize")) PAGE_SIZE = jo.get("pageSize").getAsInt();
if (!jo.has("page"))
page = 1;
else
page = jo.get("page").getAsInt();
if (page <= 0)
page = 1;
if (jo.has("pageSize"))
PAGE_SIZE = jo.get("pageSize").getAsInt();
String keyword = jo.get("keyword").getAsString();
// Analyzer analyzer = new StandardAnalyzer();
Analyzer analyzer = new IKAnalyzer();
@ -231,11 +239,15 @@ public class MetaIndexAction { // public static IndexWriter indexWriter;
private static String getMetabyPubkey(JsonObject jo) {
try {
if (!jo.has("pubkey")) return "missing arguments: pubkey";
if (!jo.has("pubkey"))
return "missing arguments: pubkey";
if (!jo.has("page")) page = 1;
else page = jo.get("page").getAsInt();
if (page <= 0) page = 1;
if (!jo.has("page"))
page = 1;
else
page = jo.get("page").getAsInt();
if (page <= 0)
page = 1;
String pubkey = jo.get("pubkey").getAsString();
DirectoryReader indexReader = DirectoryReader.open(indexDir);
@ -271,11 +283,15 @@ public class MetaIndexAction { // public static IndexWriter indexWriter;
private static String getMetabyCID(JsonObject jo) {
System.out.println("getMetabyCID" + jo.toString());
try {
if (!jo.has("contractID")) return "missing arguments: contractID";
if (!jo.has("contractID"))
return "missing arguments: contractID";
if (!jo.has("page")) page = 1;
else page = jo.get("page").getAsInt();
if (page <= 0) page = 1;
if (!jo.has("page"))
page = 1;
else
page = jo.get("page").getAsInt();
if (page <= 0)
page = 1;
String contractID = jo.get("contractID").getAsString();
DirectoryReader indexReader = DirectoryReader.open(indexDir);
@ -310,12 +326,17 @@ public class MetaIndexAction { // public static IndexWriter indexWriter;
private static String getMetabyOwner(JsonObject jo) {
try {
if (!jo.has("owner")) return "missing arguments: owner";
if (!jo.has("owner"))
return "missing arguments: owner";
if (!jo.has("page")) page = 1;
else page = jo.get("page").getAsInt();
if (page <= 0) page = 1;
if (jo.has("pageSize")) PAGE_SIZE = jo.get("pageSize").getAsInt();
if (!jo.has("page"))
page = 1;
else
page = jo.get("page").getAsInt();
if (page <= 0)
page = 1;
if (jo.has("pageSize"))
PAGE_SIZE = jo.get("pageSize").getAsInt();
String owner = jo.get("owner").getAsString();
DirectoryReader indexReader = DirectoryReader.open(indexDir);
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
@ -350,18 +371,15 @@ public class MetaIndexAction { // public static IndexWriter indexWriter;
public static void reqHandler(JsonObject req, BooleanQuery.Builder query)
throws ParseException {
if (req.get("contractID").getAsString() != null) {
query.add(
new FuzzyQuery(new Term("contractID", req.get("contractID").getAsString())),
query.add(new FuzzyQuery(new Term("contractID", req.get("contractID").getAsString())),
BooleanClause.Occur.SHOULD);
}
if (req.get("owner").getAsString() != null) {
query.add(
new FuzzyQuery(new Term("owner", req.get("owner").getAsString())),
query.add(new FuzzyQuery(new Term("owner", req.get("owner").getAsString())),
BooleanClause.Occur.SHOULD);
}
if (req.get("pubkey").getAsString() != null) {
query.add(
new PrefixQuery(new Term("pubkey", req.get("pubkey").getAsString())),
query.add(new PrefixQuery(new Term("pubkey", req.get("pubkey").getAsString())),
BooleanClause.Occur.SHOULD);
}
if (req.get("readmeStr").getAsString() != null) {
@ -400,8 +418,7 @@ public class MetaIndexAction { // public static IndexWriter indexWriter;
resultModel.setContractMetaList(contractMetaList);
resultModel.setCurPage(page);
Long pageCount =
docs.totalHits.value % PAGE_SIZE > 0
? (docs.totalHits.value) / PAGE_SIZE + 1
docs.totalHits.value % PAGE_SIZE > 0 ? (docs.totalHits.value) / PAGE_SIZE + 1
: (docs.totalHits.value) / PAGE_SIZE;
resultModel.setPageCount(pageCount);
return resultModel;
@ -421,11 +438,7 @@ public class MetaIndexAction { // public static IndexWriter indexWriter;
// /DOIP/Hello1/assets/logo.png
// String nodeAddr="127.0.0.1:21030";
String pngUrl = "http://" + nodeAddr + "/DOIP/" + name + "/assets/logo.png";
System.out.println(
"node.masterAddress: "
+ node.masterAddress
+ "=========="
+ "nodeAddr: "
System.out.println("node.masterAddress: " + node.masterAddress + "==========" + "nodeAddr: "
+ nodeAddr);
System.out.println("name: " + name + "pngUrl: " + pngUrl);
String contractID = json.get("contractID").getAsString();
@ -465,7 +478,8 @@ public class MetaIndexAction { // public static IndexWriter indexWriter;
public void getMetabyReadme(JsonObject json, ResultCallback rc) {
String result = getMetabyReadme(json);
JsonObject object = new JsonObject();
if (json.has("requestID")) object.add("responseID", json.get("requestID"));
if (json.has("requestID"))
object.add("responseID", json.get("requestID"));
System.out.println("zzzResult" + result);
object.add("result", JsonParser.parseString(result));
object.addProperty("action", "getMetabyReadme");
@ -475,14 +489,18 @@ public class MetaIndexAction { // public static IndexWriter indexWriter;
@Action(async = true, userPermission = 1L)
public void segmentWord(JsonObject json, ResultCallback rc) throws IOException {
// Analyzer analyzer = new IKAnalyzer(true);
CharArraySet stopWords = CharArraySet.unmodifiableSet(WordlistLoader.getWordSet(new InputStreamReader(Objects.requireNonNull(MetaIndexAction.class.getClassLoader().getResourceAsStream(
"org/bdware/server/stopwords.txt")), StandardCharsets.UTF_8)));
CharArraySet stopWords =
CharArraySet.unmodifiableSet(WordlistLoader.getWordSet(new InputStreamReader(
Objects.requireNonNull(MetaIndexAction.class.getClassLoader()
.getResourceAsStream("org/bdware/server/stopwords.txt")),
StandardCharsets.UTF_8)));
Analyzer analyzer = new SmartChineseAnalyzer(stopWords);
JsonObject object = new JsonObject();
String words = "我喜欢区块链,我想试用一下这个,我是做大数据处理的,我是科技局的管理员"; // 从json拿
TokenStream stream = null;
if (json.has("requestID")) object.add("responseID", json.get("requestID"));
if (json.has("requestID"))
object.add("responseID", json.get("requestID"));
try {
stream = analyzer.tokenStream("myfield", words);
CharTermAttribute charTermAtt = stream.addAttribute(CharTermAttribute.class);
@ -511,7 +529,8 @@ public class MetaIndexAction { // public static IndexWriter indexWriter;
public void getMetabyCID(JsonObject json, ResultCallback rc) {
String result = getMetabyCID(json);
JsonObject object = new JsonObject();
if (json.has("requestID")) object.add("responseID", json.get("requestID"));
if (json.has("requestID"))
object.add("responseID", json.get("requestID"));
object.add("result", JsonParser.parseString(result));
object.addProperty("action", "getMetabyCID");
rc.onResult(object);
@ -521,7 +540,8 @@ public class MetaIndexAction { // public static IndexWriter indexWriter;
public void getMetabyOwner(JsonObject json, ResultCallback rc) {
String result = getMetabyOwner(json);
JsonObject object = new JsonObject();
if (json.has("requestID")) object.add("responseID", json.get("requestID"));
if (json.has("requestID"))
object.add("responseID", json.get("requestID"));
object.add("result", JsonParser.parseString(result));
object.addProperty("action", "getMetabyOwner");
rc.onResult(object);
@ -531,7 +551,8 @@ public class MetaIndexAction { // public static IndexWriter indexWriter;
public void getMetabyPubkey(JsonObject json, ResultCallback rc) {
String result = getMetabyPubkey(json);
JsonObject object = new JsonObject();
if (json.has("requestID")) object.add("responseID", json.get("requestID"));
if (json.has("requestID"))
object.add("responseID", json.get("requestID"));
object.add("result", JsonParser.parseString(result));
object.addProperty("action", "getMetabyPubkey");
rc.onResult(object);

View File

@ -35,8 +35,9 @@ public class NCClientActions {
map.put("id", OtherNCProxy.instance.sm2.getPublicKeyStr());
byte[] signature = "no signature".getBytes();
try {
signature = SM2Util.sign(OtherNCProxy.instance.sm2.getPrivateKeyParameter(), (OtherNCProxy.instance.sm2.getPublicKeyStr() + json.get("session").getAsString())
.getBytes());
signature = SM2Util.sign(OtherNCProxy.instance.sm2.getPrivateKeyParameter(),
(OtherNCProxy.instance.sm2.getPublicKeyStr()
+ json.get("session").getAsString()).getBytes());
} catch (CryptoException e) {
e.printStackTrace();
}
@ -62,8 +63,8 @@ public class NCClientActions {
if (json.has("empty")) {
LOGGER.info("receive from NC ,do not have any cp.");
} else {
info = JsonUtil.fromJson(json.get("contracts").getAsString(), new TypeToken<List<OtherNCInfo>>() {
}.getType());
info = JsonUtil.fromJson(json.get("contracts").getAsString(),
new TypeToken<List<OtherNCInfo>>() {}.getType());
}
LOGGER.info(JsonUtil.toJson(info));

View File

@ -67,16 +67,11 @@ public class NCClientHandler extends SimpleChannelInboundHandler<Object> {
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf bb = (ByteBuf) msg;
try {
final JsonObject arg =
new JsonParser()
.parse(new InputStreamReader(new ByteBufInputStream(bb)))
.getAsJsonObject();
final JsonObject arg = new JsonParser()
.parse(new InputStreamReader(new ByteBufInputStream(bb))).getAsJsonObject();
if (arg.has("action")) {
final String action = arg.get("action").getAsString();
ae.handle(
action,
arg,
new ResultCallback() {
ae.handle(action, arg, new ResultCallback() {
@Override
public void onResult(String str) {
sendMsg(str);
@ -100,7 +95,8 @@ public class NCClientHandler extends SimpleChannelInboundHandler<Object> {
public void close() {
try {
isConnected = false;
if (channel != null) channel.close();
if (channel != null)
channel.close();
} catch (Exception e) {
e.printStackTrace();
} finally {

View File

@ -15,7 +15,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class NCElectMasterUtil {
public static final Map<String, ElectInfo> electInfos = new ConcurrentHashMap<>(); //key is contractID
public static final Map<String, ElectInfo> electInfos = new ConcurrentHashMap<>(); // key is
// contractID
private static final Logger LOGGER = LogManager.getLogger(NCElectMasterUtil.class);
public static class ElectInfo {
@ -38,11 +39,10 @@ public class NCElectMasterUtil {
uniNumber = uni;
String[] mem = members.split(",");
/* try {
Thread.sleep(2000); //让NC发现崩溃节点
} catch (InterruptedException e) {
e.printStackTrace();
}*/
/*
* try { Thread.sleep(2000); //让NC发现崩溃节点 } catch (InterruptedException e) {
* e.printStackTrace(); }
*/
for (String memID : mem) {
if (memID == null || memID.length() == 0)
@ -52,8 +52,7 @@ public class NCElectMasterUtil {
onlineNum++;
}
}
NodeCenterServer.scheduledThreadPool.scheduleWithFixedDelay(
() -> {
NodeCenterServer.scheduledThreadPool.scheduleWithFixedDelay(() -> {
// cancel the election if no nodes find the master's crash in delay + 2 seconds
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss.SSS");
if (System.currentTimeMillis() - lastTime > (delay + 15000)) {
@ -64,14 +63,11 @@ public class NCElectMasterUtil {
// if (electInfos.containsKey(contractID) && nodeID2LastExe.size() == onlineNum) {
// elect();
// }
},
delay + 1500,
delay + 1500,
TimeUnit.MILLISECONDS);
}, delay + 1500, delay + 1500, TimeUnit.MILLISECONDS);
// timer.schedule(task, dealy + 2000);
LOGGER.info("new election of contract " + contractID + " is added to electInfos, " +
onlineNum + " node is online");
LOGGER.info("new election of contract " + contractID + " is added to electInfos, "
+ onlineNum + " node is online");
}
public void cancel() {
@ -84,7 +80,8 @@ public class NCElectMasterUtil {
}
}
public synchronized void put(String nodeID, int lastExe, String master, String mem2, String uniNum) {
public synchronized void put(String nodeID, int lastExe, String master, String mem2,
String uniNum) {
LOGGER.info("put nodeID=" + nodeID);
// 确保该合约某时只能有一个选举,其他的选举请求被忽略
@ -111,7 +108,8 @@ public class NCElectMasterUtil {
cancel();
synchronized (electInfos) {
// electInfos.remove(contractID);
NCElectMasterUtil.electInfos.put(contractID, new ElectInfo(master, contractID, mem2, uniNum));
NCElectMasterUtil.electInfos.put(contractID,
new ElectInfo(master, contractID, mem2, uniNum));
ElectInfo eleInfo = electInfos.get(contractID);
eleInfo.put(nodeID, lastExe, master, mem2, uniNum);
}
@ -143,9 +141,11 @@ public class NCElectMasterUtil {
if (node != null) {
synchronized (node) {
for (ContractDesp cd : node.contracts) {
if (cd.contractID.equals(contractID) || cd.contractName.equals(contractID)) {
if (cd.contractID.equals(contractID)
|| cd.contractName.equals(contractID)) {
cd.setIsMaster(false);
LOGGER.debug("设置节点 " + node.pubKey.substring(0, 5) + " 的合约 " + contractID + " isMaster=" + false);
LOGGER.debug("设置节点 " + node.pubKey.substring(0, 5) + " 的合约 "
+ contractID + " isMaster=" + false);
break;
}
}

View File

@ -47,10 +47,8 @@ public class NCHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
static LogActions logActions = new LogActions(managerAction);
static UnitActions unitActions = new UnitActions(managerAction);
private static final ActionExecutor<ResultCallback, JsonObject> actionExecutor =
new ActionExecutor<ResultCallback, JsonObject>(
NodeCenterFrameHandler.executorService,
new NodeCenterActions(null),
managerAction, logActions, unitActions) {
new ActionExecutor<ResultCallback, JsonObject>(NodeCenterFrameHandler.executorService,
new NodeCenterActions(null), managerAction, logActions, unitActions) {
@Override
public boolean checkPermission(Action a, JsonObject arg, long per) {
@ -78,21 +76,14 @@ public class NCHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
if (arg.has("pubKey")) {
pubkey = arg.get("pubKey").getAsString();
}
NodeCenterServer.nodeHttpLogDB.put(
action,
"{\"action\":\""
+ action
+ "\",\"pubKey\":\""
+ pubkey
+ "\",\"date\":"
+ System.currentTimeMillis()
+ "}");
NodeCenterServer.nodeHttpLogDB.put(action,
"{\"action\":\"" + action + "\",\"pubKey\":\"" + pubkey + "\",\"date\":"
+ System.currentTimeMillis() + "}");
LOGGER.debug("[NCHttpHandler] flag = " + flag + " flag2 = " + flag2);
return flag && flag2;
}
};
static TypeToken<Map<String, String>> token = new TypeToken<Map<String, String>>() {
};
static TypeToken<Map<String, String>> token = new TypeToken<Map<String, String>>() {};
private final HttpFileHandleAdapter fileAdapter;
// public static ExecutorService executor = Executors.newFixedThreadPool(5);
public AtomicInteger counter = new AtomicInteger(0);
@ -149,20 +140,15 @@ public class NCHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
@URIPath(method = HttpMethod.OPTIONS)
public void crossOrigin(ChannelHandlerContext ctx, FullHttpRequest request) {
DefaultFullHttpResponse fullResponse =
new DefaultFullHttpResponse(
request.protocolVersion(),
OK,
Unpooled.wrappedBuffer("success".getBytes()));
DefaultFullHttpResponse fullResponse = new DefaultFullHttpResponse(
request.protocolVersion(), OK, Unpooled.wrappedBuffer("success".getBytes()));
fullResponse.headers().remove("Access-Control-Allow-Origin");
fullResponse.headers().remove("Access-Control-Allow-Headers");
fullResponse.headers().add("Access-Control-Allow-Origin", "*");
fullResponse.headers().add("Access-Control-Allow-Methods", "*");
fullResponse
.headers()
.add("Access-Control-Allow-Headers",
"Content-Type, Cookie, Accept-Encoding, User-Agent, Host, Referer, " +
"X-Requested-With, Accept, Accept-Language, Cache-Control, Connection");
fullResponse.headers().add("Access-Control-Allow-Headers",
"Content-Type, Cookie, Accept-Encoding, User-Agent, Host, Referer, "
+ "X-Requested-With, Accept, Accept-Language, Cache-Control, Connection");
ChannelFuture f = ctx.write(fullResponse);
f.addListener(ChannelFutureListener.CLOSE);
LOGGER.info("[Option] received!");
@ -189,14 +175,13 @@ public class NCHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
JsonObject transformedParam = new JsonObject();
for (String key : parameters.keySet()) {
List<String> val = parameters.get(key);
if (val != null) transformedParam.addProperty(key, val.get(0));
if (val != null)
transformedParam.addProperty(key, val.get(0));
}
return transformedParam;
}
@URIPath(
method = org.bdware.server.http.HttpMethod.GET,
value = {"/doip"})
@URIPath(method = org.bdware.server.http.HttpMethod.GET, value = {"/doip"})
public void handleDOIP(ChannelHandlerContext ctx, FullHttpRequest req) {
ByteBuf content = req.content();
JsonObject map = parseArgInQuery(req);
@ -209,10 +194,7 @@ public class NCHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
}
injectPermission(map, str);
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
OK,
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, OK,
Unpooled.wrappedBuffer(MetaIndexAction.search(map).getBytes()));
response.headers().add("Access-Control-Allow-Origin", "*");
response.headers().add("Access-Control-Allow-Methods", "*");
@ -224,8 +206,7 @@ public class NCHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
LOGGER.debug("[NCHttpHandler] TID:" + Thread.currentThread().getId() +
msg.toString());
LOGGER.debug("[NCHttpHandler] TID:" + Thread.currentThread().getId() + msg.toString());
if (msg instanceof FullHttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
handler.handle(ctx, request);
@ -234,9 +215,8 @@ public class NCHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
public static String getRole(String pubKey) {
try {
String ret =
KeyValueDBUtil.instance.getValue(
NCTables.ConfigDB.toString(), "__CenterManager__");
String ret = KeyValueDBUtil.instance.getValue(NCTables.ConfigDB.toString(),
"__CenterManager__");
if (ret != null && ret.length() > 0) {
boolean isCenterManager = (ret.equals(pubKey)); // 表示此节点是否是平台管理员
ret = KeyValueDBUtil.instance.getValue(NCTables.NodeUser.toString(), pubKey);
@ -313,7 +293,8 @@ public class NCHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
if (map.has("callback"))
cb = new HttpResultCallback(ctx, map.get("callback").getAsString());
else cb = new HttpResultCallback(ctx, null);
else
cb = new HttpResultCallback(ctx, null);
cb.addHeader("Access-Control-Allow-Origin", "*");
cb.addHeader("Access-Control-Allow-Methods", "*");
cb.addHeader("charset", "utf-8");
@ -321,19 +302,15 @@ public class NCHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
}
actionExecutor.handle(action, map, cb);
} else {
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, OK, Unpooled.wrappedBuffer(ret.getBytes()));
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
OK, Unpooled.wrappedBuffer(ret.getBytes()));
response.headers().add("Access-Control-Allow-Origin", "*");
response.headers().add("Access-Control-Allow-Methods", "*");
ChannelFuture f = ctx.write(response);
f.addListener(ChannelFutureListener.CLOSE);
}
} catch (IllegalArgumentException e) {
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
OK,
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, OK,
Unpooled.wrappedBuffer(e.getMessage().getBytes()));
response.headers().add("Access-Control-Allow-Origin", "*");
response.headers().add("Access-Control-Allow-Methods", "*");
@ -345,10 +322,7 @@ public class NCHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
ByteArrayOutputStream bo = new ByteArrayOutputStream();
e.printStackTrace(new PrintStream(bo));
ret.put("msg", bo.toString());
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
OK,
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, OK,
Unpooled.wrappedBuffer(JsonUtil.toJson(ret).getBytes()));
response.headers().add("Access-Control-Allow-Origin", "*");
response.headers().add("Access-Control-Allow-Methods", "*");

View File

@ -27,11 +27,9 @@ public class NCManagerAction {
public static final String centerManger = "__CenterManager__";
public static final String clusterName = "__ClusterName__";
static final String Licence = "__LICENCE___";
static final ECPublicKeyParameters licencePub =
BCECUtil.createECPublicKeyFromStrParameters(
static final ECPublicKeyParameters licencePub = BCECUtil.createECPublicKeyFromStrParameters(
"04844412ecb77b07d5ad7097c488ae9dff1013b310d2311f8bd84c491642011e1a6a7041bae8c2ad75da29c27e320bd430abc723cf6bcb0490afc82cc26e6d5637",
SM2Util.CURVE,
SM2Util.DOMAIN_PARAMS);
SM2Util.CURVE, SM2Util.DOMAIN_PARAMS);
private static final Logger LOGGER = LogManager.getLogger(NCManagerAction.class);
String sessionID = null;
private String publicKey;
@ -45,7 +43,8 @@ public class NCManagerAction {
public String getPubKey(JsonObject jo) {
try {
if (publicKey != null) return publicKey;
if (publicKey != null)
return publicKey;
return jo.get("pubKey").getAsString();
} catch (Exception e) {
}
@ -55,8 +54,7 @@ public class NCManagerAction {
public static boolean isCenterManager(String pubkey) {
String ret = KeyValueDBUtil.instance.getValue(NCTables.ConfigDB.toString(), centerManger);
LOGGER.debug("centerManger: " + ret);
return !StringUtil.isNullOrEmpty(ret)
&& !StringUtil.isNullOrEmpty(pubkey)
return !StringUtil.isNullOrEmpty(ret) && !StringUtil.isNullOrEmpty(pubkey)
&& pubkey.equals(ret);
}
@ -104,8 +102,8 @@ public class NCManagerAction {
@Action(userPermission = 0)
public void setClusterName(JsonObject json, ResultCallback resultCallback) {
if (json.has("name")) {
KeyValueDBUtil.instance.setValue(
NCTables.ConfigDB.toString(), clusterName, json.get("name").getAsString());
KeyValueDBUtil.instance.setValue(NCTables.ConfigDB.toString(), clusterName,
json.get("name").getAsString());
simpleReply(resultCallback, "onSetClusterName", "success");
} else {
simpleReply(resultCallback, "onSetClusterName", "failed");
@ -140,10 +138,10 @@ public class NCManagerAction {
handler.setPermission(Role.compoundValue(role.split(",")));
simpleReply(resultCallback, "onLogin", role);
} else {
KeyValueDBUtil.instance.setValue(
NCTables.ConfigDB.toString(), centerManger, pubKey);
KeyValueDBUtil.instance.setValue(
NCTables.ConfigDB.toString(), clusterName, "clusterName_" + pubKey.substring(0, 5));
KeyValueDBUtil.instance.setValue(NCTables.ConfigDB.toString(), centerManger,
pubKey);
KeyValueDBUtil.instance.setValue(NCTables.ConfigDB.toString(), clusterName,
"clusterName_" + pubKey.substring(0, 5));
handler.setPermission(0x30000ffL);
simpleReply(resultCallback, "onLogin", "CenterManager");
}
@ -185,18 +183,22 @@ public class NCManagerAction {
if (str.contains(Role.CenterManager.toString())) {
if (json.has("newPubKey")) {
String newPubKey = json.get("newPubKey").getAsString();
KeyValueDBUtil.instance.setValue(NCTables.ConfigDB.toString(), centerManger, newPubKey);
KeyValueDBUtil.instance.setValue(
NCTables.NodeUser.toString(), newPubKey, Role.CenterManager.toString());
resultCallback.onResult("{\"action\":\"onResetCenterManager\",\"data\":\"success\",\"pubKey\":\""
KeyValueDBUtil.instance.setValue(NCTables.ConfigDB.toString(), centerManger,
newPubKey);
KeyValueDBUtil.instance.setValue(NCTables.NodeUser.toString(), newPubKey,
Role.CenterManager.toString());
resultCallback.onResult(
"{\"action\":\"onResetCenterManager\",\"data\":\"success\",\"pubKey\":\""
+ newPubKey + "\"}");
} else {
// just keep the same
resultCallback.onResult("{\"action\":\"onResetCenterManager\",\"data\":\"success\",\"pubKey\":\""
resultCallback.onResult(
"{\"action\":\"onResetCenterManager\",\"data\":\"success\",\"pubKey\":\""
+ getPubKey(json) + "\"}");
}
} else {
resultCallback.onResult("{\"action\":\"onResetCenterManager\",\"data\":\"failed, no permission\"}");
resultCallback.onResult(
"{\"action\":\"onResetCenterManager\",\"data\":\"failed, no permission\"}");
}
}
@ -225,14 +227,10 @@ public class NCManagerAction {
@Action(userPermission = 1 << 6)
public void addNode(JsonObject json, ResultCallback resultCallback) {
try {
KeyValueDBUtil.instance.setValue(
NCTables.NodeUser.toString(),
json.get("nodePubKey").getAsString(),
Role.Node.toString());
KeyValueDBUtil.instance.setValue(
NCTables.NodeTime.toString(),
json.get("nodePubKey").getAsString(),
Long.toString(new Date().getTime()));
KeyValueDBUtil.instance.setValue(NCTables.NodeUser.toString(),
json.get("nodePubKey").getAsString(), Role.Node.toString());
KeyValueDBUtil.instance.setValue(NCTables.NodeTime.toString(),
json.get("nodePubKey").getAsString(), Long.toString(new Date().getTime()));
simpleReply(resultCallback, "onAddNodes", "success");
} catch (Exception e) {
e.printStackTrace();
@ -251,14 +249,14 @@ public class NCManagerAction {
}
if (str == null || str.length() == 0) {
KeyValueDBUtil.instance.setValue(NCTables.ApplyRole.toString(), pubKey, role.name());
KeyValueDBUtil.instance.setValue(
NCTables.ApplyTime.toString(), pubKey, Long.toString(new Date().getTime()));
KeyValueDBUtil.instance.setValue(NCTables.ApplyTime.toString(), pubKey,
Long.toString(new Date().getTime()));
} else {
if (!str.contains(role.name())) {
KeyValueDBUtil.instance.setValue(
NCTables.ApplyRole.toString(), pubKey, str + "," + role.name());
KeyValueDBUtil.instance.setValue(
NCTables.ApplyTime.toString(), pubKey, Long.toString(new Date().getTime()));
KeyValueDBUtil.instance.setValue(NCTables.ApplyRole.toString(), pubKey,
str + "," + role.name());
KeyValueDBUtil.instance.setValue(NCTables.ApplyTime.toString(), pubKey,
Long.toString(new Date().getTime()));
}
}
simpleReply(resultCallback, "onApplyRole", "success!");
@ -310,8 +308,8 @@ public class NCManagerAction {
already = roles;
}
KeyValueDBUtil.instance.setValue(NCTables.NodeUser.toString(), pubKey, already);
KeyValueDBUtil.instance.setValue(
NCTables.NodeTime.toString(), pubKey, Long.toString(new Date().getTime()));
KeyValueDBUtil.instance.setValue(NCTables.NodeTime.toString(), pubKey,
Long.toString(new Date().getTime()));
KeyValueDBUtil.instance.delete(NCTables.ApplyRole.toString(), pubKey);
KeyValueDBUtil.instance.delete(NCTables.ApplyTime.toString(), pubKey);
simpleReply(resultCallback, "onAuthNodeManager", "success");
@ -445,13 +443,14 @@ public class NCManagerAction {
SM2Util.verify(licencePub, content.getBytes(), ByteUtils.fromHexString(sign));
if (verify) {
KeyValueDBUtil.instance.setValue(
NCTables.ConfigDB.toString(), Licence, json.toString());
KeyValueDBUtil.instance.setValue(NCTables.ConfigDB.toString(), Licence,
json.toString());
simpleReply(resultCallback, "onUpdateLicence", "success");
long end = System.currentTimeMillis();
LOGGER.debug("[listLicence:time]" + (end - start));
return;
} else simpleReply(resultCallback, "onUpdateLicence", "failed");
} else
simpleReply(resultCallback, "onUpdateLicence", "failed");
long end = System.currentTimeMillis();
LOGGER.debug("[updateLicence:time]" + (end - start));
}
@ -535,14 +534,8 @@ public class NCManagerAction {
Map<String, String> result = new HashMap<>();
result.put("action", "onTransfer");
result.put(
"data",
"now start transfer contract "
+ contractID
+ " from node:"
+ node1ID
+ " to node:"
+ node2ID);
result.put("data", "now start transfer contract " + contractID + " from node:" + node1ID
+ " to node:" + node2ID);
resultCallback.onResult(result);
}
}

View File

@ -1,19 +1,7 @@
package org.bdware.server.nodecenter;
public enum NCTables {
NodeUser,
NodeTime,
ApplyRole,
ApplyTime,
NodeHttpLog,
ContractMeta,
ConfigDB,
NodeTcpLog,
NodesDB,
TrustUnitsDB,
OtherNCs,
NCFile,
ApplyNodeManager;
NodeUser, NodeTime, ApplyRole, ApplyTime, NodeHttpLog, ContractMeta, ConfigDB, NodeTcpLog, NodesDB, TrustUnitsDB, OtherNCs, NCFile, ApplyNodeManager;
public String toString() {
return "NC_" + super.toString();

View File

@ -82,11 +82,8 @@ public class NCUDPRunner extends Thread {
private void onUDPMessage(UDPMessage msg, DatagramPacket request) {
switch (msg.type) {
case shakeHand:
LOGGER.debug(
"[NCUDPRunner] shakeHand:"
+ request.getAddress().getHostAddress()
+ ":"
+ request.getPort());
LOGGER.debug("[NCUDPRunner] shakeHand:" + request.getAddress().getHostAddress()
+ ":" + request.getPort());
id2IP.put(msg.id, new UDPNode(request.getSocketAddress()));
default:
}

View File

@ -28,8 +28,8 @@ public class NodeCenterActions {
// static SyncResult syncResult = new SyncResult();
private static final Logger LOGGER = LogManager.getLogger(NodeCenterActions.class);
private static final String MISSING_ARGUMENT = "Missing arguments";
public static Map<String, CMNode> nodeInfos =
new ConcurrentHashMap<>(); // all online nodes,key是节点pubKey
public static Map<String, CMNode> nodeInfos = new ConcurrentHashMap<>(); // all online
// nodes,key是节点pubKey
public static SyncResult sync = new SyncResult();
@ -58,7 +58,8 @@ public class NodeCenterActions {
public static ContractDesp getContractByID(String key) {
for (CMNode node : nodeInfos.values()) {
for (ContractDesp desp : node.contracts) {
if (desp.contractID.equals(key)) return desp;
if (desp.contractID.equals(key))
return desp;
}
}
return null;
@ -67,7 +68,8 @@ public class NodeCenterActions {
public static ContractDesp getContractByName(String key) {
for (CMNode node : nodeInfos.values()) {
for (ContractDesp desp : node.contracts) {
if (desp.contractID.equals(key) || desp.contractName.equals(key)) return desp;
if (desp.contractID.equals(key) || desp.contractName.equals(key))
return desp;
}
}
return null;
@ -88,8 +90,7 @@ public class NodeCenterActions {
String pubKey = args.get("pubKey").getAsString();
String signature = args.get("sign").getAsString();
String nonce = args.get("nonce").getAsString();
if (StringUtil.isNullOrEmpty(pubKey)
|| StringUtil.isNullOrEmpty(signature)
if (StringUtil.isNullOrEmpty(pubKey) || StringUtil.isNullOrEmpty(signature)
|| StringUtil.isNullOrEmpty(nonce)) {
resultCallback.onResult("missing pubkey/sign/nonce");
}
@ -103,8 +104,8 @@ public class NodeCenterActions {
}
String uuid =
ByteUtil.encodeBASE64(HardwareInfo.getCPUID().getBytes()).replaceAll("\n", "");
((HttpResultCallback) resultCallback)
.addHeader("content-disposition", "attachment;filename=encodeduuid.key");
((HttpResultCallback) resultCallback).addHeader("content-disposition",
"attachment;filename=encodeduuid.key");
resultCallback.onResult(uuid);
} else {
resultCallback.onResult("permission denied, not ContractManager");
@ -161,10 +162,8 @@ public class NodeCenterActions {
rc.onResult("{\"action\":\"syncContractInfo\"}");
}
}
rc.onResult(
"{\"action\":\"syncPong\",\"requestID\":\""
+ json.get("requestID").getAsString()
+ "\"}");
rc.onResult("{\"action\":\"syncPong\",\"requestID\":\""
+ json.get("requestID").getAsString() + "\"}");
}
@Action(userPermission = 0)
@ -189,15 +188,10 @@ public class NodeCenterActions {
LOGGER.info("Role=" + ret + " --> pubkey=" + pubKey);
if (null == ret || ret.isEmpty()) {
/*
KeyValueDBUtil.instance.setValue(
NCTables.NodeUser.toString(),
pubKey,
Role.Node.toString());
KeyValueDBUtil.instance.setValue(
NCTables.NodeTime.toString(),
pubKey,
Long.toString(new Date().getTime()));
return Role.Node.getValue();
* KeyValueDBUtil.instance.setValue( NCTables.NodeUser.toString(), pubKey,
* Role.Node.toString()); KeyValueDBUtil.instance.setValue(
* NCTables.NodeTime.toString(), pubKey, Long.toString(new Date().getTime()));
* return Role.Node.getValue();
*/
return Role.Anonymous.getValue();
// TODO fix permission bugs.
@ -252,8 +246,8 @@ public class NodeCenterActions {
nodeInfos.put(pubkey, node);
if (json.has("nodeName")) {
node.nodeName = json.get("nodeName").getAsString();
KeyValueDBUtil.instance.setValue(
NCTables.NodesDB.toString(), pubkey, node.nodeName);
KeyValueDBUtil.instance.setValue(NCTables.NodesDB.toString(), pubkey,
node.nodeName);
LOGGER.info("set node name: " + nodeShortID + " -> " + node.nodeName);
}
if (json.has("masterAddress")) {
@ -274,7 +268,8 @@ public class NodeCenterActions {
// TODO 如果是NC挂了其他节点重连之后需要另外考虑
// 向该合约发送请求令其查看自己崩溃前的集群合约
// LOGGER.info("NodeCenter tells nodes new masters for contracts created before the crash!");
// LOGGER.info("NodeCenter tells nodes new masters for contracts created before the
// crash!");
// Map<String, String> request = new HashMap<>();
// request.put("action", "queryUnitContractsID");
// try {
@ -319,7 +314,8 @@ public class NodeCenterActions {
if (sessionID != null && SM2Util.plainStrVerify(pubkey, pubkey + sessionID, signature)) {
r.data = "success";
NCConFlag = true;
} else r.data = "failed";
} else
r.data = "failed";
rc.onResult(JsonUtil.toJson(r));
}
@ -361,11 +357,8 @@ public class NodeCenterActions {
// 如果有就更改json串并发消息给该节点将Contract的isMaster设置为false
Set<String> contractIDS = new HashSet<>(); // 旧master需要自己非master的合约
// 当前节点的合约
List<ContractDesp> contracts =
JsonUtil.fromJson(
json.get("contracts"),
new TypeToken<List<ContractDesp>>() {
}.getType());
List<ContractDesp> contracts = JsonUtil.fromJson(json.get("contracts"),
new TypeToken<List<ContractDesp>>() {}.getType());
// MetaIndexAction.updateContractsIndex(contracts, rc);
LOGGER.debug("update contracts: " + json.get("contracts"));
int version = -1;
@ -377,7 +370,8 @@ public class NodeCenterActions {
node.events = json.get("events").getAsInt();
}
// just ignore recover
if (nodeInfos != null) return;
if (nodeInfos != null)
return;
// 遍历所有节点
for (CMNode cmNode : nodeInfos.values()) {
if (cmNode.pubKey.equals(nodeID)) {
@ -406,14 +400,9 @@ public class NodeCenterActions {
}
if (desp.getIsMaster() && thisDesp.getIsMaster()) {
LOGGER.info(
"合约 "
+ thisDesp.contractID
+ " 的旧的master "
+ node.pubKey.substring(0, 5)
+ "重新上线后和现在master "
+ cmNode.pubKey.substring(0, 5)
+ "撞车!");
LOGGER.info("合约 " + thisDesp.contractID + " 的旧的master "
+ node.pubKey.substring(0, 5) + "重新上线后和现在master "
+ cmNode.pubKey.substring(0, 5) + "撞车!");
contractIDS.add(thisDesp.contractID);
}
}
@ -424,11 +413,7 @@ public class NodeCenterActions {
for (ContractDesp cd : contracts) {
if (contractIDS.contains(cd.contractID)) {
cd.setIsMaster(false);
LOGGER.info(
"设置合约 "
+ cd.contractID
+ " 的旧master "
+ node.pubKey.substring(0, 5)
LOGGER.info("设置合约 " + cd.contractID + " 的旧master " + node.pubKey.substring(0, 5)
+ " isMaster为false!");
con_ids.append(",").append(cd.contractID);
}
@ -458,10 +443,8 @@ public class NodeCenterActions {
return;
}
for (CMNode node : nodeInfos.values()) {
if (node.nodeName.equals(nodeName)
|| node.pubKey.equals(nodeName)
|| node.udpID.equals(nodeName)
|| node.ipPort.equals(nodeName)) {
if (node.nodeName.equals(nodeName) || node.pubKey.equals(nodeName)
|| node.udpID.equals(nodeName) || node.ipPort.equals(nodeName)) {
String requestID = System.currentTimeMillis() + "_" + (int) (Math.random() * 10000);
Map<String, String> req = new HashMap<>();
req.put("requestID", requestID);
@ -490,10 +473,11 @@ public class NodeCenterActions {
}
String requesterID = json.get("requesterNodeID").getAsString();
CMNode node = nodeInfos.get(requesterID);
if (node != null) node.connection.sendContractResult(json);
if (node != null)
node.connection.sendContractResult(json);
if (requesterID.equals("NodeCenter")) {
sync.wakeUp(
json.get("requestID").getAsString(), json.get("contractResult").getAsString());
sync.wakeUp(json.get("requestID").getAsString(),
json.get("contractResult").getAsString());
}
}
@ -506,10 +490,7 @@ public class NodeCenterActions {
rc.onResult(JsonUtil.toJson(r));
return;
}
LOGGER.debug(
"tid:"
+ Thread.currentThread().getId()
+ "executeContractOnOtherNodes\n"
LOGGER.debug("tid:" + Thread.currentThread().getId() + "executeContractOnOtherNodes\n"
+ JsonUtil.toJson(json));
String requestID = json.get("requestID").getAsString();
try {
@ -518,7 +499,8 @@ public class NodeCenterActions {
ContractRequest cr = JsonUtil.fromJson(crStr, ContractRequest.class);
String requesterNodeID = nodeID;
for (String nodeID : nodeInfos.keySet()) {
if (nodeID.equals(this.nodeID)) continue;
if (nodeID.equals(this.nodeID))
continue;
node = nodeInfos.get(nodeID);
if (node != null && node.contracts != null)
for (ContractDesp cd : node.contracts) {
@ -529,11 +511,8 @@ public class NodeCenterActions {
jo.addProperty("requesterNodeID", requesterNodeID);
jo.addProperty("action", "executeContractLocally");
jo.addProperty("contractRequest", crStr);
LOGGER.debug(
"tid:"
+ Thread.currentThread().getId()
+ "execute in node:"
+ node.nodeName);
LOGGER.debug("tid:" + Thread.currentThread().getId()
+ "execute in node:" + node.nodeName);
node.connection.controller.sendMsg(JsonUtil.toJson(jo));
return;
}
@ -543,9 +522,7 @@ public class NodeCenterActions {
e.printStackTrace();
}
LOGGER.debug("tid:" + Thread.currentThread().getId() + "can't locate in NodeCenter!");
ContractResult cResult =
new ContractResult(
Status.Error,
ContractResult cResult = new ContractResult(Status.Error,
new JsonPrimitive("can't locate the Contract in node center"));
JsonObject jo = new JsonObject();
jo.addProperty("action", "onReceiveContractExecution");
@ -591,21 +568,15 @@ public class NodeCenterActions {
response.put("data", "can not locate contractID: " + json.get("contractID"));
cb.onResult(JsonUtil.toJson(response));
}
sync.sleep(
requestID,
new ResultCallback() {
sync.sleep(requestID, new ResultCallback() {
@Override
public void onResult(String str) {
ContractResult ret = JsonUtil.fromJson(str, ContractResult.class);
Map<String, Object> converted = new HashMap<>();
converted.put("status", ret.status);
try {
List<Map<String, String>> je =
JsonUtil.fromJson(
ret.result,
new TypeToken<
List<Map<String, String>>>() {
}.getType());
List<Map<String, String>> je = JsonUtil.fromJson(ret.result,
new TypeToken<List<Map<String, String>>>() {}.getType());
converted.put("result", je);
} catch (Exception e) {
converted.put("result", ret.result);
@ -616,216 +587,131 @@ public class NodeCenterActions {
});
}
/* // judge the just online node whwther need to restart contracts,send the contracts'info
public static void restartContracts(String nodePubKey) {
if (!recoverMap.containsKey(nodePubKey)) {
return;
}
// 恢复该节点的每一个集群运行的合约
for (ContractRecord record : recoverMap.get(nodePubKey).values()) {
String contractID = record.contractID;
if (record.recoverFlag != RecoverFlag.ToRecover) continue;
// 先发消息让恢复节点的该合约收到消息后只加入队列不dealRequests
Map<String, String> request = new HashMap<>();
request.put("action", "setRecovering");
request.put("contractID", contractID);
CMNode node = nodeinfos.get(nodePubKey);
node.connection.controller.sendMsg(gson.toJson(request));
System.out.println(
"第一步 : [NodeCeterActions] restartContracts 开始处理合约 contractID=" + contractID);
if (contractID2Members.containsKey(contractID)) {
// 在nodeinfos中找节点该节点的pubKey在pubKeys中
MultiPointContractInfo info = contractID2Members.get(contractID);
CMNode cmNode = null;
for (int i = 0; i < info.members.size(); i++) {
int size = info.members.size();
String tempNodeID = info.members.get(record.order.incrementAndGet() % size);
if (nodeinfos.containsKey(tempNodeID))
cmNode = NodeCenterActions.nodeinfos.get(tempNodeID);
else continue;
System.out.println("查询节点 " + cmNode.nodeName);
if (cmNode != null && !cmNode.pubKey.equals(nodePubKey)) {
System.out.println("第二步 : [NodeCenterActions] 找到一个依赖恢复节点,其节点名为 " +
cmNode.nodeName);
Map<String, String> req = new HashMap<String, String>();
req.put("action", "dumpCurrentState");
req.put("contractID", contractID);
req.put("targetNodePubkey", nodePubKey);
// NC向该节点发送请求让其存储自身当前状态并发给NC
cmNode.connection.controller.sendMsg(gson.toJson(req));
return;
}
}
if (cmNode == null) {
logger.debug("[NodeCenterActions] Can't find a recover rely node!");
}
}
}
}*/
/*
* // judge the just online node whwther need to restart contracts,send the contracts'info
* public static void restartContracts(String nodePubKey) { if
* (!recoverMap.containsKey(nodePubKey)) { return; }
*
* // 恢复该节点的每一个集群运行的合约 for (ContractRecord record : recoverMap.get(nodePubKey).values()) {
* String contractID = record.contractID; if (record.recoverFlag != RecoverFlag.ToRecover)
* continue;
*
* // 先发消息让恢复节点的该合约收到消息后只加入队列不dealRequests Map<String, String> request = new HashMap<>();
* request.put("action", "setRecovering"); request.put("contractID", contractID); CMNode node =
* nodeinfos.get(nodePubKey); node.connection.controller.sendMsg(gson.toJson(request));
*
* System.out.println( "第一步 : [NodeCeterActions] restartContracts 开始处理合约 contractID=" +
* contractID);
*
* if (contractID2Members.containsKey(contractID)) {
*
* // 在nodeinfos中找节点该节点的pubKey在pubKeys中 MultiPointContractInfo info =
* contractID2Members.get(contractID); CMNode cmNode = null; for (int i = 0; i <
* info.members.size(); i++) { int size = info.members.size(); String tempNodeID =
* info.members.get(record.order.incrementAndGet() % size);
*
* if (nodeinfos.containsKey(tempNodeID)) cmNode = NodeCenterActions.nodeinfos.get(tempNodeID);
* else continue;
*
* System.out.println("查询节点 " + cmNode.nodeName);
*
* if (cmNode != null && !cmNode.pubKey.equals(nodePubKey)) {
* System.out.println("第二步 : [NodeCenterActions] 找到一个依赖恢复节点,其节点名为 " + cmNode.nodeName);
*
* Map<String, String> req = new HashMap<String, String>(); req.put("action",
* "dumpCurrentState"); req.put("contractID", contractID); req.put("targetNodePubkey",
* nodePubKey);
*
* // NC向该节点发送请求让其存储自身当前状态并发给NC cmNode.connection.controller.sendMsg(gson.toJson(req));
*
* return; } }
*
* if (cmNode == null) { logger.debug("[NodeCenterActions] Can't find a recover rely node!"); }
* } } }
*/
// zyx modified
/* @Action(async = true)
public void receiveState(Map<String, String> args, final ResultCallback rc) {
//System.out.println("第五步 : [NodeCenterActions] 开始从恢复依赖节点接收state");
String fileName = args.get("fileName") + "_NC";
boolean isAppend = Boolean.valueOf(args.get("isAppend"));
boolean isDone = Boolean.valueOf(args.get("isDone"));
//logger.debug("[NodeCenterActions] isAppend=" + isAppend + " isDone=" + isDone);
String path = "./temp/stateFiles/" + fileName;
File file = new File(path);
File dir = file.getParentFile();
if (!dir.exists()) {
dir.mkdirs();
}
FileOutputStream fout = null;
if (!isAppend) {
try {
fout = new FileOutputStream(file);
fileMap.put(fileName, fout);
} catch (FileNotFoundException e) {
e.printStackTrace();
}
} else {
fout = fileMap.get(fileName);
}
if (isDone) {
if (fout != null) {
try {
fout.close();
} catch (IOException e) {
e.printStackTrace();
}
}
// TODO 参数可以加入文件名
//System.out.println("第七步 : [NodeCenterActions] 从恢复依赖节点接收state完成");
//logger.debug("[NodeCenterActions] receive state finish.");
String receiveFileName = new File(dir, fileName).getAbsolutePath();
// 发送state给需要恢复的节点
sendState(fileName, args.get("targetNodePubkey"));
} else {
String data = args.get("data");
try {
String progress = args.get("progress");
// TODO pulish progress
fout.write(ByteUtil.decodeBASE64(data));
} catch (IOException e) {
e.printStackTrace();
}
}
}*/
/*
* @Action(async = true) public void receiveState(Map<String, String> args, final ResultCallback
* rc) { //System.out.println("第五步 : [NodeCenterActions] 开始从恢复依赖节点接收state");
*
* String fileName = args.get("fileName") + "_NC"; boolean isAppend =
* Boolean.valueOf(args.get("isAppend")); boolean isDone = Boolean.valueOf(args.get("isDone"));
*
* //logger.debug("[NodeCenterActions] isAppend=" + isAppend + " isDone=" + isDone);
*
* String path = "./temp/stateFiles/" + fileName; File file = new File(path); File dir =
* file.getParentFile(); if (!dir.exists()) { dir.mkdirs(); }
*
* FileOutputStream fout = null; if (!isAppend) { try { fout = new FileOutputStream(file);
* fileMap.put(fileName, fout); } catch (FileNotFoundException e) { e.printStackTrace(); } }
* else { fout = fileMap.get(fileName); }
*
* if (isDone) { if (fout != null) { try { fout.close(); } catch (IOException e) {
* e.printStackTrace(); } } // TODO 参数可以加入文件名
* //System.out.println("第七步 : [NodeCenterActions] 从恢复依赖节点接收state完成");
*
* //logger.debug("[NodeCenterActions] receive state finish."); String receiveFileName = new
* File(dir, fileName).getAbsolutePath();
*
* // 发送state给需要恢复的节点 sendState(fileName, args.get("targetNodePubkey")); } else { String data =
* args.get("data"); try { String progress = args.get("progress"); // TODO pulish progress
* fout.write(ByteUtil.decodeBASE64(data)); } catch (IOException e) { e.printStackTrace(); } } }
*/
// The resultCallback is an Counter, it will invoke
// in its
// onResult
// zyx modified
/* public void sendState(String path, String targetNodePubkey) {
File project = new File("./temp/stateFiles/" + path);
// 将合约中的memberd加入ContractRecord
ContractRecord record = null;
try {
FileInputStream fileout = new FileInputStream(project);
GZIPInputStream gzin = new GZIPInputStream(fileout);
ObjectInputStream reader = new ObjectInputStream(gzin);
record = (ContractRecord) reader.readObject();
reader.close();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
// record.members = contractID2MembersUDP.get(record.contractID);
//NC delete state file
if(project.isFile() && project.exists()){
project.delete();
}
record.members = new HashMap<>(recoverMap.get(nodeID).get(record.contractID).members);
File file = new File("./temp/stateFiles/" + path);
File parent = file.getParentFile();
if (!parent.exists()) parent.mkdirs();
ObjectOutputStream writer;
try {
FileOutputStream fileout = new FileOutputStream(file);
GZIPOutputStream out = new GZIPOutputStream(fileout);
writer = new ObjectOutputStream(out);
writer.writeObject(record);
writer.flush();
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
//System.out.println("第八步 : [NodeCenterActions] 开始向需要恢复的节点转发State");
//record.printContent();
CMNode targetNode = nodeinfos.get(targetNodePubkey);
Map<String, String> req = new HashMap<>();
req.put("action", "receiveState");
req.put("isAppend", "false");
req.put("fileName", path);
req.put("isDone", "false");
String requestID = System.currentTimeMillis() + "_" + (int) (Math.random() * 10000);
req.put("requestID", requestID);
if (project.isFile()) {
try {
FileInputStream fin = new FileInputStream(project);
byte[] buff = new byte[30 * 1024];
long count = 0;
long total = project.length();
for (int len = 0; (len = (fin.read(buff))) > 0; ) {
//logger.debug("len = " + len);
String data = ByteUtil.encodeBASE64(buff, len);
req.put("data", data);
count += len;
targetNode.connection.controller.sendMsg(gson.toJson(req));
req.put("isAppend", "true");
Thread.sleep(300);
}
fin.close();
req.put("isDone", "true");
req.remove("data");
targetNode.connection.controller.sendMsg(gson.toJson(req));
//NC delete state file
if(file.isFile() && file.exists()){
file.delete();
}
//logger.debug("[NodeCenterActions] send state project finish.");
//System.out.println("第十步 : [NodeCenterActions] 向需要恢复的节点转发State完成");
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}*/
/*
* public void sendState(String path, String targetNodePubkey) { File project = new
* File("./temp/stateFiles/" + path);
*
* // 将合约中的memberd加入ContractRecord ContractRecord record = null; try { FileInputStream fileout =
* new FileInputStream(project); GZIPInputStream gzin = new GZIPInputStream(fileout);
* ObjectInputStream reader = new ObjectInputStream(gzin); record = (ContractRecord)
* reader.readObject(); reader.close(); } catch (IOException | ClassNotFoundException e) {
* e.printStackTrace(); } // record.members = contractID2MembersUDP.get(record.contractID); //NC
* delete state file if(project.isFile() && project.exists()){ project.delete(); }
*
* record.members = new HashMap<>(recoverMap.get(nodeID).get(record.contractID).members);
*
* File file = new File("./temp/stateFiles/" + path); File parent = file.getParentFile(); if
* (!parent.exists()) parent.mkdirs(); ObjectOutputStream writer; try { FileOutputStream fileout
* = new FileOutputStream(file); GZIPOutputStream out = new GZIPOutputStream(fileout); writer =
* new ObjectOutputStream(out); writer.writeObject(record); writer.flush(); writer.close(); }
* catch (IOException e) { e.printStackTrace(); }
*
* //System.out.println("第八步 : [NodeCenterActions] 开始向需要恢复的节点转发State");
* //record.printContent();
*
* CMNode targetNode = nodeinfos.get(targetNodePubkey);
*
* Map<String, String> req = new HashMap<>(); req.put("action", "receiveState");
* req.put("isAppend", "false"); req.put("fileName", path); req.put("isDone", "false"); String
* requestID = System.currentTimeMillis() + "_" + (int) (Math.random() * 10000);
* req.put("requestID", requestID);
*
* if (project.isFile()) { try { FileInputStream fin = new FileInputStream(project); byte[] buff
* = new byte[30 * 1024]; long count = 0; long total = project.length();
*
* for (int len = 0; (len = (fin.read(buff))) > 0; ) { //logger.debug("len = " + len); String
* data = ByteUtil.encodeBASE64(buff, len); req.put("data", data); count += len;
* targetNode.connection.controller.sendMsg(gson.toJson(req)); req.put("isAppend", "true");
*
* Thread.sleep(300); } fin.close();
*
* req.put("isDone", "true"); req.remove("data");
*
* targetNode.connection.controller.sendMsg(gson.toJson(req)); //NC delete state file
* if(file.isFile() && file.exists()){ file.delete(); }
*
* //logger.debug("[NodeCenterActions] send state project finish.");
*
* //System.out.println("第十步 : [NodeCenterActions] 向需要恢复的节点转发State完成"); } catch (IOException e)
* { e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block
* e.printStackTrace(); } } }
*/
@Action
public void onReceiveSyncRequest(JsonObject jo, final ResultCallback cb) {
@ -886,7 +772,8 @@ public class NodeCenterActions {
boolean hasResult = false;
for (String nodeID : nodeInfos.keySet()) {
if (nodeID.equals(this.nodeID)) continue;
if (nodeID.equals(this.nodeID))
continue;
CMNode node = nodeInfos.get(nodeID);
if (node != null && node.contracts != null)
for (ContractDesp cd : node.contracts) {
@ -898,13 +785,10 @@ public class NodeCenterActions {
jo.addProperty("action", "executeContractLocally");
jo.addProperty("contractRequest", crStr);
node.connection.controller.sendMsg(JsonUtil.toJson(jo));
sync.sleep(
requestID,
new ResultCallback() {
sync.sleep(requestID, new ResultCallback() {
@Override
public void onResult(String str) {
ContractResult cr =
JsonUtil.fromJson(str, ContractResult.class);
ContractResult cr = JsonUtil.fromJson(str, ContractResult.class);
ret.add("result", cr.result);
ret.addProperty("status", cr.status + "");
long executeTime = (System.currentTimeMillis() - start);
@ -919,9 +803,8 @@ public class NodeCenterActions {
}
}
if (!hasResult) {
ContractResult result =
new ContractResult(
Status.Error, new JsonPrimitive("Can't locate contract At NodeCenter"));
ContractResult result = new ContractResult(Status.Error,
new JsonPrimitive("Can't locate contract At NodeCenter"));
ret.addProperty("result", JsonUtil.toJson(result));
long executeTime = (System.currentTimeMillis() - start);
ret.addProperty("executeTime", executeTime + "");
@ -975,8 +858,8 @@ public class NodeCenterActions {
}
// cb will invoke sendProject in the cluster in its onResult method.
public void requestProject(
String projectName, String pubkey, boolean isPrivate, ResultCallback cb) {
public void requestProject(String projectName, String pubkey, boolean isPrivate,
ResultCallback cb) {
LOGGER.debug("[NodeCenterActions] requestProject : -----------position2");
Map<String, Object> req = new HashMap<>();
@ -995,8 +878,8 @@ public class NodeCenterActions {
// The resultCallback is an Counter, it will invoke
// in its
// onResult
public void sendProject(
String filePath, String isPrivate, String pubKey, String nodeIP, ResultCallback result) {
public void sendProject(String filePath, String isPrivate, String pubKey, String nodeIP,
ResultCallback result) {
LOGGER.debug("sendProject : position----6" + filePath);
File project = new File(filePath);
@ -1074,38 +957,28 @@ public class NodeCenterActions {
}
/*
@Action(async = true)
public void startContractInEachNode(Map<String, String> args,final ResultCallback rc) {
logger.debug("[NodeCenterActions] startContractInEachNode : ");
Map<String, Object> req = new HashMap<>();
req.put("action", "startContractByYpk");
req.put("fileName", args.get("fileName"));
req.put("contractOwner",args.get("pubKey"));
req.put("contractSig",args.get("signature"));
String requestID = System.currentTimeMillis() + "_" + (int) (Math.random() * 10000);
req.put("requestID", requestID);
sync.sleep(requestID, rc);
controller.sendMsg(gson.toJson(req));
}
@Action(async = true)
public void onStartEachNode(Map<String, String> args, final ResultCallback rc) {
logger.debug("[NodeCenterActions] onStartEachNode : ");
logger.debug("---action" + args.get("action"));
logger.debug("---requestID" + args.get("requestID"));
logger.debug("---data" + args.get("data"));
logger.debug("---cid" + args.get("cid"));
logger.debug("---executeTime" + args.get("executeTime"));
String str = "onStartEachNode;" + args.get("data") + ";" + args.get("cid") + ";" +
args.get("executeTime");
sync.wakeUp(args.get("requestID"),str);
}
* @Action(async = true) public void startContractInEachNode(Map<String, String> args,final
* ResultCallback rc) { logger.debug("[NodeCenterActions] startContractInEachNode : ");
*
* Map<String, Object> req = new HashMap<>(); req.put("action", "startContractByYpk");
* req.put("fileName", args.get("fileName")); req.put("contractOwner",args.get("pubKey"));
* req.put("contractSig",args.get("signature"));
*
* String requestID = System.currentTimeMillis() + "_" + (int) (Math.random() * 10000);
* req.put("requestID", requestID); sync.sleep(requestID, rc);
* controller.sendMsg(gson.toJson(req)); }
*
*
* @Action(async = true) public void onStartEachNode(Map<String, String> args, final
* ResultCallback rc) { logger.debug("[NodeCenterActions] onStartEachNode : ");
*
* logger.debug("---action" + args.get("action")); logger.debug("---requestID" +
* args.get("requestID")); logger.debug("---data" + args.get("data")); logger.debug("---cid" +
* args.get("cid")); logger.debug("---executeTime" + args.get("executeTime"));
*
*
* String str = "onStartEachNode;" + args.get("data") + ";" + args.get("cid") + ";" +
* args.get("executeTime"); sync.wakeUp(args.get("requestID"),str); }
*/
@Action(async = true)
@ -1242,32 +1115,26 @@ public class NodeCenterActions {
// return null;
// }
/* @Action(async = true)
public void sendCachedRequests(Map<String, String> args, final ResultCallback rc) {
String contractID = args.get("contractID");
int start = Integer.parseInt(args.get("start"));
int end = Integer.parseInt(args.get("end"));
/*
* @Action(async = true) public void sendCachedRequests(Map<String, String> args, final
* ResultCallback rc) { String contractID = args.get("contractID"); int start =
* Integer.parseInt(args.get("start")); int end = Integer.parseInt(args.get("end"));
*
* RequestCache cache = getCache(contractID); for (int i = start + 1; i < end; i++) { if (cache
* == null || !cache.containsKey(i)) {
*
* // this node crash String nodeID = args.get("nodeID"); restartContracts(nodeID); } JsonObject
* jo = cache.get(i); controller.sendMsg(JsonUtil.toJson(jo));
*
* logger.debug("NC发送第 " + jo.get("seq").getAsString() + " 个请求给Node"); } }
*/
RequestCache cache = getCache(contractID);
for (int i = start + 1; i < end; i++) {
if (cache == null || !cache.containsKey(i)) {
// this node crash
String nodeID = args.get("nodeID");
restartContracts(nodeID);
}
JsonObject jo = cache.get(i);
controller.sendMsg(JsonUtil.toJson(jo));
logger.debug("NC发送第 " + jo.get("seq").getAsString() + " 个请求给Node");
}
}*/
/* @Action(async = true)
public void recoverFinish(Map<String, String> args, final ResultCallback rc) {
ContractRecord cr = recoverMap.get(args.get("nodeID")).get(args.get("contractID"));
if (cr.recoverFlag == RecoverFlag.ToRecover) cr.recoverFlag = RecoverFlag.Fine;
}*/
/*
* @Action(async = true) public void recoverFinish(Map<String, String> args, final
* ResultCallback rc) { ContractRecord cr =
* recoverMap.get(args.get("nodeID")).get(args.get("contractID")); if (cr.recoverFlag ==
* RecoverFlag.ToRecover) cr.recoverFlag = RecoverFlag.Fine; }
*/
@Action(async = true)
public void electMaster(JsonObject args, final ResultCallback rc) {
@ -1282,7 +1149,8 @@ public class NodeCenterActions {
String masterID = args.get("master").getAsString(); // 旧的master
String members = args.get("members").getAsString(); // 执行这个合约所有节点的pubkey
String uniNumber = null;
if (args.has("uniNumber")) uniNumber = args.get("uniNumber").getAsString();
if (args.has("uniNumber"))
uniNumber = args.get("uniNumber").getAsString();
// if (nodeinfos.containsKey(masterID)) {
// CMNode node = nodeinfos.get(masterID);
@ -1305,8 +1173,7 @@ public class NodeCenterActions {
synchronized (NCElectMasterUtil.electInfos) {
if (!NCElectMasterUtil.electInfos.containsKey(contractID)) {
NCElectMasterUtil.electInfos.put(
contractID,
NCElectMasterUtil.electInfos.put(contractID,
new NCElectMasterUtil.ElectInfo(masterID, contractID, members, uniNumber));
}
NCElectMasterUtil.ElectInfo eleInfo = NCElectMasterUtil.electInfos.get(contractID);
@ -1321,7 +1188,8 @@ public class NodeCenterActions {
// JsonObject
for (Map.Entry<String, CMNode> stringCMNodeEntry : nodeInfos.entrySet()) {
CMNode node = stringCMNodeEntry.getValue();
if (node.contracts == null) continue;
if (node.contracts == null)
continue;
for (ContractDesp desp : node.contracts) {
if (desp.type == ContractExecType.Sole || desp.getIsMaster()) {
array.add(extractContractRoute(node, desp));
@ -1368,19 +1236,12 @@ public class NodeCenterActions {
}
for (Map.Entry<String, CMNode> stringCMNodeEntry : nodeInfos.entrySet()) {
CMNode node = stringCMNodeEntry.getValue();
if (node.contracts == null) continue;
if (node.contracts == null)
continue;
for (ContractDesp desp : node.contracts) {
if (contractID.equals(desp.contractID) || contractID.equals(desp.contractName)) {
LOGGER.info(
"查看合约 "
+ desp.contractID
+ " "
+ desp.contractName
+ " 节点 "
+ node.pubKey.substring(0, 5)
+ " "
+ desp.type
+ " "
LOGGER.info("查看合约 " + desp.contractID + " " + desp.contractName + " 节点 "
+ node.pubKey.substring(0, 5) + " " + desp.type + " "
+ desp.getIsMaster());
if (desp.type == ContractExecType.Sole) {
args.add("result", extractContractRoute(node, desp));
@ -1393,31 +1254,25 @@ public class NodeCenterActions {
}
}
/* logger.debug("查看合约 " + contractID + " 的master为 " + args.get("result"));
if(!args.containsKey("result") || args.get("result") == null || args.get("result").equals("null")){
//NC发起各个节点重选
logger.debug("NC发现合约 " + contractID + " 的master为nullNC向该合约的各个节点发起重选请求!");
Map<String,String> req = new HashMap<>();
req.put("action","NCStartElect");
req.put("contractID",contractID);
for(CMNode node : nodeinfos.values()){
node.connection.controller.sendMsg(JsonUtil.toJson(req));
break;
}
}*/
/*
* logger.debug("查看合约 " + contractID + " 的master为 " + args.get("result"));
* if(!args.containsKey("result") || args.get("result") == null ||
* args.get("result").equals("null")){ //NC发起各个节点重选 logger.debug("NC发现合约 " + contractID +
* " 的master为nullNC向该合约的各个节点发起重选请求!"); Map<String,String> req = new HashMap<>();
* req.put("action","NCStartElect"); req.put("contractID",contractID); for(CMNode node :
* nodeinfos.values()){ node.connection.controller.sendMsg(JsonUtil.toJson(req)); break; } }
*/
if (args.get("result").equals("null")) {
LOGGER.info("[NodeCenterActions] query route by info from other NC");
// 查询其他节点的信息
String s = OtherNCProxy.instance.search(contractID);
LOGGER.info(
"[NodeCenterActions] query route by info from other NC "
LOGGER.info("[NodeCenterActions] query route by info from other NC "
+ (s == null ? "null" : s));
if (s != null) {
String[] ss = s.split(";");
args.addProperty(
"result",
args.addProperty("result",
"{\"pubKey\":\"" + ss[0] + "\",\"masterAddress\":\"" + ss[1] + "\"}");
}
}
@ -1430,8 +1285,7 @@ public class NodeCenterActions {
String contractID = args.get("contractID").getAsString();
LOGGER.debug("查看合约 " + contractID + " 的master为 " + args.get("result"));
if (!args.has("result")
|| args.get("result") == null
if (!args.has("result") || args.get("result") == null
|| args.get("result").getAsString().equals("null")) {
// NC发起各个节点重选
LOGGER.debug("NC发现合约 " + contractID + " 的master为nullNC向该合约的各个节点发起重选请求!");
@ -1441,12 +1295,8 @@ public class NodeCenterActions {
req.put("uniNumber", System.currentTimeMillis() + "_" + new Random().nextLong());
for (CMNode node : nodeInfos.values()) {
node.connection.controller.sendMsg(JsonUtil.toJson(req));
LOGGER.info(
"[ADSP] [NodeCenterActions] 发现合约 "
+ contractID
+ " 的master为null 向节点 "
+ node.nodeName
+ "发送选举请求");
LOGGER.info("[ADSP] [NodeCenterActions] 发现合约 " + contractID + " 的master为null 向节点 "
+ node.nodeName + "发送选举请求");
}
}
}
@ -1486,15 +1336,8 @@ public class NodeCenterActions {
for (ContractDesp cd : node.contracts) {
if (cd.contractID.equals(contractID) || cd.contractName.equals(contractID)) {
cd.setIsMaster(true);
LOGGER.debug(
"设置节点 "
+ node.pubKey.substring(0, 5)
+ " 的合约 "
+ cd.contractID
+ " "
+ cd.contractName
+ " isMaster="
+ cd.getIsMaster());
LOGGER.debug("设置节点 " + node.pubKey.substring(0, 5) + " 的合约 " + cd.contractID
+ " " + cd.contractName + " isMaster=" + cd.getIsMaster());
}
}
}
@ -1514,11 +1357,12 @@ public class NodeCenterActions {
// String sponsorPubKey = args.get("sponsorPubKey").getAsString();
String sponsorPubkey = args.get("sponsorPubkey").getAsString();
LOGGER.debug("sponsorPubkey=" + sponsorPubkey);
String[] strs =
args.get("nodeIDs").getAsString().split(","); // all nodes' pubKey in the unit
String[] strs = args.get("nodeIDs").getAsString().split(","); // all nodes' pubKey in the
// unit
Set<String> nodePubKeys = new HashSet<>(); // nodes' pubkey
for (String str : strs) {
if (str != null && str.length() > 0) nodePubKeys.add(str);
if (str != null && str.length() > 0)
nodePubKeys.add(str);
}
Map<String, String> nodes = new HashMap<>(); // node's pubKey-node'a name
for (CMNode node : NodeCenterActions.nodeInfos.values()) {
@ -1538,9 +1382,8 @@ public class NodeCenterActions {
// 身份验证
boolean result =
SM2Util.plainStrVerify(
pubKey, "DistributeContract|" + projectName + "|" + pubKey, signature);
boolean result = SM2Util.plainStrVerify(pubKey,
"DistributeContract|" + projectName + "|" + pubKey, signature);
LOGGER.debug("[UnitAcitons] 验证:" + result + " -> projectName:" + projectName);
@ -1631,12 +1474,8 @@ public class NodeCenterActions {
JsonObject ret = new JsonObject();
ret.addProperty("action", "onQueryAEState");
if (json.has("caller")) {
ret.add(
"result",
JsonParser.parseString(
JsonUtil.toJson(
ActionExecutor.getStatistic(
json.get("caller").getAsString()))));
ret.add("result", JsonParser.parseString(JsonUtil
.toJson(ActionExecutor.getStatistic(json.get("caller").getAsString()))));
} else {
ret.add("result", JsonParser.parseString(JsonUtil.toJson(ActionExecutor.getAllData())));
}
@ -1655,7 +1494,8 @@ public class NodeCenterActions {
public void publishEvent(JsonObject args, final ResultCallback rcb) {
try {
args.addProperty("action", "publishEventFromCenter");
nodeInfos.values().iterator().next().connection.controller.sendMsg(JsonUtil.toJson(args));
nodeInfos.values().iterator().next().connection.controller
.sendMsg(JsonUtil.toJson(args));
} catch (Exception e) {
LOGGER.error("publishEvent error! " + e.getMessage());
}

View File

@ -34,12 +34,10 @@ public class NodeCenterFrameHandler extends SimpleChannelInboundHandler<Object>
this.actions = new NodeCenterActions(this);
MetaIndexAction.controller = this;
// TODO 添加那个UnitAction.
ae =
new ActionExecutor<ResultCallback, JsonObject>(
executorService, actions, new MetaIndexAction()) {
ae = new ActionExecutor<ResultCallback, JsonObject>(executorService, actions,
new MetaIndexAction()) {
@Override
public boolean checkPermission(
Action a, final JsonObject args, long permission) {
public boolean checkPermission(Action a, final JsonObject args, long permission) {
long val = a.userPermission();
boolean flag;
String status = "refuse";
@ -54,17 +52,10 @@ public class NodeCenterFrameHandler extends SimpleChannelInboundHandler<Object>
flag = false;
}
if (!action.contains("checkAlive"))
NodeCenterServer.nodeTcpLogDB.put(
action,
"{\"action\":\""
+ action
+ "\",\"pubKey\":\""
+ pubKey
+ "\",\"status\":\""
+ status
+ "\",\"date\":"
+ System.currentTimeMillis()
+ "}");
NodeCenterServer.nodeTcpLogDB.put(action,
"{\"action\":\"" + action + "\",\"pubKey\":\"" + pubKey
+ "\",\"status\":\"" + status + "\",\"date\":"
+ System.currentTimeMillis() + "}");
return flag;
}
};
@ -80,9 +71,8 @@ public class NodeCenterFrameHandler extends SimpleChannelInboundHandler<Object>
this.ctx = ctx;
JsonObject map;
try {
map =
JsonParser.parseReader(
new InputStreamReader(new ByteBufInputStream((ByteBuf) frame)))
map = JsonParser
.parseReader(new InputStreamReader(new ByteBufInputStream((ByteBuf) frame)))
.getAsJsonObject();
} catch (Exception e) {
@ -93,10 +83,7 @@ public class NodeCenterFrameHandler extends SimpleChannelInboundHandler<Object>
try {
final String action = map.get("action").getAsString();
// System.out.println("[NodeCenterFramHandler] handle:" + action);
ae.handle(
action,
map,
new ResultCallback() {
ae.handle(action, map, new ResultCallback() {
@Override
public void onResult(String ret) {
sendMsg(ret);
@ -117,12 +104,9 @@ public class NodeCenterFrameHandler extends SimpleChannelInboundHandler<Object>
StringBuilder ret = new StringBuilder();
int count = 0;
for (String s : strs) {
if (s.contains("sun.reflect")
|| s.contains("java.lang.reflect")
|| s.contains("org.apache")
|| s.contains("java.util")
|| s.contains("java.lang")
|| s.contains("io.netty")) {
if (s.contains("sun.reflect") || s.contains("java.lang.reflect")
|| s.contains("org.apache") || s.contains("java.util")
|| s.contains("java.lang") || s.contains("io.netty")) {
continue;
}
ret.append(s);

View File

@ -39,9 +39,8 @@ public class NodeCenterWSFrameHandler extends SimpleChannelInboundHandler<WebSoc
logActions = new LogActions(managerAction);
unitActions = new UnitActions(managerAction);
fileActions = new FileActions(this);
ae =
new ActionExecutor<ResultCallback, JsonObject>(
executorService, managerAction, logActions, unitActions, fileActions, new MetaIndexAction(), new TracingAction()) {
ae = new ActionExecutor<ResultCallback, JsonObject>(executorService, managerAction,
logActions, unitActions, fileActions, new MetaIndexAction(), new TracingAction()) {
@Override
public boolean checkPermission(Action a, JsonObject arg, long permission) {
// Permission userPermission = a.userPermission();
@ -51,21 +50,16 @@ public class NodeCenterWSFrameHandler extends SimpleChannelInboundHandler<WebSoc
String action = arg.get("action").getAsString();
LOGGER.debug("permission" + permission);
LOGGER.debug("userPermission" + val);
NodeCenterServer.nodeHttpLogDB.put(
action,
"{\"action\":\""
+ action
+ "\",\"pubKey\":\""
+ pubKey
+ "\",\"date\":"
+ System.currentTimeMillis()
+ "}");
NodeCenterServer.nodeHttpLogDB.put(action,
"{\"action\":\"" + action + "\",\"pubKey\":\"" + pubKey + "\",\"date\":"
+ System.currentTimeMillis() + "}");
// if (val == 0) return true;
// return true;
if ((permission & val) == val) {
return true;
} else return false;
} else
return false;
}
};
for (String str : wsPluginActions) {
@ -114,13 +108,13 @@ public class NodeCenterWSFrameHandler extends SimpleChannelInboundHandler<WebSoc
response.action = "sendNextSegment";
try {
boolean isSegment = false;
if (map.has("isSegment")) isSegment = map.get("isSegment").getAsBoolean();
if (map.has("isSegment"))
isSegment = map.get("isSegment").getAsBoolean();
if (isSegment) {
dataCache.append(map.get("data").getAsString());
response = new Response();
response.action = "sendNextSegment";
ctx.channel()
.writeAndFlush(new TextWebSocketFrame(JsonUtil.toJson(response)));
ctx.channel().writeAndFlush(new TextWebSocketFrame(JsonUtil.toJson(response)));
return;
} else {
if (dataCache.length() > 0) {
@ -133,10 +127,7 @@ public class NodeCenterWSFrameHandler extends SimpleChannelInboundHandler<WebSoc
}
String action = map.get("action").getAsString();
final JsonObject jmap = map;
ae.handle(
action,
map,
new ResultCallback() {
ae.handle(action, map, new ResultCallback() {
@Override
public void onResult(Map jo) {
if (jmap.has("requestID")) {
@ -175,15 +166,22 @@ public class NodeCenterWSFrameHandler extends SimpleChannelInboundHandler<WebSoc
String ret = "";
int count = 0;
for (String s : strs) {
if (s.contains("sun.reflect")) continue;
if (s.contains("java.lang.reflect")) continue;
if (s.contains("org.apache")) continue;
if (s.contains("java.util")) continue;
if (s.contains("java.lang")) continue;
if (s.contains("io.netty")) continue;
if (s.contains("sun.reflect"))
continue;
if (s.contains("java.lang.reflect"))
continue;
if (s.contains("org.apache"))
continue;
if (s.contains("java.util"))
continue;
if (s.contains("java.lang"))
continue;
if (s.contains("io.netty"))
continue;
ret += s;
ret += "\n";
if (count++ > 5) break;
if (count++ > 5)
break;
}
response.data = ret;
ctx.channel().writeAndFlush(new TextWebSocketFrame(JsonUtil.toJson(response)));

View File

@ -36,8 +36,7 @@ public class OtherNCProxy {
public Set<String> ncAdds = ConcurrentHashMap.newKeySet(); // 其他NC的ip:port
public SM2KeyPair sm2; // NC证书
private OtherNCProxy() {
}
private OtherNCProxy() {}
// 从DB中加载OtherNC和NCFile的信息
public void init() {
@ -56,12 +55,8 @@ public class OtherNCProxy {
}
public void periodUpdate() {
NodeCenterServer.scheduledThreadPool.scheduleWithFixedDelay(
() -> {
},
0,
30,
TimeUnit.SECONDS);
NodeCenterServer.scheduledThreadPool.scheduleWithFixedDelay(() -> {
}, 0, 30, TimeUnit.SECONDS);
}
public void loadOtherNCs() {
@ -158,7 +153,8 @@ public class OtherNCProxy {
synchronized (otherNodeCenterInfos) {
otherNodeCenterInfos.put(nodePubkey, list);
LOGGER.debug("update route nodePubkey=" + nodePubkey + " list=" + JsonUtil.toJson(list));
LOGGER.debug(
"update route nodePubkey=" + nodePubkey + " list=" + JsonUtil.toJson(list));
}
}
@ -170,7 +166,8 @@ public class OtherNCProxy {
List<OtherNCInfo> list = otherNodeCenterInfos.get(pubKey);
for (OtherNCInfo info : list) {
if (info.contractID.equals(id) || info.contractName.equals(id)) {
LOGGER.debug("[OtherNCProxy] find contract " + id + "pubKey is " + pubKey + " master is " + info.master + " from other NC route info");
LOGGER.debug("[OtherNCProxy] find contract " + id + "pubKey is " + pubKey
+ " master is " + info.master + " from other NC route info");
return pubKey + ";" + info.master;
}
}
@ -207,7 +204,8 @@ public class OtherNCProxy {
continue;
for (ContractDesp desp : node.contracts) {
if (desp.type == ContractExecType.Sole || desp.getIsMaster()) {
list.add(new OtherNCInfo(desp.contractID, desp.contractName, node.masterAddress));
list.add(new OtherNCInfo(desp.contractID, desp.contractName,
node.masterAddress));
}
}
}
@ -227,9 +225,7 @@ public class OtherNCProxy {
b.group(group);
NCConnector conn = new NCConnector(target, b, new NCClientHandler());
connList.put(target, conn);
b.channel(NioSocketChannel.class)
.handler(
new ChannelInitializer<SocketChannel>() {
b.channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
@ -240,9 +236,7 @@ public class OtherNCProxy {
if (!conn.handler.isOpen()) {
String[] ipAndPort = target.split(":");
b.connect(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
.sync()
.channel();
b.connect(ipAndPort[0], Integer.parseInt(ipAndPort[1])).sync().channel();
}
reconnect(target);
@ -257,10 +251,7 @@ public class OtherNCProxy {
NCConnector conn = connList.get(target);
if (!conn.handler.isOpen()) {
String[] ipAndPort = conn.ipAndPort.split(":");
conn.bootstrap
.connect(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
.sync()
.channel();
conn.bootstrap.connect(ipAndPort[0], Integer.parseInt(ipAndPort[1])).sync().channel();
}
}

View File

@ -29,7 +29,8 @@ public class TracingAction {
dependencies.add(val);
}
} else {
if (desp.dependentContracts != null && desp.dependentContracts.contains(contractName)) {
if (desp.dependentContracts != null
&& desp.dependentContracts.contains(contractName)) {
beDependent.add(desp.contractName);
}
}

View File

@ -25,7 +25,8 @@ public class UnitActions {
private static String convertContractName(String contractID) {
ContractDesp desp = NodeCenterActions.getContractByName(contractID);
if (desp != null) return desp.contractID;
if (desp != null)
return desp.contractID;
return contractID;
}
@ -47,10 +48,8 @@ public class UnitActions {
long start = System.currentTimeMillis();
final String pubKey = managerAction.getPubKey(json);
List<KV> allunits = KeyValueDBUtil.instance.getKeyValues(NCTables.TrustUnitsDB.toString());
if (pubKey != null
&& KeyValueDBUtil.instance
.getValue(NCTables.ConfigDB.toString(), "__CenterManager__")
.contains(pubKey)) {
if (pubKey != null && KeyValueDBUtil.instance
.getValue(NCTables.ConfigDB.toString(), "__CenterManager__").contains(pubKey)) {
simpleReply(resultCallback, "onListTrustUnits", allunits);
LOGGER.debug("is center manager");
long end = System.currentTimeMillis();
@ -75,10 +74,8 @@ public class UnitActions {
public void createTrustUnit(JsonObject json, ResultCallback resultCallback) {
final String pubKey = managerAction.getPubKey(json);
LOGGER.debug("[createTrustUnit] " + json.get("data").toString());
KeyValueDBUtil.instance.setValue(
NCTables.TrustUnitsDB.toString(),
pubKey + "_" + json.get("msg").getAsString(),
json.get("data").toString());
KeyValueDBUtil.instance.setValue(NCTables.TrustUnitsDB.toString(),
pubKey + "_" + json.get("msg").getAsString(), json.get("data").toString());
Map<String, Object> ret = new HashMap<>();
ret.put("action", "onCreateTrustUnit");
ret.put("status", "Success");
@ -88,16 +85,12 @@ public class UnitActions {
@Action(userPermission = 1 << 6, async = true)
public void deleteTrustUnit(JsonObject json, ResultCallback resultCallback) {
// final String pubKey = managerAction.pubKey;
LOGGER.debug(
"[deleteTrustUnit] before"
+ KeyValueDBUtil.instance.getValue(
NCTables.TrustUnitsDB.toString(), json.get("data").getAsString()));
KeyValueDBUtil.instance.delete(
NCTables.TrustUnitsDB.toString(), json.get("data").getAsString());
LOGGER.debug(
"[deleteTrustUnit] after"
+ KeyValueDBUtil.instance.getValue(
NCTables.TrustUnitsDB.toString(), json.get("data").getAsString()));
LOGGER.debug("[deleteTrustUnit] before" + KeyValueDBUtil.instance
.getValue(NCTables.TrustUnitsDB.toString(), json.get("data").getAsString()));
KeyValueDBUtil.instance.delete(NCTables.TrustUnitsDB.toString(),
json.get("data").getAsString());
LOGGER.debug("[deleteTrustUnit] after" + KeyValueDBUtil.instance
.getValue(NCTables.TrustUnitsDB.toString(), json.get("data").getAsString()));
Map<String, Object> ret = new HashMap<>();
ret.put("action", "onDeleteTrustUnit");
ret.put("status", "Success");
@ -146,7 +139,8 @@ public class UnitActions {
// logger.debug("tid:" + Thread.currentThread().getId() + " contractID:" + contractID);
// if (nodes == null) {
// rc.onResult(
// "{\"status\":\"Error\",\"result\":\"can't locate contract\",\"action\":\"onExecuteContractTrustfully\"}");
// "{\"status\":\"Error\",\"result\":\"can't locate
// contract\",\"action\":\"onExecuteContractTrustfully\"}");
// } else {
// mInfo.rcf.execute(requestID, rc, JsonUtil.toJson(args));
// }
@ -183,7 +177,8 @@ public class UnitActions {
// logger.debug("tid:" + Thread.currentThread().getId() + " contractID:" + contractID);
// if (nodes == null) {
// rc.onResult(
// "{\"status\":\"Error\",\"result\":\"can't locate contract\",\"action\":\"onExecuteContractTrustfully\"}");
// "{\"status\":\"Error\",\"result\":\"can't locate
// contract\",\"action\":\"onExecuteContractTrustfully\"}");
// } else {
// mInfo.rcf.execute(requestID, rc, JsonUtil.toJson(args));
// }
@ -198,15 +193,12 @@ public class UnitActions {
@Action(async = true, userPermission = 1L << 6)
public void listContractProcess(JsonObject args, final ResultCallback rc) {
List<ContractDesp> info = new ArrayList<>();
LOGGER.debug(
"[contracts] "
+ JsonUtil.toPrettyJson(NodeCenterActions.nodeInfos));
LOGGER.debug(
"[cid2Nodes]"
+ JsonUtil.toPrettyJson(NodeCenterActions.contractID2Members));
LOGGER.debug("[contracts] " + JsonUtil.toPrettyJson(NodeCenterActions.nodeInfos));
LOGGER.debug("[cid2Nodes]" + JsonUtil.toPrettyJson(NodeCenterActions.contractID2Members));
for (String key : NodeCenterActions.contractID2Members.keySet()) {
ContractDesp desp = NodeCenterActions.getContractByID(key);
if (desp != null) info.add(desp);
if (desp != null)
info.add(desp);
}
simpleReply(rc, "onListContractProcess", JsonUtil.toJson(info));
}
@ -214,15 +206,12 @@ public class UnitActions {
@Action(userPermission = 1 << 6, async = true)
public void listMultiPointContractProcess(JsonObject json, ResultCallback resultCallback) {
List<ContractDesp> info = new ArrayList<>();
LOGGER.debug(
"[contracts] "
+ JsonUtil.toPrettyJson(NodeCenterActions.nodeInfos));
LOGGER.debug(
"[cid2Nodes]"
+ JsonUtil.toPrettyJson(NodeCenterActions.contractID2Members));
LOGGER.debug("[contracts] " + JsonUtil.toPrettyJson(NodeCenterActions.nodeInfos));
LOGGER.debug("[cid2Nodes]" + JsonUtil.toPrettyJson(NodeCenterActions.contractID2Members));
for (String key : NodeCenterActions.contractID2Members.keySet()) {
ContractDesp desp = NodeCenterActions.getContractByID(key);
if (desp != null) info.add(desp);
if (desp != null)
info.add(desp);
}
simpleReply(resultCallback, "onListMultiPointContractProcess", JsonUtil.toJson(info));
}
@ -251,7 +240,8 @@ public class UnitActions {
String[] strs = args.get("nodeIDs").getAsString().split(",");
Set<String> nodePubKeys = new HashSet<>(); // nodes' pubkey
for (String str : strs) {
if (str != null && str.length() > 0) nodePubKeys.add(str);
if (str != null && str.length() > 0)
nodePubKeys.add(str);
}
Map<String, String> nodes = new HashMap<>(); // node's pubKey-node'a name
for (CMNode node : NodeCenterActions.nodeInfos.values()) {
@ -264,9 +254,8 @@ public class UnitActions {
simpleReply(rc, "onDistributeYPK", "empty nodes");
}
boolean result =
SM2Util.plainStrVerify(
pubKey, "DistributeYPK|" + projectName + "|" + pubKey, signature);
boolean result = SM2Util.plainStrVerify(pubKey,
"DistributeYPK|" + projectName + "|" + pubKey, signature);
LOGGER.info("[UnitAcitons] 验签:" + result + " -> projectName:" + projectName);
String ypkType = projectName.split("_")[0];

View File

@ -44,8 +44,8 @@ public class PermissionHelper {
printActions(handlers, set);
}
private void printActions(
Map<String, ActionExecutor.Pair<Method, Object>> handlers, EnumSet<Role> set) {
private void printActions(Map<String, ActionExecutor.Pair<Method, Object>> handlers,
EnumSet<Role> set) {
List<Line> lines = new ArrayList<>();
for (String str : handlers.keySet()) {
Method m = handlers.get(str).first();
@ -56,18 +56,18 @@ public class PermissionHelper {
l.permission = a.userPermission();
l.roles = "";
for (Role r : set)
if ((r.getValue() & l.permission) == l.permission) l.roles += r.name() + ";";
if (l.roles.equals("CenterManager;NodeManager;Node;Anonymous;")) l.roles = "任意角色";
if ((r.getValue() & l.permission) == l.permission)
l.roles += r.name() + ";";
if (l.roles.equals("CenterManager;NodeManager;Node;Anonymous;"))
l.roles = "任意角色";
}
lines.sort(
new Comparator<Line>() {
lines.sort(new Comparator<Line>() {
@Override
public int compare(Line o1, Line o2) {
return o1.action.compareTo(o2.action);
}
});
lines.sort(
new Comparator<Line>() {
lines.sort(new Comparator<Line>() {
@Override
public int compare(Line o1, Line o2) {
return o1.roles.compareTo(o2.roles);

View File

@ -18,8 +18,7 @@ public class TestServer {
public void channelActive(ChannelHandlerContext ctx) {
LOGGER.info("active");
System.out.println("======Active=======");
ctx.channel()
.writeAndFlush("ACTIVE from Server")
ctx.channel().writeAndFlush("ACTIVE from Server")
.addListener(future -> LOGGER.info("Active Send Done!!"));
}

View File

@ -10,13 +10,16 @@ public class TestADSP {
// public static String resultPath = "./testADSP/" + "result.txt";
// public static String contractID = "counter";
// public static String action = "executeContract";
// public static String pubkey = "0480204f4ef341359a5f64fcb11baf9ca2e6706ac20cba36ca83066870cf2c1d5de6df67e24e68dde7934af9b31d94a6084281db3d32d5ce42ab8f75bf799aca05";
// public static String pubkey =
// "0480204f4ef341359a5f64fcb11baf9ca2e6706ac20cba36ca83066870cf2c1d5de6df67e24e68dde7934af9b31d94a6084281db3d32d5ce42ab8f75bf799aca05";
//
// public static String operation = "count";
// public static String arg = "1";
//
// public static String priKey = "63464fa9587bbf2a022b3d655f4ed49c7d9a4249de1079d52bd5a1fec3308719";
// public static String signature = "dfd07c00c5c89fb0f901bed132db4342f49f4560f34c15878636623d2a8716a2c93ad32eeb9ee9c4f1d905df292cb4a1a27aa0171f2856848ce07cfc8022e809";
// public static String priKey =
// "63464fa9587bbf2a022b3d655f4ed49c7d9a4249de1079d52bd5a1fec3308719";
// public static String signature =
// "dfd07c00c5c89fb0f901bed132db4342f49f4560f34c15878636623d2a8716a2c93ad32eeb9ee9c4f1d905df292cb4a1a27aa0171f2856848ce07cfc8022e809";
//
// public static List<String> crashSeries = new ArrayList<String>();
// public static AtomicInteger crashIndex = new AtomicInteger();
@ -30,7 +33,8 @@ public class TestADSP {
// static{
// getSig();
//
// nodes.put("zyx's book","0440023d13facddcefbd87f2bb622b1b2b6ca43eb84c0579b82450814233ce557bd7ad7852cd47f1d6df867c5413ecf3dd1954cdbf5a6da683e3a89095f091d83d");
// nodes.put("zyx's
// book","0440023d13facddcefbd87f2bb622b1b2b6ca43eb84c0579b82450814233ce557bd7ad7852cd47f1d6df867c5413ecf3dd1954cdbf5a6da683e3a89095f091d83d");
// nodes.put("localNode1","0480f783cf63294224afbd19e175e5c7ea0c5c60ee115ed9e114fe2691eb28cc1680e5fec532d64c80d2f6b737e8b43b94d5a9c73206ac6235c50ff992133e4d38");
// nodes.put("localNode2","044e0c127e24407ee8a8abd4fe507b32b340ce9775317b8d7b5bb8e182745d6a45c57aaf867d80a5f816a7561564f9294c6aee5916f95e93c0011e16c28b9e849a");
// nodes.put("localNode3","0485040cfd94bec672bb8ba184856188963ee4ad339d247a76e2d9819f20e61bfad24ec763b1371998583f9dc0cf55c4d53cb1a2ec84c67aed1aa5203cc84fc78f");
@ -68,7 +72,8 @@ public class TestADSP {
// case "crash":
// if(pubKey != null){
// System.out.println("[TestADSP] 令节点 " + crash[1] + "崩溃");
// System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(System.currentTimeMillis())));
// System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new
// Date(System.currentTimeMillis())));
// crashSet.add(pubKey);
// }
// break;
@ -79,7 +84,8 @@ public class TestADSP {
// }
// recover.add(pubKey);
// System.out.println("[TestADSP] 令节点 " + crash[1] + "上线");
// System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(System.currentTimeMillis())));
// System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new
// Date(System.currentTimeMillis())));
// }
// break;
// default:
@ -95,7 +101,8 @@ public class TestADSP {
//
//
//
// System.out.println("[TestADSP]发起请求 " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(System.currentTimeMillis())));
// System.out.println("[TestADSP]发起请求 " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new
// Date(System.currentTimeMillis())));
// JsonObject jo = new JsonObject();
// jo.addProperty("action", TestADSP.action);
// jo.addProperty("contractID", TestADSP.contractID);
@ -148,7 +155,8 @@ public class TestADSP {
// System.out.println(file.getAbsolutePath());
// StringBuilder result = new StringBuilder();
// try{
// BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));//构造一个BufferedReader类来读取文件
// BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file),
// "UTF-8"));//构造一个BufferedReader类来读取文件
//
// String s = null;
// while((s = br.readLine())!=null){//使用readLine方法一次读一行
@ -170,7 +178,8 @@ public class TestADSP {
//
// unavailable = totalRequest * 2 - fileTotal;
//
// System.out.println("共有" + totalRequest * 2 + "次请求,其中" + correct + "正确," + incorrect + "不正确," + unavailable + "不可用.");
// System.out.println("共有" + totalRequest * 2 + "次请求,其中" + correct + "正确," + incorrect + "不正确,"
// + unavailable + "不可用.");
// }
//
// public static String convertNameToPubkey(String nodeName){