update doip-sdk

This commit is contained in:
CaiHQ 2023-12-15 16:39:32 +08:00
parent fc453d56fc
commit f53c2374df
2 changed files with 11 additions and 21 deletions

View File

@ -445,7 +445,6 @@ public class ContractManager {
} }
public ContractClient getContractClientByDoi(String doi) { public ContractClient getContractClientByDoi(String doi) {
ContractMeta meta = statusRecorder.getContractMeta(doi); ContractMeta meta = statusRecorder.getContractMeta(doi);
return statusRecorder.getContractClient(meta.id); return statusRecorder.getContractClient(meta.id);
} }

View File

@ -53,7 +53,7 @@ public class EventRecorder {
} }
public void createCheckPoint(Map<String, Set<String>> topic2cIds, public void createCheckPoint(Map<String, Set<String>> topic2cIds,
Map<String, IEventConsumer> id2Consumers) { Map<String, IEventConsumer> id2Consumers) {
CheckPoint cp = new CheckPoint(topic2cIds, id2Consumers); CheckPoint cp = new CheckPoint(topic2cIds, id2Consumers);
synchronized (latestEvent) { synchronized (latestEvent) {
if (!latestEvent.isCp()) { if (!latestEvent.isCp()) {
@ -79,10 +79,8 @@ public class EventRecorder {
String key = KeyValueRocksDBUtil.instance.getValue(dbName, LATEST_EVENT_KEY); String key = KeyValueRocksDBUtil.instance.getValue(dbName, LATEST_EVENT_KEY);
latestEvent.set(key); latestEvent.set(key);
CheckPoint cp = new CheckPoint(); CheckPoint cp = new CheckPoint();
Type topic2cIdsType = TypeToken Type topic2cIdsType = TypeToken.getParameterized(ConcurrentHashMap.class, String.class,
.getParameterized(ConcurrentHashMap.class, String.class, TypeToken.getParameterized(Set.class, String.class).getType()).getType();
TypeToken.getParameterized(Set.class, String.class, String.class).getType())
.getType();
// retrieving transactions from database // retrieving transactions from database
while (null != key && !key.isEmpty()) { while (null != key && !key.isEmpty()) {
String json = KeyValueRocksDBUtil.instance.getValue(dbName, key); String json = KeyValueRocksDBUtil.instance.getValue(dbName, key);
@ -94,16 +92,14 @@ public class EventRecorder {
if (json.startsWith("cp")) { if (json.startsWith("cp")) {
// create check point by the transaction and stop retrieving // create check point by the transaction and stop retrieving
JsonObject data = JsonUtil.parseStringAsJsonObject(json.substring(2)); JsonObject data = JsonUtil.parseStringAsJsonObject(json.substring(2));
cp.topic2cIds = cp.topic2cIds = JsonUtil.fromJson(data.get("topic2cIds").toString(), topic2cIdsType);
JsonUtil.fromJson(data.get("topic2cIds").toString(), topic2cIdsType);
JsonObject id2Consumers = data.getAsJsonObject("id2Consumers"); JsonObject id2Consumers = data.getAsJsonObject("id2Consumers");
for (String k : id2Consumers.keySet()) { for (String k : id2Consumers.keySet()) {
JsonObject consumer = id2Consumers.getAsJsonObject(k); JsonObject consumer = id2Consumers.getAsJsonObject(k);
if (!consumer.has("type")) { if (!consumer.has("type")) {
continue; continue;
} }
ConsumerType type = ConsumerType type = ConsumerType.valueOf(consumer.get("type").getAsString());
ConsumerType.valueOf(consumer.get("type").getAsString());
switch (type) { switch (type) {
case Contract: case Contract:
cp.id2Consumers.put(k, cp.id2Consumers.put(k,
@ -148,8 +144,7 @@ public class EventRecorder {
// if empty, return the check point // if empty, return the check point
latestEvent.setCp(true); latestEvent.setCp(true);
} else { } else {
// on the base of old check point, process following sub or unsub events to recover // on the base of old check point, process following sub or unsub events to recover registry
// registry
while (!stack.empty()) { while (!stack.empty()) {
Object record = stack.pop(); Object record = stack.pop();
if (record instanceof CheckPoint) { if (record instanceof CheckPoint) {
@ -159,19 +154,15 @@ public class EventRecorder {
IEventConsumer consumer = broker.parseConsumer(tran.content); IEventConsumer consumer = broker.parseConsumer(tran.content);
switch (tran.type) { switch (tran.type) {
case SUBSCRIBE: case SUBSCRIBE:
if (!broker.subInReg(tran.topic, consumer, cp.topic2cIds, if (!broker.subInReg(tran.topic, consumer, cp.topic2cIds, cp.id2Consumers)) {
cp.id2Consumers)) {
LOGGER.warn("record damaged! " + key); LOGGER.warn("record damaged! " + key);
LOGGER.debug( LOGGER.debug("record damaged! " + key + ": " + JsonUtil.toJson(tran));
"record damaged! " + key + ": " + JsonUtil.toJson(tran));
} }
break; break;
case UNSUBSCRIBE: case UNSUBSCRIBE:
if (!broker.unsubInReg(tran.topic, consumer, cp.topic2cIds, if (!broker.unsubInReg(tran.topic, consumer, cp.topic2cIds, cp.id2Consumers)) {
cp.id2Consumers)) {
LOGGER.warn("record damaged! " + key); LOGGER.warn("record damaged! " + key);
LOGGER.debug( LOGGER.debug("record damaged! " + key + ": " + JsonUtil.toJson(tran));
"record damaged! " + key + ": " + JsonUtil.toJson(tran));
} }
default: default:
break; break;
@ -233,7 +224,7 @@ public class EventRecorder {
} }
public CheckPoint(Map<String, Set<String>> topic2cIds, public CheckPoint(Map<String, Set<String>> topic2cIds,
Map<String, IEventConsumer> id2Consumers) { Map<String, IEventConsumer> id2Consumers) {
this.topic2cIds = topic2cIds; this.topic2cIds = topic2cIds;
this.id2Consumers = id2Consumers; this.id2Consumers = id2Consumers;
} }