// // Created by Mingliang Chen on 18/4/1. // illuspas[a]gmail.com // Copyright (c) 2018 Nodemedia. All rights reserved. // // const EventEmitter = require('events'); const QueryString = require('querystring'); const AV = require('./node_core_av'); const { AUDIO_SOUND_RATE, AUDIO_CODEC_NAME, VIDEO_CODEC_NAME } = require('./node_core_av'); const AMF = require('./node_core_amf'); const Handshake = require('./node_rtmp_handshake'); const NodeCoreUtils = require('./node_core_utils'); const context = require('./node_core_ctx'); // const Logger = require('./node_core_logger'); const N_CHUNK_STREAM = 8; const RTMP_VERSION = 3; const RTMP_HANDSHAKE_SIZE = 1536; const RTMP_HANDSHAKE_UNINIT = 0; const RTMP_HANDSHAKE_0 = 1; const RTMP_HANDSHAKE_1 = 2; const RTMP_HANDSHAKE_2 = 3; const RTMP_PARSE_INIT = 0; const RTMP_PARSE_BASIC_HEADER = 1; const RTMP_PARSE_MESSAGE_HEADER = 2; const RTMP_PARSE_EXTENDED_TIMESTAMP = 3; const RTMP_PARSE_PAYLOAD = 4; const MAX_CHUNK_HEADER = 18; const RTMP_CHUNK_TYPE_0 = 0; // 11-bytes: timestamp(3) + length(3) + stream type(1) + stream id(4) const RTMP_CHUNK_TYPE_1 = 1; // 7-bytes: delta(3) + length(3) + stream type(1) const RTMP_CHUNK_TYPE_2 = 2; // 3-bytes: delta(3) const RTMP_CHUNK_TYPE_3 = 3; // 0-byte const RTMP_CHANNEL_PROTOCOL = 2; const RTMP_CHANNEL_INVOKE = 3; const RTMP_CHANNEL_AUDIO = 4; const RTMP_CHANNEL_VIDEO = 5; const RTMP_CHANNEL_DATA = 6; const rtmpHeaderSize = [11, 7, 3, 0]; /* Protocol Control Messages */ const RTMP_TYPE_SET_CHUNK_SIZE = 1; const RTMP_TYPE_ABORT = 2; const RTMP_TYPE_ACKNOWLEDGEMENT = 3; // bytes read report const RTMP_TYPE_WINDOW_ACKNOWLEDGEMENT_SIZE = 5; // server bandwidth const RTMP_TYPE_SET_PEER_BANDWIDTH = 6; // client bandwidth /* User Control Messages Event (4) */ const RTMP_TYPE_EVENT = 4; const RTMP_TYPE_AUDIO = 8; const RTMP_TYPE_VIDEO = 9; /* Data Message */ const RTMP_TYPE_FLEX_STREAM = 15; // AMF3 const RTMP_TYPE_DATA = 18; // AMF0 /* Shared Object Message */ const RTMP_TYPE_FLEX_OBJECT = 16; // AMF3 const RTMP_TYPE_SHARED_OBJECT = 19; // AMF0 /* Command Message */ const RTMP_TYPE_FLEX_MESSAGE = 17; // AMF3 const RTMP_TYPE_INVOKE = 20; // AMF0 /* Aggregate Message */ const RTMP_TYPE_METADATA = 22; const RTMP_CHUNK_SIZE = 128; const RTMP_PING_TIME = 60000; const RTMP_PING_TIMEOUT = 30000; const STREAM_BEGIN = 0x00; const STREAM_EOF = 0x01; const STREAM_DRY = 0x02; const STREAM_EMPTY = 0x1f; const STREAM_READY = 0x20; const RtmpPacket = { create: (fmt = 0, cid = 0) => { return { header: { fmt: fmt, cid: cid, timestamp: 0, length: 0, type: 0, stream_id: 0 }, clock: 0, payload: null, capacity: 0, bytes: 0 }; } }; class NodeRtmpSession { constructor(config, socket) { this.config = config; this.socket = socket; this.id = NodeCoreUtils.generateNewSessionID(); this.ip = socket.remoteAddress; this.TAG = 'rtmp'; // this.eventEmitter = new EventEmitter(); this.handshakePayload = Buffer.alloc(RTMP_HANDSHAKE_SIZE); this.handshakeState = RTMP_HANDSHAKE_UNINIT; this.handshakeBytes = 0; this.parserBuffer = Buffer.alloc(MAX_CHUNK_HEADER); this.parserState = RTMP_PARSE_INIT; this.parserBytes = 0; this.parserBasicBytes = 0; this.parserPacket = null; this.inPackets = new Map(); this.inChunkSize = RTMP_CHUNK_SIZE; this.outChunkSize = config.rtmp.chunk_size ? config.rtmp.chunk_size : RTMP_CHUNK_SIZE; this.pingTime = config.rtmp.ping ? config.rtmp.ping * 1000 : RTMP_PING_TIME; this.pingTimeout = config.rtmp.ping_timeout ? config.rtmp.ping_timeout * 1000 : RTMP_PING_TIMEOUT; this.pingInterval = null; this.isIPC = false; this.isLocal = this.ip === '127.0.0.1' || this.ip === '::1' || this.ip == '::ffff:127.0.0.1'; this.isStarting = false; this.isPublishing = false; this.isPlaying = false; this.isIdling = false; this.isPause = false; this.isReceiveAudio = true; this.isReceiveVideo = true; this.metaData = null; this.aacSequenceHeader = null; this.avcSequenceHeader = null; this.audioCodec = 0; this.audioCodecName = ''; this.audioProfileName = ''; this.audioSamplerate = 0; this.audioChannels = 1; this.videoCodec = 0; this.videoCodecName = ''; this.videoProfileName = ''; this.videoWidth = 0; this.videoHeight = 0; this.videoFps = 0; this.videoLevel = 0; this.gopCacheEnable = config.rtmp.gop_cache; this.rtmpGopCacheQueue = null; this.ackSize = 0; this.inAckSize = 0; this.inLastAck = 0; this.appname = ''; this.streams = 0; this.playStreamId = 0; this.playStreamPath = ''; this.playArgs = {}; this.publishStreamId = 0; this.publishStreamPath = ''; this.publishArgs = {}; this.players = new Set(); this.writeBufferQueue = []; context.sessions.set(this.id, this); } // getEventEmitter() { // return this.eventEmitter // } run() { this.socket.on('data', this.onSocketData.bind(this)); this.socket.on('close', this.onSocketClose.bind(this)); this.socket.on('error', this.onSocketError.bind(this)); this.socket.on('timeout', this.onSocketTimeout.bind(this)); this.socket.setTimeout(this.pingTimeout); this.isStarting = true; } stop() { if (this.isStarting) { this.isStarting = false; if (this.playStreamId > 0) { this.onDeleteStream({ streamId: this.playStreamId }); } if (this.publishStreamId > 0) { this.onDeleteStream({ streamId: this.publishStreamId }); } if (this.pingInterval != null) { clearInterval(this.pingInterval); this.pingInterval = null; } if (!this.isIPC) { // Logger.log(`[rtmp disconnect] id=${this.id}`); context.nodeEvent.emit('doneConnect', this.id, this.connectCmdObj); } context.sessions.delete(this.id); this.socket.destroy(); } } reject() { // Logger.log(`[rtmp reject] id=${this.id}`); this.stop(); } writeBuffer(data) { this.writeBufferQueue.push(data); if (this.writeBufferQueue.length >= 10) { this.socket.write(Buffer.concat(this.writeBufferQueue)); this.writeBufferQueue.length = 0; } } onSocketClose() { // Logger.log('onSocketClose'); this.stop(); } onSocketError(e) { // Logger.log('onSocketError', e); this.stop(); } onSocketTimeout() { // Logger.log('onSocketTimeout'); this.stop(); } onSocketData(data) { let bytes = data.length; let p = 0; let n = 0; while (bytes > 0) { switch (this.handshakeState) { case RTMP_HANDSHAKE_UNINIT: // Logger.log('RTMP_HANDSHAKE_UNINIT'); this.handshakeState = RTMP_HANDSHAKE_0; this.handshakeBytes = 0; bytes -= 1; p += 1; break; case RTMP_HANDSHAKE_0: // Logger.log('RTMP_HANDSHAKE_0'); n = RTMP_HANDSHAKE_SIZE - this.handshakeBytes; n = n <= bytes ? n : bytes; data.copy(this.handshakePayload, this.handshakeBytes, p, p + n); this.handshakeBytes += n; bytes -= n; p += n; if (this.handshakeBytes === RTMP_HANDSHAKE_SIZE) { this.handshakeState = RTMP_HANDSHAKE_1; this.handshakeBytes = 0; let s0s1s2 = Handshake.generateS0S1S2(this.handshakePayload); this.socket.write(s0s1s2); } break; case RTMP_HANDSHAKE_1: // Logger.log('RTMP_HANDSHAKE_1'); n = RTMP_HANDSHAKE_SIZE - this.handshakeBytes; n = n <= bytes ? n : bytes; data.copy(this.handshakePayload, this.handshakeBytes, p, n); this.handshakeBytes += n; bytes -= n; p += n; if (this.handshakeBytes === RTMP_HANDSHAKE_SIZE) { this.handshakeState = RTMP_HANDSHAKE_2; this.handshakeBytes = 0; this.handshakePayload = null; } break; case RTMP_HANDSHAKE_2: default: // Logger.log('RTMP_HANDSHAKE_2'); return this.rtmpChunkRead(data, p, bytes); } } } rtmpChunkBasicHeaderCreate(fmt, cid) { let out; if (cid >= 64 + 255) { out = Buffer.alloc(3); out[0] = (fmt << 6) | 1; out[1] = (cid - 64) & 0xFF; out[2] = ((cid - 64) >> 8) & 0xFF; } else if (cid >= 64) { out = Buffer.alloc(2); out[0] = (fmt << 6) | 0; out[1] = (cid - 64) & 0xFF; } else { out = Buffer.alloc(1); out[0] = (fmt << 6) | cid; } return out; } rtmpChunkMessageHeaderCreate(header) { let out = Buffer.alloc(rtmpHeaderSize[header.fmt % 4]); if (header.fmt <= RTMP_CHUNK_TYPE_2) { out.writeUIntBE(header.timestamp >= 0xffffff ? 0xffffff : header.timestamp, 0, 3); } if (header.fmt <= RTMP_CHUNK_TYPE_1) { out.writeUIntBE(header.length, 3, 3); out.writeUInt8(header.type, 6); } if (header.fmt === RTMP_CHUNK_TYPE_0) { out.writeUInt32LE(header.stream_id, 7); } return out; } rtmpChunksCreate(packet) { let header = packet.header; let payload = packet.payload; let payloadSize = header.length; let chunkSize = this.outChunkSize; let chunksOffset = 0; let payloadOffset = 0; let chunkBasicHeader = this.rtmpChunkBasicHeaderCreate(header.fmt, header.cid); let chunkBasicHeader3 = this.rtmpChunkBasicHeaderCreate(RTMP_CHUNK_TYPE_3, header.cid); let chunkMessageHeader = this.rtmpChunkMessageHeaderCreate(header); let useExtendedTimestamp = header.timestamp >= 0xffffff; let headerSize = chunkBasicHeader.length + chunkMessageHeader.length + (useExtendedTimestamp ? 4 : 0); let n = headerSize + payloadSize + Math.floor(payloadSize / chunkSize); if (useExtendedTimestamp) { n += Math.floor(payloadSize / chunkSize) * 4; } if (!(payloadSize % chunkSize)) { n -= 1; if (useExtendedTimestamp) { //TODO CHECK n -= 4; } } let chunks = Buffer.alloc(n); chunkBasicHeader.copy(chunks, chunksOffset); chunksOffset += chunkBasicHeader.length; chunkMessageHeader.copy(chunks, chunksOffset); chunksOffset += chunkMessageHeader.length; if (useExtendedTimestamp) { chunks.writeUInt32BE(header.timestamp, chunksOffset); chunksOffset += 4; } while (payloadSize > 0) { if (payloadSize > chunkSize) { payload.copy(chunks, chunksOffset, payloadOffset, payloadOffset + chunkSize); payloadSize -= chunkSize; chunksOffset += chunkSize; payloadOffset += chunkSize; chunkBasicHeader3.copy(chunks, chunksOffset); chunksOffset += chunkBasicHeader3.length; if (useExtendedTimestamp) { chunks.writeUInt32BE(header.timestamp, chunksOffset); chunksOffset += 4; } } else { payload.copy(chunks, chunksOffset, payloadOffset, payloadOffset + payloadSize); payloadSize -= payloadSize; chunksOffset += payloadSize; payloadOffset += payloadSize; } } return chunks; } rtmpChunkRead(data, p, bytes) { // Logger.log('rtmpChunkRead', p, bytes); let size = 0; let offset = 0; let extended_timestamp = 0; while (offset < bytes) { switch (this.parserState) { case RTMP_PARSE_INIT: this.parserBytes = 1; this.parserBuffer[0] = data[p + offset++]; if (0 === (this.parserBuffer[0] & 0x3F)) { this.parserBasicBytes = 2; } else if (1 === (this.parserBuffer[0] & 0x3F)) { this.parserBasicBytes = 3; } else { this.parserBasicBytes = 1; } this.parserState = RTMP_PARSE_BASIC_HEADER; break; case RTMP_PARSE_BASIC_HEADER: while (this.parserBytes < this.parserBasicBytes && offset < bytes) { this.parserBuffer[this.parserBytes++] = data[p + offset++]; } if (this.parserBytes >= this.parserBasicBytes) { this.parserState = RTMP_PARSE_MESSAGE_HEADER; } break; case RTMP_PARSE_MESSAGE_HEADER: size = rtmpHeaderSize[this.parserBuffer[0] >> 6] + this.parserBasicBytes; while (this.parserBytes < size && offset < bytes) { this.parserBuffer[this.parserBytes++] = data[p + offset++]; } if (this.parserBytes >= size) { this.rtmpPacketParse(); this.parserState = RTMP_PARSE_EXTENDED_TIMESTAMP; } break; case RTMP_PARSE_EXTENDED_TIMESTAMP: size = rtmpHeaderSize[this.parserPacket.header.fmt] + this.parserBasicBytes; if (this.parserPacket.header.timestamp === 0xFFFFFF) size += 4; while (this.parserBytes < size && offset < bytes) { this.parserBuffer[this.parserBytes++] = data[p + offset++]; } if (this.parserBytes >= size) { if (this.parserPacket.header.timestamp === 0xFFFFFF) { extended_timestamp = this.parserBuffer.readUInt32BE(rtmpHeaderSize[this.parserPacket.header.fmt] + this.parserBasicBytes); } else { extended_timestamp = this.parserPacket.header.timestamp; } if (this.parserPacket.bytes === 0) { if (RTMP_CHUNK_TYPE_0 === this.parserPacket.header.fmt) { this.parserPacket.clock = extended_timestamp; } else { this.parserPacket.clock += extended_timestamp; } this.rtmpPacketAlloc(); } this.parserState = RTMP_PARSE_PAYLOAD; } break; case RTMP_PARSE_PAYLOAD: size = Math.min(this.inChunkSize - (this.parserPacket.bytes % this.inChunkSize), this.parserPacket.header.length - this.parserPacket.bytes); size = Math.min(size, bytes - offset); if (size > 0) { data.copy(this.parserPacket.payload, this.parserPacket.bytes, p + offset, p + offset + size); } this.parserPacket.bytes += size; offset += size; if (this.parserPacket.bytes >= this.parserPacket.header.length) { this.parserState = RTMP_PARSE_INIT; this.parserPacket.bytes = 0; if(this.parserPacket.clock > 0xffffffff){ //TODO Shit code, rewrite chunkcreate break; } this.rtmpHandler(); } else if (0 === (this.parserPacket.bytes % this.inChunkSize)) { this.parserState = RTMP_PARSE_INIT; } break; } } this.inAckSize += data.length; if (this.inAckSize >= 0xf0000000) { this.inAckSize = 0; this.inLastAck = 0; } if (this.ackSize > 0 && this.inAckSize - this.inLastAck >= this.ackSize) { this.inLastAck = this.inAckSize; this.sendACK(this.inAckSize); } } rtmpPacketParse() { let fmt = this.parserBuffer[0] >> 6; let cid = 0; if (this.parserBasicBytes === 2) { cid = 64 + this.parserBuffer[1]; } else if (this.parserBasicBytes === 3) { cid = 64 + this.parserBuffer[1] + this.parserBuffer[2] << 8; } else { cid = this.parserBuffer[0] & 0x3F; } let hasp = this.inPackets.has(cid); if (!hasp) { this.parserPacket = RtmpPacket.create(fmt, cid); this.inPackets.set(cid, this.parserPacket); } else { this.parserPacket = this.inPackets.get(cid); } this.parserPacket.header.fmt = fmt; this.parserPacket.header.cid = cid; this.rtmpChunkMessageHeaderRead(); if (this.parserPacket.header.type > RTMP_TYPE_METADATA) { Logger.error("rtmp packet parse error.", this.parserPacket); this.stop(); } } rtmpChunkMessageHeaderRead() { let offset = this.parserBasicBytes; // timestamp / delta if (this.parserPacket.header.fmt <= RTMP_CHUNK_TYPE_2) { this.parserPacket.header.timestamp = this.parserBuffer.readUIntBE(offset, 3); offset += 3; } // message length + type if (this.parserPacket.header.fmt <= RTMP_CHUNK_TYPE_1) { this.parserPacket.header.length = this.parserBuffer.readUIntBE(offset, 3); this.parserPacket.header.type = this.parserBuffer[offset + 3]; offset += 4; } if (this.parserPacket.header.fmt === RTMP_CHUNK_TYPE_0) { this.parserPacket.header.stream_id = this.parserBuffer.readUInt32LE(offset); offset += 4; } return offset; } rtmpPacketAlloc() { if (this.parserPacket.capacity < this.parserPacket.header.length) { this.parserPacket.payload = Buffer.alloc(this.parserPacket.header.length + 1024); this.parserPacket.capacity = this.parserPacket.header.length + 1024; } } rtmpHandler() { switch (this.parserPacket.header.type) { case RTMP_TYPE_SET_CHUNK_SIZE: case RTMP_TYPE_ABORT: case RTMP_TYPE_ACKNOWLEDGEMENT: case RTMP_TYPE_WINDOW_ACKNOWLEDGEMENT_SIZE: case RTMP_TYPE_SET_PEER_BANDWIDTH: return 0 === this.rtmpControlHandler() ? -1 : 0; case RTMP_TYPE_EVENT: return 0 === this.rtmpEventHandler() ? -1 : 0; case RTMP_TYPE_AUDIO: return this.rtmpAudioHandler(); case RTMP_TYPE_VIDEO: return this.rtmpVideoHandler(); case RTMP_TYPE_FLEX_MESSAGE: case RTMP_TYPE_INVOKE: return this.rtmpInvokeHandler(); case RTMP_TYPE_FLEX_STREAM:// AMF3 case RTMP_TYPE_DATA: // AMF0 return this.rtmpDataHandler(); } } rtmpControlHandler() { let payload = this.parserPacket.payload; switch (this.parserPacket.header.type) { case RTMP_TYPE_SET_CHUNK_SIZE: this.inChunkSize = payload.readUInt32BE(); // Logger.debug('set inChunkSize', this.inChunkSize); break; case RTMP_TYPE_ABORT: break; case RTMP_TYPE_ACKNOWLEDGEMENT: break; case RTMP_TYPE_WINDOW_ACKNOWLEDGEMENT_SIZE: this.ackSize = payload.readUInt32BE(); // Logger.debug('set ack Size', this.ackSize); break; case RTMP_TYPE_SET_PEER_BANDWIDTH: break; } } rtmpEventHandler() { } rtmpAudioHandler() { let payload = this.parserPacket.payload.slice(0, this.parserPacket.header.length); let sound_format = (payload[0] >> 4) & 0x0f; let sound_type = payload[0] & 0x01; let sound_size = (payload[0] >> 1) & 0x01; let sound_rate = (payload[0] >> 2) & 0x03; if (this.audioCodec == 0) { this.audioCodec = sound_format; this.audioCodecName = AUDIO_CODEC_NAME[sound_format]; this.audioSamplerate = AUDIO_SOUND_RATE[sound_rate]; this.audioChannels = ++sound_type; if (sound_format == 4) { this.audioSamplerate = 16000; } else if (sound_format == 5) { this.audioSamplerate = 8000; } else if (sound_format == 11) { this.audioSamplerate = 16000; } else if (sound_format == 14) { this.audioSamplerate = 8000; } if (sound_format != 10 && !this.isIPC) { // Logger.log(`[rtmp publish] Handle audio. id=${this.id} streamPath=${this.publishStreamPath} sound_format=${sound_format} sound_type=${sound_type} sound_size=${sound_size} sound_rate=${sound_rate} codec_name=${this.audioCodecName} ${this.audioSamplerate} ${this.audioChannels}ch`); } } if (sound_format == 10 && payload[1] == 0) { //cache aac sequence header this.isFirstAudioReceived = true; this.aacSequenceHeader = Buffer.alloc(payload.length); payload.copy(this.aacSequenceHeader); let info = AV.readAACSpecificConfig(this.aacSequenceHeader); this.audioProfileName = AV.getAACProfileName(info); this.audioSamplerate = info.sample_rate; this.audioChannels = info.channels; if (!this.isIPC) { // Logger.log(`[rtmp publish] Handle audio. id=${this.id} streamPath=${this.publishStreamPath} sound_format=${sound_format} sound_type=${sound_type} sound_size=${sound_size} sound_rate=${sound_rate} codec_name=${this.audioCodecName} ${this.audioSamplerate} ${this.audioChannels}ch`); } } let packet = RtmpPacket.create(); packet.header.fmt = RTMP_CHUNK_TYPE_0; packet.header.cid = RTMP_CHANNEL_AUDIO; packet.header.type = RTMP_TYPE_AUDIO; packet.payload = payload; packet.header.length = packet.payload.length; packet.header.timestamp = this.parserPacket.clock; let rtmpChunks = this.rtmpChunksCreate(packet); //cache gop if (this.rtmpGopCacheQueue != null) { if (this.aacSequenceHeader != null && payload[1] === 0) { //skip aac sequence header } else { this.rtmpGopCacheQueue.add(rtmpChunks); } } for (let playerId of this.players) { let playerSession = context.sessions.get(playerId); if (playerSession instanceof NodeRtmpSession) { if (playerSession.isStarting && playerSession.isPlaying && !playerSession.isPause && playerSession.isReceiveAudio) { rtmpChunks.writeUInt32LE(playerSession.playStreamId, 8); playerSession.writeBuffer(rtmpChunks); } } } } rtmpVideoHandler() { let payload = this.parserPacket.payload.slice(0, this.parserPacket.header.length); let frame_type = (payload[0] >> 4) & 0x0f; let codec_id = payload[0] & 0x0f; if (codec_id == 7 || codec_id == 12) { //cache avc sequence header if (frame_type == 1 && payload[1] == 0) { this.avcSequenceHeader = Buffer.alloc(payload.length); payload.copy(this.avcSequenceHeader); let info = AV.readAVCSpecificConfig(this.avcSequenceHeader); this.videoWidth = info.width; this.videoHeight = info.height; this.videoProfileName = AV.getAVCProfileName(info); this.videoLevel = info.level; this.rtmpGopCacheQueue = this.gopCacheEnable ? new Set() : null; //Logger.log(`[rtmp publish] avc sequence header`,this.avcSequenceHeader); } } if (this.videoCodec == 0) { this.videoCodec = codec_id; this.videoCodecName = VIDEO_CODEC_NAME[codec_id]; if (!this.isIPC) { // Logger.log(`[rtmp publish] Handle video. id=${this.id} streamPath=${this.publishStreamPath} frame_type=${frame_type} codec_id=${codec_id} codec_name=${this.videoCodecName} ${this.videoWidth}x${this.videoHeight}`); } } let packet = RtmpPacket.create(); packet.header.fmt = RTMP_CHUNK_TYPE_0; packet.header.cid = RTMP_CHANNEL_VIDEO; packet.header.type = RTMP_TYPE_VIDEO; packet.payload = payload; packet.header.length = packet.payload.length; packet.header.timestamp = this.parserPacket.clock; let rtmpChunks = this.rtmpChunksCreate(packet); //cache gop if ((codec_id == 7 || codec_id == 12) && this.rtmpGopCacheQueue != null) { if (frame_type == 1 && payload[1] == 1) { this.rtmpGopCacheQueue.clear(); } if (frame_type == 1 && payload[1] == 0) { //skip avc sequence header } else { this.rtmpGopCacheQueue.add(rtmpChunks); } } // Logger.log(rtmpChunks); for (let playerId of this.players) { let playerSession = context.sessions.get(playerId); if (playerSession instanceof NodeRtmpSession) { if (playerSession.isStarting && playerSession.isPlaying && !playerSession.isPause && playerSession.isReceiveVideo) { rtmpChunks.writeUInt32LE(playerSession.playStreamId, 8); playerSession.writeBuffer(rtmpChunks); } } } } rtmpDataHandler() { let offset = this.parserPacket.header.type === RTMP_TYPE_FLEX_STREAM ? 1 : 0; let payload = this.parserPacket.payload.slice(offset, this.parserPacket.header.length); let dataMessage = AMF.decodeAmf0Data(payload); switch (dataMessage.cmd) { case '@setDataFrame': if (dataMessage.dataObj) { this.audioSamplerate = dataMessage.dataObj.audiosamplerate; this.audioChannels = dataMessage.dataObj.stereo ? 2 : 1; this.videoWidth = dataMessage.dataObj.width; this.videoHeight = dataMessage.dataObj.height; this.videoFps = dataMessage.dataObj.framerate; } let opt = { cmd: 'onMetaData', dataObj: dataMessage.dataObj }; this.metaData = AMF.encodeAmf0Data(opt); let packet = RtmpPacket.create(); packet.header.fmt = RTMP_CHUNK_TYPE_0; packet.header.cid = RTMP_CHANNEL_DATA; packet.header.type = RTMP_TYPE_DATA; packet.payload = this.metaData; packet.header.length = packet.payload.length; let rtmpChunks = this.rtmpChunksCreate(packet); for (let playerId of this.players) { let playerSession = context.sessions.get(playerId); if (playerSession instanceof NodeRtmpSession) { if (playerSession.isStarting && playerSession.isPlaying && !playerSession.isPause) { rtmpChunks.writeUInt32LE(playerSession.playStreamId, 8); playerSession.socket.write(rtmpChunks); } } } break; } } rtmpInvokeHandler() { let offset = this.parserPacket.header.type === RTMP_TYPE_FLEX_MESSAGE ? 1 : 0; let payload = this.parserPacket.payload.slice(offset, this.parserPacket.header.length); let invokeMessage = AMF.decodeAmf0Cmd(payload); // Logger.log(invokeMessage); switch (invokeMessage.cmd) { case 'connect': this.onConnect(invokeMessage); break; case 'releaseStream': break; case 'FCPublish': break; case 'createStream': this.onCreateStream(invokeMessage); break; case 'publish': this.onPublish(invokeMessage); break; case 'play': this.onPlay(invokeMessage); break; case 'pause': this.onPause(invokeMessage); break; case 'FCUnpublish': break; case 'deleteStream': this.onDeleteStream(invokeMessage); break; case 'closeStream': this.onCloseStream(); break; case 'receiveAudio': this.onReceiveAudio(invokeMessage); break; case 'receiveVideo': this.onReceiveVideo(invokeMessage); break; } } sendACK(size) { let rtmpBuffer = Buffer.from('02000000000004030000000000000000', 'hex'); rtmpBuffer.writeUInt32BE(size, 12); this.socket.write(rtmpBuffer); } sendWindowACK(size) { let rtmpBuffer = Buffer.from('02000000000004050000000000000000', 'hex'); rtmpBuffer.writeUInt32BE(size, 12); this.socket.write(rtmpBuffer); }; setPeerBandwidth(size, type) { let rtmpBuffer = Buffer.from('0200000000000506000000000000000000', 'hex'); rtmpBuffer.writeUInt32BE(size, 12); rtmpBuffer[16] = type; this.socket.write(rtmpBuffer); }; setChunkSize(size) { let rtmpBuffer = Buffer.from('02000000000004010000000000000000', 'hex'); rtmpBuffer.writeUInt32BE(size, 12); this.socket.write(rtmpBuffer); }; sendStreamStatus(st, id) { let rtmpBuffer = Buffer.from('020000000000060400000000000000000000', 'hex'); rtmpBuffer.writeUInt16BE(st, 12); rtmpBuffer.writeUInt32BE(id, 14); this.socket.write(rtmpBuffer); } sendInvokeMessage(sid, opt) { let packet = RtmpPacket.create(); packet.header.fmt = RTMP_CHUNK_TYPE_0; packet.header.cid = RTMP_CHANNEL_INVOKE; packet.header.type = RTMP_TYPE_INVOKE; packet.header.stream_id = sid; packet.payload = AMF.encodeAmf0Cmd(opt); packet.header.length = packet.payload.length; let chunks = this.rtmpChunksCreate(packet); this.socket.write(chunks); } sendDataMessage(opt, sid) { let packet = RtmpPacket.create(); packet.header.fmt = RTMP_CHUNK_TYPE_0; packet.header.cid = RTMP_CHANNEL_DATA; packet.header.type = RTMP_TYPE_DATA; packet.payload = AMF.encodeAmf0Data(opt); packet.header.length = packet.payload.length; packet.header.stream_id = sid; let chunks = this.rtmpChunksCreate(packet); this.socket.write(chunks); } sendStatusMessage(sid, level, code, description) { let opt = { cmd: 'onStatus', transId: 0, cmdObj: null, info: { level: level, code: code, description: description } }; this.sendInvokeMessage(sid, opt); } sendRtmpSampleAccess(sid) { let opt = { cmd: '|RtmpSampleAccess', bool1: false, bool2: false }; this.sendDataMessage(opt, sid); } sendPingRequest() { let currentTimestamp = Date.now() - this.startTimestamp; let packet = RtmpPacket.create(); packet.header.fmt = RTMP_CHUNK_TYPE_0; packet.header.cid = RTMP_CHANNEL_PROTOCOL; packet.header.type = RTMP_TYPE_EVENT; packet.header.timestamp = currentTimestamp; packet.payload = Buffer.from([0, 6, (currentTimestamp >> 24) & 0xff, (currentTimestamp >> 16) & 0xff, (currentTimestamp >> 8) & 0xff, currentTimestamp & 0xff]); packet.header.length = packet.payload.length; let chunks = this.rtmpChunksCreate(packet); this.socket.write(chunks); } respondConnect(tid) { let opt = { cmd: '_result', transId: tid, cmdObj: { fmsVer: 'FMS/3,0,1,123', capabilities: 31 }, info: { level: 'status', code: 'NetConnection.Connect.Success', description: 'Connection succeeded.', objectEncoding: this.objectEncoding } }; this.sendInvokeMessage(0, opt); } respondCreateStream(tid) { this.streams++; let opt = { cmd: "_result", transId: tid, cmdObj: null, info: this.streams }; this.sendInvokeMessage(0, opt); } respondPlay() { this.sendStreamStatus(STREAM_BEGIN, this.playStreamId); this.sendStatusMessage(this.playStreamId, 'status', 'NetStream.Play.Reset', 'Playing and resetting stream.'); this.sendStatusMessage(this.playStreamId, 'status', 'NetStream.Play.Start', 'Started playing stream.'); this.sendRtmpSampleAccess(); } onConnect(invokeMessage) { invokeMessage.cmdObj.app = invokeMessage.cmdObj.app.replace('/', ''); //fix jwplayer if (!this.isIPC) { context.nodeEvent.emit('preConnect', this.id, invokeMessage.cmdObj); } // this.eventEmitter.emit('authConnection', { // appname: this.appname, // _this: this // }); if (!this.isStarting) { return; } this.connectCmdObj = invokeMessage.cmdObj; this.appname = invokeMessage.cmdObj.app; this.objectEncoding = invokeMessage.cmdObj.objectEncoding != null ? invokeMessage.cmdObj.objectEncoding : 0; this.connectTime = new Date(); this.startTimestamp = Date.now(); this.pingInterval = setInterval(() => { this.sendPingRequest(); }, this.pingTime); this.sendWindowACK(5000000); this.setPeerBandwidth(5000000, 2); this.setChunkSize(this.outChunkSize); this.respondConnect(invokeMessage.transId); if (!this.isIPC) { // Logger.log(`[rtmp connect] id=${this.id} ip=${this.ip} app=${this.appname} args=${JSON.stringify(invokeMessage.cmdObj)}`); context.nodeEvent.emit('postConnect', this.id, invokeMessage.cmdObj); } } onCreateStream(invokeMessage) { this.respondCreateStream(invokeMessage.transId); } onPublish(invokeMessage) { if (typeof invokeMessage.streamName !== 'string') { return; } this.publishStreamPath = '/' + this.appname + '/' + invokeMessage.streamName.split('?')[0]; this.publishArgs = QueryString.parse(invokeMessage.streamName.split('?')[1]); this.publishStreamId = this.parserPacket.header.stream_id; if (!this.isIPC) { context.nodeEvent.emit('prePublish', this.id, this.publishStreamPath, this.publishArgs); } if (!this.isStarting) { return; } if (this.config.auth && this.config.auth.publish && !this.isLocal && !this.isIPC) { let results = NodeCoreUtils.verifyAuth(this.publishArgs.sign, this.publishStreamPath, this.config.auth.secret); if (!results) { // Logger.log(`[rtmp publish] Unauthorized. id=${this.id} streamPath=${this.publishStreamPath} streamId=${this.publishStreamId} sign=${this.publishArgs.sign} `); this.sendStatusMessage(this.publishStreamId, 'error', 'NetStream.publish.Unauthorized', 'Authorization required.'); return; } } if (context.publishers.has(this.publishStreamPath)) { // Logger.log(`[rtmp publish] Already has a stream. id=${this.id} streamPath=${this.publishStreamPath} streamId=${this.publishStreamId}`); this.sendStatusMessage(this.publishStreamId, 'error', 'NetStream.Publish.BadName', 'Stream already publishing'); } else if (this.isPublishing) { // Logger.log(`[rtmp publish] NetConnection is publishing. id=${this.id} streamPath=${this.publishStreamPath} streamId=${this.publishStreamId}`); this.sendStatusMessage(this.publishStreamId, 'error', 'NetStream.Publish.BadConnection', 'Connection already publishing'); } else { if (!this.isIPC) { // Logger.log(`[rtmp publish] New stream. id=${this.id} streamPath=${this.publishStreamPath} streamId=${this.publishStreamId}`); } context.publishers.set(this.publishStreamPath, this.id); this.isPublishing = true; this.sendStatusMessage(this.publishStreamId, 'status', 'NetStream.Publish.Start', `${this.publishStreamPath} is now published.`); for (let idlePlayerId of context.idlePlayers) { let idlePlayer = context.sessions.get(idlePlayerId); if (idlePlayer.playStreamPath === this.publishStreamPath) { idlePlayer.onStartPlay(); context.idlePlayers.delete(idlePlayerId); } } setTimeout(() => { this.publishArgs.ac = this.audioCodec; this.publishArgs.vc = this.videoCodec; if (!this.isIPC) { context.nodeEvent.emit('postPublish', this.id, this.publishStreamPath, this.publishArgs); } }, 1000);//TODO 只提交事件,不传音视频参数,由转码器自行分析 } } onPlay(invokeMessage) { if (typeof invokeMessage.streamName !== 'string') { return; } this.playStreamPath = '/' + this.appname + '/' + invokeMessage.streamName.split('?')[0]; this.playArgs = QueryString.parse(invokeMessage.streamName.split('?')[1]); this.playStreamId = this.parserPacket.header.stream_id; if (!this.isIPC) { context.nodeEvent.emit('prePlay', this.id, this.playStreamPath, this.playArgs); } if (!this.isStarting) { return; } if (this.config.auth && this.config.auth.play && !this.isLocal && !this.isIPC) { let results = NodeCoreUtils.verifyAuth(this.playArgs.sign, this.playStreamPath, this.config.auth.secret); if (!results) { // Logger.log(`[rtmp play] Unauthorized. id=${this.id} streamPath=${this.playStreamPath} streamId=${this.playStreamId} sign=${this.playArgs.sign}`); this.sendStatusMessage(this.playStreamId, 'error', 'NetStream.play.Unauthorized', 'Authorization required.'); return; } } if (this.isPlaying) { if (!this.isIPC) { // Logger.log(`[rtmp play] NetConnection is playing. id=${this.id} streamPath=${this.playStreamPath} streamId=${this.playStreamId} `); } this.sendStatusMessage(this.playStreamId, 'error', 'NetStream.Play.BadConnection', 'Connection already playing'); } else { this.respondPlay(); } if (context.publishers.has(this.playStreamPath)) { this.onStartPlay(); } else { // Logger.log(`[rtmp play] Stream not found. id=${this.id} streamPath=${this.playStreamPath} streamId=${this.playStreamId}`); this.isIdling = true; context.idlePlayers.add(this.id); } } onStartPlay() { let publisherId = context.publishers.get(this.playStreamPath); let publisher = context.sessions.get(publisherId); let players = publisher.players; players.add(this.id); if (publisher.metaData != null) { let packet = RtmpPacket.create(); packet.header.fmt = RTMP_CHUNK_TYPE_0; packet.header.cid = RTMP_CHANNEL_DATA; packet.header.type = RTMP_TYPE_DATA; packet.payload = publisher.metaData; packet.header.length = packet.payload.length; packet.header.stream_id = this.playStreamId; let chunks = this.rtmpChunksCreate(packet); this.socket.write(chunks); } if (publisher.audioCodec === 10) { let packet = RtmpPacket.create(); packet.header.fmt = RTMP_CHUNK_TYPE_0; packet.header.cid = RTMP_CHANNEL_AUDIO; packet.header.type = RTMP_TYPE_AUDIO; packet.payload = publisher.aacSequenceHeader; packet.header.length = packet.payload.length; packet.header.stream_id = this.playStreamId; let chunks = this.rtmpChunksCreate(packet); this.socket.write(chunks); } if (publisher.videoCodec === 7 || publisher.videoCodec === 12) { let packet = RtmpPacket.create(); packet.header.fmt = RTMP_CHUNK_TYPE_0; packet.header.cid = RTMP_CHANNEL_VIDEO; packet.header.type = RTMP_TYPE_VIDEO; packet.payload = publisher.avcSequenceHeader; packet.header.length = packet.payload.length; packet.header.stream_id = this.playStreamId; let chunks = this.rtmpChunksCreate(packet); this.socket.write(chunks); } if (publisher.rtmpGopCacheQueue != null) { for (let chunks of publisher.rtmpGopCacheQueue) { chunks.writeUInt32LE(this.playStreamId, 8); this.socket.write(chunks); } } this.isIdling = false; this.isPlaying = true; if (!this.isIPC) { context.nodeEvent.emit('postPlay', this.id, this.playStreamPath, this.playArgs); // Logger.log(`[rtmp play] Join stream. id=${this.id} streamPath=${this.playStreamPath} streamId=${this.playStreamId} `); } } onPause(invokeMessage) { this.isPause = invokeMessage.pause; let c = this.isPause ? 'NetStream.Pause.Notify' : 'NetStream.Unpause.Notify'; let d = this.isPause ? 'Paused live' : 'Unpaused live'; // Logger.log(`[rtmp play] ${d} stream. id=${this.id} streamPath=${this.playStreamPath} streamId=${this.playStreamId} `); if (!this.isPause) { this.sendStreamStatus(STREAM_BEGIN, this.playStreamId); if (context.publishers.has(this.playStreamPath)) { //fix ckplayer let publisherId = context.publishers.get(this.playStreamPath); let publisher = context.sessions.get(publisherId); let players = publisher.players; if (publisher.audioCodec === 10) { let packet = RtmpPacket.create(); packet.header.fmt = RTMP_CHUNK_TYPE_0; packet.header.cid = RTMP_CHANNEL_AUDIO; packet.header.type = RTMP_TYPE_AUDIO; packet.payload = publisher.aacSequenceHeader; packet.header.length = packet.payload.length; packet.header.stream_id = this.playStreamId; packet.header.timestamp = publisher.parserPacket.clock; // ?? 0 or clock let chunks = this.rtmpChunksCreate(packet); this.socket.write(chunks); } if (publisher.videoCodec === 7 || publisher.videoCodec === 12) { let packet = RtmpPacket.create(); packet.header.fmt = RTMP_CHUNK_TYPE_0; packet.header.cid = RTMP_CHANNEL_VIDEO; packet.header.type = RTMP_TYPE_VIDEO; packet.payload = publisher.avcSequenceHeader; packet.header.length = packet.payload.length; packet.header.stream_id = this.playStreamId; packet.header.timestamp = publisher.parserPacket.clock;// ?? 0 or clock let chunks = this.rtmpChunksCreate(packet); this.socket.write(chunks); } } } else { this.sendStreamStatus(STREAM_EOF, this.playStreamId); } this.sendStatusMessage(this.playStreamId, c, d); } onReceiveAudio(invokeMessage) { this.isReceiveAudio = invokeMessage.bool; // Logger.log(`[rtmp play] receiveAudio=${this.isReceiveAudio} id=${this.id} `); } onReceiveVideo(invokeMessage) { this.isReceiveVideo = invokeMessage.bool; // Logger.log(`[rtmp play] receiveVideo=${this.isReceiveVideo} id=${this.id} `); } onCloseStream() { //red5-publisher let closeStream = { streamId: this.parserPacket.header.stream_id }; this.onDeleteStream(closeStream); } onDeleteStream(invokeMessage) { if (invokeMessage.streamId == this.playStreamId) { if (this.isIdling) { context.idlePlayers.delete(this.id); this.isIdling = false; } else { let publisherId = context.publishers.get(this.playStreamPath); if (publisherId != null) { context.sessions.get(publisherId).players.delete(this.id); } if (!this.isIPC) { context.nodeEvent.emit('donePlay', this.id, this.playStreamPath, this.playArgs); } this.isPlaying = false; } if (!this.isIPC) { // Logger.log(`[rtmp play] Close stream. id=${this.id} streamPath=${this.playStreamPath} streamId=${this.playStreamId}`); } if (this.isStarting) { this.sendStatusMessage(this.playStreamId, 'status', 'NetStream.Play.Stop', 'Stopped playing stream.'); } this.playStreamId = 0; this.playStreamPath = ''; } if (invokeMessage.streamId == this.publishStreamId) { if (this.isPublishing) { if (!this.isIPC) { // Logger.log(`[rtmp publish] Close stream. id=${this.id} streamPath=${this.publishStreamPath} streamId=${this.publishStreamId}`); context.nodeEvent.emit('donePublish', this.id, this.publishStreamPath, this.publishArgs); } if (this.isStarting) { this.sendStatusMessage(this.publishStreamId, 'status', 'NetStream.Unpublish.Success', `${this.publishStreamPath} is now unpublished.`); } for (let playerId of this.players) { let player = context.sessions.get(playerId); if (player instanceof NodeRtmpSession) { player.sendStatusMessage(player.playStreamId, 'status', 'NetStream.Play.UnpublishNotify', 'stream is now unpublished.'); } else { player.stop(); } } //let the players to idlePlayers for (let playerId of this.players) { let player = context.sessions.get(playerId); context.idlePlayers.add(playerId); player.isPlaying = false; player.isIdling = true; if (player instanceof NodeRtmpSession) { player.sendStreamStatus(STREAM_EOF, player.playStreamId); } } context.publishers.delete(this.publishStreamPath); if (this.rtmpGopCacheQueue) { this.rtmpGopCacheQueue.clear(); } this.players.clear(); this.isPublishing = false; } this.publishStreamId = 0; this.publishStreamPath = ''; } } } module.exports = NodeRtmpSession;