mirror of
				https://gitee.com/BDWare/custom-plugin
				synced 2025-11-03 22:32:15 +00:00 
			
		
		
		
	update mpcexecutor
This commit is contained in:
		
							parent
							
								
									1356c1b56e
								
							
						
					
					
						commit
						f89ad35d7a
					
				@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 | 
			
		||||
public class MultiPointCooperationExecutor extends AbstractContextContractExecutor {
 | 
			
		||||
    private static final Logger LOGGER = LogManager.getLogger(MultiPointCooperationExecutor.class);
 | 
			
		||||
    final Object lock = new Object();
 | 
			
		||||
    private final ContractMeta meta;
 | 
			
		||||
    int resultCount;
 | 
			
		||||
    AtomicInteger request_index = new AtomicInteger(0);
 | 
			
		||||
    ContractExecType type;
 | 
			
		||||
@ -45,6 +46,7 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
 | 
			
		||||
        resultCount = c;
 | 
			
		||||
        contractID = con_id;
 | 
			
		||||
        multiMeta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID);
 | 
			
		||||
        meta = cmActions.getManager().statusRecorder.getContractMeta(contractID);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void setSeq(int seq) {
 | 
			
		||||
@ -69,14 +71,14 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
 | 
			
		||||
        }
 | 
			
		||||
        // TODO 多调多统一个seq的有多个请求,这个需要改
 | 
			
		||||
        masterServerTCPAction.getReqCache().get(contractID).put(req.seq, sendStr);
 | 
			
		||||
        LOGGER.debug(JsonUtil.toJson(req));
 | 
			
		||||
        LOGGER.info("node size = " + nodes.length);
 | 
			
		||||
        LOGGER.debug("nodes:" + JsonUtil.toJson(nodes));
 | 
			
		||||
        // LOGGER.debug(JsonUtil.toJson(req));
 | 
			
		||||
        // LOGGER.info("node size = " + nodes.length);
 | 
			
		||||
        // LOGGER.debug("nodes:" + JsonUtil.toJson(nodes));
 | 
			
		||||
        for (String node : nodes) {
 | 
			
		||||
            if (node.equals(globalConf.getNodeID())) {
 | 
			
		||||
                masterClientTCPAction.asyncExecuteContractLocally(jo, rc);
 | 
			
		||||
            } else {
 | 
			
		||||
                LOGGER.info("[sendRequests] get cmNode " + node.substring(0, 5) + " not null " + "RequestAllExecutor  发送请求给 " + node.substring(0, 5));
 | 
			
		||||
                // LOGGER.info("[sendRequests] get cmNode " + node.substring(0, 5) + " not null " + "RequestAllExecutor  发送请求给 " + node.substring(0, 5));
 | 
			
		||||
                networkManager.sendToAgent(node, sendStr);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
@ -86,7 +88,6 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
 | 
			
		||||
        ContractClient client = cmActions.getManager().getClient(contractID);
 | 
			
		||||
        JsonObject arg = new JsonObject();
 | 
			
		||||
        arg.addProperty("funcName", funcName);
 | 
			
		||||
 | 
			
		||||
        JsonArray funcArgs = new JsonArray();
 | 
			
		||||
        if (args != null && args.length > 0) for (JsonElement je : args)
 | 
			
		||||
            funcArgs.add(je);
 | 
			
		||||
@ -101,39 +102,82 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
 | 
			
		||||
            int val;
 | 
			
		||||
            if (routeInfo.useDefault == null) {
 | 
			
		||||
                // func myFunc (requester, sourceArg)
 | 
			
		||||
                return invokeFunctionWithoutLimit(req.getContractID(), routeInfo.funcName, String[].class, new JsonPrimitive(req.getRequester()), req.getArg());
 | 
			
		||||
                JsonElement requester = req.getRequester() == null ? JsonNull.INSTANCE : new JsonPrimitive(req.getRequester());
 | 
			
		||||
                return invokeFunctionWithoutLimit(req.getContractID(), routeInfo.funcName, String[].class, requester, req.getArg());
 | 
			
		||||
            }
 | 
			
		||||
            switch (routeInfo.useDefault) {
 | 
			
		||||
                case byRequester:
 | 
			
		||||
                    val = new BigInteger(req.getRequester(), 16).mod(new BigInteger("" + members.length)).intValue();
 | 
			
		||||
                    while (val < 0) {
 | 
			
		||||
                    while (val < 0 && members.length > 0) {
 | 
			
		||||
                        val = val + members.length;
 | 
			
		||||
                    }
 | 
			
		||||
                    return new String[]{members[val]};
 | 
			
		||||
                case byArgHash:
 | 
			
		||||
                    val = req.getArg().hashCode();
 | 
			
		||||
                    val = val % members.length;
 | 
			
		||||
                    while (val < 0) {
 | 
			
		||||
                    while (val < 0 && members.length > 0) {
 | 
			
		||||
                        val += members.length;
 | 
			
		||||
                    }
 | 
			
		||||
                    return new String[]{members[val]};
 | 
			
		||||
                case byTarget:
 | 
			
		||||
                    JsonObject jo = req.getArg().getAsJsonObject();
 | 
			
		||||
                    val = new BigInteger(jo.get("target").getAsString(), 16).mod(new BigInteger("" + members.length)).intValue();
 | 
			
		||||
                    while (val < 0) {
 | 
			
		||||
                        val = val + members.length;
 | 
			
		||||
                case byJsonPropHash:
 | 
			
		||||
                    JsonElement jo = tryLoadJsonProp(req, routeInfo.param);
 | 
			
		||||
                    val = jo.toString().hashCode() % members.length;
 | 
			
		||||
                    while (val < 0 && members.length > 0) {
 | 
			
		||||
                        val += members.length;
 | 
			
		||||
                    }
 | 
			
		||||
                    return new String[]{members[val]};
 | 
			
		||||
                case byShardingIDTree:
 | 
			
		||||
                    return byShardingIDTree(members, meta.contract.shardingId, Integer.valueOf(routeInfo.param));
 | 
			
		||||
                default:
 | 
			
		||||
                    return members;
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            e.printStackTrace();
 | 
			
		||||
            return members;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private String[] byShardingIDTree(String[] members, int shardingId, int childCount) {
 | 
			
		||||
        if (childCount > 0) {
 | 
			
		||||
            int from = shardingId * childCount + 1;
 | 
			
		||||
            int to = Math.min(members.length, shardingId * childCount + childCount + 1);
 | 
			
		||||
            if (to > from) {
 | 
			
		||||
                String[] ret = new String[to - from];
 | 
			
		||||
                System.arraycopy(members, from, ret, 0, to - from);
 | 
			
		||||
                return ret;
 | 
			
		||||
            } else
 | 
			
		||||
                return new String[]{};
 | 
			
		||||
        } else {
 | 
			
		||||
            childCount = Math.abs(childCount);
 | 
			
		||||
            if (shardingId > 0) shardingId = (shardingId - 1) / childCount;
 | 
			
		||||
            return new String[]{members[shardingId]};
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private JsonElement tryLoadJsonProp(ContractRequest req, String param) {
 | 
			
		||||
        try {
 | 
			
		||||
            if (req.getArg() == null)
 | 
			
		||||
                return JsonNull.INSTANCE;
 | 
			
		||||
            JsonObject arg;
 | 
			
		||||
            if (req.getArg().isJsonPrimitive()) {
 | 
			
		||||
                arg = JsonUtil.parseString(req.getArg().getAsString()).getAsJsonObject();
 | 
			
		||||
            } else arg = req.getArg().getAsJsonObject();
 | 
			
		||||
            if (!param.contains(".")) {
 | 
			
		||||
                return arg.get(param);
 | 
			
		||||
            } else {
 | 
			
		||||
                String[] props = param.split("\\.");
 | 
			
		||||
                JsonElement result = arg;
 | 
			
		||||
                for (String str : props)
 | 
			
		||||
                    result = result.getAsJsonObject().get(str);
 | 
			
		||||
                return result;
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            e.printStackTrace();
 | 
			
		||||
            return JsonNull.INSTANCE;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public boolean checkCurNodeNumValid() {
 | 
			
		||||
        LOGGER.info("checkCurNodeNumValid");
 | 
			
		||||
        String[] nodes = multiMeta.getMembers();
 | 
			
		||||
        // List<String> nodes = info.members;
 | 
			
		||||
        int validNode = 0;
 | 
			
		||||
@ -146,15 +190,14 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
 | 
			
		||||
        }
 | 
			
		||||
        int c = resultCount;
 | 
			
		||||
        if (type == ContractExecType.Sharding) c = (int) Math.ceil((double) c / 2);
 | 
			
		||||
        LOGGER.info("c=" + c + "  validNode=" + validNode);
 | 
			
		||||
        return validNode >= c;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) {
 | 
			
		||||
        LOGGER.info("[MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req));
 | 
			
		||||
        //LOGGER.info("[MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req));
 | 
			
		||||
        // 获得action 函数名
 | 
			
		||||
        LOGGER.info("action is : " + req.getAction());
 | 
			
		||||
        //   LOGGER.info("action is : " + req.getAction());
 | 
			
		||||
        req.setContractID(cmActions.getManager().getContractIDByName(req.getContractID()));
 | 
			
		||||
        if (requestID != null && requestID.endsWith("_mul")) {
 | 
			
		||||
            synchronized (lock) {
 | 
			
		||||
@ -170,33 +213,33 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
 | 
			
		||||
        }
 | 
			
		||||
        req.needSeq = true;
 | 
			
		||||
        String id = System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq;
 | 
			
		||||
        LOGGER.info("execute receive requestID= " + requestID + "  msgID=" + id);
 | 
			
		||||
        if (checkCurNodeNumValid()) { // 校验成功 current node num 合法
 | 
			
		||||
            LOGGER.info("checkCurNodeNumValid true");
 | 
			
		||||
            ContractMeta meta = cmActions.getManager().statusRecorder.getContractMeta(req.getContractID());
 | 
			
		||||
            FunctionDesp fun = meta.getExportedFunction(req.getAction());
 | 
			
		||||
            ResultCallback collector;
 | 
			
		||||
            //Count 根据join规则来。
 | 
			
		||||
            //nodes 根据route规则来。
 | 
			
		||||
            JoinInfo joinInfo = fun.joinInfo;
 | 
			
		||||
            RouteInfo routeInfo = fun.routeInfo;
 | 
			
		||||
            int count = getJoinCount(joinInfo, contractID, req);
 | 
			
		||||
            LOGGER.info("requestID=" + requestID + " join Count: " + count);
 | 
			
		||||
        //LOGGER.info("execute receive requestID= " + requestID + "  msgID=" + id);
 | 
			
		||||
        // 校验成功 current node num 合法
 | 
			
		||||
        //LOGGER.info("checkCurNodeNumValid true");
 | 
			
		||||
        ContractMeta meta = cmActions.getManager().statusRecorder.getContractMeta(req.getContractID());
 | 
			
		||||
        FunctionDesp fun = meta.getExportedFunction(req.getAction());
 | 
			
		||||
        ResultCallback collector;
 | 
			
		||||
        //Count 根据join规则来。
 | 
			
		||||
        //nodes 根据route规则来。
 | 
			
		||||
        JoinInfo joinInfo = fun.joinInfo;
 | 
			
		||||
        RouteInfo routeInfo = fun.routeInfo;
 | 
			
		||||
        int count = getJoinCount(joinInfo, contractID, req);
 | 
			
		||||
 | 
			
		||||
            String[] members = multiMeta.getMembers();
 | 
			
		||||
            String[] nodes = getAccordingToRouteInfo(routeInfo, req, members);
 | 
			
		||||
            if (nodes.length < count) {
 | 
			
		||||
                count = nodes.length;
 | 
			
		||||
            }
 | 
			
		||||
            collector = createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器
 | 
			
		||||
        String[] members = multiMeta.getMembers();
 | 
			
		||||
        String[] nodes = getAccordingToRouteInfo(routeInfo, req, members);
 | 
			
		||||
        if (nodes.length < count) {
 | 
			
		||||
            count = nodes.length;
 | 
			
		||||
        }
 | 
			
		||||
        //LOGGER.info("requestID=" + requestID + " join Count: " + count + " nodeCount:" + nodes.length);
 | 
			
		||||
        if (count > 0) {
 | 
			
		||||
            collector = createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo);
 | 
			
		||||
            // 初始化结果收集器
 | 
			
		||||
            masterServerTCPAction.getSync().sleep(id, collector);
 | 
			
		||||
            LOGGER.info("requestID=" + requestID + "  master broadcasts request " + req.seq);
 | 
			
		||||
            //  LOGGER.info("requestID=" + requestID + "  master broadcasts request " + req.seq);
 | 
			
		||||
            sendRequest(id, req, nodes, collector); // 发送请求
 | 
			
		||||
        } else {
 | 
			
		||||
            LOGGER.info("invalidNodeNumOnResult");
 | 
			
		||||
            request_index.getAndDecrement();
 | 
			
		||||
            ContractResult finalResult = new ContractResult(ContractResult.Status.Error, new JsonPrimitive("node number unavailbale,request refused."));
 | 
			
		||||
            rc.onResult(JsonUtil.toJson(finalResult));
 | 
			
		||||
            ContractResult cr = new ContractResult(ContractResult.Status.Exception, new JsonPrimitive("broadcast 0"));
 | 
			
		||||
            rc.onResult(JsonUtil.toJson(cr));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -258,15 +301,22 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
 | 
			
		||||
            // str的data是个ContractResult
 | 
			
		||||
            // 在这儿也是返回个ContractResult
 | 
			
		||||
            try {
 | 
			
		||||
                LOGGER.info(str);
 | 
			
		||||
                //LOGGER.info(str);
 | 
			
		||||
                JsonObject obj = JsonParser.parseString(str).getAsJsonObject();
 | 
			
		||||
                String id = obj.get("nodeID").getAsString();
 | 
			
		||||
 | 
			
		||||
                String id;
 | 
			
		||||
                if (obj.get("nodeID") == null) {
 | 
			
		||||
                    id = "null";
 | 
			
		||||
                    //  LOGGER.info("已经收到第" + order + "节点的结果,无节点ID! 该结果被忽略" + order);
 | 
			
		||||
                    return;
 | 
			
		||||
                } else
 | 
			
		||||
                    id = obj.get("nodeID").getAsString();
 | 
			
		||||
                if (nodeIDs.contains(id)) {
 | 
			
		||||
                    LOGGER.info("已经收到节点 " + id.substring(0, 5) + " 的结果,该结果被忽略");
 | 
			
		||||
                    // LOGGER.info("已经收到节点 " + id.substring(0, 5) + " 的结果,该结果被忽略");
 | 
			
		||||
                    return;
 | 
			
		||||
                }
 | 
			
		||||
                nodeIDs.add(id);
 | 
			
		||||
                LOGGER.info("contractID=" + contractID + "  收到第 " + order + " 个节点回复 : " + str + "  order=" + order + "   count=" + count);
 | 
			
		||||
                //LOGGER.info("contractID=" + contractID + "  收到第 " + order + " 个节点回复 : " + str + "  order=" + order + "   count=" + count);
 | 
			
		||||
                componedContractResult.add(obj);
 | 
			
		||||
                // 收集到所有结果
 | 
			
		||||
                if (order.incrementAndGet() == count) {
 | 
			
		||||
@ -289,7 +339,7 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
 | 
			
		||||
                    }
 | 
			
		||||
                    originalCallback.onResult(JsonUtil.toJson(finalResult));
 | 
			
		||||
                    //                    }
 | 
			
		||||
                    LOGGER.info("本次执行最终结果为 " + finalResult.size + "个节点合并的,结果为 " + finalResult.result);
 | 
			
		||||
                    //LOGGER.info("本次执行最终结果为 " + finalResult.size + "个节点合并的,结果为 " + finalResult.result);
 | 
			
		||||
 | 
			
		||||
                    // 集群中事务序号+1
 | 
			
		||||
                    // MasterServerTCPAction.contractID2Members.get(contractID).nextSeq();
 | 
			
		||||
@ -330,8 +380,8 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
 | 
			
		||||
                        if (joinInfo.joinFuncName != null) {
 | 
			
		||||
                            JsonElement ret = MultiPointCooperationExecutor.invokeFunctionWithoutLimit(contractID, joinInfo.joinFuncName, JsonElement.class, jo);
 | 
			
		||||
                            finalResult.result = ret;
 | 
			
		||||
                            return;
 | 
			
		||||
                        }
 | 
			
		||||
                        return;
 | 
			
		||||
                    }
 | 
			
		||||
                    switch (joinInfo.useDefault) {
 | 
			
		||||
                        case add:
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user