1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.writable.kryo;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Registration;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.util.DefaultClassResolver;
import com.esotericsoftware.kryo.util.ObjectMap;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static com.esotericsoftware.kryo.util.Util.getWrapperClass;

3637/**38 * In order to avoid writing class names to the stream, this class resolver39 * assigns unique integers to each class name and writes/reads those integers40 * to/from the stream. Reads assume that there is already a class assigned41 * to the given integer. This resolver only assigns unique integers for42 * classes that are not explicitly registered since those classes are already43 * assigned unique integers at the time of registration. This implementation44 * uses zookeeper to provide consistent class name to ID mapping across all45 + nodes.46 *47 *48 * If resolver encounters a class name that has not been assigned to a unique49 * integer yet, it creates a class node in zookeeper under a designated path50 * with persistent_sequential mode - allowing the file name of the class node51 * to be suffixed with an auto incremented integer. After the class node is52 * created, the resolver reads back all the nodes under the designated path53 * and uses the unique suffix as the class id. If there are duplicate entries54 * for the same class name due to some race condition, the lowest suffix is55 * used.56 */57publicclassGiraphClassResolverextends DefaultClassResolver {
58/** Base ID to start for class name assignments.59 * This number has to be high enough to not conflict with60 * explicity registered class IDs.61 * */62privatestaticfinalint BASE_CLASS_ID = 1000;
6364/** Class logger */65privatestaticfinal Logger LOG =
66 Logger.getLogger(GiraphClassResolver.class);
6768/** Class name to ID cache */69privatestatic Map<String, Integer> CLASS_NAME_TO_ID = new HashMap();
70/** ID to class name cache */71privatestatic Map<Integer, String> ID_TO_CLASS_NAME = new HashMap();
72/** Zookeeper */73privatestaticZooKeeperExt ZK;
74/** Zookeeper path for automatic class registrations */75privatestatic String KRYO_REGISTERED_CLASS_PATH;
76/** Minimum class ID assigned by zookeeper sequencing */77privatestaticint MIN_CLASS_ID = -1;
78/** True if the zookeeper class registration path is already created */79privatestaticboolean IS_CLASS_PATH_CREATED = false;
8081/** Memoized class id*/82privateint memoizedClassId = -1;
83/** Memoized class registration */84private Registration memoizedClassIdValue;
8586/**87 * Sets zookeeper informaton.88 * @param zookeeperExt ZookeeperExt89 * @param kryoClassPath Zookeeper directory path where class Name-ID90 * mapping is stored.91 */92publicstaticvoid setZookeeperInfo(ZooKeeperExt zookeeperExt,
93 String kryoClassPath) {
94 ZK = zookeeperExt;
95 KRYO_REGISTERED_CLASS_PATH = kryoClassPath;
96 }
9798/**99 * Return true of the zookeeper is initialized.100 * @return True if the zookeeper is initialized.101 */102publicstaticboolean isInitialized() {
103return ZK != null;
104 }
105106/**107 * Creates a new node for the given class name.108 * Creation mode is persistent sequential, i.e.109 * ZK will always create a new node . There could be110 * multiple entries for the same class name but since111 * the lowest index is used, this is not a problem.112 * @param className Class name113 */114publicstaticvoid createClassName(String className) {
115try {
116 String path = KRYO_REGISTERED_CLASS_PATH + "/" + className;
117 ZK.createExt(path,
118null,
119 ZooDefs.Ids.OPEN_ACL_UNSAFE,
120 CreateMode.PERSISTENT_SEQUENTIAL,
121true);
122 } catch (KeeperException e) {
123thrownew IllegalStateException(
124"Failed to create class " + className, e);
125 } catch (InterruptedException e) {
126thrownew IllegalStateException(
127"Interrupted while creating " + className, e);
128 }
129 }
130131/**132 * Refreshes class-ID mapping from zookeeper.133 * Not thread safe.134 */135publicstaticvoid refreshCache() {
136if (!IS_CLASS_PATH_CREATED) {
137try {
138 ZK.createOnceExt(KRYO_REGISTERED_CLASS_PATH,
139null,
140 ZooDefs.Ids.OPEN_ACL_UNSAFE,
141 CreateMode.PERSISTENT,
142true);
143 IS_CLASS_PATH_CREATED = true;
144 } catch (KeeperException e) {
145thrownew IllegalStateException(
146"Failed to refresh kryo cache " +
147 KRYO_REGISTERED_CLASS_PATH, e);
148 } catch (InterruptedException e) {
149thrownew IllegalStateException(
150"Interrupted while refreshing kryo cache " +
151 KRYO_REGISTERED_CLASS_PATH, e);
152 }
153 }
154155 List<String> registeredList;
156try {
157 registeredList =
158 ZK.getChildrenExt(KRYO_REGISTERED_CLASS_PATH,
159 false,
160true,
161 false);
162 } catch (KeeperException e) {
163thrownew IllegalStateException(
164"Failed to retrieve child nodes for " + KRYO_REGISTERED_CLASS_PATH, e);
165 } catch (InterruptedException e) {
166thrownew IllegalStateException(
167"Interrupted while retrieving child nodes for " +
168 KRYO_REGISTERED_CLASS_PATH, e);
169 }
170171for (String name : registeredList) {
172// Since these files are created with PERSISTENT_SEQUENTIAL mode,173// Kryo appends a sequential number to their file name.174if (LOG.isDebugEnabled()) {
175 LOG.debug("Registered class: " + name);
176 }
177 String className = name.substring(0,
178 name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH);
179int classId = Integer.parseInt(
180 name.substring(name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH));
181182if (MIN_CLASS_ID == -1) {
183 MIN_CLASS_ID = classId;
184 }
185186int adjustedId = classId - MIN_CLASS_ID + BASE_CLASS_ID;
187if (CLASS_NAME_TO_ID.putIfAbsent(className, adjustedId) == null) {
188 ID_TO_CLASS_NAME.put(adjustedId, className);
189 }
190 }
191 }
192193/**194 * Gets ID for the given class name.195 * @param className Class name196 * @return class id Class ID197 */198publicstaticint getClassId(String className) {
199if (CLASS_NAME_TO_ID.containsKey(className)) {
200return CLASS_NAME_TO_ID.get(className);
201 }
202synchronized (GiraphClassResolver.class) {
203if (CLASS_NAME_TO_ID.containsKey(className)) {
204return CLASS_NAME_TO_ID.get(className);
205 }
206 refreshCache();
207208if (!CLASS_NAME_TO_ID.containsKey(className)) {
209 createClassName(className);
210 refreshCache();
211 }
212 }
213214if (!CLASS_NAME_TO_ID.containsKey(className)) {
215thrownew IllegalStateException("Failed to assigned id to " + className);
216 }
217218return CLASS_NAME_TO_ID.get(className);
219 }
220221/**222 * Get class name for given ID.223 * @param id class ID224 * @return class name225 */226publicstatic String getClassName(int id) {
227if (ID_TO_CLASS_NAME.containsKey(id)) {
228return ID_TO_CLASS_NAME.get(id);
229 }
230synchronized (GiraphClassResolver.class) {
231if (ID_TO_CLASS_NAME.containsKey(id)) {
232return ID_TO_CLASS_NAME.get(id);
233 }
234 refreshCache();
235 }
236237if (!ID_TO_CLASS_NAME.containsKey(id)) {
238thrownew IllegalStateException("ID " + id + " doesn't exist");
239 }
240return ID_TO_CLASS_NAME.get(id);
241 }
242243 @Override
244public Registration register(Registration registration) {
245if (registration == null) {
246thrownew IllegalArgumentException("registration cannot be null");
247 }
248if (registration.getId() == NAME) {
249thrownew IllegalArgumentException("Invalid registration ID");
250 }
251252 idToRegistration.put(registration.getId(), registration);
253 classToRegistration.put(registration.getType(), registration);
254if (registration.getType().isPrimitive()) {
255 classToRegistration.put(getWrapperClass(registration.getType()),
256 registration);
257 }
258return registration;
259 }
260261 @Override
262public Registration registerImplicit(Class type) {
263return register(new Registration(type, kryo.getDefaultSerializer(type),
264 getClassId(type.getName())));
265 }
266267 @Override
268public Registration writeClass(Output output, Class type) {
269if (type == null) {
270 output.writeVarInt(Kryo.NULL, true);
271returnnull;
272 }
273274 Registration registration = kryo.getRegistration(type);
275if (registration.getId() == NAME) {
276thrownew IllegalStateException("Invalid registration ID");
277 } else {
278// Class ID's are incremented by 2 when writing, because 0 is used279// for null and 1 is used for non-explicitly registered classes.280 output.writeVarInt(registration.getId() + 2, true);
281 }
282return registration;
283 }
284285 @Override
286public Registration readClass(Input input) {
287int classID = input.readVarInt(true);
288if (classID == Kryo.NULL) {
289returnnull;
290 } elseif (classID == NAME + 2) {
291thrownew IllegalStateException("Invalid class ID");
292 }
293if (classID == memoizedClassId) {
294return memoizedClassIdValue;
295 }
296 Registration registration = idToRegistration.get(classID - 2);
297if (registration == null) {
298 String className = getClassName(classID - 2);
299 Class type = getTypeByName(className);
300if (type == null) {
301try {
302 type = Class.forName(className, false, kryo.getClassLoader());
303 } catch (ClassNotFoundException ex) {
304thrownew KryoException("Unable to find class: " + className, ex);
305 }
306if (nameToClass == null) {
307 nameToClass = new ObjectMap();
308 }
309 nameToClass.put(className, type);
310 }
311 registration = new Registration(type, kryo.getDefaultSerializer(type),
312 classID - 2);
313 register(registration);
314 }
315 memoizedClassId = classID;
316 memoizedClassIdValue = registration;
317return registration;
318 }
319320/**321 * Reset the internal state322 * Reset clears two hash tables:323 * 1 - Class name to ID: Every non-explicitly registered class takes the324 * ID agreed by all kryo instances, and it doesn't change across325 * serializations, so this reset is not required.326 * 2- Reference tracking: Not required because it is disabled.327 *328 * Therefore, this method should not be invoked.329 *330 */331publicvoid reset() {
332thrownew IllegalStateException("Not implemented");
333 }
334335/**336 * This method writes the class name for the first encountered337 * non-explicitly registered class. Since all non-explicitly registered338 * classes take the ID agreed by all kryo instances, there is no need339 * to write the class name, so this method should not be invoked.340 * @param output Output stream341 * @param type CLass type342 * @param registration Registration343 */344 @Override
345protectedvoid writeName(Output output, Class type,
346 Registration registration) {
347thrownew IllegalStateException("Not implemented");
348 }
349350/**351 * This method reads the class name for the first encountered352 * non-explicitly registered class. Since all non-explicitly registered353 * classes take the ID agreed by all kryo instances, class name is354 * never written, so this method should not be invoked.355 * @param input Input stream356 * @return Registration357 */358 @Override
359protected Registration readName(Input input) {
360thrownew IllegalStateException("Not implemented");
361 }
362363/**364 * Get type by class name.365 * @param className Class name366 * @return class type367 */368protected Class<?> getTypeByName(final String className) {
369return nameToClass != null ? nameToClass.get(className) : null;
370 }
371 }