from flask import Flask, request, render_template, send_from_directory, abort, redirect, session, url_for
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_limiter import Limiter
|
|
from flask_limiter.util import get_remote_address
|
|
|
|
import ipfshttpclient
|
|
from mastodon import Mastodon
|
|
|
|
from datetime import date, datetime
|
|
from functools import wraps
|
|
import hashlib
|
|
import random
|
|
import os
|
|
from config import C
|
|
|
|
app = Flask(__name__)
|
|
app.config.from_object('config.C')
|
|
app.secret_key = C.session_key
|
|
|
|
limiter = Limiter(
|
|
app,
|
|
key_func=get_remote_address,
|
|
default_limits=["50 / minute"],
|
|
)
|
|
|
|
db = SQLAlchemy(app)
|
|
|
|
ipfs_client = ipfshttpclient.connect()
|
|
|
|
MAST_LOGIN_URL = Mastodon(api_base_url=C.mast_base_uri) \
|
|
.auth_request_url(
|
|
client_id = C.mast_client_id,
|
|
redirect_uris = C.mast_redirect_uri,
|
|
scopes = ['read:accounts']
|
|
)
|
|
|
|
class Paper(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
course = db.Column(db.String(30), index=True)
|
|
teacher = db.Column(db.String(30), index=True)
|
|
year = db.Column(db.Integer, index=True)
|
|
author = db.Column(db.String(30), index=True)
|
|
notes = db.Column(db.String(200))
|
|
anon = db.Column(db.Boolean)
|
|
create_date = db.Column(db.Date)
|
|
like_num = db.Column(db.Integer, index=True, default=0)
|
|
down_num = db.Column(db.Integer, index=True, default=0)
|
|
file_hash = db.Column(db.String(64))
|
|
|
|
if __name__ == "__main__":
|
|
db.create_all()
|
|
|
|
def login_required(allow_guest=True):
|
|
def login_required_instance(f):
|
|
@wraps(f)
|
|
def df(*args, **kwargs):
|
|
username = session.get('username')
|
|
if not username or (not allow_guest and username.startswith('guest<')):
|
|
return redirect(url_for('login'))
|
|
return f(*args, **kwargs, username=username)
|
|
return df
|
|
return login_required_instance
|
|
|
|
@app.route('/pastExam/login/')
|
|
def login():
|
|
return app.send_static_file('login/index.html')
|
|
|
|
@app.route('/pastExam/login/guest/')
|
|
def guest_login():
|
|
return render_template('guest-login.html', vs=C.verify, allow_guest_upload=C.allow_guest_upload)
|
|
|
|
@app.route('/pastExam/login/guest/verify', methods=['POST'])
|
|
@limiter.limit("10 / hour")
|
|
def guest_login_verify():
|
|
for name, ques, hint, ans in C.verify:
|
|
if request.form.get(name) != ans:
|
|
return '错误!', 401
|
|
|
|
if 'uid' not in session:
|
|
session['uid'] = random.randint(0, 10000000)
|
|
|
|
session['username'] = 'guest<%s>' % session['uid']
|
|
session.permanent = True
|
|
return {'r':0}
|
|
|
|
@app.route('/pastExam/login/mast/')
|
|
def mast_login():
|
|
return redirect(MAST_LOGIN_URL)
|
|
|
|
@app.route('/pastExam/login/mast/auth')
|
|
def mast_login_auth():
|
|
code = request.args.get('code')
|
|
client = Mastodon(
|
|
client_id=C.mast_client_id,
|
|
client_secret=C.mast_client_sec,
|
|
api_base_url=C.mast_base_uri
|
|
)
|
|
token = client.log_in(code=code, redirect_uri=C.mast_redirect_uri,scopes=['read:accounts'])
|
|
info = client.account_verify_credentials()
|
|
session['username'] = info.acct
|
|
|
|
return redirect(url_for('list'))
|
|
|
|
@app.route('/pastExam/')
|
|
@login_required()
|
|
def list(username):
|
|
course = request.args.get('course')
|
|
teacher = request.args.get('teacher')
|
|
year = request.args.get('year')
|
|
year = year and year.isdigit() and int(year)
|
|
|
|
has_course = course is not None
|
|
has_teacher = teacher is not None
|
|
has_year = year is not None
|
|
ept = not (has_course or has_teacher or has_year) and 'page' not in request.args
|
|
|
|
ps = Paper.query
|
|
|
|
if course:
|
|
ps = ps.filter_by(course=course)
|
|
if teacher:
|
|
ps = ps.filter_by(teacher=teacher)
|
|
if year or year==0:
|
|
ps = ps.filter_by(year=year)
|
|
|
|
ps = ps.order_by(db.desc('like_num'))
|
|
pagination = ps.paginate(max_per_page=100)
|
|
curr_year = date.today().year
|
|
all_courses = [i for i, in db.session.query(Paper.course.distinct()).all()]
|
|
all_teachers = [i for i, in db.session.query(Paper.teacher.distinct()).all()]
|
|
all_years = [i for i, in db.session.query(Paper.year.distinct()).all()]
|
|
ipfs_version = hashlib.sha256(C.ipfs_base_url.encode('utf-8')).hexdigest()
|
|
disable_upload = not C.allow_guest_upload and username.startswith('guest<')
|
|
return render_template('list.html', **locals())
|
|
|
|
def check_length(x, limit=30, allow_null=False):
|
|
return (x and len(x) <= limit) or (allow_null and not x)
|
|
|
|
@app.route('/pastExam/upload', methods=['POST'])
|
|
@limiter.limit("10 / hour")
|
|
@login_required(allow_guest=C.allow_guest_upload)
|
|
def upload(username):
|
|
name = request.form.get('name')
|
|
teacher = request.form.get('teacher')
|
|
year = request.form.get('year')
|
|
year = year and year.isdigit() and int(year) or 0
|
|
notes = request.form.get('notes', '')
|
|
anon = request.form.get('anon') == 'on'
|
|
|
|
if not (check_length(name) and check_length(teacher) and check_length(notes, 200, True)):
|
|
abort(422)
|
|
|
|
files = request.files.getlist('files[]')
|
|
|
|
dir_name = username + str(datetime.now())
|
|
base_path = os.path.join('/tmp', dir_name)
|
|
os.mkdir(base_path)
|
|
for f in files:
|
|
filename = f.filename.replace('/','_')
|
|
f.save(os.path.join(base_path, filename))
|
|
|
|
res = ipfs_client.add(base_path)
|
|
file_hash = ''
|
|
for r in res:
|
|
if r.get('Name') == dir_name:
|
|
file_hash = r.get('Hash')
|
|
|
|
if not file_hash:
|
|
abort(500)
|
|
|
|
paper = Paper(
|
|
course=name,
|
|
teacher=teacher,
|
|
year=year,
|
|
notes=notes,
|
|
anon=anon,
|
|
author=username,
|
|
create_date=date.today(),
|
|
file_hash=file_hash
|
|
)
|
|
db.session.add(paper)
|
|
db.session.commit()
|
|
|
|
return redirect('.#part2')
|
|
|
|
@app.route('/pastExam/<pid>/download')
|
|
@login_required()
|
|
def download(pid, username):
|
|
p = Paper.query.get_or_404(pid)
|
|
# TODO: download number
|
|
return redirect(C.ipfs_base_url + p.file_hash, code=301) # 301减少不必要的请求
|
|
|
|
# TODO like
|