build: config spotless plugin and reformat code

This commit is contained in:
Frank.R.Wu 2023-06-15 11:08:12 +08:00
parent 6cd99cf17a
commit fc453d56fc
46 changed files with 1315 additions and 1584 deletions

View File

@ -2,6 +2,9 @@ plugins {
id 'java'
id 'java-library'
}
apply from: '../spotless.gradle'
repositories {
mavenCentral()
}

View File

@ -9,28 +9,14 @@ public interface ChainOpener {
String register(String arg);
void writeContractResultToLocalAndLedger(
String result,
ContractClient client,
ContractRequest contractRequest,
OnHashCallback cb, long start, long l);
void writeContractResultToLocalAndLedger(String result, ContractClient client,
ContractRequest contractRequest, OnHashCallback cb, long start, long l);
void writeToChain(
OnHashCallback cb,
String from,
String to,
String data,
String requestID,
void writeToChain(OnHashCallback cb, String from, String to, String data, String requestID,
String namedLedger);
void writeToChainWithContract(
OnHashCallback cb,
String from,
String to,
String data,
String requestID,
String contractID,
String namedLedger);
void writeToChainWithContract(OnHashCallback cb, String from, String to, String data,
String requestID, String contractID, String namedLedger);
JsonElement getLedgerParams();
}

View File

@ -42,17 +42,17 @@ public class ContractClient {
long times = 0L;
long traffic = 0L;
long memory;
// boolean withInsnLimit; // 是否需要计算gas
// boolean withInsnLimit; // 是否需要计算gas
ContractStatus contractStatus;
boolean isRunning;
// public boolean changeDumpPeriod(String period) {
// System.out.println("[ContractClient] changeDumpPeriod " + period);
// String res = get.syncGet("", "changeDumpPeriod",period);
// if(res.equals("success"))
// return true;
// return false;
// }
// public boolean changeDumpPeriod(String period) {
// System.out.println("[ContractClient] changeDumpPeriod " + period);
// String res = get.syncGet("", "changeDumpPeriod",period);
// if(res.equals("success"))
// return true;
// return false;
// }
long lastUpdate = 0;
ContractClient(Contract c) {
@ -76,7 +76,7 @@ public class ContractClient {
// current ContractManager
// We only consider the.
public ContractClient(String host, int startPort) {
// LOGGER.info("ContractClient 构造器 : ");
// LOGGER.info("ContractClient 构造器 : ");
contractMeta = new ContractMeta();
outputTracer = new ContractPrinter();
errorTracer = new ContractPrinter();
@ -133,26 +133,21 @@ public class ContractClient {
}
private void initProps() {
// LOGGER.info("initProps ---- position-----1");
// LOGGER.info("initProps ---- position-----1");
contractMeta.name = get.syncGet("", "getContractName", "");
// LOGGER.info("initProps ---- position-----2");
// LOGGER.info("initProps ---- position-----2");
String strEvent = get.syncGet("", "getDeclaredEvents", "");
LOGGER.debug("event: " + strEvent);
contractMeta.declaredEvents =
JsonUtil.fromJson(
strEvent, new TypeToken<Map<String, REventSemantics>>() {
}.getType());
// LOGGER.info("initProps ---- position-----3");
contractMeta.dependentContracts = JsonUtil.fromJson(
get.syncGet("", "getDependentContracts", ""),
new TypeToken<Set<String>>() {
}.getType());
// LOGGER.info("initProps ---- position-----4");
contractMeta.declaredEvents = JsonUtil.fromJson(strEvent,
new TypeToken<Map<String, REventSemantics>>() {}.getType());
// LOGGER.info("initProps ---- position-----3");
contractMeta.dependentContracts =
JsonUtil.fromJson(get.syncGet("", "getDependentContracts", ""),
new TypeToken<Set<String>>() {}.getType());
// LOGGER.info("initProps ---- position-----4");
contractMeta.exportedFunctions =
JsonUtil.fromJson(
get.syncGet("", "getExportedFunctions", ""),
new TypeToken<List<FunctionDesp>>() {
}.getType());
JsonUtil.fromJson(get.syncGet("", "getExportedFunctions", ""),
new TypeToken<List<FunctionDesp>>() {}.getType());
contractMeta.logDetail = new HashMap<>();
for (FunctionDesp func : contractMeta.exportedFunctions) {
StringBuilder str = new StringBuilder();
@ -164,8 +159,7 @@ public class ContractClient {
}
if (anno.getType().equals("LogLocation")) {
for (String logLoc : anno.getArgs()) {
if (logLoc.equals("\"dataware\"")
|| logLoc.equals("\"bdledger\"")
if (logLoc.equals("\"dataware\"") || logLoc.equals("\"bdledger\"")
|| logLoc.equals("\"bdledger:\"")) {
str.append("bdcontract;");
} else if (logLoc.startsWith("\"bdledger:") && logLoc.length() > 11) {
@ -178,42 +172,39 @@ public class ContractClient {
}
contractMeta.logDetail.put(func.functionName, str.toString());
}
// LOGGER.info("initProps ---- position-----5");
// LOGGER.info("initProps ---- position-----5");
try {
String anno = get.syncGet("", "getAnnotations", "");
contractMeta.annotations =
JsonUtil.fromJson(anno, new TypeToken<List<AnnotationNode>>() {
}.getType());
JsonUtil.fromJson(anno, new TypeToken<List<AnnotationNode>>() {}.getType());
} catch (Exception e) {
// supoort contract process before version 0.70
contractMeta.annotations = new ArrayList<>();
}
// LOGGER.info("initProps ---- position-----6");
// LOGGER.info("initProps ---- position-----6");
contractMeta.sigRequired = Boolean.parseBoolean(get.syncGet("", "isSigRequired", ""));
// LOGGER.info("initProps ---- position-----7");
// LOGGER.info("initProps ---- position-----7");
contractMeta.thisPermission = get.syncGet("", "showPermission", "");
// LOGGER.info("initProps ---- position-----8");
// LOGGER.info("initProps ---- position-----8");
isRunning = true;
// LOGGER.info("initProps ---- position-----9");
// LOGGER.info("initProps ---- position-----9");
get.syncGet("", "registerMangerPort", ContractManager.cPort.getCMPort() + "");
contractMeta.contract =
JsonUtil.fromJson(get.syncGet("", "getContract", ""), Contract.class);
contractMeta.id = contractMeta.contract.getID();
get.setOfflineExceptionHandler(
new SocketGet.OfflineHandler() {
@Override
public void onException(SocketGet socketGet, Exception e) {
if (e.getMessage().contains("Connection refused")) {
contractMeta.status = ContractStatusEnum.HANGED;
ContractManager.instance.statusRecorder.resumeContractProcess(
contractMeta.id);
}
}
});
get.setOfflineExceptionHandler(new SocketGet.OfflineHandler() {
@Override
public void onException(SocketGet socketGet, Exception e) {
if (e.getMessage().contains("Connection refused")) {
contractMeta.status = ContractStatusEnum.HANGED;
ContractManager.instance.statusRecorder.resumeContractProcess(contractMeta.id);
}
}
});
loadTimesAndTraffic();
// LOGGER.info("initProps ---- position-----10");
// LOGGER.info("initProps ---- position-----10");
// LOGGER.debug("======= registerPort:" + ret + "-->" + ContractManager.startPort);
}
@ -233,16 +224,13 @@ public class ContractClient {
String osJni = ((HardwareInfo.type == OSType.linux) ? "/jni/linux" : "/jni/mac");
darg += jniPath.getAbsolutePath() + osJni;
if (!new File(classpath).exists()) {
ContractResult r =
new ContractResult(
Status.Exception, new JsonPrimitive("incorrect path: yjs.jar"));
ContractResult r = new ContractResult(Status.Exception,
new JsonPrimitive("incorrect path: yjs.jar"));
return JsonUtil.toJson(r);
}
if (!new File(jniPath, "libs").exists()) {
ContractResult r =
new ContractResult(
Status.Exception,
new JsonPrimitive("incorrect path: yjs.jar, missing libs"));
ContractResult r = new ContractResult(Status.Exception,
new JsonPrimitive("incorrect path: yjs.jar, missing libs"));
return JsonUtil.toJson(r);
}
int startPort = ContractManager.cPort.getPortAndInc();
@ -251,13 +239,17 @@ public class ContractClient {
pbParameters.add("-Dfile.encoding=UTF-8");
pbParameters.add(darg);
if (contractMeta.contract.getRemoteDebugPort() != 0) {
pbParameters.add(String.format("-agentlib:jdwp=transport=dt_socket,address=%d,server=y,suspend=n", contractMeta.contract.getRemoteDebugPort()));
pbParameters.add(String.format(
"-agentlib:jdwp=transport=dt_socket,address=%d,server=y,suspend=n",
contractMeta.contract.getRemoteDebugPort()));
}
File classParent = new File(classpath).getParentFile();
if (contractMeta.contract.isDebug()) {
pbParameters.add("-Dlog4j.configurationFile=" + new File(classParent, "log4j2.debug.properties").getAbsolutePath());
pbParameters.add("-Dlog4j.configurationFile="
+ new File(classParent, "log4j2.debug.properties").getAbsolutePath());
} else {
pbParameters.add("-Dlog4j.configurationFile=" + new File(classParent, "log4j2.properties").getAbsolutePath());
pbParameters.add("-Dlog4j.configurationFile="
+ new File(classParent, "log4j2.properties").getAbsolutePath());
}
pbParameters.add("-jar");
pbParameters.add(classpath);
@ -268,8 +260,9 @@ public class ContractClient {
ProcessBuilder builder;
String[] result = new String[pbParameters.size()];
pbParameters.toArray(result);
Constructor<ProcessBuilder> pbc = ProcessBuilder.class.getDeclaredConstructor(String[].class);
builder = pbc.newInstance(new Object[]{result});
Constructor<ProcessBuilder> pbc =
ProcessBuilder.class.getDeclaredConstructor(String[].class);
builder = pbc.newInstance(new Object[] {result});
File directory = new File("./");
LOGGER.debug("[CMD] path: " + directory.getAbsolutePath());
@ -294,10 +287,8 @@ public class ContractClient {
try {
// Set contractPort to max(mainPort, contractPort)
int portIndex = status.indexOf("mainPort");
int port =
Integer.parseInt(
status.substring(portIndex + 9, portIndex + 14)
.replaceAll("\\s+", ""));
int port = Integer.parseInt(
status.substring(portIndex + 9, portIndex + 14).replaceAll("\\s+", ""));
if (port != startPort) {
ContractManager.cPort.reSetPort(port + 1);
}
@ -327,20 +318,20 @@ public class ContractClient {
}
get.syncGet("", "registerMangerPort", String.valueOf(ContractManager.cPort.getCMPort()));
MultiContractMeta multiContractMeta =
ContractManager.instance.multiContractRecorder.getMultiContractMeta(contractMeta.getID());
MultiContractMeta multiContractMeta = ContractManager.instance.multiContractRecorder
.getMultiContractMeta(contractMeta.getID());
if (multiContractMeta != null && multiContractMeta.getMembers() != null) {
String setMemberResult = get.syncGet(
"", "setMembers", JsonUtil.toJson(multiContractMeta.getMembers()));
String setMemberResult =
get.syncGet("", "setMembers", JsonUtil.toJson(multiContractMeta.getMembers()));
LOGGER.info("setMember:" + setMemberResult);
} else {
LOGGER.info("setMember ignore, meta:" + (multiContractMeta == null) + (multiContractMeta == null ? "NULL" : " members:" + multiContractMeta.getMembers()));
LOGGER.info("setMember ignore, meta:" + (multiContractMeta == null)
+ (multiContractMeta == null ? "NULL"
: " members:" + multiContractMeta.getMembers()));
}
if (isBundlePath(contractMeta.contract.getScriptStr())) {
status =
get.syncGet(
"", "setContractBundle", JsonUtil.toJson(contractMeta.contract));
status = get.syncGet("", "setContractBundle", JsonUtil.toJson(contractMeta.contract));
} else {
status = get.syncGet("", "setContract", JsonUtil.toJson(contractMeta.contract));
}
@ -369,16 +360,12 @@ public class ContractClient {
return;
}
lastUpdate = System.currentTimeMillis();
get.asyncGet(
"",
".UsedMemory",
"",
new ResultCallback() {
@Override
public void onResult(String str) {
memory = Long.parseLong(str);
}
});
get.asyncGet("", ".UsedMemory", "", new ResultCallback() {
@Override
public void onResult(String str) {
memory = Long.parseLong(str);
}
});
} catch (Exception e) {
// e.printStackTrace();
}
@ -457,12 +444,10 @@ public class ContractClient {
}
public void loadTimesAndTraffic() {
String tempTime2 =
KeyValueDBUtil.instance.getValue(
CMTables.ContractInfo.toString(), contractMeta.name + "-Times");
String tempTraffic2 =
KeyValueDBUtil.instance.getValue(
CMTables.ContractInfo.toString(), contractMeta.name + "-Traffic");
String tempTime2 = KeyValueDBUtil.instance.getValue(CMTables.ContractInfo.toString(),
contractMeta.name + "-Times");
String tempTraffic2 = KeyValueDBUtil.instance.getValue(CMTables.ContractInfo.toString(),
contractMeta.name + "-Traffic");
if (tempTime2 != null && !tempTime2.equals("")) {
times = Long.parseLong(tempTime2);
}
@ -472,15 +457,15 @@ public class ContractClient {
}
public void saveTimesAndTraffic() {
KeyValueDBUtil.instance.setValue(
CMTables.ContractInfo.toString(), contractMeta.name + "-Times", times + "");
KeyValueDBUtil.instance.setValue(
CMTables.ContractInfo.toString(), contractMeta.name + "-Traffic", traffic + "");
KeyValueDBUtil.instance.setValue(CMTables.ContractInfo.toString(),
contractMeta.name + "-Times", times + "");
KeyValueDBUtil.instance.setValue(CMTables.ContractInfo.toString(),
contractMeta.name + "-Traffic", traffic + "");
}
public void setMask(JsonObject args) {
//get.asyncGet("",,,,);
// get.asyncGet("",,,,);
get.asyncGet("", "setMask", args.toString(), null);
}
@ -488,7 +473,7 @@ public class ContractClient {
public void setProjectConfig(String args) {
get.asyncGet("", "setProjectConfig", args, null);
}
//public String
// public String
static class ReqScript {
String mode;

File diff suppressed because it is too large Load Diff

View File

@ -31,15 +31,8 @@ public class ContractMeta implements IDSerializable {
boolean sigRequired;
String thisPermission; // 合约当前权限
/*
{
"name": "dx_substr",
"parameter":
{
"columnIndex":5,
"paras":["1","3"]
}
},
*/
* { "name": "dx_substr", "parameter": { "columnIndex":5, "paras":["1","3"] } },
*/
// Map<Object,Object>MaskInfo;
public ContractMeta() {
@ -116,7 +109,8 @@ public class ContractMeta implements IDSerializable {
private FunctionDesp seekFunction(String action) {
for (FunctionDesp desp : exportedFunctions) {
if (desp != null && desp.functionName.equals(action)) return desp;
if (desp != null && desp.functionName.equals(action))
return desp;
}
return null;
}

View File

@ -1,10 +1,8 @@
package org.bdware.sc;
public enum ContractStatusEnum {
INIT(0, "初始化"),
RUNNING(1, "内存运行中"),
HANGED(2, "硬盘运行中"),
KILLED(3, "已停止");
INIT(0, "初始化"), RUNNING(1, "内存运行中"), HANGED(2, "硬盘运行中"), KILLED(3, "已停止");
private Integer code;
private String desc;

View File

@ -27,28 +27,24 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
// 对外会有一个getContractClient的过程如果只是想用元信息的改成getContractClientMeta
// 如果是想调用的再使用getContractClient
// 合约执行的状态 key:合同id value:合同状态
// private static final String DB_NAME = CMTables.ContractInfo.toString();
// 合约执行的状态 key:合同id value:合同状态
// private static final String DB_NAME = CMTables.ContractInfo.toString();
static {
final Object flag = new Object();
// 调度一个task在delayms后开始调度每次调度完后最少等待periodms后才开始调
ContractManager.scheduledThreadPool.scheduleWithFixedDelay(
() -> {
boolean cleared = dealTimerContractProcess();
if (cleared) {
synchronized (flag) {
try {
flag.wait(14000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
ContractManager.scheduledThreadPool.scheduleWithFixedDelay(() -> {
boolean cleared = dealTimerContractProcess();
if (cleared) {
synchronized (flag) {
try {
flag.wait(14000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
},
0,
1,
TimeUnit.SECONDS);
}
}
}, 0, 1, TimeUnit.SECONDS);
}
// contract id to client, changes when some contract is started/stopped/resumed
@ -65,32 +61,32 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
}
public static boolean dealTimerContractProcess() {
// if (id2ContractStatus == null) {
// String value = readDataFromDB(id2ContractStatusKey);
// id2ContractStatus = GSON.fromJson(value, Map.class);
// if (id2ContractStatus == null) {
// return true;
// }
// for (Map.Entry<String, ContractStatusEnum> contractStatusEnumEntry :
// if (id2ContractStatus == null) {
// String value = readDataFromDB(id2ContractStatusKey);
// id2ContractStatus = GSON.fromJson(value, Map.class);
// if (id2ContractStatus == null) {
// return true;
// }
// for (Map.Entry<String, ContractStatusEnum> contractStatusEnumEntry :
// id2ContractStatus.entrySet()) {
// //获取合同id
// String contractID = contractStatusEnumEntry.getKey();
// ContractStatusEnum contractStatusEnum =
// //获取合同id
// String contractID = contractStatusEnumEntry.getKey();
// ContractStatusEnum contractStatusEnum =
// contractStatusEnumEntry.getValue();
// //没有状态信息就停止
// if (contractStatusEnum == null) {
// continue;
// }
// //拥有合同状态信息的如果是已经执行结束我们将合同挂起
// if
// //没有状态信息就停止
// if (contractStatusEnum == null) {
// continue;
// }
// //拥有合同状态信息的如果是已经执行结束我们将合同挂起
// if
// (contractStatusEnum.getCode().equals(ContractStatusEnum.STORE.getCode())) {
// resumeContractProcess(contractID);
// } else if
// resumeContractProcess(contractID);
// } else if
// (contractStatusEnum.getCode().equals(ContractStatusEnum.DONE.getCode())) {
// hangUpContractProcess(contractID);
// }
// }
// }
// hangUpContractProcess(contractID);
// }
// }
// }
return true;
}
@ -102,10 +98,11 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
// TODO 是不是还有别的这只是在内存里的哦
try {
for (ContractClient c : id2ContractClient.values()) {
if (Integer.parseInt(c.getPID()) == pid) return true;
if (Integer.parseInt(c.getPID()) == pid)
return true;
}
} catch (Exception e) {
// e.printStackTrace();
// e.printStackTrace();
}
return false;
}
@ -174,9 +171,11 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
// TODO index name to contract
public ContractMeta getContractMeta(String idOrNameOrDOI) {
if (idOrNameOrDOI == null) return null;
if (idOrNameOrDOI == null)
return null;
ContractMeta meta = getStatus().get(idOrNameOrDOI);
if (meta != null) return meta;
if (meta != null)
return meta;
for (ContractMeta cc : getStatus().values()) {
if (cc.status == ContractStatusEnum.RUNNING || cc.status == ContractStatusEnum.HANGED)
if (idOrNameOrDOI.equals(cc.name)) {
@ -184,8 +183,10 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
}
}
for (ContractMeta cc : getStatus().values()) {
if (cc.name == null) continue;
if (cc.contract == null) continue;
if (cc.name == null)
continue;
if (cc.contract == null)
continue;
if (idOrNameOrDOI.equals(cc.name)) {
return cc;
}
@ -224,10 +225,8 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
LOGGER.info("need dump when stop");
String contractName = client.getContractName();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss");
File f =
new File(
ContractManager.dir + "/memory/" + contractName,
df.format(new Date()));
File f = new File(ContractManager.dir + "/memory/" + contractName,
df.format(new Date()));
client.get.syncGet("", "getMemoryDump", f.getAbsolutePath());
ContractManager.instance.addLocalContractLog("dumpContract", meta);
}
@ -281,9 +280,9 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
resumeContractProcess(contractID);
}
// 似乎这样就可以了
// } else if (meta.status == ContractStatusEnum.RUNNING) {
// hangUpContractProcess(contractID);
// }
// } else if (meta.status == ContractStatusEnum.RUNNING) {
// hangUpContractProcess(contractID);
// }
}
/**
@ -291,15 +290,15 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
*/
public synchronized void hangUpContractProcess(String contractID) {
// dump stop 这里是正常流程代码
// ContractClient contractClient = getContractClientById(contractID);
// if (contractClient == null) {
// return;
// }
// //挂起
// Contract contract = contractClient.contract;
// ContractManager.instance.dumpContract(contract.getID(), null);
// setContractClient(contractID, contractClient);
// ContractManager.instance.hangUpStopContract(contract.getID());
// ContractClient contractClient = getContractClientById(contractID);
// if (contractClient == null) {
// return;
// }
// //挂起
// Contract contract = contractClient.contract;
// ContractManager.instance.dumpContract(contract.getID(), null);
// setContractClient(contractID, contractClient);
// ContractManager.instance.hangUpStopContract(contract.getID());
// todo 临时想强制测试下效果
ContractMeta contractMeta = getContractMeta(contractID);
// 挂起
@ -313,13 +312,12 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
client.saveTimesAndTraffic();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd.HH_mm_ss"); // 设置日期格式
File f =
new File(
ContractManager.dir + "/memory/" + contractMeta.name,
df.format(new Date()));
File f = new File(ContractManager.dir + "/memory/" + contractMeta.name,
df.format(new Date()));
File parent = f.getParentFile();
if (!parent.exists()) {
LOGGER.trace("make directory " + parent.getAbsolutePath() + ":" + parent.mkdirs());
LOGGER.trace(
"make directory " + parent.getAbsolutePath() + ":" + parent.mkdirs());
}
ContractManager.instance.dumpContract(contract.getID(), f.getAbsolutePath());
ContractManager.instance.invokeContractSuicide(client);
@ -335,7 +333,7 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
// 启动合约
// 加载所需要的内存
// 该方法调用方法已经考虑了内存问题但是内存不足的时候重试或者等待或者将合约压到失败中稍后重试可以尝试设计消息重试机制
// 如果是内存中则唤起合同信息 todo 内存不足 关闭合约追赶补偿数据
// 如果是内存中则唤起合同信息 todo 内存不足 关闭合约追赶补偿数据
if (meta.status == ContractStatusEnum.HANGED || meta.status == ContractStatusEnum.KILLED) {
ContractStatusEnum preStatus = meta.status;
// TODO Resume里复用的是start逻辑启动完了就成Running了哦可能不能这么来
@ -344,17 +342,11 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
JsonUtil.fromJson(startContractResult, ContractResult.class);
if (contractResult.status == ContractResult.Status.Error) {
// 记录错误日志
ContractManager.instance.addLocalContractLog(
"resumeContract error",
meta.contract.getID(),
meta.name,
meta.contract.getOwner());
ContractManager.instance.addLocalContractLog("resumeContract error",
meta.contract.getID(), meta.name, meta.contract.getOwner());
} else {
ContractManager.instance.addLocalContractLog(
"resumeContract success",
meta.contract.getID(),
meta.name,
meta.contract.getOwner());
ContractManager.instance.addLocalContractLog("resumeContract success",
meta.contract.getID(), meta.name, meta.contract.getOwner());
// TODO 可能会重复load相同的镜像
// 如果是killed 只根据manifest去判断是否加载memory
// 可增加一个判断如果hanged朋manifest里是加载memory这里就不再加载了
@ -367,8 +359,8 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
client.contractMeta.setStatus(ContractStatusEnum.RUNNING);
updateValue(client.contractMeta);
// 还要判断一发是不是已有permission的值如果有要setPermission一把
// meta.status = ContractStatusEnum.RUNNING;
// updateValue(meta);
// meta.status = ContractStatusEnum.RUNNING;
// updateValue(meta);
}
}
}
@ -416,30 +408,23 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
ContractManager.instance.invokeContractSuicide(client);
return false;
}
LOGGER.debug(
String.format(
"CP listened to port %d:\n\tID=%s\n\tName=%s\n\tType=%s\n"
+ "\tKey=%s\n\tPubKey=%s\n\tCopies=%d\n\t%s",
i,
client.getContractID(),
client.contractMeta.name,
client.getContractType(),
client.getContractKey(),
client.getPubkey(),
client.getContractCopies(),
client.contractMeta.contract.startInfo));
LOGGER.debug(String.format(
"CP listened to port %d:\n\tID=%s\n\tName=%s\n\tType=%s\n"
+ "\tKey=%s\n\tPubKey=%s\n\tCopies=%d\n\t%s",
i, client.getContractID(), client.contractMeta.name,
client.getContractType(), client.getContractKey(), client.getPubkey(),
client.getContractCopies(), client.contractMeta.contract.startInfo));
client.get.syncGet("", "setDir", ContractManager.dir);
// String str = client.getIdentifier();
// String str = client.getIdentifier();
client.get.syncGet("", "getDumpPeriod", "a");
client.get.syncGet("", "startAutoDump", "a");
createContract(client);
LOGGER.info(
String.format("reconnect to port %d: contract %s",
i, client.contractMeta.name));
LOGGER.info(String.format("reconnect to port %d: contract %s", i,
client.contractMeta.name));
return true;
}
} catch (Exception e) {
//just ignore
// just ignore
}
return false;
}

View File

@ -1,15 +1,13 @@
package org.bdware.sc;
import java.util.HashMap;
import java.util.Map;
//Node
// Node
public class MasterElectTimeRecorder {
public static Long findMasterCrash; //发现master崩溃的时间
public static Long newMasterStart; //作为新的master newMasterStart被调用时间
public static Long setLocalMaster; //在本地标记自己为master
public static Long slaveConnectFinish; //被slave连接完成
public static Long masterStartRecover; //开始master恢复
public static Long masterRecoverFinish; //master恢复结束
public static Long findMasterCrash; // 发现master崩溃的时间
public static Long newMasterStart; // 作为新的master newMasterStart被调用时间
public static Long setLocalMaster; // 在本地标记自己为master
public static Long slaveConnectFinish; // 被slave连接完成
public static Long masterStartRecover; // 开始master恢复
public static Long masterRecoverFinish; // master恢复结束
}

View File

@ -20,10 +20,8 @@ public class MultiContractRecorder extends StatusRecorder<MultiContractMeta> {
for (MultiContractMeta meta : getStatus().values()) {
try {
meta.initQueue();
int lastExeSeq =
Integer.parseInt(
KeyValueDBUtil.instance.getValue(
CMTables.LastExeSeq.toString(), meta.getContractID()));
int lastExeSeq = Integer.parseInt(KeyValueDBUtil.instance
.getValue(CMTables.LastExeSeq.toString(), meta.getContractID()));
meta.setLastExeSeq(lastExeSeq);
} catch (Exception e) {
LOGGER.error(e);
@ -32,13 +30,15 @@ public class MultiContractRecorder extends StatusRecorder<MultiContractMeta> {
}
public MultiContractMeta getMultiContractMeta(String idOrNameOrDOI) {
if (idOrNameOrDOI == null) return null;
if (idOrNameOrDOI == null)
return null;
ContractMeta meta = ContractManager.instance.statusRecorder.getContractMeta(idOrNameOrDOI);
return getMultiContractMeta(meta);
}
public MultiContractMeta getMultiContractMeta(ContractMeta meta) {
if (meta == null) return null;
if (meta == null)
return null;
return getStatus().get(meta.id);
}

View File

@ -7,7 +7,7 @@ import org.bdware.sc.db.StatusRecorder;
public class ProjectRecorder extends StatusRecorder<ProjectConfig> {
static final String dbName = CMTables.ProjectConfig.toString();
static final String prefix = "Project_Config_";
// private static final Logger LOGGER = LogManager.getLogger(ProjectRecorder.class);
// private static final Logger LOGGER = LogManager.getLogger(ProjectRecorder.class);
public ProjectRecorder(String dir) {
super(dir, dbName, prefix);

View File

@ -4,41 +4,41 @@ import java.util.HashMap;
import java.util.Map;
public class RecoverMechTimeRecorder {
//slave记录
public static Long startCMHttpServer; //启动
public static Long startFinish; //启动完成开始连接NC
public static Long connectNCFinish; //连接NC完成
// slave记录
public static Long startCMHttpServer; // 启动
public static Long startFinish; // 启动完成开始连接NC
public static Long connectNCFinish; // 连接NC完成
public static Long startReconnectCP; //开始重连CP
public static Long reconnectCPFinish; //重连CP完成
public static Long startReconnectCP; // 开始重连CP
public static Long reconnectCPFinish; // 重连CP完成
public static Long startQueryMaster; //开始查看master是谁
public static Long queryMasterFinish; //查到master是谁开始连接master
public static Long connectMasterFinish; //连接master完成
public static Long startQueryMaster; // 开始查看master是谁
public static Long queryMasterFinish; // 查到master是谁开始连接master
public static Long connectMasterFinish; // 连接master完成
public static Long startRecover; //开始恢复slave的askForRecover被调用
public static Long startRecover; // 开始恢复slave的askForRecover被调用
public static Long startCommonRecover; //slave开始从common恢复
public static Long startStableRecover; //slave开始从stable恢复
public static Long startCommonRecover; // slave开始从common恢复
public static Long startStableRecover; // slave开始从stable恢复
public static Long stableLoadFinish; //stable恢复模式load完成
public static Long stableRedoFinish; //stable恢复模式redo完成
public static Long stableLoadFinish; // stable恢复模式load完成
public static Long stableRedoFinish; // stable恢复模式redo完成
public static Long startRedoTransFromMaster; //redo从master传来的trans
public static Long startRedoTransFromMaster; // redo从master传来的trans
//master记录,key是slave的pubKey
public static Map<String,Long> masterStartRecoverNode = new HashMap<String, Long>(); //master的askForRecover被调用
// master记录,key是slave的pubKey
public static Map<String, Long> masterStartRecoverNode = new HashMap<String, Long>(); // master的askForRecover被调用
public static Map<String,Long> startJudgeRecoverMethod = new HashMap<String,Long>(); //开始判断某个节点的恢复方式
public static Map<String,Long> judgeRecoverMethodFinish = new HashMap<String,Long>(); //判断某节点恢复方式完成
public static Map<String, Long> startJudgeRecoverMethod = new HashMap<String, Long>(); // 开始判断某个节点的恢复方式
public static Map<String, Long> judgeRecoverMethodFinish = new HashMap<String, Long>(); // 判断某节点恢复方式完成
public static Map<String,Long> startRecoverFromCommon = new HashMap<String,Long>(); //slave从common恢复master的nodeRestartFromCommonMode方法开始时间
public static Map<String,Long> writeCEIStart = new HashMap<String, Long>(); //开始dumpContract等cei信息
public static Map<String,Long> finishWriteCEI = new HashMap<String, Long>(); //将cei写入文件完成
public static Map<String, Long> startRecoverFromCommon = new HashMap<String, Long>(); // slave从common恢复master的nodeRestartFromCommonMode方法开始时间
public static Map<String, Long> writeCEIStart = new HashMap<String, Long>(); // 开始dumpContract等cei信息
public static Map<String, Long> finishWriteCEI = new HashMap<String, Long>(); // 将cei写入文件完成
public static Map<String,Long> startRecoverFromStable = new HashMap<String,Long>(); //slave从common恢复master的restartFromStableMode方法开始时间
public static Map<String, Long> startRecoverFromStable = new HashMap<String, Long>(); // slave从common恢复master的restartFromStableMode方法开始时间
public static Map<String,Long> recoverFinish = new HashMap<String,Long>();
public static Map<String, Long> recoverFinish = new HashMap<String, Long>();
}

View File

@ -1,11 +1,11 @@
package org.bdware.sc.consistency;
public class Pair<T1, T2> {
public T1 first;
public T2 second;
public T1 first;
public T2 second;
public Pair(T1 t1, T2 t2) {
first = t1;
second = t2;
}
public Pair(T1 t1, T2 t2) {
first = t1;
second = t2;
}
}

View File

@ -65,9 +65,9 @@ public class PBFTAlgorithm implements CommitAlgorithm {
LOGGER.info("recv: " + pbftMessage.getDisplayStr());
switch (pbftMessage.type) {
case AddMember:
// PBFTMember member = PBFTMember.parse(pbftMessage.content);
// if (member != null)
// members.put(sender, member);
// PBFTMember member = PBFTMember.parse(pbftMessage.content);
// if (member != null)
// members.put(sender, member);
// case DeleteMember:
// JUST ignore
break;
@ -79,7 +79,8 @@ public class PBFTAlgorithm implements CommitAlgorithm {
prepareMsg = new PBFTMessage();
prepareMsg.order = allocatedID.incrementAndGet();
prepareMsg.type = PBFTType.PrePrepare;
prepareMsg.content = (java.util.Arrays.hashCode(pbftMessage.content) + "").getBytes();
prepareMsg.content =
(java.util.Arrays.hashCode(pbftMessage.content) + "").getBytes();
temp = new PCInfo();
temp.request = pbftMessage;
@ -108,7 +109,8 @@ public class PBFTAlgorithm implements CommitAlgorithm {
break;
case Prepare:
temp = info.get(pbftMessage.order);
LOGGER.info("receive Prepare from:" + pbftMessage.sendID + " -> " + pbftMessage.order);
LOGGER.info(
"receive Prepare from:" + pbftMessage.sendID + " -> " + pbftMessage.order);
if (temp == null) {
PCInfo pcInfo = new PCInfo();
pcInfo.buff.add(pbftMessage);
@ -155,7 +157,8 @@ public class PBFTAlgorithm implements CommitAlgorithm {
prepareMsg.order = pbftMessage.order;
prepareMsg.type = PBFTType.PrePrepare;
temp = info.get(prepareMsg.order);
prepareMsg.content = (java.util.Arrays.hashCode(temp.request.content) + "").getBytes();
prepareMsg.content =
(java.util.Arrays.hashCode(temp.request.content) + "").getBytes();
broadcast(pbftMessage);
}
default:
@ -196,18 +199,19 @@ public class PBFTAlgorithm implements CommitAlgorithm {
private void requestPrePrepareFromMaster(long order) {
// PBFTMessage message = new PBFTMessage();
// message.type = PBFTType.ReSend;
// message.order = order;
// message.content = new byte[1];
// sendToMaster(message);
// PBFTMessage message = new PBFTMessage();
// message.type = PBFTType.ReSend;
// message.order = order;
// message.content = new byte[1];
// sendToMaster(message);
LOGGER.info("request Resend");
}
private void matchPrePrepareFromOriginReqeust(int hash, PBFTMessage pbftMessage) {
for (PCInfo pcInfo : info.values()) {
for (PBFTMessage msg : pcInfo.buff) {
if (msg.type == PBFTType.PrePrepare && Integer.parseInt(new String(msg.content)) == hash) {
if (msg.type == PBFTType.PrePrepare
&& Integer.parseInt(new String(msg.content)) == hash) {
handlePrePrepare(msg, original.get(hash).second);
return;
}
@ -220,10 +224,8 @@ public class PBFTAlgorithm implements CommitAlgorithm {
}
private void retryLater(int delay, final Node sender, final PBFTMessage pbftMessage) {
ContractManager.scheduledThreadPool.schedule(
() -> onPBFTMessage(sender, pbftMessage),
delay,
TimeUnit.MILLISECONDS);
ContractManager.scheduledThreadPool.schedule(() -> onPBFTMessage(sender, pbftMessage),
delay, TimeUnit.MILLISECONDS);
}
private void handlePrePrepare(PBFTMessage pbftMessage, PBFTMessage req) {
@ -372,8 +374,8 @@ public class PBFTAlgorithm implements CommitAlgorithm {
if (!isPrePrepareReceived) {
return false;
}
// System.out.println("---: updatePrepare, size:" + prepare.size() + " -->"
// + (center.members.size() / 3 * 2 + 1) + " --senderID:" + message.sendID);
// System.out.println("---: updatePrepare, size:" + prepare.size() + " -->"
// + (center.members.size() / 3 * 2 + 1) + " --senderID:" + message.sendID);
if (prepare.size() > center.members.size() / 3 * 2) {
isSendCommit = true;
@ -399,8 +401,8 @@ public class PBFTAlgorithm implements CommitAlgorithm {
public String getDisplayStr() {
return "pSize:" + (prepare == null ? "null" : prepare.size()) + " cSize:"
+ (commit == null ? "null" : commit.size()) + " isSendCommit:" + isSendCommit + " isSendReply:"
+ isSendReply + " buffSize:" + buff.size();
+ (commit == null ? "null" : commit.size()) + " isSendCommit:" + isSendCommit
+ " isSendReply:" + isSendReply + " buffSize:" + buff.size();
}
}
}

View File

@ -9,35 +9,35 @@ import java.io.Serializable;
public class PBFTMember implements Serializable {
private static final long serialVersionUID = -3058672609865345062L;
public boolean isMaster;
private static final long serialVersionUID = -3058672609865345062L;
public boolean isMaster;
public boolean isMaster() {
return isMaster;
}
public boolean isMaster() {
return isMaster;
}
public static PBFTMember parse(byte[] content) {
try {
ByteArrayInputStream input = new ByteArrayInputStream(content);
ObjectInputStream objectInputStream = new ObjectInputStream(input);
PBFTMember ret = (PBFTMember) objectInputStream.readObject();
return ret;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static PBFTMember parse(byte[] content) {
try {
ByteArrayInputStream input = new ByteArrayInputStream(content);
ObjectInputStream objectInputStream = new ObjectInputStream(input);
PBFTMember ret = (PBFTMember) objectInputStream.readObject();
return ret;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public byte[] toByteArray() {
ObjectOutputStream out;
try {
ByteArrayOutputStream bo = new ByteArrayOutputStream();
out = new ObjectOutputStream(bo);
out.writeObject(this);
return bo.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
public byte[] toByteArray() {
ObjectOutputStream out;
try {
ByteArrayOutputStream bo = new ByteArrayOutputStream();
out = new ObjectOutputStream(bo);
out.writeObject(this);
return bo.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}

View File

@ -7,97 +7,97 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
public class PBFTMessage {
PBFTType type;
int sendID;
long order;
byte[] content;
String stamp;// time-stamp or other string, given by client, to identify the content
String signature;
PBFTType type;
int sendID;
long order;
byte[] content;
String stamp;// time-stamp or other string, given by client, to identify the content
String signature;
public int getBytesHash() {
ByteArrayOutputStream bo = new ByteArrayOutputStream();
bo.write(type.toInt());
ByteUtil.writeLong(bo, sendID);
ByteUtil.writeLong(bo, order);
try {
bo.write(content);
} catch (IOException e) {
e.printStackTrace();
}
byte[] ret = bo.toByteArray();
int hashCode = ret.hashCode();
return hashCode;
}
public int getBytesHash() {
ByteArrayOutputStream bo = new ByteArrayOutputStream();
bo.write(type.toInt());
ByteUtil.writeLong(bo, sendID);
ByteUtil.writeLong(bo, order);
try {
bo.write(content);
} catch (IOException e) {
e.printStackTrace();
}
byte[] ret = bo.toByteArray();
int hashCode = ret.hashCode();
return hashCode;
}
public byte[] getBytes() {
ByteArrayOutputStream bo = new ByteArrayOutputStream();
try {
bo.write(type.toInt());
ByteUtil.writeInt(bo, sendID);
ByteUtil.writeLong(bo, order);
if (signature != null) {
byte[] signature = this.signature.getBytes();
ByteUtil.writeInt(bo, signature.length);
bo.write(signature);
} else
ByteUtil.writeInt(bo, 0);
public byte[] getBytes() {
ByteArrayOutputStream bo = new ByteArrayOutputStream();
try {
bo.write(type.toInt());
ByteUtil.writeInt(bo, sendID);
ByteUtil.writeLong(bo, order);
if (signature != null) {
byte[] signature = this.signature.getBytes();
ByteUtil.writeInt(bo, signature.length);
bo.write(signature);
} else
ByteUtil.writeInt(bo, 0);
bo.write(content);
} catch (IOException e) {
e.printStackTrace();
}
return bo.toByteArray();
}
bo.write(content);
} catch (IOException e) {
e.printStackTrace();
}
return bo.toByteArray();
}
public static PBFTMessage parse(byte[] b) {
PBFTMessage msg = new PBFTMessage();
ByteArrayInputStream bi = new ByteArrayInputStream(b);
msg.type = PBFTType.fromByte(bi.read());
msg.sendID = ByteUtil.readInt(bi);
msg.order = ByteUtil.readLong(bi);
int sigLen = ByteUtil.readInt(bi);
if (sigLen > 0)
msg.signature = new String(ByteUtil.readBytes(bi, sigLen));
else
msg.signature = null;
msg.content = ByteUtil.readBytes(bi, b.length - 1 - 4 - 8 - 4 - sigLen);
return msg;
}
public static PBFTMessage parse(byte[] b) {
PBFTMessage msg = new PBFTMessage();
ByteArrayInputStream bi = new ByteArrayInputStream(b);
msg.type = PBFTType.fromByte(bi.read());
msg.sendID = ByteUtil.readInt(bi);
msg.order = ByteUtil.readLong(bi);
int sigLen = ByteUtil.readInt(bi);
if (sigLen > 0)
msg.signature = new String(ByteUtil.readBytes(bi, sigLen));
else
msg.signature = null;
msg.content = ByteUtil.readBytes(bi, b.length - 1 - 4 - 8 - 4 - sigLen);
return msg;
}
private static String IDA = "PBFTMsg";
private static String IDA = "PBFTMsg";
// public void sign(SM2KeyPair pair) {
// sendID = Arrays.hashCode(pair.getPublicKey().getEncoded(false));
// signature = sm02.sign(content, IDA, pair).toString();
// }
//
// public boolean verify(PBFTMember member) {
// return sm02.verify(content, Signature.loadFromString(signature), IDA,
// SM2KeyPair.publicKeyStr2ECPoint(member.pubKey));
// }
// public void sign(SM2KeyPair pair) {
// sendID = Arrays.hashCode(pair.getPublicKey().getEncoded(false));
// signature = sm02.sign(content, IDA, pair).toString();
// }
//
// public boolean verify(PBFTMember member) {
// return sm02.verify(content, Signature.loadFromString(signature), IDA,
// SM2KeyPair.publicKeyStr2ECPoint(member.pubKey));
// }
public void setType(PBFTType type) {
this.type = type;
}
public void setType(PBFTType type) {
this.type = type;
}
public void setContent(byte[] content) {
this.content = content;
}
public void setContent(byte[] content) {
this.content = content;
}
public String getDisplayStr() {
StringBuilder sb = new StringBuilder();
sb.append(sendID);
sb.append("_");
sb.append(order);
sb.append("_TYPE_").append(type);
sb.append(" --> sig:");
sb.append(signature);
sb.append(" content:");
sb.append(new String(content));
return sb.toString();
}
public String getDisplayStr() {
StringBuilder sb = new StringBuilder();
sb.append(sendID);
sb.append("_");
sb.append(order);
sb.append("_TYPE_").append(type);
sb.append(" --> sig:");
sb.append(signature);
sb.append(" content:");
sb.append(new String(content));
return sb.toString();
}
public void setOrder(long order) {
this.order = order;
}
public void setOrder(long order) {
this.order = order;
}
}

View File

@ -1,35 +1,36 @@
package org.bdware.sc.consistency.pbft;
public enum PBFTType {
Request(0), PrePrepare(1), Prepare(2), Commit(3), Reply(4), Unknown(5), ReSend(6), AddMember(7);
private int type;
Request(0), PrePrepare(1), Prepare(2), Commit(3), Reply(4), Unknown(5), ReSend(6), AddMember(7);
PBFTType(int i) {
type = i;
}
private int type;
public static PBFTType fromByte(int i) {
switch (i) {
case 0:
return Request;
case 1:
return PrePrepare;
case 2:
return Prepare;
case 3:
return Commit;
case 4:
return Reply;
case 6:
return ReSend;
case 7:
return AddMember;
default:
return Unknown;
}
}
PBFTType(int i) {
type = i;
}
public int toInt() {
return type;
}
public static PBFTType fromByte(int i) {
switch (i) {
case 0:
return Request;
case 1:
return PrePrepare;
case 2:
return Prepare;
case 3:
return Commit;
case 4:
return Reply;
case 6:
return ReSend;
case 7:
return AddMember;
default:
return Unknown;
}
}
public int toInt() {
return type;
}
}

View File

@ -4,8 +4,6 @@ import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.Node;
import org.bdware.sc.consistency.CommitAlgorithm;
import org.bdware.sc.consistency.Committer;
import org.bdware.sc.consistency.pbft.PBFTMessage;
import org.bdware.sc.consistency.pbft.PBFTType;
import org.bdware.sc.units.TrustfulExecutorConnection;
public class ViewAlgorithm implements CommitAlgorithm {
@ -13,8 +11,7 @@ public class ViewAlgorithm implements CommitAlgorithm {
private Committer committer;
private TrustfulExecutorConnection connection;
public ViewAlgorithm(boolean isMaster) {
}
public ViewAlgorithm(boolean isMaster) {}
public void setCommitter(Committer c) {
committer = c;

View File

@ -34,7 +34,7 @@ public class EventBroker {
private final Map<String, IEventConsumer> id2Consumers;
private final Map<String, ThreadFlag> threadFlags;
private final Map<String, Long> tempTopics;
// private final Map<String, Stack<REvent>> client2Events;
// private final Map<String, Stack<REvent>> client2Events;
public EventBroker() {
center = new EventCenter();
@ -49,43 +49,32 @@ public class EventBroker {
tempTopics = recorder.recoverTempTopicsFromDb();
// regularly check temporary topics and clean them
ContractManager.scheduledThreadPool.scheduleWithFixedDelay(
() -> {
long current = System.currentTimeMillis();
int oldSize = tempTopics.size();
tempTopics.keySet().forEach(topic -> {
if (tempTopics.get(topic) + EXPIRED_TIME > current) {
String reqID =
ContractManager.instance.nodeCenterConn.getNodeId() +
"_" + System.currentTimeMillis();
REvent cleanEvent =
new REvent(
topic,
UNSUBSCRIBE,
null,
reqID);
cleanEvent.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair());
handle(cleanEvent);
tempTopics.remove(topic);
}
});
if (oldSize != tempTopics.size()) {
recorder.saveTempTopics(tempTopics);
}
},
0L,
EXPIRED_TIME,
TimeUnit.MILLISECONDS);
ContractManager.scheduledThreadPool.scheduleWithFixedDelay(() -> {
long current = System.currentTimeMillis();
int oldSize = tempTopics.size();
tempTopics.keySet().forEach(topic -> {
if (tempTopics.get(topic) + EXPIRED_TIME > current) {
String reqID = ContractManager.instance.nodeCenterConn.getNodeId() + "_"
+ System.currentTimeMillis();
REvent cleanEvent = new REvent(topic, UNSUBSCRIBE, null, reqID);
cleanEvent
.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair());
handle(cleanEvent);
tempTopics.remove(topic);
}
});
if (oldSize != tempTopics.size()) {
recorder.saveTempTopics(tempTopics);
}
}, 0L, EXPIRED_TIME, TimeUnit.MILLISECONDS);
// regularly create check point in database
ContractManager.scheduledThreadPool.scheduleAtFixedRate(
() -> recorder.createCheckPoint(topic2cIds, id2Consumers),
EXPIRED_TIME,
EXPIRED_TIME,
TimeUnit.MILLISECONDS);
() -> recorder.createCheckPoint(topic2cIds, id2Consumers), EXPIRED_TIME,
EXPIRED_TIME, TimeUnit.MILLISECONDS);
NodeConsumer.setCenter(center);
// client2Events = new HashMap<>();
// client2Events = new HashMap<>();
LOGGER.info("Event Broker starts!");
}
@ -106,7 +95,8 @@ public class EventBroker {
IEventConsumer consumer = doSubscribe(event);
// save & try to sub in center
recorder.appendEvent(event);
center.subInCenter(event.getTopic(), event.getSemantics(), event.getCenter(), consumer, event);
center.subInCenter(event.getTopic(), event.getSemantics(), event.getCenter(),
consumer, event);
}
break;
case UNSUBSCRIBE:
@ -116,9 +106,10 @@ public class EventBroker {
case PUBLISH:
case PREPUB:
case PRESUB:
LOGGER.info(String.format("Receive %s event from topic %s", event.getSemantics(), topic));
LOGGER.debug(String.format("Receive %s event %s: %s",
event.getSemantics(), topic, event.getContent()));
LOGGER.info(String.format("Receive %s event from topic %s", event.getSemantics(),
topic));
LOGGER.debug(String.format("Receive %s event %s: %s", event.getSemantics(), topic,
event.getContent()));
if (event.isForward()) {
// send event to the event center
event.setForward(center.deliverEvent(topic, event));
@ -153,17 +144,14 @@ public class EventBroker {
/**
* do subscribing in registry
*
* @param topic event topic
* @param consumer the consumer
* @param topic2cIds topic registry of broker or a check point in event recorder
* @param topic event topic
* @param consumer the consumer
* @param topic2cIds topic registry of broker or a check point in event recorder
* @param id2Consumers consumer registry of broker or a check point in event recorder
* @return if the subscribing succeeds
*/
public boolean subInReg(
String topic,
IEventConsumer consumer,
Map<String, Set<String>> topic2cIds,
Map<String, IEventConsumer> id2Consumers) {
public boolean subInReg(String topic, IEventConsumer consumer,
Map<String, Set<String>> topic2cIds, Map<String, IEventConsumer> id2Consumers) {
if (null == consumer) {
return false;
}
@ -177,8 +165,8 @@ public class EventBroker {
topic2cIds.get(topic).add(cId);
switch (consumer.getType()) {
case Contract:
LOGGER.info("contract " + ((ContractConsumer) consumer).getContract() +
" subscribes topic " + topic);
LOGGER.info("contract " + ((ContractConsumer) consumer).getContract()
+ " subscribes topic " + topic);
break;
case Node:
LOGGER.info("node " + consumer.getId() + " subscribes topic " + topic);
@ -199,24 +187,22 @@ public class EventBroker {
* do subscribing in registry<br/>
* topic and consumer must not be null at the same time
* <ul>
* <li>if consumer is null and topic is not, it means the topic is a temporary topic, remove it</li>
* <li>if topic is null and consumer is not,
* it means a consumer or a contract wants to unsubscribe all topics,
* remove all related consumers in topic registry and consumer registry</li>
* <li>if two of them is not null, do unsubscribing in two registries</li>
* <li>if consumer is null and topic is not, it means the topic is a temporary topic, remove
* it</li>
* <li>if topic is null and consumer is not, it means a consumer or a contract wants to
* unsubscribe all topics, remove all related consumers in topic registry and consumer
* registry</li>
* <li>if two of them is not null, do unsubscribing in two registries</li>
* </ul>
*
* @param topic event topic
* @param consumer the consumer, just id is required
* @param topic2cIds topic registry of broker or a check point in event recorder
* @param topic event topic
* @param consumer the consumer, just id is required
* @param topic2cIds topic registry of broker or a check point in event recorder
* @param id2Consumers consumer registry of broker or a check point in event recorder
* @return if the subscribing succeeds
*/
public boolean unsubInReg(
String topic,
IEventConsumer consumer,
Map<String, Set<String>> topic2cIds,
Map<String, IEventConsumer> id2Consumers) {
public boolean unsubInReg(String topic, IEventConsumer consumer,
Map<String, Set<String>> topic2cIds, Map<String, IEventConsumer> id2Consumers) {
if (null == topic && null == consumer) {
return false;
}
@ -236,7 +222,8 @@ public class EventBroker {
// if cId belongs to a contract, find all related consumers
toRmIds = new ArrayList<>();
id2Consumers.forEach((k, c) -> {
if (c instanceof ContractConsumer && ((ContractConsumer) c).getContract().equals(cId)) {
if (c instanceof ContractConsumer
&& ((ContractConsumer) c).getContract().equals(cId)) {
toRmIds.add(k);
}
});
@ -259,7 +246,8 @@ public class EventBroker {
/**
* parse consumer information from content str<br/>
* if caller wants to select all consumers of a contract, the content str is also parsed into a node consumer
* if caller wants to select all consumers of a contract, the content str is also parsed into a
* node consumer
*
* @param content json string, {"subscriber": "[subscriber]", "handler?": "[handler]"}
* @return a node consumer or contract consumer, or null if exception is thrown
@ -305,19 +293,16 @@ public class EventBroker {
case AT_LEAST_ONCE:
case NEED_RETRY:
// send events to all
topicConsumers.forEach(cId ->
deliverEvent(event, cEvent, nEventStr, cId, topic, true));
topicConsumers
.forEach(cId -> deliverEvent(event, cEvent, nEventStr, cId, topic, true));
break;
case AT_MOST_ONCE:
// send event to a random consumer
// AT_MOST_ONCE, so broker don't need to do anything when delivering fails
deliverEvent(
event,
cEvent,
nEventStr,
topicConsumers.toArray()[(int) (Math.random() * topicConsumers.size())].toString(),
topic,
true);
deliverEvent(event, cEvent, nEventStr,
topicConsumers.toArray()[(int) (Math.random() * topicConsumers.size())]
.toString(),
topic, true);
break;
case ONLY_ONCE:
switch (event.getType()) {
@ -343,20 +328,14 @@ public class EventBroker {
// send PREPUB events to all consumers
// TODO if there are no consumers to receive the ONLY_ONCE events?
ContractManager.threadPool.execute(() -> {
REvent prePubMsg = new REvent(event.getTopic(),
PREPUB,
contentHash,
REvent prePubMsg = new REvent(event.getTopic(), PREPUB, contentHash,
event.getRequestID());
prePubMsg.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair());
topicConsumers.forEach(cId ->
deliverEvent(prePubMsg,
new Event(event.getTopic(),
contentHash,
prePubMsg.getSemantics()),
JsonUtil.toJson(prePubMsg),
cId,
topic,
false));
prePubMsg.doSignature(
ContractManager.instance.nodeCenterConn.getNodeKeyPair());
topicConsumers.forEach(cId -> deliverEvent(prePubMsg,
new Event(event.getTopic(), contentHash,
prePubMsg.getSemantics()),
JsonUtil.toJson(prePubMsg), cId, topic, false));
// wait for responses from contracts (PRESUB events)
while (true) {
try {
@ -364,21 +343,21 @@ public class EventBroker {
flag.wait(30 * 1000L);
}
if (!flag.get().isEmpty()) {
REvent finalMsg = new REvent(flag.get(),
PUBLISH,
event.getContent(),
HashUtil.sha3(
contentHash + System.currentTimeMillis()));
REvent finalMsg = new REvent(flag.get(), PUBLISH,
event.getContent(), HashUtil.sha3(contentHash
+ System.currentTimeMillis()));
// if the delivering fails, retry publishing
finalMsg.setSemantics(NEED_RETRY);
finalMsg.doSignature(
ContractManager.instance.nodeCenterConn.getNodeKeyPair());
ContractManager.instance.nodeCenterConn
.getNodeKeyPair());
handle(finalMsg);
break;
}
} catch (InterruptedException e) {
LOGGER.warn("ONLY_ONE event delivering is interrupted: " + e.getMessage());
// e.printStackTrace();
LOGGER.warn("ONLY_ONE event delivering is interrupted: "
+ e.getMessage());
// e.printStackTrace();
}
}
});
@ -395,20 +374,16 @@ public class EventBroker {
/**
* publish the event to the consumer
*
* @param event event message
* @param cEvent simple event message to the contract consumer, only topic, content and semantics
* @param event event message
* @param cEvent simple event message to the contract consumer, only topic, content and
* semantics
* @param nEventStr event message to the node
* @param cId consumer id
* @param topic topic of the event
* @param isPub if the event is published or pre-published
* @param cId consumer id
* @param topic topic of the event
* @param isPub if the event is published or pre-published
*/
private void deliverEvent(
REvent event,
Event cEvent,
String nEventStr,
String cId,
String topic,
boolean isPub) {
private void deliverEvent(REvent event, Event cEvent, String nEventStr, String cId,
String topic, boolean isPub) {
if (id2Consumers.containsKey(cId)) {
IEventConsumer consumer = id2Consumers.get(cId);
if (consumer instanceof ContractConsumer) {
@ -419,31 +394,24 @@ public class EventBroker {
public void onResult(String unused) {
// if the delivering fails, unsubscribe the consumer
ContractConsumer c = (ContractConsumer) consumer;
ContractClient client = ContractManager.instance.getClient(c.getContract());
String reqID =
ContractManager.instance.nodeCenterConn.getNodeKeyPair().getPublicKeyStr() +
"_" + System.currentTimeMillis();
REvent unsubEvent =
new REvent(
topic,
UNSUBSCRIBE,
"{\"subscriber\":\"" + cId + "\"}",
reqID);
ContractClient client =
ContractManager.instance.getClient(c.getContract());
String reqID = ContractManager.instance.nodeCenterConn.getNodeKeyPair()
.getPublicKeyStr() + "_" + System.currentTimeMillis();
REvent unsubEvent = new REvent(topic, UNSUBSCRIBE,
"{\"subscriber\":\"" + cId + "\"}", reqID);
unsubEvent.doSignature(client.getPubkey(), client.getContractKey());
handle(unsubEvent);
// if the event is an ONLY_ONCE event, retry publishing
if (NEED_RETRY.equals(event.getSemantics())) {
REvent newMsg =
new REvent(
topic.split("\\|")[0],
PUBLISH,
event.getContent(),
event.getRequestID());
REvent newMsg = new REvent(topic.split("\\|")[0], PUBLISH,
event.getContent(), event.getRequestID());
newMsg.setSemantics(ONLY_ONCE);
newMsg.setHash(event.getHash());
newMsg.setTxHash(event.getTxHash());
newMsg.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair());
newMsg.doSignature(
ContractManager.instance.nodeCenterConn.getNodeKeyPair());
handle(newMsg);
}
}
@ -465,8 +433,8 @@ public class EventBroker {
});
} else if (isPub) {
// client consumer
ContractManager.threadPool.execute(() ->
consumer.publishEvent(nEventStr, new ResultCallback() {
ContractManager.threadPool
.execute(() -> consumer.publishEvent(nEventStr, new ResultCallback() {
@Override
public void onResult(String unused) {
unsubInReg(null, consumer, topic2cIds, id2Consumers);

View File

@ -12,13 +12,13 @@ import static org.bdware.sc.ContractManager.instance;
* @author Kaidong Wu
*/
public class EventCenter {
// private static final Logger LOGGER = LogManager.getLogger(EventCenter.class);
// private static final Logger LOGGER = LogManager.getLogger(EventCenter.class);
/**
* get the nearest node to the topic in the hash function range
*
* @param topic the topic
* @param k the number of centers
* @param k the number of centers
* @return id of the node
*/
public String[] getCenterByTopic(String topic, int k) {
@ -35,21 +35,19 @@ public class EventCenter {
/**
* subscribe a topic in center
*
* @param topic event topic
* @param topic event topic
* @param semantics event semantics, used to mark PRESUB events
* @param center id of event center if the subscribing has been handled
* @param consumer consumer
* @param event original event
* @param center id of event center if the subscribing has been handled
* @param consumer consumer
* @param event original event
*/
public void subInCenter(String topic, REvent.REventSemantics semantics, String center, IEventConsumer consumer, REvent event) {
public void subInCenter(String topic, REvent.REventSemantics semantics, String center,
IEventConsumer consumer, REvent event) {
if (null == instance.nodeCenterConn) {
return;
}
REvent msg = new REvent(topic,
REvent.REventType.SUBSCRIBE,
String.format("{\"subscriber\":\"%s\"}",
instance.nodeCenterConn.getNodeId()),
"");
REvent msg = new REvent(topic, REvent.REventType.SUBSCRIBE,
String.format("{\"subscriber\":\"%s\"}", instance.nodeCenterConn.getNodeId()), "");
msg.setSemantics(semantics);
msg.doSignature(instance.nodeCenterConn.getNodeKeyPair());
msg.setCenter(center);
@ -96,7 +94,7 @@ public class EventCenter {
* publish an event to another node; used by NodeConsumer
*
* @param target id of the target node
* @param eStr event string
* @param eStr event string
*/
public void publishEvent(String target, String eStr) {
if (null != instance.masterStub) {

View File

@ -53,7 +53,7 @@ public class EventRecorder {
}
public void createCheckPoint(Map<String, Set<String>> topic2cIds,
Map<String, IEventConsumer> id2Consumers) {
Map<String, IEventConsumer> id2Consumers) {
CheckPoint cp = new CheckPoint(topic2cIds, id2Consumers);
synchronized (latestEvent) {
if (!latestEvent.isCp()) {
@ -79,9 +79,10 @@ public class EventRecorder {
String key = KeyValueRocksDBUtil.instance.getValue(dbName, LATEST_EVENT_KEY);
latestEvent.set(key);
CheckPoint cp = new CheckPoint();
Type topic2cIdsType = TypeToken.getParameterized(ConcurrentHashMap.class, String.class,
TypeToken.getParameterized(Set.class, String.class,
String.class).getType()).getType();
Type topic2cIdsType = TypeToken
.getParameterized(ConcurrentHashMap.class, String.class,
TypeToken.getParameterized(Set.class, String.class, String.class).getType())
.getType();
// retrieving transactions from database
while (null != key && !key.isEmpty()) {
String json = KeyValueRocksDBUtil.instance.getValue(dbName, key);
@ -93,14 +94,16 @@ public class EventRecorder {
if (json.startsWith("cp")) {
// create check point by the transaction and stop retrieving
JsonObject data = JsonUtil.parseStringAsJsonObject(json.substring(2));
cp.topic2cIds = JsonUtil.fromJson(data.get("topic2cIds").toString(), topic2cIdsType);
cp.topic2cIds =
JsonUtil.fromJson(data.get("topic2cIds").toString(), topic2cIdsType);
JsonObject id2Consumers = data.getAsJsonObject("id2Consumers");
for (String k : id2Consumers.keySet()) {
JsonObject consumer = id2Consumers.getAsJsonObject(k);
if (!consumer.has("type")) {
continue;
}
ConsumerType type = ConsumerType.valueOf(consumer.get("type").getAsString());
ConsumerType type =
ConsumerType.valueOf(consumer.get("type").getAsString());
switch (type) {
case Contract:
cp.id2Consumers.put(k,
@ -145,7 +148,8 @@ public class EventRecorder {
// if empty, return the check point
latestEvent.setCp(true);
} else {
// on the base of old check point, process following sub or unsub events to recover registry
// on the base of old check point, process following sub or unsub events to recover
// registry
while (!stack.empty()) {
Object record = stack.pop();
if (record instanceof CheckPoint) {
@ -155,15 +159,19 @@ public class EventRecorder {
IEventConsumer consumer = broker.parseConsumer(tran.content);
switch (tran.type) {
case SUBSCRIBE:
if (!broker.subInReg(tran.topic, consumer, cp.topic2cIds, cp.id2Consumers)) {
if (!broker.subInReg(tran.topic, consumer, cp.topic2cIds,
cp.id2Consumers)) {
LOGGER.warn("record damaged! " + key);
LOGGER.debug("record damaged! " + key + ": " + JsonUtil.toJson(tran));
LOGGER.debug(
"record damaged! " + key + ": " + JsonUtil.toJson(tran));
}
break;
case UNSUBSCRIBE:
if (!broker.unsubInReg(tran.topic, consumer, cp.topic2cIds, cp.id2Consumers)) {
if (!broker.unsubInReg(tran.topic, consumer, cp.topic2cIds,
cp.id2Consumers)) {
LOGGER.warn("record damaged! " + key);
LOGGER.debug("record damaged! " + key + ": " + JsonUtil.toJson(tran));
LOGGER.debug(
"record damaged! " + key + ": " + JsonUtil.toJson(tran));
}
default:
break;
@ -225,7 +233,7 @@ public class EventRecorder {
}
public CheckPoint(Map<String, Set<String>> topic2cIds,
Map<String, IEventConsumer> id2Consumers) {
Map<String, IEventConsumer> id2Consumers) {
this.topic2cIds = topic2cIds;
this.id2Consumers = id2Consumers;
}

View File

@ -81,7 +81,7 @@ public class ContractConsumer implements IEventConsumer {
return;
}
AtomicInteger callCount = new AtomicInteger(0);
LOGGER.info("ContractConsumer!"+new Gson().toJson(cr));
LOGGER.info("ContractConsumer!" + new Gson().toJson(cr));
// TODO sending requests at a high frequency maybe cause that some requests are ignored
ScheduledFuture<?> future = ContractManager.scheduledThreadPool.scheduleAtFixedRate(
() -> ContractManager.instance.executeContractInternal(cr, new ResultCallback() {
@ -91,19 +91,20 @@ public class ContractConsumer implements IEventConsumer {
try {
ContractResult result = JsonUtil.fromJson(str, ContractResult.class);
if (!result.status.equals(ContractResult.Status.Success)) {
if (callCount.get() == TIMEOUT_COUNT ||
(result.status == ContractResult.Status.Exception &&
result.result.toString().contains("not exported"))) {
LOGGER.error(String.format(
"receiving event error! %s.%s: %s", contract, handler, str));
if (callCount.get() == TIMEOUT_COUNT
|| (result.status == ContractResult.Status.Exception
&& result.result.toString()
.contains("not exported"))) {
LOGGER.error(String.format("receiving event error! %s.%s: %s",
contract, handler, str));
rc.onResult((String) null);
} else {
ret = false;
}
}
} catch (Exception e) {
LOGGER.error(String.format(
"receiving event error! %s.%s: %s", contract, handler, e.getMessage()));
LOGGER.error(String.format("receiving event error! %s.%s: %s", contract,
handler, e.getMessage()));
rc.onResult((String) null);
}
callCount.incrementAndGet();
@ -121,10 +122,7 @@ public class ContractConsumer implements IEventConsumer {
}
}
}, (reqID, hashStr) -> {
}),
500L,
2500L,
TimeUnit.MILLISECONDS);
}), 500L, 2500L, TimeUnit.MILLISECONDS);
scheduledFutures.put(getId(), future);
synchronized (flag) {
flag.notify();

View File

@ -1,7 +1,6 @@
package org.bdware.sc.event.clients;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.event.Event;
public interface IEventConsumer {
String getId();
@ -13,8 +12,6 @@ public interface IEventConsumer {
void competeSub(Object msg, ResultCallback rc, String... options);
enum ConsumerType {
Contract,
Node,
WSClient
Contract, Node, WSClient
}
}

View File

@ -47,6 +47,5 @@ public class WSClientConsumer implements IEventConsumer {
}
@Override
public void competeSub(Object msg, ResultCallback rc, String... options) {
}
public void competeSub(Object msg, ResultCallback rc, String... options) {}
}

View File

@ -34,8 +34,7 @@ public class ManagerHandler extends MsgHandler {
cb.onResult(cm.getTimesOfExecution(msg.arg));
}
@Description(
value = "execute Contract, {\"contractID\":\"112233\",\"arg\":\"\"}",
@Description(value = "execute Contract, {\"contractID\":\"112233\",\"arg\":\"\"}",
isAsync = true)
public void executeContract(GetMessage msg, ResultCallback cb) {
ContractRequest c;

View File

@ -4,81 +4,51 @@ package org.bdware.sc.units;
import java.io.*;
/*
* 节点启动后记录自己的units信息和合约的一些信息
* 节点重新上线后NC发要求让其恢复则从本地读取自己的这些信息进行恢复
* 节点启动后记录自己的units信息和合约的一些信息 节点重新上线后NC发要求让其恢复则从本地读取自己的这些信息进行恢复
*/
public class ContractRecord implements Serializable {
/* private static final long serialVersionUID = 704775241674568688L;
public transient RecoverFlag recoverFlag = RecoverFlag.Fine;
public String contractID;
public String contractName;
public String key;
public String contractpubKey;
public ContractType type;
public int copies = 1;
public int lastExeSeq = -1;
public boolean isPrivate = false;
public String pubKeyPath;
public String ypkName;
//public Map<String,String> members; //k-udpid,v-udpaddress
public String memory = "";
//JavaScriptEntry
public String invokeID;
public ContractRecord(String id){
this.contractID = id;
}
public ContractRecord(String id,String contractName,String key,String contractpubKey,ContractType type,int copies){
this.contractID = id;
this.contractName = contractName;
this.key = key;
this.contractpubKey = contractpubKey;
this.type = type;
this.copies = copies;
}
public void printContent(){
System.out.println("==========ContractRecord========");
System.out.println("contractID=" + contractID == null ? "null" : contractID);
System.out.println("contractName=" + contractName == null ? "null" : contractName);
System.out.println("key=" + key == null ? "null" : key);
System.out.println("contractPubKey=" + contractpubKey == null ? "null" : contractpubKey);
System.out.println("type=" + type == null ? "null" : type);
System.out.println("lastExeSeq=" + lastExeSeq == null ? "null" : lastExeSeq);
System.out.println("invokeID=" + invokeID == null ? "null" : invokeID);
System.out.println("copies=" + copies);
System.out.println("isPrivate=" + isPrivate);
System.out.println("pubKeyPath=" + pubKeyPath == null ? "null" : pubKeyPath);
System.out.println("ypkName=" + ypkName == null ? "null" : ypkName);
if(members != null){
for(String k : members.keySet()){
System.out.println("members " + k + "-" + members.get(k));
}
}
System.out.println("memory=" + memory == null ? "null" : memory);
System.out.println("==========ContractRecord print finish=======");
}
public static void getContentFromFile(String path){
File file = new File(path);
try{
FileInputStream os = new FileInputStream(file);
ObjectInputStream oos = new ObjectInputStream(os);
ContractRecord record = (ContractRecord) oos.readObject();
record.printContent();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}*/
/*
* private static final long serialVersionUID = 704775241674568688L;
*
* public transient RecoverFlag recoverFlag = RecoverFlag.Fine;
*
* public String contractID; public String contractName; public String key; public String
* contractpubKey; public ContractType type; public int copies = 1; public int lastExeSeq = -1;
* public boolean isPrivate = false; public String pubKeyPath; public String ypkName;
*
* //public Map<String,String> members; //k-udpid,v-udpaddress
*
* public String memory = "";
*
* //JavaScriptEntry public String invokeID;
*
* public ContractRecord(String id){ this.contractID = id; }
*
* public ContractRecord(String id,String contractName,String key,String
* contractpubKey,ContractType type,int copies){ this.contractID = id; this.contractName =
* contractName; this.key = key; this.contractpubKey = contractpubKey; this.type = type;
* this.copies = copies; }
*
* public void printContent(){ System.out.println("==========ContractRecord========");
*
* System.out.println("contractID=" + contractID == null ? "null" : contractID);
* System.out.println("contractName=" + contractName == null ? "null" : contractName);
* System.out.println("key=" + key == null ? "null" : key); System.out.println("contractPubKey="
* + contractpubKey == null ? "null" : contractpubKey); System.out.println("type=" + type ==
* null ? "null" : type); System.out.println("lastExeSeq=" + lastExeSeq == null ? "null" :
* lastExeSeq); System.out.println("invokeID=" + invokeID == null ? "null" : invokeID);
* System.out.println("copies=" + copies); System.out.println("isPrivate=" + isPrivate);
* System.out.println("pubKeyPath=" + pubKeyPath == null ? "null" : pubKeyPath);
* System.out.println("ypkName=" + ypkName == null ? "null" : ypkName); if(members != null){
* for(String k : members.keySet()){ System.out.println("members " + k + "-" + members.get(k));
* } } System.out.println("memory=" + memory == null ? "null" : memory);
* System.out.println("==========ContractRecord print finish======="); }
*
* public static void getContentFromFile(String path){ File file = new File(path); try{
* FileInputStream os = new FileInputStream(file); ObjectInputStream oos = new
* ObjectInputStream(os); ContractRecord record = (ContractRecord) oos.readObject();
* record.printContent(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch
* (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) {
* e.printStackTrace(); } }
*/
}

View File

@ -12,13 +12,11 @@ import java.util.*;
public class ContractUnitController {
private static final Logger LOGGER = LogManager.getLogger(ContractUnitController.class);
private final TrustfulExecutorConnection connection;
private final Map<String, ContractUnit> units; //这个节点上<合约id合约集群>其中每个ContractUnit中存储了对应合约id的其他节点udp信息
private final Map<String, ContractUnit> units; // 这个节点上<合约id合约集群>其中每个ContractUnit中存储了对应合约id的其他节点udp信息
ContractManager manager;
SequencingAlgorithmFactory algorithmFactory;
public ContractUnitController(
TrustfulExecutorConnection connection,
ContractManager manager,
public ContractUnitController(TrustfulExecutorConnection connection, ContractManager manager,
SequencingAlgorithmFactory factory) {
this.connection = connection;
this.manager = manager;
@ -39,18 +37,13 @@ public class ContractUnitController {
unit.node2member = new HashMap<>();
units.put(req.contract.getID(), unit);
System.out.println(
"[ContractUnitController] startContract:"
+ result
+ " isMaster:"
+ req.isMaster
+ " cid:"
+ req.contract.getID());
System.out.println("[ContractUnitController] startContract:" + result + " isMaster:"
+ req.isMaster + " cid:" + req.contract.getID());
break;
case AddMember:
LOGGER.debug("contractID:" + cumsg.getContractID());
unit = units.get(cumsg.getContractID());
unit.addMember(node, cumsg); //启动通过在UDPTrustExecutor中遍历memebers使得这个合约集群中每个节点都有其他节点的UDP信息便于之后护发UDP消息
unit.addMember(node, cumsg); // 启动通过在UDPTrustExecutor中遍历memebers使得这个合约集群中每个节点都有其他节点的UDP信息便于之后护发UDP消息
break;
case Sequencing:
unit = units.get(cumsg.getContractID());
@ -59,7 +52,7 @@ public class ContractUnitController {
break;
case Unknown:
default:
//recover start
// recover start
ContractUnitStartRequest req2 = ContractUnitStartRequest.parse(cumsg.content);
unit = new ContractUnit();
unit.connection = connection;
@ -68,12 +61,8 @@ public class ContractUnitController {
unit.node2member = new HashMap<>();
units.put(req2.contract.getID(), unit);
System.out.println(
"[ContractUnitController] startContract:"
+ " isMaster:"
+ req2.isMaster
+ " cid:"
+ req2.contract.getID());
System.out.println("[ContractUnitController] startContract:" + " isMaster:"
+ req2.isMaster + " cid:" + req2.contract.getID());
break;
}
}
@ -84,7 +73,7 @@ public class ContractUnitController {
public static class ContractUnit implements TrustfulExecutorConnection, Serializable {
public String contractID;
public Map<Node, ContractUnitMember> node2member; //存储和自己在一个合约集群的其他节点信息
public Map<Node, ContractUnitMember> node2member; // 存储和自己在一个合约集群的其他节点信息
private transient CommitAlgorithm commitAlgorithm;
private transient TrustfulExecutorConnection connection;
@ -148,16 +137,9 @@ public class ContractUnitController {
}
public String getDisplayStr() {
return "pSize:"
+ (prepare == null ? "null" : prepare.size())
+ " cSize:"
+ (commit == null ? "null" : commit.size())
+ " isSendCommit:"
+ isSendCommit
+ " isSendReply:"
+ isSendReply
+ " buffSize:"
+ buff.size();
return "pSize:" + (prepare == null ? "null" : prepare.size()) + " cSize:"
+ (commit == null ? "null" : commit.size()) + " isSendCommit:" + isSendCommit
+ " isSendReply:" + isSendReply + " buffSize:" + buff.size();
}
}
}

View File

@ -4,15 +4,15 @@ import org.bdware.sc.conn.Node;
public class ContractUnitMember extends Node {
private static final long serialVersionUID = 8175664649689078937L;
public Node node;
public String pubKey;
private static final long serialVersionUID = 8175664649689078937L;
public Node node;
public String pubKey;
public ContractUnitMember(Node node) {
this.node = node;
}
public ContractUnitMember(Node node) {
this.node = node;
}
public Node getNode() {
return node;
}
public Node getNode() {
return node;
}
}

View File

@ -12,98 +12,99 @@ import org.zz.gmhelper.SM2Util;
public class ContractUnitMessage implements Serializable {
private static final long serialVersionUID = 584934384202845750L;
ContractUnitType type;
private String contractID;
String requestID;
String signature;
byte[] content;
private static final long serialVersionUID = 584934384202845750L;
ContractUnitType type;
private String contractID;
String requestID;
String signature;
byte[] content;
public ContractUnitMessage(String requestID, String contractID, byte[] sender) {
this.requestID = requestID;
this.setContractID(contractID);
}
public ContractUnitMessage(String requestID, String contractID, byte[] sender) {
this.requestID = requestID;
this.setContractID(contractID);
}
public ContractUnitMessage() {
// TODO Auto-generated constructor stub
}
public ContractUnitMessage() {
// TODO Auto-generated constructor stub
}
public static ContractUnitMessage parse(byte[] b) {
try {
ByteArrayInputStream bi = new ByteArrayInputStream(b);
ObjectInputStream input = new ObjectInputStream(bi);
ContractUnitMessage ret = (ContractUnitMessage) input.readObject();
return ret;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static ContractUnitMessage parse(byte[] b) {
try {
ByteArrayInputStream bi = new ByteArrayInputStream(b);
ObjectInputStream input = new ObjectInputStream(bi);
ContractUnitMessage ret = (ContractUnitMessage) input.readObject();
return ret;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public byte[] toByteArray() {
try {
ByteArrayOutputStream bo = new ByteArrayOutputStream();
ObjectOutputStream output = new ObjectOutputStream(bo);
output.writeObject(this);
return bo.toByteArray();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public byte[] toByteArray() {
try {
ByteArrayOutputStream bo = new ByteArrayOutputStream();
ObjectOutputStream output = new ObjectOutputStream(bo);
output.writeObject(this);
return bo.toByteArray();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
private static String IDA = "PBFTMsg";
private static String IDA = "PBFTMsg";
// public void sign(NodeInfo info) {
// sender = info.getNodeID();
// signature = sm02.sign(content, IDA, info.privKey).toString();
// }
// public void sign(NodeInfo info) {
// sender = info.getNodeID();
// signature = sm02.sign(content, IDA, info.privKey).toString();
// }
public boolean verify(ContractUnitMember member) {
ECPublicKeyParameters param = BCECUtil.createECPublicKeyFromStrParameters(member.pubKey,SM2Util.CURVE,SM2Util.DOMAIN_PARAMS);
public boolean verify(ContractUnitMember member) {
ECPublicKeyParameters param = BCECUtil.createECPublicKeyFromStrParameters(member.pubKey,
SM2Util.CURVE, SM2Util.DOMAIN_PARAMS);
return SM2Util.verify(param,content, signature.getBytes());
}
return SM2Util.verify(param, content, signature.getBytes());
}
public void setType(ContractUnitType type) {
this.type = type;
}
public void setType(ContractUnitType type) {
this.type = type;
}
public ContractUnitType getType() {
return type;
}
public ContractUnitType getType() {
return type;
}
public byte[] getContent() {
return content;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
public void setContent(byte[] content) {
this.content = content;
}
public String getContractID() {
return contractID;
}
public String getContractID() {
return contractID;
}
public String getRequestID() {
return requestID;
}
public String getRequestID() {
return requestID;
}
public String getDisplayStr() {
StringBuilder sb = new StringBuilder();
sb.append(requestID);
sb.append("_TYPE_").append(type);
sb.append("_");
sb.append(getContractID());
sb.append("_");
sb.append(" content:");
sb.append(new String(content));
return sb.toString();
}
public String getDisplayStr() {
StringBuilder sb = new StringBuilder();
sb.append(requestID);
sb.append("_TYPE_").append(type);
sb.append("_");
sb.append(getContractID());
sb.append("_");
sb.append(" content:");
sb.append(new String(content));
return sb.toString();
}
public void setContractID(String contractID) {
this.contractID = contractID;
}
public void setContractID(String contractID) {
this.contractID = contractID;
}
}

View File

@ -10,37 +10,37 @@ import org.bdware.sc.bean.Contract;
import org.bdware.sc.conn.Node;
public class ContractUnitStartRequest implements Serializable {
/**
*
*/
private static final long serialVersionUID = -1540780483790689627L;
public Contract contract;
public boolean isMaster;
public Node[] members;
/**
*
*/
private static final long serialVersionUID = -1540780483790689627L;
public Contract contract;
public boolean isMaster;
public Node[] members;
public static ContractUnitStartRequest parse(byte[] content) {
try {
ByteArrayInputStream input = new ByteArrayInputStream(content);
ObjectInputStream objInput;
objInput = new ObjectInputStream(input);
return (ContractUnitStartRequest) objInput.readObject();
public static ContractUnitStartRequest parse(byte[] content) {
try {
ByteArrayInputStream input = new ByteArrayInputStream(content);
ObjectInputStream objInput;
objInput = new ObjectInputStream(input);
return (ContractUnitStartRequest) objInput.readObject();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public byte[] toByteArray() {
try {
ByteArrayOutputStream bo = new ByteArrayOutputStream();
ObjectOutputStream objOutput = new ObjectOutputStream(bo);
objOutput.writeObject(this);
return bo.toByteArray();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public byte[] toByteArray() {
try {
ByteArrayOutputStream bo = new ByteArrayOutputStream();
ObjectOutputStream objOutput = new ObjectOutputStream(bo);
objOutput.writeObject(this);
return bo.toByteArray();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}

View File

@ -4,27 +4,28 @@ import java.io.Serializable;
public enum ContractUnitType implements Serializable {
Start(0), AddMember(1), Sequencing(2), Unknown(3);
private int type;
Start(0), AddMember(1), Sequencing(2), Unknown(3);
private ContractUnitType(int i) {
type = i;
}
private int type;
public static ContractUnitType fromByte(int i) {
switch (i) {
case 0:
return Start;
case 1:
return AddMember;
case 2:
return Sequencing;
default:
return Unknown;
}
}
private ContractUnitType(int i) {
type = i;
}
public int toInt() {
return type;
}
public static ContractUnitType fromByte(int i) {
switch (i) {
case 0:
return Start;
case 1:
return AddMember;
case 2:
return Sequencing;
default:
return Unknown;
}
}
public int toInt() {
return type;
}
}

View File

@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class MultiContractMeta implements IDSerializable {
private static final Logger LOGGER = LogManager.getLogger(MultiContractMeta.class);
private final AtomicInteger lastExeSeq; // last executed request seq
public volatile int curExeSeq = -1; // 当前正在执行请求序号 for multipoint contract requests
public volatile int curExeSeq = -1; // 当前正在执行请求序号 for multipoint contract requests
public String memory;
public String key; // privateKey
public String publicKey;
@ -101,10 +101,9 @@ public class MultiContractMeta implements IDSerializable {
public void setLastExeSeq(int lastExeSeq) {
this.lastExeSeq.set(lastExeSeq);
if (KeyValueDBUtil.instance.containsKey(
CMTables.LastExeSeq.toString(), contractID)) { // 如果现在是Stable模式就同步刷到磁盘
KeyValueDBUtil.instance.setValue(
CMTables.LastExeSeq.toString(), contractID, String.valueOf(lastExeSeq));
if (KeyValueDBUtil.instance.containsKey(CMTables.LastExeSeq.toString(), contractID)) { // 如果现在是Stable模式就同步刷到磁盘
KeyValueDBUtil.instance.setValue(CMTables.LastExeSeq.toString(), contractID,
String.valueOf(lastExeSeq));
}
}
@ -154,16 +153,19 @@ public class MultiContractMeta implements IDSerializable {
public String[] getMembers() {
return members;
}
public String joinMembers(String delimiter){
public String joinMembers(String delimiter) {
StringBuilder sb = new StringBuilder();
if (members.length > 0)
sb.append(members[0]);
else return "";
else
return "";
for (int i = 1; i < members.length; i++) {
sb.append(delimiter).append(members[i]);
}
return sb.toString();
}
public void setMembers(JsonArray members) {
String[] copied = new String[members.size()];
for (int i = 0; i < members.size(); i++) {

View File

@ -3,5 +3,5 @@ package org.bdware.sc.units;
import org.bdware.sc.conn.Node;
public class PubKeyNode extends Node {
public String pubkey;
public String pubkey;
}

View File

@ -1,5 +1,5 @@
package org.bdware.sc.units;
public enum RecoverFlag {
Fine,ToRecover,Recovering;
Fine, ToRecover, Recovering;
}

View File

@ -24,7 +24,8 @@ public class RespCache {
} else {
waiter.wait(5000L);
timeout &= waiter.get() * 2 > count;
if (!timeout) waiter.notifyAll();
if (!timeout)
waiter.notifyAll();
return timeout;
}
}

View File

@ -4,5 +4,5 @@ import org.bdware.sc.consistency.CommitAlgorithm;
import org.bdware.sc.units.ContractUnitController.ContractUnit;
public interface SequencingAlgorithmFactory {
public CommitAlgorithm create(ContractUnitStartRequest req, ContractUnit unit);
public CommitAlgorithm create(ContractUnitStartRequest req, ContractUnit unit);
}

View File

@ -25,9 +25,8 @@ public class DHTUtil {
String hash = HashUtil.sha3ToFixedLen(key, nodes[0].length() - 2);
int l = 0, r = nodes.length - 1, m,
h2l = hash.compareTo(nodes[l].substring(2)), r2h = nodes[r].substring(2).compareTo(hash),
h2m;
int l = 0, r = nodes.length - 1, m, h2l = hash.compareTo(nodes[l].substring(2)),
r2h = nodes[r].substring(2).compareTo(hash), h2m;
BigInteger bigH = null;
String selected;
do {
@ -42,8 +41,7 @@ public class DHTUtil {
break;
}
if (l + 1 == r) {
BigInteger bigL = getBigInteger(nodes[l]),
bigR = getBigInteger(nodes[r]);
BigInteger bigL = getBigInteger(nodes[l]), bigR = getBigInteger(nodes[r]);
bigH = new BigInteger(hash, 16);
if (bigR.subtract(bigH).compareTo(bigH.subtract(bigL)) > -1) {
selected = nodes[l];
@ -65,7 +63,7 @@ public class DHTUtil {
}
} while (true);
if (k == 1) {
return new String[]{selected};
return new String[] {selected};
}
List<String> ret = new ArrayList<>();
ret.add(selected);

View File

@ -10,15 +10,11 @@ import java.util.Map;
public interface ContractExecutor {
void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb);
default void onRecover(Map<String, Object> args) {
}
default void onRecover(Map<String, Object> args) {}
default void onDeliverBlock(String data) {
}
default void onDeliverBlock(String data) {}
default void close() {
}
default void close() {}
default void onSyncMessage(Node node, byte[] data) {
}
default void onSyncMessage(Node node, byte[] data) {}
}

View File

@ -3,6 +3,5 @@ package org.bdware.server.trustedmodel;
import java.io.Serializable;
public enum ContractUnitStatus implements Serializable {
CommonMode,
StableMode;
CommonMode, StableMode;
}

View File

@ -4,7 +4,7 @@ public class MultiReqSeq {
public final int seq;
public final long startTime;
public MultiReqSeq(int s){
public MultiReqSeq(int s) {
seq = s;
startTime = System.currentTimeMillis();
}

View File

@ -14,7 +14,8 @@ public class SingleNodeExecutor implements ContractExecutor {
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) {
public void execute(String requestID, ContractRequest req, ResultCallback rcb,
OnHashCallback hcb) {
cm.executeLocallyAsync(req, rcb, hcb);
}
}

View File

@ -1,11 +1,7 @@
package org.bdware.sc.test;
import org.bdware.sc.ContractManager;
import org.bdware.sc.bean.Contract;
import org.junit.Test;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
public class ContractManagerTest {
public static void main(String[] args) {

View File

@ -13,8 +13,8 @@ public class ReflectionTest {
@org.junit.Test
public void go() throws Exception {
Method m = Test.class.getDeclaredMethod("go",String[].class);
String[] abc = new String[]{"ab", "cd"};
m.invoke(null, new Object[]{abc});
Method m = Test.class.getDeclaredMethod("go", String[].class);
String[] abc = new String[] {"ab", "cd"};
m.invoke(null, new Object[] {abc});
}
}

View File

@ -12,17 +12,15 @@ public class RespCacheTest {
new Thread(() -> {
try {
Thread.sleep(j * 1000);
if (j > 2) Thread.sleep(6 * 1000);
if (j > 2)
Thread.sleep(6 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean waitResult = cache.waitForHalf();
System.out.println(
"tid:"
+ Thread.currentThread().getId()
+ " reach target, waitResult:"
+ waitResult);
System.out.println("tid:" + Thread.currentThread().getId()
+ " reach target, waitResult:" + waitResult);
}).start();
}
Thread.sleep(20000);
@ -37,17 +35,15 @@ public class RespCacheTest {
new Thread(() -> {
try {
Thread.sleep(j * 1000);
if (j > 2) Thread.sleep(1900);
if (j > 2)
Thread.sleep(1900);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean waitResult = cache.waitForHalf();
System.out.println(
"tid:"
+ Thread.currentThread().getId()
+ " reach target, waitResult:"
+ waitResult);
System.out.println("tid:" + Thread.currentThread().getId()
+ " reach target, waitResult:" + waitResult);
}).start();
}
Thread.sleep(20000);
@ -66,11 +62,8 @@ public class RespCacheTest {
e.printStackTrace();
}
boolean waitResult = cache.waitForHalf();
System.out.println(
"tid:"
+ Thread.currentThread().getId()
+ " reach target, waitResult:"
+ waitResult);
System.out.println("tid:" + Thread.currentThread().getId()
+ " reach target, waitResult:" + waitResult);
}).start();
}
Thread.sleep(10000);