Compare commits
	
		
			2 Commits 
		
	
	
		
			main
			...
			grpc-forwa
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						1fe40f40cd | 2 weeks ago | 
| 
							
							
								 | 
						cd38aa41ac | 2 weeks ago | 
				 11 changed files with 2131 additions and 145 deletions
			
			
		@ -0,0 +1,31 @@ | 
				
			|||
# config.yml | 
				
			|||
 | 
				
			|||
# 服务器相关配置 | 
				
			|||
server: | 
				
			|||
  app_listen_port: ":8089" | 
				
			|||
  device_listen_port: ":7002" | 
				
			|||
  instance_id: "" # 留空会自动生成 UUID, 也可以指定一个固定的ID | 
				
			|||
  # [新增] 用于服务器间通信的 gRPC 配置 | 
				
			|||
  grpc_listen_addr: ":9090" | 
				
			|||
  # 这个地址必须能被其他服务器实例访问到。 | 
				
			|||
  # 在 Docker/K8s 环境中, 这应该是服务名或 Pod IP。 | 
				
			|||
  grpc_advertise_addr: "192.168.5.193:9090" | 
				
			|||
 | 
				
			|||
# 认证密钥配置 | 
				
			|||
auth: | 
				
			|||
  app_access_secret: "D4tBb9Y0oHSXRAyHLHpdKfXAuNCyCZ45AZxKJOhMJMs=" | 
				
			|||
  device_relay_secret: "p+JtJ8aHlM1lDYu7UGFanX8ALVt1pM1BQmKTpqTJccs=" | 
				
			|||
 | 
				
			|||
# Redis 配置 (为下一步做准备) | 
				
			|||
# 如果 enabled 为 false,我们的代码将退回使用内存 map,实现单机兼容 | 
				
			|||
redis: | 
				
			|||
  enabled: true | 
				
			|||
  addr: "118.178.183.78:6379" | 
				
			|||
  password: "" # 留空表示没有密码 | 
				
			|||
  db: 1 | 
				
			|||
  session_ttl_seconds: 120 # 会话在 Redis 中的过期时间、 | 
				
			|||
  # [新增] 用于服务发现的 Key | 
				
			|||
  # 一个 Redis Hash, 存储 instance_id -> grpc_addr 的映射 | 
				
			|||
  instance_registry_key: "relay_instances" | 
				
			|||
  # 实例必须比这个 TTL 更快地发送心跳 | 
				
			|||
  instance_ttl_seconds: 15 | 
				
			|||
@ -0,0 +1,79 @@ | 
				
			|||
package config | 
				
			|||
 | 
				
			|||
import ( | 
				
			|||
	"github.com/google/uuid" | 
				
			|||
	"github.com/spf13/viper" | 
				
			|||
	"log" | 
				
			|||
	"strings" | 
				
			|||
) | 
				
			|||
 | 
				
			|||
// Config 结构体必须与 config.yml 的结构完全对应
 | 
				
			|||
// 使用 `mapstructure` tag 来帮助 Viper 正确映射 YAML 键名到 Go 结构体字段
 | 
				
			|||
type Config struct { | 
				
			|||
	Server ServerConfig `mapstructure:"server"` | 
				
			|||
	Auth   AuthConfig   `mapstructure:"auth"` | 
				
			|||
	Redis  RedisConfig  `mapstructure:"redis"` | 
				
			|||
} | 
				
			|||
 | 
				
			|||
type ServerConfig struct { | 
				
			|||
	AppListenPort    string `mapstructure:"app_listen_port"` | 
				
			|||
	DeviceListenPort string `mapstructure:"device_listen_port"` | 
				
			|||
 | 
				
			|||
	// [新增]
 | 
				
			|||
	InstanceID        string `mapstructure:"instance_id"` | 
				
			|||
	GrpcListenAddr    string `mapstructure:"grpc_listen_addr"` | 
				
			|||
	GrpcAdvertiseAddr string `mapstructure:"grpc_advertise_addr"` | 
				
			|||
} | 
				
			|||
 | 
				
			|||
type AuthConfig struct { | 
				
			|||
	AppAccessSecret   string `mapstructure:"app_access_secret"` | 
				
			|||
	DeviceRelaySecret string `mapstructure:"device_relay_secret"` | 
				
			|||
} | 
				
			|||
 | 
				
			|||
type RedisConfig struct { | 
				
			|||
	Enabled           bool   `mapstructure:"enabled"` | 
				
			|||
	Addr              string `mapstructure:"addr"` | 
				
			|||
	Password          string `mapstructure:"password"` | 
				
			|||
	DB                int    `mapstructure:"db"` | 
				
			|||
	SessionTTLSeconds int    `mapstructure:"session_ttl_seconds"` // 确保有这个字段
 | 
				
			|||
 | 
				
			|||
	// [新增]
 | 
				
			|||
	InstanceRegistryKey string `mapstructure:"instance_registry_key"` | 
				
			|||
	InstanceTTLSeconds  int    `mapstructure:"instance_ttl_seconds"` | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// Cfg 是一个全局变量,用于在项目的任何地方访问配置
 | 
				
			|||
var Cfg *Config | 
				
			|||
 | 
				
			|||
// LoadConfig 是初始化函数,负责读取和解析配置文件
 | 
				
			|||
func LoadConfig() { | 
				
			|||
	viper.SetConfigName("config")   // 配置文件名 (不带扩展名)
 | 
				
			|||
	viper.SetConfigType("yml")      // 配置文件类型
 | 
				
			|||
	viper.AddConfigPath(".")        // 在当前工作目录查找配置文件
 | 
				
			|||
	viper.AddConfigPath("./config") // 也在 config 目录查找
 | 
				
			|||
 | 
				
			|||
	// [关键] 开启环境变量支持
 | 
				
			|||
	// 这允许你通过环境变量覆盖配置文件中的值
 | 
				
			|||
	// 例如:SERVER_APP_LISTEN_ADDR=":9000" 会覆盖文件中的设置
 | 
				
			|||
	viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) | 
				
			|||
	viper.AutomaticEnv() | 
				
			|||
 | 
				
			|||
	// 读取配置文件
 | 
				
			|||
	if err := viper.ReadInConfig(); err != nil { | 
				
			|||
		// 如果配置文件没找到,也没关系,可能完全通过环境变量配置
 | 
				
			|||
		if _, ok := err.(viper.ConfigFileNotFoundError); !ok { | 
				
			|||
			log.Fatalf("Fatal error reading config file: %v", err) | 
				
			|||
		} | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	// 将读取到的配置反序列化到 Cfg 结构体中
 | 
				
			|||
	if err := viper.Unmarshal(&Cfg); err != nil { | 
				
			|||
		log.Fatalf("Unable to decode config into struct: %v", err) | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	// [新增] 如果 instance_id 未配置,则自动生成
 | 
				
			|||
	if Cfg.Server.InstanceID == "" { | 
				
			|||
		Cfg.Server.InstanceID = uuid.New().String() | 
				
			|||
	} | 
				
			|||
	log.Printf("Configuration loaded. Server Instance ID: %s", Cfg.Server.InstanceID) | 
				
			|||
} | 
				
			|||
@ -1,8 +1,34 @@ | 
				
			|||
module memobus_relay_server | 
				
			|||
 | 
				
			|||
go 1.24 | 
				
			|||
go 1.24.0 | 
				
			|||
 | 
				
			|||
toolchain go1.24.2 | 
				
			|||
 | 
				
			|||
require ( | 
				
			|||
	github.com/golang-jwt/jwt/v5 v5.3.0 | 
				
			|||
	github.com/google/uuid v1.6.0 | 
				
			|||
	github.com/hashicorp/yamux v0.1.2 | 
				
			|||
	github.com/redis/go-redis/v9 v9.14.1 | 
				
			|||
	github.com/spf13/viper v1.21.0 | 
				
			|||
	google.golang.org/grpc v1.76.0 | 
				
			|||
) | 
				
			|||
 | 
				
			|||
require ( | 
				
			|||
	github.com/cespare/xxhash/v2 v2.3.0 // indirect | 
				
			|||
	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect | 
				
			|||
	github.com/fsnotify/fsnotify v1.9.0 // indirect | 
				
			|||
	github.com/go-viper/mapstructure/v2 v2.4.0 // indirect | 
				
			|||
	github.com/pelletier/go-toml/v2 v2.2.4 // indirect | 
				
			|||
	github.com/sagikazarmark/locafero v0.11.0 // indirect | 
				
			|||
	github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect | 
				
			|||
	github.com/spf13/afero v1.15.0 // indirect | 
				
			|||
	github.com/spf13/cast v1.10.0 // indirect | 
				
			|||
	github.com/spf13/pflag v1.0.10 // indirect | 
				
			|||
	github.com/subosito/gotenv v1.6.0 // indirect | 
				
			|||
	go.yaml.in/yaml/v3 v3.0.4 // indirect | 
				
			|||
	golang.org/x/net v0.42.0 // indirect | 
				
			|||
	golang.org/x/sys v0.34.0 // indirect | 
				
			|||
	golang.org/x/text v0.28.0 // indirect | 
				
			|||
	google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b // indirect | 
				
			|||
	google.golang.org/protobuf v1.36.6 // indirect | 
				
			|||
) | 
				
			|||
 | 
				
			|||
@ -0,0 +1,360 @@ | 
				
			|||
// 文件: grpc/server.go
 | 
				
			|||
package grpc | 
				
			|||
 | 
				
			|||
import ( | 
				
			|||
	"bufio" | 
				
			|||
	"io" | 
				
			|||
	"log" | 
				
			|||
	relaypb "memobus_relay_server/relay_server/proto" | 
				
			|||
	"memobus_relay_server/session" | 
				
			|||
	"net/http" | 
				
			|||
	"net/http/httptest" | 
				
			|||
	"strings" | 
				
			|||
 | 
				
			|||
	"google.golang.org/grpc/codes" | 
				
			|||
	"google.golang.org/grpc/status" | 
				
			|||
) | 
				
			|||
 | 
				
			|||
// InternalRelayServer 实现了用于代理请求的 gRPC 服务
 | 
				
			|||
type InternalRelayServer struct { | 
				
			|||
	relaypb.UnimplementedInternalRelayServer | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// NewInternalRelayServer 创建一个新的 gRPC 服务实例
 | 
				
			|||
func NewInternalRelayServer() *InternalRelayServer { | 
				
			|||
	return &InternalRelayServer{} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// ProxyRequest 是核心的 gRPC 流处理器,实现了完整的请求代理和流式响应
 | 
				
			|||
func (s *InternalRelayServer) ProxyRequest(stream relaypb.InternalRelay_ProxyRequestServer) error { | 
				
			|||
	// --- 1. 接收和解析请求头 ---
 | 
				
			|||
	headerMsg, err := stream.Recv() | 
				
			|||
	if err != nil { | 
				
			|||
		log.Printf("ERROR (gRPC): Failed to receive initial header: %v", err) | 
				
			|||
		return status.Errorf(codes.InvalidArgument, "failed to receive header: %v", err) | 
				
			|||
	} | 
				
			|||
	header := headerMsg.GetHeader() | 
				
			|||
	if header == nil { | 
				
			|||
		return status.Errorf(codes.InvalidArgument, "first message must be a header") | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	// [新增调试日志]
 | 
				
			|||
	if header.Headers["Upgrade"] == "websocket" { | 
				
			|||
		log.Printf("DEBUG (WebSocket): gRPC server received WebSocket upgrade request. Headers: Connection='%s', Upgrade='%s'", header.Headers["Connection"], header.Headers["Upgrade"]) | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	// 检查是否是 WebSocket 握手请求
 | 
				
			|||
	isWebSocket := header.Headers["Upgrade"] == "websocket" && strings.Contains(strings.ToLower(header.Headers["Connection"]), "upgrade") | 
				
			|||
 | 
				
			|||
	pathParts := strings.SplitN(strings.TrimPrefix(header.Url, "/"), "/", 3) | 
				
			|||
	if len(pathParts) < 2 { | 
				
			|||
		return status.Errorf(codes.InvalidArgument, "invalid URL format in gRPC header") | 
				
			|||
	} | 
				
			|||
	deviceSN := pathParts[1] | 
				
			|||
	appUserID := header.GetAppUserId() | 
				
			|||
 | 
				
			|||
	if appUserID == "" { | 
				
			|||
		return status.Errorf(codes.InvalidArgument, "app_user_id is missing in gRPC header") | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	log.Printf("gRPC Proxy: Handling request for device '%s' from user '%s'", deviceSN, appUserID) | 
				
			|||
 | 
				
			|||
	// --- 2. 查找本地会话并进行授权检查 ---
 | 
				
			|||
	sessionInfo, ok := session.GlobalManager.GetLocalSession(deviceSN) | 
				
			|||
	if !ok { | 
				
			|||
		return status.Errorf(codes.NotFound, "device '%s' not connected to this instance", deviceSN) | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	if sessionInfo.UserID != appUserID { | 
				
			|||
		log.Printf("Forbidden (gRPC): User '%s' attempted to access device '%s' owned by '%s'", appUserID, deviceSN, sessionInfo.UserID) | 
				
			|||
		return sendForbiddenResponse(stream) | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	log.Printf("gRPC Proxy: Handling request for device '%s' from user '%s'", deviceSN, appUserID) | 
				
			|||
 | 
				
			|||
	// --- 3. [核心修改] 根据请求类型进行分流 ---
 | 
				
			|||
	if isWebSocket { | 
				
			|||
		log.Println("gRPC Proxy: Detected WebSocket request, diverting to transparent proxy handler.") | 
				
			|||
		return s.handleWebSocketProxy(stream, sessionInfo, deviceSN, header) | 
				
			|||
	} else { | 
				
			|||
		// 如果是普通 HTTP, 调用原来的 ReverseProxy 处理器
 | 
				
			|||
		log.Println("gRPC Proxy: Detected HTTP request, using ReverseProxy handler.") | 
				
			|||
		// 注意:我把原来的 ProxyRequest 逻辑提取到了一个新函数中,以保持整洁
 | 
				
			|||
		return s.handleHTTPProxy(stream, sessionInfo, deviceSN, header) | 
				
			|||
	} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (s *InternalRelayServer) handleWebSocketProxy(stream relaypb.InternalRelay_ProxyRequestServer, sessionInfo *session.SessionInfo, deviceSN string, header *relaypb.ProxyRequestHeader) error { | 
				
			|||
	// 1. 打开到后端 (yamux) 的连接
 | 
				
			|||
	backendConn, err := sessionInfo.Session.Open() | 
				
			|||
	if err != nil { | 
				
			|||
		log.Printf("ERROR (WebSocket Proxy): Failed to dial backend: %v", err) | 
				
			|||
		return status.Errorf(codes.Internal, "failed to connect to backend service") | 
				
			|||
	} | 
				
			|||
	defer backendConn.Close() | 
				
			|||
 | 
				
			|||
	// 2. 重建原始的 HTTP 升级请求
 | 
				
			|||
	req := httptest.NewRequest(header.Method, "http://internal-proxy"+header.Url, nil) | 
				
			|||
	for k, v := range header.Headers { | 
				
			|||
		req.Header.Set(k, v) | 
				
			|||
	} | 
				
			|||
	req.Host = "immich-internal" // 模拟 ReverseProxy 的行为
 | 
				
			|||
	pathParts := strings.SplitN(strings.TrimPrefix(req.URL.Path, "/"), "/", 3) | 
				
			|||
	if len(pathParts) > 2 { | 
				
			|||
		req.URL.Path = "/" + pathParts[2] | 
				
			|||
	} else { | 
				
			|||
		req.URL.Path = "/" | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	// 3. 将升级请求写入后端连接,发起握手
 | 
				
			|||
	if err := req.Write(backendConn); err != nil { | 
				
			|||
		log.Printf("ERROR (WebSocket Proxy): Failed to write upgrade request to backend: %v", err) | 
				
			|||
		return status.Errorf(codes.Internal, "failed to send upgrade request to backend") | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	// 4. 读取后端的响应 (握手结果)
 | 
				
			|||
	backendReader := bufio.NewReader(backendConn) | 
				
			|||
	resp, err := http.ReadResponse(backendReader, req) | 
				
			|||
	if err != nil { | 
				
			|||
		log.Printf("ERROR (WebSocket Proxy): Failed to read handshake response from backend: %v", err) | 
				
			|||
		return status.Errorf(codes.Internal, "failed to read handshake response from backend") | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	// 5. 将后端的握手响应通过 gRPC 发回给代理节点
 | 
				
			|||
	respHeaderMsg := &relaypb.ProxyResponseMessage{ | 
				
			|||
		Payload: &relaypb.ProxyResponseMessage_Header{ | 
				
			|||
			Header: &relaypb.ProxyResponseHeader{ | 
				
			|||
				StatusCode: int32(resp.StatusCode), | 
				
			|||
				Headers:    make(map[string]string), | 
				
			|||
			}, | 
				
			|||
		}, | 
				
			|||
	} | 
				
			|||
	for k, v := range resp.Header { | 
				
			|||
		respHeaderMsg.GetHeader().Headers[k] = strings.Join(v, ",") | 
				
			|||
	} | 
				
			|||
	if err := stream.Send(respHeaderMsg); err != nil { | 
				
			|||
		log.Printf("ERROR (WebSocket Proxy): Failed to send handshake response via gRPC: %v", err) | 
				
			|||
		return err | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	// 6. 如果握手失败 (不是 101),则流程结束
 | 
				
			|||
	if resp.StatusCode != http.StatusSwitchingProtocols { | 
				
			|||
		log.Printf("WARN (WebSocket Proxy): Backend returned non-101 status for upgrade: %d", resp.StatusCode) | 
				
			|||
		return nil | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	log.Printf("WebSocket handshake for device %s successful. Starting bi-directional stream copy.", deviceSN) | 
				
			|||
 | 
				
			|||
	// 7. 握手成功!现在在 gRPC 流和 yamux 流之间建立双向数据拷贝
 | 
				
			|||
	errChan := make(chan error, 2) | 
				
			|||
 | 
				
			|||
	// Goroutine 1: gRPC 请求流 (来自 App) -> yamux 流 (下行数据)
 | 
				
			|||
	go func() { | 
				
			|||
		// 这个方向的逻辑没有问题
 | 
				
			|||
		for { | 
				
			|||
			msg, err := stream.Recv() | 
				
			|||
			if err == io.EOF { | 
				
			|||
				backendConn.Close() | 
				
			|||
				errChan <- nil | 
				
			|||
				return | 
				
			|||
			} | 
				
			|||
			if err != nil { | 
				
			|||
				errChan <- err | 
				
			|||
				return | 
				
			|||
			} | 
				
			|||
			if chunk := msg.GetBodyChunk(); chunk != nil { | 
				
			|||
				if _, err := backendConn.Write(chunk.Data); err != nil { | 
				
			|||
					errChan <- err | 
				
			|||
					return | 
				
			|||
				} | 
				
			|||
			} | 
				
			|||
		} | 
				
			|||
	}() | 
				
			|||
 | 
				
			|||
	// Goroutine 2: yamux 流 (来自设备) -> gRPC 响应流 (上行数据)
 | 
				
			|||
	go func() { | 
				
			|||
		// [核心修正]
 | 
				
			|||
		// 我们必须从 backendReader (而不是原始的 backendConn) 开始读取,
 | 
				
			|||
		// 以确保 http.ReadResponse 预读到缓冲区的数据不会丢失。
 | 
				
			|||
		// io.Copy 会首先清空 backendReader 的内部缓冲区,然后再继续从底层的 backendConn 读取。
 | 
				
			|||
		if _, err := io.Copy(&grpcResponseWriter{stream: stream}, backendReader); err != nil { | 
				
			|||
			// 过滤掉正常的连接关闭错误
 | 
				
			|||
			if err != io.EOF && err != io.ErrClosedPipe && !strings.Contains(err.Error(), "use of closed") { | 
				
			|||
				errChan <- err | 
				
			|||
			} else { | 
				
			|||
				errChan <- nil | 
				
			|||
			} | 
				
			|||
		} else { | 
				
			|||
			errChan <- nil | 
				
			|||
		} | 
				
			|||
	}() | 
				
			|||
 | 
				
			|||
	// 等待两个 goroutine 都结束
 | 
				
			|||
	err1 := <-errChan | 
				
			|||
	err2 := <-errChan | 
				
			|||
 | 
				
			|||
	if err1 != nil && err1 != io.EOF { | 
				
			|||
		log.Printf("WebSocket stream finished with error: %v", err1) | 
				
			|||
		return err1 | 
				
			|||
	} | 
				
			|||
	if err2 != nil && err2 != io.EOF { | 
				
			|||
		log.Printf("WebSocket stream finished with error: %v", err2) | 
				
			|||
		return err2 | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	log.Printf("WebSocket stream for device %s finished gracefully.", deviceSN) | 
				
			|||
	return nil | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// [新增] handleHTTPProxy 包含了原来 ProxyRequest 的所有逻辑
 | 
				
			|||
func (s *InternalRelayServer) handleHTTPProxy(stream relaypb.InternalRelay_ProxyRequestServer, sessionInfo *session.SessionInfo, deviceSN string, header *relaypb.ProxyRequestHeader) error { | 
				
			|||
	// 这部分代码就是你之前工作正常的、使用 io.Pipe 和 ReverseProxy 的完整流式版本
 | 
				
			|||
	// 我直接粘贴过来
 | 
				
			|||
 | 
				
			|||
	// --- 3. 创建请求和响应的管道 ---
 | 
				
			|||
	reqPr, reqPw := io.Pipe() | 
				
			|||
	req := httptest.NewRequest(header.Method, "http://internal-proxy"+header.Url, reqPr) | 
				
			|||
	for k, v := range header.Headers { | 
				
			|||
		req.Header.Set(k, v) | 
				
			|||
	} | 
				
			|||
	req.Header.Set("X-Forwarded-For", header.RemoteAddr) | 
				
			|||
 | 
				
			|||
	respPr, respPw := io.Pipe() | 
				
			|||
	customResponseWriter := &streamResponseWriter{ | 
				
			|||
		header:        make(http.Header), | 
				
			|||
		pipeWriter:    respPw, | 
				
			|||
		headerWritten: make(chan struct{}), | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	// --- 4. 启动 Goroutines ---
 | 
				
			|||
	go func() { | 
				
			|||
		defer reqPw.Close() | 
				
			|||
		for { | 
				
			|||
			bodyMsg, err := stream.Recv() | 
				
			|||
			if err == io.EOF { | 
				
			|||
				return | 
				
			|||
			} | 
				
			|||
			if err != nil { | 
				
			|||
				reqPw.CloseWithError(err) | 
				
			|||
				return | 
				
			|||
			} | 
				
			|||
			if bodyChunk := bodyMsg.GetBodyChunk(); bodyChunk != nil { | 
				
			|||
				if _, err := reqPw.Write(bodyChunk.Data); err != nil { | 
				
			|||
					return | 
				
			|||
				} | 
				
			|||
			} | 
				
			|||
		} | 
				
			|||
	}() | 
				
			|||
 | 
				
			|||
	errChan := make(chan error, 1) | 
				
			|||
	go func() { | 
				
			|||
		defer close(errChan) | 
				
			|||
		<-customResponseWriter.headerWritten | 
				
			|||
		// b. [修正] 完整地构造 gRPC 响应头
 | 
				
			|||
		respHeaderMsg := &relaypb.ProxyResponseMessage{ | 
				
			|||
			Payload: &relaypb.ProxyResponseMessage_Header{ | 
				
			|||
				Header: &relaypb.ProxyResponseHeader{ | 
				
			|||
					StatusCode: int32(customResponseWriter.statusCode), | 
				
			|||
					Headers:    make(map[string]string), | 
				
			|||
				}, | 
				
			|||
			}, | 
				
			|||
		} | 
				
			|||
		for k, v := range customResponseWriter.header { | 
				
			|||
			respHeaderMsg.GetHeader().Headers[k] = strings.Join(v, ",") | 
				
			|||
		} | 
				
			|||
		if err := stream.Send(respHeaderMsg); err != nil { | 
				
			|||
			errChan <- err | 
				
			|||
			return | 
				
			|||
		} | 
				
			|||
 | 
				
			|||
		buf := make([]byte, 1024*32) | 
				
			|||
		if _, err := io.CopyBuffer(&grpcResponseWriter{stream: stream}, respPr, buf); err != nil { | 
				
			|||
			if err != io.ErrClosedPipe { | 
				
			|||
				errChan <- err | 
				
			|||
			} | 
				
			|||
		} | 
				
			|||
	}() | 
				
			|||
 | 
				
			|||
	// --- 5. 执行代理 ---
 | 
				
			|||
	proxy := session.CreateReverseProxy(sessionInfo, deviceSN, req.URL.Path, req.URL.RawQuery) | 
				
			|||
	proxy.ServeHTTP(customResponseWriter, req) | 
				
			|||
 | 
				
			|||
	// --- 6. 清理 ---
 | 
				
			|||
	respPw.Close() | 
				
			|||
	return <-errChan | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// sendForbiddenResponse 是一个辅助函数,用于发送模拟的 403 响应
 | 
				
			|||
func sendForbiddenResponse(stream relaypb.InternalRelay_ProxyRequestServer) error { | 
				
			|||
	respHeader := &relaypb.ProxyResponseMessage{ | 
				
			|||
		Payload: &relaypb.ProxyResponseMessage_Header{ | 
				
			|||
			Header: &relaypb.ProxyResponseHeader{ | 
				
			|||
				StatusCode: http.StatusForbidden, | 
				
			|||
				Headers:    map[string]string{"Content-Type": "text/plain; charset=utf-8"}, | 
				
			|||
			}, | 
				
			|||
		}, | 
				
			|||
	} | 
				
			|||
	if err := stream.Send(respHeader); err != nil { | 
				
			|||
		return err | 
				
			|||
	} | 
				
			|||
	respBody := &relaypb.ProxyResponseMessage{ | 
				
			|||
		Payload: &relaypb.ProxyResponseMessage_BodyChunk{ | 
				
			|||
			BodyChunk: &relaypb.ProxyResponseBodyChunk{Data: []byte("Forbidden")}, | 
				
			|||
		}, | 
				
			|||
	} | 
				
			|||
	stream.Send(respBody) | 
				
			|||
	return nil // 正常关闭流
 | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// streamResponseWriter 是一个自定义的 http.ResponseWriter
 | 
				
			|||
type streamResponseWriter struct { | 
				
			|||
	header        http.Header | 
				
			|||
	pipeWriter    *io.PipeWriter | 
				
			|||
	statusCode    int | 
				
			|||
	headerWritten chan struct{} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (w *streamResponseWriter) Header() http.Header { | 
				
			|||
	return w.header | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (w *streamResponseWriter) Write(b []byte) (int, error) { | 
				
			|||
	w.WriteHeader(http.StatusOK) | 
				
			|||
	return w.pipeWriter.Write(b) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (w *streamResponseWriter) WriteHeader(statusCode int) { | 
				
			|||
	select { | 
				
			|||
	case <-w.headerWritten: | 
				
			|||
		return | 
				
			|||
	default: | 
				
			|||
		w.statusCode = statusCode | 
				
			|||
		close(w.headerWritten) | 
				
			|||
	} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// grpcResponseWriter 是一个适配器,实现了 io.Writer 接口
 | 
				
			|||
type grpcResponseWriter struct { | 
				
			|||
	stream relaypb.InternalRelay_ProxyRequestServer | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (w *grpcResponseWriter) Write(p []byte) (n int, err error) { | 
				
			|||
	err = w.stream.Send(&relaypb.ProxyResponseMessage{ | 
				
			|||
		Payload: &relaypb.ProxyResponseMessage_BodyChunk{ | 
				
			|||
			BodyChunk: &relaypb.ProxyResponseBodyChunk{Data: p}, | 
				
			|||
		}, | 
				
			|||
	}) | 
				
			|||
	if err != nil { | 
				
			|||
		return 0, err | 
				
			|||
	} | 
				
			|||
	return len(p), nil | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// writeChunk 辅助函数 - 确保这个函数也存在于你的 grpc/server.go 文件中
 | 
				
			|||
func writeChunk(stream relaypb.InternalRelay_ProxyRequestServer, data []byte) error { | 
				
			|||
	return stream.Send(&relaypb.ProxyResponseMessage{ | 
				
			|||
		Payload: &relaypb.ProxyResponseMessage_BodyChunk{ | 
				
			|||
			BodyChunk: &relaypb.ProxyResponseBodyChunk{Data: data}, | 
				
			|||
		}, | 
				
			|||
	}) | 
				
			|||
} | 
				
			|||
@ -0,0 +1,68 @@ | 
				
			|||
package peer | 
				
			|||
 | 
				
			|||
import ( | 
				
			|||
	"context" | 
				
			|||
	"log" | 
				
			|||
	"memobus_relay_server/config" | 
				
			|||
	"sync" | 
				
			|||
 | 
				
			|||
	"github.com/redis/go-redis/v9" | 
				
			|||
	"google.golang.org/grpc" | 
				
			|||
	"google.golang.org/grpc/credentials/insecure" | 
				
			|||
) | 
				
			|||
 | 
				
			|||
// Manager 负责管理到其他对等服务器的 gRPC 客户端连接
 | 
				
			|||
type Manager struct { | 
				
			|||
	redisClient *redis.Client | 
				
			|||
	clients     map[string]*grpc.ClientConn | 
				
			|||
	mu          sync.RWMutex | 
				
			|||
} | 
				
			|||
 | 
				
			|||
var GlobalManager *Manager | 
				
			|||
 | 
				
			|||
func InitManager(redisCli *redis.Client) { | 
				
			|||
	if !config.Cfg.Redis.Enabled { | 
				
			|||
		return // 单机模式下不需要 Peer 管理器
 | 
				
			|||
	} | 
				
			|||
	GlobalManager = &Manager{ | 
				
			|||
		redisClient: redisCli, | 
				
			|||
		clients:     make(map[string]*grpc.ClientConn), | 
				
			|||
	} | 
				
			|||
	log.Println("Peer manager initialized for cluster communication.") | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// GetClient 查找或创建一个到目标实例的 gRPC 客户端连接
 | 
				
			|||
func (m *Manager) GetClient(targetInstanceID string) (*grpc.ClientConn, error) { | 
				
			|||
	m.mu.RLock() | 
				
			|||
	client, ok := m.clients[targetInstanceID] | 
				
			|||
	m.mu.RUnlock() | 
				
			|||
 | 
				
			|||
	if ok { | 
				
			|||
		return client, nil | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	// 连接未找到, 使用写锁创建一个新的
 | 
				
			|||
	m.mu.Lock() | 
				
			|||
	defer m.mu.Unlock() | 
				
			|||
 | 
				
			|||
	// 双重检查, 以防在我们等待锁的时候, 其他 goroutine 已经创建了它
 | 
				
			|||
	if client, ok = m.clients[targetInstanceID]; ok { | 
				
			|||
		return client, nil | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	// 从 Redis 发现目标实例的地址
 | 
				
			|||
	addr, err := m.redisClient.HGet(context.Background(), config.Cfg.Redis.InstanceRegistryKey, targetInstanceID).Result() | 
				
			|||
	if err != nil { | 
				
			|||
		return nil, err | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	log.Printf("Creating new gRPC client connection to peer %s at %s", targetInstanceID, addr) | 
				
			|||
	// 生产环境应使用 TLS 凭证替换 insecure
 | 
				
			|||
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) | 
				
			|||
	if err != nil { | 
				
			|||
		return nil, err | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	m.clients[targetInstanceID] = conn | 
				
			|||
	return conn, nil | 
				
			|||
} | 
				
			|||
@ -0,0 +1,96 @@ | 
				
			|||
// 指定使用 proto3 语法。 | 
				
			|||
syntax = "proto3"; | 
				
			|||
 | 
				
			|||
// 定义包名。在 Go 中,这会影响生成的代码所在的目录结构和包声明。 | 
				
			|||
package relay; | 
				
			|||
 | 
				
			|||
// 指定生成的 Go 代码的包路径。 | 
				
			|||
option go_package = "relay_server/proto"; | 
				
			|||
 | 
				
			|||
// ----------------------------------------------------------------------------- | 
				
			|||
// 服务定义 (Service Definition) | 
				
			|||
// ----------------------------------------------------------------------------- | 
				
			|||
 | 
				
			|||
// InternalRelay 服务定义了服务器实例之间内部通信的 RPC 方法。 | 
				
			|||
service InternalRelay { | 
				
			|||
  // ProxyRequest 是一个双向流式 RPC。 | 
				
			|||
  // "stream" 关键字表示客户端和服务器都可以连续地发送一系列消息, | 
				
			|||
  // 这对于传输大文件或实时数据流(如视频)至关重要,可以避免将整个内容加载到内存中。 | 
				
			|||
  rpc ProxyRequest(stream ProxyRequestMessage) returns (stream ProxyResponseMessage); | 
				
			|||
} | 
				
			|||
 | 
				
			|||
 | 
				
			|||
// ----------------------------------------------------------------------------- | 
				
			|||
// 请求消息定义 (Request Messages) | 
				
			|||
// ----------------------------------------------------------------------------- | 
				
			|||
 | 
				
			|||
// ProxyRequestMessage 是从“代理实例”(接收App请求的实例) | 
				
			|||
// 发送到“目标实例”(持有设备连接的实例)的消息。 | 
				
			|||
// | 
				
			|||
// 使用 `oneof` 结构可以确保每个消息要么是请求头,要么是请求体的一部分, | 
				
			|||
// 这使得在接收端处理消息时逻辑更清晰、更安全。 | 
				
			|||
message ProxyRequestMessage { | 
				
			|||
  oneof payload { | 
				
			|||
    ProxyRequestHeader header = 1; | 
				
			|||
    ProxyRequestBodyChunk body_chunk = 2; | 
				
			|||
  } | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// ProxyRequestHeader 包含了重建原始 HTTP 请求所需的所有元数据。 | 
				
			|||
// 这个消息必须是客户端发送的第一个消息。 | 
				
			|||
message ProxyRequestHeader { | 
				
			|||
  // HTTP 方法, 例如 "GET", "POST", "PUT" 等。 | 
				
			|||
  string method = 1; | 
				
			|||
 | 
				
			|||
  // 完整的请求 URL 路径,包括查询参数。 | 
				
			|||
  // 例如 "/tunnel/DEVICE_SN_123/api/album?page=1&size=10" | 
				
			|||
  string url = 2; | 
				
			|||
 | 
				
			|||
  // 原始的 HTTP 请求头。 | 
				
			|||
  // `map` 类型非常适合用来表示键值对集合。 | 
				
			|||
  map<string, string> headers = 3; | 
				
			|||
 | 
				
			|||
  // 原始 App 客户端的 IP 地址和端口,用于日志记录或 X-Forwarded-For 头。 | 
				
			|||
  string remote_addr = 4; | 
				
			|||
 | 
				
			|||
  // 经过认证的 App 用户的 ID,用于在目标实例上进行授权检查。 | 
				
			|||
  string app_user_id = 5; | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// ProxyRequestBodyChunk 包含了一小块 HTTP 请求体的数据。 | 
				
			|||
// 通过将请求体分割成多个 chunk 进行流式传输, | 
				
			|||
// 我们可以处理任意大小的上传文件,而不会耗尽服务器内存。 | 
				
			|||
message ProxyRequestBodyChunk { | 
				
			|||
  bytes data = 1; | 
				
			|||
} | 
				
			|||
 | 
				
			|||
 | 
				
			|||
// ----------------------------------------------------------------------------- | 
				
			|||
// 响应消息定义 (Response Messages) | 
				
			|||
// ----------------------------------------------------------------------------- | 
				
			|||
 | 
				
			|||
// ProxyResponseMessage 是从“目标实例”发送回“代理实例”的消息。 | 
				
			|||
// 同样使用 `oneof` 来区分响应头和响应体。 | 
				
			|||
message ProxyResponseMessage { | 
				
			|||
  oneof payload { | 
				
			|||
    ProxyResponseHeader header = 1; | 
				
			|||
    ProxyResponseBodyChunk body_chunk = 2; | 
				
			|||
  } | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// ProxyResponseHeader 包含了 HTTP 响应的元数据。 | 
				
			|||
// 这个消息必须是服务器端在流中发送的第一个消息。 | 
				
			|||
message ProxyResponseHeader { | 
				
			|||
  // HTTP 状态码, 例如 200, 404, 500。 | 
				
			|||
  int32 status_code = 1; | 
				
			|||
 | 
				
			|||
  // HTTP 响应头。 | 
				
			|||
  map<string, string> headers = 2; | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// ProxyResponseBodyChunk 包含了一小块 HTTP 响应体的数据。 | 
				
			|||
// 这使得视频播放、大文件下载等场景可以实现流式传输, | 
				
			|||
// App 客户端可以边接收数据边处理,而无需等待整个文件下载完成。 | 
				
			|||
message ProxyResponseBodyChunk { | 
				
			|||
  bytes data = 1; | 
				
			|||
} | 
				
			|||
@ -0,0 +1,654 @@ | 
				
			|||
// 指定使用 proto3 语法。
 | 
				
			|||
 | 
				
			|||
// Code generated by protoc-gen-go. DO NOT EDIT.
 | 
				
			|||
// versions:
 | 
				
			|||
// 	protoc-gen-go v1.28.1
 | 
				
			|||
// 	protoc        v6.33.0
 | 
				
			|||
// source: proto/relay.proto
 | 
				
			|||
 | 
				
			|||
// 定义包名。在 Go 中,这会影响生成的代码所在的目录结构和包声明。
 | 
				
			|||
 | 
				
			|||
package proto | 
				
			|||
 | 
				
			|||
import ( | 
				
			|||
	protoreflect "google.golang.org/protobuf/reflect/protoreflect" | 
				
			|||
	protoimpl "google.golang.org/protobuf/runtime/protoimpl" | 
				
			|||
	reflect "reflect" | 
				
			|||
	sync "sync" | 
				
			|||
) | 
				
			|||
 | 
				
			|||
const ( | 
				
			|||
	// Verify that this generated code is sufficiently up-to-date.
 | 
				
			|||
	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) | 
				
			|||
	// Verify that runtime/protoimpl is sufficiently up-to-date.
 | 
				
			|||
	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) | 
				
			|||
) | 
				
			|||
 | 
				
			|||
// ProxyRequestMessage 是从“代理实例”(接收App请求的实例)
 | 
				
			|||
// 发送到“目标实例”(持有设备连接的实例)的消息。
 | 
				
			|||
//
 | 
				
			|||
// 使用 `oneof` 结构可以确保每个消息要么是请求头,要么是请求体的一部分,
 | 
				
			|||
// 这使得在接收端处理消息时逻辑更清晰、更安全。
 | 
				
			|||
type ProxyRequestMessage struct { | 
				
			|||
	state         protoimpl.MessageState | 
				
			|||
	sizeCache     protoimpl.SizeCache | 
				
			|||
	unknownFields protoimpl.UnknownFields | 
				
			|||
 | 
				
			|||
	// Types that are assignable to Payload:
 | 
				
			|||
	//
 | 
				
			|||
	//	*ProxyRequestMessage_Header
 | 
				
			|||
	//	*ProxyRequestMessage_BodyChunk
 | 
				
			|||
	Payload isProxyRequestMessage_Payload `protobuf_oneof:"payload"` | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyRequestMessage) Reset() { | 
				
			|||
	*x = ProxyRequestMessage{} | 
				
			|||
	if protoimpl.UnsafeEnabled { | 
				
			|||
		mi := &file_proto_relay_proto_msgTypes[0] | 
				
			|||
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) | 
				
			|||
		ms.StoreMessageInfo(mi) | 
				
			|||
	} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyRequestMessage) String() string { | 
				
			|||
	return protoimpl.X.MessageStringOf(x) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (*ProxyRequestMessage) ProtoMessage() {} | 
				
			|||
 | 
				
			|||
func (x *ProxyRequestMessage) ProtoReflect() protoreflect.Message { | 
				
			|||
	mi := &file_proto_relay_proto_msgTypes[0] | 
				
			|||
	if protoimpl.UnsafeEnabled && x != nil { | 
				
			|||
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) | 
				
			|||
		if ms.LoadMessageInfo() == nil { | 
				
			|||
			ms.StoreMessageInfo(mi) | 
				
			|||
		} | 
				
			|||
		return ms | 
				
			|||
	} | 
				
			|||
	return mi.MessageOf(x) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// Deprecated: Use ProxyRequestMessage.ProtoReflect.Descriptor instead.
 | 
				
			|||
func (*ProxyRequestMessage) Descriptor() ([]byte, []int) { | 
				
			|||
	return file_proto_relay_proto_rawDescGZIP(), []int{0} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (m *ProxyRequestMessage) GetPayload() isProxyRequestMessage_Payload { | 
				
			|||
	if m != nil { | 
				
			|||
		return m.Payload | 
				
			|||
	} | 
				
			|||
	return nil | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyRequestMessage) GetHeader() *ProxyRequestHeader { | 
				
			|||
	if x, ok := x.GetPayload().(*ProxyRequestMessage_Header); ok { | 
				
			|||
		return x.Header | 
				
			|||
	} | 
				
			|||
	return nil | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyRequestMessage) GetBodyChunk() *ProxyRequestBodyChunk { | 
				
			|||
	if x, ok := x.GetPayload().(*ProxyRequestMessage_BodyChunk); ok { | 
				
			|||
		return x.BodyChunk | 
				
			|||
	} | 
				
			|||
	return nil | 
				
			|||
} | 
				
			|||
 | 
				
			|||
type isProxyRequestMessage_Payload interface { | 
				
			|||
	isProxyRequestMessage_Payload() | 
				
			|||
} | 
				
			|||
 | 
				
			|||
type ProxyRequestMessage_Header struct { | 
				
			|||
	Header *ProxyRequestHeader `protobuf:"bytes,1,opt,name=header,proto3,oneof"` | 
				
			|||
} | 
				
			|||
 | 
				
			|||
type ProxyRequestMessage_BodyChunk struct { | 
				
			|||
	BodyChunk *ProxyRequestBodyChunk `protobuf:"bytes,2,opt,name=body_chunk,json=bodyChunk,proto3,oneof"` | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (*ProxyRequestMessage_Header) isProxyRequestMessage_Payload() {} | 
				
			|||
 | 
				
			|||
func (*ProxyRequestMessage_BodyChunk) isProxyRequestMessage_Payload() {} | 
				
			|||
 | 
				
			|||
// ProxyRequestHeader 包含了重建原始 HTTP 请求所需的所有元数据。
 | 
				
			|||
// 这个消息必须是客户端发送的第一个消息。
 | 
				
			|||
type ProxyRequestHeader struct { | 
				
			|||
	state         protoimpl.MessageState | 
				
			|||
	sizeCache     protoimpl.SizeCache | 
				
			|||
	unknownFields protoimpl.UnknownFields | 
				
			|||
 | 
				
			|||
	// HTTP 方法, 例如 "GET", "POST", "PUT" 等。
 | 
				
			|||
	Method string `protobuf:"bytes,1,opt,name=method,proto3" json:"method,omitempty"` | 
				
			|||
	// 完整的请求 URL 路径,包括查询参数。
 | 
				
			|||
	// 例如 "/tunnel/DEVICE_SN_123/api/album?page=1&size=10"
 | 
				
			|||
	Url string `protobuf:"bytes,2,opt,name=url,proto3" json:"url,omitempty"` | 
				
			|||
	// 原始的 HTTP 请求头。
 | 
				
			|||
	// `map` 类型非常适合用来表示键值对集合。
 | 
				
			|||
	Headers map[string]string `protobuf:"bytes,3,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` | 
				
			|||
	// 原始 App 客户端的 IP 地址和端口,用于日志记录或 X-Forwarded-For 头。
 | 
				
			|||
	RemoteAddr string `protobuf:"bytes,4,opt,name=remote_addr,json=remoteAddr,proto3" json:"remote_addr,omitempty"` | 
				
			|||
	// 经过认证的 App 用户的 ID,用于在目标实例上进行授权检查。
 | 
				
			|||
	AppUserId string `protobuf:"bytes,5,opt,name=app_user_id,json=appUserId,proto3" json:"app_user_id,omitempty"` | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyRequestHeader) Reset() { | 
				
			|||
	*x = ProxyRequestHeader{} | 
				
			|||
	if protoimpl.UnsafeEnabled { | 
				
			|||
		mi := &file_proto_relay_proto_msgTypes[1] | 
				
			|||
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) | 
				
			|||
		ms.StoreMessageInfo(mi) | 
				
			|||
	} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyRequestHeader) String() string { | 
				
			|||
	return protoimpl.X.MessageStringOf(x) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (*ProxyRequestHeader) ProtoMessage() {} | 
				
			|||
 | 
				
			|||
func (x *ProxyRequestHeader) ProtoReflect() protoreflect.Message { | 
				
			|||
	mi := &file_proto_relay_proto_msgTypes[1] | 
				
			|||
	if protoimpl.UnsafeEnabled && x != nil { | 
				
			|||
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) | 
				
			|||
		if ms.LoadMessageInfo() == nil { | 
				
			|||
			ms.StoreMessageInfo(mi) | 
				
			|||
		} | 
				
			|||
		return ms | 
				
			|||
	} | 
				
			|||
	return mi.MessageOf(x) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// Deprecated: Use ProxyRequestHeader.ProtoReflect.Descriptor instead.
 | 
				
			|||
func (*ProxyRequestHeader) Descriptor() ([]byte, []int) { | 
				
			|||
	return file_proto_relay_proto_rawDescGZIP(), []int{1} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyRequestHeader) GetMethod() string { | 
				
			|||
	if x != nil { | 
				
			|||
		return x.Method | 
				
			|||
	} | 
				
			|||
	return "" | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyRequestHeader) GetUrl() string { | 
				
			|||
	if x != nil { | 
				
			|||
		return x.Url | 
				
			|||
	} | 
				
			|||
	return "" | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyRequestHeader) GetHeaders() map[string]string { | 
				
			|||
	if x != nil { | 
				
			|||
		return x.Headers | 
				
			|||
	} | 
				
			|||
	return nil | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyRequestHeader) GetRemoteAddr() string { | 
				
			|||
	if x != nil { | 
				
			|||
		return x.RemoteAddr | 
				
			|||
	} | 
				
			|||
	return "" | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyRequestHeader) GetAppUserId() string { | 
				
			|||
	if x != nil { | 
				
			|||
		return x.AppUserId | 
				
			|||
	} | 
				
			|||
	return "" | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// ProxyRequestBodyChunk 包含了一小块 HTTP 请求体的数据。
 | 
				
			|||
// 通过将请求体分割成多个 chunk 进行流式传输,
 | 
				
			|||
// 我们可以处理任意大小的上传文件,而不会耗尽服务器内存。
 | 
				
			|||
type ProxyRequestBodyChunk struct { | 
				
			|||
	state         protoimpl.MessageState | 
				
			|||
	sizeCache     protoimpl.SizeCache | 
				
			|||
	unknownFields protoimpl.UnknownFields | 
				
			|||
 | 
				
			|||
	Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyRequestBodyChunk) Reset() { | 
				
			|||
	*x = ProxyRequestBodyChunk{} | 
				
			|||
	if protoimpl.UnsafeEnabled { | 
				
			|||
		mi := &file_proto_relay_proto_msgTypes[2] | 
				
			|||
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) | 
				
			|||
		ms.StoreMessageInfo(mi) | 
				
			|||
	} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyRequestBodyChunk) String() string { | 
				
			|||
	return protoimpl.X.MessageStringOf(x) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (*ProxyRequestBodyChunk) ProtoMessage() {} | 
				
			|||
 | 
				
			|||
func (x *ProxyRequestBodyChunk) ProtoReflect() protoreflect.Message { | 
				
			|||
	mi := &file_proto_relay_proto_msgTypes[2] | 
				
			|||
	if protoimpl.UnsafeEnabled && x != nil { | 
				
			|||
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) | 
				
			|||
		if ms.LoadMessageInfo() == nil { | 
				
			|||
			ms.StoreMessageInfo(mi) | 
				
			|||
		} | 
				
			|||
		return ms | 
				
			|||
	} | 
				
			|||
	return mi.MessageOf(x) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// Deprecated: Use ProxyRequestBodyChunk.ProtoReflect.Descriptor instead.
 | 
				
			|||
func (*ProxyRequestBodyChunk) Descriptor() ([]byte, []int) { | 
				
			|||
	return file_proto_relay_proto_rawDescGZIP(), []int{2} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyRequestBodyChunk) GetData() []byte { | 
				
			|||
	if x != nil { | 
				
			|||
		return x.Data | 
				
			|||
	} | 
				
			|||
	return nil | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// ProxyResponseMessage 是从“目标实例”发送回“代理实例”的消息。
 | 
				
			|||
// 同样使用 `oneof` 来区分响应头和响应体。
 | 
				
			|||
type ProxyResponseMessage struct { | 
				
			|||
	state         protoimpl.MessageState | 
				
			|||
	sizeCache     protoimpl.SizeCache | 
				
			|||
	unknownFields protoimpl.UnknownFields | 
				
			|||
 | 
				
			|||
	// Types that are assignable to Payload:
 | 
				
			|||
	//
 | 
				
			|||
	//	*ProxyResponseMessage_Header
 | 
				
			|||
	//	*ProxyResponseMessage_BodyChunk
 | 
				
			|||
	Payload isProxyResponseMessage_Payload `protobuf_oneof:"payload"` | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyResponseMessage) Reset() { | 
				
			|||
	*x = ProxyResponseMessage{} | 
				
			|||
	if protoimpl.UnsafeEnabled { | 
				
			|||
		mi := &file_proto_relay_proto_msgTypes[3] | 
				
			|||
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) | 
				
			|||
		ms.StoreMessageInfo(mi) | 
				
			|||
	} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyResponseMessage) String() string { | 
				
			|||
	return protoimpl.X.MessageStringOf(x) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (*ProxyResponseMessage) ProtoMessage() {} | 
				
			|||
 | 
				
			|||
func (x *ProxyResponseMessage) ProtoReflect() protoreflect.Message { | 
				
			|||
	mi := &file_proto_relay_proto_msgTypes[3] | 
				
			|||
	if protoimpl.UnsafeEnabled && x != nil { | 
				
			|||
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) | 
				
			|||
		if ms.LoadMessageInfo() == nil { | 
				
			|||
			ms.StoreMessageInfo(mi) | 
				
			|||
		} | 
				
			|||
		return ms | 
				
			|||
	} | 
				
			|||
	return mi.MessageOf(x) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// Deprecated: Use ProxyResponseMessage.ProtoReflect.Descriptor instead.
 | 
				
			|||
func (*ProxyResponseMessage) Descriptor() ([]byte, []int) { | 
				
			|||
	return file_proto_relay_proto_rawDescGZIP(), []int{3} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (m *ProxyResponseMessage) GetPayload() isProxyResponseMessage_Payload { | 
				
			|||
	if m != nil { | 
				
			|||
		return m.Payload | 
				
			|||
	} | 
				
			|||
	return nil | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyResponseMessage) GetHeader() *ProxyResponseHeader { | 
				
			|||
	if x, ok := x.GetPayload().(*ProxyResponseMessage_Header); ok { | 
				
			|||
		return x.Header | 
				
			|||
	} | 
				
			|||
	return nil | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyResponseMessage) GetBodyChunk() *ProxyResponseBodyChunk { | 
				
			|||
	if x, ok := x.GetPayload().(*ProxyResponseMessage_BodyChunk); ok { | 
				
			|||
		return x.BodyChunk | 
				
			|||
	} | 
				
			|||
	return nil | 
				
			|||
} | 
				
			|||
 | 
				
			|||
type isProxyResponseMessage_Payload interface { | 
				
			|||
	isProxyResponseMessage_Payload() | 
				
			|||
} | 
				
			|||
 | 
				
			|||
type ProxyResponseMessage_Header struct { | 
				
			|||
	Header *ProxyResponseHeader `protobuf:"bytes,1,opt,name=header,proto3,oneof"` | 
				
			|||
} | 
				
			|||
 | 
				
			|||
type ProxyResponseMessage_BodyChunk struct { | 
				
			|||
	BodyChunk *ProxyResponseBodyChunk `protobuf:"bytes,2,opt,name=body_chunk,json=bodyChunk,proto3,oneof"` | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (*ProxyResponseMessage_Header) isProxyResponseMessage_Payload() {} | 
				
			|||
 | 
				
			|||
func (*ProxyResponseMessage_BodyChunk) isProxyResponseMessage_Payload() {} | 
				
			|||
 | 
				
			|||
// ProxyResponseHeader 包含了 HTTP 响应的元数据。
 | 
				
			|||
// 这个消息必须是服务器端在流中发送的第一个消息。
 | 
				
			|||
type ProxyResponseHeader struct { | 
				
			|||
	state         protoimpl.MessageState | 
				
			|||
	sizeCache     protoimpl.SizeCache | 
				
			|||
	unknownFields protoimpl.UnknownFields | 
				
			|||
 | 
				
			|||
	// HTTP 状态码, 例如 200, 404, 500。
 | 
				
			|||
	StatusCode int32 `protobuf:"varint,1,opt,name=status_code,json=statusCode,proto3" json:"status_code,omitempty"` | 
				
			|||
	// HTTP 响应头。
 | 
				
			|||
	Headers map[string]string `protobuf:"bytes,2,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyResponseHeader) Reset() { | 
				
			|||
	*x = ProxyResponseHeader{} | 
				
			|||
	if protoimpl.UnsafeEnabled { | 
				
			|||
		mi := &file_proto_relay_proto_msgTypes[4] | 
				
			|||
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) | 
				
			|||
		ms.StoreMessageInfo(mi) | 
				
			|||
	} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyResponseHeader) String() string { | 
				
			|||
	return protoimpl.X.MessageStringOf(x) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (*ProxyResponseHeader) ProtoMessage() {} | 
				
			|||
 | 
				
			|||
func (x *ProxyResponseHeader) ProtoReflect() protoreflect.Message { | 
				
			|||
	mi := &file_proto_relay_proto_msgTypes[4] | 
				
			|||
	if protoimpl.UnsafeEnabled && x != nil { | 
				
			|||
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) | 
				
			|||
		if ms.LoadMessageInfo() == nil { | 
				
			|||
			ms.StoreMessageInfo(mi) | 
				
			|||
		} | 
				
			|||
		return ms | 
				
			|||
	} | 
				
			|||
	return mi.MessageOf(x) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// Deprecated: Use ProxyResponseHeader.ProtoReflect.Descriptor instead.
 | 
				
			|||
func (*ProxyResponseHeader) Descriptor() ([]byte, []int) { | 
				
			|||
	return file_proto_relay_proto_rawDescGZIP(), []int{4} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyResponseHeader) GetStatusCode() int32 { | 
				
			|||
	if x != nil { | 
				
			|||
		return x.StatusCode | 
				
			|||
	} | 
				
			|||
	return 0 | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyResponseHeader) GetHeaders() map[string]string { | 
				
			|||
	if x != nil { | 
				
			|||
		return x.Headers | 
				
			|||
	} | 
				
			|||
	return nil | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// ProxyResponseBodyChunk 包含了一小块 HTTP 响应体的数据。
 | 
				
			|||
// 这使得视频播放、大文件下载等场景可以实现流式传输,
 | 
				
			|||
// App 客户端可以边接收数据边处理,而无需等待整个文件下载完成。
 | 
				
			|||
type ProxyResponseBodyChunk struct { | 
				
			|||
	state         protoimpl.MessageState | 
				
			|||
	sizeCache     protoimpl.SizeCache | 
				
			|||
	unknownFields protoimpl.UnknownFields | 
				
			|||
 | 
				
			|||
	Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyResponseBodyChunk) Reset() { | 
				
			|||
	*x = ProxyResponseBodyChunk{} | 
				
			|||
	if protoimpl.UnsafeEnabled { | 
				
			|||
		mi := &file_proto_relay_proto_msgTypes[5] | 
				
			|||
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) | 
				
			|||
		ms.StoreMessageInfo(mi) | 
				
			|||
	} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyResponseBodyChunk) String() string { | 
				
			|||
	return protoimpl.X.MessageStringOf(x) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (*ProxyResponseBodyChunk) ProtoMessage() {} | 
				
			|||
 | 
				
			|||
func (x *ProxyResponseBodyChunk) ProtoReflect() protoreflect.Message { | 
				
			|||
	mi := &file_proto_relay_proto_msgTypes[5] | 
				
			|||
	if protoimpl.UnsafeEnabled && x != nil { | 
				
			|||
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) | 
				
			|||
		if ms.LoadMessageInfo() == nil { | 
				
			|||
			ms.StoreMessageInfo(mi) | 
				
			|||
		} | 
				
			|||
		return ms | 
				
			|||
	} | 
				
			|||
	return mi.MessageOf(x) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// Deprecated: Use ProxyResponseBodyChunk.ProtoReflect.Descriptor instead.
 | 
				
			|||
func (*ProxyResponseBodyChunk) Descriptor() ([]byte, []int) { | 
				
			|||
	return file_proto_relay_proto_rawDescGZIP(), []int{5} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *ProxyResponseBodyChunk) GetData() []byte { | 
				
			|||
	if x != nil { | 
				
			|||
		return x.Data | 
				
			|||
	} | 
				
			|||
	return nil | 
				
			|||
} | 
				
			|||
 | 
				
			|||
var File_proto_relay_proto protoreflect.FileDescriptor | 
				
			|||
 | 
				
			|||
var file_proto_relay_proto_rawDesc = []byte{ | 
				
			|||
	0x0a, 0x11, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x70, 0x72, | 
				
			|||
	0x6f, 0x74, 0x6f, 0x12, 0x05, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x22, 0x94, 0x01, 0x0a, 0x13, 0x50, | 
				
			|||
	0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, | 
				
			|||
	0x67, 0x65, 0x12, 0x33, 0x0a, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, | 
				
			|||
	0x28, 0x0b, 0x32, 0x19, 0x2e, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, | 
				
			|||
	0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x48, 0x00, 0x52, | 
				
			|||
	0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x3d, 0x0a, 0x0a, 0x62, 0x6f, 0x64, 0x79, 0x5f, | 
				
			|||
	0x63, 0x68, 0x75, 0x6e, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x72, 0x65, | 
				
			|||
	0x6c, 0x61, 0x79, 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, | 
				
			|||
	0x42, 0x6f, 0x64, 0x79, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x48, 0x00, 0x52, 0x09, 0x62, 0x6f, 0x64, | 
				
			|||
	0x79, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x42, 0x09, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, | 
				
			|||
	0x64, 0x22, 0xfd, 0x01, 0x0a, 0x12, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, | 
				
			|||
	0x73, 0x74, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x6d, 0x65, 0x74, 0x68, | 
				
			|||
	0x6f, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, | 
				
			|||
	0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, | 
				
			|||
	0x72, 0x6c, 0x12, 0x40, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x03, 0x20, | 
				
			|||
	0x03, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x50, 0x72, 0x6f, 0x78, | 
				
			|||
	0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x2e, 0x48, | 
				
			|||
	0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, | 
				
			|||
	0x64, 0x65, 0x72, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x61, | 
				
			|||
	0x64, 0x64, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x72, 0x65, 0x6d, 0x6f, 0x74, | 
				
			|||
	0x65, 0x41, 0x64, 0x64, 0x72, 0x12, 0x1e, 0x0a, 0x0b, 0x61, 0x70, 0x70, 0x5f, 0x75, 0x73, 0x65, | 
				
			|||
	0x72, 0x5f, 0x69, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x61, 0x70, 0x70, 0x55, | 
				
			|||
	0x73, 0x65, 0x72, 0x49, 0x64, 0x1a, 0x3a, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, | 
				
			|||
	0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, | 
				
			|||
	0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, | 
				
			|||
	0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, | 
				
			|||
	0x01, 0x22, 0x2b, 0x0a, 0x15, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, | 
				
			|||
	0x74, 0x42, 0x6f, 0x64, 0x79, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, | 
				
			|||
	0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x97, | 
				
			|||
	0x01, 0x0a, 0x14, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, | 
				
			|||
	0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x34, 0x0a, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, | 
				
			|||
	0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x2e, | 
				
			|||
	0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x65, 0x61, | 
				
			|||
	0x64, 0x65, 0x72, 0x48, 0x00, 0x52, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x3e, 0x0a, | 
				
			|||
	0x0a, 0x62, 0x6f, 0x64, 0x79, 0x5f, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, | 
				
			|||
	0x0b, 0x32, 0x1d, 0x2e, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, | 
				
			|||
	0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x6f, 0x64, 0x79, 0x43, 0x68, 0x75, 0x6e, 0x6b, | 
				
			|||
	0x48, 0x00, 0x52, 0x09, 0x62, 0x6f, 0x64, 0x79, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x42, 0x09, 0x0a, | 
				
			|||
	0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0xb5, 0x01, 0x0a, 0x13, 0x50, 0x72, 0x6f, | 
				
			|||
	0x78, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, | 
				
			|||
	0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, | 
				
			|||
	0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, | 
				
			|||
	0x65, 0x12, 0x41, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, | 
				
			|||
	0x28, 0x0b, 0x32, 0x27, 0x2e, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, | 
				
			|||
	0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x2e, 0x48, | 
				
			|||
	0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, | 
				
			|||
	0x64, 0x65, 0x72, 0x73, 0x1a, 0x3a, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, | 
				
			|||
	0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, | 
				
			|||
	0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, | 
				
			|||
	0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, | 
				
			|||
	0x22, 0x2c, 0x0a, 0x16, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, | 
				
			|||
	0x65, 0x42, 0x6f, 0x64, 0x79, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, | 
				
			|||
	0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x32, 0x5c, | 
				
			|||
	0x0a, 0x0d, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x52, 0x65, 0x6c, 0x61, 0x79, 0x12, | 
				
			|||
	0x4b, 0x0a, 0x0c, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, | 
				
			|||
	0x1a, 0x2e, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x71, | 
				
			|||
	0x75, 0x65, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1b, 0x2e, 0x72, 0x65, | 
				
			|||
	0x6c, 0x61, 0x79, 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, | 
				
			|||
	0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x14, 0x5a, 0x12, | 
				
			|||
	0x72, 0x65, 0x6c, 0x61, 0x79, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, | 
				
			|||
	0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, | 
				
			|||
} | 
				
			|||
 | 
				
			|||
var ( | 
				
			|||
	file_proto_relay_proto_rawDescOnce sync.Once | 
				
			|||
	file_proto_relay_proto_rawDescData = file_proto_relay_proto_rawDesc | 
				
			|||
) | 
				
			|||
 | 
				
			|||
func file_proto_relay_proto_rawDescGZIP() []byte { | 
				
			|||
	file_proto_relay_proto_rawDescOnce.Do(func() { | 
				
			|||
		file_proto_relay_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_relay_proto_rawDescData) | 
				
			|||
	}) | 
				
			|||
	return file_proto_relay_proto_rawDescData | 
				
			|||
} | 
				
			|||
 | 
				
			|||
var file_proto_relay_proto_msgTypes = make([]protoimpl.MessageInfo, 8) | 
				
			|||
var file_proto_relay_proto_goTypes = []interface{}{ | 
				
			|||
	(*ProxyRequestMessage)(nil),    // 0: relay.ProxyRequestMessage
 | 
				
			|||
	(*ProxyRequestHeader)(nil),     // 1: relay.ProxyRequestHeader
 | 
				
			|||
	(*ProxyRequestBodyChunk)(nil),  // 2: relay.ProxyRequestBodyChunk
 | 
				
			|||
	(*ProxyResponseMessage)(nil),   // 3: relay.ProxyResponseMessage
 | 
				
			|||
	(*ProxyResponseHeader)(nil),    // 4: relay.ProxyResponseHeader
 | 
				
			|||
	(*ProxyResponseBodyChunk)(nil), // 5: relay.ProxyResponseBodyChunk
 | 
				
			|||
	nil,                            // 6: relay.ProxyRequestHeader.HeadersEntry
 | 
				
			|||
	nil,                            // 7: relay.ProxyResponseHeader.HeadersEntry
 | 
				
			|||
} | 
				
			|||
var file_proto_relay_proto_depIdxs = []int32{ | 
				
			|||
	1, // 0: relay.ProxyRequestMessage.header:type_name -> relay.ProxyRequestHeader
 | 
				
			|||
	2, // 1: relay.ProxyRequestMessage.body_chunk:type_name -> relay.ProxyRequestBodyChunk
 | 
				
			|||
	6, // 2: relay.ProxyRequestHeader.headers:type_name -> relay.ProxyRequestHeader.HeadersEntry
 | 
				
			|||
	4, // 3: relay.ProxyResponseMessage.header:type_name -> relay.ProxyResponseHeader
 | 
				
			|||
	5, // 4: relay.ProxyResponseMessage.body_chunk:type_name -> relay.ProxyResponseBodyChunk
 | 
				
			|||
	7, // 5: relay.ProxyResponseHeader.headers:type_name -> relay.ProxyResponseHeader.HeadersEntry
 | 
				
			|||
	0, // 6: relay.InternalRelay.ProxyRequest:input_type -> relay.ProxyRequestMessage
 | 
				
			|||
	3, // 7: relay.InternalRelay.ProxyRequest:output_type -> relay.ProxyResponseMessage
 | 
				
			|||
	7, // [7:8] is the sub-list for method output_type
 | 
				
			|||
	6, // [6:7] is the sub-list for method input_type
 | 
				
			|||
	6, // [6:6] is the sub-list for extension type_name
 | 
				
			|||
	6, // [6:6] is the sub-list for extension extendee
 | 
				
			|||
	0, // [0:6] is the sub-list for field type_name
 | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func init() { file_proto_relay_proto_init() } | 
				
			|||
func file_proto_relay_proto_init() { | 
				
			|||
	if File_proto_relay_proto != nil { | 
				
			|||
		return | 
				
			|||
	} | 
				
			|||
	if !protoimpl.UnsafeEnabled { | 
				
			|||
		file_proto_relay_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { | 
				
			|||
			switch v := v.(*ProxyRequestMessage); i { | 
				
			|||
			case 0: | 
				
			|||
				return &v.state | 
				
			|||
			case 1: | 
				
			|||
				return &v.sizeCache | 
				
			|||
			case 2: | 
				
			|||
				return &v.unknownFields | 
				
			|||
			default: | 
				
			|||
				return nil | 
				
			|||
			} | 
				
			|||
		} | 
				
			|||
		file_proto_relay_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { | 
				
			|||
			switch v := v.(*ProxyRequestHeader); i { | 
				
			|||
			case 0: | 
				
			|||
				return &v.state | 
				
			|||
			case 1: | 
				
			|||
				return &v.sizeCache | 
				
			|||
			case 2: | 
				
			|||
				return &v.unknownFields | 
				
			|||
			default: | 
				
			|||
				return nil | 
				
			|||
			} | 
				
			|||
		} | 
				
			|||
		file_proto_relay_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { | 
				
			|||
			switch v := v.(*ProxyRequestBodyChunk); i { | 
				
			|||
			case 0: | 
				
			|||
				return &v.state | 
				
			|||
			case 1: | 
				
			|||
				return &v.sizeCache | 
				
			|||
			case 2: | 
				
			|||
				return &v.unknownFields | 
				
			|||
			default: | 
				
			|||
				return nil | 
				
			|||
			} | 
				
			|||
		} | 
				
			|||
		file_proto_relay_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { | 
				
			|||
			switch v := v.(*ProxyResponseMessage); i { | 
				
			|||
			case 0: | 
				
			|||
				return &v.state | 
				
			|||
			case 1: | 
				
			|||
				return &v.sizeCache | 
				
			|||
			case 2: | 
				
			|||
				return &v.unknownFields | 
				
			|||
			default: | 
				
			|||
				return nil | 
				
			|||
			} | 
				
			|||
		} | 
				
			|||
		file_proto_relay_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { | 
				
			|||
			switch v := v.(*ProxyResponseHeader); i { | 
				
			|||
			case 0: | 
				
			|||
				return &v.state | 
				
			|||
			case 1: | 
				
			|||
				return &v.sizeCache | 
				
			|||
			case 2: | 
				
			|||
				return &v.unknownFields | 
				
			|||
			default: | 
				
			|||
				return nil | 
				
			|||
			} | 
				
			|||
		} | 
				
			|||
		file_proto_relay_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { | 
				
			|||
			switch v := v.(*ProxyResponseBodyChunk); i { | 
				
			|||
			case 0: | 
				
			|||
				return &v.state | 
				
			|||
			case 1: | 
				
			|||
				return &v.sizeCache | 
				
			|||
			case 2: | 
				
			|||
				return &v.unknownFields | 
				
			|||
			default: | 
				
			|||
				return nil | 
				
			|||
			} | 
				
			|||
		} | 
				
			|||
	} | 
				
			|||
	file_proto_relay_proto_msgTypes[0].OneofWrappers = []interface{}{ | 
				
			|||
		(*ProxyRequestMessage_Header)(nil), | 
				
			|||
		(*ProxyRequestMessage_BodyChunk)(nil), | 
				
			|||
	} | 
				
			|||
	file_proto_relay_proto_msgTypes[3].OneofWrappers = []interface{}{ | 
				
			|||
		(*ProxyResponseMessage_Header)(nil), | 
				
			|||
		(*ProxyResponseMessage_BodyChunk)(nil), | 
				
			|||
	} | 
				
			|||
	type x struct{} | 
				
			|||
	out := protoimpl.TypeBuilder{ | 
				
			|||
		File: protoimpl.DescBuilder{ | 
				
			|||
			GoPackagePath: reflect.TypeOf(x{}).PkgPath(), | 
				
			|||
			RawDescriptor: file_proto_relay_proto_rawDesc, | 
				
			|||
			NumEnums:      0, | 
				
			|||
			NumMessages:   8, | 
				
			|||
			NumExtensions: 0, | 
				
			|||
			NumServices:   1, | 
				
			|||
		}, | 
				
			|||
		GoTypes:           file_proto_relay_proto_goTypes, | 
				
			|||
		DependencyIndexes: file_proto_relay_proto_depIdxs, | 
				
			|||
		MessageInfos:      file_proto_relay_proto_msgTypes, | 
				
			|||
	}.Build() | 
				
			|||
	File_proto_relay_proto = out.File | 
				
			|||
	file_proto_relay_proto_rawDesc = nil | 
				
			|||
	file_proto_relay_proto_goTypes = nil | 
				
			|||
	file_proto_relay_proto_depIdxs = nil | 
				
			|||
} | 
				
			|||
@ -0,0 +1,143 @@ | 
				
			|||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
 | 
				
			|||
// versions:
 | 
				
			|||
// - protoc-gen-go-grpc v1.2.0
 | 
				
			|||
// - protoc             v6.33.0
 | 
				
			|||
// source: proto/relay.proto
 | 
				
			|||
 | 
				
			|||
package proto | 
				
			|||
 | 
				
			|||
import ( | 
				
			|||
	context "context" | 
				
			|||
	grpc "google.golang.org/grpc" | 
				
			|||
	codes "google.golang.org/grpc/codes" | 
				
			|||
	status "google.golang.org/grpc/status" | 
				
			|||
) | 
				
			|||
 | 
				
			|||
// This is a compile-time assertion to ensure that this generated file
 | 
				
			|||
// is compatible with the grpc package it is being compiled against.
 | 
				
			|||
// Requires gRPC-Go v1.32.0 or later.
 | 
				
			|||
const _ = grpc.SupportPackageIsVersion7 | 
				
			|||
 | 
				
			|||
// InternalRelayClient is the client API for InternalRelay service.
 | 
				
			|||
//
 | 
				
			|||
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
 | 
				
			|||
type InternalRelayClient interface { | 
				
			|||
	// ProxyRequest 是一个双向流式 RPC。
 | 
				
			|||
	// "stream" 关键字表示客户端和服务器都可以连续地发送一系列消息,
 | 
				
			|||
	// 这对于传输大文件或实时数据流(如视频)至关重要,可以避免将整个内容加载到内存中。
 | 
				
			|||
	ProxyRequest(ctx context.Context, opts ...grpc.CallOption) (InternalRelay_ProxyRequestClient, error) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
type internalRelayClient struct { | 
				
			|||
	cc grpc.ClientConnInterface | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func NewInternalRelayClient(cc grpc.ClientConnInterface) InternalRelayClient { | 
				
			|||
	return &internalRelayClient{cc} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (c *internalRelayClient) ProxyRequest(ctx context.Context, opts ...grpc.CallOption) (InternalRelay_ProxyRequestClient, error) { | 
				
			|||
	stream, err := c.cc.NewStream(ctx, &InternalRelay_ServiceDesc.Streams[0], "/relay.InternalRelay/ProxyRequest", opts...) | 
				
			|||
	if err != nil { | 
				
			|||
		return nil, err | 
				
			|||
	} | 
				
			|||
	x := &internalRelayProxyRequestClient{stream} | 
				
			|||
	return x, nil | 
				
			|||
} | 
				
			|||
 | 
				
			|||
type InternalRelay_ProxyRequestClient interface { | 
				
			|||
	Send(*ProxyRequestMessage) error | 
				
			|||
	Recv() (*ProxyResponseMessage, error) | 
				
			|||
	grpc.ClientStream | 
				
			|||
} | 
				
			|||
 | 
				
			|||
type internalRelayProxyRequestClient struct { | 
				
			|||
	grpc.ClientStream | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *internalRelayProxyRequestClient) Send(m *ProxyRequestMessage) error { | 
				
			|||
	return x.ClientStream.SendMsg(m) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *internalRelayProxyRequestClient) Recv() (*ProxyResponseMessage, error) { | 
				
			|||
	m := new(ProxyResponseMessage) | 
				
			|||
	if err := x.ClientStream.RecvMsg(m); err != nil { | 
				
			|||
		return nil, err | 
				
			|||
	} | 
				
			|||
	return m, nil | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// InternalRelayServer is the server API for InternalRelay service.
 | 
				
			|||
// All implementations must embed UnimplementedInternalRelayServer
 | 
				
			|||
// for forward compatibility
 | 
				
			|||
type InternalRelayServer interface { | 
				
			|||
	// ProxyRequest 是一个双向流式 RPC。
 | 
				
			|||
	// "stream" 关键字表示客户端和服务器都可以连续地发送一系列消息,
 | 
				
			|||
	// 这对于传输大文件或实时数据流(如视频)至关重要,可以避免将整个内容加载到内存中。
 | 
				
			|||
	ProxyRequest(InternalRelay_ProxyRequestServer) error | 
				
			|||
	mustEmbedUnimplementedInternalRelayServer() | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// UnimplementedInternalRelayServer must be embedded to have forward compatible implementations.
 | 
				
			|||
type UnimplementedInternalRelayServer struct { | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (UnimplementedInternalRelayServer) ProxyRequest(InternalRelay_ProxyRequestServer) error { | 
				
			|||
	return status.Errorf(codes.Unimplemented, "method ProxyRequest not implemented") | 
				
			|||
} | 
				
			|||
func (UnimplementedInternalRelayServer) mustEmbedUnimplementedInternalRelayServer() {} | 
				
			|||
 | 
				
			|||
// UnsafeInternalRelayServer may be embedded to opt out of forward compatibility for this service.
 | 
				
			|||
// Use of this interface is not recommended, as added methods to InternalRelayServer will
 | 
				
			|||
// result in compilation errors.
 | 
				
			|||
type UnsafeInternalRelayServer interface { | 
				
			|||
	mustEmbedUnimplementedInternalRelayServer() | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func RegisterInternalRelayServer(s grpc.ServiceRegistrar, srv InternalRelayServer) { | 
				
			|||
	s.RegisterService(&InternalRelay_ServiceDesc, srv) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func _InternalRelay_ProxyRequest_Handler(srv interface{}, stream grpc.ServerStream) error { | 
				
			|||
	return srv.(InternalRelayServer).ProxyRequest(&internalRelayProxyRequestServer{stream}) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
type InternalRelay_ProxyRequestServer interface { | 
				
			|||
	Send(*ProxyResponseMessage) error | 
				
			|||
	Recv() (*ProxyRequestMessage, error) | 
				
			|||
	grpc.ServerStream | 
				
			|||
} | 
				
			|||
 | 
				
			|||
type internalRelayProxyRequestServer struct { | 
				
			|||
	grpc.ServerStream | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *internalRelayProxyRequestServer) Send(m *ProxyResponseMessage) error { | 
				
			|||
	return x.ServerStream.SendMsg(m) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func (x *internalRelayProxyRequestServer) Recv() (*ProxyRequestMessage, error) { | 
				
			|||
	m := new(ProxyRequestMessage) | 
				
			|||
	if err := x.ServerStream.RecvMsg(m); err != nil { | 
				
			|||
		return nil, err | 
				
			|||
	} | 
				
			|||
	return m, nil | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// InternalRelay_ServiceDesc is the grpc.ServiceDesc for InternalRelay service.
 | 
				
			|||
// It's only intended for direct use with grpc.RegisterService,
 | 
				
			|||
// and not to be introspected or modified (even as a copy)
 | 
				
			|||
var InternalRelay_ServiceDesc = grpc.ServiceDesc{ | 
				
			|||
	ServiceName: "relay.InternalRelay", | 
				
			|||
	HandlerType: (*InternalRelayServer)(nil), | 
				
			|||
	Methods:     []grpc.MethodDesc{}, | 
				
			|||
	Streams: []grpc.StreamDesc{ | 
				
			|||
		{ | 
				
			|||
			StreamName:    "ProxyRequest", | 
				
			|||
			Handler:       _InternalRelay_ProxyRequest_Handler, | 
				
			|||
			ServerStreams: true, | 
				
			|||
			ClientStreams: true, | 
				
			|||
		}, | 
				
			|||
	}, | 
				
			|||
	Metadata: "proto/relay.proto", | 
				
			|||
} | 
				
			|||
@ -0,0 +1,167 @@ | 
				
			|||
package session | 
				
			|||
 | 
				
			|||
import ( | 
				
			|||
	"context" | 
				
			|||
	"log" | 
				
			|||
	"net" | 
				
			|||
	"net/http" | 
				
			|||
	"net/http/httputil" | 
				
			|||
	"strings" | 
				
			|||
	"sync" | 
				
			|||
 | 
				
			|||
	"github.com/hashicorp/yamux" | 
				
			|||
) | 
				
			|||
 | 
				
			|||
// SessionInfo 存储了一个活跃的设备连接所需的所有信息。
 | 
				
			|||
// 我们将 yamux.Session 和 UserID 绑定在一起。
 | 
				
			|||
type SessionInfo struct { | 
				
			|||
	Session *yamux.Session | 
				
			|||
	UserID  string | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// Manager 是会话管理的核心结构体。
 | 
				
			|||
// 它只负责管理本实例内存中的会话,不关心 Redis 或其他存储。
 | 
				
			|||
type Manager struct { | 
				
			|||
	// localSessions 使用设备 SN 作为 key,存储会话信息。
 | 
				
			|||
	localSessions map[string]*SessionInfo | 
				
			|||
	// sessionMutex 用于保护对 localSessions 的并发访问。
 | 
				
			|||
	sessionMutex sync.RWMutex | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// GlobalManager 是一个全局单例,方便在项目各处调用。
 | 
				
			|||
var GlobalManager *Manager | 
				
			|||
 | 
				
			|||
// InitManager 初始化全局的会P话管理器。
 | 
				
			|||
func InitManager() { | 
				
			|||
	GlobalManager = &Manager{ | 
				
			|||
		localSessions: make(map[string]*SessionInfo), | 
				
			|||
	} | 
				
			|||
	log.Println("Local session manager initialized.") | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// AddSession 向管理器中添加一个新的设备会话。
 | 
				
			|||
// 如果已存在同名会话,它会先关闭旧的,再添加新的。
 | 
				
			|||
func (m *Manager) AddSession(deviceSN string, info *SessionInfo) { | 
				
			|||
	m.sessionMutex.Lock() | 
				
			|||
	defer m.sessionMutex.Unlock() | 
				
			|||
 | 
				
			|||
	// 如果设备重连,旧的会话可能还存在,需要先关闭它
 | 
				
			|||
	if oldInfo, exists := m.localSessions[deviceSN]; exists { | 
				
			|||
		log.Printf("Device '%s' already has a local session, closing the old one.", deviceSN) | 
				
			|||
		oldInfo.Session.Close() | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	m.localSessions[deviceSN] = info | 
				
			|||
	log.Printf("Local session for device '%s' has been added.", deviceSN) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// RemoveSession 从管理器中移除一个设备会话。
 | 
				
			|||
// 它会检查传入的 session 对象是否与当前存储的一致,防止误删新会话。
 | 
				
			|||
func (m *Manager) RemoveSession(deviceSN string, session *yamux.Session) { | 
				
			|||
	m.sessionMutex.Lock() | 
				
			|||
	defer m.sessionMutex.Unlock() | 
				
			|||
 | 
				
			|||
	// 这是一个重要的检查:确保我们删除的是正确的、已经过期的会话,
 | 
				
			|||
	// 而不是一个刚刚建立的新会话(万一发生竞争)。
 | 
				
			|||
	if currentInfo, exists := m.localSessions[deviceSN]; exists && currentInfo.Session == session { | 
				
			|||
		delete(m.localSessions, deviceSN) | 
				
			|||
		log.Printf("Local session for device '%s' has been removed.", deviceSN) | 
				
			|||
	} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// GetLocalSession 根据设备 SN 查找一个活跃的本地会话。
 | 
				
			|||
// 这是最常用的查询方法。
 | 
				
			|||
func (m *Manager) GetLocalSession(deviceSN string) (*SessionInfo, bool) { | 
				
			|||
	m.sessionMutex.RLock() | 
				
			|||
	defer m.sessionMutex.RUnlock() | 
				
			|||
 | 
				
			|||
	info, ok := m.localSessions[deviceSN] | 
				
			|||
	if ok && !info.Session.IsClosed() { | 
				
			|||
		// 确保会话不仅存在,而且是活跃的
 | 
				
			|||
		return info, true | 
				
			|||
	} | 
				
			|||
	return nil, false | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// CreateReverseProxy 是一个辅助函数,用于创建一个配置好的 httputil.ReverseProxy。
 | 
				
			|||
// 将这个逻辑放在这里,是因为它与 SessionInfo 强相关,可以被 main.go 和 grpc/server.go 复用。
 | 
				
			|||
func CreateReverseProxy(sessionInfo *SessionInfo, deviceSN string, originalPath string, originalQuery string) *httputil.ReverseProxy { | 
				
			|||
	return &httputil.ReverseProxy{ | 
				
			|||
		// Director 负责在请求被转发前,修改请求的 URL、Header 等。
 | 
				
			|||
		Director: func(req *http.Request) { | 
				
			|||
			// [新增日志] 如果是 WebSocket 请求,就打印它
 | 
				
			|||
			if isWebSocketRequest(req) { // isWebSocketRequest 是我们之前写的辅助函数
 | 
				
			|||
				// true 表示连同 body 一起打印,对于握手请求 body 为空
 | 
				
			|||
				reqDump, _ := httputil.DumpRequestOut(req, true) | 
				
			|||
				log.Printf("--- [SUCCESS CASE] ReverseProxy is about to send this WebSocket request:\n%s\n-------------------------------------------------", string(reqDump)) | 
				
			|||
			} | 
				
			|||
 | 
				
			|||
			// 从原始请求路径中解析出要转发到 immich 的真正路径
 | 
				
			|||
			// 例如,从 "/tunnel/SN123/api/album" -> "/api/album"
 | 
				
			|||
			pathParts := strings.SplitN(strings.TrimPrefix(originalPath, "/"), "/", 3) | 
				
			|||
			if len(pathParts) > 2 { | 
				
			|||
				req.URL.Path = "/" + pathParts[2] | 
				
			|||
			} else { | 
				
			|||
				req.URL.Path = "/" | 
				
			|||
			} | 
				
			|||
 | 
				
			|||
			req.URL.RawQuery = originalQuery // 传递原始的查询参数
 | 
				
			|||
			req.URL.Scheme = "http" | 
				
			|||
			// Host 不重要,因为我们下面会劫持网络连接 (DialContext)
 | 
				
			|||
			req.URL.Host = "immich-internal" | 
				
			|||
			// 设置 X-Real-IP 头,让 immich 知道原始客户端的 IP
 | 
				
			|||
			req.Header.Set("X-Real-IP", req.RemoteAddr) | 
				
			|||
		}, | 
				
			|||
 | 
				
			|||
		// Transport 负责实际的请求发送。我们通过重写 DialContext 来劫持它。
 | 
				
			|||
		Transport: &http.Transport{ | 
				
			|||
			// 这是整个隧道转发的核心:
 | 
				
			|||
			// 当 ReverseProxy 尝试建立一个 TCP 连接到 "immich-internal" 时,
 | 
				
			|||
			// 我们不进行真正的网络拨号,而是直接在 yamux 会话上打开一个新的流 (stream)。
 | 
				
			|||
			// 这个流就等同于一个虚拟的 TCP 连接,直接通往设备端的 immich 容器。
 | 
				
			|||
			DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { | 
				
			|||
				return sessionInfo.Session.Open() | 
				
			|||
			}, | 
				
			|||
			// 必须禁用 HTTP/2,因为它与我们的简单流转发不兼容。
 | 
				
			|||
			ForceAttemptHTTP2: false, | 
				
			|||
		}, | 
				
			|||
 | 
				
			|||
		// FlushInterval 设置为 -1 会禁用缓冲,立即将数据块发送出去。
 | 
				
			|||
		// 这对于视频流和 WebSocket 至关重要。
 | 
				
			|||
		FlushInterval: -1, | 
				
			|||
 | 
				
			|||
		// ModifyResponse 允许我们在响应返回给客户端之前修改它。
 | 
				
			|||
		ModifyResponse: func(resp *http.Response) error { | 
				
			|||
			// [新增调试日志]
 | 
				
			|||
			// 这是一个关键探针!
 | 
				
			|||
			if resp.StatusCode == http.StatusSwitchingProtocols { // 101
 | 
				
			|||
				log.Printf("DEBUG (WebSocket): ModifyResponse received '101 Switching Protocols'. This means backend handshake was successful!") | 
				
			|||
			} | 
				
			|||
			// 这个 Header 告诉上游的代理(如 Nginx)不要缓冲这个响应。
 | 
				
			|||
			resp.Header.Set("X-Accel-Buffering", "no") | 
				
			|||
			return nil | 
				
			|||
		}, | 
				
			|||
 | 
				
			|||
		// ErrorHandler 定义了当转发过程中发生错误(如设备端断开连接)时的处理逻辑。
 | 
				
			|||
		ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) { | 
				
			|||
			// [新增调试日志]
 | 
				
			|||
			// 这是另一个关键探针!
 | 
				
			|||
			if r.Header.Get("Upgrade") == "websocket" { | 
				
			|||
				log.Printf("DEBUG (WebSocket): ErrorHandler was triggered for a WebSocket request. Error: %v", err) | 
				
			|||
			} | 
				
			|||
 | 
				
			|||
			log.Printf("ERROR: Reverse proxy error for device %s: %v", deviceSN, err) | 
				
			|||
			http.Error(w, "Error forwarding request to device", http.StatusBadGateway) | 
				
			|||
		}, | 
				
			|||
	} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// [新增] 确保 isWebSocketRequest 辅助函数存在于 session/manager.go
 | 
				
			|||
func isWebSocketRequest(r *http.Request) bool { | 
				
			|||
	upgradeHeader := strings.ToLower(r.Header.Get("Upgrade")) | 
				
			|||
	if upgradeHeader != "websocket" { | 
				
			|||
		return false | 
				
			|||
	} | 
				
			|||
	connectionHeader := strings.ToLower(r.Header.Get("Connection")) | 
				
			|||
	return strings.Contains(connectionHeader, "upgrade") | 
				
			|||
} | 
				
			|||
@ -0,0 +1,126 @@ | 
				
			|||
// 文件: storage/redis.go
 | 
				
			|||
package storage | 
				
			|||
 | 
				
			|||
import ( | 
				
			|||
	"context" | 
				
			|||
	"fmt" | 
				
			|||
	"github.com/redis/go-redis/v9" | 
				
			|||
	"log" | 
				
			|||
	"memobus_relay_server/config" // 替换为你的模块名
 | 
				
			|||
	"time" | 
				
			|||
) | 
				
			|||
 | 
				
			|||
// RedisManager 结构体封装了所有与 Redis 相关的操作
 | 
				
			|||
type RedisManager struct { | 
				
			|||
	Client     *redis.Client | 
				
			|||
	sessionTTL time.Duration | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// GlobalRedis 是一个全局可访问的 RedisManager 实例
 | 
				
			|||
var GlobalRedis *RedisManager | 
				
			|||
 | 
				
			|||
// InitRedis 初始化 Redis 连接并创建全局的 RedisManager 实例
 | 
				
			|||
// 如果配置中 Redis 未启用,则返回 nil
 | 
				
			|||
func InitRedis() error { | 
				
			|||
	if !config.Cfg.Redis.Enabled { | 
				
			|||
		log.Println("Redis is disabled in config. Skipping initialization.") | 
				
			|||
		return nil | 
				
			|||
 | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	client := redis.NewClient(&redis.Options{ | 
				
			|||
		Addr:     config.Cfg.Redis.Addr, | 
				
			|||
		Password: config.Cfg.Redis.Password, | 
				
			|||
		DB:       config.Cfg.Redis.DB, | 
				
			|||
	}) | 
				
			|||
 | 
				
			|||
	if err := client.Ping(context.Background()).Err(); err != nil { | 
				
			|||
		return fmt.Errorf("failed to connect to Redis: %w", err) | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	GlobalRedis = &RedisManager{ | 
				
			|||
		Client:     client, | 
				
			|||
		sessionTTL: time.Duration(config.Cfg.Redis.SessionTTLSeconds) * time.Second, | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	log.Println("Successfully connected to Redis.") | 
				
			|||
	return nil | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// getRedisKey 生成设备会话在 Redis 中的 key
 | 
				
			|||
func getRedisKey(deviceSN string) string { | 
				
			|||
	return fmt.Sprintf("device_session:%s", deviceSN) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// RegisterDeviceSession 将设备标记为在线
 | 
				
			|||
// 在单机模式下,value 可以是一个简单的占位符,如 "online"
 | 
				
			|||
func (m *RedisManager) RegisterDeviceSession(deviceSN string, value string) error { | 
				
			|||
	key := getRedisKey(deviceSN) | 
				
			|||
	err := m.Client.Set(context.Background(), key, value, m.sessionTTL).Err() | 
				
			|||
	if err != nil { | 
				
			|||
		return fmt.Errorf("failed to register device '%s' to Redis: %w", deviceSN, err) | 
				
			|||
	} | 
				
			|||
	log.Printf("Device '%s' registered in Redis.", deviceSN) | 
				
			|||
	return nil | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// DeregisterDeviceSession 从 Redis 中移除设备会话
 | 
				
			|||
func (m *RedisManager) DeregisterDeviceSession(deviceSN string) { | 
				
			|||
	key := getRedisKey(deviceSN) | 
				
			|||
	m.Client.Del(context.Background(), key) | 
				
			|||
	log.Printf("Device '%s' deregistered from Redis.", deviceSN) | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// IsDeviceOnline 检查设备是否在 Redis 中被标记为在线
 | 
				
			|||
func (m *RedisManager) IsDeviceOnline(deviceSN string) (bool, error) { | 
				
			|||
	key := getRedisKey(deviceSN) | 
				
			|||
	val, err := m.Client.Get(context.Background(), key).Result() | 
				
			|||
	if err == redis.Nil { | 
				
			|||
		return false, nil // Key 不存在,明确表示不在线
 | 
				
			|||
	} | 
				
			|||
	if err != nil { | 
				
			|||
		return false, fmt.Errorf("redis error looking up device '%s': %w", deviceSN, err) | 
				
			|||
	} | 
				
			|||
	return val != "", nil // 只要 key 存在且值不为空,就认为在线
 | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// [新增] GetDeviceOwner 函数,用来获取持有连接的实例 ID
 | 
				
			|||
func (m *RedisManager) GetDeviceOwner(deviceSN string) (string, error) { | 
				
			|||
	key := getRedisKey(deviceSN) | 
				
			|||
	instanceID, err := m.Client.Get(context.Background(), key).Result() | 
				
			|||
	if err != nil { | 
				
			|||
		// 让调用者处理 redis.Nil 错误,这表示设备未找到
 | 
				
			|||
		return "", err | 
				
			|||
	} | 
				
			|||
	return instanceID, nil | 
				
			|||
} | 
				
			|||
 | 
				
			|||
// KeepAliveSession 启动一个 goroutine,为给定的设备会话在 Redis 中定期续期
 | 
				
			|||
func (m *RedisManager) KeepAliveSession(closeChan <-chan struct{}, deviceSN string) { | 
				
			|||
	// 以 TTL 的一半作为续期间隔
 | 
				
			|||
	ticker := time.NewTicker(m.sessionTTL / 2) | 
				
			|||
	defer ticker.Stop() | 
				
			|||
 | 
				
			|||
	key := getRedisKey(deviceSN) | 
				
			|||
	log.Printf("Starting Redis keep-alive for device '%s'.", deviceSN) | 
				
			|||
 | 
				
			|||
	for { | 
				
			|||
		select { | 
				
			|||
		case <-ticker.C: | 
				
			|||
			// 为 key 续期
 | 
				
			|||
			err := m.Client.Expire(context.Background(), key, m.sessionTTL).Err() | 
				
			|||
			if err != nil { | 
				
			|||
				// 如果 key 不存在了(可能被手动删除或过期),就没必要再续了
 | 
				
			|||
				if err == redis.Nil { | 
				
			|||
					log.Printf("Redis key for %s no longer exists, stopping keep-alive.", deviceSN) | 
				
			|||
					return | 
				
			|||
				} | 
				
			|||
				log.Printf("ERROR: Failed to refresh session TTL for %s in Redis: %v", deviceSN, err) | 
				
			|||
			} | 
				
			|||
		case <-closeChan: | 
				
			|||
			// session 关闭了,退出 goroutine
 | 
				
			|||
			log.Printf("Stopping Redis keep-alive for device '%s' due to session close.", deviceSN) | 
				
			|||
			return | 
				
			|||
		} | 
				
			|||
	} | 
				
			|||
} | 
				
			|||
					Loading…
					
					
				
		Reference in new issue