From 46d1ebb9bfcd198daa7512b426472e074fe446e7 Mon Sep 17 00:00:00 2001 From: garvey-wong Date: Tue, 3 May 2022 15:01:46 +0800 Subject: [PATCH] feat: sharding executor exec locally --- agent-backend | 2 +- .../api/context/IMasterClientTCPAction.java | 8 +++++ .../consistency/api/context/ISDKContext.java | 2 ++ .../AbstractContextContractExecutor.java | 1 + .../MultiPointCooperationExecutor.java | 32 +++++++++++-------- 5 files changed, 30 insertions(+), 15 deletions(-) create mode 100644 consistency-sdk/src/main/java/org/bdware/sdk/consistency/api/context/IMasterClientTCPAction.java diff --git a/agent-backend b/agent-backend index 5a4fdb8..da1f524 160000 --- a/agent-backend +++ b/agent-backend @@ -1 +1 @@ -Subproject commit 5a4fdb8a13f4eb60d7cf63c040b53be81392ea45 +Subproject commit da1f524a06bfb1a05753145bcd1f765ea4a42e81 diff --git a/consistency-sdk/src/main/java/org/bdware/sdk/consistency/api/context/IMasterClientTCPAction.java b/consistency-sdk/src/main/java/org/bdware/sdk/consistency/api/context/IMasterClientTCPAction.java new file mode 100644 index 0000000..dfb3733 --- /dev/null +++ b/consistency-sdk/src/main/java/org/bdware/sdk/consistency/api/context/IMasterClientTCPAction.java @@ -0,0 +1,8 @@ +package org.bdware.sdk.consistency.api.context; + +import com.google.gson.JsonObject; +import org.bdware.sc.conn.ResultCallback; + +public interface IMasterClientTCPAction { + void asyncExecuteContractLocally(JsonObject jo, ResultCallback rc); +} diff --git a/consistency-sdk/src/main/java/org/bdware/sdk/consistency/api/context/ISDKContext.java b/consistency-sdk/src/main/java/org/bdware/sdk/consistency/api/context/ISDKContext.java index 1b9008f..e49c310 100644 --- a/consistency-sdk/src/main/java/org/bdware/sdk/consistency/api/context/ISDKContext.java +++ b/consistency-sdk/src/main/java/org/bdware/sdk/consistency/api/context/ISDKContext.java @@ -1,6 +1,8 @@ package org.bdware.sdk.consistency.api.context; public interface ISDKContext { + IMasterClientTCPAction getMasterClientTCPAction(); + IMasterServerTCPAction getMasterServerTCPAction(); INetworkManager getNetworkManager(); diff --git a/custom-plugin/src/main/java/org/bdware/consistency/plugin/common/AbstractContextContractExecutor.java b/custom-plugin/src/main/java/org/bdware/consistency/plugin/common/AbstractContextContractExecutor.java index 76f69ea..1785741 100644 --- a/custom-plugin/src/main/java/org/bdware/consistency/plugin/common/AbstractContextContractExecutor.java +++ b/custom-plugin/src/main/java/org/bdware/consistency/plugin/common/AbstractContextContractExecutor.java @@ -8,6 +8,7 @@ public abstract class AbstractContextContractExecutor implements ContractExecuto static protected IGlobalConf globalConf = ConsistencyPluginManager.getContext().getGlobalConf(); static protected ICMActions cmActions = ConsistencyPluginManager.getContext().getCMActions(); static protected INetworkManager networkManager = ConsistencyPluginManager.getContext().getNetworkManager(); + static protected IMasterClientTCPAction masterClientTCPAction = ConsistencyPluginManager.getContext().getMasterClientTCPAction(); static protected IMasterServerTCPAction masterServerTCPAction = ConsistencyPluginManager.getContext().getMasterServerTCPAction(); static protected IMasterServerRecoverMechAction masterServerRecoverMechAction = ConsistencyPluginManager.getContext().getMasterServerRecoverMechAction(); } diff --git a/custom-plugin/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java b/custom-plugin/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java index 42fdee8..f91fc69 100644 --- a/custom-plugin/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java +++ b/custom-plugin/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java @@ -65,13 +65,13 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut count); // 把count改成了1,设置成获得1个响应就行 } - public void sendRequest(String id, ContractRequest req, String[] nodes) { - Map reqStr = new HashMap<>(); - reqStr.put("uniReqID", id); - reqStr.put("data", req); + public void sendRequest(String id, ContractRequest req, String[] nodes, ResultCallback rc) { req.needSeq = false; - reqStr.put("action", "executeContractLocally"); - String sendStr = JsonUtil.toJson(reqStr); + JsonObject jo = new JsonObject(); + jo.addProperty("uniReqID", id); + jo.add("data", JsonUtil.parseObject(req)); + jo.addProperty("action", "executeContractLocally"); + String sendStr = jo.toString(); // master负责缓存请求 if (!masterServerTCPAction.getReqCache().containsKey(contractID)) { masterServerTCPAction.getReqCache().put(contractID, new RequestCache()); @@ -82,13 +82,17 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut LOGGER.info("node size = " + nodes.length); LOGGER.debug("nodes:" + JsonUtil.toJson(nodes)); for (String node : nodes) { - LOGGER.info( - "[sendRequests] get cmNode " - + node.substring(0, 5) - + " not null " - + "RequestAllExecutor 发送请求给 " - + node.substring(0, 5)); - networkManager.sendToAgent(node, sendStr); + if (node.equals(globalConf.getNodeID())) { + masterClientTCPAction.asyncExecuteContractLocally(jo, rc); + } else { + LOGGER.info( + "[sendRequests] get cmNode " + + node.substring(0, 5) + + " not null " + + "RequestAllExecutor 发送请求给 " + + node.substring(0, 5)); + networkManager.sendToAgent(node, sendStr); + } } } @@ -218,7 +222,7 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器 masterServerTCPAction.getSync().sleep(id, collector); LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq); - sendRequest(id, req, nodes); // 发送请求 + sendRequest(id, req, nodes, collector); // 发送请求 } else { LOGGER.info("invalidNodeNumOnResult"); request_index.getAndDecrement();