network proxy-based failure testing

- Lots of detail is in src/test/regress/mitmscripts/README
- Create a new target, make check-failure, which runs the tests
- Tell Travis how to install everything and run the tests
pull/2257/head
Brian Cloutier 2018-03-20 18:51:09 -07:00 committed by Brian Cloutier
parent c6cf40e9c7
commit a54f9a6d2c
14 changed files with 1565 additions and 2 deletions

.travis.yml

@@ -1,6 +1,8 @@
sudo: required
dist: trusty
language: c
python:
- "3.5"
cache:
apt: true
directories:
@@ -27,10 +29,19 @@ before_install:
- setup_apt
- curl https://install.citusdata.com/community/deb.sh | sudo bash
- nuke_pg
- pyenv versions
- pyenv global 3.6
- sudo apt-get install python3-pip
- sudo pip3 install --upgrade pip
- python --version
- python3 --version
install:
- install_uncrustify
- install_pg
- install_custom_pg
- pip3 install --user mitmproxy==3.0.4
- pip3 install --user construct==2.9.45
- mitmproxy --version
# download and install HLL manually, as custom builds won't satisfy deps
# only install if performing non-11 build
- |

src/test/regress/Makefile

@@ -37,7 +37,7 @@ output_files := $(patsubst $(citus_abs_srcdir)/output/%.source,expected/%.out, $
# intermediate, for muscle memory backward compatibility.
check: check-full
# check-full triggers all tests that ought to be run routinely
-check-full: check-multi check-multi-mx check-multi-task-tracker-extra check-worker check-follower-cluster
+check-full: check-multi check-multi-mx check-multi-task-tracker-extra check-worker check-follower-cluster check-failure
# using pg_regress_multi_check unnecessarily starts up multiple nodes, which isn't needed
# for check-worker. But that's harmless besides a few cycles.
@@ -79,6 +79,10 @@ check-follower-cluster: all
$(pg_regress_multi_check) --load-extension=citus --follower-cluster \
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_follower_schedule $(EXTRA_TESTS)
check-failure: all
$(pg_regress_multi_check) --load-extension=citus --mitmproxy \
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/failure_schedule $(EXTRA_TESTS)
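# e.g. `make check-failure EXTRA_TESTS=failure_copy` would also run the named test after
# the schedule; the test name here is only an illustration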
clean distclean maintainer-clean:
rm -f $(output_files) $(input_files)
rm -rf tmp_check/

src/test/regress/Pipfile (new file, 20 lines)

@@ -0,0 +1,20 @@
[[source]]
name = "pypi"
url = "https://pypi.python.org/simple"
verify_ssl = true
[packages]
mitmproxy = "==3.0.4"
construct = "*"
[dev-packages]
[requires]
python_version = "3.5"

src/test/regress/Pipfile.lock (generated, new file, 328 lines)

@@ -0,0 +1,328 @@
{
"_meta": {
"hash": {
"sha256": "9ca06bec77d0376075fc797a119ae7b47bbb0a78e37b23d09392c4751f86af69"
},
"host-environment-markers": {
"implementation_name": "cpython",
"implementation_version": "3.5.2",
"os_name": "posix",
"platform_machine": "x86_64",
"platform_python_implementation": "CPython",
"platform_release": "4.4.0-127-generic",
"platform_system": "Linux",
"platform_version": "#153-Ubuntu SMP Sat May 19 10:58:46 UTC 2018",
"python_full_version": "3.5.2",
"python_version": "3.5",
"sys_platform": "linux"
},
"pipfile-spec": 6,
"requires": {
"python_version": "3.5"
},
"sources": [
{
"name": "pypi",
"url": "https://pypi.python.org/simple",
"verify_ssl": true
}
]
},
"default": {
"asn1crypto": {
"hashes": [
"sha256:2f1adbb7546ed199e3c90ef23ec95c5cf3585bac7d11fb7eb562a3fe89c64e87",
"sha256:9d5c20441baf0cb60a4ac34cc447c6c189024b6b4c6cd7877034f4965c464e49"
],
"version": "==0.24.0"
},
"blinker": {
"hashes": [
"sha256:471aee25f3992bd325afa3772f1063dbdbbca947a041b8b89466dc00d606f8b6"
],
"version": "==1.4"
},
"brotlipy": {
"hashes": [
"sha256:af65d2699cb9f13b26ec3ba09e75e80d31ff422c03675fcb36ee4dabe588fdc2",
"sha256:50ca336374131cfad20612f26cc43c637ac0bfd2be3361495e99270883b52962",
"sha256:fd1d1c64214af5d90014d82cee5d8141b13d44c92ada7a0c0ec0679c6f15a471",
"sha256:b4c98b0d2c9c7020a524ca5bbff42027db1004c6571f8bc7b747f2b843128e7a",
"sha256:8b39abc3256c978f575df5cd7893153277216474f303e26f0e43ba3d3969ef96",
"sha256:5de6f7d010b7558f72f4b061a07395c5c3fd57f0285c5af7f126a677b976a868",
"sha256:637847560d671657f993313ecc6c6c6666a936b7a925779fd044065c7bc035b9",
"sha256:96bc59ff9b5b5552843dc67999486a220e07a0522dddd3935da05dc194fa485c",
"sha256:091b299bf36dd6ef7a06570dbc98c0f80a504a56c5b797f31934d2ad01ae7d17",
"sha256:0be698678a114addcf87a4b9496c552c68a2c99bf93cf8e08f5738b392e82057",
"sha256:d2c1c724c4ac375feb2110f1af98ecdc0e5a8ea79d068efb5891f621a5b235cb",
"sha256:3a3e56ced8b15fbbd363380344f70f3b438e0fd1fcf27b7526b6172ea950e867",
"sha256:653faef61241bf8bf99d73ca7ec4baa63401ba7b2a2aa88958394869379d67c7",
"sha256:0fa6088a9a87645d43d7e21e32b4a6bf8f7c3939015a50158c10972aa7f425b7",
"sha256:79aaf217072840f3e9a3b641cccc51f7fc23037496bd71e26211856b93f4b4cb",
"sha256:a07647886e24e2fb2d68ca8bf3ada398eb56fd8eac46c733d4d95c64d17f743b",
"sha256:c6cc0036b1304dd0073eec416cb2f6b9e37ac8296afd9e481cac3b1f07f9db25",
"sha256:07194f4768eb62a4f4ea76b6d0df6ade185e24ebd85877c351daa0a069f1111a",
"sha256:7e31f7adcc5851ca06134705fcf3478210da45d35ad75ec181e1ce9ce345bb38",
"sha256:9448227b0df082e574c45c983fa5cd4bda7bfb11ea6b59def0940c1647be0c3c",
"sha256:dc6c5ee0df9732a44d08edab32f8a616b769cc5a4155a12d2d010d248eb3fb07",
"sha256:3c1d5e2cf945a46975bdb11a19257fa057b67591eb232f393d260e7246d9e571",
"sha256:2a80319ae13ea8dd60ecdc4f5ccf6da3ae64787765923256b62c598c5bba4121",
"sha256:2699945a0a992c04fc7dc7fa2f1d0575a2c8b4b769f2874a08e8eae46bef36ae",
"sha256:1ea4e578241504b58f2456a6c69952c88866c794648bdc74baee74839da61d44",
"sha256:2e5c64522364a9ebcdf47c5744a5ddeb3f934742d31e61ebfbbc095460b47162",
"sha256:09ec3e125d16749b31c74f021aba809541b3564e5359f8c265cbae442810b41a",
"sha256:786afc8c9bd67de8d31f46e408a3386331e126829114e4db034f91eacb05396d",
"sha256:36def0b859beaf21910157b4c33eb3b06d8ce459c942102f16988cca6ea164df"
],
"version": "==0.7.0"
},
"certifi": {
"hashes": [
"sha256:9fa520c1bacfb634fa7af20a76bcbd3d5fb390481724c597da32c719a7dca4b0",
"sha256:13e698f54293db9f89122b0581843a782ad0934a4fe0172d2a980ba77fc61bb7"
],
"version": "==2018.4.16"
},
"cffi": {
"hashes": [
"sha256:1b0493c091a1898f1136e3f4f991a784437fac3673780ff9de3bcf46c80b6b50",
"sha256:87f37fe5130574ff76c17cab61e7d2538a16f843bb7bca8ebbc4b12de3078596",
"sha256:1553d1e99f035ace1c0544050622b7bc963374a00c467edafac50ad7bd276aef",
"sha256:151b7eefd035c56b2b2e1eb9963c90c6302dc15fbd8c1c0a83a163ff2c7d7743",
"sha256:edabd457cd23a02965166026fd9bfd196f4324fe6032e866d0f3bd0301cd486f",
"sha256:ba5e697569f84b13640c9e193170e89c13c6244c24400fc57e88724ef610cd31",
"sha256:79f9b6f7c46ae1f8ded75f68cf8ad50e5729ed4d590c74840471fc2823457d04",
"sha256:b0f7d4a3df8f06cf49f9f121bead236e328074de6449866515cea4907bbc63d6",
"sha256:4c91af6e967c2015729d3e69c2e51d92f9898c330d6a851bf8f121236f3defd3",
"sha256:7a33145e04d44ce95bcd71e522b478d282ad0eafaf34fe1ec5bbd73e662f22b6",
"sha256:95d5251e4b5ca00061f9d9f3d6fe537247e145a8524ae9fd30a2f8fbce993b5b",
"sha256:b75110fb114fa366b29a027d0c9be3709579602ae111ff61674d28c93606acca",
"sha256:ae5e35a2c189d397b91034642cb0eab0e346f776ec2eb44a49a459e6615d6e2e",
"sha256:fdf1c1dc5bafc32bc5d08b054f94d659422b05aba244d6be4ddc1c72d9aa70fb",
"sha256:9d1d3e63a4afdc29bd76ce6aa9d58c771cd1599fbba8cf5057e7860b203710dd",
"sha256:be2a9b390f77fd7676d80bc3cdc4f8edb940d8c198ed2d8c0be1319018c778e1",
"sha256:ed01918d545a38998bfa5902c7c00e0fee90e957ce036a4000a88e3fe2264917",
"sha256:857959354ae3a6fa3da6651b966d13b0a8bed6bbc87a0de7b38a549db1d2a359",
"sha256:2ba8a45822b7aee805ab49abfe7eec16b90587f7f26df20c71dd89e45a97076f",
"sha256:a36c5c154f9d42ec176e6e620cb0dd275744aa1d804786a71ac37dc3661a5e95",
"sha256:e55e22ac0a30023426564b1059b035973ec82186ddddbac867078435801c7801",
"sha256:3eb6434197633b7748cea30bf0ba9f66727cdce45117a712b29a443943733257",
"sha256:ecbb7b01409e9b782df5ded849c178a0aa7c906cf8c5a67368047daab282b184",
"sha256:770f3782b31f50b68627e22f91cb182c48c47c02eb405fd689472aa7b7aa16dc",
"sha256:d5d8555d9bfc3f02385c1c37e9f998e2011f0db4f90e250e5bc0c0a85a813085",
"sha256:3c85641778460581c42924384f5e68076d724ceac0f267d66c757f7535069c93",
"sha256:e90f17980e6ab0f3c2f3730e56d1fe9bcba1891eeea58966e89d352492cc74f4"
],
"markers": "platform_python_implementation != 'PyPy'",
"version": "==1.11.5"
},
"click": {
"hashes": [
"sha256:29f99fc6125fbc931b758dc053b3114e55c77a6e4c6c3a2674a2dc986016381d",
"sha256:f15516df478d5a56180fbf80e68f206010e6d160fc39fa508b65e035fd75130b"
],
"version": "==6.7"
},
"construct": {
"hashes": [
"sha256:2271a0efd0798679dea825ff47e22a4c550456a5db0ba8baa82f7eae0af0118c"
],
"version": "==2.9.45"
},
"cryptography": {
"hashes": [
"sha256:abd070b5849ed64e6d349199bef955ee0ad99aefbad792f0c587f8effa681a5e",
"sha256:3f3b65d5a16e6b52fba63dc860b62ca9832f51f1a2ae5083c78b6840275f12dd",
"sha256:77d0ad229d47a6e0272d00f6bf8ac06ce14715a9fd02c9a97f5a2869aab3ccb2",
"sha256:808fe471b1a6b777f026f7dc7bd9a4959da4bfab64972f2bbe91e22527c1c037",
"sha256:6fef51ec447fe9f8351894024e94736862900d3a9aa2961528e602eb65c92bdb",
"sha256:60bda7f12ecb828358be53095fc9c6edda7de8f1ef571f96c00b2363643fa3cd",
"sha256:5cb990056b7cadcca26813311187ad751ea644712022a3976443691168781b6f",
"sha256:c332118647f084c983c6a3e1dba0f3bcb051f69d12baccac68db8d62d177eb8a",
"sha256:f57008eaff597c69cf692c3518f6d4800f0309253bb138b526a37fe9ef0c7471",
"sha256:551a3abfe0c8c6833df4192a63371aa2ff43afd8f570ed345d31f251d78e7e04",
"sha256:db6013746f73bf8edd9c3d1d3f94db635b9422f503db3fc5ef105233d4c011ab",
"sha256:d6f46e862ee36df81e6342c2177ba84e70f722d9dc9c6c394f9f1f434c4a5563",
"sha256:9b62fb4d18529c84b961efd9187fecbb48e89aa1a0f9f4161c61b7fc42a101bd",
"sha256:9e5bed45ec6b4f828866ac6a6bedf08388ffcfa68abe9e94b34bb40977aba531",
"sha256:f6c821ac253c19f2ad4c8691633ae1d1a17f120d5b01ea1d256d7b602bc59887",
"sha256:ba6a774749b6e510cffc2fb98535f717e0e5fd91c7c99a61d223293df79ab351",
"sha256:9fc295bf69130a342e7a19a39d7bbeb15c0bcaabc7382ec33ef3b2b7d18d2f63"
],
"version": "==2.2.2"
},
"h11": {
"hashes": [
"sha256:af77d5d82fa027c032650fb8afdef3cd0a3735ba01480bee908cddad9be1bdce",
"sha256:1c0fbb1cba6f809fe3e6b27f8f6d517ca171f848922708871403636143d530d9"
],
"version": "==0.7.0"
},
"h2": {
"hashes": [
"sha256:4be613e35caad5680dc48f98f3bf4e7338c7c429e6375a5137be7fbe45219981",
"sha256:b2962f883fa392a23cbfcc4ad03c335bcc661be0cf9627657b589f0df2206e64"
],
"version": "==3.0.1"
},
"hpack": {
"hashes": [
"sha256:0edd79eda27a53ba5be2dfabf3b15780928a0dff6eb0c60a3d6767720e970c89",
"sha256:8eec9c1f4bfae3408a3f30500261f7e6a65912dc138526ea054f9ad98892e9d2"
],
"version": "==3.0.0"
},
"hyperframe": {
"hashes": [
"sha256:87567c9eb1540de1e7f48805adf00e87856409342fdebd0cd20cf5d381c38b69",
"sha256:a25944539db36d6a2e47689e7915dcee562b3f8d10c6cdfa0d53c91ed692fb04"
],
"version": "==5.1.0"
},
"idna": {
"hashes": [
"sha256:8c7309c718f94b3a625cb648ace320157ad16ff131ae0af362c9f21b80ef6ec4",
"sha256:2c6a5de3089009e3da7c5dde64a141dbc8551d5b7f6cf4ed7c2568d0cc520a8f"
],
"version": "==2.6"
},
"kaitaistruct": {
"hashes": [
"sha256:d1d17c7f6839b3d28fc22b21295f787974786c2201e8788975e72e2a1d109ff5"
],
"version": "==0.8"
},
"ldap3": {
"hashes": [
"sha256:ed3d5ac156f61ff06df0ef499569a970b544011a3e836296bd731e0da92f10c0",
"sha256:44c900354823218597e71de864c81d40be81b6f5fc6bcc109a2130a89a8f4fc8",
"sha256:7912093b2501a04b7a2fb9042f2504a8664c3543498186c6ef0421cbd2eb7331",
"sha256:d257500ea9b5af0ecca8c319fc3fb9b758f9f5e4b8441032cb681dee026c646a",
"sha256:e8fe0d55a8cecb725748c831ffac2873df94c05b2d7eb867ea167c0500bbc6a8",
"sha256:c4133692ff33e0a96780e6bd40f450545251d3e1786557c61d091eaeb2ef9138"
],
"version": "==2.4.1"
},
"mitmproxy": {
"hashes": [
"sha256:ee2f71bb737e9dd32ca489cf8c12eca4b4dbdc1eeb89062366b2d261b800ad3a"
],
"version": "==3.0.4"
},
"passlib": {
"hashes": [
"sha256:43526aea08fa32c6b6dbbbe9963c4c767285b78147b7437597f992812f69d280",
"sha256:3d948f64138c25633613f303bcc471126eae67c04d5e3f6b7b8ce6242f8653e0"
],
"version": "==1.7.1"
},
"pyasn1": {
"hashes": [
"sha256:9a15cc13ff6bf5ed29ac936ca941400be050dff19630d6cd1df3fb978ef4c5ad",
"sha256:8fb265066eac1d3bb5015c6988981b009ccefd294008ff7973ed5f64335b0f2d",
"sha256:ba77f1e8d7d58abc42bfeddd217b545fdab4c1eeb50fd37c2219810ad56303bf",
"sha256:3651774ca1c9726307560792877db747ba5e8a844ea1a41feb7670b319800ab3",
"sha256:a66dcda18dbf6e4663bde70eb30af3fc4fe1acb2d14c4867a861681887a5f9a2",
"sha256:9334cb427609d2b1e195bb1e251f99636f817d7e3e1dffa150cb3365188fb992",
"sha256:d01fbba900c80b42af5c3fe1a999acf61e27bf0e452e0f1ef4619065e57622da",
"sha256:2f57960dc7a2820ea5a1782b872d974b639aa3b448ac6628d1ecc5d0fe3986f2",
"sha256:602fda674355b4701acd7741b2be5ac188056594bf1eecf690816d944e52905e",
"sha256:cdc8eb2eaafb56de66786afa6809cd9db2df1b3b595dcb25aa5b9dc61189d40a",
"sha256:f281bf11fe204f05859225ec2e9da7a7c140b65deccd8a4eb0bc75d0bd6949e0",
"sha256:fb81622d8f3509f0026b0683fe90fea27be7284d3826a5f2edf97f69151ab0fc"
],
"version": "==0.4.3"
},
"pycparser": {
"hashes": [
"sha256:99a8ca03e29851d96616ad0404b4aad7d9ee16f25c9f9708a11faf2810f7b226"
],
"version": "==2.18"
},
"pyopenssl": {
"hashes": [
"sha256:07a2de1a54de07448732a81e38a55df7da109b2f47f599f8bb35b0cbec69d4bd",
"sha256:2c10cfba46a52c0b0950118981d61e72c1e5b1aac451ca1bc77de1a679456773"
],
"version": "==17.5.0"
},
"pyparsing": {
"hashes": [
"sha256:fee43f17a9c4087e7ed1605bd6df994c6173c1e977d7ade7b651292fab2bd010",
"sha256:0832bcf47acd283788593e7a0f542407bd9550a55a8a8435214a1960e04bcb04",
"sha256:9e8143a3e15c13713506886badd96ca4b579a87fbdf49e550dbfc057d6cb218e",
"sha256:281683241b25fe9b80ec9d66017485f6deff1af5cde372469134b56ca8447a07",
"sha256:b8b3117ed9bdf45e14dcc89345ce638ec7e0e29b2b579fa1ecf32ce45ebac8a5",
"sha256:8f1e18d3fd36c6795bb7e02a39fd05c611ffc2596c1e0d995d34d67630426c18",
"sha256:e4d45427c6e20a59bf4f88c639dcc03ce30d193112047f94012102f235853a58"
],
"version": "==2.2.0"
},
"pyperclip": {
"hashes": [
"sha256:43496f0a1f363a5ecfc4cda5eba6a2a3d5056fe6c7ffb9a99fbb1c5a3c7dea05"
],
"version": "==1.6.2"
},
"ruamel.yaml": {
"hashes": [
"sha256:e4d53f6a0c21d8effc23371927e8569096d0364d7c703b2e6956c6281b6bde2c",
"sha256:4b1929101d09612e0c7a42fbe06b0f929a4a89e1d14832353c1eb073580d3ba6",
"sha256:181699cc08b157ef8a59a77e96a01b5ffa150044ed4e49fd98428ab9ac0e6ed9",
"sha256:b6bc5f434d72a672dbe48471e70771789d5d93603716c9e36963fe1dc7a35718",
"sha256:6932e1ad63c805a41665a94e5d7b70808e9e25943f72afba6d327fede2aeb43d",
"sha256:dc051cd1fe541e321f6846bddba8e2c0de8ca409d51a6d9917c7b970d8d89a3d",
"sha256:656dcd3d30774ffe252e46db96f4cf24b284d42c904b93f9cbe6b234028f7d2e",
"sha256:039bb5b50a2f3b17c969ed1d381e050bca851e3c13fe8c2a9ad18f605ca111a5",
"sha256:f5ef82b8efe378de6abb7042263d6f407b0760ad923ed477fa26007b1fa0e563",
"sha256:cea830caa479ae083f51ffdb55fe430a2763e853a7b06195f203db6d28bf5264",
"sha256:882cacb8af5f7009780da75041ef131d0ec80d9e0b81d3cf8d4b49a0a33fe6ef",
"sha256:1d46053cb7acf0cd6b375e34abfb94f2e97c39269c17eb8b0226fe8a470c4ced",
"sha256:2d1df676ac75fb5e0af7b91f7718a4b4f469a5d8ac4150edecc61f063283bbee",
"sha256:759b485e8cda260bd87b7cdd2ad936a0ec359ee6154a9d856357446792b3faf5",
"sha256:7afefe5dab4381393a2aa7ccb585ffd6080d52e7cd05f1df3788e9d0e4dfcea9",
"sha256:766ee90985c667f77bf34950b1d945624c263ecb82d859961f78effb3355c946",
"sha256:509842d96fb194f79b57483b76429f8956d8f7ade3cb49d1e5aeb5c5e9ef4918"
],
"version": "==0.15.37"
},
"six": {
"hashes": [
"sha256:832dc0e10feb1aa2c68dcc57dbb658f1c7e65b9b61af69048abc87a2db00a0eb",
"sha256:70e8a77beed4562e7f14fe23a786b54f6296e34344c23bc42f07b15018ff98e9"
],
"version": "==1.11.0"
},
"sortedcontainers": {
"hashes": [
"sha256:fa96e9920a37bde76bfdcaca919a125c1d2e581af1137e25de54ee0da7835282",
"sha256:566cf5f8dbada3aed99737a19d98f03d15d76bf2a6c27e4fb0f4a718a99be761"
],
"version": "==1.5.10"
},
"tornado": {
"hashes": [
"sha256:88ce0282cce70df9045e515f578c78f1ebc35dcabe1d70f800c3583ebda7f5f5",
"sha256:ba9fbb249ac5390bff8a1d6aa4b844fd400701069bda7d2e380dfe2217895101",
"sha256:408d129e9d13d3c55aa73f8084aa97d5f90ed84132e38d6932e63a67d5bec563",
"sha256:c050089173c2e9272244bccfb6a8615fb9e53b79420a5551acfa76094ecc3111",
"sha256:1b83d5c10550f2653380b4c77331d6f8850f287c4f67d7ce1e1c639d9222fbc7"
],
"version": "==5.0.2"
},
"urwid": {
"hashes": [
"sha256:644d3e3900867161a2fc9287a9762753d66bd194754679adb26aede559bcccbc"
],
"version": "==2.0.1"
},
"wsproto": {
"hashes": [
"sha256:d2a7f718ab3144ec956a3267d57b5c172f0668827f5803e7d670837b0125b9fa",
"sha256:02f214f6bb43cda62a511e2e8f1d5fa4703ed83d376d18d042bd2bbf2e995824"
],
"version": "==0.11.0"
}
},
"develop": {}
}

src/test/regress/expected/failure_setup.out (new file)

@@ -0,0 +1,19 @@
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
(1 row)
-- add the workers
SELECT master_add_node('localhost', :worker_1_port); -- the second worker
master_add_node
---------------------------------------------------
(1,1,localhost,57637,default,f,t,primary,default)
(1 row)
SELECT master_add_node('localhost', :worker_2_port + 2); -- the first worker, behind a mitmproxy
master_add_node
---------------------------------------------------
(2,2,localhost,57640,default,f,t,primary,default)
(1 row)

src/test/regress/expected/failure_test_helpers.out (new file)

@@ -0,0 +1,50 @@
-- By default Citus makes lots of connections in the background which fill up the log
-- By tweaking these settings you can make sure you only capture packets related to what
-- you're doing
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
ALTER SYSTEM set citus.enable_statistics_collection TO false;
SELECT pg_reload_conf();
pg_reload_conf
----------------
t
(1 row)
-- Add some helper functions for sending commands to mitmproxy
CREATE FUNCTION citus.mitmproxy(text) RETURNS TABLE(result text) AS $$
DECLARE
command ALIAS FOR $1;
BEGIN
CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP;
CREATE TEMPORARY TABLE mitmproxy_result (res text) ON COMMIT DROP;
INSERT INTO mitmproxy_command VALUES (command);
EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo'));
EXECUTE format('COPY mitmproxy_result FROM %L', current_setting('citus.mitmfifo'));
RETURN QUERY SELECT * FROM mitmproxy_result;
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION citus.clear_network_traffic() RETURNS void AS $$
BEGIN
PERFORM citus.mitmproxy('recorder.reset()');
RETURN; -- return void
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION citus.dump_network_traffic()
RETURNS TABLE(conn int, source text, message text) AS $$
BEGIN
CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP;
CREATE TEMPORARY TABLE mitmproxy_result (
conn int, source text, message text
) ON COMMIT DROP;
INSERT INTO mitmproxy_command VALUES ('recorder.dump()');
EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo'));
EXECUTE format('COPY mitmproxy_result FROM %L', current_setting('citus.mitmfifo'));
RETURN QUERY SELECT * FROM mitmproxy_result;
END;
$$ LANGUAGE plpgsql;

src/test/regress/failure_schedule (new file)

@@ -0,0 +1,5 @@
# import this file (from psql you can use \i) to use mitmproxy manually
test: failure_test_helpers
# this should only be run by pg_regress_multi; you don't need it
test: failure_setup

src/test/regress/mitmscripts/.gitignore (new file)

@@ -0,0 +1 @@
__pycache__

src/test/regress/mitmscripts/README (new file)

@@ -0,0 +1,169 @@
Automated Failure Testing
=========================
Automated Failure Testing works by inserting a network proxy (mitmproxy) between the
Citus coordinator and one of the workers (connections to the other worker are left
unchanged). The proxy is configurable and sits on a fifo waiting for commands. When it
receives a command over the fifo it reconfigures itself and sends back a response.
Regression tests which use automated failure testing communicate with mitmproxy by
running special UDFs which talk to said fifo. The tests send commands such as "fail any
connection which contains the string 'COMMIT'", then run SQL queries and assert that the
coordinator behaves reasonably when the specified failures occur.
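For example, a test might configure the proxy straight from psql (this command string,
taken from the examples below, kills any connection on which a packet matching the
regex "^COMMIT" is seen):

# SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()');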
Contents of this file:
I. Getting Started
II. Running mitmproxy manually
III. citus.mitmproxy() command strings
IV. Recording Network Traffic
# I. Getting Started
First off, you'll need mitmproxy; I recommend version 3.0.4, running under Python 3.6.
This script integrates pretty deeply with mitmproxy, so other versions might fail to
work.
I highly recommend using pipenv to install mitmproxy. It lets you easily manage isolated
environments (instead of installing python packages globally). If you've heard of
virtualenv, pipenv is that but much easier to use.
Once you've installed it:
$ cd src/test/regress
$ pipenv --python 3.6
$ pipenv install # there's already a Pipfile.lock in src/test/regress with packages
$ pipenv shell # this enters the virtual environment, putting mitmproxy onto $PATH
That's all you need to do to run the failure tests:
$ make check-failure
# II. Running mitmproxy manually
$ mkfifo /tmp/mitm.fifo # first, you need a fifo
$ cd src/test/regress
$ pipenv shell
$ mitmdump --rawtcp -p 9702 --mode reverse:localhost:9700 -s mitmscripts/fluent.py --set fifo=/tmp/mitm.fifo
The specific port numbers will be different depending on your setup. The above command
means mitmdump will accept connections on port 9702 and forward them to the worker
listening on port 9700.
Now, open psql and run:
# UPDATE pg_dist_node SET nodeport = 9702 WHERE nodeport = 9700;
Again, the specific port numbers depend on your setup.
# \i src/test/regress/sql/failure_test_helpers.sql
The above file creates some UDFs and also disables a few citus features which make
connections in the background.
You also want to tell the UDFs how to talk to mitmproxy (careful, this must be an absolute
path):
# SET citus.mitmfifo = '/tmp/mitm.fifo';
(nb: this GUC does not appear in shared_library_init.c; Postgres allows setting and
reading GUCs which have not been defined by any extension)
You're all ready! If it worked, you should be able to run this:
# SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
(1 row)
# III. citus.mitmproxy() command strings
Command strings specify a pipeline. Each connection is handled individually, and the
pipeline is called once for every packet which is sent. For example, given this string:
`conn.onQuery().after(2).kill()` -> kill a connection if three Query packets are seen
- onQuery() is a filter. It only passes Query packets (packets which the frontend sends
to the backend which specify a query which is to be run) onto the next step of the
pipeline.
- after(2) is another filter; it ignores the first two packets which are sent to it, then
sends the following packets to the next step of the pipeline.
- kill() is an action; when a packet reaches it, the connection containing that packet
is killed.
## Actions
There are 5 actions you can take on connections:
conn.allow() - the default, allows all connections to execute unmodified
conn.kill() - kills all connections immediately after the first packet is sent
conn.reset() - kill() calls shutdown(SHUT_WR), shutdown(SHUT_RD), close(), which is a
fairly graceful way to close the socket. reset() instead causes a RST packet
to be sent and forces the connection closed in a way that looks more like an error.
conn.cancel(pid) - This doesn't cause any changes at the network level. Instead it sends
a SIGINT to pid and introduces a short delay, in the hope that the
signal will be received before the delay ends. You can use it to write
cancellation tests (see the sketch after this list).
The previous actions all work on a per-connection basis, meaning each connection is
tracked individually. A command such as `conn.onQuery().kill()` will only kill the
connection on which the Query packet was seen. A command such as
`conn.onQuery().after(2).kill()` will never trigger if each Query is sent on a different
connection, even if you send dozens of Query packets.
The final action works a bit differently:
conn.killall() - the killall() command kills this and all subsequent connections. Any
packets sent once it triggers will have their connections killed.
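As a sketch, a cancellation test built on conn.cancel(pid) might run the following
(pg_backend_pid() here is only a stand-in for whichever backend you want the signal
delivered to):

# SELECT citus.mitmproxy(format('conn.onQuery().cancel(%s)', pg_backend_pid()));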
## Filters
conn.onQuery().kill() - kill a connection once a "Query" packet is seen
conn.onCopyData().kill() - kill a connection once a "CopyData" packet is seen
The list of supported packets can be found in ./structs.py, and the list of packets which
could be supported can be found at:
https://www.postgresql.org/docs/current/static/protocol-message-formats.html
You can also inspect the contents of packets:
conn.onQuery(query="COMMIT").kill() - you can look into the actual query which is sent and
match on its contents (this is always a regex)
conn.onQuery(query="^COMMIT").kill() - the query must start with COMMIT
conn.onQuery(query="pg_table_size\(") - you must escape parens, since you're in a regex
after(n) matches after the n-th packet has been sent:
conn.after(2).kill() - Kill connections when the third packet is sent down them
There's also a low-level filter which runs a regex against the raw content of the packet:
conn.matches(b"^Q").kill() - this is another way of writing conn.onQuery(). Note the 'b',
it's always required.
## Chaining:
Filters and actions can be arbitrarily chained:
conn.matches(b"^Q").after(2).kill() - kill any connection when the third Query is sent
# IV. Recording Network Traffic
There are also some special commands. The proxy records every packet and lets you
inspect them:
recorder.dump() - emits a list of captured packets in COPY text format
recorder.reset() - empties the data structure containing the captured packets
Both of those calls empty the structure containing the packets; a call to dump() only
returns the packets which were captured since the last call to dump() or reset().
Back when you called `\i sql/failure_test_helpers.sql` you created some UDFs which make
using these strings easier. Here are some commands you can run from psql, or from inside
failure tests:
citus.clear_network_traffic() - this empties the buffer containing captured packets
citus.dump_network_traffic() - this returns a little table and pretty-prints information
on all the packets captured since the last call to
clear_network_traffic() or dump_network_traffic()
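For illustration, the output of a dump looks roughly like this (the connection ids and
the exact packets shown are hypothetical and depend on what you ran):

# SELECT * FROM citus.dump_network_traffic();
 conn |   source    |       message
------+-------------+---------------------
    0 | coordinator | [initial message]
    0 | worker      | AuthenticationOk()
    0 | coordinator | Query(query=BEGIN)
(3 rows)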

src/test/regress/mitmscripts/fluent.py (new file)

@@ -0,0 +1,425 @@
import re
import os
import pprint
import signal
import socket
import struct
import threading
import time
import traceback
import queue
from mitmproxy import ctx
from mitmproxy.utils import strutils
from mitmproxy.proxy.protocol import TlsLayer, RawTCPLayer
import structs
# I. Command Strings
class Stop(Exception):
pass
class Handler:
'''
This class hierarchy serves two purposes:
1. Allow command strings to be evaluated. Once evaluated you'll have a Handler you can
pass packets to
2. Process packets as they come in and decide what to do with them.
Subclasses which want to change how packets are handled should override _handle.
'''
def __init__(self, root=None):
# all packets are first sent to the root handler to be processed
self.root = root if root else self
# all handlers keep track of the next handler so they know where to send packets
self.next = None
def _accept(self, flow, message):
result = self._handle(flow, message)
if result == 'pass':
# defer to our child
if not self.next:
raise Exception("we don't know what to do!")
try:
self.next._accept(flow, message)
except Stop:
if self.root is not self:
raise
self.next = KillHandler(self)
flow.kill()
elif result == 'done':
# stop processing this packet, move on to the next one
return
elif result == 'stop':
# from now on kill all connections
raise Stop()
def _handle(self, flow, message):
'''
Handlers can return one of three things:
- "done" tells the parent to stop processing. This performs the default action,
which is to allow the packet to be sent.
- "pass" means to delegate to self.next and do whatever it wants
- "stop" means all processing will stop, and all connections will be killed
'''
# subclasses must implement this
raise NotImplementedError()
class FilterableMixin:
def contains(self, pattern):
self.next = Contains(self.root, pattern)
return self.next
def matches(self, pattern):
self.next = Matches(self.root, pattern)
return self.next
def after(self, times):
self.next = After(self.root, times)
return self.next
def __getattr__(self, attr):
'''
Methods such as .onQuery trigger when a packet with that name is intercepted
Adds support for commands such as:
conn.onQuery(query="COPY")
Returns a function because the above command is resolved in two steps:
conn.onQuery becomes conn.__getattr__("onQuery")
conn.onQuery(query="COPY") becomes conn.__getattr__("onQuery")(query="COPY")
'''
if attr.startswith('on'):
def doit(**kwargs):
self.next = OnPacket(self.root, attr[2:], kwargs)
return self.next
return doit
raise AttributeError
class ActionsMixin:
def kill(self):
self.next = KillHandler(self.root)
return self.next
def allow(self):
self.next = AcceptHandler(self.root)
return self.next
def killall(self):
self.next = KillAllHandler(self.root)
return self.next
def reset(self):
self.next = ResetHandler(self.root)
return self.next
def cancel(self, pid):
self.next = CancelHandler(self.root, pid)
return self.next
class AcceptHandler(Handler):
def __init__(self, root):
super().__init__(root)
def _handle(self, flow, message):
return 'done'
class KillHandler(Handler):
def __init__(self, root):
super().__init__(root)
def _handle(self, flow, message):
flow.kill()
return 'done'
class KillAllHandler(Handler):
def __init__(self, root):
super().__init__(root)
def _handle(self, flow, message):
return 'stop'
class ResetHandler(Handler):
# try to force a RST to be sent, something went very wrong!
def __init__(self, root):
super().__init__(root)
def _handle(self, flow, message):
flow.kill() # tell mitmproxy this connection should be closed
# this is a mitmproxy.connections.ClientConnection(mitmproxy.tcp.BaseHandler)
client_conn = flow.client_conn
# this is a regular socket object
conn = client_conn.connection
# cause linux to send a RST
LINGER_ON, LINGER_TIMEOUT = 1, 0
conn.setsockopt(
socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', LINGER_ON, LINGER_TIMEOUT)
)
conn.close()
# closing the connection isn't ideal, this thread later crashes when mitmproxy
# tries to call conn.shutdown(), but there's nothing else to clean up so that's
# maybe okay
return 'done'
class CancelHandler(Handler):
'Send a SIGINT to the process'
def __init__(self, root, pid):
super().__init__(root)
self.pid = pid
def _handle(self, flow, message):
os.kill(self.pid, signal.SIGINT)
# give the signal a chance to be received before we let the packet through
time.sleep(0.1)
return 'done'
class Contains(Handler, ActionsMixin, FilterableMixin):
def __init__(self, root, pattern):
super().__init__(root)
self.pattern = pattern
def _handle(self, flow, message):
if self.pattern in message.content:
return 'pass'
return 'done'
class Matches(Handler, ActionsMixin, FilterableMixin):
def __init__(self, root, pattern):
super().__init__(root)
self.pattern = re.compile(pattern)
def _handle(self, flow, message):
if self.pattern.search(message.content):
return 'pass'
return 'done'
class After(Handler, ActionsMixin, FilterableMixin):
"Don't pass execution to our child until we've handled 'times' messages"
def __init__(self, root, times):
super().__init__(root)
self.target = times
def _handle(self, flow, message):
if not hasattr(flow, '_after_count'):
flow._after_count = 0
if flow._after_count >= self.target:
return 'pass'
flow._after_count += 1
return 'done'
class OnPacket(Handler, ActionsMixin, FilterableMixin):
'''Triggers when a packet of the specified kind comes around'''
def __init__(self, root, packet_kind, kwargs):
super().__init__(root)
self.packet_kind = packet_kind
self.filters = kwargs
def _handle(self, flow, message):
if not message.parsed:
# if this is the first message in the connection we just skip it
return 'done'
for msg in message.parsed:
typ = structs.message_type(msg, from_frontend=message.from_client)
if typ == self.packet_kind:
matches = structs.message_matches(msg, self.filters, message.from_client)
if matches:
return 'pass'
return 'done'
class RootHandler(Handler, ActionsMixin, FilterableMixin):
def _handle(self, flow, message):
# do whatever the next Handler tells us to do
return 'pass'
class RecorderCommand:
def __init__(self):
self.root = self
self.command = None
def dump(self):
# When the user calls dump() we return everything we've captured
self.command = 'dump'
return self
def reset(self):
# If the user calls reset() we dump all captured packets without returning them
self.command = 'reset'
return self
# II. Utilities for interfacing with mitmproxy
def build_handler(spec):
'Turns a command string into a RootHandler ready to accept packets'
root = RootHandler()
recorder = RecorderCommand()
handler = eval(spec, {'__builtins__': {}}, {'conn': root, 'recorder': recorder})
return handler.root
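# For example (a sketch): build_handler('conn.onQuery().after(2).kill()') evaluates the
# string against the objects above and returns a RootHandler whose chain is
# RootHandler -> OnPacket -> After -> KillHandler; tcp_message() below then feeds every
# intercepted packet into that chain via handler._accept(flow, tcp_msg)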
# a bunch of globals
handler = None # the current handler used to process packets
command_thread = None # sits on the fifo and waits for new commands to come in
command_queue = queue.Queue() # we poll this from the main thread and apply commands
response_queue = queue.Queue() # the main thread uses this to reply to command_thread
captured_messages = queue.Queue() # where we store messages used for recorder.dump()
connection_count = 0 # so we can give connections ids in recorder.dump()
def listen_for_commands(fifoname):
def emit_row(conn, from_client, message):
# we're using the COPY text format. It requires us to escape backslashes
cleaned = message.replace('\\', '\\\\')
source = 'coordinator' if from_client else 'worker'
return '{}\t{}\t{}'.format(conn, source, cleaned)
def emit_message(message):
if message.is_initial:
return emit_row(
message.connection_id, message.from_client, '[initial message]'
)
pretty = structs.print(message.parsed)
return emit_row(message.connection_id, message.from_client, pretty)
def handle_recorder(recorder):
global connection_count
result = ''
if recorder.command == 'reset':
result = ''
connection_count = 0
elif recorder.command != 'dump':
# this should never happen
raise Exception('Unrecognized command: {}'.format(recorder.command))
try:
results = []
while True:
message = captured_messages.get(block=False)
if recorder.command == 'reset':
continue
results.append(emit_message(message))
except queue.Empty:
pass
result = '\n'.join(results)
with open(fifoname, mode='w') as fifo:
fifo.write('{}'.format(result))
while True:
with open(fifoname, mode='r') as fifo:
slug = fifo.read()
try:
handler = build_handler(slug)
if isinstance(handler, RecorderCommand):
handle_recorder(handler)
continue
except Exception as e:
traceback.print_exc()
result = str(e)
else:
result = None
if not result:
command_queue.put(slug)
result = response_queue.get()
with open(fifoname, mode='w') as fifo:
fifo.write('{}\n'.format(result))
def create_thread(fifoname):
global command_thread
if not fifoname:
return
if not len(fifoname):
return
if command_thread:
print('cannot change the fifo path once mitmproxy has started')
return
command_thread = threading.Thread(target=listen_for_commands, args=(fifoname,), daemon=True)
command_thread.start()
# III. mitmproxy callbacks
def load(loader):
loader.add_option('slug', str, 'conn.allow()', "A script to run")
loader.add_option('fifo', str, '', "Which fifo to listen on for commands")
def tick():
# we do this crazy dance because ctx isn't threadsafe, it is only useable while a
# callback (such as this one) is being called.
try:
slug = command_queue.get_nowait()
except queue.Empty:
return
try:
ctx.options.update(slug=slug)
except Exception as e:
response_queue.put(str(e))
else:
response_queue.put('')
def configure(updated):
global handler
if 'slug' in updated:
text = ctx.options.slug
handler = build_handler(text)
if 'fifo' in updated:
fifoname = ctx.options.fifo
create_thread(fifoname)
def next_layer(layer):
'''
mitmproxy wasn't really meant for intercepting raw tcp streams, it tries to wrap the
upstream connection (the one to the worker) in a tls stream. This hook intercepts the
part where it creates the TlsLayer (it happens in root_context.py) and instead creates
a RawTCPLayer. That's the layer which calls our tcp_message hook
'''
if isinstance(layer, TlsLayer):
replacement = RawTCPLayer(layer.ctx)
layer.reply.send(replacement)
def tcp_message(flow):
'''
This callback is hit every time mitmproxy receives a packet. It's the main entrypoint
into this script.
'''
global connection_count
tcp_msg = flow.messages[-1]
# Keep track of all the different connections, assign a unique id to each
if not hasattr(flow, 'connection_id'):
flow.connection_id = connection_count
connection_count += 1 # this is not thread safe but I think that's fine
tcp_msg.connection_id = flow.connection_id
# The first packet the frontend sends should be parsed differently
tcp_msg.is_initial = len(flow.messages) == 1
if tcp_msg.is_initial:
# skip parsing initial messages for now, they're not important
tcp_msg.parsed = None
else:
tcp_msg.parsed = structs.parse(tcp_msg.content, from_frontend=tcp_msg.from_client)
# record the message, for debugging purposes
captured_messages.put(tcp_msg)
# okay, finally, give the packet to the command the user wants us to use
handler._accept(flow, tcp_msg)

src/test/regress/mitmscripts/structs.py (new file)

@@ -0,0 +1,410 @@
from construct import (
Struct,
Int8ub, Int16ub, Int32ub, Int16sb, Int32sb,
Bytes, CString, Computed, Switch, Seek, this, Pointer,
GreedyRange, Enum, Byte, Probe, FixedSized, RestreamData, GreedyBytes, Array
)
import construct.lib as cl
import re
class MessageMeta(type):
def __init__(cls, name, bases, namespace):
'''
__init__ is called every time a class which uses this metaclass is declared
'''
if not hasattr(cls, "_msgtypes"):
raise Exception("classes which use MessageMeta must have a '_msgtypes' field")
if not hasattr(cls, "_classes"):
raise Exception("classes which use MessageMeta must have a '_classes' field")
if not hasattr(cls, "struct"):
# This is one of the direct subclasses
return
if cls.__name__ in cls._classes:
raise Exception("You've already made a class called {}".format( cls.__name__))
cls._classes[cls.__name__] = cls
# add a _type field to the struct so we can identify it while printing structs
cls.struct = cls.struct + ("_type" / Computed(name))
if not hasattr(cls, "key"):
return
# register the type, so we can tell the parser about it
key = cls.key
if key in cls._msgtypes:
raise Exception('key {} is already assigned to {}'.format(
key, cls._msgtypes[key].__name__)
)
cls._msgtypes[key] = cls
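# As a sketch, registering a new packet type only takes a key and a struct; e.g. the
# CopyFail ('f') frontend message from the protocol docs could be added with:
#
#   class CopyFail(FrontendMessage):
#       key = 'f'
#       struct = Struct("error" / CString("ascii"))
#
# MessageMeta would then record it in FrontendMessage._msgtypes and append a computed
# "_type" field so parsed messages can be identified when printed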
class Message:
'Do not subclass this object directly. Instead, subclass one of the below types'
def print(message):
'Define this on subclasses you want to change the representation of'
raise NotImplementedError
def typeof(message):
'Define this on subclasses you want to change the expressed type of'
return message._type
@classmethod
def _default_print(cls, name, msg):
recur = cls.print_message
return "{}({})".format(name, ",".join(
"{}={}".format(key, recur(value)) for key, value in msg.items()
if not key.startswith('_')
))
@classmethod
def find_typeof(cls, msg):
if not hasattr(cls, "_msgtypes"):
raise Exception('Do not call this method on Message, call it on a subclass')
if isinstance(msg, cl.ListContainer):
raise ValueError("do not call this on a list of messages")
if not isinstance(msg, cl.Container):
raise ValueError("must call this on a parsed message")
if not hasattr(msg, "_type"):
return "Anonymous"
if msg._type and msg._type not in cls._classes:
return msg._type
return cls._classes[msg._type].typeof(msg)
@classmethod
def print_message(cls, msg):
if not hasattr(cls, "_msgtypes"):
raise Exception('Do not call this method on Message, call it on a subclass')
if isinstance(msg, cl.ListContainer):
return repr([cls.print_message(message) for message in msg])
if not isinstance(msg, cl.Container):
return msg
if not hasattr(msg, "_type"):
return cls._default_print("Anonymous", msg)
if msg._type and msg._type not in cls._classes:
return cls._default_print(msg._type, msg)
try:
return cls._classes[msg._type].print(msg)
except NotImplementedError:
return cls._default_print(msg._type, msg)
@classmethod
def name_to_struct(cls):
return {
_class.__name__: _class.struct
for _class in cls._msgtypes.values()
}
@classmethod
def name_to_key(cls):
return {
_class.__name__ : ord(key)
for key, _class in cls._msgtypes.items()
}
class SharedMessage(Message, metaclass=MessageMeta):
'A message which could be sent by either the frontend or the backend'
_msgtypes = dict()
_classes = dict()
class FrontendMessage(Message, metaclass=MessageMeta):
'A message which will only be sent by a frontend'
_msgtypes = dict()
_classes = dict()
class BackendMessage(Message, metaclass=MessageMeta):
'A message which will only be sent by a backend'
_msgtypes = dict()
_classes = dict()
class Query(FrontendMessage):
key = 'Q'
struct = Struct(
"query" / CString("ascii")
)
@staticmethod
def print(message):
query = message.query
query = Query.normalize_shards(query)
query = Query.normalize_timestamps(query)
query = Query.normalize_assign_txn_id(query)
return "Query(query={})".format(query)
@staticmethod
def normalize_shards(content):
'''
For example:
>>> normalize_shards(
>>> 'COPY public.copy_test_120340 (key, value) FROM STDIN WITH (FORMAT BINARY))'
>>> )
'COPY public.copy_test_XXXXXX (key, value) FROM STDIN WITH (FORMAT BINARY))'
'''
result = content
pattern = re.compile('public\.[a-z_]+(?P<shardid>[0-9]+)')
for match in pattern.finditer(content):
span = match.span('shardid')
replacement = 'X'*( span[1] - span[0] )
result = result[:span[0]] + replacement + result[span[1]:]
return result
@staticmethod
def normalize_timestamps(content):
'''
For example:
>>> normalize_timestamps('2018-06-07 05:18:19.388992-07')
'XXXX-XX-XX XX:XX:XX.XXXXXX-XX'
>>> normalize_timestamps('2018-06-11 05:30:43.01382-07')
'XXXX-XX-XX XX:XX:XX.XXXXXX-XX'
'''
pattern = re.compile(
'[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{2,6}-[0-9]{2}'
)
return re.sub(pattern, 'XXXX-XX-XX XX:XX:XX.XXXXXX-XX', content)
@staticmethod
def normalize_assign_txn_id(content):
'''
For example:
>>> normalize_assign_txn_id('SELECT assign_distributed_transaction_id(0, 52, ...')
'SELECT assign_distributed_transaction_id(0, XX, ...'
'''
pattern = re.compile(
'assign_distributed_transaction_id\s*\(' # a method call
'\s*[0-9]+\s*,' # an integer first parameter
'\s*(?P<transaction_id>[0-9]+)' # an integer second parameter
)
result = content
for match in pattern.finditer(content):
span = match.span('transaction_id')
result = result[:span[0]] + 'XX' + result[span[1]:]
return result
class Terminate(FrontendMessage):
key = 'X'
struct = Struct()
class CopyData(SharedMessage):
key = 'd'
struct = Struct(
'data' / GreedyBytes # reads all of the data left in this substream
)
class CopyDone(SharedMessage):
key = 'c'
struct = Struct()
class EmptyQueryResponse(BackendMessage):
key = 'I'
struct = Struct()
class CopyOutResponse(BackendMessage):
key = 'H'
struct = Struct(
"format" / Int8ub,
"columncount" / Int16ub,
"columns" / Array(this.columncount, Struct(
"format" / Int16ub
))
)
class ReadyForQuery(BackendMessage):
key='Z'
struct = Struct("state"/Enum(Byte,
idle=ord('I'),
in_transaction_block=ord('T'),
in_failed_transaction_block=ord('E')
))
class CommandComplete(BackendMessage):
key = 'C'
struct = Struct(
"command" / CString("ascii")
)
class RowDescription(BackendMessage):
key = 'T'
struct = Struct(
"fieldcount" / Int16ub,
"fields" / Array(this.fieldcount, Struct(
"_type" / Computed("F"),
"name" / CString("ascii"),
"tableoid" / Int32ub,
"colattrnum" / Int16ub,
"typoid" / Int32ub,
"typlen" / Int16sb,
"typmod" / Int32sb,
"format_code" / Int16ub,
))
)
class DataRow(BackendMessage):
key = 'D'
struct = Struct(
"_type" / Computed("data_row"),
"columncount" / Int16ub,
"columns" / Array(this.columncount, Struct(
"_type" / Computed("C"),
"length" / Int16sb,
"value" / Bytes(this.length)
))
)
class AuthenticationOk(BackendMessage):
key = 'R'
struct = Struct()
class ParameterStatus(BackendMessage):
key = 'S'
struct = Struct(
"name" / CString("ASCII"),
"value" / CString("ASCII"),
)
def print(message):
name, value = ParameterStatus.normalize(message.name, message.value)
return "ParameterStatus({}={})".format(name, value)
@staticmethod
def normalize(name, value):
if name in ('TimeZone', 'server_version'):
value = 'XXX'
return (name, value)
class BackendKeyData(BackendMessage):
key = 'K'
struct = Struct(
"pid" / Int32ub,
"key" / Bytes(4)
)
def print(message):
# Both of these should be censored, for reproducible regression test output
return "BackendKeyData(XXX)"
frontend_switch = Switch(
this.type,
{ **FrontendMessage.name_to_struct(), **SharedMessage.name_to_struct() },
default=Bytes(this.length - 4)
)
backend_switch = Switch(
this.type,
{**BackendMessage.name_to_struct(), **SharedMessage.name_to_struct()},
default=Bytes(this.length - 4)
)
frontend_msgtypes = Enum(Byte, **{
**FrontendMessage.name_to_key(),
**SharedMessage.name_to_key()
})
backend_msgtypes = Enum(Byte, **{
**BackendMessage.name_to_key(),
**SharedMessage.name_to_key()
})
# It might seem a little circuitous to say a frontend message is a kind of frontend
# message but this lets us easily customize how they're printed
class Frontend(FrontendMessage):
struct = Struct(
"type" / frontend_msgtypes,
"length" / Int32ub, # "32-bit unsigned big-endian"
"raw_body" / Bytes(this.length - 4),
# try to parse the body into something more structured than raw bytes
"body" / RestreamData(this.raw_body, frontend_switch),
)
def print(message):
if isinstance(message.body, bytes):
return "Frontend(type={},body={})".format(
chr(message.type), message.body
)
return FrontendMessage.print_message(message.body)
def typeof(message):
if isinstance(message.body, bytes):
return "Unknown"
return message.body._type
class Backend(BackendMessage):
struct = Struct(
"type" / backend_msgtypes,
"length" / Int32ub, # "32-bit unsigned big-endian"
"raw_body" / Bytes(this.length - 4),
# try to parse the body into something more structured than raw bytes
"body" / RestreamData(this.raw_body, backend_switch),
)
def print(message):
if isinstance(message.body, bytes):
return "Backend(type={},body={})".format(
chr(message.type), message.body
)
return BackendMessage.print_message(message.body)
def typeof(message):
if isinstance(message.body, bytes):
return "Unknown"
return message.body._type
# GreedyRange keeps reading messages until we hit EOF
frontend_messages = GreedyRange(Frontend.struct)
backend_messages = GreedyRange(Backend.struct)
def parse(message, from_frontend=True):
if from_frontend:
message = frontend_messages.parse(message)
else:
message = backend_messages.parse(message)
message.from_frontend = from_frontend
return message
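# Example (a sketch): a frontend Query packet is the byte 'Q', a 32-bit length (which
# counts itself but not the type byte), and a NUL-terminated query string:
#
#   >>> msgs = parse(b'Q\x00\x00\x00\x0dSELECT 1\x00', from_frontend=True)
#   >>> message_type(msgs[0], from_frontend=True)
#   'Query'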
def print(message):
if message.from_frontend:
return FrontendMessage.print_message(message)
return BackendMessage.print_message(message)
def message_type(message, from_frontend):
if from_frontend:
return FrontendMessage.find_typeof(message)
return BackendMessage.find_typeof(message)
def message_matches(message, filters, from_frontend):
'''
Message is something like Backend(Query) and filters is something like query="COPY".
For now we only support strings, and treat them like a regex, which is matched against
the content of the wrapped message
'''
if message._type != 'Backend' and message._type != 'Frontend':
raise ValueError("can't handle {}".format(message._type))
wrapped = message.body
if isinstance(wrapped, bytes):
# we don't know which kind of message this is, so we can't match against it
return False
for key, value in filters.items():
if not isinstance(value, str):
raise ValueError("don't yet know how to handle {}".format(type(value)))
actual = getattr(wrapped, key)
if not re.search(value, actual):
return False
return True

src/test/regress/pg_regress_multi.pl

@@ -19,6 +19,8 @@ use Getopt::Long;
use File::Spec::Functions;
use File::Path qw(make_path remove_tree);
use Config;
use POSIX qw( WNOHANG mkfifo );
use Cwd 'abs_path';
sub Usage()
{
@@ -42,6 +44,7 @@ sub Usage()
print " --valgrind-log-file Path to the write valgrind logs\n";
print " --pg_ctl-timeout Timeout for pg_ctl\n";
print " --connection-timeout Timeout for connecting to worker nodes\n";
print " --mitmproxy Start a mitmproxy for one of the workers\n";
exit 1;
}
@@ -67,9 +70,12 @@ my $valgrindPath = "valgrind";
my $valgrindLogFile = "valgrind_test_log.txt";
my $pgCtlTimeout = undef;
my $connectionTimeout = 5000;
my $useMitmproxy = 0;
my $mitmFifoPath = catfile("tmp_check", "mitmproxy.fifo");
my $serversAreShutdown = "TRUE";
my $usingWindows = 0;
my $mitmPid = 0;
if ($Config{osname} eq "MSWin32")
{
@@ -93,6 +99,7 @@ GetOptions(
'valgrind-log-file=s' => \$valgrindLogFile,
'pg_ctl-timeout=s' => \$pgCtlTimeout,
'connection-timeout=s' => \$connectionTimeout,
'mitmproxy' => \$useMitmproxy,
'help' => sub { Usage() });
# Update environment to include [DY]LD_LIBRARY_PATH/LIBDIR/etc -
@@ -181,6 +188,11 @@ are present.
MESSAGE
}
if ($useMitmproxy)
{
system("mitmdump --version") == 0 or die "make sure mitmdump is on PATH";
}
# If pgCtlTimeout is defined, we will set related environment variable.
# This is generally used with valgrind because valgrind starts slow and we
# need to increase timeout.
@@ -292,6 +304,23 @@ push(@pgOptions, '-c', "citus.remote_task_check_interval=1ms");
push(@pgOptions, '-c', "citus.shard_replication_factor=2");
push(@pgOptions, '-c', "citus.node_connection_timeout=${connectionTimeout}");
if ($useMitmproxy)
{
# make tests reproducible by never trying to negotiate ssl
push(@pgOptions, '-c', "citus.node_conninfo=sslmode=disable");
}
if ($useMitmproxy)
{
if (! -e "tmp_check")
{
make_path("tmp_check") or die 'could not create tmp_check directory';
}
my $absoluteFifoPath = abs_path($mitmFifoPath);
die 'abs_path returned empty string' unless ($absoluteFifoPath ne "");
push(@pgOptions, '-c', "citus.mitmfifo=$absoluteFifoPath");
}
if ($followercluster)
{
push(@pgOptions, '-c', "max_wal_senders=10");
@@ -507,10 +536,48 @@ sub ShutdownServers()
or warn "Could not shutdown worker server";
}
}
if ($mitmPid != 0)
{
# '-' means signal the process group, 2 is SIGINT
kill(-2, $mitmPid) or warn "could not interrupt mitmdump";
}
$serversAreShutdown = "TRUE";
}
}
if ($useMitmproxy)
{
if (! -e $mitmFifoPath)
{
mkfifo($mitmFifoPath, 0777) or die "could not create fifo";
}
if (! -p $mitmFifoPath)
{
die "a file already exists at $mitmFifoPath, delete it before trying again";
}
my $childPid = fork();
die("Failed to fork\n")
unless (defined $childPid);
die("No child process\n")
if ($childPid < 0);
$mitmPid = $childPid;
if ($mitmPid eq 0) {
setpgrp(0,0); # we're about to spawn both a shell and a mitmdump, kill them as a group
exec("mitmdump --rawtcp -p 57640 --mode reverse:localhost:57638 -s mitmscripts/fluent.py --set fifo=$mitmFifoPath --set flow_detail=0 --set termlog_verbosity=warn >proxy.output 2>&1");
die 'could not start mitmdump';
}
}
$SIG{CHLD} = sub {
while ((my $waitpid = waitpid(-1, WNOHANG)) > 0) {}
}; # If, for some reason, mitmproxy dies before we do
# Set signals to shutdown servers
$SIG{INT} = \&ShutdownServers;
$SIG{QUIT} = \&ShutdownServers;
@@ -538,6 +605,7 @@ if ($valgrind)
replace_postgres();
}
# Signal that servers should be shutdown
$serversAreShutdown = "FALSE";
@@ -672,7 +740,6 @@ my @arguments = (
"--host", $host,
'--port', $masterPort,
'--user', $user,
-# '--bindir', 'C:\Users\Administrator\Downloads\pg-64\bin',
'--bindir', catfile("tmp_check", "tmp-bin")
);

src/test/regress/sql/failure_setup.sql (new file)

@@ -0,0 +1,5 @@
SELECT citus.mitmproxy('conn.allow()');
-- add the workers
SELECT master_add_node('localhost', :worker_1_port); -- the second worker
SELECT master_add_node('localhost', :worker_2_port + 2); -- the first worker, behind a mitmproxy

src/test/regress/sql/failure_test_helpers.sql (new file)

@@ -0,0 +1,49 @@
-- By default Citus makes lots of connections in the background which fill up the log
-- By tweaking these settings you can make sure you only capture packets related to what
-- you're doing
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
ALTER SYSTEM set citus.enable_statistics_collection TO false;
SELECT pg_reload_conf();
-- Add some helper functions for sending commands to mitmproxy
CREATE FUNCTION citus.mitmproxy(text) RETURNS TABLE(result text) AS $$
DECLARE
command ALIAS FOR $1;
BEGIN
CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP;
CREATE TEMPORARY TABLE mitmproxy_result (res text) ON COMMIT DROP;
INSERT INTO mitmproxy_command VALUES (command);
EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo'));
EXECUTE format('COPY mitmproxy_result FROM %L', current_setting('citus.mitmfifo'));
RETURN QUERY SELECT * FROM mitmproxy_result;
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION citus.clear_network_traffic() RETURNS void AS $$
BEGIN
PERFORM citus.mitmproxy('recorder.reset()');
RETURN; -- return void
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION citus.dump_network_traffic()
RETURNS TABLE(conn int, source text, message text) AS $$
BEGIN
CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP;
CREATE TEMPORARY TABLE mitmproxy_result (
conn int, source text, message text
) ON COMMIT DROP;
INSERT INTO mitmproxy_command VALUES ('recorder.dump()');
EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo'));
EXECUTE format('COPY mitmproxy_result FROM %L', current_setting('citus.mitmfifo'));
RETURN QUERY SELECT * FROM mitmproxy_result;
END;
$$ LANGUAGE plpgsql;