@@ -25,7 +25,16 @@ import _root_.java.io.{
25
25
ObjectOutputStream
26
26
}
27
27
28
+ import com .esotericsoftware .kryo .serializers .JavaSerializer
29
+ import com .esotericsoftware .kryo .DefaultSerializer
30
+ import _root_ .java .util .concurrent .atomic .{AtomicBoolean , AtomicReference }
31
+ import com .esotericsoftware .kryo .KryoSerializable
32
+
28
33
object Externalizer {
34
+ /* Tokens used to distinguish if we used Kryo or Java */
35
+ private val KRYO = 0
36
+ private val JAVA = 1
37
+
29
38
def apply [T ](t : T ): Externalizer [T ] = {
30
39
val x = new Externalizer [T ]
31
40
x.set(t)
@@ -39,24 +48,38 @@ object Externalizer {
39
48
* work. Of course, Java serialization may fail if the contained
40
49
* item is not Java serializable
41
50
*/
42
- class Externalizer [T ] extends Externalizable {
43
- private var item : Option [T ] = None
51
+ class Externalizer [T ] extends Externalizable with KryoSerializable {
52
+ // Either points to a result or a delegate Externalizer to fufil that result.
53
+ private var item : Either [Externalizer [T ], Option [T ]] = Right (None )
54
+ import Externalizer ._
55
+
56
+ @ transient private val doesJavaWork = new AtomicReference [Option [Boolean ]](None )
57
+ @ transient private val testing = new AtomicBoolean (false )
58
+ // For backwards compatibility
59
+ private def KRYO = Externalizer .KRYO
44
60
45
- def getOption : Option [T ] = item
46
- def get : T = item.get // This should never be None when get is called
61
+ // No vals or var's below this line!
62
+
63
+ def getOption : Option [T ] = item match {
64
+ case Left (e) => e.getOption
65
+ case Right (i) => i
66
+ }
67
+
68
+ def get : T = getOption.get // This should never be None when get is called
47
69
48
70
/** Unfortunately, Java serialization requires mutable objects if
49
71
* you are going to control how the serialization is done.
50
72
* Use the companion object to creat new instances of this
51
73
*/
52
74
def set (it : T ): Unit = {
53
- assert(item.isEmpty, " Tried to call .set on an already constructed Externalizer" )
54
- item = Some (it)
75
+ item match {
76
+ case Left (e) => e.set(it)
77
+ case Right (x) =>
78
+ assert(x.isEmpty, " Tried to call .set on an already constructed Externalizer" )
79
+ item = Right (Some (it))
80
+ }
55
81
}
56
82
57
- /* Tokens used to distinguish if we used Kryo or Java */
58
- private val KRYO = 0
59
- private val JAVA = 1
60
83
61
84
/** Override this to configure Kryo creation with a named subclass,
62
85
* e.g.
@@ -70,35 +93,47 @@ class Externalizer[T] extends Externalizable {
70
93
(new ScalaKryoInstantiator ).setReferences(true )
71
94
72
95
// 1 here is 1 thread, since we will likely only serialize once
73
- private val kpool = KryoPool .withByteArrayOutputStream(1 , kryo)
96
+ // this should not be a val because we don't want to capture a reference
97
+
98
+
99
+ def javaWorks : Boolean =
100
+ doesJavaWork.get match {
101
+ case Some (v) => v
102
+ case None => probeJavaWorks
103
+ }
74
104
75
105
/** Try to round-trip and see if it works without error
76
106
*/
77
- lazy val javaWorks : Boolean = {
107
+ private def probeJavaWorks : Boolean = {
108
+ if (! testing.compareAndSet(false , true )) return true
78
109
try {
79
110
val baos = new ByteArrayOutputStream ()
80
111
val oos = new ObjectOutputStream (baos)
81
- oos.writeObject(item )
112
+ oos.writeObject(getOption )
82
113
val bytes = baos.toByteArray
83
114
val testInput = new ByteArrayInputStream (bytes)
84
115
val ois = new ObjectInputStream (testInput)
85
116
ois.readObject // this may throw
117
+ doesJavaWork.set(Some (true ))
86
118
true
87
119
}
88
120
catch {
89
121
case t : Throwable =>
90
122
Option (System .getenv.get(" CHILL_EXTERNALIZER_DEBUG" ))
91
123
.filter(_.toBoolean)
92
124
.foreach { _ => t.printStackTrace }
125
+ doesJavaWork.set(Some (false ))
93
126
false
94
127
}
128
+ finally {
129
+ testing.set(false )
130
+ }
95
131
}
96
132
97
- private def safeToBytes : Option [Array [Byte ]] = {
133
+ private def safeToBytes ( kryo : KryoInstantiator ) : Option [Array [Byte ]] = {
98
134
try {
99
- val bytes = kpool.toBytesWithClass(item)
100
- // Make sure we can read without throwing
101
- fromBytes(bytes)
135
+ val kpool = KryoPool .withByteArrayOutputStream(1 , kryo)
136
+ val bytes = kpool.toBytesWithClass(getOption)
102
137
Some (bytes)
103
138
}
104
139
catch {
@@ -109,41 +144,80 @@ class Externalizer[T] extends Externalizable {
109
144
None
110
145
}
111
146
}
112
- private def fromBytes (b : Array [Byte ]): Option [T ] =
113
- kpool.fromBytes(b).asInstanceOf [Option [T ]]
147
+ private def fromBytes (b : Array [Byte ], kryo : KryoInstantiator ): Option [T ] =
148
+ KryoPool .withByteArrayOutputStream(1 , kryo)
149
+ .fromBytes(b)
150
+ .asInstanceOf [Option [T ]]
114
151
115
- def readExternal (in : ObjectInput ) {
152
+ override def readExternal (in : ObjectInput ) = maybeReadJavaKryo(in, kryo)
153
+
154
+ private def maybeReadJavaKryo (in : ObjectInput , kryo : KryoInstantiator ) {
116
155
in.read match {
117
156
case JAVA =>
118
- item = in.readObject.asInstanceOf [Option [T ]]
157
+ item = Right ( in.readObject.asInstanceOf [Option [T ]])
119
158
case KRYO =>
120
159
val sz = in.readInt
121
160
val buf = new Array [Byte ](sz)
122
161
in.readFully(buf)
123
- item = fromBytes(buf)
162
+ item = Right ( fromBytes(buf, kryo) )
124
163
}
125
164
}
126
165
127
166
protected def writeJava (out : ObjectOutput ): Boolean =
128
167
javaWorks && {
129
168
out.write(JAVA )
130
- out.writeObject(item )
169
+ out.writeObject(getOption )
131
170
true
132
171
}
133
172
134
- protected def writeKryo (out : ObjectOutput ): Boolean =
135
- safeToBytes.map { bytes =>
173
+ protected def writeKryo (out : ObjectOutput ): Boolean = writeKryo(out, kryo)
174
+
175
+ protected def writeKryo (out : ObjectOutput , kryo : KryoInstantiator ): Boolean =
176
+ safeToBytes(kryo).map { bytes =>
136
177
out.write(KRYO )
137
178
out.writeInt(bytes.size)
138
179
out.write(bytes)
139
180
true
140
181
}.getOrElse(false )
141
182
142
- def writeExternal (out : ObjectOutput ) {
143
- writeJava(out) || writeKryo(out) || {
144
- val inner = item. get
183
+ private def maybeWriteJavaKryo (out : ObjectOutput , kryo : KryoInstantiator ) {
184
+ writeJava(out) || writeKryo(out, kryo ) || {
185
+ val inner = get
145
186
sys.error(" Neither Java nor Kyro works for class: %s instance: %s\n export CHILL_EXTERNALIZER_DEBUG=true to see both stack traces"
146
187
.format(inner.getClass, inner))
147
188
}
148
189
}
190
+
191
+ override def writeExternal (out : ObjectOutput ) = maybeWriteJavaKryo(out, kryo)
192
+
193
+ def write (kryo : Kryo , output : Output ): Unit = {
194
+ val resolver = kryo.getReferenceResolver
195
+ resolver.getWrittenId(item) match {
196
+ case - 1 =>
197
+ output.writeInt(- 1 )
198
+ resolver.addWrittenObject(item)
199
+ val oStream = new ObjectOutputStream (output)
200
+ maybeWriteJavaKryo(oStream, () => kryo)
201
+ oStream.flush
202
+ case n =>
203
+ output.writeInt(n)
204
+ }
205
+ }
206
+
207
+ def read (kryo : Kryo , input : Input ): Unit = {
208
+ doesJavaWork.set(None )
209
+ testing.set(false )
210
+ val state = input.readInt()
211
+ val resolver = kryo.getReferenceResolver
212
+ state match {
213
+ case - 1 =>
214
+ val objId = resolver.nextReadId(this .getClass)
215
+ resolver.setReadObject(objId, this )
216
+ maybeReadJavaKryo(new ObjectInputStream (input), () => kryo)
217
+ case n =>
218
+ val z = resolver.getReadObject(this .getClass, n).asInstanceOf [Externalizer [T ]]
219
+ if (! (z eq this )) item = Left (z)
220
+ }
221
+ }
222
+
149
223
}
0 commit comments