Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.

Simple Blocking Queue for Thread Communication and Inter-thread Invocation

4.98/5 (20 votes)
23 Jan 2011CPOL4 min read 82.7K  
A generic class to be used as a conveyor for data and tasks between threads

I want to provide this advice on thread communication, because I already gave several answers on the use of threads to CodeProject members. I post my Answers to Questions found in Question & Answers section of the site; and usually my answers are accepted very well. The only problem is that many questions cause me to reproduce nearly the same ideas over and over. I need to place some code and explanation in a permanent place and refer to it. At the same time, the techniques I offer may not fit the format of whole article. So, I think the form of Tips/Tricks will serve my purpose well.

Here is the most typical situation. A developer creates some UI, most typically with System.Windows.Forms; WFP is less usual at this time. Some UI action requires long time processing; this is where the handling of the UI event does not work well. In this case, some developers don't know what to do, and some use threads, but try to create a thread repeatedly, using Background worker, created a regular threads or a thread from thread pool. In simple cases this is enough, but in massive processing and more complex logics the problem is the total number of threads that becomes unpredictable. Of course, the extra cost of repeated creation and later abandoning of a thread can easily cause a performance hit, and more importantly, a problem of clean-up (and generally, of support).

 

In many such cases, I advice to create a fixed number of threads and permanently keep them running, posting some task to such thread when needed. What is relatively difficult to explain is how to keep a thread in a wait state (wasting zero CPU time) between tasks. It also takes some nerve to explain why spin wait is wrong.

 

Another problem which seems to be different is a desire to implement behavior similar to System.Windows.Forms.Control.Invoke or System.Windows.Threading.Dispatcher. There is a popular misconception about this class, caused by the method System.Windows.Threading.Dispatcher.GetCurrectDispatcher. It always works, but the Invoke method can only be called on the target thread if this thread is designed in a special way. In practice, such inter-thread processing really takes place if the target thread if the specially designed UI thread created in the UI cycle of Window.Form.Application or WPF Application. The problem is that the class Dispatcher is sealed. So, the question is how to provide similar functionality a custom thread.

 

The two groups of problems depicted above can be resolved using some queue encapsulated in the thread used as a target of the invocation. First, I made a simple blocking queue class to provide both data transport and synchronization between threads:

 

C#
using System;
using System.Threading;
using System.Collections.Generic;

public class DataQueue<ITEM> {

    public DataQueue() { this.IsBlocking = true; }
    public DataQueue(bool blocking) { this.IsBlocking = blocking; }

    public EventWaitHandle WaitHandle { get { return FWaitHandle; } }

    public void Close() {
        this.FWaitHandle.Set();
    } //Close

    public void Submit(ITEM item) {
        lock (LockObject) 
            Queue.Enqueue(item);
        FWaitHandle.Set();
    } //Submit

    public ITEM GetMessage() {
        if (IsBlocking)
            FWaitHandle.WaitOne();
        ITEM result = default(ITEM);
        lock (LockObject) {
            if (Queue.Count > 0)
                result = Queue.Dequeue();
            if (IsBlocking && (Queue.Count < 1))
                FWaitHandle.Reset();
            return result;
        } //lock
    } //GetMessage

    public void Clear() {
        lock (LockObject)
            Clear(Queue);
    } //Clear
    public void Clear(Func<ITEM, bool> removalCriterion) {
        lock (LockObject)
            Clear(Queue, removalCriterion);
    } //Clear

    #region implementation
    
    static void Clear(Queue<ITEM> queue) { queue.Clear(); }
    static void Clear(Queue<ITEM> queue, Func<ITEM, bool> removalCriterion) {
        if (removalCriterion == null) {
            queue.Clear();
            return;
        } //if
        Queue<ITEM> copy = new Queue<ITEM>();
        while (queue.Count > 0) {
            ITEM item = queue.Dequeue();
            if (!removalCriterion(item))
                copy.Enqueue(item);
        } //loop
        while (copy.Count > 0)
            queue.Enqueue(copy.Dequeue());
    } //Clear

    bool IsBlocking;
    Queue<ITEM> Queue = new Queue<ITEM>();
    
    //SA!!! important ManualReset.
    //See GetMessage for re-setting
    EventWaitHandle FWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
    Object LockObject = new object();

    #endregion implementation

} //DataQueue

 

 

 

 

I recommend using blocking operation nearly in all cases, otherwise the mechanism would behave like spin wait, causing unwanted abuse of the CPU.

 

 

 

Here is the simple sample of the target thread and the usage, a Console application:

 

 

 

C#
using System;
using System.Threading;

class QueuedThread {
    internal QueuedThread() { this.Thread = new Thread(Body); }
    internal void Start() { this.Thread.Start(); }
    internal void Abort() { //be extra careful!
        this.Queue.Close();
        this.Thread.Abort();
    } //Abort
    internal void PostMessage(string message) {
        this.Queue.Submit(message);
    } //void PostMessage
    #region implementaion
    void Body() {
        try {
            while (true) {
                string message = Queue.GetMessage();
                //safe way to exit, not alway possible
                if (message == null)
                    break;
                ProcessMessage(message);
            } //loop
        } catch (ThreadAbortException) {
            //Very important in case Abort is used
            //Provide last-will processing here
            //even if Abort is not a designed option
        } catch (Exception e) { //always needed
            Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
        } //exception
    } //Body
    void ProcessMessage(string message) {
        Console.WriteLine("message Processed {0}", message);
    } //ProcessMessage
    Thread Thread;
    DataQueue<string> Queue = new DataQueue<string>();
    #endregion implementaion
} //class QueuedThread

class Program {

    void Run() {
        QueuedThread t = new QueuedThread();
        t.Start();
        t.PostMessage("first");
        Thread.Sleep(400);
        t.PostMessage("second");
        Thread.Sleep(400);
        t.PostMessage("third");
        Console.WriteLine();
        Console.WriteLine("Finished. Press any key...");
        Console.ReadKey(true);
        //safest way of exiting of the thread,
        //unfortunatelly, not applicable to 100% cases:
        t.PostMessage(null);
        //less safe way, need extra care in thread implementation,
        //safe enough for this sample though:
        //t.Abort();
    } //Run

    static void Main(string[] args) {
        new Program().Run();
    } //Main

} //class Program

 

 

I don't want to go deeper into the relatively complex issue related to acceptance of Abort method (see sample code). Sometimes, such discussions lead to flame wars. The final decision should be made by the developer who wants to use my advice (and, generally, any developer using threads). Anyway, when a data queue is used, Abort could easily be avoided using a special “termination” message (like an empty string in my simple example above).

 

 

Finally, one can also use delegates to implement processing tasks instead just of data elements:

 

The above may appear to be not flexible enough. The thread needs to know what to do with the data item; hence ProcessMessage is just one fixed method. Here is when inter-thread invocation similar to Dispatcher should come into play.

 

So, how about the Delegate invocation? Well, nothing prevents adding a delegate to the ITEM type. This is a just bit more complex. The first thing to understand is that a parameter (or parameters) for delegate invocation should be supplied as well. This is a minimalistic sample, again a Console application:

 

C#
using System;
using System.Threading;

class DelegateInvocationThread {

    class ActionWithParameter<PARAMETER> {
        internal ActionWithParameter(Action<PARAMETER> action, PARAMETER parameterValue) {
            this.Action = action;
            this.Parameter = parameterValue;
        } //ActionWithParameter
        internal static ActionWithParameter<PARAMETER> ExitAction { get { return FExitAction; } }
        internal void Invoke() {
            if (Action != null)
                Action(Parameter);
        } //Invoke
        internal bool ExitCondition { get { return Action == null || Parameter == null; } }
        Action<PARAMETER> Action;
        PARAMETER Parameter;
        static ActionWithParameter<PARAMETER> FExitAction =
            new ActionWithParameter<PARAMETER>(null, default(PARAMETER));
    } //ActionWithParameter

    internal DelegateInvocationThread() { this.Thread = new Thread(Body); }
    internal void Start() { this.Thread.Start(); }
    internal void Invoke(Action<string> action, string parameter) {
        Queue.Submit(new ActionWithParameter<string>(action, parameter));
    } //Invoke
    internal void InvokeExit() {
        Queue.Submit(ActionWithParameter<string>.ExitAction);
    } //InvokeExit()
    internal void Abort() { //be extra careful!
        this.Queue.Close();
        this.Thread.Abort();
    } //Abort
    
    #region implementaion
    void Body() {
        try {
            while (true) {
                ActionWithParameter<string> action = Queue.GetMessage();
                //safe way to exit, not alway possible
                if (action.ExitCondition)
                    break;
                action.Invoke();
            } //loop
        } catch (ThreadAbortException) {
            //Very important in case Abort is used
            //Provide last-will processing here
            //even if Abort is not a designed option
        } catch (Exception e) { //always needed
            Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
        } //exception
    } //Body
    void ProcessMessage(string message) {
        Console.WriteLine("message Processed {0}", message);
    } //ProcessMessage
    Thread Thread;
    DataQueue<ActionWithParameter<string>> Queue = 
        new DataQueue<ActionWithParameter<string>>();
    #endregion implementaion
} //class DelegateInvocationThread

class Program {

    void Run() {
        DelegateInvocationThread t = new DelegateInvocationThread();
        t.Start();
        t.Invoke(
            delegate(string value) {
                Console.WriteLine(string.Format("Method #1, parameter: {0}", value));
            }, "first");
        Thread.Sleep(400);
        t.Invoke(
            delegate(string value) {
                Console.WriteLine(string.Format("Method #2, parameter: {0}", value));
            }, "second");
        Thread.Sleep(400);
        t.Invoke(
            delegate(string value) {
                Console.WriteLine(string.Format("Method #3, parameter: {0}", value));
            }, "third");
        Console.WriteLine();
        Console.WriteLine("Finished. Press any key...");
        Console.ReadKey(true);
        //safest way of exiting of the thread,
        //unfortunatelly, not applicable to 100% cases:
        t.InvokeExit();
        //less safe way, need extra care in thread implementation,
        //safe enough for this sample though:
        //t.Abort();
    } //Run

    static void Main(string[] args) {
        new Program().Run();
    } //Main

} //class Program

I've shown anonymous delegate syntax which can be compiled with C# v.2 or later. With C# v.3 lambda form of anonymous delegate can also be used:

 

 

C#
t.Invoke(
    new Action<string>((value) => {
        Console.WriteLine(string.Format("Method #3, parameter: {0}", value));
    }), "third");

 

 

This idea could further be used to combine more than one queue in the same target thread. The warning is: it is not possible to use more than one instance of DataQueue in the same thread: they will block each other, if blocking operation is used. The modifications should go inside the class DataQueue. At the same time, such generalization is easy using the techniques I offer.

Thank you,

 

 

— Sergey A Kryukov

 

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)