Files
go-trustlog/README.md

1263 lines
39 KiB
Markdown
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Go-Trustlog SDK
[![Go Version](https://img.shields.io/badge/Go-1.21+-blue.svg)](https://golang.org)
[![Test Status](https://img.shields.io/badge/tests-passing-brightgreen.svg)](.)
本 SDK 提供基于 [Watermill](https://watermill.io/) 抽象层的统一消息发送与接收能力,基于 gRPC 的操作查询和取证验证功能,以及**完整的数据库持久化支持**。
### 核心特性
#### 📦 双数据模型
- **`Operation`**(操作记录):完整的业务操作,包含请求/响应体哈希,支持完整的取证验证
- **`Record`**(简单记录):轻量级事件或日志记录,适合日志和事件追踪场景
#### 💾 数据库持久化(新增)
- **三种持久化策略**:仅落库、既落库又存证、仅存证
- **Cursor + Retry 双层架构**:异步最终一致性保障
- **多数据库支持**PostgreSQL、MySQL、SQLite
- **可靠重试机制**:指数退避 + 死信队列
#### 🔄 消息发布
- **直接发布**:通过 Pulsar Publisher 发送到对应的 Topic
- **事务性发布**:使用 Watermill Forwarder 持久化到 SQL保证事务性
#### 🔍 查询验证
- **统一查询客户端**:单一连接池同时支持 Operation 和 Record 查询
- **流式验证**:实时获取取证验证进度
- **负载均衡**:多服务器轮询分发
---
## 📋 目录
- [安装](#-安装)
- [核心概念](#-核心概念)
- [使用场景](#-使用场景)
- [快速开始](#-快速开始)
- [HighClient 使用(消息发布)](#1-highclient-使用消息发布)
- [QueryClient 使用(统一查询)](#2-queryclient-使用统一查询客户端)
- [Persistence 使用(数据库持久化)](#3-persistence-使用数据库持久化) ⭐ 新增
- [Subscriber 使用(消息订阅)](#4-subscriber-使用消息订阅)
- [Forwarder 事务性发布](#5-forwarder-事务性发布sql持久化)
- [完整示例](#-完整示例)
- [操作类型枚举](#-操作类型枚举)
- [注意事项](#-注意事项)
- [架构图](#-架构图)
---
## 🚀 安装
### 1. 私有仓库配置(重要)
由于本 SDK 托管在私有仓库,需要配置 SSH 映射和禁用 Go Module 校验:
#### 配置 Git SSH 映射(跳过 HTTPS 验证)
```bash
git config --global url."git@go.yandata.net:".insteadOf "https://go.yandata.net"
```
#### 禁用 Go Module Sum 校验
```bash
go env -w GOPRIVATE="go.yandata.net"
```
### 2. 安装 SDK
```bash
go get go.yandata.net/iod/iod/go-trustlog
```
---
## 📦 核心概念
### 数据模型
SDK 提供两种数据模型,分别适用于不同的业务场景:
#### 1. Operation操作记录
`Operation` 用于记录完整的业务操作,包含完整的元数据、请求/响应体哈希等信息,支持完整的取证验证流程。
**适用场景**
- 记录 DOIP/IRP 协议的完整操作Create、Update、Delete、Retrieve 等)
- 需要完整记录请求和响应的审计场景
- 需要支持完整取证验证的操作记录
**核心字段**
- `Meta`:操作元数据
- `OpID`:操作唯一标识符(自动生成 UUID v7
- `Timestamp`:操作时间戳(必填)
- `OpSource`:操作来源(`DOIP``IRP`
- `OpType`:操作类型(如 `"Create"``"Update"``"Delete"` 等,字符串类型)
- `OpAlgorithm`:哈希算法类型(默认 `Sha256Simd`
- `OpMetaHash`:元数据哈希值(自动计算)
- `DataID`:数据标识
- `DoPrefix`DO 前缀(必填)
- `DoRepository`:仓库名(必填)
- `Doid`:完整 DOID必填格式`{DoPrefix}/{DoRepository}/{object}`
- `OpActor`:操作发起者(默认 `SYSTEM`
- `RequestBodyHash`:请求体哈希值(必填)
- `ResponseBodyHash`:响应体哈希值(必填)
- `OpHash`:操作整体哈希值(自动计算)
**创建方式**
```go
op, err := model.NewFullOperation(
model.OpSourceDOIP, // 操作来源
string(model.OpTypeCreate), // 操作类型(字符串)
"10.1000", // doPrefix
"my-repo", // doRepository
"10.1000/my-repo/object123", // doid完整标识
"producer-123", // producerID
"user123", // opActor
[]byte(`{"foo":"bar"}`), // 请求体(支持 string 或 []byte
[]byte(`{"status":"ok"}`), // 响应体(支持 string 或 []byte
time.Now(), // 操作时间戳
)
```
**发布方式**
```go
client.OperationPublish(op) // 发布到 OperationTopic
```
#### 2. Record简单记录
`Record` 用于记录简单的事件或日志,轻量级设计,适合日志和事件追踪场景。
**适用场景**
- 记录简单的日志信息
- 记录系统中的事件(如用户登录、配置变更等)
- 不需要完整请求/响应信息的轻量级记录场景
**核心字段**
- `ID`:记录唯一标识符(自动生成 UUID v7
- `DoPrefix`:节点前缀(可选)
- `Timestamp`:操作时间(可选,默认当前时间)
- `Operator`:用户标识(可选)
- `Extra`:额外数据(可选,`[]byte` 类型)
- `RCType`:记录类型(可选,如 `"log"``"event"` 等)
- `Algorithm`:哈希算法类型(默认 `Sha256Simd`
- `RCHash`:记录哈希值(自动计算)
**创建方式**
```go
// 方式一:完整创建
record, err := model.NewFullRecord(
"10.1000", // DoPrefix
time.Now(), // 时间戳
"operator123", // 操作者
[]byte("extra data"), // 额外数据
"log", // 记录类型
model.BLAKE3, // 哈希算法
)
// 方式二:链式调用创建
record, _ := model.NewRecord(model.SHA256)
record.WithDoPrefix("10.1000").
WithTimestamp(time.Now()).
WithOperator("operator123").
WithExtra([]byte("extra data")).
WithRCType("log")
```
**发布方式**
```go
client.RecordPublish(record) // 发布到 RecordTopic
```
#### 两种模型的对比
| 特性 | Operation | Record |
|------|-----------|--------|
| **用途** | 完整业务操作记录 | 简单事件/日志记录 |
| **请求/响应** | ✅ 包含请求体和响应体哈希 | ❌ 不包含 |
| **取证验证** | ✅ 完整取证验证流程 | ✅ 哈希验证 |
| **数据标识** | ✅ 完整的 DataIDPrefix/Repository/Doid | ✅ 可选的 DoPrefix |
| **字段复杂度** | 较高8+ 字段) | 较低7 字段) |
| **Topic** | `persistent://public/default/operation` | `persistent://public/default/record` |
| **适用场景** | 审计、完整操作追踪 | 日志、事件追踪 |
### HashType哈希算法
两种模型都支持以下 18 种哈希算法:
- **MD5 系列**`MD5``MD4`
- **SHA 系列**`SHA1``SHA224``SHA256``SHA384``SHA512``SHA512/224``SHA512/256``SHA256-SIMD`
- **SHA3 系列**`SHA3-224``SHA3-256``SHA3-384``SHA3-512`
- **BLAKE 系列**`BLAKE3``BLAKE2B``BLAKE2S`
- **其他**`RIPEMD160`
默认算法:`Sha256Simd`
### 组件说明
- **Publisher**
负责将 `Operation``Record` 序列化并发布到对应的 Topic
- `Operation``persistent://public/default/operation`
- `Record``persistent://public/default/record`
- **Subscriber**
负责从 Topic 中订阅报文并进行 ack/nack 处理(一般无需直接使用)。可以订阅 `OperationTopic``RecordTopic`
- **HighClient**
高层封装的发布客户端,方便业务代码发送 `Operation``Record` 消息。
- **QueryClient**
基于 gRPC 的统一查询客户端,提供:
- **Operation 操作查询**:列表查询和取证验证
- **Record 记录查询**:列表查询和验证
- **单一连接池**:两种服务共享同一组 gRPC 连接,支持多服务器负载均衡
---
## 🎯 使用场景
### 发布场景
#### Operation 发布场景
- **业务操作记录**:记录 DOIP/IRP 协议的完整操作Create、Update、Delete 等)
- **审计追踪**:需要完整记录请求和响应的审计场景
- **取证验证**:需要支持完整取证验证的操作记录
#### Record 发布场景
- **日志记录**:记录简单的日志信息
- **事件追踪**:记录系统中的事件(如用户登录、配置变更等)
- **轻量级记录**:不需要完整请求/响应信息的场景
**发布方式**
- **直接发布**:使用 Pulsar PublisherSDK 已提供)发送到对应的 Pulsar 主题
- **事务性发布**:使用 Watermill Forwarder 将消息持久化到 SQL 数据库,保证消息的事务性和可靠性
### 查询场景
#### Operation 查询场景
- **操作列表查询**:查询历史操作记录列表(支持分页、按来源/类型/前缀/仓库过滤)
- **取证验证**:对特定操作执行完整的取证验证(流式返回进度)
#### Record 查询场景
- **记录列表查询**:查询历史记录列表(支持分页、按前缀和类型过滤)
- **记录验证**:对特定记录执行哈希验证(流式返回进度)
**统一客户端**`QueryClient` 使用单一连接池同时支持两种服务,共享 gRPC 连接资源
---
## 📝 快速开始
### 1. HighClient 使用(消息发布)
#### 1.1 创建 Logger
SDK 使用 [logr](https://github.com/go-logr/logr) 作为日志接口。你需要先创建一个 logr.Logger 实例,然后通过 `logger.NewLogger()` 包装成 SDK 的 Logger 接口。
##### 方式一:使用默认的 discard logger适用于测试
```go
import (
"go.yandata.net/iod/iod/go-trustlog/api/logger"
"github.com/go-logr/logr"
)
// 使用 discard logger不输出任何日志
myLogger := logger.NewLogger(logr.Discard())
```
##### 方式二:使用 zap推荐生产环境
```go
import (
"go.yandata.net/iod/iod/go-trustlog/api/logger"
"github.com/go-logr/zap"
"go.uber.org/zap"
)
// 创建 zap logger
zapLogger, _ := zap.NewProduction()
// 转换为 logr.Logger
logrLogger := zapr.NewLogger(zapLogger)
// 包装成 SDK 的 Logger
myLogger := logger.NewLogger(logrLogger)
```
##### 方式三:使用其他 logr 实现
```go
import (
"go.yandata.net/iod/iod/go-trustlog/api/logger"
// 可以使用任何实现了 logr.LogSink 的实现
// 例如github.com/go-logr/logr/slogr基于 slog
// github.com/go-logr/zap基于 zap
// github.com/go-logr/logrusr基于 logrus
)
// 假设你有一个 logr.Logger 实例
var logrLogger logr.Logger
myLogger := logger.NewLogger(logrLogger)
```
#### 1.2 创建 Publisher
```go
import (
"go.yandata.net/iod/iod/go-trustlog/api/adapter"
"go.yandata.net/iod/iod/go-trustlog/api/logger"
"github.com/go-logr/logr"
)
// 创建 Logger使用 discard 作为示例)
myLogger := logger.NewLogger(logr.Discard())
// 创建 Pulsar Publisher
pub, err := adapter.NewPublisher(
adapter.PublisherConfig{
URL: "pulsar://localhost:6650",
},
myLogger,
)
if err != nil {
panic(err)
}
defer pub.Close()
```
#### 1.3 使用 HighClient 发送 Operation
```go
import (
"go.yandata.net/iod/iod/go-trustlog/api/highclient"
"go.yandata.net/iod/iod/go-trustlog/api/model"
"time"
)
// 准备SM2密钥十六进制字符串格式
privateKeyHex := []byte("私钥D的十六进制字符串例如abc123...")
publicKeyHex := []byte("04 + x坐标(32字节) + y坐标(32字节)的十六进制字符串")
// 创建Envelope配置
envelopeConfig := model.DefaultEnvelopeConfig(privateKeyHex, publicKeyHex)
// 创建高层客户端使用Envelope序列化方式
client := highclient.NewClient(pub, myLogger, envelopeConfig)
defer client.Close()
// 构造完整的 Operation
op, err := model.NewFullOperation(
model.OpSourceDOIP, // 操作来源DOIP 或 IRP
string(model.OpTypeCreate), // 操作类型:字符串
"10.1000", // doPrefix
"my-repo", // doRepository
"10.1000/my-repo/object123", // doid完整标识
"producer-123", // producerID
"user123", // opActor
[]byte(`{"foo":"bar"}`), // 请求体
[]byte(`{"status":"ok"}`), // 响应体
time.Now(), // 操作时间戳
)
if err != nil {
panic(err)
}
// 发送 Operation
if err := client.OperationPublish(op); err != nil {
panic(err)
}
```
#### 1.4 使用 HighClient 发送 Record
```go
// 构造 Record
record, err := model.NewFullRecord(
"10.1000", // DoPrefix
time.Now(), // 时间戳
"operator123", // 操作者
[]byte("extra data"), // 额外数据
"log", // 记录类型
model.BLAKE3, // 哈希算法
)
if err != nil {
panic(err)
}
// 发送 Record
if err := client.RecordPublish(record); err != nil {
panic(err)
}
```
#### 1.5 获取底层 Publisher
```go
// 如果需要直接访问 Watermill Publisher
lowPublisher := client.GetLow()
```
---
### 2. QueryClient 使用(统一查询客户端)
`QueryClient` 是统一的查询客户端,同时支持 **Operation操作****Record记录** 两种服务的查询和验证。使用单一连接池,两种服务共享同一组 gRPC 连接。
#### 2.1 创建 QueryClient
`QueryClient` 是统一的查询客户端,同时支持 **Operation操作****Record记录** 两种服务的查询和验证。使用单一连接池,两种服务共享同一组 gRPC 连接。
#### 2.1 创建 QueryClient
##### 单服务器模式
```go
import (
"go.yandata.net/iod/iod/go-trustlog/api/queryclient"
"go.yandata.net/iod/iod/go-trustlog/api/logger"
"github.com/go-logr/logr"
)
// 创建 Logger
myLogger := logger.NewLogger(logr.Discard())
// 创建统一查询客户端(单服务器)
queryClient, err := queryclient.NewClient(
queryclient.ClientConfig{
ServerAddr: "localhost:50051",
},
myLogger,
)
if err != nil {
panic(err)
}
defer queryClient.Close()
```
##### 多服务器负载均衡模式
```go
// 创建查询客户端(多服务器,自动轮询负载均衡)
queryClient, err := queryclient.NewClient(
queryclient.ClientConfig{
ServerAddrs: []string{
"server1:50051",
"server2:50051",
"server3:50051",
},
// DialOptions: []grpc.DialOption{...}, // 可选:自定义 gRPC 连接选项
},
myLogger,
)
if err != nil {
panic(err)
}
defer queryClient.Close()
```
#### 2.2 查询操作列表
```go
import (
"context"
"time"
)
ctx := context.Background()
// 构造查询请求
req := queryclient.ListRequest{
PageSize: 100, // 每页数量
PreTime: time.Now().Add(-24 * time.Hour), // 游标分页(可选)
// 可选过滤条件
OpSource: "DOIP", // 按操作来源过滤(字符串)
OpType: "Create", // 按操作类型过滤(字符串)
DoPrefix: "10.1000", // 按数据前缀过滤
DoRepository: "my-repo", // 按仓库过滤
}
// 执行查询
resp, err := queryClient.ListOperations(ctx, req)
if err != nil {
panic(err)
}
// 处理结果
fmt.Printf("Total count: %d\n", resp.Count)
for _, op := range resp.Data {
fmt.Printf("Operation ID: %s, Type: %s, Time: %s\n",
op.OpID, op.OpType, op.Timestamp)
}
```
#### 2.3 取证验证(流式)
```go
// 构造验证请求
validationReq := queryclient.ValidateRequest{
Timestamp: time.Now().Add(-1 * time.Hour),
OpID: "operation-id-123",
OpType: "Create",
DoRepository: "my-repo",
}
// 异步验证(流式接收进度)
resultChan, err := queryClient.ValidateOperation(ctx, validationReq)
if err != nil {
panic(err)
}
// 处理流式结果
for result := range resultChan {
if result.IsProcessing() {
fmt.Printf("Progress: %s - %s\n", result.Progress, result.Msg)
} else if result.IsCompleted() {
fmt.Println("Validation completed successfully!")
if result.Data != nil {
fmt.Printf("Operation: %+v\n", result.Data)
}
} else if result.IsFailed() {
fmt.Printf("Validation failed: %s\n", result.Msg)
}
}
```
#### 2.4 取证验证(同步)
```go
// 同步验证(阻塞直到完成)
finalResult, err := queryClient.ValidateOperationSync(
ctx,
validationReq,
func(progress *model.ValidationResult) {
// 可选的进度回调
fmt.Printf("Progress: %s\n", progress.Progress)
},
)
if err != nil {
panic(err)
}
if finalResult.IsCompleted() {
fmt.Println("Validation successful!")
} else {
fmt.Printf("Validation failed: %s\n", finalResult.Msg)
}
```
#### 2.5 查询记录列表Record
```go
// 构造记录查询请求
recordReq := queryclient.ListRequest{
PageSize: 50, // 每页数量
PreTime: time.Now().Add(-24 * time.Hour), // 游标分页(可选)
// 可选过滤条件
DoPrefix: "10.1000", // 按数据前缀过滤
RCType: "log", // 按记录类型过滤
}
// 执行查询
recordResp, err := queryClient.ListRecords(ctx, recordReq)
if err != nil {
panic(err)
}
// 处理结果
fmt.Printf("Total records: %d\n", recordResp.Count)
for _, rec := range recordResp.Data {
fmt.Printf("Record ID: %s, Type: %s, Hash: %s\n",
rec.ID, rec.RCType, rec.RCHash)
}
```
#### 2.6 记录验证(流式)
```go
// 构造记录验证请求
recordValidationReq := queryclient.ValidateRequest{
Timestamp: time.Now().Add(-1 * time.Hour),
RecordID: "record-id-123",
DoPrefix: "10.1000",
RCType: "log",
}
// 异步验证(流式接收进度)
recordResultChan, err := queryClient.ValidateRecord(ctx, recordValidationReq)
if err != nil {
panic(err)
}
// 处理流式结果
for result := range recordResultChan {
if result.IsProcessing() {
fmt.Printf("Progress: %s - %s\n", result.Progress, result.Msg)
} else if result.IsCompleted() {
fmt.Println("Record validation completed!")
if result.Data != nil {
fmt.Printf("Record: %+v\n", result.Data)
}
} else if result.IsFailed() {
fmt.Printf("Record validation failed: %s\n", result.Msg)
}
}
```
#### 2.7 记录验证(同步)
```go
// 同步验证(阻塞直到完成)
finalRecordResult, err := queryClient.ValidateRecordSync(
ctx,
recordValidationReq,
func(progress *model.RecordValidationResult) {
// 可选的进度回调
fmt.Printf("Progress: %s\n", progress.Progress)
},
)
if err != nil {
panic(err)
}
if finalRecordResult.IsCompleted() {
fmt.Println("Record validation successful!")
} else {
fmt.Printf("Record validation failed: %s\n", finalRecordResult.Msg)
}
```
#### 2.8 获取底层 gRPC 客户端
```go
// 高级用户可以直接访问 gRPC 客户端进行自定义操作
// 获取 Operation 服务客户端
opGrpcClient := queryClient.GetLowLevelOperationClient()
// 获取 Record 服务客户端
recGrpcClient := queryClient.GetLowLevelRecordClient()
// 注意:多服务器模式下,每次调用会返回轮询的下一个客户端
```
---
### 3. Persistence 使用(数据库持久化)⭐ 新增
**Persistence 模块**提供完整的数据库持久化支持,实现 **Cursor + Retry 双层架构**,保证异步最终一致性。
#### 3.1 快速开始
```go
import (
"context"
"time"
"go.yandata.net/iod/iod/go-trustlog/api/persistence"
"go.yandata.net/iod/iod/go-trustlog/api/adapter"
"go.yandata.net/iod/iod/go-trustlog/api/model"
"go.yandata.net/iod/iod/go-trustlog/api/logger"
"github.com/go-logr/logr"
)
func main() {
ctx := context.Background()
// 1. 创建 Logger
myLogger := logger.NewLogger(logr.Discard())
// 2. 创建 Pulsar Publisher
publisher, err := adapter.NewPublisher(
adapter.PublisherConfig{
URL: "pulsar://localhost:6650",
},
myLogger,
)
if err != nil {
panic(err)
}
defer publisher.Close()
// 3. 准备 SM2 密钥和 Envelope 配置(用于签名)
privateKeyHex := []byte("私钥D的十六进制字符串")
publicKeyHex := []byte("04 + x坐标 + y坐标的十六进制字符串")
envelopeConfig := model.DefaultEnvelopeConfig(privateKeyHex, publicKeyHex)
// 4. 创建 Persistence Client
client, err := persistence.NewPersistenceClient(ctx, persistence.PersistenceClientConfig{
Publisher: publisher, // Pulsar Publisher
Logger: myLogger,
EnvelopeConfig: envelopeConfig, // ⭐ SM2 签名配置
DBConfig: persistence.DBConfig{
DriverName: "postgres",
DSN: "postgres://user:pass@localhost:5432/trustlog?sslmode=disable",
MaxOpenConns: 20,
MaxIdleConns: 10,
ConnMaxLifetime: time.Hour,
},
PersistenceConfig: persistence.PersistenceConfig{
Strategy: persistence.StrategyDBAndTrustlog, // 既落库又存证
},
// 启用 Cursor 工作器(推荐)
EnableCursorWorker: true,
CursorWorkerConfig: &persistence.CursorWorkerConfig{
ScanInterval: 10 * time.Second, // 10秒扫描一次
BatchSize: 100, // 每批处理100条
MaxRetryAttempt: 1, // Cursor阶段快速失败
},
// 启用 Retry 工作器(必需)
EnableRetryWorker: true,
RetryWorkerConfig: &persistence.RetryWorkerConfig{
RetryInterval: 30 * time.Second, // 30秒重试一次
MaxRetryCount: 5, // 最多重试5次
},
})
if err != nil {
panic(err)
}
defer client.Close()
// 5. 发布操作(立即返回,异步存证)
clientIP := "192.168.1.100"
serverIP := "10.0.0.1"
op := &model.Operation{
OpID: "op-001",
OpType: string(model.OpTypeCreate), // 字符串类型
Doid: "10.1000/repo/obj",
ProducerID: "producer-001",
OpSource: model.OpSourceDOIP,
DoPrefix: "10.1000",
DoRepository: "repo",
OpActor: "user-123",
Timestamp: time.Now(),
ClientIP: &clientIP, // 可空
ServerIP: &serverIP, // 可空
}
if err := client.OperationPublish(ctx, op); err != nil {
panic(err)
}
// ✅ 落库成功CursorWorker 会自动异步存证(带签名)
println("操作已保存,正在异步存证...")
}
```
#### 3.2 三种持久化策略
| 策略 | 说明 | 是否签名 | 适用场景 |
|------|------|---------|----------|
| **StrategyDBOnly** | 仅落库,不存证 | ❌ 不签名 | 历史数据存档、审计日志 |
| **StrategyDBAndTrustlog** | 既落库又存证(异步) | ✅ **签名存证** | ⭐ 生产环境推荐 |
| **StrategyTrustlogOnly** | 仅存证,不落库 | ✅ **签名存证** | 轻量级场景 |
**重要说明**
- 所有存证操作(`StrategyDBAndTrustlog``StrategyTrustlogOnly`)都会使用 `EnvelopeConfig` 进行 **SM2 签名**
- `StrategyDBOnly` 仅保存到数据库,不会进行签名和存证
- 创建 `PersistenceClient` 时**必须**提供 `EnvelopeConfig`(即使是 `StrategyDBOnly` 也建议提供,以便后续切换策略)
#### 3.3 Cursor + Retry 双层架构
```
应用调用
仅落库(立即返回)
CursorWorker第一道防线
├── 增量扫描 operation 表
├── 快速尝试存证(使用 Envelope 签名)✅
├── 成功 → 更新状态
└── 失败 → 加入 retry 表
RetryWorker第二道防线
├── 扫描 retry 表
├── 指数退避重试(使用 Envelope 签名)✅
├── 成功 → 删除 retry 记录
└── 失败 → 标记死信
```
**优势**
- ✅ 充分利用 cursor 游标表作为任务发现队列
- ✅ 双层保障确保最终一致性
- ✅ 性能优秀,扩展性强
- ✅ 监控清晰,易于维护
-**所有存证操作都经过 SM2 签名验证**
#### 3.4 数据库表设计
**operation 表**(必需):
- 存储所有操作记录
- `trustlog_status` 字段标记存证状态
- `client_ip`, `server_ip` 可空字段(仅落库)
**trustlog_cursor 表**(核心):
- Key-Value 模式,支持多游标
- 使用时间戳作为游标值
- 作为任务发现队列
**trustlog_retry 表**(必需):
- 存储失败的重试记录
- 支持指数退避
- 死信队列
#### 3.5 监控和查询
```go
// 查询未存证记录数
var count int
db.QueryRow(`
SELECT COUNT(*)
FROM operation
WHERE trustlog_status = 'NOT_TRUSTLOGGED'
`).Scan(&count)
// 查询重试队列长度
db.QueryRow(`
SELECT COUNT(*)
FROM trustlog_retry
WHERE retry_status IN ('PENDING', 'RETRYING')
`).Scan(&count)
// 查询死信记录
rows, _ := db.Query(`
SELECT op_id, retry_count, error_message
FROM trustlog_retry
WHERE retry_status = 'DEAD_LETTER'
`)
```
#### 3.6 详细文档
- 📘 [Persistence 完整文档](api/persistence/README.md)
- 🚀 [快速开始指南](PERSISTENCE_QUICKSTART.md)
- 🏗️ [架构设计文档](api/persistence/ARCHITECTURE_V2.md)
- 💾 [SQL 脚本说明](api/persistence/sql/README.md)
---
### 4. Subscriber 使用(消息订阅)
> **注意**:通常业务代码不需要直接使用 Subscriber除非需要原始的 Watermill 消息处理。
```go
import (
"context"
"go.yandata.net/iod/iod/go-trustlog/api/adapter"
"go.yandata.net/iod/iod/go-trustlog/api/model"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/bytedance/sonic"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/go-logr/logr"
"go.yandata.net/iod/iod/go-trustlog/api/logger"
)
// 创建 Logger
myLogger := logger.NewLogger(logr.Discard())
// 创建订阅者
sub, err := adapter.NewSubscriber(
adapter.SubscriberConfig{
URL: "pulsar://localhost:6650",
SubscriberType: pulsar.KeyShared, // 必须使用 KeyShared 模式
},
myLogger,
)
if err != nil {
panic(err)
}
defer sub.Close()
// 订阅消息context 必须携带 key 为 "subName" 的 value
ctx := context.WithValue(context.Background(), "subName", "my-subscriber")
msgChan, err := sub.Subscribe(ctx, adapter.OperationTopic) // 或者 adapter.RecordTopic
if err != nil {
panic(err)
}
// 处理消息
for msg := range msgChan {
var op model.Operation
if err := sonic.Unmarshal(msg.Payload, &op); err != nil {
myLogger.ErrorContext(ctx, "Invalid Operation message", "error", err)
msg.Nack()
continue
}
// 处理业务逻辑
myLogger.InfoContext(ctx, "Received Operation", "key", op.Key())
// 根据业务成功与否 ack / nack
msg.Ack()
}
```
---
### 5. Forwarder 事务性发布SQL持久化
使用 Watermill Forwarder 可以将消息先持久化到 SQL 数据库,然后异步发送到 Pulsar保证消息的事务性和可靠性。
这在需要确保消息不丢失的场景下非常有用。
```go
import (
"database/sql"
"github.com/ThreeDotsLabs/watermill/components/forwarder"
"github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
"github.com/go-logr/logr"
"go.yandata.net/iod/iod/go-trustlog/api/adapter"
"go.yandata.net/iod/iod/go-trustlog/api/highclient"
"go.yandata.net/iod/iod/go-trustlog/api/logger"
)
// 0. 创建 Logger
myLogger := logger.NewLogger(logr.Discard())
// 1. 创建 SQL Publisher用于持久化
db, err := sql.Open("postgres", "postgres://user:pass@localhost/db")
if err != nil {
panic(err)
}
sqlPublisher, err := watermillsql.NewPublisher(
db,
watermillsql.PublisherConfig{
SchemaAdapter: watermillsql.DefaultPostgreSQLSchema{},
},
myLogger,
)
if err != nil {
panic(err)
}
// 2. 创建 Pulsar Publisher实际发送
pulsarPublisher, err := adapter.NewPublisher(
adapter.PublisherConfig{URL: "pulsar://localhost:6650"},
myLogger,
)
if err != nil {
panic(err)
}
// 3. 创建 ForwarderSQL -> Pulsar
// 消息先写入 SQL事务提交后异步转发到 Pulsar
fwd, err := forwarder.NewForwarder(sqlPublisher, pulsarPublisher)
if err != nil {
panic(err)
}
// 4. 使用 Forwarder 创建客户端
// 发布的消息会先存储到 SQL保证事务性
client := highclient.NewClient(fwd, myLogger)
defer client.Close()
// 5. 在数据库事务中发布消息
tx, _ := db.Begin()
// ... 执行业务数据库操作 ...
// 发布 Operation会在同一个事务中写入
_ = client.OperationPublish(op)
// 提交事务(业务数据和消息同时提交)
tx.Commit()
```
> **优势**
> - ✅ 消息与业务数据在同一事务中,保证强一致性
> - ✅ 即使 Pulsar 暂时不可用,消息也不会丢失
> - ✅ Forwarder 会自动重试发送失败的消息
---
## 🎨 完整示例
### 发布 + 查询 + 验证完整流程
```go
package main
import (
"context"
"fmt"
"time"
"github.com/go-logr/logr"
"go.yandata.net/iod/iod/go-trustlog/api/adapter"
"go.yandata.net/iod/iod/go-trustlog/api/highclient"
"go.yandata.net/iod/iod/go-trustlog/api/logger"
"go.yandata.net/iod/iod/go-trustlog/api/queryclient"
"go.yandata.net/iod/iod/go-trustlog/api/model"
)
func main() {
ctx := context.Background()
// 0. 创建 Logger
myLogger := logger.NewLogger(logr.Discard())
// 1. 创建并发送 Operation
pub, _ := adapter.NewPublisher(
adapter.PublisherConfig{URL: "pulsar://localhost:6650"},
myLogger,
)
defer pub.Close()
// 准备SM2密钥
privateKeyHex := []byte("私钥D的十六进制字符串")
publicKeyHex := []byte("04 + x坐标 + y坐标的十六进制字符串")
envelopeConfig := model.DefaultEnvelopeConfig(privateKeyHex, publicKeyHex)
client := highclient.NewClient(pub, myLogger, envelopeConfig)
defer client.Close()
dataID := model.DataID{
DoPrefix: "10.1000",
DoRepository: "test-repo",
Doid: "10.1000/test-repo/doc001",
}
op, _ := model.NewFullOperation(
model.OpSourceDOIP,
string(model.OpTypeCreate), // 字符串类型
"10.1000", // doPrefix
"test-repo", // doRepository
"10.1000/test-repo/doc001", // doid
"producer-001", // producerID
"admin", // opActor
[]byte(`{"action":"create"}`), // requestBody
[]byte(`{"status":"success"}`), // responseBody
time.Now(), // timestamp
)
_ = client.OperationPublish(op)
fmt.Printf("Published operation: %s\n", op.Meta.OpID)
// 等待一段时间让消息被处理
time.Sleep(2 * time.Second)
// 2. 查询操作列表
queryClient, _ := queryclient.NewClient(
queryclient.ClientConfig{ServerAddr: "localhost:50051"},
myLogger,
)
defer queryClient.Close()
listResp, _ := queryClient.ListOperations(ctx, queryclient.ListRequest{
PageSize: 10,
DoRepository: "test-repo",
})
fmt.Printf("Found %d operations\n", listResp.Count)
// 3. 执行取证验证
if len(listResp.Data) > 0 {
firstOp := listResp.Data[0]
validationReq := queryclient.ValidateRequest{
Timestamp: firstOp.Timestamp,
OpID: firstOp.OpID,
OpType: firstOp.OpType, // 已经是字符串
DoRepository: firstOp.DoRepository,
}
result, _ := queryClient.ValidateOperationSync(ctx, validationReq, nil)
if result.IsCompleted() {
fmt.Println("✅ Validation passed!")
} else {
fmt.Printf("❌ Validation failed: %s\n", result.Msg)
}
}
}
```
---
## 📚 操作类型枚举
### DOIP 操作类型7种
```go
// 使用时需要转换为字符串
string(model.OpTypeHello) // "Hello"
string(model.OpTypeRetrieve) // "Retrieve"
string(model.OpTypeCreate) // "Create"
string(model.OpTypeDelete) // "Delete"
string(model.OpTypeUpdate) // "Update"
string(model.OpTypeSearch) // "Search"
string(model.OpTypeListOperations) // "ListOperations"
```
### IRP 操作类型33种
```go
// 使用时需要转换为字符串,例如:
string(model.OpTypeOCCreateHandle) // "OC_CREATE_HANDLE"
string(model.OpTypeOCDeleteHandle) // "OC_DELETE_HANDLE"
// Handle 基础操作
model.OpTypeOCReserved, model.OpTypeOCResolution, model.OpTypeOCGetSiteInfo
model.OpTypeOCCreateHandle, model.OpTypeOCDeleteHandle, model.OpTypeOCAddValue
model.OpTypeOCRemoveValue, model.OpTypeOCModifyValue, model.OpTypeOCListHandle
model.OpTypeOCListNA
// DOID 操作
model.OpTypeOCResolutionDOID, model.OpTypeOCCreateDOID, model.OpTypeOCDeleteDOID
model.OpTypeOCUpdateDOID, model.OpTypeOCBatchCreateDOID, model.OpTypeOCResolutionDOIDRecursive
// 用户与仓库
model.OpTypeOCGetUsers, model.OpTypeOCGetRepos
// GRS/IRS 管理
model.OpTypeOCVerifyIRS, model.OpTypeOCResolveGRS, model.OpTypeOCCreateOrgGRS
model.OpTypeOCUpdateOrgGRS, model.OpTypeOCDeleteOrgGRS, model.OpTypeOCSyncOrgIRSParent
model.OpTypeOCUpdateOrgIRSParent, model.OpTypeOCDeleteOrgIRSParent
// 安全与会话
model.OpTypeOCChallengeResponse, model.OpTypeOCVerifyChallenge, model.OpTypeOCSessionSetup
model.OpTypeOCSessionTerminate, model.OpTypeOCSessionExchangeKey, model.OpTypeOCVerifyRouter
model.OpTypeOCQueryRouter
```
---
## ⚠️ 注意事项
1. **私有仓库配置**
必须先配置 Git SSH 映射和 GOPRIVATE 环境变量,否则无法正常安装 SDK。
2. **日志接口**
SDK 使用 [logr](https://github.com/go-logr/logr) 作为日志接口。你需要:
- 创建一个 `logr.Logger` 实例(可以使用 zap、logrus 等实现)
- 通过 `logger.NewLogger(logrLogger)` 包装成 SDK 的 Logger 接口
- 在生产环境建议使用 `zapr``logrusr` 等实现,测试环境可以使用 `logr.Discard()`
3. **HighClient 方法名**
- 发送 Operation 使用 `client.OperationPublish(op)`,参数为指针类型 `*model.Operation`
- 发送 Record 使用 `client.RecordPublish(record)`,参数为指针类型 `*model.Record`
4. **固定主题**
- Operation 主题:`persistent://public/default/operation`
- Record 主题:`persistent://public/default/record`
5. **KeyShared 消费模式**
由于 Trustlog 使用 Key Shared 消费模式,其他订阅者必须选择 KeyShared 并避免消费者重名。
6. **ack/nack 必须处理**
确保订阅方根据业务逻辑确认或拒绝消息。
7. **时间戳处理**
`NewFullOperation()` 接受 `time.Time` 类型的时间戳参数。
8. **统一连接池**
QueryClient 使用单一连接池同时支持 Operation 和 Record 两种服务,共享 gRPC 连接资源,提高资源利用率。
9. **负载均衡**
支持多服务器轮询负载均衡,自动分发请求到不同服务器,连接在两种服务间共享。
10. **流式验证**
取证验证Operation 和 Record都支持流式和同步两种模式流式模式可实时获取进度。
11. **事务性发布**
使用 Watermill Forwarder 可以将消息持久化到 SQL与业务数据在同一事务中提交保证强一致性。
12. **Record 支持**
除了 OperationSDK 现在也支持 Record 类型的发布、查询和验证,两种服务使用同一个 QueryClient。
13. **数据库持久化** ⭐ 新增
完整的数据库持久化支持Cursor + Retry 双层架构,保证异步最终一致性,支持 PostgreSQL、MySQL、SQLite。
---
## 🔄 架构图
### 直接发布架构
```
[业务服务]
[HighClient.Publish()]
[Pulsar Publisher] --(Operation JSON)--> [Pulsar Topic]
[Subscriber]
[其他服务]
```
### 事务性发布架构(使用 Forwarder
```
[业务服务 + DB事务]
[HighClient.Publish()]
[SQL Publisher] --写入--> [PostgreSQL/MySQL]
↓ ↓
[Forwarder 后台轮询] |
↓ |
[读取未发送消息] <--------------┘
[Pulsar Publisher] --(Operation JSON)--> [Pulsar Topic]
↓ ↓
[标记为已发送] [Subscriber]
[其他服务]
```
### 查询架构(统一连接池)
```
[业务服务]
[QueryClient - 单一连接池]
├─ Operation 服务客户端 ─┐
└─ Record 服务客户端 ────┤
↓ (共享 gRPC 连接,轮询负载均衡)
[Server 1] ─┐
[Server 2] ─┼─ 多服务器
[Server 3] ─┘
[存储层]
优势:
- 单一连接池,资源高效利用
- Operation 和 Record 服务共享连接
- 自动负载均衡,请求分发到不同服务器
- 减少连接数,降低服务器压力
```
### 持久化架构Cursor + Retry 双层模式)⭐ 新增
```
[应用调用 OperationPublish()]
[保存到 operation 表状态NOT_TRUSTLOGGED]
[立即返回成功]
[异步处理开始]
[CursorWorker每10秒]
├── 增量扫描 operation 表
├── 尝试发送到存证系统Envelope 签名)✅
├── 成功 → 更新状态为 TRUSTLOGGED
└── 失败 → 加入 trustlog_retry 表
[RetryWorker每30秒]
├── 扫描 trustlog_retry 表
├── 指数退避重试Envelope 签名)✅
├── 成功 → 删除 retry 记录
└── 失败 → 标记为 DEAD_LETTER
优势:
- ✅ 充分利用 cursor 游标表作为任务发现队列
- ✅ 双层保障确保最终一致性
- ✅ 性能优秀(增量扫描 + 索引查询)
- ✅ 易于监控和运维
- ✅ 所有存证操作都经过 SM2 签名验证
```
---
## 📚 相关文档
### 核心文档
- 📘 [Persistence 完整文档](api/persistence/README.md) - 数据库持久化详细说明
- 🚀 [快速开始指南](PERSISTENCE_QUICKSTART.md) - 5分钟上手教程
- 🏗️ [架构设计文档](api/persistence/ARCHITECTURE_V2.md) - Cursor + Retry 双层架构
- 💾 [SQL 脚本说明](api/persistence/sql/README.md) - 数据库脚本文档
- ✅ [修复记录](FIXES_COMPLETED.md) - 问题修复历史
### 测试状态
-**49/49** 单元测试通过
- ✅ 代码覆盖率: **28.5%**
- ✅ 支持数据库: PostgreSQL, MySQL, SQLite
---
## 📝 版本信息
- **当前版本**: v2.1.0
- **Go 版本要求**: 1.21+
- **最后更新**: 2025-12-23
---