Files
go-trustlog/api/persistence/cursor_worker.go
ryan 4b72a37120 feat: 完善数据库持久化与存证功能
主要更新:

1. 数据库持久化功能
   - 支持三种策略:仅落库、既落库又存证、仅存证
   - 实现 Cursor Worker 异步扫描和存证机制
   - 实现 Retry Worker 失败重试机制
   - 支持 PostgreSQL、MySQL、SQLite 等多种数据库
   - 添加 ClientIP 和 ServerIP 字段(可空,仅落库)

2. 集群并发安全
   - 使用 SELECT FOR UPDATE SKIP LOCKED 防止重复处理
   - 实现 CAS (Compare-And-Set) 原子状态更新
   - 添加 updated_at 字段支持并发控制

3. Cursor 初始化优化
   - 自动基于历史数据初始化 cursor
   - 确保不遗漏任何历史记录
   - 修复 UPSERT 逻辑

4. 测试完善
   - 添加 E2E 集成测试(含 Pulsar 消费者验证)
   - 添加 PostgreSQL 集成测试
   - 添加 Pulsar 集成测试
   - 添加集群并发安全测试
   - 添加 Cursor 初始化验证测试
   - 补充大量单元测试,提升覆盖率

5. 工具脚本
   - 添加数据库迁移脚本
   - 添加 Cursor 状态检查工具
   - 添加 Cursor 初始化工具
   - 添加 Pulsar 消息验证工具

6. 文档清理
   - 删除冗余文档,只保留根目录 README

测试结果:
- 所有 E2E 测试通过(100%)
- 数据库持久化与异步存证流程验证通过
- 集群环境下的并发安全性验证通过
- Cursor 自动初始化和历史数据处理验证通过
2025-12-24 15:31:11 +08:00

590 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package persistence
import (
"context"
"database/sql"
"fmt"
"time"
"go.yandata.net/iod/iod/go-trustlog/api/logger"
"go.yandata.net/iod/iod/go-trustlog/api/model"
)
// OperationRecord is an operation row loaded from the operation table,
// including the database-only extension fields.
type OperationRecord struct {
OpID string
OpActor string
DOID string
ProducerID string
RequestBodyHash string
ResponseBodyHash string
// OpHash and Sign are read from the row but are not copied by ToModel.
OpHash string
Sign string
OpSource string
OpType string
DOPrefix string
DORepository string
// ClientIP and ServerIP come from nullable columns; the scan code in this
// file leaves them nil when the column is NULL.
ClientIP *string
ServerIP *string
// TrustlogStatus holds the trustlog_status column as it was when the row was read.
TrustlogStatus string
// CreatedAt holds the created_at column; its RFC3339Nano form is what the
// worker stores as the scan cursor.
CreatedAt time.Time
}
// ToModel converts the record into a model.Operation.
//
// The request/response body hash pointers in the result point at the fields of
// r itself, so they alias the record rather than copying it. OpHash, Sign,
// TrustlogStatus and CreatedAt are not transferred.
func (r *OperationRecord) ToModel() *model.Operation {
	op := new(model.Operation)

	// Identity of the operation.
	op.OpID = r.OpID
	op.OpActor = r.OpActor
	op.Doid = r.DOID
	op.ProducerID = r.ProducerID

	// Body hashes are exposed as pointers into the record.
	op.RequestBodyHash = &r.RequestBodyHash
	op.ResponseBodyHash = &r.ResponseBodyHash

	// Classification.
	op.OpSource = model.Source(r.OpSource)
	op.OpType = model.Type(r.OpType)
	op.DoPrefix = r.DOPrefix
	op.DoRepository = r.DORepository

	// Optional network information (may be nil).
	op.ClientIP = r.ClientIP
	op.ServerIP = r.ServerIP

	return op
}
// CursorWorkerConfig holds the configuration of a CursorWorker.
type CursorWorkerConfig struct {
// ScanInterval is the interval between two scans. Defaults to 10 seconds so
// that new records are discovered quickly.
ScanInterval time.Duration
// BatchSize is the maximum number of records fetched per scan. Defaults to 100.
BatchSize int
// CursorKey is the key under which the cursor is stored. Defaults to
// "operation_scan".
CursorKey string
// MaxRetryAttempt is the maximum number of retries during the cursor stage.
// Defaults to 1: fail fast and hand the record over to the retry queue.
MaxRetryAttempt int
// Enabled reports whether the cursor worker is enabled. Start does nothing
// when it is false. DefaultCursorWorkerConfig sets it to true;
// NewCursorWorker does not default it.
Enabled bool
}
// DefaultCursorWorkerConfig returns the default cursor worker configuration:
// a 10 second scan interval, batches of 100 records, the "operation_scan"
// cursor key, a single retry attempt, and the worker enabled.
func DefaultCursorWorkerConfig() CursorWorkerConfig {
	var cfg CursorWorkerConfig

	cfg.ScanInterval = 10 * time.Second
	cfg.BatchSize = 100
	cfg.CursorKey = "operation_scan"
	cfg.MaxRetryAttempt = 1
	cfg.Enabled = true

	return cfg
}
// CursorWorker is the task-discovery worker.
//
// It scans the operation table for records that have not been trustlogged yet
// and tries to trustlog them. On success the record status is updated; on
// failure the record is added to the retry table.
type CursorWorker struct {
// config is the effective configuration (defaults applied by NewCursorWorker).
config CursorWorkerConfig
// manager provides the database handle, the repositories and the publisher.
manager *PersistenceManager
// logger is taken from manager.logger in NewCursorWorker.
logger logger.Logger
// stopCh is closed by Stop to terminate the run loop.
stopCh chan struct{}
}
// NewCursorWorker creates a cursor worker that operates through manager.
//
// Unset or invalid settings are replaced by defaults: a non-positive
// ScanInterval becomes 10 seconds, a non-positive BatchSize becomes 100, an
// empty CursorKey becomes "operation_scan" and a non-positive MaxRetryAttempt
// becomes 1. Negative values are normalised as well because a non-positive
// interval makes time.NewTicker panic and a negative batch size is not a valid
// SQL LIMIT.
//
// manager must not be nil; its logger is used by the worker.
func NewCursorWorker(config CursorWorkerConfig, manager *PersistenceManager) *CursorWorker {
	if config.ScanInterval <= 0 {
		config.ScanInterval = 10 * time.Second
	}
	if config.BatchSize <= 0 {
		config.BatchSize = 100
	}
	if config.CursorKey == "" {
		config.CursorKey = "operation_scan"
	}
	if config.MaxRetryAttempt <= 0 {
		config.MaxRetryAttempt = 1
	}

	// Note: Enabled must be set explicitly by the caller; no default is applied
	// here because PersistenceClient controls it through its EnableCursorWorker
	// parameter when it is created.
	return &CursorWorker{
		config:  config,
		manager: manager,
		logger:  manager.logger,
		stopCh:  make(chan struct{}),
	}
}
// Start launches the cursor worker.
//
// When the worker is disabled it only logs and returns nil. Otherwise it
// initialises the cursor and then starts the periodic scan loop in a new
// goroutine. An error is returned only when cursor initialisation fails.
func (w *CursorWorker) Start(ctx context.Context) error {
	cfg := w.config
	if !cfg.Enabled {
		w.logger.InfoContext(ctx, "cursor worker disabled, skipping start")
		return nil
	}

	w.logger.InfoContext(ctx, "starting cursor worker",
		"scanInterval", cfg.ScanInterval,
		"batchSize", cfg.BatchSize,
		"cursorKey", cfg.CursorKey,
	)

	// Make sure a cursor exists before the first scan.
	err := w.initCursor(ctx)
	if err != nil {
		return fmt.Errorf("failed to init cursor: %w", err)
	}

	// Periodic scanning happens in the background.
	go w.run(ctx)

	return nil
}
// Stop signals the run loop to terminate by closing stopCh. It always returns
// nil.
//
// Calling Stop again after it has already returned is a no-op instead of a
// "close of closed channel" panic. Stop is not synchronised, so it must not be
// called from several goroutines at the same time.
func (w *CursorWorker) Stop(ctx context.Context) error {
	w.logger.InfoContext(ctx, "stopping cursor worker")

	select {
	case <-w.stopCh:
		// Nothing ever sends on stopCh, so a successful receive means it was
		// already closed by an earlier Stop call.
	default:
		close(w.stopCh)
	}
	return nil
}
// run is the worker loop: it triggers a scan on every tick of ScanInterval and
// returns when stopCh is closed or when ctx is done.
//
// Observing ctx.Done() prevents the goroutine from outliving a cancelled
// context and firing scans that can no longer do any work.
func (w *CursorWorker) run(ctx context.Context) {
	ticker := time.NewTicker(w.config.ScanInterval)
	defer ticker.Stop()

	for {
		select {
		case <-w.stopCh:
			w.logger.InfoContext(ctx, "cursor worker stopped")
			return
		case <-ctx.Done():
			w.logger.InfoContext(ctx, "cursor worker stopped: context done",
				"error", ctx.Err(),
			)
			return
		case <-ticker.C:
			w.scan(ctx)
		}
	}
}
// scan performs one scan round: it reads the cursor, selects a batch of
// not-yet-trustlogged operations with FOR UPDATE SKIP LOCKED inside a
// transaction, and processes them one by one, committing every commitEvery
// records to avoid holding the transaction open for a long time.
//
// All errors are logged; scan returns nothing.
//
// NOTE(review): the row locks are taken by the first transaction only. After
// the first intermediate commit the remaining records of the batch are
// processed in fresh transactions that do not hold those locks, so another
// worker may select them too. The CAS status update in processOperationInTx
// still guards the status transition, but not the publish itself — confirm
// whether this is acceptable.
func (w *CursorWorker) scan(ctx context.Context) {
	// commitEvery is the number of processed records after which the current
	// transaction is committed.
	const commitEvery = 10

	w.logger.DebugContext(ctx, "cursor worker scanning",
		"cursorKey", w.config.CursorKey,
	)

	// 1. Read the cursor.
	cursor, err := w.getCursor(ctx)
	if err != nil {
		w.logger.ErrorContext(ctx, "failed to get cursor",
			"error", err,
		)
		return
	}
	w.logger.DebugContext(ctx, "cursor position",
		"cursor", cursor,
	)

	// 2. Select new records in a transaction with FOR UPDATE SKIP LOCKED so
	// that several workers do not pick up the same rows.
	txOpts := &sql.TxOptions{Isolation: sql.LevelReadCommitted}
	tx, err := w.manager.db.BeginTx(ctx, txOpts)
	if err != nil {
		w.logger.ErrorContext(ctx, "failed to begin transaction",
			"error", err,
		)
		return
	}
	// A single deferred rollback covers whichever transaction is current when
	// scan returns; tx is nil whenever there is no open transaction. Rolling
	// back an already committed transaction is harmless.
	defer func() {
		if tx != nil {
			_ = tx.Rollback()
		}
	}()

	operations, opIDs, err := w.findNewOperationsWithLock(ctx, tx, cursor)
	if err != nil {
		w.logger.ErrorContext(ctx, "failed to find new operations",
			"error", err,
		)
		return
	}

	if len(operations) == 0 {
		w.logger.DebugContext(ctx, "no new operations found")
		// Nothing was changed; commit just to end the transaction.
		if err := tx.Commit(); err != nil {
			w.logger.ErrorContext(ctx, "failed to commit empty transaction",
				"error", err,
			)
		}
		return
	}

	w.logger.InfoContext(ctx, "found new operations (locked for processing)",
		"count", len(operations),
		"opIDs", opIDs,
	)

	// 3. Process every record inside the current transaction.
	successCount := 0
	for i, op := range operations {
		if w.processOperationInTx(ctx, tx, op) {
			successCount++
		}

		processed := i + 1
		if processed%commitEvery != 0 {
			continue
		}

		// Commit periodically to avoid long-held locks.
		if err := tx.Commit(); err != nil {
			w.logger.ErrorContext(ctx, "failed to commit transaction batch",
				"error", err,
				"processed", processed,
			)
			return
		}
		tx = nil

		// Do not open a transaction that would stay empty.
		if processed == len(operations) {
			break
		}

		tx, err = w.manager.db.BeginTx(ctx, txOpts)
		if err != nil {
			w.logger.ErrorContext(ctx, "failed to begin new transaction",
				"error", err,
			)
			return
		}
	}

	// Commit the last, partially filled batch if there is one.
	if tx != nil {
		if err := tx.Commit(); err != nil {
			w.logger.ErrorContext(ctx, "failed to commit final transaction",
				"error", err,
			)
			return
		}
	}

	w.logger.InfoContext(ctx, "scan completed",
		"total", len(operations),
		"succeeded", successCount,
	)
}
// initCursor computes an initial cursor value and hands it to the cursor
// repository's InitCursor.
//
// The value is one second before the created_at of the earliest
// not-yet-trustlogged operation, or 2020-01-01T00:00:00Z when no such record is
// found (or the lookup fails, which is only logged as a warning). An error is
// returned only when InitCursor fails.
func (w *CursorWorker) initCursor(ctx context.Context) error {
	repo := w.manager.GetCursorRepo()

	// Look up the earliest NOT_TRUSTLOGGED record.
	var earliest sql.NullTime
	row := w.manager.db.QueryRowContext(ctx,
		"SELECT MIN(created_at) FROM operation WHERE trustlog_status = $1",
		StatusNotTrustlogged,
	)
	switch scanErr := row.Scan(&earliest); scanErr {
	case nil, sql.ErrNoRows:
		// Nothing to report.
	default:
		w.logger.WarnContext(ctx, "failed to query earliest record, using default",
			"error", scanErr,
		)
	}

	var initialValue string
	if !earliest.Valid {
		// No record: start from a very early time so nothing can be missed.
		epoch := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
		initialValue = epoch.Format(time.RFC3339Nano)
		w.logger.InfoContext(ctx, "no records found, using default early time",
			"cursorValue", initialValue,
		)
	} else {
		// Start one second before the earliest record.
		start := earliest.Time.Add(-1 * time.Second)
		initialValue = start.Format(time.RFC3339Nano)
		w.logger.InfoContext(ctx, "setting cursor based on earliest record",
			"earliestRecord", earliest.Time,
			"cursorValue", initialValue,
		)
	}

	if err := repo.InitCursor(ctx, w.config.CursorKey, initialValue); err != nil {
		return fmt.Errorf("failed to init cursor: %w", err)
	}

	w.logger.InfoContext(ctx, "cursor initialized",
		"cursorKey", w.config.CursorKey,
		"initialValue", initialValue,
	)
	return nil
}
// getCursor returns the stored cursor value for the configured cursor key.
//
// An empty stored value is replaced by the zero time formatted as RFC3339Nano,
// i.e. a very early timestamp. Repository errors are wrapped and returned.
func (w *CursorWorker) getCursor(ctx context.Context) (string, error) {
	value, err := w.manager.GetCursorRepo().GetCursor(ctx, w.config.CursorKey)

	switch {
	case err != nil:
		return "", fmt.Errorf("failed to get cursor: %w", err)
	case value == "":
		// No cursor stored yet: fall back to a very early time.
		return time.Time{}.Format(time.RFC3339Nano), nil
	}
	return value, nil
}
// updateCursor stores value as the new cursor for the configured cursor key
// and logs the change at debug level. Repository errors are wrapped and
// returned.
func (w *CursorWorker) updateCursor(ctx context.Context, value string) error {
	repo := w.manager.GetCursorRepo()
	key := w.config.CursorKey

	if err := repo.UpdateCursor(ctx, key, value); err != nil {
		return fmt.Errorf("failed to update cursor: %w", err)
	}

	w.logger.DebugContext(ctx, "cursor updated",
		"cursorKey", key,
		"newValue", value,
	)
	return nil
}
// findNewOperationsWithLock selects up to BatchSize operations that are not
// trustlogged yet and whose created_at is later than cursor, ordered by
// created_at ascending. The rows are selected with FOR UPDATE SKIP LOCKED
// inside tx so that several workers do not pick up the same records.
//
// It returns the records together with their op IDs (same order). On error
// both slices are nil. Iteration errors reported by rows.Err are returned as
// well, so a result set that broke off midway is not mistaken for a complete
// one.
func (w *CursorWorker) findNewOperationsWithLock(ctx context.Context, tx *sql.Tx, cursor string) ([]*OperationRecord, []string, error) {
	query := `
SELECT op_id, op_actor, doid, producer_id,
request_body_hash, response_body_hash, op_hash, sign,
op_source, op_type, do_prefix, do_repository,
client_ip, server_ip, trustlog_status, created_at
FROM operation
WHERE trustlog_status = $1
AND created_at > $2
ORDER BY created_at ASC
LIMIT $3
FOR UPDATE SKIP LOCKED
`
	rows, err := tx.QueryContext(ctx, query, StatusNotTrustlogged, cursor, w.config.BatchSize)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to query operations with lock: %w", err)
	}
	defer rows.Close()

	var operations []*OperationRecord
	var opIDs []string
	for rows.Next() {
		op := &OperationRecord{}
		var clientIP, serverIP sql.NullString
		var createdAt time.Time
		err := rows.Scan(
			&op.OpID, &op.OpActor, &op.DOID, &op.ProducerID,
			&op.RequestBodyHash, &op.ResponseBodyHash, &op.OpHash, &op.Sign,
			&op.OpSource, &op.OpType, &op.DOPrefix, &op.DORepository,
			&clientIP, &serverIP, &op.TrustlogStatus, &createdAt,
		)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to scan operation: %w", err)
		}

		// Nullable columns stay nil when the value is NULL.
		if clientIP.Valid {
			op.ClientIP = &clientIP.String
		}
		if serverIP.Valid {
			op.ServerIP = &serverIP.String
		}
		op.CreatedAt = createdAt

		operations = append(operations, op)
		opIDs = append(opIDs, op.OpID)
	}
	// rows.Next returns false both at the end of the result set and on
	// failure; rows.Err tells the two apart.
	if err := rows.Err(); err != nil {
		return nil, nil, fmt.Errorf("failed to iterate operations: %w", err)
	}

	return operations, opIDs, nil
}
// getStringOrEmpty dereferences s, yielding the empty string for a nil pointer.
func getStringOrEmpty(s *string) string {
	if s != nil {
		return *s
	}
	return ""
}
// findNewOperations selects up to BatchSize operations that are not
// trustlogged yet and whose created_at is later than cursor, ordered by
// created_at ascending. This is the older variant without row locking, kept
// for compatibility.
//
// On error the returned slice is nil. Iteration errors reported by rows.Err
// are returned as well, so a result set that broke off midway is not mistaken
// for a complete one.
func (w *CursorWorker) findNewOperations(ctx context.Context, cursor string) ([]*OperationRecord, error) {
	db := w.manager.db

	// Select records that are not trustlogged and have created_at > cursor.
	rows, err := db.QueryContext(ctx, `
SELECT op_id, op_actor, doid, producer_id,
request_body_hash, response_body_hash, op_hash, sign,
op_source, op_type, do_prefix, do_repository,
client_ip, server_ip, trustlog_status, created_at
FROM operation
WHERE trustlog_status = $1
AND created_at > $2
ORDER BY created_at ASC
LIMIT $3
`, StatusNotTrustlogged, cursor, w.config.BatchSize)
	if err != nil {
		return nil, fmt.Errorf("failed to query operations: %w", err)
	}
	defer rows.Close()

	var operations []*OperationRecord
	for rows.Next() {
		op := &OperationRecord{}
		var clientIP, serverIP sql.NullString
		var createdAt time.Time
		err := rows.Scan(
			&op.OpID, &op.OpActor, &op.DOID, &op.ProducerID,
			&op.RequestBodyHash, &op.ResponseBodyHash, &op.OpHash, &op.Sign,
			&op.OpSource, &op.OpType, &op.DOPrefix, &op.DORepository,
			&clientIP, &serverIP, &op.TrustlogStatus, &createdAt,
		)
		if err != nil {
			return nil, fmt.Errorf("failed to scan operation: %w", err)
		}

		// Nullable columns stay nil when the value is NULL.
		if clientIP.Valid {
			op.ClientIP = &clientIP.String
		}
		if serverIP.Valid {
			op.ServerIP = &serverIP.String
		}
		op.CreatedAt = createdAt

		operations = append(operations, op)
	}
	// rows.Next returns false both at the end of the result set and on
	// failure; rows.Err tells the two apart.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate operations: %w", err)
	}

	return operations, nil
}
// processOperationInTx processes a single record using the given transaction.
//
// It first tries to trustlog the record. If that fails, the record is added to
// the retry table through tx with the next retry scheduled one minute from
// now, and false is returned. If it succeeds, the status is switched from
// not-trustlogged to trustlogged with a compare-and-set update through tx;
// false is returned when that update fails or when the status had already been
// changed. After a successful update the cursor is advanced to the record's
// created_at.
//
// It returns true only when the record was trustlogged and its status updated.
// A failure to advance the cursor is logged but does not change the result.
//
// NOTE(review): updateCursor is not given tx, so the cursor write does not
// appear to be part of this transaction — confirm what happens to the record if
// the transaction later fails to commit.
func (w *CursorWorker) processOperationInTx(ctx context.Context, tx *sql.Tx, op *OperationRecord) bool {
	w.logger.DebugContext(ctx, "processing operation in transaction",
		"opID", op.OpID,
	)

	// Try to trustlog the record.
	if err := w.tryTrustlog(ctx, op); err != nil {
		w.logger.WarnContext(ctx, "failed to trustlog operation",
			"opID", op.OpID,
			"error", err,
		)

		// Failure: hand the record over to the retry table.
		retryRepo := w.manager.GetRetryRepo()
		nextRetryAt := time.Now().Add(1 * time.Minute)
		if retryErr := retryRepo.AddRetryTx(ctx, tx, op.OpID, err.Error(), nextRetryAt); retryErr != nil {
			w.logger.ErrorContext(ctx, "failed to add to retry queue",
				"opID", op.OpID,
				"error", retryErr,
			)
		}
		return false
	}

	// Success: update the status with CAS.
	opRepo := w.manager.GetOperationRepo()
	updated, err := opRepo.UpdateStatusWithCAS(ctx, tx, op.OpID, StatusNotTrustlogged, StatusTrustlogged)
	if err != nil {
		w.logger.ErrorContext(ctx, "failed to update operation status with CAS",
			"opID", op.OpID,
			"error", err,
		)
		return false
	}
	if !updated {
		// CAS did not apply: the status was already changed by another worker.
		w.logger.WarnContext(ctx, "operation already processed by another worker",
			"opID", op.OpID,
		)
		return false
	}

	w.logger.InfoContext(ctx, "operation trustlogged successfully",
		"opID", op.OpID,
	)

	// Advance the cursor; report a failure instead of dropping it silently.
	if err := w.updateCursor(ctx, op.CreatedAt.Format(time.RFC3339Nano)); err != nil {
		w.logger.WarnContext(ctx, "failed to update cursor after trustlog",
			"opID", op.OpID,
			"error", err,
		)
	}
	return true
}
// processOperation processes a single record without a transaction. This is
// the older variant, kept for compatibility.
//
// It tries to trustlog the record, retrying up to MaxRetryAttempt times with a
// one second pause between attempts. A negative MaxRetryAttempt is treated as
// zero retries, so at least one attempt is always made. On success the status
// is set to trustlogged; when every attempt failed the record is added to the
// retry table with the next retry one minute from now. In both cases the cursor
// is advanced to the record's created_at so that the worker does not get stuck
// on the same record. Cursor update failures are logged.
//
// The pause between attempts ends early when ctx is done; the remaining
// attempts are then skipped.
func (w *CursorWorker) processOperation(ctx context.Context, op *OperationRecord) {
	w.logger.DebugContext(ctx, "processing operation",
		"opID", op.OpID,
	)

	// advanceCursor moves the cursor to this record and reports failures.
	advanceCursor := func() {
		if err := w.updateCursor(ctx, op.CreatedAt.Format(time.RFC3339Nano)); err != nil {
			w.logger.WarnContext(ctx, "failed to update cursor",
				"opID", op.OpID,
				"error", err,
			)
		}
	}

	// Guarantee at least one attempt so that lastErr is never nil below.
	maxRetries := w.config.MaxRetryAttempt
	if maxRetries < 0 {
		maxRetries = 0
	}

	var lastErr error
attempts:
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if attempt > 0 {
			w.logger.DebugContext(ctx, "retrying trustlog",
				"opID", op.OpID,
				"attempt", attempt,
			)
		}

		err := w.tryTrustlog(ctx, op)
		if err == nil {
			// Success: update the status.
			if err := w.updateOperationStatus(ctx, op.OpID, StatusTrustlogged); err != nil {
				w.logger.ErrorContext(ctx, "failed to update operation status",
					"opID", op.OpID,
					"error", err,
				)
			} else {
				w.logger.InfoContext(ctx, "operation trustlogged successfully",
					"opID", op.OpID,
				)
			}
			advanceCursor()
			return
		}
		lastErr = err

		if attempt == maxRetries {
			break
		}

		// Simple retry delay that gives up early on cancellation.
		timer := time.NewTimer(time.Second)
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			break attempts
		}
	}

	// Failure: hand the record over to the retry table.
	w.logger.WarnContext(ctx, "failed to trustlog in cursor worker, adding to retry queue",
		"opID", op.OpID,
		"error", lastErr,
	)
	retryRepo := w.manager.GetRetryRepo()
	nextRetryAt := time.Now().Add(1 * time.Minute) // retry in one minute
	if err := retryRepo.AddRetry(ctx, op.OpID, lastErr.Error(), nextRetryAt); err != nil {
		w.logger.ErrorContext(ctx, "failed to add to retry queue",
			"opID", op.OpID,
			"error", err,
		)
	}

	// Advance the cursor even on failure to avoid getting stuck on this record.
	advanceCursor()
}
// tryTrustlog publishes op to the trustlog system through the manager's
// publisher.
//
// It returns an error when no publisher is available or when publishing
// fails; the publish error is wrapped.
func (w *CursorWorker) tryTrustlog(ctx context.Context, op *OperationRecord) error {
	pub := w.manager.GetPublisher()
	if pub == nil {
		return fmt.Errorf("publisher not available")
	}

	// Convert to the Operation model and publish it.
	err := pub.Publish(ctx, op.ToModel())
	if err == nil {
		return nil
	}
	return fmt.Errorf("failed to publish to trustlog: %w", err)
}
// updateOperationStatus sets the trustlog status of the operation identified
// by opID through the operation repository and returns the repository's error
// unchanged.
func (w *CursorWorker) updateOperationStatus(ctx context.Context, opID string, status TrustlogStatus) error {
	return w.manager.GetOperationRepo().UpdateStatus(ctx, opID, status)
}