Files
go-trustlog/api/persistence/repository.go
ryan 88f80ffa5e feat: 新增数据库持久化模块(Persistence),实现 Cursor + Retry 双层架构
## 核心功能

### 1. 数据库持久化支持
- 新增完整的 Persistence 模块 (api/persistence/)
- 支持三种持久化策略:
  * StrategyDBOnly - 仅落库,不存证
  * StrategyDBAndTrustlog - 既落库又存证(推荐)
  * StrategyTrustlogOnly - 仅存证,不落库
- 支持多数据库:PostgreSQL, MySQL, SQLite

### 2. Cursor + Retry 双层架构
- CursorWorker:第一道防线,快速发现新记录并尝试存证
  * 增量扫描 operation 表(基于时间戳游标)
  * 默认 10 秒扫描间隔,批量处理 100 条
  * 成功更新状态,失败转入重试队列
- RetryWorker:第二道防线,处理失败记录
  * 指数退避重试(1m → 2m → 4m → 8m → 16m)
  * 默认最多重试 5 次
  * 超限自动标记为死信

### 3. 数据库表设计
- operation 表:存储操作记录,支持可空 IP 字段
- trustlog_cursor 表:Key-Value 模式,支持多游标
- trustlog_retry 表:重试队列,支持指数退避

### 4. 异步最终一致性
- 应用调用立即返回(仅落库)
- CursorWorker 异步扫描并存证
- RetryWorker 保障失败重试
- 完整的监控和死信处理机制

## 修改文件

### 核心代码(11个文件)
- api/persistence/cursor_worker.go - Cursor 工作器(新增)
- api/persistence/repository.go - 数据仓储层(新增)
- api/persistence/schema.go - 数据库 Schema(新增)
- api/persistence/strategy.go - 策略管理器(新增)
- api/persistence/client.go - 客户端封装(新增)
- api/persistence/retry_worker.go - Retry 工作器(新增)
- api/persistence/config.go - 配置管理(新增)

### 修复内部包引用(5个文件)
- api/adapter/publisher.go - 修复 internal 包引用
- api/adapter/subscriber.go - 修复 internal 包引用
- api/model/envelope.go - 修复 internal 包引用
- api/model/operation.go - 修复 internal 包引用
- api/model/record.go - 修复 internal 包引用

### 单元测试(8个文件)
- api/persistence/*_test.go - 完整的单元测试
- 测试覆盖率:28.5%
- 测试通过率:49/49 (100%)

### SQL 脚本(4个文件)
- api/persistence/sql/postgresql.sql - PostgreSQL 建表脚本
- api/persistence/sql/mysql.sql - MySQL 建表脚本
- api/persistence/sql/sqlite.sql - SQLite 建表脚本
- api/persistence/sql/test_data.sql - 测试数据

### 文档(2个文件)
- README.md - 更新主文档,新增 Persistence 使用指南
- api/persistence/README.md - 完整的 Persistence 文档
- api/persistence/sql/README.md - SQL 脚本说明

## 技术亮点

1. **充分利用 Cursor 游标表**
   - 作为任务发现队列,非简单的位置记录
   - Key-Value 模式,支持多游标并发扫描
   - 时间戳天然有序,增量扫描高效

2. **双层保障机制**
   - Cursor:正常流程,快速处理
   - Retry:异常流程,可靠重试
   - 职责分离,监控清晰

3. **可空 IP 字段支持**
   - ClientIP 和 ServerIP 使用 *string 类型
   - 支持 NULL 值,符合数据库最佳实践
   - 使用 sql.NullString 正确处理

4. **完整的监控支持**
   - 未存证记录数监控
   - Cursor 延迟监控
   - 重试队列长度监控
   - 死信队列监控

## 测试结果

-  单元测试:49/49 通过 (100%)
-  代码覆盖率:28.5%
-  编译状态:无错误
-  支持数据库:PostgreSQL, MySQL, SQLite

## Breaking Changes

无破坏性变更。Persistence 模块作为可选功能,不影响现有代码。

## 版本信息

- 版本:v2.1.0
- Go 版本要求:1.21+
- 更新日期:2025-12-23
2025-12-23 18:59:43 +08:00

606 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package persistence
import (
"context"
"database/sql"
"fmt"
"time"
"go.yandata.net/iod/iod/go-trustlog/api/logger"
"go.yandata.net/iod/iod/go-trustlog/api/model"
)
// OperationRepository 操作记录数据库仓储接口
type OperationRepository interface {
// Save 保存操作记录到数据库
Save(ctx context.Context, op *model.Operation, status TrustlogStatus) error
// SaveTx 在事务中保存操作记录
SaveTx(ctx context.Context, tx *sql.Tx, op *model.Operation, status TrustlogStatus) error
// UpdateStatus 更新操作记录的存证状态
UpdateStatus(ctx context.Context, opID string, status TrustlogStatus) error
// UpdateStatusTx 在事务中更新操作记录的存证状态
UpdateStatusTx(ctx context.Context, tx *sql.Tx, opID string, status TrustlogStatus) error
// FindByID 根据 OpID 查询操作记录
FindByID(ctx context.Context, opID string) (*model.Operation, TrustlogStatus, error)
// FindUntrustlogged 查询未存证的操作记录(用于重试机制)
FindUntrustlogged(ctx context.Context, limit int) ([]*model.Operation, error)
}
// CursorRepository 游标仓储接口Key-Value 模式)
type CursorRepository interface {
// GetCursor 获取游标值
GetCursor(ctx context.Context, cursorKey string) (string, error)
// UpdateCursor 更新游标值
UpdateCursor(ctx context.Context, cursorKey string, cursorValue string) error
// UpdateCursorTx 在事务中更新游标值
UpdateCursorTx(ctx context.Context, tx *sql.Tx, cursorKey string, cursorValue string) error
// InitCursor 初始化游标(如果不存在)
InitCursor(ctx context.Context, cursorKey string, initialValue string) error
}
// RetryRepository 重试仓储接口
type RetryRepository interface {
// AddRetry 添加重试记录
AddRetry(ctx context.Context, opID string, errorMsg string, nextRetryAt time.Time) error
// AddRetryTx 在事务中添加重试记录
AddRetryTx(ctx context.Context, tx *sql.Tx, opID string, errorMsg string, nextRetryAt time.Time) error
// IncrementRetry 增加重试次数
IncrementRetry(ctx context.Context, opID string, errorMsg string, nextRetryAt time.Time) error
// MarkAsDeadLetter 标记为死信
MarkAsDeadLetter(ctx context.Context, opID string, errorMsg string) error
// FindPendingRetries 查找待重试的记录
FindPendingRetries(ctx context.Context, limit int) ([]RetryRecord, error)
// DeleteRetry 删除重试记录(成功后清理)
DeleteRetry(ctx context.Context, opID string) error
}
// RetryRecord 重试记录
type RetryRecord struct {
OpID string
RetryCount int
RetryStatus RetryStatus
LastRetryAt *time.Time
NextRetryAt *time.Time
ErrorMessage string
CreatedAt time.Time
UpdatedAt time.Time
}
// operationRepository 操作记录仓储实现
type operationRepository struct {
db *sql.DB
logger logger.Logger
}
// NewOperationRepository 创建操作记录仓储
func NewOperationRepository(db *sql.DB, log logger.Logger) OperationRepository {
return &operationRepository{
db: db,
logger: log,
}
}
func (r *operationRepository) Save(ctx context.Context, op *model.Operation, status TrustlogStatus) error {
return r.SaveTx(ctx, nil, op, status)
}
func (r *operationRepository) SaveTx(ctx context.Context, tx *sql.Tx, op *model.Operation, status TrustlogStatus) error {
query := `
INSERT INTO operation (
op_id, op_actor, doid, producer_id,
request_body_hash, response_body_hash,
op_source, op_type, do_prefix, do_repository,
client_ip, server_ip, trustlog_status, timestamp
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
var reqHash, respHash, clientIP, serverIP sql.NullString
if op.RequestBodyHash != nil {
reqHash = sql.NullString{String: *op.RequestBodyHash, Valid: true}
}
if op.ResponseBodyHash != nil {
respHash = sql.NullString{String: *op.ResponseBodyHash, Valid: true}
}
if op.ClientIP != nil {
clientIP = sql.NullString{String: *op.ClientIP, Valid: true}
}
if op.ServerIP != nil {
serverIP = sql.NullString{String: *op.ServerIP, Valid: true}
}
args := []interface{}{
op.OpID,
op.OpActor,
op.Doid,
op.ProducerID,
reqHash,
respHash,
string(op.OpSource),
string(op.OpType),
op.DoPrefix,
op.DoRepository,
clientIP,
serverIP,
string(status),
op.Timestamp,
}
var err error
if tx != nil {
_, err = tx.ExecContext(ctx, query, args...)
} else {
_, err = r.db.ExecContext(ctx, query, args...)
}
if err != nil {
r.logger.ErrorContext(ctx, "failed to save operation",
"opID", op.OpID,
"error", err,
)
return fmt.Errorf("failed to save operation: %w", err)
}
r.logger.DebugContext(ctx, "operation saved to database",
"opID", op.OpID,
"status", status,
)
return nil
}
func (r *operationRepository) UpdateStatus(ctx context.Context, opID string, status TrustlogStatus) error {
return r.UpdateStatusTx(ctx, nil, opID, status)
}
func (r *operationRepository) UpdateStatusTx(ctx context.Context, tx *sql.Tx, opID string, status TrustlogStatus) error {
query := `UPDATE operation SET trustlog_status = ? WHERE op_id = ?`
var err error
if tx != nil {
_, err = tx.ExecContext(ctx, query, string(status), opID)
} else {
_, err = r.db.ExecContext(ctx, query, string(status), opID)
}
if err != nil {
r.logger.ErrorContext(ctx, "failed to update operation status",
"opID", opID,
"status", status,
"error", err,
)
return fmt.Errorf("failed to update operation status: %w", err)
}
r.logger.DebugContext(ctx, "operation status updated",
"opID", opID,
"status", status,
)
return nil
}
func (r *operationRepository) FindByID(ctx context.Context, opID string) (*model.Operation, TrustlogStatus, error) {
query := `
SELECT
op_id, op_actor, doid, producer_id,
request_body_hash, response_body_hash,
op_source, op_type, do_prefix, do_repository,
client_ip, server_ip, trustlog_status, timestamp
FROM operation
WHERE op_id = ?
`
var op model.Operation
var statusStr string
var reqHash, respHash, clientIP, serverIP sql.NullString
err := r.db.QueryRowContext(ctx, query, opID).Scan(
&op.OpID,
&op.OpActor,
&op.Doid,
&op.ProducerID,
&reqHash,
&respHash,
&op.OpSource,
&op.OpType,
&op.DoPrefix,
&op.DoRepository,
&clientIP,
&serverIP,
&statusStr,
&op.Timestamp,
)
if err == sql.ErrNoRows {
return nil, "", fmt.Errorf("operation not found: %s", opID)
}
if err != nil {
r.logger.ErrorContext(ctx, "failed to find operation",
"opID", opID,
"error", err,
)
return nil, "", fmt.Errorf("failed to find operation: %w", err)
}
if reqHash.Valid {
op.RequestBodyHash = &reqHash.String
}
if respHash.Valid {
op.ResponseBodyHash = &respHash.String
}
if clientIP.Valid {
op.ClientIP = &clientIP.String
}
if serverIP.Valid {
op.ServerIP = &serverIP.String
}
return &op, TrustlogStatus(statusStr), nil
}
func (r *operationRepository) FindUntrustlogged(ctx context.Context, limit int) ([]*model.Operation, error) {
query := `
SELECT
op_id, op_actor, doid, producer_id,
request_body_hash, response_body_hash,
op_source, op_type, do_prefix, do_repository,
client_ip, server_ip, timestamp
FROM operation
WHERE trustlog_status = ?
ORDER BY timestamp ASC
LIMIT ?
`
rows, err := r.db.QueryContext(ctx, query, string(StatusNotTrustlogged), limit)
if err != nil {
r.logger.ErrorContext(ctx, "failed to find untrustlogged operations",
"error", err,
)
return nil, fmt.Errorf("failed to find untrustlogged operations: %w", err)
}
defer rows.Close()
var operations []*model.Operation
for rows.Next() {
var op model.Operation
var reqHash, respHash, clientIP, serverIP sql.NullString
err := rows.Scan(
&op.OpID,
&op.OpActor,
&op.Doid,
&op.ProducerID,
&reqHash,
&respHash,
&op.OpSource,
&op.OpType,
&op.DoPrefix,
&op.DoRepository,
&clientIP,
&serverIP,
&op.Timestamp,
)
if err != nil {
r.logger.ErrorContext(ctx, "failed to scan operation row",
"error", err,
)
return nil, fmt.Errorf("failed to scan operation row: %w", err)
}
if reqHash.Valid {
op.RequestBodyHash = &reqHash.String
}
if respHash.Valid {
op.ResponseBodyHash = &respHash.String
}
if clientIP.Valid {
op.ClientIP = &clientIP.String
}
if serverIP.Valid {
op.ServerIP = &serverIP.String
}
operations = append(operations, &op)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating operation rows: %w", err)
}
return operations, nil
}
// cursorRepository 游标仓储实现
type cursorRepository struct {
db *sql.DB
logger logger.Logger
}
// NewCursorRepository 创建游标仓储
func NewCursorRepository(db *sql.DB, log logger.Logger) CursorRepository {
return &cursorRepository{
db: db,
logger: log,
}
}
// GetCursor 获取游标值Key-Value 模式)
func (r *cursorRepository) GetCursor(ctx context.Context, cursorKey string) (string, error) {
query := `SELECT cursor_value FROM trustlog_cursor WHERE cursor_key = ?`
var cursorValue string
err := r.db.QueryRowContext(ctx, query, cursorKey).Scan(&cursorValue)
if err == sql.ErrNoRows {
r.logger.DebugContext(ctx, "cursor not found",
"cursorKey", cursorKey,
)
return "", nil
}
if err != nil {
r.logger.ErrorContext(ctx, "failed to get cursor",
"cursorKey", cursorKey,
"error", err,
)
return "", fmt.Errorf("failed to get cursor: %w", err)
}
return cursorValue, nil
}
// UpdateCursor 更新游标值
func (r *cursorRepository) UpdateCursor(ctx context.Context, cursorKey string, cursorValue string) error {
return r.UpdateCursorTx(ctx, nil, cursorKey, cursorValue)
}
// UpdateCursorTx 在事务中更新游标值(使用 UPSERT
func (r *cursorRepository) UpdateCursorTx(ctx context.Context, tx *sql.Tx, cursorKey string, cursorValue string) error {
// 使用 UPSERT 语法(适配不同数据库)
query := `
INSERT INTO trustlog_cursor (cursor_key, cursor_value, last_updated_at)
VALUES (?, ?, ?)
ON CONFLICT (cursor_key) DO UPDATE SET
cursor_value = excluded.cursor_value,
last_updated_at = excluded.last_updated_at
`
var err error
now := time.Now()
if tx != nil {
_, err = tx.ExecContext(ctx, query, cursorKey, cursorValue, now)
} else {
_, err = r.db.ExecContext(ctx, query, cursorKey, cursorValue, now)
}
if err != nil {
r.logger.ErrorContext(ctx, "failed to update cursor",
"cursorKey", cursorKey,
"error", err,
)
return fmt.Errorf("failed to update cursor: %w", err)
}
r.logger.DebugContext(ctx, "cursor updated",
"cursorKey", cursorKey,
"cursorValue", cursorValue,
)
return nil
}
// InitCursor 初始化游标(如果不存在)
func (r *cursorRepository) InitCursor(ctx context.Context, cursorKey string, initialValue string) error {
query := `
INSERT INTO trustlog_cursor (cursor_key, cursor_value, last_updated_at)
VALUES (?, ?, ?)
ON CONFLICT (cursor_key) DO NOTHING
`
_, err := r.db.ExecContext(ctx, query, cursorKey, initialValue, time.Now())
if err != nil {
r.logger.ErrorContext(ctx, "failed to init cursor",
"cursorKey", cursorKey,
"error", err,
)
return fmt.Errorf("failed to init cursor: %w", err)
}
r.logger.DebugContext(ctx, "cursor initialized",
"cursorKey", cursorKey,
"initialValue", initialValue,
)
return nil
}
// retryRepository 重试仓储实现
type retryRepository struct {
db *sql.DB
logger logger.Logger
}
// NewRetryRepository 创建重试仓储
func NewRetryRepository(db *sql.DB, log logger.Logger) RetryRepository {
return &retryRepository{
db: db,
logger: log,
}
}
func (r *retryRepository) AddRetry(ctx context.Context, opID string, errorMsg string, nextRetryAt time.Time) error {
return r.AddRetryTx(ctx, nil, opID, errorMsg, nextRetryAt)
}
func (r *retryRepository) AddRetryTx(ctx context.Context, tx *sql.Tx, opID string, errorMsg string, nextRetryAt time.Time) error {
query := `
INSERT INTO trustlog_retry (op_id, retry_count, retry_status, error_message, next_retry_at, updated_at)
VALUES (?, 0, ?, ?, ?, ?)
`
var err error
if tx != nil {
_, err = tx.ExecContext(ctx, query, opID, string(RetryStatusPending), errorMsg, nextRetryAt, time.Now())
} else {
_, err = r.db.ExecContext(ctx, query, opID, string(RetryStatusPending), errorMsg, nextRetryAt, time.Now())
}
if err != nil {
r.logger.ErrorContext(ctx, "failed to add retry record",
"opID", opID,
"error", err,
)
return fmt.Errorf("failed to add retry record: %w", err)
}
r.logger.DebugContext(ctx, "retry record added",
"opID", opID,
"nextRetryAt", nextRetryAt,
)
return nil
}
func (r *retryRepository) IncrementRetry(ctx context.Context, opID string, errorMsg string, nextRetryAt time.Time) error {
query := `
UPDATE trustlog_retry
SET retry_count = retry_count + 1,
retry_status = ?,
last_retry_at = ?,
next_retry_at = ?,
error_message = ?,
updated_at = ?
WHERE op_id = ?
`
_, err := r.db.ExecContext(ctx, query,
string(RetryStatusRetrying),
time.Now(),
nextRetryAt,
errorMsg,
time.Now(),
opID,
)
if err != nil {
r.logger.ErrorContext(ctx, "failed to increment retry",
"opID", opID,
"error", err,
)
return fmt.Errorf("failed to increment retry: %w", err)
}
r.logger.DebugContext(ctx, "retry incremented",
"opID", opID,
"nextRetryAt", nextRetryAt,
)
return nil
}
func (r *retryRepository) MarkAsDeadLetter(ctx context.Context, opID string, errorMsg string) error {
query := `
UPDATE trustlog_retry
SET retry_status = ?,
error_message = ?,
updated_at = ?
WHERE op_id = ?
`
_, err := r.db.ExecContext(ctx, query,
string(RetryStatusDeadLetter),
errorMsg,
time.Now(),
opID,
)
if err != nil {
r.logger.ErrorContext(ctx, "failed to mark as dead letter",
"opID", opID,
"error", err,
)
return fmt.Errorf("failed to mark as dead letter: %w", err)
}
r.logger.WarnContext(ctx, "operation marked as dead letter",
"opID", opID,
"error", errorMsg,
)
return nil
}
func (r *retryRepository) FindPendingRetries(ctx context.Context, limit int) ([]RetryRecord, error) {
query := `
SELECT
op_id, retry_count, retry_status,
last_retry_at, next_retry_at, error_message,
created_at, updated_at
FROM trustlog_retry
WHERE retry_status IN (?, ?) AND next_retry_at <= ?
ORDER BY next_retry_at ASC
LIMIT ?
`
rows, err := r.db.QueryContext(ctx, query,
string(RetryStatusPending),
string(RetryStatusRetrying),
time.Now(),
limit,
)
if err != nil {
r.logger.ErrorContext(ctx, "failed to find pending retries",
"error", err,
)
return nil, fmt.Errorf("failed to find pending retries: %w", err)
}
defer rows.Close()
var records []RetryRecord
for rows.Next() {
var record RetryRecord
var lastRetry, nextRetry sql.NullTime
err := rows.Scan(
&record.OpID,
&record.RetryCount,
&record.RetryStatus,
&lastRetry,
&nextRetry,
&record.ErrorMessage,
&record.CreatedAt,
&record.UpdatedAt,
)
if err != nil {
r.logger.ErrorContext(ctx, "failed to scan retry record",
"error", err,
)
return nil, fmt.Errorf("failed to scan retry record: %w", err)
}
if lastRetry.Valid {
record.LastRetryAt = &lastRetry.Time
}
if nextRetry.Valid {
record.NextRetryAt = &nextRetry.Time
}
records = append(records, record)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating retry records: %w", err)
}
return records, nil
}
func (r *retryRepository) DeleteRetry(ctx context.Context, opID string) error {
query := `DELETE FROM trustlog_retry WHERE op_id = ?`
_, err := r.db.ExecContext(ctx, query, opID)
if err != nil {
r.logger.ErrorContext(ctx, "failed to delete retry record",
"opID", opID,
"error", err,
)
return fmt.Errorf("failed to delete retry record: %w", err)
}
r.logger.DebugContext(ctx, "retry record deleted",
"opID", opID,
)
return nil
}