Wednesday, 18 September 2013

Is TakeWhile(...) and etc. extension methods thread safe in Rx 1.0?

Is TakeWhile(...) and etc. extension methods thread safe in Rx 1.0?

I have an event source which fired by a Network I/O very frequently, based
on underlying design, of course the event was always on different thread
each time, now I wrapped this event via Rx with:
Observable.FromEventPattern(...), now I'm using the TakeWhile(predict) to
filter some special event data. At now, I have some concerns on its thread
safety, the TakeWhile(predict) works as a hit and mute, but in concurrent
situation, can it still be guaranteed? because I guess the underlying
implementation could be(I can't read the source code since it's too
complicated...):
public static IObservable<TSource> TakeWhile<TSource>(this
IObservable<TSource> source, Func<TSource, bool> predict)
{
ISubject<TSource> takeUntilObservable = new
TempObservable<TSource>();
IDisposable dps = null;
// 0 for takeUntilObservable still active, 1 for predict failed,
diposed and OnCompleted already send.
int state = 0;
dps = source.Subscribe(
(s) =>
{
/* NOTE here the 'hit and mute' still not thread safe,
one thread may enter 'else' and under CompareExchange,
but meantime another thread may passed the predict(...)
and calling OnNext(...)
* so the CompareExchange here mainly for avoid multiple
time call OnCompleted() and Dispose();
*/
if (predict(s) && state == 0)
{
takeUntilObservable.OnNext(s);
}
else
{
// !=0 means already disposed and OnCompleted send,
avoid multiple times called via parallel threads.
if (0 == Interlocked.CompareExchange(ref state, 1, 0))
{
try
{
takeUntilObservable.OnCompleted();
}
finally
{
dps.Dispose();
}
}
}
},
() =>
{
try
{
takeUntilObservable.OnCompleted();
}
finally { dps.Dispose(); }
},
(ex) => { takeUntilObservable.OnError(ex); });
return takeUntilObservable;
}
That TempObservable is just a simple implementation of ISubject.
If my guess reasonable, then seems the thread safety can't be guaranteed,
means some unexpected event data may still incoming to OnNext(...) because
that 'mute' is still on going. Then I write a simple testing to verify,
but out of expectation, the results are all positive:
public class MultipleTheadEventSource
{
public event EventHandler OnSthNew;
int cocurrentCount = 1000;
public void Start()
{
for (int i = 0; i < this.cocurrentCount; i++)
{
int j = i;
ThreadPool.QueueUserWorkItem((state) =>
{
var safe = this.OnSthNew;
if (safe != null)
safe(j, null);
});
}
}
}
[TestMethod()]
public void MultipleTheadEventSourceTest()
{
int loopTimes = 10;
int onCompletedCalledTimes = 0;
for (int i = 0; i < loopTimes; i++)
{
MultipleTheadEventSource eventSim = new
MultipleTheadEventSource();
var host = Observable.FromEventPattern(eventSim, "OnSthNew");
host.TakeWhile(p => { return int.Parse(p.Sender.ToString()) <
110; }).Subscribe((nxt) =>
{
//try print the unexpected values, BUT I Never saw it
happened!!!
if (int.Parse(nxt.Sender.ToString()) >= 110)
{
this.testContextInstance.WriteLine(nxt.Sender.ToString());
}
}, () => { Interlocked.Increment(ref onCompletedCalledTimes); });
eventSim.Start();
}
// simply wait everything done.
Thread.Sleep(60000);
this.testContextInstance.WriteLine("onCompletedCalledTimes: " +
onCompletedCalledTimes);
}
before I do the testing, some friends here suggest me try to use
Synchronize<TSource> or ObserveOn to make it thread safe, so any idea on
my proceeding thoughts and why the issue not reproduced?

No comments:

Post a Comment