Scala в примерах: различия между версиями

Материал из Викиучебника — открытых книг для открытого мира
Содержимое удалено Содержимое добавлено
Нет описания правки
Нет описания правки
Строка 1: Строка 1:
''Здесь ведётся перевод книги Martin'а Odersky "Scala by examples"''
''Здесь ведётся перевод книги Martin'а Odersky "Scala by examples"''
''Присоединяйтесь!''
''Присоединяйтесь!''
{{wikipedia|Scala}}
==Глава 1==
==Глава 1==



Версия от 07:23, 21 октября 2006

Здесь ведётся перевод книги Martin'а Odersky "Scala by examples" Присоединяйтесь!

Глава 1

Глава 2

Программирование с Акторами и Сообщениями

Вот пример, который демонстрирует область, в которой Scala особенно хорош. Рассмотрим задачу по созданию электронного аукциона. Для реализации участников аукциона мы используем модель актора - процесса в erlang стиле.

Акторы являются объектами, которым посылаются сообщения. Каждый процесс имеет "почтовый ящик" для поступающих сообщений, который представлен очередью. Сообщения в очереди могут обрабатываться или в последовательном порядке, или выборочно, как удовлетворяющие некоторому образцу. Для каждого лота есть процесс аукциона, который публикует информацию о лоте и принимает предложения от участников аукциона, и который общается с продавцом лота и покупателем, победившем на аукционе, для завершения сделки.

Первым делом, мы определим сообщения, которые передаются во время аукциона. Есть два абстрактных базовых класса, AuctionMessage для сообщений от клиентов к сервису аукциона, и AuctionReply для ответов сервиса. Для обоих базовых классов существует набор вариаций (cases), как это определено в листинге 3.1. Вариации классов определяют формат конкретных сообщений. Эти сообщения неплохо было бы отображать в небольшие документы [XML]. Мы ожидаем, что автоматические инструментальные средства поспособствуют таким преобразованиям между документами XML и внутренними структурами данных.

// 3.1 Сообщения
abstract class AuctionMessage
 case class Offer(bid: int, client: Actor) extends AuctionMessage
 case class Inquire(client: Actor) extends AuctionMessage
abstract class AuctionReply
 case class Status(asked: int, expire: Date) extends AuctionReply
 case object BestOffer extends AuctionReply
 case class BeatenOffer(maxBid: int) extends AuctionReply
 case class AuctionConcluded(seller: Actor, client: Actor) extends AuctionReply
 case object AuctionFailed extends AuctionReply
 case object AuctionOver extends AuctionReply

Листинг 3.2 предлагает реализацию класса Auction, который координирует предложение цены по лоту. Объекты этого класса создаются с указанием

  • процесса продавца, который должен быть извещен о завершении торгов по лоту
  • минимального предложения по цене (стартовой цены),
  • даты, когда аукцион будет закрыт.

Ход торгов определён run методом. Этот метод периодически отбирает (используя метод receiveWithin) поступающие сообщения и реагирует на них, пока лот не будет закрыт (по истечении времени на торг), что сигнализируется сообщением TIMEOUT. Перед окончательным остановом, процесс остается активным на период, определенный константой timeToShutdown, и отвечает на дальнейшие предложения, что аукцион закрыт.

Вот комментарии по конструкциям использованным в этой программе:

  • метод receiveWithin класса Actor принимает как параметры: продолжительность торгов - в миллисекундах, и функцию, которая обрабатывает сообщения, поступающие в почтовый ящик. Функция задана последовательностью вариаций, каждая из которых состоит из шаблона и действий. Метод receiveWithin выбирает из почтового ящика первое сообщение, которое соответствует одному из этих шаболонов и выполняет соответствующее действие.
  • последний вариация в receiveWithin охраняется паттерном TIMEOUT. Если никакие другие сообщения не поступают на протяжении времени, передаваемого функции receiveWithin, по истечении этого срока срабатывает TIMEOUT, что, в конечном счёте приводит к завершению receiveWithin. TIMEOUT - по существу, это особый экземпляр класса Message.
  • сообщения - ответы посылаются в следующей форме: destination send SomeMessage. send используется здесь как бинарный оператор с аргументами: процессом и сообщением. Что равносильно вызову метода send с параметром SomeMessage на объекте destination, destination.send( SomeMessage ) Это равносильно в Scala вызову метода destination.send( SomeMessage ).
// 3.2 Сервис аукциона
class Auction(seller: Actor, minBid: int, closing: Date) extends Actor {
    val timeToShutdown = 36000000 //msec
    val bidIncrement = 10
    override def run() = {
        var maxBid = minBid - bidIncrement
        var maxBidder: Actor = _
        var running = true
        while (running) {
            receiveWithin ((closing.getTime() – new Date().getTime())) {
                case Offer(bid, client) =>
                    if (bid >= maxBid + bidIncrement) {
                        if (maxBid >= minBid) {
                            maxBidder send BeatenOffer(bid)
                        }
                        maxBid = bid; maxBidder = client; client send BestOffer
                    } else {
                        client send BeatenOffer(maxBid)
                    }
                case Inquire(client) =>
                    client send Status(maxBid, closing)
                case TIMEOUT =>
                    if (maxBid >= minBid) {
                        val reply = AuctionConcluded(seller, maxBidder)
                        maxBidder send reply; seller send reply
                    } else {
                        seller send AuctionFailed
                    }
                    receiveWithin(timeToShutdown) {
                        case Offer(_, client) => client send AuctionOver
                        case TIMEOUT => running = false
                    }
               }
          }
     }
}

Предыдущее обсуждение показало возможности распределенного программирования в Scala. Может показаться, что Scala, как язык, имеет богатый набор языковых конструкций, поддерживает концепцию процесса - актора, асинхронный обмен сообщениями, программирование с тайм-аутами, и т.п.. В действительности это не так. Все конструкции, рассмотренные выше, вводятся как методы в библиотечном классе Actor. Сам класс, базируется на модели многопоточности, используемой в нижлежащей платформе (напр. Java, Или .NET). Расмотрение всех характеристик класса Actor, использованного здесь, дано в Разделе 16.11.

Далее авторы объясняют, почему выбранная модель IPM не встроена в язык, а представлена в виде библиотеки. Преимущество подобного подхода к многопоточности (см. также PiLib) , в относительной простоте основного языка и гибкости для разработчиков. Поскольку в основном языке нет необходимости определять детали высокоуровневой связи процессов, это позволяет сохранять простоту и общность. Поскольку конкретная модель сообщений в почтовом ящике суть - библиотечный модуль, она может свободно быть модифицирована, если это требуется в каких - то приложениях. Для такого подхода необходимо, тем не менее, чтобы базовый язык был достаточно выразителен, чтобы он мог обеспечивать необходимые языковые абстракции удобным способом. Scala разработан с учётом этого требования; одна из основных проектных целей заключалась в том, что Scala должен быть достаточно гибок, как язык - платформа для языков описания областей DSL , выполненных как библиотечные модули. Например, представленные выше коммуникации акторов могут рассматриваться как язык описания области, который, в принципе, расширяет ядро Scala.

Глава 4

Глава 5

Глава 6

Глава 7

Глава 8

Глава 9

Глава 10

Глава 11

Глава 12

Глава 13

Глава 14

Глава 15

Абстракции для многопоточности

В этом разделе рассматриваются стандартные паттерны для параллельного программирования и показывается как они могут быть реализованы в Scala. "Процесс" и "поток" используются авторами как синонимы, примеры кода приведены без изменений.

Сигналы и мониторы

В Scala возможность взаимного исключения потоков обеспечивает монитор. Каждый экземпляр класса AnyRef может быть использован в качестве монитора, посредством вызова одного из этих методов:

def synchronized[a] (exec: => a): a
def wait(): unit
def wait(msec: long): unit
def notify(): unit
def notifyAll(): unit

Синхронизированный метод (оформленный с помощью служебного слова synchronized) выполняет вычисления exec в режиме взаимного исключения - в любое время, только один поток может выполнять синхронизированный метод, иначе: метод требует экслюзивного доступа. При вызове такого метода (synchronized блока) соответствующий экземпляр объекта блокируется на вызов остальных synchronized методов, что впрочем, не мешает параллельному выполнению несинхронизированных методов.

Поток может приостановиться на мониторе, ожидая сигнала. Поток, вызвавший метод wait снимает блокировку и переходит в режим ожидания вызова другим потоком метода notify на том же объекте. Вызовы notify при отсутствии потоков, ожидающих этот сигнал, игнорируются.

Есть также форма wait, c указанием времени блокировки (в миллисекундах), по истечении которого, процесс обязательно возобновит работу.

Кроме того, есть метод notifyAll, который разблокирует все потоки, ждущие сигнал.

Эти методы, а также класс Monitor являются примитивами в Scala; они реализованы, основываясь на нижлежащих runtime системах.

Обычно поток ждет некоторое условие для выполнения программ. Если условие не выполнено - при вызове wait, поток усыпляется, пока некоторый другой поток формирует условие. И в ответственности последнего пробуждение ожидающих потоков, посредством вызова notify или notifyAll. Заметьте, тем не менее, нет гарантии, что ожидающий поток начнёт исполняться немедленно после вызова notify / notifyAll.

Может статься, что другой поток за это время в очередной раз изменит условие запуска, аннулирует его. Следовательно, правильная форма ожидания условия C использует while цикл:

while (!C) wait()

а не

if (!C) wait()

В качестве примера монитора - реализация буфера с ограничением.

class BoundedBuffer[a](N: Int) {
    var in = 0, out = 0, n = 0
    val elems = new Array[a](N)
    def put(x: a) = synchronized {
        while (n >= N) wait()
        elems(in) = x ; in = (in + 1) % N ; n = n + 1
        if (n == 1) notifyAll()
    }
    def get: a = synchronized {
       while (n == 0) wait()
       val x = elems(out) ; out = (out + 1) % N ; n = n - 1
       if (n == N - 1)
       notifyAll()
       x
    }
}

А вот программа, использующая этот буфер, для общения между процессами производителем и потребителем.

import scala.concurrent.ops._
...
val buf = new BoundedBuffer[String](10)
spawn { while (true) { val s = produceString ; buf.put(s) } }
spawn { while (true) { val s = buf.get ; consumeString(s) } }

Метод spawn запускает новый поток, который выполняет выражение, переданное параметром. В объекте scala.concurrent.ops он определяется следующим образом.

def spawn(p: => unit) = {
    val t = new Thread() { override def run() = p; }
    t.start()
}

Синхронизированные переменные

Синхронизированная переменная предлагает методы get и put, для чтения и установки переменной. Операции get блокируют поток, пока переменная не определена. Операция unset сбрасывает переменную в неопределенное состояние.

Вот стандартная реализация синхронизированных переменных.

package scala.concurrent
class SyncVar[a] {
    private var isDefined: Boolean = false
    private var value: a = _
    def get = synchronized {
        if (isDefined) wait()
        value
    }
    def set(x: a) = synchronized {
        value = x ; isDefined = true ; notifyAll()
    }
    def isSet: Boolean = synchronized {
        isDefined
    }
    def unset = synchronized {
        isDefined = false;
    }
}

Futures

Future (оставлено без перевода) является величиной, которая вычисляется параллельно некоторому потоку, чтобы быть использованной им немного позже. Future используется для облегчения параллельной обработки ресурсов. Типичное применение:

import scala.concurrent.ops._
...
val x = future(someLengthyComputation)
anotherLengthyComputation
val y = f(x()) + g(x())

Метод future определён в scala.concurrent.ops следующим образом.

def future[a](p: => a): unit => a = {
    val result = new SyncVar[a]
    fork { result.set(p) }
    (() => result.get)
}

Метод future для выполнения получает как параметр вычисление p. Тип возвращаемого им результата есть тип a - future параметра. Mетод определяет охрану result, которая принимает параметр, представляющий результат вычисления. Затем ответвляется новый поток (fork), который вычисляет результат, и вызывает охрану result по завершению. Параллельно этому, future возвращает анонимную функцию. При возвращении анонимной функции, поток может блокироваться на result, в ожидании вызова notify на охране, чтобы вернуть результат.

Обратите внимание, что последущие вызовы функции могут возвращать результат немедленно.

Параллельные вычисления

Следующий пример демонстрирует функцию par, которая принимает пару вычислений как параметры и возвращает результаты вычислений в другую пару. Оба вычисления выполняются параллельно.

Функция определена в объекте scala.concurrent.ops следующим образом.

def par[a, b](xp: => a, yp: => b): Pair[a, b] = {
    val y = new SyncVar[b]
    spawn { y set yp }
    Pair(xp, y.get)
}

Определенная там же функция replicate, выполняет параллельно реплицирующиеся вычисления. Каждый случай репликации связывается с целым числом, для идентификации.

def replicate(start: Int, end: Int)(p: Int => Unit): Unit = {
    if (start == end)
        ()
    else if (start + 1 == end)
        p(start)
    else {
        val mid = (start + end) / 2
        spawn { replicate(start, mid)(p) }
        replicate(mid, end)(p)
    }
}

Следующая функция использует replicate для выполнения одновременных вычислений на всех элементах массива.

def parMap[a,b](f: a => b, xs: Array[a]): Array[b] = {
    val results = new Array[b](xs.length)
    replicate(0, xs.length) { i => results(i) = f(xs(i)) }
    results
}

Семафоры

Классический механизм синхронизации процессов - семафор. Он предлагает два элементарных действия: занятие и освобождение. Вот реализация простого семафора в Scala:

package scala.concurrent
class Lock {
    var available = true
    def acquire = synchronized {
        if (!available) wait()
        available = false
    }
    def release = synchronized {
        available = true
        notify()
    }
}

Читатели/Писатели

Более сложная форма синхронизации различает читающие потоки, которые имеют доступ к общему ресурсу, не модифицируя его, и потоки, которые могут и читать, и изменять ресурс - писатели . Для того, чтобы синхронизировать читателей и писателей, которые нам нужно осуществлять действия startRead, startWrite, endRead, endWrite, при том, что:

  • может быть множество конкурирующих читателей,
  • в любой момент может только быть один писатель,
  • запись имеет больший приоритет над чтением, но прерывыать процесс чтения не может.

Следующая реализация читателей/писателей основана на понятии почтового ящика (смотри раздел 16.10).

import scala.concurrent._
class ReadersWriters {
    val m = new MailBox
    private case class Writers(n: int), Readers(n: int) { m send this; }
    Writers(0); Readers(0)
    def startRead = m receive {
        case Writers(n) if n == 0 => m receive {
            case Readers(n) => Writers(0) ; Readers(n+1)
        }
    }
    def startWrite = m receive {
        case Writers(n) =>  Writers(n+1)
        m receive { case Readers(n) if n == 0 => }
    }
    def endRead = m receive {
        case Readers(n) => Readers(n - 1)
    }
    def endWrite = m receive {
        case Writers(n) => Writers(n - 1); if (n == 0) Readers(0)
    }
}

Асинхронные каналы

Основной способ межпроцессной связи - асинхронный канал. Здесь используется следующий простой класс для связанных списков:

class LinkedList[a] {
    var elem: a = _
    var next: LinkedList[a] = null
}

который, в принципе, формирует вершину списка. Пустые связанные списки начинаются с ложного узла, чей преемник null.

Класс Channel использует связанный список, чтобы загружать данные, которые посланы, но ещё не прочитаны.

Потоки, которым необходимо читать данные, при пустом канале, регистрируют своё наличие, увеличивая nreaders величину и ожидая notify.

package scala.concurrent
class Channel[a] {
    class LinkedList[a] {
        var elem: a = _
        var next: LinkedList[a] = null
    }
    private var written = new LinkedList[a]
    private var lastWritten = written
    private var nreaders = 0
    def write(x: a) = synchronized {
        lastWritten.elem = x
        lastWritten.next = new LinkedList[a]
        lastWritten = lastWritten.next
        if (nreaders > 0) notify()
    }
    def read: a = synchronized {
        if (written.next == null) {
            nreaders = nreaders + 1; wait(); nreaders = nreaders 1
        }
        val x = written.elem
        written = written.next
        x
    }
}

Синхронные каналы

Вот реализация синхронного канала, где передатчик сообщения блокируется, пока это сообщение не получено. Синхронному каналу необходима единственная переменная, чтобы хранить сообщения для транспорта, и две используются, чтобы координировать процессы чтения и записи.

package scala.concurrent
class SyncChannel[a] {
    private var data: a = _
    private var reading = false
    private var writing = false
    def write(x: a) = synchronized {
        while (writing) wait()
        data = x
        writing = true
        if (reading) notifyAll()
        else while (!reading) wait()
    }
    def read: a = synchronized {
        while (reading) wait()
        reading = true
        while (!writing) wait()
        val x = data
        writing = false
        reading = false
        notifyAll()
        x
    }
}

Сервера вычислений

Реализация сервера вычислений в Scala. Сервер выполняет future метод, который вычисляет данное выражение параллельно своему вызову. В отличие от реализации в Разделе 16.3, сервер вычисляет future только для предопределённого числа потоков. Предполагаемая реализация сервера могла бы запускать каждый поток на отдельном процессоре, что позволило бы избежать усложнений от переключения контекста потока, что характерно для однопроцессорных систем.

import scala.concurrent._, scala.concurrent.ops._
class ComputeServer(n: Int) {
    private abstract class Job {
        type t
        def task: t
        def ret(x: t): Unit
    }
    private val openJobs = new Channel[Job]()
    private def processor(i: Int): Unit = {
        while (true) {
            val job = openJobs.read
            job.ret(job.task)
        }
    }
    def future[a](p: => a): () => a = {
        val reply = new SyncVar[a]()
        openJobs.write{
            new Job {
                type t = a
                def task = p
                def ret(x: a) = reply.set(x)
            }
        }
        () => reply.get
    }
    spawn(replicate(0, n) { processor })
}

Выражения, которые нужно выполнять (т.е. аргументы future), пишутся в Job. Job является объектом с

  • абстрактным типом t, описывающим результат вычислений job.
  • методом task, без параметров, типа t, который обозначает выражение для вычислений.
  • методом ret, который использует результат как только тот будет вычислен.

Вычислительный сервер создает n процессоров, во время своей инициализации. Каждый такой процессор переодически потребляет job, выполняет метод task и передает результат на в ret метод Job.

Пример демонстрирует использование абстрактных типов. Абстрактный тип t определяет результат работы, который может варьироваться от одного Job к другому. Без абстрактных типов было бы невозможно, реализовать подобный же класс в статическом, типобезопасном стиле. Потребовались бы динамическая проверка типов, приведение типов.

Вот пример использования вычислительного сервера, для оценки выражения 41 + 1.

object Test with Executable {
    val server = new ComputeServer(1)
    val f = server.future(41 + 1)
    Console.println(f())
}

Почтовые ящики

Почтовые ящики - это высокоуровневые и гибкие конструкции для синхронизации и взаимодействия процессов.

Они позволяют посылать и получать сообщения. "Сообщение" - означает произвольный объект. Есть специальное сообщения TIMEOUT, оно используется для сигнализиации о задержке.

case object TIMEOUT

Почтовые ящики определяют следующие методы.

class MailBox {
    def send(msg: Any): unit
    def receive[a](f: PartialFunction[Any, a]): a
    def receiveWithin[a](msec: long)(f: PartialFunction[Any, a]): a
}

Содержимое почтового ящика состоит из набора сообщений. Сообщения добавляются к почтовому ящику методом send. Сообщения удаляются посредством метода receive, который передаёт обработчик сообщения f как аргумент. f является частичной функцией на сообщениях, со значениями в некотором произвольном типе. Обычно, эта функция реализуется с помощью соответствия по шаблону (pattern matching expression). Метод receive блокируется gjrf есть сообщение в почтовом ящике для которого процессор определен. Сообщение затем, по обработке, удаляется из почтового ящика и блокированный поток перезапускается.

Как отправленные сообщения, так и полученные, упорядочены во времени. Получатель r применяется к соответствующему сообщению m только если нет другой пары (сообщение, получатель), которая предшествует (m, r) на частичном упорядочении в парах.

Простой пример использования почтовых ящиков. Рассмотрим одноместный буфер:

class OnePlaceBuffer {
    private val m = new MailBox; // An internal mailbox
    private case class Empty, Full(x: int); // Types of messages we deal
    with
    m send Empty; // Initialization
    def write(x: int): unit =
    m receive { case Empty => m send Full(x) }
    def read: int =
    m receive { case Full(x) => m send Empty ; x }
}

Почтовый ящик может быть реализован так:

class MailBox {
    private abstract class Receiver extends Signal {
      def isDefined(msg: Any): boolean
    }
    var msg = null
}

Мы определяем внутренний класс для получателей с методом теста isDefined, который сообщает определён ли получатель для данного сообщения. Получатель наследует от класса Signal notify метод, который используется, для "побудки" потока получателя. Когда поток получателя разбужена, сообщение, которое должно быть использованно, слохраняется в переменной msg.

private val sent = new LinkedList[Any]
private var lastSent = sent
private val receivers = new LinkedList[Receiver]
private var lastReceiver = receivers

Mailbox класс использует два связных списка, один для отправки необработанных сообщений, другой - для ожидания обработчиков.

def send(msg: Any): unit = synchronized {
    var r = receivers, r1 = r.next
    while (r1 != null && !r1.elem.isDefined(msg)) {
        r = r1; r1 = r1.next
    }
    if (r1 != null) {
        r.next = r1.next; r1.elem.msg = msg; r1.elem.notify
    } else {
        lastSent = insert(lastSent, msg)
    }
}

Send метод сначала проверяет, применим ли ждущий получатель к посылаемому сообщению. Если да, получатель извещается. В противном случае, сообщение будет добавлено в связанный список посланных сообщений.

def receive[a](f: PartialFunction[Any, a]): a = {
    val msg: Any = synchronized {
        var s = sent, s1 = s.next
        while (s1 != null && !f.isDefinedAt(s1.elem)) {
             s = s1; s1 = s1.next
        }
        if (s1 != null) {
            s.next = s1.next; s1.elem
        } else {
            val r = insert(lastReceiver, new Receiver {
                def isDefined(msg: Any) = f.isDefinedAt(msg)
            })
            lastReceiver = r
            r.elem.wait()
            r.elem.msg
        }
    }
    f(msg)
}

Метод send сначала проверяет применима ли к сообщению функция f обработчика к сообщению, что уже было послано но, еще не обрабатывалось. Если это верно, поток немедленно применяет f к сообщению. В противном случае, новый получатель создаётся и связывается в списке получателей, и поток ждет уведомление на этом получателе. Как только поток будет разбужен снова, он продолжит выполнение, прилагая f к сообщению, которое было сохранено в получателе. Метод insert в связанном списке определяется следующим образом.

def insert(l: LinkedList[a], x: a): LinkedList[a] = {
    l.next = new LinkedList[a]
    l.next.elem = x
    l.next.next = l.next
    l
}

Класс почтового ящика также предлагает метод receiveWithin который блокируется только на определенный интервал времени. Если в течение оговоренного времени (в миллисекундах), никакое сообщение не приходит, процессор сообщения f будет разблокирован специальным сообщением TIME-OUT. Реализация receiveWithin почти как receive:

def receiveWithin[a](msec: long)(f: PartialFunction[Any, a]): a = {
    val msg: Any = synchronized {
        var s = sent, s1 = s.next
        while (s1 != null && !f.isDefinedAt(s1.elem)) {
            s = s1; s1 = s1.next
        }
        if (s1 != null) {
            s.next = s1.next; s1.elem
        } else {
            val r = insert(lastReceiver, new Receiver {
                def isDefined(msg: Any) = f.isDefinedAt(msg)
            })
            lastReceiver = r
            r.elem.wait(msec)
            if (r.elem.msg == null) r.elem.msg = TIMEOUT
            r.elem.msg
        }
     }
    f(msg)
  }
} // end MailBox

Различия очевидны.


Акторы

В главе 3 представлен набросок электронного аукциона. Этот сервис основывался на высокоуровневых процессах - акторах, которые управляются сообщениями в почтовом ящике, используя сравнение по образцу. Актор является просто потоком, основа коммуникации которого - почтовый ящик. Акторы, следовательно, определяются как mixin композиция расширения класса Thread с классом MailBox.

abstract class Actor extends Thread with MailBox