Using Reactive Extensions for Streaming Data from Database

You have probably heard about Reactive Extensions (Rx), a library from Microsoft that greatly simplifies working with asynchronous data streams and lets you query them with LINQ operators. There are many scenarios where using Rx results in much simpler and more flexible code. This post demonstrates how to use Reactive Extensions to load data from a database asynchronously in chunks.

The Problem

Recently I had to load data from a SQLite database into a grid view, but the query took a long time: there were hundreds of thousands of rows and the query was doing LIKE '%search term%' filtering. The customer wanted the query to run in the background and the data to load in chunks as they became available, similar to SQL Server Management Studio. This way the grid would show the first 200 rows as soon as they were available, then add the next 200 rows when they were loaded, and so on until the query completed. Here are some screenshots that demonstrate the desired functionality:

Screenshot: Initial search screen

Screenshot: Loaded 200 rows

Screenshot: Loaded 400 rows

Screenshot: Loaded 600 rows

Screenshot: Finished searching

Solution without Reactive Extensions

The most straightforward way to solve the problem is to call the method that reads the data on a background thread and send the data to the main thread whenever enough rows have accumulated. Here is how to do it in a WinForms application using BackgroundWorker:

using System;
using System.Collections.Generic;
using System.Data.SQLite;

internal class CompanyDal
{
    // connectionString is assumed to be a field initialized elsewhere (e.g. from app.config).
    public IEnumerable<Company> GetCompanies(string title)
    {
        using (var connection = new SQLiteConnection(connectionString))
        using (var cmd = connection.CreateCommand())
        {
            cmd.CommandText =
                "select company, category, state, phone from maindata where company like @name";
            cmd.Parameters.AddWithValue("@name", String.Format("%{0}%", title));

            connection.Open();

            // Rows are yielded one at a time, so callers can consume them before the query
            // finishes; the using blocks run when enumeration ends or the enumerator is disposed.
            using (var reader = cmd.ExecuteReader())
            {
                while (reader.Read())
                {
                    yield return new Company
                    {
                        Title = reader.GetString(0),
                        Category = reader.GetString(1),
                        State = reader.GetString(2),
                        Phone = reader.GetString(3)
                    };
                }
            }
        }
    }
}
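// In the form: worker is a BackgroundWorker with WorkerReportsProgress = true,
// wired to WorkerDoWork and WorkerProgressChanged.
private void WorkerDoWork(object sender, DoWorkEventArgs e)
{
    var data = new List<Company>();

    foreach (var company in companyDal.GetCompanies(e.Argument.ToString()))
    {
        data.Add(company);

        if (data.Count == BufferSize)
        {
            // Send a copy of the full chunk to the UI thread and start a new one.
            worker.ReportProgress(0, data.ToArray());
            data.Clear();
        }
    }

    // Report the rows left over after the last full chunk.
    if (data.Count > 0)
    {
        worker.ReportProgress(0, data.ToArray());
    }
}

private void WorkerProgressChanged(object sender, ProgressChangedEventArgs e)
{
    // Raised on the UI thread, so the grid can be updated directly.
    var data = e.UserState as IEnumerable<Company>;

    if (data != null)
    {
        // Preserve the scroll position while new rows are added.
        var index = Math.Max(companyGridView.FirstDisplayedScrollingRowIndex, 0);

        companies.AddRange(data);
        companyBindingSource.ResetBindings(false);
        progressLabel.Text = string.Format("Loaded {0} rows", companies.Count);

        companyGridView.FirstDisplayedScrollingRowIndex = index;
    }
}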
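The chunking loop in WorkerDoWork does not depend on WinForms or SQLite, so it can be pulled out and checked on its own. The sketch below does that with a hypothetical Batch helper and fake company names in place of the database query; it illustrates the same logic and is not code from the original application.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: yields the source in arrays of at most `size` items,
// like the loop in WorkerDoWork (full chunks first, then the leftover rows).
static IEnumerable<T[]> Batch<T>(IEnumerable<T> source, int size)
{
    var buffer = new List<T>(size);
    foreach (var item in source)
    {
        buffer.Add(item);
        if (buffer.Count == size)
        {
            yield return buffer.ToArray(); // copy, so clearing the buffer doesn't affect the caller
            buffer.Clear();
        }
    }

    if (buffer.Count > 0)
        yield return buffer.ToArray(); // final partial chunk, if any
}

// Stand-in for GetCompanies: 450 fake company names instead of database rows.
var rows = Enumerable.Range(1, 450).Select(i => $"Company {i}");
var chunks = Batch(rows, 200).ToList();

foreach (var chunk in chunks)
    Console.WriteLine($"Chunk of {chunk.Length} rows");
```

On .NET 6 and later, the built-in Enumerable.Chunk method (rows.Chunk(200)) produces the same arrays, including the smaller final one.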

The solution works, but it quickly becomes complicated if you want more control over when the data is loaded, or want to be able to stop loading. For example, if you want to load the next chunk every 2 seconds, or only when the user clicks a Load button, most of the code will deal with thread synchronization and it will be hard to see what the code is really doing. Let’s see how Reactive Extensions can help.

Reactive Extensions – Simplifying the Solution

To solve the problem with Rx, we first need to convert the source collection, which in our case is the sequence of matching companies, to an observable stream. We can do this with the ToObservable method. We want the query to run on a background thread, so we pass a thread pool scheduler as a parameter. Next we tell the stream that we want to get data in chunks by calling the Observable.Buffer method with the desired chunk size. As we want the observer to receive notifications on the GUI thread, we call the ObserveOn method and pass SynchronizationContext.Current. Finally, we subscribe to the observable to actually process the data:

private void SearchButtonClick(object sender, EventArgs e)
{
    companies.Clear();
    companyBindingSource.ResetBindings(false);

    stopButton.Enabled = true;
    searchButton.Enabled = false;
    progressLabel.Text = string.Format("Searching for {0} ...", searchBox.Text);

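    // Scheduler.ThreadPool is marked obsolete in later Rx versions;
    // ThreadPoolScheduler.Instance (or Scheduler.Default) replaces it there.
    companyDal.GetCompanies(searchBox.Text)
        .ToObservable(Scheduler.ThreadPool)          // run the query on a pool thread
        .Buffer(BufferSize)                          // group rows into chunks of BufferSize
        .ObserveOn(SynchronizationContext.Current)   // deliver chunks on the UI thread
        .Subscribe(loadedData =>
        {
            // Preserve the scroll position while new rows are added.
            var index = Math.Max(companyGridView.FirstDisplayedScrollingRowIndex, 0);

            companies.AddRange(loadedData);
            companyBindingSource.ResetBindings(false);
            progressLabel.Text = string.Format("Loaded {0} rows", companies.Count);

            companyGridView.FirstDisplayedScrollingRowIndex = index;
        },
        exception => { progressLabel.Text = exception.Message; },
        () => { progressLabel.Text = "Finished loading data"; });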
}
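To see what the Rx chain takes off our hands, here is a rough hand-written equivalent using only the base class library: the sequence is enumerated on a thread-pool thread via Task.Run, grouped into lists of at most `size` items, and pushed to onNext, onError and onCompleted callbacks. This is a simplified model, not how Rx implements these operators; the StreamInBatches and FailingQuery names are hypothetical, and there is no ObserveOn step, so in a WinForms app the callbacks would still have to be marshalled to the UI thread.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: a rough model of
// source.ToObservable(threadPoolScheduler).Buffer(size).Subscribe(onNext, onError, onCompleted).
static Task StreamInBatches<T>(
    IEnumerable<T> source, int size,
    Action<IList<T>> onNext, Action<Exception> onError, Action onCompleted)
{
    return Task.Run(() =>
    {
        var buffer = new List<T>(size);
        try
        {
            // The enumeration (i.e. the database query) runs on a thread-pool thread.
            foreach (var item in source)
            {
                buffer.Add(item);
                if (buffer.Count == size)
                {
                    onNext(buffer);
                    buffer = new List<T>(size); // fresh list: the consumer may keep the old one
                }
            }
        }
        catch (Exception ex)
        {
            // In this simplified model, exceptions from the source or from onNext end the stream.
            onError(ex);
            return;
        }

        if (buffer.Count > 0) onNext(buffer); // flush the partial last chunk
        onCompleted();
    });
}

// Hypothetical query that fails after producing one row.
static IEnumerable<int> FailingQuery()
{
    yield return 1;
    throw new InvalidOperationException("query failed");
}

var sizes = new List<int>();
var received = new List<int>();
var completed = false;

await StreamInBatches(Enumerable.Range(1, 450), 200,
    batch => { sizes.Add(batch.Count); received.AddRange(batch); },
    ex => Console.WriteLine($"Error: {ex.Message}"),
    () => completed = true);

Console.WriteLine($"Chunk sizes: {string.Join(", ", sizes)}; completed: {completed}");

string? error = null;
await StreamInBatches(FailingQuery(), 200, _ => { }, ex => error = ex.Message, () => { });
Console.WriteLine($"Error reported: {error}");
```

Even this stripped-down version needs its own buffering, error routing and completion handling, which is what ToObservable, Buffer and Subscribe provide in one line each.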

We now have a basic solution implemented in just a few lines of code. The important points are that there is practically no threading code and no need to implement partial loading manually, and, as we will see, the code above is easy to extend.

Extending Reactive Extensions Solution

First of all, we want to toggle the state of the Search and Stop buttons when the search completes. We could do that in the onError and onCompleted callbacks, but that would duplicate code. Instead we can use the Observable.Finally method, which executes in both cases. As we want the Finally callback to execute on the GUI thread too, we chain it after the ObserveOn call.

Secondly, we want the query to stop when the Stop button is clicked. The most straightforward way is to store the return value of Subscribe in a field and dispose the subscription when the Stop button is clicked, but as you have probably guessed, there is a ‘reactive’ way to do the same. In the form constructor we create an observable from the Stop button’s Click event using the Observable.FromEventPattern method. We then combine it with our data stream using the TakeUntil method, which tells the data stream to return records until the click-event observable produces a value:

public MainForm()
{
    InitializeComponent();

    companyBindingSource.DataSource = companies;
    stopClicked = Observable.FromEventPattern<EventArgs>(stopButton, "Click");
}

private void SearchButtonClick(object sender, EventArgs e)
{
    companies.Clear();
    companyBindingSource.ResetBindings(false);

    stopButton.Enabled = true;
    searchButton.Enabled = false;
    progressLabel.Text = string.Format("Searching for {0} ...", searchBox.Text);

    companyDal.GetCompanies(searchBox.Text)
        .ToObservable(Scheduler.ThreadPool)
        .Buffer(BufferSize)
        .TakeUntil(stopClicked)                      // complete as soon as Stop is clicked
        .ObserveOn(SynchronizationContext.Current)
        .Finally(() =>
        {
            // Runs once the stream completes or fails; ObserveOn puts it on the UI thread.
            searchButton.Enabled = true;
            stopButton.Enabled = false;
        })
        .Subscribe(loadedData =>
        {
            var index = Math.Max(companyGridView.FirstDisplayedScrollingRowIndex, 0);

            companies.AddRange(loadedData);
            companyBindingSource.ResetBindings(false);
            progressLabel.Text = string.Format("Loaded {0} rows", companies.Count);

            companyGridView.FirstDisplayedScrollingRowIndex = index;
        },
        exception => { progressLabel.Text = exception.Message; },
        () => { progressLabel.Text = "Finished loading data"; });
}
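Stopping early does not leak the SQLite connection because of how C# iterators work: when the enumerator returned by GetCompanies is disposed before the end, its pending using blocks run, closing the reader and the connection even though the while loop never finished. The sketch below shows that language behavior with a hypothetical Rows iterator whose finally block stands in for those using blocks. It assumes, rather than demonstrates, that Rx disposes the enumerator when TakeUntil ends the subscription.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var log = new List<string>();

// Hypothetical stand-in for GetCompanies: the finally block plays the role of the
// using blocks that close the reader and the connection.
IEnumerable<int> Rows(int count)
{
    log.Add("open");
    try
    {
        for (var i = 1; i <= count; i++)
        {
            log.Add($"read {i}");
            yield return i;
        }
    }
    finally
    {
        log.Add("close"); // runs on normal completion and on early disposal
    }
}

// Take only the first three rows of a "huge" result, as if Stop were clicked.
// Breaking out of foreach disposes the enumerator, which runs the finally block.
var taken = new List<int>();
foreach (var row in Rows(1_000_000))
{
    taken.Add(row);
    if (taken.Count == 3) break;
}

Console.WriteLine(string.Join(" | ", log));
```

Running it prints "open | read 1 | read 2 | read 3 | close": the remaining rows are never read, and the cleanup still happens.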

Finally, let’s look at how easily we can change the buffering logic. The Buffer method has many overloads, so if we want to buffer data based on time spans, we can simply use the overload that takes a TimeSpan as a parameter. We can also combine a buffer size with a time span so that a buffer is emitted when either condition is met. For a slightly more useful example, imagine that we want to load the next chunk of data when the user clicks a Load button. We can achieve this with the Buffer overload that takes a function returning another observable, which signals when to close the current buffer. As with the Stop button, we first create an observable from the Load button’s Click event and then combine our observables like this:

private void SearchButtonClick(object sender, EventArgs e)
{
    companies.Clear();
    companyBindingSource.ResetBindings(false);

    searchButton.Enabled = false;
    loadButton.Enabled = stopButton.Enabled = true;
    progressLabel.Text = string.Format("Searching for {0} ...", searchBox.Text);

    companyDal.GetCompanies(searchBox.Text)
        .ToObservable(Scheduler.ThreadPool)
        .Buffer(() => loadClicked)                   // emit a chunk whenever Load is clicked
        .TakeUntil(stopClicked)
        .ObserveOn(SynchronizationContext.Current)
        .Finally(() =>
        {
            searchButton.Enabled = true;
            loadButton.Enabled = stopButton.Enabled = false;
        })
        .Subscribe(loadedData =>
        {
            var index = Math.Max(companyGridView.FirstDisplayedScrollingRowIndex, 0);

            companies.AddRange(loadedData);
            companyBindingSource.ResetBindings(false);
            progressLabel.Text = string.Format("Loaded {0} rows", companies.Count);

            companyGridView.FirstDisplayedScrollingRowIndex = index;
        },
        exception => { progressLabel.Text = exception.Message; },
        () => { progressLabel.Text = "Finished loading data"; });
}
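In the Load-button version the query is not paused between clicks: ToObservable keeps reading rows on the thread pool, and Buffer holds them until loadClicked produces a value. As I understand Rx's Buffer, the rows still buffered when the query completes are delivered without another click. The sketch below is a simplified, deterministic model of that behavior, not Rx's implementation: it replays a hypothetical timeline in which each string is a row read by the query and null stands for a click on the Load button. For simplicity, the model skips empty chunks.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified model of Buffer(() => loadClicked): a string is a row read by the
// background query, null is a click on the (hypothetical) Load button.
static List<List<string>> BufferOnClicks(IEnumerable<string?> timeline)
{
    var result = new List<List<string>>();
    var current = new List<string>();

    foreach (var evt in timeline)
    {
        if (evt is not null)
        {
            current.Add(evt); // the query keeps reading rows between clicks
            continue;
        }

        // Load clicked: hand everything read so far to the grid (empty chunks skipped).
        if (current.Count > 0) result.Add(current);
        current = new List<string>();
    }

    // Query finished: the remaining rows are delivered without another click.
    if (current.Count > 0) result.Add(current);
    return result;
}

// Three rows, a click, two rows, two clicks in a row, then one last row.
var timeline = new[] { "Row 1", "Row 2", "Row 3", null, "Row 4", "Row 5", null, null, "Row 6" };
var chunks = BufferOnClicks(timeline);

for (var i = 0; i < chunks.Count; i++)
    Console.WriteLine($"Chunk {i + 1}: {string.Join(", ", chunks[i])}");
```

Because reading and delivery are decoupled, all rows end up in memory even if the user never clicks Load; the button only controls when they reach the grid.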

Conclusion

As we have seen, Reactive Extensions let us solve problems like this with very little code. Rx makes working with asynchronous streams simple and allows us to concentrate on business logic instead of managing threads manually. For more information about Rx and its internals, I highly recommend the Channel 9 videos.

Have you used Rx to solve real-life problems? Did it help you implement complex solutions in a concise way? Share your experience and thoughts in the comments.

  • Cooper_3

    can you share sample application?

  • http://twitter.com/sinclairinator Jeremy Sinclair

    In regards to using RX as opposed to the ASync CTP, which do you think is more efficient?

  • Walter Quesada

    I wonder if there would be any issues running it with PLINQ, something like … companyDal.GetCompanies(searchBox.Text).AsParallel().ToObservable(Scheduler.ThreadPool)… is the ToObservable extension method even available on IParallelEnumerable? Just my initial thoughts, would be interesting to find out.

    • http://www.aboutmycode.com/ Giorgi Dalakishvili

      I don’t think it makes much sense to use rx with PLINQ in this situation as PLINQ does not execute sql queries in parallel. PLINQ is for using linq syntax to parallelize some work.

      • Walter Quesada

        Yes, you are correct that in this situation, this particular example, it doesn’t make sense but if we were to actually do some work in each iteration, adding parallelism definitely makes sense. I was just curious as to how rx would react working with plinq. I actually just downloaded your example, tied it to EF and added AsParallel() and it ran fine. Next I think I’ll throw in some loops to emulate workers and run some benchmarks… Just having some fun here…

  • Andre van Delft

    The reactive extension seems to be an improvement, but for me the control flow is still unclear in the text. I don’t like callbacks that much, and I find the statement from line 18 to 37 far long. IMHO Edsger Dijkstra would say that the static structure of the code (the text) does not match the dynamic structure (the execution) well.

    I would suggest a totally different approach, using the Algebra of Communicating Processes (ACP).  In short, ACP treats sequences, choices and parallelism in the same simple way as normal programming languages treat sequences. Likewise, input events are treated on the same level as internal actions and output actions. It is a bit like BNF and YACC, extended with parallelism and without the restriction to text input.

    I am working on an ACP DSL for Scala. Two weeks ago I presented a research paper on this at Scala Days 2012. The main example I gave is a “Search” application that matches closely this “Streaming Data from Database” example. 

    The video of my talk is here:
    http://skillsmatter.com/podcast/scala/subscript-extending-scala-with-the-algebra-of-communicating-processes
    The slides and the paper are here:
    http://code.google.com/p/subscript/downloads/list

    The ACP DSL is still not ready for real world application development; the algebraic expression syntax is not yet supported, but an internal Scala DSL will give a good impression to programming language enthusiasts. After this Scala version I would like to support Java, JavaScript and C# as base languages. I am looking for participants in my Google Code project, so please let me know if you are interested.

  • https://twitter.com/memark Magnus Markling

    This line
    .Buffer(() => loadClicked).TakeUntil(stopClicked)
    is really fantastic. It’ll be my take-away from this article!
