/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.conf;
2021import org.apache.commons.lang3.StringUtils;
22import org.apache.giraph.comm.flow_control.StaticFlowControl;
23import org.apache.giraph.comm.netty.NettyClient;
24import org.apache.giraph.master.BspServiceMaster;
25import org.apache.giraph.worker.MemoryObserver;
26import org.apache.hadoop.conf.Configuration;
2728import com.google.common.base.Preconditions;
2930import java.util.ArrayList;
31import java.util.List;
3233/**34 * Default configuration used in Facebook35 */36publicclassFacebookConfigurationimplementsBulkConfigurator {
37/**38 * How much memory per mapper should we use39 */40publicstaticfinalIntConfOption MAPPER_MEMORY =
41newIntConfOption("giraph.mapperMemoryGb", 10,
42"How many GBs of memory to give to the mappers");
43/**44 * How many cores per mapper should we use45 */46publicstaticfinalIntConfOption MAPPER_CORES =
47newIntConfOption("giraph.mapperCores", 10,
48"How many cores will mapper be allowed to use");
4950/**51 * Fraction of {@link #MAPPER_MEMORY} to use for new generation52 */53publicstaticfinalFloatConfOption NEW_GEN_MEMORY_FRACTION =
54newFloatConfOption("giraph.newGenMemoryFraction", 0.1f,
55"Fraction of total mapper memory to use for new generation");
56/**57 * Note: using G1 is often faster, but we've seen it add off heap memory58 * overhead which can cause issues.59 */60publicstaticfinalBooleanConfOption USE_G1_COLLECTOR =
61newBooleanConfOption("giraph.useG1Collector", false,
62"Whether or not to use G1 garbage collector");
63/**64 * Which fraction of cores to use for threads when computation and65 * communication overlap66 */67publicstaticfinalFloatConfOption CORES_FRACTION_DURING_COMMUNICATION =
68newFloatConfOption("giraph.coresFractionDuringCommunication", 0.7f,
69"Fraction of mapper cores to use for threads which overlap with" +
70" network communication");
7172/**73 * Whether to configure java opts.74 */75publicstaticfinalBooleanConfOption CONFIGURE_JAVA_OPTS =
76newBooleanConfOption("giraph.configureJavaOpts", true,
77"Whether to configure java opts");
7879/**80 * Java options passed to mappers.81 */82publicstaticfinalStrConfOption MAPRED_JAVA_JOB_OPTIONS =
83newStrConfOption("mapred.child.java.opts", null,
84"Java options passed to mappers");
8586/**87 * Expand GiraphConfiguration with default Facebook settings.88 * Assumes {@link #MAPPER_CORES} and number of workers to use89 * are already set correctly in Configuration.90 *91 * For all conf options it changed it will only do so if they are not set,92 * so it won't override any of your custom settings. The only exception is93 * mapred.child.java.opts, this one will be overwritten depending on the94 * {@link #CONFIGURE_JAVA_OPTS} setting95 *96 * @param conf Configuration97 */98 @Override
99publicvoid configure(GiraphConfiguration conf) {
100int workers = conf.getInt(GiraphConstants.MIN_WORKERS, -1);
101 Preconditions.checkArgument(workers > 0, "Number of workers not set");
102int cores = MAPPER_CORES.get(conf);
103104// Nothing else happens while we write input splits to zk,105// so we can use all threads106 conf.setIfUnset(BspServiceMaster.NUM_MASTER_ZK_INPUT_SPLIT_THREADS,
107 Integer.toString(cores));
108// Nothing else happens while we write output, so we can use all threads109 GiraphConstants.NUM_OUTPUT_THREADS.setIfUnset(conf, cores);
110111int threadsDuringCommunication = Math.max(1,
112 (int) (cores * CORES_FRACTION_DURING_COMMUNICATION.get(conf)));
113// Input overlaps with communication, set threads properly114 GiraphConstants.NUM_INPUT_THREADS.setIfUnset(
115 conf, threadsDuringCommunication);
116// Compute overlaps with communication, set threads properly117 GiraphConstants.NUM_COMPUTE_THREADS.setIfUnset(
118 conf, threadsDuringCommunication);
119// Netty server threads are the ones adding messages to stores,120// or adding vertices and edges to stores during input,121// these are expensive operations so set threads properly122 GiraphConstants.NETTY_SERVER_THREADS.setIfUnset(
123 conf, threadsDuringCommunication);
124125// Ensure we can utilize all communication threads by having enough126// channels per server, in cases when we have just a few machines127 GiraphConstants.CHANNELS_PER_SERVER.setIfUnset(conf,
128 Math.max(1, 2 * threadsDuringCommunication / workers));
129130// Limit number of open requests to 2000131 NettyClient.LIMIT_NUMBER_OF_OPEN_REQUESTS.setIfUnset(conf, true);
132 StaticFlowControl.MAX_NUMBER_OF_OPEN_REQUESTS.setIfUnset(conf, 100);
133// Pooled allocator in netty is faster134 GiraphConstants.NETTY_USE_POOLED_ALLOCATOR.setIfUnset(conf, true);
135136// Synchronize full gc calls across workers137 MemoryObserver.USE_MEMORY_OBSERVER.setIfUnset(conf, true);
138139// Increase number of partitions per compute thread140 GiraphConstants.MIN_PARTITIONS_PER_COMPUTE_THREAD.setIfUnset(conf, 3);
141142// Prefer ip addresses143 GiraphConstants.PREFER_IP_ADDRESSES.setIfUnset(conf, true);
144145// Track job progress146 GiraphConstants.TRACK_JOB_PROGRESS_ON_CLIENT.setIfUnset(conf, true);
147// Thread-level debugging for easier understanding148 GiraphConstants.LOG_THREAD_LAYOUT.setIfUnset(conf, true);
149// Enable tracking and printing of metrics150 GiraphConstants.METRICS_ENABLE.setIfUnset(conf, true);
151152if (CONFIGURE_JAVA_OPTS.get(conf)) {
153 List<String> javaOpts = getMemoryJavaOpts(conf);
154 javaOpts.addAll(getGcJavaOpts(conf));
155 MAPRED_JAVA_JOB_OPTIONS.set(conf, StringUtils.join(javaOpts, " "));
156 }
157 }
158159/**160 * Get memory java opts to use161 *162 * @param conf Configuration163 * @return Java opts164 */165publicstatic List<String> getMemoryJavaOpts(Configuration conf) {
166int memoryGb = MAPPER_MEMORY.get(conf);
167 List<String> javaOpts = new ArrayList<>();
168// Set xmx and xms to the same value169 javaOpts.add("-Xms" + memoryGb + "g");
170 javaOpts.add("-Xmx" + memoryGb + "g");
171// Non-uniform memory allocator (great for multi-threading and appears to172// have no impact when single threaded)173 javaOpts.add("-XX:+UseNUMA");
174return javaOpts;
175 }
176177/**178 * Get garbage collection java opts to use179 *180 * @param conf Configuration181 * @return Java opts182 */183publicstatic List<String> getGcJavaOpts(Configuration conf) {
184 List<String> gcJavaOpts = new ArrayList<>();
185if (USE_G1_COLLECTOR.get(conf)) {
186 gcJavaOpts.add("-XX:+UseG1GC");
187 gcJavaOpts.add("-XX:MaxGCPauseMillis=500");
188 } else {
189int newGenMemoryGb = Math.max(1,
190 (int) (MAPPER_MEMORY.get(conf) * NEW_GEN_MEMORY_FRACTION.get(conf)));
191// Use parallel gc collector192 gcJavaOpts.add("-XX:+UseParallelGC");
193 gcJavaOpts.add("-XX:+UseParallelOldGC");
194// Fix new size generation195 gcJavaOpts.add("-XX:NewSize=" + newGenMemoryGb + "g");
196 gcJavaOpts.add("-XX:MaxNewSize=" + newGenMemoryGb + "g");
197 }
198return gcJavaOpts;
199 }
200 }