Scala в примерах
Материал из Викиучебника
Здесь ведётся перевод книги Martin'а Odersky "Scala by examples" Присоединяйтесь!
Содержание |
[править] Глава 1
[править] Первый пример
В качестве первого примера рассмотрим "быструю сортировку".
def sort(xs: Array[int]) {
def swap(i: int, j: int) {
val t = xs(i); xs(i) = xs(j); xs(j) = t
}
def sort1(l: int, r: int) {
val pivot = xs((l + r) / 2)
var i = l; var j = r
while (i <= j) {
while (xs(i) < pivot) { i = i + 1 }
while (xs(j) > pivot) { j = j - 1 }
if (i <= j) {
swap(i, j)
i = i + 1
j = j - 1
}
}
if (l < j) sort1(l, j)
if (j < r) sort1(i, r)
}
sort1(0, xs.length - 1)
}
Код весьма похож на Java или С программу. Мы используем те же операторы и похожие управляющие конструкции. Есть, также, немного синтаксических отличий. В частности,
- Определения всегда начинаются с резервированного слова. Определение функций начинается c *def*, задание переменных начинается с *var*, а определение значений (т.е. переменных только для чтения) с *val*.
- Тип идентификатора объявляется через двоеточие после идентификатора. Задание типа часто можно опускать, поскольку компилятор способен выводить его из контекста.
- Массивы типа T записываются как Array[T], вместо T[], а выборка из массива через a(i) вместо a[i].
- Функции могут быть вложены в другие функции. Вложенные функции имеют доступ к параметрам и локальным переменным внешних функций. Например, массив xs является видимым для функций swap и sort1, а значит, его не требуется передовать как параметр.
Пока, Scala представляется довольно обычным языком с некоторыми синтаксическими приправами. И действительно, в Scala можно писать программы в обычном императивном или объектно - ориентированном стиле. Это важно, потому что облегчает объединение Scala - компонент, с кодом, написанным на таких популярных языках, как Java, C# или Visual Basic.
На самом деле, также возможно писать программы в совершенно другом стиле. Вот снова Quicksort.
def sort(xs: Array[int]): Array[int] =
if (xs.length <= 1) xs
else {
val pivot = xs(xs.length / 2)
Array.concat(
sort(xs filter (pivot >)),
xs filter (pivot ==),
sort(xs filter (pivot <)))
}
Функциональная программа схватывает сущность алгоритма "быстрой сортировки":
- Если массив пуст или состоит из одного элемента, то он уже отсортирован.
- Если массив не пуст выбираем средний элемент как разделитель.
- Разбиваем массив на три подмассива, содержащие, соответственно, элементы меньшие разделителя, элементы равные разделителю и элементы большие его.
- Сортируем подмассивы с помощью рекурсивного вызова функции sort.
(Это не совсем то, что делает императивная версия. Там массив разбивается на массив элементов меньших разделителя, и больших или равных ему.)
- После склеивания всех подмассивов возвращаем результат.
Обе реализации, и императивная, и функциональная, имеют одинаковую асимптотическую сложность – O(N log (N)) в среднем, и O(N^2) в худшем случае. Но если императивная версия непосредственно оперирует элементами массива, используя прямую адресацию, то функциональная при каждом рекурсивном вызове возвращает новый отсортированный массив, оставляя без изменения массив переданный как аргумент. Следовательно, функциональная реализация требует больше памяти для выполнения.
Функциональная реализация создаёт впечатление что Scala - язык с операторами на массивах. На самом деле, все операции, использовавшиеся в примере, являются методами класса Seq[t] из стандартной библиотеки Scala. Поскольку массивы - это экземпляры класса Seq, все его методы доступны им. В частности, метод filter, который принимает в качестве аргумента функцию - предикат. Результат выполнения filter - массив, состоящий из тех элементов исходного массива, которые удовлетворяют предикату, т.е. на которых предикатная функция возвращает true. Метод filter класса Array[t], следовательно, имеет сигнатуру
def filter(p: t => boolean): Array[t]
Здесь, t => boolean - запись типа функции, принимающей аргумент типа t и возвращающей булево значение. Функции подобные filter, т.е. принимающие другую функцию как аргумент, или возвращающие функцию как результат, называются функциями высшего порядка.
Scala не различает идентификаторы и имена операторов. Идентификатором может быть последовательность букв или цифр, начинающаяся с буквы, или это может быть последовательность специальных символов, таких как “+”, “*”, или “:”. Любой идентификатор может использоваться как инфиксный оператор. Бинарная операция E op E0 означает следующее: E.op(E0). Следовательно, выражение xs filter (pivot >) равнозначно вызову метода xs.filter(pivot >). В программе быстрой сортировки, в filter трижды, как аргумент, передаётся анонимная функция, т.е. функция без определённого имени. Так, первый раз pivot > представляет функцию, принимающую аргумент x и возвращающую значение выражения pivot > x. Другой способ записать эту функцию: x => pivot > x явно указывает её аргумент. Тип параметра x в данном случае опущен, поскольку компилятор может автоматически вывести его из контекста. Подытоживая, xs.filter(pivot >) возвращает список из всех элементов xs, что меньше чем pivot. Взглянув снова на первую, императивную реализацию Quicksort, мы видим там многие языковые конструкции из второго решения, хотя и в замаскированной форме. Например, “обычные” бинарные операторы +, - , или < не рассматриваются специальным образом. Как и append, они являются методами левых операндов. Следовательно, выражение i + 1 рассматривается как вызов i.+(1) метода + на целочисленном i. При этом, конечно, допустимо распознавание компилятором специального случая вызова метода + на целых числах, и генерации эффективного кода. Для эффективности и лучшего обнаружения ошибок цикл while представлен в Scala как примитивная конструкция языка. Но, в принципе, его можно было бы вынести в библиотеку, оформив как функцию. Вот возможная реализация:
def While (p: => boolean) (s: => unit): unit =
if (p) { s ; While(p)(s) }
Функция While принимает первым параметром функцию проверки, которая не имеет аргументов, и возвращает булево значение. Второй параметр для while это функция s, в данном случае без параметров, возвращающая процедурный тип unit. While вызывает s до тех пор, пока предикатная функция возвращает true. Тип unit в Scala, грубо говоря, соответствует void в Java; он используется всякий раз когда функция не возвращает какого - либо значения, т.е. по сути, является процедурой. В Scala каждая функция возвращает некоторый результат. Если нет явно заданного выражения, определяющего результат, то предполагается значение по умолчанию: {} (произносится “unit”). Это значение процедурного типа unit. Вот более “ориентированная на выражения” формулировка функции swap из первой реализации quicksort; в ней {} используется явным образом:
def swap(i: int, j: int): unit = {
val t = xs(i); xs(i) = xs(j); xs(j) = t
{}
}
Результат этой функции - просто её последнее выражение, при этом ключевое слово return указывать не обязательно. Обратите внимание, что для функций, возвращающих явное значение, всегда необходимо указывать “=” перед их телом.
[править] Программирование с Акторами и Сообщениями
Вот пример, который демонстрирует область, в которой Scala особенно хорош. Рассмотрим задачу по созданию электронного аукциона. Для реализации участников аукциона мы используем модель актора - процесса в erlang стиле.
Акторы являются объектами, которым посылаются сообщения. Каждый процесс имеет "почтовый ящик" для поступающих сообщений, который представлен очередью. Сообщения в очереди могут обрабатываться или в последовательном порядке, или выборочно, как удовлетворяющие некоторому образцу. Для каждого лота есть процесс аукциона, который публикует информацию о лоте и принимает предложения от участников аукциона, и который общается с продавцом лота и покупателем, победившем на аукционе, для завершения сделки.
Первым делом, мы определим сообщения, которые передаются во время аукциона. Есть два абстрактных базовых класса, AuctionMessage для сообщений от клиентов к сервису аукциона, и AuctionReply для ответов сервиса. Для обоих базовых классов существует набор вариаций (cases), как это определено в листинге 3.1. Вариации классов определяют формат конкретных сообщений. Эти сообщения неплохо было бы отображать в небольшие документы [XML]. Мы ожидаем, что автоматические инструментальные средства поспособствуют таким преобразованиям между документами XML и внутренними структурами данных.
// 3.1 Сообщения abstract class AuctionMessage case class Offer(bid: int, client: Actor) extends AuctionMessage case class Inquire(client: Actor) extends AuctionMessage abstract class AuctionReply case class Status(asked: int, expire: Date) extends AuctionReply case object BestOffer extends AuctionReply case class BeatenOffer(maxBid: int) extends AuctionReply case class AuctionConcluded(seller: Actor, client: Actor) extends AuctionReply case object AuctionFailed extends AuctionReply case object AuctionOver extends AuctionReply
Листинг 3.2 предлагает реализацию класса Auction, который координирует предложение цены по лоту. Объекты этого класса создаются с указанием
- процесса продавца, который должен быть извещен о завершении торгов по лоту
- минимального предложения по цене (стартовой цены),
- даты, когда аукцион будет закрыт.
Ход торгов определён run методом. Этот метод периодически отбирает (используя метод receiveWithin) поступающие сообщения и реагирует на них, пока лот не будет закрыт (по истечении времени на торг), что сигнализируется сообщением TIMEOUT. Перед окончательным остановом, процесс остается активным на период, определенный константой timeToShutdown, и отвечает на дальнейшие предложения, что аукцион закрыт.
Вот комментарии по конструкциям использованным в этой программе:
- метод receiveWithin класса Actor принимает как параметры: продолжительность торгов - в миллисекундах, и функцию, которая обрабатывает сообщения, поступающие в почтовый ящик. Функция задана последовательностью вариаций, каждая из которых состоит из шаблона и действий. Метод receiveWithin выбирает из почтового ящика первое сообщение, которое соответствует одному из этих шаболонов и выполняет соответствующее действие.
- последний вариация в receiveWithin охраняется паттерном TIMEOUT. Если никакие другие сообщения не поступают на протяжении времени, передаваемого функции receiveWithin, по истечении этого срока срабатывает TIMEOUT, что, в конечном счёте приводит к завершению receiveWithin. TIMEOUT - по существу, это особый экземпляр класса Message.
- сообщения - ответы посылаются в следующей форме: destination send SomeMessage. send используется здесь как бинарный оператор с аргументами: процессом и сообщением. Это равносильно в Scala вызову метода destination.send( SomeMessage ).
// 3.2 Сервис аукциона
class Auction(seller: Actor, minBid: int, closing: Date) extends Actor {
val timeToShutdown = 36000000 //msec
val bidIncrement = 10
override def run() = {
var maxBid = minBid - bidIncrement
var maxBidder: Actor = _
var running = true
while (running) {
receiveWithin ((closing.getTime() – new Date().getTime())) {
case Offer(bid, client) =>
if (bid >= maxBid + bidIncrement) {
if (maxBid >= minBid) {
maxBidder send BeatenOffer(bid)
}
maxBid = bid; maxBidder = client; client send BestOffer
} else {
client send BeatenOffer(maxBid)
}
case Inquire(client) =>
client send Status(maxBid, closing)
case TIMEOUT =>
if (maxBid >= minBid) {
val reply = AuctionConcluded(seller, maxBidder)
maxBidder send reply; seller send reply
} else {
seller send AuctionFailed
}
receiveWithin(timeToShutdown) {
case Offer(_, client) => client send AuctionOver
case TIMEOUT => running = false
}
}
}
}
}
Предыдущее обсуждение показало возможности распределенного программирования в Scala. Может показаться, что Scala, как язык, имеет богатый набор языковых конструкций, поддерживает концепцию процесса - актора, асинхронный обмен сообщениями, программирование с тайм-аутами, и т.п.. В действительности это не так. Все конструкции, рассмотренные выше, вводятся как методы в библиотечном классе Actor. Сам класс, базируется на модели многопоточности, используемой в нижлежащей платформе (напр. Java, Или .NET). Расмотрение всех характеристик класса Actor, использованного здесь, дано в Разделе 16.11.
Далее авторы объясняют, почему выбранная модель IPM не встроена в язык, а представлена в виде библиотеки. Преимущество подобного подхода к многопоточности (см. также PiLib) , в относительной простоте основного языка и гибкости для разработчиков. Поскольку в основном языке нет необходимости определять детали высокоуровневой связи процессов, это позволяет сохранять простоту и общность. Поскольку конкретная модель сообщений в почтовом ящике суть - библиотечный модуль, она может свободно быть модифицирована, если это требуется в каких - то приложениях. Для такого подхода необходимо, тем не менее, чтобы базовый язык был достаточно выразителен, чтобы он мог обеспечивать необходимые языковые абстракции удобным способом. Scala разработан с учётом этого требования; одна из основных проектных целей заключалась в том, что Scala должен быть достаточно гибок, как язык - платформа для языков описания областей DSL , выполненных как библиотечные модули. Например, представленные выше коммуникации акторов могут рассматриваться как язык описания области, который, в принципе, расширяет ядро Scala.
[править] Глава 4
[править] Глава 5
[править] Глава 6
[править] Глава 7
[править] Глава 8
[править] Глава 9
[править] Глава 10
[править] Глава 11
[править] Вычисления с потоками
В предыдущих главах рассматривались переменные, присваивание и объекты, сохраняющие состояние. Мы видели как объекты реального мира, которые изменяются со временем, могут быть смоделированы изменением состояния переменных. Конечно, такие изменения обычно сокращаются или растягиваются, но их относительный порядок остаётся неизменным. (?) Такой подход представляется вполне естественным, но за него приходится платить: наша простая и выразительная модель подстановок для функционального вычисления будет более не применима, если мы введём переменные и присваивание. Есть ли другой способ? Можем ли мы моделировать изменение состояния в реальном мире, используя лишь функции без побочных эффектов? Руководствуясь соображениями математики, можно дать ответ: "да". Величина, изменяемая со временем просто представляется функцией f(t) с параметром времени t. И точно так же может быть вычислена. Но вместо переписывания переменной полученными результатами мы будем собирать их в список. Так что изменяемая переменная var x: T заменяется неизменяемым значением val x: List[T]. В сущности, теперь различные значения переменной существуют одновременно, как различные элементы списка. Преимуществом такого подхода будет возможность "путешествия по времени", т.е. просмотра нескольких последовательных значений одновременно. Другое преимущество заключается в том, что мы можем использовать библиотеку функций над списками, и это часто упрощает вычисления. Например, рассмотрим императивный способ вычисления суммы всех простых чисел из некоторого интервала:
def sumPrimes(start: int, end: int): int = {
var i = start
var acc = 0
while (i < end) {
if (isPrime(i)) acc = acc + i
i = i + 1
}
acc
}
Заметьте, переменная i "пробегает" все значения из интервала [start .. end1]. Более функциональный способ заключается в представлении списка значений i явно, как range(start, end). Далее функция может быть переписана так:
def sumPrimes(start: int, end: int) = sum(range(start, end) filter isPrime)
Бесспорно, вторая программа короче и яснее! Однако, функциональная программа также и менее эффективна, поскольку она конструирует список список чисел из интервала а затем ещё один для простых чисел. Ещё хуже с точки зрения эффективности следующий пример. Требуется найти второе простое число между 1000 и 10000:
range(1000, 10000) filter isPrime at 1
Здесь конструируется список всех чисел между 1000 and 10000. Но большая часть этого списка не будет использована! Мы можем достичь эффективного выполнения для случаев, таких как этот, с помощью уловки: Избегайте вычисления хвоста последовательности до тех пор, пока он действительно не понадобится. Определим новый класс Stream (Поток) для таких последовательностей. Потоки создаются с помощью empty - константы и конструктора cons, они определены в модуле scala.Stream. Например, следующее выражение создаёт поток с элементами 1 и 2:
Stream.cons(1, Stream.cons(2, Stream.empty))
В качестве другого примера, вот аналог List.range:
def range(start: int, end: int): Stream[int] = if (start >= end) Stream.empty else Stream.cons(start, range(start + 1, end))
Даже хотя Stream.range и List.range выглядят похоже, их поведение во время выполнения совершенно различно: Stream.range немедленно возвращает объект Stream первый элемент которого - start. Все другие элементы вычисляются только если они требуются (вызов метода tail). Как для списков для потоков определены isEmpty, head и tail. Распечатать все элементы списка можно так:
def print(xs: Stream[a]) {
if (!xs.isEmpty) { System.out.println(xs.head); print(xs.tail) }
}
Потоки также поддерживают другие аналоги методов на списках. Например, мы можем найти второе простое число между 1000 и 10000 с помощью методов filter и apply:
Stream.range(1000, 10000) filter isPrime at 1
Различие от предыдущей реализации на списках в том что теперь нет необходимости конструировать и проверять на простоту другие числа, после третьего. Два метода из класса List, не поддерживаемые классом Stream это :: и :::. Причина в том, что эти методы требуют передачи правых аргументов, а это означает, что аргументы должны быть вычислены до вызова метода. Например, в случае x :: xs на списках, хвост xs должен быть вычислен до оператора :: . Это не работает для потоков, поскольку хвост потока пока не потребуется в вычислениях, не должен вычисляться. Объяснение почему метод ::: не адаптируется для потоков - аналогично. Вместо x :: xs, используют Stream.cons(x, xs). Вместо xs ::: ys, - xs append ys.
[править] Неявные(имплицитные) параметры и преобразования.
Неявные(имплицитные) параметры и преобразования – это мощные инструменты для настройки существующих библиотек и для создания высокоуровневых абстракций. Например, давайте начнет с абстрактного класса полугрупп, который поддерживает операцию сложения.
abstract class SemiGroup[A] {
def add(x: A, y: A): A }
А это подкласс SemiGroup – класс Monoid, который добавляет поле unit.
abstract class Monoid[A] extends SemiGroup[A] {
def unit: A
}
Вот две реализации моноидов:
object stringMonoid extends Monoid[String] {
def add(x: String, y: String): String = x.concat(y)
def unit: String = ""
}
object intMonoid extends Monoid[Int] {
def add(x: Int, y: Int): Int = x + y
def unit: Int = 0
}
Метод суммирования, который работает с произвольными моноидами, может быть написан на чистой Scala следующим образом:
def sum[A](xs: List[A])(m: Monoid[A]): A =
if (xs.isEmpty) m.unit
else m.add(xs.head, sum(m)(xs.tail)
Этот метод может быть вызван, например, так:
sum(List("a", "bc", "def"))(stringMonoid) sum(List(1, 2, 3))(intMonoid) Все это работает, но выглядит не очень здорово. Проблема в том, что реализации моноида должны быть переданы в код, который их использует. Иногда мы можем захотеть, чтобы система могла вычислить нужные аргументы автоматически, аналогично тому, как это делается при выводе типов аргументов. Это то, чего помогают добиться неявные параметры.
[править] Неявные параметры: основы
В Scala 2 появилось новое ключевое слово implicit; оно может быть использовано в начале списка параметров. Синтаксис:
ParamClauses ::= {‘(’ [Param {‘,’ Param}] ’)’}
[‘(’ implicit Param {‘,’ Param} ‘)’]
Если ключевое слово присутствует, то оно делает все параметры в списке неявными. Например, в следующей версии метода sum параметр m является неявным.
def sum[A](xs: List[A])(implicit m: Monoid[A]): A =
if (xs.isEmpty) m.unit
else m.add(xs.head, sum(xs.tail))
Как можно заметить из примера, возможно комбинировать обычные и неявные параметры. Однако, для метода или конструктора может быть только один неявный параметр, и он должен быть последним.
Ключевое слово implicit может также быть использовано как модификатор в определениях и объявлениях. Примеры:
implicit object stringMonoid extends Monoid[String] {
def add(x: String, y: String): String = x.concat(y)
def unit: String = ""
}
implicit object intMonoid extends Monoid[Int] {
def add(x: Int, y: Int): Int = x + y
def unit: Int = 0
}
Основная идея неявных параметров – это то, что аргументы для них могут быть выведены из вызова метода. Если аргументы, отвечающие за секцию неявного параметра отсутствуют, тогда они выводятся Scala компилятором. Реальные аргументы, которые могут быть переданы в качестве неявного параметра, – это все идентификаторы X, которые доступны в точке вызова метода без префикса и которые помечены как неявные. Если существует несколько аргументов, подходящих по типу к неявному параметру, компилятор Scala выберет наиболее точный (специфичный), используя стандартные правила разрешения статической перегрузки. Например, пусть вызывается
sum(List(1, 2, 3))
в контексте, где доступны stringMonoid и intMonoid. Мы знаем, что формальный параметр метода sum должен быть типа int. Единственное значение, подходящее к типу формального параметра Monoid[int], это intMonoid, поэтому этот объект будет передан как неявный параметр. Это обсуждение также показывает, что неявный параметр выводится после того, как выведен любой тип аргументов (?).
[править] Неявные преобразования
Пусть у вас есть выражение E типа T, хотя ожидается тип S. T не приводится к S ни одним из предопределенных преобразований. Тогда компилятор Scala попытается применить последнее средство: неявное преобразование I(E). Здесь, I – это идентификатор, выражающий неявное определение или параметр, доступный без префикса в точке преобразования, применимый к аргументам типа T и чей результирующий тип совместим с типом S. Неявные преобразования также могут быть применены в селекторах членов. Получив E.x, где x – это не член типа E, компилятор Scala постарается добавить неявное преобразование I(E).x так, чтобы x был членом I(E). Это пример неявного преобразования функции, которая преобразует целые числа в экземпляры класса scala.Ordered:
implicit def int2ordered(x: Int): Ordered[Int] = new Ordered[Int] {
def compare(y: Int): Int =
if (x < y1) 1
else if (x > y1) 1
else 0
}
[править] View Bounds
View bounds – это удобный синтаксический сахар для неявных параметров. Рассмотрим обобщенный метод сортировки:
def sort[A <% Ordered[A]](xs: List[A]): List[A] =
if (xs.isEmpty || xs.tail.isEmpty) xs
else {
val {ys, zs} = xs.splitAt(xs.length / 2)
merge(ys, zs)
}
View bounded параметр типа [A <% Ordered[A]] выражает, что сортировка применима для списков таких типов, для которых существует неявное преобразование A к Ordered[A]. Определение трактуется как укороченный синтаксис для следующей сигнатуры метода с неявным параметром:
def sort[A](xs: List[A])(implicit c: A => Ordered[A]): List[A] = ...
(Здесь имя параметра c выбрано произвольно с учетом того, чтобы оно не конфликтовало с другими именами в программе.) В качестве более детального примера, рассмотрим метод merge, который появился в методе sort выше:
def merge[A <% Ordered[A]](xs: List[A], ys: List[A]): List[A] =
if (xs.isEmpty) ys
else if (ys.isEmpty) xs
else if (xs.head < ys.head) xs.head :: merge(xs.tail, ys)
else if ys.head :: merge(xs, ys.tail)
После раскрытия view bounds и вставки неявных преобразований, реализация метода принимает вид:
def merge[A](xs: List[A], ys: List[A])
(implicit c: A => Ordered[A]): List[A] =
if (xs.isEmpty) ys
else if (ys.isEmpty) xs
else if (c(xs.head) < ys.head) xs.head :: merge(xs.tail, ys)
else if ys.head :: merge(xs, ys.tail)(c)
Последние две строки определения этого метода демонстрируют два различных использования неявного параметра c. Он применяется в преобразовании в условии во второй строке с конца и передается как неявный аргумент в рекурсивный вызов merge в последней строке.
[править] Глава 15
[править] Абстракции для многопоточности
В этом разделе рассматриваются стандартные паттерны для параллельного программирования и показывается как они могут быть реализованы в Scala. "Процесс" и "поток" используются авторами как синонимы, примеры кода приведены без изменений.
[править] Сигналы и мониторы
В Scala возможность взаимного исключения потоков обеспечивает монитор. Каждый экземпляр класса AnyRef может быть использован в качестве монитора, посредством вызова одного из этих методов:
def synchronized[a] (exec: => a): a def wait(): unit def wait(msec: long): unit def notify(): unit def notifyAll(): unit
Синхронизированный метод (оформленный с помощью служебного слова synchronized) выполняет вычисления exec в режиме взаимного исключения - в любое время, только один поток может выполнять синхронизированный метод, иначе: метод требует экслюзивного доступа. При вызове такого метода (synchronized блока) соответствующий экземпляр объекта блокируется на вызов остальных synchronized методов, что впрочем, не мешает параллельному выполнению несинхронизированных методов.
Поток может приостановиться на мониторе, ожидая сигнала. Поток, вызвавший метод wait снимает блокировку и переходит в режим ожидания вызова другим потоком метода notify на том же объекте. Вызовы notify при отсутствии потоков, ожидающих этот сигнал, игнорируются.
Есть также форма wait, c указанием времени блокировки (в миллисекундах), по истечении которого, процесс обязательно возобновит работу.
Кроме того, есть метод notifyAll, который разблокирует все потоки, ждущие сигнал.
Эти методы, а также класс Monitor являются примитивами в Scala; они реализованы, основываясь на нижлежащих runtime системах.
Обычно поток ждет некоторое условие для выполнения программ. Если условие не выполнено - при вызове wait, поток усыпляется, пока некоторый другой поток формирует условие. И в ответственности последнего пробуждение ожидающих потоков, посредством вызова notify или notifyAll. Заметьте, тем не менее, нет гарантии, что ожидающий поток начнёт исполняться немедленно после вызова notify / notifyAll.
Может статься, что другой поток за это время в очередной раз изменит условие запуска, аннулирует его. Следовательно, правильная форма ожидания условия C использует while цикл:
while (!C) wait()
а не
if (!C) wait()
В качестве примера монитора - реализация буфера с ограничением.
class BoundedBuffer[a](N: Int) {
var in = 0, out = 0, n = 0
val elems = new Array[a](N)
def put(x: a) = synchronized {
while (n >= N) wait()
elems(in) = x ; in = (in + 1) % N ; n = n + 1
if (n == 1) notifyAll()
}
def get: a = synchronized {
while (n == 0) wait()
val x = elems(out) ; out = (out + 1) % N ; n = n - 1
if (n == N - 1)
notifyAll()
x
}
}
А вот программа, использующая этот буфер, для общения между процессами производителем и потребителем.
import scala.concurrent.ops._
...
val buf = new BoundedBuffer[String](10)
spawn { while (true) { val s = produceString ; buf.put(s) } }
spawn { while (true) { val s = buf.get ; consumeString(s) } }
Метод spawn запускает новый поток, который выполняет выражение, переданное параметром. В объекте scala.concurrent.ops он определяется следующим образом.
def spawn(p: => unit) = {
val t = new Thread() { override def run() = p; }
t.start()
}
[править] Синхронизированные переменные
Синхронизированная переменная предлагает методы get и put, для чтения и установки переменной. Операции get блокируют поток, пока переменная не определена. Операция unset сбрасывает переменную в неопределенное состояние.
Вот стандартная реализация синхронизированных переменных.
package scala.concurrent
class SyncVar[a] {
private var isDefined: Boolean = false
private var value: a = _
def get = synchronized {
if (isDefined) wait()
value
}
def set(x: a) = synchronized {
value = x ; isDefined = true ; notifyAll()
}
def isSet: Boolean = synchronized {
isDefined
}
def unset = synchronized {
isDefined = false;
}
}
[править] Futures
Future (оставлено без перевода) является величиной, которая вычисляется параллельно некоторому потоку, чтобы быть использованной им немного позже. Future используется для облегчения параллельной обработки ресурсов. Типичное применение:
import scala.concurrent.ops._ ... val x = future(someLengthyComputation) anotherLengthyComputation val y = f(x()) + g(x())
Метод future определён в scala.concurrent.ops следующим образом.
def future[a](p: => a): unit => a = {
val result = new SyncVar[a]
fork { result.set(p) }
(() => result.get)
}
Метод future для выполнения получает как параметр вычисление p. Тип возвращаемого им результата есть тип a - future параметра. Mетод определяет охрану result, которая принимает параметр, представляющий результат вычисления. Затем ответвляется новый поток (fork), который вычисляет результат, и вызывает охрану result по завершению. Параллельно этому, future возвращает анонимную функцию. При возвращении анонимной функции, поток может блокироваться на result, в ожидании вызова notify на охране, чтобы вернуть результат.
Обратите внимание, что последущие вызовы функции могут возвращать результат немедленно.
[править] Параллельные вычисления
Следующий пример демонстрирует функцию par, которая принимает пару вычислений как параметры и возвращает результаты вычислений в другую пару. Оба вычисления выполняются параллельно.
Функция определена в объекте scala.concurrent.ops следующим образом.
def par[a, b](xp: => a, yp: => b): Pair[a, b] = {
val y = new SyncVar[b]
spawn { y set yp }
Pair(xp, y.get)
}
Определенная там же функция replicate, выполняет параллельно реплицирующиеся вычисления. Каждый случай репликации связывается с целым числом, для идентификации.
def replicate(start: Int, end: Int)(p: Int => Unit): Unit = {
if (start == end)
()
else if (start + 1 == end)
p(start)
else {
val mid = (start + end) / 2
spawn { replicate(start, mid)(p) }
replicate(mid, end)(p)
}
}
Следующая функция использует replicate для выполнения одновременных вычислений на всех элементах массива.
def parMap[a,b](f: a => b, xs: Array[a]): Array[b] = {
val results = new Array[b](xs.length)
replicate(0, xs.length) { i => results(i) = f(xs(i)) }
results
}
[править] Семафоры
Классический механизм синхронизации процессов - семафор. Он предлагает два элементарных действия: занятие и освобождение. Вот реализация простого семафора в Scala:
package scala.concurrent
class Lock {
var available = true
def acquire = synchronized {
if (!available) wait()
available = false
}
def release = synchronized {
available = true
notify()
}
}
[править] Читатели/Писатели
Более сложная форма синхронизации различает читающие потоки, которые имеют доступ к общему ресурсу, не модифицируя его, и потоки, которые могут и читать, и изменять ресурс - писатели . Для того, чтобы синхронизировать читателей и писателей, которые нам нужно осуществлять действия startRead, startWrite, endRead, endWrite, при том, что:
- может быть множество конкурирующих читателей,
- в любой момент может только быть один писатель,
- запись имеет больший приоритет над чтением, но прерывыать процесс чтения не может.
Следующая реализация читателей/писателей основана на понятии почтового ящика (смотри раздел 16.10).
import scala.concurrent._
class ReadersWriters {
val m = new MailBox
private case class Writers(n: int), Readers(n: int) { m send this; }
Writers(0); Readers(0)
def startRead = m receive {
case Writers(n) if n == 0 => m receive {
case Readers(n) => Writers(0) ; Readers(n+1)
}
}
def startWrite = m receive {
case Writers(n) => Writers(n+1)
m receive { case Readers(n) if n == 0 => }
}
def endRead = m receive {
case Readers(n) => Readers(n - 1)
}
def endWrite = m receive {
case Writers(n) => Writers(n - 1); if (n == 0) Readers(0)
}
}
[править] Асинхронные каналы
Основной способ межпроцессной связи - асинхронный канал. Здесь используется следующий простой класс для связанных списков:
class LinkedList[a] {
var elem: a = _
var next: LinkedList[a] = null
}
который, в принципе, формирует вершину списка. Пустые связанные списки начинаются с ложного узла, чей преемник null.
Класс Channel использует связанный список, чтобы загружать данные, которые посланы, но ещё не прочитаны.
Потоки, которым необходимо читать данные, при пустом канале, регистрируют своё наличие, увеличивая nreaders величину и ожидая notify.
package scala.concurrent
class Channel[a] {
class LinkedList[a] {
var elem: a = _
var next: LinkedList[a] = null
}
private var written = new LinkedList[a]
private var lastWritten = written
private var nreaders = 0
def write(x: a) = synchronized {
lastWritten.elem = x
lastWritten.next = new LinkedList[a]
lastWritten = lastWritten.next
if (nreaders > 0) notify()
}
def read: a = synchronized {
if (written.next == null) {
nreaders = nreaders + 1; wait(); nreaders = nreaders 1
}
val x = written.elem
written = written.next
x
}
}
[править] Синхронные каналы
Вот реализация синхронного канала, где передатчик сообщения блокируется, пока это сообщение не получено. Синхронному каналу необходима единственная переменная, чтобы хранить сообщения для транспорта, и две используются, чтобы координировать процессы чтения и записи.
package scala.concurrent
class SyncChannel[a] {
private var data: a = _
private var reading = false
private var writing = false
def write(x: a) = synchronized {
while (writing) wait()
data = x
writing = true
if (reading) notifyAll()
else while (!reading) wait()
}
def read: a = synchronized {
while (reading) wait()
reading = true
while (!writing) wait()
val x = data
writing = false
reading = false
notifyAll()
x
}
}
[править] Сервера вычислений
Реализация сервера вычислений в Scala. Сервер выполняет future метод, который вычисляет данное выражение параллельно своему вызову. В отличие от реализации в Разделе 16.3, сервер вычисляет future только для предопределённого числа потоков. Предполагаемая реализация сервера могла бы запускать каждый поток на отдельном процессоре, что позволило бы избежать усложнений от переключения контекста потока, что характерно для однопроцессорных систем.
import scala.concurrent._, scala.concurrent.ops._
class ComputeServer(n: Int) {
private abstract class Job {
type t
def task: t
def ret(x: t): Unit
}
private val openJobs = new Channel[Job]()
private def processor(i: Int): Unit = {
while (true) {
val job = openJobs.read
job.ret(job.task)
}
}
def future[a](p: => a): () => a = {
val reply = new SyncVar[a]()
openJobs.write{
new Job {
type t = a
def task = p
def ret(x: a) = reply.set(x)
}
}
() => reply.get
}
spawn(replicate(0, n) { processor })
}
Выражения, которые нужно выполнять (т.е. аргументы future), пишутся в Job. Job является объектом с
- абстрактным типом t, описывающим результат вычислений job.
- методом task, без параметров, типа t, который обозначает выражение для вычислений.
- методом ret, который использует результат как только тот будет вычислен.
Вычислительный сервер создает n процессоров, во время своей инициализации. Каждый такой процессор периодически потребляет job, выполняет метод task и передает результат на в ret метод Job.
Пример демонстрирует использование абстрактных типов. Абстрактный тип t определяет результат работы, который может варьироваться от одного Job к другому. Без абстрактных типов было бы невозможно, реализовать подобный же класс в статическом, типобезопасном стиле. Потребовались бы динамическая проверка типов, приведение типов.
Вот пример использования вычислительного сервера, для оценки выражения 41 + 1.
object Test with Executable {
val server = new ComputeServer(1)
val f = server.future(41 + 1)
Console.println(f())
}
[править] Почтовые ящики
Почтовые ящики - это высокоуровневые и гибкие конструкции для синхронизации и взаимодействия процессов.
Они позволяют посылать и получать сообщения. "Сообщение" - означает произвольный объект. Есть специальное сообщения TIMEOUT, оно используется для сигнализиации о задержке.
case object TIMEOUT
Почтовые ящики определяют следующие методы.
class MailBox {
def send(msg: Any): unit
def receive[a](f: PartialFunction[Any, a]): a
def receiveWithin[a](msec: long)(f: PartialFunction[Any, a]): a
}
Содержимое почтового ящика состоит из набора сообщений. Сообщения добавляются к почтовому ящику методом send. Сообщения удаляются посредством метода receive, который передаёт обработчик сообщения f как аргумент. f является частичной функцией на сообщениях, со значениями в некотором произвольном типе. Обычно, эта функция реализуется с помощью соответствия по шаблону (pattern matching expression). Метод receive блокируется пока есть сообщение в почтовом ящике для которого процессор определен. Сообщение затем, по обработке, удаляется из почтового ящика и блокированный поток перезапускается.
Как отправленные сообщения, так и полученные, упорядочены во времени. Получатель r применяется к соответствующему сообщению m только если нет другой пары (сообщение, получатель), которая предшествует (m, r) на частичном упорядочении в парах.
Простой пример использования почтовых ящиков. Рассмотрим одноместный буфер:
class OnePlaceBuffer {
private val m = new MailBox; // An internal mailbox
private case class Empty, Full(x: int); // Types of messages we deal
with
m send Empty; // Initialization
def write(x: int): unit =
m receive { case Empty => m send Full(x) }
def read: int =
m receive { case Full(x) => m send Empty ; x }
}
Почтовый ящик может быть реализован так:
class MailBox {
private abstract class Receiver extends Signal {
def isDefined(msg: Any): boolean
}
var msg = null
}
Мы определяем внутренний класс для получателей с методом теста isDefined, который сообщает определён ли получатель для данного сообщения. Получатель наследует от класса Signal notify метод, который используется, для "побудки" потока получателя. Когда поток получателя разбужена, сообщение, которое должно быть использованно, слохраняется в переменной msg.
private val sent = new LinkedList[Any] private var lastSent = sent private val receivers = new LinkedList[Receiver] private var lastReceiver = receivers
Mailbox класс использует два связных списка, один для отправки необработанных сообщений, другой - для ожидания обработчиков.
def send(msg: Any): unit = synchronized {
var r = receivers, r1 = r.next
while (r1 != null && !r1.elem.isDefined(msg)) {
r = r1; r1 = r1.next
}
if (r1 != null) {
r.next = r1.next; r1.elem.msg = msg; r1.elem.notify
} else {
lastSent = insert(lastSent, msg)
}
}
Send метод сначала проверяет, применим ли ждущий получатель к посылаемому сообщению. Если да, получатель извещается. В противном случае, сообщение будет добавлено в связанный список посланных сообщений.
def receive[a](f: PartialFunction[Any, a]): a = {
val msg: Any = synchronized {
var s = sent, s1 = s.next
while (s1 != null && !f.isDefinedAt(s1.elem)) {
s = s1; s1 = s1.next
}
if (s1 != null) {
s.next = s1.next; s1.elem
} else {
val r = insert(lastReceiver, new Receiver {
def isDefined(msg: Any) = f.isDefinedAt(msg)
})
lastReceiver = r
r.elem.wait()
r.elem.msg
}
}
f(msg)
}
Метод send сначала проверяет применима ли к сообщению функция f обработчика к сообщению, что уже было послано но, ещё не обрабатывалось. Если это верно, поток немедленно применяет f к сообщению. В противном случае, новый получатель создаётся и связывается в списке получателей, и поток ждет уведомление на этом получателе. Как только поток будет разбужен снова, он продолжит выполнение, прилагая f к сообщению, которое было сохранено в получателе. Метод insert в связанном списке определяется следующим образом.
def insert(l: LinkedList[a], x: a): LinkedList[a] = {
l.next = new LinkedList[a]
l.next.elem = x
l.next.next = l.next
l
}
Класс почтового ящика также предлагает метод receiveWithin который блокируется только на определенный интервал времени. Если в течение оговоренного времени (в миллисекундах), никакое сообщение не приходит, процессор сообщения f будет разблокирован специальным сообщением TIME-OUT. Реализация receiveWithin почти как receive:
def receiveWithin[a](msec: long)(f: PartialFunction[Any, a]): a = {
val msg: Any = synchronized {
var s = sent, s1 = s.next
while (s1 != null && !f.isDefinedAt(s1.elem)) {
s = s1; s1 = s1.next
}
if (s1 != null) {
s.next = s1.next; s1.elem
} else {
val r = insert(lastReceiver, new Receiver {
def isDefined(msg: Any) = f.isDefinedAt(msg)
})
lastReceiver = r
r.elem.wait(msec)
if (r.elem.msg == null) r.elem.msg = TIMEOUT
r.elem.msg
}
}
f(msg)
}
} // end MailBox
Различия очевидны.
[править] Акторы
В главе 3 представлен набросок электронного аукциона. Этот сервис основывался на высокоуровневых процессах - акторах, которые управляются сообщениями в почтовом ящике, используя сравнение по образцу. Актор является просто потоком, основа коммуникации которого - почтовый ящик. Акторы, следовательно, определяются как mixin композиция расширения класса Thread с классом MailBox.
abstract class Actor extends Thread with MailBox