-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpeerProcess.java
More file actions
278 lines (268 loc) · 12.4 KB
/
peerProcess.java
File metadata and controls
278 lines (268 loc) · 12.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
import java.io.*;
import java.lang.reflect.Array;
import java.util.Vector;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;
import java.net.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicReference;
public class peerProcess {
protected String peerId;
protected int numberOfPreferredNeighbours;
protected int unchokingInterval;
protected int optimisticUnchokingInterval;
protected String fileName;
protected int fileSize;
protected int pieceSize;
protected int lastPieceSize;
protected int listeningPort;
protected boolean hasFile;
public ConcurrentLinkedQueue<Message> messageQueue;
public ConcurrentHashMap<String, TCPConnectionInfo> peersToTCPConnectionsMapping;
static String peerInfoConfig = "PeerInfo.cfg";
static String commonConfig = "Common.cfg";
public static Logger logger;
private String peerHome;
protected HashMap<String,RemotePeerInfo> peerInfoMap;
protected Vector<RemotePeerInfo> peersToConnect;
protected Vector<TCPConnectionInfo> activeConnections;
protected ConcurrentHashMap<String, boolean[]> bitFieldsOfPeers;
protected boolean[] myBitField;
protected int numberOfPieces;
protected AtomicReference<String> optimizedNeighbour; // based on timer tasks
protected ConcurrentLinkedQueue<String> interestedPeers; // based on interested messages
protected ConcurrentHashMap<String, Boolean> unchokeStatus; // based on timer tasks; irrespective of optimizedNeighbour
protected ConcurrentLinkedQueue<String> preferredNeighbours; // based on timer tasks
protected ConcurrentHashMap<String, Integer> downloadRate;
protected HashMap<String, Boolean> canRequestStatus; // based on choking and unchoking messages myProcess receives
protected HashSet<Integer> requestedPieces;
protected HashSet<Integer> downloadedPieces;
FileObject myFileObject;
/**
* Constructor for the peerProcess object. Initializes the peerId.
* */
peerProcess(String peerId) {
this.peerId = peerId;
peerHome = "peer_" + peerId + "/";
initializeLogger();
initializeConfig();
getPeerInfo();
activeConnections = new Vector<>();
messageQueue = new ConcurrentLinkedQueue<>();
peersToTCPConnectionsMapping = new ConcurrentHashMap<>();
interestedPeers = new ConcurrentLinkedQueue<>();
requestedPieces = new HashSet<>();
downloadedPieces = new HashSet<>();
}
/**
* Initializes Logger object for this peer
*/
private void initializeLogger() {
try {
logger = new Logger(this.peerId);
logger.start();
} catch (Exception ex) {
}
}
/**
* Initializes the Bit Field of the node and the File object based on the hasFile value.
* */
private void initializeBitFiledAndFile(){
try {
this.myBitField = new boolean[numberOfPieces];
File peerDir = new File(peerHome);
boolean mkdirRes = peerDir.mkdir();
String filePath = peerHome + fileName;
if (this.hasFile) {
Arrays.fill(myBitField, true);
} else {
Arrays.fill(myBitField, false);
//Create an empty file to write pieces to.
File newFile = new File(filePath);
boolean createNewFileRes = newFile.createNewFile();
}
myFileObject = new FileObject(filePath, fileSize, pieceSize);
}catch(IOException ex){
}
}
/**
* Initialize the config parameters in the local datastructures. Read from the common.cfg file.
* If the node does not have the target file, create a new file to read/write the shared pieces.
* Initialize the bitfields of this node and all the peers in the P2P network.
*/
void initializeConfig() {
String st;
try {
BufferedReader in = new BufferedReader(new FileReader(commonConfig));
System.out.println("\nINITIALIZING COMMON.CFG\n");
while ((st = in.readLine()) != null) {
String[] tokens = st.split("\\s+");
switch (tokens[0]) {
case "NumberOfPreferredNeighbors": // 2
System.out.println("NumberOfPreferredNeighbors: "+tokens[1]);
this.numberOfPreferredNeighbours = Integer.parseInt(tokens[1]);
break;
case "UnchokingInterval": // 5
System.out.println("UnchokingInterval: "+tokens[1]);
this.unchokingInterval = Integer.parseInt(tokens[1]);
break;
case "OptimisticUnchokingInterval": // 15
System.out.println("OptimisticUnchokingInterval: "+tokens[1]);
this.optimisticUnchokingInterval = Integer.parseInt(tokens[1]);
break;
case "FileName": // The File.dat
System.out.println("FileName: "+tokens[1]);
this.fileName = tokens[1];
break;
case "FileSize": // 10
System.out.println("FileSize: "+tokens[1]);
this.fileSize = Integer.parseInt(tokens[1]);
break;
case "PieceSize": // 1
System.out.println("PieceSize: "+tokens[1]+"\n");
this.pieceSize = Integer.parseInt(tokens[1]);
break;
default:
throw new Exception("Invalid parameter in the file Common.cfg");
}
}
numberOfPieces = fileSize / pieceSize;
if (fileSize % pieceSize != 0) {
numberOfPieces++;
lastPieceSize = fileSize % pieceSize;
}
in.close();
} catch (Exception ex) {
}
}
/**
* Get the information about all the peers in the network.
* Select the peers for which the TCP connection needs to be initiated by this node.
* Only the previously seen nodes in the peers list are requested for TCP connections.
* All the nodes succeeding the current node in the PeerInfo.cfg file need to send connection request to this node.
* */
void getPeerInfo() {
String st;
bitFieldsOfPeers = new ConcurrentHashMap<>();
peerInfoMap = new HashMap<>();
peersToConnect = new Vector<>();
canRequestStatus = new HashMap<>();
downloadRate = new ConcurrentHashMap<>();
optimizedNeighbour = new AtomicReference<>();
preferredNeighbours = new ConcurrentLinkedQueue<>();
unchokeStatus = new ConcurrentHashMap<>();
boolean makeConnections = true;
try {
System.out.println("INITIALIZING PEERINFO.CFG\n");
BufferedReader in = new BufferedReader(new FileReader(peerInfoConfig));
while ((st = in.readLine()) != null) {
System.out.println(st);
String[] tokens = st.split("\\s+");
RemotePeerInfo newNode = new RemotePeerInfo(tokens[0], tokens[1], tokens[2], Integer.parseInt(tokens[3]));
if (newNode.peerId.equals(this.peerId)) {
this.listeningPort = Integer.parseInt(newNode.peerPort);
this.hasFile = newNode.hasFile;
initializeBitFiledAndFile();
makeConnections = false;
} else {
peerInfoMap.put(newNode.peerId, newNode);
canRequestStatus.put(newNode.peerId, false);
downloadRate.put(newNode.peerId, 0);
unchokeStatus.put(newNode.peerId, true);
bitFieldsOfPeers.put(newNode.peerId, new boolean[numberOfPieces]);
}
if (makeConnections) {
peersToConnect.addElement(newNode);
}
}
System.out.println();
in.close();
} catch (Exception ex) {
}
}
/**
* This is the starting point for a peer node. TCP connections are established here with all the nodes in the P2P network.
* It then creates a Message handler to handle all the messages received by the node.
* */
public static void main(String[] args) {
peerProcess peerNode = new peerProcess(args[0]);
MessageHandler myMessageHandler = new MessageHandler(peerNode);
// new Thread(myMessageHandler).start();
Thread messageHandlerThread = new Thread(myMessageHandler);
ArrayList<Thread> listenerThreads = new ArrayList<>();
messageHandlerThread.start();
NeighbourHandler myNeighbourHandler = new NeighbourHandler(myMessageHandler);
myNeighbourHandler.runUnchokeTasks();
try {
int remainingPeers = peerNode.peerInfoMap.size();
// Send connection requests to all the peers listed in peersToConnect
for (RemotePeerInfo node : peerNode.peersToConnect) {
Socket requestSocket = new Socket(node.peerAddress, Integer.parseInt(node.peerPort));
ObjectInputStream in = new ObjectInputStream(requestSocket.getInputStream());
ObjectOutputStream out = new ObjectOutputStream(requestSocket.getOutputStream());
TCPConnectionInfo newTCPConnection = new TCPConnectionInfo(requestSocket, in, out, peerNode.peerId);
//Create a listener thread.
Runnable newListenerThread = new ListenerThread(newTCPConnection, peerNode.messageQueue, peerNode.peersToTCPConnectionsMapping);
// logger.writeLog(LogMessage.CLIENT_CONNECT,peerNode.);
Thread newListener = new Thread(newListenerThread);
listenerThreads.add(newListener);
newListener.start();
remainingPeers--;
peerNode.activeConnections.addElement(newTCPConnection);
}
//Start a server on this node and wait for the remaining nodes to send connection requests
ServerSocket listener = new ServerSocket(peerNode.listeningPort);
while (remainingPeers > 0) {
Socket listenSocket = listener.accept();
ObjectOutputStream out = new ObjectOutputStream(listenSocket.getOutputStream());
out.flush();
ObjectInputStream in = new ObjectInputStream(listenSocket.getInputStream());
TCPConnectionInfo newTCPConnection = new TCPConnectionInfo(listenSocket, in, out, peerNode.peerId);
Runnable newThread = new ListenerThread(newTCPConnection, peerNode.messageQueue, peerNode.peersToTCPConnectionsMapping);
new Thread(newThread).start();
peerNode.activeConnections.addElement(newTCPConnection);
remainingPeers--;
}
listener.close();
} catch (Exception ex) {
}
boolean isFileSharedToAllPeers = false;
boolean addedLogFor_downloadedAllFiles = false;
while (!isFileSharedToAllPeers) {
//Wait until the peer sharing process is finished
isFileSharedToAllPeers = true;
for(boolean hasPiece: peerNode.myBitField) {
if (!hasPiece) {
isFileSharedToAllPeers = false;
break;
}
}
for (boolean[] bitField : peerNode.bitFieldsOfPeers.values()) {
for (boolean hasPiece : bitField) {
if (!hasPiece) {
isFileSharedToAllPeers = false;
break;
}
}
}
if(peerNode.downloadedPieces.size()==peerNode.numberOfPieces && !addedLogFor_downloadedAllFiles){
addedLogFor_downloadedAllFiles = true;
peerProcess.logger.writeLog(LogMessage.DOWNLOAD_COMPLETE,null);
}
}
messageHandlerThread.interrupt();
for(Thread t: listenerThreads){
t.interrupt();
}
peerNode.myFileObject.cleanUp();
try {
logger.stop();
}catch (Exception ex){
}
System.out.println("Terminating Program");
System.exit(0);
}
}