%(best_regards)s
-
-%(sitename)s
-%(need_intervention_please_contact)s %(sitesupportemail)s
- """ % {
- 'sitename': CFG_SITE_NAME_INTL.get(ln, CFG_SITE_NAME),
- 'best_regards': _("Best regards"),
- 'siteurl': CFG_SITE_URL,
- 'need_intervention_please_contact': _("Need human intervention? Contact"),
- 'sitesupportemail': CFG_SITE_SUPPORT_EMAIL
- }
- return out
+
+def attach_embed_image(email, image_id, image_path):
+ """
+ Attach an image to the email.
+ """
+ with open(image_path, 'rb') as image_data:
+ img = MIMEImage(image_data.read())
+ img.add_header('Content-ID', '<%s>' % image_id)
+ img.add_header('Content-Disposition', 'attachment', filename=os.path.split(image_path)[1])
+ email.attach(img)
def forge_email(fromaddr, toaddr, subject, content, html_content='',
html_images=None, usebcc=False, header=None, footer=None,
html_header=None, html_footer=None, ln=CFG_SITE_LANG,
charset=None, replytoaddr="", attachments=None):
"""Prepare email. Add header and footer if needed.
@param fromaddr: [string] sender
@param toaddr: [string or list-of-strings] list of receivers (if string, then
receivers are separated by ',')
@param usebcc: [bool] True for using Bcc in place of To
@param subject: [string] subject of the email
@param content: [string] content of the email
@param html_content: [string] html version of the email
@param html_images: [dict] dictionary of image id, image path
@param header: [string] None for the default header
@param footer: [string] None for the default footer
@param ln: language
@charset: [string] the content charset. By default is None which means
to try to encode the email as ascii, then latin1 then utf-8.
@param replytoaddr: [string or list-of-strings] to be used for the
reply-to header of the email (if string, then
receivers are separated by ',')
@param attachments: list of paths of files to be attached. Alternatively,
every element of the list could be a tuple: (filename, mimetype)
@return: forged email as a string"""
if html_images is None:
html_images = {}
- if header is None:
- content = email_header(ln) + content
- else:
- content = header + content
- if footer is None:
- content += email_footer(ln)
- else:
- content += footer
-
- if charset is None:
- (content, content_charset) = guess_minimum_encoding(content)
- else:
- content_charset = charset
-
- try:
- subject = subject.encode('ascii')
- except (UnicodeEncodeError, UnicodeDecodeError):
- subject = Header(subject, 'utf-8')
-
- try:
- fromaddr = fromaddr.encode('ascii')
- except (UnicodeEncodeError, UnicodeDecodeError):
- fromaddr = Header(fromaddr, 'utf-8')
+ content = render_template_to_string('mail_text.tpl', content=content,
+ header=header, footer=footer)
if type(toaddr) is not str:
toaddr = ','.join(toaddr)
- try:
- toaddr = toaddr.encode('ascii')
- except (UnicodeEncodeError, UnicodeDecodeError):
- toaddr = Header(toaddr, 'utf-8')
if type(replytoaddr) is not str:
replytoaddr = ','.join(replytoaddr)
- try:
- replytoaddr = replytoaddr.encode('ascii')
- except (UnicodeEncodeError, UnicodeDecodeError):
- replytoaddr = Header(replytoaddr, 'utf-8')
toaddr = remove_temporary_emails(toaddr)
+ headers = {}
+ kwargs = {'to': [], 'cc': [], 'bcc': []}
+
+ if replytoaddr:
+ headers['Reply-To'] = replytoaddr
+ if usebcc:
+ headers['Bcc'] = toaddr
+ kwargs['bcc'] = toaddr.split(',')
+ kwargs['to'] = ['Undisclosed.Recipients:']
+ else:
+ kwargs['to'] = toaddr.split(',')
+ headers['From'] = fromaddr
+ headers['Date'] = formatdate(localtime=True)
+ headers['User-Agent'] = 'Invenio %s at %s' % (CFG_VERSION, CFG_SITE_URL)
+
if html_content:
- if html_header is None:
- html_content = email_html_header(ln) + html_content
- else:
- html_content = html_header + html_content
- if html_footer is None:
- html_content += email_html_footer(ln)
- else:
- html_content += html_footer
-
- if charset is None:
- (html_content, html_content_charset) = guess_minimum_encoding(html_content)
- else:
- html_content_charset = charset
-
- msg_root = MIMEMultipart('alternative')
- msg_root.preamble = 'This is a multi-part message in MIME format.'
-
- msg_text = MIMEText(content, _charset=content_charset)
- msg_root.attach(msg_text)
-
- msg_text = MIMEText(html_content, 'html', _charset=html_content_charset)
- if not html_images:
- # No image? Attach the HTML to the root
- msg_root.attach(msg_text)
- else:
+ html_content = render_template_to_string('mail_html.tpl',
+ content=html_content,
+ header=html_header,
+ footer=html_footer)
+
+ msg_root = EmailMultiAlternatives(subject=subject, body=content,
+ from_email=fromaddr,
+ headers=headers, **kwargs)
+ msg_root.attach_alternative(html_content, "text/html")
+
+ #if not html_images:
+ # # No image? Attach the HTML to the root
+ # msg_root.attach(msg_text)
+ #else:
+ if html_images:
# Image(s)? Attach the HTML and image(s) as children of a
# "related" block
msg_related = MIMEMultipart('related')
- msg_related.attach(msg_text)
+ #msg_related.attach(msg_text)
for image_id, image_path in html_images.iteritems():
- msg_image = MIMEImage(open(image_path, 'rb').read())
- msg_image.add_header('Content-ID', '<%s>' % image_id)
- msg_image.add_header('Content-Disposition', 'attachment', filename=os.path.split(image_path)[1])
- msg_related.attach(msg_image)
+ attach_embed_image(msg_related, image_id, image_path)
msg_root.attach(msg_related)
else:
- msg_root = MIMEText(content, _charset=content_charset)
+ msg_root = EmailMessage(subject=subject, body=content,
+ from_email=fromaddr, headers=headers, **kwargs)
if attachments:
from invenio.bibdocfile import _mimes, guess_format_from_url
- old_msg_root = msg_root
- msg_root = MIMEMultipart()
- msg_root.attach(old_msg_root)
+ #old_msg_root = msg_root
+ #msg_root = MIMEMultipart()
+ #msg_root.attach(old_msg_root)
for attachment in attachments:
try:
+ mime = None
if type(attachment) in (list, tuple):
attachment, mime = attachment
if mime is None:
## Automatic guessing of mimetype
mime = _mimes.guess_type(attachment)[0]
if mime is None:
ext = guess_format_from_url(attachment)
mime = _mimes.guess_type("foo" + ext)[0]
if not mime:
mime = 'application/octet-stream'
part = MIMEBase(*mime.split('/', 1))
part.set_payload(open(attachment, 'rb').read())
Encoders.encode_base64(part)
part.add_header('Content-Disposition', 'attachment; filename="%s"' % os.path.basename(attachment))
msg_root.attach(part)
except:
register_exception(alert_admin=True, prefix="Can't attach %s" % attachment)
- msg_root['From'] = fromaddr
- if replytoaddr:
- msg_root['Reply-To'] = replytoaddr
- if usebcc:
- msg_root['Bcc'] = toaddr
- msg_root['To'] = 'Undisclosed.Recipients:'
- else:
- msg_root['To'] = toaddr
- msg_root['Date'] = formatdate(localtime=True)
- msg_root['Subject'] = subject
- msg_root['User-Agent'] = 'Invenio %s at %s' % (CFG_VERSION, CFG_SITE_URL)
- return msg_root.as_string()
+ return msg_root
RE_NEWLINES = re.compile(r' |
', re.I)
RE_SPACES = re.compile(r'\s+')
RE_HTML_TAGS = re.compile(r'<.+?>')
+
def email_strip_html(html_content):
"""Strip html tags from html_content, trying to respect formatting."""
html_content = RE_SPACES.sub(' ', html_content)
html_content = RE_NEWLINES.sub('\n', html_content)
html_content = RE_HTML_TAGS.sub('', html_content)
html_content = html_content.split('\n')
out = StringIO()
out_format = AbstractFormatter(DumbWriter(out))
for row in html_content:
out_format.add_flowing_data(row)
out_format.end_paragraph(1)
return out.getvalue()
+
def remove_temporary_emails(emails):
"""
Removes the temporary emails (which are constructed randomly when user logs in
with an external authentication provider which doesn't supply an email
address) from an email list.
@param emails: email list (if string, then receivers are separated by ',')
@type emails: str|[str]
@rtype: str
"""
from invenio.access_control_config import CFG_TEMP_EMAIL_ADDRESS
if not isinstance(emails, (str, unicode)):
emails = ','.join(emails)
# Remove all of the spaces
emails = emails.replace(' ', '')
# Remove all of the emails formatted like CFG_TEMP_EMAIL_ADDRESS
emails = re.sub((CFG_TEMP_EMAIL_ADDRESS % '\w+') + '(,|$)', '', emails,
re.IGNORECASE)
# Remove all consecutive commas
emails = re.sub(',+', ',', emails)
if emails[0] == ',':
# Remove the comma at the beginning of the string
emails = emails[1:]
if emails[-1] == ',':
# Remove the comma at the end of the string
emails = emails[:-1]
return emails
diff --git a/modules/miscutil/lib/mailutils_backend_adminonly.py b/modules/miscutil/lib/mailutils_backend_adminonly.py
new file mode 100644
index 000000000..7447ddee3
--- /dev/null
+++ b/modules/miscutil/lib/mailutils_backend_adminonly.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+##
+## This file is part of Invenio.
+## Copyright (C) 2013 CERN.
+##
+## Invenio is free software; you can redistribute it and/or
+## modify it under the terms of the GNU General Public License as
+## published by the Free Software Foundation; either version 2 of the
+## License, or (at your option) any later version.
+##
+## Invenio is distributed in the hope that it will be useful, but
+## WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+## General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with Invenio; if not, write to the Free Software Foundation, Inc.,
+## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
+
+"""
+Invenio Admin mail backend. send_email() will send emails only to
+CFG_SITE_ADMIN_EMAIL.
+"""
+
+__revision__ = "$Id$"
+
+from invenio.config import CFG_SITE_ADMIN_EMAIL
+from flask.ext.email.backends.smtp import Mail as SMTP
+from flask.ext.email.backends.console import Mail as Console
+
+
+def adminonly_class(Backend):
+ """Admin only mail backend class factory."""
+
+ class _Mail(Backend):
+
+ def send_messages(self, email_messages):
+ def process_message(m):
+ m.body = """
+#--------------------------------------------------------------
+#This message would have been sent to the following recipients:
+#%s
+#--------------------------------------------------------------
+#%s""" % (','.join(m.recipients()), m.body)
+ m.to = [CFG_SITE_ADMIN_EMAIL]
+ m.cc = []
+ m.bcc = []
+ if 'Cc' in m.extra_headers:
+ del m.extra_headers['Cc']
+ if 'Bcc' in m.extra_headers:
+ del m.extra_headers['Bcc']
+ return m
+ return super(_Mail, self).send_messages(
+ map(process_message, email_messages))
+
+ return _Mail
+
+SMTPMail = adminonly_class(SMTP)
+ConsoleMail = adminonly_class(Console)
+
+__all__ = ['SMTPMail', 'ConsoleMail']
diff --git a/modules/miscutil/lib/mailutils_unit_tests.py b/modules/miscutil/lib/mailutils_unit_tests.py
new file mode 100644
index 000000000..9d3c18577
--- /dev/null
+++ b/modules/miscutil/lib/mailutils_unit_tests.py
@@ -0,0 +1,284 @@
+# -*- coding: utf-8 -*-
+##
+## This file is part of Invenio.
+## Copyright (C) 2013 CERN.
+##
+## Invenio is free software; you can redistribute it and/or
+## modify it under the terms of the GNU General Public License as
+## published by the Free Software Foundation; either version 2 of the
+## License, or (at your option) any later version.
+##
+## Invenio is distributed in the hope that it will be useful, but
+## WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+## General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with Invenio; if not, write to the Free Software Foundation, Inc.,
+## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
+
+"""
+Test unit for the miscutil/mailutils module.
+"""
+
+import os
+import sys
+from base64 import encodestring
+from StringIO import StringIO
+from flask import current_app
+
+from invenio.mailutils import send_email
+from invenio.testutils import make_test_suite, run_test_suite, InvenioTestCase
+
+
+class MailTestCase(InvenioTestCase):
+
+ EMAIL_BACKEND = 'flask.ext.email.backends.console.Mail'
+
+ def setUp(self):
+ super(MailTestCase, self).setUp()
+ current_app.config['EMAIL_BACKEND'] = self.EMAIL_BACKEND
+ self.__stdout = sys.stdout
+ self.stream = sys.stdout = StringIO()
+
+ def tearDown(self):
+ del self.stream
+ sys.stdout = self.__stdout
+ del self.__stdout
+ super(MailTestCase, self).tearDown()
+
+ def flush_mailbox(self):
+ self.stream = sys.stdout = StringIO()
+
+ #def get_mailbox_content(self):
+ # messages = self.stream.getvalue().split('\n' + ('-' * 79) + '\n')
+ # return [message_from_string(m) for m in messages if m]
+
+
+class TestMailUtils(MailTestCase):
+ """
+ mailutils TestSuite.
+ """
+
+ def test_console_send_email(self):
+ """
+ Test that the console backend can be pointed at an arbitrary stream.
+ """
+ msg_content = """Content-Type: text/plain; charset="utf-8"
+MIME-Version: 1.0
+Content-Transfer-Encoding: 7bit
+Subject: Subject
+From: from@example.com
+To: to@example.com"""
+
+ send_email('from@example.com', ['to@example.com'], subject='Subject',
+ content='Content')
+ self.assertIn(msg_content, sys.stdout.getvalue())
+ self.flush_mailbox()
+
+ send_email('from@example.com', 'to@example.com', subject='Subject',
+ content='Content')
+ self.assertIn(msg_content, sys.stdout.getvalue())
+ self.flush_mailbox()
+
+
+ def test_email_text_template(self):
+ """
+ Test email text template engine.
+ """
+ from invenio.jinja2utils import render_template_to_string
+
+ contexts = {
+ 'ctx1': {'content': 'Content 1'},
+ 'ctx2': {'content': 'Content 2', 'header': 'Header 2'},
+ 'ctx3': {'content': 'Content 3', 'footer': 'Footer 3'},
+ 'ctx4': {'content': 'Content 4', 'header': 'Header 4', 'footer': 'Footer 4'}
+ }
+
+ msg_content = """Content-Type: text/plain; charset="utf-8"
+MIME-Version: 1.0
+Content-Transfer-Encoding: 7bit
+Subject: %s
+From: from@example.com
+To: to@example.com"""
+
+ for name, ctx in contexts.iteritems():
+ msg = render_template_to_string('mail_text.tpl', **ctx)
+ send_email('from@example.com', ['to@example.com'], subject=name,
+ **ctx)
+ email = sys.stdout.getvalue()
+ self.assertIn(msg_content % name, email)
+ self.assertIn(msg, email)
+ self.flush_mailbox()
+
+ def test_email_html_template(self):
+ """
+ Test email html template engine.
+ """
+ from invenio.jinja2utils import render_template_to_string
+
+ contexts = {
+ 'ctx1': {'html_content': 'Content 1'},
+ 'ctx2': {'html_content': 'Content 2',
+ 'html_header': '
',
+ 'html_footer': 'Footer 4'}
+ }
+
+ def strip_html_key(ctx):
+ return dict(map(lambda (k, v): (k[5:], v), ctx.iteritems()))
+
+ for name, ctx in contexts.iteritems():
+ msg = render_template_to_string('mail_html.tpl',
+ **strip_html_key(ctx))
+ send_email('from@example.com', ['to@example.com'], subject=name,
+ content='Content Text', **ctx)
+ email = sys.stdout.getvalue()
+ self.assertIn('Content-Type: multipart/alternative;', email)
+ self.assertIn('Content Text', email)
+ self.assertIn(msg, email)
+ self.flush_mailbox()
+
+ def test_email_html_image(self):
+ """
+ Test sending html message with an image.
+ """
+ from invenio.config import CFG_WEBDIR
+ html_images = {
+ 'img1': os.path.join(CFG_WEBDIR, 'img', 'journal_water_dog.gif')
+ }
+ send_email('from@example.com', ['to@example.com'],
+ subject='Subject', content='Content Text',
+ html_content='',
+ html_images=html_images)
+ email = sys.stdout.getvalue()
+ self.assertIn('Content Text', email)
+ self.assertIn('', email)
+ with open(html_images['img1'], 'r') as f:
+ self.assertIn(encodestring(f.read()), email)
+ self.flush_mailbox()
+
+ def test_sending_attachment(self):
+ """
+ Test sending email with an attachment.
+ """
+ from invenio.config import CFG_WEBDIR
+ attachments = [
+ os.path.join(CFG_WEBDIR, 'img', 'journal_header.png')
+ ]
+ send_email('from@example.com', ['to@example.com'],
+ subject='Subject', content='Content Text',
+ attachments=attachments)
+ email = sys.stdout.getvalue()
+ self.assertIn('Content Text', email)
+ # First attachemnt is image/png
+ self.assertIn('Content-Type: image/png', email)
+ for attachment in attachments:
+ with open(attachment, 'r') as f:
+ self.assertIn(encodestring(f.read()), email)
+ self.flush_mailbox()
+
+ def test_bbc_undisclosed_recipients(self):
+ """
+ Test that the email receivers are hidden.
+ """
+ msg_content = """Content-Type: text/plain; charset="utf-8"
+MIME-Version: 1.0
+Content-Transfer-Encoding: 7bit
+Subject: Subject
+From: from@example.com
+To: Undisclosed.Recipients:"""
+
+ send_email('from@example.com', ['to@example.com', 'too@example.com'],
+ subject='Subject', content='Content')
+ email = sys.stdout.getvalue()
+ self.assertIn(msg_content, email)
+ self.assertIn('Bcc: to@example.com,too@example.com', email)
+ self.flush_mailbox()
+
+ send_email('from@example.com', 'to@example.com, too@example.com',
+ subject='Subject', content='Content')
+ email = sys.stdout.getvalue()
+ self.assertIn(msg_content, email)
+ self.assertIn('Bcc: to@example.com,too@example.com', email)
+ self.flush_mailbox()
+
+
+class TestAdminMailBackend(MailTestCase):
+
+ EMAIL_BACKEND = 'invenio.mailutils_backend_adminonly.ConsoleMail'
+ ADMIN_MESSAGE = "This message would have been sent to the following recipients"
+
+ def test_simple_email_header(self):
+ """
+ Test simple email header.
+ """
+ from invenio.config import CFG_SITE_ADMIN_EMAIL
+ from invenio.jinja2utils import render_template_to_string
+
+ msg_content = """Content-Type: text/plain; charset="utf-8"
+MIME-Version: 1.0
+Content-Transfer-Encoding: 7bit
+Subject: Subject
+From: from@example.com
+To: %s""" % (CFG_SITE_ADMIN_EMAIL, )
+
+ msg = render_template_to_string('mail_text.tpl', content='Content')
+
+ send_email('from@example.com', ['to@example.com'], subject='Subject',
+ content='Content')
+ email = sys.stdout.getvalue()
+ self.assertIn(msg_content, email)
+ self.assertIn(self.ADMIN_MESSAGE, email)
+ self.assertNotIn('Bcc:', email)
+ self.assertIn(msg, email)
+ self.flush_mailbox()
+
+ send_email('from@example.com', 'to@example.com', subject='Subject',
+ content='Content')
+ email = sys.stdout.getvalue()
+ self.assertIn(msg_content, email)
+ self.assertIn(self.ADMIN_MESSAGE, email)
+ self.assertNotIn('Bcc:', email)
+ self.assertIn(msg, email)
+ self.flush_mailbox()
+
+
+ def test_cc_bcc_headers(self):
+ """
+ Test that no Cc and Bcc headers are sent.
+ """
+ from invenio.config import CFG_SITE_ADMIN_EMAIL
+ msg_content = """Content-Type: text/plain; charset="utf-8"
+MIME-Version: 1.0
+Content-Transfer-Encoding: 7bit
+Subject: Subject
+From: from@example.com
+To: %s""" % (CFG_SITE_ADMIN_EMAIL, )
+
+ send_email('from@example.com', ['to@example.com', 'too@example.com'],
+ subject='Subject', content='Content')
+ email = sys.stdout.getvalue()
+ self.assertIn(msg_content, email)
+ self.assertIn(self.ADMIN_MESSAGE, email)
+ self.assertIn('to@example.com,too@example.com', email)
+ self.assertNotIn('Bcc: to@example.com,too@example.com', email)
+ self.flush_mailbox()
+
+ send_email('from@example.com', 'to@example.com, too@example.com',
+ subject='Subject', content='Content')
+ email = sys.stdout.getvalue()
+ self.assertIn(msg_content, email)
+ self.assertIn(self.ADMIN_MESSAGE, email)
+ self.assertIn('to@example.com,too@example.com', email)
+ self.assertNotIn('Bcc: to@example.com,too@example.com', email)
+ self.flush_mailbox()
+
+
+TEST_SUITE = make_test_suite(TestMailUtils, TestAdminMailBackend)
+
+if __name__ == "__main__":
+ run_test_suite(TEST_SUITE)
diff --git a/modules/webmessage/lib/webmessage_model.py b/modules/webmessage/lib/webmessage_model.py
index 1447f2f50..bee5db7d5 100644
--- a/modules/webmessage/lib/webmessage_model.py
+++ b/modules/webmessage/lib/webmessage_model.py
@@ -1,218 +1,218 @@
# -*- coding: utf-8 -*-
#
## This file is part of Invenio.
## Copyright (C) 2011, 2012 CERN.
##
## Invenio is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## Invenio is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Invenio; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
"""
WebMessage database models.
"""
# General imports
from invenio.sqlalchemyutils import db
# Create your models here.
from string import strip
from invenio.websession_model import User, Usergroup
from invenio.webmessage_config import CFG_WEBMESSAGE_SEPARATOR
from sqlalchemy.ext.associationproxy import association_proxy
class MsgMESSAGE(db.Model):
"""Represents a MsgMESSAGE record."""
def __str__(self):
return "From: %s<%s>, Subject: <%s> %s" % \
(self.user_from.nickname or _('None'),
self.user_from.email or _('unknown'),
self.subject, self.body)
__tablename__ = 'msgMESSAGE'
id = db.Column(db.Integer(15, unsigned=True), nullable=False,
primary_key=True,
autoincrement=True)
id_user_from = db.Column(db.Integer(15, unsigned=True),
db.ForeignKey(User.id),
nullable=True, server_default='0')
_sent_to_user_nicks = db.Column(db.Text, name='sent_to_user_nicks',
nullable=False)
_sent_to_group_names = db.Column(db.Text, name='sent_to_group_names',
nullable=False)
subject = db.Column(db.Text, nullable=False)
body = db.Column(db.Text, nullable=True)
sent_date = db.Column(db.DateTime, nullable=False,
server_default='0001-01-01 00:00:00') # db.func.now() -> 'NOW()'
received_date = db.Column(db.DateTime,
server_default='0001-01-01 00:00:00')
user_from = db.relationship(User, backref='sent_messages')
#recipients = db.relationship(User,
# secondary=lambda: UserMsgMESSAGE.__table__,
# collection_class=set)
recipients = association_proxy('sent_to_users', 'user_to',
creator=lambda u:UserMsgMESSAGE(user_to=u))
@db.hybrid_property
def sent_to_user_nicks(self):
""" Alias for column 'sent_to_user_nicks'. """
return self._sent_to_user_nicks
@db.hybrid_property
def sent_to_group_names(self):
""" Alias for column 'sent_to_group_names'. """
return self._sent_to_group_names
@db.validates('_sent_to_user_nicks')
def validate_sent_to_user_nicks(self, key, value):
user_nicks = filter(len, map(strip,
value.split(CFG_WEBMESSAGE_SEPARATOR)))
assert len(user_nicks) == len(set(user_nicks))
if len(user_nicks) > 0:
assert len(user_nicks) == \
User.query.filter(User.nickname.in_(user_nicks)).count()
return CFG_WEBMESSAGE_SEPARATOR.join(user_nicks)
@db.validates('_sent_to_group_names')
def validate_sent_to_group_names(self, key, value):
group_names = filter(len, map(strip,
value.split(CFG_WEBMESSAGE_SEPARATOR)))
assert len(group_names) == len(set(group_names))
if len(group_names) > 0:
assert len(group_names) == \
Usergroup.query.filter(Usergroup.name.in_(group_names)).count()
return CFG_WEBMESSAGE_SEPARATOR.join(group_names)
@sent_to_user_nicks.setter
def sent_to_user_nicks(self, value):
old_user_nicks = self.user_nicks
self._sent_to_user_nicks = value
to_add = set(self.user_nicks)-set(old_user_nicks)
to_del = set(old_user_nicks)-set(self.user_nicks)
if len(self.group_names):
to_del = to_del-set([u.nickname for u in User.query.\
join(User.usergroups).filter(
Usergroup.name.in_(self.group_names)).\
all()])
if len(to_del):
is_to_del = lambda u: u.nickname in to_del
remove_old = filter(is_to_del, self.recipients)
for u in remove_old:
self.recipients.remove(u)
if len(to_add):
for u in User.query.filter(User.nickname.\
in_(to_add)).all():
if u not in self.recipients:
self.recipients.append(u)
@sent_to_group_names.setter
def sent_to_group_names(self, value):
old_group_names = self.group_names
self._sent_to_group_names = value
groups_to_add = set(self.group_names)-set(old_group_names)
groups_to_del = set(old_group_names)-set(self.group_names)
if len(groups_to_del):
to_del = set([u.nickname for u in User.query.\
join(User.usergroups).filter(
Usergroup.name.in_(groups_to_del)).\
all()])-set(self.user_nicks)
is_to_del = lambda u: u.nickname in to_del
remove_old = filter(is_to_del, self.recipients)
for u in remove_old:
self.recipients.remove(u)
if len(groups_to_add):
for u in User.query.join(User.usergroups).filter(db.and_(
Usergroup.name.in_(groups_to_add),
db.not_(User.nickname.in_(self.user_nicks)))).all():
if u not in self.recipients:
self.recipients.append(u)
@property
def user_nicks(self):
if not self._sent_to_user_nicks:
return []
return filter(len, map(strip,
self._sent_to_user_nicks.split(CFG_WEBMESSAGE_SEPARATOR)))
@property
def group_names(self):
if not self._sent_to_group_names:
return []
return filter(len, map(strip,
self.sent_to_group_names.split(CFG_WEBMESSAGE_SEPARATOR)))
#TODO consider moving following lines to separate file.
from invenio.webmessage_config import CFG_WEBMESSAGE_EMAIL_ALERT
from invenio.config import CFG_WEBCOMMENT_ALERT_ENGINE_EMAIL
-from invenio.mailutils import send_email, scheduled_send_email
-from invenio.jinja2utils import render_template_to_string
from invenio.dateutils import datetext_format
from datetime import datetime
def email_alert(mapper, connection, target):
""" Sends email alerts to message recipients. """
+ from invenio.jinja2utils import render_template_to_string
+ from invenio.mailutils import send_email, scheduled_send_email
m = target
is_reminder = m.received_date is not None \
and m.received_date > datetime.now()
alert = send_email
if is_reminder:
alert = lambda *args, **kwargs: scheduled_send_email(*args,
other_bibtasklet_arguments=[
m.received_date.strftime(datetext_format)],
**kwargs)
for u in m.recipients:
if isinstance(u.settings, dict) and \
u.settings.get('webmessage_email_alert', True):
try:
alert(
CFG_WEBCOMMENT_ALERT_ENGINE_EMAIL,
u.email,
subject = m.subject,
content = render_template_to_string(
'webmessage_email_alert.html',
message=m, user=u))
except:
# FIXME tests are not in request context
pass
if CFG_WEBMESSAGE_EMAIL_ALERT:
from sqlalchemy import event
# Register after insert callback.
event.listen(MsgMESSAGE, 'after_insert', email_alert)
class UserMsgMESSAGE(db.Model):
"""Represents a UserMsgMESSAGE record."""
__tablename__ = 'user_msgMESSAGE'
id_user_to = db.Column(db.Integer(15, unsigned=True),
db.ForeignKey(User.id), nullable=False,
server_default='0', primary_key=True)
id_msgMESSAGE = db.Column(db.Integer(15, unsigned=True),
db.ForeignKey(MsgMESSAGE.id),
nullable=False, server_default='0',
primary_key=True)
status = db.Column(db.Char(1), nullable=False,
server_default='N')
user_to = db.relationship(User, backref='received_messages',
collection_class=set)
message = db.relationship(MsgMESSAGE, backref='sent_to_users',
collection_class=set)
__all__ = ['MsgMESSAGE',
'UserMsgMESSAGE']
diff --git a/modules/webstyle/lib/webinterface_handler_flask.py b/modules/webstyle/lib/webinterface_handler_flask.py
index c7c7a80aa..328b11b2e 100644
--- a/modules/webstyle/lib/webinterface_handler_flask.py
+++ b/modules/webstyle/lib/webinterface_handler_flask.py
@@ -1,642 +1,645 @@
# -*- coding: utf-8 -*-
## This file is part of Invenio.
## Copyright (C) 2011, 2012, 2013 CERN.
##
## Invenio is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## Invenio is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Invenio; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
"""
Invenio -> Flask adapter
"""
## Import the remote debugger as a first thing, if allowed
try:
from invenio import remote_debugger
except:
remote_debugger = None
import os
from os.path import join
from pprint import pformat
from functools import wraps
from logging.handlers import RotatingFileHandler
from logging import Formatter
from flask import Flask, session, request, g, url_for, current_app, \
render_template, redirect, flash, abort, has_app_context
from jinja2 import FileSystemLoader, MemcachedBytecodeCache
from werkzeug.routing import BuildError
from invenio import config
from invenio.errorlib import register_exception
from invenio.config import CFG_PYLIBDIR, \
CFG_WEBSESSION_EXPIRY_LIMIT_REMEMBER, \
CFG_BIBDOCFILE_USE_XSENDFILE, \
CFG_LOGDIR, CFG_SITE_LANG, CFG_WEBDIR, \
CFG_ETCDIR, CFG_DEVEL_SITE, \
CFG_FLASK_CACHE_TYPE, CFG_FLASK_DISABLED_BLUEPRINTS, \
CFG_SITE_URL, CFG_SITE_SECURE_URL, CFG_FLASK_SERVE_STATIC_FILES, \
CFG_SITE_SECRET_KEY, CFG_BINDIR
from invenio.websession_config import CFG_WEBSESSION_COOKIE_NAME, \
CFG_WEBSESSION_ONE_DAY
CFG_HAS_HTTPS_SUPPORT = CFG_SITE_SECURE_URL.startswith("https://")
CFG_FULL_HTTPS = CFG_SITE_URL.lower().startswith("https://")
def create_invenio_flask_app(**kwargs_config):
"""
Prepare WSGI Invenio application based on Flask.
Invenio consists of a new Flask application with legacy support for
the old WSGI legacy application and the old Python legacy
scripts (URLs to *.py files).
An incoming request is processed in the following manner:
* The Flask application first routes request via its URL routing
system (see LegacyAppMiddleware.__call__()).
* One route in the Flask system, will match Python legacy
scripts (see static_handler_with_legacy_publisher()).
* If the Flask application aborts the request with a 404 error, the request
is passed on to the WSGI legacy application (see page_not_found()). E.g.
either the Flask application did not find a route, or a view aborted the
request with a 404 error.
"""
## The Flask application instance
_app = Flask(__name__,
## Static files are usually handled directly by the webserver (e.g. Apache)
## However in case WSGI is required to handle static files too (such
## as when running simple server), then this flag can be
## turned on (it is done automatically by wsgi_handler_test).
## We assume anything under '/' which is static to be server directly
## by the webserver from CFG_WEBDIR. In order to generate independent
## url for static files use func:`url_for('static', filename='test')`.
static_url_path='',
static_folder=CFG_WEBDIR)
## Update application config from parameters.
_app.config.update(kwargs_config)
if 'SQLALCHEMY_DATABASE_URI' not in _app.config:
from sqlalchemy.engine.url import URL
# Global variables
from invenio.dbquery import CFG_DATABASE_HOST, CFG_DATABASE_PORT,\
CFG_DATABASE_NAME, CFG_DATABASE_USER, CFG_DATABASE_PASS, \
CFG_DATABASE_TYPE
_app.config['SQLALCHEMY_DATABASE_URI'] = URL(CFG_DATABASE_TYPE,
username=CFG_DATABASE_USER,
password=CFG_DATABASE_PASS,
host=CFG_DATABASE_HOST,
database=CFG_DATABASE_NAME,
port=CFG_DATABASE_PORT,
)
## Let's initialize database.
from invenio.sqlalchemyutils import db
db.init_app(_app)
## Make sure that all tables are loaded in `db.metadata.tables`.
from invenio.importutils import autodiscover_modules
autodiscover_modules(['invenio'], related_name_re=".+_model\.py")
## First check that you have all rights to logs
from invenio.bibtask import check_running_process_user
check_running_process_user()
from invenio.pluginutils import PluginContainer
from invenio.session_flask import InvenioSessionInterface
from invenio.webuser_flask import InvenioLoginManager, current_user
from invenio.messages import wash_language, gettext_set_language, \
language_list_long, is_language_rtl
from invenio.urlutils import create_url, get_canonical_and_alternates_urls
from invenio.cache import cache
from invenio.jinja2utils import CollectionExtension, \
LangExtension, hack_jinja2_utf8decoding, \
extend_application_template_filters
from flask.ext.assets import Environment, Bundle
from invenio.webinterface_handler_flask_utils import unicodifier, InvenioRequest
from flaskext.gravatar import Gravatar
from werkzeug.wrappers import BaseResponse
from werkzeug.exceptions import HTTPException
from invenio.flask_sslify import SSLify
from invenio.webinterface_handler_wsgi import application as legacy_application
from invenio.webinterface_handler_wsgi import is_mp_legacy_publisher_path, \
mp_legacy_publisher
# See note on Jinja2 string decoding using ASCII codec instead of UTF8 in
# function documentation
hack_jinja2_utf8decoding()
# Handle both url with and without trailing slashe by Flask.
# @blueprint.route('/test')
# @blueprint.route('/test/') -> not necessary when strict_slashes == False
_app.url_map.strict_slashes = False
# SECRET_KEY is needed by Flask Debug Toolbar
SECRET_KEY = _app.config.get('SECRET_KEY') or CFG_SITE_SECRET_KEY
if not SECRET_KEY or SECRET_KEY == '':
fill_secret_key = """
Set variable CFG_SITE_SECRET_KEY with random string in invenio-local.conf.
You can use following commands:
$ %s
$ %s
""" % (CFG_BINDIR + os.sep + 'inveniocfg --create-secret-key',
CFG_BINDIR + os.sep + 'inveniocfg --update-config-py')
try:
raise Exception(fill_secret_key)
except Exception:
register_exception(alert_admin=True,
subject="Missing CFG_SITE_SECRET_KEY")
raise Exception(fill_secret_key)
_app.config["SECRET_KEY"] = SECRET_KEY
# Enable Flask Debug Toolbar early to also catch HTTPS redirects
if 'debug-toolbar' in getattr(config, 'CFG_DEVEL_TOOLS', []):
_app.config["DEBUG_TB_ENABLED"] = True
_app.config["DEBUG_TB_INTERCEPT_REDIRECTS"] = 'intercept-redirects' in getattr(config, 'CFG_DEVEL_TOOLS', [])
from flask_debugtoolbar import DebugToolbarExtension
DebugToolbarExtension(_app)
+ # Set email backend for Flask-Email plugin
+ from invenio.mailutils import initialize_email_backend
+ initialize_email_backend(_app)
if CFG_HAS_HTTPS_SUPPORT:
# Makes request always run over HTTPS.
_sslify = SSLify(_app)
if not CFG_FULL_HTTPS:
@_sslify.criteria_handler
def criteria():
"""Extends criteria when to stay on HTTP site."""
_force_https = False
if request.blueprint in current_app.blueprints:
_force_https = current_app.blueprints[request.blueprint].\
_force_https
view_func = current_app.view_functions.get(request.endpoint)
if view_func is not None and hasattr(view_func, '_force_https'):
_force_https = view_func._force_https
return not (_force_https or session.need_https())
class LegacyAppMiddleware(object):
def __init__(self, app):
self.app = app
def __call__(self, environ, start_response):
if remote_debugger:
remote_debugger.start()
with self.app.request_context(environ):
g.start_response = start_response
try:
response = self.app.full_dispatch_request()
except Exception as e:
register_exception(req=request, alert_admin=True)
response = self.app.handle_exception(e)
return response(environ, start_response)
_app.wsgi_app = LegacyAppMiddleware(_app)
@_app.errorhandler(404)
def page_not_found(error):
try:
response = legacy_application(request.environ, g.start_response)
if not isinstance(response, BaseResponse):
response = current_app.make_response(str(response))
return response
except HTTPException:
return render_template("404.html"), 404
@_app.errorhandler(401)
def do_login_first(error=401):
"""Displays login page when user is not authorised."""
if request.is_xhr:
return _("Authorization failure"), 401
flash(_("Authorization failure"), 'error')
from invenio.webaccount_blueprint import login
return login(referer=request.referrer), 401
@_app.endpoint('static')
@_app.route(_app.static_url_path + '/', methods=['POST'])
def static_handler_with_legacy_publisher(*args, **kwargs):
"""
Adds support for legacy publisher.
NOTE: It changes order of url page lookup. First, the invenio_handler
will be called and on 404 error the mp_legacy_publisher is called.
"""
possible_module, possible_handler = is_mp_legacy_publisher_path(
request.environ['PATH_INFO'])
if possible_module is not None:
legacy_publisher = lambda req: \
mp_legacy_publisher(req, possible_module, possible_handler)
return legacy_application(request.environ, g.start_response,
handler=legacy_publisher)
# Static file serving for devserver
# ---------------------------------
# Apache normally serve all static files, but if we are using the
# devserver we need to serve static files here. Werkzeugs default
# behaviour is to return a '405 Method not allowed' for POST requests
# to static files. However, if we abort all POST requests with 405, the
# legacy_application (see page_not_found()) will not be given a chance
# to serve static files as it only get's invokved when we abort with a
# 404. Hence, on POST requests, we first check if the static file exists,
# and if it does we return we abort the request with a 405.
if not CFG_FLASK_SERVE_STATIC_FILES:
abort(404)
else:
static_file_response = _app.send_static_file(*args, **kwargs)
if request.method == 'POST':
abort(405)
else:
return static_file_response
if CFG_FLASK_CACHE_TYPE not in [None, 'null']:
_app.jinja_options = dict(_app.jinja_options,
auto_reload=False,
cache_size=-1,
bytecode_cache=MemcachedBytecodeCache(
cache, prefix="jinja::",
timeout=3600))
## Let's customize the template loader to first look into
## /opt/invenio/etc-local/templates and then into
## /opt/invenio/etc/templates
_app.jinja_loader = FileSystemLoader([join(CFG_ETCDIR + '-local',
'templates'),
join(CFG_ETCDIR, 'templates')])
## Let's attach our session handling (which is bridging with the native
## Invenio session handling
_app.session_interface = InvenioSessionInterface()
## Set custom request class
_app.request_class = InvenioRequest
## Let's load the whole invenio.config into Flask :-) ...
_app.config.from_object(config)
## ... and map certain common parameters
_app.config['SESSION_COOKIE_NAME'] = CFG_WEBSESSION_COOKIE_NAME
_app.config['PERMANENT_SESSION_LIFETIME'] = \
CFG_WEBSESSION_EXPIRY_LIMIT_REMEMBER * CFG_WEBSESSION_ONE_DAY
_app.config['USE_X_SENDFILE'] = CFG_BIBDOCFILE_USE_XSENDFILE
_app.config['DEBUG'] = CFG_DEVEL_SITE > 0
_app.debug = CFG_DEVEL_SITE > 0
_app.config['CFG_LANGUAGE_LIST_LONG'] = [(lang, longname.decode('utf-8'))
for (lang, longname) in language_list_long()]
## Invenio is all using str objects. Let's change them to unicode
_app.config.update(unicodifier(dict(_app.config)))
## Cache
_app.config['CACHE_TYPE'] = CFG_FLASK_CACHE_TYPE
# FIXME problem in Flask-Cache==0.11.1
cache.app = _app
cache.init_app(_app)
if CFG_FLASK_CACHE_TYPE == 'redis':
def with_try_except_block(f):
def decorator(*args, **kwargs):
try:
return f(*args, **kwargs)
except Exception:
register_exception(alert_admin=True)
pass
return decorator
## When the redis is down, we would like to keep the site running.
cache.cache._client.execute_command = with_try_except_block(
cache.cache._client.execute_command)
# FIXME problem in Flask-Cache==0.11.1
cache.app = current_app
_flask_log_handler = RotatingFileHandler(os.path.join(CFG_LOGDIR,
'flask.log'))
_flask_log_handler.setFormatter(Formatter(
'%(asctime)s %(levelname)s: %(message)s '
'[in %(pathname)s:%(lineno)d]'
))
_app.logger.addHandler(_flask_log_handler)
# Let's create login manager.
_login_manager = InvenioLoginManager()
_login_manager.login_view = 'webaccount.login'
_login_manager.setup_app(_app)
_login_manager.unauthorized_handler(do_login_first)
# Let's create main menu.
class Menu(object):
def __init__(self, id='', title='', url='', order=None, children=None,
display=lambda: True):
self.id = id
self.title = title
self.url = url
self.children = children or {}
self.order = order or 100
self.display = display
# Let's create assets environment.
_assets = Environment(_app)
_assets.debug = 'assets-debug' in getattr(config, 'CFG_DEVEL_TOOLS', [])
_assets.directory = config.CFG_WEBDIR
def _jinja2_new_bundle(tag, collection, name=None):
if not _assets.debug:
files = [f for f in collection if os.path.isfile(
os.path.join(_assets.directory, f))]
if len(files) != len(collection):
## Turn on debuging to generate 404 request on missing files.
_assets.debug = True
current_app.logger.error('Missing files: ' + ','.join(
set(collection) - set(files)))
if len(collection):
return Bundle(output="%s/%s-%s.%s" %
(tag, 'invenio' if name is None else name,
hash('|'.join(collection)), tag), *collection)
_app.jinja_env.extend(new_bundle=_jinja2_new_bundle,
default_bundle_name='90-invenio')
_app.jinja_env.add_extension(CollectionExtension)
_app.jinja_env.add_extension(LangExtension)
_app.jinja_env.add_extension('jinja2.ext.do')
# Let's extend application with custom template filters.
extend_application_template_filters(_app)
# Let's create Gravatar bridge.
_gravatar = Gravatar(_app,
size=100,
rating='g',
default='retro',
force_default=False,
force_lower=False)
del _gravatar
@_login_manager.user_loader
def _load_user(uid):
"""
Function should not raise an exception if uid is not valid
or User was not found in database.
"""
from invenio.webuser_flask import UserInfo
return UserInfo(uid)
@_app.before_request
def reset_template_context_processor():
g._template_context_processor = []
@_app.context_processor
def _inject_template_context():
context = {}
if not hasattr(g, '_template_context_processor'):
reset_template_context_processor()
for func in g._template_context_processor:
context.update(func())
return context
@_app.before_request
def _guess_language():
"""
Before every request being handled, let's compute the language needed to
return the answer to the client.
This information will then be available in the session['ln'] and in g.ln.
Additionally under g._ an already configured internationalization function
will be available (configured to return unicode objects).
"""
required_ln = None
try:
values = request.values
except:
values = {}
if "ln" in values:
## If ln is specified explictly as a GET or POST argument
## let's take it!
passed_ln = str(values["ln"])
required_ln = wash_language(passed_ln)
if passed_ln != required_ln:
## But only if it was a valid language
required_ln = None
if required_ln:
## Ok it was. We store it in the session.
session["ln"] = required_ln
if not "ln" in session:
## If there is no language saved into the session...
if "user_info" in session and session["user_info"].get("language"):
## ... and the user is logged in, we try to take it from its
## settings.
session["ln"] = session["user_info"]["language"]
else:
## Otherwise we try to guess it from its request headers
for value, quality in request.accept_languages:
value = str(value)
ln = wash_language(value)
if ln == value or ln[:2] == value[:2]:
session["ln"] = ln
break
else:
## Too bad! We stick to the default :-)
session["ln"] = CFG_SITE_LANG
## Well, let's make it global now
g.ln = session["ln"]
g._ = gettext_set_language(g.ln, use_unicode=True)
@_app.context_processor
def _inject_utils():
"""
This will add some more variables and functions to the Jinja2 to execution
context. In particular it will add:
- `url_for`: an Invenio specific wrapper of Flask url_for, that will let you
obtain URLs for non Flask-native handlers (i.e. not yet ported
Invenio URLs)
- `breadcrumbs`: this will be a list of three-elements tuples, containing
the hierarchy of Label -> URLs of navtrails/breadcrumbs.
- `_`: this can be used to automatically translate a given string.
- `is_language_rtl`: is True if the chosen language should be read right to left
"""
def invenio_url_for(endpoint, **values):
try:
return url_for(endpoint, **values)
except BuildError:
if endpoint.startswith('http://') or endpoint.startswith('https://'):
return endpoint
if endpoint.startswith('.'):
endpoint = request.blueprint + endpoint
return create_url('/' + '/'.join(endpoint.split('.')), values, False).decode('utf-8')
if request.endpoint in current_app.config['breadcrumbs_map']:
breadcrumbs = current_app.config['breadcrumbs_map'][request.endpoint]
elif request.endpoint:
breadcrumbs = [(_('Home'), '')] + current_app.config['breadcrumbs_map'].get(request.endpoint.split('.')[0], [])
else:
breadcrumbs = [(_('Home'), '')]
user = current_user._get_current_object()
canonical_url, alternate_urls = get_canonical_and_alternates_urls(
request.environ['PATH_INFO'])
alternate_urls = dict((ln.replace('_', '-'), alternate_url)
for ln, alternate_url in alternate_urls.iteritems())
_guess_language()
from invenio.bibfield import get_record # should not be global due to bibfield_config
return dict(_=lambda *args, **kwargs: g._(*args, **kwargs),
current_user=user,
get_css_bundle=_app.jinja_env.get_css_bundle,
get_js_bundle=_app.jinja_env.get_js_bundle,
is_language_rtl=is_language_rtl,
canonical_url=canonical_url,
alternate_urls=alternate_urls,
get_record=get_record,
url_for=invenio_url_for,
breadcrumbs=breadcrumbs,
)
def _invenio_blueprint_plugin_builder(plugin_name, plugin_code):
"""
Handy function to bridge pluginutils with (Invenio) blueprints.
"""
if plugin_name in CFG_FLASK_DISABLED_BLUEPRINTS:
raise ValueError('%s is excluded by CFG_FLASK_DISABLED_BLUEPRINTS' % plugin_name)
from invenio.webinterface_handler_flask_utils import InvenioBlueprint
if 'blueprint' in dir(plugin_code):
candidate = getattr(plugin_code, 'blueprint')
if isinstance(candidate, InvenioBlueprint):
return candidate
raise ValueError('%s is not a valid blueprint plugin' % plugin_name)
## Let's load all the blueprints that are composing this Invenio instance
_BLUEPRINTS = PluginContainer(
os.path.join(CFG_PYLIBDIR, 'invenio', '*_blueprint.py'),
plugin_builder=_invenio_blueprint_plugin_builder)
## Let's report about broken plugins
open(join(CFG_LOGDIR, 'broken-blueprints.log'), 'w').write(
pformat(_BLUEPRINTS.get_broken_plugins()))
_app.config['breadcrumbs_map'] = {}
_app.config['menubuilder_map'] = {}
## Let's attach all the blueprints
from invenio.webinterface_handler_flask_utils import _
for plugin in _BLUEPRINTS.values():
_app.register_blueprint(plugin)
if plugin.config:
## Let's include the configuration parameters of the config file.
## E.g. if the blueprint specify the config string
## 'invenio.webmessage_config' any uppercase variable defined in
## the module invenio.webmessage_config is loaded into the system.
_app.config.from_object(plugin.config)
if plugin.breadcrumbs:
_app.config['breadcrumbs_map'][plugin.name] = plugin.breadcrumbs
_app.config['breadcrumbs_map'].update(plugin.breadcrumbs_map)
## Let's build global menu. Each blueprint can plug its own menu items.
if plugin.menubuilder:
_app.config['menubuilder_map'].update((m[0],
Menu(*m)) for m in plugin.menubuilder)
_app.config['menubuilder_map'].update(plugin.menubuilder_map)
_app.config['menubuilder_map'].update({
'main.admin': Menu('main.admin', _('Administration'),
'help.admin', 9998, [],
lambda: current_user.is_admin),
'main.help': Menu('main.help', _('Help'), 'help', 9999)})
menu = {'main': Menu('main', '', ''),
'personalize': Menu('personalize', '', '')}
for key, item in _app.config['menubuilder_map'].iteritems():
start = menu
if '.' not in key:
if key in menu:
menu[key] = item.children.update(menu[key].children)
else:
menu[key] = item
continue
keys = key.split('.')
for k in keys[:-1]:
try:
start = start[k].children
except:
start[k] = Menu()
start = start[k].children
if keys[-1] in start:
item.children.update(start[keys[-1]].children)
start[keys[-1]] = item
_app.config['menubuilder_map'] = menu
# Flask-Admin
from invenio.adminutils import register_admin
register_admin(_app)
try:
## When deploying Invenio, one can prepare a module called
## webinterface_handler_local.py to be deployed under
## CFG_PYLIBDIR/invenio directory, and containing a function called
## customize_app which should accept a Flask application.
## This function has a chance to modify the application as needed
## including changing the URL routing map.
# pylint: disable=E0611
from invenio.webinterface_handler_local import customize_app
# pylint: enable=E0611
customize_app(_app)
except ImportError:
## No customization needed.
pass
return _app
def with_app_context(app=None, new_context=False, **kwargs_config):
"""Run function within application context"""
def get_application():
if app is None:
application = create_invenio_flask_app(**kwargs_config)
elif not isinstance(app, Flask) and callable(app):
application = app(**kwargs_config)
return application
def decorator(f):
@wraps(f)
def decorated_func(*args, **kwargs):
"""This function has to run within application context."""
if not has_app_context() or new_context:
with get_application().app_context():
current_app.preprocess_request()
result = f(*args, **kwargs)
else:
result = f(*args, **kwargs)
return result
return decorated_func
return decorator
diff --git a/requirements-flask-ext.txt b/requirements-flask-ext.txt
index 960f3bb30..c16b43c33 100644
--- a/requirements-flask-ext.txt
+++ b/requirements-flask-ext.txt
@@ -1,9 +1,10 @@
Flask-Assets==0.7
Flask-Cache==0.11.1
Flask-DebugToolbar==0.7.1
+Flask-Email==1.4.4
Flask-Gravatar==0.2.3
Flask-Script==0.5.3
Flask-SQLAlchemy==0.16
Flask-Testing==0.4
Flask-WTF==0.8
-Flask-Admin==1.0.6
\ No newline at end of file
+Flask-Admin==1.0.6