diff --git a/src/main/java/org/bdware/sc/event/EventBroker.java b/src/main/java/org/bdware/sc/event/EventBroker.java index 2cad159..9be32ef 100644 --- a/src/main/java/org/bdware/sc/event/EventBroker.java +++ b/src/main/java/org/bdware/sc/event/EventBroker.java @@ -106,7 +106,7 @@ public class EventBroker { doSubscribe(event); // save & try to sub in center recorder.appendEvent(event); - center.subInCenter(event.getTopic(), event.getSemantics()); + center.subInCenter(event.getTopic(), event.getSemantics(), event.getCenter()); } break; case UNSUBSCRIBE: diff --git a/src/main/java/org/bdware/sc/event/EventCenter.java b/src/main/java/org/bdware/sc/event/EventCenter.java index 17a678c..fd4cb3f 100644 --- a/src/main/java/org/bdware/sc/event/EventCenter.java +++ b/src/main/java/org/bdware/sc/event/EventCenter.java @@ -15,17 +15,18 @@ public class EventCenter { * get the nearest node to the topic in the hash function range * * @param topic the topic + * @param k the number of centers * @return id of the node */ - public String getCenterByTopic(String topic) { + public String[] getCenterByTopic(String topic, int k) { if (null == instance.nodeCenterConn) { return null; } - String[] centers = DHTUtil.getClusterByKey(topic, 1); + String[] centers = DHTUtil.getClusterByKey(topic, k); if (null == centers || centers.length == 0) { return null; } - return centers[0]; + return centers; } /** @@ -33,21 +34,34 @@ public class EventCenter { * * @param topic event topic * @param semantics event semantics, used to mark PRESUB events + * @param center id of event center if the subscribing has been handled */ - public void subInCenter(String topic, REvent.REventSemantics semantics) { + public void subInCenter(String topic, REvent.REventSemantics semantics, String center) { if (null == instance.nodeCenterConn) { return; } - REvent msg = new REvent( - topic, + REvent msg = new REvent(topic, REvent.REventType.SUBSCRIBE, String.format("{\"subscriber\":\"%s\"}", instance.nodeCenterConn.getNodeId()), ""); msg.setSemantics(semantics); msg.doSignature(instance.nodeCenterConn.getNodeKeyPair()); - String nodeId = getCenterByTopic(topic); - instance.masterStub.deliverEvent(nodeId, JsonUtil.toJson(msg)); + msg.setCenter(center); + String[] centers = getCenterByTopic(topic, 2); + if (null == centers) { + return; + } + if (null != center) { + if (centers[0].equals(center) && centers.length > 1) { + instance.masterStub.deliverEvent(centers[1], JsonUtil.toJson(msg)); + } else { + instance.masterStub.deliverEvent(centers[0], JsonUtil.toJson(msg)); + } + } else if (!instance.masterStub.deliverEvent(centers[0], JsonUtil.toJson(msg)) && centers.length > 1) { + msg.setCenter(centers[0]); + instance.masterStub.deliverEvent(centers[1], JsonUtil.toJson(msg)); + } } /** @@ -61,8 +75,11 @@ public class EventCenter { if (null == instance.masterStub) { return false; } - String nodeId = getCenterByTopic(topic); - return instance.masterStub.deliverEvent(nodeId, JsonUtil.toJson(event)); + String[] centers = getCenterByTopic(topic, 1); + if (null == centers) { + return false; + } + return instance.masterStub.deliverEvent(centers[0], JsonUtil.toJson(event)); } /**