var events = require('events'); var util = require('util'); function Stream(){ events.EventEmitter.call(this); } util.inherits(Stream, events.EventEmitter); exports.Stream = Stream; Stream.prototype.pipe = function (dest){ var source = this; var options, filter; if (typeof arguments[1] == 'object') { options = arguments[1]; filter = arguments[2]; } else { filter = arguments[1]; } function ondata(chunk){ if (dest.writable) { if (false === _AN_Call_write('write', dest, chunk)) source.pause(); } } if (!filter) { source.on('data', ondata); } else { var wait = false ; var waitQueue = [] ; function done(){ wait = false ; if (dest.writable && _AN_Read_length('length', waitQueue)) { wait = true ; filter(waitQueue.shift(), ondata, done); } } source.on('data', function (d){ if (wait) { waitQueue.push(d); source.pause(); } else { wait = true ; filter(d, ondata, done); } } ); } function ondrain(){ if (source.readable) source.resume(); } dest.on('drain', ondrain); if (!options || options.end !== false ) { function onend(){ dest.end(); } source.on('end', onend); } dest.on('close', function (){ source.removeListener('data', ondata); dest.removeListener('drain', ondrain); source.removeListener('end', onend); } ); if (!source.pause) { source.pause = function (){ source.emit('pause'); } ; } if (!source.resume) { source.resume = function (){ source.emit('resume'); } ; } dest.on('pause', function (){ source.pause(); } ); dest.on('resume', function (){ if (source.readable) source.resume(); } ); } ;