]> git.neil.brown.name Git - edlib.git/blob - python/lib-shellcmd.py
Start git-mode: selection menu item to view git commit.
[edlib.git] / python / lib-shellcmd.py
1 # -*- coding: utf-8 -*-
2 # Copyright Neil Brown ©2016-2023 <neil@brown.name>
3 # May be distributed under terms of GPLv2 - see file:COPYING
4 #
5
6 import edlib
7
8 import subprocess, os, fcntl, signal
9
class ShellPane(edlib.Pane):
    # Runs a shell command and feeds its output into the document that
    # this pane is attached to.
    # The one-line "handle:..." strings at the top of some methods name
    # the edlib command which that method handles.  They are not
    # documentation and must be kept exactly as they are.
    def __init__(self, focus, reusable, add_footer=True):
        # reusable: when true, ask to be sent "shell-reuse" requests.
        # add_footer: when true, a "Process Finished" line is appended
        #   to the document once the command completes.
        edlib.Pane.__init__(self, focus)
        # Callback registered through "shellcmd:set-callback": the
        # command to call, the string passed back to it, the pane it
        # is called on, and the line count which triggers it.
        self.callback = None
        self.callback_arg = None
        self.cb_pane = None
        self.cb_lines = 0
        # Trailing output which has not yet been terminated by a
        # newline, and so has not yet been added to the document.
        self.line = b''
        # Number of complete output lines counted towards cb_lines.
        self.lines = 0
        # The subprocess.Popen object while a command is running.
        self.pipe = None
        # Text still to be sent to the command's stdin, and the encoded
        # bytes of the current chunk which the pipe has not accepted yet.
        self.input = ''
        self.pending = b''
        self.call("doc:request:Abort")
        self.add_footer = add_footer
        if reusable:
            self.call("editor:request:shell-reuse")

    def check_reuse(self, key, focus, comm2, **a):
        "handle:shell-reuse"
        # A pane whose command is still running is not offered for
        # reuse.  Otherwise the document name is reported through
        # comm2 (when given) and the document is destroyed.
        if self.pipe:
            return 0
        if comm2:
            comm2("cb", focus, self["doc-name"])
        self.call("doc:destroy")
        return 1

    def run(self, key, focus, num, num2, str, str2, **a):
        "handle:shell-run"
        # str:  the shell command to run
        # str2: directory to run it in; falls back to the 'dirname'
        #       attribute and then to '/'
        # num:  non-zero to put a Cmd/Cwd header in the document
        # num2: non-zero to send the current document content to the
        #       command as stdin (the document is then cleared)
        # Returns edlib.Efail if the directory doesn't exist or the
        # command cannot be started.
        cmd = str
        header = num != 0
        send_stdin = num2 != 0
        cwd = str2 or self['dirname'] or '/'
        # don't want a trailing slash (but keep a lone '/')
        cwd = cwd.rstrip('/') or '/'

        if send_stdin:
            text = focus.call("doc:get-str", ret='str')
            focus.call("doc:clear")
            stdin = subprocess.PIPE
        else:
            text = None
            stdin = subprocess.DEVNULL
        if not os.path.isdir(cwd):
            self.call("doc:replace",
                       "Directory \"%s\" doesn't exist: cannot run shell command\n"
                       % cwd)
            return edlib.Efail
        if header:
            self.call("doc:replace", "Cmd: %s\nCwd: %s\n\n" % (cmd,cwd))
        env = os.environ.copy()
        env['PWD'] = cwd
        askp = self.call("xdg-find-edlib-file",
                         "el-askpass", "bin",  ret='str')
        if askp:
            env['SSH_ASKPASS'] = askp
            env['SSH_ASKPASS_REQUIRE'] = 'force'

        try:
            # start_new_session puts the command in its own process
            # group, so that killpg() can later signal everything
            # the command started.
            self.pipe = subprocess.Popen(cmd, shell=True, close_fds=True,
                                         cwd=cwd, env=env,
                                         start_new_session=True,
                                         stdout=subprocess.PIPE,
                                         stderr=subprocess.STDOUT,
                                         stdin=stdin)
        except Exception:
            # Any failure to start the command is reported as Efail,
            # but KeyboardInterrupt/SystemExit are no longer swallowed.
            self.pipe = None
        if not self.pipe:
            return edlib.Efail
        if send_stdin:
            self.input = text
            self.pending = b''
            fd = self.pipe.stdin.fileno()
            fl = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
            self.call("event:write", fd, self.write)
        self.call("doc:set:doc-status", "Running")
        self.call("doc:notify:doc:status-changed")
        fd = self.pipe.stdout.fileno()
        if not edlib.testing:
            fl = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
            self.call("event:read", fd, self.read)
        else:
            # when testing, use blocking IO for predictable results,
            # and schedule them 'soon' so they cannot happen 'immediately'
            # and happen with gaps
            self.call("event:timer", 50, self.read)
        return True

    def read(self, key, **a):
        # Event callback: move available output from the command into
        # the document.  Only complete lines are inserted; a trailing
        # partial line is held in self.line until it is completed or
        # the command finishes.
        # Returns 1 to keep being called, edlib.Efalse once finished.
        if not self.pipe:
            return edlib.Efalse
        fd = self.pipe.stdout.fileno()
        try:
            if not edlib.testing:
                r = os.read(fd, 4096)
            else:
                # blocking IO: gather everything up to EOF
                r = b''
                b = os.read(fd, 4096)
                while b:
                    r += b
                    b = os.read(fd, 4096)
        except IOError:
            # Nothing available just now (or a transient error): try again
            return 1
        if not r:
            # EOF: collect the exit status and flush what is left.
            (out,err) = self.pipe.communicate()
            ret = self.pipe.poll()
            self.pipe = None
            l = self.line + (out or b'')
            self.line = b''
            self.call("doc:replace", l.decode("utf-8", 'ignore'))
            if self.cb_pane:
                p = self.cb_pane
                self.cb_pane = None
                self.callback("cb:eof", p, ret, self, self.callback_arg)
            if not self.add_footer:
                pass
            elif not ret:
                self.call("doc:replace", "\nProcess Finished\n")
            elif ret > 0:
                self.call("doc:replace", "\nProcess Finished (%d)\n" % ret)
            else:
                # a negative status from Popen means killed by a signal
                self.call("doc:replace", "\nProcess Finished (signaled %d)\n" % -ret)
            self.call("doc:set:doc-status", "Complete")
            self.call("doc:notify:doc:status-changed")
            return edlib.Efalse
        l = self.line + r
        i = l.rfind(b'\n')
        if i >= 0:
            if self.cb_pane and self.cb_lines > 0:
                # need to send callback after this many lines.
                # Only count the complete lines: the unterminated
                # tail is carried over in self.line and will be
                # counted when its newline arrives.
                self.lines += len(l[:i+1].splitlines())
                if self.lines >= self.cb_lines:
                    p = self.cb_pane
                    self.cb_pane = None
                    self.cb_lines = 0
                    if self.callback("cb:lines", p, self, self.callback_arg) > 1:
                        # it still wants EOF
                        self.cb_pane = p
            self.call("doc:replace", l[:i+1].decode("utf-8", 'ignore'))
            l = l[i+1:]
        self.line = l
        return 1

    def _close_stdin(self):
        # Close the command's stdin and forget it, so that a later
        # communicate() does not try to use it.
        try:
            self.pipe.stdin.close()
        except OSError:
            pass
        self.pipe.stdin = None

    def write(self, key, **a):
        # Event callback: send the next piece of self.input to the
        # command's stdin, which run() made non-blocking.
        # Returns 1 to keep being called, edlib.Efalse when all input
        # has been sent (or can no longer be sent) and stdin is closed.
        if not self.pipe or not self.pipe.stdin:
            return edlib.Efalse
        if not self.pending and self.input:
            # encode the next chunk of up to 1024 characters
            self.pending = self.input[:1024].encode()
            self.input = self.input[1024:]
        if not self.pending:
            self._close_stdin()
            return edlib.Efalse
        try:
            n = os.write(self.pipe.stdin.fileno(), self.pending)
        except BlockingIOError:
            # pipe is full - keep the chunk and try again later
            return 1
        except OSError:
            # e.g. the command exited or closed its stdin: nothing
            # more can be delivered, so discard the rest.
            self.input = ''
            self.pending = b''
            self._close_stdin()
            return edlib.Efalse
        # A non-blocking write may accept only part of the chunk;
        # keep the remainder for the next call.
        self.pending = self.pending[n:]
        return 1

    def _kill_group(self, proc):
        # Send SIGTERM to the process group of the command.  run()
        # starts the command in a new session, so its pid is also
        # its process group id.  The group might already be gone.
        try:
            os.killpg(proc.pid, signal.SIGTERM)
        except ProcessLookupError:
            pass

    def handle_close(self, key, **a):
        "handle:Close"
        # Pane is closing: stop any running command and reap it.
        if self.pipe is not None:
            p = self.pipe
            self.pipe = None
            self._kill_group(p)
            p.terminate()
            try:
                p.communicate()
            except IOError:
                pass
        return 1

    def handle_abort(self, key, **a):
        "handle:Abort"
        # Signal the running command, but leave self.pipe in place so
        # that read() still collects remaining output and exit status.
        if self.pipe is not None:
            self._kill_group(self.pipe)
            self.call("doc:replace", "\nProcess signalled\n")
        return 1

    def handle_callback(self, key, focus, num, num2, str1, comm2, **a):
        "handle:shellcmd:set-callback"
        # comm2 is recorded as a callback to call on focus after
        # num msecs or num2 lines of output.  Callback is called
        # when command finishes if not before
        # str1 saved and included in the callback
        if not focus or not comm2:
            return edlib.Einval
        self.callback = comm2
        self.callback_arg = str1
        self.cb_pane = focus
        # so that cb_pane can be forgotten if that pane is closed
        self.add_notify(focus, "Notify:Close")
        self.cb_lines = num2
        if num > 0:
            self.call("event:timer", num, self.time_cb)
        return 1

    def time_cb(self, key, **a):
        # Timer requested by handle_callback has fired: make the
        # callback unless it has already been made.  The timer is
        # not repeated.
        if self.cb_pane:
            p = self.cb_pane
            self.cb_pane = None
            if self.callback("cb:timer", p, self, self.callback_arg) > 1:
                # still want more
                self.cb_pane = p
        return edlib.Efalse

    def handle_nofify_close(self, key, focus, **a):
        "handle:Notify:Close"
        # The pane which wanted a callback is closing: forget it.
        if focus == self.cb_pane:
            self.cb_pane = None
217
class ShellViewer(edlib.Pane):
    # Small overlay pane which keeps point following the end of
    # the output: when text is inserted exactly at point, point
    # is moved to the end of that insertion.
    # The "handle:..." strings name the commands handled and
    # must not be changed.
    def __init__(self, focus):
        edlib.Pane.__init__(self, focus)
        # ask to be told whenever the document content changes
        self.call("doc:request:doc:replaced")

    def handle_replace(self, key, mark, mark2, **a):
        "handle:doc:replaced"
        # mark and mark2 bound the replaced text; without both
        # there is nothing to follow.
        if mark and mark2:
            point = self.call("doc:point", ret='mark')
            if point and point == mark:
                # text was inserted at point, so step point
                # over to the end of the insertion
                point.to_mark(mark2)
        return 1

    def handle_clone(self, key, focus, home, **a):
        "handle:Clone"
        # Put a fresh ShellViewer on focus, then have 'home' clone
        # its children onto the new pane.
        home.clone_children(ShellViewer(focus))
        return 1
241
def shell_attach(key, focus, comm2, num, str, str2, **a):
    # Attach a ShellPane to focus and run the command 'str' in it,
    # using 'str2' (when given) as the working directory.
    # The new pane is passed to comm2, if that was provided.
    # num is a set of flag bits:
    #      1 - place Cmd/Cwd at top of doc
    #      2 - reuse doc, don't clear it first
    #      4 - register to be cleaned up by shell-reuse
    #      8 - don't add a footer when command completes.
    #     16 - use content of doc as stdin
    keep_content = num & 2
    want_footer = not (num & 8)

    if not keep_content:
        # Clear document - discarding undo history.
        focus.call("doc:clear")

    shell = ShellPane(focus, num & 4, want_footer)
    if not shell:
        return edlib.Efail

    focus['view-default'] = 'shell-viewer'
    if str2:
        focus['dirname'] = str2

    try:
        shell.call("shell-run", num & 1, num & 16, str, str2)
    except edlib.commandfailed:
        # command could not be started, so the pane is of no use
        shell.close()
        return edlib.Efail

    if comm2:
        comm2("callback", shell)
    return 1
265
def shell_view_attach(key, focus, comm2, **a):
    # Attach a generic viewer to focus, then stack a ShellViewer on
    # top of it.  The ShellViewer is passed to comm2, if provided.
    viewer = focus.call("attach-viewer", ret='pane')
    overlay = ShellViewer(viewer)
    if not overlay:
        return edlib.Efail

    if comm2:
        comm2("callback", overlay)
    return 1
275
# Make the two attach functions available as global editor commands.
edlib.editor.call(
    "global-set-command", "attach-shellcmd", shell_attach)
edlib.editor.call(
    "global-set-command", "attach-shell-viewer", shell_view_attach)