Line data Source code
1 : /*
2 : * Unix SMB/CIFS implementation.
3 : * Samba internal messaging functions
4 : * Copyright (C) 2013 by Volker Lendecke
5 : *
6 : * This program is free software; you can redistribute it and/or modify
7 : * it under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation; either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * This program is distributed in the hope that it will be useful,
12 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : * GNU General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public License
17 : * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : */
19 :
20 : #include "replace.h"
21 : #include "util/util.h"
22 : #include "system/network.h"
23 : #include "system/filesys.h"
24 : #include "system/dir.h"
25 : #include "system/select.h"
26 : #include "lib/util/debug.h"
27 : #include "messages_dgm.h"
28 : #include "lib/util/genrand.h"
29 : #include "lib/util/dlinklist.h"
30 : #include "lib/pthreadpool/pthreadpool_tevent.h"
31 : #include "lib/util/msghdr.h"
32 : #include "lib/util/iov_buf.h"
33 : #include "lib/util/blocking.h"
34 : #include "lib/util/tevent_unix.h"
35 : #include "lib/util/smb_strtox.h"
36 :
/*
 * Size limit, in bytes, of one datagram of a fragmented message. The
 * limit counts the 64-bit cookie and the fragment header that precede
 * the payload. A message whose payload fits into this limit minus the
 * cookie is sent unfragmented as a single datagram.
 */
#define MESSAGING_DGM_FRAGMENT_LENGTH 1024
38 :
/*
 * Fixed-size buffer used in this file to build and store socket and
 * lockfile path names.
 */
struct sun_path_buf {
	/*
	 * This will carry enough for a socket path
	 */
	char buf[sizeof(struct sockaddr_un)];
};
45 :
/*
 * We can only have one tevent_fd per dgm_context and per
 * tevent_context. Maintain a list of registered tevent_contexts per
 * dgm_context.
 */
struct messaging_dgm_fde_ev {
	/* List links, the list head is messaging_dgm_context.fde_evs. */
	struct messaging_dgm_fde_ev *prev, *next;

	/*
	 * Backreference to enable DLIST_REMOVE from our
	 * destructor. Also, set to NULL when the dgm_context dies
	 * before the messaging_dgm_fde_ev.
	 */
	struct messaging_dgm_context *ctx;

	/* Presumably the tevent_context this entry is registered for. */
	struct tevent_context *ev;
	/*
	 * The fd event itself. Its flags are cleared by the context
	 * destructor when the dgm_context goes away first.
	 */
	struct tevent_fd *fde;
};
64 :
/*
 * One outgoing channel: a datagram socket connected to the rendezvous
 * socket of another process, plus the queue of sends still pending.
 */
struct messaging_dgm_out {
	/* List links, the list head is messaging_dgm_context.outsocks. */
	struct messaging_dgm_out *prev, *next;
	/* Owning context. */
	struct messaging_dgm_context *ctx;

	/* Destination process, used as lookup key in the outsocks list. */
	pid_t pid;
	/* Socket connected to "<socket_dir>/<pid>". */
	int sock;
	/* Tracks whether "sock" is currently in blocking mode. */
	bool is_blocking;
	/*
	 * Cookie for the next fragmented message. Starts at 1 and skips
	 * 0 on wrap, because 0 marks an unfragmented message.
	 */
	uint64_t cookie;

	/* Fragments waiting to be sent by a thread pool job. */
	struct tevent_queue *queue;
	/* Timer that frees this object once it has been idle. */
	struct tevent_timer *idle_timer;
};
77 :
/*
 * An entry of messaging_dgm_context.in_msgs.
 *
 * NOTE(review): the code filling this in is outside this view. The
 * field names suggest reassembly state for one fragmented incoming
 * message, with sender_pid/sender_sock/cookie mirroring the fragment
 * header and cookie written by the sender -- confirm against the
 * receive path.
 */
struct messaging_dgm_in_msg {
	struct messaging_dgm_in_msg *prev, *next;
	struct messaging_dgm_context *ctx;
	size_t msglen;
	size_t received;
	pid_t sender_pid;
	int sender_sock;
	uint64_t cookie;
	/* Flexible array member, presumably the message payload. */
	uint8_t buf[];
};
88 :
/*
 * State of the datagram messaging endpoint of this process. Created
 * by messaging_dgm_init().
 */
struct messaging_dgm_context {
	/* Event context handed to messaging_dgm_init(). */
	struct tevent_context *ev;
	/*
	 * Pid at creation time. Compared against the current pid to
	 * detect that we are running in a different process.
	 */
	pid_t pid;
	/* Directory holding the per-pid rendezvous sockets. */
	struct sun_path_buf socket_dir;
	/* Directory holding the per-pid lockfiles. */
	struct sun_path_buf lockfile_dir;
	/* Open, fcntl-locked lockfile "<lockfile_dir>/<pid>". */
	int lockfile_fd;

	/* Datagram socket bound to "<socket_dir>/<pid>". */
	int sock;
	/* List of messaging_dgm_in_msg, freed by the destructor. */
	struct messaging_dgm_in_msg *in_msgs;

	/* List of registered tevent_contexts, see messaging_dgm_fde_ev. */
	struct messaging_dgm_fde_ev *fde_evs;
	/*
	 * Receive callback and its private pointer as handed to
	 * messaging_dgm_init(). The invocation is outside this view.
	 */
	void (*recv_cb)(struct tevent_context *ev,
			const uint8_t *msg,
			size_t msg_len,
			int *fds,
			size_t num_fds,
			void *private_data);
	void *recv_cb_private_data;

	/*
	 * Points at the static flag inside messaging_dgm_init(), the
	 * destructor resets it to false.
	 */
	bool *have_dgm_context;

	/* Thread pool running the blocking sends of queued fragments. */
	struct pthreadpool_tevent *pool;
	/* List of outgoing channels, one per destination pid. */
	struct messaging_dgm_out *outsocks;
};
113 :
/*
 * Mark "sock" close-on-exec.
 *
 * Returns 0 on success, otherwise the errno value of the failing
 * fcntl() call. Where FD_CLOEXEC is not available this is a no-op
 * that returns 0.
 */
static int prepare_socket_cloexec(int sock)
{
#ifdef FD_CLOEXEC
	int fd_flags = fcntl(sock, F_GETFD, 0);
	int rc;

	if (fd_flags == -1) {
		return errno;
	}

	/* Keep whatever descriptor flags are already set. */
	rc = fcntl(sock, F_SETFD, fd_flags | FD_CLOEXEC);
	if (rc == -1) {
		return errno;
	}
#endif
	return 0;
}
131 :
/*
 * Close every valid descriptor in fds[0..num_fds-1] and overwrite the
 * slot with -1. Slots that already hold -1 are skipped.
 */
static void close_fd_array(int *fds, size_t num_fds)
{
	size_t idx = 0;

	while (idx < num_fds) {
		int *slot = &fds[idx];

		idx += 1;

		if (*slot != -1) {
			close(*slot);
			*slot = -1;
		}
	}
}
145 :
146 : /*
147 : * The idle handler can free the struct messaging_dgm_out *,
148 : * if it's unused (qlen of zero) which closes the socket.
149 : */
150 :
151 12496 : static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
152 : struct tevent_timer *te,
153 : struct timeval current_time,
154 : void *private_data)
155 : {
156 12496 : struct messaging_dgm_out *out = talloc_get_type_abort(
157 : private_data, struct messaging_dgm_out);
158 28 : size_t qlen;
159 :
160 12496 : out->idle_timer = NULL;
161 :
162 12496 : qlen = tevent_queue_length(out->queue);
163 12496 : if (qlen == 0) {
164 12496 : TALLOC_FREE(out);
165 : }
166 12496 : }
167 :
168 : /*
169 : * Setup the idle handler to fire after 1 second if the
170 : * queue is zero.
171 : */
172 :
173 579990 : static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out *out)
174 : {
175 135780 : size_t qlen;
176 :
177 579990 : qlen = tevent_queue_length(out->queue);
178 579990 : if (qlen != 0) {
179 135139 : TALLOC_FREE(out->idle_timer);
180 135139 : return;
181 : }
182 :
183 444851 : if (out->idle_timer != NULL) {
184 420742 : tevent_update_timer(out->idle_timer,
185 : tevent_timeval_current_ofs(1, 0));
186 420742 : return;
187 : }
188 :
189 24109 : out->idle_timer = tevent_add_timer(
190 : out->ctx->ev, out, tevent_timeval_current_ofs(1, 0),
191 : messaging_dgm_out_idle_handler, out);
192 : /*
193 : * No NULL check, we'll come back here. Worst case we're
194 : * leaking a bit.
195 : */
196 : }
197 :
/*
 * Forward declarations. The destructor is installed by
 * messaging_dgm_out_create() before it is defined.
 */
static int messaging_dgm_out_destructor(struct messaging_dgm_out *dst);
static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
					   struct tevent_timer *te,
					   struct timeval current_time,
					   void *private_data);
203 :
204 : /*
205 : * Connect to an existing rendezvous point for another
206 : * pid - wrapped inside a struct messaging_dgm_out *.
207 : */
208 :
209 29860 : static int messaging_dgm_out_create(TALLOC_CTX *mem_ctx,
210 : struct messaging_dgm_context *ctx,
211 : pid_t pid, struct messaging_dgm_out **pout)
212 : {
213 552 : struct messaging_dgm_out *out;
214 29860 : struct sockaddr_un addr = { .sun_family = AF_UNIX };
215 29860 : int ret = ENOMEM;
216 552 : int out_pathlen;
217 552 : char addr_buf[sizeof(addr.sun_path) + (3 * sizeof(unsigned) + 2)];
218 :
219 29860 : out = talloc(mem_ctx, struct messaging_dgm_out);
220 29860 : if (out == NULL) {
221 0 : goto fail;
222 : }
223 :
224 29860 : *out = (struct messaging_dgm_out) {
225 : .pid = pid,
226 : .ctx = ctx,
227 : .cookie = 1
228 : };
229 :
230 29860 : out_pathlen = snprintf(addr_buf, sizeof(addr_buf),
231 29860 : "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
232 29860 : if (out_pathlen < 0) {
233 0 : goto errno_fail;
234 : }
235 29860 : if ((size_t)out_pathlen >= sizeof(addr.sun_path)) {
236 0 : ret = ENAMETOOLONG;
237 0 : goto fail;
238 : }
239 :
240 29860 : memcpy(addr.sun_path, addr_buf, out_pathlen + 1);
241 :
242 29860 : out->queue = tevent_queue_create(out, addr.sun_path);
243 29860 : if (out->queue == NULL) {
244 0 : ret = ENOMEM;
245 0 : goto fail;
246 : }
247 :
248 29860 : out->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
249 29860 : if (out->sock == -1) {
250 0 : goto errno_fail;
251 : }
252 :
253 29860 : DLIST_ADD(ctx->outsocks, out);
254 29860 : talloc_set_destructor(out, messaging_dgm_out_destructor);
255 :
256 552 : do {
257 29860 : ret = connect(out->sock,
258 : (const struct sockaddr *)(const void *)&addr,
259 : sizeof(addr));
260 29860 : } while ((ret == -1) && (errno == EINTR));
261 :
262 29860 : if (ret == -1) {
263 5939 : goto errno_fail;
264 : }
265 :
266 23921 : ret = set_blocking(out->sock, false);
267 23921 : if (ret == -1) {
268 0 : goto errno_fail;
269 : }
270 23921 : out->is_blocking = false;
271 :
272 23921 : *pout = out;
273 23921 : return 0;
274 5939 : errno_fail:
275 5939 : ret = errno;
276 5939 : fail:
277 5939 : TALLOC_FREE(out);
278 5939 : return ret;
279 : }
280 :
281 59943 : static int messaging_dgm_out_destructor(struct messaging_dgm_out *out)
282 : {
283 59943 : DLIST_REMOVE(out->ctx->outsocks, out);
284 :
285 59943 : if ((tevent_queue_length(out->queue) != 0) &&
286 3 : (tevent_cached_getpid() == out->ctx->pid)) {
287 : /*
288 : * We have pending jobs. We can't close the socket,
289 : * this has been handed over to messaging_dgm_out_queue_state.
290 : */
291 0 : return 0;
292 : }
293 :
294 59940 : if (out->sock != -1) {
295 59940 : close(out->sock);
296 59940 : out->sock = -1;
297 : }
298 58914 : return 0;
299 : }
300 :
301 : /*
302 : * Find the struct messaging_dgm_out * to talk to pid.
303 : * If we don't have one, create it. Set the timer to
304 : * delete after 1 sec.
305 : */
306 :
307 517589 : static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
308 : struct messaging_dgm_out **pout)
309 : {
310 68144 : struct messaging_dgm_out *out;
311 68144 : int ret;
312 :
313 956167 : for (out = ctx->outsocks; out != NULL; out = out->next) {
314 926307 : if (out->pid == pid) {
315 420137 : break;
316 : }
317 : }
318 :
319 517589 : if (out == NULL) {
320 29860 : ret = messaging_dgm_out_create(ctx, ctx, pid, &out);
321 29860 : if (ret != 0) {
322 5939 : return ret;
323 : }
324 : }
325 :
326 : /*
327 : * shouldn't be possible, should be set if messaging_dgm_out_create
328 : * succeeded. This check is to satisfy static checker
329 : */
330 511650 : if (out == NULL) {
331 0 : return EINVAL;
332 : }
333 511650 : messaging_dgm_out_rearm_idle_timer(out);
334 :
335 511650 : *pout = out;
336 511650 : return 0;
337 : }
338 :
339 : /*
340 : * This function is called directly to send a message fragment
341 : * when the outgoing queue is zero, and from a pthreadpool
342 : * job thread when messages are being queued (qlen != 0).
343 : * Make sure *ONLY* thread-safe functions are called within.
344 : */
345 :
346 525523 : static ssize_t messaging_dgm_sendmsg(int sock,
347 : const struct iovec *iov, int iovlen,
348 : const int *fds, size_t num_fds,
349 : int *perrno)
350 : {
351 69161 : struct msghdr msg;
352 69161 : ssize_t fdlen, ret;
353 :
354 : /*
355 : * Do the actual sendmsg syscall. This will be called from a
356 : * pthreadpool helper thread, so be careful what you do here.
357 : */
358 :
359 525523 : msg = (struct msghdr) {
360 : .msg_iov = discard_const_p(struct iovec, iov),
361 : .msg_iovlen = iovlen
362 : };
363 :
364 525523 : fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
365 525523 : if (fdlen == -1) {
366 0 : *perrno = EINVAL;
367 0 : return -1;
368 : }
369 :
370 525523 : {
371 525523 : uint8_t buf[fdlen];
372 :
373 525523 : msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
374 :
375 69161 : do {
376 525523 : ret = sendmsg(sock, &msg, 0);
377 525520 : } while ((ret == -1) && (errno == EINTR));
378 : }
379 :
380 525520 : if (ret == -1) {
381 359 : *perrno = errno;
382 : }
383 456359 : return ret;
384 : }
385 :
/*
 * State of one queued fragment send, see
 * messaging_dgm_out_queue_send().
 */
struct messaging_dgm_out_queue_state {
	/* Event context and thread pool used to run the send job. */
	struct tevent_context *ev;
	struct pthreadpool_tevent *pool;

	/* The request this state belongs to. */
	struct tevent_req *req;
	/* The thread pool job, non-NULL while the job is scheduled. */
	struct tevent_req *subreq;

	/* Copy of the channel's socket descriptor. */
	int sock;

	/* dup()ed descriptors to pass, a talloc array. */
	int *fds;
	/* Flattened copy of the fragment's iovec data, a talloc array. */
	uint8_t *buf;

	/* Result and errno of the send, filled in by the job thread. */
	ssize_t sent;
	int err;
};
401 :
/* Forward declarations for the queued send machinery below. */
static int messaging_dgm_out_queue_state_destructor(
	struct messaging_dgm_out_queue_state *state);
static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
					    void *private_data);
static void messaging_dgm_out_threaded_job(void *private_data);
static void messaging_dgm_out_queue_done(struct tevent_req *subreq);
408 :
409 : /*
410 : * Push a message fragment onto a queue to be sent by a
411 : * threadpool job. Makes copies of data/fd's to be sent.
412 : * The running tevent_queue internally creates an immediate
413 : * event to schedule the write.
414 : */
415 :
416 69335 : static struct tevent_req *messaging_dgm_out_queue_send(
417 : TALLOC_CTX *mem_ctx, struct tevent_context *ev,
418 : struct messaging_dgm_out *out,
419 : const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
420 : {
421 68628 : struct tevent_req *req;
422 68628 : struct messaging_dgm_out_queue_state *state;
423 68628 : struct tevent_queue_entry *e;
424 68628 : size_t i;
425 68628 : ssize_t buflen;
426 :
427 69335 : req = tevent_req_create(out, &state,
428 : struct messaging_dgm_out_queue_state);
429 69335 : if (req == NULL) {
430 0 : return NULL;
431 : }
432 69335 : state->ev = ev;
433 69335 : state->pool = out->ctx->pool;
434 69335 : state->sock = out->sock;
435 69335 : state->req = req;
436 :
437 : /*
438 : * Go blocking in a thread
439 : */
440 69335 : if (!out->is_blocking) {
441 245 : int ret = set_blocking(out->sock, true);
442 245 : if (ret == -1) {
443 0 : tevent_req_error(req, errno);
444 0 : return tevent_req_post(req, ev);
445 : }
446 245 : out->is_blocking = true;
447 : }
448 :
449 69335 : buflen = iov_buflen(iov, iovlen);
450 69335 : if (buflen == -1) {
451 0 : tevent_req_error(req, EMSGSIZE);
452 0 : return tevent_req_post(req, ev);
453 : }
454 :
455 69335 : state->buf = talloc_array(state, uint8_t, buflen);
456 69335 : if (tevent_req_nomem(state->buf, req)) {
457 0 : return tevent_req_post(req, ev);
458 : }
459 69335 : iov_buf(iov, iovlen, state->buf, buflen);
460 :
461 69335 : state->fds = talloc_array(state, int, num_fds);
462 69335 : if (tevent_req_nomem(state->fds, req)) {
463 0 : return tevent_req_post(req, ev);
464 : }
465 :
466 69372 : for (i=0; i<num_fds; i++) {
467 37 : state->fds[i] = -1;
468 : }
469 :
470 69372 : for (i=0; i<num_fds; i++) {
471 :
472 37 : state->fds[i] = dup(fds[i]);
473 :
474 37 : if (state->fds[i] == -1) {
475 0 : int ret = errno;
476 :
477 0 : close_fd_array(state->fds, num_fds);
478 :
479 0 : tevent_req_error(req, ret);
480 0 : return tevent_req_post(req, ev);
481 : }
482 : }
483 :
484 69335 : talloc_set_destructor(state, messaging_dgm_out_queue_state_destructor);
485 :
486 69335 : e = tevent_queue_add_entry(out->queue, ev, req,
487 68628 : messaging_dgm_out_queue_trigger, req);
488 69335 : if (tevent_req_nomem(e, req)) {
489 0 : return tevent_req_post(req, ev);
490 : }
491 707 : return req;
492 : }
493 :
494 69332 : static int messaging_dgm_out_queue_state_destructor(
495 : struct messaging_dgm_out_queue_state *state)
496 : {
497 68628 : int *fds;
498 68628 : size_t num_fds;
499 :
500 69332 : if (state->subreq != NULL) {
501 : /*
502 : * We're scheduled, but we're destroyed. This happens
503 : * if the messaging_dgm_context is destroyed while
504 : * we're stuck in a blocking send. There's nothing we
505 : * can do but to leak memory.
506 : */
507 3 : TALLOC_FREE(state->subreq);
508 3 : (void)talloc_reparent(state->req, NULL, state);
509 3 : return -1;
510 : }
511 :
512 69329 : fds = state->fds;
513 69329 : num_fds = talloc_array_length(fds);
514 69329 : close_fd_array(fds, num_fds);
515 69329 : return 0;
516 : }
517 :
518 : /*
519 : * tevent_queue callback that schedules the pthreadpool to actually
520 : * send the queued message fragment.
521 : */
522 :
523 68323 : static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
524 : void *private_data)
525 : {
526 68323 : struct messaging_dgm_out_queue_state *state = tevent_req_data(
527 : req, struct messaging_dgm_out_queue_state);
528 :
529 68323 : tevent_req_reset_endtime(req);
530 :
531 68323 : state->subreq = pthreadpool_tevent_job_send(
532 : state, state->ev, state->pool,
533 : messaging_dgm_out_threaded_job, state);
534 68323 : if (tevent_req_nomem(state->subreq, req)) {
535 0 : return;
536 : }
537 68323 : tevent_req_set_callback(state->subreq, messaging_dgm_out_queue_done,
538 : req);
539 : }
540 :
541 : /*
542 : * Wrapper function run by the pthread that calls
543 : * messaging_dgm_sendmsg() to actually do the sendmsg().
544 : */
545 :
546 68323 : static void messaging_dgm_out_threaded_job(void *private_data)
547 : {
548 68323 : struct messaging_dgm_out_queue_state *state = talloc_get_type_abort(
549 : private_data, struct messaging_dgm_out_queue_state);
550 :
551 136646 : struct iovec iov = { .iov_base = state->buf,
552 68323 : .iov_len = talloc_get_size(state->buf) };
553 68323 : size_t num_fds = talloc_array_length(state->fds);
554 68323 : int msec = 1;
555 :
556 67639 : while (true) {
557 67639 : int ret;
558 :
559 136643 : state->sent = messaging_dgm_sendmsg(state->sock, &iov, 1,
560 68323 : state->fds, num_fds, &state->err);
561 :
562 68320 : if (state->sent != -1) {
563 681 : return;
564 : }
565 1 : if (state->err != ENOBUFS) {
566 0 : return;
567 : }
568 :
569 : /*
570 : * ENOBUFS is the FreeBSD way of saying "Try
571 : * again". We have to do polling.
572 : */
573 0 : do {
574 0 : ret = poll(NULL, 0, msec);
575 0 : } while ((ret == -1) && (errno == EINTR));
576 :
577 : /*
578 : * Exponential backoff up to once a second
579 : */
580 0 : msec *= 2;
581 0 : msec = MIN(msec, 1000);
582 : }
583 : }
584 :
585 : /*
586 : * Pickup the results of the pthread sendmsg().
587 : */
588 :
589 68317 : static void messaging_dgm_out_queue_done(struct tevent_req *subreq)
590 : {
591 68317 : struct tevent_req *req = tevent_req_callback_data(
592 : subreq, struct tevent_req);
593 68317 : struct messaging_dgm_out_queue_state *state = tevent_req_data(
594 : req, struct messaging_dgm_out_queue_state);
595 67636 : int ret;
596 :
597 68317 : if (subreq != state->subreq) {
598 0 : abort();
599 : }
600 :
601 68317 : ret = pthreadpool_tevent_job_recv(subreq);
602 :
603 68317 : TALLOC_FREE(subreq);
604 68317 : state->subreq = NULL;
605 :
606 68317 : if (tevent_req_error(req, ret)) {
607 0 : return;
608 : }
609 68317 : if (state->sent == -1) {
610 0 : tevent_req_error(req, state->err);
611 0 : return;
612 : }
613 68317 : tevent_req_done(req);
614 : }
615 :
/*
 * Fetch the outcome of a queued fragment send, as produced by
 * tevent_req_simple_recv_unix().
 */
static int messaging_dgm_out_queue_recv(struct tevent_req *req)
{
	int err = tevent_req_simple_recv_unix(req);

	return err;
}
620 :
/* Completion callback of a queued fragment send, defined further down. */
static void messaging_dgm_out_sent_fragment(struct tevent_req *req);
622 :
623 : /*
624 : * Core function to send a message fragment given a
625 : * connected struct messaging_dgm_out * destination.
626 : * If no current queue tries to send nonblocking
627 : * directly. If not, queues the fragment (which makes
628 : * a copy of it) and adds a 60-second timeout on the send.
629 : */
630 :
631 526290 : static int messaging_dgm_out_send_fragment(
632 : struct tevent_context *ev, struct messaging_dgm_out *out,
633 : const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
634 : {
635 70144 : struct tevent_req *req;
636 70144 : size_t qlen;
637 70144 : bool ok;
638 :
639 526290 : qlen = tevent_queue_length(out->queue);
640 526290 : if (qlen == 0) {
641 1522 : ssize_t nsent;
642 457200 : int err = 0;
643 :
644 457200 : if (out->is_blocking) {
645 152 : int ret = set_blocking(out->sock, false);
646 152 : if (ret == -1) {
647 456955 : return errno;
648 : }
649 152 : out->is_blocking = false;
650 : }
651 :
652 457200 : nsent = messaging_dgm_sendmsg(out->sock, iov, iovlen, fds,
653 : num_fds, &err);
654 457200 : if (nsent >= 0) {
655 455425 : return 0;
656 : }
657 :
658 358 : if (err == ENOBUFS) {
659 : /*
660 : * FreeBSD's way of telling us the dst socket
661 : * is full. EWOULDBLOCK makes us spawn a
662 : * polling helper thread.
663 : */
664 0 : err = EWOULDBLOCK;
665 : }
666 :
667 358 : if (err != EWOULDBLOCK) {
668 14 : return err;
669 : }
670 : }
671 :
672 69335 : req = messaging_dgm_out_queue_send(out, ev, out, iov, iovlen,
673 : fds, num_fds);
674 69335 : if (req == NULL) {
675 0 : return ENOMEM;
676 : }
677 69335 : tevent_req_set_callback(req, messaging_dgm_out_sent_fragment, out);
678 :
679 69335 : ok = tevent_req_set_endtime(req, ev,
680 : tevent_timeval_current_ofs(60, 0));
681 69335 : if (!ok) {
682 0 : TALLOC_FREE(req);
683 0 : return ENOMEM;
684 : }
685 :
686 707 : return 0;
687 : }
688 :
689 : /*
690 : * Pickup the result of the fragment send. Reset idle timer
691 : * if queue empty.
692 : */
693 :
694 68340 : static void messaging_dgm_out_sent_fragment(struct tevent_req *req)
695 : {
696 68340 : struct messaging_dgm_out *out = tevent_req_callback_data(
697 : req, struct messaging_dgm_out);
698 67636 : int ret;
699 :
700 68340 : ret = messaging_dgm_out_queue_recv(req);
701 68340 : TALLOC_FREE(req);
702 :
703 68340 : if (ret != 0) {
704 23 : DBG_WARNING("messaging_out_queue_recv returned %s\n",
705 : strerror(ret));
706 : }
707 :
708 68340 : messaging_dgm_out_rearm_idle_timer(out);
709 68340 : }
710 :
711 :
/*
 * Header sent after the cookie in every datagram of a fragmented
 * message.
 */
struct messaging_dgm_fragment_hdr {
	/* Total payload length of the complete message. */
	size_t msglen;
	/* Pid of the sender. */
	pid_t pid;
	/* The sender's socket descriptor of the channel used. */
	int sock;
};
717 :
718 : /*
719 : * Fragment a message into MESSAGING_DGM_FRAGMENT_LENGTH - 64-bit cookie
720 : * size chunks and send it.
721 : *
722 : * Message fragments are prefixed by a 64-bit cookie that
723 : * stays the same for all fragments. This allows the receiver
724 : * to recognise fragments of the same message and re-assemble
725 : * them on the other end.
726 : *
727 : * Note that this allows other message fragments from other
728 : * senders to be interleaved in the receive read processing,
729 : * the combination of the cookie and header info allows unique
730 : * identification of the message from a specific sender in
731 : * re-assembly.
732 : *
733 : * If the message is smaller than MESSAGING_DGM_FRAGMENT_LENGTH - cookie
734 : * then send a single message with cookie set to zero.
735 : *
736 : * Otherwise the message is fragmented into chunks and added
737 : * to the sending queue. Any file descriptors are passed only
738 : * in the last fragment.
739 : *
740 : * Finally the cookie is incremented (wrap over zero) to
741 : * prepare for the next message sent to this channel.
742 : *
743 : */
744 :
745 511650 : static int messaging_dgm_out_send_fragmented(struct tevent_context *ev,
746 : struct messaging_dgm_out *out,
747 : const struct iovec *iov,
748 : int iovlen,
749 : const int *fds, size_t num_fds)
750 511650 : {
751 68144 : ssize_t msglen, sent;
752 511650 : int ret = 0;
753 511650 : struct iovec iov_copy[iovlen+2];
754 68144 : struct messaging_dgm_fragment_hdr hdr;
755 68144 : struct iovec src_iov;
756 :
757 511650 : if (iovlen < 0) {
758 0 : return EINVAL;
759 : }
760 :
761 511650 : msglen = iov_buflen(iov, iovlen);
762 511650 : if (msglen == -1) {
763 0 : return EMSGSIZE;
764 : }
765 511650 : if (num_fds > INT8_MAX) {
766 0 : return EINVAL;
767 : }
768 :
769 511650 : if ((size_t) msglen <=
770 : (MESSAGING_DGM_FRAGMENT_LENGTH - sizeof(uint64_t))) {
771 499131 : uint64_t cookie = 0;
772 :
773 499131 : iov_copy[0].iov_base = &cookie;
774 499131 : iov_copy[0].iov_len = sizeof(cookie);
775 499131 : if (iovlen > 0) {
776 499131 : memcpy(&iov_copy[1], iov,
777 : sizeof(struct iovec) * iovlen);
778 : }
779 :
780 499131 : return messaging_dgm_out_send_fragment(
781 : ev, out, iov_copy, iovlen+1, fds, num_fds);
782 :
783 : }
784 :
785 25038 : hdr = (struct messaging_dgm_fragment_hdr) {
786 : .msglen = msglen,
787 12519 : .pid = tevent_cached_getpid(),
788 12519 : .sock = out->sock
789 : };
790 :
791 12519 : iov_copy[0].iov_base = &out->cookie;
792 12519 : iov_copy[0].iov_len = sizeof(out->cookie);
793 12519 : iov_copy[1].iov_base = &hdr;
794 12519 : iov_copy[1].iov_len = sizeof(hdr);
795 :
796 12519 : sent = 0;
797 12519 : src_iov = iov[0];
798 :
799 : /*
800 : * The following write loop sends the user message in pieces. We have
801 : * filled the first two iovecs above with "cookie" and "hdr". In the
802 : * following loops we pull message chunks from the user iov array and
803 : * fill iov_copy piece by piece, possibly truncating chunks from the
804 : * caller's iov array. Ugly, but hopefully efficient.
805 : */
806 :
807 39678 : while (sent < msglen) {
808 : size_t fragment_len;
809 25157 : size_t iov_index = 2;
810 :
811 25157 : fragment_len = sizeof(out->cookie) + sizeof(hdr);
812 :
813 54318 : while (fragment_len < MESSAGING_DGM_FRAGMENT_LENGTH) {
814 2004 : size_t space, chunk;
815 :
816 39678 : space = MESSAGING_DGM_FRAGMENT_LENGTH - fragment_len;
817 39678 : chunk = MIN(space, src_iov.iov_len);
818 :
819 39678 : iov_copy[iov_index].iov_base = src_iov.iov_base;
820 39678 : iov_copy[iov_index].iov_len = chunk;
821 39678 : iov_index += 1;
822 :
823 39678 : src_iov.iov_base = (char *)src_iov.iov_base + chunk;
824 39678 : src_iov.iov_len -= chunk;
825 39678 : fragment_len += chunk;
826 :
827 39678 : if (src_iov.iov_len == 0) {
828 25038 : iov += 1;
829 25038 : iovlen -= 1;
830 25038 : if (iovlen == 0) {
831 12517 : break;
832 : }
833 12519 : src_iov = iov[0];
834 : }
835 : }
836 27159 : sent += (fragment_len - sizeof(out->cookie) - sizeof(hdr));
837 :
838 : /*
839 : * only the last fragment should pass the fd array.
840 : * That simplifies the receiver a lot.
841 : */
842 27159 : if (sent < msglen) {
843 14640 : ret = messaging_dgm_out_send_fragment(
844 : ev, out, iov_copy, iov_index, NULL, 0);
845 : } else {
846 12519 : ret = messaging_dgm_out_send_fragment(
847 : ev, out, iov_copy, iov_index, fds, num_fds);
848 : }
849 27159 : if (ret != 0) {
850 0 : break;
851 : }
852 : }
853 :
854 12519 : out->cookie += 1;
855 12519 : if (out->cookie == 0) {
856 0 : out->cookie += 1;
857 : }
858 :
859 12517 : return ret;
860 : }
861 :
/* The context of this process, set by a successful messaging_dgm_init(). */
static struct messaging_dgm_context *global_dgm_context;

/* Installed as talloc destructor by messaging_dgm_init(). */
static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
865 :
866 59125 : static int messaging_dgm_lockfile_create(struct messaging_dgm_context *ctx,
867 : pid_t pid, int *plockfile_fd,
868 : uint64_t *punique)
869 : {
870 1133 : char buf[64];
871 1133 : int lockfile_fd;
872 1133 : struct sun_path_buf lockfile_name;
873 1133 : struct flock lck;
874 1133 : uint64_t unique;
875 1133 : int unique_len, ret;
876 1133 : ssize_t written;
877 :
878 59125 : ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
879 59125 : "%s/%u", ctx->lockfile_dir.buf, (unsigned)pid);
880 59125 : if (ret < 0) {
881 0 : return errno;
882 : }
883 59125 : if ((unsigned)ret >= sizeof(lockfile_name.buf)) {
884 0 : return ENAMETOOLONG;
885 : }
886 :
887 : /* no O_EXCL, existence check is via the fcntl lock */
888 :
889 59125 : lockfile_fd = open(lockfile_name.buf, O_NONBLOCK|O_CREAT|O_RDWR,
890 : 0644);
891 :
892 59125 : if ((lockfile_fd == -1) &&
893 0 : ((errno == ENXIO) /* Linux */ ||
894 0 : (errno == ENODEV) /* Linux kernel bug */ ||
895 0 : (errno == EOPNOTSUPP) /* FreeBSD */)) {
896 : /*
897 : * Huh -- a socket? This might be a stale socket from
898 : * an upgrade of Samba. Just unlink and retry, nobody
899 : * else is supposed to be here at this time.
900 : *
901 : * Yes, this is racy, but I don't see a way to deal
902 : * with this properly.
903 : */
904 0 : unlink(lockfile_name.buf);
905 :
906 0 : lockfile_fd = open(lockfile_name.buf,
907 : O_NONBLOCK|O_CREAT|O_WRONLY,
908 : 0644);
909 : }
910 :
911 59125 : if (lockfile_fd == -1) {
912 0 : ret = errno;
913 0 : DEBUG(1, ("%s: open failed: %s\n", __func__, strerror(errno)));
914 0 : return ret;
915 : }
916 :
917 59125 : lck = (struct flock) {
918 : .l_type = F_WRLCK,
919 : .l_whence = SEEK_SET
920 : };
921 :
922 59125 : ret = fcntl(lockfile_fd, F_SETLK, &lck);
923 59125 : if (ret == -1) {
924 0 : ret = errno;
925 0 : DEBUG(1, ("%s: fcntl failed: %s\n", __func__, strerror(ret)));
926 0 : goto fail_close;
927 : }
928 :
929 : /*
930 : * Directly using the binary value for
931 : * SERVERID_UNIQUE_ID_NOT_TO_VERIFY is a layering
932 : * violation. But including all of ndr here just for this
933 : * seems to be a bit overkill to me. Also, messages_dgm might
934 : * be replaced sooner or later by something streams-based,
935 : * where unique_id generation will be handled differently.
936 : */
937 :
938 1133 : do {
939 59125 : generate_random_buffer((uint8_t *)&unique, sizeof(unique));
940 59125 : } while (unique == UINT64_C(0xFFFFFFFFFFFFFFFF));
941 :
942 59125 : unique_len = snprintf(buf, sizeof(buf), "%"PRIu64"\n", unique);
943 :
944 : /* shorten a potentially preexisting file */
945 :
946 59125 : ret = ftruncate(lockfile_fd, unique_len);
947 59125 : if (ret == -1) {
948 0 : ret = errno;
949 0 : DEBUG(1, ("%s: ftruncate failed: %s\n", __func__,
950 : strerror(ret)));
951 0 : goto fail_unlink;
952 : }
953 :
954 59125 : written = write(lockfile_fd, buf, unique_len);
955 59125 : if (written != unique_len) {
956 0 : ret = errno;
957 0 : DEBUG(1, ("%s: write failed: %s\n", __func__, strerror(ret)));
958 0 : goto fail_unlink;
959 : }
960 :
961 59125 : *plockfile_fd = lockfile_fd;
962 59125 : *punique = unique;
963 59125 : return 0;
964 :
965 0 : fail_unlink:
966 0 : unlink(lockfile_name.buf);
967 0 : fail_close:
968 0 : close(lockfile_fd);
969 0 : return ret;
970 : }
971 :
/* fd event handler, declared here ahead of its definition. */
static void messaging_dgm_read_handler(struct tevent_context *ev,
				       struct tevent_fd *fde,
				       uint16_t flags,
				       void *private_data);
976 :
977 : /*
978 : * Create the rendezvous point in the file system
979 : * that other processes can use to send messages to
980 : * this pid.
981 : */
982 :
983 59138 : int messaging_dgm_init(struct tevent_context *ev,
984 : uint64_t *punique,
985 : const char *socket_dir,
986 : const char *lockfile_dir,
987 : void (*recv_cb)(struct tevent_context *ev,
988 : const uint8_t *msg,
989 : size_t msg_len,
990 : int *fds,
991 : size_t num_fds,
992 : void *private_data),
993 : void *recv_cb_private_data)
994 : {
995 1140 : struct messaging_dgm_context *ctx;
996 1140 : int ret;
997 1140 : struct sockaddr_un socket_address;
998 1140 : size_t len;
999 1140 : static bool have_dgm_context = false;
1000 :
1001 59138 : if (have_dgm_context) {
1002 0 : return EEXIST;
1003 : }
1004 :
1005 59138 : if ((socket_dir == NULL) || (lockfile_dir == NULL)) {
1006 0 : return EINVAL;
1007 : }
1008 :
1009 59138 : ctx = talloc_zero(NULL, struct messaging_dgm_context);
1010 59138 : if (ctx == NULL) {
1011 0 : goto fail_nomem;
1012 : }
1013 59138 : ctx->ev = ev;
1014 59138 : ctx->pid = tevent_cached_getpid();
1015 59138 : ctx->recv_cb = recv_cb;
1016 59138 : ctx->recv_cb_private_data = recv_cb_private_data;
1017 :
1018 59138 : len = strlcpy(ctx->lockfile_dir.buf, lockfile_dir,
1019 : sizeof(ctx->lockfile_dir.buf));
1020 59138 : if (len >= sizeof(ctx->lockfile_dir.buf)) {
1021 5 : TALLOC_FREE(ctx);
1022 5 : return ENAMETOOLONG;
1023 : }
1024 :
1025 59133 : len = strlcpy(ctx->socket_dir.buf, socket_dir,
1026 : sizeof(ctx->socket_dir.buf));
1027 59133 : if (len >= sizeof(ctx->socket_dir.buf)) {
1028 8 : TALLOC_FREE(ctx);
1029 8 : return ENAMETOOLONG;
1030 : }
1031 :
1032 59125 : socket_address = (struct sockaddr_un) { .sun_family = AF_UNIX };
1033 59125 : len = snprintf(socket_address.sun_path,
1034 : sizeof(socket_address.sun_path),
1035 59125 : "%s/%u", socket_dir, (unsigned)ctx->pid);
1036 59125 : if (len >= sizeof(socket_address.sun_path)) {
1037 0 : TALLOC_FREE(ctx);
1038 0 : return ENAMETOOLONG;
1039 : }
1040 :
1041 59125 : ret = messaging_dgm_lockfile_create(ctx, ctx->pid, &ctx->lockfile_fd,
1042 : punique);
1043 59125 : if (ret != 0) {
1044 0 : DEBUG(1, ("%s: messaging_dgm_create_lockfile failed: %s\n",
1045 : __func__, strerror(ret)));
1046 0 : TALLOC_FREE(ctx);
1047 0 : return ret;
1048 : }
1049 :
1050 59125 : unlink(socket_address.sun_path);
1051 :
1052 59125 : ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
1053 59125 : if (ctx->sock == -1) {
1054 0 : ret = errno;
1055 0 : DBG_WARNING("socket failed: %s\n", strerror(ret));
1056 0 : TALLOC_FREE(ctx);
1057 0 : return ret;
1058 : }
1059 :
1060 59125 : ret = prepare_socket_cloexec(ctx->sock);
1061 59125 : if (ret == -1) {
1062 0 : ret = errno;
1063 0 : DBG_WARNING("prepare_socket_cloexec failed: %s\n",
1064 : strerror(ret));
1065 0 : TALLOC_FREE(ctx);
1066 0 : return ret;
1067 : }
1068 :
1069 59125 : ret = bind(ctx->sock, (struct sockaddr *)(void *)&socket_address,
1070 : sizeof(socket_address));
1071 59125 : if (ret == -1) {
1072 0 : ret = errno;
1073 0 : DBG_WARNING("bind failed: %s\n", strerror(ret));
1074 0 : TALLOC_FREE(ctx);
1075 0 : return ret;
1076 : }
1077 :
1078 59125 : talloc_set_destructor(ctx, messaging_dgm_context_destructor);
1079 :
1080 59125 : ctx->have_dgm_context = &have_dgm_context;
1081 :
1082 59125 : ret = pthreadpool_tevent_init(ctx, UINT_MAX, &ctx->pool);
1083 59125 : if (ret != 0) {
1084 0 : DBG_WARNING("pthreadpool_tevent_init failed: %s\n",
1085 : strerror(ret));
1086 0 : TALLOC_FREE(ctx);
1087 0 : return ret;
1088 : }
1089 :
1090 59125 : global_dgm_context = ctx;
1091 59125 : return 0;
1092 :
1093 0 : fail_nomem:
1094 0 : TALLOC_FREE(ctx);
1095 0 : return ENOMEM;
1096 : }
1097 :
1098 : /*
1099 : * Remove the rendezvous point in the filesystem
1100 : * if we're the owner.
1101 : */
1102 :
1103 98733 : static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
1104 : {
1105 155700 : while (c->outsocks != NULL) {
1106 38244 : TALLOC_FREE(c->outsocks);
1107 : }
1108 119535 : while (c->in_msgs != NULL) {
1109 1181 : TALLOC_FREE(c->in_msgs);
1110 : }
1111 121802 : while (c->fde_evs != NULL) {
1112 23069 : tevent_fd_set_flags(c->fde_evs->fde, 0);
1113 23069 : c->fde_evs->ctx = NULL;
1114 24250 : DLIST_REMOVE(c->fde_evs, c->fde_evs);
1115 : }
1116 :
1117 98733 : close(c->sock);
1118 :
1119 98733 : if (tevent_cached_getpid() == c->pid) {
1120 267 : struct sun_path_buf name;
1121 267 : int ret;
1122 :
1123 50946 : ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
1124 50946 : c->socket_dir.buf, (unsigned)c->pid);
1125 50946 : if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
1126 : /*
1127 : * We've checked the length when creating, so this
1128 : * should never happen
1129 : */
1130 0 : abort();
1131 : }
1132 50946 : unlink(name.buf);
1133 :
1134 50946 : ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
1135 50946 : c->lockfile_dir.buf, (unsigned)c->pid);
1136 50946 : if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
1137 : /*
1138 : * We've checked the length when creating, so this
1139 : * should never happen
1140 : */
1141 0 : abort();
1142 : }
1143 50946 : unlink(name.buf);
1144 : }
1145 98733 : close(c->lockfile_fd);
1146 :
1147 98733 : if (c->have_dgm_context != NULL) {
1148 98733 : *c->have_dgm_context = false;
1149 : }
1150 :
1151 98733 : return 0;
1152 : }
1153 :
/*
 * Sanity check of a messaging context, active in DEVELOPER builds only.
 *
 * Protects against using the wrong messaging context after a fork
 * without reinit_after_fork: the name the socket is bound to must be
 * "<socket_dir>/<current pid>", and "<lockfile_dir>/<current pid>"
 * must be the very file (same device and inode) that ctx->lockfile_fd
 * refers to. Any mismatch or failing system call is logged and the
 * process aborts.
 *
 * Without DEVELOPER this function does nothing.
 */
static void messaging_dgm_validate(struct messaging_dgm_context *ctx)
{
#ifdef DEVELOPER
	pid_t mypid = tevent_cached_getpid();
	struct sockaddr_storage ss;
	socklen_t sslen = sizeof(ss);
	struct sockaddr_un *bound;
	struct sun_path_buf expected;
	struct stat by_name, by_fd;
	int rc;

	/* Step 1: the socket must be an AF_UNIX socket bound to our name */

	rc = getsockname(ctx->sock, (struct sockaddr *)&ss, &sslen);
	if (rc == -1) {
		DBG_ERR("getsockname failed: %s\n", strerror(errno));
		abort();
	}
	if (ss.ss_family != AF_UNIX) {
		DBG_ERR("getsockname returned family %d\n",
			(int)ss.ss_family);
		abort();
	}
	bound = (struct sockaddr_un *)&ss;

	rc = snprintf(expected.buf, sizeof(expected.buf),
		      "%s/%u", ctx->socket_dir.buf, (unsigned)mypid);
	if (rc < 0) {
		DBG_ERR("snprintf failed: %s\n", strerror(errno));
		abort();
	}
	if ((size_t)rc >= sizeof(expected.buf)) {
		DBG_ERR("snprintf returned %d chars\n", (int)rc);
		abort();
	}

	if (strcmp(expected.buf, bound->sun_path) != 0) {
		DBG_ERR("sockname wrong: Expected %s, got %s\n",
			expected.buf, bound->sun_path);
		abort();
	}

	/* Step 2: the lockfile on disk must be the one we hold open */

	rc = snprintf(expected.buf, sizeof(expected.buf),
		      "%s/%u", ctx->lockfile_dir.buf, (unsigned)mypid);
	if (rc < 0) {
		DBG_ERR("snprintf failed: %s\n", strerror(errno));
		abort();
	}
	if ((size_t)rc >= sizeof(expected.buf)) {
		DBG_ERR("snprintf returned %d chars\n", (int)rc);
		abort();
	}

	rc = stat(expected.buf, &by_name);
	if (rc == -1) {
		DBG_ERR("stat failed: %s\n", strerror(errno));
		abort();
	}
	rc = fstat(ctx->lockfile_fd, &by_fd);
	if (rc == -1) {
		DBG_ERR("fstat failed: %s\n", strerror(errno));
		abort();
	}

	if ((by_name.st_dev != by_fd.st_dev) ||
	    (by_name.st_ino != by_fd.st_ino)) {
		DBG_ERR("lockfile differs, expected (%d/%d), got (%d/%d)\n",
			(int)by_fd.st_dev, (int)by_fd.st_ino,
			(int)by_name.st_dev, (int)by_name.st_ino);
		abort();
	}

	return;
#else
	return;
#endif
}
1235 :
1236 : static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
1237 : struct tevent_context *ev,
1238 : uint8_t *msg, size_t msg_len,
1239 : int *fds, size_t num_fds);
1240 :
1241 : /*
1242 : * Raw read callback handler - passes to messaging_dgm_recv()
1243 : * for fragment reassembly processing.
1244 : */
1245 :
1246 202595 : static void messaging_dgm_read_handler(struct tevent_context *ev,
1247 : struct tevent_fd *fde,
1248 : uint16_t flags,
1249 : void *private_data)
1250 202595 : {
1251 202595 : struct messaging_dgm_context *ctx = talloc_get_type_abort(
1252 : private_data, struct messaging_dgm_context);
1253 68066 : ssize_t received;
1254 68066 : struct msghdr msg;
1255 68066 : struct iovec iov;
1256 202595 : size_t msgbufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
1257 202595 : uint8_t msgbuf[msgbufsize];
1258 68066 : uint8_t buf[MESSAGING_DGM_FRAGMENT_LENGTH];
1259 68066 : size_t num_fds;
1260 :
1261 202595 : messaging_dgm_validate(ctx);
1262 :
1263 202595 : if ((flags & TEVENT_FD_READ) == 0) {
1264 0 : return;
1265 : }
1266 :
1267 202595 : iov = (struct iovec) { .iov_base = buf, .iov_len = sizeof(buf) };
1268 202595 : msg = (struct msghdr) { .msg_iov = &iov, .msg_iovlen = 1 };
1269 :
1270 202595 : msghdr_prep_recv_fds(&msg, msgbuf, msgbufsize, INT8_MAX);
1271 :
1272 : #ifdef MSG_CMSG_CLOEXEC
1273 202595 : msg.msg_flags |= MSG_CMSG_CLOEXEC;
1274 : #endif
1275 :
1276 202595 : received = recvmsg(ctx->sock, &msg, 0);
1277 202595 : if (received == -1) {
1278 0 : if ((errno == EAGAIN) ||
1279 0 : (errno == EWOULDBLOCK) ||
1280 0 : (errno == EINTR) ||
1281 0 : (errno == ENOMEM)) {
1282 : /* Not really an error - just try again. */
1283 0 : return;
1284 : }
1285 : /* Problem with the socket. Set it unreadable. */
1286 0 : tevent_fd_set_flags(fde, 0);
1287 0 : return;
1288 : }
1289 :
1290 202595 : if ((size_t)received > sizeof(buf)) {
1291 : /* More than we expected, not for us */
1292 0 : return;
1293 : }
1294 :
1295 202595 : num_fds = msghdr_extract_fds(&msg, NULL, 0);
1296 202595 : if (num_fds == 0) {
1297 68011 : int fds[1];
1298 :
1299 159948 : messaging_dgm_recv(ctx, ev, buf, received, fds, 0);
1300 42647 : } else {
1301 55 : size_t i;
1302 42647 : int fds[num_fds];
1303 :
1304 42647 : msghdr_extract_fds(&msg, fds, num_fds);
1305 :
1306 85352 : for (i = 0; i < num_fds; i++) {
1307 58 : int err;
1308 :
1309 42650 : err = prepare_socket_cloexec(fds[i]);
1310 42650 : if (err != 0) {
1311 0 : close_fd_array(fds, num_fds);
1312 0 : num_fds = 0;
1313 : }
1314 : }
1315 :
1316 42647 : messaging_dgm_recv(ctx, ev, buf, received, fds, num_fds);
1317 : }
1318 : }
1319 :
1320 0 : static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg *m)
1321 : {
1322 0 : DLIST_REMOVE(m->ctx->in_msgs, m);
1323 0 : return 0;
1324 : }
1325 :
/*
 * Close every descriptor in fds[0..num_fds-1] that is not -1 and mark
 * its slot as -1. Slots already holding -1 are left alone.
 */
static void messaging_dgm_close_unconsumed(int *fds, size_t num_fds)
{
	size_t idx = 0;

	while (idx < num_fds) {
		int *slot = &fds[idx];

		idx += 1;

		if (*slot == -1) {
			continue;
		}
		close(*slot);
		*slot = -1;
	}
}
1337 :
1338 : /*
1339 : * Deal with identification of fragmented messages and
1340 : * re-assembly into full messages sent, then calls the
1341 : * callback.
1342 : */
1343 :
1344 202595 : static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
1345 : struct tevent_context *ev,
1346 : uint8_t *buf, size_t buflen,
1347 : int *fds, size_t num_fds)
1348 : {
1349 68066 : struct messaging_dgm_fragment_hdr hdr;
1350 68066 : struct messaging_dgm_in_msg *msg;
1351 68066 : size_t space;
1352 68066 : uint64_t cookie;
1353 :
1354 202595 : if (buflen < sizeof(cookie)) {
1355 0 : goto close_fds;
1356 : }
1357 202595 : memcpy(&cookie, buf, sizeof(cookie));
1358 202595 : buf += sizeof(cookie);
1359 202595 : buflen -= sizeof(cookie);
1360 :
1361 202595 : if (cookie == 0) {
1362 177802 : ctx->recv_cb(ev, buf, buflen, fds, num_fds,
1363 : ctx->recv_cb_private_data);
1364 177802 : messaging_dgm_close_unconsumed(fds, num_fds);
1365 255202 : return;
1366 : }
1367 :
1368 24793 : if (buflen < sizeof(hdr)) {
1369 0 : goto close_fds;
1370 : }
1371 24793 : memcpy(&hdr, buf, sizeof(hdr));
1372 24793 : buf += sizeof(hdr);
1373 24793 : buflen -= sizeof(hdr);
1374 :
1375 24793 : for (msg = ctx->in_msgs; msg != NULL; msg = msg->next) {
1376 13457 : if ((msg->sender_pid == hdr.pid) &&
1377 11457 : (msg->sender_sock == hdr.sock)) {
1378 11457 : break;
1379 : }
1380 : }
1381 :
1382 24793 : if ((msg != NULL) && (msg->cookie != cookie)) {
1383 2002 : TALLOC_FREE(msg);
1384 : }
1385 :
1386 24793 : if (msg == NULL) {
1387 2 : size_t msglen;
1388 11336 : msglen = offsetof(struct messaging_dgm_in_msg, buf) +
1389 11334 : hdr.msglen;
1390 :
1391 11336 : msg = talloc_size(ctx, msglen);
1392 11336 : if (msg == NULL) {
1393 0 : goto close_fds;
1394 : }
1395 11336 : talloc_set_name_const(msg, "struct messaging_dgm_in_msg");
1396 :
1397 11336 : *msg = (struct messaging_dgm_in_msg) {
1398 11334 : .ctx = ctx, .msglen = hdr.msglen,
1399 11334 : .sender_pid = hdr.pid, .sender_sock = hdr.sock,
1400 : .cookie = cookie
1401 : };
1402 11336 : DLIST_ADD(ctx->in_msgs, msg);
1403 11336 : talloc_set_destructor(msg, messaging_dgm_in_msg_destructor);
1404 : }
1405 :
1406 24793 : space = msg->msglen - msg->received;
1407 24793 : if (buflen > space) {
1408 0 : goto close_fds;
1409 : }
1410 :
1411 24793 : memcpy(msg->buf + msg->received, buf, buflen);
1412 24793 : msg->received += buflen;
1413 :
1414 24793 : if (msg->received < msg->msglen) {
1415 : /*
1416 : * Any valid sender will send the fds in the last
1417 : * block. Invalid senders might have sent fd's that we
1418 : * need to close here.
1419 : */
1420 13457 : goto close_fds;
1421 : }
1422 :
1423 11336 : DLIST_REMOVE(ctx->in_msgs, msg);
1424 11336 : talloc_set_destructor(msg, NULL);
1425 :
1426 11336 : ctx->recv_cb(ev, msg->buf, msg->msglen, fds, num_fds,
1427 : ctx->recv_cb_private_data);
1428 11336 : messaging_dgm_close_unconsumed(fds, num_fds);
1429 :
1430 11336 : TALLOC_FREE(msg);
1431 11334 : return;
1432 :
1433 13457 : close_fds:
1434 13457 : close_fd_array(fds, num_fds);
1435 : }
1436 :
/*
 * Free the process-wide datagram messaging context, if any.
 */
void messaging_dgm_destroy(void)
{
	TALLOC_FREE(global_dgm_context);
}
1441 :
1442 517476 : int messaging_dgm_send(pid_t pid,
1443 : const struct iovec *iov, int iovlen,
1444 : const int *fds, size_t num_fds)
1445 : {
1446 517476 : struct messaging_dgm_context *ctx = global_dgm_context;
1447 68045 : struct messaging_dgm_out *out;
1448 68045 : int ret;
1449 517476 : unsigned retries = 0;
1450 :
1451 517476 : if (ctx == NULL) {
1452 0 : return ENOTCONN;
1453 : }
1454 :
1455 517476 : messaging_dgm_validate(ctx);
1456 :
1457 517589 : again:
1458 517589 : ret = messaging_dgm_out_get(ctx, pid, &out);
1459 517589 : if (ret != 0) {
1460 5939 : return ret;
1461 : }
1462 :
1463 511650 : DEBUG(10, ("%s: Sending message to %u\n", __func__, (unsigned)pid));
1464 :
1465 511650 : ret = messaging_dgm_out_send_fragmented(ctx->ev, out, iov, iovlen,
1466 : fds, num_fds);
1467 511650 : if (ret == ECONNREFUSED) {
1468 : /*
1469 : * We cache outgoing sockets. If the receiver has
1470 : * closed and re-opened the socket since our last
1471 : * message, we get connection refused. Retry.
1472 : */
1473 :
1474 113 : TALLOC_FREE(out);
1475 :
1476 113 : if (retries < 5) {
1477 113 : retries += 1;
1478 113 : goto again;
1479 : }
1480 : }
1481 443492 : return ret;
1482 : }
1483 :
1484 557072 : static int messaging_dgm_read_unique(int fd, uint64_t *punique)
1485 : {
1486 17634 : char buf[25];
1487 17634 : ssize_t rw_ret;
1488 557072 : int error = 0;
1489 17634 : unsigned long long unique;
1490 17634 : char *endptr;
1491 :
1492 557072 : rw_ret = pread(fd, buf, sizeof(buf)-1, 0);
1493 557072 : if (rw_ret == -1) {
1494 0 : return errno;
1495 : }
1496 557072 : buf[rw_ret] = '\0';
1497 :
1498 557072 : unique = smb_strtoull(buf, &endptr, 10, &error, SMB_STR_STANDARD);
1499 557072 : if (error != 0) {
1500 0 : return error;
1501 : }
1502 :
1503 557072 : if (endptr[0] != '\n') {
1504 0 : return EINVAL;
1505 : }
1506 557072 : *punique = unique;
1507 557072 : return 0;
1508 : }
1509 :
1510 557076 : int messaging_dgm_get_unique(pid_t pid, uint64_t *unique)
1511 : {
1512 557076 : struct messaging_dgm_context *ctx = global_dgm_context;
1513 17634 : struct sun_path_buf lockfile_name;
1514 17634 : int ret, fd;
1515 :
1516 557076 : if (ctx == NULL) {
1517 0 : return EBADF;
1518 : }
1519 :
1520 557076 : messaging_dgm_validate(ctx);
1521 :
1522 557076 : if (pid == tevent_cached_getpid()) {
1523 : /*
1524 : * Protect against losing our own lock
1525 : */
1526 467480 : return messaging_dgm_read_unique(ctx->lockfile_fd, unique);
1527 : }
1528 :
1529 89596 : ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
1530 89596 : "%s/%u", ctx->lockfile_dir.buf, (int)pid);
1531 89596 : if (ret < 0) {
1532 0 : return errno;
1533 : }
1534 89596 : if ((size_t)ret >= sizeof(lockfile_name.buf)) {
1535 0 : return ENAMETOOLONG;
1536 : }
1537 :
1538 89596 : fd = open(lockfile_name.buf, O_NONBLOCK|O_RDONLY, 0);
1539 89596 : if (fd == -1) {
1540 4 : return errno;
1541 : }
1542 :
1543 89592 : ret = messaging_dgm_read_unique(fd, unique);
1544 89592 : close(fd);
1545 89592 : return ret;
1546 : }
1547 :
1548 15347 : int messaging_dgm_cleanup(pid_t pid)
1549 : {
1550 15347 : struct messaging_dgm_context *ctx = global_dgm_context;
1551 0 : struct sun_path_buf lockfile_name, socket_name;
1552 0 : int fd, len, ret;
1553 15347 : struct flock lck = {
1554 : .l_pid = 0,
1555 : };
1556 :
1557 15347 : if (ctx == NULL) {
1558 0 : return ENOTCONN;
1559 : }
1560 :
1561 15347 : len = snprintf(socket_name.buf, sizeof(socket_name.buf), "%s/%u",
1562 15347 : ctx->socket_dir.buf, (unsigned)pid);
1563 15347 : if (len < 0) {
1564 0 : return errno;
1565 : }
1566 15347 : if ((size_t)len >= sizeof(socket_name.buf)) {
1567 0 : return ENAMETOOLONG;
1568 : }
1569 :
1570 15347 : len = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf), "%s/%u",
1571 15347 : ctx->lockfile_dir.buf, (unsigned)pid);
1572 15347 : if (len < 0) {
1573 0 : return errno;
1574 : }
1575 15347 : if ((size_t)len >= sizeof(lockfile_name.buf)) {
1576 0 : return ENAMETOOLONG;
1577 : }
1578 :
1579 15347 : fd = open(lockfile_name.buf, O_NONBLOCK|O_WRONLY, 0);
1580 15347 : if (fd == -1) {
1581 15305 : ret = errno;
1582 15305 : if (ret != ENOENT) {
1583 0 : DEBUG(10, ("%s: open(%s) failed: %s\n", __func__,
1584 : lockfile_name.buf, strerror(ret)));
1585 : }
1586 15305 : return ret;
1587 : }
1588 :
1589 42 : lck.l_type = F_WRLCK;
1590 42 : lck.l_whence = SEEK_SET;
1591 42 : lck.l_start = 0;
1592 42 : lck.l_len = 0;
1593 :
1594 42 : ret = fcntl(fd, F_SETLK, &lck);
1595 42 : if (ret != 0) {
1596 0 : ret = errno;
1597 0 : if ((ret != EACCES) && (ret != EAGAIN)) {
1598 0 : DEBUG(10, ("%s: Could not get lock: %s\n", __func__,
1599 : strerror(ret)));
1600 : }
1601 0 : close(fd);
1602 0 : return ret;
1603 : }
1604 :
1605 42 : DEBUG(10, ("%s: Cleaning up : %s\n", __func__, strerror(ret)));
1606 :
1607 42 : (void)unlink(socket_name.buf);
1608 42 : (void)unlink(lockfile_name.buf);
1609 42 : (void)close(fd);
1610 42 : return 0;
1611 : }
1612 :
/*
 * Per-pid callback for messaging_dgm_wipe().
 *
 * private_data points to the pid of the calling process. Every other
 * pid is handed to messaging_dgm_cleanup() and the outcome is logged
 * at debug level 10. Always returns 0, so the directory walk
 * continues whatever the cleanup result was.
 */
static int messaging_dgm_wipe_fn(pid_t pid, void *private_data)
{
	pid_t *self = (pid_t *)private_data;

	if (pid != *self) {
		int rc = messaging_dgm_cleanup(pid);

		DEBUG(10, ("messaging_dgm_cleanup(%lu) returned %s\n",
			   (unsigned long)pid, rc ? strerror(rc) : "ok"));
	}

	/*
	 * Our own pid is skipped above: fcntl(F_GETLK) will succeed
	 * for ourselves, we hold that lock ourselves.
	 */
	return 0;
}
1632 :
1633 0 : int messaging_dgm_wipe(void)
1634 : {
1635 0 : pid_t pid = tevent_cached_getpid();
1636 0 : messaging_dgm_forall(messaging_dgm_wipe_fn, &pid);
1637 0 : return 0;
1638 : }
1639 :
1640 225 : int messaging_dgm_forall(int (*fn)(pid_t pid, void *private_data),
1641 : void *private_data)
1642 : {
1643 225 : struct messaging_dgm_context *ctx = global_dgm_context;
1644 1 : DIR *msgdir;
1645 1 : struct dirent *dp;
1646 225 : int error = 0;
1647 :
1648 225 : if (ctx == NULL) {
1649 0 : return ENOTCONN;
1650 : }
1651 :
1652 225 : messaging_dgm_validate(ctx);
1653 :
1654 : /*
1655 : * We scan the socket directory and not the lock directory. Otherwise
1656 : * we would race against messaging_dgm_lockfile_create's open(O_CREAT)
1657 : * and fcntl(SETLK).
1658 : */
1659 :
1660 225 : msgdir = opendir(ctx->socket_dir.buf);
1661 225 : if (msgdir == NULL) {
1662 0 : return errno;
1663 : }
1664 :
1665 14803 : while ((dp = readdir(msgdir)) != NULL) {
1666 8 : unsigned long pid;
1667 8 : int ret;
1668 :
1669 14578 : pid = smb_strtoul(dp->d_name, NULL, 10, &error, SMB_STR_STANDARD);
1670 14578 : if ((pid == 0) || (error != 0)) {
1671 : /*
1672 : * . and .. and other malformed entries
1673 : */
1674 450 : continue;
1675 : }
1676 :
1677 14128 : ret = fn(pid, private_data);
1678 14128 : if (ret != 0) {
1679 0 : break;
1680 : }
1681 : }
1682 225 : closedir(msgdir);
1683 :
1684 225 : return 0;
1685 : }
1686 :
/*
 * Handle returned by messaging_dgm_register_tevent_context().
 */
struct messaging_dgm_fde {
	/* The read event registered on the messaging socket */
	struct tevent_fd *fde;
};
1690 :
1691 158458 : static int messaging_dgm_fde_ev_destructor(struct messaging_dgm_fde_ev *fde_ev)
1692 : {
1693 158458 : if (fde_ev->ctx != NULL) {
1694 142640 : DLIST_REMOVE(fde_ev->ctx->fde_evs, fde_ev);
1695 142640 : fde_ev->ctx = NULL;
1696 : }
1697 158458 : return 0;
1698 : }
1699 :
1700 : /*
1701 : * Reference counter for a struct tevent_fd messaging read event
1702 : * (with callback function) on a struct tevent_context registered
1703 : * on a messaging context.
1704 : *
1705 : * If we've already registered this struct tevent_context before
1706 : * (so already have a read event), just increase the reference count.
1707 : *
1708 : * Otherwise create a new struct tevent_fd messaging read event on the
1709 : * previously unseen struct tevent_context - this is what drives
1710 : * the message receive processing.
1711 : *
1712 : */
1713 :
1714 639179 : struct messaging_dgm_fde *messaging_dgm_register_tevent_context(
1715 : TALLOC_CTX *mem_ctx, struct tevent_context *ev)
1716 : {
1717 639179 : struct messaging_dgm_context *ctx = global_dgm_context;
1718 21974 : struct messaging_dgm_fde_ev *fde_ev;
1719 21974 : struct messaging_dgm_fde *fde;
1720 :
1721 639179 : if (ctx == NULL) {
1722 0 : return NULL;
1723 : }
1724 :
1725 639179 : fde = talloc(mem_ctx, struct messaging_dgm_fde);
1726 639179 : if (fde == NULL) {
1727 0 : return NULL;
1728 : }
1729 :
1730 837490 : for (fde_ev = ctx->fde_evs; fde_ev != NULL; fde_ev = fde_ev->next) {
1731 718898 : if (tevent_fd_get_flags(fde_ev->fde) == 0) {
1732 : /*
1733 : * If the event context got deleted,
1734 : * tevent_fd_get_flags() will return 0
1735 : * for the stale fde.
1736 : *
1737 : * In that case we should not
1738 : * use fde_ev->ev anymore.
1739 : */
1740 60162 : continue;
1741 : }
1742 658736 : if (fde_ev->ev == ev) {
1743 503527 : break;
1744 : }
1745 : }
1746 :
1747 639179 : if (fde_ev == NULL) {
1748 118592 : fde_ev = talloc(fde, struct messaging_dgm_fde_ev);
1749 118592 : if (fde_ev == NULL) {
1750 0 : return NULL;
1751 : }
1752 118592 : fde_ev->fde = tevent_add_fd(
1753 : ev, fde_ev, ctx->sock, TEVENT_FD_READ,
1754 : messaging_dgm_read_handler, ctx);
1755 118592 : if (fde_ev->fde == NULL) {
1756 0 : TALLOC_FREE(fde);
1757 0 : return NULL;
1758 : }
1759 118592 : fde_ev->ev = ev;
1760 118592 : fde_ev->ctx = ctx;
1761 118592 : DLIST_ADD(ctx->fde_evs, fde_ev);
1762 118592 : talloc_set_destructor(
1763 : fde_ev, messaging_dgm_fde_ev_destructor);
1764 : } else {
1765 : /*
1766 : * Same trick as with tdb_wrap: The caller will never
1767 : * see the talloc_referenced object, the
1768 : * messaging_dgm_fde_ev, so problems with
1769 : * talloc_unlink will not happen.
1770 : */
1771 520587 : if (talloc_reference(fde, fde_ev) == NULL) {
1772 0 : TALLOC_FREE(fde);
1773 0 : return NULL;
1774 : }
1775 : }
1776 :
1777 639179 : fde->fde = fde_ev->fde;
1778 639179 : return fde;
1779 : }
1780 :
1781 503181 : bool messaging_dgm_fde_active(struct messaging_dgm_fde *fde)
1782 : {
1783 131599 : uint16_t flags;
1784 :
1785 503181 : if (fde == NULL) {
1786 0 : return false;
1787 : }
1788 503181 : flags = tevent_fd_get_flags(fde->fde);
1789 503181 : return (flags != 0);
1790 : }
|