from flask import Flask, request, render_template, send_from_directory, abort, redirect, session
from flask_sqlalchemy import SQLAlchemy
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from mastodon import Mastodon
import re
import random
from datetime import datetime
from dateutil.tz import tzlocal
import html2text
from config import C
WRONG_ANS_HTML = '''
错误
验证问题回答错误
回退
'''
app = Flask(__name__)
app.config.from_object('config.C')
app.secret_key = C.session_key
th = Mastodon(
access_token = C.token,
api_base_url = 'https://' + C.domain
)
limiter = Limiter(
app,
key_func=get_remote_address,
default_limits=["50 / minute"],
)
h2t = html2text.HTML2Text()
h2t.ignore_links = True
db = SQLAlchemy(app)
class Candidate(db.Model):
id = db.Column(db.Integer, primary_key=True)
content = db.Column(db.String(4000))
private = db.Column(db.String(1000))
url = db.Column(db.String(50))
time = db.Column(db.DateTime)
toot = db.Column(db.BigInteger)
likeNum = db.Column(db.Integer, default=0)
# always increment 1 for the id of a new record
__table_args__ = { 'sqlite_autoincrement': True }
class Like(db.Model):
id = db.Column(db.Integer, primary_key=True)
cid = db.Column(db.Integer) # (member) id of class Candidate
uid = db.Column(db.Integer) # id of user
db.create_all()
@app.route('/img/')
def send_img(path):
return send_from_directory('static/img', path)
@app.route('/ordinary/set_session')
@limiter.limit("2 / hour; 1 / 5 minute")
def set_session():
if 'uid' not in session:
session['uid'] = random.randint(0, 2000000000)
session.permanent = True
return redirect('.')
@app.route('/ordinary/')
def can_list():
key = request.args.get('key')
sort_by = request.args.get('sort_by', 'time')
if 'uid' not in session:
return redirect('set_session')
uid = session['uid']
q = Candidate.query
q = q.order_by(db.desc('likeNum')) if sort_by=='likeNum' else q.order_by(db.desc('id'))
pag = q.paginate(max_per_page=100)
def check_like(c):
c.liked = 'liked' if Like.query.filter_by(uid=uid, cid=c.id).count() else 'like'
return c
pag.items = map(check_like, pag.items)
vs = [{
'name': name,
'ques': ques,
'hint': hint
} for name, ques, hint, ans in C.verify
]
return render_template('list.html', pagination=pag, vs=vs, showPrivate=(key==C.key), sort_by=sort_by, key=key)
@app.route('/ordinary/new', methods=['POST'])
@limiter.limit("5 / hour; 1 / 2 second")
def new_one():
content = request.form.get('text')
private = request.form.get('privateText')
url = request.form.get('url')
for name, ques, hint, ans in C.verify:
if request.form.get(name) != ans:
return WRONG_ANS_HTML, 401
if not content or len(content)>4000: abort(422)
if private and len(private)>1000: abort(422)
if url and not re.match('https://(cloud\.tsinghua\.edu\.cn/f/[0-9a-z]+/(\?dl=1)?|closed\.social/safeShare/\d([a-zA-Z]+)?)', url): abort(422)
if not Candidate.query.filter_by(content=content).first():
toot = th.status_post(
f"有新的自荐报名(大家可以直接在此处评论):\n\n{content}",
visibility=C.visibility
)
c = Candidate(
content=content,
private=private,
url=url,
toot=toot.id,
time=datetime.now()
)
db.session.add(c)
db.session.commit()
return redirect(".")
@app.route('/ordinary/judge', methods=['POST'])
@limiter.limit("10 / hour; 1 / 2 second")
def judge():
group = request.form.get('groupType')
for name, ques, hint, ans in C.verify:
if request.form.get(name) != ans:
return WRONG_ANS_HTML, 401
return redirect(C.groups.get(group))
@limiter.limit("100 / hour; 2 / second")
@app.route('/ordinary//comments')
def get_comments(toot):
c = Candidate.query.filter_by(toot=toot).first()
if not c:
abort(404)
context = th.status_context(toot)
replies = [
{
'disp': (t.account.display_name or t.account.acct),
'url': t.account.url,
'content': h2t.handle(t.content).replace(C.bot_name,'').strip(),
'time': str(t.created_at)
}
for t in context.descendants
]
d = list(filter(
lambda r: r['content'] == '删除' and r['url'].split('/@')[1] in C.admins,
replies
))
if d:
db.session.delete(c)
db.session.commit()
th.status_delete(toot)
return '该内容已被删除', 404
return {'replies': replies}
@limiter.limit("100 / hour")
@app.route('/ordinary//like', methods=['POST'])
def like(toot):
c = Candidate.query.filter_by(toot=toot).first()
if not c:
abort(404)
uid = session['uid']
if not uid: abort(401)
if Like.query.filter_by(uid=uid, cid=c.id).first():
return '点赞过了', 403
l = Like(uid=uid, cid=c.id)
c.likeNum += 1
db.session.add(l)
db.session.commit()
return str(c.likeNum)
if __name__ == '__main__':
app.run(debug=True)