Compare commits

..

No commits in common. "master" and "v1.6.6" have entirely different histories.

49 changed files with 1822 additions and 1705 deletions

View File

@ -2,10 +2,7 @@ plugins {
id 'java' id 'java'
id 'java-library' id 'java-library'
} }
apply from: '../spotless.gradle'
repositories {
mavenCentral()
}
sourceSets { sourceSets {
main { main {
java { java {
@ -27,10 +24,14 @@ sourceSets {
sourceCompatibility = 1.8 sourceCompatibility = 1.8
repositories {
mavenCentral()
}
dependencies { dependencies {
api project(":common") api project(":common")
api 'io.prometheus:simpleclient:0.12.0' api 'io.prometheus:simpleclient:0.12.0'
api 'org.hyperic.sigar:sigar:1.6.4' api 'org.knowhowlab.osgi:sigar:1.6.5_01'
api 'com.github.ben-manes.caffeine:caffeine:2.8.8' api fileTree(dir: 'libs', include: '*.jar')
testImplementation 'junit:junit:4.13.2' testImplementation 'junit:junit:4.13.2'
} }

Binary file not shown.

View File

@ -1,22 +1,32 @@
package org.bdware.sc; package org.bdware.sc;
import com.google.gson.JsonElement;
import org.bdware.sc.bean.ContractRequest; import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.OnHashCallback; import org.bdware.sc.conn.OnHashCallback;
public interface ChainOpener { public interface ChainOpener {
void reRegister(String doid); void reRegister(String doid);
String register(String arg); String register(String arg);
void writeContractResultToLocalAndLedger(String result, ContractClient client, void writeContractResultToLocalAndLedger(
ContractRequest contractRequest, OnHashCallback cb, long start, long l); String result,
ContractClient client,
ContractRequest contractRequest,
OnHashCallback cb, long start, long l);
void writeToChain(OnHashCallback cb, String from, String to, String data, String requestID, void writeToChain(
OnHashCallback cb,
String from,
String to,
String data,
String requestID,
String namedLedger); String namedLedger);
void writeToChainWithContract(OnHashCallback cb, String from, String to, String data, void writeToChainWithContract(
String requestID, String contractID, String namedLedger); OnHashCallback cb,
String from,
JsonElement getLedgerParams(); String to,
String data,
String requestID,
String contractID,
String namedLedger);
} }

View File

@ -18,13 +18,11 @@ import org.bdware.sc.encrypt.HardwareInfo.OSType;
import org.bdware.sc.event.REvent.REventSemantics; import org.bdware.sc.event.REvent.REventSemantics;
import org.bdware.sc.node.AnnotationNode; import org.bdware.sc.node.AnnotationNode;
import org.bdware.sc.node.YjsType; import org.bdware.sc.node.YjsType;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.util.JsonUtil; import org.bdware.sc.util.JsonUtil;
import java.io.ByteArrayOutputStream;
import java.io.File; import java.io.File;
import java.io.InputStream;
import java.io.PrintStream; import java.io.PrintStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.*; import java.util.*;
@ -32,6 +30,7 @@ public class ContractClient {
private static final Logger LOGGER = LogManager.getLogger(ContractClient.class); private static final Logger LOGGER = LogManager.getLogger(ContractClient.class);
public static String cmi = ""; public static String cmi = "";
public ContractMeta contractMeta; public ContractMeta contractMeta;
public boolean isDebug;
transient SocketGet get; transient SocketGet get;
int port; int port;
String pid; String pid;
@ -68,14 +67,10 @@ public class ContractClient {
isRunning = false; isRunning = false;
} }
public ContractClient(int startPort) {
this("127.0.0.1", startPort);
}
// Once the reconnect is called, all cp in the same node will reconnect to // Once the reconnect is called, all cp in the same node will reconnect to
// current ContractManager // current ContractManager
// We only consider the. // We only consider the.
public ContractClient(String host, int startPort) { public ContractClient(int startPort) {
// LOGGER.info("ContractClient 构造器 : "); // LOGGER.info("ContractClient 构造器 : ");
contractMeta = new ContractMeta(); contractMeta = new ContractMeta();
outputTracer = new ContractPrinter(); outputTracer = new ContractPrinter();
@ -87,7 +82,7 @@ public class ContractClient {
port = startPort; port = startPort;
// LOGGER.info("ContractClient----构造器----- 端口 " + startPort); // LOGGER.info("ContractClient----构造器----- 端口 " + startPort);
get = new SocketGet(host, startPort); get = new SocketGet("127.0.0.1", startPort);
// LOGGER.info("ContractClient----构造器----- position---2"); // LOGGER.info("ContractClient----构造器----- position---2");
String cpCMI = get.syncGet("", "isContractProcess", ""); // CMI String cpCMI = get.syncGet("", "isContractProcess", ""); // CMI
@ -133,21 +128,26 @@ public class ContractClient {
} }
private void initProps() { private void initProps() {
LOGGER.info("initProps ---- position-----1"); // LOGGER.info("initProps ---- position-----1");
contractMeta.name = get.syncGet("", "getContractName", ""); contractMeta.name = get.syncGet("", "getContractName", "");
LOGGER.info("initProps ---- position-----2"); // LOGGER.info("initProps ---- position-----2");
String strEvent = get.syncGet("", "getDeclaredEvents", ""); String strEvent = get.syncGet("", "getDeclaredEvents", "");
LOGGER.debug("event: " + strEvent); LOGGER.debug("event: " + strEvent);
contractMeta.declaredEvents = JsonUtil.fromJson(strEvent, contractMeta.declaredEvents =
new TypeToken<Map<String, REventSemantics>>() {}.getType()); JsonUtil.fromJson(
LOGGER.info("initProps ---- position-----3"); strEvent, new TypeToken<Map<String, REventSemantics>>() {
contractMeta.dependentContracts = }.getType());
JsonUtil.fromJson(get.syncGet("", "getDependentContracts", ""), // LOGGER.info("initProps ---- position-----3");
new TypeToken<Set<String>>() {}.getType()); contractMeta.dependentContracts = JsonUtil.fromJson(
LOGGER.info("initProps ---- position-----4"); get.syncGet("", "getDependentContracts", ""),
new TypeToken<Set<String>>() {
}.getType());
// LOGGER.info("initProps ---- position-----4");
contractMeta.exportedFunctions = contractMeta.exportedFunctions =
JsonUtil.fromJson(get.syncGet("", "getExportedFunctions", ""), JsonUtil.fromJson(
new TypeToken<List<FunctionDesp>>() {}.getType()); get.syncGet("", "getExportedFunctions", ""),
new TypeToken<List<FunctionDesp>>() {
}.getType());
contractMeta.logDetail = new HashMap<>(); contractMeta.logDetail = new HashMap<>();
for (FunctionDesp func : contractMeta.exportedFunctions) { for (FunctionDesp func : contractMeta.exportedFunctions) {
StringBuilder str = new StringBuilder(); StringBuilder str = new StringBuilder();
@ -159,7 +159,8 @@ public class ContractClient {
} }
if (anno.getType().equals("LogLocation")) { if (anno.getType().equals("LogLocation")) {
for (String logLoc : anno.getArgs()) { for (String logLoc : anno.getArgs()) {
if (logLoc.equals("\"dataware\"") || logLoc.equals("\"bdledger\"") if (logLoc.equals("\"dataware\"")
|| logLoc.equals("\"bdledger\"")
|| logLoc.equals("\"bdledger:\"")) { || logLoc.equals("\"bdledger:\"")) {
str.append("bdcontract;"); str.append("bdcontract;");
} else if (logLoc.startsWith("\"bdledger:") && logLoc.length() > 11) { } else if (logLoc.startsWith("\"bdledger:") && logLoc.length() > 11) {
@ -172,47 +173,54 @@ public class ContractClient {
} }
contractMeta.logDetail.put(func.functionName, str.toString()); contractMeta.logDetail.put(func.functionName, str.toString());
} }
LOGGER.info("initProps ---- position-----5"); // LOGGER.info("initProps ---- position-----5");
try { try {
String anno = get.syncGet("", "getAnnotations", ""); String anno = get.syncGet("", "getAnnotations", "");
contractMeta.annotations = contractMeta.annotations =
JsonUtil.fromJson(anno, new TypeToken<List<AnnotationNode>>() {}.getType()); JsonUtil.fromJson(anno, new TypeToken<List<AnnotationNode>>() {
}.getType());
} catch (Exception e) { } catch (Exception e) {
// supoort contract process before version 0.70 // supoort contract process before version 0.70
contractMeta.annotations = new ArrayList<>(); contractMeta.annotations = new ArrayList<>();
} }
LOGGER.info("initProps ---- position-----6"); // LOGGER.info("initProps ---- position-----6");
contractMeta.sigRequired = Boolean.parseBoolean(get.syncGet("", "isSigRequired", "")); contractMeta.sigRequired = Boolean.parseBoolean(get.syncGet("", "isSigRequired", ""));
LOGGER.info("initProps ---- position-----7"); // LOGGER.info("initProps ---- position-----7");
contractMeta.thisPermission = get.syncGet("", "showPermission", ""); contractMeta.thisPermission = get.syncGet("", "showPermission", "");
LOGGER.info("initProps ---- position-----8"); // LOGGER.info("initProps ---- position-----8");
isRunning = true; isRunning = true;
LOGGER.info("initProps ---- position-----9"); contractMeta.isDebug = Boolean.parseBoolean(get.syncGet("", "getDebug", ""));
// LOGGER.info("initProps ---- position-----9");
get.syncGet("", "registerMangerPort", ContractManager.cPort.getCMPort() + ""); get.syncGet("", "registerMangerPort", ContractManager.cPort.getCMPort() + "");
contractMeta.contract = contractMeta.contract =
JsonUtil.fromJson(get.syncGet("", "getContract", ""), Contract.class); JsonUtil.fromJson(get.syncGet("", "getContract", ""), Contract.class);
contractMeta.id = contractMeta.contract.getID(); contractMeta.id = contractMeta.contract.getID();
get.setOfflineExceptionHandler(new SocketGet.OfflineHandler() { get.setOfflineExceptionHandler(
new SocketGet.OfflineHandler() {
@Override @Override
public void onException(SocketGet socketGet, Exception e) { public void onException(SocketGet socketGet, Exception e) {
if (e.getMessage().contains("Connection refused")) { if (e.getMessage().contains("Connection refused")) {
contractMeta.status = ContractStatusEnum.HANGED; contractMeta.status = ContractStatusEnum.HANGED;
ContractManager.instance.statusRecorder.resumeContractProcess(contractMeta.id); ContractManager.instance.statusRecorder.resumeContractProcess(
contractMeta.id);
} }
} }
}); });
loadTimesAndTraffic(); loadTimesAndTraffic();
LOGGER.info("initProps ---- position-----10 DONE!"); // LOGGER.info("initProps ---- position-----10");
// LOGGER.debug("======= registerPort:" + ret + "-->" + ContractManager.startPort);
} }
public String startProcess(PrintStream ps) throws Exception { public String startProcess(PrintStream ps) {
isRunning = false; isRunning = false;
port = -1; port = -1;
String darg = "-Djava.library.path="; String darg = "-Djava.library.path=";
String classpath; String classpath;
File jniPath; File jniPath;
if (ContractManager.yjsPath == null) { if (ContractManager.yjsPath == null) {
jniPath = new File("./jni/"); jniPath = new File("./jni/");
classpath = System.getProperty("java.class.path"); classpath = System.getProperty("java.class.path");
@ -220,64 +228,60 @@ public class ContractClient {
classpath = ContractManager.yjsPath; classpath = ContractManager.yjsPath;
jniPath = new File(classpath).getParentFile(); jniPath = new File(classpath).getParentFile();
} }
String osJni = ((HardwareInfo.type == OSType.linux) ? "/jni/linux" : "/jni/mac"); String osJni = ((HardwareInfo.type == OSType.linux) ? "/jni/linux" : "/jni/mac");
darg += jniPath.getAbsolutePath() + osJni; darg += jniPath.getAbsolutePath() + osJni;
if (!new File(classpath).exists()) { if (!new File(classpath).exists()) {
ContractResult r = new ContractResult(Status.Exception, ContractResult r =
new JsonPrimitive("incorrect path: yjs.jar")); new ContractResult(
Status.Exception, new JsonPrimitive("incorrect path: yjs.jar"));
return JsonUtil.toJson(r); return JsonUtil.toJson(r);
} }
if (!new File(jniPath, "libs").exists()) { if (!new File(jniPath, "libs").exists()) {
ContractResult r = new ContractResult(Status.Exception, ContractResult r =
new ContractResult(
Status.Exception,
new JsonPrimitive("incorrect path: yjs.jar, missing libs")); new JsonPrimitive("incorrect path: yjs.jar, missing libs"));
return JsonUtil.toJson(r); return JsonUtil.toJson(r);
} }
// ProcessBuilder builder =
// new ProcessBuilder(
// "java",
// "-Dfile.encoding=UTF-8",
// darg,
// "-cp",
// jniPath.getAbsolutePath() + "/libs/*:" + classpath,
// "org.bdware.sc.ContractProcess",
// "-port=" + cPort.getPort());
int startPort = ContractManager.cPort.getPortAndInc(); int startPort = ContractManager.cPort.getPortAndInc();
List<String> pbParameters = new ArrayList<>(); ProcessBuilder builder =
pbParameters.add("java"); new ProcessBuilder(
pbParameters.add("-Dfile.encoding=UTF-8"); "java",
pbParameters.add(darg); "-Dfile.encoding=UTF-8",
if (contractMeta.contract.getRemoteDebugPort() != 0) { darg,
pbParameters.add(String.format( "-jar",
"-agentlib:jdwp=transport=dt_socket,address=%d,server=y,suspend=n", classpath,
contractMeta.contract.getRemoteDebugPort())); "-port=" + startPort,
} "-cmi=" + cmi, // cmi 区分不同CM的cp
File classParent = new File(classpath).getParentFile(); (isDebug ? "-debug" : ""));
if (contractMeta.contract.isDebug()) { File directory = new File("");
pbParameters.add("-Dlog4j.configurationFile="
+ new File(classParent, "log4j2.debug.properties").getAbsolutePath());
} else {
pbParameters.add("-Dlog4j.configurationFile="
+ new File(classParent, "log4j2.properties").getAbsolutePath());
}
pbParameters.add("-jar");
pbParameters.add(classpath);
pbParameters.add("-port=" + startPort);
pbParameters.add("-cmi=" + cmi);
if (contractMeta.contract.isDebug())
pbParameters.add("-debug");
ProcessBuilder builder;
String[] result = new String[pbParameters.size()];
pbParameters.toArray(result);
Constructor<ProcessBuilder> pbc =
ProcessBuilder.class.getDeclaredConstructor(String[].class);
builder = pbc.newInstance(new Object[] {result});
File directory = new File("./");
LOGGER.debug("[CMD] path: " + directory.getAbsolutePath()); LOGGER.debug("[CMD] path: " + directory.getAbsolutePath());
LOGGER.debug(JsonUtil.toPrettyJson(builder.command())); LOGGER.debug(JsonUtil.toPrettyJson(builder.command()));
Map<String, String> map = builder.environment(); Map<String, String> map = builder.environment();
map.put("java.library.path", jniPath.getAbsolutePath() + osJni); map.put("java.library.path", jniPath.getAbsolutePath() + osJni);
builder.directory(directory); builder.directory(new File("./"));
LOGGER.debug("start process:"); LOGGER.debug("start process:");
try {
process = builder.start(); process = builder.start();
this.pid = getPid(process); this.pid = getPid(process);
LOGGER.info("[CP PPID ] " + pid); LOGGER.info("[CP PPID ] " + pid);
PrintStream printStream = new PrintStream(process.getOutputStream()); PrintStream printStream = new PrintStream(process.getOutputStream());
printStream.println("CP PID:" + pid); printStream.println("CP PID:" + pid);
printStream.close(); printStream.close();
InputStream processInputStream = process.getInputStream(); Scanner sc = new Scanner(process.getInputStream());
Scanner sc = new Scanner(processInputStream);
String status = null; String status = null;
while (sc.hasNext()) { while (sc.hasNext()) {
status = sc.nextLine(); status = sc.nextLine();
@ -286,8 +290,10 @@ public class ContractClient {
try { try {
// Set contractPort to max(mainPort, contractPort) // Set contractPort to max(mainPort, contractPort)
int portIndex = status.indexOf("mainPort"); int portIndex = status.indexOf("mainPort");
int port = Integer.parseInt( int port =
status.substring(portIndex + 9, portIndex + 14).replaceAll("\\s+", "")); Integer.parseInt(
status.substring(portIndex + 9, portIndex + 14)
.replaceAll("\\s+", ""));
if (port != startPort) { if (port != startPort) {
ContractManager.cPort.reSetPort(port + 1); ContractManager.cPort.reSetPort(port + 1);
} }
@ -305,52 +311,39 @@ public class ContractClient {
ContractManager.cPort.updateDb(port, true); ContractManager.cPort.updateDb(port, true);
get = new SocketGet("127.0.0.1", port); get = new SocketGet("127.0.0.1", port);
get.syncGet("", "setDBInfo", ContractManager.dbPath); get.syncGet("", "setDBInfo", ContractManager.dbPath);
if (contractMeta.contract.isDebug() || get != null) { String tagA = (ps == System.out ? "[Contract_" + port + "_out] " : "");
String tagA = (ps instanceof LoggerPrintStream ? "[Contract_" + port + "_out] " : ""); String tagB = (ps == System.out ? "[Contract_" + port + "_err] " : "");
String tagB = (ps instanceof LoggerPrintStream ? "[Contract_" + port + "_err] " : "");
outputTracer.track(process, sc, tagA, ps); outputTracer.track(process, sc, tagA, ps);
errorTracer.track(process, new Scanner(process.getErrorStream()), tagB, ps); errorTracer.track(process, new Scanner(process.getErrorStream()), tagB, ps);
} else {
// 关闭流否则缓冲区打满会阻塞进程
processInputStream.close();
sc.close();
}
get.syncGet("", "registerMangerPort", String.valueOf(ContractManager.cPort.getCMPort())); get.syncGet("", "registerMangerPort", String.valueOf(ContractManager.cPort.getCMPort()));
MultiContractMeta multiContractMeta = ContractManager.instance.multiContractRecorder
.getMultiContractMeta(contractMeta.getID());
if (multiContractMeta != null && multiContractMeta.getMembers() != null) {
String setMemberResult =
get.syncGet("", "setMembers", JsonUtil.toJson(multiContractMeta.getMembers()));
LOGGER.info("setMember:" + setMemberResult);
} else {
LOGGER.info("setMember ignore, meta:" + (multiContractMeta == null)
+ (multiContractMeta == null ? "NULL"
: " members:" + multiContractMeta.getMembers()));
}
if (isBundlePath(contractMeta.contract.getScriptStr())) { if (isBundlePath(contractMeta.contract.getScriptStr())) {
status = get.syncGet("", "setContractBundle", JsonUtil.toJson(contractMeta.contract)); status =
get.syncGet(
"", "setContractBundle", JsonUtil.toJson(contractMeta.contract));
} else { } else {
status = get.syncGet("", "setContract", JsonUtil.toJson(contractMeta.contract)); status = get.syncGet("", "setContract", JsonUtil.toJson(contractMeta.contract));
} }
LOGGER.info("start status, port:" + port + " status:" + status); LOGGER.debug("port:" + port + " status:" + status);
ContractResult r = JsonUtil.fromJson(status, ContractResult.class); ContractResult r = JsonUtil.fromJson(status, ContractResult.class);
if (r.status == Status.Success) { if (r.status == Status.Success) {
LOGGER.info("init prop start!");
initProps(); initProps();
LOGGER.info("init prop done!");
get.syncGet("", "setPID", pid); get.syncGet("", "setPID", pid);
} else { } else if (r.status == null) {
LOGGER.info("kill contract");
if (r.status == null) {
r.status = Status.Error; r.status = Status.Error;
r.result = new JsonPrimitive(status); r.result = new JsonPrimitive(status);
} status = JsonUtil.toJson(r);
killProcess(); contractMeta.name = get.syncGet("", "getContractName", "");
throw new IllegalStateException(r.result.getAsString());
} }
return status; return status;
} catch (Exception e) {
e.printStackTrace();
ByteArrayOutputStream bo = new ByteArrayOutputStream();
e.printStackTrace(new PrintStream(bo));
ContractResult r =
new ContractResult(Status.Exception, new JsonPrimitive(bo.toString()));
return JsonUtil.toJson(r);
}
} }
public void updateMemory() { public void updateMemory() {
@ -359,7 +352,11 @@ public class ContractClient {
return; return;
} }
lastUpdate = System.currentTimeMillis(); lastUpdate = System.currentTimeMillis();
get.asyncGet("", ".UsedMemory", "", new ResultCallback() { get.asyncGet(
"",
".UsedMemory",
"",
new ResultCallback() {
@Override @Override
public void onResult(String str) { public void onResult(String str) {
memory = Long.parseLong(str); memory = Long.parseLong(str);
@ -378,8 +375,17 @@ public class ContractClient {
return contractMeta.getExportedFunctions(); return contractMeta.getExportedFunctions();
} }
public String executeMethod(String pkgName, String method, String arg) { public String getIdentifier() {
return get.syncGet(pkgName, method, arg); return get.syncGet("", "getIdentifier", " ");
}
public void setIdentifier(String alias) {
get.asyncGet("", "setIdentifier", alias, null);
}
public String signResult(String result) {
return contractMeta.contract.signResult(result);
} }
public String getPubkey() { public String getPubkey() {
@ -394,6 +400,10 @@ public class ContractClient {
return contractMeta.name; return contractMeta.name;
} }
public String getContractDOI() {
return contractMeta.contract.getDOI();
}
public String getContractKey() { public String getContractKey() {
return contractMeta.contract.getKey(); return contractMeta.contract.getKey();
} }
@ -418,6 +428,10 @@ public class ContractClient {
return contractMeta.logDetail.get(action); return contractMeta.logDetail.get(action);
} }
public boolean isDebug() {
return contractMeta.isDebug;
}
public long getTimes() { public long getTimes() {
return times; return times;
} }
@ -443,10 +457,12 @@ public class ContractClient {
} }
public void loadTimesAndTraffic() { public void loadTimesAndTraffic() {
String tempTime2 = KeyValueDBUtil.instance.getValue(CMTables.ContractInfo.toString(), String tempTime2 =
contractMeta.name + "-Times"); KeyValueDBUtil.instance.getValue(
String tempTraffic2 = KeyValueDBUtil.instance.getValue(CMTables.ContractInfo.toString(), CMTables.ContractInfo.toString(), contractMeta.name + "-Times");
contractMeta.name + "-Traffic"); String tempTraffic2 =
KeyValueDBUtil.instance.getValue(
CMTables.ContractInfo.toString(), contractMeta.name + "-Traffic");
if (tempTime2 != null && !tempTime2.equals("")) { if (tempTime2 != null && !tempTime2.equals("")) {
times = Long.parseLong(tempTime2); times = Long.parseLong(tempTime2);
} }
@ -456,10 +472,10 @@ public class ContractClient {
} }
public void saveTimesAndTraffic() { public void saveTimesAndTraffic() {
KeyValueDBUtil.instance.setValue(CMTables.ContractInfo.toString(), KeyValueDBUtil.instance.setValue(
contractMeta.name + "-Times", times + ""); CMTables.ContractInfo.toString(), contractMeta.name + "-Times", times + "");
KeyValueDBUtil.instance.setValue(CMTables.ContractInfo.toString(), KeyValueDBUtil.instance.setValue(
contractMeta.name + "-Traffic", traffic + ""); CMTables.ContractInfo.toString(), contractMeta.name + "-Traffic", traffic + "");
} }
public void setMask(JsonObject args) { public void setMask(JsonObject args) {

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,5 @@
package org.bdware.sc; package org.bdware.sc;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.bean.Contract; import org.bdware.sc.bean.Contract;
import org.bdware.sc.bean.FunctionDesp; import org.bdware.sc.bean.FunctionDesp;
import org.bdware.sc.bean.IDSerializable; import org.bdware.sc.bean.IDSerializable;
@ -22,6 +20,7 @@ public class ContractMeta implements IDSerializable {
ContractStatusEnum status; ContractStatusEnum status;
String name; String name;
String id; String id;
boolean isDebug;
Map<String, REvent.REventSemantics> declaredEvents; Map<String, REvent.REventSemantics> declaredEvents;
List<FunctionDesp> exportedFunctions; List<FunctionDesp> exportedFunctions;
Map<String, String> logDetail; Map<String, String> logDetail;
@ -31,7 +30,14 @@ public class ContractMeta implements IDSerializable {
boolean sigRequired; boolean sigRequired;
String thisPermission; // 合约当前权限 String thisPermission; // 合约当前权限
/* /*
* { "name": "dx_substr", "parameter": { "columnIndex":5, "paras":["1","3"] } }, {
"name": "dx_substr",
"parameter":
{
"columnIndex":5,
"paras":["1","3"]
}
},
*/ */
// Map<Object,Object>MaskInfo; // Map<Object,Object>MaskInfo;
@ -90,7 +96,7 @@ public class ContractMeta implements IDSerializable {
// public setMask(){} // public setMask(){}
public boolean getIsDebug() { public boolean getIsDebug() {
return contract.isDebug(); return isDebug;
} }
public FunctionDesp getExportedFunction(String action) { public FunctionDesp getExportedFunction(String action) {
@ -109,8 +115,7 @@ public class ContractMeta implements IDSerializable {
private FunctionDesp seekFunction(String action) { private FunctionDesp seekFunction(String action) {
for (FunctionDesp desp : exportedFunctions) { for (FunctionDesp desp : exportedFunctions) {
if (desp != null && desp.functionName.equals(action)) if (desp != null && desp.functionName.equals(action)) return desp;
return desp;
} }
return null; return null;
} }
@ -119,10 +124,9 @@ public class ContractMeta implements IDSerializable {
contract = c; contract = c;
id = c.getID(); id = c.getID();
status = ContractStatusEnum.HANGED; status = ContractStatusEnum.HANGED;
isDebug = false;
} }
static Logger LOGGER = LogManager.getLogger(ContractMeta.class);
public void setContractExecutor(ContractExecutor executor) { public void setContractExecutor(ContractExecutor executor) {
this.contractExecutor = executor; this.contractExecutor = executor;
} }

View File

@ -1,8 +1,10 @@
package org.bdware.sc; package org.bdware.sc;
public enum ContractStatusEnum { public enum ContractStatusEnum {
INIT(0, "初始化"), RUNNING(1, "内存运行中"), HANGED(2, "硬盘运行中"), KILLED(3, "已停止"); INIT(0, "初始化"),
RUNNING(1, "内存运行中"),
HANGED(2, "硬盘运行中"),
KILLED(3, "已停止");
private Integer code; private Integer code;
private String desc; private String desc;

View File

@ -33,7 +33,8 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
static { static {
final Object flag = new Object(); final Object flag = new Object();
// 调度一个task在delayms后开始调度每次调度完后最少等待periodms后才开始调 // 调度一个task在delayms后开始调度每次调度完后最少等待periodms后才开始调
ContractManager.scheduledThreadPool.scheduleWithFixedDelay(() -> { ContractManager.scheduledThreadPool.scheduleWithFixedDelay(
() -> {
boolean cleared = dealTimerContractProcess(); boolean cleared = dealTimerContractProcess();
if (cleared) { if (cleared) {
synchronized (flag) { synchronized (flag) {
@ -44,7 +45,10 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
} }
} }
} }
}, 0, 1, TimeUnit.SECONDS); },
0,
1,
TimeUnit.SECONDS);
} }
// contract id to client, changes when some contract is started/stopped/resumed // contract id to client, changes when some contract is started/stopped/resumed
@ -98,8 +102,7 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
// TODO 是不是还有别的这只是在内存里的哦 // TODO 是不是还有别的这只是在内存里的哦
try { try {
for (ContractClient c : id2ContractClient.values()) { for (ContractClient c : id2ContractClient.values()) {
if (Integer.parseInt(c.getPID()) == pid) if (Integer.parseInt(c.getPID()) == pid) return true;
return true;
} }
} catch (Exception e) { } catch (Exception e) {
// e.printStackTrace(); // e.printStackTrace();
@ -171,23 +174,17 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
// TODO index name to contract // TODO index name to contract
public ContractMeta getContractMeta(String idOrNameOrDOI) { public ContractMeta getContractMeta(String idOrNameOrDOI) {
if (idOrNameOrDOI == null) if (idOrNameOrDOI == null) return null;
return null;
ContractMeta meta = getStatus().get(idOrNameOrDOI); ContractMeta meta = getStatus().get(idOrNameOrDOI);
if (meta != null) if (meta != null) return meta;
return meta;
for (ContractMeta cc : getStatus().values()) { for (ContractMeta cc : getStatus().values()) {
if (cc.status == ContractStatusEnum.RUNNING || cc.status == ContractStatusEnum.HANGED) if (cc.status == ContractStatusEnum.RUNNING || cc.status == ContractStatusEnum.HANGED)
if (idOrNameOrDOI.equals(cc.name)) { if (idOrNameOrDOI.equals(cc.name) || idOrNameOrDOI.equals(cc.contract.getDOI())) {
return cc; return cc;
} }
} }
for (ContractMeta cc : getStatus().values()) { for (ContractMeta cc : getStatus().values()) {
if (cc.name == null) if (idOrNameOrDOI.equals(cc.name) || idOrNameOrDOI.equals(cc.contract.getDOI())) {
continue;
if (cc.contract == null)
continue;
if (idOrNameOrDOI.equals(cc.name)) {
return cc; return cc;
} }
} }
@ -225,7 +222,9 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
LOGGER.info("need dump when stop"); LOGGER.info("need dump when stop");
String contractName = client.getContractName(); String contractName = client.getContractName();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss"); SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss");
File f = new File(ContractManager.dir + "/memory/" + contractName, File f =
new File(
ContractManager.dir + "/memory/" + contractName,
df.format(new Date())); df.format(new Date()));
client.get.syncGet("", "getMemoryDump", f.getAbsolutePath()); client.get.syncGet("", "getMemoryDump", f.getAbsolutePath());
ContractManager.instance.addLocalContractLog("dumpContract", meta); ContractManager.instance.addLocalContractLog("dumpContract", meta);
@ -312,12 +311,13 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
client.saveTimesAndTraffic(); client.saveTimesAndTraffic();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd.HH_mm_ss"); // 设置日期格式 SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd.HH_mm_ss"); // 设置日期格式
File f = new File(ContractManager.dir + "/memory/" + contractMeta.name, File f =
new File(
ContractManager.dir + "/memory/" + contractMeta.name,
df.format(new Date())); df.format(new Date()));
File parent = f.getParentFile(); File parent = f.getParentFile();
if (!parent.exists()) { if (!parent.exists()) {
LOGGER.trace( LOGGER.trace("make directory " + parent.getAbsolutePath() + ":" + parent.mkdirs());
"make directory " + parent.getAbsolutePath() + ":" + parent.mkdirs());
} }
ContractManager.instance.dumpContract(contract.getID(), f.getAbsolutePath()); ContractManager.instance.dumpContract(contract.getID(), f.getAbsolutePath());
ContractManager.instance.invokeContractSuicide(client); ContractManager.instance.invokeContractSuicide(client);
@ -342,19 +342,27 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
JsonUtil.fromJson(startContractResult, ContractResult.class); JsonUtil.fromJson(startContractResult, ContractResult.class);
if (contractResult.status == ContractResult.Status.Error) { if (contractResult.status == ContractResult.Status.Error) {
// 记录错误日志 // 记录错误日志
ContractManager.instance.addLocalContractLog("resumeContract error", ContractManager.instance.addLocalContractLog(
meta.contract.getID(), meta.name, meta.contract.getOwner()); "resumeContract error",
meta.contract.getID(),
meta.name,
meta.contract.getOwner());
} else { } else {
ContractManager.instance.addLocalContractLog("resumeContract success", ContractManager.instance.addLocalContractLog(
meta.contract.getID(), meta.name, meta.contract.getOwner()); "resumeContract success",
meta.contract.getID(),
meta.name,
meta.contract.getOwner());
// TODO 可能会重复load相同的镜像 // TODO 可能会重复load相同的镜像
// 如果是killed 只根据manifest去判断是否加载memory // 如果是killed 只根据manifest去判断是否加载memory
// 可增加一个判断如果hanged朋manifest里是加载memory这里就不再加载了 // 可增加一个判断如果hanged朋manifest里是加载memory这里就不再加载了
ContractClient client = id2ContractClient.get(meta.getID()); ContractClient client = id2ContractClient.get(meta.getID());
if (preStatus == ContractStatusEnum.HANGED) { if (preStatus == ContractStatusEnum.HANGED) {
String memory = ContractManager.instance.findNewestMemory(meta.name);
if (memory != null) ContractManager.instance.loadMemory(
ContractManager.instance.loadMemory(client, memory); client, ContractManager.instance.findNewestMemory(meta.name));
} }
client.contractMeta.setStatus(ContractStatusEnum.RUNNING); client.contractMeta.setStatus(ContractStatusEnum.RUNNING);
updateValue(client.contractMeta); updateValue(client.contractMeta);
@ -381,11 +389,6 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
@Override @Override
public boolean visit(int i) { public boolean visit(int i) {
return visit("127.0.0.1", i);
}
@Override
public boolean visit(String host, int i) {
if (i == ContractManager.cPort.getCMPort()) { if (i == ContractManager.cPort.getCMPort()) {
return false; return false;
} }
@ -393,7 +396,7 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
return true; return true;
} }
try { try {
ContractClient client = new ContractClient(host, i); ContractClient client = new ContractClient(i);
// LOGGER.info("[CM重连] position-----1"); // LOGGER.info("[CM重连] position-----1");
if (client.isRunning) { if (client.isRunning) {
LOGGER.debug("CP listened to port " + i + " is running"); LOGGER.debug("CP listened to port " + i + " is running");
@ -408,19 +411,26 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
ContractManager.instance.invokeContractSuicide(client); ContractManager.instance.invokeContractSuicide(client);
return false; return false;
} }
LOGGER.debug(String.format( LOGGER.debug(
String.format(
"CP listened to port %d:\n\tID=%s\n\tName=%s\n\tType=%s\n" "CP listened to port %d:\n\tID=%s\n\tName=%s\n\tType=%s\n"
+ "\tKey=%s\n\tPubKey=%s\n\tCopies=%d\n\t%s", + "\tKey=%s\n\tPubKey=%s\n\tCopies=%d\n\t%s",
i, client.getContractID(), client.contractMeta.name, i,
client.getContractType(), client.getContractKey(), client.getPubkey(), client.getContractID(),
client.getContractCopies(), client.contractMeta.contract.startInfo)); client.contractMeta.name,
client.getContractType(),
client.getContractKey(),
client.getPubkey(),
client.getContractCopies(),
client.contractMeta.contract.startInfo));
client.get.syncGet("", "setDir", ContractManager.dir); client.get.syncGet("", "setDir", ContractManager.dir);
// String str = client.getIdentifier(); // String str = client.getIdentifier();
client.get.syncGet("", "getDumpPeriod", "a"); client.get.syncGet("", "getDumpPeriod", "a");
client.get.syncGet("", "startAutoDump", "a"); client.get.syncGet("", "startAutoDump", "a");
createContract(client); createContract(client);
LOGGER.info(String.format("reconnect to port %d: contract %s", i, LOGGER.info(
client.contractMeta.name)); String.format("reconnect to port %d: contract %s",
i, client.contractMeta.name));
return true; return true;
} }
} catch (Exception e) { } catch (Exception e) {

View File

@ -1,19 +0,0 @@
package org.bdware.sc;
import org.apache.logging.log4j.Logger;
import java.io.PrintStream;
public class LoggerPrintStream extends PrintStream {
public LoggerPrintStream(Logger logger) {
super(System.out, true);
this.logger = logger;
}
Logger logger;
@Override
public void println(String str) {
logger.info(str);
}
}

View File

@ -1,5 +1,7 @@
package org.bdware.sc; package org.bdware.sc;
import java.util.HashMap;
import java.util.Map;
//Node //Node

View File

@ -7,9 +7,6 @@ import org.bdware.sc.db.KeyValueDBUtil;
import org.bdware.sc.db.StatusRecorder; import org.bdware.sc.db.StatusRecorder;
import org.bdware.sc.units.MultiContractMeta; import org.bdware.sc.units.MultiContractMeta;
import java.util.HashSet;
import java.util.Set;
public class MultiContractRecorder extends StatusRecorder<MultiContractMeta> { public class MultiContractRecorder extends StatusRecorder<MultiContractMeta> {
static final String dbName = CMTables.UnitContracts.toString(); static final String dbName = CMTables.UnitContracts.toString();
static final String prefix = "Multi_C_Meta_"; static final String prefix = "Multi_C_Meta_";
@ -20,31 +17,26 @@ public class MultiContractRecorder extends StatusRecorder<MultiContractMeta> {
for (MultiContractMeta meta : getStatus().values()) { for (MultiContractMeta meta : getStatus().values()) {
try { try {
meta.initQueue(); meta.initQueue();
int lastExeSeq = Integer.parseInt(KeyValueDBUtil.instance int lastExeSeq =
.getValue(CMTables.LastExeSeq.toString(), meta.getContractID())); Integer.parseInt(
KeyValueDBUtil.instance.getValue(
CMTables.LastExeSeq.toString(), meta.getContractID()));
meta.setLastExeSeq(lastExeSeq); meta.setLastExeSeq(lastExeSeq);
} catch (Exception e) { } catch (Exception e) {
LOGGER.error(e); e.printStackTrace();
} }
} }
} }
public MultiContractMeta getMultiContractMeta(String idOrNameOrDOI) { public MultiContractMeta getMultiContractMeta(String idOrNameOrDOI) {
if (idOrNameOrDOI == null) if (idOrNameOrDOI == null) return null;
return null;
ContractMeta meta = ContractManager.instance.statusRecorder.getContractMeta(idOrNameOrDOI); ContractMeta meta = ContractManager.instance.statusRecorder.getContractMeta(idOrNameOrDOI);
return getMultiContractMeta(meta); if (meta == null) return null;
}
public MultiContractMeta getMultiContractMeta(ContractMeta meta) {
if (meta == null)
return null;
return getStatus().get(meta.id); return getStatus().get(meta.id);
} }
public MultiContractMeta createIfNotExist(String contractID) { public MultiContractMeta createIfNotExist(String contractID) {
ContractMeta meta = ContractManager.instance.statusRecorder.createIfNotExist(contractID); MultiContractMeta ret = getMultiContractMeta(contractID);
MultiContractMeta ret = getMultiContractMeta(meta);
if (null == ret) { if (null == ret) {
LOGGER.info("requests don't contain contract " + contractID); LOGGER.info("requests don't contain contract " + contractID);
ret = new MultiContractMeta(contractID); ret = new MultiContractMeta(contractID);
@ -52,14 +44,4 @@ public class MultiContractRecorder extends StatusRecorder<MultiContractMeta> {
} }
return ret; return ret;
} }
public void removeNotExist(ContractStatusRecorder statusRecorder) {
Set<MultiContractMeta> toRemove = new HashSet<>();
for (String key : getStatus().keySet()) {
if (statusRecorder.getContractMeta(key) == null)
toRemove.add(getStatus().get(key));
}
for (MultiContractMeta meta : toRemove)
remove(meta);
}
} }

View File

@ -1,92 +0,0 @@
package org.bdware.sc;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.bdware.sc.bean.Contract;
import java.io.*;
import java.nio.file.Files;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
public class YPKResourceManager {
public static String ypkCacheDir = "./ypkcache";
static ExecutorService es = Executors.newFixedThreadPool(2);
public static void unzipYPK(Contract contract) {
es.execute(() -> unzip(contract.getScriptStr(), getUnzipDir(contract)));
}
public static String getUnzipDir(Contract contract) {
return ypkCacheDir + "/" + contract.getID();
}
private static final long MAX_CACHE_SIZE = 2L * 1024 * 1024 * 1024; // 2GB
private static final long MAX_FILE_SIZE = 10 * 1024 * 1024; // 10MB
private static final Cache<String, byte[]> cache = Caffeine.newBuilder()
.maximumSize(MAX_CACHE_SIZE).expireAfterAccess(30, TimeUnit.MINUTES).build();
public static InputStream getCachedStream(Contract c, String path) throws IOException {
File target = new File(getUnzipDir(c), path);
String key = target.getAbsolutePath();
if (target.length() > MAX_FILE_SIZE) {
// 文件过大直接从文件系统读取
return new FileInputStream(target);
} else {
byte[] data = cache.get(key, k -> {
try {
return Files.readAllBytes(target.toPath());
} catch (IOException e) {
throw new RuntimeException(e);
}
});
// 从缓存中返回一个新的 ByteArrayInputStream
return new ByteArrayInputStream(data);
}
}
public static void unzip(String zipFilePath, String destDir) {
File dir = new File(destDir);
if (!dir.exists())
dir.mkdirs();
byte[] buffer = new byte[1024];
try {
ZipInputStream zis = new ZipInputStream(new FileInputStream(zipFilePath));
ZipEntry zipEntry = zis.getNextEntry();
while (zipEntry != null) {
String fileName = zipEntry.getName();
File newFile = new File(destDir + File.separator + fileName);
// 创建所有上级目录
new File(newFile.getParent()).mkdirs();
if (zipEntry.isDirectory()) {
if (!newFile.isDirectory() && !newFile.mkdirs()) {
throw new IOException("Failed to create directory " + newFile);
}
} else {
// 写入文件内容
FileOutputStream fos = new FileOutputStream(newFile);
int len;
while ((len = zis.read(buffer)) > 0) {
fos.write(buffer, 0, len);
}
fos.close();
}
zipEntry = zis.getNextEntry();
}
zis.closeEntry();
zis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public static boolean existYPK(Contract contract) {
return new File(contract.getScriptStr()).exists();
}
}

View File

@ -79,8 +79,7 @@ public class PBFTAlgorithm implements CommitAlgorithm {
prepareMsg = new PBFTMessage(); prepareMsg = new PBFTMessage();
prepareMsg.order = allocatedID.incrementAndGet(); prepareMsg.order = allocatedID.incrementAndGet();
prepareMsg.type = PBFTType.PrePrepare; prepareMsg.type = PBFTType.PrePrepare;
prepareMsg.content = prepareMsg.content = (java.util.Arrays.hashCode(pbftMessage.content) + "").getBytes();
(java.util.Arrays.hashCode(pbftMessage.content) + "").getBytes();
temp = new PCInfo(); temp = new PCInfo();
temp.request = pbftMessage; temp.request = pbftMessage;
@ -109,8 +108,7 @@ public class PBFTAlgorithm implements CommitAlgorithm {
break; break;
case Prepare: case Prepare:
temp = info.get(pbftMessage.order); temp = info.get(pbftMessage.order);
LOGGER.info( LOGGER.info("receive Prepare from:" + pbftMessage.sendID + " -> " + pbftMessage.order);
"receive Prepare from:" + pbftMessage.sendID + " -> " + pbftMessage.order);
if (temp == null) { if (temp == null) {
PCInfo pcInfo = new PCInfo(); PCInfo pcInfo = new PCInfo();
pcInfo.buff.add(pbftMessage); pcInfo.buff.add(pbftMessage);
@ -157,8 +155,7 @@ public class PBFTAlgorithm implements CommitAlgorithm {
prepareMsg.order = pbftMessage.order; prepareMsg.order = pbftMessage.order;
prepareMsg.type = PBFTType.PrePrepare; prepareMsg.type = PBFTType.PrePrepare;
temp = info.get(prepareMsg.order); temp = info.get(prepareMsg.order);
prepareMsg.content = prepareMsg.content = (java.util.Arrays.hashCode(temp.request.content) + "").getBytes();
(java.util.Arrays.hashCode(temp.request.content) + "").getBytes();
broadcast(pbftMessage); broadcast(pbftMessage);
} }
default: default:
@ -210,8 +207,7 @@ public class PBFTAlgorithm implements CommitAlgorithm {
private void matchPrePrepareFromOriginReqeust(int hash, PBFTMessage pbftMessage) { private void matchPrePrepareFromOriginReqeust(int hash, PBFTMessage pbftMessage) {
for (PCInfo pcInfo : info.values()) { for (PCInfo pcInfo : info.values()) {
for (PBFTMessage msg : pcInfo.buff) { for (PBFTMessage msg : pcInfo.buff) {
if (msg.type == PBFTType.PrePrepare if (msg.type == PBFTType.PrePrepare && Integer.parseInt(new String(msg.content)) == hash) {
&& Integer.parseInt(new String(msg.content)) == hash) {
handlePrePrepare(msg, original.get(hash).second); handlePrePrepare(msg, original.get(hash).second);
return; return;
} }
@ -224,8 +220,10 @@ public class PBFTAlgorithm implements CommitAlgorithm {
} }
private void retryLater(int delay, final Node sender, final PBFTMessage pbftMessage) { private void retryLater(int delay, final Node sender, final PBFTMessage pbftMessage) {
ContractManager.scheduledThreadPool.schedule(() -> onPBFTMessage(sender, pbftMessage), ContractManager.scheduledThreadPool.schedule(
delay, TimeUnit.MILLISECONDS); () -> onPBFTMessage(sender, pbftMessage),
delay,
TimeUnit.MILLISECONDS);
} }
private void handlePrePrepare(PBFTMessage pbftMessage, PBFTMessage req) { private void handlePrePrepare(PBFTMessage pbftMessage, PBFTMessage req) {
@ -401,8 +399,8 @@ public class PBFTAlgorithm implements CommitAlgorithm {
public String getDisplayStr() { public String getDisplayStr() {
return "pSize:" + (prepare == null ? "null" : prepare.size()) + " cSize:" return "pSize:" + (prepare == null ? "null" : prepare.size()) + " cSize:"
+ (commit == null ? "null" : commit.size()) + " isSendCommit:" + isSendCommit + (commit == null ? "null" : commit.size()) + " isSendCommit:" + isSendCommit + " isSendReply:"
+ " isSendReply:" + isSendReply + " buffSize:" + buff.size(); + isSendReply + " buffSize:" + buff.size();
} }
} }
} }

View File

@ -2,7 +2,6 @@ package org.bdware.sc.consistency.pbft;
public enum PBFTType { public enum PBFTType {
Request(0), PrePrepare(1), Prepare(2), Commit(3), Reply(4), Unknown(5), ReSend(6), AddMember(7); Request(0), PrePrepare(1), Prepare(2), Commit(3), Reply(4), Unknown(5), ReSend(6), AddMember(7);
private int type; private int type;
PBFTType(int i) { PBFTType(int i) {

View File

@ -4,6 +4,8 @@ import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.Node; import org.bdware.sc.conn.Node;
import org.bdware.sc.consistency.CommitAlgorithm; import org.bdware.sc.consistency.CommitAlgorithm;
import org.bdware.sc.consistency.Committer; import org.bdware.sc.consistency.Committer;
import org.bdware.sc.consistency.pbft.PBFTMessage;
import org.bdware.sc.consistency.pbft.PBFTType;
import org.bdware.sc.units.TrustfulExecutorConnection; import org.bdware.sc.units.TrustfulExecutorConnection;
public class ViewAlgorithm implements CommitAlgorithm { public class ViewAlgorithm implements CommitAlgorithm {
@ -11,7 +13,8 @@ public class ViewAlgorithm implements CommitAlgorithm {
private Committer committer; private Committer committer;
private TrustfulExecutorConnection connection; private TrustfulExecutorConnection connection;
public ViewAlgorithm(boolean isMaster) {} public ViewAlgorithm(boolean isMaster) {
}
public void setCommitter(Committer c) { public void setCommitter(Committer c) {
committer = c; committer = c;

View File

@ -49,16 +49,22 @@ public class EventBroker {
tempTopics = recorder.recoverTempTopicsFromDb(); tempTopics = recorder.recoverTempTopicsFromDb();
// regularly check temporary topics and clean them // regularly check temporary topics and clean them
ContractManager.scheduledThreadPool.scheduleWithFixedDelay(() -> { ContractManager.scheduledThreadPool.scheduleWithFixedDelay(
() -> {
long current = System.currentTimeMillis(); long current = System.currentTimeMillis();
int oldSize = tempTopics.size(); int oldSize = tempTopics.size();
tempTopics.keySet().forEach(topic -> { tempTopics.keySet().forEach(topic -> {
if (tempTopics.get(topic) + EXPIRED_TIME > current) { if (tempTopics.get(topic) + EXPIRED_TIME > current) {
String reqID = ContractManager.instance.nodeCenterConn.getNodeId() + "_" String reqID =
+ System.currentTimeMillis(); ContractManager.instance.nodeCenterConn.getNodeId() +
REvent cleanEvent = new REvent(topic, UNSUBSCRIBE, null, reqID); "_" + System.currentTimeMillis();
cleanEvent REvent cleanEvent =
.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair()); new REvent(
topic,
UNSUBSCRIBE,
null,
reqID);
cleanEvent.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair());
handle(cleanEvent); handle(cleanEvent);
tempTopics.remove(topic); tempTopics.remove(topic);
} }
@ -66,11 +72,16 @@ public class EventBroker {
if (oldSize != tempTopics.size()) { if (oldSize != tempTopics.size()) {
recorder.saveTempTopics(tempTopics); recorder.saveTempTopics(tempTopics);
} }
}, 0L, EXPIRED_TIME, TimeUnit.MILLISECONDS); },
0L,
EXPIRED_TIME,
TimeUnit.MILLISECONDS);
// regularly create check point in database // regularly create check point in database
ContractManager.scheduledThreadPool.scheduleAtFixedRate( ContractManager.scheduledThreadPool.scheduleAtFixedRate(
() -> recorder.createCheckPoint(topic2cIds, id2Consumers), EXPIRED_TIME, () -> recorder.createCheckPoint(topic2cIds, id2Consumers),
EXPIRED_TIME, TimeUnit.MILLISECONDS); EXPIRED_TIME,
EXPIRED_TIME,
TimeUnit.MILLISECONDS);
NodeConsumer.setCenter(center); NodeConsumer.setCenter(center);
@ -92,11 +103,10 @@ public class EventBroker {
switch (event.getType()) { switch (event.getType()) {
case SUBSCRIBE: case SUBSCRIBE:
if (null != topic && !topic.isEmpty()) { if (null != topic && !topic.isEmpty()) {
IEventConsumer consumer = doSubscribe(event); doSubscribe(event);
// save & try to sub in center // save & try to sub in center
recorder.appendEvent(event); recorder.appendEvent(event);
center.subInCenter(event.getTopic(), event.getSemantics(), event.getCenter(), center.subInCenter(event.getTopic(), event.getSemantics());
consumer, event);
} }
break; break;
case UNSUBSCRIBE: case UNSUBSCRIBE:
@ -106,10 +116,9 @@ public class EventBroker {
case PUBLISH: case PUBLISH:
case PREPUB: case PREPUB:
case PRESUB: case PRESUB:
LOGGER.info(String.format("Receive %s event from topic %s", event.getSemantics(), LOGGER.info(String.format("Receive %s event from topic %s", event.getSemantics(), topic));
topic)); LOGGER.debug(String.format("Receive %s event %s: %s",
LOGGER.debug(String.format("Receive %s event %s: %s", event.getSemantics(), topic, event.getSemantics(), topic, event.getContent()));
event.getContent()));
if (event.isForward()) { if (event.isForward()) {
// send event to the event center // send event to the event center
event.setForward(center.deliverEvent(topic, event)); event.setForward(center.deliverEvent(topic, event));
@ -124,17 +133,13 @@ public class EventBroker {
} }
} }
private IEventConsumer doSubscribe(REvent e) { private void doSubscribe(REvent e) {
IEventConsumer consumer = parseConsumer(e.getContent()); subInReg(e.getTopic(), parseConsumer(e.getContent()), this.topic2cIds, this.id2Consumers);
if (subInReg(e.getTopic(), consumer, this.topic2cIds, this.id2Consumers)) {
// for events with semantics ONLY_ONCE, mark the topic is a temporary topic // for events with semantics ONLY_ONCE, mark the topic is a temporary topic
if (ONLY_ONCE.equals(e.getSemantics()) && !tempTopics.containsKey(e.getTopic())) { if (ONLY_ONCE.equals(e.getSemantics()) && !tempTopics.containsKey(e.getTopic())) {
tempTopics.put(e.getTopic(), System.currentTimeMillis()); tempTopics.put(e.getTopic(), System.currentTimeMillis());
recorder.saveTempTopics(tempTopics); recorder.saveTempTopics(tempTopics);
} }
return consumer;
}
return null;
} }
public void doSubscribe(String topic, IEventConsumer consumer) { public void doSubscribe(String topic, IEventConsumer consumer) {
@ -150,8 +155,11 @@ public class EventBroker {
* @param id2Consumers consumer registry of broker or a check point in event recorder * @param id2Consumers consumer registry of broker or a check point in event recorder
* @return if the subscribing succeeds * @return if the subscribing succeeds
*/ */
public boolean subInReg(String topic, IEventConsumer consumer, public boolean subInReg(
Map<String, Set<String>> topic2cIds, Map<String, IEventConsumer> id2Consumers) { String topic,
IEventConsumer consumer,
Map<String, Set<String>> topic2cIds,
Map<String, IEventConsumer> id2Consumers) {
if (null == consumer) { if (null == consumer) {
return false; return false;
} }
@ -165,8 +173,8 @@ public class EventBroker {
topic2cIds.get(topic).add(cId); topic2cIds.get(topic).add(cId);
switch (consumer.getType()) { switch (consumer.getType()) {
case Contract: case Contract:
LOGGER.info("contract " + ((ContractConsumer) consumer).getContract() LOGGER.info("contract " + ((ContractConsumer) consumer).getContract() +
+ " subscribes topic " + topic); " subscribes topic " + topic);
break; break;
case Node: case Node:
LOGGER.info("node " + consumer.getId() + " subscribes topic " + topic); LOGGER.info("node " + consumer.getId() + " subscribes topic " + topic);
@ -187,11 +195,10 @@ public class EventBroker {
* do subscribing in registry<br/> * do subscribing in registry<br/>
* topic and consumer must not be null at the same time * topic and consumer must not be null at the same time
* <ul> * <ul>
* <li>if consumer is null and topic is not, it means the topic is a temporary topic, remove * <li>if consumer is null and topic is not, it means the topic is a temporary topic, remove it</li>
* it</li> * <li>if topic is null and consumer is not,
* <li>if topic is null and consumer is not, it means a consumer or a contract wants to * it means a consumer or a contract wants to unsubscribe all topics,
* unsubscribe all topics, remove all related consumers in topic registry and consumer * remove all related consumers in topic registry and consumer registry</li>
* registry</li>
* <li>if two of them is not null, do unsubscribing in two registries</li> * <li>if two of them is not null, do unsubscribing in two registries</li>
* </ul> * </ul>
* *
@ -201,8 +208,11 @@ public class EventBroker {
* @param id2Consumers consumer registry of broker or a check point in event recorder * @param id2Consumers consumer registry of broker or a check point in event recorder
* @return if the subscribing succeeds * @return if the subscribing succeeds
*/ */
public boolean unsubInReg(String topic, IEventConsumer consumer, public boolean unsubInReg(
Map<String, Set<String>> topic2cIds, Map<String, IEventConsumer> id2Consumers) { String topic,
IEventConsumer consumer,
Map<String, Set<String>> topic2cIds,
Map<String, IEventConsumer> id2Consumers) {
if (null == topic && null == consumer) { if (null == topic && null == consumer) {
return false; return false;
} }
@ -222,8 +232,7 @@ public class EventBroker {
// if cId belongs to a contract, find all related consumers // if cId belongs to a contract, find all related consumers
toRmIds = new ArrayList<>(); toRmIds = new ArrayList<>();
id2Consumers.forEach((k, c) -> { id2Consumers.forEach((k, c) -> {
if (c instanceof ContractConsumer if (c instanceof ContractConsumer && ((ContractConsumer) c).getContract().equals(cId)) {
&& ((ContractConsumer) c).getContract().equals(cId)) {
toRmIds.add(k); toRmIds.add(k);
} }
}); });
@ -246,8 +255,7 @@ public class EventBroker {
/** /**
* parse consumer information from content str<br/> * parse consumer information from content str<br/>
* if caller wants to select all consumers of a contract, the content str is also parsed into a * if caller wants to select all consumers of a contract, the content str is also parsed into a node consumer
* node consumer
* *
* @param content json string, {"subscriber": "[subscriber]", "handler?": "[handler]"} * @param content json string, {"subscriber": "[subscriber]", "handler?": "[handler]"}
* @return a node consumer or contract consumer, or null if exception is thrown * @return a node consumer or contract consumer, or null if exception is thrown
@ -267,9 +275,6 @@ public class EventBroker {
if (null != handler && !handler.isEmpty()) { if (null != handler && !handler.isEmpty()) {
consumer = new ContractConsumer(subscriber, handler); consumer = new ContractConsumer(subscriber, handler);
} else { } else {
if (subscriber.equals(ContractManager.instance.nodeCenterConn.getNodeId())) {
return null;
}
consumer = new NodeConsumer(subscriber); consumer = new NodeConsumer(subscriber);
} }
return consumer; return consumer;
@ -293,16 +298,19 @@ public class EventBroker {
case AT_LEAST_ONCE: case AT_LEAST_ONCE:
case NEED_RETRY: case NEED_RETRY:
// send events to all // send events to all
topicConsumers topicConsumers.forEach(cId ->
.forEach(cId -> deliverEvent(event, cEvent, nEventStr, cId, topic, true)); deliverEvent(event, cEvent, nEventStr, cId, topic, true));
break; break;
case AT_MOST_ONCE: case AT_MOST_ONCE:
// send event to a random consumer // send event to a random consumer
// AT_MOST_ONCE, so broker don't need to do anything when delivering fails // AT_MOST_ONCE, so broker don't need to do anything when delivering fails
deliverEvent(event, cEvent, nEventStr, deliverEvent(
topicConsumers.toArray()[(int) (Math.random() * topicConsumers.size())] event,
.toString(), cEvent,
topic, true); nEventStr,
topicConsumers.toArray()[(int) (Math.random() * topicConsumers.size())].toString(),
topic,
true);
break; break;
case ONLY_ONCE: case ONLY_ONCE:
switch (event.getType()) { switch (event.getType()) {
@ -328,14 +336,20 @@ public class EventBroker {
// send PREPUB events to all consumers // send PREPUB events to all consumers
// TODO if there are no consumers to receive the ONLY_ONCE events? // TODO if there are no consumers to receive the ONLY_ONCE events?
ContractManager.threadPool.execute(() -> { ContractManager.threadPool.execute(() -> {
REvent prePubMsg = new REvent(event.getTopic(), PREPUB, contentHash, REvent prePubMsg = new REvent(event.getTopic(),
PREPUB,
contentHash,
event.getRequestID()); event.getRequestID());
prePubMsg.doSignature( prePubMsg.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair());
ContractManager.instance.nodeCenterConn.getNodeKeyPair()); topicConsumers.forEach(cId ->
topicConsumers.forEach(cId -> deliverEvent(prePubMsg, deliverEvent(prePubMsg,
new Event(event.getTopic(), contentHash, new Event(event.getTopic(),
contentHash,
prePubMsg.getSemantics()), prePubMsg.getSemantics()),
JsonUtil.toJson(prePubMsg), cId, topic, false)); JsonUtil.toJson(prePubMsg),
cId,
topic,
false));
// wait for responses from contracts (PRESUB events) // wait for responses from contracts (PRESUB events)
while (true) { while (true) {
try { try {
@ -343,20 +357,20 @@ public class EventBroker {
flag.wait(30 * 1000L); flag.wait(30 * 1000L);
} }
if (!flag.get().isEmpty()) { if (!flag.get().isEmpty()) {
REvent finalMsg = new REvent(flag.get(), PUBLISH, REvent finalMsg = new REvent(flag.get(),
event.getContent(), HashUtil.sha3(contentHash PUBLISH,
+ System.currentTimeMillis())); event.getContent(),
HashUtil.sha3(
contentHash + System.currentTimeMillis()));
// if the delivering fails, retry publishing // if the delivering fails, retry publishing
finalMsg.setSemantics(NEED_RETRY); finalMsg.setSemantics(NEED_RETRY);
finalMsg.doSignature( finalMsg.doSignature(
ContractManager.instance.nodeCenterConn ContractManager.instance.nodeCenterConn.getNodeKeyPair());
.getNodeKeyPair());
handle(finalMsg); handle(finalMsg);
break; break;
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOGGER.warn("ONLY_ONE event delivering is interrupted: " LOGGER.warn("ONLY_ONE event delivering is interrupted: " + e.getMessage());
+ e.getMessage());
// e.printStackTrace(); // e.printStackTrace();
} }
} }
@ -375,15 +389,19 @@ public class EventBroker {
* publish the event to the consumer * publish the event to the consumer
* *
* @param event event message * @param event event message
* @param cEvent simple event message to the contract consumer, only topic, content and * @param cEvent simple event message to the contract consumer, only topic, content and semantics
* semantics
* @param nEventStr event message to the node * @param nEventStr event message to the node
* @param cId consumer id * @param cId consumer id
* @param topic topic of the event * @param topic topic of the event
* @param isPub if the event is published or pre-published * @param isPub if the event is published or pre-published
*/ */
private void deliverEvent(REvent event, Event cEvent, String nEventStr, String cId, private void deliverEvent(
String topic, boolean isPub) { REvent event,
Event cEvent,
String nEventStr,
String cId,
String topic,
boolean isPub) {
if (id2Consumers.containsKey(cId)) { if (id2Consumers.containsKey(cId)) {
IEventConsumer consumer = id2Consumers.get(cId); IEventConsumer consumer = id2Consumers.get(cId);
if (consumer instanceof ContractConsumer) { if (consumer instanceof ContractConsumer) {
@ -394,24 +412,31 @@ public class EventBroker {
public void onResult(String unused) { public void onResult(String unused) {
// if the delivering fails, unsubscribe the consumer // if the delivering fails, unsubscribe the consumer
ContractConsumer c = (ContractConsumer) consumer; ContractConsumer c = (ContractConsumer) consumer;
ContractClient client = ContractClient client = ContractManager.instance.getClient(c.getContract());
ContractManager.instance.getClient(c.getContract()); String reqID =
String reqID = ContractManager.instance.nodeCenterConn.getNodeKeyPair() ContractManager.instance.nodeCenterConn.getNodeKeyPair().getPublicKeyStr() +
.getPublicKeyStr() + "_" + System.currentTimeMillis(); "_" + System.currentTimeMillis();
REvent unsubEvent = new REvent(topic, UNSUBSCRIBE, REvent unsubEvent =
"{\"subscriber\":\"" + cId + "\"}", reqID); new REvent(
topic,
UNSUBSCRIBE,
"{\"subscriber\":\"" + cId + "\"}",
reqID);
unsubEvent.doSignature(client.getPubkey(), client.getContractKey()); unsubEvent.doSignature(client.getPubkey(), client.getContractKey());
handle(unsubEvent); handle(unsubEvent);
// if the event is an ONLY_ONCE event, retry publishing // if the event is an ONLY_ONCE event, retry publishing
if (NEED_RETRY.equals(event.getSemantics())) { if (NEED_RETRY.equals(event.getSemantics())) {
REvent newMsg = new REvent(topic.split("\\|")[0], PUBLISH, REvent newMsg =
event.getContent(), event.getRequestID()); new REvent(
topic.split("\\|")[0],
PUBLISH,
event.getContent(),
event.getRequestID());
newMsg.setSemantics(ONLY_ONCE); newMsg.setSemantics(ONLY_ONCE);
newMsg.setHash(event.getHash()); newMsg.setHash(event.getHash());
newMsg.setTxHash(event.getTxHash()); newMsg.setTxHash(event.getTxHash());
newMsg.doSignature( newMsg.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair());
ContractManager.instance.nodeCenterConn.getNodeKeyPair());
handle(newMsg); handle(newMsg);
} }
} }
@ -433,8 +458,8 @@ public class EventBroker {
}); });
} else if (isPub) { } else if (isPub) {
// client consumer // client consumer
ContractManager.threadPool ContractManager.threadPool.execute(() ->
.execute(() -> consumer.publishEvent(nEventStr, new ResultCallback() { consumer.publishEvent(nEventStr, new ResultCallback() {
@Override @Override
public void onResult(String unused) { public void onResult(String unused) {
unsubInReg(null, consumer, topic2cIds, id2Consumers); unsubInReg(null, consumer, topic2cIds, id2Consumers);

View File

@ -1,8 +1,5 @@
package org.bdware.sc.event; package org.bdware.sc.event;
import org.bdware.sc.event.clients.ContractConsumer;
import org.bdware.sc.event.clients.IEventConsumer;
import org.bdware.sc.event.clients.NodeConsumer;
import org.bdware.sc.util.DHTUtil; import org.bdware.sc.util.DHTUtil;
import org.bdware.sc.util.JsonUtil; import org.bdware.sc.util.JsonUtil;
@ -18,18 +15,17 @@ public class EventCenter {
* get the nearest node to the topic in the hash function range * get the nearest node to the topic in the hash function range
* *
* @param topic the topic * @param topic the topic
* @param k the number of centers
* @return id of the node * @return id of the node
*/ */
public String[] getCenterByTopic(String topic, int k) { public String getCenterByTopic(String topic) {
if (null == instance.nodeCenterConn) { if (null == instance.nodeCenterConn) {
return null; return null;
} }
String[] centers = DHTUtil.getClusterByKey(topic, k); String[] centers = DHTUtil.getClusterByKey(topic, 1);
if (null == centers || centers.length == 0) { if (null == centers || centers.length == 0) {
return null; return null;
} }
return centers; return centers[0];
} }
/** /**
@ -37,39 +33,21 @@ public class EventCenter {
* *
* @param topic event topic * @param topic event topic
* @param semantics event semantics, used to mark PRESUB events * @param semantics event semantics, used to mark PRESUB events
* @param center id of event center if the subscribing has been handled
* @param consumer consumer
* @param event original event
*/ */
public void subInCenter(String topic, REvent.REventSemantics semantics, String center, public void subInCenter(String topic, REvent.REventSemantics semantics) {
IEventConsumer consumer, REvent event) {
if (null == instance.nodeCenterConn) { if (null == instance.nodeCenterConn) {
return; return;
} }
REvent msg = new REvent(topic, REvent.REventType.SUBSCRIBE, REvent msg = new REvent(
String.format("{\"subscriber\":\"%s\"}", instance.nodeCenterConn.getNodeId()), ""); topic,
REvent.REventType.SUBSCRIBE,
String.format("{\"subscriber\":\"%s\"}",
instance.nodeCenterConn.getNodeId()),
"");
msg.setSemantics(semantics); msg.setSemantics(semantics);
msg.doSignature(instance.nodeCenterConn.getNodeKeyPair()); msg.doSignature(instance.nodeCenterConn.getNodeKeyPair());
msg.setCenter(center); String nodeId = getCenterByTopic(topic);
String[] centers = getCenterByTopic(topic, 2); instance.masterStub.deliverEvent(nodeId, JsonUtil.toJson(msg));
if (null == centers) {
return;
}
if (null != center) {
if (centers[0].equals(center) && centers.length > 1) {
instance.masterStub.deliverEvent(centers[1], JsonUtil.toJson(msg));
} else {
instance.masterStub.deliverEvent(centers[0], JsonUtil.toJson(msg));
}
} else if (!instance.masterStub.deliverEvent(centers[0], JsonUtil.toJson(msg))) {
if (consumer instanceof NodeConsumer) {
event.setCenter(centers[0]);
instance.masterStub.deliverEvent(consumer.getId(), JsonUtil.toJson(event));
} else if (consumer instanceof ContractConsumer && centers.length > 1) {
msg.setCenter(centers[0]);
instance.masterStub.deliverEvent(centers[1], JsonUtil.toJson(msg));
}
}
} }
/** /**
@ -83,11 +61,8 @@ public class EventCenter {
if (null == instance.masterStub) { if (null == instance.masterStub) {
return false; return false;
} }
String[] centers = getCenterByTopic(topic, 1); String nodeId = getCenterByTopic(topic);
if (null == centers) { return instance.masterStub.deliverEvent(nodeId, JsonUtil.toJson(event));
return false;
}
return instance.masterStub.deliverEvent(centers[0], JsonUtil.toJson(event));
} }
/** /**

View File

@ -79,11 +79,9 @@ public class EventRecorder {
String key = KeyValueRocksDBUtil.instance.getValue(dbName, LATEST_EVENT_KEY); String key = KeyValueRocksDBUtil.instance.getValue(dbName, LATEST_EVENT_KEY);
latestEvent.set(key); latestEvent.set(key);
CheckPoint cp = new CheckPoint(); CheckPoint cp = new CheckPoint();
Type topic2cIdsType = Type topic2cIdsType = TypeToken.getParameterized(ConcurrentHashMap.class, String.class,
TypeToken TypeToken.getParameterized(Set.class, String.class,
.getParameterized(ConcurrentHashMap.class, String.class, String.class).getType()).getType();
TypeToken.getParameterized(Set.class, String.class).getType())
.getType();
// retrieving transactions from database // retrieving transactions from database
while (null != key && !key.isEmpty()) { while (null != key && !key.isEmpty()) {
String json = KeyValueRocksDBUtil.instance.getValue(dbName, key); String json = KeyValueRocksDBUtil.instance.getValue(dbName, key);
@ -95,16 +93,14 @@ public class EventRecorder {
if (json.startsWith("cp")) { if (json.startsWith("cp")) {
// create check point by the transaction and stop retrieving // create check point by the transaction and stop retrieving
JsonObject data = JsonUtil.parseStringAsJsonObject(json.substring(2)); JsonObject data = JsonUtil.parseStringAsJsonObject(json.substring(2));
cp.topic2cIds = cp.topic2cIds = JsonUtil.fromJson(data.get("topic2cIds").toString(), topic2cIdsType);
JsonUtil.fromJson(data.get("topic2cIds").toString(), topic2cIdsType);
JsonObject id2Consumers = data.getAsJsonObject("id2Consumers"); JsonObject id2Consumers = data.getAsJsonObject("id2Consumers");
for (String k : id2Consumers.keySet()) { for (String k : id2Consumers.keySet()) {
JsonObject consumer = id2Consumers.getAsJsonObject(k); JsonObject consumer = id2Consumers.getAsJsonObject(k);
if (!consumer.has("type")) { if (!consumer.has("type")) {
continue; continue;
} }
ConsumerType type = ConsumerType type = ConsumerType.valueOf(consumer.get("type").getAsString());
ConsumerType.valueOf(consumer.get("type").getAsString());
switch (type) { switch (type) {
case Contract: case Contract:
cp.id2Consumers.put(k, cp.id2Consumers.put(k,
@ -149,8 +145,7 @@ public class EventRecorder {
// if empty, return the check point // if empty, return the check point
latestEvent.setCp(true); latestEvent.setCp(true);
} else { } else {
// on the base of old check point, process following sub or unsub events to recover // on the base of old check point, process following sub or unsub events to recover registry
// registry
while (!stack.empty()) { while (!stack.empty()) {
Object record = stack.pop(); Object record = stack.pop();
if (record instanceof CheckPoint) { if (record instanceof CheckPoint) {
@ -160,19 +155,15 @@ public class EventRecorder {
IEventConsumer consumer = broker.parseConsumer(tran.content); IEventConsumer consumer = broker.parseConsumer(tran.content);
switch (tran.type) { switch (tran.type) {
case SUBSCRIBE: case SUBSCRIBE:
if (!broker.subInReg(tran.topic, consumer, cp.topic2cIds, if (!broker.subInReg(tran.topic, consumer, cp.topic2cIds, cp.id2Consumers)) {
cp.id2Consumers)) {
LOGGER.warn("record damaged! " + key); LOGGER.warn("record damaged! " + key);
LOGGER.debug( LOGGER.debug("record damaged! " + key + ": " + JsonUtil.toJson(tran));
"record damaged! " + key + ": " + JsonUtil.toJson(tran));
} }
break; break;
case UNSUBSCRIBE: case UNSUBSCRIBE:
if (!broker.unsubInReg(tran.topic, consumer, cp.topic2cIds, if (!broker.unsubInReg(tran.topic, consumer, cp.topic2cIds, cp.id2Consumers)) {
cp.id2Consumers)) {
LOGGER.warn("record damaged! " + key); LOGGER.warn("record damaged! " + key);
LOGGER.debug( LOGGER.debug("record damaged! " + key + ": " + JsonUtil.toJson(tran));
"record damaged! " + key + ": " + JsonUtil.toJson(tran));
} }
default: default:
break; break;

View File

@ -1,6 +1,5 @@
package org.bdware.sc.event.clients; package org.bdware.sc.event.clients;
import com.google.gson.Gson;
import com.google.gson.annotations.Expose; import com.google.gson.annotations.Expose;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -81,7 +80,6 @@ public class ContractConsumer implements IEventConsumer {
return; return;
} }
AtomicInteger callCount = new AtomicInteger(0); AtomicInteger callCount = new AtomicInteger(0);
LOGGER.info("ContractConsumer!" + new Gson().toJson(cr));
// TODO sending requests at a high frequency maybe cause that some requests are ignored // TODO sending requests at a high frequency maybe cause that some requests are ignored
ScheduledFuture<?> future = ContractManager.scheduledThreadPool.scheduleAtFixedRate( ScheduledFuture<?> future = ContractManager.scheduledThreadPool.scheduleAtFixedRate(
() -> ContractManager.instance.executeContractInternal(cr, new ResultCallback() { () -> ContractManager.instance.executeContractInternal(cr, new ResultCallback() {
@ -91,20 +89,19 @@ public class ContractConsumer implements IEventConsumer {
try { try {
ContractResult result = JsonUtil.fromJson(str, ContractResult.class); ContractResult result = JsonUtil.fromJson(str, ContractResult.class);
if (!result.status.equals(ContractResult.Status.Success)) { if (!result.status.equals(ContractResult.Status.Success)) {
if (callCount.get() == TIMEOUT_COUNT if (callCount.get() == TIMEOUT_COUNT ||
|| (result.status == ContractResult.Status.Exception (result.status == ContractResult.Status.Exception &&
&& result.result.toString() result.result.toString().contains("not exported"))) {
.contains("not exported"))) { LOGGER.error(String.format(
LOGGER.error(String.format("receiving event error! %s.%s: %s", "receiving event error! %s.%s: %s", contract, handler, str));
contract, handler, str));
rc.onResult((String) null); rc.onResult((String) null);
} else { } else {
ret = false; ret = false;
} }
} }
} catch (Exception e) { } catch (Exception e) {
LOGGER.error(String.format("receiving event error! %s.%s: %s", contract, LOGGER.error(String.format(
handler, e.getMessage())); "receiving event error! %s.%s: %s", contract, handler, e.getMessage()));
rc.onResult((String) null); rc.onResult((String) null);
} }
callCount.incrementAndGet(); callCount.incrementAndGet();
@ -122,7 +119,10 @@ public class ContractConsumer implements IEventConsumer {
} }
} }
}, (reqID, hashStr) -> { }, (reqID, hashStr) -> {
}), 500L, 2500L, TimeUnit.MILLISECONDS); }),
500L,
2500L,
TimeUnit.MILLISECONDS);
scheduledFutures.put(getId(), future); scheduledFutures.put(getId(), future);
synchronized (flag) { synchronized (flag) {
flag.notify(); flag.notify();

View File

@ -1,6 +1,7 @@
package org.bdware.sc.event.clients; package org.bdware.sc.event.clients;
import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.event.Event;
public interface IEventConsumer { public interface IEventConsumer {
String getId(); String getId();
@ -12,6 +13,8 @@ public interface IEventConsumer {
void competeSub(Object msg, ResultCallback rc, String... options); void competeSub(Object msg, ResultCallback rc, String... options);
enum ConsumerType { enum ConsumerType {
Contract, Node, WSClient Contract,
Node,
WSClient
} }
} }

View File

@ -47,5 +47,6 @@ public class WSClientConsumer implements IEventConsumer {
} }
@Override @Override
public void competeSub(Object msg, ResultCallback rc, String... options) {} public void competeSub(Object msg, ResultCallback rc, String... options) {
}
} }

View File

@ -6,6 +6,7 @@ import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.Description; import org.bdware.sc.conn.Description;
import org.bdware.sc.conn.MsgHandler; import org.bdware.sc.conn.MsgHandler;
import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.event.REvent;
import org.bdware.sc.get.GetMessage; import org.bdware.sc.get.GetMessage;
import org.bdware.sc.util.JsonUtil; import org.bdware.sc.util.JsonUtil;
@ -34,7 +35,8 @@ public class ManagerHandler extends MsgHandler {
cb.onResult(cm.getTimesOfExecution(msg.arg)); cb.onResult(cm.getTimesOfExecution(msg.arg));
} }
@Description(value = "execute Contract, {\"contractID\":\"112233\",\"arg\":\"\"}", @Description(
value = "execute Contract, {\"contractID\":\"112233\",\"arg\":\"\"}",
isAsync = true) isAsync = true)
public void executeContract(GetMessage msg, ResultCallback cb) { public void executeContract(GetMessage msg, ResultCallback cb) {
ContractRequest c; ContractRequest c;
@ -87,10 +89,4 @@ public class ManagerHandler extends MsgHandler {
ContractRequest cr = JsonUtil.fromJson(msg.arg, ContractRequest.class); ContractRequest cr = JsonUtil.fromJson(msg.arg, ContractRequest.class);
cb.onResult(cm.addDEMember(cr.getContractID(), cr.getArg().getAsString())); cb.onResult(cm.addDEMember(cr.getContractID(), cr.getArg().getAsString()));
} }
@Description("add distributed execution member, sample: {contractID:id/name, arg:ipAndPort}")
public void getLedgerParams(GetMessage msg, ResultCallback cb) {
cb.onResult(ContractManager.instance.getLedgerParams().toString());
}
} }

View File

@ -4,51 +4,81 @@ package org.bdware.sc.units;
import java.io.*; import java.io.*;
/* /*
* 节点启动后记录自己的units信息和合约的一些信息 节点重新上线后NC发要求让其恢复则从本地读取自己的这些信息进行恢复 * 节点启动后记录自己的units信息和合约的一些信息
* 节点重新上线后NC发要求让其恢复则从本地读取自己的这些信息进行恢复
*/ */
public class ContractRecord implements Serializable { public class ContractRecord implements Serializable {
/* /* private static final long serialVersionUID = 704775241674568688L;
* private static final long serialVersionUID = 704775241674568688L;
* public transient RecoverFlag recoverFlag = RecoverFlag.Fine;
* public transient RecoverFlag recoverFlag = RecoverFlag.Fine;
* public String contractID;
* public String contractID; public String contractName; public String key; public String public String contractName;
* contractpubKey; public ContractType type; public int copies = 1; public int lastExeSeq = -1; public String key;
* public boolean isPrivate = false; public String pubKeyPath; public String ypkName; public String contractpubKey;
* public ContractType type;
* //public Map<String,String> members; //k-udpid,v-udpaddress public int copies = 1;
* public int lastExeSeq = -1;
* public String memory = ""; public boolean isPrivate = false;
* public String pubKeyPath;
* //JavaScriptEntry public String invokeID; public String ypkName;
*
* public ContractRecord(String id){ this.contractID = id; } //public Map<String,String> members; //k-udpid,v-udpaddress
*
* public ContractRecord(String id,String contractName,String key,String public String memory = "";
* contractpubKey,ContractType type,int copies){ this.contractID = id; this.contractName =
* contractName; this.key = key; this.contractpubKey = contractpubKey; this.type = type; //JavaScriptEntry
* this.copies = copies; } public String invokeID;
*
* public void printContent(){ System.out.println("==========ContractRecord========"); public ContractRecord(String id){
* this.contractID = id;
* System.out.println("contractID=" + contractID == null ? "null" : contractID); }
* System.out.println("contractName=" + contractName == null ? "null" : contractName);
* System.out.println("key=" + key == null ? "null" : key); System.out.println("contractPubKey=" public ContractRecord(String id,String contractName,String key,String contractpubKey,ContractType type,int copies){
* + contractpubKey == null ? "null" : contractpubKey); System.out.println("type=" + type == this.contractID = id;
* null ? "null" : type); System.out.println("lastExeSeq=" + lastExeSeq == null ? "null" : this.contractName = contractName;
* lastExeSeq); System.out.println("invokeID=" + invokeID == null ? "null" : invokeID); this.key = key;
* System.out.println("copies=" + copies); System.out.println("isPrivate=" + isPrivate); this.contractpubKey = contractpubKey;
* System.out.println("pubKeyPath=" + pubKeyPath == null ? "null" : pubKeyPath); this.type = type;
* System.out.println("ypkName=" + ypkName == null ? "null" : ypkName); if(members != null){ this.copies = copies;
* for(String k : members.keySet()){ System.out.println("members " + k + "-" + members.get(k)); }
* } } System.out.println("memory=" + memory == null ? "null" : memory);
* System.out.println("==========ContractRecord print finish======="); } public void printContent(){
* System.out.println("==========ContractRecord========");
* public static void getContentFromFile(String path){ File file = new File(path); try{
* FileInputStream os = new FileInputStream(file); ObjectInputStream oos = new System.out.println("contractID=" + contractID == null ? "null" : contractID);
* ObjectInputStream(os); ContractRecord record = (ContractRecord) oos.readObject(); System.out.println("contractName=" + contractName == null ? "null" : contractName);
* record.printContent(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch System.out.println("key=" + key == null ? "null" : key);
* (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { System.out.println("contractPubKey=" + contractpubKey == null ? "null" : contractpubKey);
* e.printStackTrace(); } } System.out.println("type=" + type == null ? "null" : type);
*/ System.out.println("lastExeSeq=" + lastExeSeq == null ? "null" : lastExeSeq);
System.out.println("invokeID=" + invokeID == null ? "null" : invokeID);
System.out.println("copies=" + copies);
System.out.println("isPrivate=" + isPrivate);
System.out.println("pubKeyPath=" + pubKeyPath == null ? "null" : pubKeyPath);
System.out.println("ypkName=" + ypkName == null ? "null" : ypkName);
if(members != null){
for(String k : members.keySet()){
System.out.println("members " + k + "-" + members.get(k));
}
}
System.out.println("memory=" + memory == null ? "null" : memory);
System.out.println("==========ContractRecord print finish=======");
}
public static void getContentFromFile(String path){
File file = new File(path);
try{
FileInputStream os = new FileInputStream(file);
ObjectInputStream oos = new ObjectInputStream(os);
ContractRecord record = (ContractRecord) oos.readObject();
record.printContent();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}*/
} }

View File

@ -16,7 +16,9 @@ public class ContractUnitController {
ContractManager manager; ContractManager manager;
SequencingAlgorithmFactory algorithmFactory; SequencingAlgorithmFactory algorithmFactory;
public ContractUnitController(TrustfulExecutorConnection connection, ContractManager manager, public ContractUnitController(
TrustfulExecutorConnection connection,
ContractManager manager,
SequencingAlgorithmFactory factory) { SequencingAlgorithmFactory factory) {
this.connection = connection; this.connection = connection;
this.manager = manager; this.manager = manager;
@ -37,8 +39,13 @@ public class ContractUnitController {
unit.node2member = new HashMap<>(); unit.node2member = new HashMap<>();
units.put(req.contract.getID(), unit); units.put(req.contract.getID(), unit);
System.out.println("[ContractUnitController] startContract:" + result + " isMaster:" System.out.println(
+ req.isMaster + " cid:" + req.contract.getID()); "[ContractUnitController] startContract:"
+ result
+ " isMaster:"
+ req.isMaster
+ " cid:"
+ req.contract.getID());
break; break;
case AddMember: case AddMember:
LOGGER.debug("contractID:" + cumsg.getContractID()); LOGGER.debug("contractID:" + cumsg.getContractID());
@ -61,8 +68,12 @@ public class ContractUnitController {
unit.node2member = new HashMap<>(); unit.node2member = new HashMap<>();
units.put(req2.contract.getID(), unit); units.put(req2.contract.getID(), unit);
System.out.println("[ContractUnitController] startContract:" + " isMaster:" System.out.println(
+ req2.isMaster + " cid:" + req2.contract.getID()); "[ContractUnitController] startContract:"
+ " isMaster:"
+ req2.isMaster
+ " cid:"
+ req2.contract.getID());
break; break;
} }
} }
@ -137,9 +148,16 @@ public class ContractUnitController {
} }
public String getDisplayStr() { public String getDisplayStr() {
return "pSize:" + (prepare == null ? "null" : prepare.size()) + " cSize:" return "pSize:"
+ (commit == null ? "null" : commit.size()) + " isSendCommit:" + isSendCommit + (prepare == null ? "null" : prepare.size())
+ " isSendReply:" + isSendReply + " buffSize:" + buff.size(); + " cSize:"
+ (commit == null ? "null" : commit.size())
+ " isSendCommit:"
+ isSendCommit
+ " isSendReply:"
+ isSendReply
+ " buffSize:"
+ buff.size();
} }
} }
} }

View File

@ -61,8 +61,7 @@ public class ContractUnitMessage implements Serializable {
// } // }
public boolean verify(ContractUnitMember member) { public boolean verify(ContractUnitMember member) {
ECPublicKeyParameters param = BCECUtil.createECPublicKeyFromStrParameters(member.pubKey, ECPublicKeyParameters param = BCECUtil.createECPublicKeyFromStrParameters(member.pubKey,SM2Util.CURVE,SM2Util.DOMAIN_PARAMS);
SM2Util.CURVE, SM2Util.DOMAIN_PARAMS);
return SM2Util.verify(param,content, signature.getBytes()); return SM2Util.verify(param,content, signature.getBytes());
} }

View File

@ -5,7 +5,6 @@ import java.io.Serializable;
public enum ContractUnitType implements Serializable { public enum ContractUnitType implements Serializable {
Start(0), AddMember(1), Sequencing(2), Unknown(3); Start(0), AddMember(1), Sequencing(2), Unknown(3);
private int type; private int type;
private ContractUnitType(int i) { private ContractUnitType(int i) {

View File

@ -101,9 +101,10 @@ public class MultiContractMeta implements IDSerializable {
public void setLastExeSeq(int lastExeSeq) { public void setLastExeSeq(int lastExeSeq) {
this.lastExeSeq.set(lastExeSeq); this.lastExeSeq.set(lastExeSeq);
if (KeyValueDBUtil.instance.containsKey(CMTables.LastExeSeq.toString(), contractID)) { // 如果现在是Stable模式就同步刷到磁盘 if (KeyValueDBUtil.instance.containsKey(
KeyValueDBUtil.instance.setValue(CMTables.LastExeSeq.toString(), contractID, CMTables.LastExeSeq.toString(), contractID)) { // 如果现在是Stable模式就同步刷到磁盘
String.valueOf(lastExeSeq)); KeyValueDBUtil.instance.setValue(
CMTables.LastExeSeq.toString(), contractID, String.valueOf(lastExeSeq));
} }
} }
@ -153,18 +154,19 @@ public class MultiContractMeta implements IDSerializable {
public String[] getMembers() { public String[] getMembers() {
return members; return members;
} }
public String joinMembers(String delimiter){ public String joinMembers(String delimiter){
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
if (members.length > 0) if (members.length > 0)
sb.append(members[0]); sb.append(members[0]);
else else return "";
return "";
for (int i = 1; i < members.length; i++) { for (int i = 1; i < members.length; i++) {
sb.append(delimiter).append(members[i]); sb.append(delimiter).append(members[i]);
} }
return sb.toString(); return sb.toString();
} }
public void setMembers(String[] m) {
members = m;
}
public void setMembers(JsonArray members) { public void setMembers(JsonArray members) {
String[] copied = new String[members.size()]; String[] copied = new String[members.size()];

View File

@ -24,8 +24,7 @@ public class RespCache {
} else { } else {
waiter.wait(5000L); waiter.wait(5000L);
timeout &= waiter.get() * 2 > count; timeout &= waiter.get() * 2 > count;
if (!timeout) if (!timeout) waiter.notifyAll();
waiter.notifyAll();
return timeout; return timeout;
} }
} }

View File

@ -25,8 +25,9 @@ public class DHTUtil {
String hash = HashUtil.sha3ToFixedLen(key, nodes[0].length() - 2); String hash = HashUtil.sha3ToFixedLen(key, nodes[0].length() - 2);
int l = 0, r = nodes.length - 1, m, h2l = hash.compareTo(nodes[l].substring(2)), int l = 0, r = nodes.length - 1, m,
r2h = nodes[r].substring(2).compareTo(hash), h2m; h2l = hash.compareTo(nodes[l].substring(2)), r2h = nodes[r].substring(2).compareTo(hash),
h2m;
BigInteger bigH = null; BigInteger bigH = null;
String selected; String selected;
do { do {
@ -41,7 +42,8 @@ public class DHTUtil {
break; break;
} }
if (l + 1 == r) { if (l + 1 == r) {
BigInteger bigL = getBigInteger(nodes[l]), bigR = getBigInteger(nodes[r]); BigInteger bigL = getBigInteger(nodes[l]),
bigR = getBigInteger(nodes[r]);
bigH = new BigInteger(hash, 16); bigH = new BigInteger(hash, 16);
if (bigR.subtract(bigH).compareTo(bigH.subtract(bigL)) > -1) { if (bigR.subtract(bigH).compareTo(bigH.subtract(bigL)) > -1) {
selected = nodes[l]; selected = nodes[l];

View File

@ -5,16 +5,12 @@ import org.bdware.sc.conn.Node;
import org.bdware.sc.conn.OnHashCallback; import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.conn.ResultCallback;
import java.util.Map;
public interface ContractExecutor { public interface ContractExecutor {
void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb); void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb);
default void onRecover(Map<String, Object> args) {} default void close() {
}
default void onDeliverBlock(String data) {}
default void onSyncMessage(Node node, byte[] data) {
default void close() {} }
default void onSyncMessage(Node node, byte[] data) {}
} }

View File

@ -3,5 +3,6 @@ package org.bdware.server.trustedmodel;
import java.io.Serializable; import java.io.Serializable;
public enum ContractUnitStatus implements Serializable { public enum ContractUnitStatus implements Serializable {
CommonMode, StableMode; CommonMode,
StableMode;
} }

View File

@ -1,11 +0,0 @@
package org.bdware.server.trustedmodel;
public class MultiReqSeq {
public final int seq;
public final long startTime;
public MultiReqSeq(int s) {
seq = s;
startTime = System.currentTimeMillis();
}
}

View File

@ -14,8 +14,7 @@ public class SingleNodeExecutor implements ContractExecutor {
} }
@Override @Override
public void execute(String requestID, ContractRequest req, ResultCallback rcb, public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) {
OnHashCallback hcb) {
cm.executeLocallyAsync(req, rcb, hcb); cm.executeLocallyAsync(req, rcb, hcb);
} }
} }

View File

@ -2,11 +2,9 @@ package org.bdware.sc.test;
import org.bdware.sc.ContractManager; import org.bdware.sc.ContractManager;
public class ContractManagerTest { public class ContractManagerTest {
public static void main(String[] args) { public static void main(String[] args) {
ContractManager.yjsPath = "./generatedlib/yjs.jar"; ContractManager.yjsPath = "./generatedlib/yjs.jar";
ContractManager.instance = new ContractManager(); ContractManager.instance = new ContractManager();
} }
} }

View File

@ -1,20 +0,0 @@
package org.bdware.sc.test;
import com.google.gson.Gson;
import java.lang.reflect.Method;
public class ReflectionTest {
public static class Test {
public static void go(String... args) {
System.out.println(new Gson().toJson(args));
}
}
@org.junit.Test
public void go() throws Exception {
Method m = Test.class.getDeclaredMethod("go", String[].class);
String[] abc = new String[] {"ab", "cd"};
m.invoke(null, new Object[] {abc});
}
}

View File

@ -12,15 +12,17 @@ public class RespCacheTest {
new Thread(() -> { new Thread(() -> {
try { try {
Thread.sleep(j * 1000); Thread.sleep(j * 1000);
if (j > 2) if (j > 2) Thread.sleep(6 * 1000);
Thread.sleep(6 * 1000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
boolean waitResult = cache.waitForHalf(); boolean waitResult = cache.waitForHalf();
System.out.println("tid:" + Thread.currentThread().getId() System.out.println(
+ " reach target, waitResult:" + waitResult); "tid:"
+ Thread.currentThread().getId()
+ " reach target, waitResult:"
+ waitResult);
}).start(); }).start();
} }
Thread.sleep(20000); Thread.sleep(20000);
@ -35,15 +37,17 @@ public class RespCacheTest {
new Thread(() -> { new Thread(() -> {
try { try {
Thread.sleep(j * 1000); Thread.sleep(j * 1000);
if (j > 2) if (j > 2) Thread.sleep(1900);
Thread.sleep(1900);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
boolean waitResult = cache.waitForHalf(); boolean waitResult = cache.waitForHalf();
System.out.println("tid:" + Thread.currentThread().getId() System.out.println(
+ " reach target, waitResult:" + waitResult); "tid:"
+ Thread.currentThread().getId()
+ " reach target, waitResult:"
+ waitResult);
}).start(); }).start();
} }
Thread.sleep(20000); Thread.sleep(20000);
@ -62,8 +66,11 @@ public class RespCacheTest {
e.printStackTrace(); e.printStackTrace();
} }
boolean waitResult = cache.waitForHalf(); boolean waitResult = cache.waitForHalf();
System.out.println("tid:" + Thread.currentThread().getId() System.out.println(
+ " reach target, waitResult:" + waitResult); "tid:"
+ Thread.currentThread().getId()
+ " reach target, waitResult:"
+ waitResult);
}).start(); }).start();
} }
Thread.sleep(10000); Thread.sleep(10000);