feat: sharable var integration

This commit is contained in:
Xuxin Wang 2023-06-25 16:42:56 +08:00
parent 6dbb0e161c
commit 2a59f577fc
9 changed files with 33 additions and 25 deletions

View File

@ -107,10 +107,11 @@ public class SharableVarManager {
return allVars.get(identifier); return allVars.get(identifier);
} }
StateInfoBase stateInfoBase = client.resolve(identifier); StateInfoBase stateInfoBase = client.resolve(identifier);
if (stateInfoBase.handleValues.has("bdwType") && if (stateInfoBase.handleValues.has("bdwType") && stateInfoBase.handleValues
stateInfoBase.handleValues.get("bdwType").getAsString().equals("SharableVar")) { .get("bdwType").getAsString().equals("SharableVar")) {
SharableVarState.SharableVarConfiguration sharableVarConf = SharableVarState.SharableVarConfiguration sharableVarConf =
JsonUtil.fromJson(stateInfoBase.handleValues, SharableVarState.SharableVarConfiguration.class); JsonUtil.fromJson(stateInfoBase.handleValues,
SharableVarState.SharableVarConfiguration.class);
SharableVar sharableVar = createSharableVar(sharableVarConf, identifier, type); SharableVar sharableVar = createSharableVar(sharableVarConf, identifier, type);
if (sharableVar != null) { if (sharableVar != null) {
allVars.put(identifier, sharableVar); allVars.put(identifier, sharableVar);
@ -124,7 +125,8 @@ public class SharableVarManager {
} }
} }
private SharableVar createSharableVar(SharableVarState.SharableVarConfiguration conf, String identifier, String type) { private SharableVar createSharableVar(SharableVarState.SharableVarConfiguration conf,
String identifier, String type) {
switch (type) { switch (type) {
case Constants.TypeName.G_COUNTER: case Constants.TypeName.G_COUNTER:
return new GCounterProxy(identifier, cpId, conf); return new GCounterProxy(identifier, cpId, conf);

View File

@ -55,7 +55,8 @@ public class SharableVarState {
LinkedHashSet<String> result = new LinkedHashSet<>(); LinkedHashSet<String> result = new LinkedHashSet<>();
int[] sendToIndexes = sharableVarConfiguration.sendTo[myIndex]; int[] sendToIndexes = sharableVarConfiguration.sendTo[myIndex];
for (int sendToIndex : sendToIndexes) { for (int sendToIndex : sendToIndexes) {
if (sendToIndex >= 0 && sendToIndex < sharableVarConfiguration.nodeIds.length && sendToIndex != myIndex) { if (sendToIndex >= 0 && sendToIndex < sharableVarConfiguration.nodeIds.length
&& sendToIndex != myIndex) {
result.add(sharableVarConfiguration.nodeIds[sendToIndex]); result.add(sharableVarConfiguration.nodeIds[sendToIndex]);
} }
} }

View File

@ -5,7 +5,8 @@ import org.bdware.sc.crdt.SharableVarState;
public class GCounterProxy extends SharableVar<GCounter> { public class GCounterProxy extends SharableVar<GCounter> {
public GCounterProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { public GCounterProxy(String varId, String cpId,
SharableVarState.SharableVarConfiguration conf) {
super(varId, cpId, conf); super(varId, cpId, conf);
self = new GCounter(cpId, varId); self = new GCounter(cpId, varId);
} }

View File

@ -4,7 +4,8 @@ import org.bdware.crdt.register.LWWRegister;
import org.bdware.sc.crdt.SharableVarState; import org.bdware.sc.crdt.SharableVarState;
public class LWWRegisterProxy extends SharableVar<LWWRegister<Long, Object>> { public class LWWRegisterProxy extends SharableVar<LWWRegister<Long, Object>> {
public LWWRegisterProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { public LWWRegisterProxy(String varId, String cpId,
SharableVarState.SharableVarConfiguration conf) {
super(varId, cpId, conf); super(varId, cpId, conf);
self = new LWWRegister<>(cpId, varId); self = new LWWRegister<>(cpId, varId);
} }

View File

@ -5,7 +5,8 @@ import org.bdware.sc.crdt.SharableVarState;
public class MVRegisterProxy extends SharableVar<MVRegister<Object>> { public class MVRegisterProxy extends SharableVar<MVRegister<Object>> {
public MVRegisterProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { public MVRegisterProxy(String varId, String cpId,
SharableVarState.SharableVarConfiguration conf) {
super(varId, cpId, conf); super(varId, cpId, conf);
self = new MVRegister<>(cpId, varId); self = new MVRegister<>(cpId, varId);
} }

View File

@ -4,7 +4,8 @@ import org.bdware.crdt.counter.PNCounter;
import org.bdware.sc.crdt.SharableVarState; import org.bdware.sc.crdt.SharableVarState;
public class PNCounterProxy extends SharableVar<PNCounter> { public class PNCounterProxy extends SharableVar<PNCounter> {
public PNCounterProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { public PNCounterProxy(String varId, String cpId,
SharableVarState.SharableVarConfiguration conf) {
super(varId, cpId, conf); super(varId, cpId, conf);
self = new PNCounter(cpId, varId); self = new PNCounter(cpId, varId);
} }

View File

@ -6,7 +6,8 @@ import org.bdware.sc.crdt.SharableVarState;
import java.util.Set; import java.util.Set;
public class RWLWWSetProxy extends SharableVar<RWLWWSet<Long, Object>> { public class RWLWWSetProxy extends SharableVar<RWLWWSet<Long, Object>> {
public RWLWWSetProxy(String varId, String cpId, SharableVarState.SharableVarConfiguration conf) { public RWLWWSetProxy(String varId, String cpId,
SharableVarState.SharableVarConfiguration conf) {
super(varId, cpId, conf); super(varId, cpId, conf);
self = new RWLWWSet<>(cpId, varId); self = new RWLWWSet<>(cpId, varId);
} }

View File

@ -14,12 +14,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public abstract class SharableVar<T extends JoinableCRDT> { public abstract class SharableVar<T extends JoinableCRDT> {
public final HashedWheelTimer HASHED_WHEEL_TIMER = public final HashedWheelTimer HASHED_WHEEL_TIMER = new HashedWheelTimer(r -> {
new HashedWheelTimer(r -> { Thread t = Executors.defaultThreadFactory().newThread(r);
Thread t = Executors.defaultThreadFactory().newThread(r); t.setDaemon(true);
t.setDaemon(true); return t;
return t; }, 5, TimeUnit.MILLISECONDS, 2);
}, 5, TimeUnit.MILLISECONDS, 2);
public final Queue<JoinableCRDT> localDeltaQueue = new LinkedList<>(); public final Queue<JoinableCRDT> localDeltaQueue = new LinkedList<>();
public final Queue<JoinableCRDT> remoteDeltaQueue = new LinkedList<>(); public final Queue<JoinableCRDT> remoteDeltaQueue = new LinkedList<>();
@ -28,12 +27,14 @@ public abstract class SharableVar<T extends JoinableCRDT> {
private SyncTimerTask syncTimerTask; private SyncTimerTask syncTimerTask;
private SharableVarState sharableVarState; private SharableVarState sharableVarState;
public SharableVar(String varId, String cpId, SharableVarState.SharableVarConfiguration resolveResult) { public SharableVar(String varId, String cpId,
SharableVarState.SharableVarConfiguration resolveResult) {
this.varId = varId; this.varId = varId;
this.sharableVarState = new SharableVarState(cpId, resolveResult); this.sharableVarState = new SharableVarState(cpId, resolveResult);
this.HASHED_WHEEL_TIMER.newTimeout(timeout -> { this.HASHED_WHEEL_TIMER.newTimeout(timeout -> {
syncTimerTask = new SyncTimerTask(); syncTimerTask = new SyncTimerTask();
HASHED_WHEEL_TIMER.newTimeout(syncTimerTask, sharableVarState.getInterval(), TimeUnit.MILLISECONDS); HASHED_WHEEL_TIMER.newTimeout(syncTimerTask, sharableVarState.getInterval(),
TimeUnit.MILLISECONDS);
}, 0, TimeUnit.MILLISECONDS); }, 0, TimeUnit.MILLISECONDS);
} }
@ -49,7 +50,8 @@ public abstract class SharableVar<T extends JoinableCRDT> {
} }
} }
String content = JsonUtil.toJson(localDeltaQueue); String content = JsonUtil.toJson(localDeltaQueue);
SharableVarManager.instance.broadcastSyncMessage(varId, sharableVarState.getSendTo(), content); SharableVarManager.instance.broadcastSyncMessage(varId, sharableVarState.getSendTo(),
content);
} }
abstract protected T newEmptyDelta(); abstract protected T newEmptyDelta();
@ -62,7 +64,8 @@ public abstract class SharableVar<T extends JoinableCRDT> {
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
HASHED_WHEEL_TIMER.newTimeout(syncTimerTask, sharableVarState.getInterval(), TimeUnit.MILLISECONDS); HASHED_WHEEL_TIMER.newTimeout(syncTimerTask, sharableVarState.getInterval(),
TimeUnit.MILLISECONDS);
} }
} }
} }

View File

@ -298,11 +298,8 @@ public class DesktopEngine extends JSEngine {
private void compileSharable(SharableNode sharable, String variableStatement) { private void compileSharable(SharableNode sharable, String variableStatement) {
try { try {
engine.getContext() engine.getContext().setAttribute(ScriptEngine.FILENAME, sharable.getFileName(),
.setAttribute( ScriptContext.ENGINE_SCOPE);
ScriptEngine.FILENAME,
sharable.getFileName(),
ScriptContext.ENGINE_SCOPE);
engine.eval("var " + variableStatement); engine.eval("var " + variableStatement);
LOGGER.info("load sharable: " + variableStatement); LOGGER.info("load sharable: " + variableStatement);
} catch (ScriptException e) { } catch (ScriptException e) {