diff --git a/src/main/java/org/bdware/sc/ContractManager.java b/src/main/java/org/bdware/sc/ContractManager.java index 532b345..6ef7a1b 100644 --- a/src/main/java/org/bdware/sc/ContractManager.java +++ b/src/main/java/org/bdware/sc/ContractManager.java @@ -1295,20 +1295,18 @@ public class ContractManager { long start = System.currentTimeMillis(); // 9000 - if (client.contractMeta.sigRequired) { - if (!request.verifySignature()) { - cr = new ContractResult(Status.Error, new JsonPrimitive("sign verified failed")); - rcb.onResult(JsonUtil.parseObjectAsJsonObject(cr)); - return; - } - } else { - if (request.getPublicKey() != null) { + if (request.getRequester() != null && !request.getRequester().startsWith("event")) { + if (client.contractMeta.sigRequired) { if (!request.verifySignature()) { - request.setPublicKey(null); - request.setRequester(null); + cr = new ContractResult(Status.Error, new JsonPrimitive("sign verified failed")); + rcb.onResult(JsonUtil.parseObjectAsJsonObject(cr)); + return; } + } else if (null != request.getPublicKey() && !request.verifySignature()) { + request.setPublicKey(null); } } + client.times++; client.contractStatus = ContractStatus.Executing; ResultCallback acb; @@ -1323,15 +1321,13 @@ public class ContractManager { JsonObject finalRet = JsonUtil.parseStringAsJsonObject(result); rcb.onResult(finalRet); - if (finalRet != null) { - chainOpener.writeContractResultToLocalAndLedger( - finalRet.toString(), - client, - request, - cb, - start, - System.currentTimeMillis() - start); - } + chainOpener.writeContractResultToLocalAndLedger( + finalRet.toString(), + client, + request, + cb, + start, + System.currentTimeMillis() - start); if (client.contractMeta.getYjsType() == YjsType.Oracle) { oracleCounter.inc(); } else { @@ -1525,7 +1521,6 @@ public class ContractManager { // LOGGER.info("查看合约 " + cr.getContractID() + " 的master为 " + pubKey); - if (null != pubKey) { if (!masterStub.hasAgentConnection(pubKey)) { pubKey = nodeCenterConn.reRouteContract(cr.getContractID()); diff --git a/src/main/java/org/bdware/sc/event/EventBroker.java b/src/main/java/org/bdware/sc/event/EventBroker.java index 1c418d6..4b203e6 100644 --- a/src/main/java/org/bdware/sc/event/EventBroker.java +++ b/src/main/java/org/bdware/sc/event/EventBroker.java @@ -287,7 +287,7 @@ public class EventBroker { private void doPublish(REvent event) { String topic = event.getTopic(); // publish simple event message to contract consumer - String cEventStr = JsonUtil.toJson(new Event(topic, event.getContent(), event.getSemantics())); + Event cEvent = new Event(topic, event.getContent(), event.getSemantics()); // publish full event message to node consumer String nEventStr = JsonUtil.toJson(event); if (!topic2cIds.containsKey(topic)) { @@ -299,14 +299,14 @@ public class EventBroker { case NEED_RETRY: // send events to all topicConsumers.forEach(cId -> - deliverEvent(event, cEventStr, nEventStr, cId, topic, true)); + deliverEvent(event, cEvent, nEventStr, cId, topic, true)); break; case AT_MOST_ONCE: // send event to a random consumer // AT_MOST_ONCE, so broker don't need to do anything when delivering fails deliverEvent( event, - cEventStr, + cEvent, nEventStr, topicConsumers.toArray()[(int) (Math.random() * topicConsumers.size())].toString(), topic, @@ -343,10 +343,9 @@ public class EventBroker { prePubMsg.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair()); topicConsumers.forEach(cId -> deliverEvent(prePubMsg, - JsonUtil.toJson( - new Event(event.getTopic(), - contentHash, - prePubMsg.getSemantics())), + new Event(event.getTopic(), + contentHash, + prePubMsg.getSemantics()), JsonUtil.toJson(prePubMsg), cId, topic, @@ -390,7 +389,7 @@ public class EventBroker { * publish the event to the consumer * * @param event event message - * @param cEventStr simple event message to the contract consumer, only the topic and the content + * @param cEvent simple event message to the contract consumer, only topic, content and semantics * @param nEventStr event message to the node * @param cId consumer id * @param topic topic of the event @@ -398,7 +397,7 @@ public class EventBroker { */ private void deliverEvent( REvent event, - String cEventStr, + Event cEvent, String nEventStr, String cId, String topic, @@ -443,9 +442,9 @@ public class EventBroker { } }; if (isPub) { - consumer.publishEvent(cEventStr, cb, event.getRequestID(), event.getHash()); + consumer.publishEvent(cEvent, cb, event.getRequestID(), event.getHash()); } else { - consumer.competeSub(cEventStr, cb, event.getRequestID(), event.getHash()); + consumer.competeSub(cEvent, cb, event.getRequestID(), event.getHash()); } }); } else if (consumer instanceof NodeConsumer) { diff --git a/src/main/java/org/bdware/sc/event/clients/ContractConsumer.java b/src/main/java/org/bdware/sc/event/clients/ContractConsumer.java index 7b2212b..782d8e8 100644 --- a/src/main/java/org/bdware/sc/event/clients/ContractConsumer.java +++ b/src/main/java/org/bdware/sc/event/clients/ContractConsumer.java @@ -53,25 +53,25 @@ public class ContractConsumer implements IEventConsumer { } @Override - public void publishEvent(String msg, ResultCallback rc, String... options) { + public void publishEvent(Object msg, ResultCallback rc, String... options) { executeContract(msg, this.handler, rc, options); } @Override - public void competeSub(String msg, ResultCallback rc, String... options) { + public void competeSub(Object msg, ResultCallback rc, String... options) { executeContract(msg, "_preSub", rc, options); } - private void executeContract(String msg, String handler, ResultCallback rc, String... options) { + private void executeContract(Object msg, String handler, ResultCallback rc, String... options) { ContractRequest cr = new ContractRequest(); if (options.length > 1 && null != options[1]) { - cr.setRequester(options[1]); + cr.setRequester("event_" + options[1]); } else { cr.setRequester("event"); } cr.setContractID(contract); cr.setAction(handler); - cr.setArg(msg); + cr.setArg(JsonUtil.parseObject(msg)); cr.setRequestID(options[0]); ContractClient cc = ContractManager.instance.getClient(contract); if (null == cc) { diff --git a/src/main/java/org/bdware/sc/event/clients/IEventConsumer.java b/src/main/java/org/bdware/sc/event/clients/IEventConsumer.java index 48e1985..ac48fdf 100644 --- a/src/main/java/org/bdware/sc/event/clients/IEventConsumer.java +++ b/src/main/java/org/bdware/sc/event/clients/IEventConsumer.java @@ -1,15 +1,16 @@ package org.bdware.sc.event.clients; import org.bdware.sc.conn.ResultCallback; +import org.bdware.sc.event.Event; public interface IEventConsumer { String getId(); ConsumerType getType(); - void publishEvent(String msg, ResultCallback rc, String... options); + void publishEvent(Object msg, ResultCallback rc, String... options); - void competeSub(String msg, ResultCallback rc, String... options); + void competeSub(Object msg, ResultCallback rc, String... options); enum ConsumerType { Contract, diff --git a/src/main/java/org/bdware/sc/event/clients/NodeConsumer.java b/src/main/java/org/bdware/sc/event/clients/NodeConsumer.java index 77d5f72..e9378e2 100644 --- a/src/main/java/org/bdware/sc/event/clients/NodeConsumer.java +++ b/src/main/java/org/bdware/sc/event/clients/NodeConsumer.java @@ -32,15 +32,15 @@ public class NodeConsumer implements IEventConsumer { } @Override - public void publishEvent(String msg, ResultCallback rc, String... options) { - center.publishEvent(msg, nodeId); + public void publishEvent(Object msg, ResultCallback rc, String... options) { + center.publishEvent(msg.toString(), nodeId); if (null != rc) { rc.onResult(""); } } @Override - public void competeSub(String msg, ResultCallback rc, String... options) { + public void competeSub(Object msg, ResultCallback rc, String... options) { publishEvent(msg, rc, options); } } \ No newline at end of file diff --git a/src/main/java/org/bdware/sc/event/clients/WSClientConsumer.java b/src/main/java/org/bdware/sc/event/clients/WSClientConsumer.java index cbca314..ed7cede 100644 --- a/src/main/java/org/bdware/sc/event/clients/WSClientConsumer.java +++ b/src/main/java/org/bdware/sc/event/clients/WSClientConsumer.java @@ -35,18 +35,18 @@ public class WSClientConsumer implements IEventConsumer { } @Override - public void publishEvent(String msg, ResultCallback rc, String... options) { + public void publishEvent(Object msg, ResultCallback rc, String... options) { if (!channel.isActive()) { rc.onResult((String) null); } JsonObject ret = new JsonObject(); ret.addProperty("action", "onEvent"); ret.addProperty("status", "Success"); - ret.add("data", JsonUtil.parseStringAsJsonObject(msg)); + ret.add("data", JsonUtil.parseString(msg.toString())); channel.writeAndFlush(new TextWebSocketFrame(ret.toString())); } @Override - public void competeSub(String msg, ResultCallback rc, String... options) { + public void competeSub(Object msg, ResultCallback rc, String... options) { } }