dbdump.py

# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2009, 2010, 2011, 2012, 2014, 2015 CERN.
#
# Invenio is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
"""
Invenio DB dumper.
"""
import os
import re
import time
from invenio.config import CFG_LOGDIR, CFG_PATH_MYSQL, CFG_PATH_GZIP, \
    CFG_DATABASE_HOST, \
    CFG_DATABASE_USER, \
    CFG_DATABASE_PASS, \
    CFG_DATABASE_NAME, \
    CFG_DATABASE_PORT, \
    CFG_DATABASE_SLAVE
from invenio.legacy.dbquery import get_connection_for_dump_on_slave, run_sql, \
    get_table_names as db_get_table_names
from invenio.legacy.bibsched.bibtask import task_init, \
    write_message, \
    task_set_option, \
    task_get_option, \
    task_update_progress, \
    task_get_task_param, \
    task_low_level_submission
from invenio.utils.shell import run_shell_command, escape_shell_arg
from invenio.celery.utils import (
    enable_queue,
    get_queues,
    suspend_queues,
)


def get_table_names(value):
    """
    Get the names of the tables matching the given regular expression.

    @param value: regular expression to match table names against
    @return: list of matching table names
    """
    rex = re.compile(value)
    return [row[0] for row in db_get_table_names() if rex.search(row[0])]
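
# Usage sketch (hypothetical table names, for illustration only):
#
#   >>> get_table_names('^(idx|rnk)')
#   ['idxWORD01F', 'idxWORD01R', 'rnkMETHOD']
#
# Note that re.search() is used, so an unanchored pattern matches anywhere
# in the table name; anchor it with '^' to match prefixes as above.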


def _delete_old_dumps(dirname, filename, number_to_keep):
    """
    Look for files in the DIRNAME directory whose names start with
    FILENAME. Keep the NUMBER_TO_KEEP most recent ones (sorting the names
    alphabetically here equals sorting by date) and delete the rest.
    Useful to prune old dump files.
    """
    files = [x for x in os.listdir(dirname) if x.startswith(filename)]
    files.sort()
    for afile in files[:-number_to_keep]:
        write_message("... deleting %s" % (dirname + os.sep + afile,))
        os.remove(dirname + os.sep + afile)
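
# Example (hypothetical file names, assuming number_to_keep=2): given
#
#   invenio-dbdump-2015-01-01_02:00:00.sql
#   invenio-dbdump-2015-01-02_02:00:00.sql
#   invenio-dbdump-2015-01-03_02:00:00.sql
#
# only the oldest (2015-01-01) file is deleted; the two most recent remain.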


def check_slave_is_up(connection=None):
    """Raise a StandardError in case the slave is not correctly up."""
    if connection is None:
        connection = get_connection_for_dump_on_slave()
    # FIXME compatibility with postgresql
    res = run_sql("SHOW SLAVE STATUS", with_dict=True, connection=connection)
    if res[0]['Slave_IO_Running'] != 'Yes':
        raise StandardError("Slave_IO_Running is not set to 'Yes'")
    if res[0]['Slave_SQL_Running'] != 'Yes':
        raise StandardError("Slave_SQL_Running is not set to 'Yes'")


def check_slave_is_down(connection=None):
    """Raise a StandardError in case the slave is not correctly down."""
    if connection is None:
        connection = get_connection_for_dump_on_slave()
    # FIXME compatibility with postgresql
    res = run_sql("SHOW SLAVE STATUS", with_dict=True, connection=connection)
    if res[0]['Slave_SQL_Running'] != 'No':
        raise StandardError("Slave_SQL_Running is not set to 'No'")


def detach_slave(connection=None):
    """Detach the slave."""
    if connection is None:
        connection = get_connection_for_dump_on_slave()
    # FIXME compatibility with postgresql
    run_sql("STOP SLAVE SQL_THREAD", connection=connection)
    check_slave_is_down(connection)


def attach_slave(connection=None):
    """Attach the slave."""
    if connection is None:
        connection = get_connection_for_dump_on_slave()
    # FIXME compatibility with postgresql
    run_sql("START SLAVE", connection=connection)
    check_slave_is_up(connection)


def check_slave_is_in_consistent_state(connection=None):
    """
    Check if the slave is already aware that the dbdump task is running.
    Since dbdump is a monotask, this guarantees that no other task is
    currently running, and it is hence safe to detach the slave and
    start the actual dump.
    """
    if connection is None:
        connection = get_connection_for_dump_on_slave()
    i = 0
    ## Let's take the current status of dbdump (e.g. RUNNING, ABOUT TO STOP, etc.)...
    current_status = run_sql("""SELECT status FROM "schTASK" WHERE id=%s""", (task_get_task_param('task_id'), ))[0][0]
    ## Poll up to 10 times, sleeping 3 seconds between attempts.
    while True:
        if i == 10:
            ## Timeout!!
            raise StandardError("The slave does not seem to be catching up with the master")
        ## ...and let's see if it matches with what the slave sees.
        if run_sql("""SELECT status FROM "schTASK" WHERE id=%s AND status=%s""", (task_get_task_param('task_id'), current_status), connection=connection):
            ## Bingo!
            return
        time.sleep(3)
        i += 1
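
# Illustration (hypothetical timing): the loop above polls the slave up to
# 10 times, sleeping 3 seconds between attempts, so the slave has roughly
# 30 seconds to replicate the master's "schTASK" status row before a
# StandardError is raised.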


def dump_database(dump_path, host=CFG_DATABASE_HOST, port=CFG_DATABASE_PORT,
                  user=CFG_DATABASE_USER, passw=CFG_DATABASE_PASS,
                  name=CFG_DATABASE_NAME, params=None, compress=False,
                  ignore_tables=None):
    """
    Dump the Invenio database into an SQL file located at DUMP_PATH.

    Runs mysqldump with the given host configuration and user
    credentials. Optional mysqldump parameters can also be passed;
    otherwise, a default set of parameters is used.

    @param dump_path: path on the filesystem to save the dump to.
    @type dump_path: string
    @param host: hostname of the MySQL database node to connect to.
    @type host: string
    @param port: port of the MySQL database node to connect to.
    @type port: string
    @param user: username to connect with.
    @type user: string
    @param passw: password to connect with.
    @type passw: string
    @param name: name of the MySQL database to dump.
    @type name: string
    @param params: command line parameters to pass to mysqldump. Optional.
    @type params: string
    @param compress: should the dump be compressed through gzip?
    @type compress: bool
    @param ignore_tables: list of tables to ignore in the dump.
    @type ignore_tables: list of string
    """
    write_message("... writing %s" % (dump_path,))
    partial_dump_path = dump_path + ".part"
    # Is mysqldump installed or in the right path?
    cmd_prefix = CFG_PATH_MYSQL + 'dump'
    if not os.path.exists(cmd_prefix):
        raise StandardError("%s is not installed." % (cmd_prefix))
    if not params:
        # No parameters set, let's use the default ones.
        params = " --skip-opt --add-drop-table --add-locks --create-options" \
                 " --quick --extended-insert --set-charset --disable-keys" \
                 " --lock-tables=false --max_allowed_packet=2G "
    if ignore_tables:
        params += " ".join([escape_shell_arg("--ignore-table=%s.%s" % (CFG_DATABASE_NAME, table)) for table in ignore_tables])
    dump_cmd = "%s %s " \
               " --host=%s --port=%s --user=%s --password=%s %s" % \
               (cmd_prefix,
                params,
                escape_shell_arg(host),
                escape_shell_arg(str(port)),
                escape_shell_arg(user),
                escape_shell_arg(passw),
                escape_shell_arg(name))
    if compress:
        # Pipe through gzip; ${PIPESTATUS[0]} preserves mysqldump's exit
        # code across the pipe, which is why the command is run via bash.
        dump_cmd = "%s | %s -cf; exit ${PIPESTATUS[0]}" % \
                   (dump_cmd,
                    CFG_PATH_GZIP)
        dump_cmd = "bash -c %s" % (escape_shell_arg(dump_cmd),)
    write_message(dump_cmd, verbose=2)
    exit_code, stdout, stderr = run_shell_command(dump_cmd, None, partial_dump_path)
    if exit_code:
        raise StandardError("ERROR: mysqldump exit code is %s. stderr: %s stdout: %s" % \
                            (repr(exit_code),
                             repr(stderr),
                             repr(stdout)))
    else:
        # Promote the partial dump to its final name only on success.
        os.rename(partial_dump_path, dump_path)
        write_message("... completed writing %s" % (dump_path,))


def _dbdump_elaborate_submit_param(key, value, dummyopts, dummyargs):
    """
    Elaborate task submission parameter. See bibtask's
    task_submit_elaborate_specific_parameter_fnc for help.
    """
    if key in ('-n', '--number'):
        try:
            task_set_option('number', int(value))
        except ValueError:
            raise StandardError("ERROR: Number '%s' is not an integer." % (value,))
    elif key in ('-o', '--output'):
        if os.path.isdir(value):
            task_set_option('output', value)
        else:
            raise StandardError("ERROR: Output '%s' is not a directory." % \
                                (value,))
    elif key in ('--params',):
        task_set_option('params', value)
    elif key in ('--compress',):
        if not CFG_PATH_GZIP or (CFG_PATH_GZIP and not os.path.exists(CFG_PATH_GZIP)):
            raise StandardError("ERROR: No valid gzip path is defined.")
        task_set_option('compress', True)
    elif key in ('-S', '--slave'):
        if value:
            task_set_option('slave', value)
        else:
            if not CFG_DATABASE_SLAVE:
                raise StandardError("ERROR: No slave defined.")
            task_set_option('slave', CFG_DATABASE_SLAVE)
    elif key in ('--dump-on-slave-helper', ):
        task_set_option('dump_on_slave_helper_mode', True)
    elif key in ('--ignore-tables',):
        try:
            re.compile(value)
            task_set_option("ignore_tables", value)
        except re.error:
            raise StandardError("ERROR: Passed string: '%s' is not a valid regular expression." % value)
    elif key in ('--disable-workers', ):
        task_set_option('disable_workers', True)
    else:
        return False
    return True
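
# For example (hypothetical invocation, mirroring the help text in main()):
#
#   dbdump -n3 -o/tmp --compress
#
# stores number=3, output='/tmp' and compress=True via task_set_option();
# _dbdump_run_task_core() later reads them back with task_get_option().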


def _dbdump_run_task_core():
    """
    Run DB dumper core stuff.

    Note: do not use task_can_sleep() stuff here because we don't want
    other tasks to interrupt us while we are dumping the DB content.
    """
    # read params:
    host = CFG_DATABASE_HOST
    port = CFG_DATABASE_PORT
    connection = None
    active_queues = []
    try:
        if task_get_option('slave') and not task_get_option('dump_on_slave_helper_mode'):
            connection = get_connection_for_dump_on_slave()
            write_message("Dump on slave requested")
            write_message("... checking if slave is well up...")
            check_slave_is_up(connection)
            write_message("... checking if slave is in consistent state...")
            check_slave_is_in_consistent_state(connection)
            write_message("... detaching slave database...")
            detach_slave(connection)
            write_message("... scheduling dump on slave helper...")
            helper_arguments = []
            if task_get_option("number"):
                helper_arguments += ["--number", str(task_get_option("number"))]
            if task_get_option("output"):
                helper_arguments += ["--output", str(task_get_option("output"))]
            if task_get_option("params"):
                helper_arguments += ["--params", str(task_get_option("params"))]
            if task_get_option("ignore_tables"):
                helper_arguments += ["--ignore-tables", str(task_get_option("ignore_tables"))]
            if task_get_option("compress"):
                helper_arguments += ["--compress"]
            if task_get_option("slave"):
                helper_arguments += ["--slave", str(task_get_option("slave"))]
            helper_arguments += ['-N', 'slavehelper', '--dump-on-slave-helper']
            task_id = task_low_level_submission('dbdump', task_get_task_param('user'), '-P4', *helper_arguments)
            write_message("Slave scheduled with ID %s" % task_id)
            task_update_progress("DONE")
            return True
        elif task_get_option('dump_on_slave_helper_mode'):
            write_message("Dumping on slave mode")
            connection = get_connection_for_dump_on_slave()
            write_message("... checking if slave is well down...")
            check_slave_is_down(connection)
            host = CFG_DATABASE_SLAVE
        task_update_progress("Reading parameters")
        write_message("Reading parameters started")
        output_dir = task_get_option('output', CFG_LOGDIR)
        output_num = task_get_option('number', 5)
        params = task_get_option('params', None)
        compress = task_get_option('compress', False)
        slave = task_get_option('slave', False)
        ignore_tables = task_get_option('ignore_tables', None)
        if ignore_tables:
            ignore_tables = get_table_names(ignore_tables)
        else:
            ignore_tables = None
        output_file_suffix = task_get_task_param('task_starting_time')
        output_file_suffix = output_file_suffix.replace(' ', '_') + '.sql'
        if compress:
            output_file_suffix = "%s.gz" % (output_file_suffix,)
        write_message("Reading parameters ended")
        if task_get_option('disable_workers'):
            active_queues = get_queues()
            if active_queues:
                write_message("Suspend workers and wait for any running tasks to complete")
                suspend_queues(active_queues)
                write_message("Workers suspended")
        # make dump:
        task_update_progress("Dumping database")
        write_message("Database dump started")
        if slave:
            output_file_prefix = 'slave-%s-dbdump-' % (CFG_DATABASE_NAME,)
        else:
            output_file_prefix = '%s-dbdump-' % (CFG_DATABASE_NAME,)
        output_file = output_file_prefix + output_file_suffix
        dump_path = output_dir + os.sep + output_file
        dump_database(dump_path,
                      host=host,
                      port=port,
                      params=params,
                      compress=compress,
                      ignore_tables=ignore_tables)
        write_message("Database dump ended")
    finally:
        for queue in active_queues:
            enable_queue(queue)
        if connection and task_get_option('dump_on_slave_helper_mode'):
            write_message("Reattaching slave")
            attach_slave(connection)
    # prune old dump files:
    task_update_progress("Pruning old dump files")
    write_message("Pruning old dump files started")
    _delete_old_dumps(output_dir, output_file_prefix, output_num)
    write_message("Pruning old dump files ended")
    # we are done:
    task_update_progress("Done.")
    return True
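
# For illustration (hypothetical values): with CFG_DATABASE_NAME='invenio',
# an output directory of /opt/invenio/var/log, a task started at
# '2015-01-01 02:00:00' and --compress, the dump is written to
#
#   /opt/invenio/var/log/invenio-dbdump-2015-01-01_02:00:00.sql.gz
#
# and older 'invenio-dbdump-*' files beyond the last NUM are pruned.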


def main():
    """Main function constructing the whole bibtask."""
    task_init(authorization_action='rundbdump',
              authorization_msg="DB Dump Task Submission",
              help_specific_usage="""\
  -o, --output=DIR      Output directory. [default=%s]
  -n, --number=NUM      Keep up to NUM previous dump files. [default=5]
  --params=PARAMS       Specify your own mysqldump parameters. Optional.
  --compress            Compress dump directly into gzip.
  -S, --slave=HOST      Perform the dump from a slave; if no HOST is given, CFG_DATABASE_SLAVE is used.
  --ignore-tables=regex Ignore tables matching the given regular expression.
  --disable-workers     Disable any task queue workers while dumping.

Examples:
    $ dbdump --ignore-tables '^(idx|rnk)'
    $ dbdump -n3 -o/tmp -s1d -L 02:00-04:00
""" % CFG_LOGDIR,
              specific_params=("n:o:p:S:",
                               ["number=", "output=", "params=", "slave=",
                                "compress", "ignore-tables=",
                                "dump-on-slave-helper", "disable-workers"]),
              task_submit_elaborate_specific_parameter_fnc=_dbdump_elaborate_submit_param,
              task_run_fnc=_dbdump_run_task_core)


if __name__ == '__main__':
    main()
