"""Flask application that collects candidate submissions and mirrors them to Mastodon.

Visitors answer the verification questions configured in ``config.C.verify``;
once verified they may submit an entry (which is also posted as a toot), like
existing entries, and be redirected to a group link.  Replies to the toot are
exposed as comments.
"""

from functools import wraps
import random
import re
import secrets
from datetime import datetime

from dateutil.tz import tzlocal
from flask import (
    Flask,
    request,
    render_template,
    send_from_directory,
    abort,
    redirect,
    session,
)
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_sqlalchemy import SQLAlchemy
import html2text
from mastodon import Mastodon

from config import C

# Body returned (with HTTP 401) when a verification answer is wrong.
# NOTE(review): this literal contains no markup despite its name; confirm
# against the upstream file whether HTML tags were lost.
WRONG_ANS_HTML = ''' 错误

验证问题回答错误

回退 '''

# Upper bound (inclusive) for the random per-session user id.
_MAX_UID = 2000000000

# Attachment URLs must start with one of the two whitelisted share-link forms.
# NOTE(review): used with ``match`` (prefix check only), so trailing text after
# a valid prefix is still accepted -- confirm whether that is intended.
_ALLOWED_URL_RE = re.compile(
    r'https://(cloud\.tsinghua\.edu\.cn/f/[0-9a-z]+/(\?dl=1)?|closed\.social/safeShare/\d([a-zA-Z]+)?)'
)

app = Flask(__name__)
app.config.from_object('config.C')
app.secret_key = C.session_key

# Mastodon client used to post, read and delete toots.
th = Mastodon(
    access_token=C.token,
    api_base_url='https://' + C.domain,
)

# Keyword arguments only: the positional order of ``Limiter.__init__`` differs
# between Flask-Limiter major versions.
limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=["50 / minute"],
)

h2t = html2text.HTML2Text()
h2t.ignore_links = True

db = SQLAlchemy(app)


class Candidate(db.Model):
    """A submitted entry together with the id of the toot announcing it."""

    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.String(4000))
    private = db.Column(db.String(1000))
    url = db.Column(db.String(50))
    time = db.Column(db.DateTime)
    toot = db.Column(db.BigInteger)
    likeNum = db.Column(db.Integer, default=0)

    # always increment 1 for the id of a new record
    __table_args__ = {
        'sqlite_autoincrement': True
    }


class Like(db.Model):
    """One like given by a session user to a candidate."""

    id = db.Column(db.Integer, primary_key=True)
    cid = db.Column(db.Integer)  # (member) id of class Candidate
    uid = db.Column(db.Integer)  # id of user


# Newer Flask-SQLAlchemy releases require an application context here; older
# ones accept it as well.
with app.app_context():
    db.create_all()


def need_verify(func):
    """Decorate a view so that it aborts with 403 unless the session is verified."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        if not session.get('verified'):
            abort(403)
        return func(*args, **kwargs)
    return wrapper


def _admin_requested_delete(reply):
    """Return True if *reply* is the delete command ('删除') sent by an admin.

    The author's user name is taken from the part of the account URL after
    the first ``/@``; URLs without that marker never count as admin.
    NOTE(review): only the user name is compared, not the instance domain --
    confirm that a same-named account on another instance cannot reply here.
    """
    if reply['content'] != '删除':
        return False
    parts = (reply['url'] or '').split('/@')
    return len(parts) > 1 and parts[1] in C.admins


@app.route('/img/<path:path>')
def send_img(path):
    """Serve a file from ``static/img``."""
    return send_from_directory('static/img', path)


@app.route('/ordinary/verify', methods=['POST'])
@limiter.limit("3 / hour")
def verify():
    """Check the submitted answers and mark the session as verified.

    Returns ``WRONG_ANS_HTML`` with status 401 on the first wrong answer;
    otherwise stores the verified flag (and a user id if the session has
    none) and redirects to the list page.
    """
    for name, _ques, _hint, ans in C.verify:
        if request.form.get(name) != ans:
            return WRONG_ANS_HTML, 401

    session.permanent = True
    session['verified'] = True
    # The id must be truthy: ``like`` treats a falsy uid as "not logged in".
    if not session.get('uid'):
        session['uid'] = secrets.randbelow(_MAX_UID) + 1
    return redirect('.')


@app.route('/ordinary/')
def can_list():
    """Render the paginated candidate list.

    Query parameters: ``sort_by`` (``likeNum`` sorts by likes, anything else
    by newest id) and ``key`` (private text is shown when it equals ``C.key``).
    """
    key = request.args.get('key')
    sort_by = request.args.get('sort_by', 'time')
    uid = session.get('uid')

    q = Candidate.query
    if sort_by == 'likeNum':
        q = q.order_by(db.desc('likeNum'))
    else:
        q = q.order_by(db.desc('id'))
    pag = q.paginate(max_per_page=100)

    # Fetch this user's likes for the whole page in one query instead of
    # issuing a COUNT per candidate.
    items = list(pag.items)
    liked_cids = set()
    if uid is not None and items:
        liked_rows = Like.query.filter(
            Like.uid == uid,
            Like.cid.in_([c.id for c in items]),
        ).all()
        liked_cids = {row.cid for row in liked_rows}
    for c in items:
        c.liked = 'liked' if c.id in liked_cids else 'like'
    # A list (not a one-shot iterator) so the template may iterate repeatedly.
    pag.items = items

    # The expected answers are deliberately not passed to the template.
    vs = [
        {'name': name, 'ques': ques, 'hint': hint}
        for name, ques, hint, _ans in C.verify
    ]

    return render_template(
        'list.html',
        pagination=pag,
        vs=vs,
        verified=session.get('verified'),
        showPrivate=(key == C.key),
        sort_by=sort_by,
        key=key,
        base_toot_url='https://%s/web/statuses/' % C.domain,
    )


@app.route('/ordinary/new', methods=['POST'])
@limiter.limit("5 / hour; 1 / 2 second")
@need_verify
def new_one():
    """Validate a submission, post it as a toot and store it.

    Aborts with 422 when the text is missing or too long, the private text is
    too long, or the URL does not start with a whitelisted form.  A submission
    whose content already exists is ignored.  Always redirects to the list.
    """
    content = request.form.get('text')
    private = request.form.get('privateText')
    url = request.form.get('url')

    if not content or len(content) > 4000:
        abort(422)
    if private and len(private) > 1000:
        abort(422)
    if url and not _ALLOWED_URL_RE.match(url):
        abort(422)

    if not Candidate.query.filter_by(content=content).first():
        toot = th.status_post(
            f"有新的自荐报名(大家可以直接在此处评论):\n\n{content}",
            visibility=C.visibility
        )
        c = Candidate(
            content=content,
            private=private,
            url=url,
            toot=toot.id,
            time=datetime.now(),
        )
        db.session.add(c)
        db.session.commit()

    return redirect(".")


@app.route('/ordinary/judge', methods=['POST'])
@limiter.limit("10 / hour; 1 / 2 second")
@need_verify
def judge():
    """Redirect to the link configured in ``C.groups`` for the chosen group.

    Aborts with 422 when ``groupType`` is missing or unknown, rather than
    attempting to redirect to ``None``.
    """
    group = request.form.get('groupType')
    target = C.groups.get(group)
    if not target:
        abort(422)
    return redirect(target)


# ``app.route`` must be the outermost decorator so that the registered view is
# the rate-limited one.
@app.route('/ordinary/<int:toot>/comments')
@limiter.limit("100 / hour; 2 / second")
def get_comments(toot):
    """Return the replies to a candidate's toot as ``{'replies': [...]}``.

    Aborts with 404 when no candidate owns *toot*.  If any reply is an admin's
    delete command, the candidate and its toot are deleted and a 404 response
    is returned instead.
    """
    c = Candidate.query.filter_by(toot=toot).first()
    if not c:
        abort(404)

    context = th.status_context(toot)
    replies = [
        {
            'disp': (t.account.display_name or t.account.acct),
            'url': t.account.url,
            'content': h2t.handle(t.content).replace(C.bot_name, '').strip(),
            'time': str(t.created_at)
        }
        for t in context.descendants
    ]

    if any(_admin_requested_delete(r) for r in replies):
        db.session.delete(c)
        db.session.commit()
        th.status_delete(toot)
        return '该内容已被删除', 404

    return {'replies': replies}


@app.route('/ordinary/<int:toot>/like', methods=['POST'])
@limiter.limit("100 / hour")
@need_verify
def like(toot):
    """Record a like from the session user and return the new like count.

    Aborts with 404 for an unknown toot and 401 when the session has no user
    id; returns 403 when this user already liked the candidate.
    """
    c = Candidate.query.filter_by(toot=toot).first()
    if not c:
        abort(404)

    # ``.get`` so that a missing id yields 401 instead of a KeyError (500).
    uid = session.get('uid')
    if not uid:
        abort(401)

    if Like.query.filter_by(uid=uid, cid=c.id).first():
        return '点赞过了', 403

    # Tolerate rows whose counter was stored as NULL.
    c.likeNum = (c.likeNum or 0) + 1
    db.session.add(Like(uid=uid, cid=c.id))
    db.session.commit()

    return str(c.likeNum)


if __name__ == '__main__':
    app.run(debug=True)