Files
go-trustlog/api/queryclient/client.go
ryan 0ec1d3b87d refactor: 更改module路径为独立仓库路径
- go.yandata.net/iod/iod/go-trustlog → go.yandata.net/wangsiyuan/go-trustlog
- 更新 go.mod module声明
- 更新 README.md 安装说明
- 批量更新所有 .go 文件中的 import 路径
- 61个文件受影响

这样go-trustlog可以作为独立SDK使用
2025-12-26 14:35:39 +08:00

442 lines
12 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package queryclient
import (
"context"
"errors"
"fmt"
"io"
"time"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
"go.yandata.net/wangsiyuan/go-trustlog/api/grpc/pb"
"go.yandata.net/wangsiyuan/go-trustlog/api/logger"
"go.yandata.net/wangsiyuan/go-trustlog/api/model"
"go.yandata.net/wangsiyuan/go-trustlog/internal/grpcclient"
)
const (
// defaultChannelBuffer 是channel的默认缓冲区大小.
defaultChannelBuffer = 10
)
// serverClients 封装单个服务器的两种服务客户端.
type serverClients struct {
opClient pb.OperationValidationServiceClient
recClient pb.RecordValidationServiceClient
}
// Client 查询客户端包装gRPC客户端提供操作和记录的查询及验证功能.
type Client struct {
connLB *grpcclient.LoadBalancer[*serverClients]
logger logger.Logger
}
// ClientConfig 客户端配置.
type ClientConfig = grpcclient.Config
// NewClient 创建新的查询客户端.
func NewClient(config ClientConfig, logger logger.Logger) (*Client, error) {
// 获取服务器地址列表
addrs, err := config.GetAddrs()
if err != nil {
return nil, err
}
// 创建连接负载均衡器,每个连接同时创建两种服务的客户端
connLB, err := grpcclient.NewLoadBalancer(
addrs,
config.DialOptions,
func(conn grpc.ClientConnInterface) *serverClients {
return &serverClients{
opClient: pb.NewOperationValidationServiceClient(conn),
recClient: pb.NewRecordValidationServiceClient(conn),
}
},
)
if err != nil {
return nil, err
}
logger.Info("Query client initialized", "serverCount", len(addrs))
return &Client{
connLB: connLB,
logger: logger,
}, nil
}
// ListOperationsRequest 列表查询请求参数.
type ListOperationsRequest struct {
// 分页参数
PageSize uint64 // 页面大小
PreTime time.Time // 上一页最后一个时间(用于游标分页)
// 可选过滤条件
Timestamp *time.Time // 操作时间戳
OpSource model.Source // 操作来源
OpCode model.OpCode // 操作代码int32
DoPrefix string // 数据前缀
DoRepository string // 数据仓库
}
// ListOperationsResponse 列表查询响应.
type ListOperationsResponse struct {
Count int64 // 数据总量
Data []*model.Operation // 操作列表
}
// ListOperations 查询操作列表.
func (c *Client) ListOperations(ctx context.Context, req ListOperationsRequest) (*ListOperationsResponse, error) {
c.logger.DebugContext(ctx, "Querying operations list", "pageSize", req.PageSize)
// 使用负载均衡器获取客户端
clients := c.connLB.Next()
client := clients.opClient
// 构建protobuf请求
pbReq := &pb.ListOperationReq{
PageSize: req.PageSize,
OpSource: string(req.OpSource),
OpCode: int32(req.OpCode),
DoPrefix: req.DoPrefix,
DoRepository: req.DoRepository,
}
// 设置可选参数
if !req.PreTime.IsZero() {
pbReq.PreTime = timestamppb.New(req.PreTime)
}
if req.Timestamp != nil {
pbReq.Timestamp = timestamppb.New(*req.Timestamp)
}
// 调用gRPC
pbRes, err := client.ListOperations(ctx, pbReq)
if err != nil {
return nil, fmt.Errorf("failed to list operations: %w", err)
}
// 转换响应
operations := make([]*model.Operation, 0, len(pbRes.GetData()))
for _, pbOp := range pbRes.GetData() {
op, convertErr := model.FromProtobuf(pbOp)
if convertErr != nil {
c.logger.ErrorContext(ctx, "Failed to convert operation", "error", convertErr)
continue
}
operations = append(operations, op)
}
return &ListOperationsResponse{
Count: pbRes.GetCount(),
Data: operations,
}, nil
}
// ValidationRequest 取证验证请求参数.
type ValidationRequest struct {
Time time.Time // 操作时间戳
OpID string // 操作唯一标识符
OpCode model.OpCode // 操作代码int32
DoRepository string // 数据仓库标识
}
// ValidateOperation 执行操作取证验证,返回流式结果通道
// 该方法会启动一个goroutine接收流式响应通过返回的channel发送结果
// 当流结束或发生错误时channel会被关闭.
//
//nolint:dupl // 与 ValidateRecord 有相似逻辑,但处理不同的数据类型和 gRPC 服务
func (c *Client) ValidateOperation(ctx context.Context, req ValidationRequest) (<-chan *model.ValidationResult, error) {
c.logger.InfoContext(ctx, "Starting validation for operation", "opID", req.OpID)
// 使用负载均衡器获取客户端
clients := c.connLB.Next()
client := clients.opClient
// 构建protobuf请求
pbReq := &pb.ValidationReq{
Time: timestamppb.New(req.Time),
OpId: req.OpID,
OpCode: int32(req.OpCode),
DoRepository: req.DoRepository,
}
// 调用gRPC流式方法
stream, err := client.ValidateOperation(ctx, pbReq)
if err != nil {
return nil, fmt.Errorf("failed to start validation: %w", err)
}
// 创建结果通道
resultChan := make(chan *model.ValidationResult, defaultChannelBuffer)
// 启动goroutine接收流式响应
go func() {
defer close(resultChan)
for {
pbRes, recvErr := stream.Recv()
if recvErr != nil {
if errors.Is(recvErr, io.EOF) {
// 流正常结束
c.logger.DebugContext(ctx, "Validation stream completed", "opID", req.OpID)
return
}
// 发生错误
c.logger.ErrorContext(ctx, "Error receiving validation result", "error", recvErr)
// 发送错误结果
resultChan <- &model.ValidationResult{
Code: model.ValidationCodeFailed,
Msg: fmt.Sprintf("Stream error: %v", recvErr),
}
return
}
// 转换并发送结果
result, convertErr := model.FromProtobufValidationResult(pbRes)
if convertErr != nil {
c.logger.ErrorContext(ctx, "Failed to convert validation result", "error", convertErr)
continue
}
select {
case resultChan <- result:
c.logger.DebugContext(ctx, "Sent validation result", "code", result.Code, "progress", result.Progress)
case <-ctx.Done():
c.logger.InfoContext(ctx, "Context cancelled, stopping validation stream")
return
}
}
}()
return resultChan, nil
}
// ValidateOperationSync 同步执行操作取证验证,阻塞直到获得最终结果
// 该方法会处理所有中间进度,只返回最终的完成结果.
func (c *Client) ValidateOperationSync(
ctx context.Context,
req ValidationRequest,
progressCallback func(*model.ValidationResult),
) (*model.ValidationResult, error) {
resultChan, err := c.ValidateOperation(ctx, req)
if err != nil {
return nil, err
}
var finalResult *model.ValidationResult
for result := range resultChan {
if result.IsCompleted() || result.IsFailed() {
finalResult = result
break
}
// 如果提供了进度回调,则调用
if progressCallback != nil {
progressCallback(result)
}
}
if finalResult == nil {
return nil, errors.New("validation completed without final result")
}
return finalResult, nil
}
// ListRecordsRequest 列表查询请求参数.
type ListRecordsRequest struct {
// 分页参数
PageSize uint64 // 页面大小
PreTime time.Time // 上一页最后一个时间(用于游标分页)
// 可选过滤条件
DoPrefix string // 数据前缀
RCType string // 记录类型
}
// ListRecordsResponse 列表查询响应.
type ListRecordsResponse struct {
Count int64 // 数据总量
Data []*model.Record // 记录列表
}
// ListRecords 查询记录列表.
func (c *Client) ListRecords(ctx context.Context, req ListRecordsRequest) (*ListRecordsResponse, error) {
c.logger.DebugContext(ctx, "Querying records list", "pageSize", req.PageSize)
// 使用负载均衡器获取客户端
clients := c.connLB.Next()
client := clients.recClient
// 构建protobuf请求
pbReq := &pb.ListRecordReq{
PageSize: req.PageSize,
DoPrefix: req.DoPrefix,
RcType: req.RCType,
}
// 设置可选参数
if !req.PreTime.IsZero() {
pbReq.PreTime = timestamppb.New(req.PreTime)
}
// 调用gRPC
pbRes, err := client.ListRecords(ctx, pbReq)
if err != nil {
return nil, fmt.Errorf("failed to list records: %w", err)
}
// 转换响应
records := make([]*model.Record, 0, len(pbRes.GetData()))
for _, pbRec := range pbRes.GetData() {
rec, convertErr := model.RecordFromProtobuf(pbRec)
if convertErr != nil {
c.logger.ErrorContext(ctx, "Failed to convert record", "error", convertErr)
continue
}
records = append(records, rec)
}
return &ListRecordsResponse{
Count: pbRes.GetCount(),
Data: records,
}, nil
}
// RecordValidationRequest 记录验证请求参数.
type RecordValidationRequest struct {
Timestamp time.Time // 记录时间戳
RecordID string // 要验证的记录ID
DoPrefix string // 数据前缀(可选)
RCType string // 记录类型
}
// ValidateRecord 执行记录验证,返回流式结果通道
// 该方法会启动一个goroutine接收流式响应通过返回的channel发送结果
// 当流结束或发生错误时channel会被关闭.
//
//nolint:dupl // 与 ValidateOperation 有相似逻辑,但处理不同的数据类型和 gRPC 服务
func (c *Client) ValidateRecord(
ctx context.Context,
req RecordValidationRequest,
) (<-chan *model.RecordValidationResult, error) {
c.logger.InfoContext(ctx, "Starting validation for record", "recordID", req.RecordID)
// 使用负载均衡器获取客户端
clients := c.connLB.Next()
client := clients.recClient
// 构建protobuf请求
pbReq := &pb.RecordValidationReq{
Timestamp: timestamppb.New(req.Timestamp),
RecordId: req.RecordID,
DoPrefix: req.DoPrefix,
RcType: req.RCType,
}
// 调用gRPC流式方法
stream, err := client.ValidateRecord(ctx, pbReq)
if err != nil {
return nil, fmt.Errorf("failed to start validation: %w", err)
}
// 创建结果通道
resultChan := make(chan *model.RecordValidationResult, defaultChannelBuffer)
// 启动goroutine接收流式响应
go func() {
defer close(resultChan)
for {
pbRes, recvErr := stream.Recv()
if recvErr != nil {
if errors.Is(recvErr, io.EOF) {
// 流正常结束
c.logger.DebugContext(ctx, "Validation stream completed", "recordID", req.RecordID)
return
}
// 发生错误
c.logger.ErrorContext(ctx, "Error receiving validation result", "error", recvErr)
// 发送错误结果
resultChan <- &model.RecordValidationResult{
Code: model.ValidationCodeFailed,
Msg: fmt.Sprintf("Stream error: %v", recvErr),
}
return
}
// 转换并发送结果
result, convertErr := model.RecordFromProtobufValidationResult(pbRes)
if convertErr != nil {
c.logger.ErrorContext(ctx, "Failed to convert validation result", "error", convertErr)
continue
}
select {
case resultChan <- result:
c.logger.DebugContext(ctx, "Sent validation result", "code", result.Code, "progress", result.Progress)
case <-ctx.Done():
c.logger.InfoContext(ctx, "Context cancelled, stopping validation stream")
return
}
}
}()
return resultChan, nil
}
// ValidateRecordSync 同步执行记录验证,阻塞直到获得最终结果
// 该方法会处理所有中间进度,只返回最终的完成结果.
func (c *Client) ValidateRecordSync(
ctx context.Context,
req RecordValidationRequest,
progressCallback func(*model.RecordValidationResult),
) (*model.RecordValidationResult, error) {
resultChan, err := c.ValidateRecord(ctx, req)
if err != nil {
return nil, err
}
var finalResult *model.RecordValidationResult
for result := range resultChan {
if result.IsCompleted() || result.IsFailed() {
finalResult = result
break
}
// 如果提供了进度回调,则调用
if progressCallback != nil {
progressCallback(result)
}
}
if finalResult == nil {
return nil, errors.New("validation completed without final result")
}
return finalResult, nil
}
// Close 关闭客户端连接.
func (c *Client) Close() error {
if c.connLB != nil {
return c.connLB.Close()
}
return nil
}
// GetLowLevelOperationClient 获取底层的操作gRPC客户端用于高级用户自定义操作
// 注意:使用负载均衡时,每次调用此方法将返回轮询的下一个客户端.
func (c *Client) GetLowLevelOperationClient() pb.OperationValidationServiceClient {
return c.connLB.Next().opClient
}
// GetLowLevelRecordClient 获取底层的记录gRPC客户端用于高级用户自定义操作
// 注意:使用负载均衡时,每次调用此方法将返回轮询的下一个客户端.
func (c *Client) GetLowLevelRecordClient() pb.RecordValidationServiceClient {
return c.connLB.Next().recClient
}