This project has been retired. For details, please refer to its Attic page.
SaslServerHandler xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty.handler;
20  
21  import org.apache.giraph.comm.netty.NettyServer;
22  import org.apache.giraph.comm.netty.SaslNettyServer;
23  import org.apache.giraph.comm.requests.RequestType;
24  import org.apache.giraph.comm.requests.SaslCompleteRequest;
25  import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
26  import org.apache.giraph.comm.requests.WritableRequest;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.mapred.JobConf;
29  import org.apache.hadoop.mapreduce.security.TokenCache;
30  import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
31  import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
32  import org.apache.hadoop.security.Credentials;
33  import org.apache.hadoop.security.UserGroupInformation;
34  import org.apache.hadoop.security.token.Token;
35  import org.apache.hadoop.security.token.TokenIdentifier;
36  import org.apache.hadoop.util.ReflectionUtils;
37  import org.apache.log4j.Logger;
38  import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
39  
40  import io.netty.channel.ChannelHandlerContext;
41  import io.netty.channel.ChannelInboundHandlerAdapter;
42  
43  import java.io.ByteArrayInputStream;
44  import java.io.DataInputStream;
45  import java.io.IOException;
46  import java.util.Collection;
47  
48  import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED;
49  
50  /**
51   * Generate SASL response tokens to client SASL tokens, allowing clients to
52   * authenticate themselves with this server.
53   */
54  public class SaslServerHandler extends
55      ChannelInboundHandlerAdapter {
56      /** Class logger */
57    private static final Logger LOG =
58        Logger.getLogger(SaslServerHandler.class);
59  
60    // TODO: Move out into a separate, dedicated handler: ("FirstRequestHandler")
61    // or similar.
62    /** Already closed first request? */
63    private static volatile boolean ALREADY_CLOSED_FIRST_REQUEST = false;
64  
65    /** Close connection on first request (used for simulating failure) */
66    private final boolean closeFirstRequest;
67    /** Used to store Hadoop Job Tokens to authenticate clients. */
68    private JobTokenSecretManager secretManager;
69  
70    /**
71     * Constructor
72     *
73     * @param conf Configuration
74     */
75    public SaslServerHandler(
76        Configuration conf) throws IOException {
77      SaslNettyServer.init(conf);
78      setupSecretManager(conf);
79      closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
80    }
81  
82    @Override
83    public void channelRead(ChannelHandlerContext ctx, Object msg)
84      throws Exception {
85  
86      if (LOG.isDebugEnabled()) {
87        LOG.debug("messageReceived: Got " + msg.getClass());
88      }
89  
90      WritableRequest writableRequest = (WritableRequest) msg;
91      // Simulate a closed connection on the first request (if desired)
92      // TODO: Move out into a separate, dedicated handler.
93      if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
94        LOG.info("messageReceived: Simulating closing channel on first " +
95            "request " + writableRequest.getRequestId() + " from " +
96            writableRequest.getClientId());
97        setAlreadyClosedFirstRequest();
98        ctx.close();
99        return;
100     }
101 
102     if (writableRequest.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) {
103       // initialize server-side SASL functionality, if we haven't yet
104       // (in which case we are looking at the first SASL message from the
105       // client).
106       SaslNettyServer saslNettyServer =
107           ctx.attr(NettyServer.CHANNEL_SASL_NETTY_SERVERS).get();
108       if (saslNettyServer == null) {
109         if (LOG.isDebugEnabled()) {
110           LOG.debug("No saslNettyServer for " + ctx.channel() +
111               " yet; creating now, with secret manager: " + secretManager);
112         }
113         try {
114           saslNettyServer = new SaslNettyServer(secretManager,
115             AuthMethod.SIMPLE);
116         } catch (IOException ioe) { //TODO:
117           throw new RuntimeException(ioe);
118         }
119         ctx.attr(NettyServer.CHANNEL_SASL_NETTY_SERVERS).set(saslNettyServer);
120       } else {
121         if (LOG.isDebugEnabled()) {
122           LOG.debug("Found existing saslNettyServer on server:" +
123               ctx.channel().localAddress() + " for client " +
124               ctx.channel().remoteAddress());
125         }
126       }
127 
128       ((SaslTokenMessageRequest) writableRequest).processToken(saslNettyServer);
129       // Send response to client.
130       ctx.write(writableRequest);
131       if (saslNettyServer.isComplete()) {
132         // If authentication of client is complete, we will also send a
133         // SASL-Complete message to the client.
134         if (LOG.isDebugEnabled()) {
135           LOG.debug("SASL authentication is complete for client with " +
136               "username: " + saslNettyServer.getUserName());
137         }
138         SaslCompleteRequest saslComplete = new SaslCompleteRequest();
139         ctx.write(saslComplete);
140         if (LOG.isDebugEnabled()) {
141           LOG.debug("Removing SaslServerHandler from pipeline since SASL " +
142               "authentication is complete.");
143         }
144         ctx.pipeline().remove(this);
145       }
146       ctx.flush();
147       // do not send upstream to other handlers: no further action needs to be
148       // done for SASL_TOKEN_MESSAGE_REQUEST requests.
149       return;
150     } else {
151       // Client should not be sending other-than-SASL messages before
152       // SaslServerHandler has removed itself from the pipeline. Such non-SASL
153       // requests will be denied by the Authorize channel handler (the next
154       // handler upstream in the server pipeline) if SASL authentication has
155       // not completed.
156       LOG.warn("Sending upstream an unexpected non-SASL message :  " +
157           writableRequest);
158       ctx.fireChannelRead(msg);
159     }
160   }
161 
162   /**
163    * Set already closed first request flag
164    */
165   private static void setAlreadyClosedFirstRequest() {
166     ALREADY_CLOSED_FIRST_REQUEST = true;
167   }
168 
169   /**
170    * Load Hadoop Job Token into secret manager.
171    *
172    * @param conf Configuration
173    * @throws IOException
174    */
175   private void setupSecretManager(Configuration conf) throws IOException {
176     secretManager = new JobTokenSecretManager();
177     String localJobTokenFile = System.getenv().get(
178         UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
179     if (localJobTokenFile == null) {
180       throw new IOException("Could not find job credentials: environment " +
181           "variable: " + UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION +
182           " was not defined.");
183     }
184     JobConf jobConf = new JobConf(conf);
185 
186     // Find the JobTokenIdentifiers among all the tokens available in the
187     // jobTokenFile and store them in the secretManager.
188     Credentials credentials =
189         TokenCache.loadTokens(localJobTokenFile, jobConf);
190     Collection<Token<? extends TokenIdentifier>> collection =
191         credentials.getAllTokens();
192     for (Token<? extends TokenIdentifier> token:  collection) {
193       TokenIdentifier tokenIdentifier = decodeIdentifier(token,
194           JobTokenIdentifier.class);
195       if (tokenIdentifier instanceof JobTokenIdentifier) {
196         Token<JobTokenIdentifier> theToken =
197             (Token<JobTokenIdentifier>) token;
198         JobTokenIdentifier jobTokenIdentifier =
199             (JobTokenIdentifier) tokenIdentifier;
200         secretManager.addTokenForJob(
201             jobTokenIdentifier.getJobId().toString(), theToken);
202       }
203     }
204     if (LOG.isDebugEnabled()) {
205       LOG.debug("loaded JobToken credentials: " + credentials + " from " +
206           "localJobTokenFile: " + localJobTokenFile);
207     }
208   }
209 
210   /**
211    * Get the token identifier object, or null if it could not be constructed
212    * (because the class could not be loaded, for example).
213    * Hadoop 2.0.0 (and older Hadoop2 versions? (verify)) need this.
214    * Hadoop 2.0.1 and newer have a Token.decodeIdentifier() method and do not
215    * need this. Might want to create a munge flag to distinguish 2.0.0 vs newer.
216    *
217    * @param token the token to decode into a TokenIdentifier
218    * @param cls the subclass of TokenIdentifier to decode the token into.
219    * @return the token identifier.
220    * @throws IOException
221    */
222   @SuppressWarnings("unchecked")
223   private TokenIdentifier decodeIdentifier(
224       Token<? extends TokenIdentifier> token,
225       Class<? extends TokenIdentifier> cls) throws IOException {
226     TokenIdentifier tokenIdentifier = ReflectionUtils.newInstance(cls, null);
227     ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
228     DataInputStream in = new DataInputStream(buf);
229     tokenIdentifier.readFields(in);
230     in.close();
231     return tokenIdentifier;
232   }
233 
234   /** Factory for {@link SaslServerHandler} */
235   public static class Factory {
236     /**
237      * Constructor
238      */
239     public Factory() {
240     }
241     /**
242      * Create new {@link SaslServerHandler}
243      *
244      * @param conf Configuration to use
245      * @return New {@link SaslServerHandler}
246      */
247     public SaslServerHandler newHandler(
248         Configuration conf) throws IOException {
249       return new SaslServerHandler(conf);
250     }
251   }
252 }