mirror of
https://gitee.com/BDWare/cm
synced 2025-04-28 15:02:16 +00:00
Compare commits
36 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
c96ab57b17 | ||
|
8efc164d99 | ||
|
cdf94fc50d | ||
|
8e07feb733 | ||
|
2c4a3bda9a | ||
|
f53c2374df | ||
|
fc453d56fc | ||
|
6cd99cf17a | ||
|
630bb2eb7e | ||
|
9cf42f1381 | ||
|
487697aa87 | ||
|
bc5dd8a106 | ||
|
8ce956a5c7 | ||
|
ca9c51e1bc | ||
|
22e0127bd0 | ||
|
ad7ba56dcf | ||
|
ea6781adb2 | ||
|
3dec7e9f47 | ||
|
2b595271c9 | ||
|
2b2f57b7cf | ||
|
2f874832b8 | ||
|
b251e7e603 | ||
|
540fbd65f8 | ||
|
3db9180ceb | ||
|
90a9d8af37 | ||
|
9f79db3a63 | ||
|
c84cd3fac0 | ||
|
71fa74ac14 | ||
|
32cc66dad1 | ||
|
a2f8ab528b | ||
|
a87549a4c9 | ||
|
bb368c8bee | ||
|
1ec1cac894 | ||
|
4315992b73 | ||
|
893be2e954 | ||
|
b1eb54c26b |
13
build.gradle
13
build.gradle
@ -2,7 +2,10 @@ plugins {
|
|||||||
id 'java'
|
id 'java'
|
||||||
id 'java-library'
|
id 'java-library'
|
||||||
}
|
}
|
||||||
|
apply from: '../spotless.gradle'
|
||||||
|
repositories {
|
||||||
|
mavenCentral()
|
||||||
|
}
|
||||||
sourceSets {
|
sourceSets {
|
||||||
main {
|
main {
|
||||||
java {
|
java {
|
||||||
@ -24,14 +27,10 @@ sourceSets {
|
|||||||
|
|
||||||
sourceCompatibility = 1.8
|
sourceCompatibility = 1.8
|
||||||
|
|
||||||
repositories {
|
|
||||||
mavenCentral()
|
|
||||||
}
|
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
api project(":common")
|
api project(":common")
|
||||||
api 'io.prometheus:simpleclient:0.12.0'
|
api 'io.prometheus:simpleclient:0.12.0'
|
||||||
api 'org.knowhowlab.osgi:sigar:1.6.5_01'
|
api 'org.hyperic.sigar:sigar:1.6.4'
|
||||||
api fileTree(dir: 'libs', include: '*.jar')
|
api 'com.github.ben-manes.caffeine:caffeine:2.8.8'
|
||||||
testImplementation 'junit:junit:4.13.2'
|
testImplementation 'junit:junit:4.13.2'
|
||||||
}
|
}
|
||||||
|
Binary file not shown.
@ -1,32 +1,22 @@
|
|||||||
package org.bdware.sc;
|
package org.bdware.sc;
|
||||||
|
|
||||||
|
import com.google.gson.JsonElement;
|
||||||
import org.bdware.sc.bean.ContractRequest;
|
import org.bdware.sc.bean.ContractRequest;
|
||||||
import org.bdware.sc.conn.OnHashCallback;
|
import org.bdware.sc.conn.OnHashCallback;
|
||||||
|
|
||||||
public interface ChainOpener {
|
public interface ChainOpener {
|
||||||
void reRegister(String doid);
|
void reRegister(String doid);
|
||||||
|
|
||||||
String register(String arg);
|
String register(String arg);
|
||||||
|
|
||||||
void writeContractResultToLocalAndLedger(
|
void writeContractResultToLocalAndLedger(String result, ContractClient client,
|
||||||
String result,
|
ContractRequest contractRequest, OnHashCallback cb, long start, long l);
|
||||||
ContractClient client,
|
|
||||||
ContractRequest contractRequest,
|
|
||||||
OnHashCallback cb, long start, long l);
|
|
||||||
|
|
||||||
void writeToChain(
|
void writeToChain(OnHashCallback cb, String from, String to, String data, String requestID,
|
||||||
OnHashCallback cb,
|
|
||||||
String from,
|
|
||||||
String to,
|
|
||||||
String data,
|
|
||||||
String requestID,
|
|
||||||
String namedLedger);
|
String namedLedger);
|
||||||
|
|
||||||
void writeToChainWithContract(
|
void writeToChainWithContract(OnHashCallback cb, String from, String to, String data,
|
||||||
OnHashCallback cb,
|
String requestID, String contractID, String namedLedger);
|
||||||
String from,
|
|
||||||
String to,
|
JsonElement getLedgerParams();
|
||||||
String data,
|
|
||||||
String requestID,
|
|
||||||
String contractID,
|
|
||||||
String namedLedger);
|
|
||||||
}
|
}
|
||||||
|
@ -18,11 +18,13 @@ import org.bdware.sc.encrypt.HardwareInfo.OSType;
|
|||||||
import org.bdware.sc.event.REvent.REventSemantics;
|
import org.bdware.sc.event.REvent.REventSemantics;
|
||||||
import org.bdware.sc.node.AnnotationNode;
|
import org.bdware.sc.node.AnnotationNode;
|
||||||
import org.bdware.sc.node.YjsType;
|
import org.bdware.sc.node.YjsType;
|
||||||
|
import org.bdware.sc.units.MultiContractMeta;
|
||||||
import org.bdware.sc.util.JsonUtil;
|
import org.bdware.sc.util.JsonUtil;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.InputStream;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
|
import java.lang.reflect.Constructor;
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
|
||||||
@ -30,7 +32,6 @@ public class ContractClient {
|
|||||||
private static final Logger LOGGER = LogManager.getLogger(ContractClient.class);
|
private static final Logger LOGGER = LogManager.getLogger(ContractClient.class);
|
||||||
public static String cmi = "";
|
public static String cmi = "";
|
||||||
public ContractMeta contractMeta;
|
public ContractMeta contractMeta;
|
||||||
public boolean isDebug;
|
|
||||||
transient SocketGet get;
|
transient SocketGet get;
|
||||||
int port;
|
int port;
|
||||||
String pid;
|
String pid;
|
||||||
@ -67,10 +68,14 @@ public class ContractClient {
|
|||||||
isRunning = false;
|
isRunning = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ContractClient(int startPort) {
|
||||||
|
this("127.0.0.1", startPort);
|
||||||
|
}
|
||||||
|
|
||||||
// Once the reconnect is called, all cp in the same node will reconnect to
|
// Once the reconnect is called, all cp in the same node will reconnect to
|
||||||
// current ContractManager
|
// current ContractManager
|
||||||
// We only consider the.
|
// We only consider the.
|
||||||
public ContractClient(int startPort) {
|
public ContractClient(String host, int startPort) {
|
||||||
// LOGGER.info("ContractClient 构造器 : ");
|
// LOGGER.info("ContractClient 构造器 : ");
|
||||||
contractMeta = new ContractMeta();
|
contractMeta = new ContractMeta();
|
||||||
outputTracer = new ContractPrinter();
|
outputTracer = new ContractPrinter();
|
||||||
@ -82,7 +87,7 @@ public class ContractClient {
|
|||||||
port = startPort;
|
port = startPort;
|
||||||
// LOGGER.info("ContractClient----构造器----- 端口 " + startPort);
|
// LOGGER.info("ContractClient----构造器----- 端口 " + startPort);
|
||||||
|
|
||||||
get = new SocketGet("127.0.0.1", startPort);
|
get = new SocketGet(host, startPort);
|
||||||
// LOGGER.info("ContractClient----构造器----- position---2");
|
// LOGGER.info("ContractClient----构造器----- position---2");
|
||||||
|
|
||||||
String cpCMI = get.syncGet("", "isContractProcess", ""); // CMI
|
String cpCMI = get.syncGet("", "isContractProcess", ""); // CMI
|
||||||
@ -128,26 +133,21 @@ public class ContractClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void initProps() {
|
private void initProps() {
|
||||||
// LOGGER.info("initProps ---- position-----1");
|
LOGGER.info("initProps ---- position-----1");
|
||||||
contractMeta.name = get.syncGet("", "getContractName", "");
|
contractMeta.name = get.syncGet("", "getContractName", "");
|
||||||
// LOGGER.info("initProps ---- position-----2");
|
LOGGER.info("initProps ---- position-----2");
|
||||||
String strEvent = get.syncGet("", "getDeclaredEvents", "");
|
String strEvent = get.syncGet("", "getDeclaredEvents", "");
|
||||||
LOGGER.debug("event: " + strEvent);
|
LOGGER.debug("event: " + strEvent);
|
||||||
contractMeta.declaredEvents =
|
contractMeta.declaredEvents = JsonUtil.fromJson(strEvent,
|
||||||
JsonUtil.fromJson(
|
new TypeToken<Map<String, REventSemantics>>() {}.getType());
|
||||||
strEvent, new TypeToken<Map<String, REventSemantics>>() {
|
LOGGER.info("initProps ---- position-----3");
|
||||||
}.getType());
|
contractMeta.dependentContracts =
|
||||||
// LOGGER.info("initProps ---- position-----3");
|
JsonUtil.fromJson(get.syncGet("", "getDependentContracts", ""),
|
||||||
contractMeta.dependentContracts = JsonUtil.fromJson(
|
new TypeToken<Set<String>>() {}.getType());
|
||||||
get.syncGet("", "getDependentContracts", ""),
|
LOGGER.info("initProps ---- position-----4");
|
||||||
new TypeToken<Set<String>>() {
|
|
||||||
}.getType());
|
|
||||||
// LOGGER.info("initProps ---- position-----4");
|
|
||||||
contractMeta.exportedFunctions =
|
contractMeta.exportedFunctions =
|
||||||
JsonUtil.fromJson(
|
JsonUtil.fromJson(get.syncGet("", "getExportedFunctions", ""),
|
||||||
get.syncGet("", "getExportedFunctions", ""),
|
new TypeToken<List<FunctionDesp>>() {}.getType());
|
||||||
new TypeToken<List<FunctionDesp>>() {
|
|
||||||
}.getType());
|
|
||||||
contractMeta.logDetail = new HashMap<>();
|
contractMeta.logDetail = new HashMap<>();
|
||||||
for (FunctionDesp func : contractMeta.exportedFunctions) {
|
for (FunctionDesp func : contractMeta.exportedFunctions) {
|
||||||
StringBuilder str = new StringBuilder();
|
StringBuilder str = new StringBuilder();
|
||||||
@ -159,8 +159,7 @@ public class ContractClient {
|
|||||||
}
|
}
|
||||||
if (anno.getType().equals("LogLocation")) {
|
if (anno.getType().equals("LogLocation")) {
|
||||||
for (String logLoc : anno.getArgs()) {
|
for (String logLoc : anno.getArgs()) {
|
||||||
if (logLoc.equals("\"dataware\"")
|
if (logLoc.equals("\"dataware\"") || logLoc.equals("\"bdledger\"")
|
||||||
|| logLoc.equals("\"bdledger\"")
|
|
||||||
|| logLoc.equals("\"bdledger:\"")) {
|
|| logLoc.equals("\"bdledger:\"")) {
|
||||||
str.append("bdcontract;");
|
str.append("bdcontract;");
|
||||||
} else if (logLoc.startsWith("\"bdledger:") && logLoc.length() > 11) {
|
} else if (logLoc.startsWith("\"bdledger:") && logLoc.length() > 11) {
|
||||||
@ -173,54 +172,47 @@ public class ContractClient {
|
|||||||
}
|
}
|
||||||
contractMeta.logDetail.put(func.functionName, str.toString());
|
contractMeta.logDetail.put(func.functionName, str.toString());
|
||||||
}
|
}
|
||||||
// LOGGER.info("initProps ---- position-----5");
|
LOGGER.info("initProps ---- position-----5");
|
||||||
try {
|
try {
|
||||||
|
|
||||||
String anno = get.syncGet("", "getAnnotations", "");
|
String anno = get.syncGet("", "getAnnotations", "");
|
||||||
contractMeta.annotations =
|
contractMeta.annotations =
|
||||||
JsonUtil.fromJson(anno, new TypeToken<List<AnnotationNode>>() {
|
JsonUtil.fromJson(anno, new TypeToken<List<AnnotationNode>>() {}.getType());
|
||||||
}.getType());
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// supoort contract process before version 0.70
|
// supoort contract process before version 0.70
|
||||||
contractMeta.annotations = new ArrayList<>();
|
contractMeta.annotations = new ArrayList<>();
|
||||||
}
|
}
|
||||||
// LOGGER.info("initProps ---- position-----6");
|
LOGGER.info("initProps ---- position-----6");
|
||||||
contractMeta.sigRequired = Boolean.parseBoolean(get.syncGet("", "isSigRequired", ""));
|
contractMeta.sigRequired = Boolean.parseBoolean(get.syncGet("", "isSigRequired", ""));
|
||||||
// LOGGER.info("initProps ---- position-----7");
|
LOGGER.info("initProps ---- position-----7");
|
||||||
contractMeta.thisPermission = get.syncGet("", "showPermission", "");
|
contractMeta.thisPermission = get.syncGet("", "showPermission", "");
|
||||||
// LOGGER.info("initProps ---- position-----8");
|
LOGGER.info("initProps ---- position-----8");
|
||||||
isRunning = true;
|
isRunning = true;
|
||||||
contractMeta.isDebug = Boolean.parseBoolean(get.syncGet("", "getDebug", ""));
|
LOGGER.info("initProps ---- position-----9");
|
||||||
// LOGGER.info("initProps ---- position-----9");
|
|
||||||
get.syncGet("", "registerMangerPort", ContractManager.cPort.getCMPort() + "");
|
get.syncGet("", "registerMangerPort", ContractManager.cPort.getCMPort() + "");
|
||||||
contractMeta.contract =
|
contractMeta.contract =
|
||||||
JsonUtil.fromJson(get.syncGet("", "getContract", ""), Contract.class);
|
JsonUtil.fromJson(get.syncGet("", "getContract", ""), Contract.class);
|
||||||
contractMeta.id = contractMeta.contract.getID();
|
contractMeta.id = contractMeta.contract.getID();
|
||||||
get.setOfflineExceptionHandler(
|
get.setOfflineExceptionHandler(new SocketGet.OfflineHandler() {
|
||||||
new SocketGet.OfflineHandler() {
|
|
||||||
@Override
|
@Override
|
||||||
public void onException(SocketGet socketGet, Exception e) {
|
public void onException(SocketGet socketGet, Exception e) {
|
||||||
if (e.getMessage().contains("Connection refused")) {
|
if (e.getMessage().contains("Connection refused")) {
|
||||||
contractMeta.status = ContractStatusEnum.HANGED;
|
contractMeta.status = ContractStatusEnum.HANGED;
|
||||||
ContractManager.instance.statusRecorder.resumeContractProcess(
|
ContractManager.instance.statusRecorder.resumeContractProcess(contractMeta.id);
|
||||||
contractMeta.id);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
loadTimesAndTraffic();
|
loadTimesAndTraffic();
|
||||||
// LOGGER.info("initProps ---- position-----10");
|
LOGGER.info("initProps ---- position-----10 DONE!");
|
||||||
// LOGGER.debug("======= registerPort:" + ret + "-->" + ContractManager.startPort);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String startProcess(PrintStream ps) {
|
public String startProcess(PrintStream ps) throws Exception {
|
||||||
isRunning = false;
|
isRunning = false;
|
||||||
port = -1;
|
port = -1;
|
||||||
|
|
||||||
String darg = "-Djava.library.path=";
|
String darg = "-Djava.library.path=";
|
||||||
String classpath;
|
String classpath;
|
||||||
File jniPath;
|
File jniPath;
|
||||||
|
|
||||||
if (ContractManager.yjsPath == null) {
|
if (ContractManager.yjsPath == null) {
|
||||||
jniPath = new File("./jni/");
|
jniPath = new File("./jni/");
|
||||||
classpath = System.getProperty("java.class.path");
|
classpath = System.getProperty("java.class.path");
|
||||||
@ -228,60 +220,64 @@ public class ContractClient {
|
|||||||
classpath = ContractManager.yjsPath;
|
classpath = ContractManager.yjsPath;
|
||||||
jniPath = new File(classpath).getParentFile();
|
jniPath = new File(classpath).getParentFile();
|
||||||
}
|
}
|
||||||
|
|
||||||
String osJni = ((HardwareInfo.type == OSType.linux) ? "/jni/linux" : "/jni/mac");
|
String osJni = ((HardwareInfo.type == OSType.linux) ? "/jni/linux" : "/jni/mac");
|
||||||
darg += jniPath.getAbsolutePath() + osJni;
|
darg += jniPath.getAbsolutePath() + osJni;
|
||||||
if (!new File(classpath).exists()) {
|
if (!new File(classpath).exists()) {
|
||||||
ContractResult r =
|
ContractResult r = new ContractResult(Status.Exception,
|
||||||
new ContractResult(
|
new JsonPrimitive("incorrect path: yjs.jar"));
|
||||||
Status.Exception, new JsonPrimitive("incorrect path: yjs.jar"));
|
|
||||||
return JsonUtil.toJson(r);
|
return JsonUtil.toJson(r);
|
||||||
}
|
}
|
||||||
if (!new File(jniPath, "libs").exists()) {
|
if (!new File(jniPath, "libs").exists()) {
|
||||||
ContractResult r =
|
ContractResult r = new ContractResult(Status.Exception,
|
||||||
new ContractResult(
|
|
||||||
Status.Exception,
|
|
||||||
new JsonPrimitive("incorrect path: yjs.jar, missing libs"));
|
new JsonPrimitive("incorrect path: yjs.jar, missing libs"));
|
||||||
return JsonUtil.toJson(r);
|
return JsonUtil.toJson(r);
|
||||||
}
|
}
|
||||||
// ProcessBuilder builder =
|
|
||||||
// new ProcessBuilder(
|
|
||||||
// "java",
|
|
||||||
// "-Dfile.encoding=UTF-8",
|
|
||||||
// darg,
|
|
||||||
// "-cp",
|
|
||||||
// jniPath.getAbsolutePath() + "/libs/*:" + classpath,
|
|
||||||
// "org.bdware.sc.ContractProcess",
|
|
||||||
// "-port=" + cPort.getPort());
|
|
||||||
int startPort = ContractManager.cPort.getPortAndInc();
|
int startPort = ContractManager.cPort.getPortAndInc();
|
||||||
ProcessBuilder builder =
|
List<String> pbParameters = new ArrayList<>();
|
||||||
new ProcessBuilder(
|
pbParameters.add("java");
|
||||||
"java",
|
pbParameters.add("-Dfile.encoding=UTF-8");
|
||||||
"-Dfile.encoding=UTF-8",
|
pbParameters.add(darg);
|
||||||
darg,
|
if (contractMeta.contract.getRemoteDebugPort() != 0) {
|
||||||
"-jar",
|
pbParameters.add(String.format(
|
||||||
classpath,
|
"-agentlib:jdwp=transport=dt_socket,address=%d,server=y,suspend=n",
|
||||||
"-port=" + startPort,
|
contractMeta.contract.getRemoteDebugPort()));
|
||||||
"-cmi=" + cmi, // cmi 区分不同CM的cp
|
}
|
||||||
(isDebug ? "-debug" : ""));
|
File classParent = new File(classpath).getParentFile();
|
||||||
File directory = new File("");
|
if (contractMeta.contract.isDebug()) {
|
||||||
|
pbParameters.add("-Dlog4j.configurationFile="
|
||||||
|
+ new File(classParent, "log4j2.debug.properties").getAbsolutePath());
|
||||||
|
} else {
|
||||||
|
pbParameters.add("-Dlog4j.configurationFile="
|
||||||
|
+ new File(classParent, "log4j2.properties").getAbsolutePath());
|
||||||
|
}
|
||||||
|
pbParameters.add("-jar");
|
||||||
|
pbParameters.add(classpath);
|
||||||
|
pbParameters.add("-port=" + startPort);
|
||||||
|
pbParameters.add("-cmi=" + cmi);
|
||||||
|
if (contractMeta.contract.isDebug())
|
||||||
|
pbParameters.add("-debug");
|
||||||
|
ProcessBuilder builder;
|
||||||
|
String[] result = new String[pbParameters.size()];
|
||||||
|
pbParameters.toArray(result);
|
||||||
|
Constructor<ProcessBuilder> pbc =
|
||||||
|
ProcessBuilder.class.getDeclaredConstructor(String[].class);
|
||||||
|
builder = pbc.newInstance(new Object[] {result});
|
||||||
|
|
||||||
|
File directory = new File("./");
|
||||||
LOGGER.debug("[CMD] path: " + directory.getAbsolutePath());
|
LOGGER.debug("[CMD] path: " + directory.getAbsolutePath());
|
||||||
LOGGER.debug(JsonUtil.toPrettyJson(builder.command()));
|
LOGGER.debug(JsonUtil.toPrettyJson(builder.command()));
|
||||||
|
|
||||||
Map<String, String> map = builder.environment();
|
Map<String, String> map = builder.environment();
|
||||||
map.put("java.library.path", jniPath.getAbsolutePath() + osJni);
|
map.put("java.library.path", jniPath.getAbsolutePath() + osJni);
|
||||||
builder.directory(new File("./"));
|
builder.directory(directory);
|
||||||
LOGGER.debug("start process:");
|
LOGGER.debug("start process:");
|
||||||
|
|
||||||
try {
|
|
||||||
process = builder.start();
|
process = builder.start();
|
||||||
|
|
||||||
this.pid = getPid(process);
|
this.pid = getPid(process);
|
||||||
LOGGER.info("[CP PPID ] " + pid);
|
LOGGER.info("[CP PPID] " + pid);
|
||||||
PrintStream printStream = new PrintStream(process.getOutputStream());
|
PrintStream printStream = new PrintStream(process.getOutputStream());
|
||||||
printStream.println("CP PID:" + pid);
|
printStream.println("CP PID:" + pid);
|
||||||
printStream.close();
|
printStream.close();
|
||||||
Scanner sc = new Scanner(process.getInputStream());
|
InputStream processInputStream = process.getInputStream();
|
||||||
|
Scanner sc = new Scanner(processInputStream);
|
||||||
String status = null;
|
String status = null;
|
||||||
while (sc.hasNext()) {
|
while (sc.hasNext()) {
|
||||||
status = sc.nextLine();
|
status = sc.nextLine();
|
||||||
@ -290,10 +286,8 @@ public class ContractClient {
|
|||||||
try {
|
try {
|
||||||
// Set contractPort to max(mainPort, contractPort)
|
// Set contractPort to max(mainPort, contractPort)
|
||||||
int portIndex = status.indexOf("mainPort");
|
int portIndex = status.indexOf("mainPort");
|
||||||
int port =
|
int port = Integer.parseInt(
|
||||||
Integer.parseInt(
|
status.substring(portIndex + 9, portIndex + 14).replaceAll("\\s+", ""));
|
||||||
status.substring(portIndex + 9, portIndex + 14)
|
|
||||||
.replaceAll("\\s+", ""));
|
|
||||||
if (port != startPort) {
|
if (port != startPort) {
|
||||||
ContractManager.cPort.reSetPort(port + 1);
|
ContractManager.cPort.reSetPort(port + 1);
|
||||||
}
|
}
|
||||||
@ -311,39 +305,52 @@ public class ContractClient {
|
|||||||
ContractManager.cPort.updateDb(port, true);
|
ContractManager.cPort.updateDb(port, true);
|
||||||
get = new SocketGet("127.0.0.1", port);
|
get = new SocketGet("127.0.0.1", port);
|
||||||
get.syncGet("", "setDBInfo", ContractManager.dbPath);
|
get.syncGet("", "setDBInfo", ContractManager.dbPath);
|
||||||
String tagA = (ps == System.out ? "[Contract_" + port + "_out] " : "");
|
if (contractMeta.contract.isDebug() || get != null) {
|
||||||
String tagB = (ps == System.out ? "[Contract_" + port + "_err] " : "");
|
String tagA = (ps instanceof LoggerPrintStream ? "[Contract_" + port + "_out] " : "");
|
||||||
|
String tagB = (ps instanceof LoggerPrintStream ? "[Contract_" + port + "_err] " : "");
|
||||||
outputTracer.track(process, sc, tagA, ps);
|
outputTracer.track(process, sc, tagA, ps);
|
||||||
errorTracer.track(process, new Scanner(process.getErrorStream()), tagB, ps);
|
errorTracer.track(process, new Scanner(process.getErrorStream()), tagB, ps);
|
||||||
|
} else {
|
||||||
|
// 关闭流,否则缓冲区打满会阻塞进程
|
||||||
|
processInputStream.close();
|
||||||
|
sc.close();
|
||||||
|
}
|
||||||
get.syncGet("", "registerMangerPort", String.valueOf(ContractManager.cPort.getCMPort()));
|
get.syncGet("", "registerMangerPort", String.valueOf(ContractManager.cPort.getCMPort()));
|
||||||
|
|
||||||
|
MultiContractMeta multiContractMeta = ContractManager.instance.multiContractRecorder
|
||||||
|
.getMultiContractMeta(contractMeta.getID());
|
||||||
|
if (multiContractMeta != null && multiContractMeta.getMembers() != null) {
|
||||||
|
String setMemberResult =
|
||||||
|
get.syncGet("", "setMembers", JsonUtil.toJson(multiContractMeta.getMembers()));
|
||||||
|
LOGGER.info("setMember:" + setMemberResult);
|
||||||
|
} else {
|
||||||
|
LOGGER.info("setMember ignore, meta:" + (multiContractMeta == null)
|
||||||
|
+ (multiContractMeta == null ? "NULL"
|
||||||
|
: " members:" + multiContractMeta.getMembers()));
|
||||||
|
}
|
||||||
|
|
||||||
if (isBundlePath(contractMeta.contract.getScriptStr())) {
|
if (isBundlePath(contractMeta.contract.getScriptStr())) {
|
||||||
status =
|
status = get.syncGet("", "setContractBundle", JsonUtil.toJson(contractMeta.contract));
|
||||||
get.syncGet(
|
|
||||||
"", "setContractBundle", JsonUtil.toJson(contractMeta.contract));
|
|
||||||
} else {
|
} else {
|
||||||
status = get.syncGet("", "setContract", JsonUtil.toJson(contractMeta.contract));
|
status = get.syncGet("", "setContract", JsonUtil.toJson(contractMeta.contract));
|
||||||
}
|
}
|
||||||
LOGGER.debug("port:" + port + " status:" + status);
|
LOGGER.info("start status, port:" + port + " status:" + status);
|
||||||
ContractResult r = JsonUtil.fromJson(status, ContractResult.class);
|
ContractResult r = JsonUtil.fromJson(status, ContractResult.class);
|
||||||
if (r.status == Status.Success) {
|
if (r.status == Status.Success) {
|
||||||
|
LOGGER.info("init prop start!");
|
||||||
initProps();
|
initProps();
|
||||||
|
LOGGER.info("init prop done!");
|
||||||
get.syncGet("", "setPID", pid);
|
get.syncGet("", "setPID", pid);
|
||||||
} else if (r.status == null) {
|
} else {
|
||||||
|
LOGGER.info("kill contract");
|
||||||
|
if (r.status == null) {
|
||||||
r.status = Status.Error;
|
r.status = Status.Error;
|
||||||
r.result = new JsonPrimitive(status);
|
r.result = new JsonPrimitive(status);
|
||||||
status = JsonUtil.toJson(r);
|
}
|
||||||
contractMeta.name = get.syncGet("", "getContractName", "");
|
killProcess();
|
||||||
|
throw new IllegalStateException(r.result.getAsString());
|
||||||
}
|
}
|
||||||
return status;
|
return status;
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
ByteArrayOutputStream bo = new ByteArrayOutputStream();
|
|
||||||
e.printStackTrace(new PrintStream(bo));
|
|
||||||
ContractResult r =
|
|
||||||
new ContractResult(Status.Exception, new JsonPrimitive(bo.toString()));
|
|
||||||
return JsonUtil.toJson(r);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void updateMemory() {
|
public void updateMemory() {
|
||||||
@ -352,11 +359,7 @@ public class ContractClient {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
lastUpdate = System.currentTimeMillis();
|
lastUpdate = System.currentTimeMillis();
|
||||||
get.asyncGet(
|
get.asyncGet("", ".UsedMemory", "", new ResultCallback() {
|
||||||
"",
|
|
||||||
".UsedMemory",
|
|
||||||
"",
|
|
||||||
new ResultCallback() {
|
|
||||||
@Override
|
@Override
|
||||||
public void onResult(String str) {
|
public void onResult(String str) {
|
||||||
memory = Long.parseLong(str);
|
memory = Long.parseLong(str);
|
||||||
@ -375,17 +378,8 @@ public class ContractClient {
|
|||||||
return contractMeta.getExportedFunctions();
|
return contractMeta.getExportedFunctions();
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getIdentifier() {
|
public String executeMethod(String pkgName, String method, String arg) {
|
||||||
return get.syncGet("", "getIdentifier", " ");
|
return get.syncGet(pkgName, method, arg);
|
||||||
}
|
|
||||||
|
|
||||||
public void setIdentifier(String alias) {
|
|
||||||
get.asyncGet("", "setIdentifier", alias, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public String signResult(String result) {
|
|
||||||
return contractMeta.contract.signResult(result);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getPubkey() {
|
public String getPubkey() {
|
||||||
@ -400,10 +394,6 @@ public class ContractClient {
|
|||||||
return contractMeta.name;
|
return contractMeta.name;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getContractDOI() {
|
|
||||||
return contractMeta.contract.getDOI();
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getContractKey() {
|
public String getContractKey() {
|
||||||
return contractMeta.contract.getKey();
|
return contractMeta.contract.getKey();
|
||||||
}
|
}
|
||||||
@ -428,10 +418,6 @@ public class ContractClient {
|
|||||||
return contractMeta.logDetail.get(action);
|
return contractMeta.logDetail.get(action);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isDebug() {
|
|
||||||
return contractMeta.isDebug;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getTimes() {
|
public long getTimes() {
|
||||||
return times;
|
return times;
|
||||||
}
|
}
|
||||||
@ -457,12 +443,10 @@ public class ContractClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void loadTimesAndTraffic() {
|
public void loadTimesAndTraffic() {
|
||||||
String tempTime2 =
|
String tempTime2 = KeyValueDBUtil.instance.getValue(CMTables.ContractInfo.toString(),
|
||||||
KeyValueDBUtil.instance.getValue(
|
contractMeta.name + "-Times");
|
||||||
CMTables.ContractInfo.toString(), contractMeta.name + "-Times");
|
String tempTraffic2 = KeyValueDBUtil.instance.getValue(CMTables.ContractInfo.toString(),
|
||||||
String tempTraffic2 =
|
contractMeta.name + "-Traffic");
|
||||||
KeyValueDBUtil.instance.getValue(
|
|
||||||
CMTables.ContractInfo.toString(), contractMeta.name + "-Traffic");
|
|
||||||
if (tempTime2 != null && !tempTime2.equals("")) {
|
if (tempTime2 != null && !tempTime2.equals("")) {
|
||||||
times = Long.parseLong(tempTime2);
|
times = Long.parseLong(tempTime2);
|
||||||
}
|
}
|
||||||
@ -472,15 +456,15 @@ public class ContractClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void saveTimesAndTraffic() {
|
public void saveTimesAndTraffic() {
|
||||||
KeyValueDBUtil.instance.setValue(
|
KeyValueDBUtil.instance.setValue(CMTables.ContractInfo.toString(),
|
||||||
CMTables.ContractInfo.toString(), contractMeta.name + "-Times", times + "");
|
contractMeta.name + "-Times", times + "");
|
||||||
KeyValueDBUtil.instance.setValue(
|
KeyValueDBUtil.instance.setValue(CMTables.ContractInfo.toString(),
|
||||||
CMTables.ContractInfo.toString(), contractMeta.name + "-Traffic", traffic + "");
|
contractMeta.name + "-Traffic", traffic + "");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setMask(JsonObject args) {
|
public void setMask(JsonObject args) {
|
||||||
|
|
||||||
//get.asyncGet("",,,,);
|
// get.asyncGet("",,,,);
|
||||||
get.asyncGet("", "setMask", args.toString(), null);
|
get.asyncGet("", "setMask", args.toString(), null);
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -488,7 +472,7 @@ public class ContractClient {
|
|||||||
public void setProjectConfig(String args) {
|
public void setProjectConfig(String args) {
|
||||||
get.asyncGet("", "setProjectConfig", args, null);
|
get.asyncGet("", "setProjectConfig", args, null);
|
||||||
}
|
}
|
||||||
//public String
|
// public String
|
||||||
|
|
||||||
static class ReqScript {
|
static class ReqScript {
|
||||||
String mode;
|
String mode;
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -1,5 +1,7 @@
|
|||||||
package org.bdware.sc;
|
package org.bdware.sc;
|
||||||
|
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.bdware.sc.bean.Contract;
|
import org.bdware.sc.bean.Contract;
|
||||||
import org.bdware.sc.bean.FunctionDesp;
|
import org.bdware.sc.bean.FunctionDesp;
|
||||||
import org.bdware.sc.bean.IDSerializable;
|
import org.bdware.sc.bean.IDSerializable;
|
||||||
@ -20,7 +22,6 @@ public class ContractMeta implements IDSerializable {
|
|||||||
ContractStatusEnum status;
|
ContractStatusEnum status;
|
||||||
String name;
|
String name;
|
||||||
String id;
|
String id;
|
||||||
boolean isDebug;
|
|
||||||
Map<String, REvent.REventSemantics> declaredEvents;
|
Map<String, REvent.REventSemantics> declaredEvents;
|
||||||
List<FunctionDesp> exportedFunctions;
|
List<FunctionDesp> exportedFunctions;
|
||||||
Map<String, String> logDetail;
|
Map<String, String> logDetail;
|
||||||
@ -30,14 +31,7 @@ public class ContractMeta implements IDSerializable {
|
|||||||
boolean sigRequired;
|
boolean sigRequired;
|
||||||
String thisPermission; // 合约当前权限
|
String thisPermission; // 合约当前权限
|
||||||
/*
|
/*
|
||||||
{
|
* { "name": "dx_substr", "parameter": { "columnIndex":5, "paras":["1","3"] } },
|
||||||
"name": "dx_substr",
|
|
||||||
"parameter":
|
|
||||||
{
|
|
||||||
"columnIndex":5,
|
|
||||||
"paras":["1","3"]
|
|
||||||
}
|
|
||||||
},
|
|
||||||
*/
|
*/
|
||||||
// Map<Object,Object>MaskInfo;
|
// Map<Object,Object>MaskInfo;
|
||||||
|
|
||||||
@ -96,7 +90,7 @@ public class ContractMeta implements IDSerializable {
|
|||||||
// public setMask(){}
|
// public setMask(){}
|
||||||
|
|
||||||
public boolean getIsDebug() {
|
public boolean getIsDebug() {
|
||||||
return isDebug;
|
return contract.isDebug();
|
||||||
}
|
}
|
||||||
|
|
||||||
public FunctionDesp getExportedFunction(String action) {
|
public FunctionDesp getExportedFunction(String action) {
|
||||||
@ -115,7 +109,8 @@ public class ContractMeta implements IDSerializable {
|
|||||||
|
|
||||||
private FunctionDesp seekFunction(String action) {
|
private FunctionDesp seekFunction(String action) {
|
||||||
for (FunctionDesp desp : exportedFunctions) {
|
for (FunctionDesp desp : exportedFunctions) {
|
||||||
if (desp != null && desp.functionName.equals(action)) return desp;
|
if (desp != null && desp.functionName.equals(action))
|
||||||
|
return desp;
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -124,9 +119,10 @@ public class ContractMeta implements IDSerializable {
|
|||||||
contract = c;
|
contract = c;
|
||||||
id = c.getID();
|
id = c.getID();
|
||||||
status = ContractStatusEnum.HANGED;
|
status = ContractStatusEnum.HANGED;
|
||||||
isDebug = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static Logger LOGGER = LogManager.getLogger(ContractMeta.class);
|
||||||
|
|
||||||
public void setContractExecutor(ContractExecutor executor) {
|
public void setContractExecutor(ContractExecutor executor) {
|
||||||
this.contractExecutor = executor;
|
this.contractExecutor = executor;
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,8 @@
|
|||||||
package org.bdware.sc;
|
package org.bdware.sc;
|
||||||
|
|
||||||
public enum ContractStatusEnum {
|
public enum ContractStatusEnum {
|
||||||
INIT(0, "初始化"),
|
INIT(0, "初始化"), RUNNING(1, "内存运行中"), HANGED(2, "硬盘运行中"), KILLED(3, "已停止");
|
||||||
RUNNING(1, "内存运行中"),
|
|
||||||
HANGED(2, "硬盘运行中"),
|
|
||||||
KILLED(3, "已停止");
|
|
||||||
private Integer code;
|
private Integer code;
|
||||||
private String desc;
|
private String desc;
|
||||||
|
|
||||||
|
@ -33,8 +33,7 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
|
|||||||
static {
|
static {
|
||||||
final Object flag = new Object();
|
final Object flag = new Object();
|
||||||
// 调度一个task,在delay(ms)后开始调度,每次调度完后,最少等待period(ms)后才开始调
|
// 调度一个task,在delay(ms)后开始调度,每次调度完后,最少等待period(ms)后才开始调
|
||||||
ContractManager.scheduledThreadPool.scheduleWithFixedDelay(
|
ContractManager.scheduledThreadPool.scheduleWithFixedDelay(() -> {
|
||||||
() -> {
|
|
||||||
boolean cleared = dealTimerContractProcess();
|
boolean cleared = dealTimerContractProcess();
|
||||||
if (cleared) {
|
if (cleared) {
|
||||||
synchronized (flag) {
|
synchronized (flag) {
|
||||||
@ -45,10 +44,7 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
}, 0, 1, TimeUnit.SECONDS);
|
||||||
0,
|
|
||||||
1,
|
|
||||||
TimeUnit.SECONDS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// contract id to client, changes when some contract is started/stopped/resumed
|
// contract id to client, changes when some contract is started/stopped/resumed
|
||||||
@ -102,7 +98,8 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
|
|||||||
// TODO 是不是还有别的?这只是在内存里的哦。
|
// TODO 是不是还有别的?这只是在内存里的哦。
|
||||||
try {
|
try {
|
||||||
for (ContractClient c : id2ContractClient.values()) {
|
for (ContractClient c : id2ContractClient.values()) {
|
||||||
if (Integer.parseInt(c.getPID()) == pid) return true;
|
if (Integer.parseInt(c.getPID()) == pid)
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// e.printStackTrace();
|
// e.printStackTrace();
|
||||||
@ -174,17 +171,23 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
|
|||||||
|
|
||||||
// TODO index name to contract
|
// TODO index name to contract
|
||||||
public ContractMeta getContractMeta(String idOrNameOrDOI) {
|
public ContractMeta getContractMeta(String idOrNameOrDOI) {
|
||||||
if (idOrNameOrDOI == null) return null;
|
if (idOrNameOrDOI == null)
|
||||||
|
return null;
|
||||||
ContractMeta meta = getStatus().get(idOrNameOrDOI);
|
ContractMeta meta = getStatus().get(idOrNameOrDOI);
|
||||||
if (meta != null) return meta;
|
if (meta != null)
|
||||||
|
return meta;
|
||||||
for (ContractMeta cc : getStatus().values()) {
|
for (ContractMeta cc : getStatus().values()) {
|
||||||
if (cc.status == ContractStatusEnum.RUNNING || cc.status == ContractStatusEnum.HANGED)
|
if (cc.status == ContractStatusEnum.RUNNING || cc.status == ContractStatusEnum.HANGED)
|
||||||
if (idOrNameOrDOI.equals(cc.name) || idOrNameOrDOI.equals(cc.contract.getDOI())) {
|
if (idOrNameOrDOI.equals(cc.name)) {
|
||||||
return cc;
|
return cc;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (ContractMeta cc : getStatus().values()) {
|
for (ContractMeta cc : getStatus().values()) {
|
||||||
if (idOrNameOrDOI.equals(cc.name) || idOrNameOrDOI.equals(cc.contract.getDOI())) {
|
if (cc.name == null)
|
||||||
|
continue;
|
||||||
|
if (cc.contract == null)
|
||||||
|
continue;
|
||||||
|
if (idOrNameOrDOI.equals(cc.name)) {
|
||||||
return cc;
|
return cc;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -222,9 +225,7 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
|
|||||||
LOGGER.info("need dump when stop");
|
LOGGER.info("need dump when stop");
|
||||||
String contractName = client.getContractName();
|
String contractName = client.getContractName();
|
||||||
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss");
|
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd.HH:mm:ss");
|
||||||
File f =
|
File f = new File(ContractManager.dir + "/memory/" + contractName,
|
||||||
new File(
|
|
||||||
ContractManager.dir + "/memory/" + contractName,
|
|
||||||
df.format(new Date()));
|
df.format(new Date()));
|
||||||
client.get.syncGet("", "getMemoryDump", f.getAbsolutePath());
|
client.get.syncGet("", "getMemoryDump", f.getAbsolutePath());
|
||||||
ContractManager.instance.addLocalContractLog("dumpContract", meta);
|
ContractManager.instance.addLocalContractLog("dumpContract", meta);
|
||||||
@ -311,13 +312,12 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
|
|||||||
client.saveTimesAndTraffic();
|
client.saveTimesAndTraffic();
|
||||||
|
|
||||||
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd.HH_mm_ss"); // 设置日期格式
|
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd.HH_mm_ss"); // 设置日期格式
|
||||||
File f =
|
File f = new File(ContractManager.dir + "/memory/" + contractMeta.name,
|
||||||
new File(
|
|
||||||
ContractManager.dir + "/memory/" + contractMeta.name,
|
|
||||||
df.format(new Date()));
|
df.format(new Date()));
|
||||||
File parent = f.getParentFile();
|
File parent = f.getParentFile();
|
||||||
if (!parent.exists()) {
|
if (!parent.exists()) {
|
||||||
LOGGER.trace("make directory " + parent.getAbsolutePath() + ":" + parent.mkdirs());
|
LOGGER.trace(
|
||||||
|
"make directory " + parent.getAbsolutePath() + ":" + parent.mkdirs());
|
||||||
}
|
}
|
||||||
ContractManager.instance.dumpContract(contract.getID(), f.getAbsolutePath());
|
ContractManager.instance.dumpContract(contract.getID(), f.getAbsolutePath());
|
||||||
ContractManager.instance.invokeContractSuicide(client);
|
ContractManager.instance.invokeContractSuicide(client);
|
||||||
@ -342,27 +342,19 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
|
|||||||
JsonUtil.fromJson(startContractResult, ContractResult.class);
|
JsonUtil.fromJson(startContractResult, ContractResult.class);
|
||||||
if (contractResult.status == ContractResult.Status.Error) {
|
if (contractResult.status == ContractResult.Status.Error) {
|
||||||
// 记录错误日志
|
// 记录错误日志
|
||||||
ContractManager.instance.addLocalContractLog(
|
ContractManager.instance.addLocalContractLog("resumeContract error",
|
||||||
"resumeContract error",
|
meta.contract.getID(), meta.name, meta.contract.getOwner());
|
||||||
meta.contract.getID(),
|
|
||||||
meta.name,
|
|
||||||
meta.contract.getOwner());
|
|
||||||
} else {
|
} else {
|
||||||
ContractManager.instance.addLocalContractLog(
|
ContractManager.instance.addLocalContractLog("resumeContract success",
|
||||||
"resumeContract success",
|
meta.contract.getID(), meta.name, meta.contract.getOwner());
|
||||||
meta.contract.getID(),
|
|
||||||
meta.name,
|
|
||||||
meta.contract.getOwner());
|
|
||||||
// TODO 可能会重复load相同的镜像?
|
// TODO 可能会重复load相同的镜像?
|
||||||
// 如果是killed 只根据manifest去判断是否加载memory
|
// 如果是killed 只根据manifest去判断是否加载memory
|
||||||
// 可增加一个判断,如果hanged朋manifest里是加载memory,这里就不再加载了。
|
// 可增加一个判断,如果hanged朋manifest里是加载memory,这里就不再加载了。
|
||||||
ContractClient client = id2ContractClient.get(meta.getID());
|
ContractClient client = id2ContractClient.get(meta.getID());
|
||||||
if (preStatus == ContractStatusEnum.HANGED) {
|
if (preStatus == ContractStatusEnum.HANGED) {
|
||||||
|
String memory = ContractManager.instance.findNewestMemory(meta.name);
|
||||||
ContractManager.instance.loadMemory(
|
if (memory != null)
|
||||||
client, ContractManager.instance.findNewestMemory(meta.name));
|
ContractManager.instance.loadMemory(client, memory);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
client.contractMeta.setStatus(ContractStatusEnum.RUNNING);
|
client.contractMeta.setStatus(ContractStatusEnum.RUNNING);
|
||||||
updateValue(client.contractMeta);
|
updateValue(client.contractMeta);
|
||||||
@ -389,6 +381,11 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean visit(int i) {
|
public boolean visit(int i) {
|
||||||
|
return visit("127.0.0.1", i);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean visit(String host, int i) {
|
||||||
if (i == ContractManager.cPort.getCMPort()) {
|
if (i == ContractManager.cPort.getCMPort()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -396,7 +393,7 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
ContractClient client = new ContractClient(i);
|
ContractClient client = new ContractClient(host, i);
|
||||||
// LOGGER.info("[CM重连] position-----1");
|
// LOGGER.info("[CM重连] position-----1");
|
||||||
if (client.isRunning) {
|
if (client.isRunning) {
|
||||||
LOGGER.debug("CP listened to port " + i + " is running");
|
LOGGER.debug("CP listened to port " + i + " is running");
|
||||||
@ -411,30 +408,23 @@ public class ContractStatusRecorder extends StatusRecorder<ContractMeta> {
|
|||||||
ContractManager.instance.invokeContractSuicide(client);
|
ContractManager.instance.invokeContractSuicide(client);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
LOGGER.debug(
|
LOGGER.debug(String.format(
|
||||||
String.format(
|
|
||||||
"CP listened to port %d:\n\tID=%s\n\tName=%s\n\tType=%s\n"
|
"CP listened to port %d:\n\tID=%s\n\tName=%s\n\tType=%s\n"
|
||||||
+ "\tKey=%s\n\tPubKey=%s\n\tCopies=%d\n\t%s",
|
+ "\tKey=%s\n\tPubKey=%s\n\tCopies=%d\n\t%s",
|
||||||
i,
|
i, client.getContractID(), client.contractMeta.name,
|
||||||
client.getContractID(),
|
client.getContractType(), client.getContractKey(), client.getPubkey(),
|
||||||
client.contractMeta.name,
|
client.getContractCopies(), client.contractMeta.contract.startInfo));
|
||||||
client.getContractType(),
|
|
||||||
client.getContractKey(),
|
|
||||||
client.getPubkey(),
|
|
||||||
client.getContractCopies(),
|
|
||||||
client.contractMeta.contract.startInfo));
|
|
||||||
client.get.syncGet("", "setDir", ContractManager.dir);
|
client.get.syncGet("", "setDir", ContractManager.dir);
|
||||||
// String str = client.getIdentifier();
|
// String str = client.getIdentifier();
|
||||||
client.get.syncGet("", "getDumpPeriod", "a");
|
client.get.syncGet("", "getDumpPeriod", "a");
|
||||||
client.get.syncGet("", "startAutoDump", "a");
|
client.get.syncGet("", "startAutoDump", "a");
|
||||||
createContract(client);
|
createContract(client);
|
||||||
LOGGER.info(
|
LOGGER.info(String.format("reconnect to port %d: contract %s", i,
|
||||||
String.format("reconnect to port %d: contract %s",
|
client.contractMeta.name));
|
||||||
i, client.contractMeta.name));
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
//just ignore
|
// just ignore
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
19
src/main/java/org/bdware/sc/LoggerPrintStream.java
Normal file
19
src/main/java/org/bdware/sc/LoggerPrintStream.java
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
package org.bdware.sc;
|
||||||
|
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
import java.io.PrintStream;
|
||||||
|
|
||||||
|
public class LoggerPrintStream extends PrintStream {
|
||||||
|
public LoggerPrintStream(Logger logger) {
|
||||||
|
super(System.out, true);
|
||||||
|
this.logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
Logger logger;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void println(String str) {
|
||||||
|
logger.info(str);
|
||||||
|
}
|
||||||
|
}
|
@ -1,15 +1,13 @@
|
|||||||
package org.bdware.sc;
|
package org.bdware.sc;
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
|
|
||||||
//Node
|
// Node
|
||||||
public class MasterElectTimeRecorder {
|
public class MasterElectTimeRecorder {
|
||||||
public static Long findMasterCrash; //发现master崩溃的时间
|
public static Long findMasterCrash; // 发现master崩溃的时间
|
||||||
public static Long newMasterStart; //作为新的master newMasterStart被调用时间
|
public static Long newMasterStart; // 作为新的master newMasterStart被调用时间
|
||||||
public static Long setLocalMaster; //在本地标记自己为master
|
public static Long setLocalMaster; // 在本地标记自己为master
|
||||||
public static Long slaveConnectFinish; //被slave连接完成
|
public static Long slaveConnectFinish; // 被slave连接完成
|
||||||
public static Long masterStartRecover; //开始master恢复
|
public static Long masterStartRecover; // 开始master恢复
|
||||||
public static Long masterRecoverFinish; //master恢复结束
|
public static Long masterRecoverFinish; // master恢复结束
|
||||||
}
|
}
|
||||||
|
@ -7,6 +7,9 @@ import org.bdware.sc.db.KeyValueDBUtil;
|
|||||||
import org.bdware.sc.db.StatusRecorder;
|
import org.bdware.sc.db.StatusRecorder;
|
||||||
import org.bdware.sc.units.MultiContractMeta;
|
import org.bdware.sc.units.MultiContractMeta;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
public class MultiContractRecorder extends StatusRecorder<MultiContractMeta> {
|
public class MultiContractRecorder extends StatusRecorder<MultiContractMeta> {
|
||||||
static final String dbName = CMTables.UnitContracts.toString();
|
static final String dbName = CMTables.UnitContracts.toString();
|
||||||
static final String prefix = "Multi_C_Meta_";
|
static final String prefix = "Multi_C_Meta_";
|
||||||
@ -17,26 +20,31 @@ public class MultiContractRecorder extends StatusRecorder<MultiContractMeta> {
|
|||||||
for (MultiContractMeta meta : getStatus().values()) {
|
for (MultiContractMeta meta : getStatus().values()) {
|
||||||
try {
|
try {
|
||||||
meta.initQueue();
|
meta.initQueue();
|
||||||
int lastExeSeq =
|
int lastExeSeq = Integer.parseInt(KeyValueDBUtil.instance
|
||||||
Integer.parseInt(
|
.getValue(CMTables.LastExeSeq.toString(), meta.getContractID()));
|
||||||
KeyValueDBUtil.instance.getValue(
|
|
||||||
CMTables.LastExeSeq.toString(), meta.getContractID()));
|
|
||||||
meta.setLastExeSeq(lastExeSeq);
|
meta.setLastExeSeq(lastExeSeq);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
LOGGER.error(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public MultiContractMeta getMultiContractMeta(String idOrNameOrDOI) {
|
public MultiContractMeta getMultiContractMeta(String idOrNameOrDOI) {
|
||||||
if (idOrNameOrDOI == null) return null;
|
if (idOrNameOrDOI == null)
|
||||||
|
return null;
|
||||||
ContractMeta meta = ContractManager.instance.statusRecorder.getContractMeta(idOrNameOrDOI);
|
ContractMeta meta = ContractManager.instance.statusRecorder.getContractMeta(idOrNameOrDOI);
|
||||||
if (meta == null) return null;
|
return getMultiContractMeta(meta);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MultiContractMeta getMultiContractMeta(ContractMeta meta) {
|
||||||
|
if (meta == null)
|
||||||
|
return null;
|
||||||
return getStatus().get(meta.id);
|
return getStatus().get(meta.id);
|
||||||
}
|
}
|
||||||
|
|
||||||
public MultiContractMeta createIfNotExist(String contractID) {
|
public MultiContractMeta createIfNotExist(String contractID) {
|
||||||
MultiContractMeta ret = getMultiContractMeta(contractID);
|
ContractMeta meta = ContractManager.instance.statusRecorder.createIfNotExist(contractID);
|
||||||
|
MultiContractMeta ret = getMultiContractMeta(meta);
|
||||||
if (null == ret) {
|
if (null == ret) {
|
||||||
LOGGER.info("requests don't contain contract " + contractID);
|
LOGGER.info("requests don't contain contract " + contractID);
|
||||||
ret = new MultiContractMeta(contractID);
|
ret = new MultiContractMeta(contractID);
|
||||||
@ -44,4 +52,14 @@ public class MultiContractRecorder extends StatusRecorder<MultiContractMeta> {
|
|||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void removeNotExist(ContractStatusRecorder statusRecorder) {
|
||||||
|
Set<MultiContractMeta> toRemove = new HashSet<>();
|
||||||
|
for (String key : getStatus().keySet()) {
|
||||||
|
if (statusRecorder.getContractMeta(key) == null)
|
||||||
|
toRemove.add(getStatus().get(key));
|
||||||
|
}
|
||||||
|
for (MultiContractMeta meta : toRemove)
|
||||||
|
remove(meta);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,7 @@ import org.bdware.sc.db.StatusRecorder;
|
|||||||
public class ProjectRecorder extends StatusRecorder<ProjectConfig> {
|
public class ProjectRecorder extends StatusRecorder<ProjectConfig> {
|
||||||
static final String dbName = CMTables.ProjectConfig.toString();
|
static final String dbName = CMTables.ProjectConfig.toString();
|
||||||
static final String prefix = "Project_Config_";
|
static final String prefix = "Project_Config_";
|
||||||
// private static final Logger LOGGER = LogManager.getLogger(ProjectRecorder.class);
|
// private static final Logger LOGGER = LogManager.getLogger(ProjectRecorder.class);
|
||||||
|
|
||||||
public ProjectRecorder(String dir) {
|
public ProjectRecorder(String dir) {
|
||||||
super(dir, dbName, prefix);
|
super(dir, dbName, prefix);
|
||||||
|
@ -4,41 +4,41 @@ import java.util.HashMap;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
public class RecoverMechTimeRecorder {
|
public class RecoverMechTimeRecorder {
|
||||||
//slave记录
|
// slave记录
|
||||||
public static Long startCMHttpServer; //启动
|
public static Long startCMHttpServer; // 启动
|
||||||
public static Long startFinish; //启动完成,开始连接NC
|
public static Long startFinish; // 启动完成,开始连接NC
|
||||||
public static Long connectNCFinish; //连接NC完成
|
public static Long connectNCFinish; // 连接NC完成
|
||||||
|
|
||||||
public static Long startReconnectCP; //开始重连CP
|
public static Long startReconnectCP; // 开始重连CP
|
||||||
public static Long reconnectCPFinish; //重连CP完成
|
public static Long reconnectCPFinish; // 重连CP完成
|
||||||
|
|
||||||
public static Long startQueryMaster; //开始查看master是谁
|
public static Long startQueryMaster; // 开始查看master是谁
|
||||||
public static Long queryMasterFinish; //查到master是谁,开始连接master
|
public static Long queryMasterFinish; // 查到master是谁,开始连接master
|
||||||
public static Long connectMasterFinish; //连接master完成
|
public static Long connectMasterFinish; // 连接master完成
|
||||||
|
|
||||||
public static Long startRecover; //开始恢复,slave的askForRecover被调用
|
public static Long startRecover; // 开始恢复,slave的askForRecover被调用
|
||||||
|
|
||||||
public static Long startCommonRecover; //slave开始从common恢复
|
public static Long startCommonRecover; // slave开始从common恢复
|
||||||
public static Long startStableRecover; //slave开始从stable恢复
|
public static Long startStableRecover; // slave开始从stable恢复
|
||||||
|
|
||||||
public static Long stableLoadFinish; //stable恢复模式load完成
|
public static Long stableLoadFinish; // stable恢复模式load完成
|
||||||
public static Long stableRedoFinish; //stable恢复模式redo完成
|
public static Long stableRedoFinish; // stable恢复模式redo完成
|
||||||
|
|
||||||
public static Long startRedoTransFromMaster; //redo从master传来的trans
|
public static Long startRedoTransFromMaster; // redo从master传来的trans
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
//master记录,key是slave的pubKey
|
// master记录,key是slave的pubKey
|
||||||
public static Map<String,Long> masterStartRecoverNode = new HashMap<String, Long>(); //master的askForRecover被调用
|
public static Map<String, Long> masterStartRecoverNode = new HashMap<String, Long>(); // master的askForRecover被调用
|
||||||
|
|
||||||
public static Map<String,Long> startJudgeRecoverMethod = new HashMap<String,Long>(); //开始判断某个节点的恢复方式
|
public static Map<String, Long> startJudgeRecoverMethod = new HashMap<String, Long>(); // 开始判断某个节点的恢复方式
|
||||||
public static Map<String,Long> judgeRecoverMethodFinish = new HashMap<String,Long>(); //判断某节点恢复方式完成
|
public static Map<String, Long> judgeRecoverMethodFinish = new HashMap<String, Long>(); // 判断某节点恢复方式完成
|
||||||
|
|
||||||
public static Map<String,Long> startRecoverFromCommon = new HashMap<String,Long>(); //slave从common恢复master的nodeRestartFromCommonMode方法开始时间
|
public static Map<String, Long> startRecoverFromCommon = new HashMap<String, Long>(); // slave从common恢复master的nodeRestartFromCommonMode方法开始时间
|
||||||
public static Map<String,Long> writeCEIStart = new HashMap<String, Long>(); //开始dumpContract等cei信息
|
public static Map<String, Long> writeCEIStart = new HashMap<String, Long>(); // 开始dumpContract等cei信息
|
||||||
public static Map<String,Long> finishWriteCEI = new HashMap<String, Long>(); //将cei写入文件完成
|
public static Map<String, Long> finishWriteCEI = new HashMap<String, Long>(); // 将cei写入文件完成
|
||||||
|
|
||||||
public static Map<String,Long> startRecoverFromStable = new HashMap<String,Long>(); //slave从common恢复master的restartFromStableMode方法开始时间
|
public static Map<String, Long> startRecoverFromStable = new HashMap<String, Long>(); // slave从common恢复master的restartFromStableMode方法开始时间
|
||||||
|
|
||||||
public static Map<String,Long> recoverFinish = new HashMap<String,Long>();
|
public static Map<String, Long> recoverFinish = new HashMap<String, Long>();
|
||||||
}
|
}
|
||||||
|
92
src/main/java/org/bdware/sc/YPKResourceManager.java
Normal file
92
src/main/java/org/bdware/sc/YPKResourceManager.java
Normal file
@ -0,0 +1,92 @@
|
|||||||
|
package org.bdware.sc;
|
||||||
|
|
||||||
|
import com.github.benmanes.caffeine.cache.Cache;
|
||||||
|
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||||
|
import org.bdware.sc.bean.Contract;
|
||||||
|
|
||||||
|
import java.io.*;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.zip.ZipEntry;
|
||||||
|
import java.util.zip.ZipInputStream;
|
||||||
|
|
||||||
|
public class YPKResourceManager {
|
||||||
|
public static String ypkCacheDir = "./ypkcache";
|
||||||
|
static ExecutorService es = Executors.newFixedThreadPool(2);
|
||||||
|
|
||||||
|
public static void unzipYPK(Contract contract) {
|
||||||
|
es.execute(() -> unzip(contract.getScriptStr(), getUnzipDir(contract)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getUnzipDir(Contract contract) {
|
||||||
|
return ypkCacheDir + "/" + contract.getID();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final long MAX_CACHE_SIZE = 2L * 1024 * 1024 * 1024; // 2GB
|
||||||
|
private static final long MAX_FILE_SIZE = 10 * 1024 * 1024; // 10MB
|
||||||
|
|
||||||
|
private static final Cache<String, byte[]> cache = Caffeine.newBuilder()
|
||||||
|
.maximumSize(MAX_CACHE_SIZE).expireAfterAccess(30, TimeUnit.MINUTES).build();
|
||||||
|
|
||||||
|
public static InputStream getCachedStream(Contract c, String path) throws IOException {
|
||||||
|
File target = new File(getUnzipDir(c), path);
|
||||||
|
String key = target.getAbsolutePath();
|
||||||
|
|
||||||
|
if (target.length() > MAX_FILE_SIZE) {
|
||||||
|
// 文件过大,直接从文件系统读取
|
||||||
|
return new FileInputStream(target);
|
||||||
|
} else {
|
||||||
|
byte[] data = cache.get(key, k -> {
|
||||||
|
try {
|
||||||
|
return Files.readAllBytes(target.toPath());
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// 从缓存中返回一个新的 ByteArrayInputStream
|
||||||
|
return new ByteArrayInputStream(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void unzip(String zipFilePath, String destDir) {
|
||||||
|
File dir = new File(destDir);
|
||||||
|
if (!dir.exists())
|
||||||
|
dir.mkdirs();
|
||||||
|
byte[] buffer = new byte[1024];
|
||||||
|
try {
|
||||||
|
ZipInputStream zis = new ZipInputStream(new FileInputStream(zipFilePath));
|
||||||
|
ZipEntry zipEntry = zis.getNextEntry();
|
||||||
|
while (zipEntry != null) {
|
||||||
|
String fileName = zipEntry.getName();
|
||||||
|
File newFile = new File(destDir + File.separator + fileName);
|
||||||
|
// 创建所有上级目录
|
||||||
|
new File(newFile.getParent()).mkdirs();
|
||||||
|
|
||||||
|
if (zipEntry.isDirectory()) {
|
||||||
|
if (!newFile.isDirectory() && !newFile.mkdirs()) {
|
||||||
|
throw new IOException("Failed to create directory " + newFile);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// 写入文件内容
|
||||||
|
FileOutputStream fos = new FileOutputStream(newFile);
|
||||||
|
int len;
|
||||||
|
while ((len = zis.read(buffer)) > 0) {
|
||||||
|
fos.write(buffer, 0, len);
|
||||||
|
}
|
||||||
|
fos.close();
|
||||||
|
}
|
||||||
|
zipEntry = zis.getNextEntry();
|
||||||
|
}
|
||||||
|
zis.closeEntry();
|
||||||
|
zis.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean existYPK(Contract contract) {
|
||||||
|
return new File(contract.getScriptStr()).exists();
|
||||||
|
}
|
||||||
|
}
|
@ -79,7 +79,8 @@ public class PBFTAlgorithm implements CommitAlgorithm {
|
|||||||
prepareMsg = new PBFTMessage();
|
prepareMsg = new PBFTMessage();
|
||||||
prepareMsg.order = allocatedID.incrementAndGet();
|
prepareMsg.order = allocatedID.incrementAndGet();
|
||||||
prepareMsg.type = PBFTType.PrePrepare;
|
prepareMsg.type = PBFTType.PrePrepare;
|
||||||
prepareMsg.content = (java.util.Arrays.hashCode(pbftMessage.content) + "").getBytes();
|
prepareMsg.content =
|
||||||
|
(java.util.Arrays.hashCode(pbftMessage.content) + "").getBytes();
|
||||||
|
|
||||||
temp = new PCInfo();
|
temp = new PCInfo();
|
||||||
temp.request = pbftMessage;
|
temp.request = pbftMessage;
|
||||||
@ -108,7 +109,8 @@ public class PBFTAlgorithm implements CommitAlgorithm {
|
|||||||
break;
|
break;
|
||||||
case Prepare:
|
case Prepare:
|
||||||
temp = info.get(pbftMessage.order);
|
temp = info.get(pbftMessage.order);
|
||||||
LOGGER.info("receive Prepare from:" + pbftMessage.sendID + " -> " + pbftMessage.order);
|
LOGGER.info(
|
||||||
|
"receive Prepare from:" + pbftMessage.sendID + " -> " + pbftMessage.order);
|
||||||
if (temp == null) {
|
if (temp == null) {
|
||||||
PCInfo pcInfo = new PCInfo();
|
PCInfo pcInfo = new PCInfo();
|
||||||
pcInfo.buff.add(pbftMessage);
|
pcInfo.buff.add(pbftMessage);
|
||||||
@ -155,7 +157,8 @@ public class PBFTAlgorithm implements CommitAlgorithm {
|
|||||||
prepareMsg.order = pbftMessage.order;
|
prepareMsg.order = pbftMessage.order;
|
||||||
prepareMsg.type = PBFTType.PrePrepare;
|
prepareMsg.type = PBFTType.PrePrepare;
|
||||||
temp = info.get(prepareMsg.order);
|
temp = info.get(prepareMsg.order);
|
||||||
prepareMsg.content = (java.util.Arrays.hashCode(temp.request.content) + "").getBytes();
|
prepareMsg.content =
|
||||||
|
(java.util.Arrays.hashCode(temp.request.content) + "").getBytes();
|
||||||
broadcast(pbftMessage);
|
broadcast(pbftMessage);
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
@ -196,18 +199,19 @@ public class PBFTAlgorithm implements CommitAlgorithm {
|
|||||||
|
|
||||||
private void requestPrePrepareFromMaster(long order) {
|
private void requestPrePrepareFromMaster(long order) {
|
||||||
|
|
||||||
// PBFTMessage message = new PBFTMessage();
|
// PBFTMessage message = new PBFTMessage();
|
||||||
// message.type = PBFTType.ReSend;
|
// message.type = PBFTType.ReSend;
|
||||||
// message.order = order;
|
// message.order = order;
|
||||||
// message.content = new byte[1];
|
// message.content = new byte[1];
|
||||||
// sendToMaster(message);
|
// sendToMaster(message);
|
||||||
LOGGER.info("request Resend");
|
LOGGER.info("request Resend");
|
||||||
}
|
}
|
||||||
|
|
||||||
private void matchPrePrepareFromOriginReqeust(int hash, PBFTMessage pbftMessage) {
|
private void matchPrePrepareFromOriginReqeust(int hash, PBFTMessage pbftMessage) {
|
||||||
for (PCInfo pcInfo : info.values()) {
|
for (PCInfo pcInfo : info.values()) {
|
||||||
for (PBFTMessage msg : pcInfo.buff) {
|
for (PBFTMessage msg : pcInfo.buff) {
|
||||||
if (msg.type == PBFTType.PrePrepare && Integer.parseInt(new String(msg.content)) == hash) {
|
if (msg.type == PBFTType.PrePrepare
|
||||||
|
&& Integer.parseInt(new String(msg.content)) == hash) {
|
||||||
handlePrePrepare(msg, original.get(hash).second);
|
handlePrePrepare(msg, original.get(hash).second);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -220,10 +224,8 @@ public class PBFTAlgorithm implements CommitAlgorithm {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void retryLater(int delay, final Node sender, final PBFTMessage pbftMessage) {
|
private void retryLater(int delay, final Node sender, final PBFTMessage pbftMessage) {
|
||||||
ContractManager.scheduledThreadPool.schedule(
|
ContractManager.scheduledThreadPool.schedule(() -> onPBFTMessage(sender, pbftMessage),
|
||||||
() -> onPBFTMessage(sender, pbftMessage),
|
delay, TimeUnit.MILLISECONDS);
|
||||||
delay,
|
|
||||||
TimeUnit.MILLISECONDS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handlePrePrepare(PBFTMessage pbftMessage, PBFTMessage req) {
|
private void handlePrePrepare(PBFTMessage pbftMessage, PBFTMessage req) {
|
||||||
@ -372,8 +374,8 @@ public class PBFTAlgorithm implements CommitAlgorithm {
|
|||||||
if (!isPrePrepareReceived) {
|
if (!isPrePrepareReceived) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
// System.out.println("---: updatePrepare, size:" + prepare.size() + " -->"
|
// System.out.println("---: updatePrepare, size:" + prepare.size() + " -->"
|
||||||
// + (center.members.size() / 3 * 2 + 1) + " --senderID:" + message.sendID);
|
// + (center.members.size() / 3 * 2 + 1) + " --senderID:" + message.sendID);
|
||||||
|
|
||||||
if (prepare.size() > center.members.size() / 3 * 2) {
|
if (prepare.size() > center.members.size() / 3 * 2) {
|
||||||
isSendCommit = true;
|
isSendCommit = true;
|
||||||
@ -399,8 +401,8 @@ public class PBFTAlgorithm implements CommitAlgorithm {
|
|||||||
|
|
||||||
public String getDisplayStr() {
|
public String getDisplayStr() {
|
||||||
return "pSize:" + (prepare == null ? "null" : prepare.size()) + " cSize:"
|
return "pSize:" + (prepare == null ? "null" : prepare.size()) + " cSize:"
|
||||||
+ (commit == null ? "null" : commit.size()) + " isSendCommit:" + isSendCommit + " isSendReply:"
|
+ (commit == null ? "null" : commit.size()) + " isSendCommit:" + isSendCommit
|
||||||
+ isSendReply + " buffSize:" + buff.size();
|
+ " isSendReply:" + isSendReply + " buffSize:" + buff.size();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -66,15 +66,15 @@ public class PBFTMessage {
|
|||||||
|
|
||||||
private static String IDA = "PBFTMsg";
|
private static String IDA = "PBFTMsg";
|
||||||
|
|
||||||
// public void sign(SM2KeyPair pair) {
|
// public void sign(SM2KeyPair pair) {
|
||||||
// sendID = Arrays.hashCode(pair.getPublicKey().getEncoded(false));
|
// sendID = Arrays.hashCode(pair.getPublicKey().getEncoded(false));
|
||||||
// signature = sm02.sign(content, IDA, pair).toString();
|
// signature = sm02.sign(content, IDA, pair).toString();
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// public boolean verify(PBFTMember member) {
|
// public boolean verify(PBFTMember member) {
|
||||||
// return sm02.verify(content, Signature.loadFromString(signature), IDA,
|
// return sm02.verify(content, Signature.loadFromString(signature), IDA,
|
||||||
// SM2KeyPair.publicKeyStr2ECPoint(member.pubKey));
|
// SM2KeyPair.publicKeyStr2ECPoint(member.pubKey));
|
||||||
// }
|
// }
|
||||||
|
|
||||||
public void setType(PBFTType type) {
|
public void setType(PBFTType type) {
|
||||||
this.type = type;
|
this.type = type;
|
||||||
|
@ -2,6 +2,7 @@ package org.bdware.sc.consistency.pbft;
|
|||||||
|
|
||||||
public enum PBFTType {
|
public enum PBFTType {
|
||||||
Request(0), PrePrepare(1), Prepare(2), Commit(3), Reply(4), Unknown(5), ReSend(6), AddMember(7);
|
Request(0), PrePrepare(1), Prepare(2), Commit(3), Reply(4), Unknown(5), ReSend(6), AddMember(7);
|
||||||
|
|
||||||
private int type;
|
private int type;
|
||||||
|
|
||||||
PBFTType(int i) {
|
PBFTType(int i) {
|
||||||
|
@ -4,8 +4,6 @@ import org.bdware.sc.bean.ContractRequest;
|
|||||||
import org.bdware.sc.conn.Node;
|
import org.bdware.sc.conn.Node;
|
||||||
import org.bdware.sc.consistency.CommitAlgorithm;
|
import org.bdware.sc.consistency.CommitAlgorithm;
|
||||||
import org.bdware.sc.consistency.Committer;
|
import org.bdware.sc.consistency.Committer;
|
||||||
import org.bdware.sc.consistency.pbft.PBFTMessage;
|
|
||||||
import org.bdware.sc.consistency.pbft.PBFTType;
|
|
||||||
import org.bdware.sc.units.TrustfulExecutorConnection;
|
import org.bdware.sc.units.TrustfulExecutorConnection;
|
||||||
|
|
||||||
public class ViewAlgorithm implements CommitAlgorithm {
|
public class ViewAlgorithm implements CommitAlgorithm {
|
||||||
@ -13,8 +11,7 @@ public class ViewAlgorithm implements CommitAlgorithm {
|
|||||||
private Committer committer;
|
private Committer committer;
|
||||||
private TrustfulExecutorConnection connection;
|
private TrustfulExecutorConnection connection;
|
||||||
|
|
||||||
public ViewAlgorithm(boolean isMaster) {
|
public ViewAlgorithm(boolean isMaster) {}
|
||||||
}
|
|
||||||
|
|
||||||
public void setCommitter(Committer c) {
|
public void setCommitter(Committer c) {
|
||||||
committer = c;
|
committer = c;
|
||||||
|
@ -49,22 +49,16 @@ public class EventBroker {
|
|||||||
tempTopics = recorder.recoverTempTopicsFromDb();
|
tempTopics = recorder.recoverTempTopicsFromDb();
|
||||||
|
|
||||||
// regularly check temporary topics and clean them
|
// regularly check temporary topics and clean them
|
||||||
ContractManager.scheduledThreadPool.scheduleWithFixedDelay(
|
ContractManager.scheduledThreadPool.scheduleWithFixedDelay(() -> {
|
||||||
() -> {
|
|
||||||
long current = System.currentTimeMillis();
|
long current = System.currentTimeMillis();
|
||||||
int oldSize = tempTopics.size();
|
int oldSize = tempTopics.size();
|
||||||
tempTopics.keySet().forEach(topic -> {
|
tempTopics.keySet().forEach(topic -> {
|
||||||
if (tempTopics.get(topic) + EXPIRED_TIME > current) {
|
if (tempTopics.get(topic) + EXPIRED_TIME > current) {
|
||||||
String reqID =
|
String reqID = ContractManager.instance.nodeCenterConn.getNodeId() + "_"
|
||||||
ContractManager.instance.nodeCenterConn.getNodeId() +
|
+ System.currentTimeMillis();
|
||||||
"_" + System.currentTimeMillis();
|
REvent cleanEvent = new REvent(topic, UNSUBSCRIBE, null, reqID);
|
||||||
REvent cleanEvent =
|
cleanEvent
|
||||||
new REvent(
|
.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair());
|
||||||
topic,
|
|
||||||
UNSUBSCRIBE,
|
|
||||||
null,
|
|
||||||
reqID);
|
|
||||||
cleanEvent.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair());
|
|
||||||
handle(cleanEvent);
|
handle(cleanEvent);
|
||||||
tempTopics.remove(topic);
|
tempTopics.remove(topic);
|
||||||
}
|
}
|
||||||
@ -72,20 +66,15 @@ public class EventBroker {
|
|||||||
if (oldSize != tempTopics.size()) {
|
if (oldSize != tempTopics.size()) {
|
||||||
recorder.saveTempTopics(tempTopics);
|
recorder.saveTempTopics(tempTopics);
|
||||||
}
|
}
|
||||||
},
|
}, 0L, EXPIRED_TIME, TimeUnit.MILLISECONDS);
|
||||||
0L,
|
|
||||||
EXPIRED_TIME,
|
|
||||||
TimeUnit.MILLISECONDS);
|
|
||||||
// regularly create check point in database
|
// regularly create check point in database
|
||||||
ContractManager.scheduledThreadPool.scheduleAtFixedRate(
|
ContractManager.scheduledThreadPool.scheduleAtFixedRate(
|
||||||
() -> recorder.createCheckPoint(topic2cIds, id2Consumers),
|
() -> recorder.createCheckPoint(topic2cIds, id2Consumers), EXPIRED_TIME,
|
||||||
EXPIRED_TIME,
|
EXPIRED_TIME, TimeUnit.MILLISECONDS);
|
||||||
EXPIRED_TIME,
|
|
||||||
TimeUnit.MILLISECONDS);
|
|
||||||
|
|
||||||
NodeConsumer.setCenter(center);
|
NodeConsumer.setCenter(center);
|
||||||
|
|
||||||
// client2Events = new HashMap<>();
|
// client2Events = new HashMap<>();
|
||||||
LOGGER.info("Event Broker starts!");
|
LOGGER.info("Event Broker starts!");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -103,10 +92,11 @@ public class EventBroker {
|
|||||||
switch (event.getType()) {
|
switch (event.getType()) {
|
||||||
case SUBSCRIBE:
|
case SUBSCRIBE:
|
||||||
if (null != topic && !topic.isEmpty()) {
|
if (null != topic && !topic.isEmpty()) {
|
||||||
doSubscribe(event);
|
IEventConsumer consumer = doSubscribe(event);
|
||||||
// save & try to sub in center
|
// save & try to sub in center
|
||||||
recorder.appendEvent(event);
|
recorder.appendEvent(event);
|
||||||
center.subInCenter(event.getTopic(), event.getSemantics());
|
center.subInCenter(event.getTopic(), event.getSemantics(), event.getCenter(),
|
||||||
|
consumer, event);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case UNSUBSCRIBE:
|
case UNSUBSCRIBE:
|
||||||
@ -116,9 +106,10 @@ public class EventBroker {
|
|||||||
case PUBLISH:
|
case PUBLISH:
|
||||||
case PREPUB:
|
case PREPUB:
|
||||||
case PRESUB:
|
case PRESUB:
|
||||||
LOGGER.info(String.format("Receive %s event from topic %s", event.getSemantics(), topic));
|
LOGGER.info(String.format("Receive %s event from topic %s", event.getSemantics(),
|
||||||
LOGGER.debug(String.format("Receive %s event %s: %s",
|
topic));
|
||||||
event.getSemantics(), topic, event.getContent()));
|
LOGGER.debug(String.format("Receive %s event %s: %s", event.getSemantics(), topic,
|
||||||
|
event.getContent()));
|
||||||
if (event.isForward()) {
|
if (event.isForward()) {
|
||||||
// send event to the event center
|
// send event to the event center
|
||||||
event.setForward(center.deliverEvent(topic, event));
|
event.setForward(center.deliverEvent(topic, event));
|
||||||
@ -133,13 +124,17 @@ public class EventBroker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doSubscribe(REvent e) {
|
private IEventConsumer doSubscribe(REvent e) {
|
||||||
subInReg(e.getTopic(), parseConsumer(e.getContent()), this.topic2cIds, this.id2Consumers);
|
IEventConsumer consumer = parseConsumer(e.getContent());
|
||||||
|
if (subInReg(e.getTopic(), consumer, this.topic2cIds, this.id2Consumers)) {
|
||||||
// for events with semantics ONLY_ONCE, mark the topic is a temporary topic
|
// for events with semantics ONLY_ONCE, mark the topic is a temporary topic
|
||||||
if (ONLY_ONCE.equals(e.getSemantics()) && !tempTopics.containsKey(e.getTopic())) {
|
if (ONLY_ONCE.equals(e.getSemantics()) && !tempTopics.containsKey(e.getTopic())) {
|
||||||
tempTopics.put(e.getTopic(), System.currentTimeMillis());
|
tempTopics.put(e.getTopic(), System.currentTimeMillis());
|
||||||
recorder.saveTempTopics(tempTopics);
|
recorder.saveTempTopics(tempTopics);
|
||||||
}
|
}
|
||||||
|
return consumer;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void doSubscribe(String topic, IEventConsumer consumer) {
|
public void doSubscribe(String topic, IEventConsumer consumer) {
|
||||||
@ -155,11 +150,8 @@ public class EventBroker {
|
|||||||
* @param id2Consumers consumer registry of broker or a check point in event recorder
|
* @param id2Consumers consumer registry of broker or a check point in event recorder
|
||||||
* @return if the subscribing succeeds
|
* @return if the subscribing succeeds
|
||||||
*/
|
*/
|
||||||
public boolean subInReg(
|
public boolean subInReg(String topic, IEventConsumer consumer,
|
||||||
String topic,
|
Map<String, Set<String>> topic2cIds, Map<String, IEventConsumer> id2Consumers) {
|
||||||
IEventConsumer consumer,
|
|
||||||
Map<String, Set<String>> topic2cIds,
|
|
||||||
Map<String, IEventConsumer> id2Consumers) {
|
|
||||||
if (null == consumer) {
|
if (null == consumer) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -173,8 +165,8 @@ public class EventBroker {
|
|||||||
topic2cIds.get(topic).add(cId);
|
topic2cIds.get(topic).add(cId);
|
||||||
switch (consumer.getType()) {
|
switch (consumer.getType()) {
|
||||||
case Contract:
|
case Contract:
|
||||||
LOGGER.info("contract " + ((ContractConsumer) consumer).getContract() +
|
LOGGER.info("contract " + ((ContractConsumer) consumer).getContract()
|
||||||
" subscribes topic " + topic);
|
+ " subscribes topic " + topic);
|
||||||
break;
|
break;
|
||||||
case Node:
|
case Node:
|
||||||
LOGGER.info("node " + consumer.getId() + " subscribes topic " + topic);
|
LOGGER.info("node " + consumer.getId() + " subscribes topic " + topic);
|
||||||
@ -195,10 +187,11 @@ public class EventBroker {
|
|||||||
* do subscribing in registry<br/>
|
* do subscribing in registry<br/>
|
||||||
* topic and consumer must not be null at the same time
|
* topic and consumer must not be null at the same time
|
||||||
* <ul>
|
* <ul>
|
||||||
* <li>if consumer is null and topic is not, it means the topic is a temporary topic, remove it</li>
|
* <li>if consumer is null and topic is not, it means the topic is a temporary topic, remove
|
||||||
* <li>if topic is null and consumer is not,
|
* it</li>
|
||||||
* it means a consumer or a contract wants to unsubscribe all topics,
|
* <li>if topic is null and consumer is not, it means a consumer or a contract wants to
|
||||||
* remove all related consumers in topic registry and consumer registry</li>
|
* unsubscribe all topics, remove all related consumers in topic registry and consumer
|
||||||
|
* registry</li>
|
||||||
* <li>if two of them is not null, do unsubscribing in two registries</li>
|
* <li>if two of them is not null, do unsubscribing in two registries</li>
|
||||||
* </ul>
|
* </ul>
|
||||||
*
|
*
|
||||||
@ -208,11 +201,8 @@ public class EventBroker {
|
|||||||
* @param id2Consumers consumer registry of broker or a check point in event recorder
|
* @param id2Consumers consumer registry of broker or a check point in event recorder
|
||||||
* @return if the subscribing succeeds
|
* @return if the subscribing succeeds
|
||||||
*/
|
*/
|
||||||
public boolean unsubInReg(
|
public boolean unsubInReg(String topic, IEventConsumer consumer,
|
||||||
String topic,
|
Map<String, Set<String>> topic2cIds, Map<String, IEventConsumer> id2Consumers) {
|
||||||
IEventConsumer consumer,
|
|
||||||
Map<String, Set<String>> topic2cIds,
|
|
||||||
Map<String, IEventConsumer> id2Consumers) {
|
|
||||||
if (null == topic && null == consumer) {
|
if (null == topic && null == consumer) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -232,7 +222,8 @@ public class EventBroker {
|
|||||||
// if cId belongs to a contract, find all related consumers
|
// if cId belongs to a contract, find all related consumers
|
||||||
toRmIds = new ArrayList<>();
|
toRmIds = new ArrayList<>();
|
||||||
id2Consumers.forEach((k, c) -> {
|
id2Consumers.forEach((k, c) -> {
|
||||||
if (c instanceof ContractConsumer && ((ContractConsumer) c).getContract().equals(cId)) {
|
if (c instanceof ContractConsumer
|
||||||
|
&& ((ContractConsumer) c).getContract().equals(cId)) {
|
||||||
toRmIds.add(k);
|
toRmIds.add(k);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -255,7 +246,8 @@ public class EventBroker {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* parse consumer information from content str<br/>
|
* parse consumer information from content str<br/>
|
||||||
* if caller wants to select all consumers of a contract, the content str is also parsed into a node consumer
|
* if caller wants to select all consumers of a contract, the content str is also parsed into a
|
||||||
|
* node consumer
|
||||||
*
|
*
|
||||||
* @param content json string, {"subscriber": "[subscriber]", "handler?": "[handler]"}
|
* @param content json string, {"subscriber": "[subscriber]", "handler?": "[handler]"}
|
||||||
* @return a node consumer or contract consumer, or null if exception is thrown
|
* @return a node consumer or contract consumer, or null if exception is thrown
|
||||||
@ -275,6 +267,9 @@ public class EventBroker {
|
|||||||
if (null != handler && !handler.isEmpty()) {
|
if (null != handler && !handler.isEmpty()) {
|
||||||
consumer = new ContractConsumer(subscriber, handler);
|
consumer = new ContractConsumer(subscriber, handler);
|
||||||
} else {
|
} else {
|
||||||
|
if (subscriber.equals(ContractManager.instance.nodeCenterConn.getNodeId())) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
consumer = new NodeConsumer(subscriber);
|
consumer = new NodeConsumer(subscriber);
|
||||||
}
|
}
|
||||||
return consumer;
|
return consumer;
|
||||||
@ -298,19 +293,16 @@ public class EventBroker {
|
|||||||
case AT_LEAST_ONCE:
|
case AT_LEAST_ONCE:
|
||||||
case NEED_RETRY:
|
case NEED_RETRY:
|
||||||
// send events to all
|
// send events to all
|
||||||
topicConsumers.forEach(cId ->
|
topicConsumers
|
||||||
deliverEvent(event, cEvent, nEventStr, cId, topic, true));
|
.forEach(cId -> deliverEvent(event, cEvent, nEventStr, cId, topic, true));
|
||||||
break;
|
break;
|
||||||
case AT_MOST_ONCE:
|
case AT_MOST_ONCE:
|
||||||
// send event to a random consumer
|
// send event to a random consumer
|
||||||
// AT_MOST_ONCE, so broker don't need to do anything when delivering fails
|
// AT_MOST_ONCE, so broker don't need to do anything when delivering fails
|
||||||
deliverEvent(
|
deliverEvent(event, cEvent, nEventStr,
|
||||||
event,
|
topicConsumers.toArray()[(int) (Math.random() * topicConsumers.size())]
|
||||||
cEvent,
|
.toString(),
|
||||||
nEventStr,
|
topic, true);
|
||||||
topicConsumers.toArray()[(int) (Math.random() * topicConsumers.size())].toString(),
|
|
||||||
topic,
|
|
||||||
true);
|
|
||||||
break;
|
break;
|
||||||
case ONLY_ONCE:
|
case ONLY_ONCE:
|
||||||
switch (event.getType()) {
|
switch (event.getType()) {
|
||||||
@ -336,20 +328,14 @@ public class EventBroker {
|
|||||||
// send PREPUB events to all consumers
|
// send PREPUB events to all consumers
|
||||||
// TODO if there are no consumers to receive the ONLY_ONCE events?
|
// TODO if there are no consumers to receive the ONLY_ONCE events?
|
||||||
ContractManager.threadPool.execute(() -> {
|
ContractManager.threadPool.execute(() -> {
|
||||||
REvent prePubMsg = new REvent(event.getTopic(),
|
REvent prePubMsg = new REvent(event.getTopic(), PREPUB, contentHash,
|
||||||
PREPUB,
|
|
||||||
contentHash,
|
|
||||||
event.getRequestID());
|
event.getRequestID());
|
||||||
prePubMsg.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair());
|
prePubMsg.doSignature(
|
||||||
topicConsumers.forEach(cId ->
|
ContractManager.instance.nodeCenterConn.getNodeKeyPair());
|
||||||
deliverEvent(prePubMsg,
|
topicConsumers.forEach(cId -> deliverEvent(prePubMsg,
|
||||||
new Event(event.getTopic(),
|
new Event(event.getTopic(), contentHash,
|
||||||
contentHash,
|
|
||||||
prePubMsg.getSemantics()),
|
prePubMsg.getSemantics()),
|
||||||
JsonUtil.toJson(prePubMsg),
|
JsonUtil.toJson(prePubMsg), cId, topic, false));
|
||||||
cId,
|
|
||||||
topic,
|
|
||||||
false));
|
|
||||||
// wait for responses from contracts (PRESUB events)
|
// wait for responses from contracts (PRESUB events)
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
@ -357,21 +343,21 @@ public class EventBroker {
|
|||||||
flag.wait(30 * 1000L);
|
flag.wait(30 * 1000L);
|
||||||
}
|
}
|
||||||
if (!flag.get().isEmpty()) {
|
if (!flag.get().isEmpty()) {
|
||||||
REvent finalMsg = new REvent(flag.get(),
|
REvent finalMsg = new REvent(flag.get(), PUBLISH,
|
||||||
PUBLISH,
|
event.getContent(), HashUtil.sha3(contentHash
|
||||||
event.getContent(),
|
+ System.currentTimeMillis()));
|
||||||
HashUtil.sha3(
|
|
||||||
contentHash + System.currentTimeMillis()));
|
|
||||||
// if the delivering fails, retry publishing
|
// if the delivering fails, retry publishing
|
||||||
finalMsg.setSemantics(NEED_RETRY);
|
finalMsg.setSemantics(NEED_RETRY);
|
||||||
finalMsg.doSignature(
|
finalMsg.doSignature(
|
||||||
ContractManager.instance.nodeCenterConn.getNodeKeyPair());
|
ContractManager.instance.nodeCenterConn
|
||||||
|
.getNodeKeyPair());
|
||||||
handle(finalMsg);
|
handle(finalMsg);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOGGER.warn("ONLY_ONE event delivering is interrupted: " + e.getMessage());
|
LOGGER.warn("ONLY_ONE event delivering is interrupted: "
|
||||||
// e.printStackTrace();
|
+ e.getMessage());
|
||||||
|
// e.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -389,19 +375,15 @@ public class EventBroker {
|
|||||||
* publish the event to the consumer
|
* publish the event to the consumer
|
||||||
*
|
*
|
||||||
* @param event event message
|
* @param event event message
|
||||||
* @param cEvent simple event message to the contract consumer, only topic, content and semantics
|
* @param cEvent simple event message to the contract consumer, only topic, content and
|
||||||
|
* semantics
|
||||||
* @param nEventStr event message to the node
|
* @param nEventStr event message to the node
|
||||||
* @param cId consumer id
|
* @param cId consumer id
|
||||||
* @param topic topic of the event
|
* @param topic topic of the event
|
||||||
* @param isPub if the event is published or pre-published
|
* @param isPub if the event is published or pre-published
|
||||||
*/
|
*/
|
||||||
private void deliverEvent(
|
private void deliverEvent(REvent event, Event cEvent, String nEventStr, String cId,
|
||||||
REvent event,
|
String topic, boolean isPub) {
|
||||||
Event cEvent,
|
|
||||||
String nEventStr,
|
|
||||||
String cId,
|
|
||||||
String topic,
|
|
||||||
boolean isPub) {
|
|
||||||
if (id2Consumers.containsKey(cId)) {
|
if (id2Consumers.containsKey(cId)) {
|
||||||
IEventConsumer consumer = id2Consumers.get(cId);
|
IEventConsumer consumer = id2Consumers.get(cId);
|
||||||
if (consumer instanceof ContractConsumer) {
|
if (consumer instanceof ContractConsumer) {
|
||||||
@ -412,31 +394,24 @@ public class EventBroker {
|
|||||||
public void onResult(String unused) {
|
public void onResult(String unused) {
|
||||||
// if the delivering fails, unsubscribe the consumer
|
// if the delivering fails, unsubscribe the consumer
|
||||||
ContractConsumer c = (ContractConsumer) consumer;
|
ContractConsumer c = (ContractConsumer) consumer;
|
||||||
ContractClient client = ContractManager.instance.getClient(c.getContract());
|
ContractClient client =
|
||||||
String reqID =
|
ContractManager.instance.getClient(c.getContract());
|
||||||
ContractManager.instance.nodeCenterConn.getNodeKeyPair().getPublicKeyStr() +
|
String reqID = ContractManager.instance.nodeCenterConn.getNodeKeyPair()
|
||||||
"_" + System.currentTimeMillis();
|
.getPublicKeyStr() + "_" + System.currentTimeMillis();
|
||||||
REvent unsubEvent =
|
REvent unsubEvent = new REvent(topic, UNSUBSCRIBE,
|
||||||
new REvent(
|
"{\"subscriber\":\"" + cId + "\"}", reqID);
|
||||||
topic,
|
|
||||||
UNSUBSCRIBE,
|
|
||||||
"{\"subscriber\":\"" + cId + "\"}",
|
|
||||||
reqID);
|
|
||||||
unsubEvent.doSignature(client.getPubkey(), client.getContractKey());
|
unsubEvent.doSignature(client.getPubkey(), client.getContractKey());
|
||||||
handle(unsubEvent);
|
handle(unsubEvent);
|
||||||
|
|
||||||
// if the event is an ONLY_ONCE event, retry publishing
|
// if the event is an ONLY_ONCE event, retry publishing
|
||||||
if (NEED_RETRY.equals(event.getSemantics())) {
|
if (NEED_RETRY.equals(event.getSemantics())) {
|
||||||
REvent newMsg =
|
REvent newMsg = new REvent(topic.split("\\|")[0], PUBLISH,
|
||||||
new REvent(
|
event.getContent(), event.getRequestID());
|
||||||
topic.split("\\|")[0],
|
|
||||||
PUBLISH,
|
|
||||||
event.getContent(),
|
|
||||||
event.getRequestID());
|
|
||||||
newMsg.setSemantics(ONLY_ONCE);
|
newMsg.setSemantics(ONLY_ONCE);
|
||||||
newMsg.setHash(event.getHash());
|
newMsg.setHash(event.getHash());
|
||||||
newMsg.setTxHash(event.getTxHash());
|
newMsg.setTxHash(event.getTxHash());
|
||||||
newMsg.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair());
|
newMsg.doSignature(
|
||||||
|
ContractManager.instance.nodeCenterConn.getNodeKeyPair());
|
||||||
handle(newMsg);
|
handle(newMsg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -458,8 +433,8 @@ public class EventBroker {
|
|||||||
});
|
});
|
||||||
} else if (isPub) {
|
} else if (isPub) {
|
||||||
// client consumer
|
// client consumer
|
||||||
ContractManager.threadPool.execute(() ->
|
ContractManager.threadPool
|
||||||
consumer.publishEvent(nEventStr, new ResultCallback() {
|
.execute(() -> consumer.publishEvent(nEventStr, new ResultCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void onResult(String unused) {
|
public void onResult(String unused) {
|
||||||
unsubInReg(null, consumer, topic2cIds, id2Consumers);
|
unsubInReg(null, consumer, topic2cIds, id2Consumers);
|
||||||
|
@ -1,5 +1,8 @@
|
|||||||
package org.bdware.sc.event;
|
package org.bdware.sc.event;
|
||||||
|
|
||||||
|
import org.bdware.sc.event.clients.ContractConsumer;
|
||||||
|
import org.bdware.sc.event.clients.IEventConsumer;
|
||||||
|
import org.bdware.sc.event.clients.NodeConsumer;
|
||||||
import org.bdware.sc.util.DHTUtil;
|
import org.bdware.sc.util.DHTUtil;
|
||||||
import org.bdware.sc.util.JsonUtil;
|
import org.bdware.sc.util.JsonUtil;
|
||||||
|
|
||||||
@ -9,23 +12,24 @@ import static org.bdware.sc.ContractManager.instance;
|
|||||||
* @author Kaidong Wu
|
* @author Kaidong Wu
|
||||||
*/
|
*/
|
||||||
public class EventCenter {
|
public class EventCenter {
|
||||||
// private static final Logger LOGGER = LogManager.getLogger(EventCenter.class);
|
// private static final Logger LOGGER = LogManager.getLogger(EventCenter.class);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* get the nearest node to the topic in the hash function range
|
* get the nearest node to the topic in the hash function range
|
||||||
*
|
*
|
||||||
* @param topic the topic
|
* @param topic the topic
|
||||||
|
* @param k the number of centers
|
||||||
* @return id of the node
|
* @return id of the node
|
||||||
*/
|
*/
|
||||||
public String getCenterByTopic(String topic) {
|
public String[] getCenterByTopic(String topic, int k) {
|
||||||
if (null == instance.nodeCenterConn) {
|
if (null == instance.nodeCenterConn) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
String[] centers = DHTUtil.getClusterByKey(topic, 1);
|
String[] centers = DHTUtil.getClusterByKey(topic, k);
|
||||||
if (null == centers || centers.length == 0) {
|
if (null == centers || centers.length == 0) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return centers[0];
|
return centers;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -33,21 +37,39 @@ public class EventCenter {
|
|||||||
*
|
*
|
||||||
* @param topic event topic
|
* @param topic event topic
|
||||||
* @param semantics event semantics, used to mark PRESUB events
|
* @param semantics event semantics, used to mark PRESUB events
|
||||||
|
* @param center id of event center if the subscribing has been handled
|
||||||
|
* @param consumer consumer
|
||||||
|
* @param event original event
|
||||||
*/
|
*/
|
||||||
public void subInCenter(String topic, REvent.REventSemantics semantics) {
|
public void subInCenter(String topic, REvent.REventSemantics semantics, String center,
|
||||||
|
IEventConsumer consumer, REvent event) {
|
||||||
if (null == instance.nodeCenterConn) {
|
if (null == instance.nodeCenterConn) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
REvent msg = new REvent(
|
REvent msg = new REvent(topic, REvent.REventType.SUBSCRIBE,
|
||||||
topic,
|
String.format("{\"subscriber\":\"%s\"}", instance.nodeCenterConn.getNodeId()), "");
|
||||||
REvent.REventType.SUBSCRIBE,
|
|
||||||
String.format("{\"subscriber\":\"%s\"}",
|
|
||||||
instance.nodeCenterConn.getNodeId()),
|
|
||||||
"");
|
|
||||||
msg.setSemantics(semantics);
|
msg.setSemantics(semantics);
|
||||||
msg.doSignature(instance.nodeCenterConn.getNodeKeyPair());
|
msg.doSignature(instance.nodeCenterConn.getNodeKeyPair());
|
||||||
String nodeId = getCenterByTopic(topic);
|
msg.setCenter(center);
|
||||||
instance.masterStub.deliverEvent(nodeId, JsonUtil.toJson(msg));
|
String[] centers = getCenterByTopic(topic, 2);
|
||||||
|
if (null == centers) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (null != center) {
|
||||||
|
if (centers[0].equals(center) && centers.length > 1) {
|
||||||
|
instance.masterStub.deliverEvent(centers[1], JsonUtil.toJson(msg));
|
||||||
|
} else {
|
||||||
|
instance.masterStub.deliverEvent(centers[0], JsonUtil.toJson(msg));
|
||||||
|
}
|
||||||
|
} else if (!instance.masterStub.deliverEvent(centers[0], JsonUtil.toJson(msg))) {
|
||||||
|
if (consumer instanceof NodeConsumer) {
|
||||||
|
event.setCenter(centers[0]);
|
||||||
|
instance.masterStub.deliverEvent(consumer.getId(), JsonUtil.toJson(event));
|
||||||
|
} else if (consumer instanceof ContractConsumer && centers.length > 1) {
|
||||||
|
msg.setCenter(centers[0]);
|
||||||
|
instance.masterStub.deliverEvent(centers[1], JsonUtil.toJson(msg));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -61,8 +83,11 @@ public class EventCenter {
|
|||||||
if (null == instance.masterStub) {
|
if (null == instance.masterStub) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
String nodeId = getCenterByTopic(topic);
|
String[] centers = getCenterByTopic(topic, 1);
|
||||||
return instance.masterStub.deliverEvent(nodeId, JsonUtil.toJson(event));
|
if (null == centers) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return instance.masterStub.deliverEvent(centers[0], JsonUtil.toJson(event));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -79,9 +79,11 @@ public class EventRecorder {
|
|||||||
String key = KeyValueRocksDBUtil.instance.getValue(dbName, LATEST_EVENT_KEY);
|
String key = KeyValueRocksDBUtil.instance.getValue(dbName, LATEST_EVENT_KEY);
|
||||||
latestEvent.set(key);
|
latestEvent.set(key);
|
||||||
CheckPoint cp = new CheckPoint();
|
CheckPoint cp = new CheckPoint();
|
||||||
Type topic2cIdsType = TypeToken.getParameterized(ConcurrentHashMap.class, String.class,
|
Type topic2cIdsType =
|
||||||
TypeToken.getParameterized(Set.class, String.class,
|
TypeToken
|
||||||
String.class).getType()).getType();
|
.getParameterized(ConcurrentHashMap.class, String.class,
|
||||||
|
TypeToken.getParameterized(Set.class, String.class).getType())
|
||||||
|
.getType();
|
||||||
// retrieving transactions from database
|
// retrieving transactions from database
|
||||||
while (null != key && !key.isEmpty()) {
|
while (null != key && !key.isEmpty()) {
|
||||||
String json = KeyValueRocksDBUtil.instance.getValue(dbName, key);
|
String json = KeyValueRocksDBUtil.instance.getValue(dbName, key);
|
||||||
@ -93,14 +95,16 @@ public class EventRecorder {
|
|||||||
if (json.startsWith("cp")) {
|
if (json.startsWith("cp")) {
|
||||||
// create check point by the transaction and stop retrieving
|
// create check point by the transaction and stop retrieving
|
||||||
JsonObject data = JsonUtil.parseStringAsJsonObject(json.substring(2));
|
JsonObject data = JsonUtil.parseStringAsJsonObject(json.substring(2));
|
||||||
cp.topic2cIds = JsonUtil.fromJson(data.get("topic2cIds").toString(), topic2cIdsType);
|
cp.topic2cIds =
|
||||||
|
JsonUtil.fromJson(data.get("topic2cIds").toString(), topic2cIdsType);
|
||||||
JsonObject id2Consumers = data.getAsJsonObject("id2Consumers");
|
JsonObject id2Consumers = data.getAsJsonObject("id2Consumers");
|
||||||
for (String k : id2Consumers.keySet()) {
|
for (String k : id2Consumers.keySet()) {
|
||||||
JsonObject consumer = id2Consumers.getAsJsonObject(k);
|
JsonObject consumer = id2Consumers.getAsJsonObject(k);
|
||||||
if (!consumer.has("type")) {
|
if (!consumer.has("type")) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
ConsumerType type = ConsumerType.valueOf(consumer.get("type").getAsString());
|
ConsumerType type =
|
||||||
|
ConsumerType.valueOf(consumer.get("type").getAsString());
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case Contract:
|
case Contract:
|
||||||
cp.id2Consumers.put(k,
|
cp.id2Consumers.put(k,
|
||||||
@ -145,7 +149,8 @@ public class EventRecorder {
|
|||||||
// if empty, return the check point
|
// if empty, return the check point
|
||||||
latestEvent.setCp(true);
|
latestEvent.setCp(true);
|
||||||
} else {
|
} else {
|
||||||
// on the base of old check point, process following sub or unsub events to recover registry
|
// on the base of old check point, process following sub or unsub events to recover
|
||||||
|
// registry
|
||||||
while (!stack.empty()) {
|
while (!stack.empty()) {
|
||||||
Object record = stack.pop();
|
Object record = stack.pop();
|
||||||
if (record instanceof CheckPoint) {
|
if (record instanceof CheckPoint) {
|
||||||
@ -155,15 +160,19 @@ public class EventRecorder {
|
|||||||
IEventConsumer consumer = broker.parseConsumer(tran.content);
|
IEventConsumer consumer = broker.parseConsumer(tran.content);
|
||||||
switch (tran.type) {
|
switch (tran.type) {
|
||||||
case SUBSCRIBE:
|
case SUBSCRIBE:
|
||||||
if (!broker.subInReg(tran.topic, consumer, cp.topic2cIds, cp.id2Consumers)) {
|
if (!broker.subInReg(tran.topic, consumer, cp.topic2cIds,
|
||||||
|
cp.id2Consumers)) {
|
||||||
LOGGER.warn("record damaged! " + key);
|
LOGGER.warn("record damaged! " + key);
|
||||||
LOGGER.debug("record damaged! " + key + ": " + JsonUtil.toJson(tran));
|
LOGGER.debug(
|
||||||
|
"record damaged! " + key + ": " + JsonUtil.toJson(tran));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case UNSUBSCRIBE:
|
case UNSUBSCRIBE:
|
||||||
if (!broker.unsubInReg(tran.topic, consumer, cp.topic2cIds, cp.id2Consumers)) {
|
if (!broker.unsubInReg(tran.topic, consumer, cp.topic2cIds,
|
||||||
|
cp.id2Consumers)) {
|
||||||
LOGGER.warn("record damaged! " + key);
|
LOGGER.warn("record damaged! " + key);
|
||||||
LOGGER.debug("record damaged! " + key + ": " + JsonUtil.toJson(tran));
|
LOGGER.debug(
|
||||||
|
"record damaged! " + key + ": " + JsonUtil.toJson(tran));
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package org.bdware.sc.event.clients;
|
package org.bdware.sc.event.clients;
|
||||||
|
|
||||||
|
import com.google.gson.Gson;
|
||||||
import com.google.gson.annotations.Expose;
|
import com.google.gson.annotations.Expose;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
@ -80,6 +81,7 @@ public class ContractConsumer implements IEventConsumer {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
AtomicInteger callCount = new AtomicInteger(0);
|
AtomicInteger callCount = new AtomicInteger(0);
|
||||||
|
LOGGER.info("ContractConsumer!" + new Gson().toJson(cr));
|
||||||
// TODO sending requests at a high frequency maybe cause that some requests are ignored
|
// TODO sending requests at a high frequency maybe cause that some requests are ignored
|
||||||
ScheduledFuture<?> future = ContractManager.scheduledThreadPool.scheduleAtFixedRate(
|
ScheduledFuture<?> future = ContractManager.scheduledThreadPool.scheduleAtFixedRate(
|
||||||
() -> ContractManager.instance.executeContractInternal(cr, new ResultCallback() {
|
() -> ContractManager.instance.executeContractInternal(cr, new ResultCallback() {
|
||||||
@ -89,19 +91,20 @@ public class ContractConsumer implements IEventConsumer {
|
|||||||
try {
|
try {
|
||||||
ContractResult result = JsonUtil.fromJson(str, ContractResult.class);
|
ContractResult result = JsonUtil.fromJson(str, ContractResult.class);
|
||||||
if (!result.status.equals(ContractResult.Status.Success)) {
|
if (!result.status.equals(ContractResult.Status.Success)) {
|
||||||
if (callCount.get() == TIMEOUT_COUNT ||
|
if (callCount.get() == TIMEOUT_COUNT
|
||||||
(result.status == ContractResult.Status.Exception &&
|
|| (result.status == ContractResult.Status.Exception
|
||||||
result.result.toString().contains("not exported"))) {
|
&& result.result.toString()
|
||||||
LOGGER.error(String.format(
|
.contains("not exported"))) {
|
||||||
"receiving event error! %s.%s: %s", contract, handler, str));
|
LOGGER.error(String.format("receiving event error! %s.%s: %s",
|
||||||
|
contract, handler, str));
|
||||||
rc.onResult((String) null);
|
rc.onResult((String) null);
|
||||||
} else {
|
} else {
|
||||||
ret = false;
|
ret = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOGGER.error(String.format(
|
LOGGER.error(String.format("receiving event error! %s.%s: %s", contract,
|
||||||
"receiving event error! %s.%s: %s", contract, handler, e.getMessage()));
|
handler, e.getMessage()));
|
||||||
rc.onResult((String) null);
|
rc.onResult((String) null);
|
||||||
}
|
}
|
||||||
callCount.incrementAndGet();
|
callCount.incrementAndGet();
|
||||||
@ -119,10 +122,7 @@ public class ContractConsumer implements IEventConsumer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, (reqID, hashStr) -> {
|
}, (reqID, hashStr) -> {
|
||||||
}),
|
}), 500L, 2500L, TimeUnit.MILLISECONDS);
|
||||||
500L,
|
|
||||||
2500L,
|
|
||||||
TimeUnit.MILLISECONDS);
|
|
||||||
scheduledFutures.put(getId(), future);
|
scheduledFutures.put(getId(), future);
|
||||||
synchronized (flag) {
|
synchronized (flag) {
|
||||||
flag.notify();
|
flag.notify();
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package org.bdware.sc.event.clients;
|
package org.bdware.sc.event.clients;
|
||||||
|
|
||||||
import org.bdware.sc.conn.ResultCallback;
|
import org.bdware.sc.conn.ResultCallback;
|
||||||
import org.bdware.sc.event.Event;
|
|
||||||
|
|
||||||
public interface IEventConsumer {
|
public interface IEventConsumer {
|
||||||
String getId();
|
String getId();
|
||||||
@ -13,8 +12,6 @@ public interface IEventConsumer {
|
|||||||
void competeSub(Object msg, ResultCallback rc, String... options);
|
void competeSub(Object msg, ResultCallback rc, String... options);
|
||||||
|
|
||||||
enum ConsumerType {
|
enum ConsumerType {
|
||||||
Contract,
|
Contract, Node, WSClient
|
||||||
Node,
|
|
||||||
WSClient
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -47,6 +47,5 @@ public class WSClientConsumer implements IEventConsumer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void competeSub(Object msg, ResultCallback rc, String... options) {
|
public void competeSub(Object msg, ResultCallback rc, String... options) {}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -6,7 +6,6 @@ import org.bdware.sc.bean.ContractRequest;
|
|||||||
import org.bdware.sc.conn.Description;
|
import org.bdware.sc.conn.Description;
|
||||||
import org.bdware.sc.conn.MsgHandler;
|
import org.bdware.sc.conn.MsgHandler;
|
||||||
import org.bdware.sc.conn.ResultCallback;
|
import org.bdware.sc.conn.ResultCallback;
|
||||||
import org.bdware.sc.event.REvent;
|
|
||||||
import org.bdware.sc.get.GetMessage;
|
import org.bdware.sc.get.GetMessage;
|
||||||
import org.bdware.sc.util.JsonUtil;
|
import org.bdware.sc.util.JsonUtil;
|
||||||
|
|
||||||
@ -35,8 +34,7 @@ public class ManagerHandler extends MsgHandler {
|
|||||||
cb.onResult(cm.getTimesOfExecution(msg.arg));
|
cb.onResult(cm.getTimesOfExecution(msg.arg));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Description(
|
@Description(value = "execute Contract, {\"contractID\":\"112233\",\"arg\":\"\"}",
|
||||||
value = "execute Contract, {\"contractID\":\"112233\",\"arg\":\"\"}",
|
|
||||||
isAsync = true)
|
isAsync = true)
|
||||||
public void executeContract(GetMessage msg, ResultCallback cb) {
|
public void executeContract(GetMessage msg, ResultCallback cb) {
|
||||||
ContractRequest c;
|
ContractRequest c;
|
||||||
@ -89,4 +87,10 @@ public class ManagerHandler extends MsgHandler {
|
|||||||
ContractRequest cr = JsonUtil.fromJson(msg.arg, ContractRequest.class);
|
ContractRequest cr = JsonUtil.fromJson(msg.arg, ContractRequest.class);
|
||||||
cb.onResult(cm.addDEMember(cr.getContractID(), cr.getArg().getAsString()));
|
cb.onResult(cm.addDEMember(cr.getContractID(), cr.getArg().getAsString()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Description("add distributed execution member, sample: {contractID:id/name, arg:ipAndPort}")
|
||||||
|
public void getLedgerParams(GetMessage msg, ResultCallback cb) {
|
||||||
|
|
||||||
|
cb.onResult(ContractManager.instance.getLedgerParams().toString());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,81 +4,51 @@ package org.bdware.sc.units;
|
|||||||
import java.io.*;
|
import java.io.*;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* 节点启动后记录自己的units信息和合约的一些信息
|
* 节点启动后记录自己的units信息和合约的一些信息 节点重新上线后,NC发要求让其恢复,则从本地读取自己的这些信息进行恢复
|
||||||
* 节点重新上线后,NC发要求让其恢复,则从本地读取自己的这些信息进行恢复
|
|
||||||
*/
|
*/
|
||||||
public class ContractRecord implements Serializable {
|
public class ContractRecord implements Serializable {
|
||||||
/* private static final long serialVersionUID = 704775241674568688L;
|
/*
|
||||||
|
* private static final long serialVersionUID = 704775241674568688L;
|
||||||
public transient RecoverFlag recoverFlag = RecoverFlag.Fine;
|
*
|
||||||
|
* public transient RecoverFlag recoverFlag = RecoverFlag.Fine;
|
||||||
public String contractID;
|
*
|
||||||
public String contractName;
|
* public String contractID; public String contractName; public String key; public String
|
||||||
public String key;
|
* contractpubKey; public ContractType type; public int copies = 1; public int lastExeSeq = -1;
|
||||||
public String contractpubKey;
|
* public boolean isPrivate = false; public String pubKeyPath; public String ypkName;
|
||||||
public ContractType type;
|
*
|
||||||
public int copies = 1;
|
* //public Map<String,String> members; //k-udpid,v-udpaddress
|
||||||
public int lastExeSeq = -1;
|
*
|
||||||
public boolean isPrivate = false;
|
* public String memory = "";
|
||||||
public String pubKeyPath;
|
*
|
||||||
public String ypkName;
|
* //JavaScriptEntry public String invokeID;
|
||||||
|
*
|
||||||
//public Map<String,String> members; //k-udpid,v-udpaddress
|
* public ContractRecord(String id){ this.contractID = id; }
|
||||||
|
*
|
||||||
public String memory = "";
|
* public ContractRecord(String id,String contractName,String key,String
|
||||||
|
* contractpubKey,ContractType type,int copies){ this.contractID = id; this.contractName =
|
||||||
//JavaScriptEntry
|
* contractName; this.key = key; this.contractpubKey = contractpubKey; this.type = type;
|
||||||
public String invokeID;
|
* this.copies = copies; }
|
||||||
|
*
|
||||||
public ContractRecord(String id){
|
* public void printContent(){ System.out.println("==========ContractRecord========");
|
||||||
this.contractID = id;
|
*
|
||||||
}
|
* System.out.println("contractID=" + contractID == null ? "null" : contractID);
|
||||||
|
* System.out.println("contractName=" + contractName == null ? "null" : contractName);
|
||||||
public ContractRecord(String id,String contractName,String key,String contractpubKey,ContractType type,int copies){
|
* System.out.println("key=" + key == null ? "null" : key); System.out.println("contractPubKey="
|
||||||
this.contractID = id;
|
* + contractpubKey == null ? "null" : contractpubKey); System.out.println("type=" + type ==
|
||||||
this.contractName = contractName;
|
* null ? "null" : type); System.out.println("lastExeSeq=" + lastExeSeq == null ? "null" :
|
||||||
this.key = key;
|
* lastExeSeq); System.out.println("invokeID=" + invokeID == null ? "null" : invokeID);
|
||||||
this.contractpubKey = contractpubKey;
|
* System.out.println("copies=" + copies); System.out.println("isPrivate=" + isPrivate);
|
||||||
this.type = type;
|
* System.out.println("pubKeyPath=" + pubKeyPath == null ? "null" : pubKeyPath);
|
||||||
this.copies = copies;
|
* System.out.println("ypkName=" + ypkName == null ? "null" : ypkName); if(members != null){
|
||||||
}
|
* for(String k : members.keySet()){ System.out.println("members " + k + "-" + members.get(k));
|
||||||
|
* } } System.out.println("memory=" + memory == null ? "null" : memory);
|
||||||
public void printContent(){
|
* System.out.println("==========ContractRecord print finish======="); }
|
||||||
System.out.println("==========ContractRecord========");
|
*
|
||||||
|
* public static void getContentFromFile(String path){ File file = new File(path); try{
|
||||||
System.out.println("contractID=" + contractID == null ? "null" : contractID);
|
* FileInputStream os = new FileInputStream(file); ObjectInputStream oos = new
|
||||||
System.out.println("contractName=" + contractName == null ? "null" : contractName);
|
* ObjectInputStream(os); ContractRecord record = (ContractRecord) oos.readObject();
|
||||||
System.out.println("key=" + key == null ? "null" : key);
|
* record.printContent(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch
|
||||||
System.out.println("contractPubKey=" + contractpubKey == null ? "null" : contractpubKey);
|
* (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) {
|
||||||
System.out.println("type=" + type == null ? "null" : type);
|
* e.printStackTrace(); } }
|
||||||
System.out.println("lastExeSeq=" + lastExeSeq == null ? "null" : lastExeSeq);
|
*/
|
||||||
System.out.println("invokeID=" + invokeID == null ? "null" : invokeID);
|
|
||||||
System.out.println("copies=" + copies);
|
|
||||||
System.out.println("isPrivate=" + isPrivate);
|
|
||||||
System.out.println("pubKeyPath=" + pubKeyPath == null ? "null" : pubKeyPath);
|
|
||||||
System.out.println("ypkName=" + ypkName == null ? "null" : ypkName);
|
|
||||||
if(members != null){
|
|
||||||
for(String k : members.keySet()){
|
|
||||||
System.out.println("members " + k + "-" + members.get(k));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
System.out.println("memory=" + memory == null ? "null" : memory);
|
|
||||||
System.out.println("==========ContractRecord print finish=======");
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void getContentFromFile(String path){
|
|
||||||
File file = new File(path);
|
|
||||||
try{
|
|
||||||
FileInputStream os = new FileInputStream(file);
|
|
||||||
ObjectInputStream oos = new ObjectInputStream(os);
|
|
||||||
ContractRecord record = (ContractRecord) oos.readObject();
|
|
||||||
record.printContent();
|
|
||||||
} catch (FileNotFoundException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
} catch (ClassNotFoundException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}*/
|
|
||||||
}
|
}
|
||||||
|
@ -12,13 +12,11 @@ import java.util.*;
|
|||||||
public class ContractUnitController {
|
public class ContractUnitController {
|
||||||
private static final Logger LOGGER = LogManager.getLogger(ContractUnitController.class);
|
private static final Logger LOGGER = LogManager.getLogger(ContractUnitController.class);
|
||||||
private final TrustfulExecutorConnection connection;
|
private final TrustfulExecutorConnection connection;
|
||||||
private final Map<String, ContractUnit> units; //这个节点上<合约id,合约集群>,其中每个ContractUnit中存储了对应合约id的其他节点udp信息
|
private final Map<String, ContractUnit> units; // 这个节点上<合约id,合约集群>,其中每个ContractUnit中存储了对应合约id的其他节点udp信息
|
||||||
ContractManager manager;
|
ContractManager manager;
|
||||||
SequencingAlgorithmFactory algorithmFactory;
|
SequencingAlgorithmFactory algorithmFactory;
|
||||||
|
|
||||||
public ContractUnitController(
|
public ContractUnitController(TrustfulExecutorConnection connection, ContractManager manager,
|
||||||
TrustfulExecutorConnection connection,
|
|
||||||
ContractManager manager,
|
|
||||||
SequencingAlgorithmFactory factory) {
|
SequencingAlgorithmFactory factory) {
|
||||||
this.connection = connection;
|
this.connection = connection;
|
||||||
this.manager = manager;
|
this.manager = manager;
|
||||||
@ -39,18 +37,13 @@ public class ContractUnitController {
|
|||||||
unit.node2member = new HashMap<>();
|
unit.node2member = new HashMap<>();
|
||||||
units.put(req.contract.getID(), unit);
|
units.put(req.contract.getID(), unit);
|
||||||
|
|
||||||
System.out.println(
|
System.out.println("[ContractUnitController] startContract:" + result + " isMaster:"
|
||||||
"[ContractUnitController] startContract:"
|
+ req.isMaster + " cid:" + req.contract.getID());
|
||||||
+ result
|
|
||||||
+ " isMaster:"
|
|
||||||
+ req.isMaster
|
|
||||||
+ " cid:"
|
|
||||||
+ req.contract.getID());
|
|
||||||
break;
|
break;
|
||||||
case AddMember:
|
case AddMember:
|
||||||
LOGGER.debug("contractID:" + cumsg.getContractID());
|
LOGGER.debug("contractID:" + cumsg.getContractID());
|
||||||
unit = units.get(cumsg.getContractID());
|
unit = units.get(cumsg.getContractID());
|
||||||
unit.addMember(node, cumsg); //启动通过在UDPTrustExecutor中遍历memebers,使得这个合约集群中每个节点都有其他节点的UDP信息,便于之后护发UDP消息
|
unit.addMember(node, cumsg); // 启动通过在UDPTrustExecutor中遍历memebers,使得这个合约集群中每个节点都有其他节点的UDP信息,便于之后护发UDP消息
|
||||||
break;
|
break;
|
||||||
case Sequencing:
|
case Sequencing:
|
||||||
unit = units.get(cumsg.getContractID());
|
unit = units.get(cumsg.getContractID());
|
||||||
@ -59,7 +52,7 @@ public class ContractUnitController {
|
|||||||
break;
|
break;
|
||||||
case Unknown:
|
case Unknown:
|
||||||
default:
|
default:
|
||||||
//recover start
|
// recover start
|
||||||
ContractUnitStartRequest req2 = ContractUnitStartRequest.parse(cumsg.content);
|
ContractUnitStartRequest req2 = ContractUnitStartRequest.parse(cumsg.content);
|
||||||
unit = new ContractUnit();
|
unit = new ContractUnit();
|
||||||
unit.connection = connection;
|
unit.connection = connection;
|
||||||
@ -68,12 +61,8 @@ public class ContractUnitController {
|
|||||||
unit.node2member = new HashMap<>();
|
unit.node2member = new HashMap<>();
|
||||||
units.put(req2.contract.getID(), unit);
|
units.put(req2.contract.getID(), unit);
|
||||||
|
|
||||||
System.out.println(
|
System.out.println("[ContractUnitController] startContract:" + " isMaster:"
|
||||||
"[ContractUnitController] startContract:"
|
+ req2.isMaster + " cid:" + req2.contract.getID());
|
||||||
+ " isMaster:"
|
|
||||||
+ req2.isMaster
|
|
||||||
+ " cid:"
|
|
||||||
+ req2.contract.getID());
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -84,7 +73,7 @@ public class ContractUnitController {
|
|||||||
|
|
||||||
public static class ContractUnit implements TrustfulExecutorConnection, Serializable {
|
public static class ContractUnit implements TrustfulExecutorConnection, Serializable {
|
||||||
public String contractID;
|
public String contractID;
|
||||||
public Map<Node, ContractUnitMember> node2member; //存储和自己在一个合约集群的其他节点信息
|
public Map<Node, ContractUnitMember> node2member; // 存储和自己在一个合约集群的其他节点信息
|
||||||
private transient CommitAlgorithm commitAlgorithm;
|
private transient CommitAlgorithm commitAlgorithm;
|
||||||
private transient TrustfulExecutorConnection connection;
|
private transient TrustfulExecutorConnection connection;
|
||||||
|
|
||||||
@ -148,16 +137,9 @@ public class ContractUnitController {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public String getDisplayStr() {
|
public String getDisplayStr() {
|
||||||
return "pSize:"
|
return "pSize:" + (prepare == null ? "null" : prepare.size()) + " cSize:"
|
||||||
+ (prepare == null ? "null" : prepare.size())
|
+ (commit == null ? "null" : commit.size()) + " isSendCommit:" + isSendCommit
|
||||||
+ " cSize:"
|
+ " isSendReply:" + isSendReply + " buffSize:" + buff.size();
|
||||||
+ (commit == null ? "null" : commit.size())
|
|
||||||
+ " isSendCommit:"
|
|
||||||
+ isSendCommit
|
|
||||||
+ " isSendReply:"
|
|
||||||
+ isSendReply
|
|
||||||
+ " buffSize:"
|
|
||||||
+ buff.size();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -55,15 +55,16 @@ public class ContractUnitMessage implements Serializable {
|
|||||||
|
|
||||||
private static String IDA = "PBFTMsg";
|
private static String IDA = "PBFTMsg";
|
||||||
|
|
||||||
// public void sign(NodeInfo info) {
|
// public void sign(NodeInfo info) {
|
||||||
// sender = info.getNodeID();
|
// sender = info.getNodeID();
|
||||||
// signature = sm02.sign(content, IDA, info.privKey).toString();
|
// signature = sm02.sign(content, IDA, info.privKey).toString();
|
||||||
// }
|
// }
|
||||||
|
|
||||||
public boolean verify(ContractUnitMember member) {
|
public boolean verify(ContractUnitMember member) {
|
||||||
ECPublicKeyParameters param = BCECUtil.createECPublicKeyFromStrParameters(member.pubKey,SM2Util.CURVE,SM2Util.DOMAIN_PARAMS);
|
ECPublicKeyParameters param = BCECUtil.createECPublicKeyFromStrParameters(member.pubKey,
|
||||||
|
SM2Util.CURVE, SM2Util.DOMAIN_PARAMS);
|
||||||
|
|
||||||
return SM2Util.verify(param,content, signature.getBytes());
|
return SM2Util.verify(param, content, signature.getBytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setType(ContractUnitType type) {
|
public void setType(ContractUnitType type) {
|
||||||
|
@ -5,6 +5,7 @@ import java.io.Serializable;
|
|||||||
public enum ContractUnitType implements Serializable {
|
public enum ContractUnitType implements Serializable {
|
||||||
|
|
||||||
Start(0), AddMember(1), Sequencing(2), Unknown(3);
|
Start(0), AddMember(1), Sequencing(2), Unknown(3);
|
||||||
|
|
||||||
private int type;
|
private int type;
|
||||||
|
|
||||||
private ContractUnitType(int i) {
|
private ContractUnitType(int i) {
|
||||||
|
@ -101,10 +101,9 @@ public class MultiContractMeta implements IDSerializable {
|
|||||||
|
|
||||||
public void setLastExeSeq(int lastExeSeq) {
|
public void setLastExeSeq(int lastExeSeq) {
|
||||||
this.lastExeSeq.set(lastExeSeq);
|
this.lastExeSeq.set(lastExeSeq);
|
||||||
if (KeyValueDBUtil.instance.containsKey(
|
if (KeyValueDBUtil.instance.containsKey(CMTables.LastExeSeq.toString(), contractID)) { // 如果现在是Stable模式就同步刷到磁盘
|
||||||
CMTables.LastExeSeq.toString(), contractID)) { // 如果现在是Stable模式就同步刷到磁盘
|
KeyValueDBUtil.instance.setValue(CMTables.LastExeSeq.toString(), contractID,
|
||||||
KeyValueDBUtil.instance.setValue(
|
String.valueOf(lastExeSeq));
|
||||||
CMTables.LastExeSeq.toString(), contractID, String.valueOf(lastExeSeq));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -154,19 +153,18 @@ public class MultiContractMeta implements IDSerializable {
|
|||||||
public String[] getMembers() {
|
public String[] getMembers() {
|
||||||
return members;
|
return members;
|
||||||
}
|
}
|
||||||
public String joinMembers(String delimiter){
|
|
||||||
|
public String joinMembers(String delimiter) {
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
if (members.length > 0)
|
if (members.length > 0)
|
||||||
sb.append(members[0]);
|
sb.append(members[0]);
|
||||||
else return "";
|
else
|
||||||
|
return "";
|
||||||
for (int i = 1; i < members.length; i++) {
|
for (int i = 1; i < members.length; i++) {
|
||||||
sb.append(delimiter).append(members[i]);
|
sb.append(delimiter).append(members[i]);
|
||||||
}
|
}
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
public void setMembers(String[] m) {
|
|
||||||
members = m;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setMembers(JsonArray members) {
|
public void setMembers(JsonArray members) {
|
||||||
String[] copied = new String[members.size()];
|
String[] copied = new String[members.size()];
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
package org.bdware.sc.units;
|
package org.bdware.sc.units;
|
||||||
|
|
||||||
public enum RecoverFlag {
|
public enum RecoverFlag {
|
||||||
Fine,ToRecover,Recovering;
|
Fine, ToRecover, Recovering;
|
||||||
}
|
}
|
||||||
|
@ -24,7 +24,8 @@ public class RespCache {
|
|||||||
} else {
|
} else {
|
||||||
waiter.wait(5000L);
|
waiter.wait(5000L);
|
||||||
timeout &= waiter.get() * 2 > count;
|
timeout &= waiter.get() * 2 > count;
|
||||||
if (!timeout) waiter.notifyAll();
|
if (!timeout)
|
||||||
|
waiter.notifyAll();
|
||||||
return timeout;
|
return timeout;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -25,9 +25,8 @@ public class DHTUtil {
|
|||||||
|
|
||||||
String hash = HashUtil.sha3ToFixedLen(key, nodes[0].length() - 2);
|
String hash = HashUtil.sha3ToFixedLen(key, nodes[0].length() - 2);
|
||||||
|
|
||||||
int l = 0, r = nodes.length - 1, m,
|
int l = 0, r = nodes.length - 1, m, h2l = hash.compareTo(nodes[l].substring(2)),
|
||||||
h2l = hash.compareTo(nodes[l].substring(2)), r2h = nodes[r].substring(2).compareTo(hash),
|
r2h = nodes[r].substring(2).compareTo(hash), h2m;
|
||||||
h2m;
|
|
||||||
BigInteger bigH = null;
|
BigInteger bigH = null;
|
||||||
String selected;
|
String selected;
|
||||||
do {
|
do {
|
||||||
@ -42,8 +41,7 @@ public class DHTUtil {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (l + 1 == r) {
|
if (l + 1 == r) {
|
||||||
BigInteger bigL = getBigInteger(nodes[l]),
|
BigInteger bigL = getBigInteger(nodes[l]), bigR = getBigInteger(nodes[r]);
|
||||||
bigR = getBigInteger(nodes[r]);
|
|
||||||
bigH = new BigInteger(hash, 16);
|
bigH = new BigInteger(hash, 16);
|
||||||
if (bigR.subtract(bigH).compareTo(bigH.subtract(bigL)) > -1) {
|
if (bigR.subtract(bigH).compareTo(bigH.subtract(bigL)) > -1) {
|
||||||
selected = nodes[l];
|
selected = nodes[l];
|
||||||
@ -65,7 +63,7 @@ public class DHTUtil {
|
|||||||
}
|
}
|
||||||
} while (true);
|
} while (true);
|
||||||
if (k == 1) {
|
if (k == 1) {
|
||||||
return new String[]{selected};
|
return new String[] {selected};
|
||||||
}
|
}
|
||||||
List<String> ret = new ArrayList<>();
|
List<String> ret = new ArrayList<>();
|
||||||
ret.add(selected);
|
ret.add(selected);
|
||||||
|
@ -5,12 +5,16 @@ import org.bdware.sc.conn.Node;
|
|||||||
import org.bdware.sc.conn.OnHashCallback;
|
import org.bdware.sc.conn.OnHashCallback;
|
||||||
import org.bdware.sc.conn.ResultCallback;
|
import org.bdware.sc.conn.ResultCallback;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
public interface ContractExecutor {
|
public interface ContractExecutor {
|
||||||
void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb);
|
void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb);
|
||||||
|
|
||||||
default void close() {
|
default void onRecover(Map<String, Object> args) {}
|
||||||
}
|
|
||||||
|
|
||||||
default void onSyncMessage(Node node, byte[] data) {
|
default void onDeliverBlock(String data) {}
|
||||||
}
|
|
||||||
|
default void close() {}
|
||||||
|
|
||||||
|
default void onSyncMessage(Node node, byte[] data) {}
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,5 @@ package org.bdware.server.trustedmodel;
|
|||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
public enum ContractUnitStatus implements Serializable {
|
public enum ContractUnitStatus implements Serializable {
|
||||||
CommonMode,
|
CommonMode, StableMode;
|
||||||
StableMode;
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,11 @@
|
|||||||
|
package org.bdware.server.trustedmodel;
|
||||||
|
|
||||||
|
public class MultiReqSeq {
|
||||||
|
public final int seq;
|
||||||
|
public final long startTime;
|
||||||
|
|
||||||
|
public MultiReqSeq(int s) {
|
||||||
|
seq = s;
|
||||||
|
startTime = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
}
|
@ -14,7 +14,8 @@ public class SingleNodeExecutor implements ContractExecutor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) {
|
public void execute(String requestID, ContractRequest req, ResultCallback rcb,
|
||||||
|
OnHashCallback hcb) {
|
||||||
cm.executeLocallyAsync(req, rcb, hcb);
|
cm.executeLocallyAsync(req, rcb, hcb);
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -2,9 +2,11 @@ package org.bdware.sc.test;
|
|||||||
|
|
||||||
import org.bdware.sc.ContractManager;
|
import org.bdware.sc.ContractManager;
|
||||||
|
|
||||||
|
|
||||||
public class ContractManagerTest {
|
public class ContractManagerTest {
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
ContractManager.yjsPath = "./generatedlib/yjs.jar";
|
ContractManager.yjsPath = "./generatedlib/yjs.jar";
|
||||||
ContractManager.instance = new ContractManager();
|
ContractManager.instance = new ContractManager();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
20
src/test/java/org/bdware/sc/test/ReflectionTest.java
Normal file
20
src/test/java/org/bdware/sc/test/ReflectionTest.java
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
package org.bdware.sc.test;
|
||||||
|
|
||||||
|
import com.google.gson.Gson;
|
||||||
|
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
|
||||||
|
public class ReflectionTest {
|
||||||
|
public static class Test {
|
||||||
|
public static void go(String... args) {
|
||||||
|
System.out.println(new Gson().toJson(args));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@org.junit.Test
|
||||||
|
public void go() throws Exception {
|
||||||
|
Method m = Test.class.getDeclaredMethod("go", String[].class);
|
||||||
|
String[] abc = new String[] {"ab", "cd"};
|
||||||
|
m.invoke(null, new Object[] {abc});
|
||||||
|
}
|
||||||
|
}
|
@ -12,17 +12,15 @@ public class RespCacheTest {
|
|||||||
new Thread(() -> {
|
new Thread(() -> {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(j * 1000);
|
Thread.sleep(j * 1000);
|
||||||
if (j > 2) Thread.sleep(6 * 1000);
|
if (j > 2)
|
||||||
|
Thread.sleep(6 * 1000);
|
||||||
|
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
boolean waitResult = cache.waitForHalf();
|
boolean waitResult = cache.waitForHalf();
|
||||||
System.out.println(
|
System.out.println("tid:" + Thread.currentThread().getId()
|
||||||
"tid:"
|
+ " reach target, waitResult:" + waitResult);
|
||||||
+ Thread.currentThread().getId()
|
|
||||||
+ " reach target, waitResult:"
|
|
||||||
+ waitResult);
|
|
||||||
}).start();
|
}).start();
|
||||||
}
|
}
|
||||||
Thread.sleep(20000);
|
Thread.sleep(20000);
|
||||||
@ -37,17 +35,15 @@ public class RespCacheTest {
|
|||||||
new Thread(() -> {
|
new Thread(() -> {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(j * 1000);
|
Thread.sleep(j * 1000);
|
||||||
if (j > 2) Thread.sleep(1900);
|
if (j > 2)
|
||||||
|
Thread.sleep(1900);
|
||||||
|
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
boolean waitResult = cache.waitForHalf();
|
boolean waitResult = cache.waitForHalf();
|
||||||
System.out.println(
|
System.out.println("tid:" + Thread.currentThread().getId()
|
||||||
"tid:"
|
+ " reach target, waitResult:" + waitResult);
|
||||||
+ Thread.currentThread().getId()
|
|
||||||
+ " reach target, waitResult:"
|
|
||||||
+ waitResult);
|
|
||||||
}).start();
|
}).start();
|
||||||
}
|
}
|
||||||
Thread.sleep(20000);
|
Thread.sleep(20000);
|
||||||
@ -66,11 +62,8 @@ public class RespCacheTest {
|
|||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
boolean waitResult = cache.waitForHalf();
|
boolean waitResult = cache.waitForHalf();
|
||||||
System.out.println(
|
System.out.println("tid:" + Thread.currentThread().getId()
|
||||||
"tid:"
|
+ " reach target, waitResult:" + waitResult);
|
||||||
+ Thread.currentThread().getId()
|
|
||||||
+ " reach target, waitResult:"
|
|
||||||
+ waitResult);
|
|
||||||
}).start();
|
}).start();
|
||||||
}
|
}
|
||||||
Thread.sleep(10000);
|
Thread.sleep(10000);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user