- go.yandata.net/iod/iod/go-trustlog → go.yandata.net/wangsiyuan/go-trustlog - 更新 go.mod module声明 - 更新 README.md 安装说明 - 批量更新所有 .go 文件中的 import 路径 - 61个文件受影响 这样go-trustlog可以作为独立SDK使用
120 lines
3.2 KiB
Go
120 lines
3.2 KiB
Go
package adapter
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
|
||
"github.com/ThreeDotsLabs/watermill/message"
|
||
"github.com/apache/pulsar-client-go/pulsar"
|
||
|
||
"go.yandata.net/wangsiyuan/go-trustlog/api/logger"
|
||
)
|
||
|
||
const (
|
||
OperationTopic = "persistent://public/default/operation"
|
||
RecordTopic = "persistent://public/default/record"
|
||
)
|
||
|
||
// PublisherConfig is the configuration to create a publisher.
|
||
type PublisherConfig struct {
|
||
// URL is the Pulsar URL.
|
||
URL string
|
||
// TLSTrustCertsFilePath is the path to the CA certificate file for verifying the server certificate.
|
||
// If empty, TLS verification will be disabled.
|
||
TLSTrustCertsFilePath string
|
||
// TLSCertificateFilePath is the path to the client certificate file for mTLS authentication.
|
||
// If empty, mTLS authentication will be disabled.
|
||
TLSCertificateFilePath string
|
||
// TLSKeyFilePath is the path to the client private key file for mTLS authentication.
|
||
// If empty, mTLS authentication will be disabled.
|
||
TLSKeyFilePath string
|
||
// TLSAllowInsecureConnection allows insecure TLS connections (not recommended for production).
|
||
TLSAllowInsecureConnection bool
|
||
}
|
||
|
||
// Publisher provides the pulsar implementation for watermill publish operations.
|
||
type Publisher struct {
|
||
conn pulsar.Client
|
||
logger logger.Logger
|
||
pubs map[string]pulsar.Producer
|
||
}
|
||
|
||
// NewPublisher creates a new Publisher.
|
||
func NewPublisher(config PublisherConfig, adapter logger.Logger) (*Publisher, error) {
|
||
clientOptions := pulsar.ClientOptions{
|
||
URL: config.URL,
|
||
// Logger: 使用 Pulsar 默认 logger(internal 包引用已移除)
|
||
}
|
||
|
||
// Configure TLS/mTLS
|
||
if err := configureTLSForClient(&clientOptions, config, adapter); err != nil {
|
||
return nil, errors.Join(err, errors.New("failed to configure TLS"))
|
||
}
|
||
|
||
conn, err := pulsar.NewClient(clientOptions)
|
||
if err != nil {
|
||
return nil, errors.Join(err, errors.New("cannot connect to pulsar"))
|
||
}
|
||
|
||
return NewPublisherWithPulsarClient(conn, adapter)
|
||
}
|
||
|
||
// NewPublisherWithPulsarClient creates a new Publisher with the provided pulsar connection.
|
||
func NewPublisherWithPulsarClient(conn pulsar.Client, logger logger.Logger) (*Publisher, error) {
|
||
return &Publisher{
|
||
conn: conn,
|
||
pubs: make(map[string]pulsar.Producer),
|
||
logger: logger,
|
||
}, nil
|
||
}
|
||
|
||
// Publish publishes message to Pulsar.
|
||
//
|
||
// Publish will not return until an ack has been received from Pulsar.
|
||
// When one of messages delivery fails - function is interrupted.
|
||
func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
defer cancel()
|
||
|
||
producer, found := p.pubs[topic]
|
||
|
||
if !found {
|
||
pr, err := p.conn.CreateProducer(pulsar.ProducerOptions{Topic: topic})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
producer = pr
|
||
p.pubs[topic] = producer
|
||
}
|
||
|
||
for _, msg := range messages {
|
||
// 跳过 nil 消息
|
||
if msg == nil {
|
||
continue
|
||
}
|
||
|
||
p.logger.DebugContext(ctx, "Sending message", "key", msg.UUID, "topic", topic)
|
||
_, err := producer.Send(ctx, &pulsar.ProducerMessage{
|
||
Key: msg.UUID,
|
||
Payload: msg.Payload,
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// Close closes the publisher and the underlying connection.
|
||
func (p *Publisher) Close() error {
|
||
for _, pub := range p.pubs {
|
||
pub.Close()
|
||
}
|
||
|
||
p.conn.Close()
|
||
|
||
return nil
|
||
}
|