You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

286 lines
8.9 KiB

# There are two ways to run this app:
#
# run the web server
# $ export FLASK_APP=app.py
# $ flask run
#
# create or refresh the database
# $ python3 app.py
import hashlib
import os
import random
import re
import shutil
import tempfile
from datetime import date, datetime
from functools import wraps
from zipfile import ZipFile

import ipfshttpclient
from flask import Flask, request, render_template, send_from_directory, abort, redirect, session, url_for, send_file
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_sqlalchemy import SQLAlchemy
from mastodon import Mastodon

from config import C
# --- Application-wide objects, created once when the module is imported ---

# Flask application; settings and the session signing key come from config.C.
app = Flask(__name__)
app.config.from_object('config.C')
app.secret_key = C.session_key

# Rate limiter keyed on the client's remote address, with a default limit
# of 50 requests per minute.
limiter = Limiter(
    app,
    key_func=get_remote_address,
    default_limits=["50 / minute"],
)

# Database handle bound to the application.
db = SQLAlchemy(app)

# IPFS HTTP client, connected with the library's default settings.
ipfs_client = ipfshttpclient.connect()

# Mastodon OAuth authorization URL; it only depends on configuration values,
# so it is computed a single time here.
MAST_LOGIN_URL = Mastodon(api_base_url=C.mast_base_uri).auth_request_url(
    client_id=C.mast_client_id,
    redirect_uris=C.mast_redirect_uri,
    scopes=['read:accounts'],
)
class Paper(db.Model):
    """An uploaded exam paper: descriptive metadata, counters and file hash."""

    id = db.Column(db.Integer, primary_key=True)
    course = db.Column(db.String(30), index=True)
    teacher = db.Column(db.String(30), index=True)
    year = db.Column(db.Integer, index=True)
    # Username of the uploader.
    author = db.Column(db.String(30), index=True)
    notes = db.Column(db.String(200))
    anon = db.Column(db.Boolean)
    create_date = db.Column(db.Date)
    # Counters kept in step with the LikeRelation / DownloadRelation rows.
    like_num = db.Column(db.Integer, index=True, default=0)
    down_num = db.Column(db.Integer, index=True, default=0)
    # Hash returned by IPFS for the uploaded directory.
    file_hash = db.Column(db.String(64))

    # always increment 1 for the id of a new record
    __table_args__ = {'sqlite_autoincrement': True}

    def is_downloaded(self):
        """Return True when the session's user has a download record for this paper."""
        viewer = session.get('username')
        records = DownloadRelation.query.filter_by(paper_id=self.id, username=viewer)
        return records.count() > 0

    def is_liked(self):
        """Return True when the session's user has a like record for this paper."""
        viewer = session.get('username')
        records = LikeRelation.query.filter_by(paper_id=self.id, username=viewer)
        return records.count() > 0
class DownloadRelation(db.Model):
    """One row per (paper, user) pair recording that the user downloaded the paper."""

    id = db.Column(db.Integer, primary_key=True)
    # Id of the downloaded Paper (plain integer column, no foreign key declared).
    paper_id = db.Column(db.Integer, index=True)
    # Session username of the downloader.
    username = db.Column(db.String(30), index=True)
class LikeRelation(db.Model):
    """One row per (paper, user) pair recording that the user liked the paper."""

    id = db.Column(db.Integer, primary_key=True)
    # Id of the liked Paper (plain integer column, no foreign key declared).
    paper_id = db.Column(db.Integer, index=True)
    # Session username of the user who liked it.
    username = db.Column(db.String(30), index=True)
if __name__ == "__main__":
    # Running the file directly (``python3 app.py``) creates any missing
    # tables. An application context is pushed explicitly because newer
    # Flask-SQLAlchemy releases refuse to touch the engine without one;
    # on older releases the extra context is harmless.
    with app.app_context():
        db.create_all()
@app.route('/pastExam/img/<path:path>')
def send_img(path):
    """Serve the requested file from the ``static/img`` directory."""
    image_dir = 'static/img'
    return send_from_directory(image_dir, path)
def login_required(allow_guest=True):
    """Build a view decorator that enforces a logged-in session.

    The wrapped view is called with the session's username as an extra
    ``username`` keyword argument. Requests without a username in the
    session are redirected to the login page; when ``allow_guest`` is
    false, guest accounts (usernames starting with ``guest<``) are
    redirected as well.
    """
    def decorator(view):
        @wraps(view)
        def guarded(*args, **kwargs):
            username = session.get('username')
            if not username:
                return redirect(url_for('login'))
            if username.startswith('guest<') and not allow_guest:
                return redirect(url_for('login'))
            return view(*args, **kwargs, username=username)
        return guarded
    return decorator
@app.route('/pastExam/login/')
def login():
    """Serve the static login page."""
    login_page = 'login/index.html'
    return app.send_static_file(login_page)
@app.route('/pastExam/logout')
def logout():
    """Forget the logged-in username and send the browser to the login page."""
    session.pop('username', None)
    # Build the target with url_for, like the other redirects in this file,
    # instead of relying on the relative path 'login' being resolved against
    # the current URL (which also landed on the slash-less variant first).
    return redirect(url_for('login'))
@app.route('/pastExam/login/guest/')
def guest_login():
    """Render the guest login form with the configured verification questions."""
    return render_template(
        'guest-login.html',
        vs=C.verify,
        allow_guest_upload=C.allow_guest_upload,
    )
@app.route('/pastExam/login/guest/verify', methods=['POST'])
@limiter.limit("5 / hour")
def guest_login_verify():
    """Check the submitted verification answers and start a guest session.

    Each entry of ``C.verify`` is a 4-tuple; only the form field name (first
    item) and the expected answer (last item) are used here. Any mismatch
    yields a 401 response. On success the session gets a ``guest<uid>``
    username and is made permanent.
    """
    all_correct = all(
        request.form.get(field) == expected
        for field, _question, _hint, expected in C.verify
    )
    if not all_correct:
        return '错误!', 401

    # Reuse an existing uid so the same browser keeps the same guest name.
    if 'uid' not in session:
        session['uid'] = random.randint(0, 10000000)
    guest_id = session['uid']
    session['username'] = 'guest<%s>' % guest_id
    session.pop('avatar', None)
    session.permanent = True
    return {'r': 0}
@app.route('/pastExam/login/mast/')
def mast_login():
    """Send the browser to the precomputed Mastodon authorization URL."""
    authorize_url = MAST_LOGIN_URL
    return redirect(authorize_url)
@app.route('/pastExam/login/mast/auth')
def mast_login_auth():
    """OAuth callback: exchange the authorization code and log the user in.

    Stores the Mastodon account name and avatar in the session, marks the
    session permanent and redirects to the paper list. When the callback
    arrives without a ``code`` parameter the user is sent back to the login
    page instead of failing inside the token exchange.
    """
    code = request.args.get('code')
    if not code:
        # No authorization code (e.g. the user declined on the Mastodon
        # side): nothing to exchange, so return to the login page.
        return redirect(url_for('login'))

    client = Mastodon(
        client_id=C.mast_client_id,
        client_secret=C.mast_client_sec,
        api_base_url=C.mast_base_uri,
    )
    # log_in() is called for its effect on the client; the returned token
    # is not needed afterwards.
    client.log_in(code=code, redirect_uri=C.mast_redirect_uri, scopes=['read:accounts'])
    info = client.account_verify_credentials()
    session['username'] = info.acct
    session['avatar'] = info.avatar
    session.permanent = True
    return redirect(url_for('list'))
@app.route('/pastExam/')
@login_required()
def list(username):
    """Render the paper list, optionally filtered by the query string.

    Recognised query parameters: ``course``, ``teacher``, ``year``,
    ``my_upload``, ``my_fav`` (``page`` is only checked for presence here).
    Results are ordered by like count, descending, and paginated.

    NOTE: every local variable of this function is passed to the template
    through ``**locals()``, so local names must not be renamed or removed
    without checking ``list.html``. The function name shadows the builtin
    ``list`` but is also the endpoint name used by ``url_for('list')``.
    """
    avatar = session.get('avatar', C.guest_avatar)
    course = request.args.get('course')
    teacher = request.args.get('teacher')
    year = request.args.get('year')
    if year is not None:
        # A present but unparseable value is treated like an empty field.
        # Previously it became False, and since ``False == 0`` the query was
        # silently filtered on year 0. isdecimal() (unlike isdigit()) only
        # accepts characters that int() can actually convert.
        year = int(year) if year.isdecimal() else ''
    is_my_upload = request.args.get('my_upload')
    is_my_fav = request.args.get('my_fav')
    # "has_*" means the parameter was present in the URL, even if empty.
    has_course = course is not None
    has_teacher = teacher is not None
    has_year = year is not None
    ept = not (has_course or has_teacher or has_year) and is_my_fav is None and is_my_upload is None and 'page' not in request.args
    ps = Paper.query
    if course:
        ps = ps.filter_by(course=course)
    if teacher:
        ps = ps.filter_by(teacher=teacher)
    if isinstance(year, int):
        # Year 0 is a legitimate filter value, so test the type, not truthiness.
        ps = ps.filter_by(year=year)
    if is_my_upload:
        ps = ps.filter_by(author=username)
    if is_my_fav:
        ps = ps.join(LikeRelation, Paper.id==LikeRelation.paper_id).filter(LikeRelation.username==username)
    ps = ps.order_by(db.desc('like_num'))
    pagination = ps.paginate(max_per_page=100)
    curr_year = date.today().year
    # Distinct values present in the database, handed to the template.
    all_courses = [i for i, in db.session.query(Paper.course.distinct()).all()]
    all_teachers = [i for i, in db.session.query(Paper.teacher.distinct()).all()]
    all_years = [i for i, in db.session.query(Paper.year.distinct()).all()]
    # Digest of the configured IPFS base URL; it changes whenever the URL does.
    ipfs_version = hashlib.sha256(C.ipfs_base_url.encode('utf-8')).hexdigest()
    disable_upload = not C.allow_guest_upload and username.startswith('guest<')
    return render_template('list.html', **locals())
def check_length(x, limit=30, allow_null=False):
    """Validate the length of a form value.

    A non-empty ``x`` is valid when it has at most ``limit`` characters.
    An empty or missing ``x`` is valid only when ``allow_null`` is set.
    """
    if x:
        return len(x) <= limit
    return bool(allow_null)
@app.route('/pastExam/upload', methods=['POST'])
@limiter.limit("10 / hour")
@login_required(allow_guest=C.allow_guest_upload)
def upload(username):
    """Store an uploaded set of files in IPFS and record it as a Paper.

    Form fields: ``name`` (course), ``teacher``, ``year``, ``notes``,
    ``anon`` and the file list ``files[]``. Responds with 422 when a text
    field fails validation or no usable file was sent, and with 500 when
    the IPFS result does not contain a hash for the uploaded directory.
    On success redirects back to the list page.
    """
    name = request.form.get('name')
    teacher = request.form.get('teacher')
    year = request.form.get('year')
    # Missing or non-numeric years are stored as 0. isdecimal() (unlike
    # isdigit()) only accepts characters that int() can convert.
    year = int(year) if year and year.isdecimal() else 0
    notes = request.form.get('notes', '')
    anon = request.form.get('anon') == 'on'
    if not (check_length(name) and check_length(teacher) and check_length(notes, 200, True)):
        abort(422)

    files = request.files.getlist('files[]')
    dir_name = username + str(datetime.now())
    base_path = os.path.join('/tmp', dir_name)
    os.mkdir(base_path)
    try:
        saved = 0
        for f in files:
            filename = (f.filename or '').replace('/', '_')
            # An empty name (no file chosen) or a '.'/'..' name would make
            # save() target a directory instead of a file; skip those.
            if filename in ('', '.', '..'):
                continue
            f.save(os.path.join(base_path, filename))
            saved += 1
        if not saved:
            # Nothing usable was uploaded; do not create an empty paper.
            abort(422)
        res = ipfs_client.add(base_path)
    finally:
        # The staged copy is only needed while adding to IPFS; remove it so
        # uploads do not pile up under /tmp.
        shutil.rmtree(base_path, ignore_errors=True)

    # The entry whose name equals the directory name carries the hash of
    # the whole upload.
    file_hash = ''
    for r in res:
        if r.get('Name') == dir_name:
            file_hash = r.get('Hash')
    if not file_hash:
        abort(500)

    paper = Paper(
        course=name,
        teacher=teacher,
        year=year,
        notes=notes,
        anon=anon,
        author=username,
        create_date=date.today(),
        file_hash=file_hash,
    )
    db.session.add(paper)
    db.session.commit()
    return redirect('.#part2')
@app.route('/pastExam/<pid>/download')
@limiter.limit("100 / hour")
@login_required()
def download(pid, username):
    """Record a download of paper ``pid`` and deliver its files.

    The first download by a given user adds a DownloadRelation row and
    increments the paper's download counter. With ``?type=zip`` the files
    are fetched from IPFS, packed into a zip cached under /tmp and sent as
    an attachment; otherwise the browser is redirected to the IPFS gateway
    URL. Responds with 404 when the paper does not exist.
    """
    p = Paper.query.get_or_404(pid)
    # Use the integer id of the loaded row from here on rather than the raw
    # URL string, so relation rows and the cache path are always canonical.
    already = DownloadRelation.query.filter_by(paper_id=p.id, username=username).count()
    if not already:
        db.session.add(DownloadRelation(paper_id=p.id, username=username))
        p.down_num += 1
        db.session.commit()

    if request.args.get('type') == 'zip':
        target_file = '/tmp/%d.zip' % p.id
        if not os.path.exists(target_file):
            ipfs_client.get(p.file_hash, target='/tmp')
            source_dir = os.path.join('/tmp', p.file_hash)
            # Build the archive under a temporary name and move it into
            # place only when complete, so a failed or concurrent request
            # can never leave (or serve) a half-written cached zip.
            fd, partial = tempfile.mkstemp(dir='/tmp', suffix='.zip.part')
            os.close(fd)
            try:
                with ZipFile(partial, 'w') as z:
                    for fname in os.listdir(source_dir):
                        z.write(os.path.join(source_dir, fname), fname)
                os.replace(partial, target_file)
            finally:
                if os.path.exists(partial):
                    os.remove(partial)
        # Replace every character outside the allowed set with '_' to get a
        # safe attachment name (raw string: same pattern, valid escape).
        filename = re.sub(r'[^\w@_()()-]', '_', '%s_%s_共享计划_%d' % (p.course, p.teacher, p.id)) + '.zip'
        return send_file(target_file, as_attachment=True, attachment_filename=filename)

    # 301 lets the browser cache the redirect and avoids unnecessary requests.
    return redirect(C.ipfs_base_url + p.file_hash, code=301)
@app.route('/pastExam/<pid>/like', methods=['POST', 'DELETE'])
@limiter.limit("100 / hour")
@login_required()
def like(pid, username):
    """Add (POST) or remove (DELETE) the current user's like on paper ``pid``.

    Both operations are idempotent: liking twice or un-liking without a
    like changes nothing. Returns the paper's like count as a string and
    responds with 404 when the paper does not exist.
    """
    p = Paper.query.get_or_404(pid)
    # Look the relation up with the row's integer id, the same value used
    # when the relation is created, instead of the raw URL string.
    r = LikeRelation.query.filter_by(paper_id=p.id, username=username).first()
    if request.method == 'POST':
        if not r:
            db.session.add(LikeRelation(paper_id=p.id, username=username))
            p.like_num += 1
            db.session.commit()
    elif r:
        db.session.delete(r)
        p.like_num -= 1
        db.session.commit()
    return str(p.like_num)