diff --git a/build.gradle b/build.gradle index eefa89f..887fdfa 100644 --- a/build.gradle +++ b/build.gradle @@ -31,5 +31,6 @@ dependencies { api project(":common") api 'io.prometheus:simpleclient:0.12.0' api 'org.hyperic.sigar:sigar:1.6.4' + api 'com.github.ben-manes.caffeine:caffeine:2.8.8' testImplementation 'junit:junit:4.13.2' } diff --git a/src/main/java/org/bdware/sc/ContractClient.java b/src/main/java/org/bdware/sc/ContractClient.java index 0fe9d61..74545d6 100644 --- a/src/main/java/org/bdware/sc/ContractClient.java +++ b/src/main/java/org/bdware/sc/ContractClient.java @@ -307,8 +307,8 @@ public class ContractClient { get = new SocketGet("127.0.0.1", port); get.syncGet("", "setDBInfo", ContractManager.dbPath); if (contractMeta.contract.isDebug() || get != null) { - String tagA = (ps == System.out ? "[Contract_" + port + "_out] " : ""); - String tagB = (ps == System.out ? "[Contract_" + port + "_err] " : ""); + String tagA = (ps instanceof LoggerPrintStream ? "[Contract_" + port + "_out] " : ""); + String tagB = (ps instanceof LoggerPrintStream ? "[Contract_" + port + "_err] " : ""); outputTracer.track(process, sc, tagA, ps); errorTracer.track(process, new Scanner(process.getErrorStream()), tagB, ps); } else { diff --git a/src/main/java/org/bdware/sc/ContractManager.java b/src/main/java/org/bdware/sc/ContractManager.java index 774a2e7..3ee58c9 100644 --- a/src/main/java/org/bdware/sc/ContractManager.java +++ b/src/main/java/org/bdware/sc/ContractManager.java @@ -771,10 +771,10 @@ public class ContractManager { } public String startContractAndRedirect(Contract c, PrintStream ps) { - long freeMemory = getFreeMemory(); - if (statusRecorder.runningProcess.size() > 5 && (freeMemory < memoryLimit)) { - statusRecorder.hangLeastUsedContractProcess(); - } + // long freeMemory = getFreeMemory(); + // if (statusRecorder.runningProcess.size() > 5 && (freeMemory < memoryLimit)) { + // statusRecorder.hangLeastUsedContractProcess(); + // } ContractResult r; try { if (c.getScriptStr().startsWith("/")) { @@ -799,7 +799,7 @@ public class ContractManager { c.setOwner(c.getPublicKey()); } LOGGER.debug("contract pubKey: " + c.getPublicKey()); - + YPKResourceManager.unzipYPK(c); // 合约启动时读取Manifest文件设置合约DOI setContractStateful(c); ContractClient client = new ContractClient(c); @@ -964,7 +964,7 @@ public class ContractManager { } public String startContract(Contract c) { - return startContractAndRedirect(c, System.out); + return startContractAndRedirect(c, new LoggerPrintStream(LOGGER)); } public String queryDEPort(String contractID) { @@ -1009,7 +1009,7 @@ public class ContractManager { /* * reqCache.entrySet() .removeIf( entry -> { RespCache cache = entry.getValue(); if (cache * == null) return true; if (cache.count < 0) return true; return cache.time < time; }); - * + * */ } @@ -1657,7 +1657,7 @@ public class ContractManager { /* * public String resetMask(String contractName, JsonObject MaskObject) { ContractClient client = * getByName(contractName); System.out.println("contractName"+client.getContractName()); - * + * * //String result = client.get.syncGet("", "changeDebugFlag", String.valueOf(isDebug)); * //設置clinet當中的mask client.contractMeta.isDebug = isDebug; return "resetMask"; } */ diff --git a/src/main/java/org/bdware/sc/LoggerPrintStream.java b/src/main/java/org/bdware/sc/LoggerPrintStream.java new file mode 100644 index 0000000..8a08cea --- /dev/null +++ b/src/main/java/org/bdware/sc/LoggerPrintStream.java @@ -0,0 +1,19 @@ +package org.bdware.sc; + +import org.apache.logging.log4j.Logger; + +import java.io.PrintStream; + +public class LoggerPrintStream extends PrintStream { + public LoggerPrintStream(Logger logger) { + super(System.out, true); + this.logger = logger; + } + + Logger logger; + + @Override + public void println(String str) { + logger.info(str); + } +} diff --git a/src/main/java/org/bdware/sc/YPKResourceManager.java b/src/main/java/org/bdware/sc/YPKResourceManager.java new file mode 100644 index 0000000..795284f --- /dev/null +++ b/src/main/java/org/bdware/sc/YPKResourceManager.java @@ -0,0 +1,93 @@ +package org.bdware.sc; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.bdware.sc.bean.Contract; + +import java.io.*; +import java.nio.file.Files; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +public class YPKResourceManager { + public static String ypkCacheDir = "./ypkcache"; + static ExecutorService es = Executors.newFixedThreadPool(2); + + public static void unzipYPK(Contract contract) { + es.execute(() -> unzip(contract.getScriptStr(), getUnzipDir(contract))); + } + + public static String getUnzipDir(Contract contract) { + return ypkCacheDir + "/" + contract.getID(); + } + + private static final long MAX_CACHE_SIZE = 2L * 1024 * 1024 * 1024; // 2GB + private static final long MAX_FILE_SIZE = 10 * 1024 * 1024; // 10MB + + private static final Cache cache = Caffeine.newBuilder() + .maximumSize(MAX_CACHE_SIZE).expireAfterAccess(30, TimeUnit.MINUTES).build(); + + public static InputStream getCachedStream(Contract c, String path) throws IOException { + File target = new File(getUnzipDir(c), path); + String key = target.getAbsolutePath(); + + if (target.length() > MAX_FILE_SIZE) { + // 文件过大,直接从文件系统读取 + return new FileInputStream(target); + } else { + byte[] data = cache.get(key, k -> { + try { + return Files.readAllBytes(target.toPath()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + // 从缓存中返回一个新的 ByteArrayInputStream + return new ByteArrayInputStream(data); + } + } + + public static void unzip(String zipFilePath, String destDir) { + File dir = new File(destDir); + if (!dir.exists()) + dir.mkdirs(); + + byte[] buffer = new byte[1024]; + try { + ZipInputStream zis = new ZipInputStream(new FileInputStream(zipFilePath)); + ZipEntry zipEntry = zis.getNextEntry(); + while (zipEntry != null) { + String fileName = zipEntry.getName(); + File newFile = new File(destDir + File.separator + fileName); + // 创建所有上级目录 + new File(newFile.getParent()).mkdirs(); + + if (zipEntry.isDirectory()) { + if (!newFile.isDirectory() && !newFile.mkdirs()) { + throw new IOException("Failed to create directory " + newFile); + } + } else { + // 写入文件内容 + FileOutputStream fos = new FileOutputStream(newFile); + int len; + while ((len = zis.read(buffer)) > 0) { + fos.write(buffer, 0, len); + } + fos.close(); + } + zipEntry = zis.getNextEntry(); + } + zis.closeEntry(); + zis.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public static boolean existYPK(Contract contract) { + return new File(contract.getScriptStr()).exists(); + } +}