You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
126 lines
3.9 KiB
126 lines
3.9 KiB
// 文件: storage/redis.go
|
|
package storage
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/redis/go-redis/v9"
|
|
"log"
|
|
"memobus_relay_server/config" // 替换为你的模块名
|
|
"time"
|
|
)
|
|
|
|
// RedisManager 结构体封装了所有与 Redis 相关的操作
|
|
type RedisManager struct {
|
|
Client *redis.Client
|
|
sessionTTL time.Duration
|
|
}
|
|
|
|
// GlobalRedis 是一个全局可访问的 RedisManager 实例
|
|
var GlobalRedis *RedisManager
|
|
|
|
// InitRedis 初始化 Redis 连接并创建全局的 RedisManager 实例
|
|
// 如果配置中 Redis 未启用,则返回 nil
|
|
func InitRedis() error {
|
|
if !config.Cfg.Redis.Enabled {
|
|
log.Println("Redis is disabled in config. Skipping initialization.")
|
|
return nil
|
|
|
|
}
|
|
|
|
client := redis.NewClient(&redis.Options{
|
|
Addr: config.Cfg.Redis.Addr,
|
|
Password: config.Cfg.Redis.Password,
|
|
DB: config.Cfg.Redis.DB,
|
|
})
|
|
|
|
if err := client.Ping(context.Background()).Err(); err != nil {
|
|
return fmt.Errorf("failed to connect to Redis: %w", err)
|
|
}
|
|
|
|
GlobalRedis = &RedisManager{
|
|
Client: client,
|
|
sessionTTL: time.Duration(config.Cfg.Redis.SessionTTLSeconds) * time.Second,
|
|
}
|
|
|
|
log.Println("Successfully connected to Redis.")
|
|
return nil
|
|
}
|
|
|
|
// getRedisKey 生成设备会话在 Redis 中的 key
|
|
func getRedisKey(deviceSN string) string {
|
|
return fmt.Sprintf("device_session:%s", deviceSN)
|
|
}
|
|
|
|
// RegisterDeviceSession 将设备标记为在线
|
|
// 在单机模式下,value 可以是一个简单的占位符,如 "online"
|
|
func (m *RedisManager) RegisterDeviceSession(deviceSN string, value string) error {
|
|
key := getRedisKey(deviceSN)
|
|
err := m.Client.Set(context.Background(), key, value, m.sessionTTL).Err()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to register device '%s' to Redis: %w", deviceSN, err)
|
|
}
|
|
log.Printf("Device '%s' registered in Redis.", deviceSN)
|
|
return nil
|
|
}
|
|
|
|
// DeregisterDeviceSession 从 Redis 中移除设备会话
|
|
func (m *RedisManager) DeregisterDeviceSession(deviceSN string) {
|
|
key := getRedisKey(deviceSN)
|
|
m.Client.Del(context.Background(), key)
|
|
log.Printf("Device '%s' deregistered from Redis.", deviceSN)
|
|
}
|
|
|
|
// IsDeviceOnline 检查设备是否在 Redis 中被标记为在线
|
|
func (m *RedisManager) IsDeviceOnline(deviceSN string) (bool, error) {
|
|
key := getRedisKey(deviceSN)
|
|
val, err := m.Client.Get(context.Background(), key).Result()
|
|
if err == redis.Nil {
|
|
return false, nil // Key 不存在,明确表示不在线
|
|
}
|
|
if err != nil {
|
|
return false, fmt.Errorf("redis error looking up device '%s': %w", deviceSN, err)
|
|
}
|
|
return val != "", nil // 只要 key 存在且值不为空,就认为在线
|
|
}
|
|
|
|
// [新增] GetDeviceOwner 函数,用来获取持有连接的实例 ID
|
|
func (m *RedisManager) GetDeviceOwner(deviceSN string) (string, error) {
|
|
key := getRedisKey(deviceSN)
|
|
instanceID, err := m.Client.Get(context.Background(), key).Result()
|
|
if err != nil {
|
|
// 让调用者处理 redis.Nil 错误,这表示设备未找到
|
|
return "", err
|
|
}
|
|
return instanceID, nil
|
|
}
|
|
|
|
// KeepAliveSession 启动一个 goroutine,为给定的设备会话在 Redis 中定期续期
|
|
func (m *RedisManager) KeepAliveSession(closeChan <-chan struct{}, deviceSN string) {
|
|
// 以 TTL 的一半作为续期间隔
|
|
ticker := time.NewTicker(m.sessionTTL / 2)
|
|
defer ticker.Stop()
|
|
|
|
key := getRedisKey(deviceSN)
|
|
log.Printf("Starting Redis keep-alive for device '%s'.", deviceSN)
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
// 为 key 续期
|
|
err := m.Client.Expire(context.Background(), key, m.sessionTTL).Err()
|
|
if err != nil {
|
|
// 如果 key 不存在了(可能被手动删除或过期),就没必要再续了
|
|
if err == redis.Nil {
|
|
log.Printf("Redis key for %s no longer exists, stopping keep-alive.", deviceSN)
|
|
return
|
|
}
|
|
log.Printf("ERROR: Failed to refresh session TTL for %s in Redis: %v", deviceSN, err)
|
|
}
|
|
case <-closeChan:
|
|
// session 关闭了,退出 goroutine
|
|
log.Printf("Stopping Redis keep-alive for device '%s' due to session close.", deviceSN)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|