From f0ca9ab85d054a0296845e9e8a80ffcd0de78c80 Mon Sep 17 00:00:00 2001 From: "Frank.R.Wu" Date: Mon, 1 Nov 2021 22:24:27 +0800 Subject: [PATCH] feat(cm): update event mechanism update resultCallback used in contract consumer part of EventBroker.deliverEvent; add exception handling for WSClientConsumer --- .../java/org/bdware/sc/ContractClient.java | 2 +- src/main/java/org/bdware/sc/ContractMeta.java | 29 ++++---- .../java/org/bdware/sc/event/EventBroker.java | 67 ++++++++++--------- .../sc/event/clients/ContractConsumer.java | 27 ++++---- .../sc/event/clients/WSClientConsumer.java | 3 + 5 files changed, 69 insertions(+), 59 deletions(-) diff --git a/src/main/java/org/bdware/sc/ContractClient.java b/src/main/java/org/bdware/sc/ContractClient.java index debcbba..8328a57 100644 --- a/src/main/java/org/bdware/sc/ContractClient.java +++ b/src/main/java/org/bdware/sc/ContractClient.java @@ -217,7 +217,6 @@ public class ContractClient { public String startProcess(PrintStream ps) { isRunning = false; port = -1; - LOGGER.info("port=" + port); String darg = "-Djava.library.path="; String classpath; @@ -309,6 +308,7 @@ public class ContractClient { } assert status != null; port = Integer.parseInt(status.split(" ")[0]); + LOGGER.debug("port=" + port); ContractManager.cPort.updateDb(port, true); get = new SocketGet("127.0.0.1", port); get.syncGet("", "setDBInfo", ContractManager.dbPath); diff --git a/src/main/java/org/bdware/sc/ContractMeta.java b/src/main/java/org/bdware/sc/ContractMeta.java index 1c6587d..d8c8c8d 100644 --- a/src/main/java/org/bdware/sc/ContractMeta.java +++ b/src/main/java/org/bdware/sc/ContractMeta.java @@ -38,12 +38,17 @@ public class ContractMeta implements IDSerializable { */ // MapMaskInfo; - public ContractMeta() {} + public ContractMeta() { + } public ContractStatusEnum getStatus() { return status; } + public void setStatus(ContractStatusEnum status) { + this.status = status; + } + public String getID() { return id; } @@ -56,7 +61,9 @@ public class ContractMeta implements IDSerializable { return exportedFunctions; } - public Set getDependentContracts() { return dependentContracts; } + public Set getDependentContracts() { + return dependentContracts; + } public Map getEvents() { return declaredEvents; @@ -78,6 +85,12 @@ public class ContractMeta implements IDSerializable { return name; } + public void setName(String name) { + this.name = name; + } + + // public setMask(){} + public boolean getIsDebug() { return isDebug; } @@ -90,10 +103,6 @@ public class ContractMeta implements IDSerializable { if (desp != null) funCache.put(action, desp); return desp; } - // public setMask(){ - ; - - // } private FunctionDesp seekFunction(String action) { for (FunctionDesp desp : exportedFunctions) { @@ -108,12 +117,4 @@ public class ContractMeta implements IDSerializable { status = ContractStatusEnum.HANGED; isDebug = false; } - - public void setName(String name) { - this.name = name; - } - - public void setStatus(ContractStatusEnum status) { - this.status = status; - } } diff --git a/src/main/java/org/bdware/sc/event/EventBroker.java b/src/main/java/org/bdware/sc/event/EventBroker.java index cc086d9..a6a72c5 100644 --- a/src/main/java/org/bdware/sc/event/EventBroker.java +++ b/src/main/java/org/bdware/sc/event/EventBroker.java @@ -409,37 +409,35 @@ public class EventBroker { ContractManager.threadPool.execute(() -> { ResultCallback cb = new ResultCallback() { @Override - public void onResult(String str) { + public void onResult(String unused) { // if the delivering fails, unsubscribe the consumer - if (null != str) { - ContractConsumer c = (ContractConsumer) consumer; - ContractClient client = ContractManager.instance.getClient(c.getContract()); - String reqID = - ContractManager.instance.nodeCenterConn.getNodeKeyPair().getPublicKeyStr() + - "_" + System.currentTimeMillis(); - REvent unsubEvent = - new REvent( - topic, - UNSUBSCRIBE, - "{\"subscriber\":\"" + cId + "\"}", - reqID); - unsubEvent.doSignature(client.getPubkey(), client.getContractKey()); - handle(unsubEvent); + ContractConsumer c = (ContractConsumer) consumer; + ContractClient client = ContractManager.instance.getClient(c.getContract()); + String reqID = + ContractManager.instance.nodeCenterConn.getNodeKeyPair().getPublicKeyStr() + + "_" + System.currentTimeMillis(); + REvent unsubEvent = + new REvent( + topic, + UNSUBSCRIBE, + "{\"subscriber\":\"" + cId + "\"}", + reqID); + unsubEvent.doSignature(client.getPubkey(), client.getContractKey()); + handle(unsubEvent); - // if the event is an ONLY_ONCE event, retry publishing - if (NEED_RETRY.equals(event.getSemantics())) { - REvent newMsg = - new REvent( - topic.split("\\|")[0], - PUBLISH, - event.getContent(), - event.getRequestID()); - newMsg.setSemantics(ONLY_ONCE); - newMsg.setHash(event.getHash()); - newMsg.setTxHash(event.getTxHash()); - newMsg.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair()); - handle(newMsg); - } + // if the event is an ONLY_ONCE event, retry publishing + if (NEED_RETRY.equals(event.getSemantics())) { + REvent newMsg = + new REvent( + topic.split("\\|")[0], + PUBLISH, + event.getContent(), + event.getRequestID()); + newMsg.setSemantics(ONLY_ONCE); + newMsg.setHash(event.getHash()); + newMsg.setTxHash(event.getTxHash()); + newMsg.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair()); + handle(newMsg); } } }; @@ -449,7 +447,7 @@ public class EventBroker { consumer.competeSub(cEventStr, cb, event.getRequestID(), event.getHash()); } }); - } else { + } else if (consumer instanceof NodeConsumer) { // node consumer ContractManager.threadPool.execute(() -> { if (isPub) { @@ -458,6 +456,15 @@ public class EventBroker { consumer.competeSub(nEventStr, null); } }); + } else if (isPub) { + // client consumer + ContractManager.threadPool.execute(() -> + consumer.publishEvent(nEventStr, new ResultCallback() { + @Override + public void onResult(String unused) { + unsubInReg(null, consumer, topic2cIds, id2Consumers); + } + })); } } else { topic2cIds.get(topic).remove(cId); diff --git a/src/main/java/org/bdware/sc/event/clients/ContractConsumer.java b/src/main/java/org/bdware/sc/event/clients/ContractConsumer.java index cfff548..7b2212b 100644 --- a/src/main/java/org/bdware/sc/event/clients/ContractConsumer.java +++ b/src/main/java/org/bdware/sc/event/clients/ContractConsumer.java @@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class ContractConsumer implements IEventConsumer { private static final Logger LOGGER = LogManager.getLogger(ContractConsumer.class); - private static final long PERIOD = 2500L; private static final int TIMEOUT_COUNT = 5; private static final Map> scheduledFutures = new ConcurrentHashMap<>(); @@ -76,7 +75,7 @@ public class ContractConsumer implements IEventConsumer { cr.setRequestID(options[0]); ContractClient cc = ContractManager.instance.getClient(contract); if (null == cc) { - rc.onResult(""); + rc.onResult((String) null); return; } AtomicInteger callCount = new AtomicInteger(0); @@ -87,19 +86,18 @@ public class ContractConsumer implements IEventConsumer { boolean ret = true; try { ContractResult result = JsonUtil.fromJson(str, ContractResult.class); - if (result.status == ContractResult.Status.Success) { - String retStr = null; - rc.onResult(retStr); - } else if (callCount.get() == TIMEOUT_COUNT || - (result.status == ContractResult.Status.Exception && - result.result.toString().contains("not exported"))) { - rc.onResult(""); - } else { - ret = false; + if (!result.status.equals(ContractResult.Status.Success)) { + if (callCount.get() == TIMEOUT_COUNT || + (result.status == ContractResult.Status.Exception && + result.result.toString().contains("not exported"))) { + rc.onResult((String) null); + } else { + ret = false; + } } } catch (Exception e) { LOGGER.warn("receiving event error! " + contract + "." + handler + ": " + e.getMessage()); - rc.onResult(""); + rc.onResult((String) null); } callCount.incrementAndGet(); if (ret) { @@ -108,16 +106,17 @@ public class ContractConsumer implements IEventConsumer { // wait for setting scheduledFutures flag.wait(500L); } catch (InterruptedException e) { - e.printStackTrace(); + LOGGER.error(e.getMessage()); } } scheduledFutures.get(getId()).cancel(true); + scheduledFutures.remove(getId()); } } }, (reqID, hashStr) -> { }), 500L, - PERIOD, + 2500L, TimeUnit.MILLISECONDS); scheduledFutures.put(getId(), future); synchronized (flag) { diff --git a/src/main/java/org/bdware/sc/event/clients/WSClientConsumer.java b/src/main/java/org/bdware/sc/event/clients/WSClientConsumer.java index 5327f3d..49096c2 100644 --- a/src/main/java/org/bdware/sc/event/clients/WSClientConsumer.java +++ b/src/main/java/org/bdware/sc/event/clients/WSClientConsumer.java @@ -36,6 +36,9 @@ public class WSClientConsumer implements IEventConsumer { @Override public void publishEvent(String msg, ResultCallback rc, String... options) { + if (!channel.isActive()) { + rc.onResult((String) null); + } JsonObject ret = new JsonObject(); ret.addProperty("action", "onEvent"); ret.add("data", JsonUtil.parseString(msg));