## # # Copyright 2001, AllAfrica Global Media # # This file is part of XML::Comma # # XML::Comma is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # For more information about XML::Comma, point a web browser at # http://xml-comma.org, or read the tutorial included # with the XML::Comma distribution at docs/guide.html # ## package XML::Comma::SQL::Base; use XML::Comma::Storage::Util (); use XML::Comma::Util qw( dbg ); use Sys::Hostname (); use strict; sub sql_create_lock_table { my $lock = shift; my $dbh = $lock->get_dbh_writer(); # if database is postgres, we b0rked our cursor when we tried to access # the non-existent comma_lock table... eval{} is in case we have an old # version of DBD::Pg that doesn't support transactions eval { $dbh->rollback() if(XML::Comma->system_db() eq 'postgres'); }; my $sth = $dbh->prepare_cached ( qq[ CREATE TABLE comma_lock ( doc_key VARCHAR(255) UNIQUE, pid INT, info VARCHAR(255), time INT ) ]); $sth->execute(); $sth->finish(); } sub sql_create_hold_table { die "sql_create_hold_table is not implemented"; } # $lock, $key sub sql_get_lock_record { my ($lock, $key) = @_; my $dbh = $lock->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ SELECT doc_key, pid, info, time FROM comma_lock WHERE doc_key = ? ]); $sth->execute( $key ); my $result = $sth->fetchrow_arrayref(); $sth->finish(); return $result ? { doc_key => $result->[0], pid => $result->[1], info => $result->[2], time => $result->[3] } : ''; } # $lock sub sql_delete_locks_held_by_this_pid { my $lock = shift; my $dbh = $lock->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ DELETE from comma_lock WHERE pid = ? ] ); $sth->execute( $$ ); $sth->finish(); } # $lock, $key - returns 1 if row-insert succeeds, 0 on duplicate # key. throws error for any error other than duplicate key. sub sql_doc_lock { my ($lock, $key) = @_; my $dbh = $lock->get_dbh_writer(); eval { my $sth = $dbh->prepare_cached ( qq[ INSERT INTO comma_lock ( doc_key, pid, time, info ) VALUES ( ?, ?, ?, ? ) ]); $sth->execute( $key, $$, time(), Sys::Hostname::hostname() ); $sth->finish(); }; if ( $@ ) { # dbg 'sql lock insert error', $@; we actually want to get an # error on a failed lock. we catch the error and check whether it # signals an attempt to insert a "duplicate" key. If so, the lock # attempt failed, so we return 0. if ( $@ =~ /duplicate/i ) { # print "lock on $_[1] failed\n"; return 0; } die "$@\n"; } return 1; } # $lock, $key sub sql_doc_unlock { my ($lock, $key) = @_; my $dbh = $lock->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ DELETE FROM comma_lock WHERE doc_key = ? ]); $sth->execute( $key ); $sth->finish(); } sub sql_get_hold { die "sql_get_hold unimplemented"; } sub sql_release_hold { die "sql_release_hold unimplemented"; } # # -------------------------------- # sub sql_create_index_tables_table { die "sql_create_index_tables_table unimplemented"; } # table_type => const for the table_type column # table_def_sub => string of sub name to call to get table def # existing_table_name => pass this to *re_create* a table under old name # index_def => string for index_def column (if any) # sort_spec => string for sort_spec column (if any) # textsearch => string for text_search column (if any) # collection => name of collection being binary indexed (if any) sub sql_create_a_table { my ($index, %arg) = @_; my $dbh = $index->get_dbh_writer(); my $sort_spec = $arg{sort_spec} || ''; my $textsearch = $arg{textsearch} || ''; my $collection = $arg{collection} || ''; my $index_def = $arg{index_def} || ''; my $table_type = $arg{table_type} || ''; my $name; my $table_def_sub = $arg{table_def_sub} || die "need table def sub"; if ( ! $arg{existing_table_name} ) { # add an appropriate line to the index table my $sth = $dbh->prepare_cached ( qq[ INSERT INTO index_tables ( doctype, index_name, last_modified, _comma_flag, index_def, sort_spec, textsearch, collection, table_type ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ? ) ]); $sth->execute ( $index->doctype(), $index->element('name')->get(), time(), 0, $index_def, $sort_spec, $textsearch, $collection, $table_type ); $sth->finish(); # make a name for that table my $stub = substr ( $index->doctype(), 0, 8 ); $sth = $dbh->prepare_cached ( qq[ SELECT _sq FROM index_tables WHERE doctype = ? AND index_name = ? AND table_type = ? AND index_def = ? AND sort_spec = ? AND textsearch = ? AND collection = ? ]); $sth->execute ( $index->doctype(), $index->element('name')->get(), $table_type, $index_def, $sort_spec, $textsearch, $collection ); my $s = $sth->fetchrow_arrayref()->[0]; $sth->finish(); $name = $stub . '_' . sprintf ( "%04s", $s ); $sth = $dbh->prepare_cached ( qq[ UPDATE index_tables SET table_name = ? WHERE _sq = ? ]); $sth->execute( $name, $s ); $sth->finish(); } else { $name = $arg{existing_table_name}; } # now make the table eval { my $sth = $dbh->prepare_cached ( $index->$table_def_sub( $name, %arg ) ); $sth->execute(); $sth->finish(); }; if ( $@ ) { die "couldn't create database table ($table_def_sub). DB says: $@\n"; } return $name; } sub sql_create_data_table { my ( $index, $existing_table_name ) = @_; return $index->sql_create_a_table ( table_type => XML::Comma::Indexing::Index->DATA_TABLE_TYPE(), index_def => $index->to_string(), table_def_sub => 'sql_data_table_definition', existing_table_name => $existing_table_name ); } sub sql_data_table_definition { die "sql_data_table_definition is not implemented"; } sub sql_data_table_name { my $index = shift(); my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ SELECT table_name FROM index_tables WHERE doctype = ? AND index_name = ? AND table_type = ? ]); $sth->execute( $index->doctype(), $index->element('name')->get(), XML::Comma::Indexing::Index->DATA_TABLE_TYPE() ); my $result = $sth->fetchrow_arrayref(); $sth->finish(); return $result ? $result->[0] : die "FIX: no data table name found\n"; } sub sql_get_def { my $index = shift(); my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ SELECT index_def FROM index_tables WHERE doctype = ? AND index_name = ? AND table_type = ? ]); $sth->execute( $index->doctype(), $index->element('name')->get(), $index->DATA_TABLE_TYPE() ); my $result = $sth->fetchrow_arrayref(); $sth->finish(); return $result ? $result->[0] : ''; } sub sql_update_def_in_tables_table { my $index = shift(); my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ UPDATE index_tables SET index_def = ? WHERE table_name = ? ]); $sth->execute( $index->to_string(), $index->data_table_name() ); $sth->finish(); } # handles drop or modify. if there is a third arg, we assume that's a # column type, and this is a modify. sub sql_alter_data_table_drop_or_modify { my ( $index, $field_name, $field_type ) = @_; my $dbh = $index->get_dbh_writer(); my $data_table = $index->data_table_name(); if ( $field_type ) { my $sth = $dbh->prepare_cached ( qq[ ALTER TABLE $data_table MODIFY $field_name $field_type ]); $sth->execute(); $sth->finish(); } else { my $sth = $dbh->prepare_cached ( qq[ ALTER TABLE $data_table DROP $field_name ]); $sth->execute(); $sth->finish(); } } sub sql_alter_data_table_add { my ( $index, $field_name, $field_type ) = @_; my $dbh = $index->get_dbh_writer(); my $data_table = $index->data_table_name(); my $sth = $dbh->prepare_cached ( qq[ ALTER TABLE $data_table ADD $field_name $field_type ]); $sth->execute(); $sth->finish(); } sub sql_alter_data_table_change_primary_key { my ( $index, @fields ) = @_; my $dbh = $index->get_dbh_writer(); my $data_table = $index->data_table_name(); my $new_key; unless ( XML::Comma->system_db() eq 'postgres' ) { if ( $index->doc_id_sql_type =~ /\((\d+)\)/ ) { $new_key = "doc_id($1), "; } else { $new_key = "doc_id(100), "; } $new_key .= join ", ", map { "$_(100)" } @fields; } else { # postgres $new_key = join ", ", ( 'doc_id', @fields ); } my $sth; if(XML::Comma->system_db() eq 'postgres') { $sth = $dbh->prepare_cached ( qq [ ALTER TABLE ${data_table} DROP CONSTRAINT ${data_table}_pkey ]); } else { $sth = $dbh->prepare_cached ( qq[ ALTER TABLE $data_table DROP PRIMARY KEY ]); } $sth->execute(); $sth->finish(); $sth = $dbh->prepare_cached ( qq[ ALTER TABLE $data_table ADD PRIMARY KEY ( $new_key ) ]); $sth->execute(); $sth->finish(); } sub sql_alter_data_table_add_collection { my ( $index, $field_name ) = @_; my $dbh = $index->get_dbh_writer(); my $data_table = $index->data_table_name(); my $sth = $dbh->prepare_cached ( qq[ ALTER TABLE $data_table ADD $field_name TEXT ]); $sth->execute(); $sth->finish(); } sub sql_alter_data_table_add_index { my ( $index, $sql_index ) = @_; my $dbh = $index->get_dbh_writer(); my $unique = ( $sql_index->element('unique')->get() ? 'UNIQUE' : '' ); my $fields = $sql_index->element( 'fields' )->get(); my $sql_index_name = $sql_index->element('name')->get() || die "sql_index must have a name\n"; my $data_table = $index->data_table_name(); my $sth = $dbh->prepare_cached ( qq[ CREATE $unique INDEX $sql_index_name ON $data_table ($fields) ]); $sth->execute(); $sth->finish(); } sub sql_alter_data_table_drop_index { my ( $index, $sql_index ) = @_; my $dbh = $index->get_dbh_writer(); my $data_table = $index->data_table_name(); eval { my $sth = $dbh->prepare_cached ( qq[ DROP INDEX $sql_index ON $data_table ]); $sth->execute(); $sth->finish(); }; if ( $@ ) { XML::Comma::Log->warn ( "warning: couldn't drop index $_[1]" ); } } sub sql_insert_into_data { my ( $index, $doc, $comma_flag ) = @_; $comma_flag ||= 0; # the normal case is to treat the doc's doc_id as a nearly-normal # field, getting its value straight from the doc. but there is a # special case where we want the doc_id to get its value here, # during the write, from the _sq number. my ( $doc_make_id_flag, $doc_id ) = ( undef, $doc->doc_id() ); #dbg 'insert_into_data', $doc_id; if ( $doc->doc_id() eq 'COMMA_DB_SEQUENCE_SET' ) { $doc_id = 0; $doc_make_id_flag = 1; }; # the core logic -- insert the row into the data table with all # columns properly filled my $dbh = $index->get_dbh_writer(); my $data_table = $index->data_table_name(); my @columns = $index->columns(); my $columns_list = join ( ',', 'doc_id', @columns ); my @columns_values = $doc_id; push @columns_values, map { $index->column_value($_, $doc,) } @columns; my $placeholders = join( ", ", ('?') x @columns_values ); # dbg 'sql', $string; my $sth = $dbh->prepare_cached ( qq[ INSERT INTO $data_table ( _comma_flag, record_last_modified, $columns_list ) VALUES ( ?, ?, $placeholders ) ]); #dbg 'columns', @columns_values; $sth->execute($comma_flag, time(), @columns_values ); $sth->finish(); # and, finally set the id field correctly, both in the db and in the # doc, if we're responsible for generating the id. CAVEAT: we only # set the 'id' info in the doc -- some caller up the chain should # take responsibility for making all of the doc's storage_info stuff # right. if ( $doc_make_id_flag ) { $sth = $dbh->prepare_cached ( qq[ SELECT _sq FROM $data_table WHERE doc_id = ? ]); $sth->execute( $doc_id ); $doc_id = $sth->fetchrow_arrayref->[0]; $sth->finish(); $sth = $dbh->prepare_cached ( qq[ UPDATE $data_table SET doc_id = ? WHERE _sq = ? ]); $sth->execute( $doc_id, $doc_id ); $sth->finish(); $doc->set_storage_info ( undef, undef, $doc_id ); } } sub sql_update_in_data { my ( $index, $doc, $comma_flag ) = @_; $comma_flag = $comma_flag || 0; my $dbh = $index->get_dbh_writer(); my $data_table = $index->data_table_name(); my $columns_sets = join ( ',', "_comma_flag = ?", "record_last_modified = ?", map { $_ . '=' . ' ?' } $index->columns() ); my $where = $index->sql_get_where_pk( $doc ); my $sth = $dbh->prepare_cached ( qq[ UPDATE $data_table SET $columns_sets $where ]); $sth->execute( $comma_flag, time(), ( map { $index->column_value($_,$doc) } $index->columns() )); $sth->finish(); } sub sql_delete_from_data { my ( $index, $doc ) = @_; my $dbh = $index->get_dbh_writer(); my $data_table = $index->data_table_name(); my $where = $index->sql_get_where_pk( $doc ); my $sth = $dbh->prepare_cached ( qq[ DELETE FROM $data_table $where ]); $sth->execute(); $sth->finish(); } # TODO: come back here and work some trickery to return a statement with # placeholders? sub sql_get_where_pk { my ( $index, $doc ) = @_; my $dbh = $index->get_dbh_writer(); my %key_fields; @key_fields{ qw( doctype store id ) } = map { $dbh->quote( $_ ) } XML::Comma::Storage::Util->split_key( $doc->doc_key() ); my $sql = "WHERE doc_id = " . $key_fields{ id }; foreach my $extra ( @{ $index->{ _Index_extra_pk_fields } } ) { $sql .= " AND $extra = " . $key_fields{ $extra }; } return $sql; } sub sql_create_sort_table { my ( $index, $sort_spec ) = @_; return $index->sql_create_a_table ( table_type => XML::Comma::Indexing::Index->SORT_TABLE_TYPE(), table_def_sub => 'sql_sort_table_definition', sort_spec => $sort_spec ); } sub sql_sort_table_definition { die "sql_sort_table_definition is not implemented"; } sub sql_get_sort_table_for_spec { my ( $index, $sort_spec ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ SELECT table_name FROM index_tables WHERE doctype = ? AND index_name = ? AND sort_spec = ? ]); $sth->execute( $index->doctype(), $index->element('name')->get(), $sort_spec ); my $result = $sth->fetchrow_arrayref(); $sth->finish(); return $result ? $result->[0] : ''; } sub sql_get_sort_spec_for_table { my ( $index, $table_name ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ SELECT sort_spec FROM index_tables WHERE table_name = ? ]); $sth->execute( $table_name ); my $result = $sth->fetchrow_arrayref(); $sth->finish(); return $result ? $result->[0] : ''; } # sort_name is optional -- if not given, just returns all sort tables sub sql_get_sort_tables { my ( $index, $sort_name ) = @_; my $dbh = $index->get_dbh_writer(); my $sql = qq[ SELECT table_name FROM index_tables WHERE doctype = ? AND index_name = ? AND table_type = ? ]; $sql .= $sort_name ? qq[ AND sort_spec LIKE ? ] : ''; my $sth = $dbh->prepare_cached ( $sql ); my @values = ( $index->doctype(), $index->element('name')->get(), $index->SORT_TABLE_TYPE ); push @values, $index->make_sort_spec( $sort_name, '' ) . '%' if $sort_name; $sth->execute( @values ); return map { $_->[0] } @{$sth->fetchall_arrayref()}; } sub sql_insert_into_sort { my ( $index, $doc_id, $sort_table_name ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ INSERT INTO $sort_table_name ( _comma_flag, doc_id ) VALUES ( ?, ? ) ] ); $sth->execute( 0, $doc_id ); $sth->finish(); } # returns the number of rows deleted sub sql_delete_from_sort { my ( $index, $doc_id, $sort_table_name ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ DELETE FROM $sort_table_name WHERE doc_id = ? ] ); $sth->execute( $doc_id ); $sth->finish(); } sub sql_create_bcollection_table { my ( $index, $collection_name, $bcoll_el ) = @_; return $index->sql_create_a_table ( table_type => XML::Comma::Indexing::Index->BCOLLECTION_TABLE_TYPE(), table_def_sub => 'sql_bcollection_table_definition', collection => $collection_name, bcoll_el => $bcoll_el ); } sub sql_bcollection_table_definition { die "sql_bcollection_table_definition is not implemented"; } sub sql_drop_bcollection_table { my ( $index, $collection_name ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ SELECT table_name FROM index_tables WHERE collection = ? AND doctype = ? ] ); $sth->execute( $collection_name, $index->doctype() ); while ( my $row = $sth->fetchrow_arrayref() ) { my $table_name = $row->[0]; my $sth = $dbh->prepare_cached ( qq[ DROP TABLE $table_name ] ); $sth->execute(); $sth->finish(); $sth = $dbh->prepare_cached ( qq[ DELETE FROM index_tables WHERE table_name = ? ] ); $sth->execute( $table_name ); $sth->finish(); } } # name is optional -- if not given, returns all bcollection table names sub sql_get_bcollection_table { my ( $index, $name ) = @_; my $dbh = $index->get_dbh_writer(); my $sql = qq[ SELECT table_name FROM index_tables WHERE doctype = ? AND index_name = ? AND table_type = ? ]; $sql .= qq[ AND collection = ? ] if $name; my @values = ( $index->doctype(), $index->element('name')->get(), $index->BCOLLECTION_TABLE_TYPE ); push @values, $name if $name; my $sth = $dbh->prepare_cached ( $sql ); $sth->execute( @values ); my $result = $sth->fetchall_arrayref(); if ( wantarray ) { return map { $_->[0] } @{$result}; } else { return $result->[0]->[0] || ''; } } sub sql_insert_into_bcollection { my ( $index, $table_name, $doc_id, $col_str, $col_extra ) = @_; my $dbh = $index->get_dbh_writer(); if ( $col_extra ) { my $sth = $dbh->prepare_cached ( qq[ INSERT INTO $table_name ( _comma_flag, doc_id, value, extra ) VALUES ( ?, ?, ?, ? ) ] ); $sth->execute( 0, $doc_id, $col_str, $col_extra ); $sth->finish(); } else { my $sth = $dbh->prepare_cached ( qq[ INSERT INTO $table_name ( _comma_flag, doc_id, value ) VALUES ( ?, ?, ? ) ] ); $sth->execute( 0, $doc_id, $col_str ); $sth->finish(); } } sub sql_get_values_from_bcollection { my ( $index, $doc_id, $table_name ) = @_; my $dbh = $index->get_dbh_reader(); my $sth = $dbh->prepare_cached ( qq[ SELECT value FROM $table_name WHERE doc_id = ? ] ); $sth->execute( $doc_id ); return map { $_->[0] } @{ $sth->fetchall_arrayref() }; } sub sql_delete_from_bcollection { my ( $index, $doc_id, $table_name ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ DELETE FROM $table_name WHERE doc_id = ? ] ); $sth->execute( $doc_id ); $sth->finish(); } sub sql_create_textsearch_tables { my ( $index, $textsearch ) = @_; $index->sql_create_a_table ( table_type => XML::Comma::Indexing::Index->TEXTSEARCH_INDEX_TABLE_TYPE(), table_def_sub => 'sql_textsearch_index_table_definition', textsearch => $textsearch->element('name')->get() ); $index->sql_create_a_table ( table_type => XML::Comma::Indexing::Index->TEXTSEARCH_DEFERS_TABLE_TYPE(), table_def_sub => 'sql_textsearch_defers_table_definition', textsearch => $textsearch->element('name')->get() ); return 1; } sub sql_textsearch_index_table_definition { die "sql_textsearch_index_table_definition is not implemented"; } sub sql_textsearch_defers_table_definition { die "sql_textsearch_defers_table_definition is not implemented"; } sub sql_drop_textsearch_tables { my ( $index, $textsearch_name ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ SELECT table_name FROM index_tables WHERE doctype = ? AND textsearch = ? ] ); $sth->execute( $index->doctype(), $textsearch_name ); while ( my $row = $sth->fetchrow_arrayref() ) { my $table_name = $row->[0]; my $sth = $dbh->prepare_cached ( "DROP TABLE $table_name" ); $sth->execute(); $sth->finish(); $sth = $dbh->prepare_cached ( qq[ DELETE FROM index_tables WHERE table_name = ? ] ); $sth->execute( $table_name ); $sth->finish(); } } sub sql_get_textsearch_tables { my ( $index, $textsearch_name ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ SELECT table_name FROM index_tables WHERE doctype = ? AND textsearch = ? ORDER BY table_type ] ); $sth->execute( $index->doctype(), $textsearch_name ); return map { $_->[0] } @{$sth->fetchall_arrayref()}; } sub sql_textsearch_word_lock { die "sql_textsearch_word_lock is not implemented"; } sub sql_textsearch_word_unlock { die "sql_textsearch_word_unlock is not implemented"; } sub sql_textsearch_pack_seq_list { die "sql_textsearch_pack_seq_list is not implemented"; } sub sql_textsearch_unpack_seq_list { die "sql_textsearch_unpack_seq_list is not implemented"; } # pass EITHER a single doc_id or a list of doc_seqs. sub sql_update_in_textsearch_index_table { my ( $index, $i_table_name, $word, $doc_id, $clobber, @doc_seqs ) = @_; my $dbh = $index->get_dbh_writer(); # generate a sequence if we were passed an id if ( $doc_id ) { @doc_seqs = ( $index->sql_get_sq_from_data_table($doc_id) ); } # just return without doing anything if we turn out not to have any # @doc_seqs. unless we're in $clobber mode, in which case we want to # enter an empty record. return if ! @doc_seqs and ! $clobber; my $packed = $index->sql_textsearch_pack_seq_list ( @doc_seqs ); $index->sql_textsearch_word_lock ( $i_table_name, $word ); # modify row my $sth =$dbh->prepare_cached ( qq[ SELECT seqs FROM $i_table_name WHERE word = ? ] ); $sth->execute( $word ); my $result = $sth->fetchrow_arrayref(); $sth->finish(); if ( $result ) { # if found, update my $new_seqs_string; if ( $result->[0] and ! $clobber ) { $new_seqs_string = $index->sql_textsearch_cat_seq_list($result->[0], $packed); } else { $new_seqs_string = $packed ; } my $sth = $dbh->prepare_cached ( qq[ UPDATE $i_table_name SET seqs = ? WHERE word = ? ] ); $sth->execute( $new_seqs_string, $word ); $sth->finish(); } else { # else insert my $sth = $dbh->prepare_cached ( qq[ INSERT INTO $i_table_name ( word, seqs ) VALUES ( ?, ? ) ] ); $sth->execute( $word, $packed ); $sth->finish(); } $index->sql_textsearch_word_unlock ( $i_table_name, $word ); } sub sql_get_sq_from_data_table { my ( $index, @doc_ids ) = @_; my @caller = caller(1); my @list; my $dbh = $index->get_dbh_writer(); my $data_table_name = $index->data_table_name(); foreach my $id ( @doc_ids ) { my $sth = $dbh->prepare_cached ( qq[ SELECT _sq from $data_table_name WHERE doc_id = ? ] ); $sth->execute( $id ); my $result = $sth->fetchrow_arrayref(); $sth->finish(); push ( @list, $result->[0] ) if $result; } return @list; } sub sql_delete_from_textsearch_index_table { my ( $index, $ts_table_name, $doc_id ) = @_; my ( $sq ) = $index->sql_get_sq_from_data_table( $doc_id ); # shortcut to return if this doc isn't indexed return if ! $sq; my $packed_sq = $index->sql_textsearch_pack_seq_list ( $sq ); # loop over all entries conatining $doc's id #dbg 'trying to delete', $doc_id, $ts_table_name; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ SELECT word FROM $ts_table_name WHERE seqs LIKE ? ] ); $sth->execute( '%' . $packed_sq . '%' ); while ( my $row = $sth->fetchrow_arrayref() ) { my $word = $row->[0]; # get lock $index->sql_textsearch_word_lock ( $ts_table_name, $word ); # fetch seqs column now that we have lock my $sth = $dbh->prepare_cached ( qq[ SELECT seqs FROM $ts_table_name WHERE word = ? ] ); $sth->execute( $word ); my $result = $sth->fetchrow_arrayref(); $sth->finish(); # remove the seq in question and re-put my %seqs = map { $_=>1 } $index->sql_textsearch_unpack_seq_list ( $result->[0] ); delete $seqs{$sq}; my $packed_seqs = $index->sql_textsearch_pack_seq_list( keys %seqs ); $sth = $dbh->prepare_cached ( qq[ UPDATE $ts_table_name SET seqs = ? WHERE word = ? ] ); $sth->execute( $packed_seqs, $word ); $sth->finish(); # release lock $index->sql_textsearch_word_unlock ( $ts_table_name, $word ); } } # DEFER DELETE ACTION CONST = 1; # DEFER UPDATE ACTION CONST = 2; sub sql_textsearch_defer_delete { my ( $index, $d_table_name, $doc_id ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ INSERT INTO $d_table_name ( doc_id, action ) VALUES ( ?, ? ) ] ); $sth->execute( $doc_id, 1 ); $sth->finish(); } sub sql_textsearch_defer_update { my ( $index, $d_table_name, $doc_id, $frozen_words ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ INSERT INTO $d_table_name ( doc_id, action, text ) VALUES ( ?, ?, ? ) ] ); $sth->execute( $doc_id, 2, $frozen_words ); $sth->finish(); } sub sql_get_textsearch_defers_sth { my ( $index, $d_table_name ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ SELECT doc_id, action, _sq, text FROM $d_table_name ORDER BY _sq ] ); $sth->execute(); return $sth; } sub sql_delete_from_textsearch_defers_table { my ( $index, $d_table_name, $doc_id, $seq ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ DELETE FROM $d_table_name WHERE doc_id = ? AND _sq <= ? ] ); $sth->execute( $doc_id, $seq ); $sth->finish(); } sub sql_get_textsearch_indexed_words { my ( $index, $i_table_name ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ SELECT word FROM $i_table_name ] ); $sth->execute(); return map { $_->[0] } @{$sth->fetchall_arrayref()}; } sub sql_get_textsearch_index_packed { my ( $index, $i_table_name, $word ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ SELECT seqs FROM $i_table_name WHERE word = ? ] ); $sth->execute( $word ); my $result = $sth->fetchrow_arrayref(); $sth->finish(); return $result ? $result->[0] : ''; } # returns count sub sql_key_indexed_p { # we have to accept the key and the id here. doc_key() isn't available # during the update() call for index only documents, and doc_id() isn't # enough to go on for update()'s effected by and my ( $index, $key, $ionly_id ) = @_; if ( ! $key ) { return 0 if $ionly_id eq 'COMMA_DB_SEQUENCE_SET'; die "internal index error: update without storage info"; } my ( $doctype, $store, $id ) = XML::Comma::Storage::Util->split_key( $key ); my $AND = ''; my @values; if ( $index->element('index_from_store')->get() ) { $AND .= qq[ AND doctype = ? AND store = ? ]; push @values, $doctype, $store; } my $dbh = $index->get_dbh_writer(); my $table_name = $index->data_table_name(); my $sth = $dbh->prepare_cached ( qq[ SELECT count(*) from $table_name WHERE doc_id = ? $AND ] ); $sth->execute( $id, @values ); my $count = $sth->fetchrow_arrayref->[0]; $sth->finish(); return $count; } # returns count sub sql_id_indexed_p { my ( $index, $id ) = @_; return 0 if $id eq 'COMMA_DB_SEQUENCE_SET'; my $table_name = $index->data_table_name(); my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ SELECT count(*) FROM $table_name WHERE doc_id = ? ] ); $sth->execute( $id ); return $sth->fetchrow_arrayref->[0]; } # returns count sub sql_seq_indexed_p { my ( $index, $seq ) = @_; my $dbh = $index->get_dbh_writer(); my $table_name = $index->data_table_name(); my $sth = $dbh->prepare_cached ( qq[ SELECT count(*) FROM $table_name WHERE _sq = ? ] ); $sth->execute( $seq ); return $sth->fetchrow_arrayref->[0]; } # args: $index, $table_name sub sql_simple_rows_count { my ( $index, $table_name ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ SELECT count(*) from $table_name ] ); $sth->execute(); my $count = $sth->fetchrow_arrayref()->[0]; $sth->finish(); return $count; } # both drops the table and removes the index_tables entry sub sql_drop_table { my ( $index, $table_name ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ DROP TABLE $table_name ] ); $sth->execute(); $sth->finish(); $sth = $dbh->prepare_cached ( qq[ DELETE FROM index_tables WHERE table_name = ? ] ); $sth->execute( $table_name ); $sth->finish(); } sub sql_update_timestamp { my ( $index, $table_name ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached( qq[ UPDATE index_tables SET last_modified = ? WHERE table_name = ? ] ); $sth->execute( time(), $table_name ); $sth->finish(); } # returns timestamp -- (also used to check whether a table exists) sub sql_get_timestamp { my ( $index, $table_name ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ SELECT last_modified FROM index_tables WHERE table_name = ? ] ); $sth->execute( $table_name ); my $result = $sth->fetchrow_arrayref(); $sth->finish(); return $result ? $result->[0] : ''; } # returns flag value sub sql_get_table_comma_flag { my ( $self, $dbh, $table_name ) = @_; my $sth = $dbh->prepare_cached ( qq[ SELECT _comma_flag FROM index_tables WHERE table_name = ? ] ); $sth->execute( $table_name ); my $result = $sth->fetchrow_arrayref(); $sth->finish(); return $result ? $result->[0] : ''; } sub sql_set_table_comma_flag { my ( $self, $dbh, $table_name, $flag_value ) = @_; my $sth = $dbh->prepare_cached ( qq[ UPDATE index_tables SET _comma_flag = ? WHERE table_name = ? ] ); $sth->execute( $flag_value, $table_name ); $sth->finish(); } sub sql_set_all_table_comma_flags_politely { my ( $index, $flag_value ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ UPDATE index_tables SET _comma_flag = ? WHERE index_name = ? AND doctype = ? AND _comma_flag = ? ] ); $sth->execute( $flag_value, $index->element('name')->get(), $index->doctype(), 0 ); $sth->finish(); } sub sql_get_all_tables_with_comma_flags_set { my ( $index, $ignore_flag ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ SELECT table_name FROM index_tables WHERE index_name = ? AND doctype = ? AND ((_comma_flag != ? ) AND (_comma_flag != ? )) ] ); $sth->execute( $index->element('name')->get(), $index->doctype(), 0, $ignore_flag ); return map { $_->[0] } @{$sth->fetchall_arrayref()}; } sub sql_unset_all_table_comma_flags { my ( $index ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ UPDATE index_tables SET _comma_flag = ? WHERE index_name = ? AND doctype = ? ] ); $sth->execute( 0, $index->element('name')->get(), $index->doctype() ); $sth->finish(); } sub sql_unset_table_comma_flag { my ( $self, $dbh, $table_name ) = @_; my $sth = $dbh->prepare_cached ( qq[ UPDATE index_tables SET _comma_flag = ? WHERE table_name = ? ] ); $sth->execute( 0, $table_name ); $sth->finish(); } sub sql_set_all_comma_flags { my ( $index, $table_name, $flag_value ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ UPDATE $table_name SET _comma_flag = ? ] ); $sth->execute( $flag_value ); $sth->finish(); } sub sql_clear_all_comma_flags { my ( $self, $dbh, $table_name ) = @_; my $sth = $dbh->prepare_cached ( qq[ UPDATE $table_name SET _comma_flag = ? ] ); $sth->execute( 0 ); $sth->finish(); } sub sql_clean_find_orphans { my ( $index, $table_name, $data_table_name ) = @_; return qq[ SELECT $table_name.doc_id FROM $table_name LEFT JOIN $data_table_name ON $table_name.doc_id = $data_table_name.doc_id WHERE $data_table_name.doc_id is NULL ]; } sub sql_set_comma_flags_for_clean_first_pass { my ( $index, $dbh, $data_table_name, $table_name, $erase_where_clause, $flag_value ) = @_; ## orphan rows in the sort tables. these can be created in small ## numbers by the normal fact of entries being cleaned from the data ## table before they are removed from the sort tables. orphans can ## be created in large numbers by an aborted rebuild() or other ## large operation. if ( $table_name ne $data_table_name ) { my $sql = $index->sql_clean_find_orphans( $table_name, $data_table_name ); my $sth = $dbh->prepare_cached ( $sql ); $sth->execute(); while ( my $row = $sth->fetchrow_arrayref() ) { my $orphan_id = $row->[0]; # print ( "orphan($table_name:$orphan_id)..." ); my $sth = $dbh->prepare_cached ( qq[ UPDATE $table_name SET _comma_flag = ? WHERE doc_id = ? ] ); $sth->execute( $flag_value, $orphan_id ); $sth->finish(); } } ## rows matching the erase_where_clause if ( $erase_where_clause ) { # TODO: come back here and shoehorn $erase_where_clause into using # placeholders? my $sth = $dbh->prepare_cached ( qq[ UPDATE $table_name SET _comma_flag = ? WHERE $erase_where_clause ] ); $sth->execute( $flag_value ); $sth->finish(); } } sub sql_set_comma_flags_for_clean_second_pass { my ( $self, $dbh, $table_name, $order_by, $sort_spec, $doctype, $indexname, $size_limit, $flag_value ) = @_; # dbg 'table_name', $table_name; # dbg 'order_by', $order_by; # dbg 'size_limit', $size_limit; # get the index so we can make an iterator my $index = XML::Comma::Def->read ( name=>$doctype )->get_index( $indexname ); # now set the flag for everything after the first size_limit entries my $i = $index->iterator ( order_by => $order_by, sort_spec => $sort_spec ); # dbg 'doc_ids after refresh', $_->doc_id() while ( $_ = $i++ ); $i->iterator_refresh ( 0xffffff, $size_limit ); # blech, hack while ( $i->iterator_next() ) { my $id = $i->doc_id(); my $sth = $dbh->prepare_cached ( qq[ UPDATE $table_name SET _comma_flag = ? WHERE doc_id = ? ] ); $sth->execute( $flag_value, $id ); $sth->finish(); # dbg 'set_flag_second', "UPDATE $table_name SET _comma_flag=$flag_value WHERE doc_id='$id'"; } } sub sql_delete_where_not_comma_flags { my ( $self, $dbh, $table_name, $flag_value ) = @_; my $sth = $dbh->prepare_cached ( qq[ DELETE FROM $table_name WHERE _comma_flag != ? ] ); $sth->execute( $flag_value ); $sth->finish(); } sub sql_delete_where_comma_flags { my ( $index, $dbh, $table_name, $flag_value ) = @_; my $sth = $dbh->prepare_cached ( qq[ DELETE FROM $table_name WHERE _comma_flag = ? ] ); $sth->execute( $flag_value ); $sth->finish(); } # XXX not called from anywhere (dug) sub sql_select_aggregate { my ( $index, $aggregate, $field_name, $table_name ) = @_; my $dbh = $index->get_dbh_writer(); my $sth = $dbh->prepare_cached ( qq[ SELECT $aggregate($field_name) FROM $table_name ] ); $sth->execute(); my $result = $sth->fetchall_arrayref(); return $result ? $result->[0]->[0] : ''; } sub sql_select_returns_count { return; } sub sql_select_distinct_field { my ( $it, $index, $field_name, $where ) = @_; my $dbh = $index->get_dbh_reader(); my $data_table_name = $index->data_table_name(); my $str = qq[ SELECT DISTINCT $field_name FROM $data_table_name ]; if ( $where ) { $str .= qq[ WHERE $where ]; } my $sth = $dbh->prepare_cached ( $str ); return $sth; } ## # complex select statement build -- for iterator # # It would take more tricks than I currently have in my bag to coerce # this method into using placeholders (deparsing user specified SQL # before re-packing it with placeholders). [dug] sub sql_select_from_data { my ( $iterator, $index, $order_by_expressions, $from_tables, $where_clause, $having_clause, $distinct, $order_by, $limit_number, $limit_offset, $columns_list, $collection_spec, $textsearch_spec, $do_count_only, $aggregate_function ) = @_; #dbg 'select_from_data', $where_clause; my $data_table_name = $index->data_table_name(); # more hard-coded crappyness. If we aliased the tables for a # binary collection search we need to make $data_table_name match the # alias. foreach my $tbl ( @{ $from_tables } ) { if ( $tbl =~ /^\s*$data_table_name AS t01$/ ) { $data_table_name = 't01'; last; } } my $dbh = $index->get_dbh_writer(); my $distinct_string; if ( $distinct ) { $distinct_string = 'DISTINCT '; } else { $distinct_string = ''; } # the core part of the statement my $select; if ( $aggregate_function ) { $select = "SELECT $aggregate_function" if $aggregate_function; } else { if ( $do_count_only ) { $select = "SELECT COUNT( $distinct_string $data_table_name.doc_id )"; } else { $select = "SELECT $distinct_string"; $select .= join ( ',', "$data_table_name.doc_id", (map { ref $_ ? $_->[0] . '.extra as ' . $_->[1] : "$data_table_name.$_" } @$columns_list), "$data_table_name.record_last_modified" ); } } # extra expressions to select for (the Iterator would have determined # that these are used in the order_by) my @evalled_order_by_list; foreach my $el ( @{$order_by_expressions} ) { my $expr = $el->element('expression')->get(); my $evalled = eval $expr; if ( $@ ) { die "error while eval'ing order_by '$expr': $@\n"; } push @evalled_order_by_list, [ $el->element('name')->get(), $evalled ]; } my $extra_order_by = join ( ',' , map { ' (' . $_->[1] . ') as ' . $_->[0] } @evalled_order_by_list ); $extra_order_by = ',' . $extra_order_by if $extra_order_by; # from tables my $from = ' FROM ' . join ( ',', @{$from_tables} ); # where clause my $where = ' WHERE 1=1'; $where .= " AND ($where_clause)" if $where_clause; $where .= " AND $collection_spec" if $collection_spec; $where .= " AND ($textsearch_spec)" if $textsearch_spec; # having clause my $having = ''; $having .= " HAVING ($having_clause)" if $having_clause; # group by clause my $group_by = ''; # order by clause my $order = ''; if ( $order_by ) { $order = " ORDER BY $order_by"; } # limit what the db server gives back my $limit = $index->sql_limit_clause ( $limit_number, $limit_offset ); # return either a regular statement, a count() statement, or an # aggregate statement if ( $aggregate_function ) { # aggregate ignores limit stuff return $select . $from . $where; } elsif ( $do_count_only ) { # count_only ignores order_by stuff return $select . $from . $where . $group_by; } else { # my ( $package, $filename, $line ) = caller(2); #print $select.$extra_order_by.$from.$where.$order.$limit . "\n"; return $select . $extra_order_by . $from . $where . $having . $group_by. $order . $limit; } } # ## sub sql_limit_clause { my ( $index, $limit_number, $limit_offset ) = @_; if ( $limit_number ) { if ( $limit_offset ) { return " LIMIT $limit_number OFFSET $limit_offset"; } else { return " LIMIT $limit_number"; } } else { return ''; } } sub sql_create_textsearch_temp_table { my ( $index, $ts_index_table_name, $word ) = @_; #dbg 'tcreate', $word; my $dbh = $index->get_dbh_writer(); my $packed = $index->sql_get_textsearch_index_packed ( $ts_index_table_name, $word ) || return ( '', 0 ); my ($temp_fh, $temp_filename ) = File::Temp::tempfile ( 'comma_db_XXXXXX', DIR => XML::Comma->tmp_directory() ); my @unpacked = $index->sql_textsearch_unpack_seq_list($packed); print $temp_fh join ( "\n", @unpacked ) . "\n"; close ( $temp_fh ); chmod 0644, $temp_filename; my $temp_table_name = $index->sql_create_textsearch_temp_table_stmt(); $index->sql_load_data ( $temp_table_name, $temp_filename ); unlink ( $temp_filename ); #dbg $$, "created temp table $temp_table_name for $word"; return ( $temp_table_name, $#unpacked ); } sub sql_create_textsearch_temp_table_stmt { my $index = shift; my $dbh = $index->get_dbh_writer(); my $temp_table_name = '_temp_' . $$ . '_' . int(rand(0xffffffff)); my $sth = $dbh->prepare_cached ( qq[ CREATE TEMPORARY TABLE $temp_table_name ( id VARCHAR(255) PRIMARY KEY ) TYPE=HEAP ]); $sth->execute(); $sth->finish(); return $temp_table_name; } sub sql_load_data { die "sql_load_data is not implemented"; } sub sql_drop_any_temp_tables { my ( $iterator, $index, $iterator_string, @tables_list ) = @_; my $dbh = $index->get_dbh_writer(); foreach my $t ( grep { /^_temp/ } @tables_list ) { # XML::Comma::Log->warn ( "$$ dropping $t for $iterator_string\n" ); my $sth = $dbh->prepare_cached ( qq[ drop table $t ] ); $sth->execute(); $sth->finish(); } } #TODO: replace all these die "foo is not implemented" with more # explanatory text (ie foo is a pure virtual function not defined # in Base.pm or so... sub sql_textsearch_cat_seq_list { die "sql_textsearch_cat_seq_list is not implemented"; } 1;