diff options
Diffstat (limited to 'node_modules/mysql2/lib/pool_cluster.js')
-rw-r--r-- | node_modules/mysql2/lib/pool_cluster.js | 283 |
1 files changed, 283 insertions, 0 deletions
diff --git a/node_modules/mysql2/lib/pool_cluster.js b/node_modules/mysql2/lib/pool_cluster.js new file mode 100644 index 0000000..92f53de --- /dev/null +++ b/node_modules/mysql2/lib/pool_cluster.js @@ -0,0 +1,283 @@ +'use strict'; + +const process = require('process'); + +const Pool = require('./pool.js'); +const PoolConfig = require('./pool_config.js'); +const Connection = require('./connection.js'); +const EventEmitter = require('events').EventEmitter; + +/** + * Selector + */ +const makeSelector = { + RR() { + let index = 0; + return clusterIds => clusterIds[index++ % clusterIds.length]; + }, + RANDOM() { + return clusterIds => + clusterIds[Math.floor(Math.random() * clusterIds.length)]; + }, + ORDER() { + return clusterIds => clusterIds[0]; + } +}; + +class PoolNamespace { + constructor(cluster, pattern, selector) { + this._cluster = cluster; + this._pattern = pattern; + this._selector = makeSelector[selector](); + } + + getConnection(cb) { + const clusterNode = this._getClusterNode(); + if (clusterNode === null) { + return cb(new Error('Pool does Not exists.')); + } + return this._cluster._getConnection(clusterNode, (err, connection) => { + if (err) { + return cb(err); + } + if (connection === 'retry') { + return this.getConnection(cb); + } + return cb(null, connection); + }); + } + + /** + * pool cluster query + * @param {*} sql + * @param {*} values + * @param {*} cb + * @returns query + */ + query(sql, values, cb) { + const query = Connection.createQuery(sql, values, cb, {}); + this.getConnection((err, conn) => { + if (err) { + if (typeof query.onResult === 'function') { + query.onResult(err); + } else { + query.emit('error', err); + } + return; + } + try { + conn.query(query).once('end', () => { + conn.release(); + }); + } catch (e) { + conn.release(); + throw e; + } + }); + return query; + } + + /** + * pool cluster execute + * @param {*} sql + * @param {*} values + * @param {*} cb + */ + execute(sql, values, cb) { + if (typeof values === 'function') { + cb = values; + values = []; + } + this.getConnection((err, conn) => { + if (err) { + return cb(err); + } + try { + conn.execute(sql, values, cb).once('end', () => { + conn.release(); + }); + } catch (e) { + conn.release(); + throw e; + } + }); + } + + _getClusterNode() { + const foundNodeIds = this._cluster._findNodeIds(this._pattern); + if (foundNodeIds.length === 0) { + return null; + } + const nodeId = + foundNodeIds.length === 1 + ? foundNodeIds[0] + : this._selector(foundNodeIds); + return this._cluster._getNode(nodeId); + } +} + +class PoolCluster extends EventEmitter { + constructor(config) { + super(); + config = config || {}; + this._canRetry = + typeof config.canRetry === 'undefined' ? true : config.canRetry; + this._removeNodeErrorCount = config.removeNodeErrorCount || 5; + this._defaultSelector = config.defaultSelector || 'RR'; + this._closed = false; + this._lastId = 0; + this._nodes = {}; + this._serviceableNodeIds = []; + this._namespaces = {}; + this._findCaches = {}; + } + + of(pattern, selector) { + pattern = pattern || '*'; + selector = selector || this._defaultSelector; + selector = selector.toUpperCase(); + if (!makeSelector[selector] === 'undefined') { + selector = this._defaultSelector; + } + const key = pattern + selector; + if (typeof this._namespaces[key] === 'undefined') { + this._namespaces[key] = new PoolNamespace(this, pattern, selector); + } + return this._namespaces[key]; + } + + add(id, config) { + if (typeof id === 'object') { + config = id; + id = `CLUSTER::${++this._lastId}`; + } + if (typeof this._nodes[id] === 'undefined') { + this._nodes[id] = { + id: id, + errorCount: 0, + pool: new Pool({ config: new PoolConfig(config) }) + }; + this._serviceableNodeIds.push(id); + this._clearFindCaches(); + } + } + + getConnection(pattern, selector, cb) { + let namespace; + if (typeof pattern === 'function') { + cb = pattern; + namespace = this.of(); + } else { + if (typeof selector === 'function') { + cb = selector; + selector = this._defaultSelector; + } + namespace = this.of(pattern, selector); + } + namespace.getConnection(cb); + } + + end(callback) { + const cb = + callback !== undefined + ? callback + : err => { + if (err) { + throw err; + } + }; + if (this._closed) { + process.nextTick(cb); + return; + } + this._closed = true; + + let calledBack = false; + let waitingClose = 0; + const onEnd = err => { + if (!calledBack && (err || --waitingClose <= 0)) { + calledBack = true; + return cb(err); + } + }; + + for (const id in this._nodes) { + waitingClose++; + this._nodes[id].pool.end(onEnd); + } + if (waitingClose === 0) { + process.nextTick(onEnd); + } + } + + _findNodeIds(pattern) { + if (typeof this._findCaches[pattern] !== 'undefined') { + return this._findCaches[pattern]; + } + let foundNodeIds; + if (pattern === '*') { + // all + foundNodeIds = this._serviceableNodeIds; + } else if (this._serviceableNodeIds.indexOf(pattern) !== -1) { + // one + foundNodeIds = [pattern]; + } else { + // wild matching + const keyword = pattern.substring(pattern.length - 1, 0); + foundNodeIds = this._serviceableNodeIds.filter(id => + id.startsWith(keyword) + ); + } + this._findCaches[pattern] = foundNodeIds; + return foundNodeIds; + } + + _getNode(id) { + return this._nodes[id] || null; + } + + _increaseErrorCount(node) { + if (++node.errorCount >= this._removeNodeErrorCount) { + const index = this._serviceableNodeIds.indexOf(node.id); + if (index !== -1) { + this._serviceableNodeIds.splice(index, 1); + delete this._nodes[node.id]; + this._clearFindCaches(); + node.pool.end(); + this.emit('remove', node.id); + } + } + } + + _decreaseErrorCount(node) { + if (node.errorCount > 0) { + --node.errorCount; + } + } + + _getConnection(node, cb) { + node.pool.getConnection((err, connection) => { + if (err) { + this._increaseErrorCount(node); + if (this._canRetry) { + // REVIEW: this seems wrong? + this.emit('warn', err); + // eslint-disable-next-line no-console + console.warn(`[Error] PoolCluster : ${err}`); + return cb(null, 'retry'); + } + return cb(err); + } + this._decreaseErrorCount(node); + + connection._clusterId = node.id; + return cb(null, connection); + }); + } + + _clearFindCaches() { + this._findCaches = {}; + } +} + +module.exports = PoolCluster; |