Files
go-trustlog/api/persistence/client.go
ryan bdca8b59e0 feat: 添加 Operation 查询功能及完整测试
主要功能:
- 新增 OperationQueryRequest/OperationQueryResult 结构体
- 实现 Repository.Query() - 支持多条件筛选、分页、排序
- 实现 Repository.Count() - 统计记录数
- 新增 PersistenceClient.QueryOperations/CountOperations/GetOperationByID

查询功能:
- 支持按 OpID、OpSource、OpType、Doid 等字段筛选
- 支持模糊查询(Doid、DoPrefix)
- 支持时间范围查询(TimeFrom/TimeTo)
- 支持 IP 地址筛选(ClientIP、ServerIP)
- 支持按 TrustlogStatus 筛选
- 支持组合查询
- 支持分页(PageSize、PageNumber)
- 支持排序(OrderBy、OrderDesc)

测试覆盖:
-  query_test.go - 查询功能单元测试
-  pg_query_integration_test.go - PostgreSQL 集成测试(16个测试用例)
  * Query all records
  * Filter by OpSource/OpType/Status/Actor/Producer/IP
  * DOID 模糊查询
  * 时间范围查询
  * 分页测试
  * 排序测试(升序/降序)
  * 组合查询
  * Count 统计
  * PersistenceClient 接口测试

修复:
- 修复 TestClusterSafety_MultipleCursorWorkers - 添加缺失字段
- 修复 TestCursorInitialization - 确保 schema 最新
- 添加自动 schema 更新(ALTER TABLE ... ADD COLUMN IF NOT EXISTS)

测试结果:
-  所有单元测试通过(100%)
-  所有集成测试通过(PostgreSQL、Pulsar、E2E)
-  Query 功能测试通过(16个测试用例)
2025-12-24 17:20:09 +08:00

441 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package persistence
import (
"context"
"errors"
"fmt"
"github.com/ThreeDotsLabs/watermill/message"
"go.yandata.net/iod/iod/go-trustlog/api/adapter"
"go.yandata.net/iod/iod/go-trustlog/api/logger"
"go.yandata.net/iod/iod/go-trustlog/api/model"
)
// operationPublisherAdapter adapts PersistenceClient.publishToTrustlog to the
// OperationPublisher interface. NewPersistenceClient registers an instance of
// it on the persistence manager via SetPublisher.
type operationPublisherAdapter struct {
	// client is the PersistenceClient whose publishToTrustlog does the work.
	client *PersistenceClient
}
// Publish forwards op to the wrapped client's publishToTrustlog and returns
// whatever error that call reports.
func (a *operationPublisherAdapter) Publish(ctx context.Context, op *model.Operation) error {
	return a.client.publishToTrustlog(ctx, op)
}
// PersistenceClient is a trustlog client with database persistence support.
// It builds on the original HighClient functionality and adds database
// persistence.
type PersistenceClient struct {
	// publisher sends envelope messages to the trustlog topics. Close
	// tolerates a nil publisher.
	publisher message.Publisher

	// logger receives the client's diagnostic output.
	logger logger.Logger

	// envelopeConfig is passed to model.MarshalTrustlog when serializing.
	envelopeConfig model.EnvelopeConfig

	// manager owns the database-side persistence.
	manager *PersistenceManager

	// cursorWorker is non-nil only when the cursor worker was enabled and
	// the strategy is StrategyDBAndTrustlog.
	cursorWorker *CursorWorker

	// retryWorker is non-nil only when the retry worker was enabled and
	// the strategy is StrategyDBAndTrustlog.
	retryWorker *RetryWorker
}
// PersistenceClientConfig holds the settings consumed by NewPersistenceClient.
type PersistenceClientConfig struct {
	// Publisher is the message publisher used for trustlog delivery.
	Publisher message.Publisher

	// Logger is the log sink; NewPersistenceClient rejects a nil Logger.
	Logger logger.Logger

	// EnvelopeConfig is the SM2 key configuration used for signing and
	// serialization.
	EnvelopeConfig model.EnvelopeConfig

	// DBConfig describes the database connection.
	DBConfig DBConfig

	// PersistenceConfig selects the persistence strategy.
	PersistenceConfig PersistenceConfig

	// CursorWorkerConfig configures the cursor worker (optional; the
	// defaults from DefaultCursorWorkerConfig are used when nil).
	CursorWorkerConfig *CursorWorkerConfig

	// EnableCursorWorker turns the cursor worker on (only effective with
	// StrategyDBAndTrustlog).
	EnableCursorWorker bool

	// RetryWorkerConfig configures the retry worker (optional; the
	// defaults from DefaultRetryWorkerConfig are used when nil).
	RetryWorkerConfig *RetryWorkerConfig

	// EnableRetryWorker turns the retry worker on (only effective with
	// StrategyDBAndTrustlog).
	EnableRetryWorker bool
}
// NewPersistenceClient builds a trustlog client backed by database persistence.
//
// It requires config.Logger, opens the database described by config.DBConfig,
// initializes the schema, and registers the client with the manager as its
// operation publisher. When the strategy is StrategyDBAndTrustlog it also
// starts whichever of the cursor and retry workers were enabled in config.
// The database connection is closed again if schema initialization or the
// cursor worker start fails.
func NewPersistenceClient(ctx context.Context, config PersistenceClientConfig) (*PersistenceClient, error) {
	lg := config.Logger
	if lg == nil {
		return nil, errors.New("logger is required")
	}

	// Open the database connection.
	db, dbErr := NewDB(config.DBConfig)
	if dbErr != nil {
		lg.ErrorContext(ctx, "failed to create database connection",
			"error", dbErr,
		)
		return nil, fmt.Errorf("failed to create database connection: %w", dbErr)
	}

	// Build the persistence manager and make sure the tables exist.
	manager := NewPersistenceManager(db, config.PersistenceConfig, lg)
	schemaErr := manager.InitSchema(ctx, config.DBConfig.DriverName)
	if schemaErr != nil {
		db.Close()
		return nil, fmt.Errorf("failed to initialize database schema: %w", schemaErr)
	}

	client := &PersistenceClient{
		publisher:      config.Publisher,
		logger:         lg,
		envelopeConfig: config.EnvelopeConfig,
		manager:        manager,
	}

	// Expose the client's publishToTrustlog to the manager through the
	// OperationPublisher adapter.
	manager.SetPublisher(&operationPublisherAdapter{client: client})

	// Both background workers only apply to the DB-and-trustlog strategy.
	strategy := config.PersistenceConfig.Strategy
	workersApply := strategy == StrategyDBAndTrustlog

	if config.EnableCursorWorker && workersApply {
		var cursorCfg CursorWorkerConfig
		if config.CursorWorkerConfig == nil {
			cursorCfg = DefaultCursorWorkerConfig()
		} else {
			cursorCfg = *config.CursorWorkerConfig
		}
		// Force Enabled on, whatever the supplied configuration said.
		cursorCfg.Enabled = true

		client.cursorWorker = NewCursorWorker(cursorCfg, manager)
		startErr := client.cursorWorker.Start(ctx)
		if startErr != nil {
			db.Close()
			return nil, fmt.Errorf("failed to start cursor worker: %w", startErr)
		}
		lg.InfoContext(ctx, "cursor worker started",
			"strategy", strategy.String(),
			"scanInterval", cursorCfg.ScanInterval,
		)
	}

	if config.EnableRetryWorker && workersApply {
		var retryCfg RetryWorkerConfig
		if config.RetryWorkerConfig == nil {
			retryCfg = DefaultRetryWorkerConfig()
		} else {
			retryCfg = *config.RetryWorkerConfig
		}

		client.retryWorker = NewRetryWorker(retryCfg, manager, config.Publisher, lg)
		// The retry worker's Start is run on its own goroutine.
		go client.retryWorker.Start(ctx)
		lg.InfoContext(ctx, "retry worker started",
			"strategy", strategy.String(),
			"retryInterval", retryCfg.RetryInterval,
		)
	}

	lg.InfoContext(ctx, "persistence client created",
		"strategy", strategy.String(),
		"cursorEnabled", config.EnableCursorWorker,
		"retryEnabled", config.EnableRetryWorker,
	)
	return client, nil
}
// OperationPublish publishes an operation record.
//
// Depending on the configured strategy the operation is only stored in the
// database, stored and sent to the trustlog system, or only sent to the
// trustlog system. A nil operation or an unrecognized strategy yields an error.
func (c *PersistenceClient) OperationPublish(ctx context.Context, operation *model.Operation) error {
	if operation == nil {
		c.logger.ErrorContext(ctx, "operation publish failed: operation is nil")
		return errors.New("operation cannot be nil")
	}

	strategy := c.manager.config.Strategy
	c.logger.DebugContext(ctx, "publishing operation with persistence",
		"opID", operation.OpID,
		"opType", operation.OpType,
		"strategy", strategy.String(),
	)

	// Dispatch to the strategy-specific publisher.
	if strategy == StrategyDBOnly {
		// Database only.
		return c.publishDBOnly(ctx, operation)
	}
	if strategy == StrategyDBAndTrustlog {
		// Database and trustlog.
		return c.publishDBAndTrustlog(ctx, operation)
	}
	if strategy == StrategyTrustlogOnly {
		// Trustlog only.
		return c.publishTrustlogOnly(ctx, operation)
	}
	return fmt.Errorf("unknown persistence strategy: %s", strategy.String())
}
// publishDBOnly implements the database-only strategy: the operation is saved
// through the persistence manager and nothing is sent to the trustlog system.
// It returns the error from SaveOperation unchanged.
func (c *PersistenceClient) publishDBOnly(ctx context.Context, operation *model.Operation) error {
	c.logger.DebugContext(ctx, "publishing operation with DB_ONLY strategy",
		"opID", operation.OpID,
	)

	// Persist to the database.
	saveErr := c.manager.SaveOperation(ctx, operation)
	if saveErr == nil {
		c.logger.InfoContext(ctx, "operation saved with DB_ONLY strategy",
			"opID", operation.OpID,
		)
		return nil
	}

	c.logger.ErrorContext(ctx, "failed to save operation to database",
		"opID", operation.OpID,
		"error", saveErr,
	)
	return saveErr
}
// publishDBAndTrustlog implements the database-and-trustlog strategy.
//
// The operation is saved to the database first; a save failure is returned to
// the caller. A failure to publish to the trustlog system afterwards is only
// logged and nil is returned, because the record is already stored. After a
// successful publish the status is set to StatusTrustlogged and any retry
// record for the operation is deleted.
func (c *PersistenceClient) publishDBAndTrustlog(ctx context.Context, operation *model.Operation) error {
	c.logger.DebugContext(ctx, "publishing operation with DB_AND_TRUSTLOG strategy",
		"opID", operation.OpID,
	)

	// Step 1: store in the database (status: not yet trustlogged).
	saveErr := c.manager.SaveOperation(ctx, operation)
	if saveErr != nil {
		c.logger.ErrorContext(ctx, "failed to save operation to database",
			"opID", operation.OpID,
			"error", saveErr,
		)
		return saveErr
	}

	// Step 2: attempt delivery to the trustlog system.
	pubErr := c.publishToTrustlog(ctx, operation)
	if pubErr != nil {
		c.logger.WarnContext(ctx, "failed to publish to trustlog, will retry later",
			"opID", operation.OpID,
			"error", pubErr,
		)
		// Publishing failed but the record is stored; the retry worker is
		// expected to pick it up.
		return nil
	}

	// Step 3: mark the stored record as trustlogged.
	statusErr := c.manager.GetOperationRepo().UpdateStatus(ctx, operation.OpID, StatusTrustlogged)
	if statusErr != nil {
		// The message has already been sent, so this is logged, not returned;
		// the retry worker is expected to clean up.
		c.logger.ErrorContext(ctx, "failed to update operation status",
			"opID", operation.OpID,
			"error", statusErr,
		)
	}

	// Step 4: drop the retry record, if one exists.
	c.manager.GetRetryRepo().DeleteRetry(ctx, operation.OpID)

	c.logger.InfoContext(ctx, "operation published with DB_AND_TRUSTLOG strategy",
		"opID", operation.OpID,
	)
	return nil
}
// publishTrustlogOnly implements the trustlog-only strategy: the operation is
// sent straight to the trustlog system without touching the database. It
// returns the error from publishToTrustlog unchanged.
func (c *PersistenceClient) publishTrustlogOnly(ctx context.Context, operation *model.Operation) error {
	c.logger.DebugContext(ctx, "publishing operation with TRUSTLOG_ONLY strategy",
		"opID", operation.OpID,
	)

	// Publish directly to the trustlog system.
	pubErr := c.publishToTrustlog(ctx, operation)
	if pubErr == nil {
		c.logger.InfoContext(ctx, "operation published with TRUSTLOG_ONLY strategy",
			"opID", operation.OpID,
		)
		return nil
	}

	c.logger.ErrorContext(ctx, "failed to publish to trustlog",
		"opID", operation.OpID,
		"error", pubErr,
	)
	return pubErr
}
// publishToTrustlog sends an operation to the trustlog system in Envelope
// format.
//
// The operation is serialized with model.MarshalTrustlog using the client's
// envelope configuration, wrapped in a message keyed by operation.Key(), and
// published to adapter.OperationTopic.
//
// It returns an error when operation is nil, when the client has no publisher
// configured, when serialization fails, or when the publisher rejects the
// message. The nil checks matter because this method is also reachable through
// operationPublisherAdapter.Publish, which bypasses the validation done in
// OperationPublish, and because the constructor does not require a publisher.
func (c *PersistenceClient) publishToTrustlog(ctx context.Context, operation *model.Operation) error {
	if operation == nil {
		c.logger.ErrorContext(ctx, "publish to trustlog failed: operation is nil")
		return errors.New("operation cannot be nil")
	}
	if c.publisher == nil {
		c.logger.ErrorContext(ctx, "publish to trustlog failed: publisher is not configured",
			"opID", operation.OpID,
		)
		return errors.New("publisher is not configured")
	}

	messageKey := operation.Key()
	c.logger.DebugContext(ctx, "starting envelope serialization",
		"messageKey", messageKey,
	)

	// Serialize using the Envelope format.
	envelopeData, err := model.MarshalTrustlog(operation, c.envelopeConfig)
	if err != nil {
		c.logger.ErrorContext(ctx, "envelope serialization failed",
			"messageKey", messageKey,
			"error", err,
		)
		return fmt.Errorf("failed to marshal envelope: %w", err)
	}
	c.logger.DebugContext(ctx, "envelope serialized successfully",
		"messageKey", messageKey,
		"envelopeSize", len(envelopeData),
	)

	msg := message.NewMessage(messageKey, envelopeData)
	c.logger.DebugContext(ctx, "publishing message to topic",
		"messageKey", messageKey,
		"topic", adapter.OperationTopic,
	)
	if publishErr := c.publisher.Publish(adapter.OperationTopic, msg); publishErr != nil {
		c.logger.ErrorContext(ctx, "failed to publish to topic",
			"messageKey", messageKey,
			"topic", adapter.OperationTopic,
			"error", publishErr,
		)
		return fmt.Errorf("failed to publish message to topic %s: %w", adapter.OperationTopic, publishErr)
	}

	c.logger.DebugContext(ctx, "message published to topic successfully",
		"messageKey", messageKey,
		"topic", adapter.OperationTopic,
	)
	return nil
}
// RecordPublish publishes a record to the trustlog system. Record values are
// not persisted to the database: they are serialized with
// model.MarshalTrustlog and published directly to adapter.RecordTopic.
//
// It returns an error when record is nil, when the client has no publisher
// configured (the constructor does not require one), when serialization
// fails, or when the publisher rejects the message.
func (c *PersistenceClient) RecordPublish(ctx context.Context, record *model.Record) error {
	if record == nil {
		c.logger.ErrorContext(ctx, "record publish failed: record is nil")
		return errors.New("record cannot be nil")
	}
	// Without this guard a client built without a publisher would panic on
	// the Publish call below instead of reporting an error.
	if c.publisher == nil {
		c.logger.ErrorContext(ctx, "record publish failed: publisher is not configured",
			"recordID", record.ID,
		)
		return errors.New("publisher is not configured")
	}

	c.logger.DebugContext(ctx, "publishing record",
		"recordID", record.ID,
		"rcType", record.RCType,
	)

	// Records go straight to the trustlog system; nothing is written to the
	// database.
	messageKey := record.Key()
	envelopeData, err := model.MarshalTrustlog(record, c.envelopeConfig)
	if err != nil {
		c.logger.ErrorContext(ctx, "envelope serialization failed",
			"recordID", record.ID,
			"error", err,
		)
		return fmt.Errorf("failed to marshal envelope: %w", err)
	}

	msg := message.NewMessage(messageKey, envelopeData)
	if publishErr := c.publisher.Publish(adapter.RecordTopic, msg); publishErr != nil {
		c.logger.ErrorContext(ctx, "failed to publish record to topic",
			"recordID", record.ID,
			"error", publishErr,
		)
		return fmt.Errorf("failed to publish record to topic %s: %w", adapter.RecordTopic, publishErr)
	}

	c.logger.InfoContext(ctx, "record published successfully",
		"recordID", record.ID,
		"rcType", record.RCType,
	)
	return nil
}
// GetLow returns the underlying message publisher held by the client. The
// result is nil when the client was created without a publisher.
func (c *PersistenceClient) GetLow() message.Publisher {
	return c.publisher
}
// GetManager returns the persistence manager used by the client.
func (c *PersistenceClient) GetManager() *PersistenceManager {
	return c.manager
}
// Close shuts the client down: it stops the cursor worker and the retry
// worker (when present), closes the persistence manager, and closes the
// publisher (when present).
//
// Every resource is closed even if an earlier step fails, so a database close
// error no longer leaves the publisher open. A cursor worker stop failure is
// only logged. The returned error is the first close error encountered
// (manager first, then publisher), or nil when everything closed cleanly.
func (c *PersistenceClient) Close() error {
	ctx := context.Background()
	c.logger.Info("closing persistence client")

	// Stop the cursor worker.
	if c.cursorWorker != nil {
		if err := c.cursorWorker.Stop(ctx); err != nil {
			c.logger.Error("failed to stop cursor worker",
				"error", err,
			)
		}
	}

	// Stop the retry worker.
	if c.retryWorker != nil {
		c.retryWorker.Stop()
	}

	var firstErr error

	// Close the persistence manager. Remember the error but keep going so
	// the publisher is still released.
	if c.manager != nil {
		if err := c.manager.Close(); err != nil {
			c.logger.Error("failed to close database",
				"error", err,
			)
			firstErr = err
		}
	}

	// Close the publisher, if there is one.
	if c.publisher != nil {
		if err := c.publisher.Close(); err != nil {
			c.logger.Error("failed to close publisher",
				"error", err,
			)
			if firstErr == nil {
				firstErr = err
			}
		}
	}

	if firstErr != nil {
		return firstErr
	}
	c.logger.Info("persistence client closed successfully")
	return nil
}
// QueryOperations looks up operation records (with paging, filtering and
// sorting as described by req) by delegating to the operation repository's
// Query. It returns an error when the manager or the repository is missing.
func (c *PersistenceClient) QueryOperations(ctx context.Context, req *OperationQueryRequest) (*OperationQueryResult, error) {
	manager := c.manager
	if manager == nil {
		return nil, fmt.Errorf("persistence manager not initialized")
	}

	if opRepo := manager.GetOperationRepo(); opRepo != nil {
		return opRepo.Query(ctx, req)
	}
	return nil, fmt.Errorf("operation repository not available")
}
// CountOperations returns the number of operation records matching req by
// delegating to the operation repository's Count. It returns 0 and an error
// when the manager or the repository is missing.
func (c *PersistenceClient) CountOperations(ctx context.Context, req *OperationQueryRequest) (int64, error) {
	manager := c.manager
	if manager == nil {
		return 0, fmt.Errorf("persistence manager not initialized")
	}

	if opRepo := manager.GetOperationRepo(); opRepo != nil {
		return opRepo.Count(ctx, req)
	}
	return 0, fmt.Errorf("operation repository not available")
}
// GetOperationByID fetches a single operation record by its OpID, together
// with its trustlog status, by delegating to the operation repository's
// FindByID. It returns a nil operation, an empty status and an error when the
// manager or the repository is missing.
func (c *PersistenceClient) GetOperationByID(ctx context.Context, opID string) (*model.Operation, TrustlogStatus, error) {
	manager := c.manager
	if manager == nil {
		return nil, "", fmt.Errorf("persistence manager not initialized")
	}

	if opRepo := manager.GetOperationRepo(); opRepo != nil {
		return opRepo.FindByID(ctx, opID)
	}
	return nil, "", fmt.Errorf("operation repository not available")
}