const Emitter = require('events').EventEmitter ;
const assert = require('assert') ;
const only = require('only') ;
const _ = require('lodash') ;
const async = require('async') ;
const debug = require('debug')('drachtio-fsmrf') ;
const State = {
NOT_CREATED: 1,
CREATED: 2,
DESTROYED: 3
};
function unhandled(evt) {
debug(`unhandled conference event: ${evt.getHeader('Action')}`) ;
}
/**
* An audio or video conference mixer. Conferences may be created on the fly by simply joining an endpoint
* to a named conference without explicitly creating a Conference object. The main purpose of the Conference
* object is to enable the ability to create a conference on the media server without having an inbound call
* (e.g., to create a scheduled conference at a particular point in time).
*
* Note: This constructor should not be called directly: rather, call MediaServer#createConference
* to create an instance of an Endpoint on a MediaServer
* @constructor
* @param {String} name conference name
* @param {String} uuid conference uuid
* @param {Endpoint} endpoint - endpoint that provides the control connection for the conference
* @param {Conference~createOptions} [opts] - conference-level configuration options
*/
class Conference extends Emitter {
constructor(name, uuid, endpoint, opts) {
super() ;
debug('Conference#ctor');
opts = opts || {} ;
this._endpoint = endpoint ;
/**
* conference name
* @type {string}
*/
this.name = name ;
/**
* conference unique id
* @type {string}
*/
this.uuid = uuid ;
/**
* file that conference is currently being recorded to
* @type {String}
*/
this.recordFile = null ;
/**
* conference state
* @type {Number}
*/
this.state = State.CREATED ;
/**
* true if conference is locked
* @type {Boolean}
*/
this.locked = false ;
/**
* member ID of the conference control leg
* @type {Number}
*/
this.memberId = this.endpoint.conf.memberId ;
/**
* current participants in the conference, keyed by member ID
* @type {Map}
*/
this.participants = new Map() ;
/**
* max number of members allowed (-1 means no limit)
* @type {Number}
*/
this.maxMembers = -1 ;
// used to track play commands in progress
this._playCommands = {} ;
this.endpoint.filter('Conference-Unique-ID', this.uuid);
this.endpoint.conn.on('esl::event::CUSTOM::*', this.__onConferenceEvent.bind(this)) ;
if (opts.maxMembers) {
this.endpoint.api('conference', `${name} set max_members ${opts.maxMembers}`) ;
this.maxMembers = opts.maxMembers ;
}
}
get endpoint() {
return this._endpoint ;
}
get mediaserver() {
return this.endpoint.mediaserver ;
}
/**
* destroy the conference, releasing all legs
* @param {Conference~operationCallback} callback - callback invoked when conference has been destroyed
* @return {Promise|Conference} returns a Promise if no callback supplied; otherwise
* a reference to the Conference object
*/
destroy(callback) {
debug(`Conference#destroy - destroying conference ${this.name}`);
const __x = (callback) => {
this.endpoint.destroy(callback) ;
};
if (callback) {
__x(callback) ;
return this ;
}
return new Promise((resolve, reject) => {
__x((err) => {
if (err) return reject(err);
resolve();
});
});
}
/**
* retrieve the current number of participants in the conference
* @return {Promise} promise that resolves with the count of participants (including control leg)
*/
getSize() {
return this.list('count')
.then((evt) => {
try {
return parseInt(evt.getBody()) ;
} catch (err) {
throw new Error(`unexpected (non-integer) response to conference list summary: ${err}`);
}
});
}
/**
* get a conference parameter value
* @param {String} param - parameter to retrieve
* @param {Conference~mediaOperationCallback} [callback] - callback invoked when operation completes
* @return {Promise|Conference} returns a Promise if no callback supplied; otherwise
* a reference to the Conference object
*/
get(...args) { return confOperations.get(this, ...args); }
/**
* set a conference parameter value
* @param {String} param - parameter to set
* @param {String} value - value
* @param {Conference~mediaOperationCallback} [callback] - callback invoked when operation completes
* @return {Promise|Conference} returns a Promise if no callback supplied; otherwise
* a reference to the Conference object
*/
set(...args) { return confOperations.set(this, ...args); }
/**
* adjust the automatic gain control for the conference
* @param {Number|String} level - 'on', 'off', or a numeric level
* @param {Conference~mediaOperationCallback} [callback] - callback invoked when operation completes
* @return {Promise|Conference} returns a Promise if no callback supplied; otherwise
* a reference to the Conference object
*/
agc(...args) { return confOperations.agc(this, ...args); }
/**
* check the status of the conference recording
* @param {Conference~mediaOperationsCallback} [callback] - callback invoked when media operations completes
* @return {Promise|Conference} returns a Promise if no callback supplied; otherwise
* a reference to the Conference object
*/
chkRecord(...args) { return confOperations.chkRecord(this, ...args); }
/**
* deaf all the non-moderators in the conference
* @param {Conference~mediaOperationsCallback} [callback] - callback invoked when media operations completes
* @return {Promise|Conference} returns a Promise if no callback supplied; otherwise
* a reference to the Conference object
*/
deaf(...args) { return confOperations.deaf(this, ...args); }
/**
* undeaf all the non-moderators in the conference
* @param {Conference~mediaOperationsCallback} [callback] - callback invoked when media operations completes
* @return {Promise|Conference} returns a Promise if no callback supplied; otherwise
* a reference to the Conference object
*/
undeaf(...args) { return confOperations.undeaf(this, ...args); }
/**
* mute all the non-moderators in the conference
* @param {Conference~mediaOperationsCallback} [cb] - callback invoked when media operations completes
* @return {Promise|Conference} returns a Promise if no callback supplied; otherwise
* a reference to the Conference object
*/
mute(...args) { return confOperations.mute(this, ...args); }
/**
* unmute all the non-moderators in the conference
* @param {Conference~mediaOperationsCallback} [cb] - callback invoked when media operations completes
* @return {Promise|Conference} returns a Promise if no callback supplied; otherwise
* a reference to the Conference object
*/
unmute(...args) { return confOperations.unmute(this, ...args); }
/**
* lock the conference
* @param {Conference~mediaOperationsCallback} [callback] - callback invoked when media operations completes
* @return {Promise|Conference} returns a Promise if no callback supplied; otherwise
* a reference to the Conference object
*/
lock(...args) { return confOperations.lock(this, ...args); }
/**
* unlock the conference
* @param {Conference~mediaOperationsCallback} [cb] - callback invoked when media operations completes
* @return {Promise|Conference} returns a Promise if no callback supplied; otherwise
* a reference to the Conference object
*/
unlock(...args) { return confOperations.unlock(this, ...args); }
/**
* list members
* @param {Conference~mediaOperationsCallback} [cb] - callback invoked when media operations completes
* @return {Promise|Conference} returns a Promise if no callback supplied; otherwise
* a reference to the Conference object
*/
list(...args) { return confOperations.list(this, ...args); }
/**
* start recording the conference
* @param {String} file - filepath to record to
* @param {Conference~mediaOperationsCallback} [callback] - callback invoked when media operations completes
* @return {Promise|Conference} returns a Promise if no callback supplied; otherwise
* a reference to the Conference object
*/
startRecording(file, callback) {
assert.ok(typeof file === 'string', '\'file\' parameter must be provided') ;
const __x = (callback) => {
this.recordFile = file ;
this.endpoint.api('conference ', `${this.name} recording start ${file}`, (err, evt) => {
if (err) return callback(err, evt);
const body = evt.getBody() ;
const regexp = new RegExp(`^Record file ${file}\n$`);
if (regexp.test(body)) {
return callback(null, body);
}
callback(new Error(body));
}) ;
};
if (callback) {
__x(callback) ;
return this ;
}
return new Promise((resolve, reject) => {
__x((err, ...results) => {
if (err) return reject(err);
resolve(...results);
});
});
}
/**
* pause the recording
* @param {Conference~mediaOperationsCallback} [callback] - callback invoked when media operations completes
* @return {Promise|Conference} returns a Promise if no callback supplied; otherwise
* a reference to the Conference object
*/
pauseRecording(file, callback) {
const __x = (callback) => {
this.recordFile = file ;
this.endpoint.api('conference ', `${this.name} recording pause ${this.recordFile}`, (err, evt) => {
if (err) return callback(err, evt);
const body = evt.getBody() ;
const regexp = new RegExp(`^Pause recording file ${file}\n$`);
if (regexp.test(body)) {
return callback(null, body);
}
callback(new Error(body));
}) ;
};
if (callback) {
__x(callback) ;
return this ;
}
return new Promise((resolve, reject) => {
__x((err, ...results) => {
if (err) return reject(err);
resolve(...results);
});
});
}
/**
* resume the recording
* @param {Conference~mediaOperationsCallback} [callback] - callback invoked when media operations completes
* @return {Promise|Conference} returns a Promise if no callback supplied; otherwise
* a reference to the Conference object
*/
resumeRecording(file, callback) {
const __x = (callback) => {
this.recordFile = file ;
this.endpoint.api('conference ', `${this.name} recording resume ${this.recordFile}`, (err, evt) => {
if (err) return callback(err, evt);
const body = evt.getBody() ;
const regexp = new RegExp(`^Resume recording file ${file}\n$`);
if (regexp.test(body)) {
return callback(null, body);
}
callback(new Error(body));
});
};
if (callback) {
__x(callback) ;
return this ;
}
return new Promise((resolve, reject) => {
__x((err, ...results) => {
if (err) return reject(err);
resolve(...results);
});
});
}
/**
* stop the conference recording
* @param {Conference~mediaOperationsCallback} [callback] - callback invoked when media operations completes
* @return {Promise|Conference} returns a Promise if no callback supplied; otherwise
* a reference to the Conference object
*/
stopRecording(file, callback) {
const __x = (callback) => {
this.endpoint.api('conference ', `${this.name} recording stop ${this.recordFile}`, (err, evt) => {
if (err) return callback(err, evt);
const body = evt.getBody() ;
const regexp = new RegExp(`^Stopped recording file ${file}`);
if (regexp.test(body)) {
return callback(null, body);
}
callback(new Error(body));
}) ;
this.recordFile = null ;
};
if (callback) {
__x(callback) ;
return this ;
}
return new Promise((resolve, reject) => {
__x((err, ...results) => {
if (err) return reject(err);
resolve(...results);
});
});
}
/**
* play an audio file or files into the conference
* @param {string|Array} file file (or array of files) to play
* @param {Conference~playOperationCallback} [callback] - callback invoked when the files have completed playing
* @return {Promise|Conference} returns a Promise if no callback supplied; otherwise
* a reference to the Conference object
*/
play(file, callback) {
assert.ok('string' === typeof file || _.isArray(file),
'file param is required and must be a string or array') ;
const __x = (callback) => {
const files = typeof file === 'string' ? [file] : file ;
// each call to conference play queues the file up;
// i.e. the callback returns immediately upon successful queueing,
// not when the file has finished playing
const queued = [] ;
async.eachSeries(files, (f, callback) => {
this.endpoint.api('conference', `${this.name} play ${f}`, (err, result) => {
if (err) return callback(err);
if (result && result.body && -1 !== result.body.indexOf(' not found.')) {
debug(`file ${f} was not queued because it was not found, or conference is empty`);
}
else {
queued.push(f) ;
}
callback(null) ;
}) ;
}, () => {
debug(`files have been queued for playback into conference: ${queued}`) ;
if (queued.length > 0) {
const firstFile = queued[0] ;
const obj = {
remainingFiles: queued.slice(1),
seconds: 0,
milliseconds: 0,
samples: 0,
done: callback
};
this._playCommands[firstFile] = this._playCommands[firstFile] || [] ;
this._playCommands[firstFile].push(obj) ;
}
else {
// no files actually got queued, so execute the callback
debug('Conference#play: no files were queued for callback, so invoking callback immediately') ;
callback(null, {
seconds: 0,
milliseconds: 0,
samples: 0
}) ;
}
}) ;
};
if (callback) {
__x(callback) ;
return this ;
}
return new Promise((resolve, reject) => {
__x((err, ...results) => {
if (err) return reject(err);
resolve(...results);
});
});
}
_onAddMember(evt) {
debug(`Conference#_onAddMember: ${JSON.stringify(this)}`) ;
const size = parseInt(evt.getHeader('Conference-Size')); //includes control leg
const newMemberId = parseInt(evt.getHeader('Member-ID')) ;
const memberType = evt.getHeader('Member-Type') ;
const memberGhost = evt.getHeader('Member-Ghost') ;
const channelUuid = evt.getHeader('Channel-Call-UUID') ;
const obj = {
memberId: newMemberId,
type: memberType,
ghost: memberGhost,
channelUuid: channelUuid
} ;
this.participants.set(newMemberId, obj) ;
debug(`Conference#_onAddMember: added member ${newMemberId} to ${this.name} size is ${size}`) ;
}
_onDelMember(evt) {
const memberId = parseInt(evt.getHeader('Member-ID')) ;
const size = parseInt(evt.getHeader('Conference-Size')); // includes control leg
this.participants.delete(memberId) ;
debug(`Conference#_onDelMember: removed member ${memberId} from ${this.name} size is ${size}`) ;
}
_onStartTalking(evt) {
debug(`Conf ${this.name}:${this.uuid} member ${evt.getHeader('Member-ID')} started talking`) ;
}
_onStopTalking(evt) {
debug(`Conf ${this.name}:${this.uuid} member ${evt.getHeader('Member-ID')} stopped talking`) ;
}
_onMuteDetect(evt) {
debug(`Conf ${this.name}:${this.uuid} muted member ${evt.getHeader('Member-ID')} is talking`) ;
}
_onUnmuteMember(evt) {
debug(`Conf ${this.name}:${this.uuid} member ${evt.getHeader('Member-ID')} has been unmuted`) ;
}
_onMuteMember(evt) {
debug(`Conf ${this.name}:${this.uuid} member ${evt.getHeader('Member-ID')} has been muted`) ;
}
_onKickMember(evt) {
debug(`Conf ${this.name}:${this.uuid} member ${evt.getHeader('Member-ID')} has been kicked`) ;
}
_onDtmfMember(evt) {
debug(`Conf ${this.name}:${this.uuid} member ${evt.getHeader('Member-ID')} has entered DTMF`) ;
}
_onStartRecording(evt) {
debug(`Conference#_onStartRecording: ${this.name}:${this.uuid} ${JSON.stringify(evt)}`);
const err = evt.getHeader('Error');
if (err) {
const path = evt.getHeader('Path');
console.log(`Conference#_onStartRecording: failed to start recording to ${path}: ${err}`);
}
}
_onStopRecording(evt) {
debug(`Conference#_onStopRecording: ${this.name}:${this.uuid} ${JSON.stringify(evt)}`);
}
_onPlayFile(evt) {
const confName = evt.getHeader('Conference-Name') ;
const file = evt.getHeader('File') ;
debug(`conference-level play has started: ${confName}: ${file}`);
}
_onPlayFileMember(evt) {
debug(`member-level play for member ${evt.getHeader('Member-ID')} has completed`) ;
}
_onPlayFileDone(evt) {
const confName = evt.getHeader('Conference-Name') ;
const file = evt.getHeader('File') ;
const seconds = parseInt(evt.getHeader('seconds')) ;
const milliseconds = parseInt(evt.getHeader('milliseconds')) ;
const samples = parseInt(evt.getHeader('samples')) ;
debug(`conference-level play has completed: ${confName}: ${file}
${seconds} seconds, ${milliseconds} milliseconds, ${samples} samples`);
// check if the caller registered a callback for this play done
const el = this._playCommands[file] ;
if (el) {
assert(_.isArray(el), 'Conference#onPlayFileDone: this._playCommands must be an array') ;
const obj = el[0] ;
obj.seconds += seconds ;
obj.milliseconds += milliseconds ;
obj.samples += samples ;
if (0 === obj.remainingFiles.length) {
// done playing all files in this request
obj.done(null, {
seconds: obj.seconds,
milliseconds: obj.milliseconds,
samples: obj.samples
}) ;
}
else {
const firstFile = obj.remainingFiles[0] ;
obj.remainingFiles = obj.remainingFiles.slice(1) ;
this._playCommands[firstFile] = this._playCommands[firstFile] || [] ;
this._playCommands[firstFile].push(obj) ;
}
this._playCommands[file] = this._playCommands[file].slice(1) ;
if (0 === this._playCommands[file].length) {
//done with all queued requests for this file
delete this._playCommands[file] ;
}
}
}
_onLock(evt) {
debug(`conference has been locked: ${JSON.stringify(evt)}`) ;
}
_onUnlock(evt) {
debug(`conference has been unlocked: ${JSON.stringify(evt)}`) ;
}
_onTransfer(evt) {
debug(`member has been transferred to another conference: ${JSON.stringify(evt)}`) ;
}
_onRecord(evt) {
debug(`conference record has started or stopped: ${evt}`) ;
}
__onConferenceEvent(evt) {
const eventName = evt.getHeader('Event-Subclass') ;
if (eventName === 'conference::maintenance') {
const action = evt.getHeader('Action') ;
debug(`Conference#__onConferenceEvent: conference event action: ${action}`) ;
//invoke a handler for this action, if we have defined one
(Conference.prototype[`_on${_.capitalize(_.camelCase(action))}`] || unhandled).bind(this, evt)() ;
}
else {
debug(`Conference#__onConferenceEvent: got unhandled custom event: ${eventName}`) ;
}
}
toJSON() {
return only(this, 'name state uuid memberId confConn endpoint maxMembers locked recordFile') ;
}
toString() {
return this.toJSON().toString() ;
}
}
/**
* This callback is invoked whenever any media command has completed
* @callback Conference~mediaOperationCallback
* @param {Error} err error returned, if any
* @param {String} response - response to the command
*/
/**
* This callback is invoked when a command has completed
* @callback Conference~operationCallback
* @param {Error} err error returned, if any
*/
/**
* This callback is invoked when a playback to conference command completes with a play done event of the final file.
* @callback Conference~playOperationCallback
* @param {Error} err error returned, if any
* @param {Conference~playbackResults} [results] - results describing the duration of media played
*/
/**
* This object describes the options when creating a conference
* @typedef {Object} Conference~createOptions
* @property {number} maxMembers - maximum number of members to allow in the conference
*/
/**
* This object describes the results of a playback into conference operation
* @typedef {Object} Conference~playbackResults
* @property {number} seconds - total seconds of media played
* @property {number} milliseconds - total milliseconds of media played
* @property {number} samples - total number of samples played
*/
exports = module.exports = Conference ;
const confOperations = {} ;
// conference unary operations
['agc', 'list', 'lock', 'unlock', 'mute', 'deaf', 'unmute', 'undeaf', 'chkRecord'].forEach((op) => {
confOperations[op] = (conference, args, callback) => {
assert(conference instanceof Conference);
if (typeof args === 'function') {
callback = args ;
args = '' ;
}
args = args || '';
if (Array.isArray(args)) args = args.join(' ');
debug(`Conference#${_.startCase(op)} conference ${conference.name} ${args}`);
const __x = (callback) => {
conference.endpoint.api('conference', `${conference.name} ${op} ${args}`, (err, evt) => {
if (err) return callback(err, evt);
const body = evt.getBody() ;
if (-1 !== ['lock', 'unlock', 'mute', 'deaf', 'unmute', 'undeaf'].indexOf(op)) {
if (/^OK\s+/.test(body)) return callback(err, body);
return callback(new Error(body));
}
return callback(err, evt);
}) ;
};
if (callback) {
__x(callback) ;
return this ;
}
return new Promise((resolve, reject) => {
__x((err, result) => {
if (err) return reject(err);
resolve(result);
});
});
};
});
confOperations.set = (conference, param, value, callback) => {
assert(conference instanceof Conference);
debug(`Conference#setParam: conference ${conference.name} set ${param} ${value}`);
const __x = (callback) => {
conference.endpoint.api('conference', `${conference.name} set ${param} ${value}`, (err, evt) => {
if (err) return callback(err, evt);
const body = evt.getBody() ;
return callback(err, body);
}) ;
};
if (callback) {
__x(callback) ;
return this ;
}
return new Promise((resolve, reject) => {
__x((err, result) => {
if (err) return reject(err);
resolve(result);
});
});
};
confOperations.get = (conference, param, value, callback) => {
assert(conference instanceof Conference);
debug(`Conference#getParam: conference ${conference.name} get ${param} ${value}`);
const __x = (callback) => {
conference.endpoint.api('conference', `${conference.name} get ${param}`, (err, evt) => {
if (err) return callback(err, evt);
const body = evt.getBody() ;
const res = /^\d+$/.test(body) ? parseInt(body) : body;
return callback(err, res);
}) ;
};
if (callback) {
__x(callback) ;
return this ;
}
return new Promise((resolve, reject) => {
__x((err, result) => {
if (err) return reject(err);
resolve(result);
});
});
};