четверг, 19 сентября 2013 г.

Что делать, если запуск потоков зависит от событий, которые не реализуют уведомления?

Сегодня будет небольшой пример на многопоточность, в ответ на вопрос с форумов MSDN.
Задача состоит в том, что есть внешний источник информации, который говорит нам сколько потоков запускать, также там есть информация о том, готово ли все для запуска или нет. Если таким источником данных является MS SQL сервер, то узнать о том, что там поменялась информация мы можем только обратившись за ней сами. Все, перехожу к примеру.

У этой задачи есть два способа решения, про первый спрашивается в вопросе. Т.е. мы запускаем все потоки и они ждут, когда будет все готово, для продолжения выполнения.
Для эмуляции данных считываемых из базы данных, я буду пользоваться вот таким классом:

class MyThread
{
    public string Name { get; set; }
 
    public bool IsReady { get; set; }
}

Для эмуляции чтения из БД, я заведу вот такой класс:

class DbEmulator
{
    int _readCount = 0;
 
    List<MyThread> _data = null;
 
    public DbEmulator()
    {
        _readCount = 0;
        _data = new List<MyThread>();
        _data.Add(new MyThread() { Name = "A", IsReady = false });
        _data.Add(new MyThread() { Name = "B", IsReady = false });
        _data.Add(new MyThread() { Name = "C", IsReady = false });
    }
 
    public IEnumerable<MyThread> Read()
    {
        if (_readCount < 3)
        {
            _data[_readCount].IsReady = true;
            _readCount++;
        }
        return _data;
    }
}

Как видно, при первых трех чтения, соответствующий элемент данных будет менять статус на Ready, при последующих чтениях всегда будет возвращаться что все готовы.
Все, можно переходить к примеру. Я буду писать консольное приложение.
Для эмуляции метода с полезной работой я воспользуюсь вот таким кодом:
 
static void Work(string p_name)
{
    Console.WriteLine(p_name);
}

Для хранения информации о запущенных потоках, я воспользуюсь вот таким классом:

class ThreadInfo
{
    public Thread Thread { get; set; }
 
    public AutoResetEvent StartEvent { get; set; }
 
    public bool IsStarted { get; set; }
}
 
 

Все, переходим к реализации. При запуске программы, считываем имеющиеся данные из БД и создаем по потоку на каждую считанную единицу данных, также, для реализации ожидания создаем AutoResetEvent и в зависимости от считанного свойства IsReady ставим ему статус. Ну и запускаем отдельный поток, который будет раз в пять секунд считывать данные из БД и информировать нас о том, что он там считал. Если некий флаг IsReady меняется на true, то вызываем на соответствующем AutoResetEvent-е метод Set:

static void Main(string[] args)
{
    List<ThreadInfo> threads = new List<ThreadInfo>();
    DbEmulator context = new DbEmulator();
    var data = context.Read(); // Первое чтение, только один в статусе IsReady
    foreach (var item in data)
    {
        AutoResetEvent autoResetEvent = new AutoResetEvent(item.IsReady);
        string name = item.Name;
        ThreadInfo threadInfo = new ThreadInfo()
        {                   
            Thread = new Thread(new ThreadStart(() => Run(Work, name, autoResetEvent))),
            StartEvent = autoResetEvent,
            IsStarted = item.IsReady
        };
        threadInfo.Thread.Start(); // Запускаем поток
        threads.Add(threadInfo);
    }
    // Запускаем поток, для последовательного чтения данных
    Thread checker = new Thread(new ThreadStart(() =>
        {
            while (threads.Any(t => !t.IsStarted))
            {
                Thread.Sleep(5000);
                var newData = context.Read();
                Console.Write("Статусы: ");
                foreach (var d in newData)
                {
                    Console.Write("{0} ", d.IsReady);
                }
                Console.WriteLine();
                // Проверка статусов
                for (int i = 0; i < newData.Count(); i++)
                {
                    lock (threads[i])
                    {
                        if (newData.ElementAt(i).IsReady != threads[i].IsStarted && newData.ElementAt(i).IsReady)
                        {
                            // Ок, пора запускать очередной поток
                            threads[i].StartEvent.Set();
                            threads[i].IsStarted = true;
                        }
                    }
                }
            }
        }));
    checker.Start();
    Console.ReadKey();
}

Все. Последнее что нам осталось, это использованный в этом фрагменте кода метод Run, я его специально сделал универсальным, т.е. мы можем в него передавать любые методы которые нам необходимо запустить, например, будем выбирать метод на основе считанной из БД информации:

static void Run(Action<string> p_method, string p_param, AutoResetEvent p_startEvent)
{
    p_startEvent.WaitOne();
    p_method(p_param);
}
Запускаем и видим:
Через 5 секунд:

И еще через 5:
Все. При первом чтении у нас запустился только поток A, при втором, в связи с изменением статуса B, при третьем C.
А теперь проанонсированное второе решение. Мне для него не понадобятся ни ThreadInfo, ни AutoResetEvent, ни вспомогательный метод Run. Почему? Да потому, что я буду запускать потоки по мере прихода нужных мне статусов из БД:

static void Main1(string[] args)
{
    List<ThreadInfo> threads = new List<ThreadInfo>();
    DbEmulator context = new DbEmulator();
    var data = context.Read(); // Первое чтение, только один в статусе IsReady
    foreach (var item in data)
    {
        AutoResetEvent autoResetEvent = new AutoResetEvent(item.IsReady);
        string name = item.Name;
        ThreadInfo threadInfo = new ThreadInfo()
        {                   
            Thread = new Thread(new ThreadStart(() => Run(Work, name, autoResetEvent))),
            StartEvent = autoResetEvent,
            IsStarted = item.IsReady
        };
        threadInfo.Thread.Start(); // Запускаем поток
        threads.Add(threadInfo);
    }
    // Запускаем поток, для последовательного чтения данных
    Thread checker = new Thread(new ThreadStart(() =>
        {
            while (threads.Any(t => !t.IsStarted))
            {
                Thread.Sleep(5000);
                var newData = context.Read();
                Console.Write("Статусы: ");
                foreach (var d in newData)
                {
                    Console.Write("{0} ", d.IsReady);
                }
                Console.WriteLine();
                // Проверка статусов
                for (int i = 0; i < newData.Count(); i++)
                {
                    lock (threads[i])
                    {
                        if (newData.ElementAt(i).IsReady != threads[i].IsStarted && newData.ElementAt(i).IsReady)
                        {
                            // Ок, пора запускать очередной поток
                            threads[i].StartEvent.Set();
                            threads[i].IsStarted = true;
                        }
                    }
                }
            }
        }));
    checker.Start();
    Console.ReadKey();
}

Все. Рабоатет аналогично пердыдущему примеру, но на мой взгляд значительно проще.

Комментариев нет:

Отправить комментарий