Page MenuHomec4science

alert_engine.py
No OneTemporary

File Metadata

Created
Tue, Jun 11, 21:41

alert_engine.py

## $Id$
## Alert engine implementation.
## This file is part of the CERN Document Server Software (CDSware).
## Copyright (C) 2002 CERN.
##
## The CDSware is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## The CDSware is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with CDSware; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
##read config variables
#include "config.wml"
#include "configbis.wml"
<protect>## $Id$ </protect>
<protect>## DO NOT EDIT THIS FILE! IT WAS AUTOMATICALLY GENERATED FROM CDSware WML SOURCES.</protect>
"""Alert engine implementation."""
## rest of the Python code goes below
__version__ = "$Id$"
try:
from cgi import parse_qs
from re import search
from time import localtime, strftime, mktime
import smtplib
from config import *
from search_engine import perform_request_search
from dbquery import run_sql
from htmlparser import *
except ImportError, e:
print "Error: %s" % e
import sys
sys.exit(1)
# Maximum number of records listed in the body of one alert email
# (used by print_records).
MAXIDS = 50
# Sender address for alert emails; supportemail is not defined in this
# file -- presumably it comes from the star import of config.
FROMADDR = 'CDS Alert Service <%s>' % supportemail
# URL of the page where users manage their alerts; appended to every
# alert email. weburl is presumably provided by config as well.
ALERTURL = weburl + '/youralerts.py/list'
# Debug level:
# 0 = production, nothing on the console, email sent
# 1 = messages on the console, email sent
# 2 = messages on the console, but no email sent
# 3 = many messages on the console, no email sent
# NOTE(review): the value below disables email sending -- confirm this
# is intended before running in production.
DEBUGLEVEL = 3
def update_date_lastrun(alert):
    """Stamp an alert as having run today.

    `alert` is indexed positionally: alert[0] is the user id, alert[1]
    the query id and alert[2] the basket id.  Together they select the
    user_query_basket row whose date_lastrun is set to today's date.
    Returns whatever run_sql returns.
    """
    today = strftime("%Y-%m-%d")
    id_user, id_query, id_basket = alert[0], alert[1], alert[2]
    params = (today, id_user, id_query, id_basket,)
    return run_sql('update user_query_basket set date_lastrun=%s where id_user=%s and id_query=%s and id_basket=%s;', params)
def get_alert_queries(frequency):
    """Return the distinct (id, urlargs) query rows that have an alert.

    Only queries joined to a user_query_basket row with the given
    `frequency` and a date_lastrun not later than now() are selected.
    """
    params = (frequency,)
    rows = run_sql('select distinct id, urlargs from query q, user_query_basket uqb where q.id=uqb.id_query and uqb.frequency=%s and uqb.date_lastrun <= now();', params)
    return rows
def get_alerts(query, frequency):
    """Collect the alerts attached to an already executed query.

    `query` is a dictionary with at least the keys 'id_query', 'records'
    and 'argstr' (the shape returned by run_query).  The result is a
    dictionary with the matching user_query_basket rows under 'alerts'
    and the query's 'records' and 'argstr' copied through unchanged.
    """
    params = (query['id_query'], frequency,)
    rows = run_sql('select id_user, id_query, id_basket, frequency, date_lastrun, alert_name, notification from user_query_basket where id_query=%s and frequency=%s;', params)
    result = {}
    result['alerts'] = rows
    result['records'] = query['records']
    result['argstr'] = query['argstr']
    return result
def add_record_to_basket(record_id, basket_id):
    """Insert one record into a basket, on a best-effort basis.

    Returns the result of run_sql on success and 0 when the insert
    fails.  Failures are deliberately not propagated, so that one bad
    insert does not abort the whole alert run.
    """
    try:
        return run_sql('insert into basket_record (id_basket, id_record) values(%s, %s);', (basket_id, record_id,))
    except Exception:
        # Catch Exception rather than everything, so that Ctrl-C
        # (KeyboardInterrupt) and SystemExit still stop the program.
        return 0
def add_records_to_basket(record_ids, basket_id):
    """Add every record id in `record_ids` to the basket `basket_id`."""
    # TBD: generate the list and add all records in one step
    for record_id in record_ids:
        add_record_to_basket(record_id, basket_id)
def get_email(uid):
    """Return the email column of the user row whose id is `uid`.

    Indexing the first row raises IndexError when the query returns no
    row.
    """
    rows = run_sql('select email from user where id=%s', (uid,))
    first_row = rows[0]
    return first_row[0]
def get_query(alert_id):
    """Return the urlargs column of the query row whose id is `alert_id`.

    Indexing the first row raises IndexError when the query returns no
    row.
    """
    rows = run_sql('select urlargs from query where id=%s', (alert_id,))
    first_row = rows[0]
    return first_row[0]
def send_email(fromaddr, toaddr, body):
    """Send `body` (a complete message, headers included) over SMTP.

    Connects to smtp.cern.ch, sends the message from `fromaddr` to
    `toaddr` and closes the session.  Exceptions raised by smtplib
    propagate to the caller.
    """
    server = smtplib.SMTP('smtp.cern.ch')
    try:
        server.set_debuglevel(0)
        server.sendmail(fromaddr, toaddr, body)
    finally:
        # Always end the session, even when sendmail fails, so the
        # connection is not leaked.
        server.quit()
def _header_value(value):
    """Return `value` as text with line breaks replaced by spaces."""
    return ' '.join(('%s' % (value,)).splitlines())


def forge_email(fromaddr, toaddr, subject, content):
    """Build the raw text of an email message.

    The From, To and Subject headers are followed by `content`.  Line
    breaks inside header values are replaced by spaces so that a value
    cannot inject additional header lines.  If `content` does not
    already begin with a newline, one is inserted so that an empty line
    always separates the headers from the body; content that starts
    with a newline produces the same text as before.

    Returns the message as a single string.
    """
    headers = 'From: %s\nTo: %s\nSubject: %s\n' % (
        _header_value(fromaddr),
        _header_value(toaddr),
        _header_value(subject),
    )
    content = '%s' % (content,)
    if not content.startswith('\n'):
        # Without an empty line the first body line would be read as a
        # header.
        headers += '\n'
    return headers + content
def format_frequency(freq):
    """Turn a frequency keyword into its adverb form.

    "day" becomes 'daily'; any other value gets 'ly' appended
    (for example "week" becomes 'weekly').
    """
    if freq != "day":
        return freq + 'ly'
    return 'daily'
def print_records(record_ids):
    """Format the records of an alert as numbered text entries.

    At most MAXIDS records are rendered, each through get_as_text and
    numbered from 1.  A closing notice is appended only when records
    were actually left out, i.e. when `record_ids` holds more than
    MAXIDS entries.

    Returns the formatted text ('' for no records).
    """
    parts = []
    shown = 0
    truncated = False
    for record_id in record_ids:
        if shown >= MAXIDS:
            # There is at least one record beyond the display limit.
            truncated = True
            break
        shown += 1
        parts.append('\n\n%s) %s' % (shown, get_as_text(record_id)))
    if truncated:
        parts.append('\n\n' + 'Only the first %s records are displayed above. Please consult your basket to see all the results.' % MAXIDS)
    return ''.join(parts)
def email_notify(alert, records, argstr):
    """Compose, and depending on DEBUGLEVEL send, the email for one alert.

    `alert` is indexed positionally: alert[0] is the user id, alert[3]
    the frequency and alert[5] the alert name.  `records` are the record
    ids found by the query and `argstr` is the query's URL argument
    string.  Nothing happens when `records` is empty.

    With DEBUGLEVEL > 0 the message is printed on the console; it is
    sent only when DEBUGLEVEL < 2.
    """
    if len(records) == 0:
        return
    msg = "\nHello\n\nBelow are the results of the email alert that you set up with the CERN Document Server:\n"
    email = get_email(alert[0])
    url = weburl + "/search.py?" + argstr
    pattern = get_pattern(argstr)
    catalogue = get_catalogue(argstr)
    msg += '\nalert name : \'%s\'' % alert[5]
    msg += '\npattern : \'%s\'' % pattern
    msg += '\ncatalogue(s) : %s' % catalogue
    msg += '\nfrequency : %s ' % format_frequency(alert[3])
    msg += '\nrun time : %s ' % strftime("%c")
    msg += print_records(records)
    msg += "\n\nThe search URL for this alert is <%s>\n\n" % url
    msg += "To modify your alerts: <%s>" % ALERTURL
    msg += "\n\n-- \nCERN Document Server Alert Service <%s>\nEmail: <%s>" % (weburl, supportemail)
    subject = 'Search for \'%s\' in %s' % (pattern, catalogue)
    body = forge_email(FROMADDR, email, subject, msg)
    if DEBUGLEVEL > 0:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("********************************************************************************")
        print(body)
        print("********************************************************************************")
    if DEBUGLEVEL < 2:
        send_email(FROMADDR, email, body)
def get_argument(args, argname):
    """Return the value stored under `argname` in the mapping `args`.

    An empty list is returned when the argument is absent.
    """
    # dict.get replaces has_key(), which no longer exists in Python 3.
    return args.get(argname, [])
def get_record_ids(argstr, date_from, date_until):
    """Run the search described by `argstr`, restricted to a date range.

    `argstr` is a URL query string; its search arguments are parsed and
    forwarded to perform_request_search with of='id'.  `date_from` and
    `date_until` are (year, month, day) triples passed on as
    d1y/d1m/d1d and d2y/d2m/d2d.

    Returns whatever perform_request_search returns.
    """
    args = parse_qs(argstr)
    # Search arguments copied from the query string.  'rg' is left out
    # on purpose: the original code read it but never forwarded it.
    forwarded = ('p', 'c', 'cc', 'f', 'so', 'sp', 'ot', 'as',
                 'p1', 'f1', 'm1', 'op1',
                 'p2', 'f2', 'm2', 'op2',
                 'p3', 'f3', 'm3',
                 'sc', 'search')
    kwargs = {}
    for name in forwarded:
        kwargs[name] = get_argument(args, name)
    kwargs['d1y'], kwargs['d1m'], kwargs['d1d'] = date_from
    kwargs['d2y'], kwargs['d2m'], kwargs['d2d'] = date_until
    # The arguments go through ** because 'as' is a reserved word in
    # Python >= 2.6 and cannot be written as as=... in a call.
    return perform_request_search(of='id', **kwargs)
    # For offline testing, return a fixed list instead, e.g.:
    # return [7, 9, 13, 15, 20]
def get_argument_as_string(argstr, argname):
    """Return all values of `argname` in the query string `argstr`.

    Multiple values are joined with ', '; a missing argument gives ''.
    """
    values = get_argument(parse_qs(argstr), argname)
    return ", ".join(values)
def get_pattern(argstr):
    """Return the 'p' argument(s) of the query string as one string."""
    pattern = get_argument_as_string(argstr, 'p')
    return pattern
def get_catalogue(argstr):
    """Return the 'c' argument(s) of the query string as one string."""
    catalogue = get_argument_as_string(argstr, 'c')
    return catalogue
def get_date_from(time, freq):
    """Return the start date of the alert period that ends at `time`.

    `time` is a time tuple (or struct_time); only its year, month and
    day fields are used.  `freq` is 'day', 'week' or 'month', and the
    start date lies one day, seven days or one calendar month before
    `time`.  For 'month' the day is clamped to the last day of the
    previous month, so the 31st of March maps to the 28th or 29th of
    February rather than to a non-existent date.

    Returns a (year, month, day) tuple of zero-padded strings.
    Raises ValueError for an unknown frequency.
    """
    # Imported locally because the module header does not import them.
    import calendar
    import datetime

    year, month, day = time[0], time[1], time[2]
    if freq == 'day':
        # Calendar arithmetic, unlike subtracting 86400 seconds, is not
        # thrown off by daylight-saving changes.
        start = datetime.date(year, month, day) - datetime.timedelta(days=1)
    elif freq == 'week':
        start = datetime.date(year, month, day) - datetime.timedelta(days=7)
    elif freq == 'month':
        month -= 1
        if month == 0:
            month = 12
            year -= 1
        last_day = calendar.monthrange(year, month)[1]
        start = datetime.date(year, month, min(day, last_day))
    else:
        raise ValueError("unknown alert frequency: %r" % (freq,))
    return ('%04d' % start.year, '%02d' % start.month, '%02d' % start.day)
def run_query(query, frequency):
    """Return a dictionary containing the information of the performed query.

    `query` is a row indexed as (id, urlargs).  The search is run for
    the period that ends today and starts one `frequency` unit earlier
    (see get_date_from).

    The result holds the query id ('id_query'), the arguments as a
    string ('argstr'), the list of found records ('records') and the
    period bounds ('date_from', 'date_until') as (year, month, day)
    string tuples.
    """
    now = localtime()
    # Override the time here for testing purposes (beware of localtime offset):
    #now = (2003, 1, 10, 2, 0, 0, 2, 120, 1) # ('2003', '01', '10')
    # Override frequency here for testing
    #frequency = 'week'
    date_until = (strftime("%Y", now), strftime("%m", now), strftime("%d", now))
    date_from = get_date_from(now, frequency)
    recs = get_record_ids(query[1], date_from, date_until)
    if DEBUGLEVEL > 2:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("[%s] run query: %s with dates: from=%s, until=%s\n found rec ids: %s" % (strftime("%c"), query, date_from, date_until, recs))
    return {'id_query': query[0], 'argstr': query[1], 'records': recs, 'date_from': date_from, 'date_until': date_until}
def process_alert_queries(frequency):
    """Run the alerts according to the frequency.

    Each query that has an alert at this frequency is executed once,
    and the alerts attached to it are then processed with its results.
    """
    for alert_query in get_alert_queries(frequency):
        result = run_query(alert_query, frequency)
        process_alerts(get_alerts(result, frequency))
def process_alerts(alerts):
    """Deliver the results of one query to every alert attached to it.

    `alerts` is a dictionary with the keys 'alerts' (alert rows),
    'records' (found record ids) and 'argstr' (query arguments).  For
    each alert the records are added to its basket and/or emailed,
    according to the alert's settings, and its last-run date is updated.
    """
    # TBD: do not generate the email each time, forge it once and then
    # send it to all appropriate people
    for alert in alerts['alerts']:
        if alert_use_basket_p(alert):
            basket_id = alert[2]
            add_records_to_basket(alerts['records'], basket_id)
        if alert_use_notification_p(alert):
            email_notify(alert, alerts['records'], alerts['argstr'])
        # Mark the alert as run whether or not anything was delivered.
        update_date_lastrun(alert)
def alert_use_basket_p(alert):
    """Tell whether the alert stores results in a basket.

    True when the basket id in alert[2] is not 0.
    """
    basket_id = alert[2]
    return basket_id != 0
def alert_use_notification_p(alert):
    """Tell whether the alert sends an email notification.

    True when the notification flag in alert[6] equals 'y'.
    """
    notification = alert[6]
    return notification == 'y'
def run_alerts():
    """Run the alerts.

    Decides from the current local time which alerts are due and runs
    them: monthly alerts on the first day of the month, weekly alerts
    on Mondays, and daily alerts on every run.
    """
    # Read the clock once so that all three decisions refer to the same
    # moment.
    now = localtime()
    if now[2] == 1: # first of the month
        process_alert_queries('month')
    # tm_wday is 0 on Monday; unlike strftime("%A") it does not depend
    # on the locale.
    if now[6] == 0: # first day of the week
        process_alert_queries('week')
    process_alert_queries('day')
# Run the alerts only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == '__main__':
    run_alerts()

Event Timeline