import os
import re
from datetime import datetime

import html2text
from dateutil.tz import tzlocal
from flask import Flask, request, render_template, send_from_directory, abort, redirect
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_sqlalchemy import SQLAlchemy
from mastodon import Mastodon

from config import C
|
|
|
|
# Flask application; its settings are loaded from the ``C`` object of the
# ``config`` module.
app = Flask(__name__)
app.config.from_object('config.C')

# Mastodon API client, authenticated with ``C.token`` against the instance
# at ``C.domain``. It is used to post, read and delete statuses.
th = Mastodon(
    access_token=C.token,
    api_base_url='https://' + C.domain,
)

# Rate limiting keyed by the client's remote address. ``app`` is passed by
# keyword on purpose: Flask-Limiter 3.x made ``key_func`` the first
# positional parameter, so a positional ``app`` raises TypeError there,
# while the keyword form works on both older and newer releases.
limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=["50 / minute"],
)

# HTML-to-text converter used for reply bodies; link targets are dropped.
h2t = html2text.HTML2Text()
h2t.ignore_links = True

# SQLAlchemy handle bound to the application.
db = SQLAlchemy(app)
|
|
|
|
class Candidate(db.Model):
    """A stored self-nomination and the Mastodon status announcing it."""

    # Auto-assigned primary key; the listing view sorts on it, newest first.
    id = db.Column(db.Integer, primary_key=True)

    # Public nomination text (the submit handler caps it at 4000 characters).
    content = db.Column(db.String(4000))

    # Optional private note (the submit handler caps it at 1000 characters).
    private = db.Column(db.String(1000))

    # Optional share link supplied with the nomination.
    # NOTE(review): the submit handler does not cap the link length, so
    # accepted links may exceed 50 characters. Backends that enforce VARCHAR
    # length would reject or truncate them - confirm against the deployed DB.
    url = db.Column(db.String(50))

    # Creation timestamp; the submit handler stores a naive datetime.now().
    time = db.Column(db.DateTime)

    # Id of the Mastodon status posted for this nomination.
    toot = db.Column(db.BigInteger)
|
|
|
|
# Create any missing tables at import time. Flask-SQLAlchemy 3.x requires an
# active application context for this call; pushing one explicitly is also
# valid on 2.x, so the same code runs on both.
with app.app_context():
    db.create_all()
|
|
|
|
@app.route('/ordinary/')
def can_list():
    """Render the paginated candidate list, newest entries first.

    The template also receives the verification questions from
    ``C.verify`` (name, question and hint only) and a flag telling it
    whether the ``key`` query parameter equals ``C.key``.

    Returns:
        The rendered ``list.html`` page.
    """
    supplied_key = request.args.get('key')

    newest_first = Candidate.query.order_by(db.desc('id'))
    page = newest_first.paginate(max_per_page=100)

    # Only name/question/hint are handed to the template; the expected
    # answer of each verification entry is deliberately left out.
    questions = []
    for name, ques, hint, _answer in C.verify:
        questions.append({'name': name, 'ques': ques, 'hint': hint})

    show_private = supplied_key == C.key
    return render_template(
        'list.html',
        pagination=page,
        vs=questions,
        showPrivate=show_private,
    )
|
|
|
|
# Share links accepted in the optional ``url`` form field.
#
# The alternation is grouped so that the ``https://`` prefix applies to both
# hosts, and the pattern is used with ``fullmatch`` so nothing may follow the
# link. The previous ungrouped form ``https://(A)|(B)`` rejected every
# ``https://closed.social/...`` link, accepted scheme-less ones, and allowed
# arbitrary trailing text.
_ALLOWED_URL_RE = re.compile(
    r'https://(?:'
    r'cloud\.tsinghua\.edu\.cn/f/[0-9a-z]+/(?:\?dl=1)?'
    r'|'
    r'closed\.social/safeShare/\d(?:[a-zA-Z]+)?'
    r')'
)


@app.route('/ordinary/new', methods=['POST'])
@limiter.limit("5 / hour; 1 / 2 second")
def new_one():
    """Validate a submitted self-nomination, announce it and store it.

    Form fields read:
        text: required public text, at most 4000 characters.
        privateText: optional private note, at most 1000 characters.
        url: optional share link; must fully match ``_ALLOWED_URL_RE``.
        one field per entry of ``C.verify``, named after the entry.

    Returns:
        A redirect to ``.`` relative to this endpoint. A submission whose
        text is already stored is accepted silently without posting or
        storing it again.

    Aborts:
        401 if any verification answer differs from the expected one.
        422 if the text is missing or too long, the private note is too
        long, or the link is not an allowed share link.
    """
    content = request.form.get('text')
    private = request.form.get('privateText')
    url = request.form.get('url')

    # Every verification question must be answered exactly.
    for name, _ques, _hint, ans in C.verify:
        if request.form.get(name) != ans:
            abort(401)

    if not content or len(content) > 4000:
        abort(422)
    if private and len(private) > 1000:
        abort(422)
    # NOTE(review): the Candidate.url column is declared String(50) but
    # allowed links can be longer; no length cap is enforced here so that
    # currently accepted links keep working - confirm the column size.
    if url and not _ALLOWED_URL_RE.fullmatch(url):
        abort(422)

    # Skip duplicates: identical text is neither re-posted nor stored twice.
    if not Candidate.query.filter_by(content=content).first():
        toot = th.status_post(
            f"叮~ 有新的自荐报名(大家可以直接在此处评论):\n\n{content}",
            visibility='unlisted',
        )

        c = Candidate(
            content=content,
            private=private,
            url=url,
            toot=toot.id,
            time=datetime.now(),
        )
        db.session.add(c)
        db.session.commit()

    return redirect(".")
|
|
|
|
def _profile_username(url):
    """Return the text between the first and second '/@' of *url*, or None."""
    parts = (url or '').split('/@')
    return parts[1] if len(parts) > 1 else None


# The route decorator is outermost and the limit decorator sits directly on
# the view, matching Flask-Limiter's documented order and the other
# rate-limited view in this file.
@app.route('/ordinary/<int:toot>')
@limiter.limit("100 / hour; 2 / second")
def get_replies(toot):
    """Return the replies to a candidate's status, honouring admin deletes.

    Args:
        toot: Id of the Mastodon status announcing the candidate.

    Returns:
        ``{'replies': [...]}`` where each reply has ``disp`` (display name
        or account name), ``url`` (profile URL), ``content`` (plain text
        with ``C.bot_name`` removed) and ``time`` (stringified creation
        time). If an admin has replied with exactly ``删除``, the candidate
        row and the status are deleted and a 404 message is returned.

    Aborts:
        404 if no stored candidate references ``toot``.

    Note:
        This is a GET endpoint with a side effect: the deletion happens the
        first time the thread is fetched after the admin reply.
    """
    c = Candidate.query.filter_by(toot=toot).first()
    if not c:
        abort(404)

    context = th.status_context(toot)
    replies = [
        {
            'disp': (t.account.display_name or t.account.acct),
            'url': t.account.url,
            'content': h2t.handle(t.content).replace(C.bot_name, '').strip(),
            'time': str(t.created_at),
        }
        for t in context.descendants
    ]

    # A reply consisting solely of "删除" from an admin removes the entry.
    # Profile URLs without '/@' are treated as non-admin instead of raising
    # IndexError and turning the whole thread into a 500.
    # NOTE(review): only the username part of the profile URL is compared,
    # so a same-named account on another instance would also match -
    # confirm whether C.admins should be checked against full addresses.
    def _is_admin_delete(reply):
        if reply['content'] != '删除':
            return False
        username = _profile_username(reply['url'])
        return username is not None and username in C.admins

    if any(_is_admin_delete(r) for r in replies):
        db.session.delete(c)
        db.session.commit()
        th.status_delete(toot)
        return '该内容已被删除', 404

    return {'replies': replies}
|
|
|
|
if __name__ == '__main__':
    # Development entry point. The Werkzeug debugger allows arbitrary code
    # execution from the browser, so debug mode is opt-in through the
    # FLASK_DEBUG environment variable instead of being always enabled.
    debug_requested = os.environ.get('FLASK_DEBUG', '').strip().lower()
    app.run(debug=debug_requested in ('1', 'true', 'yes', 'on'))
|
|
|