diff --git a/build.gradle b/build.gradle
index d6e854f..395f82a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -65,7 +65,6 @@ protobuf {
artifact = 'io.grpc:protoc-gen-grpc-java:' + grpc_java_version
}
}
- // generatedFilesBaseDir = "$projectDir/src"
generateProtoTasks {
all().each { task ->
task.plugins {
diff --git a/src/main/java/org/bdware/bdledger/api/grpc/Client.java b/src/main/java/org/bdware/bdledger/api/grpc/Client.java
new file mode 100644
index 0000000..4c8cd5f
--- /dev/null
+++ b/src/main/java/org/bdware/bdledger/api/grpc/Client.java
@@ -0,0 +1,752 @@
+package org.bdware.bdledger.api.grpc;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Empty;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Metadata;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.MetadataUtils;
+import org.bdware.bdledger.api.grpc.pb.CommonProto.TransactionType;
+import org.bdware.bdledger.api.grpc.pb.LedgerGrpc;
+import org.bdware.bdledger.api.grpc.pb.LedgerProto.*;
+import org.bdware.bdledger.api.grpc.pb.NodeGrpc;
+import org.bdware.bdledger.api.grpc.pb.NodeProto.ClientVersionResponse;
+import org.bdware.bdledger.api.grpc.pb.QueryGrpc;
+import org.bdware.bdledger.api.grpc.pb.QueryProto.*;
+
+import java.time.ZonedDateTime;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * 事务账本客户端
+ *
+ *
如有更灵活的需求可直接使用{@link org.bdware.bdledger.api.grpc.pb.LedgerGrpc}类。
+ *
+ * @author nex
+ * @see 事务账本API
+ */
+public class Client {
+
+ private static final Logger logger = Logger.getLogger(Client.class.getName());
+
+ private ManagedChannel channel;
+ private NodeGrpc.NodeFutureStub nodeFutureStub;
+ private NodeGrpc.NodeBlockingStub nodeBlockingStub;
+ private LedgerGrpc.LedgerFutureStub ledgerFutureStub;
+ private LedgerGrpc.LedgerBlockingStub ledgerBlockingStub;
+ private QueryGrpc.QueryFutureStub queryFutureStub;
+ private QueryGrpc.QueryBlockingStub queryBlockingStub;
+// private final ManagedChannel channel;
+// private final NodeGrpc.NodeFutureStub nodeFutureStub;
+// private final NodeGrpc.NodeBlockingStub nodeBlockingStub;
+// private final LedgerGrpc.LedgerFutureStub ledgerFutureStub;
+// private final LedgerGrpc.LedgerBlockingStub ledgerBlockingStub;
+// private final QueryGrpc.QueryFutureStub queryFutureStub;
+// private final QueryGrpc.QueryBlockingStub queryBlockingStub;
+
+ /**
+ * 构造客户端来向{@code host:port} BDLedger节点发请求。
+ */
+ public Client(String host, int port) {
+ this(ManagedChannelBuilder.forAddress(host, port).usePlaintext(), null);
+ }
+
+ /**
+ * 构造客户端,使用{@code token}作access token来向{@code host:port} BDLedger节点发请求。
+ */
+ public Client(String host, int port, String token) {
+ this(ManagedChannelBuilder.forAddress(host, port).usePlaintext(), token);
+ }
+
+ /**
+ * 用已有的{@link io.grpc.Channel}对象构造客户端来向BDLedger节点发请求。
+ */
+ public Client(ManagedChannelBuilder> channelBuilder, String token) {
+ channel = channelBuilder.build();
+
+ nodeFutureStub = NodeGrpc.newFutureStub(channel);
+ nodeBlockingStub = NodeGrpc.newBlockingStub(channel);
+ ledgerFutureStub = LedgerGrpc.newFutureStub(channel);
+ ledgerBlockingStub = LedgerGrpc.newBlockingStub(channel);
+ queryFutureStub = QueryGrpc.newFutureStub(channel);
+ queryBlockingStub = QueryGrpc.newBlockingStub(channel);
+
+ if (token != null) {
+ Metadata fixedHeaders = new Metadata();
+ fixedHeaders.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), "bearer " + token);
+ nodeFutureStub = GrpcHeaderInterceptor.attachHeaders(nodeFutureStub, fixedHeaders);
+ nodeBlockingStub = GrpcHeaderInterceptor.attachHeaders(nodeBlockingStub, fixedHeaders);
+ ledgerFutureStub = GrpcHeaderInterceptor.attachHeaders(ledgerFutureStub, fixedHeaders);
+ ledgerBlockingStub = GrpcHeaderInterceptor.attachHeaders(ledgerBlockingStub, fixedHeaders);
+ queryFutureStub = GrpcHeaderInterceptor.attachHeaders(queryFutureStub, fixedHeaders);
+ queryBlockingStub = GrpcHeaderInterceptor.attachHeaders(queryBlockingStub, fixedHeaders);
+ }
+ }
+
+ /**
+ * 关闭客户端的网络连接。
+ */
+ public void shutdown() throws InterruptedException {
+ channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+ }
+
+ /**
+ * 查询BDLedger节点版本
+ * (非阻塞)
+ */
+ public ListenableFuture clientVersion() {
+
+ info("*** clientVersion");
+
+ try {
+ return nodeFutureStub.clientVersion(Empty.getDefaultInstance());
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ return null;
+ }
+ }
+
+ /**
+ * 查询BDLedger节点版本
+ * (阻塞)
+ */
+ public ClientVersionResponse clientVersionSync() {
+
+ info("*** clientVersionSync");
+
+ try {
+ return nodeBlockingStub.clientVersion(Empty.getDefaultInstance());
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ throw e;
+ }
+ }
+
+ /**
+ * 创建账本
+ * (非阻塞)
+ */
+ public ListenableFuture createLedger(String name) {
+
+ info("*** createLedger: name={0}", name);
+
+ try {
+ return ledgerFutureStub.createLedger(createLedgerRequest(name));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ return null;
+ }
+ }
+
+ /**
+ * 创建账本
+ * (阻塞)
+ */
+ public CreateLedgerResponse createLedgerSync(String name) {
+
+ info("*** createLedgerSync: name={0}", name);
+
+ try {
+ return ledgerBlockingStub.createLedger(createLedgerRequest(name));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ throw e;
+ }
+ }
+
+ private CreateLedgerRequest createLedgerRequest(String name) {
+ return CreateLedgerRequest.newBuilder().setName(name).build();
+ }
+
+ /**
+ * 返回账本列表
+ * (非阻塞)
+ */
+ public ListenableFuture getLedgers() {
+
+ info("*** getLedgers");
+
+ try {
+ return ledgerFutureStub.getLedgers(Empty.getDefaultInstance());
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ return null;
+ }
+ }
+
+ /**
+ * 返回账本列表
+ * (阻塞)
+ */
+ public GetLedgersResponse getLedgersSync() {
+
+ info("*** getLedgersSync");
+
+ try {
+ return ledgerBlockingStub.getLedgers(Empty.getDefaultInstance());
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ throw e;
+ }
+ }
+
+ /**
+ * 发送新事务
+ * (非阻塞)
+ */
+ public ListenableFuture sendTransaction(
+ String ledger, TransactionType type, String from, long nonce, String to, byte[] data) {
+
+ info(
+ "*** sendTransaction: ledger={0} type={1} from={2} to={3} data={4}",
+ ledger, type, from, to, data);
+
+ try {
+ return ledgerFutureStub.sendTransaction(SendTransactionRequest(ledger, type, from, nonce, to, data));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ return null;
+ }
+ }
+
+ /**
+ * 发送新事务
+ * (阻塞)
+ */
+ public SendTransactionResponse sendTransactionSync(
+ String ledger, TransactionType type, String from, long nonce, String to, byte[] data) {
+
+ info(
+ "*** sendTransactionSync: ledger={0} type={1} from={2} to={3} data={4}",
+ ledger, type, from, to, data);
+
+ try {
+ return ledgerBlockingStub.sendTransaction(SendTransactionRequest(ledger, type, from, nonce, to, data));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ throw e;
+ }
+ }
+
+ private SendTransactionRequest SendTransactionRequest(
+ String ledger, TransactionType type, String from, long nonce, String to, byte[] data) {
+
+ SendTransactionRequest.Transaction.Builder txBuilder =
+ SendTransactionRequest.Transaction.newBuilder().setType(type);
+ if (from != null) {
+ txBuilder.setFrom(ByteString.copyFrom(Utils.hexStringToByteArray(from)));
+ }
+ txBuilder.setNonce(nonce);
+ if (to != null) {
+ txBuilder.setTo(ByteString.copyFrom(Utils.hexStringToByteArray(to)));
+ }
+ if (data != null) {
+ txBuilder.setData(ByteString.copyFrom(data));
+ }
+
+ return SendTransactionRequest.newBuilder().setLedger(ledger).setTransaction(txBuilder).build();
+ }
+
+ /**
+ * 返回哈希所指定的区块
+ * (非阻塞)
+ */
+ public ListenableFuture getBlockByHash(
+ String ledger, String hash, boolean fullTransactions) {
+
+ info(
+ "*** getBlockByHash: ledger={0} hash={1} fullTransactions={2}",
+ ledger, hash, fullTransactions);
+
+ try {
+ return queryFutureStub.getBlockByHash(getBlockByHashRequest(ledger, hash, fullTransactions));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ return null;
+ }
+ }
+
+ /**
+ * 返回哈希所指定的区块
+ * (阻塞)
+ */
+ public GetBlockByHashResponse getBlockByHashSync(String ledger, String hash, boolean fullTransactions) {
+
+ info(
+ "*** getBlockByHashSync: ledger={0} hash={1} fullTransactions={2}",
+ ledger, hash, fullTransactions);
+
+ try {
+ return queryBlockingStub.getBlockByHash(getBlockByHashRequest(ledger, hash, fullTransactions));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ throw e;
+ }
+ }
+
+ private GetBlockByHashRequest getBlockByHashRequest(
+ String ledger, String hash, boolean fullTransactions) {
+
+ GetBlockByHashRequest.Builder reqBuilder =
+ GetBlockByHashRequest.newBuilder().setLedger(ledger).setFullTransactions(fullTransactions);
+ if (hash != null) {
+ reqBuilder.setHash(ByteString.copyFrom(Utils.hexStringToByteArray(hash)));
+ }
+
+ return reqBuilder.build();
+ }
+
+ /**
+ * 返回时间范围内的区块
+ * (非阻塞)
+ */
+ public ListenableFuture getBlocks(String ledger, ZonedDateTime startDateTime, IncludeTransactions includeTransactions) {
+ return getBlocks(ledger, startDateTime.toEpochSecond(), includeTransactions);
+ }
+
+ /**
+ * 返回时间范围内的区块
+ * (非阻塞)
+ */
+ public ListenableFuture getBlocks(String ledger, ZonedDateTime startDateTime, ZonedDateTime endDateTime, IncludeTransactions includeTransactions) {
+ return getBlocks(ledger, startDateTime.toEpochSecond(), endDateTime.toEpochSecond(), includeTransactions);
+ }
+
+ /**
+ * 返回时间范围内的区块
+ * (阻塞)
+ */
+ public GetBlocksResponse getBlocksSync(String ledger, ZonedDateTime startDateTime, IncludeTransactions includeTransactions) {
+ return getBlocksSync(ledger, startDateTime.toEpochSecond(), includeTransactions);
+ }
+
+ /**
+ * 返回时间范围内的区块
+ * (阻塞)
+ */
+ public GetBlocksResponse getBlocksSync(String ledger, ZonedDateTime startDateTime, ZonedDateTime endDateTime, IncludeTransactions includeTransactions) {
+ return getBlocksSync(ledger, startDateTime.toEpochSecond(), endDateTime.toEpochSecond(), includeTransactions);
+ }
+
+ /**
+ * 返回时间范围内的区块
+ * (非阻塞)
+ */
+ public ListenableFuture getBlocks(String ledger, long startUnixTime, IncludeTransactions includeTransactions) {
+ return getBlocks(ledger, startUnixTime, -1, includeTransactions);
+ }
+
+ /**
+ * 返回时间范围内的区块
+ * (非阻塞)
+ */
+ public ListenableFuture getBlocks(String ledger, long startUnixTime, long endUnixTime, IncludeTransactions includeTransactions) {
+
+ info(
+ "*** getBlocks: ledger={0} startUnixTime={1} endUnixTime={2}",
+ ledger, startUnixTime, endUnixTime);
+
+ try {
+ return queryFutureStub.getBlocks(blocksRequest(ledger, startUnixTime, endUnixTime, includeTransactions));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ return null;
+ }
+ }
+
+ /**
+ * 返回时间范围内的区块
+ * (阻塞)
+ */
+ public GetBlocksResponse getBlocksSync(String ledger, long startUnixTime, IncludeTransactions includeTransactions) {
+ return getBlocksSync(ledger, startUnixTime, -1, includeTransactions);
+ }
+
+ /**
+ * 返回时间范围内的区块
+ * (阻塞)
+ */
+ public GetBlocksResponse getBlocksSync(String ledger, long startUnixTime, long endUnixTime, IncludeTransactions includeTransactions) {
+
+ info(
+ "*** getBlocksSync: ledger={0} startUnixTime={1} endUnixTime={2}",
+ ledger, startUnixTime, endUnixTime);
+
+ try {
+ return queryBlockingStub.getBlocks(blocksRequest(ledger, startUnixTime, endUnixTime, includeTransactions));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ throw e;
+ }
+ }
+
+ /**
+ * 返回账本中的区块数量
+ * (非阻塞)
+ */
+ public ListenableFuture countBlocks(String ledger) {
+
+ info("*** blockNumber: ledger={0}", ledger);
+
+ try {
+ return queryFutureStub.countBlocks(blocksRequest(ledger, -1, -1, null));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ return null;
+ }
+ }
+
+ /**
+ * 返回账本中的区块数量
+ * (阻塞)
+ */
+ public CountBlocksResponse countBlocksSync(String ledger) {
+
+ info("*** blockNumberSync: ledger={0}", ledger);
+
+ try {
+ return queryBlockingStub.countBlocks(blocksRequest(ledger, -1, -1, null));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ throw e;
+ }
+ }
+
+ private BlocksRequest blocksRequest(String ledger, long startTimestamp, long endTimestamp, IncludeTransactions includeTransactions) {
+
+ BlocksRequest.Builder reqBuilder =
+ BlocksRequest.newBuilder().setLedger(ledger);
+ if (startTimestamp != -1) {
+ reqBuilder.setStartTimestamp(startTimestamp);
+ }
+ if (endTimestamp != -1) {
+ reqBuilder.setEndTimestamp(endTimestamp);
+ }
+ if (includeTransactions == null) {
+ includeTransactions = IncludeTransactions.NONE;
+ }
+ reqBuilder.setIncludeTransactions(includeTransactions);
+
+ return reqBuilder.build();
+ }
+
+ /**
+ * 返回时间戳最新的若干区块
+ * (非阻塞)
+ */
+ public ListenableFuture getRecentBlocks(String ledger, int count, IncludeTransactions includeTransactions) {
+
+ info(
+ "*** getRecentBlocks: ledger={0} count={1} includeTransactions={2}",
+ ledger, count, includeTransactions);
+
+ try {
+ return queryFutureStub.getRecentBlocks(getRecentBlocksRequest(ledger, count, includeTransactions));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ return null;
+ }
+ }
+
+ /**
+ * 返回时间戳最新的若干区块
+ * (阻塞)
+ */
+ public GetBlocksResponse getRecentBlocksSync(String ledger, int count, IncludeTransactions includeTransactions) {
+
+ info(
+ "*** getRecentBlocksSync: ledger={0} count={1} includeTransactions={2}",
+ ledger, count, includeTransactions);
+
+ try {
+ return queryBlockingStub.getRecentBlocks(getRecentBlocksRequest(ledger, count, includeTransactions));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ throw e;
+ }
+ }
+
+ private RecentBlocksRequest getRecentBlocksRequest(String ledger, int count, IncludeTransactions includeTransactions) {
+
+ RecentBlocksRequest.Builder reqBuilder =
+ RecentBlocksRequest.newBuilder()
+ .setLedger(ledger)
+ .setCount(count);
+ if (includeTransactions == null) {
+ includeTransactions = IncludeTransactions.NONE;
+ }
+ reqBuilder.setIncludeTransactions(includeTransactions);
+
+ return reqBuilder.build();
+ }
+
+ /**
+ * 返回哈希所指定的事务
+ * (非阻塞)
+ */
+ public ListenableFuture getTransactionByHash(String ledger, String hash) {
+
+ info("*** getTransactionByHash: ledger={0} hash={1}", ledger, hash);
+
+ try {
+ return queryFutureStub.getTransactionByHash(getTransactionByHashRequest(ledger, hash));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ return null;
+ }
+ }
+
+ /**
+ * 返回哈希所指定的事务
+ * (阻塞)
+ */
+ public GetTransactionByHashResponse getTransactionByHashSync(String ledger, String hash) {
+
+ info("*** getTransactionByHashSync: ledger={0} hash={1}", ledger, hash);
+
+ try {
+ return queryBlockingStub.getTransactionByHash(getTransactionByHashRequest(ledger, hash));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ throw e;
+ }
+ }
+
+ private GetTransactionByHashRequest getTransactionByHashRequest(String ledger, String hash) {
+
+ GetTransactionByHashRequest.Builder reqBuilder =
+ GetTransactionByHashRequest.newBuilder().setLedger(ledger);
+ if (hash != null) {
+ reqBuilder.setHash(ByteString.copyFrom(Utils.hexStringToByteArray(hash)));
+ }
+
+ return reqBuilder.build();
+ }
+
+ /**
+ * 返回区块的哈希与事务的index所指定的事务
+ * (非阻塞)
+ */
+ public ListenableFuture getTransactionByBlockHashAndIndex(
+ String ledger, String blockHash, int index) {
+
+ info(
+ "*** getTransactionByBlockHashAndIndex: ledger={0} blockHash={1} index={2}",
+ ledger, blockHash, index);
+
+ try {
+ return queryFutureStub.getTransactionByBlockHashAndIndex(
+ getTransactionByBlockHashAndIndexRequest(ledger, blockHash, index));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ return null;
+ }
+ }
+
+ /**
+ * 返回区块的哈希与事务的index所指定的事务
+ * (阻塞)
+ */
+ public GetTransactionByBlockHashAndIndexResponse getTransactionByBlockHashAndIndexSync(
+ String ledger, String blockHash, int index) {
+
+ info(
+ "*** getTransactionByBlockHashAndIndexSync: ledger={0} blockHash={1} index={2}",
+ ledger, blockHash, index);
+
+ try {
+ return queryBlockingStub.getTransactionByBlockHashAndIndex(
+ getTransactionByBlockHashAndIndexRequest(ledger, blockHash, index));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ throw e;
+ }
+ }
+
+ private GetTransactionByBlockHashAndIndexRequest getTransactionByBlockHashAndIndexRequest(
+ String ledger, String blockHash, int index) {
+
+ GetTransactionByBlockHashAndIndexRequest.Builder reqBuilder =
+ GetTransactionByBlockHashAndIndexRequest.newBuilder().setLedger(ledger).setIndex(index);
+
+ if (blockHash != null) {
+ reqBuilder.setBlockHash(ByteString.copyFrom(Utils.hexStringToByteArray(blockHash)));
+ }
+
+ return reqBuilder.build();
+ }
+
+ /**
+ * 返回时间范围内的事务
+ * (非阻塞)
+ */
+ public ListenableFuture getTransactions(String ledger, ZonedDateTime startDateTime) {
+ return getTransactions(ledger, startDateTime.toEpochSecond());
+ }
+
+ /**
+ * 返回时间范围内的事务
+ * (非阻塞)
+ */
+ public ListenableFuture getTransactions(String ledger, ZonedDateTime startDateTime, ZonedDateTime endDateTime) {
+ return getTransactions(ledger, startDateTime.toEpochSecond(), endDateTime.toEpochSecond());
+ }
+
+ /**
+ * 返回时间范围内的事务
+ * (阻塞)
+ */
+ public GetTransactionsResponse getTransactionsSync(String ledger, ZonedDateTime startDateTime) {
+ return getTransactionsSync(ledger, startDateTime.toEpochSecond());
+ }
+
+ /**
+ * 返回时间范围内的事务
+ * (阻塞)
+ */
+ public GetTransactionsResponse getTransactionsSync(String ledger, ZonedDateTime startDateTime, ZonedDateTime endDateTime) {
+ return getTransactionsSync(ledger, startDateTime.toEpochSecond(), endDateTime.toEpochSecond());
+ }
+
+ /**
+ * 返回时间范围内的事务
+ * (非阻塞)
+ */
+ public ListenableFuture getTransactions(String ledger, long startUnixTime) {
+ return getTransactions(ledger, startUnixTime, -1);
+ }
+
+ /**
+ * 返回时间范围内的事务
+ * (非阻塞)
+ */
+ public ListenableFuture getTransactions(String ledger, long startUnixTime, long endUnixTime) {
+
+ info(
+ "*** getTransactions: ledger={0} startUnixTime={1} endUnixTime={2}",
+ ledger, startUnixTime, endUnixTime);
+
+ try {
+ return queryFutureStub.getTransactions(transactionsRequest(ledger, startUnixTime, endUnixTime));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ return null;
+ }
+ }
+
+ /**
+ * 返回时间范围内的事务
+ * (阻塞)
+ */
+ public GetTransactionsResponse getTransactionsSync(String ledger, long startUnixTime) {
+ return getTransactionsSync(ledger, startUnixTime, -1);
+ }
+
+ /**
+ * 返回时间范围内的事务
+ * (阻塞)
+ */
+ public GetTransactionsResponse getTransactionsSync(String ledger, long startUnixTime, long endUnixTime) {
+
+ info(
+ "*** getTransactionsSync: ledger={0} startUnixTime={1} endUnixTime={2}",
+ ledger, startUnixTime, endUnixTime);
+
+ try {
+ return queryBlockingStub.getTransactions(transactionsRequest(ledger, startUnixTime, endUnixTime));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ throw e;
+ }
+ }
+
+ /**
+ * 返回账本中的事务数量
+ * (非阻塞)
+ */
+ public ListenableFuture countTransactions(String ledger) {
+
+ info("*** blockNumber: ledger={0}", ledger);
+
+ try {
+ return queryFutureStub.countTransactions(transactionsRequest(ledger, -1, -1));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ return null;
+ }
+ }
+
+ /**
+ * 返回账本中的事务数量
+ * (阻塞)
+ */
+ public CountTransactionsResponse countTransactionsSync(String ledger) {
+
+ info("*** blockNumberSync: ledger={0}", ledger);
+
+ try {
+ return queryBlockingStub.countTransactions(transactionsRequest(ledger, -1, -1));
+ } catch (StatusRuntimeException e) {
+ warning("RPC failed: {0}", e.getStatus());
+ throw e;
+ }
+ }
+
+ private TransactionsRequest transactionsRequest(String ledger, long startTimestamp, long endTimestamp) {
+
+ TransactionsRequest.Builder reqBuilder =
+ TransactionsRequest.newBuilder().setLedger(ledger);
+ if (startTimestamp != -1) {
+ reqBuilder.setStartTimestamp(startTimestamp);
+ }
+ if (endTimestamp != -1) {
+ reqBuilder.setEndTimestamp(endTimestamp);
+ }
+
+ return reqBuilder.build();
+ }
+
+ private void info(String msg, Object... params) {
+ logger.log(Level.INFO, msg, params);
+ }
+
+ private void warning(String msg, Object... params) {
+ logger.log(Level.WARNING, msg, params);
+ }
+}
diff --git a/src/main/java/org/bdware/bdledger/api/grpc/GrpcHeaderInterceptor.java b/src/main/java/org/bdware/bdledger/api/grpc/GrpcHeaderInterceptor.java
new file mode 100644
index 0000000..0543470
--- /dev/null
+++ b/src/main/java/org/bdware/bdledger/api/grpc/GrpcHeaderInterceptor.java
@@ -0,0 +1,33 @@
+package org.bdware.bdledger.api.grpc;
+
+import io.grpc.*;
+import io.grpc.stub.AbstractStub;
+import org.bdware.bdledger.api.grpc.pb.NodeGrpc;
+
+public class GrpcHeaderInterceptor implements ClientInterceptor {
+
+ private final Metadata extraHeaders;
+
+ public GrpcHeaderInterceptor(Metadata metadata) {
+ this.extraHeaders = metadata;
+ }
+
+ @Override
+ public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) {
+ return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) {
+ @Override
+ public void start(Listener responseListener, Metadata headers) {
+ // 合并额外的头信息
+ headers.merge(extraHeaders);
+ super.start(responseListener, headers);
+ }
+ };
+ }
+
+ public static > T attachHeaders(T stub, Metadata metadata) {
+ Channel channel = stub.getChannel();
+ Channel interceptedChannel = ClientInterceptors.intercept(channel, new GrpcHeaderInterceptor(metadata));
+ return stub.withChannel(interceptedChannel);
+ }
+
+}
diff --git a/src/main/java/org/bdware/bdledger/api/grpc/Utils.java b/src/main/java/org/bdware/bdledger/api/grpc/Utils.java
new file mode 100644
index 0000000..04cd518
--- /dev/null
+++ b/src/main/java/org/bdware/bdledger/api/grpc/Utils.java
@@ -0,0 +1,26 @@
+package org.bdware.bdledger.api.grpc;
+
+public class Utils {
+
+ public static byte[] hexStringToByteArray(String s) {
+ if (s.startsWith("0x")) {
+ s = s.substring(2);
+ }
+ int l = s.length();
+ byte[] data = new byte[l / 2];
+ for (int i = 0; i < l; i += 2) {
+ data[i / 2] =
+ (byte) ((Character.digit(s.charAt(i), 16) << 4) + Character.digit(s.charAt(i + 1), 16));
+ }
+ return data;
+ }
+
+ public static String byteArrayToHexString(byte[] bs) {
+ StringBuilder data = new StringBuilder();
+ for (byte b : bs) {
+ data.append(Integer.toHexString((b >> 4) & 0xf));
+ data.append(Integer.toHexString(b & 0xf));
+ }
+ return data.toString();
+ }
+}