Files
go-trustlog/api/adapter/subscriber.go
ryan 0ec1d3b87d refactor: 更改module路径为独立仓库路径
- go.yandata.net/iod/iod/go-trustlog → go.yandata.net/wangsiyuan/go-trustlog
- 更新 go.mod module声明
- 更新 README.md 安装说明
- 批量更新所有 .go 文件中的 import 路径
- 61个文件受影响

这样go-trustlog可以作为独立SDK使用
2025-12-26 14:35:39 +08:00

274 lines
6.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package adapter
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/apache/pulsar-client-go/pulsar"
"go.yandata.net/wangsiyuan/go-trustlog/api/logger"
)
// contextKey is the dedicated type for the context keys that Subscribe looks up,
// so they cannot collide with string keys set by other packages.
type contextKey string

// Context keys that Subscribe reads from the caller's context.
const (
	// SubNameKey selects the suffix appended to the subscription name (string value).
	SubNameKey contextKey = "subName"
	// ReceiverQueueSizeKey selects the consumer's receiver queue size (int value).
	ReceiverQueueSizeKey contextKey = "receiverQueueSize"
	// IndexKey selects the index appended to the consumer name (int value).
	IndexKey contextKey = "index"
)

// Defaults applied by Subscribe when the matching context value is absent.
const (
	// ReceiverQueueSizeDefault is the receiver queue size used when none is supplied.
	ReceiverQueueSizeDefault = 1000
	// SubNameDefault is the subscription-name suffix used when none is supplied.
	SubNameDefault = "subName"
	// TimeOutDefault bounds how long Subscribe waits for a new consumer to be created.
	TimeOutDefault = 10 * time.Second
	// defaultMessageChannelSize is the buffer size of each consumer's message channel.
	defaultMessageChannelSize = 10
)
// Compile-time check that *Subscriber satisfies message.Subscriber.
var _ message.Subscriber = (*Subscriber)(nil)
// SubscriberConfig is the configuration to create a subscriber.
//
// NewSubscriber hands the whole struct to configureTLSForClient to set up TLS/mTLS.
// NOTE(review): that helper is not defined in the code visible here, so the
// empty-value behavior described on the TLS fields below should be confirmed there.
type SubscriberConfig struct {
	// URL is the URL to the broker. NewSubscriber passes it to pulsar.ClientOptions.URL.
	URL string
	// SubscriberName is the name of the subscription. When non-empty, Subscribe uses it
	// as the base subscription name instead of the generated "<topic>-<clientID>".
	SubscriberName string
	// SubscriberType is the type of the subscription. Subscribe passes it to
	// pulsar.ConsumerOptions.Type.
	SubscriberType pulsar.SubscriptionType
	// TLSTrustCertsFilePath is the path to the CA certificate file for verifying the server certificate.
	// If empty, TLS verification will be disabled.
	TLSTrustCertsFilePath string
	// TLSCertificateFilePath is the path to the client certificate file for mTLS authentication.
	// If empty, mTLS authentication will be disabled.
	TLSCertificateFilePath string
	// TLSKeyFilePath is the path to the client private key file for mTLS authentication.
	// If empty, mTLS authentication will be disabled.
	TLSKeyFilePath string
	// TLSAllowInsecureConnection allows insecure TLS connections (not recommended for production).
	TLSAllowInsecureConnection bool
}
// Subscriber provides the pulsar implementation for watermill subscribe operations.
type Subscriber struct {
	// conn is the Pulsar client used to create consumers; Close closes it.
	conn pulsar.Client
	// logger receives the subscriber's debug and info log output.
	logger logger.Logger
	// subsLock guards subs and closed.
	subsLock sync.RWMutex
	// subs caches consumers by consumer name, which Subscribe builds as
	// "<subscriptionName>_<subName>_<index>".
	subs map[string]pulsar.Consumer
	// closed is set to true by Close and read through isClosed.
	closed bool
	// closing is closed by Close to tell the consuming goroutines to stop.
	closing chan struct{}
	// SubscribersCount is not referenced by the code visible here.
	// NOTE(review): confirm whether anything outside this file relies on it.
	SubscribersCount int
	// clientID is a ULID generated when the subscriber is constructed; Subscribe uses it
	// to build the default subscription name.
	clientID string
	// config is the configuration the subscriber was created with.
	config SubscriberConfig
}
// NewSubscriber builds a Pulsar client from config and wraps it in a Subscriber.
//
// TLS/mTLS settings are applied to the client options by configureTLSForClient before
// the client is created. An error from either step is returned joined with a short
// description of the step that failed.
func NewSubscriber(config SubscriberConfig, adapter logger.Logger) (*Subscriber, error) {
	// Logger is left unset so Pulsar falls back to its default logger
	// (the reference to the internal logger package was removed).
	opts := pulsar.ClientOptions{URL: config.URL}

	if tlsErr := configureTLSForClient(&opts, config, adapter); tlsErr != nil {
		return nil, errors.Join(tlsErr, errors.New("failed to configure TLS"))
	}

	client, connErr := pulsar.NewClient(opts)
	if connErr != nil {
		return nil, errors.Join(connErr, errors.New("cannot connect to Pulsar"))
	}

	return NewSubscriberWithPulsarClient(client, config, adapter)
}
// NewSubscriberWithPulsarClient wraps an existing Pulsar client in a Subscriber.
//
// The subscriber starts with an empty consumer cache, an open closing channel and a
// freshly generated ULID as its client ID. The returned error is always nil.
func NewSubscriberWithPulsarClient(
	conn pulsar.Client,
	config SubscriberConfig,
	logger logger.Logger,
) (*Subscriber, error) {
	subscriber := new(Subscriber)
	subscriber.conn = conn
	subscriber.logger = logger
	subscriber.config = config
	subscriber.clientID = watermill.NewULID()
	subscriber.closing = make(chan struct{})
	subscriber.subs = make(map[string]pulsar.Consumer)
	return subscriber, nil
}
// Subscribe subscribes to topic on Pulsar and returns a channel of watermill messages.
//
// Optional settings are read from ctx:
//   - SubNameKey (string): suffix of the subscription name; defaults to SubNameDefault.
//   - IndexKey (int): index appended to the consumer name; defaults to 0.
//   - ReceiverQueueSizeKey (int): receiver queue size; defaults to ReceiverQueueSizeDefault.
//
// The Pulsar subscription name is "<base>_<subName>", where base is config.SubscriberName
// or, when that is empty, "<topic>-<clientID>". The consumer name is
// "<subscription>_<index>"; it is also the cache key, so a consumer already registered
// under that name is reused instead of being created again.
//
// Creating a new consumer is bounded by TimeOutDefault (and by ctx). An error is returned
// when the subscriber is already closed, when creation times out, or when Pulsar rejects
// the subscription.
//
// The returned channel is closed after the subscriber is closed, ctx is cancelled, or the
// consumer's channel is closed, once every in-flight message goroutine has returned.
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
	subName, ok := ctx.Value(SubNameKey).(string)
	if !ok {
		subName = SubNameDefault
	}
	// A failed assertion leaves index at its zero value, which is the default.
	index, _ := ctx.Value(IndexKey).(int)
	receiverQueueSize, ok := ctx.Value(ReceiverQueueSizeKey).(int)
	if !ok {
		receiverQueueSize = ReceiverQueueSizeDefault
	}

	subscriptionName := s.config.SubscriberName
	if subscriptionName == "" {
		subscriptionName = fmt.Sprintf("%s-%s", topic, s.clientID)
	}
	sn := fmt.Sprintf("%s_%s", subscriptionName, subName)
	n := fmt.Sprintf("%s_%d", sn, index)

	// Look up or create the consumer inside a closure so that a single deferred
	// Unlock covers every return path.
	sub, err := func() (pulsar.Consumer, error) {
		s.subsLock.Lock()
		defer s.subsLock.Unlock()

		if s.closed {
			return nil, errors.New("subscriber is closed")
		}
		if existing, found := s.subs[n]; found {
			return existing, nil
		}

		type subscribeResult struct {
			consumer pulsar.Consumer
			err      error
		}
		// Buffered so the subscribing goroutine can always deliver its result and exit,
		// even when nobody is waiting any more.
		resultCh := make(chan subscribeResult, 1)
		go func() {
			consumer, subErr := s.conn.Subscribe(pulsar.ConsumerOptions{
				Topic:             topic,
				Name:              n,
				SubscriptionName:  sn,
				Type:              s.config.SubscriberType,
				MessageChannel:    make(chan pulsar.ConsumerMessage, defaultMessageChannelSize),
				ReceiverQueueSize: receiverQueueSize,
			})
			resultCh <- subscribeResult{consumer: consumer, err: subErr}
		}()

		subscribeCtx, cancel := context.WithTimeout(ctx, TimeOutDefault)
		defer cancel()

		select {
		case <-subscribeCtx.Done():
			// The broker call cannot be interrupted. If it still succeeds later, close
			// the orphaned consumer so it does not leak or keep receiving messages.
			go func() {
				if late := <-resultCh; late.err == nil && late.consumer != nil {
					late.consumer.Close()
				}
			}()
			return nil, fmt.Errorf("subscription timeout: %w", subscribeCtx.Err())
		case res := <-resultCh:
			if res.err != nil {
				return nil, fmt.Errorf("subscription failed: %w", res.err)
			}
			s.subs[n] = res.consumer
			return res.consumer, nil
		}
	}()
	if err != nil {
		return nil, err
	}

	output := make(chan *message.Message)
	go func() {
		// Track the per-message goroutines so output is closed only after the last
		// possible sender has returned; closing earlier could panic a sender.
		var inFlight sync.WaitGroup
		defer func() {
			inFlight.Wait()
			close(output)
		}()

		for {
			select {
			case <-s.closing:
				s.logger.InfoContext(ctx, "subscriber is closing")
				return
			case <-ctx.Done():
				s.logger.InfoContext(ctx, "exiting on context closure")
				return
			case m, open := <-sub.Chan():
				if !open {
					// Channel closed, exit the loop.
					s.logger.InfoContext(ctx, "consumer channel closed")
					return
				}
				inFlight.Add(1)
				go func() {
					defer inFlight.Done()
					s.processMessage(ctx, output, m, sub)
				}()
			}
		}
	}()

	return output, nil
}
// processMessage delivers one Pulsar message to output as a watermill message and then
// relays the consumer's decision back to Pulsar.
//
// The watermill message is built from m's key and payload and carries a context derived
// from ctx; that context is cancelled when this method returns, i.e. after the message
// has been acked, nacked or discarded.
//
// The method returns without delivering the message when the subscriber is already
// closed, is closing, or ctx is done. After delivery it waits for Ack or Nack on the
// watermill message and calls sub.Ack or sub.Nack accordingly; closing or ctx
// cancellation ends the wait without acknowledging.
func (s *Subscriber) processMessage(
	ctx context.Context,
	output chan *message.Message,
	m pulsar.Message,
	sub pulsar.Consumer,
) {
	if s.isClosed() {
		return
	}
	s.logger.DebugContext(ctx, "Received message", "key", m.Key())

	ctx, cancelCtx := context.WithCancel(ctx)
	defer cancelCtx()

	msg := message.NewMessage(m.Key(), m.Payload())
	// Attach the derived context so handlers observe cancellation through msg.Context().
	msg.SetContext(ctx)

	select {
	case <-s.closing:
		s.logger.DebugContext(ctx, "Closing, message discarded", "key", m.Key())
		return
	case <-ctx.Done():
		s.logger.DebugContext(ctx, "Context cancelled, message discarded")
		return
	// Listed after the shutdown cases: if this were first, it could risk
	// 'send on closed channel' errors.
	case output <- msg:
		s.logger.DebugContext(ctx, "Message sent to consumer")
	}

	select {
	case <-msg.Acked():
		if err := sub.Ack(m); err != nil {
			// Do not fall through to the success log when the broker ack failed.
			s.logger.DebugContext(ctx, "Message Ack Failed", "error", err)
			return
		}
		s.logger.DebugContext(ctx, "Message Acked")
	case <-msg.Nacked():
		sub.Nack(m)
		s.logger.DebugContext(ctx, "Message Nacked")
	case <-s.closing:
		s.logger.DebugContext(ctx, "Closing, message discarded before ack")
	case <-ctx.Done():
		s.logger.DebugContext(ctx, "Context cancelled, message discarded before ack")
	}
}
// Close shuts the subscriber down: it marks it closed, closes the closing channel to
// stop the consuming goroutines, closes every cached consumer and finally closes the
// Pulsar client. Calls after the first are no-ops. It always returns nil.
func (s *Subscriber) Close() error {
	s.subsLock.Lock()
	defer s.subsLock.Unlock()

	alreadyClosed := s.closed
	if alreadyClosed {
		return nil
	}
	s.closed = true

	background := context.Background()
	s.logger.DebugContext(background, "Closing subscriber")
	// Deferred after the unlock, so it runs first: the final log line is emitted
	// while the lock is still held.
	defer s.logger.InfoContext(context.Background(), "Subscriber closed")

	close(s.closing)
	for _, consumer := range s.subs {
		consumer.Close()
	}
	s.conn.Close()

	return nil
}
// isClosed reports whether Close has been called, reading the flag under the read lock.
func (s *Subscriber) isClosed() bool {
	s.subsLock.RLock()
	closed := s.closed
	s.subsLock.RUnlock()
	return closed
}