1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.utils;
2021import org.apache.hadoop.conf.Configuration;
22import org.apache.hadoop.fs.FileStatus;
23import org.apache.hadoop.fs.FileSystem;
24import org.apache.hadoop.fs.Path;
25import org.apache.hadoop.fs.PathFilter;
26import org.apache.log4j.Logger;
2728import java.io.IOException;
29import java.security.InvalidParameterException;
3031importstatic org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY;
3233/**34 * Holds useful functions to get checkpoint paths35 * in hdfs.36 */37publicclassCheckpointingUtils {
3839/** If at the end of a checkpoint file, indicates metadata */40publicstaticfinal String CHECKPOINT_METADATA_POSTFIX = ".metadata";
41/**42 * If at the end of a checkpoint file, indicates vertices, edges,43 * messages, etc.44 */45publicstaticfinal String CHECKPOINT_VERTICES_POSTFIX = ".vertices";
46/**47 * If at the end of a checkpoint file, indicates metadata and data is valid48 * for the same filenames without .valid49 */50publicstaticfinal String CHECKPOINT_VALID_POSTFIX = ".valid";
51/**52 * If at the end of a checkpoint file,53 * indicates that we store WorkerContext and aggregator handler data.54 */55publicstaticfinal String CHECKPOINT_DATA_POSTFIX = ".data";
56/**57 * If at the end of a checkpoint file, indicates the stitched checkpoint58 * file prefixes. A checkpoint is not valid if this file does not exist.59 */60publicstaticfinal String CHECKPOINT_FINALIZED_POSTFIX = ".finalized";
6162/** Class logger */63privatestaticfinal Logger LOG = Logger.getLogger(CheckpointingUtils.class);
6465/**66 * Do not call constructor.67 */68privateCheckpointingUtils() {
69 }
7071/**72 * Path to the checkpoint's root (including job id)73 * @param conf Immutable configuration of the job74 * @param jobId job ID75 * @return checkpoint's root76 */77publicstatic String getCheckpointBasePath(Configuration conf,
78 String jobId) {
79return CHECKPOINT_DIRECTORY.getWithDefault(conf,
80 CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + jobId);
81 }
8283/**84 * Path to checkpoint&halt node in hdfs.85 * It is set to let client know that master has86 * successfully finished checkpointing and job can be restarted.87 * @param conf Immutable configuration of the job88 * @param jobId job ID89 * @return path to checkpoint&halt node in hdfs.90 */91publicstatic Path getCheckpointMarkPath(Configuration conf,
92 String jobId) {
93returnnew Path(getCheckpointBasePath(conf, jobId), "halt");
94 }
9596/**97 * Get the last saved superstep.98 *99 * @param fs file system where checkpoint is stored.100 * @param checkpointBasePath path to checkpoints folder101 * @return Last good superstep number102 * @throws java.io.IOException103 */104publicstaticlong getLastCheckpointedSuperstep(
105 FileSystem fs, String checkpointBasePath) throws IOException {
106 Path cpPath = new Path(checkpointBasePath);
107if (fs.exists(cpPath)) {
108 FileStatus[] fileStatusArray =
109 fs.listStatus(cpPath, newFinalizedCheckpointPathFilter());
110if (fileStatusArray != null) {
111long lastCheckpointedSuperstep = Long.MIN_VALUE;
112for (FileStatus file : fileStatusArray) {
113long superstep = getCheckpoint(file);
114if (superstep > lastCheckpointedSuperstep) {
115 lastCheckpointedSuperstep = superstep;
116 }
117 }
118if (LOG.isInfoEnabled()) {
119 LOG.info("getLastGoodCheckpoint: Found last good checkpoint " +
120 lastCheckpointedSuperstep);
121 }
122return lastCheckpointedSuperstep;
123 }
124 }
125return -1;
126 }
127128/**129 * Get the checkpoint from a finalized checkpoint path130 *131 * @param finalizedPath Path of the finalized checkpoint132 * @return Superstep referring to a checkpoint of the finalized path133 */134privatestaticlong getCheckpoint(FileStatus finalizedPath) {
135if (!finalizedPath.getPath().getName().
136 endsWith(CHECKPOINT_FINALIZED_POSTFIX)) {
137thrownew InvalidParameterException(
138"getCheckpoint: " + finalizedPath + "Doesn't end in " +
139 CHECKPOINT_FINALIZED_POSTFIX);
140 }
141 String checkpointString =
142 finalizedPath.getPath().getName().
143 replace(CHECKPOINT_FINALIZED_POSTFIX, "");
144return Long.parseLong(checkpointString);
145 }
146147148/**149 * Only get the finalized checkpoint files150 */151privatestaticclassFinalizedCheckpointPathFilterimplements PathFilter {
152 @Override
153publicboolean accept(Path path) {
154return path.getName().endsWith(CHECKPOINT_FINALIZED_POSTFIX);
155 }
156157 }
158 }