Line data Source code
1 : /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 : /* vim: set ts=2 et sw=2 tw=80: */
3 : /* This Source Code Form is subject to the terms of the Mozilla Public
4 : * License, v. 2.0. If a copy of the MPL was not distributed with this file,
5 : * You can obtain one at http://mozilla.org/MPL/2.0/. */
6 :
7 : // Original author: ekr@rtfm.com
8 : #include <deque>
9 :
10 : #include "logging.h"
11 : #include "runnable_utils.h"
12 : #include "transportflow.h"
13 : #include "transportlayer.h"
14 :
15 : namespace mozilla {
16 :
17 0 : MOZ_MTLOG_MODULE("mtransport")
18 :
19 0 : NS_IMPL_ISUPPORTS0(TransportFlow)
20 :
21 : // There are some hacks here to allow destruction off of
22 : // the main thread.
23 0 : TransportFlow::~TransportFlow() {
24 : // Make sure that if we are off the right thread, we have
25 : // no more attached signals.
26 0 : if (!CheckThreadInt()) {
27 0 : MOZ_ASSERT(SignalStateChange.is_empty());
28 0 : MOZ_ASSERT(SignalPacketReceived.is_empty());
29 : }
30 :
31 : // Push the destruction onto the STS thread. Note that there
32 : // is still some possibility that someone is accessing this
33 : // object simultaneously, but as long as smart pointer discipline
34 : // is maintained, it shouldn't be possible to access and
35 : // destroy it simultaneously. The conversion to an nsAutoPtr
36 : // ensures automatic destruction of the queue at exit of
37 : // DestroyFinal.
38 0 : nsAutoPtr<std::deque<TransportLayer*>> layers_tmp(layers_.release());
39 0 : RUN_ON_THREAD(target_,
40 0 : WrapRunnableNM(&TransportFlow::DestroyFinal, layers_tmp),
41 0 : NS_DISPATCH_NORMAL);
42 0 : }
43 :
44 0 : void TransportFlow::DestroyFinal(nsAutoPtr<std::deque<TransportLayer *> > layers) {
45 0 : ClearLayers(layers.get());
46 0 : }
47 :
48 0 : void TransportFlow::ClearLayers(std::queue<TransportLayer *>* layers) {
49 0 : while (!layers->empty()) {
50 0 : delete layers->front();
51 0 : layers->pop();
52 : }
53 0 : }
54 :
55 0 : void TransportFlow::ClearLayers(std::deque<TransportLayer *>* layers) {
56 0 : while (!layers->empty()) {
57 0 : delete layers->front();
58 0 : layers->pop_front();
59 : }
60 0 : }
61 :
62 0 : nsresult TransportFlow::PushLayer(TransportLayer *layer) {
63 0 : CheckThread();
64 0 : UniquePtr<TransportLayer> layer_tmp(layer); // Destroy on failure.
65 :
66 : // Don't allow pushes once we are in error state.
67 0 : if (state_ == TransportLayer::TS_ERROR) {
68 0 : MOZ_MTLOG(ML_ERROR, id_ + ": Can't call PushLayer in error state for flow");
69 0 : return NS_ERROR_FAILURE;
70 : }
71 :
72 0 : nsresult rv = layer->Init();
73 0 : if (!NS_SUCCEEDED(rv)) {
74 : // Destroy the rest of the flow, because it's no longer in an acceptable
75 : // state.
76 0 : ClearLayers(layers_.get());
77 :
78 : // Set ourselves to have failed.
79 0 : MOZ_MTLOG(ML_ERROR, id_ << ": Layer initialization failed; invalidating");
80 0 : StateChangeInt(TransportLayer::TS_ERROR);
81 :
82 0 : return rv;
83 : }
84 0 : EnsureSameThread(layer);
85 :
86 0 : TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front();
87 :
88 : // Re-target my signals to the new layer
89 0 : if (old_layer) {
90 0 : old_layer->SignalStateChange.disconnect(this);
91 0 : old_layer->SignalPacketReceived.disconnect(this);
92 : }
93 0 : layers_->push_front(layer_tmp.release());
94 0 : layer->Inserted(this, old_layer);
95 :
96 0 : layer->SignalStateChange.connect(this, &TransportFlow::StateChange);
97 0 : layer->SignalPacketReceived.connect(this, &TransportFlow::PacketReceived);
98 0 : StateChangeInt(layer->state());
99 :
100 0 : return NS_OK;
101 : }
102 :
103 : // This is all-or-nothing.
104 0 : nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > layers) {
105 0 : CheckThread();
106 :
107 0 : MOZ_ASSERT(!layers->empty());
108 0 : if (layers->empty()) {
109 0 : MOZ_MTLOG(ML_ERROR, id_ << ": Can't call PushLayers with empty layers");
110 0 : return NS_ERROR_INVALID_ARG;
111 : }
112 :
113 : // Don't allow pushes once we are in error state.
114 0 : if (state_ == TransportLayer::TS_ERROR) {
115 0 : MOZ_MTLOG(ML_ERROR,
116 : id_ << ": Can't call PushLayers in error state for flow ");
117 0 : ClearLayers(layers.get());
118 0 : return NS_ERROR_FAILURE;
119 : }
120 :
121 0 : nsresult rv = NS_OK;
122 :
123 : // Disconnect all the old signals.
124 0 : disconnect_all();
125 :
126 : TransportLayer *layer;
127 :
128 0 : while (!layers->empty()) {
129 0 : TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front();
130 0 : layer = layers->front();
131 :
132 0 : rv = layer->Init();
133 0 : if (NS_FAILED(rv)) {
134 0 : MOZ_MTLOG(ML_ERROR,
135 : id_ << ": Layer initialization failed; invalidating flow ");
136 0 : break;
137 : }
138 :
139 0 : EnsureSameThread(layer);
140 :
141 : // Push the layer onto the queue.
142 0 : layers_->push_front(layer);
143 0 : layers->pop();
144 0 : layer->Inserted(this, old_layer);
145 : }
146 :
147 0 : if (NS_FAILED(rv)) {
148 : // Destroy any layers we could not push.
149 0 : ClearLayers(layers.get());
150 :
151 : // Now destroy the rest of the flow, because it's no longer
152 : // in an acceptable state.
153 0 : ClearLayers(layers_.get());
154 :
155 : // Set ourselves to have failed.
156 0 : StateChangeInt(TransportLayer::TS_ERROR);
157 :
158 : // Return failure.
159 0 : return rv;
160 : }
161 :
162 : // Finally, attach ourselves to the top layer.
163 0 : layer->SignalStateChange.connect(this, &TransportFlow::StateChange);
164 0 : layer->SignalPacketReceived.connect(this, &TransportFlow::PacketReceived);
165 0 : StateChangeInt(layer->state()); // Signals if the state changes.
166 :
167 0 : return NS_OK;
168 : }
169 :
170 0 : TransportLayer *TransportFlow::top() const {
171 0 : CheckThread();
172 :
173 0 : return layers_->empty() ? nullptr : layers_->front();
174 : }
175 :
176 0 : TransportLayer *TransportFlow::GetLayer(const std::string& id) const {
177 0 : CheckThread();
178 :
179 0 : for (std::deque<TransportLayer *>::const_iterator it = layers_->begin();
180 0 : it != layers_->end(); ++it) {
181 0 : if ((*it)->id() == id)
182 0 : return *it;
183 : }
184 :
185 0 : return nullptr;
186 : }
187 :
188 0 : TransportLayer::State TransportFlow::state() {
189 0 : CheckThread();
190 :
191 0 : return state_;
192 : }
193 :
194 0 : TransportResult TransportFlow::SendPacket(const unsigned char *data,
195 : size_t len) {
196 0 : CheckThread();
197 :
198 0 : if (state_ != TransportLayer::TS_OPEN) {
199 0 : return TE_ERROR;
200 : }
201 0 : return top() ? top()->SendPacket(data, len) : TE_ERROR;
202 : }
203 :
204 0 : bool TransportFlow::Contains(TransportLayer *layer) const {
205 0 : if (layers_) {
206 0 : for (auto& l : *layers_) {
207 0 : if (l == layer) {
208 0 : return true;
209 : }
210 : }
211 : }
212 0 : return false;
213 : }
214 :
215 0 : void TransportFlow::EnsureSameThread(TransportLayer *layer) {
216 : // Enforce that if any of the layers have a thread binding,
217 : // they all have the same binding.
218 0 : if (target_) {
219 0 : const nsCOMPtr<nsIEventTarget>& lthread = layer->GetThread();
220 :
221 0 : if (lthread && (lthread != target_))
222 0 : MOZ_CRASH();
223 : }
224 : else {
225 0 : target_ = layer->GetThread();
226 : }
227 0 : }
228 :
229 0 : void TransportFlow::StateChangeInt(TransportLayer::State state) {
230 0 : CheckThread();
231 :
232 0 : if (state == state_) {
233 0 : return;
234 : }
235 :
236 0 : state_ = state;
237 0 : SignalStateChange(this, state_);
238 : }
239 :
240 0 : void TransportFlow::StateChange(TransportLayer *layer,
241 : TransportLayer::State state) {
242 0 : CheckThread();
243 :
244 0 : StateChangeInt(state);
245 0 : }
246 :
247 0 : void TransportFlow::PacketReceived(TransportLayer* layer,
248 : const unsigned char *data,
249 : size_t len) {
250 0 : CheckThread();
251 :
252 0 : SignalPacketReceived(this, data, len);
253 0 : }
254 :
255 : } // close namespace
|