add sharedvar

This commit is contained in:
CaiHQ 2023-06-09 17:45:02 +08:00
parent d76f816e92
commit 50991a89d0
6 changed files with 251 additions and 22 deletions

View File

@ -38,6 +38,7 @@ sourceSets {
dependencies {
api project(":common")
api project(":mockjava")
implementation 'org.bdware:delta-crdts:1.1.0'
implementation 'org.apache.commons:commons-lang3:3.0'
implementation 'com.atlassian.commonmark:commonmark:0.17.0'
implementation 'com.idealista:format-preserving-encryption:1.0.0'
@ -67,8 +68,8 @@ jar {
// uncomment this when publish,
// while develop at local use "false"
configurations.runtimeClasspath.filter {
it.getAbsolutePath().contains("/lib/")
//false
// it.getAbsolutePath().contains("/lib/")
false
}.collect {
it.isDirectory() ? it : zipTree(it)
}

View File

@ -742,6 +742,16 @@ public class ContractProcess {
LOGGER.error("DoipLocalSingleton cannot starts properly, plz check the onServerStart function");
e.printStackTrace();
}
funNode = cn.getFunction("onInitSharableVars");
if (funNode != null) {
ContractRequest requestForInitVar = new ContractRequest();
requestForInitVar.setAction("onInitSharableVars");
requestForInitVar.setArg(onStartingDoipServer.getArg());
requestForInitVar.setRequester(onStartingDoipServer.getRequester());
JsonElement onInitSharableVars = invoke(requestForInitVar, funNode).result;
returnValue.add("onInitSharableVars", onInitSharableVars);
}
}
private void handleLog() {

View File

@ -0,0 +1,105 @@
package org.bdware.sc.crdt;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.bdware.crdt.counter.GCounter;
import org.bdware.sc.util.JsonUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
public class SharableVar {
public final static HashedWheelTimer HASHED_WHEEL_TIMER = new HashedWheelTimer(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
return t;
}
}, 5, TimeUnit.MILLISECONDS, 2);
private final String varId;
private long interval;
private final String myId;
GCounter counter;
int offset;
List<String> sendTo;
private SyncTimeout nextTimeOut;
public SharableVar(String cpId, String identifier, SharableVarManager.VarResolveResult resolveResult) {
counter = new GCounter(cpId, identifier);
myId = cpId;
varId = identifier;
HASHED_WHEEL_TIMER.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
initByVarResolve(resolveResult);
}
}, 0, TimeUnit.MILLISECONDS);
}
public void join(String content) {
GCounter toJoin = JsonUtil.fromJson(content, GCounter.class);
counter.join(toJoin);
}
class SyncTimeout implements TimerTask {
@Override
public void run(Timeout timeout) throws Exception {
try {
syncVar();
} catch (Exception e) {
e.printStackTrace();
} finally {
HASHED_WHEEL_TIMER.newTimeout(nextTimeOut, interval, TimeUnit.MILLISECONDS);
}
}
}
private void syncVar() {
String content = JsonUtil.toJson(counter);
SharableVarManager.instance.broadcastSyncMessage(varId, sendTo, content);
}
private void initByVarResolve(SharableVarManager.VarResolveResult resolveResult) {
sendTo = new ArrayList<>();
//假设没有同一个人既是reader又是writer
offset = -1;
for (int i = 0; i < resolveResult.writer.length; i++) {
if (myId.equals(resolveResult.writer[i])) {
offset = i;
}
}
if (offset == -1)
for (int i = 0; i < resolveResult.reader.length; i++) {
if (myId.equals(resolveResult.reader[i])) {
offset = resolveResult.writer.length + i;
}
}
for (int i = 0; i < resolveResult.sendTo[offset].length; i++) {
int pos = resolveResult.sendTo[offset][i];
sendTo.add(findByOffset(pos, resolveResult));
}
interval = resolveResult.interval[offset];
nextTimeOut = new SyncTimeout();
HASHED_WHEEL_TIMER.newTimeout(nextTimeOut, interval, TimeUnit.MILLISECONDS);
}
private String findByOffset(int pos, SharableVarManager.VarResolveResult resolveResult) {
if (pos < resolveResult.writer.length)
return resolveResult.writer[pos];
return resolveResult.reader[pos - resolveResult.writer.length];
}
public GCounter get() {
return counter;
}
}

View File

@ -0,0 +1,121 @@
package org.bdware.sc.crdt;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.doip.audit.AuditDoaClient;
import org.bdware.doip.audit.EndpointConfig;
import org.bdware.doip.audit.client.AuditDoipClient;
import org.bdware.doip.audit.client.AuditIrpClient;
import org.bdware.doip.codec.JsonDoipMessage;
import org.bdware.doip.codec.doipMessage.DoipMessage;
import org.bdware.doip.codec.doipMessage.DoipMessageFactory;
import org.bdware.doip.codec.doipMessage.DoipResponseCode;
import org.bdware.doip.endpoint.client.DoipMessageCallback;
import org.bdware.irp.client.IrpClient;
import org.bdware.irp.stateinfo.StateInfoBase;
import org.bdware.sc.util.JsonUtil;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class SharableVarManager {
static Logger LOGGER = LogManager.getLogger(SharableVarManager.class);
public static SharableVarManager instance;
private final String cpId;
Map<String, SharableVar> allVars;
IrpClient client;
AuditDoaClient doaClient;
public static final String SHARABLEOP = "86.100871/SyncVar";
public SharableVarManager(String id, EndpointConfig config) {
allVars = new ConcurrentHashMap<>();
client = new AuditIrpClient(config);
doaClient = new AuditDoaClient("", config, null);
cpId = id;
}
public static void initSharableVarManager(String id, EndpointConfig config) {
instance = new SharableVarManager(id, config);
}
public DoipMessage handleSyncMessage(DoipMessage message) {
try {
String varId = message.header.parameters.attributes.get("varId").getAsString();
String content = message.header.parameters.attributes.get("content").getAsString();
SharableVar var = getVar(varId);
var.join(content);
DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder();
builder.createResponse(DoipResponseCode.Success, message);
builder.addAttributes("msg", "success");
return builder.create();
} catch (Exception e) {
ByteArrayOutputStream bo = new ByteArrayOutputStream();
e.printStackTrace();
e.printStackTrace(new PrintStream(bo));
DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder();
builder.createResponse(DoipResponseCode.UnKnownError, message);
builder.addAttributes("exception", bo.toString());
return builder.create();
}
}
private SharableVar getVar(String varId) {
return allVars.get(varId);
}
private DoipMessage createSyncMessage(String target, String varId, String content) {
DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder();
builder.createRequest(target, SHARABLEOP);
builder.addAttributes("varId", varId);
builder.addAttributes("content", content);
return builder.create();
}
public void broadcastSyncMessage(String varId, List<String> sendTo, String content) {
for (String target : sendTo) {
DoipMessage doipMessage = createSyncMessage(target, varId, content);
AuditDoipClient client = getClient(target);
client.sendMessage(doipMessage, new DoipMessageCallback() {
@Override
public void onResult(DoipMessage doipMessage) {
LOGGER.info("RECV Sync:" + JsonUtil.toJson(JsonDoipMessage.fromDoipMessage(doipMessage)));
}
});
}
}
private AuditDoipClient getClient(String id) {
return doaClient.convertDoidToRepo(id);
}
static class VarResolveResult {
String[] writer;
String[] reader;
int[][] sendTo;
long[] interval;
long maxDelay;
}
public synchronized SharableVar createVar(String identifier, String type) {
try {
if (allVars.containsKey(identifier)) return allVars.get(identifier);
StateInfoBase stateInfoBase = client.resolve(identifier);
if (stateInfoBase.handleValues.has("bdwType") && stateInfoBase.handleValues.get("bdwType").getAsString().equals("SharableVar")) {
VarResolveResult resolveResult = JsonUtil.fromJson(stateInfoBase.handleValues, VarResolveResult.class);
SharableVar sharableVar = new SharableVar(cpId, identifier, resolveResult);
allVars.put(identifier, sharableVar);
return sharableVar;
} else return null;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}

View File

@ -14,11 +14,11 @@ import org.bdware.doip.endpoint.server.NettyServerHandler;
import org.bdware.sc.ContractProcess;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.boundry.JavaScriptEntry;
import org.bdware.sc.crdt.SharableVarManager;
import org.bdware.sc.entity.DoipMessagePacker;
import org.bdware.sc.node.FunctionNode;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.Map;
@ -42,21 +42,8 @@ public class DOOPRequestHandler implements DoipRequestHandler {
@Override
public DoipMessage onRequest(ChannelHandlerContext ctx, DoipMessage msg) {
String str = msg.header.parameters.operation;
if (msg.header != null && msg.header.parameters.attributes != null && msg.header.parameters.attributes.has("readGlobalVar")) {
try {
//TODO @wangxuxing
ByteArrayOutputStream bo = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(bo);
String var = msg.header.parameters.attributes.get("readGlobalVar").getAsString();
Object result = null;
out.writeObject(result);
DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder();
builder.createResponse(DoipResponseCode.Success, msg);
builder.setBody(bo.toByteArray());
return builder.create();
} catch (Exception e) {
}
//return ....
if (msg.header != null && msg.header.parameters.operation.equals(SharableVarManager.SHARABLEOP)) {
return SharableVarManager.instance.handleSyncMessage(msg);
}
logger.debug("[Call operation] name: " + str);
if (str != null) {

View File

@ -12,6 +12,7 @@ import org.bdware.doip.endpoint.server.DoipServerImpl;
import org.bdware.doip.endpoint.server.DoipServiceInfo;
import org.bdware.doip.endpoint.server.StartServerCallback;
import org.bdware.sc.ContractProcess;
import org.bdware.sc.crdt.SharableVarManager;
import org.bdware.sc.handler.DOOPRequestHandler;
import java.net.URI;
@ -42,8 +43,11 @@ public class DoipLocalSingleton {
public static int run(int port, JsonElement otherConfigs) throws InterruptedException {
int i = -1;
LOGGER.info("try to listener port:" + port);
for (i = run("tcp://127.0.0.1:" + port++, otherConfigs); i < 0; ) {
int j = 0;
for (i = run("tcp://127.0.0.1:" + port++, otherConfigs); i < 0 && j < 3; j++) {
LOGGER.info("try again to listener port:" + port);
LOGGER.error("try again to listener port:" + port);
System.out.println("try again to listener port:" + port);
i = run("tcp://127.0.0.1:" + port++, otherConfigs);
}
return i;
@ -62,9 +66,10 @@ public class DoipLocalSingleton {
String repoID = "bdtest/BDRepo/" + UUID.randomUUID().toString();
String owner = ContractProcess.instance.getContract().getOwner();
String repoType = "Repository";
EndpointConfig config = null;
try {
if (otherConfigs != null && otherConfigs.isJsonObject()) {
EndpointConfig config = new TempConfigStorage(otherConfigs.toString()).loadAsEndpointConfig();
config = new TempConfigStorage(otherConfigs.toString()).loadAsEndpointConfig();
if (config.routerURI != null) {
AuditIrpClient irpClient = new AuditIrpClient(config);
EndpointInfo endpointInfo = irpClient.getEndpointInfo();
@ -81,11 +86,11 @@ public class DoipLocalSingleton {
server = new DoipServerImpl(info);
DOOPRequestHandler handler = ContractProcess.instance.doopRequestHandler;
server.setRequestCallback(handler);
SharableVarManager.initSharableVarManager(info.id, config);
ResultChecker checker = new ResultChecker();
server.start(checker);
checker.waitForResult(1000);
if (checker.port > 0)
return port;
if (checker.port > 0) return port;
return -1;
}