Using Reactive Extensions for Streaming Data from a Database

You have probably heard about Reactive Extensions (Rx), a library from Microsoft that greatly simplifies working with asynchronous data streams and allows you to query them with LINQ operators. In many scenarios, using Rx results in much simpler and more flexible code. This post demonstrates how to use Reactive Extensions to load data from a database asynchronously in chunks.

The Problem

Recently I had to load data from a SQLite database into a grid view, but the query was taking a long time because the table had hundreds of thousands of rows and the query was doing LIKE '%search term%' filtering. The customer wanted the query to run in the background and the data to be loaded in chunks as they became available, similar to SQL Server Management Studio. This way the grid would show the first 200 rows as soon as they were available, then add the next 200 rows when they were loaded, and so on until the query completed. Here are some screenshots that demonstrate the desired functionality:

Solution without Reactive Extensions

The most straightforward way to solve the problem is to call the method that reads the data on a background thread and send the rows to the main thread whenever enough of them have accumulated. Here is how to do it in a WinForms application using BackgroundWorker:

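using System;
using System.Collections.Generic;
using System.Data.SQLite; // System.Data.SQLite ADO.NET provider

internal class Company
{
    public string Title { get; set; }
    public string Category { get; set; }
    public string State { get; set; }
    public string Phone { get; set; }
}

internal class CompanyDal
{
    private readonly string connectionString;

    // connectionString: the SQLite connection string, supplied by the caller.
    public CompanyDal(string connectionString)
    {
        this.connectionString = connectionString;
    }

    // Streams matching companies lazily: each row is yielded as soon as
    // the reader produces it, so callers can start consuming early.
    public IEnumerable<Company> GetCompanies(string title)
    {
        using (var connection = new SQLiteConnection(connectionString))
        using (var cmd = connection.CreateCommand())
        {
            cmd.CommandText =
                "select company, category, state, phone from maindata where company like @name";
            cmd.Parameters.AddWithValue("@name", String.Format("%{0}%", title));

            connection.Open();

            using (var reader = cmd.ExecuteReader())
            {
                while (reader.Read())
                {
                    yield return new Company
                    {
                        Title = reader.GetString(0),
                        Category = reader.GetString(1),
                        State = reader.GetString(2),
                        Phone = reader.GetString(3)
                    };
                }
            }
        }
    }
}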
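
private void WorkerDoWork(object sender, DoWorkEventArgs e)
{
    // Runs on a background thread: read rows and hand them over in chunks.
    var data = new List<Company>();

    foreach (var company in companyDal.GetCompanies(e.Argument.ToString()))
    {
        data.Add(company);

        if (data.Count == BufferSize)
        {
            // Pass a copy so the list can be reused for the next chunk.
            worker.ReportProgress(0, data.ToArray());
            data.Clear();
        }
    }

    // Send whatever is left over after the last full chunk.
    if (data.Count > 0)
    {
        worker.ReportProgress(0, data.ToArray());
    }
}

private void WorkerProgressChanged(object sender, ProgressChangedEventArgs e)
{
    // Raised on the UI thread when RunWorkerAsync was called from it,
    // so the grid can be updated directly.
    var data = e.UserState as IEnumerable<Company>;

    if (data != null)
    {
        // Remember the scroll position so that rebinding does not jump to the top.
        var index = Math.Max(companyGridView.FirstDisplayedScrollingRowIndex, 0);

        companies.AddRange(data);
        companyBindingSource.ResetBindings(false);
        progressLabel.Text = string.Format("Loaded {0} rows", companies.Count);

        companyGridView.FirstDisplayedScrollingRowIndex = index;
    }
}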
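The chunking loop in WorkerDoWork does not depend on BackgroundWorker, so it can be pulled out and tested on its own. Here is a minimal sketch of that idea; ChunkingSketch and Batch are hypothetical names, not part of the original project:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ChunkingSketch
{
    // Yields consecutive chunks of at most `size` items. The source is read
    // lazily, so each chunk is available as soon as its last item is read,
    // and no empty trailing chunk is produced.
    public static IEnumerable<T[]> Batch<T>(IEnumerable<T> source, int size)
    {
        if (size <= 0) throw new ArgumentOutOfRangeException(nameof(size));

        var buffer = new List<T>(size);
        foreach (var item in source)
        {
            buffer.Add(item);
            if (buffer.Count == size)
            {
                yield return buffer.ToArray();
                buffer.Clear();
            }
        }

        // Emit the remainder only if there is one.
        if (buffer.Count > 0)
        {
            yield return buffer.ToArray();
        }
    }

    public static void Main()
    {
        var sizes = Batch(Enumerable.Range(1, 450), 200).Select(chunk => chunk.Length);
        Console.WriteLine(string.Join(", ", sizes)); // 200, 200, 50
    }
}
```

With such a helper, WorkerDoWork would reduce to a foreach over Batch(companyDal.GetCompanies(e.Argument.ToString()), BufferSize) that calls worker.ReportProgress for each chunk. On .NET 6 and later, the built-in Enumerable.Chunk method provides similar behavior.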

The solution works, but it can easily become complicated if you want more control over when the data is loaded or want to be able to stop loading it. For example, if you want to load the next chunk every 2 seconds or when the user clicks a load button, most of the code will be dealing with thread synchronization and it will be difficult to understand what the code is really doing. Let’s see how Reactive Extensions can help.
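For comparison, here is a rough sketch of what adding a stop feature to the non-Rx version might involve. It assumes a CancellationToken as the stop signal (BackgroundWorker.CancellationPending could play the same role), and ManualStopSketch and LoadInChunks are hypothetical names. The check has to be woven into the loop by hand, and the UI side (button state, marshaling chunks to the grid) would still need to be written separately:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class ManualStopSketch
{
    // Reads `source` in chunks of `size` and passes each chunk to `report`,
    // checking the token before every item so a stop request ends the loop.
    // Returns true if all data was read, false if it was stopped early.
    public static bool LoadInChunks<T>(IEnumerable<T> source, int size,
        Action<T[]> report, CancellationToken token)
    {
        var data = new List<T>(size);
        foreach (var item in source)
        {
            if (token.IsCancellationRequested)
            {
                return false; // the partially filled chunk is discarded
            }

            data.Add(item);
            if (data.Count == size)
            {
                report(data.ToArray());
                data.Clear();
            }
        }

        if (data.Count > 0)
        {
            report(data.ToArray());
        }
        return true;
    }

    public static void Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            int chunks = 0;
            // Simulate a user clicking "stop" right after the first chunk arrives.
            bool finished = LoadInChunks(Enumerable.Range(1, 1000), 200,
                chunk => { chunks++; cts.Cancel(); }, cts.Token);
            Console.WriteLine($"finished={finished}, chunks={chunks}"); // finished=False, chunks=1
        }
    }
}
```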

Reactive Extensions – Simplifying the Solution

To solve the problem with Rx, we first need to convert the source collection, which in our case is the sequence of matching companies, to an observable stream. We can do that with the ToObservable method. We want the query to run on a background thread, so we pass a thread pool scheduler as a parameter. Next, we need to tell the stream that we want to get the data in chunks. We do that by calling the Observable.Buffer method and passing the desired chunk size. As we want the observer to receive notifications on the GUI thread, we call the ObserveOn method and pass SynchronizationContext.Current. Finally, we subscribe to the observable to actually process the data:

private void SearchButtonClick(object sender, EventArgs e)
{
    companies.Clear();
    companyBindingSource.ResetBindings(false);

    stopButton.Enabled = true;
    searchButton.Enabled = false;
    progressLabel.Text = string.Format("Searching for {0} ...", searchBox.Text);

    companyDal.GetCompanies(searchBox.Text)
        .ToObservable(ThreadPoolScheduler.Instance)    // run the query on a pool thread
        .Buffer(BufferSize)                            // group rows into chunks
        .ObserveOn(SynchronizationContext.Current)     // deliver chunks on the UI thread
        .Subscribe(loadedData =>
        {
            var index = Math.Max(companyGridView.FirstDisplayedScrollingRowIndex, 0);

            companies.AddRange(loadedData);
            companyBindingSource.ResetBindings(false);
            progressLabel.Text = string.Format("Loaded {0} rows", companies.Count);

            companyGridView.FirstDisplayedScrollingRowIndex = index;
        },
        exception => { progressLabel.Text = exception.Message; },
        () => { progressLabel.Text = "Finished loading data"; });
}
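To look at ToObservable, Buffer and Subscribe in isolation, here is a console sketch. It assumes the System.Reactive NuGet package and uses ThreadPoolScheduler.Instance as the thread pool scheduler; Rows is a hypothetical stand-in for GetCompanies. ObserveOn is left out because a console application has no UI synchronization context, so the callbacks simply run on pool threads, and a ManualResetEventSlim keeps the program waiting until the stream terminates:

```csharp
// Requires the System.Reactive NuGet package.
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class RxChunkingSketch
{
    // Stand-in for CompanyDal.GetCompanies: a lazily produced sequence of rows.
    static IEnumerable<int> Rows(int count)
    {
        for (int i = 1; i <= count; i++)
        {
            yield return i;
        }
    }

    // Returns the size of every chunk the subscriber receives.
    public static List<int> ChunkSizes(int rowCount, int bufferSize)
    {
        var sizes = new List<int>();
        var done = new ManualResetEventSlim(false);
        Exception error = null;

        using (Rows(rowCount)
            .ToObservable(ThreadPoolScheduler.Instance) // enumerate on a pool thread
            .Buffer(bufferSize)                         // emit IList<int> chunks
            .Subscribe(
                chunk => sizes.Add(chunk.Count),        // onNext: once per chunk
                ex => { error = ex; done.Set(); },      // onError
                () => done.Set()))                      // onCompleted
        {
            done.Wait(); // block until the stream terminates
        }

        if (error != null) throw error;
        return sizes;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", ChunkSizes(450, 200))); // 200, 200, 50
    }
}
```

Buffer hands the subscriber the final, partially filled chunk when the source completes, which is why the last chunk holds 50 rows.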

We now have a basic solution implemented in just a few lines of code. The important points are that there is practically no threading code, there is no need to implement partial loading manually, and, as we will see, the code above is easy to extend.

Extending Reactive Extensions Solution

First of all, we want to restore the state of the search and stop buttons when the search is completed. We could do that in the onError and onCompleted callbacks, but that would result in duplicated code. Instead, we can use the Observable.Finally method, which executes in both cases. As we want the Finally callback to execute on the GUI thread too, we append it after the ObserveOn call.
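The following console sketch, again assuming the System.Reactive package, shows that a single Finally action covers both outcomes: it runs exactly once per subscription, whether the source completes or fails. FinallySketch and Run are hypothetical names, and Observable.Range and Observable.Throw stand in for the database query:

```csharp
// Requires the System.Reactive NuGet package.
using System;
using System.Reactive.Linq;

public static class FinallySketch
{
    // Subscribes to `source` through Finally and reports how many times the
    // Finally action ran and which terminal callback was invoked.
    public static (int finallyCalls, string terminal) Run(IObservable<int> source)
    {
        int finallyCalls = 0;
        string terminal = null;

        source
            .Finally(() => finallyCalls++) // cleanup shared by both outcomes
            .Subscribe(
                _ => { },
                ex => terminal = "error: " + ex.Message,
                () => terminal = "completed");

        return (finallyCalls, terminal);
    }

    public static void Main()
    {
        Console.WriteLine(Run(Observable.Range(1, 3)));   // (1, completed)
        Console.WriteLine(Run(Observable.Throw<int>(
            new InvalidOperationException("query failed")))); // (1, error: query failed)
    }
}
```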

Secondly, we want the query to stop when the stop button is clicked. The most straightforward way is to store the return value of the Subscribe method in a field and dispose of the subscription when the stop button is clicked, but, as you have probably guessed, there is a ‘reactive’ way to do the same. In the form constructor, we create an observable from the stop button's Click event by using the Observable.FromEventPattern method. We then combine it with our data stream using the TakeUntil method, which tells the data stream to return records until the click event observable produces a value:

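private readonly IObservable<EventPattern<EventArgs>> stopClicked;

public MainForm()
{
    InitializeComponent();

    companyBindingSource.DataSource = companies;
    // Turn the stop button's Click event into an observable sequence.
    stopClicked = Observable.FromEventPattern<EventArgs>(stopButton, "Click");
}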

private void SearchButtonClick(object sender, EventArgs e)
{
    companies.Clear();
    companyBindingSource.ResetBindings(false);

    stopButton.Enabled = true;
    searchButton.Enabled = false;
    progressLabel.Text = string.Format("Searching for {0} ...", searchBox.Text);

    companyDal.GetCompanies(searchBox.Text)
        .ToObservable(ThreadPoolScheduler.Instance)
        .Buffer(BufferSize)
        .TakeUntil(stopClicked)                        // stop when the stop button is clicked
        .ObserveOn(SynchronizationContext.Current)
        .Finally(() =>                                 // runs after completion, error or stop
        {
            searchButton.Enabled = true;
            stopButton.Enabled = false;
        })
        .Subscribe(loadedData =>
        {
            var index = Math.Max(companyGridView.FirstDisplayedScrollingRowIndex, 0);

            companies.AddRange(loadedData);
            companyBindingSource.ResetBindings(false);
            progressLabel.Text = string.Format("Loaded {0} rows", companies.Count);

            companyGridView.FirstDisplayedScrollingRowIndex = index;
        },
        exception => { progressLabel.Text = exception.Message; },
        () => { progressLabel.Text = "Finished loading data"; });
}
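To see what TakeUntil does to the buffered stream, here is a sketch (assuming the System.Reactive package) in which two Subject instances stand in for the query results and for the stop button's Click event; TakeUntilSketch is a hypothetical name. Everything runs synchronously on one thread, so the order of events is easy to follow:

```csharp
// Requires the System.Reactive NuGet package.
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class TakeUntilSketch
{
    public static (List<IList<int>> chunks, bool completed) Run()
    {
        var rows = new Subject<int>();        // stands in for the query results
        var stopClicks = new Subject<Unit>(); // stands in for the stop button's Click event
        var chunks = new List<IList<int>>();
        bool completed = false;

        using (rows.Buffer(2)
                   .TakeUntil(stopClicks)
                   .Subscribe(chunk => chunks.Add(chunk), () => completed = true))
        {
            rows.OnNext(1);
            rows.OnNext(2);                  // completes the chunk [1, 2]
            rows.OnNext(3);                  // sits in a partially filled chunk
            stopClicks.OnNext(Unit.Default); // "click": TakeUntil completes the stream
            rows.OnNext(4);                  // ignored, the subscription is gone
        }

        return (chunks, completed);
    }

    public static void Main()
    {
        var (chunks, completed) = Run();
        var first = string.Join(", ", chunks[0]);
        Console.WriteLine($"chunks={chunks.Count}, first=[{first}], completed={completed}");
        // chunks=1, first=[1, 2], completed=True
    }
}
```

Row 3 never reaches the subscriber: when TakeUntil completes the stream it unsubscribes from Buffer, so a partially filled chunk is discarded. In the form above, this means rows already read into the current chunk are not shown after a stop click; placing TakeUntil before Buffer should make Buffer flush that partial chunk on completion instead.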

Finally, let’s have a look at how easily we can change the buffering logic. The Buffer method has many overloads, so if we want to buffer data based on time spans, we can simply use the overload that takes a TimeSpan instance as a parameter. We can also combine a buffer size with a time span so that a buffer is emitted when either condition is met. For a slightly more useful example, imagine that we want to load the next chunk of data when the user clicks a load button. We can achieve that with the Buffer overload that takes a function returning another observable, which signals when to close the current buffer and start the next one. As with the stop button, we first create an observable from the load button's Click event and then combine our observables like this:

private void SearchButtonClick(object sender, EventArgs e)
{
    companies.Clear();
    companyBindingSource.ResetBindings(false);

    searchButton.Enabled = false;
    loadButton.Enabled = stopButton.Enabled = true;
    progressLabel.Text = string.Format("Searching for {0} ...", searchBox.Text);

    companyDal.GetCompanies(searchBox.Text)
        .ToObservable(ThreadPoolScheduler.Instance)
        .Buffer(() => loadClicked)                     // emit a chunk on every load click
        .TakeUntil(stopClicked)
        .ObserveOn(SynchronizationContext.Current)
        .Finally(() =>
        {
            searchButton.Enabled = true;
            loadButton.Enabled = stopButton.Enabled = false;
        })
        .Subscribe(loadedData =>
        {
            var index = Math.Max(companyGridView.FirstDisplayedScrollingRowIndex, 0);

            companies.AddRange(loadedData);
            companyBindingSource.ResetBindings(false);
            progressLabel.Text = string.Format("Loaded {0} rows", companies.Count);

            companyGridView.FirstDisplayedScrollingRowIndex = index;
        },
        exception => { progressLabel.Text = exception.Message; },
        () => { progressLabel.Text = "Finished loading data"; });
}
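The load-button variant can be sketched the same way, assuming the System.Reactive package. Two Subject instances stand in for the query results and for loadClicked, which in the form would presumably be created with Observable.FromEventPattern just like stopClicked; LoadOnClickSketch is a hypothetical name:

```csharp
// Requires the System.Reactive NuGet package.
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class LoadOnClickSketch
{
    public static List<IList<int>> Run()
    {
        var rows = new Subject<int>();        // stands in for the query results
        var loadClicks = new Subject<Unit>(); // stands in for the load button's Click event
        var chunks = new List<IList<int>>();

        // Each time loadClicks fires, the rows collected so far are emitted
        // as one chunk and a new chunk is started.
        using (rows.Buffer(() => loadClicks).Subscribe(chunk => chunks.Add(chunk)))
        {
            rows.OnNext(1);
            rows.OnNext(2);
            rows.OnNext(3);
            loadClicks.OnNext(Unit.Default); // "click": emits [1, 2, 3]
            rows.OnNext(4);
            rows.OnCompleted();              // query finished: emits the rest, [4]
        }

        return chunks;
    }

    public static void Main()
    {
        // Prints "1, 2, 3" and then "4".
        foreach (var chunk in Run())
        {
            Console.WriteLine(string.Join(", ", chunk));
        }
    }
}
```

When the source completes, Buffer emits whatever has accumulated since the last click, so the final rows are delivered even if the user never clicks load again.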

Conclusion

As we can see, Reactive Extensions enable us to solve problems like this one with very little code. They make working with asynchronous streams simple and allow us to concentrate on business logic instead of managing threads manually. For more information about Rx and its internals, I highly recommend the Channel 9 videos.

Have you used Rx to solve real-life problems? Did it help you implement complex solutions in a concise way? Share your experience and thoughts about it in the comments.

Giorgi Dalakishvili
World-Class Software Engineer
