You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

208 lines
5.2 KiB

from flask import Flask, request, session, render_template, send_from_directory, abort, redirect
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from mastodon import Mastodon
import os
import time
import json
import requests
from Tools.RtcTokenBuilder import RtcTokenBuilder, Role_Attendee
from Tools.RtmTokenBuilder import RtmTokenBuilder, Role_Rtm_User
from config import C, rds
# Flask application object; the session-signing key comes from the project config.
app = Flask(__name__)
app.secret_key = C.session_key
# Flask-Login manager.
# NOTE(review): no init_app(app) / LoginManager(app) binding is visible in this
# copy of the file — confirm the manager is attached to `app`.
login_manager = LoginManager()
# Flask-SQLAlchemy handle bound to the app.
db = SQLAlchemy(app)
# Rate limiter with a default limit of 50 requests per minute.
# NOTE(review): the Limiter(...) call below is never closed in this copy of the
# file; its remaining arguments and the closing parenthesis appear to have been
# lost (get_remote_address is imported but not used anywhere visible).
limiter = Limiter(
default_limits=["50 / minute"],
# Key passed to rds.hexists / hget / hgetall / hset / hdel for the channel list.
RDS_KEY = 'call_channel_list'
class User(db.Model):
    """Account row for a user of this application.

    Besides being a SQLAlchemy model, the class supplies the members that
    Flask-Login expects on a user object: ``is_active``,
    ``is_authenticated``, ``is_anonymous`` and ``get_id``.
    """

    # Primary key; passed in by the caller (see ``__init__``), not generated.
    id = db.Column(db.Integer, primary_key=True)
    # Account handle, at most 64 characters.
    acct = db.Column(db.String(64))
    # Display name, at most 64 characters.
    disp = db.Column(db.String(64))
    # Avatar location, at most 256 characters.
    avat = db.Column(db.String(256))
    # Profile URL, at most 128 characters.
    url = db.Column(db.String(128))

    def __init__(self, id):
        """Create a user with primary key *id*; other columns are set later."""
        self.id = id

    def __repr__(self):
        """Return ``<id>@<acct>[<disp>]`` for debugging output."""
        return f"{self.id}@{self.acct}[{self.disp}]"

    # NOTE(review): current Flask-Login documents the three flags below as
    # properties. They are kept as plain methods so that any existing
    # ``user.is_active()`` call sites keep working — confirm which form the
    # installed Flask-Login version expects.
    def is_active(self):
        """Every stored account is treated as active."""
        return True

    def is_authenticated(self):
        """A ``User`` instance always represents an authenticated user."""
        return True

    def is_anonymous(self):
        """A ``User`` instance is never the anonymous user."""
        return False

    def get_id(self):
        """Return the primary key as a string, as Flask-Login requires."""
        return str(self.id)
@login_manager.user_loader
def load_user(id):
    """Return the ``User`` whose primary key is *id*, or ``None``.

    Registered as the Flask-Login user loader so the id stored in the
    session can be turned back into a user object on each request.

    Args:
        id: The user id as stored in the session (normally a string).

    Returns:
        The matching ``User``, or ``None`` when the id is not a valid
        integer or no such row exists.
    """
    try:
        user_id = int(id)
    except (TypeError, ValueError):
        # A user loader must answer "no such user" with None rather than
        # raising, otherwise a stale or tampered cookie breaks every request.
        return None
    return User.query.get(user_id)
@login_manager.unauthorized_handler
def unauthorized():
    """Redirect a visitor who is not logged in to the configured login URL.

    Registered as the Flask-Login unauthorized handler, so it runs whenever
    a ``login_required`` view is requested without a valid session.
    """
    return redirect(C.login_url)
def send_static_file(path):
    """Serve the file *path* from the ``static/`` directory.

    NOTE(review): no ``@app.route`` decorator is visible on this view in
    this copy of the file — confirm how it is registered.
    """
    static_root = 'static/'
    return send_from_directory(static_root, path)
def calc_token(cid, uid):
    """Build an RTC token for channel *cid* and user *uid*.

    The uid is reduced modulo ``2**30 - 1`` before it is handed to the
    token builder, and the privilege expires five hours after the call.

    Returns:
        Whatever ``RtcTokenBuilder.buildTokenWithUid`` returns.
    """
    rtc_uid = uid % (2**30 - 1)
    ttl_seconds = 5 * 3600
    expires_at = int(time.time()) + ttl_seconds
    return RtcTokenBuilder.buildTokenWithUid(
        C.app_id,
        C.app_certificate,
        cid,
        rtc_uid,
        Role_Attendee,
        expires_at,
    )
def calc_rtm_token(username):
    """Build an RTM token for *username* that expires five hours from now.

    Returns:
        Whatever ``RtmTokenBuilder.buildToken`` returns.
    """
    ttl_seconds = 5 * 3600
    expires_at = int(time.time()) + ttl_seconds
    return RtmTokenBuilder.buildToken(
        C.app_id,
        C.app_certificate,
        username,
        Role_Rtm_User,
        expires_at,
    )
# Render the channel list page for the current user.
# Reads the optional `cid` query parameter, fetches the live channel list from
# the remote API, reconciles it with the channels stored under RDS_KEY and
# renders homepage.html.
# NOTE(review): no route / login_required decorator is visible on this view in
# this copy of the file — confirm how it is registered.
def homepage():
# These locals are handed to the template through **locals() at the end, so
# their names are part of the template's contract.
me = current_user
app_id = C.app_id
rtm_token = calc_rtm_token(me.acct)
_cid = request.args.get('cid')
# A cid was requested but is not stored under RDS_KEY.
# NOTE(review): the body of this `if` is missing from this copy of the file
# (`abort` is imported but never used in the visible code) — restore it.
if _cid and not rds.hexists(RDS_KEY, _cid):
# Ask the remote API for the channels that currently exist for this app id.
r = requests.get(
C.ago_api + '/dev/v1/channel/' + app_id,
headers={'Authorization': C.ago_auth, 'Content-Type': 'application/json'}
) # TODO: add caching
j = r.json()
# A response without a truthy 'success' field is reported as a 500.
if not j.get('success'):
return '连接声网出错', 500
remote_list = j.get('data').get('channels')
# Map channel name -> current user count for the channels the API reported.
cnt_dict = {}
for ch in remote_list:
cnt_dict[ch['channel_name']] = ch['user_count']
# Either just the requested channel, or every channel stored under RDS_KEY.
chs = [(_cid, rds.hget(RDS_KEY, _cid))] if _cid else rds.hgetall(RDS_KEY).items()
ch_list = []
for cid, s_info in chs:
# Each stored value is a JSON object with 'title', 'is_private', 'empty_time'.
info = json.loads(s_info)
# Channel is not in the API's list: remember when that was first seen, and
# delete the stored entry once that was more than 1800 seconds ago.
if cid not in cnt_dict:
if not info['empty_time']:
info['empty_time'] = int(time.time())
rds.hset(RDS_KEY, cid, json.dumps(info))
elif int(time.time()) - info['empty_time'] > 1800:
rds.hdel(RDS_KEY, cid)
# NOTE(review): the next two lines are truncated in this copy of the file.
# The opening of the statement (presumably an append to ch_list) and the second
# argument of calc_token are missing, so this is a syntax error as written.
(cid, info['title'], info['is_private'],
calc_token(cid,, cnt_dict.get(cid, 0))
# Every local variable above is passed to the template.
return render_template('homepage.html', **locals())
# Store a new channel entry built from the submitted form, then redirect to
# the index page with the new channel id in the query string.
@app.route('/call/new', methods=['POST'])
def new_channel():
title = request.form.get('title')
# Use the current timestamp string when the form supplies no cid.
cid = request.form.get('cid') or str(time.time())
# True only when the 'private' form field equals 'on'.
is_private = request.form.get('private') == 'on'
# Detects a missing title or one longer than 50 characters.
# NOTE(review): the body of this `if` is missing from this copy of the file,
# so the intended response for an invalid title is unknown — restore it.
if not title or len(title) > 50:
# 'empty_time' starts as None; homepage() fills it in when it first finds the
# channel absent from the remote channel list.
ch_d = {
'title': title,
'is_private': is_private,
'empty_time': None
# NOTE(review): the closing brace of the dict literal above is missing from
# this copy of the file.
rds.hset(RDS_KEY, cid, json.dumps(ch_d))
return redirect('.?cid=' + cid)
# Return the acct / disp / avat / url attributes of the user with id `uid`
# as a dict.
# NOTE(review): no route decorator is visible on this function in this copy of
# the file — confirm how it is registered.
def user_info(uid):
user = User.query.get(uid)
# NOTE(review): the body of this `if` is missing from this copy of the file,
# so the intended response for an unknown uid is unknown — restore it.
if not user:
return {
key: getattr(user, key)
for key in ('acct', 'disp', 'avat', 'url')
# NOTE(review): the closing brace of the dict comprehension above is missing
# from this copy of the file.
# OAuth callback: exchange the `code` query parameter for a Mastodon login,
# copy the account's profile fields onto a User and log that user in.
# Limited to 10 requests per hour.
# NOTE(review): no route decorator is visible on this view in this copy of the
# file — confirm how it is registered.
@limiter.limit("10 / hour")
def auth():
code = request.args.get('code')
client = Mastodon(client_id=C.client_id, client_secret=C.client_secret, api_base_url=C.mas_base_url)
# `token` is not used again in the visible code.
token = client.log_in(code=code, redirect_uri=C.redirect_uri, scopes=['read:accounts'])
info = client.account_verify_credentials()
# NOTE(review): the argument and closing parenthesis of the lookup below are
# missing from this copy of the file (presumably the account id from `info`).
u = User.query.get(
if not u:
# NOTE(review): the constructor argument below is missing as well.
u = User(
# Refresh the stored profile fields from the Mastodon account on every login.
u.acct = info.acct
u.disp = info.display_name
u.avat = info.avatar
u.url = info.url
# NOTE(review): no db.session add/commit is visible here — confirm the user row
# is actually persisted.
login_user(u, remember=True)
return redirect('.')
def logout():
    """End the current user's login session and return to the index page.

    NOTE(review): no route decorator is visible on this view in this copy
    of the file — confirm how it is registered.
    """
    # Without this call the view only redirected and the user stayed logged in.
    logout_user()
    return redirect('.')
# Script entry point.
# NOTE(review): the guarded body (presumably the call that starts the app) is
# missing from this copy of the file; as written this line is a syntax error.
if __name__ == '__main__':