diff --git a/src/main/java/org/bdware/sc/crdt/SharableVarManager.java b/src/main/java/org/bdware/sc/crdt/SharableVarManager.java index de43f8b..48e7450 100644 --- a/src/main/java/org/bdware/sc/crdt/SharableVarManager.java +++ b/src/main/java/org/bdware/sc/crdt/SharableVarManager.java @@ -82,7 +82,8 @@ public class SharableVarManager { } } - private DoipMessage createSyncMessage(String target, String varId, String content, String type) { + private DoipMessage createSyncMessage(String target, String varId, String content, + String type) { DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); builder.createRequest(target, SHARABLEOP); builder.addAttributes("varId", varId); @@ -91,7 +92,8 @@ public class SharableVarManager { return builder.create(); } - public void broadcastSyncMessage(String varId, List sendTo, String content, String type) { + public void broadcastSyncMessage(String varId, List sendTo, String content, + String type) { for (String target : sendTo) { DoipMessage doipMessage = createSyncMessage(target, varId, content, type); if (target.equals(cpId)) { @@ -140,7 +142,7 @@ public class SharableVarManager { } private SharableVar createSharableVar(SharableVarState.SharableVarConfiguration conf, - String identifier, String type) { + String identifier, String type) { switch (type) { case Constants.TypeName.G_COUNTER: return new GCounterProxy(identifier, cpId, conf); diff --git a/src/main/java/org/bdware/sc/crdt/SharableVarState.java b/src/main/java/org/bdware/sc/crdt/SharableVarState.java index b1a07eb..180de02 100644 --- a/src/main/java/org/bdware/sc/crdt/SharableVarState.java +++ b/src/main/java/org/bdware/sc/crdt/SharableVarState.java @@ -189,7 +189,7 @@ public class SharableVarState { public static class SharableVarConfiguration { // ["/bdrepo1/var/0", "/bdrepo1/var/1", "/bdrepo1/var/2", "/bdrepo1/var/3"] // 0(w) -> 2(w) -> 1(r) -> 3(r) - // | + // | // 1(w) - // ["/bdrepo1/var/0", "/bdrepo1/var/1", "/bdrepo1/var/2", "/bdrepo1/var/3"] @@ -275,22 +275,27 @@ public class SharableVarState { public static void main(String[] args) { // ["/bdrepo1/var/0", "/bdrepo1/var/1", "/bdrepo1/var/2", "/bdrepo1/var/3"] // 0(w) -> 2(w) -> 1(r) -> 3(r) - // | + // | // 1(w) - SharableVarConfiguration sharableVarConfiguration = new SharableVarConfiguration(); - sharableVarConfiguration.setNodeIds(new String[]{"/bdrepo1/var/0", "/bdrepo1/var/1", "/bdrepo1/var/2", "/bdrepo1/var/3"}); - sharableVarConfiguration.setWriterParents(new int[]{2, 2, -1, -2}); - sharableVarConfiguration.setReaderParents(new int[]{-2, -1, -2, 1}); - sharableVarConfiguration.setWriterIntervals(new long[]{5, 5, 6, -1}); - sharableVarConfiguration.setReaderIntervals(new long[]{-1, 4, -1, -1}); + sharableVarConfiguration.setNodeIds(new String[] {"/bdrepo1/var/0", "/bdrepo1/var/1", + "/bdrepo1/var/2", "/bdrepo1/var/3"}); + sharableVarConfiguration.setWriterParents(new int[] {2, 2, -1, -2}); + sharableVarConfiguration.setReaderParents(new int[] {-2, -1, -2, 1}); + sharableVarConfiguration.setWriterIntervals(new long[] {5, 5, 6, -1}); + sharableVarConfiguration.setReaderIntervals(new long[] {-1, 4, -1, -1}); - SharableVarState sharableVarState0 = new SharableVarState("/bdrepo1/var/0", sharableVarConfiguration); - SharableVarState sharableVarState1 = new SharableVarState("/bdrepo1/var/1", sharableVarConfiguration); - SharableVarState sharableVarState2 = new SharableVarState("/bdrepo1/var/2", sharableVarConfiguration); - SharableVarState sharableVarState3 = new SharableVarState("/bdrepo1/var/3", sharableVarConfiguration); + SharableVarState sharableVarState0 = + new SharableVarState("/bdrepo1/var/0", sharableVarConfiguration); + SharableVarState sharableVarState1 = + new SharableVarState("/bdrepo1/var/1", sharableVarConfiguration); + SharableVarState sharableVarState2 = + new SharableVarState("/bdrepo1/var/2", sharableVarConfiguration); + SharableVarState sharableVarState3 = + new SharableVarState("/bdrepo1/var/3", sharableVarConfiguration); System.out.println(sharableVarState0); System.out.println(sharableVarState1); System.out.println(sharableVarState2); diff --git a/src/main/java/org/bdware/sc/crdt/planning/PlanningWith0Expansivity.java b/src/main/java/org/bdware/sc/crdt/planning/PlanningWith0Expansivity.java index c9bb0d4..13225c5 100644 --- a/src/main/java/org/bdware/sc/crdt/planning/PlanningWith0Expansivity.java +++ b/src/main/java/org/bdware/sc/crdt/planning/PlanningWith0Expansivity.java @@ -2,7 +2,7 @@ package org.bdware.sc.crdt.planning; public class PlanningWith0Expansivity extends SharableNetworkPlanning { public PlanningWith0Expansivity(String[] nodeIds, int[] writers, int[] readers, long maxDelay, - int bandwidthDownload, int bandwidthUpload, int dataSize) { + int bandwidthDownload, int bandwidthUpload, int dataSize) { this.nodeIds = nodeIds; this.writers = writers; this.readers = readers; @@ -64,7 +64,7 @@ public class PlanningWith0Expansivity extends SharableNetworkPlanning { frequencySyncR = rDelay > 0 ? (treeHeightR - 1) / rDelay : 0; frequencySyncWR = w2rDelay > 0 ? (1.0 / w2rDelay) : 0; - totalData = (long) (dataSize - * (frequencySyncW * a + frequencySyncWR * b + frequencySyncR * c)); + totalData = + (long) (dataSize * (frequencySyncW * a + frequencySyncWR * b + frequencySyncR * c)); } } diff --git a/src/main/java/org/bdware/sc/crdt/planning/PlanningWith1Expansivity.java b/src/main/java/org/bdware/sc/crdt/planning/PlanningWith1Expansivity.java index 72fb33d..2fcde74 100644 --- a/src/main/java/org/bdware/sc/crdt/planning/PlanningWith1Expansivity.java +++ b/src/main/java/org/bdware/sc/crdt/planning/PlanningWith1Expansivity.java @@ -2,7 +2,7 @@ package org.bdware.sc.crdt.planning; public class PlanningWith1Expansivity extends SharableNetworkPlanning { public PlanningWith1Expansivity(String[] nodeIds, int[] writers, int[] readers, long maxDelay, - int bandwidthDownload, int bandwidthUpload, int dataSize) { + int bandwidthDownload, int bandwidthUpload, int dataSize) { this.nodeIds = nodeIds; this.writers = writers; this.readers = readers; @@ -51,9 +51,10 @@ public class PlanningWith1Expansivity extends SharableNetworkPlanning { double a = 0; if (treeHeightW > 1) { if (treeDegreeW > 1) { - a = (treeHeightW - 1) * - (treeHeightW * Math.pow(treeDegreeW, treeHeightW) / (treeDegreeW - 1) - + (treeDegreeW - Math.pow(treeDegreeW, treeHeightW + 1)) / (treeDegreeW - 1) / (treeDegreeW - 1)) + a = (treeHeightW - 1) + * (treeHeightW * Math.pow(treeDegreeW, treeHeightW) / (treeDegreeW - 1) + + (treeDegreeW - Math.pow(treeDegreeW, treeHeightW + 1)) + / (treeDegreeW - 1) / (treeDegreeW - 1)) * rootCountW; } else { // treeDegreeW = 1 @@ -76,7 +77,7 @@ public class PlanningWith1Expansivity extends SharableNetworkPlanning { frequencySyncR = rDelay > 0 ? (treeHeightR - 1) / rDelay : 0; frequencySyncWR = w2rDelay > 0 ? (1.0 / w2rDelay) : 0; - totalData = (long) (dataSize - * (frequencySyncW * a + frequencySyncWR * b + frequencySyncR * c)); + totalData = + (long) (dataSize * (frequencySyncW * a + frequencySyncWR * b + frequencySyncR * c)); } } diff --git a/src/main/java/org/bdware/sc/crdt/planning/PlanningWithkExpansivity.java b/src/main/java/org/bdware/sc/crdt/planning/PlanningWithkExpansivity.java index 2ed44bc..13596b7 100644 --- a/src/main/java/org/bdware/sc/crdt/planning/PlanningWithkExpansivity.java +++ b/src/main/java/org/bdware/sc/crdt/planning/PlanningWithkExpansivity.java @@ -9,7 +9,7 @@ public class PlanningWithkExpansivity extends SharableNetworkPlanning { private final double domainSize; public PlanningWithkExpansivity(String[] nodeIds, int[] writers, int[] readers, long maxDelay, - int bandwidthDownload, int bandwidthUpload, int dataSize, double domainSize) { + int bandwidthDownload, int bandwidthUpload, int dataSize, double domainSize) { this.nodeIds = nodeIds; this.writers = writers; this.readers = readers; @@ -26,8 +26,8 @@ public class PlanningWithkExpansivity extends SharableNetworkPlanning { public boolean writerTreeConstraint() { if (frequencySyncW > 0) { if (treeDegreeW > 1) { - double common = frequencySyncW * domainSize - * (1 - Math.pow(k, (Math.pow(treeDegreeW, treeHeightW - 1) - 1) / (treeDegreeW - 1))); + double common = frequencySyncW * domainSize * (1 - Math.pow(k, + (Math.pow(treeDegreeW, treeHeightW - 1) - 1) / (treeDegreeW - 1))); // 非叶子节点下载带宽 boolean result1 = bandwidthDownload >= common * treeDegreeW; // 非根节点上行带宽 @@ -76,8 +76,7 @@ public class PlanningWithkExpansivity extends SharableNetworkPlanning { } double result = 0; for (int h = 1; h < H1; ++h) { - result += - (Math.pow(D1, h) * (1 - Math.pow(k, (Math.pow(D1, H1 - h) - 1) / (D1 - 1)))); + result += (Math.pow(D1, h) * (1 - Math.pow(k, (Math.pow(D1, H1 - h) - 1) / (D1 - 1)))); } accumulationsCache.computeIfAbsent(H1, k -> new HashMap<>()).put(D1, result); return result; @@ -101,7 +100,9 @@ public class PlanningWithkExpansivity extends SharableNetworkPlanning { } double b = rootCountR * rootCountW * (1 - Math.pow(k, treeNodeCountW)); - double c = treeHeightR > 1 ? (treeHeightR - 1) * (totalCountR - rootCountR) * (1 - Math.pow(k, totalCountW)) : 0; + double c = treeHeightR > 1 + ? (treeHeightR - 1) * (totalCountR - rootCountR) * (1 - Math.pow(k, totalCountW)) + : 0; double A = Math.sqrt(a); double B = Math.sqrt(b); diff --git a/src/main/java/org/bdware/sc/crdt/planning/SharableNetworkPlanning.java b/src/main/java/org/bdware/sc/crdt/planning/SharableNetworkPlanning.java index af3cddd..0b7027d 100644 --- a/src/main/java/org/bdware/sc/crdt/planning/SharableNetworkPlanning.java +++ b/src/main/java/org/bdware/sc/crdt/planning/SharableNetworkPlanning.java @@ -46,7 +46,8 @@ public class SharableNetworkPlanning { this.treeDegreeW = treeDegreeW; this.treeNodeCountW = Math.ceil(totalCountW / rootCountW); if (treeDegreeW > 1) { - this.treeHeightW = Math.ceil(logNM(treeDegreeW, treeNodeCountW * (treeDegreeW - 1) + 1)); + this.treeHeightW = + Math.ceil(logNM(treeDegreeW, treeNodeCountW * (treeDegreeW - 1) + 1)); } else { this.treeHeightW = treeNodeCountW; } @@ -57,7 +58,8 @@ public class SharableNetworkPlanning { this.treeDegreeR = treeDegreeR; this.treeNodeCountR = Math.ceil(totalCountR / rootCountR); if (treeDegreeR > 1) { - this.treeHeightR = Math.ceil(logNM(treeDegreeR, treeNodeCountR * (treeDegreeR - 1) + 1)); + this.treeHeightR = + Math.ceil(logNM(treeDegreeR, treeNodeCountR * (treeDegreeR - 1) + 1)); } else { this.treeHeightR = treeNodeCountR; } @@ -100,8 +102,7 @@ public class SharableNetworkPlanning { System.out.println(result); } - protected void calcOptimizedResult() { - } + protected void calcOptimizedResult() {} protected boolean writer2ReaderConstraint() { return true; @@ -138,7 +139,7 @@ public class SharableNetworkPlanning { } private int[] allocateTreeNode(Set onlySet, Set rwSet, int rootCount, - int degree, long delay, int height) { + int degree, long delay, int height) { int[] result = new int[nodeIds.length]; long[] writerInterval = new long[nodeIds.length]; Arrays.fill(result, -2); @@ -188,10 +189,9 @@ public class SharableNetworkPlanning { @Override public String toString() { - String result = "[" + wDelay + "," + w2rDelay + "," + rDelay + - "]\n" + "writer tree: degree " + treeDegreeW + - ", count " + rootCountW + ",\n" + "reader tree: degree " + treeDegreeR + ", count " + - rootCountR; + String result = "[" + wDelay + "," + w2rDelay + "," + rDelay + "]\n" + + "writer tree: degree " + treeDegreeW + ", count " + rootCountW + ",\n" + + "reader tree: degree " + treeDegreeR + ", count " + rootCountR; return result; } } diff --git a/src/main/java/org/bdware/sc/crdt/proxy/SharableVar.java b/src/main/java/org/bdware/sc/crdt/proxy/SharableVar.java index ad3fac5..4f904e5 100644 --- a/src/main/java/org/bdware/sc/crdt/proxy/SharableVar.java +++ b/src/main/java/org/bdware/sc/crdt/proxy/SharableVar.java @@ -32,15 +32,17 @@ public abstract class SharableVar { public WriterSyncTimerTask writerSyncTimerTask; public SharableVar(String varId, String cpId, - SharableVarState.SharableVarConfiguration resolveResult) { + SharableVarState.SharableVarConfiguration resolveResult) { this.varId = varId; this.sharableVarState = new SharableVarState(cpId, resolveResult); if (this.sharableVarState.isReaderFlag()) { this.readerVar = createDeltaCrdt(cpId, varId); this.readerVarDeltaQueue = new LinkedList<>(); - if (this.sharableVarState.getReaderChildren() != null && this.sharableVarState.getReaderChildren().size() > 0 - && this.sharableVarState.getReaderInterval() != null && this.sharableVarState.getReaderInterval() > 0) { + if (this.sharableVarState.getReaderChildren() != null + && this.sharableVarState.getReaderChildren().size() > 0 + && this.sharableVarState.getReaderInterval() != null + && this.sharableVarState.getReaderInterval() > 0) { this.readerTimer = new HashedWheelTimer(r -> { Thread t = Executors.defaultThreadFactory().newThread(r); t.setDaemon(true); @@ -48,15 +50,16 @@ public abstract class SharableVar { }, 5, TimeUnit.MILLISECONDS, 2); this.readerTimer.newTimeout(timeout -> { readerSyncTimerTask = new ReaderSyncTimerTask(); - readerTimer.newTimeout(readerSyncTimerTask, sharableVarState.getReaderInterval(), - TimeUnit.SECONDS); + readerTimer.newTimeout(readerSyncTimerTask, + sharableVarState.getReaderInterval(), TimeUnit.SECONDS); }, this.sharableVarState.getReaderInterval(), TimeUnit.SECONDS); } } if (this.sharableVarState.isWriterFlag()) { this.writerVar = createDeltaCrdt(cpId, varId); this.writerVarDeltaQueue = new LinkedList<>(); - if (this.sharableVarState.getWriteInterval() != null && this.sharableVarState.getWriteInterval() > 0) { + if (this.sharableVarState.getWriteInterval() != null + && this.sharableVarState.getWriteInterval() > 0) { this.writerTimer = new HashedWheelTimer(r -> { Thread t = Executors.defaultThreadFactory().newThread(r); t.setDaemon(true); @@ -83,7 +86,8 @@ public abstract class SharableVar { } } String content = JsonUtil.toJson(joinedDelta); - SharableVarManager.instance.broadcastSyncMessage(varId, sharableVarState.getReaderChildren(), content, "r2r"); + SharableVarManager.instance.broadcastSyncMessage(varId, + sharableVarState.getReaderChildren(), content, "r2r"); } private void syncWriterVar() { @@ -100,10 +104,12 @@ public abstract class SharableVar { String content = JsonUtil.toJson(joinedDelta); if (sharableVarState.getWriterParent() != null) { // 父节点是Writer - SharableVarManager.instance.broadcastSyncMessage(varId, Collections.singletonList(sharableVarState.getWriterParent()), content, "w2w"); + SharableVarManager.instance.broadcastSyncMessage(varId, + Collections.singletonList(sharableVarState.getWriterParent()), content, "w2w"); } else if (sharableVarState.getReaderRoots() != null) { // 自己是writer根结点 向reader根结点们广播 - SharableVarManager.instance.broadcastSyncMessage(varId, sharableVarState.getReaderRoots(), content, "w2r"); + SharableVarManager.instance.broadcastSyncMessage(varId, + sharableVarState.getReaderRoots(), content, "w2r"); } }