You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
68 lines
1.7 KiB
68 lines
1.7 KiB
package peer
|
|
|
|
import (
	"context"
	"errors"
	"log"
	"sync"

	"memobus_relay_server/config"

	"github.com/redis/go-redis/v9"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)
|
|
|
|
// Manager 负责管理到其他对等服务器的 gRPC 客户端连接
|
|
type Manager struct {
|
|
redisClient *redis.Client
|
|
clients map[string]*grpc.ClientConn
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// GlobalManager is the package-level peer manager. InitManager assigns it only
// when config.Cfg.Redis.Enabled is true, so it may be nil in standalone mode.
var GlobalManager *Manager
|
|
|
|
func InitManager(redisCli *redis.Client) {
|
|
if !config.Cfg.Redis.Enabled {
|
|
return // 单机模式下不需要 Peer 管理器
|
|
}
|
|
GlobalManager = &Manager{
|
|
redisClient: redisCli,
|
|
clients: make(map[string]*grpc.ClientConn),
|
|
}
|
|
log.Println("Peer manager initialized for cluster communication.")
|
|
}
|
|
|
|
// GetClient 查找或创建一个到目标实例的 gRPC 客户端连接
|
|
func (m *Manager) GetClient(targetInstanceID string) (*grpc.ClientConn, error) {
|
|
m.mu.RLock()
|
|
client, ok := m.clients[targetInstanceID]
|
|
m.mu.RUnlock()
|
|
|
|
if ok {
|
|
return client, nil
|
|
}
|
|
|
|
// 连接未找到, 使用写锁创建一个新的
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
// 双重检查, 以防在我们等待锁的时候, 其他 goroutine 已经创建了它
|
|
if client, ok = m.clients[targetInstanceID]; ok {
|
|
return client, nil
|
|
}
|
|
|
|
// 从 Redis 发现目标实例的地址
|
|
addr, err := m.redisClient.HGet(context.Background(), config.Cfg.Redis.InstanceRegistryKey, targetInstanceID).Result()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
log.Printf("Creating new gRPC client connection to peer %s at %s", targetInstanceID, addr)
|
|
// 生产环境应使用 TLS 凭证替换 insecure
|
|
conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.clients[targetInstanceID] = conn
|
|
return conn, nil
|
|
}
|
|
|