# -*- coding: utf-8 -*-
from datetime import datetime
from functools import wraps
from flask import (Flask, request, render_template, send_from_directory, abort,
redirect, session, Blueprint, url_for)
from flask_sqlalchemy import SQLAlchemy
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_caching import Cache
from mastodon import Mastodon
import random
from config import C
# Body returned with HTTP 401 by guest_login() when a verification answer is
# wrong. The text reads: "Error / the verification question was answered
# incorrectly / go back".
# NOTE(review): despite the name, the literal contains no HTML markup --
# confirm that nothing was stripped from it.
WRONG_ANS_HTML = '''
错误
验证问题回答错误
回退
'''
# Default like threshold used by choose_new_next(); also handed to the story
# template as `min_like_num`.
MIN_LIKE_NUM = 5
# Application object; settings are loaded from the `C` object in config.py and
# the session-signing key comes from C.session_key.
app = Flask(__name__)
app.config.from_object('config.C')
app.secret_key = C.session_key
# Cache configured with the 'SimpleCache' backend (used by home()).
cache = Cache(config={'CACHE_TYPE': 'SimpleCache'})
cache.init_app(app)
# Mastodon authorization URL, computed once at import time and passed to the
# story template as `cs_login_url`. Only the 'read:accounts' scope is requested.
MAST_LOGIN_URL = Mastodon(api_base_url=C.mast_base_uri).auth_request_url(
client_id = C.mast_client_id,
redirect_uris = C.mast_redirect_uri, scopes = ['read:accounts']
)
# Rate limiter keyed by get_remote_address; routes without their own
# @limiter.limit fall under the "50 / minute" default.
limiter = Limiter(
app,
key_func=get_remote_address,
default_limits=["50 / minute"],
)
# SQLAlchemy handle; the models below derive from db.Model.
db = SQLAlchemy(app)
class Story(db.Model):
    """ORM model for a story: its title, avatar, accumulated text and like total."""

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(30))
    avatar = db.Column(db.String(128))
    # Text of the story so far; choose_new_next() appends to or rebuilds it.
    text = db.Column(db.Text)
    tail = db.Column(db.Integer)  # id of the last Paragraph
    total_like_num = db.Column(db.Integer, default=0)
    # Selects which of the two algorithms in choose_new_next() applies.
    is_tree = db.Column(db.Boolean, default=False)

    def text_abstract(self, limit=200):
        """Return the first ``limit`` characters of the text.

        "..." is appended when the full text is longer than ``limit``.
        """
        body = self.text
        head = body[:limit]
        if len(body) > limit:
            return head + "..."
        return head
class Paragraph(db.Model):
    """ORM model for one paragraph submitted as a continuation of a story."""

    id = db.Column(db.Integer, primary_key=True)
    # id of the paragraph this one follows; 0 (the default) means no parent.
    parent_id = db.Column(db.Integer, default=0, index=True)
    story_id = db.Column(db.Integer, index=True)
    is_hidden = db.Column(db.Boolean, default=False)
    # Set by choose_new_next() when the paragraph is appended to a linear story.
    is_chosen = db.Column(db.Boolean, default=False)
    text = db.Column(db.Text)
    author = db.Column(db.String(30))
    time = db.Column(db.DateTime, default=datetime.now)
    # Per-reaction counters, incremented by react().
    like_num = db.Column(db.Integer, default=0, index=True)
    angry_num = db.Column(db.Integer, default=0)
    fun_num = db.Column(db.Integer, default=0)
    sweat_num = db.Column(db.Integer, default=0)

    @property
    def create_at(self):
        """Creation time formatted as ``MM-DD HH:MM``."""
        return self.time.strftime("%m-%d %H:%M")

    def reaction_status(self):
        """Describe the five reaction buttons for the current visitor.

        Returns:
            A list of ``(emoji, count, reacted, kind)`` tuples for kinds 1-5,
            where ``reacted`` is True when the visitor identified by
            ``session['uid']`` already left that kind of reaction on this
            paragraph. Kind 5 has no counter column, so its count is always 0.
        """
        user = session['uid']
        # One query for all of this visitor's reactions on the paragraph,
        # instead of one query per kind. A falsy uid is treated as "no
        # visitor", so nothing is looked up for it.
        mine = set()
        if user:
            mine = {
                reaction.kind
                for reaction in Reaction.query.filter_by(pid=self.id, user=user).all()
            }
        counts = (self.like_num, self.angry_num, self.fun_num, self.sweat_num, 0)
        return [
            (emoji, count, kind in mine, kind)
            for kind, (emoji, count) in enumerate(zip('👍😡🤣😅👎', counts), start=1)
        ]
class Reaction(db.Model):
    """ORM model recording that one visitor left one kind of reaction on a paragraph."""
    id = db.Column(db.Integer, primary_key=True)
    kind = db.Column(db.SmallInteger) # 1: like 2: angry 3: funny 4: sweat 5: dislike
    pid = db.Column(db.Integer, index=True) # id of paragraph
    user = db.Column(db.String(30)) # visitor uid; react() passes session['uid'] as-is
def choose_new_next(min_like_num=MIN_LIKE_NUM):
    """Advance every story according to the likes its candidate paragraphs got.

    Linear stories (``is_tree`` false): the visible child of the current tail
    with the most likes is appended when it has at least ``min_like_num`` likes.

    Tree stories (``is_tree`` true): among visible paragraphs with at least
    ``min_like_num`` likes, the one whose root-to-node like total is highest
    (and above the story's current total) becomes the tail, and the story text
    is rebuilt from that path.

    All changes are committed together at the end.
    """
    linear_stories = Story.query.filter_by(is_tree=False).all()
    for story in linear_stories:
        candidates = Paragraph.query.filter_by(
            story_id=story.id, parent_id=story.tail, is_hidden=False
        )
        winner = candidates.order_by(Paragraph.like_num.desc()).first()
        if not (winner and winner.like_num >= min_like_num):
            continue
        story.text += winner.text
        story.total_like_num += winner.like_num
        story.tail = winner.id
        winner.is_chosen = True

    tree_stories = Story.query.filter_by(is_tree=True).all()
    for story in tree_stories:
        best_total = story.total_like_num
        best_tail = story.tail
        path_totals = {}  # paragraph id -> likes summed from the root down to it

        def path_likes(node):
            # Memoised: a paragraph's total is its own likes plus its parent's total.
            if node.id not in path_totals:
                path_totals[node.id] = node.like_num + (
                    node.parent_id and path_likes(Paragraph.query.get(node.parent_id))
                )
            return path_totals[node.id]

        eligible = Paragraph.query.filter(Paragraph.like_num >= min_like_num)
        eligible = eligible.filter_by(story_id=story.id, is_hidden=False)
        for candidate in eligible.all():
            total = path_likes(candidate)
            if total > best_total:
                best_total = total
                best_tail = candidate.id

        # Rebuild the text by walking from the chosen tail up to the root,
        # prepending each ancestor's text.
        node = Paragraph.query.get(best_tail)
        joined = node.text
        while node.parent_id:
            node = Paragraph.query.get(node.parent_id)
            joined = node.text + joined

        story.total_like_num = best_total
        story.tail = best_tail
        story.text = joined

    db.session.commit()
def sample_question(qs, n=3):
    """Pick verification questions for the current visitor.

    The choice is seeded with ``session['uid']``, so the same visitor gets the
    same questions on every call (the form that shows them and the handler that
    checks them must agree).

    Args:
        qs: Sequence of questions to draw from.
        n: How many to draw; capped at ``len(qs)``.

    Returns:
        A list of up to ``n`` distinct items from ``qs``.
    """
    # A private generator keeps the per-visitor seed away from the module-level
    # RNG: reseeding that one would make every later random.* call in the
    # process repeat the same sequence for the same uid.
    rng = random.Random(session['uid'])
    return rng.sample(qs, min(n, len(qs)))
def login_required(func):
    """Decorator that answers 403 unless the session holds a non-empty 'username'."""
    @wraps(func)
    def guarded(*args, **kwargs):
        if session.get('username'):
            return func(*args, **kwargs)
        abort(403)
    return guarded
# Blueprint holding every route of this module; all URLs are served under /ordinary.
bp = Blueprint('main_bp', __name__, url_prefix='/ordinary')
@bp.before_request
def set_uid():
    """Give each visitor a numeric id, stored in the session on first request.

    The id names guests ('guest<uid>'), seeds their verification questions and
    identifies the owner of a reaction.
    """
    if 'uid' not in session:
        # SystemRandom draws from the OS entropy source, so the value does not
        # depend on the module-level generator, whose state any random.seed()
        # call in the process can reset (making new ids repeat).
        # The range starts at 1 because Paragraph.reaction_status() treats a
        # falsy uid as "no visitor".
        session['uid'] = random.SystemRandom().randint(1, 10000000)
# The rule must capture the rest of the URL as `path`: the view takes it as a
# required argument, and the `path` converter also accepts slashes.
@bp.route('/static/<path:path>')
def send_static(path):
    """Serve the file at ``path`` from the ``static/`` directory."""
    return send_from_directory('static/', path)
@bp.route('/guest_login', methods=['POST'])
@limiter.limit("5 / hour")
def guest_login():
    """Log the visitor in as a guest after checking the verification answers.

    Each sampled question is a ``(name, question, hint, answer)`` tuple; the
    form field called ``name`` must equal ``answer`` exactly.

    Returns:
        ``(WRONG_ANS_HTML, 401)`` on the first wrong answer; otherwise a
        redirect back to the referring page (or the index when the request
        carries no Referer header).
    """
    for name, _ques, _hint, ans in sample_question(C.verify_questions):
        if request.form.get(name) != ans:
            return WRONG_ANS_HTML, 401
    session['username'] = 'guest<%s>' % session['uid']
    session.pop('avatar', None)
    session.permanent = True
    # The Referer header is optional; without a fallback the redirect target
    # would be None.
    return redirect(request.referrer or url_for('main_bp.home'))
@bp.route('/mast_auth')
def mast_login_auth():
    """OAuth callback: exchange the Mastodon authorization code for a login.

    On success the account's ``acct`` and ``avatar`` are stored in the session
    as 'username' and 'avatar', the session is made permanent, and the visitor
    is redirected to the index. Responds 400 when no ``code`` query parameter
    is present.
    """
    code = request.args.get('code')
    if not code:
        # Reached without an authorization code (e.g. the visitor opened the
        # URL directly); there is nothing to exchange.
        abort(400)
    client = Mastodon(
        client_id=C.mast_client_id,
        client_secret=C.mast_client_sec,
        api_base_url=C.mast_base_uri
    )
    client.log_in(code=code, redirect_uri=C.mast_redirect_uri,
                  scopes=['read:accounts'])
    info = client.account_verify_credentials()
    session['username'] = info.acct
    session['avatar'] = info.avatar
    session.permanent = True
    return redirect('.')
@bp.route('/logout')
def logout():
    """Remove the login keys from the session and redirect to the index."""
    for key in ('username', 'avatar'):
        session.pop(key, None)
    return redirect('.')
@bp.route('/')
@cache.cached(timeout=10)
def home():
    """Render the index: all stories, most-liked first (cached with timeout=10)."""
    by_likes = Story.total_like_num.desc()
    return render_template(
        'home.html',
        stories=Story.query.order_by(by_likes).all(),
    )
# The rule must supply `story_id`, and as an int: it is compared with
# Paragraph.story_id below. A bare '/' would also duplicate home()'s rule.
@bp.route('/<int:story_id>')
def story(story_id):
    """Render one story together with the candidate continuations of its tail.

    Tree stories show the path from the root to the paragraph named by the
    ``tail`` query argument (default: the story's own tail). Linear stories
    show the chosen, visible paragraphs. Candidates are the visible children
    of the tail, ordered by likes when ``sort_by=like`` and newest first
    otherwise.

    Responds 404 when the story or a paragraph on the path does not exist, or
    when the requested tail belongs to another story.

    NOTE: the template context is ``**locals()``, so every local name below is
    a template variable -- renaming or removing one changes what the template
    receives.
    """
    story = Story.query.get_or_404(story_id)
    is_tree = story.is_tree
    draft = ''
    if is_tree:
        tail = request.args.get('tail', story.tail, type=int)
        p_tail = Paragraph.query.get_or_404(tail)
        if p_tail.story_id != story_id:
            abort(404)
        # Walk up to the root, collecting the path in reading order.
        paragraph_part = [p_tail]
        p = p_tail
        while p.parent_id:
            p = Paragraph.query.get_or_404(p.parent_id)
            paragraph_part.insert(0, p)
    else:
        tail = story.tail
        paragraph_part = Paragraph.query.filter_by(
            story_id=story_id, is_chosen=True, is_hidden=False
        ).all()
    # Offer the visitor's latest unchosen submission as a draft when it was
    # written for a different tail than the one shown now.
    # NOTE(review): this lookup is placed at function level, so it also runs
    # for tree stories -- confirm it was not meant for linear stories only.
    if 'username' in session:
        last_post = Paragraph.query.filter_by(
            story_id=story_id, author=session['username'], is_chosen=False
        ).order_by(Paragraph.id.desc()).first()
        if last_post and last_post.parent_id != tail:
            draft = last_post.text
    sort_by = request.args.get('sort_by', 'time')
    q = Paragraph.query.filter_by(story_id=story_id, parent_id=tail, is_hidden=False)
    q = q.order_by(Paragraph.like_num.desc() if sort_by == 'like' else
                   Paragraph.id.desc())
    pagination = q.paginate(max_per_page=100)
    username = session.get('username', '')
    avatar = session.get('avatar', C.guest_avatar)
    cs_login_url = MAST_LOGIN_URL
    guest_login_url = url_for('main_bp.guest_login')
    verify_questions = sample_question(C.verify_questions)
    min_like_num = MIN_LIKE_NUM
    email = C.email
    return render_template('story.html', **locals())
@bp.route('/create', methods=['POST'])
@login_required
@limiter.limit("66 / hour")
def create():
    """Store a new paragraph submitted as a continuation of a story.

    Form fields: ``story-id``, ``text`` (1-140 characters) and, for tree
    stories, ``tail`` (id of the paragraph being continued). Linear stories
    always continue the story's current tail.

    Responds 422 when the text is missing or too long, or when a tree story's
    ``tail`` is missing, unknown, or belongs to another story; 404 when the
    story does not exist. On success redirects to the story page.
    """
    story_id = request.form.get('story-id')
    text = request.form.get('text')
    if not text or len(text) > 140:
        abort(422)
    story = Story.query.get_or_404(story_id)
    if story.is_tree:
        parent_id = request.form.get('tail', type=int)
        # The parent comes from the client: make sure it is a real paragraph
        # of this story. choose_new_next() and story() follow parent_id links
        # and would otherwise hit a missing row or wander into another story.
        parent = Paragraph.query.get(parent_id) if parent_id else None
        if parent is None or parent.story_id != story.id:
            abort(422)
    else:
        parent_id = story.tail
    p = Paragraph(
        parent_id=parent_id,
        story_id=story.id,  # canonical id from the row, not the raw form string
        text=text,
        author=session['username']
    )
    db.session.add(p)
    db.session.commit()
    # Relative redirect: resolves next to /create, i.e. to the story's page.
    return redirect(str(story.id))
@bp.route('/react', methods=['POST'])
@limiter.limit("100 / minute")
def react():
    """Record the visitor's reaction to a paragraph.

    Form fields: ``kind`` (1-5) and ``pid`` (paragraph id). Responds 422 for
    an invalid kind and 404 for an unknown paragraph.

    Returns:
        '' when the visitor already left this reaction, or for kind 5 (which
        has no counter); otherwise the paragraph's new count for that kind,
        as a string.
    """
    # Counter column bumped by each kind; kind 5 is recorded but not counted.
    counter_by_kind = {
        1: 'like_num',
        2: 'angry_num',
        3: 'fun_num',
        4: 'sweat_num',
    }
    kind = request.form.get('kind', type=int)
    pid = request.form.get('pid', type=int)
    if kind not in range(1, 6):
        abort(422)
    paragraph = Paragraph.query.get_or_404(pid)
    identity = dict(kind=kind, user=session['uid'], pid=pid)
    already = Reaction.query.filter_by(**identity).first()
    if already:
        return ''
    db.session.add(Reaction(**identity))
    new_count = ''
    counter = counter_by_kind.get(kind)
    if counter is not None:
        setattr(paragraph, counter, getattr(paragraph, counter) + 1)
        new_count = getattr(paragraph, counter)
    db.session.commit()
    return str(new_count)
# Attach the routes defined above to the application.
app.register_blueprint(bp)
# Only when this file is executed directly: start Flask's built-in server
# with debug mode switched on.
if __name__ == '__main__':
    app.run(debug=True)