diff --git a/api/persistence/cluster_safety_test.go b/api/persistence/cluster_safety_test.go index 8434a99..41aa381 100644 --- a/api/persistence/cluster_safety_test.go +++ b/api/persistence/cluster_safety_test.go @@ -77,7 +77,7 @@ func TestClusterSafety_MultipleCursorWorkers(t *testing.T) { `, opID, "cluster-tester", fmt.Sprintf("cluster/test/%d", i), "cluster-producer", "req-hash", "resp-hash", "op-hash", "signature", "DOIP", "CREATE", "cluster-test", "cluster-repo", "NOT_TRUSTLOGGED", time.Now()) - + if err != nil { t.Fatalf("Failed to create test data: %v", err) } @@ -88,14 +88,14 @@ func TestClusterSafety_MultipleCursorWorkers(t *testing.T) { workerCount := 3 var clients []*persistence.PersistenceClient var wg sync.WaitGroup - + // 统计变量 var processedCount int64 var duplicateCount int64 for i := 0; i < workerCount; i++ { workerID := i - + // 创建 Pulsar Publisher publisher, err := adapter.NewPublisher(adapter.PublisherConfig{ URL: e2eTestPulsarURL, @@ -138,21 +138,21 @@ func TestClusterSafety_MultipleCursorWorkers(t *testing.T) { } clientConfig := persistence.PersistenceClientConfig{ - Publisher: publisher, - Logger: log, - EnvelopeConfig: envelopeConfig, - DBConfig: dbConfig, - PersistenceConfig: persistenceConfig, - CursorWorkerConfig: cursorConfig, - EnableCursorWorker: true, - RetryWorkerConfig: retryConfig, - EnableRetryWorker: true, + Publisher: publisher, + Logger: log, + EnvelopeConfig: envelopeConfig, + DBConfig: dbConfig, + PersistenceConfig: persistenceConfig, + CursorWorkerConfig: cursorConfig, + EnableCursorWorker: true, + RetryWorkerConfig: retryConfig, + EnableRetryWorker: true, } client, err := persistence.NewPersistenceClient(ctx, clientConfig) require.NoError(t, err, "Failed to create PersistenceClient %d", workerID) clients = append(clients, client) - + t.Logf("✅ Worker %d started", workerID) } @@ -162,23 +162,23 @@ func TestClusterSafety_MultipleCursorWorkers(t *testing.T) { defer wg.Done() ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() - + maxWait := 30 * time.Second startTime := time.Now() - + for { select { case <-ticker.C: var trustloggedCount int db.QueryRow("SELECT COUNT(*) FROM operation WHERE op_id LIKE 'cluster-test-%' AND trustlog_status = 'TRUSTLOGGED'").Scan(&trustloggedCount) - + t.Logf("⏳ Progress: %d/%d operations trustlogged", trustloggedCount, operationCount) - + if trustloggedCount >= operationCount { t.Log("✅ All operations processed") return } - + if time.Since(startTime) > maxWait { t.Log("⚠️ Timeout waiting for processing") return @@ -296,7 +296,7 @@ func TestClusterSafety_ConcurrentStatusUpdate(t *testing.T) { // 使用 CAS 更新状态 opRepo := manager.GetOperationRepo() updated, err := opRepo.UpdateStatusWithCAS(ctx, nil, "concurrent-test", persistence.StatusNotTrustlogged, persistence.StatusTrustlogged) - + if err != nil { t.Logf("Error updating: %v", err) return @@ -334,4 +334,3 @@ func TestClusterSafety_ConcurrentStatusUpdate(t *testing.T) { t.Log("✅ CAS mechanism working correctly - Only one update succeeded") } - diff --git a/api/persistence/cursor_init_verification_test.go b/api/persistence/cursor_init_verification_test.go index 9f7af09..b3a6a1e 100644 --- a/api/persistence/cursor_init_verification_test.go +++ b/api/persistence/cursor_init_verification_test.go @@ -105,15 +105,15 @@ func TestCursorInitialization(t *testing.T) { } clientConfig := persistence.PersistenceClientConfig{ - Publisher: publisher, - Logger: log, - EnvelopeConfig: envelopeConfig, - DBConfig: dbConfig, - PersistenceConfig: persistenceConfig, - CursorWorkerConfig: cursorConfig, - EnableCursorWorker: true, - RetryWorkerConfig: retryConfig, - EnableRetryWorker: true, + Publisher: publisher, + Logger: log, + EnvelopeConfig: envelopeConfig, + DBConfig: dbConfig, + PersistenceConfig: persistenceConfig, + CursorWorkerConfig: cursorConfig, + EnableCursorWorker: true, + RetryWorkerConfig: retryConfig, + EnableRetryWorker: true, } client, err := persistence.NewPersistenceClient(ctx, clientConfig) @@ -204,15 +204,15 @@ func TestCursorInitialization(t *testing.T) { } clientConfig := persistence.PersistenceClientConfig{ - Publisher: publisher, - Logger: log, - EnvelopeConfig: envelopeConfig, - DBConfig: dbConfig, - PersistenceConfig: persistenceConfig, - CursorWorkerConfig: cursorConfig, - EnableCursorWorker: true, - RetryWorkerConfig: retryConfig, - EnableRetryWorker: true, + Publisher: publisher, + Logger: log, + EnvelopeConfig: envelopeConfig, + DBConfig: dbConfig, + PersistenceConfig: persistenceConfig, + CursorWorkerConfig: cursorConfig, + EnableCursorWorker: true, + RetryWorkerConfig: retryConfig, + EnableRetryWorker: true, } t.Log("📌 Creating PersistenceClient...") @@ -226,7 +226,7 @@ func TestCursorInitialization(t *testing.T) { // 立即验证初始 cursor (在 Worker 开始扫描前) // 注意:由于 Worker 可能已经开始处理,我们需要快速读取 time.Sleep(10 * time.Millisecond) // 给一点时间让 InitCursor 完成 - + var initialCursorValue string var updatedAt time.Time err = db.QueryRow("SELECT cursor_value, last_updated_at FROM trustlog_cursor WHERE cursor_key = 'operation_scan'").Scan(&initialCursorValue, &updatedAt) @@ -238,18 +238,18 @@ func TestCursorInitialization(t *testing.T) { // 验证初始 cursor 应该在最早记录之前(或接近) initialCursorTime, err := time.Parse(time.RFC3339Nano, initialCursorValue) require.NoError(t, err) - + var earliestRecordTime time.Time err = db.QueryRow("SELECT MIN(created_at) FROM operation WHERE op_id LIKE 'cursor-init-%'").Scan(&earliestRecordTime) require.NoError(t, err) - + t.Logf(" Earliest record: %v", earliestRecordTime) t.Logf(" Initial cursor time: %v", initialCursorTime) - + // cursor 应该在最早记录之前或相差不超过2秒(考虑 Worker 可能已经开始更新) timeDiff := earliestRecordTime.Sub(initialCursorTime) - require.True(t, timeDiff >= -2*time.Second, - "❌ Cursor (%v) should be before or near earliest record (%v), diff: %v", + require.True(t, timeDiff >= -2*time.Second, + "❌ Cursor (%v) should be before or near earliest record (%v), diff: %v", initialCursorTime, earliestRecordTime, timeDiff) t.Log("✅ Initial cursor position is correct!") @@ -263,7 +263,7 @@ func TestCursorInitialization(t *testing.T) { var finalUpdatedAt time.Time err = db.QueryRow("SELECT cursor_value, last_updated_at FROM trustlog_cursor WHERE cursor_key = 'operation_scan'").Scan(&updatedCursorValue, &finalUpdatedAt) require.NoError(t, err) - + t.Logf("📍 Cursor after processing:") t.Logf(" Value: %s", updatedCursorValue) t.Logf(" Updated: %v", finalUpdatedAt) @@ -273,7 +273,7 @@ func TestCursorInitialization(t *testing.T) { var trustloggedCount int err = db.QueryRow("SELECT COUNT(*) FROM operation WHERE op_id LIKE 'cursor-init-%' AND trustlog_status = 'TRUSTLOGGED'").Scan(&trustloggedCount) require.NoError(t, err) - + t.Logf("📊 Processed records: %d/5", trustloggedCount) require.Equal(t, 5, trustloggedCount, "❌ All 5 records should be processed!") @@ -286,4 +286,3 @@ func TestCursorInitialization(t *testing.T) { t.Log("✅ Cursor initialization verification PASSED") t.Log(strings.Repeat("=", 60)) } - diff --git a/api/persistence/pg_query_integration_test.go b/api/persistence/pg_query_integration_test.go index 8f4044c..5620b86 100644 --- a/api/persistence/pg_query_integration_test.go +++ b/api/persistence/pg_query_integration_test.go @@ -97,17 +97,17 @@ func TestPG_Query_Integration(t *testing.T) { op, err := model.NewFullOperation( model.Source(testOp.opSource), testOp.opType, - testOp.prefix, // doPrefix - testOp.repo, // doRepository - testOp.doid, // doid - testOp.producer, // producerID - testOp.actor, // opActor - nil, // requestBody - nil, // responseBody - testOp.time, // timestamp + testOp.prefix, // doPrefix + testOp.repo, // doRepository + testOp.doid, // doid + testOp.producer, // producerID + testOp.actor, // opActor + nil, // requestBody + nil, // responseBody + testOp.time, // timestamp ) require.NoError(t, err, "Failed to create operation %s", testOp.opID) - + op.OpID = testOp.opID op.ClientIP = testOp.clientIP op.ServerIP = testOp.serverIP @@ -144,7 +144,7 @@ func TestPG_Query_Integration(t *testing.T) { result, err := repo.Query(ctx, req) require.NoError(t, err) assert.GreaterOrEqual(t, result.Total, int64(7)) // 7条DOIP记录 - + for _, op := range result.Operations { assert.Equal(t, "DOIP", string(op.OpSource)) } @@ -163,7 +163,7 @@ func TestPG_Query_Integration(t *testing.T) { result, err := repo.Query(ctx, req) require.NoError(t, err) assert.GreaterOrEqual(t, result.Total, int64(2)) // 2条Create记录 - + for _, op := range result.Operations { assert.Equal(t, "Create", op.OpType) } @@ -182,7 +182,7 @@ func TestPG_Query_Integration(t *testing.T) { result, err := repo.Query(ctx, req) require.NoError(t, err) assert.GreaterOrEqual(t, result.Total, int64(5)) // 5条已存证记录 - + for _, s := range result.Statuses { assert.Equal(t, persistence.StatusTrustlogged, s) } @@ -201,7 +201,7 @@ func TestPG_Query_Integration(t *testing.T) { result, err := repo.Query(ctx, req) require.NoError(t, err) assert.GreaterOrEqual(t, result.Total, int64(3)) // 3条test-001的记录 - + for _, op := range result.Operations { assert.Contains(t, op.Doid, "test-001") } @@ -220,7 +220,7 @@ func TestPG_Query_Integration(t *testing.T) { result, err := repo.Query(ctx, req) require.NoError(t, err) assert.GreaterOrEqual(t, result.Total, int64(3)) // 3条user-1的记录 - + for _, op := range result.Operations { assert.Equal(t, "user-1", op.OpActor) } @@ -239,7 +239,7 @@ func TestPG_Query_Integration(t *testing.T) { result, err := repo.Query(ctx, req) require.NoError(t, err) assert.GreaterOrEqual(t, result.Total, int64(3)) // 3条producer-1的记录 - + for _, op := range result.Operations { assert.Equal(t, "producer-1", op.ProducerID) } @@ -258,7 +258,7 @@ func TestPG_Query_Integration(t *testing.T) { result, err := repo.Query(ctx, req) require.NoError(t, err) assert.GreaterOrEqual(t, result.Total, int64(2)) // 2条192.168.1.10的记录 - + for _, op := range result.Operations { assert.NotNil(t, op.ClientIP) assert.Equal(t, "192.168.1.10", *op.ClientIP) @@ -278,7 +278,7 @@ func TestPG_Query_Integration(t *testing.T) { result, err := repo.Query(ctx, req) require.NoError(t, err) assert.GreaterOrEqual(t, result.Total, int64(3)) // 3条10.0.0.1的记录 - + for _, op := range result.Operations { assert.NotNil(t, op.ServerIP) assert.Equal(t, "10.0.0.1", *op.ServerIP) @@ -301,12 +301,12 @@ func TestPG_Query_Integration(t *testing.T) { require.NoError(t, err) assert.GreaterOrEqual(t, result.Total, int64(3)) // 应该至少有3条记录在这个时间范围 t.Logf("✅ Time range records: %d", result.Total) - + // 验证返回的记录在时间范围内 for i, op := range result.Operations { if !((op.Timestamp.After(timeFrom) || op.Timestamp.Equal(timeFrom)) && (op.Timestamp.Before(timeTo) || op.Timestamp.Equal(timeTo))) { - t.Logf("⚠️ Record %d out of range: timestamp=%v, from=%v, to=%v", + t.Logf("⚠️ Record %d out of range: timestamp=%v, from=%v, to=%v", i, op.Timestamp, timeFrom, timeTo) } } @@ -326,7 +326,7 @@ func TestPG_Query_Integration(t *testing.T) { result, err := repo.Query(ctx, req) require.NoError(t, err) assert.GreaterOrEqual(t, result.Total, int64(3)) // 3条已存证的DOIP记录 - + for i, op := range result.Operations { assert.Equal(t, "DOIP", string(op.OpSource)) assert.Equal(t, persistence.StatusTrustlogged, result.Statuses[i]) @@ -354,12 +354,12 @@ func TestPG_Query_Integration(t *testing.T) { result, err = repo.Query(ctx, req) require.NoError(t, err) assert.LessOrEqual(t, len(result.Operations), 5) - + // 确保第1页和第2页的数据不重复 if len(result.Operations) > 0 { assert.NotEqual(t, firstPageFirst, result.Operations[0].OpID) } - + t.Logf("✅ Pagination works correctly, total pages: %d", result.TotalPages) }) @@ -376,7 +376,7 @@ func TestPG_Query_Integration(t *testing.T) { resultAsc, err := repo.Query(ctx, reqAsc) require.NoError(t, err) assert.GreaterOrEqual(t, len(resultAsc.Operations), 10) - + // 验证升序 for i := 1; i < len(resultAsc.Operations); i++ { assert.True(t, resultAsc.Operations[i].Timestamp.After(resultAsc.Operations[i-1].Timestamp) || @@ -394,7 +394,7 @@ func TestPG_Query_Integration(t *testing.T) { resultDesc, err := repo.Query(ctx, reqDesc) require.NoError(t, err) assert.GreaterOrEqual(t, len(resultDesc.Operations), 10) - + // 验证降序 for i := 1; i < len(resultDesc.Operations); i++ { assert.True(t, resultDesc.Operations[i].Timestamp.Before(resultDesc.Operations[i-1].Timestamp) || @@ -457,7 +457,7 @@ func TestPG_Query_Integration(t *testing.T) { result, err := repo.Query(ctx, req) require.NoError(t, err) assert.GreaterOrEqual(t, result.Total, int64(1)) - + for i, op := range result.Operations { assert.Equal(t, "DOIP", string(op.OpSource)) assert.Equal(t, "Update", op.OpType) @@ -530,19 +530,19 @@ func TestPG_PersistenceClient_Query_Integration(t *testing.T) { // 创建测试数据(通过manager的repository) manager := client.GetManager() repo := manager.GetOperationRepo() - + for i := 0; i < 5; i++ { op, err := model.NewFullOperation( model.OpSourceDOIP, string(model.OpTypeCreate), - "10.10000", // doPrefix - "client-repo", // doRepository + "10.10000", // doPrefix + "client-repo", // doRepository fmt.Sprintf("10.10000/client-repo/test-%d", i), // doid - fmt.Sprintf("client-producer-%d", i), // producerID - fmt.Sprintf("client-actor-%d", i), // opActor - nil, // requestBody - nil, // responseBody - time.Now(), // timestamp + fmt.Sprintf("client-producer-%d", i), // producerID + fmt.Sprintf("client-actor-%d", i), // opActor + nil, // requestBody + nil, // responseBody + time.Now(), // timestamp ) require.NoError(t, err) op.OpID = fmt.Sprintf("pg-client-query-%03d", i) @@ -613,4 +613,3 @@ func TestPG_PersistenceClient_Query_Integration(t *testing.T) { func strPtr(s string) *string { return &s } - diff --git a/api/persistence/query_test.go b/api/persistence/query_test.go index 9e16197..5cfc3c9 100644 --- a/api/persistence/query_test.go +++ b/api/persistence/query_test.go @@ -72,7 +72,7 @@ func TestOperationRepository_Query(t *testing.T) { require.NoError(t, err) assert.Equal(t, int64(2), result.Total) assert.Len(t, result.Operations, 2) - + for _, op := range result.Operations { assert.Equal(t, "DOIP", string(op.OpSource)) } @@ -102,7 +102,7 @@ func TestOperationRepository_Query(t *testing.T) { result, err := repo.Query(ctx, req) require.NoError(t, err) assert.Equal(t, int64(2), result.Total) - + for _, s := range result.Statuses { assert.Equal(t, StatusNotTrustlogged, s) } @@ -111,7 +111,7 @@ func TestOperationRepository_Query(t *testing.T) { t.Run("Query with time range", func(t *testing.T) { timeFrom := now.Add(-2*time.Hour - 30*time.Minute) timeTo := now.Add(-30 * time.Minute) - + req := &OperationQueryRequest{ TimeFrom: &timeFrom, TimeTo: &timeTo, @@ -154,7 +154,7 @@ func TestOperationRepository_Query(t *testing.T) { result, err := repo.Query(ctx, req) require.NoError(t, err) assert.Len(t, result.Operations, 4) - + // 验证降序排列 for i := 1; i < len(result.Operations); i++ { // 后面的时间应该早于或等于前面的时间 @@ -287,4 +287,3 @@ func TestPersistenceClient_QueryOperations(t *testing.T) { assert.Equal(t, StatusNotTrustlogged, status) }) } -