aboutsummaryrefslogtreecommitdiff
path: root/node_modules/mysql2/lib/compressed_protocol.js
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/mysql2/lib/compressed_protocol.js')
-rw-r--r--node_modules/mysql2/lib/compressed_protocol.js127
1 files changed, 127 insertions, 0 deletions
diff --git a/node_modules/mysql2/lib/compressed_protocol.js b/node_modules/mysql2/lib/compressed_protocol.js
new file mode 100644
index 0000000..67a7c39
--- /dev/null
+++ b/node_modules/mysql2/lib/compressed_protocol.js
@@ -0,0 +1,127 @@
+'use strict';
+
+// connection mixins
+// implementation of http://dev.mysql.com/doc/internals/en/compression.html
+
+const zlib = require('zlib');
+const PacketParser = require('./packet_parser.js');
+
+function handleCompressedPacket(packet) {
+ // eslint-disable-next-line consistent-this, no-invalid-this
+ const connection = this;
+ const deflatedLength = packet.readInt24();
+ const body = packet.readBuffer();
+
+ if (deflatedLength !== 0) {
+ connection.inflateQueue.push(task => {
+ zlib.inflate(body, (err, data) => {
+ if (err) {
+ connection._handleNetworkError(err);
+ return;
+ }
+ connection._bumpCompressedSequenceId(packet.numPackets);
+ connection._inflatedPacketsParser.execute(data);
+ task.done();
+ });
+ });
+ } else {
+ connection.inflateQueue.push(task => {
+ connection._bumpCompressedSequenceId(packet.numPackets);
+ connection._inflatedPacketsParser.execute(body);
+ task.done();
+ });
+ }
+}
+
+function writeCompressed(buffer) {
+ // http://dev.mysql.com/doc/internals/en/example-several-mysql-packets.html
+ // note: sending a MySQL Packet of the size 2^24−5 to 2^24−1 via compression
+ // leads to at least one extra compressed packet.
+ // (this is because "length of the packet before compression" need to fit
+ // into 3 byte unsigned int. "length of the packet before compression" includes
+ // 4 byte packet header, hence 2^24−5)
+ const MAX_COMPRESSED_LENGTH = 16777210;
+ let start;
+ if (buffer.length > MAX_COMPRESSED_LENGTH) {
+ for (start = 0; start < buffer.length; start += MAX_COMPRESSED_LENGTH) {
+ writeCompressed.call(
+ // eslint-disable-next-line no-invalid-this
+ this,
+ buffer.slice(start, start + MAX_COMPRESSED_LENGTH)
+ );
+ }
+ return;
+ }
+
+ // eslint-disable-next-line no-invalid-this, consistent-this
+ const connection = this;
+
+ let packetLen = buffer.length;
+ const compressHeader = Buffer.allocUnsafe(7);
+
+ // seqqueue is used here because zlib async execution is routed via thread pool
+ // internally and when we have multiple compressed packets arriving we need
+ // to assemble uncompressed result sequentially
+ (function(seqId) {
+ connection.deflateQueue.push(task => {
+ zlib.deflate(buffer, (err, compressed) => {
+ if (err) {
+ connection._handleFatalError(err);
+ return;
+ }
+ let compressedLength = compressed.length;
+
+ if (compressedLength < packetLen) {
+ compressHeader.writeUInt8(compressedLength & 0xff, 0);
+ compressHeader.writeUInt16LE(compressedLength >> 8, 1);
+ compressHeader.writeUInt8(seqId, 3);
+ compressHeader.writeUInt8(packetLen & 0xff, 4);
+ compressHeader.writeUInt16LE(packetLen >> 8, 5);
+ connection.writeUncompressed(compressHeader);
+ connection.writeUncompressed(compressed);
+ } else {
+ // http://dev.mysql.com/doc/internals/en/uncompressed-payload.html
+ // To send an uncompressed payload:
+ // - set length of payload before compression to 0
+ // - the compressed payload contains the uncompressed payload instead.
+ compressedLength = packetLen;
+ packetLen = 0;
+ compressHeader.writeUInt8(compressedLength & 0xff, 0);
+ compressHeader.writeUInt16LE(compressedLength >> 8, 1);
+ compressHeader.writeUInt8(seqId, 3);
+ compressHeader.writeUInt8(packetLen & 0xff, 4);
+ compressHeader.writeUInt16LE(packetLen >> 8, 5);
+ connection.writeUncompressed(compressHeader);
+ connection.writeUncompressed(buffer);
+ }
+ task.done();
+ });
+ });
+ })(connection.compressedSequenceId);
+ connection._bumpCompressedSequenceId(1);
+}
+
+function enableCompression(connection) {
+ connection._lastWrittenPacketId = 0;
+ connection._lastReceivedPacketId = 0;
+
+ connection._handleCompressedPacket = handleCompressedPacket;
+ connection._inflatedPacketsParser = new PacketParser(p => {
+ connection.handlePacket(p);
+ }, 4);
+ connection._inflatedPacketsParser._lastPacket = 0;
+ connection.packetParser = new PacketParser(packet => {
+ connection._handleCompressedPacket(packet);
+ }, 7);
+
+ connection.writeUncompressed = connection.write;
+ connection.write = writeCompressed;
+
+ const seqqueue = require('seq-queue');
+ connection.inflateQueue = seqqueue.createQueue();
+ connection.deflateQueue = seqqueue.createQueue();
+}
+
+module.exports = {
+ enableCompression: enableCompression
+};