"""Flask app for a self-nomination board backed by Mastodon.

Visitors submit an entry after answering the verification questions listed
in ``config.C.verify``.  Every new entry is announced as a Mastodon status;
replies to that status are served as the entry's comments, and a visitor can
"like" an entry once per browser session.
"""

import random
import re
from datetime import datetime

import html2text
from dateutil.tz import tzlocal
from flask import (
    Flask,
    abort,
    redirect,
    render_template,
    request,
    send_from_directory,
    session,
)
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_sqlalchemy import SQLAlchemy
from mastodon import Mastodon

from config import C

app = Flask(__name__)
app.config.from_object('config.C')
app.secret_key = C.session_key

# Mastodon client used to announce entries and to read their reply threads.
th = Mastodon(
    access_token=C.token,
    api_base_url='https://' + C.domain,
)

limiter = Limiter(
    app,
    key_func=get_remote_address,
    default_limits=["50 / minute"],
)

# Converts the HTML body of a Mastodon reply to plain text.
h2t = html2text.HTML2Text()
h2t.ignore_links = True

db = SQLAlchemy(app)

# Accepted attachment links.  Compiled once; raw string so that ``\.``,
# ``\?`` and ``\d`` are regex escapes rather than invalid string escapes.
# NOTE(review): used with ``match``, so only the start of the URL is
# anchored and trailing characters are accepted — confirm that is intended.
_URL_PATTERN = re.compile(
    r'https://(cloud\.tsinghua\.edu\.cn/f/[0-9a-z]+/(\?dl=1)?|closed\.social/safeShare/\d([a-zA-Z]+)?)'
)


class Candidate(db.Model):
    """A submitted entry together with the Mastodon status announcing it."""

    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.String(4000))   # public text, posted to Mastodon
    private = db.Column(db.String(1000))   # optional text from the 'privateText' field
    url = db.Column(db.String(50))         # optional link matching _URL_PATTERN
    time = db.Column(db.DateTime)          # creation time (naive datetime.now())
    toot = db.Column(db.BigInteger)        # id of the announcing Mastodon status
    likeNum = db.Column(db.Integer, default=0)


class Like(db.Model):
    """One like: session uid ``uid`` liked candidate ``cid``."""

    id = db.Column(db.Integer, primary_key=True)
    cid = db.Column(db.Integer)
    uid = db.Column(db.Integer)


# An explicit application context keeps table creation working on
# Flask-SQLAlchemy releases that refuse to run it without one.
with app.app_context():
    db.create_all()


def _account_name(profile_url):
    """Return the part of *profile_url* after the first ``/@``, or None."""
    parts = (profile_url or '').split('/@')
    return parts[1] if len(parts) > 1 else None


@app.route('/img/<path:path>')
def send_img(path):
    """Serve the file *path* from ``static/img``."""
    return send_from_directory('static/img', path)


@app.route('/ordinary/set_session')
@limiter.limit("3 / hour; 1 / 5 minute")
def set_session():
    """Give the visitor a random session uid (if missing) and go back to the list."""
    if 'uid' not in session:
        session['uid'] = random.randint(0, 2000000000)
    return redirect('.')


@app.route('/ordinary/')
def can_list():
    """Render the paginated candidate list.

    Query parameters:
        key: when equal to ``C.key`` the template is told to show private text.
        sort_by: ``'likeNum'`` sorts by like count; anything else by newest id.

    Visitors without a session uid are redirected to ``set_session`` first.
    """
    key = request.args.get('key')
    sort_by = request.args.get('sort_by', 'time')

    if 'uid' not in session:
        return redirect('set_session')
    uid = session['uid']

    q = Candidate.query
    if sort_by == 'likeNum':
        q = q.order_by(db.desc('likeNum'))
    else:
        q = q.order_by(db.desc('id'))
    pag = q.paginate(max_per_page=100)

    # Resolve "already liked" for the whole page with one query, and hand the
    # template a real list (a bare ``map`` object can be iterated only once).
    candidates = list(pag.items)
    page_ids = [c.id for c in candidates]
    liked_ids = set()
    if page_ids:
        liked_ids = {
            like_row.cid
            for like_row in Like.query.filter(
                Like.uid == uid, Like.cid.in_(page_ids)
            )
        }
    for c in candidates:
        c.liked = 'liked' if c.id in liked_ids else 'like'
    pag.items = candidates

    vs = [
        {'name': name, 'ques': ques, 'hint': hint}
        for name, ques, hint, ans in C.verify
    ]

    return render_template(
        'list.html',
        pagination=pag,
        vs=vs,
        showPrivate=(key == C.key),
        sort_by=sort_by,
        key=key,
    )


@app.route('/ordinary/new', methods=['POST'])
@limiter.limit("5 / hour; 1 / 2 second")
def new_one():
    """Validate a submission, announce it on Mastodon and store it.

    Responds 401 when a verification answer is wrong and 422 when the text,
    private text or URL fails validation.  A submission whose content already
    exists is silently ignored.  Always ends by redirecting to the list.
    """
    content = request.form.get('text')
    private = request.form.get('privateText')
    url = request.form.get('url')

    for name, ques, hint, ans in C.verify:
        if request.form.get(name) != ans:
            # NOTE(review): this body reads as if HTML markup (heading / back
            # link) was stripped from it — confirm against the deployed copy.
            return ''' 错误

验证问题回答错误

回退 ''', 401

    if not content or len(content) > 4000:
        abort(422)
    if private and len(private) > 1000:
        abort(422)
    if url and not _URL_PATTERN.match(url):
        abort(422)

    if not Candidate.query.filter_by(content=content).first():
        toot = th.status_post(
            f"有新的自荐报名(大家可以直接在此处评论):\n\n{content}",
            visibility=C.visibility,
        )
        c = Candidate(
            content=content,
            private=private,
            url=url,
            toot=toot.id,
            time=datetime.now(),
        )
        db.session.add(c)
        db.session.commit()

    return redirect(".")


@app.route('/ordinary/<int:toot>/comments')
@limiter.limit("100 / hour; 2 / second")
def get_comments(toot):
    """Return the replies to status *toot* as ``{'replies': [...]}``.

    Responds 404 when no candidate is linked to *toot*.  If any reply is
    exactly ``删除`` and was written by an account listed in ``C.admins``, the
    candidate and its status are deleted and a 404 is returned instead.
    """
    c = Candidate.query.filter_by(toot=toot).first()
    if not c:
        abort(404)

    context = th.status_context(toot)
    replies = [
        {
            'disp': (t.account.display_name or t.account.acct),
            'url': t.account.url,
            'content': h2t.handle(t.content).replace(C.bot_name, '').strip(),
            'time': str(t.created_at),
        }
        for t in context.descendants
    ]

    def is_admin_delete(reply):
        # A profile URL without '/@' simply never matches an admin.
        name = _account_name(reply['url'])
        return (
            reply['content'] == '删除'
            and name is not None
            and name in C.admins
        )

    if any(is_admin_delete(r) for r in replies):
        db.session.delete(c)
        db.session.commit()
        th.status_delete(toot)
        return '该内容已被删除', 404

    return {'replies': replies}


@app.route('/ordinary/<int:toot>/like', methods=['POST'])
@limiter.limit("100 / hour")
def like(toot):
    """Record a like from the current session and return the new like count.

    Responds 404 for an unknown *toot*, 401 when the session has no uid and
    403 when this session already liked the candidate.
    """
    c = Candidate.query.filter_by(toot=toot).first()
    if not c:
        abort(404)

    # ``.get`` so a missing session yields 401 instead of a KeyError, and an
    # ``is None`` test because 0 is a uid that set_session can hand out.
    uid = session.get('uid')
    if uid is None:
        abort(401)

    if Like.query.filter_by(uid=uid, cid=c.id).first():
        return '点赞过了', 403

    new_like = Like(uid=uid, cid=c.id)
    c.likeNum = (c.likeNum or 0) + 1
    db.session.add(new_like)
    db.session.commit()

    return str(c.likeNum)


if __name__ == '__main__':
    app.run(debug=True)