Edit on GitHub

pytact.fake_python_server

  1import contextlib
  2import sys
  3import socket
  4import socketserver
  5import argparse
  6import pytact.graph_visualize as gv
  7from pytact.data_reader import (capnp_message_generator, ProofState,
  8                                TacticPredictionGraph, TacticPredictionsGraph,
  9                                TacticPredictionText, TacticPredictionsText,
 10                                GlobalContextMessage, CheckAlignmentMessage, CheckAlignmentResponse)
 11
 12def text_prediction_loop(context : GlobalContextMessage):
 13    tactics = [ 'idtac "is it working?"', 'idtac "yes it is working!"', 'auto' ]
 14    prediction_requests = context.prediction_requests
 15    for msg in prediction_requests:
 16        if isinstance(msg, ProofState):
 17            proof_state = msg
 18            print(proof_state.text)
 19            preds = [TacticPredictionText(t, 0.5) for t in tactics]
 20            prediction_requests.send(TacticPredictionsText(preds))
 21        elif isinstance(msg, CheckAlignmentMessage):
 22            alignment = CheckAlignmentResponse([], [])
 23            prediction_requests.send(alignment)
 24        elif isinstance(msg, GlobalContextMessage):
 25            text_prediction_loop(msg)
 26        else:
 27            raise Exception("Capnp protocol error")
 28
 29def graph_initialize_loop(context : GlobalContextMessage, level):
 30    print(f"level {level}")
 31    for cluster in context.definitions.clustered_definitions(full = False):
 32        print('cluster:')
 33        for d in cluster:
 34            print(f'    {d.name}')
 35    for t in context.tactics:
 36        print(t)
 37    print(context.log_annotation)
 38    prediction_requests = context.prediction_requests
 39    cool_definitions = [ d.node for d in context.definitions.definitions() if d.name == "Coq.Init.Logic.I" ]
 40    zeroArgs = [t.ident for t in context.tactics if t.parameters == 0]
 41    oneArg = [t.ident for t in context.tactics if t.parameters == 1]
 42    for msg in prediction_requests:
 43        if isinstance(msg, ProofState):
 44            proof_state = msg
 45            gv.visualize_proof_state(proof_state)
 46            preds = [TacticPredictionGraph(t, [], 0.5) for t in zeroArgs]
 47            if len(proof_state.context) > 0:
 48                hyp_node = proof_state.context[0]
 49                preds += [TacticPredictionGraph(t, [hyp_node], 0.5) for t in oneArg]
 50            for d in cool_definitions:
 51                preds += [TacticPredictionGraph(t, [d], 0.5) for t in oneArg]
 52            prediction_requests.send(TacticPredictionsGraph(preds))
 53        elif isinstance(msg, CheckAlignmentMessage):
 54            unknown_definitions = list(context.definitions.definitions())
 55            unknown_tactics = [t.ident for t in context.tactics]
 56            alignment = CheckAlignmentResponse(unknown_definitions, unknown_tactics)
 57            prediction_requests.send(alignment)
 58        elif isinstance(msg, GlobalContextMessage):
 59            graph_initialize_loop(msg, level + 1)
 60        else:
 61            raise Exception("Capnp protocol error")
 62
 63def run_session(args, capnp_socket, record_file):
 64    messages_generator = capnp_message_generator(capnp_socket, record_file)
 65    if args.mode == 'text':
 66        print('Python server running in text mode')
 67        text_prediction_loop(messages_generator)
 68    elif args.mode == 'graph':
 69        print('Python server running in graph mode')
 70        graph_initialize_loop(messages_generator, 0)
 71    else:
 72        raise Exception("The 'mode' argument needs to be either 'text' or 'graph'")
 73
 74def main():
 75    sys.setrecursionlimit(10000)
 76    parser = argparse.ArgumentParser(
 77        description = "Example python server capable of communicating with Coq through Tactician's 'synth' tactic",
 78        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 79
 80    parser.add_argument('mode',
 81                        type=str,
 82                        choices=['graph', 'text'],
 83                        help='"graph" to communicate in graph-mode, "text" to communicate in text-mode')
 84    parser.add_argument('--tcp',
 85                        type = int,
 86                        default = 0,
 87                        help='Run in tcp mode instead of stdin mode on the specified port.')
 88    parser.add_argument('--record',
 89                        dest="record_file",
 90                        type = str,
 91                        default = None,
 92                        help='Record all exchanged messages to the specified file, so that they can later be ' +
 93                        'replayed through "pytact-fake-coq"')
 94    args = parser.parse_args()
 95
 96    if args.record_file is not None:
 97        record_context = open(args.record_file, 'wb')
 98    else:
 99        record_context = contextlib.nullcontext()
100    with record_context as record_file:
101        if args.tcp != 0:
102            class Handler(socketserver.BaseRequestHandler):
103                def handle(self):
104                    run_session(args, self.request, record_file)
105            class Server(socketserver.ThreadingTCPServer):
106                def __init__(self, *kwargs):
107                    self.allow_reuse_address = True
108                    self.daemon_threads = True
109                    super().__init__(*kwargs)
110            addr = ('localhost', args.tcp)
111            with Server(addr, Handler) as server:
112                server.daemon_threads = True
113                server.serve_forever()
114        else:
115            capnp_socket = socket.socket(fileno=sys.stdin.fileno())
116            run_session(args, capnp_socket, record_file)
117
118if __name__ == '__main__':
119    main()
def text_prediction_loop(context: pytact.data_reader.GlobalContextMessage):
13def text_prediction_loop(context : GlobalContextMessage):
14    tactics = [ 'idtac "is it working?"', 'idtac "yes it is working!"', 'auto' ]
15    prediction_requests = context.prediction_requests
16    for msg in prediction_requests:
17        if isinstance(msg, ProofState):
18            proof_state = msg
19            print(proof_state.text)
20            preds = [TacticPredictionText(t, 0.5) for t in tactics]
21            prediction_requests.send(TacticPredictionsText(preds))
22        elif isinstance(msg, CheckAlignmentMessage):
23            alignment = CheckAlignmentResponse([], [])
24            prediction_requests.send(alignment)
25        elif isinstance(msg, GlobalContextMessage):
26            text_prediction_loop(msg)
27        else:
28            raise Exception("Capnp protocol error")
def graph_initialize_loop(context: pytact.data_reader.GlobalContextMessage, level):
30def graph_initialize_loop(context : GlobalContextMessage, level):
31    print(f"level {level}")
32    for cluster in context.definitions.clustered_definitions(full = False):
33        print('cluster:')
34        for d in cluster:
35            print(f'    {d.name}')
36    for t in context.tactics:
37        print(t)
38    print(context.log_annotation)
39    prediction_requests = context.prediction_requests
40    cool_definitions = [ d.node for d in context.definitions.definitions() if d.name == "Coq.Init.Logic.I" ]
41    zeroArgs = [t.ident for t in context.tactics if t.parameters == 0]
42    oneArg = [t.ident for t in context.tactics if t.parameters == 1]
43    for msg in prediction_requests:
44        if isinstance(msg, ProofState):
45            proof_state = msg
46            gv.visualize_proof_state(proof_state)
47            preds = [TacticPredictionGraph(t, [], 0.5) for t in zeroArgs]
48            if len(proof_state.context) > 0:
49                hyp_node = proof_state.context[0]
50                preds += [TacticPredictionGraph(t, [hyp_node], 0.5) for t in oneArg]
51            for d in cool_definitions:
52                preds += [TacticPredictionGraph(t, [d], 0.5) for t in oneArg]
53            prediction_requests.send(TacticPredictionsGraph(preds))
54        elif isinstance(msg, CheckAlignmentMessage):
55            unknown_definitions = list(context.definitions.definitions())
56            unknown_tactics = [t.ident for t in context.tactics]
57            alignment = CheckAlignmentResponse(unknown_definitions, unknown_tactics)
58            prediction_requests.send(alignment)
59        elif isinstance(msg, GlobalContextMessage):
60            graph_initialize_loop(msg, level + 1)
61        else:
62            raise Exception("Capnp protocol error")
def run_session(args, capnp_socket, record_file):
64def run_session(args, capnp_socket, record_file):
65    messages_generator = capnp_message_generator(capnp_socket, record_file)
66    if args.mode == 'text':
67        print('Python server running in text mode')
68        text_prediction_loop(messages_generator)
69    elif args.mode == 'graph':
70        print('Python server running in graph mode')
71        graph_initialize_loop(messages_generator, 0)
72    else:
73        raise Exception("The 'mode' argument needs to be either 'text' or 'graph'")
def main():
 75def main():
 76    sys.setrecursionlimit(10000)
 77    parser = argparse.ArgumentParser(
 78        description = "Example python server capable of communicating with Coq through Tactician's 'synth' tactic",
 79        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 80
 81    parser.add_argument('mode',
 82                        type=str,
 83                        choices=['graph', 'text'],
 84                        help='"graph" to communicate in graph-mode, "text" to communicate in text-mode')
 85    parser.add_argument('--tcp',
 86                        type = int,
 87                        default = 0,
 88                        help='Run in tcp mode instead of stdin mode on the specified port.')
 89    parser.add_argument('--record',
 90                        dest="record_file",
 91                        type = str,
 92                        default = None,
 93                        help='Record all exchanged messages to the specified file, so that they can later be ' +
 94                        'replayed through "pytact-fake-coq"')
 95    args = parser.parse_args()
 96
 97    if args.record_file is not None:
 98        record_context = open(args.record_file, 'wb')
 99    else:
100        record_context = contextlib.nullcontext()
101    with record_context as record_file:
102        if args.tcp != 0:
103            class Handler(socketserver.BaseRequestHandler):
104                def handle(self):
105                    run_session(args, self.request, record_file)
106            class Server(socketserver.ThreadingTCPServer):
107                def __init__(self, *kwargs):
108                    self.allow_reuse_address = True
109                    self.daemon_threads = True
110                    super().__init__(*kwargs)
111            addr = ('localhost', args.tcp)
112            with Server(addr, Handler) as server:
113                server.daemon_threads = True
114                server.serve_forever()
115        else:
116            capnp_socket = socket.socket(fileno=sys.stdin.fileno())
117            run_session(args, capnp_socket, record_file)