fix: fix bugs in event mechanism

now EventCenter.subInCenter will resend event to the previous node to sub in sub-center (when receiving NodeConsumer), or send msg to sub-center (when receiving ContractConsumer)
This commit is contained in:
Frank.R.Wu 2022-02-10 11:12:24 +08:00
parent 3dec7e9f47
commit ea6781adb2
2 changed files with 29 additions and 12 deletions

View File

@ -103,10 +103,10 @@ public class EventBroker {
switch (event.getType()) { switch (event.getType()) {
case SUBSCRIBE: case SUBSCRIBE:
if (null != topic && !topic.isEmpty()) { if (null != topic && !topic.isEmpty()) {
doSubscribe(event); IEventConsumer consumer = doSubscribe(event);
// save & try to sub in center // save & try to sub in center
recorder.appendEvent(event); recorder.appendEvent(event);
center.subInCenter(event.getTopic(), event.getSemantics(), event.getCenter()); center.subInCenter(event.getTopic(), event.getSemantics(), event.getCenter(), consumer, event);
} }
break; break;
case UNSUBSCRIBE: case UNSUBSCRIBE:
@ -133,13 +133,17 @@ public class EventBroker {
} }
} }
private void doSubscribe(REvent e) { private IEventConsumer doSubscribe(REvent e) {
subInReg(e.getTopic(), parseConsumer(e.getContent()), this.topic2cIds, this.id2Consumers); IEventConsumer consumer = parseConsumer(e.getContent());
// for events with semantics ONLY_ONCE, mark the topic is a temporary topic if (subInReg(e.getTopic(), consumer, this.topic2cIds, this.id2Consumers)) {
if (ONLY_ONCE.equals(e.getSemantics()) && !tempTopics.containsKey(e.getTopic())) { // for events with semantics ONLY_ONCE, mark the topic is a temporary topic
tempTopics.put(e.getTopic(), System.currentTimeMillis()); if (ONLY_ONCE.equals(e.getSemantics()) && !tempTopics.containsKey(e.getTopic())) {
recorder.saveTempTopics(tempTopics); tempTopics.put(e.getTopic(), System.currentTimeMillis());
recorder.saveTempTopics(tempTopics);
}
return consumer;
} }
return null;
} }
public void doSubscribe(String topic, IEventConsumer consumer) { public void doSubscribe(String topic, IEventConsumer consumer) {
@ -275,6 +279,9 @@ public class EventBroker {
if (null != handler && !handler.isEmpty()) { if (null != handler && !handler.isEmpty()) {
consumer = new ContractConsumer(subscriber, handler); consumer = new ContractConsumer(subscriber, handler);
} else { } else {
if (subscriber.equals(ContractManager.instance.nodeCenterConn.getNodeId())) {
return null;
}
consumer = new NodeConsumer(subscriber); consumer = new NodeConsumer(subscriber);
} }
return consumer; return consumer;

View File

@ -1,5 +1,8 @@
package org.bdware.sc.event; package org.bdware.sc.event;
import org.bdware.sc.event.clients.ContractConsumer;
import org.bdware.sc.event.clients.IEventConsumer;
import org.bdware.sc.event.clients.NodeConsumer;
import org.bdware.sc.util.DHTUtil; import org.bdware.sc.util.DHTUtil;
import org.bdware.sc.util.JsonUtil; import org.bdware.sc.util.JsonUtil;
@ -35,8 +38,10 @@ public class EventCenter {
* @param topic event topic * @param topic event topic
* @param semantics event semantics, used to mark PRESUB events * @param semantics event semantics, used to mark PRESUB events
* @param center id of event center if the subscribing has been handled * @param center id of event center if the subscribing has been handled
* @param consumer consumer
* @param event original event
*/ */
public void subInCenter(String topic, REvent.REventSemantics semantics, String center) { public void subInCenter(String topic, REvent.REventSemantics semantics, String center, IEventConsumer consumer, REvent event) {
if (null == instance.nodeCenterConn) { if (null == instance.nodeCenterConn) {
return; return;
} }
@ -58,9 +63,14 @@ public class EventCenter {
} else { } else {
instance.masterStub.deliverEvent(centers[0], JsonUtil.toJson(msg)); instance.masterStub.deliverEvent(centers[0], JsonUtil.toJson(msg));
} }
} else if (!instance.masterStub.deliverEvent(centers[0], JsonUtil.toJson(msg)) && centers.length > 1) { } else if (!instance.masterStub.deliverEvent(centers[0], JsonUtil.toJson(msg))) {
msg.setCenter(centers[0]); if (consumer instanceof NodeConsumer) {
instance.masterStub.deliverEvent(centers[1], JsonUtil.toJson(msg)); event.setCenter(centers[0]);
instance.masterStub.deliverEvent(consumer.getId(), JsonUtil.toJson(event));
} else if (consumer instanceof ContractConsumer && centers.length > 1) {
msg.setCenter(centers[0]);
instance.masterStub.deliverEvent(centers[1], JsonUtil.toJson(msg));
}
} }
} }