fix: fix bugs in event mechanism

This commit is contained in:
Frank.R.Wu 2021-12-03 15:28:33 +08:00
parent ecc51330db
commit 9657f1930f
6 changed files with 39 additions and 44 deletions

View File

@ -1295,20 +1295,18 @@ public class ContractManager {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
// 9000 // 9000
if (request.getRequester() != null && !request.getRequester().startsWith("event")) {
if (client.contractMeta.sigRequired) { if (client.contractMeta.sigRequired) {
if (!request.verifySignature()) { if (!request.verifySignature()) {
cr = new ContractResult(Status.Error, new JsonPrimitive("sign verified failed")); cr = new ContractResult(Status.Error, new JsonPrimitive("sign verified failed"));
rcb.onResult(JsonUtil.parseObjectAsJsonObject(cr)); rcb.onResult(JsonUtil.parseObjectAsJsonObject(cr));
return; return;
} }
} else { } else if (null != request.getPublicKey() && !request.verifySignature()) {
if (request.getPublicKey() != null) {
if (!request.verifySignature()) {
request.setPublicKey(null); request.setPublicKey(null);
request.setRequester(null);
}
} }
} }
client.times++; client.times++;
client.contractStatus = ContractStatus.Executing; client.contractStatus = ContractStatus.Executing;
ResultCallback acb; ResultCallback acb;
@ -1323,7 +1321,6 @@ public class ContractManager {
JsonObject finalRet = JsonUtil.parseStringAsJsonObject(result); JsonObject finalRet = JsonUtil.parseStringAsJsonObject(result);
rcb.onResult(finalRet); rcb.onResult(finalRet);
if (finalRet != null) {
chainOpener.writeContractResultToLocalAndLedger( chainOpener.writeContractResultToLocalAndLedger(
finalRet.toString(), finalRet.toString(),
client, client,
@ -1331,7 +1328,6 @@ public class ContractManager {
cb, cb,
start, start,
System.currentTimeMillis() - start); System.currentTimeMillis() - start);
}
if (client.contractMeta.getYjsType() == YjsType.Oracle) { if (client.contractMeta.getYjsType() == YjsType.Oracle) {
oracleCounter.inc(); oracleCounter.inc();
} else { } else {
@ -1525,7 +1521,6 @@ public class ContractManager {
// LOGGER.info("查看合约 " + cr.getContractID() + " 的master为 " + pubKey); // LOGGER.info("查看合约 " + cr.getContractID() + " 的master为 " + pubKey);
if (null != pubKey) { if (null != pubKey) {
if (!masterStub.hasAgentConnection(pubKey)) { if (!masterStub.hasAgentConnection(pubKey)) {
pubKey = nodeCenterConn.reRouteContract(cr.getContractID()); pubKey = nodeCenterConn.reRouteContract(cr.getContractID());

View File

@ -287,7 +287,7 @@ public class EventBroker {
private void doPublish(REvent event) { private void doPublish(REvent event) {
String topic = event.getTopic(); String topic = event.getTopic();
// publish simple event message to contract consumer // publish simple event message to contract consumer
String cEventStr = JsonUtil.toJson(new Event(topic, event.getContent(), event.getSemantics())); Event cEvent = new Event(topic, event.getContent(), event.getSemantics());
// publish full event message to node consumer // publish full event message to node consumer
String nEventStr = JsonUtil.toJson(event); String nEventStr = JsonUtil.toJson(event);
if (!topic2cIds.containsKey(topic)) { if (!topic2cIds.containsKey(topic)) {
@ -299,14 +299,14 @@ public class EventBroker {
case NEED_RETRY: case NEED_RETRY:
// send events to all // send events to all
topicConsumers.forEach(cId -> topicConsumers.forEach(cId ->
deliverEvent(event, cEventStr, nEventStr, cId, topic, true)); deliverEvent(event, cEvent, nEventStr, cId, topic, true));
break; break;
case AT_MOST_ONCE: case AT_MOST_ONCE:
// send event to a random consumer // send event to a random consumer
// AT_MOST_ONCE, so broker don't need to do anything when delivering fails // AT_MOST_ONCE, so broker don't need to do anything when delivering fails
deliverEvent( deliverEvent(
event, event,
cEventStr, cEvent,
nEventStr, nEventStr,
topicConsumers.toArray()[(int) (Math.random() * topicConsumers.size())].toString(), topicConsumers.toArray()[(int) (Math.random() * topicConsumers.size())].toString(),
topic, topic,
@ -343,10 +343,9 @@ public class EventBroker {
prePubMsg.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair()); prePubMsg.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair());
topicConsumers.forEach(cId -> topicConsumers.forEach(cId ->
deliverEvent(prePubMsg, deliverEvent(prePubMsg,
JsonUtil.toJson(
new Event(event.getTopic(), new Event(event.getTopic(),
contentHash, contentHash,
prePubMsg.getSemantics())), prePubMsg.getSemantics()),
JsonUtil.toJson(prePubMsg), JsonUtil.toJson(prePubMsg),
cId, cId,
topic, topic,
@ -390,7 +389,7 @@ public class EventBroker {
* publish the event to the consumer * publish the event to the consumer
* *
* @param event event message * @param event event message
* @param cEventStr simple event message to the contract consumer, only the topic and the content * @param cEvent simple event message to the contract consumer, only topic, content and semantics
* @param nEventStr event message to the node * @param nEventStr event message to the node
* @param cId consumer id * @param cId consumer id
* @param topic topic of the event * @param topic topic of the event
@ -398,7 +397,7 @@ public class EventBroker {
*/ */
private void deliverEvent( private void deliverEvent(
REvent event, REvent event,
String cEventStr, Event cEvent,
String nEventStr, String nEventStr,
String cId, String cId,
String topic, String topic,
@ -443,9 +442,9 @@ public class EventBroker {
} }
}; };
if (isPub) { if (isPub) {
consumer.publishEvent(cEventStr, cb, event.getRequestID(), event.getHash()); consumer.publishEvent(cEvent, cb, event.getRequestID(), event.getHash());
} else { } else {
consumer.competeSub(cEventStr, cb, event.getRequestID(), event.getHash()); consumer.competeSub(cEvent, cb, event.getRequestID(), event.getHash());
} }
}); });
} else if (consumer instanceof NodeConsumer) { } else if (consumer instanceof NodeConsumer) {

View File

@ -53,25 +53,25 @@ public class ContractConsumer implements IEventConsumer {
} }
@Override @Override
public void publishEvent(String msg, ResultCallback rc, String... options) { public void publishEvent(Object msg, ResultCallback rc, String... options) {
executeContract(msg, this.handler, rc, options); executeContract(msg, this.handler, rc, options);
} }
@Override @Override
public void competeSub(String msg, ResultCallback rc, String... options) { public void competeSub(Object msg, ResultCallback rc, String... options) {
executeContract(msg, "_preSub", rc, options); executeContract(msg, "_preSub", rc, options);
} }
private void executeContract(String msg, String handler, ResultCallback rc, String... options) { private void executeContract(Object msg, String handler, ResultCallback rc, String... options) {
ContractRequest cr = new ContractRequest(); ContractRequest cr = new ContractRequest();
if (options.length > 1 && null != options[1]) { if (options.length > 1 && null != options[1]) {
cr.setRequester(options[1]); cr.setRequester("event_" + options[1]);
} else { } else {
cr.setRequester("event"); cr.setRequester("event");
} }
cr.setContractID(contract); cr.setContractID(contract);
cr.setAction(handler); cr.setAction(handler);
cr.setArg(msg); cr.setArg(JsonUtil.parseObject(msg));
cr.setRequestID(options[0]); cr.setRequestID(options[0]);
ContractClient cc = ContractManager.instance.getClient(contract); ContractClient cc = ContractManager.instance.getClient(contract);
if (null == cc) { if (null == cc) {

View File

@ -1,15 +1,16 @@
package org.bdware.sc.event.clients; package org.bdware.sc.event.clients;
import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.event.Event;
public interface IEventConsumer { public interface IEventConsumer {
String getId(); String getId();
ConsumerType getType(); ConsumerType getType();
void publishEvent(String msg, ResultCallback rc, String... options); void publishEvent(Object msg, ResultCallback rc, String... options);
void competeSub(String msg, ResultCallback rc, String... options); void competeSub(Object msg, ResultCallback rc, String... options);
enum ConsumerType { enum ConsumerType {
Contract, Contract,

View File

@ -32,15 +32,15 @@ public class NodeConsumer implements IEventConsumer {
} }
@Override @Override
public void publishEvent(String msg, ResultCallback rc, String... options) { public void publishEvent(Object msg, ResultCallback rc, String... options) {
center.publishEvent(msg, nodeId); center.publishEvent(msg.toString(), nodeId);
if (null != rc) { if (null != rc) {
rc.onResult(""); rc.onResult("");
} }
} }
@Override @Override
public void competeSub(String msg, ResultCallback rc, String... options) { public void competeSub(Object msg, ResultCallback rc, String... options) {
publishEvent(msg, rc, options); publishEvent(msg, rc, options);
} }
} }

View File

@ -35,18 +35,18 @@ public class WSClientConsumer implements IEventConsumer {
} }
@Override @Override
public void publishEvent(String msg, ResultCallback rc, String... options) { public void publishEvent(Object msg, ResultCallback rc, String... options) {
if (!channel.isActive()) { if (!channel.isActive()) {
rc.onResult((String) null); rc.onResult((String) null);
} }
JsonObject ret = new JsonObject(); JsonObject ret = new JsonObject();
ret.addProperty("action", "onEvent"); ret.addProperty("action", "onEvent");
ret.addProperty("status", "Success"); ret.addProperty("status", "Success");
ret.add("data", JsonUtil.parseStringAsJsonObject(msg)); ret.add("data", JsonUtil.parseString(msg.toString()));
channel.writeAndFlush(new TextWebSocketFrame(ret.toString())); channel.writeAndFlush(new TextWebSocketFrame(ret.toString()));
} }
@Override @Override
public void competeSub(String msg, ResultCallback rc, String... options) { public void competeSub(Object msg, ResultCallback rc, String... options) {
} }
} }