mirror of https://github.com/citusdata/citus.git
working in combination with patch to mitmproxy
parent
ca08ce82a5
commit
5343f2f5bf
|
@ -42,15 +42,15 @@ $ make check-failure
|
|||
$ mkfifo /tmp/mitm.fifo # first, you need a fifo
|
||||
$ cd src/test/regress
|
||||
$ pipenv shell
|
||||
$ mitmdump --rawtcp -p 9702 --mode reverse:localhost:9700 -s mitmscripts/fluent.py --set fifo=/tmp/mitm.fifo
|
||||
$ mitmdump --rawtcp -p 9703 --mode reverse:localhost:9702 -s mitmscripts/fluent.py --set fifo=/tmp/mitm.fifo
|
||||
```
|
||||
|
||||
The specific port numbers will be different depending on your setup. The above string means mitmdump will accept connections on port `9702` and forward them to the worker listening on port `9700`.
|
||||
The specific port numbers will be different depending on your setup. The above string means mitmdump will accept connections on port `9703` and forward them to the worker listening on port `9702`.
|
||||
|
||||
Now, open psql and run:
|
||||
|
||||
```psql
|
||||
# UPDATE pg_dist_node SET nodeport = 9702 WHERE nodeport = 9700;
|
||||
# UPDATE pg_dist_node SET nodeport = 9703 WHERE nodeport = 9702;
|
||||
```
|
||||
|
||||
Again, the specific port numbers depend on your setup.
|
||||
|
@ -62,6 +62,12 @@ In a psql front-end run
|
|||
# \i src/test/regress/sql/failure_test_helpers.sql
|
||||
```
|
||||
|
||||
> **_NOTE:_** To make the script above work start psql as follows
|
||||
> ```bash
|
||||
> psql -p9700 --variable=worker_2_port=9702
|
||||
> ```
|
||||
> Assuming the coordinator is running on 9700 and worker 2 (which is going to be intercepted) runs on 9702
|
||||
|
||||
The above file creates some UDFs and also disables a few citus features which make connections in the background.
|
||||
|
||||
You also want to tell the UDFs how to talk to mitmproxy (careful, this must be an absolute path):
|
||||
|
|
|
@ -14,6 +14,8 @@ import queue
|
|||
|
||||
from construct.lib import ListContainer
|
||||
from mitmproxy import ctx, tcp
|
||||
from mitmproxy.proxy import commands
|
||||
from mitmproxy.script import concurrent
|
||||
from mitmproxy.utils import strutils
|
||||
from mitmproxy.proxy.layers import TCPLayer, ClientTLSLayer, ServerTLSLayer
|
||||
|
||||
|
@ -344,10 +346,10 @@ def listen_for_commands(fifoname):
|
|||
global connection_count
|
||||
result = ''
|
||||
|
||||
if recorder.command is 'reset':
|
||||
if recorder.command == 'reset':
|
||||
result = ''
|
||||
connection_count = count()
|
||||
elif recorder.command is not 'dump':
|
||||
elif recorder.command != 'dump':
|
||||
# this should never happen
|
||||
raise Exception('Unrecognized command: {}'.format(recorder.command))
|
||||
|
||||
|
@ -355,7 +357,7 @@ def listen_for_commands(fifoname):
|
|||
messages = all_items(captured_messages)
|
||||
messages = drop_terminate_messages(messages)
|
||||
for message in messages:
|
||||
if recorder.command is 'reset':
|
||||
if recorder.command == 'reset':
|
||||
continue
|
||||
results.append(emit_message(message))
|
||||
result = '\n'.join(results)
|
||||
|
@ -431,16 +433,16 @@ def configure(updated):
|
|||
create_thread(fifoname)
|
||||
|
||||
|
||||
def next_layer(layer):
|
||||
'''
|
||||
mitmproxy wasn't really meant for intercepting raw tcp streams, it tries to wrap the
|
||||
upsteam connection (the one to the worker) in a tls stream. This hook intercepts the
|
||||
part where it creates the TlsLayer (it happens in root_context.py) and instead creates
|
||||
a RawTCPLayer. That's the layer which calls our tcp_message hook
|
||||
'''
|
||||
if isinstance(layer, ClientTLSLayer) or isinstance(layer, ServerTLSLayer):
|
||||
replacement = TCPLayer(layer.ctx)
|
||||
layer.reply.send(replacement)
|
||||
# def next_layer(layer):
|
||||
# '''
|
||||
# mitmproxy wasn't really meant for intercepting raw tcp streams, it tries to wrap the
|
||||
# upsteam connection (the one to the worker) in a tls stream. This hook intercepts the
|
||||
# part where it creates the TlsLayer (it happens in root_context.py) and instead creates
|
||||
# a RawTCPLayer. That's the layer which calls our tcp_message hook
|
||||
# '''
|
||||
# if isinstance(layer, ClientTLSLayer) or isinstance(layer, ServerTLSLayer):
|
||||
# replacement = TCPLayer(layer.ctx)
|
||||
# layer.reply.send(replacement)
|
||||
|
||||
|
||||
def tcp_message(flow: tcp.TCPFlow):
|
||||
|
@ -450,6 +452,8 @@ def tcp_message(flow: tcp.TCPFlow):
|
|||
'''
|
||||
global connection_count
|
||||
|
||||
logging.info("tcp_message: %s, %s", type(flow).__name__, str(flow.live))
|
||||
|
||||
tcp_msg = flow.messages[-1]
|
||||
|
||||
# Keep track of all the different connections, assign a unique id to each
|
||||
|
|
Loading…
Reference in New Issue