View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.conf;
20  
21  import org.apache.commons.lang3.StringUtils;
22  import org.apache.giraph.comm.flow_control.StaticFlowControl;
23  import org.apache.giraph.comm.netty.NettyClient;
24  import org.apache.giraph.master.BspServiceMaster;
25  import org.apache.giraph.worker.MemoryObserver;
26  import org.apache.hadoop.conf.Configuration;
27  
28  import com.google.common.base.Preconditions;
29  
30  import java.util.ArrayList;
31  import java.util.List;
32  
33  /**
34   * Default configuration used in Facebook
35   */
36  public class FacebookConfiguration implements BulkConfigurator {
37    /**
38     * How much memory per mapper should we use
39     */
40    public static final IntConfOption MAPPER_MEMORY =
41        new IntConfOption("giraph.mapperMemoryGb", 10,
42            "How many GBs of memory to give to the mappers");
43    /**
44     * How many cores per mapper should we use
45     */
46    public static final IntConfOption MAPPER_CORES =
47        new IntConfOption("giraph.mapperCores", 10,
48            "How many cores will mapper be allowed to use");
49  
50    /**
51     * Fraction of {@link #MAPPER_MEMORY} to use for new generation
52     */
53    public static final FloatConfOption NEW_GEN_MEMORY_FRACTION =
54        new FloatConfOption("giraph.newGenMemoryFraction", 0.1f,
55            "Fraction of total mapper memory to use for new generation");
56    /**
57     * Note: using G1 is often faster, but we've seen it add off heap memory
58     * overhead which can cause issues.
59     */
60    public static final BooleanConfOption USE_G1_COLLECTOR =
61        new BooleanConfOption("giraph.useG1Collector", false,
62            "Whether or not to use G1 garbage collector");
63    /**
64     * Which fraction of cores to use for threads when computation and
65     * communication overlap
66     */
67    public static final FloatConfOption CORES_FRACTION_DURING_COMMUNICATION =
68        new FloatConfOption("giraph.coresFractionDuringCommunication", 0.7f,
69            "Fraction of mapper cores to use for threads which overlap with" +
70                " network communication");
71  
72    /**
73     * Whether to configure java opts.
74     */
75    public static final BooleanConfOption CONFIGURE_JAVA_OPTS =
76        new BooleanConfOption("giraph.configureJavaOpts", true,
77            "Whether to configure java opts");
78  
79    /**
80     * Java options passed to mappers.
81     */
82    public static final StrConfOption MAPRED_JAVA_JOB_OPTIONS =
83        new StrConfOption("mapred.child.java.opts", null,
84            "Java options passed to mappers");
85  
86    /**
87     * Expand GiraphConfiguration with default Facebook settings.
88     * Assumes {@link #MAPPER_CORES} and number of workers to use
89     * are already set correctly in Configuration.
90     *
91     * For all conf options it changed it will only do so if they are not set,
92     * so it won't override any of your custom settings. The only exception is
93     * mapred.child.java.opts, this one will be overwritten depending on the
94     * {@link #CONFIGURE_JAVA_OPTS} setting
95     *
96     * @param conf Configuration
97     */
98    @Override
99    public void configure(GiraphConfiguration conf) {
100     int workers = conf.getInt(GiraphConstants.MIN_WORKERS, -1);
101     Preconditions.checkArgument(workers > 0, "Number of workers not set");
102     int cores = MAPPER_CORES.get(conf);
103 
104     // Nothing else happens while we write input splits to zk,
105     // so we can use all threads
106     conf.setIfUnset(BspServiceMaster.NUM_MASTER_ZK_INPUT_SPLIT_THREADS,
107         Integer.toString(cores));
108     // Nothing else happens while we write output, so we can use all threads
109     GiraphConstants.NUM_OUTPUT_THREADS.setIfUnset(conf, cores);
110 
111     int threadsDuringCommunication = Math.max(1,
112         (int) (cores * CORES_FRACTION_DURING_COMMUNICATION.get(conf)));
113     // Input overlaps with communication, set threads properly
114     GiraphConstants.NUM_INPUT_THREADS.setIfUnset(
115         conf, threadsDuringCommunication);
116     // Compute overlaps with communication, set threads properly
117     GiraphConstants.NUM_COMPUTE_THREADS.setIfUnset(
118         conf, threadsDuringCommunication);
119     // Netty server threads are the ones adding messages to stores,
120     // or adding vertices and edges to stores during input,
121     // these are expensive operations so set threads properly
122     GiraphConstants.NETTY_SERVER_THREADS.setIfUnset(
123         conf, threadsDuringCommunication);
124 
125     // Ensure we can utilize all communication threads by having enough
126     // channels per server, in cases when we have just a few machines
127     GiraphConstants.CHANNELS_PER_SERVER.setIfUnset(conf,
128         Math.max(1, 2 * threadsDuringCommunication / workers));
129 
130     // Limit number of open requests to 2000
131     NettyClient.LIMIT_NUMBER_OF_OPEN_REQUESTS.setIfUnset(conf, true);
132     StaticFlowControl.MAX_NUMBER_OF_OPEN_REQUESTS.setIfUnset(conf, 100);
133     // Pooled allocator in netty is faster
134     GiraphConstants.NETTY_USE_POOLED_ALLOCATOR.setIfUnset(conf, true);
135     // Turning off auto read is faster
136     GiraphConstants.NETTY_AUTO_READ.setIfUnset(conf, false);
137 
138     // Synchronize full gc calls across workers
139     MemoryObserver.USE_MEMORY_OBSERVER.setIfUnset(conf, true);
140 
141     // Increase number of partitions per compute thread
142     GiraphConstants.MIN_PARTITIONS_PER_COMPUTE_THREAD.setIfUnset(conf, 3);
143 
144     // Prefer ip addresses
145     GiraphConstants.PREFER_IP_ADDRESSES.setIfUnset(conf, true);
146 
147     // Track job progress
148     GiraphConstants.TRACK_JOB_PROGRESS_ON_CLIENT.setIfUnset(conf, true);
149     // Thread-level debugging for easier understanding
150     GiraphConstants.LOG_THREAD_LAYOUT.setIfUnset(conf, true);
151     // Enable tracking and printing of metrics
152     GiraphConstants.METRICS_ENABLE.setIfUnset(conf, true);
153 
154     if (CONFIGURE_JAVA_OPTS.get(conf)) {
155       List<String> javaOpts = getMemoryJavaOpts(conf);
156       javaOpts.addAll(getGcJavaOpts(conf));
157       MAPRED_JAVA_JOB_OPTIONS.set(conf, StringUtils.join(javaOpts, " "));
158     }
159   }
160 
161   /**
162    * Get memory java opts to use
163    *
164    * @param conf Configuration
165    * @return Java opts
166    */
167   public static List<String> getMemoryJavaOpts(Configuration conf) {
168     int memoryGb = MAPPER_MEMORY.get(conf);
169     List<String> javaOpts = new ArrayList<>();
170     // Set xmx and xms to the same value
171     javaOpts.add("-Xms" + memoryGb + "g");
172     javaOpts.add("-Xmx" + memoryGb + "g");
173     // Non-uniform memory allocator (great for multi-threading and appears to
174     // have no impact when single threaded)
175     javaOpts.add("-XX:+UseNUMA");
176     return javaOpts;
177   }
178 
179   /**
180    * Get garbage collection java opts to use
181    *
182    * @param conf Configuration
183    * @return Java opts
184    */
185   public static List<String> getGcJavaOpts(Configuration conf) {
186     List<String> gcJavaOpts = new ArrayList<>();
187     if (USE_G1_COLLECTOR.get(conf)) {
188       gcJavaOpts.add("-XX:+UseG1GC");
189       gcJavaOpts.add("-XX:MaxGCPauseMillis=500");
190     } else {
191       int newGenMemoryGb = Math.max(1,
192           (int) (MAPPER_MEMORY.get(conf) * NEW_GEN_MEMORY_FRACTION.get(conf)));
193       // Use parallel gc collector
194       gcJavaOpts.add("-XX:+UseParallelGC");
195       gcJavaOpts.add("-XX:+UseParallelOldGC");
196       // Fix new size generation
197       gcJavaOpts.add("-XX:NewSize=" + newGenMemoryGb + "g");
198       gcJavaOpts.add("-XX:MaxNewSize=" + newGenMemoryGb + "g");
199     }
200     return gcJavaOpts;
201   }
202 }