You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
synctv/room/room.go

382 lines
7.6 KiB
Go

package room
import (
"errors"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
"github.com/zijiren233/gencontainer/rwmap"
rtmps "github.com/zijiren233/livelib/server"
"github.com/zijiren233/stream"
"golang.org/x/crypto/bcrypt"
)
var (
ErrRoomIDEmpty = errors.New("roomid is empty")
ErrRoomIDTooLong = errors.New("roomid is too long")
ErrAdminPassWordEmpty = errors.New("admin password is empty")
)
type Room struct {
id string
password []byte
needPassword uint32
version uint64
current *Current
rtmps *rtmps.Server
rtmpa *rtmps.App
hidden uint32
initOnce sync.Once
users rwmap.RWMap[string, *User]
rootUser *User
lastActive int64
createAt int64
mid uint64
hub *hub
*movies
}
type RoomConf func(r *Room)
func WithVersion(version uint64) RoomConf {
return func(r *Room) {
r.version = version
}
}
func WithHidden(hidden bool) RoomConf {
return func(r *Room) {
r.SetHidden(hidden)
}
}
func WithRootUser(u *User) RoomConf {
return func(r *Room) {
u.admin = true
u.room = r
r.rootUser = u
r.AddUser(u)
}
}
// Version cant is 0
func NewRoom(RoomID string, Password string, rtmps *rtmps.Server, conf ...RoomConf) (*Room, error) {
if RoomID == "" {
return nil, ErrRoomIDEmpty
} else if len(RoomID) > 32 {
return nil, ErrRoomIDTooLong
}
now := time.Now().UnixMilli()
r := &Room{
id: RoomID,
rtmps: rtmps,
lastActive: now,
createAt: now,
}
for _, c := range conf {
c(r)
}
if r.version == 0 {
r.version = rand.New(rand.NewSource(now)).Uint64()
}
return r, r.SetPassword(Password)
}
func (r *Room) Init() {
r.initOnce.Do(func() {
r.rtmpa = r.rtmps.GetOrNewApp(r.id)
r.hub = newHub(r.id)
r.movies = newMovies()
r.current = newCurrent()
})
}
func (r *Room) CreateAt() int64 {
return atomic.LoadInt64(&r.createAt)
}
func (r *Room) RootUser() *User {
return r.rootUser
}
func (r *Room) SetRootUser(u *User) {
r.rootUser = u
}
func (r *Room) NewUser(id string, password string, conf ...UserConf) (*User, error) {
u, err := NewUser(id, password, r, conf...)
if err != nil {
return nil, err
}
_, loaded := r.users.LoadOrStore(u.name, u)
if loaded {
return nil, errors.New("user already exist")
}
return u, nil
}
func (r *Room) AddUser(u *User) error {
_, loaded := r.users.LoadOrStore(u.name, u)
if loaded {
return errors.New("user already exist")
}
return nil
}
func (r *Room) GetUser(id string) (*User, error) {
u, ok := r.users.Load(id)
if !ok {
return nil, errors.New("user not found")
}
return u, nil
}
func (r *Room) DelUser(id string) error {
_, ok := r.users.LoadAndDelete(id)
if !ok {
return errors.New("user not found")
}
return nil
}
func (r *Room) GetAndDelUser(id string) (u *User, ok bool) {
return r.users.LoadAndDelete(id)
}
func (r *Room) GetOrNewUser(id string, password string, conf ...UserConf) (*User, error) {
u, err := NewUser(id, password, r, conf...)
if err != nil {
return nil, err
}
user, _ := r.users.LoadOrStore(u.name, u)
return user, nil
}
func (r *Room) UserList() (users []User) {
users = make([]User, 0, r.users.Len())
r.users.Range(func(name string, u *User) bool {
users = append(users, *u)
return true
})
return
}
func (r *Room) NewLiveChannel(channel string) (*rtmps.Channel, error) {
c, err := r.rtmpa.NewChannel(channel)
if err != nil {
return nil, err
}
return c, nil
}
func (r *Room) Start() {
go r.Serve()
}
func (r *Room) Serve() {
r.hub.Serve()
}
func (r *Room) Close() error {
if err := r.hub.Close(); err != nil {
return err
}
err := r.rtmps.DelApp(r.id)
if err != nil {
return err
}
return nil
}
func (r *Room) SetHidden(hidden bool) {
if hidden {
atomic.StoreUint32(&r.hidden, 1)
} else {
atomic.StoreUint32(&r.hidden, 0)
}
}
func (r *Room) Hidden() bool {
return atomic.LoadUint32(&r.hidden) == 1
}
func (r *Room) ID() string {
return r.id
}
func (r *Room) UpdateActiveTime() {
atomic.StoreInt64(&r.lastActive, time.Now().UnixMilli())
}
func (r *Room) LateActiveTime() int64 {
return atomic.LoadInt64(&r.lastActive)
}
func (r *Room) SetPassword(password string) error {
if password != "" {
b, err := bcrypt.GenerateFromPassword(stream.StringToBytes(password), bcrypt.DefaultCost)
if err != nil {
return err
}
r.password = b
atomic.StoreUint32(&r.needPassword, 1)
} else {
atomic.StoreUint32(&r.needPassword, 0)
r.password = nil
}
r.updateVersion()
return nil
}
func (r *Room) SetPasswordAndCloseAll(password string) error {
err := r.SetPassword(password)
if err != nil {
return err
}
r.hub.clients.Range(func(_ string, value *Client) bool {
value.Close()
return true
})
return nil
}
func (r *Room) CheckPassword(password string) (ok bool) {
if !r.NeedPassword() {
return true
}
return bcrypt.CompareHashAndPassword(r.password, stream.StringToBytes(password)) == nil
}
func (r *Room) NeedPassword() bool {
return atomic.LoadUint32(&r.needPassword) == 1
}
func (r *Room) Version() uint64 {
return atomic.LoadUint64(&r.version)
}
func (r *Room) CheckVersion(version uint64) bool {
return r.Version() == version
}
func (r *Room) SetVersion(version uint64) {
atomic.StoreUint64(&r.version, version)
}
func (r *Room) updateVersion() uint64 {
return atomic.AddUint64(&r.version, 1)
}
func (r *Room) Current() *Current {
return r.current
}
// Seek will be set to 0
func (r *Room) ChangeCurrentMovie(id uint64) error {
r.UpdateActiveTime()
e, err := r.movies.getMovie(id)
if err != nil {
return err
}
r.current.SetMovie(MovieInfo{
Id: e.Value.id,
Url: e.Value.Url,
Name: e.Value.Name,
Live: e.Value.Live,
Proxy: e.Value.Proxy,
RtmpSource: e.Value.RtmpSource,
Type: e.Value.Type,
Headers: e.Value.Headers,
PullKey: e.Value.PullKey,
CreateAt: e.Value.CreateAt,
LastEditAt: e.Value.LastEditAt,
Creator: e.Value.Creator().Name(),
})
return nil
}
func (r *Room) SetStatus(playing bool, seek, rate, timeDiff float64) Status {
r.UpdateActiveTime()
return r.current.SetStatus(playing, seek, rate, timeDiff)
}
func (r *Room) SetSeekRate(seek, rate, timeDiff float64) Status {
r.UpdateActiveTime()
return r.current.SetSeekRate(seek, rate, timeDiff)
}
func (r *Room) PushBackMovie(movie *Movie) error {
if r.hub.Closed() {
return ErrAlreadyClosed
}
r.UpdateActiveTime()
return r.movies.PushBackMovie(movie)
}
func (r *Room) PushFrontMovie(movie *Movie) error {
r.UpdateActiveTime()
return r.movies.PushFrontMovie(movie)
}
func (r *Room) DelMovie(id ...uint64) error {
r.UpdateActiveTime()
m, err := r.movies.GetAndDelMovie(id...)
if err != nil {
return err
}
return r.closeLive(m)
}
func (r *Room) ClearMovies() (err error) {
r.UpdateActiveTime()
return r.closeLive(r.movies.GetAndClear())
}
func (r *Room) closeLive(m []*Movie) error {
for _, m := range m {
if m.RtmpSource || (m.Proxy && m.Live) {
if err := r.rtmpa.DelChannel(m.PullKey); err != nil {
return err
}
}
}
return nil
}
func (r *Room) SwapMovie(id1, id2 uint64) error {
r.UpdateActiveTime()
return r.movies.SwapMovie(id1, id2)
}
func (r *Room) Broadcast(msg Message, conf ...BroadcastConf) error {
r.UpdateActiveTime()
return r.hub.Broadcast(msg, conf...)
}
func (r *Room) RegClient(user *User, conn *websocket.Conn) (*Client, error) {
r.UpdateActiveTime()
return r.hub.RegClient(user, conn)
}
func (r *Room) UnRegClient(user *User) error {
r.UpdateActiveTime()
return r.hub.UnRegClient(user)
}
func (r *Room) Closed() bool {
return r.hub.Closed()
}
func (r *Room) ClientNum() int64 {
r.UpdateActiveTime()
return r.hub.ClientNum()
}