|
- /*
- Copyright (c) 2015, 2021, Oracle and/or its affiliates.
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License, version 2.0,
- as published by the Free Software Foundation.
-
- This program is also distributed with certain software (including
- but not limited to OpenSSL) that is licensed under separate terms,
- as designated in a particular file or component or in included license
- documentation. The authors of MySQL hereby grant you an additional
- permission to link the program and your derivative works with the
- separately licensed software that they have included with MySQL.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License, version 2.0, for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- */
-
- #include "mysqldump_tool_chain_maker.h"
- #include "i_output_writer.h"
- #include "file_writer.h"
- #include "standard_writer.h"
- #include "compression_lz4_writer.h"
- #include "compression_zlib_writer.h"
- #include "sql_formatter.h"
- #include "mysqldump_tool_chain_maker_options.h"
- #include <boost/algorithm/string.hpp>
- #include "m_ctype.h"
-
- using namespace Mysql::Tools::Dump;
-
- void Mysqldump_tool_chain_maker::delete_chain(
- uint64 chain_id, I_object_reader* chain)
- {}
-
- I_object_reader* Mysqldump_tool_chain_maker::create_chain(
- Chain_data* chain_data, I_dump_task* dump_task)
- {
- Table_rows_dump_task* rows_task= dynamic_cast<Table_rows_dump_task*>(
- dump_task);
- if (rows_task != NULL
- && (m_options->m_skip_rows_data
- || rows_task->get_related_table()->get_type() == "FEDERATED"
- || rows_task->get_related_table()->get_type() == "MRG_ISAM"
- || !this->compare_no_case_latin_with_db_string(
- "MRG_MyISAM", rows_task->get_related_table()->get_type())))
- {
- return NULL;
- }
-
- if (!m_options->is_object_included_in_dump(
- dynamic_cast<Abstract_data_object*>(
- dump_task->get_related_db_object())))
- {
- return NULL;
- }
-
- if (m_main_object_reader == NULL)
- {
- I_output_writer* writer;
- if (m_options->m_result_file.has_value())
- writer= new File_writer(
- this->get_message_handler(), this->get_object_id_generator(),
- m_options->m_result_file.value());
- else
- writer= new Standard_writer(
- this->get_message_handler(), this->get_object_id_generator());
- m_all_created_elements.push_back(writer);
- if (m_options->m_compress_output_algorithm.has_value())
- {
- std::string algorithm_name=
- m_options->m_compress_output_algorithm.value();
- boost::to_lower(algorithm_name);
-
- Abstract_output_writer_wrapper* compression_writer_as_wrapper= NULL;
- I_output_writer* compression_writer_as_writer= NULL;
- if (algorithm_name == "lz4")
- {
- Compression_lz4_writer* compression_writer=
- new Compression_lz4_writer(
- this->get_message_handler(), this->get_object_id_generator());
- compression_writer_as_wrapper= compression_writer;
- compression_writer_as_writer= compression_writer;
- }
- else if (algorithm_name =="zlib")
- {
- Compression_zlib_writer* compression_writer=
- new Compression_zlib_writer(
- this->get_message_handler(), this->get_object_id_generator(),
- Z_DEFAULT_COMPRESSION);
- compression_writer_as_wrapper= compression_writer;
- compression_writer_as_writer= compression_writer;
- }
- else
- this->pass_message(Mysql::Tools::Base::Message_data(
- 0, "Unknown compression method: " + algorithm_name,
- Mysql::Tools::Base::Message_type_error));
-
- compression_writer_as_wrapper->register_output_writer(writer);
- writer= compression_writer_as_writer;
- m_all_created_elements.push_back(writer);
- }
- Sql_formatter* formatter= new Sql_formatter(
- this->get_connection_provider(),
- this->get_message_handler(), this->get_object_id_generator(),
- m_options,
- m_options->m_formatter_options);
- this->register_progress_watchers_in_child(formatter);
- formatter->register_output_writer(writer);
- m_all_created_elements.push_back(formatter);
-
- m_main_object_reader= new Mysql_object_reader(
- this->get_connection_provider(),
- this->get_message_handler(), this->get_object_id_generator(),
- m_options->m_object_reader_options);
- this->register_progress_watchers_in_child(m_main_object_reader);
- m_main_object_reader->register_data_formatter(formatter);
- m_all_created_elements.push_back(m_main_object_reader);
- }
- /*
- run as a single process only if default parallelism is set to 0 and
- parallel schemas is not set
- */
- if (m_options->m_default_parallelism == 0 &&
- m_options->get_parallel_schemas_thread_count() == 0)
- {
- return m_main_object_reader;
- }
- Abstract_data_object* data_object= dynamic_cast<Abstract_data_object*>(
- dump_task->get_related_db_object());
-
- int object_queue_id= (data_object != NULL)
- ? (m_options->get_object_queue_id_for_schema(data_object->get_schema()))
- : 0;
- std::map<int, Object_queue*>::iterator it=
- m_object_queues.find(object_queue_id);
- if (it != m_object_queues.end())
- {
- return it->second;
- }
- Object_queue* queue= new Object_queue(
- this->get_message_handler(), this->get_object_id_generator(),
- m_options->get_object_queue_threads_count(object_queue_id),
- new Mysql::Instance_callback<void, bool, Mysqldump_tool_chain_maker>(
- this, &Mysqldump_tool_chain_maker::mysql_thread_callback), m_program);
- this->register_progress_watchers_in_child(queue);
- queue->register_object_reader(m_main_object_reader);
- m_all_created_elements.push_back(queue);
- m_object_queues.insert(std::make_pair(object_queue_id, queue));
- return queue;
- }
-
- void Mysqldump_tool_chain_maker::stop_queues()
- {
- std::map<int, Object_queue*>::const_iterator iter;
- for (iter = m_object_queues.begin(); iter != m_object_queues.end(); iter++)
- {
- iter->second->stop_queue();
- }
- }
-
- void Mysqldump_tool_chain_maker::mysql_thread_callback(bool is_starting)
- {
- if (is_starting)
- mysql_thread_init();
- else
- mysql_thread_end();
- }
-
- Mysqldump_tool_chain_maker::~Mysqldump_tool_chain_maker()
- {
- for (std::vector<I_chain_element*>::reverse_iterator it=
- m_all_created_elements.rbegin(); it != m_all_created_elements.rend();
- ++it)
- {
- delete *it;
- }
- }
-
- Mysqldump_tool_chain_maker::Mysqldump_tool_chain_maker(
- I_connection_provider* connection_provider,
- Mysql::I_callable<bool, const Mysql::Tools::Base::Message_data&>*
- message_handler, Simple_id_generator* object_id_generator,
- Mysqldump_tool_chain_maker_options* options,
- Mysql::Tools::Base::Abstract_program* program)
- : Abstract_chain_element(message_handler, object_id_generator),
- Abstract_mysql_chain_element_extension(
- connection_provider, message_handler,
- options->m_mysql_chain_element_options),
- m_options(options),
- m_main_object_reader(NULL),
- m_program(program)
- {}
|