/**
 * One end of a WebSocket connection layered on top of a stream socket.
 *
 * Events emitted by this object, as visible in this file:
 *   "connect"                - the opening handshake succeeded
 *   "text" (string)          - a complete text message arrived
 *   "binary" (InStream)      - a binary message started; data is fed to the stream
 *   "close" (code, reason)   - the connection was closed
 *   "error" (Error)          - socket error, or misuse of the send API
 *
 * @param {Object} socket - stream socket; must support on/read/end and writing.
 * @param {(Object|string)} parent - the owning server object (server side), or
 *        the request path as a string (client side).
 * @param {Function} [callback] - optional listener added for "connect".
 */
function Connection(socket, parent, callback) {
    var that = this;

    events.EventEmitter.call(this);

    this.socket = socket;
    this.server = typeof parent == "string" ? null : parent;
    this.readyState = this.CONNECTING;
    // A string while the HTTP handshake is being collected, a Buffer afterwards.
    this.buffer = "";
    // null, a string (fragmented text message) or an InStream (binary message).
    this.frameBuffer = null;
    this.outStream = null;
    this.path = typeof parent == "string" ? parent : null;
    this.key = null;

    socket.on("readable", function () {
        that.doRead();
    });

    socket.on("close", function () {
        // The socket went away without a closing handshake.
        if (that.readyState === that.CONNECTING || that.readyState === that.OPEN) {
            that.emit("close", 1006, "");
        }
        // Inside this handler `this` is the socket, so the constant must be
        // read from the connection (`that`).
        that.readyState = that.CLOSED;
        if (that.frameBuffer instanceof InStream) {
            that.frameBuffer.end();
            that.frameBuffer = null;
        }
        if (that.outStream instanceof OutStream) {
            that.outStream.end();
            that.outStream = null;
        }
    });

    socket.on("error", function (err) {
        that.emit("error", err);
    });

    // Client side: the handshake request is sent once the socket connects.
    if (!this.server) {
        socket.on("connect", function () {
            that.startHandshake();
        });
    }

    // EventEmitter#on throws for a non-function listener, so only register
    // the callback when one was actually supplied.
    if (typeof callback == "function") this.on("connect", callback);
}

// NOTE(review): the export is assigned before the requires below, presumably
// so that a circular require (e.g. from ./Server.js) already sees Connection.
// Keep this ordering unless that is confirmed unnecessary.
module.exports = Connection;

var util = require("util");
var events = require("events");
var crypto = require("crypto");
var InStream = require("./InStream.js");
var OutStream = require("./OutStream.js");
var frame = require("./frame.js");
var Server = require("./Server.js");

// Value appended to the key before hashing to build Sec-WebSocket-Accept.
var HANDSHAKE_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

// Second argument handed to OutStream by beginBinary().
Connection.binaryFragmentation = 512 * 1024;

util.inherits(Connection, events.EventEmitter);

// Possible values of Connection#readyState.
Connection.prototype.CONNECTING = 0;
Connection.prototype.OPEN = 1;
Connection.prototype.CLOSING = 2;
Connection.prototype.CLOSED = 3;

/**
 * Parses the header lines of an HTTP message (everything after the first line).
 * Blank lines are skipped; names are lower-cased.
 *
 * @param {string[]} lines - the message split on CRLF; lines[0] is ignored.
 * @returns {?Object} map of header name to value, or null if a line is malformed.
 */
function parseHeaderLines(lines) {
    // No prototype, so `in` checks only ever see headers that were received.
    var headers = Object.create(null);
    var count = _AN_Read_length("length", lines);
    var i, match;

    for (i = 1; i < count; i++) {
        if (!lines[i].trim()) continue;
        match = lines[i].match(/^([a-z-]+): (.+)$/i);
        if (!match) return null;
        headers[match[1].toLowerCase()] = match[2];
    }
    return headers;
}

/**
 * Tells whether the headers ask for (or confirm) an upgrade to WebSocket.
 * Both headers must be present; a missing one yields false instead of a throw.
 *
 * @param {Object} headers - as returned by parseHeaderLines.
 * @returns {boolean}
 */
function isWebSocketUpgrade(headers) {
    if (!("upgrade" in headers) || !("connection" in headers)) return false;
    if (headers.upgrade.toLowerCase() !== "websocket") return false;
    // The Connection header is a comma separated token list.
    return headers.connection.toLowerCase().split(/\s*,\s*/).indexOf("upgrade") !== -1;
}

/**
 * Computes the Sec-WebSocket-Accept value matching a Sec-WebSocket-Key.
 *
 * @param {string} key - the base64 key sent by the client.
 * @returns {string} base64 of SHA-1(key + GUID).
 */
function computeAcceptKey(key) {
    return crypto.createHash("sha1").update(key + HANDSHAKE_GUID).digest("base64");
}

/**
 * Builds a Host header value from the socket's remote endpoint.
 *
 * @param {Object} socket - socket exposing remoteAddress / remotePort.
 * @returns {string} "address[:port]", with IPv6 addresses wrapped in brackets.
 */
function formatHostHeader(socket) {
    var address = socket.remoteAddress;
    var port = socket.remotePort;

    if (!address) return "localhost";
    if (address.indexOf(":") !== -1) address = "[" + address + "]";
    return typeof port == "number" ? address + ":" + port : address;
}

/**
 * Sends a text message in a single frame.
 * Emits "error" (once) when the connection is not open or a binary stream
 * started with beginBinary() has not finished yet.
 *
 * @param {string} str - the text to send.
 * @param {Function} [callback] - forwarded to the socket write.
 * @returns {*} the result of the socket write, or undefined on error.
 */
Connection.prototype.sendText = function (str, callback) {
    if (this.readyState !== this.OPEN) {
        this.emit("error", new Error("You can't write to a non-open connection"));
        return;
    }
    if (this.outStream) {
        this.emit("error", new Error("You can't send a text frame until you finish sending binary frames"));
        return;
    }
    return _AN_Call_write("write", this.socket, frame.createTextFrame(str, !this.server), callback);
};

/**
 * Starts a streamed binary message.
 * Emits "error" (once) when the connection is not open or another binary
 * stream is still in progress.
 *
 * @returns {(OutStream|undefined)} the stream to write the binary data to.
 */
Connection.prototype.beginBinary = function () {
    if (this.readyState !== this.OPEN) {
        this.emit("error", new Error("You can't write to a non-open connection"));
        return;
    }
    if (this.outStream) {
        this.emit("error", new Error("You can't send more binary frames until you finish sending the previous binary frames"));
        return;
    }
    this.outStream = new OutStream(this, Connection.binaryFragmentation);
    return this.outStream;
};

/**
 * Sends a binary message in a single frame.
 * Emits "error" (once) when the connection is not open or a binary stream
 * started with beginBinary() has not finished yet.
 *
 * @param {Buffer} data - the payload.
 * @param {Function} [callback] - forwarded to the socket write.
 * @returns {*} the result of the socket write, or undefined on error.
 */
Connection.prototype.sendBinary = function (data, callback) {
    if (this.readyState !== this.OPEN) {
        this.emit("error", new Error("You can't write to a non-open connection"));
        return;
    }
    if (this.outStream) {
        this.emit("error", new Error("You can't send more binary frames until you finish sending the previous binary frames"));
        return;
    }
    return _AN_Call_write("write", this.socket, frame.createBinaryFrame(data, !this.server, true, true), callback);
};

/**
 * Starts closing the connection and emits "close".
 * An open connection sends a close frame and moves to CLOSING; a connection
 * that is still connecting (or already closing) ends the socket right away.
 * "close" is emitted on every call, whatever the current state.
 *
 * @param {number} [code] - close status code.
 * @param {string} [reason] - close reason.
 */
Connection.prototype.close = function (code, reason) {
    if (this.readyState === this.OPEN) {
        _AN_Call_write("write", this.socket, frame.createCloseFrame(code, reason, !this.server));
        this.readyState = this.CLOSING;
    } else if (this.readyState !== this.CLOSED) {
        this.socket.end();
        this.readyState = this.CLOSED;
    }
    this.emit("close", code, reason);
};

/**
 * Reads whatever the socket has buffered and feeds it either to the
 * handshake parser (while CONNECTING) or to the frame extractor.
 * Called from the socket's "readable" event.
 */
Connection.prototype.doRead = function () {
    var chunk, result;

    chunk = this.socket.read();
    if (!chunk) return;

    if (this.readyState === this.CONNECTING) {
        // Collect the HTTP message until the accumulated text ends with the
        // blank line that terminates the headers.
        this.buffer += chunk.toString();
        if (this.buffer.substr(-4) != "\r\n\r\n") return;

        result = this.buffer.split("\r\n");
        if (this.server ? this.answerHandshake(result) : this.checkHandshake(result)) {
            // From now on the buffer holds raw frame bytes.
            // NOTE(review): `new Buffer` is deprecated on current Node
            // releases; kept so the file still runs on old ones.
            this.buffer = new Buffer(0);
            this.readyState = this.OPEN;
            this.emit("connect");
        } else {
            this.socket.end(this.server ? "HTTP/1.1 400 Bad Request\r\n\r\n" : undefined);
        }
    } else if (this.readyState !== this.CLOSED) {
        this.buffer = Buffer.concat(
            [this.buffer, chunk],
            _AN_Read_length("length", this.buffer) + _AN_Read_length("length", chunk)
        );
        // extractFrame: true = frame consumed, undefined = need more data,
        // false = protocol violation.
        do {
            result = this.extractFrame();
        } while (result === true);
        if (result === false) this.close(1002);
    }
};

/**
 * Client side: generates a random key and writes the HTTP upgrade request.
 * The key is kept in this.key so the server's answer can be verified.
 */
Connection.prototype.startHandshake = function () {
    var request;

    this.key = crypto.randomBytes(16).toString("base64");
    request = "GET " + this.path + " HTTP/1.1\r\n" +
        "Host: " + formatHostHeader(this.socket) + "\r\n" +
        "Upgrade: websocket\r\n" +
        "Connection: Upgrade\r\n" +
        "Sec-WebSocket-Key: " + this.key + "\r\n" +
        "Sec-WebSocket-Version: 13\r\n\r\n";
    _AN_Call_write("write", this.socket, request);
};

/**
 * Client side: validates the server's answer to the upgrade request.
 *
 * @param {string[]} lines - the HTTP response split on CRLF.
 * @returns {boolean} true when the response is a valid 101 upgrade whose
 *          Sec-WebSocket-Accept matches the key that was sent.
 */
Connection.prototype.checkHandshake = function (lines) {
    var headers;

    if (_AN_Read_length("length", lines) < 4) return false;
    if (!lines[0].match(/^HTTP\/\d\.\d 101( .*)?$/i)) return false;

    headers = parseHeaderLines(lines);
    if (!headers) return false;
    if (!("sec-websocket-accept" in headers)) return false;
    if (!isWebSocketUpgrade(headers)) return false;

    return headers["sec-websocket-accept"] === computeAcceptKey(this.key);
};

/**
 * Server side: validates the client's upgrade request and, when it is
 * acceptable, writes the 101 response. Sets this.path and this.key.
 *
 * @param {string[]} lines - the HTTP request split on CRLF.
 * @returns {boolean} true when the request was accepted and answered.
 */
Connection.prototype.answerHandshake = function (lines) {
    var requestLine, headers;

    if (_AN_Read_length("length", lines) < 6) return false;

    requestLine = lines[0].match(/^GET (.+) HTTP\/\d\.\d$/i);
    if (!requestLine) return false;
    this.path = requestLine[1];

    headers = parseHeaderLines(lines);
    if (!headers) return false;
    if (!("host" in headers) || !("sec-websocket-key" in headers)) return false;
    // Also rejects requests that omit Upgrade or Connection altogether.
    if (!isWebSocketUpgrade(headers)) return false;
    if (headers["sec-websocket-version"] !== "13") return false;

    this.key = headers["sec-websocket-key"];
    _AN_Call_write("write", this.socket, "HTTP/1.1 101 Switching Protocols\r\n" +
        "Upgrade: websocket\r\n" +
        "Connection: Upgrade\r\n" +
        "Sec-WebSocket-Accept: " + computeAcceptKey(this.key) + "\r\n\r\n");
    return true;
};

/**
 * Tries to take one complete frame off the front of this.buffer and process it.
 *
 * @returns {(boolean|undefined)} undefined when more data is needed, false on
 *          a protocol violation, otherwise the result of processFrame.
 */
Connection.prototype.extractFrame = function () {
    var available, firstByte, secondByte, flags, fin, opcode, hasMask, len, start, payload, mask, i;

    available = _AN_Read_length("length", this.buffer);
    if (available < 2) return;

    firstByte = this.buffer[0];
    flags = firstByte >> 4;
    // The three bits after FIN must be zero.
    if (flags % 8 !== 0) return false;
    fin = flags === 8;
    opcode = firstByte % 16;
    if (opcode !== 0 && opcode !== 1 && opcode !== 2 && opcode !== 8 && opcode !== 9 && opcode !== 10) return false;
    // Opcodes 8 and above may not be fragmented.
    if (opcode >= 8 && !fin) return false;

    secondByte = this.buffer[1];
    hasMask = secondByte >> 7;
    // Frames read by a server must be masked; frames read by a client must not.
    if ((this.server && !hasMask) || (!this.server && hasMask)) return false;

    len = secondByte % 128;
    // `start` is the payload offset: 2 fixed bytes, the optional extended
    // length (2 or 8 bytes) and the optional 4 byte mask.
    start = hasMask ? 6 : 2;
    if (len === 126) start += 2;
    else if (len === 127) start += 8;

    // Wait for the whole header before reading the extended length from it.
    if (available < start) return;

    if (len === 126) {
        len = this.buffer.readUInt16BE(2);
    } else if (len === 127) {
        len = this.buffer.readUInt32BE(2) * Math.pow(2, 32) + this.buffer.readUInt32BE(6);
    }

    if (available < start + len) return;

    payload = this.buffer.slice(start, start + len);
    if (hasMask) {
        // The mask occupies the 4 bytes right before the payload.
        mask = this.buffer.slice(start - 4, start);
        for (i = 0; i < _AN_Read_length("length", payload); i++) payload[i] ^= mask[i % 4];
    }

    this.buffer = this.buffer.slice(start + len);
    return this.processFrame(fin, opcode, payload);
};

/**
 * Handles one decoded frame.
 *
 * @param {boolean} fin - whether this is the last frame of its message.
 * @param {number} opcode - 0 continuation, 1 text, 2 binary, 8 close, 9 ping, 10 pong.
 * @param {Buffer} payload - the unmasked payload.
 * @returns {boolean} false on a fragmentation error, true otherwise.
 */
Connection.prototype.processFrame = function (fin, opcode, payload) {
    if (opcode === 8) {
        // Either the answer to our own close frame or a close started by the peer.
        if (this.readyState === this.CLOSING) this.socket.end();
        else if (this.readyState === this.OPEN) this.processCloseFrame(payload);
        return true;
    }
    if (opcode === 9) {
        // Ping: answer with a pong carrying the same payload.
        if (this.readyState === this.OPEN) {
            _AN_Call_write("write", this.socket, frame.createPongFrame(payload.toString(), !this.server));
        }
        return true;
    }
    if (opcode === 10) return true;

    // Data frames are ignored unless the connection is open.
    if (this.readyState !== this.OPEN) return true;

    // A continuation needs a message in progress; a new message needs none.
    if (opcode === 0 && this.frameBuffer === null) return false;
    if (opcode !== 0 && this.frameBuffer !== null) return false;

    // A continuation takes the type of the message being assembled.
    if (!opcode) opcode = typeof this.frameBuffer == "string" ? 1 : 2;

    if (opcode === 1) {
        // NOTE(review): each fragment is decoded on its own, so a multi-byte
        // character split across two fragments would not decode correctly.
        payload = payload.toString();
        this.frameBuffer = this.frameBuffer ? this.frameBuffer + payload : payload;
        if (fin) {
            this.emit("text", this.frameBuffer);
            this.frameBuffer = null;
        }
    } else {
        if (!this.frameBuffer) {
            // First frame of a binary message: hand the stream to listeners
            // before any data is added to it.
            this.frameBuffer = new InStream();
            this.emit("binary", this.frameBuffer);
        }
        this.frameBuffer.addData(payload);
        if (fin) {
            this.frameBuffer.end();
            this.frameBuffer = null;
        }
    }
    return true;
};

/**
 * Handles a close frame received while the connection is open: echoes a
 * close frame back, marks the connection CLOSED and emits "close".
 *
 * @param {Buffer} payload - close frame payload; when it has at least two
 *        bytes they are read as the status code and the rest as the reason,
 *        otherwise code 1005 and an empty reason are used.
 */
Connection.prototype.processCloseFrame = function (payload) {
    var code = 1005;
    var reason = "";

    if (_AN_Read_length("length", payload) >= 2) {
        code = payload.readUInt16BE(0);
        reason = payload.slice(2).toString();
    }
    _AN_Call_write("write", this.socket, frame.createCloseFrame(code, reason, !this.server));
    this.readyState = this.CLOSED;
    this.emit("close", code, reason);
};