Files
go-trustlog/api/persistence/strategy.go
ryan 0ec1d3b87d refactor: 更改module路径为独立仓库路径
- go.yandata.net/iod/iod/go-trustlog → go.yandata.net/wangsiyuan/go-trustlog
- 更新 go.mod module声明
- 更新 README.md 安装说明
- 批量更新所有 .go 文件中的 import 路径
- 61个文件受影响

这样go-trustlog可以作为独立SDK使用
2025-12-26 14:35:39 +08:00

213 lines
5.7 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package persistence
import (
"context"
"database/sql"
"fmt"
"go.yandata.net/wangsiyuan/go-trustlog/api/logger"
"go.yandata.net/wangsiyuan/go-trustlog/api/model"
)
// PersistenceStrategy 存证策略枚举
type PersistenceStrategy int
const (
// StrategyDBOnly 仅落库,不存证
StrategyDBOnly PersistenceStrategy = iota
// StrategyDBAndTrustlog 既落库又存证(保证最终一致性)
StrategyDBAndTrustlog
// StrategyTrustlogOnly 仅存证,不落库
StrategyTrustlogOnly
)
// String 返回策略名称
func (s PersistenceStrategy) String() string {
switch s {
case StrategyDBOnly:
return "DB_ONLY"
case StrategyDBAndTrustlog:
return "DB_AND_TRUSTLOG"
case StrategyTrustlogOnly:
return "TRUSTLOG_ONLY"
default:
return "UNKNOWN"
}
}
// PersistenceConfig 持久化配置
type PersistenceConfig struct {
// Strategy 存证策略
Strategy PersistenceStrategy
// EnableRetry 是否启用重试机制(仅对 StrategyDBAndTrustlog 有效)
EnableRetry bool
// MaxRetryCount 最大重试次数
MaxRetryCount int
// RetryBatchSize 每批重试的记录数
RetryBatchSize int
}
// DefaultPersistenceConfig 返回默认配置
func DefaultPersistenceConfig(strategy PersistenceStrategy) PersistenceConfig {
return PersistenceConfig{
Strategy: strategy,
EnableRetry: true,
MaxRetryCount: 5,
RetryBatchSize: 100,
}
}
// OperationPublisher 操作发布器接口
type OperationPublisher interface {
Publish(ctx context.Context, op *model.Operation) error
}
// PersistenceManager 持久化管理器
type PersistenceManager struct {
db *sql.DB
config PersistenceConfig
opRepo OperationRepository
cursorRepo CursorRepository
retryRepo RetryRepository
logger logger.Logger
publisher OperationPublisher
}
// NewPersistenceManager 创建持久化管理器
func NewPersistenceManager(
db *sql.DB,
config PersistenceConfig,
log logger.Logger,
) *PersistenceManager {
return &PersistenceManager{
db: db,
config: config,
opRepo: NewOperationRepository(db, log),
cursorRepo: NewCursorRepository(db, log),
retryRepo: NewRetryRepository(db, log),
logger: log,
}
}
// InitSchema 初始化数据库表结构
func (m *PersistenceManager) InitSchema(ctx context.Context, driverName string) error {
m.logger.InfoContext(ctx, "initializing database schema",
"driver", driverName,
)
opDDL, cursorDDL, retryDDL, err := GetDialectDDL(driverName)
if err != nil {
return fmt.Errorf("failed to get DDL for driver %s: %w", driverName, err)
}
// 执行 operation 表 DDL
if _, err := m.db.ExecContext(ctx, opDDL); err != nil {
return fmt.Errorf("failed to create operation table: %w", err)
}
// 执行 cursor 表 DDL
if _, err := m.db.ExecContext(ctx, cursorDDL); err != nil {
return fmt.Errorf("failed to create cursor table: %w", err)
}
// 执行 retry 表 DDL
if _, err := m.db.ExecContext(ctx, retryDDL); err != nil {
return fmt.Errorf("failed to create retry table: %w", err)
}
m.logger.InfoContext(ctx, "database schema initialized successfully")
return nil
}
// SaveOperation 根据策略保存操作
func (m *PersistenceManager) SaveOperation(ctx context.Context, op *model.Operation) error {
switch m.config.Strategy {
case StrategyDBOnly:
return m.saveDBOnly(ctx, op)
case StrategyDBAndTrustlog:
return m.saveDBAndTrustlog(ctx, op)
case StrategyTrustlogOnly:
// 仅存证不落库,无需处理
return nil
default:
return fmt.Errorf("unknown persistence strategy: %d", m.config.Strategy)
}
}
// saveDBOnly 仅落库策略
func (m *PersistenceManager) saveDBOnly(ctx context.Context, op *model.Operation) error {
m.logger.DebugContext(ctx, "saving operation with DB_ONLY strategy",
"opID", op.OpID,
)
// 直接保存到数据库,状态为已存证(因为不需要实际存证)
if err := m.opRepo.Save(ctx, op, StatusTrustlogged); err != nil {
return fmt.Errorf("failed to save operation (DB_ONLY): %w", err)
}
m.logger.InfoContext(ctx, "operation saved with DB_ONLY strategy",
"opID", op.OpID,
)
return nil
}
// saveDBAndTrustlog 既落库又存证策略Cursor + Retry 异步模式)
// 流程:
// 1. 仅落库状态NOT_TRUSTLOGGED
// 2. 由 CursorWorker 定期扫描并异步存证
// 3. 失败记录由 RetryWorker 重试
func (m *PersistenceManager) saveDBAndTrustlog(ctx context.Context, op *model.Operation) error {
m.logger.DebugContext(ctx, "saving operation with DB_AND_TRUSTLOG strategy",
"opID", op.OpID,
)
// 只落库,状态为未存证
// CursorWorker 会定期扫描并异步存证
if err := m.opRepo.Save(ctx, op, StatusNotTrustlogged); err != nil {
return fmt.Errorf("failed to save operation (DB_AND_TRUSTLOG): %w", err)
}
m.logger.InfoContext(ctx, "operation saved with DB_AND_TRUSTLOG strategy",
"opID", op.OpID,
"status", StatusNotTrustlogged,
"note", "will be discovered and trustlogged by CursorWorker",
)
return nil
}
// GetOperationRepo 获取操作仓储
func (m *PersistenceManager) GetOperationRepo() OperationRepository {
return m.opRepo
}
// GetCursorRepo 获取游标仓储
func (m *PersistenceManager) GetCursorRepo() CursorRepository {
return m.cursorRepo
}
// GetRetryRepo 获取重试仓储
func (m *PersistenceManager) GetRetryRepo() RetryRepository {
return m.retryRepo
}
// GetDB 获取数据库连接
func (m *PersistenceManager) GetDB() *sql.DB {
return m.db
}
// Close 关闭数据库连接
func (m *PersistenceManager) Close() error {
m.logger.Info("closing database connection")
return m.db.Close()
}
// SetPublisher 设置Publisher供CursorWorker使用
func (m *PersistenceManager) SetPublisher(publisher OperationPublisher) {
m.publisher = publisher
}
// GetPublisher 获取Publisher
func (m *PersistenceManager) GetPublisher() OperationPublisher {
return m.publisher
}