diff --git a/go.sum b/go.sum index f58ce4a..36fa5f8 100644 --- a/go.sum +++ b/go.sum @@ -173,8 +173,6 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= -golang.org/x/net v0.16.0 h1:7eBu7KsSvFDtSXUIDbh3aqlK4DPsZ1rByC8PFfBThos= -golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/proxy/buffer.go b/proxy/buffer.go new file mode 100644 index 0000000..e363f9b --- /dev/null +++ b/proxy/buffer.go @@ -0,0 +1,64 @@ +package proxy + +import "io" + +type BufferedReadSeeker struct { + r io.ReadSeeker + buffer []byte + readIdx, writeIdx int +} + +func NewBufferedReadSeeker(r io.ReadSeeker, bufSize int) *BufferedReadSeeker { + if bufSize == 0 { + bufSize = 64 * 1024 + } + return &BufferedReadSeeker{r: r, buffer: make([]byte, bufSize)} +} + +func (b *BufferedReadSeeker) Reset(r io.ReadSeeker) { + b.r = r + b.readIdx, b.writeIdx = 0, 0 +} + +func (b *BufferedReadSeeker) Read(p []byte) (n int, err error) { + if len(p) == 0 { + return n, err + } + + if b.readIdx == b.writeIdx { + if len(p) >= len(b.buffer) { + n, err = b.r.Read(p) + return n, err + } + b.readIdx, b.writeIdx = 0, 0 + + n, err = b.r.Read(b.buffer) + if n == 0 { + return n, err + } + + b.writeIdx += n + } + + n = copy(p, b.buffer[b.readIdx:b.writeIdx]) + b.readIdx += n + + return n, err +} + +func (b *BufferedReadSeeker) Seek(offset int64, whence int) (int64, error) { + n, err := b.r.Seek(offset, whence) + + b.Reset(b.r) + + return n, err +} + +func (b *BufferedReadSeeker) ReadAt(p []byte, off int64) (int, error) { + _, err := b.Seek(off, io.SeekStart) + if err != nil { + return 0, err + } + + return b.Read(p) +} diff --git a/proxy/read_seeker.go b/proxy/read_seeker.go new file mode 100644 index 0000000..4b35111 --- /dev/null +++ b/proxy/read_seeker.go @@ -0,0 +1,165 @@ +package proxy + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "net/http" + "strconv" +) + +type HttpReadSeeker struct { + offset int64 + url string + contentLength int64 + method string + body io.Reader + client *http.Client + headers map[string]string + ctx context.Context +} + +type HttpReadSeekerConf func(h *HttpReadSeeker) + +func WithHeaders(headers map[string]string) HttpReadSeekerConf { + return func(h *HttpReadSeeker) { + h.headers = headers + } +} + +func WithAppendHeaders(headers map[string]string) HttpReadSeekerConf { + return func(h *HttpReadSeeker) { + for k, v := range headers { + h.headers[k] = v + } + } +} + +func WithClient(client *http.Client) HttpReadSeekerConf { + return func(h *HttpReadSeeker) { + h.client = client + } +} + +func WithMethod(method string) HttpReadSeekerConf { + return func(h *HttpReadSeeker) { + h.method = method + } +} + +func WithContext(ctx context.Context) HttpReadSeekerConf { + return func(h *HttpReadSeeker) { + h.ctx = ctx + } +} + +func WithBody(body []byte) HttpReadSeekerConf { + return func(h *HttpReadSeeker) { + if len(body) != 0 { + h.body = bytes.NewReader(body) + } + } +} + +func WithContentLength(contentLength int64) HttpReadSeekerConf { + return func(h *HttpReadSeeker) { + if contentLength >= 0 { + h.contentLength = contentLength + } + } +} + +func NewHttpReadSeeker(url string, conf ...HttpReadSeekerConf) *HttpReadSeeker { + rs := &HttpReadSeeker{ + offset: 0, + url: url, + contentLength: -1, + method: http.MethodGet, + } + for _, c := range conf { + c(rs) + } + if rs.client == nil { + rs.client = http.DefaultClient + } + rs.fix() + return rs +} + +func NewBufferedHttpReadSeeker(bufSize int, url string, conf ...HttpReadSeekerConf) *BufferedReadSeeker { + if bufSize == 0 { + bufSize = 64 * 1024 + } + return &BufferedReadSeeker{r: NewHttpReadSeeker(url, conf...), buffer: make([]byte, bufSize)} +} + +func (h *HttpReadSeeker) fix() *HttpReadSeeker { + if h.method == "" { + h.method = http.MethodGet + } + if h.ctx == nil { + h.ctx = context.Background() + } + return h +} + +func (h *HttpReadSeeker) Read(p []byte) (n int, err error) { + req, err := http.NewRequestWithContext(h.ctx, h.method, h.url, h.body) + if err != nil { + return 0, err + } + for k, v := range h.headers { + req.Header.Set(k, v) + } + req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", h.offset, h.offset+int64(len(p))-1)) + resp, err := h.client.Do(req) + if err != nil { + return 0, err + } + defer resp.Body.Close() + n, err = io.ReadFull(resp.Body, p) + h.offset += int64(n) + return n, err +} + +func (h *HttpReadSeeker) Seek(offset int64, whence int) (int64, error) { + switch whence { + case io.SeekStart: + h.offset = offset + case io.SeekCurrent: + h.offset += offset + case io.SeekEnd: + if h.contentLength < 0 { + req, err := http.NewRequestWithContext(h.ctx, http.MethodHead, h.url, nil) + if err != nil { + return 0, err + } + for k, v := range h.headers { + req.Header.Set(k, v) + } + resp, err := h.client.Do(req) + if err != nil { + return 0, err + } + defer resp.Body.Close() + + h.contentLength, err = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64) + if err != nil { + return 0, err + } + if h.contentLength < 0 { + return 0, errors.New("content length error") + } + } + h.offset = h.contentLength - offset + default: + return 0, errors.New("whence value error") + } + return h.offset, nil +} + +func (h *HttpReadSeeker) Close() error { + return nil +} diff --git a/server/handlers/api-movie.go b/server/handlers/api-movie.go index b9d357c..42d2e32 100644 --- a/server/handlers/api-movie.go +++ b/server/handlers/api-movie.go @@ -3,7 +3,6 @@ package handlers import ( "errors" "fmt" - "io" "net/http" "net/url" "path" @@ -18,6 +17,7 @@ import ( json "github.com/json-iterator/go" "github.com/synctv-org/synctv/internal/conf" pb "github.com/synctv-org/synctv/proto" + "github.com/synctv-org/synctv/proxy" "github.com/synctv-org/synctv/room" "github.com/synctv-org/synctv/utils" "github.com/zijiren233/livelib/av" @@ -193,6 +193,7 @@ func PushMovie(ctx *gin.Context) { ctx.AbortWithStatusJSON(http.StatusBadRequest, NewApiErrorStringResp("movie proxy is not enabled")) return } + movie.PullKey = uuid.New().String() fallthrough case !movie.Live && !movie.Proxy, movie.Live && !movie.Proxy && !movie.RtmpSource: u, err := url.Parse(movie.Url) @@ -513,58 +514,65 @@ const UserAgent = `Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/5 func ProxyMovie(ctx *gin.Context) { rooms := ctx.Value("rooms").(*room.Rooms) - roomid := ctx.Query("roomid") - if roomid == "" { - ctx.AbortWithStatusJSON(http.StatusBadRequest, NewApiErrorStringResp("roomid is empty")) + roomId := ctx.Param("roomId") + if roomId == "" { + ctx.AbortWithStatusJSON(http.StatusBadRequest, NewApiErrorStringResp("roomId is empty")) return } - room, err := rooms.GetRoom(roomid) + room, err := rooms.GetRoom(roomId) if err != nil { ctx.AbortWithStatusJSON(http.StatusBadRequest, NewApiErrorResp(err)) } - cm := room.Current().Movie - if !cm.Proxy || cm.Live { - ctx.AbortWithStatusJSON(http.StatusBadRequest, NewApiErrorStringResp("not support proxy")) - return - } - u, err := url.Parse(cm.Url) + m, err := room.GetMovieWithPullKey(ctx.Param("pullKey")) if err != nil { ctx.AbortWithStatusJSON(http.StatusBadRequest, NewApiErrorResp(err)) return } - req := resty.New().R(). - SetHeader("Range", ctx.GetHeader("Range")). - SetHeader("User-Agent", UserAgent). - SetHeader("Referer", fmt.Sprintf("%s://%s/", u.Scheme, u.Host)). - SetHeader("Origin", fmt.Sprintf("%s://%s", u.Scheme, u.Host)). - SetHeader("Accept", ctx.GetHeader("Accept")). - SetHeader("Accept-Encoding", ctx.GetHeader("Accept-Encoding")). - SetHeader("Accept-Language", ctx.GetHeader("Accept-Language")) - - if cm.Headers != nil { - for k, v := range cm.Headers { - req.SetHeader(k, v) - } + if !m.Proxy || m.Live || m.RtmpSource { + ctx.AbortWithStatusJSON(http.StatusBadRequest, NewApiErrorStringResp("not support proxy")) + return } - resp, err := req.Get(cm.Url) + r := resty.New().R() + + for k, v := range m.Headers { + r.SetHeader(k, v) + } + resp, err := r.Head(m.Url) if err != nil { ctx.AbortWithStatusJSON(http.StatusInternalServerError, NewApiErrorResp(err)) return } - defer resp.RawBody().Close() + if _, ok := allowedProxyMovieType[resp.Header().Get("Content-Type")]; !ok { ctx.AbortWithStatusJSON(http.StatusBadRequest, NewApiErrorResp(fmt.Errorf("this movie type support proxy: %s", resp.Header().Get("Content-Type")))) return } - for k, v := range resp.Header() { - ctx.Header(k, v[0]) - } ctx.Status(resp.StatusCode()) - io.Copy(ctx.Writer, resp.RawBody()) + ctx.Header("Content-Type", resp.Header().Get("Content-Type")) + l := resp.Header().Get("Content-Length") + ctx.Header("Content-Length", l) + + length, err := strconv.ParseInt(l, 10, 64) + if err != nil { + ctx.AbortWithStatusJSON(http.StatusInternalServerError, NewApiErrorResp(err)) + return + } + + hrs := proxy.NewBufferedHttpReadSeeker(128*1024, m.Url, + proxy.WithContext(ctx), + proxy.WithHeaders(m.Headers), + proxy.WithContext(ctx), + proxy.WithContentLength(length), + ) + name := resp.Header().Get("Content-Disposition") + if name == "" { + name = m.Url + } + http.ServeContent(ctx.Writer, ctx.Request, name, time.Now(), hrs) } type FormatErrNotSupportFileType string diff --git a/server/handlers/init.go b/server/handlers/init.go index acc5930..b635a0d 100644 --- a/server/handlers/init.go +++ b/server/handlers/init.go @@ -105,7 +105,7 @@ func Init(e *gin.Engine, s *rtmps.Server, r *room.Rooms) { movie.POST("/clear", ClearMovies) - movie.GET("/proxy/:roomid", ProxyMovie) + movie.GET("/proxy/:roomId/:pullKey", ProxyMovie) { live := movie.Group("/live")