feat: split readerVar and writerVar

This commit is contained in:
Xuxin Wang 2023-06-26 17:20:19 +08:00
parent 2a59f577fc
commit 5326f41ff7
14 changed files with 443 additions and 183 deletions

View File

@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
public class SharableVarManager { public class SharableVarManager {
@ -32,7 +33,6 @@ public class SharableVarManager {
IrpClient client; IrpClient client;
AuditDoaClient doaClient; AuditDoaClient doaClient;
public static final String SHARABLEOP = "86.100871/SyncVar"; public static final String SHARABLEOP = "86.100871/SyncVar";
public SharableVarManager(String cpId, EndpointConfig config) { public SharableVarManager(String cpId, EndpointConfig config) {
@ -52,11 +52,17 @@ public class SharableVarManager {
try { try {
String varId = message.header.parameters.attributes.get("varId").getAsString(); String varId = message.header.parameters.attributes.get("varId").getAsString();
String content = message.header.parameters.attributes.get("content").getAsString(); String content = message.header.parameters.attributes.get("content").getAsString();
String type = message.header.parameters.attributes.get("type").getAsString();
SharableVar var = allVars.get(varId); SharableVar var = allVars.get(varId);
if (var != null) { if (var != null) {
JoinableCRDT delta = JsonUtil.fromJson(content, JoinableCRDT.class); JoinableCRDT delta = JsonUtil.fromJson(content, JoinableCRDT.class);
var.self.join(delta); if (Objects.equals(type, "r2r") || Objects.equals(type, "w2r")) {
var.remoteDeltaQueue.add(delta); var.readerVar.join(delta);
var.readerVarDeltaQueue.add(delta);
} else if (Objects.equals(type, "w2w")) {
var.writerVar.join(delta);
var.writerVarDeltaQueue.add(delta);
}
} }
DoipMessageFactory.DoipMessageBuilder builder = DoipMessageFactory.DoipMessageBuilder builder =
new DoipMessageFactory.DoipMessageBuilder(); new DoipMessageFactory.DoipMessageBuilder();
@ -75,17 +81,18 @@ public class SharableVarManager {
} }
} }
private DoipMessage createSyncMessage(String target, String varId, String content) { private DoipMessage createSyncMessage(String target, String varId, String content, String type) {
DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder();
builder.createRequest(target, SHARABLEOP); builder.createRequest(target, SHARABLEOP);
builder.addAttributes("varId", varId); builder.addAttributes("varId", varId);
builder.addAttributes("content", content); builder.addAttributes("content", content);
builder.addAttributes("type", type);
return builder.create(); return builder.create();
} }
public void broadcastSyncMessage(String varId, List<String> sendTo, String content) { public void broadcastSyncMessage(String varId, List<String> sendTo, String content, String type) {
for (String target : sendTo) { for (String target : sendTo) {
DoipMessage doipMessage = createSyncMessage(target, varId, content); DoipMessage doipMessage = createSyncMessage(target, varId, content, type);
AuditDoipClient client = getClient(target); AuditDoipClient client = getClient(target);
client.sendMessage(doipMessage, new DoipMessageCallback() { client.sendMessage(doipMessage, new DoipMessageCallback() {
@Override @Override
@ -126,7 +133,7 @@ public class SharableVarManager {
} }
private SharableVar createSharableVar(SharableVarState.SharableVarConfiguration conf, private SharableVar createSharableVar(SharableVarState.SharableVarConfiguration conf,
String identifier, String type) { String identifier, String type) {
switch (type) { switch (type) {
case Constants.TypeName.G_COUNTER: case Constants.TypeName.G_COUNTER:
return new GCounterProxy(identifier, cpId, conf); return new GCounterProxy(identifier, cpId, conf);

View File

@ -1,7 +1,6 @@
package org.bdware.sc.crdt; package org.bdware.sc.crdt;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
@ -10,57 +9,85 @@ public class SharableVarState {
private SharableVarConfiguration sharableVarConfiguration; private SharableVarConfiguration sharableVarConfiguration;
private int myIndex; private Integer myIndex;
// 是reader
private boolean readerFlag; private boolean readerFlag;
// 是writer
private boolean writerFlag; private boolean writerFlag;
private List<String> sendTo;
private long interval; // 是reader的根结点
private boolean readerRootFlag;
// 是writer的根结点
private boolean writerRootFlag;
// 作为writer的话非根节点具有parent根结点为null
private String writerParent;
// 作为reader的话非叶子节点具有children叶子节点为null
private List<String> readerChildren;
// reader森林的根结点们,用来给writer根结点同步数据用
private List<String> readerRoots;
private Long writeInterval;
private Long readerInterval;
public SharableVarState(String myId, SharableVarConfiguration sharableVarConfiguration) { public SharableVarState(String myId, SharableVarConfiguration sharableVarConfiguration) {
this.myId = myId; this.myId = myId;
this.sharableVarConfiguration = sharableVarConfiguration; this.sharableVarConfiguration = sharableVarConfiguration;
this.myIndex = parseMyIndex(); parseProperties();
this.sendTo = parseSendTo();
this.interval = sharableVarConfiguration.interval[this.myIndex];
for (int readerIndex : this.sharableVarConfiguration.readerIndexes) {
if (Objects.equals(readerIndex, myIndex)) {
this.readerFlag = true;
break;
}
}
for (int writerIndex : this.sharableVarConfiguration.writerIndexes) {
if (Objects.equals(writerIndex, myIndex)) {
this.writerFlag = true;
break;
}
}
} }
private int parseMyIndex() { private void parseProperties() {
for (int i = 0; i < sharableVarConfiguration.getNodeIds().length; i++) { for (int i = 0; i < sharableVarConfiguration.getNodeIds().length; i++) {
if (Objects.equals(sharableVarConfiguration.getNodeIds()[i], myId)) { if (Objects.equals(sharableVarConfiguration.getNodeIds()[i], myId)) {
return i; this.myIndex = i;
break;
} }
} }
return -1; if (this.myIndex == null) {
} return;
}
int writerParentIdx = sharableVarConfiguration.writerParents[this.myIndex];
if (writerParentIdx == -1) {
writerFlag = true;
writerRootFlag = true;
} else if (writerParentIdx >= 0) {
writerFlag = true;
writerParent = sharableVarConfiguration.nodeIds[writerParentIdx];
}
private List<String> parseSendTo() { int readerParentIdx = sharableVarConfiguration.readerParents[this.myIndex];
if (myIndex < 0 || myIndex >= sharableVarConfiguration.sendTo.length) { if (readerParentIdx == -1) {
return new ArrayList<>(); readerFlag = true;
readerRootFlag = true;
} else if (readerParentIdx >= 0) {
readerFlag = true;
} }
LinkedHashSet<String> result = new LinkedHashSet<>(); if (readerFlag) {
int[] sendToIndexes = sharableVarConfiguration.sendTo[myIndex]; readerChildren = new ArrayList<>();
for (int sendToIndex : sendToIndexes) { for (int i = 0; i < sharableVarConfiguration.readerParents.length; i++) {
if (sendToIndex >= 0 && sendToIndex < sharableVarConfiguration.nodeIds.length if (sharableVarConfiguration.readerParents[i] == myIndex) {
&& sendToIndex != myIndex) { readerChildren.add(sharableVarConfiguration.nodeIds[i]);
result.add(sharableVarConfiguration.nodeIds[sendToIndex]); }
}
readerInterval = sharableVarConfiguration.readerIntervals[myIndex];
}
if (writerFlag) {
writeInterval = sharableVarConfiguration.writerIntervals[myIndex];
}
if (writerRootFlag) {
readerRoots = new ArrayList<>();
for (int i = 0; i < sharableVarConfiguration.readerParents.length; i++) {
if (sharableVarConfiguration.readerParents[i] == -1) {
readerRoots.add(sharableVarConfiguration.nodeIds[i]);
}
} }
} }
return new ArrayList<>(result);
} }
public String getMyId() { public String getMyId() {
@ -79,11 +106,11 @@ public class SharableVarState {
this.sharableVarConfiguration = sharableVarConfiguration; this.sharableVarConfiguration = sharableVarConfiguration;
} }
public int getMyIndex() { public Integer getMyIndex() {
return myIndex; return myIndex;
} }
public void setMyIndex(int myIndex) { public void setMyIndex(Integer myIndex) {
this.myIndex = myIndex; this.myIndex = myIndex;
} }
@ -103,29 +130,82 @@ public class SharableVarState {
this.writerFlag = writerFlag; this.writerFlag = writerFlag;
} }
public List<String> getSendTo() { public boolean isReaderRootFlag() {
return sendTo; return readerRootFlag;
} }
public void setSendTo(List<String> sendTo) { public void setReaderRootFlag(boolean readerRootFlag) {
this.sendTo = sendTo; this.readerRootFlag = readerRootFlag;
} }
public long getInterval() { public boolean isWriterRootFlag() {
return interval; return writerRootFlag;
} }
public void setInterval(long interval) { public void setWriterRootFlag(boolean writerRootFlag) {
this.interval = interval; this.writerRootFlag = writerRootFlag;
}
public String getWriterParent() {
return writerParent;
}
public void setWriterParent(String writerParent) {
this.writerParent = writerParent;
}
public List<String> getReaderChildren() {
return readerChildren;
}
public void setReaderChildren(List<String> readerChildren) {
this.readerChildren = readerChildren;
}
public List<String> getReaderRoots() {
return readerRoots;
}
public void setReaderRoots(List<String> readerRoots) {
this.readerRoots = readerRoots;
}
public Long getWriteInterval() {
return writeInterval;
}
public void setWriteInterval(Long writeInterval) {
this.writeInterval = writeInterval;
}
public Long getReaderInterval() {
return readerInterval;
}
public void setReaderInterval(Long readerInterval) {
this.readerInterval = readerInterval;
} }
public static class SharableVarConfiguration { public static class SharableVarConfiguration {
// ["/bdrepo1/var/0", "/bdrepo1/var/1", "/bdrepo1/var/2", "/bdrepo1/var/3"]
// 0(w) -> 2(w) -> 1(r) -> 3(r)
// |
// 1(w) -
// ["/bdrepo1/var/0", "/bdrepo1/var/1", "/bdrepo1/var/2", "/bdrepo1/var/3"]
String[] nodeIds; String[] nodeIds;
long maxDelay;
int[] writerIndexes; int[] writerIndexes;
int[] readerIndexes; int[] readerIndexes;
int[][] sendTo;
long[] interval; // [2, 2, -1, -2] -2表示该节点为非writer节点-1表示该结点为writer根结点
long maxDelay; int[] writerParents;
// [-2, -1, -2, 1] -2表示该节点为非reader节点-1表示该结点为reader根结点
int[] readerParents;
// [5, 5, 6, -1] -1为无需同步
long[] writerIntervals;
// [-1, 4, -1, -1]
long[] readerIntervals;
public String[] getNodeIds() { public String[] getNodeIds() {
return nodeIds; return nodeIds;
@ -135,6 +215,14 @@ public class SharableVarState {
this.nodeIds = nodeIds; this.nodeIds = nodeIds;
} }
public long getMaxDelay() {
return maxDelay;
}
public void setMaxDelay(long maxDelay) {
this.maxDelay = maxDelay;
}
public int[] getWriterIndexes() { public int[] getWriterIndexes() {
return writerIndexes; return writerIndexes;
} }
@ -151,28 +239,61 @@ public class SharableVarState {
this.readerIndexes = readerIndexes; this.readerIndexes = readerIndexes;
} }
public int[][] getSendTo() { public int[] getWriterParents() {
return sendTo; return writerParents;
} }
public void setSendTo(int[][] sendTo) { public void setWriterParents(int[] writerParents) {
this.sendTo = sendTo; this.writerParents = writerParents;
} }
public long[] getInterval() { public int[] getReaderParents() {
return interval; return readerParents;
} }
public void setInterval(long[] interval) { public void setReaderParents(int[] readerParents) {
this.interval = interval; this.readerParents = readerParents;
} }
public long getMaxDelay() { public long[] getWriterIntervals() {
return maxDelay; return writerIntervals;
} }
public void setMaxDelay(long maxDelay) { public void setWriterIntervals(long[] writerIntervals) {
this.maxDelay = maxDelay; this.writerIntervals = writerIntervals;
}
public long[] getReaderIntervals() {
return readerIntervals;
}
public void setReaderIntervals(long[] readerIntervals) {
this.readerIntervals = readerIntervals;
} }
} }
public static void main(String[] args) {
// ["/bdrepo1/var/0", "/bdrepo1/var/1", "/bdrepo1/var/2", "/bdrepo1/var/3"]
// 0(w) -> 2(w) -> 1(r) -> 3(r)
// |
// 1(w) -
SharableVarConfiguration sharableVarConfiguration = new SharableVarConfiguration();
sharableVarConfiguration.setNodeIds(new String[]{"/bdrepo1/var/0", "/bdrepo1/var/1", "/bdrepo1/var/2", "/bdrepo1/var/3"});
sharableVarConfiguration.setWriterParents(new int[]{2, 2, -1, -2});
sharableVarConfiguration.setReaderParents(new int[]{-2, -1, -2, 1});
sharableVarConfiguration.setWriterIntervals(new long[]{5, 5, 6, -1});
sharableVarConfiguration.setReaderIntervals(new long[]{-1, 4, -1, -1});
SharableVarState sharableVarState0 = new SharableVarState("/bdrepo1/var/0", sharableVarConfiguration);
SharableVarState sharableVarState1 = new SharableVarState("/bdrepo1/var/1", sharableVarConfiguration);
SharableVarState sharableVarState2 = new SharableVarState("/bdrepo1/var/2", sharableVarConfiguration);
SharableVarState sharableVarState3 = new SharableVarState("/bdrepo1/var/3", sharableVarConfiguration);
System.out.println(sharableVarState0);
System.out.println(sharableVarState1);
System.out.println(sharableVarState2);
System.out.println(sharableVarState3);
}
} }

View File

@ -8,25 +8,31 @@ import java.util.Set;
public class AWORSetProxy extends SharableVar<AWORSet<Object>> { public class AWORSetProxy extends SharableVar<AWORSet<Object>> {
public AWORSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { public AWORSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) {
super(varId, cpId, conf); super(varId, cpId, conf);
self = new AWORSet<>(cpId, varId);
} }
public void add(Object val) { public void add(Object val) {
AWORSet<Object> delta = self.add(val); if (writerVar != null) {
localDeltaQueue.add(delta); AWORSet<Object> delta = writerVar.add(val);
writerVarDeltaQueue.add(delta);
}
} }
public void remove(Object val) { public void remove(Object val) {
AWORSet<Object> delta = self.remove(val); if (writerVar != null) {
localDeltaQueue.add(delta); AWORSet<Object> delta = writerVar.remove(val);
writerVarDeltaQueue.add(delta);
}
} }
public Set<Object> read() { public Set<Object> read() {
return self.read(); if (readerVar != null) {
return readerVar.read();
}
return writerVar.read();
} }
@Override @Override
protected AWORSet<Object> newEmptyDelta() { protected AWORSet<Object> createDeltaCrdt(String nodeId, String varId) {
return new AWORSet<>(null, varId); return new AWORSet<>(nodeId, varId);
} }
} }

View File

@ -6,25 +6,31 @@ import org.bdware.sc.crdt.SharableVarState;
public class DWFlagProxy extends SharableVar<DWFlag> { public class DWFlagProxy extends SharableVar<DWFlag> {
public DWFlagProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { public DWFlagProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) {
super(varId, cpId, conf); super(varId, cpId, conf);
self = new DWFlag(cpId, varId);
} }
public void enable() { public void enable() {
DWFlag delta = self.enable(); if (writerVar != null) {
localDeltaQueue.add(delta); DWFlag delta = writerVar.enable();
writerVarDeltaQueue.add(delta);
}
} }
public void disable() { public void disable() {
DWFlag delta = self.disable(); if (writerVar != null) {
localDeltaQueue.add(delta); DWFlag delta = writerVar.disable();
writerVarDeltaQueue.add(delta);
}
} }
public boolean read() { public boolean read() {
return self.read(); if (readerVar != null) {
return readerVar.read();
}
return writerVar.read();
} }
@Override @Override
protected DWFlag newEmptyDelta() { protected DWFlag createDeltaCrdt(String nodeId, String varId) {
return new DWFlag(null, varId); return new DWFlag(nodeId, varId);
} }
} }

View File

@ -6,25 +6,31 @@ import org.bdware.sc.crdt.SharableVarState;
public class EWFlagProxy extends SharableVar<EWFlag> { public class EWFlagProxy extends SharableVar<EWFlag> {
public EWFlagProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { public EWFlagProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) {
super(varId, cpId, conf); super(varId, cpId, conf);
self = new EWFlag(cpId, varId);
} }
public void enable() { public void enable() {
EWFlag delta = self.enable(); if (writerVar != null) {
localDeltaQueue.add(delta); EWFlag delta = writerVar.enable();
writerVarDeltaQueue.add(delta);
}
} }
public void disable() { public void disable() {
EWFlag delta = self.disable(); if (writerVar != null) {
localDeltaQueue.add(delta); EWFlag delta = writerVar.disable();
writerVarDeltaQueue.add(delta);
}
} }
public boolean read() { public boolean read() {
return self.read(); if (readerVar != null) {
return readerVar.read();
}
return writerVar.read();
} }
@Override @Override
protected EWFlag newEmptyDelta() { protected EWFlag createDeltaCrdt(String nodeId, String varId) {
return new EWFlag(null, varId); return new EWFlag(nodeId, varId);
} }
} }

View File

@ -8,25 +8,31 @@ public class GCounterProxy extends SharableVar<GCounter> {
public GCounterProxy(String varId, String cpId, public GCounterProxy(String varId, String cpId,
SharableVarState.SharableVarConfiguration conf) { SharableVarState.SharableVarConfiguration conf) {
super(varId, cpId, conf); super(varId, cpId, conf);
self = new GCounter(cpId, varId);
} }
public void inc() { public void inc() {
GCounter delta = self.inc(); if (writerVar != null) {
localDeltaQueue.add(delta); GCounter delta = writerVar.inc();
writerVarDeltaQueue.add(delta);
}
} }
public void inc(long var) { public void inc(long var) {
GCounter delta = self.inc(var); if (writerVar != null) {
localDeltaQueue.add(delta); GCounter delta = writerVar.inc(var);
writerVarDeltaQueue.add(delta);
}
} }
public Long read() { public Long read() {
return self.read(); if (readerVar != null) {
return readerVar.read();
}
return writerVar.read();
} }
@Override @Override
protected GCounter newEmptyDelta() { protected GCounter createDeltaCrdt(String nodeId, String varId) {
return new GCounter(null, varId); return new GCounter(nodeId, varId);
} }
} }

View File

@ -8,20 +8,24 @@ import java.util.Set;
public class GSetProxy extends SharableVar<GSet<Object>> { public class GSetProxy extends SharableVar<GSet<Object>> {
public GSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { public GSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) {
super(varId, cpId, conf); super(varId, cpId, conf);
self = new GSet<>(cpId, varId);
} }
public void add(Object val) { public void add(Object val) {
GSet<Object> delta = self.add(val); if (writerVar != null) {
localDeltaQueue.add(delta); GSet<Object> delta = writerVar.add(val);
writerVarDeltaQueue.add(delta);
}
} }
public Set<Object> read() { public Set<Object> read() {
return self.read(); if (readerVar != null) {
return readerVar.read();
}
return writerVar.read();
} }
@Override @Override
protected GSet<Object> newEmptyDelta() { protected GSet<Object> createDeltaCrdt(String nodeId, String varId) {
return new GSet<>(null, varId); return new GSet<>(nodeId, varId);
} }
} }

View File

@ -7,20 +7,24 @@ public class LWWRegisterProxy extends SharableVar<LWWRegister<Long, Object>> {
public LWWRegisterProxy(String varId, String cpId, public LWWRegisterProxy(String varId, String cpId,
SharableVarState.SharableVarConfiguration conf) { SharableVarState.SharableVarConfiguration conf) {
super(varId, cpId, conf); super(varId, cpId, conf);
self = new LWWRegister<>(cpId, varId);
} }
public void write(Object val) { public void write(Object val) {
LWWRegister<Long, Object> delta = self.write(System.currentTimeMillis(), val); if (writerVar != null) {
localDeltaQueue.add(delta); LWWRegister<Long, Object> delta = writerVar.write(System.currentTimeMillis(), val);
writerVarDeltaQueue.add(delta);
}
} }
public Object read() { public Object read() {
return self.read(); if (readerVar != null) {
return readerVar.read();
}
return writerVar.read();
} }
@Override @Override
protected LWWRegister<Long, Object> newEmptyDelta() { protected LWWRegister<Long, Object> createDeltaCrdt(String nodeId, String varId) {
return new LWWRegister<>(null, varId); return new LWWRegister<>(nodeId, varId);
} }
} }

View File

@ -8,25 +8,29 @@ public class MVRegisterProxy extends SharableVar<MVRegister<Object>> {
public MVRegisterProxy(String varId, String cpId, public MVRegisterProxy(String varId, String cpId,
SharableVarState.SharableVarConfiguration conf) { SharableVarState.SharableVarConfiguration conf) {
super(varId, cpId, conf); super(varId, cpId, conf);
self = new MVRegister<>(cpId, varId);
} }
public void write(Object val) { public void write(Object val) {
MVRegister<Object> delta = self.write(val); if (writerVar != null) {
localDeltaQueue.add(delta); MVRegister<Object> delta = writerVar.write(val);
writerVarDeltaQueue.add(delta);
}
} }
public void resolve(Object val) { public void resolve(Object val) {
MVRegister<Object> delta = self.resolve(); MVRegister<Object> delta = writerVar.resolve();
localDeltaQueue.add(delta); writerVarDeltaQueue.add(delta);
} }
public Object read() { public Object read() {
return self.read(); if (readerVar != null) {
return readerVar.read();
}
return writerVar.read();
} }
@Override @Override
protected MVRegister<Object> newEmptyDelta() { protected MVRegister<Object> createDeltaCrdt(String nodeId, String varId) {
return new MVRegister<>(null, varId); return new MVRegister<>(nodeId, varId);
} }
} }

View File

@ -7,35 +7,45 @@ public class PNCounterProxy extends SharableVar<PNCounter> {
public PNCounterProxy(String varId, String cpId, public PNCounterProxy(String varId, String cpId,
SharableVarState.SharableVarConfiguration conf) { SharableVarState.SharableVarConfiguration conf) {
super(varId, cpId, conf); super(varId, cpId, conf);
self = new PNCounter(cpId, varId);
} }
public void inc() { public void inc() {
PNCounter delta = self.inc(); if (writerVar != null) {
localDeltaQueue.add(delta); PNCounter delta = writerVar.inc();
writerVarDeltaQueue.add(delta);
}
} }
public void inc(long val) { public void inc(long val) {
PNCounter delta = self.inc(val); if (writerVar != null) {
localDeltaQueue.add(delta); PNCounter delta = writerVar.inc(val);
writerVarDeltaQueue.add(delta);
}
} }
public void dec() { public void dec() {
PNCounter delta = self.dec(); if (writerVar != null) {
localDeltaQueue.add(delta); PNCounter delta = writerVar.dec();
writerVarDeltaQueue.add(delta);
}
} }
public void dec(long val) { public void dec(long val) {
PNCounter delta = self.dec(val); if (writerVar != null) {
localDeltaQueue.add(delta); PNCounter delta = writerVar.dec(val);
writerVarDeltaQueue.add(delta);
}
} }
public Long read() { public Long read() {
return self.read(); if (readerVar != null) {
return readerVar.read();
}
return writerVar.read();
} }
@Override @Override
protected PNCounter newEmptyDelta() { protected PNCounter createDeltaCrdt(String nodeId, String varId) {
return new PNCounter(null, varId); return new PNCounter(nodeId, varId);
} }
} }

View File

@ -9,25 +9,31 @@ public class RWLWWSetProxy extends SharableVar<RWLWWSet<Long, Object>> {
public RWLWWSetProxy(String varId, String cpId, public RWLWWSetProxy(String varId, String cpId,
SharableVarState.SharableVarConfiguration conf) { SharableVarState.SharableVarConfiguration conf) {
super(varId, cpId, conf); super(varId, cpId, conf);
self = new RWLWWSet<>(cpId, varId);
} }
public void add(Object val) { public void add(Object val) {
RWLWWSet<Long, Object> delta = self.add(System.currentTimeMillis(), val); if (writerVar != null) {
localDeltaQueue.add(delta); RWLWWSet<Long, Object> delta = writerVar.add(System.currentTimeMillis(), val);
writerVarDeltaQueue.add(delta);
}
} }
public void remove(Object val) { public void remove(Object val) {
RWLWWSet<Long, Object> delta = self.remove(System.currentTimeMillis(), val); if (writerVar != null) {
localDeltaQueue.add(delta); RWLWWSet<Long, Object> delta = writerVar.remove(System.currentTimeMillis(), val);
writerVarDeltaQueue.add(delta);
}
} }
public Set<Object> read() { public Set<Object> read() {
return self.read(); if (readerVar != null) {
return readerVar.read();
}
return writerVar.read();
} }
@Override @Override
protected RWLWWSet<Long, Object> newEmptyDelta() { protected RWLWWSet<Long, Object> createDeltaCrdt(String nodeId, String varId) {
return new RWLWWSet<>(); return new RWLWWSet<>(nodeId, varId);
} }
} }

View File

@ -8,25 +8,31 @@ import java.util.Set;
public class RWORSetProxy extends SharableVar<RWORSet<Object>> { public class RWORSetProxy extends SharableVar<RWORSet<Object>> {
public RWORSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { public RWORSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) {
super(varId, cpId, conf); super(varId, cpId, conf);
self = new RWORSet<>(cpId, varId);
} }
public void add(Object val) { public void add(Object val) {
RWORSet<Object> delta = self.add(val); if (writerVar != null) {
localDeltaQueue.add(delta); RWORSet<Object> delta = writerVar.add(val);
writerVarDeltaQueue.add(delta);
}
} }
public void remove(Object val) { public void remove(Object val) {
RWORSet<Object> delta = self.remove(val); if (writerVar != null) {
localDeltaQueue.add(delta); RWORSet<Object> delta = writerVar.remove(val);
writerVarDeltaQueue.add(delta);
}
} }
public Set<Object> read() { public Set<Object> read() {
return self.read(); if (readerVar != null) {
return readerVar.read();
}
return writerVar.read();
} }
@Override @Override
protected RWORSet<Object> newEmptyDelta() { protected RWORSet<Object> createDeltaCrdt(String nodeId, String varId) {
return new RWORSet<>(null, varId); return new RWORSet<>(nodeId, varId);
} }
} }

View File

@ -8,63 +8,131 @@ import org.bdware.sc.crdt.SharableVarManager;
import org.bdware.sc.crdt.SharableVarState; import org.bdware.sc.crdt.SharableVarState;
import org.bdware.sc.util.JsonUtil; import org.bdware.sc.util.JsonUtil;
import java.util.Collections;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public abstract class SharableVar<T extends JoinableCRDT> { public abstract class SharableVar<T extends JoinableCRDT> {
public final HashedWheelTimer HASHED_WHEEL_TIMER = new HashedWheelTimer(r -> { // 用于writer节点的变量副本
Thread t = Executors.defaultThreadFactory().newThread(r); public T writerVar;
t.setDaemon(true); // 用于reader节点的变量副本
return t; public T readerVar;
}, 5, TimeUnit.MILLISECONDS, 2);
public final Queue<JoinableCRDT> localDeltaQueue = new LinkedList<>(); public Queue<JoinableCRDT> writerVarDeltaQueue;
public final Queue<JoinableCRDT> remoteDeltaQueue = new LinkedList<>(); public Queue<JoinableCRDT> readerVarDeltaQueue;
public T self;
protected String varId; public String varId;
private SyncTimerTask syncTimerTask; public SharableVarState sharableVarState;
private SharableVarState sharableVarState;
public HashedWheelTimer readerTimer;
public ReaderSyncTimerTask readerSyncTimerTask;
public HashedWheelTimer writerTimer;
public WriterSyncTimerTask writerSyncTimerTask;
public SharableVar(String varId, String cpId, public SharableVar(String varId, String cpId,
SharableVarState.SharableVarConfiguration resolveResult) { SharableVarState.SharableVarConfiguration resolveResult) {
this.varId = varId; this.varId = varId;
this.sharableVarState = new SharableVarState(cpId, resolveResult); this.sharableVarState = new SharableVarState(cpId, resolveResult);
this.HASHED_WHEEL_TIMER.newTimeout(timeout -> {
syncTimerTask = new SyncTimerTask(); if (this.sharableVarState.isReaderFlag()) {
HASHED_WHEEL_TIMER.newTimeout(syncTimerTask, sharableVarState.getInterval(), this.readerVar = createDeltaCrdt(cpId, varId);
TimeUnit.MILLISECONDS); this.readerVarDeltaQueue = new LinkedList<>();
}, 0, TimeUnit.MILLISECONDS); if (this.sharableVarState.getReaderChildren() != null && this.sharableVarState.getReaderChildren().size() > 0
&& this.sharableVarState.getReaderInterval() != null && this.sharableVarState.getReaderInterval() > 0) {
this.readerTimer = new HashedWheelTimer(r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
return t;
}, 5, TimeUnit.MILLISECONDS, 2);
this.readerTimer.newTimeout(timeout -> {
readerSyncTimerTask = new ReaderSyncTimerTask();
readerTimer.newTimeout(readerSyncTimerTask, sharableVarState.getReaderInterval(),
TimeUnit.MILLISECONDS);
}, 0, TimeUnit.MILLISECONDS);
}
}
if (this.sharableVarState.isWriterFlag()) {
this.writerVar = createDeltaCrdt(cpId, varId);
this.writerVarDeltaQueue = new LinkedList<>();
if (this.sharableVarState.getWriteInterval() != null && this.sharableVarState.getWriteInterval() > 0) {
this.writerTimer = new HashedWheelTimer(r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
return t;
}, 5, TimeUnit.MILLISECONDS, 2);
this.writerTimer.newTimeout(timeout -> {
writerSyncTimerTask = new WriterSyncTimerTask();
writerTimer.newTimeout(writerSyncTimerTask, sharableVarState.getWriteInterval(),
TimeUnit.MILLISECONDS);
}, 0, TimeUnit.MILLISECONDS);
}
}
} }
private void syncVar() { private void syncReaderVar() {
if (localDeltaQueue.isEmpty()) { if (readerVarDeltaQueue.isEmpty()) {
return; return;
} }
JoinableCRDT joinedDelta = newEmptyDelta(); JoinableCRDT joinedDelta = createDeltaCrdt(null, varId);
synchronized (localDeltaQueue) { synchronized (readerVarDeltaQueue) {
while (!localDeltaQueue.isEmpty()) { while (!readerVarDeltaQueue.isEmpty()) {
JoinableCRDT delta = localDeltaQueue.poll(); JoinableCRDT delta = readerVarDeltaQueue.poll();
joinedDelta.join(delta); joinedDelta.join(delta);
} }
} }
String content = JsonUtil.toJson(localDeltaQueue); String content = JsonUtil.toJson(joinedDelta);
SharableVarManager.instance.broadcastSyncMessage(varId, sharableVarState.getSendTo(), SharableVarManager.instance.broadcastSyncMessage(varId, sharableVarState.getReaderChildren(), content, "r2r");
content);
} }
abstract protected T newEmptyDelta(); private void syncWriterVar() {
if (writerVarDeltaQueue.isEmpty()) {
return;
}
JoinableCRDT joinedDelta = createDeltaCrdt(null, varId);
synchronized (writerVarDeltaQueue) {
while (!writerVarDeltaQueue.isEmpty()) {
JoinableCRDT delta = writerVarDeltaQueue.poll();
joinedDelta.join(delta);
}
}
String content = JsonUtil.toJson(joinedDelta);
if (sharableVarState.getWriterParent() != null) {
// 父节点是Writer
SharableVarManager.instance.broadcastSyncMessage(varId, Collections.singletonList(sharableVarState.getWriterParent()), content, "w2w");
} else if (sharableVarState.getReaderRoots() != null) {
// 自己是writer根结点 向reader根结点们广播
SharableVarManager.instance.broadcastSyncMessage(varId, sharableVarState.getReaderRoots(), content, "w2r");
}
class SyncTimerTask implements TimerTask { }
abstract protected T createDeltaCrdt(String nodeId, String varId);
class ReaderSyncTimerTask implements TimerTask {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
try { try {
syncVar(); syncReaderVar();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
HASHED_WHEEL_TIMER.newTimeout(syncTimerTask, sharableVarState.getInterval(), readerTimer.newTimeout(readerSyncTimerTask, sharableVarState.getReaderInterval(),
TimeUnit.MILLISECONDS);
}
}
}
class WriterSyncTimerTask implements TimerTask {
@Override
public void run(Timeout timeout) throws Exception {
try {
syncWriterVar();
} catch (Exception e) {
e.printStackTrace();
} finally {
readerTimer.newTimeout(writerSyncTimerTask, sharableVarState.getWriteInterval(),
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
} }
} }

View File

@ -8,25 +8,31 @@ import java.util.Set;
public class TPSetProxy extends SharableVar<TPSet<Object>> { public class TPSetProxy extends SharableVar<TPSet<Object>> {
public TPSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { public TPSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) {
super(varId, cpId, conf); super(varId, cpId, conf);
self = new TPSet<>(cpId, varId);
} }
public void add(Object val) { public void add(Object val) {
TPSet<Object> delta = self.add(val); if (writerVar != null) {
localDeltaQueue.add(delta); TPSet<Object> delta = writerVar.add(val);
writerVarDeltaQueue.add(delta);
}
} }
public void remove(Object val) { public void remove(Object val) {
TPSet<Object> delta = self.remove(val); if (writerVar != null) {
localDeltaQueue.add(delta); TPSet<Object> delta = writerVar.remove(val);
writerVarDeltaQueue.add(delta);
}
} }
public Set<Object> read() { public Set<Object> read() {
return self.read(); if (readerVar != null) {
return readerVar.read();
}
return writerVar.read();
} }
@Override @Override
protected TPSet<Object> newEmptyDelta() { protected TPSet<Object> createDeltaCrdt(String nodeId, String varId) {
return new TPSet<>(null, varId); return new TPSet<>(nodeId, varId);
} }
} }