## 核心功能 ### 1. 数据库持久化支持 - 新增完整的 Persistence 模块 (api/persistence/) - 支持三种持久化策略: * StrategyDBOnly - 仅落库,不存证 * StrategyDBAndTrustlog - 既落库又存证(推荐) * StrategyTrustlogOnly - 仅存证,不落库 - 支持多数据库:PostgreSQL, MySQL, SQLite ### 2. Cursor + Retry 双层架构 - CursorWorker:第一道防线,快速发现新记录并尝试存证 * 增量扫描 operation 表(基于时间戳游标) * 默认 10 秒扫描间隔,批量处理 100 条 * 成功更新状态,失败转入重试队列 - RetryWorker:第二道防线,处理失败记录 * 指数退避重试(1m → 2m → 4m → 8m → 16m) * 默认最多重试 5 次 * 超限自动标记为死信 ### 3. 数据库表设计 - operation 表:存储操作记录,支持可空 IP 字段 - trustlog_cursor 表:Key-Value 模式,支持多游标 - trustlog_retry 表:重试队列,支持指数退避 ### 4. 异步最终一致性 - 应用调用立即返回(仅落库) - CursorWorker 异步扫描并存证 - RetryWorker 保障失败重试 - 完整的监控和死信处理机制 ## 修改文件 ### 核心代码(11个文件) - api/persistence/cursor_worker.go - Cursor 工作器(新增) - api/persistence/repository.go - 数据仓储层(新增) - api/persistence/schema.go - 数据库 Schema(新增) - api/persistence/strategy.go - 策略管理器(新增) - api/persistence/client.go - 客户端封装(新增) - api/persistence/retry_worker.go - Retry 工作器(新增) - api/persistence/config.go - 配置管理(新增) ### 修复内部包引用(5个文件) - api/adapter/publisher.go - 修复 internal 包引用 - api/adapter/subscriber.go - 修复 internal 包引用 - api/model/envelope.go - 修复 internal 包引用 - api/model/operation.go - 修复 internal 包引用 - api/model/record.go - 修复 internal 包引用 ### 单元测试(8个文件) - api/persistence/*_test.go - 完整的单元测试 - 测试覆盖率:28.5% - 测试通过率:49/49 (100%) ### SQL 脚本(4个文件) - api/persistence/sql/postgresql.sql - PostgreSQL 建表脚本 - api/persistence/sql/mysql.sql - MySQL 建表脚本 - api/persistence/sql/sqlite.sql - SQLite 建表脚本 - api/persistence/sql/test_data.sql - 测试数据 ### 文档(2个文件) - README.md - 更新主文档,新增 Persistence 使用指南 - api/persistence/README.md - 完整的 Persistence 文档 - api/persistence/sql/README.md - SQL 脚本说明 ## 技术亮点 1. **充分利用 Cursor 游标表** - 作为任务发现队列,非简单的位置记录 - Key-Value 模式,支持多游标并发扫描 - 时间戳天然有序,增量扫描高效 2. **双层保障机制** - Cursor:正常流程,快速处理 - Retry:异常流程,可靠重试 - 职责分离,监控清晰 3. **可空 IP 字段支持** - ClientIP 和 ServerIP 使用 *string 类型 - 支持 NULL 值,符合数据库最佳实践 - 使用 sql.NullString 正确处理 4. **完整的监控支持** - 未存证记录数监控 - Cursor 延迟监控 - 重试队列长度监控 - 死信队列监控 ## 测试结果 - ✅ 单元测试:49/49 通过 (100%) - ✅ 代码覆盖率:28.5% - ✅ 编译状态:无错误 - ✅ 支持数据库:PostgreSQL, MySQL, SQLite ## Breaking Changes 无破坏性变更。Persistence 模块作为可选功能,不影响现有代码。 ## 版本信息 - 版本:v2.1.0 - Go 版本要求:1.21+ - 更新日期:2025-12-23
502 lines
14 KiB
Go
502 lines
14 KiB
Go
package model
|
||
|
||
import (
|
||
"bytes"
|
||
"errors"
|
||
"fmt"
|
||
|
||
"go.yandata.net/iod/iod/go-trustlog/api/logger"
|
||
"go.yandata.net/iod/iod/go-trustlog/internal/helpers"
|
||
)
|
||
|
||
// Envelope 包装序列化后的数据,包含元信息和报文体。
|
||
// 用于 Trustlog 接口类型的序列化和反序列化。
|
||
type Envelope struct {
|
||
ProducerID string // 日志提供者ID
|
||
Signature []byte // 签名(根据客户端密钥与指定算法进行签名,二进制格式)
|
||
Body []byte // CBOR序列化的报文体
|
||
}
|
||
|
||
// EnvelopeConfig 序列化配置。
|
||
type EnvelopeConfig struct {
|
||
Signer Signer // 签名器,用于签名和验签
|
||
}
|
||
|
||
// VerifyConfig 验签配置。
|
||
type VerifyConfig struct {
|
||
Signer Signer // 签名器,用于验签
|
||
}
|
||
|
||
// NewEnvelopeConfig 创建Envelope配置。
|
||
func NewEnvelopeConfig(signer Signer) EnvelopeConfig {
|
||
log := logger.GetGlobalLogger()
|
||
log.Debug("Creating new EnvelopeConfig",
|
||
"signerType", fmt.Sprintf("%T", signer),
|
||
)
|
||
return EnvelopeConfig{
|
||
Signer: signer,
|
||
}
|
||
}
|
||
|
||
// NewSM2EnvelopeConfig 创建使用SM2签名的Envelope配置。
|
||
// 便捷方法,用于快速创建SM2签名器配置。
|
||
func NewSM2EnvelopeConfig(privateKey, publicKey []byte) EnvelopeConfig {
|
||
log := logger.GetGlobalLogger()
|
||
log.Debug("Creating new SM2 EnvelopeConfig",
|
||
"privateKeyLength", len(privateKey),
|
||
"publicKeyLength", len(publicKey),
|
||
)
|
||
return EnvelopeConfig{
|
||
Signer: NewSM2Signer(privateKey, publicKey),
|
||
}
|
||
}
|
||
|
||
// NewVerifyConfig 创建验签配置。
|
||
func NewVerifyConfig(signer Signer) VerifyConfig {
|
||
log := logger.GetGlobalLogger()
|
||
log.Debug("Creating new VerifyConfig",
|
||
"signerType", fmt.Sprintf("%T", signer),
|
||
)
|
||
return VerifyConfig{
|
||
Signer: signer,
|
||
}
|
||
}
|
||
|
||
// NewSM2VerifyConfig 创建使用SM2签名的验签配置。
|
||
// 便捷方法,用于快速创建SM2签名器验签配置。
|
||
// 注意:验签只需要公钥,但SM2Signer需要同时提供私钥和公钥(私钥可以为空)。
|
||
func NewSM2VerifyConfig(publicKey []byte) VerifyConfig {
|
||
return VerifyConfig{
|
||
Signer: NewSM2Signer(nil, publicKey),
|
||
}
|
||
}
|
||
|
||
//
|
||
// ===== Envelope 序列化/反序列化 =====
|
||
//
|
||
|
||
// MarshalEnvelope 将 Envelope 序列化为 TLV 格式(Varint长度编码)。
|
||
// 格式:[字段1长度][字段1值:producerID][字段2长度][字段2值:签名][字段3长度][字段3值:CBOR报文体]。
|
||
func MarshalEnvelope(env *Envelope) ([]byte, error) {
|
||
log := logger.GetGlobalLogger()
|
||
log.Debug("Marshaling envelope to TLV format")
|
||
if env == nil {
|
||
log.Error("Envelope is nil")
|
||
return nil, errors.New("envelope cannot be nil")
|
||
}
|
||
|
||
buf := new(bytes.Buffer)
|
||
writer := helpers.NewTLVWriter(buf)
|
||
|
||
log.Debug("Writing producerID to TLV",
|
||
"producerID", env.ProducerID,
|
||
)
|
||
if err := writer.WriteStringField(env.ProducerID); err != nil {
|
||
log.Error("Failed to write producerID",
|
||
"error", err,
|
||
"producerID", env.ProducerID,
|
||
)
|
||
return nil, fmt.Errorf("failed to write producerID: %w", err)
|
||
}
|
||
|
||
log.Debug("Writing signature to TLV",
|
||
"signatureLength", len(env.Signature),
|
||
)
|
||
if err := writer.WriteField(env.Signature); err != nil {
|
||
log.Error("Failed to write signature",
|
||
"error", err,
|
||
"signatureLength", len(env.Signature),
|
||
)
|
||
return nil, fmt.Errorf("failed to write signature: %w", err)
|
||
}
|
||
|
||
log.Debug("Writing body to TLV",
|
||
"bodyLength", len(env.Body),
|
||
)
|
||
if err := writer.WriteField(env.Body); err != nil {
|
||
log.Error("Failed to write body",
|
||
"error", err,
|
||
"bodyLength", len(env.Body),
|
||
)
|
||
return nil, fmt.Errorf("failed to write body: %w", err)
|
||
}
|
||
|
||
result := buf.Bytes()
|
||
log.Debug("Envelope marshaled successfully",
|
||
"producerID", env.ProducerID,
|
||
"totalLength", len(result),
|
||
)
|
||
return result, nil
|
||
}
|
||
|
||
// UnmarshalEnvelope 完整反序列化:读取所有字段。
|
||
// 解析完整的Envelope结构,包括所有元数据和Body。
|
||
// 为了向后兼容,如果遇到旧格式(包含原hash字段),会自动跳过该字段。
|
||
func UnmarshalEnvelope(data []byte) (*Envelope, error) {
|
||
log := logger.GetGlobalLogger()
|
||
log.Debug("Unmarshaling envelope from TLV format",
|
||
"dataLength", len(data),
|
||
)
|
||
if len(data) == 0 {
|
||
log.Error("Data is empty")
|
||
return nil, errors.New("data is empty")
|
||
}
|
||
|
||
r := bytes.NewReader(data)
|
||
reader := helpers.NewTLVReader(r)
|
||
|
||
log.Debug("Reading producerID from TLV")
|
||
producerID, err := reader.ReadStringField()
|
||
if err != nil {
|
||
log.Error("Failed to read producerID",
|
||
"error", err,
|
||
)
|
||
return nil, fmt.Errorf("failed to read producerID: %w", err)
|
||
}
|
||
log.Debug("ProducerID read successfully",
|
||
"producerID", producerID,
|
||
)
|
||
|
||
// 读取第一个字段(可能是原hash或签名)
|
||
log.Debug("Reading field 1 from TLV")
|
||
field1, err := reader.ReadField()
|
||
if err != nil {
|
||
log.Error("Failed to read field 1",
|
||
"error", err,
|
||
)
|
||
return nil, fmt.Errorf("failed to read field 1: %w", err)
|
||
}
|
||
log.Debug("Field 1 read successfully",
|
||
"field1Length", len(field1),
|
||
)
|
||
|
||
// 读取第二个字段(可能是签名或body)
|
||
log.Debug("Reading field 2 from TLV")
|
||
field2, err := reader.ReadField()
|
||
if err != nil {
|
||
log.Error("Failed to read field 2",
|
||
"error", err,
|
||
)
|
||
return nil, fmt.Errorf("failed to read field 2: %w", err)
|
||
}
|
||
log.Debug("Field 2 read successfully",
|
||
"field2Length", len(field2),
|
||
)
|
||
|
||
// 尝试读取第三个字段来判断格式
|
||
log.Debug("Attempting to read field 3 to determine format")
|
||
field3, err := reader.ReadField()
|
||
if err == nil {
|
||
// 有第三个字段,说明是旧格式:producerID, originalHash, encryptedHash, body
|
||
// field1 = originalHash, field2 = encryptedHash/signature, field3 = body
|
||
log.Debug("Detected old format (with originalHash)",
|
||
"producerID", producerID,
|
||
"signatureLength", len(field2),
|
||
"bodyLength", len(field3),
|
||
)
|
||
return &Envelope{
|
||
ProducerID: producerID,
|
||
Signature: field2,
|
||
Body: field3,
|
||
}, nil
|
||
}
|
||
|
||
// 没有第三个字段,说明是新格式:producerID, signature, body
|
||
// field1 = signature, field2 = body
|
||
log.Debug("Detected new format (without originalHash)",
|
||
"producerID", producerID,
|
||
"signatureLength", len(field1),
|
||
"bodyLength", len(field2),
|
||
)
|
||
return &Envelope{
|
||
ProducerID: producerID,
|
||
Signature: field1,
|
||
Body: field2,
|
||
}, nil
|
||
}
|
||
|
||
//
|
||
// ===== 部分反序列化(无需反序列化全部报文) =====
|
||
//
|
||
|
||
// UnmarshalEnvelopeProducerID 部分反序列化:只读取字段1(producerID)。
|
||
// 用于快速获取producerID而不解析整个Envelope。
|
||
func UnmarshalEnvelopeProducerID(data []byte) (string, error) {
|
||
if len(data) == 0 {
|
||
return "", errors.New("data is empty")
|
||
}
|
||
|
||
r := bytes.NewReader(data)
|
||
reader := helpers.NewTLVReader(r)
|
||
|
||
producerID, err := reader.ReadStringField()
|
||
if err != nil {
|
||
return "", fmt.Errorf("failed to read producerID: %w", err)
|
||
}
|
||
|
||
return producerID, nil
|
||
}
|
||
|
||
// UnmarshalEnvelopeSignature 部分反序列化:读取字段1、2(producerID, 签名)。
|
||
// 用于获取签名信息而不解析整个Body。
|
||
// 为了向后兼容,如果遇到旧格式(包含原hash字段),会自动跳过该字段。
|
||
func UnmarshalEnvelopeSignature(data []byte) (string, []byte, error) {
|
||
if len(data) == 0 {
|
||
return "", nil, errors.New("data is empty")
|
||
}
|
||
|
||
r := bytes.NewReader(data)
|
||
reader := helpers.NewTLVReader(r)
|
||
|
||
producerID, err := reader.ReadStringField()
|
||
if err != nil {
|
||
return "", nil, fmt.Errorf("failed to read producerID: %w", err)
|
||
}
|
||
|
||
// 读取第一个字段(可能是原hash或签名)
|
||
field1, err := reader.ReadField()
|
||
if err != nil {
|
||
return "", nil, fmt.Errorf("failed to read field 1: %w", err)
|
||
}
|
||
|
||
// 读取第二个字段(可能是签名或body)
|
||
field2, err := reader.ReadField()
|
||
if err != nil {
|
||
return "", nil, fmt.Errorf("failed to read field 2: %w", err)
|
||
}
|
||
|
||
// 尝试读取第三个字段来判断格式
|
||
_, err = reader.ReadField()
|
||
if err == nil {
|
||
// 有第三个字段,说明是旧格式:producerID, originalHash, encryptedHash/signature, body
|
||
// field1 = originalHash, field2 = signature
|
||
return producerID, field2, nil
|
||
}
|
||
|
||
// 没有第三个字段,说明是新格式:producerID, signature, body
|
||
// field1 = signature
|
||
return producerID, field1, nil
|
||
}
|
||
|
||
//
|
||
// ===== Trustlog 序列化/反序列化 =====
|
||
//
|
||
|
||
// MarshalTrustlog 序列化 Trustlog 为 Envelope 格式。
|
||
// Trustlog 实现了 Trustlog 接口,自动提取 producerID 并使用 Canonical CBOR 编码。
|
||
func MarshalTrustlog(t Trustlog, config EnvelopeConfig) ([]byte, error) {
|
||
log := logger.GetGlobalLogger()
|
||
log.Debug("Marshaling Trustlog to Envelope format",
|
||
"trustlogType", fmt.Sprintf("%T", t),
|
||
)
|
||
if t == nil {
|
||
log.Error("Trustlog is nil")
|
||
return nil, errors.New("trustlog cannot be nil")
|
||
}
|
||
|
||
producerID := t.GetProducerID()
|
||
if producerID == "" {
|
||
log.Error("ProducerID is empty")
|
||
return nil, errors.New("producerID cannot be empty")
|
||
}
|
||
log.Debug("ProducerID extracted",
|
||
"producerID", producerID,
|
||
)
|
||
|
||
// 1. 序列化CBOR报文体(使用 Trustlog 的 MarshalBinary,确保使用 Canonical CBOR)
|
||
log.Debug("Marshaling trustlog to CBOR binary")
|
||
bodyCBOR, err := t.MarshalBinary()
|
||
if err != nil {
|
||
log.Error("Failed to marshal trustlog to CBOR",
|
||
"error", err,
|
||
"producerID", producerID,
|
||
)
|
||
return nil, fmt.Errorf("failed to marshal trustlog: %w", err)
|
||
}
|
||
log.Debug("Trustlog marshaled to CBOR successfully",
|
||
"producerID", producerID,
|
||
"bodyLength", len(bodyCBOR),
|
||
)
|
||
|
||
// 2. 计算签名
|
||
if config.Signer == nil {
|
||
log.Error("Signer is nil")
|
||
return nil, errors.New("signer is required")
|
||
}
|
||
log.Debug("Signing trustlog body",
|
||
"producerID", producerID,
|
||
"bodyLength", len(bodyCBOR),
|
||
)
|
||
signature, err := config.Signer.Sign(bodyCBOR)
|
||
if err != nil {
|
||
log.Error("Failed to sign trustlog body",
|
||
"error", err,
|
||
"producerID", producerID,
|
||
)
|
||
return nil, fmt.Errorf("failed to sign data: %w", err)
|
||
}
|
||
log.Debug("Trustlog body signed successfully",
|
||
"producerID", producerID,
|
||
"signatureLength", len(signature),
|
||
)
|
||
|
||
// 3. 构建Envelope
|
||
env := &Envelope{
|
||
ProducerID: producerID,
|
||
Signature: signature,
|
||
Body: bodyCBOR,
|
||
}
|
||
|
||
// 4. 序列化为TLV格式
|
||
log.Debug("Marshaling envelope to TLV format",
|
||
"producerID", producerID,
|
||
)
|
||
return MarshalEnvelope(env)
|
||
}
|
||
|
||
// UnmarshalTrustlog 反序列化 Envelope 为 Trustlog。
|
||
// 解析Envelope数据并恢复 Trustlog 结构。
|
||
func UnmarshalTrustlog(data []byte, t Trustlog) error {
|
||
log := logger.GetGlobalLogger()
|
||
log.Debug("Unmarshaling Envelope to Trustlog",
|
||
"trustlogType", fmt.Sprintf("%T", t),
|
||
"dataLength", len(data),
|
||
)
|
||
if t == nil {
|
||
log.Error("Trustlog is nil")
|
||
return errors.New("trustlog cannot be nil")
|
||
}
|
||
|
||
env, err := UnmarshalEnvelope(data)
|
||
if err != nil {
|
||
log.Error("Failed to unmarshal envelope",
|
||
"error", err,
|
||
)
|
||
return err
|
||
}
|
||
log.Debug("Envelope unmarshaled successfully",
|
||
"producerID", env.ProducerID,
|
||
"bodyLength", len(env.Body),
|
||
)
|
||
|
||
// 使用 Trustlog 的 UnmarshalBinary 反序列化
|
||
log.Debug("Unmarshaling trustlog body from CBOR",
|
||
"producerID", env.ProducerID,
|
||
)
|
||
if errUnmarshal := t.UnmarshalBinary(env.Body); errUnmarshal != nil {
|
||
log.Error("Failed to unmarshal trustlog body",
|
||
"error", errUnmarshal,
|
||
"producerID", env.ProducerID,
|
||
)
|
||
return fmt.Errorf("failed to unmarshal trustlog body: %w", errUnmarshal)
|
||
}
|
||
log.Debug("Trustlog unmarshaled successfully",
|
||
"producerID", env.ProducerID,
|
||
)
|
||
return nil
|
||
}
|
||
|
||
//
|
||
// ===== Operation 序列化/反序列化 =====
|
||
//
|
||
|
||
// MarshalOperation 序列化 Operation 为 Envelope 格式。
|
||
func MarshalOperation(op *Operation, config EnvelopeConfig) ([]byte, error) {
|
||
return MarshalTrustlog(op, config)
|
||
}
|
||
|
||
// UnmarshalOperation 反序列化 Envelope 为 Operation。
|
||
func UnmarshalOperation(data []byte) (*Operation, error) {
|
||
var op Operation
|
||
if err := UnmarshalTrustlog(data, &op); err != nil {
|
||
return nil, err
|
||
}
|
||
return &op, nil
|
||
}
|
||
|
||
//
|
||
// ===== Record 序列化/反序列化 =====
|
||
//
|
||
|
||
// MarshalRecord 序列化 Record 为 Envelope 格式。
|
||
func MarshalRecord(record *Record, config EnvelopeConfig) ([]byte, error) {
|
||
return MarshalTrustlog(record, config)
|
||
}
|
||
|
||
// UnmarshalRecord 反序列化 Envelope 为 Record。
|
||
func UnmarshalRecord(data []byte) (*Record, error) {
|
||
var record Record
|
||
if err := UnmarshalTrustlog(data, &record); err != nil {
|
||
return nil, err
|
||
}
|
||
return &record, nil
|
||
}
|
||
|
||
//
|
||
// ===== 验证 =====
|
||
//
|
||
|
||
// VerifyEnvelope 验证Envelope的完整性(使用EnvelopeConfig)。
|
||
// 验证签名是否匹配,确保数据未被篡改。
|
||
// 如果验证成功,返回解析后的Envelope结构体指针;如果验证失败,返回错误。
|
||
func VerifyEnvelope(data []byte, config EnvelopeConfig) (*Envelope, error) {
|
||
if config.Signer == nil {
|
||
return nil, errors.New("signer is required for verification")
|
||
}
|
||
|
||
verifyConfig := VerifyConfig(config)
|
||
return VerifyEnvelopeWithConfig(data, verifyConfig)
|
||
}
|
||
|
||
// VerifyEnvelopeWithConfig 验证Envelope的完整性(使用VerifyConfig)。
|
||
// 验证签名是否匹配,确保数据未被篡改。
|
||
// 如果验证成功,返回解析后的Envelope结构体指针;如果验证失败,返回错误。
|
||
func VerifyEnvelopeWithConfig(data []byte, config VerifyConfig) (*Envelope, error) {
|
||
log := logger.GetGlobalLogger()
|
||
log.Debug("Verifying envelope",
|
||
"dataLength", len(data),
|
||
)
|
||
if config.Signer == nil {
|
||
log.Error("Signer is nil")
|
||
return nil, errors.New("signer is required for verification")
|
||
}
|
||
|
||
env, err := UnmarshalEnvelope(data)
|
||
if err != nil {
|
||
log.Error("Failed to unmarshal envelope",
|
||
"error", err,
|
||
)
|
||
return nil, fmt.Errorf("failed to unmarshal envelope: %w", err)
|
||
}
|
||
log.Debug("Envelope unmarshaled for verification",
|
||
"producerID", env.ProducerID,
|
||
"bodyLength", len(env.Body),
|
||
"signatureLength", len(env.Signature),
|
||
)
|
||
|
||
// 验证签名
|
||
log.Debug("Verifying signature",
|
||
"producerID", env.ProducerID,
|
||
)
|
||
valid, err := config.Signer.Verify(env.Body, env.Signature)
|
||
if err != nil {
|
||
log.Error("Failed to verify signature",
|
||
"error", err,
|
||
"producerID", env.ProducerID,
|
||
)
|
||
return nil, fmt.Errorf("failed to verify signature: %w", err)
|
||
}
|
||
|
||
if !valid {
|
||
log.Warn("Signature verification failed",
|
||
"producerID", env.ProducerID,
|
||
)
|
||
return nil, errors.New("signature verification failed")
|
||
}
|
||
|
||
log.Debug("Envelope verified successfully",
|
||
"producerID", env.ProducerID,
|
||
)
|
||
return env, nil
|
||
}
|