W poprzednich częściach ([RX 1], [RX 2], [RX 3], [RX 4], [RX 5]) artykułu o Reactive Extensions dla .NET pojawiło się już dość sporo teorii, ale raczej mało przykładów, w tym wpisie chciałbym się skupić właśnie na przykładach bazujących na zdobytej uprzednio wiedzy.
Zacznijmy od zastanowienia się nad prostymi subskrypcjami. W przykładzie z wpisu pt.: "[RX 3] Reactive Extensions pierwszy kontakt z kodem", zasubskrybowaliśmy obiekt typu IObservable (funkcją Subscribe) i jako parametr przekazane zostało Console.WriteLine, czyli delegat wypisujący otrzymywane znaki na okno konsoli. Takie podejście mogło wywołać pewne zdezorientowanie, gdyż zgodnie z postem pt. "[RX 2] Kolekcje, to podstawa, czyli wprowadzenia do Reactive Extensions część 2" (w którym opisany był wzorzec obserwator) argumentem dla Subscribe powinna być instancja obiektu dziedziczącego po interfejsie IObserver. Przecież taka jest definicja Subscribe dla IObservable. RX pozwala jednak na pewne uproszczenia, otóż subskrybując można przekazać do Subscribe również delegat do obsługi OnNext lub nawet wszystkie trzy delegaty: OnNext, OnError, OnCompleted. Dzięki takiemu podejściu nie musimy sami tworzyć IObserver, a zostanie to zrobione "za nas", na podstawie przekazanych delegat, np.:
obs.Subscribe( x => Console.WriteLine( "Next Value: {0}", x ), x => Console.WriteLine( "Error, message: {0}", x.Message ), () => Console.WriteLine( "Koniec" ) )
Oczywiście można przekazać konkretny obiekt typu IObserver. np. dla poniższej definicji obserwatora:
class MyConsoleObserver<T>: IObserver<T> { void IObserver<T>.OnCompleted() { Console.ForegroundColor = ConsoleColor.Yellow; Console.WriteLine( "Koniec" ); Console.ResetColor(); } void IObserver<T>.OnError( Exception error ) { Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine( "Error, message: {0}",error.Message ); Console.ResetColor(); } void IObserver<T>.OnNext( T value ) { Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine( "Next value: {0}", value ); Console.ResetColor(); } }
możemy napisać:
obs.Subscribe(new MyConsoleObserver<int>())
Co dla obserwowane kolekcji utworzonej przez:
IObservable<int> obs = Observable.Generate( 0, i => i < 10, i => i + 1, i => i, i => TimeSpan.FromMilliseconds( i * 100 ) );
da w wyniku:
Podobne doświadczenia można zrobić podstawiając w miejsce definicji obserwowanej kolekcji:
-
Observable.Empty<int>() - w ramach wyniku otrzymamy „Koniec”;
-
Observable.Return(777) - w ramach wyniku otrzymamy „Next Value: 777” i „Koniec”;
-
Observable.Thow<int>(new Exception("It is an error.")) – w ramach wyniku otrzymamy: „Error, message: It is an error.”;
-
Observable.Never<int>() - nic nie otrzymamy;
-
Observable.Range(0,2) – w ramach wyniku otrzymamy: „Next value: 0”, „Next value: 1”, „Koniec”.
Dużo ciekawszym może być jednak własnoręczne utworzenie IObservable, np.:
IObservable<int> obs = Observable.Create<int>( observer => { observer.OnNext( 77 ); observer.OnCompleted(); return ( () => { } ); } );
W powyższym przykładzie utworzyliśmy obserwowany element, który zwraca liczbę 77 i następnie koniec. W tym celu do metody Create przekazaliśmy delegację, która otrzymuje argument typu IObserver, wykonuje na nim odpowiednie (wymagane według naszych potrzeb) operacje i zwraca funkcję, która zostanie wykonana w momencie wykonania Dispose.
Można w ten sposób poeksperymentować z różnymi możliwościami RX. Zróbmy dla przykładu IObservable zwracające błąd, np.:
IObservable<int> obs = Observable.Create<int>( observer => { for ( int i = 0; i < 10; i++ ) { if ( i > 5 ) observer.OnError( new Exception( "Element greater than 5 is wrong" ) ); observer.OnNext( i ); } observer.OnCompleted(); return ( () => { } ); } );
co w wyniku subskrypcji:
using ( obs.Subscribe( new MyConsoleObserver<int>() ) ) { Console.WriteLine( "Hit Enter to finish" ); Console.ReadLine(); }
da nam:
Next value: 0 Next value: 1 Next value: 2 Next value: 3 Next value: 4 Next value: 5 Error, message: Element greater than 5 is wrong Hit Enter to finish
..., ale RX pozwala obsługiwać wyjątki i dzięki zmianie subskrypcji:
obs.Catch(Observable.Return(0)).Subscribe( new MyConsoleObserver<int>() )
otrzymamy:
Next value: 0 Next value: 1 Next value: 2 Next value: 3 Next value: 4 Next value: 5 Next value: 0 Koniec Hit Enter to finish
Tak naprawdę Catch przyjmuje jako argument IObservable (w tym przykładzie było to Observable.Return(0)), które zostanie zwrócone po wystąpieniu błędu). Można taki mechanizm wykorzystać, np. do przełączenia z jednego obserwowanego źródła na inne (np. czytamy dane z jednej lokalizacji sieciowej, występuje błąd i przełączamy się na drugą). Wspomnianą obsługę błędów można jeszcze rozszerzyć o mechanizm powtarzania po błędzie, np. powtarzamy 3 razy i jeżeli nadal wystąpi błąd, to zwracamy informacje:
obs.Retry(3).Catch( Observable.Throw<int>(new Exception("I have tried 3 times. I give up.")) ).Subscribe( new MyConsoleObserver<int>() )
Co da w wyniku:
Next value: 0 Next value: 1 Next value: 2 Next value: 3 Next value: 4 Next value: 5 Next value: 0 Next value: 1 Next value: 2 Next value: 3 Next value: 4 Next value: 5 Next value: 0 Next value: 1 Next value: 2 Next value: 3 Next value: 4 Next value: 5 Error, message: I have tried 3 times. I give up. Hit Enter to finish
RX oferuje również bardzo wygodą obsługę timeout'ów, np. spójrzmy na poniższy przykład:
IObservable<int> obs = Observable.Create<int>( observer => { for ( int i = 0; i < 10; i++ ) { System.Threading.Thread.Sleep( i * 100 ); observer.OnNext( i ); } observer.OnCompleted(); return ( () => { } ); } ); using ( obs.Timeout(TimeSpan.FromMilliseconds(500)).Subscribe( new MyConsoleObserver<int>() ) ) { Console.WriteLine( "Hit Enter to finish" ); Console.ReadLine(); }
Po którego wykonaniu, otrzymujemy:
Next value: 0 Next value: 1 Next value: 2 Next value: 3 Next value: 4 Next value: 5 Error, message: The operation has timed out. Hit Enter to finish
Mam nadzieję, że Ci wszyscy, którzy szukali do tej pory jakichś konkretnych przykładów, są tym razem usatysfakcjonowani. Zachęcam do dalszych samodzielnych eksperymentów oraz do czytania kolejnych części artykułów już wkrótce ...
Brak komentarzy:
Prześlij komentarz