diff --git a/.travis.yml b/.travis.yml index a9d788c2b..7c8ced9d8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,8 @@ sudo: required dist: trusty language: c +python: + - "3.5" cache: apt: true directories: @@ -27,10 +29,19 @@ before_install: - setup_apt - curl https://install.citusdata.com/community/deb.sh | sudo bash - nuke_pg + - pyenv versions + - pyenv global 3.6 + - sudo apt-get install python3-pip + - sudo pip3 install --upgrade pip + - python --version + - python3 --version install: - install_uncrustify - install_pg - install_custom_pg + - pip3 install --user mitmproxy==3.0.4 + - pip3 install --user construct==2.9.45 + - mitmproxy --version # download and install HLL manually, as custom builds won't satisfy deps # only install if performing non-11 build - | diff --git a/src/test/regress/Makefile b/src/test/regress/Makefile index 766451ec9..5c83ee8a0 100644 --- a/src/test/regress/Makefile +++ b/src/test/regress/Makefile @@ -37,7 +37,7 @@ output_files := $(patsubst $(citus_abs_srcdir)/output/%.source,expected/%.out, $ # intermediate, for muscle memory backward compatibility. check: check-full # check-full triggers all tests that ought to be run routinely -check-full: check-multi check-multi-mx check-multi-task-tracker-extra check-worker check-follower-cluster +check-full: check-multi check-multi-mx check-multi-task-tracker-extra check-worker check-follower-cluster check-failure # using pg_regress_multi_check unnecessarily starts up multiple nodes, which isn't needed # for check-worker. But that's harmless besides a few cycles. @@ -79,6 +79,10 @@ check-follower-cluster: all $(pg_regress_multi_check) --load-extension=citus --follower-cluster \ -- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_follower_schedule $(EXTRA_TESTS) +check-failure: all + $(pg_regress_multi_check) --load-extension=citus --mitmproxy \ + -- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/failure_schedule $(EXTRA_TESTS) + clean distclean maintainer-clean: rm -f $(output_files) $(input_files) rm -rf tmp_check/ diff --git a/src/test/regress/Pipfile b/src/test/regress/Pipfile new file mode 100644 index 000000000..fd2f9b26b --- /dev/null +++ b/src/test/regress/Pipfile @@ -0,0 +1,20 @@ +[[source]] + +name = "pypi" +url = "https://pypi.python.org/simple" +verify_ssl = true + + +[packages] + +mitmproxy = "==3.0.4" +construct = "*" + + +[dev-packages] + + + +[requires] + +python_version = "3.5" diff --git a/src/test/regress/Pipfile.lock b/src/test/regress/Pipfile.lock new file mode 100644 index 000000000..4be380e65 --- /dev/null +++ b/src/test/regress/Pipfile.lock @@ -0,0 +1,328 @@ +{ + "_meta": { + "hash": { + "sha256": "9ca06bec77d0376075fc797a119ae7b47bbb0a78e37b23d09392c4751f86af69" + }, + "host-environment-markers": { + "implementation_name": "cpython", + "implementation_version": "3.5.2", + "os_name": "posix", + "platform_machine": "x86_64", + "platform_python_implementation": "CPython", + "platform_release": "4.4.0-127-generic", + "platform_system": "Linux", + "platform_version": "#153-Ubuntu SMP Sat May 19 10:58:46 UTC 2018", + "python_full_version": "3.5.2", + "python_version": "3.5", + "sys_platform": "linux" + }, + "pipfile-spec": 6, + "requires": { + "python_version": "3.5" + }, + "sources": [ + { + "name": "pypi", + "url": "https://pypi.python.org/simple", + "verify_ssl": true + } + ] + }, + "default": { + "asn1crypto": { + "hashes": [ + "sha256:2f1adbb7546ed199e3c90ef23ec95c5cf3585bac7d11fb7eb562a3fe89c64e87", + "sha256:9d5c20441baf0cb60a4ac34cc447c6c189024b6b4c6cd7877034f4965c464e49" 
+ ], + "version": "==0.24.0" + }, + "blinker": { + "hashes": [ + "sha256:471aee25f3992bd325afa3772f1063dbdbbca947a041b8b89466dc00d606f8b6" + ], + "version": "==1.4" + }, + "brotlipy": { + "hashes": [ + "sha256:af65d2699cb9f13b26ec3ba09e75e80d31ff422c03675fcb36ee4dabe588fdc2", + "sha256:50ca336374131cfad20612f26cc43c637ac0bfd2be3361495e99270883b52962", + "sha256:fd1d1c64214af5d90014d82cee5d8141b13d44c92ada7a0c0ec0679c6f15a471", + "sha256:b4c98b0d2c9c7020a524ca5bbff42027db1004c6571f8bc7b747f2b843128e7a", + "sha256:8b39abc3256c978f575df5cd7893153277216474f303e26f0e43ba3d3969ef96", + "sha256:5de6f7d010b7558f72f4b061a07395c5c3fd57f0285c5af7f126a677b976a868", + "sha256:637847560d671657f993313ecc6c6c6666a936b7a925779fd044065c7bc035b9", + "sha256:96bc59ff9b5b5552843dc67999486a220e07a0522dddd3935da05dc194fa485c", + "sha256:091b299bf36dd6ef7a06570dbc98c0f80a504a56c5b797f31934d2ad01ae7d17", + "sha256:0be698678a114addcf87a4b9496c552c68a2c99bf93cf8e08f5738b392e82057", + "sha256:d2c1c724c4ac375feb2110f1af98ecdc0e5a8ea79d068efb5891f621a5b235cb", + "sha256:3a3e56ced8b15fbbd363380344f70f3b438e0fd1fcf27b7526b6172ea950e867", + "sha256:653faef61241bf8bf99d73ca7ec4baa63401ba7b2a2aa88958394869379d67c7", + "sha256:0fa6088a9a87645d43d7e21e32b4a6bf8f7c3939015a50158c10972aa7f425b7", + "sha256:79aaf217072840f3e9a3b641cccc51f7fc23037496bd71e26211856b93f4b4cb", + "sha256:a07647886e24e2fb2d68ca8bf3ada398eb56fd8eac46c733d4d95c64d17f743b", + "sha256:c6cc0036b1304dd0073eec416cb2f6b9e37ac8296afd9e481cac3b1f07f9db25", + "sha256:07194f4768eb62a4f4ea76b6d0df6ade185e24ebd85877c351daa0a069f1111a", + "sha256:7e31f7adcc5851ca06134705fcf3478210da45d35ad75ec181e1ce9ce345bb38", + "sha256:9448227b0df082e574c45c983fa5cd4bda7bfb11ea6b59def0940c1647be0c3c", + "sha256:dc6c5ee0df9732a44d08edab32f8a616b769cc5a4155a12d2d010d248eb3fb07", + "sha256:3c1d5e2cf945a46975bdb11a19257fa057b67591eb232f393d260e7246d9e571", + "sha256:2a80319ae13ea8dd60ecdc4f5ccf6da3ae64787765923256b62c598c5bba4121", + "sha256:2699945a0a992c04fc7dc7fa2f1d0575a2c8b4b769f2874a08e8eae46bef36ae", + "sha256:1ea4e578241504b58f2456a6c69952c88866c794648bdc74baee74839da61d44", + "sha256:2e5c64522364a9ebcdf47c5744a5ddeb3f934742d31e61ebfbbc095460b47162", + "sha256:09ec3e125d16749b31c74f021aba809541b3564e5359f8c265cbae442810b41a", + "sha256:786afc8c9bd67de8d31f46e408a3386331e126829114e4db034f91eacb05396d", + "sha256:36def0b859beaf21910157b4c33eb3b06d8ce459c942102f16988cca6ea164df" + ], + "version": "==0.7.0" + }, + "certifi": { + "hashes": [ + "sha256:9fa520c1bacfb634fa7af20a76bcbd3d5fb390481724c597da32c719a7dca4b0", + "sha256:13e698f54293db9f89122b0581843a782ad0934a4fe0172d2a980ba77fc61bb7" + ], + "version": "==2018.4.16" + }, + "cffi": { + "hashes": [ + "sha256:1b0493c091a1898f1136e3f4f991a784437fac3673780ff9de3bcf46c80b6b50", + "sha256:87f37fe5130574ff76c17cab61e7d2538a16f843bb7bca8ebbc4b12de3078596", + "sha256:1553d1e99f035ace1c0544050622b7bc963374a00c467edafac50ad7bd276aef", + "sha256:151b7eefd035c56b2b2e1eb9963c90c6302dc15fbd8c1c0a83a163ff2c7d7743", + "sha256:edabd457cd23a02965166026fd9bfd196f4324fe6032e866d0f3bd0301cd486f", + "sha256:ba5e697569f84b13640c9e193170e89c13c6244c24400fc57e88724ef610cd31", + "sha256:79f9b6f7c46ae1f8ded75f68cf8ad50e5729ed4d590c74840471fc2823457d04", + "sha256:b0f7d4a3df8f06cf49f9f121bead236e328074de6449866515cea4907bbc63d6", + "sha256:4c91af6e967c2015729d3e69c2e51d92f9898c330d6a851bf8f121236f3defd3", + "sha256:7a33145e04d44ce95bcd71e522b478d282ad0eafaf34fe1ec5bbd73e662f22b6", + 
"sha256:95d5251e4b5ca00061f9d9f3d6fe537247e145a8524ae9fd30a2f8fbce993b5b", + "sha256:b75110fb114fa366b29a027d0c9be3709579602ae111ff61674d28c93606acca", + "sha256:ae5e35a2c189d397b91034642cb0eab0e346f776ec2eb44a49a459e6615d6e2e", + "sha256:fdf1c1dc5bafc32bc5d08b054f94d659422b05aba244d6be4ddc1c72d9aa70fb", + "sha256:9d1d3e63a4afdc29bd76ce6aa9d58c771cd1599fbba8cf5057e7860b203710dd", + "sha256:be2a9b390f77fd7676d80bc3cdc4f8edb940d8c198ed2d8c0be1319018c778e1", + "sha256:ed01918d545a38998bfa5902c7c00e0fee90e957ce036a4000a88e3fe2264917", + "sha256:857959354ae3a6fa3da6651b966d13b0a8bed6bbc87a0de7b38a549db1d2a359", + "sha256:2ba8a45822b7aee805ab49abfe7eec16b90587f7f26df20c71dd89e45a97076f", + "sha256:a36c5c154f9d42ec176e6e620cb0dd275744aa1d804786a71ac37dc3661a5e95", + "sha256:e55e22ac0a30023426564b1059b035973ec82186ddddbac867078435801c7801", + "sha256:3eb6434197633b7748cea30bf0ba9f66727cdce45117a712b29a443943733257", + "sha256:ecbb7b01409e9b782df5ded849c178a0aa7c906cf8c5a67368047daab282b184", + "sha256:770f3782b31f50b68627e22f91cb182c48c47c02eb405fd689472aa7b7aa16dc", + "sha256:d5d8555d9bfc3f02385c1c37e9f998e2011f0db4f90e250e5bc0c0a85a813085", + "sha256:3c85641778460581c42924384f5e68076d724ceac0f267d66c757f7535069c93", + "sha256:e90f17980e6ab0f3c2f3730e56d1fe9bcba1891eeea58966e89d352492cc74f4" + ], + "markers": "platform_python_implementation != 'PyPy'", + "version": "==1.11.5" + }, + "click": { + "hashes": [ + "sha256:29f99fc6125fbc931b758dc053b3114e55c77a6e4c6c3a2674a2dc986016381d", + "sha256:f15516df478d5a56180fbf80e68f206010e6d160fc39fa508b65e035fd75130b" + ], + "version": "==6.7" + }, + "construct": { + "hashes": [ + "sha256:2271a0efd0798679dea825ff47e22a4c550456a5db0ba8baa82f7eae0af0118c" + ], + "version": "==2.9.45" + }, + "cryptography": { + "hashes": [ + "sha256:abd070b5849ed64e6d349199bef955ee0ad99aefbad792f0c587f8effa681a5e", + "sha256:3f3b65d5a16e6b52fba63dc860b62ca9832f51f1a2ae5083c78b6840275f12dd", + "sha256:77d0ad229d47a6e0272d00f6bf8ac06ce14715a9fd02c9a97f5a2869aab3ccb2", + "sha256:808fe471b1a6b777f026f7dc7bd9a4959da4bfab64972f2bbe91e22527c1c037", + "sha256:6fef51ec447fe9f8351894024e94736862900d3a9aa2961528e602eb65c92bdb", + "sha256:60bda7f12ecb828358be53095fc9c6edda7de8f1ef571f96c00b2363643fa3cd", + "sha256:5cb990056b7cadcca26813311187ad751ea644712022a3976443691168781b6f", + "sha256:c332118647f084c983c6a3e1dba0f3bcb051f69d12baccac68db8d62d177eb8a", + "sha256:f57008eaff597c69cf692c3518f6d4800f0309253bb138b526a37fe9ef0c7471", + "sha256:551a3abfe0c8c6833df4192a63371aa2ff43afd8f570ed345d31f251d78e7e04", + "sha256:db6013746f73bf8edd9c3d1d3f94db635b9422f503db3fc5ef105233d4c011ab", + "sha256:d6f46e862ee36df81e6342c2177ba84e70f722d9dc9c6c394f9f1f434c4a5563", + "sha256:9b62fb4d18529c84b961efd9187fecbb48e89aa1a0f9f4161c61b7fc42a101bd", + "sha256:9e5bed45ec6b4f828866ac6a6bedf08388ffcfa68abe9e94b34bb40977aba531", + "sha256:f6c821ac253c19f2ad4c8691633ae1d1a17f120d5b01ea1d256d7b602bc59887", + "sha256:ba6a774749b6e510cffc2fb98535f717e0e5fd91c7c99a61d223293df79ab351", + "sha256:9fc295bf69130a342e7a19a39d7bbeb15c0bcaabc7382ec33ef3b2b7d18d2f63" + ], + "version": "==2.2.2" + }, + "h11": { + "hashes": [ + "sha256:af77d5d82fa027c032650fb8afdef3cd0a3735ba01480bee908cddad9be1bdce", + "sha256:1c0fbb1cba6f809fe3e6b27f8f6d517ca171f848922708871403636143d530d9" + ], + "version": "==0.7.0" + }, + "h2": { + "hashes": [ + "sha256:4be613e35caad5680dc48f98f3bf4e7338c7c429e6375a5137be7fbe45219981", + "sha256:b2962f883fa392a23cbfcc4ad03c335bcc661be0cf9627657b589f0df2206e64" + ], + "version": "==3.0.1" + }, + 
"hpack": { + "hashes": [ + "sha256:0edd79eda27a53ba5be2dfabf3b15780928a0dff6eb0c60a3d6767720e970c89", + "sha256:8eec9c1f4bfae3408a3f30500261f7e6a65912dc138526ea054f9ad98892e9d2" + ], + "version": "==3.0.0" + }, + "hyperframe": { + "hashes": [ + "sha256:87567c9eb1540de1e7f48805adf00e87856409342fdebd0cd20cf5d381c38b69", + "sha256:a25944539db36d6a2e47689e7915dcee562b3f8d10c6cdfa0d53c91ed692fb04" + ], + "version": "==5.1.0" + }, + "idna": { + "hashes": [ + "sha256:8c7309c718f94b3a625cb648ace320157ad16ff131ae0af362c9f21b80ef6ec4", + "sha256:2c6a5de3089009e3da7c5dde64a141dbc8551d5b7f6cf4ed7c2568d0cc520a8f" + ], + "version": "==2.6" + }, + "kaitaistruct": { + "hashes": [ + "sha256:d1d17c7f6839b3d28fc22b21295f787974786c2201e8788975e72e2a1d109ff5" + ], + "version": "==0.8" + }, + "ldap3": { + "hashes": [ + "sha256:ed3d5ac156f61ff06df0ef499569a970b544011a3e836296bd731e0da92f10c0", + "sha256:44c900354823218597e71de864c81d40be81b6f5fc6bcc109a2130a89a8f4fc8", + "sha256:7912093b2501a04b7a2fb9042f2504a8664c3543498186c6ef0421cbd2eb7331", + "sha256:d257500ea9b5af0ecca8c319fc3fb9b758f9f5e4b8441032cb681dee026c646a", + "sha256:e8fe0d55a8cecb725748c831ffac2873df94c05b2d7eb867ea167c0500bbc6a8", + "sha256:c4133692ff33e0a96780e6bd40f450545251d3e1786557c61d091eaeb2ef9138" + ], + "version": "==2.4.1" + }, + "mitmproxy": { + "hashes": [ + "sha256:ee2f71bb737e9dd32ca489cf8c12eca4b4dbdc1eeb89062366b2d261b800ad3a" + ], + "version": "==3.0.4" + }, + "passlib": { + "hashes": [ + "sha256:43526aea08fa32c6b6dbbbe9963c4c767285b78147b7437597f992812f69d280", + "sha256:3d948f64138c25633613f303bcc471126eae67c04d5e3f6b7b8ce6242f8653e0" + ], + "version": "==1.7.1" + }, + "pyasn1": { + "hashes": [ + "sha256:9a15cc13ff6bf5ed29ac936ca941400be050dff19630d6cd1df3fb978ef4c5ad", + "sha256:8fb265066eac1d3bb5015c6988981b009ccefd294008ff7973ed5f64335b0f2d", + "sha256:ba77f1e8d7d58abc42bfeddd217b545fdab4c1eeb50fd37c2219810ad56303bf", + "sha256:3651774ca1c9726307560792877db747ba5e8a844ea1a41feb7670b319800ab3", + "sha256:a66dcda18dbf6e4663bde70eb30af3fc4fe1acb2d14c4867a861681887a5f9a2", + "sha256:9334cb427609d2b1e195bb1e251f99636f817d7e3e1dffa150cb3365188fb992", + "sha256:d01fbba900c80b42af5c3fe1a999acf61e27bf0e452e0f1ef4619065e57622da", + "sha256:2f57960dc7a2820ea5a1782b872d974b639aa3b448ac6628d1ecc5d0fe3986f2", + "sha256:602fda674355b4701acd7741b2be5ac188056594bf1eecf690816d944e52905e", + "sha256:cdc8eb2eaafb56de66786afa6809cd9db2df1b3b595dcb25aa5b9dc61189d40a", + "sha256:f281bf11fe204f05859225ec2e9da7a7c140b65deccd8a4eb0bc75d0bd6949e0", + "sha256:fb81622d8f3509f0026b0683fe90fea27be7284d3826a5f2edf97f69151ab0fc" + ], + "version": "==0.4.3" + }, + "pycparser": { + "hashes": [ + "sha256:99a8ca03e29851d96616ad0404b4aad7d9ee16f25c9f9708a11faf2810f7b226" + ], + "version": "==2.18" + }, + "pyopenssl": { + "hashes": [ + "sha256:07a2de1a54de07448732a81e38a55df7da109b2f47f599f8bb35b0cbec69d4bd", + "sha256:2c10cfba46a52c0b0950118981d61e72c1e5b1aac451ca1bc77de1a679456773" + ], + "version": "==17.5.0" + }, + "pyparsing": { + "hashes": [ + "sha256:fee43f17a9c4087e7ed1605bd6df994c6173c1e977d7ade7b651292fab2bd010", + "sha256:0832bcf47acd283788593e7a0f542407bd9550a55a8a8435214a1960e04bcb04", + "sha256:9e8143a3e15c13713506886badd96ca4b579a87fbdf49e550dbfc057d6cb218e", + "sha256:281683241b25fe9b80ec9d66017485f6deff1af5cde372469134b56ca8447a07", + "sha256:b8b3117ed9bdf45e14dcc89345ce638ec7e0e29b2b579fa1ecf32ce45ebac8a5", + "sha256:8f1e18d3fd36c6795bb7e02a39fd05c611ffc2596c1e0d995d34d67630426c18", + 
"sha256:e4d45427c6e20a59bf4f88c639dcc03ce30d193112047f94012102f235853a58" + ], + "version": "==2.2.0" + }, + "pyperclip": { + "hashes": [ + "sha256:43496f0a1f363a5ecfc4cda5eba6a2a3d5056fe6c7ffb9a99fbb1c5a3c7dea05" + ], + "version": "==1.6.2" + }, + "ruamel.yaml": { + "hashes": [ + "sha256:e4d53f6a0c21d8effc23371927e8569096d0364d7c703b2e6956c6281b6bde2c", + "sha256:4b1929101d09612e0c7a42fbe06b0f929a4a89e1d14832353c1eb073580d3ba6", + "sha256:181699cc08b157ef8a59a77e96a01b5ffa150044ed4e49fd98428ab9ac0e6ed9", + "sha256:b6bc5f434d72a672dbe48471e70771789d5d93603716c9e36963fe1dc7a35718", + "sha256:6932e1ad63c805a41665a94e5d7b70808e9e25943f72afba6d327fede2aeb43d", + "sha256:dc051cd1fe541e321f6846bddba8e2c0de8ca409d51a6d9917c7b970d8d89a3d", + "sha256:656dcd3d30774ffe252e46db96f4cf24b284d42c904b93f9cbe6b234028f7d2e", + "sha256:039bb5b50a2f3b17c969ed1d381e050bca851e3c13fe8c2a9ad18f605ca111a5", + "sha256:f5ef82b8efe378de6abb7042263d6f407b0760ad923ed477fa26007b1fa0e563", + "sha256:cea830caa479ae083f51ffdb55fe430a2763e853a7b06195f203db6d28bf5264", + "sha256:882cacb8af5f7009780da75041ef131d0ec80d9e0b81d3cf8d4b49a0a33fe6ef", + "sha256:1d46053cb7acf0cd6b375e34abfb94f2e97c39269c17eb8b0226fe8a470c4ced", + "sha256:2d1df676ac75fb5e0af7b91f7718a4b4f469a5d8ac4150edecc61f063283bbee", + "sha256:759b485e8cda260bd87b7cdd2ad936a0ec359ee6154a9d856357446792b3faf5", + "sha256:7afefe5dab4381393a2aa7ccb585ffd6080d52e7cd05f1df3788e9d0e4dfcea9", + "sha256:766ee90985c667f77bf34950b1d945624c263ecb82d859961f78effb3355c946", + "sha256:509842d96fb194f79b57483b76429f8956d8f7ade3cb49d1e5aeb5c5e9ef4918" + ], + "version": "==0.15.37" + }, + "six": { + "hashes": [ + "sha256:832dc0e10feb1aa2c68dcc57dbb658f1c7e65b9b61af69048abc87a2db00a0eb", + "sha256:70e8a77beed4562e7f14fe23a786b54f6296e34344c23bc42f07b15018ff98e9" + ], + "version": "==1.11.0" + }, + "sortedcontainers": { + "hashes": [ + "sha256:fa96e9920a37bde76bfdcaca919a125c1d2e581af1137e25de54ee0da7835282", + "sha256:566cf5f8dbada3aed99737a19d98f03d15d76bf2a6c27e4fb0f4a718a99be761" + ], + "version": "==1.5.10" + }, + "tornado": { + "hashes": [ + "sha256:88ce0282cce70df9045e515f578c78f1ebc35dcabe1d70f800c3583ebda7f5f5", + "sha256:ba9fbb249ac5390bff8a1d6aa4b844fd400701069bda7d2e380dfe2217895101", + "sha256:408d129e9d13d3c55aa73f8084aa97d5f90ed84132e38d6932e63a67d5bec563", + "sha256:c050089173c2e9272244bccfb6a8615fb9e53b79420a5551acfa76094ecc3111", + "sha256:1b83d5c10550f2653380b4c77331d6f8850f287c4f67d7ce1e1c639d9222fbc7" + ], + "version": "==5.0.2" + }, + "urwid": { + "hashes": [ + "sha256:644d3e3900867161a2fc9287a9762753d66bd194754679adb26aede559bcccbc" + ], + "version": "==2.0.1" + }, + "wsproto": { + "hashes": [ + "sha256:d2a7f718ab3144ec956a3267d57b5c172f0668827f5803e7d670837b0125b9fa", + "sha256:02f214f6bb43cda62a511e2e8f1d5fa4703ed83d376d18d042bd2bbf2e995824" + ], + "version": "==0.11.0" + } + }, + "develop": {} +} diff --git a/src/test/regress/expected/failure_setup.out b/src/test/regress/expected/failure_setup.out new file mode 100644 index 000000000..666f87960 --- /dev/null +++ b/src/test/regress/expected/failure_setup.out @@ -0,0 +1,19 @@ +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +-- add the workers +SELECT master_add_node('localhost', :worker_1_port); -- the second worker + master_add_node +--------------------------------------------------- + (1,1,localhost,57637,default,f,t,primary,default) +(1 row) + +SELECT master_add_node('localhost', :worker_2_port + 2); -- the first worker, behind a mitmproxy + master_add_node 
+---------------------------------------------------
+ (2,2,localhost,57640,default,f,t,primary,default)
+(1 row)
+
diff --git a/src/test/regress/expected/failure_test_helpers.out b/src/test/regress/expected/failure_test_helpers.out
new file mode 100644
index 000000000..5362412a1
--- /dev/null
+++ b/src/test/regress/expected/failure_test_helpers.out
@@ -0,0 +1,50 @@
+-- By default Citus makes lots of connections in the background which fill up the log
+-- By tweaking these settings you can make sure you only capture packets related to what
+-- you're doing
+ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
+ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
+ALTER SYSTEM set citus.enable_statistics_collection TO false;
+SELECT pg_reload_conf();
+ pg_reload_conf
+----------------
+ t
+(1 row)
+
+-- Add some helper functions for sending commands to mitmproxy
+CREATE FUNCTION citus.mitmproxy(text) RETURNS TABLE(result text) AS $$
+DECLARE
+    command ALIAS FOR $1;
+BEGIN
+    CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP;
+    CREATE TEMPORARY TABLE mitmproxy_result (res text) ON COMMIT DROP;
+
+    INSERT INTO mitmproxy_command VALUES (command);
+
+    EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo'));
+    EXECUTE format('COPY mitmproxy_result FROM %L', current_setting('citus.mitmfifo'));
+
+    RETURN QUERY SELECT * FROM mitmproxy_result;
+END;
+$$ LANGUAGE plpgsql;
+CREATE FUNCTION citus.clear_network_traffic() RETURNS void AS $$
+BEGIN
+    PERFORM citus.mitmproxy('recorder.reset()');
+    RETURN; -- return void
+END;
+$$ LANGUAGE plpgsql;
+CREATE FUNCTION citus.dump_network_traffic()
+RETURNS TABLE(conn int, source text, message text) AS $$
+BEGIN
+    CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP;
+    CREATE TEMPORARY TABLE mitmproxy_result (
+        conn int, source text, message text
+    ) ON COMMIT DROP;
+
+    INSERT INTO mitmproxy_command VALUES ('recorder.dump()');
+
+    EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo'));
+    EXECUTE format('COPY mitmproxy_result FROM %L', current_setting('citus.mitmfifo'));
+
+    RETURN QUERY SELECT * FROM mitmproxy_result;
+END;
+$$ LANGUAGE plpgsql;
diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule
new file mode 100644
index 000000000..3b5e61bea
--- /dev/null
+++ b/src/test/regress/failure_schedule
@@ -0,0 +1,5 @@
+# import this file (from psql you can use \i) to use mitmproxy manually
+test: failure_test_helpers
+
+# this should only be run by pg_regress_multi, you don't need it
+test: failure_setup
diff --git a/src/test/regress/mitmscripts/.gitignore b/src/test/regress/mitmscripts/.gitignore
new file mode 100644
index 000000000..bee8a64b7
--- /dev/null
+++ b/src/test/regress/mitmscripts/.gitignore
@@ -0,0 +1 @@
+__pycache__
diff --git a/src/test/regress/mitmscripts/README b/src/test/regress/mitmscripts/README
new file mode 100644
index 000000000..c257a78d4
--- /dev/null
+++ b/src/test/regress/mitmscripts/README
@@ -0,0 +1,169 @@
+Automated Failure Testing
+=========================
+
+Automated Failure Testing works by inserting a network proxy (mitmproxy) between the
+citus coordinator and one of the workers (connections to the other worker are left
+unchanged). The proxy is configurable, and sits on a fifo waiting for commands. When it
+receives a command over the fifo it reconfigures itself and sends back a response.
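+
+As a sketch of the raw protocol (assuming the fifo lives at /tmp/mitm.fifo, as in
+section II below), a command round trip from a shell looks roughly like this:
+
+$ echo 'conn.onQuery().kill()' > /tmp/mitm.fifo    # send a command string
+$ head -1 /tmp/mitm.fifo                           # read back the response; empty means accepted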
+Regression tests which use automated failure testing communicate with mitmproxy by running
+special UDFs which talk to said fifo. The tests send commands such as "fail any connection
+which contains the string 'COMMIT'" and then run SQL queries, asserting that the
+coordinator behaves reasonably when the specified failures occur.
+
+Contents of this file:
+  I. Getting Started
+  II. Running mitmproxy manually
+  III. citus.mitmproxy() command strings
+  IV. Recording Network Traffic
+
+# I. Getting Started
+
+First off, to use this you'll need mitmproxy; version 3.0.4 is recommended, run with
+python 3.6. This script integrates pretty deeply with mitmproxy, so other versions might
+fail to work.
+
+I highly recommend using pipenv to install mitmproxy. It lets you easily manage isolated
+environments (instead of installing python packages globally). If you've heard of
+virtualenv, pipenv is that but much easier to use.
+
+Once you've installed it:
+
+$ cd src/test/regress
+$ pipenv --python 3.6
+$ pipenv install  # there's already a Pipfile.lock in src/test/regress with packages
+$ pipenv shell  # this enters the virtual environment, putting mitmproxy onto $PATH
+
+That's all you need to do to run the failure tests:
+
+$ make check-failure
+
+# II. Running mitmproxy manually
+
+$ mkfifo /tmp/mitm.fifo  # first, you need a fifo
+$ cd src/test/regress
+$ pipenv shell
+$ mitmdump --rawtcp -p 9702 --mode reverse:localhost:9700 -s mitmscripts/fluent.py --set fifo=/tmp/mitm.fifo
+
+The specific port numbers will be different depending on your setup. The above string
+means mitmdump will accept connections on port 9702 and forward them to the worker
+listening on port 9700.
+
+Now, open psql and run:
+
+# UPDATE pg_dist_node SET nodeport = 9702 WHERE nodeport = 9700;
+
+Again, the specific port numbers depend on your setup.
+
+# \i src/test/regress/sql/failure_test_helpers.sql
+
+The above file creates some UDFs and also disables a few citus features which make
+connections in the background.
+
+You also want to tell the UDFs how to talk to mitmproxy (careful, this must be an
+absolute path):
+
+# SET citus.mitmfifo = '/tmp/mitm.fifo';
+
+(nb: this GUC does not appear in shared_library_init.c; Postgres allows setting and
+reading GUCs which have not been defined by any extension)
+
+You're all ready! If it worked, you should be able to run this:
+
+# SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+-----------
+
+(1 row)
+
+# III. citus.mitmproxy() command strings
+
+Command strings specify a pipeline. Each connection is handled individually, and the
+pipeline is called once for every packet which is sent. For example, given this string:
+
+`conn.onQuery().after(2).kill()` -> kill a connection once three Query packets are seen
+
+- onQuery() is a filter. It only passes Query packets (packets the frontend sends to the
+backend specifying a query to run) on to the next step of the pipeline.
+
+- after(2) is another filter; it ignores the first two packets which are sent to it, then
+sends the following packets to the next step of the pipeline.
+
+- kill() is an action; when a packet reaches it, the connection carrying that packet is
+killed.
+
+## Actions
+
+There are 5 actions you can take on connections:
+
+conn.allow() - the default, allows all connections to execute unmodified
+conn.kill() - kills all connections immediately after the first packet is sent
+conn.reset() - kill() calls shutdown(SHUT_WR), shutdown(SHUT_RD), close().
+               This is a very graceful way to close the socket. reset() instead causes a
+               RST packet to be sent, forcing the connection closed in something more
+               closely resembling an error.
+conn.cancel(pid) - This doesn't cause any changes at the network level. Instead it sends
+                   a SIGINT to pid and introduces a short delay, with hopes that the
+                   signal will be received before the delay ends. You can use it to write
+                   cancellation tests.
+
+The previous actions all work on a per-connection basis; each connection is tracked
+individually. A command such as `conn.onQuery().kill()` will only kill the connection on
+which the Query packet was seen. A command such as `conn.onQuery().after(2).kill()` will
+never trigger if each Query is sent on a different connection, even if you send dozens of
+Query packets.
+
+The final action works a bit differently:
+
+conn.killall() - the killall() command kills this and all subsequent connections. Any
+                 packets sent once it triggers will have their connections killed.
+
+## Filters
+
+conn.onQuery().kill() - kill a connection once a "Query" packet is seen
+conn.onCopyData().kill() - kill a connection once a "CopyData" packet is seen
+
+The list of supported packets can be found in ./structs.py, and the list of packets which
+could be supported can be found at:
+https://www.postgresql.org/docs/current/static/protocol-message-formats.html
+
+You can also inspect the contents of packets:
+
+conn.onQuery(query="COMMIT").kill() - you can look into the actual query which is sent and
+                                      match on its contents (this is always a regex)
+conn.onQuery(query="^COMMIT").kill() - the query must start with COMMIT
+conn.onQuery(query="pg_table_size\(") - you must escape parens, since you're in a regex
+
+after(n) matches after the n-th packet has been sent:
+
+conn.after(2).kill() - kill connections when the third packet is sent down them
+
+There's also a low-level filter which runs a regex against the raw content of the packet:
+
+conn.matches(b"^Q").kill() - this is another way of writing conn.onQuery(). Note the 'b';
+                             it's always required.
+
+## Chaining
+
+Filters and actions can be arbitrarily chained:
+
+conn.matches(b"^Q").after(2).kill() - kill any connection when the third Query is sent
+
+# IV. Recording Network Traffic
+
+There are also some special commands. The proxy records every packet and lets you
+inspect them:
+
+recorder.dump() - emits a list of captured packets in COPY text format
+recorder.reset() - empties the data structure containing the captured packets
+
+Both of these calls empty the structure containing the packets; a call to dump() will
+only return the packets which were captured since the last call to dump() or reset().
+
+Back when you called `\i sql/failure_test_helpers.sql` you created some UDFs which make
+using these strings easier.
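+
+For example, a failure test might combine these pieces like so (a sketch; the table name
+is hypothetical, any distributed table will do):
+
+# SELECT citus.clear_network_traffic();
+# SELECT count(*) FROM a_distributed_table;
+# SELECT * FROM citus.dump_network_traffic();  -- shows each packet the query generated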
+Here are some commands you can run from psql, or from inside failure tests:
+
+citus.clear_network_traffic() - this empties the buffer containing captured packets
+citus.dump_network_traffic() - this returns a little table and pretty-prints information
+                               on all the packets captured since the last call to
+                               clear_network_traffic() or dump_network_traffic()
diff --git a/src/test/regress/mitmscripts/fluent.py b/src/test/regress/mitmscripts/fluent.py
new file mode 100644
index 000000000..5cb9b32dd
--- /dev/null
+++ b/src/test/regress/mitmscripts/fluent.py
@@ -0,0 +1,425 @@
+import re
+import os
+import pprint
+import signal
+import socket
+import struct
+import threading
+import time
+import traceback
+import queue
+
+from mitmproxy import ctx
+from mitmproxy.utils import strutils
+from mitmproxy.proxy.protocol import TlsLayer, RawTCPLayer
+
+import structs
+
+# I. Command Strings
+
+class Stop(Exception):
+    pass
+
+class Handler:
+    '''
+    This class hierarchy serves two purposes:
+    1. Allow command strings to be evaluated. Once evaluated you'll have a Handler you
+       can pass packets to
+    2. Process packets as they come in and decide what to do with them.
+
+    Subclasses which want to change how packets are handled should override _handle.
+    '''
+    def __init__(self, root=None):
+        # all packets are first sent to the root handler to be processed
+        self.root = root if root else self
+        # all handlers keep track of the next handler so they know where to send packets
+        self.next = None
+
+    def _accept(self, flow, message):
+        result = self._handle(flow, message)
+
+        if result == 'pass':
+            # defer to our child
+            if not self.next:
+                raise Exception("we don't know what to do!")
+
+            try:
+                self.next._accept(flow, message)
+            except Stop:
+                if self.root is not self:
+                    raise
+                self.next = KillHandler(self)
+                flow.kill()
+        elif result == 'done':
+            # stop processing this packet, move on to the next one
+            return
+        elif result == 'stop':
+            # from now on kill all connections
+            raise Stop()
+ - "pass" means to delegate to self.next and do whatever it wants + - "stop" means all processing will stop, and all connections will be killed + ''' + # subclasses must implement this + raise NotImplementedError() + +class FilterableMixin: + def contains(self, pattern): + self.next = Contains(self.root, pattern) + return self.next + + def matches(self, pattern): + self.next = Matches(self.root, pattern) + return self.next + + def after(self, times): + self.next = After(self.root, times) + return self.next + + def __getattr__(self, attr): + ''' + Methods such as .onQuery trigger when a packet with that name is intercepted + + Adds support for commands such as: + conn.onQuery(query="COPY") + + Returns a function because the above command is resolved in two steps: + conn.onQuery becomes conn.__getattr__("onQuery") + conn.onQuery(query="COPY") becomes conn.__getattr__("onQuery")(query="COPY") + ''' + if attr.startswith('on'): + def doit(**kwargs): + self.next = OnPacket(self.root, attr[2:], kwargs) + return self.next + return doit + raise AttributeError + +class ActionsMixin: + def kill(self): + self.next = KillHandler(self.root) + return self.next + + def allow(self): + self.next = AcceptHandler(self.root) + return self.next + + def killall(self): + self.next = KillAllHandler(self.root) + return self.next + + def reset(self): + self.next = ResetHandler(self.root) + return self.next + + def cancel(self, pid): + self.next = CancelHandler(self.root, pid) + return self.next + +class AcceptHandler(Handler): + def __init__(self, root): + super().__init__(root) + def _handle(self, flow, message): + return 'done' + +class KillHandler(Handler): + def __init__(self, root): + super().__init__(root) + def _handle(self, flow, message): + flow.kill() + return 'done' + +class KillAllHandler(Handler): + def __init__(self, root): + super().__init__(root) + def _handle(self, flow, message): + return 'stop' + +class ResetHandler(Handler): + # try to force a RST to be sent, something went very wrong! 
+    def __init__(self, root):
+        super().__init__(root)
+    def _handle(self, flow, message):
+        flow.kill()  # tell mitmproxy this connection should be closed
+
+        # this is a mitmproxy.connections.ClientConnection(mitmproxy.tcp.BaseHandler)
+        client_conn = flow.client_conn
+        # this is a regular socket object
+        conn = client_conn.connection
+
+        # cause linux to send a RST
+        LINGER_ON, LINGER_TIMEOUT = 1, 0
+        conn.setsockopt(
+            socket.SOL_SOCKET, socket.SO_LINGER,
+            struct.pack('ii', LINGER_ON, LINGER_TIMEOUT)
+        )
+        conn.close()
+
+        # closing the connection isn't ideal: this thread later crashes when mitmproxy
+        # tries to call conn.shutdown(), but there's nothing else to clean up so that's
+        # maybe okay
+
+        return 'done'
+
+class CancelHandler(Handler):
+    'Send a SIGINT to the process'
+    def __init__(self, root, pid):
+        super().__init__(root)
+        self.pid = pid
+    def _handle(self, flow, message):
+        os.kill(self.pid, signal.SIGINT)
+        # give the signal a chance to be received before we let the packet through
+        time.sleep(0.1)
+        return 'done'
+
+class Contains(Handler, ActionsMixin, FilterableMixin):
+    def __init__(self, root, pattern):
+        super().__init__(root)
+        self.pattern = pattern
+
+    def _handle(self, flow, message):
+        if self.pattern in message.content:
+            return 'pass'
+        return 'done'
+
+class Matches(Handler, ActionsMixin, FilterableMixin):
+    def __init__(self, root, pattern):
+        super().__init__(root)
+        self.pattern = re.compile(pattern)
+
+    def _handle(self, flow, message):
+        if self.pattern.search(message.content):
+            return 'pass'
+        return 'done'
+
+class After(Handler, ActionsMixin, FilterableMixin):
+    "Don't pass execution to our child until we've handled 'times' messages"
+    def __init__(self, root, times):
+        super().__init__(root)
+        self.target = times
+
+    def _handle(self, flow, message):
+        if not hasattr(flow, '_after_count'):
+            flow._after_count = 0
+
+        if flow._after_count >= self.target:
+            return 'pass'
+
+        flow._after_count += 1
+        return 'done'
+
+class OnPacket(Handler, ActionsMixin, FilterableMixin):
+    '''Triggers when a packet of the specified kind comes around'''
+    def __init__(self, root, packet_kind, kwargs):
+        super().__init__(root)
+        self.packet_kind = packet_kind
+        self.filters = kwargs
+    def _handle(self, flow, message):
+        if not message.parsed:
+            # if this is the first message in the connection we just skip it
+            return 'done'
+        for msg in message.parsed:
+            typ = structs.message_type(msg, from_frontend=message.from_client)
+            if typ == self.packet_kind:
+                matches = structs.message_matches(msg, self.filters, message.from_client)
+                if matches:
+                    return 'pass'
+        return 'done'
+
+class RootHandler(Handler, ActionsMixin, FilterableMixin):
+    def _handle(self, flow, message):
+        # do whatever the next Handler tells us to do
+        return 'pass'
+
+class RecorderCommand:
+    def __init__(self):
+        self.root = self
+        self.command = None
+
+    def dump(self):
+        # When the user calls dump() we return everything we've captured
+        self.command = 'dump'
+        return self
+
+    def reset(self):
+        # If the user calls reset() we dump all captured packets without returning them
+        self.command = 'reset'
+        return self
+
+# II. Utilities for interfacing with mitmproxy
+
+def build_handler(spec):
+    'Turns a command string into a RootHandler ready to accept packets'
+    root = RootHandler()
+    recorder = RecorderCommand()
+    handler = eval(spec, {'__builtins__': {}}, {'conn': root, 'recorder': recorder})
+    return handler.root
+
+# a bunch of globals
+
+handler = None  # the current handler used to process packets
+command_thread = None  # sits on the fifo and waits for new commands to come in
+command_queue = queue.Queue()  # we poll this from the main thread and apply commands
+response_queue = queue.Queue()  # the main thread uses this to reply to command_thread
+captured_messages = queue.Queue()  # where we store messages used for recorder.dump()
+connection_count = 0  # so we can give connections ids in recorder.dump()
+
+def listen_for_commands(fifoname):
+
+    def emit_row(conn, from_client, message):
+        # we're using the COPY text format. It requires us to escape backslashes
+        cleaned = message.replace('\\', '\\\\')
+        source = 'coordinator' if from_client else 'worker'
+        return '{}\t{}\t{}'.format(conn, source, cleaned)
+
+    def emit_message(message):
+        if message.is_initial:
+            return emit_row(
+                message.connection_id, message.from_client, '[initial message]'
+            )
+
+        pretty = structs.print(message.parsed)
+        return emit_row(message.connection_id, message.from_client, pretty)
+
+    def handle_recorder(recorder):
+        global connection_count
+        result = ''
+
+        if recorder.command == 'reset':
+            result = ''
+            connection_count = 0
+        elif recorder.command != 'dump':
+            # this should never happen
+            raise Exception('Unrecognized command: {}'.format(recorder.command))
+
+        try:
+            results = []
+            while True:
+                message = captured_messages.get(block=False)
+                if recorder.command == 'reset':
+                    continue
+                results.append(emit_message(message))
+        except queue.Empty:
+            pass
+        result = '\n'.join(results)
+
+        with open(fifoname, mode='w') as fifo:
+            fifo.write('{}'.format(result))
+
+    while True:
+        with open(fifoname, mode='r') as fifo:
+            slug = fifo.read()
+
+        try:
+            handler = build_handler(slug)
+            if isinstance(handler, RecorderCommand):
+                handle_recorder(handler)
+                continue
+        except Exception as e:
+            traceback.print_exc()
+            result = str(e)
+        else:
+            result = None
+
+        if not result:
+            command_queue.put(slug)
+            result = response_queue.get()
+
+        with open(fifoname, mode='w') as fifo:
+            fifo.write('{}\n'.format(result))
+
+def create_thread(fifoname):
+    global command_thread
+
+    if not fifoname:
+        return
+    if not len(fifoname):
+        return
+
+    if command_thread:
+        print('cannot change the fifo path once mitmproxy has started')
+        return
+
+    command_thread = threading.Thread(target=listen_for_commands, args=(fifoname,), daemon=True)
+    command_thread.start()
+
+# III. mitmproxy callbacks
+
+def load(loader):
+    loader.add_option('slug', str, 'conn.allow()', "A script to run")
+    loader.add_option('fifo', str, '', "Which fifo to listen on for commands")
+
+
+def tick():
+    # we do this crazy dance because ctx isn't threadsafe; it is only usable while a
+    # callback (such as this one) is being called.
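+    # (roughly: listen_for_commands(), on its own thread, puts the command string onto
+    # command_queue; this callback applies it via ctx.options.update(); the outcome is
+    # handed back through response_queue, and the fifo thread writes it into the fifo)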
+    try:
+        slug = command_queue.get_nowait()
+    except queue.Empty:
+        return
+
+    try:
+        ctx.options.update(slug=slug)
+    except Exception as e:
+        response_queue.put(str(e))
+    else:
+        response_queue.put('')
+
+
+def configure(updated):
+    global handler
+
+    if 'slug' in updated:
+        text = ctx.options.slug
+        handler = build_handler(text)
+
+    if 'fifo' in updated:
+        fifoname = ctx.options.fifo
+        create_thread(fifoname)
+
+
+def next_layer(layer):
+    '''
+    mitmproxy wasn't really meant for intercepting raw tcp streams; it tries to wrap the
+    upstream connection (the one to the worker) in a tls stream. This hook intercepts the
+    part where it creates the TlsLayer (it happens in root_context.py) and instead creates
+    a RawTCPLayer. That's the layer which calls our tcp_message hook.
+    '''
+    if isinstance(layer, TlsLayer):
+        replacement = RawTCPLayer(layer.ctx)
+        layer.reply.send(replacement)
+
+
+def tcp_message(flow):
+    '''
+    This callback is hit every time mitmproxy receives a packet. It's the main entrypoint
+    into this script.
+    '''
+    global connection_count
+
+    tcp_msg = flow.messages[-1]
+
+    # Keep track of all the different connections, assign a unique id to each
+    if not hasattr(flow, 'connection_id'):
+        flow.connection_id = connection_count
+        connection_count += 1  # this is not thread safe but I think that's fine
+    tcp_msg.connection_id = flow.connection_id
+
+    # The first packet the frontend sends should be parsed differently
+    tcp_msg.is_initial = len(flow.messages) == 1
+
+    if tcp_msg.is_initial:
+        # skip parsing initial messages for now, they're not important
+        tcp_msg.parsed = None
+    else:
+        tcp_msg.parsed = structs.parse(tcp_msg.content, from_frontend=tcp_msg.from_client)
+
+    # record the message, for debugging purposes
+    captured_messages.put(tcp_msg)
+
+    # okay, finally, give the packet to the command the user wants us to use
+    handler._accept(flow, tcp_msg)
diff --git a/src/test/regress/mitmscripts/structs.py b/src/test/regress/mitmscripts/structs.py
new file mode 100644
index 000000000..ebeed61a3
--- /dev/null
+++ b/src/test/regress/mitmscripts/structs.py
@@ -0,0 +1,410 @@
+from construct import (
+    Struct,
+    Int8ub, Int16ub, Int32ub, Int16sb, Int32sb,
+    Bytes, CString, Computed, Switch, Seek, this, Pointer,
+    GreedyRange, Enum, Byte, Probe, FixedSized, RestreamData, GreedyBytes, Array
+)
+import construct.lib as cl
+
+import re
+
+class MessageMeta(type):
+    def __init__(cls, name, bases, namespace):
+        '''
+        __init__ is called every time a subclass of MessageMeta is declared
+        '''
+        if not hasattr(cls, "_msgtypes"):
+            raise Exception("classes which use MessageMeta must have a '_msgtypes' field")
+
+        if not hasattr(cls, "_classes"):
+            raise Exception("classes which use MessageMeta must have a '_classes' field")
+
+        if not hasattr(cls, "struct"):
+            # This is one of the direct subclasses
+            return
+
+        if cls.__name__ in cls._classes:
+            raise Exception("You've already made a class called {}".format(cls.__name__))
+        cls._classes[cls.__name__] = cls
+
+        # add a _type field to the struct so we can identify it while printing structs
+        cls.struct = cls.struct + ("_type" / Computed(name))
+
+        if not hasattr(cls, "key"):
+            return
+
+        # register the type, so we can tell the parser about it
+        key = cls.key
+        if key in cls._msgtypes:
+            raise Exception('key {} is already assigned to {}'.format(
+                key, cls._msgtypes[key].__name__)
+            )
+        cls._msgtypes[key] = cls
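+
+# As an illustration of the registration flow (using names defined below), a declaration
+# like
+#
+#     class Query(FrontendMessage):
+#         key = 'Q'
+#         struct = Struct("query" / CString("ascii"))
+#
+# runs MessageMeta.__init__, which appends a Computed "_type" field to the struct and
+# registers the class under key 'Q' in FrontendMessage._msgtypes.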
+
+class Message:
+    'Do not subclass this object directly. Instead, subclass one of the types below.'
+
+    def print(message):
+        'Define this on subclasses you want to change the representation of'
+        raise NotImplementedError
+
+    def typeof(message):
+        'Define this on subclasses you want to change the expressed type of'
+        return message._type
+
+    @classmethod
+    def _default_print(cls, name, msg):
+        recur = cls.print_message
+        return "{}({})".format(name, ",".join(
+            "{}={}".format(key, recur(value)) for key, value in msg.items()
+            if not key.startswith('_')
+        ))
+
+    @classmethod
+    def find_typeof(cls, msg):
+        if not hasattr(cls, "_msgtypes"):
+            raise Exception('Do not call this method on Message, call it on a subclass')
+        if isinstance(msg, cl.ListContainer):
+            raise ValueError("do not call this on a list of messages")
+        if not isinstance(msg, cl.Container):
+            raise ValueError("must call this on a parsed message")
+        if not hasattr(msg, "_type"):
+            return "Anonymous"
+        if msg._type and msg._type not in cls._classes:
+            return msg._type
+        return cls._classes[msg._type].typeof(msg)
+
+    @classmethod
+    def print_message(cls, msg):
+        if not hasattr(cls, "_msgtypes"):
+            raise Exception('Do not call this method on Message, call it on a subclass')
+
+        if isinstance(msg, cl.ListContainer):
+            return repr([cls.print_message(message) for message in msg])
+
+        if not isinstance(msg, cl.Container):
+            return msg
+
+        if not hasattr(msg, "_type"):
+            return cls._default_print("Anonymous", msg)
+
+        if msg._type and msg._type not in cls._classes:
+            return cls._default_print(msg._type, msg)
+
+        try:
+            return cls._classes[msg._type].print(msg)
+        except NotImplementedError:
+            return cls._default_print(msg._type, msg)
+
+    @classmethod
+    def name_to_struct(cls):
+        return {
+            _class.__name__: _class.struct
+            for _class in cls._msgtypes.values()
+        }
+
+    @classmethod
+    def name_to_key(cls):
+        return {
+            _class.__name__: ord(key)
+            for key, _class in cls._msgtypes.items()
+        }
+
+class SharedMessage(Message, metaclass=MessageMeta):
+    'A message which could be sent by either the frontend or the backend'
+    _msgtypes = dict()
+    _classes = dict()
+
+class FrontendMessage(Message, metaclass=MessageMeta):
+    'A message which will only be sent by a frontend'
+    _msgtypes = dict()
+    _classes = dict()
+
+class BackendMessage(Message, metaclass=MessageMeta):
+    'A message which will only be sent by a backend'
+    _msgtypes = dict()
+    _classes = dict()
+
+class Query(FrontendMessage):
+    key = 'Q'
+    struct = Struct(
+        "query" / CString("ascii")
+    )
+
+    @staticmethod
+    def print(message):
+        query = message.query
+        query = Query.normalize_shards(query)
+        query = Query.normalize_timestamps(query)
+        query = Query.normalize_assign_txn_id(query)
+        return "Query(query={})".format(query)
+
+    @staticmethod
+    def normalize_shards(content):
+        '''
+        For example:
+        >>> normalize_shards(
+        >>>     'COPY public.copy_test_120340 (key, value) FROM STDIN WITH (FORMAT BINARY))'
+        >>> )
+        'COPY public.copy_test_XXXXXX (key, value) FROM STDIN WITH (FORMAT BINARY))'
+        '''
+        result = content
+        pattern = re.compile('public\.[a-z_]+(?P<shardid>[0-9]+)')
+        for match in pattern.finditer(content):
+            span = match.span('shardid')
+            replacement = 'X' * (span[1] - span[0])
+            result = result[:span[0]] + replacement + result[span[1]:]
+        return result
+
+    @staticmethod
+    def normalize_timestamps(content):
+        '''
+        For example:
+        >>> normalize_timestamps('2018-06-07 05:18:19.388992-07')
+        'XXXX-XX-XX XX:XX:XX.XXXXXX-XX'
+        >>> normalize_timestamps('2018-06-11 05:30:43.01382-07')
+        'XXXX-XX-XX XX:XX:XX.XXXXXX-XX'
+        '''
+
+        pattern = re.compile(
+            '[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{2,6}-[0-9]{2}'
+        )
+
+        return re.sub(pattern, 'XXXX-XX-XX XX:XX:XX.XXXXXX-XX', content)
+
+    @staticmethod
+    def normalize_assign_txn_id(content):
+        '''
+        For example:
+        >>> normalize_assign_txn_id('SELECT assign_distributed_transaction_id(0, 52, ...')
+        'SELECT assign_distributed_transaction_id(0, XX, ...'
+        '''
+
+        pattern = re.compile(
+            'assign_distributed_transaction_id\s*\('  # a method call
+            '\s*[0-9]+\s*,'  # an integer first parameter
+            '\s*(?P<transaction_id>[0-9]+)'  # an integer second parameter
+        )
+        result = content
+        for match in pattern.finditer(content):
+            span = match.span('transaction_id')
+            result = result[:span[0]] + 'XX' + result[span[1]:]
+        return result
+
+class Terminate(FrontendMessage):
+    key = 'X'
+    struct = Struct()
+
+class CopyData(SharedMessage):
+    key = 'd'
+    struct = Struct(
+        'data' / GreedyBytes  # reads all of the data left in this substream
+    )
+
+class CopyDone(SharedMessage):
+    key = 'c'
+    struct = Struct()
+
+class EmptyQueryResponse(BackendMessage):
+    key = 'I'
+    struct = Struct()
+
+class CopyOutResponse(BackendMessage):
+    key = 'H'
+    struct = Struct(
+        "format" / Int8ub,
+        "columncount" / Int16ub,
+        "columns" / Array(this.columncount, Struct(
+            "format" / Int16ub
+        ))
+    )
+
+class ReadyForQuery(BackendMessage):
+    key = 'Z'
+    struct = Struct("state" / Enum(Byte,
+        idle=ord('I'),
+        in_transaction_block=ord('T'),
+        in_failed_transaction_block=ord('E')
+    ))
+
+class CommandComplete(BackendMessage):
+    key = 'C'
+    struct = Struct(
+        "command" / CString("ascii")
+    )
+
+class RowDescription(BackendMessage):
+    key = 'T'
+    struct = Struct(
+        "fieldcount" / Int16ub,
+        "fields" / Array(this.fieldcount, Struct(
+            "_type" / Computed("F"),
+            "name" / CString("ascii"),
+            "tableoid" / Int32ub,
+            "colattrnum" / Int16ub,
+            "typoid" / Int32ub,
+            "typlen" / Int16sb,
+            "typmod" / Int32sb,
+            "format_code" / Int16ub,
+        ))
+    )
+
+class DataRow(BackendMessage):
+    key = 'D'
+    struct = Struct(
+        "_type" / Computed("data_row"),
+        "columncount" / Int16ub,
+        "columns" / Array(this.columncount, Struct(
+            "_type" / Computed("C"),
+            "length" / Int16sb,
+            "value" / Bytes(this.length)
+        ))
+    )
+
+class AuthenticationOk(BackendMessage):
+    key = 'R'
+    struct = Struct()
+
+class ParameterStatus(BackendMessage):
+    key = 'S'
+    struct = Struct(
+        "name" / CString("ASCII"),
+        "value" / CString("ASCII"),
+    )
+
+    def print(message):
+        name, value = ParameterStatus.normalize(message.name, message.value)
+        return "ParameterStatus({}={})".format(name, value)
+
+    @staticmethod
+    def normalize(name, value):
+        if name in ('TimeZone', 'server_version'):
+            value = 'XXX'
+        return (name, value)
+
+class BackendKeyData(BackendMessage):
+    key = 'K'
+    struct = Struct(
+        "pid" / Int32ub,
+        "key" / Bytes(4)
+    )
+
+    def print(message):
+        # Both of these should be censored, for reproducible regression test output
+        return "BackendKeyData(XXX)"
+
+frontend_switch = Switch(
+    this.type,
+    {**FrontendMessage.name_to_struct(), **SharedMessage.name_to_struct()},
+    default=Bytes(this.length - 4)
+)
+
+backend_switch = Switch(
+    this.type,
+    {**BackendMessage.name_to_struct(), **SharedMessage.name_to_struct()},
+    default=Bytes(this.length - 4)
+)
+
+frontend_msgtypes = Enum(Byte, **{
+    **FrontendMessage.name_to_key(),
+    **SharedMessage.name_to_key()
+})
+
+backend_msgtypes = Enum(Byte, **{
+    **BackendMessage.name_to_key(),
+    **SharedMessage.name_to_key()
+})
+
+# It might seem a little circuitous to say a frontend message is a kind of frontend
+# message, but this lets us easily customize how they're printed.
+
+class Frontend(FrontendMessage):
+    struct = Struct(
+        "type" / frontend_msgtypes,
+        "length" / Int32ub,  # "32-bit unsigned big-endian"
+        "raw_body" / Bytes(this.length - 4),
+        # try to parse the body into something more structured than raw bytes
+        "body" / RestreamData(this.raw_body, frontend_switch),
+    )
+
+    def print(message):
+        if isinstance(message.body, bytes):
+            return "Frontend(type={},body={})".format(
+                chr(message.type), message.body
+            )
+        return FrontendMessage.print_message(message.body)
+
+    def typeof(message):
+        if isinstance(message.body, bytes):
+            return "Unknown"
+        return message.body._type
+
+class Backend(BackendMessage):
+    struct = Struct(
+        "type" / backend_msgtypes,
+        "length" / Int32ub,  # "32-bit unsigned big-endian"
+        "raw_body" / Bytes(this.length - 4),
+        # try to parse the body into something more structured than raw bytes
+        "body" / RestreamData(this.raw_body, backend_switch),
+    )
+
+    def print(message):
+        if isinstance(message.body, bytes):
+            return "Backend(type={},body={})".format(
+                chr(message.type), message.body
+            )
+        return BackendMessage.print_message(message.body)
+
+    def typeof(message):
+        if isinstance(message.body, bytes):
+            return "Unknown"
+        return message.body._type
+
+# GreedyRange keeps reading messages until we hit EOF
+frontend_messages = GreedyRange(Frontend.struct)
+backend_messages = GreedyRange(Backend.struct)
+
+def parse(message, from_frontend=True):
+    if from_frontend:
+        message = frontend_messages.parse(message)
+    else:
+        message = backend_messages.parse(message)
+    message.from_frontend = from_frontend
+
+    return message
+
+def print(message):
+    if message.from_frontend:
+        return FrontendMessage.print_message(message)
+    return BackendMessage.print_message(message)
+
+def message_type(message, from_frontend):
+    if from_frontend:
+        return FrontendMessage.find_typeof(message)
+    return BackendMessage.find_typeof(message)
+
+def message_matches(message, filters, from_frontend):
+    '''
+    Message is something like Backend(Query) and filters is something like query="COPY".
+ + For now we only support strings, and treat them like a regex, which is matched against + the content of the wrapped message + ''' + if message._type != 'Backend' and message._type != 'Frontend': + raise ValueError("can't handle {}".format(message._type)) + + wrapped = message.body + if isinstance(wrapped, bytes): + # we don't know which kind of message this is, so we can't match against it + return False + + for key, value in filters.items(): + if not isinstance(value, str): + raise ValueError("don't yet know how to handle {}".format(type(value))) + + actual = getattr(wrapped, key) + + if not re.search(value, actual): + return False + + return True diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index e1b2e0a31..6274a8363 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -19,6 +19,8 @@ use Getopt::Long; use File::Spec::Functions; use File::Path qw(make_path remove_tree); use Config; +use POSIX qw( WNOHANG mkfifo ); +use Cwd 'abs_path'; sub Usage() { @@ -42,6 +44,7 @@ sub Usage() print " --valgrind-log-file Path to the write valgrind logs\n"; print " --pg_ctl-timeout Timeout for pg_ctl\n"; print " --connection-timeout Timeout for connecting to worker nodes\n"; + print " --mitmproxy Start a mitmproxy for one of the workers\n"; exit 1; } @@ -67,9 +70,12 @@ my $valgrindPath = "valgrind"; my $valgrindLogFile = "valgrind_test_log.txt"; my $pgCtlTimeout = undef; my $connectionTimeout = 5000; +my $useMitmproxy = 0; +my $mitmFifoPath = catfile("tmp_check", "mitmproxy.fifo"); my $serversAreShutdown = "TRUE"; my $usingWindows = 0; +my $mitmPid = 0; if ($Config{osname} eq "MSWin32") { @@ -93,6 +99,7 @@ GetOptions( 'valgrind-log-file=s' => \$valgrindLogFile, 'pg_ctl-timeout=s' => \$pgCtlTimeout, 'connection-timeout=s' => \$connectionTimeout, + 'mitmproxy' => \$useMitmproxy, 'help' => sub { Usage() }); # Update environment to include [DY]LD_LIBRARY_PATH/LIBDIR/etc - @@ -181,6 +188,11 @@ are present. MESSAGE } +if ($useMitmproxy) +{ + system("mitmdump --version") == 0 or die "make sure mitmdump is on PATH"; +} + # If pgCtlTimeout is defined, we will set related environment variable. # This is generally used with valgrind because valgrind starts slow and we # need to increase timeout. @@ -292,6 +304,23 @@ push(@pgOptions, '-c', "citus.remote_task_check_interval=1ms"); push(@pgOptions, '-c', "citus.shard_replication_factor=2"); push(@pgOptions, '-c', "citus.node_connection_timeout=${connectionTimeout}"); +if ($useMitmproxy) +{ + # make tests reproducible by never trying to negotiate ssl + push(@pgOptions, '-c', "citus.node_conninfo=sslmode=disable"); +} + +if ($useMitmproxy) +{ + if (! -e "tmp_check") + { + make_path("tmp_check") or die 'could not create tmp_check directory'; + } + my $absoluteFifoPath = abs_path($mitmFifoPath); + die 'abs_path returned empty string' unless ($absoluteFifoPath ne ""); + push(@pgOptions, '-c', "citus.mitmfifo=$absoluteFifoPath"); +} + if ($followercluster) { push(@pgOptions, '-c', "max_wal_senders=10"); @@ -507,10 +536,48 @@ sub ShutdownServers() or warn "Could not shutdown worker server"; } } + if ($mitmPid != 0) + { + # '-' means signal the process group, 2 is SIGINT + kill(-2, $mitmPid) or warn "could not interrupt mitmdump"; + } $serversAreShutdown = "TRUE"; } } +if ($useMitmproxy) +{ + if (! -e $mitmFifoPath) + { + mkfifo($mitmFifoPath, 0777) or die "could not create fifo"; + } + + if (! 
-p $mitmFifoPath) + { + die "a file already exists at $mitmFifoPath, delete it before trying again"; + } + + my $childPid = fork(); + + die("Failed to fork\n") + unless (defined $childPid); + + die("No child process\n") + if ($childPid < 0); + + $mitmPid = $childPid; + + if ($mitmPid eq 0) { + setpgrp(0,0); # we're about to spawn both a shell and a mitmdump, kill them as a group + exec("mitmdump --rawtcp -p 57640 --mode reverse:localhost:57638 -s mitmscripts/fluent.py --set fifo=$mitmFifoPath --set flow_detail=0 --set termlog_verbosity=warn >proxy.output 2>&1"); + die 'could not start mitmdump'; + } +} + +$SIG{CHLD} = sub { + while ((my $waitpid = waitpid(-1, WNOHANG)) > 0) {} +}; # If, for some reason, mitmproxy dies before we do + # Set signals to shutdown servers $SIG{INT} = \&ShutdownServers; $SIG{QUIT} = \&ShutdownServers; @@ -538,6 +605,7 @@ if ($valgrind) replace_postgres(); } + # Signal that servers should be shutdown $serversAreShutdown = "FALSE"; @@ -672,7 +740,6 @@ my @arguments = ( "--host", $host, '--port', $masterPort, '--user', $user, -# '--bindir', 'C:\Users\Administrator\Downloads\pg-64\bin', '--bindir', catfile("tmp_check", "tmp-bin") ); diff --git a/src/test/regress/sql/failure_setup.sql b/src/test/regress/sql/failure_setup.sql new file mode 100644 index 000000000..695c47c3c --- /dev/null +++ b/src/test/regress/sql/failure_setup.sql @@ -0,0 +1,5 @@ +SELECT citus.mitmproxy('conn.allow()'); + +-- add the workers +SELECT master_add_node('localhost', :worker_1_port); -- the second worker +SELECT master_add_node('localhost', :worker_2_port + 2); -- the first worker, behind a mitmproxy diff --git a/src/test/regress/sql/failure_test_helpers.sql b/src/test/regress/sql/failure_test_helpers.sql new file mode 100644 index 000000000..13a9ea24a --- /dev/null +++ b/src/test/regress/sql/failure_test_helpers.sql @@ -0,0 +1,49 @@ +-- By default Citus makes lots of connections in the background which fill up the log +-- By tweaking these settings you can make sure you only capture packets related to what +-- you're doing +ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; +ALTER SYSTEM SET citus.recover_2pc_interval TO -1; +ALTER SYSTEM set citus.enable_statistics_collection TO false; +SELECT pg_reload_conf(); + +-- Add some helper functions for sending commands to mitmproxy + +CREATE FUNCTION citus.mitmproxy(text) RETURNS TABLE(result text) AS $$ +DECLARE + command ALIAS FOR $1; +BEGIN + CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP; + CREATE TEMPORARY TABLE mitmproxy_result (res text) ON COMMIT DROP; + + INSERT INTO mitmproxy_command VALUES (command); + + EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo')); + EXECUTE format('COPY mitmproxy_result FROM %L', current_setting('citus.mitmfifo')); + + RETURN QUERY SELECT * FROM mitmproxy_result; +END; +$$ LANGUAGE plpgsql; + +CREATE FUNCTION citus.clear_network_traffic() RETURNS void AS $$ +BEGIN + PERFORM citus.mitmproxy('recorder.reset()'); + RETURN; -- return void +END; +$$ LANGUAGE plpgsql; + +CREATE FUNCTION citus.dump_network_traffic() +RETURNS TABLE(conn int, source text, message text) AS $$ +BEGIN + CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP; + CREATE TEMPORARY TABLE mitmproxy_result ( + conn int, source text, message text + ) ON COMMIT DROP; + + INSERT INTO mitmproxy_command VALUES ('recorder.dump()'); + + EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo')); + EXECUTE format('COPY mitmproxy_result FROM 
%L', current_setting('citus.mitmfifo')); + + RETURN QUERY SELECT * FROM mitmproxy_result; +END; +$$ LANGUAGE plpgsql;
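+
+-- A usage sketch (hypothetical session): with citus.mitmfifo pointing at the proxy's
+-- fifo (see mitmscripts/README), a failure test might run:
+--
+--   SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()');
+--   SELECT citus.dump_network_traffic();   -- inspect what was sent, and by whom
+--   SELECT citus.mitmproxy('conn.allow()');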