# There are two ways to run this app:
|
|
#
|
|
# run the web
|
|
# $ export FLASK_APP=app.py
|
|
# $ flask run
|
|
#
|
|
# create or refresh the database
|
|
# $ python3 app.py
|
|
|
|
from flask import Flask, request, render_template, send_from_directory, abort, redirect, session, url_for, send_file, flash
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from sqlalchemy import func
|
|
from flask_limiter import Limiter
|
|
from flask_limiter.util import get_remote_address
|
|
|
|
import ipfshttpclient
|
|
from mastodon import Mastodon
|
|
|
|
import pypinyin
|
|
|
|
from string import ascii_uppercase as AZ
|
|
from datetime import date, datetime
|
|
from functools import wraps
|
|
import hashlib
|
|
import shutil
|
|
import random
|
|
import os
|
|
import re
|
|
from config import C
|
|
|
|
# Flask application; settings come from the ``C`` object in config.py.
app = Flask(__name__)
app.config.from_object('config.C')
app.secret_key = C.session_key

# Rate limiting keyed on the client's remote address, applied to every route
# unless a route declares its own limit.
# All arguments are passed by keyword: Flask-Limiter 3.x made ``key_func`` the
# first positional parameter and ``app`` keyword-only, so the keyword form is
# the one accepted by both older and newer releases.
limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=["50 / minute"],
)

db = SQLAlchemy(app)

# Connect to the IPFS daemon (client library's default address) at import
# time and remember the node's identity information.
ipfs_client = ipfshttpclient.connect()
ipfs_info = ipfs_client.id()

# Authorization URL users are redirected to for Mastodon login; it only
# depends on configuration, so it is built once at import time.
MAST_LOGIN_URL = Mastodon(api_base_url=C.mast_base_uri).auth_request_url(
    client_id=C.mast_client_id,
    redirect_uris=C.mast_redirect_uri,
    scopes=['read:accounts'],
)
|
|
|
|
class Paper(db.Model):
    """An uploaded exam paper bundle and its like/download counters."""

    # always increment 1 for the id of a new record
    __table_args__ = {'sqlite_autoincrement': True}

    id = db.Column(db.Integer, primary_key=True)
    course = db.Column(db.String(30), index=True)
    teacher = db.Column(db.String(30), index=True)
    year = db.Column(db.Integer, index=True)
    # Username of the uploader.
    author = db.Column(db.String(30), index=True)
    notes = db.Column(db.String(200))
    # Value of the "anon" checkbox submitted with the upload form.
    anon = db.Column(db.Boolean)
    create_date = db.Column(db.Date)
    like_num = db.Column(db.Integer, index=True, default=0)
    down_num = db.Column(db.Integer, index=True, default=0)
    # Hash returned by IPFS for the uploaded directory.
    file_hash = db.Column(db.String(64))

    def is_downloaded(self):
        """Return True if the session's user has a download record for this paper."""
        viewer = session.get('username')
        records = DownloadRelation.query.filter_by(paper_id=self.id, username=viewer)
        return records.count() > 0

    def is_liked(self):
        """Return True if the session's user has a like record for this paper."""
        viewer = session.get('username')
        records = LikeRelation.query.filter_by(paper_id=self.id, username=viewer)
        return records.count() > 0
|
|
|
|
class DownloadRelation(db.Model):
    """Records that ``username`` has downloaded the paper ``paper_id``."""

    id = db.Column(db.Integer, primary_key=True)
    # Paper.id of the downloaded paper (plain integer, no foreign key).
    paper_id = db.Column(db.Integer, index=True)
    username = db.Column(db.String(30), index=True)
|
|
|
|
class LikeRelation(db.Model):
    """Records that ``username`` currently likes the paper ``paper_id``."""

    id = db.Column(db.Integer, primary_key=True)
    # Paper.id of the liked paper (plain integer, no foreign key).
    paper_id = db.Column(db.Integer, index=True)
    username = db.Column(db.String(30), index=True)
|
|
|
|
if __name__ == "__main__":
    # Running the file directly (``python3 app.py``) creates the tables of
    # the models defined above. Flask-SQLAlchemy 3.x requires an active
    # application context for this; the explicit context is harmless on
    # earlier releases.
    with app.app_context():
        db.create_all()
|
|
|
|
@app.route('/pastExam/img/<path:path>')
def send_img(path):
    """Serve the file at ``path`` from the ``static/img`` directory."""
    image_root = 'static/img'
    return send_from_directory(image_root, path)
|
|
|
|
def login_required(allow_guest=True):
    """Build a decorator that only lets logged-in users reach a view.

    The wrapped view receives the session's username as the keyword
    argument ``username``. Requests without a username in the session are
    redirected to the login page; when ``allow_guest`` is false, so are
    users whose name starts with ``guest<``.
    """
    def decorator(view):
        @wraps(view)
        def wrapper(*args, **kwargs):
            username = session.get('username')
            is_guest = bool(username) and username.startswith('guest<')
            permitted = bool(username) and (allow_guest or not is_guest)
            if permitted:
                return view(*args, **kwargs, username=username)
            return redirect(url_for('login'))
        return wrapper
    return decorator
|
|
|
|
@app.route('/pastExam/login/')
def login():
    """Serve the static login page."""
    login_page = 'login/index.html'
    return app.send_static_file(login_page)
|
|
|
|
@app.route('/pastExam/logout')
def logout():
    """Forget the logged-in username and send the browser to the login page."""
    session.pop('username', None)
    # Build the target from the endpoint, as login_required does, instead of
    # a relative 'login' URL: that resolved to the slash-less
    # /pastExam/login and cost an extra redirect to the canonical URL.
    return redirect(url_for('login'))
|
|
|
|
@app.route('/pastExam/login/guest/')
def guest_login():
    """Render the guest login form with the configured verification entries."""
    context = {
        'vs': C.verify,
        'allow_guest_upload': C.allow_guest_upload,
    }
    return render_template('guest-login.html', **context)
|
|
|
|
@app.route('/pastExam/login/guest/verify', methods=['POST'])
@limiter.limit("5 / hour")
def guest_login_verify():
    """Check the submitted verification answers and log the client in as a guest.

    Every entry of ``C.verify`` is a ``(name, ques, hint, ans)`` tuple; the
    form field ``name`` must equal ``ans`` for each of them, otherwise the
    response is a 401. On success the session username becomes
    ``guest<uid>``.
    """
    answers = request.form
    passed = all(answers.get(name) == ans for name, ques, hint, ans in C.verify)
    if not passed:
        return '错误!', 401

    # The uid is kept in the session, so the same session gets the same
    # guest name on later logins.
    if 'uid' not in session:
        session['uid'] = random.randint(0, 10000000)

    guest_name = 'guest<%s>' % session['uid']
    session['username'] = guest_name
    session.pop('avatar', None)
    session.permanent = True
    return {'r': 0}
|
|
|
|
@app.route('/pastExam/login/mast/')
def mast_login():
    """Redirect the browser to the prebuilt Mastodon authorization URL."""
    authorize_url = MAST_LOGIN_URL
    return redirect(authorize_url)
|
|
|
|
@app.route('/pastExam/login/mast/auth')
def mast_login_auth():
    """Complete the Mastodon OAuth login started by ``mast_login``.

    Exchanges the ``code`` query argument for an access token, fetches the
    account it belongs to, stores its ``acct`` and ``avatar`` in the session
    and redirects to the paper list. Responds with 400 when ``code`` is
    missing.
    """
    code = request.args.get('code')
    if not code:
        # Without a code there is nothing to exchange; reject the request
        # here instead of letting the client library fail with a 500.
        abort(400)

    client = Mastodon(
        client_id=C.mast_client_id,
        client_secret=C.mast_client_sec,
        api_base_url=C.mast_base_uri,
    )
    # The call is made for its effect of authenticating ``client``; the
    # returned token itself is not needed afterwards.
    client.log_in(code=code, redirect_uri=C.mast_redirect_uri, scopes=['read:accounts'])
    info = client.account_verify_credentials()

    session['username'] = info.acct
    session['avatar'] = info.avatar
    session.permanent = True

    return redirect(url_for('list'))
|
|
|
|
def by_abc(l):
    """Group rows by the initial letter of the pinyin of their first field.

    Args:
        l: iterable of indexable rows whose element 0 is a name
            (called below with ``(course, count)`` rows).

    Returns:
        A dict mapping 'A'..'Z' (and '/' for everything else) to the list
        of rows filed under that letter. Letters without rows are left out.
    """
    d = {letter: [] for letter in AZ + '/'}

    for item in l:
        name = item[0]
        k = '/'
        # An empty or NULL name has no initial; it stays in the '/' bucket
        # instead of raising on the [0] lookup.
        if name:
            initials = pypinyin.lazy_pinyin(
                name,
                style=pypinyin.Style.FIRST_LETTER,
                errors=lambda x: x[0],  # non-Chinese text: keep its first character
            )
            if initials:
                k = initials[0].upper()
        # Test against the bucket keys rather than the AZ string: substring
        # matching would let '' or multi-letter keys through to a KeyError.
        if k not in d:
            k = '/'
        d[k].append(item)

    return {k: v for k, v in d.items() if v}
|
|
|
|
|
|
@app.route('/pastExam/')
@login_required()
def list(username):
    """Render the paginated list of papers, filtered by the query arguments.

    Query arguments read here: ``course``, ``teacher``, ``year``,
    ``my_upload``, ``my_fav`` (and ``page``, only to detect whether it was
    given). A ``year`` that is not a decimal number is ignored as a filter.

    NOTE: every local variable of this function is handed to the template
    through ``**locals()``, so the local names below are the template's
    context; rename or remove them only together with ``list.html``.
    The function name shadows the built-in ``list`` but is also the endpoint
    name used by ``url_for('list')``, so it is kept.
    """
    avatar = session.get('avatar', C.guest_avatar)
    course = request.args.get('course')
    teacher = request.args.get('teacher')
    year = request.args.get('year')

    is_my_upload = request.args.get('my_upload')
    is_my_fav = request.args.get('my_fav')

    # has_* record whether the argument was supplied at all, even when its
    # value is empty or unusable.
    has_course = course is not None
    has_teacher = teacher is not None
    has_year = year is not None
    # True when the request carries no filter and no page argument.
    ept = not (has_course or has_teacher or has_year) and is_my_fav is None and is_my_upload is None and 'page' not in request.args

    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as '²' that int() rejects. Anything unusable becomes None so it
    # can no longer compare equal to 0 and silently filter on year 0.
    year = int(year) if year and year.isdecimal() else None

    ps = Paper.query

    if course:
        ps = ps.filter_by(course=course)
    if teacher:
        ps = ps.filter_by(teacher=teacher)
    if year is not None:
        # 0 is a legitimate value: upload stores 0 when no year was given.
        ps = ps.filter_by(year=year)

    if is_my_upload:
        ps = ps.filter_by(author=username)
    if is_my_fav:
        ps = ps.join(LikeRelation, Paper.id == LikeRelation.paper_id).filter(LikeRelation.username == username)

    ps = ps.order_by(db.desc('like_num'), db.desc('down_num'), db.desc('id'))
    pagination = ps.paginate(max_per_page=100)
    curr_year = date.today().year
    # (value, number of papers) rows used to build the filter menus.
    all_courses = db.session.query(Paper.course, func.count()).group_by(Paper.course).all()
    all_courses_abc = by_abc(all_courses)
    all_teachers = db.session.query(Paper.teacher, func.count()).group_by(Paper.teacher).all()
    all_years = db.session.query(Paper.year, func.count()).group_by(Paper.year).all()
    ipfs_version = hashlib.sha256(C.ipfs_base_url.encode('utf-8')).hexdigest()
    ipfs_id = ipfs_info.get('ID')
    disable_upload = not C.allow_guest_upload and username.startswith('guest<')
    contributor_link = C.contributor_link
    return render_template('list.html', **locals())
|
|
|
|
def check_length(x, limit=30, allow_null=False):
    """Validate the length of a form value.

    A non-empty ``x`` is valid when it has at most ``limit`` characters.
    An empty or missing ``x`` is valid only when ``allow_null`` is set.
    """
    if x and len(x) <= limit:
        return True
    return allow_null and not x
|
|
|
|
def _safe_upload_path(base_path, raw_name):
    """Map a client-supplied file name to a path inside ``base_path``.

    Returns None when the name is missing or has no usable component.
    """
    if not raw_name:
        return None
    # '..' is neutralised first, then the name is split so that a leading
    # '/' (which would make os.path.join discard base_path) and empty or '.'
    # components are dropped. Sub-directories in the name are kept.
    parts = [p for p in raw_name.replace('..', '__').split('/') if p not in ('', '.')]
    if not parts:
        return None
    return os.path.join(base_path, *parts)


@app.route('/pastExam/upload', methods=['POST'])
@limiter.limit("10 / hour")
@login_required(allow_guest=C.allow_guest_upload)
def upload(username):
    """Store the uploaded files in IPFS and record them as a new Paper.

    Form fields: ``name`` (course), ``teacher``, ``year``, ``notes``,
    ``anon`` and the files in ``files[]``. Responds with 422 when a text
    field fails its length check or no usable file was sent, and with 500
    when IPFS returns no hash for the uploaded directory. On success the
    client is redirected back to the list page.
    """
    import tempfile  # local import so this block needs no file-level change

    name = request.form.get('name')
    teacher = request.form.get('teacher')
    year = request.form.get('year')
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as '²' that int() rejects. 0 stands for "no year given".
    year = int(year) if year and year.isdecimal() else 0
    notes = request.form.get('notes', '')
    anon = request.form.get('anon') == 'on'

    if not (check_length(name) and check_length(teacher) and check_length(notes, 200, True)):
        abort(422)

    files = request.files.getlist('files[]')

    # Private, uniquely named staging directory; its base name is what the
    # IPFS result reports as the name of the directory entry.
    base_path = tempfile.mkdtemp(prefix='pastexam-upload-', dir='/tmp')
    dir_name = os.path.basename(base_path)
    try:
        saved = 0
        for f in files:
            target = _safe_upload_path(base_path, f.filename)
            if target is None:
                # e.g. the empty part a browser sends when no file is chosen
                continue
            os.makedirs(os.path.dirname(target), exist_ok=True)
            f.save(target)
            saved += 1

        if not saved:
            abort(422)

        res = ipfs_client.add(base_path, recursive=True)
    finally:
        # The staged copy is not needed once it has been added (or the
        # request failed); do not let uploads pile up in /tmp.
        shutil.rmtree(base_path, ignore_errors=True)

    # A single result may come back as one dict instead of a list.
    if isinstance(res, dict):
        res = [res]

    file_hash = ''
    for r in res:
        if r.get('Name') == dir_name:
            file_hash = r.get('Hash')

    if not file_hash:
        abort(500)

    paper = Paper(
        course=name,
        teacher=teacher,
        year=year,
        notes=notes,
        anon=anon,
        author=username,
        create_date=date.today(),
        file_hash=file_hash,
    )
    db.session.add(paper)
    db.session.commit()

    flash('上传成功')
    return redirect('.#part2')
|
|
|
|
@app.route('/pastExam/<pid>/download')
@limiter.limit("100 / hour")
@login_required()
def download(pid, username):
    """Count and serve a download of paper ``pid``.

    The first download of a paper by a user adds a DownloadRelation row and
    increments the paper's ``down_num``. With ``?format=zip|tar|gztar`` the
    paper's directory is fetched from IPFS into /tmp if needed, archived and
    sent as an attachment; otherwise the client is redirected to the paper
    on the configured IPFS gateway. Responds with 404 for an unknown paper.
    """
    p = Paper.query.get_or_404(pid)

    # Use the row's integer id (as like() does) rather than the raw URL
    # segment for the relation row and the archive path.
    r = DownloadRelation.query.filter_by(paper_id=p.id, username=username).count()

    if not r:
        db.session.add(DownloadRelation(paper_id=p.id, username=username))
        p.down_num += 1
        db.session.commit()

    dformat = request.args.get('format')
    # shutil.make_archive format name -> file suffix of the archive it writes
    suff = {
        'zip': '.zip',
        'tar': '.tar',
        'gztar': '.tar.gz',
    }

    if dformat in suff:
        target_file = '/tmp/%s' % p.id
        target_filename = target_file + suff[dformat]
        target_dir = os.path.join('/tmp', p.file_hash)

        # Archives and fetched directories are left in /tmp and reused by
        # later requests.
        if not os.path.exists(target_filename):
            if not os.path.exists(target_dir):
                ipfs_client.get(p.file_hash, target='/tmp')
            shutil.make_archive(target_file, dformat, target_dir)

        # Raw string: '\w' in a plain literal is an invalid escape sequence.
        filename = re.sub(r'[^\w@_()()-]', '_', '%s_%s_共享计划_%d' % (p.course, p.teacher, p.id)) + suff[dformat]
        # NOTE(review): Flask 2.0 renamed attachment_filename to
        # download_name; confirm against the pinned Flask version.
        return send_file(target_filename, as_attachment=True, attachment_filename=filename)

    # 301 (permanent) to cut down on unnecessary repeat requests.
    return redirect(C.ipfs_base_url + p.file_hash, code=301)
|
|
|
|
@app.route('/pastExam/<pid>/like', methods=['POST', 'DELETE'])
@limiter.limit("100 / hour")
@login_required()
def like(pid, username):
    """Like (POST) or unlike (DELETE) paper ``pid`` for the current user.

    Repeating a like, or removing a like that does not exist, changes
    nothing. Returns the paper's like count as text; responds with 404 for
    an unknown paper.
    """
    paper = Paper.query.get_or_404(pid)
    existing = LikeRelation.query.filter_by(paper_id=pid, username=username).first()
    wants_like = request.method == 'POST'

    if wants_like and not existing:
        db.session.add(LikeRelation(paper_id=paper.id, username=username))
        paper.like_num += 1
        db.session.commit()
    elif existing and not wants_like:
        db.session.delete(existing)
        paper.like_num -= 1
        db.session.commit()

    return str(paper.like_num)
|
|
|