diff --git a/cmd/server.go b/cmd/server.go index dfd140f..8e596c7 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -15,7 +15,6 @@ import ( "github.com/synctv-org/synctv/internal/rtmp" sysnotify "github.com/synctv-org/synctv/internal/sysNotify" "github.com/synctv-org/synctv/server" - "github.com/synctv-org/synctv/utils" ) var ServerCmd = &cobra.Command{ @@ -80,14 +79,6 @@ func Server(cmd *cobra.Command, args []string) { e := server.NewAndInit() switch { case conf.Conf.Server.Http.CertPath != "" && conf.Conf.Server.Http.KeyPath != "": - conf.Conf.Server.Http.CertPath, err = utils.OptFilePath(conf.Conf.Server.Http.CertPath) - if err != nil { - log.Fatalf("cert path error: %s", err) - } - conf.Conf.Server.Http.KeyPath, err = utils.OptFilePath(conf.Conf.Server.Http.KeyPath) - if err != nil { - log.Fatalf("key path error: %s", err) - } httpl := muxer.Match(cmux.HTTP2(), cmux.TLS()) go http.ServeTLS(httpl, e.Handler(), conf.Conf.Server.Http.CertPath, conf.Conf.Server.Http.KeyPath) if conf.Conf.Server.Http.Quic { @@ -106,14 +97,6 @@ func Server(cmd *cobra.Command, args []string) { e := server.NewAndInit() switch { case conf.Conf.Server.Http.CertPath != "" && conf.Conf.Server.Http.KeyPath != "": - conf.Conf.Server.Http.CertPath, err = utils.OptFilePath(conf.Conf.Server.Http.CertPath) - if err != nil { - log.Fatalf("cert path error: %s", err) - } - conf.Conf.Server.Http.KeyPath, err = utils.OptFilePath(conf.Conf.Server.Http.KeyPath) - if err != nil { - log.Fatalf("key path error: %s", err) - } go http.ServeTLS(serverHttpListener, e.Handler(), conf.Conf.Server.Http.CertPath, conf.Conf.Server.Http.KeyPath) if conf.Conf.Server.Http.Quic { go http3.ListenAndServeQUIC(udpServerHttpAddr.String(), conf.Conf.Server.Http.CertPath, conf.Conf.Server.Http.KeyPath, e.Handler()) diff --git a/internal/bootstrap/config.go b/internal/bootstrap/config.go index 561b11e..34aef66 100644 --- a/internal/bootstrap/config.go +++ b/internal/bootstrap/config.go @@ -3,6 +3,7 @@ package bootstrap import ( "context" "errors" + "fmt" "path/filepath" "github.com/caarlos0/env/v9" @@ -54,6 +55,33 @@ func InitConfig(ctx context.Context) (err error) { } log.Info("load config success from env") } + return optConfigPath(conf.Conf) +} + +func optConfigPath(conf *conf.Config) error { + var err error + conf.Server.ProxyCachePath, err = utils.OptFilePath(conf.Server.ProxyCachePath) + if err != nil { + return fmt.Errorf("get proxy cache path error: %w", err) + } + conf.Server.Http.CertPath, err = utils.OptFilePath(conf.Server.Http.CertPath) + if err != nil { + return fmt.Errorf("get http cert path error: %w", err) + } + conf.Server.Http.KeyPath, err = utils.OptFilePath(conf.Server.Http.KeyPath) + if err != nil { + return fmt.Errorf("get http key path error: %w", err) + } + conf.Log.FilePath, err = utils.OptFilePath(conf.Log.FilePath) + if err != nil { + return fmt.Errorf("get log file path error: %w", err) + } + for _, op := range conf.Oauth2Plugins { + op.PluginFile, err = utils.OptFilePath(op.PluginFile) + if err != nil { + return fmt.Errorf("get oauth2 plugin file path error: %w", err) + } + } return nil } diff --git a/internal/bootstrap/log.go b/internal/bootstrap/log.go index f8c2f05..33a44bb 100644 --- a/internal/bootstrap/log.go +++ b/internal/bootstrap/log.go @@ -35,10 +35,6 @@ func InitLog(ctx context.Context) (err error) { setLog(logrus.StandardLogger()) forceColor := utils.ForceColor() if conf.Conf.Log.Enable { - conf.Conf.Log.FilePath, err = utils.OptFilePath(conf.Conf.Log.FilePath) - if err != nil { - logrus.Fatalf("log: log file path error: %v", err) - } l := &lumberjack.Logger{ Filename: conf.Conf.Log.FilePath, MaxSize: conf.Conf.Log.MaxSize, diff --git a/internal/bootstrap/provider.go b/internal/bootstrap/provider.go index d6b54be..bcfc9d0 100644 --- a/internal/bootstrap/provider.go +++ b/internal/bootstrap/provider.go @@ -19,7 +19,6 @@ import ( "github.com/synctv-org/synctv/internal/provider/plugins" "github.com/synctv-org/synctv/internal/provider/providers" "github.com/synctv-org/synctv/internal/settings" - "github.com/synctv-org/synctv/utils" "github.com/zijiren233/gencontainer/refreshcache0" ) @@ -83,11 +82,6 @@ func InitProvider(ctx context.Context) (err error) { logLevle = hclog.Debug } for _, op := range conf.Conf.Oauth2Plugins { - op.PluginFile, err = utils.OptFilePath(op.PluginFile) - if err != nil { - log.Fatalf("oauth2 plugin file path error: %v", err) - return err - } log.Infof("load oauth2 plugin: %s", op.PluginFile) err := os.MkdirAll(filepath.Dir(op.PluginFile), 0o755) if err != nil { diff --git a/internal/conf/server.go b/internal/conf/server.go index a34fcbe..dd20792 100644 --- a/internal/conf/server.go +++ b/internal/conf/server.go @@ -1,8 +1,9 @@ package conf type ServerConfig struct { - Http HttpServerConfig `yaml:"http"` - Rtmp RtmpServerConfig `yaml:"rtmp"` + Http HttpServerConfig `yaml:"http"` + Rtmp RtmpServerConfig `yaml:"rtmp"` + ProxyCachePath string `yaml:"proxy_cache_path" env:"SERVER_PROXY_CACHE_PATH"` } type HttpServerConfig struct { @@ -33,5 +34,6 @@ func DefaultServerConfig() ServerConfig { Enable: true, Port: 0, }, + ProxyCachePath: "", } } diff --git a/internal/settings/var.go b/internal/settings/var.go index 3da4636..75c1d84 100644 --- a/internal/settings/var.go +++ b/internal/settings/var.go @@ -59,6 +59,7 @@ var ( MovieProxy = NewBoolSetting("movie_proxy", true, model.SettingGroupProxy) LiveProxy = NewBoolSetting("live_proxy", true, model.SettingGroupProxy) AllowProxyToLocal = NewBoolSetting("allow_proxy_to_local", false, model.SettingGroupProxy) + ProxyCacheEnable = NewBoolSetting("proxy_cache_enable", false, model.SettingGroupProxy) ) var ( diff --git a/server/handlers/proxy/cache.go b/server/handlers/proxy/cache.go index ce8eca9..b5cd10a 100644 --- a/server/handlers/proxy/cache.go +++ b/server/handlers/proxy/cache.go @@ -5,72 +5,18 @@ import ( "fmt" "io" "net/http" - "strconv" - "strings" + "os" + "path/filepath" "sync" json "github.com/json-iterator/go" "github.com/zijiren233/ksync" ) -// ByteRange represents an HTTP Range header value -type ByteRange struct { - Start int64 - End int64 -} - -// ParseByteRange parses a Range header value in the format: -// bytes=- -// where end is optional -func ParseByteRange(r string) (*ByteRange, error) { - if r == "" { - return &ByteRange{Start: 0, End: -1}, nil - } - - if !strings.HasPrefix(r, "bytes=") { - return nil, fmt.Errorf("range header must start with 'bytes=', got: %s", r) - } - - r = strings.TrimPrefix(r, "bytes=") - parts := strings.Split(r, "-") - if len(parts) != 2 { - return nil, fmt.Errorf("range header must contain exactly one hyphen (-) separator, got: %s", r) - } - - parts[0] = strings.TrimSpace(parts[0]) - parts[1] = strings.TrimSpace(parts[1]) - - if parts[0] == "" && parts[1] == "" { - return nil, fmt.Errorf("range header cannot have empty start and end values: %s", r) - } - - var start, end int64 = 0, -1 - var err error - - if parts[0] != "" { - start, err = strconv.ParseInt(parts[0], 10, 64) - if err != nil { - return nil, fmt.Errorf("failed to parse range start value '%s': %v", parts[0], err) - } - if start < 0 { - return nil, fmt.Errorf("range start value must be non-negative, got: %d", start) - } - } - - if parts[1] != "" { - end, err = strconv.ParseInt(parts[1], 10, 64) - if err != nil { - return nil, fmt.Errorf("failed to parse range end value '%s': %v", parts[1], err) - } - if end < 0 { - return nil, fmt.Errorf("range end value must be non-negative, got: %d", end) - } - if start > end { - return nil, fmt.Errorf("range start value (%d) cannot be greater than end value (%d)", start, end) - } - } - - return &ByteRange{Start: start, End: end}, nil +// Cache defines the interface for cache implementations +type Cache interface { + Get(key string) (*CacheItem, bool, error) + Set(key string, data *CacheItem) error } // CacheMetadata stores metadata about a cached response @@ -186,14 +132,6 @@ func (i *CacheItem) ReadFrom(r io.Reader) (int64, error) { return read, nil } -// Cache defines the interface for cache implementations -type Cache interface { - Get(key string) (*CacheItem, bool, error) - Set(key string, data *CacheItem) error -} - -var defaultCache Cache = NewMemoryCache() - // MemoryCache implements an in-memory Cache with thread-safe operations type MemoryCache struct { m map[string]*CacheItem @@ -234,248 +172,67 @@ func (c *MemoryCache) Set(key string, data *CacheItem) error { return nil } -var mu = ksync.DefaultKmutex() - -// Proxy defines the interface for proxy implementations -type Proxy interface { - io.ReadSeeker - ContentTotalLength() (int64, error) - ContentType() (string, error) -} - -// Headers defines the interface for accessing response headers -type Headers interface { - Headers() http.Header +type FileCache struct { + mu *ksync.Krwmutex + filePath string } -// SliceCacheProxy implements caching of content slices -type SliceCacheProxy struct { - r Proxy - cache Cache - key string - sliceSize int64 -} - -// NewSliceCacheProxy creates a new SliceCacheProxy instance -func NewSliceCacheProxy(key string, sliceSize int64, r Proxy, cache Cache) *SliceCacheProxy { - return &SliceCacheProxy{ - key: key, - sliceSize: sliceSize, - r: r, - cache: cache, - } +func NewFileCache(filePath string) *FileCache { + return &FileCache{filePath: filePath, mu: ksync.DefaultKrwmutex()} } -func (c *SliceCacheProxy) cacheKey(offset int64) string { - return fmt.Sprintf("%s-%d-%d", c.key, offset, c.sliceSize) -} - -func (c *SliceCacheProxy) alignedOffset(offset int64) int64 { - return (offset / c.sliceSize) * c.sliceSize -} - -func (c *SliceCacheProxy) fmtContentRange(start, end, total int64) string { - totalStr := "*" - if total >= 0 { - totalStr = strconv.FormatInt(total, 10) - } - if end == -1 { - if total >= 0 { - end = total - 1 - } - return fmt.Sprintf("bytes %d-%d/%s", start, end, totalStr) - } - return fmt.Sprintf("bytes %d-%d/%s", start, end, totalStr) -} - -func (c *SliceCacheProxy) contentLength(start, end, total int64) int64 { - if total == -1 && end == -1 { - return -1 - } - if end == -1 { - if total == -1 { - return -1 - } - return total - start - } - if end >= total && total != -1 { - return total - start +func (c *FileCache) Get(key string) (*CacheItem, bool, error) { + if key == "" { + return nil, false, fmt.Errorf("cache key cannot be empty") } - return end - start + 1 -} -func (c *SliceCacheProxy) fmtContentLength(start, end, total int64) string { - length := c.contentLength(start, end, total) - if length == -1 { - return "" - } - return strconv.FormatInt(length, 10) -} + filePath := filepath.Join(c.filePath, key) -// ServeHTTP implements http.Handler interface -func (c *SliceCacheProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { - byteRange, err := ParseByteRange(r.Header.Get("Range")) - if err != nil { - http.Error(w, fmt.Sprintf("Failed to parse Range header: %v", err), http.StatusBadRequest) - return - } + c.mu.RLock(key) + defer c.mu.RUnlock(key) - alignedOffset := c.alignedOffset(byteRange.Start) - cacheItem, err := c.getCacheItem(alignedOffset) + file, err := os.OpenFile(filePath, os.O_RDONLY, 0o644) if err != nil { - http.Error(w, fmt.Sprintf("Failed to get cache item: %v", err), http.StatusInternalServerError) - return - } - - c.setResponseHeaders(w, byteRange, cacheItem, r.Header.Get("Range") != "") - if err := c.writeResponse(w, byteRange, alignedOffset, cacheItem); err != nil { - fmt.Printf("Failed to write response: %v\n", err) - fmt.Printf("Failed to write response: %v\n", err) - fmt.Printf("Failed to write response: %v\n", err) - return - } -} - -func (c *SliceCacheProxy) setResponseHeaders(w http.ResponseWriter, byteRange *ByteRange, cacheItem *CacheItem, hasRange bool) { - // Copy headers excluding special ones - for k, v := range cacheItem.Metadata.Headers { - switch k { - case "Content-Type", "Content-Length", "Content-Range", "Accept-Ranges": - continue - default: - w.Header()[k] = v - } - } - - w.Header().Set("Content-Length", c.fmtContentLength(byteRange.Start, byteRange.End, cacheItem.Metadata.ContentTotalLength)) - w.Header().Set("Content-Type", cacheItem.Metadata.ContentType) - if hasRange { - w.Header().Set("Accept-Ranges", "bytes") - w.Header().Set("Content-Range", c.fmtContentRange(byteRange.Start, byteRange.End, cacheItem.Metadata.ContentTotalLength)) - w.WriteHeader(http.StatusPartialContent) - } else { - w.WriteHeader(http.StatusOK) - } -} - -func (c *SliceCacheProxy) writeResponse(w http.ResponseWriter, byteRange *ByteRange, alignedOffset int64, cacheItem *CacheItem) error { - sliceOffset := byteRange.Start - alignedOffset - if sliceOffset < 0 { - return fmt.Errorf("slice offset cannot be negative, got: %d", sliceOffset) - } - - remainingLength := c.contentLength(byteRange.Start, byteRange.End, cacheItem.Metadata.ContentTotalLength) - if remainingLength == 0 { - return nil - } - - // Write initial slice - if remainingLength > 0 { - n := int64(len(cacheItem.Data)) - sliceOffset - if n > remainingLength { - n = remainingLength - } - if n > 0 { - if _, err := w.Write(cacheItem.Data[sliceOffset : sliceOffset+n]); err != nil { - return fmt.Errorf("failed to write initial data slice: %w", err) - } - remainingLength -= n - } - } - - // Write subsequent slices - currentOffset := alignedOffset + c.sliceSize - for remainingLength > 0 { - cacheItem, err := c.getCacheItem(currentOffset) - if err != nil { - return fmt.Errorf("failed to get cache item at offset %d: %w", currentOffset, err) - } - - n := int64(len(cacheItem.Data)) - if n > remainingLength { - n = remainingLength - } - if n > 0 { - if _, err := w.Write(cacheItem.Data[:n]); err != nil { - return fmt.Errorf("failed to write data slice at offset %d: %w", currentOffset, err) - } - remainingLength -= n + if os.IsNotExist(err) { + return nil, false, nil } - currentOffset += c.sliceSize + return nil, false, fmt.Errorf("failed to open cache file: %w", err) } + defer file.Close() - return nil -} - -func (c *SliceCacheProxy) getCacheItem(alignedOffset int64) (*CacheItem, error) { - if alignedOffset < 0 { - return nil, fmt.Errorf("cache item offset cannot be negative, got: %d", alignedOffset) + item := &CacheItem{} + if _, err := item.ReadFrom(file); err != nil { + return nil, false, fmt.Errorf("failed to read cache item: %w", err) } - cacheKey := c.cacheKey(alignedOffset) - mu.Lock(cacheKey) - defer mu.Unlock(cacheKey) - - // Try to get from cache first - slice, ok, err := c.cache.Get(cacheKey) - if err != nil { - return nil, fmt.Errorf("failed to get item from cache: %w", err) - } - if ok { - return slice, nil - } - - // Fetch from source if not in cache - slice, err = c.fetchFromSource(alignedOffset) - if err != nil { - return nil, fmt.Errorf("failed to fetch item from source: %w", err) - } - - // Store in cache - if err = c.cache.Set(cacheKey, slice); err != nil { - return nil, fmt.Errorf("failed to store item in cache: %w", err) - } - - return slice, nil + return item, true, nil } -func (c *SliceCacheProxy) fetchFromSource(offset int64) (*CacheItem, error) { - if offset < 0 { - return nil, fmt.Errorf("source offset cannot be negative, got: %d", offset) +func (c *FileCache) Set(key string, data *CacheItem) error { + if key == "" { + return fmt.Errorf("cache key cannot be empty") } - if _, err := c.r.Seek(offset, io.SeekStart); err != nil { - return nil, fmt.Errorf("failed to seek to offset %d in source: %w", offset, err) + if data == nil { + return fmt.Errorf("cannot cache nil CacheItem") } - buf := make([]byte, c.sliceSize) - n, err := io.ReadFull(c.r, buf) - if err != nil && err != io.ErrUnexpectedEOF { - return nil, fmt.Errorf("failed to read %d bytes from source at offset %d: %w", c.sliceSize, offset, err) - } + c.mu.Lock(key) + defer c.mu.Unlock(key) - var headers http.Header - if h, ok := c.r.(Headers); ok { - headers = h.Headers().Clone() - } else { - headers = make(http.Header) + if err := os.MkdirAll(c.filePath, 0o755); err != nil { + return fmt.Errorf("failed to create cache directory: %w", err) } - contentTotalLength, err := c.r.ContentTotalLength() + filePath := filepath.Join(c.filePath, key) + file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644) if err != nil { - return nil, fmt.Errorf("failed to get content total length from source: %w", err) + return fmt.Errorf("failed to create cache file: %w", err) } + defer file.Close() - contentType, err := c.r.ContentType() - if err != nil { - return nil, fmt.Errorf("failed to get content type from source: %w", err) + if _, err := data.WriteTo(file); err != nil { + return fmt.Errorf("failed to write cache item: %w", err) } - return &CacheItem{ - Metadata: &CacheMetadata{ - Headers: headers, - ContentTotalLength: contentTotalLength, - ContentType: contentType, - }, - Data: buf[:n], - }, nil + return nil } diff --git a/server/handlers/proxy/proxy.go b/server/handlers/proxy/proxy.go index edbff3e..64c9943 100644 --- a/server/handlers/proxy/proxy.go +++ b/server/handlers/proxy/proxy.go @@ -7,14 +7,37 @@ import ( "io" "net/http" "strings" + "sync" "github.com/gin-gonic/gin" + log "github.com/sirupsen/logrus" + "github.com/synctv-org/synctv/internal/conf" "github.com/synctv-org/synctv/internal/settings" "github.com/synctv-org/synctv/server/model" "github.com/synctv-org/synctv/utils" "github.com/zijiren233/go-uhc" ) +var ( + defaultCache Cache = NewMemoryCache() + fileCacheOnce sync.Once + fileCache Cache +) + +func getCache() Cache { + fileCacheOnce.Do(func() { + if conf.Conf.Server.ProxyCachePath == "" { + return + } + log.Infof("proxy cache path: %s", conf.Conf.Server.ProxyCachePath) + fileCache = NewFileCache(conf.Conf.Server.ProxyCachePath) + }) + if fileCache != nil { + return fileCache + } + return defaultCache +} + func ProxyURL(ctx *gin.Context, u string, headers map[string]string, cache bool) error { if !settings.AllowProxyToLocal.Get() { if l, err := utils.ParseURLIsLocalIP(u); err != nil { @@ -34,15 +57,16 @@ func ProxyURL(ctx *gin.Context, u string, headers map[string]string, cache bool) } } - if cache { + if cache && settings.ProxyCacheEnable.Get() { rsc := NewHttpReadSeekCloser(u, WithHeadersMap(headers), WithNotSupportRange(ctx.GetHeader("Range") == ""), ) defer rsc.Close() - NewSliceCacheProxy(u, 1024*512, rsc, defaultCache).ServeHTTP(ctx.Writer, ctx.Request) - return nil + return NewSliceCacheProxy(u, 1024*512, rsc, getCache()). + Proxy(ctx.Writer, ctx.Request) } + ctx2, cf := context.WithCancel(ctx) defer cf() req, err := http.NewRequestWithContext(ctx2, http.MethodGet, u, nil) diff --git a/server/handlers/proxy/slice.go b/server/handlers/proxy/slice.go new file mode 100644 index 0000000..8c7e820 --- /dev/null +++ b/server/handlers/proxy/slice.go @@ -0,0 +1,323 @@ +package proxy + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "net/http" + "strconv" + "strings" + + "github.com/zijiren233/ksync" +) + +var mu = ksync.DefaultKmutex() + +// Proxy defines the interface for proxy implementations +type Proxy interface { + io.ReadSeeker + ContentTotalLength() (int64, error) + ContentType() (string, error) +} + +// Headers defines the interface for accessing response headers +type Headers interface { + Headers() http.Header +} + +// SliceCacheProxy implements caching of content slices +type SliceCacheProxy struct { + r Proxy + cache Cache + key string + sliceSize int64 +} + +// NewSliceCacheProxy creates a new SliceCacheProxy instance +func NewSliceCacheProxy(key string, sliceSize int64, r Proxy, cache Cache) *SliceCacheProxy { + return &SliceCacheProxy{ + key: key, + sliceSize: sliceSize, + r: r, + cache: cache, + } +} + +func (c *SliceCacheProxy) cacheKey(offset int64) string { + key := fmt.Sprintf("%s-%d-%d", c.key, offset, c.sliceSize) + hash := sha256.Sum256([]byte(key)) + return hex.EncodeToString(hash[:]) +} + +func (c *SliceCacheProxy) alignedOffset(offset int64) int64 { + return (offset / c.sliceSize) * c.sliceSize +} + +func (c *SliceCacheProxy) fmtContentRange(start, end, total int64) string { + totalStr := "*" + if total >= 0 { + totalStr = strconv.FormatInt(total, 10) + } + if end == -1 { + if total >= 0 { + end = total - 1 + } + return fmt.Sprintf("bytes %d-%d/%s", start, end, totalStr) + } + return fmt.Sprintf("bytes %d-%d/%s", start, end, totalStr) +} + +func (c *SliceCacheProxy) contentLength(start, end, total int64) int64 { + if total == -1 && end == -1 { + return -1 + } + if end == -1 { + if total == -1 { + return -1 + } + return total - start + } + if end >= total && total != -1 { + return total - start + } + return end - start + 1 +} + +func (c *SliceCacheProxy) fmtContentLength(start, end, total int64) string { + length := c.contentLength(start, end, total) + if length == -1 { + return "" + } + return strconv.FormatInt(length, 10) +} + +// ServeHTTP implements http.Handler interface +func (c *SliceCacheProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { + _ = c.Proxy(w, r) +} + +func (c *SliceCacheProxy) Proxy(w http.ResponseWriter, r *http.Request) error { + byteRange, err := ParseByteRange(r.Header.Get("Range")) + if err != nil { + http.Error(w, fmt.Sprintf("Failed to parse Range header: %v", err), http.StatusBadRequest) + return err + } + + alignedOffset := c.alignedOffset(byteRange.Start) + cacheItem, err := c.getCacheItem(alignedOffset) + if err != nil { + http.Error(w, fmt.Sprintf("Failed to get cache item: %v", err), http.StatusInternalServerError) + return err + } + + c.setResponseHeaders(w, byteRange, cacheItem, r.Header.Get("Range") != "") + if err := c.writeResponse(w, byteRange, alignedOffset, cacheItem); err != nil { + return fmt.Errorf("failed to write response: %w", err) + } + return nil +} + +func (c *SliceCacheProxy) setResponseHeaders(w http.ResponseWriter, byteRange *ByteRange, cacheItem *CacheItem, hasRange bool) { + // Copy headers excluding special ones + for k, v := range cacheItem.Metadata.Headers { + switch k { + case "Content-Type", "Content-Length", "Content-Range", "Accept-Ranges": + continue + default: + w.Header()[k] = v + } + } + + w.Header().Set("Content-Length", c.fmtContentLength(byteRange.Start, byteRange.End, cacheItem.Metadata.ContentTotalLength)) + w.Header().Set("Content-Type", cacheItem.Metadata.ContentType) + if hasRange { + w.Header().Set("Accept-Ranges", "bytes") + w.Header().Set("Content-Range", c.fmtContentRange(byteRange.Start, byteRange.End, cacheItem.Metadata.ContentTotalLength)) + w.WriteHeader(http.StatusPartialContent) + } else { + w.WriteHeader(http.StatusOK) + } +} + +func (c *SliceCacheProxy) writeResponse(w http.ResponseWriter, byteRange *ByteRange, alignedOffset int64, cacheItem *CacheItem) error { + sliceOffset := byteRange.Start - alignedOffset + if sliceOffset < 0 { + return fmt.Errorf("slice offset cannot be negative, got: %d", sliceOffset) + } + + remainingLength := c.contentLength(byteRange.Start, byteRange.End, cacheItem.Metadata.ContentTotalLength) + if remainingLength == 0 { + return nil + } + + // Write initial slice + if remainingLength > 0 { + n := int64(len(cacheItem.Data)) - sliceOffset + if n > remainingLength { + n = remainingLength + } + if n > 0 { + if _, err := w.Write(cacheItem.Data[sliceOffset : sliceOffset+n]); err != nil { + return fmt.Errorf("failed to write initial data slice: %w", err) + } + remainingLength -= n + } + } + + // Write subsequent slices + currentOffset := alignedOffset + c.sliceSize + for remainingLength > 0 { + cacheItem, err := c.getCacheItem(currentOffset) + if err != nil { + return fmt.Errorf("failed to get cache item at offset %d: %w", currentOffset, err) + } + + n := int64(len(cacheItem.Data)) + if n > remainingLength { + n = remainingLength + } + if n > 0 { + if _, err := w.Write(cacheItem.Data[:n]); err != nil { + return fmt.Errorf("failed to write data slice at offset %d: %w", currentOffset, err) + } + remainingLength -= n + } + currentOffset += c.sliceSize + } + + return nil +} + +func (c *SliceCacheProxy) getCacheItem(alignedOffset int64) (*CacheItem, error) { + if alignedOffset < 0 { + return nil, fmt.Errorf("cache item offset cannot be negative, got: %d", alignedOffset) + } + + cacheKey := c.cacheKey(alignedOffset) + mu.Lock(cacheKey) + defer mu.Unlock(cacheKey) + + // Try to get from cache first + slice, ok, err := c.cache.Get(cacheKey) + if err != nil { + return nil, fmt.Errorf("failed to get item from cache: %w", err) + } + if ok { + return slice, nil + } + + // Fetch from source if not in cache + slice, err = c.fetchFromSource(alignedOffset) + if err != nil { + return nil, fmt.Errorf("failed to fetch item from source: %w", err) + } + + // Store in cache + if err = c.cache.Set(cacheKey, slice); err != nil { + return nil, fmt.Errorf("failed to store item in cache: %w", err) + } + + return slice, nil +} + +func (c *SliceCacheProxy) fetchFromSource(offset int64) (*CacheItem, error) { + if offset < 0 { + return nil, fmt.Errorf("source offset cannot be negative, got: %d", offset) + } + if _, err := c.r.Seek(offset, io.SeekStart); err != nil { + return nil, fmt.Errorf("failed to seek to offset %d in source: %w", offset, err) + } + + buf := make([]byte, c.sliceSize) + n, err := io.ReadFull(c.r, buf) + if err != nil && err != io.ErrUnexpectedEOF { + return nil, fmt.Errorf("failed to read %d bytes from source at offset %d: %w", c.sliceSize, offset, err) + } + + var headers http.Header + if h, ok := c.r.(Headers); ok { + headers = h.Headers().Clone() + } else { + headers = make(http.Header) + } + + contentTotalLength, err := c.r.ContentTotalLength() + if err != nil { + return nil, fmt.Errorf("failed to get content total length from source: %w", err) + } + + contentType, err := c.r.ContentType() + if err != nil { + return nil, fmt.Errorf("failed to get content type from source: %w", err) + } + + return &CacheItem{ + Metadata: &CacheMetadata{ + Headers: headers, + ContentTotalLength: contentTotalLength, + ContentType: contentType, + }, + Data: buf[:n], + }, nil +} + +// ByteRange represents an HTTP Range header value +type ByteRange struct { + Start int64 + End int64 +} + +// ParseByteRange parses a Range header value in the format: +// bytes=- +// where end is optional +func ParseByteRange(r string) (*ByteRange, error) { + if r == "" { + return &ByteRange{Start: 0, End: -1}, nil + } + + if !strings.HasPrefix(r, "bytes=") { + return nil, fmt.Errorf("range header must start with 'bytes=', got: %s", r) + } + + r = strings.TrimPrefix(r, "bytes=") + parts := strings.Split(r, "-") + if len(parts) != 2 { + return nil, fmt.Errorf("range header must contain exactly one hyphen (-) separator, got: %s", r) + } + + parts[0] = strings.TrimSpace(parts[0]) + parts[1] = strings.TrimSpace(parts[1]) + + if parts[0] == "" && parts[1] == "" { + return nil, fmt.Errorf("range header cannot have empty start and end values: %s", r) + } + + var start, end int64 = 0, -1 + var err error + + if parts[0] != "" { + start, err = strconv.ParseInt(parts[0], 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to parse range start value '%s': %v", parts[0], err) + } + if start < 0 { + return nil, fmt.Errorf("range start value must be non-negative, got: %d", start) + } + } + + if parts[1] != "" { + end, err = strconv.ParseInt(parts[1], 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to parse range end value '%s': %v", parts[1], err) + } + if end < 0 { + return nil, fmt.Errorf("range end value must be non-negative, got: %d", end) + } + if start > end { + return nil, fmt.Errorf("range start value (%d) cannot be greater than end value (%d)", start, end) + } + } + + return &ByteRange{Start: start, End: end}, nil +} diff --git a/utils/utils.go b/utils/utils.go index 7ef1b9a..8897079 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -298,6 +298,9 @@ func getLocalIPs() []net.IP { } func OptFilePath(filePath string) (string, error) { + if filePath == "" { + return "", nil + } if !filepath.IsAbs(filePath) { return filepath.Abs(filepath.Join(flags.Global.DataDir, filePath)) }