Files
go-trustlog/api/adapter/publisher.go
ryan 0ec1d3b87d refactor: 更改module路径为独立仓库路径
- go.yandata.net/iod/iod/go-trustlog → go.yandata.net/wangsiyuan/go-trustlog
- 更新 go.mod module声明
- 更新 README.md 安装说明
- 批量更新所有 .go 文件中的 import 路径
- 61个文件受影响

这样go-trustlog可以作为独立SDK使用
2025-12-26 14:35:39 +08:00

120 lines
3.2 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package adapter
import (
	"context"
	"errors"
	"sync"

	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/apache/pulsar-client-go/pulsar"
	"go.yandata.net/wangsiyuan/go-trustlog/api/logger"
)
// Fully qualified Pulsar topic names exported by this package.
const (
// OperationTopic is the topic name "persistent://public/default/operation".
// NOTE(review): presumably the topic that operation messages are published
// to; no use of it is visible in this file, so confirm against callers.
OperationTopic = "persistent://public/default/operation"
// RecordTopic is the topic name "persistent://public/default/record".
// NOTE(review): presumably the topic that record messages are published
// to; no use of it is visible in this file, so confirm against callers.
RecordTopic = "persistent://public/default/record"
)
// PublisherConfig is the configuration to create a publisher.
//
// NewPublisher copies URL into the Pulsar client options and passes the whole
// struct to configureTLSForClient, which is responsible for applying the TLS
// fields.
// NOTE(review): configureTLSForClient is not defined in this file, so the
// per-field TLS behavior described below should be confirmed against it.
type PublisherConfig struct {
// URL is the Pulsar URL.
URL string
// TLSTrustCertsFilePath is the path to the CA certificate file for verifying the server certificate.
// If empty, TLS verification will be disabled.
TLSTrustCertsFilePath string
// TLSCertificateFilePath is the path to the client certificate file for mTLS authentication.
// If empty, mTLS authentication will be disabled.
TLSCertificateFilePath string
// TLSKeyFilePath is the path to the client private key file for mTLS authentication.
// If empty, mTLS authentication will be disabled.
TLSKeyFilePath string
// TLSAllowInsecureConnection allows insecure TLS connections (not recommended for production).
TLSAllowInsecureConnection bool
}
// Publisher provides the pulsar implementation for watermill publish operations.
//
// A producer is created lazily for each topic the first time Publish is called
// for it and is then cached in pubs for reuse. mu guards pubs and closed so
// that Publish and Close may be called from multiple goroutines, which
// watermill's message.Publisher contract requires of Publish.
type Publisher struct {
	conn   pulsar.Client
	logger logger.Logger

	// mu protects pubs and closed.
	mu sync.Mutex
	// pubs caches one producer per topic.
	pubs map[string]pulsar.Producer
	// closed is set by Close; once set, Publish refuses to create or reuse
	// producers and Close becomes a no-op.
	closed bool
}

// NewPublisher creates a new Publisher.
//
// It builds the Pulsar client options from config.URL, lets
// configureTLSForClient apply the TLS/mTLS settings from config, connects a
// Pulsar client and wraps it via NewPublisherWithPulsarClient. adapter is
// passed to configureTLSForClient and becomes the Publisher's logger.
//
// A TLS configuration failure or a client creation failure is returned joined
// with a short description of the failing step, so errors.Is / errors.As still
// match the underlying error.
func NewPublisher(config PublisherConfig, adapter logger.Logger) (*Publisher, error) {
	clientOptions := pulsar.ClientOptions{
		URL: config.URL,
		// Logger is left unset so Pulsar uses its default logger (the
		// reference to the internal logger package has been removed).
	}

	// Configure TLS/mTLS.
	if err := configureTLSForClient(&clientOptions, config, adapter); err != nil {
		return nil, errors.Join(err, errors.New("failed to configure TLS"))
	}

	conn, err := pulsar.NewClient(clientOptions)
	if err != nil {
		return nil, errors.Join(err, errors.New("cannot connect to pulsar"))
	}

	pub, err := NewPublisherWithPulsarClient(conn, adapter)
	if err != nil {
		// The client was created here, so it must not outlive a failed
		// construction.
		conn.Close()
		return nil, err
	}
	return pub, nil
}

// NewPublisherWithPulsarClient creates a new Publisher with the provided pulsar connection.
//
// The returned Publisher starts with an empty producer cache. Close on the
// returned Publisher also closes conn. The error result is currently always nil.
func NewPublisherWithPulsarClient(conn pulsar.Client, logger logger.Logger) (*Publisher, error) {
	return &Publisher{
		conn:   conn,
		pubs:   make(map[string]pulsar.Producer),
		logger: logger,
	}, nil
}

// Publish publishes message to Pulsar.
//
// Publish will not return until an ack has been received from Pulsar.
// When one of messages delivery fails - function is interrupted: the error is
// returned immediately and the remaining messages are not sent. Nil entries in
// messages are skipped. Each message is sent with its UUID as the Pulsar key
// and its Payload as the Pulsar payload.
//
// The producer for topic is created on first use (even when messages is empty)
// and cached. Publish is safe for concurrent use. After Close it returns an
// error without contacting Pulsar.
func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
	// NOTE(review): watermill recommends using each message's Context(); the
	// original background context is kept here to preserve existing behavior.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	producer, err := p.producerFor(topic)
	if err != nil {
		return err
	}

	for _, msg := range messages {
		// Skip nil messages.
		if msg == nil {
			continue
		}

		p.logger.DebugContext(ctx, "Sending message", "key", msg.UUID, "topic", topic)

		_, err := producer.Send(ctx, &pulsar.ProducerMessage{
			Key:     msg.UUID,
			Payload: msg.Payload,
		})
		if err != nil {
			return errors.Join(err, errors.New("cannot send message to pulsar"))
		}
	}

	return nil
}

// producerFor returns the cached producer for topic, creating and caching it
// on first use. It holds mu for the whole lookup-or-create so that concurrent
// callers never race on pubs and never create two producers for one topic.
func (p *Publisher) producerFor(topic string) (pulsar.Producer, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		return nil, errors.New("publisher is closed")
	}

	if producer, found := p.pubs[topic]; found {
		return producer, nil
	}

	producer, err := p.conn.CreateProducer(pulsar.ProducerOptions{Topic: topic})
	if err != nil {
		return nil, errors.Join(err, errors.New("cannot create pulsar producer"))
	}

	// Tolerate a Publisher that was built without the constructor.
	if p.pubs == nil {
		p.pubs = make(map[string]pulsar.Producer)
	}
	p.pubs[topic] = producer

	return producer, nil
}

// Close closes the publisher and the underlying connection.
//
// Every cached producer is closed and removed from the cache, then the Pulsar
// client is closed. Close is safe for concurrent use and idempotent: calls
// after the first do nothing. It always returns nil.
func (p *Publisher) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		return nil
	}
	p.closed = true

	for topic, pub := range p.pubs {
		pub.Close()
		// Drop the closed producer so it can never be handed out again.
		delete(p.pubs, topic)
	}
	p.conn.Close()

	return nil
}