|
|
- from flask import Flask, request, session, render_template, send_from_directory, abort, redirect
- from flask_sqlalchemy import SQLAlchemy
- from flask_login import LoginManager, login_user, logout_user, login_required, current_user
- from flask_limiter import Limiter
- from flask_limiter.util import get_remote_address
- from mastodon import Mastodon
-
- import os
- import time
- import json
- import requests
- from Tools.RtcTokenBuilder import RtcTokenBuilder, Role_Attendee
- from Tools.RtmTokenBuilder import RtmTokenBuilder, Role_Rtm_User
- from config import C, rds
-
# Flask application, configured from the project-level ``config.C`` object.
app = Flask(__name__)
app.config.from_object('config.C')
app.secret_key = C.session_key

# Flask-Login integration; its loader and unauthorized callbacks are
# registered further down in this module.
login_manager = LoginManager()
login_manager.init_app(app)

db = SQLAlchemy(app)

# Rate limiting keyed on the client's remote address. Arguments are passed by
# keyword because the positional order of ``Limiter``'s parameters differs
# between Flask-Limiter releases (older: ``app`` first; 3.x: ``key_func``
# first), and keywords are accepted by both.
limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=["50 / minute"],
)

# Hash in the ``rds`` store mapping channel id -> JSON-encoded channel info.
RDS_KEY = 'call_channel_list'
-
-
class User(db.Model):
    """Locally stored account record that also serves as the Flask-Login user.

    Rows are created and refreshed by the ``/call/auth`` handler from the
    credentials returned by the Mastodon API.
    """

    # Primary key; set from the remote account's id at first login.
    id = db.Column(db.Integer, primary_key=True)
    # Account name, display name, avatar URL and profile URL as reported by
    # the remote account at the most recent login.
    acct = db.Column(db.String(64))
    disp = db.Column(db.String(64))
    avat = db.Column(db.String(256))
    url = db.Column(db.String(128))

    def __init__(self, id):
        """Create a user with the given primary key; other fields start unset."""
        self.id = id

    def __repr__(self):
        return f"{self.id}@{self.acct}[{self.disp}]"

    # --- Flask-Login user interface -------------------------------------
    # Every stored user counts as active and authenticated; there is no
    # anonymous instance of this class.

    @property
    def is_active(self):
        return True

    @property
    def is_authenticated(self):
        return True

    @property
    def is_anonymous(self):
        return False

    def get_id(self):
        """Return the session identifier for this user as a string.

        Flask-Login requires ``get_id`` to return ``str``; the integer primary
        key is converted here rather than returned as-is.
        """
        return str(self.id)
-
-
# Create any missing tables when the module is imported. Flask-SQLAlchemy 3.x
# requires an active application context for ``create_all``; pushing one is
# harmless on older releases.
with app.app_context():
    db.create_all()
-
-
@login_manager.user_loader
def load_user(id):
    """Return the ``User`` for a Flask-Login session id, or ``None``.

    The id may arrive as a string, so it is normalised to ``int`` before the
    primary-key lookup. A value that cannot be converted yields ``None``
    instead of raising, which Flask-Login treats as "no such user".
    """
    try:
        user_id = int(id)
    except (TypeError, ValueError):
        return None
    return User.query.get(user_id)
-
-
@login_manager.unauthorized_handler
def unauthorized():
    """Redirect a request that failed ``login_required`` to ``C.login_url``."""
    login_url = C.login_url
    return redirect(login_url)
-
-
@app.route('/call/static/<path:path>')
def send_static_file(path):
    """Serve ``path`` from the ``static/`` directory under ``/call/static/``."""
    static_dir = 'static/'
    return send_from_directory(static_dir, path)
-
-
def calc_token(cid, uid):
    """Build an RTC token for user ``uid`` on channel ``cid``.

    The token is built with the attendee role and a privilege expiry five
    hours from now. ``uid`` is reduced modulo ``2**30 - 1`` before it is
    handed to the token builder.
    """
    lifetime_seconds = 5 * 3600
    rtc_uid = uid % (2**30 - 1)
    expires_at = int(time.time()) + lifetime_seconds
    return RtcTokenBuilder.buildTokenWithUid(
        C.app_id,
        C.app_certificate,
        cid,
        rtc_uid,
        Role_Attendee,
        expires_at,
    )
-
-
def calc_rtm_token(username):
    """Build an RTM token for ``username`` with a privilege expiry five hours from now."""
    lifetime_seconds = 5 * 3600
    expires_at = int(time.time()) + lifetime_seconds
    return RtmTokenBuilder.buildToken(
        C.app_id,
        C.app_certificate,
        username,
        Role_Rtm_User,
        expires_at,
    )
-
-
@app.route('/call/')
@login_required
def homepage():
    """Render the channel list, or one channel when ``?cid=`` is supplied.

    Steps:
      1. 404 if a requested ``cid`` is not in the ``RDS_KEY`` hash.
      2. Fetch the live channel list (name -> user count) from the remote
         API; respond 500 if that request fails or reports failure.
      3. For each stored channel, maintain its ``empty_time`` stamp and drop
         channels that have been empty for more than 30 minutes.
      4. Render ``homepage.html``.

    NOTE: the template is rendered with ``**locals()``, so every local name
    in this function is part of the template context. Do not rename or
    remove locals without checking the template.
    """
    me = current_user
    app_id = C.app_id
    rtm_token = calc_rtm_token(me.acct)
    _cid = request.args.get('cid')

    if _cid and not rds.hexists(RDS_KEY, _cid):
        abort(404)

    try:
        # A timeout keeps a stalled upstream from blocking this worker forever.
        r = requests.get(
            C.ago_api + '/dev/v1/channel/' + app_id,
            headers={'Authorization': C.ago_auth, 'Content-Type': 'application/json'},
            timeout=10,
        )  # TODO: add caching
        j = r.json()
    except (requests.RequestException, ValueError):
        # Network failure, timeout, or a body that is not valid JSON.
        return '连接声网出错', 500

    if not isinstance(j, dict) or not j.get('success'):
        return '连接声网出错', 500
    # Tolerate a missing/null "data" or "channels" entry as "no live channels".
    remote_list = (j.get('data') or {}).get('channels') or []

    cnt_dict = {}
    for ch in remote_list:
        cnt_dict[ch['channel_name']] = ch['user_count']

    # NOTE(review): if ``rds`` returns bytes keys from ``hgetall``, they will
    # never match the str keys of ``cnt_dict`` — confirm the client decodes
    # responses.
    chs = [(_cid, rds.hget(RDS_KEY, _cid))] if _cid else rds.hgetall(RDS_KEY).items()
    ch_list = []
    for cid, s_info in chs:
        if s_info is None:
            # The channel was removed between the existence check and the read.
            continue
        info = json.loads(s_info)
        if cid not in cnt_dict:
            if not info['empty_time']:
                # First time the channel is seen empty: start the timer.
                info['empty_time'] = int(time.time())
                rds.hset(RDS_KEY, cid, json.dumps(info))
            elif int(time.time()) - info['empty_time'] > 1800:
                # Empty for over 30 minutes: forget the channel.
                rds.hdel(RDS_KEY, cid)
                continue
        elif info['empty_time']:
            # The channel has users again: clear the stale stamp so that a
            # later empty period is timed from when it actually starts.
            info['empty_time'] = None
            rds.hset(RDS_KEY, cid, json.dumps(info))
        ch_list.append(
            (cid, info['title'], info['is_private'],
             User.query.get(info['creator_id']),
             calc_token(cid, me.id), cnt_dict.get(cid, 0))
        )

    return render_template('homepage.html', **locals())
-
-
@app.route('/call/new', methods=['POST'])
@login_required
def new_channel():
    """Register a channel from the submitted form and redirect to it.

    Form fields:
      title   -- required, at most 50 characters.
      cid     -- optional channel id; defaults to the current timestamp.
      private -- checkbox; the channel is private when its value is ``on``.

    Responds 400 on a missing/over-long title or an over-long ``cid``.
    """
    from urllib.parse import quote

    title = request.form.get('title')
    cid = request.form.get('cid') or str(time.time())
    is_private = request.form.get('private') == 'on'

    if not title or len(title) > 50:
        abort(400)
    # Bound the user-supplied id just like the title.
    # NOTE(review): 64 bytes is assumed to match the RTC provider's channel
    # name limit — confirm against its documentation.
    if len(cid.encode('utf-8')) > 64:
        abort(400)

    # NOTE(review): an existing ``cid`` is overwritten here, so any logged-in
    # user can replace another channel's title/creator — confirm this is
    # intended.
    ch_d = {
        'title': title,
        'is_private': is_private,
        'creator_id': current_user.id,
        'empty_time': None
    }
    rds.hset(RDS_KEY, cid, json.dumps(ch_d))
    # Percent-encode the id so characters such as '&', '#' or spaces cannot
    # break or extend the redirect's query string.
    return redirect('.?cid=' + quote(cid, safe=''))
-
-
@app.route('/call/api/user/<int:uid>')
@login_required
def user_info(uid):
    """Return the ``acct``, ``disp``, ``avat`` and ``url`` fields of user ``uid``.

    The fields are returned as a dict; responds 404 when no user with that
    id is stored.
    """
    user = User.query.get(uid)
    if user is None:
        abort(404)

    return {
        'acct': user.acct,
        'disp': user.disp,
        'avat': user.avat,
        'url': user.url,
    }
-
-
@app.route('/call/auth')
@limiter.limit("10 / hour")
def auth():
    """OAuth callback: exchange ``?code=`` for credentials and log the user in.

    The code is exchanged through the Mastodon client, the account's details
    are fetched, and the matching ``User`` row is created or refreshed before
    a remembered login session is started. Responds 400 when the ``code``
    query parameter is missing.
    """
    code = request.args.get('code')
    if not code:
        # Without a code there is nothing to exchange; fail fast instead of
        # letting the client library raise and produce a 500.
        abort(400)

    client = Mastodon(client_id=C.client_id, client_secret=C.client_secret, api_base_url=C.mas_base_url)
    # The returned access token is kept inside ``client``; it is not needed here.
    client.log_in(code=code, redirect_uri=C.redirect_uri, scopes=['read:accounts'])
    info = client.account_verify_credentials()

    u = User.query.get(info.id)
    if not u:
        u = User(info.id)
        db.session.add(u)

    # Refresh the cached profile fields on every login.
    u.acct = info.acct
    u.disp = info.display_name
    u.avat = info.avatar
    u.url = info.url

    db.session.commit()

    login_user(u, remember=True)

    return redirect('.')
-
-
@app.route('/call/logout')
@login_required
def logout():
    """Log the current user out, then redirect to ``.`` relative to this URL."""
    logout_user()
    target = '.'
    return redirect(target)
-
-
if __name__ == '__main__':
    # Running this file directly starts Flask's built-in server in debug mode.
    run_options = {'debug': True}
    app.run(**run_options)
|