The flatten operator currently calls onCompleted as soon as the
parent signals that it has completed. However, there could be
more data incoming from the returned observables; in fact, the
flattened observable might even be infinite.
Track how many subscriptions are still active, and only call
onCompleted at the time of the last completion.
Basically all tests are rewritten here. Implementation details should
not be checked because tests are becoming fragile when it comes to any
refactoring.
What instead should be tested is the actual **public behavior** of the
library when using its public interfaces.
While doing all these changes the following assumptions about this codebase
have been made:
- class fields which are prefixed with underscore (like `observer._onNext`)
are intended to be private and thus should only be accessed by members of
the same class or its subclasses
- `observer.stopped` is a public **readonly** field and should never be
modified by the outside world, because its role is considered to be
purely informational
If the above assumptions are not correct, this will need to be discussed
further. These improvements of the tests are necessary to introduce any
changes which include refactoring of the `Observer` in order to fix some
bugs.
Fixed wrong test codes with the assertion.
Changed the behaviors of the following functions caused by argument types, to raise an error in the creation phase.
- Observable.defer
- Observable:buffer
- Observable:elementAt
- Observable:skipLast
- Observable:takeLast
- Observable:window
The merge operator removed the source observable if it complete.
After that the subscriptions after that source observable index can not unsubscribe.
Fix the issue by recording the completed source index, as combineLatest operator does.
Currently, if a function is passed to Observable.fromCoroutine, a
coroutine is created and it is shared among all Observers. Instead,
use a fresh coroutine for each Observer so all Observers receive the
same values.
When creating an Observable using an existing coroutine, this isn't
possible because the Observable is stateful (calling coroutine.resume
won't always yield the same values).