2021-09-26 13:01:04 +08:00

211 lines
8.4 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package org.bdware.server.nodecenter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.bean.ContractDesp;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.NodeCenterServer;

import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Node-center bookkeeping for re-electing the master of a contract.
 *
 * <p>Each running election is represented by an {@link ElectInfo} stored in
 * {@link #electInfos} under the contract ID. Nodes report their last executed
 * sequence number through {@link ElectInfo#put}; once every online node has
 * reported, the node with the highest sequence number is chosen as the new master.
 */
public class NCElectMasterUtil {
    /** Elections currently in progress; the key is the contract ID. */
    public static final Map<String, ElectInfo> electInfos = new ConcurrentHashMap<>();
    private static final Logger LOGGER = LogManager.getLogger(NCElectMasterUtil.class);

    /** State of a single election for one contract. */
    public static class ElectInfo {
        /** Maximum gap (ms) between two votes before the election is considered stale. */
        final long delay = 5000;
        final AtomicBoolean startElect = new AtomicBoolean(false);
        String formerMaster;
        String uniNumber;
        // Written under this object's monitor in put(), read by the scheduler thread;
        // volatile so the timeout check sees the latest vote time.
        volatile long lastTime = System.currentTimeMillis();
        /** Number of online member nodes, not counting the former master. */
        int onlineNum;
        String contractID;
        /** Comma-separated public keys of all nodes executing this contract. */
        String mems;
        /** Last executed sequence number reported by each voter; the key is the node ID. */
        private Map<String, Integer> nodeID2LastExe = new ConcurrentHashMap<>();
        /** Handle of the periodic timeout check, kept so that {@link #cancel()} can stop it. */
        private volatile ScheduledFuture<?> timeoutTask;

        /**
         * Creates the election state and schedules a periodic timeout check.
         *
         * @param m       ID of the former master
         * @param con     contract ID
         * @param members comma-separated IDs of all member nodes
         * @param uni     unique number identifying this election round
         */
        public ElectInfo(String m, String con, String members, String uni) {
            formerMaster = m;
            contractID = con;
            mems = members;
            uniNumber = uni;

            // Count members known to the node center, excluding the former master.
            for (String memID : members.split(",")) {
                if (memID == null || memID.length() == 0) {
                    continue;
                }
                if (NodeCenterActions.nodeInfos.containsKey(memID) && !memID.equals(formerMaster)) {
                    onlineNum++;
                }
            }

            // Keep the returned handle: without it the periodic task can never be stopped
            // and would keep firing (and keep removing map entries) for the JVM's lifetime.
            timeoutTask = NodeCenterServer.scheduledThreadPool.scheduleWithFixedDelay(
                    () -> {
                        // Abandon the election if no vote arrived within delay + 15 seconds.
                        if (System.currentTimeMillis() - lastTime > (delay + 15000)) {
                            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss.SSS");
                            LOGGER.info("lastTime=" + df.format(lastTime) + " cancel the election");
                            cancel();
                        }
                    },
                    delay + 1500,
                    delay + 1500,
                    TimeUnit.MILLISECONDS);
            LOGGER.info("new election of contract " + contractID + " is added to electInfos, " +
                    onlineNum + " node is online");
        }

        /**
         * Stops the periodic timeout check and unregisters this election.
         *
         * <p>Only the mapping that points at this very instance is removed, so a stale
         * election cannot delete a newer one registered under the same contract ID.
         * Calling this method more than once is harmless.
         */
        public void cancel() {
            ScheduledFuture<?> task = timeoutTask;
            if (task != null) {
                // false: may be invoked from inside the task itself; let that run finish.
                task.cancel(false);
                timeoutTask = null;
            }
            electInfos.remove(contractID, this);
        }

        /**
         * Records one node's vote and starts the election once all online nodes have voted.
         *
         * @param nodeID  ID of the voting node
         * @param lastExe last executed sequence number reported by that node
         * @param master  master the voter believes has failed; must equal the former master
         * @param mem2    comma-separated member IDs, used if a fresh election has to be created
         * @param uniNum  unique number of the election round the voter refers to
         */
        public synchronized void put(String nodeID, int lastExe, String master, String mem2, String uniNum) {
            LOGGER.info("put nodeID=" + nodeID);
            // Only one election per contract at a time; requests naming another master are ignored.
            if (!master.equals(formerMaster)) {
                LOGGER.debug("[NCElectMasterUtil] master error!");
                return;
            }
            if (nodeID.equals(formerMaster)) {
                LOGGER.debug("former master voted,error!");
                return;
            }
            if (master.equals("null")) {
                LOGGER.info("uniNum=" + uniNum + " uniNumber=" + uniNumber);
                if (!uniNum.equals(uniNumber)) {
                    LOGGER.debug("already has re-elect process when master is null");
                    return;
                }
            }

            long now = System.currentTimeMillis();
            // Too long since the previous vote: treat this as a new election and discard the old one.
            if (now - lastTime > delay) {
                LOGGER.info("[NCElectMasterUtil] time error!");
                cancel();
                synchronized (electInfos) {
                    ElectInfo fresh = new ElectInfo(master, contractID, mem2, uniNum);
                    NCElectMasterUtil.electInfos.put(contractID, fresh);
                    fresh.put(nodeID, lastExe, master, mem2, uniNum);
                }
                return;
            }

            lastTime = now;
            nodeID2LastExe.put(nodeID, lastExe);
            LOGGER.info("[ElectInfo] 加入合约 " + contractID + " 的选举信息节点" + shortId(nodeID));
            if (nodeID2LastExe.size() == onlineNum) {
                cancel();
                elect();
            }
        }

        /**
         * Picks the voter with the highest reported sequence number as the new master and
         * sends it a {@code newMasterStart} request listing the nodes that must reconnect.
         *
         * <p>If no voter can be selected, or the selected node is no longer known to the
         * node center, the failure is logged and no request is sent.
         */
        public synchronized void elect() {
            LOGGER.info("[ElectInfo] 开始选举");
            ElectMasterTimeRecorder.startElect = System.currentTimeMillis();
            synchronized (startElect) {
                startElect.set(true);
            }

            // Update routing info: the former master no longer counts as master of this contract.
            CMNode node = NodeCenterActions.nodeInfos.get(formerMaster);
            if (node != null) {
                synchronized (node) {
                    for (ContractDesp cd : node.contracts) {
                        if (cd.contractID.equals(contractID) || cd.contractName.equals(contractID)) {
                            cd.setIsMaster(false);
                            LOGGER.debug("设置节点 " + shortId(node.pubKey) + " 的合约 " + contractID + " isMaster=" + false);
                            break;
                        }
                    }
                }
            }

            // Choose the node with the largest last-executed sequence number. The running
            // maximum must be updated, otherwise whichever node is iterated last would win.
            // A long sentinel lets even Integer.MIN_VALUE votes be selected.
            long maxExeSeq = Long.MIN_VALUE;
            String newMaster = "";
            for (Map.Entry<String, Integer> vote : nodeID2LastExe.entrySet()) {
                int seq = vote.getValue();
                if (seq > maxExeSeq) {
                    maxExeSeq = seq;
                    newMaster = vote.getKey();
                }
            }
            LOGGER.info("[NCElectMasterUtil] 选举出新的master " + newMaster);
            cancel();

            // Pause before notifying the new master, which will then reconnect the other
            // nodes and start master recovery once they are all connected.
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                LOGGER.warn("[NCElectMasterUtil] interrupted while waiting before notifying the new master", e);
                // Preserve the interrupt status for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }

            LOGGER.info("开始计算需要连接新master的都有哪些节点:");
            // Online voters that must connect to the new master (former master excluded).
            StringBuilder onlineMems = new StringBuilder();
            for (String memID : nodeID2LastExe.keySet()) {
                if (memID == null || memID.length() == 0) {
                    continue;
                }
                LOGGER.info("[NCElectMasterUtil] 查看节点 " + shortId(memID) + " 是否还在线");
                if (!NodeCenterActions.nodeInfos.containsKey(memID) || memID.equals(formerMaster)) {
                    continue;
                }
                LOGGER.info("onlineMems中加入 " + shortId(memID));
                if (onlineMems.length() > 0) {
                    onlineMems.append(",");
                }
                onlineMems.append(memID);
            }

            ElectMasterTimeRecorder.findNewMaster = System.currentTimeMillis();
            ElectMasterTimeRecorder.newMaster = newMaster;
            LOGGER.info("通知新的master让它发起重连操作 " + onlineMems);

            CMNode masterNode = NodeCenterActions.nodeInfos.get(newMaster);
            if (masterNode == null) {
                // No votes were recorded, or the winner went offline during the pause above.
                LOGGER.error("[NCElectMasterUtil] new master '" + newMaster + "' of contract "
                        + contractID + " is not online, cannot notify it");
                return;
            }
            Map<String, String> req = new HashMap<>();
            req.put("action", "newMasterStart");
            req.put("members", mems);
            req.put("onlineMems", onlineMems.toString());
            req.put("contractID", contractID);
            req.put("formerMaster", formerMaster);
            masterNode.connection.controller.sendMsg(JsonUtil.toJson(req));
        }

        /** Returns at most the first five characters of {@code id}, for compact log output. */
        private static String shortId(String id) {
            if (id == null || id.length() <= 5) {
                return String.valueOf(id);
            }
            return id.substring(0, 5);
        }
    }
}