Compare commits

...

25 Commits

Author SHA1 Message Date
garvey-wong
46d1ebb9bf feat: sharding executor exec locally 2022-05-03 15:01:46 +08:00
WangXuxin
afa327419b feat: support @RouteInfo byFunc 2022-04-27 10:49:45 +08:00
CaiHQ
ff78ffc35d keep sync 2022-04-21 10:19:44 +08:00
CaiHQ
6f1fc41aad feat: support createParam
feat: docker script
2022-04-21 10:18:48 +08:00
CaiHQ
a8ea5c6d53 updatet submodulee 2022-03-30 11:35:37 +08:00
CaiHQ
d9a99cc60e add nodecenterws config 2022-03-30 11:17:17 +08:00
CaiHQ
44321ba60a keep sync 2022-03-23 14:50:46 +08:00
CaiHQ
1d55fdb69a keep sync 2022-03-22 23:52:10 +08:00
CaiHQ
ee0a158a00 support ledgerparams
add startContract at cmconfig.json
add docker scripts
2022-03-22 23:52:02 +08:00
CaiHQ
be728c012b keep sync 2022-02-18 11:16:10 +08:00
CaiHQ
f5c849893f fix: MultiPointCooperationExecutor 2022-02-18 11:14:11 +08:00
CaiHQ
604fdc80bd Merge branch 'feat/consistency-sdk' of gitee.com:BDWare/bdcontract-bundle 2022-02-17 16:44:38 +08:00
CaiHQ
9563feca03 test action: prune killed Contract 2022-02-17 16:42:17 +08:00
汪旭鑫
fb8e585649 Merge branch 'master' into feat/consistency-sdk
# Conflicts:
#	cm
2022-02-15 15:04:52 +08:00
汪旭鑫
51a89ca929 refactor: sdk for consensus algorithm 2022-02-15 14:44:18 +08:00
Frank.R.Wu
c503c3413f feat: update event mechanism
add second centers for event topics
2022-01-20 20:55:49 +08:00
Frank.R.Wu
331453bb5b chore: keep sync 2022-01-18 19:34:35 +08:00
CaiHQ
779ddbe764 upgrad doipsdk 2022-01-09 00:04:04 +08:00
CaiHQ
65867aff0e keey sync 2022-01-06 20:35:45 +08:00
CaiHQ
d3762b0cc9 keep sync 2022-01-04 11:10:18 +08:00
CaiHQ
3c307f8580 prune: use stable doip-sdk 2022-01-04 10:16:29 +08:00
CaiHQ
b1e31de2de prune: use stable doip-sdk 2022-01-04 10:16:27 +08:00
yanghuanyu
e0b7db9220 chore: sync common 2021-12-31 15:06:51 +08:00
Frank.R.Wu
08d0ac83bb chore: keep sync 2021-12-29 20:26:59 +08:00
CaiHQ
29109b222b optimize receive file 2021-12-29 11:52:30 +08:00
97 changed files with 2519 additions and 24 deletions

28
.gitmodules vendored
View File

@ -1,41 +1,49 @@
[submodule "cm"]
path = cm
url = https://gitee.com/BDWare/cm.git
url = git@gitee.com/BDWare/cm.git
branch = master
[submodule "agent-backend"]
path = agent-backend
url = https://gitee.com/BDWare/agent-backend.git
url = git@gitee.com/BDWare/agent-backend.git
branch = master
[submodule "router-backend"]
path = router-backend
url = https://gitee.com/BDWare/router-backend.git
url = git@gitee.com/BDWare/router-backend.git
branch = master
[submodule "router-frontend"]
path = router-frontend
url = https://gitee.com/BDWare/router-frontend.git
url = git@gitee.com/BDWare/router-frontend.git
branch = master
[submodule "genparser"]
path = genparser
url = https://gitee.com/BDWare/genparser.git
url = git@gitee.com/BDWare/genparser.git
branch = master
[submodule "gmhelper"]
path = gmhelper
url = https://gitee.com/BDWare/gmhelper.git
url = git@gitee.com/BDWare/gmhelper.git
branch = master
[submodule "front-base"]
path = front-base
url = https://gitee.com/BDWare/front-base.git
url = git@gitee.com/BDWare/front-base.git
branch = master
[submodule "MockJava"]
path = mockjava
url = https://gitee.com/BDWare/MockJava.git
url = git@gitee.com/BDWare/MockJava.git
branch = master
[submodule "common"]
path = common
url = https://gitee.com/BDWare/common.git
url = git@gitee.com/BDWare/common.git
branch = master
[submodule "agent-frontend"]
path = agent-frontend
url = https://gitea.internetapi.cn/bdware/bdcontract-web-ide.git
url = git@gitea.internetapi.cn:bdware/bdcontract-web-ide.git
branch = master
[submodule "consistency-sdk"]
path = consistency-sdk
url = git@gitee.com:BDWare/consistency-sdk.git
branch = master
[submodule "custom-plugin"]
path = custom-plugin
url = git@gitee.com:BDWare/custom-plugin.git
branch = master

@ -1 +1 @@
Subproject commit bc4eb8b828e487c8d38c9b627eff26eb26daa663
Subproject commit da1f524a06bfb1a05753145bcd1f765ea4a42e81

@ -1 +1 @@
Subproject commit cdcdfbdfde22640420318b595e13e0b16da10ea0
Subproject commit b0f4c4225b5152b11f5641ac766928a7a7f64b2a

View File

@ -2,7 +2,7 @@ plugins {
id 'java'
id 'idea'
}
def currVersion = "1.6.6"
def currVersion = "1.7.4"
ext.projectIds = ['group': 'com.bdware.sc', 'version': currVersion]
sourceCompatibility = 1.8
@ -18,6 +18,7 @@ project(':common') {
group = projectIds.group
version = projectIds.version
repositories {
mavenCentral()
maven { url 'https://maven.aliyun.com/repository/public' }
}
}
@ -26,6 +27,7 @@ project(':cm') {
group = projectIds.group
version = projectIds.version
repositories {
mavenCentral()
maven { url 'https://maven.aliyun.com/repository/public' }
}
}
@ -34,6 +36,7 @@ project(':front-base') {
group = projectIds.group
version = projectIds.version
repositories {
mavenCentral()
maven { url 'https://maven.aliyun.com/repository/public' }
}
}
@ -42,6 +45,7 @@ project(':agent-backend') {
group = projectIds.group
version = projectIds.version
repositories {
mavenCentral()
maven { url 'https://maven.aliyun.com/repository/public' }
}
}
@ -50,6 +54,7 @@ project(':router-backend') {
group = projectIds.group
version = projectIds.version
repositories {
mavenCentral()
maven { url 'https://maven.aliyun.com/repository/public' }
}
}
@ -59,6 +64,7 @@ project(":gmhelper") {
group = projectIds.group
version = projectIds.version
repositories {
mavenCentral()
maven { url 'https://maven.aliyun.com/repository/public' }
}
}

2
cm

@ -1 +1 @@
Subproject commit ec21fc95001526ed6368c1dc49e98c8736783231
Subproject commit c84cd3fac0a14afe33e5645044c6421efad2ee11

2
common

@ -1 +1 @@
Subproject commit 72878eff91bee161abda23706e99b26f06106870
Subproject commit 7a9ce393cdb14495eeb56507d00b8adf26438746

26
consistency-sdk/.gitignore vendored Normal file
View File

@ -0,0 +1,26 @@
/build/
/testoutput/
*/build/*
# Compiled class file
*.class
.DS_Store
# Log file
*.log
# BlueJ files
*.ctxt
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# Package Files #
*.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

View File

@ -0,0 +1,38 @@
plugins {
id 'java-library'
}
group 'com.bdware.sc'
version '1.0-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
mavenCentral()
}
sourceSets {
main {
java {
srcDir 'src/main/java'
}
resources {
srcDir 'src/main/resources'
}
}
test {
java {
srcDir 'src/test/java'
}
resources {
srcDir 'src/test/resources'
}
}
}
dependencies {
api project(":cm")
api project(":front-base")
testImplementation 'junit:junit:4.13.2'
}

View File

@ -0,0 +1,20 @@
org/bdware/sdk/consistency/api/context/IMasterServerRecoverMechAction.java
org.bdware.sdk.consistency.api.context.IMasterServerRecoverMechAction
org/bdware/sdk/consistency/api/context/INetworkManager.java
org.bdware.sdk.consistency.api.context.INetworkManager
org/bdware/sdk/consistency/api/context/IMasterServerTCPAction.java
org.bdware.sdk.consistency.api.context.IMasterServerTCPAction
org/bdware/sdk/consistency/api/context/IGlobalConf.java
org.bdware.sdk.consistency.api.context.IGlobalConf
org/bdware/sdk/consistency/api/NotifiableResultMerger.java
org.bdware.sdk.consistency.api.NotifiableResultMerger
org/bdware/sdk/consistency/api/ContractExecutorFactory.java
org.bdware.sdk.consistency.api.ContractExecutorFactory
org/bdware/sdk/consistency/api/context/ISDKContext.java
org.bdware.sdk.consistency.api.context.ISDKContext
org/bdware/sdk/consistency/ConsistencyPluginManager.java
org.bdware.sdk.consistency.ConsistencyPluginManager
org.bdware.sdk.consistency.ConsistencyPluginManager$1
org.bdware.sdk.consistency.ConsistencyPluginManager$Inner
org/bdware/sdk/consistency/api/context/ICMActions.java
org.bdware.sdk.consistency.api.context.ICMActions

View File

@ -0,0 +1,2 @@
Manifest-Version: 1.0

View File

@ -0,0 +1,176 @@
package org.bdware.sdk.consistency;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.bean.ContractExecType;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.sdk.consistency.api.context.ISDKContext;
import org.bdware.server.CMDConf;
import org.bdware.server.trustedmodel.ContractExecutor;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Modifier;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
public class ConsistencyPluginManager {
private static final Logger LOGGER = LogManager.getLogger(ConsistencyPluginManager.class);
private static final String CONFIG_PATH = "cmconfig.json";
private ISDKContext SDKContext;
public static void setContext(ISDKContext SDKContext) {
getInstance().SDKContext = SDKContext;
}
public static ConsistencyPluginManager getInstance() {
return Inner.instance;
}
public static ISDKContext getContext() {
return getInstance().SDKContext;
}
/**
* 静态内部类单例 懒加载
*/
private static class Inner {
private static final ConsistencyPluginManager instance = new ConsistencyPluginManager();
}
private final Map<String, ContractExecutorFactory> factoriesMap = new HashMap<>();
private URLClassLoader urlClassLoader;
private ConsistencyPluginManager() {
loadPlugins();
}
private void loadPlugins() {
// 读取配置
CMDConf cmdConf = getCmdConf();
if (cmdConf.consistencyPlugins.isEmpty()) {
LOGGER.info("Consistency SDK: no plugin detected");
return;
}
// 装配urlClassLoader
String[] consistencyPluginURLs = cmdConf.consistencyPlugins.split(",");
URL[] urls = new URL[consistencyPluginURLs.length];
for (int i = 0; i < consistencyPluginURLs.length; i++) {
try {
if(consistencyPluginURLs[i].endsWith(".jar")) {
urls[i] = new URL("file:" + consistencyPluginURLs[i]);
}
} catch (MalformedURLException e) {
e.printStackTrace();
}
}
urlClassLoader = new URLClassLoader(urls, Thread.currentThread().getContextClassLoader());
// 扫描jar包 转载插件
for (String path : consistencyPluginURLs) {
parseJar(path);
}
System.out.println();
}
private CMDConf getCmdConf() {
File confFile = new File(CONFIG_PATH);
File confTemplate = new File(CONFIG_PATH + ".template");
if (!confTemplate.exists()) {
CMDConf conf = new CMDConf();
conf.write(confTemplate.getAbsolutePath());
}
if (!confFile.exists() && confTemplate.exists()) {
try {
FileUtils.copyFile(confTemplate, confFile);
} catch (IOException e) {
e.printStackTrace();
}
}
return CMDConf.parseFile(CONFIG_PATH);
}
/**
* 从jar包中解析ContractExecutorFactory子类
*
* @param path
*/
private void parseJar(String path) {
try {
File file = new File(path);
if (file.exists()) {
JarFile jarFile = new JarFile(file.getCanonicalPath());
Enumeration<JarEntry> enumeration = jarFile.entries();
while (enumeration.hasMoreElements()) {
JarEntry entry = enumeration.nextElement();
String entryName = entry.getName();
if (entryName.endsWith(".class") && entryName.lastIndexOf(".") >= entryName.lastIndexOf("/")) {
try {
String className = entryName.replaceAll("/", ".").substring(0, entryName.length() - 6);
Class<?> clazz = urlClassLoader.loadClass(className);
// ContractExecutorFactory子类 && 非抽象类
if (ContractExecutorFactory.class.isAssignableFrom(clazz) && !Modifier.isAbstract(clazz.getModifiers())) {
ContractExecutorFactory factory = (ContractExecutorFactory)(clazz.newInstance());
factoriesMap.put(factory.getExecutorName(), factory);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public ContractExecutor createContractExecutor(String name, Map<String, Object> args) {
return factoriesMap.containsKey(name) ? factoriesMap.get(name).getInstance(args) : null;
}
@Deprecated
public ContractExecutor createContractExecutor(ContractExecType type, Map<String, Object> args) {
String name = "";
switch (type) {
case Sole:
name = "Sole";
break;
case PBFT:
name = "PBFT";
break;
case Sharding:
name = "Sharding";
break;
case RequestOnce:
name = "RequestOnce";
break;
case ResponseOnce:
name = "ResponseOnce";
break;
case SelfAdaptiveSharding:
name = "SASharding";
break;
case RequestAllResponseAll:
name = "RARA";
break;
case RequestAllResponseHalf:
name = "RARH";
break;
case RequestAllResponseFirst:
name = "RARF";
break;
}
return createContractExecutor(name, args);
}
}

View File

@ -0,0 +1,11 @@
package org.bdware.sdk.consistency.api;
import org.bdware.server.trustedmodel.ContractExecutor;
import java.util.Map;
public interface ContractExecutorFactory {
String getExecutorName();
ContractExecutor getInstance(Map<String, Object> args);
}

View File

@ -0,0 +1,7 @@
package org.bdware.sdk.consistency.api;
public interface NotifiableResultMerger {
String getContractID();
String getInfo();
}

View File

@ -0,0 +1,7 @@
package org.bdware.sdk.consistency.api.context;
import org.bdware.sc.ContractManager;
public interface ICMActions {
ContractManager getManager();
}

View File

@ -0,0 +1,9 @@
package org.bdware.sdk.consistency.api.context;
import org.zz.gmhelper.SM2KeyPair;
public interface IGlobalConf {
String getNodeID();
SM2KeyPair getKeyPair();
}

View File

@ -0,0 +1,8 @@
package org.bdware.sdk.consistency.api.context;
import com.google.gson.JsonObject;
import org.bdware.sc.conn.ResultCallback;
public interface IMasterClientTCPAction {
void asyncExecuteContractLocally(JsonObject jo, ResultCallback rc);
}

View File

@ -0,0 +1,11 @@
package org.bdware.sdk.consistency.api.context;
import org.bdware.sc.units.RecoverFlag;
import java.util.Map;
public interface IMasterServerRecoverMechAction {
Map<String, Map<String, RecoverFlag>> getRecoverStatusMap();
void restartContractFromCommonMode(String nodeID, String contractID);
}

View File

@ -0,0 +1,12 @@
package org.bdware.sdk.consistency.api.context;
import org.bdware.sc.units.RequestCache;
import org.bdware.server.action.SyncResult;
import java.util.Map;
public interface IMasterServerTCPAction {
SyncResult getSync();
Map<String, RequestCache> getReqCache();
}

View File

@ -0,0 +1,11 @@
package org.bdware.sdk.consistency.api.context;
import org.bdware.sc.conn.ResultCallback;
public interface INetworkManager {
void sendToAgent(String pubkey, String content);
boolean hasAgentConnection(String pubKey);
ResultCallback createResultCallback(String requestID, ResultCallback rc, int count);
}

View File

@ -0,0 +1,15 @@
package org.bdware.sdk.consistency.api.context;
public interface ISDKContext {
IMasterClientTCPAction getMasterClientTCPAction();
IMasterServerTCPAction getMasterServerTCPAction();
INetworkManager getNetworkManager();
ICMActions getCMActions();
IMasterServerRecoverMechAction getMasterServerRecoverMechAction();
IGlobalConf getGlobalConf();
}

26
custom-plugin/.gitignore vendored Normal file
View File

@ -0,0 +1,26 @@
/build/
/testoutput/
*/build/*
# Compiled class file
*.class
.DS_Store
# Log file
*.log
# BlueJ files
*.ctxt
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# Package Files #
*.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

View File

@ -0,0 +1,25 @@
plugins {
id 'java'
}
repositories {
mavenCentral()
}
dependencies {
implementation project(":consistency-sdk")
testImplementation 'junit:junit:4.13.2'
}
jar {
String libs = ''
configurations.runtimeClasspath.each {
libs = libs + " libs/" + it.name
}
manifest {
attributes 'Manifest-Version': archiveVersion
attributes 'Class-Path': libs
}
}

Binary file not shown.

View File

@ -0,0 +1,43 @@
org/bdware/consistency/plugin/ra/RequestAllResponseHalfFactory.java
org.bdware.consistency.plugin.ra.RequestAllResponseHalfFactory
org/bdware/consistency/plugin/ro/ResponseOnceExecutorFactory.java
org.bdware.consistency.plugin.ro.ResponseOnceExecutorFactory
org/bdware/consistency/plugin/ro/RequestOnceExecutorFactory.java
org.bdware.consistency.plugin.ro.RequestOnceExecutorFactory
org/bdware/consistency/plugin/pbft/PBFTExecutorFactory.java
org.bdware.consistency.plugin.pbft.PBFTExecutorFactory
org/bdware/consistency/plugin/sharding/SelfAdaptiveShardingExecutorFactory.java
org.bdware.consistency.plugin.sharding.SelfAdaptiveShardingExecutorFactory
org/bdware/consistency/plugin/ra/RequestAllResponseAllFactory.java
org.bdware.consistency.plugin.ra.RequestAllResponseAllFactory
org/bdware/consistency/plugin/ra/RequestAllResponseFirstFactory.java
org.bdware.consistency.plugin.ra.RequestAllResponseFirstFactory
org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutorFactory.java
org.bdware.consistency.plugin.sharding.MultiPointCooperationExecutorFactory
org/bdware/consistency/plugin/pbft/ContractCluster.java
org.bdware.consistency.plugin.pbft.ContractCluster
org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java
org.bdware.consistency.plugin.sharding.MultiPointCooperationExecutor
org.bdware.consistency.plugin.sharding.MultiPointCooperationExecutor$1
org.bdware.consistency.plugin.sharding.MultiPointCooperationExecutor$ResultMerger
org/bdware/consistency/plugin/pbft/PBFTExecutor.java
org.bdware.consistency.plugin.pbft.PBFTExecutor
org.bdware.consistency.plugin.pbft.PBFTExecutor$1
org.bdware.consistency.plugin.pbft.PBFTExecutor$1$1
org.bdware.consistency.plugin.pbft.PBFTExecutor$ResultMerger
org/bdware/consistency/plugin/common/AbstractContextContractExecutor.java
org.bdware.consistency.plugin.common.AbstractContextContractExecutor
org/bdware/consistency/plugin/single/SingleNodeExecutorFactory.java
org.bdware.consistency.plugin.single.SingleNodeExecutorFactory
org/bdware/consistency/plugin/ra/RequestAllExecutor.java
org.bdware.consistency.plugin.ra.RequestAllExecutor
org.bdware.consistency.plugin.ra.RequestAllExecutor$ResultMerger
org/bdware/consistency/plugin/sharding/SelfAdaptiveShardingExecutor.java
org.bdware.consistency.plugin.sharding.SelfAdaptiveShardingExecutor
org.bdware.consistency.plugin.sharding.SelfAdaptiveShardingExecutor$Block
org/bdware/consistency/plugin/ro/RequestOnceExecutor.java
org.bdware.consistency.plugin.ro.RequestOnceExecutor
org.bdware.consistency.plugin.ro.RequestOnceExecutor$1
org/bdware/consistency/plugin/ro/ResponseOnceExecutor.java
org.bdware.consistency.plugin.ro.ResponseOnceExecutor
org.bdware.consistency.plugin.ro.ResponseOnceExecutor$1

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,14 @@
package org.bdware.consistency.plugin.common;
import org.bdware.sdk.consistency.ConsistencyPluginManager;
import org.bdware.sdk.consistency.api.context.*;
import org.bdware.server.trustedmodel.ContractExecutor;
public abstract class AbstractContextContractExecutor implements ContractExecutor {
static protected IGlobalConf globalConf = ConsistencyPluginManager.getContext().getGlobalConf();
static protected ICMActions cmActions = ConsistencyPluginManager.getContext().getCMActions();
static protected INetworkManager networkManager = ConsistencyPluginManager.getContext().getNetworkManager();
static protected IMasterClientTCPAction masterClientTCPAction = ConsistencyPluginManager.getContext().getMasterClientTCPAction();
static protected IMasterServerTCPAction masterServerTCPAction = ConsistencyPluginManager.getContext().getMasterServerTCPAction();
static protected IMasterServerRecoverMechAction masterServerRecoverMechAction = ConsistencyPluginManager.getContext().getMasterServerRecoverMechAction();
}

View File

@ -0,0 +1,35 @@
package org.bdware.consistency.plugin.pbft;
import com.google.gson.JsonObject;
import org.bdware.sc.conn.ByteUtil;
import org.bdware.sc.units.PubKeyNode;
import org.bdware.sc.units.TrustfulExecutorConnection;
import org.bdware.sdk.consistency.ConsistencyPluginManager;
import java.util.ArrayList;
import java.util.List;
public class ContractCluster implements TrustfulExecutorConnection<PubKeyNode> {
private final List<PubKeyNode> members;
String contractID;
public ContractCluster(String contractID, List<PubKeyNode> members) {
this.members = new ArrayList<>();
this.members.addAll(members);
this.contractID = contractID;
}
@Override
public void sendMessage(PubKeyNode node, byte[] msg) {
JsonObject jo = new JsonObject();
jo.addProperty("action", "contractSyncMessage");
jo.addProperty("contractID", contractID);
jo.addProperty("data", ByteUtil.encodeBASE64(msg));
ConsistencyPluginManager.getInstance().getContext().getNetworkManager().sendToAgent(node.pubkey, jo.toString());
}
@Override
public List<PubKeyNode> getNodes() {
return members;
}
}

View File

@ -0,0 +1,374 @@
package org.bdware.consistency.plugin.pbft;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.common.AbstractContextContractExecutor;
import org.bdware.sc.ComponedContractResult;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.Node;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.consistency.Committer;
import org.bdware.sc.consistency.pbft.PBFTAlgorithm;
import org.bdware.sc.consistency.pbft.PBFTMember;
import org.bdware.sc.consistency.pbft.PBFTMessage;
import org.bdware.sc.consistency.pbft.PBFTType;
import org.bdware.sc.units.*;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.trustedmodel.MultiReqSeq;
import org.zz.gmhelper.SM2KeyPair;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
//TODO 追赶差下的调用
public class PBFTExecutor extends AbstractContextContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(PBFTExecutor.class);
final Object lock = new Object();
private final List<PubKeyNode> members;
int resultCount;
AtomicInteger request_index = new AtomicInteger(0);
// key为requestIDvalue为其seq
Map<String, MultiReqSeq> seqMap = new ConcurrentHashMap<>();
Map<String, ResultCache> resultCache = new ConcurrentHashMap<>();
// MultiPointContractInfo info;
String contractID;
PBFTAlgorithm pbft;
ContractCluster contractCluster;
boolean isMaster;
public PBFTExecutor(
int c, String con_id, final String masterPubkey, String[] members) {
resultCount = c;
contractID = con_id;
this.members = new ArrayList<>();
isMaster = globalConf.getNodeID().equals(masterPubkey);
pbft = new PBFTAlgorithm(isMaster);
int count = 0;
for (String mem : members) {
PubKeyNode pubkeyNode = new PubKeyNode();
pubkeyNode.pubkey = mem;
PBFTMember pbftMember = new PBFTMember();
pbftMember.isMaster = mem.equals(masterPubkey);
pbft.addMember(pubkeyNode, pbftMember);
this.members.add(pubkeyNode);
if (globalConf.getNodeID().equals(mem)) {
pbft.setSendID(count);
}
count++;
}
contractCluster = new ContractCluster(contractID, this.members);
pbft.setConnection(contractCluster);
final MultiContractMeta cei = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID);
pbft.setCommitter(new Committer() {
@Override
public void onCommit(ContractRequest data) {
ResultCallback ret = null;
final long startTime = System.currentTimeMillis();
ret = new ResultCallback() {
@Override
public void onResult(String str) {
Map<String, String> ret = new HashMap<>();
ret.put("action", "receiveTrustfullyResult");
SM2KeyPair keyPair = globalConf.getKeyPair();
ret.put("nodeID", keyPair.getPublicKeyStr());
ret.put("responseID", data.getRequestID());
ret.put("executeTime", (System.currentTimeMillis() - startTime) + "");
ret.put("data", str);
cei.setLastExeSeq(data.seq);
networkManager.sendToAgent(masterPubkey, JsonUtil.toJson(ret));
}
};
cmActions.getManager().executeLocallyAsync(data, ret, null);
}
});
}
public void onSyncMessage(Node node, byte[] data) {
pbft.onMessage(node, data);
}
public void setSeq(int seq) {
request_index = new AtomicInteger(seq);
pbft.setAtomSeq(request_index.get());
}
public ResultCallback createResultCallback(
final String requestID,
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID) {
ComponedContractResult componedContractResult = new ComponedContractResult(count);
// TODO 加对应的超时
return networkManager.createResultCallback(
requestID, new ResultMerger(originalCb, count, request_seq, contractID), count);
}
public void sendRequest(String id, ContractRequest req, ResultCallback collector) {
// Map<String, Object> reqStr = new HashMap<>();
// reqStr.put("uniReqID", id);
// reqStr.put("data", req);
// reqStr.put("action", "executeContractLocally");
ContractRequest cr2 = ContractRequest.parse(req.toByte());
cr2.setRequestID(id);
PBFTMessage request = new PBFTMessage();
request.setOrder(req.seq);
request.setType(PBFTType.Request);
request.setContent(cr2.toByte());
for (PubKeyNode node : members) {
if (!networkManager.hasAgentConnection(node.pubkey)) {
LOGGER.warn("cmNode " + node.pubkey.substring(0, 5) + " is null");
collector.onResult(
"{\"status\":\"Error\",\"result\":\"node offline\","
+ "\"nodeID\":\""
+ node
+ "\","
+ "\"action\":\"onExecuteContractTrustfully\"}");
// } else if (MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID)
// != RecoverFlag.Fine) {
// collector.onResult(
// "{\"status\":\"Error\",\"result\":\"node recovering\","
// + "\"nodeID\":\""
// + node
// + "\","
// + "\"action\":\"onExecuteContractTrustfully\"}");
// contractCluster.sendMessage(node, request.getBytes());
} else {
contractCluster.sendMessage(node, request.getBytes());
}
}
// master负责缓存请求
if (!masterServerTCPAction.getReqCache().containsKey(contractID)) {
masterServerTCPAction.getReqCache().put(contractID, new RequestCache());
}
// TODO 多调多统一个seq的有多个请求这个需要改
String[] nodes =
cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers();
LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID());
LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes));
}
public boolean checkCurNodeNumValid() {
return true;
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
LOGGER.debug(JsonUtil.toJson(req));
MultiContractMeta meta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(req.getContractID());
if (meta == null || !meta.isMaster()) {
cmActions.getManager().executeContractOnOtherNodes(req, rc);
return;
}
req.setContractID(cmActions.getManager().getContractIDByName(req.getContractID()));
// 三个相同requestID进来的时候会有冲突
// 仅在此处有冲突么
// 这里是从MasterServer->MasterClient请求的是"executeContractLocally"
// 如果是多点合约的请求A1A2A3的序号应该一致不能分配一个新的seq根据requestID判断是否不需要重新分配一个序号
//TODO seqMap memory leak
//TODO
//TODO
if (null != requestID && requestID.endsWith("_mul")) {
synchronized (lock) {
if (seqMap.containsKey(requestID)) {
req.seq = seqMap.get(requestID).seq;
} else {
req.seq = request_index.getAndIncrement();
seqMap.put(requestID, new MultiReqSeq(req.seq));
}
}
} else {
req.seq = request_index.getAndIncrement();
}
req.needSeq = true;
String id =
System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq;
LOGGER.info("execute receive requestID=" + requestID + " msgID=" + id);
if (checkCurNodeNumValid()) {
LOGGER.debug("checkCurNodeNumValid=true");
ResultCallback collector =
createResultCallback(id, rc, resultCount, req.seq, req.getContractID());
masterServerTCPAction.getSync().sleep(id, collector);
LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq);
sendRequest(id, req, collector);
} else {
LOGGER.debug("invalidNodeNumOnResult");
request_index.getAndDecrement();
ContractResult finalResult =
new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("node number unavailable, request refused."));
rc.onResult(JsonUtil.toJson(finalResult));
}
// }
/* // 三个相同requestID进来的时候会有冲突
// 仅在此处有冲突么
// 这里是从MasterServer->MasterClient请求的是"executeContractLocally"
req.seq = request_index.getAndIncrement();
req.needSeq = true;
ResultCallback collector = createResultCallback(id, rc, resultCount, req.getContractID());
MasterServerTCPAction.sync.sleep(id, collector);
sendRequest(id, req, collector);*/
}
// 清理缓存的多点合约请求序号
public void clearCache() {
final long time = System.currentTimeMillis() - 30000L;
seqMap.entrySet()
.removeIf(
entry -> {
MultiReqSeq cache = entry.getValue();
if (null == cache) {
return true;
}
return cache.startTime < time;
});
}
public static class ResultMerger extends ResultCallback {
ComponedContractResult componedContractResult;
AtomicInteger order;
String contractID;
int count;
int request_seq;
ResultCallback originalCallback;
Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点
ResultMerger(
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID) {
originalCallback = originalCb;
this.count = count;
this.request_seq = request_seq;
this.contractID = contractID;
componedContractResult = new ComponedContractResult(count);
order = new AtomicInteger(0);
}
public String getContractID() {
return contractID;
}
public String getInfo() {
return "contractID="
+ contractID
+ " 收到第 "
+ order
+ " 个节点回复 : "
+ " order="
+ order
+ " count="
+ count
+ " ";
}
@Override
public void onResult(String str) {
// TODO 必须在这里聚合
// str的data是个ContractResult
// 在这儿也是返回个ContractResult
try {
LOGGER.debug("a result of contract" + contractID + ": " + str);
JsonObject obj = JsonParser.parseString(str).getAsJsonObject();
if (obj.has("nodeID")) {
String id = obj.get("nodeID").getAsString();
if (nodeIDs.contains(id)) {
LOGGER.debug(
"ignored result because the result of node "
+ id.substring(0, 5)
+ " has been received");
return;
}
nodeIDs.add(id);
}
LOGGER.debug(
String.format(
"contractID=%s received=%s order=%d count=%d",
contractID, str, order.get(), count));
componedContractResult.add(obj);
// 收集到所有结果
if (order.incrementAndGet() == count) {
ContractResult finalResult = componedContractResult.figureFinalResult();
finalResult.needSeq = true;
finalResult.seq = request_seq;
// if (null == finalResult) {
// finalResult =
// new ContractResult(
// ContractResult.Status.Exception,
// new JsonPrimitive(
// "no nore than half of the
// consistent result"));
// originalCallback.onResult(new
// Gson().toJson(finalResult));
// } else {
originalCallback.onResult(JsonUtil.toJson(finalResult));
// }
LOGGER.debug(
String.format(
"%d results are the same: %s",
finalResult.size, finalResult.result));
// 集群中事务序号+1
cmActions.getManager().multiContractRecorder
.getMultiContractMeta(contractID)
.nextSeqAtMaster();
// recover其中无状态合约CP出错无需恢复
Set<String> nodesID = componedContractResult.getProblemNodes();
if (null == nodesID || nodesID.isEmpty()) {
return;
}
for (String nodeID : nodesID) {
LOGGER.warn("node fails! " + nodeID);
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID)
== RecoverFlag.Fine) {
masterServerRecoverMechAction.getRecoverStatusMap()
.get(nodeID)
.put(contractID, RecoverFlag.ToRecover);
}
}
for (String nodeID : nodesID) {
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID)
== RecoverFlag.ToRecover) {
LOGGER.warn("node in recover " + nodeID);
// 因为该节点结果有误所以即时是stableMode也认为trans记录不可信
// 直接通过load别的节点来恢复
masterServerRecoverMechAction.restartContractFromCommonMode(
nodeID, contractID);
}
}
}
// clearCache();
} catch (Exception e) {
e.printStackTrace();
LOGGER.warn("result exception!");
}
}
}
@Override
public void onRecover(Map<String, Object> args) {
int ceiLastExeSeq = (int) args.get("ceiLastExeSeq");
this.setSeq(ceiLastExeSeq + 1);
}
}

View File

@ -0,0 +1,22 @@
package org.bdware.consistency.plugin.pbft;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor;
import java.util.Map;
public class PBFTExecutorFactory implements ContractExecutorFactory {
@Override
public String getExecutorName() {
return "PBFT";
}
@Override
public ContractExecutor getInstance(Map<String, Object> args) {
int nodeSize = (int) args.get("nodeSize");
String contractID = (String) args.get("contractID");
String masterPubkey = (String) args.get("masterPubkey");
String[] members = (String[]) args.get("members");
return new PBFTExecutor(nodeSize, contractID, masterPubkey, members);
}
}

View File

@ -0,0 +1,343 @@
package org.bdware.consistency.plugin.ra;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.common.AbstractContextContractExecutor;
import org.bdware.sc.ComponedContractResult;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractExecType;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.RecoverFlag;
import org.bdware.sc.units.RequestCache;
import org.bdware.sc.units.ResultCache;
import org.bdware.sc.util.JsonUtil;
import org.bdware.sdk.consistency.api.NotifiableResultMerger;
import org.bdware.server.trustedmodel.MultiReqSeq;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class RequestAllExecutor extends AbstractContextContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(RequestAllExecutor.class);
final Object lock = new Object();
int resultCount;
AtomicInteger request_index = new AtomicInteger(0);
ContractExecType type;
// key为requestIDvalue为其seq
Map<String, MultiReqSeq> seqMap = new ConcurrentHashMap<>();
Map<String, ResultCache> resultCache = new ConcurrentHashMap<>();
// MultiPointContractInfo info;
String contractID;
public RequestAllExecutor(
ContractExecType t, int c, String con_id) {
type = t;
resultCount = c;
contractID = con_id;
}
public void setSeq(int seq) {
request_index = new AtomicInteger(seq);
}
public ResultCallback createResultCallback(
final String requestID,
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID) {
ComponedContractResult componedContractResult = new ComponedContractResult(count);
// TODO 加对应的超时
return networkManager.createResultCallback(
requestID, new ResultMerger(originalCb, count, request_seq, contractID), count);
}
public void sendRequest(String id, ContractRequest req, ResultCallback collector) {
Map<String, Object> reqStr = new HashMap<>();
reqStr.put("uniReqID", id);
reqStr.put("data", req);
reqStr.put("action", "executeContractLocally");
String sendStr = JsonUtil.toJson(reqStr);
// master负责缓存请求
if (!masterServerTCPAction.getReqCache().containsKey(contractID)) {
masterServerTCPAction.getReqCache().put(contractID, new RequestCache());
}
// TODO 多调多统一个seq的有多个请求这个需要改
masterServerTCPAction.getReqCache().get(contractID).put(req.seq, sendStr);
LOGGER.debug(JsonUtil.toJson(req));
String[] nodes =
cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers();
LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID());
LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes));
for (String node : nodes) {
LOGGER.debug("get cmNode " + node.substring(0, 5));
if (!networkManager.hasAgentConnection(node)) {
LOGGER.warn("cmNode " + node.substring(0, 5) + " is null");
collector.onResult(
"{\"status\":\"Error\",\"result\":\"node offline\","
+ "\"nodeID\":\""
+ node
+ "\","
+ "\"action\":\"onExecuteContractTrustfully\"}");
} else if (masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID)
!= RecoverFlag.Fine) {
collector.onResult(
"{\"status\":\"Error\",\"result\":\"node recovering\","
+ "\"nodeID\":\""
+ node
+ "\","
+ "\"action\":\"onExecuteContractTrustfully\"}");
networkManager.sendToAgent(node, sendStr);
} else {
LOGGER.info("send request to cmNode " + node.substring(0, 5));
networkManager.sendToAgent(node, sendStr);
}
}
}
public boolean checkCurNodeNumValid() {
String[] nodes =
cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers();
int validNode = 0;
Map<String, String> mapResult = new HashMap<>();
for (String node : nodes) {
mapResult.put(node.substring(0, 5), String.format("%s %s", networkManager.hasAgentConnection(node) + "",
masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID)));
if (networkManager.hasAgentConnection(node)
&& masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID)
== RecoverFlag.Fine) {
validNode++;
}
}
LOGGER.info(JsonUtil.toPrettyJson(mapResult));
int c = resultCount;
if (ContractExecType.RequestAllResponseAll.equals(type)) {
c = (int) Math.ceil((double) c / 2);
}
LOGGER.debug("c=" + c + " validNode=" + validNode);
return validNode >= c;
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
LOGGER.debug(JsonUtil.toJson(req));
MultiContractMeta meta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(req.getContractID());
if (meta == null || !meta.isMaster()) {
cmActions.getManager().executeContractOnOtherNodes(req, rc);
return;
}
req.setContractID(cmActions.getManager().getContractIDByName(req.getContractID()));
// 三个相同requestID进来的时候会有冲突
// 仅在此处有冲突么
// 这里是从MasterServer->MasterClient请求的是"executeContractLocally"
// 如果是多点合约的请求A1A2A3的序号应该一致不能分配一个新的seq根据requestID判断是否不需要重新分配一个序号
//TODO seqMap memory leak
//TODO
//TODO
if (null != requestID && requestID.endsWith("_mul")) {
synchronized (lock) {
if (seqMap.containsKey(requestID)) {
req.seq = seqMap.get(requestID).seq;
} else {
req.seq = request_index.getAndIncrement();
seqMap.put(requestID, new MultiReqSeq(req.seq));
}
}
} else {
req.seq = request_index.getAndIncrement();
}
req.needSeq = true;
String id =
System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq;
LOGGER.info("execute receive requestID=" + requestID + " msgID=" + id);
if (checkCurNodeNumValid()) {
LOGGER.debug("checkCurNodeNumValid=true");
ResultCallback collector =
createResultCallback(id, rc, resultCount, req.seq, req.getContractID());
masterServerTCPAction.getSync().sleep(id, collector);
LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq);
sendRequest(id, req, collector);
} else {
LOGGER.debug("invalidNodeNumOnResult");
request_index.getAndDecrement();
ContractResult finalResult =
new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("node number unavailable, request refused."));
rc.onResult(JsonUtil.toJson(finalResult));
}
// }
/* // 三个相同requestID进来的时候会有冲突
// 仅在此处有冲突么
// 这里是从MasterServer->MasterClient请求的是"executeContractLocally"
req.seq = request_index.getAndIncrement();
req.needSeq = true;
ResultCallback collector = createResultCallback(id, rc, resultCount, req.getContractID());
MasterServerTCPAction.sync.sleep(id, collector);
sendRequest(id, req, collector);*/
}
// 清理缓存的多点合约请求序号
public void clearCache() {
final long time = System.currentTimeMillis() - 30000L;
seqMap.entrySet()
.removeIf(
entry -> {
MultiReqSeq cache = entry.getValue();
if (null == cache) {
return true;
}
return cache.startTime < time;
});
}
public static class ResultMerger extends ResultCallback implements NotifiableResultMerger {
ComponedContractResult componedContractResult;
AtomicInteger order;
String contractID;
int count;
int request_seq;
ResultCallback originalCallback;
Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点
ResultMerger(
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID) {
originalCallback = originalCb;
this.count = count;
this.request_seq = request_seq;
this.contractID = contractID;
componedContractResult = new ComponedContractResult(count);
order = new AtomicInteger(0);
}
public String getContractID() {
return contractID;
}
public String getInfo() {
return "contractID="
+ contractID
+ " 收到第 "
+ order
+ " 个节点回复 : "
+ " order="
+ order
+ " count="
+ count
+ " ";
}
@Override
public void onResult(String str) {
// TODO 必须在这里聚合
// str的data是个ContractResult
// 在这儿也是返回个ContractResult
try {
LOGGER.debug("a result of contract" + contractID + ": " + str);
JsonObject obj = JsonParser.parseString(str).getAsJsonObject();
if (obj.has("nodeID")) {
String id = obj.get("nodeID").getAsString();
if (nodeIDs.contains(id)) {
LOGGER.debug(
"ignored result because the result of node "
+ id.substring(0, 5)
+ " has been received");
return;
}
nodeIDs.add(id);
}
LOGGER.debug(
String.format(
"contractID=%s received=%s order=%d count=%d",
contractID, str, order.get(), count));
componedContractResult.add(obj);
// 收集到所有结果
if (order.incrementAndGet() == count) {
ContractResult finalResult = componedContractResult.figureFinalResult();
finalResult.needSeq = true;
finalResult.seq = request_seq;
// if (null == finalResult) {
// finalResult =
// new ContractResult(
// ContractResult.Status.Exception,
// new JsonPrimitive(
// "no nore than half of the
// consistent result"));
// originalCallback.onResult(new
// Gson().toJson(finalResult));
// } else {
originalCallback.onResult(JsonUtil.toJson(finalResult));
// }
LOGGER.debug(
String.format(
"%d results are the same: %s",
finalResult.size, finalResult.result));
// 集群中事务序号+1
cmActions.getManager().multiContractRecorder
.getMultiContractMeta(contractID)
.nextSeqAtMaster();
// recover其中无状态合约CP出错无需恢复
Set<String> nodesID = componedContractResult.getProblemNodes();
if (null == nodesID || nodesID.isEmpty()) {
return;
}
for (String nodeID : nodesID) {
LOGGER.warn("node fails! " + nodeID);
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID)
== RecoverFlag.Fine) {
masterServerRecoverMechAction.getRecoverStatusMap()
.get(nodeID)
.put(contractID, RecoverFlag.ToRecover);
}
}
for (String nodeID : nodesID) {
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID)
== RecoverFlag.ToRecover) {
LOGGER.warn("node in recover " + nodeID);
// 因为该节点结果有误所以即时是stableMode也认为trans记录不可信
// 直接通过load别的节点来恢复
masterServerRecoverMechAction.restartContractFromCommonMode(
nodeID, contractID);
}
}
}
// clearCache();
} catch (Exception e) {
e.printStackTrace();
LOGGER.warn("result exception!");
}
}
}
@Override
public void onRecover(Map<String, Object> args) {
int ceiLastExeSeq = (int) args.get("ceiLastExeSeq");
this.setSeq(ceiLastExeSeq + 1);
}
}

View File

@ -0,0 +1,21 @@
package org.bdware.consistency.plugin.ra;
import org.bdware.sc.bean.ContractExecType;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor;
import java.util.Map;
public class RequestAllResponseAllFactory implements ContractExecutorFactory {
@Override
public String getExecutorName() {
return "RARA";
}
@Override
public ContractExecutor getInstance(Map<String, Object> args) {
int nodeSize = (int) args.get("nodeSize");
String contractID = (String) args.get("contractID");
return new RequestAllExecutor(ContractExecType.RequestAllResponseAll, nodeSize, contractID);
}
}

View File

@ -0,0 +1,21 @@
package org.bdware.consistency.plugin.ra;
import org.bdware.sc.bean.ContractExecType;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor;
import java.util.Map;
public class RequestAllResponseFirstFactory implements ContractExecutorFactory {
@Override
public String getExecutorName() {
return "RARF";
}
@Override
public ContractExecutor getInstance(Map<String, Object> args) {
String contractID = (String) args.get("contractID");
return new RequestAllExecutor(
ContractExecType.RequestAllResponseFirst, 1, contractID);
}
}

View File

@ -0,0 +1,24 @@
package org.bdware.consistency.plugin.ra;
import org.bdware.sc.bean.ContractExecType;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor;
import java.util.Map;
public class RequestAllResponseHalfFactory implements ContractExecutorFactory {
@Override
public String getExecutorName() {
return "RARH";
}
@Override
public ContractExecutor getInstance(Map<String, Object> args) {
int nodeSize = (int) args.get("nodeSize");
String contractID = (String) args.get("contractID");
return new RequestAllExecutor(
ContractExecType.RequestAllResponseHalf,
nodeSize / 2 + 1,
contractID);
}
}

View File

@ -0,0 +1,63 @@
package org.bdware.consistency.plugin.ro;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.common.AbstractContextContractExecutor;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.util.JsonUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class RequestOnceExecutor extends AbstractContextContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(RequestOnceExecutor.class);
String contractID;
AtomicInteger order = new AtomicInteger(0);
public RequestOnceExecutor(String contractID) {
this.contractID = contractID;
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
ResultCallback cb =
new ResultCallback() {
@Override
public void onResult(String str) {
LOGGER.debug(str);
JsonObject jo = JsonParser.parseString(str).getAsJsonObject();
JsonObject result =
JsonParser.parseString(jo.get("data").getAsString())
.getAsJsonObject();
for (String key : result.keySet()) jo.add(key, result.get(key));
jo.remove("action");
jo.addProperty("action", "onExecuteResult");
LOGGER.debug(jo.toString());
rc.onResult(jo.toString());
}
};
masterServerTCPAction.getSync().sleep(requestID, cb);
String[] members = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers();
for (int i = 0; i < members.length; i++) {
LOGGER.info("[members]:" + members.length);
int size = members.length;
String nodeID = members[order.incrementAndGet() % size];
//ADD Connect
Map<String, Object> obj = new HashMap<>();
obj.put("action", "executeContractLocally");
obj.put("requestID", requestID);
obj.put("data", req);
obj.put("uniReqID", requestID);
networkManager.sendToAgent(nodeID, JsonUtil.toJson(obj));
return;
}
rc.onResult(
"{\"status\":\"Error\",\"result\":\"all nodes "
+ " offline\",\"action\":\"onExecuteContract\"}");
}
}

View File

@ -0,0 +1,19 @@
package org.bdware.consistency.plugin.ro;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor;
import java.util.Map;
public class RequestOnceExecutorFactory implements ContractExecutorFactory {
@Override
public String getExecutorName() {
return "RequestOnce";
}
@Override
public ContractExecutor getInstance(Map<String, Object> args) {
String contractID = (String) args.get("contractID");
return new RequestOnceExecutor(contractID);
}
}

View File

@ -0,0 +1,80 @@
package org.bdware.consistency.plugin.ro;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.common.AbstractContextContractExecutor;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.util.JsonUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class ResponseOnceExecutor extends AbstractContextContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(ResponseOnceExecutor.class);
private final String contractID;
AtomicInteger order = new AtomicInteger(0);
public ResponseOnceExecutor(String contractID) {
this.contractID = contractID;
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
executeInternal(requestID, rc, req, 2);
}
private void executeInternal(
String requestID, ResultCallback rc, ContractRequest req, int count) {
// String contractID = req.getContractID();
// TODO 标注失效节点是否选择重新迁移
ResultCallback cb =
new ResultCallback() {
@Override
public void onResult(String str) {
LOGGER.debug(str);
JsonObject jo = JsonParser.parseString(str).getAsJsonObject();
jo.remove("action");
jo.addProperty("action", "onExecuteResult");
LOGGER.debug(jo.toString());
if (jo.has("data")) {
String data = jo.get("data").getAsString();
ContractResult cr = JsonUtil.fromJson(data, ContractResult.class);
if (cr.status != ContractResult.Status.Success && count > 0) {
executeInternal(requestID, rc, req, count - 1);
} else rc.onResult(jo.toString());
} else {
JsonObject jo2 = new JsonObject();
jo2.addProperty("action", "onExecuteResult");
jo.remove("action");
jo2.addProperty("data", jo.toString());
rc.onResult(jo2.toString());
}
}
};
masterServerTCPAction.getSync().sleepWithTimeout(requestID, cb, 5);
if (!sendOnce(requestID, req))
rc.onResult(
"{\"status\":\"Error\",\"data\":\"{\\\"status\\\":\\\"Error\\\",\\\"result\\\":\\\"all nodes offline\\\"}\",\"action\":\"onExecuteContract\"}");
}
private boolean sendOnce(String requestID, ContractRequest req) {
String[] members = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers();
for (int i = 0; i < members.length; i++) {
int size = members.length;
String nodeID = members[order.incrementAndGet() % size];
Map<String, Object> obj = new HashMap<>();
obj.put("action", "executeContractLocally");
obj.put("data", req);
obj.put("uniReqID", requestID);
networkManager.sendToAgent(nodeID, JsonUtil.toJson(obj));
return true;
}
return false;
}
}

View File

@ -0,0 +1,19 @@
package org.bdware.consistency.plugin.ro;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor;
import java.util.Map;
public class ResponseOnceExecutorFactory implements ContractExecutorFactory {
@Override
public String getExecutorName() {
return "ResponseOnce";
}
@Override
public ContractExecutor getInstance(Map<String, Object> args) {
String contractID = (String) args.get("contractID");
return new ResponseOnceExecutor(contractID);
}
}

View File

@ -0,0 +1,422 @@
package org.bdware.consistency.plugin.sharding;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.common.AbstractContextContractExecutor;
import org.bdware.sc.ComponedContractResult;
import org.bdware.sc.ContractClient;
import org.bdware.sc.ContractMeta;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.*;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.RecoverFlag;
import org.bdware.sc.units.RequestCache;
import org.bdware.sc.units.ResultCache;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.trustedmodel.MultiReqSeq;
import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
// 改为MultiPointCooperationExecutor
public class MultiPointCooperationExecutor extends AbstractContextContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(MultiPointCooperationExecutor.class);
final Object lock = new Object();
int resultCount;
AtomicInteger request_index = new AtomicInteger(0);
ContractExecType type;
// key为requestIDvalue为其seq
Map<String, MultiReqSeq> seqMap = new ConcurrentHashMap<>();
Map<String, ResultCache> resultCache = new ConcurrentHashMap<>();
// MultiPointContractInfo info;
MultiContractMeta multiMeta;
String contractID;
public MultiPointCooperationExecutor(ContractExecType t, int c, String con_id) {
LOGGER.info("-- sharding executor---");
type = t;
resultCount = c;
contractID = con_id;
multiMeta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID);
}
public void setSeq(int seq) {
request_index = new AtomicInteger(seq);
}
public ResultCallback createResultCallback(
final String requestID,
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID, JoinInfo joinInfo) {
// TODO 加对应的超时
return networkManager.createResultCallback(
requestID,
new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID, joinInfo),
count); // 把count改成了1设置成获得1个响应就行
}
public void sendRequest(String id, ContractRequest req, String[] nodes, ResultCallback rc) {
req.needSeq = false;
JsonObject jo = new JsonObject();
jo.addProperty("uniReqID", id);
jo.add("data", JsonUtil.parseObject(req));
jo.addProperty("action", "executeContractLocally");
String sendStr = jo.toString();
// master负责缓存请求
if (!masterServerTCPAction.getReqCache().containsKey(contractID)) {
masterServerTCPAction.getReqCache().put(contractID, new RequestCache());
}
// TODO 多调多统一个seq的有多个请求这个需要改
masterServerTCPAction.getReqCache().get(contractID).put(req.seq, sendStr);
LOGGER.debug(JsonUtil.toJson(req));
LOGGER.info("node size = " + nodes.length);
LOGGER.debug("nodes:" + JsonUtil.toJson(nodes));
for (String node : nodes) {
if (node.equals(globalConf.getNodeID())) {
masterClientTCPAction.asyncExecuteContractLocally(jo, rc);
} else {
LOGGER.info(
"[sendRequests] get cmNode "
+ node.substring(0, 5)
+ " not null "
+ "RequestAllExecutor 发送请求给 "
+ node.substring(0, 5));
networkManager.sendToAgent(node, sendStr);
}
}
}
private String[] getAccordingToRouteInfo(RouteInfo routeInfo, ContractRequest req, String[] members) {
try {
int val;
switch (routeInfo.useDefault) {
case byRequester:
val =
new BigInteger(req.getRequester(), 16)
.mod(new BigInteger("" + members.length))
.intValue();
while (val < 0) {
val = val + members.length;
}
return new String[]{members[val]};
case byArgHash:
val = req.getArg().hashCode();
val = val % members.length;
while (val < 0) {
val += members.length;
}
return new String[]{members[val]};
case byTarget:
JsonObject jo = req.getArg().getAsJsonObject();
val =
new BigInteger(jo.get("target").getAsString(), 16)
.mod(new BigInteger("" + members.length))
.intValue();
while (val < 0) {
val = val + members.length;
}
return new String[]{members[val]};
case byFunc:
ContractClient client = cmActions.getManager().getClient(req.getContractID());
JsonArray membersArr = new JsonArray(members.length);
for (String member : members) {
membersArr.add(member);
}
JsonObject arg = new JsonObject();
arg.addProperty("funcName", routeInfo.funcName);
// func myFunc (currentNode, members, membersCount, sourceArg)
JsonArray funcArgs = new JsonArray();
funcArgs.add(globalConf.getNodeID());
funcArgs.add(membersArr);
funcArgs.add(membersArr.size());
funcArgs.add(req.getArg());
arg.add("funcArgs", funcArgs);
String routeResultStr = client.executeMethod("", "invokeFunctionWithoutLimit", arg.toString());
JsonObject routeResult = JsonUtil.parseString(routeResultStr).getAsJsonObject();
List<String> nodes = new ArrayList<>();
for (String key: routeResult.keySet()) {
nodes.add(routeResult.get(key).getAsString());
}
return nodes.toArray(new String[]{});
default:
return members;
}
} catch (Exception e) {
return members;
}
}
public boolean checkCurNodeNumValid() {
LOGGER.info("checkCurNodeNumValid");
String[] nodes = multiMeta.getMembers();
// List<String> nodes = info.members;
int validNode = 0;
for (String node : nodes) {
if (networkManager.hasAgentConnection(node)) {
//&& masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID)
// == RecoverFlag.Fine
validNode++;
}
}
int c = resultCount;
if (type == ContractExecType.Sharding) c = (int) Math.ceil((double) c / 2);
LOGGER.info("c=" + c + " validNode=" + validNode);
return validNode >= c;
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
LOGGER.info("[MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req));
// 获得action 函数名
LOGGER.info("action is : " + req.getAction());
req.setContractID(cmActions.getManager().getContractIDByName(req.getContractID()));
if (requestID != null && requestID.endsWith("_mul")) {
synchronized (lock) {
if (seqMap.containsKey(requestID)) {
req.seq = seqMap.get(requestID).seq;
} else {
req.seq = request_index.getAndIncrement();
seqMap.put(requestID, new MultiReqSeq(req.seq));
}
}
} else {
req.seq = request_index.getAndIncrement();
}
req.needSeq = true;
String id =
System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq;
LOGGER.info("execute receive requestID= " + requestID + " msgID=" + id);
if (checkCurNodeNumValid()) { // 校验成功 current node num 合法
LOGGER.info("checkCurNodeNumValid true");
ContractMeta meta =
cmActions.getManager().statusRecorder.getContractMeta(req.getContractID());
FunctionDesp fun = meta.getExportedFunction(req.getAction());
ResultCallback collector;
// TODO @fanbo 下面的count 1要改应该是根据route的规则来
//Count 根据join规则来
//nodes 根据route规则来
JoinInfo joinInfo = fun.joinInfo;
RouteInfo routeInfo = fun.routeInfo;
int count = getJoinCount(joinInfo, contractID);
LOGGER.info("requestID=" + requestID + " join Count: " + count);
String[] members = multiMeta.getMembers();
String[] nodes = getAccordingToRouteInfo(routeInfo, req, members);
if (nodes.length < count) {
count = nodes.length;
}
collector =
createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器
masterServerTCPAction.getSync().sleep(id, collector);
LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq);
sendRequest(id, req, nodes, collector); // 发送请求
} else {
LOGGER.info("invalidNodeNumOnResult");
request_index.getAndDecrement();
ContractResult finalResult =
new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("node number unavailbale,request refused."));
rc.onResult(JsonUtil.toJson(finalResult));
}
}
private int getJoinCount(JoinInfo joinInfo, String contractID) {
if (joinInfo == null) return resultCount;
if (joinInfo != null) return joinInfo.joinCount;
try {
ContractRequest cr = new ContractRequest();
cr.setContractID(contractID);
cr.setAction("TODO");
//TODO Arg需要好好设计一下
//TODO 又好用又简单的那种设计
//TODO
cr.setArg("");
String result = cmActions.getManager().executeLocally(cr, null);
return JsonUtil.parseString(result).getAsJsonObject().get("result").getAsInt();
} catch (Exception e) {
e.printStackTrace();
return 1;
}
}
// 清理缓存的多点合约请求序号
public void clearCache() {
final long time = System.currentTimeMillis() - 30000L;
seqMap.entrySet()
.removeIf(
entry -> {
MultiReqSeq cache = entry.getValue();
if (null == cache) {
return true;
}
return cache.startTime < time;
});
}
public static class ResultMerger extends ResultCallback {
ComponedContractResult componedContractResult;
AtomicInteger order;
String contractID;
int count; // 记录有多少个节点
int request_seq;
ResultCallback originalCallback;
Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点
JoinInfo joinInfo;
ResultMerger(
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID,
final JoinInfo joinInfo) {
originalCallback = originalCb;
this.count = count;
this.request_seq = request_seq;
this.contractID = contractID;
componedContractResult = new ComponedContractResult(count);
order = new AtomicInteger(0);
this.joinInfo = joinInfo;
}
public String getInfo() {
return "contractID="
+ contractID
+ " 收到第 "
+ order
+ " 个节点回复 : "
+ " order="
+ order
+ " count="
+ count
+ " ";
}
@Override
public void onResult(String str) {
// TODO 必须在这里聚合
// str的data是个ContractResult
// 在这儿也是返回个ContractResult
try {
LOGGER.info(str);
JsonObject obj = JsonParser.parseString(str).getAsJsonObject();
String id = obj.get("nodeID").getAsString();
if (nodeIDs.contains(id)) {
LOGGER.info("已经收到节点 " + id.substring(0, 5) + " 的结果,该结果被忽略");
return;
}
nodeIDs.add(id);
LOGGER.info(
"contractID="
+ contractID
+ " 收到第 "
+ order
+ " 个节点回复 : "
+ str
+ " order="
+ order
+ " count="
+ count);
componedContractResult.add(obj);
// 收集到所有结果
if (order.incrementAndGet() == count) {
ContractResult finalResult = componedContractResult.mergeFinalResult();
finalResult.needSeq = true;
finalResult.seq = request_seq;
// if (null == finalResult) {
// finalResult =
// new ContractResult(
// ContractResult.Status.Exception,
// new JsonPrimitive(
// "no nore than half of the
// consistent result"));
// originalCallback.onResult(new
// Gson().toJson(finalResult));
// } else {
if (joinInfo != null) {
handleJoinInfo(finalResult, joinInfo);
}
originalCallback.onResult(JsonUtil.toJson(finalResult));
// }
LOGGER.info(
"本次执行最终结果为 " + finalResult.size + "个节点合并的,结果为 " + finalResult.result);
// 集群中事务序号+1
// MasterServerTCPAction.contractID2Members.get(contractID).nextSeq();
cmActions.getManager()
.multiContractRecorder
.getMultiContractMeta(contractID)
.nextSeqAtMaster();
// recover其中无状态合约CP出错无需恢复
Set<String> nodesID = componedContractResult.getProblemNodes();
if (null == nodesID || nodesID.isEmpty()) {
return;
}
for (String nodeID : nodesID) {
LOGGER.info("结果出现问题的节点有:" + nodeID);
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID)
== RecoverFlag.Fine) {
masterServerRecoverMechAction.getRecoverStatusMap()
.get(nodeID)
.put(contractID, RecoverFlag.ToRecover);
}
}
for (String nodeID : nodesID) {
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID)
== RecoverFlag.ToRecover) {
LOGGER.info("问题节点开始恢复:" + nodeID);
// 因为该节点结果有误所以即时是stableMode也认为trans记录不可信
// 直接通过load别的节点来恢复
masterServerRecoverMechAction.restartContractFromCommonMode(
nodeID, contractID);
}
}
}
// clearCache();
} catch (Exception e) {
e.printStackTrace();
LOGGER.info("本次执行最终结果为有异常");
}
}
private void handleJoinInfo(ContractResult finalResult, JoinInfo joinInfo) {
JsonObject jo = finalResult.result.getAsJsonObject();
if (joinInfo != null && joinInfo.joinRule != null) {
//TODO 不应该是double 类型
switch (joinInfo.joinRule) {
case "add":
double val = 0;
for (String key : jo.keySet()) {
val += jo.get(key).getAsDouble();
}
finalResult.result = new JsonPrimitive(val);
break;
case "multiply":
val = 1;
for (String key : jo.keySet()) {
val *= jo.get(key).getAsDouble();
}
finalResult.result = new JsonPrimitive(val);
break;
}
}
}
}
}

View File

@ -0,0 +1,21 @@
package org.bdware.consistency.plugin.sharding;
import org.bdware.sc.bean.ContractExecType;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor;
import java.util.Map;
public class MultiPointCooperationExecutorFactory implements ContractExecutorFactory {
@Override
public String getExecutorName() {
return "Sharding";
}
@Override
public ContractExecutor getInstance(Map<String, Object> args) {
int nodeSize = (int) args.get("nodeSize");
String contractID = (String) args.get("contractID");
return new MultiPointCooperationExecutor(ContractExecType.Sharding, nodeSize, contractID);
}
}

View File

@ -0,0 +1,308 @@
package org.bdware.consistency.plugin.sharding;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.common.AbstractContextContractExecutor;
import org.bdware.sc.ContractClient;
import org.bdware.sc.ContractManager;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.bean.FunctionDesp;
import org.bdware.sc.bean.SM2Verifiable;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.RecoverFlag;
import org.bdware.sc.util.HashUtil;
import org.bdware.sc.util.JsonUtil;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* @author Kaidong Wu
*/
public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class);
private static final int SUBMIT_LIMIT = 1024;
private final Queue<ContractRequest> reqQueue = new ConcurrentLinkedQueue<>();
private final MultiContractMeta meta;
private final Map<String, Block> toExecuted = new ConcurrentHashMap<>();
private final Set<String> executedBlocks = ConcurrentHashMap.newKeySet();
private final Map<String, Boolean> executedTxs = new ConcurrentHashMap<>();
private final Object flag = new Object();
private final ScheduledFuture<?> future;
private boolean running = true;
private Block b = new Block();
public SelfAdaptiveShardingExecutor(String contractID) {
this.meta =
cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID);
this.future = ContractManager.scheduledThreadPool.scheduleWithFixedDelay(
this::submitBlock,
2,
2,
TimeUnit.SECONDS);
LOGGER.debug(String.format("ContractManager.threadPool=%d/%d",
((ThreadPoolExecutor) ContractManager.threadPool).getActiveCount(),
((ThreadPoolExecutor) ContractManager.threadPool).getPoolSize()));
ContractManager.threadPool.execute(() -> {
LOGGER.info(
"[SelfAdaptiveShardingExecutor " + meta.getContractID() + "] starting service..." + running);
while (running) {
LOGGER.info("checking blocks to be executed, latest block=" +
this.b.prevHash + ", to be executed size=" + toExecuted.size());
LOGGER.debug("executed: " + JsonUtil.toJson(executedBlocks) + "\n\t" + JsonUtil.toJson(executedTxs));
while (!toExecuted.isEmpty()) {
String key = this.b.prevHash;
Block block = toExecuted.get(key);
if (null != block) {
executeBlock(block);
}
toExecuted.remove(key);
}
synchronized (flag) {
try {
flag.wait();
} catch (InterruptedException e) {
LOGGER.warn(String.format(
"[SelfAdaptiveShardingExecutor %s] waiting is interrupted: %s",
meta.getContractID(),
e.getMessage()));
}
}
}
});
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) {
// check client
ContractClient client = cmActions.getManager().getClient(meta.getContractID());
if (null == client) {
LOGGER.error("contract " + meta.getContractID() + " not found!");
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("contract " + meta.getContractID() + " not found!"))));
return;
}
// check function
FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction());
if (null == funDesp) {
LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!");
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive(
String.format("action %s of contract %s not found!",
req.getAction(),
meta.getContractID())))));
return;
}
// for view function, execute it
if (funDesp.isView) {
cmActions.getManager().executeLocallyAsync(req, rcb, hcb);
return;
}
// normal function, check if it is in blocks
if (executedTxs.containsKey(requestID)) {
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("this request has been packed!"))));
return;
}
// add blocks into request cache
LOGGER.debug("receive contract request " + requestID);
executedTxs.put(requestID, false);
reqQueue.add(req);
rcb.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Executing,
new JsonPrimitive("this request is adding into blocks"))));
// if cache is full, submit
if (reqQueue.size() >= SUBMIT_LIMIT) {
ContractManager.threadPool.execute(this::submitBlock);
}
}
@Override
public void close() {
// stop threads
this.future.cancel(true);
this.running = false;
LOGGER.info("destruct executor of contract " + meta.getContractID());
}
public void execute(String blockStr) {
Block block = JsonUtil.fromJson(blockStr, Block.class);
// the block must have not been cached or executed, and must be valid
if (!toExecuted.containsKey(block.prevHash) &&
!executedBlocks.contains(block.hash) &&
block.isValid()) {
// add block into block cache
LOGGER.info(String.format(
"[SelfAdaptiveShardingExecutor %s] receive block %s -> %s," +
" %d transactions, timestamp=%d, size=%d",
meta.getContractID(),
block.hash,
block.prevHash,
block.requests.length,
block.timestamp,
blockStr.length()));
toExecuted.put(block.prevHash, block);
// notify thread to execute blocks
synchronized (flag) {
flag.notify();
}
}
}
private synchronized void executeBlock(Block block) {
// used for the thread to execute blocks
LOGGER.debug("start");
// check contract requests, requests must have not been executed
for (ContractRequest request : block.requests) {
if (executedTxs.containsKey(request.getRequestID()) && executedTxs.get(request.getRequestID())) {
LOGGER.debug("find request " + request.getRequestID() + " has been executed!");
return;
}
}
// TODO check status
// executed requests
for (ContractRequest request : block.requests) {
String ret = cmActions.getManager().executeLocally(request, null);
LOGGER.debug(String.format(
"[SelfAdaptiveShardingExecutor %s] result of request %s: %s",
meta.getContractID(),
request.getRequestID(),
ret));
executedTxs.put(request.getRequestID(), true);
}
LOGGER.info(String.format(
"[SelfAdaptiveShardingExecutor %s] execute %d transactions of block %s",
meta.getContractID(),
block.requests.length,
block.hash));
// TODO create check point
this.b = new Block(block.hash, this.b.height + 1);
executedBlocks.add(block.hash);
}
private void submitBlock() {
Block block = fillBlock();
if (null != block) {
LOGGER.info("deliver block " + block.hash + "...");
LOGGER.debug(JsonUtil.toPrettyJson(block));
String[] nodes = this.meta.getMembers();
JsonObject req = new JsonObject();
req.addProperty("action", "deliverBlock");
req.addProperty("data", JsonUtil.toJson(block));
req.addProperty("contractID", this.meta.getContractID());
String reqStr = req.toString();
// deliver blocks
for (String node : nodes) {
if (masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(this.meta.getContractID())
== RecoverFlag.Fine) {
networkManager.sendToAgent(node, reqStr);
}
}
}
}
private synchronized Block fillBlock() {
// pack contract requests into a block
ContractRequest[] requests = new ContractRequest[Math.min(reqQueue.size(), SUBMIT_LIMIT)];
if (requests.length == 0) {
return null;
}
for (int i = 0; i < requests.length; ++i) {
requests[i] = reqQueue.poll();
}
this.b.fillBlock(requests);
return this.b;
}
static class Block extends SM2Verifiable {
String prevHash = "0";
String hash;
int height;
String checkPoint;
String body;
String nodePubKey;
ContractRequest[] requests;
long timestamp;
public Block() {
this.height = 0;
}
public Block(String prev, int height) {
this.prevHash = prev;
this.height = height;
}
public void fillBlock(ContractRequest[] requests) {
this.requests = requests;
this.timestamp = System.currentTimeMillis();
this.body = merkle(requests);
this.hash = computeHash();
doSignature(cmActions.getManager().nodeCenterConn.getNodeKeyPair());
}
public boolean isValid() {
return computeHash().equals(hash) && body.equals(merkle(this.requests)) && verifySignature();
}
private String computeHash() {
return HashUtil.sha3(
String.valueOf(this.height),
this.prevHash,
this.checkPoint,
this.body);
}
private String merkle(ContractRequest[] requests) {
// manage requests as a merkle tree
if (requests.length == 0) {
return null;
}
if (requests.length == 1) {
return HashUtil.sha3(requests[0].getRequestID());
}
Queue<String> reqQueue =
Arrays.stream(requests).map(ContractRequest::getRequestID)
.collect(Collectors.toCollection(ArrayDeque::new));
do {
int size;
for (size = reqQueue.size(); size > 1; size -= 2) {
reqQueue.add(HashUtil.sha3(reqQueue.poll(), reqQueue.poll()));
}
if (size == 1) {
reqQueue.add(reqQueue.poll());
}
} while (1 != reqQueue.size());
return reqQueue.poll();
}
@Override
public String getPublicKey() {
return nodePubKey;
}
@Override
public void setPublicKey(String pubkey) {
this.nodePubKey = pubkey;
}
@Override
public String getContentStr() {
return this.hash;
}
}
@Override
public void onDeliverBlock(String data) {
execute(data);
}
}

View File

@ -0,0 +1,19 @@
package org.bdware.consistency.plugin.sharding;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor;
import java.util.Map;
public class SelfAdaptiveShardingExecutorFactory implements ContractExecutorFactory {
@Override
public String getExecutorName() {
return "SASharding";
}
@Override
public ContractExecutor getInstance(Map<String, Object> args) {
String contractID = (String) args.get("contractID");
return new SelfAdaptiveShardingExecutor(contractID);
}
}

View File

@ -0,0 +1,24 @@
package org.bdware.consistency.plugin.single;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor;
import org.bdware.server.trustedmodel.SingleNodeExecutor;
import java.util.Map;
public class SingleNodeExecutorFactory implements ContractExecutorFactory {
private static final Logger LOGGER = LogManager.getLogger(SingleNodeExecutorFactory.class);
@Override
public String getExecutorName() {
return "Sole";
}
@Override
public ContractExecutor getInstance(Map<String, Object> args) {
LOGGER.info("Sole contract is not supported in multi-point mode");
return SingleNodeExecutor.instance;
}
}

View File

@ -4,9 +4,9 @@ ARG GOPRIVATE=bdware.org/*
ARG GOPROXY=https://goproxy.cn
LABEL maintainer="caihuaqian@internetapi.cn"
LABEL org.bdware.version="1.6.6"
LABEL org.bdware.version="1.7.4"
LABEL org.bdware.version.isproduction="true"
LABEL org.bdware.release-date="2021-12-24"
LABEL org.bdware.release-date="2022-04-19"
COPY ./output /bdcontract
WORKDIR /bdcontract

View File

@ -4,9 +4,9 @@ ARG GOPRIVATE=bdware.org/*
ARG GOPROXY=https://goproxy.cn
LABEL maintainer="caihuaqian@internetapi.cn"
LABEL org.bdware.version="1.6.6"
LABEL org.bdware.version="1.7.4"
LABEL org.bdware.version.isproduction="false"
LABEL org.bdware.release-date="2021-12-24"
LABEL org.bdware.release-date="2022-04-19"
COPY ./output /bdcluster
WORKDIR /bdcluster

View File

@ -15,7 +15,7 @@ elif [ "$2" == "save" ]; then
echo "save to $3"
rm -rf ./output/BDWareProjectDir/public/TFMac
rm -rf ./output/WebContent/bdcontract4baas
docker buildx build --platform linux/arm64/v8 -t bdware/bdcontract:$1 ./ --load
docker buildx build --platform linux/amd64 -t bdware/bdcontract:$1 ./ --load
docker save -o $3/bdcontract-$1.tar bdware/bdcontract:$1
else
echo "create at local"

37
docker/deploy/README.md Normal file
View File

@ -0,0 +1,37 @@
本目录支持两种
# 镜像准备
请提前熟悉docker常用命令主要包括`docker ps``docker image xx``docker kill`等。
1. 下载或导入镜像:
```bash
#下载镜像
docker-compose pull
#或者是导入镜像bdcontract-x.x.x.tar为待导入镜像文件。
#docker load -i bdcontract-x.x.x.tar
```
2. 检查是否有"bdware/bdcontract:latest"镜像。
```bash
docker image ls | grep bdcontract
```
如果没有名为"bdware/bdcontract latest"的镜像,就将其中的最新版本打上。
```bash
docker image tag bdware/bdcontract:x.y.z bdware/bdcontract:latest
```
3.执行`init.sh`,创建一些初始化需要的目录。
# 修改配置
1.复制或链接`cp``./bdcontract`包括lib目录,yjs.jar)
2.修改`./bdcontract/cmvar.json`
也可利用cp目录下的jar包自己生成:
```bash
#本操作需要依赖java 1.8以上环境。
java -cp cp/libs:cp/yjs.jar org.bdware.sc.SM2Helper generateKeyToFile
```
生成的文件请妥善保存。
2. 修改配置文件cmconfig.json如果需要配置文件参数详见配置说明`cmconfig.readme.md`
3. 执行脚本`sh start.sh`

View File

@ -0,0 +1,23 @@
{
"cmi": "_CMI",
"debug": "",
"disableDoRepo": false,
"disableLocalLhs": false,
"doipCertPath": "",
"doipLhsAddress": "",
"doipPort": 21032,
"doipUserHandle": "",
"enableEventPersistence": false,
"enableSsl": "./ssl/chained.pem:./ssl/domain.pem",
"ip": "127.0.0.1",
"isLAN": true,
"overwrite": false,
"servicePort": 21030,
"textFileSuffixes": ".yjs,.json,.txt,.css,.js,.html,.md,.conf,.csv",
"withBdledgerClient": "./runnable/bdledger_mac",
"withBdledgerServer": false,
"consistencyPlugins": "/bdcontract/nosuchlib/custom-plugin.jar",
"startContract": [
],
"datachainConf": "_LEDGERHOST:_LEDGERPORT"
}

View File

@ -0,0 +1,6 @@
{
"_HOSTIP": "127.0.0.1",
"_PREFIX": "macjw.zbfz",
"_LEDGERPORT": "2401",
"_LEDGERHOST": "bdledger"
}

View File

@ -0,0 +1,3 @@
#!/bin/bash
java -cp cp/libs/*:cp/yjs.jar org.bdware.sc.SM2Helper generateKeyToFile
java -cp cp/libs/*:cp/yjs.jar org.bdware.sc.SM2Helper generateCMConfig

View File

@ -0,0 +1,42 @@
node:
id: acb1cc2a7013ec8a7ea7eee0ec6f835851dd8e91
api:
grpc:
addr: :2401
http:
enabled: true
datastore:
source: /data/block
p2p:
identity:
peerId: QmeG9PE8NQ7q8M6xiovVp9u1UjZZWc8Wom1K5MomJgLuWs
privKey: CAASqQkwggSlAgEAAoIBAQDAb0X3U2HjgoUniXH8EMs6L8FHGU63BahbY+/7GjnVFXe0bSuPlpaUC2Twmhulvad/1nSCuF178UYoYQ6Qz1AwBl4IiZmQxhSXdZPEXrN3WnC70rrkMMQxz7sjevyEXm9EIUo4Hepxlvd6PlTHUaYOW4TXSsv5JqoZW4+itxane/tOCCpFM/ppftpz3N5RxO181mczm6cljHcShZXkHSxo0nn3sHSqixqbzv+c0x/c5YP+aue3MJX2IjODGEY1a3x3HwCmwK3IgUSKWyZk480codWeO/5JFLNKzlXzWAu2GG7v2MF1h3Fa4lwEGdCLILY2Ux9d+M3wRvoz7ZWYn0CTAgMBAAECggEALLgvLEDGWNa2FvAL+yXz9HuwkNNCxamc7FAXLuVzVx98CAPuwZmfOJxFQtytXCDs0fqGbPJnVyxEv5F+jwx+eAqiw50mgMxncKM0ScgwMKZl2GAqamkizuiVdrNYB3LirJSH6O6a0vbgYBooHYHN8zw1bMrHCAmCMg5jHM8rhpQTp22VIa4YxlQDUAQdvuN4ilIXxwiyxTpbQobH4hVmwwoeU65sbFSZQ219d/KXPkK1l9c3OGf9KTPWzG3uq2lx2BKrJ3TMZYdiGBDKc89m6TnOidVB5jC5VKT8S4idKUQocMcg8eMYEWtR/0YgS0YC0FBhmpHfiam/kJ0mc+L6YQKBgQD2CBwi8kwBqJISM9kOAu7k2vDTB6eb83YWaSq4aQ/AYKvtfUHLz9Ekx+EjPmtMIfvxvJ86NEuCK64Ag4I943dCEyZYE00xBL/Zwark0LxgIuR8tSkxlPoErUmFdLELqIsUJS4Izu8lkPXE6PSxDZMe2Klq0Nd7wVglgjHxpcotOwKBgQDIOz6NEW54F0amGATtVj7DUN36wSQwE2AX5cyWZeKcvJixKKCdLSPln6QFW7+vI7vR4umUcFFEUKO8HwGojcq4+EjiOn/CppQ1GwD2xNXKFiJ5Sl3wpnqARFNHj3rH8l3kI3V5qt0c0OSzKr3bMGUdKNREUaKlqg20e3YznSZkiQKBgQCEnPh1ib6YQkGB2DqNx9z2tGCMjxqz/7XN/J/PSKn52uGxIAvgDMNBnQ9oTNPO9J51vWLiH5/3qQ2gL0J7k2kLz4CihrzbyCCVAkPYE/8Fnqkj4w4yMIfXD4SKj8yCaTWWBThb/RaAXDNtENgbuyJqxQQElE8h4KRfi17aTq+8UwKBgQCONyYSZBMmUMHpLp4xRFSHvWQsugnN67UQxDMvj2YJFRsOmWCawnkAmwaQl73p02OPi6+DstLFxtDEyPEQmsUl45NAu3QK+O3DWk6w9tUyF86cf2mBh3zypZTQ+uOmKErvww+pPuaVlPkbGHyItjLbJyi87Y6sQ8BANICb4D5ooQKBgQDHC6rQTaGIaOgAQzBzFPdi2foFEXTr9zpp0iVYdTFJr68OhROEkzQetHdk5MYR6SWjf3wQk34GDUqtVXNzoIhUkD6P83QCZKmsrkvYiQH0aXjAmjLpcDvSgUFCxS8NcaHKeDNLHw3SH8zh0zDFE1vlXdYzittDjmnYPGEEJzrUlg==
addresses:
swarm:
- /ip4/0.0.0.0/tcp/2416
- /ip4/0.0.0.0/udp/2416/quic
bootstrap:
- /ip4/127.0.0.1/tcp/2416/p2p/QmeG9PE8NQ7q8M6xiovVp9u1UjZZWc8Wom1K5MomJgLuWs
pubsub:
router: gossipsub
heartbeatInterval: 10000
query:
maxDuration: 3600 # seconds
maxRequestLifetime: 3.0 # seconds
waitReservedTime: 1.0 # seconds
nrw:
numWitness: 0
numBackup: 0
txsPerBlock: 1000
blockGenCycle: 30000
requestTimeout: 16
messageTimeout: 5
log:
format: text
level: INFO

View File

@ -0,0 +1,32 @@
version: "3"
services:
bdledger:
image: bdware/bdledger:dev-210909.874f034d
command: "-c /etc/bdledger/config.yml"
restart: unless-stopped
volumes:
- ./bdledger/config.yml:/etc/bdledger/config.yml:ro
- .//bdledger/data:/data
ports:
- "2416:2416" # P2P
- "2401:2401" # API
bdcontract:
image: bdware/bdcontract:latest
command: "-Dfile.encoding=UTF-8 -Djava.library.path='./dynamicLibrary' -cp ./libs/*:bdagent.jar org.bdware.server.CMHttpServer"
restart: unless-stopped
depends_on:
- bdledger
volumes:
- ./bdcontract/rocksdb:/bdcontract/rocksdb
- ./bdcontract/ContractDB:/bdcontract/ContractDB
- ./bdcontract/ContractManagerDB:/bdcontract/ContractManagerDB
- ./bdcontract/BDWareProjectDir:/bdcontract/BDWareProjectDir
- ./bdcontract/cmconfig.json:/bdcontract/cmconfig.json:ro
- ./bdcontract/cp:/bdcontract/cp
- ./bdcontract/tls:/bdcontract/tls:ro
ports:
- "21030:21030" # Contract manager API
- "21031:21031" # Contract manager master
- "21032:21032" # DOA
- "21033:21033" # Prometheus exporter

12
docker/deploy/init.sh Executable file
View File

@ -0,0 +1,12 @@
#!/bin/sh
set -a
#source ./.env
set +a
mkdir -p \
./bdcontract/rocksdb \
./bdcontract/ContractDB \
./bdcontract/ContractManagerDB \
./bdcontract/ssl \
./bdcontract/BDWareProjectDir \
./bdledger/data

37
docker/deploy/start.sh Executable file
View File

@ -0,0 +1,37 @@
#!/bin/bash
if [ ! -d "./BDWareProjectDir" ]; then
mkdir ./BDWareProjectDir
fi
if [ ! -d "./ContractDB" ]; then
mkdir ./ContractDB
fi
if [ ! -d "./ContractManagerDB" ]; then
mkdir ./ContractManagerDB
fi
if [ ! -d "./log" ]; then
mkdir ./log
fi
if [ ! -d "./manager.key" ]; then
echo "missing manager.key! automatically generate!"
java -cp cp/libs:cp/yjs.jar org.bdware.sc.SM2Helper generateKeyToFile
fi
if [ ! -d "./cp" ]; then
echo "missing cp dir!"
exit 1
fi
export CURDIR=`pwd`/bdcontract
docker run -p 21030-21033:21030-21033 -v $CURDIR/BDWareProjectDir:/bdcontract/BDWareProjectDir \
-v $CURDIR/ContractDB:/bdcontract/ContractDB \
-v $CURDIR/ContractManagerDB:/bdcontract/ContractManagerDB \
-v $CURDIR/cmconfig.json:/bdcontract/cmconfig.json \
-v $CURDIR/cp:/bdcontract/cp: \
-v $CURDIR/keys:/bdcontract/keys: \
-v $CURDIR/log:/bdcontract/log \
-v $CURDIR/manager.key:/bdcontract/manager.key: \
-d bdware/bdcontract:latest

View File

@ -0,0 +1,10 @@
#!/bin/bash
rm -rf build/dockerdist
mkdir build/dockerdist
cp docker/dockerdist/* build/dockerdist/
cp ./agent-backend/cmconfig.json.template build/dockerdist/cmconfig.json
cp -r ../cp-bundle/cp/build/output build/dockerdist/cp
if [ "$2" == "push" ]; then
done;

@ -1 +1 @@
Subproject commit eb69c62092df47be4be4a59cb0b4102a0767de7b
Subproject commit 19a033c69983253129ede0fe0f79bd25e36a39d6

@ -1 +1 @@
Subproject commit 8f8c0f27dfe7a45728a58c04f2e29c7a07cffeb5
Subproject commit a696b5a843a1bd09a84002ba6161a2a07d77e153

@ -1 +1 @@
Subproject commit 2b5b3bf001b4964c0bd989180a5a6fea79bbff6b
Subproject commit 0722b05788780d9088f2da9aa73fd68289a02fe4

@ -1 +1 @@
Subproject commit 2ee97ee70411acb661078576da49c93b48e6784a
Subproject commit 310b0f2913b91f23794926b2671c04160f8dab9d

View File

@ -7,3 +7,5 @@ include 'cm'
include 'front-base'
include 'agent-backend'
include 'router-backend'
include 'consistency-sdk'
include 'custom-plugin'