aboutsummaryrefslogtreecommitdiff
path: root/node_modules/asynckit/lib/streamify.js
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/asynckit/lib/streamify.js')
-rw-r--r--node_modules/asynckit/lib/streamify.js141
1 files changed, 141 insertions, 0 deletions
diff --git a/node_modules/asynckit/lib/streamify.js b/node_modules/asynckit/lib/streamify.js
new file mode 100644
index 0000000..f56a1c9
--- /dev/null
+++ b/node_modules/asynckit/lib/streamify.js
@@ -0,0 +1,141 @@
+var async = require('./async.js');
+
+// API
+module.exports = {
+ iterator: wrapIterator,
+ callback: wrapCallback
+};
+
+/**
+ * Wraps iterators with long signature
+ *
+ * @this ReadableAsyncKit#
+ * @param {function} iterator - function to wrap
+ * @returns {function} - wrapped function
+ */
+function wrapIterator(iterator)
+{
+ var stream = this;
+
+ return function(item, key, cb)
+ {
+ var aborter
+ , wrappedCb = async(wrapIteratorCallback.call(stream, cb, key))
+ ;
+
+ stream.jobs[key] = wrappedCb;
+
+ // it's either shortcut (item, cb)
+ if (iterator.length == 2)
+ {
+ aborter = iterator(item, wrappedCb);
+ }
+ // or long format (item, key, cb)
+ else
+ {
+ aborter = iterator(item, key, wrappedCb);
+ }
+
+ return aborter;
+ };
+}
+
+/**
+ * Wraps provided callback function
+ * allowing to execute snitch function before
+ * real callback
+ *
+ * @this ReadableAsyncKit#
+ * @param {function} callback - function to wrap
+ * @returns {function} - wrapped function
+ */
+function wrapCallback(callback)
+{
+ var stream = this;
+
+ var wrapped = function(error, result)
+ {
+ return finisher.call(stream, error, result, callback);
+ };
+
+ return wrapped;
+}
+
+/**
+ * Wraps provided iterator callback function
+ * makes sure snitch only called once,
+ * but passes secondary calls to the original callback
+ *
+ * @this ReadableAsyncKit#
+ * @param {function} callback - callback to wrap
+ * @param {number|string} key - iteration key
+ * @returns {function} wrapped callback
+ */
+function wrapIteratorCallback(callback, key)
+{
+ var stream = this;
+
+ return function(error, output)
+ {
+ // don't repeat yourself
+ if (!(key in stream.jobs))
+ {
+ callback(error, output);
+ return;
+ }
+
+ // clean up jobs
+ delete stream.jobs[key];
+
+ return streamer.call(stream, error, {key: key, value: output}, callback);
+ };
+}
+
+/**
+ * Stream wrapper for iterator callback
+ *
+ * @this ReadableAsyncKit#
+ * @param {mixed} error - error response
+ * @param {mixed} output - iterator output
+ * @param {function} callback - callback that expects iterator results
+ */
+function streamer(error, output, callback)
+{
+ if (error && !this.error)
+ {
+ this.error = error;
+ this.pause();
+ this.emit('error', error);
+ // send back value only, as expected
+ callback(error, output && output.value);
+ return;
+ }
+
+ // stream stuff
+ this.push(output);
+
+ // back to original track
+ // send back value only, as expected
+ callback(error, output && output.value);
+}
+
+/**
+ * Stream wrapper for finishing callback
+ *
+ * @this ReadableAsyncKit#
+ * @param {mixed} error - error response
+ * @param {mixed} output - iterator output
+ * @param {function} callback - callback that expects final results
+ */
+function finisher(error, output, callback)
+{
+ // signal end of the stream
+ // only for successfully finished streams
+ if (!error)
+ {
+ this.push(null);
+ }
+
+ // back to original track
+ callback(error, output);
+}