Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bugs in concurrency chapter, "A solution: Producer/Consumer" section #124

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions web/concurrency.textile
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ A common pattern for async computation is to separate producers from consumers a
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

// Concrete producer
class Producer[T](path: String, queue: BlockingQueue[T]) extends Runnable {
class Producer[T>:String](path: String, queue: BlockingQueue[T]) extends Runnable {
def run() {
Source.fromFile(path, "utf-8").getLines.foreach { line =>
queue.put(line)
Expand All @@ -447,7 +447,7 @@ abstract class Consumer[T](queue: BlockingQueue[T]) extends Runnable {
def consume(x: T)
}

val queue = new LinkedBlockingQueue[String]()
val q = new LinkedBlockingQueue[String]()

// One thread for the producer
val producer = new Producer[String]("users.txt", q)
Expand All @@ -467,8 +467,11 @@ class IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String]) extend
val cores = 8
val pool = Executors.newFixedThreadPool(cores)

val mutable_map = mutable.Map[String, User]()
val index = new InvertedIndex(mutable_map)

// Submit one consumer per core.
for (i <- i to cores) {
for (i <- 1 to cores) {
pool.submit(new IndexerConsumer[String](index, q))
}
</pre>
</pre>
2 changes: 1 addition & 1 deletion web/finagle.textile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ title: An introduction to Finagle
layout: post
---

"Finagle":https://github.com/twitter/finagle is Twitter's RPC system. "This":http://engineering.twitter.com/2011/08/finagle-protocol-agnostic-rpc-system.html blog post explains its motivations and core design tenets, the "finagle README":https://github.com/twitter/finagle/blob/master/README.md contains more detailed documentation. Finagle aims to make it easy to build robust clients and servers.
"Finagle":https://github.com/twitter/finagle is Twitter's RPC system. "This":https://blog.twitter.com/2011/finagle-a-protocol-agnostic-rpc-system blog post explains its motivations and core design tenets, the "finagle README":https://github.com/twitter/finagle/blob/master/README.md contains more detailed documentation. Finagle aims to make it easy to build robust clients and servers.

* "REPL":#repl
* "Futures":#Future: "Sequential composition":#futsequential, "Concurrent composition":#futconcurrent, "Composition Example: Cached Rate Limit":#combined_combinator_example_cache, "Composition Example: Thumbnail Fetcher":#combined_combinator_thumbnail
Expand Down
19 changes: 11 additions & 8 deletions web/ko/concurrency.textile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ h2(#Thread). Thread

스칼라 동시성은 자바 동시성 모델 위에 구현되어 있다.

Sun의 JVM에서 IO-작업 부하가 큰 경우 한 기계 위에 수만개의 쓰레드를 실행할 수 있다.
Sun의 JVM에서 IO-작업 부하가 큰 경우 한 기계 위에 수만개의 쓰레드를 실행할 수 있다.

Thread는 Runnable을 받는다. Thread가 Runnable을 실행하게 만들려면 Thread의 @start@ 를 호출해야 한다.

Expand Down Expand Up @@ -92,7 +92,7 @@ class Handler(socket: Socket) extends Runnable {

이 프로그램의 가장 큰 문제점은 한번에 한 요청밖에 처리하지 못한다는 점이다!

각각의 요청을 쓰레드에 넣었다면 더 좋았을 것이다. 단지 아래 부분을
각각의 요청을 쓰레드에 넣었다면 더 좋았을 것이다. 단지 아래 부분을

<pre>
(new Handler(socket)).run()
Expand Down Expand Up @@ -167,7 +167,7 @@ pool-1-thread-2

h2(#Future). Future

@Future@ 는 비동기적 연산을 나타낸다. 필요한 계산을 Future로 감싼 다음 결과가 필요할 때 그 Future의 @get()@ 메소드를 호출하면 된다. 이 @get()@ 메소드는 블록킹 메소드이다.
@Future@ 는 비동기적 연산을 나타낸다. 필요한 계산을 Future로 감싼 다음 결과가 필요할 때 그 Future의 @get()@ 메소드를 호출하면 된다. 이 @get()@ 메소드는 블록킹 메소드이다.
@Executor@ 는 @Future@ 를 반환한다. Finagle RPC 시스템을 사용한다면 @Future@ 인스턴스에 결과를 담는다. 결과는 경우에 따라 아직 도착하지 않았을 수도 있다.

@FutureTask@ 는 Runnable이며 @Executor@ 가 실행하도록 설계되었다.
Expand Down Expand Up @@ -296,7 +296,7 @@ h2(#example). 안전하지 않은 검색 엔진을 만들자

여기 쓰레드 안전성이 없는 역 인덱스가 있다. 이 역 인덱스는 이름의 일부에 대해 사용자를 연결해준다.

오직 한 쓰레드만 억세스할 수 있다는 순진한 가정하에 작성되어 있다.
오직 한 쓰레드만 억세스할 수 있다는 순진한 가정하에 작성되어 있다.

@mutable.HashMap@ 를 사용하게 되어 있는 추가 기본 생성자 @this()@ 에 유의하라.

Expand Down Expand Up @@ -421,13 +421,13 @@ class FileRecordProducer(path: String) extends UserMaker {

h3. 해결책: 생산자/소비자

비동기적 계산에서 공통된 패턴은 생산자와 소비자를 분리해 둘 사이에는 @Queue@ 를 사용한 통신만 허용하는 것이다. 앞의 검색엔진 인덱스 프로그램에 이를 어떻게 적용할 수 있나 살펴보자.
비동기적 계산에서 공통된 패턴은 생산자와 소비자를 분리해 둘 사이에는 @Queue@ 를 사용한 통신만 허용하는 것이다. 앞의 검색엔진 인덱스 프로그램에 이를 어떻게 적용할 수 있나 살펴보자.

<pre>
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

// 구체적 생산자
class Producer[T](path: String, queue: BlockingQueue[T]) extends Runnable {
class Producer[T>:String](path: String, queue: BlockingQueue[T]) extends Runnable {
def run() {
Source.fromFile(path, "utf-8").getLines.foreach { line =>
queue.put(line)
Expand All @@ -447,7 +447,7 @@ abstract class Consumer[T](queue: BlockingQueue[T]) extends Runnable {
def consume(x: T)
}

val queue = new LinkedBlockingQueue[String]()
val q = new LinkedBlockingQueue[String]()

// 생산자는 쓰레드 하나로 동작
val producer = new Producer[String]("users.txt", q)
Expand All @@ -467,8 +467,11 @@ class IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String]) extend
val cores = 8
val pool = Executors.newFixedThreadPool(cores)

val mutable_map = mutable.Map[String, User]()
val index = new InvertedIndex(mutable_map)

// 코어마다 소비자를 하나씩 배당하자
for (i <- i to cores) {
for (i <- 1 to cores) {
pool.submit(new IndexerConsumer[String](index, q))
}
</pre>
26 changes: 13 additions & 13 deletions web/ko/finagle.textile
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ title: 피네이글(Finagle) 소개
layout: post
---

"피네이글(Finagle)":https://github.com/twitter/finagle 은 트위터의 RPC(원격 프로시져 호출) 시스템이다.
"이 블로그 글":http://engineering.twitter.com/2011/08/finagle-protocol-agnostic-rpc-system.html 은 만들게 된 동기와 설계 원칙을, "피네이글 README":https://github.com/twitter/finagle/blob/master/README.md 에는 더 자세한 설명이 있다. 피네이글은 튼튼한 클라이언트와 서버를 쉽게 만들려는 목적으로 쓰여졌다.
"피네이글(Finagle)":https://github.com/twitter/finagle 은 트위터의 RPC(원격 프로시져 호출) 시스템이다.
"이 블로그 글":https://blog.twitter.com/2011/finagle-a-protocol-agnostic-rpc-system 은 만들게 된 동기와 설계 원칙을, "피네이글 README":https://github.com/twitter/finagle/blob/master/README.md 에는 더 자세한 설명이 있다. 피네이글은 튼튼한 클라이언트와 서버를 쉽게 만들려는 목적으로 쓰여졌다.

* "REPL":#repl
* "Future":#Future: "순차 합성":#futsequential, "동시 합성":#futconcurrent, "합성 예제: 캐시된 비율 제한":#combined_combinator_example_cache, "합성 예제: 썸네일 페치 장치":#combined_combinator_thumbnail
Expand Down Expand Up @@ -96,7 +96,7 @@ scala>

h3. 순차 합성(Sequential composition)

Future에도 <a href="collections.html#combinators">컬렉션 API의 콤비네이터와와 유사한</a> 콤비네이터가 있다(예: map, flatMap).
Future에도 <a href="collections.html#combinators">컬렉션 API의 콤비네이터와와 유사한</a> 콤비네이터가 있다(예: map, flatMap).
기억을 되살려 보자. 컬렉션의 콤비네이터를 사용하면 "정수의 리스트와 정수를 제곱하는 <tt>squre</tt>함수가 있다. 그 함수를 내 정수 리스트에 적용해서 제곱된 값의 리스트를 구하자" 같은 표현이 가능하다. 아주 깔끔하다. 콤비네이터 함수와 다른 함수들을 함께 조합하면 새로운 함수를 정의하는 효과를 얻는다. Future 콤비네이터로는 "나는 미래에 정수가 될 Future가 있고, <tt>squire</tt>가 있다. 이 함수를 Future에 적용해서 이 미래에 정수가 될 잠재적 정수의 제곱을 구하자"라고 할 수 있다.

만약 비동기 API를 정의한다면, 요청 값이 API에 들어오고, API는 Future로 둘러싸인 응답을 돌려줄 것이다. 따라서 입력과 함수를 Future로 바꿔주는 콤비네이터가 있다면 아주 유용할 것이다. 이를 사용하면 비동기 API를 다른 동기식 API를 기반으로 정의할 수 있기 때문이다.
Expand All @@ -107,8 +107,8 @@ Future에도 <a href="collections.html#combinators">컬렉션 API의 콤비네
<code>def Future[A].flatMap[B](f: A => Future[B]): Future[B]</code>
</blockquote>

<code>flatMap</code>은 두 Future를 순서대로 연결한다. 즉, 한 Future와 비동기식 함수를 받아서 다른 Future를 반환한다. flatMap 메소드의 시그니쳐가 말하는 바 대로이다.
앞의 Future가 성공해서 값을 반환하면, 함수 <code>f</code>는 다음 <code>Future</code>를 제공한다.
<code>flatMap</code>은 두 Future를 순서대로 연결한다. 즉, 한 Future와 비동기식 함수를 받아서 다른 Future를 반환한다. flatMap 메소드의 시그니쳐가 말하는 바 대로이다.
앞의 Future가 성공해서 값을 반환하면, 함수 <code>f</code>는 다음 <code>Future</code>를 제공한다.
<code>flatMap</code>은 입력 Future가 성공적으로 완료된 경우에 자동으로 <code>f</code>를 호출한다. 이 호출의 결과는 새로운 <code>Future</code>이며, 이는 두 Future(입력 Future와 비동기 함수)가 모두 성공적으로 완료된 경우에만 완료된다.
두 <code>Future</code>중 하나라도 실패하면 flatMap의 결과값으로 나온 <code>Future</code>도 또한 실패할 것이다.
묵시적으로 오류를 넘기는 것을 통해 의미상 중요한 경우에만 오류를 처리할 수 있다. <code>flatMap</code>은 이런 의미를 가지는 콤비네이터를 정의할 때 표준적으로 사용하는 이름이다.
Expand Down Expand Up @@ -144,7 +144,7 @@ res45: Boolean = false
scala>
</pre>

이와 비슷하게 <em>동기식</em> 함수를 Future에 적용하려면 <tt>map</tt>을 사용하면 된다.
이와 비슷하게 <em>동기식</em> 함수를 Future에 적용하려면 <tt>map</tt>을 사용하면 된다.
예를 들어 Future[RawCredentials]가 있고 Future[Credentials]이 필요하다 치자. 동기식 함수 <code>normalize</code>가 RawCredentials를 Credentials로 바꿔준다면 <code>map</code>을 쓸 수 있다.

<pre>
Expand Down Expand Up @@ -184,7 +184,7 @@ res48: String = florence
scala>
</pre>

스칼라에는 flatMap을 호출하는 것을 문법적으로 간편하게 해주는 <code>for</code> 컴프리헨션이 있다.
스칼라에는 flatMap을 호출하는 것을 문법적으로 간편하게 해주는 <code>for</code> 컴프리헨션이 있다.
로그인 요청을 비동기 API를 통해 인증하고 해당 사용자가 사용 정지 중인지를 비동기 API를 통해 검사한다 하자. 컴프리헨션을 사용하면 다음과 같이 쓸 수 있다.

<pre>
Expand Down Expand Up @@ -289,7 +289,7 @@ h3. 합성 예제: 캐시 비율 제한
이런 콤비네이터들은 네트워크 서비스에 있어 전형적인 연산을 표현한다. 다음의 (가상의) 코드는 사용자 대신 요청을 뒷단에 디스패치하면서 동시에 비율 제한(지역 비율 제한 캐시를 유지하기 위해 사용함)을 수행한다.

<pre>
// 사용자가 비율 제한에 걸렸는지를 본다. 이 과정은 시간이 오래 걸릴 수 있다.
// 사용자가 비율 제한에 걸렸는지를 본다. 이 과정은 시간이 오래 걸릴 수 있다.
// 원격 서버에게 사용자의 제한 여부를 물어봐야 하기 때문이다
def isRateLimited(u: User): Future[Boolean] = {
...
Expand All @@ -298,7 +298,7 @@ def isRateLimited(u: User): Future[Boolean] = {
// 이 구현을 들어내고 다른 더 제약이 많은 정책을 구현하도록 할 수 있는지 한번 생각해 보라

// 캐시를 검사해 사용자가 비율 제한에 걸렸는지를 본다. 캐시는 단순한 맵으로 되어 있다.
// 따라서 값을 바로 반환한다. 하지만, 혹시 나중에 더 느린 캐시 구현을 사용할 수도 있으므로
// 따라서 값을 바로 반환한다. 하지만, 혹시 나중에 더 느린 캐시 구현을 사용할 수도 있으므로
// 어쨌든 Future를 반환하도록 한다
def isLimitedByCache(u: User): Future[Boolean] = Future.value(limitCache(u))

Expand Down Expand Up @@ -463,7 +463,7 @@ h2(#Service). 서비스

!>finagle_client_server.png(Client and Server)!

클라이언트와 서버를 Service를 기반으로 정의한다.
클라이언트와 서버를 Service를 기반으로 정의한다.

피네이글 클라이언트는 네트워크에서 서비스를 "수입"한다. 개념상 피네이글 클라이언트는 두 부분으로 구성된다.

Expand Down Expand Up @@ -565,7 +565,7 @@ class MyService(client: Service[..]) extends Service[HttpRequest, HttpResponse]
}
</pre>

여기서 <code>rewriteReq</code>와 <code>rewriteRes</code>는 프로토콜 변환을 제공할 수 있다. 예를 들어,
여기서 <code>rewriteReq</code>와 <code>rewriteRes</code>는 프로토콜 변환을 제공할 수 있다. 예를 들어,

<pre>
abstract class Filter[-ReqIn, +RepOut, +ReqOut, -RepIn]
Expand Down Expand Up @@ -678,14 +678,14 @@ val server = ServerBuilder()
.build(filteredService)
</pre>

이렇게 하면 <var>port</var> 상에, <var>service</var>로 요청을 전달하는 Thrift 서버를 만들게 된다. <code>hostConnectionMaxLifeTime</code>부분의 주석을 제거하면 각 연결이 최대 5분간만 살아있도록 제한하게 된다.
이렇게 하면 <var>port</var> 상에, <var>service</var>로 요청을 전달하는 Thrift 서버를 만들게 된다. <code>hostConnectionMaxLifeTime</code>부분의 주석을 제거하면 각 연결이 최대 5분간만 살아있도록 제한하게 된다.
<code>readTimeout</code>의 주석을 제거하면, 요청이 2분 이내에 들어와야만 처리하게 된다. 필수적인 <code>ServerBuilder</code> 옵션은 <code>name</code>, <code>bindTo</code>, <code>codec</code>이다.

h2(#DontBlock). 블록하지 말자(제대로 하는게 아니라면)

피네이글은 서비스가 부드럽게 동작하도록 자동으로 쓰레드를 관리한다. 어느 한 서비스가 블록되면 모든 피네이글 쓰레드가 블록된다.

* 코드가 블록되는 연산(<code>apply</code>나 <code>get</code>)을 호출한다면, <a href="https://github.com/twitter/finagle#Using%20Future%20Pools">Future Pool</a>을 사용해 그 블록되는 코드를 감싸라. 이렇게 하면 블록킹 연산이 자체 쓰레드 풀 안에서 실행되고, Future를 통해 완료(또는 실패)시점을 알 수 있게 된다. 또한 이 Future는 다른 Future와 함성할 수 있다.
* 코드가 블록되는 연산(<code>apply</code>나 <code>get</code>)을 호출한다면, <a href="https://github.com/twitter/finagle#Using%20Future%20Pools">Future Pool</a>을 사용해 그 블록되는 코드를 감싸라. 이렇게 하면 블록킹 연산이 자체 쓰레드 풀 안에서 실행되고, Future를 통해 완료(또는 실패)시점을 알 수 있게 된다. 또한 이 Future는 다른 Future와 함성할 수 있다.

* Future의 순차 합성을 사용한다면 Future중이 블록되는게 있는지 우려할 필요가 없다.

Expand Down
11 changes: 7 additions & 4 deletions web/ru/concurrency.textile
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ h2. Java ConcurrentHashMap
Java поставляется с прекрасным потокобезопасным ConcurrentHashMap. К счастью, мы можем использовать JavaConversions, чтобы использовать семантику Scala.

В самом деле, мы можем легко создать новый код, используя потокобезопасный InvertedIndex, как продолжение старого небезопасного.

<pre>
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConversions._
Expand Down Expand Up @@ -424,7 +424,7 @@ h3. Решение: Producer/Consumer
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

// Concrete producer
class Producer[T](path: String, queue: BlockingQueue[T]) implements Runnable {
class Producer[T>:String](path: String, queue: BlockingQueue[T]) extends Runnable {
public void run() {
Source.fromFile(path, "utf-8").getLines.foreach { line =>
queue.put(line)
Expand All @@ -444,7 +444,7 @@ abstract class Consumer[T](queue: BlockingQueue[T]) implements Runnable {
def consume(x: T)
}

val queue = new LinkedBlockingQueue[String]()
val q = new LinkedBlockingQueue[String]()

// Один поток для потребителя
val producer = new Producer[String]("users.txt", q)
Expand All @@ -464,8 +464,11 @@ class IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String]) extend
val cores = 8
val pool = Executors.newFixedThreadPool(cores)

val mutable_map = mutable.Map[String, User]()
val index = new InvertedIndex(mutable_map)

// Распределим по одному потребителю на каждое ядро.
for (i <- i to cores) {
for (i <- 1 to cores) {
pool.submit(new IndexerConsumer[String](index, q))
}
</pre>
Loading