From ea6781adb2ce73e75a399a20df22a51140ede448 Mon Sep 17 00:00:00 2001 From: "Frank.R.Wu" Date: Thu, 10 Feb 2022 11:12:24 +0800 Subject: [PATCH] fix: fix bugs in event mechanism now EventCenter.subInCenter will resend event to the previous node to sub in sub-center (when receiving NodeConsumer), or send msg to sub-center (when receiving ContractConsumer) --- .../java/org/bdware/sc/event/EventBroker.java | 23 ++++++++++++------- .../java/org/bdware/sc/event/EventCenter.java | 18 +++++++++++---- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/bdware/sc/event/EventBroker.java b/src/main/java/org/bdware/sc/event/EventBroker.java index 9be32ef..d40f699 100644 --- a/src/main/java/org/bdware/sc/event/EventBroker.java +++ b/src/main/java/org/bdware/sc/event/EventBroker.java @@ -103,10 +103,10 @@ public class EventBroker { switch (event.getType()) { case SUBSCRIBE: if (null != topic && !topic.isEmpty()) { - doSubscribe(event); + IEventConsumer consumer = doSubscribe(event); // save & try to sub in center recorder.appendEvent(event); - center.subInCenter(event.getTopic(), event.getSemantics(), event.getCenter()); + center.subInCenter(event.getTopic(), event.getSemantics(), event.getCenter(), consumer, event); } break; case UNSUBSCRIBE: @@ -133,13 +133,17 @@ public class EventBroker { } } - private void doSubscribe(REvent e) { - subInReg(e.getTopic(), parseConsumer(e.getContent()), this.topic2cIds, this.id2Consumers); - // for events with semantics ONLY_ONCE, mark the topic is a temporary topic - if (ONLY_ONCE.equals(e.getSemantics()) && !tempTopics.containsKey(e.getTopic())) { - tempTopics.put(e.getTopic(), System.currentTimeMillis()); - recorder.saveTempTopics(tempTopics); + private IEventConsumer doSubscribe(REvent e) { + IEventConsumer consumer = parseConsumer(e.getContent()); + if (subInReg(e.getTopic(), consumer, this.topic2cIds, this.id2Consumers)) { + // for events with semantics ONLY_ONCE, mark the topic is a temporary topic + if (ONLY_ONCE.equals(e.getSemantics()) && !tempTopics.containsKey(e.getTopic())) { + tempTopics.put(e.getTopic(), System.currentTimeMillis()); + recorder.saveTempTopics(tempTopics); + } + return consumer; } + return null; } public void doSubscribe(String topic, IEventConsumer consumer) { @@ -275,6 +279,9 @@ public class EventBroker { if (null != handler && !handler.isEmpty()) { consumer = new ContractConsumer(subscriber, handler); } else { + if (subscriber.equals(ContractManager.instance.nodeCenterConn.getNodeId())) { + return null; + } consumer = new NodeConsumer(subscriber); } return consumer; diff --git a/src/main/java/org/bdware/sc/event/EventCenter.java b/src/main/java/org/bdware/sc/event/EventCenter.java index fd4cb3f..dcb8bd9 100644 --- a/src/main/java/org/bdware/sc/event/EventCenter.java +++ b/src/main/java/org/bdware/sc/event/EventCenter.java @@ -1,5 +1,8 @@ package org.bdware.sc.event; +import org.bdware.sc.event.clients.ContractConsumer; +import org.bdware.sc.event.clients.IEventConsumer; +import org.bdware.sc.event.clients.NodeConsumer; import org.bdware.sc.util.DHTUtil; import org.bdware.sc.util.JsonUtil; @@ -35,8 +38,10 @@ public class EventCenter { * @param topic event topic * @param semantics event semantics, used to mark PRESUB events * @param center id of event center if the subscribing has been handled + * @param consumer consumer + * @param event original event */ - public void subInCenter(String topic, REvent.REventSemantics semantics, String center) { + public void subInCenter(String topic, REvent.REventSemantics semantics, String center, IEventConsumer consumer, REvent event) { if (null == instance.nodeCenterConn) { return; } @@ -58,9 +63,14 @@ public class EventCenter { } else { instance.masterStub.deliverEvent(centers[0], JsonUtil.toJson(msg)); } - } else if (!instance.masterStub.deliverEvent(centers[0], JsonUtil.toJson(msg)) && centers.length > 1) { - msg.setCenter(centers[0]); - instance.masterStub.deliverEvent(centers[1], JsonUtil.toJson(msg)); + } else if (!instance.masterStub.deliverEvent(centers[0], JsonUtil.toJson(msg))) { + if (consumer instanceof NodeConsumer) { + event.setCenter(centers[0]); + instance.masterStub.deliverEvent(consumer.getId(), JsonUtil.toJson(event)); + } else if (consumer instanceof ContractConsumer && centers.length > 1) { + msg.setCenter(centers[0]); + instance.masterStub.deliverEvent(centers[1], JsonUtil.toJson(msg)); + } } }