aboutsummaryrefslogtreecommitdiff
path: root/node_modules/mysql2/lib/commands/query.js
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/mysql2/lib/commands/query.js')
-rw-r--r--node_modules/mysql2/lib/commands/query.js321
1 files changed, 321 insertions, 0 deletions
diff --git a/node_modules/mysql2/lib/commands/query.js b/node_modules/mysql2/lib/commands/query.js
new file mode 100644
index 0000000..e51da3c
--- /dev/null
+++ b/node_modules/mysql2/lib/commands/query.js
@@ -0,0 +1,321 @@
+'use strict';
+
+const process = require('process');
+const Timers = require('timers');
+
+const Readable = require('stream').Readable;
+
+const Command = require('./command.js');
+const Packets = require('../packets/index.js');
+const getTextParser = require('../parsers/text_parser.js');
+const ServerStatus = require('../constants/server_status.js');
+
+const EmptyPacket = new Packets.Packet(0, Buffer.allocUnsafe(4), 0, 4);
+
+// http://dev.mysql.com/doc/internals/en/com-query.html
+class Query extends Command {
+ constructor(options, callback) {
+ super();
+ this.sql = options.sql;
+ this.values = options.values;
+ this._queryOptions = options;
+ this.namedPlaceholders = options.namedPlaceholders || false;
+ this.onResult = callback;
+ this.timeout = options.timeout;
+ this.queryTimeout = null;
+ this._fieldCount = 0;
+ this._rowParser = null;
+ this._fields = [];
+ this._rows = [];
+ this._receivedFieldsCount = 0;
+ this._resultIndex = 0;
+ this._localStream = null;
+ this._unpipeStream = function() {};
+ this._streamFactory = options.infileStreamFactory;
+ this._connection = null;
+ }
+
+ then() {
+ const err =
+ "You have tried to call .then(), .catch(), or invoked await on the result of query that is not a promise, which is a programming error. Try calling con.promise().query(), or require('mysql2/promise') instead of 'mysql2' for a promise-compatible version of the query interface. To learn how to use async/await or Promises check out documentation at https://www.npmjs.com/package/mysql2#using-promise-wrapper, or the mysql2 documentation at https://github.com/sidorares/node-mysql2/tree/master/documentation/Promise-Wrapper.md";
+ // eslint-disable-next-line
+ console.log(err);
+ throw new Error(err);
+ }
+
+ /* eslint no-unused-vars: ["error", { "argsIgnorePattern": "^_" }] */
+ start(_packet, connection) {
+ if (connection.config.debug) {
+ // eslint-disable-next-line
+ console.log(' Sending query command: %s', this.sql);
+ }
+ this._connection = connection;
+ this.options = Object.assign({}, connection.config, this._queryOptions);
+ this._setTimeout();
+
+ const cmdPacket = new Packets.Query(
+ this.sql,
+ connection.config.charsetNumber
+ );
+ connection.writePacket(cmdPacket.toPacket(1));
+ return Query.prototype.resultsetHeader;
+ }
+
+ done() {
+ this._unpipeStream();
+ // if all ready timeout, return null directly
+ if (this.timeout && !this.queryTimeout) {
+ return null;
+ }
+ // else clear timer
+ if (this.queryTimeout) {
+ Timers.clearTimeout(this.queryTimeout);
+ this.queryTimeout = null;
+ }
+ if (this.onResult) {
+ let rows, fields;
+ if (this._resultIndex === 0) {
+ rows = this._rows[0];
+ fields = this._fields[0];
+ } else {
+ rows = this._rows;
+ fields = this._fields;
+ }
+ if (fields) {
+ process.nextTick(() => {
+ this.onResult(null, rows, fields);
+ });
+ } else {
+ process.nextTick(() => {
+ this.onResult(null, rows);
+ });
+ }
+ }
+ return null;
+ }
+
+ doneInsert(rs) {
+ if (this._localStreamError) {
+ if (this.onResult) {
+ this.onResult(this._localStreamError, rs);
+ } else {
+ this.emit('error', this._localStreamError);
+ }
+ return null;
+ }
+ this._rows.push(rs);
+ this._fields.push(void 0);
+ this.emit('fields', void 0);
+ this.emit('result', rs);
+ if (rs.serverStatus & ServerStatus.SERVER_MORE_RESULTS_EXISTS) {
+ this._resultIndex++;
+ return this.resultsetHeader;
+ }
+ return this.done();
+ }
+
+ resultsetHeader(packet, connection) {
+ const rs = new Packets.ResultSetHeader(packet, connection);
+ this._fieldCount = rs.fieldCount;
+ if (connection.config.debug) {
+ // eslint-disable-next-line
+ console.log(
+ ` Resultset header received, expecting ${rs.fieldCount} column definition packets`
+ );
+ }
+ if (this._fieldCount === 0) {
+ return this.doneInsert(rs);
+ }
+ if (this._fieldCount === null) {
+ return this._streamLocalInfile(connection, rs.infileName);
+ }
+ this._receivedFieldsCount = 0;
+ this._rows.push([]);
+ this._fields.push([]);
+ return this.readField;
+ }
+
+ _streamLocalInfile(connection, path) {
+ if (this._streamFactory) {
+ this._localStream = this._streamFactory(path);
+ } else {
+ this._localStreamError = new Error(
+ `As a result of LOCAL INFILE command server wants to read ${path} file, but as of v2.0 you must provide streamFactory option returning ReadStream.`
+ );
+ connection.writePacket(EmptyPacket);
+ return this.infileOk;
+ }
+
+ const onConnectionError = () => {
+ this._unpipeStream();
+ };
+ const onDrain = () => {
+ this._localStream.resume();
+ };
+ const onPause = () => {
+ this._localStream.pause();
+ };
+ const onData = function(data) {
+ const dataWithHeader = Buffer.allocUnsafe(data.length + 4);
+ data.copy(dataWithHeader, 4);
+ connection.writePacket(
+ new Packets.Packet(0, dataWithHeader, 0, dataWithHeader.length)
+ );
+ };
+ const onEnd = () => {
+ connection.removeListener('error', onConnectionError);
+ connection.writePacket(EmptyPacket);
+ };
+ const onError = err => {
+ this._localStreamError = err;
+ connection.removeListener('error', onConnectionError);
+ connection.writePacket(EmptyPacket);
+ };
+ this._unpipeStream = () => {
+ connection.stream.removeListener('pause', onPause);
+ connection.stream.removeListener('drain', onDrain);
+ this._localStream.removeListener('data', onData);
+ this._localStream.removeListener('end', onEnd);
+ this._localStream.removeListener('error', onError);
+ };
+ connection.stream.on('pause', onPause);
+ connection.stream.on('drain', onDrain);
+ this._localStream.on('data', onData);
+ this._localStream.on('end', onEnd);
+ this._localStream.on('error', onError);
+ connection.once('error', onConnectionError);
+ return this.infileOk;
+ }
+
+ readField(packet, connection) {
+ this._receivedFieldsCount++;
+ // Often there is much more data in the column definition than in the row itself
+ // If you set manually _fields[0] to array of ColumnDefinition's (from previous call)
+ // you can 'cache' result of parsing. Field packets still received, but ignored in that case
+ // this is the reason _receivedFieldsCount exist (otherwise we could just use current length of fields array)
+ if (this._fields[this._resultIndex].length !== this._fieldCount) {
+ const field = new Packets.ColumnDefinition(
+ packet,
+ connection.clientEncoding
+ );
+ this._fields[this._resultIndex].push(field);
+ if (connection.config.debug) {
+ /* eslint-disable no-console */
+ console.log(' Column definition:');
+ console.log(` name: ${field.name}`);
+ console.log(` type: ${field.columnType}`);
+ console.log(` flags: ${field.flags}`);
+ /* eslint-enable no-console */
+ }
+ }
+ // last field received
+ if (this._receivedFieldsCount === this._fieldCount) {
+ const fields = this._fields[this._resultIndex];
+ this.emit('fields', fields);
+ this._rowParser = new (getTextParser(fields, this.options, connection.config))(fields);
+ return Query.prototype.fieldsEOF;
+ }
+ return Query.prototype.readField;
+ }
+
+ fieldsEOF(packet, connection) {
+ // check EOF
+ if (!packet.isEOF()) {
+ return connection.protocolError('Expected EOF packet');
+ }
+ return this.row;
+ }
+
+ /* eslint no-unused-vars: ["error", { "argsIgnorePattern": "^_" }] */
+ row(packet, _connection) {
+ if (packet.isEOF()) {
+ const status = packet.eofStatusFlags();
+ const moreResults = status & ServerStatus.SERVER_MORE_RESULTS_EXISTS;
+ if (moreResults) {
+ this._resultIndex++;
+ return Query.prototype.resultsetHeader;
+ }
+ return this.done();
+ }
+ let row;
+ try {
+ row = this._rowParser.next(
+ packet,
+ this._fields[this._resultIndex],
+ this.options
+ );
+ } catch (err) {
+ this._localStreamError = err;
+ return this.doneInsert(null);
+ }
+ if (this.onResult) {
+ this._rows[this._resultIndex].push(row);
+ } else {
+ this.emit('result', row);
+ }
+ return Query.prototype.row;
+ }
+
+ infileOk(packet, connection) {
+ const rs = new Packets.ResultSetHeader(packet, connection);
+ return this.doneInsert(rs);
+ }
+
+ stream(options) {
+ options = options || {};
+ options.objectMode = true;
+ const stream = new Readable(options);
+ stream._read = () => {
+ this._connection && this._connection.resume();
+ };
+ this.on('result', row => {
+ if (!stream.push(row)) {
+ this._connection.pause();
+ }
+ stream.emit('result', row); // replicate old emitter
+ });
+ this.on('error', err => {
+ stream.emit('error', err); // Pass on any errors
+ });
+ this.on('end', () => {
+ stream.push(null); // pushing null, indicating EOF
+ stream.emit('close'); // notify readers that query has completed
+ });
+ this.on('fields', fields => {
+ stream.emit('fields', fields); // replicate old emitter
+ });
+ return stream;
+ }
+
+ _setTimeout() {
+ if (this.timeout) {
+ const timeoutHandler = this._handleTimeoutError.bind(this);
+ this.queryTimeout = Timers.setTimeout(
+ timeoutHandler,
+ this.timeout
+ );
+ }
+ }
+
+ _handleTimeoutError() {
+ if (this.queryTimeout) {
+ Timers.clearTimeout(this.queryTimeout);
+ this.queryTimeout = null;
+ }
+
+ const err = new Error('Query inactivity timeout');
+ err.errorno = 'PROTOCOL_SEQUENCE_TIMEOUT';
+ err.code = 'PROTOCOL_SEQUENCE_TIMEOUT';
+ err.syscall = 'query';
+
+ if (this.onResult) {
+ this.onResult(err);
+ } else {
+ this.emit('error', err);
+ }
+ }
+}
+
+Query.prototype.catch = Query.prototype.then;
+
+module.exports = Query;