|
| 1 | +/* |
| 2 | +Copyright 2013 Twitter, Inc. |
| 3 | +
|
| 4 | +Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | +you may not use this file except in compliance with the License. |
| 6 | +You may obtain a copy of the License at |
| 7 | +
|
| 8 | +http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | +
|
| 10 | +Unless required by applicable law or agreed to in writing, software |
| 11 | +distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | +See the License for the specific language governing permissions and |
| 14 | +limitations under the License. |
| 15 | +*/ |
| 16 | + |
| 17 | +package com.twitter.chill.akka |
| 18 | + |
| 19 | +import akka.actor.{ExtendedActorSystem, ActorRef} |
| 20 | +import akka.serialization.Serializer |
| 21 | + |
| 22 | +import com.twitter.chill._ |
| 23 | +import com.twitter.chill.config.ConfiguredInstantiator |
| 24 | + |
| 25 | +/** |
| 26 | + * To use, add a key to your config like: |
| 27 | + * |
| 28 | + * {{{ |
| 29 | + * |
| 30 | + * akka.actor.serializers { |
| 31 | + * kryo = "com.twitter.chill.akka.AkkaSerializer" |
| 32 | + * } |
| 33 | + * }}} |
| 34 | + * |
| 35 | + * Then for the super-classes of all your message types, |
| 36 | + * for instance, scala.Product, write: |
| 37 | + * {{{ |
| 38 | + * akka.actor.serialization-bindings { |
| 39 | + * "scala.Product" = kryo |
| 40 | + * } |
| 41 | + * }}} |
| 42 | + * |
| 43 | + * Kryo is not thread-safe so we use an object pool to avoid over allocating. |
| 44 | + */ |
| 45 | +class AkkaSerializer(system: ExtendedActorSystem) extends Serializer { |
| 46 | + |
| 47 | + /** You can override this to easily change your serializers. |
| 48 | + * If you do so, make sure to change the config to use the name of |
| 49 | + * your new class |
| 50 | + */ |
| 51 | + def kryoInstantiator: KryoInstantiator = |
| 52 | + (new ScalaKryoInstantiator).withRegistrar(new ActorRefSerializer(system)) |
| 53 | + |
| 54 | + /** |
| 55 | + * Since each thread only needs 1 Kryo, the pool doesn't need more |
| 56 | + * space than the number of threads. We guess that there are 4 hyperthreads / |
| 57 | + * core and then multiple by the nember of cores. |
| 58 | + */ |
| 59 | + def poolSize: Int = { |
| 60 | + val GUESS_THREADS_PER_CORE = 4 |
| 61 | + GUESS_THREADS_PER_CORE * Runtime.getRuntime.availableProcessors |
| 62 | + } |
| 63 | + |
| 64 | + val kryoPool: KryoPool = |
| 65 | + KryoPool.withByteArrayOutputStream(poolSize, kryoInstantiator) |
| 66 | + |
| 67 | + def includeManifest: Boolean = false |
| 68 | + def identifier = 8675309 |
| 69 | + def toBinary(obj: AnyRef): Array[Byte] = kryoPool.toBytesWithClass(obj) |
| 70 | + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = |
| 71 | + kryoPool.fromBytes(bytes) |
| 72 | +} |
| 73 | + |
| 74 | +/** Uses the Config system of chill.config to Configure at runtime which KryoInstantiator to use |
| 75 | + * Overriding kryoInstantiator and using your own class name is probably easier for most cases. |
| 76 | + * See ConfiguredInstantiator static methods for how to build up a correct Config with |
| 77 | + * your reflected or serialized instantiators. |
| 78 | + */ |
| 79 | +class ConfiguredAkkaSerializer(system: ExtendedActorSystem) extends AkkaSerializer(system) { |
| 80 | + override def kryoInstantiator: KryoInstantiator = |
| 81 | + (new ConfiguredInstantiator(new AkkaConfig(system.settings.config))) |
| 82 | + .withRegistrar(new ActorRefSerializer(system)) |
| 83 | +} |
0 commit comments