1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.mapping;
20
21 import org.apache.hadoop.io.ByteWritable;
22 import org.apache.hadoop.io.LongWritable;
23
24 /**
25 * Implementation of basic methods in MappingStoreOps
26 */
27 @SuppressWarnings("unchecked, rawtypes")
28 public abstract class AbstractLongByteOps
29 implements MappingStoreOps<LongWritable, ByteWritable> {
30 /** Mapping store instance to operate on */
31 protected LongByteMappingStore mappingStore;
32
33 @Override
34 public void initialize(MappingStore<LongWritable,
35 ByteWritable> mappingStore) {
36 this.mappingStore = (LongByteMappingStore) mappingStore;
37 }
38
39 /**
40 * Compute partition given id, partitionCount, workerCount & target
41 * @param id vertex id
42 * @param partitionCount number of partitions
43 * @param workerCount number of workers
44 * @param target target worker
45 * @return partition number
46 */
47 protected int computePartition(LongWritable id, int partitionCount,
48 int workerCount, byte target) {
49 int numRows = partitionCount / workerCount;
50 numRows = (numRows * workerCount == partitionCount) ? numRows : numRows + 1;
51 if (target == -1) {
52 // default to hash based partitioning
53 return Math.abs(id.hashCode() % partitionCount);
54 } else {
55 int targetWorker = target & 0xFF;
56 // assume zero based indexing of partition & worker [also consecutive]
57 return numRows * targetWorker + Math.abs(id.hashCode() % numRows);
58 }
59 }
60 }