merge update

This commit is contained in:
CaiHQ 2024-05-23 20:13:34 +08:00
parent 2c4a3bda9a
commit 8e07feb733
5 changed files with 123 additions and 10 deletions

View File

@ -31,5 +31,6 @@ dependencies {
api project(":common") api project(":common")
api 'io.prometheus:simpleclient:0.12.0' api 'io.prometheus:simpleclient:0.12.0'
api 'org.hyperic.sigar:sigar:1.6.4' api 'org.hyperic.sigar:sigar:1.6.4'
api 'com.github.ben-manes.caffeine:caffeine:2.8.8'
testImplementation 'junit:junit:4.13.2' testImplementation 'junit:junit:4.13.2'
} }

View File

@ -307,8 +307,8 @@ public class ContractClient {
get = new SocketGet("127.0.0.1", port); get = new SocketGet("127.0.0.1", port);
get.syncGet("", "setDBInfo", ContractManager.dbPath); get.syncGet("", "setDBInfo", ContractManager.dbPath);
if (contractMeta.contract.isDebug() || get != null) { if (contractMeta.contract.isDebug() || get != null) {
String tagA = (ps == System.out ? "[Contract_" + port + "_out] " : ""); String tagA = (ps instanceof LoggerPrintStream ? "[Contract_" + port + "_out] " : "");
String tagB = (ps == System.out ? "[Contract_" + port + "_err] " : ""); String tagB = (ps instanceof LoggerPrintStream ? "[Contract_" + port + "_err] " : "");
outputTracer.track(process, sc, tagA, ps); outputTracer.track(process, sc, tagA, ps);
errorTracer.track(process, new Scanner(process.getErrorStream()), tagB, ps); errorTracer.track(process, new Scanner(process.getErrorStream()), tagB, ps);
} else { } else {

View File

@ -771,10 +771,10 @@ public class ContractManager {
} }
public String startContractAndRedirect(Contract c, PrintStream ps) { public String startContractAndRedirect(Contract c, PrintStream ps) {
long freeMemory = getFreeMemory(); // long freeMemory = getFreeMemory();
if (statusRecorder.runningProcess.size() > 5 && (freeMemory < memoryLimit)) { // if (statusRecorder.runningProcess.size() > 5 && (freeMemory < memoryLimit)) {
statusRecorder.hangLeastUsedContractProcess(); // statusRecorder.hangLeastUsedContractProcess();
} // }
ContractResult r; ContractResult r;
try { try {
if (c.getScriptStr().startsWith("/")) { if (c.getScriptStr().startsWith("/")) {
@ -799,7 +799,7 @@ public class ContractManager {
c.setOwner(c.getPublicKey()); c.setOwner(c.getPublicKey());
} }
LOGGER.debug("contract pubKey: " + c.getPublicKey()); LOGGER.debug("contract pubKey: " + c.getPublicKey());
YPKResourceManager.unzipYPK(c);
// 合约启动时读取Manifest文件设置合约DOI // 合约启动时读取Manifest文件设置合约DOI
setContractStateful(c); setContractStateful(c);
ContractClient client = new ContractClient(c); ContractClient client = new ContractClient(c);
@ -964,7 +964,7 @@ public class ContractManager {
} }
public String startContract(Contract c) { public String startContract(Contract c) {
return startContractAndRedirect(c, System.out); return startContractAndRedirect(c, new LoggerPrintStream(LOGGER));
} }
public String queryDEPort(String contractID) { public String queryDEPort(String contractID) {

View File

@ -0,0 +1,19 @@
package org.bdware.sc;
import org.apache.logging.log4j.Logger;
import java.io.PrintStream;
public class LoggerPrintStream extends PrintStream {
public LoggerPrintStream(Logger logger) {
super(System.out, true);
this.logger = logger;
}
Logger logger;
@Override
public void println(String str) {
logger.info(str);
}
}

View File

@ -0,0 +1,93 @@
package org.bdware.sc;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.bdware.sc.bean.Contract;
import java.io.*;
import java.nio.file.Files;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
public class YPKResourceManager {
public static String ypkCacheDir = "./ypkcache";
static ExecutorService es = Executors.newFixedThreadPool(2);
public static void unzipYPK(Contract contract) {
es.execute(() -> unzip(contract.getScriptStr(), getUnzipDir(contract)));
}
public static String getUnzipDir(Contract contract) {
return ypkCacheDir + "/" + contract.getID();
}
private static final long MAX_CACHE_SIZE = 2L * 1024 * 1024 * 1024; // 2GB
private static final long MAX_FILE_SIZE = 10 * 1024 * 1024; // 10MB
private static final Cache<String, byte[]> cache = Caffeine.newBuilder()
.maximumSize(MAX_CACHE_SIZE).expireAfterAccess(30, TimeUnit.MINUTES).build();
public static InputStream getCachedStream(Contract c, String path) throws IOException {
File target = new File(getUnzipDir(c), path);
String key = target.getAbsolutePath();
if (target.length() > MAX_FILE_SIZE) {
// 文件过大直接从文件系统读取
return new FileInputStream(target);
} else {
byte[] data = cache.get(key, k -> {
try {
return Files.readAllBytes(target.toPath());
} catch (IOException e) {
throw new RuntimeException(e);
}
});
// 从缓存中返回一个新的 ByteArrayInputStream
return new ByteArrayInputStream(data);
}
}
public static void unzip(String zipFilePath, String destDir) {
File dir = new File(destDir);
if (!dir.exists())
dir.mkdirs();
byte[] buffer = new byte[1024];
try {
ZipInputStream zis = new ZipInputStream(new FileInputStream(zipFilePath));
ZipEntry zipEntry = zis.getNextEntry();
while (zipEntry != null) {
String fileName = zipEntry.getName();
File newFile = new File(destDir + File.separator + fileName);
// 创建所有上级目录
new File(newFile.getParent()).mkdirs();
if (zipEntry.isDirectory()) {
if (!newFile.isDirectory() && !newFile.mkdirs()) {
throw new IOException("Failed to create directory " + newFile);
}
} else {
// 写入文件内容
FileOutputStream fos = new FileOutputStream(newFile);
int len;
while ((len = zis.read(buffer)) > 0) {
fos.write(buffer, 0, len);
}
fos.close();
}
zipEntry = zis.getNextEntry();
}
zis.closeEntry();
zis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public static boolean existYPK(Contract contract) {
return new File(contract.getScriptStr()).exists();
}
}