diff --git a/build.gradle b/build.gradle index 11fa2e8..c6e05bc 100644 --- a/build.gradle +++ b/build.gradle @@ -38,6 +38,7 @@ sourceSets { dependencies { api project(":common") api project(":mockjava") + implementation 'org.bdware:delta-crdts:1.1.0' implementation 'org.apache.commons:commons-lang3:3.0' implementation 'com.atlassian.commonmark:commonmark:0.17.0' implementation 'com.idealista:format-preserving-encryption:1.0.0' @@ -67,8 +68,8 @@ jar { // uncomment this when publish, // while develop at local use "false" configurations.runtimeClasspath.filter { - it.getAbsolutePath().contains("/lib/") - //false + // it.getAbsolutePath().contains("/lib/") + false }.collect { it.isDirectory() ? it : zipTree(it) } diff --git a/src/main/java/org/bdware/sc/ContractProcess.java b/src/main/java/org/bdware/sc/ContractProcess.java index 5f1d41e..a845b8f 100644 --- a/src/main/java/org/bdware/sc/ContractProcess.java +++ b/src/main/java/org/bdware/sc/ContractProcess.java @@ -742,6 +742,16 @@ public class ContractProcess { LOGGER.error("DoipLocalSingleton cannot starts properly, plz check the onServerStart function"); e.printStackTrace(); } + funNode = cn.getFunction("onInitSharableVars"); + + if (funNode != null) { + ContractRequest requestForInitVar = new ContractRequest(); + requestForInitVar.setAction("onInitSharableVars"); + requestForInitVar.setArg(onStartingDoipServer.getArg()); + requestForInitVar.setRequester(onStartingDoipServer.getRequester()); + JsonElement onInitSharableVars = invoke(requestForInitVar, funNode).result; + returnValue.add("onInitSharableVars", onInitSharableVars); + } } private void handleLog() { diff --git a/src/main/java/org/bdware/sc/crdt/SharableVar.java b/src/main/java/org/bdware/sc/crdt/SharableVar.java new file mode 100644 index 0000000..a2755e4 --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/SharableVar.java @@ -0,0 +1,105 @@ +package org.bdware.sc.crdt; + +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import org.bdware.crdt.counter.GCounter; +import org.bdware.sc.util.JsonUtil; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +public class SharableVar { + public final static HashedWheelTimer HASHED_WHEEL_TIMER = new HashedWheelTimer(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + return t; + } + }, 5, TimeUnit.MILLISECONDS, 2); + private final String varId; + + private long interval; + private final String myId; + GCounter counter; + int offset; + List sendTo; + private SyncTimeout nextTimeOut; + + + public SharableVar(String cpId, String identifier, SharableVarManager.VarResolveResult resolveResult) { + counter = new GCounter(cpId, identifier); + myId = cpId; + varId = identifier; + HASHED_WHEEL_TIMER.newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + initByVarResolve(resolveResult); + + } + }, 0, TimeUnit.MILLISECONDS); + + } + + public void join(String content) { + GCounter toJoin = JsonUtil.fromJson(content, GCounter.class); + counter.join(toJoin); + } + + class SyncTimeout implements TimerTask { + @Override + public void run(Timeout timeout) throws Exception { + try { + syncVar(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + HASHED_WHEEL_TIMER.newTimeout(nextTimeOut, interval, TimeUnit.MILLISECONDS); + } + } + } + + private void syncVar() { + String content = JsonUtil.toJson(counter); + SharableVarManager.instance.broadcastSyncMessage(varId, sendTo, content); + } + + private void initByVarResolve(SharableVarManager.VarResolveResult resolveResult) { + sendTo = new ArrayList<>(); + //假设没有同一个人既是reader又是writer。 + offset = -1; + for (int i = 0; i < resolveResult.writer.length; i++) { + if (myId.equals(resolveResult.writer[i])) { + offset = i; + } + } + if (offset == -1) + for (int i = 0; i < resolveResult.reader.length; i++) { + if (myId.equals(resolveResult.reader[i])) { + offset = resolveResult.writer.length + i; + } + } + for (int i = 0; i < resolveResult.sendTo[offset].length; i++) { + int pos = resolveResult.sendTo[offset][i]; + sendTo.add(findByOffset(pos, resolveResult)); + } + interval = resolveResult.interval[offset]; + nextTimeOut = new SyncTimeout(); + HASHED_WHEEL_TIMER.newTimeout(nextTimeOut, interval, TimeUnit.MILLISECONDS); + } + + private String findByOffset(int pos, SharableVarManager.VarResolveResult resolveResult) { + if (pos < resolveResult.writer.length) + return resolveResult.writer[pos]; + return resolveResult.reader[pos - resolveResult.writer.length]; + } + + public GCounter get() { + return counter; + } + +} diff --git a/src/main/java/org/bdware/sc/crdt/SharableVarManager.java b/src/main/java/org/bdware/sc/crdt/SharableVarManager.java new file mode 100644 index 0000000..6cd1c34 --- /dev/null +++ b/src/main/java/org/bdware/sc/crdt/SharableVarManager.java @@ -0,0 +1,121 @@ +package org.bdware.sc.crdt; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.bdware.doip.audit.AuditDoaClient; +import org.bdware.doip.audit.EndpointConfig; +import org.bdware.doip.audit.client.AuditDoipClient; +import org.bdware.doip.audit.client.AuditIrpClient; +import org.bdware.doip.codec.JsonDoipMessage; +import org.bdware.doip.codec.doipMessage.DoipMessage; +import org.bdware.doip.codec.doipMessage.DoipMessageFactory; +import org.bdware.doip.codec.doipMessage.DoipResponseCode; +import org.bdware.doip.endpoint.client.DoipMessageCallback; +import org.bdware.irp.client.IrpClient; +import org.bdware.irp.stateinfo.StateInfoBase; +import org.bdware.sc.util.JsonUtil; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class SharableVarManager { + static Logger LOGGER = LogManager.getLogger(SharableVarManager.class); + public static SharableVarManager instance; + private final String cpId; + Map allVars; + IrpClient client; + AuditDoaClient doaClient; + + + public static final String SHARABLEOP = "86.100871/SyncVar"; + + public SharableVarManager(String id, EndpointConfig config) { + allVars = new ConcurrentHashMap<>(); + client = new AuditIrpClient(config); + doaClient = new AuditDoaClient("", config, null); + cpId = id; + } + + public static void initSharableVarManager(String id, EndpointConfig config) { + instance = new SharableVarManager(id, config); + } + + public DoipMessage handleSyncMessage(DoipMessage message) { + try { + String varId = message.header.parameters.attributes.get("varId").getAsString(); + String content = message.header.parameters.attributes.get("content").getAsString(); + SharableVar var = getVar(varId); + var.join(content); + DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); + builder.createResponse(DoipResponseCode.Success, message); + builder.addAttributes("msg", "success"); + return builder.create(); + } catch (Exception e) { + ByteArrayOutputStream bo = new ByteArrayOutputStream(); + e.printStackTrace(); + e.printStackTrace(new PrintStream(bo)); + DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); + builder.createResponse(DoipResponseCode.UnKnownError, message); + builder.addAttributes("exception", bo.toString()); + return builder.create(); + } + } + + private SharableVar getVar(String varId) { + return allVars.get(varId); + } + + private DoipMessage createSyncMessage(String target, String varId, String content) { + DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); + builder.createRequest(target, SHARABLEOP); + builder.addAttributes("varId", varId); + builder.addAttributes("content", content); + return builder.create(); + } + + public void broadcastSyncMessage(String varId, List sendTo, String content) { + for (String target : sendTo) { + DoipMessage doipMessage = createSyncMessage(target, varId, content); + AuditDoipClient client = getClient(target); + client.sendMessage(doipMessage, new DoipMessageCallback() { + @Override + public void onResult(DoipMessage doipMessage) { + LOGGER.info("RECV Sync:" + JsonUtil.toJson(JsonDoipMessage.fromDoipMessage(doipMessage))); + } + }); + } + } + + private AuditDoipClient getClient(String id) { + return doaClient.convertDoidToRepo(id); + } + + + static class VarResolveResult { + String[] writer; + String[] reader; + int[][] sendTo; + long[] interval; + long maxDelay; + } + + public synchronized SharableVar createVar(String identifier, String type) { + try { + if (allVars.containsKey(identifier)) return allVars.get(identifier); + StateInfoBase stateInfoBase = client.resolve(identifier); + if (stateInfoBase.handleValues.has("bdwType") && stateInfoBase.handleValues.get("bdwType").getAsString().equals("SharableVar")) { + VarResolveResult resolveResult = JsonUtil.fromJson(stateInfoBase.handleValues, VarResolveResult.class); + SharableVar sharableVar = new SharableVar(cpId, identifier, resolveResult); + allVars.put(identifier, sharableVar); + return sharableVar; + } else return null; + } catch (Exception e) { + e.printStackTrace(); + return null; + } + } + +} diff --git a/src/main/java/org/bdware/sc/handler/DOOPRequestHandler.java b/src/main/java/org/bdware/sc/handler/DOOPRequestHandler.java index 73d27fd..49f730e 100644 --- a/src/main/java/org/bdware/sc/handler/DOOPRequestHandler.java +++ b/src/main/java/org/bdware/sc/handler/DOOPRequestHandler.java @@ -14,11 +14,11 @@ import org.bdware.doip.endpoint.server.NettyServerHandler; import org.bdware.sc.ContractProcess; import org.bdware.sc.bean.ContractRequest; import org.bdware.sc.boundry.JavaScriptEntry; +import org.bdware.sc.crdt.SharableVarManager; import org.bdware.sc.entity.DoipMessagePacker; import org.bdware.sc.node.FunctionNode; import java.io.ByteArrayOutputStream; -import java.io.ObjectOutputStream; import java.io.PrintStream; import java.util.HashMap; import java.util.Map; @@ -42,21 +42,8 @@ public class DOOPRequestHandler implements DoipRequestHandler { @Override public DoipMessage onRequest(ChannelHandlerContext ctx, DoipMessage msg) { String str = msg.header.parameters.operation; - if (msg.header != null && msg.header.parameters.attributes != null && msg.header.parameters.attributes.has("readGlobalVar")) { - try { - //TODO @wangxuxing - ByteArrayOutputStream bo = new ByteArrayOutputStream(); - ObjectOutputStream out = new ObjectOutputStream(bo); - String var = msg.header.parameters.attributes.get("readGlobalVar").getAsString(); - Object result = null; - out.writeObject(result); - DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); - builder.createResponse(DoipResponseCode.Success, msg); - builder.setBody(bo.toByteArray()); - return builder.create(); - } catch (Exception e) { - } - //return .... + if (msg.header != null && msg.header.parameters.operation.equals(SharableVarManager.SHARABLEOP)) { + return SharableVarManager.instance.handleSyncMessage(msg); } logger.debug("[Call operation] name: " + str); if (str != null) { diff --git a/src/main/java/org/bdware/sc/server/DoipLocalSingleton.java b/src/main/java/org/bdware/sc/server/DoipLocalSingleton.java index 1e7ea1a..e8a7962 100644 --- a/src/main/java/org/bdware/sc/server/DoipLocalSingleton.java +++ b/src/main/java/org/bdware/sc/server/DoipLocalSingleton.java @@ -12,6 +12,7 @@ import org.bdware.doip.endpoint.server.DoipServerImpl; import org.bdware.doip.endpoint.server.DoipServiceInfo; import org.bdware.doip.endpoint.server.StartServerCallback; import org.bdware.sc.ContractProcess; +import org.bdware.sc.crdt.SharableVarManager; import org.bdware.sc.handler.DOOPRequestHandler; import java.net.URI; @@ -42,8 +43,11 @@ public class DoipLocalSingleton { public static int run(int port, JsonElement otherConfigs) throws InterruptedException { int i = -1; LOGGER.info("try to listener port:" + port); - for (i = run("tcp://127.0.0.1:" + port++, otherConfigs); i < 0; ) { + int j = 0; + for (i = run("tcp://127.0.0.1:" + port++, otherConfigs); i < 0 && j < 3; j++) { LOGGER.info("try again to listener port:" + port); + LOGGER.error("try again to listener port:" + port); + System.out.println("try again to listener port:" + port); i = run("tcp://127.0.0.1:" + port++, otherConfigs); } return i; @@ -62,9 +66,10 @@ public class DoipLocalSingleton { String repoID = "bdtest/BDRepo/" + UUID.randomUUID().toString(); String owner = ContractProcess.instance.getContract().getOwner(); String repoType = "Repository"; + EndpointConfig config = null; try { if (otherConfigs != null && otherConfigs.isJsonObject()) { - EndpointConfig config = new TempConfigStorage(otherConfigs.toString()).loadAsEndpointConfig(); + config = new TempConfigStorage(otherConfigs.toString()).loadAsEndpointConfig(); if (config.routerURI != null) { AuditIrpClient irpClient = new AuditIrpClient(config); EndpointInfo endpointInfo = irpClient.getEndpointInfo(); @@ -81,11 +86,11 @@ public class DoipLocalSingleton { server = new DoipServerImpl(info); DOOPRequestHandler handler = ContractProcess.instance.doopRequestHandler; server.setRequestCallback(handler); + SharableVarManager.initSharableVarManager(info.id, config); ResultChecker checker = new ResultChecker(); server.start(checker); checker.waitForResult(1000); - if (checker.port > 0) - return port; + if (checker.port > 0) return port; return -1; }