pytact.fake_python_server
1import contextlib 2import sys 3import socket 4import socketserver 5import argparse 6import pytact.graph_visualize as gv 7from pytact.data_reader import (capnp_message_generator, ProofState, 8 TacticPredictionGraph, TacticPredictionsGraph, 9 TacticPredictionText, TacticPredictionsText, 10 GlobalContextMessage, CheckAlignmentMessage, CheckAlignmentResponse) 11 12def text_prediction_loop(context : GlobalContextMessage): 13 tactics = [ 'idtac "is it working?"', 'idtac "yes it is working!"', 'auto' ] 14 prediction_requests = context.prediction_requests 15 for msg in prediction_requests: 16 if isinstance(msg, ProofState): 17 proof_state = msg 18 print(proof_state.text) 19 preds = [TacticPredictionText(t, 0.5) for t in tactics] 20 prediction_requests.send(TacticPredictionsText(preds)) 21 elif isinstance(msg, CheckAlignmentMessage): 22 alignment = CheckAlignmentResponse([], []) 23 prediction_requests.send(alignment) 24 elif isinstance(msg, GlobalContextMessage): 25 text_prediction_loop(msg) 26 else: 27 raise Exception("Capnp protocol error") 28 29def graph_initialize_loop(context : GlobalContextMessage, level): 30 print(f"level {level}") 31 for cluster in context.definitions.clustered_definitions(full = False): 32 print('cluster:') 33 for d in cluster: 34 print(f' {d.name}') 35 for t in context.tactics: 36 print(t) 37 print(context.log_annotation) 38 prediction_requests = context.prediction_requests 39 cool_definitions = [ d.node for d in context.definitions.definitions() if d.name == "Coq.Init.Logic.I" ] 40 zeroArgs = [t.ident for t in context.tactics if t.parameters == 0] 41 oneArg = [t.ident for t in context.tactics if t.parameters == 1] 42 for msg in prediction_requests: 43 if isinstance(msg, ProofState): 44 proof_state = msg 45 gv.visualize_proof_state(proof_state) 46 preds = [TacticPredictionGraph(t, [], 0.5) for t in zeroArgs] 47 if len(proof_state.context) > 0: 48 hyp_node = proof_state.context[0] 49 preds += [TacticPredictionGraph(t, [hyp_node], 0.5) for t in oneArg] 50 for d in cool_definitions: 51 preds += [TacticPredictionGraph(t, [d], 0.5) for t in oneArg] 52 prediction_requests.send(TacticPredictionsGraph(preds)) 53 elif isinstance(msg, CheckAlignmentMessage): 54 unknown_definitions = list(context.definitions.definitions()) 55 unknown_tactics = [t.ident for t in context.tactics] 56 alignment = CheckAlignmentResponse(unknown_definitions, unknown_tactics) 57 prediction_requests.send(alignment) 58 elif isinstance(msg, GlobalContextMessage): 59 graph_initialize_loop(msg, level + 1) 60 else: 61 raise Exception("Capnp protocol error") 62 63def run_session(args, capnp_socket, record_file): 64 messages_generator = capnp_message_generator(capnp_socket, record_file) 65 if args.mode == 'text': 66 print('Python server running in text mode') 67 text_prediction_loop(messages_generator) 68 elif args.mode == 'graph': 69 print('Python server running in graph mode') 70 graph_initialize_loop(messages_generator, 0) 71 else: 72 raise Exception("The 'mode' argument needs to be either 'text' or 'graph'") 73 74def main(): 75 sys.setrecursionlimit(10000) 76 parser = argparse.ArgumentParser( 77 description = "Example python server capable of communicating with Coq through Tactician's 'synth' tactic", 78 formatter_class=argparse.ArgumentDefaultsHelpFormatter) 79 80 parser.add_argument('mode', 81 type=str, 82 choices=['graph', 'text'], 83 help='"graph" to communicate in graph-mode, "text" to communicate in text-mode') 84 parser.add_argument('--tcp', 85 type = int, 86 default = 0, 87 help='Run in tcp mode instead of stdin mode on the specified port.') 88 parser.add_argument('--record', 89 dest="record_file", 90 type = str, 91 default = None, 92 help='Record all exchanged messages to the specified file, so that they can later be ' + 93 'replayed through "pytact-fake-coq"') 94 args = parser.parse_args() 95 96 if args.record_file is not None: 97 record_context = open(args.record_file, 'wb') 98 else: 99 record_context = contextlib.nullcontext() 100 with record_context as record_file: 101 if args.tcp != 0: 102 class Handler(socketserver.BaseRequestHandler): 103 def handle(self): 104 run_session(args, self.request, record_file) 105 class Server(socketserver.ThreadingTCPServer): 106 def __init__(self, *kwargs): 107 self.allow_reuse_address = True 108 self.daemon_threads = True 109 super().__init__(*kwargs) 110 addr = ('localhost', args.tcp) 111 with Server(addr, Handler) as server: 112 server.daemon_threads = True 113 server.serve_forever() 114 else: 115 capnp_socket = socket.socket(fileno=sys.stdin.fileno()) 116 run_session(args, capnp_socket, record_file) 117 118if __name__ == '__main__': 119 main()
13def text_prediction_loop(context : GlobalContextMessage): 14 tactics = [ 'idtac "is it working?"', 'idtac "yes it is working!"', 'auto' ] 15 prediction_requests = context.prediction_requests 16 for msg in prediction_requests: 17 if isinstance(msg, ProofState): 18 proof_state = msg 19 print(proof_state.text) 20 preds = [TacticPredictionText(t, 0.5) for t in tactics] 21 prediction_requests.send(TacticPredictionsText(preds)) 22 elif isinstance(msg, CheckAlignmentMessage): 23 alignment = CheckAlignmentResponse([], []) 24 prediction_requests.send(alignment) 25 elif isinstance(msg, GlobalContextMessage): 26 text_prediction_loop(msg) 27 else: 28 raise Exception("Capnp protocol error")
30def graph_initialize_loop(context : GlobalContextMessage, level): 31 print(f"level {level}") 32 for cluster in context.definitions.clustered_definitions(full = False): 33 print('cluster:') 34 for d in cluster: 35 print(f' {d.name}') 36 for t in context.tactics: 37 print(t) 38 print(context.log_annotation) 39 prediction_requests = context.prediction_requests 40 cool_definitions = [ d.node for d in context.definitions.definitions() if d.name == "Coq.Init.Logic.I" ] 41 zeroArgs = [t.ident for t in context.tactics if t.parameters == 0] 42 oneArg = [t.ident for t in context.tactics if t.parameters == 1] 43 for msg in prediction_requests: 44 if isinstance(msg, ProofState): 45 proof_state = msg 46 gv.visualize_proof_state(proof_state) 47 preds = [TacticPredictionGraph(t, [], 0.5) for t in zeroArgs] 48 if len(proof_state.context) > 0: 49 hyp_node = proof_state.context[0] 50 preds += [TacticPredictionGraph(t, [hyp_node], 0.5) for t in oneArg] 51 for d in cool_definitions: 52 preds += [TacticPredictionGraph(t, [d], 0.5) for t in oneArg] 53 prediction_requests.send(TacticPredictionsGraph(preds)) 54 elif isinstance(msg, CheckAlignmentMessage): 55 unknown_definitions = list(context.definitions.definitions()) 56 unknown_tactics = [t.ident for t in context.tactics] 57 alignment = CheckAlignmentResponse(unknown_definitions, unknown_tactics) 58 prediction_requests.send(alignment) 59 elif isinstance(msg, GlobalContextMessage): 60 graph_initialize_loop(msg, level + 1) 61 else: 62 raise Exception("Capnp protocol error")
def
run_session(args, capnp_socket, record_file):
64def run_session(args, capnp_socket, record_file): 65 messages_generator = capnp_message_generator(capnp_socket, record_file) 66 if args.mode == 'text': 67 print('Python server running in text mode') 68 text_prediction_loop(messages_generator) 69 elif args.mode == 'graph': 70 print('Python server running in graph mode') 71 graph_initialize_loop(messages_generator, 0) 72 else: 73 raise Exception("The 'mode' argument needs to be either 'text' or 'graph'")
def
main():
75def main(): 76 sys.setrecursionlimit(10000) 77 parser = argparse.ArgumentParser( 78 description = "Example python server capable of communicating with Coq through Tactician's 'synth' tactic", 79 formatter_class=argparse.ArgumentDefaultsHelpFormatter) 80 81 parser.add_argument('mode', 82 type=str, 83 choices=['graph', 'text'], 84 help='"graph" to communicate in graph-mode, "text" to communicate in text-mode') 85 parser.add_argument('--tcp', 86 type = int, 87 default = 0, 88 help='Run in tcp mode instead of stdin mode on the specified port.') 89 parser.add_argument('--record', 90 dest="record_file", 91 type = str, 92 default = None, 93 help='Record all exchanged messages to the specified file, so that they can later be ' + 94 'replayed through "pytact-fake-coq"') 95 args = parser.parse_args() 96 97 if args.record_file is not None: 98 record_context = open(args.record_file, 'wb') 99 else: 100 record_context = contextlib.nullcontext() 101 with record_context as record_file: 102 if args.tcp != 0: 103 class Handler(socketserver.BaseRequestHandler): 104 def handle(self): 105 run_session(args, self.request, record_file) 106 class Server(socketserver.ThreadingTCPServer): 107 def __init__(self, *kwargs): 108 self.allow_reuse_address = True 109 self.daemon_threads = True 110 super().__init__(*kwargs) 111 addr = ('localhost', args.tcp) 112 with Server(addr, Handler) as server: 113 server.daemon_threads = True 114 server.serve_forever() 115 else: 116 capnp_socket = socket.socket(fileno=sys.stdin.fileno()) 117 run_session(args, capnp_socket, record_file)