From fa679c3cb3f69a08b4f0cb39f430ee15ddb606bc Mon Sep 17 00:00:00 2001 From: CaiHQ Date: Thu, 27 Mar 2025 10:14:42 +0800 Subject: [PATCH] fix: refind Client and Utils. --- build.gradle | 1 - .../org/bdware/bdledger/api/grpc/Client.java | 752 ++++++++++++++++++ .../api/grpc/GrpcHeaderInterceptor.java | 33 + .../org/bdware/bdledger/api/grpc/Utils.java | 26 + 4 files changed, 811 insertions(+), 1 deletion(-) create mode 100644 src/main/java/org/bdware/bdledger/api/grpc/Client.java create mode 100644 src/main/java/org/bdware/bdledger/api/grpc/GrpcHeaderInterceptor.java create mode 100644 src/main/java/org/bdware/bdledger/api/grpc/Utils.java diff --git a/build.gradle b/build.gradle index d6e854f..395f82a 100644 --- a/build.gradle +++ b/build.gradle @@ -65,7 +65,6 @@ protobuf { artifact = 'io.grpc:protoc-gen-grpc-java:' + grpc_java_version } } - // generatedFilesBaseDir = "$projectDir/src" generateProtoTasks { all().each { task -> task.plugins { diff --git a/src/main/java/org/bdware/bdledger/api/grpc/Client.java b/src/main/java/org/bdware/bdledger/api/grpc/Client.java new file mode 100644 index 0000000..4c8cd5f --- /dev/null +++ b/src/main/java/org/bdware/bdledger/api/grpc/Client.java @@ -0,0 +1,752 @@ +package org.bdware.bdledger.api.grpc; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.ByteString; +import com.google.protobuf.Empty; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.MetadataUtils; +import org.bdware.bdledger.api.grpc.pb.CommonProto.TransactionType; +import org.bdware.bdledger.api.grpc.pb.LedgerGrpc; +import org.bdware.bdledger.api.grpc.pb.LedgerProto.*; +import org.bdware.bdledger.api.grpc.pb.NodeGrpc; +import org.bdware.bdledger.api.grpc.pb.NodeProto.ClientVersionResponse; +import org.bdware.bdledger.api.grpc.pb.QueryGrpc; +import org.bdware.bdledger.api.grpc.pb.QueryProto.*; + +import java.time.ZonedDateTime; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * 事务账本客户端 + * + *

如有更灵活的需求可直接使用{@link org.bdware.bdledger.api.grpc.pb.LedgerGrpc}类。 + * + * @author nex + * @see 事务账本API + */ +public class Client { + + private static final Logger logger = Logger.getLogger(Client.class.getName()); + + private ManagedChannel channel; + private NodeGrpc.NodeFutureStub nodeFutureStub; + private NodeGrpc.NodeBlockingStub nodeBlockingStub; + private LedgerGrpc.LedgerFutureStub ledgerFutureStub; + private LedgerGrpc.LedgerBlockingStub ledgerBlockingStub; + private QueryGrpc.QueryFutureStub queryFutureStub; + private QueryGrpc.QueryBlockingStub queryBlockingStub; +// private final ManagedChannel channel; +// private final NodeGrpc.NodeFutureStub nodeFutureStub; +// private final NodeGrpc.NodeBlockingStub nodeBlockingStub; +// private final LedgerGrpc.LedgerFutureStub ledgerFutureStub; +// private final LedgerGrpc.LedgerBlockingStub ledgerBlockingStub; +// private final QueryGrpc.QueryFutureStub queryFutureStub; +// private final QueryGrpc.QueryBlockingStub queryBlockingStub; + + /** + * 构造客户端来向{@code host:port} BDLedger节点发请求。 + */ + public Client(String host, int port) { + this(ManagedChannelBuilder.forAddress(host, port).usePlaintext(), null); + } + + /** + * 构造客户端,使用{@code token}作access token来向{@code host:port} BDLedger节点发请求。 + */ + public Client(String host, int port, String token) { + this(ManagedChannelBuilder.forAddress(host, port).usePlaintext(), token); + } + + /** + * 用已有的{@link io.grpc.Channel}对象构造客户端来向BDLedger节点发请求。 + */ + public Client(ManagedChannelBuilder channelBuilder, String token) { + channel = channelBuilder.build(); + + nodeFutureStub = NodeGrpc.newFutureStub(channel); + nodeBlockingStub = NodeGrpc.newBlockingStub(channel); + ledgerFutureStub = LedgerGrpc.newFutureStub(channel); + ledgerBlockingStub = LedgerGrpc.newBlockingStub(channel); + queryFutureStub = QueryGrpc.newFutureStub(channel); + queryBlockingStub = QueryGrpc.newBlockingStub(channel); + + if (token != null) { + Metadata fixedHeaders = new Metadata(); + fixedHeaders.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), "bearer " + token); + nodeFutureStub = GrpcHeaderInterceptor.attachHeaders(nodeFutureStub, fixedHeaders); + nodeBlockingStub = GrpcHeaderInterceptor.attachHeaders(nodeBlockingStub, fixedHeaders); + ledgerFutureStub = GrpcHeaderInterceptor.attachHeaders(ledgerFutureStub, fixedHeaders); + ledgerBlockingStub = GrpcHeaderInterceptor.attachHeaders(ledgerBlockingStub, fixedHeaders); + queryFutureStub = GrpcHeaderInterceptor.attachHeaders(queryFutureStub, fixedHeaders); + queryBlockingStub = GrpcHeaderInterceptor.attachHeaders(queryBlockingStub, fixedHeaders); + } + } + + /** + * 关闭客户端的网络连接。 + */ + public void shutdown() throws InterruptedException { + channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + } + + /** + * 查询BDLedger节点版本 + * (非阻塞) + */ + public ListenableFuture clientVersion() { + + info("*** clientVersion"); + + try { + return nodeFutureStub.clientVersion(Empty.getDefaultInstance()); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + return null; + } + } + + /** + * 查询BDLedger节点版本 + * (阻塞) + */ + public ClientVersionResponse clientVersionSync() { + + info("*** clientVersionSync"); + + try { + return nodeBlockingStub.clientVersion(Empty.getDefaultInstance()); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + throw e; + } + } + + /** + * 创建账本 + * (非阻塞) + */ + public ListenableFuture createLedger(String name) { + + info("*** createLedger: name={0}", name); + + try { + return ledgerFutureStub.createLedger(createLedgerRequest(name)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + return null; + } + } + + /** + * 创建账本 + * (阻塞) + */ + public CreateLedgerResponse createLedgerSync(String name) { + + info("*** createLedgerSync: name={0}", name); + + try { + return ledgerBlockingStub.createLedger(createLedgerRequest(name)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + throw e; + } + } + + private CreateLedgerRequest createLedgerRequest(String name) { + return CreateLedgerRequest.newBuilder().setName(name).build(); + } + + /** + * 返回账本列表 + * (非阻塞) + */ + public ListenableFuture getLedgers() { + + info("*** getLedgers"); + + try { + return ledgerFutureStub.getLedgers(Empty.getDefaultInstance()); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + return null; + } + } + + /** + * 返回账本列表 + * (阻塞) + */ + public GetLedgersResponse getLedgersSync() { + + info("*** getLedgersSync"); + + try { + return ledgerBlockingStub.getLedgers(Empty.getDefaultInstance()); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + throw e; + } + } + + /** + * 发送新事务 + * (非阻塞) + */ + public ListenableFuture sendTransaction( + String ledger, TransactionType type, String from, long nonce, String to, byte[] data) { + + info( + "*** sendTransaction: ledger={0} type={1} from={2} to={3} data={4}", + ledger, type, from, to, data); + + try { + return ledgerFutureStub.sendTransaction(SendTransactionRequest(ledger, type, from, nonce, to, data)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + return null; + } + } + + /** + * 发送新事务 + * (阻塞) + */ + public SendTransactionResponse sendTransactionSync( + String ledger, TransactionType type, String from, long nonce, String to, byte[] data) { + + info( + "*** sendTransactionSync: ledger={0} type={1} from={2} to={3} data={4}", + ledger, type, from, to, data); + + try { + return ledgerBlockingStub.sendTransaction(SendTransactionRequest(ledger, type, from, nonce, to, data)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + throw e; + } + } + + private SendTransactionRequest SendTransactionRequest( + String ledger, TransactionType type, String from, long nonce, String to, byte[] data) { + + SendTransactionRequest.Transaction.Builder txBuilder = + SendTransactionRequest.Transaction.newBuilder().setType(type); + if (from != null) { + txBuilder.setFrom(ByteString.copyFrom(Utils.hexStringToByteArray(from))); + } + txBuilder.setNonce(nonce); + if (to != null) { + txBuilder.setTo(ByteString.copyFrom(Utils.hexStringToByteArray(to))); + } + if (data != null) { + txBuilder.setData(ByteString.copyFrom(data)); + } + + return SendTransactionRequest.newBuilder().setLedger(ledger).setTransaction(txBuilder).build(); + } + + /** + * 返回哈希所指定的区块 + * (非阻塞) + */ + public ListenableFuture getBlockByHash( + String ledger, String hash, boolean fullTransactions) { + + info( + "*** getBlockByHash: ledger={0} hash={1} fullTransactions={2}", + ledger, hash, fullTransactions); + + try { + return queryFutureStub.getBlockByHash(getBlockByHashRequest(ledger, hash, fullTransactions)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + return null; + } + } + + /** + * 返回哈希所指定的区块 + * (阻塞) + */ + public GetBlockByHashResponse getBlockByHashSync(String ledger, String hash, boolean fullTransactions) { + + info( + "*** getBlockByHashSync: ledger={0} hash={1} fullTransactions={2}", + ledger, hash, fullTransactions); + + try { + return queryBlockingStub.getBlockByHash(getBlockByHashRequest(ledger, hash, fullTransactions)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + throw e; + } + } + + private GetBlockByHashRequest getBlockByHashRequest( + String ledger, String hash, boolean fullTransactions) { + + GetBlockByHashRequest.Builder reqBuilder = + GetBlockByHashRequest.newBuilder().setLedger(ledger).setFullTransactions(fullTransactions); + if (hash != null) { + reqBuilder.setHash(ByteString.copyFrom(Utils.hexStringToByteArray(hash))); + } + + return reqBuilder.build(); + } + + /** + * 返回时间范围内的区块 + * (非阻塞) + */ + public ListenableFuture getBlocks(String ledger, ZonedDateTime startDateTime, IncludeTransactions includeTransactions) { + return getBlocks(ledger, startDateTime.toEpochSecond(), includeTransactions); + } + + /** + * 返回时间范围内的区块 + * (非阻塞) + */ + public ListenableFuture getBlocks(String ledger, ZonedDateTime startDateTime, ZonedDateTime endDateTime, IncludeTransactions includeTransactions) { + return getBlocks(ledger, startDateTime.toEpochSecond(), endDateTime.toEpochSecond(), includeTransactions); + } + + /** + * 返回时间范围内的区块 + * (阻塞) + */ + public GetBlocksResponse getBlocksSync(String ledger, ZonedDateTime startDateTime, IncludeTransactions includeTransactions) { + return getBlocksSync(ledger, startDateTime.toEpochSecond(), includeTransactions); + } + + /** + * 返回时间范围内的区块 + * (阻塞) + */ + public GetBlocksResponse getBlocksSync(String ledger, ZonedDateTime startDateTime, ZonedDateTime endDateTime, IncludeTransactions includeTransactions) { + return getBlocksSync(ledger, startDateTime.toEpochSecond(), endDateTime.toEpochSecond(), includeTransactions); + } + + /** + * 返回时间范围内的区块 + * (非阻塞) + */ + public ListenableFuture getBlocks(String ledger, long startUnixTime, IncludeTransactions includeTransactions) { + return getBlocks(ledger, startUnixTime, -1, includeTransactions); + } + + /** + * 返回时间范围内的区块 + * (非阻塞) + */ + public ListenableFuture getBlocks(String ledger, long startUnixTime, long endUnixTime, IncludeTransactions includeTransactions) { + + info( + "*** getBlocks: ledger={0} startUnixTime={1} endUnixTime={2}", + ledger, startUnixTime, endUnixTime); + + try { + return queryFutureStub.getBlocks(blocksRequest(ledger, startUnixTime, endUnixTime, includeTransactions)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + return null; + } + } + + /** + * 返回时间范围内的区块 + * (阻塞) + */ + public GetBlocksResponse getBlocksSync(String ledger, long startUnixTime, IncludeTransactions includeTransactions) { + return getBlocksSync(ledger, startUnixTime, -1, includeTransactions); + } + + /** + * 返回时间范围内的区块 + * (阻塞) + */ + public GetBlocksResponse getBlocksSync(String ledger, long startUnixTime, long endUnixTime, IncludeTransactions includeTransactions) { + + info( + "*** getBlocksSync: ledger={0} startUnixTime={1} endUnixTime={2}", + ledger, startUnixTime, endUnixTime); + + try { + return queryBlockingStub.getBlocks(blocksRequest(ledger, startUnixTime, endUnixTime, includeTransactions)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + throw e; + } + } + + /** + * 返回账本中的区块数量 + * (非阻塞) + */ + public ListenableFuture countBlocks(String ledger) { + + info("*** blockNumber: ledger={0}", ledger); + + try { + return queryFutureStub.countBlocks(blocksRequest(ledger, -1, -1, null)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + return null; + } + } + + /** + * 返回账本中的区块数量 + * (阻塞) + */ + public CountBlocksResponse countBlocksSync(String ledger) { + + info("*** blockNumberSync: ledger={0}", ledger); + + try { + return queryBlockingStub.countBlocks(blocksRequest(ledger, -1, -1, null)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + throw e; + } + } + + private BlocksRequest blocksRequest(String ledger, long startTimestamp, long endTimestamp, IncludeTransactions includeTransactions) { + + BlocksRequest.Builder reqBuilder = + BlocksRequest.newBuilder().setLedger(ledger); + if (startTimestamp != -1) { + reqBuilder.setStartTimestamp(startTimestamp); + } + if (endTimestamp != -1) { + reqBuilder.setEndTimestamp(endTimestamp); + } + if (includeTransactions == null) { + includeTransactions = IncludeTransactions.NONE; + } + reqBuilder.setIncludeTransactions(includeTransactions); + + return reqBuilder.build(); + } + + /** + * 返回时间戳最新的若干区块 + * (非阻塞) + */ + public ListenableFuture getRecentBlocks(String ledger, int count, IncludeTransactions includeTransactions) { + + info( + "*** getRecentBlocks: ledger={0} count={1} includeTransactions={2}", + ledger, count, includeTransactions); + + try { + return queryFutureStub.getRecentBlocks(getRecentBlocksRequest(ledger, count, includeTransactions)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + return null; + } + } + + /** + * 返回时间戳最新的若干区块 + * (阻塞) + */ + public GetBlocksResponse getRecentBlocksSync(String ledger, int count, IncludeTransactions includeTransactions) { + + info( + "*** getRecentBlocksSync: ledger={0} count={1} includeTransactions={2}", + ledger, count, includeTransactions); + + try { + return queryBlockingStub.getRecentBlocks(getRecentBlocksRequest(ledger, count, includeTransactions)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + throw e; + } + } + + private RecentBlocksRequest getRecentBlocksRequest(String ledger, int count, IncludeTransactions includeTransactions) { + + RecentBlocksRequest.Builder reqBuilder = + RecentBlocksRequest.newBuilder() + .setLedger(ledger) + .setCount(count); + if (includeTransactions == null) { + includeTransactions = IncludeTransactions.NONE; + } + reqBuilder.setIncludeTransactions(includeTransactions); + + return reqBuilder.build(); + } + + /** + * 返回哈希所指定的事务 + * (非阻塞) + */ + public ListenableFuture getTransactionByHash(String ledger, String hash) { + + info("*** getTransactionByHash: ledger={0} hash={1}", ledger, hash); + + try { + return queryFutureStub.getTransactionByHash(getTransactionByHashRequest(ledger, hash)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + return null; + } + } + + /** + * 返回哈希所指定的事务 + * (阻塞) + */ + public GetTransactionByHashResponse getTransactionByHashSync(String ledger, String hash) { + + info("*** getTransactionByHashSync: ledger={0} hash={1}", ledger, hash); + + try { + return queryBlockingStub.getTransactionByHash(getTransactionByHashRequest(ledger, hash)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + throw e; + } + } + + private GetTransactionByHashRequest getTransactionByHashRequest(String ledger, String hash) { + + GetTransactionByHashRequest.Builder reqBuilder = + GetTransactionByHashRequest.newBuilder().setLedger(ledger); + if (hash != null) { + reqBuilder.setHash(ByteString.copyFrom(Utils.hexStringToByteArray(hash))); + } + + return reqBuilder.build(); + } + + /** + * 返回区块的哈希与事务的index所指定的事务 + * (非阻塞) + */ + public ListenableFuture getTransactionByBlockHashAndIndex( + String ledger, String blockHash, int index) { + + info( + "*** getTransactionByBlockHashAndIndex: ledger={0} blockHash={1} index={2}", + ledger, blockHash, index); + + try { + return queryFutureStub.getTransactionByBlockHashAndIndex( + getTransactionByBlockHashAndIndexRequest(ledger, blockHash, index)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + return null; + } + } + + /** + * 返回区块的哈希与事务的index所指定的事务 + * (阻塞) + */ + public GetTransactionByBlockHashAndIndexResponse getTransactionByBlockHashAndIndexSync( + String ledger, String blockHash, int index) { + + info( + "*** getTransactionByBlockHashAndIndexSync: ledger={0} blockHash={1} index={2}", + ledger, blockHash, index); + + try { + return queryBlockingStub.getTransactionByBlockHashAndIndex( + getTransactionByBlockHashAndIndexRequest(ledger, blockHash, index)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + throw e; + } + } + + private GetTransactionByBlockHashAndIndexRequest getTransactionByBlockHashAndIndexRequest( + String ledger, String blockHash, int index) { + + GetTransactionByBlockHashAndIndexRequest.Builder reqBuilder = + GetTransactionByBlockHashAndIndexRequest.newBuilder().setLedger(ledger).setIndex(index); + + if (blockHash != null) { + reqBuilder.setBlockHash(ByteString.copyFrom(Utils.hexStringToByteArray(blockHash))); + } + + return reqBuilder.build(); + } + + /** + * 返回时间范围内的事务 + * (非阻塞) + */ + public ListenableFuture getTransactions(String ledger, ZonedDateTime startDateTime) { + return getTransactions(ledger, startDateTime.toEpochSecond()); + } + + /** + * 返回时间范围内的事务 + * (非阻塞) + */ + public ListenableFuture getTransactions(String ledger, ZonedDateTime startDateTime, ZonedDateTime endDateTime) { + return getTransactions(ledger, startDateTime.toEpochSecond(), endDateTime.toEpochSecond()); + } + + /** + * 返回时间范围内的事务 + * (阻塞) + */ + public GetTransactionsResponse getTransactionsSync(String ledger, ZonedDateTime startDateTime) { + return getTransactionsSync(ledger, startDateTime.toEpochSecond()); + } + + /** + * 返回时间范围内的事务 + * (阻塞) + */ + public GetTransactionsResponse getTransactionsSync(String ledger, ZonedDateTime startDateTime, ZonedDateTime endDateTime) { + return getTransactionsSync(ledger, startDateTime.toEpochSecond(), endDateTime.toEpochSecond()); + } + + /** + * 返回时间范围内的事务 + * (非阻塞) + */ + public ListenableFuture getTransactions(String ledger, long startUnixTime) { + return getTransactions(ledger, startUnixTime, -1); + } + + /** + * 返回时间范围内的事务 + * (非阻塞) + */ + public ListenableFuture getTransactions(String ledger, long startUnixTime, long endUnixTime) { + + info( + "*** getTransactions: ledger={0} startUnixTime={1} endUnixTime={2}", + ledger, startUnixTime, endUnixTime); + + try { + return queryFutureStub.getTransactions(transactionsRequest(ledger, startUnixTime, endUnixTime)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + return null; + } + } + + /** + * 返回时间范围内的事务 + * (阻塞) + */ + public GetTransactionsResponse getTransactionsSync(String ledger, long startUnixTime) { + return getTransactionsSync(ledger, startUnixTime, -1); + } + + /** + * 返回时间范围内的事务 + * (阻塞) + */ + public GetTransactionsResponse getTransactionsSync(String ledger, long startUnixTime, long endUnixTime) { + + info( + "*** getTransactionsSync: ledger={0} startUnixTime={1} endUnixTime={2}", + ledger, startUnixTime, endUnixTime); + + try { + return queryBlockingStub.getTransactions(transactionsRequest(ledger, startUnixTime, endUnixTime)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + throw e; + } + } + + /** + * 返回账本中的事务数量 + * (非阻塞) + */ + public ListenableFuture countTransactions(String ledger) { + + info("*** blockNumber: ledger={0}", ledger); + + try { + return queryFutureStub.countTransactions(transactionsRequest(ledger, -1, -1)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + return null; + } + } + + /** + * 返回账本中的事务数量 + * (阻塞) + */ + public CountTransactionsResponse countTransactionsSync(String ledger) { + + info("*** blockNumberSync: ledger={0}", ledger); + + try { + return queryBlockingStub.countTransactions(transactionsRequest(ledger, -1, -1)); + } catch (StatusRuntimeException e) { + warning("RPC failed: {0}", e.getStatus()); + throw e; + } + } + + private TransactionsRequest transactionsRequest(String ledger, long startTimestamp, long endTimestamp) { + + TransactionsRequest.Builder reqBuilder = + TransactionsRequest.newBuilder().setLedger(ledger); + if (startTimestamp != -1) { + reqBuilder.setStartTimestamp(startTimestamp); + } + if (endTimestamp != -1) { + reqBuilder.setEndTimestamp(endTimestamp); + } + + return reqBuilder.build(); + } + + private void info(String msg, Object... params) { + logger.log(Level.INFO, msg, params); + } + + private void warning(String msg, Object... params) { + logger.log(Level.WARNING, msg, params); + } +} diff --git a/src/main/java/org/bdware/bdledger/api/grpc/GrpcHeaderInterceptor.java b/src/main/java/org/bdware/bdledger/api/grpc/GrpcHeaderInterceptor.java new file mode 100644 index 0000000..0543470 --- /dev/null +++ b/src/main/java/org/bdware/bdledger/api/grpc/GrpcHeaderInterceptor.java @@ -0,0 +1,33 @@ +package org.bdware.bdledger.api.grpc; + +import io.grpc.*; +import io.grpc.stub.AbstractStub; +import org.bdware.bdledger.api.grpc.pb.NodeGrpc; + +public class GrpcHeaderInterceptor implements ClientInterceptor { + + private final Metadata extraHeaders; + + public GrpcHeaderInterceptor(Metadata metadata) { + this.extraHeaders = metadata; + } + + @Override + public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { + return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + // 合并额外的头信息 + headers.merge(extraHeaders); + super.start(responseListener, headers); + } + }; + } + + public static > T attachHeaders(T stub, Metadata metadata) { + Channel channel = stub.getChannel(); + Channel interceptedChannel = ClientInterceptors.intercept(channel, new GrpcHeaderInterceptor(metadata)); + return stub.withChannel(interceptedChannel); + } + +} diff --git a/src/main/java/org/bdware/bdledger/api/grpc/Utils.java b/src/main/java/org/bdware/bdledger/api/grpc/Utils.java new file mode 100644 index 0000000..04cd518 --- /dev/null +++ b/src/main/java/org/bdware/bdledger/api/grpc/Utils.java @@ -0,0 +1,26 @@ +package org.bdware.bdledger.api.grpc; + +public class Utils { + + public static byte[] hexStringToByteArray(String s) { + if (s.startsWith("0x")) { + s = s.substring(2); + } + int l = s.length(); + byte[] data = new byte[l / 2]; + for (int i = 0; i < l; i += 2) { + data[i / 2] = + (byte) ((Character.digit(s.charAt(i), 16) << 4) + Character.digit(s.charAt(i + 1), 16)); + } + return data; + } + + public static String byteArrayToHexString(byte[] bs) { + StringBuilder data = new StringBuilder(); + for (byte b : bs) { + data.append(Integer.toHexString((b >> 4) & 0xf)); + data.append(Integer.toHexString(b & 0xf)); + } + return data.toString(); + } +}