diff --git a/src/main/java/org/bdware/sc/ContractClient.java b/src/main/java/org/bdware/sc/ContractClient.java index 04464e5..514f349 100644 --- a/src/main/java/org/bdware/sc/ContractClient.java +++ b/src/main/java/org/bdware/sc/ContractClient.java @@ -384,9 +384,6 @@ public class ContractClient { get.asyncGet("", "setIdentifier", alias, null); } - public boolean isUnit() { - return contractMeta.contract.getType() != ContractExecType.Sole; - } public String signResult(String result) { return contractMeta.contract.signResult(result); diff --git a/src/main/java/org/bdware/sc/ContractManager.java b/src/main/java/org/bdware/sc/ContractManager.java index 04fa9ac..afc1f9f 100644 --- a/src/main/java/org/bdware/sc/ContractManager.java +++ b/src/main/java/org/bdware/sc/ContractManager.java @@ -890,6 +890,7 @@ public class ContractManager { case RequestAllResponseHalf: case Sharding: case Sole: + case PBFT: ret = client.startProcess(ps); conflictCheck = checkConflict(c, client, ret); if (null != conflictCheck) { @@ -925,7 +926,7 @@ public class ContractManager { return ret; default: - return "todo"; + return "contract manager can't support:" + c.getType(); } } catch (Exception e) { e.printStackTrace(); @@ -1462,10 +1463,11 @@ public class ContractManager { long start = System.currentTimeMillis(); ResultCallback eventExtractor = new ResultCallback() { @Override - public void onResult(String ret){ + public void onResult(String ret) { JsonObject result = JsonUtil.parseStringAsJsonObject(ret); this.onResult(result); } + @Override public void onResult(JsonObject result) { ContractManager.instance.extractEventsFromContractResult( diff --git a/src/main/java/org/bdware/sc/sequencing/PBFTAlgorithm.java b/src/main/java/org/bdware/sc/sequencing/PBFTAlgorithm.java index 45f052b..fdc0f68 100644 --- a/src/main/java/org/bdware/sc/sequencing/PBFTAlgorithm.java +++ b/src/main/java/org/bdware/sc/sequencing/PBFTAlgorithm.java @@ -1,5 +1,7 @@ package org.bdware.sc.sequencing; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.bdware.sc.ContractManager; import org.bdware.sc.bean.ContractRequest; import org.bdware.sc.conn.Node; @@ -12,6 +14,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class PBFTAlgorithm implements CommitAlgorithm { + private static final Logger LOGGER = LogManager.getLogger(PBFTAlgorithm.class); + static byte[] GETPUBKEY = "GETPUBKEY".getBytes(); Committer committer; Map members; @@ -22,6 +26,7 @@ public class PBFTAlgorithm implements CommitAlgorithm { Map> original; boolean isMaster; private TrustfulExecutorConnection connection; + private int sendID; public PBFTAlgorithm(boolean isMaster) { commitedMsg = new ArrayList<>(); @@ -31,6 +36,14 @@ public class PBFTAlgorithm implements CommitAlgorithm { this.isMaster = isMaster; } + public void setSendID(int id) { + sendID = id; + } + + public void addMember(Node node, PBFTMember member) { + members.put(node, member); + } + public void setCommitter(Committer c) { committer = c; } @@ -46,13 +59,14 @@ public class PBFTAlgorithm implements CommitAlgorithm { // pbftMessage.content.length); PCInfo temp; PBFTMessage prepareMsg; - System.out.println("[PBFTAlgorithm] recv: " + pbftMessage.getDisplayStr()); + LOGGER.info("recv: " + pbftMessage.getDisplayStr()); switch (pbftMessage.type) { case AddMember: - PBFTMember member = PBFTMember.parse(pbftMessage.content); - if (member != null) - members.put(sender, member); + // PBFTMember member = PBFTMember.parse(pbftMessage.content); + // if (member != null) + // members.put(sender, member); // case DeleteMember: + // JUST ignore break; case Request: // all nodes save full content, clientIP, clientPort !!!! @@ -91,12 +105,12 @@ public class PBFTAlgorithm implements CommitAlgorithm { break; case Prepare: temp = info.get(pbftMessage.order); - // System.out.println(getPort() + ": receive Prepare from:" + packet.getPort()); + LOGGER.info("receive Prepare from:" + pbftMessage.sendID + " -> " + pbftMessage.order); if (temp == null) { PCInfo pcInfo = new PCInfo(); pcInfo.buff.add(pbftMessage); info.put(pbftMessage.order, pcInfo); - requetPrePrepareFromMaster(pbftMessage.order); + requestPrePrepareFromMaster(pbftMessage.order); } else if (temp.updatePrepare(pbftMessage, this)) { // check the sender (is master) and order is in range !!!! PBFTMessage commitMsg = new PBFTMessage(); @@ -113,7 +127,7 @@ public class PBFTAlgorithm implements CommitAlgorithm { PCInfo pcInfo = new PCInfo(); pcInfo.buff.add(pbftMessage); info.put(pbftMessage.order, pcInfo); - requetPrePrepareFromMaster(pbftMessage.order); + requestPrePrepareFromMaster(pbftMessage.order); break; } if (temp.updateCommit(pbftMessage, this)) { @@ -177,12 +191,14 @@ public class PBFTAlgorithm implements CommitAlgorithm { } } - private void requetPrePrepareFromMaster(long order) { - PBFTMessage message = new PBFTMessage(); - message.type = PBFTType.ReSend; - message.order = order; - message.content = new byte[1]; - sendToMaster(message); + private void requestPrePrepareFromMaster(long order) { + +// PBFTMessage message = new PBFTMessage(); +// message.type = PBFTType.ReSend; +// message.order = order; +// message.content = new byte[1]; +// sendToMaster(message); + LOGGER.info("request Resend"); } private void matchPrePrepareFromOriginReqeust(int hash, PBFTMessage pbftMessage) { @@ -239,7 +255,7 @@ public class PBFTAlgorithm implements CommitAlgorithm { PBFTMessage original = getOriginalMessage(pbftMessage.order); // execute(pbftMessage); if (pbftMessage.order == commitedOrder.get() + 1) { - execute(original); + execute(pbftMessage); } else { commitedMsg.add(original); } @@ -247,11 +263,13 @@ public class PBFTAlgorithm implements CommitAlgorithm { private synchronized void execute(PBFTMessage pbftMessage) { commitedOrder.incrementAndGet(); + LOGGER.info("execute:" + pbftMessage + " infoKeySet:" + JsonUtil.toJson(info.keySet())); + + PBFTMessage original = getOriginalMessage(pbftMessage.order); ContractRequest msg = ContractRequest.parse(original.content); committer.onCommit(msg); - System.out.println(": execute, " + JsonUtil.toJson(msg)); info.remove(pbftMessage.order); // ContractProcess.instance.onEvent(msg); commitedMsg.sort((o1, o2) -> (int) (o1.order - o2.order)); @@ -275,17 +293,15 @@ public class PBFTAlgorithm implements CommitAlgorithm { } private void broadcast(PBFTMessage pbftMessage) { - for (Node node : members.keySet()) { - System.out.println("[PBFTAlgorithm] send: " + pbftMessage.getDisplayStr()); - - connection.sendMessage(node, pbftMessage.getBytes()); - } + pbftMessage.sendID = sendID; + connection.broadcast(pbftMessage.getBytes()); } private void sendToMaster(PBFTMessage pbftMessage) { for (Node node : members.keySet()) { PBFTMember member = members.get(node); if (member.isMaster) { + pbftMessage.sendID = sendID; connection.sendMessage(node, pbftMessage.getBytes()); return; } @@ -302,6 +318,11 @@ public class PBFTAlgorithm implements CommitAlgorithm { onPBFTMessage(node, pbftMessage); } + public void setAtomSeq(int i) { + allocatedID.set(i); + commitedOrder.set(i); + } + static class NodeInfo { SM2KeyPair privKey; int hash; diff --git a/src/main/java/org/bdware/sc/sequencing/PBFTMessage.java b/src/main/java/org/bdware/sc/sequencing/PBFTMessage.java index 9115fd5..5ae6fcf 100644 --- a/src/main/java/org/bdware/sc/sequencing/PBFTMessage.java +++ b/src/main/java/org/bdware/sc/sequencing/PBFTMessage.java @@ -1,11 +1,11 @@ package org.bdware.sc.sequencing; +import org.bdware.sc.conn.ByteUtil; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; -import org.bdware.sc.conn.ByteUtil; - public class PBFTMessage { PBFTType type; int sendID; @@ -64,7 +64,7 @@ public class PBFTMessage { return msg; } - private static String IDA = "PBFTMsg"; + private static String IDA = "PBFTMsg"; // public void sign(SM2KeyPair pair) { // sendID = Arrays.hashCode(pair.getPublicKey().getEncoded(false)); @@ -96,4 +96,8 @@ public class PBFTMessage { sb.append(new String(content)); return sb.toString(); } + + public void setOrder(long order) { + this.order = order; + } } diff --git a/src/main/java/org/bdware/sc/units/ContractUnitController.java b/src/main/java/org/bdware/sc/units/ContractUnitController.java index 459760b..4d96b2e 100644 --- a/src/main/java/org/bdware/sc/units/ContractUnitController.java +++ b/src/main/java/org/bdware/sc/units/ContractUnitController.java @@ -120,6 +120,13 @@ public class ContractUnitController { // TODO sig here. connection.sendMessage(member.getNode(), unitMsg.toByteArray()); } + + @Override + public List getNodes() { + List ret = new ArrayList<>(); + ret.addAll(node2member.keySet()); + return ret; + } } static class PCInfo { diff --git a/src/main/java/org/bdware/sc/units/PubKeyNode.java b/src/main/java/org/bdware/sc/units/PubKeyNode.java new file mode 100644 index 0000000..471f0c8 --- /dev/null +++ b/src/main/java/org/bdware/sc/units/PubKeyNode.java @@ -0,0 +1,7 @@ +package org.bdware.sc.units; + +import org.bdware.sc.conn.Node; + +public class PubKeyNode extends Node { + public String pubkey; +} diff --git a/src/main/java/org/bdware/sc/units/TrustfulExecutorConnection.java b/src/main/java/org/bdware/sc/units/TrustfulExecutorConnection.java index c105944..7c03337 100644 --- a/src/main/java/org/bdware/sc/units/TrustfulExecutorConnection.java +++ b/src/main/java/org/bdware/sc/units/TrustfulExecutorConnection.java @@ -2,6 +2,15 @@ package org.bdware.sc.units; import org.bdware.sc.conn.Node; -public interface TrustfulExecutorConnection { - public void sendMessage(Node node, byte[] msg); +import java.util.List; + +public interface TrustfulExecutorConnection { + public void sendMessage(T node, byte[] msg); + + default public void broadcast(byte[] msg) { + for (T t : getNodes()) + sendMessage(t, msg); + } + + public List getNodes(); } \ No newline at end of file diff --git a/src/main/java/org/bdware/server/trustedmodel/ContractExecutor.java b/src/main/java/org/bdware/server/trustedmodel/ContractExecutor.java index ba6cce8..b976816 100644 --- a/src/main/java/org/bdware/server/trustedmodel/ContractExecutor.java +++ b/src/main/java/org/bdware/server/trustedmodel/ContractExecutor.java @@ -1,10 +1,14 @@ package org.bdware.server.trustedmodel; import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.conn.Node; import org.bdware.sc.conn.OnHashCallback; import org.bdware.sc.conn.ResultCallback; public interface ContractExecutor { void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb); + default void onSyncMessage(Node node, byte[] data) { + + } }