Skip to content
Snippets Groups Projects
peer.js 10.2 KiB
Newer Older
lazorfuzz's avatar
lazorfuzz committed
import PeerConnection from 'rtcpeerconnection';
import WildEmitter from 'wildemitter';
import FileTransfer from 'filetransfer';
import webrtcSupport from './webrtcsupport';
import MessagePack from 'what-the-pack';

const { encode, decode } = MessagePack.initialize(2**22);
lazorfuzz's avatar
lazorfuzz committed

function isAllTracksEnded(stream) {
  let isAllTracksEnded = true;
  stream.getTracks().forEach((t) => {
    isAllTracksEnded = t.readyState === 'ended' && isAllTracksEnded;
  });
  return isAllTracksEnded;
}

const protoSend = RTCDataChannel.prototype.send;
RTCDataChannel.prototype.send = function (data) {
lazorfuzz's avatar
lazorfuzz committed
class Peer extends WildEmitter {
lazorfuzz's avatar
lazorfuzz committed
  constructor(options) {
lazorfuzz's avatar
lazorfuzz committed
    super();
lazorfuzz's avatar
lazorfuzz committed
    const self = this;
    this.id = options.id;
    this.parent = options.parent;
    this.type = options.type || 'video';
    this.oneway = options.oneway || false;
    this.sharemyscreen = options.sharemyscreen || false;
    this.browserPrefix = options.prefix;
    this.stream = options.stream;
    this.enableDataChannels = options.enableDataChannels === undefined ? this.parent.config.enableDataChannels : options.enableDataChannels;
    this.receiveMedia = options.receiveMedia || this.parent.config.receiveMedia;
    this.channels = {};
    this.sid = options.sid || Date.now().toString();
    // Create an RTCPeerConnection via the polyfill
    this.pc = new PeerConnection(Object.assign({}, this.parent.config.peerConnectionConfig,
                                               { iceServers: options.iceServers || this.parent.config.peerConnectionConfig.iceServers }),
                                 this.parent.config.peerConnectionConstraints);
lazorfuzz's avatar
lazorfuzz committed
    this.pc.on('ice', this.onIceCandidate.bind(this));
    this.pc.on('endOfCandidates', (event) => {
      self.send('endOfCandidates', event);
    });
    this.pc.on('offer', (offer) => {
      if (self.parent.config.nick) offer.nick = self.parent.config.nick;
      self.send('offer', offer);
    });
    this.pc.on('answer', (answer) => {
      if (self.parent.config.nick) answer.nick = self.parent.config.nick;
      self.send('answer', answer);
    });
    this.pc.on('addStream', this.handleRemoteStreamAdded.bind(this));
    this.pc.on('addChannel', this.handleDataChannelAdded.bind(this));
    this.pc.on('removeStream', this.handleStreamRemoved.bind(this));
    // Just fire negotiation needed events for now
    // When browser re-negotiation handling seems to work
    // we can use this as the trigger for starting the offer/answer process
    // automatically. We'll just leave it be for now while this stabalizes.
lazorfuzz's avatar
lazorfuzz committed
    this.pc.on('negotiationNeeded', this.emit.bind(this, 'negotiationNeeded'));
    this.pc.on('iceConnectionStateChange', this.emit.bind(this, 'iceConnectionStateChange'));
    this.pc.on('iceConnectionStateChange', () => {
      switch (self.pc.iceConnectionState) {
        case 'failed':
          // currently, in chrome only the initiator goes to failed
          // so we need to signal this to the peer
lazorfuzz's avatar
lazorfuzz committed
          if (self.pc.pc.localDescription.type === 'offer') {
            self.parent.emit('iceFailed', self);
            self.send('connectivityError');
          }
          break;
        case 'closed':
          this.handleStreamRemoved(false);
          break;
lazorfuzz's avatar
lazorfuzz committed
        default:
          break;
lazorfuzz's avatar
lazorfuzz committed
      }
    });
    this.pc.on('signalingStateChange', this.emit.bind(this, 'signalingStateChange'));
    this.logger = this.parent.logger;

    this.parent.localStreams.forEach((stream) => {
      self.pc.addStream(stream);
    });
lazorfuzz's avatar
lazorfuzz committed

    this.on('channelOpen', (channel) => {
    });

    // proxy events to parent
lazorfuzz's avatar
lazorfuzz committed
    this.on('*', function () {
      self.parent.emit(...arguments);
    });
  }

  handleMessage(message) {
    const self = this;
    this.logger.log('getting', message.type, message);
    if (message.prefix) this.browserPrefix = message.prefix;

    if (message.type === 'offer') {
lazorfuzz's avatar
lazorfuzz committed
      if (!this.nick) {
        const n = message.payload.nick;
        this.nick = n;
      }
      // delete message.payload.nick;
lazorfuzz's avatar
lazorfuzz committed
      this.pc.handleOffer(message.payload, (err) => {
        if (err) {
          return;
        }
lazorfuzz's avatar
lazorfuzz committed
        // auto-accept
lazorfuzz's avatar
lazorfuzz committed
        self.pc.answer((err, sessionDescription) => {
lazorfuzz's avatar
lazorfuzz committed
          // self.send('answer', sessionDescription);
lazorfuzz's avatar
lazorfuzz committed
          // console.log('answering', sessionDescription);
lazorfuzz's avatar
lazorfuzz committed
      });
    } else if (message.type === 'answer') {
      if (!this.nick) this.nick = message.payload.nick;
      delete message.payload.nick;
      this.pc.handleAnswer(message.payload);
    } else if (message.type === 'candidate') {
      this.pc.processIce(message.payload);
    } else if (message.type === 'connectivityError') {
      this.parent.emit('connectivityError', self);
    } else if (message.type === 'mute') {
      this.parent.emit('mute', { id: message.from, name: message.payload.name });
    } else if (message.type === 'unmute') {
      this.parent.emit('unmute', { id: message.from, name: message.payload.name });
    } else if (message.type === 'endOfCandidates') {
lazorfuzz's avatar
lazorfuzz committed
      // Edge requires an end-of-candidates. Since only Edge will have mLines or tracks on the
      // shim this will only be called in Edge.
lazorfuzz's avatar
lazorfuzz committed
      const mLines = this.pc.pc.transceivers || [];
      mLines.forEach((mLine) => {
        if (mLine.iceTransport) {
          mLine.iceTransport.addRemoteCandidate({});
        }
      });
lazorfuzz's avatar
lazorfuzz committed
    } else if (message.type === 'signalData') {
      this.parent.emit('receivedSignalData', message.payload.type, message.payload.payload, self);
lazorfuzz's avatar
lazorfuzz committed
    }
  }

lazorfuzz's avatar
lazorfuzz committed
  // send via signaling channel
lazorfuzz's avatar
lazorfuzz committed
  send(messageType, payload) {
    const message = {
      to: this.id,
      sid: this.sid,
      broadcaster: this.broadcaster,
      roomType: this.type,
      type: messageType,
      payload,
      prefix: webrtcSupport.prefix,
lazorfuzz's avatar
lazorfuzz committed
      timestamp: Date.now()
lazorfuzz's avatar
lazorfuzz committed
    };
    this.logger.log('sending', messageType, message);
    this.parent.emit('message', message);
  }

  // send via data channel
  // returns true when message was sent and false if channel is not open
lazorfuzz's avatar
lazorfuzz committed
  sendDirectly(messageType, payload, channel = 'liowebrtc', shout = false, messageId = `${Date.now()}_${Math.random() * 1000000}`) {
lazorfuzz's avatar
lazorfuzz committed
    const message = {
      type: messageType,
lazorfuzz's avatar
lazorfuzz committed
      _id: messageId,
      shout,
lazorfuzz's avatar
lazorfuzz committed
    };
    this.logger.log('sending via datachannel', channel, messageType, message);
    const dc = this.getDataChannel(channel);
    if (dc.readyState !== 'open') return false;
lazorfuzz's avatar
lazorfuzz committed
    return true;
  }

  // Internal method registering handlers for a data channel and emitting events on the peer
lazorfuzz's avatar
lazorfuzz committed
  _observeDataChannel(channel, peer) {
lazorfuzz's avatar
lazorfuzz committed
    const self = this;
    channel.binaryType = 'arraybuffer';
    channel.onclose = this.emit.bind(this, 'channelClose', channel, peer);
    channel.onerror = this.emit.bind(this, 'channelError', channel, peer);
    channel.onmessage = (event) => {
      self.emit('channelMessage', self, channel.label, decode(MessagePack.Buffer.from(event.data)), channel, event);
lazorfuzz's avatar
lazorfuzz committed
    };
lazorfuzz's avatar
lazorfuzz committed
    channel.onopen = this.emit.bind(this, 'channelOpen', channel, peer);
lazorfuzz's avatar
lazorfuzz committed
  }

  // Fetch or create a data channel by the given name
  getDataChannel(name = 'liowebrtc', opts) {
lazorfuzz's avatar
lazorfuzz committed
    let channel = this.channels[name];
    opts || (opts = {});
    if (channel) return channel;
    // if we don't have one by this label, create it
lazorfuzz's avatar
lazorfuzz committed
    channel = this.channels[name] = this.pc.createDataChannel(name, opts);
    this._observeDataChannel(channel, this);
lazorfuzz's avatar
lazorfuzz committed
    return channel;
  }

  onIceCandidate(candidate) {
    if (this.closed) return;
    if (candidate) {
      const pcConfig = this.parent.config.peerConnectionConfig;
      if (webrtcSupport.prefix === 'moz' && pcConfig && pcConfig.iceTransports &&
                  candidate.candidate && candidate.candidate.candidate &&
                  !candidate.candidate.candidate.includes(pcConfig.iceTransports)) {
        this.logger.log('Ignoring ice candidate not matching pcConfig iceTransports type: ', pcConfig.iceTransports);
      } else {
        this.send('candidate', candidate);
      }
    } else {
      this.logger.log('End of candidates.');
    }
  }

  start() {
    const self = this;

    // well, the webrtc api requires that we either
lazorfuzz's avatar
lazorfuzz committed
    // a) create a datachannel a priori
    // b) do a renegotiation later to add the SCTP m-line
    // Let's do (a) first...
lazorfuzz's avatar
lazorfuzz committed
    if (this.enableDataChannels) {
      this.getDataChannel('liowebrtc');
    }

    this.pc.offer(this.receiveMedia, (err, sessionDescription) => {
      // self.send('offer', sessionDescription);
lazorfuzz's avatar
lazorfuzz committed
    });
  }

  icerestart() {
    const constraints = this.receiveMedia;
    constraints.mandatory.IceRestart = true;
    this.pc.offer(constraints, (err, success) => { });
  }

lazorfuzz's avatar
lazorfuzz committed
  end(emitRemoval = true) {
lazorfuzz's avatar
lazorfuzz committed
    if (this.closed) return;
lazorfuzz's avatar
lazorfuzz committed
    if (emitRemoval) {
      this.parent.emit('removedPeer', this);
    }
    this.pc.close();
    this.handleStreamRemoved(emitRemoval);
lazorfuzz's avatar
lazorfuzz committed
  }

  handleRemoteStreamAdded(event) {
    const self = this;
    if (this.stream) {
      this.logger.warn('Already have a remote stream');
    } else {
      this.stream = event.stream;

      this.stream.getTracks().forEach((track) => {
        track.addEventListener('ended', () => {
          if (isAllTracksEnded(self.stream)) {
            self.end();
          }
        });
      });

      this.parent.emit('peerStreamAdded', this.stream, this);
lazorfuzz's avatar
lazorfuzz committed
    }
  }

  handleStreamRemoved(emitRemoval = true) {
lazorfuzz's avatar
lazorfuzz committed
    const peerIndex = this.parent.peers.indexOf(this);
    if (peerIndex > -1) {
      this.parent.peers.splice(peerIndex, 1);
      this.closed = true;
      if (emitRemoval) this.parent.emit('peerStreamRemoved', this);
lazorfuzz's avatar
lazorfuzz committed
    }
  }

  handleDataChannelAdded(channel) {
    this.channels[channel.label] = channel;
    //this._observeDataChannel(channel, this);
lazorfuzz's avatar
lazorfuzz committed
  }

  sendFile(file) {
    const sender = new FileTransfer.Sender();
    const dc = this.getDataChannel(`filetransfer${(new Date()).getTime()}`, {
      protocol: INBAND_FILETRANSFER_V1,
lazorfuzz's avatar
lazorfuzz committed
    });
      // override onopen
    dc.onopen = () => {
lazorfuzz's avatar
lazorfuzz committed
        size: file.size,
lazorfuzz's avatar
lazorfuzz committed
      sender.send(file, dc);
    };
lazorfuzz's avatar
lazorfuzz committed
    dc.onclose = () => {
lazorfuzz's avatar
lazorfuzz committed
      // ('sender received transfer');
lazorfuzz's avatar
lazorfuzz committed
      sender.emit('complete');
    };
    return sender;
  }

  getStats(selector) {
    // TODO: Use adapter.js to patch this across browsers
    return this.pc.pc.getStats(selector);
  }
lazorfuzz's avatar
lazorfuzz committed
}

export default Peer;