Задание
Продемонстрировать возможность параллельной обработки элементов коллекции (поиск минимума, максимума, вычисление среднего значения, отбор по критерию, статистическая обработка и т.д.).
Управление очередью задач (команд) реализовать с помощью шаблона Worker Thread.
Обеспечить диалоговый интерфейс с пользователем.
Разработать класс для тестирования функциональности приложения.
Использовать комментарии для автоматической генерации документации средствами javadoc.
2 Разработка программ
Реализуем классы, структура которых соответствует схеме п.2.1.2.
Разработаем класс MainTest для проведения теста разработанных классов –CommandQueue, MaxCommand, AvgCommand, MinMaxCommand. Реализуем методы:
setUpBeforeClass() – выполняется первым;
tearDownAfterClass() – выполняется последним;
testMax() – проверка основной функциональности класса MaxCommand;
testAvg() – проверка основной функциональности класса AvgCommand;
testMin() – проверка основной функциональности класса MinMaxCommand;
testMaxQueue() – проверка основной функциональности класса CommandQueue с задачей MaxCommand;
testAvgQueue() – проверка основной функциональности класса CommandQueue с задачей AvgCommand;
testMinQueue() – проверка основной функциональности класса CommandQueue с задачей MinMaxCommand.
В процессе разработки необходимо обеспечить прохождение всех тестов.
2.1 Использованные средства ООП
Для некоторых задач удобно организовать параллельное выполнение нескольких частей программы. Каждая из этих самостоятельных подзадач называется потоком (thread). Существует системный механизм, который обеспечивает совместное использованием процессора.
Модель потоков в языке Java является программным механизмом, упрощающим одновременное выполнение нескольких операций в одной и той же программе. Процессор периодически выделяет каждому потоку некоторый отрезок времени. Для каждого потока все выглядит так, словно процессор используется в монопольном режиме, но на самом деле время процессора разделяется между всеми существующими в программе потоками. Ускорение можно получить на многопроцессорном компьютере. При использовании потоков нет нужды учитывать эти тонкости – код не зависит от того, на скольких процессорах будет исполняться. Таким образом, потоки предоставляют механизм масштабирования производительности – если программа работает слишком медленно, можно добиться ускорения, используя многопроцессорную систему, при этом, не переписывая программу заново.
При программировании параллельно исполняемых потоков нужно учитывать следующие моменты:
- Программу можно разделить на несколько независимых задач.
- Необходимо заранее предусмотреть всевозможные проблемы, возникающие при завершении задач.
- Задачи, работающие с общими ресурсами, могут мешать друг другу. Основным средством предотвращения конфликтов является блокировка.
- В неаккуратно спроектированных многозадачных системах возможны взаимные блокировки.
Основные причины использования параллельного выполнения:
- управление несколькими подзадачами, одновременное выполнение которых позволяет эффективнее распоряжаться ресурсами вычислительной системы (включая возможность распределения этих задач по нескольким процессорам);
- улучшенная организация кода;
- удобство для пользователя.
2.2 Иерархия и структура классов
2.3 Описание программ
Для реализации параллельного выполнения при обработке коллекции объектов использовались шаблоны Worker Thread, Command и Factory Method.
Для хранения задач используется очередь, представленная интерфейсом Queue. Интерфейс Queue определяет два базовых метода: put и take. Эти методы используются для добавления в очередь и удаления из нее задач, представленных интерфейсом Command.
Класс CommandQueue реализует интерфейс Queue и создает обработчик потока, который выполняет объекты, реализующие интерфейс Command. Класс Worker, который является внутренним классом класса CommandQueue, реализует метод run, предназначенный для периодической проверки очереди на наличие в ней готовых к выполнению задач. Когда такая задача появляется, обработчик потока извлекает ее из очереди и запускает метод execute.
Для параллельного выполнения предназначены три класса, реализующие интерфейс Command:
MaxCommand – предназначен для обработки коллекции объектов и нахождения максимального значения.
MinMaxCommand – предназначен для обработки коллекции объектов, выполняет поиск минимального положительного и максимального отрицательного значения.
AvgCommand – предназначен для обработки коллекции объектов, вычисляет среднее арифметическое значение.
Класс ExecuteConsoleCommand реализует консольную команду "Execute all threads" (шаблон Command). При исполнении метода execute() создаются две очереди задач и выполняется ожидание завершения их обработки.
При написании исходного кода используем стиль комментариев документации javadoc.
Текст программы
1 AvgCommand.java
package ex05;
import java.util.concurrent.TimeUnit;
import ex02.ViewResult;
import ex04.Command;
/** Задача, используемая
* обработчиком потока;
* шаблон Worker Thread
* @author xone
* @version 1.0
* @see Command
* @see CommandQueue
*/
public class AvgCommand implements Command /*, Runnable */ {
/** Хранит результат обработки коллекции */
private double result = 0.0;
/** Флаг готовности результата */
private int progress = 0;
/** Обслуживает коллекцию объектов {@linkplain ex01.Item2d} */
private ViewResult viewResult;
/** Возвращает поле {@linkplain MaxCommand#viewResult}
* @return значение {@linkplain MaxCommand#viewResult}
*/
public ViewResult getViewResult() {
return viewResult;
}
/** Устанавливает поле {@linkplain MaxCommand#viewResult}
* @param viewResult значение для {@linkplain MaxCommand#viewResult}
* @return новое значение {@linkplain MaxCommand#viewResult}
*/
public ViewResult setViewResult(ViewResult viewResult) {
return this.viewResult = viewResult;
}
/** Инициализирует поле {@linkplain MaxCommand#viewResult}
* @param viewResult объект класса {@linkplain ViewResult}
*/
public AvgCommand(ViewResult viewResult) {
this.viewResult = viewResult;
}
/** Возвращает результат
* @return поле {@linkplain MaxCommand#result}
*/
public double getResult() {
return result;
}
/** Проверяет готовность результата
* @return false - если результат найден, иначе - true
* @see MaxCommand#result
*/
public boolean running() {
return progress < 100;
}
/** Используется обработчиком потока {@linkplain CommandQueue};
* шаблон Worker Thread
*/
@Override
public void execute() {
progress = 0;
System.out.println("Average executed...");
result = 0.0;
int idx = 1, size = 20;
result += 2;
progress = idx * 100 / size;
if (idx++ % (size / 2) == 0) {
System.out.println("Average " + progress + "%");
}
try {
TimeUnit.MILLISECONDS.sleep(2000 / size);
} catch (InterruptedException e) {
System.err.println(e);
}
result /= size;
System.out.println("Average done. Result = " + String.format("%.2f",result));
progress = 100;
}
/**
@Override
public void run() {
execute();
}
/**/
}
2 CommandQueue.java
package ex05;
import java.util.Vector;
import ex04.Command;
/** Создает обработчик
* потока, выполняющего
* объекты с интерфейсом
* Command; шаблон
* Worker Thread
* @author xone
* @version 1.0
* @see Command
*/
public class CommandQueue implements Queue {
/** Очередь задач */
private Vector<Command> tasks;
/** Флаг ожидания */
private boolean waiting;
/** Флаг завершения */
private boolean shutdown;
/** Устанавливает флаг завершения */
public void shutdown() {
shutdown = true;
}
/** Инициализация {@linkplain CommandQueue#tasks}
* {@linkplain CommandQueue#waiting}
* {@linkplain CommandQueue#waiting};
* создает поток для класса {@linkplain CommandQueue.Worker}
*/
public CommandQueue() {
tasks = new Vector<Command>();
waiting = false;
new Thread(new Worker()).start();
}
@Override
public void put(Command r) {
tasks.add(r);
if (waiting) {
synchronized (this) {
notifyAll();
}
}
}
@Override
public Command take() {
if (tasks.isEmpty()) {
synchronized (this) {
waiting = true;
try {
wait();
} catch (InterruptedException ie) {
waiting = false;
}
}
}
return (Command)tasks.remove(0);
}
/** Обслуживает очередь
* задач; шаблон
* Worker Thread
* @author xone
* @version 1.0
* @see Runnable
*/
private class Worker implements Runnable {
/** Извлекает из очереди
* готовые к выполнению
* задачи; шаблон
* Worker Thread */
public void run() {
while (!shutdown) {
Command r = take();
r.execute();
}
}
}
}
3 ExecuteConsoleCommand.java
package ex05;
/**
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**/
import java.util.concurrent.TimeUnit;
import ex02.View;
import ex02.ViewResult;
import ex04.ConsoleCommand;
/** Консольная команда
* Execute all threads;
* шаблон Command
* @author xone
* @version 1.0
*/
public class ExecuteConsoleCommand implements ConsoleCommand {
/** Объект, реализующий интерфейс {@linkplain View};
* обслуживает коллекцию объектов {@linkplain ex01.Item2d}
*/
private View view;
/** Возвращает поле {@linkplain ExecuteConsoleCommand#view}
* @return значение {@linkplain ExecuteConsoleCommand#view}
*/
public View getView() {
return view;
}
/** Устанавливает поле {@linkplain ExecuteConsoleCommand#view}
* @param view значение для {@linkplain ExecuteConsoleCommand#view}
* @return новое значение {@linkplain ExecuteConsoleCommand#view}
*/
public View setView(View view) {
return this.view = view;
}
/** Инициализирует поле {@linkplain ExecuteConsoleCommand#view}
* @param view объект, реализующий {@linkplain View}
*/
public ExecuteConsoleCommand(View view) {
this.view = view;
}
@Override
public char getKey() {
return 'e';
}
@Override
public String toString() {
return "'e'xecute";
}
@Override
public void execute() {
/**/
CommandQueue queue1 = new CommandQueue();
CommandQueue queue2 = new CommandQueue();
/**
ExecutorService exec1 = Executors.newSingleThreadExecutor();
ExecutorService exec2 = Executors.newSingleThreadExecutor();
/**/
MaxCommand maxCommand = new MaxCommand((ViewResult)view);
AvgCommand avgCommand = new AvgCommand((ViewResult)view);
MinMaxCommand minMaxCommand = new MinMaxCommand((ViewResult)view);
System.out.println("Execute all threads...");
/**
exec1.execute(minMaxCommand);
exec2.execute(maxCommand);
exec2.execute(avgCommand);
/**/
queue1.put(minMaxCommand);
queue2.put(maxCommand);
queue2.put(avgCommand);
/**/
try {
while (avgCommand.running() ||
maxCommand.running() ||
minMaxCommand.running()) {
TimeUnit.MILLISECONDS.sleep(100);
}
/**
exec1.shutdown();
exec2.shutdown();
/**/
queue1.shutdown();
queue2.shutdown();
/**/
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.err.println(e);
}
System.out.println("All done.");
}
}
4 Main.java
package ex05;
import ex02.View;
import ex02.ViewableResult;
import ex04.ChangeConsoleCommand;
import ex04.GenerateConsoleCommand;
import ex04.Menu;
import ex04.ViewConsoleCommand;
/** Вычисление и отображение
* результатов; содержит реализацию
* статического метода main()
* @author xone
* @version 5.0
* @see Main#main
*/
public class Main {
/** Объект, реализующий интерфейс {@linkplain View};
* обслуживает коллекцию объектов {@linkplain ex01.Item2d};
* инициализируется с помощью Factory Method
*/
private View view = new ViewableResult().getView();
/** Объект класса {@linkplain Menu};
* макрокоманда (шаблон Command)
*/
private Menu menu = new Menu();
/** Обработка команд пользователя */
public void run() {
menu.add(new ViewConsoleCommand(view));
menu.add(new GenerateConsoleCommand(view));
menu.add(new ChangeConsoleCommand(view));
menu.add(new ExecuteConsoleCommand(view));
menu.execute();
}
/** Выполняется при запуске программы
* @param args параметры запуска программы
*/
public static void main(String[] args) {
Main main = new Main();
main.run();
}
}
5 MaxCommand.java
package ex05;
import java.util.concurrent.TimeUnit;
import ex02.ViewResult;
import ex02.View;
import ex04.Command;
/** Задача, используемая
* обработчиком потока;
* шаблон Worker Thread
* @author xone
* @version 1.0
* @see Command
* @see CommandQueue
*/
public class MaxCommand implements Command /*, Runnable */ {
/** Хранит результат обработки коллекции */
private int result = -1;
/** Флаг готовности результата */
private int progress = 0;
/** Обслуживает коллекцию объектов {@linkplain ex01.Item2d} */
private View viewResult;
/** Возвращает поле {@linkplain MaxCommand#viewResult}
* @return значение {@linkplain MaxCommand#viewResult}
*/
public View getViewResult() {
return viewResult;
}
/** Устанавливает поле {@linkplain MaxCommand#viewResult}
* @param viewResult значение для {@linkplain MaxCommand#viewResult}
* @return новое значение {@linkplain MaxCommand#viewResult}
*/
public View setViewResult(View viewResult) {
return this.viewResult = viewResult;
}
/** Инициализирует поле {@linkplain MaxCommand#viewResult}
* @param viewResult объект класса {@linkplain ViewResult}
*/
public MaxCommand(View viewResult) {
this.viewResult = viewResult;
}
/** Возвращает результат
* @return поле {@linkplain MaxCommand#result}
*/
public int getResult() {
return result;
}
/** Проверяет готовность результата
* @return false - если результат найден, иначе - true
* @see MaxCommand#result
*/
public boolean running() {
return progress < 100;
}
/** Используется обработчиком потока {@linkplain CommandQueue};
* шаблон Worker Thread
*/
@Override
public void execute() {
progress = 0;
System.out.println("Max executed...");
int size = viewResult.getX();
int idx;
for (idx = 1; idx < size; idx++) {
result = idx;
}
progress = idx * 100 / size;
if (idx % (size / 3) == 0) {
System.out.println("Max " + progress + "%");
}
try {
TimeUnit.MILLISECONDS.sleep(3000 / size);
} catch (InterruptedException e) {
System.err.println(e);
}
}
System.out.println("Max done. Item #" + result + " found: " + viewResult.getItems().get(result));
progress = 100;
}
/**
@Override
public void run() {
execute();
}
/**/
}
6 MinMaxCommand.java
package ex05;
import java.util.concurrent.TimeUnit;
import ex02.Item2;
import ex02.ViewResult;
import ex02.View;
import ex04.Command;
/** Задача, используемая
* обработчиком потока;
* шаблон Worker Thread
* @author xone
* @version 1.0
* @see Command
* @see CommandQueue
*/
public class MinMaxCommand implements Command /*, Runnable */ {
/** Хранит результат обработки коллекции */
private int resultMin = -1;
/** Хранит результат обработки коллекции */
private int resultMax = -1;
/** Флаг готовности результата */
private int progress = 0;
/** Обслуживает коллекцию объектов {@linkplain ex01.Item2d} */
private View viewResult;
/** Возвращает поле {@linkplain MinMaxCommand#viewResult}
* @return значение {@linkplain MinMaxCommand#viewResult}
*/
public View getViewResult() {
return viewResult;
}
/** Устанавливает поле {@linkplain MinMaxCommand#viewResult}
* @param viewResult значение для {@linkplain MinMaxCommand#viewResult}
* @return новое значение {@linkplain MinMaxCommand#viewResult}
*/
public View setViewResult(ViewResult viewResult) {
return this.viewResult = viewResult;
}
/** Инициализирует поле {@linkplain MinMaxCommand#viewResult}
* @param viewResult объект класса {@linkplain ViewResult}
*/
public MinMaxCommand(ViewResult viewResult) {
this.viewResult = viewResult;
}
/** Возвращает результат
* @return поле {@linkplain MinMaxCommand#resultMin}
*/
public int getResultMin() {
return resultMin;
}
/** Возвращает результат
* @return поле {@linkplain MinMaxCommand#resultMax}
*/
public int getResultMax() {
return resultMax;
}
/** Проверяет готовность результата
* @return false - если результат найден, иначе - true
*/
public boolean running() {
return progress < 100;
}
/** Используется обработчиком потока {@linkplain CommandQueue};
* шаблон Worker Thread
*/
@Override
public void execute() {
progress = 0;
System.out.println("MinMax executed...");
int idx = 0, size = viewResult.getX();
idx++;
progress = idx * 100 / size;
if (idx % (size / 5) == 0) {
System.out.println("MinMax " + progress + "%");
}
try {
TimeUnit.MILLISECONDS.sleep(5000 / size);
} catch (InterruptedException e) {
System.err.println(e);
}
}
System.out.print("MinMax done. ");
if (resultMin > -1) {
System.out.print("Min positive #" + resultMin + " found: " +
String.format("%.2f.", viewResult.getX());
} else {
System.out.print("Min positive not found.");
}
if (resultMax > -1) {
System.out.println(" Max negative #" + resultMax + " found: " +
String.format("%.2f.", viewResult.getX()));
} else {
System.out.println(" Max negative item not found.");
}
progress = 100;
}
/**
@Override
public void run() {
execute();
}
/**/
}
7 Queue.java
package ex05;
import ex04.Command;
/** Представляет
* методы для помещения
* и извлечения задач
* обработчиком потока;
* шаблон Worker Thread
* @author xone
* @version 1.0
* @see Command
*/
public interface Queue {
/** Добавляет новую задачу в очередь;
* шаблон Worker Thread
* @param cmd задача
*/
void put(Command cmd);
/** Удаляет задачу из очереди;
* шаблон Worker Thread
* @return удаляемая задача
*/
Command take();
}
8 MainTest.java
package ex05;
import static org.junit.Assert.*;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import ex02.ViewResult;
/** Тестирование
* разработанных классов
* @author xone
* @version 5.0
* @see CommandQueue
* @see MaxCommand
* @see AvgCommand
* @see MinMaxCommand
*/
public class MainTest {
private final static int N = 1000;
private static ViewResult view = new ViewResult(N);
private static MaxCommand max1 = new MaxCommand(view);
private static MaxCommand max2 = new MaxCommand(view);
private static AvgCommand avg1 = new AvgCommand(view);
private static AvgCommand avg2 = new AvgCommand(view);
private static MinMaxCommand min1 = new MinMaxCommand(view);
private static MinMaxCommand min2 = new MinMaxCommand(view);
private CommandQueue queue = new CommandQueue();
/** Выполняется первым */
@BeforeClass
public static void setUpBeforeClass() {
view.viewInit();
assertEquals(N, view.getItems().size());
}
/** Выполняется последним */
@AfterClass
public static void tearDownAfterClass() {
assertEquals(max1.getResult(), max2.getResult());
assertEquals(avg1.getResult(), avg2.getResult(), .1e-10);
assertEquals(min1.getResultMax(), min2.getResultMax());
assertEquals(min1.getResultMin(), min2.getResultMin());
}
/** Проверка основной функциональности класса {@linkplain MaxCommand} */
@Test
public void testMax() {
max1.execute();
assertTrue( max1.getResult() > -1);
}
/** Проверка основной функциональности класса {@linkplain AvgCommand} */
@Test
public void testAvg() {
avg1.execute();
assertTrue( avg1.getResult() != 0.0);
}
/** Проверка основной функциональности класса {@linkplain MinMaxCommand} */
@Test
public void testMin() {
min1.execute();
assertTrue( min1.getResultMin() > -1);
assertTrue( min1.getResultMax() > -1);
}
/** Проверка основной функциональности класса
* {@linkplain CommandQueue} с задачей {@linkplain MaxCommand}
*/
@Test
public void testMaxQueue() {
queue.put(max2);
try {
while (max2.running()) {
TimeUnit.MILLISECONDS.sleep(100);
}
queue.shutdown();
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
fail(e.toString());
}
}
/** Проверка основной функциональности класса
* {@linkplain CommandQueue} с задачей {@linkplain AvgCommand}
*/
@Test
public void testAvgQueue() {
queue.put(avg2);
try {
while (avg2.running()) {
TimeUnit.MILLISECONDS.sleep(100);
}
queue.shutdown();
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
fail(e.toString());
}
}
/** Проверка основной функциональности класса
* {@linkplain CommandQueue} с задачей {@linkplain MinMaxCommand}
*/
@Test
public void testMinQueue() {
queue.put(min2);
try {
while (min2.running()) {
TimeUnit.MILLISECONDS.sleep(100);
}
queue.shutdown();
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
fail(e.toString());
}
}
}
Результат тестирования: