cm/src/main/java/org/bdware/sc/event/EventBroker.java
2021-09-26 12:50:12 +08:00

479 lines
21 KiB
Java

package org.bdware.sc.event;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.ContractClient;
import org.bdware.sc.ContractManager;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.db.CMTables;
import org.bdware.sc.event.clients.ContractConsumer;
import org.bdware.sc.event.clients.IEventConsumer;
import org.bdware.sc.event.clients.NodeConsumer;
import org.bdware.sc.util.HashUtil;
import org.bdware.sc.util.JsonUtil;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import static org.bdware.sc.event.REvent.REventSemantics.NEED_RETRY;
import static org.bdware.sc.event.REvent.REventSemantics.ONLY_ONCE;
import static org.bdware.sc.event.REvent.REventType.*;
/**
* @author Kaidong Wu
*/
public class EventBroker {
private static final Logger LOGGER = LogManager.getLogger(EventBroker.class);
private static final long EXPIRED_TIME = 90 * 1000L;
private final EventCenter center;
private final EventRecorder recorder;
private final Map<String, Set<String>> topic2cIds;
private final Map<String, IEventConsumer> id2Consumers;
private final Map<String, ThreadFlag> threadFlags;
private final Map<String, Long> tempTopics;
// private final Map<String, Stack<REvent>> client2Events;
public EventBroker() {
center = new EventCenter();
recorder = new EventRecorder(CMTables.EventRegistry.toString(), this);
threadFlags = new ConcurrentHashMap<>();
// recover registries from database
EventRecorder.CheckPoint cp = recorder.recoverRegistryFromDb();
topic2cIds = cp.topic2cIds;
id2Consumers = cp.id2Consumers;
tempTopics = recorder.recoverTempTopicsFromDb();
// regularly check temporary topics and clean them
ContractManager.scheduledThreadPool.scheduleWithFixedDelay(
() -> {
long current = System.currentTimeMillis();
int oldSize = tempTopics.size();
tempTopics.keySet().forEach(topic -> {
if (tempTopics.get(topic) + EXPIRED_TIME > current) {
String reqID =
ContractManager.instance.nodeCenterConn.getNodeKeyPair().getPublicKeyStr() +
"_" + System.currentTimeMillis();
REvent cleanEvent =
new REvent(
topic,
UNSUBSCRIBE,
null,
reqID);
cleanEvent.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair());
handle(cleanEvent);
tempTopics.remove(topic);
}
});
if (oldSize != tempTopics.size()) {
recorder.saveTempTopics(tempTopics);
}
},
0L,
EXPIRED_TIME,
TimeUnit.MILLISECONDS);
// regularly create check point in database
ContractManager.scheduledThreadPool.scheduleAtFixedRate(
() -> recorder.createCheckPoint(topic2cIds, id2Consumers),
EXPIRED_TIME,
EXPIRED_TIME,
TimeUnit.MILLISECONDS);
NodeConsumer.setCenter(center);
// client2Events = new HashMap<>();
LOGGER.info("Event Broker starts!");
}
/**
* handle the event after checking the signature
*
* @param event event request
*/
public void handle(REvent event) {
if (!event.verifySignature()) {
LOGGER.debug(JsonUtil.toJson(event));
return;
}
String topic = event.getTopic();
switch (event.getType()) {
case SUBSCRIBE:
if (null != topic && !topic.isEmpty()) {
doSubscribe(event);
// save & try to sub in center
recorder.appendEvent(event);
center.subInCenter(event.getTopic(), event.getSemantics());
}
break;
case UNSUBSCRIBE:
doUnsubscribe(event);
recorder.appendEvent(event);
break;
case PUBLISH:
case PREPUB:
case PRESUB:
LOGGER.info(String.format("Receive %s event from topic %s", event.getSemantics(), topic));
LOGGER.debug(String.format("Receive %s event %s: %s",
event.getSemantics(), topic, event.getContent()));
if (event.isForward()) {
// send event to the event center
event.setForward(center.deliverEvent(topic, event));
}
// if the event is from or in event center, save and do publishing
if (!event.isForward()) {
recorder.appendEvent(event);
doPublish(event);
}
default:
break;
}
}
private void doSubscribe(REvent e) {
subInReg(e.getTopic(), parseConsumer(e.getContent()), this.topic2cIds, this.id2Consumers);
// for events with semantics ONLY_ONCE, mark the topic is a temporary topic
if (ONLY_ONCE.equals(e.getSemantics()) && !tempTopics.containsKey(e.getTopic())) {
tempTopics.put(e.getTopic(), System.currentTimeMillis());
recorder.saveTempTopics(tempTopics);
}
}
/**
* do subscribing in registry
*
* @param topic event topic
* @param consumer the consumer
* @param topic2cIds topic registry of broker or a check point in event recorder
* @param id2Consumers consumer registry of broker or a check point in event recorder
* @return if the subscribing succeeds
*/
public boolean subInReg(
String topic,
IEventConsumer consumer,
Map<String, Set<String>> topic2cIds,
Map<String, IEventConsumer> id2Consumers) {
if (null == consumer) {
return false;
}
String cId = consumer.getId();
if (!id2Consumers.containsKey(cId)) {
id2Consumers.put(cId, consumer);
}
if (!topic2cIds.containsKey(topic)) {
topic2cIds.put(topic, new HashSet<>());
}
topic2cIds.get(topic).add(cId);
if (consumer instanceof ContractConsumer) {
LOGGER.info("contract " + ((ContractConsumer) consumer).getContract() + " subscribes topic " + topic);
} else {
LOGGER.info("node " + consumer.getId() + " subscribes topic " + topic);
}
return true;
}
private void doUnsubscribe(REvent e) {
unsubInReg(e.getTopic(), parseConsumer(e.getContent()), this.topic2cIds, this.id2Consumers);
}
/**
* do subscribing in registry<br/>
* topic and consumer must not be null at the same time
* <ul>
* <li>if consumer is null and topic is not, it means the topic is a temporary topic, remove it</li>
* <li>if topic is null and consumer is not,
* it means a consumer or a contract wants to unsubscribe all topics,
* remove all related consumers in topic registry and consumer registry</li>
* <li>if two of them is not null, do unsubscribing in two registries</li>
* </ul>
*
* @param topic event topic
* @param consumer the consumer, just id is required
* @param topic2cIds topic registry of broker or a check point in event recorder
* @param id2Consumers consumer registry of broker or a check point in event recorder
* @return if the subscribing succeeds
*/
public boolean unsubInReg(
String topic,
IEventConsumer consumer,
Map<String, Set<String>> topic2cIds,
Map<String, IEventConsumer> id2Consumers) {
if (null == topic && null == consumer) {
return false;
}
if (null == consumer) {
topic2cIds.remove(topic);
LOGGER.info("clean temporary topic " + topic);
return true;
}
String cId = consumer.getId();
List<String> toRmIds;
String contract; // just used for log
if (id2Consumers.containsKey(cId)) {
// if cId belongs to a contract consumer, use the cId as a singleton list
toRmIds = Collections.singletonList(cId);
contract = ((ContractConsumer) id2Consumers.get(cId)).getContract();
} else {
// if cId belongs to a contract, find all related consumers
toRmIds = new ArrayList<>();
id2Consumers.forEach((k, c) -> {
if (c instanceof ContractConsumer && ((ContractConsumer) c).getContract().equals(cId)) {
toRmIds.add(k);
}
});
contract = cId;
}
if (toRmIds.isEmpty()) {
return true;
}
if (null == topic || topic.isEmpty()) {
topic2cIds.values().forEach(cIds -> toRmIds.forEach(cIds::remove));
toRmIds.forEach(id2Consumers::remove);
LOGGER.info("contract " + contract + " unsubscribes all topics");
} else if (topic2cIds.containsKey(topic)) {
Set<String> topic2Id = topic2cIds.get(topic);
toRmIds.forEach(topic2Id::remove);
LOGGER.info("contract " + contract + " unsubscribes topic " + topic);
}
return true;
}
/**
* parse consumer information from content str<br/>
* if caller wants to select all consumers of a contract, the content str is also parsed into a node consumer
*
* @param content json string, {"subscriber": "[subscriber]", "handler?": "[handler]"}
* @return a node consumer or contract consumer, or null if exception is thrown
*/
public IEventConsumer parseConsumer(String content) {
if (null == content || content.isEmpty()) {
return null;
}
try {
JsonObject json = JsonUtil.parseString(content);
String subscriber = json.get("subscriber").getAsString();
String handler = json.has("handler") ? json.get("handler").getAsString() : null;
if (null == subscriber || subscriber.isEmpty()) {
return null;
}
IEventConsumer consumer;
if (null != handler && !handler.isEmpty()) {
consumer = new ContractConsumer(subscriber, handler);
} else {
consumer = new NodeConsumer(subscriber);
}
return consumer;
} catch (Exception ignored) {
return null;
}
}
// handle publishing; the process varies with the semantic of the event
private void doPublish(REvent event) {
String topic = event.getTopic();
// publish simple event message to contract consumer
String cEventStr = JsonUtil.toJson(new Event(topic, event.getContent(), event.getSemantics()));
// publish full event message to node consumer
String nEventStr = JsonUtil.toJson(event);
if (!topic2cIds.containsKey(topic)) {
return;
}
Set<String> topicConsumers = topic2cIds.get(topic);
switch (event.getSemantics()) {
case AT_LEAST_ONCE:
case NEED_RETRY:
// send events to all
topicConsumers.forEach(cId ->
deliverEvent(event, cEventStr, nEventStr, cId, topic, true));
break;
case AT_MOST_ONCE:
// send event to a random consumer
// AT_MOST_ONCE, so broker don't need to do anything when delivering fails
deliverEvent(
event,
cEventStr,
nEventStr,
topicConsumers.toArray()[(int) (Math.random() * topicConsumers.size())].toString(),
topic,
true);
break;
case ONLY_ONCE:
switch (event.getType()) {
case PRESUB:
// receive PRESUB events and deliver the first one to the thread
String[] contentArr = event.getContent().split("\\|");
if (contentArr.length == 3) {
ThreadFlag topicFlag;
synchronized (topicFlag = threadFlags.get(contentArr[1])) {
if (topicFlag.get().isEmpty()) {
topicFlag.set(event.getContent());
topicFlag.notify();
}
}
}
break;
case PUBLISH:
// the ONLY_ONCE event won't be delivered to the non-center
String contentHash = HashUtil.sha3(event.getContent());
if (!threadFlags.containsKey(contentHash)) {
final ThreadFlag flag = new ThreadFlag();
threadFlags.put(contentHash, flag);
// send PREPUB events to all consumers
// TODO if there are no consumers to receive the ONLY_ONCE events?
ContractManager.threadPool.execute(() -> {
REvent prePubMsg = new REvent(event.getTopic(),
PREPUB,
contentHash,
event.getRequestID());
prePubMsg.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair());
topicConsumers.forEach(cId ->
deliverEvent(prePubMsg,
JsonUtil.toJson(
new Event(event.getTopic(),
contentHash,
prePubMsg.getSemantics())),
JsonUtil.toJson(prePubMsg),
cId,
topic,
false));
// wait for responses from contracts (PRESUB events)
while (true) {
try {
synchronized (flag) {
flag.wait(30 * 1000L);
}
if (!flag.get().isEmpty()) {
REvent finalMsg = new REvent(flag.get(),
PUBLISH,
event.getContent(),
HashUtil.sha3(
contentHash + System.currentTimeMillis()));
// if the delivering fails, retry publishing
finalMsg.setSemantics(NEED_RETRY);
finalMsg.doSignature(
ContractManager.instance.nodeCenterConn.getNodeKeyPair());
handle(finalMsg);
break;
}
} catch (InterruptedException e) {
LOGGER.warn("ONLY_ONE event delivering is interrupted: " + e.getMessage());
// e.printStackTrace();
}
}
});
}
default:
break;
}
default:
break;
}
}
/**
* publish the event to the consumer
*
* @param event event message
* @param cEventStr simple event message to the contract consumer, only the topic and the content
* @param nEventStr event message to the node
* @param cId consumer id
* @param topic topic of the event
* @param isPub if the event is published or pre-published
*/
private void deliverEvent(
REvent event,
String cEventStr,
String nEventStr,
String cId,
String topic,
boolean isPub) {
if (id2Consumers.containsKey(cId)) {
IEventConsumer consumer = id2Consumers.get(cId);
if (consumer instanceof ContractConsumer) {
// contract consumer
ContractManager.threadPool.execute(() -> {
ResultCallback cb = new ResultCallback() {
@Override
public void onResult(String str) {
// if the delivering fails, unsubscribe the consumer
if (null != str) {
ContractConsumer c = (ContractConsumer) consumer;
ContractClient client = ContractManager.instance.getClient(c.getContract());
String reqID =
ContractManager.instance.nodeCenterConn.getNodeKeyPair().getPublicKeyStr() +
"_" + System.currentTimeMillis();
REvent unsubEvent =
new REvent(
topic,
UNSUBSCRIBE,
"{\"subscriber\":\"" + cId + "\"}",
reqID);
unsubEvent.doSignature(client.getPubkey(), client.getContractKey());
handle(unsubEvent);
// if the event is an ONLY_ONCE event, retry publishing
if (NEED_RETRY.equals(event.getSemantics())) {
REvent newMsg =
new REvent(
topic.split("\\|")[0],
PUBLISH,
event.getContent(),
event.getRequestID());
newMsg.setSemantics(ONLY_ONCE);
newMsg.setHash(event.getHash());
newMsg.setTxHash(event.getTxHash());
newMsg.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair());
handle(newMsg);
}
}
}
};
if (isPub) {
consumer.publishEvent(cEventStr, cb, event.getRequestID(), event.getHash());
} else {
consumer.competeSub(cEventStr, cb, event.getRequestID(), event.getHash());
}
});
} else {
// node consumer
ContractManager.threadPool.execute(() -> {
if (isPub) {
consumer.publishEvent(nEventStr, null);
} else {
consumer.competeSub(nEventStr, null);
}
});
}
} else {
topic2cIds.get(topic).remove(cId);
}
}
/**
* return the number of topics, for node center statistics
*
* @return the number of topics
*/
public int countEvents() {
return topic2cIds.size() - tempTopics.size();
}
/**
* thread flag, used in ONLY_ONCE event publishing
*/
static class ThreadFlag {
private String flag = "";
public String get() {
return flag;
}
public void set(String flag) {
this.flag = flag;
}
}
}