var util = require('util'); var events = require('events'); var stream = require('stream'); var timers = require('timers'); var kMinPoolSpace = 128; var kPoolSize = 40 * 1024; var debug; if (process.env.NODE_DEBUG && /net/.test(process.env.NODE_DEBUG)) { debug = function (x){ util.error.apply(this, arguments); } ; } else { debug = function (){ } ; } var binding = process.binding('net'); var FreeList = require('freelist').FreeList; var IOWatcher = process.binding('io_watcher').IOWatcher; var constants = process.binding('constants'); var assert = require('assert').ok; var socket = binding.socket; var bind = binding.bind; var connect = binding.connect; var listen = binding.listen; var accept = binding.accept; var close = binding.close; var shutdown = binding.shutdown; var read = binding.read; var write = binding.write; var toRead = binding.toRead; var setNoDelay = binding.setNoDelay; var setKeepAlive = binding.setKeepAlive; var socketError = binding.socketError; var getsockname = binding.getsockname; var errnoException = binding.errnoException; var sendMsg = binding.sendMsg; var recvMsg = binding.recvMsg; var EINPROGRESS = constants.EINPROGRESS || constants.WSAEINPROGRESS; var ENOENT = constants.ENOENT; var EMFILE = constants.EMFILE; var END_OF_FILE = 42; var ioWatchers = new FreeList('iowatcher', 100, function (){ return new IOWatcher(); } ); exports.isIP = binding.isIP; exports.isIPv4 = function (input){ if (binding.isIP(input) === 4) { return true ; } return false ; } ; exports.isIPv6 = function (input){ if (binding.isIP(input) === 6) { return true ; } return false ; } ; var pool = null ; function allocNewPool(){ pool = new Buffer(kPoolSize); pool.used = 0; } var emptyBuffer = null ; function allocEmptyBuffer(){ emptyBuffer = new Buffer(1); emptyBuffer.sent = 0; emptyBuffer.length = 0; } function setImplmentationMethods(self){ function noData(buf, off, len){ return !buf || (off != undefined && off >= _AN_Read_length('length', buf)) || (len == 0); } ; if (self.type == 'unix') { self._writeImpl = function (buf, off, len, fd, flags){ if (fd && noData(buf, off, len)) { throw new Error('File descriptors can only be written with data') } return sendMsg(self.fd, buf, off, len, fd, flags); } ; self._readImpl = function (buf, off, len){ var bytesRead = recvMsg(self.fd, buf, off, len); if (recvMsg.fd !== null ) { var fd = recvMsg.fd; process.nextTick(function (){ self.emit('fd', fd); } ); } return bytesRead; } ; } else { self._writeImpl = function (buf, off, len, fd, flags){ return write(self.fd, buf, off, len); } ; self._readImpl = function (buf, off, len){ return read(self.fd, buf, off, len); } ; } self._shutdownImpl = function (){ shutdown(self.fd, 'write'); } ; } function onReadable(readable, writable){ assert(this.socket); var socket = this.socket; socket._onReadable(); } function onWritable(readable, writable){ assert(this.socket); var socket = this.socket; if (socket._connecting) { assert(socket.writable); socket._onConnect(); } else { socket._onWritable(); } } function initSocket(self){ self._readWatcher = ioWatchers.alloc(); self._readWatcher.socket = self; self._readWatcher.callback = onReadable; self.readable = false ; self._writeQueue = [] ; self._writeQueueEncoding = [] ; self._writeQueueFD = [] ; self._writeQueueCallbacks = [] ; self.bufferSize = 0; self._writeWatcher = ioWatchers.alloc(); self._writeWatcher.socket = self; self._writeWatcher.callback = onWritable; self.writable = false ; } function Socket(options){ if (!(this instanceof Socket)) return new Socket(arguments[0], arguments[1]); stream.Stream.call(this); this.bufferSize = 0; this.fd = null ; this.type = null ; this.allowHalfOpen = false ; if (typeof options == 'object') { this.fd = options.fd !== undefined? parseInt(options.fd, 10): null ; this.type = options.type || null ; this.allowHalfOpen = options.allowHalfOpen || false ; } else if (typeof options == 'number') { this.fd = arguments[0]; this.type = arguments[1]; } if (parseInt(this.fd, 10) >= 0) { _AN_Call_open('open', this, this.fd, this.type); } else { setImplmentationMethods(this); } } util.inherits(Socket, stream.Stream); exports.Socket = Socket; exports.Stream = Socket; Socket.prototype._onTimeout = function (){ this.emit('timeout'); } ; Socket.prototype.open = function (fd, type){ initSocket(this); this.fd = fd; this.type = type || null ; this.readable = true ; setImplmentationMethods(this); this._writeWatcher.set(this.fd, false , true ); this.writable = true ; } ; exports.createConnection = function (port, host){ var s = new Socket(); s.connect(port, host); return s; } ; Object.defineProperty(Socket.prototype, 'readyState', { get: function (){ if (this._connecting) { return 'opening'; } else if (this.readable && this.writable) { assert(typeof this.fd == 'number'); return 'open'; } else if (this.readable && !this.writable) { assert(typeof this.fd == 'number'); return 'readOnly'; } else if (!this.readable && this.writable) { assert(typeof this.fd == 'number'); return 'writeOnly'; } else { assert(typeof this.fd != 'number'); return 'closed'; } } } ); Socket.prototype.write = function (data){ var encoding, fd, cb; assert(this.bufferSize >= 0); if (typeof arguments[1] == 'string') { encoding = arguments[1]; if (typeof arguments[2] == 'number') { fd = arguments[2]; cb = arguments[3]; } else { cb = arguments[2]; } } else if (typeof arguments[1] == 'number') { fd = arguments[1]; cb = arguments[2]; } else if (typeof arguments[2] == 'number') { encoding = arguments[1]; fd = arguments[2]; cb = arguments[3]; } else { cb = arguments[1]; } if (this._connecting || (this._writeQueue && _AN_Read_length('length', this._writeQueue))) { if (!this._writeQueue) { this.bufferSize = 0; this._writeQueue = [] ; this._writeQueueEncoding = [] ; this._writeQueueFD = [] ; this._writeQueueCallbacks = [] ; } if (this._writeQueueLast() === END_OF_FILE) { throw new Error('Socket.end() called already; cannot write.') } var last = _AN_Read_length('length', this._writeQueue) - 1; this.bufferSize += _AN_Read_length('length', data); if (typeof data == 'string' && _AN_Read_length('length', this._writeQueue) && typeof this._writeQueue[last] === 'string' && this._writeQueueEncoding[last] === encoding) { this._writeQueue[last] += data; if (cb) { if (!this._writeQueueCallbacks[last]) { this._writeQueueCallbacks[last] = cb; } else { this._writeQueueCallbacks[last] = function (){ this._writeQueueCallbacks[last](); cb(); } ; } } } else { this._writeQueue.push(data); this._writeQueueEncoding.push(encoding); this._writeQueueCallbacks.push(cb); } if (fd != undefined) { this._writeQueueFD.push(fd); } this._onBufferChange(); return false ; } else { return this._writeOut(data, encoding, fd, cb); } } ; Socket.prototype._writeOut = function (data, encoding, fd, cb){ if (!this.writable) { throw new Error('Socket is not writable') } var buffer, off, len; var bytesWritten, charsWritten; var queuedData = false ; if (typeof data != 'string') { buffer = data; off = 0; len = _AN_Read_length('length', data); } else { assert(typeof data == 'string'); if (!pool || _AN_Read_length('length', pool) - pool.used < kMinPoolSpace) { pool = null ; allocNewPool(); } if (!encoding || encoding == 'utf8' || encoding == 'utf-8') { bytesWritten = _AN_Call_write('write', pool, data, 'utf8', pool.used); charsWritten = Buffer._charsWritten; } else { bytesWritten = _AN_Call_write('write', pool, data, encoding, pool.used); charsWritten = bytesWritten; } if (encoding && _AN_Read_length('length', data) > 0) { assert(bytesWritten > 0); } buffer = pool; len = bytesWritten; off = pool.used; pool.used += bytesWritten; debug('wrote ' + bytesWritten + ' bytes to pool'); if (charsWritten != _AN_Read_length('length', data)) { assert(_AN_Read_length('length', data) > charsWritten); this.bufferSize += _AN_Read_length('length', data) - charsWritten; this._writeQueue.unshift(data.slice(charsWritten)); this._writeQueueEncoding.unshift(encoding); this._writeQueueCallbacks.unshift(cb); this._writeWatcher.start(); this._onBufferChange(); queuedData = true ; } } try { bytesWritten = this._writeImpl(buffer, off, len, fd, 0); } catch (e) { this.destroy(e); return false ; } debug('wrote ' + bytesWritten + ' to socket. [fd, off, len] = ' + JSON.stringify([this.fd, off, len] ) + '\n'); timers.active(this); if (bytesWritten == len) { if (buffer === pool) { buffer.used -= len; } if (queuedData) { return false ; } else { if (cb) cb(); return true ; } } this._writeWatcher.start(); var leftOver = buffer.slice(off + bytesWritten, off + len); leftOver.used = _AN_Read_length('length', leftOver); this.bufferSize += _AN_Read_length('length', leftOver); this._writeQueue.unshift(leftOver); this._writeQueueEncoding.unshift(null ); this._writeQueueCallbacks.unshift(cb); this._onBufferChange(); if (!bytesWritten) { this._writeQueueFD.unshift(fd); } return false ; } ; Socket.prototype._onBufferChange = function (){ ; } ; Socket.prototype.flush = function (){ while (this._writeQueue && _AN_Read_length('length', this._writeQueue)){ var data = this._writeQueue.shift(); var encoding = this._writeQueueEncoding.shift(); var cb = this._writeQueueCallbacks.shift(); var fd = this._writeQueueFD.shift(); if (data === END_OF_FILE) { this._shutdown(); return true ; } this.bufferSize -= _AN_Read_length('length', data); this._onBufferChange(); var flushed = this._writeOut(data, encoding, fd, cb); if (!flushed) return false ; } if (this._writeWatcher) this._writeWatcher.stop(); return true ; } ; Socket.prototype._writeQueueLast = function (){ return _AN_Read_length('length', this._writeQueue) > 0? this._writeQueue[_AN_Read_length('length', this._writeQueue) - 1]: null ; } ; Socket.prototype.setEncoding = function (encoding){ var StringDecoder = require('string_decoder').StringDecoder; this._decoder = new StringDecoder(encoding); } ; function doConnect(socket, port, host){ timers.active(socket); try { connect(socket.fd, port, host); } catch (e) { socket.destroy(e); return ; } debug('connecting to ' + host + ' : ' + port); socket._readWatcher.set(socket.fd, true , false ); socket._writeWatcher.set(socket.fd, false , true ); socket._writeWatcher.start(); } function toPort(x){ return (x = Number(x)) >= 0? x: false ; } Socket.prototype._onConnect = function (){ var errno = socketError(this.fd); if (errno == 0) { this._connecting = false ; this.resume(); assert(this.writable); this.readable = this.writable = true ; try { this.emit('connect'); } catch (e) { this.destroy(e); return ; } if (this._writeQueue && _AN_Read_length('length', this._writeQueue)) { this._onWritable(); } } else if (errno != EINPROGRESS) { this.destroy(errnoException(errno, 'connect')); } } ; Socket.prototype._onWritable = function (){ if (this.flush()) { if (this._events && this._events.drain) this.emit('drain'); if (this.ondrain) this.ondrain(); if (this.__destroyOnDrain) this.destroy(); } } ; Socket.prototype._onReadable = function (){ var self = this; if (!pool || _AN_Read_length('length', pool) - pool.used < kMinPoolSpace) { pool = null ; allocNewPool(); } var bytesRead; try { bytesRead = self._readImpl(pool, pool.used, _AN_Read_length('length', pool) - pool.used); } catch (e) { self.destroy(e); return ; } if (bytesRead === 0) { self.readable = false ; self._readWatcher.stop(); if (!self.writable) self.destroy(); if (!self.allowHalfOpen) self.end(); if (self._events && self._events.end) self.emit('end'); if (self.onend) self.onend(); } else if (bytesRead > 0) { timers.active(self); var start = pool.used; var end = pool.used + bytesRead; pool.used += bytesRead; if (self._decoder) { var string = _AN_Call_write('write', self._decoder, pool.slice(start, end)); if (string.length) self.emit('data', string); } else { if (self._events && self._events.data) { self.emit('data', pool.slice(start, end)); } } if (self.ondata) self.ondata(pool, start, end); } } ; Socket.prototype.connect = function (){ var self = this; initSocket(self); if (self.fd) throw new Error('Socket already opened') if (!self._readWatcher) throw new Error('No readWatcher') timers.active(this); self._connecting = true ; self.writable = true ; var lastArg = arguments[_AN_Read_length('length', arguments) - 1]; if (typeof lastArg == 'function') { self.addListener('connect', lastArg); } var port = toPort(arguments[0]); if (port === false ) { self.fd = socket('unix'); self.type = 'unix'; setImplmentationMethods(this); doConnect(self, arguments[0]); } else { require('dns').lookup(arguments[1], function (err, ip, addressType){ if (err) { self.emit('error', err); } else { timers.active(self); self.type = addressType == 4? 'tcp4': 'tcp6'; self.fd = socket(self.type); doConnect(self, port, ip); } } ); } } ; Socket.prototype.address = function (){ return getsockname(this.fd); } ; Socket.prototype.setNoDelay = function (v){ if ((this.type == 'tcp4') || (this.type == 'tcp6')) { setNoDelay(this.fd, v); } } ; Socket.prototype.setKeepAlive = function (enable, time){ if ((this.type == 'tcp4') || (this.type == 'tcp6')) { var secondDelay = Math.ceil(time / 1000); setKeepAlive(this.fd, enable, secondDelay); } } ; Socket.prototype.setTimeout = function (msecs, callback){ if (msecs > 0) { timers.enroll(this, msecs); if (this.fd) { timers.active(this); } if (callback) { this.once('timeout', callback); } } else if (msecs === 0) { timers.unenroll(this); } } ; Socket.prototype.pause = function (){ if (this._readWatcher) this._readWatcher.stop(); } ; Socket.prototype.resume = function (){ if (this.fd === null ) throw new Error('Cannot resume() closed Socket.') if (this._readWatcher) { this._readWatcher.stop(); this._readWatcher.set(this.fd, true , false ); this._readWatcher.start(); } } ; Socket.prototype.destroySoon = function (){ if (this.flush()) { this.destroy(); } else { this.__destroyOnDrain = true ; } } ; Socket.prototype.destroy = function (exception){ var self = this; debug('destroy ' + this.fd); assert(this.bufferSize >= 0); this._writeQueue = [] ; this._writeQueueEncoding = [] ; this._writeQueueCallbacks = [] ; this._writeQueueFD = [] ; this.bufferSize = 0; this.readable = this.writable = false ; if (this._writeWatcher) { this._writeWatcher.stop(); this._writeWatcher.socket = null ; ioWatchers.free(this._writeWatcher); this._writeWatcher = null ; } if (this._readWatcher) { this._readWatcher.stop(); this._readWatcher.socket = null ; ioWatchers.free(this._readWatcher); this._readWatcher = null ; } timers.unenroll(this); if (this.server) { this.server.connections-- ; } if (typeof this.fd == 'number') { debug('close ' + this.fd); close(this.fd); this.fd = null ; process.nextTick(function (){ if (exception) self.emit('error', exception); self.emit('close', exception? true : false ); } ); } } ; Socket.prototype._shutdown = function (){ if (!this.writable) { throw new Error('The connection is not writable') } else { this.writable = false ; if (this.readable) { try { this._shutdownImpl(); } catch (e) { this.destroy(e); } } else { this.destroy(); } } } ; Socket.prototype.end = function (data, encoding){ if (this.writable) { if (this._writeQueueLast() !== END_OF_FILE) { DTRACE_NET_STREAM_END(this); if (data) _AN_Call_write('write', this, data, encoding); this._writeQueue.push(END_OF_FILE); if (!this._connecting) { this.flush(); } } } } ; function Server(){ if (!(this instanceof Server)) return new Server(arguments[0], arguments[1]); events.EventEmitter.call(this); var self = this; var options = { } ; if (typeof arguments[0] == 'object') { options = arguments[0]; } for (var l = _AN_Read_length('length', arguments) - 1; l >= 0; l-- ){ if (typeof arguments[l] == 'function') { self.addListener('connection', arguments[l]); } if (arguments[l] !== undefined) break ; } self.connections = 0; self.allowHalfOpen = options.allowHalfOpen || false ; self.watcher = new IOWatcher(); _AN_Write_host('host', self.watcher, false , self); self.watcher.callback = function (){ getDummyFD(); if (self._pauseTimer) { self.watcher.stop(); } while (self.fd){ try { var peerInfo = accept(self.fd); } catch (e) { if (e.errno != EMFILE) throw e rescueEMFILE(function (){ self._rejectPending(); } ); return ; } if (!peerInfo) return ; if (self.maxConnections && self.connections >= self.maxConnections) { close(peerInfo.fd); self._rejectPending(); return ; } self.connections++ ; var options = { fd: peerInfo.fd, type: self.type, allowHalfOpen: self.allowHalfOpen} ; var s = new Socket(options); s.remoteAddress = peerInfo.address; s.remotePort = _AN_Read_port('port', peerInfo); s.type = self.type; s.server = self; s.resume(); DTRACE_NET_SERVER_CONNECTION(s); self.emit('connection', s); try { s.emit('connect'); } catch (e) { s.destroy(e); return ; } } } ; } util.inherits(Server, events.EventEmitter); exports.Server = Server; exports.createServer = function (){ return new Server(arguments[0], arguments[1]); } ; Server.prototype.pause = function (msecs){ if (this._pauseTimer) return ; var self = this; msecs = msecs || 1000; this.watcher.stop(); this._pauseTimer = _AN_Call_settimeout('setTimeout', window, function (){ assert(parseInt(self.fd) >= 0); self._pauseTimer = null ; self.watcher.start(); } , msecs); } ; Server.prototype._rejectPending = function (){ var self = this; var acceptCount = 0; while (true ){ var peerInfo = accept(this.fd); if (!peerInfo) return ; close(peerInfo.fd); if (++acceptCount > 50) { this.pause(); return ; } } } ; Server.prototype.listen = function (){ var self = this; if (self.fd) throw new Error('Server already opened') var lastArg = arguments[_AN_Read_length('length', arguments) - 1]; if (typeof lastArg == 'function') { self.addListener('listening', lastArg); } var port = toPort(arguments[0]); if (_AN_Read_length('length', arguments) == 0 || typeof arguments[0] == 'function') { self.type = 'tcp4'; self.fd = socket(self.type); self._doListen(port); } else if (port === false ) { self.fd = socket('unix'); self.type = 'unix'; var path = arguments[0]; self.path = path; require('fs').stat(path, function (err, r){ if (err) { if (err.errno == ENOENT) { self._doListen(path); } else { throw r } } else { if (!r.isSocket()) { throw new Error('Non-socket exists at ' + path) } else { require('fs').unlink(path, function (err){ if (err) throw err self._doListen(path); } ); } } } ); } else { require('dns').lookup(arguments[1], function (err, ip, addressType){ if (err) { self.emit('error', err); } else { self.type = addressType == 4? 'tcp4': 'tcp6'; self.fd = socket(self.type); self._doListen(port, ip); } } ); } } ; Server.prototype.listenFD = function (fd, type){ if (this.fd) { throw new Error('Server already opened') } this.fd = fd; this.type = type || null ; this._startWatcher(); } ; Server.prototype._startWatcher = function (){ this.watcher.set(this.fd, true , false ); this.watcher.start(); this.emit('listening'); } ; Server.prototype._doListen = function (){ var self = this; getDummyFD(); try { bind(self.fd, arguments[0], arguments[1]); } catch (err) { self.close(); self.emit('error', err); return ; } process.nextTick(function (){ if (typeof self.fd != 'number') return ; try { listen(self.fd, self._backlog || 128); } catch (err) { self.close(); self.emit('error', err); return ; } self._startWatcher(); } ); } ; Server.prototype.address = function (){ return getsockname(this.fd); } ; Server.prototype.close = function (){ var self = this; if (!self.fd) throw new Error('Not running') self.watcher.stop(); close(self.fd); self.fd = null ; if (self._pauseTimer) { clearTimeout(self._pauseTimer); self._pauseTimer = null ; } if (self.type === 'unix') { require('fs').unlink(self.path, function (){ self.emit('close'); } ); } else { self.emit('close'); } } ; var dummyFD = null ; var lastEMFILEWarning = 0; function rescueEMFILE(callback){ var now = new Date(); if (now - lastEMFILEWarning > 5000) { console.error('(node) Hit max file limit. Increase "ulimit - n"'); lastEMFILEWarning = now; } if (dummyFD) { close(dummyFD); dummyFD = null ; callback(); getDummyFD(); } } function getDummyFD(){ if (!dummyFD) { try { dummyFD = socket('tcp'); } catch (e) { dummyFD = null ; } } }