merge feature-dengshuang

step1. prune ContractExecutor
This commit is contained in:
CaiHQ 2021-11-18 19:45:53 +08:00
parent d3fa52546f
commit ca20ea58f5
12 changed files with 150 additions and 133 deletions

View File

@ -26,6 +26,7 @@ import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.RespCache;
import org.bdware.sc.util.JsonUtil;
import org.bdware.sc.util.VersionUtil;
import org.bdware.server.trustedmodel.SingleNodeExecutor;
import org.hyperic.sigar.Mem;
import org.hyperic.sigar.ProcMem;
import org.hyperic.sigar.Sigar;
@ -33,6 +34,7 @@ import org.zz.gmhelper.SM2KeyPair;
import org.zz.gmhelper.SM2Util;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;
@ -74,7 +76,9 @@ public class ContractManager {
new SynchronousQueue<>());
public static ScheduledExecutorService scheduledThreadPool =
Executors.newScheduledThreadPool(10);
public static DoipServiceInfoConfigurer doipConfigurer;
public static int logStage = 0;
public static Sigar sigar = null; // 获取network等资源什么
@ -97,11 +101,12 @@ public class ContractManager {
ManagerHandler handler;
ServiceServer server;
Map<String, RespCache> reqCache = new ConcurrentHashMap<>(); // key is requestID
// private long expiredTime;
private ContractClient analysisClient;
// private long expiredTime;
public ContractManager() {
instance = this;
SingleNodeExecutor.setContractManager(this);
handler = new ManagerHandler(this);
int startPort = cPort.getPortAndInc();
server = new ServiceServer(handler, startPort);
@ -115,6 +120,20 @@ public class ContractManager {
projectRecoder = new ProjectRecorder("./ContractManagerDB");
}
private static String convertToBytes(long traffic) {
String[] unit = new String[]{"B", "KB", "MB", "GB", "TB"};
double d = traffic;
int i;
for (i = 0; i < unit.length - 1; i++) {
if (d > 1024.0) {
d /= 1024.0;
} else {
break;
}
}
return String.format("%.2f %s", d, unit[i]);
}
// public static void updateContractHandleRecord(Contract c, ResultCallback resultCallback)
// throws Exception {
//
@ -189,20 +208,6 @@ public class ContractManager {
// */
// }
private static String convertToBytes(long traffic) {
String[] unit = new String[]{"B", "KB", "MB", "GB", "TB"};
double d = traffic;
int i;
for (i = 0; i < unit.length - 1; i++) {
if (d > 1024.0) {
d /= 1024.0;
} else {
break;
}
}
return String.format("%.2f %s", d, unit[i]);
}
public static byte[] fileToByteArray(String filename) throws IOException {
File f = new File(filename);
if (!f.exists()) {
@ -430,16 +435,16 @@ public class ContractManager {
return null;
}
// public void setExpiredTime(long time) {
// expiredTime = time;
// }
public String getContractNameByID(String id) {
ContractMeta meta = statusRecorder.getContractMeta(id);
if (meta != null) return meta.name;
return null;
}
// public void setExpiredTime(long time) {
// expiredTime = time;
// }
public void reconnectContractProcess() {
RecoverMechTimeRecorder.startReconnectCP = System.currentTimeMillis();
LOGGER.info("reconnectContractProcess");
@ -544,6 +549,25 @@ public class ContractManager {
return null;
}
public String changeDumpPeriod(String contractName, String dumpPeriod) {
ContractResult r;
ContractClient client = getByName(contractName);
ProjectConfig config = projectRecoder.getProjectConfig(contractName);
config.setDumpPeriod(dumpPeriod);
if (null == client) {
r = new ContractResult(Status.Error, new JsonPrimitive("contract process not found"));
return JsonUtil.toJson(r);
}
loadProjectConfig(client);
ContractMeta meta = client.contractMeta;
addLocalContractLog(
"changeDumpPeriod", meta.contract.getID(), meta.name, meta.contract.getOwner());
r = new ContractResult(Status.Success, new JsonPrimitive("change dump period finished"));
return JsonUtil.toJson(r);
}
// public String resumeStartContractAndRedirect(Contract c, PrintStream ps, String alias) {
// long freeMemory = getFreeMemory();
// if (freeMemory < memoryLimit) {
@ -711,23 +735,19 @@ public class ContractManager {
// return resumeStartContractAndRedirect(c, System.out, null);
// }
public String changeDumpPeriod(String contractName, String dumpPeriod) {
ContractResult r;
ContractClient client = getByName(contractName);
ProjectConfig config = projectRecoder.getProjectConfig(contractName);
config.setDumpPeriod(dumpPeriod);
if (null == client) {
r = new ContractResult(Status.Error, new JsonPrimitive("contract process not found"));
return JsonUtil.toJson(r);
public long getFreeMemory() {
try {
if (null == sigar) {
sigar = new Sigar();
}
loadProjectConfig(client);
ContractMeta meta = client.contractMeta;
addLocalContractLog(
"changeDumpPeriod", meta.contract.getID(), meta.name, meta.contract.getOwner());
r = new ContractResult(Status.Success, new JsonPrimitive("change dump period finished"));
return JsonUtil.toJson(r);
Mem mem = sigar.getMem();
LOGGER.debug("[free memory] " + mem.getFree() + " " + mem.getActualFree());
return mem.getFree();
} catch (Throwable e) {
e.printStackTrace(System.err);
e.printStackTrace();
}
return memoryLimit + 1;
}
// private void hangUpKillContract(String contractID) {
@ -767,21 +787,6 @@ public class ContractManager {
// }
// }
public long getFreeMemory() {
try {
if (null == sigar) {
sigar = new Sigar();
}
Mem mem = sigar.getMem();
LOGGER.debug("[free memory] " + mem.getFree() + " " + mem.getActualFree());
return mem.getFree();
} catch (Throwable e) {
e.printStackTrace(System.err);
e.printStackTrace();
}
return memoryLimit + 1;
}
public void addLocalContractLog(String action, ContractClient client) {
ContractMeta meta = client.contractMeta;
addLocalContractLog(action, meta.contract.getID(), meta.name, meta.contract.getOwner());
@ -884,7 +889,6 @@ public class ContractManager {
case RequestAllResponseFirst:
case RequestAllResponseHalf:
case Sharding:
case SelfAdaptiveSharding:
case Sole:
ret = client.startProcess(ps);
conflictCheck = checkConflict(c, client, ret);
@ -1139,7 +1143,6 @@ public class ContractManager {
// B是授受调用者的因此B需要检查AB不能直接返回需要"等待一半以上的commit?并且存下参数"
reqCache.put(requestID, resp);
LOGGER.debug("put into cache:" + requestID);
return resp;
}
}
@ -1294,7 +1297,7 @@ public class ContractManager {
if (client.contractMeta.sigRequired) {
if (!request.verifySignature()) {
cr = new ContractResult(Status.Error, new JsonPrimitive("sign verified failed"));
rcb.onResult(JsonUtil.parseObjectAsJsonObject(cr));
rcb.onResult(JsonUtil.parseObject(cr));
return;
}
}
@ -1310,12 +1313,8 @@ public class ContractManager {
client.traffic += result.length();
client.contractStatus = ContractStatus.Executed;
JsonObject finalRet = JsonUtil.parseStringAsJsonObject(result);
if (client.getContractCopies() == 1) {
finalRet =
extractEventsFromContractResult(
cb, finalRet, client, request, start);
}
JsonObject finalRet = JsonUtil.parseString(result);
rcb.onResult(finalRet);
if (finalRet != null) {
chainOpener.writeContractResultToLocalAndLedger(
@ -1369,7 +1368,7 @@ public class ContractManager {
client.traffic += result.length();
if (client.getContractCopies() == 1) {
extractEventsFromContractResult(ocb, JsonUtil.parseStringAsJsonObject(result), client, request, start);
extractEventsFromContractResult(ocb, JsonUtil.parseString(result), client, request, start);
}
chainOpener.writeContractResultToLocalAndLedger(
result, client, request, ocb, start, System.currentTimeMillis() - start);
@ -1390,10 +1389,6 @@ public class ContractManager {
long startTime) {
try {
ContractResult cr = JsonUtil.fromJson(result, ContractResult.class);
if (null != client.contractMeta.seekFunction(request.getAction()) &&
client.contractMeta.seekFunction(request.getAction()).isView) {
cr.events = null;
}
if (null != cr.events && !cr.events.isEmpty()) {
List<REvent> msgList = cr.events;
cr.events = null;
@ -1465,53 +1460,46 @@ public class ContractManager {
} else {
statusRecorder.ensureRunning(cr);
ContractClient client = statusRecorder.getContractClient(meta.id);
switch (client.getContractType()) {
case Sole:
executeLocallyAsync(cr, rcb, hcb);
return;
case RequestOnce:
case ResponseOnce:
case Sharding:
masterStub.executeByMaster(client, rcb, cr);
break;
case RequestAllResponseAll:
case RequestAllResponseFirst:
case RequestAllResponseHalf:
case SelfAdaptiveSharding:
if (null != client.contractMeta.seekFunction(cr.getAction()) &&
client.contractMeta.exportedFunctions.get(cr.getAction()).isView) {
executeLocallyAsync(cr, rcb, hcb);
} else if (multiMeta != null && multiMeta.isMaster()) {
masterStub.executeByMaster(client, rcb, cr);
} else {
executeContractOnOtherNodes(cr, rcb);
}
break;
default:
break;
long start = System.currentTimeMillis();
ResultCallback eventExtractor = new ResultCallback() {
@Override
public void onResult(String ret) {
JsonObject result = JsonUtil.parseString(ret);
ContractManager.instance.extractEventsFromContractResult(
null, result, client, cr, start);
LOGGER.debug(
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(new Date(System.currentTimeMillis()))
+ meta.contractExecutor + " 结果是 "
+ ret
+ "\n");
rcb.onResult(result);
}
};
meta.contractExecutor.execute(cr.getRequestID(), cr, eventExtractor, hcb);
}
}
private void executeContractOnOtherNodes(ContractRequest cr, final ResultCallback rcb) {
public void executeContractOnOtherNodes(ContractRequest cr, final ResultCallback rcb) {
ContractResult result;
if (null != nodeCenterConn && null != masterStub) {
String pubKey = nodeCenterConn.routeContract(cr.getContractID());
// TODO 如果此时master正在选举中先缓存请求有必要吗
// if (null == pubKey || pubKey.equals("")) {
// LOGGER.info("Master正在崩溃选举中,先将请求缓存!");
// if (null == MasterClientRecoverMechAction.requestsToMaster) {
// MasterClientRecoverMechAction.requestsToMaster =
// new ConcurrentHashMap<>();
// if(pubKey == null ||pubKey.equals("")){
// logger.info("Master正在崩溃选举中,先将请求缓存!");
// if(MasterClientRecoverMechAction.requestsToMaster == null){
// MasterClientRecoverMechAction.requestsToMaster = new
// ConcurrentHashMap<>();
// }
//
// if(!MasterClientRecoverMechAction.requestsToMaster.containsKey(c.getContractID())){
// MasterClientRecoverMechAction.requestsToMaster
// .put(c.getContractID(), new Queue<>());
//
// MasterClientRecoverMechAction.requestsToMaster.put(c.getContractID(),new
// Queue<>());
// }
//
// MasterClientRecoverMechAction.requestsToMaster.get(c.getContractID())
// .add(new RequestToMaster(c));
// MasterClientRecoverMechAction.requestsToMaster.get(c.getContractID()).add(new
// RequestToMaster(c));
// return;
// }
@ -1553,7 +1541,7 @@ public class ContractManager {
new JsonPrimitive(
"Contract " + cr.getContractID() + " doesn't exists!!"));
}
rcb.onResult(JsonUtil.parseObjectAsJsonObject(result));
rcb.onResult(JsonUtil.parseObject(result));
}
@ -1868,7 +1856,7 @@ public class ContractManager {
desc.contractID = meta.id;
desc.contractName = meta.name;
desc.events = meta.declaredEvents;
desc.exportedFunctions = meta.getExportedFunctions();
desc.exportedFunctions = meta.exportedFunctions;
desc.type = meta.contract.getType();
desc.annotations = meta.annotations;
desc.yjsType = meta.getYjsType();
@ -2171,7 +2159,7 @@ public class ContractManager {
if (null != client) {
try {
String ret = client.get.syncGet("", "suicide", "");
JsonObject jo = JsonUtil.parseStringAsJsonObject(ret);
JsonObject jo = JsonUtil.parseString(ret);
if (jo.has("cleanSub")) {
REvent msg =
new REvent(
@ -2214,6 +2202,7 @@ public class ContractManager {
static class StrCollector extends ResultCallback {
String strRet = "{\"data\":\"Timeout\"}";
boolean hasResult = false;
long start = System.currentTimeMillis();
@Override
public void onResult(String str) {
@ -2307,7 +2296,7 @@ public class ContractManager {
// 合约状态调用次数流量统计内存占用
String traffic, storage;
long times;
Collection<FunctionDesp> exportedFunctions;
List<FunctionDesp> exportedFunctions;
Map<String, REventSemantics> events;
ContractStatusEnum contractStatus;
String contractPermission;

View File

@ -6,6 +6,8 @@ import org.bdware.sc.bean.IDSerializable;
import org.bdware.sc.event.REvent;
import org.bdware.sc.node.AnnotationNode;
import org.bdware.sc.node.YjsType;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.SingleNodeExecutor;
import java.util.*;
@ -21,6 +23,8 @@ public class ContractMeta implements IDSerializable {
List<AnnotationNode> annotations;
Set<String> dependentContracts;
transient Map<String, FunctionDesp> funCache;
public transient ContractExecutor contractExecutor;
boolean sigRequired;
String thisPermission; // 合约当前权限
/*
@ -36,6 +40,7 @@ public class ContractMeta implements IDSerializable {
// Map<Object,Object>MaskInfo;
public ContractMeta() {
contractExecutor = SingleNodeExecutor.instance;
}
public ContractStatusEnum getStatus() {
@ -117,4 +122,8 @@ public class ContractMeta implements IDSerializable {
status = ContractStatusEnum.HANGED;
isDebug = false;
}
public void setContractExecutor(ContractExecutor executor) {
this.contractExecutor = executor;
}
}

View File

@ -8,6 +8,7 @@ import org.bdware.sc.bean.Contract;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.db.CMTables;
import org.bdware.sc.db.StatusRecorder;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.util.JsonUtil;
import org.bdware.sc.util.LRUList;
@ -147,6 +148,16 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
id2ContractClient.put(meta.id, client);
runningProcess.add(meta);
}
public ContractMeta createIfNotExist(String contractID) {
ContractMeta ret = getContractMeta(contractID);
if (null == ret) {
LOGGER.info("requests don't contain contract " + contractID);
ret = new ContractMeta();
ret.id = contractID;
updateValue(ret);
}
return ret;
}
public void hangLeastUsedContractProcess() {
ContractMeta meta = runningProcess.popOldest();

View File

@ -4,8 +4,6 @@ import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.ResultCallback;
public interface MasterStub {
// String executeGlobally(ContractRequest c, OnHashCallback cb);
void executeByMaster(ContractClient client, ResultCallback rcb, ContractRequest c);
void transferToOtherNode(String pubKey, String contractID);

View File

@ -3,10 +3,8 @@ package org.bdware.sc.sequencing;
import org.bdware.sc.conn.Node;
import org.bdware.sc.units.TrustfulExecutorConnection;
public interface SequencingAlgorithm {
public interface CommitAlgorithm {
void onMessage(Node node, byte[] msg);
void setCommitter(Committer c);
void setConnection(TrustfulExecutorConnection c);
}

View File

@ -11,7 +11,7 @@ import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class PBFTAlgorithm implements SequencingAlgorithm {
public class PBFTAlgorithm implements CommitAlgorithm {
static byte[] GETPUBKEY = "GETPUBKEY".getBytes();
Committer committer;
Map<Node, PBFTMember> members;

View File

@ -4,7 +4,7 @@ import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.Node;
import org.bdware.sc.units.TrustfulExecutorConnection;
public class ViewAlgorithm implements SequencingAlgorithm {
public class ViewAlgorithm implements CommitAlgorithm {
private Committer committer;
private TrustfulExecutorConnection connection;

View File

@ -4,7 +4,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.ContractManager;
import org.bdware.sc.conn.Node;
import org.bdware.sc.sequencing.SequencingAlgorithm;
import org.bdware.sc.sequencing.CommitAlgorithm;
import java.io.Serializable;
import java.util.*;
@ -35,7 +35,7 @@ public class ContractUnitController {
unit.connection = connection;
String result = manager.startContract(req.contract);
unit.contractID = req.contract.getID();
unit.sequencingAlgorithm = algorithmFactory.create(req, unit);
unit.commitAlgorithm = algorithmFactory.create(req, unit);
unit.node2member = new HashMap<>();
units.put(req.contract.getID(), unit);
@ -55,7 +55,7 @@ public class ContractUnitController {
case Sequencing:
unit = units.get(cumsg.getContractID());
ContractUnitMember member = unit.node2member.get(node);
unit.sequencingAlgorithm.onMessage(member, cumsg.content);
unit.commitAlgorithm.onMessage(member, cumsg.content);
break;
case Unknown:
default:
@ -64,7 +64,7 @@ public class ContractUnitController {
unit = new ContractUnit();
unit.connection = connection;
unit.contractID = req2.contract.getID();
unit.sequencingAlgorithm = algorithmFactory.create(req2, unit);
unit.commitAlgorithm = algorithmFactory.create(req2, unit);
unit.node2member = new HashMap<>();
units.put(req2.contract.getID(), unit);
@ -85,7 +85,7 @@ public class ContractUnitController {
public static class ContractUnit implements TrustfulExecutorConnection, Serializable {
public String contractID;
public Map<Node, ContractUnitMember> node2member; //存储和自己在一个合约集群的其他节点信息
private transient SequencingAlgorithm sequencingAlgorithm;
private transient CommitAlgorithm commitAlgorithm;
private transient TrustfulExecutorConnection connection;
public synchronized void broadcast(ContractUnitMessage cuMessage) {
@ -99,7 +99,7 @@ public class ContractUnitController {
try {
// TODO
member = new ContractUnitMember(node);
sequencingAlgorithm.onMessage(member, msg.content);
commitAlgorithm.onMessage(member, msg.content);
node2member.put(node, member);
return "success";
} catch (Exception e) {

View File

@ -30,7 +30,6 @@ public class MultiContractMeta implements IDSerializable {
public String invokeID; // TODO
public ContractExecType type;
public ContractUnitStatus unitStatus = ContractUnitStatus.CommonMode;
public transient ContractExecutor contractExecutor;
public transient PriorityQueue<ContractRequest> queue; // contract request
public transient Map<Integer, String> uniReqIDMap; // 用于请求
public transient Map<Integer, ResultCallback> resultMap; // 用于请求
@ -213,7 +212,4 @@ public class MultiContractMeta implements IDSerializable {
return masterNode;
}
public void setContractExecutor(ContractExecutor contractExecutor) {
this.contractExecutor = contractExecutor;
}
}

View File

@ -1,8 +1,8 @@
package org.bdware.sc.units;
import org.bdware.sc.sequencing.SequencingAlgorithm;
import org.bdware.sc.sequencing.CommitAlgorithm;
import org.bdware.sc.units.ContractUnitController.ContractUnit;
public interface SequencingAlgorithmFactory {
public SequencingAlgorithm create(ContractUnitStartRequest req, ContractUnit unit);
public CommitAlgorithm create(ContractUnitStartRequest req, ContractUnit unit);
}

View File

@ -1,11 +1,9 @@
package org.bdware.server.trustedmodel;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
public interface ContractExecutor {
void execute(String requestID, ResultCallback rc, ContractRequest req);
default void close() {
}
void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb);
}

View File

@ -0,0 +1,18 @@
package org.bdware.server.trustedmodel;
import org.bdware.sc.ContractManager;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
public class SingleNodeExecutor implements ContractExecutor {
public static SingleNodeExecutor instance = new SingleNodeExecutor();
private ContractManager cm;
public static void setContractManager(ContractManager cm) {
instance.cm = cm;
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) {
cm.executeLocallyAsync(req, rcb, hcb);
}
}