merge pbft algorithm

This commit is contained in:
CaiHQ 2021-11-22 11:50:40 +08:00
parent 6bf6343c50
commit b136681526
8 changed files with 81 additions and 30 deletions

View File

@ -384,9 +384,6 @@ public class ContractClient {
get.asyncGet("", "setIdentifier", alias, null);
}
public boolean isUnit() {
return contractMeta.contract.getType() != ContractExecType.Sole;
}
public String signResult(String result) {
return contractMeta.contract.signResult(result);

View File

@ -890,6 +890,7 @@ public class ContractManager {
case RequestAllResponseHalf:
case Sharding:
case Sole:
case PBFT:
ret = client.startProcess(ps);
conflictCheck = checkConflict(c, client, ret);
if (null != conflictCheck) {
@ -925,7 +926,7 @@ public class ContractManager {
return ret;
default:
return "todo";
return "contract manager can't support:" + c.getType();
}
} catch (Exception e) {
e.printStackTrace();
@ -1466,6 +1467,7 @@ public class ContractManager {
JsonObject result = JsonUtil.parseStringAsJsonObject(ret);
this.onResult(result);
}
@Override
public void onResult(JsonObject result) {
ContractManager.instance.extractEventsFromContractResult(

View File

@ -1,5 +1,7 @@
package org.bdware.sc.sequencing;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.ContractManager;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.Node;
@ -12,6 +14,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class PBFTAlgorithm implements CommitAlgorithm {
private static final Logger LOGGER = LogManager.getLogger(PBFTAlgorithm.class);
static byte[] GETPUBKEY = "GETPUBKEY".getBytes();
Committer committer;
Map<Node, PBFTMember> members;
@ -22,6 +26,7 @@ public class PBFTAlgorithm implements CommitAlgorithm {
Map<Integer, Pair<Node, PBFTMessage>> original;
boolean isMaster;
private TrustfulExecutorConnection connection;
private int sendID;
public PBFTAlgorithm(boolean isMaster) {
commitedMsg = new ArrayList<>();
@ -31,6 +36,14 @@ public class PBFTAlgorithm implements CommitAlgorithm {
this.isMaster = isMaster;
}
public void setSendID(int id) {
sendID = id;
}
public void addMember(Node node, PBFTMember member) {
members.put(node, member);
}
public void setCommitter(Committer c) {
committer = c;
}
@ -46,13 +59,14 @@ public class PBFTAlgorithm implements CommitAlgorithm {
// pbftMessage.content.length);
PCInfo temp;
PBFTMessage prepareMsg;
System.out.println("[PBFTAlgorithm] recv: " + pbftMessage.getDisplayStr());
LOGGER.info("recv: " + pbftMessage.getDisplayStr());
switch (pbftMessage.type) {
case AddMember:
PBFTMember member = PBFTMember.parse(pbftMessage.content);
if (member != null)
members.put(sender, member);
// PBFTMember member = PBFTMember.parse(pbftMessage.content);
// if (member != null)
// members.put(sender, member);
// case DeleteMember:
// JUST ignore
break;
case Request:
// all nodes save full content, clientIP, clientPort !!!!
@ -91,12 +105,12 @@ public class PBFTAlgorithm implements CommitAlgorithm {
break;
case Prepare:
temp = info.get(pbftMessage.order);
// System.out.println(getPort() + ": receive Prepare from:" + packet.getPort());
LOGGER.info("receive Prepare from:" + pbftMessage.sendID + " -> " + pbftMessage.order);
if (temp == null) {
PCInfo pcInfo = new PCInfo();
pcInfo.buff.add(pbftMessage);
info.put(pbftMessage.order, pcInfo);
requetPrePrepareFromMaster(pbftMessage.order);
requestPrePrepareFromMaster(pbftMessage.order);
} else if (temp.updatePrepare(pbftMessage, this)) {
// check the sender (is master) and order is in range !!!!
PBFTMessage commitMsg = new PBFTMessage();
@ -113,7 +127,7 @@ public class PBFTAlgorithm implements CommitAlgorithm {
PCInfo pcInfo = new PCInfo();
pcInfo.buff.add(pbftMessage);
info.put(pbftMessage.order, pcInfo);
requetPrePrepareFromMaster(pbftMessage.order);
requestPrePrepareFromMaster(pbftMessage.order);
break;
}
if (temp.updateCommit(pbftMessage, this)) {
@ -177,12 +191,14 @@ public class PBFTAlgorithm implements CommitAlgorithm {
}
}
private void requetPrePrepareFromMaster(long order) {
PBFTMessage message = new PBFTMessage();
message.type = PBFTType.ReSend;
message.order = order;
message.content = new byte[1];
sendToMaster(message);
private void requestPrePrepareFromMaster(long order) {
// PBFTMessage message = new PBFTMessage();
// message.type = PBFTType.ReSend;
// message.order = order;
// message.content = new byte[1];
// sendToMaster(message);
LOGGER.info("request Resend");
}
private void matchPrePrepareFromOriginReqeust(int hash, PBFTMessage pbftMessage) {
@ -239,7 +255,7 @@ public class PBFTAlgorithm implements CommitAlgorithm {
PBFTMessage original = getOriginalMessage(pbftMessage.order);
// execute(pbftMessage);
if (pbftMessage.order == commitedOrder.get() + 1) {
execute(original);
execute(pbftMessage);
} else {
commitedMsg.add(original);
}
@ -247,11 +263,13 @@ public class PBFTAlgorithm implements CommitAlgorithm {
private synchronized void execute(PBFTMessage pbftMessage) {
commitedOrder.incrementAndGet();
LOGGER.info("execute:" + pbftMessage + " infoKeySet:" + JsonUtil.toJson(info.keySet()));
PBFTMessage original = getOriginalMessage(pbftMessage.order);
ContractRequest msg = ContractRequest.parse(original.content);
committer.onCommit(msg);
System.out.println(": execute, " + JsonUtil.toJson(msg));
info.remove(pbftMessage.order);
// ContractProcess.instance.onEvent(msg);
commitedMsg.sort((o1, o2) -> (int) (o1.order - o2.order));
@ -275,17 +293,15 @@ public class PBFTAlgorithm implements CommitAlgorithm {
}
private void broadcast(PBFTMessage pbftMessage) {
for (Node node : members.keySet()) {
System.out.println("[PBFTAlgorithm] send: " + pbftMessage.getDisplayStr());
connection.sendMessage(node, pbftMessage.getBytes());
}
pbftMessage.sendID = sendID;
connection.broadcast(pbftMessage.getBytes());
}
private void sendToMaster(PBFTMessage pbftMessage) {
for (Node node : members.keySet()) {
PBFTMember member = members.get(node);
if (member.isMaster) {
pbftMessage.sendID = sendID;
connection.sendMessage(node, pbftMessage.getBytes());
return;
}
@ -302,6 +318,11 @@ public class PBFTAlgorithm implements CommitAlgorithm {
onPBFTMessage(node, pbftMessage);
}
public void setAtomSeq(int i) {
allocatedID.set(i);
commitedOrder.set(i);
}
static class NodeInfo {
SM2KeyPair privKey;
int hash;

View File

@ -1,11 +1,11 @@
package org.bdware.sc.sequencing;
import org.bdware.sc.conn.ByteUtil;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.bdware.sc.conn.ByteUtil;
public class PBFTMessage {
PBFTType type;
int sendID;
@ -96,4 +96,8 @@ public class PBFTMessage {
sb.append(new String(content));
return sb.toString();
}
public void setOrder(long order) {
this.order = order;
}
}

View File

@ -120,6 +120,13 @@ public class ContractUnitController {
// TODO sig here.
connection.sendMessage(member.getNode(), unitMsg.toByteArray());
}
@Override
public List getNodes() {
List<Node> ret = new ArrayList<>();
ret.addAll(node2member.keySet());
return ret;
}
}
static class PCInfo {

View File

@ -0,0 +1,7 @@
package org.bdware.sc.units;
import org.bdware.sc.conn.Node;
public class PubKeyNode extends Node {
public String pubkey;
}

View File

@ -2,6 +2,15 @@ package org.bdware.sc.units;
import org.bdware.sc.conn.Node;
public interface TrustfulExecutorConnection {
public void sendMessage(Node node, byte[] msg);
import java.util.List;
public interface TrustfulExecutorConnection<T extends Node> {
public void sendMessage(T node, byte[] msg);
default public void broadcast(byte[] msg) {
for (T t : getNodes())
sendMessage(t, msg);
}
public List<T> getNodes();
}

View File

@ -1,10 +1,14 @@
package org.bdware.server.trustedmodel;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.Node;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
public interface ContractExecutor {
void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb);
default void onSyncMessage(Node node, byte[] data) {
}
}