00001
00002
00003 #include "module.h"
00004 #define NO_CLIENT_LONG_LONG
00005 #include <mysql/mysql.h>
00006 #include "sql.h"
00007
00008 using namespace SQL;
00009
00020 class MySQLService;
00021
00024 struct QueryRequest
00025 {
00026
00027 MySQLService *service;
00028
00029 Interface *sqlinterface;
00030
00031 Query query;
00032
00033 QueryRequest(MySQLService *s, Interface *i, const Query &q) : service(s), sqlinterface(i), query(q) { }
00034 };
00035
00037 struct QueryResult
00038 {
00039
00040 Interface *sqlinterface;
00041
00042 Result result;
00043
00044 QueryResult(Interface *i, Result &r) : sqlinterface(i), result(r) { }
00045 };
00046
00049 class MySQLResult : public Result
00050 {
00051 MYSQL_RES *res;
00052
00053 public:
00054 MySQLResult(unsigned int i, const Query &q, const Anope::string &fq, MYSQL_RES *r) : Result(i, q, fq), res(r)
00055 {
00056 unsigned num_fields = res ? mysql_num_fields(res) : 0;
00057
00058
00059
00060 if (!num_fields)
00061 return;
00062
00063 for (MYSQL_ROW row; (row = mysql_fetch_row(res));)
00064 {
00065 MYSQL_FIELD *fields = mysql_fetch_fields(res);
00066
00067 if (fields)
00068 {
00069 std::map<Anope::string, Anope::string> items;
00070
00071 for (unsigned field_count = 0; field_count < num_fields; ++field_count)
00072 {
00073 Anope::string column = (fields[field_count].name ? fields[field_count].name : "");
00074 Anope::string data = (row[field_count] ? row[field_count] : "");
00075
00076 items[column] = data;
00077 }
00078
00079 this->entries.push_back(items);
00080 }
00081 }
00082 }
00083
00084 MySQLResult(const Query &q, const Anope::string &fq, const Anope::string &err) : Result(0, q, fq, err), res(NULL)
00085 {
00086 }
00087
00088 ~MySQLResult()
00089 {
00090 if (this->res)
00091 mysql_free_result(this->res);
00092 }
00093 };
00094
00097 class MySQLService : public Provider
00098 {
00099 std::map<Anope::string, std::set<Anope::string> > active_schema;
00100
00101 Anope::string database;
00102 Anope::string server;
00103 Anope::string user;
00104 Anope::string password;
00105 int port;
00106
00107 MYSQL *sql;
00108
00112 Anope::string Escape(const Anope::string &query);
00113
00114 public:
00115
00116
00117
00118
00119 Mutex Lock;
00120
00121 MySQLService(Module *o, const Anope::string &n, const Anope::string &d, const Anope::string &s, const Anope::string &u, const Anope::string &p, int po);
00122
00123 ~MySQLService();
00124
00125 void Run(Interface *i, const Query &query) anope_override;
00126
00127 Result RunQuery(const Query &query) anope_override;
00128
00129 std::vector<Query> CreateTable(const Anope::string &table, const Data &data) anope_override;
00130
00131 Query BuildInsert(const Anope::string &table, unsigned int id, Data &data) anope_override;
00132
00133 Query GetTables(const Anope::string &prefix) anope_override;
00134
00135 void Connect();
00136
00137 bool CheckConnection();
00138
00139 Anope::string BuildQuery(const Query &q);
00140
00141 Anope::string FromUnixtime(time_t);
00142 };
00143
00146 class DispatcherThread : public Thread, public Condition
00147 {
00148 public:
00149 DispatcherThread() : Thread() { }
00150
00151 void Run() anope_override;
00152 };
00153
00154 class ModuleSQL;
00155 static ModuleSQL *me;
00156 class ModuleSQL : public Module, public Pipe
00157 {
00158
00159 std::map<Anope::string, MySQLService *> MySQLServices;
00160 public:
00161
00162 std::deque<QueryRequest> QueryRequests;
00163
00164 std::deque<QueryResult> FinishedRequests;
00165
00166 DispatcherThread *DThread;
00167
00168 ModuleSQL(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, SUPPORTED)
00169 {
00170 me = this;
00171
00172 Implementation i[] = { I_OnReload, I_OnModuleUnload };
00173 ModuleManager::Attach(i, this, 2);
00174
00175 DThread = new DispatcherThread();
00176 DThread->Start();
00177
00178 OnReload();
00179 }
00180
00181 ~ModuleSQL()
00182 {
00183 for (std::map<Anope::string, MySQLService *>::iterator it = this->MySQLServices.begin(); it != this->MySQLServices.end(); ++it)
00184 delete it->second;
00185 MySQLServices.clear();
00186
00187 DThread->SetExitState();
00188 DThread->Wakeup();
00189 DThread->Join();
00190 delete DThread;
00191 }
00192
00193 void OnReload() anope_override
00194 {
00195 ConfigReader config;
00196 int i, num;
00197
00198 for (std::map<Anope::string, MySQLService *>::iterator it = this->MySQLServices.begin(); it != this->MySQLServices.end();)
00199 {
00200 const Anope::string &cname = it->first;
00201 MySQLService *s = it->second;
00202 ++it;
00203
00204 for (i = 0, num = config.Enumerate("mysql"); i < num; ++i)
00205 {
00206 if (config.ReadValue("mysql", "name", "main", i) == cname)
00207 {
00208 break;
00209 }
00210 }
00211
00212 if (i == num)
00213 {
00214 Log(LOG_NORMAL, "mysql") << "MySQL: Removing server connection " << cname;
00215
00216 delete s;
00217 this->MySQLServices.erase(cname);
00218 }
00219 }
00220
00221 for (i = 0, num = config.Enumerate("mysql"); i < num; ++i)
00222 {
00223 Anope::string connname = config.ReadValue("mysql", "name", "mysql/main", i);
00224
00225 if (this->MySQLServices.find(connname) == this->MySQLServices.end())
00226 {
00227 Anope::string database = config.ReadValue("mysql", "database", "anope", i);
00228 Anope::string server = config.ReadValue("mysql", "server", "127.0.0.1", i);
00229 Anope::string user = config.ReadValue("mysql", "username", "anope", i);
00230 Anope::string password = config.ReadValue("mysql", "password", "", i);
00231 int port = config.ReadInteger("mysql", "port", "3306", i, true);
00232
00233 try
00234 {
00235 MySQLService *ss = new MySQLService(this, connname, database, server, user, password, port);
00236 this->MySQLServices.insert(std::make_pair(connname, ss));
00237
00238 Log(LOG_NORMAL, "mysql") << "MySQL: Successfully connected to server " << connname << " (" << server << ")";
00239 }
00240 catch (const SQL::Exception &ex)
00241 {
00242 Log(LOG_NORMAL, "mysql") << "MySQL: " << ex.GetReason();
00243 }
00244 }
00245 }
00246 }
00247
00248 void OnModuleUnload(User *, Module *m) anope_override
00249 {
00250 this->DThread->Lock();
00251
00252 for (unsigned i = this->QueryRequests.size(); i > 0; --i)
00253 {
00254 QueryRequest &r = this->QueryRequests[i - 1];
00255
00256 if (r.sqlinterface && r.sqlinterface->owner == m)
00257 {
00258 if (i == 1)
00259 {
00260 r.service->Lock.Lock();
00261 r.service->Lock.Unlock();
00262 }
00263
00264 this->QueryRequests.erase(this->QueryRequests.begin() + i - 1);
00265 }
00266 }
00267
00268 this->DThread->Unlock();
00269
00270 this->OnNotify();
00271 }
00272
00273 void OnNotify() anope_override
00274 {
00275 this->DThread->Lock();
00276 std::deque<QueryResult> finishedRequests = this->FinishedRequests;
00277 this->FinishedRequests.clear();
00278 this->DThread->Unlock();
00279
00280 for (std::deque<QueryResult>::const_iterator it = finishedRequests.begin(), it_end = finishedRequests.end(); it != it_end; ++it)
00281 {
00282 const QueryResult &qr = *it;
00283
00284 if (!qr.sqlinterface)
00285 throw SQL::Exception("NULL qr.sqlinterface in MySQLPipe::OnNotify() ?");
00286
00287 if (qr.result.GetError().empty())
00288 qr.sqlinterface->OnResult(qr.result);
00289 else
00290 qr.sqlinterface->OnError(qr.result);
00291 }
00292 }
00293 };
00294
00295 MySQLService::MySQLService(Module *o, const Anope::string &n, const Anope::string &d, const Anope::string &s, const Anope::string &u, const Anope::string &p, int po)
00296 : Provider(o, n), database(d), server(s), user(u), password(p), port(po), sql(NULL)
00297 {
00298 Connect();
00299 }
00300
00301 MySQLService::~MySQLService()
00302 {
00303 me->DThread->Lock();
00304 this->Lock.Lock();
00305 mysql_close(this->sql);
00306 this->sql = NULL;
00307
00308 for (unsigned i = me->QueryRequests.size(); i > 0; --i)
00309 {
00310 QueryRequest &r = me->QueryRequests[i - 1];
00311
00312 if (r.service == this)
00313 {
00314 if (r.sqlinterface)
00315 r.sqlinterface->OnError(Result(0, r.query, "SQL Interface is going away"));
00316 me->QueryRequests.erase(me->QueryRequests.begin() + i - 1);
00317 }
00318 }
00319 this->Lock.Unlock();
00320 me->DThread->Unlock();
00321 }
00322
00323 void MySQLService::Run(Interface *i, const Query &query)
00324 {
00325 me->DThread->Lock();
00326 me->QueryRequests.push_back(QueryRequest(this, i, query));
00327 me->DThread->Unlock();
00328 me->DThread->Wakeup();
00329 }
00330
00331 Result MySQLService::RunQuery(const Query &query)
00332 {
00333 this->Lock.Lock();
00334
00335 Anope::string real_query = this->BuildQuery(query);
00336
00337 if (this->CheckConnection() && !mysql_real_query(this->sql, real_query.c_str(), real_query.length()))
00338 {
00339 MYSQL_RES *res = mysql_store_result(this->sql);
00340 unsigned int id = mysql_insert_id(this->sql);
00341
00342 this->Lock.Unlock();
00343 return MySQLResult(id, query, real_query, res);
00344 }
00345 else
00346 {
00347 Anope::string error = mysql_error(this->sql);
00348 this->Lock.Unlock();
00349 return MySQLResult(query, real_query, error);
00350 }
00351 }
00352
00353 std::vector<Query> MySQLService::CreateTable(const Anope::string &table, const Data &data)
00354 {
00355 std::vector<Query> queries;
00356 std::set<Anope::string> &known_cols = this->active_schema[table];
00357
00358 if (known_cols.empty())
00359 {
00360 Log(LOG_DEBUG) << "m_mysql: Fetching columns for " << table;
00361
00362 Result columns = this->RunQuery("SHOW COLUMNS FROM `" + table + "`");
00363 for (int i = 0; i < columns.Rows(); ++i)
00364 {
00365 const Anope::string &column = columns.Get(i, "Field");
00366
00367 Log(LOG_DEBUG) << "m_mysql: Column #" << i << " for " << table << ": " << column;
00368 known_cols.insert(column);
00369 }
00370 }
00371
00372 if (known_cols.empty())
00373 {
00374 Anope::string query_text = "CREATE TABLE `" + table + "` (`id` int(10) unsigned NOT NULL AUTO_INCREMENT,"
00375 " `timestamp` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP";
00376 for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
00377 {
00378 known_cols.insert(it->first);
00379
00380 query_text += ", `" + it->first + "` ";
00381 if (data.GetType(it->first) == Serialize::Data::DT_INT)
00382 query_text += "int(11)";
00383 else
00384 query_text += "text";
00385 }
00386 query_text += ", PRIMARY KEY (`id`), KEY `timestamp_idx` (`timestamp`))";
00387 queries.push_back(query_text);
00388 }
00389 else
00390 for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
00391 {
00392 if (known_cols.count(it->first) > 0)
00393 continue;
00394
00395 known_cols.insert(it->first);
00396
00397 Anope::string query_text = "ALTER TABLE `" + table + "` ADD `" + it->first + "` ";
00398 if (data.GetType(it->first) == Serialize::Data::DT_INT)
00399 query_text += "int(11)";
00400 else
00401 query_text += "text";
00402
00403 queries.push_back(query_text);
00404 }
00405
00406 return queries;
00407 }
00408
00409 Query MySQLService::BuildInsert(const Anope::string &table, unsigned int id, Data &data)
00410 {
00411
00412 const std::set<Anope::string> &known_cols = this->active_schema[table];
00413 for (std::set<Anope::string>::iterator it = known_cols.begin(), it_end = known_cols.end(); it != it_end; ++it)
00414 if (*it != "id" && *it != "timestamp" && data.data.count(*it) == 0)
00415 data[*it] << "";
00416
00417 Anope::string query_text = "INSERT INTO `" + table + "` (`id`";
00418 for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
00419 query_text += ",`" + it->first + "`";
00420 query_text += ") VALUES (" + stringify(id);
00421 for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
00422 query_text += ",@" + it->first + "@";
00423 query_text += ") ON DUPLICATE KEY UPDATE ";
00424 for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
00425 query_text += "`" + it->first + "`=VALUES(`" + it->first + "`),";
00426 query_text.erase(query_text.end() - 1);
00427
00428 Query query(query_text);
00429 for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
00430 {
00431 Anope::string buf;
00432 *it->second >> buf;
00433 query.SetValue(it->first, buf);
00434 }
00435
00436 return query;
00437 }
00438
00439 Query MySQLService::GetTables(const Anope::string &prefix)
00440 {
00441 return Query("SHOW TABLES LIKE '" + prefix + "%';");
00442 }
00443
00444 void MySQLService::Connect()
00445 {
00446 this->sql = mysql_init(this->sql);
00447
00448 const unsigned int timeout = 1;
00449 mysql_options(this->sql, MYSQL_OPT_CONNECT_TIMEOUT, reinterpret_cast<const char *>(&timeout));
00450
00451 bool connect = mysql_real_connect(this->sql, this->server.c_str(), this->user.c_str(), this->password.c_str(), this->database.c_str(), this->port, NULL, CLIENT_MULTI_RESULTS);
00452
00453 if (!connect)
00454 throw SQL::Exception("Unable to connect to MySQL service " + this->name + ": " + mysql_error(this->sql));
00455
00456 Log(LOG_DEBUG) << "Successfully connected to MySQL service " << this->name << " at " << this->server << ":" << this->port;
00457 }
00458
00459
00460 bool MySQLService::CheckConnection()
00461 {
00462 if (!this->sql || mysql_ping(this->sql))
00463 {
00464 try
00465 {
00466 this->Connect();
00467 }
00468 catch (const SQL::Exception &)
00469 {
00470 return false;
00471 }
00472 }
00473
00474 return true;
00475 }
00476
00477 Anope::string MySQLService::Escape(const Anope::string &query)
00478 {
00479 char buffer[BUFSIZE];
00480 mysql_real_escape_string(this->sql, buffer, query.c_str(), query.length());
00481 return buffer;
00482 }
00483
00484 Anope::string MySQLService::BuildQuery(const Query &q)
00485 {
00486 Anope::string real_query = q.query;
00487
00488 for (std::map<Anope::string, QueryData>::const_iterator it = q.parameters.begin(), it_end = q.parameters.end(); it != it_end; ++it)
00489 real_query = real_query.replace_all_cs("@" + it->first + "@", (it->second.escape ? ("'" + this->Escape(it->second.data) + "'") : it->second.data));
00490
00491 return real_query;
00492 }
00493
00494 Anope::string MySQLService::FromUnixtime(time_t t)
00495 {
00496 return "FROM_UNIXTIME(" + stringify(t) + ")";
00497 }
00498
00499 void DispatcherThread::Run()
00500 {
00501 this->Lock();
00502
00503 while (!this->GetExitState())
00504 {
00505 if (!me->QueryRequests.empty())
00506 {
00507 QueryRequest &r = me->QueryRequests.front();
00508 this->Unlock();
00509
00510 Result sresult = r.service->RunQuery(r.query);
00511
00512 this->Lock();
00513 if (!me->QueryRequests.empty() && me->QueryRequests.front().query == r.query)
00514 {
00515 if (r.sqlinterface)
00516 me->FinishedRequests.push_back(QueryResult(r.sqlinterface, sresult));
00517 me->QueryRequests.pop_front();
00518 }
00519 }
00520 else
00521 {
00522 if (!me->FinishedRequests.empty())
00523 me->Notify();
00524 this->Wait();
00525 }
00526 }
00527
00528 this->Unlock();
00529 }
00530
/* Hands ModuleSQL to the MODULE_INIT macro, which is defined outside this
 * file. NOTE(review): presumably this generates the hooks used to load and
 * unload the module - confirm against module.h.
 */
00531 MODULE_INIT(ModuleSQL)
00532