# -*- coding: utf-8 -*-
|
|
from datetime import datetime
|
|
from functools import wraps
|
|
from flask import (Flask, request, render_template, send_from_directory, abort,
|
|
redirect, session, Blueprint, url_for)
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_limiter import Limiter
|
|
from flask_limiter.util import get_remote_address
|
|
from mastodon import Mastodon
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
import random
|
|
import atexit
|
|
from config import C
|
|
# Minimal standalone error page. guest_login() returns it together with an
# HTTP 401 status when a verification question is answered incorrectly.
WRONG_ANS_HTML = '''<html>
<head>
<meta charset='UTF-8'>
<meta name='viewport' content='width=device-width initial-scale=1'>
<title>错误</title>
</head>
<body>
<h1>验证问题回答错误</h1>
<a href="##" onclick="window.history.back()">回退</a>
</body>
</html>
'''
|
|
|
|
# Flask application; its settings come from the ``C`` object in config.py.
app = Flask(__name__)
app.config.from_object('config.C')
app.secret_key = C.session_key

# URL of the Mastodon authorization page used for OAuth login. Only the
# ``read:accounts`` scope is requested.
MAST_LOGIN_URL = Mastodon(api_base_url=C.mast_base_uri).auth_request_url(
    client_id=C.mast_client_id,
    redirect_uris=C.mast_redirect_uri,
    scopes=['read:accounts'],
)

# Rate limiting keyed on the client address. Every argument is passed by
# keyword: older Flask-Limiter releases take ``app`` as the first positional
# parameter while newer ones take ``key_func`` there, so keywords are the
# only spelling that works with both.
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    default_limits=["50 / minute"],
)

db = SQLAlchemy(app)
|
|
|
|
|
|
class Story(db.Model):
    """A story whose text grows as paragraphs are accepted into it."""

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(30))
    avatar = db.Column(db.String(128))
    # Accumulated text; choose_new_next() appends each accepted paragraph.
    text = db.Column(db.Text)
    tail = db.Column(db.Integer)  # id of the last Paragraph
    # Sum of the like counts of the paragraphs accepted so far.
    total_like_num = db.Column(db.Integer, default=0)
    # Stories with is_tree=True are skipped by choose_new_next().
    is_tree = db.Column(db.Boolean, default=False)

    def text_abstract(self, limit=200):
        """Return the first ``limit`` characters of the story text.

        ``"..."`` is appended when the text had to be cut. A story whose
        text has not been set yet (NULL in the database) yields ``""``
        instead of raising.
        """
        text = self.text or ""
        if len(text) <= limit:
            return text
        return text[:limit] + "..."
|
|
|
|
|
|
class Paragraph(db.Model):
    """A paragraph submitted as a continuation of a story."""

    id = db.Column(db.Integer, primary_key=True)
    # Value of the story's ``tail`` at the time this paragraph was submitted.
    parent_id = db.Column(db.Integer, default=0, index=True)
    story_id = db.Column(db.Integer, index=True)
    # Hidden paragraphs are excluded from listings and from selection.
    is_hidden = db.Column(db.Boolean, default=False)
    # Set once choose_new_next() has appended this paragraph to its story.
    is_chosen = db.Column(db.Boolean, default=False)
    text = db.Column(db.Text)
    author = db.Column(db.String(30))
    time = db.Column(db.DateTime, default=datetime.now)
    like_num = db.Column(db.Integer, default=0, index=True)
    angry_num = db.Column(db.Integer, default=0)
    fun_num = db.Column(db.Integer, default=0)
    sweat_num = db.Column(db.Integer, default=0)

    @property
    def create_at(self):
        """Creation time formatted as ``MM-DD HH:MM``."""
        return self.time.strftime("%m-%d %H:%M")

    def reaction_status(self):
        """Describe the five reaction kinds for this paragraph.

        Returns a list of ``(emoji, count, reacted, kind)`` tuples, one per
        reaction kind 1..5. ``reacted`` tells whether the user in the current
        session has already left that reaction; when nobody is logged in it
        is the falsy session value itself. Kind 5 has no counter column, so
        its count is always 0.
        """
        user = session.get('username')
        if user:
            # One query for all kinds instead of one query per kind.
            rows = Reaction.query.filter_by(pid=self.id, user=user).all()
            reacted_kinds = {row.kind for row in rows}
        else:
            reacted_kinds = set()

        kinds = range(1, 6)
        return list(zip(
            '👍😡🤣😅👎',
            [self.like_num, self.angry_num, self.fun_num, self.sweat_num, 0],
            [user and (kind in reacted_kinds) for kind in kinds],
            kinds,
        ))
|
|
|
|
|
|
class Reaction(db.Model):
    """A reaction of a given kind left by a user on a paragraph."""

    id = db.Column(db.Integer, primary_key=True)

    # Reaction kind: 1 = like, 2 = angry, 3 = funny, 4 = sweat, 5 = dislike.
    kind = db.Column(db.SmallInteger)

    # Id of the paragraph the reaction belongs to.
    pid = db.Column(db.Integer, index=True)

    # Username of the user who reacted.
    user = db.Column(db.String(30))
|
|
|
|
|
|
def choose_new_next(min_like_num=10):
    """Append the most-liked candidate paragraph to every non-tree story.

    For each story with ``is_tree=False`` the visible paragraph with the
    highest like count among those whose parent is the story's current tail
    is looked up. If it has at least ``min_like_num`` likes, its text is
    appended to the story, its likes are added to the story total, it becomes
    the new tail and it is marked as chosen.

    Args:
        min_like_num: minimum like count a candidate needs. ``None`` is
            treated as the default of 10, so callers that forward an
            optional request parameter do not crash the comparison.

    NOTE(review): this is also run from the background scheduler, outside any
    request. Confirm the installed Flask-SQLAlchemy version allows queries
    there.
    """
    if min_like_num is None:
        min_like_num = 10

    for story in Story.query.filter_by(is_tree=False).all():
        candidate = (
            Paragraph.query
            .filter_by(parent_id=story.tail, is_hidden=False)
            .order_by(Paragraph.like_num.desc())
            .first()
        )
        if candidate is None or candidate.like_num < min_like_num:
            continue

        app.logger.debug(
            'story %s: accepting paragraph %s with %s likes',
            story.id, candidate.id, candidate.like_num,
        )

        # ``text`` has no column default and ``total_like_num`` may be NULL
        # on rows created outside the ORM, so tolerate empty values.
        story.text = (story.text or '') + candidate.text
        story.total_like_num = (story.total_like_num or 0) + candidate.like_num
        story.tail = candidate.id
        candidate.is_chosen = True

        # Persist the updated story and paragraph.
        db.session.commit()
|
|
|
|
# One-off choose_new_next() runs every C.period minutes, for each of the days
# 5 through 14 of November 2021.
scheduler = BackgroundScheduler()

# In debug mode the jobs call choose_new_next(1); otherwise they call it
# with no arguments.
job_args = [1] if C.debug else []

for day in range(5, 15):
    for minute_of_day in range(0, 24 * 60, C.period):
        hour, minute = divmod(minute_of_day, 60)
        scheduler.add_job(
            func=choose_new_next,
            trigger='date',
            run_date=datetime(2021, 11, day, hour, minute),
            args=job_args,
        )

scheduler.start()

# Shut down the scheduler when exiting the app
atexit.register(scheduler.shutdown)
|
|
|
|
|
|
def sample_question(qs, n=3):
    """Pick ``n`` questions from ``qs``, always the same ones for a visitor.

    The selection is seeded with the visitor's ``uid`` from the session, so
    repeated calls within one session return the same questions in the same
    order.

    A private ``random.Random`` instance is used instead of reseeding the
    module-level generator. The sample is identical, but the process-wide
    random state that other code draws from is left untouched.

    Args:
        qs: sequence of questions to choose from.
        n: number of questions to return.

    Raises:
        KeyError: if the session has no ``uid``.
        ValueError: if ``n`` is larger than ``len(qs)``.
    """
    return random.Random(session['uid']).sample(qs, n)
|
|
|
|
|
|
def login_required(func):
    """Decorator that rejects callers whose session has no username.

    The wrapped function runs normally when ``session['username']`` is set
    to a truthy value; otherwise the request is aborted with HTTP 403.
    """
    @wraps(func)
    def guarded(*args, **kwargs):
        if session.get('username'):
            return func(*args, **kwargs)
        abort(403)

    return guarded
|
|
|
|
|
|
# Blueprint for the routes below; it is mounted under the /ordinary prefix.
bp = Blueprint(
    name='main_bp',
    import_name=__name__,
    url_prefix='/ordinary',
)
|
|
|
|
|
|
@bp.before_request
def set_uid():
    """Give the visitor a numeric ``uid`` in the session if it has none.

    The id is drawn from the operating system's entropy source rather than
    the shared module-level generator. That generator's state can be reseeded
    by other code, which would make freshly issued ids predictable and prone
    to collisions.
    """
    if 'uid' not in session:
        session['uid'] = random.SystemRandom().randint(0, 10000000)
|
|
|
|
|
|
@bp.route('/static/<path:path>')
def send_static(path):
    """Serve the requested file from the ``static/`` directory."""
    static_dir = 'static/'
    return send_from_directory(static_dir, path)
|
|
|
|
|
|
@bp.route('/guest_login', methods=['POST'])
@limiter.limit("5 / hour")
def guest_login():
    """Log the visitor in as a guest after checking the verification answers.

    Every question sampled for this visitor must be answered exactly; each
    answer is read from the form field named after the question. A wrong or
    missing answer returns the error page with HTTP 401. On success the
    session gets a ``guest<uid>`` username, any stored avatar is dropped, and
    the visitor is redirected back to the referring page. If the request has
    no Referer header the redirect goes to the home page instead of failing.
    """
    for name, _ques, _hint, ans in sample_question(C.verify_questions):
        if request.form.get(name) != ans:
            return WRONG_ANS_HTML, 401

    session['username'] = 'guest<%s>' % session['uid']
    session.pop('avatar', None)
    session.permanent = True
    return redirect(request.referrer or url_for('main_bp.home'))
|
|
|
|
|
|
@bp.route('/mast_auth')
def mast_login_auth():
    """OAuth callback for Mastodon login.

    Logs in to the Mastodon API with the ``code`` query parameter, fetches
    the account it belongs to, and stores the account name and avatar in the
    session. The visitor is then redirected to ``'.'``.

    A request without ``code`` is rejected with HTTP 400 instead of being
    passed on to the Mastodon client.
    """
    code = request.args.get('code')
    if not code:
        abort(400)

    client = Mastodon(
        client_id=C.mast_client_id,
        client_secret=C.mast_client_sec,
        api_base_url=C.mast_base_uri,
    )
    client.log_in(
        code=code,
        redirect_uri=C.mast_redirect_uri,
        scopes=['read:accounts'],
    )
    info = client.account_verify_credentials()

    session['username'] = info.acct
    session['avatar'] = info.avatar
    session.permanent = True
    return redirect('.')
|
|
|
|
|
|
@bp.route('/logout')
def logout():
    """Remove the username and avatar from the session, then redirect to '.'."""
    for key in ('username', 'avatar'):
        session.pop(key, None)
    return redirect('.')
|
|
|
|
|
|
@bp.route('/')
def home():
    """Render the home page with all stories, most-liked first."""
    by_likes = Story.query.order_by(Story.total_like_num.desc())
    return render_template('home.html', stories=by_likes.all())
|
|
|
|
|
|
@bp.route('/<int:story_id>')
def story(story_id):
    """Render one story with its accepted paragraphs and the candidates.

    Responds with 404 when the story does not exist. The ``sort_by`` query
    parameter orders the candidate paragraphs: ``like`` sorts by like count,
    anything else sorts newest first by id. Candidates are paginated.
    """
    story = Story.query.get_or_404(story_id)
    is_tree = story.is_tree

    # Accepted paragraphs are ordered by id explicitly; without an ORDER BY
    # the database may return them in any order.
    paragraph_part = (
        Paragraph.query
        .filter_by(story_id=story_id, is_chosen=True, is_hidden=False)
        .order_by(Paragraph.id)
        .all()
    )

    sort_by = request.args.get('sort_by', 'time')

    # Candidates are the visible paragraphs that continue the current tail.
    q = Paragraph.query.filter_by(parent_id=story.tail, is_hidden=False)
    if sort_by == 'like':
        q = q.order_by(Paragraph.like_num.desc())
    else:
        q = q.order_by(Paragraph.id.desc())
    pagination = q.paginate(max_per_page=100)

    # The template receives the same variable names as before; they are
    # listed explicitly so that new locals do not leak into the template.
    return render_template(
        'story.html',
        story_id=story_id,
        story=story,
        is_tree=is_tree,
        paragraph_part=paragraph_part,
        sort_by=sort_by,
        q=q,
        pagination=pagination,
        username=session.get('username', ''),
        avatar=session.get('avatar', C.guest_avatar),
        cs_login_url=MAST_LOGIN_URL,
        guest_login_url=url_for('main_bp.guest_login'),
        verify_questions=sample_question(C.verify_questions),
        period=C.period,
    )
|
|
|
|
|
|
@bp.route('/create', methods=['POST'])
@login_required
@limiter.limit("66 / hour")
def create():
    """Submit a new candidate paragraph for a story.

    Form fields: ``story-id`` (integer id of the story) and ``text`` (1 to
    140 characters). Responds with 422 for missing or over-long text and 404
    for a missing, non-numeric or unknown story id. The new paragraph is
    attached to the story's current tail and credited to the logged-in user.
    The client is then redirected to the story page.
    """
    # Parsed as an integer so that the stored story_id and the redirect
    # target come from a validated value, not the raw form string.
    story_id = request.form.get('story-id', type=int)
    text = request.form.get('text')
    if not text or len(text) > 140:
        abort(422)
    if story_id is None:
        abort(404)
    story = Story.query.get_or_404(story_id)

    p = Paragraph(
        parent_id=story.tail,
        story_id=story.id,
        text=text,
        author=session['username'],
    )
    db.session.add(p)
    db.session.commit()

    return redirect(url_for('main_bp.story', story_id=story.id))
|
|
|
|
|
|
@bp.route('/react', methods=['POST'])
@login_required
@limiter.limit("100 / minute")
def react():
    """Record the logged-in user's reaction to a paragraph.

    Form fields: ``kind`` (1..5) and ``pid`` (paragraph id). Responds with
    422 for an invalid kind and 404 for an unknown paragraph. A reaction that
    was already recorded leaves everything unchanged and returns an empty
    body. Otherwise the reaction is stored, the matching counter on the
    paragraph is incremented and its new value is returned as text. Kind 5
    has no counter, so the body is empty for it.
    """
    kind = request.form.get('kind', type=int)
    pid = request.form.get('pid', type=int)
    if kind not in range(1, 6):
        abort(422)
    paragraph = Paragraph.query.get_or_404(pid)

    reaction_key = dict(kind=kind, user=session['username'], pid=pid)
    already_reacted = Reaction.query.filter_by(**reaction_key).first()
    if already_reacted:
        return ''
    db.session.add(Reaction(**reaction_key))

    # Counter column on Paragraph for each kind that has one.
    counter_by_kind = {
        1: 'like_num',
        2: 'angry_num',
        3: 'fun_num',
        4: 'sweat_num',
    }
    new_count = ''
    counter = counter_by_kind.get(kind)
    if counter is not None:
        new_count = getattr(paragraph, counter) + 1
        setattr(paragraph, counter, new_count)

    db.session.commit()

    return str(new_count)
|
|
|
|
|
|
@bp.route('/choose')
def choose_next():
    """Trigger a paragraph-selection round by hand.

    The optional ``min_like`` query parameter sets the like threshold. When
    it is absent or not an integer, choose_new_next() runs with its own
    default instead of receiving ``None``.

    NOTE(review): this route has no login check, so any client can force a
    selection round with an arbitrary threshold. Confirm that is intended.
    """
    min_like = request.args.get('min_like', type=int)
    if min_like is None:
        choose_new_next()
    else:
        choose_new_next(min_like)
    return 'ok'
|
|
|
|
app.register_blueprint(bp)

if __name__ == '__main__':
    # Entry point for running the module directly. Debug mode follows the
    # project's C.debug setting rather than being always on, because the
    # debug server's interactive debugger must not be exposed on a
    # non-debug deployment.
    app.run(debug=C.debug)
|