Line data Source code
1 : /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 : /* vim: set ts=2 et sw=2 tw=80: */
3 : /* This Source Code Form is subject to the terms of the Mozilla Public
4 : * License, v. 2.0. If a copy of the MPL was not distributed with this file,
5 : * You can obtain one at http://mozilla.org/MPL/2.0/. */
6 :
7 : #include <algorithm>
8 : #include <stdio.h>
9 : #include <stdlib.h>
10 : #if !defined(__Userspace_os_Windows)
11 : #include <arpa/inet.h>
12 : #endif
13 : // usrsctp.h expects to have errno definitions prior to its inclusion.
14 : #include <errno.h>
15 :
16 : #define SCTP_DEBUG 1
17 : #define SCTP_STDINT_INCLUDE <stdint.h>
18 :
19 : #ifdef _MSC_VER
20 : // Disable "warning C4200: nonstandard extension used : zero-sized array in
21 : // struct/union"
22 : // ...which the third-party file usrsctp.h runs afoul of.
23 : #pragma warning(push)
24 : #pragma warning(disable:4200)
25 : #endif
26 :
27 : #include "usrsctp.h"
28 :
29 : #ifdef _MSC_VER
30 : #pragma warning(pop)
31 : #endif
32 :
33 : #include "DataChannelLog.h"
34 :
35 : #include "nsServiceManagerUtils.h"
36 : #include "nsIObserverService.h"
37 : #include "nsIObserver.h"
38 : #include "mozilla/Services.h"
39 : #include "mozilla/SizePrintfMacros.h"
40 : #include "mozilla/Sprintf.h"
41 : #include "nsProxyRelease.h"
42 : #include "nsThread.h"
43 : #include "nsThreadUtils.h"
44 : #include "nsAutoPtr.h"
45 : #include "nsNetUtil.h"
46 : #include "nsNetCID.h"
47 : #include "mozilla/StaticPtr.h"
48 : #include "mozilla/Unused.h"
49 : #ifdef MOZ_PEERCONNECTION
50 : #include "mtransport/runnable_utils.h"
51 : #endif
52 :
53 : #define DATACHANNEL_LOG(args) LOG(args)
54 : #include "DataChannel.h"
55 : #include "DataChannelProtocol.h"
56 :
57 : // Let us turn on and off important assertions in non-debug builds
58 : #ifdef DEBUG
59 : #define ASSERT_WEBRTC(x) MOZ_ASSERT((x))
60 : #elif defined(MOZ_WEBRTC_ASSERT_ALWAYS)
61 : #define ASSERT_WEBRTC(x) do { if (!(x)) { MOZ_CRASH(); } } while (0)
62 : #endif
63 :
// True once DataChannelConnection::Init() has initialized the process-wide
// usrsctp stack; reset to false by DataChannelShutdown::Observe() after it
// calls usrsctp_finish().
static bool sctp_initialized;
65 :
66 : namespace mozilla {
67 :
68 : LazyLogModule gDataChannelLog("DataChannel");
69 : static LazyLogModule gSCTPLog("SCTP");
70 :
71 : #define SCTP_LOG(args) MOZ_LOG(mozilla::gSCTPLog, mozilla::LogLevel::Debug, args)
72 :
73 : class DataChannelShutdown : public nsIObserver
74 : {
75 : public:
76 : // This needs to be tied to some form object that is guaranteed to be
77 : // around (singleton likely) unless we want to shutdown sctp whenever
78 : // we're not using it (and in which case we'd keep a refcnt'd object
79 : // ref'd by each DataChannelConnection to release the SCTP usrlib via
80 : // sctp_finish). Right now, the single instance of this class is
81 : // owned by the observer service.
82 :
83 : NS_DECL_ISUPPORTS
84 :
85 0 : DataChannelShutdown() {}
86 :
87 0 : void Init()
88 : {
89 : nsCOMPtr<nsIObserverService> observerService =
90 0 : mozilla::services::GetObserverService();
91 0 : if (!observerService)
92 0 : return;
93 :
94 0 : nsresult rv = observerService->AddObserver(this,
95 : "xpcom-will-shutdown",
96 0 : false);
97 0 : MOZ_ASSERT(rv == NS_OK);
98 : (void) rv;
99 : }
100 :
101 : private:
102 : // The only instance of DataChannelShutdown is owned by the observer
103 : // service, so there is no need to call RemoveObserver here.
104 0 : virtual ~DataChannelShutdown() = default;
105 :
106 : public:
107 0 : NS_IMETHOD Observe(nsISupports* aSubject, const char* aTopic,
108 : const char16_t* aData) override {
109 0 : if (strcmp(aTopic, "xpcom-will-shutdown") == 0) {
110 0 : LOG(("Shutting down SCTP"));
111 0 : if (sctp_initialized) {
112 0 : usrsctp_finish();
113 0 : sctp_initialized = false;
114 : }
115 : nsCOMPtr<nsIObserverService> observerService =
116 0 : mozilla::services::GetObserverService();
117 0 : if (!observerService)
118 0 : return NS_ERROR_FAILURE;
119 :
120 0 : nsresult rv = observerService->RemoveObserver(this,
121 0 : "xpcom-will-shutdown");
122 0 : MOZ_ASSERT(rv == NS_OK);
123 : (void) rv;
124 : }
125 0 : return NS_OK;
126 : }
127 : };
128 :
129 0 : NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver);
130 :
131 0 : BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data,
132 0 : size_t length) : mLength(length)
133 : {
134 0 : mSpa = new sctp_sendv_spa;
135 0 : *mSpa = spa;
136 0 : auto *tmp = new char[length]; // infallible malloc!
137 0 : memcpy(tmp, data, length);
138 0 : mData = tmp;
139 0 : }
140 :
141 0 : BufferedMsg::~BufferedMsg()
142 : {
143 0 : delete mSpa;
144 0 : delete mData;
145 0 : }
146 :
147 : static int
148 0 : receive_cb(struct socket* sock, union sctp_sockstore addr,
149 : void *data, size_t datalen,
150 : struct sctp_rcvinfo rcv, int flags, void *ulp_info)
151 : {
152 0 : DataChannelConnection *connection = static_cast<DataChannelConnection*>(ulp_info);
153 0 : return connection->ReceiveCallback(sock, data, datalen, rcv, flags);
154 : }
155 :
156 : static
157 : DataChannelConnection *
158 0 : GetConnectionFromSocket(struct socket* sock)
159 : {
160 0 : struct sockaddr *addrs = nullptr;
161 0 : int naddrs = usrsctp_getladdrs(sock, 0, &addrs);
162 0 : if (naddrs <= 0 || addrs[0].sa_family != AF_CONN) {
163 0 : return nullptr;
164 : }
165 : // usrsctp_getladdrs() returns the addresses bound to this socket, which
166 : // contains the SctpDataMediaChannel* as sconn_addr. Read the pointer,
167 : // then free the list of addresses once we have the pointer. We only open
168 : // AF_CONN sockets, and they should all have the sconn_addr set to the
169 : // pointer that created them, so [0] is as good as any other.
170 0 : struct sockaddr_conn *sconn = reinterpret_cast<struct sockaddr_conn *>(&addrs[0]);
171 : DataChannelConnection *connection =
172 0 : reinterpret_cast<DataChannelConnection *>(sconn->sconn_addr);
173 0 : usrsctp_freeladdrs(addrs);
174 :
175 0 : return connection;
176 : }
177 :
178 : // called when the buffer empties to the threshold value
179 : static int
180 0 : threshold_event(struct socket* sock, uint32_t sb_free)
181 : {
182 0 : DataChannelConnection *connection = GetConnectionFromSocket(sock);
183 0 : if (connection) {
184 0 : LOG(("SendDeferred()"));
185 0 : connection->SendDeferredMessages();
186 : } else {
187 0 : LOG(("Can't find connection for socket %p", sock));
188 : }
189 0 : return 0;
190 : }
191 :
192 : static void
193 0 : debug_printf(const char *format, ...)
194 : {
195 : va_list ap;
196 : char buffer[1024];
197 :
198 0 : if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
199 0 : va_start(ap, format);
200 : #ifdef _WIN32
201 : if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) {
202 : #else
203 0 : if (VsprintfLiteral(buffer, format, ap) > 0) {
204 : #endif
205 0 : SCTP_LOG(("%s", buffer));
206 : }
207 0 : va_end(ap);
208 : }
209 0 : }
210 :
211 0 : DataChannelConnection::DataChannelConnection(DataConnectionListener *listener,
212 0 : nsIEventTarget *aTarget)
213 : : NeckoTargetHolder(aTarget)
214 0 : , mLock("netwerk::sctp::DataChannelConnection")
215 : {
216 0 : mState = CLOSED;
217 0 : mSocket = nullptr;
218 0 : mMasterSocket = nullptr;
219 0 : mListener = listener;
220 0 : mLocalPort = 0;
221 0 : mRemotePort = 0;
222 0 : LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get()));
223 0 : mInternalIOThread = nullptr;
224 0 : }
225 :
226 0 : DataChannelConnection::~DataChannelConnection()
227 : {
228 0 : LOG(("Deleting DataChannelConnection %p", (void *) this));
229 : // This may die on the MainThread, or on the STS thread
230 0 : ASSERT_WEBRTC(mState == CLOSED);
231 0 : MOZ_ASSERT(!mMasterSocket);
232 0 : MOZ_ASSERT(mPending.GetSize() == 0);
233 :
234 : // Already disconnected from sigslot/mTransportFlow
235 : // TransportFlows must be released from the STS thread
236 0 : if (!IsSTSThread()) {
237 0 : ASSERT_WEBRTC(NS_IsMainThread());
238 0 : if (mTransportFlow) {
239 0 : ASSERT_WEBRTC(mSTS);
240 0 : NS_ProxyRelease(
241 0 : "DataChannelConnection::mTransportFlow", mSTS, mTransportFlow.forget());
242 : }
243 :
244 0 : if (mInternalIOThread) {
245 : // Avoid spinning the event thread from here (which if we're mainthread
246 : // is in the event loop already)
247 0 : nsCOMPtr<nsIRunnable> r = WrapRunnable(nsCOMPtr<nsIThread>(mInternalIOThread),
248 0 : &nsIThread::Shutdown);
249 0 : Dispatch(r.forget());
250 : }
251 : } else {
252 : // on STS, safe to call shutdown
253 0 : if (mInternalIOThread) {
254 0 : mInternalIOThread->Shutdown();
255 : }
256 : }
257 0 : }
258 :
259 : void
260 0 : DataChannelConnection::Destroy()
261 : {
262 : // Though it's probably ok to do this and close the sockets;
263 : // if we really want it to do true clean shutdowns it can
264 : // create a dependant Internal object that would remain around
265 : // until the network shut down the association or timed out.
266 0 : LOG(("Destroying DataChannelConnection %p", (void *) this));
267 0 : ASSERT_WEBRTC(NS_IsMainThread());
268 0 : CloseAll();
269 :
270 0 : MutexAutoLock lock(mLock);
271 : // If we had a pending reset, we aren't waiting for it - clear the list so
272 : // we can deregister this DataChannelConnection without leaking.
273 0 : ClearResets();
274 :
275 0 : MOZ_ASSERT(mSTS);
276 0 : ASSERT_WEBRTC(NS_IsMainThread());
277 : // Must do this in Destroy() since we may then delete this object.
278 : // Do this before dispatching to create a consistent ordering of calls to
279 : // the SCTP stack.
280 0 : if (mUsingDtls) {
281 0 : usrsctp_deregister_address(static_cast<void *>(this));
282 0 : LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this)));
283 : }
284 :
285 : // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed,
286 : // the usrsctp_close() calls can move back here (and just proxy the
287 : // disconnect_all())
288 0 : RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
289 : &DataChannelConnection::DestroyOnSTS,
290 : mSocket, mMasterSocket),
291 0 : NS_DISPATCH_NORMAL);
292 :
293 : // These will be released on STS
294 0 : mSocket = nullptr;
295 0 : mMasterSocket = nullptr; // also a flag that we've Destroyed this connection
296 :
297 : // We can't get any more new callbacks from the SCTP library
298 : // All existing callbacks have refs to DataChannelConnection
299 :
300 : // nsDOMDataChannel objects have refs to DataChannels that have refs to us
301 0 : }
302 :
303 0 : void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket,
304 : struct socket *aSocket)
305 : {
306 0 : if (aSocket && aSocket != aMasterSocket)
307 0 : usrsctp_close(aSocket);
308 0 : if (aMasterSocket)
309 0 : usrsctp_close(aMasterSocket);
310 :
311 0 : disconnect_all();
312 0 : }
313 :
314 : bool
315 0 : DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls)
316 : {
317 : struct sctp_initmsg initmsg;
318 : struct sctp_udpencaps encaps;
319 : struct sctp_assoc_value av;
320 : struct sctp_event event;
321 : socklen_t len;
322 :
323 : uint16_t event_types[] = {SCTP_ASSOC_CHANGE,
324 : SCTP_PEER_ADDR_CHANGE,
325 : SCTP_REMOTE_ERROR,
326 : SCTP_SHUTDOWN_EVENT,
327 : SCTP_ADAPTATION_INDICATION,
328 : SCTP_SEND_FAILED_EVENT,
329 : SCTP_STREAM_RESET_EVENT,
330 0 : SCTP_STREAM_CHANGE_EVENT};
331 : {
332 0 : ASSERT_WEBRTC(NS_IsMainThread());
333 :
334 : // MutexAutoLock lock(mLock); Not needed since we're on mainthread always
335 0 : if (!sctp_initialized) {
336 0 : if (aUsingDtls) {
337 0 : LOG(("sctp_init(DTLS)"));
338 : #ifdef MOZ_PEERCONNECTION
339 : usrsctp_init(0,
340 : DataChannelConnection::SctpDtlsOutput,
341 : debug_printf
342 0 : );
343 : #else
344 : NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport");
345 : #endif
346 : } else {
347 0 : LOG(("sctp_init(%u)", aPort));
348 0 : usrsctp_init(aPort,
349 : nullptr,
350 : debug_printf
351 0 : );
352 : }
353 :
354 : // Set logging to SCTP:LogLevel::Debug to get SCTP debugs
355 0 : if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
356 0 : usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
357 : }
358 :
359 0 : usrsctp_sysctl_set_sctp_blackhole(2);
360 : // ECN is currently not supported by the Firefox code
361 0 : usrsctp_sysctl_set_sctp_ecn_enable(0);
362 0 : sctp_initialized = true;
363 :
364 0 : RefPtr<DataChannelShutdown> shutdown = new DataChannelShutdown();
365 0 : shutdown->Init();
366 : }
367 : }
368 :
369 : // XXX FIX! make this a global we get once
370 : // Find the STS thread
371 : nsresult rv;
372 0 : mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
373 0 : MOZ_ASSERT(NS_SUCCEEDED(rv));
374 :
375 : // Open sctp with a callback
376 0 : if ((mMasterSocket = usrsctp_socket(
377 : aUsingDtls ? AF_CONN : AF_INET,
378 : SOCK_STREAM, IPPROTO_SCTP, receive_cb, threshold_event,
379 0 : usrsctp_sysctl_get_sctp_sendspace() / 2, this)) == nullptr) {
380 0 : return false;
381 : }
382 :
383 : // Make non-blocking for bind/connect. SCTP over UDP defaults to non-blocking
384 : // in associations for normal IO
385 0 : if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) {
386 0 : LOG(("Couldn't set non_blocking on SCTP socket"));
387 : // We can't handle connect() safely if it will block, not that this will
388 : // even happen.
389 0 : goto error_cleanup;
390 : }
391 :
392 : // Make sure when we close the socket, make sure it doesn't call us back again!
393 : // This would cause it try to use an invalid DataChannelConnection pointer
394 : struct linger l;
395 0 : l.l_onoff = 1;
396 0 : l.l_linger = 0;
397 0 : if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER,
398 : (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
399 0 : LOG(("Couldn't set SO_LINGER on SCTP socket"));
400 : // unsafe to allow it to continue if this fails
401 0 : goto error_cleanup;
402 : }
403 :
404 : // XXX Consider disabling this when we add proper SDP negotiation.
405 : // We may want to leave enabled for supporting 'cloning' of SDP offers, which
406 : // implies re-use of the same pseudo-port number, or forcing a renegotiation.
407 : {
408 0 : uint32_t on = 1;
409 0 : if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT,
410 : (const void *)&on, (socklen_t)sizeof(on)) < 0) {
411 0 : LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket"));
412 : }
413 0 : if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY,
414 : (const void *)&on, (socklen_t)sizeof(on)) < 0) {
415 0 : LOG(("Couldn't set SCTP_NODELAY on SCTP socket"));
416 : }
417 : }
418 :
419 0 : if (!aUsingDtls) {
420 0 : memset(&encaps, 0, sizeof(encaps));
421 0 : encaps.sue_address.ss_family = AF_INET;
422 0 : encaps.sue_port = htons(aPort);
423 0 : if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REMOTE_UDP_ENCAPS_PORT,
424 : (const void*)&encaps,
425 : (socklen_t)sizeof(struct sctp_udpencaps)) < 0) {
426 0 : LOG(("*** failed encaps errno %d", errno));
427 0 : goto error_cleanup;
428 : }
429 0 : LOG(("SCTP encapsulation local port %d", aPort));
430 : }
431 :
432 0 : av.assoc_id = SCTP_ALL_ASSOC;
433 0 : av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
434 0 : if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av,
435 : (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
436 0 : LOG(("*** failed enable stream reset errno %d", errno));
437 0 : goto error_cleanup;
438 : }
439 :
440 : /* Enable the events of interest. */
441 0 : memset(&event, 0, sizeof(event));
442 0 : event.se_assoc_id = SCTP_ALL_ASSOC;
443 0 : event.se_on = 1;
444 0 : for (unsigned short event_type : event_types) {
445 0 : event.se_type = event_type;
446 0 : if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)) < 0) {
447 0 : LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno));
448 0 : goto error_cleanup;
449 : }
450 : }
451 :
452 : // Update number of streams
453 0 : mStreams.AppendElements(aNumStreams);
454 0 : for (uint32_t i = 0; i < aNumStreams; ++i) {
455 0 : mStreams[i] = nullptr;
456 : }
457 0 : memset(&initmsg, 0, sizeof(initmsg));
458 0 : len = sizeof(initmsg);
459 0 : if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
460 0 : LOG(("*** failed getsockopt SCTP_INITMSG"));
461 0 : goto error_cleanup;
462 : }
463 0 : LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams,
464 : initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
465 0 : initmsg.sinit_num_ostreams = aNumStreams;
466 0 : initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
467 0 : if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
468 : (socklen_t)sizeof(initmsg)) < 0) {
469 0 : LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
470 0 : goto error_cleanup;
471 : }
472 :
473 0 : mSocket = nullptr;
474 0 : if (aUsingDtls) {
475 0 : mUsingDtls = true;
476 0 : usrsctp_register_address(static_cast<void *>(this));
477 0 : LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this)));
478 : } else {
479 0 : mUsingDtls = false;
480 : }
481 0 : return true;
482 :
483 : error_cleanup:
484 0 : usrsctp_close(mMasterSocket);
485 0 : mMasterSocket = nullptr;
486 0 : mUsingDtls = false;
487 0 : return false;
488 : }
489 :
490 : #ifdef MOZ_PEERCONNECTION
491 : void
492 0 : DataChannelConnection::SetEvenOdd()
493 : {
494 0 : ASSERT_WEBRTC(IsSTSThread());
495 :
496 : TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
497 0 : mTransportFlow->GetLayer(TransportLayerDtls::ID()));
498 0 : MOZ_ASSERT(dtls); // DTLS is mandatory
499 0 : mAllocateEven = (dtls->role() == TransportLayerDtls::CLIENT);
500 0 : }
501 :
502 : bool
503 0 : DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
504 : {
505 0 : LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
506 :
507 0 : NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!");
508 0 : NS_ENSURE_TRUE(aFlow, false);
509 :
510 0 : mTransportFlow = aFlow;
511 0 : mLocalPort = localport;
512 0 : mRemotePort = remoteport;
513 0 : mState = CONNECTING;
514 :
515 0 : RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
516 : &DataChannelConnection::SetSignals),
517 0 : NS_DISPATCH_NORMAL);
518 0 : return true;
519 : }
520 :
521 : void
522 0 : DataChannelConnection::SetSignals()
523 : {
524 0 : ASSERT_WEBRTC(IsSTSThread());
525 0 : ASSERT_WEBRTC(mTransportFlow);
526 0 : LOG(("Setting transport signals, state: %d", mTransportFlow->state()));
527 0 : mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput);
528 : // SignalStateChange() doesn't call you with the initial state
529 0 : mTransportFlow->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect);
530 0 : CompleteConnect(mTransportFlow, mTransportFlow->state());
531 0 : }
532 :
533 : void
534 0 : DataChannelConnection::CompleteConnect(TransportFlow *flow, TransportLayer::State state)
535 : {
536 0 : LOG(("Data transport state: %d", state));
537 0 : MutexAutoLock lock(mLock);
538 0 : ASSERT_WEBRTC(IsSTSThread());
539 : // We should abort connection on TS_ERROR.
540 : // Note however that the association will also fail (perhaps with a delay) and
541 : // notify us in that way
542 0 : if (state != TransportLayer::TS_OPEN || !mMasterSocket)
543 0 : return;
544 :
545 : struct sockaddr_conn addr;
546 0 : memset(&addr, 0, sizeof(addr));
547 0 : addr.sconn_family = AF_CONN;
548 : #if defined(__Userspace_os_Darwin)
549 : addr.sconn_len = sizeof(addr);
550 : #endif
551 0 : addr.sconn_port = htons(mLocalPort);
552 0 : addr.sconn_addr = static_cast<void *>(this);
553 :
554 0 : LOG(("Calling usrsctp_bind"));
555 0 : int r = usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
556 0 : sizeof(addr));
557 0 : if (r < 0) {
558 0 : LOG(("usrsctp_bind failed: %d", r));
559 : } else {
560 : // This is the remote addr
561 0 : addr.sconn_port = htons(mRemotePort);
562 0 : LOG(("Calling usrsctp_connect"));
563 0 : r = usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
564 0 : sizeof(addr));
565 0 : if (r >= 0 || errno == EINPROGRESS) {
566 : struct sctp_paddrparams paddrparams;
567 : socklen_t opt_len;
568 :
569 0 : memset(&paddrparams, 0, sizeof(struct sctp_paddrparams));
570 0 : memcpy(&paddrparams.spp_address, &addr, sizeof(struct sockaddr_conn));
571 0 : opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
572 0 : r = usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS,
573 0 : &paddrparams, &opt_len);
574 0 : if (r < 0) {
575 0 : LOG(("usrsctp_getsockopt failed: %d", r));
576 : } else {
577 : // draft-ietf-rtcweb-data-channel-13 section 5: max initial MTU IPV4 1200, IPV6 1280
578 0 : paddrparams.spp_pathmtu = 1200; // safe for either
579 0 : paddrparams.spp_flags &= ~SPP_PMTUD_ENABLE;
580 0 : paddrparams.spp_flags |= SPP_PMTUD_DISABLE;
581 0 : opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
582 0 : r = usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS,
583 0 : &paddrparams, opt_len);
584 0 : if (r < 0) {
585 0 : LOG(("usrsctp_getsockopt failed: %d", r));
586 : } else {
587 0 : LOG(("usrsctp: PMTUD disabled, MTU set to %u", paddrparams.spp_pathmtu));
588 : }
589 : }
590 : }
591 0 : if (r < 0) {
592 0 : if (errno == EINPROGRESS) {
593 : // non-blocking
594 0 : return;
595 : }
596 0 : LOG(("usrsctp_connect failed: %d", errno));
597 0 : mState = CLOSED;
598 : } else {
599 : // We set Even/Odd and fire ON_CONNECTION via SCTP_COMM_UP when we get that
600 : // This also avoids issues with calling TransportFlow stuff on Mainthread
601 0 : return;
602 : }
603 : }
604 : // Note: currently this doesn't actually notify the application
605 0 : Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
606 : DataChannelOnMessageAvailable::ON_CONNECTION,
607 0 : this)));
608 0 : return;
609 : }
610 :
611 : // Process any pending Opens
612 : void
613 0 : DataChannelConnection::ProcessQueuedOpens()
614 : {
615 : // The nsDeque holds channels with an AddRef applied. Another reference
616 : // (may) be held by the DOMDataChannel, unless it's been GC'd. No other
617 : // references should exist.
618 :
619 : // Can't copy nsDeque's. Move into temp array since any that fail will
620 : // go back to mPending
621 0 : nsDeque temp;
622 : DataChannel *temp_channel; // really already_AddRefed<>
623 0 : while (nullptr != (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) {
624 0 : temp.Push(static_cast<void *>(temp_channel));
625 : }
626 :
627 0 : RefPtr<DataChannel> channel;
628 : // All these entries have an AddRef(); make that explicit now via the dont_AddRef()
629 0 : while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) {
630 0 : if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
631 0 : LOG(("Processing queued open for %p (%u)", channel.get(), channel->mStream));
632 0 : channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
633 : // OpenFinish returns a reference itself, so we need to take it can Release it
634 0 : channel = OpenFinish(channel.forget()); // may reset the flag and re-push
635 : } else {
636 0 : NS_ASSERTION(false, "How did a DataChannel get queued without the FINISH_OPEN flag?");
637 : }
638 : }
639 :
640 0 : }
641 : void
642 0 : DataChannelConnection::SctpDtlsInput(TransportFlow *flow,
643 : const unsigned char *data, size_t len)
644 : {
645 0 : if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
646 : char *buf;
647 :
648 0 : if ((buf = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND)) != nullptr) {
649 0 : SCTP_LOG(("%s", buf));
650 0 : usrsctp_freedumpbuffer(buf);
651 : }
652 : }
653 : // Pass the data to SCTP
654 0 : usrsctp_conninput(static_cast<void *>(this), data, len, 0);
655 0 : }
656 :
657 : int
658 0 : DataChannelConnection::SendPacket(unsigned char data[], size_t len, bool release)
659 : {
660 : //LOG(("%p: SCTP/DTLS sent %ld bytes", this, len));
661 0 : int res = mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0;
662 0 : if (release)
663 0 : delete [] data;
664 0 : return res;
665 : }
666 :
667 : /* static */
668 : int
669 0 : DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length,
670 : uint8_t tos, uint8_t set_df)
671 : {
672 0 : DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr);
673 : int res;
674 :
675 0 : if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
676 : char *buf;
677 :
678 0 : if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) != nullptr) {
679 0 : SCTP_LOG(("%s", buf));
680 0 : usrsctp_freedumpbuffer(buf);
681 : }
682 : }
683 : // We're async proxying even if on the STSThread because this is called
684 : // with internal SCTP locks held in some cases (such as in usrsctp_connect()).
685 : // SCTP has an option for Apple, on IP connections only, to release at least
686 : // one of the locks before calling a packet output routine; with changes to
687 : // the underlying SCTP stack this might remove the need to use an async proxy.
688 : if ((false /*peer->IsSTSThread()*/)) {
689 : res = peer->SendPacket(static_cast<unsigned char *>(buffer), length, false);
690 : } else {
691 0 : auto *data = new unsigned char[length];
692 0 : memcpy(data, buffer, length);
693 : // Commented out since we have to Dispatch SendPacket to avoid deadlock"
694 : // res = -1;
695 :
696 : // XXX It might be worthwhile to add an assertion against the thread
697 : // somehow getting into the DataChannel/SCTP code again, as
698 : // DISPATCH_SYNC is not fully blocking. This may be tricky, as it
699 : // needs to be a per-thread check, not a global.
700 0 : peer->mSTS->Dispatch(WrapRunnable(
701 0 : RefPtr<DataChannelConnection>(peer),
702 : &DataChannelConnection::SendPacket, data, length, true),
703 0 : NS_DISPATCH_NORMAL);
704 0 : res = 0; // cheat! Packets can always be dropped later anyways
705 : }
706 0 : return res;
707 : }
708 : #endif
709 :
710 : #ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
711 : // listen for incoming associations
712 : // Blocks! - Don't call this from main thread!
713 :
714 : #error This code will not work as-is since SetEvenOdd() runs on Mainthread
715 :
716 : bool
717 : DataChannelConnection::Listen(unsigned short port)
718 : {
719 : struct sockaddr_in addr;
720 : socklen_t addr_len;
721 :
722 : NS_WARNING_ASSERTION(!NS_IsMainThread(),
723 : "Blocks, do not call from main thread!!!");
724 :
725 : /* Acting as the 'server' */
726 : memset((void *)&addr, 0, sizeof(addr));
727 : #ifdef HAVE_SIN_LEN
728 : addr.sin_len = sizeof(struct sockaddr_in);
729 : #endif
730 : addr.sin_family = AF_INET;
731 : addr.sin_port = htons(port);
732 : addr.sin_addr.s_addr = htonl(INADDR_ANY);
733 : LOG(("Waiting for connections on port %u", ntohs(addr.sin_port)));
734 : mState = CONNECTING;
735 : if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_in)) < 0) {
736 : LOG(("***Failed userspace_bind"));
737 : return false;
738 : }
739 : if (usrsctp_listen(mMasterSocket, 1) < 0) {
740 : LOG(("***Failed userspace_listen"));
741 : return false;
742 : }
743 :
744 : LOG(("Accepting connection"));
745 : addr_len = 0;
746 : if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == nullptr) {
747 : LOG(("***Failed accept"));
748 : return false;
749 : }
750 : mState = OPEN;
751 :
752 : struct linger l;
753 : l.l_onoff = 1;
754 : l.l_linger = 0;
755 : if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER,
756 : (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
757 : LOG(("Couldn't set SO_LINGER on SCTP socket"));
758 : }
759 :
760 : SetEvenOdd();
761 :
762 : // Notify Connection open
763 : // XXX We need to make sure connection sticks around until the message is delivered
764 : LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
765 : Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
766 : DataChannelOnMessageAvailable::ON_CONNECTION,
767 : this, (DataChannel *) nullptr)));
768 : return true;
769 : }
770 :
771 : // Blocks! - Don't call this from main thread!
772 : bool
773 : DataChannelConnection::Connect(const char *addr, unsigned short port)
774 : {
775 : struct sockaddr_in addr4;
776 : struct sockaddr_in6 addr6;
777 :
778 : NS_WARNING_ASSERTION(!NS_IsMainThread(),
779 : "Blocks, do not call from main thread!!!");
780 :
781 : /* Acting as the connector */
782 : LOG(("Connecting to %s, port %u", addr, port));
783 : memset((void *)&addr4, 0, sizeof(struct sockaddr_in));
784 : memset((void *)&addr6, 0, sizeof(struct sockaddr_in6));
785 : #ifdef HAVE_SIN_LEN
786 : addr4.sin_len = sizeof(struct sockaddr_in);
787 : #endif
788 : #ifdef HAVE_SIN6_LEN
789 : addr6.sin6_len = sizeof(struct sockaddr_in6);
790 : #endif
791 : addr4.sin_family = AF_INET;
792 : addr6.sin6_family = AF_INET6;
793 : addr4.sin_port = htons(port);
794 : addr6.sin6_port = htons(port);
795 : mState = CONNECTING;
796 :
797 : #if !defined(__Userspace_os_Windows)
798 : if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) {
799 : if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
800 : LOG(("*** Failed userspace_connect"));
801 : return false;
802 : }
803 : } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) {
804 : if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
805 : LOG(("*** Failed userspace_connect"));
806 : return false;
807 : }
808 : } else {
809 : LOG(("*** Illegal destination address."));
810 : }
811 : #else
812 : {
813 : struct sockaddr_storage ss;
814 : int sslen = sizeof(ss);
815 :
816 : if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET6, nullptr, (struct sockaddr*)&ss, &sslen)) {
817 : addr6.sin6_addr = (reinterpret_cast<struct sockaddr_in6 *>(&ss))->sin6_addr;
818 : if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
819 : LOG(("*** Failed userspace_connect"));
820 : return false;
821 : }
822 : } else if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET, nullptr, (struct sockaddr*)&ss, &sslen)) {
823 : addr4.sin_addr = (reinterpret_cast<struct sockaddr_in *>(&ss))->sin_addr;
824 : if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
825 : LOG(("*** Failed userspace_connect"));
826 : return false;
827 : }
828 : } else {
829 : LOG(("*** Illegal destination address."));
830 : }
831 : }
832 : #endif
833 :
834 : mSocket = mMasterSocket;
835 :
836 : LOG(("connect() succeeded! Entering connected mode"));
837 : mState = OPEN;
838 :
839 : SetEvenOdd();
840 :
841 : // Notify Connection open
842 : // XXX We need to make sure connection sticks around until the message is delivered
843 : LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
844 : Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
845 : DataChannelOnMessageAvailable::ON_CONNECTION,
846 : this, (DataChannel *) nullptr)));
847 : return true;
848 : }
849 : #endif
850 :
851 : DataChannel *
852 0 : DataChannelConnection::FindChannelByStream(uint16_t stream)
853 : {
854 0 : return mStreams.SafeElementAt(stream);
855 : }
856 :
857 : uint16_t
858 0 : DataChannelConnection::FindFreeStream()
859 : {
860 : uint32_t i, j, limit;
861 :
862 0 : limit = mStreams.Length();
863 0 : if (limit > MAX_NUM_STREAMS)
864 0 : limit = MAX_NUM_STREAMS;
865 :
866 0 : for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) {
867 0 : if (!mStreams[i]) {
868 : // Verify it's not still in the process of closing
869 0 : for (j = 0; j < mStreamsResetting.Length(); ++j) {
870 0 : if (mStreamsResetting[j] == i) {
871 0 : break;
872 : }
873 : }
874 0 : if (j == mStreamsResetting.Length())
875 0 : break;
876 : }
877 : }
878 0 : if (i >= limit) {
879 0 : return INVALID_STREAM;
880 : }
881 0 : return i;
882 : }
883 :
884 : bool
885 0 : DataChannelConnection::RequestMoreStreams(int32_t aNeeded)
886 : {
887 : struct sctp_status status;
888 : struct sctp_add_streams sas;
889 : uint32_t outStreamsNeeded;
890 : socklen_t len;
891 :
892 0 : if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) {
893 0 : aNeeded = MAX_NUM_STREAMS - mStreams.Length();
894 : }
895 0 : if (aNeeded <= 0) {
896 0 : return false;
897 : }
898 :
899 0 : len = (socklen_t)sizeof(struct sctp_status);
900 0 : if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) {
901 0 : LOG(("***failed: getsockopt SCTP_STATUS"));
902 0 : return false;
903 : }
904 0 : outStreamsNeeded = aNeeded; // number to add
905 :
906 : // Note: if multiple channel opens happen when we don't have enough space,
907 : // we'll call RequestMoreStreams() multiple times
908 0 : memset(&sas, 0, sizeof(sas));
909 0 : sas.sas_instrms = 0;
910 0 : sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */
911 : // Doesn't block, we get an event when it succeeds or fails
912 0 : if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas,
913 : (socklen_t) sizeof(struct sctp_add_streams)) < 0) {
914 0 : if (errno == EALREADY) {
915 0 : LOG(("Already have %u output streams", outStreamsNeeded));
916 0 : return true;
917 : }
918 :
919 0 : LOG(("***failed: setsockopt ADD errno=%d", errno));
920 0 : return false;
921 : }
922 0 : LOG(("Requested %u more streams", outStreamsNeeded));
923 : // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the
924 : // values are larger than mStreams.Length()
925 0 : return true;
926 : }
927 :
928 : int32_t
929 0 : DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stream)
930 : {
931 : struct sctp_sndinfo sndinfo;
932 :
933 : // Note: Main-thread IO, but doesn't block
934 0 : memset(&sndinfo, 0, sizeof(struct sctp_sndinfo));
935 0 : sndinfo.snd_sid = stream;
936 0 : sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
937 0 : if (usrsctp_sendv(mSocket, msg, len, nullptr, 0,
938 : &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo),
939 : SCTP_SENDV_SNDINFO, 0) < 0) {
940 : //LOG(("***failed: sctp_sendv")); don't log because errno is a return!
941 0 : return (0);
942 : }
943 0 : return (1);
944 : }
945 :
946 : int32_t
947 0 : DataChannelConnection::SendOpenAckMessage(uint16_t stream)
948 : {
949 : struct rtcweb_datachannel_ack ack;
950 :
951 0 : memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
952 0 : ack.msg_type = DATA_CHANNEL_ACK;
953 :
954 0 : return SendControlMessage(&ack, sizeof(ack), stream);
955 : }
956 :
957 : int32_t
958 0 : DataChannelConnection::SendOpenRequestMessage(const nsACString& label,
959 : const nsACString& protocol,
960 : uint16_t stream, bool unordered,
961 : uint16_t prPolicy, uint32_t prValue)
962 : {
963 0 : const int label_len = label.Length(); // not including nul
964 0 : const int proto_len = protocol.Length(); // not including nul
965 : // careful - request struct include one char for the label
966 : const int req_size = sizeof(struct rtcweb_datachannel_open_request) - 1 +
967 0 : label_len + proto_len;
968 : struct rtcweb_datachannel_open_request *req =
969 0 : (struct rtcweb_datachannel_open_request*) moz_xmalloc(req_size);
970 :
971 0 : memset(req, 0, req_size);
972 0 : req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
973 0 : switch (prPolicy) {
974 : case SCTP_PR_SCTP_NONE:
975 0 : req->channel_type = DATA_CHANNEL_RELIABLE;
976 0 : break;
977 : case SCTP_PR_SCTP_TTL:
978 0 : req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
979 0 : break;
980 : case SCTP_PR_SCTP_RTX:
981 0 : req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
982 0 : break;
983 : default:
984 : // FIX! need to set errno! Or make all these SendXxxx() funcs return 0 or errno!
985 0 : free(req);
986 0 : return (0);
987 : }
988 0 : if (unordered) {
989 : // Per the current types, all differ by 0x80 between ordered and unordered
990 0 : req->channel_type |= 0x80; // NOTE: be careful if new types are added in the future
991 : }
992 :
993 0 : req->reliability_param = htonl(prValue);
994 0 : req->priority = htons(0); /* XXX: add support */
995 0 : req->label_length = htons(label_len);
996 0 : req->protocol_length = htons(proto_len);
997 0 : memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
998 0 : memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);
999 :
1000 0 : int32_t result = SendControlMessage(req, req_size, stream);
1001 :
1002 0 : free(req);
1003 0 : return result;
1004 : }
1005 :
1006 : // XXX This should use a separate thread (outbound queue) which should
1007 : // select() to know when to *try* to send data to the socket again.
1008 : // Alternatively, it can use a timeout, but that's guaranteed to be wrong
1009 : // (just not sure in what direction). We could re-implement NSPR's
1010 : // PR_POLL_WRITE/etc handling... with a lot of work.
1011 :
1012 : // Better yet, use the SCTP stack's notifications on buffer state to avoid
1013 : // filling the SCTP's buffers.
1014 :
1015 : // returns if we're still blocked or not
// Retries everything that was deferred because an earlier send could not
// complete: pending open requests (DATA_CHANNEL_FLAGS_SEND_REQ), pending open
// acks (DATA_CHANNEL_FLAGS_SEND_ACK) and buffered user data
// (DATA_CHANNEL_FLAGS_SEND_DATA), walking mStreams in index order.
//
// Takes mLock for the whole call. Processing stops at the first channel that
// is still blocked (EAGAIN/EWOULDBLOCK, or buffered data left over).
//
// Returns true if we are still blocked, false if everything went out.
bool
DataChannelConnection::SendDeferredMessages()
{
  uint32_t i;
  RefPtr<DataChannel> channel; // we may null out the refs to this
  bool still_blocked = false;

  // This may block while something is modifying channels, but should not block for IO
  MutexAutoLock lock(mLock);

  // XXX For total fairness, on a still_blocked we'd start next time at the
  // same index.  Sorry, not going to bother for now.
  for (i = 0; i < mStreams.Length(); ++i) {
    channel = mStreams[i];
    if (!channel)
      continue;

    // Only one of these should be set....
    if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) {
      // A non-zero return means the open request was sent.
      if (SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
                                 channel->mStream,
                                 channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED,
                                 channel->mPrPolicy, channel->mPrValue)) {
        channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ;

        // The channel is treated as open as soon as the request is sent.
        channel->mState = OPEN;
        channel->mReady = true;
        LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
        Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
                   DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
                   channel)));
      } else {
        // errno is the error report from the failed send.
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
          still_blocked = true;
        } else {
          // Close the channel, inform the user
          mStreams[channel->mStream] = nullptr;
          channel->mState = CLOSED;
          // Don't need to reset; we didn't open it
          // NOTE(review): SEND_REQ stays set and the ACK/DATA checks below
          // still run for this now-closed channel (the local RefPtr keeps it
          // alive) - confirm that is intended.
          Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
                     DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
                     channel)));
        }
      }
    }
    if (still_blocked)
      break;

    if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) {
      if (SendOpenAckMessage(channel->mStream)) {
        channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK;
      } else {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
          still_blocked = true;
        } else {
          // Close the channel, inform the user
          CloseInt(channel);
          // XXX send error via DataChannelOnMessageAvailable (bug 843625)
        }
      }
    }
    if (still_blocked)
      break;

    if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) {
      bool failed_send = false;
      int32_t result;

      // Nothing buffered is sent for a channel that is closed or closing.
      if (channel->mState == CLOSED || channel->mState == CLOSING) {
        channel->mBufferedData.Clear();
      }

      // Snapshot taken before draining so the low-threshold event below
      // fires only on a crossing from at-or-above to below the threshold.
      uint32_t buffered_amount = channel->GetBufferedAmountLocked();
      uint32_t threshold = channel->GetBufferedAmountLowThreshold();
      bool was_over_threshold = buffered_amount >= threshold;

      // Drain the queue front-to-back until it is empty or a send fails.
      while (!channel->mBufferedData.IsEmpty() &&
             !failed_send) {
        struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa;
        const char *data           = channel->mBufferedData[0]->mData;
        size_t len                 = channel->mBufferedData[0]->mLength;

        // SCTP will return EMSGSIZE if the message is bigger than the buffer
        // size (or EAGAIN if there isn't space)
        if ((result = usrsctp_sendv(mSocket, data, len,
                                    nullptr, 0,
                                    (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa),
                                    SCTP_SENDV_SPA,
                                    0)) < 0) {
          if (errno == EAGAIN || errno == EWOULDBLOCK) {
            // leave queued for resend
            failed_send = true;
            LOG(("queue full again when resending %" PRIuSIZE " bytes (%d)", len, result));
          } else {
            // Any other error also stops the drain; the message stays at the
            // head of the queue.
            LOG(("error %d re-sending string", errno));
            failed_send = true;
          }
        } else {
          LOG(("Resent buffer of %" PRIuSIZE " bytes (%d)", len, result));
          // In theory this could underflow if >4GB was buffered and the value
          // was truncated in GetBufferedAmount(), but this won't cause any problems.
          buffered_amount -= channel->mBufferedData[0]->mLength;
          channel->mBufferedData.RemoveElementAt(0);
          // can never fire with default threshold of 0
          if (was_over_threshold && buffered_amount < threshold) {
            LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
                 channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
            Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
                       DataChannelOnMessageAvailable::BUFFER_LOW_THRESHOLD,
                       this, channel)));
            // Cleared so the event is dispatched at most once per drain.
            was_over_threshold = false;
          }
          if (buffered_amount == 0) {
            // buffered-to-not-buffered transition; tell the DOM code in case this makes it
            // available for GC
            LOG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u", __FUNCTION__,
                 channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
            Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
                       DataChannelOnMessageAvailable::NO_LONGER_BUFFERED,
                       this, channel)));
          }
        }
      }
      // Anything left in the queue means this channel is still blocked.
      if (channel->mBufferedData.IsEmpty())
        channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA;
      else
        still_blocked = true;
    }
    if (still_blocked)
      break;
  }

  return still_blocked;
}
1150 :
1151 : void
1152 0 : DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
1153 : size_t length,
1154 : uint16_t stream)
1155 : {
1156 0 : RefPtr<DataChannel> channel;
1157 : uint32_t prValue;
1158 : uint16_t prPolicy;
1159 : uint32_t flags;
1160 :
1161 0 : mLock.AssertCurrentThreadOwns();
1162 :
1163 0 : if (length != (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) {
1164 0 : LOG(("%s: Inconsistent length: %" PRIuSIZE ", should be %" PRIuSIZE, __FUNCTION__, length,
1165 : (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)));
1166 0 : if (length < (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length))
1167 0 : return;
1168 : }
1169 :
1170 0 : LOG(("%s: length %" PRIuSIZE ", sizeof(*req) = %" PRIuSIZE, __FUNCTION__, length, sizeof(*req)));
1171 :
1172 0 : switch (req->channel_type) {
1173 : case DATA_CHANNEL_RELIABLE:
1174 : case DATA_CHANNEL_RELIABLE_UNORDERED:
1175 0 : prPolicy = SCTP_PR_SCTP_NONE;
1176 0 : break;
1177 : case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
1178 : case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
1179 0 : prPolicy = SCTP_PR_SCTP_RTX;
1180 0 : break;
1181 : case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
1182 : case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
1183 0 : prPolicy = SCTP_PR_SCTP_TTL;
1184 0 : break;
1185 : default:
1186 0 : LOG(("Unknown channel type %d", req->channel_type));
1187 : /* XXX error handling */
1188 0 : return;
1189 : }
1190 0 : prValue = ntohl(req->reliability_param);
1191 0 : flags = (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
1192 :
1193 0 : if ((channel = FindChannelByStream(stream))) {
1194 0 : if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
1195 0 : LOG(("ERROR: HandleOpenRequestMessage: channel for stream %u is in state %d instead of CLOSED.",
1196 : stream, channel->mState));
1197 : /* XXX: some error handling */
1198 : } else {
1199 0 : LOG(("Open for externally negotiated channel %u", stream));
1200 : // XXX should also check protocol, maybe label
1201 0 : if (prPolicy != channel->mPrPolicy ||
1202 0 : prValue != channel->mPrValue ||
1203 0 : flags != (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED))
1204 : {
1205 0 : LOG(("WARNING: external negotiation mismatch with OpenRequest:"
1206 : "channel %u, policy %u/%u, value %u/%u, flags %x/%x",
1207 : stream, prPolicy, channel->mPrPolicy,
1208 : prValue, channel->mPrValue, flags, channel->mFlags));
1209 : }
1210 : }
1211 0 : return;
1212 : }
1213 0 : if (stream >= mStreams.Length()) {
1214 0 : LOG(("%s: stream %u out of bounds (%" PRIuSIZE ")", __FUNCTION__, stream, mStreams.Length()));
1215 0 : return;
1216 : }
1217 :
1218 0 : nsCString label(nsDependentCSubstring(&req->label[0], ntohs(req->label_length)));
1219 0 : nsCString protocol(nsDependentCSubstring(&req->label[ntohs(req->label_length)],
1220 0 : ntohs(req->protocol_length)));
1221 :
1222 : channel = new DataChannel(this,
1223 : stream,
1224 : DataChannel::CONNECTING,
1225 : label,
1226 : protocol,
1227 : prPolicy, prValue,
1228 : flags,
1229 0 : nullptr, nullptr);
1230 0 : mStreams[stream] = channel;
1231 :
1232 0 : channel->mState = DataChannel::WAITING_TO_OPEN;
1233 :
1234 0 : LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__,
1235 : channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState));
1236 0 : Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
1237 : DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
1238 0 : this, channel)));
1239 :
1240 0 : LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
1241 :
1242 0 : if (!SendOpenAckMessage(stream)) {
1243 : // XXX Only on EAGAIN!? And if not, then close the channel??
1244 0 : channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK;
1245 : // Note: we're locked, so there's no danger of a race with the
1246 : // buffer-threshold callback
1247 : }
1248 :
1249 : // Now process any queued data messages for the channel (which will
1250 : // themselves likely get queued until we leave WAITING_TO_OPEN, plus any
1251 : // more that come in before that happens)
1252 0 : DeliverQueuedData(stream);
1253 : }
1254 :
1255 : // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
1256 : // That would make this code moot. Keep it for now for backwards compatibility.
1257 : void
1258 0 : DataChannelConnection::DeliverQueuedData(uint16_t stream)
1259 : {
1260 0 : mLock.AssertCurrentThreadOwns();
1261 :
1262 0 : uint32_t i = 0;
1263 0 : while (i < mQueuedData.Length()) {
1264 : // Careful! we may modify the array length from within the loop!
1265 0 : if (mQueuedData[i]->mStream == stream) {
1266 0 : LOG(("Delivering queued data for stream %u, length %u",
1267 : stream, (unsigned int) mQueuedData[i]->mLength));
1268 : // Deliver the queued data
1269 0 : HandleDataMessage(mQueuedData[i]->mPpid,
1270 0 : mQueuedData[i]->mData, mQueuedData[i]->mLength,
1271 0 : mQueuedData[i]->mStream);
1272 0 : mQueuedData.RemoveElementAt(i);
1273 0 : continue; // don't bump index since we removed the element
1274 : }
1275 0 : i++;
1276 : }
1277 0 : }
1278 :
1279 : void
1280 0 : DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
1281 : size_t length, uint16_t stream)
1282 : {
1283 : DataChannel *channel;
1284 :
1285 0 : mLock.AssertCurrentThreadOwns();
1286 :
1287 0 : channel = FindChannelByStream(stream);
1288 0 : NS_ENSURE_TRUE_VOID(channel);
1289 :
1290 0 : LOG(("OpenAck received for stream %u, waiting=%d", stream,
1291 : (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
1292 :
1293 0 : channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
1294 : }
1295 :
1296 : void
1297 0 : DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream)
1298 : {
1299 : /* XXX: Send an error message? */
1300 0 : LOG(("unknown DataChannel message received: %u, len %" PRIuSIZE " on stream %d", ppid, length, stream));
1301 : // XXX Log to JS error console if possible
1302 0 : }
1303 :
1304 : void
1305 0 : DataChannelConnection::HandleDataMessage(uint32_t ppid,
1306 : const void *data, size_t length,
1307 : uint16_t stream)
1308 : {
1309 : DataChannel *channel;
1310 0 : const char *buffer = (const char *) data;
1311 :
1312 0 : mLock.AssertCurrentThreadOwns();
1313 :
1314 0 : channel = FindChannelByStream(stream);
1315 :
1316 : // XXX A closed channel may trip this... check
1317 : // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
1318 : // That would make this code moot. Keep it for now for backwards compatibility.
1319 0 : if (!channel) {
1320 : // In the updated 0-RTT open case, the sender can send data immediately
1321 : // after Open, and doesn't set the in-order bit (since we don't have a
1322 : // response or ack). Also, with external negotiation, data can come in
1323 : // before we're told about the external negotiation. We need to buffer
1324 : // data until either a) Open comes in, if the ordering get messed up,
1325 : // or b) the app tells us this channel was externally negotiated. When
1326 : // these occur, we deliver the data.
1327 :
1328 : // Since this is rare and non-performance, keep a single list of queued
1329 : // data messages to deliver once the channel opens.
1330 0 : LOG(("Queuing data for stream %u, length %" PRIuSIZE, stream, length));
1331 : // Copies data
1332 0 : mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, data, length));
1333 0 : return;
1334 : }
1335 :
1336 : // XXX should this be a simple if, no warnings/debugbreaks?
1337 0 : NS_ENSURE_TRUE_VOID(channel->mState != CLOSED);
1338 :
1339 : {
1340 0 : nsAutoCString recvData(buffer, length); // copies (<64) or allocates
1341 0 : bool is_binary = true;
1342 :
1343 0 : if (ppid == DATA_CHANNEL_PPID_DOMSTRING ||
1344 : ppid == DATA_CHANNEL_PPID_DOMSTRING_LAST) {
1345 0 : is_binary = false;
1346 : }
1347 0 : if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
1348 0 : NS_WARNING("DataChannel message aborted by fragment type change!");
1349 0 : channel->mRecvBuffer.Truncate(0);
1350 : }
1351 0 : channel->mIsRecvBinary = is_binary;
1352 :
1353 0 : switch (ppid) {
1354 : case DATA_CHANNEL_PPID_DOMSTRING:
1355 : case DATA_CHANNEL_PPID_BINARY:
1356 0 : channel->mRecvBuffer += recvData;
1357 0 : LOG(("DataChannel: Partial %s message of length %" PRIuSIZE " (total %u) on channel id %u",
1358 : is_binary ? "binary" : "string", length, channel->mRecvBuffer.Length(),
1359 : channel->mStream));
1360 0 : return; // Not ready to notify application
1361 :
1362 : case DATA_CHANNEL_PPID_DOMSTRING_LAST:
1363 0 : LOG(("DataChannel: String message received of length %" PRIuSIZE " on channel %u",
1364 : length, channel->mStream));
1365 0 : if (!channel->mRecvBuffer.IsEmpty()) {
1366 0 : channel->mRecvBuffer += recvData;
1367 0 : LOG(("%s: sending ON_DATA (string fragmented) for %p", __FUNCTION__, channel));
1368 : channel->SendOrQueue(new DataChannelOnMessageAvailable(
1369 : DataChannelOnMessageAvailable::ON_DATA, this,
1370 0 : channel, channel->mRecvBuffer, -1));
1371 0 : channel->mRecvBuffer.Truncate(0);
1372 0 : return;
1373 : }
1374 : // else send using recvData normally
1375 0 : length = -1; // Flag for DOMString
1376 :
1377 : // WebSockets checks IsUTF8() here; we can try to deliver it
1378 0 : break;
1379 :
1380 : case DATA_CHANNEL_PPID_BINARY_LAST:
1381 0 : LOG(("DataChannel: Received binary message of length %" PRIuSIZE " on channel id %u",
1382 : length, channel->mStream));
1383 0 : if (!channel->mRecvBuffer.IsEmpty()) {
1384 0 : channel->mRecvBuffer += recvData;
1385 0 : LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel));
1386 : channel->SendOrQueue(new DataChannelOnMessageAvailable(
1387 : DataChannelOnMessageAvailable::ON_DATA, this,
1388 : channel, channel->mRecvBuffer,
1389 0 : channel->mRecvBuffer.Length()));
1390 0 : channel->mRecvBuffer.Truncate(0);
1391 0 : return;
1392 : }
1393 : // else send using recvData normally
1394 0 : break;
1395 :
1396 : default:
1397 0 : NS_ERROR("Unknown data PPID");
1398 0 : return;
1399 : }
1400 : /* Notify onmessage */
1401 0 : LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel));
1402 : channel->SendOrQueue(new DataChannelOnMessageAvailable(
1403 : DataChannelOnMessageAvailable::ON_DATA, this,
1404 0 : channel, recvData, length));
1405 : }
1406 : }
1407 :
1408 : // Called with mLock locked!
1409 : void
1410 0 : DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream)
1411 : {
1412 : const struct rtcweb_datachannel_open_request *req;
1413 : const struct rtcweb_datachannel_ack *ack;
1414 :
1415 0 : mLock.AssertCurrentThreadOwns();
1416 :
1417 0 : switch (ppid) {
1418 : case DATA_CHANNEL_PPID_CONTROL:
1419 0 : req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
1420 :
1421 0 : NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // smallest message
1422 0 : switch (req->msg_type) {
1423 : case DATA_CHANNEL_OPEN_REQUEST:
1424 : // structure includes a possibly-unused char label[1] (in a packed structure)
1425 0 : NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1);
1426 :
1427 0 : HandleOpenRequestMessage(req, length, stream);
1428 0 : break;
1429 : case DATA_CHANNEL_ACK:
1430 : // >= sizeof(*ack) checked above
1431 :
1432 0 : ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
1433 0 : HandleOpenAckMessage(ack, length, stream);
1434 0 : break;
1435 : default:
1436 0 : HandleUnknownMessage(ppid, length, stream);
1437 0 : break;
1438 : }
1439 0 : break;
1440 : case DATA_CHANNEL_PPID_DOMSTRING:
1441 : case DATA_CHANNEL_PPID_DOMSTRING_LAST:
1442 : case DATA_CHANNEL_PPID_BINARY:
1443 : case DATA_CHANNEL_PPID_BINARY_LAST:
1444 0 : HandleDataMessage(ppid, buffer, length, stream);
1445 0 : break;
1446 : default:
1447 0 : LOG(("Message of length %" PRIuSIZE ", PPID %u on stream %u received.",
1448 : length, ppid, stream));
1449 0 : break;
1450 : }
1451 : }
1452 :
1453 : void
1454 0 : DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac)
1455 : {
1456 : uint32_t i, n;
1457 :
1458 0 : switch (sac->sac_state) {
1459 : case SCTP_COMM_UP:
1460 0 : LOG(("Association change: SCTP_COMM_UP"));
1461 0 : if (mState == CONNECTING) {
1462 0 : mSocket = mMasterSocket;
1463 0 : mState = OPEN;
1464 :
1465 0 : SetEvenOdd();
1466 :
1467 0 : Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
1468 : DataChannelOnMessageAvailable::ON_CONNECTION,
1469 0 : this)));
1470 0 : LOG(("DTLS connect() succeeded! Entering connected mode"));
1471 :
1472 : // Open any streams pending...
1473 0 : ProcessQueuedOpens();
1474 :
1475 0 : } else if (mState == OPEN) {
1476 0 : LOG(("DataConnection Already OPEN"));
1477 : } else {
1478 0 : LOG(("Unexpected state: %d", mState));
1479 : }
1480 0 : break;
1481 : case SCTP_COMM_LOST:
1482 0 : LOG(("Association change: SCTP_COMM_LOST"));
1483 : // This association is toast, so also close all the channels -- from mainthread!
1484 0 : Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
1485 : DataChannelOnMessageAvailable::ON_DISCONNECTED,
1486 0 : this)));
1487 0 : break;
1488 : case SCTP_RESTART:
1489 0 : LOG(("Association change: SCTP_RESTART"));
1490 0 : break;
1491 : case SCTP_SHUTDOWN_COMP:
1492 0 : LOG(("Association change: SCTP_SHUTDOWN_COMP"));
1493 0 : Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
1494 : DataChannelOnMessageAvailable::ON_DISCONNECTED,
1495 0 : this)));
1496 0 : break;
1497 : case SCTP_CANT_STR_ASSOC:
1498 0 : LOG(("Association change: SCTP_CANT_STR_ASSOC"));
1499 0 : break;
1500 : default:
1501 0 : LOG(("Association change: UNKNOWN"));
1502 0 : break;
1503 : }
1504 0 : LOG(("Association change: streams (in/out) = (%u/%u)",
1505 : sac->sac_inbound_streams, sac->sac_outbound_streams));
1506 :
1507 0 : NS_ENSURE_TRUE_VOID(sac);
1508 0 : n = sac->sac_length - sizeof(*sac);
1509 0 : if (((sac->sac_state == SCTP_COMM_UP) ||
1510 0 : (sac->sac_state == SCTP_RESTART)) && (n > 0)) {
1511 0 : for (i = 0; i < n; ++i) {
1512 0 : switch (sac->sac_info[i]) {
1513 : case SCTP_ASSOC_SUPPORTS_PR:
1514 0 : LOG(("Supports: PR"));
1515 0 : break;
1516 : case SCTP_ASSOC_SUPPORTS_AUTH:
1517 0 : LOG(("Supports: AUTH"));
1518 0 : break;
1519 : case SCTP_ASSOC_SUPPORTS_ASCONF:
1520 0 : LOG(("Supports: ASCONF"));
1521 0 : break;
1522 : case SCTP_ASSOC_SUPPORTS_MULTIBUF:
1523 0 : LOG(("Supports: MULTIBUF"));
1524 0 : break;
1525 : case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
1526 0 : LOG(("Supports: RE-CONFIG"));
1527 0 : break;
1528 : default:
1529 0 : LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
1530 0 : break;
1531 : }
1532 : }
1533 0 : } else if (((sac->sac_state == SCTP_COMM_LOST) ||
1534 0 : (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) {
1535 0 : LOG(("Association: ABORT ="));
1536 0 : for (i = 0; i < n; ++i) {
1537 0 : LOG((" 0x%02x", sac->sac_info[i]));
1538 : }
1539 : }
1540 0 : if ((sac->sac_state == SCTP_CANT_STR_ASSOC) ||
1541 0 : (sac->sac_state == SCTP_SHUTDOWN_COMP) ||
1542 0 : (sac->sac_state == SCTP_COMM_LOST)) {
1543 0 : return;
1544 : }
1545 : }
1546 :
1547 : void
1548 0 : DataChannelConnection::HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc)
1549 : {
1550 0 : const char *addr = "";
1551 : #if !defined(__Userspace_os_Windows)
1552 : char addr_buf[INET6_ADDRSTRLEN];
1553 : struct sockaddr_in *sin;
1554 : struct sockaddr_in6 *sin6;
1555 : #endif
1556 :
1557 0 : switch (spc->spc_aaddr.ss_family) {
1558 : case AF_INET:
1559 : #if !defined(__Userspace_os_Windows)
1560 0 : sin = (struct sockaddr_in *)&spc->spc_aaddr;
1561 0 : addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN);
1562 : #endif
1563 0 : break;
1564 : case AF_INET6:
1565 : #if !defined(__Userspace_os_Windows)
1566 0 : sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr;
1567 0 : addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN);
1568 : #endif
1569 0 : break;
1570 : case AF_CONN:
1571 0 : addr = "DTLS connection";
1572 0 : break;
1573 : default:
1574 0 : break;
1575 : }
1576 0 : LOG(("Peer address %s is now ", addr));
1577 0 : switch (spc->spc_state) {
1578 : case SCTP_ADDR_AVAILABLE:
1579 0 : LOG(("SCTP_ADDR_AVAILABLE"));
1580 0 : break;
1581 : case SCTP_ADDR_UNREACHABLE:
1582 0 : LOG(("SCTP_ADDR_UNREACHABLE"));
1583 0 : break;
1584 : case SCTP_ADDR_REMOVED:
1585 0 : LOG(("SCTP_ADDR_REMOVED"));
1586 0 : break;
1587 : case SCTP_ADDR_ADDED:
1588 0 : LOG(("SCTP_ADDR_ADDED"));
1589 0 : break;
1590 : case SCTP_ADDR_MADE_PRIM:
1591 0 : LOG(("SCTP_ADDR_MADE_PRIM"));
1592 0 : break;
1593 : case SCTP_ADDR_CONFIRMED:
1594 0 : LOG(("SCTP_ADDR_CONFIRMED"));
1595 0 : break;
1596 : default:
1597 0 : LOG(("UNKNOWN"));
1598 0 : break;
1599 : }
1600 0 : LOG((" (error = 0x%08x).\n", spc->spc_error));
1601 0 : }
1602 :
1603 : void
1604 0 : DataChannelConnection::HandleRemoteErrorEvent(const struct sctp_remote_error *sre)
1605 : {
1606 : size_t i, n;
1607 :
1608 0 : n = sre->sre_length - sizeof(struct sctp_remote_error);
1609 0 : LOG(("Remote Error (error = 0x%04x): ", sre->sre_error));
1610 0 : for (i = 0; i < n; ++i) {
1611 0 : LOG((" 0x%02x", sre-> sre_data[i]));
1612 : }
1613 0 : }
1614 :
1615 : void
1616 0 : DataChannelConnection::HandleShutdownEvent(const struct sctp_shutdown_event *sse)
1617 : {
1618 0 : LOG(("Shutdown event."));
1619 : /* XXX: notify all channels. */
1620 : // Attempts to actually send anything will fail
1621 0 : }
1622 :
1623 : void
1624 0 : DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai)
1625 : {
1626 0 : LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind));
1627 0 : }
1628 :
1629 : void
1630 0 : DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe)
1631 : {
1632 : size_t i, n;
1633 :
1634 0 : if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) {
1635 0 : LOG(("Unsent "));
1636 : }
1637 0 : if (ssfe->ssfe_flags & SCTP_DATA_SENT) {
1638 0 : LOG(("Sent "));
1639 : }
1640 0 : if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) {
1641 0 : LOG(("(flags = %x) ", ssfe->ssfe_flags));
1642 : }
1643 0 : LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x",
1644 : ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid,
1645 : ssfe->ssfe_info.snd_flags, ssfe->ssfe_error));
1646 0 : n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event);
1647 0 : for (i = 0; i < n; ++i) {
1648 0 : LOG((" 0x%02x", ssfe->ssfe_data[i]));
1649 : }
1650 0 : }
1651 :
1652 : void
1653 0 : DataChannelConnection::ClearResets()
1654 : {
1655 : // Clear all pending resets
1656 0 : if (!mStreamsResetting.IsEmpty()) {
1657 0 : LOG(("Clearing resets for %" PRIuSIZE " streams", mStreamsResetting.Length()));
1658 : }
1659 :
1660 0 : for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) {
1661 0 : RefPtr<DataChannel> channel;
1662 0 : channel = FindChannelByStream(mStreamsResetting[i]);
1663 0 : if (channel) {
1664 0 : LOG(("Forgetting channel %u (%p) with pending reset",channel->mStream, channel.get()));
1665 0 : mStreams[channel->mStream] = nullptr;
1666 : }
1667 : }
1668 0 : mStreamsResetting.Clear();
1669 0 : }
1670 :
1671 : void
1672 0 : DataChannelConnection::ResetOutgoingStream(uint16_t stream)
1673 : {
1674 : uint32_t i;
1675 :
1676 0 : mLock.AssertCurrentThreadOwns();
1677 0 : LOG(("Connection %p: Resetting outgoing stream %u",
1678 : (void *) this, stream));
1679 : // Rarely has more than a couple items and only for a short time
1680 0 : for (i = 0; i < mStreamsResetting.Length(); ++i) {
1681 0 : if (mStreamsResetting[i] == stream) {
1682 0 : return;
1683 : }
1684 : }
1685 0 : mStreamsResetting.AppendElement(stream);
1686 : }
1687 :
1688 : void
1689 0 : DataChannelConnection::SendOutgoingStreamReset()
1690 : {
1691 : struct sctp_reset_streams *srs;
1692 : uint32_t i;
1693 : size_t len;
1694 :
1695 0 : LOG(("Connection %p: Sending outgoing stream reset for %" PRIuSIZE " streams",
1696 : (void *) this, mStreamsResetting.Length()));
1697 0 : mLock.AssertCurrentThreadOwns();
1698 0 : if (mStreamsResetting.IsEmpty()) {
1699 0 : LOG(("No streams to reset"));
1700 0 : return;
1701 : }
1702 0 : len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t);
1703 0 : srs = static_cast<struct sctp_reset_streams *> (moz_xmalloc(len)); // infallible malloc
1704 0 : memset(srs, 0, len);
1705 0 : srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
1706 0 : srs->srs_number_streams = mStreamsResetting.Length();
1707 0 : for (i = 0; i < mStreamsResetting.Length(); ++i) {
1708 0 : srs->srs_stream_list[i] = mStreamsResetting[i];
1709 : }
1710 0 : if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) {
1711 0 : LOG(("***failed: setsockopt RESET, errno %d", errno));
1712 : // if errno == EALREADY, this is normal - we can't send another reset
1713 : // with one pending.
1714 : // When we get an incoming reset (which may be a response to our
1715 : // outstanding one), see if we have any pending outgoing resets and
1716 : // send them
1717 : } else {
1718 0 : mStreamsResetting.Clear();
1719 : }
1720 0 : free(srs);
1721 : }
1722 :
1723 : void
1724 0 : DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst)
1725 : {
1726 : uint32_t n, i;
1727 0 : RefPtr<DataChannel> channel; // since we may null out the ref to the channel
1728 :
1729 0 : if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
1730 0 : !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) {
1731 0 : n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t);
1732 0 : for (i = 0; i < n; ++i) {
1733 0 : if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
1734 0 : channel = FindChannelByStream(strrst->strreset_stream_list[i]);
1735 0 : if (channel) {
1736 : // The other side closed the channel
1737 : // We could be in three states:
1738 : // 1. Normal state (input and output streams (OPEN)
1739 : // Notify application, send a RESET in response on our
1740 : // outbound channel. Go to CLOSED
1741 : // 2. We sent our own reset (CLOSING); either they crossed on the
1742 : // wire, or this is a response to our Reset.
1743 : // Go to CLOSED
1744 : // 3. We've sent a open but haven't gotten a response yet (CONNECTING)
1745 : // I believe this is impossible, as we don't have an input stream yet.
1746 :
1747 0 : LOG(("Incoming: Channel %u closed, state %d",
1748 : channel->mStream, channel->mState));
1749 0 : ASSERT_WEBRTC(channel->mState == DataChannel::OPEN ||
1750 : channel->mState == DataChannel::CLOSING ||
1751 : channel->mState == DataChannel::CONNECTING ||
1752 : channel->mState == DataChannel::WAITING_TO_OPEN);
1753 0 : if (channel->mState == DataChannel::OPEN ||
1754 0 : channel->mState == DataChannel::WAITING_TO_OPEN) {
1755 : // Mark the stream for reset (the reset is sent below)
1756 0 : ResetOutgoingStream(channel->mStream);
1757 : }
1758 0 : mStreams[channel->mStream] = nullptr;
1759 :
1760 0 : LOG(("Disconnected DataChannel %p from connection %p",
1761 : (void *) channel.get(), (void *) channel->mConnection.get()));
1762 : // This sends ON_CHANNEL_CLOSED to mainthread
1763 0 : channel->StreamClosedLocked();
1764 : } else {
1765 0 : LOG(("Can't find incoming channel %d",i));
1766 : }
1767 : }
1768 : }
1769 : }
1770 :
1771 : // Process any pending resets now:
1772 0 : if (!mStreamsResetting.IsEmpty()) {
1773 0 : LOG(("Sending %" PRIuSIZE " pending resets", mStreamsResetting.Length()));
1774 0 : SendOutgoingStreamReset();
1775 : }
1776 0 : }
1777 :
1778 : void
1779 0 : DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg)
1780 : {
1781 : uint16_t stream;
1782 0 : RefPtr<DataChannel> channel;
1783 :
1784 0 : if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
1785 0 : LOG(("*** Failed increasing number of streams from %" PRIuSIZE " (%u/%u)",
1786 : mStreams.Length(),
1787 : strchg->strchange_instrms,
1788 : strchg->strchange_outstrms));
1789 : // XXX FIX! notify pending opens of failure
1790 0 : return;
1791 : }
1792 0 : if (strchg->strchange_instrms > mStreams.Length()) {
1793 0 : LOG(("Other side increased streams from %" PRIuSIZE " to %u",
1794 : mStreams.Length(), strchg->strchange_instrms));
1795 : }
1796 0 : if (strchg->strchange_outstrms > mStreams.Length() ||
1797 0 : strchg->strchange_instrms > mStreams.Length()) {
1798 0 : uint16_t old_len = mStreams.Length();
1799 : uint16_t new_len = std::max(strchg->strchange_outstrms,
1800 0 : strchg->strchange_instrms);
1801 0 : LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
1802 : old_len, new_len, new_len - old_len,
1803 : strchg->strchange_instrms));
1804 : // make sure both are the same length
1805 0 : mStreams.AppendElements(new_len - old_len);
1806 0 : LOG(("New length = %" PRIuSIZE " (was %d)", mStreams.Length(), old_len));
1807 0 : for (size_t i = old_len; i < mStreams.Length(); ++i) {
1808 0 : mStreams[i] = nullptr;
1809 : }
1810 : // Re-process any channels waiting for streams.
1811 : // Linear search, but we don't increase channels often and
1812 : // the array would only get long in case of an app error normally
1813 :
1814 : // Make sure we request enough streams if there's a big jump in streams
1815 : // Could make a more complex API for OpenXxxFinish() and avoid this loop
1816 0 : size_t num_needed = mPending.GetSize();
1817 0 : LOG(("%" PRIuSIZE " of %d new streams already needed", num_needed,
1818 : new_len - old_len));
1819 0 : num_needed -= (new_len - old_len); // number we added
1820 0 : if (num_needed > 0) {
1821 0 : if (num_needed < 16)
1822 0 : num_needed = 16;
1823 0 : LOG(("Not enough new streams, asking for %" PRIuSIZE " more", num_needed));
1824 0 : RequestMoreStreams(num_needed);
1825 0 : } else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
1826 0 : LOG(("Requesting %d output streams to match partner",
1827 : strchg->strchange_instrms - strchg->strchange_outstrms));
1828 0 : RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms);
1829 : }
1830 :
1831 0 : ProcessQueuedOpens();
1832 : }
1833 : // else probably not a change in # of streams
1834 :
1835 0 : for (uint32_t i = 0; i < mStreams.Length(); ++i) {
1836 0 : channel = mStreams[i];
1837 0 : if (!channel)
1838 0 : continue;
1839 :
1840 0 : if ((channel->mState == CONNECTING) &&
1841 0 : (channel->mStream == INVALID_STREAM)) {
1842 0 : if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
1843 0 : (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
1844 : /* XXX: Signal to the other end. */
1845 0 : channel->mState = CLOSED;
1846 0 : Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
1847 : DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
1848 0 : channel)));
1849 : // maybe fire onError (bug 843625)
1850 : } else {
1851 0 : stream = FindFreeStream();
1852 0 : if (stream != INVALID_STREAM) {
1853 0 : channel->mStream = stream;
1854 0 : mStreams[stream] = channel;
1855 0 : channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
1856 : // Note: we're locked, so there's no danger of a race with the
1857 : // buffer-threshold callback
1858 : } else {
1859 : /* We will not find more ... */
1860 0 : break;
1861 : }
1862 : }
1863 : }
1864 : }
1865 : }
1866 :
1867 : // Called with mLock locked!
1868 : void
1869 0 : DataChannelConnection::HandleNotification(const union sctp_notification *notif, size_t n)
1870 : {
1871 0 : mLock.AssertCurrentThreadOwns();
1872 0 : if (notif->sn_header.sn_length != (uint32_t)n) {
1873 0 : return;
1874 : }
1875 0 : switch (notif->sn_header.sn_type) {
1876 : case SCTP_ASSOC_CHANGE:
1877 0 : HandleAssociationChangeEvent(&(notif->sn_assoc_change));
1878 0 : break;
1879 : case SCTP_PEER_ADDR_CHANGE:
1880 0 : HandlePeerAddressChangeEvent(&(notif->sn_paddr_change));
1881 0 : break;
1882 : case SCTP_REMOTE_ERROR:
1883 0 : HandleRemoteErrorEvent(&(notif->sn_remote_error));
1884 0 : break;
1885 : case SCTP_SHUTDOWN_EVENT:
1886 0 : HandleShutdownEvent(&(notif->sn_shutdown_event));
1887 0 : break;
1888 : case SCTP_ADAPTATION_INDICATION:
1889 0 : HandleAdaptationIndication(&(notif->sn_adaptation_event));
1890 0 : break;
1891 : case SCTP_PARTIAL_DELIVERY_EVENT:
1892 0 : LOG(("SCTP_PARTIAL_DELIVERY_EVENT"));
1893 0 : break;
1894 : case SCTP_AUTHENTICATION_EVENT:
1895 0 : LOG(("SCTP_AUTHENTICATION_EVENT"));
1896 0 : break;
1897 : case SCTP_SENDER_DRY_EVENT:
1898 : //LOG(("SCTP_SENDER_DRY_EVENT"));
1899 0 : break;
1900 : case SCTP_NOTIFICATIONS_STOPPED_EVENT:
1901 0 : LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT"));
1902 0 : break;
1903 : case SCTP_SEND_FAILED_EVENT:
1904 0 : HandleSendFailedEvent(&(notif->sn_send_failed_event));
1905 0 : break;
1906 : case SCTP_STREAM_RESET_EVENT:
1907 0 : HandleStreamResetEvent(&(notif->sn_strreset_event));
1908 0 : break;
1909 : case SCTP_ASSOC_RESET_EVENT:
1910 0 : LOG(("SCTP_ASSOC_RESET_EVENT"));
1911 0 : break;
1912 : case SCTP_STREAM_CHANGE_EVENT:
1913 0 : HandleStreamChangeEvent(&(notif->sn_strchange_event));
1914 0 : break;
1915 : default:
1916 0 : LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type));
1917 0 : break;
1918 : }
1919 : }
1920 :
1921 : int
1922 0 : DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen,
1923 : struct sctp_rcvinfo rcv, int32_t flags)
1924 : {
1925 0 : ASSERT_WEBRTC(!NS_IsMainThread());
1926 :
1927 0 : if (!data) {
1928 0 : usrsctp_close(sock); // SCTP has finished shutting down
1929 : } else {
1930 0 : MutexAutoLock lock(mLock);
1931 0 : if (flags & MSG_NOTIFICATION) {
1932 0 : HandleNotification(static_cast<union sctp_notification *>(data), datalen);
1933 : } else {
1934 0 : HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid);
1935 : }
1936 : }
1937 : // sctp allocates 'data' with malloc(), and expects the receiver to free
1938 : // it (presumably with free).
1939 : // XXX future optimization: try to deliver messages without an internal
1940 : // alloc/copy, and if so delay the free until later.
1941 0 : free(data);
1942 : // usrsctp defines the callback as returning an int, but doesn't use it
1943 0 : return 1;
1944 : }
1945 :
1946 : already_AddRefed<DataChannel>
1947 0 : DataChannelConnection::Open(const nsACString& label, const nsACString& protocol,
1948 : Type type, bool inOrder,
1949 : uint32_t prValue, DataChannelListener *aListener,
1950 : nsISupports *aContext, bool aExternalNegotiated,
1951 : uint16_t aStream)
1952 : {
1953 : // aStream == INVALID_STREAM to have the protocol allocate
1954 0 : uint16_t prPolicy = SCTP_PR_SCTP_NONE;
1955 : uint32_t flags;
1956 :
1957 0 : LOG(("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, context %p, external: %s, stream %u",
1958 : PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(),
1959 : type, inOrder, prValue, aListener, aContext,
1960 : aExternalNegotiated ? "true" : "false", aStream));
1961 0 : switch (type) {
1962 : case DATA_CHANNEL_RELIABLE:
1963 0 : prPolicy = SCTP_PR_SCTP_NONE;
1964 0 : break;
1965 : case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
1966 0 : prPolicy = SCTP_PR_SCTP_RTX;
1967 0 : break;
1968 : case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
1969 0 : prPolicy = SCTP_PR_SCTP_TTL;
1970 0 : break;
1971 : default:
1972 0 : LOG(("ERROR: unsupported channel type: %u", type));
1973 0 : MOZ_ASSERT(false);
1974 : return nullptr;
1975 : }
1976 0 : if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) {
1977 0 : return nullptr;
1978 : }
1979 :
1980 : // Don't look past currently-negotiated streams
1981 0 : if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) {
1982 0 : LOG(("ERROR: external negotiation of already-open channel %u", aStream));
1983 : // XXX How do we indicate this up to the application? Probably the
1984 : // caller's job, but we may need to return an error code.
1985 0 : return nullptr;
1986 : }
1987 :
1988 0 : flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
1989 : RefPtr<DataChannel> channel(new DataChannel(this,
1990 : aStream,
1991 : DataChannel::CONNECTING,
1992 : label, protocol,
1993 : prPolicy, prValue,
1994 : flags,
1995 0 : aListener, aContext));
1996 0 : if (aExternalNegotiated) {
1997 0 : channel->mFlags |= DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED;
1998 : }
1999 :
2000 0 : MutexAutoLock lock(mLock); // OpenFinish assumes this
2001 0 : return OpenFinish(channel.forget());
2002 : }
2003 :
2004 : // Separate routine so we can also call it to finish up from pending opens
2005 : already_AddRefed<DataChannel>
2006 0 : DataChannelConnection::OpenFinish(already_AddRefed<DataChannel>&& aChannel)
2007 : {
2008 0 : RefPtr<DataChannel> channel(aChannel); // takes the reference passed in
2009 : // Normally 1 reference if called from ::Open(), or 2 if called from
2010 : // ProcessQueuedOpens() unless the DOMDataChannel was gc'd
2011 0 : uint16_t stream = channel->mStream;
2012 0 : bool queue = false;
2013 :
2014 0 : mLock.AssertCurrentThreadOwns();
2015 :
2016 : // Cases we care about:
2017 : // Pre-negotiated:
2018 : // Not Open:
2019 : // Doesn't fit:
2020 : // -> change initial ask or renegotiate after open
2021 : // -> queue open
2022 : // Open:
2023 : // Doesn't fit:
2024 : // -> RequestMoreStreams && queue
2025 : // Does fit:
2026 : // -> open
2027 : // Not negotiated:
2028 : // Not Open:
2029 : // -> queue open
2030 : // Open:
2031 : // -> Try to get a stream
2032 : // Doesn't fit:
2033 : // -> RequestMoreStreams && queue
2034 : // Does fit:
2035 : // -> open
2036 : // So the Open cases are basically the same
2037 : // Not Open cases are simply queue for non-negotiated, and
2038 : // either change the initial ask or possibly renegotiate after open.
2039 :
2040 0 : if (mState == OPEN) {
2041 0 : if (stream == INVALID_STREAM) {
2042 0 : stream = FindFreeStream(); // may be INVALID_STREAM if we need more
2043 : }
2044 0 : if (stream == INVALID_STREAM || stream >= mStreams.Length()) {
2045 : // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra streams
2046 : // to avoid going back immediately for more if the ask to N, N+1, etc
2047 0 : int32_t more_needed = (stream == INVALID_STREAM) ? 16 :
2048 0 : (stream-((int32_t)mStreams.Length())) + 16;
2049 0 : if (!RequestMoreStreams(more_needed)) {
2050 : // Something bad happened... we're done
2051 0 : goto request_error_cleanup;
2052 : }
2053 0 : queue = true;
2054 : }
2055 : } else {
2056 : // not OPEN
2057 0 : if (stream != INVALID_STREAM && stream >= mStreams.Length() &&
2058 0 : mState == CLOSED) {
2059 : // Update number of streams for init message
2060 : struct sctp_initmsg initmsg;
2061 0 : socklen_t len = sizeof(initmsg);
2062 0 : int32_t total_needed = stream+16;
2063 :
2064 0 : memset(&initmsg, 0, sizeof(initmsg));
2065 0 : if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
2066 0 : LOG(("*** failed getsockopt SCTP_INITMSG"));
2067 0 : goto request_error_cleanup;
2068 : }
2069 0 : LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed,
2070 : initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
2071 0 : initmsg.sinit_num_ostreams = total_needed;
2072 0 : initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
2073 0 : if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
2074 : (socklen_t)sizeof(initmsg)) < 0) {
2075 0 : LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
2076 0 : goto request_error_cleanup;
2077 : }
2078 :
2079 0 : int32_t old_len = mStreams.Length();
2080 0 : mStreams.AppendElements(total_needed - old_len);
2081 0 : for (int32_t i = old_len; i < total_needed; ++i) {
2082 0 : mStreams[i] = nullptr;
2083 : }
2084 : }
2085 : // else if state is CONNECTING, we'll just re-negotiate when OpenFinish
2086 : // is called, if needed
2087 0 : queue = true;
2088 : }
2089 0 : if (queue) {
2090 0 : LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream));
2091 : // Also serves to mark we told the app
2092 0 : channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN;
2093 : // we need a ref for the nsDeQue and one to return
2094 0 : DataChannel* rawChannel = channel;
2095 0 : rawChannel->AddRef();
2096 0 : mPending.Push(rawChannel);
2097 0 : return channel.forget();
2098 : }
2099 :
2100 0 : MOZ_ASSERT(stream != INVALID_STREAM);
2101 : // just allocated (& OPEN), or externally negotiated
2102 0 : mStreams[stream] = channel; // holds a reference
2103 0 : channel->mStream = stream;
2104 :
2105 : #ifdef TEST_QUEUED_DATA
2106 : // It's painful to write a test for this...
2107 : channel->mState = OPEN;
2108 : channel->mReady = true;
2109 : SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST);
2110 : #endif
2111 :
2112 0 : if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) {
2113 : // Don't send unordered until this gets cleared
2114 0 : channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK;
2115 : }
2116 :
2117 0 : if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
2118 0 : if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
2119 : stream,
2120 0 : !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
2121 0 : channel->mPrPolicy, channel->mPrValue)) {
2122 0 : LOG(("SendOpenRequest failed, errno = %d", errno));
2123 0 : if (errno == EAGAIN || errno == EWOULDBLOCK) {
2124 0 : channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
2125 : // Note: we're locked, so there's no danger of a race with the
2126 : // buffer-threshold callback
2127 0 : return channel.forget();
2128 : }
2129 0 : if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
2130 : // We already returned the channel to the app.
2131 0 : NS_ERROR("Failed to send open request");
2132 0 : Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2133 : DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
2134 0 : channel)));
2135 : }
2136 : // If we haven't returned the channel yet, it will get destroyed when we exit
2137 : // this function.
2138 0 : mStreams[stream] = nullptr;
2139 0 : channel->mStream = INVALID_STREAM;
2140 : // we'll be destroying the channel
2141 0 : channel->mState = CLOSED;
2142 0 : return nullptr;
2143 : /* NOTREACHED */
2144 : }
2145 : }
2146 : // Either externally negotiated or we sent Open
2147 0 : channel->mState = OPEN;
2148 0 : channel->mReady = true;
2149 : // FIX? Move into DOMDataChannel? I don't think we can send it yet here
2150 0 : LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
2151 0 : Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2152 : DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
2153 0 : channel)));
2154 :
2155 0 : return channel.forget();
2156 :
2157 : request_error_cleanup:
2158 0 : channel->mState = CLOSED;
2159 0 : if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
2160 : // We already returned the channel to the app.
2161 0 : NS_ERROR("Failed to request more streams");
2162 0 : Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2163 : DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
2164 0 : channel)));
2165 0 : return channel.forget();
2166 : }
2167 : // we'll be destroying the channel, but it never really got set up
2168 : // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
2169 : // Dispatch it to ourselves
2170 0 : return nullptr;
2171 : }
2172 :
2173 : int32_t
2174 0 : DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data,
2175 : size_t length, uint32_t ppid)
2176 : {
2177 : uint16_t flags;
2178 : struct sctp_sendv_spa spa;
2179 : int32_t result;
2180 :
2181 0 : NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0);
2182 0 : NS_WARNING_ASSERTION(length > 0, "Length is 0?!");
2183 :
2184 : // To avoid problems where an in-order OPEN is lost and an
2185 : // out-of-order data message "beats" it, require data to be in-order
2186 : // until we get an ACK.
2187 0 : if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
2188 0 : !(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
2189 0 : flags = SCTP_UNORDERED;
2190 : } else {
2191 0 : flags = 0;
2192 : }
2193 :
2194 0 : spa.sendv_sndinfo.snd_ppid = htonl(ppid);
2195 0 : spa.sendv_sndinfo.snd_sid = channel->mStream;
2196 0 : spa.sendv_sndinfo.snd_flags = flags;
2197 0 : spa.sendv_sndinfo.snd_context = 0;
2198 0 : spa.sendv_sndinfo.snd_assoc_id = 0;
2199 0 : spa.sendv_flags = SCTP_SEND_SNDINFO_VALID;
2200 :
2201 0 : if (channel->mPrPolicy != SCTP_PR_SCTP_NONE) {
2202 0 : spa.sendv_prinfo.pr_policy = channel->mPrPolicy;
2203 0 : spa.sendv_prinfo.pr_value = channel->mPrValue;
2204 0 : spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
2205 : }
2206 :
2207 : // Note: Main-thread IO, but doesn't block!
2208 : // XXX FIX! to deal with heavy overruns of JS trying to pass data in
2209 : // (more than the buffersize) queue data onto another thread to do the
2210 : // actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp
2211 :
2212 : // SCTP will return EMSGSIZE if the message is bigger than the buffer
2213 : // size (or EAGAIN if there isn't space)
2214 :
2215 : // Avoid a race between buffer-full-failure (where we have to add the
2216 : // packet to the buffered-data queue) and the buffer-now-only-half-full
2217 : // callback, which happens on a different thread. Otherwise we might
2218 : // fail here, then before we add it to the queue get the half-full
2219 : // callback, find nothing to do, then on this thread add it to the
2220 : // queue - which would sit there. Also, if we later send more data, it
2221 : // would arrive ahead of the buffered message, but if the buffer ever
2222 : // got to 1/2 full, the message would get sent - but at a semi-random
2223 : // time, after other data it was supposed to be in front of.
2224 :
2225 : // Must lock before empty check for similar reasons!
2226 0 : MutexAutoLock lock(mLock);
2227 0 : if (channel->mBufferedData.IsEmpty()) {
2228 0 : result = usrsctp_sendv(mSocket, data, length,
2229 : nullptr, 0,
2230 : (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa),
2231 : SCTP_SENDV_SPA, 0);
2232 0 : LOG(("Sent buffer (len=%" PRIuSIZE "), result=%d", length, result));
2233 : } else {
2234 : // Fake EAGAIN if we're already buffering data
2235 0 : result = -1;
2236 0 : errno = EAGAIN;
2237 : }
2238 0 : if (result < 0) {
2239 0 : if (errno == EAGAIN || errno == EWOULDBLOCK) {
2240 :
2241 : // queue data for resend! And queue any further data for the stream until it is...
2242 0 : auto *buffered = new BufferedMsg(spa, data, length); // infallible malloc
2243 0 : channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array
2244 0 : channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA;
2245 0 : LOG(("Queued %" PRIuSIZE " buffers (len=%" PRIuSIZE ")",
2246 : channel->mBufferedData.Length(), length));
2247 0 : return 0;
2248 : }
2249 0 : LOG(("error %d sending string", errno));
2250 : }
2251 0 : return result;
2252 : }
2253 :
2254 : // Handles fragmenting binary messages
2255 : int32_t
2256 0 : DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
2257 : size_t len,
2258 : uint32_t ppid_partial, uint32_t ppid_final)
2259 : {
2260 : // Since there's a limit on network buffer size and no limits on message
2261 : // size, and we don't want to use EOR mode (multiple writes for a
2262 : // message, but all other streams are blocked until you finish sending
2263 : // this message), we need to add application-level fragmentation of large
2264 : // messages. On a reliable channel, these can be simply rebuilt into a
2265 : // large message. On an unreliable channel, we can't and don't know how
2266 : // long to wait, and there are no retransmissions, and no easy way to
2267 : // tell the user "this part is missing", so on unreliable channels we
2268 : // need to return an error if sending more bytes than the network buffers
2269 : // can hold, and perhaps a lower number.
2270 :
2271 : // We *really* don't want to do this from main thread! - and SendMsgInternal
2272 : // avoids blocking.
2273 : // This MUST be reliable and in-order for the reassembly to work
2274 0 : if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
2275 0 : channel->mPrPolicy == DATA_CHANNEL_RELIABLE &&
2276 0 : !(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
2277 0 : int32_t sent=0;
2278 0 : uint32_t origlen = len;
2279 0 : LOG(("Sending binary message length %" PRIuSIZE " in chunks", len));
2280 : // XXX check flags for out-of-order, or force in-order for large binary messages
2281 0 : while (len > 0) {
2282 0 : size_t sendlen = std::min<size_t>(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
2283 : uint32_t ppid;
2284 0 : len -= sendlen;
2285 0 : ppid = len > 0 ? ppid_partial : ppid_final;
2286 0 : LOG(("Send chunk of %" PRIuSIZE " bytes, ppid %u", sendlen, ppid));
2287 : // Note that these might end up being deferred and queued.
2288 0 : sent += SendMsgInternal(channel, data, sendlen, ppid);
2289 0 : data += sendlen;
2290 : }
2291 0 : LOG(("Sent %d buffers for %u bytes, %d sent immediately, %" PRIuSIZE " buffers queued",
2292 : (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT,
2293 : origlen, sent,
2294 : channel->mBufferedData.Length()));
2295 0 : return sent;
2296 : }
2297 0 : NS_WARNING_ASSERTION(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
2298 : "Sending too-large data on unreliable channel!");
2299 :
2300 : // This will fail if the message is too large (default 256K)
2301 0 : return SendMsgInternal(channel, data, len, ppid_final);
2302 : }
2303 :
2304 0 : class ReadBlobRunnable : public Runnable {
2305 : public:
2306 0 : ReadBlobRunnable(DataChannelConnection* aConnection,
2307 : uint16_t aStream,
2308 : nsIInputStream* aBlob)
2309 0 : : Runnable("ReadBlobRunnable")
2310 : , mConnection(aConnection)
2311 : , mStream(aStream)
2312 0 : , mBlob(aBlob)
2313 0 : {}
2314 :
2315 0 : NS_IMETHOD Run() override {
2316 : // ReadBlob() is responsible to releasing the reference
2317 0 : DataChannelConnection *self = mConnection;
2318 0 : self->ReadBlob(mConnection.forget(), mStream, mBlob);
2319 0 : return NS_OK;
2320 : }
2321 :
2322 : private:
2323 : // Make sure the Connection doesn't die while there are jobs outstanding.
2324 : // Let it die (if released by PeerConnectionImpl while we're running)
2325 : // when we send our runnable back to MainThread. Then ~DataChannelConnection
2326 : // can send the IOThread to MainThread to die in a runnable, avoiding
2327 : // unsafe event loop recursion. Evil.
2328 : RefPtr<DataChannelConnection> mConnection;
2329 : uint16_t mStream;
2330 : // Use RefCount for preventing the object is deleted when SendBlob returns.
2331 : RefPtr<nsIInputStream> mBlob;
2332 : };
2333 :
2334 : int32_t
2335 0 : DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob)
2336 : {
2337 0 : DataChannel *channel = mStreams[stream];
2338 0 : NS_ENSURE_TRUE(channel, 0);
2339 : // Spawn a thread to send the data
2340 0 : if (!mInternalIOThread) {
2341 0 : nsresult rv = NS_NewNamedThread("DataChannel IO",
2342 0 : getter_AddRefs(mInternalIOThread));
2343 0 : if (NS_FAILED(rv)) {
2344 0 : return -1;
2345 : }
2346 : }
2347 :
2348 0 : mInternalIOThread->Dispatch(do_AddRef(new ReadBlobRunnable(this, stream, aBlob)), NS_DISPATCH_NORMAL);
2349 0 : return 0;
2350 : }
2351 :
2352 : class DataChannelBlobSendRunnable : public Runnable
2353 : {
2354 : public:
2355 0 : DataChannelBlobSendRunnable(
2356 : already_AddRefed<DataChannelConnection>& aConnection,
2357 : uint16_t aStream)
2358 0 : : Runnable("DataChannelBlobSendRunnable")
2359 : , mConnection(aConnection)
2360 0 : , mStream(aStream)
2361 : {
2362 0 : }
2363 :
2364 0 : ~DataChannelBlobSendRunnable() override
2365 0 : {
2366 0 : if (!NS_IsMainThread() && mConnection) {
2367 0 : MOZ_ASSERT(false);
2368 : // explicitly leak the connection if destroyed off mainthread
2369 : Unused << mConnection.forget().take();
2370 : }
2371 0 : }
2372 :
2373 0 : NS_IMETHOD Run() override
2374 : {
2375 0 : ASSERT_WEBRTC(NS_IsMainThread());
2376 :
2377 0 : mConnection->SendBinaryMsg(mStream, mData);
2378 0 : mConnection = nullptr;
2379 0 : return NS_OK;
2380 : }
2381 :
2382 : // explicitly public so we can avoid allocating twice and copying
2383 : nsCString mData;
2384 :
2385 : private:
2386 : // Note: we can be destroyed off the target thread, so be careful not to let this
2387 : // get Released()ed on the temp thread!
2388 : RefPtr<DataChannelConnection> mConnection;
2389 : uint16_t mStream;
2390 : };
2391 :
2392 : void
2393 0 : DataChannelConnection::ReadBlob(already_AddRefed<DataChannelConnection> aThis,
2394 : uint16_t aStream, nsIInputStream* aBlob)
2395 : {
2396 : // NOTE: 'aThis' has been forgotten by the caller to avoid releasing
2397 : // it off mainthread; if PeerConnectionImpl has released then we want
2398 : // ~DataChannelConnection() to run on MainThread
2399 :
2400 : // XXX to do this safely, we must enqueue these atomically onto the
2401 : // output socket. We need a sender thread(s?) to enqueue data into the
2402 : // socket and to avoid main-thread IO that might block. Even on a
2403 : // background thread, we may not want to block on one stream's data.
2404 : // I.e. run non-blocking and service multiple channels.
2405 :
2406 : // For now as a hack, send as a single blast of queued packets which may
2407 : // be deferred until buffer space is available.
2408 : uint64_t len;
2409 :
2410 : // Must not let Dispatching it cause the DataChannelConnection to get
2411 : // released on the wrong thread. Using WrapRunnable(RefPtr<DataChannelConnection>(aThis),...
2412 : // will occasionally cause aThis to get released on this thread. Also, an explicit Runnable
2413 : // lets us avoid copying the blob data an extra time.
2414 : RefPtr<DataChannelBlobSendRunnable> runnable = new DataChannelBlobSendRunnable(aThis,
2415 0 : aStream);
2416 : // avoid copying the blob data by passing the mData from the runnable
2417 0 : if (NS_FAILED(aBlob->Available(&len)) ||
2418 0 : NS_FAILED(NS_ReadInputStreamToString(aBlob, runnable->mData, len))) {
2419 : // Bug 966602: Doesn't return an error to the caller via onerror.
2420 : // We must release DataChannelConnection on MainThread to avoid issues (bug 876167)
2421 : // aThis is now owned by the runnable; release it there
2422 : NS_ReleaseOnMainThread(
2423 0 : "DataChannelBlobSendRunnable", runnable.forget());
2424 0 : return;
2425 : }
2426 0 : aBlob->Close();
2427 0 : Dispatch(runnable.forget());
2428 : }
2429 :
2430 : void
2431 0 : DataChannelConnection::GetStreamIds(std::vector<uint16_t>* aStreamList)
2432 : {
2433 0 : ASSERT_WEBRTC(NS_IsMainThread());
2434 0 : for (uint32_t i = 0; i < mStreams.Length(); ++i) {
2435 0 : if (mStreams[i]) {
2436 0 : aStreamList->push_back(mStreams[i]->mStream);
2437 : }
2438 : }
2439 0 : }
2440 :
2441 : int32_t
2442 0 : DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg,
2443 : bool isBinary)
2444 : {
2445 0 : ASSERT_WEBRTC(NS_IsMainThread());
2446 : // We really could allow this from other threads, so long as we deal with
2447 : // asynchronosity issues with channels closing, in particular access to
2448 : // mStreams, and issues with the association closing (access to mSocket).
2449 :
2450 0 : const char *data = aMsg.BeginReading();
2451 0 : uint32_t len = aMsg.Length();
2452 : DataChannel *channel;
2453 :
2454 0 : LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len));
2455 : // XXX if we want more efficiency, translate flags once at open time
2456 0 : channel = mStreams[stream];
2457 0 : NS_ENSURE_TRUE(channel, 0);
2458 :
2459 0 : if (isBinary)
2460 0 : return SendBinary(channel, data, len,
2461 0 : DATA_CHANNEL_PPID_BINARY, DATA_CHANNEL_PPID_BINARY_LAST);
2462 0 : return SendBinary(channel, data, len,
2463 0 : DATA_CHANNEL_PPID_DOMSTRING, DATA_CHANNEL_PPID_DOMSTRING_LAST);
2464 : }
2465 :
2466 : void
2467 0 : DataChannelConnection::Close(DataChannel *aChannel)
2468 : {
2469 0 : MutexAutoLock lock(mLock);
2470 0 : CloseInt(aChannel);
2471 0 : }
2472 :
2473 : // So we can call Close() with the lock already held
2474 : // Called from someone who holds a ref via ::Close(), or from ~DataChannel
2475 : void
2476 0 : DataChannelConnection::CloseInt(DataChannel *aChannel)
2477 : {
2478 0 : MOZ_ASSERT(aChannel);
2479 0 : RefPtr<DataChannel> channel(aChannel); // make sure it doesn't go away on us
2480 :
2481 0 : mLock.AssertCurrentThreadOwns();
2482 0 : LOG(("Connection %p/Channel %p: Closing stream %u",
2483 : channel->mConnection.get(), channel.get(), channel->mStream));
2484 : // re-test since it may have closed before the lock was grabbed
2485 0 : if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) {
2486 0 : LOG(("Channel already closing/closed (%u)", aChannel->mState));
2487 0 : if (mState == CLOSED && channel->mStream != INVALID_STREAM) {
2488 : // called from CloseAll()
2489 : // we're not going to hang around waiting any more
2490 0 : mStreams[channel->mStream] = nullptr;
2491 : }
2492 0 : return;
2493 : }
2494 0 : aChannel->mBufferedData.Clear();
2495 0 : if (channel->mStream != INVALID_STREAM) {
2496 0 : ResetOutgoingStream(channel->mStream);
2497 0 : if (mState == CLOSED) { // called from CloseAll()
2498 : // Let resets accumulate then send all at once in CloseAll()
2499 : // we're not going to hang around waiting
2500 0 : mStreams[channel->mStream] = nullptr;
2501 : } else {
2502 0 : SendOutgoingStreamReset();
2503 : }
2504 : }
2505 0 : aChannel->mState = CLOSING;
2506 0 : if (mState == CLOSED) {
2507 : // we're not going to hang around waiting
2508 0 : channel->StreamClosedLocked();
2509 : }
2510 : // At this point when we leave here, the object is a zombie held alive only by the DOM object
2511 : }
2512 :
2513 0 : void DataChannelConnection::CloseAll()
2514 : {
2515 0 : LOG(("Closing all channels (connection %p)", (void*) this));
2516 : // Don't need to lock here
2517 :
2518 : // Make sure no more channels will be opened
2519 : {
2520 0 : MutexAutoLock lock(mLock);
2521 0 : mState = CLOSED;
2522 : }
2523 :
2524 : // Close current channels
2525 : // If there are runnables, they hold a strong ref and keep the channel
2526 : // and/or connection alive (even if in a CLOSED state)
2527 0 : bool closed_some = false;
2528 0 : for (uint32_t i = 0; i < mStreams.Length(); ++i) {
2529 0 : if (mStreams[i]) {
2530 0 : mStreams[i]->Close();
2531 0 : closed_some = true;
2532 : }
2533 : }
2534 :
2535 : // Clean up any pending opens for channels
2536 0 : RefPtr<DataChannel> channel;
2537 0 : while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(mPending.PopFront())))) {
2538 0 : LOG(("closing pending channel %p, stream %u", channel.get(), channel->mStream));
2539 0 : channel->Close(); // also releases the ref on each iteration
2540 0 : closed_some = true;
2541 : }
2542 : // It's more efficient to let the Resets queue in shutdown and then
2543 : // SendOutgoingStreamReset() here.
2544 0 : if (closed_some) {
2545 0 : MutexAutoLock lock(mLock);
2546 0 : SendOutgoingStreamReset();
2547 : }
2548 0 : }
2549 :
2550 0 : DataChannel::~DataChannel()
2551 : {
2552 : // NS_ASSERTION since this is more "I think I caught all the cases that
2553 : // can cause this" than a true kill-the-program assertion. If this is
2554 : // wrong, nothing bad happens. A worst it's a leak.
2555 0 : NS_ASSERTION(mState == CLOSED || mState == CLOSING, "unexpected state in ~DataChannel");
2556 0 : }
2557 :
2558 : void
2559 0 : DataChannel::Close()
2560 : {
2561 0 : if (mConnection) {
2562 : // ensure we don't get deleted
2563 0 : RefPtr<DataChannelConnection> connection(mConnection);
2564 0 : connection->Close(this);
2565 : }
2566 0 : }
2567 :
2568 : // Used when disconnecting from the DataChannelConnection
2569 : void
2570 0 : DataChannel::StreamClosedLocked()
2571 : {
2572 0 : mConnection->mLock.AssertCurrentThreadOwns();
2573 0 : ENSURE_DATACONNECTION;
2574 :
2575 0 : LOG(("Destroying Data channel %u", mStream));
2576 0 : MOZ_ASSERT_IF(mStream != INVALID_STREAM,
2577 : !mConnection->FindChannelByStream(mStream));
2578 0 : mStream = INVALID_STREAM;
2579 0 : mState = CLOSED;
2580 0 : mMainThreadEventTarget->Dispatch(
2581 0 : do_AddRef(new DataChannelOnMessageAvailable(
2582 : DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED,
2583 0 : mConnection, this)));
2584 : // We leave mConnection live until the DOM releases us, to avoid races
2585 : }
2586 :
2587 : void
2588 0 : DataChannel::ReleaseConnection()
2589 : {
2590 0 : ASSERT_WEBRTC(NS_IsMainThread());
2591 0 : mConnection = nullptr;
2592 0 : }
2593 :
2594 : void
2595 0 : DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext)
2596 : {
2597 0 : MutexAutoLock mLock(mListenerLock);
2598 0 : mContext = aContext;
2599 0 : mListener = aListener;
2600 0 : }
2601 :
2602 : // May be called from another (i.e. Main) thread!
2603 : void
2604 0 : DataChannel::AppReady()
2605 : {
2606 0 : ENSURE_DATACONNECTION;
2607 :
2608 0 : MutexAutoLock lock(mConnection->mLock);
2609 :
2610 0 : mReady = true;
2611 0 : if (mState == WAITING_TO_OPEN) {
2612 0 : mState = OPEN;
2613 0 : mMainThreadEventTarget->Dispatch(
2614 0 : do_AddRef(new DataChannelOnMessageAvailable(
2615 : DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection,
2616 0 : this)));
2617 0 : for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) {
2618 0 : nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i];
2619 0 : MOZ_ASSERT(runnable);
2620 0 : mMainThreadEventTarget->Dispatch(runnable.forget());
2621 : }
2622 : } else {
2623 0 : NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN");
2624 : }
2625 0 : mQueuedMessages.Clear();
2626 0 : mQueuedMessages.Compact();
2627 : // We never use it again... We could even allocate the array in the odd
2628 : // cases we need it.
2629 : }
2630 :
2631 : uint32_t
2632 0 : DataChannel::GetBufferedAmountLocked() const
2633 : {
2634 0 : size_t buffered = 0;
2635 :
2636 0 : for (auto& buffer : mBufferedData) {
2637 0 : buffered += buffer->mLength;
2638 : }
2639 : // XXX Note: per Michael Tuexen, there's no way to currently get the buffered
2640 : // amount from the SCTP stack for a single stream. It is on their to-do
2641 : // list, and once we import a stack with support for that, we'll need to
2642 : // add it to what we buffer. Also we'll need to ask for notification of a per-
2643 : // stream buffer-low event and merge that into the handling of buffer-low
2644 : // (the equivalent to TCP_NOTSENT_LOWAT on TCP sockets)
2645 :
2646 0 : if (buffered > UINT32_MAX) { // paranoia - >4GB buffered is very very unlikely
2647 0 : buffered = UINT32_MAX;
2648 : }
2649 0 : return buffered;
2650 : }
2651 :
2652 : uint32_t
2653 0 : DataChannel::GetBufferedAmountLowThreshold()
2654 : {
2655 0 : return mBufferedThreshold;
2656 : }
2657 :
2658 : // Never fire immediately, as it's defined to fire on transitions, not state
2659 : void
2660 0 : DataChannel::SetBufferedAmountLowThreshold(uint32_t aThreshold)
2661 : {
2662 0 : mBufferedThreshold = aThreshold;
2663 0 : }
2664 :
2665 : // Called with mLock locked!
2666 : void
2667 0 : DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage)
2668 : {
2669 0 : if (!mReady &&
2670 0 : (mState == CONNECTING || mState == WAITING_TO_OPEN)) {
2671 0 : mQueuedMessages.AppendElement(aMessage);
2672 : } else {
2673 0 : nsCOMPtr<nsIRunnable> runnable = aMessage;
2674 0 : mMainThreadEventTarget->Dispatch(runnable.forget());
2675 : }
2676 0 : }
2677 :
2678 : } // namespace mozilla
|