新中转服务
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

127 lines
3.9 KiB

// 文件: storage/redis.go
package storage
import (
	"context"
	"errors"
	"fmt"
	"log"
	"time"

	"github.com/redis/go-redis/v9"

	"memobus_relay_server/config" // replace with your module name
)
// RedisManager bundles the Redis client together with the session settings
// used by the device-session helpers defined on it.
type RedisManager struct {
// Client is the go-redis client every method issues its commands through.
Client *redis.Client
// sessionTTL is the expiration applied to a device session key when it is
// written and each time it is refreshed.
sessionTTL time.Duration
}
// GlobalRedis is the package-level RedisManager instance. InitRedis assigns it
// after a successful connection; InitRedis does not assign it when Redis is
// disabled in the configuration or when the connection check fails.
var GlobalRedis *RedisManager
// InitRedis connects to Redis using the settings in config.Cfg.Redis and, on
// success, stores the resulting manager in GlobalRedis.
//
// When Redis is disabled in the configuration it logs a notice and returns nil
// without assigning GlobalRedis. When the initial PING fails, the freshly
// created client is closed before the wrapped error is returned, so a failed
// startup does not leave its connection pool behind.
func InitRedis() error {
	if !config.Cfg.Redis.Enabled {
		log.Println("Redis is disabled in config. Skipping initialization.")
		return nil
	}

	client := redis.NewClient(&redis.Options{
		Addr:     config.Cfg.Redis.Addr,
		Password: config.Cfg.Redis.Password,
		DB:       config.Cfg.Redis.DB,
	})

	if err := client.Ping(context.Background()).Err(); err != nil {
		// Best-effort cleanup: the ping failure is the error worth reporting,
		// so a secondary Close error is intentionally discarded.
		_ = client.Close()
		return fmt.Errorf("failed to connect to Redis: %w", err)
	}

	GlobalRedis = &RedisManager{
		Client:     client,
		sessionTTL: time.Duration(config.Cfg.Redis.SessionTTLSeconds) * time.Second,
	}
	log.Println("Successfully connected to Redis.")
	return nil
}
// getRedisKey builds the Redis key under which a device's session is stored:
// the fixed "device_session:" prefix followed by the device serial number.
func getRedisKey(deviceSN string) string {
	const keyPrefix = "device_session:"
	return fmt.Sprintf("%s%s", keyPrefix, deviceSN)
}
// RegisterDeviceSession marks a device as online by writing value under the
// device's session key with the manager's session TTL.
//
// In single-instance mode value can be a simple placeholder such as "online".
// It returns a wrapped error when the Redis write fails.
func (m *RedisManager) RegisterDeviceSession(deviceSN string, value string) error {
	ctx := context.Background()
	if setErr := m.Client.Set(ctx, getRedisKey(deviceSN), value, m.sessionTTL).Err(); setErr != nil {
		return fmt.Errorf("failed to register device '%s' to Redis: %w", deviceSN, setErr)
	}

	log.Printf("Device '%s' registered in Redis.", deviceSN)
	return nil
}
// DeregisterDeviceSession removes the device's session key from Redis.
//
// The method has no return value, so a failed delete is reported through the
// log rather than silently ignored; the success message is only logged when
// the DEL command completed without error.
func (m *RedisManager) DeregisterDeviceSession(deviceSN string) {
	key := getRedisKey(deviceSN)
	if err := m.Client.Del(context.Background(), key).Err(); err != nil {
		log.Printf("ERROR: Failed to deregister device '%s' from Redis: %v", deviceSN, err)
		return
	}
	log.Printf("Device '%s' deregistered from Redis.", deviceSN)
}
// IsDeviceOnline reports whether the device has a non-empty session value in
// Redis.
//
// A missing key is not an error: it yields (false, nil). Any other Redis
// failure yields false together with a wrapped error.
func (m *RedisManager) IsDeviceOnline(deviceSN string) (bool, error) {
	val, err := m.Client.Get(context.Background(), getRedisKey(deviceSN)).Result()
	switch {
	case errors.Is(err, redis.Nil):
		// errors.Is also matches a wrapped redis.Nil, which a plain ==
		// comparison would misreport as a lookup failure.
		return false, nil
	case err != nil:
		return false, fmt.Errorf("redis error looking up device '%s': %w", deviceSN, err)
	}
	// The key exists; the device counts as online only if the value is non-empty.
	return val != "", nil
}
// GetDeviceOwner returns the value stored under the device's session key,
// which callers treat as the ID of the instance holding the connection.
//
// Errors are passed through unwrapped, including redis.Nil when the key does
// not exist, so the caller can tell "device not found" from other failures.
func (m *RedisManager) GetDeviceOwner(deviceSN string) (string, error) {
	lookup := m.Client.Get(context.Background(), getRedisKey(deviceSN))

	owner, lookupErr := lookup.Result()
	if lookupErr == nil {
		return owner, nil
	}
	return "", lookupErr
}
// KeepAliveSession periodically extends the TTL of a device's session key in
// Redis, refreshing it at half the session TTL.
//
// The call blocks until it stops, so a caller that wants the refresh to run in
// the background must start it with `go`. It stops when closeChan is closed or
// receives a value, or when Redis reports that the key no longer exists.
// Other refresh errors are logged and retried on the next tick.
func (m *RedisManager) KeepAliveSession(closeChan <-chan struct{}, deviceSN string) {
	// Refresh at half the TTL so one delayed tick does not let the key expire.
	interval := m.sessionTTL / 2
	if interval <= 0 {
		// time.NewTicker panics on a non-positive interval; with such a TTL
		// there is nothing meaningful to refresh, so bail out instead.
		log.Printf("ERROR: Invalid session TTL %v, Redis keep-alive for device '%s' not started.", m.sessionTTL, deviceSN)
		return
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	key := getRedisKey(deviceSN)
	log.Printf("Starting Redis keep-alive for device '%s'.", deviceSN)

	for {
		select {
		case <-ticker.C:
			// EXPIRE signals a missing key through its boolean result, not
			// through redis.Nil, so the result must be inspected explicitly.
			refreshed, err := m.Client.Expire(context.Background(), key, m.sessionTTL).Result()
			if err != nil {
				log.Printf("ERROR: Failed to refresh session TTL for %s in Redis: %v", deviceSN, err)
				continue
			}
			if !refreshed {
				// The key was deleted or has already expired; nothing left to renew.
				log.Printf("Redis key for %s no longer exists, stopping keep-alive.", deviceSN)
				return
			}
		case <-closeChan:
			// The session was closed; stop refreshing.
			log.Printf("Stopping Redis keep-alive for device '%s' due to session close.", deviceSN)
			return
		}
	}
}