Feat: new rtmp handler

pull/21/head
zijiren233 1 year ago
parent 4887aa66f8
commit 43d40ab357

@ -24,7 +24,7 @@ require (
github.com/spf13/cobra v1.7.0
github.com/zijiren233/gencontainer v0.0.0-20230930135658-e410015e13cc
github.com/zijiren233/go-colorable v0.0.0-20230930131441-997304c961cb
github.com/zijiren233/livelib v0.1.2-0.20231010145337-1651f7b4be26
github.com/zijiren233/livelib v0.2.1
github.com/zijiren233/stream v0.5.1
github.com/zijiren233/yaml-comment v0.2.0
golang.org/x/crypto v0.14.0

@ -174,6 +174,10 @@ github.com/zijiren233/go-colorable v0.0.0-20230930131441-997304c961cb h1:0DyOxf/
github.com/zijiren233/go-colorable v0.0.0-20230930131441-997304c961cb/go.mod h1:6TCzjDiQ8+5gWZiwsC3pnA5M0vUy2jV2Y7ciHJh729g=
github.com/zijiren233/livelib v0.1.2-0.20231010145337-1651f7b4be26 h1:h7cw3cPQX3VheviU0y0bUVV0CnQ8fJegJgZMBpb/tfw=
github.com/zijiren233/livelib v0.1.2-0.20231010145337-1651f7b4be26/go.mod h1:2wrAAqNIdMZjQrdbO7ERQfqK4VS5fzgUj2xXwrJ8/uo=
github.com/zijiren233/livelib v0.2.0 h1:o2YbXAA4v3WTq97hzIToBg6mvmGXLUHHJSBh7qSmXLE=
github.com/zijiren233/livelib v0.2.0/go.mod h1:2wrAAqNIdMZjQrdbO7ERQfqK4VS5fzgUj2xXwrJ8/uo=
github.com/zijiren233/livelib v0.2.1 h1:7a+R/yiq3WJXM+1kwez9w//uWpRDrQN4hT+TC1hqkpI=
github.com/zijiren233/livelib v0.2.1/go.mod h1:2wrAAqNIdMZjQrdbO7ERQfqK4VS5fzgUj2xXwrJ8/uo=
github.com/zijiren233/stream v0.5.1 h1:9SUwM/fpET6frtBRT5WZBHnan0Hyzkezk/P8N78cgZQ=
github.com/zijiren233/stream v0.5.1/go.mod h1:iIrOm3qgIepQFmptD/HDY+YzamSSzQOtPjpVcK7FCOw=
github.com/zijiren233/yaml-comment v0.2.0 h1:xGcmpFsjK+IIK1InHtl+rKxYVKQ9rne/aRP1gkczZt4=

@ -13,38 +13,46 @@ import (
)
func InitRtmp(ctx context.Context) error {
s := rtmps.NewRtmpServer(rtmps.WithInitHlsPlayer(true))
s := rtmps.NewRtmpServer(auth)
rtmp.Init(s)
s.SetParseChannelFunc(func(ReqAppName, ReqChannelName string, IsPublisher bool) (TrueAppName string, TrueChannel string, err error) {
return nil
}
func auth(ReqAppName, ReqChannelName string, IsPublisher bool) (*rtmps.Channel, error) {
if IsPublisher {
channelName, err := rtmp.AuthRtmpPublish(ReqChannelName)
if err != nil {
log.Errorf("rtmp: publish auth to %s error: %v", ReqAppName, err)
return "", "", err
return nil, err
}
log.Infof("rtmp: publisher login success: %s/%s", ReqAppName, channelName)
id, err := strconv.Atoi(ReqAppName)
if err != nil {
log.Errorf("rtmp: parse channel name to id error: %v", err)
return "", "", err
return nil, err
}
r, err := op.GetRoomByID(uint(id))
if err != nil {
log.Errorf("rtmp: get room by id error: %v", err)
return "", "", err
return nil, err
}
err = r.LazyInit()
if err != nil {
log.Errorf("rtmp: lazy init room error: %v", err)
return "", "", err
return r.GetChannel(channelName)
}
return ReqAppName, channelName, nil
} else if !conf.Conf.Rtmp.RtmpPlayer {
if !conf.Conf.Rtmp.RtmpPlayer {
log.Warnf("rtmp: dial to %s/%s error: %s", ReqAppName, ReqChannelName, "rtmp player is not enabled")
return "", "", fmt.Errorf("rtmp: dial to %s/%s error: %s", ReqAppName, ReqChannelName, "rtmp player is not enabled")
return nil, fmt.Errorf("rtmp: dial to %s/%s error: %s", ReqAppName, ReqChannelName, "rtmp player is not enabled")
}
return ReqAppName, ReqChannelName, nil
})
return nil
id, err := strconv.Atoi(ReqAppName)
if err != nil {
log.Errorf("rtmp: parse channel name to id error: %v", err)
return nil, err
}
r, err := op.GetRoomByID(uint(id))
if err != nil {
log.Errorf("rtmp: get room by id error: %v", err)
return nil, err
}
return r.GetChannel(ReqChannelName)
}

@ -3,7 +3,6 @@ package op
import (
"errors"
"net/url"
"strconv"
"sync/atomic"
"time"
@ -14,8 +13,8 @@ import (
"github.com/synctv-org/synctv/internal/conf"
"github.com/synctv-org/synctv/internal/db"
"github.com/synctv-org/synctv/internal/model"
"github.com/synctv-org/synctv/internal/rtmp"
"github.com/synctv-org/synctv/utils"
"github.com/zijiren233/gencontainer/rwmap"
"github.com/zijiren233/livelib/av"
"github.com/zijiren233/livelib/container/flv"
rtmpProto "github.com/zijiren233/livelib/protocol/rtmp"
@ -27,23 +26,17 @@ import (
type Room struct {
model.Room
uuid uuid.UUID
version uint32
current *current
rtmpa *rtmps.App
initOnce utils.Once
lastActive int64
hub *Hub
channles rwmap.RWMap[string, *rtmps.Channel]
}
func (r *Room) LazyInit() (err error) {
r.initOnce.Do(func() {
r.hub = newHub(r.ID)
r.rtmpa, err = rtmp.RtmpServer().NewApp(strconv.Itoa(int(r.ID)))
if err != nil {
log.Errorf("failed to create rtmp app: %s", err.Error())
return
}
var ms []*model.Movie
ms, err = r.GetAllMoviesByRoomID()
@ -81,13 +74,21 @@ func (r *Room) GetChannel(channelName string) (*rtmps.Channel, error) {
return nil, err
}
return r.rtmpa.GetChannel(channelName)
c, ok := r.channles.Load(channelName)
if !ok {
return nil, errors.New("channel not found")
}
return c, nil
}
func (r *Room) close() {
if r.initOnce.Done() {
r.hub.Close()
rtmp.RtmpServer().DelApp(r.Name)
r.channles.Range(func(_ string, c *rtmps.Channel) bool {
c.Close()
return true
})
}
}
@ -127,13 +128,10 @@ func (r *Room) UpdateMovie(movieId uint, movie model.BaseMovieInfo) error {
func (r *Room) terminateMovie(movie *model.Movie) error {
switch {
case movie.Live && movie.RtmpSource:
if movie.PullKey != "" {
r.rtmpa.DelChannel(movie.PullKey)
}
case movie.Live && movie.Proxy:
if movie.PullKey != "" {
r.rtmpa.DelChannel(movie.PullKey)
case movie.Live && movie.RtmpSource, movie.Live && movie.Proxy:
c, loaded := r.channles.LoadAndDelete(movie.PullKey)
if loaded {
return c.Close()
}
}
return nil
@ -150,10 +148,11 @@ func (r *Room) initMovie(movie *model.Movie) error {
if movie.PullKey == "" {
movie.PullKey = uuid.NewString()
}
_, err := r.rtmpa.NewChannel(movie.PullKey)
if err != nil {
return err
c, loaded := r.channles.LoadOrStore(movie.PullKey, rtmps.NewChannel())
if loaded {
return errors.New("pull key already exists")
}
c.InitHlsPlayer()
case movie.Live && movie.Proxy:
if !conf.Conf.Proxy.LiveProxy {
return errors.New("live proxy is not enabled")
@ -167,11 +166,12 @@ func (r *Room) initMovie(movie *model.Movie) error {
}
switch u.Scheme {
case "rtmp":
movie.PullKey = uuid.NewMD5(r.uuid, []byte(movie.Url)).String()
c, err := r.rtmpa.NewChannel(movie.PullKey)
if err != nil {
return err
movie.PullKey = uuid.NewMD5(uuid.NameSpaceURL, []byte(movie.Url)).String()
c, loaded := r.channles.LoadOrStore(movie.PullKey, rtmps.NewChannel())
if loaded {
return errors.New("pull key already exists")
}
c.InitHlsPlayer()
go func() {
for {
if c.Closed() {
@ -193,11 +193,12 @@ func (r *Room) initMovie(movie *model.Movie) error {
if movie.Type != "flv" {
return errors.New("only flv is supported")
}
movie.PullKey = uuid.NewMD5(r.uuid, []byte(movie.Url)).String()
c, err := r.rtmpa.NewChannel(movie.PullKey)
if err != nil {
return err
movie.PullKey = uuid.NewMD5(uuid.NameSpaceURL, []byte(movie.Url)).String()
c, loaded := r.channles.LoadOrStore(movie.PullKey, rtmps.NewChannel())
if loaded {
return errors.New("pull key already exists")
}
c.InitHlsPlayer()
go func() {
for {
if c.Closed() {
@ -238,7 +239,7 @@ func (r *Room) initMovie(movie *model.Movie) error {
if u.Scheme != "http" && u.Scheme != "https" {
return errors.New("unsupported scheme")
}
movie.PullKey = uuid.NewMD5(r.uuid, []byte(movie.Url)).String()
movie.PullKey = uuid.NewMD5(uuid.NameSpaceURL, []byte(movie.Url)).String()
case !movie.Live && !movie.Proxy, movie.Live && !movie.Proxy && !movie.RtmpSource:
u, err := url.Parse(movie.Url)
if err != nil {

@ -3,15 +3,11 @@ package op
import (
"errors"
"math/rand"
"strconv"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/synctv-org/synctv/internal/db"
"github.com/synctv-org/synctv/internal/model"
"github.com/zijiren233/gencontainer/rwmap"
"github.com/zijiren233/stream"
)
var roomCache rwmap.RWMap[uint, *Room]
@ -35,8 +31,6 @@ func WithVersion(version uint32) RoomConf {
func initRoom(room *model.Room, conf ...RoomConf) (*Room, error) {
r := &Room{
Room: *room,
uuid: uuid.NewMD5(uuid.NameSpaceURL, stream.StringToBytes(strconv.Itoa(int(room.ID)))),
lastActive: time.Now().UnixMilli(),
version: rand.Uint32(),
current: newCurrent(),
}

@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"net/http"
"net/url"
"path"
"path/filepath"
"strconv"
@ -385,14 +384,8 @@ func ProxyMovie(ctx *gin.Context) {
return
}
u, err := url.Parse(m.Url)
if err != nil {
ctx.AbortWithStatusJSON(http.StatusBadRequest, model.NewApiErrorResp(err))
return
}
if utils.IsLocalIP(u.Host) {
ctx.AbortWithStatusJSON(http.StatusBadRequest, model.NewApiErrorStringResp("local ip is not allowed"))
if l, err := utils.ParseURLIsLocalIP(m.Url); err != nil || l {
ctx.AbortWithStatusJSON(http.StatusBadRequest, model.NewApiErrorStringResp("parse url error or url is local ip"))
return
}

@ -3,6 +3,7 @@ package utils
import (
"math/rand"
"net"
"net/url"
"os"
"path/filepath"
"strconv"
@ -182,6 +183,14 @@ func (o *Once) Reset() {
atomic.StoreUint32(&o.done, 0)
}
func ParseURLIsLocalIP(u string) (bool, error) {
url, err := url.Parse(u)
if err != nil {
return false, err
}
return IsLocalIP(url.Host), nil
}
func IsLocalIP(address string) bool {
host, _, err := net.SplitHostPort(address)
if err != nil {

Loading…
Cancel
Save