From 5326f41ff78e18d8e3568d99a84068d665238991 Mon Sep 17 00:00:00 2001 From: wangxuxin Date: Mon, 26 Jun 2023 17:20:19 +0800 Subject: [PATCH] feat: split readerVar and writerVar --- .../bdware/sc/crdt/SharableVarManager.java | 21 +- .../org/bdware/sc/crdt/SharableVarState.java | 237 +++++++++++++----- .../bdware/sc/crdt/proxy/AWORSetProxy.java | 22 +- .../org/bdware/sc/crdt/proxy/DWFlagProxy.java | 22 +- .../org/bdware/sc/crdt/proxy/EWFlagProxy.java | 22 +- .../bdware/sc/crdt/proxy/GCounterProxy.java | 22 +- .../org/bdware/sc/crdt/proxy/GSetProxy.java | 16 +- .../sc/crdt/proxy/LWWRegisterProxy.java | 16 +- .../bdware/sc/crdt/proxy/MVRegisterProxy.java | 20 +- .../bdware/sc/crdt/proxy/PNCounterProxy.java | 34 ++- .../bdware/sc/crdt/proxy/RWLWWSetProxy.java | 22 +- .../bdware/sc/crdt/proxy/RWORSetProxy.java | 22 +- .../org/bdware/sc/crdt/proxy/SharableVar.java | 128 +++++++--- .../org/bdware/sc/crdt/proxy/TPSetProxy.java | 22 +- 14 files changed, 443 insertions(+), 183 deletions(-) diff --git a/src/main/java/org/bdware/sc/crdt/SharableVarManager.java b/src/main/java/org/bdware/sc/crdt/SharableVarManager.java index 789c176..9890388 100644 --- a/src/main/java/org/bdware/sc/crdt/SharableVarManager.java +++ b/src/main/java/org/bdware/sc/crdt/SharableVarManager.java @@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream; import java.io.PrintStream; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; public class SharableVarManager { @@ -32,7 +33,6 @@ public class SharableVarManager { IrpClient client; AuditDoaClient doaClient; - public static final String SHARABLEOP = "86.100871/SyncVar"; public SharableVarManager(String cpId, EndpointConfig config) { @@ -52,11 +52,17 @@ public class SharableVarManager { try { String varId = message.header.parameters.attributes.get("varId").getAsString(); String content = message.header.parameters.attributes.get("content").getAsString(); + String type = message.header.parameters.attributes.get("type").getAsString(); SharableVar var = allVars.get(varId); if (var != null) { JoinableCRDT delta = JsonUtil.fromJson(content, JoinableCRDT.class); - var.self.join(delta); - var.remoteDeltaQueue.add(delta); + if (Objects.equals(type, "r2r") || Objects.equals(type, "w2r")) { + var.readerVar.join(delta); + var.readerVarDeltaQueue.add(delta); + } else if (Objects.equals(type, "w2w")) { + var.writerVar.join(delta); + var.writerVarDeltaQueue.add(delta); + } } DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); @@ -75,17 +81,18 @@ public class SharableVarManager { } } - private DoipMessage createSyncMessage(String target, String varId, String content) { + private DoipMessage createSyncMessage(String target, String varId, String content, String type) { DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); builder.createRequest(target, SHARABLEOP); builder.addAttributes("varId", varId); builder.addAttributes("content", content); + builder.addAttributes("type", type); return builder.create(); } - public void broadcastSyncMessage(String varId, List sendTo, String content) { + public void broadcastSyncMessage(String varId, List sendTo, String content, String type) { for (String target : sendTo) { - DoipMessage doipMessage = createSyncMessage(target, varId, content); + DoipMessage doipMessage = createSyncMessage(target, varId, content, type); AuditDoipClient client = getClient(target); client.sendMessage(doipMessage, new DoipMessageCallback() { @Override @@ -126,7 +133,7 @@ public class SharableVarManager { } private SharableVar createSharableVar(SharableVarState.SharableVarConfiguration conf, - String identifier, String type) { + String identifier, String type) { switch (type) { case Constants.TypeName.G_COUNTER: return new GCounterProxy(identifier, cpId, conf); diff --git a/src/main/java/org/bdware/sc/crdt/SharableVarState.java b/src/main/java/org/bdware/sc/crdt/SharableVarState.java index 64d350d..b1a07eb 100644 --- a/src/main/java/org/bdware/sc/crdt/SharableVarState.java +++ b/src/main/java/org/bdware/sc/crdt/SharableVarState.java @@ -1,7 +1,6 @@ package org.bdware.sc.crdt; import java.util.ArrayList; -import java.util.LinkedHashSet; import java.util.List; import java.util.Objects; @@ -10,57 +9,85 @@ public class SharableVarState { private SharableVarConfiguration sharableVarConfiguration; - private int myIndex; + private Integer myIndex; + // 是reader private boolean readerFlag; + // 是writer private boolean writerFlag; - private List sendTo; - private long interval; + // 是reader的根结点 + private boolean readerRootFlag; + + // 是writer的根结点 + private boolean writerRootFlag; + + // 作为writer的话,非根节点具有parent。根结点为null + private String writerParent; + + // 作为reader的话,非叶子节点具有children,叶子节点为null + private List readerChildren; + + // reader森林的根结点们,用来给writer根结点同步数据用 + private List readerRoots; + + private Long writeInterval; + + private Long readerInterval; public SharableVarState(String myId, SharableVarConfiguration sharableVarConfiguration) { this.myId = myId; this.sharableVarConfiguration = sharableVarConfiguration; - this.myIndex = parseMyIndex(); - this.sendTo = parseSendTo(); - this.interval = sharableVarConfiguration.interval[this.myIndex]; - for (int readerIndex : this.sharableVarConfiguration.readerIndexes) { - if (Objects.equals(readerIndex, myIndex)) { - this.readerFlag = true; - break; - } - } - for (int writerIndex : this.sharableVarConfiguration.writerIndexes) { - if (Objects.equals(writerIndex, myIndex)) { - this.writerFlag = true; - break; - } - } + parseProperties(); } - private int parseMyIndex() { + private void parseProperties() { for (int i = 0; i < sharableVarConfiguration.getNodeIds().length; i++) { if (Objects.equals(sharableVarConfiguration.getNodeIds()[i], myId)) { - return i; + this.myIndex = i; + break; } } - return -1; - } + if (this.myIndex == null) { + return; + } + int writerParentIdx = sharableVarConfiguration.writerParents[this.myIndex]; + if (writerParentIdx == -1) { + writerFlag = true; + writerRootFlag = true; + } else if (writerParentIdx >= 0) { + writerFlag = true; + writerParent = sharableVarConfiguration.nodeIds[writerParentIdx]; + } - private List parseSendTo() { - if (myIndex < 0 || myIndex >= sharableVarConfiguration.sendTo.length) { - return new ArrayList<>(); + int readerParentIdx = sharableVarConfiguration.readerParents[this.myIndex]; + if (readerParentIdx == -1) { + readerFlag = true; + readerRootFlag = true; + } else if (readerParentIdx >= 0) { + readerFlag = true; } - LinkedHashSet result = new LinkedHashSet<>(); - int[] sendToIndexes = sharableVarConfiguration.sendTo[myIndex]; - for (int sendToIndex : sendToIndexes) { - if (sendToIndex >= 0 && sendToIndex < sharableVarConfiguration.nodeIds.length - && sendToIndex != myIndex) { - result.add(sharableVarConfiguration.nodeIds[sendToIndex]); + if (readerFlag) { + readerChildren = new ArrayList<>(); + for (int i = 0; i < sharableVarConfiguration.readerParents.length; i++) { + if (sharableVarConfiguration.readerParents[i] == myIndex) { + readerChildren.add(sharableVarConfiguration.nodeIds[i]); + } + } + readerInterval = sharableVarConfiguration.readerIntervals[myIndex]; + } + if (writerFlag) { + writeInterval = sharableVarConfiguration.writerIntervals[myIndex]; + } + if (writerRootFlag) { + readerRoots = new ArrayList<>(); + for (int i = 0; i < sharableVarConfiguration.readerParents.length; i++) { + if (sharableVarConfiguration.readerParents[i] == -1) { + readerRoots.add(sharableVarConfiguration.nodeIds[i]); + } } } - return new ArrayList<>(result); } public String getMyId() { @@ -79,11 +106,11 @@ public class SharableVarState { this.sharableVarConfiguration = sharableVarConfiguration; } - public int getMyIndex() { + public Integer getMyIndex() { return myIndex; } - public void setMyIndex(int myIndex) { + public void setMyIndex(Integer myIndex) { this.myIndex = myIndex; } @@ -103,29 +130,82 @@ public class SharableVarState { this.writerFlag = writerFlag; } - public List getSendTo() { - return sendTo; + public boolean isReaderRootFlag() { + return readerRootFlag; } - public void setSendTo(List sendTo) { - this.sendTo = sendTo; + public void setReaderRootFlag(boolean readerRootFlag) { + this.readerRootFlag = readerRootFlag; } - public long getInterval() { - return interval; + public boolean isWriterRootFlag() { + return writerRootFlag; } - public void setInterval(long interval) { - this.interval = interval; + public void setWriterRootFlag(boolean writerRootFlag) { + this.writerRootFlag = writerRootFlag; + } + + public String getWriterParent() { + return writerParent; + } + + public void setWriterParent(String writerParent) { + this.writerParent = writerParent; + } + + public List getReaderChildren() { + return readerChildren; + } + + public void setReaderChildren(List readerChildren) { + this.readerChildren = readerChildren; + } + + public List getReaderRoots() { + return readerRoots; + } + + public void setReaderRoots(List readerRoots) { + this.readerRoots = readerRoots; + } + + public Long getWriteInterval() { + return writeInterval; + } + + public void setWriteInterval(Long writeInterval) { + this.writeInterval = writeInterval; + } + + public Long getReaderInterval() { + return readerInterval; + } + + public void setReaderInterval(Long readerInterval) { + this.readerInterval = readerInterval; } public static class SharableVarConfiguration { + // ["/bdrepo1/var/0", "/bdrepo1/var/1", "/bdrepo1/var/2", "/bdrepo1/var/3"] + // 0(w) -> 2(w) -> 1(r) -> 3(r) + // | + // 1(w) - + + // ["/bdrepo1/var/0", "/bdrepo1/var/1", "/bdrepo1/var/2", "/bdrepo1/var/3"] String[] nodeIds; + long maxDelay; int[] writerIndexes; int[] readerIndexes; - int[][] sendTo; - long[] interval; - long maxDelay; + + // [2, 2, -1, -2] -2表示该节点为非writer节点,-1表示该结点为writer根结点 + int[] writerParents; + // [-2, -1, -2, 1] -2表示该节点为非reader节点,-1表示该结点为reader根结点 + int[] readerParents; + // [5, 5, 6, -1] -1为无需同步 + long[] writerIntervals; + // [-1, 4, -1, -1] + long[] readerIntervals; public String[] getNodeIds() { return nodeIds; @@ -135,6 +215,14 @@ public class SharableVarState { this.nodeIds = nodeIds; } + public long getMaxDelay() { + return maxDelay; + } + + public void setMaxDelay(long maxDelay) { + this.maxDelay = maxDelay; + } + public int[] getWriterIndexes() { return writerIndexes; } @@ -151,28 +239,61 @@ public class SharableVarState { this.readerIndexes = readerIndexes; } - public int[][] getSendTo() { - return sendTo; + public int[] getWriterParents() { + return writerParents; } - public void setSendTo(int[][] sendTo) { - this.sendTo = sendTo; + public void setWriterParents(int[] writerParents) { + this.writerParents = writerParents; } - public long[] getInterval() { - return interval; + public int[] getReaderParents() { + return readerParents; } - public void setInterval(long[] interval) { - this.interval = interval; + public void setReaderParents(int[] readerParents) { + this.readerParents = readerParents; } - public long getMaxDelay() { - return maxDelay; + public long[] getWriterIntervals() { + return writerIntervals; } - public void setMaxDelay(long maxDelay) { - this.maxDelay = maxDelay; + public void setWriterIntervals(long[] writerIntervals) { + this.writerIntervals = writerIntervals; + } + + public long[] getReaderIntervals() { + return readerIntervals; + } + + public void setReaderIntervals(long[] readerIntervals) { + this.readerIntervals = readerIntervals; } } + + public static void main(String[] args) { + // ["/bdrepo1/var/0", "/bdrepo1/var/1", "/bdrepo1/var/2", "/bdrepo1/var/3"] + // 0(w) -> 2(w) -> 1(r) -> 3(r) + // | + // 1(w) - + + + SharableVarConfiguration sharableVarConfiguration = new SharableVarConfiguration(); + sharableVarConfiguration.setNodeIds(new String[]{"/bdrepo1/var/0", "/bdrepo1/var/1", "/bdrepo1/var/2", "/bdrepo1/var/3"}); + sharableVarConfiguration.setWriterParents(new int[]{2, 2, -1, -2}); + sharableVarConfiguration.setReaderParents(new int[]{-2, -1, -2, 1}); + sharableVarConfiguration.setWriterIntervals(new long[]{5, 5, 6, -1}); + sharableVarConfiguration.setReaderIntervals(new long[]{-1, 4, -1, -1}); + + + SharableVarState sharableVarState0 = new SharableVarState("/bdrepo1/var/0", sharableVarConfiguration); + SharableVarState sharableVarState1 = new SharableVarState("/bdrepo1/var/1", sharableVarConfiguration); + SharableVarState sharableVarState2 = new SharableVarState("/bdrepo1/var/2", sharableVarConfiguration); + SharableVarState sharableVarState3 = new SharableVarState("/bdrepo1/var/3", sharableVarConfiguration); + System.out.println(sharableVarState0); + System.out.println(sharableVarState1); + System.out.println(sharableVarState2); + System.out.println(sharableVarState3); + } } diff --git a/src/main/java/org/bdware/sc/crdt/proxy/AWORSetProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/AWORSetProxy.java index 6aa0678..9b58029 100644 --- a/src/main/java/org/bdware/sc/crdt/proxy/AWORSetProxy.java +++ b/src/main/java/org/bdware/sc/crdt/proxy/AWORSetProxy.java @@ -8,25 +8,31 @@ import java.util.Set; public class AWORSetProxy extends SharableVar> { public AWORSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { super(varId, cpId, conf); - self = new AWORSet<>(cpId, varId); } public void add(Object val) { - AWORSet delta = self.add(val); - localDeltaQueue.add(delta); + if (writerVar != null) { + AWORSet delta = writerVar.add(val); + writerVarDeltaQueue.add(delta); + } } public void remove(Object val) { - AWORSet delta = self.remove(val); - localDeltaQueue.add(delta); + if (writerVar != null) { + AWORSet delta = writerVar.remove(val); + writerVarDeltaQueue.add(delta); + } } public Set read() { - return self.read(); + if (readerVar != null) { + return readerVar.read(); + } + return writerVar.read(); } @Override - protected AWORSet newEmptyDelta() { - return new AWORSet<>(null, varId); + protected AWORSet createDeltaCrdt(String nodeId, String varId) { + return new AWORSet<>(nodeId, varId); } } diff --git a/src/main/java/org/bdware/sc/crdt/proxy/DWFlagProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/DWFlagProxy.java index cf38cdc..265827d 100644 --- a/src/main/java/org/bdware/sc/crdt/proxy/DWFlagProxy.java +++ b/src/main/java/org/bdware/sc/crdt/proxy/DWFlagProxy.java @@ -6,25 +6,31 @@ import org.bdware.sc.crdt.SharableVarState; public class DWFlagProxy extends SharableVar { public DWFlagProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { super(varId, cpId, conf); - self = new DWFlag(cpId, varId); } public void enable() { - DWFlag delta = self.enable(); - localDeltaQueue.add(delta); + if (writerVar != null) { + DWFlag delta = writerVar.enable(); + writerVarDeltaQueue.add(delta); + } } public void disable() { - DWFlag delta = self.disable(); - localDeltaQueue.add(delta); + if (writerVar != null) { + DWFlag delta = writerVar.disable(); + writerVarDeltaQueue.add(delta); + } } public boolean read() { - return self.read(); + if (readerVar != null) { + return readerVar.read(); + } + return writerVar.read(); } @Override - protected DWFlag newEmptyDelta() { - return new DWFlag(null, varId); + protected DWFlag createDeltaCrdt(String nodeId, String varId) { + return new DWFlag(nodeId, varId); } } diff --git a/src/main/java/org/bdware/sc/crdt/proxy/EWFlagProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/EWFlagProxy.java index b312059..89797bb 100644 --- a/src/main/java/org/bdware/sc/crdt/proxy/EWFlagProxy.java +++ b/src/main/java/org/bdware/sc/crdt/proxy/EWFlagProxy.java @@ -6,25 +6,31 @@ import org.bdware.sc.crdt.SharableVarState; public class EWFlagProxy extends SharableVar { public EWFlagProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { super(varId, cpId, conf); - self = new EWFlag(cpId, varId); } public void enable() { - EWFlag delta = self.enable(); - localDeltaQueue.add(delta); + if (writerVar != null) { + EWFlag delta = writerVar.enable(); + writerVarDeltaQueue.add(delta); + } } public void disable() { - EWFlag delta = self.disable(); - localDeltaQueue.add(delta); + if (writerVar != null) { + EWFlag delta = writerVar.disable(); + writerVarDeltaQueue.add(delta); + } } public boolean read() { - return self.read(); + if (readerVar != null) { + return readerVar.read(); + } + return writerVar.read(); } @Override - protected EWFlag newEmptyDelta() { - return new EWFlag(null, varId); + protected EWFlag createDeltaCrdt(String nodeId, String varId) { + return new EWFlag(nodeId, varId); } } diff --git a/src/main/java/org/bdware/sc/crdt/proxy/GCounterProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/GCounterProxy.java index 11e8132..930eabe 100644 --- a/src/main/java/org/bdware/sc/crdt/proxy/GCounterProxy.java +++ b/src/main/java/org/bdware/sc/crdt/proxy/GCounterProxy.java @@ -8,25 +8,31 @@ public class GCounterProxy extends SharableVar { public GCounterProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { super(varId, cpId, conf); - self = new GCounter(cpId, varId); } public void inc() { - GCounter delta = self.inc(); - localDeltaQueue.add(delta); + if (writerVar != null) { + GCounter delta = writerVar.inc(); + writerVarDeltaQueue.add(delta); + } } public void inc(long var) { - GCounter delta = self.inc(var); - localDeltaQueue.add(delta); + if (writerVar != null) { + GCounter delta = writerVar.inc(var); + writerVarDeltaQueue.add(delta); + } } public Long read() { - return self.read(); + if (readerVar != null) { + return readerVar.read(); + } + return writerVar.read(); } @Override - protected GCounter newEmptyDelta() { - return new GCounter(null, varId); + protected GCounter createDeltaCrdt(String nodeId, String varId) { + return new GCounter(nodeId, varId); } } diff --git a/src/main/java/org/bdware/sc/crdt/proxy/GSetProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/GSetProxy.java index 7bb4fc6..d145253 100644 --- a/src/main/java/org/bdware/sc/crdt/proxy/GSetProxy.java +++ b/src/main/java/org/bdware/sc/crdt/proxy/GSetProxy.java @@ -8,20 +8,24 @@ import java.util.Set; public class GSetProxy extends SharableVar> { public GSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { super(varId, cpId, conf); - self = new GSet<>(cpId, varId); } public void add(Object val) { - GSet delta = self.add(val); - localDeltaQueue.add(delta); + if (writerVar != null) { + GSet delta = writerVar.add(val); + writerVarDeltaQueue.add(delta); + } } public Set read() { - return self.read(); + if (readerVar != null) { + return readerVar.read(); + } + return writerVar.read(); } @Override - protected GSet newEmptyDelta() { - return new GSet<>(null, varId); + protected GSet createDeltaCrdt(String nodeId, String varId) { + return new GSet<>(nodeId, varId); } } diff --git a/src/main/java/org/bdware/sc/crdt/proxy/LWWRegisterProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/LWWRegisterProxy.java index ad73177..47aafa7 100644 --- a/src/main/java/org/bdware/sc/crdt/proxy/LWWRegisterProxy.java +++ b/src/main/java/org/bdware/sc/crdt/proxy/LWWRegisterProxy.java @@ -7,20 +7,24 @@ public class LWWRegisterProxy extends SharableVar> { public LWWRegisterProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { super(varId, cpId, conf); - self = new LWWRegister<>(cpId, varId); } public void write(Object val) { - LWWRegister delta = self.write(System.currentTimeMillis(), val); - localDeltaQueue.add(delta); + if (writerVar != null) { + LWWRegister delta = writerVar.write(System.currentTimeMillis(), val); + writerVarDeltaQueue.add(delta); + } } public Object read() { - return self.read(); + if (readerVar != null) { + return readerVar.read(); + } + return writerVar.read(); } @Override - protected LWWRegister newEmptyDelta() { - return new LWWRegister<>(null, varId); + protected LWWRegister createDeltaCrdt(String nodeId, String varId) { + return new LWWRegister<>(nodeId, varId); } } diff --git a/src/main/java/org/bdware/sc/crdt/proxy/MVRegisterProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/MVRegisterProxy.java index 460a7c8..0c4fba4 100644 --- a/src/main/java/org/bdware/sc/crdt/proxy/MVRegisterProxy.java +++ b/src/main/java/org/bdware/sc/crdt/proxy/MVRegisterProxy.java @@ -8,25 +8,29 @@ public class MVRegisterProxy extends SharableVar> { public MVRegisterProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { super(varId, cpId, conf); - self = new MVRegister<>(cpId, varId); } public void write(Object val) { - MVRegister delta = self.write(val); - localDeltaQueue.add(delta); + if (writerVar != null) { + MVRegister delta = writerVar.write(val); + writerVarDeltaQueue.add(delta); + } } public void resolve(Object val) { - MVRegister delta = self.resolve(); - localDeltaQueue.add(delta); + MVRegister delta = writerVar.resolve(); + writerVarDeltaQueue.add(delta); } public Object read() { - return self.read(); + if (readerVar != null) { + return readerVar.read(); + } + return writerVar.read(); } @Override - protected MVRegister newEmptyDelta() { - return new MVRegister<>(null, varId); + protected MVRegister createDeltaCrdt(String nodeId, String varId) { + return new MVRegister<>(nodeId, varId); } } diff --git a/src/main/java/org/bdware/sc/crdt/proxy/PNCounterProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/PNCounterProxy.java index b566c99..4fe5b99 100644 --- a/src/main/java/org/bdware/sc/crdt/proxy/PNCounterProxy.java +++ b/src/main/java/org/bdware/sc/crdt/proxy/PNCounterProxy.java @@ -7,35 +7,45 @@ public class PNCounterProxy extends SharableVar { public PNCounterProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { super(varId, cpId, conf); - self = new PNCounter(cpId, varId); } public void inc() { - PNCounter delta = self.inc(); - localDeltaQueue.add(delta); + if (writerVar != null) { + PNCounter delta = writerVar.inc(); + writerVarDeltaQueue.add(delta); + } } public void inc(long val) { - PNCounter delta = self.inc(val); - localDeltaQueue.add(delta); + if (writerVar != null) { + PNCounter delta = writerVar.inc(val); + writerVarDeltaQueue.add(delta); + } } public void dec() { - PNCounter delta = self.dec(); - localDeltaQueue.add(delta); + if (writerVar != null) { + PNCounter delta = writerVar.dec(); + writerVarDeltaQueue.add(delta); + } } public void dec(long val) { - PNCounter delta = self.dec(val); - localDeltaQueue.add(delta); + if (writerVar != null) { + PNCounter delta = writerVar.dec(val); + writerVarDeltaQueue.add(delta); + } } public Long read() { - return self.read(); + if (readerVar != null) { + return readerVar.read(); + } + return writerVar.read(); } @Override - protected PNCounter newEmptyDelta() { - return new PNCounter(null, varId); + protected PNCounter createDeltaCrdt(String nodeId, String varId) { + return new PNCounter(nodeId, varId); } } diff --git a/src/main/java/org/bdware/sc/crdt/proxy/RWLWWSetProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/RWLWWSetProxy.java index 4fbd074..999f937 100644 --- a/src/main/java/org/bdware/sc/crdt/proxy/RWLWWSetProxy.java +++ b/src/main/java/org/bdware/sc/crdt/proxy/RWLWWSetProxy.java @@ -9,25 +9,31 @@ public class RWLWWSetProxy extends SharableVar> { public RWLWWSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { super(varId, cpId, conf); - self = new RWLWWSet<>(cpId, varId); } public void add(Object val) { - RWLWWSet delta = self.add(System.currentTimeMillis(), val); - localDeltaQueue.add(delta); + if (writerVar != null) { + RWLWWSet delta = writerVar.add(System.currentTimeMillis(), val); + writerVarDeltaQueue.add(delta); + } } public void remove(Object val) { - RWLWWSet delta = self.remove(System.currentTimeMillis(), val); - localDeltaQueue.add(delta); + if (writerVar != null) { + RWLWWSet delta = writerVar.remove(System.currentTimeMillis(), val); + writerVarDeltaQueue.add(delta); + } } public Set read() { - return self.read(); + if (readerVar != null) { + return readerVar.read(); + } + return writerVar.read(); } @Override - protected RWLWWSet newEmptyDelta() { - return new RWLWWSet<>(); + protected RWLWWSet createDeltaCrdt(String nodeId, String varId) { + return new RWLWWSet<>(nodeId, varId); } } diff --git a/src/main/java/org/bdware/sc/crdt/proxy/RWORSetProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/RWORSetProxy.java index 745b493..d493ad2 100644 --- a/src/main/java/org/bdware/sc/crdt/proxy/RWORSetProxy.java +++ b/src/main/java/org/bdware/sc/crdt/proxy/RWORSetProxy.java @@ -8,25 +8,31 @@ import java.util.Set; public class RWORSetProxy extends SharableVar> { public RWORSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { super(varId, cpId, conf); - self = new RWORSet<>(cpId, varId); } public void add(Object val) { - RWORSet delta = self.add(val); - localDeltaQueue.add(delta); + if (writerVar != null) { + RWORSet delta = writerVar.add(val); + writerVarDeltaQueue.add(delta); + } } public void remove(Object val) { - RWORSet delta = self.remove(val); - localDeltaQueue.add(delta); + if (writerVar != null) { + RWORSet delta = writerVar.remove(val); + writerVarDeltaQueue.add(delta); + } } public Set read() { - return self.read(); + if (readerVar != null) { + return readerVar.read(); + } + return writerVar.read(); } @Override - protected RWORSet newEmptyDelta() { - return new RWORSet<>(null, varId); + protected RWORSet createDeltaCrdt(String nodeId, String varId) { + return new RWORSet<>(nodeId, varId); } } diff --git a/src/main/java/org/bdware/sc/crdt/proxy/SharableVar.java b/src/main/java/org/bdware/sc/crdt/proxy/SharableVar.java index 883acab..0845200 100644 --- a/src/main/java/org/bdware/sc/crdt/proxy/SharableVar.java +++ b/src/main/java/org/bdware/sc/crdt/proxy/SharableVar.java @@ -8,63 +8,131 @@ import org.bdware.sc.crdt.SharableVarManager; import org.bdware.sc.crdt.SharableVarState; import org.bdware.sc.util.JsonUtil; +import java.util.Collections; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public abstract class SharableVar { - public final HashedWheelTimer HASHED_WHEEL_TIMER = new HashedWheelTimer(r -> { - Thread t = Executors.defaultThreadFactory().newThread(r); - t.setDaemon(true); - return t; - }, 5, TimeUnit.MILLISECONDS, 2); + // 用于writer节点的变量副本 + public T writerVar; + // 用于reader节点的变量副本 + public T readerVar; - public final Queue localDeltaQueue = new LinkedList<>(); - public final Queue remoteDeltaQueue = new LinkedList<>(); - public T self; - protected String varId; - private SyncTimerTask syncTimerTask; - private SharableVarState sharableVarState; + public Queue writerVarDeltaQueue; + public Queue readerVarDeltaQueue; + + public String varId; + public SharableVarState sharableVarState; + + public HashedWheelTimer readerTimer; + public ReaderSyncTimerTask readerSyncTimerTask; + public HashedWheelTimer writerTimer; + public WriterSyncTimerTask writerSyncTimerTask; public SharableVar(String varId, String cpId, - SharableVarState.SharableVarConfiguration resolveResult) { + SharableVarState.SharableVarConfiguration resolveResult) { this.varId = varId; this.sharableVarState = new SharableVarState(cpId, resolveResult); - this.HASHED_WHEEL_TIMER.newTimeout(timeout -> { - syncTimerTask = new SyncTimerTask(); - HASHED_WHEEL_TIMER.newTimeout(syncTimerTask, sharableVarState.getInterval(), - TimeUnit.MILLISECONDS); - }, 0, TimeUnit.MILLISECONDS); + + if (this.sharableVarState.isReaderFlag()) { + this.readerVar = createDeltaCrdt(cpId, varId); + this.readerVarDeltaQueue = new LinkedList<>(); + if (this.sharableVarState.getReaderChildren() != null && this.sharableVarState.getReaderChildren().size() > 0 + && this.sharableVarState.getReaderInterval() != null && this.sharableVarState.getReaderInterval() > 0) { + this.readerTimer = new HashedWheelTimer(r -> { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + return t; + }, 5, TimeUnit.MILLISECONDS, 2); + this.readerTimer.newTimeout(timeout -> { + readerSyncTimerTask = new ReaderSyncTimerTask(); + readerTimer.newTimeout(readerSyncTimerTask, sharableVarState.getReaderInterval(), + TimeUnit.MILLISECONDS); + }, 0, TimeUnit.MILLISECONDS); + } + } + if (this.sharableVarState.isWriterFlag()) { + this.writerVar = createDeltaCrdt(cpId, varId); + this.writerVarDeltaQueue = new LinkedList<>(); + if (this.sharableVarState.getWriteInterval() != null && this.sharableVarState.getWriteInterval() > 0) { + this.writerTimer = new HashedWheelTimer(r -> { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + return t; + }, 5, TimeUnit.MILLISECONDS, 2); + this.writerTimer.newTimeout(timeout -> { + writerSyncTimerTask = new WriterSyncTimerTask(); + writerTimer.newTimeout(writerSyncTimerTask, sharableVarState.getWriteInterval(), + TimeUnit.MILLISECONDS); + }, 0, TimeUnit.MILLISECONDS); + } + } } - private void syncVar() { - if (localDeltaQueue.isEmpty()) { + private void syncReaderVar() { + if (readerVarDeltaQueue.isEmpty()) { return; } - JoinableCRDT joinedDelta = newEmptyDelta(); - synchronized (localDeltaQueue) { - while (!localDeltaQueue.isEmpty()) { - JoinableCRDT delta = localDeltaQueue.poll(); + JoinableCRDT joinedDelta = createDeltaCrdt(null, varId); + synchronized (readerVarDeltaQueue) { + while (!readerVarDeltaQueue.isEmpty()) { + JoinableCRDT delta = readerVarDeltaQueue.poll(); joinedDelta.join(delta); } } - String content = JsonUtil.toJson(localDeltaQueue); - SharableVarManager.instance.broadcastSyncMessage(varId, sharableVarState.getSendTo(), - content); + String content = JsonUtil.toJson(joinedDelta); + SharableVarManager.instance.broadcastSyncMessage(varId, sharableVarState.getReaderChildren(), content, "r2r"); } - abstract protected T newEmptyDelta(); + private void syncWriterVar() { + if (writerVarDeltaQueue.isEmpty()) { + return; + } + JoinableCRDT joinedDelta = createDeltaCrdt(null, varId); + synchronized (writerVarDeltaQueue) { + while (!writerVarDeltaQueue.isEmpty()) { + JoinableCRDT delta = writerVarDeltaQueue.poll(); + joinedDelta.join(delta); + } + } + String content = JsonUtil.toJson(joinedDelta); + if (sharableVarState.getWriterParent() != null) { + // 父节点是Writer + SharableVarManager.instance.broadcastSyncMessage(varId, Collections.singletonList(sharableVarState.getWriterParent()), content, "w2w"); + } else if (sharableVarState.getReaderRoots() != null) { + // 自己是writer根结点 向reader根结点们广播 + SharableVarManager.instance.broadcastSyncMessage(varId, sharableVarState.getReaderRoots(), content, "w2r"); + } - class SyncTimerTask implements TimerTask { + } + + abstract protected T createDeltaCrdt(String nodeId, String varId); + + class ReaderSyncTimerTask implements TimerTask { @Override public void run(Timeout timeout) throws Exception { try { - syncVar(); + syncReaderVar(); } catch (Exception e) { e.printStackTrace(); } finally { - HASHED_WHEEL_TIMER.newTimeout(syncTimerTask, sharableVarState.getInterval(), + readerTimer.newTimeout(readerSyncTimerTask, sharableVarState.getReaderInterval(), + TimeUnit.MILLISECONDS); + } + } + } + + class WriterSyncTimerTask implements TimerTask { + @Override + public void run(Timeout timeout) throws Exception { + try { + syncWriterVar(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + readerTimer.newTimeout(writerSyncTimerTask, sharableVarState.getWriteInterval(), TimeUnit.MILLISECONDS); } } diff --git a/src/main/java/org/bdware/sc/crdt/proxy/TPSetProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/TPSetProxy.java index de5a435..a01eb4f 100644 --- a/src/main/java/org/bdware/sc/crdt/proxy/TPSetProxy.java +++ b/src/main/java/org/bdware/sc/crdt/proxy/TPSetProxy.java @@ -8,25 +8,31 @@ import java.util.Set; public class TPSetProxy extends SharableVar> { public TPSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { super(varId, cpId, conf); - self = new TPSet<>(cpId, varId); } public void add(Object val) { - TPSet delta = self.add(val); - localDeltaQueue.add(delta); + if (writerVar != null) { + TPSet delta = writerVar.add(val); + writerVarDeltaQueue.add(delta); + } } public void remove(Object val) { - TPSet delta = self.remove(val); - localDeltaQueue.add(delta); + if (writerVar != null) { + TPSet delta = writerVar.remove(val); + writerVarDeltaQueue.add(delta); + } } public Set read() { - return self.read(); + if (readerVar != null) { + return readerVar.read(); + } + return writerVar.read(); } @Override - protected TPSet newEmptyDelta() { - return new TPSet<>(null, varId); + protected TPSet createDeltaCrdt(String nodeId, String varId) { + return new TPSet<>(nodeId, varId); } }