2025-12-24 17:22:50 +08:00

Go-Trustlog SDK

Go Version Test Status

本 SDK 提供基于 Watermill 抽象层的统一消息发送与接收能力,基于 gRPC 的操作查询和取证验证功能,以及完整的数据库持久化支持

核心特性

📦 双数据模型

  • Operation(操作记录):完整的业务操作,包含请求/响应体哈希,支持完整的取证验证
  • Record(简单记录):轻量级事件或日志记录,适合日志和事件追踪场景

💾 数据库持久化(新增)

  • 三种持久化策略:仅落库、既落库又存证、仅存证
  • Cursor + Retry 双层架构:异步最终一致性保障
  • 多数据库支持PostgreSQL、MySQL、SQLite
  • 可靠重试机制:指数退避 + 死信队列

🔄 消息发布

  • 直接发布:通过 Pulsar Publisher 发送到对应的 Topic
  • 事务性发布:使用 Watermill Forwarder 持久化到 SQL保证事务性

🔍 查询验证

  • 统一查询客户端:单一连接池同时支持 Operation 和 Record 查询
  • 流式验证:实时获取取证验证进度
  • 负载均衡:多服务器轮询分发

📋 目录


🚀 安装

1. 私有仓库配置(重要)

由于本 SDK 托管在私有仓库,需要配置 SSH 映射和禁用 Go Module 校验:

配置 Git SSH 映射(跳过 HTTPS 验证)

git config --global url."git@go.yandata.net:".insteadOf "https://go.yandata.net"

禁用 Go Module Sum 校验

go env -w GOPRIVATE="go.yandata.net"

2. 安装 SDK

go get go.yandata.net/iod/iod/go-trustlog

📦 核心概念

数据模型

SDK 提供两种数据模型,分别适用于不同的业务场景:

1. Operation操作记录

Operation 用于记录完整的业务操作,包含完整的元数据、请求/响应体哈希等信息,支持完整的取证验证流程。

适用场景

  • 记录 DOIP/IRP 协议的完整操作Create、Update、Delete、Retrieve 等)
  • 需要完整记录请求和响应的审计场景
  • 需要支持完整取证验证的操作记录

核心字段

  • Meta:操作元数据
    • OpID:操作唯一标识符(自动生成 UUID v7
    • Timestamp:操作时间戳(必填)
    • OpSource:操作来源(DOIPIRP
    • OpType:操作类型(如 "Create""Update""Delete" 等,字符串类型)
    • OpAlgorithm:哈希算法类型(默认 Sha256Simd
    • OpMetaHash:元数据哈希值(自动计算)
  • DataID:数据标识
    • DoPrefixDO 前缀(必填)
    • DoRepository:仓库名(必填)
    • Doid:完整 DOID必填格式{DoPrefix}/{DoRepository}/{object}
  • OpActor:操作发起者(默认 SYSTEM
  • RequestBodyHash:请求体哈希值(必填)
  • ResponseBodyHash:响应体哈希值(必填)
  • OpHash:操作整体哈希值(自动计算)

创建方式

op, err := model.NewFullOperation(
    model.OpSourceDOIP,                // 操作来源
    string(model.OpTypeCreate),        // 操作类型(字符串)
    "10.1000",                         // doPrefix
    "my-repo",                         // doRepository  
    "10.1000/my-repo/object123",       // doid完整标识
    "producer-123",                    // producerID
    "user123",                         // opActor
    []byte(`{"foo":"bar"}`),          // 请求体(支持 string 或 []byte
    []byte(`{"status":"ok"}`),        // 响应体(支持 string 或 []byte
    time.Now(),                        // 操作时间戳
)

发布方式

client.OperationPublish(op) // 发布到 OperationTopic

2. Record简单记录

Record 用于记录简单的事件或日志,轻量级设计,适合日志和事件追踪场景。

适用场景

  • 记录简单的日志信息
  • 记录系统中的事件(如用户登录、配置变更等)
  • 不需要完整请求/响应信息的轻量级记录场景

核心字段

  • ID:记录唯一标识符(自动生成 UUID v7
  • DoPrefix:节点前缀(可选)
  • Timestamp:操作时间(可选,默认当前时间)
  • Operator:用户标识(可选)
  • Extra:额外数据(可选,[]byte 类型)
  • RCType:记录类型(可选,如 "log""event" 等)
  • Algorithm:哈希算法类型(默认 Sha256Simd
  • RCHash:记录哈希值(自动计算)

创建方式

// 方式一:完整创建
record, err := model.NewFullRecord(
    "10.1000",                    // DoPrefix
    time.Now(),                   // 时间戳
    "operator123",                // 操作者
    []byte("extra data"),         // 额外数据
    "log",                        // 记录类型
    model.BLAKE3,                 // 哈希算法
)

// 方式二:链式调用创建
record, _ := model.NewRecord(model.SHA256)
record.WithDoPrefix("10.1000").
    WithTimestamp(time.Now()).
    WithOperator("operator123").
    WithExtra([]byte("extra data")).
    WithRCType("log")

发布方式

client.RecordPublish(record) // 发布到 RecordTopic

两种模型的对比

特性 Operation Record
用途 完整业务操作记录 简单事件/日志记录
请求/响应 包含请求体和响应体哈希 不包含
取证验证 完整取证验证流程 哈希验证
数据标识 完整的 DataIDPrefix/Repository/Doid 可选的 DoPrefix
字段复杂度 较高8+ 字段) 较低7 字段)
Topic persistent://public/default/operation persistent://public/default/record
适用场景 审计、完整操作追踪 日志、事件追踪

HashType哈希算法

两种模型都支持以下 18 种哈希算法:

  • MD5 系列MD5MD4
  • SHA 系列SHA1SHA224SHA256SHA384SHA512SHA512/224SHA512/256SHA256-SIMD
  • SHA3 系列SHA3-224SHA3-256SHA3-384SHA3-512
  • BLAKE 系列BLAKE3BLAKE2BBLAKE2S
  • 其他RIPEMD160

默认算法:Sha256Simd

组件说明

  • Publisher
    负责将 OperationRecord 序列化并发布到对应的 Topic

    • Operationpersistent://public/default/operation
    • Recordpersistent://public/default/record
  • Subscriber
    负责从 Topic 中订阅报文并进行 ack/nack 处理(一般无需直接使用)。可以订阅 OperationTopicRecordTopic

  • HighClient
    高层封装的发布客户端,方便业务代码发送 OperationRecord 消息。

  • QueryClient
    基于 gRPC 的统一查询客户端,提供:

    • Operation 操作查询:列表查询和取证验证
    • Record 记录查询:列表查询和验证
    • 单一连接池:两种服务共享同一组 gRPC 连接,支持多服务器负载均衡

🎯 使用场景

发布场景

Operation 发布场景

  • 业务操作记录:记录 DOIP/IRP 协议的完整操作Create、Update、Delete 等)
  • 审计追踪:需要完整记录请求和响应的审计场景
  • 取证验证:需要支持完整取证验证的操作记录

Record 发布场景

  • 日志记录:记录简单的日志信息
  • 事件追踪:记录系统中的事件(如用户登录、配置变更等)
  • 轻量级记录:不需要完整请求/响应信息的场景

发布方式

  • 直接发布:使用 Pulsar PublisherSDK 已提供)发送到对应的 Pulsar 主题
  • 事务性发布:使用 Watermill Forwarder 将消息持久化到 SQL 数据库,保证消息的事务性和可靠性

查询场景

Operation 查询场景

  • 操作列表查询:查询历史操作记录列表(支持分页、按来源/类型/前缀/仓库过滤)
  • 取证验证:对特定操作执行完整的取证验证(流式返回进度)

Record 查询场景

  • 记录列表查询:查询历史记录列表(支持分页、按前缀和类型过滤)
  • 记录验证:对特定记录执行哈希验证(流式返回进度)

统一客户端QueryClient 使用单一连接池同时支持两种服务,共享 gRPC 连接资源


📝 快速开始

1. HighClient 使用(消息发布)

1.1 创建 Logger

SDK 使用 logr 作为日志接口。你需要先创建一个 logr.Logger 实例,然后通过 logger.NewLogger() 包装成 SDK 的 Logger 接口。

方式一:使用默认的 discard logger适用于测试
import (
    "go.yandata.net/iod/iod/go-trustlog/api/logger"
    "github.com/go-logr/logr"
)

// 使用 discard logger不输出任何日志
myLogger := logger.NewLogger(logr.Discard())
方式二:使用 zap推荐生产环境
import (
    "go.yandata.net/iod/iod/go-trustlog/api/logger"
    "github.com/go-logr/zap"
    "go.uber.org/zap"
)

// 创建 zap logger
zapLogger, _ := zap.NewProduction()
// 转换为 logr.Logger
logrLogger := zapr.NewLogger(zapLogger)
// 包装成 SDK 的 Logger
myLogger := logger.NewLogger(logrLogger)
方式三:使用其他 logr 实现
import (
    "go.yandata.net/iod/iod/go-trustlog/api/logger"
    // 可以使用任何实现了 logr.LogSink 的实现
    // 例如github.com/go-logr/logr/slogr基于 slog
    //      github.com/go-logr/zap基于 zap
    //      github.com/go-logr/logrusr基于 logrus
)

// 假设你有一个 logr.Logger 实例
var logrLogger logr.Logger
myLogger := logger.NewLogger(logrLogger)

1.2 创建 Publisher

import (
    "go.yandata.net/iod/iod/go-trustlog/api/adapter"
    "go.yandata.net/iod/iod/go-trustlog/api/logger"
    "github.com/go-logr/logr"
)

// 创建 Logger使用 discard 作为示例)
myLogger := logger.NewLogger(logr.Discard())

// 创建 Pulsar Publisher
pub, err := adapter.NewPublisher(
    adapter.PublisherConfig{
        URL: "pulsar://localhost:6650",
    },
    myLogger,
)
if err != nil {
    panic(err)
}
defer pub.Close()

1.3 使用 HighClient 发送 Operation

import (
    "go.yandata.net/iod/iod/go-trustlog/api/highclient"
    "go.yandata.net/iod/iod/go-trustlog/api/model"
    "time"
)

// 准备SM2密钥十六进制字符串格式
privateKeyHex := []byte("私钥D的十六进制字符串例如abc123...")
publicKeyHex := []byte("04 + x坐标(32字节) + y坐标(32字节)的十六进制字符串")

// 创建Envelope配置
envelopeConfig := model.DefaultEnvelopeConfig(privateKeyHex, publicKeyHex)

// 创建高层客户端使用Envelope序列化方式
client := highclient.NewClient(pub, myLogger, envelopeConfig)
defer client.Close()

// 构造完整的 Operation
op, err := model.NewFullOperation(
    model.OpSourceDOIP,                // 操作来源DOIP 或 IRP
    string(model.OpTypeCreate),        // 操作类型:字符串
    "10.1000",                         // doPrefix
    "my-repo",                         // doRepository
    "10.1000/my-repo/object123",       // doid完整标识
    "producer-123",                    // producerID
    "user123",                         // opActor
    []byte(`{"foo":"bar"}`),          // 请求体
    []byte(`{"status":"ok"}`),        // 响应体
    time.Now(),                        // 操作时间戳
)
if err != nil {
    panic(err)
}

// 发送 Operation
if err := client.OperationPublish(op); err != nil {
    panic(err)
}

1.4 使用 HighClient 发送 Record

// 构造 Record
record, err := model.NewFullRecord(
    "10.1000",                    // DoPrefix
    time.Now(),                   // 时间戳
    "operator123",                // 操作者
    []byte("extra data"),         // 额外数据
    "log",                        // 记录类型
    model.BLAKE3,                 // 哈希算法
)
if err != nil {
    panic(err)
}

// 发送 Record
if err := client.RecordPublish(record); err != nil {
    panic(err)
}

1.5 获取底层 Publisher

// 如果需要直接访问 Watermill Publisher
lowPublisher := client.GetLow()

2. QueryClient 使用(统一查询客户端)

QueryClient 是统一的查询客户端,同时支持 Operation操作Record记录 两种服务的查询和验证。使用单一连接池,两种服务共享同一组 gRPC 连接。

2.1 创建 QueryClient

QueryClient 是统一的查询客户端,同时支持 Operation操作Record记录 两种服务的查询和验证。使用单一连接池,两种服务共享同一组 gRPC 连接。

2.1 创建 QueryClient

单服务器模式
import (
    "go.yandata.net/iod/iod/go-trustlog/api/queryclient"
    "go.yandata.net/iod/iod/go-trustlog/api/logger"
    "github.com/go-logr/logr"
)

// 创建 Logger
myLogger := logger.NewLogger(logr.Discard())

// 创建统一查询客户端(单服务器)
queryClient, err := queryclient.NewClient(
    queryclient.ClientConfig{
        ServerAddr: "localhost:50051",
    },
    myLogger,
)
if err != nil {
    panic(err)
}
defer queryClient.Close()
多服务器负载均衡模式
// 创建查询客户端(多服务器,自动轮询负载均衡)
queryClient, err := queryclient.NewClient(
    queryclient.ClientConfig{
        ServerAddrs: []string{
            "server1:50051",
            "server2:50051",
            "server3:50051",
        },
        // DialOptions: []grpc.DialOption{...}, // 可选:自定义 gRPC 连接选项
    },
    myLogger,
)
if err != nil {
    panic(err)
}
defer queryClient.Close()

2.2 查询操作列表

import (
    "context"
    "time"
)

ctx := context.Background()

// 构造查询请求
req := queryclient.ListRequest{
    PageSize: 100,                      // 每页数量
    PreTime:  time.Now().Add(-24 * time.Hour), // 游标分页(可选)
    
    // 可选过滤条件
    OpSource:     "DOIP",               // 按操作来源过滤(字符串)
    OpType:       "Create",             // 按操作类型过滤(字符串)
    DoPrefix:     "10.1000",            // 按数据前缀过滤
    DoRepository: "my-repo",            // 按仓库过滤
}

// 执行查询
resp, err := queryClient.ListOperations(ctx, req)
if err != nil {
    panic(err)
}

// 处理结果
fmt.Printf("Total count: %d\n", resp.Count)
for _, op := range resp.Data {
    fmt.Printf("Operation ID: %s, Type: %s, Time: %s\n", 
        op.OpID, op.OpType, op.Timestamp)
}

2.3 取证验证(流式)

// 构造验证请求
validationReq := queryclient.ValidateRequest{
    Timestamp:    time.Now().Add(-1 * time.Hour),
    OpID:         "operation-id-123",
    OpType:       "Create",
    DoRepository: "my-repo",
}

// 异步验证(流式接收进度)
resultChan, err := queryClient.ValidateOperation(ctx, validationReq)
if err != nil {
    panic(err)
}

// 处理流式结果
for result := range resultChan {
    if result.IsProcessing() {
        fmt.Printf("Progress: %s - %s\n", result.Progress, result.Msg)
    } else if result.IsCompleted() {
        fmt.Println("Validation completed successfully!")
        if result.Data != nil {
            fmt.Printf("Operation: %+v\n", result.Data)
        }
    } else if result.IsFailed() {
        fmt.Printf("Validation failed: %s\n", result.Msg)
    }
}

2.4 取证验证(同步)

// 同步验证(阻塞直到完成)
finalResult, err := queryClient.ValidateOperationSync(
    ctx,
    validationReq,
    func(progress *model.ValidationResult) {
        // 可选的进度回调
        fmt.Printf("Progress: %s\n", progress.Progress)
    },
)
if err != nil {
    panic(err)
}

if finalResult.IsCompleted() {
    fmt.Println("Validation successful!")
} else {
    fmt.Printf("Validation failed: %s\n", finalResult.Msg)
}

2.5 查询记录列表Record

// 构造记录查询请求
recordReq := queryclient.ListRequest{
    PageSize: 50,                          // 每页数量
    PreTime:  time.Now().Add(-24 * time.Hour), // 游标分页(可选)
    
    // 可选过滤条件
    DoPrefix: "10.1000",   // 按数据前缀过滤
    RCType:   "log",       // 按记录类型过滤
}

// 执行查询
recordResp, err := queryClient.ListRecords(ctx, recordReq)
if err != nil {
    panic(err)
}

// 处理结果
fmt.Printf("Total records: %d\n", recordResp.Count)
for _, rec := range recordResp.Data {
    fmt.Printf("Record ID: %s, Type: %s, Hash: %s\n", 
        rec.ID, rec.RCType, rec.RCHash)
}

2.6 记录验证(流式)

// 构造记录验证请求
recordValidationReq := queryclient.ValidateRequest{
    Timestamp: time.Now().Add(-1 * time.Hour),
    RecordID:  "record-id-123",
    DoPrefix:  "10.1000",
    RCType:    "log",
}

// 异步验证(流式接收进度)
recordResultChan, err := queryClient.ValidateRecord(ctx, recordValidationReq)
if err != nil {
    panic(err)
}

// 处理流式结果
for result := range recordResultChan {
    if result.IsProcessing() {
        fmt.Printf("Progress: %s - %s\n", result.Progress, result.Msg)
    } else if result.IsCompleted() {
        fmt.Println("Record validation completed!")
        if result.Data != nil {
            fmt.Printf("Record: %+v\n", result.Data)
        }
    } else if result.IsFailed() {
        fmt.Printf("Record validation failed: %s\n", result.Msg)
    }
}

2.7 记录验证(同步)

// 同步验证(阻塞直到完成)
finalRecordResult, err := queryClient.ValidateRecordSync(
    ctx,
    recordValidationReq,
    func(progress *model.RecordValidationResult) {
        // 可选的进度回调
        fmt.Printf("Progress: %s\n", progress.Progress)
    },
)
if err != nil {
    panic(err)
}

if finalRecordResult.IsCompleted() {
    fmt.Println("Record validation successful!")
} else {
    fmt.Printf("Record validation failed: %s\n", finalRecordResult.Msg)
}

2.8 获取底层 gRPC 客户端

// 高级用户可以直接访问 gRPC 客户端进行自定义操作

// 获取 Operation 服务客户端
opGrpcClient := queryClient.GetLowLevelOperationClient()

// 获取 Record 服务客户端
recGrpcClient := queryClient.GetLowLevelRecordClient()

// 注意:多服务器模式下,每次调用会返回轮询的下一个客户端

3. Persistence 使用(数据库持久化) 新增

Persistence 模块提供完整的数据库持久化支持,实现 Cursor + Retry 双层架构,保证异步最终一致性。

3.1 快速开始

import (
    "context"
    "time"
    
    "go.yandata.net/iod/iod/go-trustlog/api/persistence"
    "go.yandata.net/iod/iod/go-trustlog/api/adapter"
    "go.yandata.net/iod/iod/go-trustlog/api/model"
    "go.yandata.net/iod/iod/go-trustlog/api/logger"
    "github.com/go-logr/logr"
)

func main() {
    ctx := context.Background()
    
    // 1. 创建 Logger
    myLogger := logger.NewLogger(logr.Discard())
    
    // 2. 创建 Pulsar Publisher
    publisher, err := adapter.NewPublisher(
        adapter.PublisherConfig{
            URL: "pulsar://localhost:6650",
        },
        myLogger,
    )
    if err != nil {
        panic(err)
    }
    defer publisher.Close()
    
    // 3. 准备 SM2 密钥和 Envelope 配置(用于签名)
    privateKeyHex := []byte("私钥D的十六进制字符串")
    publicKeyHex := []byte("04 + x坐标 + y坐标的十六进制字符串")
    envelopeConfig := model.DefaultEnvelopeConfig(privateKeyHex, publicKeyHex)
    
    // 4. 创建 Persistence Client
    client, err := persistence.NewPersistenceClient(ctx, persistence.PersistenceClientConfig{
        Publisher:      publisher,      // Pulsar Publisher
        Logger:         myLogger,
        EnvelopeConfig: envelopeConfig, // ⭐ SM2 签名配置
        DBConfig: persistence.DBConfig{
            DriverName:      "postgres",
            DSN:             "postgres://user:pass@localhost:5432/trustlog?sslmode=disable",
            MaxOpenConns:    20,
            MaxIdleConns:    10,
            ConnMaxLifetime: time.Hour,
        },
        PersistenceConfig: persistence.PersistenceConfig{
            Strategy: persistence.StrategyDBAndTrustlog,  // 既落库又存证
        },
        // 启用 Cursor 工作器(推荐)
        EnableCursorWorker: true,
        CursorWorkerConfig: &persistence.CursorWorkerConfig{
            ScanInterval:    10 * time.Second,  // 10秒扫描一次
            BatchSize:       100,                // 每批处理100条
            MaxRetryAttempt: 1,                  // Cursor阶段快速失败
        },
        // 启用 Retry 工作器(必需)
        EnableRetryWorker: true,
        RetryWorkerConfig: &persistence.RetryWorkerConfig{
            RetryInterval:  30 * time.Second,    // 30秒重试一次
            MaxRetryCount:  5,                   // 最多重试5次
        },
    })
    if err != nil {
        panic(err)
    }
    defer client.Close()
    
    // 5. 发布操作(立即返回,异步存证)
    clientIP := "192.168.1.100"
    serverIP := "10.0.0.1"
    
    op := &model.Operation{
        OpID:         "op-001",
        OpType:       string(model.OpTypeCreate), // 字符串类型
        Doid:         "10.1000/repo/obj",
        ProducerID:   "producer-001",
        OpSource:     model.OpSourceDOIP,
        DoPrefix:     "10.1000",
        DoRepository: "repo",
        OpActor:      "user-123",
        Timestamp:    time.Now(),
        ClientIP:     &clientIP,  // 可空
        ServerIP:     &serverIP,  // 可空
    }
    
    if err := client.OperationPublish(ctx, op); err != nil {
        panic(err)
    }
    
    // ✅ 落库成功CursorWorker 会自动异步存证(带签名)
    println("操作已保存,正在异步存证...")
}

3.2 三种持久化策略

策略 说明 是否签名 适用场景
StrategyDBOnly 仅落库,不存证 不签名 历史数据存档、审计日志
StrategyDBAndTrustlog 既落库又存证(异步) 签名存证 生产环境推荐
StrategyTrustlogOnly 仅存证,不落库 签名存证 轻量级场景

重要说明

  • 所有存证操作(StrategyDBAndTrustlogStrategyTrustlogOnly)都会使用 EnvelopeConfig 进行 SM2 签名
  • StrategyDBOnly 仅保存到数据库,不会进行签名和存证
  • 创建 PersistenceClient必须提供 EnvelopeConfig(即使是 StrategyDBOnly 也建议提供,以便后续切换策略)

3.3 Cursor + Retry 双层架构

应用调用
   ↓
仅落库(立即返回)
   ↓
CursorWorker第一道防线
   ├── 增量扫描 operation 表
   ├── 快速尝试存证(使用 Envelope 签名)✅
   ├── 成功 → 更新状态
   └── 失败 → 加入 retry 表
       ↓
RetryWorker第二道防线
   ├── 扫描 retry 表
   ├── 指数退避重试(使用 Envelope 签名)✅
   ├── 成功 → 删除 retry 记录
   └── 失败 → 标记死信

优势

  • 充分利用 cursor 游标表作为任务发现队列
  • 双层保障确保最终一致性
  • 性能优秀,扩展性强
  • 监控清晰,易于维护
  • 所有存证操作都经过 SM2 签名验证

3.4 数据库表设计

operation 表(必需):

  • 存储所有操作记录
  • trustlog_status 字段标记存证状态
  • client_ip, server_ip 可空字段(仅落库)

trustlog_cursor 表(核心):

  • Key-Value 模式,支持多游标
  • 使用时间戳作为游标值
  • 作为任务发现队列

trustlog_retry 表(必需):

  • 存储失败的重试记录
  • 支持指数退避
  • 死信队列

3.5 监控和查询

// 查询未存证记录数
var count int
db.QueryRow(`
    SELECT COUNT(*) 
    FROM operation 
    WHERE trustlog_status = 'NOT_TRUSTLOGGED'
`).Scan(&count)

// 查询重试队列长度
db.QueryRow(`
    SELECT COUNT(*) 
    FROM trustlog_retry 
    WHERE retry_status IN ('PENDING', 'RETRYING')
`).Scan(&count)

// 查询死信记录
rows, _ := db.Query(`
    SELECT op_id, retry_count, error_message 
    FROM trustlog_retry 
    WHERE retry_status = 'DEAD_LETTER'
`)

3.6 详细文档


4. Subscriber 使用(消息订阅)

注意:通常业务代码不需要直接使用 Subscriber除非需要原始的 Watermill 消息处理。

import (
    "context"
    "go.yandata.net/iod/iod/go-trustlog/api/adapter"
    "go.yandata.net/iod/iod/go-trustlog/api/model"
    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/bytedance/sonic"
    "github.com/apache/pulsar-client-go/pulsar"
    "github.com/go-logr/logr"
    
    "go.yandata.net/iod/iod/go-trustlog/api/logger"
)

// 创建 Logger
myLogger := logger.NewLogger(logr.Discard())

// 创建订阅者
sub, err := adapter.NewSubscriber(
    adapter.SubscriberConfig{
        URL:            "pulsar://localhost:6650",
        SubscriberType: pulsar.KeyShared, // 必须使用 KeyShared 模式
    },
    myLogger,
)
if err != nil {
    panic(err)
}
defer sub.Close()

// 订阅消息context 必须携带 key 为 "subName" 的 value
ctx := context.WithValue(context.Background(), "subName", "my-subscriber")
msgChan, err := sub.Subscribe(ctx, adapter.OperationTopic) // 或者 adapter.RecordTopic
if err != nil {
    panic(err)
}

// 处理消息
for msg := range msgChan {
    var op model.Operation
    if err := sonic.Unmarshal(msg.Payload, &op); err != nil {
        myLogger.ErrorContext(ctx, "Invalid Operation message", "error", err)
        msg.Nack()
        continue
    }

    // 处理业务逻辑
    myLogger.InfoContext(ctx, "Received Operation", "key", op.Key())

    // 根据业务成功与否 ack / nack
    msg.Ack()
}

5. Forwarder 事务性发布SQL持久化

使用 Watermill Forwarder 可以将消息先持久化到 SQL 数据库,然后异步发送到 Pulsar保证消息的事务性和可靠性。
这在需要确保消息不丢失的场景下非常有用。

import (
    "database/sql"
    "github.com/ThreeDotsLabs/watermill/components/forwarder"
    "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
    "github.com/go-logr/logr"
    
    "go.yandata.net/iod/iod/go-trustlog/api/adapter"
    "go.yandata.net/iod/iod/go-trustlog/api/highclient"
    "go.yandata.net/iod/iod/go-trustlog/api/logger"
)

// 0. 创建 Logger
myLogger := logger.NewLogger(logr.Discard())

// 1. 创建 SQL Publisher用于持久化
db, err := sql.Open("postgres", "postgres://user:pass@localhost/db")
if err != nil {
    panic(err)
}

sqlPublisher, err := watermillsql.NewPublisher(
    db,
    watermillsql.PublisherConfig{
        SchemaAdapter: watermillsql.DefaultPostgreSQLSchema{},
    },
    myLogger,
)
if err != nil {
    panic(err)
}

// 2. 创建 Pulsar Publisher实际发送
pulsarPublisher, err := adapter.NewPublisher(
    adapter.PublisherConfig{URL: "pulsar://localhost:6650"},
    myLogger,
)
if err != nil {
    panic(err)
}

// 3. 创建 ForwarderSQL -> Pulsar
// 消息先写入 SQL事务提交后异步转发到 Pulsar
fwd, err := forwarder.NewForwarder(sqlPublisher, pulsarPublisher)
if err != nil {
    panic(err)
}

// 4. 使用 Forwarder 创建客户端
// 发布的消息会先存储到 SQL保证事务性
client := highclient.NewClient(fwd, myLogger)
defer client.Close()

// 5. 在数据库事务中发布消息
tx, _ := db.Begin()
// ... 执行业务数据库操作 ...

// 发布 Operation会在同一个事务中写入
_ = client.OperationPublish(op)

// 提交事务(业务数据和消息同时提交)
tx.Commit()

优势

  • 消息与业务数据在同一事务中,保证强一致性
  • 即使 Pulsar 暂时不可用,消息也不会丢失
  • Forwarder 会自动重试发送失败的消息

🎨 完整示例

发布 + 查询 + 验证完整流程

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-logr/logr"
    
    "go.yandata.net/iod/iod/go-trustlog/api/adapter"
    "go.yandata.net/iod/iod/go-trustlog/api/highclient"
    "go.yandata.net/iod/iod/go-trustlog/api/logger"
    "go.yandata.net/iod/iod/go-trustlog/api/queryclient"
    "go.yandata.net/iod/iod/go-trustlog/api/model"
)

func main() {
    ctx := context.Background()
    
    // 0. 创建 Logger
    myLogger := logger.NewLogger(logr.Discard())
    
    // 1. 创建并发送 Operation
    pub, _ := adapter.NewPublisher(
        adapter.PublisherConfig{URL: "pulsar://localhost:6650"},
        myLogger,
    )
    defer pub.Close()
    
    // 准备SM2密钥
    privateKeyHex := []byte("私钥D的十六进制字符串")
    publicKeyHex := []byte("04 + x坐标 + y坐标的十六进制字符串")
    envelopeConfig := model.DefaultEnvelopeConfig(privateKeyHex, publicKeyHex)
    
    client := highclient.NewClient(pub, myLogger, envelopeConfig)
    defer client.Close()
    
    dataID := model.DataID{
        DoPrefix:     "10.1000",
        DoRepository: "test-repo",
        Doid:         "10.1000/test-repo/doc001",
    }
    
    op, _ := model.NewFullOperation(
        model.OpSourceDOIP,
        string(model.OpTypeCreate),        // 字符串类型
        "10.1000",                         // doPrefix
        "test-repo",                       // doRepository
        "10.1000/test-repo/doc001",        // doid
        "producer-001",                    // producerID
        "admin",                           // opActor
        []byte(`{"action":"create"}`),    // requestBody
        []byte(`{"status":"success"}`),   // responseBody
        time.Now(),                        // timestamp
    )
    
    _ = client.OperationPublish(op)
    fmt.Printf("Published operation: %s\n", op.Meta.OpID)
    
    // 等待一段时间让消息被处理
    time.Sleep(2 * time.Second)
    
    // 2. 查询操作列表
    queryClient, _ := queryclient.NewClient(
        queryclient.ClientConfig{ServerAddr: "localhost:50051"},
        myLogger,
    )
    defer queryClient.Close()
    
    listResp, _ := queryClient.ListOperations(ctx, queryclient.ListRequest{
        PageSize:     10,
        DoRepository: "test-repo",
    })
    
    fmt.Printf("Found %d operations\n", listResp.Count)
    
    // 3. 执行取证验证
    if len(listResp.Data) > 0 {
        firstOp := listResp.Data[0]
        
        validationReq := queryclient.ValidateRequest{
            Timestamp:    firstOp.Timestamp,
            OpID:         firstOp.OpID,
            OpType:       firstOp.OpType,  // 已经是字符串
            DoRepository: firstOp.DoRepository,
        }
        
        result, _ := queryClient.ValidateOperationSync(ctx, validationReq, nil)
        
        if result.IsCompleted() {
            fmt.Println("✅ Validation passed!")
        } else {
            fmt.Printf("❌ Validation failed: %s\n", result.Msg)
        }
    }
}

📚 操作类型枚举

DOIP 操作类型7种

// 使用时需要转换为字符串
string(model.OpTypeHello)          // "Hello"
string(model.OpTypeRetrieve)       // "Retrieve"
string(model.OpTypeCreate)         // "Create"
string(model.OpTypeDelete)         // "Delete"
string(model.OpTypeUpdate)         // "Update"
string(model.OpTypeSearch)         // "Search"
string(model.OpTypeListOperations) // "ListOperations"

IRP 操作类型33种

// 使用时需要转换为字符串,例如:
string(model.OpTypeOCCreateHandle)  // "OC_CREATE_HANDLE"
string(model.OpTypeOCDeleteHandle)  // "OC_DELETE_HANDLE"

// Handle 基础操作
model.OpTypeOCReserved, model.OpTypeOCResolution, model.OpTypeOCGetSiteInfo
model.OpTypeOCCreateHandle, model.OpTypeOCDeleteHandle, model.OpTypeOCAddValue
model.OpTypeOCRemoveValue, model.OpTypeOCModifyValue, model.OpTypeOCListHandle
model.OpTypeOCListNA

// DOID 操作
model.OpTypeOCResolutionDOID, model.OpTypeOCCreateDOID, model.OpTypeOCDeleteDOID
model.OpTypeOCUpdateDOID, model.OpTypeOCBatchCreateDOID, model.OpTypeOCResolutionDOIDRecursive

// 用户与仓库
model.OpTypeOCGetUsers, model.OpTypeOCGetRepos

// GRS/IRS 管理
model.OpTypeOCVerifyIRS, model.OpTypeOCResolveGRS, model.OpTypeOCCreateOrgGRS
model.OpTypeOCUpdateOrgGRS, model.OpTypeOCDeleteOrgGRS, model.OpTypeOCSyncOrgIRSParent
model.OpTypeOCUpdateOrgIRSParent, model.OpTypeOCDeleteOrgIRSParent

// 安全与会话
model.OpTypeOCChallengeResponse, model.OpTypeOCVerifyChallenge, model.OpTypeOCSessionSetup
model.OpTypeOCSessionTerminate, model.OpTypeOCSessionExchangeKey, model.OpTypeOCVerifyRouter
model.OpTypeOCQueryRouter

⚠️ 注意事项

  1. 私有仓库配置
    必须先配置 Git SSH 映射和 GOPRIVATE 环境变量,否则无法正常安装 SDK。

  2. 日志接口
    SDK 使用 logr 作为日志接口。你需要:

    • 创建一个 logr.Logger 实例(可以使用 zap、logrus 等实现)
    • 通过 logger.NewLogger(logrLogger) 包装成 SDK 的 Logger 接口
    • 在生产环境建议使用 zaprlogrusr 等实现,测试环境可以使用 logr.Discard()
  3. HighClient 方法名

    • 发送 Operation 使用 client.OperationPublish(op),参数为指针类型 *model.Operation
    • 发送 Record 使用 client.RecordPublish(record),参数为指针类型 *model.Record
  4. 固定主题

    • Operation 主题:persistent://public/default/operation
    • Record 主题:persistent://public/default/record
  5. KeyShared 消费模式
    由于 Trustlog 使用 Key Shared 消费模式,其他订阅者必须选择 KeyShared 并避免消费者重名。

  6. ack/nack 必须处理
    确保订阅方根据业务逻辑确认或拒绝消息。

  7. 时间戳处理
    NewFullOperation() 接受 time.Time 类型的时间戳参数。

  8. 统一连接池
    QueryClient 使用单一连接池同时支持 Operation 和 Record 两种服务,共享 gRPC 连接资源,提高资源利用率。

  9. 负载均衡
    支持多服务器轮询负载均衡,自动分发请求到不同服务器,连接在两种服务间共享。

  10. 流式验证
    取证验证Operation 和 Record都支持流式和同步两种模式流式模式可实时获取进度。

  11. 事务性发布
    使用 Watermill Forwarder 可以将消息持久化到 SQL与业务数据在同一事务中提交保证强一致性。

  12. Record 支持
    除了 OperationSDK 现在也支持 Record 类型的发布、查询和验证,两种服务使用同一个 QueryClient。

  13. 数据库持久化 新增
    完整的数据库持久化支持Cursor + Retry 双层架构,保证异步最终一致性,支持 PostgreSQL、MySQL、SQLite。


🔄 架构图

直接发布架构

[业务服务] 
    ↓
[HighClient.Publish()] 
    ↓
[Pulsar Publisher] --(Operation JSON)--> [Pulsar Topic]
                                              ↓
                                         [Subscriber]
                                              ↓
                                         [其他服务]

事务性发布架构(使用 Forwarder

[业务服务 + DB事务]
    ↓
[HighClient.Publish()]
    ↓
[SQL Publisher] --写入--> [PostgreSQL/MySQL]
    ↓                          ↓
[Forwarder 后台轮询]           |
    ↓                          |
[读取未发送消息] <--------------┘
    ↓
[Pulsar Publisher] --(Operation JSON)--> [Pulsar Topic]
    ↓                                          ↓
[标记为已发送]                             [Subscriber]
                                              ↓
                                         [其他服务]

查询架构(统一连接池)

[业务服务]
    ↓
[QueryClient - 单一连接池]
    ├─ Operation 服务客户端 ─┐
    └─ Record 服务客户端 ────┤
                           ↓ (共享 gRPC 连接,轮询负载均衡)
                    [Server 1] ─┐
                    [Server 2] ─┼─ 多服务器
                    [Server 3] ─┘
                           ↓
                      [存储层]

优势:
- 单一连接池,资源高效利用
- Operation 和 Record 服务共享连接
- 自动负载均衡,请求分发到不同服务器
- 减少连接数,降低服务器压力

持久化架构Cursor + Retry 双层模式) 新增

[应用调用 OperationPublish()]
    ↓
[保存到 operation 表状态NOT_TRUSTLOGGED]
    ↓
[立即返回成功]

    [异步处理开始]
         ↓
[CursorWorker每10秒]
    ├── 增量扫描 operation 表
    ├── 尝试发送到存证系统Envelope 签名)✅
    ├── 成功 → 更新状态为 TRUSTLOGGED
    └── 失败 → 加入 trustlog_retry 表
         ↓
[RetryWorker每30秒]
    ├── 扫描 trustlog_retry 表
    ├── 指数退避重试Envelope 签名)✅
    ├── 成功 → 删除 retry 记录
    └── 失败 → 标记为 DEAD_LETTER

优势:
- ✅ 充分利用 cursor 游标表作为任务发现队列
- ✅ 双层保障确保最终一致性
- ✅ 性能优秀(增量扫描 + 索引查询)
- ✅ 易于监控和运维
- ✅ 所有存证操作都经过 SM2 签名验证

📚 相关文档

核心文档

测试状态

  • 49/49 单元测试通过
  • 代码覆盖率: 28.5%
  • 支持数据库: PostgreSQL, MySQL, SQLite

📝 版本信息

  • 当前版本: v2.1.0
  • Go 版本要求: 1.21+
  • 最后更新: 2025-12-23

Description
No description provided
Readme 742 KiB
Languages
Go 99.4%
Makefile 0.6%