forked from AustP/Gitso
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathNATPMP.py
372 lines (312 loc) · 17 KB
/
NATPMP.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
#!/usr/bin/env python
"""NAT-PMP client library
Provides functions to interact with NAT-PMP gateways implementing version 0
of the NAT-PMP draft specification.
This version does not completely implement the draft standard.
* It does not provide functionality to listen for address change packets.
* It does not have a proper request queuing system, meaning that
multiple requests may be issued in parallel, against spec recommendations.
For more information on NAT-PMP, see the NAT-PMP draft specification:
http://files.dns-sd.org/draft-cheshire-nat-pmp.txt
Requires Python 2.3 or later.
Tested on Python 2.5, 2.6 against Apple AirPort Express.
0.2.2 - changed gateway autodetect, per github issue #1. thanks to jirib
0.2 - changed useException to use_exception, responseDataClass to response_data_class parameters in function calls for consistency
0.1 - repackaged via setuptools. Fixed major bug in gateway detection. Experimental gateway detection support for Windows 7. Python 2.6 testing.
0.0.1.2 - NT autodetection code. Thanks to roee shlomo for the gateway detection regex!
0.0.1.1 - Removed broken mutex code
0.0.1 - Initial release
"""
__version__ = "0.2"
__license__ = """Copyright (c) 2008-2010, Yiming Liu, All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* The names of the author and contributors may not be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE."""
__author__ = "Yiming Liu <http://www.yimingliu.com/>"
import struct, socket, select, time, platform
import sys, os, re
NATPMP_PORT = 5351
NATPMP_RESERVED_VAL = 0
NATPMP_PROTOCOL_UDP = 1
NATPMP_PROTOCOL_TCP = 2
NATPMP_GATEWAY_NO_VALID_GATEWAY = -10
NATPMP_GATEWAY_NO_SUPPORT = -11
NATPMP_GATEWAY_CANNOT_FIND = -12
NATPMP_RESULT_SUCCESS = 0 # Success
NATPMP_RESULT_UNSUPPORTED_VERSION = 1 # Unsupported Version
NATPMP_RESULT_NOT_AUTHORIZED = 2 # Not Authorized/Refused/NATPMP turned off
NATPMP_RESULT_NETWORK_FAILURE = 3 # Network Failure
NATPMP_RESULT_OUT_OF_RESOURCES = 4 # can not create more mappings
NATPMP_RESULT_UNSUPPORTED_OPERATION = 5 # not a supported opcode
# all remaining results are fatal errors
NATPMP_ERROR_DICT = {
NATPMP_RESULT_SUCCESS:"No error.",
NATPMP_RESULT_UNSUPPORTED_VERSION:"The protocol version specified is unsupported.",
NATPMP_RESULT_NOT_AUTHORIZED:"The operation was refused. NAT-PMP may be turned off on gateway.",
NATPMP_RESULT_NETWORK_FAILURE:"There was a network failure. The gateway may not have an IP address.",# Network Failure
NATPMP_RESULT_OUT_OF_RESOURCES:"The NAT-PMP gateway is out of resources and cannot create more mappings.", # can not create more mappings
NATPMP_RESULT_UNSUPPORTED_OPERATION:"The NAT-PMP gateway does not support this operation", # not a supported opcode
NATPMP_GATEWAY_NO_SUPPORT:'The gateway does not support NAT-PMP',
NATPMP_GATEWAY_NO_VALID_GATEWAY:'No valid gateway address was specified.',
NATPMP_GATEWAY_CANNOT_FIND:'Cannot automatically determine gateway address. Must specify manually.'
}
class NATPMPRequest(object):
"""Represents a basic NAT-PMP request. This currently consists of the
1-byte fields version and opcode.
Other requests are derived from NATPMPRequest.
"""
retry_increment = 0.250 # seconds
def __init__(self, version, opcode):
self.version = version
self.opcode = opcode
def toBytes(self):
"""Converts the request object to a byte string."""
return struct.pack('!BB', self.version, self.opcode)
class PublicAddressRequest(NATPMPRequest):
"""Represents a NAT-PMP request to the local gateway for a public address.
As per the specification, this is a generic request with the opcode = 0.
"""
def __init__(self, version=0):
NATPMPRequest.__init__(self, version, 0)
class PortMapRequest(NATPMPRequest):
"""Represents a NAT-PMP request to the local gateway for a port mapping.
As per the specification, this request extends NATPMPRequest with
the fields private_port, public_port, and lifetime. The first two
are 2-byte unsigned shorts, and the last is a 4-byte unsigned integer.
"""
def __init__(self, protocol, private_port, public_port, lifetime=3600, version=0):
NATPMPRequest.__init__(self, version, protocol)
self.private_port = private_port
self.public_port = public_port
self.lifetime = lifetime
def toBytes(self):
s= NATPMPRequest.toBytes(self) + struct.pack('!HHHI', NATPMP_RESERVED_VAL, self.private_port, self.public_port, self.lifetime)
return s
class NATPMPResponse(object):
"""Represents a generic NAT-PMP response from the local gateway. The
generic response has fields for version, opcode, result, and secs
since last epoch (last boot of the NAT gateway). As per the
specification, the opcode is offset by 128 from the opcode of
the original request.
"""
def __init__(self, version, opcode, result, sec_since_epoch):
self.version = version
self.opcode = opcode
self.result = result
self.sec_since_epoch = sec_since_epoch
def __str__(self):
return "NATPMPResponse(%d, %d, %d, $d)" % (self.version, self.opcode, self.result, self.sec_since_epoch)
class PublicAddressResponse(NATPMPResponse):
"""Represents a NAT-PMP response from the local gateway to a
public-address request. It has one additional 4-byte field
containing the IP returned.
The member variable ip contains the Python-friendly string form, while
ip_int contains the same in the original 4-byte unsigned int.
"""
def __init__(self, bytes):
version, opcode, result, sec_since_epoch, self.ip_int = struct.unpack("!BBHII", bytes)
NATPMPResponse.__init__(self, version, opcode, result, sec_since_epoch)
self.ip = socket.inet_ntoa(bytes[8:8+4])
#self.ip = socket.inet_ntoa(self.ip_bytes)
def __str__(self):
return "PublicAddressResponse: version %d, opcode %d (%d), result %d, ssec %d, ip %s" % (self.version, self.opcode, self.result, self.sec_since_epoch, self.ip)
class PortMapResponse(NATPMPResponse):
"""Represents a NAT-PMP response from the local gateway to a
public-address request. The response contains the private port,
public port, and the lifetime of the mapping in addition to typical
NAT-PMP headers. Note that the port mapping assigned is
NOT NECESSARILY the port requested (see the specification
for details).
"""
def __init__(self, bytes):
version, opcode, result, sec_since_epoch, self.private_port, self.public_port, self.lifetime = struct.unpack('!BBHIHHI', bytes)
NATPMPResponse.__init__(self, version, opcode, result, sec_since_epoch)
def __str__(self):
return "PortMapResponse: version %d, opcode %d (%d), result %d, ssec %d, private_port %d, public port %d, lifetime %d" % (self.version, self.opcode, self.opcode, self.result, self.sec_since_epoch, self.private_port, self.public_port, self.lifetime)
class NATPMPError(Exception):
"""Generic exception state. May be used to represent unknown errors."""
pass
class NATPMPResultError(NATPMPError):
"""Used when a NAT gateway responds with an error-state response."""
pass
class NATPMPNetworkError(NATPMPError):
"""Used when a network error occurred while communicating
with the NAT gateway."""
pass
class NATPMPUnsupportedError(NATPMPError):
"""Used when a NAT gateway does not support NAT-PMP."""
pass
def get_gateway_addr():
"""A hack to obtain the current gateway automatically, since
Python has no interface to sysctl().
This may or may not be the gateway we should be contacting.
It does not guarantee correct results.
This function requires the presence of
netstat on the path on POSIX and NT.
"""
addr = ""
shell_command = 'netstat -rn'
if os.name == "posix":
pattern = re.compile('(?:default|0\.0\.0\.0|::/0)\s+([\w\.:]+)\s+.*UG')
elif os.name == "nt":
if platform.version().startswith("6.1"):
pattern = re.compile(".*?0.0.0.0[ ]+0.0.0.0[ ]+(.*?)[ ]+?.*?\n")
else:
pattern = re.compile(".*?Default Gateway:[ ]+(.*?)\n")
system_out = os.popen(shell_command, 'r').read()
if not system_out:
raise NATPMPNetworkError(NATPMP_GATEWAY_CANNOT_FIND, error_str(NATPMP_GATEWAY_CANNOT_FIND))
match = pattern.search(system_out)
if not match:
raise NATPMPNetworkError(NATPMP_GATEWAY_CANNOT_FIND, error_str(NATPMP_GATEWAY_CANNOT_FIND))
addr = match.groups()[0].strip()
return addr # TODO: use real auto-detection
def error_str(result_code):
"""Takes a numerical error code and returns a human-readable
error string.
"""
result = NATPMP_ERROR_DICT.get(result_code)
if not result:
result = "Unknown fatal error."
return result
def get_gateway_socket(gateway):
"""Takes a gateway address string and returns a non-blocking UDP
socket to communicate with its NAT-PMP implementation on
NATPMP_PORT.
e.g. addr = get_gateway_socket('10.0.1.1')
"""
if not gateway:
raise NATPMPNetworkError(NATPMP_GATEWAY_NO_VALID_GATEWAY, error_str(NATPMP_GATEWAY_NO_VALID_GATEWAY))
response_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
response_socket.setblocking(0)
response_socket.connect((gateway, NATPMP_PORT))
return response_socket
def get_public_address(gateway_ip=None, retry=9):
"""A high-level function that returns the public interface IP of
the current host by querying the NAT-PMP gateway. IP is
returned as string.
Takes two possible keyword arguments:
gateway_ip - the IP to the NAT-PMP compatible gateway.
Defaults to using auto-detection function
get_gateway_addr()
retry - the number of times to retry the request if unsuccessful.
Defaults to 9 as per specification.
"""
if gateway_ip == None:
gateway_ip = get_gateway_addr()
addr = None
addr_request = PublicAddressRequest()
addr_response = send_request_with_retry(gateway_ip, addr_request, response_data_class=PublicAddressResponse, retry=retry)
if addr_response.result != 0:
#sys.stderr.write("NAT-PMP error %d: %s\n" % (addr_response.result, error_str(addr_response.result)))
#sys.stderr.flush()
raise NATPMPResultError(addr_response.result, error_str(addr_response.result), addr_response)
addr = addr_response.ip
return addr
def map_tcp_port(public_port, private_port, lifetime=3600, gateway_ip=None, retry=9, use_exception=True):
"""A high-level wrapper to map_port() that requests a mapping
for a public TCP port on the NAT to a private TCP port on this host.
Returns the complete response on success.
public_port - the public port of the mapping requested
private_port - the private port of the mapping requested
lifetime - the duration of the mapping in seconds.
Defaults to 3600, per specification.
gateway_ip - the IP to the NAT-PMP compatible gateway.
Defaults to using auto-detection function
get_gateway_addr()
retry - the number of times to retry the request if unsuccessful.
Defaults to 9 as per specification.
use_exception - throw an exception if an error result is
received from the gateway. Defaults to True.
"""
return map_port(NATPMP_PROTOCOL_TCP, public_port, private_port, lifetime, gateway_ip=gateway_ip, retry=retry, use_exception=use_exception)
def map_udp_port(public_port, private_port, lifetime=3600, gateway_ip=None, retry=9, use_exception=True):
"""A high-level wrapper to map_port() that requests a mapping for
a public UDP port on the NAT to a private UDP port on this host.
Returns the complete response on success.
public_port - the public port of the mapping requested
private_port - the private port of the mapping requested
lifetime - the duration of the mapping in seconds.
Defaults to 3600, per specification.
gateway_ip - the IP to the NAT-PMP compatible gateway.
Defaults to using auto-detection function
get_gateway_addr()
retry - the number of times to retry the request if unsuccessful.
Defaults to 9 as per specification.
use_exception - throw an exception if an error result is
received from the gateway. Defaults to True.
"""
return map_port(NATPMP_PROTOCOL_UDP, public_port, private_port, lifetime, gateway_ip=gateway_ip, retry=retry, use_exception=use_exception)
def map_port(protocol, public_port, private_port, lifetime=3600, gateway_ip=None, retry=9, use_exception=True):
"""A function to map public_port to private_port of protocol.
Returns the complete response on success.
protocol - NATPMP_PROTOCOL_UDP or NATPMP_PROTOCOL_TCP
public_port - the public port of the mapping requested
private_port - the private port of the mapping requested
lifetime - the duration of the mapping in seconds.
Defaults to 3600, per specification.
gateway_ip - the IP to the NAT-PMP compatible gateway.
Defaults to using auto-detection function
get_gateway_addr()
retry - the number of times to retry the request if unsuccessful.
Defaults to 9 as per specification.
use_exception - throw an exception if an error result
is received from the gateway. Defaults to True.
"""
if protocol not in [NATPMP_PROTOCOL_UDP, NATPMP_PROTOCOL_TCP]:
raise ValueError("Must be either NATPMP_PROTOCOL_UDP or NATPMP_PROTOCOL_TCP")
if gateway_ip == None:
gateway_ip = get_gateway_addr()
response = None
port_mapping_request = PortMapRequest(protocol, private_port, public_port, lifetime)
port_mapping_response = send_request_with_retry(gateway_ip, port_mapping_request, response_data_class=PortMapResponse, retry=retry)
if port_mapping_response.result != 0 and use_exception:
raise NATPMPResultError(port_mapping_response.result, error_str(port_mapping_response.result), port_mapping_response)
return port_mapping_response
def send_request(gateway_socket, request):
gateway_socket.sendall(request.toBytes())
def read_response(gateway_socket, timeout, responseSize=16):
data = ""
source_addr = ("", "")
rlist, wlist, xlist = select.select([gateway_socket], [], [], timeout)
if rlist:
resp_socket = rlist[0]
data,source_addr = resp_socket.recvfrom(responseSize)
return data,source_addr
def send_request_with_retry(gateway_ip, request, response_data_class=None, retry=9):
gateway_socket = get_gateway_socket(gateway_ip)
n = 1
data = ""
while n <= retry and not data:
send_request(gateway_socket, request)
data,source_addr = read_response(gateway_socket, n * request.retry_increment)
if source_addr[0] != gateway_ip or source_addr[1] != NATPMP_PORT:
data = "" # discard data if source mismatch, as per specification
n += 1
if n >= retry and not data:
raise NATPMPUnsupportedError(NATPMP_GATEWAY_NO_SUPPORT, error_str(NATPMP_GATEWAY_NO_SUPPORT))
if data and response_data_class:
data = response_data_class(data)
return data
if __name__ == "__main__":
addr = get_public_address()
map_resp = map_tcp_port(62001, 62001)
print addr
print map_resp.__dict__