fix: refind Client and Utils.

This commit is contained in:
CaiHQ 2025-03-27 10:14:42 +08:00
parent c9832f4de9
commit fa679c3cb3
4 changed files with 811 additions and 1 deletions

View File

@ -65,7 +65,6 @@ protobuf {
artifact = 'io.grpc:protoc-gen-grpc-java:' + grpc_java_version artifact = 'io.grpc:protoc-gen-grpc-java:' + grpc_java_version
} }
} }
// generatedFilesBaseDir = "$projectDir/src"
generateProtoTasks { generateProtoTasks {
all().each { task -> all().each { task ->
task.plugins { task.plugins {

View File

@ -0,0 +1,752 @@
package org.bdware.bdledger.api.grpc;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.MetadataUtils;
import org.bdware.bdledger.api.grpc.pb.CommonProto.TransactionType;
import org.bdware.bdledger.api.grpc.pb.LedgerGrpc;
import org.bdware.bdledger.api.grpc.pb.LedgerProto.*;
import org.bdware.bdledger.api.grpc.pb.NodeGrpc;
import org.bdware.bdledger.api.grpc.pb.NodeProto.ClientVersionResponse;
import org.bdware.bdledger.api.grpc.pb.QueryGrpc;
import org.bdware.bdledger.api.grpc.pb.QueryProto.*;
import java.time.ZonedDateTime;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* 事务账本客户端
*
* <p>如有更灵活的需求可直接使用{@link org.bdware.bdledger.api.grpc.pb.LedgerGrpc}
*
* @author nex
* @see <a href="#">事务账本API</a>
*/
public class Client {
private static final Logger logger = Logger.getLogger(Client.class.getName());
private ManagedChannel channel;
private NodeGrpc.NodeFutureStub nodeFutureStub;
private NodeGrpc.NodeBlockingStub nodeBlockingStub;
private LedgerGrpc.LedgerFutureStub ledgerFutureStub;
private LedgerGrpc.LedgerBlockingStub ledgerBlockingStub;
private QueryGrpc.QueryFutureStub queryFutureStub;
private QueryGrpc.QueryBlockingStub queryBlockingStub;
// private final ManagedChannel channel;
// private final NodeGrpc.NodeFutureStub nodeFutureStub;
// private final NodeGrpc.NodeBlockingStub nodeBlockingStub;
// private final LedgerGrpc.LedgerFutureStub ledgerFutureStub;
// private final LedgerGrpc.LedgerBlockingStub ledgerBlockingStub;
// private final QueryGrpc.QueryFutureStub queryFutureStub;
// private final QueryGrpc.QueryBlockingStub queryBlockingStub;
/**
* 构造客户端来向{@code host:port} BDLedger节点发请求
*/
public Client(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port).usePlaintext(), null);
}
/**
* 构造客户端使用{@code token}作access token来向{@code host:port} BDLedger节点发请求
*/
public Client(String host, int port, String token) {
this(ManagedChannelBuilder.forAddress(host, port).usePlaintext(), token);
}
/**
* 用已有的{@link io.grpc.Channel}对象构造客户端来向BDLedger节点发请求
*/
public Client(ManagedChannelBuilder<?> channelBuilder, String token) {
channel = channelBuilder.build();
nodeFutureStub = NodeGrpc.newFutureStub(channel);
nodeBlockingStub = NodeGrpc.newBlockingStub(channel);
ledgerFutureStub = LedgerGrpc.newFutureStub(channel);
ledgerBlockingStub = LedgerGrpc.newBlockingStub(channel);
queryFutureStub = QueryGrpc.newFutureStub(channel);
queryBlockingStub = QueryGrpc.newBlockingStub(channel);
if (token != null) {
Metadata fixedHeaders = new Metadata();
fixedHeaders.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), "bearer " + token);
nodeFutureStub = GrpcHeaderInterceptor.attachHeaders(nodeFutureStub, fixedHeaders);
nodeBlockingStub = GrpcHeaderInterceptor.attachHeaders(nodeBlockingStub, fixedHeaders);
ledgerFutureStub = GrpcHeaderInterceptor.attachHeaders(ledgerFutureStub, fixedHeaders);
ledgerBlockingStub = GrpcHeaderInterceptor.attachHeaders(ledgerBlockingStub, fixedHeaders);
queryFutureStub = GrpcHeaderInterceptor.attachHeaders(queryFutureStub, fixedHeaders);
queryBlockingStub = GrpcHeaderInterceptor.attachHeaders(queryBlockingStub, fixedHeaders);
}
}
/**
* 关闭客户端的网络连接
*/
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
/**
* <a
* href="#">查询BDLedger节点版本</a>
* 非阻塞
*/
public ListenableFuture<ClientVersionResponse> clientVersion() {
info("*** clientVersion");
try {
return nodeFutureStub.clientVersion(Empty.getDefaultInstance());
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
return null;
}
}
/**
* <a
* href="#">查询BDLedger节点版本</a>
* 阻塞
*/
public ClientVersionResponse clientVersionSync() {
info("*** clientVersionSync");
try {
return nodeBlockingStub.clientVersion(Empty.getDefaultInstance());
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
throw e;
}
}
/**
* <a href="#">创建账本</a>
* 非阻塞
*/
public ListenableFuture<CreateLedgerResponse> createLedger(String name) {
info("*** createLedger: name={0}", name);
try {
return ledgerFutureStub.createLedger(createLedgerRequest(name));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
return null;
}
}
/**
* <a href="#">创建账本</a>
* 阻塞
*/
public CreateLedgerResponse createLedgerSync(String name) {
info("*** createLedgerSync: name={0}", name);
try {
return ledgerBlockingStub.createLedger(createLedgerRequest(name));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
throw e;
}
}
private CreateLedgerRequest createLedgerRequest(String name) {
return CreateLedgerRequest.newBuilder().setName(name).build();
}
/**
* <a href="#">返回账本列表</a>
* 非阻塞
*/
public ListenableFuture<GetLedgersResponse> getLedgers() {
info("*** getLedgers");
try {
return ledgerFutureStub.getLedgers(Empty.getDefaultInstance());
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
return null;
}
}
/**
* <a href="#">返回账本列表</a>
* 阻塞
*/
public GetLedgersResponse getLedgersSync() {
info("*** getLedgersSync");
try {
return ledgerBlockingStub.getLedgers(Empty.getDefaultInstance());
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
throw e;
}
}
/**
* <a
* href="#">发送新事务</a>
* 非阻塞
*/
public ListenableFuture<SendTransactionResponse> sendTransaction(
String ledger, TransactionType type, String from, long nonce, String to, byte[] data) {
info(
"*** sendTransaction: ledger={0} type={1} from={2} to={3} data={4}",
ledger, type, from, to, data);
try {
return ledgerFutureStub.sendTransaction(SendTransactionRequest(ledger, type, from, nonce, to, data));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
return null;
}
}
/**
* <a
* href="#">发送新事务</a>
* 阻塞
*/
public SendTransactionResponse sendTransactionSync(
String ledger, TransactionType type, String from, long nonce, String to, byte[] data) {
info(
"*** sendTransactionSync: ledger={0} type={1} from={2} to={3} data={4}",
ledger, type, from, to, data);
try {
return ledgerBlockingStub.sendTransaction(SendTransactionRequest(ledger, type, from, nonce, to, data));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
throw e;
}
}
private SendTransactionRequest SendTransactionRequest(
String ledger, TransactionType type, String from, long nonce, String to, byte[] data) {
SendTransactionRequest.Transaction.Builder txBuilder =
SendTransactionRequest.Transaction.newBuilder().setType(type);
if (from != null) {
txBuilder.setFrom(ByteString.copyFrom(Utils.hexStringToByteArray(from)));
}
txBuilder.setNonce(nonce);
if (to != null) {
txBuilder.setTo(ByteString.copyFrom(Utils.hexStringToByteArray(to)));
}
if (data != null) {
txBuilder.setData(ByteString.copyFrom(data));
}
return SendTransactionRequest.newBuilder().setLedger(ledger).setTransaction(txBuilder).build();
}
/**
* <a
* href="#">返回哈希所指定的区块</a>
* 非阻塞
*/
public ListenableFuture<GetBlockByHashResponse> getBlockByHash(
String ledger, String hash, boolean fullTransactions) {
info(
"*** getBlockByHash: ledger={0} hash={1} fullTransactions={2}",
ledger, hash, fullTransactions);
try {
return queryFutureStub.getBlockByHash(getBlockByHashRequest(ledger, hash, fullTransactions));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
return null;
}
}
/**
* <a
* href="#">返回哈希所指定的区块</a>
* 阻塞
*/
public GetBlockByHashResponse getBlockByHashSync(String ledger, String hash, boolean fullTransactions) {
info(
"*** getBlockByHashSync: ledger={0} hash={1} fullTransactions={2}",
ledger, hash, fullTransactions);
try {
return queryBlockingStub.getBlockByHash(getBlockByHashRequest(ledger, hash, fullTransactions));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
throw e;
}
}
private GetBlockByHashRequest getBlockByHashRequest(
String ledger, String hash, boolean fullTransactions) {
GetBlockByHashRequest.Builder reqBuilder =
GetBlockByHashRequest.newBuilder().setLedger(ledger).setFullTransactions(fullTransactions);
if (hash != null) {
reqBuilder.setHash(ByteString.copyFrom(Utils.hexStringToByteArray(hash)));
}
return reqBuilder.build();
}
/**
* <a
* href="#">返回时间范围内的区块</a>
* 非阻塞
*/
public ListenableFuture<GetBlocksResponse> getBlocks(String ledger, ZonedDateTime startDateTime, IncludeTransactions includeTransactions) {
return getBlocks(ledger, startDateTime.toEpochSecond(), includeTransactions);
}
/**
* <a
* href="#">返回时间范围内的区块</a>
* 非阻塞
*/
public ListenableFuture<GetBlocksResponse> getBlocks(String ledger, ZonedDateTime startDateTime, ZonedDateTime endDateTime, IncludeTransactions includeTransactions) {
return getBlocks(ledger, startDateTime.toEpochSecond(), endDateTime.toEpochSecond(), includeTransactions);
}
/**
* <a
* href="#">返回时间范围内的区块</a>
* 阻塞
*/
public GetBlocksResponse getBlocksSync(String ledger, ZonedDateTime startDateTime, IncludeTransactions includeTransactions) {
return getBlocksSync(ledger, startDateTime.toEpochSecond(), includeTransactions);
}
/**
* <a
* href="#">返回时间范围内的区块</a>
* 阻塞
*/
public GetBlocksResponse getBlocksSync(String ledger, ZonedDateTime startDateTime, ZonedDateTime endDateTime, IncludeTransactions includeTransactions) {
return getBlocksSync(ledger, startDateTime.toEpochSecond(), endDateTime.toEpochSecond(), includeTransactions);
}
/**
* <a
* href="#">返回时间范围内的区块</a>
* 非阻塞
*/
public ListenableFuture<GetBlocksResponse> getBlocks(String ledger, long startUnixTime, IncludeTransactions includeTransactions) {
return getBlocks(ledger, startUnixTime, -1, includeTransactions);
}
/**
* <a
* href="#">返回时间范围内的区块</a>
* 非阻塞
*/
public ListenableFuture<GetBlocksResponse> getBlocks(String ledger, long startUnixTime, long endUnixTime, IncludeTransactions includeTransactions) {
info(
"*** getBlocks: ledger={0} startUnixTime={1} endUnixTime={2}",
ledger, startUnixTime, endUnixTime);
try {
return queryFutureStub.getBlocks(blocksRequest(ledger, startUnixTime, endUnixTime, includeTransactions));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
return null;
}
}
/**
* <a
* href="#">返回时间范围内的区块</a>
* 阻塞
*/
public GetBlocksResponse getBlocksSync(String ledger, long startUnixTime, IncludeTransactions includeTransactions) {
return getBlocksSync(ledger, startUnixTime, -1, includeTransactions);
}
/**
* <a
* href="#">返回时间范围内的区块</a>
* 阻塞
*/
public GetBlocksResponse getBlocksSync(String ledger, long startUnixTime, long endUnixTime, IncludeTransactions includeTransactions) {
info(
"*** getBlocksSync: ledger={0} startUnixTime={1} endUnixTime={2}",
ledger, startUnixTime, endUnixTime);
try {
return queryBlockingStub.getBlocks(blocksRequest(ledger, startUnixTime, endUnixTime, includeTransactions));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
throw e;
}
}
/**
* <a
* href="#">返回账本中的区块数量</a>
* 非阻塞
*/
public ListenableFuture<CountBlocksResponse> countBlocks(String ledger) {
info("*** blockNumber: ledger={0}", ledger);
try {
return queryFutureStub.countBlocks(blocksRequest(ledger, -1, -1, null));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
return null;
}
}
/**
* <a
* href="#">返回账本中的区块数量</a>
* 阻塞
*/
public CountBlocksResponse countBlocksSync(String ledger) {
info("*** blockNumberSync: ledger={0}", ledger);
try {
return queryBlockingStub.countBlocks(blocksRequest(ledger, -1, -1, null));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
throw e;
}
}
private BlocksRequest blocksRequest(String ledger, long startTimestamp, long endTimestamp, IncludeTransactions includeTransactions) {
BlocksRequest.Builder reqBuilder =
BlocksRequest.newBuilder().setLedger(ledger);
if (startTimestamp != -1) {
reqBuilder.setStartTimestamp(startTimestamp);
}
if (endTimestamp != -1) {
reqBuilder.setEndTimestamp(endTimestamp);
}
if (includeTransactions == null) {
includeTransactions = IncludeTransactions.NONE;
}
reqBuilder.setIncludeTransactions(includeTransactions);
return reqBuilder.build();
}
/**
* <a
* href="#">返回时间戳最新的若干区块</a>
* 非阻塞
*/
public ListenableFuture<GetBlocksResponse> getRecentBlocks(String ledger, int count, IncludeTransactions includeTransactions) {
info(
"*** getRecentBlocks: ledger={0} count={1} includeTransactions={2}",
ledger, count, includeTransactions);
try {
return queryFutureStub.getRecentBlocks(getRecentBlocksRequest(ledger, count, includeTransactions));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
return null;
}
}
/**
* <a
* href="#">返回时间戳最新的若干区块</a>
* 阻塞
*/
public GetBlocksResponse getRecentBlocksSync(String ledger, int count, IncludeTransactions includeTransactions) {
info(
"*** getRecentBlocksSync: ledger={0} count={1} includeTransactions={2}",
ledger, count, includeTransactions);
try {
return queryBlockingStub.getRecentBlocks(getRecentBlocksRequest(ledger, count, includeTransactions));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
throw e;
}
}
private RecentBlocksRequest getRecentBlocksRequest(String ledger, int count, IncludeTransactions includeTransactions) {
RecentBlocksRequest.Builder reqBuilder =
RecentBlocksRequest.newBuilder()
.setLedger(ledger)
.setCount(count);
if (includeTransactions == null) {
includeTransactions = IncludeTransactions.NONE;
}
reqBuilder.setIncludeTransactions(includeTransactions);
return reqBuilder.build();
}
/**
* <a
* href="#">返回哈希所指定的事务</a>
* 非阻塞
*/
public ListenableFuture<GetTransactionByHashResponse> getTransactionByHash(String ledger, String hash) {
info("*** getTransactionByHash: ledger={0} hash={1}", ledger, hash);
try {
return queryFutureStub.getTransactionByHash(getTransactionByHashRequest(ledger, hash));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
return null;
}
}
/**
* <a
* href="#">返回哈希所指定的事务</a>
* 阻塞
*/
public GetTransactionByHashResponse getTransactionByHashSync(String ledger, String hash) {
info("*** getTransactionByHashSync: ledger={0} hash={1}", ledger, hash);
try {
return queryBlockingStub.getTransactionByHash(getTransactionByHashRequest(ledger, hash));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
throw e;
}
}
private GetTransactionByHashRequest getTransactionByHashRequest(String ledger, String hash) {
GetTransactionByHashRequest.Builder reqBuilder =
GetTransactionByHashRequest.newBuilder().setLedger(ledger);
if (hash != null) {
reqBuilder.setHash(ByteString.copyFrom(Utils.hexStringToByteArray(hash)));
}
return reqBuilder.build();
}
/**
* <a
* href="#">返回区块的哈希与事务的index所指定的事务</a>
* 非阻塞
*/
public ListenableFuture<GetTransactionByBlockHashAndIndexResponse> getTransactionByBlockHashAndIndex(
String ledger, String blockHash, int index) {
info(
"*** getTransactionByBlockHashAndIndex: ledger={0} blockHash={1} index={2}",
ledger, blockHash, index);
try {
return queryFutureStub.getTransactionByBlockHashAndIndex(
getTransactionByBlockHashAndIndexRequest(ledger, blockHash, index));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
return null;
}
}
/**
* <a
* href="#">返回区块的哈希与事务的index所指定的事务</a>
* 阻塞
*/
public GetTransactionByBlockHashAndIndexResponse getTransactionByBlockHashAndIndexSync(
String ledger, String blockHash, int index) {
info(
"*** getTransactionByBlockHashAndIndexSync: ledger={0} blockHash={1} index={2}",
ledger, blockHash, index);
try {
return queryBlockingStub.getTransactionByBlockHashAndIndex(
getTransactionByBlockHashAndIndexRequest(ledger, blockHash, index));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
throw e;
}
}
private GetTransactionByBlockHashAndIndexRequest getTransactionByBlockHashAndIndexRequest(
String ledger, String blockHash, int index) {
GetTransactionByBlockHashAndIndexRequest.Builder reqBuilder =
GetTransactionByBlockHashAndIndexRequest.newBuilder().setLedger(ledger).setIndex(index);
if (blockHash != null) {
reqBuilder.setBlockHash(ByteString.copyFrom(Utils.hexStringToByteArray(blockHash)));
}
return reqBuilder.build();
}
/**
* <a
* href="#">返回时间范围内的事务</a>
* 非阻塞
*/
public ListenableFuture<GetTransactionsResponse> getTransactions(String ledger, ZonedDateTime startDateTime) {
return getTransactions(ledger, startDateTime.toEpochSecond());
}
/**
* <a
* href="#">返回时间范围内的事务</a>
* 非阻塞
*/
public ListenableFuture<GetTransactionsResponse> getTransactions(String ledger, ZonedDateTime startDateTime, ZonedDateTime endDateTime) {
return getTransactions(ledger, startDateTime.toEpochSecond(), endDateTime.toEpochSecond());
}
/**
* <a
* href="#">返回时间范围内的事务</a>
* 阻塞
*/
public GetTransactionsResponse getTransactionsSync(String ledger, ZonedDateTime startDateTime) {
return getTransactionsSync(ledger, startDateTime.toEpochSecond());
}
/**
* <a
* href="#">返回时间范围内的事务</a>
* 阻塞
*/
public GetTransactionsResponse getTransactionsSync(String ledger, ZonedDateTime startDateTime, ZonedDateTime endDateTime) {
return getTransactionsSync(ledger, startDateTime.toEpochSecond(), endDateTime.toEpochSecond());
}
/**
* <a
* href="#">返回时间范围内的事务</a>
* 非阻塞
*/
public ListenableFuture<GetTransactionsResponse> getTransactions(String ledger, long startUnixTime) {
return getTransactions(ledger, startUnixTime, -1);
}
/**
* <a
* href="#">返回时间范围内的事务</a>
* 非阻塞
*/
public ListenableFuture<GetTransactionsResponse> getTransactions(String ledger, long startUnixTime, long endUnixTime) {
info(
"*** getTransactions: ledger={0} startUnixTime={1} endUnixTime={2}",
ledger, startUnixTime, endUnixTime);
try {
return queryFutureStub.getTransactions(transactionsRequest(ledger, startUnixTime, endUnixTime));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
return null;
}
}
/**
* <a
* href="#">返回时间范围内的事务</a>
* 阻塞
*/
public GetTransactionsResponse getTransactionsSync(String ledger, long startUnixTime) {
return getTransactionsSync(ledger, startUnixTime, -1);
}
/**
* <a
* href="#">返回时间范围内的事务</a>
* 阻塞
*/
public GetTransactionsResponse getTransactionsSync(String ledger, long startUnixTime, long endUnixTime) {
info(
"*** getTransactionsSync: ledger={0} startUnixTime={1} endUnixTime={2}",
ledger, startUnixTime, endUnixTime);
try {
return queryBlockingStub.getTransactions(transactionsRequest(ledger, startUnixTime, endUnixTime));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
throw e;
}
}
/**
* <a
* href="#">返回账本中的事务数量</a>
* 非阻塞
*/
public ListenableFuture<CountTransactionsResponse> countTransactions(String ledger) {
info("*** blockNumber: ledger={0}", ledger);
try {
return queryFutureStub.countTransactions(transactionsRequest(ledger, -1, -1));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
return null;
}
}
/**
* <a
* href="#">返回账本中的事务数量</a>
* 阻塞
*/
public CountTransactionsResponse countTransactionsSync(String ledger) {
info("*** blockNumberSync: ledger={0}", ledger);
try {
return queryBlockingStub.countTransactions(transactionsRequest(ledger, -1, -1));
} catch (StatusRuntimeException e) {
warning("RPC failed: {0}", e.getStatus());
throw e;
}
}
private TransactionsRequest transactionsRequest(String ledger, long startTimestamp, long endTimestamp) {
TransactionsRequest.Builder reqBuilder =
TransactionsRequest.newBuilder().setLedger(ledger);
if (startTimestamp != -1) {
reqBuilder.setStartTimestamp(startTimestamp);
}
if (endTimestamp != -1) {
reqBuilder.setEndTimestamp(endTimestamp);
}
return reqBuilder.build();
}
private void info(String msg, Object... params) {
logger.log(Level.INFO, msg, params);
}
private void warning(String msg, Object... params) {
logger.log(Level.WARNING, msg, params);
}
}

View File

@ -0,0 +1,33 @@
package org.bdware.bdledger.api.grpc;
import io.grpc.*;
import io.grpc.stub.AbstractStub;
import org.bdware.bdledger.api.grpc.pb.NodeGrpc;
public class GrpcHeaderInterceptor implements ClientInterceptor {
private final Metadata extraHeaders;
public GrpcHeaderInterceptor(Metadata metadata) {
this.extraHeaders = metadata;
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT,RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
// 合并额外的头信息
headers.merge(extraHeaders);
super.start(responseListener, headers);
}
};
}
public static <T extends AbstractStub<T>> T attachHeaders(T stub, Metadata metadata) {
Channel channel = stub.getChannel();
Channel interceptedChannel = ClientInterceptors.intercept(channel, new GrpcHeaderInterceptor(metadata));
return stub.withChannel(interceptedChannel);
}
}

View File

@ -0,0 +1,26 @@
package org.bdware.bdledger.api.grpc;
public class Utils {
public static byte[] hexStringToByteArray(String s) {
if (s.startsWith("0x")) {
s = s.substring(2);
}
int l = s.length();
byte[] data = new byte[l / 2];
for (int i = 0; i < l; i += 2) {
data[i / 2] =
(byte) ((Character.digit(s.charAt(i), 16) << 4) + Character.digit(s.charAt(i + 1), 16));
}
return data;
}
public static String byteArrayToHexString(byte[] bs) {
StringBuilder data = new StringBuilder();
for (byte b : bs) {
data.append(Integer.toHexString((b >> 4) & 0xf));
data.append(Integer.toHexString(b & 0xf));
}
return data.toString();
}
}