aboutsummaryrefslogtreecommitdiff
path: root/node_modules/mysql2/lib/connection.js
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/mysql2/lib/connection.js')
-rw-r--r--node_modules/mysql2/lib/connection.js936
1 files changed, 936 insertions, 0 deletions
diff --git a/node_modules/mysql2/lib/connection.js b/node_modules/mysql2/lib/connection.js
new file mode 100644
index 0000000..47970e9
--- /dev/null
+++ b/node_modules/mysql2/lib/connection.js
@@ -0,0 +1,936 @@
+// This file was modified by Oracle on June 1, 2021.
+// The changes involve new logic to handle an additional ERR Packet sent by
+// the MySQL server when the connection is closed unexpectedly.
+// Modifications copyright (c) 2021, Oracle and/or its affiliates.
+
+// This file was modified by Oracle on June 17, 2021.
+// The changes involve logic to ensure the socket connection is closed when
+// there is a fatal error.
+// Modifications copyright (c) 2021, Oracle and/or its affiliates.
+
+'use strict';
+
+const Net = require('net');
+const Tls = require('tls');
+const Timers = require('timers');
+const EventEmitter = require('events').EventEmitter;
+const Readable = require('stream').Readable;
+const Queue = require('denque');
+const SqlString = require('sqlstring');
+const LRU = require('lru-cache');
+
+const PacketParser = require('./packet_parser.js');
+const Packets = require('./packets/index.js');
+const Commands = require('./commands/index.js');
+const ConnectionConfig = require('./connection_config.js');
+const CharsetToEncoding = require('./constants/charset_encodings.js');
+
+let _connectionId = 0;
+
+let convertNamedPlaceholders = null;
+
+class Connection extends EventEmitter {
+ constructor(opts) {
+ super();
+ this.config = opts.config;
+ // TODO: fill defaults
+ // if no params, connect to /var/lib/mysql/mysql.sock ( /tmp/mysql.sock on OSX )
+ // if host is given, connect to host:3306
+ // TODO: use `/usr/local/mysql/bin/mysql_config --socket` output? as default socketPath
+ // if there is no host/port and no socketPath parameters?
+ if (!opts.config.stream) {
+ if (opts.config.socketPath) {
+ this.stream = Net.connect(opts.config.socketPath);
+ } else {
+ this.stream = Net.connect(
+ opts.config.port,
+ opts.config.host
+ );
+
+ // Enable keep-alive on the socket. It's disabled by default, but the
+ // user can enable it and supply an initial delay.
+ this.stream.setKeepAlive(true, this.config.keepAliveInitialDelay);
+ }
+ // if stream is a function, treat it as "stream agent / factory"
+ } else if (typeof opts.config.stream === 'function') {
+ this.stream = opts.config.stream(opts);
+ } else {
+ this.stream = opts.config.stream;
+ }
+
+ this._internalId = _connectionId++;
+ this._commands = new Queue();
+ this._command = null;
+ this._paused = false;
+ this._paused_packets = new Queue();
+ this._statements = new LRU({
+ max: this.config.maxPreparedStatements,
+ dispose: function(key, statement) {
+ statement.close();
+ }
+ });
+ this.serverCapabilityFlags = 0;
+ this.authorized = false;
+ this.sequenceId = 0;
+ this.compressedSequenceId = 0;
+ this.threadId = null;
+ this._handshakePacket = null;
+ this._fatalError = null;
+ this._protocolError = null;
+ this._outOfOrderPackets = [];
+ this.clientEncoding = CharsetToEncoding[this.config.charsetNumber];
+ this.stream.on('error', this._handleNetworkError.bind(this));
+ // see https://gist.github.com/khoomeister/4985691#use-that-instead-of-bind
+ this.packetParser = new PacketParser(p => {
+ this.handlePacket(p);
+ });
+ this.stream.on('data', data => {
+ if (this.connectTimeout) {
+ Timers.clearTimeout(this.connectTimeout);
+ this.connectTimeout = null;
+ }
+ this.packetParser.execute(data);
+ });
+ this.stream.on('close', () => {
+ // we need to set this flag everywhere where we want connection to close
+ if (this._closing) {
+ return;
+ }
+ if (!this._protocolError) {
+ // no particular error message before disconnect
+ this._protocolError = new Error(
+ 'Connection lost: The server closed the connection.'
+ );
+ this._protocolError.fatal = true;
+ this._protocolError.code = 'PROTOCOL_CONNECTION_LOST';
+ }
+ this._notifyError(this._protocolError);
+ });
+ let handshakeCommand;
+ if (!this.config.isServer) {
+ handshakeCommand = new Commands.ClientHandshake(this.config.clientFlags);
+ handshakeCommand.on('end', () => {
+ // this happens when handshake finishes early either because there was
+ // some fatal error or the server sent an error packet instead of
+ // an hello packet (for example, 'Too many connactions' error)
+ if (!handshakeCommand.handshake || this._fatalError || this._protocolError) {
+ return;
+ }
+ this._handshakePacket = handshakeCommand.handshake;
+ this.threadId = handshakeCommand.handshake.connectionId;
+ this.emit('connect', handshakeCommand.handshake);
+ });
+ handshakeCommand.on('error', err => {
+ this._closing = true;
+ this._notifyError(err);
+ });
+ this.addCommand(handshakeCommand);
+ }
+ // in case there was no initiall handshake but we need to read sting, assume it utf-8
+ // most common example: "Too many connections" error ( packet is sent immediately on connection attempt, we don't know server encoding yet)
+ // will be overwrittedn with actial encoding value as soon as server handshake packet is received
+ this.serverEncoding = 'utf8';
+ if (this.config.connectTimeout) {
+ const timeoutHandler = this._handleTimeoutError.bind(this);
+ this.connectTimeout = Timers.setTimeout(
+ timeoutHandler,
+ this.config.connectTimeout
+ );
+ }
+ }
+
+ promise(promiseImpl) {
+ const PromiseConnection = require('../promise').PromiseConnection;
+ return new PromiseConnection(this, promiseImpl);
+ }
+
+ _addCommandClosedState(cmd) {
+ const err = new Error(
+ "Can't add new command when connection is in closed state"
+ );
+ err.fatal = true;
+ if (cmd.onResult) {
+ cmd.onResult(err);
+ } else {
+ this.emit('error', err);
+ }
+ }
+
+ _handleFatalError(err) {
+ err.fatal = true;
+ // stop receiving packets
+ this.stream.removeAllListeners('data');
+ this.addCommand = this._addCommandClosedState;
+ this.write = () => {
+ this.emit('error', new Error("Can't write in closed state"));
+ };
+ this._notifyError(err);
+ this._fatalError = err;
+ }
+
+ _handleNetworkError(err) {
+ if (this.connectTimeout) {
+ Timers.clearTimeout(this.connectTimeout);
+ this.connectTimeout = null;
+ }
+ // Do not throw an error when a connection ends with a RST,ACK packet
+ if (err.errno === 'ECONNRESET' && this._closing) {
+ return;
+ }
+ this._handleFatalError(err);
+ }
+
+ _handleTimeoutError() {
+ if (this.connectTimeout) {
+ Timers.clearTimeout(this.connectTimeout);
+ this.connectTimeout = null;
+ }
+ this.stream.destroy && this.stream.destroy();
+ const err = new Error('connect ETIMEDOUT');
+ err.errorno = 'ETIMEDOUT';
+ err.code = 'ETIMEDOUT';
+ err.syscall = 'connect';
+ this._handleNetworkError(err);
+ }
+
+ // notify all commands in the queue and bubble error as connection "error"
+ // called on stream error or unexpected termination
+ _notifyError(err) {
+ if (this.connectTimeout) {
+ Timers.clearTimeout(this.connectTimeout);
+ this.connectTimeout = null;
+ }
+ // prevent from emitting 'PROTOCOL_CONNECTION_LOST' after EPIPE or ECONNRESET
+ if (this._fatalError) {
+ return;
+ }
+ let command;
+ // if there is no active command, notify connection
+ // if there are commands and all of them have callbacks, pass error via callback
+ let bubbleErrorToConnection = !this._command;
+ if (this._command && this._command.onResult) {
+ this._command.onResult(err);
+ this._command = null;
+ // connection handshake is special because we allow it to be implicit
+ // if error happened during handshake, but there are others commands in queue
+ // then bubble error to other commands and not to connection
+ } else if (
+ !(
+ this._command &&
+ this._command.constructor === Commands.ClientHandshake &&
+ this._commands.length > 0
+ )
+ ) {
+ bubbleErrorToConnection = true;
+ }
+ while ((command = this._commands.shift())) {
+ if (command.onResult) {
+ command.onResult(err);
+ } else {
+ bubbleErrorToConnection = true;
+ }
+ }
+ // notify connection if some comands in the queue did not have callbacks
+ // or if this is pool connection ( so it can be removed from pool )
+ if (bubbleErrorToConnection || this._pool) {
+ this.emit('error', err);
+ }
+ // close connection after emitting the event in case of a fatal error
+ if (err.fatal) {
+ this.close();
+ }
+ }
+
+ write(buffer) {
+ const result = this.stream.write(buffer, err => {
+ if (err) {
+ this._handleNetworkError(err);
+ }
+ });
+
+ if (!result) {
+ this.stream.emit('pause');
+ }
+ }
+
+ // http://dev.mysql.com/doc/internals/en/sequence-id.html
+ //
+ // The sequence-id is incremented with each packet and may wrap around.
+ // It starts at 0 and is reset to 0 when a new command
+ // begins in the Command Phase.
+ // http://dev.mysql.com/doc/internals/en/example-several-mysql-packets.html
+ _resetSequenceId() {
+ this.sequenceId = 0;
+ this.compressedSequenceId = 0;
+ }
+
+ _bumpCompressedSequenceId(numPackets) {
+ this.compressedSequenceId += numPackets;
+ this.compressedSequenceId %= 256;
+ }
+
+ _bumpSequenceId(numPackets) {
+ this.sequenceId += numPackets;
+ this.sequenceId %= 256;
+ }
+
+ writePacket(packet) {
+ const MAX_PACKET_LENGTH = 16777215;
+ const length = packet.length();
+ let chunk, offset, header;
+ if (length < MAX_PACKET_LENGTH) {
+ packet.writeHeader(this.sequenceId);
+ if (this.config.debug) {
+ // eslint-disable-next-line no-console
+ console.log(
+ `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`
+ );
+ // eslint-disable-next-line no-console
+ console.log(
+ `${this._internalId} ${this.connectionId} <== ${packet.buffer.toString('hex')}`
+ );
+ }
+ this._bumpSequenceId(1);
+ this.write(packet.buffer);
+ } else {
+ if (this.config.debug) {
+ // eslint-disable-next-line no-console
+ console.log(
+ `${this._internalId} ${this.connectionId} <== Writing large packet, raw content not written:`
+ );
+ // eslint-disable-next-line no-console
+ console.log(
+ `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`
+ );
+ }
+ for (offset = 4; offset < 4 + length; offset += MAX_PACKET_LENGTH) {
+ chunk = packet.buffer.slice(offset, offset + MAX_PACKET_LENGTH);
+ if (chunk.length === MAX_PACKET_LENGTH) {
+ header = Buffer.from([0xff, 0xff, 0xff, this.sequenceId]);
+ } else {
+ header = Buffer.from([
+ chunk.length & 0xff,
+ (chunk.length >> 8) & 0xff,
+ (chunk.length >> 16) & 0xff,
+ this.sequenceId
+ ]);
+ }
+ this._bumpSequenceId(1);
+ this.write(header);
+ this.write(chunk);
+ }
+ }
+ }
+
+ // 0.11+ environment
+ startTLS(onSecure) {
+ if (this.config.debug) {
+ // eslint-disable-next-line no-console
+ console.log('Upgrading connection to TLS');
+ }
+ const secureContext = Tls.createSecureContext({
+ ca: this.config.ssl.ca,
+ cert: this.config.ssl.cert,
+ ciphers: this.config.ssl.ciphers,
+ key: this.config.ssl.key,
+ passphrase: this.config.ssl.passphrase,
+ minVersion: this.config.ssl.minVersion
+ });
+ const rejectUnauthorized = this.config.ssl.rejectUnauthorized;
+ let secureEstablished = false;
+ const secureSocket = new Tls.TLSSocket(this.stream, {
+ rejectUnauthorized: rejectUnauthorized,
+ requestCert: true,
+ secureContext: secureContext,
+ isServer: false
+ });
+ // error handler for secure socket
+ secureSocket.on('_tlsError', err => {
+ if (secureEstablished) {
+ this._handleNetworkError(err);
+ } else {
+ onSecure(err);
+ }
+ });
+ secureSocket.on('secure', () => {
+ secureEstablished = true;
+ onSecure(rejectUnauthorized ? secureSocket.ssl.verifyError() : null);
+ });
+ secureSocket.on('data', data => {
+ this.packetParser.execute(data);
+ });
+ this.write = buffer => {
+ secureSocket.write(buffer);
+ };
+ // start TLS communications
+ secureSocket._start();
+ }
+
+ pipe() {
+ if (this.stream instanceof Net.Stream) {
+ this.stream.ondata = (data, start, end) => {
+ this.packetParser.execute(data, start, end);
+ };
+ } else {
+ this.stream.on('data', data => {
+ this.packetParser.execute(
+ data.parent,
+ data.offset,
+ data.offset + data.length
+ );
+ });
+ }
+ }
+
+ protocolError(message, code) {
+ // Starting with MySQL 8.0.24, if the client closes the connection
+ // unexpectedly, the server will send a last ERR Packet, which we can
+ // safely ignore.
+ // https://dev.mysql.com/worklog/task/?id=12999
+ if (this._closing) {
+ return;
+ }
+
+ const err = new Error(message);
+ err.fatal = true;
+ err.code = code || 'PROTOCOL_ERROR';
+ this.emit('error', err);
+ }
+
+ handlePacket(packet) {
+ if (this._paused) {
+ this._paused_packets.push(packet);
+ return;
+ }
+ if (packet) {
+ if (this.sequenceId !== packet.sequenceId) {
+ const err = new Error(
+ `Warning: got packets out of order. Expected ${this.sequenceId} but received ${packet.sequenceId}`
+ );
+ err.expected = this.sequenceId;
+ err.received = packet.sequenceId;
+ this.emit('warn', err); // REVIEW
+ // eslint-disable-next-line no-console
+ console.error(err.message);
+ }
+ this._bumpSequenceId(packet.numPackets);
+ }
+ if (this.config.debug) {
+ if (packet) {
+ // eslint-disable-next-line no-console
+ console.log(
+ ` raw: ${packet.buffer
+ .slice(packet.offset, packet.offset + packet.length())
+ .toString('hex')}`
+ );
+ // eslint-disable-next-line no-console
+ console.trace();
+ const commandName = this._command
+ ? this._command._commandName
+ : '(no command)';
+ const stateName = this._command
+ ? this._command.stateName()
+ : '(no command)';
+ // eslint-disable-next-line no-console
+ console.log(
+ `${this._internalId} ${this.connectionId} ==> ${commandName}#${stateName}(${[packet.sequenceId, packet.type(), packet.length()].join(',')})`
+ );
+ }
+ }
+ if (!this._command) {
+ const marker = packet.peekByte();
+ // If it's an Err Packet, we should use it.
+ if (marker === 0xff) {
+ const error = Packets.Error.fromPacket(packet);
+ this.protocolError(error.message, error.code);
+ } else {
+ // Otherwise, it means it's some other unexpected packet.
+ this.protocolError(
+ 'Unexpected packet while no commands in the queue',
+ 'PROTOCOL_UNEXPECTED_PACKET'
+ );
+ }
+ this.close();
+ return;
+ }
+ const done = this._command.execute(packet, this);
+ if (done) {
+ this._command = this._commands.shift();
+ if (this._command) {
+ this.sequenceId = 0;
+ this.compressedSequenceId = 0;
+ this.handlePacket();
+ }
+ }
+ }
+
+ addCommand(cmd) {
+ // this.compressedSequenceId = 0;
+ // this.sequenceId = 0;
+ if (this.config.debug) {
+ const commandName = cmd.constructor.name;
+ // eslint-disable-next-line no-console
+ console.log(`Add command: ${commandName}`);
+ cmd._commandName = commandName;
+ }
+ if (!this._command) {
+ this._command = cmd;
+ this.handlePacket();
+ } else {
+ this._commands.push(cmd);
+ }
+ return cmd;
+ }
+
+ format(sql, values) {
+ if (typeof this.config.queryFormat === 'function') {
+ return this.config.queryFormat.call(
+ this,
+ sql,
+ values,
+ this.config.timezone
+ );
+ }
+ const opts = {
+ sql: sql,
+ values: values
+ };
+ this._resolveNamedPlaceholders(opts);
+ return SqlString.format(
+ opts.sql,
+ opts.values,
+ this.config.stringifyObjects,
+ this.config.timezone
+ );
+ }
+
+ escape(value) {
+ return SqlString.escape(value, false, this.config.timezone);
+ }
+
+ escapeId(value) {
+ return SqlString.escapeId(value, false);
+ }
+
+ raw(sql) {
+ return SqlString.raw(sql);
+ }
+
+ _resolveNamedPlaceholders(options) {
+ let unnamed;
+ if (this.config.namedPlaceholders || options.namedPlaceholders) {
+ if (Array.isArray(options.values)) {
+ // if an array is provided as the values, assume the conversion is not necessary.
+ // this allows the usage of unnamed placeholders even if the namedPlaceholders flag is enabled.
+ return
+ }
+ if (convertNamedPlaceholders === null) {
+ convertNamedPlaceholders = require('named-placeholders')();
+ }
+ unnamed = convertNamedPlaceholders(options.sql, options.values);
+ options.sql = unnamed[0];
+ options.values = unnamed[1];
+ }
+ }
+
+ query(sql, values, cb) {
+ let cmdQuery;
+ if (sql.constructor === Commands.Query) {
+ cmdQuery = sql;
+ } else {
+ cmdQuery = Connection.createQuery(sql, values, cb, this.config);
+ }
+ this._resolveNamedPlaceholders(cmdQuery);
+ const rawSql = this.format(cmdQuery.sql, cmdQuery.values !== undefined ? cmdQuery.values : []);
+ cmdQuery.sql = rawSql;
+ return this.addCommand(cmdQuery);
+ }
+
+ pause() {
+ this._paused = true;
+ this.stream.pause();
+ }
+
+ resume() {
+ let packet;
+ this._paused = false;
+ while ((packet = this._paused_packets.shift())) {
+ this.handlePacket(packet);
+ // don't resume if packet hander paused connection
+ if (this._paused) {
+ return;
+ }
+ }
+ this.stream.resume();
+ }
+
+ // TODO: named placeholders support
+ prepare(options, cb) {
+ if (typeof options === 'string') {
+ options = { sql: options };
+ }
+ return this.addCommand(new Commands.Prepare(options, cb));
+ }
+
+ unprepare(sql) {
+ let options = {};
+ if (typeof sql === 'object') {
+ options = sql;
+ } else {
+ options.sql = sql;
+ }
+ const key = Connection.statementKey(options);
+ const stmt = this._statements.get(key);
+ if (stmt) {
+ this._statements.del(key);
+ stmt.close();
+ }
+ return stmt;
+ }
+
+ execute(sql, values, cb) {
+ let options = {};
+ if (typeof sql === 'object') {
+ // execute(options, cb)
+ options = sql;
+ if (typeof values === 'function') {
+ cb = values;
+ } else {
+ options.values = options.values || values;
+ }
+ } else if (typeof values === 'function') {
+ // execute(sql, cb)
+ cb = values;
+ options.sql = sql;
+ options.values = undefined;
+ } else {
+ // execute(sql, values, cb)
+ options.sql = sql;
+ options.values = values;
+ }
+ this._resolveNamedPlaceholders(options);
+ // check for values containing undefined
+ if (options.values) {
+ //If namedPlaceholder is not enabled and object is passed as bind parameters
+ if (!Array.isArray(options.values)) {
+ throw new TypeError(
+ 'Bind parameters must be array if namedPlaceholders parameter is not enabled'
+ );
+ }
+ options.values.forEach(val => {
+ //If namedPlaceholder is not enabled and object is passed as bind parameters
+ if (!Array.isArray(options.values)) {
+ throw new TypeError(
+ 'Bind parameters must be array if namedPlaceholders parameter is not enabled'
+ );
+ }
+ if (val === undefined) {
+ throw new TypeError(
+ 'Bind parameters must not contain undefined. To pass SQL NULL specify JS null'
+ );
+ }
+ if (typeof val === 'function') {
+ throw new TypeError(
+ 'Bind parameters must not contain function(s). To pass the body of a function as a string call .toString() first'
+ );
+ }
+ });
+ }
+ const executeCommand = new Commands.Execute(options, cb);
+ const prepareCommand = new Commands.Prepare(options, (err, stmt) => {
+ if (err) {
+ // skip execute command if prepare failed, we have main
+ // combined callback here
+ executeCommand.start = function() {
+ return null;
+ };
+ if (cb) {
+ cb(err);
+ } else {
+ executeCommand.emit('error', err);
+ }
+ executeCommand.emit('end');
+ return;
+ }
+ executeCommand.statement = stmt;
+ });
+ this.addCommand(prepareCommand);
+ this.addCommand(executeCommand);
+ return executeCommand;
+ }
+
+ changeUser(options, callback) {
+ if (!callback && typeof options === 'function') {
+ callback = options;
+ options = {};
+ }
+ const charsetNumber = options.charset
+ ? ConnectionConfig.getCharsetNumber(options.charset)
+ : this.config.charsetNumber;
+ return this.addCommand(
+ new Commands.ChangeUser(
+ {
+ user: options.user || this.config.user,
+ password: options.password || this.config.password,
+ passwordSha1: options.passwordSha1 || this.config.passwordSha1,
+ database: options.database || this.config.database,
+ timeout: options.timeout,
+ charsetNumber: charsetNumber,
+ currentConfig: this.config
+ },
+ err => {
+ if (err) {
+ err.fatal = true;
+ }
+ if (callback) {
+ callback(err);
+ }
+ }
+ )
+ );
+ }
+
+ // transaction helpers
+ beginTransaction(cb) {
+ return this.query('START TRANSACTION', cb);
+ }
+
+ commit(cb) {
+ return this.query('COMMIT', cb);
+ }
+
+ rollback(cb) {
+ return this.query('ROLLBACK', cb);
+ }
+
+ ping(cb) {
+ return this.addCommand(new Commands.Ping(cb));
+ }
+
+ _registerSlave(opts, cb) {
+ return this.addCommand(new Commands.RegisterSlave(opts, cb));
+ }
+
+ _binlogDump(opts, cb) {
+ return this.addCommand(new Commands.BinlogDump(opts, cb));
+ }
+
+ // currently just alias to close
+ destroy() {
+ this.close();
+ }
+
+ close() {
+ if (this.connectTimeout) {
+ Timers.clearTimeout(this.connectTimeout);
+ this.connectTimeout = null;
+ }
+ this._closing = true;
+ this.stream.end();
+ this.addCommand = this._addCommandClosedState;
+ }
+
+ createBinlogStream(opts) {
+ // TODO: create proper stream class
+ // TODO: use through2
+ let test = 1;
+ const stream = new Readable({ objectMode: true });
+ stream._read = function() {
+ return {
+ data: test++
+ };
+ };
+ this._registerSlave(opts, () => {
+ const dumpCmd = this._binlogDump(opts);
+ dumpCmd.on('event', ev => {
+ stream.push(ev);
+ });
+ dumpCmd.on('eof', () => {
+ stream.push(null);
+ // if non-blocking, then close stream to prevent errors
+ if (opts.flags && opts.flags & 0x01) {
+ this.close();
+ }
+ });
+ // TODO: pipe errors as well
+ });
+ return stream;
+ }
+
+ connect(cb) {
+ if (!cb) {
+ return;
+ }
+ if (this._fatalError || this._protocolError) {
+ return cb(this._fatalError || this._protocolError);
+ }
+ if (this._handshakePacket) {
+ return cb(null, this);
+ }
+ let connectCalled = 0;
+ function callbackOnce(isErrorHandler) {
+ return function(param) {
+ if (!connectCalled) {
+ if (isErrorHandler) {
+ cb(param);
+ } else {
+ cb(null, param);
+ }
+ }
+ connectCalled = 1;
+ };
+ }
+ this.once('error', callbackOnce(true));
+ this.once('connect', callbackOnce(false));
+ }
+
+ // ===================================
+ // outgoing server connection methods
+ // ===================================
+ writeColumns(columns) {
+ this.writePacket(Packets.ResultSetHeader.toPacket(columns.length));
+ columns.forEach(column => {
+ this.writePacket(
+ Packets.ColumnDefinition.toPacket(column, this.serverConfig.encoding)
+ );
+ });
+ this.writeEof();
+ }
+
+ // row is array of columns, not hash
+ writeTextRow(column) {
+ this.writePacket(
+ Packets.TextRow.toPacket(column, this.serverConfig.encoding)
+ );
+ }
+
+ writeTextResult(rows, columns) {
+ this.writeColumns(columns);
+ rows.forEach(row => {
+ const arrayRow = new Array(columns.length);
+ columns.forEach(column => {
+ arrayRow.push(row[column.name]);
+ });
+ this.writeTextRow(arrayRow);
+ });
+ this.writeEof();
+ }
+
+ writeEof(warnings, statusFlags) {
+ this.writePacket(Packets.EOF.toPacket(warnings, statusFlags));
+ }
+
+ writeOk(args) {
+ if (!args) {
+ args = { affectedRows: 0 };
+ }
+ this.writePacket(Packets.OK.toPacket(args, this.serverConfig.encoding));
+ }
+
+ writeError(args) {
+ // if we want to send error before initial hello was sent, use default encoding
+ const encoding = this.serverConfig ? this.serverConfig.encoding : 'cesu8';
+ this.writePacket(Packets.Error.toPacket(args, encoding));
+ }
+
+ serverHandshake(args) {
+ this.serverConfig = args;
+ this.serverConfig.encoding =
+ CharsetToEncoding[this.serverConfig.characterSet];
+ return this.addCommand(new Commands.ServerHandshake(args));
+ }
+
+ // ===============================================================
+ end(callback) {
+ if (this.config.isServer) {
+ this._closing = true;
+ const quitCmd = new EventEmitter();
+ setImmediate(() => {
+ this.stream.end();
+ quitCmd.emit('end');
+ });
+ return quitCmd;
+ }
+ // trigger error if more commands enqueued after end command
+ const quitCmd = this.addCommand(new Commands.Quit(callback));
+ this.addCommand = this._addCommandClosedState;
+ return quitCmd;
+ }
+
+ static createQuery(sql, values, cb, config) {
+ let options = {
+ rowsAsArray: config.rowsAsArray
+ };
+ if (typeof sql === 'object') {
+ // query(options, cb)
+ options = sql;
+ if (typeof values === 'function') {
+ cb = values;
+ } else if (values !== undefined) {
+ options.values = values;
+ }
+ } else if (typeof values === 'function') {
+ // query(sql, cb)
+ cb = values;
+ options.sql = sql;
+ options.values = undefined;
+ } else {
+ // query(sql, values, cb)
+ options.sql = sql;
+ options.values = values;
+ }
+ return new Commands.Query(options, cb);
+ }
+
+ static statementKey(options) {
+ return (
+ `${typeof options.nestTables}/${options.nestTables}/${options.rowsAsArray}${options.sql}`
+ );
+ }
+}
+
+if (Tls.TLSSocket) {
+ // not supported
+} else {
+ Connection.prototype.startTLS = function _startTLS(onSecure) {
+ if (this.config.debug) {
+ // eslint-disable-next-line no-console
+ console.log('Upgrading connection to TLS');
+ }
+ const crypto = require('crypto');
+ const config = this.config;
+ const stream = this.stream;
+ const rejectUnauthorized = this.config.ssl.rejectUnauthorized;
+ const credentials = crypto.createCredentials({
+ key: config.ssl.key,
+ cert: config.ssl.cert,
+ passphrase: config.ssl.passphrase,
+ ca: config.ssl.ca,
+ ciphers: config.ssl.ciphers
+ });
+ const securePair = Tls.createSecurePair(
+ credentials,
+ false,
+ true,
+ rejectUnauthorized
+ );
+
+ if (stream.ondata) {
+ stream.ondata = null;
+ }
+ stream.removeAllListeners('data');
+ stream.pipe(securePair.encrypted);
+ securePair.encrypted.pipe(stream);
+ securePair.cleartext.on('data', data => {
+ this.packetParser.execute(data);
+ });
+ this.write = function(buffer) {
+ securePair.cleartext.write(buffer);
+ };
+ securePair.on('secure', () => {
+ onSecure(rejectUnauthorized ? securePair.ssl.verifyError() : null);
+ });
+ };
+}
+
+module.exports = Connection;