# -*- coding: utf-8 -*- from datetime import datetime from functools import wraps from flask import (Flask, request, render_template, send_from_directory, abort, redirect, session, Blueprint, url_for) from flask_sqlalchemy import SQLAlchemy from flask_limiter import Limiter from flask_limiter.util import get_remote_address from flask_caching import Cache from mastodon import Mastodon import random from config import C WRONG_ANS_HTML = ''' 错误

验证问题回答错误

回退 ''' MIN_LIKE_NUM = 10 app = Flask(__name__) app.config.from_object('config.C') app.secret_key = C.session_key cache = Cache(config={'CACHE_TYPE': 'SimpleCache'}) cache.init_app(app) MAST_LOGIN_URL = Mastodon(api_base_url=C.mast_base_uri).auth_request_url( client_id = C.mast_client_id, redirect_uris = C.mast_redirect_uri, scopes = ['read:accounts'] ) limiter = Limiter( app, key_func=get_remote_address, default_limits=["50 / minute"], ) db = SQLAlchemy(app) class Story(db.Model): id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(30)) avatar = db.Column(db.String(128)) text = db.Column(db.Text) tail = db.Column(db.Integer) # 最后一个 Paragraph 的 id total_like_num = db.Column(db.Integer, default=0) is_tree = db.Column(db.Boolean, default=False) def text_abstract(self, limit=200): return self.text[:limit] + ("..." if len(self.text) > limit else "") class Paragraph(db.Model): id = db.Column(db.Integer, primary_key=True) parent_id = db.Column(db.Integer, default=0, index=True) story_id = db.Column(db.Integer, index=True) is_hidden = db.Column(db.Boolean, default=False) is_chosen = db.Column(db.Boolean, default=False) text = db.Column(db.Text) author = db.Column(db.String(30)) time = db.Column(db.DateTime, default=datetime.now) like_num = db.Column(db.Integer, default=0, index=True) angry_num = db.Column(db.Integer, default=0) fun_num = db.Column(db.Integer, default=0) sweat_num = db.Column(db.Integer, default=0) @property def create_at(self): return self.time.strftime("%m-%d %H:%M") def reaction_status(self): user = session.get('username') return list(zip( '👍😡🤣😅👎', [self.like_num, self.angry_num, self.fun_num, self.sweat_num, 0], [ user and bool(Reaction.query.filter_by(pid=self.id, user=user, kind=i).first()) for i in range(1, 6) ], range(1, 6) )) class Reaction(db.Model): id = db.Column(db.Integer, primary_key=True) kind = db.Column(db.SmallInteger) # 1: like 2: angry 3: funny 4: sweat 5:dislike pid = db.Column(db.Integer, index=True) # id of paragraph user = db.Column(db.String(30)) # username of user def choose_new_next(min_like_num=MIN_LIKE_NUM): for story in Story.query.filter_by(is_tree=False).all(): last_paragraph_id = story.tail next_one = Paragraph.query.filter_by(story_id=story.id, parent_id=last_paragraph_id, is_hidden=False)\ .order_by(Paragraph.like_num.desc()).first() if next_one and next_one.like_num >= min_like_num: story.text += next_one.text story.total_like_num += next_one.like_num story.tail = next_one.id next_one.is_chosen = True for story in Story.query.filter_by(is_tree=True).all(): _max = story.total_like_num _tail = story.tail _sum_dict = {} def _get_like_sum(p): if p.id in _sum_dict: return _sum_dict[p.id] s = p.like_num + (p.parent_id and _get_like_sum(Paragraph.query.get(p.parent_id))) _sum_dict[p.id] = s return s for p in Paragraph.query.filter(Paragraph.like_num >= min_like_num)\ .filter_by(story_id=story.id, is_hidden=False).all(): if _get_like_sum(p) > _max: _max = _get_like_sum(p) _tail = p.id p = Paragraph.query.get(_tail) _text = p.text while p.parent_id: p = Paragraph.query.get(p.parent_id) _text = p.text + _text story.total_like_num = _max story.tail = _tail story.text = _text db.session.commit() def sample_question(qs, n=3): random.seed(session['uid']) return random.sample(qs, n) def login_required(func): @wraps(func) def warp(*args, **kwargs): if not session.get('username'): abort(403) return func(*args, **kwargs) return warp bp = Blueprint('main_bp', __name__, url_prefix='/ordinary') @bp.before_request def set_uid(): if 'uid' not in session: session['uid'] = random.randint(0, 10000000) @bp.route('/static/') def send_static(path): return send_from_directory('static/', path) @bp.route('/guest_login', methods=['POST']) @limiter.limit("5 / hour") def guest_login(): for name, ques, hint, ans in sample_question(C.verify_questions): if request.form.get(name) != ans: return WRONG_ANS_HTML, 401 session['username'] = 'guest<%s>' % session['uid'] session.pop('avatar', None) session.permanent = True return redirect(request.referrer) @bp.route('/mast_auth') def mast_login_auth(): code = request.args.get('code') client = Mastodon( client_id=C.mast_client_id, client_secret=C.mast_client_sec, api_base_url=C.mast_base_uri ) client.log_in(code=code, redirect_uri=C.mast_redirect_uri, scopes=['read:accounts']) info = client.account_verify_credentials() session['username'] = info.acct session['avatar'] = info.avatar session.permanent = True return redirect('.') @bp.route('/logout') def logout(): session.pop('username', None) session.pop('avatar', None) return redirect('.') @bp.route('/') @cache.cached(timeout=10) def home(): stories = Story.query.order_by(Story.total_like_num.desc()).all() return render_template('home.html', **locals()) @bp.route('/') def story(story_id): story = Story.query.get_or_404(story_id) is_tree = story.is_tree draft = '' if is_tree: tail = request.args.get('tail', story.tail, type=int) p_tail = Paragraph.query.get_or_404(tail) if p_tail.story_id != story_id: abort(404) paragraph_part = [p_tail] p = p_tail while p.parent_id: p = Paragraph.query.get_or_404(p.parent_id) paragraph_part.insert(0, p) else: tail = story.tail paragraph_part = Paragraph.query.filter_by( story_id=story_id, is_chosen=True, is_hidden=False ).all() if 'username' in session: last_post = Paragraph.query.filter_by( story_id=story_id, author=session['username'], is_chosen=False ).order_by(Paragraph.id.desc()).first() if last_post and last_post.parent_id != tail: draft = last_post.text sort_by = request.args.get('sort_by', 'time') q = Paragraph.query.filter_by(story_id=story_id, parent_id=tail, is_hidden=False) q = q.order_by(Paragraph.like_num.desc() if sort_by == 'like' else Paragraph.id.desc()) pagination = q.paginate(max_per_page=100) username = session.get('username', '') avatar = session.get('avatar', C.guest_avatar) cs_login_url = MAST_LOGIN_URL guest_login_url = url_for('main_bp.guest_login') verify_questions = sample_question(C.verify_questions) min_like_num = MIN_LIKE_NUM email = C.email return render_template('story.html', **locals()) @bp.route('/create', methods=['POST']) @login_required @limiter.limit("66 / hour") def create(): story_id = request.form.get('story-id') text = request.form.get('text') if not text or len(text) > 140: abort(422) story = Story.query.get_or_404(story_id) if story.is_tree: parent_id = request.form.get('tail', type=int) else: parent_id = story.tail p = Paragraph( parent_id=parent_id, story_id=story_id, text=text, author=session['username'] ) db.session.add(p) db.session.commit() return redirect(story_id) @bp.route('/react', methods=['POST']) @login_required @limiter.limit("100 / minute") def react(): kind = request.form.get('kind', type=int) pid = request.form.get('pid', type=int) if kind not in range(1, 6): abort(422) p = Paragraph.query.get_or_404(pid) d = dict(kind=kind, user=session['username'], pid=pid) if Reaction.query.filter_by(**d).first(): return '' db.session.add(Reaction(**d)) n = '' if kind == 1: p.like_num += 1 n = p.like_num elif kind == 2: p.angry_num += 1 n = p.angry_num elif kind == 3: p.fun_num += 1 n = p.fun_num elif kind == 4: p.sweat_num += 1 n = p.sweat_num db.session.commit() return str(n) app.register_blueprint(bp) if __name__ == '__main__': app.run(debug=True)