feat(cm): update event mechanism

update resultCallback used in contract consumer part of EventBroker.deliverEvent; add exception handling for WSClientConsumer
This commit is contained in:
Frank.R.Wu 2021-11-01 22:24:27 +08:00
parent 6a35a67631
commit f0ca9ab85d
5 changed files with 69 additions and 59 deletions

View File

@ -217,7 +217,6 @@ public class ContractClient {
public String startProcess(PrintStream ps) {
isRunning = false;
port = -1;
LOGGER.info("port=" + port);
String darg = "-Djava.library.path=";
String classpath;
@ -309,6 +308,7 @@ public class ContractClient {
}
assert status != null;
port = Integer.parseInt(status.split(" ")[0]);
LOGGER.debug("port=" + port);
ContractManager.cPort.updateDb(port, true);
get = new SocketGet("127.0.0.1", port);
get.syncGet("", "setDBInfo", ContractManager.dbPath);

View File

@ -38,12 +38,17 @@ public class ContractMeta implements IDSerializable {
*/
// Map<Object,Object>MaskInfo;
public ContractMeta() {}
public ContractMeta() {
}
public ContractStatusEnum getStatus() {
return status;
}
public void setStatus(ContractStatusEnum status) {
this.status = status;
}
public String getID() {
return id;
}
@ -56,7 +61,9 @@ public class ContractMeta implements IDSerializable {
return exportedFunctions;
}
public Set<String> getDependentContracts() { return dependentContracts; }
public Set<String> getDependentContracts() {
return dependentContracts;
}
public Map<String, REvent.REventSemantics> getEvents() {
return declaredEvents;
@ -78,6 +85,12 @@ public class ContractMeta implements IDSerializable {
return name;
}
public void setName(String name) {
this.name = name;
}
// public setMask(){}
public boolean getIsDebug() {
return isDebug;
}
@ -90,10 +103,6 @@ public class ContractMeta implements IDSerializable {
if (desp != null) funCache.put(action, desp);
return desp;
}
// public setMask(){
;
// }
private FunctionDesp seekFunction(String action) {
for (FunctionDesp desp : exportedFunctions) {
@ -108,12 +117,4 @@ public class ContractMeta implements IDSerializable {
status = ContractStatusEnum.HANGED;
isDebug = false;
}
public void setName(String name) {
this.name = name;
}
public void setStatus(ContractStatusEnum status) {
this.status = status;
}
}

View File

@ -409,37 +409,35 @@ public class EventBroker {
ContractManager.threadPool.execute(() -> {
ResultCallback cb = new ResultCallback() {
@Override
public void onResult(String str) {
public void onResult(String unused) {
// if the delivering fails, unsubscribe the consumer
if (null != str) {
ContractConsumer c = (ContractConsumer) consumer;
ContractClient client = ContractManager.instance.getClient(c.getContract());
String reqID =
ContractManager.instance.nodeCenterConn.getNodeKeyPair().getPublicKeyStr() +
"_" + System.currentTimeMillis();
REvent unsubEvent =
new REvent(
topic,
UNSUBSCRIBE,
"{\"subscriber\":\"" + cId + "\"}",
reqID);
unsubEvent.doSignature(client.getPubkey(), client.getContractKey());
handle(unsubEvent);
ContractConsumer c = (ContractConsumer) consumer;
ContractClient client = ContractManager.instance.getClient(c.getContract());
String reqID =
ContractManager.instance.nodeCenterConn.getNodeKeyPair().getPublicKeyStr() +
"_" + System.currentTimeMillis();
REvent unsubEvent =
new REvent(
topic,
UNSUBSCRIBE,
"{\"subscriber\":\"" + cId + "\"}",
reqID);
unsubEvent.doSignature(client.getPubkey(), client.getContractKey());
handle(unsubEvent);
// if the event is an ONLY_ONCE event, retry publishing
if (NEED_RETRY.equals(event.getSemantics())) {
REvent newMsg =
new REvent(
topic.split("\\|")[0],
PUBLISH,
event.getContent(),
event.getRequestID());
newMsg.setSemantics(ONLY_ONCE);
newMsg.setHash(event.getHash());
newMsg.setTxHash(event.getTxHash());
newMsg.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair());
handle(newMsg);
}
// if the event is an ONLY_ONCE event, retry publishing
if (NEED_RETRY.equals(event.getSemantics())) {
REvent newMsg =
new REvent(
topic.split("\\|")[0],
PUBLISH,
event.getContent(),
event.getRequestID());
newMsg.setSemantics(ONLY_ONCE);
newMsg.setHash(event.getHash());
newMsg.setTxHash(event.getTxHash());
newMsg.doSignature(ContractManager.instance.nodeCenterConn.getNodeKeyPair());
handle(newMsg);
}
}
};
@ -449,7 +447,7 @@ public class EventBroker {
consumer.competeSub(cEventStr, cb, event.getRequestID(), event.getHash());
}
});
} else {
} else if (consumer instanceof NodeConsumer) {
// node consumer
ContractManager.threadPool.execute(() -> {
if (isPub) {
@ -458,6 +456,15 @@ public class EventBroker {
consumer.competeSub(nEventStr, null);
}
});
} else if (isPub) {
// client consumer
ContractManager.threadPool.execute(() ->
consumer.publishEvent(nEventStr, new ResultCallback() {
@Override
public void onResult(String unused) {
unsubInReg(null, consumer, topic2cIds, id2Consumers);
}
}));
}
} else {
topic2cIds.get(topic).remove(cId);

View File

@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class ContractConsumer implements IEventConsumer {
private static final Logger LOGGER = LogManager.getLogger(ContractConsumer.class);
private static final long PERIOD = 2500L;
private static final int TIMEOUT_COUNT = 5;
private static final Map<String, ScheduledFuture<?>> scheduledFutures =
new ConcurrentHashMap<>();
@ -76,7 +75,7 @@ public class ContractConsumer implements IEventConsumer {
cr.setRequestID(options[0]);
ContractClient cc = ContractManager.instance.getClient(contract);
if (null == cc) {
rc.onResult("");
rc.onResult((String) null);
return;
}
AtomicInteger callCount = new AtomicInteger(0);
@ -87,19 +86,18 @@ public class ContractConsumer implements IEventConsumer {
boolean ret = true;
try {
ContractResult result = JsonUtil.fromJson(str, ContractResult.class);
if (result.status == ContractResult.Status.Success) {
String retStr = null;
rc.onResult(retStr);
} else if (callCount.get() == TIMEOUT_COUNT ||
(result.status == ContractResult.Status.Exception &&
result.result.toString().contains("not exported"))) {
rc.onResult("");
} else {
ret = false;
if (!result.status.equals(ContractResult.Status.Success)) {
if (callCount.get() == TIMEOUT_COUNT ||
(result.status == ContractResult.Status.Exception &&
result.result.toString().contains("not exported"))) {
rc.onResult((String) null);
} else {
ret = false;
}
}
} catch (Exception e) {
LOGGER.warn("receiving event error! " + contract + "." + handler + ": " + e.getMessage());
rc.onResult("");
rc.onResult((String) null);
}
callCount.incrementAndGet();
if (ret) {
@ -108,16 +106,17 @@ public class ContractConsumer implements IEventConsumer {
// wait for setting scheduledFutures
flag.wait(500L);
} catch (InterruptedException e) {
e.printStackTrace();
LOGGER.error(e.getMessage());
}
}
scheduledFutures.get(getId()).cancel(true);
scheduledFutures.remove(getId());
}
}
}, (reqID, hashStr) -> {
}),
500L,
PERIOD,
2500L,
TimeUnit.MILLISECONDS);
scheduledFutures.put(getId(), future);
synchronized (flag) {

View File

@ -36,6 +36,9 @@ public class WSClientConsumer implements IEventConsumer {
@Override
public void publishEvent(String msg, ResultCallback rc, String... options) {
if (!channel.isActive()) {
rc.onResult((String) null);
}
JsonObject ret = new JsonObject();
ret.addProperty("action", "onEvent");
ret.add("data", JsonUtil.parseString(msg));