From 40816e8a4505e66e4465b966269e0d7d50454ad4 Mon Sep 17 00:00:00 2001 From: CaiHQ Date: Thu, 11 Nov 2021 10:31:38 +0800 Subject: [PATCH] prune network manager --- .../java/org/bdware/server/CMHttpServer.java | 7 +- .../server/{ws => action}/EventActions.java | 2 +- .../p2p/MasterClientRecoverMechAction.java | 8 +- .../action/p2p/MasterClientTCPAction.java | 10 +- .../p2p/MasterClientTransferAction.java | 6 +- .../client/NodeCenterClientController.java | 7 +- .../nodecenter/client/WebSocketClient.java | 132 ------------------ ...andler.java => TCPClientFrameHandler.java} | 6 +- .../server/trustedmodel/MasterProxy.java | 10 +- .../java/org/bdware/units/NetworkManager.java | 35 ++--- .../org/bdware/server/PermissionHelper.java | 6 +- 11 files changed, 43 insertions(+), 186 deletions(-) rename src/main/java/org/bdware/server/{ws => action}/EventActions.java (97%) delete mode 100644 src/main/java/org/bdware/server/nodecenter/client/WebSocketClient.java rename src/main/java/org/bdware/server/tcp/{MasterClientFrameHandler.java => TCPClientFrameHandler.java} (96%) diff --git a/src/main/java/org/bdware/server/CMHttpServer.java b/src/main/java/org/bdware/server/CMHttpServer.java index bfff690..9de4c3e 100644 --- a/src/main/java/org/bdware/server/CMHttpServer.java +++ b/src/main/java/org/bdware/server/CMHttpServer.java @@ -31,10 +31,9 @@ import org.bdware.server.action.FileActions; import org.bdware.server.doip.ContractRepositoryMain; import org.bdware.server.http.CMHttpHandler; import org.bdware.server.nodecenter.client.NodeCenterClientHandler; -import org.bdware.server.tcp.MasterClientFrameHandler; +import org.bdware.server.tcp.TCPClientFrameHandler; import org.bdware.server.ws.ContractManagerFrameHandler; import org.bdware.units.NetworkManager; -import org.bdware.units.tcp.TCPClientFrameHandler; import java.io.*; import java.lang.reflect.Field; @@ -132,9 +131,9 @@ public class CMHttpServer { // plugins CMHttpHandler.wsPluginActions = parseStrAsList(cmdConf.wsPluginActions); - MasterClientFrameHandler.clientToAgentPlugins = parseStrAsList(cmdConf.clientToAgentPlugins); + TCPClientFrameHandler.clientToAgentPlugins = parseStrAsList(cmdConf.clientToAgentPlugins); NodeCenterClientHandler.clientToClusterPlugins = parseStrAsList(cmdConf.clientToClusterPlugins); - TCPClientFrameHandler.tcpPlugins = parseStrAsList(cmdConf.tcpPlugins); + org.bdware.units.tcp.TCPClientFrameHandler.tcpPlugins = parseStrAsList(cmdConf.tcpPlugins); if (!cmdConf.debug.isEmpty()) { try { diff --git a/src/main/java/org/bdware/server/ws/EventActions.java b/src/main/java/org/bdware/server/action/EventActions.java similarity index 97% rename from src/main/java/org/bdware/server/ws/EventActions.java rename to src/main/java/org/bdware/server/action/EventActions.java index 8a96697..13bd2a7 100644 --- a/src/main/java/org/bdware/server/ws/EventActions.java +++ b/src/main/java/org/bdware/server/action/EventActions.java @@ -1,4 +1,4 @@ -package org.bdware.server.ws; +package org.bdware.server.action; import com.google.gson.JsonObject; import org.bdware.sc.ContractClient; diff --git a/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java b/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java index 912fe08..ad00cb3 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java @@ -19,7 +19,7 @@ import org.bdware.server.GlobalConf; import org.bdware.server.action.Action; import org.bdware.server.action.CMActions; import org.bdware.server.action.RequestToMaster; -import org.bdware.server.tcp.MasterClientFrameHandler; +import org.bdware.server.tcp.TCPClientFrameHandler; import org.bdware.server.trustedmodel.ContractUnitStatus; import org.bdware.server.trustedmodel.MasterProxy; import org.bdware.units.function.ExecutionManager; @@ -37,13 +37,13 @@ public class MasterClientRecoverMechAction { new ConcurrentSet<>(); // contracts which don't finish recoverRestart public static Map> requestsToMaster; // when master is re-electing,client node cache ites received requests - private final MasterClientFrameHandler handler; + private final TCPClientFrameHandler handler; private final MasterClientTCPAction clientAction; private final Map stateFileMap = new HashMap<>(); private final Map transFileMap = new HashMap<>(); public MasterClientRecoverMechAction( - MasterClientFrameHandler handler, MasterClientTCPAction action) { + TCPClientFrameHandler handler, MasterClientTCPAction action) { this.handler = handler; clientAction = action; } @@ -58,7 +58,7 @@ public class MasterClientRecoverMechAction { + contractID + " masterID=" + masterID.substring(0, 5)); - MasterClientFrameHandler master = MasterProxy.getHandler(masterID); + TCPClientFrameHandler master = MasterProxy.getHandler(masterID); Map ret = new HashMap(); ret.put("action", "askForRecover"); diff --git a/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java b/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java index e4e81d1..0457acf 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java @@ -17,7 +17,7 @@ import org.bdware.sc.units.MultiContractMeta; import org.bdware.sc.util.JsonUtil; import org.bdware.server.GlobalConf; import org.bdware.server.action.*; -import org.bdware.server.tcp.MasterClientFrameHandler; +import org.bdware.server.tcp.TCPClientFrameHandler; import org.bdware.server.trustedmodel.*; import org.bdware.units.NetworkManager; import org.bdware.units.function.ExecutionManager; @@ -43,15 +43,15 @@ public class MasterClientTCPAction { public static Map contractID2MasterInfo = new ConcurrentHashMap<>(); static Map killUnitContractMap = new ConcurrentHashMap<>(); - private final MasterClientFrameHandler handler; + private final TCPClientFrameHandler handler; private final String master; TimerTask sendPingTask; TimerTask checkAliveTask; - MasterClientFrameHandler controller; + TCPClientFrameHandler controller; private long lastMasterPongTime = System.currentTimeMillis(); private boolean waitForSetNode = false; - public MasterClientTCPAction(MasterClientFrameHandler handler, String master) { + public MasterClientTCPAction(TCPClientFrameHandler handler, String master) { this.handler = handler; this.master = master; } @@ -741,7 +741,7 @@ public class MasterClientTCPAction { } } - public void init(MasterClientFrameHandler masterClientFrameHandler) { + public void init(TCPClientFrameHandler masterClientFrameHandler) { LOGGER.info("[MasterClientTCPAction] init : "); controller = masterClientFrameHandler; controller.sendMsg( diff --git a/src/main/java/org/bdware/server/action/p2p/MasterClientTransferAction.java b/src/main/java/org/bdware/server/action/p2p/MasterClientTransferAction.java index 719391a..90e8832 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterClientTransferAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterClientTransferAction.java @@ -11,7 +11,7 @@ import org.bdware.sc.util.JsonUtil; import org.bdware.server.GlobalConf; import org.bdware.server.action.Action; import org.bdware.server.action.CMActions; -import org.bdware.server.tcp.MasterClientFrameHandler; +import org.bdware.server.tcp.TCPClientFrameHandler; import org.bdware.server.trustedmodel.MasterProxy; import java.io.*; @@ -23,11 +23,11 @@ import java.util.zip.GZIPOutputStream; public class MasterClientTransferAction { private static final Logger LOGGER = LogManager.getLogger(MasterClientTransferAction.class); private final String master; - MasterClientFrameHandler handler; + TCPClientFrameHandler handler; MasterClientTCPAction action; private final Map id2Memory = new HashMap<>(); - public MasterClientTransferAction(MasterClientFrameHandler h, String pubKey, MasterClientTCPAction a) { + public MasterClientTransferAction(TCPClientFrameHandler h, String pubKey, MasterClientTCPAction a) { handler = h; master = pubKey; action = a; diff --git a/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientController.java b/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientController.java index 5dfd692..23a2093 100644 --- a/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientController.java +++ b/src/main/java/org/bdware/server/nodecenter/client/NodeCenterClientController.java @@ -32,7 +32,7 @@ import org.bdware.server.action.p2p.MasterClientRecoverMechAction; import org.bdware.server.action.p2p.MasterClientTCPAction; import org.bdware.server.action.p2p.MasterServerRecoverMechAction; import org.bdware.server.action.p2p.MasterServerTCPAction; -import org.bdware.server.tcp.MasterClientFrameHandler; +import org.bdware.server.tcp.TCPClientFrameHandler; import org.bdware.server.trustedmodel.MasterProxy; import org.bdware.server.ws.DelimiterCodec; import org.bouncycastle.pqc.math.linearalgebra.ByteUtils; @@ -448,7 +448,7 @@ public class NodeCenterClientController implements NodeCenterConn { public void queryUnitContractsID2(String contractID, String master) { connectToMaster(master, null); // TODO 等待确保连接成功? - MasterClientFrameHandler clientHandler = MasterProxy.getHandler(master); + TCPClientFrameHandler clientHandler = MasterProxy.getHandler(master); clientHandler.waitForSetNodeID(); RecoverMechTimeRecorder.connectMasterFinish = System.currentTimeMillis(); MasterClientRecoverMechAction.askForRecover(contractID, master, nodeID); @@ -457,6 +457,7 @@ public class NodeCenterClientController implements NodeCenterConn { // TODO add syncPing public void connectToMaster(String master, String contractID) { LOGGER.info("[CMClientController] connectToMaster master= " + master); + // logger.debug("ConnectToMaster:" + master + "\nMasterRoute:" + slaverRouter.get(master)); Bootstrap b; MasterProxy.MasterConnector connector = null; @@ -468,7 +469,7 @@ public class NodeCenterClientController implements NodeCenterConn { } if (connector != null) { b = new Bootstrap(); - MasterClientFrameHandler handler = new MasterClientFrameHandler(master); + TCPClientFrameHandler handler = new TCPClientFrameHandler(master); connector.bootstrap = b; connector.handler = handler; b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000); diff --git a/src/main/java/org/bdware/server/nodecenter/client/WebSocketClient.java b/src/main/java/org/bdware/server/nodecenter/client/WebSocketClient.java deleted file mode 100644 index 7f171d5..0000000 --- a/src/main/java/org/bdware/server/nodecenter/client/WebSocketClient.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.bdware.server.nodecenter.client; - -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.http.HttpClientCodec; -import io.netty.handler.codec.http.HttpObjectAggregator; -import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; -import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; -import io.netty.handler.codec.http.websocketx.WebSocketFrame; -import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler; -import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslContextBuilder; -import io.netty.handler.ssl.util.InsecureTrustManagerFactory; - -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.net.URI; - -/** - * This is an example of a WebSocket client. - *

- * In order to run this example you need a compatible WebSocket server. - * Therefore you can either start the WebSocket server from the examples by - * running {@link io.netty.example.http.websocketx.server.WebSocketServer} or - * connect to an existing WebSocket server such as - * ws://echo.websocket.org. - *

- * The client will attempt to connect to the URI passed to it as the first - * argument. You don't have to specify any arguments if you want to connect to - * the example WebSocket server, as this is the default. - */ -public final class WebSocketClient { - - static final String URL = "ws://39.106.6.6:8080/SCIDE/SCExecutor"; - - public static void main(String[] args) throws Exception { - URI uri = new URI(URL); - String scheme = uri.getScheme() == null ? "ws" : uri.getScheme(); - final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost(); - final int port; - if (uri.getPort() == -1) { - if ("ws".equalsIgnoreCase(scheme)) { - port = 80; - } else if ("wss".equalsIgnoreCase(scheme)) { - port = 443; - } else { - port = -1; - } - } else { - port = uri.getPort(); - } - - if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) { - System.err.println("Only WS(S) is supported."); - return; - } - - final boolean ssl = "wss".equalsIgnoreCase(scheme); - final SslContext sslCtx; - if (ssl) { - sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build(); - } else { - sslCtx = null; - } - - EventLoopGroup group = new NioEventLoopGroup(); - try { - // Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00. - // If you change it to V00, ping is not supported and remember to change - // HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline. - final NodeCenterClientHandler handler = new NodeCenterClientHandler(); - - Bootstrap b = new Bootstrap(); - b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel ch) { - ChannelPipeline p = ch.pipeline(); - if (sslCtx != null) { - p.addLast(sslCtx.newHandler(ch.alloc(), host, port)); - } - p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), - WebSocketClientCompressionHandler.INSTANCE, handler); - } - }); - - Channel ch = b.connect(uri.getHost(), port).sync().channel(); - - BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); - while (true) { - String msg = console.readLine(); - if (msg == null) { - break; - } else if ("bye".equalsIgnoreCase(msg)) { - ch.writeAndFlush(new CloseWebSocketFrame()); - ch.closeFuture().sync(); - break; - } else if ("ping".equalsIgnoreCase(msg)) { - WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[]{8, 1, 8, 1})); - ch.writeAndFlush(frame); - } else { - WebSocketFrame frame = new TextWebSocketFrame(msg); - ch.writeAndFlush(frame); - } - } - } finally { - group.shutdownGracefully(); - } - } -} \ No newline at end of file diff --git a/src/main/java/org/bdware/server/tcp/MasterClientFrameHandler.java b/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java similarity index 96% rename from src/main/java/org/bdware/server/tcp/MasterClientFrameHandler.java rename to src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java index 7859a92..242abcf 100644 --- a/src/main/java/org/bdware/server/tcp/MasterClientFrameHandler.java +++ b/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java @@ -28,8 +28,8 @@ import java.util.concurrent.Executors; import static org.bdware.server.CMHttpServer.pluginLoader; @ChannelHandler.Sharable -public class MasterClientFrameHandler extends SimpleChannelInboundHandler { - private static final Logger LOGGER = LogManager.getLogger(MasterClientFrameHandler.class); +public class TCPClientFrameHandler extends SimpleChannelInboundHandler { + private static final Logger LOGGER = LogManager.getLogger(TCPClientFrameHandler.class); private static final ExecutorService executorService = Executors.newFixedThreadPool(10); public static String[] clientToAgentPlugins; public String pubKey; @@ -42,7 +42,7 @@ public class MasterClientFrameHandler extends SimpleChannelInboundHandler slaverRouter = new HashMap<>(); - // static { - // // 和连的所有master心跳 - // } - - public static MasterClientFrameHandler getHandler(String masterID) { + public static TCPClientFrameHandler getHandler(String masterID) { return CONNECTORS.get(masterID).handler; } @@ -189,7 +185,7 @@ public class MasterProxy implements MasterStub { public static class MasterConnector { public Bootstrap bootstrap; - public MasterClientFrameHandler handler; + public TCPClientFrameHandler handler; } static class StrCollector extends ResultCallback { diff --git a/src/main/java/org/bdware/units/NetworkManager.java b/src/main/java/org/bdware/units/NetworkManager.java index ef8503a..dd5c2d7 100644 --- a/src/main/java/org/bdware/units/NetworkManager.java +++ b/src/main/java/org/bdware/units/NetworkManager.java @@ -44,9 +44,9 @@ public class NetworkManager { public static NetworkManager instance = new NetworkManager(); private final Map peerID2TCPAddress; private final Map tcpClientMap; - private TCPServerFrameHandler tcpHandler; private String tcpNodeCenter; private String tcpSelf; + // connection to node center private NodeCenterClientHandler nodeCenterClientHandler; public NetworkManager() { @@ -67,16 +67,22 @@ public class NetworkManager { return tcpClientMap.containsKey(peer); } - public TCPServerFrameHandler getTCPHandler() { - return tcpHandler; - } - private void connectToTCPNodeCenter() { final Bootstrap b = new Bootstrap(); b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000); nodeCenterClientHandler = new NodeCenterClientHandler(); CMActions.manager.nodeCenterConn = nodeCenterClientHandler.controller; - setBootstrap(b); + EventLoopGroup group = new NioEventLoopGroup(); + b.group(group); + b.channel(NioSocketChannel.class) + .handler( + new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline p = ch.pipeline(); + p.addLast(new DelimiterCodec()).addLast(nodeCenterClientHandler); + } + }); ContractManager.scheduledThreadPool.scheduleWithFixedDelay( () -> { try { @@ -110,19 +116,6 @@ public class NetworkManager { TimeUnit.SECONDS); } - private void setBootstrap(Bootstrap b) { - EventLoopGroup group = new NioEventLoopGroup(); - b.group(group); - b.channel(NioSocketChannel.class) - .handler( - new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel ch) { - ChannelPipeline p = ch.pipeline(); - p.addLast(new DelimiterCodec()).addLast(nodeCenterClientHandler); - } - }); - } private void createTCPServer(int port, EventLoopGroup workerGroup) { try { @@ -310,8 +303,8 @@ public class NetworkManager { public void waitForNodeCenterConnected() { for (int i = 0; - i < 10 && null != nodeCenterClientHandler && !nodeCenterClientHandler.isConnected(); - i++) { + i < 10 && null != nodeCenterClientHandler && !nodeCenterClientHandler.isConnected(); + i++) { try { Thread.sleep(200); } catch (InterruptedException e) { diff --git a/src/test/java/org/bdware/server/PermissionHelper.java b/src/test/java/org/bdware/server/PermissionHelper.java index 7a3603e..f85da5c 100644 --- a/src/test/java/org/bdware/server/PermissionHelper.java +++ b/src/test/java/org/bdware/server/PermissionHelper.java @@ -5,7 +5,7 @@ import org.bdware.server.action.ActionExecutor; import org.bdware.server.http.CMHttpHandler; import org.bdware.server.permission.Permission; import org.bdware.server.permission.Role; -import org.bdware.server.tcp.MasterClientFrameHandler; +import org.bdware.server.tcp.TCPClientFrameHandler; import org.bdware.server.tcp.TCPServerFrameHandler; import org.bdware.server.ws.ContractManagerFrameHandler; import org.junit.Before; @@ -21,7 +21,7 @@ public class PermissionHelper { @Test public void compareClientAndServer() { - MasterClientFrameHandler h = new MasterClientFrameHandler("abc"); + TCPClientFrameHandler h = new TCPClientFrameHandler("abc"); Map> handlers = h.ae.getHandlers(); List lines = parseWithClzName(handlers); TCPServerFrameHandler h2 = new TCPServerFrameHandler(); @@ -88,7 +88,7 @@ public class PermissionHelper { @Test public void listMasterClientAction() { - MasterClientFrameHandler handler = new MasterClientFrameHandler("dd"); + TCPClientFrameHandler handler = new TCPClientFrameHandler("dd"); parse(handler.ae.getHandlers(), false); }