package peer import ( "context" "log" "memobus_relay_server/config" "sync" "github.com/redis/go-redis/v9" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) // Manager 负责管理到其他对等服务器的 gRPC 客户端连接 type Manager struct { redisClient *redis.Client clients map[string]*grpc.ClientConn mu sync.RWMutex } var GlobalManager *Manager func InitManager(redisCli *redis.Client) { if !config.Cfg.Redis.Enabled { return // 单机模式下不需要 Peer 管理器 } GlobalManager = &Manager{ redisClient: redisCli, clients: make(map[string]*grpc.ClientConn), } log.Println("Peer manager initialized for cluster communication.") } // GetClient 查找或创建一个到目标实例的 gRPC 客户端连接 func (m *Manager) GetClient(targetInstanceID string) (*grpc.ClientConn, error) { m.mu.RLock() client, ok := m.clients[targetInstanceID] m.mu.RUnlock() if ok { return client, nil } // 连接未找到, 使用写锁创建一个新的 m.mu.Lock() defer m.mu.Unlock() // 双重检查, 以防在我们等待锁的时候, 其他 goroutine 已经创建了它 if client, ok = m.clients[targetInstanceID]; ok { return client, nil } // 从 Redis 发现目标实例的地址 addr, err := m.redisClient.HGet(context.Background(), config.Cfg.Redis.InstanceRegistryKey, targetInstanceID).Result() if err != nil { return nil, err } log.Printf("Creating new gRPC client connection to peer %s at %s", targetInstanceID, addr) // 生产环境应使用 TLS 凭证替换 insecure conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } m.clients[targetInstanceID] = conn return conn, nil }