Библиотека Java SE5java.utiLconcurrent также содержит явный механизм управления мьютексами, определенный в java.util.concurrent.locks. Объект Lock можно явно создать в программе, установить или снять блокировку; правда, полученный код будет менее элегантным, чем при использовании встроенной формы. С другой стороны, он обладает большей гибкостью при решении некоторых типов задач. Вот как выглядит пример SynchronizedEvenGenerator.java с явным использованием объектов Lock:
//: concurrency/MutexEvenGenerator.java
// Предотвращение потоковых конфликтов с использованием мьютексов.
// {RunByHand}
import java.util.concurrent.locks.*;
public class MutexEvenGenerator extends IntGenerator {
private int currentEvenValue = 0;
private Lock lock = new ReentrantLock();
public int next() {
lock.lock();
try {
++currentEvenValue;
Thread.yield(); // Ускоряем сбой
++currentEvenValue;
return currentEvenValue;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
EvenChecker.test(new MutexEvenGenerator());
}
}
MutexEvenGenerator добавляет мьютекс с именем lock и использует методы lock() и unlock() для создания критической секции в next(). При использовании объектов Lock следует применять идиому, показанную в примере: сразу же за вызовом lock() необходимо разместить конструкцию try-finally, при этом в секцию finally включается вызов unlock() — только так можно гарантировать снятие блокировки.
Хотя try-finally требует большего объема кода, чем ключевое слово synchronized, явное использование объектов Lock обладает своими преимуществами. При возникновении проблем с ключевым словом synchronized происходит исключение, но вы не получите возможность выполнить завершающие действия, чтобы сохранить корректное состояние системы. При работе с объектами Lock можно сделать все необходимое в секции finally.
В общем случае использование synchronized уменьшает объем кода, а также радикально снижает вероятность ошибки со стороны программиста, поэтому явные операции с объектами Lock обычно выполняются только при решении особых задач. Например, с ключевым словом synchronized нельзя попытаться получить блокировку с неудачным исходом или попытаться получить блокировку в течение некоторого промежутка времени с последующим отказом — в подобных случаях приходится использовать библиотеку concurrent:
//: concurrency/AttemptLocking.java
// Объекты Lock из библиотеки concurrent делают возможными
// попытки установить блокировку в течение некоторого времени
// Теперь создаем отдельную задачу для установления блокировки:
new Thread() {
{ setDaemon(true); }
public void run() {
al.lock.lock();
System.out.println("acquired");
}
}.start();
Thread.yield(); // Даем возможность 2-й задаче
al.untimed(); // False -- блокировка захвачена задачей
al.timed(); // False -- блокировка захвачена задачей
}
}
<spoiler text="Output:">
tryLock(): true
tryLock(2, TimeUnit.SECONDS): true
acquired
tryLock(): false
tryLock(2, TimeUnit.SECONDS): false
</spoiler> Класс ReentrantLock делает возможной попытку получения блокировки с последующим отказом от нее. Таким образом, если кто-то уже захватил блокировку, вы можете отказаться от своих намерений (вместо того, чтобы дожидаться ее освобождения). В методеtimed() делается попытка установления блокировки, которая может завершиться неудачей через 2 секунды (обратите внимание на использование класса Java SE5TimeUnit для определения единиц времени). В main() отдельный объект Thread создается в виде безымянного класса и устанавливает блокировку, чтобы методы untimed() и timed() могли с чем-то конкурировать.
Атомарные операции и ключевое слово volatile
В дискуссиях, посвященных механизму потоков в Java, часто можно услышать такое утверждение: «Атомарные операции не требуют синхронизации». Атомарная операция — это операция, которую не может прервать планировщик потоков — если она начинается, то продолжается до завершения, без возможности переключения контекста (переключения выполнения на другой поток). Не полагайтесь на атомарность, она ненадежна и опасна — используйте ее вместо синхронизации только в том случае, если вы являетесь экспертом в области синхронизации или, по крайней мере, можете получить помощь от такого эксперта.
Атомарные операции, упоминаемые в таких дискуссиях, включают в себя «простые операции» с примитивными типами, за исключением long и double.
Чтение и запись примитивных переменных гарантированно выполняются как атомарные (неделимые) операции. С другой стороны, JVM разрешается выполнять чтение и запись 64-разрядных величин (long и double) в виде двух раздельных 32-разрядных операций, с ненулевой вероятностью переключения контекста в ходе чтения или записи. Для достижения атомарности (при простом присваивании и возврате значений) можно определить типы long и double с модификатором volatile (учтите, что до выхода Java SE5 ключевое слово volatile не всегда работало корректно). Некоторые реализации JVM могут предоставлять более сильные гарантии, но вы не должны полагаться на платформенно-специфические возможности.
В многопроцессорных системах (которые в наши дни представлены многоядерными процессорами, то есть несколькими процесорами на одном чипе) видимость (visibility) играет гораздо более важную роль, чем в однопроцессорных системах. Изменения, вносимые одной задачей, — даже атомарные в смысле невозможности прерывания — могут оставаться невидимыми для других задач (например, если изменения временно хранятся в локальном кэше процессора). Таким образом, разные задачи будут по-разному воспринимать состояние приложения. Механизм синхронизации обеспечивает распространение видимости изменений, вносимых одной задачей в многопроцессорной системе, по всему приложению. Без синхронизации невозможно заранее предсказать, когда именно изменения станут видимыми.
Ключевое слово volatile обеспечивает видимость в рамках приложения. Если поле объявлено как volatile, это означает, что сразу же после записи в поле изменение будет отражено во всех последующих операциях чтения. Утверждение истинно даже при участии локальных кэшей — поля volatile немедленно записываются в основную память, и дальнейшее чтение происходит из основной памяти.
Если слепо следовать концепции атомарности, можно заметить, что метод getValue() в следующем примере вроде бы отвечает этому описанию:
</spoiler> Однако программа находит нечетные значения и завершается. Хотя return і и является атомарной операцией, отсутствие synchronized позволит читать значение объекта, когда он находится в нестабильном промежуточном состоянии. Вдобавок переменная і не объявлена как volatile, а это приведет к проблемам с видимостью. Оба метода, getValue() и evenIncrement(), должны быть объявлены синхронизируемыми. Только эксперты в области параллельных вычислений могут пытаться применять оптимизацию в подобных случаях.
В качестве второго примера рассмотрим кое-что еще более простое: класс, производящий серийные номера. Каждый раз при вызове метода nextSerialNumber() он должен возвращать уникальное значение:
//: concurrency/SerialNumberGenerator.java
public class SerialNumberGenerator {
private static volatile int serialNumber = 0;
public static int nextSerialNumber() {
return serialNumber++; // Операция не является потоково-безопасной
}
}
Представить себе класс тривиальнее SerialNumberGenerator вряд ли можно, и если вы ранее работали с языком C++ или имеете другие низкоуровневые навыки, то, видимо, ожидаете, что операция инкремента будет атомарной, так как инкремент обычно реализуется в виде одной инструкции микропроцессора. Однако в виртуальной машине Java инкремент не является атомарным и состоит из чтения и записи, соответственно, ниша для проблем с потоками найдется даже в такой простой программе.
Поле serialNumber объявлено как volatile потому, что каждый поток обладает локальным стеком и поддерживает в нем копии некоторых локальных переменных. Если вы объявляете переменную как volatile, то тем самым указываете компилятору не проводить оптимизацию, а это приведет к удалению чтения и записи, удерживающих поле в соответствии с локальными данными потока. Операции чтения и записи осуществляются непосредственно с памятью без кэширования. Кроме того, volatile не позволяет компилятору изменять порядок обращений с целью оптимизации. И все же присутствие volatile не влияет на тот факт, что инкремент не является атомарной операцией.
Для тестирования нам понадобится множество, которое не потребует переизбытка памяти в том случае, если обнаружение проблемы отнимет много времени. Приведенный далее класс CircularSet многократно использует память, в которой хранятся целые числа (int); предполагается, что к тому моменту, когда запись в множество начинается по новому кругу, вероятность конфликта
с перезаписанными значениями минимальна. Методы add() и contains() объявлены как synchronized, чтобы избежать коллизий:
//: concurrency/SerialNumberChecker.java
// Кажущиеся безопасными операции с появлением потоков
// перестают быть таковыми...
// {Args: 4}
import java.util.concurrent.*;
// Reuses storage so we don't run out of memory:
class CircularSet {
private int[] array;
private int len;
private int index = 0;
public CircularSet(int size) {
array = new int[size];
len = size;
// Инициализируем значением, которое не производится
// классом SerialNumberGenerator:
for(int i = 0; i < size; i++)
array[i] = -1;
}
public synchronized void add(int i) {
array[index] = i;
// Возврат индекса к началу с записью поверх старых значений:
index = ++index % len;
}
public synchronized boolean contains(int val) {
for(int i = 0; i < len; i++)
if(array[i] == val) return true;
return false;
}
}
public class SerialNumberChecker {
private static final int SIZE = 10;
private static CircularSet serials =
new CircularSet(1000);
private static ExecutorService exec =
Executors.newCachedThreadPool();
static class SerialChecker implements Runnable {
public void run() {
while(true) {
int serial =
SerialNumberGenerator.nextSerialNumber();
if(serials.contains(serial)) {
System.out.println("Duplicate: " + serial);
System.exit(0);
}
serials.add(serial);
}
}
}
public static void main(String[] args) throws Exception {
for(int i = 0; i < SIZE; i++)
exec.execute(new SerialChecker());
// Остановиться после n секунд при наличии аргумента:
if(args.length > 0) {
TimeUnit.SECONDS.sleep(new Integer(args[0]));
System.out.println("No duplicates detected");
System.exit(0);
}
}
}
<spoiler text="Output:"> (Sample)
Duplicate: 8468656
</spoiler> В классе SerialNumberChecker содержится статическое поле CircuLarSet, хранящее все серийные номера, и вложенный поток Thread, который получает эти номера и удостоверяется в их уникальности. Создав несколько потоков, претендующих на серийные номера, вы обнаружите, что какой-нибудь из них довольно быстро получит уже имеющийся номер (заметьте, что на вашей машине программа может и не обнаружить конфликт, но на многопроцессорной системе она успешно их нашла). Для решения проблемы добавьте к методуnextSerialNumber() слово synchronized.
Предполагается, что безопасными атомарными операциями являются чтение и присвоение примитивов. Однако, как мы увидели в программе AtomicityTest.java, все так же просто использовать атомарную операцию для объекта, который находится в нестабильном промежуточном состоянии, так что ожидать, что какие-то предположения оправдаются, опасно и ненадежно.
Атомарные классы
В Java SE5 появились специальные классы для выполнения атомарных операций с переменными — Atomiclnteger, AtomicLong, AtomicReference и т. д. Эти классы содержат атомарную операцию условного обновления в форме
Эти классы предназначены для оптимизации с целью использования атомарности на машинном уровне на некоторых современных процессорах, поэтому в общем случае вам они не понадобятся. Иногда они применяются и в повседневном программировании, но только при оптимизации производительности. Например, версия AtomicityTest.java, переписанная для использования AtomicInteger, выглядит так:
//: concurrency/AtomicIntegerTest.java
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;
public class AtomicIntegerTest implements Runnable {
Здесь вместо ключевого слова synchronized используется AtomicInteger. Так как сбой в программе не происходит, в программу включается таймер, автоматически завершающий ее через 5 секунд.
Вот как выглядит пример MutexEvenGenerator.java, переписанный для использования класса Atomiclnteger:
//: concurrency/AtomicEvenGenerator.java
// Атомарные классы иногда используются в обычном коде.
// {RunByHand}
import java.util.concurrent.atomic.*;
public class AtomicEvenGenerator extends IntGenerator {
private AtomicInteger currentEvenValue =
new AtomicInteger(0);
public int next() {
return currentEvenValue.addAndGet(2);
}
public static void main(String[] args) {
EvenChecker.test(new AtomicEvenGenerator());
}
}
Стоит еще раз подчеркнуть, что классы Atomic проектировались для построения классов из java.util.concurrent. Используйте их в своих программах только в особых случаях и только тогда, когда вы твердо уверены, что это не создаст новых проблем. В общем случае безопаснее использовать блокировки (с ключевым словом synchronized или явным созданием объектов Lock).
Критические секции
Иногда необходимо предотвратить доступ нескольких потоков только к части кода, а не к методу в целом. Фрагмент кода, который изолируется таким способом, называется критической секцией (critical section), для его создания также применяется ключевое словоsynchronized. На этот раз слово synchronized определяет объект, блокировка которого должна использоваться для синхронизации последующего фрагмента кода:
synchronized(синхронизируемыйОбьект) {
// К такому коду доступ может получить
// одновременно только один поток
}
Такая конструкция иначе называется синхронизированной блокировкой (synchronized block); перед входом в нее необходимо получить блокировку для syncObject. Если блокировка уже предоставлена другому потоку, вход в последующий фрагмент кода запрещается до тех пор, пока блокировка не будет снята.
Следующий пример сравнивает два подхода к синхронизации, показывая, насколько увеличивается время, предоставляемое потокам для доступа к объекту при использовании синхронизированной блокировки вместо синхронизации методов. Вдобавок он демонстрирует, как незащищенный класс может «выжить» в многозадачной среде, если он управляется и защищается другим классом:
//: concurrency/CriticalSection.java
// Синхронизация блоков вместо целых методов. Также демонстрирует защиту
// неприспособленного к многопоточности класса другим классом
// with a thread-safe one.
package concurrency;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;
class Pair { // Not thread-safe
private int x, y;
public Pair(int x, int y) {
this.x = x;
this.y = y;
}
public Pair() { this(0, 0); }
public int getX() { return x; }
public int getY() { return y; }
public void incrementX() { x++; }
public void incrementY() { y++; }
public String toString() {
return "x: " + x + ", y: " + y;
}
public class PairValuesNotEqualException
extends RuntimeException {
public PairValuesNotEqualException() {
super("Pair values not equal: " + Pair.this);
}
}
// Произвольный инвариант - обе переменные должны быть:
public void checkState() {
if(x != y)
throw new PairValuesNotEqualException();
}
}
// Защита класса Pair внутри приспособленного к потокам класса:
abstract class PairManager {
AtomicInteger checkCounter = new AtomicInteger(0);