feat: update event mechanism

add event type local and global, clients have to use contractID and topic to subscribe local event; allow clients to subscribe topics (will not be recorded)
This commit is contained in:
Frank.R.Wu 2021-10-31 23:07:15 +08:00
parent 08337e9a10
commit 5a2eb0c45b
8 changed files with 120 additions and 35 deletions

View File

@ -2,6 +2,7 @@ package org.bdware.sc;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive; import com.google.gson.JsonPrimitive;
import io.netty.channel.Channel;
import io.prometheus.client.Counter; import io.prometheus.client.Counter;
import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
@ -18,6 +19,7 @@ import org.bdware.sc.db.*;
import org.bdware.sc.event.EventBroker; import org.bdware.sc.event.EventBroker;
import org.bdware.sc.event.REvent; import org.bdware.sc.event.REvent;
import org.bdware.sc.event.REvent.REventSemantics; import org.bdware.sc.event.REvent.REventSemantics;
import org.bdware.sc.event.clients.WSClientConsumer;
import org.bdware.sc.handler.ManagerHandler; import org.bdware.sc.handler.ManagerHandler;
import org.bdware.sc.node.*; import org.bdware.sc.node.*;
import org.bdware.sc.units.MultiContractMeta; import org.bdware.sc.units.MultiContractMeta;
@ -2199,6 +2201,10 @@ public class ContractManager {
return analysisClient.get.syncGet("", "staticVerify", JsonUtil.toJson(c)); return analysisClient.get.syncGet("", "staticVerify", JsonUtil.toJson(c));
} }
public void subEventByClient(String topic, Channel channel) {
eventBroker.doSubscribe(topic, new WSClientConsumer(channel));
}
// 合约状态 // 合约状态
static class StrCollector extends ResultCallback { static class StrCollector extends ResultCallback {

View File

@ -142,6 +142,10 @@ public class EventBroker {
} }
} }
public void doSubscribe(String topic, IEventConsumer consumer) {
subInReg(topic, consumer, this.topic2cIds, this.id2Consumers);
}
/** /**
* do subscribing in registry * do subscribing in registry
* *
@ -167,10 +171,17 @@ public class EventBroker {
topic2cIds.put(topic, new HashSet<>()); topic2cIds.put(topic, new HashSet<>());
} }
topic2cIds.get(topic).add(cId); topic2cIds.get(topic).add(cId);
if (consumer instanceof ContractConsumer) { switch (consumer.getType()) {
case Contract:
LOGGER.info("contract " + ((ContractConsumer) consumer).getContract() + " subscribes topic " + topic); LOGGER.info("contract " + ((ContractConsumer) consumer).getContract() + " subscribes topic " + topic);
} else { break;
case Node:
LOGGER.info("node " + consumer.getId() + " subscribes topic " + topic); LOGGER.info("node " + consumer.getId() + " subscribes topic " + topic);
break;
case WSClient:
LOGGER.info("client " + consumer.getId() + " subscribes topic " + topic);
default:
break;
} }
return true; return true;
} }

View File

@ -9,6 +9,7 @@ import org.bdware.sc.event.REvent.REventSemantics;
import org.bdware.sc.event.REvent.REventType; import org.bdware.sc.event.REvent.REventType;
import org.bdware.sc.event.clients.ContractConsumer; import org.bdware.sc.event.clients.ContractConsumer;
import org.bdware.sc.event.clients.IEventConsumer; import org.bdware.sc.event.clients.IEventConsumer;
import org.bdware.sc.event.clients.IEventConsumer.ConsumerType;
import org.bdware.sc.event.clients.NodeConsumer; import org.bdware.sc.event.clients.NodeConsumer;
import org.bdware.sc.util.HashUtil; import org.bdware.sc.util.HashUtil;
import org.bdware.sc.util.JsonUtil; import org.bdware.sc.util.JsonUtil;
@ -96,12 +97,28 @@ public class EventRecorder {
JsonObject id2Consumers = data.getAsJsonObject("id2Consumers"); JsonObject id2Consumers = data.getAsJsonObject("id2Consumers");
for (String k : id2Consumers.keySet()) { for (String k : id2Consumers.keySet()) {
JsonObject consumer = id2Consumers.getAsJsonObject(k); JsonObject consumer = id2Consumers.getAsJsonObject(k);
if (consumer.has("handler")) { if (!consumer.has("type")) {
continue;
}
ConsumerType type = ConsumerType.valueOf(consumer.get("type").getAsString());
switch (type) {
case Contract:
cp.id2Consumers.put(k, cp.id2Consumers.put(k,
new ContractConsumer(consumer.get("contract").getAsString(), new ContractConsumer(consumer.get("contract").getAsString(),
consumer.get("handler").getAsString())); consumer.get("handler").getAsString()));
} else { break;
case Node:
cp.id2Consumers.put(k, new NodeConsumer(k)); cp.id2Consumers.put(k, new NodeConsumer(k));
default:
break;
}
}
for (String topic : cp.topic2cIds.keySet()) {
Set<String> idSet = cp.topic2cIds.get(topic);
for (String id : idSet) {
if (!cp.id2Consumers.containsKey(id)) {
cp.id2Consumers.remove(id);
}
} }
} }
cp.prev = data.get("prev").getAsString(); cp.prev = data.get("prev").getAsString();

View File

@ -1,23 +0,0 @@
package org.bdware.sc.event.clients;
import org.bdware.sc.conn.ResultCallback;
/**
* @author Kaidong Wu
*/
public class ClientConsumer implements IEventConsumer {
@Override
public String getId() {
return null;
}
@Override
public void publishEvent(String msg, ResultCallback rc, String... options) {
// TODO
}
@Override
public void competeSub(String msg, ResultCallback rc, String... options) {
// TODO
}
}

View File

@ -31,6 +31,8 @@ public class ContractConsumer implements IEventConsumer {
private final String handler; private final String handler;
@Expose(serialize = false, deserialize = false) @Expose(serialize = false, deserialize = false)
private final Object flag = new Object(); private final Object flag = new Object();
@SuppressWarnings("unused")
private final ConsumerType type = ConsumerType.Contract;
public ContractConsumer(String contract, String handler) { public ContractConsumer(String contract, String handler) {
this.contract = contract; this.contract = contract;
@ -46,6 +48,11 @@ public class ContractConsumer implements IEventConsumer {
return HashUtil.sha3(contract + handler); return HashUtil.sha3(contract + handler);
} }
@Override
public ConsumerType getType() {
return type;
}
@Override @Override
public void publishEvent(String msg, ResultCallback rc, String... options) { public void publishEvent(String msg, ResultCallback rc, String... options) {
executeContract(msg, this.handler, rc, options); executeContract(msg, this.handler, rc, options);

View File

@ -5,7 +5,15 @@ import org.bdware.sc.conn.ResultCallback;
public interface IEventConsumer { public interface IEventConsumer {
String getId(); String getId();
ConsumerType getType();
void publishEvent(String msg, ResultCallback rc, String... options); void publishEvent(String msg, ResultCallback rc, String... options);
void competeSub(String msg, ResultCallback rc, String... options); void competeSub(String msg, ResultCallback rc, String... options);
enum ConsumerType {
Contract,
Node,
WSClient
}
} }

View File

@ -10,6 +10,8 @@ public class NodeConsumer implements IEventConsumer {
private static EventCenter center; private static EventCenter center;
private final String nodeId; private final String nodeId;
@SuppressWarnings("unused")
private final ConsumerType type = ConsumerType.Node;
public NodeConsumer(String nodeId) { public NodeConsumer(String nodeId) {
this.nodeId = nodeId; this.nodeId = nodeId;
@ -24,6 +26,11 @@ public class NodeConsumer implements IEventConsumer {
return this.nodeId; return this.nodeId;
} }
@Override
public ConsumerType getType() {
return type;
}
@Override @Override
public void publishEvent(String msg, ResultCallback rc, String... options) { public void publishEvent(String msg, ResultCallback rc, String... options) {
center.publishEvent(msg, nodeId); center.publishEvent(msg, nodeId);

View File

@ -0,0 +1,52 @@
package org.bdware.sc.event.clients;
import com.google.gson.JsonObject;
import com.google.gson.annotations.Expose;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.util.JsonUtil;
/**
* @author Kaidong Wu
*/
public class WSClientConsumer implements IEventConsumer {
private static final Logger LOGGER = LogManager.getLogger(WSClientConsumer.class);
@Expose(serialize = false, deserialize = false)
private final Channel channel;
@SuppressWarnings("unused")
private final ConsumerType type = ConsumerType.WSClient;
public WSClientConsumer(Channel channel) {
this.channel = channel;
}
public Channel getChannel() {
return channel;
}
@Override
public String getId() {
return "WSC_" + channel.id().asLongText();
}
@Override
public ConsumerType getType() {
return type;
}
@Override
public void publishEvent(String msg, ResultCallback rc, String... options) {
JsonObject ret = new JsonObject();
ret.addProperty("action", "onEvent");
ret.add("data", JsonUtil.parseString(msg));
channel.writeAndFlush(new TextWebSocketFrame(ret.toString()));
}
@Override
public void competeSub(String msg, ResultCallback rc, String... options) {
}
}