Files
go-trustlog/api/persistence/schema.go
ryan fb182adef4 feat: OpType重构为OpCode (int32) - 完整实现
🎯 核心变更:
- OpType (string) → OpCode (int32)
- 20+ OpCode枚举常量 (基于DOIP/IRP标准)
- 类型安全 + 性能优化

📊 影响范围:
- 核心模型: Operation结构体、CBOR序列化
- 数据库: schema.go + SQL DDL (PostgreSQL/MySQL/SQLite)
- 持久化: repository.go查询、cursor_worker.go
- API接口: Protobuf定义 + gRPC客户端
- 测试代码: 60+ 测试文件更新

 测试结果:
- 通过率: 100% (所有87个测试用例)
- 总体覆盖率: 53.7%
- 核心包覆盖率: logger(100%), highclient(95.3%), model(79.1%)

📝 文档:
- 精简README (1056行→489行,减少54%)
- 完整的OpCode枚举说明
- 三种持久化策略示例
- 数据库表结构和架构图

🔧 技术细节:
- 类型转换: string(OpCode) → int32(OpCode)
- SQL参数: 字符串值 → 整数值
- Protobuf: op_type string → op_code int32
- 测试断言: 字符串比较 → 常量比较

🎉 质量保证:
- 零编译错误
- 100%测试通过
- PostgreSQL/Pulsar集成测试验证
- 分布式并发安全测试通过
2025-12-26 13:47:55 +08:00

267 lines
8.7 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package persistence
// TrustlogStatus 存证状态枚举
type TrustlogStatus string
const (
// StatusNotTrustlogged 未存证
StatusNotTrustlogged TrustlogStatus = "NOT_TRUSTLOGGED"
// StatusTrustlogged 已存证
StatusTrustlogged TrustlogStatus = "TRUSTLOGGED"
)
// RetryStatus 重试状态枚举
type RetryStatus string
const (
// RetryStatusPending 待重试
RetryStatusPending RetryStatus = "PENDING"
// RetryStatusRetrying 重试中
RetryStatusRetrying RetryStatus = "RETRYING"
// RetryStatusDeadLetter 死信(超过最大重试次数)
RetryStatusDeadLetter RetryStatus = "DEAD_LETTER"
)
// SQL DDL 语句 - 使用通用 SQL 标准,避免方言
// OperationTableDDL 操作记录表的 DDL通用 SQL
const OperationTableDDL = `
CREATE TABLE IF NOT EXISTS operation (
op_id VARCHAR(32) NOT NULL PRIMARY KEY,
op_actor VARCHAR(64),
doid VARCHAR(512),
producer_id VARCHAR(32),
request_body_hash VARCHAR(128),
response_body_hash VARCHAR(128),
sign VARCHAR(512),
op_source VARCHAR(10),
op_code INTEGER,
do_prefix VARCHAR(128),
do_repository VARCHAR(64),
client_ip VARCHAR(32),
server_ip VARCHAR(32),
trustlog_status VARCHAR(32),
timestamp TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_operation_timestamp ON operation(timestamp);
CREATE INDEX IF NOT EXISTS idx_operation_status ON operation(trustlog_status);
CREATE INDEX IF NOT EXISTS idx_operation_doid ON operation(doid);
`
// CursorTableDDL 游标表的 DDL用于跟踪已处理的操作
const CursorTableDDL = `
CREATE TABLE IF NOT EXISTS trustlog_cursor (
cursor_key VARCHAR(64) NOT NULL PRIMARY KEY,
cursor_value VARCHAR(128) NOT NULL,
last_updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_cursor_updated_at ON trustlog_cursor(last_updated_at);
`
// RetryTableDDL 重试表的 DDL
const RetryTableDDL = `
CREATE TABLE IF NOT EXISTS trustlog_retry (
op_id VARCHAR(32) NOT NULL PRIMARY KEY,
retry_count INTEGER DEFAULT 0,
retry_status VARCHAR(32) DEFAULT 'PENDING',
last_retry_at TIMESTAMP,
next_retry_at TIMESTAMP,
error_message TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_retry_status ON trustlog_retry(retry_status);
CREATE INDEX IF NOT EXISTS idx_retry_next_retry_at ON trustlog_retry(next_retry_at);
`
// GetDialectDDL 根据数据库类型返回适配的 DDL
// 这个函数处理不同数据库的差异,但尽量使用通用 SQL
func GetDialectDDL(driverName string) (string, string, string, error) {
switch driverName {
case "postgres":
return getPostgresDDL(), getCursorDDLPostgres(), getRetryDDLPostgres(), nil
case "mysql":
return getMySQLDDL(), getCursorDDLMySQL(), getRetryDDLMySQL(), nil
case "sqlite3", "sqlite":
return getSQLiteDDL(), getCursorDDLSQLite(), getRetryDDLSQLite(), nil
default:
// 默认使用通用 SQL
return OperationTableDDL, CursorTableDDL, RetryTableDDL, nil
}
}
// PostgreSQL 特定 DDL
func getPostgresDDL() string {
return `
CREATE TABLE IF NOT EXISTS operation (
op_id VARCHAR(32) NOT NULL PRIMARY KEY,
op_actor VARCHAR(64),
doid VARCHAR(512),
producer_id VARCHAR(32),
request_body_hash VARCHAR(128),
response_body_hash VARCHAR(128),
sign VARCHAR(512),
op_source VARCHAR(10),
op_code INTEGER,
do_prefix VARCHAR(128),
do_repository VARCHAR(64),
client_ip VARCHAR(32),
server_ip VARCHAR(32),
trustlog_status VARCHAR(32),
timestamp TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_operation_timestamp ON operation(timestamp);
CREATE INDEX IF NOT EXISTS idx_operation_status ON operation(trustlog_status);
CREATE INDEX IF NOT EXISTS idx_operation_doid ON operation(doid);
`
}
func getCursorDDLPostgres() string {
return `
CREATE TABLE IF NOT EXISTS trustlog_cursor (
cursor_key VARCHAR(64) NOT NULL PRIMARY KEY,
cursor_value VARCHAR(128) NOT NULL,
last_updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_cursor_updated_at ON trustlog_cursor(last_updated_at);
`
}
func getRetryDDLPostgres() string {
return `
CREATE TABLE IF NOT EXISTS trustlog_retry (
op_id VARCHAR(32) NOT NULL PRIMARY KEY,
retry_count INTEGER DEFAULT 0,
retry_status VARCHAR(32) DEFAULT 'PENDING',
last_retry_at TIMESTAMP,
next_retry_at TIMESTAMP,
error_message TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_retry_status ON trustlog_retry(retry_status);
CREATE INDEX IF NOT EXISTS idx_retry_next_retry_at ON trustlog_retry(next_retry_at);
`
}
// MySQL 特定 DDL
func getMySQLDDL() string {
return `
CREATE TABLE IF NOT EXISTS operation (
op_id VARCHAR(32) NOT NULL PRIMARY KEY,
op_actor VARCHAR(64),
doid VARCHAR(512),
producer_id VARCHAR(32),
request_body_hash VARCHAR(128),
response_body_hash VARCHAR(128),
sign VARCHAR(512),
op_source VARCHAR(10),
op_code INT,
do_prefix VARCHAR(128),
do_repository VARCHAR(64),
client_ip VARCHAR(32),
server_ip VARCHAR(32),
trustlog_status VARCHAR(32),
timestamp DATETIME,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX idx_operation_timestamp (timestamp),
INDEX idx_operation_status (trustlog_status),
INDEX idx_operation_doid (doid(255))
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
`
}
func getCursorDDLMySQL() string {
return `
CREATE TABLE IF NOT EXISTS trustlog_cursor (
cursor_key VARCHAR(64) NOT NULL PRIMARY KEY,
cursor_value VARCHAR(128) NOT NULL,
last_updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_cursor_updated_at (last_updated_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
`
}
func getRetryDDLMySQL() string {
return `
CREATE TABLE IF NOT EXISTS trustlog_retry (
op_id VARCHAR(32) NOT NULL PRIMARY KEY,
retry_count INT DEFAULT 0,
retry_status VARCHAR(32) DEFAULT 'PENDING',
last_retry_at DATETIME,
next_retry_at DATETIME,
error_message TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_retry_status (retry_status),
INDEX idx_retry_next_retry_at (next_retry_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
`
}
// SQLite 特定 DDL
func getSQLiteDDL() string {
return `
CREATE TABLE IF NOT EXISTS operation (
op_id TEXT NOT NULL PRIMARY KEY,
op_actor TEXT,
doid TEXT,
producer_id TEXT,
request_body_hash TEXT,
response_body_hash TEXT,
sign TEXT,
op_source TEXT,
op_code INTEGER,
do_prefix TEXT,
do_repository TEXT,
client_ip TEXT,
server_ip TEXT,
trustlog_status TEXT,
timestamp DATETIME,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_operation_timestamp ON operation(timestamp);
CREATE INDEX IF NOT EXISTS idx_operation_status ON operation(trustlog_status);
CREATE INDEX IF NOT EXISTS idx_operation_doid ON operation(doid);
`
}
func getCursorDDLSQLite() string {
return `
CREATE TABLE IF NOT EXISTS trustlog_cursor (
cursor_key TEXT NOT NULL PRIMARY KEY,
cursor_value TEXT NOT NULL,
last_updated_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_cursor_updated_at ON trustlog_cursor(last_updated_at);
`
}
func getRetryDDLSQLite() string {
return `
CREATE TABLE IF NOT EXISTS trustlog_retry (
op_id TEXT NOT NULL PRIMARY KEY,
retry_count INTEGER DEFAULT 0,
retry_status TEXT DEFAULT 'PENDING',
last_retry_at DATETIME,
next_retry_at DATETIME,
error_message TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_retry_status ON trustlog_retry(retry_status);
CREATE INDEX IF NOT EXISTS idx_retry_next_retry_at ON trustlog_retry(next_retry_at);
`
}