diff --git a/src/main/java/org/bdware/sc/crdt/SharableVarManager.java b/src/main/java/org/bdware/sc/crdt/SharableVarManager.java index 9890388..763db2d 100644 --- a/src/main/java/org/bdware/sc/crdt/SharableVarManager.java +++ b/src/main/java/org/bdware/sc/crdt/SharableVarManager.java @@ -93,14 +93,20 @@ public class SharableVarManager { public void broadcastSyncMessage(String varId, List sendTo, String content, String type) { for (String target : sendTo) { DoipMessage doipMessage = createSyncMessage(target, varId, content, type); - AuditDoipClient client = getClient(target); - client.sendMessage(doipMessage, new DoipMessageCallback() { - @Override - public void onResult(DoipMessage doipMessage) { - LOGGER.info("RECV Sync:" - + JsonUtil.toJson(JsonDoipMessage.fromDoipMessage(doipMessage))); - } - }); + if (target.equals(cpId)) { + LOGGER.info("Handle Sync locally:" + + JsonUtil.toJson(JsonDoipMessage.fromDoipMessage(doipMessage))); + handleSyncMessage(doipMessage); + } else { + AuditDoipClient client = getClient(target); + client.sendMessage(doipMessage, new DoipMessageCallback() { + @Override + public void onResult(DoipMessage doipMessage) { + LOGGER.info("RECV Sync:" + + JsonUtil.toJson(JsonDoipMessage.fromDoipMessage(doipMessage))); + } + }); + } } } diff --git a/src/main/java/org/bdware/sc/crdt/planning/PlanningTest.java b/src/main/java/org/bdware/sc/crdt/planning/PlanningTest.java new file mode 100644 index 0000000..b5c8b19 --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/planning/PlanningTest.java @@ -0,0 +1,33 @@ +package org.bdware.sc.crdt.planning; + +public class PlanningTest { + public static void main(String[] args) { + + int[] writers = new int[100]; + int[] readers = new int[100]; + long maxDelay = 60; + int bandwidthUpload = 10 * 1024 * 1024; + int bandwidthDownload = 10 * 1024 * 1024; + int datasize = 100 * 1024; + double domainSize = 100 * datasize; + + PlanningWith0Expansivity planning0 = new PlanningWith0Expansivity(writers, readers, maxDelay, bandwidthDownload, bandwidthUpload, datasize); + PlanningWithkExpansivity planningK = new PlanningWithkExpansivity(writers, readers, maxDelay, bandwidthDownload, bandwidthUpload, datasize, domainSize); + PlanningWith1Expansivity planning1 = new PlanningWith1Expansivity(writers, readers, maxDelay, bandwidthDownload, bandwidthUpload, datasize); + + long start = System.currentTimeMillis(); + planning0.adjustAndCalc(); + long end = System.currentTimeMillis(); + System.out.println("took " + (end - start)); + + start = System.currentTimeMillis(); + planningK.adjustAndCalc(); + end = System.currentTimeMillis(); + System.out.println("took " + (end - start)); + + start = System.currentTimeMillis(); + planning1.adjustAndCalc(); + end = System.currentTimeMillis(); + System.out.println("took " + (end - start)); + } +} diff --git a/src/main/java/org/bdware/sc/crdt/planning/PlanningWith0Expansivity.java b/src/main/java/org/bdware/sc/crdt/planning/PlanningWith0Expansivity.java new file mode 100644 index 0000000..60e6399 --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/planning/PlanningWith0Expansivity.java @@ -0,0 +1,108 @@ +package org.bdware.sc.crdt.planning; + +public class PlanningWith0Expansivity extends SharableNetworkPlanning { + public PlanningWith0Expansivity(int[] writers, int[] readers, long maxDelay, int bandwidthDownload, int bandwidthUpload, int dataSize) { + this.writers = writers; + this.readers = readers; + this.maxDelay = maxDelay; + this.bandwidthDownload = bandwidthDownload; + this.bandwidthUpload = bandwidthUpload; + this.dataSize = dataSize; + this.totalCountW = writers.length; + this.totalCountR = readers.length; + } + + public boolean writerTreeConstraint() { + double common = frequencySyncW * dataSize; + // 非叶子节点下载带宽 + boolean result1 = bandwidthDownload >= common * treeDegreeW; + // 非根节点上行带宽 + boolean result2 = bandwidthUpload >= common; + return result1 && result2; + } + + public boolean writer2ReaderConstraint() { + double common = frequencySyncWR * dataSize; + // Writer根节点上行带宽 + boolean result1 = bandwidthUpload >= common * rootCountR; + // Reader根节点下载带宽 + boolean result2 = bandwidthDownload >= common * rootCountW; + return result1 && result2; + } + + public boolean readerTreeConstraint() { + double common = frequencySyncR * dataSize; + // Reader非叶子节点上行带宽 + boolean result1 = bandwidthUpload >= common * rootCountR; + // Reader非根节点下载带宽 + boolean result2 = bandwidthDownload >= common; + return result1 && result2; + } + + public void calcOptimizedResult() { + double a = totalCountW - rootCountW; + double b = rootCountR * rootCountW; + double c = totalCountR - rootCountR; + + + double A = Math.sqrt(a * (treeHeightW - 1)); + double B = Math.sqrt(b); + double C = Math.sqrt(c * (treeHeightR - 1)); + wDelay = (long) (maxDelay * (A / (A + B + C))); + w2rDelay = (long) (maxDelay * (B / (A + B + C))); + rDelay = (long) (maxDelay * (C / (A + B + C))); + + frequencySyncW = (treeHeightW - 1) / wDelay; + frequencySyncR = (treeHeightR - 1) / rDelay; + frequencySyncWR = 1.0 / w2rDelay; + + totalData = (long) (maxDelay * dataSize * (frequencySyncW * a + frequencySyncWR * b + frequencySyncR * c)); + } + + public static void main(String[] args) { + long start = System.currentTimeMillis(); + int[] writers = new int[1000]; + int[] readers = new int[1000]; + long maxDelay = 60; + int bandwidthUpload = 10 * 1024 * 1024; + int bandwidthDownload = 10 * 1024 * 1024; + int datasize = 10 * 1024; + + PlanningWith0Expansivity planning = new PlanningWith0Expansivity(writers, readers, maxDelay, bandwidthDownload, bandwidthUpload, datasize); + + long minTotalData = Long.MAX_VALUE; + String result = ""; + for (int rootCountW = 1; rootCountW <= writers.length; ++rootCountW) { + for (int treeDegreeW = 2; treeDegreeW <= writers.length / rootCountW - 1; ++treeDegreeW) { + planning.adjustWriterTree(rootCountW, treeDegreeW); + for (int rootCountR = 1; rootCountR <= readers.length; ++rootCountR) { + for (int treeDegreeR = 2; treeDegreeR <= readers.length / rootCountR - 1; ++treeDegreeR) { + planning.adjustReaderTree(rootCountR, treeDegreeR); + planning.calcOptimizedResult(); + if (!planning.readerTreeConstraint()) { + //System.out.println("reader"); + continue; + } + if (!planning.writerTreeConstraint()) { + //System.out.println("writer"); + continue; + } + if (!planning.writer2ReaderConstraint()) { + //System.out.println("writer2Reader"); + continue; + } + if (minTotalData > planning.totalData) { + minTotalData = planning.totalData; + result = planning.toString(); + } + } + } + } + } + + System.out.println(minTotalData); + System.out.println(result); + long end = System.currentTimeMillis(); + System.out.println("took " + (end - start)); + } +} diff --git a/src/main/java/org/bdware/sc/crdt/planning/PlanningWith1Expansivity.java b/src/main/java/org/bdware/sc/crdt/planning/PlanningWith1Expansivity.java new file mode 100644 index 0000000..1cac931 --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/planning/PlanningWith1Expansivity.java @@ -0,0 +1,108 @@ +package org.bdware.sc.crdt.planning; + +public class PlanningWith1Expansivity extends SharableNetworkPlanning { + public PlanningWith1Expansivity(int[] writers, int[] readers, long maxDelay, int bandwidthDownload, int bandwidthUpload, int dataSize) { + this.writers = writers; + this.readers = readers; + this.maxDelay = maxDelay; + this.bandwidthDownload = bandwidthDownload; + this.bandwidthUpload = bandwidthUpload; + this.dataSize = dataSize; + this.totalCountW = writers.length; + this.totalCountR = readers.length; + } + + public boolean writerTreeConstraint() { + double common = frequencySyncW * (treeNodeCountW - 1) * dataSize; + // 非叶子节点下载带宽 + boolean result1 = bandwidthDownload >= common; + // 非根节点上行带宽 + boolean result2 = bandwidthUpload >= common / treeDegreeW; + return result1 && result2; + } + + public boolean writer2ReaderConstraint() { + double common = frequencySyncWR * totalCountW * dataSize; + // Writer根节点上行带宽 + boolean result1 = bandwidthUpload >= common * rootCountR / rootCountW; + // Reader根节点下载带宽 + boolean result2 = bandwidthDownload >= common; + return result1 && result2; + } + + public boolean readerTreeConstraint() { + double common = frequencySyncR * totalCountW * dataSize; + // Reader非叶子节点上行带宽 + boolean result1 = bandwidthUpload >= common * treeDegreeR; + // Reader非根节点下载带宽 + boolean result2 = bandwidthDownload >= common; + return result1 && result2; + } + + public void calcOptimizedResult() { + double a = treeHeightW * Math.pow(treeDegreeW, treeHeightW) / (treeDegreeW - 1) + (treeDegreeW - Math.pow(treeDegreeW, treeHeightW + 1)) / Math.pow(treeDegreeW - 1, 2); + double b = rootCountR * totalCountW; + double c = (totalCountR - rootCountR) * totalCountW; + + double A = Math.sqrt(a * (treeHeightW - 1)); + double B = Math.sqrt(b); + double C = Math.sqrt(c * (treeHeightR - 1)); + wDelay = (long) (maxDelay * (A / (A + B + C))); + w2rDelay = (long) (maxDelay * (B / (A + B + C))); + rDelay = (long) (maxDelay * (C / (A + B + C))); + + + frequencySyncW = (treeHeightW - 1) / wDelay; + frequencySyncR = (treeHeightR - 1) / rDelay; + frequencySyncWR = 1.0 / w2rDelay; + + totalData = (long) (maxDelay * dataSize * (frequencySyncW * a + frequencySyncWR * b + frequencySyncR * c)); + } + + public static void main(String[] args) { + long start = System.currentTimeMillis(); + int[] writers = new int[1000]; + int[] readers = new int[1000]; + long maxDelay = 60; + int bandwidthUpload = 10 * 1024 * 1024; + int bandwidthDownload = 10 * 1024 * 1024; + int datasize = 10 * 1024; + + PlanningWith1Expansivity planning = new PlanningWith1Expansivity(writers, readers, maxDelay, bandwidthDownload, bandwidthUpload, datasize); + + long minTotalData = Long.MAX_VALUE; + String result = ""; + for (int rootCountW = 1; rootCountW <= writers.length; ++rootCountW) { + for (int treeDegreeW = 2; treeDegreeW <= writers.length / rootCountW - 1; ++treeDegreeW) { + planning.adjustWriterTree(rootCountW, treeDegreeW); + for (int rootCountR = 1; rootCountR <= readers.length; ++rootCountR) { + for (int treeDegreeR = 2; treeDegreeR <= readers.length / rootCountR - 1; ++treeDegreeR) { + planning.adjustReaderTree(rootCountR, treeDegreeR); + planning.calcOptimizedResult(); + if (!planning.readerTreeConstraint()) { + //System.out.println("reader"); + continue; + } + if (!planning.writerTreeConstraint()) { + //System.out.println("writer"); + continue; + } + if (!planning.writer2ReaderConstraint()) { + //System.out.println("writer2Reader"); + continue; + } + if (minTotalData > planning.totalData) { + minTotalData = planning.totalData; + result = planning.toString(); + } + } + } + } + } + + System.out.println(minTotalData); + System.out.println(result); + long end = System.currentTimeMillis(); + System.out.println("took " + (end - start)); + } +} diff --git a/src/main/java/org/bdware/sc/crdt/planning/PlanningWithkExpansivity.java b/src/main/java/org/bdware/sc/crdt/planning/PlanningWithkExpansivity.java new file mode 100644 index 0000000..9a51701 --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/planning/PlanningWithkExpansivity.java @@ -0,0 +1,102 @@ +package org.bdware.sc.crdt.planning; + +public class PlanningWithkExpansivity extends SharableNetworkPlanning { + private double k; + + private double domainSize; + + public PlanningWithkExpansivity(int[] writers, int[] readers, long maxDelay, int bandwidthDownload, int bandwidthUpload, int dataSize, double domainSize) { + this.writers = writers; + this.readers = readers; + this.maxDelay = maxDelay; + this.bandwidthDownload = bandwidthDownload; + this.bandwidthUpload = bandwidthUpload; + this.dataSize = dataSize; + this.totalCountW = writers.length; + this.totalCountR = readers.length; + this.domainSize = domainSize; + this.k = (domainSize - dataSize) / domainSize; + } + + private double kWithPow(double v) { + return Math.pow(k, (v - 1) / (treeDegreeW - 1)); + } + + public boolean writerTreeConstraint() { + double common = frequencySyncW * domainSize * (1 - kWithPow(Math.pow(treeDegreeW, treeHeightW - 2))); + // 非叶子节点下载带宽 + boolean result1 = bandwidthDownload >= common * treeDegreeW; + // 非根节点上行带宽 + boolean result2 = bandwidthUpload >= common; + return result1 && result2; + } + + public boolean writer2ReaderConstraint() { + double common = frequencySyncWR * domainSize * (1 - Math.pow(k, treeNodeCountW)); + // Writer根节点上行带宽 + boolean result1 = bandwidthUpload >= common * rootCountR; + // Reader根节点下载带宽 + boolean result2 = bandwidthDownload >= common * rootCountW; + return result1 && result2; + } + + public boolean readerTreeConstraint() { + double common = frequencySyncR * domainSize * (1 - Math.pow(k, totalCountW)); + // Reader非叶子节点上行带宽 + boolean result1 = bandwidthUpload >= common * treeDegreeR; + // Reader非根节点下载带宽 + boolean result2 = bandwidthDownload >= common; + return result1 && result2; + } + + double[] accumulations = new double[1000]; + + double calcAccumulation(int H1) { + if (accumulations[H1] > 0) { + return accumulations[H1]; + } + double result = 0; + for (int h = 1; h < H1; ++h) { + result += (Math.pow(treeDegreeW, h) * (1 - kWithPow(Math.pow(treeDegreeW, H1 - h - 1)))); + } + return result; + } + + public void calcOptimizedResult() { + double a = calcAccumulation((int) treeHeightW); + double b = rootCountR * rootCountW * (1 - Math.pow(k, treeNodeCountW)); + double c = (totalCountR - rootCountR) * (1 - Math.pow(k, totalCountW)); + + double A = Math.sqrt(a * (treeHeightW - 1)); + double B = Math.sqrt(b); + double C = Math.sqrt(c * (treeHeightR - 1)); + wDelay = (long) (maxDelay * (A / (A + B + C))); + w2rDelay = (long) (maxDelay * (B / (A + B + C))); + rDelay = (long) (maxDelay * (C / (A + B + C))); + + + frequencySyncW = (treeHeightW - 1) / wDelay; + frequencySyncR = (treeHeightR - 1) / rDelay; + frequencySyncWR = 1.0 / w2rDelay; + + totalData = (long) (maxDelay * domainSize * (frequencySyncW * a + frequencySyncWR * b + frequencySyncR * c)); + } + + public static void main(String[] args) { + long start = System.currentTimeMillis(); + int[] writers = new int[1000]; + int[] readers = new int[1000]; + long maxDelay = 60; + int bandwidthUpload = 10 * 1024 * 1024; + int bandwidthDownload = 10 * 1024 * 1024; + int datasize = 10 * 1024; + double domainSize = 1000 * 1024; + + PlanningWithkExpansivity planning = new PlanningWithkExpansivity(writers, readers, maxDelay, bandwidthDownload, bandwidthUpload, datasize, domainSize); + + planning.adjustAndCalc(); + + long end = System.currentTimeMillis(); + System.out.println("took " + (end - start)); + } +} diff --git a/src/main/java/org/bdware/sc/crdt/planning/SharableNetworkPlanning.java b/src/main/java/org/bdware/sc/crdt/planning/SharableNetworkPlanning.java new file mode 100644 index 0000000..79f9941 --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/planning/SharableNetworkPlanning.java @@ -0,0 +1,113 @@ +package org.bdware.sc.crdt.planning; + +public class SharableNetworkPlanning { + protected int[] writers; + protected int[] readers; + protected long maxDelay; + + protected int bandwidthDownload; + protected int bandwidthUpload; + protected int dataSize; + + + protected long wDelay; + protected long rDelay; + protected long w2rDelay; + + + protected double totalCountW; + protected double rootCountW; + protected double treeNodeCountW; + protected double treeDegreeW; + protected double treeHeightW; + protected double frequencySyncW; + + + protected double totalCountR; + protected double rootCountR; + protected double treeNodeCountR; + protected double treeDegreeR; + protected double treeHeightR; + protected double frequencySyncR; + + protected double frequencySyncWR; + + protected long totalData; + + public void adjustWriterTree(int rootCountW, int treeDegreeW) { + this.rootCountW = rootCountW; + this.treeDegreeW = treeDegreeW; + this.treeNodeCountW = Math.ceil(totalCountW / rootCountW); + this.treeHeightW = Math.ceil(logNM(treeDegreeW, treeNodeCountW * (treeDegreeW - 1) + 1)); + } + + public void adjustReaderTree(int rootCountR, int treeDegreeR) { + this.rootCountR = rootCountR; + this.treeDegreeR = treeDegreeR; + this.treeNodeCountR = Math.ceil(totalCountR / rootCountR); + this.treeHeightR = Math.ceil(logNM(treeDegreeR, treeNodeCountR * (treeDegreeR - 1) + 1)); + } + + protected double logNM(double n, double m) { + return Math.log(m) / Math.log(n); + } + + protected void adjustAndCalc() { + long minTotalData = Long.MAX_VALUE; + String result = ""; + for (int rootCountW = 1; rootCountW <= writers.length; ++rootCountW) { + for (int treeDegreeW = 2; treeDegreeW <= writers.length / rootCountW - 1; ++treeDegreeW) { + adjustWriterTree(rootCountW, treeDegreeW); + for (int rootCountR = 1; rootCountR <= readers.length; ++rootCountR) { + for (int treeDegreeR = 2; treeDegreeR <= readers.length / rootCountR - 1; ++treeDegreeR) { + adjustReaderTree(rootCountR, treeDegreeR); + calcOptimizedResult(); + if (!readerTreeConstraint()) { + //System.out.println("reader"); + continue; + } + if (!writerTreeConstraint()) { + //System.out.println("writer"); + continue; + } + if (!writer2ReaderConstraint()) { + //System.out.println("writer2Reader"); + continue; + } + if (minTotalData > totalData) { + minTotalData = totalData; + result = toString(); + } + } + } + } + } + + System.out.println(minTotalData); + System.out.println(result); + } + + protected void calcOptimizedResult() { + } + + protected boolean writer2ReaderConstraint() { + return true; + } + + protected boolean writerTreeConstraint() { + return true; + } + + protected boolean readerTreeConstraint() { + return true; + } + + @Override + public String toString() { + StringBuilder result = new StringBuilder(); + result.append("[").append(wDelay).append(",").append(w2rDelay).append(",").append(rDelay).append("]\n") + .append("reader tree: degree ").append(treeDegreeR).append(", count ").append(rootCountR).append(",\n") + .append("writer tree: degree ").append(treeDegreeW).append(", count ").append(rootCountW); + return result.toString(); + } +}