From 4122cc01282c93803c9e80926d6d0c56cf080838 Mon Sep 17 00:00:00 2001 From: CaiHQ Date: Fri, 2 Sep 2022 21:24:29 +0800 Subject: [PATCH] optimize http result callback --- .../server/action/HttpResultCallback.java | 12 +-- .../HttpServerSentEventResultCallback.java | 86 +++++++++++++++++ .../src/org/bdware/server/http/ArgParser.java | 96 +++++++++++++++++++ 3 files changed, 185 insertions(+), 9 deletions(-) create mode 100644 src/main/src/org/bdware/server/action/HttpServerSentEventResultCallback.java create mode 100644 src/main/src/org/bdware/server/http/ArgParser.java diff --git a/src/main/src/org/bdware/server/action/HttpResultCallback.java b/src/main/src/org/bdware/server/action/HttpResultCallback.java index 2f98e18..81ee87b 100644 --- a/src/main/src/org/bdware/server/action/HttpResultCallback.java +++ b/src/main/src/org/bdware/server/action/HttpResultCallback.java @@ -58,8 +58,9 @@ public class HttpResultCallback extends ResultCallback implements Runnable { } - public void addHeader(String key, String val) { + public HttpResultCallback addHeader(String key, String val) { extraHeaders.put(key, val); + return this; } @Override @@ -72,11 +73,4 @@ public class HttpResultCallback extends ResultCallback implements Runnable { public void setDecodeBase64() { decodeAsB64 = true; } -} -/* -scp ./libs/front-base-0.80.jar dev@023.node.internetapi.cn:./ -scp ./libs/front-base-0.80.jar dev@021.node.internetapi.cn:./ -scp ./libs/front-base-0.80.jar dev@024.node.internetapi.cn:./ - - -* */ \ No newline at end of file +} \ No newline at end of file diff --git a/src/main/src/org/bdware/server/action/HttpServerSentEventResultCallback.java b/src/main/src/org/bdware/server/action/HttpServerSentEventResultCallback.java new file mode 100644 index 0000000..e73e68f --- /dev/null +++ b/src/main/src/org/bdware/server/action/HttpServerSentEventResultCallback.java @@ -0,0 +1,86 @@ +package org.bdware.server.action; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.*; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class HttpServerSentEventResultCallback extends HttpResultCallback implements Closeable { + public static ScheduledExecutorService scheduledThreadPool = + Executors.newScheduledThreadPool(1); + private ScheduledFuture currentScheduler; + private long lastUpdate; + + public HttpServerSentEventResultCallback(ChannelHandlerContext ctx, String jsonCallback) { + super(ctx, jsonCallback); + ctxField = ctx; + + } + + public void writeInitialHead() { + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, + HttpResponseStatus.OK); + for (String key : extraHeaders.keySet()) + response.headers().add(key, extraHeaders.get(key)); + ctxField.writeAndFlush(response); + currentScheduler = scheduledThreadPool.schedule(this, 10, TimeUnit.SECONDS); + final ByteBuf buffer = Unpooled.copiedBuffer("\n", StandardCharsets.UTF_8); + ctxField.writeAndFlush(new DefaultHttpContent(buffer)); + } + + boolean closed = false; + + @Override + public void onResult(String ret) { + ByteBuf event = Unpooled.copiedBuffer("event: onResult\n", StandardCharsets.UTF_8); + ctxField.writeAndFlush(new DefaultHttpContent(event)); + ByteBuf buffer = Unpooled.copiedBuffer("data: " + ret + "\n\n", StandardCharsets.UTF_8); + ctxField.writeAndFlush(new DefaultHttpContent(buffer)); + lastUpdate = System.currentTimeMillis(); + if (ret.contains("\"onDistributeFinish\"")) { + lastUpdate = 0; + currentScheduler.cancel(true); + try { + close(); + } catch (Exception e) { + } + } + } + + static Logger LOGGER = LogManager.getLogger(HttpServerSentEventResultCallback.class); + + @Override + public void run() { + try { + if (System.currentTimeMillis() - lastUpdate < 10000L) { + //reschedule + LOGGER.info("Reschedule time out"); + currentScheduler = scheduledThreadPool.schedule(this, 10, TimeUnit.SECONDS); + return; + } + if (!closed) { + final ByteBuf buffer = Unpooled.copiedBuffer("{\"action\":\"onDistributeFinish\",\"progress\":\"-1\",\"data\":\"timeout\"}" + "\n\n", StandardCharsets.UTF_8); + ctxField.writeAndFlush(new DefaultHttpContent(buffer)); + } + close(); + } catch (IOException e) { + + } + } + + @Override + public synchronized void close() throws IOException { + closed = true; + ctxField.close(); + } +} diff --git a/src/main/src/org/bdware/server/http/ArgParser.java b/src/main/src/org/bdware/server/http/ArgParser.java new file mode 100644 index 0000000..4d4584a --- /dev/null +++ b/src/main/src/org/bdware/server/http/ArgParser.java @@ -0,0 +1,96 @@ +package org.bdware.server.http; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.QueryStringDecoder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.bdware.sc.util.ExceptionUtil; +import org.bouncycastle.crypto.params.ECPublicKeyParameters; +import org.bouncycastle.pqc.math.linearalgebra.ByteUtils; +import org.zz.gmhelper.BCECUtil; +import org.zz.gmhelper.SM2Util; + +import java.io.InputStreamReader; +import java.net.URLDecoder; +import java.util.List; +import java.util.Map; + +public class ArgParser { + + public interface VerifiedCallback { + void onResult(boolean verified, JsonObject param); + } + + private static final Logger LOGGER = LogManager.getLogger(ArgParser.class); + + public static JsonObject parseGetAndVerify(FullHttpRequest msg, VerifiedCallback cb) throws Exception { + QueryStringDecoder decoderQuery = new QueryStringDecoder(msg.uri()); + Map> parameters = decoderQuery.parameters(); + JsonObject transformedParam = new JsonObject(); + for (String key : parameters.keySet()) { + List val = parameters.get(key); + if (null != val) { + transformedParam.addProperty(key, val.get(0)); + } + } + // 匿名用户权限为0 + transformedParam.addProperty("permission", 0); + transformedParam.remove("verifiedPubKey"); + // 验签 有pubKey就必须有sign + String uri = URLDecoder.decode(msg.uri(), "UTF-8").split("\\?")[1]; + int index = uri.lastIndexOf('&'); + if (index >= 0) + verifyParam(transformedParam, uri.substring(0, index), cb); + return transformedParam; + } + + private static void verifyParam(JsonObject transformedParam, String toVerifyStr, VerifiedCallback cb) { + boolean verify = false; + if (transformedParam.has("pubKey")) { + LOGGER.info("before verifying: " + toVerifyStr); + try { + ECPublicKeyParameters pubKey = + BCECUtil.createECPublicKeyFromStrParameters( + transformedParam.get("pubKey").getAsString(), SM2Util.CURVE, SM2Util.DOMAIN_PARAMS); + verify = + SM2Util.verify( + pubKey, toVerifyStr.getBytes(), ByteUtils.fromHexString(transformedParam.get("sign").getAsString())); + + } catch (Exception e) { + LOGGER.error(e.getMessage()); + LOGGER.debug(ExceptionUtil.exceptionToString(e)); + } + } + if (cb != null) + cb.onResult(verify, transformedParam); + return; + } + + public static JsonObject parsePostAndVerify(FullHttpRequest msg, VerifiedCallback cb) throws Exception { + ByteBuf content = msg.content(); + JsonObject map = + JsonParser.parseReader(new InputStreamReader(new ByteBufInputStream(content))) + .getAsJsonObject(); + StringBuilder toSign = new StringBuilder(); + boolean isFirst = true; + for (String key : map.keySet()) { + if (!key.equals("sign")) { + if (!isFirst) toSign.append("&"); + isFirst = false; + toSign.append(key).append("="); + JsonElement je = map.get(key); + if (je.isJsonPrimitive()) + toSign.append(je.getAsString()); + else toSign.append(je.toString()); + } + } + verifyParam(map, toSign.toString(), cb); + return map; + + } +}