pytact.prover
a test of connection to coq-tactician-api
1""" 2a test of connection to coq-tactician-api 3""" 4 5import sys 6import asyncio 7import socket 8import argparse 9import functools 10import logging 11import itertools 12 13from typing import List, Optional, Iterable 14 15import pytact.common 16 17from pytact.common import Proving, capnpLocalTactic, runTactic 18from pytact.graph_visualize import Visualizer 19 20# Set up logging 21 22logger = logging.getLogger(__name__) 23 24 25# Open a new proof given a nameless theorem statement (str) 26async def open_proof(pull, theorem: str, vis: Visualizer): 27 logger.info("awaiting pull") 28 resp = await pull.reinforce(theorem).a_wait() 29 if not vis is None: 30 vis.render(resp.result) 31 32 available_cap = resp.available 33 34 logger.info("awaiting awailable tactics") 35 available = await available_cap.tactics().a_wait() 36 37 for tac in available.tactics: 38 logger.info("awaiting printTactic") 39 tac_str = await available_cap.printTactic(tac.ident).a_wait() 40 logger.info( 41 "available tactic: ident %s, parameters %d, tac_str %s", 42 tac.ident, tac.parameters, tac_str.tactic) 43 44 return resp.result, available.tactics 45 46class ProtocolError(Exception): 47 """ 48 need more work on handling the errors 49 """ 50 51async def search_dfs(result, tactics, limit) -> Optional[List[capnpLocalTactic]]: 52 """ 53 launches recursive dfs proof search from the current proof search state of result 54 if proof is found, returns the list of actions to execute to reach the proof" 55 in Ocaml notations the return of search_dfs is None | Some of List 56 57 58 TODO: currently is is simplistic dfs that assumes that the proof search graph is a Tree 59 (i.e. that actions do not bring back the visited states) 60 61 but as the proof search graph is not a tree, at the minimum we 62 need to maintain the hashmap of the visited proof states and 63 don't return and expand the visited proof states 64 65 TODO: to perform this correctly we have to construct the correct 66 invariant of the proof state, a proper faithful hash 67 representation that completely identifies isomorphic 68 proof states and distinguished non-isomorphic proof states 69 """ 70 71 if limit == 0: 72 return None # dfs limit exceeded 73 74 logger.info('received result of type %s', result.which()) 75 76 if result.which() == 'protocolError': 77 logger.error('received %s', result.protocolError.which()) 78 raise ProtocolError # our code is wrong 79 elif result.which() == 'failure': 80 return None # dead branch, coq can't execute 81 elif result.which() == 'complete': 82 print('COMPLETE') 83 return [] # proof found :)! hurrah 84 elif result.which() == 'newState': 85 logger.info('considering %s', result.which()) 86 root: int = result.newState.state.root 87 context: List[int] = [n.nodeIndex for n in result.newState.state.context] 88 logger.info('root is %d, context is %s', root, repr(context)) 89 90 actions = [] 91 for tac in tactics: 92 for arg in itertools.product(context, repeat=tac.parameters): 93 action = capnpLocalTactic(tac.ident, arg) 94 actions.append(action) 95 logger.info('expanding search node on %d actions available', len(actions)) 96 for action in actions: 97 next_result = await runTactic(result.newState.obj, action) 98 next_solution = await search_dfs(next_result, tactics, limit-1) 99 if not next_solution is None: 100 next_solution.append(action) 101 return next_solution 102 return None 103 else: 104 raise Exception # it supposed to be exhaustive 105 106 107async def example_script_prover(args, pull): 108 TAC_INTROS = 4249207563281686782 109 TAC_APPLY = 3192827940261208899 110 result, _ = await open_proof(pull, "forall A B C : Prop, (A -> B -> C) -> A -> B -> C", args.vis) 111 result = await runTactic(result.newState.obj, capnpLocalTactic(TAC_INTROS, []), args.vis) 112 result = await runTactic(result.newState.obj, "intro", args.vis) 113 result = await runTactic(result.newState.obj, capnpLocalTactic(TAC_INTROS, []), args.vis) 114 result = await runTactic(result.newState.obj, capnpLocalTactic(TAC_INTROS, []), args.vis) 115 result = await runTactic(result.newState.obj, capnpLocalTactic(TAC_INTROS, []), args.vis) 116 result = await runTactic(result.newState.obj, capnpLocalTactic(TAC_INTROS, []), args.vis) 117 result = await runTactic(result.newState.obj, capnpLocalTactic(TAC_APPLY, [6]), args.vis) 118 result = await runTactic(result.newState.obj, "apply H0", args.vis) 119 result = await runTactic(result.newState.obj, capnpLocalTactic(TAC_APPLY, [8]), args.vis) 120 assert result.which() == 'complete' # if this is correct proof 121 122 123async def dfs_prover(theorems: Iterable[str], args, pull): 124 for theorem in theorems: 125 print(f"DFS starts theorem {theorem}", end='') 126 result, tactics = await open_proof(pull, theorem.strip(), vis=None) 127 solution = await search_dfs(result, tactics, args.dfs_limit) 128 if not solution is None: 129 logging.info("Proof found:") 130 for action in reversed(solution): 131 print(action) 132 result = await runTactic(result.newState.obj, action) 133 if args.vis is not None: 134 args.vis.render(result) 135 else: 136 logging.info("Proof not found") 137 138 139async def client_connected_cb(counter, args, reader, writer): 140 logger.info("client connected to python server socket, remaining connections %d", counter.cnt) 141 142 proving = Proving(reader, writer) 143 144 if args.dfs: 145 if not args.inputfile is None: 146 theorems = open(args.inputfile, 'r') 147 else: 148 theorems = sys.stdin 149 150 with theorems: 151 await proving.run_prover(functools.partial(dfs_prover, theorems, args)) 152 else: 153 await proving.run_prover(functools.partial(example_script_prover, args)) 154 155 counter.dec() 156 logger.info("client connected cb finished %d", counter.cnt) 157 158 159def my_parse_args(): 160 parser = argparse.ArgumentParser( 161 description='example of python code interacting with coq-tactician-api') 162 163 parser.add_argument('--interactive', 164 action='store_true', 165 help='drop to the python shell after proof execution') 166 167 parser.add_argument('--loglevel', 168 default='INFO', 169 help='set logging.loglevel to be DEBUG, INFO, WARNING, ERROR, CRITICAL') 170 171 172 parser.add_argument('--show-labels', 173 action='store_true', 174 help=('show labels on visualized graph') 175 ) 176 177 parser.add_argument('--pdfsequence', 178 action='store_true', 179 help=('write visualization output to a sequence of pdfs' 180 'graph0.pdf, graph1.pdf, ...')) 181 182 parser.add_argument('--pdfname', 183 type=str, 184 default=None, 185 help=('name of pdf file(s) for graph visualization')) 186 187 parser.add_argument('--dot', 188 action='store_true', 189 help='keep .dot files produced for graph visualization') 190 191 192 parser.add_argument('--tcp', action='store_true', 193 help='drive coq-tactician-api with tcp/ip instead of stdin') 194 195 parser.add_argument('--tcp-sessions', 196 type = int, 197 default= -1, 198 help='number of tcp-sessions to serve, with -1 standing for infinity') 199 200 parser.add_argument('--ip', type=str, 201 default="127.0.0.1", 202 help='run python server on this ip') 203 204 parser.add_argument('--port', type=int, 205 default=33333, 206 help='run python server on this port') 207 208 parser.add_argument('--with-coq', action='store_true', 209 help='applies only to tcp mode: launch coq as a subprocess') 210 211 212 parser.add_argument('--coqfile', type=str, default=None, 213 help=('path to coq source code file (.v extension) to execute' 214 'in coq-tactician-api' 215 f'the default is {pytact.common.test_filename_stdin}' 216 f'for --tcp --with-coq default is {pytact.common.test_filename_tcp}')) 217 218 parser.add_argument('--inputfile', type=str, default=None, 219 help=('the filename of the inputfile file with a list of' 220 'independent context free propositions to prove' 221 'one proposition per line, for exampel "forall A:Prop, A->A" (without double quotes)' 222 'no keyword theorem and point in the end is required')) 223 parser.add_argument('--testfile', type=str, default=None, 224 help=('the name of the test input file to run' 225 'that is included in this package')) 226 227 parser.add_argument('--dfs', action='store_true', 228 help='run dfs search instead of a script') 229 230 parser.add_argument('--dfs-limit', type=int, 231 default=50, 232 help=('number of dfs proof search tree total node visits')) 233 234 235 args = parser.parse_args() 236 if not args.testfile is None: 237 args.inputfile = pytact.common.testfile(args.testfile) 238 239 if args.coqfile is None: 240 if (args.tcp and args.with_coq): 241 args.coqfile = pytact.common.test_filename_tcp 242 else: 243 args.coqfile = pytact.common.test_filename_stdin 244 245 numeric_level = getattr(logging, args.loglevel.upper(), None) 246 if not isinstance(numeric_level, int): 247 raise ValueError('Invalid log level: %s' % args.loglevel) 248 249 args.loglevel = numeric_level 250 251 if not args.pdfname is None: 252 args.vis = Visualizer(args.pdfname, args.pdfsequence, 253 args.show_labels, cleanup=not args.dot) 254 else: 255 args.vis = None 256 257 258 print(f"Python: running with tcp_enabled = {args.tcp} on" 259 f"coqfile {args.coqfile} on inputfile {args.inputfile}" 260 f"with visualization output to {args.pdfname}") 261 262 return args 263 264 265# this is a helper class to support finite number of 266# evaluated connections in case of running 267# tcp server on python side 268# to signal finishing serving event 269# normally serving will be finished 270# with events like reaching the certain 271# number of epochs, etc 272 273class Counter: 274 def __init__(self, n: int): 275 self.cnt = n 276 self.event = asyncio.Event() 277 278 def dec(self): 279 self.cnt -= 1 280 if (self.cnt == 0): 281 self.event.set() 282 283 async def waiter(self): 284 await self.event.wait() 285 286 287async def a_main(args): 288 counter = Counter(args.tcp_sessions) 289 290 call_back = functools.partial(client_connected_cb, counter, args) 291 if args.tcp: 292 py_ip = args.ip 293 py_port = args.port 294 logger.info("starting a tcp/ip server on %s %d", py_ip, py_port) 295 server = await asyncio.start_server(call_back, host=py_ip, port=py_port) 296 if args.with_coq: 297 print("Python: launching coqc in subprocess!!", file=sys.stderr) 298 proc = await asyncio.create_subprocess_exec( 299 'tactician', 'exec', 'coqc', args.coqfile, 300 stdin=None, 301 stdout=asyncio.subprocess.PIPE, 302 stderr=asyncio.subprocess.PIPE) 303 #coqout, coqerr = await proc.communicate() 304 #print("proc finished!!", file=sys.stderr, flush=True) 305 306 307 #with open('coq_out.txt','wb') as f: 308 # f.write(coqout) 309 #with open('coq_err.txt','wb') as f: 310 # f.write(coqerr) 311 312 async with server: 313 server_task = asyncio.create_task(server.serve_forever()) 314 waiter_task = asyncio.create_task(counter.waiter()) 315 await asyncio.wait([server_task, waiter_task], return_when=asyncio.FIRST_COMPLETED) 316 logger.info("server task %s", repr(server_task)) 317 logger.info("waiter task %s", repr(waiter_task)) 318 319 if server_task.done() and server.taks.exception(): 320 logger.error("server task exception %s", repr(server_task.exception())) 321 raise server.taks.exception() 322 323 else: 324 print("Python: creating Unix socketpair and giving the other end as stdin to coqc", 325 file=sys.stderr) 326 327 py_sock, coq_sock = socket.socketpair() 328 print(f"running tactician exec coqc {args.coqfile}") 329 proc = await asyncio.create_subprocess_exec( 330 'tactician', 'exec', 'coqc', args.coqfile, 331 stdin=coq_sock, 332 stdout=asyncio.subprocess.PIPE, 333 stderr=asyncio.subprocess.PIPE) 334 coq_sock.close() 335 336 reader, writer = await asyncio.open_connection(sock=py_sock) 337 await call_back(reader, writer) 338 print("Python: call_back is returned!") 339 340 341 342def main(): 343 args = my_parse_args() 344 345 logging.basicConfig(level=args.loglevel) 346 asyncio.run(a_main(args), debug=args.loglevel) 347 348if __name__ == '__main__': 349 main()
27async def open_proof(pull, theorem: str, vis: Visualizer): 28 logger.info("awaiting pull") 29 resp = await pull.reinforce(theorem).a_wait() 30 if not vis is None: 31 vis.render(resp.result) 32 33 available_cap = resp.available 34 35 logger.info("awaiting awailable tactics") 36 available = await available_cap.tactics().a_wait() 37 38 for tac in available.tactics: 39 logger.info("awaiting printTactic") 40 tac_str = await available_cap.printTactic(tac.ident).a_wait() 41 logger.info( 42 "available tactic: ident %s, parameters %d, tac_str %s", 43 tac.ident, tac.parameters, tac_str.tactic) 44 45 return resp.result, available.tactics
need more work on handling the errors
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- add_note
- args
52async def search_dfs(result, tactics, limit) -> Optional[List[capnpLocalTactic]]: 53 """ 54 launches recursive dfs proof search from the current proof search state of result 55 if proof is found, returns the list of actions to execute to reach the proof" 56 in Ocaml notations the return of search_dfs is None | Some of List 57 58 59 TODO: currently is is simplistic dfs that assumes that the proof search graph is a Tree 60 (i.e. that actions do not bring back the visited states) 61 62 but as the proof search graph is not a tree, at the minimum we 63 need to maintain the hashmap of the visited proof states and 64 don't return and expand the visited proof states 65 66 TODO: to perform this correctly we have to construct the correct 67 invariant of the proof state, a proper faithful hash 68 representation that completely identifies isomorphic 69 proof states and distinguished non-isomorphic proof states 70 """ 71 72 if limit == 0: 73 return None # dfs limit exceeded 74 75 logger.info('received result of type %s', result.which()) 76 77 if result.which() == 'protocolError': 78 logger.error('received %s', result.protocolError.which()) 79 raise ProtocolError # our code is wrong 80 elif result.which() == 'failure': 81 return None # dead branch, coq can't execute 82 elif result.which() == 'complete': 83 print('COMPLETE') 84 return [] # proof found :)! hurrah 85 elif result.which() == 'newState': 86 logger.info('considering %s', result.which()) 87 root: int = result.newState.state.root 88 context: List[int] = [n.nodeIndex for n in result.newState.state.context] 89 logger.info('root is %d, context is %s', root, repr(context)) 90 91 actions = [] 92 for tac in tactics: 93 for arg in itertools.product(context, repeat=tac.parameters): 94 action = capnpLocalTactic(tac.ident, arg) 95 actions.append(action) 96 logger.info('expanding search node on %d actions available', len(actions)) 97 for action in actions: 98 next_result = await runTactic(result.newState.obj, action) 99 next_solution = await search_dfs(next_result, tactics, limit-1) 100 if not next_solution is None: 101 next_solution.append(action) 102 return next_solution 103 return None 104 else: 105 raise Exception # it supposed to be exhaustive
launches recursive dfs proof search from the current proof search state of result if proof is found, returns the list of actions to execute to reach the proof" in Ocaml notations the return of search_dfs is None | Some of List
TODO: currently is is simplistic dfs that assumes that the proof search graph is a Tree (i.e. that actions do not bring back the visited states)
but as the proof search graph is not a tree, at the minimum we need to maintain the hashmap of the visited proof states and don't return and expand the visited proof states
TODO: to perform this correctly we have to construct the correct invariant of the proof state, a proper faithful hash representation that completely identifies isomorphic proof states and distinguished non-isomorphic proof states
108async def example_script_prover(args, pull): 109 TAC_INTROS = 4249207563281686782 110 TAC_APPLY = 3192827940261208899 111 result, _ = await open_proof(pull, "forall A B C : Prop, (A -> B -> C) -> A -> B -> C", args.vis) 112 result = await runTactic(result.newState.obj, capnpLocalTactic(TAC_INTROS, []), args.vis) 113 result = await runTactic(result.newState.obj, "intro", args.vis) 114 result = await runTactic(result.newState.obj, capnpLocalTactic(TAC_INTROS, []), args.vis) 115 result = await runTactic(result.newState.obj, capnpLocalTactic(TAC_INTROS, []), args.vis) 116 result = await runTactic(result.newState.obj, capnpLocalTactic(TAC_INTROS, []), args.vis) 117 result = await runTactic(result.newState.obj, capnpLocalTactic(TAC_INTROS, []), args.vis) 118 result = await runTactic(result.newState.obj, capnpLocalTactic(TAC_APPLY, [6]), args.vis) 119 result = await runTactic(result.newState.obj, "apply H0", args.vis) 120 result = await runTactic(result.newState.obj, capnpLocalTactic(TAC_APPLY, [8]), args.vis) 121 assert result.which() == 'complete' # if this is correct proof
124async def dfs_prover(theorems: Iterable[str], args, pull): 125 for theorem in theorems: 126 print(f"DFS starts theorem {theorem}", end='') 127 result, tactics = await open_proof(pull, theorem.strip(), vis=None) 128 solution = await search_dfs(result, tactics, args.dfs_limit) 129 if not solution is None: 130 logging.info("Proof found:") 131 for action in reversed(solution): 132 print(action) 133 result = await runTactic(result.newState.obj, action) 134 if args.vis is not None: 135 args.vis.render(result) 136 else: 137 logging.info("Proof not found")
140async def client_connected_cb(counter, args, reader, writer): 141 logger.info("client connected to python server socket, remaining connections %d", counter.cnt) 142 143 proving = Proving(reader, writer) 144 145 if args.dfs: 146 if not args.inputfile is None: 147 theorems = open(args.inputfile, 'r') 148 else: 149 theorems = sys.stdin 150 151 with theorems: 152 await proving.run_prover(functools.partial(dfs_prover, theorems, args)) 153 else: 154 await proving.run_prover(functools.partial(example_script_prover, args)) 155 156 counter.dec() 157 logger.info("client connected cb finished %d", counter.cnt)
160def my_parse_args(): 161 parser = argparse.ArgumentParser( 162 description='example of python code interacting with coq-tactician-api') 163 164 parser.add_argument('--interactive', 165 action='store_true', 166 help='drop to the python shell after proof execution') 167 168 parser.add_argument('--loglevel', 169 default='INFO', 170 help='set logging.loglevel to be DEBUG, INFO, WARNING, ERROR, CRITICAL') 171 172 173 parser.add_argument('--show-labels', 174 action='store_true', 175 help=('show labels on visualized graph') 176 ) 177 178 parser.add_argument('--pdfsequence', 179 action='store_true', 180 help=('write visualization output to a sequence of pdfs' 181 'graph0.pdf, graph1.pdf, ...')) 182 183 parser.add_argument('--pdfname', 184 type=str, 185 default=None, 186 help=('name of pdf file(s) for graph visualization')) 187 188 parser.add_argument('--dot', 189 action='store_true', 190 help='keep .dot files produced for graph visualization') 191 192 193 parser.add_argument('--tcp', action='store_true', 194 help='drive coq-tactician-api with tcp/ip instead of stdin') 195 196 parser.add_argument('--tcp-sessions', 197 type = int, 198 default= -1, 199 help='number of tcp-sessions to serve, with -1 standing for infinity') 200 201 parser.add_argument('--ip', type=str, 202 default="127.0.0.1", 203 help='run python server on this ip') 204 205 parser.add_argument('--port', type=int, 206 default=33333, 207 help='run python server on this port') 208 209 parser.add_argument('--with-coq', action='store_true', 210 help='applies only to tcp mode: launch coq as a subprocess') 211 212 213 parser.add_argument('--coqfile', type=str, default=None, 214 help=('path to coq source code file (.v extension) to execute' 215 'in coq-tactician-api' 216 f'the default is {pytact.common.test_filename_stdin}' 217 f'for --tcp --with-coq default is {pytact.common.test_filename_tcp}')) 218 219 parser.add_argument('--inputfile', type=str, default=None, 220 help=('the filename of the inputfile file with a list of' 221 'independent context free propositions to prove' 222 'one proposition per line, for exampel "forall A:Prop, A->A" (without double quotes)' 223 'no keyword theorem and point in the end is required')) 224 parser.add_argument('--testfile', type=str, default=None, 225 help=('the name of the test input file to run' 226 'that is included in this package')) 227 228 parser.add_argument('--dfs', action='store_true', 229 help='run dfs search instead of a script') 230 231 parser.add_argument('--dfs-limit', type=int, 232 default=50, 233 help=('number of dfs proof search tree total node visits')) 234 235 236 args = parser.parse_args() 237 if not args.testfile is None: 238 args.inputfile = pytact.common.testfile(args.testfile) 239 240 if args.coqfile is None: 241 if (args.tcp and args.with_coq): 242 args.coqfile = pytact.common.test_filename_tcp 243 else: 244 args.coqfile = pytact.common.test_filename_stdin 245 246 numeric_level = getattr(logging, args.loglevel.upper(), None) 247 if not isinstance(numeric_level, int): 248 raise ValueError('Invalid log level: %s' % args.loglevel) 249 250 args.loglevel = numeric_level 251 252 if not args.pdfname is None: 253 args.vis = Visualizer(args.pdfname, args.pdfsequence, 254 args.show_labels, cleanup=not args.dot) 255 else: 256 args.vis = None 257 258 259 print(f"Python: running with tcp_enabled = {args.tcp} on" 260 f"coqfile {args.coqfile} on inputfile {args.inputfile}" 261 f"with visualization output to {args.pdfname}") 262 263 return args
288async def a_main(args): 289 counter = Counter(args.tcp_sessions) 290 291 call_back = functools.partial(client_connected_cb, counter, args) 292 if args.tcp: 293 py_ip = args.ip 294 py_port = args.port 295 logger.info("starting a tcp/ip server on %s %d", py_ip, py_port) 296 server = await asyncio.start_server(call_back, host=py_ip, port=py_port) 297 if args.with_coq: 298 print("Python: launching coqc in subprocess!!", file=sys.stderr) 299 proc = await asyncio.create_subprocess_exec( 300 'tactician', 'exec', 'coqc', args.coqfile, 301 stdin=None, 302 stdout=asyncio.subprocess.PIPE, 303 stderr=asyncio.subprocess.PIPE) 304 #coqout, coqerr = await proc.communicate() 305 #print("proc finished!!", file=sys.stderr, flush=True) 306 307 308 #with open('coq_out.txt','wb') as f: 309 # f.write(coqout) 310 #with open('coq_err.txt','wb') as f: 311 # f.write(coqerr) 312 313 async with server: 314 server_task = asyncio.create_task(server.serve_forever()) 315 waiter_task = asyncio.create_task(counter.waiter()) 316 await asyncio.wait([server_task, waiter_task], return_when=asyncio.FIRST_COMPLETED) 317 logger.info("server task %s", repr(server_task)) 318 logger.info("waiter task %s", repr(waiter_task)) 319 320 if server_task.done() and server.taks.exception(): 321 logger.error("server task exception %s", repr(server_task.exception())) 322 raise server.taks.exception() 323 324 else: 325 print("Python: creating Unix socketpair and giving the other end as stdin to coqc", 326 file=sys.stderr) 327 328 py_sock, coq_sock = socket.socketpair() 329 print(f"running tactician exec coqc {args.coqfile}") 330 proc = await asyncio.create_subprocess_exec( 331 'tactician', 'exec', 'coqc', args.coqfile, 332 stdin=coq_sock, 333 stdout=asyncio.subprocess.PIPE, 334 stderr=asyncio.subprocess.PIPE) 335 coq_sock.close() 336 337 reader, writer = await asyncio.open_connection(sock=py_sock) 338 await call_back(reader, writer) 339 print("Python: call_back is returned!")