- go.yandata.net/iod/iod/go-trustlog → go.yandata.net/wangsiyuan/go-trustlog - 更新 go.mod module声明 - 更新 README.md 安装说明 - 批量更新所有 .go 文件中的 import 路径 - 61个文件受影响 这样go-trustlog可以作为独立SDK使用
Go-Trustlog SDK
基于 Watermill 和 gRPC 的可信日志SDK,提供操作记录发布、查询验证和数据库持久化功能。
✨ 核心特性
📦 双数据模型
- Operation(操作记录):完整业务操作,支持取证验证
- Record(简单记录):轻量级事件日志
💾 数据库持久化
- 三种策略:仅落库、既落库又存证、仅存证
- 异步机制:Cursor + Retry 双层架构保证最终一致性
- 多数据库:PostgreSQL、MySQL、SQLite
- 可靠重试:指数退避 + 死信队列
🔄 消息发布
- 直接发布:Pulsar Publisher 实时发送
- 事务发布:Watermill Forwarder 保证事务性
🔍 查询验证
- 统一客户端:单一连接池支持 Operation 和 Record 查询
- 流式验证:实时获取取证进度
- 负载均衡:多服务器轮询
🚀 快速开始
安装
# 配置私有仓库(如果使用私有Git)
git config --global url."git@gitea.internetapi.cn:".insteadOf "https://gitea.internetapi.cn"
go env -w GOPRIVATE="go.yandata.net"
# 安装SDK
go get go.yandata.net/wangsiyuan/go-trustlog
1. 基本使用 - 发布操作
package main
import (
"context"
"time"
"go.yandata.net/wangsiyuan/go-trustlog/api/highclient"
"go.yandata.net/wangsiyuan/go-trustlog/api/model"
)
func main() {
// 创建客户端
client, err := highclient.NewHighClient(highclient.HighClientConfig{
PulsarURL: "pulsar://localhost:6650",
})
if err != nil {
panic(err)
}
defer client.Close()
// 创建操作记录
op, err := model.NewFullOperation(
model.OpSourceDOIP, // 操作来源
model.OpCodeCreateID, // 操作代码: 100 (创建标识符)
"10.1000", // DO前缀
"my-repo", // 仓库名
"10.1000/my-repo/object-001", // 完整DOID
"producer-001", // 生产者ID
"user@example.com", // 操作者
map[string]string{"action": "create"}, // 请求体
map[string]string{"status": "ok"}, // 响应体
time.Now(),
)
if err != nil {
panic(err)
}
// 发布操作
ctx := context.Background()
if err := client.OperationPublish(ctx, op); err != nil {
panic(err)
}
}
2. 数据库持久化 - 三种策略
import (
"database/sql"
"go.yandata.net/wangsiyuan/go-trustlog/api/persistence"
_ "github.com/lib/pq"
)
func main() {
// 连接数据库
db, err := sql.Open("postgres",
"host=localhost port=5432 user=postgres password=postgres dbname=trustlog sslmode=disable")
if err != nil {
panic(err)
}
defer db.Close()
// 初始化表结构
if err := persistence.InitDB(db, "postgres"); err != nil {
panic(err)
}
// 策略1: 仅落库(不存证)
clientDBOnly, err := persistence.NewPersistenceClient(persistence.PersistenceConfig{
Strategy: persistence.StrategyDBOnly,
DB: db,
}, log)
// 策略2: 既落库又存证(推荐)
clientBoth, err := persistence.NewPersistenceClient(persistence.PersistenceConfig{
Strategy: persistence.StrategyDBAndTrustlog,
DB: db,
PulsarURL: "pulsar://localhost:6650",
}, log)
// 策略3: 仅存证(不落库)
clientTrustlogOnly, err := persistence.NewPersistenceClient(persistence.PersistenceConfig{
Strategy: persistence.StrategyTrustlogOnly,
PulsarURL: "pulsar://localhost:6650",
}, log)
// 保存操作(自动根据策略处理)
if err := clientBoth.SaveOperation(context.Background(), op); err != nil {
panic(err)
}
}
3. 查询操作
import (
"go.yandata.net/wangsiyuan/go-trustlog/api/queryclient"
)
func main() {
// 创建查询客户端
queryClient, err := queryclient.NewQueryClient(queryclient.ClientConfig{
Servers: []string{"localhost:9090"},
})
if err != nil {
panic(err)
}
defer queryClient.Close()
// 列表查询
resp, err := queryClient.ListOperations(context.Background(), queryclient.ListOperationsRequest{
OpSource: model.OpSourceDOIP,
OpCode: model.OpCodeCreateID,
PageSize: 10,
PageNumber: 1,
})
if err != nil {
panic(err)
}
for _, op := range resp.Data {
fmt.Printf("Operation: %s, DOID: %s\n", op.OpId, op.Doid)
}
// 验证操作
validResp, err := queryClient.ValidateOperation(context.Background(), queryclient.ValidationRequest{
OpID: "op-001",
OpSource: model.OpSourceDOIP,
OpCode: model.OpCodeCreateID,
DoRepository: "my-repo",
})
if err != nil {
panic(err)
}
fmt.Printf("Valid: %v\n", validResp.Valid)
}
4. 数据库查询(持久化后)
import "go.yandata.net/wangsiyuan/go-trustlog/api/persistence"
func main() {
repo := persistence.NewOperationRepository(db, log)
// 按条件查询
result, err := repo.Query(context.Background(), &persistence.OperationQueryRequest{
OpSource: &opSource,
OpCode: &opCode,
TrustlogStatus: &status,
StartTime: &startTime,
EndTime: &endTime,
PageSize: 10,
PageNumber: 1,
SortBy: "timestamp",
SortOrder: "DESC",
})
fmt.Printf("Total: %d operations\n", result.Total)
for i, op := range result.Operations {
fmt.Printf("[%d] %s - Status: %s\n", i+1, op.OpID, result.Statuses[i])
}
}
📊 操作代码 (OpCode)
OpCode 使用 int32 类型,基于 DOIP/IRP 协议标准:
| OpCode | 名称 | 说明 |
|---|---|---|
| 0 | OpCodeReserved | 保留 |
| 1 | OpCodeResolution | 标识符查询 |
| 2 | OpCodeGetSiteInfo | 获取HS_SITE元素 |
| 100 | OpCodeCreateID | 创建新标识符 |
| 101 | OpCodeDeleteID | 删除标识符 |
| 102 | OpCodeAddElement | 添加元素 |
| 103 | OpCodeRemoveElement | 删除元素 |
| 104 | OpCodeModifyElement | 修改元素 |
| 105 | OpCodeListIDs | 列出标识符 |
| 106 | OpCodeListDerivedPrefixes | 列出派生前缀 |
| 200 | OpCodeChallengeResponse | 挑战响应 |
| 201 | OpCodeVerifyResponse | 验证挑战响应 |
| 300 | OpCodeHomePrefix | Home prefix |
| 301 | OpCodeUnhomePrefix | Unhome prefix |
| 302 | OpCodeListHomedPrefixes | 列出homed前缀 |
| 400 | OpCodeSessionSetup | 会话建立 |
| 401 | OpCodeSessionTerminate | 会话终止 |
| 500 | OpCodeQueryIDs | 查询DOIDs |
| 501 | OpCodeRenameID | 重命名DOID |
| 502 | OpCodeResolveAltID | 解析替代标识符 |
| 503 | OpCodeRegisterAltID | 注册替代标识符 |
使用示例:
op, err := model.NewFullOperation(
model.OpSourceDOIP,
model.OpCodeCreateID, // 使用常量,类型安全
// ... 其他参数
)
🏗️ 架构设计
持久化策略架构
┌─────────────────────────────────────────────────────────────┐
│ Application │
└───────────────────┬─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ PersistenceClient │
│ (根据策略选择:DB Only / DB+Trustlog / Trustlog Only) │
└───────────┬──────────────────────────┬──────────────────────┘
│ │
▼ ▼
┌───────────────────────┐ ┌───────────────────────────┐
│ Database (SQL) │ │ Pulsar Publisher │
│ ┌─────────────────┐ │ │ (operation topic) │
│ │ operation │ │ └───────────────────────────┘
│ │ (主表) │ │
│ └─────────────────┘ │
│ ┌─────────────────┐ │
│ │ trustlog_cursor │ │ ← Cursor Worker 扫描
│ │ (游标表) │ │ (异步存证未完成记录)
│ └─────────────────┘ │
│ ┌─────────────────┐ │
│ │ trustlog_retry │ │ ← Retry Worker 重试
│ │ (重试表) │ │ (失败记录指数退避)
│ └─────────────────┘ │
└───────────────────────┘
数据库表结构
operation 表 (主表)
CREATE TABLE operation (
op_id VARCHAR(32) PRIMARY KEY,
op_actor VARCHAR(64),
doid VARCHAR(512),
producer_id VARCHAR(32),
request_body_hash VARCHAR(128),
response_body_hash VARCHAR(128),
sign VARCHAR(512),
op_source VARCHAR(10),
op_code INTEGER, -- 操作代码
do_prefix VARCHAR(128),
do_repository VARCHAR(64),
client_ip VARCHAR(32), -- 客户端IP(仅落库)
server_ip VARCHAR(32), -- 服务端IP(仅落库)
trustlog_status VARCHAR(32), -- NOT_TRUSTLOGGED / TRUSTLOGGED
timestamp TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
trustlog_cursor 表 (游标表)
CREATE TABLE trustlog_cursor (
cursor_key VARCHAR(64) PRIMARY KEY,
cursor_value VARCHAR(128),
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
trustlog_retry 表 (重试表)
CREATE TABLE trustlog_retry (
op_id VARCHAR(32) PRIMARY KEY,
retry_count INTEGER DEFAULT 0,
retry_status VARCHAR(32), -- PENDING / RETRYING / DEAD_LETTER
next_retry_at TIMESTAMP,
error_message TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
🔧 配置说明
PersistenceConfig
type PersistenceConfig struct {
Strategy PersistenceStrategy // 持久化策略
DB *sql.DB // 数据库连接
PulsarURL string // Pulsar地址
// Cursor Worker配置(可选)
CursorWorkerInterval time.Duration // 扫描间隔,默认10s
CursorBatchSize int // 批处理大小,默认100
// Retry Worker配置(可选)
RetryWorkerInterval time.Duration // 重试间隔,默认30s
RetryMaxAttempts int // 最大重试次数,默认5
RetryBaseDelay time.Duration // 基础延迟,默认1分钟
}
策略选择指南
| 策略 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| StrategyDBOnly | 内部审计、快速查询 | 高性能、本地存储 | 无区块链保障 |
| StrategyDBAndTrustlog | 生产环境(推荐) | 可查询 + 可验证 | 需要数据库维护 |
| StrategyTrustlogOnly | 纯粹存证、简单场景 | 无状态、易部署 | 查询需通过gRPC |
⚙️ 高级特性
1. 集群安全(并发控制)
使用 PostgreSQL 的 SELECT FOR UPDATE SKIP LOCKED 确保多实例场景下的并发安全:
// Cursor Worker 自动处理集群并发
// 多个实例同时扫表时,自动跳过已锁定记录
2. IP字段记录
// IP字段仅存储在数据库中,不参与存证哈希计算
op.ClientIP = &clientIP // 可选
op.ServerIP = &serverIP // 可选
3. 自定义签名
import "go.yandata.net/wangsiyuan/go-trustlog/api/sm2signer"
// 使用SM2算法签名
signer := sm2signer.NewSM2Signer(privateKeyBytes)
if err := client.OperationPublishWithSigner(ctx, op, signer); err != nil {
panic(err)
}
4. 异步发布
// 异步发布(不阻塞)
client.OperationPublishAsync(ctx, op)
📝 注意事项
1. OpCode 类型变更
从 v2.0 开始,OpCode 从 string 改为 int32:
// ❌ 旧版本
op.OpType = "Create"
// ✅ 新版本
op.OpCode = model.OpCodeCreateID // int32: 100
2. 数据库初始化
首次使用前必须初始化表结构:
if err := persistence.InitDB(db, "postgres"); err != nil {
panic(err)
}
3. Cursor Worker 启动
使用 StrategyDBAndTrustlog 策略时,Cursor Worker 自动启动:
client, _ := persistence.NewPersistenceClient(config, log)
defer client.Close() // 自动停止 Worker
4. 事务支持
SaveTx 支持在已有事务中保存:
tx, _ := db.Begin()
repo.SaveTx(ctx, tx, op, persistence.StatusNotTrustlogged)
tx.Commit()
🧪 测试
运行测试套件:
# 单元测试
go test ./api/... -short
# 完整测试(需要PostgreSQL和Pulsar)
go test ./api/... -timeout 5m
# 覆盖率报告
go test ./api/... -coverprofile=coverage.out -coverpkg=./api/...
go tool cover -html=coverage.out
📚 相关资源
Description
Languages
Go
99.4%
Makefile
0.6%