diff --git a/src/main/java/org/bdware/sc/ContractManager.java b/src/main/java/org/bdware/sc/ContractManager.java index 4009654..00f7ee0 100644 --- a/src/main/java/org/bdware/sc/ContractManager.java +++ b/src/main/java/org/bdware/sc/ContractManager.java @@ -2,6 +2,7 @@ package org.bdware.sc; import com.google.gson.JsonObject; import com.google.gson.JsonPrimitive; +import io.netty.channel.Channel; import io.prometheus.client.Counter; import org.apache.commons.codec.digest.DigestUtils; import org.apache.logging.log4j.LogManager; @@ -18,6 +19,7 @@ import org.bdware.sc.db.*; import org.bdware.sc.event.EventBroker; import org.bdware.sc.event.REvent; import org.bdware.sc.event.REvent.REventSemantics; +import org.bdware.sc.event.clients.WSClientConsumer; import org.bdware.sc.handler.ManagerHandler; import org.bdware.sc.node.*; import org.bdware.sc.units.MultiContractMeta; @@ -1324,9 +1326,9 @@ public class ContractManager { start, System.currentTimeMillis() - start); } - if (client.contractMeta.getYjsType()==YjsType.Oracle){ + if (client.contractMeta.getYjsType() == YjsType.Oracle) { oracleCounter.inc(); - }else { + } else { contractCounter.inc(); } totalCounter.inc(); @@ -2199,6 +2201,10 @@ public class ContractManager { return analysisClient.get.syncGet("", "staticVerify", JsonUtil.toJson(c)); } + public void subEventByClient(String topic, Channel channel) { + eventBroker.doSubscribe(topic, new WSClientConsumer(channel)); + } + // 合约状态 static class StrCollector extends ResultCallback { diff --git a/src/main/java/org/bdware/sc/event/EventBroker.java b/src/main/java/org/bdware/sc/event/EventBroker.java index 408d212..cc086d9 100644 --- a/src/main/java/org/bdware/sc/event/EventBroker.java +++ b/src/main/java/org/bdware/sc/event/EventBroker.java @@ -142,6 +142,10 @@ public class EventBroker { } } + public void doSubscribe(String topic, IEventConsumer consumer) { + subInReg(topic, consumer, this.topic2cIds, this.id2Consumers); + } + /** * do subscribing in registry * @@ -167,10 +171,17 @@ public class EventBroker { topic2cIds.put(topic, new HashSet<>()); } topic2cIds.get(topic).add(cId); - if (consumer instanceof ContractConsumer) { - LOGGER.info("contract " + ((ContractConsumer) consumer).getContract() + " subscribes topic " + topic); - } else { - LOGGER.info("node " + consumer.getId() + " subscribes topic " + topic); + switch (consumer.getType()) { + case Contract: + LOGGER.info("contract " + ((ContractConsumer) consumer).getContract() + " subscribes topic " + topic); + break; + case Node: + LOGGER.info("node " + consumer.getId() + " subscribes topic " + topic); + break; + case WSClient: + LOGGER.info("client " + consumer.getId() + " subscribes topic " + topic); + default: + break; } return true; } diff --git a/src/main/java/org/bdware/sc/event/EventRecorder.java b/src/main/java/org/bdware/sc/event/EventRecorder.java index 7d1f2f7..c89393f 100644 --- a/src/main/java/org/bdware/sc/event/EventRecorder.java +++ b/src/main/java/org/bdware/sc/event/EventRecorder.java @@ -9,6 +9,7 @@ import org.bdware.sc.event.REvent.REventSemantics; import org.bdware.sc.event.REvent.REventType; import org.bdware.sc.event.clients.ContractConsumer; import org.bdware.sc.event.clients.IEventConsumer; +import org.bdware.sc.event.clients.IEventConsumer.ConsumerType; import org.bdware.sc.event.clients.NodeConsumer; import org.bdware.sc.util.HashUtil; import org.bdware.sc.util.JsonUtil; @@ -96,12 +97,28 @@ public class EventRecorder { JsonObject id2Consumers = data.getAsJsonObject("id2Consumers"); for (String k : id2Consumers.keySet()) { JsonObject consumer = id2Consumers.getAsJsonObject(k); - if (consumer.has("handler")) { - cp.id2Consumers.put(k, - new ContractConsumer(consumer.get("contract").getAsString(), - consumer.get("handler").getAsString())); - } else { - cp.id2Consumers.put(k, new NodeConsumer(k)); + if (!consumer.has("type")) { + continue; + } + ConsumerType type = ConsumerType.valueOf(consumer.get("type").getAsString()); + switch (type) { + case Contract: + cp.id2Consumers.put(k, + new ContractConsumer(consumer.get("contract").getAsString(), + consumer.get("handler").getAsString())); + break; + case Node: + cp.id2Consumers.put(k, new NodeConsumer(k)); + default: + break; + } + } + for (String topic : cp.topic2cIds.keySet()) { + Set idSet = cp.topic2cIds.get(topic); + for (String id : idSet) { + if (!cp.id2Consumers.containsKey(id)) { + cp.id2Consumers.remove(id); + } } } cp.prev = data.get("prev").getAsString(); diff --git a/src/main/java/org/bdware/sc/event/clients/ClientConsumer.java b/src/main/java/org/bdware/sc/event/clients/ClientConsumer.java deleted file mode 100644 index 7d46998..0000000 --- a/src/main/java/org/bdware/sc/event/clients/ClientConsumer.java +++ /dev/null @@ -1,23 +0,0 @@ -package org.bdware.sc.event.clients; - -import org.bdware.sc.conn.ResultCallback; - -/** - * @author Kaidong Wu - */ -public class ClientConsumer implements IEventConsumer { - @Override - public String getId() { - return null; - } - - @Override - public void publishEvent(String msg, ResultCallback rc, String... options) { -// TODO - } - - @Override - public void competeSub(String msg, ResultCallback rc, String... options) { -// TODO - } -} diff --git a/src/main/java/org/bdware/sc/event/clients/ContractConsumer.java b/src/main/java/org/bdware/sc/event/clients/ContractConsumer.java index 9d51d31..260ea15 100644 --- a/src/main/java/org/bdware/sc/event/clients/ContractConsumer.java +++ b/src/main/java/org/bdware/sc/event/clients/ContractConsumer.java @@ -31,6 +31,8 @@ public class ContractConsumer implements IEventConsumer { private final String handler; @Expose(serialize = false, deserialize = false) private final Object flag = new Object(); + @SuppressWarnings("unused") + private final ConsumerType type = ConsumerType.Contract; public ContractConsumer(String contract, String handler) { this.contract = contract; @@ -46,6 +48,11 @@ public class ContractConsumer implements IEventConsumer { return HashUtil.sha3(contract + handler); } + @Override + public ConsumerType getType() { + return type; + } + @Override public void publishEvent(String msg, ResultCallback rc, String... options) { executeContract(msg, this.handler, rc, options); diff --git a/src/main/java/org/bdware/sc/event/clients/IEventConsumer.java b/src/main/java/org/bdware/sc/event/clients/IEventConsumer.java index 20a607d..48e1985 100644 --- a/src/main/java/org/bdware/sc/event/clients/IEventConsumer.java +++ b/src/main/java/org/bdware/sc/event/clients/IEventConsumer.java @@ -5,7 +5,15 @@ import org.bdware.sc.conn.ResultCallback; public interface IEventConsumer { String getId(); + ConsumerType getType(); + void publishEvent(String msg, ResultCallback rc, String... options); void competeSub(String msg, ResultCallback rc, String... options); + + enum ConsumerType { + Contract, + Node, + WSClient + } } diff --git a/src/main/java/org/bdware/sc/event/clients/NodeConsumer.java b/src/main/java/org/bdware/sc/event/clients/NodeConsumer.java index ca08a41..77d5f72 100644 --- a/src/main/java/org/bdware/sc/event/clients/NodeConsumer.java +++ b/src/main/java/org/bdware/sc/event/clients/NodeConsumer.java @@ -10,6 +10,8 @@ public class NodeConsumer implements IEventConsumer { private static EventCenter center; private final String nodeId; + @SuppressWarnings("unused") + private final ConsumerType type = ConsumerType.Node; public NodeConsumer(String nodeId) { this.nodeId = nodeId; @@ -24,6 +26,11 @@ public class NodeConsumer implements IEventConsumer { return this.nodeId; } + @Override + public ConsumerType getType() { + return type; + } + @Override public void publishEvent(String msg, ResultCallback rc, String... options) { center.publishEvent(msg, nodeId); diff --git a/src/main/java/org/bdware/sc/event/clients/WSClientConsumer.java b/src/main/java/org/bdware/sc/event/clients/WSClientConsumer.java new file mode 100644 index 0000000..0dcfa05 --- /dev/null +++ b/src/main/java/org/bdware/sc/event/clients/WSClientConsumer.java @@ -0,0 +1,52 @@ +package org.bdware.sc.event.clients; + +import com.google.gson.JsonObject; +import com.google.gson.annotations.Expose; +import io.netty.channel.Channel; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.bdware.sc.conn.ResultCallback; +import org.bdware.sc.util.JsonUtil; + +/** + * @author Kaidong Wu + */ +public class WSClientConsumer implements IEventConsumer { + private static final Logger LOGGER = LogManager.getLogger(WSClientConsumer.class); + + @Expose(serialize = false, deserialize = false) + private final Channel channel; + @SuppressWarnings("unused") + private final ConsumerType type = ConsumerType.WSClient; + + public WSClientConsumer(Channel channel) { + this.channel = channel; + } + + public Channel getChannel() { + return channel; + } + + @Override + public String getId() { + return "WSC_" + channel.id().asLongText(); + } + + @Override + public ConsumerType getType() { + return type; + } + + @Override + public void publishEvent(String msg, ResultCallback rc, String... options) { + JsonObject ret = new JsonObject(); + ret.addProperty("action", "onEvent"); + ret.add("data", JsonUtil.parseString(msg)); + channel.writeAndFlush(new TextWebSocketFrame(ret.toString())); + } + + @Override + public void competeSub(String msg, ResultCallback rc, String... options) { + } +}