# -*- coding: utf-8 -*-
"""Flask application serving stories that grow paragraph by paragraph.

Each ``Story`` accumulates the text of "chosen" ``Paragraph`` rows; candidate
paragraphs hang off the story's current tail and are promoted by
``choose_new_next`` once they have collected enough likes.  Users sign in
either as a guest (by answering verification questions) or through Mastodon
OAuth.  All public routes live on a blueprint mounted at ``/ordinary``.
"""
from functools import wraps
import random

from flask import (Flask, request, render_template, send_from_directory,
                   abort, redirect, session, Blueprint, url_for)
from flask_sqlalchemy import SQLAlchemy
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from mastodon import Mastodon

from config import C

# Body returned (with HTTP 401) when a guest answers a verification question
# incorrectly.
# NOTE(review): this looks like it once contained HTML markup (a title, a
# heading and a "go back" link) that is no longer present -- confirm against
# the deployed template before relying on it.
WRONG_ANS_HTML = ''' 错误

验证问题回答错误

回退 '''

app = Flask(__name__)
app.config.from_object('config.C')
app.secret_key = C.session_key

# Authorization URL users are sent to in order to sign in through Mastodon.
MAST_LOGIN_URL = Mastodon(api_base_url=C.mast_base_uri).auth_request_url(
    client_id=C.mast_client_id,
    redirect_uris=C.mast_redirect_uri,
    scopes=['read:accounts'],
)

# Keyword arguments only: the position of ``app`` / ``key_func`` differs
# between Flask-Limiter releases, so positional use is not portable.
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    default_limits=["50 / minute"],
)

db = SQLAlchemy(app)


class Story(db.Model):
    """A story whose ``text`` is the concatenation of its chosen paragraphs."""

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(30))
    avatar = db.Column(db.String(128))
    text = db.Column(db.Text)
    tail = db.Column(db.Integer)  # id of the last Paragraph
    total_like_num = db.Column(db.Integer, default=0)
    is_tree = db.Column(db.Boolean, default=False)

    def text_abstract(self, limit=200):
        """Return the first ``limit`` characters of the text.

        An ellipsis (``"..."``) is appended when the text was truncated.  A
        story without text yields an empty string.
        """
        text = self.text or ""
        return text[:limit] + ("..." if len(text) > limit else "")


class Paragraph(db.Model):
    """A candidate or chosen continuation of a story."""

    # always increment 1 for the id of a new record
    __table_args__ = {'sqlite_autoincrement': True}

    id = db.Column(db.Integer, primary_key=True)
    parent_id = db.Column(db.Integer, default=0, index=True)
    story_id = db.Column(db.Integer, default=0, index=True)
    is_hidden = db.Column(db.Boolean, default=False)
    is_chosen = db.Column(db.Boolean, default=False)
    text = db.Column(db.Text)
    author = db.Column(db.String(30))
    time = db.Column(db.DateTime)
    like_num = db.Column(db.Integer, default=0, index=True)
    angry_num = db.Column(db.Integer, default=0)
    fun_num = db.Column(db.Integer, default=0)
    sweat_num = db.Column(db.Integer, default=0)

    @property
    def create_at(self):
        """Creation time formatted as ``MM-DD HH:MM``."""
        return self.time.strftime("%m-%d %H:%M")


class Reaction(db.Model):
    """A single reaction left by a user on a paragraph."""

    id = db.Column(db.Integer, primary_key=True)
    kind = db.Column(db.SmallInteger)  # 1: like 2: angry 3: funny 4: sweat
    pid = db.Column(db.Integer, index=True)  # id of paragraph
    user = db.Column(db.String(30))  # username of user


def choose_new_next(min_like_num=10):
    """Promote the most-liked visible child of each non-tree story's tail.

    For every story with ``is_tree=False`` the visible paragraph whose
    ``parent_id`` equals the story's ``tail`` and that has the highest
    ``like_num`` is looked up.  If it has at least ``min_like_num`` likes, its
    text is appended to the story, its likes are added to the story total, it
    becomes the new tail and is flagged as chosen.

    Args:
        min_like_num: Minimum number of likes a paragraph needs to be chosen.
    """
    for story in Story.query.filter_by(is_tree=False).all():
        # Use the instance's tail id; ``Story.tail`` would be the column
        # itself and would not restrict the query to this story's tail.
        next_one = (
            Paragraph.query
            .filter_by(parent_id=story.tail, is_hidden=False)
            .order_by(Paragraph.like_num.desc())
            .first()
        )
        if next_one and next_one.like_num >= min_like_num:
            story.text = (story.text or "") + (next_one.text or "")
            story.total_like_num += next_one.like_num
            story.tail = next_one.id
            next_one.is_chosen = True
    db.session.commit()


def sample_question(qs, n=3):
    """Return up to ``n`` questions from ``qs``, stable for the session.

    The sample is drawn from a private generator seeded with the session's
    ``uid``, so the same visitor gets the same questions when the form is
    rendered and when it is checked.  A private generator is used on purpose:
    seeding the module-level ``random`` state here would make the uids handed
    out by ``set_uid`` repeat.

    Args:
        qs: Sequence of questions to sample from.
        n: Number of questions wanted; capped at ``len(qs)``.
    """
    rng = random.Random(session['uid'])
    return rng.sample(qs, min(n, len(qs)))


def login_required(func):
    """Decorator that aborts with 403 unless the session has a username."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        if not session.get('username'):
            abort(403)
        return func(*args, **kwargs)
    return wrapper


bp = Blueprint('main_bp', __name__, url_prefix='/ordinary')


@bp.before_request
def set_uid():
    """Give every session a numeric ``uid`` before a blueprint view runs."""
    if 'uid' not in session:
        session['uid'] = random.randint(0, 10000000)


@bp.route('/static/<path:path>')
def send_static(path):
    """Serve ``path`` from the ``static/`` directory."""
    return send_from_directory('static/', path)


@bp.route('/guest_login', methods=['POST'])
@limiter.limit("5 / hour")
def guest_login():
    """Log the visitor in as a guest if all verification answers match.

    Returns ``WRONG_ANS_HTML`` with status 401 on the first wrong answer;
    otherwise stores a ``guest<uid>`` username in the session and redirects
    back to the referring page (or to the home page when no referrer was
    sent).
    """
    for name, _ques, _hint, ans in sample_question(C.verify_questions):
        if request.form.get(name) != ans:
            return WRONG_ANS_HTML, 401

    session['username'] = 'guest<%s>' % session['uid']
    session.pop('avatar', None)
    session.permanent = True
    # The Referer header is optional; fall back to the home page without it.
    return redirect(request.referrer or url_for('main_bp.home'))


@bp.route('/mast_auth')
def mast_login_auth():
    """OAuth callback: exchange the ``code`` and store the account in session.

    Aborts with 400 when the request carries no ``code`` parameter.
    """
    code = request.args.get('code')
    if not code:
        abort(400)

    client = Mastodon(
        client_id=C.mast_client_id,
        client_secret=C.mast_client_sec,
        api_base_url=C.mast_base_uri,
    )
    client.log_in(
        code=code,
        redirect_uri=C.mast_redirect_uri,
        scopes=['read:accounts'],
    )
    info = client.account_verify_credentials()

    session['username'] = info.acct
    session['avatar'] = info.avatar
    session.permanent = True
    return redirect('.')


@bp.route('/logout')
def logout():
    """Drop the login data from the session and redirect to ``.``."""
    session.pop('username', None)
    session.pop('avatar', None)
    return redirect('.')


@bp.route('/')
def home():
    """Render the home page with all stories, most-liked first."""
    stories = Story.query.order_by(Story.total_like_num.desc()).all()
    return render_template('home.html', **locals())


@bp.route('/<int:story_id>')
def story(story_id):
    """Render one story with its chosen paragraphs and tail candidates.

    Query parameters:
        sort_by: ``like`` orders candidates by likes (most-liked first); any
            other value orders them by id, newest first.

    Responds with 404 when the story does not exist.
    """
    # NOTE: every local below is handed to the template through ``locals()``,
    # so the names must stay as they are.
    story = Story.query.get_or_404(story_id)
    is_tree = story.is_tree
    paragraph_part = Paragraph.query.filter_by(
        story_id=story_id,
        is_chosen=True,
        is_hidden=False,
    ).all()

    sort_by = request.args.get('sort_by', 'time')
    q = Paragraph.query.filter_by(parent_id=story.tail, is_hidden=False)
    # Both orderings are descending so the "best" / newest candidates lead.
    q = q.order_by(
        Paragraph.like_num.desc() if sort_by == 'like'
        else Paragraph.id.desc()
    )
    pagination = q.paginate(max_per_page=100)

    username = session.get('username', '')
    avatar = session.get('avatar', C.guest_avatar)
    cs_login_url = MAST_LOGIN_URL
    guest_login_url = url_for('main_bp.guest_login')
    verify_questions = sample_question(C.verify_questions)

    return render_template('story.html', **locals())


# Legacy code kept for reference only: it is a bare string expression and is
# never executed.  It is a raw string so the regex escapes and ``\n`` inside
# it are preserved as written instead of being interpreted.
r'''
    key = request.args.get('key')
    sort_by = request.args.get('sort_by', 'time')
    final_list = request.args.get('final_list', '')

    if 'uid' not in session:
        return redirect('set_session')
    uid = session.get('uid')

    q = Candidate.query
    if final_list and C.step2.get('final_list'):
        q = q.filter(Candidate.id.in_(C.step2['final_list']))
    q = q.order_by(db.desc('likeNum')) if sort_by=='likeNum' else q.order_by(db.desc('id'))
    pag = q.paginate(max_per_page=100)

    def check_like(c):
        c.liked = 'liked' if Like.query.filter_by(uid=uid, cid=c.id).count() else 'like'
        return c
    pag.items = map(check_like, pag.items)

    vs = [{ 'name': name, 'ques': ques, 'hint': hint } for name, ques, hint, ans in C.verify ]

    return render_template('list.html', pagination=pag, vs=vs, verified=session.get('verified'), showPrivate=(key==C.key), sort_by=sort_by, key=key, final_list=final_list,base_toot_url='https://%s/web/statuses/' % C.domain, step2=C.step2, text1=C.text1, text2=C.text2)

@app.route('/ordinary/new', methods=['POST'])
@limiter.limit("5 / hour; 1 / 2 second")
@need_verify
def new_one():
    content = request.form.get('text')
    private = request.form.get('privateText')
    url = request.form.get('url')

    if not content or len(content)>4000:
        abort(422)
    if private and len(private)>1000:
        abort(422)
    if url and not re.match('https://(cloud\.tsinghua\.edu\.cn/f/[0-9a-z]+/(\?dl=1)?|closed\.social/safeShare/\d([a-zA-Z]+)?)', url):
        abort(422)

    if not Candidate.query.filter_by(content=content).first():
        toot = th.status_post(
            f"有新的自荐报名(大家可以直接在此处评论):\n\n{content}",
            visibility=C.visibility
        )
        c = Candidate(
            content=content,
            private=private,
            url=url,
            toot=toot.id,
            time=datetime.now()
        )
        db.session.add(c)
        db.session.commit()

    return redirect(".")

    context = th.status_context(toot)
    replies = [
        {
            'disp': (t.account.display_name or t.account.acct),
            'url': t.account.url,
            'content': h2t.handle(t.content).replace(C.bot_name,'').strip(),
            'time': str(t.created_at)
        }
        for t in context.descendants
    ]
    d = list(filter(
        lambda r: r['content'] == '删除' and r['url'].split('/@')[1] in C.admins,
        replies
    ))
    if d:
        db.session.delete(c)
        db.session.commit()
        th.status_delete(toot)
        return '该内容已被删除', 404

    return {'replies': replies}

@limiter.limit("100 / hour")
@app.route('/ordinary//like', methods=['POST'])
def like(toot):
    c = Candidate.query.filter_by(toot=toot).first()
    if not c:
        abort(404)
    uid = session['uid']
    if not uid:
        abort(401)
    if Like.query.filter_by(uid=uid, cid=c.id).first():
        return '点赞过了', 403
    l = Like(uid=uid, cid=c.id)
    c.likeNum += 1
    db.session.add(l)
    db.session.commit()

    return str(c.likeNum)
'''

app.register_blueprint(bp)

if __name__ == '__main__':
    app.run(debug=True)