1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.giraph.block_app.library.striping;
19
20 import org.apache.giraph.block_app.framework.block.Block;
21 import org.apache.giraph.block_app.framework.block.FilteringBlock;
22 import org.apache.giraph.block_app.framework.block.SequenceBlock;
23 import org.apache.giraph.function.Function;
24 import org.apache.giraph.function.Predicate;
25 import org.apache.giraph.function.primitive.Int2ObjFunction;
26 import org.apache.giraph.function.primitive.Obj2IntFunction;
27 import org.apache.giraph.function.vertex.SupplierFromVertex;
28 import org.apache.giraph.graph.Vertex;
29 import org.apache.hadoop.io.LongWritable;
30 import org.apache.hadoop.io.Writable;
31 import org.apache.hadoop.io.WritableComparable;
32
33 import com.google.common.base.Preconditions;
34
35 /**
36 * Utility functions for doing superstep striping.
37 *
38 * We need to make sure that partitioning (which uses mod for distributing
39 * data across workers) is independent from striping itself. So we are using
40 * fastHash function below, taken from https://code.google.com/p/fast-hash/.
41 */
42 public class StripingUtils {
43 private StripingUtils() { }
44
45 /* The MIT License
46
47 Copyright (C) 2012 Zilong Tan (eric.zltan@gmail.com)
48
49 Permission is hereby granted, free of charge, to any person
50 obtaining a copy of this software and associated documentation
51 files (the "Software"), to deal in the Software without
52 restriction, including without limitation the rights to use, copy,
53 modify, merge, publish, distribute, sublicense, and/or sell copies
54 of the Software, and to permit persons to whom the Software is
55 furnished to do so, subject to the following conditions:
56
57 The above copyright notice and this permission notice shall be
58 included in all copies or substantial portions of the Software.
59
60 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
61 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
62 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
63 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
64 BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
65 ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
66 CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
67 SOFTWARE.
68 */
69 /**
70 * Returns 32-bit hash of a given value.
71 *
72 * Fast and generally good hashing function, adapted from C++ implementation:
73 * https://code.google.com/p/fast-hash/
74 */
75 public static int fastHash(long h) {
76 h ^= h >> 23;
77 h *= 0x2127599bf4325c37L;
78 h ^= h >> 47;
79 return ((int) (h - (h >> 32))) & 0x7fffffff;
80 }
81
82 /**
83 * Returns number in [0, stripes) range, from given input {@code value}.
84 */
85 public static int fastStripe(long value, int stripes) {
86 return fastHash(value) % stripes;
87 }
88
89 /**
90 * Fast hash-based striping for LongWritable IDs, returns a function
91 * that for a given ID returns it's stripe index.
92 */
93 public static
94 Obj2IntFunction<LongWritable> fastHashStriping(final int stripes) {
95 return new Obj2IntFunction<LongWritable>() {
96 @Override
97 public int apply(LongWritable id) {
98 return fastStripe(id.get(), stripes);
99 }
100 };
101 }
102
103 /**
104 * Fast hash-based striping for LongWritable IDs, returns a function
105 * that for a given stripe index returns a predicate checking whether ID is
106 * in that stripe.
107 */
108 public static
109 Int2ObjFunction<Predicate<LongWritable>> fastHashStripingPredicate(
110 final int stripes) {
111 return new Int2ObjFunction<Predicate<LongWritable>>() {
112 @Override
113 public Predicate<LongWritable> apply(final int stripe) {
114 return new Predicate<LongWritable>() {
115 @Override
116 public boolean apply(LongWritable id) {
117 return fastStripe(id.get(), stripes) == stripe;
118 }
119 };
120 }
121 };
122 }
123
124 /**
125 * Generate striped block, with given number of {@code stripes},
126 * using given {@code blockGenerator} to generate block for each stripe.
127 *
128 * @param stripes Number of stripes
129 * @param blockGenerator Function given predicate representing whether
130 * ID is in current stripe, should return Block
131 * for current stripe
132 * @return Resulting block
133 */
134 public static Block generateStripedBlock(
135 int stripes,
136 Function<Predicate<LongWritable>, Block> blockGenerator) {
137 return generateStripedBlockImpl(
138 stripes, blockGenerator,
139 StripingUtils.fastHashStripingPredicate(stripes));
140 }
141
142 /**
143 * Generate striped block, with given number of {@code stripes},
144 * using given {@code blockGenerator} to generate block for each stripe,
145 * and using striping based on given {@code stripeSupplier}.
146 *
147 * @param stripes Number of stripes
148 * @param blockGenerator Function given predicate representing whether
149 * ID is in current stripe, should return Block
150 * for current stripe
151 * @param stripeSupplier Function given number of stripes,
152 * generates a function that given stripe index,
153 * returns predicate checking whether ID is in that
154 * stripe.
155 * @return Resulting block
156 */
157 public static <I extends WritableComparable>
158 Block generateStripedBlock(
159 int stripes,
160 Function<Predicate<I>, Block> blockGenerator,
161 Int2ObjFunction<Int2ObjFunction<Predicate<I>>> stripeSupplier) {
162 return generateStripedBlockImpl(
163 stripes, blockGenerator, stripeSupplier.apply(stripes));
164 }
165
166 /**
167 * Stripe given block, by calling vertexSend only in it's corresponding
168 * stripe. All other methods are called number of stripes times.
169 *
170 * @param stripes Number of stripes
171 * @param block Block to stripe
172 * @return Resulting block
173 */
174 public static Block stripeBlockBySenders(
175 int stripes,
176 Block block) {
177 return generateStripedBlockImpl(
178 stripes,
179 StripingUtils.<LongWritable>createSingleStripeBySendersFunction(block),
180 StripingUtils.fastHashStripingPredicate(stripes));
181 }
182
183 /**
184 * Given a block, creates a function that will given a predicate filter
185 * calls to vertexSend function based on that predicate.
186 *
187 * Useful to be combined with generateStripedBlock to stripe blocks.
188 */
189 public static <I extends WritableComparable> Function<Predicate<I>, Block>
190 createSingleStripeBySendersFunction(final Block block) {
191 return new Function<Predicate<I>, Block>() {
192 @Override
193 public Block apply(final Predicate<I> stripePredicate) {
194 return FilteringBlock.createSendFiltering(
195 new SupplierFromVertex<I, Writable, Writable, Boolean>() {
196 @Override
197 public Boolean get(Vertex<I, Writable, Writable> vertex) {
198 return stripePredicate.apply(vertex.getId());
199 }
200 }, block);
201 }
202 };
203 }
204
205 private static <I extends WritableComparable>
206 Block generateStripedBlockImpl(
207 int stripes,
208 Function<Predicate<I>, Block> blockGenerator,
209 Int2ObjFunction<Predicate<I>> stripeSupplier) {
210 Preconditions.checkArgument(stripes >= 1);
211 if (stripes == 1) {
212 return blockGenerator.apply(new Predicate<I>() {
213 @Override
214 public boolean apply(I input) {
215 return true;
216 }
217 });
218 }
219 Block[] blocks = new Block[stripes];
220 for (int i = 0; i < stripes; i++) {
221 blocks[i] = blockGenerator.apply(stripeSupplier.apply(i));
222 }
223 return new SequenceBlock(blocks);
224 }
225 }