1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.comm.requests;
20
21 import org.apache.giraph.comm.ServerData;
22 import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
23 import org.apache.giraph.comm.flow_control.FlowControl;
24
25 import java.io.DataInput;
26 import java.io.DataOutput;
27 import java.io.IOException;
28
29 import static com.google.common.base.Preconditions.checkState;
30
31 /**
32 * Send to a worker a signal to resume sending messages to sender worker. This
33 * type of request is used in adaptive credit-based flow control, where a
34 * worker (W) may assign credit value of 0 to some worker (U), so that U would
35 * stop sending messages to W. Later on, W may want to notify U to continue
36 * sending messages to W. Along with the resume signal, W also announces a new
37 * credit value to U.
38 */
39 public class SendResumeRequest extends WritableRequest
40 implements WorkerRequest {
41 /** credit value */
42 private short credit;
43
44 /** Constructor used for reflection only */
45 public SendResumeRequest() { }
46
47 /**
48 * Constructor
49 *
50 * @param credit credit value
51 */
52 public SendResumeRequest(short credit) {
53 checkState(credit > 0);
54 this.credit = credit;
55 }
56
57 @Override
58 public void doRequest(ServerData serverData) {
59 FlowControl flowControl =
60 serverData.getServiceWorker().getWorkerClient().getFlowControl();
61 checkState(flowControl != null);
62 ((CreditBasedFlowControl) flowControl).processResumeSignal(getClientId(),
63 credit, getRequestId());
64 }
65
66 @Override
67 public RequestType getType() {
68 return RequestType.SEND_RESUME_REQUEST;
69 }
70
71 @Override
72 void readFieldsRequest(DataInput input) throws IOException {
73 credit = input.readShort();
74 }
75
76 @Override
77 void writeRequest(DataOutput output) throws IOException {
78 output.writeShort(credit);
79 }
80 }