mirror of
https://gitee.com/BDWare/cm
synced 2025-01-10 01:44:04 +00:00
auto prune
This commit is contained in:
parent
f53c2374df
commit
2c4a3bda9a
@ -2,9 +2,7 @@ plugins {
|
|||||||
id 'java'
|
id 'java'
|
||||||
id 'java-library'
|
id 'java-library'
|
||||||
}
|
}
|
||||||
|
|
||||||
apply from: '../spotless.gradle'
|
apply from: '../spotless.gradle'
|
||||||
|
|
||||||
repositories {
|
repositories {
|
||||||
mavenCentral()
|
mavenCentral()
|
||||||
}
|
}
|
||||||
|
@ -53,7 +53,7 @@ public class EventRecorder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void createCheckPoint(Map<String, Set<String>> topic2cIds,
|
public void createCheckPoint(Map<String, Set<String>> topic2cIds,
|
||||||
Map<String, IEventConsumer> id2Consumers) {
|
Map<String, IEventConsumer> id2Consumers) {
|
||||||
CheckPoint cp = new CheckPoint(topic2cIds, id2Consumers);
|
CheckPoint cp = new CheckPoint(topic2cIds, id2Consumers);
|
||||||
synchronized (latestEvent) {
|
synchronized (latestEvent) {
|
||||||
if (!latestEvent.isCp()) {
|
if (!latestEvent.isCp()) {
|
||||||
@ -79,8 +79,11 @@ public class EventRecorder {
|
|||||||
String key = KeyValueRocksDBUtil.instance.getValue(dbName, LATEST_EVENT_KEY);
|
String key = KeyValueRocksDBUtil.instance.getValue(dbName, LATEST_EVENT_KEY);
|
||||||
latestEvent.set(key);
|
latestEvent.set(key);
|
||||||
CheckPoint cp = new CheckPoint();
|
CheckPoint cp = new CheckPoint();
|
||||||
Type topic2cIdsType = TypeToken.getParameterized(ConcurrentHashMap.class, String.class,
|
Type topic2cIdsType =
|
||||||
TypeToken.getParameterized(Set.class, String.class).getType()).getType();
|
TypeToken
|
||||||
|
.getParameterized(ConcurrentHashMap.class, String.class,
|
||||||
|
TypeToken.getParameterized(Set.class, String.class).getType())
|
||||||
|
.getType();
|
||||||
// retrieving transactions from database
|
// retrieving transactions from database
|
||||||
while (null != key && !key.isEmpty()) {
|
while (null != key && !key.isEmpty()) {
|
||||||
String json = KeyValueRocksDBUtil.instance.getValue(dbName, key);
|
String json = KeyValueRocksDBUtil.instance.getValue(dbName, key);
|
||||||
@ -92,14 +95,16 @@ public class EventRecorder {
|
|||||||
if (json.startsWith("cp")) {
|
if (json.startsWith("cp")) {
|
||||||
// create check point by the transaction and stop retrieving
|
// create check point by the transaction and stop retrieving
|
||||||
JsonObject data = JsonUtil.parseStringAsJsonObject(json.substring(2));
|
JsonObject data = JsonUtil.parseStringAsJsonObject(json.substring(2));
|
||||||
cp.topic2cIds = JsonUtil.fromJson(data.get("topic2cIds").toString(), topic2cIdsType);
|
cp.topic2cIds =
|
||||||
|
JsonUtil.fromJson(data.get("topic2cIds").toString(), topic2cIdsType);
|
||||||
JsonObject id2Consumers = data.getAsJsonObject("id2Consumers");
|
JsonObject id2Consumers = data.getAsJsonObject("id2Consumers");
|
||||||
for (String k : id2Consumers.keySet()) {
|
for (String k : id2Consumers.keySet()) {
|
||||||
JsonObject consumer = id2Consumers.getAsJsonObject(k);
|
JsonObject consumer = id2Consumers.getAsJsonObject(k);
|
||||||
if (!consumer.has("type")) {
|
if (!consumer.has("type")) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
ConsumerType type = ConsumerType.valueOf(consumer.get("type").getAsString());
|
ConsumerType type =
|
||||||
|
ConsumerType.valueOf(consumer.get("type").getAsString());
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case Contract:
|
case Contract:
|
||||||
cp.id2Consumers.put(k,
|
cp.id2Consumers.put(k,
|
||||||
@ -144,7 +149,8 @@ public class EventRecorder {
|
|||||||
// if empty, return the check point
|
// if empty, return the check point
|
||||||
latestEvent.setCp(true);
|
latestEvent.setCp(true);
|
||||||
} else {
|
} else {
|
||||||
// on the base of old check point, process following sub or unsub events to recover registry
|
// on the base of old check point, process following sub or unsub events to recover
|
||||||
|
// registry
|
||||||
while (!stack.empty()) {
|
while (!stack.empty()) {
|
||||||
Object record = stack.pop();
|
Object record = stack.pop();
|
||||||
if (record instanceof CheckPoint) {
|
if (record instanceof CheckPoint) {
|
||||||
@ -154,15 +160,19 @@ public class EventRecorder {
|
|||||||
IEventConsumer consumer = broker.parseConsumer(tran.content);
|
IEventConsumer consumer = broker.parseConsumer(tran.content);
|
||||||
switch (tran.type) {
|
switch (tran.type) {
|
||||||
case SUBSCRIBE:
|
case SUBSCRIBE:
|
||||||
if (!broker.subInReg(tran.topic, consumer, cp.topic2cIds, cp.id2Consumers)) {
|
if (!broker.subInReg(tran.topic, consumer, cp.topic2cIds,
|
||||||
|
cp.id2Consumers)) {
|
||||||
LOGGER.warn("record damaged! " + key);
|
LOGGER.warn("record damaged! " + key);
|
||||||
LOGGER.debug("record damaged! " + key + ": " + JsonUtil.toJson(tran));
|
LOGGER.debug(
|
||||||
|
"record damaged! " + key + ": " + JsonUtil.toJson(tran));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case UNSUBSCRIBE:
|
case UNSUBSCRIBE:
|
||||||
if (!broker.unsubInReg(tran.topic, consumer, cp.topic2cIds, cp.id2Consumers)) {
|
if (!broker.unsubInReg(tran.topic, consumer, cp.topic2cIds,
|
||||||
|
cp.id2Consumers)) {
|
||||||
LOGGER.warn("record damaged! " + key);
|
LOGGER.warn("record damaged! " + key);
|
||||||
LOGGER.debug("record damaged! " + key + ": " + JsonUtil.toJson(tran));
|
LOGGER.debug(
|
||||||
|
"record damaged! " + key + ": " + JsonUtil.toJson(tran));
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
@ -224,7 +234,7 @@ public class EventRecorder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public CheckPoint(Map<String, Set<String>> topic2cIds,
|
public CheckPoint(Map<String, Set<String>> topic2cIds,
|
||||||
Map<String, IEventConsumer> id2Consumers) {
|
Map<String, IEventConsumer> id2Consumers) {
|
||||||
this.topic2cIds = topic2cIds;
|
this.topic2cIds = topic2cIds;
|
||||||
this.id2Consumers = id2Consumers;
|
this.id2Consumers = id2Consumers;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user