- go.yandata.net/iod/iod/go-trustlog → go.yandata.net/wangsiyuan/go-trustlog - 更新 go.mod module声明 - 更新 README.md 安装说明 - 批量更新所有 .go 文件中的 import 路径 - 61个文件受影响 这样go-trustlog可以作为独立SDK使用
213 lines
5.7 KiB
Go
213 lines
5.7 KiB
Go
package persistence
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"fmt"
|
||
|
||
"go.yandata.net/wangsiyuan/go-trustlog/api/logger"
|
||
"go.yandata.net/wangsiyuan/go-trustlog/api/model"
|
||
)
|
||
|
||
// PersistenceStrategy enumerates the supported ways of persisting an
// operation (database write and/or trustlog attestation).
type PersistenceStrategy int

const (
	// StrategyDBOnly writes to the database only; no trustlog attestation.
	StrategyDBOnly PersistenceStrategy = iota
	// StrategyDBAndTrustlog writes to the database and also attests to the
	// trustlog (the design goal is eventual consistency between the two).
	StrategyDBAndTrustlog
	// StrategyTrustlogOnly attests to the trustlog only; nothing is written
	// to the database.
	StrategyTrustlogOnly
)

// String returns the canonical upper-case name of the strategy, or
// "UNKNOWN" for any value outside the declared constants.
func (s PersistenceStrategy) String() string {
	// Lookup table indexed by the enum value itself.
	names := [...]string{
		StrategyDBOnly:        "DB_ONLY",
		StrategyDBAndTrustlog: "DB_AND_TRUSTLOG",
		StrategyTrustlogOnly:  "TRUSTLOG_ONLY",
	}

	if s < 0 || int(s) >= len(names) {
		return "UNKNOWN"
	}
	return names[s]
}
|
||
|
||
// PersistenceConfig 持久化配置
|
||
type PersistenceConfig struct {
|
||
// Strategy 存证策略
|
||
Strategy PersistenceStrategy
|
||
// EnableRetry 是否启用重试机制(仅对 StrategyDBAndTrustlog 有效)
|
||
EnableRetry bool
|
||
// MaxRetryCount 最大重试次数
|
||
MaxRetryCount int
|
||
// RetryBatchSize 每批重试的记录数
|
||
RetryBatchSize int
|
||
}
|
||
|
||
// DefaultPersistenceConfig 返回默认配置
|
||
func DefaultPersistenceConfig(strategy PersistenceStrategy) PersistenceConfig {
|
||
return PersistenceConfig{
|
||
Strategy: strategy,
|
||
EnableRetry: true,
|
||
MaxRetryCount: 5,
|
||
RetryBatchSize: 100,
|
||
}
|
||
}
|
||
|
||
// OperationPublisher 操作发布器接口
|
||
type OperationPublisher interface {
|
||
Publish(ctx context.Context, op *model.Operation) error
|
||
}
|
||
|
||
// PersistenceManager 持久化管理器
|
||
type PersistenceManager struct {
|
||
db *sql.DB
|
||
config PersistenceConfig
|
||
opRepo OperationRepository
|
||
cursorRepo CursorRepository
|
||
retryRepo RetryRepository
|
||
logger logger.Logger
|
||
publisher OperationPublisher
|
||
}
|
||
|
||
// NewPersistenceManager 创建持久化管理器
|
||
func NewPersistenceManager(
|
||
db *sql.DB,
|
||
config PersistenceConfig,
|
||
log logger.Logger,
|
||
) *PersistenceManager {
|
||
return &PersistenceManager{
|
||
db: db,
|
||
config: config,
|
||
opRepo: NewOperationRepository(db, log),
|
||
cursorRepo: NewCursorRepository(db, log),
|
||
retryRepo: NewRetryRepository(db, log),
|
||
logger: log,
|
||
}
|
||
}
|
||
|
||
// InitSchema 初始化数据库表结构
|
||
func (m *PersistenceManager) InitSchema(ctx context.Context, driverName string) error {
|
||
m.logger.InfoContext(ctx, "initializing database schema",
|
||
"driver", driverName,
|
||
)
|
||
|
||
opDDL, cursorDDL, retryDDL, err := GetDialectDDL(driverName)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to get DDL for driver %s: %w", driverName, err)
|
||
}
|
||
|
||
// 执行 operation 表 DDL
|
||
if _, err := m.db.ExecContext(ctx, opDDL); err != nil {
|
||
return fmt.Errorf("failed to create operation table: %w", err)
|
||
}
|
||
|
||
// 执行 cursor 表 DDL
|
||
if _, err := m.db.ExecContext(ctx, cursorDDL); err != nil {
|
||
return fmt.Errorf("failed to create cursor table: %w", err)
|
||
}
|
||
|
||
// 执行 retry 表 DDL
|
||
if _, err := m.db.ExecContext(ctx, retryDDL); err != nil {
|
||
return fmt.Errorf("failed to create retry table: %w", err)
|
||
}
|
||
|
||
m.logger.InfoContext(ctx, "database schema initialized successfully")
|
||
return nil
|
||
}
|
||
|
||
// SaveOperation 根据策略保存操作
|
||
func (m *PersistenceManager) SaveOperation(ctx context.Context, op *model.Operation) error {
|
||
switch m.config.Strategy {
|
||
case StrategyDBOnly:
|
||
return m.saveDBOnly(ctx, op)
|
||
case StrategyDBAndTrustlog:
|
||
return m.saveDBAndTrustlog(ctx, op)
|
||
case StrategyTrustlogOnly:
|
||
// 仅存证不落库,无需处理
|
||
return nil
|
||
default:
|
||
return fmt.Errorf("unknown persistence strategy: %d", m.config.Strategy)
|
||
}
|
||
}
|
||
|
||
// saveDBOnly 仅落库策略
|
||
func (m *PersistenceManager) saveDBOnly(ctx context.Context, op *model.Operation) error {
|
||
m.logger.DebugContext(ctx, "saving operation with DB_ONLY strategy",
|
||
"opID", op.OpID,
|
||
)
|
||
|
||
// 直接保存到数据库,状态为已存证(因为不需要实际存证)
|
||
if err := m.opRepo.Save(ctx, op, StatusTrustlogged); err != nil {
|
||
return fmt.Errorf("failed to save operation (DB_ONLY): %w", err)
|
||
}
|
||
|
||
m.logger.InfoContext(ctx, "operation saved with DB_ONLY strategy",
|
||
"opID", op.OpID,
|
||
)
|
||
return nil
|
||
}
|
||
|
||
// saveDBAndTrustlog 既落库又存证策略(Cursor + Retry 异步模式)
|
||
// 流程:
|
||
// 1. 仅落库(状态:NOT_TRUSTLOGGED)
|
||
// 2. 由 CursorWorker 定期扫描并异步存证
|
||
// 3. 失败记录由 RetryWorker 重试
|
||
func (m *PersistenceManager) saveDBAndTrustlog(ctx context.Context, op *model.Operation) error {
|
||
m.logger.DebugContext(ctx, "saving operation with DB_AND_TRUSTLOG strategy",
|
||
"opID", op.OpID,
|
||
)
|
||
|
||
// 只落库,状态为未存证
|
||
// CursorWorker 会定期扫描并异步存证
|
||
if err := m.opRepo.Save(ctx, op, StatusNotTrustlogged); err != nil {
|
||
return fmt.Errorf("failed to save operation (DB_AND_TRUSTLOG): %w", err)
|
||
}
|
||
|
||
m.logger.InfoContext(ctx, "operation saved with DB_AND_TRUSTLOG strategy",
|
||
"opID", op.OpID,
|
||
"status", StatusNotTrustlogged,
|
||
"note", "will be discovered and trustlogged by CursorWorker",
|
||
)
|
||
return nil
|
||
}
|
||
|
||
// GetOperationRepo 获取操作仓储
|
||
func (m *PersistenceManager) GetOperationRepo() OperationRepository {
|
||
return m.opRepo
|
||
}
|
||
|
||
// GetCursorRepo 获取游标仓储
|
||
func (m *PersistenceManager) GetCursorRepo() CursorRepository {
|
||
return m.cursorRepo
|
||
}
|
||
|
||
// GetRetryRepo 获取重试仓储
|
||
func (m *PersistenceManager) GetRetryRepo() RetryRepository {
|
||
return m.retryRepo
|
||
}
|
||
|
||
// GetDB 获取数据库连接
|
||
func (m *PersistenceManager) GetDB() *sql.DB {
|
||
return m.db
|
||
}
|
||
|
||
// Close 关闭数据库连接
|
||
func (m *PersistenceManager) Close() error {
|
||
m.logger.Info("closing database connection")
|
||
return m.db.Close()
|
||
}
|
||
|
||
// SetPublisher 设置Publisher(供CursorWorker使用)
|
||
func (m *PersistenceManager) SetPublisher(publisher OperationPublisher) {
|
||
m.publisher = publisher
|
||
}
|
||
|
||
// GetPublisher 获取Publisher
|
||
func (m *PersistenceManager) GetPublisher() OperationPublisher {
|
||
return m.publisher
|
||
}
|