This project has retired. For details please refer to its Attic page.
CheckpointingUtils xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.utils;
20  
21  import org.apache.hadoop.conf.Configuration;
22  import org.apache.hadoop.fs.FileStatus;
23  import org.apache.hadoop.fs.FileSystem;
24  import org.apache.hadoop.fs.Path;
25  import org.apache.hadoop.fs.PathFilter;
26  import org.apache.log4j.Logger;
27  
28  import java.io.IOException;
29  import java.security.InvalidParameterException;
30  
31  import static org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY;
32  
33  /**
34   * Holds useful functions to get checkpoint paths
35   * in hdfs.
36   */
37  public class CheckpointingUtils {
38  
39    /** If at the end of a checkpoint file, indicates metadata */
40    public static final String CHECKPOINT_METADATA_POSTFIX = ".metadata";
41    /**
42     * If at the end of a checkpoint file, indicates vertices, edges,
43     * messages, etc.
44     */
45    public static final String CHECKPOINT_VERTICES_POSTFIX = ".vertices";
46    /**
47     * If at the end of a checkpoint file, indicates metadata and data is valid
48     * for the same filenames without .valid
49     */
50    public static final String CHECKPOINT_VALID_POSTFIX = ".valid";
51    /**
52     * If at the end of a checkpoint file,
53     * indicates that we store WorkerContext and aggregator handler data.
54     */
55    public static final String CHECKPOINT_DATA_POSTFIX = ".data";
56    /**
57     * If at the end of a checkpoint file, indicates the stitched checkpoint
58     * file prefixes.  A checkpoint is not valid if this file does not exist.
59     */
60    public static final String CHECKPOINT_FINALIZED_POSTFIX = ".finalized";
61  
62    /** Class logger */
63    private static final Logger LOG = Logger.getLogger(CheckpointingUtils.class);
64  
65    /**
66     * Do not call constructor.
67     */
68    private CheckpointingUtils() {
69    }
70  
71    /**
72     * Path to the checkpoint's root (including job id)
73     * @param conf Immutable configuration of the job
74     * @param jobId job ID
75     * @return checkpoint's root
76     */
77    public static String getCheckpointBasePath(Configuration conf,
78                                               String jobId) {
79      return CHECKPOINT_DIRECTORY.getWithDefault(conf,
80          CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + jobId);
81    }
82  
83    /**
84     * Path to checkpoint&halt node in hdfs.
85     * It is set to let client know that master has
86     * successfully finished checkpointing and job can be restarted.
87     * @param conf Immutable configuration of the job
88     * @param jobId job ID
89     * @return path to checkpoint&halt node in hdfs.
90     */
91    public static Path getCheckpointMarkPath(Configuration conf,
92                                             String jobId) {
93      return new Path(getCheckpointBasePath(conf, jobId), "halt");
94    }
95  
96    /**
97     * Get the last saved superstep.
98     *
99     * @param fs file system where checkpoint is stored.
100    * @param checkpointBasePath path to checkpoints folder
101    * @return Last good superstep number
102    * @throws java.io.IOException
103    */
104   public static long getLastCheckpointedSuperstep(
105       FileSystem fs, String checkpointBasePath) throws IOException {
106     Path cpPath = new Path(checkpointBasePath);
107     if (fs.exists(cpPath)) {
108       FileStatus[] fileStatusArray =
109           fs.listStatus(cpPath, new FinalizedCheckpointPathFilter());
110       if (fileStatusArray != null) {
111         long lastCheckpointedSuperstep = Long.MIN_VALUE;
112         for (FileStatus file : fileStatusArray) {
113           long superstep = getCheckpoint(file);
114           if (superstep > lastCheckpointedSuperstep) {
115             lastCheckpointedSuperstep = superstep;
116           }
117         }
118         if (LOG.isInfoEnabled()) {
119           LOG.info("getLastGoodCheckpoint: Found last good checkpoint " +
120               lastCheckpointedSuperstep);
121         }
122         return lastCheckpointedSuperstep;
123       }
124     }
125     return -1;
126   }
127 
128   /**
129    * Get the checkpoint from a finalized checkpoint path
130    *
131    * @param finalizedPath Path of the finalized checkpoint
132    * @return Superstep referring to a checkpoint of the finalized path
133    */
134   private static long getCheckpoint(FileStatus finalizedPath) {
135     if (!finalizedPath.getPath().getName().
136         endsWith(CHECKPOINT_FINALIZED_POSTFIX)) {
137       throw new InvalidParameterException(
138           "getCheckpoint: " + finalizedPath + "Doesn't end in " +
139               CHECKPOINT_FINALIZED_POSTFIX);
140     }
141     String checkpointString =
142         finalizedPath.getPath().getName().
143             replace(CHECKPOINT_FINALIZED_POSTFIX, "");
144     return Long.parseLong(checkpointString);
145   }
146 
147 
148   /**
149    * Only get the finalized checkpoint files
150    */
151   private static class FinalizedCheckpointPathFilter implements PathFilter {
152     @Override
153     public boolean accept(Path path) {
154       return path.getName().endsWith(CHECKPOINT_FINALIZED_POSTFIX);
155     }
156 
157   }
158 }