mirror of
https://gitee.com/BDWare/cp.git
synced 2025-01-09 17:34:08 +00:00
feat: sharable var integration
This commit is contained in:
parent
78d068d39b
commit
6dbb0e161c
@ -40,7 +40,7 @@ sourceSets {
|
||||
dependencies {
|
||||
api project(":common")
|
||||
api project(":mockjava")
|
||||
implementation 'org.bdware:delta-crdts:1.1.0'
|
||||
implementation 'org.bdware:delta-crdts:1.2.0'
|
||||
implementation 'org.apache.commons:commons-lang3:3.0'
|
||||
implementation 'com.atlassian.commonmark:commonmark:0.17.0'
|
||||
implementation 'com.idealista:format-preserving-encryption:1.0.0'
|
||||
@ -71,7 +71,7 @@ jar {
|
||||
// while develop at local use "false"
|
||||
configurations.runtimeClasspath.filter {
|
||||
it.getAbsolutePath().contains("/lib/")
|
||||
// false
|
||||
false
|
||||
}.collect {
|
||||
it.isDirectory() ? it : zipTree(it)
|
||||
}
|
||||
|
@ -1,107 +0,0 @@
|
||||
package org.bdware.sc.crdt;
|
||||
|
||||
import io.netty.util.HashedWheelTimer;
|
||||
import io.netty.util.Timeout;
|
||||
import io.netty.util.TimerTask;
|
||||
import org.bdware.crdt.counter.GCounter;
|
||||
import org.bdware.sc.util.JsonUtil;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class SharableVar {
|
||||
public final static HashedWheelTimer HASHED_WHEEL_TIMER =
|
||||
new HashedWheelTimer(new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = Executors.defaultThreadFactory().newThread(r);
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
}
|
||||
}, 5, TimeUnit.MILLISECONDS, 2);
|
||||
private final String varId;
|
||||
|
||||
private long interval;
|
||||
private final String myId;
|
||||
GCounter counter;
|
||||
int offset;
|
||||
List<String> sendTo;
|
||||
private SyncTimeout nextTimeOut;
|
||||
|
||||
|
||||
public SharableVar(String cpId, String identifier,
|
||||
SharableVarManager.VarResolveResult resolveResult) {
|
||||
counter = new GCounter(cpId, identifier);
|
||||
myId = cpId;
|
||||
varId = identifier;
|
||||
HASHED_WHEEL_TIMER.newTimeout(new TimerTask() {
|
||||
@Override
|
||||
public void run(Timeout timeout) throws Exception {
|
||||
initByVarResolve(resolveResult);
|
||||
|
||||
}
|
||||
}, 0, TimeUnit.MILLISECONDS);
|
||||
|
||||
}
|
||||
|
||||
public void join(String content) {
|
||||
GCounter toJoin = JsonUtil.fromJson(content, GCounter.class);
|
||||
counter.join(toJoin);
|
||||
}
|
||||
|
||||
class SyncTimeout implements TimerTask {
|
||||
@Override
|
||||
public void run(Timeout timeout) throws Exception {
|
||||
try {
|
||||
syncVar();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
HASHED_WHEEL_TIMER.newTimeout(nextTimeOut, interval, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void syncVar() {
|
||||
String content = JsonUtil.toJson(counter);
|
||||
SharableVarManager.instance.broadcastSyncMessage(varId, sendTo, content);
|
||||
}
|
||||
|
||||
private void initByVarResolve(SharableVarManager.VarResolveResult resolveResult) {
|
||||
sendTo = new ArrayList<>();
|
||||
// 假设没有同一个人既是reader又是writer。
|
||||
offset = -1;
|
||||
for (int i = 0; i < resolveResult.writer.length; i++) {
|
||||
if (myId.equals(resolveResult.writer[i])) {
|
||||
offset = i;
|
||||
}
|
||||
}
|
||||
if (offset == -1)
|
||||
for (int i = 0; i < resolveResult.reader.length; i++) {
|
||||
if (myId.equals(resolveResult.reader[i])) {
|
||||
offset = resolveResult.writer.length + i;
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < resolveResult.sendTo[offset].length; i++) {
|
||||
int pos = resolveResult.sendTo[offset][i];
|
||||
sendTo.add(findByOffset(pos, resolveResult));
|
||||
}
|
||||
interval = resolveResult.interval[offset];
|
||||
nextTimeOut = new SyncTimeout();
|
||||
HASHED_WHEEL_TIMER.newTimeout(nextTimeOut, interval, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
private String findByOffset(int pos, SharableVarManager.VarResolveResult resolveResult) {
|
||||
if (pos < resolveResult.writer.length)
|
||||
return resolveResult.writer[pos];
|
||||
return resolveResult.reader[pos - resolveResult.writer.length];
|
||||
}
|
||||
|
||||
public GCounter get() {
|
||||
return counter;
|
||||
}
|
||||
|
||||
}
|
@ -2,6 +2,8 @@ package org.bdware.sc.crdt;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.bdware.crdt.basic.Constants;
|
||||
import org.bdware.crdt.basic.JoinableCRDT;
|
||||
import org.bdware.doip.audit.AuditDoaClient;
|
||||
import org.bdware.doip.audit.EndpointConfig;
|
||||
import org.bdware.doip.audit.client.AuditDoipClient;
|
||||
@ -13,6 +15,7 @@ import org.bdware.doip.codec.doipMessage.DoipResponseCode;
|
||||
import org.bdware.doip.endpoint.client.DoipMessageCallback;
|
||||
import org.bdware.irp.client.IrpClient;
|
||||
import org.bdware.irp.stateinfo.StateInfoBase;
|
||||
import org.bdware.sc.crdt.proxy.*;
|
||||
import org.bdware.sc.util.JsonUtil;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
@ -24,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
public class SharableVarManager {
|
||||
static Logger LOGGER = LogManager.getLogger(SharableVarManager.class);
|
||||
public static SharableVarManager instance;
|
||||
private final String cpId;
|
||||
public final String cpId;
|
||||
Map<String, SharableVar> allVars;
|
||||
IrpClient client;
|
||||
AuditDoaClient doaClient;
|
||||
@ -32,23 +35,29 @@ public class SharableVarManager {
|
||||
|
||||
public static final String SHARABLEOP = "86.100871/SyncVar";
|
||||
|
||||
public SharableVarManager(String id, EndpointConfig config) {
|
||||
allVars = new ConcurrentHashMap<>();
|
||||
client = new AuditIrpClient(config);
|
||||
doaClient = new AuditDoaClient("", config, null);
|
||||
cpId = id;
|
||||
public SharableVarManager(String cpId, EndpointConfig config) {
|
||||
this.allVars = new ConcurrentHashMap<>();
|
||||
this.client = new AuditIrpClient(config);
|
||||
this.doaClient = new AuditDoaClient("", config, null);
|
||||
this.cpId = cpId;
|
||||
}
|
||||
|
||||
public static void initSharableVarManager(String id, EndpointConfig config) {
|
||||
instance = new SharableVarManager(id, config);
|
||||
if (instance == null) {
|
||||
instance = new SharableVarManager(id, config);
|
||||
}
|
||||
}
|
||||
|
||||
public DoipMessage handleSyncMessage(DoipMessage message) {
|
||||
try {
|
||||
String varId = message.header.parameters.attributes.get("varId").getAsString();
|
||||
String content = message.header.parameters.attributes.get("content").getAsString();
|
||||
SharableVar var = getVar(varId);
|
||||
var.join(content);
|
||||
SharableVar var = allVars.get(varId);
|
||||
if (var != null) {
|
||||
JoinableCRDT delta = JsonUtil.fromJson(content, JoinableCRDT.class);
|
||||
var.self.join(delta);
|
||||
var.remoteDeltaQueue.add(delta);
|
||||
}
|
||||
DoipMessageFactory.DoipMessageBuilder builder =
|
||||
new DoipMessageFactory.DoipMessageBuilder();
|
||||
builder.createResponse(DoipResponseCode.Success, message);
|
||||
@ -66,10 +75,6 @@ public class SharableVarManager {
|
||||
}
|
||||
}
|
||||
|
||||
private SharableVar getVar(String varId) {
|
||||
return allVars.get(varId);
|
||||
}
|
||||
|
||||
private DoipMessage createSyncMessage(String target, String varId, String content) {
|
||||
DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder();
|
||||
builder.createRequest(target, SHARABLEOP);
|
||||
@ -96,26 +101,20 @@ public class SharableVarManager {
|
||||
return doaClient.convertDoidToRepo(id);
|
||||
}
|
||||
|
||||
|
||||
static class VarResolveResult {
|
||||
String[] writer;
|
||||
String[] reader;
|
||||
int[][] sendTo;
|
||||
long[] interval;
|
||||
long maxDelay;
|
||||
}
|
||||
|
||||
public synchronized SharableVar createVar(String identifier, String type) {
|
||||
try {
|
||||
if (allVars.containsKey(identifier))
|
||||
if (allVars.containsKey(identifier)) {
|
||||
return allVars.get(identifier);
|
||||
}
|
||||
StateInfoBase stateInfoBase = client.resolve(identifier);
|
||||
if (stateInfoBase.handleValues.has("bdwType") && stateInfoBase.handleValues
|
||||
.get("bdwType").getAsString().equals("SharableVar")) {
|
||||
VarResolveResult resolveResult =
|
||||
JsonUtil.fromJson(stateInfoBase.handleValues, VarResolveResult.class);
|
||||
SharableVar sharableVar = new SharableVar(cpId, identifier, resolveResult);
|
||||
allVars.put(identifier, sharableVar);
|
||||
if (stateInfoBase.handleValues.has("bdwType") &&
|
||||
stateInfoBase.handleValues.get("bdwType").getAsString().equals("SharableVar")) {
|
||||
SharableVarState.SharableVarConfiguration sharableVarConf =
|
||||
JsonUtil.fromJson(stateInfoBase.handleValues, SharableVarState.SharableVarConfiguration.class);
|
||||
SharableVar sharableVar = createSharableVar(sharableVarConf, identifier, type);
|
||||
if (sharableVar != null) {
|
||||
allVars.put(identifier, sharableVar);
|
||||
}
|
||||
return sharableVar;
|
||||
} else
|
||||
return null;
|
||||
@ -125,4 +124,30 @@ public class SharableVarManager {
|
||||
}
|
||||
}
|
||||
|
||||
private SharableVar createSharableVar(SharableVarState.SharableVarConfiguration conf, String identifier, String type) {
|
||||
switch (type) {
|
||||
case Constants.TypeName.G_COUNTER:
|
||||
return new GCounterProxy(identifier, cpId, conf);
|
||||
case Constants.TypeName.PN_COUNTER:
|
||||
return new PNCounterProxy(identifier, cpId, conf);
|
||||
case Constants.TypeName.DW_FLAG:
|
||||
return new DWFlagProxy(identifier, cpId, conf);
|
||||
case Constants.TypeName.LWW_REGISTER:
|
||||
return new LWWRegisterProxy(identifier, cpId, conf);
|
||||
case Constants.TypeName.MV_REGISTER:
|
||||
return new MVRegisterProxy(identifier, cpId, conf);
|
||||
case Constants.TypeName.G_SET:
|
||||
return new GSetProxy(identifier, cpId, conf);
|
||||
case Constants.TypeName.TP_SET:
|
||||
return new TPSetProxy(identifier, cpId, conf);
|
||||
case Constants.TypeName.AW_OR_SET:
|
||||
return new AWORSetProxy(identifier, cpId, conf);
|
||||
case Constants.TypeName.RW_OR_SET:
|
||||
return new RWORSetProxy(identifier, cpId, conf);
|
||||
case Constants.TypeName.RW_LWW_SET:
|
||||
return new RWLWWSetProxy(identifier, cpId, conf);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
177
src/main/java/org/bdware/sc/crdt/SharableVarState.java
Normal file
177
src/main/java/org/bdware/sc/crdt/SharableVarState.java
Normal file
@ -0,0 +1,177 @@
|
||||
package org.bdware.sc.crdt;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class SharableVarState {
|
||||
private String myId;
|
||||
|
||||
private SharableVarConfiguration sharableVarConfiguration;
|
||||
|
||||
private int myIndex;
|
||||
|
||||
private boolean readerFlag;
|
||||
|
||||
private boolean writerFlag;
|
||||
private List<String> sendTo;
|
||||
|
||||
private long interval;
|
||||
|
||||
public SharableVarState(String myId, SharableVarConfiguration sharableVarConfiguration) {
|
||||
this.myId = myId;
|
||||
this.sharableVarConfiguration = sharableVarConfiguration;
|
||||
this.myIndex = parseMyIndex();
|
||||
this.sendTo = parseSendTo();
|
||||
this.interval = sharableVarConfiguration.interval[this.myIndex];
|
||||
for (int readerIndex : this.sharableVarConfiguration.readerIndexes) {
|
||||
if (Objects.equals(readerIndex, myIndex)) {
|
||||
this.readerFlag = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
for (int writerIndex : this.sharableVarConfiguration.writerIndexes) {
|
||||
if (Objects.equals(writerIndex, myIndex)) {
|
||||
this.writerFlag = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private int parseMyIndex() {
|
||||
for (int i = 0; i < sharableVarConfiguration.getNodeIds().length; i++) {
|
||||
if (Objects.equals(sharableVarConfiguration.getNodeIds()[i], myId)) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
private List<String> parseSendTo() {
|
||||
if (myIndex < 0 || myIndex >= sharableVarConfiguration.sendTo.length) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
LinkedHashSet<String> result = new LinkedHashSet<>();
|
||||
int[] sendToIndexes = sharableVarConfiguration.sendTo[myIndex];
|
||||
for (int sendToIndex : sendToIndexes) {
|
||||
if (sendToIndex >= 0 && sendToIndex < sharableVarConfiguration.nodeIds.length && sendToIndex != myIndex) {
|
||||
result.add(sharableVarConfiguration.nodeIds[sendToIndex]);
|
||||
}
|
||||
}
|
||||
return new ArrayList<>(result);
|
||||
}
|
||||
|
||||
public String getMyId() {
|
||||
return myId;
|
||||
}
|
||||
|
||||
public void setMyId(String myId) {
|
||||
this.myId = myId;
|
||||
}
|
||||
|
||||
public SharableVarConfiguration getSharableVarConfiguration() {
|
||||
return sharableVarConfiguration;
|
||||
}
|
||||
|
||||
public void setSharableVarConfiguration(SharableVarConfiguration sharableVarConfiguration) {
|
||||
this.sharableVarConfiguration = sharableVarConfiguration;
|
||||
}
|
||||
|
||||
public int getMyIndex() {
|
||||
return myIndex;
|
||||
}
|
||||
|
||||
public void setMyIndex(int myIndex) {
|
||||
this.myIndex = myIndex;
|
||||
}
|
||||
|
||||
public boolean isReaderFlag() {
|
||||
return readerFlag;
|
||||
}
|
||||
|
||||
public void setReaderFlag(boolean readerFlag) {
|
||||
this.readerFlag = readerFlag;
|
||||
}
|
||||
|
||||
public boolean isWriterFlag() {
|
||||
return writerFlag;
|
||||
}
|
||||
|
||||
public void setWriterFlag(boolean writerFlag) {
|
||||
this.writerFlag = writerFlag;
|
||||
}
|
||||
|
||||
public List<String> getSendTo() {
|
||||
return sendTo;
|
||||
}
|
||||
|
||||
public void setSendTo(List<String> sendTo) {
|
||||
this.sendTo = sendTo;
|
||||
}
|
||||
|
||||
public long getInterval() {
|
||||
return interval;
|
||||
}
|
||||
|
||||
public void setInterval(long interval) {
|
||||
this.interval = interval;
|
||||
}
|
||||
|
||||
public static class SharableVarConfiguration {
|
||||
String[] nodeIds;
|
||||
int[] writerIndexes;
|
||||
int[] readerIndexes;
|
||||
int[][] sendTo;
|
||||
long[] interval;
|
||||
long maxDelay;
|
||||
|
||||
public String[] getNodeIds() {
|
||||
return nodeIds;
|
||||
}
|
||||
|
||||
public void setNodeIds(String[] nodeIds) {
|
||||
this.nodeIds = nodeIds;
|
||||
}
|
||||
|
||||
public int[] getWriterIndexes() {
|
||||
return writerIndexes;
|
||||
}
|
||||
|
||||
public void setWriterIndexes(int[] writerIndexes) {
|
||||
this.writerIndexes = writerIndexes;
|
||||
}
|
||||
|
||||
public int[] getReaderIndexes() {
|
||||
return readerIndexes;
|
||||
}
|
||||
|
||||
public void setReaderIndexes(int[] readerIndexes) {
|
||||
this.readerIndexes = readerIndexes;
|
||||
}
|
||||
|
||||
public int[][] getSendTo() {
|
||||
return sendTo;
|
||||
}
|
||||
|
||||
public void setSendTo(int[][] sendTo) {
|
||||
this.sendTo = sendTo;
|
||||
}
|
||||
|
||||
public long[] getInterval() {
|
||||
return interval;
|
||||
}
|
||||
|
||||
public void setInterval(long[] interval) {
|
||||
this.interval = interval;
|
||||
}
|
||||
|
||||
public long getMaxDelay() {
|
||||
return maxDelay;
|
||||
}
|
||||
|
||||
public void setMaxDelay(long maxDelay) {
|
||||
this.maxDelay = maxDelay;
|
||||
}
|
||||
}
|
||||
}
|
32
src/main/java/org/bdware/sc/crdt/proxy/AWORSetProxy.java
Normal file
32
src/main/java/org/bdware/sc/crdt/proxy/AWORSetProxy.java
Normal file
@ -0,0 +1,32 @@
|
||||
package org.bdware.sc.crdt.proxy;
|
||||
|
||||
import org.bdware.crdt.set.AWORSet;
|
||||
import org.bdware.sc.crdt.SharableVarState;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
public class AWORSetProxy extends SharableVar<AWORSet<Object>> {
|
||||
public AWORSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) {
|
||||
super(varId, cpId, conf);
|
||||
self = new AWORSet<>(cpId, varId);
|
||||
}
|
||||
|
||||
public void add(Object val) {
|
||||
AWORSet<Object> delta = self.add(val);
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public void remove(Object val) {
|
||||
AWORSet<Object> delta = self.remove(val);
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public Set<Object> read() {
|
||||
return self.read();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AWORSet<Object> newEmptyDelta() {
|
||||
return new AWORSet<>(null, varId);
|
||||
}
|
||||
}
|
30
src/main/java/org/bdware/sc/crdt/proxy/DWFlagProxy.java
Normal file
30
src/main/java/org/bdware/sc/crdt/proxy/DWFlagProxy.java
Normal file
@ -0,0 +1,30 @@
|
||||
package org.bdware.sc.crdt.proxy;
|
||||
|
||||
import org.bdware.crdt.flag.DWFlag;
|
||||
import org.bdware.sc.crdt.SharableVarState;
|
||||
|
||||
public class DWFlagProxy extends SharableVar<DWFlag> {
|
||||
public DWFlagProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) {
|
||||
super(varId, cpId, conf);
|
||||
self = new DWFlag(cpId, varId);
|
||||
}
|
||||
|
||||
public void enable() {
|
||||
DWFlag delta = self.enable();
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public void disable() {
|
||||
DWFlag delta = self.disable();
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public boolean read() {
|
||||
return self.read();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DWFlag newEmptyDelta() {
|
||||
return new DWFlag(null, varId);
|
||||
}
|
||||
}
|
30
src/main/java/org/bdware/sc/crdt/proxy/EWFlagProxy.java
Normal file
30
src/main/java/org/bdware/sc/crdt/proxy/EWFlagProxy.java
Normal file
@ -0,0 +1,30 @@
|
||||
package org.bdware.sc.crdt.proxy;
|
||||
|
||||
import org.bdware.crdt.flag.EWFlag;
|
||||
import org.bdware.sc.crdt.SharableVarState;
|
||||
|
||||
public class EWFlagProxy extends SharableVar<EWFlag> {
|
||||
public EWFlagProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) {
|
||||
super(varId, cpId, conf);
|
||||
self = new EWFlag(cpId, varId);
|
||||
}
|
||||
|
||||
public void enable() {
|
||||
EWFlag delta = self.enable();
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public void disable() {
|
||||
EWFlag delta = self.disable();
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public boolean read() {
|
||||
return self.read();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EWFlag newEmptyDelta() {
|
||||
return new EWFlag(null, varId);
|
||||
}
|
||||
}
|
31
src/main/java/org/bdware/sc/crdt/proxy/GCounterProxy.java
Normal file
31
src/main/java/org/bdware/sc/crdt/proxy/GCounterProxy.java
Normal file
@ -0,0 +1,31 @@
|
||||
package org.bdware.sc.crdt.proxy;
|
||||
|
||||
import org.bdware.crdt.counter.GCounter;
|
||||
import org.bdware.sc.crdt.SharableVarState;
|
||||
|
||||
public class GCounterProxy extends SharableVar<GCounter> {
|
||||
|
||||
public GCounterProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) {
|
||||
super(varId, cpId, conf);
|
||||
self = new GCounter(cpId, varId);
|
||||
}
|
||||
|
||||
public void inc() {
|
||||
GCounter delta = self.inc();
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public void inc(long var) {
|
||||
GCounter delta = self.inc(var);
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public Long read() {
|
||||
return self.read();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected GCounter newEmptyDelta() {
|
||||
return new GCounter(null, varId);
|
||||
}
|
||||
}
|
27
src/main/java/org/bdware/sc/crdt/proxy/GSetProxy.java
Normal file
27
src/main/java/org/bdware/sc/crdt/proxy/GSetProxy.java
Normal file
@ -0,0 +1,27 @@
|
||||
package org.bdware.sc.crdt.proxy;
|
||||
|
||||
import org.bdware.crdt.set.GSet;
|
||||
import org.bdware.sc.crdt.SharableVarState;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
public class GSetProxy extends SharableVar<GSet<Object>> {
|
||||
public GSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) {
|
||||
super(varId, cpId, conf);
|
||||
self = new GSet<>(cpId, varId);
|
||||
}
|
||||
|
||||
public void add(Object val) {
|
||||
GSet<Object> delta = self.add(val);
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public Set<Object> read() {
|
||||
return self.read();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected GSet<Object> newEmptyDelta() {
|
||||
return new GSet<>(null, varId);
|
||||
}
|
||||
}
|
25
src/main/java/org/bdware/sc/crdt/proxy/LWWRegisterProxy.java
Normal file
25
src/main/java/org/bdware/sc/crdt/proxy/LWWRegisterProxy.java
Normal file
@ -0,0 +1,25 @@
|
||||
package org.bdware.sc.crdt.proxy;
|
||||
|
||||
import org.bdware.crdt.register.LWWRegister;
|
||||
import org.bdware.sc.crdt.SharableVarState;
|
||||
|
||||
public class LWWRegisterProxy extends SharableVar<LWWRegister<Long, Object>> {
|
||||
public LWWRegisterProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) {
|
||||
super(varId, cpId, conf);
|
||||
self = new LWWRegister<>(cpId, varId);
|
||||
}
|
||||
|
||||
public void write(Object val) {
|
||||
LWWRegister<Long, Object> delta = self.write(System.currentTimeMillis(), val);
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public Object read() {
|
||||
return self.read();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected LWWRegister<Long, Object> newEmptyDelta() {
|
||||
return new LWWRegister<>(null, varId);
|
||||
}
|
||||
}
|
31
src/main/java/org/bdware/sc/crdt/proxy/MVRegisterProxy.java
Normal file
31
src/main/java/org/bdware/sc/crdt/proxy/MVRegisterProxy.java
Normal file
@ -0,0 +1,31 @@
|
||||
package org.bdware.sc.crdt.proxy;
|
||||
|
||||
import org.bdware.crdt.register.MVRegister;
|
||||
import org.bdware.sc.crdt.SharableVarState;
|
||||
|
||||
public class MVRegisterProxy extends SharableVar<MVRegister<Object>> {
|
||||
|
||||
public MVRegisterProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) {
|
||||
super(varId, cpId, conf);
|
||||
self = new MVRegister<>(cpId, varId);
|
||||
}
|
||||
|
||||
public void write(Object val) {
|
||||
MVRegister<Object> delta = self.write(val);
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public void resolve(Object val) {
|
||||
MVRegister<Object> delta = self.resolve();
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public Object read() {
|
||||
return self.read();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MVRegister<Object> newEmptyDelta() {
|
||||
return new MVRegister<>(null, varId);
|
||||
}
|
||||
}
|
40
src/main/java/org/bdware/sc/crdt/proxy/PNCounterProxy.java
Normal file
40
src/main/java/org/bdware/sc/crdt/proxy/PNCounterProxy.java
Normal file
@ -0,0 +1,40 @@
|
||||
package org.bdware.sc.crdt.proxy;
|
||||
|
||||
import org.bdware.crdt.counter.PNCounter;
|
||||
import org.bdware.sc.crdt.SharableVarState;
|
||||
|
||||
public class PNCounterProxy extends SharableVar<PNCounter> {
|
||||
public PNCounterProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) {
|
||||
super(varId, cpId, conf);
|
||||
self = new PNCounter(cpId, varId);
|
||||
}
|
||||
|
||||
public void inc() {
|
||||
PNCounter delta = self.inc();
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public void inc(long val) {
|
||||
PNCounter delta = self.inc(val);
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public void dec() {
|
||||
PNCounter delta = self.dec();
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public void dec(long val) {
|
||||
PNCounter delta = self.dec(val);
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public Long read() {
|
||||
return self.read();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PNCounter newEmptyDelta() {
|
||||
return new PNCounter(null, varId);
|
||||
}
|
||||
}
|
32
src/main/java/org/bdware/sc/crdt/proxy/RWLWWSetProxy.java
Normal file
32
src/main/java/org/bdware/sc/crdt/proxy/RWLWWSetProxy.java
Normal file
@ -0,0 +1,32 @@
|
||||
package org.bdware.sc.crdt.proxy;
|
||||
|
||||
import org.bdware.crdt.set.RWLWWSet;
|
||||
import org.bdware.sc.crdt.SharableVarState;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
public class RWLWWSetProxy extends SharableVar<RWLWWSet<Long, Object>> {
|
||||
public RWLWWSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) {
|
||||
super(varId, cpId, conf);
|
||||
self = new RWLWWSet<>(cpId, varId);
|
||||
}
|
||||
|
||||
public void add(Object val) {
|
||||
RWLWWSet<Long, Object> delta = self.add(System.currentTimeMillis(), val);
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public void remove(Object val) {
|
||||
RWLWWSet<Long, Object> delta = self.remove(System.currentTimeMillis(), val);
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public Set<Object> read() {
|
||||
return self.read();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RWLWWSet<Long, Object> newEmptyDelta() {
|
||||
return new RWLWWSet<>();
|
||||
}
|
||||
}
|
32
src/main/java/org/bdware/sc/crdt/proxy/RWORSetProxy.java
Normal file
32
src/main/java/org/bdware/sc/crdt/proxy/RWORSetProxy.java
Normal file
@ -0,0 +1,32 @@
|
||||
package org.bdware.sc.crdt.proxy;
|
||||
|
||||
import org.bdware.crdt.set.RWORSet;
|
||||
import org.bdware.sc.crdt.SharableVarState;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
public class RWORSetProxy extends SharableVar<RWORSet<Object>> {
|
||||
public RWORSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) {
|
||||
super(varId, cpId, conf);
|
||||
self = new RWORSet<>(cpId, varId);
|
||||
}
|
||||
|
||||
public void add(Object val) {
|
||||
RWORSet<Object> delta = self.add(val);
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public void remove(Object val) {
|
||||
RWORSet<Object> delta = self.remove(val);
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public Set<Object> read() {
|
||||
return self.read();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RWORSet<Object> newEmptyDelta() {
|
||||
return new RWORSet<>(null, varId);
|
||||
}
|
||||
}
|
69
src/main/java/org/bdware/sc/crdt/proxy/SharableVar.java
Normal file
69
src/main/java/org/bdware/sc/crdt/proxy/SharableVar.java
Normal file
@ -0,0 +1,69 @@
|
||||
package org.bdware.sc.crdt.proxy;
|
||||
|
||||
import io.netty.util.HashedWheelTimer;
|
||||
import io.netty.util.Timeout;
|
||||
import io.netty.util.TimerTask;
|
||||
import org.bdware.crdt.basic.JoinableCRDT;
|
||||
import org.bdware.sc.crdt.SharableVarManager;
|
||||
import org.bdware.sc.crdt.SharableVarState;
|
||||
import org.bdware.sc.util.JsonUtil;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public abstract class SharableVar<T extends JoinableCRDT> {
|
||||
public final HashedWheelTimer HASHED_WHEEL_TIMER =
|
||||
new HashedWheelTimer(r -> {
|
||||
Thread t = Executors.defaultThreadFactory().newThread(r);
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
}, 5, TimeUnit.MILLISECONDS, 2);
|
||||
|
||||
public final Queue<JoinableCRDT> localDeltaQueue = new LinkedList<>();
|
||||
public final Queue<JoinableCRDT> remoteDeltaQueue = new LinkedList<>();
|
||||
public T self;
|
||||
protected String varId;
|
||||
private SyncTimerTask syncTimerTask;
|
||||
private SharableVarState sharableVarState;
|
||||
|
||||
public SharableVar(String varId, String cpId, SharableVarState.SharableVarConfiguration resolveResult) {
|
||||
this.varId = varId;
|
||||
this.sharableVarState = new SharableVarState(cpId, resolveResult);
|
||||
this.HASHED_WHEEL_TIMER.newTimeout(timeout -> {
|
||||
syncTimerTask = new SyncTimerTask();
|
||||
HASHED_WHEEL_TIMER.newTimeout(syncTimerTask, sharableVarState.getInterval(), TimeUnit.MILLISECONDS);
|
||||
}, 0, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
private void syncVar() {
|
||||
if (localDeltaQueue.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
JoinableCRDT joinedDelta = newEmptyDelta();
|
||||
synchronized (localDeltaQueue) {
|
||||
while (!localDeltaQueue.isEmpty()) {
|
||||
JoinableCRDT delta = localDeltaQueue.poll();
|
||||
joinedDelta.join(delta);
|
||||
}
|
||||
}
|
||||
String content = JsonUtil.toJson(localDeltaQueue);
|
||||
SharableVarManager.instance.broadcastSyncMessage(varId, sharableVarState.getSendTo(), content);
|
||||
}
|
||||
|
||||
abstract protected T newEmptyDelta();
|
||||
|
||||
class SyncTimerTask implements TimerTask {
|
||||
@Override
|
||||
public void run(Timeout timeout) throws Exception {
|
||||
try {
|
||||
syncVar();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
HASHED_WHEEL_TIMER.newTimeout(syncTimerTask, sharableVarState.getInterval(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
32
src/main/java/org/bdware/sc/crdt/proxy/TPSetProxy.java
Normal file
32
src/main/java/org/bdware/sc/crdt/proxy/TPSetProxy.java
Normal file
@ -0,0 +1,32 @@
|
||||
package org.bdware.sc.crdt.proxy;
|
||||
|
||||
import org.bdware.crdt.set.TPSet;
|
||||
import org.bdware.sc.crdt.SharableVarState;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
public class TPSetProxy extends SharableVar<TPSet<Object>> {
|
||||
public TPSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) {
|
||||
super(varId, cpId, conf);
|
||||
self = new TPSet<>(cpId, varId);
|
||||
}
|
||||
|
||||
public void add(Object val) {
|
||||
TPSet<Object> delta = self.add(val);
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public void remove(Object val) {
|
||||
TPSet<Object> delta = self.remove(val);
|
||||
localDeltaQueue.add(delta);
|
||||
}
|
||||
|
||||
public Set<Object> read() {
|
||||
return self.read();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TPSet<Object> newEmptyDelta() {
|
||||
return new TPSet<>(null, varId);
|
||||
}
|
||||
}
|
@ -249,7 +249,11 @@ public class DesktopEngine extends JSEngine {
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
for (SharableNode sharable : contractNode.getSharables()) {
|
||||
for (String variableStatement : sharable.getVariableStatements()) {
|
||||
compileSharable(sharable, variableStatement);
|
||||
}
|
||||
}
|
||||
for (FunctionNode fun : contractNode.getFunctions())
|
||||
try {
|
||||
String str = fun.plainText();
|
||||
@ -292,6 +296,20 @@ public class DesktopEngine extends JSEngine {
|
||||
return cResult;
|
||||
}
|
||||
|
||||
private void compileSharable(SharableNode sharable, String variableStatement) {
|
||||
try {
|
||||
engine.getContext()
|
||||
.setAttribute(
|
||||
ScriptEngine.FILENAME,
|
||||
sharable.getFileName(),
|
||||
ScriptContext.ENGINE_SCOPE);
|
||||
engine.eval("var " + variableStatement);
|
||||
LOGGER.info("load sharable: " + variableStatement);
|
||||
} catch (ScriptException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void compileEventFunction(String name, String topic, REventSemantics semantics) {
|
||||
try {
|
||||
String str;
|
||||
|
Loading…
Reference in New Issue
Block a user