From 6030bdf2ad0bf3f5771e436440f4317475544127 Mon Sep 17 00:00:00 2001 From: CaiHQ Date: Fri, 2 Sep 2022 21:28:31 +0800 Subject: [PATCH] optimize file distribute --- .../server/action/DistributeCallback.java | 17 ++- .../bdware/server/nodecenter/LogActions.java | 2 +- .../server/nodecenter/MetaIndexAction.java | 12 +- .../server/nodecenter/NCHttpHandler.java | 117 +++++++++++------- .../server/nodecenter/NCManagerAction.java | 27 +++- .../server/nodecenter/NodeCenterActions.java | 66 +++++----- .../nodecenter/NodeCenterWSFrameHandler.java | 6 +- .../bdware/server/nodecenter/UnitActions.java | 4 +- 8 files changed, 154 insertions(+), 97 deletions(-) diff --git a/src/main/java/org/bdware/server/action/DistributeCallback.java b/src/main/java/org/bdware/server/action/DistributeCallback.java index 1ee7812..7cd0833 100644 --- a/src/main/java/org/bdware/server/action/DistributeCallback.java +++ b/src/main/java/org/bdware/server/action/DistributeCallback.java @@ -77,8 +77,9 @@ public class DistributeCallback extends ResultCallback { if (progress.equals("100.00")) index++; Map map = new HashMap<>(); - map.put("action", "onDistributeYPK"); + map.put("action", "onDistribute"); map.put("responseID", distributeID); + map.put("distributeID", distributeID); map.put("content", JsonUtil.toJson(ret2)); map.put("nodeIP", nodeIP); map.put("progress", progress); @@ -86,7 +87,7 @@ public class DistributeCallback extends ResultCallback { } public void onReceive(Map map) { - LOGGER.debug("[DistributeCallback] onReceive : position----9"); + LOGGER.info("[DistributeCallback] onReceive : position----9"); String progress = map.get("progress"); String nodeIP = map.get("ipPort"); if (progress.equals("100")) { @@ -95,15 +96,17 @@ public class DistributeCallback extends ResultCallback { args.put("pubKey", pubKey); args.put("signature", signature); count++; - LOGGER.debug(count + "个节点已收完成" + " 总共有" + nodes.size() + " 个节点"); - if (count == nodes.size()) { + LOGGER.info(count + "个节点已收完成" + " 总共有" + nodes.size() + " 个节点"); + if (count >= nodes.size() - 1) { // res返回给前端,合约分发完成 + LOGGER.info("个节点已收完成" + "send on finish!!!!!"); Map ret2 = new HashMap<>(); ret2.put("action", "onDistributeFinish"); ret2.put("progress", "100%"); Map map_send = new HashMap<>(); map_send.put("action", "onDistribute"); map_send.put("responseID", distributeID); + map_send.put("distributeID", distributeID); map_send.put("over", "true"); map_send.put("nodeIP", nodeIP); map_send.put("content", JsonUtil.toJson(ret2)); @@ -118,7 +121,7 @@ public class DistributeCallback extends ResultCallback { @Override public void onResult(String str) { - LOGGER.debug("[DistributeCallback] str=" + str); + LOGGER.info("[DistributeCallback] str=" + str); Map map = JsonUtil.fromJson(str, new TypeToken>() { }.getType()); @@ -128,15 +131,19 @@ public class DistributeCallback extends ResultCallback { String operation = map.get("operation"); switch (operation) { + //上传到nodecenter case "NCreceive": NCreceive(map.get("progress")); break; + //开始分发给各个节点 case "distribute": distributeContractProject(map.get("receiveFileName"), map.get("isPrivate")); break; + //分发给各个节点 case "onDistribute": onDistribute(map.get("progress"), map.get("nodeIP")); break; + //各个节点返回收到 case "onReceive": onReceive(map); break; diff --git a/src/main/java/org/bdware/server/nodecenter/LogActions.java b/src/main/java/org/bdware/server/nodecenter/LogActions.java index 15ce797..526d5fd 100644 --- a/src/main/java/org/bdware/server/nodecenter/LogActions.java +++ b/src/main/java/org/bdware/server/nodecenter/LogActions.java @@ -130,7 +130,7 @@ public class LogActions { @Action(userPermission = 1 << 9, async = true) public void listNodes(JsonObject json, ResultCallback resultCallback) { long start = System.currentTimeMillis(); - final String pubKey = managerAction.pubKey; + final String pubKey = managerAction.getPubKey(json); LOGGER.debug("[listNodes] managerAction.pubKey " + pubKey); Map nodeinfos = NodeCenterActions.nodeInfos; // 所有在线节点? final Map cinodeinfos = new HashMap<>(); diff --git a/src/main/java/org/bdware/server/nodecenter/MetaIndexAction.java b/src/main/java/org/bdware/server/nodecenter/MetaIndexAction.java index 68f7995..fbb5561 100644 --- a/src/main/java/org/bdware/server/nodecenter/MetaIndexAction.java +++ b/src/main/java/org/bdware/server/nodecenter/MetaIndexAction.java @@ -28,11 +28,7 @@ import org.bdware.server.nodecenter.searchresult.ContractMeta; import org.bdware.server.nodecenter.searchresult.ResultModel; import org.wltea.analyzer.lucene.IKAnalyzer; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.PrintStream; -import java.io.InputStreamReader; +import java.io.*; import java.nio.charset.StandardCharsets; import java.nio.file.Paths; import java.util.ArrayList; @@ -105,9 +101,9 @@ public class MetaIndexAction { // public static IndexWriter indexWriter; LOGGER.warn("getting index failed! " + e.getMessage()); } } - req.addProperty("action", "requestReadMe"); - req.addProperty("contractID", thisDesp.contractID); - rc.onResult(req); + // req.addProperty("action", "requestReadMe"); + // req.addProperty("contractID", thisDesp.contractID); + // rc.onResult(req); LOGGER.info("contract " + thisDesp.contractName + " --> actually to index"); } if (null != indexReader) { diff --git a/src/main/java/org/bdware/server/nodecenter/NCHttpHandler.java b/src/main/java/org/bdware/server/nodecenter/NCHttpHandler.java index c14d4bd..c3dd180 100644 --- a/src/main/java/org/bdware/server/nodecenter/NCHttpHandler.java +++ b/src/main/java/org/bdware/server/nodecenter/NCHttpHandler.java @@ -1,10 +1,8 @@ package org.bdware.server.nodecenter; import com.google.gson.JsonObject; -import com.google.gson.JsonParser; import com.google.gson.reflect.TypeToken; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -22,13 +20,15 @@ import org.bdware.server.NodeCenterServer; import org.bdware.server.action.Action; import org.bdware.server.action.ActionExecutor; import org.bdware.server.action.HttpResultCallback; -import org.bdware.server.http.HttpFileHandleAdapter; -import org.bdware.server.http.URIHandler; -import org.bdware.server.http.URIPath; +import org.bdware.server.http.HttpMethod; +import org.bdware.server.http.*; import org.bdware.server.permission.Role; import org.zz.gmhelper.SM2Util; -import java.io.*; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileFilter; +import java.io.PrintStream; import java.net.URLDecoder; import java.util.HashMap; import java.util.List; @@ -43,9 +43,14 @@ public class NCHttpHandler extends SimpleChannelInboundHandler { private static final String UNSUPPORTED_HTTP_METHOD = "{\"msg\":\"unsupported http method\"}"; private static final String UNSUPPORTED_ACTION = "{\"msg\":\"unsupported action\"}"; private static final Logger LOGGER = LogManager.getLogger(NCHttpHandler.class); + static NCManagerAction managerAction = new NCManagerAction(null); + static LogActions logActions = new LogActions(managerAction); + static UnitActions unitActions = new UnitActions(managerAction); private static final ActionExecutor actionExecutor = new ActionExecutor( - NodeCenterFrameHandler.executorService, new NodeCenterActions(null)) { + NodeCenterFrameHandler.executorService, + new NodeCenterActions(null), + managerAction, logActions, unitActions) { @Override public boolean checkPermission(Action a, JsonObject arg, long per) { @@ -119,42 +124,61 @@ public class NCHttpHandler extends SimpleChannelInboundHandler { ctx.flush(); } - @URIPath({"/NodeCenter", "/SCIDE/SCExecutor", "/SCIDE/CMManager", "/SCIDE/SCManager"}) - public void parseGetAndHandle(ChannelHandlerContext ctx, FullHttpRequest req) { - QueryStringDecoder decoderQuery = new QueryStringDecoder(req.uri()); - Map> params = decoderQuery.parameters(); - JsonObject map = new JsonObject(); - for (String key : params.keySet()) { - List val = params.get(key); - if (val != null) map.addProperty(key, val.get(0)); + public static class VerifiyCallback implements ArgParser.VerifiedCallback { + + @Override + public void onResult(boolean verified, JsonObject transformedParam) { + LOGGER.info("verify signature: " + verified); + if (verified) { + // 查permission + String ret = getRole(transformedParam.get("pubKey").getAsString()); + long permission; + if (ret != null && ret.length() > 0) { + permission = 0x86000d41L | Role.compoundValue(ret.split(",")); + } else { + permission = Role.compoundValue(ret.split(",")); + } + transformedParam.addProperty("permission", permission + ""); + LOGGER.debug("[CMHttpHandler] http 请求查看用户权限为 : " + permission); + } else { + transformedParam.addProperty("permission", 0 + ""); + } } - String uri = URLDecoder.decode(req.uri()).split("\\?")[1]; - int index = uri.lastIndexOf('&'); - String str = uri; - if (index != -1) { - str = uri.substring(0, index); - } - injectPermission(map, str); - handleReq(map, ctx); } - @URIPath( - method = org.bdware.server.http.HttpMethod.POST, + @URIPath(method = HttpMethod.OPTIONS) + public void crossOrigin(ChannelHandlerContext ctx, FullHttpRequest request) { + DefaultFullHttpResponse fullResponse = + new DefaultFullHttpResponse( + request.protocolVersion(), + OK, + Unpooled.wrappedBuffer("success".getBytes())); + fullResponse.headers().remove("Access-Control-Allow-Origin"); + fullResponse.headers().remove("Access-Control-Allow-Headers"); + fullResponse.headers().add("Access-Control-Allow-Origin", "*"); + fullResponse.headers().add("Access-Control-Allow-Methods", "*"); + fullResponse + .headers() + .add("Access-Control-Allow-Headers", + "Content-Type, Cookie, Accept-Encoding, User-Agent, Host, Referer, " + + "X-Requested-With, Accept, Accept-Language, Cache-Control, Connection"); + ChannelFuture f = ctx.write(fullResponse); + f.addListener(ChannelFutureListener.CLOSE); + LOGGER.info("[Option] received!"); + } + + @URIPath({"/NodeCenter", "/SCIDE/SCExecutor", "/SCIDE/CMManager", "/SCIDE/SCManager"}) + public void parseGetAndHandle(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { + JsonObject transformedParam = ArgParser.parseGetAndVerify(req, new VerifiyCallback()); + handleReq(transformedParam, ctx); + } + + @URIPath(method = org.bdware.server.http.HttpMethod.POST, value = {"/NodeCenter", "/SCIDE/SCExecutor", "/SCIDE/CMManager", "/SCIDE/SCManager"}) public void parsePostAndHandle(ChannelHandlerContext ctx, FullHttpRequest req) - throws UnsupportedEncodingException { - ByteBuf content = req.content(); - JsonObject map = - JsonParser.parseReader(new InputStreamReader(new ByteBufInputStream(content))) - .getAsJsonObject(); - // FIXME 感觉不太对劲 - String uri = URLDecoder.decode(req.uri(), "utf-8").split("\\?")[1]; - int index = uri.lastIndexOf('&'); - String str = uri; - if (index != -1) { - str = uri.substring(0, index); - } - injectPermission(map, str); + throws Exception { + JsonObject map = ArgParser.parsePostAndVerify(req, new VerifiyCallback()); + // injectPermission(map, str); handleReq(map, ctx); } @@ -189,6 +213,8 @@ public class NCHttpHandler extends SimpleChannelInboundHandler { HttpVersion.HTTP_1_1, OK, Unpooled.wrappedBuffer(MetaIndexAction.search(map).getBytes())); + response.headers().add("Access-Control-Allow-Origin", "*"); + response.headers().add("Access-Control-Allow-Methods", "*"); ChannelFuture f = ctx.write(response); f.addListener(ChannelFutureListener.CLOSE); @@ -205,7 +231,7 @@ public class NCHttpHandler extends SimpleChannelInboundHandler { } } - public String getRole(String pubKey) { + public static String getRole(String pubKey) { try { String ret = KeyValueDBUtil.instance.getValue( @@ -239,7 +265,6 @@ public class NCHttpHandler extends SimpleChannelInboundHandler { String pubkey = map.get("pubKey").getAsString(); boolean verify = SM2Util.plainStrVerify(pubkey, str, map.get("sign").getAsString()); LOGGER.debug("[CMHttpHandler] HTTP POST 请求验签为 " + verify); - if (verify) { // 查permission String ret = getRole(pubkey); @@ -281,13 +306,15 @@ public class NCHttpHandler extends SimpleChannelInboundHandler { cb = new HttpResultCallback(ctx, map.get("callback").getAsString()); cb.addHeader("charset", "utf-8"); cb.addHeader("Content-type", "text/plain"); - + cb.addHeader("Access-Control-Allow-Origin", "*"); + cb.addHeader("Access-Control-Allow-Methods", "*"); } else { if (map.has("callback")) cb = new HttpResultCallback(ctx, map.get("callback").getAsString()); else cb = new HttpResultCallback(ctx, null); - + cb.addHeader("Access-Control-Allow-Origin", "*"); + cb.addHeader("Access-Control-Allow-Methods", "*"); cb.addHeader("charset", "utf-8"); cb.addHeader("Content-type", "application/json"); } @@ -296,6 +323,8 @@ public class NCHttpHandler extends SimpleChannelInboundHandler { DefaultFullHttpResponse response = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, OK, Unpooled.wrappedBuffer(ret.getBytes())); + response.headers().add("Access-Control-Allow-Origin", "*"); + response.headers().add("Access-Control-Allow-Methods", "*"); ChannelFuture f = ctx.write(response); f.addListener(ChannelFutureListener.CLOSE); } @@ -305,6 +334,8 @@ public class NCHttpHandler extends SimpleChannelInboundHandler { HttpVersion.HTTP_1_1, OK, Unpooled.wrappedBuffer(e.getMessage().getBytes())); + response.headers().add("Access-Control-Allow-Origin", "*"); + response.headers().add("Access-Control-Allow-Methods", "*"); ChannelFuture f = ctx.write(response); f.addListener(ChannelFutureListener.CLOSE); @@ -318,6 +349,8 @@ public class NCHttpHandler extends SimpleChannelInboundHandler { HttpVersion.HTTP_1_1, OK, Unpooled.wrappedBuffer(JsonUtil.toJson(ret).getBytes())); + response.headers().add("Access-Control-Allow-Origin", "*"); + response.headers().add("Access-Control-Allow-Methods", "*"); ChannelFuture f = ctx.write(response); f.addListener(ChannelFutureListener.CLOSE); } diff --git a/src/main/java/org/bdware/server/nodecenter/NCManagerAction.java b/src/main/java/org/bdware/server/nodecenter/NCManagerAction.java index ea4ebea..7d62b06 100644 --- a/src/main/java/org/bdware/server/nodecenter/NCManagerAction.java +++ b/src/main/java/org/bdware/server/nodecenter/NCManagerAction.java @@ -34,13 +34,22 @@ public class NCManagerAction { SM2Util.DOMAIN_PARAMS); private static final Logger LOGGER = LogManager.getLogger(NCManagerAction.class); String sessionID = null; - String pubKey; + private String publicKey; Random random = new Random(); private NodeCenterWSFrameHandler handler; public NCManagerAction(NodeCenterWSFrameHandler nodeCenterWSFrameHandler) { this.handler = nodeCenterWSFrameHandler; - pubKey = null; + publicKey = null; + } + + public String getPubKey(JsonObject jo) { + try { + if (publicKey != null) return publicKey; + return jo.get("pubKey").getAsString(); + } catch (Exception e) { + } + return null; } public static boolean isCenterManager(String pubkey) { @@ -104,6 +113,7 @@ public class NCManagerAction { } public void getRole(JsonObject json, ResultCallback resultCallback) { + String pubKey = getPubKey(json); if (pubKey == null) { simpleReply(resultCallback, "onLogin", Role.Anonymous.name()); return; @@ -156,7 +166,7 @@ public class NCManagerAction { boolean result = SM2Util.plainStrVerify(pubKey, sessionID, signature); LOGGER.debug("session:" + (sessionID)); if (result) { - this.pubKey = pubKey; + this.publicKey = pubKey; LOGGER.debug("设置公钥" + pubKey); getRole(json, resultCallback); } else { @@ -168,6 +178,8 @@ public class NCManagerAction { @Action(userPermission = 0) public void applyRole(JsonObject json, ResultCallback resultCallback) { + String pubKey = getPubKey(json); + long start = System.currentTimeMillis(); if (pubKey == null) { simpleReply(resultCallback, "onApplyRole", "missing pubKey"); @@ -176,7 +188,7 @@ public class NCManagerAction { if (json.has("role")) { Role role = Role.parse(json.get("role").getAsString()); if (role != Role.Anonymous) { - applyAtDB(role, resultCallback, start); + applyAtDB(role, getPubKey(json), resultCallback, start); return; } } @@ -201,7 +213,9 @@ public class NCManagerAction { } } - private void applyAtDB(Role role, ResultCallback resultCallback, long start) { + private void applyAtDB(Role role, String pubKey, ResultCallback resultCallback, long start) { + + String str = KeyValueDBUtil.instance.getValue(NCTables.ApplyRole.toString(), pubKey); String already = KeyValueDBUtil.instance.getValue(NCTables.NodeUser.toString(), pubKey); if (already != null && already.contains(role.toString())) { @@ -241,6 +255,9 @@ public class NCManagerAction { long start = System.currentTimeMillis(); String pubKey = json.get("pubKey").getAsString(); boolean isAccept = json.get("isAccept").getAsBoolean(); + if (json.has("authorizedPubKey")) { + pubKey = json.get("authorizedPubKey").getAsString(); + } LOGGER.debug("[NCManagerAction] " + json.toString()); if (pubKey == null || pubKey.length() == 0) { simpleReply(resultCallback, "onAuthNodeManager", "missing pubKey"); diff --git a/src/main/java/org/bdware/server/nodecenter/NodeCenterActions.java b/src/main/java/org/bdware/server/nodecenter/NodeCenterActions.java index 7ddf19c..b7ba3f9 100644 --- a/src/main/java/org/bdware/server/nodecenter/NodeCenterActions.java +++ b/src/main/java/org/bdware/server/nodecenter/NodeCenterActions.java @@ -41,7 +41,7 @@ public class NodeCenterActions { // TODO 定期清缓存 // public static Map requestCache = // new ConcurrentHashMap<>(); // key is contractID,only for requestAll type contract - TimerTask checkAliveTask; + // TimerTask checkAliveTask; static Timer checkAliveTimer = new Timer(); public NodeCenterFrameHandler controller; String nodeID; // node pubKey @@ -129,9 +129,9 @@ public class NodeCenterActions { @Action public void syncPing(JsonObject json, ResultCallback rc) { // logger.debug("[NodeCenterAction] syncPing"); - checkAliveTask.cancel(); - checkAliveTask = new CheckAgentAliveTimer(nodeID); - checkAliveTimer.schedule(checkAliveTask, 20000L); + //checkAliveTask.cancel(); + //checkAliveTask = new CheckAgentAliveTimer(nodeID); + //checkAliveTimer.schedule(checkAliveTask, 200000L); if (json.has("data")) { String data = json.get("data").getAsString(); LOGGER.debug("[syncPing] data" + data); @@ -229,8 +229,8 @@ public class NodeCenterActions { r.data = "failed"; if (sessionID != null && SM2Util.plainStrVerify(pubkey, pubkey + sessionID, signature)) { nodeID = pubkey; - checkAliveTask = new CheckAgentAliveTimer(nodeID); - checkAliveTimer.schedule(checkAliveTask, 20000L); + //checkAliveTask = new CheckAgentAliveTimer(nodeID); + //checkAliveTimer.schedule(checkAliveTask, 20000L); controller.pubKey = nodeID; @@ -274,30 +274,30 @@ public class NodeCenterActions { // TODO 如果是NC挂了,其他节点重连之后需要另外考虑 // 向该合约发送请求,令其查看自己崩溃前的集群合约 - LOGGER.info("NodeCenter tells nodes new masters for contracts created before the crash!"); - Map request = new HashMap<>(); - request.put("action", "queryUnitContractsID"); - try { - CMNode conn = nodeInfos.get(pubkey); - NodeCenterActions connection = conn.connection; - NodeCenterFrameHandler connController = connection.controller; - connController.sendMsg(JsonUtil.toJson(request)); - } catch (Exception e) { - e.printStackTrace(); - } - Map onNewNodeConnected = new HashMap<>(); - onNewNodeConnected.put("action", "onNewNodeConnected"); - onNewNodeConnected.put("pubKey", pubkey); - for (CMNode node : nodeInfos.values()) { - if (node.pubKey.equals(pubkey)) { - continue; - } - try { - node.connection.controller.sendMsg(JsonUtil.toJson(onNewNodeConnected)); - } catch (Exception e) { - e.printStackTrace(); - } - } +// LOGGER.info("NodeCenter tells nodes new masters for contracts created before the crash!"); +// Map request = new HashMap<>(); +// request.put("action", "queryUnitContractsID"); +// try { +//// CMNode conn = nodeInfos.get(pubkey); +//// NodeCenterActions connection = conn.connection; +//// NodeCenterFrameHandler connController = connection.controller; +//// connController.sendMsg(JsonUtil.toJson(request)); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// Map onNewNodeConnected = new HashMap<>(); +// onNewNodeConnected.put("action", "onNewNodeConnected"); +// onNewNodeConnected.put("pubKey", pubkey); +// for (CMNode node : nodeInfos.values()) { +// if (node.pubKey.equals(pubkey)) { +// continue; +// } +// try { +// // node.connection.controller.sendMsg(JsonUtil.toJson(onNewNodeConnected)); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } rc.onResult(JsonUtil.toJson(r)); } @@ -366,7 +366,7 @@ public class NodeCenterActions { json.get("contracts"), new TypeToken>() { }.getType()); - MetaIndexAction.updateContractsIndex(contracts, rc); + //MetaIndexAction.updateContractsIndex(contracts, rc); LOGGER.debug("update contracts: " + json.get("contracts")); int version = -1; if (json.has("contractVersion")) { @@ -376,6 +376,8 @@ public class NodeCenterActions { if (json.has("events")) { node.events = json.get("events").getAsInt(); } + //just ignore recover + if (nodeInfos != null) return; // 遍历所有节点 for (CMNode cmNode : nodeInfos.values()) { if (cmNode.pubKey.equals(nodeID)) { @@ -1018,7 +1020,7 @@ public class NodeCenterActions { if (project.isFile()) { try { FileInputStream fin = new FileInputStream(project); - byte[] buff = new byte[30 * 1024]; + byte[] buff = new byte[500 * 1024]; long count = 0; long preCount = 0; long total = project.length(); diff --git a/src/main/java/org/bdware/server/nodecenter/NodeCenterWSFrameHandler.java b/src/main/java/org/bdware/server/nodecenter/NodeCenterWSFrameHandler.java index 517715e..84efdea 100644 --- a/src/main/java/org/bdware/server/nodecenter/NodeCenterWSFrameHandler.java +++ b/src/main/java/org/bdware/server/nodecenter/NodeCenterWSFrameHandler.java @@ -47,7 +47,7 @@ public class NodeCenterWSFrameHandler extends SimpleChannelInboundHandler clz = Class.forName(clzName, true, pluginLoader); @@ -81,12 +82,13 @@ public class NodeCenterWSFrameHandler extends SimpleChannelInboundHandler allunits = KeyValueDBUtil.instance.getKeyValues(NCTables.TrustUnitsDB.toString()); if (pubKey != null && KeyValueDBUtil.instance @@ -73,7 +73,7 @@ public class UnitActions { @Action(userPermission = 1 << 6, async = true) public void createTrustUnit(JsonObject json, ResultCallback resultCallback) { - final String pubKey = managerAction.pubKey; + final String pubKey = managerAction.getPubKey(json); LOGGER.debug("[createTrustUnit] " + json.get("data").toString()); KeyValueDBUtil.instance.setValue( NCTables.TrustUnitsDB.toString(),