feat: fix sync var

This commit is contained in:
Xuxin Wang 2023-07-08 15:34:43 +08:00
parent c0c580090e
commit a794bd3af4
2 changed files with 9 additions and 8 deletions

View File

@ -55,11 +55,12 @@ public class SharableVarManager {
String type = message.header.parameters.attributes.get("type").getAsString(); String type = message.header.parameters.attributes.get("type").getAsString();
SharableVar var = allVars.get(varId); SharableVar var = allVars.get(varId);
if (var != null) { if (var != null) {
JoinableCRDT delta = JsonUtil.fromJson(content, JoinableCRDT.class);
if (Objects.equals(type, "r2r") || Objects.equals(type, "w2r")) { if (Objects.equals(type, "r2r") || Objects.equals(type, "w2r")) {
JoinableCRDT delta = JsonUtil.fromJson(content, var.readerVar.getClass());
var.readerVar.join(delta); var.readerVar.join(delta);
var.readerVarDeltaQueue.add(delta); var.readerVarDeltaQueue.add(delta);
} else if (Objects.equals(type, "w2w")) { } else if (Objects.equals(type, "w2w")) {
JoinableCRDT delta = JsonUtil.fromJson(content, var.writerVar.getClass());
var.writerVar.join(delta); var.writerVar.join(delta);
var.writerVarDeltaQueue.add(delta); var.writerVarDeltaQueue.add(delta);
} }

View File

@ -49,8 +49,8 @@ public abstract class SharableVar<T extends JoinableCRDT> {
this.readerTimer.newTimeout(timeout -> { this.readerTimer.newTimeout(timeout -> {
readerSyncTimerTask = new ReaderSyncTimerTask(); readerSyncTimerTask = new ReaderSyncTimerTask();
readerTimer.newTimeout(readerSyncTimerTask, sharableVarState.getReaderInterval(), readerTimer.newTimeout(readerSyncTimerTask, sharableVarState.getReaderInterval(),
TimeUnit.MILLISECONDS); TimeUnit.SECONDS);
}, 0, TimeUnit.MILLISECONDS); }, this.sharableVarState.getReaderInterval(), TimeUnit.SECONDS);
} }
} }
if (this.sharableVarState.isWriterFlag()) { if (this.sharableVarState.isWriterFlag()) {
@ -65,8 +65,8 @@ public abstract class SharableVar<T extends JoinableCRDT> {
this.writerTimer.newTimeout(timeout -> { this.writerTimer.newTimeout(timeout -> {
writerSyncTimerTask = new WriterSyncTimerTask(); writerSyncTimerTask = new WriterSyncTimerTask();
writerTimer.newTimeout(writerSyncTimerTask, sharableVarState.getWriteInterval(), writerTimer.newTimeout(writerSyncTimerTask, sharableVarState.getWriteInterval(),
TimeUnit.MILLISECONDS); TimeUnit.SECONDS);
}, 0, TimeUnit.MILLISECONDS); }, this.sharableVarState.getWriteInterval(), TimeUnit.SECONDS);
} }
} }
} }
@ -119,7 +119,7 @@ public abstract class SharableVar<T extends JoinableCRDT> {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
readerTimer.newTimeout(readerSyncTimerTask, sharableVarState.getReaderInterval(), readerTimer.newTimeout(readerSyncTimerTask, sharableVarState.getReaderInterval(),
TimeUnit.MILLISECONDS); TimeUnit.SECONDS);
} }
} }
} }
@ -132,8 +132,8 @@ public abstract class SharableVar<T extends JoinableCRDT> {
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
readerTimer.newTimeout(writerSyncTimerTask, sharableVarState.getWriteInterval(), writerTimer.newTimeout(writerSyncTimerTask, sharableVarState.getWriteInterval(),
TimeUnit.MILLISECONDS); TimeUnit.SECONDS);
} }
} }
} }