|
|
import hmac
import re
from datetime import datetime

import html2text
from dateutil.tz import tzlocal
from flask import Flask, request, render_template, send_from_directory, abort, redirect
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_sqlalchemy import SQLAlchemy
from mastodon import Mastodon

from config import C
-
# Flask application; its settings are loaded from the `C` object in config.py.
app = Flask(__name__)
app.config.from_object('config.C')

# Mastodon API client, authenticated with the token and instance domain
# taken from the configuration.
th = Mastodon(
    access_token=C.token,
    api_base_url='https://' + C.domain,
)

# Rate limiting keyed on the client's remote address. Every argument is
# passed by keyword because the positional order of Limiter's constructor
# differs between Flask-Limiter releases (older: app first; 3.x: key_func
# first with app keyword-only); the keyword form works on both.
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    default_limits=["50 / minute"],
)

# HTML -> text converter; link targets are omitted from the converted text.
h2t = html2text.HTML2Text()
h2t.ignore_links = True

db = SQLAlchemy(app)
-
class Candidate(db.Model):
    """One submitted entry, stored together with the id of its Mastodon post."""

    # Auto-assigned primary key.
    id = db.Column(db.Integer, primary_key=True)

    # Public text of the submission (column width 4000).
    content = db.Column(db.String(4000))

    # Optional text kept separate from the public content (column width 1000).
    private = db.Column(db.String(1000))

    # Optional link supplied with the submission.
    url = db.Column(db.String(50))

    # Timestamp recorded for the submission.
    time = db.Column(db.DateTime)

    # Numeric id of the associated Mastodon status.
    toot = db.Column(db.BigInteger)
-
# Create any missing tables at import time. Newer Flask-SQLAlchemy releases
# require an active application context for this call; entering one
# explicitly is harmless on older releases, so it works on both.
with app.app_context():
    db.create_all()
-
# NOTE(review): this view's name shadows the builtin `list` for the rest of
# the module. It is kept because the function name is also the Flask endpoint
# name; code below this definition must not rely on `list(...)` being the
# builtin.
@app.route('/ordinary/')
def list():
    """Render the paginated candidate listing, newest first.

    Query parameters:
        key: optional secret; when it equals ``C.key`` the template is told
            to show the private text of each entry.

    Returns:
        The rendered ``list.html`` template, given the pagination object,
        the verification questions (without their answers) and the
        ``showPrivate`` flag.
    """
    supplied = request.args.get('key')
    expected = C.key

    # Constant-time comparison so response timing does not leak how much of
    # the key was guessed. A missing or empty key on either side never
    # unlocks private data (a plain `==` would treat None == None as a
    # match when no key is configured).
    show_private = bool(
        isinstance(supplied, str)
        and isinstance(expected, str)
        and expected
        and hmac.compare_digest(
            supplied.encode('utf-8'), expected.encode('utf-8')
        )
    )

    pag = Candidate.query.order_by(db.desc('id')).paginate(max_per_page=100)

    # Expose only the question metadata to the template; the expected
    # answers stay on the server.
    vs = [
        {'name': name, 'ques': ques, 'hint': hint}
        for name, ques, hint, _ans in C.verify
    ]

    return render_template(
        'list.html', pagination=pag, vs=vs, showPrivate=show_private
    )
-
# Links accepted with a submission: a cloud.tsinghua.edu.cn share link or a
# closed.social safeShare link. The two hosts sit inside ONE group so that the
# `https://` prefix is required for both; written as `https://(A)|(B)` the
# second alternative would match without any scheme. The pattern is a raw
# string so `\.`, `\?` and `\d` reach the regex engine unchanged. It is
# applied with `match`, i.e. anchored at the start only.
_ALLOWED_URL = re.compile(
    r'https://(?:'
    r'cloud\.tsinghua\.edu\.cn/f/[0-9a-z]+/(?:\?dl=1)?'
    r'|'
    r'closed\.social/safeShare/\d(?:[a-zA-Z]+)?'
    r')'
)


@app.route('/ordinary/new', methods=['POST'])
@limiter.limit("5 / hour; 1 / 2 second")
def new_one():
    """Accept a new submission, announce it on Mastodon and store it.

    Form fields:
        text: public content, required, at most 4000 characters.
        privateText: optional private text, at most 1000 characters.
        url: optional link; must start with an allowed share-link prefix.
        <name>: one field per entry of ``C.verify``, holding the answer to
            that verification question.

    Aborts with 401 when any verification answer is wrong and with 422 when
    a field fails validation. Content that is already stored is not posted
    or saved again.

    Returns:
        A redirect to ``"."`` in every non-aborting case.
    """
    content = request.form.get('text')
    private = request.form.get('privateText')
    url = request.form.get('url')

    # Every verification question must be answered exactly.
    for name, _ques, _hint, ans in C.verify:
        if request.form.get(name) != ans:
            abort(401)

    if not content or len(content) > 4000:
        abort(422)
    if private and len(private) > 1000:
        abort(422)
    if url and not _ALLOWED_URL.match(url):
        abort(422)

    # Identical content was submitted before: nothing to post or store.
    if Candidate.query.filter_by(content=content).first():
        return redirect(".")

    toot = th.status_post(
        f"叮~ 有新的自荐报名(大家可以直接在此处评论):\n\n{content}",
        visibility='unlisted'
    )

    c = Candidate(
        content=content,
        private=private,
        url=url,
        toot=toot.id,
        time=datetime.now()
    )
    db.session.add(c)
    db.session.commit()

    return redirect(".")
-
# The route decorator is outermost so that the function Flask registers is
# the rate-limited one (same order as the other limited view in this file).
@app.route('/ordinary/<int:toot>')
@limiter.limit("100 / hour; 2 / second")
def get_replies(toot):
    """Return the Mastodon replies to a stored submission.

    Args:
        toot: id of the Mastodon status, taken from the URL.

    Returns:
        ``{'replies': [...]}`` where each reply has ``disp``, ``url``,
        ``content`` and ``time`` keys. If one of the replies is the delete
        command ('删除') from an account listed in ``C.admins``, the database
        row and the status are deleted and a 404 response with a notice is
        returned instead.

    Aborts with 404 when no stored submission has this status id.
    """
    candidate = Candidate.query.filter_by(toot=toot).first()
    if not candidate:
        abort(404)

    context = th.status_context(toot)
    replies = [
        {
            'disp': (t.account.display_name or t.account.acct),
            'url': t.account.url,
            # Plain text of the reply with the bot's own name removed.
            'content': h2t.handle(t.content).replace(C.bot_name, '').strip(),
            'time': str(t.created_at)
        }
        for t in context.descendants
    ]

    def _is_admin_delete(reply):
        """True when the reply is the delete command sent by an admin."""
        if reply['content'] != '删除':
            return False
        # NOTE(review): the admin check uses only the segment after '/@' in
        # the account URL, without the host -- confirm that a same-named
        # account on another instance cannot pass it.
        parts = (reply['url'] or '').split('/@')
        # A URL without '/@' has no such segment and never counts as admin.
        return len(parts) > 1 and parts[1] in C.admins

    # `any` over a generator instead of list(filter(...)): inside this module
    # the name `list` refers to the listing view function, not the builtin.
    if any(_is_admin_delete(r) for r in replies):
        db.session.delete(candidate)
        db.session.commit()
        th.status_delete(toot)
        return '该内容已被删除', 404

    return {'replies': replies}
-
# Script entry point: when this file is executed directly, start Flask's
# built-in server with debug mode switched on.
if __name__ == "__main__":
    app.run(debug=True)
-
|