prune network manager

This commit is contained in:
CaiHQ 2021-11-11 10:31:38 +08:00
parent 1ade70df35
commit 40816e8a45
11 changed files with 43 additions and 186 deletions

View File

@ -31,10 +31,9 @@ import org.bdware.server.action.FileActions;
import org.bdware.server.doip.ContractRepositoryMain;
import org.bdware.server.http.CMHttpHandler;
import org.bdware.server.nodecenter.client.NodeCenterClientHandler;
import org.bdware.server.tcp.MasterClientFrameHandler;
import org.bdware.server.tcp.TCPClientFrameHandler;
import org.bdware.server.ws.ContractManagerFrameHandler;
import org.bdware.units.NetworkManager;
import org.bdware.units.tcp.TCPClientFrameHandler;
import java.io.*;
import java.lang.reflect.Field;
@ -132,9 +131,9 @@ public class CMHttpServer {
// plugins
CMHttpHandler.wsPluginActions = parseStrAsList(cmdConf.wsPluginActions);
MasterClientFrameHandler.clientToAgentPlugins = parseStrAsList(cmdConf.clientToAgentPlugins);
TCPClientFrameHandler.clientToAgentPlugins = parseStrAsList(cmdConf.clientToAgentPlugins);
NodeCenterClientHandler.clientToClusterPlugins = parseStrAsList(cmdConf.clientToClusterPlugins);
TCPClientFrameHandler.tcpPlugins = parseStrAsList(cmdConf.tcpPlugins);
org.bdware.units.tcp.TCPClientFrameHandler.tcpPlugins = parseStrAsList(cmdConf.tcpPlugins);
if (!cmdConf.debug.isEmpty()) {
try {

View File

@ -1,4 +1,4 @@
package org.bdware.server.ws;
package org.bdware.server.action;
import com.google.gson.JsonObject;
import org.bdware.sc.ContractClient;

View File

@ -19,7 +19,7 @@ import org.bdware.server.GlobalConf;
import org.bdware.server.action.Action;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.RequestToMaster;
import org.bdware.server.tcp.MasterClientFrameHandler;
import org.bdware.server.tcp.TCPClientFrameHandler;
import org.bdware.server.trustedmodel.ContractUnitStatus;
import org.bdware.server.trustedmodel.MasterProxy;
import org.bdware.units.function.ExecutionManager;
@ -37,13 +37,13 @@ public class MasterClientRecoverMechAction {
new ConcurrentSet<>(); // contracts which don't finish recoverRestart
public static Map<String, Queue<RequestToMaster>>
requestsToMaster; // when master is re-electing,client node cache ites received requests
private final MasterClientFrameHandler handler;
private final TCPClientFrameHandler handler;
private final MasterClientTCPAction clientAction;
private final Map<String, OutputStream> stateFileMap = new HashMap<>();
private final Map<String, OutputStream> transFileMap = new HashMap<>();
public MasterClientRecoverMechAction(
MasterClientFrameHandler handler, MasterClientTCPAction action) {
TCPClientFrameHandler handler, MasterClientTCPAction action) {
this.handler = handler;
clientAction = action;
}
@ -58,7 +58,7 @@ public class MasterClientRecoverMechAction {
+ contractID
+ " masterID="
+ masterID.substring(0, 5));
MasterClientFrameHandler master = MasterProxy.getHandler(masterID);
TCPClientFrameHandler master = MasterProxy.getHandler(masterID);
Map<String, String> ret = new HashMap<String, String>();
ret.put("action", "askForRecover");

View File

@ -17,7 +17,7 @@ import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.util.JsonUtil;
import org.bdware.server.GlobalConf;
import org.bdware.server.action.*;
import org.bdware.server.tcp.MasterClientFrameHandler;
import org.bdware.server.tcp.TCPClientFrameHandler;
import org.bdware.server.trustedmodel.*;
import org.bdware.units.NetworkManager;
import org.bdware.units.function.ExecutionManager;
@ -43,15 +43,15 @@ public class MasterClientTCPAction {
public static Map<String, MasterClientTCPAction> contractID2MasterInfo =
new ConcurrentHashMap<>();
static Map<String, KillUnitContractInfo> killUnitContractMap = new ConcurrentHashMap<>();
private final MasterClientFrameHandler handler;
private final TCPClientFrameHandler handler;
private final String master;
TimerTask sendPingTask;
TimerTask checkAliveTask;
MasterClientFrameHandler controller;
TCPClientFrameHandler controller;
private long lastMasterPongTime = System.currentTimeMillis();
private boolean waitForSetNode = false;
public MasterClientTCPAction(MasterClientFrameHandler handler, String master) {
public MasterClientTCPAction(TCPClientFrameHandler handler, String master) {
this.handler = handler;
this.master = master;
}
@ -741,7 +741,7 @@ public class MasterClientTCPAction {
}
}
public void init(MasterClientFrameHandler masterClientFrameHandler) {
public void init(TCPClientFrameHandler masterClientFrameHandler) {
LOGGER.info("[MasterClientTCPAction] init : ");
controller = masterClientFrameHandler;
controller.sendMsg(

View File

@ -11,7 +11,7 @@ import org.bdware.sc.util.JsonUtil;
import org.bdware.server.GlobalConf;
import org.bdware.server.action.Action;
import org.bdware.server.action.CMActions;
import org.bdware.server.tcp.MasterClientFrameHandler;
import org.bdware.server.tcp.TCPClientFrameHandler;
import org.bdware.server.trustedmodel.MasterProxy;
import java.io.*;
@ -23,11 +23,11 @@ import java.util.zip.GZIPOutputStream;
public class MasterClientTransferAction {
private static final Logger LOGGER = LogManager.getLogger(MasterClientTransferAction.class);
private final String master;
MasterClientFrameHandler handler;
TCPClientFrameHandler handler;
MasterClientTCPAction action;
private final Map<String, String> id2Memory = new HashMap<>();
public MasterClientTransferAction(MasterClientFrameHandler h, String pubKey, MasterClientTCPAction a) {
public MasterClientTransferAction(TCPClientFrameHandler h, String pubKey, MasterClientTCPAction a) {
handler = h;
master = pubKey;
action = a;

View File

@ -32,7 +32,7 @@ import org.bdware.server.action.p2p.MasterClientRecoverMechAction;
import org.bdware.server.action.p2p.MasterClientTCPAction;
import org.bdware.server.action.p2p.MasterServerRecoverMechAction;
import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.tcp.MasterClientFrameHandler;
import org.bdware.server.tcp.TCPClientFrameHandler;
import org.bdware.server.trustedmodel.MasterProxy;
import org.bdware.server.ws.DelimiterCodec;
import org.bouncycastle.pqc.math.linearalgebra.ByteUtils;
@ -448,7 +448,7 @@ public class NodeCenterClientController implements NodeCenterConn {
public void queryUnitContractsID2(String contractID, String master) {
connectToMaster(master, null); // TODO 等待确保连接成功
MasterClientFrameHandler clientHandler = MasterProxy.getHandler(master);
TCPClientFrameHandler clientHandler = MasterProxy.getHandler(master);
clientHandler.waitForSetNodeID();
RecoverMechTimeRecorder.connectMasterFinish = System.currentTimeMillis();
MasterClientRecoverMechAction.askForRecover(contractID, master, nodeID);
@ -457,6 +457,7 @@ public class NodeCenterClientController implements NodeCenterConn {
// TODO add syncPing
public void connectToMaster(String master, String contractID) {
LOGGER.info("[CMClientController] connectToMaster master= " + master);
// logger.debug("ConnectToMaster:" + master + "\nMasterRoute:" + slaverRouter.get(master));
Bootstrap b;
MasterProxy.MasterConnector connector = null;
@ -468,7 +469,7 @@ public class NodeCenterClientController implements NodeCenterConn {
}
if (connector != null) {
b = new Bootstrap();
MasterClientFrameHandler handler = new MasterClientFrameHandler(master);
TCPClientFrameHandler handler = new TCPClientFrameHandler(master);
connector.bootstrap = b;
connector.handler = handler;
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);

View File

@ -1,132 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.bdware.server.nodecenter.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
/**
* This is an example of a WebSocket client.
* <p>
* In order to run this example you need a compatible WebSocket server.
* Therefore you can either start the WebSocket server from the examples by
* running {@link io.netty.example.http.websocketx.server.WebSocketServer} or
* connect to an existing WebSocket server such as
* <a href="http://www.websocket.org/echo.html">ws://echo.websocket.org</a>.
* <p>
* The client will attempt to connect to the URI passed to it as the first
* argument. You don't have to specify any arguments if you want to connect to
* the example WebSocket server, as this is the default.
*/
public final class WebSocketClient {
static final String URL = "ws://39.106.6.6:8080/SCIDE/SCExecutor";
public static void main(String[] args) throws Exception {
URI uri = new URI(URL);
String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();
final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
final int port;
if (uri.getPort() == -1) {
if ("ws".equalsIgnoreCase(scheme)) {
port = 80;
} else if ("wss".equalsIgnoreCase(scheme)) {
port = 443;
} else {
port = -1;
}
} else {
port = uri.getPort();
}
if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
System.err.println("Only WS(S) is supported.");
return;
}
final boolean ssl = "wss".equalsIgnoreCase(scheme);
final SslContext sslCtx;
if (ssl) {
sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
EventLoopGroup group = new NioEventLoopGroup();
try {
// Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.
// If you change it to V00, ping is not supported and remember to change
// HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
final NodeCenterClientHandler handler = new NodeCenterClientHandler();
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
}
p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192),
WebSocketClientCompressionHandler.INSTANCE, handler);
}
});
Channel ch = b.connect(uri.getHost(), port).sync().channel();
BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
while (true) {
String msg = console.readLine();
if (msg == null) {
break;
} else if ("bye".equalsIgnoreCase(msg)) {
ch.writeAndFlush(new CloseWebSocketFrame());
ch.closeFuture().sync();
break;
} else if ("ping".equalsIgnoreCase(msg)) {
WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[]{8, 1, 8, 1}));
ch.writeAndFlush(frame);
} else {
WebSocketFrame frame = new TextWebSocketFrame(msg);
ch.writeAndFlush(frame);
}
}
} finally {
group.shutdownGracefully();
}
}
}

View File

@ -28,8 +28,8 @@ import java.util.concurrent.Executors;
import static org.bdware.server.CMHttpServer.pluginLoader;
@ChannelHandler.Sharable
public class MasterClientFrameHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger LOGGER = LogManager.getLogger(MasterClientFrameHandler.class);
public class TCPClientFrameHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger LOGGER = LogManager.getLogger(TCPClientFrameHandler.class);
private static final ExecutorService executorService = Executors.newFixedThreadPool(10);
public static String[] clientToAgentPlugins;
public String pubKey;
@ -42,7 +42,7 @@ public class MasterClientFrameHandler extends SimpleChannelInboundHandler<Object
private boolean isConnected;
private String master; // master node pubKey
public MasterClientFrameHandler(String masterPubkey) {
public TCPClientFrameHandler(String masterPubkey) {
master = masterPubkey;
actions = new MasterClientTCPAction(this, master);
recoverActions = new MasterClientRecoverMechAction(this, actions);

View File

@ -16,7 +16,7 @@ import org.bdware.sc.util.JsonUtil;
import org.bdware.server.CongestionControl;
import org.bdware.server.action.CMActions;
import org.bdware.server.action.p2p.MasterServerTCPAction;
import org.bdware.server.tcp.MasterClientFrameHandler;
import org.bdware.server.tcp.TCPClientFrameHandler;
import java.text.SimpleDateFormat;
import java.util.Date;
@ -31,11 +31,7 @@ public class MasterProxy implements MasterStub {
// nodePubkey该node作为master的地址
public static Map<String, String> slaverRouter = new HashMap<>();
// static {
// // 和连的所有master心跳
// }
public static MasterClientFrameHandler getHandler(String masterID) {
public static TCPClientFrameHandler getHandler(String masterID) {
return CONNECTORS.get(masterID).handler;
}
@ -189,7 +185,7 @@ public class MasterProxy implements MasterStub {
public static class MasterConnector {
public Bootstrap bootstrap;
public MasterClientFrameHandler handler;
public TCPClientFrameHandler handler;
}
static class StrCollector extends ResultCallback {

View File

@ -44,9 +44,9 @@ public class NetworkManager {
public static NetworkManager instance = new NetworkManager();
private final Map<String, String> peerID2TCPAddress;
private final Map<String, TCPClientFrameHandler> tcpClientMap;
private TCPServerFrameHandler tcpHandler;
private String tcpNodeCenter;
private String tcpSelf;
// connection to node center
private NodeCenterClientHandler nodeCenterClientHandler;
public NetworkManager() {
@ -67,16 +67,22 @@ public class NetworkManager {
return tcpClientMap.containsKey(peer);
}
public TCPServerFrameHandler getTCPHandler() {
return tcpHandler;
}
private void connectToTCPNodeCenter() {
final Bootstrap b = new Bootstrap();
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
nodeCenterClientHandler = new NodeCenterClientHandler();
CMActions.manager.nodeCenterConn = nodeCenterClientHandler.controller;
setBootstrap(b);
EventLoopGroup group = new NioEventLoopGroup();
b.group(group);
b.channel(NioSocketChannel.class)
.handler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new DelimiterCodec()).addLast(nodeCenterClientHandler);
}
});
ContractManager.scheduledThreadPool.scheduleWithFixedDelay(
() -> {
try {
@ -110,19 +116,6 @@ public class NetworkManager {
TimeUnit.SECONDS);
}
private void setBootstrap(Bootstrap b) {
EventLoopGroup group = new NioEventLoopGroup();
b.group(group);
b.channel(NioSocketChannel.class)
.handler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new DelimiterCodec()).addLast(nodeCenterClientHandler);
}
});
}
private void createTCPServer(int port, EventLoopGroup workerGroup) {
try {

View File

@ -5,7 +5,7 @@ import org.bdware.server.action.ActionExecutor;
import org.bdware.server.http.CMHttpHandler;
import org.bdware.server.permission.Permission;
import org.bdware.server.permission.Role;
import org.bdware.server.tcp.MasterClientFrameHandler;
import org.bdware.server.tcp.TCPClientFrameHandler;
import org.bdware.server.tcp.TCPServerFrameHandler;
import org.bdware.server.ws.ContractManagerFrameHandler;
import org.junit.Before;
@ -21,7 +21,7 @@ public class PermissionHelper {
@Test
public void compareClientAndServer() {
MasterClientFrameHandler h = new MasterClientFrameHandler("abc");
TCPClientFrameHandler h = new TCPClientFrameHandler("abc");
Map<String, ActionExecutor.Pair<Method, Object>> handlers = h.ae.getHandlers();
List<Line> lines = parseWithClzName(handlers);
TCPServerFrameHandler h2 = new TCPServerFrameHandler();
@ -88,7 +88,7 @@ public class PermissionHelper {
@Test
public void listMasterClientAction() {
MasterClientFrameHandler handler = new MasterClientFrameHandler("dd");
TCPClientFrameHandler handler = new TCPClientFrameHandler("dd");
parse(handler.ae.getHandlers(), false);
}