diff --git a/build.gradle b/build.gradle index d6b64e3..e87c40e 100644 --- a/build.gradle +++ b/build.gradle @@ -40,7 +40,7 @@ sourceSets { dependencies { api project(":common") api project(":mockjava") - implementation 'org.bdware:delta-crdts:1.1.0' + implementation 'org.bdware:delta-crdts:1.2.0' implementation 'org.apache.commons:commons-lang3:3.0' implementation 'com.atlassian.commonmark:commonmark:0.17.0' implementation 'com.idealista:format-preserving-encryption:1.0.0' @@ -71,7 +71,7 @@ jar { // while develop at local use "false" configurations.runtimeClasspath.filter { it.getAbsolutePath().contains("/lib/") - // false + false }.collect { it.isDirectory() ? it : zipTree(it) } diff --git a/src/main/java/org/bdware/sc/crdt/SharableVar.java b/src/main/java/org/bdware/sc/crdt/SharableVar.java deleted file mode 100644 index e724d9f..0000000 --- a/src/main/java/org/bdware/sc/crdt/SharableVar.java +++ /dev/null @@ -1,107 +0,0 @@ -package org.bdware.sc.crdt; - -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; -import org.bdware.crdt.counter.GCounter; -import org.bdware.sc.util.JsonUtil; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -public class SharableVar { - public final static HashedWheelTimer HASHED_WHEEL_TIMER = - new HashedWheelTimer(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = Executors.defaultThreadFactory().newThread(r); - t.setDaemon(true); - return t; - } - }, 5, TimeUnit.MILLISECONDS, 2); - private final String varId; - - private long interval; - private final String myId; - GCounter counter; - int offset; - List sendTo; - private SyncTimeout nextTimeOut; - - - public SharableVar(String cpId, String identifier, - SharableVarManager.VarResolveResult resolveResult) { - counter = new GCounter(cpId, identifier); - myId = cpId; - varId = identifier; - HASHED_WHEEL_TIMER.newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - initByVarResolve(resolveResult); - - } - }, 0, TimeUnit.MILLISECONDS); - - } - - public void join(String content) { - GCounter toJoin = JsonUtil.fromJson(content, GCounter.class); - counter.join(toJoin); - } - - class SyncTimeout implements TimerTask { - @Override - public void run(Timeout timeout) throws Exception { - try { - syncVar(); - } catch (Exception e) { - e.printStackTrace(); - } finally { - HASHED_WHEEL_TIMER.newTimeout(nextTimeOut, interval, TimeUnit.MILLISECONDS); - } - } - } - - private void syncVar() { - String content = JsonUtil.toJson(counter); - SharableVarManager.instance.broadcastSyncMessage(varId, sendTo, content); - } - - private void initByVarResolve(SharableVarManager.VarResolveResult resolveResult) { - sendTo = new ArrayList<>(); - // 假设没有同一个人既是reader又是writer。 - offset = -1; - for (int i = 0; i < resolveResult.writer.length; i++) { - if (myId.equals(resolveResult.writer[i])) { - offset = i; - } - } - if (offset == -1) - for (int i = 0; i < resolveResult.reader.length; i++) { - if (myId.equals(resolveResult.reader[i])) { - offset = resolveResult.writer.length + i; - } - } - for (int i = 0; i < resolveResult.sendTo[offset].length; i++) { - int pos = resolveResult.sendTo[offset][i]; - sendTo.add(findByOffset(pos, resolveResult)); - } - interval = resolveResult.interval[offset]; - nextTimeOut = new SyncTimeout(); - HASHED_WHEEL_TIMER.newTimeout(nextTimeOut, interval, TimeUnit.MILLISECONDS); - } - - private String findByOffset(int pos, SharableVarManager.VarResolveResult resolveResult) { - if (pos < resolveResult.writer.length) - return resolveResult.writer[pos]; - return resolveResult.reader[pos - resolveResult.writer.length]; - } - - public GCounter get() { - return counter; - } - -} diff --git a/src/main/java/org/bdware/sc/crdt/SharableVarManager.java b/src/main/java/org/bdware/sc/crdt/SharableVarManager.java index da6392e..f4c17a6 100644 --- a/src/main/java/org/bdware/sc/crdt/SharableVarManager.java +++ b/src/main/java/org/bdware/sc/crdt/SharableVarManager.java @@ -2,6 +2,8 @@ package org.bdware.sc.crdt; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.bdware.crdt.basic.Constants; +import org.bdware.crdt.basic.JoinableCRDT; import org.bdware.doip.audit.AuditDoaClient; import org.bdware.doip.audit.EndpointConfig; import org.bdware.doip.audit.client.AuditDoipClient; @@ -13,6 +15,7 @@ import org.bdware.doip.codec.doipMessage.DoipResponseCode; import org.bdware.doip.endpoint.client.DoipMessageCallback; import org.bdware.irp.client.IrpClient; import org.bdware.irp.stateinfo.StateInfoBase; +import org.bdware.sc.crdt.proxy.*; import org.bdware.sc.util.JsonUtil; import java.io.ByteArrayOutputStream; @@ -24,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap; public class SharableVarManager { static Logger LOGGER = LogManager.getLogger(SharableVarManager.class); public static SharableVarManager instance; - private final String cpId; + public final String cpId; Map allVars; IrpClient client; AuditDoaClient doaClient; @@ -32,23 +35,29 @@ public class SharableVarManager { public static final String SHARABLEOP = "86.100871/SyncVar"; - public SharableVarManager(String id, EndpointConfig config) { - allVars = new ConcurrentHashMap<>(); - client = new AuditIrpClient(config); - doaClient = new AuditDoaClient("", config, null); - cpId = id; + public SharableVarManager(String cpId, EndpointConfig config) { + this.allVars = new ConcurrentHashMap<>(); + this.client = new AuditIrpClient(config); + this.doaClient = new AuditDoaClient("", config, null); + this.cpId = cpId; } public static void initSharableVarManager(String id, EndpointConfig config) { - instance = new SharableVarManager(id, config); + if (instance == null) { + instance = new SharableVarManager(id, config); + } } public DoipMessage handleSyncMessage(DoipMessage message) { try { String varId = message.header.parameters.attributes.get("varId").getAsString(); String content = message.header.parameters.attributes.get("content").getAsString(); - SharableVar var = getVar(varId); - var.join(content); + SharableVar var = allVars.get(varId); + if (var != null) { + JoinableCRDT delta = JsonUtil.fromJson(content, JoinableCRDT.class); + var.self.join(delta); + var.remoteDeltaQueue.add(delta); + } DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); builder.createResponse(DoipResponseCode.Success, message); @@ -66,10 +75,6 @@ public class SharableVarManager { } } - private SharableVar getVar(String varId) { - return allVars.get(varId); - } - private DoipMessage createSyncMessage(String target, String varId, String content) { DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); builder.createRequest(target, SHARABLEOP); @@ -96,26 +101,20 @@ public class SharableVarManager { return doaClient.convertDoidToRepo(id); } - - static class VarResolveResult { - String[] writer; - String[] reader; - int[][] sendTo; - long[] interval; - long maxDelay; - } - public synchronized SharableVar createVar(String identifier, String type) { try { - if (allVars.containsKey(identifier)) + if (allVars.containsKey(identifier)) { return allVars.get(identifier); + } StateInfoBase stateInfoBase = client.resolve(identifier); - if (stateInfoBase.handleValues.has("bdwType") && stateInfoBase.handleValues - .get("bdwType").getAsString().equals("SharableVar")) { - VarResolveResult resolveResult = - JsonUtil.fromJson(stateInfoBase.handleValues, VarResolveResult.class); - SharableVar sharableVar = new SharableVar(cpId, identifier, resolveResult); - allVars.put(identifier, sharableVar); + if (stateInfoBase.handleValues.has("bdwType") && + stateInfoBase.handleValues.get("bdwType").getAsString().equals("SharableVar")) { + SharableVarState.SharableVarConfiguration sharableVarConf = + JsonUtil.fromJson(stateInfoBase.handleValues, SharableVarState.SharableVarConfiguration.class); + SharableVar sharableVar = createSharableVar(sharableVarConf, identifier, type); + if (sharableVar != null) { + allVars.put(identifier, sharableVar); + } return sharableVar; } else return null; @@ -125,4 +124,30 @@ public class SharableVarManager { } } + private SharableVar createSharableVar(SharableVarState.SharableVarConfiguration conf, String identifier, String type) { + switch (type) { + case Constants.TypeName.G_COUNTER: + return new GCounterProxy(identifier, cpId, conf); + case Constants.TypeName.PN_COUNTER: + return new PNCounterProxy(identifier, cpId, conf); + case Constants.TypeName.DW_FLAG: + return new DWFlagProxy(identifier, cpId, conf); + case Constants.TypeName.LWW_REGISTER: + return new LWWRegisterProxy(identifier, cpId, conf); + case Constants.TypeName.MV_REGISTER: + return new MVRegisterProxy(identifier, cpId, conf); + case Constants.TypeName.G_SET: + return new GSetProxy(identifier, cpId, conf); + case Constants.TypeName.TP_SET: + return new TPSetProxy(identifier, cpId, conf); + case Constants.TypeName.AW_OR_SET: + return new AWORSetProxy(identifier, cpId, conf); + case Constants.TypeName.RW_OR_SET: + return new RWORSetProxy(identifier, cpId, conf); + case Constants.TypeName.RW_LWW_SET: + return new RWLWWSetProxy(identifier, cpId, conf); + } + return null; + } + } diff --git a/src/main/java/org/bdware/sc/crdt/SharableVarState.java b/src/main/java/org/bdware/sc/crdt/SharableVarState.java new file mode 100644 index 0000000..5c290e6 --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/SharableVarState.java @@ -0,0 +1,177 @@ +package org.bdware.sc.crdt; + +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Objects; + +public class SharableVarState { + private String myId; + + private SharableVarConfiguration sharableVarConfiguration; + + private int myIndex; + + private boolean readerFlag; + + private boolean writerFlag; + private List sendTo; + + private long interval; + + public SharableVarState(String myId, SharableVarConfiguration sharableVarConfiguration) { + this.myId = myId; + this.sharableVarConfiguration = sharableVarConfiguration; + this.myIndex = parseMyIndex(); + this.sendTo = parseSendTo(); + this.interval = sharableVarConfiguration.interval[this.myIndex]; + for (int readerIndex : this.sharableVarConfiguration.readerIndexes) { + if (Objects.equals(readerIndex, myIndex)) { + this.readerFlag = true; + break; + } + } + for (int writerIndex : this.sharableVarConfiguration.writerIndexes) { + if (Objects.equals(writerIndex, myIndex)) { + this.writerFlag = true; + break; + } + } + } + + private int parseMyIndex() { + for (int i = 0; i < sharableVarConfiguration.getNodeIds().length; i++) { + if (Objects.equals(sharableVarConfiguration.getNodeIds()[i], myId)) { + return i; + } + } + return -1; + } + + private List parseSendTo() { + if (myIndex < 0 || myIndex >= sharableVarConfiguration.sendTo.length) { + return new ArrayList<>(); + } + LinkedHashSet result = new LinkedHashSet<>(); + int[] sendToIndexes = sharableVarConfiguration.sendTo[myIndex]; + for (int sendToIndex : sendToIndexes) { + if (sendToIndex >= 0 && sendToIndex < sharableVarConfiguration.nodeIds.length && sendToIndex != myIndex) { + result.add(sharableVarConfiguration.nodeIds[sendToIndex]); + } + } + return new ArrayList<>(result); + } + + public String getMyId() { + return myId; + } + + public void setMyId(String myId) { + this.myId = myId; + } + + public SharableVarConfiguration getSharableVarConfiguration() { + return sharableVarConfiguration; + } + + public void setSharableVarConfiguration(SharableVarConfiguration sharableVarConfiguration) { + this.sharableVarConfiguration = sharableVarConfiguration; + } + + public int getMyIndex() { + return myIndex; + } + + public void setMyIndex(int myIndex) { + this.myIndex = myIndex; + } + + public boolean isReaderFlag() { + return readerFlag; + } + + public void setReaderFlag(boolean readerFlag) { + this.readerFlag = readerFlag; + } + + public boolean isWriterFlag() { + return writerFlag; + } + + public void setWriterFlag(boolean writerFlag) { + this.writerFlag = writerFlag; + } + + public List getSendTo() { + return sendTo; + } + + public void setSendTo(List sendTo) { + this.sendTo = sendTo; + } + + public long getInterval() { + return interval; + } + + public void setInterval(long interval) { + this.interval = interval; + } + + public static class SharableVarConfiguration { + String[] nodeIds; + int[] writerIndexes; + int[] readerIndexes; + int[][] sendTo; + long[] interval; + long maxDelay; + + public String[] getNodeIds() { + return nodeIds; + } + + public void setNodeIds(String[] nodeIds) { + this.nodeIds = nodeIds; + } + + public int[] getWriterIndexes() { + return writerIndexes; + } + + public void setWriterIndexes(int[] writerIndexes) { + this.writerIndexes = writerIndexes; + } + + public int[] getReaderIndexes() { + return readerIndexes; + } + + public void setReaderIndexes(int[] readerIndexes) { + this.readerIndexes = readerIndexes; + } + + public int[][] getSendTo() { + return sendTo; + } + + public void setSendTo(int[][] sendTo) { + this.sendTo = sendTo; + } + + public long[] getInterval() { + return interval; + } + + public void setInterval(long[] interval) { + this.interval = interval; + } + + public long getMaxDelay() { + return maxDelay; + } + + public void setMaxDelay(long maxDelay) { + this.maxDelay = maxDelay; + } + } +} diff --git a/src/main/java/org/bdware/sc/crdt/proxy/AWORSetProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/AWORSetProxy.java new file mode 100644 index 0000000..6aa0678 --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/proxy/AWORSetProxy.java @@ -0,0 +1,32 @@ +package org.bdware.sc.crdt.proxy; + +import org.bdware.crdt.set.AWORSet; +import org.bdware.sc.crdt.SharableVarState; + +import java.util.Set; + +public class AWORSetProxy extends SharableVar> { + public AWORSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { + super(varId, cpId, conf); + self = new AWORSet<>(cpId, varId); + } + + public void add(Object val) { + AWORSet delta = self.add(val); + localDeltaQueue.add(delta); + } + + public void remove(Object val) { + AWORSet delta = self.remove(val); + localDeltaQueue.add(delta); + } + + public Set read() { + return self.read(); + } + + @Override + protected AWORSet newEmptyDelta() { + return new AWORSet<>(null, varId); + } +} diff --git a/src/main/java/org/bdware/sc/crdt/proxy/DWFlagProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/DWFlagProxy.java new file mode 100644 index 0000000..cf38cdc --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/proxy/DWFlagProxy.java @@ -0,0 +1,30 @@ +package org.bdware.sc.crdt.proxy; + +import org.bdware.crdt.flag.DWFlag; +import org.bdware.sc.crdt.SharableVarState; + +public class DWFlagProxy extends SharableVar { + public DWFlagProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { + super(varId, cpId, conf); + self = new DWFlag(cpId, varId); + } + + public void enable() { + DWFlag delta = self.enable(); + localDeltaQueue.add(delta); + } + + public void disable() { + DWFlag delta = self.disable(); + localDeltaQueue.add(delta); + } + + public boolean read() { + return self.read(); + } + + @Override + protected DWFlag newEmptyDelta() { + return new DWFlag(null, varId); + } +} diff --git a/src/main/java/org/bdware/sc/crdt/proxy/EWFlagProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/EWFlagProxy.java new file mode 100644 index 0000000..b312059 --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/proxy/EWFlagProxy.java @@ -0,0 +1,30 @@ +package org.bdware.sc.crdt.proxy; + +import org.bdware.crdt.flag.EWFlag; +import org.bdware.sc.crdt.SharableVarState; + +public class EWFlagProxy extends SharableVar { + public EWFlagProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { + super(varId, cpId, conf); + self = new EWFlag(cpId, varId); + } + + public void enable() { + EWFlag delta = self.enable(); + localDeltaQueue.add(delta); + } + + public void disable() { + EWFlag delta = self.disable(); + localDeltaQueue.add(delta); + } + + public boolean read() { + return self.read(); + } + + @Override + protected EWFlag newEmptyDelta() { + return new EWFlag(null, varId); + } +} diff --git a/src/main/java/org/bdware/sc/crdt/proxy/GCounterProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/GCounterProxy.java new file mode 100644 index 0000000..0870468 --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/proxy/GCounterProxy.java @@ -0,0 +1,31 @@ +package org.bdware.sc.crdt.proxy; + +import org.bdware.crdt.counter.GCounter; +import org.bdware.sc.crdt.SharableVarState; + +public class GCounterProxy extends SharableVar { + + public GCounterProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { + super(varId, cpId, conf); + self = new GCounter(cpId, varId); + } + + public void inc() { + GCounter delta = self.inc(); + localDeltaQueue.add(delta); + } + + public void inc(long var) { + GCounter delta = self.inc(var); + localDeltaQueue.add(delta); + } + + public Long read() { + return self.read(); + } + + @Override + protected GCounter newEmptyDelta() { + return new GCounter(null, varId); + } +} diff --git a/src/main/java/org/bdware/sc/crdt/proxy/GSetProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/GSetProxy.java new file mode 100644 index 0000000..7bb4fc6 --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/proxy/GSetProxy.java @@ -0,0 +1,27 @@ +package org.bdware.sc.crdt.proxy; + +import org.bdware.crdt.set.GSet; +import org.bdware.sc.crdt.SharableVarState; + +import java.util.Set; + +public class GSetProxy extends SharableVar> { + public GSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { + super(varId, cpId, conf); + self = new GSet<>(cpId, varId); + } + + public void add(Object val) { + GSet delta = self.add(val); + localDeltaQueue.add(delta); + } + + public Set read() { + return self.read(); + } + + @Override + protected GSet newEmptyDelta() { + return new GSet<>(null, varId); + } +} diff --git a/src/main/java/org/bdware/sc/crdt/proxy/LWWRegisterProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/LWWRegisterProxy.java new file mode 100644 index 0000000..477da0a --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/proxy/LWWRegisterProxy.java @@ -0,0 +1,25 @@ +package org.bdware.sc.crdt.proxy; + +import org.bdware.crdt.register.LWWRegister; +import org.bdware.sc.crdt.SharableVarState; + +public class LWWRegisterProxy extends SharableVar> { + public LWWRegisterProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { + super(varId, cpId, conf); + self = new LWWRegister<>(cpId, varId); + } + + public void write(Object val) { + LWWRegister delta = self.write(System.currentTimeMillis(), val); + localDeltaQueue.add(delta); + } + + public Object read() { + return self.read(); + } + + @Override + protected LWWRegister newEmptyDelta() { + return new LWWRegister<>(null, varId); + } +} diff --git a/src/main/java/org/bdware/sc/crdt/proxy/MVRegisterProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/MVRegisterProxy.java new file mode 100644 index 0000000..ac02dcd --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/proxy/MVRegisterProxy.java @@ -0,0 +1,31 @@ +package org.bdware.sc.crdt.proxy; + +import org.bdware.crdt.register.MVRegister; +import org.bdware.sc.crdt.SharableVarState; + +public class MVRegisterProxy extends SharableVar> { + + public MVRegisterProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { + super(varId, cpId, conf); + self = new MVRegister<>(cpId, varId); + } + + public void write(Object val) { + MVRegister delta = self.write(val); + localDeltaQueue.add(delta); + } + + public void resolve(Object val) { + MVRegister delta = self.resolve(); + localDeltaQueue.add(delta); + } + + public Object read() { + return self.read(); + } + + @Override + protected MVRegister newEmptyDelta() { + return new MVRegister<>(null, varId); + } +} diff --git a/src/main/java/org/bdware/sc/crdt/proxy/PNCounterProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/PNCounterProxy.java new file mode 100644 index 0000000..eea4569 --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/proxy/PNCounterProxy.java @@ -0,0 +1,40 @@ +package org.bdware.sc.crdt.proxy; + +import org.bdware.crdt.counter.PNCounter; +import org.bdware.sc.crdt.SharableVarState; + +public class PNCounterProxy extends SharableVar { + public PNCounterProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { + super(varId, cpId, conf); + self = new PNCounter(cpId, varId); + } + + public void inc() { + PNCounter delta = self.inc(); + localDeltaQueue.add(delta); + } + + public void inc(long val) { + PNCounter delta = self.inc(val); + localDeltaQueue.add(delta); + } + + public void dec() { + PNCounter delta = self.dec(); + localDeltaQueue.add(delta); + } + + public void dec(long val) { + PNCounter delta = self.dec(val); + localDeltaQueue.add(delta); + } + + public Long read() { + return self.read(); + } + + @Override + protected PNCounter newEmptyDelta() { + return new PNCounter(null, varId); + } +} diff --git a/src/main/java/org/bdware/sc/crdt/proxy/RWLWWSetProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/RWLWWSetProxy.java new file mode 100644 index 0000000..bf07b75 --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/proxy/RWLWWSetProxy.java @@ -0,0 +1,32 @@ +package org.bdware.sc.crdt.proxy; + +import org.bdware.crdt.set.RWLWWSet; +import org.bdware.sc.crdt.SharableVarState; + +import java.util.Set; + +public class RWLWWSetProxy extends SharableVar> { + public RWLWWSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { + super(varId, cpId, conf); + self = new RWLWWSet<>(cpId, varId); + } + + public void add(Object val) { + RWLWWSet delta = self.add(System.currentTimeMillis(), val); + localDeltaQueue.add(delta); + } + + public void remove(Object val) { + RWLWWSet delta = self.remove(System.currentTimeMillis(), val); + localDeltaQueue.add(delta); + } + + public Set read() { + return self.read(); + } + + @Override + protected RWLWWSet newEmptyDelta() { + return new RWLWWSet<>(); + } +} diff --git a/src/main/java/org/bdware/sc/crdt/proxy/RWORSetProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/RWORSetProxy.java new file mode 100644 index 0000000..745b493 --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/proxy/RWORSetProxy.java @@ -0,0 +1,32 @@ +package org.bdware.sc.crdt.proxy; + +import org.bdware.crdt.set.RWORSet; +import org.bdware.sc.crdt.SharableVarState; + +import java.util.Set; + +public class RWORSetProxy extends SharableVar> { + public RWORSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { + super(varId, cpId, conf); + self = new RWORSet<>(cpId, varId); + } + + public void add(Object val) { + RWORSet delta = self.add(val); + localDeltaQueue.add(delta); + } + + public void remove(Object val) { + RWORSet delta = self.remove(val); + localDeltaQueue.add(delta); + } + + public Set read() { + return self.read(); + } + + @Override + protected RWORSet newEmptyDelta() { + return new RWORSet<>(null, varId); + } +} diff --git a/src/main/java/org/bdware/sc/crdt/proxy/SharableVar.java b/src/main/java/org/bdware/sc/crdt/proxy/SharableVar.java new file mode 100644 index 0000000..96fda8b --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/proxy/SharableVar.java @@ -0,0 +1,69 @@ +package org.bdware.sc.crdt.proxy; + +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import org.bdware.crdt.basic.JoinableCRDT; +import org.bdware.sc.crdt.SharableVarManager; +import org.bdware.sc.crdt.SharableVarState; +import org.bdware.sc.util.JsonUtil; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public abstract class SharableVar { + public final HashedWheelTimer HASHED_WHEEL_TIMER = + new HashedWheelTimer(r -> { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + return t; + }, 5, TimeUnit.MILLISECONDS, 2); + + public final Queue localDeltaQueue = new LinkedList<>(); + public final Queue remoteDeltaQueue = new LinkedList<>(); + public T self; + protected String varId; + private SyncTimerTask syncTimerTask; + private SharableVarState sharableVarState; + + public SharableVar(String varId, String cpId, SharableVarState.SharableVarConfiguration resolveResult) { + this.varId = varId; + this.sharableVarState = new SharableVarState(cpId, resolveResult); + this.HASHED_WHEEL_TIMER.newTimeout(timeout -> { + syncTimerTask = new SyncTimerTask(); + HASHED_WHEEL_TIMER.newTimeout(syncTimerTask, sharableVarState.getInterval(), TimeUnit.MILLISECONDS); + }, 0, TimeUnit.MILLISECONDS); + } + + private void syncVar() { + if (localDeltaQueue.isEmpty()) { + return; + } + JoinableCRDT joinedDelta = newEmptyDelta(); + synchronized (localDeltaQueue) { + while (!localDeltaQueue.isEmpty()) { + JoinableCRDT delta = localDeltaQueue.poll(); + joinedDelta.join(delta); + } + } + String content = JsonUtil.toJson(localDeltaQueue); + SharableVarManager.instance.broadcastSyncMessage(varId, sharableVarState.getSendTo(), content); + } + + abstract protected T newEmptyDelta(); + + class SyncTimerTask implements TimerTask { + @Override + public void run(Timeout timeout) throws Exception { + try { + syncVar(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + HASHED_WHEEL_TIMER.newTimeout(syncTimerTask, sharableVarState.getInterval(), TimeUnit.MILLISECONDS); + } + } + } +} diff --git a/src/main/java/org/bdware/sc/crdt/proxy/TPSetProxy.java b/src/main/java/org/bdware/sc/crdt/proxy/TPSetProxy.java new file mode 100644 index 0000000..de5a435 --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/proxy/TPSetProxy.java @@ -0,0 +1,32 @@ +package org.bdware.sc.crdt.proxy; + +import org.bdware.crdt.set.TPSet; +import org.bdware.sc.crdt.SharableVarState; + +import java.util.Set; + +public class TPSetProxy extends SharableVar> { + public TPSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { + super(varId, cpId, conf); + self = new TPSet<>(cpId, varId); + } + + public void add(Object val) { + TPSet delta = self.add(val); + localDeltaQueue.add(delta); + } + + public void remove(Object val) { + TPSet delta = self.remove(val); + localDeltaQueue.add(delta); + } + + public Set read() { + return self.read(); + } + + @Override + protected TPSet newEmptyDelta() { + return new TPSet<>(null, varId); + } +} diff --git a/src/main/java/org/bdware/sc/engine/DesktopEngine.java b/src/main/java/org/bdware/sc/engine/DesktopEngine.java index b3e1995..6a86463 100644 --- a/src/main/java/org/bdware/sc/engine/DesktopEngine.java +++ b/src/main/java/org/bdware/sc/engine/DesktopEngine.java @@ -249,7 +249,11 @@ public class DesktopEngine extends JSEngine { } catch (Exception e) { e.printStackTrace(); } - + for (SharableNode sharable : contractNode.getSharables()) { + for (String variableStatement : sharable.getVariableStatements()) { + compileSharable(sharable, variableStatement); + } + } for (FunctionNode fun : contractNode.getFunctions()) try { String str = fun.plainText(); @@ -292,6 +296,20 @@ public class DesktopEngine extends JSEngine { return cResult; } + private void compileSharable(SharableNode sharable, String variableStatement) { + try { + engine.getContext() + .setAttribute( + ScriptEngine.FILENAME, + sharable.getFileName(), + ScriptContext.ENGINE_SCOPE); + engine.eval("var " + variableStatement); + LOGGER.info("load sharable: " + variableStatement); + } catch (ScriptException e) { + throw new RuntimeException(e); + } + } + private void compileEventFunction(String name, String topic, REventSemantics semantics) { try { String str;