const esl = require('modesl') ;
const assert = require('assert') ;
const delegate = require('delegates') ;
const Emitter = require('events').EventEmitter ;
const _ = require('lodash') ;
const only = require('only') ;
const async = require('async') ;
const generateUuid = require('uuid') ;
const Endpoint = require('./endpoint') ;
const Conference = require('./conference');
const fs = require('fs') ;
const path = require('path') ;
const net = require('net') ;
const moment = require('moment') ;
const debug = require('debug')('drachtio-fsmrf') ;
function wrapConnectionForTracing(ms, conn, tag) {
const origSend = conn.send ;
conn.send = (command, args) => {
let trace = command + '\n' ;
if (args) {
Object.keys(args).forEach(function(key) {
trace += (key + ': ' + args[key] + '\n');
});
trace += '\n';
}
ms._onRawSend(tag, trace) ;
return origSend.call(conn, command, args) ;
} ;
}
/**
* A freeswitch-based media-processing resource that contains Endpoints and Conferences.
* @constructor
* @param {esl.Connection} conn inbound connection to freeswitch event socket
* @param {Mrf} mrf media resource function that instantiated this MediaServer
* @param {object} app drachtio app
* @param {number} listenPort tcp port to listen on for outbound event socket connections
*
* @fires MediaServer#connect
* @fires MediaServer#ready
* @fires MediaServer#error
*/
class MediaServer extends Emitter {
constructor(conn, mrf, listenAddress, listenPort) {
super() ;
this._conn = conn ;
this._mrf = mrf ;
this._srf = mrf.srf ;
this.pendingConnections = new Map() ;
/**
* maximum number of active Endpoints allowed on the MediaServer
* @type {Number}
*/
this.maxSessions = 0 ;
/**
* current number of active Endpoints on the MediaServer
* @type {Number}
*/
this.currentSessions = 0 ;
/**
* current calls per second on the MediaServer
* @type {Number}
*/
this.cps = 0 ;
/**
* sip addresses and ports that the mediaserver is listening on
* note that different addresses may be used for ipv4, ipv6, dtls, or udp connections
* @type {Object}
*/
this.sip = {
ipv4: {
udp: {},
dtls: {}
},
ipv6: {
udp: {},
dtls: {}
}
} ;
if (mrf.debugDir) {
// trace all messages to a log file
const tag = 'mediaserver-' + conn.socket.address().address + '.txt' ;
this.logger = fs.createWriteStream(path.join(mrf.debugDir, tag)) ;
conn.on('esl::event::**', this._onRawRecv.bind(this, tag)) ;
wrapConnectionForTracing(this, conn, tag) ;
}
this._conn.subscribe(['HEARTBEAT']) ;
this._conn.on('esl::event::HEARTBEAT::*', this._onHeartbeat.bind(this)) ;
this._conn.on('error', this._onError.bind(this));
//create the server (outbound connections)
const server = net.createServer() ;
server.listen(listenPort, listenAddress, () => {
this.listenAddress = server.address().address;
this.listenPort = server.address().port ;
this._server = new esl.Server({server: server, myevents:false}, () => {
this.emit('connect') ;
// exec 'sofia status' on the freeswitch server to find out
// configuration information, including the sip address and port the media server is listening on
this._conn.api('sofia status', (res) => {
const status = res.getBody() ;
let re = /^\s*drachtio_mrf\s.*sip:mod_sofia@((?:[0-9]{1,3}\.){3}[0-9]{1,3}:\d+)/m ;
let results = re.exec(status) ;
if (null === results) throw new Error(`No drachtio_mrf sip profile found on the media server: ${status}`);
if (results) {
this.sip.ipv4.udp.address = results[1] ;
}
// see if we have a TLS endpoint (Note: it needs to come after the RTP one in the sofia status output)
re = /^\s*drachtio_mrf\s.*sip:mod_sofia@((?:[0-9]{1,3}\.){3}[0-9]{1,3}:\d+).*\(TLS\)/m ;
results = re.exec(status) ;
if (results) {
this.sip.ipv4.dtls.address = results[1] ;
}
// see if we have an ipv6 endpoint
re = /^\s*drachtio_mrf.*sip:mod_sofia@(\[[0-9a-f:]+\]:\d+)/m ;
results = re.exec(status) ;
if (results) {
this.sip.ipv6.udp.address = results[1] ;
}
// see if we have an ipv6 TLS endpoint
re = /^\s*drachtio_mrf.*sip:mod_sofia@(\[[0-9a-f:]+\]:\d+).*\(TLS\)/m ;
results = re.exec(status);
if (results) {
this.sip.ipv6.dtls.address = results[1] ;
}
debug('media server signaling addresses: %s', JSON.stringify(this.sip));
this.emit('ready') ;
});
});
this._server.on('connection::ready', this._onNewCall.bind(this)) ;
}) ;
}
get address() {
return this.conn.socket.remoteAddress ;
}
get conn() {
return this._conn ;
}
get srf() {
return this._srf;
}
/**
* disconnect from the media server
*/
disconnect() {
this._server.close() ;
this.conn.disconnect() ;
}
/**
* check if the media server has a specific capability
* @param {string} a named capability - ipv6, ipv4, dtls, or udp
* @return {Boolean} true if the media server supports this capability
*/
hasCapability(capability) {
let family = 'ipv4' ;
const cap = typeof capability === 'string' ? [capability] : capability ;
let idx = cap.indexOf('ipv6') ;
if (-1 !== idx) {
cap.splice(idx, 1) ;
family = 'ipv6' ;
}
else {
idx = cap.indexOf('ipv4') ;
if (-1 !== idx) {
cap.splice(idx, 1) ;
}
}
assert.ok(-1 !== ['dtls', 'udp'].indexOf(cap[0]), 'capability must be from the set ipv6, ipv4, dtls, udp') ;
return 'address' in this.sip[family][cap[0]] ;
}
/**
* send a freeswitch API command to the server
* @param {string} command command to execute
* @param {MediaServer~apiCallback} [callback] optional callback that returns api response
* @return {Promise|Mediaserver} returns a Promise if no callback supplied; otherwise
* a reference to the mediaserver object
*/
api(command, callback) {
assert(typeof command, 'string', '\'command\' must be a valid freeswitch api command') ;
const __x = (callback) => {
this.conn.api(command, (res) => {
callback(res.getBody()) ;
}) ;
};
if (callback) {
__x(callback) ;
return this ;
}
return new Promise((resolve) => {
__x((body) => {
resolve(body);
});
});
}
/**
* allocate an Endpoint on the MediaServer, optionally allocating a media session to stream to a
* remote far end SDP (session description protocol). If no far end SDP is provided, the endpoint
* is initially created in the inactive state.
* @param {MediaServer~EndpointOptions} [opts] - create options
* @param {MediaServer~createEndpointCallback} [callback] callback that provides error or Endpoint
* @return {Promise|Mediaserver} returns a Promise if no callback supplied; otherwise
* a reference to the mediaserver object
*/
createEndpoint(opts, callback) {
if (typeof opts === 'function') {
callback = opts ;
opts = {} ;
}
opts = opts || {} ;
opts.is3pcc = !opts.remoteSdp;
opts.dtls = opts.dtls ;
var family = opts.family || 'ipv4' ;
var proto = opts.dtls ? 'dtls' : 'udp';
const __x = (callback) => {
if (!this.connected()) {
return process.nextTick(() => { callback(new Error('too early: mediaserver is not connected')) ;}) ;
}
if (!this.sip[family][proto].address) {
return process.nextTick(() => { callback(new Error('too early: mediaserver is not ready')) ;}) ;
}
// generate a unique id to track the endpoint during creation
const uuid = generateUuid.v4() ;
const hasDtls = opts.dtls && this.hasCapability([family, 'dtls']);
const uri = `sip:drachtio@${this.sip[family][hasDtls ? 'dtls' : 'udp'].address}`;
let dialog, conn ;
debug(`MediaServer#createEndpoint: sending ${opts.is3pcc ? '3ppc' : ''} INVITE to uri ${uri} with id ${uuid}`);
this.srf.createUAC(uri, {
headers: {
'User-Agent': `drachtio-fsmrf:${uuid}`,
'X-esl-outbound': `${this.listenAddress}:${this.listenPort}`
},
localSdp: opts.remoteSdp
})
.then((dlg) => {
debug(`MediaServer#createEndpoint - createUAC produced dialog for ${uuid}`) ;
dialog = dlg ;
if (conn) produceEndpoint.bind(this)(conn, dialog);
return dlg ;
})
.catch((err) => {
debug(`MediaServer#createEndpoint - createUAC returned error for ${uuid}`) ;
this.pendingConnections.delete(uuid) ;
produceError(new Error(`{err.status} ${err.reason}`));
});
const connectionTimeout = setTimeout(() => {
delete this.pendingConnections.delete(uuid);
produceError(new Error('Connection timeout')) ;
debug(`MediaServer#createEndpoint - connection timeout for ${uuid}`) ;
}, 10000) ;
const connectCallback = (c) => {
debug(`MediaServer#createEndpoint - connectCallback invoked for ${uuid}`) ;
clearTimeout(connectionTimeout);
conn = c ;
if (!opts.is3pcc) conn.execute('answer');
if (dialog) produceEndpoint.bind(this)(conn, dialog) ;
} ;
const produceEndpoint = (conn, dialog) => {
debug(`MediaServer#createEndpoint - produceEndpoint for ${uuid}`);
const endpoint = new Endpoint(conn, dialog, this, opts);
endpoint.once('ready', () => {
debug(`MediaServer#createEndpoint - returning endpoint for uuid ${uuid}`);
callback(null, endpoint);
});
};
const produceError = (err) => {
callback(err);
};
this.pendingConnections.set(uuid, connectCallback.bind(this));
};
if (callback) {
__x(callback);
return this ;
}
return new Promise((resolve, reject) => {
__x((err, endpoint) => {
if (err) return reject(err);
resolve(endpoint);
});
});
}
/**
* connects an incoming call to the media server, producing both an Endpoint and a SIP Dialog upon success
* @param {Object} req - drachtio request object for incoming call
* @param {Object} res - drachtio response object for incoming call
* @param {MediaServer~EndpointOptions} [opts] - options for creating endpoint and responding to caller
* @param {MediaServer~connectCallerCallback} callback callback invoked on completion of operation
* @return {Promise|Mediaserver} returns a Promise if no callback supplied; otherwise
* a reference to the mediaserver object
*/
connectCaller(req, res, opts, callback) {
if (typeof opts === 'function') {
callback = opts ;
opts = {} ;
}
opts = opts || {} ;
const __x = (callback) => {
async.waterfall([
function createEndpoint(callback) {
this.createEndpoint({
remoteSdp: req.body,
codecs: opts.codecs
}, callback) ;
}.bind(this),
function respondToCaller(endpoint, callback) {
this.srf.createUAS(req, res, {
localSdp: endpoint.local.sdp,
headers: opts.headers
}, (err, dialog) => {
if (err) return callback(err);
callback(null, {endpoint, dialog}) ;
}) ;
}.bind(this)
], (err, pair) => {
callback(err, pair) ;
}) ;
};
if (callback) {
__x(callback);
return this ;
}
return new Promise((resolve, reject) => {
__x((err, pair) => {
if (err) return reject(err);
resolve(pair);
});
});
}
/**
* creates a conference on the media server.
* @param {String} [name] - conference name; if not supplied a unique name will be generated
* @param {MediaServer~conferenceCreateOptions} [opts] - conference-level configuration options
* @param {MediaServer~createConferenceCallback} [callback] - callback invoked when conference is created
* @return {Promise|Mediaserver} returns a Promise if no callback supplied; otherwise
* a reference to the mediaserver object
*/
createConference(name, opts, callback) {
if (typeof name !== 'string') {
callback = opts;
opts = name;
name = `anon-${generateUuid.v4()}`;
}
if (typeof opts === 'function') {
callback = opts ;
opts = {} ;
}
opts = opts || {} ;
assert.equal(typeof name, 'string', '\'name\' is a required parameter') ;
assert.ok(typeof opts === 'object', 'opts param must be an object') ;
const __x = (callback) => {
/* Steps for creating a conference:
(1) Check to see if a conference of that name already exists - return error if so
(2) Create the conference control leg (endpoint)
(3) Create the conference
*/
async.waterfall([
(callback) => { // verify conference does not already exist
this.api(`conference ${name} list count`, (result) => {
debug(`return from conference list: ${result}`) ;
if (typeof result === 'string' &&
(result.match(/^No active conferences/) || result.match(/^Conference.*not found/))) {
return callback(null) ;
}
//callback(null) ;
callback(new Error('conference exists')) ;
});
},
(callback) => { // create an endpoint on the media server
this.createEndpoint((err, endpoint) => {
if (err) return callback(err);
callback(null, endpoint) ;
});
},
(endpoint, callback) => { // create the conference on Freeswitch using the control leg
opts.flags = Object.assign({}, opts.flags, {
endconf: true,
mute: true,
vmute: true
});
endpoint.join(name, opts, (err, {memberId, confUuid}) => {
callback(err, endpoint, confUuid);
});
}
], (err, endpoint, confUuid) => {
if (err) {
return callback(err);
}
const conference = new Conference(name, confUuid, endpoint, opts);
debug(`MediaServer#createConference: created conference ${name}:${confUuid}`) ;
callback(null, conference) ;
}) ;
};
if (callback) {
__x(callback);
return this ;
}
return new Promise((resolve, reject) => {
__x((err, conference) => {
if (err) return reject(err);
resolve(conference);
});
});
}
toJSON() {
return only(this, 'sip maxSessions currentSessions cps cpuIdle fsVersion hostname v4address pendingConnections') ;
}
_onError(err) {
this.emit('error', err);
}
_onHeartbeat(evt) {
this.maxSessions = parseInt(evt.getHeader('Max-Sessions')) ;
this.currentSessions = parseInt(evt.getHeader('Session-Count')) ;
this.cps = parseInt(evt.getHeader('Session-Per-Sec')) ;
this.hostname = evt.getHeader('FreeSWITCH-Hostname') ;
this.v4address = evt.getHeader('FreeSWITCH-IPv4') ;
this.v6address = evt.getHeader('FreeSWITCH-IPv6') ;
this.fsVersion = evt.getHeader('FreeSWITCH-Version') ;
this.cpuIdle = parseFloat(evt.getHeader('Idle-CPU')) ;
}
_onCreateTimeout(uuid) {
if (!(uuid in this.pendingConnections)) {
console.error(`MediaServer#_onCreateTimeout: uuid not found: ${uuid}`) ;
return ;
}
const obj = this.pendingConnections[uuid] ;
obj.callback(new Error('Connection timeout')) ;
clearTimeout(obj.createTimeout) ;
delete this.pendingConnections[uuid] ;
console.log(`createEndpoint ${uuid} timed out; after removing there are ${_.keys(this.pendingConnections).length}
Endpoints in pending create state`) ;
}
_onNewCall(conn, id) {
if (this.logger) {
// debug trace, if enabled
const tag = conn.getInfo().getHeader('Channel-Unique-ID') ;
conn.on('esl::event::**', this._onRawRecv.bind(this, tag)) ;
wrapConnectionForTracing(this, conn, tag) ;
}
const userAgent = conn.getInfo().getHeader('variable_sip_user_agent') ;
const re = /^drachtio-fsmrf:(.+)$/ ;
const results = re.exec(userAgent) ;
if (null === results) {
console.error(`received INVITE without drachtio-fsmrf header, unexpected User-Agent: ${userAgent}`) ;
return conn.execute('hangup', 'NO_ROUTE_DESTINATION') ;
}
const uuid = results[1] ;
if (!uuid || !this.pendingConnections.has(uuid)) {
console.error(`received INVITE with unknown uuid: ${uuid}`) ;
return conn.execute('hangup', 'NO_ROUTE_DESTINATION') ;
}
debug(`MediaServer#_onNewCall: ${this.address} received new call with tracking uuid: ${uuid}`) ;
const f = this.pendingConnections.get(uuid);
this.pendingConnections.delete(uuid);
f(conn);
}
_onRawRecv(tag, event, headers, body) {
const obj = _.find(event.headers, (hdr) => { return hdr.name === 'Event-Name'; }) ;
if (obj && -1 !== ['RE_SCHEDULE', 'HEARTBEAT'].indexOf(obj.value)) {
this.logger.write(`\n....skipping ${obj.value}....`) ;
return ;
}
this.logger.write(`\n${moment().format('YYYY-MM-DD:HH:mm:ss')}: RECEIVING ${tag}\n`) ;
this.logger.write(event.serialize()) ;
this.logger.write('\n') ;
}
_onRawSend(tag, data) {
this.logger.write(`\n${moment().format('YYYY-MM-DD:HH:mm:ss')}: SENDING ${tag}\n`) ;
this.logger.write(data) ;
this.logger.write('\n') ;
}
}
/**
* This callback provides the response to an attempt to create an Endpoint on the MediaServer.
* @callback MediaServer~createEndpointCallback
* @param {Error} error encountered while attempting to create the endpoint
* @param {Endpoint} endpoint that was created
*/
/**
* This callback provides the response to an attempt to create a Conference on the MediaServer.
* @callback MediaServer~createConferenceCallback
* @param {Error} error encountered while attempting to create the conference
* @param {Conference} conference that was created
*/
/**
/**
* This callback provides the response to an api request
* @callback Mrf~apiCallback
* @param {string} response - body of the response from freeswitch
*/
/**
* This callback provides the response to an attempt connect a caller to the MediaServer.
* @callback MediaServer~connectCallerCallback
* @param {Error} err - error encountered while attempting to create the endpoint
* @param {Endpoint} ep - endpoint that was created
* @param {Dialog} dialog - sip dialog that was created
*/
/** returns true if the MediaServer is in the 'connected' state
* @name MediaServer#connected
* @method
*/
delegate(MediaServer.prototype, '_conn')
.method('connected') ;
/**
* Options governing the creation of a conference
* @typedef {Object} MediaServer~conferenceCreateOptions
* @property {string} [pin] entry pin for the conference
* @property {string} [profile=default] conference profile to use
* @property {Object} [flags] parameters governing the connection of the endpoint to the conference
* @property {boolean} [flags.waitMod=false] Members will wait (with music) until a member with the 'moderator' flag
* set enters the conference
* @property {boolean} [flags.audioAlways=false] Do not use energy detection to choose which participants to mix;
* instead always mix audio from all members
* @property {boolean} [flags.videoBridgeFirstTwo=false] In mux mode, If there are only 2 people in conference,
* you will see only the other member
* @property {boolean} [flags.videoMuxingPersonalCanvas=false] In mux mode, each member will get their own canvas
* and they will not see themselves
* @property {boolean} [flags.videoRequiredForCanvas=false] Only video participants will be shown
* on the canvas (no avatars)
*/
/**
* Arguments provided when creating an Endpoint on a MediaServer
* @typedef {Object} MediaServer~EndpointOptions
* @property {String} [remoteSdp] remote session description protocol
* (if not provided, an initially inactive Endpoint will be created)
* @property {String[]} [codecs] - array of codecs, in preferred order (e.g. ['PCMU','G722','PCMA'])
*/
/**
* connect event triggered when connection is made to the freeswitch media server.
* @event MediaServer#connect
*/
/**
* ready event triggered after connecting to the server and verifying
* it is properly configured and ready to accept calls.
* @event MediaServer#ready
*/
/**
* Error event triggered when connection to freeswitch media server fails.
*
* @event MediaServer#error
* @type {object}
* @property {String} message - Indicates the reason the connection failed
*/
module.exports = exports = MediaServer ;