#!@PERL_EXECUTABLE@ -w
#
# ==========================================================================
#
# ZoneMinder ONVIF Event Watcher Script
# Copyright (C) Jan M. Hochstein
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# ==========================================================================
#
# This module contains the implementation of the ONVIF event watcher
#

## SETUP:
##    chmod g+rw /dev/shm/zm*
##    chgrp users /dev/shm/zm*
##    systemctl stop iptables

## DEBUG:
##    hexdump -x -s 0 -n 600 /dev/shm/zm.mmap.1
##    rm -rf /srv/www/html/zm/events/[...]
##

use strict;
use bytes;

use Carp;
use Data::Dump qw( dump );

use DBI;
use Getopt::Long qw(:config no_auto_abbrev no_ignore_case bundling);
use IO::Select;
use POSIX qw( EINTR );
use Time::HiRes qw( usleep );

use SOAP::Lite; # +trace;
use SOAP::Transport::HTTP;

use ZoneMinder;

require ONVIF::Client;

require WSNotification::Interfaces::WSBaseNotificationSender::NotificationProducerPort;
require WSNotification::Interfaces::WSBaseNotificationSender::SubscriptionManagerPort;
require WSNotification::Types::TopicExpressionType;

# ========================================================================
# Constants

# in seconds
use constant SUBSCRIPTION_RENEW_INTERVAL => 60;  #3600;
use constant SUBSCRIPTION_RENEW_EARLY => 5; # 60;
use constant MONITOR_RELOAD_INTERVAL => 3600;

# ========================================================================
# Globals

my $verbose = 0;
my $script;
my $daemon_pid;

my $monitor_reload_time = 0;

# this does not work on all architectures
my @EXTRA_SOCK_OPTS = (
  'ReuseAddr' => '1',
  #    'ReusePort' => '1',
  #    'Blocking'  => '0',
);


# =========================================================================
# signal handling

sub handler { # 1st argument is signal name
  my ($sig) = @_;
  Error("Caught a SIG$sig -- shutting down");
  confess();
  kill($daemon_pid) if defined $daemon_pid;
  exit(0);
}

$SIG{INT} = \&handler;
$SIG{HUP} = \&handler;
$SIG{QUIT} = \&handler;
$SIG{TERM} = \&handler;
$SIG{__DIE__} = \&handler;

# =========================================================================
# Debug

#use Data::Hexdumper qw(hexdump);
#use Devel::Peek;

sub dump_mapped {
  my ($monitor) = @_;

  if ( !zmMemVerify($monitor) ) {
    print "Error: Mapped memory not accessible\n";
  }
  my $mmap = $monitor->{MMap};
  print 'Size: '.$ZoneMinder::Memory::mem_size ."\n";
  printf("Mapped at %x\n", $monitor->{MMapAddr});
  #  Dump($mmap);
  if ( $mmap && $$mmap ) {
    #print hexdump(
    #data           => $$mmap, # what to dump
    #output_format  => '  %4a : %S %S %S %S %S %S %S %S         : %d',
    ##      start_position => 336,   # start at this offset ...
    #end_position   => 400    # ... and end at this offset
    #);
  }
}

#push @EXPORT, qw(dump_mapped);

# =========================================================================
# internal methods

sub xs_duration {
  use integer;
  my ($seconds) = @_;
  my $s = $seconds % 60;
  $seconds /= 60;
  my $m = $seconds % 60;
  $seconds /= 60;
  my $h = $seconds % 24;
  $seconds /= 24;
  my $d = $seconds;
  my $str;
  if($d > 0) {
    $str = "P". $d;
  }
  else {
    $str = "P";
  }
  $str = $str . "T";
  if($h > 0) {
    $str = $str . $h . "H";
  }
  if($m > 0) {
    $str = $str . $m . "M";
  }
  if($s > 0) {
    $str = $str . $s . "S";
  }
  return $str;
}

# =========================================================================
### ZoneMinder integration

## make this a singleton ?
{
  package _ZoneMinder;

  use strict;
  use bytes;

  use base qw(Class::Std::Fast);

  use DBI;
  use Encode qw(decode encode);
  use Time::HiRes qw( usleep );
  use ZoneMinder;

  #  my %monitors = ();
  my $dbh;
  my %monitors_of; #   :ATTR(:name<monitors> :default<{}>);

  sub init {
    $dbh = zmDbConnect();
  }

  sub monitors {
    my ($self) = @_;
    return $monitors_of{ident $self}?%{ $monitors_of{ident $self} }:undef;
  }

  sub set_monitors {
    my ($self, %monitors_par) = @_;
    $monitors_of{ident $self} = \%monitors_par;
  }

  ## TODO: remember to refresh this in all threads (daemon is forked)
  sub loadMonitors {
    my ($self) = @_;
    Info("Loading monitors");
    $monitor_reload_time = time();

    my %new_monitors = ();

    my $sql = "SELECT * FROM Monitors WHERE Capturing != 'None' AND ONVIF_URL != ''";
    my $sth = $dbh->prepare_cached($sql) or Fatal("Can't prepare '$sql': ".$dbh->errstr());
    my $res = $sth->execute() or Fatal("Can't execute: ".$sth->errstr());
    while ( my $monitor = $sth->fetchrow_hashref() ) {
      if ( !defined $script ) {
        if ( !zmMemVerify($monitor) ) { # Check shared memory ok
          zmMemInvalidate($monitor);
          next if !zmMemVerify($monitor);
        }
      }

      Info('Monitor URL: '.$monitor->{ONVIF_URL});

      ## set up ONVIF client for monitor
      next if ! $monitor->{ONVIF_URL};

      my $soap_version;
      if ( $monitor->{ONVIF_Options} =~ /SOAP1([12])/ ) {
        $soap_version = "1.$1";
      } else {
        $soap_version = '1.1';
      }
      my $client = ONVIF::Client->new( {
          url_svc_device => $monitor->{ONVIF_URL},
          soap_version => $soap_version } );

      if ( $monitor->{ONVIF_Username} ) {
        $client->set_credentials($monitor->{ONVIF_Username}, $monitor->{ONVIF_Password}, 0);
      }

      $client->create_services();
      $monitor->{onvif_client} = $client;

      $new_monitors{$monitor->{Id}} = $monitor;
    } # end foreach monitor
    $self->set_monitors(%new_monitors);
  } # end foreach db monitor

  sub freeMonitors {
    my ($self) = @_;
    my %monitors = $self->monitors();
    foreach my $monitor ( values %monitors ) {
      # Free up any used memory handle
      zmMemInvalidate($monitor);
    }
  }

  sub eventOn {
    my ($self, $monitorId, $score, $cause, $text, $showtext) = @_;
    #	Info( "Trigger '$trigger'\n" );
    Info("On: $monitorId, $score, $cause, $text, $showtext");
    my %monitors = $self->monitors();
    my $monitor = $monitors{$monitorId};
    if ( defined $script ) {
      #      eval {
      system($script, 'On', $monitor->{Name}, $monitor->{Path}, $cause);
      # this goes to "stopped" in ffmpeg when executed from shell - why?
      #      }
    } else {
      # encode() ensures that no utf-8 is written to mmap'ed memory.
      zmTriggerEventOn($monitor, $score, encode('utf-8', $cause), encode('utf-8', $text));
      zmTriggerShowtext($monitor, encode('utf-8', $showtext)) if defined($showtext);
      #      main::dump_mapped($monitor);
    }
  }

  sub eventOff {
    my ($self, $monitorId, $score, $cause, $text, $showtext) = @_;
    Info("Off: $monitorId, $score, $cause, $text, $showtext");
    my %monitors = $self->monitors();
    my $monitor = $monitors{$monitorId};
    if ( defined $script ) {
      #      eval {
      system($script, 'Off', $monitor->{Name}, $monitor->{Path}, $cause);
      #      }
    } else {
      my $last_event = zmGetLastEvent($monitor);
      zmTriggerEventOff($monitor);
      # encode() ensures that no utf-8 is written to mmap'ed memory.
      zmTriggerShowtext($monitor, encode('utf-8', $showtext) ) if defined($showtext);
      #	Info( "Trigger '$trigger'\n" );
      # Wait til it's finished
      while ( zmInAlarm($monitor) && ($last_event == zmGetLastEvent($monitor)) ) {
        # Tenth of a second
        usleep(100000);
      }
      zmTriggerEventCancel($monitor);
      #    main::dump_mapped($monitor);
    }
  }
}  # end package _ZoneMinder
# =========================================================================
### (experimental) send email

sub send_picture_email {
  #  'ffmpeg -i "rtsp://admin:admin123@192.168.0.70:554/Streaming/Channels/1?transportmode=mcast&profile=Profile_1" -y -frames 1 -vf scale=1024:-1 /tmp/pic2.jpg'
}

# =========================================================================
### Consumer for Notify messages

$SOAP::Constants::DO_NOT_CHECK_MUSTUNDERSTAND = 1;

## make this a singleton ?
{
  package _Consumer;
  use strict;
  use bytes;
  use base qw(Class::Std::Fast SOAP::Server::Parameters);

  use ZoneMinder;

  my $zm;

  sub BUILD {
    my ($self, $ident, $arg_ref) = @_;
    #    $zm_of{$ident} = check_name( $arg_ref->{zm} );
    $zm = $arg_ref->{zm};
  }

  #
  # called on http://docs.oasis-open.org/wsn/bw-2/NotificationConsumer/Notify
  #
  sub Notify {
    my ($self, $unknown, $som) = @_;
    Debug('### Notify');
    my $req = $som->context->request;
    #    Data::Dump::dump($req);
    my $id = 0;
    if ( $req->uri->path =~ m|/ref_(.*)/| ) {
      $id = $1;
    } else {
      Warning('Unknown URL '.$req->uri->path.' called by event');
      return ();
    }
    #    Data::Dump::dump($som);
    my $action = $som->valueof('/Envelope/Header/Action');
    Debug('  Action = '.$action);
    my $msg = $som->match('/Envelope/Body/Notify/NotificationMessage');
    my $topic = $msg->valueof('Topic');
    my $msg2  = $msg->match('Message/Message');
    #    Data::Dump::dump($msg2->current());
    my $time = $msg2->dataof()->attr->{'UtcTime'};

    my (%source, %data);
    foreach my $item ($msg2->dataof('Source/SimpleItem')) {
      $source{$item->attr->{Name}} = $item->attr->{Value};
      #      print $item->attr->{Name} ."=>". $item->attr->{Value} ."\n";
    }
    foreach my $item ($msg2->dataof('Data/SimpleItem')) {
      $data{$item->attr->{Name}} = $item->attr->{Value};
    }
    Debug("Ref=$id, Topic=$topic, $time, Rule=$source{Rule}, isMotion=$data{IsMotion}");
    if ( lc($data{IsMotion}) eq 'true' ) {
      $zm->eventOn($id, 100, $source{Rule}, $time);
    } elsif ( lc($data{IsMotion}) eq 'false' ) {
      $zm->eventOff($id, 100, $source{Rule}, $time);
    }
    return ();
  }
} # end Consumer

# =========================================================================

sub daemon_main {
  my ($daemon) = @_;

  #  $daemon->handle();

  # improve responsiveness with multiple clients (cameras)
  my $d = $daemon->{_daemon};
  my $select = IO::Select->new();
  $select->add($d);
  while ($select->count() ) {
    my @ready = $select->can_read();  # blocks
    foreach my $connection (@ready) {
      if ( $connection == $d ) {
        # on the daemon accept and add the connection
        my $client = $connection->accept();
        $select->add($client);
      } else {
        # it's a client connection
        my $request = $connection->get_request();
        if ( $request ) {
          # process the request (taken from SOAP::Transport::HTTP::Daemon->handle() )
          $daemon->request($request);
          $daemon->SOAP::Transport::HTTP::Server::handle();
          eval {
            local $SIG{PIPE} = sub { print("SIGPIPE\n") };  # die?
            $connection->send_response( $daemon->response );
          };
          if ( $@ && $@ !~ /^SIGPIPE/ ) {
            print($@);  # die?
          }
        } else {
          # connection was closed by the client
          $select->remove($connection);
          $connection->close();  # is this necessary?
        }
      } # end if new connection or existing
    } # end foreach connection
  } # end while select->count
} # end sub daemon_main

sub start_daemon {
  my ($localip, $localport, $zm) = @_;

  ### daemon
  my $daemon = SOAP::Transport::HTTP::Daemon->new(
    LocalAddr => $localip,
    LocalPort => $localport,
    #        'deserializer' => $deserializer,
    @EXTRA_SOCK_OPTS
  );

  ## handling

  # we only handle one method
  $daemon->on_dispatch( sub {
      return ( 'http://docs.oasis-open.org/wsn/bw-2/NotificationConsumer', 'Notify' );
    });

  $daemon_pid = fork();
  die "fork() failed: $!" unless defined $daemon_pid;
  if ( $daemon_pid ) {

    # this is a new process --> use new name and log file
    $0 = $0.' [http-daemon]';
    logInit(id => 'zmonvif-trigger-httpd');
    logSetSignal();

    # $zm is copied and the mmap'ed regions still exist
    my $consumer = _Consumer->new({zm => $zm});
    $daemon->dispatch_with({
        #       "http://docs.oasis-open.org/wsn/bw-2" => $consumer,
        'http://docs.oasis-open.org/wsn/bw-2/NotificationConsumer' => $consumer,
      });
    daemon_main($daemon);
  } else {
    return $daemon;
  }
} # end sub start_daemon

require WSNotification::Elements::Subscribe;
require WSNotification::Types::EndpointReferenceType;
#require WSNotification::Types::ReferenceParametersType;
#require WSNotification::Elements::Metadata;
require WSNotification::Types::FilterType;
require WSNotification::Elements::TopicExpression;
require WSNotification::Elements::MessageContent;
require WSNotification::Types::AbsoluteOrRelativeTimeType;
require WSNotification::Types::AttributedURIType;

sub subscribe {
  my ($client, $localaddr, $topic_str, $duration, $ref_id) = @_;

  # for debugging:
  #  $client->get_endpoint('events')->no_dispatch(1);

  my $result = $client->get_endpoint('events')->Subscribe( {
      ConsumerReference =>  { # WSNotification::Types::EndpointReferenceType
        Address =>  { value => 'http://' . $localaddr . '/ref_'. $ref_id . '/' },
        #      ReferenceParameters =>  { # WSNotification::Types::ReferenceParametersType
        #      },
        #      Metadata =>  { # WSNotification::Types::MetadataType
        #      },
      },
      Filter =>  { # WSNotification::Types::FilterType
        TopicExpression => { # WSNotification::Types::TopicExpressionType
          xmlattr => {
            Dialect => "http://www.onvif.org/ver10/tev/topicExpression/ConcreteSet",
          },
          value => $topic_str,
        },
        #      MessageContent =>  { # WSNotification::Types::QueryExpressionType
        #      },
      },
      InitialTerminationTime => xs_duration($duration), # AbsoluteOrRelativeTimeType
      #    SubscriptionPolicy =>  {
      #    },
    },,
  );

  die $result if not $result;
  # print $result . "\n";

  ### build Subscription Manager
  my $submgr_addr = $result->get_SubscriptionReference()->get_Address()->get_value();
  Info("Subscription Manager at $submgr_addr");

  my $serializer = $client->service('device', 'ep')->get_serializer();

  my $submgr_svc = WSNotification::Interfaces::WSBaseNotificationSender::SubscriptionManagerPort->new({
      serializer => $serializer,
      proxy => $submgr_addr,
    });

  return $submgr_svc;
} # end sub subscribe

sub unsubscribe {
  my ($submgr_svc) = @_;

  $submgr_svc->Unsubscribe( { },, );
}

sub renew {
  my ($submgr_svc, $duration) = @_;

  my $result = $submgr_svc->Renew( {
      TerminationTime => xs_duration($duration), # AbsoluteOrRelativeTimeType
    },,
  );
  die $result if not $result;
}

sub events {
  my ($localip, $localport) = @_;

  my $zm = _ZoneMinder->new();
  $zm->init();
  $zm->loadMonitors();  # call before fork()

  my %monitors = $zm->monitors();
  my $monitor_count = scalar keys(%monitors);
  if ( $monitor_count == 0 ) {
    Warning('No active ONVIF monitors found. Exiting');
    return;
  }
  Debug("Found $monitor_count active ONVIF monitors");
  Info('ONVIF Trigger daemon starting');

  if ( !defined $localip ) {
    #$localip = '192.168.0.2';
    #$localport = '0';
  }

  # re-use local address/port
  #   @LWP::Protocol::http::EXTRA_SOCK_OPTS =
  *LWP::Protocol::http::_extra_sock_opts = sub {
    #      print "### extra_sock_opts ########################################\n";
    @EXTRA_SOCK_OPTS;
  };

  #*LWP::Protocol::http::_check_sock = sub
  #{
  #    my($self, $req, $sock) = @_;
  #    print "### check_sock ########################################\n";
  #    dump($sock);
  #};

  my $daemon = start_daemon($localip, $localport, $zm);
  my $port = $daemon->url;
  $port =~ s|^.*:||;
  $port =~ s|/.*$||;
  my $localaddr = $localip . ':' . $port;

  Info('Daemon uses local address '.$localaddr);

  # This value is passed as the LocalAddr argument to IO::Socket::INET.
  my $transport = SOAP::Transport::HTTP::Client->new(
    #      'local_address' => $localaddr );     ## REUSE port
    local_address => $localip );

  foreach my $monitor (values(%monitors)) {

    my $client = $monitor->{onvif_client};
    my $event_svc = $client->get_endpoint('events');
    $event_svc->set_transport($transport);
    #      print "Sending from local address " .
    #        $event_svc->get_transport()->local_address . "\n";

    my $submgr_svc = subscribe(
      $client, $localaddr, 'tns1:RuleEngine//.',
      SUBSCRIPTION_RENEW_INTERVAL, $monitor->{Id});

    if ( !$submgr_svc ) {
      Warning('Subscription failed for monitor #'.$monitor->{Id});
      next;
    }

    $monitor->{submgr_svc} = $submgr_svc;
  } # end foreach monitor

  while (1) {
    Info('Sleeping for ' . (SUBSCRIPTION_RENEW_INTERVAL - SUBSCRIPTION_RENEW_EARLY).' seconds');
    sleep(SUBSCRIPTION_RENEW_INTERVAL - SUBSCRIPTION_RENEW_EARLY);
    Info('Renewal');
    my %monitors = $zm->monitors();
    foreach my $monitor (values(%monitors)) {
      if ( defined $monitor->{submgr_svc} ) {
        renew($monitor->{submgr_svc}, SUBSCRIPTION_RENEW_INTERVAL + SUBSCRIPTION_RENEW_EARLY);
      }
    }
  };

  Info('ONVIF Trigger daemon exited');

  %monitors = $zm->monitors();
  foreach my $monitor (values(%monitors)) {
    if ( defined $monitor->{submgr_svc} ) {
      unsubscribe($monitor->{submgr_svc});
    }
  }
} # end sub events

# ========================================================================
# options processing

sub HELP_MESSAGE {
  my ($fh, $pkg, $ver, $opts) = @_;
  print $fh "Usage: " . __FILE__ . " <parameters>\n";
  print $fh  <<EOF
  Parameters:
    -v              - increase verbosity
    -l|local-addr   - listen on address (host[:port])
    -s|script       - run script instead of generating ZM events
EOF
}

# ========================================================================
# MAIN

my ($localaddr, $localip, $localport);

# canonicalize command name
my $command = $0;
$0 = $command;

print("Init Log $command\n");
logInit(id=>'zm_onvif-trigger');
logSetSignal();

if ( !GetOptions(
    'local-addr|l=s'    => \$localaddr,
    'script|s=s'        => \$script,
    'verbose|v=s'       => \$verbose,
  )) {
  HELP_MESSAGE(\*STDOUT);
  exit(1);
}

if ( defined $localaddr ) {
  if ( $localaddr =~ /(.*):(.*)/ ) {
    ($localip, $localport) = ($1, $2);
  } else {
    $localip = $localaddr;
    $localport = '0';
  }
}

events($localip, $localport);

1;
__END__

=head1 NAME

zmonvif-trigger.pl - ZoneMinder ONVIF trigger daemon

=head1 SYNOPSIS

 zmonfig-trigger.pl [-v] [-s] [-l=]


=head1 DESCRIPTION


=head1 OPTIONS

    local-addr    - local address to bind to
    script|s=s   - script to run
    verbose|v=s  - increase verbosity

=cut