From b256aa7595de14677d8c5df681d3a4d9fff3fb4a Mon Sep 17 00:00:00 2001 From: lin_hl Date: Thu, 23 Oct 2025 20:20:20 +0800 Subject: [PATCH] =?UTF-8?q?1.=20=E5=A2=9E=E5=8A=A0config=E6=96=87=E4=BB=B6?= =?UTF-8?q?=EF=BC=8C=E7=BB=9F=E4=B8=80=E9=85=8D=E7=BD=AE=E7=AE=A1=E7=90=86?= =?UTF-8?q?=EF=BC=9B=202.=20=E5=A2=9E=E5=8A=A0redis=EF=BC=8C=E6=B3=A8?= =?UTF-8?q?=E5=86=8C=E4=B8=AD=E8=BD=AC=E5=AE=9E=E4=BE=8B=EF=BC=8C=E4=BB=A5?= =?UTF-8?q?=E5=8F=8A=E4=BF=9D=E5=AD=98=E8=AE=BE=E5=A4=87=E4=B8=8E=E5=AE=9E?= =?UTF-8?q?=E4=BE=8B=E7=9A=84=E8=BF=9E=E6=8E=A5=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.yml | 28 +++++++++++ config/config.go | 66 +++++++++++++++++++++++++ go.mod | 20 ++++++++ main.go | 111 +++++++++++++++++++++++++++++++++---------- registry/registry.go | 73 ++++++++++++++++++++++++++++ storage/redis.go | 40 ++++++++++++++++ 6 files changed, 313 insertions(+), 25 deletions(-) create mode 100644 config.yml create mode 100644 config/config.go create mode 100644 registry/registry.go create mode 100644 storage/redis.go diff --git a/config.yml b/config.yml new file mode 100644 index 0000000..b550c96 --- /dev/null +++ b/config.yml @@ -0,0 +1,28 @@ +# config.yml +server: + app_listen_port: ":8089" + device_listen_port: ":7002" + + # [重要] 注册到 Redis 的公网地址 + public_app_addr: "192.168.5.193:8089" + public_device_addr: "192.168.5.193:7002" + + instance_id: "" # 留空会自动生成 + +# 认证密钥配置 +auth: + app_access_secret: "D4tBb9Y0oHSXRAyHLHpdKfXAuNCyCZ45AZxKJOhMJMs=" + device_relay_secret: "p+JtJ8aHlM1lDYu7UGFanX8ALVt1pM1BQmKTpqTJccs=" + +redis: + enabled: true + addr: "118.178.183.78:6379" + password: "" + db: 1 + + # [新增] 服务发现相关的 Key 和 TTL + # 使用 Redis Hash: Key 是 instance_registry_key, Field 是 instance_id, Value 是实例信息的 JSON + instance_registry_key: "relay_instances" + device_relay_mapping_key: "device_relay_mapping" + heartbeat_interval_seconds: 15 + instance_ttl_seconds: 45 # TTL 应该是心跳间隔的 2-3 倍 \ No newline at end of file diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..a493262 --- /dev/null +++ b/config/config.go @@ -0,0 +1,66 @@ +package config + +import ( + "github.com/google/uuid" + "github.com/spf13/viper" + "log" + "strings" +) + +type Config struct { + Server ServerConfig `mapstructure:"server"` + Auth AuthConfig `mapstructure:"auth"` + Redis RedisConfig `mapstructure:"redis"` +} +type ServerConfig struct { + AppListenPort string `mapstructure:"app_listen_port"` + DeviceListenPort string `mapstructure:"device_listen_port"` + PublicAppAddr string `mapstructure:"public_app_addr"` + PublicDeviceAddr string `mapstructure:"public_device_addr"` + InstanceID string `mapstructure:"instance_id"` +} +type AuthConfig struct { + AppAccessSecret string `mapstructure:"app_access_secret"` + DeviceRelaySecret string `mapstructure:"device_relay_secret"` +} + +type RedisConfig struct { + Enabled bool `mapstructure:"enabled"` + Addr string `mapstructure:"addr"` + Password string `mapstructure:"password"` + DB int `mapstructure:"db"` + + // [新增] + InstanceRegistryKey string `mapstructure:"instance_registry_key"` + DeviceRelayMappingKey string `mapstructure:"device_relay_mapping_key"` + HeartbeatIntervalSeconds int `mapstructure:"heartbeat_interval_seconds"` + InstanceTTLSeconds int `mapstructure:"instance_ttl_seconds"` +} + +var Cfg *Config + +func LoadConfig() { + viper.SetConfigName("config") + viper.SetConfigType("yml") + viper.AddConfigPath(".") + viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + viper.AutomaticEnv() + + // 读取配置文件 + if err := viper.ReadInConfig(); err != nil { + // 如果配置文件没找到,也没关系,可能完全通过环境变量配置 + if _, ok := err.(viper.ConfigFileNotFoundError); !ok { + log.Fatalf("Fatal error reading config file: %v", err) + } + } + + // 将读取到的配置反序列化到 Cfg 结构体中 + if err := viper.Unmarshal(&Cfg); err != nil { + log.Fatalf("Unable to decode config into struct: %v", err) + } + + if Cfg.Server.InstanceID == "" { + Cfg.Server.InstanceID = uuid.New().String() + } + log.Printf("Configuration loaded. Instance ID: %s", Cfg.Server.InstanceID) +} diff --git a/go.mod b/go.mod index 1a4587c..2062d1f 100644 --- a/go.mod +++ b/go.mod @@ -4,5 +4,25 @@ go 1.24 require ( github.com/golang-jwt/jwt/v5 v5.3.0 + github.com/google/uuid v1.6.0 github.com/hashicorp/yamux v0.1.2 + github.com/redis/go-redis/v9 v9.14.1 + github.com/spf13/viper v1.21.0 +) + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/fsnotify/fsnotify v1.9.0 // indirect + github.com/go-viper/mapstructure/v2 v2.4.0 // indirect + github.com/pelletier/go-toml/v2 v2.2.4 // indirect + github.com/sagikazarmark/locafero v0.11.0 // indirect + github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect + github.com/spf13/afero v1.15.0 // indirect + github.com/spf13/cast v1.10.0 // indirect + github.com/spf13/pflag v1.0.10 // indirect + github.com/subosito/gotenv v1.6.0 // indirect + go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/sys v0.29.0 // indirect + golang.org/x/text v0.28.0 // indirect ) diff --git a/main.go b/main.go index 4031393..6d05236 100644 --- a/main.go +++ b/main.go @@ -6,32 +6,32 @@ import ( "encoding/json" "errors" "fmt" + "github.com/redis/go-redis/v9" "log" + "memobus_relay_server/config" + "memobus_relay_server/registry" + "memobus_relay_server/storage" "net" "net/http" "net/http/httputil" "os" + "os/signal" "strings" "sync" + "syscall" "time" "github.com/golang-jwt/jwt/v5" "github.com/hashicorp/yamux" ) -// ============================================================================== // 1. 密钥配置 -// ============================================================================== var ( - // 用于验证 App 请求的密钥,必须和 ibserver 的 AppAccessSecret 一致 - appAccessSecret = []byte(os.Getenv("APP_ACCESS_SECRET")) - // 用于验证设备连接的密钥,必须和旧中继服务的 RelaySecret 一致 - deviceRelaySecret = []byte(os.Getenv("RELAY_SECRET")) + appAccessSecret []byte + deviceRelaySecret []byte ) -// ============================================================================== // 2. 结构体定义 -// ============================================================================== type AuthPayload struct { DeviceSN string `json:"device_sn"` Token string `json:"token"` @@ -53,20 +53,62 @@ var ( sessionMutex = &sync.RWMutex{} ) -// ============================================================================== // 3. Main & 服务器启动逻辑 -// ============================================================================== func main() { - if len(appAccessSecret) == 0 || len(deviceRelaySecret) == 0 { - log.Println("WARNING: APP_ACCESS_SECRET or RELAY_SECRET environment variable not set.") + // 1. 加载配置 + config.LoadConfig() + appAccessSecret = []byte(config.Cfg.Auth.AppAccessSecret) + deviceRelaySecret = []byte(config.Cfg.Auth.DeviceRelaySecret) + + // 2. 初始化存储层 (Redis) + if err := storage.InitRedis(); err != nil { + log.Fatalf("Failed to initialize storage: %v", err) } - go listenForDevices(":7002") - log.Println("Starting App HTTP server on :8089") - http.HandleFunc("/tunnel/", handleAppRequest) // 统一入口 - if err := http.ListenAndServe(":8089", nil); err != nil { - log.Fatalf("Failed to start App server: %v", err) + // 3. 启动注册与心跳 (它会自己检查 Redis 是否启用) + registry.StartHeartbeat(func() int { + sessionMutex.RLock() + defer sessionMutex.RUnlock() + return len(deviceSessions) + }) + + // 4. 启动核心服务 (放入后台 goroutine) + go listenForDevices(config.Cfg.Server.DeviceListenPort) + + mux := http.NewServeMux() + mux.HandleFunc("/tunnel/", handleAppRequest) + httpServer := &http.Server{ + Addr: config.Cfg.Server.AppListenPort, + Handler: mux, } + go func() { + log.Printf("Starting App HTTP server on %s", config.Cfg.Server.AppListenPort) + if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("App server ListenAndServe error: %v", err) + } + }() + + // 5. 设置并等待优雅停机 + shutdownChan := make(chan os.Signal, 1) + signal.Notify(shutdownChan, syscall.SIGINT, syscall.SIGTERM) + sig := <-shutdownChan + log.Printf("Shutdown signal received (%s), starting graceful shutdown...", sig) + + // 6. 执行清理操作 + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // a. 向调度服务(Redis)注销自己 + registry.Unregister() + + // b. 优雅地关闭 HTTP 服务器 + if err := httpServer.Shutdown(shutdownCtx); err != nil { + log.Printf("HTTP server shutdown error: %v", err) + } else { + log.Println("HTTP server gracefully stopped.") + } + + log.Println("Graceful shutdown complete.") } func listenForDevices(addr string) { @@ -87,9 +129,7 @@ func listenForDevices(addr string) { } } -// ============================================================================== // 4. 设备端认证与会话管理 -// ============================================================================== func handleDeviceSession(conn net.Conn) { defer conn.Close() log.Printf("New device connected from %s, awaiting authentication...\n", conn.RemoteAddr()) @@ -117,11 +157,11 @@ func handleDeviceSession(conn net.Conn) { userID := claims.UserID log.Printf("Device '%s' (user: %s) authenticated successfully.\n", deviceSN, userID) - config := yamux.DefaultConfig() - config.EnableKeepAlive = true - config.KeepAliveInterval = 30 * time.Second + yamuxConfig := yamux.DefaultConfig() + yamuxConfig.EnableKeepAlive = true + yamuxConfig.KeepAliveInterval = 30 * time.Second - session, err := yamux.Server(conn, config) + session, err := yamux.Server(conn, yamuxConfig) if err != nil { log.Printf("Failed to start yamux session for device '%s': %v", deviceSN, err) return @@ -138,6 +178,15 @@ func handleDeviceSession(conn net.Conn) { sessionMutex.Unlock() log.Printf("Yamux session started for device '%s'\n", deviceSN) + if storage.RedisClient != nil { + instanceID := config.Cfg.Server.InstanceID + if err := storage.RedisClient.HSet(context.Background(), config.Cfg.Redis.DeviceRelayMappingKey, deviceSN, instanceID).Err(); err != nil { + log.Printf("ERROR: Failed to update device-relay mapping for %s: %v", deviceSN, err) + } else { + log.Printf("Device %s is now mapped to instance %s in Redis.", deviceSN, instanceID) + } + } + defer func() { sessionMutex.Lock() if currentInfo, exists := deviceSessions[deviceSN]; exists && currentInfo.Session == session { @@ -145,6 +194,20 @@ func handleDeviceSession(conn net.Conn) { } sessionMutex.Unlock() log.Printf("Device '%s' session closed\n", deviceSN) + + // b. 再清理 Redis 映射 + if storage.RedisClient != nil { + instanceID := config.Cfg.Server.InstanceID + // [健壮性优化] 在删除前,先检查一下 Redis 里的值是不是还是自己。 + // 这可以防止因为竞态条件,错误地删除了一个刚刚重连到本机的、更新的会话映射。 + currentInstanceID, err := storage.RedisClient.HGet(context.Background(), config.Cfg.Redis.DeviceRelayMappingKey, deviceSN).Result() + if err == nil && currentInstanceID == instanceID { + storage.RedisClient.HDel(context.Background(), config.Cfg.Redis.DeviceRelayMappingKey, deviceSN) + log.Printf("Removed device-relay mapping for %s.", deviceSN) + } else if err != nil && err != redis.Nil { + log.Printf("ERROR: Could not verify mapping for %s before deleting: %v", deviceSN, err) + } + } }() <-session.CloseChan() @@ -172,9 +235,7 @@ func validateDeviceToken(tokenString string) (*DeviceJWTClaims, error) { return claims, nil } -// ============================================================================== // 5. App 端认证与请求处理 -// ============================================================================== func handleAppRequest(w http.ResponseWriter, r *http.Request) { pathParts := strings.SplitN(strings.TrimPrefix(r.URL.Path, "/"), "/", 3) if len(pathParts) < 2 || pathParts[0] != "tunnel" { diff --git a/registry/registry.go b/registry/registry.go new file mode 100644 index 0000000..451f307 --- /dev/null +++ b/registry/registry.go @@ -0,0 +1,73 @@ +package registry + +import ( + "context" + "encoding/json" + "log" + "memobus_relay_server/config" // 替换为你的模块名 + "memobus_relay_server/storage" + "time" +) + +// InstanceInfo 定义了要注册到 Redis 的实例信息 +type InstanceInfo struct { + InstanceID string `json:"instanceId"` + PublicAppAddr string `json:"publicAppAddr"` + PublicDeviceAddr string `json:"publicDeviceAddr"` + // 还可以增加负载信息 + ConnectedDevices int `json:"connectedDevices"` + LastHeartbeat time.Time `json:"lastHeartbeat"` +} + +func StartHeartbeat(getDeviceCount func() int) { + if storage.RedisClient == nil { + log.Println("Registry heartbeat is disabled because Redis is not enabled.") + return + } + + interval := time.Duration(config.Cfg.Redis.HeartbeatIntervalSeconds) * time.Second + ticker := time.NewTicker(interval) + + updateHeartbeat := func() { + info := InstanceInfo{ + InstanceID: config.Cfg.Server.InstanceID, + PublicAppAddr: config.Cfg.Server.PublicAppAddr, + PublicDeviceAddr: config.Cfg.Server.PublicDeviceAddr, + ConnectedDevices: getDeviceCount(), + LastHeartbeat: time.Now(), + } + jsonData, _ := json.Marshal(info) + + key := config.Cfg.Redis.InstanceRegistryKey + instanceID := config.Cfg.Server.InstanceID + + if err := storage.RedisClient.HSet(context.Background(), key, instanceID, jsonData).Err(); err != nil { + log.Printf("ERROR: Failed to send heartbeat to Redis: %v", err) + } else { + log.Printf("Heartbeat sent. Connected devices: %d", info.ConnectedDevices) + } + } + + go func() { + updateHeartbeat() // 立即执行一次 + for range ticker.C { + updateHeartbeat() + } + }() +} + +func Unregister() { + if storage.RedisClient == nil { + return + } + log.Println("Unregistering instance from Redis...") + key := config.Cfg.Redis.InstanceRegistryKey + instanceID := config.Cfg.Server.InstanceID + if err := storage.RedisClient.HDel(context.Background(), key, instanceID).Err(); err != nil { + // 如果注销失败,打印一个错误日志 + log.Printf("ERROR: Failed to unregister instance '%s' from Redis: %v", instanceID, err) + } else { + // 只有在确认没有错误后,才打印成功日志 + log.Println("Successfully unregistered.") + } +} diff --git a/storage/redis.go b/storage/redis.go new file mode 100644 index 0000000..f9569cc --- /dev/null +++ b/storage/redis.go @@ -0,0 +1,40 @@ +// 文件: storage/redis.go +package storage + +import ( + "context" + "fmt" + "log" + "memobus_relay_server/config" + + "github.com/redis/go-redis/v9" +) + +// 全局 Redis 客户端实例 +var RedisClient *redis.Client + +// InitRedis 初始化全局的 Redis 客户端连接 +// 这个函数现在是唯一的 Redis 初始化入口 +func InitRedis() error { + // 如果配置中未启用 Redis,则不进行任何操作 + if !config.Cfg.Redis.Enabled { + log.Println("Redis is disabled in config. Skipping initialization.") + return nil + } + + // 创建一个新的 Redis 客户端实例 + RedisClient = redis.NewClient(&redis.Options{ + Addr: config.Cfg.Redis.Addr, + Password: config.Cfg.Redis.Password, + DB: config.Cfg.Redis.DB, + }) + + // 测试连接,确保 Redis 服务可用 + if err := RedisClient.Ping(context.Background()).Err(); err != nil { + // 将错误包装后返回,让 main 函数决定如何处理 + return fmt.Errorf("failed to connect to Redis: %w", err) + } + + log.Println("Successfully connected to Redis.") + return nil +}