m_mysql.cpp

Go to the documentation of this file.
00001 /* RequiredLibraries: mysqlclient */
00002 
00003 #include "module.h"
00004 #define NO_CLIENT_LONG_LONG
00005 #include <mysql/mysql.h>
00006 #include "sql.h"
00007 
00008 using namespace SQL;
00009 
00020 class MySQLService;
00021 
00024 struct QueryRequest
00025 {
00026         /* The connection to the database */
00027         MySQLService *service;
00028         /* The interface to use once we have the result to send the data back */
00029         Interface *sqlinterface;
00030         /* The actual query */
00031         Query query;
00032 
00033         QueryRequest(MySQLService *s, Interface *i, const Query &q) : service(s), sqlinterface(i), query(q) { }
00034 };
00035 
00037 struct QueryResult
00038 {
00039         /* The interface to send the data back on */
00040         Interface *sqlinterface;
00041         /* The result */
00042         Result result;
00043 
00044         QueryResult(Interface *i, Result &r) : sqlinterface(i), result(r) { }
00045 };
00046 
00049 class MySQLResult : public Result
00050 {
00051         MYSQL_RES *res;
00052 
00053  public:
00054         MySQLResult(unsigned int i, const Query &q, const Anope::string &fq, MYSQL_RES *r) : Result(i, q, fq), res(r)
00055         {
00056                 unsigned num_fields = res ? mysql_num_fields(res) : 0;
00057 
00058                 /* It is not thread safe to log anything here using Log(this->owner) now :( */
00059 
00060                 if (!num_fields)
00061                         return;
00062 
00063                 for (MYSQL_ROW row; (row = mysql_fetch_row(res));)
00064                 {
00065                         MYSQL_FIELD *fields = mysql_fetch_fields(res);
00066 
00067                         if (fields)
00068                         {
00069                                 std::map<Anope::string, Anope::string> items;
00070 
00071                                 for (unsigned field_count = 0; field_count < num_fields; ++field_count)
00072                                 {
00073                                         Anope::string column = (fields[field_count].name ? fields[field_count].name : "");
00074                                         Anope::string data = (row[field_count] ? row[field_count] : "");
00075 
00076                                         items[column] = data;
00077                                 }
00078 
00079                                 this->entries.push_back(items);
00080                         }
00081                 }
00082         }
00083 
00084         MySQLResult(const Query &q, const Anope::string &fq, const Anope::string &err) : Result(0, q, fq, err), res(NULL)
00085         {
00086         }
00087 
00088         ~MySQLResult()
00089         {
00090                 if (this->res)
00091                         mysql_free_result(this->res);
00092         }
00093 };
00094 
/** A single MySQL connection exposed as an SQL Provider.
 *
 * Queries can be run synchronously with RunQuery() or queued for the
 * dispatcher thread with Run().
 */
class MySQLService : public Provider
{
        /* Per table, the set of column names this service knows about.
         * Filled in by CreateTable() and consulted by BuildInsert().
         */
        std::map<Anope::string, std::set<Anope::string> > active_schema;

        /* Connection parameters, as passed to the constructor */
        Anope::string database;
        Anope::string server;
        Anope::string user;
        Anope::string password;
        int port;

        /* The client library connection handle */
        MYSQL *sql;

        /** Escape a string with mysql_real_escape_string on this connection.
         * @param query The raw text
         * @return The escaped text
         */
        Anope::string Escape(const Anope::string &query);

 public:
        /* Locked by the SQL thread when a query is pending on this database,
         * prevents us from deleting a connection while a query is executing
         * in the thread
         */
        Mutex Lock;

        /** Create the service and connect immediately.
         * @param o  Owning module
         * @param n  Service name
         * @param d  Database name
         * @param s  Server host
         * @param u  User name
         * @param p  Password
         * @param po Port
         * Connect() is called from here, so SQL::Exception propagates on failure.
         */
        MySQLService(Module *o, const Anope::string &n, const Anope::string &d, const Anope::string &s, const Anope::string &u, const Anope::string &p, int po);

        /* Closes the connection and fails every request still queued for it */
        ~MySQLService();

        /* Queue a query for the dispatcher thread; the outcome is delivered through i */
        void Run(Interface *i, const Query &query) anope_override;

        /* Execute a query on the calling thread, holding Lock, and return its result */
        Result RunQuery(const Query &query) anope_override;

        /* Return the statements needed to create the table or add the columns missing from it */
        std::vector<Query> CreateTable(const Anope::string &table, const Data &data) anope_override;

        /* Build an INSERT ... ON DUPLICATE KEY UPDATE query for the given row id and data */
        Query BuildInsert(const Anope::string &table, unsigned int id, Data &data) anope_override;

        /* Build a SHOW TABLES query matching table names that start with prefix */
        Query GetTables(const Anope::string &prefix) anope_override;

        /* (Re)initialise the handle and connect; throws SQL::Exception on failure */
        void Connect();

        /* Ping the server and reconnect if needed; false when reconnecting fails */
        bool CheckConnection();

        /* Substitute the @name@ placeholders of q with its parameter values */
        Anope::string BuildQuery(const Query &q);

        /* Return the SQL expression FROM_UNIXTIME(<t>) */
        Anope::string FromUnixtime(time_t);
};
00143 
/** Worker thread that executes queued queries.
 *
 * It is also a Condition: the code in this file locks it around accesses to
 * the request queues and uses Wakeup()/Wait() to signal new work.
 */
class DispatcherThread : public Thread, public Condition
{
 public:
        DispatcherThread() : Thread() { }

        /* Thread body: runs pending requests until the exit state is set */
        void Run() anope_override;
};
00153 
00154 class ModuleSQL;
00155 static ModuleSQL *me;
00156 class ModuleSQL : public Module, public Pipe
00157 {
00158         /* SQL connections */
00159         std::map<Anope::string, MySQLService *> MySQLServices;
00160  public:
00161         /* Pending query requests */
00162         std::deque<QueryRequest> QueryRequests;
00163         /* Pending finished requests with results */
00164         std::deque<QueryResult> FinishedRequests;
00165         /* The thread used to execute queries */
00166         DispatcherThread *DThread;
00167 
00168         ModuleSQL(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, SUPPORTED)
00169         {
00170                 me = this;
00171 
00172                 Implementation i[] = { I_OnReload, I_OnModuleUnload };
00173                 ModuleManager::Attach(i, this,  2);
00174 
00175                 DThread = new DispatcherThread();
00176                 DThread->Start();
00177 
00178                 OnReload();
00179         }
00180 
00181         ~ModuleSQL()
00182         {
00183                 for (std::map<Anope::string, MySQLService *>::iterator it = this->MySQLServices.begin(); it != this->MySQLServices.end(); ++it)
00184                         delete it->second;
00185                 MySQLServices.clear();
00186 
00187                 DThread->SetExitState();
00188                 DThread->Wakeup();
00189                 DThread->Join();
00190                 delete DThread;
00191         }
00192 
00193         void OnReload() anope_override
00194         {
00195                 ConfigReader config;
00196                 int i, num;
00197 
00198                 for (std::map<Anope::string, MySQLService *>::iterator it = this->MySQLServices.begin(); it != this->MySQLServices.end();)
00199                 {
00200                         const Anope::string &cname = it->first;
00201                         MySQLService *s = it->second;
00202                         ++it;
00203 
00204                         for (i = 0, num = config.Enumerate("mysql"); i < num; ++i)
00205                         {
00206                                 if (config.ReadValue("mysql", "name", "main", i) == cname)
00207                                 {
00208                                         break;
00209                                 }
00210                         }
00211 
00212                         if (i == num)
00213                         {
00214                                 Log(LOG_NORMAL, "mysql") << "MySQL: Removing server connection " << cname;
00215 
00216                                 delete s;
00217                                 this->MySQLServices.erase(cname);
00218                         }
00219                 }
00220 
00221                 for (i = 0, num = config.Enumerate("mysql"); i < num; ++i)
00222                 {
00223                         Anope::string connname = config.ReadValue("mysql", "name", "mysql/main", i);
00224 
00225                         if (this->MySQLServices.find(connname) == this->MySQLServices.end())
00226                         {
00227                                 Anope::string database = config.ReadValue("mysql", "database", "anope", i);
00228                                 Anope::string server = config.ReadValue("mysql", "server", "127.0.0.1", i);
00229                                 Anope::string user = config.ReadValue("mysql", "username", "anope", i);
00230                                 Anope::string password = config.ReadValue("mysql", "password", "", i);
00231                                 int port = config.ReadInteger("mysql", "port", "3306", i, true);
00232 
00233                                 try
00234                                 {
00235                                         MySQLService *ss = new MySQLService(this, connname, database, server, user, password, port);
00236                                         this->MySQLServices.insert(std::make_pair(connname, ss));
00237 
00238                                         Log(LOG_NORMAL, "mysql") << "MySQL: Successfully connected to server " << connname << " (" << server << ")";
00239                                 }
00240                                 catch (const SQL::Exception &ex)
00241                                 {
00242                                         Log(LOG_NORMAL, "mysql") << "MySQL: " << ex.GetReason();
00243                                 }
00244                         }
00245                 }
00246         }
00247 
00248         void OnModuleUnload(User *, Module *m) anope_override
00249         {
00250                 this->DThread->Lock();
00251 
00252                 for (unsigned i = this->QueryRequests.size(); i > 0; --i)
00253                 {
00254                         QueryRequest &r = this->QueryRequests[i - 1];
00255 
00256                         if (r.sqlinterface && r.sqlinterface->owner == m)
00257                         {
00258                                 if (i == 1)
00259                                 {
00260                                         r.service->Lock.Lock();
00261                                         r.service->Lock.Unlock();
00262                                 }
00263 
00264                                 this->QueryRequests.erase(this->QueryRequests.begin() + i - 1);
00265                         }
00266                 }
00267 
00268                 this->DThread->Unlock();
00269 
00270                 this->OnNotify();
00271         }
00272 
00273         void OnNotify() anope_override
00274         {
00275                 this->DThread->Lock();
00276                 std::deque<QueryResult> finishedRequests = this->FinishedRequests;
00277                 this->FinishedRequests.clear();
00278                 this->DThread->Unlock();
00279 
00280                 for (std::deque<QueryResult>::const_iterator it = finishedRequests.begin(), it_end = finishedRequests.end(); it != it_end; ++it)
00281                 {
00282                         const QueryResult &qr = *it;
00283 
00284                         if (!qr.sqlinterface)
00285                                 throw SQL::Exception("NULL qr.sqlinterface in MySQLPipe::OnNotify() ?");
00286 
00287                         if (qr.result.GetError().empty())
00288                                 qr.sqlinterface->OnResult(qr.result);
00289                         else
00290                                 qr.sqlinterface->OnError(qr.result);
00291                 }
00292         }
00293 };
00294 
00295 MySQLService::MySQLService(Module *o, const Anope::string &n, const Anope::string &d, const Anope::string &s, const Anope::string &u, const Anope::string &p, int po)
00296 : Provider(o, n), database(d), server(s), user(u), password(p), port(po), sql(NULL)
00297 {
00298         Connect();
00299 }
00300 
00301 MySQLService::~MySQLService()
00302 {
00303         me->DThread->Lock();
00304         this->Lock.Lock();
00305         mysql_close(this->sql);
00306         this->sql = NULL;
00307 
00308         for (unsigned i = me->QueryRequests.size(); i > 0; --i)
00309         {
00310                 QueryRequest &r = me->QueryRequests[i - 1];
00311 
00312                 if (r.service == this)
00313                 {
00314                         if (r.sqlinterface)
00315                                 r.sqlinterface->OnError(Result(0, r.query, "SQL Interface is going away"));
00316                         me->QueryRequests.erase(me->QueryRequests.begin() + i - 1);
00317                 }
00318         }
00319         this->Lock.Unlock();
00320         me->DThread->Unlock();
00321 }
00322 
00323 void MySQLService::Run(Interface *i, const Query &query)
00324 {
00325         me->DThread->Lock();
00326         me->QueryRequests.push_back(QueryRequest(this, i, query));
00327         me->DThread->Unlock();
00328         me->DThread->Wakeup();
00329 }
00330 
00331 Result MySQLService::RunQuery(const Query &query)
00332 {
00333         this->Lock.Lock();
00334 
00335         Anope::string real_query = this->BuildQuery(query);
00336 
00337         if (this->CheckConnection() && !mysql_real_query(this->sql, real_query.c_str(), real_query.length()))
00338         {
00339                 MYSQL_RES *res = mysql_store_result(this->sql);
00340                 unsigned int id = mysql_insert_id(this->sql);
00341 
00342                 this->Lock.Unlock();
00343                 return MySQLResult(id, query, real_query, res);
00344         }
00345         else
00346         {
00347                 Anope::string error = mysql_error(this->sql);
00348                 this->Lock.Unlock();
00349                 return MySQLResult(query, real_query, error);
00350         }
00351 }
00352 
00353 std::vector<Query> MySQLService::CreateTable(const Anope::string &table, const Data &data)
00354 {
00355         std::vector<Query> queries;
00356         std::set<Anope::string> &known_cols = this->active_schema[table];
00357 
00358         if (known_cols.empty())
00359         {
00360                 Log(LOG_DEBUG) << "m_mysql: Fetching columns for " << table;
00361 
00362                 Result columns = this->RunQuery("SHOW COLUMNS FROM `" + table + "`");
00363                 for (int i = 0; i < columns.Rows(); ++i)
00364                 {
00365                         const Anope::string &column = columns.Get(i, "Field");
00366 
00367                         Log(LOG_DEBUG) << "m_mysql: Column #" << i << " for " << table << ": " << column;
00368                         known_cols.insert(column);
00369                 }
00370         }
00371 
00372         if (known_cols.empty())
00373         {
00374                 Anope::string query_text = "CREATE TABLE `" + table + "` (`id` int(10) unsigned NOT NULL AUTO_INCREMENT,"
00375                         " `timestamp` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP";
00376                 for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
00377                 {
00378                         known_cols.insert(it->first);
00379 
00380                         query_text += ", `" + it->first + "` ";
00381                         if (data.GetType(it->first) == Serialize::Data::DT_INT)
00382                                 query_text += "int(11)";
00383                         else
00384                                 query_text += "text";
00385                 }
00386                 query_text += ", PRIMARY KEY (`id`), KEY `timestamp_idx` (`timestamp`))";
00387                 queries.push_back(query_text);
00388         }
00389         else
00390                 for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
00391                 {
00392                         if (known_cols.count(it->first) > 0)
00393                                 continue;
00394 
00395                         known_cols.insert(it->first);
00396 
00397                         Anope::string query_text = "ALTER TABLE `" + table + "` ADD `" + it->first + "` ";
00398                         if (data.GetType(it->first) == Serialize::Data::DT_INT)
00399                                 query_text += "int(11)";
00400                         else
00401                                 query_text += "text";
00402 
00403                         queries.push_back(query_text);
00404                 }
00405 
00406         return queries;
00407 }
00408 
00409 Query MySQLService::BuildInsert(const Anope::string &table, unsigned int id, Data &data)
00410 {
00411         /* Empty columns not present in the data set */
00412         const std::set<Anope::string> &known_cols = this->active_schema[table];
00413         for (std::set<Anope::string>::iterator it = known_cols.begin(), it_end = known_cols.end(); it != it_end; ++it)
00414                 if (*it != "id" && *it != "timestamp" && data.data.count(*it) == 0)
00415                         data[*it] << "";
00416 
00417         Anope::string query_text = "INSERT INTO `" + table + "` (`id`";
00418         for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
00419                 query_text += ",`" + it->first + "`";
00420         query_text += ") VALUES (" + stringify(id);
00421         for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
00422                 query_text += ",@" + it->first + "@";
00423         query_text += ") ON DUPLICATE KEY UPDATE ";
00424         for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
00425                 query_text += "`" + it->first + "`=VALUES(`" + it->first + "`),";
00426         query_text.erase(query_text.end() - 1);
00427 
00428         Query query(query_text);
00429         for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
00430         {
00431                 Anope::string buf;
00432                 *it->second >> buf;
00433                 query.SetValue(it->first, buf);
00434         }
00435         
00436         return query;
00437 }
00438 
00439 Query MySQLService::GetTables(const Anope::string &prefix)
00440 {
00441         return Query("SHOW TABLES LIKE '" + prefix + "%';");
00442 }
00443 
00444 void MySQLService::Connect()
00445 {
00446         this->sql = mysql_init(this->sql);
00447 
00448         const unsigned int timeout = 1;
00449         mysql_options(this->sql, MYSQL_OPT_CONNECT_TIMEOUT, reinterpret_cast<const char *>(&timeout));
00450 
00451         bool connect = mysql_real_connect(this->sql, this->server.c_str(), this->user.c_str(), this->password.c_str(), this->database.c_str(), this->port, NULL, CLIENT_MULTI_RESULTS);
00452 
00453         if (!connect)
00454                 throw SQL::Exception("Unable to connect to MySQL service " + this->name + ": " + mysql_error(this->sql));
00455         
00456         Log(LOG_DEBUG) << "Successfully connected to MySQL service " << this->name << " at " << this->server << ":" << this->port;
00457 }
00458 
00459 
00460 bool MySQLService::CheckConnection()
00461 {
00462         if (!this->sql || mysql_ping(this->sql))
00463         {
00464                 try
00465                 {
00466                         this->Connect();
00467                 }
00468                 catch (const SQL::Exception &)
00469                 {
00470                         return false;
00471                 }
00472         }
00473 
00474         return true;
00475 }
00476 
00477 Anope::string MySQLService::Escape(const Anope::string &query)
00478 {
00479         char buffer[BUFSIZE];
00480         mysql_real_escape_string(this->sql, buffer, query.c_str(), query.length());
00481         return buffer;
00482 }
00483 
00484 Anope::string MySQLService::BuildQuery(const Query &q)
00485 {
00486         Anope::string real_query = q.query;
00487 
00488         for (std::map<Anope::string, QueryData>::const_iterator it = q.parameters.begin(), it_end = q.parameters.end(); it != it_end; ++it)
00489                 real_query = real_query.replace_all_cs("@" + it->first + "@", (it->second.escape ? ("'" + this->Escape(it->second.data) + "'") : it->second.data));
00490 
00491         return real_query;
00492 }
00493 
00494 Anope::string MySQLService::FromUnixtime(time_t t)
00495 {
00496         return "FROM_UNIXTIME(" + stringify(t) + ")";
00497 }
00498 
00499 void DispatcherThread::Run()
00500 {
00501         this->Lock();
00502 
00503         while (!this->GetExitState())
00504         {
00505                 if (!me->QueryRequests.empty())
00506                 {
00507                         QueryRequest &r = me->QueryRequests.front();
00508                         this->Unlock();
00509 
00510                         Result sresult = r.service->RunQuery(r.query);
00511 
00512                         this->Lock();
00513                         if (!me->QueryRequests.empty() && me->QueryRequests.front().query == r.query)
00514                         {
00515                                 if (r.sqlinterface)
00516                                         me->FinishedRequests.push_back(QueryResult(r.sqlinterface, sresult));
00517                                 me->QueryRequests.pop_front();
00518                         }
00519                 }
00520                 else
00521                 {
00522                         if (!me->FinishedRequests.empty())
00523                                 me->Notify();
00524                         this->Wait();
00525                 }
00526         }
00527 
00528         this->Unlock();
00529 }
00530 
/* Hands ModuleSQL to the MODULE_INIT macro, which is defined outside this
 * file; presumably it emits the entry points used to load the module.
 */
MODULE_INIT(ModuleSQL)
00532