1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.partition;
2021import org.apache.giraph.worker.LocalData;
22import org.apache.hadoop.io.LongWritable;
23import org.apache.hadoop.io.Writable;
24import org.apache.log4j.Logger;
2526/**27 * Factory for long-byte mapping based partitioners.28 *29 * @param <V> vertexValue type30 * @param <E> edgeValue type31 */32 @SuppressWarnings("unchecked")
33publicclass LongMappingStorePartitionerFactory<V extends Writable,
34 E extends Writable> extends GraphPartitionerFactory<LongWritable, V, E> {
35/** Logger Instance */36privatestaticfinal Logger LOG = Logger.getLogger(
37 LongMappingStorePartitionerFactory.class);
38/** Local Data that supplies the mapping store */39protected LocalData<LongWritable, V, E, ? extends Writable> localData = null;
4041 @Override
42publicvoid initialize(LocalData<LongWritable, V, E,
43 ? extends Writable> localData) {
44this.localData = localData;
45 LOG.info("Initializing LongMappingStorePartitionerFactory with localData");
46 }
4748 @Override
49publicint getPartition(LongWritable id, int partitionCount,
50int workerCount) {
51return localData.getMappingStoreOps().getPartition(id,
52 partitionCount, workerCount);
53 }
5455 @Override
56publicint getWorker(int partition, int partitionCount, int workerCount) {
57int numRows = partitionCount / workerCount;
58 numRows = (numRows * workerCount == partitionCount) ? numRows : numRows + 1;
59return partition / numRows;
60 }
61 }