- # -*- coding: utf-8 -*-
- from functools import wraps
- from flask import Flask, request, render_template, send_from_directory, abort, redirect, session
- from flask_sqlalchemy import SQLAlchemy
- from flask_limiter import Limiter
- from flask_limiter.util import get_remote_address
- from mastodon import Mastodon
- import re
- import random
- from datetime import datetime
- from dateutil.tz import tzlocal
- import html2text
- from config import C
- WRONG_ANS_HTML = '''<html>
- <head>
- <meta charset='UTF-8'>
- <meta name='viewport' content='width=device-width initial-scale=1'>
- <title>错误</title>
- </head>
- <body>
- <h1>验证问题回答错误</h1>
- <a href="##" onclick="window.history.back()">回退</a>
- </body>
- </html>
- '''
- app = Flask(__name__)
- app.config.from_object('config.C')
- app.secret_key = C.session_key
- th = Mastodon(
- access_token = C.token,
- api_base_url = 'https://' + C.domain
- )
- limiter = Limiter(
- app,
- key_func=get_remote_address,
- default_limits=["50 / minute"],
- )
- h2t = html2text.HTML2Text()
- h2t.ignore_links = True
- db = SQLAlchemy(app)
- class Candidate(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- content = db.Column(db.String(4000))
- private = db.Column(db.String(1000))
- url = db.Column(db.String(50))
- time = db.Column(db.DateTime)
- toot = db.Column(db.BigInteger)
- likeNum = db.Column(db.Integer, default=0)
- # always increment 1 for the id of a new record
- __table_args__ = { 'sqlite_autoincrement': True }
- class Like(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- cid = db.Column(db.Integer) # (member) id of class Candidate
- uid = db.Column(db.Integer) # id of user
- db.create_all()
- def need_verify(func):
- @wraps(func)
- def warp(*args, **kwargs):
- print(session)
- if not session.get('verified'):
- abort(403)
- return func(*args, **kwargs)
- return warp
- @app.route('/img/<path:path>')
- def send_img(path):
- return send_from_directory('static/img', path)
- @app.route('/ordinary/verify', methods=['POST'])
- @limiter.limit("3 / hour")
- def verify():
- for name, ques, hint, ans in C.verify:
- if request.form.get(name) != ans:
- return WRONG_ANS_HTML, 401
- session['verified'] = True
- return redirect('.')
- @app.route('/ordinary/set_session')
- @limiter.limit("2 / hour; 1 / 5 minute")
- def set_session():
- if 'uid' not in session:
- session['uid'] = random.randint(0, 2000000000)
- session.permanent = True
- return redirect('.')
- @app.route('/ordinary/')
- def can_list():
- key = request.args.get('key')
- sort_by = request.args.get('sort_by', 'time')
- if 'uid' not in session:
- return redirect('set_session')
- uid = session.get('uid')
- q = Candidate.query
- q = q.order_by(db.desc('likeNum')) if sort_by=='likeNum' else q.order_by(db.desc('id'))
- pag = q.paginate(max_per_page=100)
- def check_like(c):
- c.liked = 'liked' if Like.query.filter_by(uid=uid, cid=c.id).count() else 'like'
- return c
- pag.items = map(check_like, pag.items)
- vs = [{
- 'name': name,
- 'ques': ques,
- 'hint': hint
- } for name, ques, hint, ans in C.verify
- ]
- return render_template('list.html', pagination=pag, vs=vs, verified=session.get('verified'), showPrivate=(key==C.key), sort_by=sort_by, key=key, base_toot_url='https://%s/web/statuses/' % C.domain, step2=C.step2, text1=C.text1, text2=C.text2)
- @app.route('/ordinary/new', methods=['POST'])
- @limiter.limit("5 / hour; 1 / 2 second")
- @need_verify
- def new_one():
- content = request.form.get('text')
- private = request.form.get('privateText')
- url = request.form.get('url')
- if not content or len(content)>4000: abort(422)
- if private and len(private)>1000: abort(422)
- if url and not re.match('https://(cloud\.tsinghua\.edu\.cn/f/[0-9a-z]+/(\?dl=1)?|closed\.social/safeShare/\d([a-zA-Z]+)?)', url): abort(422)
- if not Candidate.query.filter_by(content=content).first():
- toot = th.status_post(
- f"有新的自荐报名(大家可以直接在此处评论):\n\n{content}",
- visibility=C.visibility
- )
- c = Candidate(
- content=content,
- private=private,
- url=url,
- toot=toot.id,
- time=datetime.now()
- )
- db.session.add(c)
- db.session.commit()
- return redirect(".")
- @app.route('/ordinary/judge', methods=['POST'])
- @limiter.limit("10 / hour; 1 / 2 second")
- @need_verify
- def judge():
- group = request.form.get('groupType')
- return redirect(C.groups.get(group))
- @limiter.limit("100 / hour; 2 / second")
- @app.route('/ordinary/<int:toot>/comments')
- def get_comments(toot):
- c = Candidate.query.filter_by(toot=toot).first()
- if not c:
- abort(404)
- context = th.status_context(toot)
- replies = [
- {
- 'disp': (t.account.display_name or t.account.acct),
- 'url': t.account.url,
- 'content': h2t.handle(t.content).replace(C.bot_name,'').strip(),
- 'time': str(t.created_at)
- }
- for t in context.descendants
- ]
- d = list(filter(
- lambda r: r['content'] == '删除' and r['url'].split('/@')[1] in C.admins,
- replies
- ))
- if d:
- db.session.delete(c)
- db.session.commit()
- th.status_delete(toot)
- return '该内容已被删除', 404
- return {'replies': replies}
- @limiter.limit("100 / hour")
- @app.route('/ordinary/<int:toot>/like', methods=['POST'])
- def like(toot):
- c = Candidate.query.filter_by(toot=toot).first()
- if not c:
- abort(404)
- uid = session['uid']
- if not uid: abort(401)
- if Like.query.filter_by(uid=uid, cid=c.id).first():
- return '点赞过了', 403
- l = Like(uid=uid, cid=c.id)
- c.likeNum += 1
- db.session.add(l)
- db.session.commit()
- return str(c.likeNum)
- if __name__ == '__main__':
- app.run(debug=True)