Merge branch 'master' into feat/consistency-sdk

This commit is contained in:
汪旭鑫 2022-02-15 15:03:57 +08:00
commit a2f8ab528b
2 changed files with 28 additions and 11 deletions

View File

@ -106,7 +106,7 @@ public class EventBroker {
doSubscribe(event);
// save & try to sub in center
recorder.appendEvent(event);
center.subInCenter(event.getTopic(), event.getSemantics());
center.subInCenter(event.getTopic(), event.getSemantics(), event.getCenter());
}
break;
case UNSUBSCRIBE:

View File

@ -15,17 +15,18 @@ public class EventCenter {
* get the nearest node to the topic in the hash function range
*
* @param topic the topic
* @param k the number of centers
* @return id of the node
*/
public String getCenterByTopic(String topic) {
public String[] getCenterByTopic(String topic, int k) {
if (null == instance.nodeCenterConn) {
return null;
}
String[] centers = DHTUtil.getClusterByKey(topic, 1);
String[] centers = DHTUtil.getClusterByKey(topic, k);
if (null == centers || centers.length == 0) {
return null;
}
return centers[0];
return centers;
}
/**
@ -33,21 +34,34 @@ public class EventCenter {
*
* @param topic event topic
* @param semantics event semantics, used to mark PRESUB events
* @param center id of event center if the subscribing has been handled
*/
public void subInCenter(String topic, REvent.REventSemantics semantics) {
public void subInCenter(String topic, REvent.REventSemantics semantics, String center) {
if (null == instance.nodeCenterConn) {
return;
}
REvent msg = new REvent(
topic,
REvent msg = new REvent(topic,
REvent.REventType.SUBSCRIBE,
String.format("{\"subscriber\":\"%s\"}",
instance.nodeCenterConn.getNodeId()),
"");
msg.setSemantics(semantics);
msg.doSignature(instance.nodeCenterConn.getNodeKeyPair());
String nodeId = getCenterByTopic(topic);
instance.masterStub.deliverEvent(nodeId, JsonUtil.toJson(msg));
msg.setCenter(center);
String[] centers = getCenterByTopic(topic, 2);
if (null == centers) {
return;
}
if (null != center) {
if (centers[0].equals(center) && centers.length > 1) {
instance.masterStub.deliverEvent(centers[1], JsonUtil.toJson(msg));
} else {
instance.masterStub.deliverEvent(centers[0], JsonUtil.toJson(msg));
}
} else if (!instance.masterStub.deliverEvent(centers[0], JsonUtil.toJson(msg)) && centers.length > 1) {
msg.setCenter(centers[0]);
instance.masterStub.deliverEvent(centers[1], JsonUtil.toJson(msg));
}
}
/**
@ -61,8 +75,11 @@ public class EventCenter {
if (null == instance.masterStub) {
return false;
}
String nodeId = getCenterByTopic(topic);
return instance.masterStub.deliverEvent(nodeId, JsonUtil.toJson(event));
String[] centers = getCenterByTopic(topic, 1);
if (null == centers) {
return false;
}
return instance.masterStub.deliverEvent(centers[0], JsonUtil.toJson(event));
}
/**